2 Commits

4 changed files with 138 additions and 0 deletions

View File

@@ -33,6 +33,12 @@ events_registration_obj_li = {
'member_status', 'registration_type_code', 'member_status', 'registration_type_code',
'notes', 'created_on', 'updated_on', 'default_qry_str' 'notes', 'created_on', 'updated_on', 'default_qry_str'
], ],
# Allow nested operations under both `event` and `event_person` parents.
# `event_badge` is directly linked to `event_person` (FK: event_person_id),
# but views expose it under `event` as well. Explicitly register both
# so nested CRUD routes like POST /v3/crud/event_person/{id}/event_badge/
# will be accepted by the generic nested router.
'parent_types': ['event', 'event_person'],
}, },
'event_badge_template': { 'event_badge_template': {
'tbl': 'event_badge_template', 'tbl': 'event_badge_template',

View File

@@ -85,6 +85,9 @@ async def get_child_obj_li(
table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl'))) table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl')))
base_name = obj_cfg.get(f'mdl_{view}', obj_cfg.get('mdl_default', obj_cfg.get('mdl'))) base_name = obj_cfg.get(f'mdl_{view}', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
# Log parent/child resolution details (use INFO so logs appear in production)
log.info("nested.list start parent=%s parent_table=%s parent_id_random=%s child=%s table=%s allowed_parents=%s", parent_obj_type, parent_table, parent_obj_id, obj_name, table_name, obj_cfg.get('parent_types'))
if not table_name or not base_name: if not table_name or not base_name:
return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration error.") return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration error.")
@@ -92,15 +95,26 @@ async def get_child_obj_li(
status_filter = get_supported_filters(base_name, status_filter) status_filter = get_supported_filters(base_name, status_filter)
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_table) resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_table)
log.info("nested.list resolved_parent_id=%s (random=%s) for parent_table=%s", resolved_parent_id, parent_obj_id, parent_table)
if not resolved_parent_id: if not resolved_parent_id:
log.info("nested.list parent resolution failed for random id=%s table=%s", parent_obj_id, parent_table)
return mk_resp(data=False, status_code=404, response=response, status_message=f"Parent not found.") return mk_resp(data=False, status_code=404, response=response, status_message=f"Parent not found.")
# Enforce allowed parent types when configured on the child object
allowed_parents = obj_cfg.get('parent_types')
if allowed_parents and parent_obj_type not in allowed_parents:
log.info("nested.list invalid parent type: parent=%s allowed=%s", parent_obj_type, allowed_parents)
return mk_resp(data=False, status_code=400, response=response, status_message=f"Invalid parent type for this child.")
parent_cfg = obj_type_kv_li[parent_obj_type] parent_cfg = obj_type_kv_li[parent_obj_type]
parent_table_select = parent_cfg.get('tbl_default', parent_cfg.get('tbl')) parent_table_select = parent_cfg.get('tbl_default', parent_cfg.get('tbl'))
if parent_sql_res := sql_select(table_name=parent_table_select, record_id=resolved_parent_id): if parent_sql_res := sql_select(table_name=parent_table_select, record_id=resolved_parent_id):
log.info("nested.list parent_sql_res found for id=%s table=%s", resolved_parent_id, parent_table_select)
if not check_account_access(parent_sql_res, account, parent_obj_type): if not check_account_access(parent_sql_res, account, parent_obj_type):
log.info("nested.list access denied to parent id=%s for account=%s", resolved_parent_id, getattr(account, 'account_id', None))
return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to parent.") return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to parent.")
else: else:
log.info("nested.list parent sql_select returned no row for id=%s table=%s", resolved_parent_id, parent_table_select)
return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.") return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.")
and_qry_dict_obj = apply_forced_account_filter(and_qry_dict_obj, account, base_name, obj_name, table_name=table_name) and_qry_dict_obj = apply_forced_account_filter(and_qry_dict_obj, account, base_name, obj_name, table_name=table_name)
@@ -182,15 +196,26 @@ async def search_child_obj_li(
searchable_fields = obj_cfg.get('searchable_fields') searchable_fields = obj_cfg.get('searchable_fields')
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_table) resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_table)
log.info("nested.search resolved_parent_id=%s (random=%s) for parent_table=%s", resolved_parent_id, parent_obj_id, parent_table)
if not resolved_parent_id: if not resolved_parent_id:
log.info("nested.search parent resolution failed for random id=%s table=%s", parent_obj_id, parent_table)
return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.") return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.")
# Enforce allowed parent types when configured on the child object
allowed_parents = obj_cfg.get('parent_types')
if allowed_parents and parent_obj_type not in allowed_parents:
log.info("nested.search invalid parent type: parent=%s allowed=%s", parent_obj_type, allowed_parents)
return mk_resp(data=False, status_code=400, response=response, status_message="Invalid parent type for this child.")
parent_cfg = obj_type_kv_li[parent_obj_type] parent_cfg = obj_type_kv_li[parent_obj_type]
parent_table_select = parent_cfg.get('tbl_default', parent_cfg.get('tbl')) parent_table_select = parent_cfg.get('tbl_default', parent_cfg.get('tbl'))
if parent_sql_res := sql_select(table_name=parent_table_select, record_id=resolved_parent_id): if parent_sql_res := sql_select(table_name=parent_table_select, record_id=resolved_parent_id):
log.info("nested.search parent_sql_res found for id=%s table=%s", resolved_parent_id, parent_table_select)
if not check_account_access(parent_sql_res, account, parent_obj_type): if not check_account_access(parent_sql_res, account, parent_obj_type):
log.info("nested.search access denied to parent id=%s for account=%s", resolved_parent_id, getattr(account, 'account_id', None))
return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to parent.") return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to parent.")
else: else:
log.info("nested.search parent sql_select returned no row for id=%s table=%s", resolved_parent_id, parent_table_select)
return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.") return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.")
# Enforce account isolation on the search query # Enforce account isolation on the search query
@@ -257,18 +282,29 @@ async def post_child_obj(
# ID Vision: Resolve physical table names from registry to support aliases # ID Vision: Resolve physical table names from registry to support aliases
parent_table = obj_type_kv_li[parent_obj_type].get('tbl') parent_table = obj_type_kv_li[parent_obj_type].get('tbl')
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_table) resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_table)
log.info("nested.post parent=%s parent_table=%s parent_id_random=%s", parent_obj_type, parent_table, parent_obj_id)
log.info("nested.post resolved_parent_id=%s for random=%s table=%s", resolved_parent_id, parent_obj_id, parent_table)
if not resolved_parent_id: if not resolved_parent_id:
log.info("nested.post parent resolution failed for random id=%s table=%s", parent_obj_id, parent_table)
return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.") return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.")
parent_cfg = obj_type_kv_li[parent_obj_type] parent_cfg = obj_type_kv_li[parent_obj_type]
parent_table_select = parent_cfg.get('tbl_default', parent_cfg.get('tbl')) parent_table_select = parent_cfg.get('tbl_default', parent_cfg.get('tbl'))
if parent_sql_res := sql_select(table_name=parent_table_select, record_id=resolved_parent_id): if parent_sql_res := sql_select(table_name=parent_table_select, record_id=resolved_parent_id):
log.info("nested.post parent_sql_res found for id=%s table=%s", resolved_parent_id, parent_table_select)
if not check_account_access(parent_sql_res, account, parent_obj_type): if not check_account_access(parent_sql_res, account, parent_obj_type):
log.info("nested.post access denied to parent id=%s for account=%s", resolved_parent_id, getattr(account, 'account_id', None))
return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to parent.") return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to parent.")
else: else:
log.info("nested.post parent sql_select returned no row for id=%s table=%s", resolved_parent_id, parent_table_select)
return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.") return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.")
obj_cfg = obj_type_kv_li[child_obj_type] obj_cfg = obj_type_kv_li[child_obj_type]
# Enforce allowed parent types when configured on the child object
allowed_parents = obj_cfg.get('parent_types')
if allowed_parents and parent_obj_type not in allowed_parents:
log.info("nested.post invalid parent type: parent=%s allowed=%s", parent_obj_type, allowed_parents)
return mk_resp(data=False, status_code=400, response=response, status_message="Invalid parent type for this child.")
table_name_insert = obj_cfg.get('tbl_update', obj_cfg.get('tbl')) table_name_insert = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl')) table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl')) input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
@@ -351,6 +387,10 @@ async def get_child_obj(
return mk_resp(data=False, status_code=404, response=response, status_message="Object(s) not found.") return mk_resp(data=False, status_code=404, response=response, status_message="Object(s) not found.")
obj_cfg = obj_type_kv_li[child_obj_type] obj_cfg = obj_type_kv_li[child_obj_type]
# Enforce allowed parent types when configured on the child object
allowed_parents = obj_cfg.get('parent_types')
if allowed_parents and parent_obj_type not in allowed_parents:
return mk_resp(data=False, status_code=400, response=response, status_message="Invalid parent type for this child.")
table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl'))) table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl')))
base_name = obj_cfg.get(f'mdl_{view}', obj_cfg.get('mdl_default', obj_cfg.get('mdl'))) base_name = obj_cfg.get(f'mdl_{view}', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))

