Refactor V3 CRUD: Extract schema introspection logic.
- Created app/lib_schema_v3.py to isolate database and Pydantic model introspection. - Updated app/routers/api_crud_v3.py to use get_object_schema_info(), completing the modularization. - Finalized refactoring plan documentation in documentation/REFACTOR_API_CRUD_V3.md.
This commit is contained in:
61
app/lib_schema_v3.py
Normal file
61
app/lib_schema_v3.py
Normal file
@@ -0,0 +1,61 @@
|
||||
from typing import Any, Dict
|
||||
from sqlalchemy import text
|
||||
from app.db_sql import db
|
||||
from app.ae_obj_types_def import obj_type_kv_li
|
||||
|
||||
def get_object_schema_info(obj_type: str, view: str = 'default', variant: str = 'base') -> Dict[str, Any]:
|
||||
"""
|
||||
Introspects an object type to return its database and model structure.
|
||||
|
||||
Args:
|
||||
obj_type: The name of the object (e.g., 'person').
|
||||
view: The SQL view to describe (default, detail, etc.).
|
||||
variant: The model variant to describe (base, in, out).
|
||||
|
||||
Returns:
|
||||
A dictionary containing database column info and Pydantic field info.
|
||||
"""
|
||||
if obj_type not in obj_type_kv_li:
|
||||
return {"error": f"Object type '{obj_type}' not found."}
|
||||
|
||||
obj_cfg = obj_type_kv_li[obj_type]
|
||||
table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl')))
|
||||
model_key = f'mdl_{variant}' if variant != 'base' else 'mdl'
|
||||
model = obj_cfg.get(model_key, obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
|
||||
|
||||
if not table_name:
|
||||
return {"error": f"Table configuration for '{obj_type}' is missing."}
|
||||
|
||||
schema_info = {
|
||||
"object_type": obj_type,
|
||||
"view": view,
|
||||
"variant": variant,
|
||||
"database": {"table_name": table_name, "columns": []},
|
||||
"model": {"name": model.__name__ if hasattr(model, '__name__') else str(model), "fields": {}}
|
||||
}
|
||||
|
||||
# 1. Database Introspection
|
||||
try:
|
||||
db_result = db.execute(text(f"DESCRIBE `{table_name}`"))
|
||||
for row in db_result.fetchall():
|
||||
schema_info["database"]["columns"].append({
|
||||
"field": row[0], "type": row[1], "nullable": row[2] == 'YES',
|
||||
"key": row[3], "default": row[4], "extra": row[5]
|
||||
})
|
||||
except Exception as e:
|
||||
schema_info["database"]["error"] = str(e)
|
||||
|
||||
# 2. Pydantic Model Introspection
|
||||
if model and hasattr(model, "__fields__"):
|
||||
for field_name, field in model.__fields__.items():
|
||||
field_info = {
|
||||
"alias": field.alias,
|
||||
"type": str(field.outer_type_),
|
||||
"required": field.required,
|
||||
"default": field.default
|
||||
}
|
||||
if field.field_info.description:
|
||||
field_info["description"] = field.field_info.description
|
||||
schema_info["model"]["fields"][field_name] = field_info
|
||||
|
||||
return schema_info
|
||||
@@ -17,6 +17,7 @@ from app.lib_api_crud_v3 import (
|
||||
check_account_access, apply_forced_account_filter, filter_order_by,
|
||||
get_supported_filters, safe_json_loads, sanitize_payload
|
||||
)
|
||||
from app.lib_schema_v3 import get_object_schema_info
|
||||
from app.models.response_models import *
|
||||
from app.models.api_crud_models import SearchFilter, SearchQuery
|
||||
from app.ae_obj_types_def import obj_type_kv_li
|
||||
@@ -66,43 +67,11 @@ async def get_obj_schema(
|
||||
|
||||
This enables dynamic form generation without hardcoding schemas in the frontend.
|
||||
"""
|
||||
from app.db_sql import db
|
||||
from sqlalchemy import text
|
||||
|
||||
if obj_type not in obj_type_kv_li:
|
||||
return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_type}' not found.")
|
||||
|
||||
obj_cfg = obj_type_kv_li[obj_type]
|
||||
table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl')))
|
||||
model_key = f'mdl_{variant}' if variant != 'base' else 'mdl'
|
||||
model = obj_cfg.get(model_key, obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
|
||||
|
||||
if not table_name:
|
||||
return mk_resp(data=False, status_code=500, response=response, status_message=f"Table configuration for '{obj_type}' is missing.")
|
||||
|
||||
schema_info = {
|
||||
"object_type": obj_type,
|
||||
"view": view,
|
||||
"variant": variant,
|
||||
"database": {"table_name": table_name, "columns": []},
|
||||
"model": {"name": model.__name__ if hasattr(model, '__name__') else str(model), "fields": {}}
|
||||
}
|
||||
|
||||
try:
|
||||
db_result = db.execute(text(f"DESCRIBE `{table_name}`"))
|
||||
for row in db_result.fetchall():
|
||||
schema_info["database"]["columns"].append({
|
||||
"field": row[0], "type": row[1], "nullable": row[2] == 'YES',
|
||||
"key": row[3], "default": row[4], "extra": row[5]
|
||||
})
|
||||
except Exception as e:
|
||||
schema_info["database"]["error"] = str(e)
|
||||
|
||||
if model and hasattr(model, "__fields__"):
|
||||
for field_name, field in model.__fields__.items():
|
||||
field_info = {"alias": field.alias, "type": str(field.outer_type_), "required": field.required, "default": field.default}
|
||||
if field.field_info.description: field_info["description"] = field.field_info.description
|
||||
schema_info["model"]["fields"][field_name] = field_info
|
||||
schema_info = get_object_schema_info(obj_type, view, variant)
|
||||
|
||||
if "error" in schema_info:
|
||||
status_code = 400 if "not found" in schema_info["error"] else 500
|
||||
return mk_resp(data=False, status_code=status_code, response=response, status_message=schema_info["error"])
|
||||
|
||||
return mk_resp(data=schema_info, response=response)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user