Files
OSIT-AE-API-FastAPI/app/routers/api_crud_v3.py
2026-01-06 16:03:54 -05:00

873 lines
38 KiB
Python

from fastapi import APIRouter, Depends, Header, HTTPException, Path, Query, Request, Response, status
from typing import Any, Dict, List, Optional, Set, Union
import json
import urllib.parse
import time
import asyncio
import logging
log = logging.getLogger(__name__)
from app.lib_general_v3 import (
AccountContext, get_account_context,
PaginationParams, get_pagination_params,
StatusFilterParams, get_status_filter_params,
SerializationParams, get_serialization_params,
DelayParams, get_delay_params
)
from app.models.response_models import *
from app.models.api_crud_models import SearchQuery
from app.ae_obj_types_def import obj_type_kv_li
from app.db_sql import redis_lookup_id_random, sql_select, sql_insert, sql_update, sql_delete, get_id_random
router = APIRouter()
def safe_json_loads(json_str: Optional[str]) -> Any:
    """
    Decode a JSON string without ever raising.

    Empty values and the literal string 'undefined' (which frontends tend to
    put into URL parameters) yield None. Input that cannot be decoded is
    logged as a warning and also yields None.
    """
    if not json_str or json_str == 'undefined':
        return None

    try:
        decoded = json.loads(json_str)
    except (json.JSONDecodeError, TypeError) as exc:
        log.warning(f"Failed to parse JSON string: {json_str}. Error: {exc}")
        decoded = None
    return decoded
def filter_order_by(order_by_li: Any, model: Any) -> Optional[Dict[str, str]]:
    """
    Keep only the ordering keys that the given Pydantic model knows about.

    The frontend may ask to order by fields that do not exist on every object
    (e.g., 'priority' or 'sort' on 'account'); dropping those keys avoids SQL
    errors. A key is kept when it matches a model field name or a field alias.

    The input is returned untouched when it is empty, not a dict, or when the
    model is missing or exposes no '__fields__'.
    """
    filterable = bool(order_by_li) and isinstance(order_by_li, dict) and bool(model)
    if not filterable or not hasattr(model, '__fields__'):
        return order_by_li

    # Collect every name the model answers to: field names plus aliases.
    known_names = set(model.__fields__.keys())
    for model_field in model.__fields__.values():
        if model_field.alias:
            known_names.add(model_field.alias)

    kept = {}
    for key, direction in order_by_li.items():
        if key in known_names:
            kept[key] = direction

    if len(kept) != len(order_by_li):
        log.info(f"Filtered order_by_li. Removed fields: {set(order_by_li.keys()) - set(kept.keys())}")
    return kept
def get_supported_filters(model: Any, status_filter: StatusFilterParams) -> StatusFilterParams:
    """
    Relax status filters the model cannot honor.

    If the model has no 'enable' field the 'enabled' filter becomes 'all';
    if it has no 'hide' field the 'hidden' filter becomes 'all'. This avoids
    SQL errors from filtering on columns that do not exist. The incoming
    filter object is never mutated: a fresh StatusFilterParams is returned,
    except when the model is missing or has no '__fields__', in which case
    the original object is handed back as-is.
    """
    if not model or not hasattr(model, "__fields__"):
        return status_filter

    # Work on a copy so the dependency-provided object stays unchanged.
    adjusted = StatusFilterParams(
        enabled=status_filter.enabled,
        hidden=status_filter.hidden
    )

    # (model field, filter attribute) pairs that must exist for the filter to apply.
    for model_field, filter_attr in (('enable', 'enabled'), ('hide', 'hidden')):
        if model_field not in model.__fields__:
            setattr(adjusted, filter_attr, 'all')
    return adjusted
@router.get("/health", response_model=Resp_Body_Base)
async def health_check(
    delay: DelayParams = Depends(get_delay_params),
):
    """
    Liveness probe for the V3 API.

    Sleeps for the requested artificial delay (if any), logs the hit at INFO
    level and returns a static "healthy" payload.
    """
    pause_s = delay.sleep_time_s
    if pause_s > 0:
        await asyncio.sleep(pause_s)

    # Lower the module logger to INFO so the message below passes its level check.
    log.setLevel(logging.INFO)
    log.info("V3 Health Check Endpoint Hit")

    payload = {"status": "V3 API is healthy!"}
    return mk_resp(data=payload)
