106 lines
3.6 KiB
Python
106 lines
3.6 KiB
Python
import datetime
|
|
#from datetime import datetime, time, timedelta
|
|
from fastapi import APIRouter, Depends, Header, HTTPException, Query, status
|
|
from pydantic import BaseModel, EmailStr, Field
|
|
from typing import Dict, List, Optional, Set, Union
|
|
|
|
from ..lib_general import *
|
|
from ..log import *
|
|
from app.config import settings
|
|
from app.db import *
|
|
#from .journal_models import *
|
|
#from .user_models import *
|
|
from .user_model import *
|
|
from .response_model import *
|
|
|
|
|
|
obj_l1_type_li = {}
|
|
obj_l1_type_li['account'] = 'v_account'
|
|
obj_l1_type_li['address'] = 'v_address'
|
|
obj_l1_type_li['archive'] = 'v_archive'
|
|
obj_l1_type_li['contact'] = 'v_contact'
|
|
obj_l1_type_li['event'] = 'v_event'
|
|
obj_l1_type_li['order'] = 'v_order'
|
|
obj_l1_type_li['organization'] = 'v_organization'
|
|
obj_l1_type_li['person'] = 'v_person'
|
|
obj_l1_type_li['site'] = 'v_site'
|
|
obj_l1_type_li['user'] = 'v_user'
|
|
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
# Working on the basic API CRUD - STI 2021-03-05
|
|
|
|
#@router.get('/{object_l1}/list')
|
|
@router.get('/{object_l1}/{object_id}/list')
|
|
@router.get('/{object_l1}/{object_l2}/{object_id}/list')
|
|
@router.get('/{object_l1}/{object_l2}/{object_id}/{object_l3}/list')
|
|
async def get_obj_li(object_l1: str=None, object_l2: str=None, object_l3: str=None, object_id: str=None, x_account_id: str = Header(...)):
|
|
response_data = {}
|
|
response_data['object_l1'] = object_l1
|
|
response_data['object_l2'] = object_l2
|
|
response_data['object_l3'] = object_l3
|
|
response_data['object_id'] = object_id
|
|
response_data['list'] = 'li'
|
|
|
|
sql_result = sql_select(table_name='user', record_id=1)
|
|
|
|
response_data['sql_result'] = sql_result
|
|
|
|
return response_data
|
|
|
|
|
|
#@router.get('/{obj_type_l1}/{object_id_int}')
|
|
@router.get('/{obj_type_l1}/{object_id}')
|
|
@router.get('/{obj_type_l1}/{object_l2}/{object_id}')
|
|
@router.get('/{obj_type_l1}/{object_l2}/{object_l3}/{object_id}')
|
|
async def get_obj(obj_type_l1: str=None, object_l2: str=None, object_l3: str=None, object_id: str=None, x_account_id: str = Header(...),
|
|
qry_str: Optional[str] = Query(None, max_length=50),
|
|
qry_int: Optional[int] = None,
|
|
by_alias: Optional[bool] = True,
|
|
exclude_unset: Optional[bool] = True,
|
|
):
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
debug_data = {}
|
|
debug_data['obj_type_l1'] = obj_type_l1
|
|
debug_data['object_l2'] = object_l2
|
|
debug_data['object_l3'] = object_l3
|
|
debug_data['object_id'] = object_id
|
|
#debug_data['object_id_int'] = object_id_int
|
|
#debug_data['object_id_rand'] = object_id_rand
|
|
|
|
log.debug(debug_data)
|
|
|
|
if obj_type_l1 in obj_l1_type_li:
|
|
object_type = obj_l1_type_li[obj_type_l1]
|
|
else:
|
|
return mk_resp(data=False, status_code=400)
|
|
|
|
# NOTE: Add a check for the object ID... assuming it is a random ID string for now.
|
|
sql_result = sql_select(table_name=object_type, record_id_random=object_id)
|
|
log.debug(sql_result)
|
|
resp_data = User_Base(**sql_result).dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
|
return mk_resp(data=resp_data)#, details=debug_data)
|
|
|
|
data = {}
|
|
data['id_random'] = 1
|
|
|
|
sql_select_str = f"""
|
|
SELECT `user`.id AS 'user_id', `user`.id_random AS 'user_id_random', username, name, email, super
|
|
FROM `user` AS `user`
|
|
WHERE `user`.id = :id_random;
|
|
"""
|
|
|
|
#sql_result = sql_select(table_name='user', record_id=1)
|
|
sql_result = sql_select(sql=sql_select_str, data=data)
|
|
resp_data = User_Base(**sql_result).dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
|
|
|
#response_data['sql_result'] = sql_result
|
|
|
|
resp = mk_resp(data=resp_data)
|
|
|
|
return resp
|