fix(crud): extend Vision ID safety net to all response paths

- Extracted apply_vision_id_fix() helper to lib_api_crud_v3.py — single
  source of truth for the fix that ensures {obj_type}_id in responses is
  always the random string, never the DB integer.
- Applied to all response-returning paths in api_crud_v3.py:
  GET single, GET list, POST search, POST create, PATCH update.
- Applied to all response-returning paths in api_crud_v3_nested.py:
  GET child list, POST search, POST create, GET single child, PATCH child.
- Removed duplicate get_child_obj and patch_child_obj route handlers in
  api_crud_v3_nested.py — FastAPI silently routes to only the first
  matching handler, so the second definitions were unreachable dead code.

Covers all 23 V3 CRUD models still using the old integer-alias pattern.
The archive_content model was already migrated to Vision IDs; this fix
ensures every other model gets correct responses without individual migration.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Scott Idem
2026-03-25 18:35:21 -04:00
parent cffde249d3
commit 6bde236633
3 changed files with 27 additions and 126 deletions

View File

@@ -8,6 +8,19 @@ from app.models.error_models import StandardError
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def apply_vision_id_fix(resp_data: dict, obj_type: str, by_alias: bool) -> dict:
"""
V3 contract: {obj_type}_id in responses must be the random string, never the DB integer.
Applies to models not yet migrated to the Vision ID pattern (root_validator).
Safe to call on already-migrated models — no-op if the value is already a string.
"""
_id_key = f'{obj_type}_id' if by_alias else 'id'
_rand_key = f'{obj_type}_id_random' if by_alias else 'id_random'
if isinstance(resp_data.get(_id_key), int) and resp_data.get(_rand_key):
resp_data[_id_key] = resp_data[_rand_key]
return resp_data
def format_db_error(raw_error: str) -> StandardError: def format_db_error(raw_error: str) -> StandardError:
""" """
Parses raw SQLAlchemy/MariaDB errors into structured StandardError objects. Parses raw SQLAlchemy/MariaDB errors into structured StandardError objects.

View File

