Files
OSIT-AE-API-FastAPI/app/routers/api_crud_v3.py
Scott Idem cffde249d3 fix(models): migrate Archive_Content_Base to Vision ID pattern
- Replace integer `id` (alias archive_content_id) with Vision string fields:
  `id: Optional[str]` and `archive_content_id: Optional[str]` — both always
  hold the random string ID, never the DB integer.
- Add `root_validator(pre=True)` (map_v3_ids) that maps id_random /
  archive_content_id_random → id and archive_content_id, with collision
  prevention to reject any integer that arrives in these fields.
- Remove old `archive_content_id_lookup` integer validator (superseded by
  sanitize_payload + root_validator).
- Keep `id_random` (alias archive_content_id_random) in responses for
  backward compatibility; add id, archive_content_id, id_random to
  fields_to_exclude_from_db so they never appear in INSERT/UPDATE payloads.

Generic CRUD layer safety net (post_obj + post_child_obj):
- After building resp_data on create, swap any integer {obj_type}_id with
  the corresponding {obj_type}_id_random value — catches models not yet
  migrated to Vision IDs.
- Fix return_obj=False fallback to return obj_id as the random string.

Docs: add Section 3D to GUIDE__AE_API_V3_for_Frontend.md documenting the
Vision ID convention — {obj_type}_id is always the random string; the
_id_random suffix is a legacy artifact that frontend code should phase out.

Fixes: POST /v3/crud/archive/{id}/archive_content/ returning integer ID,
breaking the subsequent PATCH flow (422 min_length validation failure).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-25 17:40:27 -04:00

598 lines
28 KiB
Python

from fastapi import APIRouter, Depends, Header, HTTPException, Path, Query, Request, Response, status
from typing import Any, Dict, List, Optional, Set, Union
import json
import urllib.parse
import time
import asyncio
import logging
log = logging.getLogger(__name__)
from app.lib_general_v3 import (
AccountContext, get_account_context, get_account_context_optional,
PaginationParams, StatusFilterParams,
SerializationParams, DelayParams
)
from app.lib_api_crud_v3 import (
check_account_access, apply_forced_account_filter, filter_order_by,
get_supported_filters, safe_json_loads, sanitize_payload, format_db_error
)
from app.lib_schema_v3 import get_object_schema_info
from app.db_sql import get_last_sql_error
from app.models.response_models import *
from app.models.api_crud_models import SearchFilter, SearchQuery
from app.ae_obj_types_def import obj_type_kv_li
# from app.db_sql import redis_lookup_id_random, sql_select, sql_insert, sql_update, sql_delete, get_id_random
"""
Aether API V3 - Generic CRUD Router
-----------------------------------
This router provides a standardized, dynamic interface for Create, Read, Update, and Delete operations
across the Aether Object System.
Key Features:
1. Dynamic Object Handling: Routes work for any object defined in `ae_obj_types_def`.
2. Multi-Tenancy: Automatically enforces `account_id` isolation for non-superusers.
3. ID Obfuscation: Uses `id_random` (public strings) -> `id` (database integers) resolution.
4. Data Sanitization: Automatically strips virtual fields and view-only fields before DB writes.
"""
# Router instance that every route in this module registers on via the
# @router.<method>(...) decorators below.
router = APIRouter()
# --- Routes ---
@router.get("/health", response_model=Resp_Body_Base)
async def health_check(delay: DelayParams = Depends()):
    """
    Liveness probe for the V3 API.

    Optionally sleeps for `delay.sleep_time_s` seconds (when positive) and
    then returns a fixed "healthy" status payload wrapped by `mk_resp`.
    """
    pause_s = delay.sleep_time_s
    if pause_s > 0:
        await asyncio.sleep(pause_s)

    payload = {"status": "V3 API is healthy!"}
    return mk_resp(data=payload)
