Files
OSIT-AE-API-FastAPI/app/lib_sql_crud.py
Scott Idem c7444a8a89 fix: remove pool-nuking reconnect_db() from OperationalError retry paths
On OperationalError, sql_update and run_sql_select were calling
sql_connect() → reconnect_db() which disposes the entire connection
pool mid-flight, killing other in-flight connections under concurrency.

Removed the sql_connect() calls; the existing retry blocks already open
a fresh engine.connect() context manager, and pool_pre_ping=True handles
stale connection detection. Also drops the now-unused sql_connect import.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 17:24:47 -04:00

391 lines
15 KiB
Python

"""
Standardized SQL CRUD operations for the Aether API.
Provides high-level helpers for INSERT, UPDATE, SELECT, and DELETE.
"""
import logging
import json
from typing import Any, List, Optional
from sqlalchemy import text, Time
from sqlalchemy.exc import IntegrityError, OperationalError, ProgrammingError
from app.log import log, logger_reset
# CRITICAL: Import the core module to access current global state
from app import lib_sql_core
from app.lib_sql_core import set_last_sql_error
# log.setLevel(logging.DEBUG) # Levels: DEBUG, INFO, WARNING, ERROR, CRITICAL
# Helper for resolving random IDs
from app.lib_redis_helpers import lookup_id_random_pop
# ### BEGIN ### API DB SQL ### sql_insert() ###
@logger_reset
def sql_insert(
    sql: str|None = None,
    data: dict|None = None,
    table_name: str|None = None,
    rm_id_random: bool = False,
    id_random_length: int = 8,
    log_lvl: int = logging.WARNING,
) -> None|bool|int:
    """
    Insert a single row using either a raw SQL string or a table name + data dict.

    Args:
        sql: Raw INSERT statement. When given it takes precedence over
            table_name; data is passed as the bound parameters.
        data: Column/value mapping. NOTE: this dict is modified in place
            (dict/list values are JSON-encoded, id_random may be added).
        table_name: Target table when the statement is generated.
        rm_id_random: When True, data is first passed through
            lookup_id_random_pop().
        id_random_length: When truthy and data has no truthy 'id_random', a
            token from secrets.token_urlsafe(id_random_length) is stored
            under that key.
        log_lvl: Level applied to the shared logger for this call.

    Returns:
        The new row id (lastrowid) when exactly one row was inserted and the
        driver reported a positive id; None on IntegrityError; False when the
        parameters are insufficient, on any other error, or when no usable
        row id is available.
    """
    log.setLevel(log_lvl)
    if sql:
        sql_insert_stmt = text(sql)
    elif table_name and data:
        if rm_id_random:
            data = lookup_id_random_pop(obj_data=data)
        if not data.get('id_random', None) and id_random_length:
            import secrets
            data['id_random'] = secrets.token_urlsafe(id_random_length)
        fields = []
        values = []
        for key, value in data.items():
            # 'id' is never written explicitly; it stays in the bound params
            # but is left out of the column list.
            if key != 'id':
                fields.append('`'+str(key)+'`')
                values.append(':'+str(key))
            # Nested structures are stored as JSON text.
            if isinstance(value, (dict, list)):
                data[key] = json.dumps(value)
        sql_insert_stmt = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(values)});")
    else:
        log.error('SQL INSERT statement could not be created. Missing params.')
        return False
    try:
        with lib_sql_core.engine.connect() as conn:
            # The transaction context manager commits on success and rolls
            # back on error while the connection is still open, so no manual
            # rollback is needed in the except handlers below.
            with conn.begin():
                result_insert = conn.execute(sql_insert_stmt, data)
            new_id = result_insert.lastrowid
            # lastrowid can be None (e.g. no auto-increment key); guard the
            # comparison so a committed insert is not reported as an
            # "unknown exception".
            if result_insert.rowcount == 1 and new_id is not None and new_id > 0:
                return new_id
            return False
    except IntegrityError as e:
        log.error('Integrity error (likely duplicate). Returning None')
        log.debug(e)
        set_last_sql_error(e)
        return None
    except Exception as e:
        log.error('Unknown exception in sql_insert. Returning False')
        log.exception(e)
        set_last_sql_error(e)
        return False
