""" Modular search builder and query generators for Aether. """ import logging from typing import Any, List, Optional from fastapi import HTTPException from sqlalchemy import text log = logging.getLogger(__name__) def sql_limit_offset_part(limit: int, offset: int = 0) -> str: """Creates a partial SQL string for LIMIT and OFFSET.""" if limit >= 0 and offset >= 0: log.info(f'Creating partial SQL string for LIMIT and OFFSET. Limit: {limit}; Offset: {offset}') return f'LIMIT {limit} OFFSET {offset}' else: return '' def sql_and_like_part(and_like_dict_obj: dict) -> tuple[str, dict]: """Creates a partial SQL string for AND LIKE queries.""" data = {} if and_like_dict_obj and isinstance(and_like_dict_obj, dict): log.info('Creating partial SQL string for additional AND LIKE queries.') clauses = [] for key, value in and_like_dict_obj.items(): clauses.append(f"{key} LIKE :and_like_{key}") data[f'and_like_{key}'] = value return f"AND ({' AND '.join(clauses)})", data return '', {} def sql_or_like_part(or_like_dict_obj: dict) -> tuple[str, dict]: """Creates a partial SQL string for OR LIKE queries.""" data = {} if or_like_dict_obj and isinstance(or_like_dict_obj, dict): log.info('Creating partial SQL string for additional OR LIKE queries.') clauses = [] for key, value in or_like_dict_obj.items(): clauses.append(f"{key} LIKE :or_like_{key}") data[f'or_like_{key}'] = value return f"AND ({' OR '.join(clauses)})", data return '', {} def sql_and_in_dict_li_part(and_in_dict_li_dict_obj: dict) -> tuple[str, dict]: """Creates a partial SQL string for AND IN queries.""" data = {} if and_in_dict_li_dict_obj and isinstance(and_in_dict_li_dict_obj, dict): log.info('Creating partial SQL string for additional AND IN queries.') clauses = [] for key, value in and_in_dict_li_dict_obj.items(): clauses.append(f"{key} IN :and_in_{key}") data[f'and_in_{key}'] = value return f"AND ({' AND '.join(clauses)})", data return '', {} def sql_and_qry_part(and_qry_dict_obj: dict) -> tuple[str, dict]: """Creates a partial SQL string for additional AND queries (equals).""" data = {} if and_qry_dict_obj and isinstance(and_qry_dict_obj, dict): log.info('Creating partial SQL string for additional AND queries.') clauses = [] for key, value in and_qry_dict_obj.items(): clauses.append(f"{key} = :and_{key}") data[f'and_{key}'] = value return f"AND ({' AND '.join(clauses)})", data return '', {} def sql_fulltext_qry_part(fulltext_qry_dict: dict) -> tuple[str, dict]: """Creates a partial SQL string for fulltext search.""" data = {} if fulltext_qry_dict and isinstance(fulltext_qry_dict, dict): log.info('Creating partial SQL string for fulltext search.') clauses = [] for key, value in fulltext_qry_dict.items(): clauses.append(f"MATCH( {key} ) AGAINST( :ft_{key} IN BOOLEAN MODE )") data[f'ft_{key}'] = value return f"AND ({' OR '.join(clauses)})", data return '', {} def sql_enable_part(table_name: str, enabled: str) -> tuple[str, bool|None]: """Handles enabled/disabled status filtering with schema check.""" from app import lib_sql_core if not table_name: return '', None if enabled in ['enabled', 'disabled', 'all']: if enabled == 'all': return '', None try: with lib_sql_core.engine.connect() as conn: conn.execute(text(f"SELECT enable FROM `{table_name}` LIMIT 0")) except: log.warning(f"Table '{table_name}' missing 'enable' column. Skipping filter.") return '', None val = (enabled == 'enabled') return f"AND `{table_name}`.enable = {str(val).lower()}", val return '', None def sql_hidden_part(table_name: str, hidden: str) -> tuple[str, bool|None]: """Handles hidden status filtering with schema check.""" from app import lib_sql_core if not table_name: return '', None if hidden in ['hidden', 'not_hidden', 'all']: if hidden == 'all': return '', None try: with lib_sql_core.engine.connect() as conn: conn.execute(text(f"SELECT hide FROM `{table_name}` LIMIT 0")) except: log.warning(f"Table '{table_name}' missing 'hide' column. Skipping filter.") return '', None if hidden == 'hidden': return f"AND `{table_name}`.hide = true", True return f"AND (`{table_name}`.hide = false OR `{table_name}`.hide IS NULL)", False return '', None def sql_where_qry_part(qry_dict_li: list) -> tuple[str, dict]: """Standard v2 style WHERE clause builder.""" data = {} if qry_dict_li and isinstance(qry_dict_li, list): log.info('Creating partial SQL string for WHERE queries.') clauses = [] for qry in qry_dict_li: field = qry.get('field') op = qry.get('operator') val = qry.get('value') type_ = qry.get('type', 'AND') or 'AND' if op == 'MATCH': clauses.append(f'{type_} MATCH( {field} ) AGAINST( :{field} IN BOOLEAN MODE )') else: clauses.append(f'{type_} {field} {op} :{field}') data[field] = val return ' '.join(clauses), data return '', {} def sql_search_qry_part( search_query: Any, searchable_fields: List[str]|None = None, max_depth: int = 5, table_name: str|None = None, ) -> tuple[str, dict]: """Recursively builds a SQL WHERE clause from a SearchQuery model.""" from app import lib_sql_core data = {} param_counter = [0] def get_param_name(): param_counter[0] += 1 return f"sp_{param_counter[0]}" operator_map = { "eq": "=", "ne": "!=", "gt": ">", "gte": ">=", "lt": "<", "lte": "<=", "like": "LIKE", "in": "IN", "is_null": "IS NULL", "is_not_null": "IS NOT NULL", "contains": "LIKE", "icontains": "LIKE", "startswith": "LIKE", "istartswith": "LIKE", "endswith": "LIKE", "iendswith": "LIKE" } def process_node(query_node, current_depth: int) -> str: if current_depth > max_depth: raise HTTPException(status_code=400, detail=f"Search query too complex.") clauses = [] if hasattr(query_node, 'query_string') and query_node.query_string: if query_node.query_string == '%': pass else: use_match = True if table_name: try: with lib_sql_core.engine.connect() as conn: conn.execute(text(f"SELECT default_qry_str FROM `{table_name}` LIMIT 0")) except: use_match = False else: use_match = False if use_match: p_name = get_param_name() clauses.append(f"MATCH( default_qry_str ) AGAINST( :{p_name} IN BOOLEAN MODE )") data[p_name] = query_node.query_string elif searchable_fields: like_clauses = [] # Fields to exclude from a generic text 'q' search (numeric, technical, or date fields) exclude_patterns = [ 'enable', 'hide', 'priority', 'sort', 'group', 'created_on', 'updated_on' ] for field in searchable_fields: # Exclude internal integer IDs specifically if field.endswith('_id') or field == 'id': continue # Exclude other technical/meta fields if any(x == field for x in exclude_patterns): continue f_p_name = get_param_name() like_clauses.append(f"`{field}` LIKE :{f_p_name}") data[f_p_name] = f"%{query_node.query_string}%" if like_clauses: clauses.append(f"({' OR '.join(like_clauses)})") for filter_attr in ['and_filters', 'or_filters']: if hasattr(query_node, filter_attr) and getattr(query_node, filter_attr): node_clauses = [] for item in getattr(query_node, filter_attr): if hasattr(item, 'field'): clause, item_data = process_filter(item) node_clauses.append(clause); data.update(item_data) else: node_clauses.append(f"({process_node(item, current_depth + 1)})") if node_clauses: joiner = ' AND ' if 'and' in filter_attr else ' OR ' clauses.append(f"({joiner.join(node_clauses)})") return ' AND '.join(clauses) def process_filter(f) -> tuple[str, dict]: # --- ID VISION MAPPING --- # If the frontend uses clean names (id, account_id), # map them to the database columns (id_random, account_id_random) # ONLY if those columns actually exist in this table/view. target_field = f.field vision_fields = [ 'id', 'account_id', 'site_id', 'person_id', 'user_id', 'archive_id', 'archive_content_id', 'event_id', 'event_session_id', 'event_presentation_id', 'event_presenter_id', 'event_device_id', 'event_location_id', 'event_track_id', 'event_exhibit_id', 'event_person_id', 'event_registration_id', 'order_id', 'product_id', 'order_cart_id', 'membership_id', 'sponsorship_id', 'journal_id', 'journal_entry_id', 'page_id', 'post_id', 'post_comment_id', 'organization_id', 'address_id', 'contact_id', 'hosted_file_id' ] if target_field in vision_fields: # ONLY map to _random if the value is a string (looks like a random ID) # If it's an integer, we want to query the original integer column. is_int_val = isinstance(f.value, int) or (isinstance(f.value, str) and f.value.isdigit()) if not is_int_val: candidate_field = 'id_random' if target_field == 'id' else f"{target_field}_random" # Schema Check: Verify if the random version exists in the current table/view use_random = False if table_name: try: with lib_sql_core.engine.connect() as conn: conn.execute(text(f"SELECT `{candidate_field}` FROM `{table_name}` LIMIT 0")) use_random = True except Exception: pass if use_random: target_field = candidate_field # print(f"Search Trace: Mapping filter field '{f.field}' -> '{target_field}'", flush=True) else: # If random doesn't exist, we must stick to the integer column # but we'll need to resolve the string value to an integer elsewhere # or rely on the user providing an integer for now. pass if searchable_fields is not None and target_field not in searchable_fields: # Fallback check for original field just in case if f.field not in searchable_fields: raise HTTPException(status_code=400, detail=f"Unauthorized search field '{f.field}' (mapped to '{target_field}')") sql_op = operator_map.get(f.op.lower()) if not sql_op: raise HTTPException(status_code=400, detail=f"Unsupported operator: {f.op}") filter_data = {} if f.op.lower() in ['is_null', 'is_not_null']: clause = f"`{target_field}` {sql_op}" else: p_name = get_param_name() if f.op.lower() == 'in': clause = f"`{target_field}` IN (:{p_name})"; filter_data[p_name] = f.value elif f.op.lower() in ['contains', 'icontains']: clause = f"`{target_field}` LIKE :{p_name}"; filter_data[p_name] = f"%{f.value}%" elif f.op.lower() in ['startswith', 'istartswith']: clause = f"`{target_field}` LIKE :{p_name}"; filter_data[p_name] = f"{f.value}%" elif f.op.lower() in ['endswith', 'iendswith']: clause = f"`{target_field}` LIKE :{p_name}"; filter_data[p_name] = f"%{f.value}" else: clause = f"`{target_field}` {sql_op} :{p_name}"; filter_data[p_name] = f.value return clause, filter_data sql_where = process_node(search_query, 1) return (f"AND ({sql_where})", data) if sql_where else ("", {})