""" Modular search builder and query generators for Aether. """ import logging from typing import Any, List, Optional from fastapi import HTTPException from sqlalchemy import text log = logging.getLogger(__name__) def sql_limit_offset_part(limit: int, offset: int = 0) -> str|bool: """Creates a partial SQL string for LIMIT and OFFSET.""" if limit >= 0 and offset >= 0: log.info(f'Creating partial SQL string for LIMIT and OFFSET. Limit: {limit}; Offset: {offset}') return f'LIMIT {limit} OFFSET {offset}' else: return False def sql_and_like_part(and_like_dict_obj: dict) -> tuple[str, dict]|bool: """Creates a partial SQL string for AND LIKE queries.""" data = {} if and_like_dict_obj and isinstance(and_like_dict_obj, dict): log.info('Creating partial SQL string for additional AND LIKE queries.') clauses = [] for key, value in and_like_dict_obj.items(): clauses.append(f"{key} LIKE :and_like_{key}") data[f'and_like_{key}'] = value return f"AND ({' AND '.join(clauses)})", data return False def sql_or_like_part(or_like_dict_obj: dict) -> tuple[str, dict]|bool: """Creates a partial SQL string for OR LIKE queries.""" data = {} if or_like_dict_obj and isinstance(or_like_dict_obj, dict): log.info('Creating partial SQL string for additional OR LIKE queries.') clauses = [] for key, value in or_like_dict_obj.items(): clauses.append(f"{key} LIKE :or_like_{key}") data[f'or_like_{key}'] = value return f"AND ({' OR '.join(clauses)})", data return False def sql_and_in_dict_li_part(and_in_dict_li_dict_obj: dict) -> tuple[str, dict]|bool: """Creates a partial SQL string for AND IN queries.""" data = {} if and_in_dict_li_dict_obj and isinstance(and_in_dict_li_dict_obj, dict): log.info('Creating partial SQL string for additional AND IN queries.') clauses = [] for key, value in and_in_dict_li_dict_obj.items(): clauses.append(f"{key} IN :and_in_{key}") data[f'and_in_{key}'] = value return f"AND ({' AND '.join(clauses)})", data return False def sql_and_qry_part(and_qry_dict_obj: dict) -> tuple[str, dict]|bool: """Creates a partial SQL string for additional AND queries (equals).""" data = {} if and_qry_dict_obj and isinstance(and_qry_dict_obj, dict): log.info('Creating partial SQL string for additional AND queries.') clauses = [] for key, value in and_qry_dict_obj.items(): clauses.append(f"{key} = :and_{key}") data[f'and_{key}'] = value return f"AND ({' AND '.join(clauses)})", data return False def sql_fulltext_qry_part(fulltext_qry_dict: dict) -> tuple[str, dict]|bool: """Creates a partial SQL string for fulltext search.""" data = {} if fulltext_qry_dict and isinstance(fulltext_qry_dict, dict): log.info('Creating partial SQL string for fulltext search.') clauses = [] for key, value in fulltext_qry_dict.items(): clauses.append(f"MATCH( {key} ) AGAINST( :ft_{key} IN BOOLEAN MODE )") data[f'ft_{key}'] = value return f"AND ({' OR '.join(clauses)})", data return False def sql_enable_part(table_name: str, enabled: str) -> tuple[str, bool|None]|bool: """Handles enabled/disabled status filtering with schema check.""" from app import lib_sql_core if not table_name: return False if enabled in ['enabled', 'disabled', 'all']: if enabled == 'all': return '', None try: lib_sql_core.db.execute(text(f"SELECT enable FROM `{table_name}` LIMIT 0")) except: log.warning(f"Table '{table_name}' missing 'enable' column. Skipping filter.") return '', None val = (enabled == 'enabled') return f"AND `{table_name}`.enable = {str(val).lower()}", val return False def sql_hidden_part(table_name: str, hidden: str) -> tuple[str, bool|None]|bool: """Handles hidden status filtering with schema check.""" from app import lib_sql_core if not table_name: return False if hidden in ['hidden', 'not_hidden', 'all']: if hidden == 'all': return '', None try: lib_sql_core.db.execute(text(f"SELECT hide FROM `{table_name}` LIMIT 0")) except: log.warning(f"Table '{table_name}' missing 'hide' column. Skipping filter.") return '', None if hidden == 'hidden': return f"AND `{table_name}`.hide = true", True return f"AND (`{table_name}`.hide = false OR `{table_name}`.hide IS NULL)", False return False def sql_where_qry_part(qry_dict_li: list) -> tuple[str, dict]|bool: """Standard v2 style WHERE clause builder.""" data = {} if qry_dict_li and isinstance(qry_dict_li, list): log.info('Creating partial SQL string for WHERE queries.') clauses = [] for qry in qry_dict_li: field = qry.get('field') op = qry.get('operator') val = qry.get('value') type_ = qry.get('type', 'AND') or 'AND' if op == 'MATCH': clauses.append(f'{type_} MATCH( {field} ) AGAINST( :{field} IN BOOLEAN MODE )') else: clauses.append(f'{type_} {field} {op} :{field}') data[field] = val return ' '.join(clauses), data return False def sql_search_qry_part( search_query: Any, searchable_fields: List[str]|None = None, max_depth: int = 5, table_name: str|None = None, ) -> tuple[str, dict]: """Recursively builds a SQL WHERE clause from a SearchQuery model.""" from app import lib_sql_core data = {} param_counter = [0] def get_param_name(): param_counter[0] += 1 return f"sp_{param_counter[0]}" operator_map = { "eq": "=", "ne": "!=", "gt": ">", "gte": ">=", "lt": "<", "lte": "<=", "like": "LIKE", "in": "IN", "is_null": "IS NULL", "is_not_null": "IS NOT NULL", "contains": "LIKE", "icontains": "LIKE", "startswith": "LIKE", "istartswith": "LIKE", "endswith": "LIKE", "iendswith": "LIKE" } def process_node(query_node, current_depth: int) -> str: if current_depth > max_depth: raise HTTPException(status_code=400, detail=f"Search query too complex.") clauses = [] if hasattr(query_node, 'query_string') and query_node.query_string: if query_node.query_string == '%': pass else: use_match = True if table_name: try: lib_sql_core.db.execute(text(f"SELECT default_qry_str FROM `{table_name}` LIMIT 0")) except: use_match = False else: use_match = False if use_match: p_name = get_param_name() clauses.append(f"MATCH( default_qry_str ) AGAINST( :{p_name} IN BOOLEAN MODE )") data[p_name] = query_node.query_string elif searchable_fields: like_clauses = [] # Fields to exclude from a generic text 'q' search (numeric, technical, or date fields) exclude_patterns = [ 'enable', 'hide', 'priority', 'sort', 'group', 'created_on', 'updated_on' ] for field in searchable_fields: # Exclude internal integer IDs specifically if field.endswith('_id') or field == 'id': continue # Exclude other technical/meta fields if any(x == field for x in exclude_patterns): continue f_p_name = get_param_name() like_clauses.append(f"`{field}` LIKE :{f_p_name}") data[f_p_name] = f"%{query_node.query_string}%" if like_clauses: clauses.append(f"({' OR '.join(like_clauses)})") for filter_attr in ['and_filters', 'or_filters']: if hasattr(query_node, filter_attr) and getattr(query_node, filter_attr): node_clauses = [] for item in getattr(query_node, filter_attr): if hasattr(item, 'field'): clause, item_data = process_filter(item) node_clauses.append(clause); data.update(item_data) else: node_clauses.append(f"({process_node(item, current_depth + 1)})") if node_clauses: joiner = ' AND ' if 'and' in filter_attr else ' OR ' clauses.append(f"({joiner.join(node_clauses)})") return ' AND '.join(clauses) def process_filter(f) -> tuple[str, dict]: # --- ID VISION MAPPING --- # If the frontend uses clean names (id, account_id), # map them to the database columns (id_random, account_id_random). target_field = f.field vision_fields = ['id', 'account_id', 'site_id', 'person_id', 'user_id', 'journal_id', 'journal_entry_id'] if target_field in vision_fields: if target_field == 'id': target_field = 'id_random' else: target_field = f"{target_field}_random" print(f"Search Trace: Mapping filter field '{f.field}' -> '{target_field}'", flush=True) if searchable_fields is not None and target_field not in searchable_fields: # Fallback check for original field just in case if f.field not in searchable_fields: raise HTTPException(status_code=400, detail=f"Unauthorized search field '{f.field}' (mapped to '{target_field}')") sql_op = operator_map.get(f.op.lower()) if not sql_op: raise HTTPException(status_code=400, detail=f"Unsupported operator: {f.op}") filter_data = {} if f.op.lower() in ['is_null', 'is_not_null']: clause = f"`{target_field}` {sql_op}" else: p_name = get_param_name() if f.op.lower() == 'in': clause = f"`{target_field}` IN (:{p_name})"; filter_data[p_name] = f.value elif f.op.lower() in ['contains', 'icontains']: clause = f"`{target_field}` LIKE :{p_name}"; filter_data[p_name] = f"%{f.value}%" elif f.op.lower() in ['startswith', 'istartswith']: clause = f"`{target_field}` LIKE :{p_name}"; filter_data[p_name] = f"{f.value}%" elif f.op.lower() in ['endswith', 'iendswith']: clause = f"`{target_field}` LIKE :{p_name}"; filter_data[p_name] = f"%{f.value}" else: clause = f"`{target_field}` {sql_op} :{p_name}"; filter_data[p_name] = f.value return clause, filter_data sql_where = process_node(search_query, 1) return (f"AND ({sql_where})", data) if sql_where else ("", {})