from typing import Any, Dict, Optional, Union
import json
import logging
import re

from app.lib_general_v3 import AccountContext, StatusFilterParams
from app.models.error_models import StandardError

log = logging.getLogger(__name__)

# Standard MariaDB pattern: (code, "message")
_DB_ERROR_PATTERN = re.compile(r'\((\d+),\s*["\'](.*?)["\']\s*\)')
# Any (parenthesized) block; used by the fallback message clean-up.
_PARENTHESIZED_PATTERN = re.compile(r'\(.*?\)')

# Known MariaDB error codes -> StandardError category.
# Ref: https://mariadb.com/kb/en/mariadb-error-codes/
_DB_ERROR_CATEGORIES = {
    1062: "database_duplicate",    # Duplicate entry
    1451: "database_constraint",   # Foreign key constraint
    1452: "database_constraint",   # Foreign key constraint
    1045: "database_connection",   # Connection / auth issues
    2002: "database_connection",
    2003: "database_connection",
    2006: "database_connection",
    1054: "database_schema",       # Unknown column
    1146: "database_schema",       # Unknown table
}

_ID_SUFFIX = '_id'
_ID_RANDOM_SUFFIX = '_id_random'


def _quote_identifier(name: Any) -> str:
    """
    Return `name` wrapped in backticks for use as a MariaDB identifier.

    Embedded backticks are doubled so the value cannot terminate the quoted
    identifier and inject additional SQL.
    """
    return "`" + str(name).replace("`", "``") + "`"


def format_db_error(raw_error: str) -> StandardError:
    """
    Parses raw SQLAlchemy/MariaDB errors into structured StandardError objects.

    The error text is searched for the `(code, "message")` pattern. When it is
    found, the numeric code selects the category; otherwise the category is
    the generic "database" and the message is the raw text with every
    parenthesized block removed.

    Args:
        raw_error: The raw error text. An empty/None value yields an
            "unknown" category error.

    Returns:
        A StandardError. `recoverable` is True only for connection errors and
        `details` carries the raw text only for uncategorized errors.
    """
    if not raw_error:
        return StandardError(
            category="unknown",
            message="An unspecified database error occurred."
        )

    # 1. Extract error code and message.
    code = None
    match = _DB_ERROR_PATTERN.search(raw_error)
    if match:
        code = int(match.group(1))
        message = match.group(2).strip()
    else:
        # Fallback: remove all (parenthesized) blocks which often contain codes
        message = _PARENTHESIZED_PATTERN.sub('', raw_error).strip()

    # 2. Categorize based on known MariaDB codes.
    category = _DB_ERROR_CATEGORIES.get(code, "database")
    recoverable = category == "database_connection"

    return StandardError(
        category=category,
        code=code,
        message=message,
        recoverable=recoverable,
        # Only include raw details for uncategorized errors
        details=raw_error if category == "database" else None
    )


def check_account_access(sql_result: Any, account: AccountContext, obj_name: Optional[str] = None) -> bool:
    """
    Enforce Multi-Tenant Data Isolation.

    Verifies that the requested record belongs to the authenticated user's account.

    Returns True if:
    - User is a Super User or System (Bypass).
    - The record's `account_id` matches the user's `account_id`
      (for `obj_name == 'account'` the record's own `id` is compared instead).
    - The record exposes no account identifier at all: `sql_result` is not a
      dict, or the relevant key is missing/None.

    Returns False if the caller has no `account_id`, or the record's account
    identifier differs from the caller's.
    """
    if account.super or account.auth_method == 'bypass':
        return True

    if not account.account_id:
        return False

    if not isinstance(sql_result, dict):
        # Nothing to compare against; access is allowed.
        return True

    owner_key = 'id' if obj_name == 'account' else 'account_id'
    res_account_id = sql_result.get(owner_key)

    return res_account_id is None or res_account_id == account.account_id


