Saving our work just in case.

This commit is contained in:
Scott Idem
2026-01-06 18:06:45 -05:00
parent 55033d0749
commit a6ec6d1b2b
3 changed files with 439 additions and 0 deletions

281
app/lib_sql_core.py Normal file
View File

@@ -0,0 +1,281 @@
"""
Core SQL CRUD operations and connection management for Aether.
"""
import logging
import json
from typing import Any, List, Optional
from sqlalchemy import text, Time
from sqlalchemy.exc import IntegrityError, OperationalError, ProgrammingError
from app.config import settings
from app.log import logger_reset
# Local logger to avoid cycles
log = logging.getLogger(__name__)
def sql_connect(current_db, log_lvl: int = logging.INFO) -> bool:
"""Disposes of the current connection pool and prepares for reconnection."""
if current_db:
current_db.engine.dispose()
return True
return False
def run_sql_select(
sql: Any,
data: dict|None = None,
commit: bool = False,
log_lvl: int = logging.WARNING,
) -> Any:
"""Executes a SQL SELECT statement with retry logic for operational errors."""
from app.db_sql import db
log.setLevel(log_lvl)
if not db:
log.error('The database connection is not available!!!')
return False
try:
if commit: trans = db.begin()
sql = sql.columns(recurring_start_time=Time, recurring_end_time=Time)
result = db.execute(sql, data) if data else db.execute(sql)
if commit: trans.commit()
return result
except (OperationalError, ProgrammingError) as e:
log.error(f'DB Error ({type(e).__name__}). Retrying...')
try:
if commit: trans = db.begin()
result = db.execute(sql, data) if data else db.execute(sql)
if commit: trans.commit()
return result
except Exception:
log.exception('Second attempt failed.')
return False
except Exception:
log.exception('Unknown SQL exception.')
return False
def sql_select(
table_name: str|None = None,
record_id: int|None = None,
record_id_random: str|None = None,
field_name: str|None = None,
field_value = None,
enabled: str|None = None,
hidden: str|None = None,
qry_dict_li: dict|None = None,
fulltext_qry_dict: dict|None = None,
and_qry_dict: dict|None = None,
and_like_dict: dict|None = None,
or_like_dict: dict|None = None,
and_in_dict_li: dict|None = None,
search_query: Any|None = None,
searchable_fields: List[str]|None = None,
order_by_li: dict|None = None,
limit: int = 9999999,
offset: int = 0,
sql: str|None = None,
data: dict|None = None,
as_dict: bool = True,
as_list: bool = False,
log_lvl: int = logging.WARNING,
) -> Any:
"""The main entry point for selecting records from the database."""
from app.lib_sql_search import (
sql_enable_part, sql_hidden_part, sql_search_qry_part,
sql_where_qry_part, sql_fulltext_qry_part, sql_and_qry_part,
sql_and_like_part, sql_or_like_part, sql_and_in_dict_li_part
)
log.setLevel(log_lvl)
sql_limit_offset = f'LIMIT {limit} OFFSET {offset}' if limit >= 0 and offset >= 0 else ''
sql_order_by = ''
if order_by_li and isinstance(order_by_li, dict):
order_by_str_li = [f'`{table_name}`.`{k}` {v}' for k, v in order_by_li.items()]
sql_order_by = f"ORDER BY {', '.join(order_by_str_li)}"
if table_name and not (record_id or record_id_random or field_name or field_value or sql or data):
data = {}
s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None)
s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None)
if d_en is not None: data['enabled'] = d_en
if d_hi is not None: data['hidden'] = d_hi
s_search, d_search = ('', {})
if search_query:
s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name)
data.update(d_search)
query = text(f"SELECT * FROM `{table_name}` WHERE 1=1 {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};")
elif table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data):
data = {'rid': record_id} if record_id else {'ridr': record_id_random}
where = f"`{table_name}`.id = :rid" if record_id else f"`{table_name}`.id_random = :ridr"
query = text(f"SELECT * FROM `{table_name}` WHERE {where} {sql_order_by} {sql_limit_offset};")
elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data):
data = {field_name: field_value}
s_where, d_where = sql_where_qry_part(qry_dict_li) if qry_dict_li else ('', {})
s_ft, d_ft = sql_fulltext_qry_part(fulltext_qry_dict) if fulltext_qry_dict else ('', {})
s_and, d_and = sql_and_qry_part(and_qry_dict) if and_qry_dict else ('', {})
s_alike, d_alike = sql_and_like_part(and_like_dict) if and_like_dict else ('', {})
s_olike, d_olike = sql_or_like_part(or_like_dict) if or_like_dict else ('', {})
s_in, d_in = sql_and_in_dict_li_part(and_in_dict_li) if and_in_dict_li else ('', {})
s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name) if search_query else ('', {})
s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None)
s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None)
data.update(d_where); data.update(d_ft); data.update(d_and); data.update(d_alike)
data.update(d_olike); data.update(d_in); data.update(d_search)
if d_en is not None: data['enabled'] = d_en
if d_hi is not None: data['hidden'] = d_hi
query = text(f"SELECT * FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name} "
f"{s_where} {s_ft} {s_and} {s_alike} {s_olike} {s_in} {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};")
else:
log.warning('No matched parameter combination for sql_select')
return False
result = run_sql_select(sql=query, data=data)
if result is False: return False
if result.rowcount == 1:
record = dict(result.first()) if as_dict else result.first()
return [record] if as_list else record
elif result.rowcount > 1:
records = [dict(r) for record in result.fetchall()] if as_dict else result.fetchall()
return records
else:
return [] if as_list else None
def sql_insert(table_name: str, data: dict, log_lvl: int = logging.WARNING) -> Any:
"""Inserts a new record into the specified table."""
from app.db_sql import db
from app.lib_redis_helpers import lookup_id_random_pop
log.setLevel(log_lvl)
data = lookup_id_random_pop(data)
if not data.get('id_random'):
import secrets
data['id_random'] = secrets.token_urlsafe(8)
fields = [f'`{k}`' for k in data.keys() if k != 'id']
placeholders = [f':{k}' for k in data.keys() if k != 'id']
for k, v in data.items():
if isinstance(v, (dict, list)): data[k] = json.dumps(v)
query = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(placeholders)});")
trans = db.begin()
try:
result = db.execute(query, data)
trans.commit()
return result.lastrowid if result.rowcount == 1 else False
except Exception:
trans.rollback()
log.exception('Insert failed.')
return False
def sql_update(table_name: str, data: dict, record_id: int|None = None, record_id_random: str|None = None, log_lvl: int = logging.WARNING) -> Any:
"""Updates an existing record in the specified table."""
from app.db_sql import db
from app.lib_redis_helpers import lookup_id_random_pop
log.setLevel(log_lvl)
data = lookup_id_random_pop(data)
set_clauses = [f"`{k}` = :{k}" for k in data.keys() if k != 'id']
for k, v in data.items():
if isinstance(v, (dict, list)): data[k] = json.dumps(v)
if record_id:
where = "id = :rid"; data['rid'] = record_id
elif record_id_random:
where = "id_random = :ridr"; data['ridr'] = record_id_random
elif 'id' in data:
where = "id = :id"
else:
return False
query = text(f"UPDATE `{table_name}` SET {', '.join(set_clauses)} WHERE {where};")
trans = db.begin()
try:
result = db.execute(query, data)
trans.commit()
return True if result.rowcount >= 1 else None
except Exception:
trans.rollback()
log.exception('Update failed.')
return False
def sql_delete(table_name: str, record_id: int|None = None, record_id_random: str|None = None, log_lvl: int = logging.INFO) -> Any:
"""Deletes a record from the specified table."""
from app.db_sql import db
log.setLevel(log_lvl)
if record_id:
where = "id = :rid"; data = {'rid': record_id}
elif record_id_random:
where = "id_random = :ridr"; data = {'ridr': record_id_random}
else:
return False
query = text(f"DELETE FROM `{table_name}` WHERE {where};")
try:
result = db.execute(query, data)
return True if result.rowcount >= 1 else None
except Exception:
log.exception('Delete failed.')
return False
def sql_insert_or_update(
table_name: str,
data: dict,
rm_id_random: bool = False,
log_lvl: int = logging.DEBUG,
):
"""Modularized wrapper for INSERT ... ON DUPLICATE KEY UPDATE."""
from app.db_sql import db
from app.lib_redis_helpers import lookup_id_random_pop
if not table_name or not data: return False
if rm_id_random:
data = lookup_id_random_pop(obj_data=data)
fields = [f'`{k}`' for k in data.keys() if k != 'id']
values = [f':{k}' for k in data.keys() if k != 'id']
updates = [f'`{k}` = :{k}' for k in data.keys() if k != 'id']
query = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(values)}) "
f"ON DUPLICATE KEY UPDATE {', '.join(updates)};")
trans = db.begin()
try:
result = db.execute(query, data)
trans.commit()
return result.lastrowid if result.lastrowid > 0 else True
except Exception:
trans.rollback()
log.exception('sql_insert_or_update failed.')
return False
def get_account_id_w_for_type_id(for_type: str, for_id: int|str) -> int|bool|None:
"""Modularized helper to find an account_id associated with an object."""
from app.lib_redis_helpers import redis_lookup_id_random
from app.db_sql import sql_select
if fid := redis_lookup_id_random(record_id_random=for_id, table_name=for_type):
sql = f"SELECT account_id FROM `{for_type}` WHERE id = :fid LIMIT 1;"
if result := sql_select(sql=sql, data={'fid': fid}):
return result.get('account_id')
return False