# ### END ### API DB SQL ### sql_insert() ###
# ### BEGIN ### API DB SQL ### sql_update() ###
def _sql_update_execute(stmt, params) -> None|bool:
    """Run one UPDATE on a fresh connection and transaction; True if any row changed, else None."""
    with lib_sql_core.engine.connect() as conn:
        # Commits on success, rolls back on error while the connection is open.
        with conn.begin():
            result_update = conn.execute(stmt, params)
        return True if result_update.rowcount >= 1 else None


@logger_reset
def sql_update(
    sql: str|None = None,
    data: dict|None = None,
    table_name: str|None = None,
    record_id: int|None = None,
    record_id_random: str|None = None,
    rm_id_random: bool = False,
    id_random_length: None|int = None,
    log_lvl: int = logging.WARNING,
):
    """
    Update rows using either a raw SQL string or a table name + data dict.

    The WHERE clause of a generated statement is chosen in this order:
    record_id, record_id_random, data['id'], data['id_random'].

    Args:
        sql: Raw UPDATE statement. Takes precedence over table_name; data is
            passed as the bound parameters.
        data: Column/value mapping. NOTE: modified in place (dict/list values
            are JSON-encoded; 'id' / 'id_random' may be set from the
            record_id / record_id_random arguments).
        table_name: Target table when the statement is generated.
        record_id: Primary key to match (WHERE id = :id).
        record_id_random: Random id to match (WHERE id_random = :id_random).
        rm_id_random: When True, data is first passed through
            lookup_id_random_pop().
        id_random_length: When truthy and data has no truthy 'id_random', a
            new token of that length is generated into data.
        log_lvl: Level applied to the shared logger for this call.

    Returns:
        True when at least one row was affected; None when no row was
        affected or there is no column to set; False when parameters are
        insufficient or the statement failed (including a failed retry).
    """
    log.setLevel(log_lvl)
    if sql:
        sql_update_stmt = text(sql)
    elif table_name and data:
        if rm_id_random:
            data = lookup_id_random_pop(obj_data=data)
        if not data.get('id_random', None) and id_random_length:
            import secrets
            data['id_random'] = secrets.token_urlsafe(id_random_length)
        field_list = []
        for key, value in data.items():
            # 'id' is only ever used for matching, never assigned.
            if key != 'id':
                field_list.append('`'+str(key) + '` = :' + str(key))
            if isinstance(value, (dict, list)):
                data[key] = json.dumps(value)
        if not field_list:
            # Nothing to SET (data only carried 'id').
            return None
        sql_set = ', '.join(field_list)
        if record_id is not None:
            data['id'] = record_id
            sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id = :id')
        elif record_id_random:
            data['id_random'] = record_id_random
            sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id_random = :id_random')
        elif 'id' in data:
            sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id = :id')
        elif 'id_random' in data:
            sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id_random = :id_random')
        else:
            return False
    else:
        return False
    try:
        return _sql_update_execute(sql_update_stmt, data)
    except OperationalError:
        # Typically a dropped connection; a new engine.connect() checks out a
        # fresh connection, so one retry is attempted without touching the pool.
        log.error('Operational error (gone away?). Retrying once...')
        try:
            return _sql_update_execute(sql_update_stmt, data)
        except Exception as e:
            log.exception(e)
            set_last_sql_error(e)
            return False
    except Exception as e:
        log.exception(e)
        set_last_sql_error(e)
        return False
