import datetime from fastapi import APIRouter, Body, Depends, Header, HTTPException, Query, status from pydantic import BaseModel, EmailStr, Field from typing import Dict, List, Optional, Set, Union from app.lib_general import log, logging from app.config import settings from app.db_sql import * from .api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template from app.methods.contact_methods import load_contact_obj, update_contact_obj from app.models.contact_models import Contact_Base from app.models.response_models import * router = APIRouter() @router.post('', response_model=Resp_Body_Base) async def post_contact_obj( obj: Contact_Base, x_account_id: str = Header(...), return_obj: Optional[bool] = True, by_alias: Optional[bool] = True, exclude_unset: Optional[bool] = True, ): log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) obj_type = 'contact' obj_data_dict = obj.dict(by_alias=False, exclude_unset=True) result = post_obj_template( obj_type=obj_type, data=obj_data_dict, return_obj=True, by_alias=True, exclude_unset=True, ) return result @router.patch('/{obj_id}', response_model=Resp_Body_Base) async def patch_contact_obj( obj: Contact_Base, obj_id: str = Query(..., min_length=1, max_length=22), x_account_id: Optional[str] = Header(..., ), return_obj: Optional[bool] = True, by_alias: Optional[bool] = True, exclude_unset: Optional[bool] = True, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) obj_type = 'contact' obj_data_dict = obj.dict(by_alias=False, exclude_unset=True) obj_data_dict['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type) obj_data_dict['id_random'] = obj_id result = patch_obj_template( obj_type=obj_type, data=obj_data_dict, obj_id=obj_id, return_obj=True, by_alias=True, exclude_unset=True, ) return result # ### BEGIN ### API Contact ### patch_contact_json() ### @router.patch('/{contact_id}/json', response_model=Resp_Body_Base) async def patch_contact_json( contact_obj: Contact_Base, contact_id: str = Query(..., min_length=1, max_length=22), create_missing_obj: bool = False, x_account_id: Optional[str] = Header(..., ), return_obj: Optional[bool] = True, by_alias: Optional[bool] = True, include: Optional[list] = [], exclude: Optional[list] = [], exclude_unset: Optional[bool] = True, exclude_none: Optional[bool] = True, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if contact_id := redis_lookup_id_random(record_id_random=contact_id, table_name='contact'): pass else: return mk_resp(data=None, status_code=404) if contact_obj_up_result := update_contact_obj( contact_id=contact_id, contact_obj_up=contact_obj, create_missing_obj=create_missing_obj, ): log.debug(contact_obj_up_result) if return_obj: contact_obj = load_contact_obj(contact_id=contact_id) contact_dict = contact_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) return mk_resp(data=contact_dict) else: return mk_resp(data=contact_obj_up_result) else: return mk_resp(data=False, status_code=400) # Bad Request # ### END ### API Contact ### patch_contact_json() ### @router.get('/list', response_model=Resp_Body_Base) async def get_contact_obj_li( for_obj_type: str = Query(None, min_length=2, max_length=50), for_obj_id: str = Query(None, min_length=1, max_length=22), group: str = Query(None, min_length=2, max_length=50), x_account_id: str = Header(...), by_alias: bool = True, exclude_unset: bool = True, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) obj_type = 'contact' if for_obj_type == 'event_exhibit' and for_obj_id: #base_name = obj_type_li[obj_type]['base_name'] base_name = Contact_Base for_obj_id_random = for_obj_id for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id_random, table_name=for_obj_type) data = {} data['for_obj_type'] = for_obj_type data['for_obj_id'] = for_obj_id data['for_obj_id_random'] = for_obj_id_random data['group'] = group if group: sql = """ SELECT * FROM `contact` AS contact WHERE contact.for_type = :for_obj_type AND contact.for_id = :for_obj_id AND contact.group = :group """ else: sql = """ SELECT * FROM `contact` AS contact WHERE contact.for_type = :for_obj_type AND contact.for_id = :for_obj_id """ if sql_result := sql_select(data=data, sql=sql, as_list=True): resp_data_li = [] for record in sql_result: resp_data = base_name(**record).dict(by_alias=by_alias, exclude_unset=exclude_unset) resp_data_li.append(resp_data) return mk_resp(data=resp_data_li) else: log.debug(sql_result) return mk_resp(data=False, status_code=404) result = get_obj_li_template( obj_type=obj_type, for_obj_type=for_obj_type, for_obj_id=for_obj_id, by_alias=True, exclude_unset=True, ) return result @router.get('/{obj_id}', response_model=Resp_Body_Base) async def get_contact_obj( obj_id: str = Query(..., min_length=1, max_length=22), x_account_id: str = Header(...), by_alias: Optional[bool] = True, exclude_unset: Optional[bool] = True, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) obj_type = 'contact' result = get_obj_template( obj_type=obj_type, obj_id=obj_id, by_alias=True, exclude_unset=True, ) return result @router.delete('/{obj_id}', response_model=Resp_Body_Base) async def delete_contact_obj( obj_id: str = Query(..., min_length=1, max_length=22), x_account_id: str = Header(...), ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) obj_type = 'contact' result = delete_obj_template( obj_type=obj_type, obj_id=obj_id, ) return result