View File

@@ -96,6 +96,7 @@ Always run test scripts from the **project root** directory. Most scripts includ
* Use snake_case (or Snake_Case or Snake_case or test_NASA_example or test_API_key) * Use snake_case (or Snake_Case or Snake_case or test_NASA_example or test_API_key)
* Aether test/demo base URL: 'http://demo.localhost:5173' * Aether test/demo base URL: 'http://demo.localhost:5173'
* Aether development API: 'https://dev-api.oneskyit.com' * Aether development API: 'https://dev-api.oneskyit.com'
* Aether development API "secret" key: 'nT0jPeiCfxSifkiDZur9jA'
These are IDs for records that we can use for testing. Please do not delete them. They are also used for demo purposes with clients. These are IDs for records that we can use for testing. Please do not delete them. They are also used for demo purposes with clients.

View File

@@ -0,0 +1,91 @@
"""
E2E: Nested-create regression test
- Creates an `event_person` under demo event `pjrcghqwert` then creates an
`event_badge` under that person using the nested CRUD endpoints.
- Cleans up created records on success.
Usage:
./environment/bin/python3 tests/e2e/test_e2e_v3_nested_create_event_badge.py
This test uses the standard Agent API Key defined in the project README.
"""
import os
import requests
import sys
import time
BASE = os.environ.get('AE_API_BASE', 'https://dev-api.oneskyit.com')
API_BASE = BASE.rstrip('/') + '/v3/crud'
AGENT_API_KEY = os.environ.get('AE_AGENT_API_KEY', 'nT0jPeiCfxSifkiDZur9jA')
EVENT_ID = os.environ.get('AE_TEST_EVENT', 'pjrcghqwert')
ACCOUNT_ID = os.environ.get('AE_ACCOUNT', '_XY7DXtc9MY')
HEADERS = {'x-aether-api-key': AGENT_API_KEY, 'x-account-id': ACCOUNT_ID, 'Content-Type': 'application/json'}
def print_result(label, success, message=""):
mark = '✅ PASS' if success else '❌ FAIL'
print(f"{mark} - {label}: {message}")
def run():
created = {}
try:
# 1) Create event_person under event
url = f"{API_BASE}/event/{EVENT_ID}/event_person/?return_obj=false"
r = requests.post(url, headers=HEADERS, json={})
if r.status_code != 200:
print_result('create event_person', False, f'status={r.status_code} body={r.text}')
return 2
data = r.json().get('data') or {}
person_id = data.get('obj_id') or data.get('obj_id_random')
if not person_id:
print_result('create event_person', False, f'missing obj_id in response {r.json()}')
return 2
created['person'] = person_id
print_result('create event_person', True, f'person_id={person_id}')
# small delay to let DB/indexing settle on remote dev
time.sleep(0.5)
# 2) Create event_badge under event_person
url = f"{API_BASE}/event_person/{person_id}/event_badge/?return_obj=false"
r2 = requests.post(url, headers=HEADERS, json={})
if r2.status_code != 200:
print_result('create event_badge', False, f'status={r2.status_code} body={r2.text}')
return 2
data2 = r2.json().get('data') or {}
badge_id = data2.get('obj_id') or data2.get('obj_id_random')
if not badge_id:
print_result('create event_badge', False, f'missing obj_id in response {r2.json()}')
return 2
created['badge'] = badge_id
print_result('create event_badge', True, f'badge_id={badge_id}')
# 3) Cleanup: delete badge then person
# Delete badge
del_url = f"{API_BASE}/event_person/{person_id}/event_badge/{badge_id}?method=delete"
rd = requests.delete(del_url, headers=HEADERS)
if rd.status_code == 200:
print_result('delete event_badge', True, '')
else:
print_result('delete event_badge', False, f'status={rd.status_code} body={rd.text}')
# Delete person (as child of event)
delp_url = f"{API_BASE}/event/{EVENT_ID}/event_person/{person_id}?method=delete"
rp = requests.delete(delp_url, headers=HEADERS)
if rp.status_code == 200:
print_result('delete event_person', True, '')
else:
print_result('delete event_person', False, f'status={rp.status_code} body={rp.text}')
return 0
except Exception as e:
print_result('exception', False, str(e))
return 2
if __name__ == '__main__':
sys.exit(run())