@router.get("/{obj_type}/schema", response_model=Resp_Body_Base, tags=['CRUD v3 Schema (Dev)'])
async def get_obj_schema(
    response: Response,
    obj_type: str = Path(min_length=2, max_length=50),
    view: str = Query('default', description="Select alternative view/model (e.g., enriched, detail)"),
    variant: str = Query('base', regex='^(base|in|out|default)$', description="Select model variant"),
    account: AccountContext = Depends(get_account_context),
):
    """
    Describe an Aether object type.

    The response merges two sources: the column list reported by the database
    for the selected table/view, and the field definitions of the selected
    Pydantic model variant. A failing DESCRIBE does not abort the request;
    the error text is reported under database.error instead.
    """
    from app.db_sql import db
    from sqlalchemy import text

    if obj_type not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_type}' not found.")
    cfg = obj_type_kv_li[obj_type]

    # Table/view: 'tbl_<view>' first, then 'tbl_default', then 'tbl'.
    table_name = cfg.get(f'tbl_{view}', cfg.get('tbl_default', cfg.get('tbl')))

    # Model: 'base' maps to the plain 'mdl' key, other variants to 'mdl_<variant>'.
    if variant == 'base':
        model_key = 'mdl'
    else:
        model_key = f'mdl_{variant}'
    model = cfg.get(model_key, cfg.get('mdl_default', cfg.get('mdl')))

    if not table_name:
        return mk_resp(data=False, status_code=500, response=response, status_message=f"Table configuration for '{obj_type}' is missing.")

    if hasattr(model, '__name__'):
        model_label = model.__name__
    else:
        model_label = str(model)

    db_section = {
        "table_name": table_name,
        "columns": []
    }
    model_section = {
        "name": model_label,
        "fields": {}
    }
    schema_info = {
        "object_type": obj_type,
        "view": view,
        "variant": variant,
        "database": db_section,
        "model": model_section
    }

    # 1. Database metadata (best effort: errors are reported, not raised)
    try:
        describe_rows = db.execute(text(f"DESCRIBE `{table_name}`")).fetchall()
        for col in describe_rows:
            db_section["columns"].append({
                "field": col[0],
                "type": col[1],
                "nullable": col[2] == 'YES',
                "key": col[3],
                "default": col[4],
                "extra": col[5]
            })
    except Exception as exc:
        log.warning(f"Failed to describe table '{table_name}': {exc}")
        db_section["error"] = str(exc)

    # 2. Pydantic model metadata
    if model and hasattr(model, "__fields__"):
        for field_name, model_field in model.__fields__.items():
            entry = {
                "alias": model_field.alias,
                "type": str(model_field.outer_type_),
                "required": model_field.required,
                "default": model_field.default
            }
            # The description is optional metadata supplied through Field().
            if model_field.field_info.description:
                entry["description"] = model_field.field_info.description
            model_section["fields"][field_name] = entry

    return mk_resp(data=schema_info, response=response)
# --- TOP LEVEL OBJECTS ---
@router.get('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def get_obj(
    response: Response,
    obj_type_l1: str = Path(min_length=2, max_length=50),
    obj_id: str = Path(min_length=11, max_length=22),
    view: str = Query('default', description="Select alternative view/model (e.g., enriched, detail)"),
    account: AccountContext = Depends(get_account_context),
    serialization: SerializationParams = Depends(get_serialization_params),
    delay: DelayParams = Depends(get_delay_params),
):
    """
    Fetch one top-level object addressed by its random ID.

    Responds with 400 for an unknown object type, 500 when the type's
    table/model configuration is incomplete, and 404 when the random ID
    cannot be resolved or the row is missing.
    """
    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)
    log.setLevel(logging.WARNING)
    log.debug(locals())

    obj_name = obj_type_l1
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")
    cfg = obj_type_kv_li[obj_name]

    # View-specific table/model win; otherwise fall back to default, then base.
    table_name = cfg.get(f'tbl_{view}', cfg.get('tbl_default', cfg.get('tbl')))
    model_cls = cfg.get(f'mdl_{view}', cfg.get('mdl_default', cfg.get('mdl')))
    if not (table_name and model_cls):
        return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{obj_name}' (view: {view}) is incomplete.")

    # Translate the public random ID into the internal record ID.
    record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
    if not record_id:
        return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found.")

    row = sql_select(table_name=table_name, record_id=record_id)
    if not row:
        return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found in database.")

    dump_opts = {
        'by_alias': serialization.by_alias,
        'exclude_unset': serialization.exclude_unset,
        'exclude_defaults': serialization.exclude_defaults,
        'exclude_none': serialization.exclude_none,
    }
    resp_data = model_cls(**row).dict(**dump_opts)
    return mk_resp(data=resp_data, response=response)
