- Categorized scripts into tests/unit/, tests/integration/, tests/e2e/, and tests/tools/. - Adopted consistent naming prefixes (test_unit_*, test_int_*, test_e2e_*, tool_*). - Renamed conftest_mock.py to mock_config_helper.py for clarity. - Updated test_int_boot_diagnosis.py with sys.path setup for root-level execution.
71 lines
2.3 KiB
Python
71 lines
2.3 KiB
Python
#!/usr/bin/env python3
|
|
import sys
|
|
import os
|
|
import json
|
|
import datetime
|
|
import sqlalchemy
|
|
from unittest.mock import MagicMock
|
|
|
|
# 1. Setup Path to ensure we can import the app
|
|
sys.path.append("/srv/aether_api")
|
|
sys.path.append(os.getcwd()) # For local execution
|
|
|
|
# 2. Mock app.config BEFORE importing app modules
|
|
mock_config = MagicMock()
|
|
mock_config.settings = MagicMock()
|
|
sys.modules["app.config"] = mock_config
|
|
|
|
# 3. Setup REAL DB connection for introspection
|
|
DB_USER = "aether_dev"
|
|
DB_PASS = "$1sky.AE_dev.2023"
|
|
DB_SERVER = "vpn-db.oneskyit.com"
|
|
DB_PORT = "3306"
|
|
DB_NAME = "aether_dev"
|
|
DB_URI = f"mysql://{DB_USER}:{DB_PASS}@{DB_SERVER}:{DB_PORT}/{DB_NAME}"
|
|
|
|
mock_db = MagicMock()
|
|
mock_db.execute = lambda stmt: engine.execute(stmt)
|
|
engine = sqlalchemy.create_engine(DB_URI)
|
|
sys.modules["app.db_sql"] = MagicMock(db=mock_db)
|
|
|
|
def type_handler(x):
|
|
if isinstance(x, (datetime.datetime, datetime.date)):
|
|
return x.isoformat()
|
|
if isinstance(x, type):
|
|
return x.__name__
|
|
return str(x)
|
|
|
|
def export_registry():
|
|
try:
|
|
from app.ae_obj_types_def import obj_type_kv_li
|
|
from app.lib_schema_v3 import get_object_schema_info
|
|
except ImportError as e:
|
|
print(f"Error: Could not import Aether API definitions. {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
final_output = {}
|
|
allowed_keys = ["tbl", "tbl_default", "tbl_update", "table_name", "tbl_name_update", "searchable_fields", "exp_default"]
|
|
|
|
for obj_key, obj_def in obj_type_kv_li.items():
|
|
final_output[obj_key] = {}
|
|
for k in allowed_keys:
|
|
if k in obj_def: final_output[obj_key][k] = obj_def[k]
|
|
|
|
for k in ["mdl", "mdl_out", "mdl_in", "mdl_default"]:
|
|
if k in obj_def:
|
|
model = obj_def[k]
|
|
try:
|
|
if hasattr(model, "schema"): final_output[obj_key][f"{k}_schema"] = model.schema()
|
|
final_output[obj_key][k] = model.__name__
|
|
except Exception: final_output[obj_key][k] = str(model)
|
|
|
|
try:
|
|
schema_info = get_object_schema_info(obj_key)
|
|
if "database" in schema_info: final_output[obj_key]["db_schema"] = schema_info["database"]
|
|
except Exception as e: final_output[obj_key]["db_schema_error"] = str(e)
|
|
|
|
print(json.dumps(final_output, indent=2, default=type_handler))
|
|
|
|
if __name__ == "__main__":
|
|
export_registry()
|