Files
OSIT-AE-API-FastAPI/app/routers/contact.py

316 lines
12 KiB
Python

import datetime
from typing import Dict, List, Optional, Set, Union

from fastapi import APIRouter, Body, Depends, Header, HTTPException, Path, Query, Response, status
from pydantic import BaseModel, EmailStr, Field

from app.lib_general import log, logging
from app.config import settings
from app.db_sql import get_id_random, sql_insert, sql_update, sql_insert_or_update, sql_select, sql_delete, redis_lookup_id_random
from app.routers.api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template
from app.methods.contact_methods import create_update_contact_obj_v4, load_contact_obj, update_contact_obj
from app.models.contact_models import Contact_Base
from app.models.response_models import *
# Router on which the contact endpoints defined in this module are registered.
router = APIRouter()
@router.post('', response_model=Resp_Body_Base)
async def post_contact_obj(
    obj: Contact_Base,
    x_account_id: str = Header(...),
    return_obj: Optional[bool] = True,
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
    response: Response = Response,
):
    """Create a contact from the request body via the generic POST template.

    The body model is dumped to a dict keyed by field name (not alias) and
    limited to the fields the client actually set, then handed to
    ``post_obj_template`` for the ``'contact'`` object type.

    NOTE(review): ``return_obj``, ``by_alias`` and ``exclude_unset`` are
    accepted but never read; the template is always called with all three
    set to ``True``. ``x_account_id`` and ``response`` are not read in the
    body either. Confirm whether that is intentional.

    Returns:
        Whatever ``post_obj_template`` returns.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    return post_obj_template(
        obj_type='contact',
        data=obj.dict(by_alias=False, exclude_unset=True),
        return_obj=True,
        by_alias=True,
        exclude_unset=True,
    )
@router.patch('/{obj_id}', response_model=Resp_Body_Base)
async def patch_contact_obj(
    obj: Contact_Base,
    # obj_id comes from the URL path, so it must be declared with Path(...);
    # a Query(...) declaration on a path parameter is not honoured by FastAPI.
    obj_id: str = Path(..., min_length=1, max_length=22),
    x_account_id: Optional[str] = Header(..., ),
    return_obj: Optional[bool] = True,
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
    response: Response = Response,
):
    """Update a contact via the generic PATCH template.

    The body model is dumped to a dict keyed by field name (not alias) and
    limited to the fields the client set. Two keys are then added: ``id``
    (the result of ``redis_lookup_id_random`` for ``obj_id``) and
    ``id_random`` (``obj_id`` itself).

    NOTE(review): ``return_obj``, ``by_alias`` and ``exclude_unset`` are
    accepted but never read; the template is always called with all three
    set to ``True``. The lookup result is not checked before being used as
    ``id`` -- presumably ``patch_obj_template`` copes with a failed lookup;
    verify.

    Args:
        obj: Contact fields to change.
        obj_id: Random (public) contact ID taken from the URL path.

    Returns:
        Whatever ``patch_obj_template`` returns.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    obj_type = 'contact'
    obj_data_dict = obj.dict(by_alias=False, exclude_unset=True)
    obj_data_dict['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type)
    obj_data_dict['id_random'] = obj_id

    return patch_obj_template(
        obj_type=obj_type,
        data=obj_data_dict,
        obj_id=obj_id,
        return_obj=True,
        by_alias=True,
        exclude_unset=True,
    )
# ### BEGIN ### API Contact ### post_contact_obj_new_v4() ###
# Updated 2021-08-25
@router.post('/new_v4', response_model=Resp_Body_Base)
async def post_contact_obj_new_v4(
    contact_obj: Contact_Base,
    for_type: str = None,
    for_id: str = None,
    create_sub_obj: bool = False,
    fail_any: bool = True, # Fail if any thing goes wrong for sub objects
    x_account_id: str = Header(...),
    return_obj: bool = True,
    inc_address: bool = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
    response: Response = Response,
):
    """Create a new contact through ``create_update_contact_obj_v4``.

    The helper is called with ``contact_id=None``. A falsy result, or a
    result that is not an ``int``, produces a 400 response. Otherwise the
    result is used as the new contact's ID.

    Args:
        contact_obj: Contact data from the request body.
        for_type, for_id, create_sub_obj, fail_any: Passed straight through
            to ``create_update_contact_obj_v4``.
        x_account_id: Passed to the helper as ``account_id``.
        return_obj: When true the response data is the result of
            ``load_contact_obj`` (or ``False`` if that result is falsy);
            otherwise it is a dict with ``contact_id`` and
            ``contact_id_random``.
        inc_address: Forwarded to ``load_contact_obj``.
        by_alias, exclude_unset: Accepted but not read in this function.

    Returns:
        The value produced by ``mk_resp``.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # There should probably be a check for the account ID before calling the create update function?
    created = create_update_contact_obj_v4(
        contact_dict_obj=contact_obj,
        contact_id=None,
        account_id=x_account_id,
        for_type=for_type,
        for_id=for_id,
        create_sub_obj=create_sub_obj,
        fail_any=fail_any,
        return_outline=False,
    )
    if not created:
        return mk_resp(
            data=False,
            status_code=400,
            response=response,
            status_message='The contact was not created. Check the field names and data types.',
        )
    if not isinstance(created, int):
        return mk_resp(
            data=False,
            status_code=400,
            response=response,
            status_message='The result from trying to create an contact was unexpected.',
        )

    new_contact_id = created
    if return_obj:
        # Fall back to False when the freshly created contact cannot be loaded.
        data = load_contact_obj(contact_id=new_contact_id, inc_address=inc_address) or False
    else:
        new_contact_id_random = get_id_random(record_id=new_contact_id, table_name='contact')
        data = {
            'contact_id': new_contact_id,
            'contact_id_random': new_contact_id_random,
        }
    return mk_resp(data=data, response=response, status_message='The contact was created.')
# ### END ### API Contact ### post_contact_obj_new_v4() ###
# ### BEGIN ### API Contact ### patch_contact_obj_exist_v4() ###
# Updated 2021-08-25
@router.patch('/{contact_id}/exist_v4', response_model=Resp_Body_Base)
async def patch_contact_obj_exist_v4(
    contact_obj: Contact_Base,
    # contact_id comes from the URL path, so it must be declared with Path(...);
    # a Query(...) declaration on a path parameter is not honoured by FastAPI.
    contact_id: str = Path(..., min_length=11, max_length=22),
    for_type: str = None,
    for_id: str = None,
    create_sub_obj: bool = False,
    fail_any: bool = True, # Fail if any thing goes wrong for sub objects
    x_account_id: Optional[str] = Header(..., ),
    return_obj: Optional[bool] = True,
    inc_address: bool = False,
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
    exclude_none: Optional[bool] = True,
    response: Response = Response,
):
    """Update an existing contact through ``create_update_contact_obj_v4``.

    The random contact ID from the path is resolved with
    ``redis_lookup_id_random``; a falsy lookup yields a 404 response. The
    resolved ID is then passed to the helper, and a falsy helper result
    yields a 400 response.

    Args:
        contact_obj: Contact data from the request body.
        contact_id: Random (public) contact ID taken from the URL path.
        for_type, for_id, create_sub_obj, fail_any: Passed straight through
            to ``create_update_contact_obj_v4``.
        x_account_id: Passed to the helper as ``account_id``.
        return_obj: When true the response data is the result of
            ``load_contact_obj`` (or ``False`` if that result is falsy);
            otherwise it is a dict with ``contact_id`` and
            ``contact_id_random``.
        inc_address: Forwarded to ``load_contact_obj``.
        by_alias, exclude_unset, exclude_none: Accepted but not read in
            this function.

    Returns:
        The value produced by ``mk_resp``.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    contact_db_id = redis_lookup_id_random(record_id_random=contact_id, table_name='contact')
    if not contact_db_id:
        # Pass the response object like every other error return in this
        # module does when it supplies a status code.
        return mk_resp(data=None, status_code=404, response=response)

    update_result = create_update_contact_obj_v4(
        contact_dict_obj=contact_obj,
        contact_id=contact_db_id,
        account_id=x_account_id,
        for_type=for_type,
        for_id=for_id,
        create_sub_obj=create_sub_obj,
        fail_any=fail_any,
        return_outline=False,
    )
    if not update_result:
        return mk_resp(
            data=False,
            status_code=400,
            response=response,
            status_message='The contact was not updated. Check the field names and data types.',
        )

    # update_result is known to be truthy from here on, so no further
    # "unexpected result" branch is needed.
    if return_obj:
        # Fall back to False when the updated contact cannot be loaded.
        data = load_contact_obj(contact_id=contact_db_id, inc_address=inc_address) or False
    else:
        data = {
            'contact_id': contact_db_id,
            'contact_id_random': get_id_random(record_id=contact_db_id, table_name='contact'),
        }
    return mk_resp(data=data, response=response, status_message='The contact was updated.')
# ### END ### API Contact ### patch_contact_obj_exist_v4() ###
# ### BEGIN ### API Contact ### patch_contact_json() ###
@router.patch('/{contact_id}/json', response_model=Resp_Body_Base)
async def patch_contact_json(
    contact_obj: Contact_Base,
    # contact_id comes from the URL path, so it must be declared with Path(...);
    # a Query(...) declaration on a path parameter is not honoured by FastAPI.
    contact_id: str = Path(..., min_length=1, max_length=22),
    create_sub_obj: bool = False,
    x_account_id: Optional[str] = Header(..., ),
    return_obj: Optional[bool] = True,
    by_alias: Optional[bool] = True,
    include: Optional[list] = [],
    exclude: Optional[list] = [],
    exclude_unset: Optional[bool] = True,
    exclude_none: Optional[bool] = True,
    response: Response = Response,
):
    """Update a contact with ``update_contact_obj`` and optionally return it.

    The random contact ID from the path is resolved with
    ``redis_lookup_id_random``; a falsy lookup yields a 404 response. A
    falsy result from ``update_contact_obj`` yields a 400 response.

    Args:
        contact_obj: Contact data from the request body.
        contact_id: Random (public) contact ID taken from the URL path.
        create_sub_obj: Forwarded to ``update_contact_obj``.
        return_obj: When true the contact is reloaded and its ``.dict()``
            dump is returned; otherwise the update result is returned.
        by_alias, exclude_unset: Forwarded to the ``.dict()`` dump.
        x_account_id, include, exclude, exclude_none: Accepted but not read
            in this function. The ``[]`` defaults are kept unchanged to
            preserve the endpoint's interface; they are never mutated here.

    Returns:
        The value produced by ``mk_resp``.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    contact_db_id = redis_lookup_id_random(record_id_random=contact_id, table_name='contact')
    if not contact_db_id:
        # Pass the response object like every other error return in this
        # module does when it supplies a status code.
        return mk_resp(data=None, status_code=404, response=response)

    update_result = update_contact_obj(
        contact_id=contact_db_id,
        contact_obj_up=contact_obj,
        create_sub_obj=create_sub_obj,
    )
    if not update_result:
        return mk_resp(data=False, status_code=400, response=response)  # Bad Request

    log.debug(update_result)
    if not return_obj:
        return mk_resp(data=update_result)

    loaded_contact = load_contact_obj(contact_id=contact_db_id)
    if not loaded_contact:
        # The sibling v4 endpoints report data=False when the reload comes
        # back falsy; do the same instead of failing on ``.dict()``.
        return mk_resp(data=False)
    return mk_resp(data=loaded_contact.dict(by_alias=by_alias, exclude_unset=exclude_unset))
# ### END ### API Contact ### patch_contact_json() ###
@router.get('/list', response_model=Resp_Body_Base)
async def get_contact_obj_li(
    for_obj_type: str = Query(None, min_length=2, max_length=50),
    for_obj_id: str = Query(None, min_length=1, max_length=22),
    group: str = Query(None, min_length=2, max_length=50),
    x_account_id: str = Header(...),
    by_alias: bool = True,
    exclude_unset: bool = True,
    response: Response = Response,
):
    """List contacts, with a dedicated SQL path for ``event_exhibit`` parents.

    When ``for_obj_type`` is ``'event_exhibit'`` and ``for_obj_id`` is
    truthy, the random ID is resolved with ``redis_lookup_id_random`` and
    the ``contact`` table is queried by ``for_type`` / ``for_id``, with an
    extra ``group`` filter when ``group`` is truthy. Each row is passed
    through ``Contact_Base`` and dumped with the caller's ``by_alias`` /
    ``exclude_unset``. No rows yields a 404 response.

    Every other request is delegated to ``get_obj_li_template``.

    NOTE(review): the template path always passes ``by_alias=True`` and
    ``exclude_unset=True`` regardless of the query parameters, unlike the
    ``event_exhibit`` path. Confirm whether that is intentional.

    Returns:
        The value produced by ``mk_resp`` or by ``get_obj_li_template``.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    if not (for_obj_type == 'event_exhibit' and for_obj_id):
        return get_obj_li_template(
            obj_type='contact',
            for_obj_type=for_obj_type,
            for_obj_id=for_obj_id,
            by_alias=True,
            exclude_unset=True,
        )

    #base_name = obj_type_li[obj_type]['base_name']
    query_params = {
        'for_obj_type': for_obj_type,
        'for_obj_id': redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type),
        'for_obj_id_random': for_obj_id,
        'group': group,
    }

    # Both query variants share everything up to the WHERE clause; the group
    # filter is appended only when a group was requested.
    sql = (
        "\n"
        "SELECT *\n"
        "FROM `contact` AS contact\n"
        "WHERE contact.for_type = :for_obj_type AND contact.for_id = :for_obj_id\n"
    )
    if group:
        sql += "AND contact.group = :group\n"

    rows = sql_select(data=query_params, sql=sql, as_list=True)
    if not rows:
        log.debug(rows)
        return mk_resp(data=False, status_code=404, response=response)

    return mk_resp(
        data=[
            Contact_Base(**row).dict(by_alias=by_alias, exclude_unset=exclude_unset)
            for row in rows
        ]
    )
