Refactor: Modularize database logic and extract core CRUD operations

This commit is contained in:
Scott Idem
2026-01-15 17:16:48 -05:00
parent 5ece1d34e3
commit eccd71f450
3 changed files with 473 additions and 735 deletions

View File

@@ -1,281 +1,86 @@
"""
Core SQL CRUD operations and connection management for Aether.
Foundational SQL connection management for the Aether API.
Isolates the SQLAlchemy engine and global connection state to prevent circular imports.
"""
import logging
import json
from typing import Any, List, Optional
from sqlalchemy import text, Time
from sqlalchemy.exc import IntegrityError, OperationalError, ProgrammingError
import threading
from typing import Any, Optional
from sqlalchemy import create_engine
from app.config import settings
from app.log import logger_reset
# Local logger to avoid cycles
log = logging.getLogger(__name__)
log = logging.getLogger('root')
# 1. Thread-local storage for capturing last SQL error message
_sql_error_state = threading.local()
def get_last_sql_error() -> Optional[str]:
"""Retrieves and clears the last captured SQL error message."""
error = getattr(_sql_error_state, 'last_error', None)
_sql_error_state.last_error = None
return error
def set_last_sql_error(error: Any):
"""Sets the last captured SQL error message."""
_sql_error_state.last_error = str(error)
# 2. Initial Engine Setup
db_uri = settings.SQLALCHEMY_DB_URI
engine = create_engine(
url = db_uri,
echo = False,
pool_use_lifo = True,
pool_pre_ping = True,
pool_recycle = settings.DB['pool_recycle'],
isolation_level = 'READ COMMITTED',
connect_args = {'connect_timeout': settings.DB['connect_timeout']}
)
log.info('DB SQL Core: Initializing connection...')
db = None
try:
db = engine.connect()
log.info(f'DB SQL Core: Connected to database: {db_uri}')
except Exception:
log.exception('DB SQL Core: Could not connect to database.')
# 3. Connection Management Logic
def reconnect_db() -> bool:
"""
Re-initializes the global database engine and connection using current settings.
Useful after bootstrapping new credentials from the 'cfg' table.
"""
global engine, db, db_uri
log.info("DB SQL Core: Refreshing database connection engine...")
try:
if engine:
engine.dispose()
log.info("DB SQL Core: Disposed of previous database engine.")
db_uri = settings.SQLALCHEMY_DB_URI
engine = create_engine(
url = db_uri,
echo = False,
pool_use_lifo = True,
pool_pre_ping = True,
pool_recycle = settings.DB['pool_recycle'],
isolation_level = 'READ COMMITTED',
connect_args = {'connect_timeout': settings.DB['connect_timeout']}
)
db = engine.connect()
log.info(f"DB SQL Core: Database connection re-established successfully: {db_uri}")
return True
except Exception:
log.exception("DB SQL Core: FAILED to refresh database connection!")
return False
def sql_connect(current_db, log_lvl: int = logging.INFO) -> bool:
"""Disposes of the current connection pool and prepares for reconnection."""
"""Old compatibility wrapper for disposing the engine."""
if current_db:
current_db.engine.dispose()
log.info('DB SQL Core: Disposed of the current engine via sql_connect.')
return True
return False
def run_sql_select(
sql: Any,
data: dict|None = None,
commit: bool = False,
log_lvl: int = logging.WARNING,
) -> Any:
"""Executes a SQL SELECT statement with retry logic for operational errors."""
from app.db_sql import db
log.setLevel(log_lvl)
if not db:
log.error('The database connection is not available!!!')
return False
try:
if commit: trans = db.begin()
sql = sql.columns(recurring_start_time=Time, recurring_end_time=Time)
result = db.execute(sql, data) if data else db.execute(sql)
if commit: trans.commit()
return result
except (OperationalError, ProgrammingError) as e:
log.error(f'DB Error ({type(e).__name__}). Retrying...')
try:
if commit: trans = db.begin()
result = db.execute(sql, data) if data else db.execute(sql)
if commit: trans.commit()
return result
except Exception:
log.exception('Second attempt failed.')
return False
except Exception:
log.exception('Unknown SQL exception.')
return False
def sql_select(
table_name: str|None = None,
record_id: int|None = None,
record_id_random: str|None = None,
field_name: str|None = None,
field_value = None,
enabled: str|None = None,
hidden: str|None = None,
qry_dict_li: dict|None = None,
fulltext_qry_dict: dict|None = None,
and_qry_dict: dict|None = None,
and_like_dict: dict|None = None,
or_like_dict: dict|None = None,
and_in_dict_li: dict|None = None,
search_query: Any|None = None,
searchable_fields: List[str]|None = None,
order_by_li: dict|None = None,
limit: int = 9999999,
offset: int = 0,
sql: str|None = None,
data: dict|None = None,
as_dict: bool = True,
as_list: bool = False,
log_lvl: int = logging.WARNING,
) -> Any:
"""The main entry point for selecting records from the database."""
from app.lib_sql_search import (
sql_enable_part, sql_hidden_part, sql_search_qry_part,
sql_where_qry_part, sql_fulltext_qry_part, sql_and_qry_part,
sql_and_like_part, sql_or_like_part, sql_and_in_dict_li_part
)
log.setLevel(log_lvl)
sql_limit_offset = f'LIMIT {limit} OFFSET {offset}' if limit >= 0 and offset >= 0 else ''
sql_order_by = ''
if order_by_li and isinstance(order_by_li, dict):
order_by_str_li = [f'`{table_name}`.`{k}` {v}' for k, v in order_by_li.items()]
sql_order_by = f"ORDER BY {', '.join(order_by_str_li)}"
if table_name and not (record_id or record_id_random or field_name or field_value or sql or data):
data = {}
s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None)
s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None)
if d_en is not None: data['enabled'] = d_en
if d_hi is not None: data['hidden'] = d_hi
s_search, d_search = ('', {})
if search_query:
s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name)
data.update(d_search)
query = text(f"SELECT * FROM `{table_name}` WHERE 1=1 {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};")
elif table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data):
data = {'rid': record_id} if record_id else {'ridr': record_id_random}
where = f"`{table_name}`.id = :rid" if record_id else f"`{table_name}`.id_random = :ridr"
query = text(f"SELECT * FROM `{table_name}` WHERE {where} {sql_order_by} {sql_limit_offset};")
elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data):
data = {field_name: field_value}
s_where, d_where = sql_where_qry_part(qry_dict_li) if qry_dict_li else ('', {})
s_ft, d_ft = sql_fulltext_qry_part(fulltext_qry_dict) if fulltext_qry_dict else ('', {})
s_and, d_and = sql_and_qry_part(and_qry_dict) if and_qry_dict else ('', {})
s_alike, d_alike = sql_and_like_part(and_like_dict) if and_like_dict else ('', {})
s_olike, d_olike = sql_or_like_part(or_like_dict) if or_like_dict else ('', {})
s_in, d_in = sql_and_in_dict_li_part(and_in_dict_li) if and_in_dict_li else ('', {})
s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name) if search_query else ('', {})
s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None)
s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None)
data.update(d_where); data.update(d_ft); data.update(d_and); data.update(d_alike)
data.update(d_olike); data.update(d_in); data.update(d_search)
if d_en is not None: data['enabled'] = d_en
if d_hi is not None: data['hidden'] = d_hi
query = text(f"SELECT * FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name} "
f"{s_where} {s_ft} {s_and} {s_alike} {s_olike} {s_in} {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};")
else:
log.warning('No matched parameter combination for sql_select')
return False
result = run_sql_select(sql=query, data=data)
if result is False: return False
if result.rowcount == 1:
record = dict(result.first()) if as_dict else result.first()
return [record] if as_list else record
elif result.rowcount > 1:
records = [dict(r) for record in result.fetchall()] if as_dict else result.fetchall()
return records
else:
return [] if as_list else None
def sql_insert(table_name: str, data: dict, log_lvl: int = logging.WARNING) -> Any:
"""Inserts a new record into the specified table."""
from app.db_sql import db
from app.lib_redis_helpers import lookup_id_random_pop
log.setLevel(log_lvl)
data = lookup_id_random_pop(data)
if not data.get('id_random'):
import secrets
data['id_random'] = secrets.token_urlsafe(8)
fields = [f'`{k}`' for k in data.keys() if k != 'id']
placeholders = [f':{k}' for k in data.keys() if k != 'id']
for k, v in data.items():
if isinstance(v, (dict, list)): data[k] = json.dumps(v)
query = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(placeholders)});")
trans = db.begin()
try:
result = db.execute(query, data)
trans.commit()
return result.lastrowid if result.rowcount == 1 else False
except Exception:
trans.rollback()
log.exception('Insert failed.')
return False
def sql_update(table_name: str, data: dict, record_id: int|None = None, record_id_random: str|None = None, log_lvl: int = logging.WARNING) -> Any:
"""Updates an existing record in the specified table."""
from app.db_sql import db
from app.lib_redis_helpers import lookup_id_random_pop
log.setLevel(log_lvl)
data = lookup_id_random_pop(data)
set_clauses = [f"`{k}` = :{k}" for k in data.keys() if k != 'id']
for k, v in data.items():
if isinstance(v, (dict, list)): data[k] = json.dumps(v)
if record_id:
where = "id = :rid"; data['rid'] = record_id
elif record_id_random:
where = "id_random = :ridr"; data['ridr'] = record_id_random
elif 'id' in data:
where = "id = :id"
else:
return False
query = text(f"UPDATE `{table_name}` SET {', '.join(set_clauses)} WHERE {where};")
trans = db.begin()
try:
result = db.execute(query, data)
trans.commit()
return True if result.rowcount >= 1 else None
except Exception:
trans.rollback()
log.exception('Update failed.')
return False
def sql_delete(table_name: str, record_id: int|None = None, record_id_random: str|None = None, log_lvl: int = logging.INFO) -> Any:
"""Deletes a record from the specified table."""
from app.db_sql import db
log.setLevel(log_lvl)
if record_id:
where = "id = :rid"; data = {'rid': record_id}
elif record_id_random:
where = "id_random = :ridr"; data = {'ridr': record_id_random}
else:
return False
query = text(f"DELETE FROM `{table_name}` WHERE {where};")
try:
result = db.execute(query, data)
return True if result.rowcount >= 1 else None
except Exception:
log.exception('Delete failed.')
return False
def sql_insert_or_update(
table_name: str,
data: dict,
rm_id_random: bool = False,
log_lvl: int = logging.DEBUG,
):
"""Modularized wrapper for INSERT ... ON DUPLICATE KEY UPDATE."""
from app.db_sql import db
from app.lib_redis_helpers import lookup_id_random_pop
if not table_name or not data: return False
if rm_id_random:
data = lookup_id_random_pop(obj_data=data)
fields = [f'`{k}`' for k in data.keys() if k != 'id']
values = [f':{k}' for k in data.keys() if k != 'id']
updates = [f'`{k}` = :{k}' for k in data.keys() if k != 'id']
query = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(values)}) "
f"ON DUPLICATE KEY UPDATE {', '.join(updates)};")
trans = db.begin()
try:
result = db.execute(query, data)
trans.commit()
return result.lastrowid if result.lastrowid > 0 else True
except Exception:
trans.rollback()
log.exception('sql_insert_or_update failed.')
return False
def get_account_id_w_for_type_id(for_type: str, for_id: int|str) -> int|bool|None:
"""Modularized helper to find an account_id associated with an object."""
from app.lib_redis_helpers import redis_lookup_id_random
from app.db_sql import sql_select
if fid := redis_lookup_id_random(record_id_random=for_id, table_name=for_type):
sql = f"SELECT account_id FROM `{for_type}` WHERE id = :fid LIMIT 1;"
if result := sql_select(sql=sql, data={'fid': fid}):
return result.get('account_id')
return False
return False