""" Redis-based ID resolution and caching helpers for Aether. """ import datetime import random import redis import logging from app.config import settings log = logging.getLogger(__name__) def redis_lookup_id_random( record_id_random: int|str, table_name: str, check_int_id: bool = False, log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL minutes: int = 30, # Expire the Redis key after 8 minutes reset_rate: int = 10, # 1 in 10 chance of resetting the Redis key ) -> str|int|bool|None: """ Looks up a record ID in Redis, falling back to SQL if not found. Resolves 'id_random' (URL-safe string) to internal integer 'id'. """ from app.db_sql import sql_select, get_id_random log.setLevel(log_lvl) if isinstance(record_id_random, str) and len(record_id_random) >= 11 and len(record_id_random) <= 22: pass elif isinstance(record_id_random, int): record_id = record_id_random if check_int_id: log.info(f'Checking the int ID if exists. Table Name: {table_name} ID: {record_id}') if get_id_random_result := get_id_random( record_id = record_id, table_name = table_name, ): log.info(f'The int ID exists. Returning the int ID. ID Random: {get_id_random_result}') return record_id else: log.info(f'The int ID does not exists. Returning False. Table Name: {table_name} ID: {record_id}') return False else: log.debug(f'Not checking if the int ID exists. Returning the int ID. ID: {record_id}') return record_id elif record_id_random is None: log.info(f'No record ID was passed. Returning None') return None else: log.error(f'Unexpected data type or string format: {type(record_id_random)} Expected type is a string 11 or 22 characters long.') return False if record_id_random and table_name: if len(record_id_random) < 11: log.error(f'The length of id_random is too short: {record_id_random} ({len(record_id_random)} chars)') return False elif len(record_id_random) > 22: log.error(f'The length of id_random is too long: {record_id_random} ({len(record_id_random)} chars)') return False elif record_id_random: log.error(f'Missing table_name to select from for id_random "{record_id_random}"') return False elif table_name: log.error(f'Missing id_random to select from table "{table_name}"') return False else: log.error('Missing table_name and record_id_random') return False r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=7, password=None, decode_responses=True) key_name = f'{table_name}:{record_id_random}' record_id = r.get(key_name) if record_id and random.randint(1, reset_rate) == 1: log.warning(f'Redis: Randomly (1/{reset_rate}) setting record_id to None. Key="{key_name}" value="{record_id}" TTL={r.ttl(key_name)} seconds') record_id = None if record_id: r.setex(key_name, datetime.timedelta(minutes=minutes), value=record_id) log.info(f'Redis: Entry found for: Key="{key_name}" value="{record_id}" TTL={r.ttl(key_name)} seconds') return int(record_id) elif table_name: data = { 'id_random': record_id_random } sql = f"SELECT id FROM `{table_name}` AS `table` WHERE `table`.id_random = :id_random;" if select_results := sql_select(sql=sql, data=data): log.debug(f'SQL: SELECT result: {select_results}') if isinstance(select_results, dict): log.info(f"""SQL: Found ID Random for: {str(record_id_random)} = {str(select_results.get('id'))}""") if rid := select_results.get('id'): r.setex(key_name, datetime.timedelta(minutes=minutes), value=rid) return int(rid) else: log.error('The SQL result was not what was expected. The ID field was not found.') return False else: log.error(f'SQL: More than one record found in "{table_name}". Duplicate id_random!') return redis_lookup_id_random(record_id_random=record_id_random, table_name=table_name) else: log.warning(f'SQL: ID Random "{record_id_random}" not found in "{table_name}". Returning None.') return None log.error('Unexpected state in redis_lookup_id_random.') return False def get_id_random( record_id: int, table_name: str, log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL ) -> str|bool|None: """ Looks up the 'id_random' for a given internal integer ID. """ from app.db_sql import sql_select log.setLevel(log_lvl) data = { 'id': record_id } sql = f"SELECT id_random FROM `{table_name}` AS `table` WHERE `table`.id = :id;" if select_results := sql_select(sql=sql, data=data): if isinstance(select_results, dict): if record_id_random := select_results.get('id_random'): return str(record_id_random) else: log.error('The SQL result was not what was expected.') return False elif isinstance(select_results, list): log.exception('More than one record may have been found. Duplicate ID!') return False else: log.exception(f'Got an unexpected result while trying to look up the ID.') return False elif select_results is None: return None else: return False def reset_redis(): """Flushes the Redis database used for ID caching.""" r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=7, password=None, decode_responses=True) r.flushdb() return True