Files
OSIT-AE-API-FastAPI/app/lib_sql_crud.py
Scott Idem 3d89e95c24 fix(P2): add OperationalError retry to sql_insert, sql_select, sql_insert_or_update
All three were missing the transient-connection retry that sql_update and
run_sql_select already had. On OperationalError (stale/dropped connection),
each now retries once with a fresh engine.connect() without disposing the pool.

IntegrityError (duplicate key, FK violation, NOT NULL) continues to return
None without retrying — the same data would fail again and None signals a
data conflict to callers, distinct from False (error) or an int (success).

sql_insert_or_update retry is safe because ON DUPLICATE KEY UPDATE is idempotent.
sql_insert retry is safe because OperationalError means MariaDB rolled back.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 19:41:26 -04:00

437 lines
17 KiB
Python

"""
Standardized SQL CRUD operations for the Aether API.
Provides high-level helpers for INSERT, UPDATE, SELECT, and DELETE.
"""
import logging
import json
from typing import Any, List, Optional
from sqlalchemy import text, Time
from sqlalchemy.exc import IntegrityError, OperationalError, ProgrammingError
from app.log import log, logger_reset
# CRITICAL: Import the core module to access current global state
from app import lib_sql_core
from app.lib_sql_core import set_last_sql_error
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
# Helper for resolving random IDs
from app.lib_redis_helpers import lookup_id_random_pop
# ### BEGIN ### API DB SQL ### sql_insert() ###
@logger_reset
def sql_insert(
    sql: str|None = None,
    data: dict|None = None,
    table_name: str|None = None,
    rm_id_random: bool = False,
    id_random_length: int = 8,
    log_lvl: int = logging.WARNING,
) -> None|bool|int:
    """
    Insert one row and return its auto-generated id.

    Either pass a raw ``sql`` statement (executed with ``data`` as bind
    parameters), or pass ``table_name`` and ``data`` to have an
    ``INSERT INTO `table` (...) VALUES (...)`` statement built from every key
    in ``data`` except ``id``.

    Args:
        sql: Raw SQL text. Takes precedence over ``table_name``/``data``.
        data: Column -> value mapping. dict/list values are JSON-encoded in
            place before execution, and ``id_random`` may be added, so the
            mapping passed in can be modified.
        table_name: Target table for the generated statement.
        rm_id_random: When true, ``data`` is first passed through
            ``lookup_id_random_pop`` (presumably resolves ``*_id_random``
            references -- confirm in app.lib_redis_helpers).
        id_random_length: When truthy and ``data`` has no truthy
            ``id_random``, one is generated with
            ``secrets.token_urlsafe(id_random_length)``. The argument is a
            byte count; the resulting string is longer than that.
        log_lvl: Level applied to the shared logger for this call.

    Returns:
        int: ``lastrowid`` when exactly one row was inserted and
            ``lastrowid > 0``.
        None: an ``IntegrityError`` occurred (on the first attempt or on the
            retry) -- a data conflict that would fail again if retried.
        False: missing parameters, any other error, or an insert that did not
            report exactly one row with a positive ``lastrowid``.
    """
    log.setLevel(log_lvl)

    if sql:
        sql_insert_stmt = text(sql)
    elif table_name and data:
        if rm_id_random:
            data = lookup_id_random_pop(obj_data=data)
        if not data.get('id_random', None) and id_random_length:
            import secrets
            data['id_random'] = secrets.token_urlsafe(id_random_length)
        fields = []
        values = []
        for key, value in data.items():
            # `id` is never written explicitly; the database assigns it.
            if key != 'id':
                fields.append('`'+str(key)+'`')
                values.append(':'+str(key))
            # Re-assigning an existing key during iteration is safe (no resize).
            if isinstance(value, (dict, list)):
                data[key] = json.dumps(value)
        sql_insert_stmt = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(values)});")
    else:
        log.error('SQL INSERT statement could not be created. Missing params.')
        return False

    def _attempt():
        # One insert on a freshly checked-out connection. The transaction
        # context manager commits on success and rolls back on any exception
        # while the connection is still open.
        with lib_sql_core.engine.connect() as conn, conn.begin():
            result_insert = conn.execute(sql_insert_stmt, data)
            rowcount = result_insert.rowcount
            lastrowid = result_insert.lastrowid
        if rowcount == 1 and lastrowid > 0:
            return lastrowid
        return False

    def _conflict(err):
        # Data constraint violation (duplicate key, FK mismatch, NOT NULL):
        # the same data would fail again, so report None instead of retrying.
        log.error('Integrity error (likely duplicate). Returning None')
        log.debug(err)
        set_last_sql_error(err)
        return None

    try:
        return _attempt()
    except IntegrityError as e:
        return _conflict(e)
    except OperationalError:
        # Transient connection failure: retry exactly once on a new connection.
        log.warning('Operational error in sql_insert. Retrying once with fresh connection...')
        try:
            return _attempt()
        except IntegrityError as e:
            # Keep the None-means-conflict contract on the retry path as well.
            return _conflict(e)
        except Exception as e:
            log.error('sql_insert retry failed. Returning False')
            log.exception(e)
            set_last_sql_error(e)
            return False
    except Exception as e:
        log.error('Unknown exception in sql_insert. Returning False')
        log.exception(e)
        set_last_sql_error(e)
        return False
