Files
OSIT-AE-API-FastAPI/app/routers/api_crud_v3.py
Scott Idem 34a752d455 feat(api-v3): implement permissive updates, automatic ID resolution, and structured error reporting
- Added 'x-ae-ignore-extra-fields' header to support stripping unknown fields in POST/PATCH.
- Added automatic resolution of '*_id_random' strings to integer IDs in 'sanitize_payload'.
- Refactored 'post_obj' to return structured (field -> message) validation errors in 'meta.details'.
- Updated 'mk_resp' to support non-string 'details' in response metadata.
- Added 'tests/verify_feedback_fixes.py' to validate logic changes.

Ref: V3 API Refinement Feedback from mcp_agent.
2026-01-14 19:11:56 -05:00

556 lines
26 KiB
Python

from fastapi import APIRouter, Depends, Header, HTTPException, Path, Query, Request, Response, status
from typing import Any, Dict, List, Optional, Set, Union
import json
import urllib.parse
import time
import asyncio
import logging
log = logging.getLogger(__name__)
from app.lib_general_v3 import (
AccountContext, get_account_context, get_account_context_optional,
PaginationParams, StatusFilterParams,
SerializationParams, DelayParams
)
from app.lib_api_crud_v3 import (
check_account_access, apply_forced_account_filter, filter_order_by,
get_supported_filters, safe_json_loads, sanitize_payload, format_db_error
)
from app.lib_schema_v3 import get_object_schema_info
from app.db_sql import get_last_sql_error
from app.models.response_models import *
from app.models.api_crud_models import SearchFilter, SearchQuery
from app.ae_obj_types_def import obj_type_kv_li
# from app.db_sql import redis_lookup_id_random, sql_select, sql_insert, sql_update, sql_delete, get_id_random
"""
Aether API V3 - Generic CRUD Router
-----------------------------------
This router provides a standardized, dynamic interface for Create, Read, Update, and Delete operations
across the Aether Object System.
Key Features:
1. Dynamic Object Handling: Routes work for any object defined in `ae_obj_types_def`.
2. Multi-Tenancy: Automatically enforces `account_id` isolation for non-superusers.
3. ID Obfuscation: Uses `id_random` (public strings) -> `id` (database integers) resolution.
4. Data Sanitization: Automatically strips virtual fields and view-only fields before DB writes.
"""
router = APIRouter()
# --- Routes ---
@router.get("/health", response_model=Resp_Body_Base)
async def health_check(delay: DelayParams = Depends()):
"""
Health Check Endpoint.
Used by monitoring systems to verify API availability.
"""
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
return mk_resp(data={"status": "V3 API is healthy!"})
@router.get("/{obj_type}/schema", response_model=Resp_Body_Base, tags=['CRUD v3 Schema (Dev)'])
async def get_obj_schema(
response: Response,
obj_type: str = Path(min_length=2, max_length=50),
view: str = Query('default'),
variant: str = Query('base'),
account: AccountContext = Depends(get_account_context),
):
"""
Dynamic Schema Introspection.
Allows the frontend (e.g., Svelte/React apps) to retrieve the structure of an object type on the fly.
Returns:
- Database column definitions (types, defaults, nullability).
- Pydantic model field definitions (validation rules, aliases).
This enables dynamic form generation without hardcoding schemas in the frontend.
"""
schema_info = get_object_schema_info(obj_type, view, variant)
if "error" in schema_info:
status_code = 400 if "not found" in schema_info["error"] else 500
return mk_resp(data=False, status_code=status_code, response=response, status_message=schema_info["error"])
return mk_resp(data=schema_info, response=response)
@router.post("/{obj_type}/validate", response_model=Resp_Body_Base, tags=['CRUD v3 Validation (Dev)'])
async def validate_obj_payload(
request: Request,
response: Response,
obj_type: str = Path(min_length=2, max_length=50),
account: AccountContext = Depends(get_account_context),
):
"""
Dry-Run Payload Validation.
Verifies that a payload is valid according to the Pydantic model
without performing any database operations.
"""
obj_data = await request.json()
if obj_type not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_type}' not found.")
obj_cfg = obj_type_kv_li[obj_type]
input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
try:
input_model(**obj_data)
return mk_resp(data=True, response=response, status_message="Payload is valid.")
except Exception as e:
return mk_resp(data=False, status_code=400, response=response, status_message="Validation Failed", details=str(e))
@router.get('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def get_obj(
response: Response,
obj_type_l1: str = Path(min_length=2, max_length=50),
obj_id: str = Path(min_length=11, max_length=22),
view: str = Query('default'),
account: AccountContext = Depends(get_account_context),
serialization: SerializationParams = Depends(),
delay: DelayParams = Depends(),
):
"""
Retrieve a Single Object.
1. Resolves the public `id_random` (string) to the internal `id` (integer).
2. Performs a SQL SELECT.
3. Enforces Multi-Tenant access checks.
4. Serializes the result via Pydantic.
"""
from app.db_sql import redis_lookup_id_random, sql_select
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl')))
base_name = obj_cfg.get(f'mdl_{view}', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
if not table_name or not base_name:
return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration error.")
record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
if not record_id:
return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found.")
if sql_result := sql_select(table_name=table_name, record_id=record_id):
if not check_account_access(sql_result, account, obj_name):
return mk_resp(data=False, status_code=403, response=response, status_message="Access denied.")
resp_data = base_name(**sql_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset, exclude_defaults=serialization.exclude_defaults, exclude_none=serialization.exclude_none)
return mk_resp(data=resp_data, response=response)
else:
return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found in database.")
@router.get('/{obj_type_l1}/', response_model=Resp_Body_Base)
async def get_obj_li(
response: Response,
obj_type_l1: str,
for_obj_type: Optional[str] = None,
for_obj_id: Optional[str] = None,
view: str = Query('default'),
order_by_li: Optional[str] = None,
jp: Optional[Union[str, None]] = None,
account: AccountContext = Depends(get_account_context),
pagination: PaginationParams = Depends(),
status_filter: StatusFilterParams = Depends(),
serialization: SerializationParams = Depends(),
delay: DelayParams = Depends(),
):
"""
List Objects (Pagination & Filtering).
Supports:
- Standard filtering (enabled/hidden).
- Advanced filtering via JSON Payload (`jp`) param (Search, Fulltext, AND/OR queries).
- Sorting (`order_by_li`).
- Parent-Child filtering (`for_obj_type`, `for_obj_id`).
- Account Isolation (automatically enforced).
"""
from app.db_sql import redis_lookup_id_random, sql_select
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
qry_dict_li = None
fulltext_qry_dict_obj = None
and_qry_dict_obj = None
and_like_dict_obj = None
or_like_dict_obj = None
and_in_dict_li_obj = None
jp_obj = safe_json_loads(urllib.parse.unquote(jp)) if jp else None
if jp_obj:
if jp_obj.get('qry'): qry_dict_li = jp_obj['qry']
if jp_obj.get('ft_qry'): fulltext_qry_dict_obj = jp_obj['ft_qry']
if jp_obj.get('and_qry'): and_qry_dict_obj = jp_obj['and_qry']
if jp_obj.get('and_like'): and_like_dict_obj = jp_obj['and_like']
if jp_obj.get('or_like'): or_like_dict_obj = jp_obj['or_like']
if jp_obj.get('and_in_li'): and_in_dict_li_obj = jp_obj['and_in_li']
order_by_li = safe_json_loads(order_by_li)
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")
if obj_name == 'site' and not (for_obj_type == 'account' and for_obj_id):
return mk_resp(data=False, status_code=403, response=response, status_message="Listing sites is only permitted when filtered by account.")
if for_obj_type == 'account' and for_obj_id:
if not account.super and for_obj_id != account.account_id_random:
return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to requested account.")
obj_cfg = obj_type_kv_li[obj_name]
table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl')))
base_name = obj_cfg.get(f'mdl_{view}', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
if not table_name or not base_name:
return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration error.")
order_by_li = filter_order_by(order_by_li, base_name, table_name)
status_filter = get_supported_filters(base_name, status_filter)
and_qry_dict_obj = apply_forced_account_filter(and_qry_dict_obj, account, base_name, obj_name)
if for_obj_type and for_obj_id:
resolved_for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
if not resolved_for_obj_id:
return mk_resp(data=False, status_code=404, response=response, status_message=f"Parent object with ID '{for_obj_id}' not found.")
sql_result = sql_select(
table_name=table_name,
field_name=f'{for_obj_type}_id',
field_value=resolved_for_obj_id,
enabled=status_filter.enabled,
hidden=status_filter.hidden,
qry_dict_li=qry_dict_li,
fulltext_qry_dict=fulltext_qry_dict_obj,
and_qry_dict=and_qry_dict_obj,
and_like_dict=and_like_dict_obj,
or_like_dict=or_like_dict_obj,
and_in_dict_li=and_in_dict_li_obj,
order_by_li=order_by_li,
limit=pagination.limit,
offset=pagination.offset,
as_list=True,
)
else:
sql_result = sql_select(
table_name=table_name,
enabled=status_filter.enabled,
hidden=status_filter.hidden,
qry_dict_li=qry_dict_li,
fulltext_qry_dict=fulltext_qry_dict_obj,
and_qry_dict=and_qry_dict_obj,
and_like_dict=and_like_dict_obj,
or_like_dict=or_like_dict_obj,
and_in_dict_li=and_in_dict_li_obj,
order_by_li=order_by_li,
limit=pagination.limit,
offset=pagination.offset,
as_list=True,
)
if sql_result:
resp_data_li = [base_name(**record).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset, exclude_defaults=serialization.exclude_defaults, exclude_none=serialization.exclude_none) for record in sql_result]
return mk_resp(data=resp_data_li, response=response)
else:
return mk_resp(data=[], status_code=200, response=response)
@router.post('/{obj_type_l1}/search', response_model=Resp_Body_Base, tags=['CRUD v3 Search (Dev)'])
async def search_obj_li(
response: Response,
obj_type_l1: str,
search_query: SearchQuery,
for_obj_type: Optional[str] = Query(None),
for_obj_id: Optional[str] = Query(None),
view: str = Query('default'),
order_by_li: Optional[str] = Query(None),
account: AccountContext = Depends(get_account_context_optional),
pagination: PaginationParams = Depends(),
status_filter: StatusFilterParams = Depends(),
serialization: SerializationParams = Depends(),
delay: DelayParams = Depends(),
):
"""
Search Objects (POST).
Advanced search endpoint using `SearchQuery` body.
- Security: Guests can access specific objects (e.g., site_domain) if permitted.
- Filtering: Supports dynamic AND/OR filters built from the frontend.
"""
from app.db_sql import redis_lookup_id_random, sql_select
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
order_by_li = safe_json_loads(order_by_li)
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")
is_guest = (account.auth_method == 'guest')
is_site_domain_lookup = (obj_name == 'site_domain')
if is_guest and not is_site_domain_lookup:
return mk_resp(data=False, status_code=403, response=response, status_message="Authentication required for this search.")
if obj_name == 'site' and not (for_obj_type == 'account' and for_obj_id):
return mk_resp(data=False, status_code=403, response=response, status_message="Listing sites is only permitted when filtered by account.")
obj_cfg = obj_type_kv_li[obj_name]
table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl')))
base_name = obj_cfg.get(f'mdl_{view}', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
if not table_name or not base_name:
return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration error.")
order_by_li = filter_order_by(order_by_li, base_name, table_name)
status_filter = get_supported_filters(base_name, status_filter)
searchable_fields = obj_cfg.get('searchable_fields')
if for_obj_type == 'account' and for_obj_id:
if not account.super and for_obj_id != account.account_id_random:
return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to requested account.")
if not account.super and account.auth_method != 'bypass' and account.account_id:
if search_query.and_filters is None: search_query.and_filters = []
if obj_name == 'account':
search_query.and_filters.append(SearchFilter(field='id', op='eq', value=account.account_id))
elif base_name and hasattr(base_name, '__fields__') and 'account_id' in base_name.__fields__:
search_query.and_filters.append(SearchFilter(field='account_id', op='eq', value=account.account_id))
if for_obj_type and for_obj_id:
resolved_for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
if not resolved_for_obj_id:
return mk_resp(data=False, status_code=404, response=response, status_message=f"Parent object with ID '{for_obj_id}' not found.")
sql_result = sql_select(
table_name=table_name,
field_name=f'{for_obj_type}_id',
field_value=resolved_for_obj_id,
enabled=status_filter.enabled,
hidden=status_filter.hidden,
search_query=search_query,
searchable_fields=searchable_fields,
order_by_li=order_by_li,
limit=pagination.limit,
offset=pagination.offset,
as_list=True,
)
else:
sql_result = sql_select(
table_name=table_name,
enabled=status_filter.enabled,
hidden=status_filter.hidden,
search_query=search_query,
searchable_fields=searchable_fields,
order_by_li=order_by_li,
limit=pagination.limit,
offset=pagination.offset,
as_list=True,
)
if sql_result:
resp_data_li = [base_name(**record).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset, exclude_defaults=serialization.exclude_defaults, exclude_none=serialization.exclude_none) for record in sql_result]
return mk_resp(data=resp_data_li, response=response)
else:
return mk_resp(data=[], status_code=200, response=response)
@router.post('/{obj_type_l1}/', response_model=Resp_Body_Base)
async def post_obj(
request: Request,
response: Response,
obj_type_l1: str = Path(min_length=2, max_length=50),
return_obj: Optional[bool] = True,
x_ae_ignore_extra_fields: Optional[bool] = Header(False),
account: AccountContext = Depends(get_account_context),
serialization: SerializationParams = Depends(),
delay: DelayParams = Depends(),
):
"""
Create Object.
1. Injects `account_id` for ownership.
2. **Sanitizes Payload**: Resolves `*_id_random` -> `*_id`, removes virtual fields, and view-only fields.
- If `x-ae-ignore-extra-fields: true` header is provided, unknown fields are stripped.
3. Validates input against Pydantic model (`mdl_in`).
4. Returns the created object or just its ID.
"""
from app.db_sql import sql_insert, get_id_random, sql_select
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
obj_data = await request.json()
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name_insert = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
if not table_name_insert or not input_model or not table_name_select or not output_model:
return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration error.")
if not account.super and account.auth_method != 'bypass' and account.account_id:
if 'account_id' in input_model.__fields__:
obj_data['account_id'] = account.account_id
elif obj_name == 'account':
return mk_resp(data=False, status_code=403, response=response, status_message="Account creation is restricted.")
# Sanitize payload (ID resolution, virtual fields, and optionally extra fields)
sanitize_payload(obj_data, input_model, ignore_extra=x_ae_ignore_extra_fields)
try:
validated_obj = input_model(**obj_data)
except ValidationError as e:
# Return structured errors (field -> error message) for UI feedback
structured_errors = {err['loc'][-1]: err['msg'] for err in e.errors()}
return mk_resp(data=False, status_code=400, response=response, status_message="Validation Failed", details=structured_errors)
except Exception as e:
return mk_resp(data=False, status_code=400, response=response, status_message="Validation Failed", details=str(e))
data_to_insert = validated_obj.dict(exclude_unset=True)
if sql_insert_result := sql_insert(data=data_to_insert, table_name=table_name_insert):
new_obj_id = sql_insert_result
new_obj_id_random = get_id_random(record_id=new_obj_id, table_name=obj_name)
if return_obj:
if sql_select_result := sql_select(table_name=table_name_select, record_id=new_obj_id):
resp_data = output_model(**sql_select_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
return mk_resp(data=resp_data, response=response)
return mk_resp(data={"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}, response=response)
else:
db_err = format_db_error(get_last_sql_error())
return mk_resp(data=False, status_code=400, response=response, status_message="Failed to create object.", details=db_err)
@router.patch('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def patch_obj(
request: Request,
response: Response,
obj_type_l1: str = Path(min_length=2, max_length=50),
obj_id: str = Path(min_length=11, max_length=22),
return_obj: Optional[bool] = True,
x_ae_ignore_extra_fields: Optional[bool] = Header(False),
account: AccountContext = Depends(get_account_context),
serialization: SerializationParams = Depends(),
delay: DelayParams = Depends(),
):
"""
Update Object (Partial).
1. Resolves ID and checks access permissions.
2. **Sanitizes Payload**: Resolves `*_id_random` -> `*_id`, removes virtual fields, and view-only fields.
- If `x-ae-ignore-extra-fields: true` header is provided, unknown fields are stripped.
3. Performs SQL UPDATE.
"""
from app.db_sql import redis_lookup_id_random, sql_select, sql_update
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
obj_data = await request.json()
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name_update = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
if not table_name_update or not input_model or not table_name_select or not output_model:
return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration error.")
record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
if not record_id:
return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found.")
if existing_obj := sql_select(table_name=table_name_select, record_id=record_id):
if not check_account_access(existing_obj, account, obj_name):
return mk_resp(data=False, status_code=403, response=response, status_message="Access denied.")
else:
return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found in database.")
# Sanitize payload (ID resolution, virtual fields, and optionally extra fields)
sanitize_payload(obj_data, input_model, ignore_extra=x_ae_ignore_extra_fields)
if sql_update(data=obj_data, table_name=table_name_update, record_id=record_id):
if return_obj:
if sql_select_result := sql_select(table_name=table_name_select, record_id=record_id):
resp_data = output_model(**sql_select_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
return mk_resp(data=resp_data, response=response)
return mk_resp(data=True, response=response, status_message="Object updated successfully.")
else:
db_err = format_db_error(get_last_sql_error())
return mk_resp(data=False, status_code=400, response=response, status_message="Failed to update object.", details=db_err)
@router.delete('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def delete_obj(
response: Response,
obj_type_l1: str = Path(min_length=2, max_length=50),
obj_id: str = Path(min_length=11, max_length=22),
method: str = Query('delete', regex='^(delete|hide|disable)$'),
account: AccountContext = Depends(get_account_context),
delay: DelayParams = Depends(),
):
"""
Delete Object.
Supports:
- Soft Delete: `method='hide'` or `method='disable'`.
- Hard Delete: `method='delete'`.
"""
from app.db_sql import redis_lookup_id_random, sql_select, sql_update, sql_delete
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name_delete = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
if not table_name_delete:
return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration error.")
record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
if not record_id:
return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found.")
if existing_obj := sql_select(table_name=table_name_select, record_id=record_id):
if not check_account_access(existing_obj, account, obj_name):
return mk_resp(data=False, status_code=403, response=response, status_message="Access denied.")
else:
return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found in database.")
if method == 'hide':
success = sql_update(table_name=table_name_delete, record_id=record_id, data={'hide': True})
elif method == 'disable':
success = sql_update(table_name=table_name_delete, record_id=record_id, data={'enable': False})
else:
success = sql_delete(table_name=table_name_delete, record_id=record_id)
if success:
return mk_resp(data=True, response=response, status_message=f"Object deleted successfully.")
return mk_resp(data=False, status_code=400, response=response, status_message="Failed to delete object.")
# Nested / Child Routes are imported and included below to keep this file manageable.
from app.routers.api_crud_v3_nested import router as nested_router
router.include_router(nested_router)