import datetime, json, pytz, random, redis, secrets from typing import Any, List, Optional from timeit import default_timer as timer from fastapi import HTTPException from app.config import settings from app.log import log, logging, logger_reset from sqlalchemy import create_engine, text, Time from sqlalchemy.exc import IntegrityError, OperationalError, ProgrammingError from sqlalchemy.pool import NullPool from app.lib_sql_search import ( sql_limit_offset_part as _sql_limit_offset_part, sql_and_like_part as _sql_and_like_part, sql_or_like_part as _sql_or_like_part, sql_and_in_dict_li_part as _sql_and_in_dict_li_part, sql_and_qry_part as _sql_and_qry_part, sql_fulltext_qry_part as _sql_fulltext_qry_part, sql_enable_part as _sql_enable_part, sql_hidden_part as _sql_hidden_part, sql_where_qry_part as _sql_where_qry_part, sql_search_qry_part as _sql_search_qry_part ) from app.lib_redis_helpers import ( redis_lookup_id_random as _redis_lookup_id_random, get_id_random as _get_id_random, reset_redis as _reset_redis ) db_uri = settings.SQLALCHEMY_DB_URI engine = create_engine( url = db_uri, echo = False, pool_use_lifo = True, pool_pre_ping = True, pool_recycle = settings.DB['pool_recycle'], isolation_level = 'READ COMMITTED', connect_args = {'connect_timeout': settings.DB['connect_timeout']} ) # NOTE: The default isolation_level is 'REPEATABLE READ'. This can sometimes not show updated data. # NOTE: The "echo" set to True option shows the SQL queries. # NOTE: Switching from READ COMMITTED to READ UNCOMMITTED (or REPEATABLE READ). Testing started 2024-04-23 # levels: "REPEATABLE READ" "READ COMMITTED" "READ UNCOMMITTED" "SERIALIZABLE" log.info('DB SQL trying to connect...') db = None try: db = engine.connect() log.info(f'Connected to database: {db_uri}') except Exception: log.exception('Could not connect to database.') # ### BEGIN ### API DB SQL ### sql_connect() ### @logger_reset def sql_connect(current_db, log_lvl: int = logging.INFO) -> None|bool|int: log.setLevel(log_lvl) log.debug(locals()) log.info('Trying to create a new engine (connection pool)...') if current_db: current_db.engine.dispose() log.info('Disposed of the current engine (connection pool).') return True else: log.warning(f'Could not created and or connect to database') return False # ### END ### API DB SQL ### sql_connect() ### # ### BEGIN ### API DB SQL ### sql_insert() ### # Returns the auto number ID of the record inserted, or returns None if there was likely a duplicate record, or False if there was a problem. @logger_reset def sql_insert( sql: str|None = None, data: dict|None = None, table_name: str|None = None, rm_id_random: bool = False, id_random_length: int = 8, log_lvl: int = logging.WARNING, ) -> None|bool|int: log.setLevel(log_lvl) if sql: sql_insert_stmt = text(sql) elif table_name and data: if rm_id_random: data = lookup_id_random_pop(obj_data=data) if not data.get('id_random', None) and id_random_length: data['id_random'] = secrets.token_urlsafe(id_random_length) fields = [] values = [] for key, value in data.items(): if key != 'id': fields.append('`'+str(key)+'`') values.append(':'+str(key)) if isinstance(value, (dict, list)): data[key] = json.dumps(value) sql_insert_stmt = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(values)});") else: log.error('SQL INSERT statement could not be created. Missing params.') return False trans = db.begin() try: result_insert = db.execute(sql_insert_stmt, data) trans.commit() except IntegrityError as e: trans.rollback() log.error('Integrity error (likely duplicate). Returning None') log.debug(e) return None except Exception as e: trans.rollback() log.error('Unknown exception in sql_insert. Returning False') log.exception(e) return False else: if result_insert.rowcount == 1 and result_insert.lastrowid > 0: return result_insert.lastrowid return False # ### END ### API DB SQL ### sql_insert() ### # ### BEGIN ### API DB SQL ### sql_update() ### @logger_reset def sql_update( sql: str|None = None, data: dict|None = None, table_name: str|None = None, record_id: int|None = None, record_id_random: str|None = None, rm_id_random: bool = False, id_random_length: None|int = None, log_lvl: int = logging.WARNING, ): log.setLevel(log_lvl) if sql: sql_update_stmt = text(sql) elif table_name and data: if rm_id_random: data = lookup_id_random_pop(obj_data=data) if not data.get('id_random', None) and id_random_length: data['id_random'] = secrets.token_urlsafe(id_random_length) field_list = [] for key, value in data.items(): if key != 'id': field_list.append('`'+str(key) + '` = :' + str(key)) if isinstance(value, (dict, list)): data[key] = json.dumps(value) sql_set = ', '.join(field_list) if len(sql_set) < 4: return None if record_id: data['id'] = record_id sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id = :id') elif record_id_random: data['id_random'] = record_id_random sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id_random = :id_random') elif 'id' in data: sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id = :id') elif 'id_random' in data: sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id_random = :id_random') else: return False else: return False trans = db.begin() try: result_update = db.execute(sql_update_stmt, data) trans.commit() except OperationalError: trans.rollback() log.error('Operational error (gone away?). Retrying once...') sql_connect(current_db=db) try: result_update = db.execute(sql_update_stmt, data) trans.commit() except Exception: return False except Exception as e: trans.rollback() log.exception(e) return False else: if result_update.rowcount >= 1: return True return None # ### END ### API DB SQL ### sql_update() ### # ### BEGIN ### Core Help CRUD ### sql_insert_or_update() ### @logger_reset def sql_insert_or_update( sql: str|None = None, data: dict|None = None, table_name: str|None = None, rm_id_random: bool = False, id_random_length: int|None = None, log_lvl: int = logging.DEBUG, ): log.setLevel(log_lvl) if sql: stmt = text(sql) elif table_name and data: if rm_id_random: data = lookup_id_random_pop(obj_data=data) if not data.get('id_random', None) and id_random_length: data['id_random'] = secrets.token_urlsafe(id_random_length) fields = [f'`{k}`' for k in data.keys() if k != 'id'] placeholders = [f':{k}' for k in data.keys() if k != 'id'] updates = [f'`{k}` = :{k}' for k in data.keys() if k != 'id'] for k, v in data.items(): if isinstance(v, (dict, list)): data[k] = json.dumps(v) stmt = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(placeholders)}) " f"ON DUPLICATE KEY UPDATE {', '.join(updates)};") else: return False trans = db.begin() try: res = db.execute(stmt, data) trans.commit() return res.lastrowid if res.lastrowid > 0 else True except Exception as e: trans.rollback() log.exception(e) return False # ### END ### Core Help CRUD ### sql_insert_or_update() ### # ### BEGIN ### Core Help CRUD ### sql_select() ### @logger_reset def sql_select( table_name: str|None = None, record_id: int|None = None, record_id_random: str|None = None, field_name: str|None = None, field_value = None, enabled: str|None = None, hidden: str|None = None, qry_dict_li: dict|None = None, fulltext_qry_dict: dict|None = None, and_qry_dict: dict|None = None, and_like_dict: dict|None = None, or_like_dict: dict|None = None, and_in_dict_li: dict|None = None, search_query: Any|None = None, searchable_fields: List[str]|None = None, order_by_li: dict|None = None, limit: int = 9999999, offset: int = 0, sql: str|None = None, data: dict|None = None, rm_id_random: bool = False, as_dict: bool|None = True, as_list: bool|None = False, max_count: int = 100000, log_lvl: int = logging.WARNING, ) -> None|bool|dict|list: log.setLevel(log_lvl) sql_limit_offset = f'LIMIT {limit} OFFSET {offset}' if limit >= 0 and offset >= 0 else '' sql_order_by = '' if order_by_li and isinstance(order_by_li, dict): order_by_str_li = [f'`{table_name}`.`{k}` {v}' for k, v in order_by_li.items()] sql_order_by = f"ORDER BY {', '.join(order_by_str_li)}" if table_name and not (record_id or record_id_random or field_name or field_value or sql or data): data = {} s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None) s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None) if d_en is not None: data['enabled'] = d_en if d_hi is not None: data['hidden'] = d_hi s_search, d_search = ('', {}) if search_query: s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name) data.update(d_search) stmt = text(f"SELECT * FROM `{table_name}` WHERE 1=1 {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};") elif table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data): data = {'rid': record_id} if record_id else {'ridr': record_id_random} where = f"`{table_name}`.id = :rid" if record_id else f"`{table_name}`.id_random = :ridr" stmt = text(f"SELECT * FROM `{table_name}` WHERE {where} {sql_order_by} {sql_limit_offset};") elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data): data = {field_name: field_value} s_where, d_where = sql_where_qry_part(qry_dict_li) if qry_dict_li else ('', {}) s_ft, d_ft = sql_fulltext_qry_part(fulltext_qry_dict) if fulltext_qry_dict else ('', {}) s_and, d_and = sql_and_qry_part(and_qry_dict) if and_qry_dict else ('', {}) s_alike, d_alike = sql_and_like_part(and_like_dict) if and_like_dict else ('', {}) s_olike, d_olike = sql_or_like_part(or_like_dict) if or_like_dict else ('', {}) s_in, d_in = sql_and_in_dict_li_part(and_in_dict_li) if and_in_dict_li else ('', {}) s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name) if search_query else ('', {}) s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None) s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None) data.update(d_where); data.update(d_ft); data.update(d_and); data.update(d_alike) data.update(d_olike); data.update(d_in); data.update(d_search) if d_en is not None: data['enabled'] = d_en if d_hi is not None: data['hidden'] = d_hi stmt = text(f"SELECT * FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name} " f"{s_where} {s_ft} {s_and} {s_alike} {s_olike} {s_in} {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};") elif table_name and data and not (record_id or record_id_random or field_name or field_value or sql): if rm_id_random: data = lookup_id_random_pop(obj_data=data) where_clauses = [f"`{table_name}`.{k} = :{k}" for k in data.keys()] stmt = text(f"SELECT * FROM `{table_name}` WHERE {' AND '.join(where_clauses)} {sql_order_by} {sql_limit_offset};") elif sql: stmt = text(sql) else: return False result = run_sql_select(sql=stmt, data=data) if not result: return [] if as_list else None if result.rowcount == 1: record = dict(result.first()) if as_dict else result.first() return [record] if as_list else record elif result.rowcount > 1: records = [dict(r) for r in result.fetchall()] if as_dict else result.fetchall() return records else: return [] if as_list else None # ### END ### Core Help CRUD ### sql_select() ### # ### BEGIN ### Core Help CRUD ### run_sql_select() ### @logger_reset def run_sql_select( sql: Any, data: dict|None = None, commit: bool = False, log_lvl: int = logging.WARNING, ) -> Any: log.setLevel(log_lvl) if not db: return False try: if commit: trans = db.begin() sql = sql.columns(recurring_start_time=Time, recurring_end_time=Time) result = db.execute(sql, data) if data else db.execute(sql) if commit: trans.commit() return result except (OperationalError, ProgrammingError): log.error('DB Error. Retrying once...') sql_connect(current_db=db) try: if commit: trans = db.begin() result = db.execute(sql, data) if data else db.execute(sql) if commit: trans.commit() return result except Exception: return False except Exception as e: log.exception(e) return False # ### END ### Core Help CRUD ### run_sql_select() ### # ### BEGIN ### Core Help CRUD ### sql_delete() ### @logger_reset def sql_delete( table_name: str|None = None, record_id: int|None = None, record_id_random: str|None = None, field_name: str|None = None, field_value = None, sql: str|None = None, data: dict|None = None, log_lvl: int = logging.INFO, ) -> None|bool: log.setLevel(log_lvl) if table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data): data = {'rid': record_id} if record_id else {'ridr': record_id_random} where = f"`{table_name}`.id = :rid" if record_id else f"`{table_name}`.id_random = :ridr" stmt = text(f"DELETE FROM `{table_name}` WHERE {where}") elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data): data = {field_name: field_value} stmt = text(f"DELETE FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name}") elif sql: stmt = text(sql) else: return False try: result = db.execute(stmt, data) if data else db.execute(stmt) return True if result.rowcount >= 1 else None except Exception as e: log.exception(e) return False # ### END ### Core Help CRUD ### sql_delete() ### # ### BEGIN ### API DB SQL ### redis_lookup_id_random() ### @logger_reset def redis_lookup_id_random( record_id_random: int|str, table_name: str, check_int_id: bool = False, log_lvl: int = logging.WARNING, minutes: int = 30, reset_rate: int = 10, ) -> str|int|bool|None: log.setLevel(log_lvl) return _redis_lookup_id_random(record_id_random, table_name, check_int_id, log_lvl, minutes, reset_rate) # ### END ### API DB SQL ### redis_lookup_id_random() ### # ### BEGIN ### API DB SQL ### get_id_random() ### @logger_reset def get_id_random(record_id: int, table_name: str, log_lvl: int = logging.WARNING) -> str|bool|None: log.setLevel(log_lvl) return _get_id_random(record_id, table_name, log_lvl) # ### END ### API DB SQL ### get_id_random() ### @logger_reset def reset_redis(): return _reset_redis() # ### BEGIN ### API DB SQL ### lookup_id_random_pop() ### @logger_reset def lookup_id_random_pop(obj_data: dict, log_lvl: int = logging.WARNING): return _lookup_id_random_pop(obj_data, log_lvl) # ### END ### API DB SQL ### lookup_id_random_pop() ### # ### BEGIN ### API DB SQL Methods ### get_account_id_w_for_type_id() ### @logger_reset def get_account_id_w_for_type_id(for_type: str, for_id: int|str) -> bool|int|None: log.setLevel(logging.WARNING) if fid := redis_lookup_id_random(record_id_random=for_id, table_name=for_type): sql = f"SELECT account_id FROM `{for_type}` WHERE id = :fid LIMIT 1;" if result := sql_select(sql=sql, data={'fid': fid}): return result.get('account_id') return False # ### END ### API DB SQL Methods ### get_account_id_w_for_type_id() ### @logger_reset def sql_where_qry_part(qry_dict_li: list) -> bool|str: log.setLevel(logging.INFO) return _sql_where_qry_part(qry_dict_li) @logger_reset def sql_fulltext_qry_part(fulltext_qry_dict: dict) -> bool|dict: log.setLevel(logging.INFO) return _sql_fulltext_qry_part(fulltext_qry_dict) @logger_reset def sql_and_qry_part(and_qry_dict_obj: dict) -> bool|dict: log.setLevel(logging.WARNING) return _sql_and_qry_part(and_qry_dict_obj) @logger_reset def sql_and_like_part(and_like_dict_obj: dict) -> bool|dict: log.setLevel(logging.INFO) return _sql_and_like_part(and_like_dict_obj) @logger_reset def sql_or_like_part(or_like_dict_obj: dict) -> bool|dict: log.setLevel(logging.INFO) return _sql_or_like_part(or_like_dict_obj) @logger_reset def sql_and_in_dict_li_part(and_in_dict_li_dict_obj: dict) -> bool|dict: log.setLevel(logging.WARNING) return _sql_and_in_dict_li_part(and_in_dict_li_dict_obj) @logger_reset def sql_enable_part(table_name: str, enabled: str) -> bool|dict: log.setLevel(logging.WARNING) return _sql_enable_part(table_name, enabled) @logger_reset def sql_hidden_part(table_name: str, hidden: str) -> bool|dict: log.setLevel(logging.WARNING) return _sql_hidden_part(table_name, hidden) @logger_reset def sql_limit_offset_part(limit: int, offset: int = 0) -> bool|str: log.setLevel(logging.WARNING) return _sql_limit_offset_part(limit, offset) @logger_reset def sql_search_qry_part(search_query: Any, searchable_fields: List[str]|None = None, max_depth: int = 5, table_name: str|None = None) -> tuple[str, dict]: log.setLevel(logging.INFO) return _sql_search_qry_part(search_query, searchable_fields, max_depth, table_name) __all__ = [ 'db', 'engine', 'sql_connect', 'sql_insert', 'sql_update', 'sql_select', 'run_sql_select', 'sql_delete', 'redis_lookup_id_random', 'get_id_random', 'reset_redis', 'lookup_id_random_pop', 'sql_where_qry_part', 'sql_fulltext_qry_part', 'sql_and_qry_part', 'sql_and_like_part', 'sql_or_like_part', 'sql_and_in_dict_li_part', 'sql_enable_part', 'sql_hidden_part', 'sql_limit_offset_part', 'sql_search_qry_part', 'sql_insert_or_update', 'get_account_id_w_for_type_id' ]