Files
OSIT-AE-API-FastAPI/app/routers/api_crud_v3.py
Scott Idem 2ff211f2c2 Update API documentation and finalize model validators/mappings.
- Added comprehensive docstrings to api_crud_v3.py explaining multi-tenancy, sanitization, and soft-delete logic.
- Finalized Address and Contact models/mappings with correct validators and field maps.
- Consolidated test suite in tests/ directory.
2026-01-09 15:52:00 -05:00

963 lines
44 KiB
Python

from fastapi import APIRouter, Depends, Header, HTTPException, Path, Query, Request, Response, status
from typing import Any, Dict, List, Optional, Set, Union
import json
import urllib.parse
import time
import asyncio
import logging
log = logging.getLogger(__name__)
from app.lib_general_v3 import (
AccountContext, get_account_context, get_account_context_optional,
PaginationParams, StatusFilterParams,
SerializationParams, DelayParams
)
from app.models.response_models import *
from app.models.api_crud_models import SearchFilter, SearchQuery
from app.ae_obj_types_def import obj_type_kv_li
# from app.db_sql import redis_lookup_id_random, sql_select, sql_insert, sql_update, sql_delete, get_id_random
"""
Aether API V3 - Generic CRUD Router
-----------------------------------
This router provides a standardized, dynamic interface for Create, Read, Update, and Delete operations
across the Aether Object System.
Key Features:
1. Dynamic Object Handling: Routes work for any object defined in `ae_obj_types_def`.
2. Multi-Tenancy: Automatically enforces `account_id` isolation for non-superusers.
3. ID Obfuscation: Uses `id_random` (public strings) -> `id` (database integers) resolution.
4. Data Sanitization: Automatically strips virtual fields and view-only fields before DB writes.
"""
# Router on which every endpoint in this module is registered through its decorator.
router = APIRouter()
# --- Helpers ---
def check_account_access(sql_result: Any, account: AccountContext, obj_name: Optional[str] = None) -> bool:
    """
    Enforce Multi-Tenant Data Isolation.

    Verifies that the requested record belongs to the authenticated user's account.

    Args:
        sql_result: The fetched record. Only mapping-like records are inspected.
        account: Caller context; `super`, `auth_method` and `account_id` are read.
        obj_name: Object type name. For 'account' the record's own `id` is the
            owner id; for everything else the record's `account_id` is used.

    Returns:
        True if:
        - User is a Super User or System (Bypass).
        - The record's owner id matches the user's `account_id`.
        - The record exposes no owner id (key missing or None), or is not a mapping.
        False if the caller has no `account_id`, or the owner id differs.
    """
    # Function-scope import, in line with the other local imports in this module.
    from collections.abc import Mapping

    if account.super or account.auth_method == 'bypass':
        return True
    if not account.account_id:
        return False

    res_account_id = None
    # Any Mapping is inspected, not only `dict`: a mapping-like row object that is
    # not a dict subclass would otherwise skip the ownership comparison entirely.
    if isinstance(sql_result, Mapping):
        owner_key = 'id' if obj_name == 'account' else 'account_id'
        res_account_id = sql_result.get(owner_key)

    # A record without an owner id is treated as accessible.
    # NOTE(review): presumably this covers global/shared objects - confirm.
    if res_account_id is not None and res_account_id != account.account_id:
        return False
    return True
def apply_forced_account_filter(and_qry_dict: Optional[Dict], account: AccountContext, model: Any, obj_name: str) -> Dict:
    """
    Secure Search Filtering.

    Adds an account restriction to an AND-filter so that non-privileged users only
    retrieve records associated with their own account.

    Args:
        and_qry_dict: Caller-supplied AND filter (may be None). It is never modified.
        account: Caller context; `super`, `auth_method` and `account_id` are read.
        model: Model class of the target object; its `__fields__` decides whether an
            `account_id` filter applies.
        obj_name: Object type name. For 'account' the filter is placed on `id`.

    Returns:
        The filter to use. Super users and 'bypass' callers get the supplied filter
        (or `{}`) back unchanged. Other callers get a new dict in which `id` (for
        'account') or `account_id` (when the model declares that field) is forced
        to the caller's `account_id`, overriding any value the client supplied.
        When neither case applies the supplied filter is returned as is.
    """
    supplied = and_qry_dict or {}
    if account.super or account.auth_method == 'bypass':
        return supplied

    if obj_name == 'account':
        forced_key = 'id'
    elif model and hasattr(model, '__fields__') and 'account_id' in model.__fields__:
        forced_key = 'account_id'
    else:
        return supplied

    # Build a copy instead of assigning into the caller's dict, so the argument
    # (which comes from the parsed request payload) is left untouched.
    return {**supplied, forced_key: account.account_id}
def filter_order_by(order_by_li: Any, model: Any, table_name: Optional[str] = None) -> Optional[Dict[str, str]]:
    """
    Sanitize Sorting Parameters.

    Keeps only the sort columns that exist as a field name or alias on the model
    and, when `table_name` is given, that can also be selected from that table.

    Args:
        order_by_li: Parsed sort specification, expected to be a dict keyed by column.
        model: Model class whose `__fields__` (names and aliases) form the whitelist.
        table_name: Optional table to probe each remaining column against.

    Returns:
        A new dict containing only the accepted columns (possibly empty). The input
        is returned unchanged when it is empty, not a dict, or when no model with
        `__fields__` is available.
    """
    # NOTE(review): non-dict input (e.g. a JSON list) is passed through unfiltered,
    # and sort direction values are never validated here - confirm that the query
    # builder handles both safely.
    if not order_by_li or not isinstance(order_by_li, dict) or not model:
        return order_by_li
    if not hasattr(model, '__fields__'):
        return order_by_li

    model_fields = set(model.__fields__.keys())
    model_fields.update({f.alias for f in model.__fields__.values() if f.alias})
    filtered = {k: v for k, v in order_by_li.items() if k in model_fields}

    if table_name and filtered:
        from app.db_sql import db
        from sqlalchemy import text
        final_filtered = {}
        for column, direction in filtered.items():
            try:
                # Lightweight check to see if column exists in SQL. `column` has
                # already been restricted to model field names/aliases above.
                db.execute(text(f"SELECT `{column}` FROM `{table_name}` LIMIT 0"))
            except Exception as exc:
                # Best effort: the column is dropped from the sort, but the reason
                # is logged instead of being swallowed silently.
                log.debug("Dropping order_by column %r for table %r: %s", column, table_name, exc)
                continue
            final_filtered[column] = direction
        filtered = final_filtered
    return filtered