@router.get("/{obj_type}/schema", response_model=Resp_Body_Base, tags=['CRUD v3 Schema (Dev)'])
async def get_obj_schema(
    response: Response,
    obj_type: str = Path(min_length=2, max_length=50),
    view: str = Query('default'),
    variant: str = Query('base'),
    account: AccountContext = Depends(get_account_context),
):
    """
    Schema introspection for a single object type.

    Delegates to `get_object_schema_info(obj_type, view, variant)` and
    returns its result as the response data. When the result carries an
    "error" key, the error text becomes the status message and the HTTP
    status is 400 if the text contains "not found", otherwise 500.

    The `account` dependency is not read here; it only forces an account
    context to be resolved for the request.
    """
    schema_info = get_object_schema_info(obj_type, view, variant)

    # Happy path first: no error key means the schema can be returned as-is.
    if "error" not in schema_info:
        return mk_resp(data=schema_info, response=response)

    error_text = schema_info["error"]
    if "not found" in error_text:
        failure_code = 400
    else:
        failure_code = 500
    return mk_resp(
        data=False,
        status_code=failure_code,
        response=response,
        status_message=error_text,
    )
@router.post("/{obj_type}/validate", response_model=Resp_Body_Base, tags=['CRUD v3 Validation (Dev)'])
async def validate_obj_payload(
    request: Request,
    response: Response,
    obj_type: str = Path(min_length=2, max_length=50),
    account: AccountContext = Depends(get_account_context),
):
    """
    Dry-Run Payload Validation.

    Instantiates the object's input model (`mdl_in`, falling back to `mdl`)
    with the request body and reports whether that succeeded. No database
    operation is performed.

    Responses:
    - 200, data=True: the payload was accepted by the model.
    - 400: unknown object type, body is not valid JSON, body is not a JSON
      object, or the model rejected the payload (details carry the error text).
    - 500: the object type has no input model configured.
    """
    if obj_type not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_type}' not found.")

    obj_cfg = obj_type_kv_li[obj_type]
    input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
    if not input_model:
        # Without this guard a missing model would surface as a misleading
        # "Validation Failed" (calling None) instead of a configuration problem.
        return mk_resp(data=False, status_code=500, response=response, status_message="Configuration error.")

    # request.json() raises a ValueError subclass on malformed/empty bodies;
    # report that as a client error rather than letting it become a 500.
    try:
        obj_data = await request.json()
    except ValueError as e:
        return mk_resp(data=False, status_code=400, response=response, status_message="Validation Failed", details=f"Request body is not valid JSON: {e}")

    if not isinstance(obj_data, dict):
        return mk_resp(data=False, status_code=400, response=response, status_message="Validation Failed", details="Request body must be a JSON object.")

    try:
        input_model(**obj_data)
    except Exception as e:
        # Deliberately broad: this is a dev-facing dry run and any failure to
        # build the model is reported back verbatim.
        return mk_resp(data=False, status_code=400, response=response, status_message="Validation Failed", details=str(e))

    return mk_resp(data=True, response=response, status_message="Payload is valid.")
@router.get('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def get_obj(
    response: Response,
    obj_type_l1: str = Path(min_length=2, max_length=50),
    obj_id: str = Path(min_length=11, max_length=22),
    view: str = Query('default'),
    inc_hosted_file: Optional[bool] = Query(False),
    account: AccountContext = Depends(get_account_context_optional),
    serialization: SerializationParams = Depends(),
    delay: DelayParams = Depends(),
):
    """
    Fetch one object by its public ID.

    Flow:
    1. Look up the object type configuration and pick the table/model for
       the requested `view` (falling back to `*_default`, then the base entry).
    2. Resolve the public `obj_id` string to the internal record id.
    3. SELECT the record.
    4. For object types not flagged `public_read`, require an account context
       and run `check_account_access` on the record.
    5. Serialize through the view's model using the caller's serialization flags.

    Returns 400 for an unknown object type, 500 for an incomplete
    configuration, 404 when the ID cannot be resolved or the row is missing,
    and 403 when access is refused.
    """
    from app.db_sql import redis_lookup_id_random, sql_select

    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    obj_name = obj_type_l1
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")
    obj_cfg = obj_type_kv_li[obj_name]

    # View-specific entry wins, then the "default" entry, then the base one.
    fallback_table = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
    fallback_model = obj_cfg.get('mdl_default', obj_cfg.get('mdl'))
    table_name = obj_cfg.get(f'tbl_{view}', fallback_table)
    base_name = obj_cfg.get(f'mdl_{view}', fallback_model)
    if not (table_name and base_name):
        return mk_resp(data=False, status_code=500, response=response, status_message="Configuration error.")

    record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
    if not record_id:
        return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found.")

    row = sql_select(table_name=table_name, record_id=record_id)
    if not row:
        return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found in database.")

    # NOTE(review): both checks below are assumed to apply only to
    # non-public object types (original nesting inferred) — confirm.
    if not obj_cfg.get('public_read', False):
        lacks_context = account.auth_method == 'guest' or (account.account_id is None and not account.super)
        if lacks_context:
            reason = account.auth_error or "Account context required."
            return mk_resp(data=False, status_code=403, response=response, status_message=reason)
        if not check_account_access(row, account, obj_name):
            return mk_resp(data=False, status_code=403, response=response, status_message="Access denied. Record belongs to another account.")

    # The flag is handed to the model through the row itself, for event_file only.
    if obj_name == 'event_file' and inc_hosted_file:
        row['inc_hosted_file'] = True

    dump_opts = dict(
        by_alias=serialization.by_alias,
        exclude_unset=serialization.exclude_unset,
        exclude_defaults=serialization.exclude_defaults,
        exclude_none=serialization.exclude_none,
    )
    resp_data = base_name(**row).dict(**dump_opts)
    return mk_resp(data=resp_data, response=response)