# ### END ### API DB SQL ### sql_update() ###
# ### BEGIN ### Core Help CRUD ### sql_insert_or_update() ###
@logger_reset
def sql_insert_or_update(
    sql: str|None = None,
    data: dict|None = None,
    table_name: str|None = None,
    rm_id_random: bool = False,
    id_random_length: int|None = None,
    log_lvl: int = logging.DEBUG,
):
    """
    Upsert a row via INSERT ... ON DUPLICATE KEY UPDATE.

    Args:
        sql: Raw statement. Takes precedence over table_name; data is passed
            as the bound parameters.
        data: Column/value mapping. NOTE: modified in place (dict/list values
            are JSON-encoded, id_random may be added). The 'id' key is left
            out of both the INSERT column list and the UPDATE assignments.
        table_name: Target table when the statement is generated.
        rm_id_random: When True, data is first passed through
            lookup_id_random_pop().
        id_random_length: When truthy and data has no truthy 'id_random', a
            new token of that length is generated into data.
        log_lvl: Level applied to the shared logger for this call.

    Returns:
        The driver-reported lastrowid when it is a positive number, True when
        the statement succeeded without one, False when parameters are
        insufficient or the statement failed.
    """
    log.setLevel(log_lvl)
    if sql:
        stmt = text(sql)
    elif table_name and data:
        if rm_id_random:
            data = lookup_id_random_pop(obj_data=data)
        if not data.get('id_random', None) and id_random_length:
            import secrets
            data['id_random'] = secrets.token_urlsafe(id_random_length)
        columns = [k for k in data.keys() if k != 'id']
        fields = [f'`{k}`' for k in columns]
        placeholders = [f':{k}' for k in columns]
        updates = [f'`{k}` = :{k}' for k in columns]
        for k, v in data.items():
            if isinstance(v, (dict, list)):
                data[k] = json.dumps(v)
        stmt = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(placeholders)}) "
                    f"ON DUPLICATE KEY UPDATE {', '.join(updates)};")
    else:
        return False
    try:
        with lib_sql_core.engine.connect() as conn:
            # Commits on success, rolls back on error while the connection is open.
            with conn.begin():
                res = conn.execute(stmt, data)
            new_id = res.lastrowid
            # lastrowid may be None or 0; the statement still succeeded, so
            # report True rather than failing on the comparison.
            return new_id if new_id is not None and new_id > 0 else True
    except Exception as e:
        log.exception(e)
        # Keep the last-error state in sync, as the other CRUD helpers do.
        set_last_sql_error(e)
        return False
# ### END ### Core Help CRUD ### sql_insert_or_update() ###
# ### BEGIN ### Core Help CRUD ### sql_select() ###
def _sql_row_to_dict(row) -> dict:
    """Convert one result row to a plain dict, preferring the row's `_mapping` view."""
    mapping = getattr(row, '_mapping', None)
    # dict(row) only works on legacy rows; `_mapping` is the supported accessor.
    return dict(mapping) if mapping is not None else dict(row)


