refactor(sql): complete modularization of search builders and ID resolution
This commit is contained in:
161
app/db_sql.py
161
app/db_sql.py
@@ -20,7 +20,8 @@ from app.lib_sql_search import (
|
||||
sql_fulltext_qry_part as _sql_fulltext_qry_part,
|
||||
sql_enable_part as _sql_enable_part,
|
||||
sql_hidden_part as _sql_hidden_part,
|
||||
sql_where_qry_part as _sql_where_qry_part
|
||||
sql_where_qry_part as _sql_where_qry_part,
|
||||
sql_search_qry_part as _sql_search_qry_part
|
||||
)
|
||||
|
||||
from app.lib_redis_helpers import (
|
||||
@@ -1768,156 +1769,32 @@ def sql_limit_offset_part(limit: int, offset: int = 0) -> bool|str:
|
||||
# ### END ### API DB SQL Methods ### sql_limit_offset_part() ###
|
||||
|
||||
|
||||
# ### BEGIN ### API DB SQL Methods ### sql_search_qry_part() ###
|
||||
# NEW 2026-01-02
|
||||
# Updated to support complex POST-based searches with recursive logical grouping.
|
||||
# Updated 2026-01-06 to handle missing default_qry_str column gracefully.
|
||||
@logger_reset
|
||||
|
||||
|
||||
def sql_search_qry_part(
|
||||
|
||||
|
||||
search_query: Any, # SearchQuery model instance
|
||||
|
||||
|
||||
searchable_fields: List[str]|None = None, # List of allowed fields
|
||||
|
||||
|
||||
max_depth: int = 5, # Maximum recursion depth
|
||||
|
||||
|
||||
table_name: str|None = None, # Target table for schema validation
|
||||
|
||||
|
||||
) -> tuple[str, dict]:
|
||||
"""
|
||||
Recursively builds a SQL WHERE clause from a SearchQuery model.
|
||||
Uses unique parameter names to prevent collisions.
|
||||
Enforces security via field allowlist and recursion depth limits.
|
||||
"""
|
||||
|
||||
|
||||
log.setLevel(logging.INFO)
|
||||
|
||||
|
||||
log.debug(locals())
|
||||
|
||||
data = {}
|
||||
param_counter = [0]
|
||||
|
||||
def get_param_name():
|
||||
param_counter[0] += 1
|
||||
return f"sp_{param_counter[0]}"
|
||||
return _sql_search_qry_part(search_query, searchable_fields, max_depth, table_name)
|
||||
|
||||
operator_map = {
|
||||
"eq": "=",
|
||||
"ne": "!=",
|
||||
"gt": ">",
|
||||
"gte": ">=",
|
||||
"lt": "<",
|
||||
"lte": "<=",
|
||||
"like": "LIKE",
|
||||
"in": "IN",
|
||||
"is_null": "IS NULL",
|
||||
"is_not_null": "IS NOT NULL",
|
||||
"contains": "LIKE",
|
||||
"icontains": "LIKE",
|
||||
"startswith": "LIKE",
|
||||
"istartswith": "LIKE",
|
||||
"endswith": "LIKE",
|
||||
"iendswith": "LIKE"
|
||||
}
|
||||
|
||||
def process_node(query_node, current_depth: int) -> str:
|
||||
if current_depth > max_depth:
|
||||
raise HTTPException(status_code=400, detail=f"Search query too complex (max depth {max_depth} reached).")
|
||||
|
||||
clauses = []
|
||||
|
||||
# Process 'query_string' (Standardized Full-Text Search)
|
||||
if hasattr(query_node, 'query_string') and query_node.query_string:
|
||||
if query_node.query_string == '%':
|
||||
# Wildcard: Skip filtering for this part
|
||||
pass
|
||||
else:
|
||||
# Check if default_qry_str exists in this table/view
|
||||
use_match = True
|
||||
if table_name:
|
||||
try:
|
||||
db.execute(text(f"SELECT default_qry_str FROM `{table_name}` LIMIT 0"))
|
||||
except:
|
||||
use_match = False
|
||||
else:
|
||||
use_match = False # Safe default if no table_name
|
||||
|
||||
if use_match:
|
||||
p_name = get_param_name()
|
||||
clauses.append(f"MATCH( default_qry_str ) AGAINST( :{p_name} IN BOOLEAN MODE )")
|
||||
data[p_name] = query_node.query_string
|
||||
elif searchable_fields:
|
||||
# Fallback: OR LIKE across all searchable fields
|
||||
like_clauses = []
|
||||
for field in searchable_fields:
|
||||
# Skip internal/numeric fields for full-text-like search
|
||||
if not any(x in field for x in ['_id', 'enable', 'hide', 'priority', 'sort', 'group', 'created_on', 'updated_on']):
|
||||
f_p_name = get_param_name()
|
||||
like_clauses.append(f"`{field}` LIKE :{f_p_name}")
|
||||
data[f_p_name] = f"%{query_node.query_string}%"
|
||||
if like_clauses:
|
||||
clauses.append(f"({' OR '.join(like_clauses)})")
|
||||
|
||||
# Process 'and' filters
|
||||
if hasattr(query_node, 'and_filters') and query_node.and_filters:
|
||||
and_clauses = []
|
||||
for item in query_node.and_filters:
|
||||
if hasattr(item, 'field'): # SearchFilter
|
||||
clause, item_data = process_filter(item)
|
||||
and_clauses.append(clause)
|
||||
data.update(item_data)
|
||||
else: # Nested SearchQuery
|
||||
and_clauses.append(f"({process_node(item, current_depth + 1)})")
|
||||
if and_clauses:
|
||||
clauses.append(f"({' AND '.join(and_clauses)})")
|
||||
|
||||
# Process 'or' filters
|
||||
if hasattr(query_node, 'or_filters') and query_node.or_filters:
|
||||
or_clauses = []
|
||||
for item in query_node.or_filters:
|
||||
if hasattr(item, 'field'): # SearchFilter
|
||||
clause, item_data = process_filter(item)
|
||||
or_clauses.append(clause)
|
||||
data.update(item_data)
|
||||
else: # Nested SearchQuery
|
||||
or_clauses.append(f"({process_node(item, current_depth + 1)})")
|
||||
if or_clauses:
|
||||
clauses.append(f"({' OR '.join(or_clauses)})")
|
||||
|
||||
return ' AND '.join(clauses)
|
||||
|
||||
def process_filter(f) -> tuple[str, dict]:
|
||||
# Field Validation: Check against allowlist
|
||||
if searchable_fields is not None and f.field not in searchable_fields:
|
||||
raise HTTPException(status_code=400, detail=f"Searching on field '{f.field}' is not permitted.")
|
||||
|
||||
op_lower = f.op.lower()
|
||||
sql_op = operator_map.get(op_lower)
|
||||
if not sql_op:
|
||||
raise HTTPException(status_code=400, detail=f"Unsupported search operator: {f.op}")
|
||||
|
||||
filter_data = {}
|
||||
if op_lower in ['is_null', 'is_not_null']:
|
||||
clause = f"`{f.field}` {sql_op}"
|
||||
elif op_lower == 'in':
|
||||
p_name = get_param_name()
|
||||
clause = f"`{f.field}` IN (:{p_name})"
|
||||
filter_data[p_name] = f.value
|
||||
elif op_lower in ['contains', 'icontains']:
|
||||
p_name = get_param_name()
|
||||
clause = f"`{f.field}` LIKE :{p_name}"
|
||||
filter_data[p_name] = f"%{f.value}%"
|
||||
elif op_lower in ['startswith', 'istartswith']:
|
||||
p_name = get_param_name()
|
||||
clause = f"`{f.field}` LIKE :{p_name}"
|
||||
filter_data[p_name] = f"{f.value}%"
|
||||
elif op_lower in ['endswith', 'iendswith']:
|
||||
p_name = get_param_name()
|
||||
clause = f"`{f.field}` LIKE :{p_name}"
|
||||
filter_data[p_name] = f"%{f.value}"
|
||||
else:
|
||||
p_name = get_param_name()
|
||||
clause = f"`{f.field}` {sql_op} :{p_name}"
|
||||
filter_data[p_name] = f.value
|
||||
|
||||
return clause, filter_data
|
||||
|
||||
# Initial processing
|
||||
sql_where = process_node(search_query, 1)
|
||||
if sql_where:
|
||||
return f"AND ({sql_where})", data
|
||||
return "", {}
|
||||
# ### END ### API DB SQL Methods ### sql_search_qry_part() ###
|
||||
Reference in New Issue
Block a user