Files
OSIT-AE-API-FastAPI/app/lib_sql_search.py

258 lines
12 KiB
Python

"""
Modular search builder and query generators for Aether.
"""
import logging
from typing import Any, List, Optional
from fastapi import HTTPException
from sqlalchemy import text
log = logging.getLogger(__name__)
def sql_limit_offset_part(limit: int, offset: int = 0) -> str:
    """Create a partial SQL string for LIMIT and OFFSET.

    Both values are coerced to ``int`` before being interpolated, so only
    integer literals can ever reach the SQL text (numeric strings such as
    ``'25'`` coming from query parameters are accepted).

    Args:
        limit: Maximum number of rows. Negative, ``None`` or non-numeric
            values mean "no limit clause".
        offset: Number of rows to skip. ``None`` is treated as ``0``.

    Returns:
        ``'LIMIT <n> OFFSET <m>'`` when both values are non-negative
        integers, otherwise an empty string.
    """
    if offset is None:
        offset = 0
    try:
        limit_value = int(limit)
        offset_value = int(offset)
    except (TypeError, ValueError, OverflowError):
        # Not convertible to an integer: emit no clause rather than
        # interpolating an arbitrary value into the statement.
        return ''
    if limit_value < 0 or offset_value < 0:
        return ''
    log.info(
        'Creating partial SQL string for LIMIT and OFFSET. Limit: %s; Offset: %s',
        limit_value,
        offset_value,
    )
    return f'LIMIT {limit_value} OFFSET {offset_value}'
def sql_and_like_part(and_like_dict_obj: dict) -> tuple[str, dict]:
    """Create a partial SQL string for AND-combined LIKE conditions.

    Args:
        and_like_dict_obj: Mapping of column name -> LIKE pattern. Anything
            falsy or not a ``dict`` yields no clause.

    Returns:
        A tuple ``(sql, params)`` where ``sql`` looks like
        ``"AND (col LIKE :and_like_col AND ...)"`` and ``params`` maps each
        ``and_like_<col>`` bind name to its pattern. ``('', {})`` when there
        is nothing to filter on.

    Raises:
        ValueError: If a key is not a plain identifier. The key is spliced
            into the SQL text as a column name and into the bind-parameter
            name, so anything else would be an injection vector (and would
            not produce a usable bind name anyway).
    """
    if not and_like_dict_obj or not isinstance(and_like_dict_obj, dict):
        return '', {}
    log.info('Creating partial SQL string for additional AND LIKE queries.')
    clauses = []
    data = {}
    for column, pattern in and_like_dict_obj.items():
        if not (isinstance(column, str) and column.isidentifier()):
            raise ValueError(f"Invalid column name for AND LIKE filter: {column!r}")
        bind_name = f'and_like_{column}'
        clauses.append(f"{column} LIKE :{bind_name}")
        data[bind_name] = pattern
    return f"AND ({' AND '.join(clauses)})", data
def sql_or_like_part(or_like_dict_obj: dict) -> tuple[str, dict]:
    """Create a partial SQL string for OR-combined LIKE conditions.

    The individual conditions are OR-ed with each other; the resulting group
    is attached to the surrounding query with a leading ``AND``.

    Args:
        or_like_dict_obj: Mapping of column name -> LIKE pattern. Anything
            falsy or not a ``dict`` yields no clause.

    Returns:
        A tuple ``(sql, params)`` where ``sql`` looks like
        ``"AND (col LIKE :or_like_col OR ...)"`` and ``params`` maps each
        ``or_like_<col>`` bind name to its pattern. ``('', {})`` when there
        is nothing to filter on.

    Raises:
        ValueError: If a key is not a plain identifier. The key is spliced
            into the SQL text as a column name and into the bind-parameter
            name, so anything else would be an injection vector.
    """
    if not or_like_dict_obj or not isinstance(or_like_dict_obj, dict):
        return '', {}
    log.info('Creating partial SQL string for additional OR LIKE queries.')
    clauses = []
    data = {}
    for column, pattern in or_like_dict_obj.items():
        if not (isinstance(column, str) and column.isidentifier()):
            raise ValueError(f"Invalid column name for OR LIKE filter: {column!r}")
        bind_name = f'or_like_{column}'
        clauses.append(f"{column} LIKE :{bind_name}")
        data[bind_name] = pattern
    return f"AND ({' OR '.join(clauses)})", data