def apply_forced_account_filter(and_qry_dict: Optional[Dict], account: AccountContext, model: Any, obj_name: str, table_name: Optional[str] = None) -> Dict:
    """
    Secure Search Filtering.

    Automatically appends an `account_id` filter to database queries to ensure
    users only retrieve records associated with their own account.
    Schema-aware: checks if the column actually exists in the DB before applying.

    Args:
        and_qry_dict: Existing AND-filter dict, or None. When a non-empty dict
            is passed, it is updated in place and returned.
        account: The caller's account context.
        model: Model class; if it has `__fields__` and lacks the target column,
            no filter is added.
        obj_name: Object type; for 'account' the filter column is `id`.
        table_name: Optional physical table/view name used to probe for the column.

    Returns:
        The filter dict, with the forced account filter added when applicable.
    """
    forced = and_qry_dict or {}

    if account.super or account.auth_method == 'bypass':
        return forced

    # 1. Determine the target column
    target_col = 'id' if obj_name == 'account' else 'account_id'

    # 2. Check if the model even supports it
    if model and hasattr(model, '__fields__') and target_col not in model.__fields__:
        return forced

    # 3. If we have a table name, verify the column exists in the physical DB schema
    #    (Important for Views that might exclude account_id for performance/privacy)
    if table_name:
        from app import lib_sql_core
        from sqlalchemy import text

        probe_sql = (
            f"SELECT {_quote_identifier(target_col)} "
            f"FROM {_quote_identifier(table_name)} LIMIT 0"
        )
        try:
            with lib_sql_core.engine.connect() as conn:
                conn.execute(text(probe_sql))
        except Exception as exc:
            # Any probe failure is treated as "column missing", so the filter
            # is skipped. NOTE(review): this also covers connection errors;
            # confirm that skipping the filter is acceptable in that case.
            log.warning(
                "Account filter skipped: could not probe column %s on %s: %s",
                target_col, table_name, exc,
            )
            return forced

    # CRITICAL: Always apply the filter. If account_id is None, it filters for NULL.
    forced[target_col] = account.account_id
    return forced


def filter_order_by(order_by_li: Any, model: Any, table_name: Optional[str] = None) -> Optional[Dict[str, str]]:
    """
    Sanitize Sorting Parameters.

    Prevents SQL injection and logic errors by validating that requested sort
    columns actually exist in the Pydantic model and/or the database table.

    Args:
        order_by_li: Mapping of column name -> sort direction. Returned
            unchanged if it is empty, not a dict, or no usable model is given.
        model: Model class whose `__fields__` (names and aliases) define the
            allowed columns.
        table_name: Optional table name; when given, each surviving column is
            also probed against the database and dropped if the probe fails.

    Returns:
        A new dict containing only the accepted sort columns, or the input
        unchanged when no validation could be performed.
    """
    if not order_by_li or not isinstance(order_by_li, dict) or not model:
        return order_by_li

    if not hasattr(model, '__fields__'):
        return order_by_li

    allowed = set(model.__fields__.keys())
    allowed.update({f.alias for f in model.__fields__.values() if f.alias})

    filtered = {k: v for k, v in order_by_li.items() if k in allowed}

    if table_name and filtered:
        from app.db_sql import db
        from sqlalchemy import text

        quoted_table = _quote_identifier(table_name)
        verified = {}
        for column, direction in filtered.items():
            try:
                # Lightweight check to see if column exists in SQL
                db.execute(text(f"SELECT {_quote_identifier(column)} FROM {quoted_table} LIMIT 0"))
            except Exception as exc:
                # Best effort: a column that cannot be probed is dropped.
                log.debug("Dropping sort column %s for %s: %s", column, table_name, exc)
                continue
            verified[column] = direction
        filtered = verified

    return filtered


