"""HTTP routes for ``contact`` objects (create, patch, list, get, delete).

Most handlers delegate to the generic CRUD templates from ``api_crud``;
``patch_contact_json`` and the ``event_exhibit`` listing have their own logic.
"""
import datetime
#from datetime import datetime, time, timedelta
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Path, Query, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union

from app.lib_general import log, logging
#from ..log import *
from app.config import settings
from app.db_sql import *
from .api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template
from app.methods.contact_methods import load_contact_obj, update_contact_obj
from app.models.contact_models import Contact_Base
from app.models.response_models import *

router = APIRouter()


@router.post('', response_model=Resp_Body_Base)
async def post_contact_obj(
        obj: Contact_Base,
        x_account_id: str = Header(...),
        return_obj: Optional[bool] = True,
        by_alias: Optional[bool] = True,
        exclude_unset: Optional[bool] = True,
    ):
    """Create a contact from the request body via ``post_obj_template``.

    ``return_obj``, ``by_alias`` and ``exclude_unset`` are forwarded to the
    template (they were previously accepted but ignored). ``x_account_id`` is
    a required header that this handler does not otherwise read.
    """
    # NOTE(review): this changes the level of the shared logger on every
    # request, not just for this handler -- confirm that is intended.
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, CRITICAL
    log.debug(locals())

    obj_type = 'contact'
    # Data handed to the template always uses field names, never aliases.
    obj_data_dict = obj.dict(by_alias=False, exclude_unset=True)

    return post_obj_template(
        obj_type=obj_type,
        data=obj_data_dict,
        return_obj=return_obj,
        by_alias=by_alias,
        exclude_unset=exclude_unset,
    )


@router.patch('/{obj_id}', response_model=Resp_Body_Base)
async def patch_contact_obj(
        obj: Contact_Base,
        obj_id: str = Path(..., min_length=1, max_length=22),
        x_account_id: Optional[str] = Header(..., ),
        return_obj: Optional[bool] = True,
        by_alias: Optional[bool] = True,
        exclude_unset: Optional[bool] = True,
    ):
    """Patch the contact identified by the random id ``obj_id``.

    Responds with 404 when ``obj_id`` cannot be resolved to an internal id
    (same handling as ``patch_contact_json``); otherwise delegates to
    ``patch_obj_template`` and returns its result.
    """
    # NOTE(review): changes the level of the shared logger on every request.
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, CRITICAL
    log.debug(locals())

    obj_type = 'contact'
    internal_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type)
    if not internal_id:
        # Without this guard the patch data would carry ``id=None``.
        return mk_resp(data=None, status_code=404)

    obj_data_dict = obj.dict(by_alias=False, exclude_unset=True)
    obj_data_dict['id'] = internal_id
    obj_data_dict['id_random'] = obj_id

    return patch_obj_template(
        obj_type=obj_type,
        data=obj_data_dict,
        obj_id=obj_id,
        return_obj=return_obj,
        by_alias=by_alias,
        exclude_unset=exclude_unset,
    )


# ### BEGIN ### API Contact ### patch_contact_json() ###
@router.patch('/{contact_id}/json', response_model=Resp_Body_Base)
async def patch_contact_json(
        contact_obj: Contact_Base,
        contact_id: str = Path(..., min_length=1, max_length=22),
        create_missing_obj: bool = False,
        x_account_id: Optional[str] = Header(..., ),
        return_obj: Optional[bool] = True,
        by_alias: Optional[bool] = True,
        include: Optional[list] = [],
        exclude: Optional[list] = [],
        exclude_unset: Optional[bool] = True,
        exclude_none: Optional[bool] = True,
    ):
    """Update a contact through ``update_contact_obj``.

    Responses:
      * 404 when ``contact_id`` cannot be resolved to an internal id.
      * 400 when ``update_contact_obj`` returns a falsy result.
      * Otherwise the reloaded contact (``return_obj`` truthy) or the raw
        update result.

    NOTE(review): ``include``, ``exclude`` and ``exclude_none`` are accepted
    but never used. The signature is left untouched on purpose: FastAPI reads
    bare ``list`` parameters from the request body, so changing them would
    alter the body shape clients must send.
    """
    # NOTE(review): changes the level of the shared logger on every request.
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, CRITICAL
    log.debug(locals())

    internal_id = redis_lookup_id_random(record_id_random=contact_id, table_name='contact')
    if not internal_id:
        return mk_resp(data=None, status_code=404)

    update_result = update_contact_obj(
        contact_id=internal_id,
        contact_obj_up=contact_obj,
        create_missing_obj=create_missing_obj,
    )
    if not update_result:
        return mk_resp(data=False, status_code=400)  # Bad Request

    log.debug(update_result)
    if not return_obj:
        return mk_resp(data=update_result)

    # Reload so the response reflects the stored state after the update.
    stored_contact = load_contact_obj(contact_id=internal_id)
    contact_dict = stored_contact.dict(by_alias=by_alias, exclude_unset=exclude_unset)
    return mk_resp(data=contact_dict)
