""" Core SQL CRUD operations and connection management for Aether. """
import logging
import json
from typing import Any, List, Optional

from sqlalchemy import text, Time
from sqlalchemy.exc import IntegrityError, OperationalError, ProgrammingError

from app.config import settings
from app.log import logger_reset

# Local logger to avoid cycles
log = logging.getLogger(__name__)

# Sort directions accepted in ``order_by_li`` values (compared upper-cased;
# the empty string leaves the direction to the database default).
_ORDER_DIRECTIONS = ('', 'ASC', 'DESC')


def _is_safe_identifier(name: Any) -> bool:
    """Return True when *name* can be embedded between backticks in SQL.

    Identifiers cannot be sent as bind parameters, so they are interpolated
    into the statement text. A backtick inside the name would terminate the
    quoting and either break the statement or allow injection.
    """
    return isinstance(name, str) and bool(name) and '`' not in name


def _encode_json_values(data: dict) -> dict:
    """Return a shallow copy of *data* with dict/list values JSON-encoded.

    Working on a copy keeps the caller's dictionary untouched.
    """
    return {
        key: json.dumps(value) if isinstance(value, (dict, list)) else value
        for key, value in data.items()
    }


def _rollback_quietly(trans: Any) -> None:
    """Roll back *trans*, logging (not raising) if the rollback itself fails."""
    try:
        trans.rollback()
    except Exception:
        log.debug('Rollback failed.', exc_info=True)


def _execute_once(db: Any, sql: Any, data: dict|None, commit: bool) -> Any:
    """Execute *sql* once, optionally inside an explicit transaction.

    When the execute call fails and a transaction was opened, it is rolled
    back before the exception is re-raised so that a retry starts clean.
    """
    trans = db.begin() if commit else None
    try:
        result = db.execute(sql, data) if data else db.execute(sql)
    except Exception:
        if trans is not None:
            _rollback_quietly(trans)
        raise
    if trans is not None:
        trans.commit()
    return result


def _order_by_clause(table_name: str|None, order_by_li: Any) -> str|None:
    """Build the ``ORDER BY`` fragment for ``sql_select``.

    Returns:
        ``''`` when no ordering was requested, the clause text when the
        request is valid, or ``None`` when a column name or direction is
        not acceptable.
    """
    if not (order_by_li and isinstance(order_by_li, dict)):
        return ''
    parts = []
    for column, direction in order_by_li.items():
        if not _is_safe_identifier(column):
            return None
        if not isinstance(direction, str) or direction.strip().upper() not in _ORDER_DIRECTIONS:
            return None
        parts.append(f'`{table_name}`.`{column}` {direction}')
    return f"ORDER BY {', '.join(parts)}"


def _shape_rows(rows: list, as_dict: bool, as_list: bool) -> Any:
    """Convert fetched rows into the return shape documented on ``sql_select``."""
    if not rows:
        return [] if as_list else None
    if len(rows) == 1:
        record = dict(rows[0]) if as_dict else rows[0]
        return [record] if as_list else record
    return [dict(row) for row in rows] if as_dict else rows


def sql_connect(current_db, log_lvl: int = logging.INFO) -> bool:
    """Disposes of the current connection pool and prepares for reconnection.

    Args:
        current_db: Object exposing an ``engine`` attribute; may be falsy.
        log_lvl: Accepted for signature parity with the other helpers;
            not used by this function.

    Returns:
        True when ``current_db.engine.dispose()`` was called, False when
        ``current_db`` is falsy.
    """
    if current_db:
        current_db.engine.dispose()
        return True
    return False


def run_sql_select(
    sql: Any,
    data: dict|None = None,
    commit: bool = False,
    log_lvl: int = logging.WARNING,
) -> Any:
    """Executes a SQL SELECT statement with retry logic for operational errors.

    Args:
        sql: Statement object exposing ``.columns(...)`` (e.g. the result of
            ``sqlalchemy.text``).
        data: Bind parameters; omitted from the execute call when falsy.
        commit: When True, wrap the execution in ``db.begin()`` / ``commit()``.
        log_lvl: Level applied to this module's logger.

    Returns:
        The result object returned by ``db.execute``, or False when the
        database is unavailable or execution failed.
    """
    from app.db_sql import db

    log.setLevel(log_lvl)
    if not db:
        log.error('The database connection is not available!!!')
        return False
    try:
        # Declare the two recurring_* columns as Time for result processing.
        sql = sql.columns(recurring_start_time=Time, recurring_end_time=Time)
        return _execute_once(db, sql, data, commit)
    except (OperationalError, ProgrammingError) as e:
        log.error('DB Error (%s). Retrying...', type(e).__name__)
        try:
            # Single retry; any transaction opened by the first attempt has
            # already been rolled back by _execute_once.
            return _execute_once(db, sql, data, commit)
        except Exception:
            log.exception('Second attempt failed.')
            return False
    except Exception:
        log.exception('Unknown SQL exception.')
        return False


