Enhance V3 CRUD: Implement Error Bubbling and Dry-Run Validation.

- Updated app/db_sql.py to capture SQL exceptions in thread-local storage for later retrieval.
- Implemented format_db_error() in app/lib_api_crud_v3.py to clean up raw MariaDB error strings.
- Added POST /v3/crud/{obj_type}/validate endpoint for dry-run payload validation.
- Updated main and nested routers to bubble up validation and database errors into the response 'meta.details' field.
- Added tests/test_v3_error_bubbling.py to verify formatting logic.
This commit is contained in:
Scott Idem
2026-01-09 16:57:54 -05:00
parent 3885cc6aba
commit 4b86432381
5 changed files with 122 additions and 10 deletions

View File

@@ -1,4 +1,4 @@
import datetime, json, pytz, random, redis, secrets
import datetime, json, pytz, random, redis, secrets, threading
from typing import Any, List, Optional
from timeit import default_timer as timer
@@ -10,6 +10,19 @@ from sqlalchemy import create_engine, text, Time
from sqlalchemy.exc import IntegrityError, OperationalError, ProgrammingError
from sqlalchemy.pool import NullPool
# Thread-local storage for capturing last SQL error message
_sql_error_state = threading.local()
def get_last_sql_error() -> Optional[str]:
"""Retrieves and clears the last captured SQL error message."""
error = getattr(_sql_error_state, 'last_error', None)
_sql_error_state.last_error = None
return error
def set_last_sql_error(error: Any):
"""Sets the last captured SQL error message."""
_sql_error_state.last_error = str(error)
from app.lib_sql_search import (
sql_limit_offset_part as _sql_limit_offset_part,
sql_and_like_part as _sql_and_like_part,
@@ -114,11 +127,13 @@ def sql_insert(
trans.rollback()
log.error('Integrity error (likely duplicate). Returning None')
log.debug(e)
set_last_sql_error(e)
return None
except Exception as e:
trans.rollback()
log.error('Unknown exception in sql_insert. Returning False')
log.exception(e)
set_last_sql_error(e)
return False
else:
if result_insert.rowcount == 1 and result_insert.lastrowid > 0:
@@ -186,11 +201,13 @@ def sql_update(
try:
result_update = db.execute(sql_update_stmt, data)
trans.commit()
except Exception:
except Exception as e:
set_last_sql_error(e)
return False
except Exception as e:
trans.rollback()
log.exception(e)
set_last_sql_error(e)
return False
else:
if result_update.rowcount >= 1:

View File

@@ -1,11 +1,28 @@
from typing import Any, Dict, Optional
import json
import logging
import re
from app.lib_general_v3 import AccountContext, StatusFilterParams
log = logging.getLogger(__name__)
def format_db_error(raw_error: str) -> str:
"""
Parses raw SQLAlchemy/MariaDB errors into user-friendly strings.
"""
if not raw_error:
return ""
# Standard MariaDB pattern: (code, "message")
match = re.search(r'\(\d+,\s*["\'](.*?)["\']\s*\)', raw_error)
if match:
return match.group(1).strip()
# Fallback: remove all (parenthesized) blocks which often contain codes
clean = re.sub(r'\(.*?\)', '', raw_error)
return clean.strip()
def check_account_access(sql_result: Any, account: AccountContext, obj_name: str = None) -> bool:
"""
Enforce Multi-Tenant Data Isolation.

View File

@@ -15,9 +15,10 @@ from app.lib_general_v3 import (
)
from app.lib_api_crud_v3 import (
check_account_access, apply_forced_account_filter, filter_order_by,
get_supported_filters, safe_json_loads, sanitize_payload
get_supported_filters, safe_json_loads, sanitize_payload, format_db_error
)
from app.lib_schema_v3 import get_object_schema_info
from app.db_sql import get_last_sql_error
from app.models.response_models import *
from app.models.api_crud_models import SearchFilter, SearchQuery
from app.ae_obj_types_def import obj_type_kv_li
@@ -76,6 +77,33 @@ async def get_obj_schema(
return mk_resp(data=schema_info, response=response)
@router.post("/{obj_type}/validate", response_model=Resp_Body_Base, tags=['CRUD v3 Validation (Dev)'])
async def validate_obj_payload(
request: Request,
response: Response,
obj_type: str = Path(min_length=2, max_length=50),
account: AccountContext = Depends(get_account_context),
):
"""
Dry-Run Payload Validation.
Verifies that a payload is valid according to the Pydantic model
without performing any database operations.
"""
obj_data = await request.json()
if obj_type not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_type}' not found.")
obj_cfg = obj_type_kv_li[obj_type]
input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
try:
input_model(**obj_data)
return mk_resp(data=True, response=response, status_message="Payload is valid.")
except Exception as e:
return mk_resp(data=False, status_code=400, response=response, status_message="Validation Failed", details=str(e))
@router.get('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def get_obj(
response: Response,
@@ -382,7 +410,7 @@ async def post_obj(
try:
validated_obj = input_model(**obj_data)
except Exception as e:
return mk_resp(data=False, status_code=400, response=response, status_message=f"Validation error: {e}")
return mk_resp(data=False, status_code=400, response=response, status_message="Validation Failed", details=str(e))
data_to_insert = validated_obj.dict(exclude_unset=True)
@@ -399,7 +427,8 @@ async def post_obj(
return mk_resp(data=resp_data, response=response)
return mk_resp(data={"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}, response=response)
else:
return mk_resp(data=False, status_code=400, response=response, status_message="Failed to create object.")
db_err = format_db_error(get_last_sql_error())
return mk_resp(data=False, status_code=400, response=response, status_message="Failed to create object.", details=db_err)
@router.patch('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
@@ -458,7 +487,8 @@ async def patch_obj(
return mk_resp(data=resp_data, response=response)
return mk_resp(data=True, response=response, status_message="Object updated successfully.")
else:
return mk_resp(data=False, status_code=400, response=response, status_message="Failed to update object.")
db_err = format_db_error(get_last_sql_error())
return mk_resp(data=False, status_code=400, response=response, status_message="Failed to update object.", details=db_err)
@router.delete('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)

View File

@@ -12,8 +12,9 @@ from app.lib_general_v3 import (
)
from app.lib_api_crud_v3 import (
check_account_access, apply_forced_account_filter, filter_order_by,
get_supported_filters, safe_json_loads, sanitize_payload
get_supported_filters, safe_json_loads, sanitize_payload, format_db_error
)
from app.db_sql import get_last_sql_error
from app.models.response_models import *
from app.ae_obj_types_def import obj_type_kv_li
@@ -178,7 +179,7 @@ async def post_child_obj(
try:
validated_obj = input_model(**obj_data)
except Exception as e:
return mk_resp(data=False, status_code=400, response=response, status_message=f"Validation error: {e}")
return mk_resp(data=False, status_code=400, response=response, status_message="Validation Failed", details=str(e))
data_to_insert = validated_obj.dict(exclude_unset=True)
@@ -195,7 +196,8 @@ async def post_child_obj(
return mk_resp(data=resp_data, response=response)
return mk_resp(data={"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}, response=response)
else:
return mk_resp(data=False, status_code=400, response=response, status_message="Failed to create child object.")
db_err = format_db_error(get_last_sql_error())
return mk_resp(data=False, status_code=400, response=response, status_message="Failed to create child object.", details=db_err)
@router.get('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
@@ -286,7 +288,9 @@ async def patch_child_obj(
resp_data = output_model(**updated_child).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
return mk_resp(data=resp_data, response=response)
return mk_resp(data=True, response=response, status_message="Updated successfully.")
return mk_resp(data=False, status_code=400, response=response, status_message="Update failed.")
else:
db_err = format_db_error(get_last_sql_error())
return mk_resp(data=False, status_code=400, response=response, status_message="Update failed.", details=db_err)
@router.delete('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)

View File

@@ -0,0 +1,44 @@
import sys
import os
import asyncio
from unittest.mock import MagicMock, AsyncMock
# --- Environment Setup ---
sys.modules['redis'] = MagicMock()
sys.modules['sqlalchemy'] = MagicMock()
sys.modules['sqlalchemy.text'] = MagicMock()
sys.modules['app.config'] = MagicMock()
sys.modules['app.log'] = MagicMock()
sys.modules['app.lib_general'] = MagicMock()
# Mock app.db_sql
mock_db_sql = MagicMock()
mock_db_sql.get_last_sql_error.return_value = '(pymysql.err.IntegrityError) (1062, "Duplicate entry \'test-id\' for key \'id_random\'" )'
sys.modules['app.db_sql'] = mock_db_sql
# Add project root to path
sys.path.append(os.getcwd())
from app.lib_api_crud_v3 import format_db_error
def test_error_formatting():
print("\n--- Testing Error Formatting ---")
raw = '(pymysql.err.IntegrityError) (1062, "Duplicate entry \'abc\' for key \'id_random\'" )'
formatted = format_db_error(raw)
print(f"Raw: {raw}")
print(f"Formatted: {formatted}")
if formatted == "Duplicate entry 'abc' for key 'id_random'":
print("✅ Error formatting works.")
else:
print("❌ Error formatting FAILED.")
def test_null_error_handling():
print("\n--- Testing Null Error Handling ---")
if format_db_error(None) == "":
print("✅ Null error handled correctly.")
else:
print("❌ Null error check FAILED.")
if __name__ == "__main__":
test_error_formatting()
test_null_error_handling()