@logger_reset
def sql_select(
    table_name: str|None = None,
    record_id: int|None = None,
    record_id_random: str|None = None,
    field_name: str|None = None,
    field_value = None,
    enabled: str|None = None,
    hidden: str|None = None,
    qry_dict_li: dict|None = None,
    fulltext_qry_dict: dict|None = None,
    and_qry_dict: dict|None = None,
    and_like_dict: dict|None = None,
    or_like_dict: dict|None = None,
    and_in_dict_li: dict|None = None,
    search_query: Any|None = None,
    searchable_fields: List[str]|None = None,
    order_by_li: dict|None = None,
    limit: int = 9999999,
    offset: int = 0,
    sql: str|None = None,
    data: dict|None = None,
    rm_id_random: bool = False,
    as_dict: bool|None = True,
    as_list: bool|None = False,
    max_count: int = 100000,
    log_lvl: int = logging.WARNING,
) -> None|bool|dict|list:
    """
    Run a SELECT built from one of five mutually exclusive argument modes.

    Modes, checked in this order:
        1. table_name only: all rows, optionally filtered by search_query,
           enabled and hidden.
        2. table_name + record_id or record_id_random: lookup by id.
        3. table_name + field_name + field_value: equality match plus the
           optional qry_dict_li / fulltext_qry_dict / and_qry_dict /
           and_like_dict / or_like_dict / and_in_dict_li / search_query /
           enabled / hidden fragments produced by app.lib_sql_search.
        4. table_name + data: AND-ed equality match on every key of data.
        5. sql: raw statement, with data as bound parameters.

    NOTE: table_name, field_name, the keys of data, the keys/values of
    order_by_li, limit and offset are interpolated directly into the SQL
    text; callers must not pass untrusted values for them.
    NOTE: modes 3 test field_value for truthiness, so values such as 0 or ''
    do not select that mode.

    Args:
        order_by_li: Mapping of column name -> sort direction.
        limit, offset: A LIMIT/OFFSET clause is emitted only when both are >= 0.
        rm_id_random: In mode 4, pass data through lookup_id_random_pop() first.
        as_dict: Convert each row to a dict when truthy.
        as_list: Always return a list (also for zero or one row) when truthy.
        max_count: Currently not used by this function.
        log_lvl: Level applied to the shared logger for this call.

    Returns:
        False when no mode matches or execution fails; [] (as_list) or None
        when no row is found; a single record (wrapped in a list if as_list)
        for exactly one row; a list of records for several rows.
    """
    from app.lib_sql_search import (
        sql_enable_part, sql_hidden_part, sql_search_qry_part,
        sql_where_qry_part, sql_fulltext_qry_part, sql_and_qry_part,
        sql_and_like_part, sql_or_like_part, sql_and_in_dict_li_part
    )
    log.setLevel(log_lvl)
    sql_limit_offset = f'LIMIT {limit} OFFSET {offset}' if limit >= 0 and offset >= 0 else ''
    sql_order_by = ''
    if order_by_li and isinstance(order_by_li, dict):
        order_by_str_li = [f'`{table_name}`.`{k}` {v}' for k, v in order_by_li.items()]
        sql_order_by = f"ORDER BY {', '.join(order_by_str_li)}"
    if table_name and record_id is None and not (record_id_random or field_name or field_value or sql or data):
        # Mode 1: whole table with optional search / enabled / hidden filters.
        data = {}
        s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None)
        s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None)
        if d_en is not None:
            data['enabled'] = d_en
        if d_hi is not None:
            data['hidden'] = d_hi
        s_search = ''
        if search_query:
            s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name)
            data.update(d_search)
        stmt = text(f"SELECT * FROM `{table_name}` WHERE 1=1 {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};")
    elif table_name and (record_id is not None or record_id_random) and not (field_name or field_value or sql or data):
        # Mode 2: lookup by numeric id (preferred) or random id.
        data = {'rid': record_id} if record_id is not None else {'ridr': record_id_random}
        where = f"`{table_name}`.id = :rid" if record_id is not None else f"`{table_name}`.id_random = :ridr"
        stmt = text(f"SELECT * FROM `{table_name}` WHERE {where} {sql_order_by} {sql_limit_offset};")
    elif table_name and field_name and field_value and not (record_id is not None or record_id_random or sql or data):
        # Mode 3: single-field match plus optional query fragments.
        data = {field_name: field_value}
        s_where, d_where = sql_where_qry_part(qry_dict_li) if qry_dict_li else ('', {})
        s_ft, d_ft = sql_fulltext_qry_part(fulltext_qry_dict) if fulltext_qry_dict else ('', {})
        s_and, d_and = sql_and_qry_part(and_qry_dict) if and_qry_dict else ('', {})
        s_alike, d_alike = sql_and_like_part(and_like_dict) if and_like_dict else ('', {})
        s_olike, d_olike = sql_or_like_part(or_like_dict) if or_like_dict else ('', {})
        s_in, d_in = sql_and_in_dict_li_part(and_in_dict_li) if and_in_dict_li else ('', {})
        s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name) if search_query else ('', {})
        s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None)
        s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None)
        # Merge bound parameters in the same order as before so later
        # fragments win on key collisions.
        for extra_params in (d_where, d_ft, d_and, d_alike, d_olike, d_in, d_search):
            data.update(extra_params)
        if d_en is not None:
            data['enabled'] = d_en
        if d_hi is not None:
            data['hidden'] = d_hi
        stmt = text(f"SELECT * FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name} "
                    f"{s_where} {s_ft} {s_and} {s_alike} {s_olike} {s_in} {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};")
    elif table_name and data and not (record_id or record_id_random or field_name or field_value or sql):
        # Mode 4: AND-ed equality on every key of data.
        if rm_id_random:
            data = lookup_id_random_pop(obj_data=data)
        where_clauses = [f"`{table_name}`.{k} = :{k}" for k in data.keys()]
        stmt = text(f"SELECT * FROM `{table_name}` WHERE {' AND '.join(where_clauses)} {sql_order_by} {sql_limit_offset};")
    elif sql:
        # Mode 5: raw SQL.
        stmt = text(sql)
    else:
        return False
    try:
        with lib_sql_core.engine.connect() as conn:
            result = conn.execute(stmt, data)
            if not result:
                return [] if as_list else None
            # Statements that return no rows (e.g. raw DML passed via sql)
            # would raise on fetch; report "no rows" instead.
            if hasattr(result, 'returns_rows') and not result.returns_rows:
                log.warning("SQL Result does not return rows (ResourceClosedError prevented).")
                return [] if as_list else None
            # Fetch everything while the connection is still open.
            rows = result.all()
    except Exception as e:
        log.error(f"SQL Fetch Error: {e}")
        set_last_sql_error(e)
        return False
    if not rows:
        return [] if as_list else None
    records = [_sql_row_to_dict(r) for r in rows] if as_dict else rows
    if len(rows) == 1:
        # A single hit is unwrapped unless the caller asked for a list.
        return records if as_list else records[0]
    return records