def sql_select(
    table_name: str|None = None,
    record_id: int|None = None,
    record_id_random: str|None = None,
    field_name: str|None = None,
    field_value = None,
    enabled: str|None = None,
    hidden: str|None = None,
    qry_dict_li: dict|None = None,
    fulltext_qry_dict: dict|None = None,
    and_qry_dict: dict|None = None,
    and_like_dict: dict|None = None,
    or_like_dict: dict|None = None,
    and_in_dict_li: dict|None = None,
    search_query: Any|None = None,
    searchable_fields: List[str]|None = None,
    order_by_li: dict|None = None,
    limit: int = 9999999,
    offset: int = 0,
    sql: str|None = None,
    data: dict|None = None,
    as_dict: bool = True,
    as_list: bool = False,
    log_lvl: int = logging.WARNING,
) -> Any:
    """The main entry point for selecting records from the database.

    The query is chosen from the combination of arguments supplied:

    * ``table_name`` only: all rows, optionally narrowed by ``search_query``,
      ``enabled`` and ``hidden``.
    * ``table_name`` + ``record_id`` or ``record_id_random``: lookup by
      ``id`` / ``id_random``.
    * ``table_name`` + ``field_name`` + ``field_value``: equality match on one
      column, combined with every optional filter dictionary.
    * ``sql`` (+ optional ``data``) with none of the above: the statement is
      executed as given; ``order_by_li``, ``limit`` and ``offset`` are not
      applied to it.

    Any other combination logs a warning and returns False.

    Args:
        order_by_li: Mapping of column name to sort direction (``ASC`` /
            ``DESC``, case-insensitive, or empty).
        limit, offset: Rendered as ``LIMIT``/``OFFSET`` when both are >= 0.
        as_dict: Convert each row with ``dict(row)``.
        as_list: Always return a list, even for zero or one row.
        log_lvl: Level applied to this module's logger.

    Returns:
        False on failure or unmatched arguments. Otherwise: for zero rows
        ``[]`` if ``as_list`` else ``None``; for one row the record (wrapped
        in a list if ``as_list``); for several rows a list of records.
    """
    from app.lib_sql_search import (
        sql_enable_part, sql_hidden_part, sql_search_qry_part,
        sql_where_qry_part, sql_fulltext_qry_part, sql_and_qry_part,
        sql_and_like_part, sql_or_like_part, sql_and_in_dict_li_part
    )

    log.setLevel(log_lvl)

    sql_limit_offset = f'LIMIT {limit} OFFSET {offset}' if limit >= 0 and offset >= 0 else ''

    sql_order_by = _order_by_clause(table_name, order_by_li)
    if sql_order_by is None:
        log.warning('Invalid order_by_li for sql_select')
        return False
    if table_name and not _is_safe_identifier(table_name):
        log.error('Unsafe table name for sql_select')
        return False

    if table_name and not (record_id or record_id_random or field_name or field_value or sql or data):
        # NOTE(review): this branch only honours search_query / enabled /
        # hidden; the other filter dictionaries are used solely by the
        # field_name/field_value branch below. Confirm that is intended.
        data = {}
        s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None)
        s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None)
        if d_en is not None:
            data['enabled'] = d_en
        if d_hi is not None:
            data['hidden'] = d_hi
        s_search, d_search = ('', {})
        if search_query:
            s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name)
        data.update(d_search)
        query = text(f"SELECT * FROM `{table_name}` WHERE 1=1 {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};")

    elif table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data):
        data = {'rid': record_id} if record_id else {'ridr': record_id_random}
        where = f"`{table_name}`.id = :rid" if record_id else f"`{table_name}`.id_random = :ridr"
        query = text(f"SELECT * FROM `{table_name}` WHERE {where} {sql_order_by} {sql_limit_offset};")

    elif (table_name and field_name and field_value is not None
          and not (record_id or record_id_random or sql or data)):
        # ``is not None`` (rather than truthiness) lets callers match on
        # falsy values such as 0, '' or False.
        if not _is_safe_identifier(field_name):
            log.error('Unsafe field name for sql_select')
            return False
        data = {field_name: field_value}
        s_where, d_where = sql_where_qry_part(qry_dict_li) if qry_dict_li else ('', {})
        s_ft, d_ft = sql_fulltext_qry_part(fulltext_qry_dict) if fulltext_qry_dict else ('', {})
        s_and, d_and = sql_and_qry_part(and_qry_dict) if and_qry_dict else ('', {})
        s_alike, d_alike = sql_and_like_part(and_like_dict) if and_like_dict else ('', {})
        s_olike, d_olike = sql_or_like_part(or_like_dict) if or_like_dict else ('', {})
        s_in, d_in = sql_and_in_dict_li_part(and_in_dict_li) if and_in_dict_li else ('', {})
        s_search, d_search = (
            sql_search_qry_part(search_query, searchable_fields, table_name=table_name)
            if search_query else ('', {})
        )
        s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None)
        s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None)
        for extra in (d_where, d_ft, d_and, d_alike, d_olike, d_in, d_search):
            data.update(extra)
        if d_en is not None:
            data['enabled'] = d_en
        if d_hi is not None:
            data['hidden'] = d_hi
        query = text(
            f"SELECT * FROM `{table_name}` WHERE `{table_name}`.`{field_name}` = :{field_name} "
            f"{s_where} {s_ft} {s_and} {s_alike} {s_olike} {s_in} {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};"
        )

    elif sql and not (table_name or record_id or record_id_random or field_name or field_value):
        # Caller-supplied statement: run it verbatim with its own bind data.
        query = text(sql) if isinstance(sql, str) else sql

    else:
        log.warning('No matched parameter combination for sql_select')
        return False

    result = run_sql_select(sql=query, data=data)
    if result is False:
        return False

    # Count the fetched rows instead of trusting ``rowcount``, which
    # SQLAlchemy only defines for UPDATE/DELETE statements.
    return _shape_rows(result.fetchall(), as_dict, as_list)


