diff --git a/app/lib_general_v3.py b/app/lib_general_v3.py index b7cb0da..ef092bd 100644 --- a/app/lib_general_v3.py +++ b/app/lib_general_v3.py @@ -64,6 +64,9 @@ class AuthContext(BaseModel): account_id_random: Optional[str] = None user_id: Optional[int] = None person_id: Optional[int] = None + administrator: bool = False + manager: bool = False + super: bool = False auth_method: str = 'none' # 'jwt_header', 'jwt_query', 'legacy_header', 'bypass' # Alias for backward compatibility with initial V3 implementation @@ -83,8 +86,9 @@ def get_v3_auth_context( Supports JWT in Authorization header (Bearer) OR 'jwt' query parameter. Falls back to legacy headers for backward compatibility. """ - # Defer import to break circular dependency + # Defer imports to break circular dependency from app.db_sql import redis_lookup_id_random + from app.methods.user_methods import load_user_obj # 1. Check for JWT (Header preferred, then Query for downloads) token = None @@ -101,13 +105,28 @@ def get_v3_auth_context( payload = decode_jwt(settings.JWT_KEY, token) if payload: logger.info(f"JWT Validated ({method}). User: {payload.get('user_id')}, Account: {payload.get('account_id')}") - return AuthContext( + + # Initialize AuthContext + ctx = AuthContext( account_id=payload.get('account_id'), account_id_random=payload.get('public_key'), # existing sign_jwt uses public_key for id_random user_id=payload.get('user_id'), person_id=payload.get('person_id'), auth_method=method ) + + # Populate roles if user_id is present + if ctx.user_id: + try: + user = load_user_obj(user_id=ctx.user_id) + if user: + ctx.administrator = getattr(user, 'administrator', False) + ctx.manager = getattr(user, 'manager', False) + ctx.super = getattr(user, 'super', False) + except Exception as e: + logger.warning(f"Failed to load user roles for user {ctx.user_id}: {e}") + + return ctx else: logger.warning(f"Invalid or expired JWT provided via {method}") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired authentication token.") @@ -129,7 +148,10 @@ def get_v3_auth_context( logger.info("Authentication bypassed via X-No-Account-ID") return AuthContext( account_id_random='--- NO ACCOUNT ---', - auth_method='bypass' + auth_method='bypass', + administrator=True, # Bypass usually implies admin for dev/utility + manager=True, + super=True ) # 4. No Auth Found diff --git a/app/routers/api_crud_v3.py b/app/routers/api_crud_v3.py index c115e8e..3adedc5 100644 --- a/app/routers/api_crud_v3.py +++ b/app/routers/api_crud_v3.py @@ -22,6 +22,42 @@ from app.db_sql import redis_lookup_id_random, sql_select, sql_insert, sql_updat router = APIRouter() +def check_account_access(sql_result: Any, account: AccountContext, obj_name: str = None) -> bool: + """ + Verifies if the current account context has access to the given SQL result. + Always returns True for super users. + """ + if account.super or account.auth_method == 'bypass': + return True + if not account.account_id: + return False + + res_account_id = None + if isinstance(sql_result, dict): + if obj_name == 'account': + res_account_id = sql_result.get('id') + else: + res_account_id = sql_result.get('account_id') + + if res_account_id is not None and res_account_id != account.account_id: + return False + return True + +def apply_forced_account_filter(and_qry_dict: Optional[Dict], account: AccountContext, model: Any, obj_name: str) -> Dict: + """ + Adds a forced account_id filter to the query dictionary if the user is not a super user. + """ + forced = and_qry_dict or {} + if account.super or account.auth_method == 'bypass': + return forced + + if obj_name == 'account': + forced['id'] = account.account_id + elif model and hasattr(model, '__fields__') and 'account_id' in model.__fields__: + forced['account_id'] = account.account_id + + return forced + def safe_json_loads(json_str: Optional[str]) -> Any: """ Safely load a JSON string, handling 'undefined' or invalid formats @@ -226,6 +262,10 @@ async def get_obj( return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found.") if sql_result := sql_select(table_name=table_name, record_id=record_id): + # Security Check + if not check_account_access(sql_result, account, obj_name): + return mk_resp(data=False, status_code=403, response=response, status_message="Access denied.") + resp_data = base_name(**sql_result).dict( by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset, @@ -287,6 +327,11 @@ async def get_obj_li( if obj_name == 'site' and not (for_obj_type == 'account' and for_obj_id): return mk_resp(data=False, status_code=403, response=response, status_message="Listing sites is only permitted when filtered by account.") + # Explicit Account isolation + if for_obj_type == 'account' and for_obj_id: + if not account.super and for_obj_id != account.account_id_random: + return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to requested account.") + obj_cfg = obj_type_kv_li[obj_name] table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl'))) base_name = obj_cfg.get(f'mdl_{view}', obj_cfg.get('mdl_default', obj_cfg.get('mdl'))) @@ -296,6 +341,9 @@ async def get_obj_li( order_by_li = filter_order_by(order_by_li, base_name, table_name) status_filter = get_supported_filters(base_name, status_filter) + + # Forced account isolation + and_qry_dict_obj = apply_forced_account_filter(and_qry_dict_obj, account, base_name, obj_name) if for_obj_type and for_obj_id: resolved_for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type) @@ -395,6 +443,19 @@ async def search_obj_li( status_filter = get_supported_filters(base_name, status_filter) searchable_fields = obj_cfg.get('searchable_fields') + # Explicit Account isolation + if for_obj_type == 'account' and for_obj_id: + if not account.super and for_obj_id != account.account_id_random: + return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to requested account.") + + # Forced account isolation for the search query itself + # We inject the account_id into the SearchQuery object if it's not a super user + if not account.super and account.auth_method != 'bypass' and account.account_id: + if obj_name == 'account': + search_query.and_filter['id'] = account.account_id + elif base_name and hasattr(base_name, '__fields__') and 'account_id' in base_name.__fields__: + search_query.and_filter['account_id'] = account.account_id + if for_obj_type and for_obj_id: resolved_for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type) if not resolved_for_obj_id: @@ -474,6 +535,16 @@ async def post_obj( if not table_name_insert or not input_model or not table_name_select or not output_model: return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{obj_name}' is incomplete.") + # Security: Force account_id for non-super users if model supports it + if not account.super and account.auth_method != 'bypass' and account.account_id: + if 'account_id' in input_model.__fields__: + obj_data['account_id'] = account.account_id + elif obj_name == 'account': + # Users can't create accounts unless they are super (usually) + # But if they can, it should be their own ID? + # Actually, account creation is usually a super-admin task. + return mk_resp(data=False, status_code=403, response=response, status_message="Account creation is restricted to super administrators.") + try: validated_obj = input_model(**obj_data) except Exception as e: @@ -537,6 +608,13 @@ async def patch_obj( if not record_id: return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found.") + # Security Check + if existing_obj := sql_select(table_name=table_name_select, record_id=record_id): + if not check_account_access(existing_obj, account, obj_name): + return mk_resp(data=False, status_code=403, response=response, status_message="Access denied.") + else: + return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found in database.") + data_to_update = obj_data if sql_update_result := sql_update(data=data_to_update, table_name=table_name_update, record_id=record_id): @@ -576,6 +654,7 @@ async def delete_obj( obj_cfg = obj_type_kv_li[obj_name] table_name_delete = obj_cfg.get('tbl_update', obj_cfg.get('tbl')) + table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl')) if not table_name_delete: return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{obj_name}' is incomplete.") @@ -584,6 +663,13 @@ async def delete_obj( if not record_id: return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found.") + # Security Check + if existing_obj := sql_select(table_name=table_name_select, record_id=record_id): + if not check_account_access(existing_obj, account, obj_name): + return mk_resp(data=False, status_code=403, response=response, status_message="Access denied.") + else: + return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found in database.") + if method == 'hide': if sql_update(table_name=table_name_delete, record_id=record_id, data={'hide': True}): return mk_resp(data=True, response=response, status_message=f"Object with ID '{obj_id}' hidden successfully.") @@ -645,8 +731,8 @@ async def get_child_obj_li( order_by_li = safe_json_loads(order_by_li) obj_name = child_obj_type - if obj_name not in obj_type_kv_li: - return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.") + if obj_name not in obj_type_kv_li or parent_obj_type not in obj_type_kv_li: + return mk_resp(data=False, status_code=400, response=response, status_message=f"Invalid object type(s).") obj_cfg = obj_type_kv_li[obj_name] table_name = obj_cfg.get('tbl_default', obj_cfg.get('tbl')) @@ -662,6 +748,18 @@ async def get_child_obj_li( if not resolved_parent_id: return mk_resp(data=False, status_code=404, response=response, status_message=f"Parent object '{parent_obj_type}' with ID '{parent_obj_id}' not found.") + # Parent Security Check + parent_cfg = obj_type_kv_li[parent_obj_type] + parent_table = parent_cfg.get('tbl_default', parent_cfg.get('tbl')) + if parent_sql_res := sql_select(table_name=parent_table, record_id=resolved_parent_id): + if not check_account_access(parent_sql_res, account, parent_obj_type): + return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to parent object.") + else: + return mk_resp(data=False, status_code=404, response=response, status_message="Parent object not found.") + + # Child security: Forced account isolation if child supports it + and_qry_dict_obj = apply_forced_account_filter(and_qry_dict_obj, account, base_name, obj_name) + sql_result = sql_select( table_name=table_name, field_name=f'{parent_obj_type}_id', @@ -724,12 +822,26 @@ async def post_child_obj( if not resolved_parent_id: return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.") + # Parent Security Check + parent_cfg = obj_type_kv_li[parent_obj_type] + parent_table = parent_cfg.get('tbl_default', parent_cfg.get('tbl')) + if parent_sql_res := sql_select(table_name=parent_table, record_id=resolved_parent_id): + if not check_account_access(parent_sql_res, account, parent_obj_type): + return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to parent object.") + else: + return mk_resp(data=False, status_code=404, response=response, status_message="Parent object not found.") + obj_cfg = obj_type_kv_li[child_obj_type] table_name_insert = obj_cfg.get('tbl_update', obj_cfg.get('tbl')) table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl')) input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl')) output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl'))) + # Security: Force account_id for non-super users if child supports it + if not account.super and account.auth_method != 'bypass' and account.account_id: + if 'account_id' in input_model.__fields__: + obj_data['account_id'] = account.account_id + obj_data[f'{parent_obj_type}_id'] = resolved_parent_id try: diff --git a/documentation/V3_CRUD_ARCHITECTURE_AND_LEARNINGS.md b/documentation/V3_CRUD_ARCHITECTURE_AND_LEARNINGS.md index 4d02fe1..6b43aa0 100644 --- a/documentation/V3_CRUD_ARCHITECTURE_AND_LEARNINGS.md +++ b/documentation/V3_CRUD_ARCHITECTURE_AND_LEARNINGS.md @@ -55,3 +55,21 @@ The following objects have been migrated to the modern V3 configuration and are - `hosted_file`, `post`, `post_comment` - `person`, `user` - `lu_country`, `lu_country_subdivision`, `lu_time_zone` + +## 5. Security and Data Isolation + +Implemented in Jan 2026, the V3 architecture enforces strict data isolation and role-aware authorization. + +### Role-Aware Authorization (`AuthContext`) +- **Deferred User Lookup**: When a JWT is provided, the API performs a deferred database lookup via `load_user_obj` to populate `administrator`, `manager`, and `super` flags. +- **Role Scoping**: These flags are used to bypass account isolation for support and system maintenance tasks. + +### Multi-Tenant Isolation +- **Forced Account Filtering**: For all non-super users, the API automatically injects an `account_id` filter into every list and search query. This is enforced at the SQL level via `apply_forced_account_filter`. +- **Post-Retrieval Verification**: Single object retrievals (`GET`), updates (`PATCH`), and deletions (`DELETE`) include a secondary ownership check (`check_account_access`). If a user attempts to access an ID belonging to another account, they receive a `403 Forbidden` response. +- **Hierarchical Verification**: Child/Nested endpoints verify the ownership of the parent object before allowing operations on children, preventing cross-tenant "sideways" traversal. +- **Creation Guard**: When creating records (`POST`), the user's `account_id` is automatically forced onto the new record, preventing a user from creating data for another account. + +### Bypass and Utility Access +- **X-No-Account-ID**: A specialized header used by development utilities and administrative scripts to bypass standard account isolation. This header grants `super` permissions and should only be used in trusted internal environments. +- **JWT Query Parameter**: Supported for direct file downloads and sharing links where custom headers cannot be provided.