@router.get('/{obj_type_l1}/', response_model=Resp_Body_Base)
async def get_obj_li(
    response: Response,
    obj_type_l1: str,
    for_obj_type: Optional[str] = None,
    for_obj_id: Optional[str] = None,
    view: str = Query('default'),
    order_by_li: Optional[str] = None,
    jp: Optional[Union[str, None]] = None,
    account: AccountContext = Depends(get_account_context),
    pagination: PaginationParams = Depends(get_pagination_params),
    status_filter: StatusFilterParams = Depends(get_status_filter_params),
    serialization: SerializationParams = Depends(get_serialization_params),
    delay: DelayParams = Depends(get_delay_params),
):
    """
    Get a list of top-level objects.

    Query parameters:
    - for_obj_type / for_obj_id: optional parent filter; when both are given
      the list is restricted to rows whose '<for_obj_type>_id' matches the
      resolved parent record.
    - order_by_li: JSON-encoded ordering; keys unknown to the model are dropped.
    - jp: URL-encoded JSON object that may carry the keys 'qry', 'ft_qry',
      'and_qry', 'and_like', 'or_like' and 'and_in_li'. Anything that does
      not decode to a JSON object is ignored (with a warning).

    Responds with 400 for an unknown object type, 403 when sites are listed
    without an account filter, 404 when the parent cannot be resolved, 500 on
    incomplete configuration or a database error, and an empty list when
    nothing matches.
    """
    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)
    log.setLevel(logging.WARNING)
    log.debug(locals())

    jp_obj = safe_json_loads(urllib.parse.unquote(jp)) if jp else None
    # 'jp' is caller-controlled: valid JSON that is not an object (list,
    # number, string) has no .get() and used to crash the request.
    if jp_obj is not None and not isinstance(jp_obj, dict):
        log.warning("Ignoring 'jp' parameter: expected a JSON object, got %s", type(jp_obj).__name__)
        jp_obj = None
    jp_obj = jp_obj or {}

    # Empty/falsy entries are passed on as None, i.e. "no such filter".
    query_filters = {
        'qry_dict_li': jp_obj.get('qry') or None,
        'fulltext_qry_dict': jp_obj.get('ft_qry') or None,
        'and_qry_dict': jp_obj.get('and_qry') or None,
        'and_like_dict': jp_obj.get('and_like') or None,
        'or_like_dict': jp_obj.get('or_like') or None,
        'and_in_dict_li': jp_obj.get('and_in_li') or None,
    }

    order_by_li = safe_json_loads(order_by_li)

    obj_name = obj_type_l1
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")

    # Security Restrictions
    if obj_name == 'site' and not (for_obj_type == 'account' and for_obj_id):
        return mk_resp(data=False, status_code=403, response=response, status_message="Listing sites is only permitted when filtered by account.")

    obj_cfg = obj_type_kv_li[obj_name]
    table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl')))
    base_name = obj_cfg.get(f'mdl_{view}', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
    if not table_name or not base_name:
        return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{obj_name}' (view: {view}) is incomplete.")

    # Drop ordering keys / status filters the model cannot support.
    order_by_li = filter_order_by(order_by_li, base_name)
    status_filter = get_supported_filters(base_name, status_filter)

    select_kwargs = dict(
        table_name=table_name,
        enabled=status_filter.enabled,
        hidden=status_filter.hidden,
        order_by_li=order_by_li,
        limit=pagination.limit,
        offset=pagination.offset,
        as_list=True,
        **query_filters,
    )

    if for_obj_type and for_obj_id:
        resolved_for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
        if not resolved_for_obj_id:
            return mk_resp(data=False, status_code=404, response=response, status_message=f"Parent object with ID '{for_obj_id}' not found.")
        # NOTE(review): for_obj_type comes straight from the query string and
        # ends up in the filter column name; confirm sql_select validates it.
        select_kwargs.update(
            field_name=f'{for_obj_type}_id',
            field_value=resolved_for_obj_id,
        )

    sql_result = sql_select(**select_kwargs)

    # The code distinguishes an explicit False (reported as a database error)
    # from an empty/None result (reported as an empty list).
    if sql_result is False:
        return mk_resp(data=False, status_code=500, response=response, status_message="Database error occurred.")
    if not sql_result:
        return mk_resp(data=[], status_code=200, response=response)

    resp_data_li = [base_name(**record).dict(
        by_alias=serialization.by_alias,
        exclude_unset=serialization.exclude_unset,
        exclude_defaults=serialization.exclude_defaults,
        exclude_none=serialization.exclude_none
    ) for record in sql_result]
    return mk_resp(data=resp_data_li, response=response)
@router.post('/{obj_type_l1}/search', response_model=Resp_Body_Base, tags=['CRUD v3 Search (Dev)'])
async def search_obj_li(
    response: Response,
    obj_type_l1: str,
    search_query: SearchQuery,
    for_obj_type: Optional[str] = Query(None, description="Explicit parent type filter"),
    for_obj_id: Optional[str] = Query(None, description="Explicit parent ID filter"),
    view: str = Query('default', description="Select alternative view/model (e.g., enriched, detail)"),
    order_by_li: Optional[str] = Query(None),
    account: AccountContext = Depends(get_account_context),
    pagination: PaginationParams = Depends(get_pagination_params),
    status_filter: StatusFilterParams = Depends(get_status_filter_params),
    serialization: SerializationParams = Depends(get_serialization_params),
    delay: DelayParams = Depends(get_delay_params),
):
    """
    Search top-level objects with a SearchQuery supplied in the POST body.

    The search is limited to the object type's configured 'searchable_fields'
    and can additionally be scoped to a parent through for_obj_type and
    for_obj_id. Error responses mirror the plain list endpoint: 400 unknown
    type, 403 unscoped site listing, 404 unknown parent, 500 configuration or
    database error.
    """
    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)
    log.setLevel(logging.WARNING)
    log.debug(locals())

    order_by_li = safe_json_loads(order_by_li)

    obj_name = obj_type_l1
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")

    # Security Restrictions
    scoped_to_account = for_obj_type == 'account' and for_obj_id
    if obj_name == 'site' and not scoped_to_account:
        return mk_resp(data=False, status_code=403, response=response, status_message="Listing sites is only permitted when filtered by account.")

    cfg = obj_type_kv_li[obj_name]
    table_name = cfg.get(f'tbl_{view}', cfg.get('tbl_default', cfg.get('tbl')))
    model_cls = cfg.get(f'mdl_{view}', cfg.get('mdl_default', cfg.get('mdl')))
    if not (table_name and model_cls):
        return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{obj_name}' (view: {view}) is incomplete.")

    order_by_li = filter_order_by(order_by_li, model_cls)
    status_filter = get_supported_filters(model_cls, status_filter)
    searchable_fields = cfg.get('searchable_fields')

    # Arguments shared by the scoped and the unscoped search.
    select_args = {
        'table_name': table_name,
        'enabled': status_filter.enabled,
        'hidden': status_filter.hidden,
        'search_query': search_query,
        'searchable_fields': searchable_fields,
        'order_by_li': order_by_li,
        'limit': pagination.limit,
        'offset': pagination.offset,
        'as_list': True,
    }

    if for_obj_type and for_obj_id:
        parent_record_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
        if not parent_record_id:
            return mk_resp(data=False, status_code=404, response=response, status_message=f"Parent object with ID '{for_obj_id}' not found.")
        select_args['field_name'] = f'{for_obj_type}_id'
        select_args['field_value'] = parent_record_id

    rows = sql_select(**select_args)

    # An explicit False is reported as a database error; any other falsy
    # result is reported as an empty list.
    if rows is False:
        return mk_resp(data=False, status_code=500, response=response, status_message="Database error occurred.")
    if not rows:
        return mk_resp(data=[], status_code=200, response=response)

    resp_data_li = []
    for row in rows:
        resp_data_li.append(model_cls(**row).dict(
            by_alias=serialization.by_alias,
            exclude_unset=serialization.exclude_unset,
            exclude_defaults=serialization.exclude_defaults,
            exclude_none=serialization.exclude_none
        ))
    return mk_resp(data=resp_data_li, response=response)