def get_supported_filters(model: Any, status_filter: StatusFilterParams) -> StatusFilterParams:
    """
    Adaptive Status Filtering.

    Adjusts the default filters (enabled/hidden) based on whether the target
    object actually supports those concepts (i.e., has those columns).

    Returns `status_filter` itself when no usable model is given; otherwise a
    new instance whose `enabled` / `hidden` are set to 'all' when the model has
    no `enable` / `hide` field respectively.
    """
    if not model or not hasattr(model, "__fields__"):
        return status_filter

    # We create a new instance to avoid side effects on the dependency object
    from app.routers.dependencies_v3 import StatusFilterParams as SF

    fields = model.__fields__
    adjusted = SF()
    adjusted.enabled = status_filter.enabled if 'enable' in fields else 'all'
    adjusted.hidden = status_filter.hidden if 'hide' in fields else 'all'
    return adjusted


def safe_json_loads(json_str: Optional[str]) -> Any:
    """
    Parse a JSON string, returning None instead of raising.

    None is returned for empty input, for the literal string 'undefined',
    and for any value that cannot be decoded as JSON.
    """
    if not json_str or json_str == 'undefined':
        return None
    try:
        return json.loads(json_str)
    except (TypeError, ValueError, RecursionError):
        # ValueError covers json.JSONDecodeError; TypeError covers non-text input.
        return None


def sanitize_payload(data: dict, model: Any, ignore_extra: bool = False) -> None:
    """
    Sanitizes an input payload before database insertion or update.

    1. Resolves ID strings to integers:
       - Handles legacy `*_id_random` fields.
       - Handles Vision `*_id` fields where the value is a string
         (e.g., account_id: "random_str").
    2. Removes virtual lookup fields (ending in `_id_random`) after resolution.
    3. Removes fields explicitly marked for exclusion in the model's
       `fields_to_exclude_from_db` ClassVar (e.g., view-only fields).
    4. If `ignore_extra` is True, removes all fields NOT present in the model
       definition (field names and aliases).

    Modifies the `data` dictionary in-place. Does nothing if `data` is not a dict.
    """
    if not isinstance(data, dict):
        return

    from app.db_sql import redis_lookup_id_random

    # Resolve ID strings to integers. Iterate over a snapshot because keys
    # are added/removed while resolving.
    for k, v in list(data.items()):
        if not v or not isinstance(v, str):
            continue

        target_id_field = None
        obj_type_lookup = None

        if k.endswith(_ID_RANDOM_SUFFIX):
            # Scenario A: Legacy suffix (e.g., account_id_random: "abc").
            # Only the trailing suffix is stripped, never an inner occurrence.
            obj_type_lookup = k[:-len(_ID_RANDOM_SUFFIX)]
            target_id_field = obj_type_lookup + _ID_SUFFIX
        elif k.endswith(_ID_SUFFIX) and 11 <= len(v) <= 22:
            # Scenario B: Vision naming (e.g., account_id: "abc").
            # We only resolve if it's a string of the correct length (random ID format)
            if k == 'external_person_id':
                continue
            target_id_field = k
            obj_type_lookup = k[:-len(_ID_SUFFIX)]

        if not (target_id_field and obj_type_lookup):
            continue

        # Special table mapping if needed
        if obj_type_lookup == 'address_location':
            obj_type_lookup = 'address'

        resolved_id = redis_lookup_id_random(record_id_random=v, table_name=obj_type_lookup)
        if resolved_id:
            data[target_id_field] = resolved_id
            # If we were handling Scenario A, remove the original random key.
            # NOTE(review): the key is only dropped when resolution succeeded;
            # confirm whether unresolved `*_id_random` keys should be dropped too.
            if k.endswith(_ID_RANDOM_SUFFIX):
                del data[k]

    # Filter out model-specific excluded fields (e.g., view-only fields)
    if hasattr(model, 'fields_to_exclude_from_db'):
        for excluded in model.fields_to_exclude_from_db:
            data.pop(excluded, None)

    # If permissive mode is on, remove any field not in the Pydantic model
    if ignore_extra and model and hasattr(model, '__fields__'):
        model_fields = set(model.__fields__.keys())
        # Also check for aliases
        model_fields.update(f.alias for f in model.__fields__.values() if f.alias)

        for extra in [key for key in data if key not in model_fields]:
            del data[extra]