# ### END ### API DB SQL ### sql_insert() ###
# ### BEGIN ### API DB SQL ### sql_update() ###
@logger_reset
def sql_update(
    sql: str|None = None,
    data: dict|None = None,
    table_name: str|None = None,
    record_id: int|None = None,
    record_id_random: str|None = None,
    rm_id_random: bool = False,
    id_random_length: None|int = None,
    log_lvl: int = logging.WARNING,
):
    """
    Update row(s) and report whether anything changed.

    Either pass a raw ``sql`` statement (executed with ``data`` as bind
    parameters), or pass ``table_name`` and ``data`` to have an
    ``UPDATE `table` SET ... WHERE ...`` statement built. Every key in
    ``data`` except ``id`` becomes a ``SET`` assignment. The ``WHERE`` key is
    chosen in this order: ``record_id``, ``record_id_random``, ``data['id']``,
    ``data['id_random']``.

    Args:
        sql: Raw SQL text. Takes precedence over ``table_name``/``data``.
        data: Column -> value mapping. dict/list values are JSON-encoded in
            place, and ``id`` / ``id_random`` may be written into it, so the
            mapping passed in can be modified.
        table_name: Target table for the generated statement.
        record_id: Row id to match; stored as ``data['id']``.
        record_id_random: Random id to match; stored as ``data['id_random']``.
        rm_id_random: When true, ``data`` is first passed through
            ``lookup_id_random_pop`` (presumably resolves ``*_id_random``
            references -- confirm in app.lib_redis_helpers).
        id_random_length: When truthy and ``data`` has no truthy
            ``id_random``, one is generated with ``secrets.token_urlsafe``.
        log_lvl: Level applied to the shared logger for this call.

    Returns:
        True: at least one row was reported as affected.
        None: the statement ran but affected no rows, or ``data`` held no
            column to set.
        False: missing parameters, no usable WHERE key, or an error.
    """
    log.setLevel(log_lvl)

    if sql:
        sql_update_stmt = text(sql)
    elif table_name and data:
        if rm_id_random:
            data = lookup_id_random_pop(obj_data=data)
        if not data.get('id_random', None) and id_random_length:
            import secrets
            data['id_random'] = secrets.token_urlsafe(id_random_length)
        field_list = []
        for key, value in data.items():
            # `id` is only ever used in the WHERE clause, never assigned.
            if key != 'id':
                field_list.append('`'+str(key) + '` = :' + str(key))
            if isinstance(value, (dict, list)):
                data[key] = json.dumps(value)
        if not field_list:
            # Nothing to SET (e.g. data only contained `id`).
            return None
        sql_set = ', '.join(field_list)
        if record_id is not None:
            data['id'] = record_id
            sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id = :id')
        elif record_id_random:
            data['id_random'] = record_id_random
            sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id_random = :id_random')
        elif 'id' in data:
            sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id = :id')
        elif 'id_random' in data:
            sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id_random = :id_random')
        else:
            return False
    else:
        return False

    def _attempt():
        # One update on a freshly checked-out connection. The transaction
        # context manager commits on success and rolls back on any exception
        # while the connection is still open.
        with lib_sql_core.engine.connect() as conn, conn.begin():
            result_update = conn.execute(sql_update_stmt, data)
            rowcount = result_update.rowcount
        if rowcount >= 1:
            return True
        return None

    try:
        return _attempt()
    except OperationalError:
        # Transient connection failure: retry exactly once on a new connection.
        log.error('Operational error (gone away?). Retrying once...')
        try:
            return _attempt()
        except Exception as e:
            # Surface the retry failure in the log instead of only storing it.
            log.error('sql_update retry failed. Returning False')
            log.exception(e)
            set_last_sql_error(e)
            return False
    except Exception as e:
        log.exception(e)
        set_last_sql_error(e)
        return False
