From eccd71f45092524a195b51c1b34a77491c8320db Mon Sep 17 00:00:00 2001 From: Scott Idem Date: Thu, 15 Jan 2026 17:16:48 -0500 Subject: [PATCH] Refactor: Modularize database logic and extract core CRUD operations --- app/db_sql.py | 493 +++----------------------------------------- app/lib_sql_core.py | 347 +++++++------------------------ app/lib_sql_crud.py | 368 +++++++++++++++++++++++++++++++++ 3 files changed, 473 insertions(+), 735 deletions(-) create mode 100644 app/lib_sql_crud.py diff --git a/app/db_sql.py b/app/db_sql.py index d13f964..13e47a0 100644 --- a/app/db_sql.py +++ b/app/db_sql.py @@ -1,28 +1,19 @@ -import datetime, json, pytz, random, redis, secrets, threading -from typing import Any, List, Optional -from timeit import default_timer as timer +import logging +from app.log import logger_reset -from fastapi import HTTPException -from app.config import settings -from app.log import log, logging, logger_reset +# 1. Foundational connection and error state from SQL Core +from app.lib_sql_core import ( + db, engine, reconnect_db, sql_connect, + get_last_sql_error, set_last_sql_error +) -from sqlalchemy import create_engine, text, Time -from sqlalchemy.exc import IntegrityError, OperationalError, ProgrammingError -from sqlalchemy.pool import NullPool - -# Thread-local storage for capturing last SQL error message -_sql_error_state = threading.local() - -def get_last_sql_error() -> Optional[str]: - """Retrieves and clears the last captured SQL error message.""" - error = getattr(_sql_error_state, 'last_error', None) - _sql_error_state.last_error = None - return error - -def set_last_sql_error(error: Any): - """Sets the last captured SQL error message.""" - _sql_error_state.last_error = str(error) +# 2. Foundational CRUD logic from SQL Crud library +from app.lib_sql_crud import ( + sql_insert, sql_update, sql_insert_or_update, + sql_select, run_sql_select, sql_delete +) +# 3. Search logic parts (delegated from search library) from app.lib_sql_search import ( sql_limit_offset_part as _sql_limit_offset_part, sql_and_like_part as _sql_and_like_part, @@ -36,6 +27,7 @@ from app.lib_sql_search import ( sql_search_qry_part as _sql_search_qry_part ) +# 4. Redis and ID resolution helpers from app.lib_redis_helpers import ( redis_lookup_id_random as _redis_lookup_id_random, get_id_random as _get_id_random, @@ -43,421 +35,6 @@ from app.lib_redis_helpers import ( lookup_id_random_pop as _lookup_id_random_pop ) -db_uri = settings.SQLALCHEMY_DB_URI - -engine = create_engine( - url = db_uri, - echo = False, - pool_use_lifo = True, - pool_pre_ping = True, - pool_recycle = settings.DB['pool_recycle'], - isolation_level = 'READ COMMITTED', - connect_args = {'connect_timeout': settings.DB['connect_timeout']} -) -# NOTE: The default isolation_level is 'REPEATABLE READ'. This can sometimes not show updated data. -# NOTE: The "echo" set to True option shows the SQL queries. -# NOTE: Switching from READ COMMITTED to READ UNCOMMITTED (or REPEATABLE READ). Testing started 2024-04-23 -# levels: "REPEATABLE READ" "READ COMMITTED" "READ UNCOMMITTED" "SERIALIZABLE" - -log.info('DB SQL trying to connect...') -db = None -try: - db = engine.connect() - log.info(f'Connected to database: {db_uri}') -except Exception: - log.exception('Could not connect to database.') - -def reconnect_db() -> bool: - """ - Re-initializes the global database engine and connection using current settings. - Useful after bootstrapping new credentials from the 'cfg' table. - """ - global engine, db, db_uri - - log.info("Refreshing database connection engine...") - try: - if engine: - engine.dispose() - log.info("Disposed of previous database engine.") - - db_uri = settings.SQLALCHEMY_DB_URI - engine = create_engine( - url = db_uri, - echo = False, - pool_use_lifo = True, - pool_pre_ping = True, - pool_recycle = settings.DB['pool_recycle'], - isolation_level = 'READ COMMITTED', - connect_args = {'connect_timeout': settings.DB['connect_timeout']} - ) - db = engine.connect() - log.info(f"Database connection re-established successfully: {db_uri}") - return True - except Exception: - log.exception("FAILED to refresh database connection!") - return False - -# ### BEGIN ### API DB SQL ### sql_connect() ### -@logger_reset -def sql_connect(current_db, log_lvl: int = logging.INFO) -> None|bool|int: - log.setLevel(log_lvl) - log.debug(locals()) - - log.info('Trying to create a new engine (connection pool)...') - if current_db: - current_db.engine.dispose() - log.info('Disposed of the current engine (connection pool).') - return True - else: - log.warning(f'Could not created and or connect to database') - return False -# ### END ### API DB SQL ### sql_connect() ### - - -# ### BEGIN ### API DB SQL ### sql_insert() ### -# Returns the auto number ID of the record inserted, or returns None if there was likely a duplicate record, or False if there was a problem. -@logger_reset -def sql_insert( - sql: str|None = None, - data: dict|None = None, - table_name: str|None = None, - rm_id_random: bool = False, - id_random_length: int = 8, - log_lvl: int = logging.WARNING, - ) -> None|bool|int: - log.setLevel(log_lvl) - - if sql: - sql_insert_stmt = text(sql) - elif table_name and data: - if rm_id_random: - data = lookup_id_random_pop(obj_data=data) - if not data.get('id_random', None) and id_random_length: - data['id_random'] = secrets.token_urlsafe(id_random_length) - - fields = [] - values = [] - for key, value in data.items(): - if key != 'id': - fields.append('`'+str(key)+'`') - values.append(':'+str(key)) - if isinstance(value, (dict, list)): - data[key] = json.dumps(value) - - sql_insert_stmt = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(values)});") - else: - log.error('SQL INSERT statement could not be created. Missing params.') - return False - - trans = db.begin() - try: - result_insert = db.execute(sql_insert_stmt, data) - trans.commit() - except IntegrityError as e: - trans.rollback() - log.error('Integrity error (likely duplicate). Returning None') - log.debug(e) - set_last_sql_error(e) - return None - except Exception as e: - trans.rollback() - log.error('Unknown exception in sql_insert. Returning False') - log.exception(e) - set_last_sql_error(e) - return False - else: - if result_insert.rowcount == 1 and result_insert.lastrowid > 0: - return result_insert.lastrowid - return False -# ### END ### API DB SQL ### sql_insert() ### - - -# ### BEGIN ### API DB SQL ### sql_update() ### -@logger_reset -def sql_update( - sql: str|None = None, - data: dict|None = None, - table_name: str|None = None, - record_id: int|None = None, - record_id_random: str|None = None, - rm_id_random: bool = False, - id_random_length: None|int = None, - log_lvl: int = logging.WARNING, - ): - log.setLevel(log_lvl) - - if sql: - sql_update_stmt = text(sql) - elif table_name and data: - if rm_id_random: - data = lookup_id_random_pop(obj_data=data) - if not data.get('id_random', None) and id_random_length: - data['id_random'] = secrets.token_urlsafe(id_random_length) - - field_list = [] - for key, value in data.items(): - if key != 'id': - field_list.append('`'+str(key) + '` = :' + str(key)) - if isinstance(value, (dict, list)): - data[key] = json.dumps(value) - - sql_set = ', '.join(field_list) - if len(sql_set) < 4: - return None - - if record_id: - data['id'] = record_id - sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id = :id') - elif record_id_random: - data['id_random'] = record_id_random - sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id_random = :id_random') - elif 'id' in data: - sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id = :id') - elif 'id_random' in data: - sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id_random = :id_random') - else: - return False - else: - return False - - trans = db.begin() - try: - result_update = db.execute(sql_update_stmt, data) - trans.commit() - except OperationalError: - trans.rollback() - log.error('Operational error (gone away?). Retrying once...') - sql_connect(current_db=db) - try: - result_update = db.execute(sql_update_stmt, data) - trans.commit() - except Exception as e: - set_last_sql_error(e) - return False - except Exception as e: - trans.rollback() - log.exception(e) - set_last_sql_error(e) - return False - else: - if result_update.rowcount >= 1: - return True - return None -# ### END ### API DB SQL ### sql_update() ### - - -# ### BEGIN ### Core Help CRUD ### sql_insert_or_update() ### -@logger_reset -def sql_insert_or_update( - sql: str|None = None, - data: dict|None = None, - table_name: str|None = None, - rm_id_random: bool = False, - id_random_length: int|None = None, - log_lvl: int = logging.DEBUG, - ): - log.setLevel(log_lvl) - - if sql: - stmt = text(sql) - elif table_name and data: - if rm_id_random: - data = lookup_id_random_pop(obj_data=data) - if not data.get('id_random', None) and id_random_length: - data['id_random'] = secrets.token_urlsafe(id_random_length) - - fields = [f'`{k}`' for k in data.keys() if k != 'id'] - placeholders = [f':{k}' for k in data.keys() if k != 'id'] - updates = [f'`{k}` = :{k}' for k in data.keys() if k != 'id'] - - for k, v in data.items(): - if isinstance(v, (dict, list)): - data[k] = json.dumps(v) - - stmt = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(placeholders)}) " - f"ON DUPLICATE KEY UPDATE {', '.join(updates)};") - else: - return False - - trans = db.begin() - try: - res = db.execute(stmt, data) - trans.commit() - return res.lastrowid if res.lastrowid > 0 else True - except Exception as e: - trans.rollback() - log.exception(e) - return False -# ### END ### Core Help CRUD ### sql_insert_or_update() ### - - -# ### BEGIN ### Core Help CRUD ### sql_select() ### -@logger_reset -def sql_select( - table_name: str|None = None, - record_id: int|None = None, - record_id_random: str|None = None, - field_name: str|None = None, - field_value = None, - enabled: str|None = None, - hidden: str|None = None, - qry_dict_li: dict|None = None, - fulltext_qry_dict: dict|None = None, - and_qry_dict: dict|None = None, - and_like_dict: dict|None = None, - or_like_dict: dict|None = None, - and_in_dict_li: dict|None = None, - search_query: Any|None = None, - searchable_fields: List[str]|None = None, - order_by_li: dict|None = None, - limit: int = 9999999, - offset: int = 0, - sql: str|None = None, - data: dict|None = None, - rm_id_random: bool = False, - as_dict: bool|None = True, - as_list: bool|None = False, - max_count: int = 100000, - log_lvl: int = logging.WARNING, - ) -> None|bool|dict|list: - log.setLevel(log_lvl) - - sql_limit_offset = f'LIMIT {limit} OFFSET {offset}' if limit >= 0 and offset >= 0 else '' - - sql_order_by = '' - if order_by_li and isinstance(order_by_li, dict): - order_by_str_li = [f'`{table_name}`.`{k}` {v}' for k, v in order_by_li.items()] - sql_order_by = f"ORDER BY {', '.join(order_by_str_li)}" - - if table_name and not (record_id or record_id_random or field_name or field_value or sql or data): - data = {} - s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None) - s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None) - if d_en is not None: data['enabled'] = d_en - if d_hi is not None: data['hidden'] = d_hi - - s_search, d_search = ('', {}) - if search_query: - s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name) - data.update(d_search) - - stmt = text(f"SELECT * FROM `{table_name}` WHERE 1=1 {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};") - - elif table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data): - data = {'rid': record_id} if record_id else {'ridr': record_id_random} - where = f"`{table_name}`.id = :rid" if record_id else f"`{table_name}`.id_random = :ridr" - stmt = text(f"SELECT * FROM `{table_name}` WHERE {where} {sql_order_by} {sql_limit_offset};") - - elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data): - data = {field_name: field_value} - s_where, d_where = sql_where_qry_part(qry_dict_li) if qry_dict_li else ('', {}) - s_ft, d_ft = sql_fulltext_qry_part(fulltext_qry_dict) if fulltext_qry_dict else ('', {}) - s_and, d_and = sql_and_qry_part(and_qry_dict) if and_qry_dict else ('', {}) - s_alike, d_alike = sql_and_like_part(and_like_dict) if and_like_dict else ('', {}) - s_olike, d_olike = sql_or_like_part(or_like_dict) if or_like_dict else ('', {}) - s_in, d_in = sql_and_in_dict_li_part(and_in_dict_li) if and_in_dict_li else ('', {}) - s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name) if search_query else ('', {}) - s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None) - s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None) - - data.update(d_where); data.update(d_ft); data.update(d_and); data.update(d_alike) - data.update(d_olike); data.update(d_in); data.update(d_search) - if d_en is not None: data['enabled'] = d_en - if d_hi is not None: data['hidden'] = d_hi - - stmt = text(f"SELECT * FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name} " - f"{s_where} {s_ft} {s_and} {s_alike} {s_olike} {s_in} {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};") - elif table_name and data and not (record_id or record_id_random or field_name or field_value or sql): - if rm_id_random: data = lookup_id_random_pop(obj_data=data) - where_clauses = [f"`{table_name}`.{k} = :{k}" for k in data.keys()] - stmt = text(f"SELECT * FROM `{table_name}` WHERE {' AND '.join(where_clauses)} {sql_order_by} {sql_limit_offset};") - elif sql: - stmt = text(sql) - else: - return False - - result = run_sql_select(sql=stmt, data=data) - if not result: - return [] if as_list else None - - if result.rowcount == 1: - record = dict(result.first()) if as_dict else result.first() - return [record] if as_list else record - elif result.rowcount > 1: - records = [dict(r) for r in result.fetchall()] if as_dict else result.fetchall() - return records - else: - return [] if as_list else None -# ### END ### Core Help CRUD ### sql_select() ### - - -# ### BEGIN ### Core Help CRUD ### run_sql_select() ### -@logger_reset -def run_sql_select( - sql: Any, - data: dict|None = None, - commit: bool = False, - log_lvl: int = logging.WARNING, - ) -> Any: - log.setLevel(log_lvl) - if not db: - return False - - try: - if commit: trans = db.begin() - sql = sql.columns(recurring_start_time=Time, recurring_end_time=Time) - result = db.execute(sql, data) if data else db.execute(sql) - if commit: trans.commit() - return result - except (OperationalError, ProgrammingError): - log.error('DB Error. Retrying once...') - sql_connect(current_db=db) - try: - if commit: trans = db.begin() - result = db.execute(sql, data) if data else db.execute(sql) - if commit: trans.commit() - return result - except Exception: - return False - except Exception as e: - log.exception(e) - return False -# ### END ### Core Help CRUD ### run_sql_select() ### - -# ### BEGIN ### Core Help CRUD ### sql_delete() ### -@logger_reset -def sql_delete( - table_name: str|None = None, - record_id: int|None = None, - record_id_random: str|None = None, - field_name: str|None = None, - field_value = None, - sql: str|None = None, - data: dict|None = None, - log_lvl: int = logging.INFO, - ) -> None|bool: - log.setLevel(log_lvl) - - if table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data): - data = {'rid': record_id} if record_id else {'ridr': record_id_random} - where = f"`{table_name}`.id = :rid" if record_id else f"`{table_name}`.id_random = :ridr" - stmt = text(f"DELETE FROM `{table_name}` WHERE {where}") - elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data): - data = {field_name: field_value} - stmt = text(f"DELETE FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name}") - elif sql: - stmt = text(sql) - else: - return False - - try: - result = db.execute(stmt, data) if data else db.execute(stmt) - return True if result.rowcount >= 1 else None - except Exception as e: - log.exception(e) - return False -# ### END ### Core Help CRUD ### sql_delete() ### - # ### BEGIN ### API DB SQL ### redis_lookup_id_random() ### @logger_reset @@ -468,16 +45,14 @@ def redis_lookup_id_random( log_lvl: int = logging.WARNING, minutes: int = 30, reset_rate: int = 10, - ) -> str|int|bool|None: - log.setLevel(log_lvl) + ): return _redis_lookup_id_random(record_id_random, table_name, check_int_id, log_lvl, minutes, reset_rate) # ### END ### API DB SQL ### redis_lookup_id_random() ### # ### BEGIN ### API DB SQL ### get_id_random() ### @logger_reset -def get_id_random(record_id: int, table_name: str, log_lvl: int = logging.WARNING) -> str|bool|None: - log.setLevel(log_lvl) +def get_id_random(record_id: int, table_name: str, log_lvl: int = logging.WARNING): return _get_id_random(record_id, table_name, log_lvl) # ### END ### API DB SQL ### get_id_random() ### @@ -496,8 +71,7 @@ def lookup_id_random_pop(obj_data: dict, log_lvl: int = logging.WARNING): # ### BEGIN ### API DB SQL Methods ### get_account_id_w_for_type_id() ### @logger_reset -def get_account_id_w_for_type_id(for_type: str, for_id: int|str) -> bool|int|None: - log.setLevel(logging.WARNING) +def get_account_id_w_for_type_id(for_type: str, for_id: int|str): if fid := redis_lookup_id_random(record_id_random=for_id, table_name=for_type): sql = f"SELECT account_id FROM `{for_type}` WHERE id = :fid LIMIT 1;" if result := sql_select(sql=sql, data={'fid': fid}): @@ -507,62 +81,52 @@ def get_account_id_w_for_type_id(for_type: str, for_id: int|str) -> bool|int|Non @logger_reset -def sql_where_qry_part(qry_dict_li: list) -> bool|str: - log.setLevel(logging.INFO) +def sql_where_qry_part(qry_dict_li: list): return _sql_where_qry_part(qry_dict_li) @logger_reset -def sql_fulltext_qry_part(fulltext_qry_dict: dict) -> bool|dict: - log.setLevel(logging.INFO) +def sql_fulltext_qry_part(fulltext_qry_dict: dict): return _sql_fulltext_qry_part(fulltext_qry_dict) @logger_reset -def sql_and_qry_part(and_qry_dict_obj: dict) -> bool|dict: - log.setLevel(logging.WARNING) +def sql_and_qry_part(and_qry_dict_obj: dict): return _sql_and_qry_part(and_qry_dict_obj) @logger_reset -def sql_and_like_part(and_like_dict_obj: dict) -> bool|dict: - log.setLevel(logging.INFO) +def sql_and_like_part(and_like_dict_obj: dict): return _sql_and_like_part(and_like_dict_obj) @logger_reset -def sql_or_like_part(or_like_dict_obj: dict) -> bool|dict: - log.setLevel(logging.INFO) +def sql_or_like_part(or_like_dict_obj: dict): return _sql_or_like_part(or_like_dict_obj) @logger_reset -def sql_and_in_dict_li_part(and_in_dict_li_dict_obj: dict) -> bool|dict: - log.setLevel(logging.WARNING) +def sql_and_in_dict_li_part(and_in_dict_li_dict_obj: dict): return _sql_and_in_dict_li_part(and_in_dict_li_dict_obj) @logger_reset -def sql_enable_part(table_name: str, enabled: str) -> bool|dict: - log.setLevel(logging.WARNING) +def sql_enable_part(table_name: str, enabled: str): return _sql_enable_part(table_name, enabled) @logger_reset -def sql_hidden_part(table_name: str, hidden: str) -> bool|dict: - log.setLevel(logging.WARNING) +def sql_hidden_part(table_name: str, hidden: str): return _sql_hidden_part(table_name, hidden) @logger_reset -def sql_limit_offset_part(limit: int, offset: int = 0) -> bool|str: - log.setLevel(logging.WARNING) +def sql_limit_offset_part(limit: int, offset: int = 0): return _sql_limit_offset_part(limit, offset) @logger_reset -def sql_search_qry_part(search_query: Any, searchable_fields: List[str]|None = None, max_depth: int = 5, table_name: str|None = None) -> tuple[str, dict]: - log.setLevel(logging.INFO) +def sql_search_qry_part(search_query: any, searchable_fields: list[str]|None = None, max_depth: int = 5, table_name: str|None = None): return _sql_search_qry_part(search_query, searchable_fields, max_depth, table_name) __all__ = [ @@ -572,5 +136,6 @@ __all__ = [ 'sql_fulltext_qry_part', 'sql_and_qry_part', 'sql_and_like_part', 'sql_or_like_part', 'sql_and_in_dict_li_part', 'sql_enable_part', 'sql_hidden_part', 'sql_limit_offset_part', 'sql_search_qry_part', - 'sql_insert_or_update', 'get_account_id_w_for_type_id' + 'sql_insert_or_update', 'get_account_id_w_for_type_id', 'reconnect_db', + 'get_last_sql_error', 'set_last_sql_error' ] \ No newline at end of file diff --git a/app/lib_sql_core.py b/app/lib_sql_core.py index 3325a6c..97d7530 100644 --- a/app/lib_sql_core.py +++ b/app/lib_sql_core.py @@ -1,281 +1,86 @@ """ -Core SQL CRUD operations and connection management for Aether. +Foundational SQL connection management for the Aether API. +Isolates the SQLAlchemy engine and global connection state to prevent circular imports. """ import logging -import json -from typing import Any, List, Optional -from sqlalchemy import text, Time -from sqlalchemy.exc import IntegrityError, OperationalError, ProgrammingError - +import threading +from typing import Any, Optional +from sqlalchemy import create_engine from app.config import settings -from app.log import logger_reset -# Local logger to avoid cycles -log = logging.getLogger(__name__) +log = logging.getLogger('root') + +# 1. Thread-local storage for capturing last SQL error message +_sql_error_state = threading.local() + +def get_last_sql_error() -> Optional[str]: + """Retrieves and clears the last captured SQL error message.""" + error = getattr(_sql_error_state, 'last_error', None) + _sql_error_state.last_error = None + return error + +def set_last_sql_error(error: Any): + """Sets the last captured SQL error message.""" + _sql_error_state.last_error = str(error) + + +# 2. Initial Engine Setup +db_uri = settings.SQLALCHEMY_DB_URI + +engine = create_engine( + url = db_uri, + echo = False, + pool_use_lifo = True, + pool_pre_ping = True, + pool_recycle = settings.DB['pool_recycle'], + isolation_level = 'READ COMMITTED', + connect_args = {'connect_timeout': settings.DB['connect_timeout']} +) + +log.info('DB SQL Core: Initializing connection...') +db = None +try: + db = engine.connect() + log.info(f'DB SQL Core: Connected to database: {db_uri}') +except Exception: + log.exception('DB SQL Core: Could not connect to database.') + + +# 3. Connection Management Logic +def reconnect_db() -> bool: + """ + Re-initializes the global database engine and connection using current settings. + Useful after bootstrapping new credentials from the 'cfg' table. + """ + global engine, db, db_uri + + log.info("DB SQL Core: Refreshing database connection engine...") + try: + if engine: + engine.dispose() + log.info("DB SQL Core: Disposed of previous database engine.") + + db_uri = settings.SQLALCHEMY_DB_URI + engine = create_engine( + url = db_uri, + echo = False, + pool_use_lifo = True, + pool_pre_ping = True, + pool_recycle = settings.DB['pool_recycle'], + isolation_level = 'READ COMMITTED', + connect_args = {'connect_timeout': settings.DB['connect_timeout']} + ) + db = engine.connect() + log.info(f"DB SQL Core: Database connection re-established successfully: {db_uri}") + return True + except Exception: + log.exception("DB SQL Core: FAILED to refresh database connection!") + return False def sql_connect(current_db, log_lvl: int = logging.INFO) -> bool: - """Disposes of the current connection pool and prepares for reconnection.""" + """Old compatibility wrapper for disposing the engine.""" if current_db: current_db.engine.dispose() + log.info('DB SQL Core: Disposed of the current engine via sql_connect.') return True - return False - - -def run_sql_select( - sql: Any, - data: dict|None = None, - commit: bool = False, - log_lvl: int = logging.WARNING, - ) -> Any: - """Executes a SQL SELECT statement with retry logic for operational errors.""" - from app.db_sql import db - log.setLevel(log_lvl) - - if not db: - log.error('The database connection is not available!!!') - return False - - try: - if commit: trans = db.begin() - sql = sql.columns(recurring_start_time=Time, recurring_end_time=Time) - result = db.execute(sql, data) if data else db.execute(sql) - if commit: trans.commit() - return result - except (OperationalError, ProgrammingError) as e: - log.error(f'DB Error ({type(e).__name__}). Retrying...') - try: - if commit: trans = db.begin() - result = db.execute(sql, data) if data else db.execute(sql) - if commit: trans.commit() - return result - except Exception: - log.exception('Second attempt failed.') - return False - except Exception: - log.exception('Unknown SQL exception.') - return False - - -def sql_select( - table_name: str|None = None, - record_id: int|None = None, - record_id_random: str|None = None, - field_name: str|None = None, - field_value = None, - enabled: str|None = None, - hidden: str|None = None, - qry_dict_li: dict|None = None, - fulltext_qry_dict: dict|None = None, - and_qry_dict: dict|None = None, - and_like_dict: dict|None = None, - or_like_dict: dict|None = None, - and_in_dict_li: dict|None = None, - search_query: Any|None = None, - searchable_fields: List[str]|None = None, - order_by_li: dict|None = None, - limit: int = 9999999, - offset: int = 0, - sql: str|None = None, - data: dict|None = None, - as_dict: bool = True, - as_list: bool = False, - log_lvl: int = logging.WARNING, - ) -> Any: - """The main entry point for selecting records from the database.""" - from app.lib_sql_search import ( - sql_enable_part, sql_hidden_part, sql_search_qry_part, - sql_where_qry_part, sql_fulltext_qry_part, sql_and_qry_part, - sql_and_like_part, sql_or_like_part, sql_and_in_dict_li_part - ) - - log.setLevel(log_lvl) - sql_limit_offset = f'LIMIT {limit} OFFSET {offset}' if limit >= 0 and offset >= 0 else '' - - sql_order_by = '' - if order_by_li and isinstance(order_by_li, dict): - order_by_str_li = [f'`{table_name}`.`{k}` {v}' for k, v in order_by_li.items()] - sql_order_by = f"ORDER BY {', '.join(order_by_str_li)}" - - if table_name and not (record_id or record_id_random or field_name or field_value or sql or data): - data = {} - s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None) - s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None) - if d_en is not None: data['enabled'] = d_en - if d_hi is not None: data['hidden'] = d_hi - - s_search, d_search = ('', {}) - if search_query: - s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name) - data.update(d_search) - - query = text(f"SELECT * FROM `{table_name}` WHERE 1=1 {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};") - - elif table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data): - data = {'rid': record_id} if record_id else {'ridr': record_id_random} - where = f"`{table_name}`.id = :rid" if record_id else f"`{table_name}`.id_random = :ridr" - query = text(f"SELECT * FROM `{table_name}` WHERE {where} {sql_order_by} {sql_limit_offset};") - - elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data): - data = {field_name: field_value} - - s_where, d_where = sql_where_qry_part(qry_dict_li) if qry_dict_li else ('', {}) - s_ft, d_ft = sql_fulltext_qry_part(fulltext_qry_dict) if fulltext_qry_dict else ('', {}) - s_and, d_and = sql_and_qry_part(and_qry_dict) if and_qry_dict else ('', {}) - s_alike, d_alike = sql_and_like_part(and_like_dict) if and_like_dict else ('', {}) - s_olike, d_olike = sql_or_like_part(or_like_dict) if or_like_dict else ('', {}) - s_in, d_in = sql_and_in_dict_li_part(and_in_dict_li) if and_in_dict_li else ('', {}) - s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name) if search_query else ('', {}) - s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None) - s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None) - - data.update(d_where); data.update(d_ft); data.update(d_and); data.update(d_alike) - data.update(d_olike); data.update(d_in); data.update(d_search) - if d_en is not None: data['enabled'] = d_en - if d_hi is not None: data['hidden'] = d_hi - - query = text(f"SELECT * FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name} " - f"{s_where} {s_ft} {s_and} {s_alike} {s_olike} {s_in} {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};") - else: - log.warning('No matched parameter combination for sql_select') - return False - - result = run_sql_select(sql=query, data=data) - if result is False: return False - - if result.rowcount == 1: - record = dict(result.first()) if as_dict else result.first() - return [record] if as_list else record - elif result.rowcount > 1: - records = [dict(r) for record in result.fetchall()] if as_dict else result.fetchall() - return records - else: - return [] if as_list else None - - -def sql_insert(table_name: str, data: dict, log_lvl: int = logging.WARNING) -> Any: - """Inserts a new record into the specified table.""" - from app.db_sql import db - from app.lib_redis_helpers import lookup_id_random_pop - log.setLevel(log_lvl) - - data = lookup_id_random_pop(data) - if not data.get('id_random'): - import secrets - data['id_random'] = secrets.token_urlsafe(8) - - fields = [f'`{k}`' for k in data.keys() if k != 'id'] - placeholders = [f':{k}' for k in data.keys() if k != 'id'] - - for k, v in data.items(): - if isinstance(v, (dict, list)): data[k] = json.dumps(v) - - query = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(placeholders)});") - - trans = db.begin() - try: - result = db.execute(query, data) - trans.commit() - return result.lastrowid if result.rowcount == 1 else False - except Exception: - trans.rollback() - log.exception('Insert failed.') - return False - - -def sql_update(table_name: str, data: dict, record_id: int|None = None, record_id_random: str|None = None, log_lvl: int = logging.WARNING) -> Any: - """Updates an existing record in the specified table.""" - from app.db_sql import db - from app.lib_redis_helpers import lookup_id_random_pop - log.setLevel(log_lvl) - - data = lookup_id_random_pop(data) - set_clauses = [f"`{k}` = :{k}" for k in data.keys() if k != 'id'] - - for k, v in data.items(): - if isinstance(v, (dict, list)): data[k] = json.dumps(v) - - if record_id: - where = "id = :rid"; data['rid'] = record_id - elif record_id_random: - where = "id_random = :ridr"; data['ridr'] = record_id_random - elif 'id' in data: - where = "id = :id" - else: - return False - - query = text(f"UPDATE `{table_name}` SET {', '.join(set_clauses)} WHERE {where};") - - trans = db.begin() - try: - result = db.execute(query, data) - trans.commit() - return True if result.rowcount >= 1 else None - except Exception: - trans.rollback() - log.exception('Update failed.') - return False - - -def sql_delete(table_name: str, record_id: int|None = None, record_id_random: str|None = None, log_lvl: int = logging.INFO) -> Any: - """Deletes a record from the specified table.""" - from app.db_sql import db - log.setLevel(log_lvl) - - if record_id: - where = "id = :rid"; data = {'rid': record_id} - elif record_id_random: - where = "id_random = :ridr"; data = {'ridr': record_id_random} - else: - return False - - query = text(f"DELETE FROM `{table_name}` WHERE {where};") - - try: - result = db.execute(query, data) - return True if result.rowcount >= 1 else None - except Exception: - log.exception('Delete failed.') - return False - - -def sql_insert_or_update( - table_name: str, - data: dict, - rm_id_random: bool = False, - log_lvl: int = logging.DEBUG, - ): - """Modularized wrapper for INSERT ... ON DUPLICATE KEY UPDATE.""" - from app.db_sql import db - from app.lib_redis_helpers import lookup_id_random_pop - if not table_name or not data: return False - - if rm_id_random: - data = lookup_id_random_pop(obj_data=data) - - fields = [f'`{k}`' for k in data.keys() if k != 'id'] - values = [f':{k}' for k in data.keys() if k != 'id'] - updates = [f'`{k}` = :{k}' for k in data.keys() if k != 'id'] - - query = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(values)}) " - f"ON DUPLICATE KEY UPDATE {', '.join(updates)};") - - trans = db.begin() - try: - result = db.execute(query, data) - trans.commit() - return result.lastrowid if result.lastrowid > 0 else True - except Exception: - trans.rollback() - log.exception('sql_insert_or_update failed.') - return False - - -def get_account_id_w_for_type_id(for_type: str, for_id: int|str) -> int|bool|None: - """Modularized helper to find an account_id associated with an object.""" - from app.lib_redis_helpers import redis_lookup_id_random - from app.db_sql import sql_select - if fid := redis_lookup_id_random(record_id_random=for_id, table_name=for_type): - sql = f"SELECT account_id FROM `{for_type}` WHERE id = :fid LIMIT 1;" - if result := sql_select(sql=sql, data={'fid': fid}): - return result.get('account_id') - return False + return False \ No newline at end of file diff --git a/app/lib_sql_crud.py b/app/lib_sql_crud.py new file mode 100644 index 0000000..730ecb1 --- /dev/null +++ b/app/lib_sql_crud.py @@ -0,0 +1,368 @@ +""" +Standardized SQL CRUD operations for the Aether API. +Provides high-level helpers for INSERT, UPDATE, SELECT, and DELETE. +""" +import logging +import json +from typing import Any, List, Optional +from sqlalchemy import text, Time +from sqlalchemy.exc import IntegrityError, OperationalError, ProgrammingError + +from app.log import log, logger_reset +# CRITICAL: Import the global connection state from lib_sql_core +from app.lib_sql_core import db, sql_connect, set_last_sql_error + +# Helper for resolving random IDs +from app.lib_redis_helpers import lookup_id_random_pop + +# ### BEGIN ### API DB SQL ### sql_insert() ### +@logger_reset +def sql_insert( + sql: str|None = None, + data: dict|None = None, + table_name: str|None = None, + rm_id_random: bool = False, + id_random_length: int = 8, + log_lvl: int = logging.WARNING, + ) -> None|bool|int: + log.setLevel(log_lvl) + + if sql: + sql_insert_stmt = text(sql) + elif table_name and data: + if rm_id_random: + data = lookup_id_random_pop(obj_data=data) + if not data.get('id_random', None) and id_random_length: + import secrets + data['id_random'] = secrets.token_urlsafe(id_random_length) + + fields = [] + values = [] + for key, value in data.items(): + if key != 'id': + fields.append('`'+str(key)+'`') + values.append(':'+str(key)) + if isinstance(value, (dict, list)): + data[key] = json.dumps(value) + + sql_insert_stmt = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(values)});") + else: + log.error('SQL INSERT statement could not be created. Missing params.') + return False + + trans = db.begin() + try: + result_insert = db.execute(sql_insert_stmt, data) + trans.commit() + except IntegrityError as e: + trans.rollback() + log.error('Integrity error (likely duplicate). Returning None') + log.debug(e) + set_last_sql_error(e) + return None + except Exception as e: + trans.rollback() + log.error('Unknown exception in sql_insert. Returning False') + log.exception(e) + set_last_sql_error(e) + return False + else: + if result_insert.rowcount == 1 and result_insert.lastrowid > 0: + return result_insert.lastrowid + return False +# ### END ### API DB SQL ### sql_insert() ### + + +# ### BEGIN ### API DB SQL ### sql_update() ### +@logger_reset +def sql_update( + sql: str|None = None, + data: dict|None = None, + table_name: str|None = None, + record_id: int|None = None, + record_id_random: str|None = None, + rm_id_random: bool = False, + id_random_length: None|int = None, + log_lvl: int = logging.WARNING, + ): + log.setLevel(log_lvl) + + if sql: + sql_update_stmt = text(sql) + elif table_name and data: + if rm_id_random: + data = lookup_id_random_pop(obj_data=data) + if not data.get('id_random', None) and id_random_length: + import secrets + data['id_random'] = secrets.token_urlsafe(id_random_length) + + field_list = [] + for key, value in data.items(): + if key != 'id': + field_list.append('`'+str(key) + '` = :' + str(key)) + if isinstance(value, (dict, list)): + data[key] = json.dumps(value) + + sql_set = ', '.join(field_list) + if len(sql_set) < 4: + return None + + if record_id: + data['id'] = record_id + sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id = :id') + elif record_id_random: + data['id_random'] = record_id_random + sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id_random = :id_random') + elif 'id' in data: + sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id = :id') + elif 'id_random' in data: + sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id_random = :id_random') + else: + return False + else: + return False + + trans = db.begin() + try: + result_update = db.execute(sql_update_stmt, data) + trans.commit() + except OperationalError: + trans.rollback() + log.error('Operational error (gone away?). Retrying once...') + sql_connect(current_db=db) + try: + result_update = db.execute(sql_update_stmt, data) + trans.commit() + except Exception as e: + set_last_sql_error(e) + return False + except Exception as e: + trans.rollback() + log.exception(e) + set_last_sql_error(e) + return False + else: + if result_update.rowcount >= 1: + return True + return None +# ### END ### API DB SQL ### sql_update() ### + + +# ### BEGIN ### Core Help CRUD ### sql_insert_or_update() ### +@logger_reset +def sql_insert_or_update( + sql: str|None = None, + data: dict|None = None, + table_name: str|None = None, + rm_id_random: bool = False, + id_random_length: int|None = None, + log_lvl: int = logging.DEBUG, + ): + log.setLevel(log_lvl) + + if sql: + stmt = text(sql) + elif table_name and data: + if rm_id_random: + data = lookup_id_random_pop(obj_data=data) + if not data.get('id_random', None) and id_random_length: + import secrets + data['id_random'] = secrets.token_urlsafe(id_random_length) + + fields = [f'`{k}`' for k in data.keys() if k != 'id'] + placeholders = [f':{k}' for k in data.keys() if k != 'id'] + updates = [f'`{k}` = :{k}' for k in data.keys() if k != 'id'] + + for k, v in data.items(): + if isinstance(v, (dict, list)): + data[k] = json.dumps(v) + + stmt = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(placeholders)}) " + f"ON DUPLICATE KEY UPDATE {', '.join(updates)};") + else: + return False + + trans = db.begin() + try: + res = db.execute(stmt, data) + trans.commit() + return res.lastrowid if res.lastrowid > 0 else True + except Exception as e: + trans.rollback() + log.exception(e) + return False +# ### END ### Core Help CRUD ### sql_insert_or_update() ### + + +# ### BEGIN ### Core Help CRUD ### sql_select() ### +@logger_reset +def sql_select( + table_name: str|None = None, + record_id: int|None = None, + record_id_random: str|None = None, + field_name: str|None = None, + field_value = None, + enabled: str|None = None, + hidden: str|None = None, + qry_dict_li: dict|None = None, + fulltext_qry_dict: dict|None = None, + and_qry_dict: dict|None = None, + and_like_dict: dict|None = None, + or_like_dict: dict|None = None, + and_in_dict_li: dict|None = None, + search_query: Any|None = None, + searchable_fields: List[str]|None = None, + order_by_li: dict|None = None, + limit: int = 9999999, + offset: int = 0, + sql: str|None = None, + data: dict|None = None, + rm_id_random: bool = False, + as_dict: bool|None = True, + as_list: bool|None = False, + max_count: int = 100000, + log_lvl: int = logging.WARNING, + ) -> None|bool|dict|list: + from app.lib_sql_search import ( + sql_enable_part, sql_hidden_part, sql_search_qry_part, + sql_where_qry_part, sql_fulltext_qry_part, sql_and_qry_part, + sql_and_like_part, sql_or_like_part, sql_and_in_dict_li_part + ) + + log.setLevel(log_lvl) + + sql_limit_offset = f'LIMIT {limit} OFFSET {offset}' if limit >= 0 and offset >= 0 else '' + + sql_order_by = '' + if order_by_li and isinstance(order_by_li, dict): + order_by_str_li = [f'`{table_name}`.`{k}` {v}' for k, v in order_by_li.items()] + sql_order_by = f"ORDER BY {', '.join(order_by_str_li)}" + + if table_name and not (record_id or record_id_random or field_name or field_value or sql or data): + data = {} + s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None) + s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None) + if d_en is not None: data['enabled'] = d_en + if d_hi is not None: data['hidden'] = d_hi + + s_search, d_search = ('', {}) + if search_query: + s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name) + data.update(d_search) + + stmt = text(f"SELECT * FROM `{table_name}` WHERE 1=1 {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};") + + elif table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data): + data = {'rid': record_id} if record_id else {'ridr': record_id_random} + where = f"`{table_name}`.id = :rid" if record_id else f"`{table_name}`.id_random = :ridr" + stmt = text(f"SELECT * FROM `{table_name}` WHERE {where} {sql_order_by} {sql_limit_offset};") + + elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data): + data = {field_name: field_value} + s_where, d_where = sql_where_qry_part(qry_dict_li) if qry_dict_li else ('', {}) + s_ft, d_ft = sql_fulltext_qry_part(fulltext_qry_dict) if fulltext_qry_dict else ('', {}) + s_and, d_and = sql_and_qry_part(and_qry_dict) if and_qry_dict else ('', {}) + s_alike, d_alike = sql_and_like_part(and_like_dict) if and_like_dict else ('', {}) + s_olike, d_olike = sql_or_like_part(or_like_dict) if or_like_dict else ('', {}) + s_in, d_in = sql_and_in_dict_li_part(and_in_dict_li) if and_in_dict_li else ('', {}) + s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name) if search_query else ('', {}) + s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None) + s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None) + + data.update(d_where); data.update(d_ft); data.update(d_and); data.update(d_alike) + data.update(d_olike); data.update(d_in); data.update(d_search) + if d_en is not None: data['enabled'] = d_en + if d_hi is not None: data['hidden'] = d_hi + + stmt = text(f"SELECT * FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name} " + f"{s_where} {s_ft} {s_and} {s_alike} {s_olike} {s_in} {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};") + elif table_name and data and not (record_id or record_id_random or field_name or field_value or sql): + if rm_id_random: data = lookup_id_random_pop(obj_data=data) + where_clauses = [f"`{table_name}`.{k} = :{k}" for k in data.keys()] + stmt = text(f"SELECT * FROM `{table_name}` WHERE {' AND '.join(where_clauses)} {sql_order_by} {sql_limit_offset};") + elif sql: + stmt = text(sql) + else: + return False + + result = run_sql_select(sql=stmt, data=data) + if not result: + return [] if as_list else None + + if result.rowcount == 1: + record = dict(result.first()) if as_dict else result.first() + return [record] if as_list else record + elif result.rowcount > 1: + records = [dict(r) for r in result.fetchall()] if as_dict else result.fetchall() + return records + else: + return [] if as_list else None +# ### END ### Core Help CRUD ### sql_select() ### + + +# ### BEGIN ### Core Help CRUD ### run_sql_select() ### +@logger_reset +def run_sql_select( + sql: Any, + data: dict|None = None, + commit: bool = False, + log_lvl: int = logging.WARNING, + ) -> Any: + log.setLevel(log_lvl) + if not db: + return False + + try: + if commit: trans = db.begin() + sql = sql.columns(recurring_start_time=Time, recurring_end_time=Time) + result = db.execute(sql, data) if data else db.execute(sql) + if commit: trans.commit() + return result + except (OperationalError, ProgrammingError): + log.error('DB Error. Retrying once...') + sql_connect(current_db=db) + try: + if commit: trans = db.begin() + result = db.execute(sql, data) if data else db.execute(sql) + if commit: trans.commit() + return result + except Exception: + return False + except Exception as e: + log.exception(e) + return False +# ### END ### Core Help CRUD ### run_sql_select() ### + +# ### BEGIN ### Core Help CRUD ### sql_delete() ### +@logger_reset +def sql_delete( + table_name: str|None = None, + record_id: int|None = None, + record_id_random: str|None = None, + field_name: str|None = None, + field_value = None, + sql: str|None = None, + data: dict|None = None, + log_lvl: int = logging.INFO, + ) -> None|bool: + log.setLevel(log_lvl) + + if table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data): + data = {'rid': record_id} if record_id else {'ridr': record_id_random} + where = f"`{table_name}`.id = :rid" if record_id else f"`{table_name}`.id_random = :ridr" + stmt = text(f"DELETE FROM `{table_name}` WHERE {where}") + elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data): + data = {field_name: field_value} + stmt = text(f"DELETE FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name}") + elif sql: + stmt = text(sql) + else: + return False + + try: + result = db.execute(stmt, data) if data else db.execute(stmt) + return True if result.rowcount >= 1 else None + except Exception as e: + log.exception(e) + return False +# ### END ### Core Help CRUD ### sql_delete() ###