def get_supported_filters(model: Any, status_filter: StatusFilterParams) -> StatusFilterParams:
    """
    Adaptive Status Filtering.

    Produces the enabled/hidden filter to use for a given model. When the model has
    no `enable` field the `enabled` filter becomes 'all'; when it has no `hide`
    field the `hidden` filter becomes 'all'. Without a model exposing `__fields__`
    the incoming `status_filter` object is returned untouched.
    """
    if not (model and hasattr(model, "__fields__")):
        return status_filter

    # A fresh instance is filled in so the injected dependency object is not altered.
    from app.routers.dependencies_v3 import StatusFilterParams as SF
    clone = SF()
    clone.enabled = status_filter.enabled
    clone.hidden = status_filter.hidden

    # (model column, filter attribute) pairs: a missing column disables that filter.
    for column, filter_attr in (('enable', 'enabled'), ('hide', 'hidden')):
        if column not in model.__fields__:
            setattr(clone, filter_attr, 'all')
    return clone
def safe_json_loads(json_str: Optional[str]) -> Any:
    """
    Parse a JSON string without raising on bad input.

    Args:
        json_str: Raw JSON text, typically taken from a query parameter.

    Returns:
        The decoded value, or None when the input is empty, the literal string
        'undefined', or cannot be decoded.
    """
    if not json_str or json_str == 'undefined':
        return None
    try:
        return json.loads(json_str)
    except (TypeError, ValueError, RecursionError):
        # ValueError covers json.JSONDecodeError; TypeError covers non-string input;
        # RecursionError covers pathologically nested documents. Unlike a bare
        # `except:`, this lets KeyboardInterrupt/SystemExit propagate.
        return None
# --- Routes ---
@router.get("/health", response_model=Resp_Body_Base)
async def health_check(delay: DelayParams = Depends()):
    """
    Health Check Endpoint.

    Optionally waits `delay.sleep_time_s` seconds, then answers with a fixed
    "healthy" status payload.
    """
    pause_s = delay.sleep_time_s
    if pause_s > 0:
        await asyncio.sleep(pause_s)
    healthy_payload = {"status": "V3 API is healthy!"}
    return mk_resp(data=healthy_payload)
@router.get("/{obj_type}/schema", response_model=Resp_Body_Base, tags=['CRUD v3 Schema (Dev)'])
async def get_obj_schema(
    response: Response,
    obj_type: str = Path(min_length=2, max_length=50),
    view: str = Query('default'),
    variant: str = Query('base'),
    account: AccountContext = Depends(get_account_context),
):
    """
    Dynamic Schema Introspection.

    Reports the structure of an object type in two parts:
    - "database": the rows returned by `DESCRIBE` on the configured table
      (field, type, nullable, key, default, extra), or an "error" string when the
      statement fails.
    - "model": the model name plus, per field, alias, type, required flag, default
      and (when present) description.

    `view` selects the `tbl_<view>` table entry and `variant` the `mdl_<variant>`
    model entry of the object configuration, each falling back to the default
    entries. Responds 400 for an unknown object type and 500 when no table is
    configured.
    """
    from app.db_sql import db
    from sqlalchemy import text

    if obj_type not in obj_type_kv_li:
        return mk_resp(
            data=False,
            status_code=400,
            response=response,
            status_message=f"Object type '{obj_type}' not found.",
        )

    cfg = obj_type_kv_li[obj_type]
    default_table = cfg.get('tbl_default', cfg.get('tbl'))
    table_name = cfg.get(f'tbl_{view}', default_table)

    if variant == 'base':
        model_key = 'mdl'
    else:
        model_key = f'mdl_{variant}'
    default_model = cfg.get('mdl_default', cfg.get('mdl'))
    model = cfg.get(model_key, default_model)

    if not table_name:
        return mk_resp(
            data=False,
            status_code=500,
            response=response,
            status_message=f"Table configuration for '{obj_type}' is missing.",
        )

    model_label = model.__name__ if hasattr(model, '__name__') else str(model)
    columns = []
    database_section = {"table_name": table_name, "columns": columns}
    model_fields = {}
    schema_info = {
        "object_type": obj_type,
        "view": view,
        "variant": variant,
        "database": database_section,
        "model": {"name": model_label, "fields": model_fields},
    }

    # Database half: a failure is reported inside the payload rather than raised,
    # so the model half is still returned.
    try:
        described = db.execute(text(f"DESCRIBE `{table_name}`"))
        for row in described.fetchall():
            columns.append({
                "field": row[0],
                "type": row[1],
                "nullable": row[2] == 'YES',
                "key": row[3],
                "default": row[4],
                "extra": row[5],
            })
    except Exception as e:
        database_section["error"] = str(e)

    # Model half: only available when the configured model exposes `__fields__`.
    if model and hasattr(model, "__fields__"):
        for field_name, field in model.__fields__.items():
            entry = {
                "alias": field.alias,
                "type": str(field.outer_type_),
                "required": field.required,
                "default": field.default,
            }
            description = field.field_info.description
            if description:
                entry["description"] = description
            model_fields[field_name] = entry

    return mk_resp(data=schema_info, response=response)
