fix(v3-nested): support aliases in nested CRUD routes

1. Added 'entry' alias for 'journal_entry' in object definitions.\n2. Updated nested router to resolve physical table names from the registry before ID resolution.\n3. Updated ID resolution helpers to recognize 'entry' prefix.\nThis resolves 404 errors when using shorter aliases in nested paths (e.g., /journal/{id}/entry/).
This commit is contained in:
Scott Idem
2026-02-06 14:13:22 -05:00
parent b9c00e423c
commit b63131e3fa
3 changed files with 160 additions and 17 deletions

View File

@@ -158,7 +158,7 @@ def lookup_id_random_pop(
id_prefixes = [
'account', 'activity_log', 'address', 'address_location', 'archive',
'contact', 'contact_1', 'contact_2', 'cont_edu_cert', 'cont_edu_cert_person',
'event', 'event_id_random_only', 'event_abstract', 'event_badge',
'entry', 'event', 'event_id_random_only', 'event_abstract', 'event_badge',
'event_badge_template', 'event_exhibit', 'event_file', 'event_location',
'event_person', 'event_person_profile', 'event_presentation',
'event_presenter', 'event_registration', 'event_session', 'event_track',
@@ -178,6 +178,7 @@ def lookup_id_random_pop(
table = prefix
if prefix == 'address_location': table = 'address'
elif prefix in ['contact_1', 'contact_2']: table = 'contact'
elif prefix == 'entry': table = 'journal_entry'
elif prefix == 'event_id_random_only': table = 'event'
elif prefix == 'poc_event_person': table = 'event_person'
elif prefix == 'poc_person': table = 'person'

View File

@@ -54,3 +54,6 @@ journal_obj_li = {
],
},
}
# Aliases for shorter/cleaner URLs
journal_obj_li['entry'] = journal_obj_li['journal_entry']

View File

@@ -78,6 +78,8 @@ async def get_child_obj_li(
if obj_name not in obj_type_kv_li or parent_obj_type not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=response, status_message=f"Invalid object type(s).")
# ID Vision: Resolve physical table names from registry to support aliases
parent_table = obj_type_kv_li[parent_obj_type].get('tbl')
obj_cfg = obj_type_kv_li[obj_name]
table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl')))
base_name = obj_cfg.get(f'mdl_{view}', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
@@ -88,13 +90,13 @@ async def get_child_obj_li(
order_by_li = filter_order_by(order_by_li, base_name, table_name)
status_filter = get_supported_filters(base_name, status_filter)
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_table)
if not resolved_parent_id:
return mk_resp(data=False, status_code=404, response=response, status_message=f"Parent not found.")
parent_cfg = obj_type_kv_li[parent_obj_type]
parent_table = parent_cfg.get('tbl_default', parent_cfg.get('tbl'))
if parent_sql_res := sql_select(table_name=parent_table, record_id=resolved_parent_id):
parent_table_select = parent_cfg.get('tbl_default', parent_cfg.get('tbl'))
if parent_sql_res := sql_select(table_name=parent_table_select, record_id=resolved_parent_id):
if not check_account_access(parent_sql_res, account, parent_obj_type):
return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to parent.")
else:
@@ -165,6 +167,8 @@ async def search_child_obj_li(
if obj_name not in obj_type_kv_li or parent_obj_type not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=response, status_message="Invalid object type(s).")
# ID Vision: Resolve physical table names from registry to support aliases
parent_table = obj_type_kv_li[parent_obj_type].get('tbl')
obj_cfg = obj_type_kv_li[obj_name]
table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl')))
base_name = obj_cfg.get(f'mdl_{view}', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
@@ -176,13 +180,13 @@ async def search_child_obj_li(
status_filter = get_supported_filters(base_name, status_filter)
searchable_fields = obj_cfg.get('searchable_fields')
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_table)
if not resolved_parent_id:
return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.")
parent_cfg = obj_type_kv_li[parent_obj_type]
parent_table = parent_cfg.get('tbl_default', parent_cfg.get('tbl'))
if parent_sql_res := sql_select(table_name=parent_table, record_id=resolved_parent_id):
parent_table_select = parent_cfg.get('tbl_default', parent_cfg.get('tbl'))
if parent_sql_res := sql_select(table_name=parent_table_select, record_id=resolved_parent_id):
if not check_account_access(parent_sql_res, account, parent_obj_type):
return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to parent.")
else:
@@ -249,13 +253,15 @@ async def post_child_obj(
if parent_obj_type not in obj_type_kv_li or child_obj_type not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=response, status_message="Invalid object type.")
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
# ID Vision: Resolve physical table names from registry to support aliases
parent_table = obj_type_kv_li[parent_obj_type].get('tbl')
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_table)
if not resolved_parent_id:
return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.")
parent_cfg = obj_type_kv_li[parent_obj_type]
parent_table = parent_cfg.get('tbl_default', parent_cfg.get('tbl'))
if parent_sql_res := sql_select(table_name=parent_table, record_id=resolved_parent_id):
parent_table_select = parent_cfg.get('tbl_default', parent_cfg.get('tbl'))
if parent_sql_res := sql_select(table_name=parent_table_select, record_id=resolved_parent_id):
if not check_account_access(parent_sql_res, account, parent_obj_type):
return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to parent.")
else:
@@ -325,8 +331,15 @@ async def get_child_obj(
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_obj_type)
# ID Vision: Resolve physical table names from registry to support aliases
if parent_obj_type not in obj_type_kv_li or child_obj_type not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=response, status_message="Invalid object type(s).")
parent_table = obj_type_kv_li[parent_obj_type].get('tbl')
child_table = obj_type_kv_li[child_obj_type].get('tbl')
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_table)
resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_table)
if not resolved_parent_id or not resolved_child_id:
return mk_resp(data=False, status_code=404, response=response, status_message="Object(s) not found.")
@@ -368,8 +381,127 @@ async def patch_child_obj(
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
obj_data = await request.json()
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_obj_type)
# ID Vision: Resolve physical table names from registry to support aliases
if parent_obj_type not in obj_type_kv_li or child_obj_type not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=response, status_message="Invalid object type(s).")
parent_table = obj_type_kv_li[parent_obj_type].get('tbl')
child_table = obj_type_kv_li[child_obj_type].get('tbl')
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_table)
resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_table)
if not resolved_parent_id or not resolved_child_id:
return mk_resp(data=False, status_code=404, response=response, status_message="Object(s) not found.")
obj_cfg = obj_type_kv_li[child_obj_type]
table_name_update = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
if existing_child := sql_select(table_name=table_name_select, record_id=resolved_child_id):
if existing_child.get(f'{parent_obj_type}_id') != resolved_parent_id:
return mk_resp(data=False, status_code=404, response=response, status_message="Child not found under parent.")
else:
return mk_resp(data=False, status_code=404, response=response, status_message="Child not found.")
# Sanitize payload (ID resolution, virtual fields, and optionally extra fields)
sanitize_payload(obj_data, input_model, ignore_extra=x_ae_ignore_extra_fields)
if sql_update(data=obj_data, table_name=table_name_update, record_id=resolved_child_id):
if return_obj:
if updated_child := sql_select(table_name=table_name_select, record_id=resolved_child_id):
resp_data = output_model(**updated_child).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
return mk_resp(data=resp_data, response=response)
return mk_resp(data=True, response=response, status_message="Updated successfully.")
else:
db_err = format_db_error(get_last_sql_error())
return mk_resp(data=False, status_code=400, response=response, status_message="Update failed.", details=db_err.dict())
@router.get('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
async def get_child_obj(
response: Response,
parent_obj_type: str = Path(min_length=2, max_length=50),
parent_obj_id: str = Path(min_length=11, max_length=22),
child_obj_type: str = Path(min_length=2, max_length=50),
child_obj_id: str = Path(min_length=11, max_length=22),
view: str = Query('default'),
account: AccountContext = Depends(get_account_context),
serialization: SerializationParams = Depends(),
delay: DelayParams = Depends(),
):
"""
Retrieve Child Object.
Verifies that the child belongs to the specified parent.
"""
from app.db_sql import redis_lookup_id_random, sql_select
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
# ID Vision: Resolve physical table names from registry to support aliases
if parent_obj_type not in obj_type_kv_li or child_obj_type not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=response, status_message="Invalid object type(s).")
parent_table = obj_type_kv_li[parent_obj_type].get('tbl')
child_table = obj_type_kv_li[child_obj_type].get('tbl')
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_table)
resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_table)
if not resolved_parent_id or not resolved_child_id:
return mk_resp(data=False, status_code=404, response=response, status_message="Object(s) not found.")
obj_cfg = obj_type_kv_li[child_obj_type]
table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl')))
base_name = obj_cfg.get(f'mdl_{view}', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
if sql_result := sql_select(table_name=table_name, record_id=resolved_child_id):
if sql_result.get(f'{parent_obj_type}_id') != resolved_parent_id:
return mk_resp(data=False, status_code=404, response=response, status_message="Child not found under parent.")
resp_data = base_name(**sql_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
return mk_resp(data=resp_data, response=response)
return mk_resp(data=False, status_code=404, response=response, status_message="Child not found.")
@router.patch('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
async def patch_child_obj(
request: Request,
response: Response,
parent_obj_type: str = Path(min_length=2, max_length=50),
parent_obj_id: str = Path(min_length=11, max_length=22),
child_obj_type: str = Path(min_length=2, max_length=50),
child_obj_id: str = Path(min_length=11, max_length=22),
return_obj: Optional[bool] = True,
x_ae_ignore_extra_fields: Optional[bool] = Header(False),
account: AccountContext = Depends(get_account_context),
serialization: SerializationParams = Depends(),
delay: DelayParams = Depends(),
):
"""
Update Child Object.
Verifies that the child belongs to the specified parent before updating.
"""
from app.db_sql import redis_lookup_id_random, sql_select, sql_update
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
obj_data = await request.json()
# ID Vision: Resolve physical table names from registry to support aliases
if parent_obj_type not in obj_type_kv_li or child_obj_type not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=response, status_message="Invalid object type(s).")
parent_table = obj_type_kv_li[parent_obj_type].get('tbl')
child_table = obj_type_kv_li[child_obj_type].get('tbl')
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_table)
resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_table)
if not resolved_parent_id or not resolved_child_id:
return mk_resp(data=False, status_code=404, response=response, status_message="Object(s) not found.")
@@ -420,8 +552,15 @@ async def delete_child_obj(
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_obj_type)
# ID Vision: Resolve physical table names from registry to support aliases
if parent_obj_type not in obj_type_kv_li or child_obj_type not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=response, status_message="Invalid object type(s).")
parent_table = obj_type_kv_li[parent_obj_type].get('tbl')
child_table = obj_type_kv_li[child_obj_type].get('tbl')
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_table)
resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_table)
if not resolved_parent_id or not resolved_child_id:
return mk_resp(data=False, status_code=404, response=response, status_message="Object(s) not found.")
@@ -445,4 +584,4 @@ async def delete_child_obj(
if success:
return mk_resp(data=True, response=response, status_message=f"Deleted successfully via {method}.")
return mk_resp(data=False, status_code=400, response=response, status_message="Deletion failed.")
return mk_resp(data=False, status_code=400, response=response, status_message="Deletion failed.")