def sql_insert(table_name: str, data: dict, log_lvl: int = logging.WARNING) -> Any:
    """Inserts a new record into the specified table.

    ``data`` is first passed through ``lookup_id_random_pop``. When the
    resulting mapping has no truthy ``id_random`` one is generated and stored
    on it. An ``id`` key is never written as a column. dict/list values are
    JSON-encoded on a copy, so the mapping itself keeps its original values.

    Args:
        table_name: Target table.
        data: Column name to value mapping.
        log_lvl: Level applied to this module's logger.

    Returns:
        ``lastrowid`` when exactly one row was inserted, otherwise False
        (also on invalid identifiers or any database error).
    """
    from app.db_sql import db
    from app.lib_redis_helpers import lookup_id_random_pop

    log.setLevel(log_lvl)
    data = lookup_id_random_pop(data)
    if not data.get('id_random'):
        import secrets
        data['id_random'] = secrets.token_urlsafe(8)

    columns = [k for k in data.keys() if k != 'id']
    if not _is_safe_identifier(table_name) or not all(_is_safe_identifier(k) for k in columns):
        log.error('Unsafe identifier for sql_insert')
        return False

    fields = [f'`{k}`' for k in columns]
    placeholders = [f':{k}' for k in columns]
    params = _encode_json_values(data)
    query = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(placeholders)});")

    trans = db.begin()
    try:
        result = db.execute(query, params)
        trans.commit()
        return result.lastrowid if result.rowcount == 1 else False
    except Exception:
        trans.rollback()
        log.exception('Insert failed.')
        return False


def sql_update(table_name: str, data: dict, record_id: int|None = None, record_id_random: str|None = None, log_lvl: int = logging.WARNING) -> Any:
    """Updates an existing record in the specified table.

    The target row is chosen by ``record_id``, else ``record_id_random``,
    else an ``id`` key inside ``data``. Every key of ``data`` except ``id``
    becomes a ``SET`` assignment. dict/list values are JSON-encoded on a
    copy of the mapping.

    Args:
        table_name: Target table.
        data: Column name to new value mapping (passed through
            ``lookup_id_random_pop`` first).
        record_id: Value matched against ``id``.
        record_id_random: Value matched against ``id_random``.
        log_lvl: Level applied to this module's logger.

    Returns:
        True when at least one row was affected, None when none was, False
        when no row selector or no column to set was given, an identifier
        is invalid, or the statement failed.
    """
    from app.db_sql import db
    from app.lib_redis_helpers import lookup_id_random_pop

    log.setLevel(log_lvl)
    data = lookup_id_random_pop(data)

    columns = [k for k in data.keys() if k != 'id']
    params = _encode_json_values(data)

    if record_id:
        where = "id = :rid"
        params['rid'] = record_id
    elif record_id_random:
        where = "id_random = :ridr"
        params['ridr'] = record_id_random
    elif 'id' in params:
        where = "id = :id"
    else:
        return False

    if not columns:
        # An empty SET list is invalid SQL; fail before touching the database.
        log.warning('No columns to update for sql_update')
        return False
    if not _is_safe_identifier(table_name) or not all(_is_safe_identifier(k) for k in columns):
        log.error('Unsafe identifier for sql_update')
        return False

    set_clauses = [f"`{k}` = :{k}" for k in columns]
    query = text(f"UPDATE `{table_name}` SET {', '.join(set_clauses)} WHERE {where};")

    trans = db.begin()
    try:
        result = db.execute(query, params)
        trans.commit()
        return True if result.rowcount >= 1 else None
    except Exception:
        trans.rollback()
        log.exception('Update failed.')
        return False


