Files
OSIT-AE-API-FastAPI/app/routers/api_crud_v3.py

286 lines
12 KiB
Python

from fastapi import APIRouter, Depends, Header, HTTPException, Path, Query, Request, Response, status
from typing import Dict, List, Optional, Set, Union
import json
import urllib
from app.lib_general import log, logging, Common_Route_Params, common_route_params
from app.models.response_models import *
from app.ae_obj_types_def import obj_type_kv_li
from app.db_sql import redis_lookup_id_random, sql_select, sql_insert, sql_update, get_id_random
router = APIRouter()
@router.get("/health", response_model=Resp_Body_Base)
async def health_check():
"""
Health check endpoint for V3 API.
"""
log.setLevel(logging.INFO)
log.info("V3 Health Check Endpoint Hit")
return mk_resp(data={"status": "V3 API is healthy!"})
@router.get('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def get_obj(
obj_type_l1: str = Path(min_length=2, max_length=50),
obj_id: str = Path(min_length=11, max_length=22),
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Get a single top-level object by its random ID.
Examples:
- /v3/crud/journal/{journal_id}
- /v3/crud/account/{account_id}
"""
log.setLevel(logging.WARNING)
log.debug(locals())
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
base_name = obj_cfg.get('mdl_default', obj_cfg.get('mdl'))
if not table_name or not base_name:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")
record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
if not record_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Object with ID '{obj_id}' not found.")
if sql_result := sql_select(table_name=table_name, record_id=record_id):
resp_data = base_name(**sql_result).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
return mk_resp(data=resp_data, response=commons.response)
else:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Object with ID '{obj_id}' not found in database.")
@router.get('/{obj_type_l1}/', response_model=Resp_Body_Base)
async def get_obj_li(
obj_type_l1: str,
for_obj_type: Optional[str] = None,
for_obj_id: Optional[str] = None,
hidden: str = 'not_hidden',
order_by_li: Optional[str] = None,
jp: Optional[Union[str, None]] = None,
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Get a list of top-level objects.
Examples:
- /v3/crud/journal/
- /v3/crud/journal/?for_obj_type=account&for_obj_id={account_id_random}
- /v3/crud/journal/?jp={"qry":[{"type":"AND","field":"for_type","operator":"=","value":"user"},{"type":"AND","field":"for_id","operator":"=","value":<user_id>}]}
"""
log.setLevel(logging.WARNING)
log.debug(locals())
# This should be a list of SQL WHERE parts defined in JSON.
qry_dict_li = None
fulltext_qry_dict_obj = None
and_qry_dict_obj = None
and_like_dict_obj = None
or_like_dict_obj = None
and_in_dict_li_obj = None
jp_obj = None
if jp:
try:
jp_obj = json.loads(urllib.parse.unquote(jp))
except Exception as e:
log.warning(e)
return mk_resp(data=False, status_code=400, response=commons.response, status_message='The JSON string was not formatted correctly.')
if jp_obj.get('qry'):
qry_dict_li = jp_obj['qry']
if jp_obj.get('ft_qry'):
fulltext_qry_dict_obj = jp_obj['ft_qry']
if jp_obj.get('and_qry'):
and_qry_dict_obj = jp_obj['and_qry']
if jp_obj.get('and_like'):
and_like_dict_obj = jp_obj['and_like']
if jp_obj.get('or_like'):
or_like_dict_obj = jp_obj['or_like']
if jp_obj.get('and_in_li'):
and_in_dict_li_obj = jp_obj['and_in_li']
if order_by_li:
order_by_li = json.loads(order_by_li)
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
base_name = obj_cfg.get('mdl_default', obj_cfg.get('mdl'))
if not table_name or not base_name:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")
if for_obj_type and for_obj_id:
# Resolve random ID to integer ID
resolved_for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
if not resolved_for_obj_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Parent object with ID '{for_obj_id}' not found.")
field_name = f'{for_obj_type}_id' # Assuming convention like 'account_id' for for_obj_type='account'
sql_result = sql_select(
table_name=table_name,
field_name=field_name,
field_value=resolved_for_obj_id,
enabled=commons.enabled,
hidden=hidden,
qry_dict_li=qry_dict_li,
fulltext_qry_dict=fulltext_qry_dict_obj,
and_qry_dict=and_qry_dict_obj,
and_like_dict=and_like_dict_obj,
or_like_dict=or_like_dict_obj,
and_in_dict_li=and_in_dict_li_obj,
order_by_li=order_by_li,
limit=commons.limit,
offset=commons.offset,
as_list=True,
)
else:
sql_result = sql_select(
table_name=table_name,
enabled=commons.enabled,
hidden=hidden,
qry_dict_li=qry_dict_li,
fulltext_qry_dict=fulltext_qry_dict_obj,
and_qry_dict=and_qry_dict_obj,
and_like_dict=and_like_dict_obj,
or_like_dict=or_like_dict_obj,
and_in_dict_li=and_in_dict_li_obj,
order_by_li=order_by_li,
limit=commons.limit,
offset=commons.offset,
as_list=True,
)
if sql_result:
resp_data_li = []
for record in sql_result:
resp_data = base_name(**record).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
resp_data_li.append(resp_data)
return mk_resp(data=resp_data_li, response=commons.response)
else:
return mk_resp(data=[], status_code=200, response=commons.response) # Return empty list on no results
@router.post('/{obj_type_l1}/', response_model=Resp_Body_Base)
async def post_obj(
request: Request,
obj_type_l1: str = Path(min_length=2, max_length=50),
return_obj: Optional[bool] = True,
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Create a new top-level object.
Examples:
- POST /v3/crud/journal/ (with Journal_Base in body)
"""
log.setLevel(logging.WARNING)
log.debug(locals())
obj_data = await request.json()
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name_insert = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
if not table_name_insert or not input_model or not table_name_select or not output_model:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")
# Validate incoming data with the appropriate Pydantic model
try:
validated_obj = input_model(**obj_data)
except Exception as e:
log.warning(f"Validation error for {obj_name}: {e}")
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Validation error: {e}")
# Convert to dict, excluding unset fields, for database insertion
data_to_insert = validated_obj.dict(exclude_unset=True)
if sql_insert_result := sql_insert(data=data_to_insert, table_name=table_name_insert):
new_obj_id = sql_insert_result
new_obj_id_random = get_id_random(record_id=new_obj_id, table_name=obj_name)
if return_obj:
if sql_select_result := sql_select(table_name=table_name_select, record_id=new_obj_id):
resp_data = output_model(**sql_select_result).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
return mk_resp(data=resp_data, response=commons.response)
else:
return mk_resp(data={"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}, status_code=404, response=commons.response, status_message="Object created but could not be retrieved.")
else:
return mk_resp(data={"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}, response=commons.response)
else:
return mk_resp(data=False, status_code=400, response=commons.response, status_message="Failed to create object in database.")
@router.patch('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def patch_obj(
request: Request,
obj_type_l1: str = Path(min_length=2, max_length=50),
obj_id: str = Path(min_length=11, max_length=22),
return_obj: Optional[bool] = True,
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Update a top-level object.
Examples:
- PATCH /v3/crud/journal/{journal_id} (with Journal_Base fields in body)
"""
log.setLevel(logging.WARNING)
log.debug(locals())
obj_data = await request.json()
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name_update = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
if not table_name_update or not input_model or not table_name_select or not output_model:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")
record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
if not record_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Object with ID '{obj_id}' not found.")
# Validate incoming data with the appropriate Pydantic model.
# For PATCH, we don't want to fail on missing fields, so we don't validate like in POST.
# The sql_update function will only update the fields provided in the dict.
data_to_update = obj_data
if sql_update_result := sql_update(data=data_to_update, table_name=table_name_update, record_id=record_id):
if return_obj:
if sql_select_result := sql_select(table_name=table_name_select, record_id=record_id):
resp_data = output_model(**sql_select_result).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
return mk_resp(data=resp_data, response=commons.response)
else:
return mk_resp(data=True, status_code=404, response=commons.response, status_message="Object updated but could not be retrieved.")
else:
return mk_resp(data=True, response=commons.response, status_message="Object updated successfully.")
else:
return mk_resp(data=False, status_code=400, response=commons.response, status_message="Failed to update object in database. It may not have been found, or the data was invalid.")