# ### END ### Core Help CRUD ### sql_select() ###
# ### BEGIN ### API DB SQL ### run_sql_select() ###
@logger_reset
def run_sql_select(
    sql: text,
    data: dict|None = None,
    log_lvl: int = logging.WARNING,
) -> Any:
    """
    Execute a prepared statement and hand back the raw result object.

    On OperationalError or ProgrammingError the statement is attempted one
    more time on a newly opened connection. Unlike the other helpers in this
    module, failures are re-raised to the caller (after being recorded with
    set_last_sql_error) instead of being turned into a False return.

    NOTE(review): the result is returned after the connection context has
    exited; whether its rows are still readable presumably depends on the
    driver buffering them - confirm against callers.

    Args:
        sql: Statement passed unchanged to conn.execute().
        data: Optional bound parameters.
        log_lvl: Level applied to the shared logger for this call.

    Returns:
        Whatever conn.execute() returns.

    Raises:
        Exception: the error from the retry, or any non-retryable error from
            the first attempt.
    """
    log.setLevel(log_lvl)

    def _execute_once():
        # One attempt on its own connection.
        with lib_sql_core.engine.connect() as db_conn:
            return db_conn.execute(sql, data)

    try:
        return _execute_once()
    except (OperationalError, ProgrammingError) as first_err:
        log.error(f'DB Error: {first_err}. Retrying once...')
        try:
            return _execute_once()
        except Exception as retry_err:
            set_last_sql_error(retry_err)
            raise retry_err # RAISING instead of returning False
    except Exception as other_err:
        log.exception(other_err)
        set_last_sql_error(other_err)
        raise other_err # RAISING instead of returning False
# ### END ### API DB SQL ### run_sql_select() ###
# ### BEGIN ### Core Help CRUD ### sql_delete() ###
@logger_reset
def sql_delete(
    table_name: str|None = None,
    record_id: int|None = None,
    record_id_random: str|None = None,
    field_name: str|None = None,
    field_value = None,
    sql: str|None = None,
    data: dict|None = None,
    log_lvl: int = logging.INFO,
) -> None|bool:
    """
    Delete rows by id, by a single field match, or with a raw SQL statement.

    Modes, checked in this order:
        1. table_name + record_id or record_id_random (no other selector).
        2. table_name + field_name + field_value (no other selector).
        3. sql: raw statement, with data as optional bound parameters.

    NOTE: table_name and field_name are interpolated directly into the SQL
    text; callers must not pass untrusted values for them.
    NOTE: mode 2 tests field_value for truthiness, so values such as 0 or ''
    do not select that mode.

    Args:
        log_lvl: Level applied to the shared logger for this call.

    Returns:
        True when at least one row was deleted, None when no row matched,
        False when no mode matches or the statement failed.
    """
    log.setLevel(log_lvl)
    if table_name and (record_id is not None or record_id_random) and not (field_name or field_value or sql or data):
        data = {'rid': record_id} if record_id is not None else {'ridr': record_id_random}
        where = f"`{table_name}`.id = :rid" if record_id is not None else f"`{table_name}`.id_random = :ridr"
        stmt = text(f"DELETE FROM `{table_name}` WHERE {where}")
    elif table_name and field_name and field_value and not (record_id is not None or record_id_random or sql or data):
        data = {field_name: field_value}
        stmt = text(f"DELETE FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name}")
    elif sql:
        stmt = text(sql)
    else:
        return False
    try:
        with lib_sql_core.engine.connect() as conn:
            # Run inside an explicit transaction like the other write helpers;
            # without a commit the DELETE depends on driver/library autocommit
            # and can be rolled back when the connection is released.
            with conn.begin():
                result = conn.execute(stmt, data) if data else conn.execute(stmt)
            return True if result.rowcount >= 1 else None
    except Exception as e:
        log.exception(e)
        # Keep the last-error state in sync, as the other CRUD helpers do.
        set_last_sql_error(e)
        return False
# ### END ### Core Help CRUD ### sql_delete() ###