def sql_and_in_dict_li_part(and_in_dict_li_dict_obj: dict) -> tuple[str, dict]:
    """Create a partial SQL string for AND-combined IN conditions.

    Args:
        and_in_dict_li_dict_obj: Mapping of column name -> collection of
            allowed values. Anything falsy or not a ``dict`` yields no
            clause. Each collection is passed through unchanged as a single
            bind value.

    Returns:
        A tuple ``(sql, params)`` where ``sql`` looks like
        ``"AND (col IN :and_in_col AND ...)"`` and ``params`` maps each
        ``and_in_<col>`` bind name to its collection. ``('', {})`` when
        there is nothing to filter on.

    Raises:
        ValueError: If a key is not a plain identifier. The key is spliced
            into the SQL text as a column name and into the bind-parameter
            name, so anything else would be an injection vector.
    """
    if not and_in_dict_li_dict_obj or not isinstance(and_in_dict_li_dict_obj, dict):
        return '', {}
    log.info('Creating partial SQL string for additional AND IN queries.')
    clauses = []
    data = {}
    for column, allowed_values in and_in_dict_li_dict_obj.items():
        if not (isinstance(column, str) and column.isidentifier()):
            raise ValueError(f"Invalid column name for AND IN filter: {column!r}")
        bind_name = f'and_in_{column}'
        # NOTE(review): expanding the collection into "(v1, v2, ...)" is left
        # to the DB layer/driver; an empty collection is not special-cased.
        clauses.append(f"{column} IN :{bind_name}")
        data[bind_name] = allowed_values
    return f"AND ({' AND '.join(clauses)})", data
def sql_and_qry_part(and_qry_dict_obj: dict) -> tuple[str, dict]:
    """Create a partial SQL string for additional AND equality conditions.

    Args:
        and_qry_dict_obj: Mapping of column name -> value the column must
            equal. Anything falsy or not a ``dict`` yields no clause.

    Returns:
        A tuple ``(sql, params)`` where ``sql`` looks like
        ``"AND (col = :and_col AND ...)"`` and ``params`` maps each
        ``and_<col>`` bind name to its value. ``('', {})`` when there is
        nothing to filter on.

    Raises:
        ValueError: If a key is not a plain identifier. The key is spliced
            into the SQL text as a column name and into the bind-parameter
            name, so anything else would be an injection vector.
    """
    if not and_qry_dict_obj or not isinstance(and_qry_dict_obj, dict):
        return '', {}
    log.info('Creating partial SQL string for additional AND queries.')
    clauses = []
    data = {}
    for column, expected in and_qry_dict_obj.items():
        if not (isinstance(column, str) and column.isidentifier()):
            raise ValueError(f"Invalid column name for AND filter: {column!r}")
        # NOTE(review): the "and_" prefix means a column called "like_x" or
        # "in_x" yields the same bind name as sql_and_like_part /
        # sql_and_in_dict_li_part would for column "x" - verify callers never
        # merge such parameter dicts.
        bind_name = f'and_{column}'
        clauses.append(f"{column} = :{bind_name}")
        data[bind_name] = expected
    return f"AND ({' AND '.join(clauses)})", data
def sql_fulltext_qry_part(fulltext_qry_dict: dict) -> tuple[str, dict]:
    """Create a partial SQL string for fulltext (MATCH ... AGAINST) search.

    The individual MATCH conditions are OR-ed with each other; the resulting
    group is attached to the surrounding query with a leading ``AND``.

    Args:
        fulltext_qry_dict: Mapping of column name -> boolean-mode search
            expression. Anything falsy or not a ``dict`` yields no clause.

    Returns:
        A tuple ``(sql, params)`` where ``sql`` looks like
        ``"AND (MATCH( col ) AGAINST( :ft_col IN BOOLEAN MODE ) OR ...)"``
        and ``params`` maps each ``ft_<col>`` bind name to its search
        expression. ``('', {})`` when there is nothing to search for.

    Raises:
        ValueError: If a key is not a plain identifier. The key is spliced
            into the SQL text as a column name and into the bind-parameter
            name, so anything else would be an injection vector.
    """
    if not fulltext_qry_dict or not isinstance(fulltext_qry_dict, dict):
        return '', {}
    log.info('Creating partial SQL string for fulltext search.')
    clauses = []
    data = {}
    for column, expression in fulltext_qry_dict.items():
        if not (isinstance(column, str) and column.isidentifier()):
            raise ValueError(f"Invalid column name for fulltext filter: {column!r}")
        bind_name = f'ft_{column}'
        clauses.append(f"MATCH( {column} ) AGAINST( :{bind_name} IN BOOLEAN MODE )")
        data[bind_name] = expression
    return f"AND ({' OR '.join(clauses)})", data
