Files
OSIT-AE-API-FastAPI/app/routers/api_crud_v3.py
2025-12-03 18:44:14 -05:00

674 lines
32 KiB
Python

from fastapi import APIRouter, Depends, Header, HTTPException, Path, Query, Request, Response, status
from typing import Dict, List, Optional, Set, Union
import json
import urllib
from app.lib_general import log, logging, Common_Route_Params, common_route_params
from app.models.response_models import *
from app.ae_obj_types_def import obj_type_kv_li
from app.db_sql import redis_lookup_id_random, sql_select, sql_insert, sql_update, sql_delete, get_id_random
router = APIRouter()
@router.get("/health", response_model=Resp_Body_Base)
async def health_check():
"""
Health check endpoint for V3 API.
"""
log.setLevel(logging.INFO)
log.info("V3 Health Check Endpoint Hit")
return mk_resp(data={"status": "V3 API is healthy!"})
@router.get('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def get_obj(
obj_type_l1: str = Path(min_length=2, max_length=50),
obj_id: str = Path(min_length=11, max_length=22),
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Get a single top-level object by its random ID.
Examples:
- /v3/crud/journal/{journal_id}
- /v3/crud/account/{account_id}
"""
log.setLevel(logging.WARNING)
log.debug(locals())
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
base_name = obj_cfg.get('mdl_default', obj_cfg.get('mdl'))
if not table_name or not base_name:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")
record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
if not record_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Object with ID '{obj_id}' not found.")
if sql_result := sql_select(table_name=table_name, record_id=record_id):
resp_data = base_name(**sql_result).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
return mk_resp(data=resp_data, response=commons.response)
else:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Object with ID '{obj_id}' not found in database.")
@router.get('/{obj_type_l1}/', response_model=Resp_Body_Base)
async def get_obj_li(
obj_type_l1: str,
for_obj_type: Optional[str] = None,
for_obj_id: Optional[str] = None,
hidden: str = 'not_hidden',
order_by_li: Optional[str] = None,
jp: Optional[Union[str, None]] = None,
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Get a list of top-level objects.
Examples:
- /v3/crud/journal/
- /v3/crud/journal/?for_obj_type=account&for_obj_id={account_id_random}
- /v3/crud/journal/?jp={"qry":[{"type":"AND","field":"for_type","operator":"=","value":"user"},{"type":"AND","field":"for_id","operator":"=","value":<user_id>}]}
"""
log.setLevel(logging.WARNING)
log.debug(locals())
# This should be a list of SQL WHERE parts defined in JSON.
qry_dict_li = None
fulltext_qry_dict_obj = None
and_qry_dict_obj = None
and_like_dict_obj = None
or_like_dict_obj = None
and_in_dict_li_obj = None
jp_obj = None
if jp:
try:
jp_obj = json.loads(urllib.parse.unquote(jp))
except Exception as e:
log.warning(e)
return mk_resp(data=False, status_code=400, response=commons.response, status_message='The JSON string was not formatted correctly.')
if jp_obj.get('qry'):
qry_dict_li = jp_obj['qry']
if jp_obj.get('ft_qry'):
fulltext_qry_dict_obj = jp_obj['ft_qry']
if jp_obj.get('and_qry'):
and_qry_dict_obj = jp_obj['and_qry']
if jp_obj.get('and_like'):
and_like_dict_obj = jp_obj['and_like']
if jp_obj.get('or_like'):
or_like_dict_obj = jp_obj['or_like']
if jp_obj.get('and_in_li'):
and_in_dict_li_obj = jp_obj['and_in_li']
if order_by_li:
order_by_li = json.loads(order_by_li)
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
base_name = obj_cfg.get('mdl_default', obj_cfg.get('mdl'))
if not table_name or not base_name:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")
if for_obj_type and for_obj_id:
# Resolve random ID to integer ID
resolved_for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
if not resolved_for_obj_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Parent object with ID '{for_obj_id}' not found.")
field_name = f'{for_obj_type}_id' # Assuming convention like 'account_id' for for_obj_type='account'
sql_result = sql_select(
table_name=table_name,
field_name=field_name,
field_value=resolved_for_obj_id,
enabled=commons.enabled,
hidden=hidden,
qry_dict_li=qry_dict_li,
fulltext_qry_dict=fulltext_qry_dict_obj,
and_qry_dict=and_qry_dict_obj,
and_like_dict=and_like_dict_obj,
or_like_dict=or_like_dict_obj,
and_in_dict_li=and_in_dict_li_obj,
order_by_li=order_by_li,
limit=commons.limit,
offset=commons.offset,
as_list=True,
)
else:
sql_result = sql_select(
table_name=table_name,
enabled=commons.enabled,
hidden=hidden,
qry_dict_li=qry_dict_li,
fulltext_qry_dict=fulltext_qry_dict_obj,
and_qry_dict=and_qry_dict_obj,
and_like_dict=and_like_dict_obj,
or_like_dict=or_like_dict_obj,
and_in_dict_li=and_in_dict_li_obj,
order_by_li=order_by_li,
limit=commons.limit,
offset=commons.offset,
as_list=True,
)
if sql_result:
resp_data_li = []
for record in sql_result:
resp_data = base_name(**record).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
resp_data_li.append(resp_data)
return mk_resp(data=resp_data_li, response=commons.response)
else:
return mk_resp(data=[], status_code=200, response=commons.response) # Return empty list on no results
@router.post('/{obj_type_l1}/', response_model=Resp_Body_Base)
async def post_obj(
request: Request,
obj_type_l1: str = Path(min_length=2, max_length=50),
return_obj: Optional[bool] = True,
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Create a new top-level object.
Examples:
- POST /v3/crud/journal/ (with Journal_Base in body)
"""
log.setLevel(logging.WARNING)
log.debug(locals())
obj_data = await request.json()
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name_insert = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
if not table_name_insert or not input_model or not table_name_select or not output_model:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")
# Validate incoming data with the appropriate Pydantic model
try:
validated_obj = input_model(**obj_data)
except Exception as e:
log.warning(f"Validation error for {obj_name}: {e}")
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Validation error: {e}")
# Convert to dict, excluding unset fields, for database insertion
data_to_insert = validated_obj.dict(exclude_unset=True)
if sql_insert_result := sql_insert(data=data_to_insert, table_name=table_name_insert):
new_obj_id = sql_insert_result
new_obj_id_random = get_id_random(record_id=new_obj_id, table_name=obj_name)
if return_obj:
if sql_select_result := sql_select(table_name=table_name_select, record_id=new_obj_id):
resp_data = output_model(**sql_select_result).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
return mk_resp(data=resp_data, response=commons.response)
else:
return mk_resp(data={"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}, status_code=404, response=commons.response, status_message="Object created but could not be retrieved.")
else:
return mk_resp(data={"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}, response=commons.response)
else:
return mk_resp(data=False, status_code=400, response=commons.response, status_message="Failed to create object in database.")
@router.patch('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def patch_obj(
request: Request,
obj_type_l1: str = Path(min_length=2, max_length=50),
obj_id: str = Path(min_length=11, max_length=22),
return_obj: Optional[bool] = True,
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Update a top-level object.
Examples:
- PATCH /v3/crud/journal/{journal_id} (with Journal_Base fields in body)
"""
log.setLevel(logging.WARNING)
log.debug(locals())
obj_data = await request.json()
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name_update = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
if not table_name_update or not input_model or not table_name_select or not output_model:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")
record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
if not record_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Object with ID '{obj_id}' not found.")
# Validate incoming data with the appropriate Pydantic model.
# For PATCH, we don't want to fail on missing fields, so we don't validate like in POST.
# The sql_update function will only update the fields provided in the dict.
data_to_update = obj_data
if sql_update_result := sql_update(data=data_to_update, table_name=table_name_update, record_id=record_id):
if return_obj:
if sql_select_result := sql_select(table_name=table_name_select, record_id=record_id):
resp_data = output_model(**sql_select_result).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
return mk_resp(data=resp_data, response=commons.response)
else:
return mk_resp(data=True, status_code=404, response=commons.response, status_message="Object updated but could not be retrieved.")
else:
return mk_resp(data=True, response=commons.response, status_message="Object updated successfully.")
else:
return mk_resp(data=False, status_code=400, response=commons.response, status_message="Failed to update object in database. It may not have been found, or the data was invalid.")
@router.delete('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def delete_obj(
obj_type_l1: str = Path(min_length=2, max_length=50),
obj_id: str = Path(min_length=11, max_length=22),
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Delete a top-level object.
Examples:
- DELETE /v3/crud/journal/{journal_id}
"""
log.setLevel(logging.WARNING)
log.debug(locals())
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name_delete = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
if not table_name_delete:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for object type '{obj_name}' is incomplete (missing table for deletion).")
record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
if not record_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Object with ID '{obj_id}' not found.")
if sql_delete_result := sql_delete(table_name=table_name_delete, record_id=record_id):
return mk_resp(data=True, response=commons.response, status_message=f"Object with ID '{obj_id}' deleted successfully.")
else:
return mk_resp(data=False, status_code=400, response=commons.response, status_message="Failed to delete object in database. It may not have been found.")
@router.get('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/', response_model=Resp_Body_Base)
async def get_child_obj_li(
parent_obj_type: str,
parent_obj_id: str,
child_obj_type: str,
hidden: str = 'not_hidden',
order_by_li: Optional[str] = None,
jp: Optional[Union[str, None]] = None,
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Get a list of child objects belonging to a parent.
Examples:
- /v3/crud/journal/{journal_id}/journal_entry/
"""
log.setLevel(logging.WARNING)
log.debug(locals())
# This function's logic is very similar to get_obj_li,
# but it enforces the parent-child relationship from the URL path.
# We can treat the parent path parameters as if they were for_obj_type and for_obj_id query params.
for_obj_type = parent_obj_type
for_obj_id = parent_obj_id
qry_dict_li = None
fulltext_qry_dict_obj = None
and_qry_dict_obj = None
and_like_dict_obj = None
or_like_dict_obj = None
and_in_dict_li_obj = None
jp_obj = None
if jp:
try:
jp_obj = json.loads(urllib.parse.unquote(jp))
except Exception as e:
log.warning(e)
return mk_resp(data=False, status_code=400, response=commons.response, status_message='The JSON string was not formatted correctly.')
if jp_obj.get('qry'):
qry_dict_li = jp_obj['qry']
if jp_obj.get('ft_qry'):
fulltext_qry_dict_obj = jp_obj['ft_qry']
if jp_obj.get('and_qry'):
and_qry_dict_obj = jp_obj['and_qry']
if jp_obj.get('and_like'):
and_like_dict_obj = jp_obj['and_like']
if jp_obj.get('or_like'):
or_like_dict_obj = jp_obj['or_like']
if jp_obj.get('and_in_li'):
and_in_dict_li_obj = jp_obj['and_in_li']
if order_by_li:
order_by_li = json.loads(order_by_li)
obj_name = child_obj_type
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
base_name = obj_cfg.get('mdl_default', obj_cfg.get('mdl'))
if not table_name or not base_name:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")
# Resolve parent's random ID to integer ID
resolved_parent_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
if not resolved_parent_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Parent object '{for_obj_type}' with ID '{for_obj_id}' not found.")
field_name = f'{for_obj_type}_id' # Assuming convention like 'journal_id'
sql_result = sql_select(
table_name=table_name,
field_name=field_name,
field_value=resolved_parent_id,
enabled=commons.enabled,
hidden=hidden,
qry_dict_li=qry_dict_li,
fulltext_qry_dict=fulltext_qry_dict_obj,
and_qry_dict=and_qry_dict_obj,
and_like_dict=and_like_dict_obj,
or_like_dict=or_like_dict_obj,
and_in_dict_li=and_in_dict_li_obj,
order_by_li=order_by_li,
limit=commons.limit,
offset=commons.offset,
as_list=True,
)
if sql_result:
resp_data_li = []
for record in sql_result:
resp_data = base_name(**record).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
resp_data_li.append(resp_data)
return mk_resp(data=resp_data_li, response=commons.response)
else:
return mk_resp(data=[], status_code=200, response=commons.response) # Return empty list on no results
@router.post('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/', response_model=Resp_Body_Base)
async def post_child_obj(
request: Request,
parent_obj_type: str = Path(min_length=2, max_length=50),
parent_obj_id: str = Path(min_length=11, max_length=22),
child_obj_type: str = Path(min_length=2, max_length=50),
return_obj: Optional[bool] = True,
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Create a new child object for a given parent.
Examples:
- POST /v3/crud/journal/{journal_id}/journal_entry/ (with Journal_Entry_Base in body)
"""
log.setLevel(logging.WARNING)
log.debug(locals())
obj_data = await request.json()
parent_obj_name = parent_obj_type
child_obj_name = child_obj_type
if parent_obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Parent object type '{parent_obj_name}' not found.")
if child_obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Child object type '{child_obj_name}' not found.")
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_name)
if not resolved_parent_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Parent object '{parent_obj_name}' with ID '{parent_obj_id}' not found.")
obj_cfg = obj_type_kv_li[child_obj_name]
table_name_insert = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
if not table_name_insert or not input_model or not table_name_select or not output_model:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for child object type '{child_obj_name}' is incomplete.")
# Inject the parent ID into the child object's data
parent_fk_field_name = f'{parent_obj_name}_id'
obj_data[parent_fk_field_name] = resolved_parent_id
# Validate incoming data with the appropriate Pydantic model
try:
validated_obj = input_model(**obj_data)
except Exception as e:
log.warning(f"Validation error for {child_obj_name}: {e}")
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Validation error: {e}")
# Convert to dict, excluding unset fields, for database insertion
data_to_insert = validated_obj.dict(exclude_unset=True)
if sql_insert_result := sql_insert(data=data_to_insert, table_name=table_name_insert):
new_obj_id = sql_insert_result
new_obj_id_random = get_id_random(record_id=new_obj_id, table_name=child_obj_name)
if return_obj:
if sql_select_result := sql_select(table_name=table_name_select, record_id=new_obj_id):
resp_data = output_model(**sql_select_result).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
return mk_resp(data=resp_data, response=commons.response)
else:
return mk_resp(data={"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}, status_code=404, response=commons.response, status_message="Child object created but could not be retrieved.")
else:
return mk_resp(data={"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}, response=commons.response)
else:
return mk_resp(data=False, status_code=400, response=commons.response, status_message="Failed to create child object in database.")
@router.get('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
async def get_child_obj(
parent_obj_type: str = Path(min_length=2, max_length=50),
parent_obj_id: str = Path(min_length=11, max_length=22),
child_obj_type: str = Path(min_length=2, max_length=50),
child_obj_id: str = Path(min_length=11, max_length=22),
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Get a single child object by its ID, ensuring it belongs to the correct parent.
Examples:
- /v3/crud/journal/{journal_id}/journal_entry/{entry_id}
"""
log.setLevel(logging.WARNING)
log.debug(locals())
parent_obj_name = parent_obj_type
child_obj_name = child_obj_type
if parent_obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Parent object type '{parent_obj_name}' not found.")
if child_obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Child object type '{child_obj_name}' not found.")
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_name)
if not resolved_parent_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Parent object '{parent_obj_name}' with ID '{parent_obj_id}' not found.")
obj_cfg = obj_type_kv_li[child_obj_name]
table_name = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
base_name = obj_cfg.get('mdl_default', obj_cfg.get('mdl'))
if not table_name or not base_name:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for object type '{child_obj_name}' is incomplete.")
resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_obj_name)
if not resolved_child_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Child object with ID '{child_obj_id}' not found.")
if sql_result := sql_select(table_name=table_name, record_id=resolved_child_id):
# Verify the child belongs to the parent
parent_fk_field_name = f'{parent_obj_name}_id'
if sql_result.get(parent_fk_field_name) != resolved_parent_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Child object '{child_obj_id}' not found under parent '{parent_obj_id}'.")
resp_data = base_name(**sql_result).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
return mk_resp(data=resp_data, response=commons.response)
else:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Child object with ID '{child_obj_id}' not found in database.")
@router.patch('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
async def patch_child_obj(
request: Request,
parent_obj_type: str = Path(min_length=2, max_length=50),
parent_obj_id: str = Path(min_length=11, max_length=22),
child_obj_type: str = Path(min_length=2, max_length=50),
child_obj_id: str = Path(min_length=11, max_length=22),
return_obj: Optional[bool] = True,
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Update a child object by its ID, ensuring it belongs to the correct parent.
Examples:
- PATCH /v3/crud/journal/{journal_id}/journal_entry/{entry_id}
"""
log.setLevel(logging.WARNING)
log.debug(locals())
obj_data = await request.json()
parent_obj_name = parent_obj_type
child_obj_name = child_obj_type
if parent_obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Parent object type '{parent_obj_name}' not found.")
if child_obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Child object type '{child_obj_name}' not found.")
# Resolve IDs
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_name)
if not resolved_parent_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Parent object '{parent_obj_name}' with ID '{parent_obj_id}' not found.")
resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_obj_name)
if not resolved_child_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Child object '{child_obj_name}' with ID '{child_obj_id}' not found.")
# Get config for child object
obj_cfg = obj_type_kv_li[child_obj_name]
table_name_update = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
if not table_name_update or not table_name_select or not output_model:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for child object type '{child_obj_name}' is incomplete.")
# Verify parentage before updating
if existing_child := sql_select(table_name=table_name_select, record_id=resolved_child_id):
parent_fk_field_name = f'{parent_obj_name}_id'
if existing_child.get(parent_fk_field_name) != resolved_parent_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Child object '{child_obj_id}' not found under parent '{parent_obj_id}'.")
else:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Child object '{child_obj_id}' not found.")
# The sql_update function will only update the fields provided in the dict.
data_to_update = obj_data
if sql_update(data=data_to_update, table_name=table_name_update, record_id=resolved_child_id):
if return_obj:
if updated_child := sql_select(table_name=table_name_select, record_id=resolved_child_id):
resp_data = output_model(**updated_child).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
return mk_resp(data=resp_data, response=commons.response)
else:
return mk_resp(data=True, status_code=404, response=commons.response, status_message="Object updated but could not be retrieved post-update.")
else:
return mk_resp(data=True, response=commons.response, status_message="Object updated successfully.")
else:
return mk_resp(data=False, status_code=400, response=commons.response, status_message="Failed to update object in database.")
@router.delete('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
async def delete_child_obj(
parent_obj_type: str = Path(min_length=2, max_length=50),
parent_obj_id: str = Path(min_length=11, max_length=22),
child_obj_type: str = Path(min_length=2, max_length=50),
child_obj_id: str = Path(min_length=11, max_length=22),
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Delete a child object by its ID, ensuring it belongs to the correct parent.
Examples:
- DELETE /v3/crud/journal/{journal_id}/journal_entry/{entry_id}
"""
log.setLevel(logging.WARNING)
log.debug(locals())
parent_obj_name = parent_obj_type
child_obj_name = child_obj_type
if parent_obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Parent object type '{parent_obj_name}' not found.")
if child_obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Child object type '{child_obj_name}' not found.")
# Resolve IDs
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_name)
if not resolved_parent_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Parent object '{parent_obj_name}' with ID '{parent_obj_id}' not found.")
resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_obj_name)
if not resolved_child_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Child object '{child_obj_name}' with ID '{child_obj_id}' not found.")
# Get config for child object
obj_cfg = obj_type_kv_li[child_obj_name]
table_name_delete = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl')) # For verification
if not table_name_delete or not table_name_select:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for child object type '{child_obj_name}' is incomplete.")
# Verify parentage before deleting
if existing_child := sql_select(table_name=table_name_select, record_id=resolved_child_id):
parent_fk_field_name = f'{parent_obj_name}_id'
if existing_child.get(parent_fk_field_name) != resolved_parent_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Child object '{child_obj_id}' not found under parent '{parent_obj_id}'.")
else:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Child object '{child_obj_id}' not found.")
# If verification passes, delete the object
if sql_delete(table_name=table_name_delete, record_id=resolved_child_id):
return mk_resp(data=True, response=commons.response, status_message=f"Object with ID '{child_obj_id}' deleted successfully.")
else:
return mk_resp(data=False, status_code=400, response=commons.response, status_message="Failed to delete object in database.")