Docs: Consolidate admin documentation and migrate reference data
- Created LOCAL_DEVELOPMENT_GUIDE.md and DEPLOYMENT_GUIDE_MANUAL.md from legacy txt files. - Migrated country/time_zone data and requirements.txt to documentation/reference_data/. - Removed redundant admin/documentation/ and admin/data_files/ directories. - Enhanced app/lib_schema_v3.py to explicitly capture 'required' fields from DB 'NOT NULL' constraint. - Added verification tests for schema logic and standalone DB connectivity.
This commit is contained in:
10
tests/conftest_mock.py
Normal file
10
tests/conftest_mock.py
Normal file
@@ -0,0 +1,10 @@
|
||||
import sys
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
# Create a mock for app.config before any other app imports
|
||||
mock_config = MagicMock()
|
||||
mock_config.settings = MagicMock()
|
||||
|
||||
# Manually inject into sys.modules
|
||||
sys.modules["app.config"] = mock_config
|
||||
print("MOCKED: app.config")
|
||||
70
tests/generate_registry_live.py
Normal file
70
tests/generate_registry_live.py
Normal file
@@ -0,0 +1,70 @@
|
||||
#!/usr/bin/env python3
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
import datetime
|
||||
import sqlalchemy
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
# 1. Setup Path to ensure we can import the app
|
||||
sys.path.append("/srv/aether_api")
|
||||
sys.path.append(os.getcwd()) # For local execution
|
||||
|
||||
# 2. Mock app.config BEFORE importing app modules
|
||||
mock_config = MagicMock()
|
||||
mock_config.settings = MagicMock()
|
||||
sys.modules["app.config"] = mock_config
|
||||
|
||||
# 3. Setup REAL DB connection for introspection
|
||||
DB_USER = "aether_dev"
|
||||
DB_PASS = "$1sky.AE_dev.2023"
|
||||
DB_SERVER = "vpn-db.oneskyit.com"
|
||||
DB_PORT = "3306"
|
||||
DB_NAME = "aether_dev"
|
||||
DB_URI = f"mysql://{DB_USER}:{DB_PASS}@{DB_SERVER}:{DB_PORT}/{DB_NAME}"
|
||||
|
||||
mock_db = MagicMock()
|
||||
mock_db.execute = lambda stmt: engine.execute(stmt)
|
||||
engine = sqlalchemy.create_engine(DB_URI)
|
||||
sys.modules["app.db_sql"] = MagicMock(db=mock_db)
|
||||
|
||||
def type_handler(x):
|
||||
if isinstance(x, (datetime.datetime, datetime.date)):
|
||||
return x.isoformat()
|
||||
if isinstance(x, type):
|
||||
return x.__name__
|
||||
return str(x)
|
||||
|
||||
def export_registry():
|
||||
try:
|
||||
from app.ae_obj_types_def import obj_type_kv_li
|
||||
from app.lib_schema_v3 import get_object_schema_info
|
||||
except ImportError as e:
|
||||
print(f"Error: Could not import Aether API definitions. {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
final_output = {}
|
||||
allowed_keys = ["tbl", "tbl_default", "tbl_update", "table_name", "tbl_name_update", "searchable_fields", "exp_default"]
|
||||
|
||||
for obj_key, obj_def in obj_type_kv_li.items():
|
||||
final_output[obj_key] = {}
|
||||
for k in allowed_keys:
|
||||
if k in obj_def: final_output[obj_key][k] = obj_def[k]
|
||||
|
||||
for k in ["mdl", "mdl_out", "mdl_in", "mdl_default"]:
|
||||
if k in obj_def:
|
||||
model = obj_def[k]
|
||||
try:
|
||||
if hasattr(model, "schema"): final_output[obj_key][f"{k}_schema"] = model.schema()
|
||||
final_output[obj_key][k] = model.__name__
|
||||
except Exception: final_output[obj_key][k] = str(model)
|
||||
|
||||
try:
|
||||
schema_info = get_object_schema_info(obj_key)
|
||||
if "database" in schema_info: final_output[obj_key]["db_schema"] = schema_info["database"]
|
||||
except Exception as e: final_output[obj_key]["db_schema_error"] = str(e)
|
||||
|
||||
print(json.dumps(final_output, indent=2, default=type_handler))
|
||||
|
||||
if __name__ == "__main__":
|
||||
export_registry()
|
||||
46
tests/standalone_db_test.py
Normal file
46
tests/standalone_db_test.py
Normal file
@@ -0,0 +1,46 @@
|
||||
import sqlalchemy
|
||||
from sqlalchemy import text
|
||||
import json
|
||||
|
||||
# Connection details from .env
|
||||
DB_USER = "aether_dev"
|
||||
DB_PASS = "$1sky.AE_dev.2023"
|
||||
DB_SERVER = "vpn-db.oneskyit.com"
|
||||
DB_PORT = "3306"
|
||||
DB_NAME = "aether_dev"
|
||||
|
||||
# Construct URI
|
||||
DB_URI = f"mysql://{DB_USER}:{DB_PASS}@{DB_SERVER}:{DB_PORT}/{DB_NAME}"
|
||||
|
||||
def test_raw_describe():
|
||||
engine = sqlalchemy.create_engine(DB_URI)
|
||||
print(f"Connecting to {DB_SERVER}...")
|
||||
|
||||
try:
|
||||
with engine.connect() as conn:
|
||||
result = conn.execute(text("DESCRIBE journal"))
|
||||
columns = []
|
||||
for row in result.fetchall():
|
||||
columns.append({
|
||||
"Field": row[0],
|
||||
"Type": row[1],
|
||||
"Null": row[2],
|
||||
"Default": row[4]
|
||||
})
|
||||
|
||||
# Print as a nice JSON snippet
|
||||
print("\n--- Raw DB Metadata for 'journal' (Sample) ---")
|
||||
print(json.dumps(columns[:5], indent=2))
|
||||
|
||||
# Highlight the specific fields mentioned in the task
|
||||
print("\n--- Target Fields ---")
|
||||
targets = ["name", "enable", "passcode_timeout", "created_on"]
|
||||
for col in columns:
|
||||
if col["Field"] in targets:
|
||||
print(f"Field: {col['Field']:16} | Type: {col['Type']:15} | Null: {col['Null']:3} | Default: {col['Default']}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_raw_describe()
|
||||
44
tests/test_lib_schema_isolated.py
Normal file
44
tests/test_lib_schema_isolated.py
Normal file
@@ -0,0 +1,44 @@
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
# Add current directory to path
|
||||
sys.path.append(os.getcwd())
|
||||
|
||||
# 1. Mock EVERYTHING before importing the target
|
||||
mock_config = MagicMock()
|
||||
mock_config.settings = MagicMock()
|
||||
sys.modules["app.config"] = mock_config
|
||||
|
||||
mock_db = MagicMock()
|
||||
sys.modules["app.db_sql"] = MagicMock(db=mock_db)
|
||||
|
||||
# 2. Mock DESCRIBE result
|
||||
mock_row = [
|
||||
("created_on", "timestamp", "NO", "", "current_timestamp()", "")
|
||||
]
|
||||
mock_db.execute.return_value.fetchall.return_value = mock_row
|
||||
|
||||
# 3. Import and test
|
||||
from app.lib_schema_v3 import get_object_schema_info
|
||||
|
||||
def test_isolated_logic():
|
||||
# We need to mock obj_type_kv_li as well
|
||||
import app.lib_schema_v3
|
||||
app.lib_schema_v3.obj_type_kv_li = {"journal": {"tbl": "journal", "mdl": MagicMock()}}
|
||||
|
||||
print("Testing isolated logic for get_object_schema_info...")
|
||||
info = get_object_schema_info("journal")
|
||||
|
||||
col = info["database"]["columns"][0]
|
||||
print(f"Resulting column info: {json.dumps(col, indent=2)}")
|
||||
|
||||
assert col["field"] == "created_on"
|
||||
assert col["db_type"] == "timestamp"
|
||||
assert col["required"] is True
|
||||
assert col["db_default"] == "current_timestamp()"
|
||||
print("\nIsolated test passed! The logic in lib_schema_v3 is correct.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_isolated_logic()
|
||||
56
tests/verify_schema_logic.py
Normal file
56
tests/verify_schema_logic.py
Normal file
@@ -0,0 +1,56 @@
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
import sqlalchemy
|
||||
from sqlalchemy import text
|
||||
|
||||
# Add current directory to path
|
||||
sys.path.append(os.getcwd())
|
||||
|
||||
# MUST BE FIRST: Mock app.config
|
||||
import tests.conftest_mock
|
||||
|
||||
# Connection details from .env (Bypassing app.config)
|
||||
DB_USER = "aether_dev"
|
||||
DB_PASS = "$1sky.AE_dev.2023"
|
||||
DB_SERVER = "vpn-db.oneskyit.com"
|
||||
DB_PORT = "3306"
|
||||
DB_NAME = "aether_dev"
|
||||
DB_URI = f"mysql://{DB_USER}:{DB_PASS}@{DB_SERVER}:{DB_PORT}/{DB_NAME}"
|
||||
|
||||
# Mock the db object before importing lib_schema_v3
|
||||
from app.db_sql import db
|
||||
db.engine = sqlalchemy.create_engine(DB_URI)
|
||||
|
||||
from app.lib_schema_v3 import get_object_schema_info
|
||||
|
||||
def verify_enhanced_schema():
|
||||
print("Testing enhanced get_object_schema_info for 'journal'...")
|
||||
try:
|
||||
info = get_object_schema_info("journal")
|
||||
|
||||
if "error" in info:
|
||||
print(f"Error: {info['error']}")
|
||||
return
|
||||
|
||||
cols = info["database"]["columns"]
|
||||
targets = ["name", "enable", "passcode_timeout", "created_on"]
|
||||
|
||||
print("\n--- Verified Enhanced Metadata ---")
|
||||
for col in cols:
|
||||
if col["field"] in targets:
|
||||
print(f"Field: {col['field']:16} | DB Type: {col['db_type']:15} | Required: {str(col['required']):5} | Default: {col['db_default']}")
|
||||
|
||||
# Basic check to ensure new keys exist
|
||||
assert "db_type" in cols[0]
|
||||
assert "required" in cols[0]
|
||||
assert "db_default" in cols[0]
|
||||
print("\nSuccess: New schema keys are present and populated.")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Test failed: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
|
||||
if __name__ == "__main__":
|
||||
verify_enhanced_schema()
|
||||
Reference in New Issue
Block a user