def sql_enable_part(table_name: str, enabled: str) -> tuple[str, bool|None]:
"""Handles enabled/disabled status filtering with schema check."""
from app import lib_sql_core
if not table_name: return '', None
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'all': return '', None
try:
lib_sql_core.db.execute(text(f"SELECT enable FROM `{table_name}` LIMIT 0"))
except:
log.warning(f"Table '{table_name}' missing 'enable' column. Skipping filter.")
return '', None
val = (enabled == 'enabled')
return f"AND `{table_name}`.enable = {str(val).lower()}", val
return '', None
def sql_hidden_part(table_name: str, hidden: str) -> tuple[str, bool|None]:
"""Handles hidden status filtering with schema check."""
from app import lib_sql_core
if not table_name: return '', None
if hidden in ['hidden', 'not_hidden', 'all']:
if hidden == 'all': return '', None
try:
lib_sql_core.db.execute(text(f"SELECT hide FROM `{table_name}` LIMIT 0"))
except:
log.warning(f"Table '{table_name}' missing 'hide' column. Skipping filter.")
return '', None
if hidden == 'hidden':
return f"AND `{table_name}`.hide = true", True
return f"AND (`{table_name}`.hide = false OR `{table_name}`.hide IS NULL)", False
return '', None
def sql_where_qry_part(qry_dict_li: list) -> tuple[str, dict]:
    """Standard v2 style WHERE clause builder.

    Each entry of ``qry_dict_li`` is a dict with the keys ``field``,
    ``operator``, ``value`` and optionally ``type`` (the boolean connector
    placed in front of the condition; defaults to ``'AND'``). An operator of
    ``'MATCH'`` produces a boolean-mode fulltext condition; anything else is
    emitted as ``<field> <operator> :<bind>``.

    Bind names are derived from the field name. They are unchanged for a
    simple field used once; a field that appears several times gets
    ``<field>``, ``<field>_2``, ``<field>_3`` ... so that e.g.
    ``age > 1 AND age < 10`` keeps both values instead of the second
    overwriting the first. Characters that cannot appear in a bind name
    (such as the dot in ``tbl.col``) are replaced by ``_`` in the bind name
    only.

    Args:
        qry_dict_li: List of condition dicts. Anything falsy or not a
            ``list`` yields no clause.

    Returns:
        ``(sql, params)``: the space-joined conditions (each starting with
        its connector) and the bind-parameter dict, or ``('', {})``.
    """
    if not qry_dict_li or not isinstance(qry_dict_li, list):
        return '', {}
    log.info('Creating partial SQL string for WHERE queries.')
    clauses = []
    data = {}
    for qry in qry_dict_li:
        field = qry.get('field')
        operator = qry.get('operator')
        connector = qry.get('type', 'AND') or 'AND'
        # NOTE(review): field, operator and connector are interpolated as-is;
        # callers must only pass trusted values for them.

        # Derive a bind name that is syntactically valid and not yet taken.
        base_name = ''.join(
            ch if (ch.isalnum() or ch == '_') else '_' for ch in str(field)
        )
        bind_name = base_name
        occurrence = 1
        while bind_name in data:
            occurrence += 1
            bind_name = f'{base_name}_{occurrence}'

        if operator == 'MATCH':
            clauses.append(
                f'{connector} MATCH( {field} ) AGAINST( :{bind_name} IN BOOLEAN MODE )'
            )
        else:
            clauses.append(f'{connector} {field} {operator} :{bind_name}')
        data[bind_name] = qry.get('value')
    return ' '.join(clauses), data