# ### END ### API Contact ### patch_contact_json() ###


@router.get('/list', response_model=Resp_Body_Base)
async def get_contact_obj_li(
        for_obj_type: Optional[str] = Query(None, min_length=2, max_length=50),
        for_obj_id: Optional[str] = Query(None, min_length=1, max_length=22),
        group: Optional[str] = Query(None, min_length=2, max_length=50),
        x_account_id: str = Header(...),
        by_alias: bool = True,
        exclude_unset: bool = True,
    ):
    """List contacts, optionally scoped to a parent object.

    When ``for_obj_type`` is ``'event_exhibit'`` and ``for_obj_id`` is given,
    the ``contact`` table is queried directly (optionally filtered by
    ``group``) and 404 is returned if no rows match. Every other request is
    delegated to ``get_obj_li_template``; ``group`` is not forwarded there.
    """
    # NOTE(review): changes the level of the shared logger on every request.
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, CRITICAL
    log.debug(locals())

    obj_type = 'contact'

    if for_obj_type == 'event_exhibit' and for_obj_id:
        for_obj_id_random = for_obj_id
        for_obj_id = redis_lookup_id_random(
            record_id_random=for_obj_id_random,
            table_name=for_obj_type,
        )

        data = {
            'for_obj_type': for_obj_type,
            'for_obj_id': for_obj_id,
            'for_obj_id_random': for_obj_id_random,
            'group': group,
        }

        # Static SQL with named bind parameters only; the group filter is
        # appended when a group was requested.
        sql = """
            SELECT *
            FROM `contact` AS contact
            WHERE contact.for_type = :for_obj_type
                AND contact.for_id = :for_obj_id
        """
        if group:
            sql += """
                AND contact.group = :group
            """

        sql_result = sql_select(data=data, sql=sql, as_list=True)
        if not sql_result:
            log.debug(sql_result)
            return mk_resp(data=False, status_code=404)

        resp_data_li = [
            Contact_Base(**record).dict(by_alias=by_alias, exclude_unset=exclude_unset)
            for record in sql_result
        ]
        return mk_resp(data=resp_data_li)

    return get_obj_li_template(
        obj_type=obj_type,
        for_obj_type=for_obj_type,
        for_obj_id=for_obj_id,
        by_alias=by_alias,
        exclude_unset=exclude_unset,
    )


@router.get('/{obj_id}', response_model=Resp_Body_Base)
async def get_contact_obj(
        obj_id: str = Path(..., min_length=1, max_length=22),
        x_account_id: str = Header(...),
        by_alias: Optional[bool] = True,
        exclude_unset: Optional[bool] = True,
    ):
    """Return the contact identified by ``obj_id`` via ``get_obj_template``.

    ``by_alias`` and ``exclude_unset`` are forwarded to the template.
    """
    # NOTE(review): changes the level of the shared logger on every request.
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, CRITICAL
    log.debug(locals())

    return get_obj_template(
        obj_type='contact',
        obj_id=obj_id,
        by_alias=by_alias,
        exclude_unset=exclude_unset,
    )


@router.delete('/{obj_id}', response_model=Resp_Body_Base)
async def delete_contact_obj(
        obj_id: str = Path(..., min_length=1, max_length=22),
        x_account_id: str = Header(...),
    ):
    """Delete the contact identified by ``obj_id`` via ``delete_obj_template``."""
    # NOTE(review): changes the level of the shared logger on every request.
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, CRITICAL
    log.debug(locals())

    return delete_obj_template(
        obj_type='contact',
        obj_id=obj_id,
    )