# ### END ### API DB SQL ### sql_update() ###
# ### BEGIN ### Core Help CRUD ### sql_insert_or_update() ###
@logger_reset
def sql_insert_or_update(
    sql: str|None = None,
    data: dict|None = None,
    table_name: str|None = None,
    rm_id_random: bool = False,
    id_random_length: int|None = None,
    log_lvl: int = logging.DEBUG,
):
    """
    Upsert one row with ``INSERT ... ON DUPLICATE KEY UPDATE``.

    Either pass a raw ``sql`` statement (executed with ``data`` as bind
    parameters), or pass ``table_name`` and ``data`` to have the statement
    built. Every key in ``data`` except ``id`` is both inserted and listed in
    the ``ON DUPLICATE KEY UPDATE`` clause.

    Args:
        sql: Raw SQL text. Takes precedence over ``table_name``/``data``.
        data: Column -> value mapping. dict/list values are JSON-encoded in
            place and ``id_random`` may be added, so the mapping passed in can
            be modified.
        table_name: Target table for the generated statement.
        rm_id_random: When true, ``data`` is first passed through
            ``lookup_id_random_pop`` (presumably resolves ``*_id_random``
            references -- confirm in app.lib_redis_helpers).
        id_random_length: When truthy and ``data`` has no truthy
            ``id_random``, one is generated with ``secrets.token_urlsafe``.
        log_lvl: Level applied to the shared logger for this call.

    Returns:
        int: ``lastrowid`` when it is greater than zero.
        True: the statement succeeded but ``lastrowid`` was not positive.
        False: missing parameters or any error.
    """
    log.setLevel(log_lvl)

    if sql:
        stmt = text(sql)
    elif table_name and data:
        if rm_id_random:
            data = lookup_id_random_pop(obj_data=data)
        if not data.get('id_random', None) and id_random_length:
            import secrets
            data['id_random'] = secrets.token_urlsafe(id_random_length)
        # `id` is left to the database; all other keys are written.
        columns = [k for k in data if k != 'id']
        fields = [f'`{k}`' for k in columns]
        placeholders = [f':{k}' for k in columns]
        updates = [f'`{k}` = :{k}' for k in columns]
        for k, v in data.items():
            if isinstance(v, (dict, list)):
                data[k] = json.dumps(v)
        stmt = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(placeholders)}) "
                    f"ON DUPLICATE KEY UPDATE {', '.join(updates)};")
    else:
        return False

    def _attempt():
        # One upsert on a freshly checked-out connection. The transaction
        # context manager commits on success and rolls back on any exception
        # while the connection is still open.
        with lib_sql_core.engine.connect() as conn, conn.begin():
            res = conn.execute(stmt, data)
            lastrowid = res.lastrowid
        return lastrowid if lastrowid > 0 else True

    try:
        return _attempt()
    except OperationalError:
        # Re-running the same upsert converges on the same row state, so a
        # single retry after a connection failure is acceptable.
        log.warning('Operational error in sql_insert_or_update. Retrying once...')
        try:
            return _attempt()
        except Exception as e:
            log.error('sql_insert_or_update retry failed. Returning False')
            log.exception(e)
            set_last_sql_error(e)
            return False
    except Exception as e:
        log.exception(e)
        # Record the failure like the other CRUD helpers do.
        set_last_sql_error(e)
        return False