def sql_search_qry_part(
search_query: Any,
searchable_fields: List[str]|None = None,
max_depth: int = 5,
table_name: str|None = None,
) -> tuple[str, dict]:
"""Recursively builds a SQL WHERE clause from a SearchQuery model."""
from app import lib_sql_core
data = {}
param_counter = [0]
def get_param_name():
param_counter[0] += 1
return f"sp_{param_counter[0]}"
operator_map = {
"eq": "=", "ne": "!=", "gt": ">", "gte": ">=", "lt": "<", "lte": "<=",
"like": "LIKE", "in": "IN", "is_null": "IS NULL", "is_not_null": "IS NOT NULL",
"contains": "LIKE", "icontains": "LIKE", "startswith": "LIKE", "istartswith": "LIKE",
"endswith": "LIKE", "iendswith": "LIKE"
}
def process_node(query_node, current_depth: int) -> str:
if current_depth > max_depth:
raise HTTPException(status_code=400, detail=f"Search query too complex.")
clauses = []
if hasattr(query_node, 'query_string') and query_node.query_string:
if query_node.query_string == '%': pass
else:
use_match = True
if table_name:
try:
lib_sql_core.db.execute(text(f"SELECT default_qry_str FROM `{table_name}` LIMIT 0"))
except:
use_match = False
else:
use_match = False
if use_match:
p_name = get_param_name()
clauses.append(f"MATCH( default_qry_str ) AGAINST( :{p_name} IN BOOLEAN MODE )")
data[p_name] = query_node.query_string
elif searchable_fields:
like_clauses = []
# Fields to exclude from a generic text 'q' search (numeric, technical, or date fields)
exclude_patterns = [
'enable', 'hide', 'priority', 'sort', 'group',
'created_on', 'updated_on'
]
for field in searchable_fields:
# Exclude internal integer IDs specifically
if field.endswith('_id') or field == 'id':
continue
# Exclude other technical/meta fields
if any(x == field for x in exclude_patterns):
continue
f_p_name = get_param_name()
like_clauses.append(f"`{field}` LIKE :{f_p_name}")
data[f_p_name] = f"%{query_node.query_string}%"
if like_clauses: clauses.append(f"({' OR '.join(like_clauses)})")
for filter_attr in ['and_filters', 'or_filters']:
if hasattr(query_node, filter_attr) and getattr(query_node, filter_attr):
node_clauses = []
for item in getattr(query_node, filter_attr):
if hasattr(item, 'field'):
clause, item_data = process_filter(item)
node_clauses.append(clause); data.update(item_data)
else: node_clauses.append(f"({process_node(item, current_depth + 1)})")
if node_clauses:
joiner = ' AND ' if 'and' in filter_attr else ' OR '
clauses.append(f"({joiner.join(node_clauses)})")
return ' AND '.join(clauses)
def process_filter(f) -> tuple[str, dict]:
# --- ID VISION MAPPING ---
# If the frontend uses clean names (id, account_id),
# map them to the database columns (id_random, account_id_random)
# ONLY if those columns actually exist in this table/view.
target_field = f.field
vision_fields = [
'id', 'account_id', 'site_id', 'person_id', 'user_id',
'journal_id', 'journal_entry_id', 'page_id', 'post_id',
'post_comment_id', 'organization_id', 'address_id', 'hosted_file_id'
]
if target_field in vision_fields:
candidate_field = 'id_random' if target_field == 'id' else f"{target_field}_random"
# Schema Check: Verify if the random version exists in the current table/view
use_random = False
if table_name:
try:
lib_sql_core.db.execute(text(f"SELECT `{candidate_field}` FROM `{table_name}` LIMIT 0"))
use_random = True
except Exception:
pass
if use_random:
target_field = candidate_field
# print(f"Search Trace: Mapping filter field '{f.field}' -> '{target_field}'", flush=True)
else:
# If random doesn't exist, we must stick to the integer column
# but we'll need to resolve the string value to an integer elsewhere
# or rely on the user providing an integer for now.
pass
if searchable_fields is not None and target_field not in searchable_fields:
# Fallback check for original field just in case
if f.field not in searchable_fields:
raise HTTPException(status_code=400, detail=f"Unauthorized search field '{f.field}' (mapped to '{target_field}')")
sql_op = operator_map.get(f.op.lower())
if not sql_op: raise HTTPException(status_code=400, detail=f"Unsupported operator: {f.op}")
filter_data = {}
if f.op.lower() in ['is_null', 'is_not_null']: clause = f"`{target_field}` {sql_op}"
else:
p_name = get_param_name()
if f.op.lower() == 'in': clause = f"`{target_field}` IN (:{p_name})"; filter_data[p_name] = f.value
elif f.op.lower() in ['contains', 'icontains']: clause = f"`{target_field}` LIKE :{p_name}"; filter_data[p_name] = f"%{f.value}%"
elif f.op.lower() in ['startswith', 'istartswith']: clause = f"`{target_field}` LIKE :{p_name}"; filter_data[p_name] = f"{f.value}%"
elif f.op.lower() in ['endswith', 'iendswith']: clause = f"`{target_field}` LIKE :{p_name}"; filter_data[p_name] = f"%{f.value}"
else: clause = f"`{target_field}` {sql_op} :{p_name}"; filter_data[p_name] = f.value
return clause, filter_data
sql_where = process_node(search_query, 1)
return (f"AND ({sql_where})", data) if sql_where else ("", {})