- Added 'x-ae-ignore-extra-fields' header to support stripping unknown fields in POST/PATCH. - Added automatic resolution of '*_id_random' strings to integer IDs in 'sanitize_payload'. - Refactored 'post_obj' to return structured (field -> message) validation errors in 'meta.details'. - Updated 'mk_resp' to support non-string 'details' in response metadata. - Added 'tests/verify_feedback_fixes.py' to validate logic changes. Ref: V3 API Refinement Feedback from mcp_agent.
177 lines
6.6 KiB
Python
177 lines
6.6 KiB
Python
from typing import Any, Dict, Optional
|
|
import json
|
|
import logging
|
|
import re
|
|
|
|
from app.lib_general_v3 import AccountContext, StatusFilterParams
|
|
|
|
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
|
|
|
|
def format_db_error(raw_error: str) -> str:
    """
    Reduce a raw SQLAlchemy/MariaDB error string to its readable message.

    When the text contains a driver tuple of the form ``(code, "message")``,
    the quoted message is returned with surrounding whitespace stripped.
    Otherwise every parenthesized segment is removed and the stripped
    remainder is returned. A falsy input yields an empty string.
    """
    if not raw_error:
        return ""

    # Driver tuple as rendered in MariaDB errors, e.g. (1062, "Duplicate entry ...")
    driver_tuple = re.search(r'\(\d+,\s*["\'](.*?)["\']\s*\)', raw_error)
    if driver_tuple is not None:
        return driver_tuple[1].strip()

    # No driver tuple found: drop parenthesized segments, which often hold codes.
    without_parens = re.sub(r'\(.*?\)', '', raw_error)
    return without_parens.strip()
|
|
|
|
def check_account_access(sql_result: Any, account: AccountContext, obj_name: str = None) -> bool:
    """
    Multi-tenant isolation check for a single fetched record.

    Decision order:
    1. Super users and callers authenticated via 'bypass' are always allowed.
    2. A caller without an `account_id` is always denied.
    3. For dict results, the owner id is read from `id` when `obj_name` is
       'account', otherwise from `account_id`; access is denied only when
       that value is present and differs from the caller's `account_id`.

    Note: results that are not dicts, and dicts without an owner value,
    are allowed through.
    """
    if account.super or account.auth_method == 'bypass':
        return True
    if not account.account_id:
        return False

    # Only dict results carry an owner column we can inspect.
    if not isinstance(sql_result, dict):
        return True

    # The account object is owned by itself, so its own `id` is the owner id.
    owner_key = 'id' if obj_name == 'account' else 'account_id'
    owner_id = sql_result.get(owner_key)

    belongs_to_other = owner_id is not None and owner_id != account.account_id
    return not belongs_to_other
|
|
|
|
def apply_forced_account_filter(and_qry_dict: Optional[Dict], account: AccountContext, model: Any, obj_name: str) -> Dict:
    """
    Add the caller's account constraint to a search filter.

    Super users and 'bypass'-authenticated callers get the filter back
    without any account constraint. For everyone else:
    - when `obj_name` is 'account', `id` is pinned to the caller's account;
    - otherwise, when the model declares an `account_id` field,
      `account_id` is pinned to the caller's account.

    Note: a non-empty `and_qry_dict` is updated in place and returned; an
    empty or None input is replaced by a new dict. Callers should always
    use the return value.
    """
    constraints = and_qry_dict or {}

    unrestricted = account.super or account.auth_method == 'bypass'
    if not unrestricted:
        if obj_name == 'account':
            # The account table is scoped by its own primary key.
            constraints['id'] = account.account_id
        elif model and 'account_id' in getattr(model, '__fields__', ()):
            constraints['account_id'] = account.account_id

    return constraints
|
|
|
|
def filter_order_by(order_by_li: Any, model: Any, table_name: str = None) -> Optional[Dict[str, str]]:
    """
    Sanitize Sorting Parameters.

    Keeps only the sort columns that are declared on the Pydantic model
    (by field name or alias) and, when `table_name` is given, that can
    also be selected from that table.

    Args:
        order_by_li: Requested ordering as a dict of column -> direction.
            Anything falsy or not a dict is returned unchanged.
        model: Model class exposing `__fields__`. When missing, or when it
            has no `__fields__`, `order_by_li` is returned unchanged.
        table_name: Optional SQL table to probe each surviving column against.

    Returns:
        A new dict holding only the accepted columns, or the original
        `order_by_li` when no validation could be performed.
    """
    if not order_by_li or not isinstance(order_by_li, dict) or not model:
        return order_by_li
    if not hasattr(model, '__fields__'):
        return order_by_li

    model_fields = set(model.__fields__)
    model_fields.update(f.alias for f in model.__fields__.values() if f.alias)
    filtered = {k: v for k, v in order_by_li.items() if k in model_fields}

    if not (table_name and filtered):
        return filtered

    # Imported lazily, as in the original; only needed for the table probe.
    from app.db_sql import db
    from sqlalchemy import text

    confirmed = {}
    for column, direction in filtered.items():
        try:
            # Lightweight probe: LIMIT 0 returns no rows but fails if the column is absent.
            # `column` is already restricted to model field names/aliases;
            # `table_name` is interpolated as-is and must come from trusted code.
            db.execute(text(f"SELECT `{column}` FROM `{table_name}` LIMIT 0"))
        except Exception:
            # Best-effort: the column is dropped from the ordering, but the
            # reason is logged instead of being swallowed silently.
            log.debug(
                "filter_order_by: dropping sort column %r, probe against table %r failed",
                column,
                table_name,
                exc_info=True,
            )
        else:
            confirmed[column] = direction
    return confirmed
