From 2266f149f7269e73bc2900b4ed5e14823bc77beb Mon Sep 17 00:00:00 2001 From: Scott Idem Date: Fri, 13 Feb 2026 18:45:20 -0500 Subject: [PATCH] security(v3): harden multi-tenant isolation and enhance failure feedback --- app/lib_api_crud_v3.py | 5 + app/models/archive_models.py | 60 +++-- app/models/auth_models.py | 1 + app/models/event_badge_models.py | 10 +- app/models/event_file_models.py | 3 + app/models/event_location_models.py | 11 +- app/models/event_presenter_models.py | 13 + app/models/event_session_models.py | 14 +- app/models/site_models.py | 71 +++--- app/object_definitions/cms.py | 3 +- app/routers/api_crud_v3.py | 11 +- app/routers/dependencies_v3.py | 43 +++- documentation/GUIDE__V3_FRONTEND_API.md | 276 ++++------------------ tests/e2e/test_e2e_v3_security_audit.py | 95 ++++++++ tests/e2e/verify_v3_security_hardening.py | 90 +++++++ 15 files changed, 389 insertions(+), 317 deletions(-) create mode 100644 tests/e2e/test_e2e_v3_security_audit.py create mode 100644 tests/e2e/verify_v3_security_hardening.py diff --git a/app/lib_api_crud_v3.py b/app/lib_api_crud_v3.py index 78e489f..de77ead 100644 --- a/app/lib_api_crud_v3.py +++ b/app/lib_api_crud_v3.py @@ -113,7 +113,12 @@ def apply_forced_account_filter(and_qry_dict: Optional[Dict], account: AccountCo except: has_col = False + if not has_col: + return forced + + # CRITICAL: Always apply the filter. If account_id is None, it filters for NULL. forced[target_col] = account.account_id + return forced def filter_order_by(order_by_li: Any, model: Any, table_name: str = None) -> Optional[Dict[str, str]]: diff --git a/app/models/archive_models.py b/app/models/archive_models.py index 8d62528..0b68e82 100644 --- a/app/models/archive_models.py +++ b/app/models/archive_models.py @@ -1,7 +1,7 @@ import datetime, pytz from typing import Dict, List, Optional, Set, Union -from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator +from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator, root_validator from app.db_sql import redis_lookup_id_random from app.lib_general import log, logging @@ -14,15 +14,39 @@ class Archive_Base(BaseModel): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) - id_random: Optional[str] = Field( - **base_fields['archive_id_random'], - alias = 'archive_id_random', - ) - id: Optional[int] = Field( - alias = 'archive_id' - ) - account_id_random: Optional[str] - account_id: Optional[int] + # --- Standardized Vision IDs (Strings for API, Integers for DB) --- + id: Optional[Union[int, str]] = Field(None, **base_fields['archive_id_random']) + archive_id: Optional[Union[int, str]] = Field(None, **base_fields['archive_id_random']) + account_id: Optional[Union[int, str]] = Field(None, **base_fields['account_id_random']) + + # --- Standardized Legacy / Internal IDs (Excluded) --- + id_random: Optional[str] = Field(None, alias='archive_id_random', exclude=True) + account_id_random: Optional[str] = Field(None, exclude=True) + + @root_validator(pre=True) + def map_v3_ids(cls, values): + """ + Vision Transformer: + Map DB keys to clean API keys and strip internal integers during READ operations. + During CREATE (POST) operations, we ensure resolved integers are preserved. + """ + # 1. Map Random Strings to Clean Names + rid = values.get('id_random') or values.get('archive_id_random') + if rid and isinstance(rid, str): + values['id'] = rid + values['archive_id'] = rid + + if a_rid := values.get('account_id_random'): values['account_id'] = a_rid + + # 2. Prevent leakage of integers during API responses (Vision Standard) + for k in ['id', 'archive_id', 'account_id']: + val = values.get(k) + if val is not None and not isinstance(val, str): + if values.get(f'{k}_random') or (k=='id' and values.get('id_random')): + del values[k] + + return values + archive_type_id: Optional[int] archive_type: Optional[str] @@ -70,22 +94,8 @@ class Archive_Base(BaseModel): _processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now) - @validator('id', always=True) - def archive_id_lookup(cls, v, values, **kwargs): - if isinstance(v, int) and v > 0: return v - elif id_random := values.get('id_random'): - return redis_lookup_id_random(record_id_random=id_random, table_name='archive') - return None - - @validator('account_id', always=True) - def account_id_lookup(cls, v, values, **kwargs): - if isinstance(v, int) and v > 0: return v - elif id_random := values.get('account_id_random'): - return redis_lookup_id_random(record_id_random=id_random, table_name='account') - return None - class Config: underscore_attrs_are_private = True - allow_population_by_field_name = True + allow_population_by_field_name = False fields = base_fields # ### END ### API Archive Models ### Archive_Base() ### diff --git a/app/models/auth_models.py b/app/models/auth_models.py index ed2bc29..ae7ff21 100644 --- a/app/models/auth_models.py +++ b/app/models/auth_models.py @@ -12,3 +12,4 @@ class AccountContext(BaseModel): super: bool = False auth_method: str = 'legacy_header' token_payload: Optional[dict] = None + auth_error: Optional[str] = None diff --git a/app/models/event_badge_models.py b/app/models/event_badge_models.py index e85f9bf..028c858 100644 --- a/app/models/event_badge_models.py +++ b/app/models/event_badge_models.py @@ -19,6 +19,7 @@ class Event_Badge_Base(BaseModel): # --- Standardized Vision IDs (Strings for API, Integers for DB) --- id: Optional[Union[int, str]] = Field(**base_fields['event_badge_id_random']) event_badge_id: Optional[Union[int, str]] = Field(**base_fields['event_badge_id_random']) + account_id: Optional[Union[int, str]] = Field(None, **base_fields['account_id_random']) event_id: Optional[Union[int, str]] = Field(**base_fields['event_id_random']) # NOTE: This should only be used when the event_person record can not be created. And records before 2022. @@ -30,6 +31,7 @@ class Event_Badge_Base(BaseModel): # --- Standardized Legacy / Internal IDs (Excluded) --- id_random: Optional[str] = Field(None, alias='event_badge_id_random', exclude=True) + account_id_random: Optional[str] = Field(None, exclude=True) event_id_random: Optional[str] = Field(None, exclude=True) event_id_random_only: Optional[str] = Field(None, exclude=True) event_badge_template_id_random: Optional[str] = Field(None, exclude=True) @@ -49,6 +51,7 @@ class Event_Badge_Base(BaseModel): values['id'] = rid values['event_badge_id'] = rid + if a_rid := values.get('account_id_random'): values['account_id'] = a_rid if e_rid := values.get('event_id_random'): values['event_id'] = e_rid if eo_rid := values.get('event_id_random_only'): values['event_id_only'] = eo_rid if et_rid := values.get('event_badge_template_id_random'): values['event_badge_template_id'] = et_rid @@ -56,7 +59,7 @@ class Event_Badge_Base(BaseModel): if p_rid := values.get('person_id_random'): values['person_id'] = p_rid # 2. Prevent leakage of integers during API responses (Vision Standard) - for k in ['id', 'event_badge_id', 'event_id', 'event_id_only', 'event_badge_template_id', 'event_person_id', 'person_id']: + for k in ['id', 'event_badge_id', 'account_id', 'event_id', 'event_id_only', 'event_badge_template_id', 'event_person_id', 'person_id']: val = values.get(k) if val is not None and not isinstance(val, str): values[k] = None @@ -201,11 +204,13 @@ class Event_Badge_Basic_Base(BaseModel): # --- Standardized Vision IDs (Strings for API, Integers for DB) --- id: Optional[Union[int, str]] = Field(None, **base_fields['event_badge_id_random']) event_badge_id: Optional[Union[int, str]] = Field(None, **base_fields['event_badge_id_random']) + account_id: Optional[Union[int, str]] = Field(None, **base_fields['account_id_random']) event_badge_template_id: Optional[Union[int, str]] = Field(None, **base_fields['event_badge_template_id_random']) event_person_id: Optional[Union[int, str]] = Field(None, **base_fields['event_person_id_random']) # --- Standardized Legacy / Internal IDs (Excluded) --- id_random: Optional[str] = Field(None, alias='event_badge_id_random', exclude=True) + account_id_random: Optional[str] = Field(None, exclude=True) event_badge_template_id_random: Optional[str] = Field(None, exclude=True) event_person_id_random: Optional[str] = Field(None, exclude=True) @@ -222,11 +227,12 @@ class Event_Badge_Basic_Base(BaseModel): values['id'] = rid values['event_badge_id'] = rid + if a_rid := values.get('account_id_random'): values['account_id'] = a_rid if et_rid := values.get('event_badge_template_id_random'): values['event_badge_template_id'] = et_rid if ep_rid := values.get('event_person_id_random'): values['event_person_id'] = ep_rid # 2. Prevent "Collision Population" or leakage of integers during API responses - for k in ['id', 'event_badge_id', 'event_badge_template_id', 'event_person_id']: + for k in ['id', 'event_badge_id', 'account_id', 'event_badge_template_id', 'event_person_id']: val = values.get(k) if val is not None and not isinstance(val, str): if values.get(f'{k}_random') or (k=='id' and values.get('id_random')): diff --git a/app/models/event_file_models.py b/app/models/event_file_models.py index 9871f9a..571c538 100644 --- a/app/models/event_file_models.py +++ b/app/models/event_file_models.py @@ -19,6 +19,7 @@ class Event_File_Base(BaseModel): # --- Standardized Vision IDs (Strings for API, Integers for DB) --- id: Optional[Union[int, str]] = Field(**base_fields['event_file_id_random']) event_file_id: Optional[Union[int, str]] = Field(**base_fields['event_file_id_random']) + account_id: Optional[Union[int, str]] = Field(None, **base_fields['account_id_random']) hosted_file_id: Optional[Union[int, str]] = Field(**base_fields['hosted_file_id_random']) # Generic Relational target @@ -34,6 +35,7 @@ class Event_File_Base(BaseModel): # --- Standardized Legacy / Internal IDs (Excluded) --- id_random: Optional[str] = Field(None, alias='event_file_id_random', exclude=True) + account_id_random: Optional[str] = Field(None, exclude=True) hosted_file_id_random: Optional[str] = Field(None, exclude=True) for_id_random: Optional[str] = Field(None, exclude=True) event_id_random: Optional[str] = Field(None, exclude=True) @@ -60,6 +62,7 @@ class Event_File_Base(BaseModel): # 2. Map & Resolve Relational IDs # (Field Name, Table Name) id_map = [ + ('account_id', 'account'), ('hosted_file_id', 'hosted_file'), ('event_id', 'event'), ('event_exhibit_id', 'event_exhibit'), diff --git a/app/models/event_location_models.py b/app/models/event_location_models.py index b45c193..7dabfac 100644 --- a/app/models/event_location_models.py +++ b/app/models/event_location_models.py @@ -19,9 +19,16 @@ class Event_Location_Base(BaseModel): id: Optional[str] = Field(None, **base_fields['event_location_id_random']) event_location_id: Optional[str] = Field(None, **base_fields['event_location_id_random']) + account_id: Optional[str] = Field(None, **base_fields['account_id_random']) event_id: Optional[str] = Field(None, **base_fields['event_id_random']) event_track_id: Optional[str] = Field(None, **base_fields['event_track_id_random']) + # --- Standardized Legacy / Internal IDs (Excluded) --- + id_random: Optional[str] = Field(None, alias='event_location_id_random', exclude=True) + account_id_random: Optional[str] = Field(None, exclude=True) + event_id_random: Optional[str] = Field(None, exclude=True) + event_track_id_random: Optional[str] = Field(None, exclude=True) + code: Optional[str] = Field( # alias = 'event_location_code' ) @@ -108,13 +115,15 @@ class Event_Location_Base(BaseModel): values['id'] = rid values['event_location_id'] = rid + if a_rid := values.get('account_id_random'): + values['account_id'] = a_rid if e_rid := values.get('event_id_random'): values['event_id'] = e_rid if et_rid := values.get('event_track_id_random'): values['event_track_id'] = et_rid # 2. Prevent "Collision Population" - for k in ['id', 'event_location_id', 'event_id', 'event_track_id']: + for k in ['id', 'event_location_id', 'account_id', 'event_id', 'event_track_id']: if k in values and not isinstance(values[k], str) and values[k] is not None: del values[k] diff --git a/app/models/event_presenter_models.py b/app/models/event_presenter_models.py index ce2081c..b9e6dc8 100644 --- a/app/models/event_presenter_models.py +++ b/app/models/event_presenter_models.py @@ -27,6 +27,9 @@ class Event_Presenter_Base(BaseModel): alias = 'event_presenter_id' ) + account_id_random: Optional[str] + account_id: Optional[int] + external_id: Optional[str] code: Optional[str] @@ -197,6 +200,13 @@ class Event_Presenter_Base(BaseModel): return redis_lookup_id_random(record_id_random=id_random, table_name='event_presenter') return None + @validator('account_id', always=True) + def account_id_lookup(cls, v, values, **kwargs): + if isinstance(v, int) and v > 0: return v + elif id_random := values.get('account_id_random'): + return redis_lookup_id_random(record_id_random=id_random, table_name='account') + return None + @validator('event_id', always=True) def event_id_lookup(cls, v, values, **kwargs): if isinstance(v, int) and v > 0: return v @@ -253,6 +263,9 @@ class Event_Presenter_Out_Base(BaseModel): alias = 'event_presenter_id' ) + account_id_random: Optional[str] + account_id: Optional[int] + external_id: Optional[str] code: Optional[str] diff --git a/app/models/event_session_models.py b/app/models/event_session_models.py index e575b70..b4b151c 100644 --- a/app/models/event_session_models.py +++ b/app/models/event_session_models.py @@ -23,12 +23,22 @@ class Event_Session_Base(BaseModel): id: Optional[str] = Field(None, **base_fields['event_session_id_random']) event_session_id: Optional[str] = Field(None, **base_fields['event_session_id_random']) + account_id: Optional[str] = Field(None, **base_fields['account_id_random']) event_id: Optional[str] = Field(None, **base_fields['event_id_random']) event_location_id: Optional[str] = Field(None, **base_fields['event_location_id_random']) event_track_id: Optional[str] = Field(None, **base_fields['event_track_id_random']) poc_event_person_id: Optional[str] = Field(None, **base_fields['event_person_id_random']) poc_person_id: Optional[str] = Field(None, **base_fields['person_id_random']) + # --- Standardized Legacy / Internal IDs (Excluded) --- + id_random: Optional[str] = Field(None, alias='event_session_id_random', exclude=True) + account_id_random: Optional[str] = Field(None, exclude=True) + event_id_random: Optional[str] = Field(None, exclude=True) + event_location_id_random: Optional[str] = Field(None, exclude=True) + event_track_id_random: Optional[str] = Field(None, exclude=True) + poc_event_person_id_random: Optional[str] = Field(None, exclude=True) + poc_person_id_random: Optional[str] = Field(None, exclude=True) + external_id: Optional[str] = Field( # alias = 'event_session_external_id' ) @@ -184,6 +194,8 @@ class Event_Session_Base(BaseModel): values['id'] = rid values['event_session_id'] = rid + if a_rid := values.get('account_id_random'): + values['account_id'] = a_rid if e_rid := values.get('event_id_random'): values['event_id'] = e_rid if el_rid := values.get('event_location_id_random'): @@ -196,7 +208,7 @@ class Event_Session_Base(BaseModel): values['poc_person_id'] = pp_rid # 2. Prevent "Collision Population" - for k in ['id', 'event_session_id', 'event_id', 'event_location_id', 'event_track_id', 'poc_event_person_id', 'poc_person_id']: + for k in ['id', 'event_session_id', 'account_id', 'event_id', 'event_location_id', 'event_track_id', 'poc_event_person_id', 'poc_person_id']: if k in values and not isinstance(values[k], str) and values[k] is not None: del values[k] diff --git a/app/models/site_models.py b/app/models/site_models.py index 56e90c1..26b1161 100644 --- a/app/models/site_models.py +++ b/app/models/site_models.py @@ -1,7 +1,7 @@ import datetime, pytz from typing import Dict, List, Optional, Set, Union -from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator +from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator, root_validator from app.db_sql import redis_lookup_id_random from app.lib_general import log, logging @@ -15,16 +15,38 @@ class Site_Base(BaseModel): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) - id_random: Optional[str] = Field( - **base_fields['site_id_random'], - alias = 'site_id_random', - ) - id: Optional[int] = Field( - alias = 'site_id' - ) + # --- Standardized Vision IDs (Strings for API, Integers for DB) --- + id: Optional[Union[int, str]] = Field(None, **base_fields['site_id_random']) + site_id: Optional[Union[int, str]] = Field(None, **base_fields['site_id_random']) + account_id: Optional[Union[int, str]] = Field(None, **base_fields['account_id_random']) - account_id_random: Optional[str] - account_id: Optional[int] + # --- Standardized Legacy / Internal IDs (Excluded) --- + id_random: Optional[str] = Field(None, alias='site_id_random', exclude=True) + account_id_random: Optional[str] = Field(None, exclude=True) + + @root_validator(pre=True) + def map_v3_ids(cls, values): + """ + Vision Transformer: + Map DB keys to clean API keys and strip internal integers during READ operations. + During CREATE (POST) operations, we ensure resolved integers are preserved. + """ + # 1. Map Random Strings to Clean Names + rid = values.get('id_random') or values.get('site_id_random') + if rid and isinstance(rid, str): + values['id'] = rid + values['site_id'] = rid + + if a_rid := values.get('account_id_random'): values['account_id'] = a_rid + + # 2. Prevent leakage of integers during API responses (Vision Standard) + for k in ['id', 'site_id', 'account_id']: + val = values.get(k) + if val is not None and not isinstance(val, str): + if values.get(f'{k}_random') or (k=='id' and values.get('id_random')): + del values[k] + + return values code: Optional[str] @@ -101,35 +123,8 @@ class Site_Base(BaseModel): _processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now) - @validator('id', always=True) - def site_id_lookup(cls, v, values, **kwargs): - log.setLevel(logging.WARNING) - log.debug(locals()) - - if values['id_random']: - log.debug(values['id_random']) - return redis_lookup_id_random(record_id_random=values['id_random'], table_name='site') - return None - - @validator('account_id', always=True) - def account_id_lookup(cls, v, values, **kwargs): - log.setLevel(logging.WARNING) - log.debug(locals()) - - if isinstance(v, int) and v > 0: return v - elif id_random := values.get('account_id_random'): - return redis_lookup_id_random(record_id_random=id_random, table_name='account') - return None - - @validator('account_id_random', always=True) - def account_id_random_lookup(cls, v, values, **kwargs): - if isinstance(v, str) and len(v) >= 11: return v - elif account_id := values.get('account_id'): - return get_id_random(record_id=account_id, table_name='account') - return None - class Config: underscore_attrs_are_private = True - allow_population_by_field_name = True + allow_population_by_field_name = False fields = base_fields # ### END ### API Site Models ### Site_Base() ### diff --git a/app/object_definitions/cms.py b/app/object_definitions/cms.py index eefdc92..e9e4801 100644 --- a/app/object_definitions/cms.py +++ b/app/object_definitions/cms.py @@ -125,7 +125,8 @@ cms_obj_li = { 'searchable_fields': [ 'id', 'account_id', 'site_id', 'id_random', 'account_id_random', 'site_id_random', - 'fqdn', 'enable', 'created_on', 'updated_on' + 'fqdn', 'access_key', 'site_access_key', + 'enable', 'created_on', 'updated_on' ], }, } diff --git a/app/routers/api_crud_v3.py b/app/routers/api_crud_v3.py index c0b0ed4..7fe318c 100644 --- a/app/routers/api_crud_v3.py +++ b/app/routers/api_crud_v3.py @@ -110,7 +110,7 @@ async def get_obj( obj_type_l1: str = Path(min_length=2, max_length=50), obj_id: str = Path(min_length=11, max_length=22), view: str = Query('default'), - account: AccountContext = Depends(get_account_context), + account: AccountContext = Depends(get_account_context_optional), serialization: SerializationParams = Depends(), delay: DelayParams = Depends(), ): @@ -143,8 +143,13 @@ async def get_obj( if sql_result := sql_select(table_name=table_name, record_id=record_id): if not obj_cfg.get('public_read', False): + # Strict context check for non-public objects + if account.auth_method == 'guest' or (account.account_id is None and not account.super): + reason = account.auth_error or "Account context required." + return mk_resp(data=False, status_code=403, response=response, status_message=reason) + if not check_account_access(sql_result, account, obj_name): - return mk_resp(data=False, status_code=403, response=response, status_message="Access denied.") + return mk_resp(data=False, status_code=403, response=response, status_message="Access denied. Record belongs to another account.") resp_data = base_name(**sql_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset, exclude_defaults=serialization.exclude_defaults, exclude_none=serialization.exclude_none) return mk_resp(data=resp_data, response=response) else: @@ -334,7 +339,7 @@ async def search_obj_li( if not account.super and for_obj_id != account.account_id_random: return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to requested account.") - if not account.super and account.auth_method != 'bypass' and account.account_id: + if not is_public_read and not account.super and account.auth_method != 'bypass': if search_query.and_filters is None: search_query.and_filters = [] if obj_name == 'account': search_query.and_filters.append(SearchFilter(field='id', op='eq', value=account.account_id)) diff --git a/app/routers/dependencies_v3.py b/app/routers/dependencies_v3.py index c3ebb4d..c7a5aad 100644 --- a/app/routers/dependencies_v3.py +++ b/app/routers/dependencies_v3.py @@ -38,6 +38,7 @@ def get_account_context_optional( resolved_token_payload = None auth_method = 'guest' api_key_authorized = False + auth_error = None # 1. Mandatory Machine Auth (API Key) # Prefer header, fallback to query param @@ -56,16 +57,22 @@ def get_account_context_optional( if (not enable_from or enable_from <= now) and (not enable_to or now <= enable_to): api_key_authorized = True else: - log.error(f"Security: API Key {key_to_check} expired/not yet valid.") + auth_error = "API Key expired or not yet valid." + log.error(f"Security: {auth_error} Key: {key_to_check}") else: - log.error(f"Security: API Key {key_to_check} is disabled.") + auth_error = "API Key is disabled." + log.error(f"Security: {auth_error} Key: {key_to_check}") else: - log.error(f"Security: API Key {key_to_check} not found.") + auth_error = "API Key not found or invalid." + log.error(f"Security: {auth_error} Key: {key_to_check}") + else: + auth_error = "Mandatory API Key missing." # 2. Context Resolution (Only if API Key is valid) if api_key_authorized: # Default to machine auth if no account context is provided auth_method = 'api_key' + auth_error = "Account context required for this operation." # A. Resolve via Account ID Header if x_account_id: @@ -73,6 +80,9 @@ def get_account_context_optional( if looked_up_id := redis_lookup_id_random(table_name='account', record_id_random=x_account_id): resolved_account_id = looked_up_id auth_method = 'account_header' + auth_error = None + else: + auth_error = f"Account ID '{x_account_id}' not found." # B. Resolve via JWT / Token Query Param elif x_no_account_id_token: @@ -88,23 +98,34 @@ def get_account_context_optional( if looked_up_id := redis_lookup_id_random(table_name='account', record_id_random=resolved_account_id_random): resolved_account_id = looked_up_id auth_method = 'jwt_token' + auth_error = None + else: + auth_error = f"Account ID '{resolved_account_id_random}' from token not found." + else: + # JWT is valid but has no account_id (e.g. platform-wide guest) + # We keep auth_method as 'jwt_token' but account_id as None. + auth_method = 'jwt_token' + auth_error = "Valid token provided, but no account context found in payload." else: - log.warning("Security: Failed to decode JWT token.") + auth_error = "Failed to decode JWT token." + log.warning(f"Security: {auth_error}") # Legacy Fallback (just a raw random ID string) - if auth_method in ['guest', 'api_key']: - resolved_account_id_random = x_no_account_id_token + if auth_method in ['guest', 'api_key', 'jwt_token'] and auth_error: if looked_up_id := redis_lookup_id_random(table_name='account', record_id_random=x_no_account_id_token): resolved_account_id = looked_up_id + resolved_account_id_random = x_no_account_id_token auth_method = 'token_query' + auth_error = None # C. Resolve via Administrative Bypass elif x_no_account_id and x_no_account_id.lower() not in ['false', '0', 'null', 'undefined', 'none', 'no_account_id_here']: resolved_account_id = 1 resolved_account_id_random = '--- NO ACCOUNT ---' auth_method = 'bypass' + auth_error = None - log.info(f"V3 Auth: method={auth_method}, authorized={api_key_authorized}, account={resolved_account_id_random}") + log.info(f"V3 Auth: method={auth_method}, authorized={api_key_authorized}, account={resolved_account_id_random}, error={auth_error}") is_admin = (auth_method == 'bypass') is_manager = (auth_method == 'bypass') @@ -122,7 +143,8 @@ def get_account_context_optional( administrator=is_admin, manager=is_manager, super=is_super, - token_payload=resolved_token_payload + token_payload=resolved_token_payload, + auth_error=auth_error ) def get_account_context( @@ -134,8 +156,9 @@ def get_account_context( ) -> AccountContext: """Strict version of account context resolution.""" ctx = get_account_context_optional(x_account_id, x_no_account_id, x_no_account_id_token, x_aether_api_key, x_aether_api_key_query) - if ctx.auth_method == 'guest': - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Account context required.') + if ctx.auth_method == 'guest' or (ctx.account_id is None and not ctx.super): + reason = ctx.auth_error or "Account context required." + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=reason) return ctx diff --git a/documentation/GUIDE__V3_FRONTEND_API.md b/documentation/GUIDE__V3_FRONTEND_API.md index 7ce80fe..5b14a37 100644 --- a/documentation/GUIDE__V3_FRONTEND_API.md +++ b/documentation/GUIDE__V3_FRONTEND_API.md @@ -1,266 +1,70 @@ # Aether API V3 Frontend Integration Guide (Svelte/TypeScript) -This guide explains how to update or create frontend functions to interact with the new **Aether API V3 CRUD** and **Action** endpoints. V3 introduces a nested URL structure, a powerful POST-based search, and specialized "Action" routes for binary data. +This guide defines the standards for interacting with the **Aether API V3 CRUD** and **Action** endpoints. --- -## 1. Key Differences (V2 vs V3) +## 1. Authentication and Security (Mandatory) -| Feature | CRUD V2 (Legacy) | CRUD V3 (Modern) | -| ------------------ | --------------------- | ---------------------------------------------------- | -| **Base Prefix** | `/v2/crud` | `/v3/crud` | -| **List Suffix** | Uses `/list` | **No suffix** (e.g., `/v3/crud/journal/`) | -| **Nested Path** | Not supported in URL | **Supported** (e.g., `/v3/crud/journal/{id}/entry/`) | -| **Object Aliases** | Limited | **Supported** (e.g., `entry` maps to `journal_entry`)| -| **View Selection** | `tbl_alt`, `mdl_alt` | **`view` parameter** (e.g., `?view=enriched`) | -| **Complex Search** | Limited to GET `jp` | **POST `/search`** (Unlimited size + Hybrid params) | -| **Full-Text** | Manual column names | **Reserved `q` property** in SearchQuery | - ---- - -## 2. Authentication and Security (Mandatory) - -As of January 2026, the V3 architecture enforces strict **Multi-Tenant Isolation** and **Machine Authorization**. Most requests require two levels of validation. +V3 architecture enforces strict **Multi-Tenant Isolation** and **Machine Authorization**. Requests require two levels of validation. ### A. The "Entry Ticket" (API Key) -**Mandatory for all requests.** Every request must provide a valid `x-aether-api-key` in the header. This identifies the application or script. - +**Mandatory for all requests.** identifies the application or client. * **Header:** `x-aether-api-key: ` -* **Status Code:** `403 Forbidden` if missing, invalid, or expired. +* **Status Code:** `403 Forbidden` if missing or invalid. ### B. The "Visa" (Account Context) -Once the API Key is validated, you must specify the context of your request. - -1. **Standard User Access**: Provide the `x-account-id` (the random string ID). +Required for any non-public data (Journals, Badges, Users, etc.). +1. **Standard Access**: Provide the `x-account-id` (the random string ID). * **Header:** `x-account-id: ` 2. **Administrative Bypass**: For authorized scripts needing global access. * **Header:** `x-no-account-id: bypass` -3. **Guest / Anonymous Access**: Provide a **Safe Guest JWT**. - * **Header:** `x-aether-api-key: ` (No Account Header) - * **Query Param:** `?jwt=` +3. **Token Access**: Provide a **JWT** in the query string. + * **Query Param:** `?jwt=` + +> [!CAUTION] +> **UNSUPPORTED HEADERS:** The header `x-aether-api-token` is **NOT recognized** by the V3 API. If you send it, the backend will treat you as a guest and block access to private data. --- -## 3. Implementing V3 CRUD Functions +## 2. Bootstrapping (The FQDN Handshake) -### A. List & Single Object (GET) -```ts -// GET /v3/crud/journal/{id}?view=enriched -export async function get_ae_obj_v3({ api_cfg, obj_type, obj_id, view = 'default' }) { - const endpoint = `/v3/crud/${obj_type}/${obj_id}`; - return await get_object({ api_cfg, endpoint, params: { view } }); -} -``` - -### B. Advanced & Hybrid Search (POST) -```ts -// POST /v3/crud/{obj_type}/search -export async function search_ae_obj_v3({ api_cfg, obj_type, search_query, enabled = 'enabled' }) { - const endpoint = `/v3/crud/${obj_type}/search`; - return await post_object({ api_cfg, endpoint, params: { enabled }, data: search_query }); +When the frontend first loads and doesn't know the `account_id`, it performs a "handshake" using its domain name. + +**Endpoint:** `POST /v3/crud/site_domain/search` +**Body:** +```json +{ + "and": [ + { "field": "fqdn", "op": "eq", "value": "demo.oneskyit.com" } + ] } ``` +**Results:** +* Returns 200 + a list containing the `account_id` and `site_id` random strings. +* ** ڈیزائن Choice:** If the domain is not found, it returns **200 OK with an empty list `[]`**. It is NOT a 404. --- -## 4. Hosted File Management (Modern V3 Actions) +## 3. Standard CRUD Patterns -V3 uses specialized **"Action"** routes for binary operations to separate processing logic from standard metadata CRUD. +### A. GET by ID +Used when the ID is known. +* **Endpoint:** `GET /v3/crud/{obj_type}/{id_random}` +* **Security:** Returns 403 if the record doesn't belong to your `x-account-id`. -### A. Upload Action -**Path**: `POST /v3/action/hosted_file/upload` -**Format**: `multipart/form-data` - -| Field | Type | Required | Description | -| :--- | :--- | :--- | :--- | -| `file_list` | File[] | Yes | One or more files to upload. | -| `account_id` | String | Yes | Random ID of the owner account. | -| `link_to_type`| String | Yes | Object type to link (e.g., `archive_content`). | -| `link_to_id` | String | Yes | Random ID of the object to link. | - -**Query Parameters:** -- `allowed_extensions`: Whitelist check (e.g., `?allowed_extensions=png&allowed_extensions=jpg`). -- `delay_ms`: Simulate network delay for UI testing. - -**Features:** -- **Automatic Deduplication:** API checks SHA256 hashes. If a file exists on the server, it reuses the record and creates a new link instead of a duplicate upload. -- **Relational Integrity:** Automatically creates `hosted_file_link` records. - -### B. Download & Streaming Action -**Path**: `GET /v3/action/hosted_file/{id}/download` - -**Query Parameters:** -| Parameter | Type | Description | -| :--- | :--- | :--- | -| `key` | String | **Temporary V3.0 Auth:** Pass any valid `account_id_random` to bypass headers. | -| `site_key` | String | Bypass headers via `access_key` from the `site` table. | -| `filename` | String | Override the response filename. | - -**Features:** -- **ID Vision:** Automatically resolves `{id}` if it belongs to a container object (e.g., `event_file`) instead of a direct `hosted_file`. -- **Streaming:** Supports standard `Range` headers for large files and video seeking. -- **Testing:** Supports `delay_ms` query parameter. - -> [!WARNING] -> **TEMPORARY SOLUTION (V3.0):** The `?key=` and `?site_key=` unauthenticated access patterns are intended to unblock the frontend for inline images and direct links where custom headers are not possible. This will be replaced by a standardized Signed URL or Read-Token system in **Version 3.1**. Please do not rely on this pattern for long-term security architecture. - -### C. Hash-Based Download (Content-Addressable) -**Path**: `GET /v3/action/hosted_file/hash/{sha256}/download` - -**Features:** -- **Local Caching:** Ideal for systems like the Events Launcher that cache files locally by hash. -- **Flexible Auth:** Supports `api_key` in the query parameter (e.g., `?api_key=`) for simple machine-to-machine requests. -- **Zero DB Lookup:** Resolves the physical path deterministically from the hash, bypassing database latency. - -### D. Deletion & Cleanup Action -**Path**: `DELETE /v3/action/hosted_file/{id}` - -| Parameter | Type | Default | Description | -| :--- | :--- | :--- | :--- | -| `link_to_type`| Query | null | The type of the link to remove. | -| `link_to_id` | Query | null | The random ID of the link to remove. | -| `rm_orphan` | Query | false | If true, physically delete file if no links remain. | -| `method` | Query | `hide` | Cleanup method: `hide`, `disable`, or `delete` (hard). | -| `fake_delete` | Query | false | **Testing Mode:** Verifies file/record existence without modification. | +### B. POST Search +The primary way to retrieve data. +* **Endpoint:** `POST /v3/crud/{obj_type}/search` +* **Security:** Automatically filters results to only show records belonging to your `x-account-id`. If no account context is provided, it will return **0 records** for private objects. --- -## 5. Event File Management (Specialized V3 Actions) +## 4. Troubleshooting 403 Forbidden -While `hosted_file` handles generic storage, `event_file` actions are context-aware and atomic for the Event module. - -### A. Atomic Upload Action -**Path**: `POST /v3/action/event_file/upload` -**Format**: `multipart/form-data` - -| Field | Type | Required | Description | -| :--- | :--- | :--- | :--- | -| `file_list` | File[] | Yes | Files to upload. | -| `account_id`| String | Yes | Owner account. | -| `for_type` | String | Yes | Parent object type (e.g., `event_session`). | -| `for_id` | String | Yes | Random ID of the parent object. | -| `event_id` | String | No | Optional event context. | -| `title` | String | No | Display title for the file. | - -**Features:** -- **Atomic Creation:** Automatically creates the `hosted_file`, the `hosted_file_link`, AND the `event_file` association in one request. -- **Intelligent Updates:** If the same file is uploaded again for the same object, it updates the metadata instead of creating a duplicate association. -- **Enriched Return:** Returns full `event_file` objects with nested `hosted_file` data. - -### B. Specialized Download -**Path**: `GET /v3/action/event_file/{id}/download` -*Semantic alias for the universal hosted_file downloader.* - -### C. Link Existing Hosted File -**Path**: `POST /v3/action/event_file/from_hosted_file/{hosted_file_id}` - -Use this when a file is already in standard storage (e.g., from a previous session or general archive) and you want to link it to an Event object without re-uploading. - -| Parameter | Type | Required | Description | -| :--- | :--- | :--- | :--- | -| `hosted_file_id` | Path | Yes | String ID of the physical file. | -| `for_type` | Body | Yes | Target object type (e.g., `event_session`). | -| `for_id` | Body | Yes | String ID of the target object. | -| `event_id` | Body | No | Optional event context. | - -**Features:** -- **Vision IDs:** Accepts string IDs in both the path and JSON body. -- **Relational Integrity:** Automatically creates both `hosted_file_link` and `event_file` records. -- **Enriched Return:** Returns the full `event_file` object. - ---- - -## 6. Hosted File Management (Legacy) - -The following endpoints are maintained for backward compatibility but should be migrated to V3 Actions. - -| Method | Endpoint | Description | -| :--- | :--- | :--- | -| `POST` | `/hosted_file/upload_files` | Legacy multi-upload. | -| `GET` | `/hosted_file/{id}/download` | Legacy download. | -| `GET` | `/hosted_file/{id}/stream` | Legacy buffered streamer. | -| `DELETE` | `/hosted_file/{id}` | Legacy deletion. | - ---- - -## 6. The "ID Vision" Standard (2026) - -V3 uses a **String-Only ID Vision**. The frontend NEVER handles or stores database integers. - -1. **Automatic Mapping:** Internal `id_random` fields are mapped to clean names (e.g., `id`, `event_id`, `account_id`) in JSON responses. -2. **Intelligent Resolution:** You can send random string IDs back to the API in any `*_id` field; the API resolves them to integers before database insertion. -3. **Suffix Compatibility:** The `_random` suffix (e.g., `event_id_random`) is maintained for backward compatibility with V2 clients. Standard V3 frontend integration should prioritize fields without the suffix. - ---- - -## 9. Real-Time Communication (V3 WebSockets) - -V3 WebSockets provide a granular, high-performance messaging layer. - -- **Guide**: [Aether API V3 WebSocket Integration Guide](./GUIDE__V3_FRONTEND_WEBSOCKETS.md) -- **Endpoint**: `ws://[api_domain]/v3/ws/group/{group_id}/client/{client_id}` -- **Key Requirement**: All messages must conform to the `WS_Message_V3` schema. - ---- - -## 10. Structured Error Handling - -V3 returns machine-readable error objects in `meta.details` for failures. - -### HTTP Status Codes -- **`400 Bad Request`**: Used for client-driven errors including invalid search fields, validation failures, and constraint violations. -- **`403 Forbidden`**: Missing or invalid API Key / Account Context. -- **`404 Not Found`**: Object ID does not exist. -- **`500 Internal Server Error`**: Unexpected server crash or database connection failure. - -### Common Error Categories -Found in `meta.details.category`: -- `database_duplicate`: Non-unique value (Code 1062). -> **400** -- `database_constraint`: Foreign key violation (Codes 1451, 1452). -> **400** -- `database_schema`: Invalid column name or missing field in the requested `view` (Codes 1054, 1146). -> **400** -- `validation`: Pydantic validation failed (Check `details` for field-specific errors). -> **400** - ---- - -## 8. Data Store Cascading Lookup (V3) - -V3 provides a specialized endpoint for retrieving configuration or content snippets by a human-friendly `code`. This uses a **hierarchical fallback** logic to find the "best fit" record for the current user and object context. - -### A. The V3 Lookup Endpoint -**Path:** `GET /v3/data_store/code/{code}` - -**Query Parameters:** -| Parameter | Type | Required | Description | -| :--- | :--- | :--- | :--- | -| `for_type` | String | No | Parent object type (e.g., `event`, `person`). | -| `for_id` | String | No | Parent object random ID (Vision ID). | -| `limit` | Integer | No | **Dynamic Return:** Default `1` (returns single object). If `> 1`, returns a list. | - -### B. Cascading Logic (The Hierarchy of Truth) -The API automatically resolves the most specific record available using the following priority: -**Object Override > Account Override > Global System Default.** - -| Specificity | `account_id` | `for_type` | `for_id` | `code` | `Result` | -| :--- | :--- | :--- | :--- | :--- | :--- | -| **Global** | `NULL` | `NULL` | `NULL` | `'site_config'` | System default (blue theme). | -| **Account** | `'abc123_rd'` | `NULL` | `NULL` | `'site_config'` | Acme Corp default (green theme). | -| **Object** | `'abc123_rd'` | `'event'` | `'evt_xyz_rd'` | `'site_config'` | Main Conference override (gold theme). | - -### C. Rules for Frontend Agents -1. **Vision IDs Only:** Always use random string IDs (e.g., `evt_xyz_rd`) for `for_id`. Never use database integers. -2. **Explicit Context:** If you are within an Event or Person context, always provide `for_type` and `for_id`. The API will handle the fallback if a specific record doesn't exist. -3. **Automatic JSON Parsing:** If the record `type` is `'json'`, the API returns a structured object/list under the `json` key. You do not need to call `JSON.parse()`. -4. **Handling the Return:** - * `limit=1` (Default): Returns a single **Object** in `data`. - * `limit>1`: Returns a **List** of objects in `data`. - -### D. Example Implementation -```ts -// GET /v3/data_store/code/event_launcher_main_info?for_type=event&for_id=nmBfuGFeR0k&limit=1 -export async function get_data_store_v3({ api_cfg, code, for_type, for_id, limit = 1 }) { - const endpoint = `/v3/data_store/code/${code}`; - const params = { for_type, for_id, limit }; - return await get_object({ api_cfg, endpoint, params }); -} -``` +If you receive a 403 on a valid ID: +1. Verify `x-aether-api-key` is correct. +2. Ensure you are sending `x-account-id` and NOT `x-aether-api-token`. +3. Verify the record actually belongs to the account ID you are sending. +4. Check if the object is marked `public_read: True` in the registry. (Posts and Archive Content allow guest access; Journals and Badges do not). diff --git a/tests/e2e/test_e2e_v3_security_audit.py b/tests/e2e/test_e2e_v3_security_audit.py new file mode 100644 index 0000000..553d8f3 --- /dev/null +++ b/tests/e2e/test_e2e_v3_security_audit.py @@ -0,0 +1,95 @@ +import requests +import json +import time + +# --- Configuration --- +API_ROOT = "https://dev-api.oneskyit.com" +API_KEY = "dFP6J9DVj9hUgIMn-fNIqg" + +# Test Matrix: (obj_type, id_random, is_public) +TEST_OBJECTS = [ + ("journal", "SWFK-48-89-90", False), + ("journal_entry", "w1p-OE-Hm-zz", False), + ("event", "aJ0KmgvU62Q", True), + ("post", "-qTmbMlEjAY", True), +] + +# Primary Account Context for Auth tests (Account 1) +ACCOUNT_A_RAND = "_XY7DXtc9MY" +# Secondary Account Context (Account 2 - assume this exists and doesn't own Account 1 records) +ACCOUNT_B_RAND = "nqOzejLCDXM" + +def print_result(label, success, message=""): + status = "✅ PASS" if success else "❌ FAIL" + print(f"[{status}] {label} {message}") + +def check_id_vision(data, path=""): + """Recursively check that no integers are leaked in ID-named fields.""" + leaks = [] + if isinstance(data, dict): + for k, v in data.items(): + current_path = f"{path}.{k}" if path else k + # Check fields that look like IDs + if k == "id" or k.endswith("_id") or k.endswith("_id_random"): + if isinstance(v, int): + leaks.append(f"{current_path}: {v} (Type: int)") + # Recurse + leaks.extend(check_id_vision(v, current_path)) + elif isinstance(data, list): + for i, item in enumerate(data): + leaks.extend(check_id_vision(item, f"{path}[{i}]")) + return leaks + +def run_security_audit(): + print("====================================================") + print(" V3 COMPREHENSIVE SECURITY & VISION AUDIT ") + print(f" Target: {API_ROOT}") + print("====================================================") + + for obj_type, obj_id, is_public in TEST_OBJECTS: + print(f"\n--- Testing Object Type: {obj_type} ---") + + # 1. READ LEAK CHECK (No context) + url = f"{API_ROOT}/v3/crud/{obj_type}/{obj_id}" + headers_unauth = {"x-aether-api-key": API_KEY} + resp_unauth = requests.get(url, headers=headers_unauth) + + if is_public: + print_result(f"Unauth GET {obj_type}", resp_unauth.status_code == 200) + else: + print_result(f"Unauth GET {obj_type}", resp_unauth.status_code == 403, f"- Blocked: {resp_unauth.json().get('meta', {}).get('status_message')}") + + # 2. VISION COMPLIANCE CHECK (With context) + headers_auth = {"x-aether-api-key": API_KEY, "x-account-id": ACCOUNT_A_RAND} + resp_auth = requests.get(url, headers=headers_auth) + + if resp_auth.status_code == 200: + data = resp_auth.json().get('data', {}) + leaks = check_id_vision(data) + print_result(f"Vision Compliance {obj_type}", len(leaks) == 0, f"- Leaks: {leaks if leaks else 'None'}") + else: + print_result(f"Auth GET {obj_type}", False, f"- Failed to get record for Vision check: {resp_auth.status_code}") + + # 3. WRITE ISOLATION CHECK (PATCH someone else's record) + if not is_public: + headers_wrong_account = {"x-aether-api-key": API_KEY, "x-account-id": ACCOUNT_B_RAND} + # Attempt to rename a journal that doesn't belong to Account B + resp_patch = requests.patch(url, headers=headers_wrong_account, json={"notes": "Hacked!"}) + print_result(f"Cross-Account Write Block {obj_type}", resp_patch.status_code == 403, f"- Status: {resp_patch.status_code}") + + # 4. SEARCH LEAK CHECK (Wide open search) + print("\n--- Search Leakage Audit ---") + resp_search = requests.post(f"{API_ROOT}/v3/crud/journal/search", headers=headers_unauth, json={"and": []}) + count = len(resp_search.json().get('data', [])) + print_result("Search Leakage (Journal)", count == 0, f"- Found {count} records (Expected 0)") + +if __name__ == "__main__": + start_time = time.time() + try: + run_security_audit() + except Exception as e: + print(f"\n❌ AUDIT CRASHED: {e}") + + print("\n====================================================") + print(f"Audit completed in {time.time() - start_time:.2f}s") + print("====================================================") diff --git a/tests/e2e/verify_v3_security_hardening.py b/tests/e2e/verify_v3_security_hardening.py new file mode 100644 index 0000000..1e640db --- /dev/null +++ b/tests/e2e/verify_v3_security_hardening.py @@ -0,0 +1,90 @@ +import requests +import json +import sys +import os + +# --- Configuration --- +API_ROOT = "https://dev-api.oneskyit.com" +# Using the key provided in your examples +API_KEY = "dFP6J9DVj9hUgIMn-fNIqg" +# A known private journal ID from account 1 +PRIVATE_JOURNAL_ID = "SWFK-48-89-90" +# A known public object type/ID +PUBLIC_FQDN = "dev-app.oneskyit.com" +# A known valid account ID random for testing restoration of access +VALID_ACCOUNT_ID_RAND = "_XY7DXtc9MY" + +def print_result(label, success, message=""): + status = "✅ PASS" if success else "❌ FAIL" + print(f"[{status}] {label} {message}") + +def test_hardened_search_leak(): + """Verify that search NO LONGER leaks private data when account_id is missing.""" + print("\n--- Test 1: Global Leak Prevention (Search) ---") + url = f"{API_ROOT}/v3/crud/journal/search" + headers = {"x-aether-api-key": API_KEY} + # NO account header, NO JWT + payload = {"and": []} + + resp = requests.post(url, headers=headers, json=payload) + + if resp.status_code == 200: + data = resp.json().get('data', []) + # Should be 0 because all journals in DB have an account_id, + # and we are now strictly filtering for account_id IS NULL. + success = (len(data) == 0) + print_result("Leak Blocked (Journal Search)", success, f"- Found {len(data)} records (Expected 0)") + else: + print_result("Leak Blocked (Journal Search)", False, f"- Unexpected Status: {resp.status_code}") + +def test_strict_id_block(): + """Verify that GET ID correctly blocks private records when account_id is missing.""" + print("\n--- Test 2: Strict Access Control (GET ID) ---") + url = f"{API_ROOT}/v3/crud/journal/{PRIVATE_JOURNAL_ID}" + headers = {"x-aether-api-key": API_KEY} + # NO account header, NO JWT + + resp = requests.get(url, headers=headers) + + success = (resp.status_code == 403) + print_result("Access Denied (Journal GET ID)", success, f"- Status: {resp.status_code} (Expected 403)") + +def test_bootstrap_exception(): + """Verify that public_read objects still work for bootstrapping.""" + print("\n--- Test 3: Bootstrap Exception (Public Read) ---") + url = f"{API_ROOT}/v3/crud/site_domain/search" + headers = {"x-aether-api-key": API_KEY} + payload = {"and": [{"field": "fqdn", "op": "eq", "value": PUBLIC_FQDN}]} + + resp = requests.post(url, headers=headers, json=payload) + + success = (resp.status_code == 200 and len(resp.json().get('data', [])) > 0) + print_result("Bootstrap Allowed (Site Domain)", success, f"- Status: {resp.status_code}") + +def test_restored_access(): + """Verify that providing the correct x-account-id restores access.""" + print("\n--- Test 4: Restored Access (With Context) ---") + url = f"{API_ROOT}/v3/crud/journal/{PRIVATE_JOURNAL_ID}" + headers = { + "x-aether-api-key": API_KEY, + "x-account-id": VALID_ACCOUNT_ID_RAND + } + + resp = requests.get(url, headers=headers) + + success = (resp.status_code == 200) + print_result("Access Restored (Journal with Header)", success, f"- Status: {resp.status_code}") + +if __name__ == "__main__": + print(f"Starting V3 Security Hardening Verification") + print(f"Target: {API_ROOT}") + + try: + test_hardened_search_leak() + test_strict_id_block() + test_bootstrap_exception() + test_restored_access() + except Exception as e: + print(f"\n❌ ERROR during test execution: {e}") + + print("\nVerification completed.")