Feature: Implement advanced POST-based search with recursive logical grouping and unique parameterization (Verified Working).
This commit is contained in:
116
app/db_sql.py
116
app/db_sql.py
@@ -1,4 +1,5 @@
|
||||
import datetime, json, pytz, random, redis, secrets
|
||||
from typing import Any, Optional
|
||||
from timeit import default_timer as timer
|
||||
|
||||
from app.config import settings
|
||||
@@ -614,6 +615,7 @@ def sql_select(
|
||||
and_like_dict: dict|None = None,
|
||||
or_like_dict: dict|None = None,
|
||||
and_in_dict_li: dict|None = None,
|
||||
search_query: Any|None = None, # NEW 2026-01-02 (SearchQuery model)
|
||||
fulltext_qry_field_li: list|None = None, # ['field_name_1', 'field_name_2']
|
||||
fulltext_qry_str: str|None = None, # 'search string'
|
||||
order_by_li: dict|None = None, # {"the_field_name": "DESC"}
|
||||
@@ -715,17 +717,21 @@ def sql_select(
|
||||
else:
|
||||
sql_hidden = ''
|
||||
# data['hidden'] = ''
|
||||
# if sql_enabled:
|
||||
# data['enable'] = sql_enable_part(table_name=table_name, enabled=enabled) # Reasonably safe return str and bool
|
||||
|
||||
# if sql_hidden:
|
||||
# sql_hidden = sql_hidden_part(table_name=table_name, hidden=hidden) # Reasonably safe return str and bool
|
||||
# sql_hidden, data['hidden'] = sql_hidden_part(table_name=table_name, hidden=hidden) # Reasonably safe return str and bool
|
||||
sql_search_qry = ''
|
||||
if search_query:
|
||||
log.info('Creating partial SQL string for complex SearchQuery.')
|
||||
sql_search_qry, data_search = sql_search_qry_part(search_query)
|
||||
data = {**data, **data_search}
|
||||
|
||||
sql = text(
|
||||
f"""
|
||||
SELECT *
|
||||
FROM `{table_name}`
|
||||
WHERE 1=1
|
||||
{sql_search_qry}
|
||||
{sql_enabled}
|
||||
{sql_hidden}
|
||||
{sql_order_by}
|
||||
{sql_limit_offset}
|
||||
;
|
||||
@@ -819,6 +825,12 @@ def sql_select(
|
||||
# NOTE: Merge the data_qry result with the data dict
|
||||
data = {**data, **data_qry}
|
||||
|
||||
sql_search_qry = ''
|
||||
if search_query:
|
||||
log.info('Creating partial SQL string for complex SearchQuery.')
|
||||
sql_search_qry, data_search = sql_search_qry_part(search_query)
|
||||
data = {**data, **data_search}
|
||||
|
||||
# # NOTE: Version 3 of the fulltext search
|
||||
# sql_fulltext_match_against = ''
|
||||
# log.debug(fulltext_qry_dict)
|
||||
@@ -883,6 +895,7 @@ def sql_select(
|
||||
{sql_and_like}
|
||||
{sql_or_like}
|
||||
{sql_and_in_dict_li}
|
||||
{sql_search_qry}
|
||||
{sql_enabled}
|
||||
{sql_hidden}
|
||||
{sql_order_by}
|
||||
@@ -2089,3 +2102,96 @@ def sql_limit_offset_part(limit: int, offset: int = 0) -> bool|str:
|
||||
else:
|
||||
return False
|
||||
# ### END ### API DB SQL Methods ### sql_limit_offset_part() ###
|
||||
|
||||
|
||||
# ### BEGIN ### API DB SQL Methods ### sql_search_qry_part() ###
|
||||
# NEW 2026-01-02
|
||||
# Updated to support complex POST-based searches with recursive logical grouping.
|
||||
@logger_reset
|
||||
def sql_search_qry_part(
|
||||
search_query: Any, # SearchQuery model instance
|
||||
) -> tuple[str, dict]:
|
||||
"""
|
||||
Recursively builds a SQL WHERE clause from a SearchQuery model.
|
||||
Uses unique parameter names to prevent collisions.
|
||||
"""
|
||||
log.setLevel(logging.INFO)
|
||||
log.debug(locals())
|
||||
|
||||
data = {}
|
||||
param_counter = [0] # List used as a reference for unique parameter names
|
||||
|
||||
def get_param_name():
|
||||
param_counter[0] += 1
|
||||
return f"sp_{param_counter[0]}"
|
||||
|
||||
operator_map = {
|
||||
"eq": "=",
|
||||
"ne": "!=",
|
||||
"gt": ">",
|
||||
"gte": ">=",
|
||||
"lt": "<",
|
||||
"lte": "<=",
|
||||
"like": "LIKE",
|
||||
"in": "IN",
|
||||
"is_null": "IS NULL",
|
||||
"is_not_null": "IS NOT NULL"
|
||||
}
|
||||
|
||||
def process_node(query_node) -> str:
|
||||
clauses = []
|
||||
|
||||
# Process 'and' filters
|
||||
if hasattr(query_node, 'and_filters') and query_node.and_filters:
|
||||
and_clauses = []
|
||||
for item in query_node.and_filters:
|
||||
if hasattr(item, 'field'): # SearchFilter
|
||||
clause, item_data = process_filter(item)
|
||||
and_clauses.append(clause)
|
||||
data.update(item_data)
|
||||
else: # Nested SearchQuery
|
||||
and_clauses.append(f"({process_node(item)})")
|
||||
if and_clauses:
|
||||
clauses.append(f"({' AND '.join(and_clauses)})")
|
||||
|
||||
# Process 'or' filters
|
||||
if hasattr(query_node, 'or_filters') and query_node.or_filters:
|
||||
or_clauses = []
|
||||
for item in query_node.or_filters:
|
||||
if hasattr(item, 'field'): # SearchFilter
|
||||
clause, item_data = process_filter(item)
|
||||
or_clauses.append(clause)
|
||||
data.update(item_data)
|
||||
else: # Nested SearchQuery
|
||||
or_clauses.append(f"({process_node(item)})")
|
||||
if or_clauses:
|
||||
clauses.append(f"({' OR '.join(or_clauses)})")
|
||||
|
||||
return ' AND '.join(clauses)
|
||||
|
||||
def process_filter(f) -> tuple[str, dict]:
|
||||
sql_op = operator_map.get(f.op.lower())
|
||||
if not sql_op:
|
||||
raise ValueError(f"Unsupported search operator: {f.op}")
|
||||
|
||||
filter_data = {}
|
||||
if f.op.lower() in ['is_null', 'is_not_null']:
|
||||
clause = f"`{f.field}` {sql_op}"
|
||||
elif f.op.lower() == 'in':
|
||||
p_name = get_param_name()
|
||||
# IN operator requires a tuple or list
|
||||
clause = f"`{f.field}` IN (:{p_name})"
|
||||
filter_data[p_name] = f.value
|
||||
else:
|
||||
p_name = get_param_name()
|
||||
clause = f"`{f.field}` {sql_op} :{p_name}"
|
||||
filter_data[p_name] = f.value
|
||||
|
||||
return clause, filter_data
|
||||
|
||||
# Initial processing
|
||||
sql_where = process_node(search_query)
|
||||
if sql_where:
|
||||
return f"AND ({sql_where})", data
|
||||
return "", {}
|
||||
# ### END ### API DB SQL Methods ### sql_search_qry_part() ###
|
||||
|
||||
Reference in New Issue
Block a user