@router.get('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def get_obj(
    response: Response,
    obj_type_l1: str = Path(min_length=2, max_length=50),
    obj_id: str = Path(min_length=11, max_length=22),
    view: str = Query('default'),
    account: AccountContext = Depends(get_account_context),
    serialization: SerializationParams = Depends(),
    delay: DelayParams = Depends(),
):
    """
    Retrieve a Single Object.

    Steps, each ending the request early on failure:
    1. Unknown object type -> 400; missing table/model configuration -> 500.
    2. Resolve the public `obj_id` through `redis_lookup_id_random` -> 404 if unresolved.
    3. Fetch the row with `sql_select` -> 404 if nothing is returned.
    4. Apply `check_account_access` -> 403 if denied.
    5. Serialize the row through the configured model using the `serialization` options.
    """
    from app.db_sql import redis_lookup_id_random, sql_select

    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    obj_name = obj_type_l1
    if obj_name not in obj_type_kv_li:
        return mk_resp(
            data=False,
            status_code=400,
            response=response,
            status_message=f"Object type '{obj_name}' not found.",
        )

    cfg = obj_type_kv_li[obj_name]
    default_table = cfg.get('tbl_default', cfg.get('tbl'))
    table_name = cfg.get(f'tbl_{view}', default_table)
    default_model = cfg.get('mdl_default', cfg.get('mdl'))
    model_cls = cfg.get(f'mdl_{view}', default_model)
    if not (table_name and model_cls):
        return mk_resp(data=False, status_code=500, response=response, status_message="Configuration error.")

    record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
    if not record_id:
        return mk_resp(
            data=False,
            status_code=404,
            response=response,
            status_message=f"Object with ID '{obj_id}' not found.",
        )

    row = sql_select(table_name=table_name, record_id=record_id)
    if not row:
        return mk_resp(
            data=False,
            status_code=404,
            response=response,
            status_message=f"Object with ID '{obj_id}' not found in database.",
        )

    if not check_account_access(row, account, obj_name):
        return mk_resp(data=False, status_code=403, response=response, status_message="Access denied.")

    instance = model_cls(**row)
    resp_data = instance.dict(
        by_alias=serialization.by_alias,
        exclude_unset=serialization.exclude_unset,
        exclude_defaults=serialization.exclude_defaults,
        exclude_none=serialization.exclude_none,
    )
    return mk_resp(data=resp_data, response=response)
