276 lines
13 KiB
Python
276 lines
13 KiB
Python
"""
|
|
Modular search builder and query generators for Aether.
|
|
"""
|
|
import logging
|
|
from typing import Any, List, Optional
|
|
from fastapi import HTTPException
|
|
from sqlalchemy import text
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
def sql_limit_offset_part(limit: int, offset: int = 0) -> str:
|
|
"""Creates a partial SQL string for LIMIT and OFFSET."""
|
|
if limit >= 0 and offset >= 0:
|
|
log.info(f'Creating partial SQL string for LIMIT and OFFSET. Limit: {limit}; Offset: {offset}')
|
|
return f'LIMIT {limit} OFFSET {offset}'
|
|
else:
|
|
return ''
|
|
|
|
def sql_and_like_part(and_like_dict_obj: dict) -> tuple[str, dict]:
|
|
"""Creates a partial SQL string for AND LIKE queries."""
|
|
data = {}
|
|
if and_like_dict_obj and isinstance(and_like_dict_obj, dict):
|
|
log.info('Creating partial SQL string for additional AND LIKE queries.')
|
|
clauses = []
|
|
for key, value in and_like_dict_obj.items():
|
|
clauses.append(f"{key} LIKE :and_like_{key}")
|
|
data[f'and_like_{key}'] = value
|
|
return f"AND ({' AND '.join(clauses)})", data
|
|
return '', {}
|
|
|
|
def sql_or_like_part(or_like_dict_obj: dict) -> tuple[str, dict]:
|
|
"""Creates a partial SQL string for OR LIKE queries."""
|
|
data = {}
|
|
if or_like_dict_obj and isinstance(or_like_dict_obj, dict):
|
|
log.info('Creating partial SQL string for additional OR LIKE queries.')
|
|
clauses = []
|
|
for key, value in or_like_dict_obj.items():
|
|
clauses.append(f"{key} LIKE :or_like_{key}")
|
|
data[f'or_like_{key}'] = value
|
|
return f"AND ({' OR '.join(clauses)})", data
|
|
return '', {}
|
|
|
|
def sql_and_in_dict_li_part(and_in_dict_li_dict_obj: dict) -> tuple[str, dict]:
|
|
"""Creates a partial SQL string for AND IN queries."""
|
|
data = {}
|
|
if and_in_dict_li_dict_obj and isinstance(and_in_dict_li_dict_obj, dict):
|
|
log.info('Creating partial SQL string for additional AND IN queries.')
|
|
clauses = []
|
|
for key, value in and_in_dict_li_dict_obj.items():
|
|
clauses.append(f"{key} IN :and_in_{key}")
|
|
data[f'and_in_{key}'] = value
|
|
return f"AND ({' AND '.join(clauses)})", data
|
|
return '', {}
|
|
|
|
def sql_and_qry_part(and_qry_dict_obj: dict) -> tuple[str, dict]:
|
|
"""Creates a partial SQL string for additional AND queries (equals)."""
|
|
data = {}
|
|
if and_qry_dict_obj and isinstance(and_qry_dict_obj, dict):
|
|
log.info('Creating partial SQL string for additional AND queries.')
|
|
clauses = []
|
|
for key, value in and_qry_dict_obj.items():
|
|
clauses.append(f"{key} = :and_{key}")
|
|
data[f'and_{key}'] = value
|
|
return f"AND ({' AND '.join(clauses)})", data
|
|
return '', {}
|
|
|
|
def sql_fulltext_qry_part(fulltext_qry_dict: dict) -> tuple[str, dict]:
|
|
"""Creates a partial SQL string for fulltext search."""
|
|
data = {}
|
|
if fulltext_qry_dict and isinstance(fulltext_qry_dict, dict):
|
|
log.info('Creating partial SQL string for fulltext search.')
|
|
clauses = []
|
|
for key, value in fulltext_qry_dict.items():
|
|
clauses.append(f"MATCH( {key} ) AGAINST( :ft_{key} IN BOOLEAN MODE )")
|
|
data[f'ft_{key}'] = value
|
|
return f"AND ({' OR '.join(clauses)})", data
|
|
return '', {}
|
|
|
|
def sql_enable_part(table_name: str, enabled: str) -> tuple[str, bool|None]:
|
|
"""Handles enabled/disabled status filtering with schema check."""
|
|
from app import lib_sql_core
|
|
if not table_name: return '', None
|
|
if enabled in ['enabled', 'disabled', 'not_enabled', 'all']:
|
|
if enabled == 'all': return '', None
|
|
try:
|
|
with lib_sql_core.engine.connect() as conn:
|
|
conn.execute(text(f"SELECT enable FROM `{table_name}` LIMIT 0"))
|
|
except:
|
|
log.warning(f"Table '{table_name}' missing 'enable' column. Skipping filter.")
|
|
return '', None
|
|
val = (enabled == 'enabled')
|
|
return f"AND `{table_name}`.enable = {str(val).lower()}", val
|
|
return '', None
|
|
|
|
def sql_hidden_part(table_name: str, hidden: str) -> tuple[str, bool|None]:
|
|
"""Handles hidden status filtering with schema check."""
|
|
from app import lib_sql_core
|
|
if not table_name: return '', None
|
|
if hidden in ['hidden', 'not_hidden', 'all']:
|
|
if hidden == 'all': return '', None
|
|
try:
|
|
with lib_sql_core.engine.connect() as conn:
|
|
conn.execute(text(f"SELECT hide FROM `{table_name}` LIMIT 0"))
|
|
except:
|
|
log.warning(f"Table '{table_name}' missing 'hide' column. Skipping filter.")
|
|
return '', None
|
|
if hidden == 'hidden':
|
|
return f"AND `{table_name}`.hide = true", True
|
|
return f"AND (`{table_name}`.hide = false OR `{table_name}`.hide IS NULL)", False
|
|
return '', None
|
|
|
|
def sql_where_qry_part(qry_dict_li: list) -> tuple[str, dict]:
|
|
"""Standard v2 style WHERE clause builder."""
|
|
data = {}
|
|
if qry_dict_li and isinstance(qry_dict_li, list):
|
|
log.info('Creating partial SQL string for WHERE queries.')
|
|
clauses = []
|
|
for qry in qry_dict_li:
|
|
field = qry.get('field')
|
|
op = qry.get('operator')
|
|
val = qry.get('value')
|
|
type_ = qry.get('type', 'AND') or 'AND'
|
|
if op == 'MATCH':
|
|
clauses.append(f'{type_} MATCH( {field} ) AGAINST( :{field} IN BOOLEAN MODE )')
|
|
else:
|
|
clauses.append(f'{type_} {field} {op} :{field}')
|
|
data[field] = val
|
|
return ' '.join(clauses), data
|
|
return '', {}
|
|
|
|
def sql_search_qry_part(
|
|
search_query: Any,
|
|
searchable_fields: List[str]|None = None,
|
|
max_depth: int = 5,
|
|
table_name: str|None = None,
|
|
) -> tuple[str, dict]:
|
|
"""Recursively builds a SQL WHERE clause from a SearchQuery model."""
|
|
from app import lib_sql_core
|
|
data = {}
|
|
param_counter = [0]
|
|
|
|
def get_param_name():
|
|
param_counter[0] += 1
|
|
return f"sp_{param_counter[0]}"
|
|
|
|
operator_map = {
|
|
"eq": "=", "ne": "!=", "gt": ">", "gte": ">=", "lt": "<", "lte": "<=",
|
|
"like": "LIKE", "in": "IN", "is_null": "IS NULL", "is_not_null": "IS NOT NULL",
|
|
"contains": "LIKE", "icontains": "LIKE", "startswith": "LIKE", "istartswith": "LIKE",
|
|
"endswith": "LIKE", "iendswith": "LIKE"
|
|
}
|
|
|
|
def process_node(query_node, current_depth: int) -> str:
|
|
if current_depth > max_depth:
|
|
raise HTTPException(status_code=400, detail=f"Search query too complex.")
|
|
clauses = []
|
|
if hasattr(query_node, 'query_string') and query_node.query_string:
|
|
if query_node.query_string == '%': pass
|
|
else:
|
|
use_match = True
|
|
if table_name:
|
|
try:
|
|
with lib_sql_core.engine.connect() as conn:
|
|
conn.execute(text(f"SELECT default_qry_str FROM `{table_name}` LIMIT 0"))
|
|
except:
|
|
use_match = False
|
|
else:
|
|
use_match = False
|
|
|
|
if use_match:
|
|
p_name = get_param_name()
|
|
clauses.append(f"MATCH( default_qry_str ) AGAINST( :{p_name} IN BOOLEAN MODE )")
|
|
data[p_name] = query_node.query_string
|
|
elif searchable_fields:
|
|
like_clauses = []
|
|
# Fields to exclude from a generic text 'q' search (numeric, technical, or date fields)
|
|
exclude_patterns = [
|
|
'enable', 'hide', 'priority', 'sort', 'group',
|
|
'created_on', 'updated_on'
|
|
]
|
|
for field in searchable_fields:
|
|
# Exclude internal integer IDs specifically
|
|
if field.endswith('_id') or field == 'id':
|
|
continue
|
|
|
|
# Exclude other technical/meta fields
|
|
if any(x == field for x in exclude_patterns):
|
|
continue
|
|
|
|
f_p_name = get_param_name()
|
|
like_clauses.append(f"`{field}` LIKE :{f_p_name}")
|
|
data[f_p_name] = f"%{query_node.query_string}%"
|
|
|
|
if like_clauses: clauses.append(f"({' OR '.join(like_clauses)})")
|
|
for filter_attr in ['and_filters', 'or_filters']:
|
|
if hasattr(query_node, filter_attr) and getattr(query_node, filter_attr):
|
|
node_clauses = []
|
|
for item in getattr(query_node, filter_attr):
|
|
if hasattr(item, 'field'):
|
|
clause, item_data = process_filter(item)
|
|
node_clauses.append(clause); data.update(item_data)
|
|
else: node_clauses.append(f"({process_node(item, current_depth + 1)})")
|
|
if node_clauses:
|
|
joiner = ' AND ' if 'and' in filter_attr else ' OR '
|
|
clauses.append(f"({joiner.join(node_clauses)})")
|
|
return ' AND '.join(clauses)
|
|
|
|
def process_filter(f) -> tuple[str, dict]:
|
|
# --- ID VISION MAPPING ---
|
|
# If the frontend uses clean names (id, account_id),
|
|
# map them to the database columns (id_random, account_id_random)
|
|
# ONLY if those columns actually exist in this table/view.
|
|
target_field = f.field
|
|
vision_fields = [
|
|
'id', 'account_id', 'site_id', 'person_id', 'user_id',
|
|
'archive_id', 'archive_content_id',
|
|
'event_id',
|
|
'event_session_id', 'event_presentation_id', 'event_presenter_id',
|
|
'event_device_id', 'event_location_id', 'event_track_id',
|
|
'event_exhibit_id',
|
|
'event_person_id', 'event_registration_id',
|
|
'order_id', 'product_id', 'order_cart_id', 'membership_id', 'sponsorship_id',
|
|
'journal_id', 'journal_entry_id', 'page_id',
|
|
'post_id', 'post_comment_id',
|
|
'organization_id', 'address_id', 'contact_id',
|
|
'hosted_file_id'
|
|
]
|
|
|
|
if target_field in vision_fields:
|
|
# ONLY map to _random if the value is a string (looks like a random ID)
|
|
# If it's an integer, we want to query the original integer column.
|
|
is_int_val = isinstance(f.value, int) or (isinstance(f.value, str) and f.value.isdigit())
|
|
|
|
if not is_int_val:
|
|
candidate_field = 'id_random' if target_field == 'id' else f"{target_field}_random"
|
|
|
|
# Schema Check: Verify if the random version exists in the current table/view
|
|
use_random = False
|
|
if table_name:
|
|
try:
|
|
with lib_sql_core.engine.connect() as conn:
|
|
conn.execute(text(f"SELECT `{candidate_field}` FROM `{table_name}` LIMIT 0"))
|
|
use_random = True
|
|
except Exception:
|
|
pass
|
|
|
|
if use_random:
|
|
target_field = candidate_field
|
|
# print(f"Search Trace: Mapping filter field '{f.field}' -> '{target_field}'", flush=True)
|
|
else:
|
|
# If random doesn't exist, we must stick to the integer column
|
|
# but we'll need to resolve the string value to an integer elsewhere
|
|
# or rely on the user providing an integer for now.
|
|
pass
|
|
|
|
if searchable_fields is not None and target_field not in searchable_fields:
|
|
# Fallback check for original field just in case
|
|
if f.field not in searchable_fields:
|
|
raise HTTPException(status_code=400, detail=f"Unauthorized search field '{f.field}' (mapped to '{target_field}')")
|
|
|
|
sql_op = operator_map.get(f.op.lower())
|
|
if not sql_op: raise HTTPException(status_code=400, detail=f"Unsupported operator: {f.op}")
|
|
filter_data = {}
|
|
if f.op.lower() in ['is_null', 'is_not_null']: clause = f"`{target_field}` {sql_op}"
|
|
else:
|
|
p_name = get_param_name()
|
|
if f.op.lower() == 'in': clause = f"`{target_field}` IN (:{p_name})"; filter_data[p_name] = f.value
|
|
elif f.op.lower() in ['contains', 'icontains']: clause = f"`{target_field}` LIKE :{p_name}"; filter_data[p_name] = f"%{f.value}%"
|
|
elif f.op.lower() in ['startswith', 'istartswith']: clause = f"`{target_field}` LIKE :{p_name}"; filter_data[p_name] = f"{f.value}%"
|
|
elif f.op.lower() in ['endswith', 'iendswith']: clause = f"`{target_field}` LIKE :{p_name}"; filter_data[p_name] = f"%{f.value}"
|
|
else: clause = f"`{target_field}` {sql_op} :{p_name}"; filter_data[p_name] = f.value
|
|
return clause, filter_data
|
|
|
|
sql_where = process_node(search_query, 1)
|
|
return (f"AND ({sql_where})", data) if sql_where else ("", {})
|