@router.get('/{obj_type_l1}/', response_model=Resp_Body_Base)
async def get_obj_li(
    response: Response,
    obj_type_l1: str,
    for_obj_type: Optional[str] = None,
    for_obj_id: Optional[str] = None,
    view: str = Query('default'),
    order_by_li: Optional[str] = None,
    jp: Optional[Union[str, None]] = None,
    account: AccountContext = Depends(get_account_context_optional),
    pagination: PaginationParams = Depends(),
    status_filter: StatusFilterParams = Depends(),
    serialization: SerializationParams = Depends(),
    delay: DelayParams = Depends(),
):
    """
    List Objects (Pagination & Filtering).

    Supports:
    - Standard filtering (enabled/hidden).
    - Advanced filtering via the JSON Payload (`jp`) query param. Recognised
      keys: `qry`, `ft_qry`, `and_qry`, `and_like`, `or_like`, `and_in_li`.
      `jp` must decode to a JSON object; any other JSON value is rejected
      with 400.
    - Sorting (`order_by_li`, JSON-encoded).
    - Parent-Child filtering (`for_obj_type`, `for_obj_id`).
    - Account isolation: for object types not flagged `public_read`, a forced
      account filter is merged into `and_qry`.

    Returns a list of serialized records (empty list when nothing matches).
    Errors: 400 unknown object type / bad `jp` / schema-level DB error,
    403 access denied, 404 parent not found, 500 configuration or DB error.
    """
    from app.db_sql import redis_lookup_id_random, sql_select

    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    jp_obj = safe_json_loads(urllib.parse.unquote(jp)) if jp else None
    if jp_obj and not isinstance(jp_obj, dict):
        # A JSON array/scalar used to crash on `.get` (AttributeError -> 500).
        return mk_resp(data=False, status_code=400, response=response, status_message="Query parameter 'jp' must be a JSON object.")
    jp_obj = jp_obj or {}

    # Falsy entries are treated the same as missing ones (-> None).
    qry_dict_li = jp_obj.get('qry') or None
    fulltext_qry_dict_obj = jp_obj.get('ft_qry') or None
    and_qry_dict_obj = jp_obj.get('and_qry') or None
    and_like_dict_obj = jp_obj.get('and_like') or None
    or_like_dict_obj = jp_obj.get('or_like') or None
    and_in_dict_li_obj = jp_obj.get('and_in_li') or None

    order_by_li = safe_json_loads(order_by_li)

    obj_name = obj_type_l1
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")
    obj_cfg = obj_type_kv_li[obj_name]
    is_public_read = obj_cfg.get('public_read', False)

    filtered_by_account = bool(for_obj_type == 'account' and for_obj_id)

    # Sites may only be listed under an explicit parent account.
    if obj_name == 'site' and not filtered_by_account:
        return mk_resp(data=False, status_code=403, response=response, status_message="Listing sites is only permitted when filtered by account.")

    # Non-superusers may only list non-public objects under their own account.
    if filtered_by_account and not is_public_read:
        if not account.super and for_obj_id != account.account_id_random:
            return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to requested account.")

    table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl')))
    base_name = obj_cfg.get(f'mdl_{view}', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
    if not table_name or not base_name:
        return mk_resp(data=False, status_code=500, response=response, status_message="Configuration error.")

    order_by_li = filter_order_by(order_by_li, base_name, table_name)
    status_filter = get_supported_filters(base_name, status_filter)

    if not is_public_read:
        and_qry_dict_obj = apply_forced_account_filter(and_qry_dict_obj, account, base_name, obj_name, table_name=table_name)

    # One SELECT call; the parent constraint is added only when requested.
    select_kwargs = dict(
        table_name=table_name,
        enabled=status_filter.enabled,
        hidden=status_filter.hidden,
        qry_dict_li=qry_dict_li,
        fulltext_qry_dict=fulltext_qry_dict_obj,
        and_qry_dict=and_qry_dict_obj,
        and_like_dict=and_like_dict_obj,
        or_like_dict=or_like_dict_obj,
        and_in_dict_li=and_in_dict_li_obj,
        order_by_li=order_by_li,
        limit=pagination.limit,
        offset=pagination.offset,
        as_list=True,
    )
    if for_obj_type and for_obj_id:
        resolved_for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
        if not resolved_for_obj_id:
            return mk_resp(data=False, status_code=404, response=response, status_message=f"Parent object with ID '{for_obj_id}' not found.")
        select_kwargs['field_name'] = f'{for_obj_type}_id'
        select_kwargs['field_value'] = resolved_for_obj_id

    sql_result = sql_select(**select_kwargs)

    if sql_result is False:
        # Standardized rich error bubbling
        db_err = format_db_error(get_last_sql_error())
        # If it's a schema error (like Unknown Column), it's a 400 Bad Request
        status_code = 400 if db_err.category == "database_schema" else 500
        return mk_resp(data=False, status_code=status_code, response=response, status_message="Listing failed due to database error.", details=db_err.dict())

    if not sql_result:
        return mk_resp(data=[], status_code=200, response=response)

    resp_data_li = [
        base_name(**record).dict(
            by_alias=serialization.by_alias,
            exclude_unset=serialization.exclude_unset,
            exclude_defaults=serialization.exclude_defaults,
            exclude_none=serialization.exclude_none,
        )
        for record in sql_result
    ]
    return mk_resp(data=resp_data_li, response=response)
@router.post('/{obj_type_l1}/search', response_model=Resp_Body_Base, tags=['CRUD v3 Search (Dev)'])
async def search_obj_li(
    response: Response,
    obj_type_l1: str,
    search_query: SearchQuery,
    for_obj_type: Optional[str] = Query(None),
    for_obj_id: Optional[str] = Query(None),
    view: str = Query('default'),
    order_by_li: Optional[str] = Query(None),
    account: AccountContext = Depends(get_account_context_optional),
    pagination: PaginationParams = Depends(),
    status_filter: StatusFilterParams = Depends(),
    serialization: SerializationParams = Depends(),
    delay: DelayParams = Depends(),
):
    """
    Search objects using a `SearchQuery` request body.

    - Guests are refused (403) unless the object type is flagged `public_read`.
    - Sites can only be searched under an explicit parent account.
    - When filtering by parent account, non-superusers must name their own account.
    - For non-public types, callers that are neither superuser nor `bypass`
      get an extra AND filter on their own account appended to the query.
    - `for_obj_type` / `for_obj_id` restrict results to children of a parent.

    Returns the list of serialized records, or an empty list when nothing matches.
    """
    from app.db_sql import redis_lookup_id_random, sql_select

    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    order_by_li = safe_json_loads(order_by_li)

    obj_name = obj_type_l1
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")
    obj_cfg = obj_type_kv_li[obj_name]

    is_guest = account.auth_method == 'guest'
    is_public_read = obj_cfg.get('public_read', False)
    if is_guest and not is_public_read:
        return mk_resp(data=False, status_code=403, response=response, status_message="Authentication required for this search.")

    scoped_to_account = for_obj_type == 'account' and for_obj_id
    if obj_name == 'site' and not scoped_to_account:
        return mk_resp(data=False, status_code=403, response=response, status_message="Listing sites is only permitted when filtered by account.")

    default_table = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
    default_model = obj_cfg.get('mdl_default', obj_cfg.get('mdl'))
    table_name = obj_cfg.get(f'tbl_{view}', default_table)
    base_name = obj_cfg.get(f'mdl_{view}', default_model)
    if not (table_name and base_name):
        return mk_resp(data=False, status_code=500, response=response, status_message="Configuration error.")

    order_by_li = filter_order_by(order_by_li, base_name, table_name)
    status_filter = get_supported_filters(base_name, status_filter)
    searchable_fields = obj_cfg.get('searchable_fields')

    if scoped_to_account:
        if not account.super and for_obj_id != account.account_id_random:
            return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to requested account.")

    # Forced tenant filter for ordinary callers on non-public types.
    # NOTE(review): assumed to run regardless of the parent filter above
    # (original nesting inferred) — confirm.
    needs_account_filter = not is_public_read and not account.super and account.auth_method != 'bypass'
    if needs_account_filter:
        if search_query.and_filters is None:
            search_query.and_filters = []
        if obj_name == 'account':
            search_query.and_filters.append(SearchFilter(field='id', op='eq', value=account.account_id))
        elif base_name and hasattr(base_name, '__fields__') and 'account_id' in base_name.__fields__:
            search_query.and_filters.append(SearchFilter(field='account_id', op='eq', value=account.account_id))

    # Arguments shared by both the parent-scoped and the unscoped query.
    select_args = {
        'table_name': table_name,
        'enabled': status_filter.enabled,
        'hidden': status_filter.hidden,
        'search_query': search_query,
        'searchable_fields': searchable_fields,
        'order_by_li': order_by_li,
        'limit': pagination.limit,
        'offset': pagination.offset,
        'as_list': True,
    }
    if for_obj_type and for_obj_id:
        resolved_for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
        if not resolved_for_obj_id:
            return mk_resp(data=False, status_code=404, response=response, status_message=f"Parent object with ID '{for_obj_id}' not found.")
        select_args['field_name'] = f'{for_obj_type}_id'
        select_args['field_value'] = resolved_for_obj_id

    sql_result = sql_select(**select_args)

    if sql_result is False:
        # Schema-level failures (e.g. unknown column) are the caller's fault -> 400.
        db_err = format_db_error(get_last_sql_error())
        if db_err.category == "database_schema":
            status_code = 400
        else:
            status_code = 500
        return mk_resp(data=False, status_code=status_code, response=response, status_message="Search failed due to database error.", details=db_err.dict())

    if not sql_result:
        return mk_resp(data=[], status_code=200, response=response)

    dump_opts = dict(
        by_alias=serialization.by_alias,
        exclude_unset=serialization.exclude_unset,
        exclude_defaults=serialization.exclude_defaults,
        exclude_none=serialization.exclude_none,
    )
    resp_data_li = [base_name(**record).dict(**dump_opts) for record in sql_result]
    return mk_resp(data=resp_data_li, response=response)
@router.post('/{obj_type_l1}/', response_model=Resp_Body_Base)
async def post_obj(
    request: Request,
    response: Response,
    obj_type_l1: str = Path(min_length=2, max_length=50),
    return_obj: Optional[bool] = True,
    x_ae_ignore_extra_fields: Optional[bool] = Header(False),
    account: AccountContext = Depends(get_account_context),
    serialization: SerializationParams = Depends(),
    delay: DelayParams = Depends(),
):
    """
    Create Object.

    1. Parses the JSON body; it must be a JSON object (400 otherwise).
    2. Injects `account_id` for ownership when the caller is a regular
       account user and the input model has an `account_id` field. Regular
       users may not create `account` objects (403).
    3. **Sanitizes Payload** via `sanitize_payload` (ID resolution, virtual
       fields, and optionally extra fields).
       - If `x-ae-ignore-extra-fields: true` header is provided, unknown fields are stripped.
    4. Validates input against the Pydantic model (`mdl_in`, falling back to `mdl`).
    5. Inserts the row and returns the created object, or just its random ID
       when `return_obj` is false or the re-select fails.

    Errors: 400 unknown type / bad body / validation or insert failure,
    403 restricted creation, 500 configuration error.
    """
    from app.db_sql import sql_insert, get_id_random, sql_select

    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    # request.json() raises a ValueError subclass on malformed/empty bodies;
    # answer with 400 instead of letting it bubble up as a 500.
    try:
        obj_data = await request.json()
    except ValueError as e:
        return mk_resp(data=False, status_code=400, response=response, status_message="Request body is not valid JSON.", details=str(e))
    if not isinstance(obj_data, dict):
        # Item assignment / sanitization below require a mapping.
        return mk_resp(data=False, status_code=400, response=response, status_message="Request body must be a JSON object.")

    obj_name = obj_type_l1
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")
    obj_cfg = obj_type_kv_li[obj_name]

    table_name_insert = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
    table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
    input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
    output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
    if not table_name_insert or not input_model or not table_name_select or not output_model:
        return mk_resp(data=False, status_code=500, response=response, status_message="Configuration error.")

    # Regular account users: ownership is always the caller's own account.
    pin_account = bool(not account.super and account.auth_method != 'bypass' and account.account_id)
    if pin_account:
        if 'account_id' in input_model.__fields__:
            obj_data['account_id'] = account.account_id
        elif obj_name == 'account':
            return mk_resp(data=False, status_code=403, response=response, status_message="Account creation is restricted.")

    # Sanitize payload (ID resolution, virtual fields, and optionally extra fields)
    sanitize_payload(obj_data, input_model, ignore_extra=x_ae_ignore_extra_fields)

    # Sanitization may rewrite ID fields from client-supplied aliases; if
    # `account_id` is still part of the payload, re-pin it so a regular user
    # can never end up writing into another account.
    if pin_account and 'account_id' in obj_data:
        obj_data['account_id'] = account.account_id

    try:
        validated_obj = input_model(**obj_data)
    except ValidationError as e:
        # Return structured errors (field -> error message) for UI feedback
        structured_errors = {err['loc'][-1]: err['msg'] for err in e.errors()}
        return mk_resp(data=False, status_code=400, response=response, status_message="Validation Failed", details=structured_errors)
    except Exception as e:
        return mk_resp(data=False, status_code=400, response=response, status_message="Validation Failed", details=str(e))

    data_to_insert = validated_obj.dict(exclude_unset=True)
    new_obj_id = sql_insert(data=data_to_insert, table_name=table_name_insert)
    if not new_obj_id:
        # Standardized rich error bubbling
        db_err = format_db_error(get_last_sql_error())
        return mk_resp(data=False, status_code=400, response=response, status_message="Failed to create object.", details=db_err.dict())

    new_obj_id_random = get_id_random(record_id=new_obj_id, table_name=obj_name)

    if return_obj:
        if sql_select_result := sql_select(table_name=table_name_select, record_id=new_obj_id):
            resp_data = output_model(**sql_select_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
            # V3 contract: {obj_type}_id in responses must be the random string, never the integer.
            _id_key = f'{obj_name}_id' if serialization.by_alias else 'id'
            _rand_key = f'{obj_name}_id_random' if serialization.by_alias else 'id_random'
            if isinstance(resp_data.get(_id_key), int) and resp_data.get(_rand_key):
                resp_data[_id_key] = resp_data[_rand_key]
            return mk_resp(data=resp_data, response=response)

    # Either the caller asked for the ID only, or the re-select came back empty.
    return mk_resp(data={"obj_id": new_obj_id_random, "obj_id_random": new_obj_id_random}, response=response)
@router.patch('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def patch_obj(
    request: Request,
    response: Response,
    obj_type_l1: str = Path(min_length=2, max_length=50),
    obj_id: str = Path(min_length=11, max_length=22),
    return_obj: Optional[bool] = True,
    x_ae_ignore_extra_fields: Optional[bool] = Header(False),
    account: AccountContext = Depends(get_account_context),
    serialization: SerializationParams = Depends(),
    delay: DelayParams = Depends(),
):
    """
    Update Object (Partial).

    1. Parses the JSON body; it must be a JSON object (400 otherwise).
    2. Resolves the public ID, loads the existing record and checks access
       with `check_account_access`.
    3. **Sanitizes Payload** via `sanitize_payload` (ID resolution, virtual
       fields, and optionally extra fields).
       - If `x-ae-ignore-extra-fields: true` header is provided, unknown fields are stripped.
    4. For regular account users, pins any `account_id` in the payload to the
       caller's own account so a record cannot be moved to another tenant.
    5. Performs SQL UPDATE and returns the updated object (or `True` when
       `return_obj` is false or the re-select fails).

    Note: the payload is not run through the input model before the UPDATE.

    Errors: 400 unknown type / bad body / update failure, 403 access denied,
    404 record not found, 500 configuration error.
    """
    from app.db_sql import redis_lookup_id_random, sql_select, sql_update

    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    # request.json() raises a ValueError subclass on malformed/empty bodies;
    # answer with 400 instead of letting it bubble up as a 500.
    try:
        obj_data = await request.json()
    except ValueError as e:
        return mk_resp(data=False, status_code=400, response=response, status_message="Request body is not valid JSON.", details=str(e))
    if not isinstance(obj_data, dict):
        return mk_resp(data=False, status_code=400, response=response, status_message="Request body must be a JSON object.")

    obj_name = obj_type_l1
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")
    obj_cfg = obj_type_kv_li[obj_name]

    table_name_update = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
    table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
    input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
    output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
    if not table_name_update or not input_model or not table_name_select or not output_model:
        return mk_resp(data=False, status_code=500, response=response, status_message="Configuration error.")

    record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
    if not record_id:
        return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found.")

    existing_obj = sql_select(table_name=table_name_select, record_id=record_id)
    if not existing_obj:
        return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found in database.")
    if not check_account_access(existing_obj, account, obj_name):
        return mk_resp(data=False, status_code=403, response=response, status_message="Access denied.")

    # Sanitize payload (ID resolution, virtual fields, and optionally extra fields)
    sanitize_payload(obj_data, input_model, ignore_extra=x_ae_ignore_extra_fields)

    # Tenant safety, mirroring post_obj: a regular account user may not
    # re-home a record. Done after sanitization so a client-supplied alias
    # cannot overwrite the pinned value. Only touches an already-present key.
    if not account.super and account.auth_method != 'bypass' and account.account_id:
        if 'account_id' in obj_data:
            obj_data['account_id'] = account.account_id

    if not sql_update(data=obj_data, table_name=table_name_update, record_id=record_id):
        # Standardized rich error bubbling
        db_err = format_db_error(get_last_sql_error())
        return mk_resp(data=False, status_code=400, response=response, status_message="Failed to update object.", details=db_err.dict())

    if return_obj:
        if sql_select_result := sql_select(table_name=table_name_select, record_id=record_id):
            resp_data = output_model(**sql_select_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
            return mk_resp(data=resp_data, response=response)

    return mk_resp(data=True, response=response, status_message="Object updated successfully.")
@router.delete('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def delete_obj(
    response: Response,
    obj_type_l1: str = Path(min_length=2, max_length=50),
    obj_id: str = Path(min_length=11, max_length=22),
    method: str = Query('delete', regex='^(delete|hide|disable)$'),
    account: AccountContext = Depends(get_account_context),
    delay: DelayParams = Depends(),
):
    """
    Delete Object.

    Supports:
    - Soft Delete: `method='hide'` (UPDATE with `hide=True`) or
      `method='disable'` (UPDATE with `enable=False`).
    - Hard Delete: `method='delete'` (SQL DELETE).

    The record is resolved from its public ID, loaded, and checked with
    `check_account_access` before anything is modified.

    Errors: 400 unknown type or failed operation, 403 access denied,
    404 record not found, 500 configuration error.
    """
    from app.db_sql import redis_lookup_id_random, sql_select, sql_update, sql_delete

    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    obj_name = obj_type_l1
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")
    obj_cfg = obj_type_kv_li[obj_name]

    table_name_delete = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
    table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
    # Both tables are used below; validate both (as patch_obj does) so a
    # half-configured type reports a configuration error rather than a
    # misleading "not found".
    if not table_name_delete or not table_name_select:
        return mk_resp(data=False, status_code=500, response=response, status_message="Configuration error.")

    record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
    if not record_id:
        return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found.")

    existing_obj = sql_select(table_name=table_name_select, record_id=record_id)
    if not existing_obj:
        return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found in database.")
    if not check_account_access(existing_obj, account, obj_name):
        return mk_resp(data=False, status_code=403, response=response, status_message="Access denied.")

    # Soft-delete methods map to a column update; anything else is a hard delete.
    soft_delete_data = {'hide': {'hide': True}, 'disable': {'enable': False}}.get(method)
    if soft_delete_data is not None:
        success = sql_update(table_name=table_name_delete, record_id=record_id, data=soft_delete_data)
    else:
        success = sql_delete(table_name=table_name_delete, record_id=record_id)

    if success:
        return mk_resp(data=True, response=response, status_message="Object deleted successfully.")
    return mk_resp(data=False, status_code=400, response=response, status_message="Failed to delete object.")
# Nested / Child Routes are imported and included below to keep this file manageable.
# The import sits at the bottom of the module, after all routes above are
# registered (presumably also to avoid a circular import — TODO confirm).
from app.routers.api_crud_v3_nested import router as nested_router
# Mount the nested routes on this router so they are served alongside the routes above.
router.include_router(nested_router)