|
|
|
|
def get_supported_filters(model: Any, status_filter: StatusFilterParams) -> StatusFilterParams:
    """
    Adapt the enabled/hidden status filters to what the model supports.

    Returns `status_filter` itself when `model` is falsy or has no
    `__fields__`. Otherwise returns a new filter object that copies
    `enabled` and `hidden` from `status_filter`, then forces `enabled` to
    'all' when the model has no `enable` field and `hidden` to 'all' when
    it has no `hide` field.
    """
    if not model or not hasattr(model, "__fields__"):
        return status_filter

    # Build a fresh instance so the incoming dependency object is not modified.
    from app.routers.dependencies_v3 import StatusFilterParams as SF

    declared = model.__fields__
    adjusted = SF()
    adjusted.enabled, adjusted.hidden = status_filter.enabled, status_filter.hidden

    # Model column -> filter attribute that is meaningless without that column.
    for column, attr in (('enable', 'enabled'), ('hide', 'hidden')):
        if column not in declared:
            setattr(adjusted, attr, 'all')
    return adjusted
|
|
|
|
def safe_json_loads(json_str: Optional[str]) -> Any:
    """
    Decode a JSON string, returning None instead of raising on bad input.

    Args:
        json_str: JSON text to decode. Falsy values and the literal string
            'undefined' are treated as "no value".

    Returns:
        The decoded Python object, or None when the input is absent or
        cannot be decoded.
    """
    if not json_str or json_str == 'undefined':
        return None
    try:
        return json.loads(json_str)
    except (ValueError, TypeError, RecursionError):
        # ValueError covers JSONDecodeError and bad byte encodings, TypeError
        # covers non-string input, RecursionError covers pathological nesting.
        # Unlike a bare `except:`, KeyboardInterrupt/SystemExit still propagate.
        return None
|
|
|
|
def sanitize_payload(data: dict, model: Any, ignore_extra: bool = False) -> None:
    """
    Sanitizes an input payload before database insertion or update.

    1. Resolves virtual lookup fields (`*_id_random`) into their integer
       database IDs, stored under the matching `*_id` key. A lookup is only
       made when the value is truthy and the `*_id` key is missing or falsy.
    2. Removes all virtual lookup fields (ending in `_id_random`); they exist
       for API convenience only. The plain `id_random` key is never touched
       because it does not end with `_id_random`.
    3. Removes fields listed in the model's `fields_to_exclude_from_db`
       attribute (e.g., view-only fields), when the model has one.
    4. If `ignore_extra` is True and the model exposes `__fields__`, removes
       every key that is neither a model field name nor a field alias.

    Modifies the `data` dictionary in-place and returns None. Non-dict
    input is ignored.
    """
    if not isinstance(data, dict):
        return

    suffix = '_id_random'
    virtual_keys = [k for k in data if k.endswith(suffix)]

    # Collect the lookups first. Only the trailing suffix is stripped, so a
    # key that contains the marker more than once still maps to the right
    # `<prefix>_id` field and `<prefix>` table name.
    pending = []
    for key in virtual_keys:
        prefix = key[:-len(suffix)]
        target_id_field = prefix + '_id'
        value = data[key]
        # Only resolve if the integer version is missing or null.
        if value and not data.get(target_id_field):
            pending.append((prefix, target_id_field, value))

    if pending:
        # Imported lazily: the DB layer is only needed when something must be resolved.
        from app.db_sql import redis_lookup_id_random

        for table_name, target_id_field, value in pending:
            resolved_id = redis_lookup_id_random(record_id_random=value, table_name=table_name)
            if resolved_id:
                data[target_id_field] = resolved_id

    # Resolution must happen before this point: the virtual keys are dropped here.
    for key in virtual_keys:
        del data[key]

    # Drop model-specific excluded fields (e.g., view-only fields).
    for key in getattr(model, 'fields_to_exclude_from_db', ()):
        data.pop(key, None)

    # Permissive mode: strip anything the Pydantic model does not declare.
    if ignore_extra and model and hasattr(model, '__fields__'):
        allowed = set(model.__fields__)
        allowed.update(f.alias for f in model.__fields__.values() if f.alias)
        for key in [k for k in data if k not in allowed]:
            del data[key]
|