# ### END ### Core Help CRUD ### sql_insert_or_update() ###
# ### BEGIN ### Core Help CRUD ### sql_select() ###
@logger_reset
def sql_select(
    table_name: str|None = None,
    record_id: int|None = None,
    record_id_random: str|None = None,
    field_name: str|None = None,
    field_value = None,
    enabled: str|None = None,
    hidden: str|None = None,
    qry_dict_li: dict|None = None,
    fulltext_qry_dict: dict|None = None,
    and_qry_dict: dict|None = None,
    and_like_dict: dict|None = None,
    or_like_dict: dict|None = None,
    and_in_dict_li: dict|None = None,
    search_query: Any|None = None,
    searchable_fields: List[str]|None = None,
    order_by_li: dict|None = None,
    limit: int = 9999999,
    offset: int = 0,
    sql: str|None = None,
    data: dict|None = None,
    rm_id_random: bool = False,
    as_dict: bool|None = True,
    as_list: bool|None = False,
    max_count: int = 100000,
    log_lvl: int = logging.WARNING,
) -> None|bool|dict|list:
    """
    Run a SELECT and return the matching row(s).

    The statement is chosen by the first argument combination that matches:

    1. ``table_name`` alone: all rows, optionally narrowed by
       ``search_query``/``searchable_fields``, ``enabled`` and ``hidden``.
    2. ``table_name`` + ``record_id`` or ``record_id_random``: lookup by
       ``id`` / ``id_random``.
    3. ``table_name`` + ``field_name`` + ``field_value``: equality on that
       column, plus the fragments produced from ``qry_dict_li``,
       ``fulltext_qry_dict``, ``and_qry_dict``, ``and_like_dict``,
       ``or_like_dict``, ``and_in_dict_li``, ``search_query``, ``enabled``
       and ``hidden``.
    4. ``table_name`` + ``data``: equality on every key of ``data``
       (AND-joined); ``rm_id_random`` first passes ``data`` through
       ``lookup_id_random_pop``.
    5. ``sql``: raw statement executed with ``data`` as bind parameters.

    ``order_by_li`` (column -> direction) and ``limit``/``offset`` are applied
    to the generated statements (1-4). Their values, like ``table_name`` and
    ``field_name``, are interpolated into the SQL text rather than bound, so
    they must come from trusted code. ``max_count`` is accepted but not used
    by this function.

    Returns:
        False: no statement could be built, or the query failed (also after
            the single retry on ``OperationalError``).
        ``[]`` if ``as_list`` else ``None``: no rows.
        One row: the record, wrapped in a list when ``as_list`` is true.
        Several rows: a list of records.
        Records are dicts when ``as_dict`` is true, otherwise the row objects
        returned by the driver layer.
    """
    from app.lib_sql_search import (
        sql_enable_part, sql_hidden_part, sql_search_qry_part,
        sql_where_qry_part, sql_fulltext_qry_part, sql_and_qry_part,
        sql_and_like_part, sql_or_like_part, sql_and_in_dict_li_part
    )
    log.setLevel(log_lvl)

    sql_limit_offset = f'LIMIT {limit} OFFSET {offset}' if limit >= 0 and offset >= 0 else ''
    sql_order_by = ''
    if order_by_li and isinstance(order_by_li, dict):
        order_by_str_li = [f'`{table_name}`.`{k}` {v}' for k, v in order_by_li.items()]
        sql_order_by = f"ORDER BY {', '.join(order_by_str_li)}"

    if table_name and record_id is None and not (record_id_random or field_name or field_value or sql or data):
        # Case 1: whole-table listing with optional search / enabled / hidden filters.
        data = {}
        s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None)
        s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None)
        if d_en is not None:
            data['enabled'] = d_en
        if d_hi is not None:
            data['hidden'] = d_hi
        s_search, d_search = ('', {})
        if search_query:
            s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name)
            data.update(d_search)
        stmt = text(f"SELECT * FROM `{table_name}` WHERE 1=1 {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};")
    elif table_name and (record_id is not None or record_id_random) and not (field_name or field_value or sql or data):
        # Case 2: lookup by primary id or by random id.
        data = {'rid': record_id} if record_id is not None else {'ridr': record_id_random}
        where = f"`{table_name}`.id = :rid" if record_id is not None else f"`{table_name}`.id_random = :ridr"
        stmt = text(f"SELECT * FROM `{table_name}` WHERE {where} {sql_order_by} {sql_limit_offset};")
    elif table_name and field_name and field_value and not (record_id is not None or record_id_random or sql or data):
        # Case 3: equality on one column plus every optional filter fragment.
        data = {field_name: field_value}
        s_where, d_where = sql_where_qry_part(qry_dict_li) if qry_dict_li else ('', {})
        s_ft, d_ft = sql_fulltext_qry_part(fulltext_qry_dict) if fulltext_qry_dict else ('', {})
        s_and, d_and = sql_and_qry_part(and_qry_dict) if and_qry_dict else ('', {})
        s_alike, d_alike = sql_and_like_part(and_like_dict) if and_like_dict else ('', {})
        s_olike, d_olike = sql_or_like_part(or_like_dict) if or_like_dict else ('', {})
        s_in, d_in = sql_and_in_dict_li_part(and_in_dict_li) if and_in_dict_li else ('', {})
        s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name) if search_query else ('', {})
        s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None)
        s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None)
        # Merge order matters when fragments share a bind name: later wins.
        for extra in (d_where, d_ft, d_and, d_alike, d_olike, d_in, d_search):
            data.update(extra)
        if d_en is not None:
            data['enabled'] = d_en
        if d_hi is not None:
            data['hidden'] = d_hi
        stmt = text(f"SELECT * FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name} "
                    f"{s_where} {s_ft} {s_and} {s_alike} {s_olike} {s_in} {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};")
    elif table_name and data and not (record_id or record_id_random or field_name or field_value or sql):
        # Case 4: AND-joined equality on every key in `data`.
        if rm_id_random:
            data = lookup_id_random_pop(obj_data=data)
        where_clauses = [f"`{table_name}`.{k} = :{k}" for k in data.keys()]
        stmt = text(f"SELECT * FROM `{table_name}` WHERE {' AND '.join(where_clauses)} {sql_order_by} {sql_limit_offset};")
    elif sql:
        # Case 5: caller-supplied statement.
        stmt = text(sql)
    else:
        return False

    def _fetch_rows():
        # Execute on a freshly checked-out connection and materialise every
        # row before the connection is released. Returns None when the
        # statement produced no row-returning result.
        with lib_sql_core.engine.connect() as conn:
            result = conn.execute(stmt, data)
            if not result:
                return None
            if hasattr(result, 'returns_rows') and not result.returns_rows:
                log.warning("SQL Result does not return rows (ResourceClosedError prevented).")
                return None
            return result.all()

    def _to_record(row):
        # SQLAlchemy rows expose their column mapping as `_mapping`; a plain
        # dict(row) only works on legacy row objects. Fall back to the old
        # conversion when the attribute is absent.
        if not as_dict:
            return row
        mapping = getattr(row, '_mapping', None)
        return dict(mapping if mapping is not None else row)

    try:
        rows = _fetch_rows()
    except OperationalError:
        # Transient connection failure: a read can be repeated without side effects.
        log.error('Operational error in sql_select. Retrying once with fresh connection...')
        try:
            rows = _fetch_rows()
        except Exception as e:
            log.error(f"SQL Fetch Error on retry: {e}")
            set_last_sql_error(e)
            return False
    except Exception as e:
        log.error(f"SQL Fetch Error: {e}")
        set_last_sql_error(e)
        return False

    if not rows:
        return [] if as_list else None
    if len(rows) == 1:
        record = _to_record(rows[0])
        return [record] if as_list else record
    # More than one row always yields a list.
    return [_to_record(r) for r in rows] if as_dict else rows
