Files
OSIT-AE-API-FastAPI/app/routers/address.py
2021-08-25 10:58:39 -04:00

180 lines
6.1 KiB
Python

import datetime
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Query, Response, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from app.lib_general import log, logging
from app.config import settings
from app.db_sql import *
from app.routers.api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template
from app.methods.address_methods import load_address_obj, update_address_obj
from app.models.address_models import Address_Base
from app.models.response_models import *
router = APIRouter()
@router.post('', response_model=Resp_Body_Base)
async def post_address_obj(
address_obj: Address_Base,
x_account_id: str = Header(...),
return_obj: Optional[bool] = True,
by_alias: Optional[bool] = True,
#include: Optional[list] = [],
#exclude: Optional[list] = [],
exclude_unset: Optional[bool] = True,
exclude_none: Optional[bool] = True,
response: Response = Response,
):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'address'
obj_data_dict = address_obj.dict(by_alias=False, exclude_unset=True)
result = post_obj_template(
obj_type=obj_type,
data=obj_data_dict,
return_obj=True,
by_alias=True,
exclude_unset=True,
)
return result
@router.patch('/{address_obj_id}', response_model=Resp_Body_Base)
async def patch_address_obj(
address_obj: Address_Base,
address_obj_id: str = Query(..., min_length=1, max_length=22),
x_account_id: Optional[str] = Header(..., ),
return_obj: Optional[bool] = True,
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
response: Response = Response,
):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
address_obj_type = 'address'
address_obj_data_dict = address_obj.dict(by_alias=False, exclude_unset=True)
address_obj_data_dict['id'] = redis_lookup_id_random(record_id_random=address_obj_id, table_name=address_obj_type)
address_obj_data_dict['id_random'] = address_obj_id
result = patch_obj_template(
obj_type=address_obj_type,
data=address_obj_data_dict,
obj_id=address_obj_id,
return_obj=True,
by_alias=True,
exclude_unset=True,
)
return result
# ### BEGIN ### API Address ### patch_address_json() ###
@router.patch('/{address_id}/json', response_model=Resp_Body_Base)
async def patch_address_json(
address_obj: Address_Base,
address_id: str = Query(..., min_length=1, max_length=22),
create_sub_obj: bool = False,
x_account_id: Optional[str] = Header(..., ),
return_obj: Optional[bool] = True,
by_alias: Optional[bool] = True,
include: Optional[list] = [],
exclude: Optional[list] = [],
exclude_unset: Optional[bool] = True,
exclude_none: Optional[bool] = True,
response: Response = Response,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if address_id := redis_lookup_id_random(record_id_random=address_id, table_name='address'): pass
else:
return mk_resp(data=None, status_code=404)
if address_obj_up_result := update_address_obj(
address_id=address_id,
address_obj_up=address_obj,
create_sub_obj=create_sub_obj,
):
log.debug(address_obj_up_result)
if return_obj:
address_obj = load_address_obj(address_id=address_id)
address_dict = address_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset)
return mk_resp(data=address_dict)
else:
return mk_resp(data=address_obj_up_result)
else:
return mk_resp(data=False, status_code=400, response=response) # Bad Request
# ### END ### API Address ### patch_address_json() ###
@router.get('/list', response_model=Resp_Body_Base)
async def get_address_obj_li(
for_obj_type: Optional[str] = Query(None, min_length=2, max_length=50),
for_obj_id: Optional[str] = Query(None, min_length=1, max_length=22),
x_account_id: str = Header(...),
by_alias: Optional[bool] = True,
include: Optional[list] = [],
exclude: Optional[list] = [],
exclude_unset: Optional[bool] = True,
exclude_none: Optional[bool] = True,
response: Response = Response,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'address'
result = get_obj_li_template(
obj_type=obj_type,
for_obj_type=for_obj_type,
for_obj_id=for_obj_id,
by_alias=True,
exclude_unset=True,
)
return result
@router.get('/{obj_id}', response_model=Resp_Body_Base)
async def get_address_obj(
obj_id: str = Query(..., min_length=1, max_length=22),
x_account_id: str = Header(...),
by_alias: Optional[bool] = True,
include: Optional[list] = [],
exclude: Optional[list] = [],
exclude_unset: Optional[bool] = True,
exclude_none: Optional[bool] = True,
response: Response = Response,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'address'
result = get_obj_template(
obj_type=obj_type,
obj_id=obj_id,
by_alias=True,
exclude_unset=True,
)
return result
@router.delete('/{obj_id}', response_model=Resp_Body_Base)
async def delete_address_obj(
obj_id: str = Query(..., min_length=1, max_length=22),
x_account_id: str = Header(...),
response: Response = Response,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'address'
result = delete_obj_template(
obj_type=obj_type,
obj_id=obj_id,
)
return result