diff --git a/app/db_connection.py b/app/db_connection.py new file mode 100644 index 0000000..ca6ed33 --- /dev/null +++ b/app/db_connection.py @@ -0,0 +1,28 @@ +""" +Independent database connection module to prevent circular imports. +""" +import logging +from sqlalchemy import create_engine +from app.config import settings + +# Use local logger to avoid importing app.log (which might create cycles) +log = logging.getLogger(__name__) + +db_uri = settings.SQLALCHEMY_DB_URI +engine = create_engine( + url = db_uri, + echo = False, + pool_use_lifo = True, + pool_pre_ping = True, + pool_recycle = settings.DB['pool_recycle'], + isolation_level = 'READ COMMITTED', + connect_args = {'connect_timeout': settings.DB['connect_timeout']} +) + +log.info('DB Connection initializing...') +db = None +try: + db = engine.connect() + log.info(f'Connected to database: {db_uri}') +except Exception: + log.exception('Could not connect to database.') \ No newline at end of file diff --git a/app/lib_id_resolver.py b/app/lib_id_resolver.py new file mode 100644 index 0000000..9933c0b --- /dev/null +++ b/app/lib_id_resolver.py @@ -0,0 +1,130 @@ +""" +Centralized ID random to integer ID resolution. +""" +import logging +import datetime +import random +import redis +from app.config import settings +from app.db_connection import db + +log = logging.getLogger(__name__) + +def redis_lookup_id_random( + record_id_random: int|str, + table_name: str, + check_int_id: bool = False, + log_lvl: int = logging.WARNING, + minutes: int = 30, + reset_rate: int = 10, + ) -> str|int|bool|None: + """ + Looks up a record ID in Redis, falling back to SQL if not found. + Resolves 'id_random' (URL-safe string) to internal integer 'id'. + """ + from app.db_sql import sql_select, get_id_random + + log.setLevel(log_lvl) + + if isinstance(record_id_random, str) and 11 <= len(record_id_random) <= 22: + pass + elif isinstance(record_id_random, int): + if check_int_id: + if get_id_random(record_id=record_id_random, table_name=table_name): + return record_id_random + return False + return record_id_random + elif record_id_random is None: + return None + else: + log.error(f'Unexpected data type: {type(record_id_random)}. Expected string (11-22 chars) or int.') + return False + + if not table_name: + log.error(f'Missing table_name for id_random lookup: {record_id_random}') + return False + + r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=7, password=None, decode_responses=True) + key_name = f'{table_name}:{record_id_random}' + + record_id = r.get(key_name) + + # Periodic cache refresh + if record_id and random.randint(1, reset_rate) == 1: + log.warning(f'Redis: Randomly (1/{reset_rate}) refreshing cache for Key="{key_name}"') + record_id = None + + if record_id: + r.setex(key_name, datetime.timedelta(minutes=minutes), value=record_id) + return int(record_id) + else: + data = { 'id_random': record_id_random } + sql = f"SELECT id FROM `{table_name}` WHERE id_random = :id_random;" + + if select_results := sql_select(sql=sql, data=data): + if isinstance(select_results, dict): + if rid := select_results.get('id'): + r.setex(key_name, datetime.timedelta(minutes=minutes), value=rid) + return int(rid) + log.error('SQL result missing ID field.') + return False + else: + log.error(f'SQL: Duplicate id_random found in "{table_name}". Retrying...') + return redis_lookup_id_random(record_id_random=record_id_random, table_name=table_name) + else: + log.warning(f'SQL: ID Random "{record_id_random}" not found in "{table_name}".') + return None + +def lookup_id_random_pop( + obj_data: dict, + log_lvl: int = logging.WARNING, + ): + """ + Resolves any *_id_random fields in a dict to their integer IDs and removes the random keys. + """ + log.setLevel(log_lvl) + + # List of common prefix patterns to resolve + id_patterns = [ + 'account', 'activity_log', 'address', 'archive', 'contact', 'cont_edu_cert', + 'cont_edu_cert_person', 'event', 'event_abstract', 'event_badge', + 'event_badge_template', 'event_exhibit', 'event_file', 'event_location', + 'event_person', 'event_person_profile', 'event_presentation', + 'event_presenter', 'event_registration', 'event_session', 'event_track', + 'grant', 'hosted_file', 'journal', 'journal_entry', 'membership_group', + 'membership_person_group', 'membership_person', 'membership_type', + 'membership_person_type', 'order', 'order_line', 'order_cart', + 'order_cart_line', 'organization', 'page', 'person', 'post', 'product', + 'sponsorship', 'sponsorship_cfg', 'site', 'user' + ] + + for prefix in id_patterns: + key = f'{prefix}_id_random' + if key in obj_data: + table = prefix + if prefix == 'address_location': table = 'address' + if prefix in ['contact_1', 'contact_2']: table = 'contact' + if prefix == 'event_id_random_only': table = 'event' + + resolved_id = redis_lookup_id_random(record_id_random=obj_data[key], table_name=table) + obj_data[f'{prefix}_id'] = resolved_id + obj_data.pop(key) + + # Handle polymorphic link fields + polymorphic = [ + ('for_type', 'for_id_random', 'for_id'), + ('link_to_type', 'link_to_id_random', 'link_to_id'), + ('object_type', 'object_id_random', 'object_id'), + ('to_object_type', 'to_object_id_random', 'to_object_id'), + ('from_object_type', 'from_object_id_random', 'from_object_id') + ] + + for type_key, rand_key, id_key in polymorphic: + if type_key in obj_data and rand_key in obj_data: + obj_data[id_key] = redis_lookup_id_random( + record_id_random=obj_data[rand_key], + table_name=obj_data[type_key] + ) + obj_data.pop(rand_key) + + return obj_data \ No newline at end of file diff --git a/app/lib_sql_core.py b/app/lib_sql_core.py new file mode 100644 index 0000000..3325a6c --- /dev/null +++ b/app/lib_sql_core.py @@ -0,0 +1,281 @@ +""" +Core SQL CRUD operations and connection management for Aether. +""" +import logging +import json +from typing import Any, List, Optional +from sqlalchemy import text, Time +from sqlalchemy.exc import IntegrityError, OperationalError, ProgrammingError + +from app.config import settings +from app.log import logger_reset + +# Local logger to avoid cycles +log = logging.getLogger(__name__) + +def sql_connect(current_db, log_lvl: int = logging.INFO) -> bool: + """Disposes of the current connection pool and prepares for reconnection.""" + if current_db: + current_db.engine.dispose() + return True + return False + + +def run_sql_select( + sql: Any, + data: dict|None = None, + commit: bool = False, + log_lvl: int = logging.WARNING, + ) -> Any: + """Executes a SQL SELECT statement with retry logic for operational errors.""" + from app.db_sql import db + log.setLevel(log_lvl) + + if not db: + log.error('The database connection is not available!!!') + return False + + try: + if commit: trans = db.begin() + sql = sql.columns(recurring_start_time=Time, recurring_end_time=Time) + result = db.execute(sql, data) if data else db.execute(sql) + if commit: trans.commit() + return result + except (OperationalError, ProgrammingError) as e: + log.error(f'DB Error ({type(e).__name__}). Retrying...') + try: + if commit: trans = db.begin() + result = db.execute(sql, data) if data else db.execute(sql) + if commit: trans.commit() + return result + except Exception: + log.exception('Second attempt failed.') + return False + except Exception: + log.exception('Unknown SQL exception.') + return False + + +def sql_select( + table_name: str|None = None, + record_id: int|None = None, + record_id_random: str|None = None, + field_name: str|None = None, + field_value = None, + enabled: str|None = None, + hidden: str|None = None, + qry_dict_li: dict|None = None, + fulltext_qry_dict: dict|None = None, + and_qry_dict: dict|None = None, + and_like_dict: dict|None = None, + or_like_dict: dict|None = None, + and_in_dict_li: dict|None = None, + search_query: Any|None = None, + searchable_fields: List[str]|None = None, + order_by_li: dict|None = None, + limit: int = 9999999, + offset: int = 0, + sql: str|None = None, + data: dict|None = None, + as_dict: bool = True, + as_list: bool = False, + log_lvl: int = logging.WARNING, + ) -> Any: + """The main entry point for selecting records from the database.""" + from app.lib_sql_search import ( + sql_enable_part, sql_hidden_part, sql_search_qry_part, + sql_where_qry_part, sql_fulltext_qry_part, sql_and_qry_part, + sql_and_like_part, sql_or_like_part, sql_and_in_dict_li_part + ) + + log.setLevel(log_lvl) + sql_limit_offset = f'LIMIT {limit} OFFSET {offset}' if limit >= 0 and offset >= 0 else '' + + sql_order_by = '' + if order_by_li and isinstance(order_by_li, dict): + order_by_str_li = [f'`{table_name}`.`{k}` {v}' for k, v in order_by_li.items()] + sql_order_by = f"ORDER BY {', '.join(order_by_str_li)}" + + if table_name and not (record_id or record_id_random or field_name or field_value or sql or data): + data = {} + s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None) + s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None) + if d_en is not None: data['enabled'] = d_en + if d_hi is not None: data['hidden'] = d_hi + + s_search, d_search = ('', {}) + if search_query: + s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name) + data.update(d_search) + + query = text(f"SELECT * FROM `{table_name}` WHERE 1=1 {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};") + + elif table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data): + data = {'rid': record_id} if record_id else {'ridr': record_id_random} + where = f"`{table_name}`.id = :rid" if record_id else f"`{table_name}`.id_random = :ridr" + query = text(f"SELECT * FROM `{table_name}` WHERE {where} {sql_order_by} {sql_limit_offset};") + + elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data): + data = {field_name: field_value} + + s_where, d_where = sql_where_qry_part(qry_dict_li) if qry_dict_li else ('', {}) + s_ft, d_ft = sql_fulltext_qry_part(fulltext_qry_dict) if fulltext_qry_dict else ('', {}) + s_and, d_and = sql_and_qry_part(and_qry_dict) if and_qry_dict else ('', {}) + s_alike, d_alike = sql_and_like_part(and_like_dict) if and_like_dict else ('', {}) + s_olike, d_olike = sql_or_like_part(or_like_dict) if or_like_dict else ('', {}) + s_in, d_in = sql_and_in_dict_li_part(and_in_dict_li) if and_in_dict_li else ('', {}) + s_search, d_search = sql_search_qry_part(search_query, searchable_fields, table_name=table_name) if search_query else ('', {}) + s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None) + s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None) + + data.update(d_where); data.update(d_ft); data.update(d_and); data.update(d_alike) + data.update(d_olike); data.update(d_in); data.update(d_search) + if d_en is not None: data['enabled'] = d_en + if d_hi is not None: data['hidden'] = d_hi + + query = text(f"SELECT * FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name} " + f"{s_where} {s_ft} {s_and} {s_alike} {s_olike} {s_in} {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};") + else: + log.warning('No matched parameter combination for sql_select') + return False + + result = run_sql_select(sql=query, data=data) + if result is False: return False + + if result.rowcount == 1: + record = dict(result.first()) if as_dict else result.first() + return [record] if as_list else record + elif result.rowcount > 1: + records = [dict(r) for record in result.fetchall()] if as_dict else result.fetchall() + return records + else: + return [] if as_list else None + + +def sql_insert(table_name: str, data: dict, log_lvl: int = logging.WARNING) -> Any: + """Inserts a new record into the specified table.""" + from app.db_sql import db + from app.lib_redis_helpers import lookup_id_random_pop + log.setLevel(log_lvl) + + data = lookup_id_random_pop(data) + if not data.get('id_random'): + import secrets + data['id_random'] = secrets.token_urlsafe(8) + + fields = [f'`{k}`' for k in data.keys() if k != 'id'] + placeholders = [f':{k}' for k in data.keys() if k != 'id'] + + for k, v in data.items(): + if isinstance(v, (dict, list)): data[k] = json.dumps(v) + + query = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(placeholders)});") + + trans = db.begin() + try: + result = db.execute(query, data) + trans.commit() + return result.lastrowid if result.rowcount == 1 else False + except Exception: + trans.rollback() + log.exception('Insert failed.') + return False + + +def sql_update(table_name: str, data: dict, record_id: int|None = None, record_id_random: str|None = None, log_lvl: int = logging.WARNING) -> Any: + """Updates an existing record in the specified table.""" + from app.db_sql import db + from app.lib_redis_helpers import lookup_id_random_pop + log.setLevel(log_lvl) + + data = lookup_id_random_pop(data) + set_clauses = [f"`{k}` = :{k}" for k in data.keys() if k != 'id'] + + for k, v in data.items(): + if isinstance(v, (dict, list)): data[k] = json.dumps(v) + + if record_id: + where = "id = :rid"; data['rid'] = record_id + elif record_id_random: + where = "id_random = :ridr"; data['ridr'] = record_id_random + elif 'id' in data: + where = "id = :id" + else: + return False + + query = text(f"UPDATE `{table_name}` SET {', '.join(set_clauses)} WHERE {where};") + + trans = db.begin() + try: + result = db.execute(query, data) + trans.commit() + return True if result.rowcount >= 1 else None + except Exception: + trans.rollback() + log.exception('Update failed.') + return False + + +def sql_delete(table_name: str, record_id: int|None = None, record_id_random: str|None = None, log_lvl: int = logging.INFO) -> Any: + """Deletes a record from the specified table.""" + from app.db_sql import db + log.setLevel(log_lvl) + + if record_id: + where = "id = :rid"; data = {'rid': record_id} + elif record_id_random: + where = "id_random = :ridr"; data = {'ridr': record_id_random} + else: + return False + + query = text(f"DELETE FROM `{table_name}` WHERE {where};") + + try: + result = db.execute(query, data) + return True if result.rowcount >= 1 else None + except Exception: + log.exception('Delete failed.') + return False + + +def sql_insert_or_update( + table_name: str, + data: dict, + rm_id_random: bool = False, + log_lvl: int = logging.DEBUG, + ): + """Modularized wrapper for INSERT ... ON DUPLICATE KEY UPDATE.""" + from app.db_sql import db + from app.lib_redis_helpers import lookup_id_random_pop + if not table_name or not data: return False + + if rm_id_random: + data = lookup_id_random_pop(obj_data=data) + + fields = [f'`{k}`' for k in data.keys() if k != 'id'] + values = [f':{k}' for k in data.keys() if k != 'id'] + updates = [f'`{k}` = :{k}' for k in data.keys() if k != 'id'] + + query = text(f"INSERT INTO `{table_name}` ({', '.join(fields)}) VALUES ({', '.join(values)}) " + f"ON DUPLICATE KEY UPDATE {', '.join(updates)};") + + trans = db.begin() + try: + result = db.execute(query, data) + trans.commit() + return result.lastrowid if result.lastrowid > 0 else True + except Exception: + trans.rollback() + log.exception('sql_insert_or_update failed.') + return False + + +def get_account_id_w_for_type_id(for_type: str, for_id: int|str) -> int|bool|None: + """Modularized helper to find an account_id associated with an object.""" + from app.lib_redis_helpers import redis_lookup_id_random + from app.db_sql import sql_select + if fid := redis_lookup_id_random(record_id_random=for_id, table_name=for_type): + sql = f"SELECT account_id FROM `{for_type}` WHERE id = :fid LIMIT 1;" + if result := sql_select(sql=sql, data={'fid': fid}): + return result.get('account_id') + return False