# ### END ### Core Help CRUD ### sql_select() ###
# ### BEGIN ### API DB SQL ### run_sql_select() ###
@logger_reset
def run_sql_select(
    sql: text,
    data: dict|None = None,
    log_lvl: int = logging.WARNING,
) -> Any:
    """
    Execute an already-prepared statement and return the raw result object.

    Unlike the other helpers, failures are raised to the caller rather than
    reported as ``False``. An ``OperationalError`` or ``ProgrammingError`` on
    the first attempt triggers exactly one retry on a new connection; the
    retry's exception (if any) is recorded with ``set_last_sql_error`` and
    re-raised. Any other exception is logged, recorded and re-raised
    immediately.

    Args:
        sql: Statement passed straight to ``Connection.execute``.
        data: Bind parameters for the statement.
        log_lvl: Level applied to the shared logger for this call.

    Returns:
        Whatever ``Connection.execute`` returned.
    """
    log.setLevel(log_lvl)

    def _execute_once():
        # NOTE(review): the result is returned from inside the connection
        # context, so the connection is released before the caller reads any
        # rows -- confirm callers only rely on already-buffered results.
        with lib_sql_core.engine.connect() as db_conn:
            return db_conn.execute(sql, data)

    try:
        return _execute_once()
    except (OperationalError, ProgrammingError) as e:
        log.error(f'DB Error: {e}. Retrying once...')
        try:
            return _execute_once()
        except Exception as retry_err:
            set_last_sql_error(retry_err)
            raise  # propagate instead of returning False
    except Exception as e:
        log.exception(e)
        set_last_sql_error(e)
        raise  # propagate instead of returning False
