Vision ID: Standardize Site Domain and Journal objects with string-only IDs and searchable mapping

This commit is contained in:
Scott Idem
2026-01-19 15:57:00 -05:00
parent 2dbf47d874
commit 7db937f8af
10 changed files with 272 additions and 305 deletions

View File

@@ -78,12 +78,12 @@ def sql_fulltext_qry_part(fulltext_qry_dict: dict) -> tuple[str, dict]|bool:
def sql_enable_part(table_name: str, enabled: str) -> tuple[str, bool|None]|bool:
"""Handles enabled/disabled status filtering with schema check."""
from app.db_sql import db
from app import lib_sql_core
if not table_name: return False
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'all': return '', None
try:
db.execute(text(f"SELECT enable FROM `{table_name}` LIMIT 0"))
lib_sql_core.db.execute(text(f"SELECT enable FROM `{table_name}` LIMIT 0"))
except:
log.warning(f"Table '{table_name}' missing 'enable' column. Skipping filter.")
return '', None
@@ -93,12 +93,12 @@ def sql_enable_part(table_name: str, enabled: str) -> tuple[str, bool|None]|bool
def sql_hidden_part(table_name: str, hidden: str) -> tuple[str, bool|None]|bool:
"""Handles hidden status filtering with schema check."""
from app.db_sql import db
from app import lib_sql_core
if not table_name: return False
if hidden in ['hidden', 'not_hidden', 'all']:
if hidden == 'all': return '', None
try:
db.execute(text(f"SELECT hide FROM `{table_name}` LIMIT 0"))
lib_sql_core.db.execute(text(f"SELECT hide FROM `{table_name}` LIMIT 0"))
except:
log.warning(f"Table '{table_name}' missing 'hide' column. Skipping filter.")
return '', None
@@ -133,7 +133,7 @@ def sql_search_qry_part(
table_name: str|None = None,
) -> tuple[str, dict]:
"""Recursively builds a SQL WHERE clause from a SearchQuery model."""
from app.db_sql import db
from app import lib_sql_core
data = {}
param_counter = [0]
@@ -157,9 +157,13 @@ def sql_search_qry_part(
else:
use_match = True
if table_name:
try: db.execute(text(f"SELECT default_qry_str FROM `{table_name}` LIMIT 0"))
except: use_match = False
else: use_match = False
try:
lib_sql_core.db.execute(text(f"SELECT default_qry_str FROM `{table_name}` LIMIT 0"))
except:
use_match = False
else:
use_match = False
if use_match:
p_name = get_param_name()
clauses.append(f"MATCH( default_qry_str ) AGAINST( :{p_name} IN BOOLEAN MODE )")
@@ -172,7 +176,7 @@ def sql_search_qry_part(
'created_on', 'updated_on'
]
for field in searchable_fields:
# Exclude exact internal integer IDs (ending in _id)
# Exclude internal integer IDs specifically
if field.endswith('_id') or field == 'id':
continue
@@ -183,6 +187,7 @@ def sql_search_qry_part(
f_p_name = get_param_name()
like_clauses.append(f"`{field}` LIKE :{f_p_name}")
data[f_p_name] = f"%{query_node.query_string}%"
if like_clauses: clauses.append(f"({' OR '.join(like_clauses)})")
for filter_attr in ['and_filters', 'or_filters']:
if hasattr(query_node, filter_attr) and getattr(query_node, filter_attr):
@@ -198,19 +203,33 @@ def sql_search_qry_part(
return ' AND '.join(clauses)
def process_filter(f) -> tuple[str, dict]:
if searchable_fields is not None and f.field not in searchable_fields:
raise HTTPException(status_code=400, detail=f"Unauthorized search field '{f.field}'")
# --- ID VISION MAPPING ---
# If the frontend uses clean names (id, account_id),
# map them to the database columns (id_random, account_id_random).
target_field = f.field
vision_fields = ['id', 'account_id', 'site_id', 'person_id', 'user_id', 'journal_id', 'journal_entry_id']
if target_field in vision_fields:
if target_field == 'id': target_field = 'id_random'
else: target_field = f"{target_field}_random"
print(f"Search Trace: Mapping filter field '{f.field}' -> '{target_field}'", flush=True)
if searchable_fields is not None and target_field not in searchable_fields:
# Fallback check for original field just in case
if f.field not in searchable_fields:
raise HTTPException(status_code=400, detail=f"Unauthorized search field '{f.field}' (mapped to '{target_field}')")
sql_op = operator_map.get(f.op.lower())
if not sql_op: raise HTTPException(status_code=400, detail=f"Unsupported operator: {f.op}")
filter_data = {}
if f.op.lower() in ['is_null', 'is_not_null']: clause = f"`{f.field}` {sql_op}"
if f.op.lower() in ['is_null', 'is_not_null']: clause = f"`{target_field}` {sql_op}"
else:
p_name = get_param_name()
if f.op.lower() == 'in': clause = f"`{f.field}` IN (:{p_name})"; filter_data[p_name] = f.value
elif f.op.lower() in ['contains', 'icontains']: clause = f"`{f.field}` LIKE :{p_name}"; filter_data[p_name] = f"%{f.value}%"
elif f.op.lower() in ['startswith', 'istartswith']: clause = f"`{f.field}` LIKE :{p_name}"; filter_data[p_name] = f"{f.value}%"
elif f.op.lower() in ['endswith', 'iendswith']: clause = f"`{f.field}` LIKE :{p_name}"; filter_data[p_name] = f"%{f.value}"
else: clause = f"`{f.field}` {sql_op} :{p_name}"; filter_data[p_name] = f.value
if f.op.lower() == 'in': clause = f"`{target_field}` IN (:{p_name})"; filter_data[p_name] = f.value
elif f.op.lower() in ['contains', 'icontains']: clause = f"`{target_field}` LIKE :{p_name}"; filter_data[p_name] = f"%{f.value}%"
elif f.op.lower() in ['startswith', 'istartswith']: clause = f"`{target_field}` LIKE :{p_name}"; filter_data[p_name] = f"{f.value}%"
elif f.op.lower() in ['endswith', 'iendswith']: clause = f"`{target_field}` LIKE :{p_name}"; filter_data[p_name] = f"%{f.value}"
else: clause = f"`{target_field}` {sql_op} :{p_name}"; filter_data[p_name] = f.value
return clause, filter_data
sql_where = process_node(search_query, 1)