@router.get('/{obj_id}', response_model=Resp_Body_Base)
async def get_contact_obj(
    # obj_id comes from the URL path, so it must be declared with Path(...);
    # a Query(...) declaration on a path parameter is not honoured by FastAPI.
    obj_id: str = Path(..., min_length=1, max_length=22),
    x_account_id: str = Header(...),
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
    response: Response = Response,
):
    """Fetch a single contact via the generic GET template.

    NOTE(review): ``by_alias`` and ``exclude_unset`` are accepted but never
    read; the template is always called with both set to ``True``.
    ``x_account_id`` and ``response`` are not read in the body either.

    Args:
        obj_id: Contact ID taken from the URL path, forwarded unchanged to
            ``get_obj_template``.

    Returns:
        Whatever ``get_obj_template`` returns.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    return get_obj_template(
        obj_type='contact',
        obj_id=obj_id,
        by_alias=True,
        exclude_unset=True,
    )
@router.delete('/{obj_id}', response_model=Resp_Body_Base)
async def delete_contact_obj(
    # obj_id comes from the URL path, so it must be declared with Path(...);
    # a Query(...) declaration on a path parameter is not honoured by FastAPI.
    obj_id: str = Path(..., min_length=1, max_length=22),
    x_account_id: str = Header(...),
    response: Response = Response,
):
    """Delete a contact via the generic DELETE template.

    ``x_account_id`` and ``response`` are not read in the body.

    Args:
        obj_id: Contact ID taken from the URL path, forwarded unchanged to
            ``delete_obj_template``.

    Returns:
        Whatever ``delete_obj_template`` returns.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    return delete_obj_template(
        obj_type='contact',
        obj_id=obj_id,
    )