@router.post('/{obj_type_l1}/', response_model=Resp_Body_Base)
async def post_obj(
    request: Request,
    response: Response,
    obj_type_l1: str = Path(min_length=2, max_length=50),
    return_obj: Optional[bool] = True,
    account: AccountContext = Depends(get_account_context),
    serialization: SerializationParams = Depends(get_serialization_params),
    delay: DelayParams = Depends(get_delay_params),
):
    """
    Create a new top-level object.

    The JSON request body is validated with the type's input model
    ('mdl_in', falling back to 'mdl') and only the fields that were actually
    sent are inserted. With return_obj=True (default) the stored row is read
    back and returned through the output model; otherwise only the new
    record ID and random ID are returned.

    Responds with 400 for an unknown object type, an unparsable or non-object
    body, a validation error or a failed insert; 500 for incomplete
    configuration; 404 when the row was created but could not be read back.
    """
    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)
    log.setLevel(logging.WARNING)
    log.debug(locals())

    # A malformed body must be a client error, not an unhandled exception.
    # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    try:
        obj_data = await request.json()
    except ValueError as e:
        log.warning(f"Invalid JSON body for {obj_type_l1}: {e}")
        return mk_resp(data=False, status_code=400, response=response, status_message="Invalid JSON in request body.")

    obj_name = obj_type_l1
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")

    obj_cfg = obj_type_kv_li[obj_name]
    table_name_insert = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
    table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
    input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
    output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
    if not table_name_insert or not input_model or not table_name_select or not output_model:
        return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")

    # The model is built with **obj_data, so the body has to be a JSON object.
    if not isinstance(obj_data, dict):
        return mk_resp(data=False, status_code=400, response=response, status_message="Request body must be a JSON object.")

    try:
        validated_obj = input_model(**obj_data)
    except Exception as e:
        log.warning(f"Validation error for {obj_name}: {e}")
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Validation error: {e}")

    # Only explicitly supplied fields are written.
    data_to_insert = validated_obj.dict(exclude_unset=True)
    new_obj_id = sql_insert(data=data_to_insert, table_name=table_name_insert)
    if not new_obj_id:
        return mk_resp(data=False, status_code=400, response=response, status_message="Failed to create object in database.")

    new_obj_id_random = get_id_random(record_id=new_obj_id, table_name=obj_name)
    id_payload = {"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}
    if not return_obj:
        return mk_resp(data=id_payload, response=response)

    sql_select_result = sql_select(table_name=table_name_select, record_id=new_obj_id)
    if not sql_select_result:
        return mk_resp(data=id_payload, status_code=404, response=response, status_message="Object created but could not be retrieved.")

    resp_data = output_model(**sql_select_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
    return mk_resp(data=resp_data, response=response)
@router.patch('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def patch_obj(
    request: Request,
    response: Response,
    obj_type_l1: str = Path(min_length=2, max_length=50),
    obj_id: str = Path(min_length=11, max_length=22),
    return_obj: Optional[bool] = True,
    account: AccountContext = Depends(get_account_context),
    serialization: SerializationParams = Depends(get_serialization_params),
    delay: DelayParams = Depends(get_delay_params),
):
    """
    Update a top-level object.

    The JSON object in the request body is applied to the record addressed by
    obj_id. With return_obj=True (default) the updated row is read back and
    returned through the output model; otherwise True is returned.

    Responds with 400 for an unknown object type, an unparsable or non-object
    body or a failed update; 500 for incomplete configuration; 404 when the ID
    cannot be resolved or the updated row cannot be read back.
    """
    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)
    log.setLevel(logging.WARNING)
    log.debug(locals())

    # A malformed body must be a client error, not an unhandled exception.
    # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    try:
        obj_data = await request.json()
    except ValueError as e:
        log.warning(f"Invalid JSON body for {obj_type_l1}: {e}")
        return mk_resp(data=False, status_code=400, response=response, status_message="Invalid JSON in request body.")

    obj_name = obj_type_l1
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")

    obj_cfg = obj_type_kv_li[obj_name]
    table_name_update = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
    table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
    input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
    output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
    if not table_name_update or not input_model or not table_name_select or not output_model:
        return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")

    # The update payload is a column -> value mapping; reject anything else
    # before it reaches the SQL layer.
    if not isinstance(obj_data, dict):
        return mk_resp(data=False, status_code=400, response=response, status_message="Request body must be a JSON object.")

    record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
    if not record_id:
        return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found.")

    # NOTE(review): input_model is required by the configuration check above
    # but the payload is not validated against it; the raw body goes to
    # sql_update. Confirm that sql_update restricts the keys it accepts.
    data_to_update = obj_data
    if not sql_update(data=data_to_update, table_name=table_name_update, record_id=record_id):
        return mk_resp(data=False, status_code=400, response=response, status_message="Failed to update object in database.")

    if not return_obj:
        return mk_resp(data=True, response=response, status_message="Object updated successfully.")

    sql_select_result = sql_select(table_name=table_name_select, record_id=record_id)
    if not sql_select_result:
        return mk_resp(data=True, status_code=404, response=response, status_message="Object updated but could not be retrieved.")

    resp_data = output_model(**sql_select_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
    return mk_resp(data=resp_data, response=response)
@router.delete('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def delete_obj(
    response: Response,
    obj_type_l1: str = Path(min_length=2, max_length=50),
    obj_id: str = Path(min_length=11, max_length=22),
    method: str = Query('delete', regex='^(delete|hide|disable)$'),
    account: AccountContext = Depends(get_account_context),
    delay: DelayParams = Depends(get_delay_params),
):
    """
    Remove a top-level object, either for real or softly.

    method='delete' removes the row, 'hide' sets hide=True and 'disable'
    sets enable=False on it. Responds with 400 for an unknown object type or
    a failed operation, 500 for incomplete configuration and 404 when the
    random ID cannot be resolved.
    """
    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)
    log.setLevel(logging.WARNING)
    log.debug(locals())

    obj_name = obj_type_l1
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")

    cfg = obj_type_kv_li[obj_name]
    table_name_delete = cfg.get('tbl_update', cfg.get('tbl'))
    if not table_name_delete:
        return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")

    record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
    if not record_id:
        return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found.")

    # Run the requested operation and remember the matching messages.
    if method == 'hide':
        succeeded = sql_update(table_name=table_name_delete, record_id=record_id, data={'hide': True})
        ok_message = f"Object with ID '{obj_id}' hidden successfully."
        fail_message = "Failed to hide object."
    elif method == 'disable':
        succeeded = sql_update(table_name=table_name_delete, record_id=record_id, data={'enable': False})
        ok_message = f"Object with ID '{obj_id}' disabled successfully."
        fail_message = "Failed to disable object."
    else:
        succeeded = sql_delete(table_name=table_name_delete, record_id=record_id)
        ok_message = f"Object with ID '{obj_id}' deleted successfully."
        fail_message = "Failed to delete object."

    if succeeded:
        return mk_resp(data=True, response=response, status_message=ok_message)
    return mk_resp(data=False, status_code=400, response=response, status_message=fail_message)
# --- CHILD / NESTED OBJECTS ---
@router.get('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/', response_model=Resp_Body_Base)
async def get_child_obj_li(
    response: Response,
    parent_obj_type: str,
    parent_obj_id: str,
    child_obj_type: str,
    order_by_li: Optional[str] = None,
    jp: Optional[Union[str, None]] = None,
    account: AccountContext = Depends(get_account_context),
    pagination: PaginationParams = Depends(get_pagination_params),
    status_filter: StatusFilterParams = Depends(get_status_filter_params),
    serialization: SerializationParams = Depends(get_serialization_params),
    delay: DelayParams = Depends(get_delay_params),
):
    """
    Get a list of child objects belonging to a parent.

    Rows of the child type are selected where '<parent_obj_type>_id' equals
    the resolved parent record ID.

    Query parameters:
    - order_by_li: JSON-encoded ordering; keys unknown to the model are dropped.
    - jp: URL-encoded JSON object that may carry the keys 'qry', 'ft_qry',
      'and_qry', 'and_like', 'or_like' and 'and_in_li'. Anything that does
      not decode to a JSON object is ignored (with a warning).

    Responds with 400 for an unknown child type, 404 when the parent cannot
    be resolved, 500 on incomplete configuration or a database error, and an
    empty list when nothing matches.
    """
    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)
    log.setLevel(logging.WARNING)
    log.debug(locals())

    jp_obj = safe_json_loads(urllib.parse.unquote(jp)) if jp else None
    # 'jp' is caller-controlled: valid JSON that is not an object (list,
    # number, string) has no .get() and used to crash the request.
    if jp_obj is not None and not isinstance(jp_obj, dict):
        log.warning("Ignoring 'jp' parameter: expected a JSON object, got %s", type(jp_obj).__name__)
        jp_obj = None
    jp_obj = jp_obj or {}

    # Empty/falsy entries are passed on as None, i.e. "no such filter".
    query_filters = {
        'qry_dict_li': jp_obj.get('qry') or None,
        'fulltext_qry_dict': jp_obj.get('ft_qry') or None,
        'and_qry_dict': jp_obj.get('and_qry') or None,
        'and_like_dict': jp_obj.get('and_like') or None,
        'or_like_dict': jp_obj.get('or_like') or None,
        'and_in_dict_li': jp_obj.get('and_in_li') or None,
    }

    order_by_li = safe_json_loads(order_by_li)

    obj_name = child_obj_type
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")

    obj_cfg = obj_type_kv_li[obj_name]
    table_name = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
    base_name = obj_cfg.get('mdl_default', obj_cfg.get('mdl'))
    if not table_name or not base_name:
        return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")

    # Drop ordering keys / status filters the model cannot support.
    order_by_li = filter_order_by(order_by_li, base_name)
    status_filter = get_supported_filters(base_name, status_filter)

    resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
    if not resolved_parent_id:
        return mk_resp(data=False, status_code=404, response=response, status_message=f"Parent object '{parent_obj_type}' with ID '{parent_obj_id}' not found.")

    sql_result = sql_select(
        table_name=table_name,
        field_name=f'{parent_obj_type}_id',
        field_value=resolved_parent_id,
        enabled=status_filter.enabled,
        hidden=status_filter.hidden,
        order_by_li=order_by_li,
        limit=pagination.limit,
        offset=pagination.offset,
        as_list=True,
        **query_filters,
    )

    # The code distinguishes an explicit False (reported as a database error)
    # from an empty/None result (reported as an empty list).
    if sql_result is False:
        return mk_resp(data=False, status_code=500, response=response, status_message="Database error occurred.")
    if not sql_result:
        return mk_resp(data=[], status_code=200, response=response)

    resp_data_li = [base_name(**record).dict(
        by_alias=serialization.by_alias,
        exclude_unset=serialization.exclude_unset,
        exclude_defaults=serialization.exclude_defaults,
        exclude_none=serialization.exclude_none
    ) for record in sql_result]
    return mk_resp(data=resp_data_li, response=response)