def sql_delete(table_name: str, record_id: int|None = None, record_id_random: str|None = None, log_lvl: int = logging.INFO) -> Any:
    """Deletes a record from the specified table.

    Runs inside an explicit transaction, matching ``sql_insert`` and
    ``sql_update``.

    Args:
        table_name: Target table.
        record_id: Value matched against ``id``.
        record_id_random: Value matched against ``id_random`` (used only
            when ``record_id`` is falsy).
        log_lvl: Level applied to this module's logger.

    Returns:
        True when at least one row was deleted, None when none was, False
        when no selector was given, the table name is invalid, or the
        statement failed.
    """
    from app.db_sql import db

    log.setLevel(log_lvl)
    if record_id:
        where = "id = :rid"
        data = {'rid': record_id}
    elif record_id_random:
        where = "id_random = :ridr"
        data = {'ridr': record_id_random}
    else:
        return False

    if not _is_safe_identifier(table_name):
        log.error('Unsafe table name for sql_delete')
        return False

    query = text(f"DELETE FROM `{table_name}` WHERE {where};")

    trans = db.begin()
    try:
        result = db.execute(query, data)
        trans.commit()
        return True if result.rowcount >= 1 else None
    except Exception:
        trans.rollback()
        log.exception('Delete failed.')
        return False


def sql_insert_or_update(
    table_name: str,
    data: dict,
    rm_id_random: bool = False,
    log_lvl: int = logging.DEBUG,
):
    """Modularized wrapper for INSERT ... ON DUPLICATE KEY UPDATE.

    Every key of ``data`` except ``id`` is both inserted and, on a duplicate
    key, updated. dict/list values are JSON-encoded on a copy, as in
    ``sql_insert`` / ``sql_update``.

    Args:
        table_name: Target table.
        data: Column name to value mapping.
        rm_id_random: When True, pass ``data`` through
            ``lookup_id_random_pop`` first.
        log_lvl: Accepted for signature parity; not used by this function.

    Returns:
        ``lastrowid`` when it is a positive number, True when the statement
        succeeded without one, False on missing arguments, invalid
        identifiers or a database error.
    """
    from app.db_sql import db
    from app.lib_redis_helpers import lookup_id_random_pop

    if not table_name or not data:
        return False
    if rm_id_random:
        data = lookup_id_random_pop(obj_data=data)

    columns = [k for k in data.keys() if k != 'id']
    if not columns:
        # Nothing to write: the column list would render as invalid SQL.
        log.warning('No columns to write for sql_insert_or_update')
        return False
    if not _is_safe_identifier(table_name) or not all(_is_safe_identifier(k) for k in columns):
        log.error('Unsafe identifier for sql_insert_or_update')
        return False

    fields = [f'`{k}`' for k in columns]
    values = [f':{k}' for k in columns]
    updates = [f'`{k}` = :{k}' for k in columns]
    params = _encode_json_values(data)
    query = text(
        f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(values)}) "
        f"ON DUPLICATE KEY UPDATE {', '.join(updates)};"
    )

    trans = db.begin()
    try:
        result = db.execute(query, params)
        trans.commit()
        last_id = result.lastrowid
        # lastrowid may be None or 0; comparing None with 0 would raise after
        # the commit and wrongly report a failed write.
        return last_id if last_id and last_id > 0 else True
    except Exception:
        trans.rollback()
        log.exception('sql_insert_or_update failed.')
        return False


def get_account_id_w_for_type_id(for_type: str, for_id: int|str) -> int|bool|None:
    """Modularized helper to find an account_id associated with an object.

    Args:
        for_type: Table name of the object; also passed to
            ``redis_lookup_id_random`` as ``table_name``.
        for_id: Passed to ``redis_lookup_id_random`` as ``record_id_random``.

    Returns:
        The ``account_id`` value of the matching row (``None`` when the
        returned record has no such key), or False when the id lookup or
        the select yields nothing.
    """
    from app.lib_redis_helpers import redis_lookup_id_random
    from app.db_sql import sql_select

    fid = redis_lookup_id_random(record_id_random=for_id, table_name=for_type)
    if not fid:
        return False
    if not _is_safe_identifier(for_type):
        log.error('Unsafe table name for get_account_id_w_for_type_id')
        return False

    sql = f"SELECT account_id FROM `{for_type}` WHERE id = :fid LIMIT 1;"
    result = sql_select(sql=sql, data={'fid': fid})
    if not result:
        return False
    return result.get('account_id')