@@ -15,7 +15,8 @@ from app.lib_general_v3 import (
) )
from app.lib_api_crud_v3 import ( from app.lib_api_crud_v3 import (
check_account_access, apply_forced_account_filter, filter_order_by, check_account_access, apply_forced_account_filter, filter_order_by,
get_supported_filters, safe_json_loads, sanitize_payload, format_db_error get_supported_filters, safe_json_loads, sanitize_payload, format_db_error,
apply_vision_id_fix
) )
from app.lib_schema_v3 import get_object_schema_info from app.lib_schema_v3 import get_object_schema_info
from app.db_sql import get_last_sql_error from app.db_sql import get_last_sql_error
@@ -157,6 +158,7 @@ async def get_obj(
sql_result['inc_hosted_file'] = True sql_result['inc_hosted_file'] = True
resp_data = base_name(**sql_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset, exclude_defaults=serialization.exclude_defaults, exclude_none=serialization.exclude_none) resp_data = base_name(**sql_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset, exclude_defaults=serialization.exclude_defaults, exclude_none=serialization.exclude_none)
apply_vision_id_fix(resp_data, obj_name, serialization.by_alias)
return mk_resp(data=resp_data, response=response) return mk_resp(data=resp_data, response=response)
else: else:
return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found in database.") return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found in database.")
@@ -283,7 +285,7 @@ async def get_obj_li(
return mk_resp(data=False, status_code=status_code, response=response, status_message="Listing failed due to database error.", details=db_err.dict()) return mk_resp(data=False, status_code=status_code, response=response, status_message="Listing failed due to database error.", details=db_err.dict())
if sql_result: if sql_result:
resp_data_li = [base_name(**record).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset, exclude_defaults=serialization.exclude_defaults, exclude_none=serialization.exclude_none) for record in sql_result] resp_data_li = [apply_vision_id_fix(base_name(**record).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset, exclude_defaults=serialization.exclude_defaults, exclude_none=serialization.exclude_none), obj_name, serialization.by_alias) for record in sql_result]
return mk_resp(data=resp_data_li, response=response) return mk_resp(data=resp_data_li, response=response)
else: else:
return mk_resp(data=[], status_code=200, response=response) return mk_resp(data=[], status_code=200, response=response)
@@ -393,7 +395,7 @@ async def search_obj_li(
return mk_resp(data=False, status_code=status_code, response=response, status_message="Search failed due to database error.", details=db_err.dict()) return mk_resp(data=False, status_code=status_code, response=response, status_message="Search failed due to database error.", details=db_err.dict())
if sql_result: if sql_result:
resp_data_li = [base_name(**record).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset, exclude_defaults=serialization.exclude_defaults, exclude_none=serialization.exclude_none) for record in sql_result] resp_data_li = [apply_vision_id_fix(base_name(**record).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset, exclude_defaults=serialization.exclude_defaults, exclude_none=serialization.exclude_none), obj_name, serialization.by_alias) for record in sql_result]
return mk_resp(data=resp_data_li, response=response) return mk_resp(data=resp_data_li, response=response)
else: else:
return mk_resp(data=[], status_code=200, response=response) return mk_resp(data=[], status_code=200, response=response)
@@ -464,11 +466,7 @@ async def post_obj(
if return_obj: if return_obj:
if sql_select_result := sql_select(table_name=table_name_select, record_id=new_obj_id): if sql_select_result := sql_select(table_name=table_name_select, record_id=new_obj_id):
resp_data = output_model(**sql_select_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset) resp_data = output_model(**sql_select_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
# V3 contract: {obj_type}_id in responses must be the random string, never the integer. apply_vision_id_fix(resp_data, obj_name, serialization.by_alias)
_id_key = f'{obj_name}_id' if serialization.by_alias else 'id'
_rand_key = f'{obj_name}_id_random' if serialization.by_alias else 'id_random'
if isinstance(resp_data.get(_id_key), int) and resp_data.get(_rand_key):
resp_data[_id_key] = resp_data[_rand_key]
return mk_resp(data=resp_data, response=response) return mk_resp(data=resp_data, response=response)
return mk_resp(data={"obj_id": new_obj_id_random, "obj_id_random": new_obj_id_random}, response=response) return mk_resp(data={"obj_id": new_obj_id_random, "obj_id_random": new_obj_id_random}, response=response)
else: else:
@@ -532,6 +530,7 @@ async def patch_obj(
if return_obj: if return_obj:
if sql_select_result := sql_select(table_name=table_name_select, record_id=record_id): if sql_select_result := sql_select(table_name=table_name_select, record_id=record_id):
resp_data = output_model(**sql_select_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset) resp_data = output_model(**sql_select_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
apply_vision_id_fix(resp_data, obj_name, serialization.by_alias)
return mk_resp(data=resp_data, response=response) return mk_resp(data=resp_data, response=response)
return mk_resp(data=True, response=response, status_message="Object updated successfully.") return mk_resp(data=True, response=response, status_message="Object updated successfully.")
else: else:

View File

@@ -13,7 +13,8 @@ from app.lib_general_v3 import (
) )
from app.lib_api_crud_v3 import ( from app.lib_api_crud_v3 import (
check_account_access, apply_forced_account_filter, filter_order_by, check_account_access, apply_forced_account_filter, filter_order_by,
get_supported_filters, safe_json_loads, sanitize_payload, format_db_error get_supported_filters, safe_json_loads, sanitize_payload, format_db_error,
apply_vision_id_fix
) )
from app.db_sql import get_last_sql_error from app.db_sql import get_last_sql_error
from app.models.response_models import * from app.models.response_models import *
@@ -132,7 +133,7 @@ async def get_child_obj_li(
return mk_resp(data=False, status_code=status_code, response=response, status_message="Listing failed due to database error.", details=db_err.dict()) return mk_resp(data=False, status_code=status_code, response=response, status_message="Listing failed due to database error.", details=db_err.dict())
if sql_result: if sql_result:
resp_data_li = [base_name(**record).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset, exclude_defaults=serialization.exclude_defaults, exclude_none=serialization.exclude_none) for record in sql_result] resp_data_li = [apply_vision_id_fix(base_name(**record).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset, exclude_defaults=serialization.exclude_defaults, exclude_none=serialization.exclude_none), obj_name, serialization.by_alias) for record in sql_result]
return mk_resp(data=resp_data_li, response=response) return mk_resp(data=resp_data_li, response=response)
else: else:
return mk_resp(data=[], status_code=200, response=response) return mk_resp(data=[], status_code=200, response=response)
@@ -218,7 +219,7 @@ async def search_child_obj_li(
return mk_resp(data=False, status_code=status_code, response=response, status_message="Search failed due to database error.", details=db_err.dict()) return mk_resp(data=False, status_code=status_code, response=response, status_message="Search failed due to database error.", details=db_err.dict())
if sql_result: if sql_result:
resp_data_li = [base_name(**record).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset, exclude_defaults=serialization.exclude_defaults, exclude_none=serialization.exclude_none) for record in sql_result] resp_data_li = [apply_vision_id_fix(base_name(**record).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset, exclude_defaults=serialization.exclude_defaults, exclude_none=serialization.exclude_none), obj_name, serialization.by_alias) for record in sql_result]
return mk_resp(data=resp_data_li, response=response) return mk_resp(data=resp_data_li, response=response)
else: else:
return mk_resp(data=[], status_code=200, response=response) return mk_resp(data=[], status_code=200, response=response)
@@ -306,11 +307,7 @@ async def post_child_obj(
if return_obj: if return_obj:
if sql_select_result := sql_select(table_name=table_name_select, record_id=new_obj_id): if sql_select_result := sql_select(table_name=table_name_select, record_id=new_obj_id):
resp_data = output_model(**sql_select_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset) resp_data = output_model(**sql_select_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
# V3 contract: {obj_type}_id in responses must be the random string, never the integer. apply_vision_id_fix(resp_data, child_obj_type, serialization.by_alias)
_id_key = f'{child_obj_type}_id' if serialization.by_alias else 'id'
_rand_key = f'{child_obj_type}_id_random' if serialization.by_alias else 'id_random'
if isinstance(resp_data.get(_id_key), int) and resp_data.get(_rand_key):
resp_data[_id_key] = resp_data[_rand_key]
return mk_resp(data=resp_data, response=response) return mk_resp(data=resp_data, response=response)
return mk_resp(data={"obj_id": new_obj_id_random, "obj_id_random": new_obj_id_random}, response=response) return mk_resp(data={"obj_id": new_obj_id_random, "obj_id_random": new_obj_id_random}, response=response)
else: else:
@@ -362,6 +359,7 @@ async def get_child_obj(
return mk_resp(data=False, status_code=404, response=response, status_message="Child not found under parent.") return mk_resp(data=False, status_code=404, response=response, status_message="Child not found under parent.")
resp_data = base_name(**sql_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset) resp_data = base_name(**sql_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
apply_vision_id_fix(resp_data, child_obj_type, serialization.by_alias)
return mk_resp(data=resp_data, response=response) return mk_resp(data=resp_data, response=response)
return mk_resp(data=False, status_code=404, response=response, status_message="Child not found.") return mk_resp(data=False, status_code=404, response=response, status_message="Child not found.")
@@ -423,6 +421,7 @@ async def patch_child_obj(
if return_obj: if return_obj:
if updated_child := sql_select(table_name=table_name_select, record_id=resolved_child_id): if updated_child := sql_select(table_name=table_name_select, record_id=resolved_child_id):
resp_data = output_model(**updated_child).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset) resp_data = output_model(**updated_child).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
apply_vision_id_fix(resp_data, child_obj_type, serialization.by_alias)
return mk_resp(data=resp_data, response=response) return mk_resp(data=resp_data, response=response)
return mk_resp(data=True, response=response, status_message="Updated successfully.") return mk_resp(data=True, response=response, status_message="Updated successfully.")
else: else:
@@ -430,116 +429,6 @@ async def patch_child_obj(
return mk_resp(data=False, status_code=400, response=response, status_message="Update failed.", details=db_err.dict()) return mk_resp(data=False, status_code=400, response=response, status_message="Update failed.", details=db_err.dict())
@router.get('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
async def get_child_obj(
response: Response,
parent_obj_type: str = Path(min_length=2, max_length=50),
parent_obj_id: str = Path(min_length=11, max_length=22),
child_obj_type: str = Path(min_length=2, max_length=50),
child_obj_id: str = Path(min_length=11, max_length=22),
view: str = Query('default'),
account: AccountContext = Depends(get_account_context),
serialization: SerializationParams = Depends(),
delay: DelayParams = Depends(),
):
"""
Retrieve Child Object.
Verifies that the child belongs to the specified parent.
"""
from app.db_sql import redis_lookup_id_random, sql_select
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
# ID Vision: Resolve physical table names from registry to support aliases
if parent_obj_type not in obj_type_kv_li or child_obj_type not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=response, status_message="Invalid object type(s).")
parent_table = obj_type_kv_li[parent_obj_type].get('tbl')
child_table = obj_type_kv_li[child_obj_type].get('tbl')
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_table)
resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_table)
if not resolved_parent_id or not resolved_child_id:
return mk_resp(data=False, status_code=404, response=response, status_message="Object(s) not found.")
obj_cfg = obj_type_kv_li[child_obj_type]
table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl')))
base_name = obj_cfg.get(f'mdl_{view}', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
if sql_result := sql_select(table_name=table_name, record_id=resolved_child_id):
if sql_result.get(f'{parent_obj_type}_id') != resolved_parent_id:
return mk_resp(data=False, status_code=404, response=response, status_message="Child not found under parent.")
resp_data = base_name(**sql_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
return mk_resp(data=resp_data, response=response)
return mk_resp(data=False, status_code=404, response=response, status_message="Child not found.")
@router.patch('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
async def patch_child_obj(
request: Request,
response: Response,
parent_obj_type: str = Path(min_length=2, max_length=50),
parent_obj_id: str = Path(min_length=11, max_length=22),
child_obj_type: str = Path(min_length=2, max_length=50),
child_obj_id: str = Path(min_length=11, max_length=22),
return_obj: Optional[bool] = True,
x_ae_ignore_extra_fields: Optional[bool] = Header(False),
account: AccountContext = Depends(get_account_context),
serialization: SerializationParams = Depends(),
delay: DelayParams = Depends(),
):
"""
Update Child Object.
Verifies that the child belongs to the specified parent before updating.
"""
from app.db_sql import redis_lookup_id_random, sql_select, sql_update
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
obj_data = await request.json()
# ID Vision: Resolve physical table names from registry to support aliases
if parent_obj_type not in obj_type_kv_li or child_obj_type not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=response, status_message="Invalid object type(s).")
parent_table = obj_type_kv_li[parent_obj_type].get('tbl')
child_table = obj_type_kv_li[child_obj_type].get('tbl')
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_table)
resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_table)
if not resolved_parent_id or not resolved_child_id:
return mk_resp(data=False, status_code=404, response=response, status_message="Object(s) not found.")
obj_cfg = obj_type_kv_li[child_obj_type]
table_name_update = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
if existing_child := sql_select(table_name=table_name_select, record_id=resolved_child_id):
if existing_child.get(f'{parent_obj_type}_id') != resolved_parent_id:
return mk_resp(data=False, status_code=404, response=response, status_message="Child not found under parent.")
else:
return mk_resp(data=False, status_code=404, response=response, status_message="Child not found.")
# Sanitize payload (ID resolution, virtual fields, and optionally extra fields)
sanitize_payload(obj_data, input_model, ignore_extra=x_ae_ignore_extra_fields)
if sql_update(data=obj_data, table_name=table_name_update, record_id=resolved_child_id):
if return_obj:
if updated_child := sql_select(table_name=table_name_select, record_id=resolved_child_id):
resp_data = output_model(**updated_child).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
return mk_resp(data=resp_data, response=response)
return mk_resp(data=True, response=response, status_message="Updated successfully.")
else:
db_err = format_db_error(get_last_sql_error())
return mk_resp(data=False, status_code=400, response=response, status_message="Update failed.", details=db_err.dict())
@router.delete('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base) @router.delete('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
async def delete_child_obj( async def delete_child_obj(