@router.post('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/', response_model=Resp_Body_Base)
async def post_child_obj(
    request: Request,
    response: Response,
    parent_obj_type: str = Path(min_length=2, max_length=50),
    parent_obj_id: str = Path(min_length=11, max_length=22),
    child_obj_type: str = Path(min_length=2, max_length=50),
    return_obj: Optional[bool] = True,
    account: AccountContext = Depends(get_account_context),
    serialization: SerializationParams = Depends(get_serialization_params),
    delay: DelayParams = Depends(get_delay_params),
):
    """
    Create a new child object for a given parent.

    The resolved parent record ID is written into the body under
    '<parent_obj_type>_id' (overriding any value the client sent) before the
    body is validated with the child type's input model. With return_obj=True
    (default) the stored row is returned through the output model; if it
    cannot be read back, or return_obj is False, the new record ID and random
    ID are returned instead.

    Responds with 400 for an unknown parent/child type, an unparsable or
    non-object body, a validation error or a failed insert; 404 when the
    parent cannot be resolved; 500 for incomplete configuration.
    """
    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)
    log.setLevel(logging.WARNING)
    log.debug(locals())

    # A malformed body must be a client error, not an unhandled exception.
    # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    try:
        obj_data = await request.json()
    except ValueError as e:
        log.warning(f"Invalid JSON body for {child_obj_type}: {e}")
        return mk_resp(data=False, status_code=400, response=response, status_message="Invalid JSON in request body.")

    if parent_obj_type not in obj_type_kv_li or child_obj_type not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message="Invalid object type.")

    resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
    if not resolved_parent_id:
        return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.")

    obj_cfg = obj_type_kv_li[child_obj_type]
    table_name_insert = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
    table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
    input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
    output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
    # Same completeness check as the top-level create endpoint; without it a
    # missing model surfaced as a misleading "Validation error".
    if not table_name_insert or not input_model or not table_name_select or not output_model:
        return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{child_obj_type}' is incomplete.")

    # The parent key is assigned by item below, so the body must be an object.
    if not isinstance(obj_data, dict):
        return mk_resp(data=False, status_code=400, response=response, status_message="Request body must be a JSON object.")

    # Force the link to the parent taken from the URL.
    obj_data[f'{parent_obj_type}_id'] = resolved_parent_id

    try:
        validated_obj = input_model(**obj_data)
    except Exception as e:
        log.warning(f"Validation error for {child_obj_type}: {e}")
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Validation error: {e}")

    # Only explicitly supplied fields are written.
    data_to_insert = validated_obj.dict(exclude_unset=True)
    new_obj_id = sql_insert(data=data_to_insert, table_name=table_name_insert)
    if not new_obj_id:
        return mk_resp(data=False, status_code=400, response=response, status_message="Failed to create child object.")

    new_obj_id_random = get_id_random(record_id=new_obj_id, table_name=child_obj_type)
    if return_obj:
        if sql_select_result := sql_select(table_name=table_name_select, record_id=new_obj_id):
            resp_data = output_model(**sql_select_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
            return mk_resp(data=resp_data, response=response)
    # Either the caller asked for IDs only or the row could not be read back.
    return mk_resp(data={"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}, response=response)
@router.get('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
async def get_child_obj(
    response: Response,
    parent_obj_type: str = Path(min_length=2, max_length=50),
    parent_obj_id: str = Path(min_length=11, max_length=22),
    child_obj_type: str = Path(min_length=2, max_length=50),
    child_obj_id: str = Path(min_length=11, max_length=22),
    account: AccountContext = Depends(get_account_context),
    serialization: SerializationParams = Depends(get_serialization_params),
    delay: DelayParams = Depends(get_delay_params),
):
    """
    Get a single child object, verifying parentage.

    The child row is only returned when its '<parent_obj_type>_id' equals the
    resolved parent record ID.

    Responds with 400 for an unknown child type, 500 for incomplete
    configuration, and 404 when either ID cannot be resolved, the row is
    missing, or the row belongs to a different parent.
    """
    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)
    log.setLevel(logging.WARNING)
    log.debug(locals())

    # The child type is used as a key into obj_type_kv_li below; reject
    # unknown types up front instead of risking a KeyError.
    if child_obj_type not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{child_obj_type}' not found.")

    obj_cfg = obj_type_kv_li[child_obj_type]
    table_name = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
    base_name = obj_cfg.get('mdl_default', obj_cfg.get('mdl'))
    if not table_name or not base_name:
        return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{child_obj_type}' is incomplete.")

    resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
    resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_obj_type)
    if not resolved_parent_id or not resolved_child_id:
        return mk_resp(data=False, status_code=404, response=response, status_message="Object(s) not found.")

    sql_result = sql_select(table_name=table_name, record_id=resolved_child_id)
    if not sql_result:
        return mk_resp(data=False, status_code=404, response=response, status_message="Child not found.")

    # Parentage check: the child must point at the parent from the URL.
    if sql_result.get(f'{parent_obj_type}_id') != resolved_parent_id:
        return mk_resp(data=False, status_code=404, response=response, status_message="Child not found under parent.")

    # Honor every serialization option, as the other GET endpoints do.
    resp_data = base_name(**sql_result).dict(
        by_alias=serialization.by_alias,
        exclude_unset=serialization.exclude_unset,
        exclude_defaults=serialization.exclude_defaults,
        exclude_none=serialization.exclude_none
    )
    return mk_resp(data=resp_data, response=response)
@router.patch('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
async def patch_child_obj(
    request: Request,
    response: Response,
    parent_obj_type: str = Path(min_length=2, max_length=50),
    parent_obj_id: str = Path(min_length=11, max_length=22),
    child_obj_type: str = Path(min_length=2, max_length=50),
    child_obj_id: str = Path(min_length=11, max_length=22),
    return_obj: Optional[bool] = True,
    account: AccountContext = Depends(get_account_context),
    serialization: SerializationParams = Depends(get_serialization_params),
    delay: DelayParams = Depends(get_delay_params),
):
    """
    Update a child object, verifying parentage.

    The update is only applied when the existing child row's
    '<parent_obj_type>_id' equals the resolved parent record ID. With
    return_obj=True (default) the updated row is returned through the output
    model; if it cannot be read back, or return_obj is False, True is returned.

    Responds with 400 for an unknown child type, an unparsable or non-object
    body or a failed update; 500 for incomplete configuration; 404 when an ID
    cannot be resolved, the child is missing, or it belongs to another parent.
    """
    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)
    log.setLevel(logging.WARNING)
    log.debug(locals())

    # A malformed body must be a client error, not an unhandled exception.
    # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    try:
        obj_data = await request.json()
    except ValueError as e:
        log.warning(f"Invalid JSON body for {child_obj_type}: {e}")
        return mk_resp(data=False, status_code=400, response=response, status_message="Invalid JSON in request body.")

    # The child type is used as a key into obj_type_kv_li below; reject
    # unknown types up front instead of risking a KeyError.
    if child_obj_type not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{child_obj_type}' not found.")

    obj_cfg = obj_type_kv_li[child_obj_type]
    table_name_update = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
    table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
    output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
    if not table_name_update or not table_name_select or not output_model:
        return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{child_obj_type}' is incomplete.")

    # The update payload is a column -> value mapping; reject anything else
    # before it reaches the SQL layer.
    if not isinstance(obj_data, dict):
        return mk_resp(data=False, status_code=400, response=response, status_message="Request body must be a JSON object.")

    resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
    resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_obj_type)
    if not resolved_parent_id or not resolved_child_id:
        return mk_resp(data=False, status_code=404, response=response, status_message="Object(s) not found.")

    existing_child = sql_select(table_name=table_name_select, record_id=resolved_child_id)
    if not existing_child:
        return mk_resp(data=False, status_code=404, response=response, status_message="Child not found.")
    # Parentage check: the child must point at the parent from the URL.
    if existing_child.get(f'{parent_obj_type}_id') != resolved_parent_id:
        return mk_resp(data=False, status_code=404, response=response, status_message="Child not found under parent.")

    # NOTE(review): the raw body is handed to sql_update without model
    # validation; confirm that sql_update restricts the keys it accepts.
    if not sql_update(data=obj_data, table_name=table_name_update, record_id=resolved_child_id):
        return mk_resp(data=False, status_code=400, response=response, status_message="Update failed.")

    if return_obj:
        if updated_child := sql_select(table_name=table_name_select, record_id=resolved_child_id):
            resp_data = output_model(**updated_child).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
            return mk_resp(data=resp_data, response=response)
    # Either the caller asked for no object or the row could not be read back.
    return mk_resp(data=True, response=response, status_message="Updated successfully.")
@router.delete('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
async def delete_child_obj(
    response: Response,
    parent_obj_type: str = Path(min_length=2, max_length=50),
    parent_obj_id: str = Path(min_length=11, max_length=22),
    child_obj_type: str = Path(min_length=2, max_length=50),
    child_obj_id: str = Path(min_length=11, max_length=22),
    method: str = Query('delete', regex='^(delete|hide|disable)$'),
    account: AccountContext = Depends(get_account_context),
    delay: DelayParams = Depends(get_delay_params),
):
    """
    Delete a child object, verifying parentage (hard or soft).

    method='delete' removes the row, 'hide' sets hide=True and 'disable'
    sets enable=False on it. Nothing is touched unless the existing child
    row's '<parent_obj_type>_id' equals the resolved parent record ID.

    Responds with 400 for an unknown child type or a failed operation, 500
    for incomplete configuration, and 404 when an ID cannot be resolved, the
    child is missing, or it belongs to another parent.
    """
    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)
    log.setLevel(logging.WARNING)
    log.debug(locals())

    # The child type is used as a key into obj_type_kv_li below; reject
    # unknown types up front instead of risking a KeyError.
    if child_obj_type not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{child_obj_type}' not found.")

    obj_cfg = obj_type_kv_li[child_obj_type]
    table_name_delete = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
    table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
    # Never hand a missing table name to a destructive call.
    if not table_name_delete or not table_name_select:
        return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{child_obj_type}' is incomplete.")

    resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
    resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_obj_type)
    if not resolved_parent_id or not resolved_child_id:
        return mk_resp(data=False, status_code=404, response=response, status_message="Object(s) not found.")

    existing_child = sql_select(table_name=table_name_select, record_id=resolved_child_id)
    if not existing_child:
        return mk_resp(data=False, status_code=404, response=response, status_message="Child not found.")
    # Parentage check: the child must point at the parent from the URL.
    if existing_child.get(f'{parent_obj_type}_id') != resolved_parent_id:
        return mk_resp(data=False, status_code=404, response=response, status_message="Child not found under parent.")

    if method == 'hide':
        success = sql_update(table_name=table_name_delete, record_id=resolved_child_id, data={'hide': True})
    elif method == 'disable':
        success = sql_update(table_name=table_name_delete, record_id=resolved_child_id, data={'enable': False})
    else:
        success = sql_delete(table_name=table_name_delete, record_id=resolved_child_id)

    if success:
        return mk_resp(data=True, response=response, status_message=f"Deleted successfully via {method}.")
    return mk_resp(data=False, status_code=400, response=response, status_message="Deletion failed.")