@router.get('/{obj_type_l1}/', response_model=Resp_Body_Base)
async def get_obj_li(
    response: Response,
    obj_type_l1: str,
    for_obj_type: Optional[str] = None,
    for_obj_id: Optional[str] = None,
    view: str = Query('default'),
    order_by_li: Optional[str] = None,
    jp: Optional[Union[str, None]] = None,
    account: AccountContext = Depends(get_account_context),
    pagination: PaginationParams = Depends(),
    status_filter: StatusFilterParams = Depends(),
    serialization: SerializationParams = Depends(),
    delay: DelayParams = Depends(),
):
    """
    List Objects (Pagination & Filtering).

    Supports:
    - Standard filtering (enabled/hidden).
    - Advanced filtering via JSON Payload (`jp`) param (Search, Fulltext, AND/OR queries).
    - Sorting (`order_by_li`).
    - Parent-Child filtering (`for_obj_type`, `for_obj_id`).
    - Account Isolation (automatically enforced).

    Args:
        obj_type_l1: Object type to list; must be a key of `obj_type_kv_li`.
        for_obj_type / for_obj_id: Optional parent type and public parent id. Both
            must be given for the parent filter to apply; the filter column is
            `<for_obj_type>_id`.
        view: Selects the `tbl_<view>` / `mdl_<view>` configuration entries.
        order_by_li: JSON-encoded sort specification.
        jp: URL-encoded JSON object with any of the keys `qry`, `ft_qry`,
            `and_qry`, `and_like`, `or_like`, `and_in_li`. Anything that does not
            decode to a JSON object is ignored.

    Returns:
        A `mk_resp` response holding the serialized records (an empty list when
        nothing matches), or an error response: 400 unknown object/parent type,
        403 access restrictions, 404 unknown parent id, 500 configuration error.
    """
    from app.db_sql import redis_lookup_id_random, sql_select
    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    jp_obj = safe_json_loads(urllib.parse.unquote(jp)) if jp else None
    # Only a JSON object can carry filters. A list/string/number payload used to
    # crash on `.get`; it is now ignored exactly like undecodable JSON.
    if not isinstance(jp_obj, dict):
        jp_obj = {}
    # Falsy entries (missing, empty, null) are normalized to None.
    qry_dict_li = jp_obj.get('qry') or None
    fulltext_qry_dict_obj = jp_obj.get('ft_qry') or None
    and_qry_dict_obj = jp_obj.get('and_qry') or None
    and_like_dict_obj = jp_obj.get('and_like') or None
    or_like_dict_obj = jp_obj.get('or_like') or None
    and_in_dict_li_obj = jp_obj.get('and_in_li') or None

    order_by_li = safe_json_loads(order_by_li)

    obj_name = obj_type_l1
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")

    if obj_name == 'site' and not (for_obj_type == 'account' and for_obj_id):
        return mk_resp(data=False, status_code=403, response=response, status_message="Listing sites is only permitted when filtered by account.")

    if for_obj_type == 'account' and for_obj_id:
        if not account.super and for_obj_id != account.account_id_random:
            return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to requested account.")

    obj_cfg = obj_type_kv_li[obj_name]
    table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl')))
    base_name = obj_cfg.get(f'mdl_{view}', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
    if not table_name or not base_name:
        return mk_resp(data=False, status_code=500, response=response, status_message="Configuration error.")

    order_by_li = filter_order_by(order_by_li, base_name, table_name)
    status_filter = get_supported_filters(base_name, status_filter)
    and_qry_dict_obj = apply_forced_account_filter(and_qry_dict_obj, account, base_name, obj_name)

    # Arguments shared by the filtered and unfiltered query.
    select_kwargs = dict(
        table_name=table_name,
        enabled=status_filter.enabled,
        hidden=status_filter.hidden,
        qry_dict_li=qry_dict_li,
        fulltext_qry_dict=fulltext_qry_dict_obj,
        and_qry_dict=and_qry_dict_obj,
        and_like_dict=and_like_dict_obj,
        or_like_dict=or_like_dict_obj,
        and_in_dict_li=and_in_dict_li_obj,
        order_by_li=order_by_li,
        limit=pagination.limit,
        offset=pagination.offset,
        as_list=True,
    )

    if for_obj_type and for_obj_id:
        # `for_obj_type` is client input that ends up in a column name, so it must
        # be a registered object type - the same rule the nested child-list route
        # applies to its parent type.
        if for_obj_type not in obj_type_kv_li:
            return mk_resp(data=False, status_code=400, response=response, status_message=f"Parent object type '{for_obj_type}' not found.")
        resolved_for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
        if not resolved_for_obj_id:
            return mk_resp(data=False, status_code=404, response=response, status_message=f"Parent object with ID '{for_obj_id}' not found.")
        select_kwargs['field_name'] = f'{for_obj_type}_id'
        select_kwargs['field_value'] = resolved_for_obj_id

    sql_result = sql_select(**select_kwargs)

    if sql_result:
        resp_data_li = [
            base_name(**record).dict(
                by_alias=serialization.by_alias,
                exclude_unset=serialization.exclude_unset,
                exclude_defaults=serialization.exclude_defaults,
                exclude_none=serialization.exclude_none,
            )
            for record in sql_result
        ]
        return mk_resp(data=resp_data_li, response=response)
    else:
        return mk_resp(data=[], status_code=200, response=response)
@router.post('/{obj_type_l1}/search', response_model=Resp_Body_Base, tags=['CRUD v3 Search (Dev)'])
async def search_obj_li(
    response: Response,
    obj_type_l1: str,
    search_query: SearchQuery,
    for_obj_type: Optional[str] = Query(None),
    for_obj_id: Optional[str] = Query(None),
    view: str = Query('default'),
    order_by_li: Optional[str] = Query(None),
    account: AccountContext = Depends(get_account_context_optional),
    pagination: PaginationParams = Depends(),
    status_filter: StatusFilterParams = Depends(),
    serialization: SerializationParams = Depends(),
    delay: DelayParams = Depends(),
):
    """
    Search Objects (POST).

    Advanced search endpoint using a `SearchQuery` body.
    - Security: callers whose `auth_method` is 'guest' may only search `site_domain`.
    - Isolation: for non-super, non-bypass callers with an `account_id`, an `eq`
      filter on `id` (for 'account') or `account_id` (when the model declares it)
      is appended to `search_query.and_filters`.
    - Parent filter: when both `for_obj_type` and `for_obj_id` are given, results
      are limited to rows whose `<for_obj_type>_id` equals the resolved parent id.

    Returns:
        A `mk_resp` response holding the serialized records (an empty list when
        nothing matches), or an error response: 400 unknown object/parent type,
        403 access restrictions, 404 unknown parent id, 500 configuration error.
    """
    from app.db_sql import redis_lookup_id_random, sql_select
    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    order_by_li = safe_json_loads(order_by_li)

    obj_name = obj_type_l1
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")

    is_guest = (account.auth_method == 'guest')
    is_site_domain_lookup = (obj_name == 'site_domain')
    if is_guest and not is_site_domain_lookup:
        return mk_resp(data=False, status_code=403, response=response, status_message="Authentication required for this search.")

    if obj_name == 'site' and not (for_obj_type == 'account' and for_obj_id):
        return mk_resp(data=False, status_code=403, response=response, status_message="Listing sites is only permitted when filtered by account.")

    obj_cfg = obj_type_kv_li[obj_name]
    table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl')))
    base_name = obj_cfg.get(f'mdl_{view}', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
    if not table_name or not base_name:
        return mk_resp(data=False, status_code=500, response=response, status_message="Configuration error.")

    order_by_li = filter_order_by(order_by_li, base_name, table_name)
    status_filter = get_supported_filters(base_name, status_filter)
    searchable_fields = obj_cfg.get('searchable_fields')

    if for_obj_type == 'account' and for_obj_id:
        if not account.super and for_obj_id != account.account_id_random:
            return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to requested account.")

    # NOTE(review): a non-guest caller without an `account_id` gets no account
    # filter here, whereas check_account_access denies such callers - confirm that
    # this combination cannot occur.
    if not account.super and account.auth_method != 'bypass' and account.account_id:
        if search_query.and_filters is None:
            search_query.and_filters = []
        if obj_name == 'account':
            search_query.and_filters.append(SearchFilter(field='id', op='eq', value=account.account_id))
        elif base_name and hasattr(base_name, '__fields__') and 'account_id' in base_name.__fields__:
            search_query.and_filters.append(SearchFilter(field='account_id', op='eq', value=account.account_id))

    # Arguments shared by the filtered and unfiltered query.
    select_kwargs = dict(
        table_name=table_name,
        enabled=status_filter.enabled,
        hidden=status_filter.hidden,
        search_query=search_query,
        searchable_fields=searchable_fields,
        order_by_li=order_by_li,
        limit=pagination.limit,
        offset=pagination.offset,
        as_list=True,
    )

    if for_obj_type and for_obj_id:
        # `for_obj_type` is client input that ends up in a column name, so it must
        # be a registered object type - the same rule the nested child-list route
        # applies to its parent type.
        if for_obj_type not in obj_type_kv_li:
            return mk_resp(data=False, status_code=400, response=response, status_message=f"Parent object type '{for_obj_type}' not found.")
        resolved_for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
        if not resolved_for_obj_id:
            return mk_resp(data=False, status_code=404, response=response, status_message=f"Parent object with ID '{for_obj_id}' not found.")
        select_kwargs['field_name'] = f'{for_obj_type}_id'
        select_kwargs['field_value'] = resolved_for_obj_id

    sql_result = sql_select(**select_kwargs)

    if sql_result:
        resp_data_li = [
            base_name(**record).dict(
                by_alias=serialization.by_alias,
                exclude_unset=serialization.exclude_unset,
                exclude_defaults=serialization.exclude_defaults,
                exclude_none=serialization.exclude_none,
            )
            for record in sql_result
        ]
        return mk_resp(data=resp_data_li, response=response)
    else:
        return mk_resp(data=[], status_code=200, response=response)
@router.post('/{obj_type_l1}/', response_model=Resp_Body_Base)
async def post_obj(
    request: Request,
    response: Response,
    obj_type_l1: str = Path(min_length=2, max_length=50),
    return_obj: Optional[bool] = True,
    account: AccountContext = Depends(get_account_context),
    serialization: SerializationParams = Depends(),
    delay: DelayParams = Depends(),
):
    """
    Create Object.

    1. Reads the JSON body; it must be a JSON object (otherwise 400).
    2. Injects `account_id` for ownership when the caller is not super/bypass and
       the input model declares that field; such callers cannot create 'account'
       objects whose model lacks it (403).
    3. Validates input against the Pydantic model (`mdl_in`, falling back to `mdl`).
    4. **Sanitizes Payload**: Removes virtual lookup fields (`*_id_random`) and
       view-only fields (`fields_to_exclude_from_db`) to prevent "unknown column"
       errors during insertion.
    5. Inserts the row and returns the created object, or just its ids when
       `return_obj` is false or the re-read returns nothing.

    Returns:
        A `mk_resp` response: the serialized object or `{"obj_id", "obj_id_random"}`
        on success; 400 for an unknown type, bad body, validation error or failed
        insert; 403 for restricted account creation; 500 for a configuration error.
    """
    from app.db_sql import sql_insert, get_id_random, sql_select
    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    # A malformed body is a client error, not a server error.
    try:
        obj_data = await request.json()
    except ValueError:  # json.JSONDecodeError and UnicodeDecodeError are ValueErrors
        return mk_resp(data=False, status_code=400, response=response, status_message="Request body is not valid JSON.")
    # Everything below treats the payload as a field mapping.
    if not isinstance(obj_data, dict):
        return mk_resp(data=False, status_code=400, response=response, status_message="Request body must be a JSON object.")

    obj_name = obj_type_l1
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")

    obj_cfg = obj_type_kv_li[obj_name]
    table_name_insert = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
    table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
    input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
    output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
    if not table_name_insert or not input_model or not table_name_select or not output_model:
        return mk_resp(data=False, status_code=500, response=response, status_message="Configuration error.")

    if not account.super and account.auth_method != 'bypass' and account.account_id:
        if 'account_id' in input_model.__fields__:
            # Ownership always comes from the caller's context, never from the payload.
            obj_data['account_id'] = account.account_id
        elif obj_name == 'account':
            return mk_resp(data=False, status_code=403, response=response, status_message="Account creation is restricted.")

    try:
        validated_obj = input_model(**obj_data)
    except Exception as e:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Validation error: {e}")

    data_to_insert = validated_obj.dict(exclude_unset=True)

    # Filter out virtual _id_random fields (e.g., account_id_random) that are not in the DB table
    virtual_keys = [k for k in data_to_insert if k.endswith('_id_random') and k != 'id_random']
    for k in virtual_keys:
        del data_to_insert[k]

    # Filter out model-specific excluded fields (e.g., view-only fields like person_full_name in Journal)
    if hasattr(input_model, 'fields_to_exclude_from_db'):
        for k in input_model.fields_to_exclude_from_db:
            data_to_insert.pop(k, None)

    if sql_insert_result := sql_insert(data=data_to_insert, table_name=table_name_insert):
        new_obj_id = sql_insert_result
        new_obj_id_random = get_id_random(record_id=new_obj_id, table_name=obj_name)
        if return_obj:
            if sql_select_result := sql_select(table_name=table_name_select, record_id=new_obj_id):
                resp_data = output_model(**sql_select_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
                return mk_resp(data=resp_data, response=response)
        # Either the caller did not ask for the object or the re-read came back empty.
        return mk_resp(data={"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}, response=response)
    else:
        return mk_resp(data=False, status_code=400, response=response, status_message="Failed to create object.")
@router.patch('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def patch_obj(
    request: Request,
    response: Response,
    obj_type_l1: str = Path(min_length=2, max_length=50),
    obj_id: str = Path(min_length=11, max_length=22),
    return_obj: Optional[bool] = True,
    account: AccountContext = Depends(get_account_context),
    serialization: SerializationParams = Depends(),
    delay: DelayParams = Depends(),
):
    """
    Update Object (Partial).

    1. Reads the JSON body; it must be a JSON object (otherwise 400).
    2. Resolves ID and checks access permissions.
    3. Drops `account_id` from the payload for callers that are not super/bypass,
       so an update cannot reassign a record to another account.
    4. **Sanitizes Payload**: Removes virtual lookup fields and view-only fields.
    5. Performs SQL UPDATE and returns the updated object, or `True` when
       `return_obj` is false or the re-read returns nothing.

    Returns:
        A `mk_resp` response: the serialized object or `True` on success; 400 for
        an unknown type, bad body or failed update; 403 when access is denied;
        404 when the object cannot be found; 500 for a configuration error.
    """
    from app.db_sql import redis_lookup_id_random, sql_select, sql_update
    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    # A malformed body is a client error, not a server error.
    try:
        obj_data = await request.json()
    except ValueError:  # json.JSONDecodeError and UnicodeDecodeError are ValueErrors
        return mk_resp(data=False, status_code=400, response=response, status_message="Request body is not valid JSON.")
    # Everything below treats the payload as a field mapping.
    if not isinstance(obj_data, dict):
        return mk_resp(data=False, status_code=400, response=response, status_message="Request body must be a JSON object.")

    obj_name = obj_type_l1
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")

    obj_cfg = obj_type_kv_li[obj_name]
    table_name_update = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
    table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
    input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
    output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
    if not table_name_update or not input_model or not table_name_select or not output_model:
        return mk_resp(data=False, status_code=500, response=response, status_message="Configuration error.")

    record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
    if not record_id:
        return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found.")

    if existing_obj := sql_select(table_name=table_name_select, record_id=record_id):
        if not check_account_access(existing_obj, account, obj_name):
            return mk_resp(data=False, status_code=403, response=response, status_message="Access denied.")
    else:
        return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found in database.")

    # Ownership is not client-editable: creation forces `account_id` from the
    # caller's context, so an update must not be able to move the record elsewhere.
    if not account.super and account.auth_method != 'bypass':
        obj_data.pop('account_id', None)

    # Filter out virtual _id_random fields (e.g., account_id_random) that are not in the DB table
    virtual_keys = [k for k in obj_data if k.endswith('_id_random') and k != 'id_random']
    for k in virtual_keys:
        del obj_data[k]

    # Filter out model-specific excluded fields (e.g., view-only fields like person_full_name in Journal)
    if hasattr(input_model, 'fields_to_exclude_from_db'):
        for k in input_model.fields_to_exclude_from_db:
            obj_data.pop(k, None)

    if sql_update(data=obj_data, table_name=table_name_update, record_id=record_id):
        if return_obj:
            if sql_select_result := sql_select(table_name=table_name_select, record_id=record_id):
                resp_data = output_model(**sql_select_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
                return mk_resp(data=resp_data, response=response)
        # Either the caller did not ask for the object or the re-read came back empty.
        return mk_resp(data=True, response=response, status_message="Object updated successfully.")
    else:
        return mk_resp(data=False, status_code=400, response=response, status_message="Failed to update object.")
@router.delete('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def delete_obj(
    response: Response,
    obj_type_l1: str = Path(min_length=2, max_length=50),
    obj_id: str = Path(min_length=11, max_length=22),
    method: str = Query('delete', regex='^(delete|hide|disable)$'),
    account: AccountContext = Depends(get_account_context),
    delay: DelayParams = Depends(),
):
    """
    Delete Object.

    The `method` query parameter picks the strategy:
    - 'hide'    -> soft delete: updates the row with `hide = True`.
    - 'disable' -> soft delete: updates the row with `enable = False`.
    - 'delete'  -> hard delete through `sql_delete`.

    The object is resolved and access-checked first (400 unknown type, 500 missing
    table configuration, 404 not found, 403 access denied). A falsy result from the
    update/delete call yields 400.
    """
    from app.db_sql import redis_lookup_id_random, sql_select, sql_update, sql_delete

    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    obj_name = obj_type_l1
    if obj_name not in obj_type_kv_li:
        return mk_resp(
            data=False,
            status_code=400,
            response=response,
            status_message=f"Object type '{obj_name}' not found.",
        )

    cfg = obj_type_kv_li[obj_name]
    base_table = cfg.get('tbl')
    table_name_delete = cfg.get('tbl_update', base_table)
    table_name_select = cfg.get('tbl_default', base_table)
    if not table_name_delete:
        return mk_resp(data=False, status_code=500, response=response, status_message="Configuration error.")

    record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
    if not record_id:
        return mk_resp(
            data=False,
            status_code=404,
            response=response,
            status_message=f"Object with ID '{obj_id}' not found.",
        )

    existing_obj = sql_select(table_name=table_name_select, record_id=record_id)
    if not existing_obj:
        return mk_resp(
            data=False,
            status_code=404,
            response=response,
            status_message=f"Object with ID '{obj_id}' not found in database.",
        )
    if not check_account_access(existing_obj, account, obj_name):
        return mk_resp(data=False, status_code=403, response=response, status_message="Access denied.")

    # Soft-delete methods map to the column update they perform; any other
    # method (i.e. 'delete') removes the row.
    soft_delete_updates = {
        'hide': {'hide': True},
        'disable': {'enable': False},
    }
    if method in soft_delete_updates:
        success = sql_update(table_name=table_name_delete, record_id=record_id, data=soft_delete_updates[method])
    else:
        success = sql_delete(table_name=table_name_delete, record_id=record_id)

    if not success:
        return mk_resp(data=False, status_code=400, response=response, status_message="Failed to delete object.")
    return mk_resp(data=True, response=response, status_message="Object deleted successfully.")
@router.get('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/', response_model=Resp_Body_Base)
async def get_child_obj_li(
    response: Response,
    parent_obj_type: str,
    parent_obj_id: str,
    child_obj_type: str,
    order_by_li: Optional[str] = None,
    jp: Optional[Union[str, None]] = None,
    account: AccountContext = Depends(get_account_context),
    pagination: PaginationParams = Depends(),
    status_filter: StatusFilterParams = Depends(),
    serialization: SerializationParams = Depends(),
    delay: DelayParams = Depends(),
):
    """
    List Child Objects (One-to-Many).

    Retrieves a list of child objects associated with a specific parent.
    1. Verifies parent existence and user access to the parent.
    2. Filters children where `{parent_obj_type}_id` matches the parent's ID.

    Args:
        parent_obj_type / child_obj_type: Both must be keys of `obj_type_kv_li`.
        parent_obj_id: Public id of the parent.
        order_by_li: JSON-encoded sort specification.
        jp: URL-encoded JSON object with any of the keys `qry`, `ft_qry`,
            `and_qry`, `and_like`, `or_like`, `and_in_li`. Anything that does not
            decode to a JSON object is ignored.

    Returns:
        A `mk_resp` response holding the serialized children (an empty list when
        nothing matches), or an error response: 400 invalid object type(s),
        403 access denied to parent, 404 parent not found, 500 configuration error.
    """
    from app.db_sql import redis_lookup_id_random, sql_select
    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    jp_obj = safe_json_loads(urllib.parse.unquote(jp)) if jp else None
    # Only a JSON object can carry filters. A list/string/number payload used to
    # crash on `.get`; it is now ignored exactly like undecodable JSON.
    if not isinstance(jp_obj, dict):
        jp_obj = {}
    # Falsy entries (missing, empty, null) are normalized to None.
    qry_dict_li = jp_obj.get('qry') or None
    fulltext_qry_dict_obj = jp_obj.get('ft_qry') or None
    and_qry_dict_obj = jp_obj.get('and_qry') or None
    and_like_dict_obj = jp_obj.get('and_like') or None
    or_like_dict_obj = jp_obj.get('or_like') or None
    and_in_dict_li_obj = jp_obj.get('and_in_li') or None

    order_by_li = safe_json_loads(order_by_li)

    obj_name = child_obj_type
    if obj_name not in obj_type_kv_li or parent_obj_type not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message="Invalid object type(s).")

    obj_cfg = obj_type_kv_li[obj_name]
    table_name = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
    base_name = obj_cfg.get('mdl_default', obj_cfg.get('mdl'))
    if not table_name or not base_name:
        return mk_resp(data=False, status_code=500, response=response, status_message="Configuration error.")

    order_by_li = filter_order_by(order_by_li, base_name, table_name)
    status_filter = get_supported_filters(base_name, status_filter)

    resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
    if not resolved_parent_id:
        return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.")

    # The parent row itself is fetched so that access to it can be checked before
    # any of its children are listed.
    parent_cfg = obj_type_kv_li[parent_obj_type]
    parent_table = parent_cfg.get('tbl_default', parent_cfg.get('tbl'))
    if parent_sql_res := sql_select(table_name=parent_table, record_id=resolved_parent_id):
        if not check_account_access(parent_sql_res, account, parent_obj_type):
            return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to parent.")
    else:
        return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.")

    and_qry_dict_obj = apply_forced_account_filter(and_qry_dict_obj, account, base_name, obj_name)

    sql_result = sql_select(
        table_name=table_name,
        field_name=f'{parent_obj_type}_id',
        field_value=resolved_parent_id,
        enabled=status_filter.enabled,
        hidden=status_filter.hidden,
        qry_dict_li=qry_dict_li,
        fulltext_qry_dict=fulltext_qry_dict_obj,
        and_qry_dict=and_qry_dict_obj,
        and_like_dict=and_like_dict_obj,
        or_like_dict=or_like_dict_obj,
        and_in_dict_li=and_in_dict_li_obj,
        order_by_li=order_by_li,
        limit=pagination.limit,
        offset=pagination.offset,
        as_list=True,
    )

    if sql_result:
        resp_data_li = [
            base_name(**record).dict(
                by_alias=serialization.by_alias,
                exclude_unset=serialization.exclude_unset,
                exclude_defaults=serialization.exclude_defaults,
                exclude_none=serialization.exclude_none,
            )
            for record in sql_result
        ]
        return mk_resp(data=resp_data_li, response=response)
    else:
        return mk_resp(data=[], status_code=200, response=response)
@router.post('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/', response_model=Resp_Body_Base)
async def post_child_obj(
    request: Request,
    response: Response,
    parent_obj_type: str = Path(min_length=2, max_length=50),
    parent_obj_id: str = Path(min_length=11, max_length=22),
    child_obj_type: str = Path(min_length=2, max_length=50),
    return_obj: Optional[bool] = True,
    account: AccountContext = Depends(get_account_context),
    serialization: SerializationParams = Depends(),
    delay: DelayParams = Depends(),
):
    """
    Create Child Object.
    1. Parses the JSON body (must be a JSON object; otherwise 400).
    2. Verifies Parent existence and access (404 / 403).
    3. Automatically links the new child to the parent (`{parent_obj_type}_id` = parent_id).
    4. For non-superuser, non-bypass accounts, forces `account_id` to the caller's account
       when the input model declares that field.
    5. Validates the payload with the child's input model and inserts it.

    Returns the freshly selected child (when `return_obj` is truthy and the re-select
    succeeds) or a dict with `obj_id` / `obj_id_random` otherwise.
    """
    from app.db_sql import redis_lookup_id_random, sql_select, sql_insert, get_id_random

    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    # Malformed JSON raises a ValueError subclass; report it as a client error
    # instead of letting it surface as a 500.
    try:
        obj_data = await request.json()
    except ValueError:
        return mk_resp(data=False, status_code=400, response=response, status_message="Invalid JSON body.")
    # The payload is mutated by key below and splatted into the model, so it must be a dict.
    if not isinstance(obj_data, dict):
        return mk_resp(data=False, status_code=400, response=response, status_message="Request body must be a JSON object.")

    if parent_obj_type not in obj_type_kv_li or child_obj_type not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message="Invalid object type.")

    # Resolve the public id_random of the parent to its internal id.
    resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
    if not resolved_parent_id:
        return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.")

    # The parent must exist and be accessible to the caller's account.
    parent_cfg = obj_type_kv_li[parent_obj_type]
    parent_table = parent_cfg.get('tbl_default', parent_cfg.get('tbl'))
    parent_sql_res = sql_select(table_name=parent_table, record_id=resolved_parent_id)
    if not parent_sql_res:
        return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.")
    if not check_account_access(parent_sql_res, account, parent_obj_type):
        return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to parent.")

    # Writes go to the update table, reads to the default table; both fall back to 'tbl'.
    obj_cfg = obj_type_kv_li[child_obj_type]
    table_name_insert = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
    table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
    input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
    output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))

    # Tenant pinning: a regular account cannot create records for another account.
    if not account.super and account.auth_method != 'bypass' and account.account_id:
        if 'account_id' in input_model.__fields__:
            obj_data['account_id'] = account.account_id

    # Link the child to the parent; overrides any client-supplied value.
    obj_data[f'{parent_obj_type}_id'] = resolved_parent_id

    try:
        validated_obj = input_model(**obj_data)
    except Exception as e:
        # Any model construction failure is reported to the client as a 400.
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Validation error: {e}")

    data_to_insert = validated_obj.dict(exclude_unset=True)
    new_obj_id = sql_insert(data=data_to_insert, table_name=table_name_insert)
    if not new_obj_id:
        return mk_resp(data=False, status_code=400, response=response, status_message="Failed to create child object.")

    new_obj_id_random = get_id_random(record_id=new_obj_id, table_name=child_obj_type)
    if return_obj:
        if sql_select_result := sql_select(table_name=table_name_select, record_id=new_obj_id):
            resp_data = output_model(**sql_select_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
            return mk_resp(data=resp_data, response=response)
    # Either the caller did not ask for the object or the re-select returned nothing.
    return mk_resp(data={"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}, response=response)
@router.get('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
async def get_child_obj(
    response: Response,
    parent_obj_type: str = Path(min_length=2, max_length=50),
    parent_obj_id: str = Path(min_length=11, max_length=22),
    child_obj_type: str = Path(min_length=2, max_length=50),
    child_obj_id: str = Path(min_length=11, max_length=22),
    account: AccountContext = Depends(get_account_context),
    serialization: SerializationParams = Depends(),
    delay: DelayParams = Depends(),
):
    """
    Retrieve Child Object.
    1. Rejects unknown parent/child object types (400).
    2. Resolves both public ids; 404 if either does not resolve.
    3. Verifies Parent existence and account access (404 / 403).
    4. Verifies that the child belongs to the specified parent
       (`{parent_obj_type}_id` on the child equals the resolved parent id).
    """
    from app.db_sql import redis_lookup_id_random, sql_select

    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    # Validate the types up front so an unknown type is a 400 rather than a KeyError below.
    if parent_obj_type not in obj_type_kv_li or child_obj_type not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message="Invalid object type.")

    resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
    resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_obj_type)
    if not resolved_parent_id or not resolved_child_id:
        return mk_resp(data=False, status_code=404, response=response, status_message="Object(s) not found.")

    # Multi-tenant isolation: the caller must have access to the parent record.
    parent_cfg = obj_type_kv_li[parent_obj_type]
    parent_table = parent_cfg.get('tbl_default', parent_cfg.get('tbl'))
    parent_record = sql_select(table_name=parent_table, record_id=resolved_parent_id)
    if not parent_record:
        return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.")
    if not check_account_access(parent_record, account, parent_obj_type):
        return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to parent.")

    obj_cfg = obj_type_kv_li[child_obj_type]
    table_name = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
    base_name = obj_cfg.get('mdl_default', obj_cfg.get('mdl'))

    child_record = sql_select(table_name=table_name, record_id=resolved_child_id)
    if not child_record:
        return mk_resp(data=False, status_code=404, response=response, status_message="Child not found.")
    # The child must actually hang off the parent named in the URL.
    if child_record.get(f'{parent_obj_type}_id') != resolved_parent_id:
        return mk_resp(data=False, status_code=404, response=response, status_message="Child not found under parent.")

    resp_data = base_name(**child_record).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
    return mk_resp(data=resp_data, response=response)
@router.patch('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
async def patch_child_obj(
    request: Request,
    response: Response,
    parent_obj_type: str = Path(min_length=2, max_length=50),
    parent_obj_id: str = Path(min_length=11, max_length=22),
    child_obj_type: str = Path(min_length=2, max_length=50),
    child_obj_id: str = Path(min_length=11, max_length=22),
    return_obj: Optional[bool] = True,
    account: AccountContext = Depends(get_account_context),
    serialization: SerializationParams = Depends(),
    delay: DelayParams = Depends(),
):
    """
    Update Child Object.
    1. Parses the JSON body (must be a JSON object; otherwise 400).
    2. Rejects unknown parent/child object types (400).
    3. Verifies Parent existence and account access (404 / 403).
    4. Verifies that the child belongs to the specified parent before updating.
    5. For non-superuser, non-bypass accounts, a client-supplied `account_id`
       is forced to the caller's own account.

    Returns the re-selected child (when `return_obj` is truthy and the re-select
    succeeds) or `True` otherwise; 400 if the update reports failure.
    """
    from app.db_sql import redis_lookup_id_random, sql_select, sql_update

    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    # Malformed JSON raises a ValueError subclass; report it as a client error.
    try:
        obj_data = await request.json()
    except ValueError:
        return mk_resp(data=False, status_code=400, response=response, status_message="Invalid JSON body.")
    if not isinstance(obj_data, dict):
        return mk_resp(data=False, status_code=400, response=response, status_message="Request body must be a JSON object.")

    # Validate the types up front so an unknown type is a 400 rather than a KeyError below.
    if parent_obj_type not in obj_type_kv_li or child_obj_type not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message="Invalid object type.")

    resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
    resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_obj_type)
    if not resolved_parent_id or not resolved_child_id:
        return mk_resp(data=False, status_code=404, response=response, status_message="Object(s) not found.")

    # Multi-tenant isolation: the caller must have access to the parent record.
    parent_cfg = obj_type_kv_li[parent_obj_type]
    parent_table = parent_cfg.get('tbl_default', parent_cfg.get('tbl'))
    parent_record = sql_select(table_name=parent_table, record_id=resolved_parent_id)
    if not parent_record:
        return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.")
    if not check_account_access(parent_record, account, parent_obj_type):
        return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to parent.")

    # Writes go to the update table, reads to the default table; both fall back to 'tbl'.
    obj_cfg = obj_type_kv_li[child_obj_type]
    table_name_update = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
    table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
    output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))

    existing_child = sql_select(table_name=table_name_select, record_id=resolved_child_id)
    if not existing_child:
        return mk_resp(data=False, status_code=404, response=response, status_message="Child not found.")
    # The child must actually hang off the parent named in the URL.
    if existing_child.get(f'{parent_obj_type}_id') != resolved_parent_id:
        return mk_resp(data=False, status_code=404, response=response, status_message="Child not found under parent.")

    # Tenant pinning: a regular account cannot move a record to another account.
    if not account.super and account.auth_method != 'bypass' and account.account_id:
        if 'account_id' in obj_data:
            obj_data['account_id'] = account.account_id

    # NOTE(review): the payload is passed to sql_update without model validation or
    # field sanitization in this handler - confirm whether sql_update filters columns.
    if not sql_update(data=obj_data, table_name=table_name_update, record_id=resolved_child_id):
        return mk_resp(data=False, status_code=400, response=response, status_message="Update failed.")

    if return_obj:
        if updated_child := sql_select(table_name=table_name_select, record_id=resolved_child_id):
            resp_data = output_model(**updated_child).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
            return mk_resp(data=resp_data, response=response)
    # Either the caller did not ask for the object or the re-select returned nothing.
    return mk_resp(data=True, response=response, status_message="Updated successfully.")
@router.delete('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
async def delete_child_obj(
    response: Response,
    parent_obj_type: str = Path(min_length=2, max_length=50),
    parent_obj_id: str = Path(min_length=11, max_length=22),
    child_obj_type: str = Path(min_length=2, max_length=50),
    child_obj_id: str = Path(min_length=11, max_length=22),
    method: str = Query('delete', regex='^(delete|hide|disable)$'),
    account: AccountContext = Depends(get_account_context),
    delay: DelayParams = Depends(),
):
    """
    Delete Child Object.
    1. Rejects unknown parent/child object types (400).
    2. Verifies Parent existence and account access (404 / 403).
    3. Verifies that the child belongs to the specified parent before deleting.
    4. Applies the requested `method`:
       - 'hide':    updates the record with `hide = True`
       - 'disable': updates the record with `enable = False`
       - 'delete':  removes the record via `sql_delete`
    """
    from app.db_sql import redis_lookup_id_random, sql_select, sql_update, sql_delete

    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    # Validate the types up front so an unknown type is a 400 rather than a KeyError below.
    if parent_obj_type not in obj_type_kv_li or child_obj_type not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message="Invalid object type.")

    resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
    resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_obj_type)
    if not resolved_parent_id or not resolved_child_id:
        return mk_resp(data=False, status_code=404, response=response, status_message="Object(s) not found.")

    # Multi-tenant isolation: the caller must have access to the parent record.
    parent_cfg = obj_type_kv_li[parent_obj_type]
    parent_table = parent_cfg.get('tbl_default', parent_cfg.get('tbl'))
    parent_record = sql_select(table_name=parent_table, record_id=resolved_parent_id)
    if not parent_record:
        return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.")
    if not check_account_access(parent_record, account, parent_obj_type):
        return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to parent.")

    # Writes go to the update table, reads to the default table; both fall back to 'tbl'.
    obj_cfg = obj_type_kv_li[child_obj_type]
    table_name_delete = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
    table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))

    existing_child = sql_select(table_name=table_name_select, record_id=resolved_child_id)
    if not existing_child:
        return mk_resp(data=False, status_code=404, response=response, status_message="Child not found.")
    # The child must actually hang off the parent named in the URL.
    if existing_child.get(f'{parent_obj_type}_id') != resolved_parent_id:
        return mk_resp(data=False, status_code=404, response=response, status_message="Child not found under parent.")

    if method == 'hide':
        success = sql_update(table_name=table_name_delete, record_id=resolved_child_id, data={'hide': True})
    elif method == 'disable':
        success = sql_update(table_name=table_name_delete, record_id=resolved_child_id, data={'enable': False})
    else:
        # The Query regex restricts `method`, so this branch is the hard delete.
        success = sql_delete(table_name=table_name_delete, record_id=resolved_child_id)

    if success:
        return mk_resp(data=True, response=response, status_message=f"Deleted successfully via {method}.")
    return mk_resp(data=False, status_code=400, response=response, status_message="Deletion failed.")