""" Standardized SQL CRUD operations for the Aether API. Provides high-level helpers for INSERT, UPDATE, SELECT, and DELETE. """ import logging import json from typing import Any, List, Optional from sqlalchemy import text, Time from sqlalchemy.exc import IntegrityError, OperationalError, ProgrammingError from app.log import log, logger_reset # CRITICAL: Import the core module to access current global state from app import lib_sql_core from app.lib_sql_core import sql_connect, set_last_sql_error # Helper for resolving random IDs from app.lib_redis_helpers import lookup_id_random_pop # ### BEGIN ### API DB SQL ### sql_insert() ### @logger_reset def sql_insert( sql: str|None = None, data: dict|None = None, table_name: str|None = None, rm_id_random: bool = False, id_random_length: int = 8, log_lvl: int = logging.WARNING, ) -> None|bool|int: log.setLevel(log_lvl) if sql: sql_insert_stmt = text(sql) elif table_name and data: if rm_id_random: data = lookup_id_random_pop(obj_data=data) if not data.get('id_random', None) and id_random_length: import secrets data['id_random'] = secrets.token_urlsafe(id_random_length) fields = [] values = [] for key, value in data.items(): if key != 'id': fields.append('`'+str(key)+'`') values.append(':'+str(key)) if isinstance(value, (dict, list)): data[key] = json.dumps(value) sql_insert_stmt = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(values)});") else: log.error('SQL INSERT statement could not be created. Missing params.') return False trans = lib_sql_core.db.begin() try: result_insert = lib_sql_core.db.execute(sql_insert_stmt, data) trans.commit() except IntegrityError as e: trans.rollback() log.error('Integrity error (likely duplicate). Returning None') log.debug(e) set_last_sql_error(e) return None except Exception as e: trans.rollback() log.error('Unknown exception in sql_insert. Returning False') log.exception(e) set_last_sql_error(e) return False else: if result_insert.rowcount == 1 and result_insert.lastrowid > 0: return result_insert.lastrowid return False # ### END ### API DB SQL ### sql_insert() ### # ### BEGIN ### API DB SQL ### sql_update() ### @logger_reset def sql_update( sql: str|None = None, data: dict|None = None, table_name: str|None = None, record_id: int|None = None, record_id_random: str|None = None, rm_id_random: bool = False, id_random_length: None|int = None, log_lvl: int = logging.WARNING, ): log.setLevel(log_lvl) if sql: sql_update_stmt = text(sql) elif table_name and data: if rm_id_random: data = lookup_id_random_pop(obj_data=data) if not data.get('id_random', None) and id_random_length: import secrets data['id_random'] = secrets.token_urlsafe(id_random_length) field_list = [] for key, value in data.items(): if key != 'id': field_list.append('`'+str(key) + '` = :' + str(key)) if isinstance(value, (dict, list)): data[key] = json.dumps(value) sql_set = ', '.join(field_list) if len(sql_set) < 4: return None if record_id: data['id'] = record_id sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id = :id') elif record_id_random: data['id_random'] = record_id_random sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id_random = :id_random') elif 'id' in data: sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id = :id') elif 'id_random' in data: sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id_random = :id_random') else: return False else: return False trans = lib_sql_core.db.begin() try: result_update = lib_sql_core.db.execute(sql_update_stmt, data) trans.commit() except OperationalError: trans.rollback() log.error('Operational error (gone away?). Retrying once...') sql_connect(current_db=lib_sql_core.db) try: result_update = lib_sql_core.db.execute(sql_update_stmt, data) trans.commit() except Exception as e: set_last_sql_error(e) return False except Exception as e: trans.rollback() log.exception(e) set_last_sql_error(e) return False else: if result_update.rowcount >= 1: return True return None # ### END ### API DB SQL ### sql_update() ### # ### BEGIN ### Core Help CRUD ### sql_insert_or_update() ### @logger_reset def sql_insert_or_update( sql: str|None = None, data: dict|None = None, table_name: str|None = None, rm_id_random: bool = False, id_random_length: int|None = None, log_lvl: int = logging.DEBUG, ): log.setLevel(log_lvl) if sql: stmt = text(sql) elif table_name and data: if rm_id_random: data = lookup_id_random_pop(obj_data=data) if not data.get('id_random', None) and id_random_length: import secrets data['id_random'] = secrets.token_urlsafe(id_random_length) fields = [f'`{k}`' for k in data.keys() if k != 'id'] placeholders = [f':{k}' for k in data.keys() if k != 'id'] updates = [f'`{k}` = :{k}' for k in data.keys() if k != 'id'] for k, v in data.items(): if isinstance(v, (dict, list)): data[k] = json.dumps(v) stmt = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(placeholders)}) " f"ON DUPLICATE KEY UPDATE {', '.join(updates)};") else: return False trans = lib_sql_core.db.begin() try: res = lib_sql_core.db.execute(stmt, data) trans.commit() return res.lastrowid if res.lastrowid > 0 else True except Exception as e: trans.rollback() log.exception(e) return False # ### END ### Core Help CRUD ### sql_insert_or_update() ### # ### BEGIN ### Core Help CRUD ### sql_select() ### @logger_reset def sql_select( table_name: str|None = None, record_id: int|None = None, record_id_random: str|None = None, field_name: str|None = None, field_value = None, enabled: str|None = None, hidden: str|None = None, qry_dict_li: dict|None = None, fulltext_qry_dict: dict|None = None, and_qry_dict: dict|None = None, and_like_dict: dict|None = None, or_like_dict: dict|None = None, and_in_dict_li: dict|None = None, search_query: Any|None = None, searchable_fields: List[str]|None = None, order_by_li: dict|None = None, limit: int = 9999999, offset: int = 0, sql: str|None = None, data: dict|None = None, rm_id_random: bool = False, as_dict: bool|None = True, as_list: bool|None = False, max_count: int = 100000, log_lvl: int = logging.WARNING, ) -> None|bool|dict|list: from app.lib_sql_search import ( sql_enable_part, sql_hidden_part, sql_search_qry_part, sql_where_qry_part, sql_fulltext_qry_part, sql_and_qry_part, sql_and_like_part, sql_or_like_part, sql_and_in_dict_li_part ) log.setLevel(log_lvl) sql_limit_offset = f'LIMIT {limit} OFFSET {offset}' if limit >= 0 and offset >= 0 else '' sql_order_by = '' if order_by_li and isinstance(order_by_li, dict): order_by_str_li = [f'`{table_name}`.`{k}` {v}' for k, v in order_by_li.items()] sql_order_by = f"ORDER BY {', '.join(order_by_str_li)}" if table_name and not (record_id or record_id_random or field_name or field_value or sql or data): data = {} s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None) s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None) if d_en is not None: data['enabled'] = d_en if d_hi is not None: data['hidden'] = d_hi s_search, d_search = ('', {}) if search_query: s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name) data.update(d_search) stmt = text(f"SELECT * FROM `{table_name}` WHERE 1=1 {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};") elif table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data): data = {'rid': record_id} if record_id else {'ridr': record_id_random} where = f"`{table_name}`.id = :rid" if record_id else f"`{table_name}`.id_random = :ridr" stmt = text(f"SELECT * FROM `{table_name}` WHERE {where} {sql_order_by} {sql_limit_offset};") elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data): data = {field_name: field_value} s_where, d_where = sql_where_qry_part(qry_dict_li) if qry_dict_li else ('', {}) s_ft, d_ft = sql_fulltext_qry_part(fulltext_qry_dict) if fulltext_qry_dict else ('', {}) s_and, d_and = sql_and_qry_part(and_qry_dict) if and_qry_dict else ('', {}) s_alike, d_alike = sql_and_like_part(and_like_dict) if and_like_dict else ('', {}) s_olike, d_olike = sql_or_like_part(or_like_dict) if or_like_dict else ('', {}) s_in, d_in = sql_and_in_dict_li_part(and_in_dict_li) if and_in_dict_li else ('', {}) s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name) if search_query else ('', {}) s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None) s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None) data.update(d_where); data.update(d_ft); data.update(d_and); data.update(d_alike) data.update(d_olike); data.update(d_in); data.update(d_search) if d_en is not None: data['enabled'] = d_en if d_hi is not None: data['hidden'] = d_hi stmt = text(f"SELECT * FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name} " f"{s_where} {s_ft} {s_and} {s_alike} {s_olike} {s_in} {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};") elif table_name and data and not (record_id or record_id_random or field_name or field_value or sql): if rm_id_random: data = lookup_id_random_pop(obj_data=data) where_clauses = [f"`{table_name}`.{k} = :{k}" for k in data.keys()] stmt = text(f"SELECT * FROM `{table_name}` WHERE {' AND '.join(where_clauses)} {sql_order_by} {sql_limit_offset};") elif sql: stmt = text(sql) else: return False result = run_sql_select(sql=stmt, data=data) if not result: return [] if as_list else None # Fetch all rows first to determine actual count reliably rows = result.all() count = len(rows) if count == 0: return [] if as_list else None if count == 1: record = dict(rows[0]) if as_dict else rows[0] return [record] if as_list else record # count > 1 records = [dict(r) for r in rows] if as_dict else rows return records # ### END ### Core Help CRUD ### sql_select() ### # ### BEGIN ### API DB SQL ### run_sql_select() ### @logger_reset def run_sql_select( sql: text, data: dict|None = None, log_lvl: int = logging.WARNING, ) -> Any: log.setLevel(log_lvl) # print(f"Executing SQL: {sql} with data: {data}", flush=True) try: return lib_sql_core.db.execute(sql, data) except (OperationalError, ProgrammingError) as e: log.error(f'DB Error: {e}. Retrying once...') sql_connect(current_db=lib_sql_core.db) try: return lib_sql_core.db.execute(sql, data) except Exception as e2: set_last_sql_error(e2) raise e2 # RAISING instead of returning False except Exception as e: log.exception(e) set_last_sql_error(e) raise e # RAISING instead of returning False # ### END ### API DB SQL ### run_sql_select() ### # ### BEGIN ### Core Help CRUD ### sql_delete() ### @logger_reset def sql_delete( table_name: str|None = None, record_id: int|None = None, record_id_random: str|None = None, field_name: str|None = None, field_value = None, sql: str|None = None, data: dict|None = None, log_lvl: int = logging.INFO, ) -> None|bool: log.setLevel(log_lvl) if table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data): data = {'rid': record_id} if record_id else {'ridr': record_id_random} where = f"`{table_name}`.id = :rid" if record_id else f"`{table_name}`.id_random = :ridr" stmt = text(f"DELETE FROM `{table_name}` WHERE {where}") elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data): data = {field_name: field_value} stmt = text(f"DELETE FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name}") elif sql: stmt = text(sql) else: return False try: result = lib_sql_core.db.execute(stmt, data) if data else lib_sql_core.db.execute(stmt) return True if result.rowcount >= 1 else None except Exception as e: log.exception(e) return False # ### END ### Core Help CRUD ### sql_delete() ###