# ### END ### API DB SQL ### run_sql_select() ###
# ### BEGIN ### Core Help CRUD ### sql_delete() ###
@logger_reset
def sql_delete(
    table_name: str|None = None,
    record_id: int|None = None,
    record_id_random: str|None = None,
    field_name: str|None = None,
    field_value = None,
    sql: str|None = None,
    data: dict|None = None,
    log_lvl: int = logging.INFO,
) -> None|bool:
    """
    Delete row(s) and report whether anything was removed.

    The statement is chosen by the first argument combination that matches:

    1. ``table_name`` + ``record_id`` or ``record_id_random``: delete by
       ``id`` / ``id_random``.
    2. ``table_name`` + ``field_name`` + ``field_value``: delete every row
       where that column equals the value. ``field_name`` is interpolated
       into the SQL text, so it must come from trusted code.
    3. ``sql``: raw statement executed with ``data`` as bind parameters.

    The delete runs inside an explicit transaction that is committed on
    success, matching the insert/update helpers, so it is persisted even when
    the engine does not autocommit.

    Args:
        table_name: Table to delete from (cases 1 and 2).
        record_id: Row id to delete.
        record_id_random: Random id to delete.
        field_name: Column to match in case 2.
        field_value: Value the column must equal in case 2.
        sql: Raw SQL text for case 3.
        data: Bind parameters for the raw statement.
        log_lvl: Level applied to the shared logger for this call.

    Returns:
        True: at least one row was reported as deleted.
        None: the statement ran but matched no rows.
        False: no statement could be built, or an error occurred.
    """
    log.setLevel(log_lvl)

    if table_name and (record_id is not None or record_id_random) and not (field_name or field_value or sql or data):
        data = {'rid': record_id} if record_id is not None else {'ridr': record_id_random}
        where = f"`{table_name}`.id = :rid" if record_id is not None else f"`{table_name}`.id_random = :ridr"
        stmt = text(f"DELETE FROM `{table_name}` WHERE {where}")
    elif table_name and field_name and field_value and not (record_id is not None or record_id_random or sql or data):
        data = {field_name: field_value}
        stmt = text(f"DELETE FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name}")
    elif sql:
        stmt = text(sql)
    else:
        return False

    try:
        # The transaction context manager commits on success and rolls back
        # on any exception before the connection is released.
        with lib_sql_core.engine.connect() as conn, conn.begin():
            result = conn.execute(stmt, data) if data else conn.execute(stmt)
            rowcount = result.rowcount
        return True if rowcount >= 1 else None
    except Exception as e:
        log.exception(e)
        # Record the failure like the other CRUD helpers do.
        set_last_sql_error(e)
        return False
# ### END ### Core Help CRUD ### sql_delete() ###