import datetime, json, pytz, random, redis, secrets from typing import Any, List, Optional from timeit import default_timer as timer from fastapi import HTTPException from app.config import settings from app.log import log, logging, logger_reset from sqlalchemy import create_engine, text, Time from sqlalchemy.exc import IntegrityError, OperationalError, ProgrammingError from sqlalchemy.pool import NullPool # from multiprocessing import Pool db_uri = settings.SQLALCHEMY_DB_URI # log.setLevel(logging.DEBUG) # log.debug(settings.DB) # log.setLevel(logging.INFO) connection_string = db_uri # engine = create_engine(url=connection_string, poolclass=NullPool, echo=False, isolation_level='READ COMMITTED', connect_args={'connect_timeout': settings.DB['connect_timeout']}) # 2024-10-08 engine = create_engine( url = connection_string, echo = False, pool_use_lifo = True, pool_pre_ping = True, pool_recycle = settings.DB['pool_recycle'], isolation_level = 'READ COMMITTED', connect_args = {'connect_timeout': settings.DB['connect_timeout']} ) # engine = create_engine(url=connection_string, pool_size=5, max_overflow=15, timeout=settings.DB['connect_timeout'], pool_recycle=settings.DB['pool_recycle'], pool_pre_ping=True, echo=False, echo_pool=True, isolation_level='READ UNCOMMITTED', connect_args={'connect_timeout': settings.DB['connect_timeout']}) # NOTE: The default isolation_level is 'REPEATABLE READ'. This can sometimes not show updated data. # NOTE: The "echo" set to True option shows the SQL queries. # NOTE: Switching from READ COMMITTED to READ UNCOMMITTED (or REPEATABLE READ). Testing started 2024-04-23 # levels: "REPEATABLE READ" "READ COMMITTED" "READ UNCOMMITTED" "SERIALIZABLE" # def run_in_process(some_data_record): # with engine.connect() as conn: # conn.execute(text("...")) # def initializer(): # """ensure the parent proc's database connections are not touched # in the new connection pool""" # engine.dispose(close=False) # with Pool(10, initializer=initializer) as p: # p.map(run_in_process, data) log.info('DB SQL trying to connect...') db = None try: db = engine.connect() log.info(f'Connected to database: {db_uri}') except: log.exception('Could not connect to database.') # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL # log.debug(db) # log.debug(vars(db)) # log.debug(dir(db)) # ### BEGIN ### API DB SQL ### sql_connect() ### @logger_reset def sql_connect( current_db, log_lvl: int = logging.INFO, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL ) -> None|bool|int: log.setLevel(log_lvl) log.debug(locals()) # log.info('Trying to recreate the pool...') log.info('Trying to create a new engine (connection pool)...') log.debug('############## ############') log.debug(dir(current_db)) log.debug(vars(current_db)) log.debug('############## ############') log.debug(dir(current_db.engine)) log.debug(vars(current_db.engine)) log.debug('############## ############') log.debug(dir(current_db.engine.pool)) log.debug(vars(current_db.engine.pool)) log.debug('############## ############') if current_db: current_db.engine.dispose() log.info('Disposed of the current engine (connection pool).') # DB = settings.DB # settings.SQLALCHEMY_DB_URI = 'mysql://'+DB['username']+':'+DB['password']+'@'+DB['server']+'/'+DB['name'] # log.debug(settings.DB) # # new_engine = create_engine(url=settings.SQLALCHEMY_DB_URI, poolclass=NullPool, echo=False, isolation_level='READ UNCOMMITTED', connect_args={'connect_timeout': settings.DB['connect_timeout']}) # new_engine = create_engine( # url = settings.SQLALCHEMY_DB_URI, # echo = False, # pool_use_lifo = True, # pool_pre_ping = True, # isolation_level = 'READ COMMITTED', # connect_args = {'connect_timeout': settings.DB['connect_timeout']} # ) # # new_engine = create_engine(url=settings.SQLALCHEMY_DB_URI, pool_size=5, max_overflow=15, timeout=settings.DB['connect_timeout'], pool_recycle=settings.DB['pool_recycle'], pool_pre_ping=True, echo=False, echo_pool=True, isolation_level='READ UNCOMMITTED', connect_args={'connect_timeout': settings.DB['connect_timeout']}) # # new_engine = create_engine(url=settings.SQLALCHEMY_DB_URI, pool_size=5, max_overflow=15, pool_recycle=settings.DB['pool_recycle'], pool_pre_ping=True, echo=False, echo_pool=True, isolation_level='READ UNCOMMITTED', connect_args={'connect_timeout': settings.DB['connect_timeout']}) # current_db.engine = new_engine # log.info(f'Created and connected to database: {settings.SQLALCHEMY_DB_URI}') return True else: log.warning(f'Could not created and or connect to database') return False # current_db.engine.dispose() # db.engine.dispose() # log.debug(db_uri) # DB = settings.DB # settings.SQLALCHEMY_DB_URI = 'mysql://'+DB['username']+':'+DB['password']+'@'+DB['server']+'/'+DB['name'] # # log.debug(settings.SQLALCHEMY_DB_URI) # new_engine = create_engine(url=settings.SQLALCHEMY_DB_URI, pool_size=25, pool_recycle=60, pool_pre_ping=True, echo=False, echo_pool=True, isolation_level='READ COMMITTED') # current_db.engine = new_engine.connect() # current_db.engine.update_url(settings.SQLALCHEMY_DB_URI) # return current_db # engine.connect() # db.engine = engine # db = engine.connect() # return db # ### END ### API DB SQL ### sql_connect() ### # #### ### ## # BEGIN SQL # ## ### #### # Create, Read/Get, Update, Delete # CRUD or CGUD # ### BEGIN ### API DB SQL ### sql_insert() ### # NOTE: Possible future change... Try to look up a duplicate entry if there is one on INSERT. # Returns the auto number ID of the record inserted, or returns None if there was likely a duplicate record, or False if there was a problem of some kind. # Updated 2023-02-08 @logger_reset def sql_insert( sql: str|None = None, data: dict|None = None, table_name: str|None = None, rm_id_random: bool = False, id_random_length: int = 8, log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL ) -> None|bool|int: log.setLevel(log_lvl) if sql: log.info(f'SQL INSERT using sql string.') log.debug(sql) sql_insert = text(sql) elif table_name and data: log.info(f'SQL INSERT using table_name and data. Table Name: {table_name}') if rm_id_random: data = lookup_id_random_pop(obj_data=data) if not data.get('id_random', None) and id_random_length: data['id_random'] = secrets.token_urlsafe(id_random_length) log.debug(data) fields = [] values = [] for key, value in data.items(): if key != 'id': # A special exception for the id auto increment field. fields.append('`'+str(key)+'`') values.append(':'+str(key)) fields_string = ', '.join(fields) values_string = ', '.join(values) log.debug(fields_string) log.debug(values_string) field_list = [] for key, value in data.items(): if key != 'id': # Creating a special exception for the id field. field_list.append('`'+str(key) + '` = :' + str(key)) if isinstance(value, dict) or isinstance(value, list): data[key] = json.dumps(value) set_values_string = ', '.join(field_list) sql_insert = text(f""" INSERT INTO `{table_name}` ({fields_string}) VALUES ({values_string}); """ ) log.debug(f""" INSERT INTO `{table_name}` ({fields_string}) VALUES ({values_string}); """ ) else: log.error('The SQL INSERT statement could not be created. Something is missing from the sql_insert call?') return False log.debug(sql_insert) log.debug(data) trans = db.begin() try: result_insert = db.execute(sql_insert, data) trans.commit() except IntegrityError as e: # Specifically want to capture duplicate entry attempts # http://sqlalche.me/e/14/gkpj # Need a check for this: sqlalchemy.exc.IntegrityError: (MySQLdb._exceptions.IntegrityError) (1062, "Duplicate entry 'z-yyyy-xxxx-wwww for key 'PRIMARY'" trans.rollback() log.error('An integrity error exception happened. This is likely because there was an attempt to create a duplicate entry. Returning None') log.exception('**** *** ** * ### BEGIN ### Integrity Error Exception Happened: Returning None * ** *** ****') log.error('^^^^ ^^^ ^^ ^ ### END ### Integrity Error Exception ^ ^^ ^^^ ^^^^') log.error('**** *** ** * ### BEGIN ### Integrity Error Exception Details: * ** *** ****') log.error('**** *** ** * SQL Statement: * ** *** ****') log.error(e.statement) log.error('**** *** ** * SQL Parameters: * ** *** ****') log.error(e.params) log.error('**** *** ** * SQL Origin Message: * ** *** ****') log.error(e.orig) log.error('**** *** ** * ### END ### Integrity Error Exception Details: * ** *** ****') return None except OperationalError as e: # Likely an unknown field or related trans.rollback() log.error('An operational error exception happened. This is likely because there was an unknown field or similar included. Returning False') log.exception('**** *** ** * ### BEGIN ### Operational Error Exception Happened: Returning False * ** *** ****') log.error('^^^^ ^^^ ^^ ^ ### END ### Operational Error Exception ^ ^^ ^^^ ^^^^') log.error('**** *** ** * ### BEGIN ### Operational Error Exception Details: * ** *** ****') log.error('**** *** ** * SQL Statement: * ** *** ****') log.error(e.statement) log.error('**** *** ** * SQL Parameters: * ** *** ****') log.error(e.params) log.error('**** *** ** * SQL Origin Message: * ** *** ****') log.error(e.orig) log.error('**** *** ** * ### END ### Operational Error Exception Details: * ** *** ****') return False except Exception as e: trans.rollback() log.error('An unknown exception happened. Returning False') log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') log.error('^^^^ ^^^ ^^ ^ ### END ### Exception ^ ^^ ^^^ ^^^^') log.error('**** *** ** * ### BEGIN ### Exception Details: * ** *** ****') log.error('**** *** ** * SQL Statement: * ** *** ****') log.error(e.statement) log.error('**** *** ** * SQL Parameters: * ** *** ****') log.error(e.params) log.error('**** *** ** * SQL Origin Message: * ** *** ****') log.error(e.orig) log.error('**** *** ** * ### END ### Exception Details: * ** *** ****') return False else: log.debug(result_insert) log.debug(f'rowcount = {result_insert.rowcount}; lastrowid = {result_insert.lastrowid}') if result_insert.rowcount == 1 and result_insert.lastrowid > 0: # insert record_id = result_insert.lastrowid log.info(f'Insert record: {record_id}') return record_id #elif result_insert.rowcount == 1 and result_insert.lastrowid == 0: # update with no change #log.info('Update record with no change') #return True #elif result_insert.rowcount == 2 and result_insert.lastrowid > 0: # update with change #log.info('Update record with changes') #record_id = result_insert.lastrowid #return record_id else: log.debug(vars(result_insert)) log.debug(dir(result_insert)) log.debug(result_insert.rowcount) # returns 1 on insert and 2 on update with change log.debug(result_insert.lastrowid) # returns last row ID on insert and update with a change and returns 0 if nothing changed return False return False # ### END ### API DB SQL ### sql_insert() ### # ### BEGIN ### API DB SQL ### sql_update() ### # Updated 2023-02-08 @logger_reset def sql_update( sql: str|None = None, data: dict|None = None, table_name: str|None = None, record_id: int|None = None, record_id_random: str|None = None, rm_id_random: bool = False, id_random_length: None|int = None, log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL ): log.setLevel(log_lvl) if sql: sql_update = text(sql) elif table_name and data: if rm_id_random: data = lookup_id_random_pop(obj_data=data) if not data.get('id_random', None) and id_random_length: data['id_random'] = secrets.token_urlsafe(id_random_length) log.debug(data) fields_string = [] for key, value in data.items(): if key != 'id': # Creating a special exception for the id field. fields_string.append('`'+str(key) + '` = :' + str(key)) if isinstance(value, dict) or isinstance(value, list): data[key] = json.dumps(value) sql_set = ', '.join(fields_string) log.debug(sql_set) if len(sql_set) < 4: # NOTE: Returning None instead of False since technically the SQL query did not fail. Just that nothing was updated related to that record. I have been returning False if there is a SQL query problem. STI 2022-01-19 # NOTE: A better check might be specifically for 'id' and no other dict values. log.warning('The SQL SET is unexpectedly short and may not have data. Returning None') return None if record_id: log.info(f'Update record with ID: {record_id}') data['id'] = record_id sql = 'UPDATE `'+table_name+'` SET '+ sql_set + ' WHERE id = :id' elif record_id_random: log.info(f'Update record with ID random: {record_id_random}') data['id_random'] = record_id_random sql = 'UPDATE `'+table_name+'` SET '+ sql_set + ' WHERE id_random = :id_random' elif 'id' in data: log.info(f"Update record with ID in data dict: {data['id']}") sql = 'UPDATE `'+table_name+'` SET '+ sql_set + ' WHERE id = :id' elif 'id_random' in data: # NOTE: For now it is not possible to update the id_random when supplying the id_random as the primary key for a record. # NOTE: In the future I can use record_id_random=True as a special case SQL UPDATE. log.info(f"Update record with ID in data dict: {data['id_random']}") sql = 'UPDATE `'+table_name+'` SET '+ sql_set + ' WHERE id_random = :id_random' else: log.warning('Something was missing from the sql_update function call.') return False sql_update = text(sql) else: log.error('The SQL UPDATE statement could not be created. Something is missing from the sql_update call?') return False log.debug(sql_update) trans = db.begin() try: log.info('Trying to execute the SQL UPDATE query...') result_update = db.execute(sql_update, data) trans.commit() except IntegrityError as e: # Specifically want to capture duplicate entry attempts # http://sqlalche.me/e/14/gkpj # Need a check for this: sqlalchemy.exc.IntegrityError: (MySQLdb._exceptions.IntegrityError) (1062, "Duplicate entry 'z-yyyy-xxxx-wwww for key 'PRIMARY'" trans.rollback() log.error('An integrity error exception happened. This is likely because there was an attempt to create a duplicate entry. Returning None') log.exception('**** *** ** * ### BEGIN ### Integrity Error Exception Happened: Returning None * ** *** ****') log.error('^^^^ ^^^ ^^ ^ ### END ### Integrity Error Exception ^ ^^ ^^^ ^^^^') log.error('**** *** ** * ### BEGIN ### Integrity Error Exception Details: * ** *** ****') log.error('**** *** ** * SQL Statement: * ** *** ****') log.error(e.statement) log.error('**** *** ** * SQL Parameters: * ** *** ****') log.error(e.params) log.error('**** *** ** * SQL Origin Message: * ** *** ****') log.error(e.orig) log.error('**** *** ** * ### END ### Integrity Error Exception Details: * ** *** ****') return None except OperationalError as e: # Likely an unknown field or related trans.rollback() # log.error('An operational error exception happened. This is likely because there was an unknown field or similar included. Returning False') # log.exception('**** *** ** * ### BEGIN ### Operational Error Exception Happened: Returning False * ** *** ****') # log.error('^^^^ ^^^ ^^ ^ ### END ### Operational Error Exception ^ ^^ ^^^ ^^^^') # log.error('**** *** ** * ### BEGIN ### Operational Error Exception Details: * ** *** ****') # log.error('**** *** ** * SQL Statement: * ** *** ****') # log.error(e.statement) # log.error('**** *** ** * SQL Parameters: * ** *** ****') # log.error(e.params) # log.error('**** *** ** * SQL Origin Message: * ** *** ****') # log.error(e.orig) # log.error('**** *** ** * ### END ### Operational Error Exception Details: * ** *** ****') # return False log.error('An operational error exception happened. This is likely a "MySQL server has gone away" error. Going to try again...') log.exception('**** *** ** * ### BEGIN ### Operational Exception Happened: Trying again... * ** *** ****') log.error('^^^^ ^^^ ^^ ^ ### END ### Operational Exception ^ ^^ ^^^ ^^^^') sql_connect(current_db=db) log.info('Now trying the query again...') try: # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.info('2x Executing with SQL statement and data...') result_update = db.execute(sql_update, data) trans.commit() except Exception as e: log.error('Tried again an exception was raised again. Not going to try again.') log.exception('**** *** ** * ### BEGIN ### (2x) Second Exception Happened: Returning False * ** *** ****') log.error('^^^^ ^^^ ^^ ^ ### END ### Exception ^ ^^ ^^^ ^^^^') return False # Not successful else: log.info('Successfully executed the SQL on the second try.') pass except Exception as e: trans.rollback() log.error('An unknown exception happened. Returning False') log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') log.error('^^^^ ^^^ ^^ ^ ### END ### Exception ^ ^^ ^^^ ^^^^') log.error('**** *** ** * ### BEGIN ### Exception Details: * ** *** ****') log.error('**** *** ** * SQL Statement: * ** *** ****') log.error(e.statement) log.error('**** *** ** * SQL Parameters: * ** *** ****') log.error(e.params) log.error('**** *** ** * SQL Origin Message: * ** *** ****') log.error(e.orig) log.error('**** *** ** * ### END ### Exception Details: * ** *** ****') return False else: log.debug(result_update) log.debug(f'rowcount = {result_update.rowcount}; lastrowid = {result_update.lastrowid}') if result_update.rowcount >= 1 and result_update.lastrowid == 0: # one record updated log.info(f'One record was found and updated (changes unknown). Returning True') # With SQL UPDATE this record may have actually changed return True elif result_update.rowcount > 1 and result_update.lastrowid == 0: # multiple records updated log.info(f'Multiple records ({result_update.rowcount}) were found and updated. Returning True') return True elif result_update.rowcount == 0 and result_update.lastrowid == 0: # no records found to update (ID probably not found) log.info('No record(s) found to update. The ID was probably not found. Returning None') return None elif result_update.rowcount == 2 and result_update.lastrowid > 0: # update with change log.warning('Should we be here???') log.info('Update record with changes') record_id = result_update.lastrowid return record_id else: log.info('Unknown or unexpected SQL UPDATE response? Returning None') log.debug(result_update) log.debug(vars(result_update)) log.debug(dir(result_update)) log.debug(result_update.rowcount) # returns 1 on insert and 2 on update with change log.debug(result_update.lastrowid) # returns last row ID on insert and update with a change and returns 0 if nothing changed # NOTE: Returning None seems to make more sense. There were no errors. Just nothing needed to be updated. return None # False was False until 2022-03-16 return False # ### END ### API DB SQL ### sql_update() ### # ### BEGIN ### Core Help CRUD ### sql_insert_or_update() ### # The catch all SQL INSERT or UPDATE function - STI 2021-02-17 # This one does it all for SQL INSERT and UPDATE queries # Updated 2021-09-07 @logger_reset def sql_insert_or_update( sql: str|None = None, data: dict|None = None, table_name: str|None = None, rm_id_random: bool = False, id_random_length: int|None = None, log_lvl: int = logging.DEBUG, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL ): log.setLevel(log_lvl) if sql: sql_insert_or_update = text(sql) elif table_name and data: if rm_id_random: data = lookup_id_random_pop(obj_data=data) pass if not data.get('id_random', None) and id_random_length: data['id_random'] = secrets.token_urlsafe(id_random_length) fields = [] values = [] for key, value in data.items(): if key != 'id': # A special exception for the id auto increment field. fields.append('`'+str(key)+'`') values.append(':'+str(key)) fields_string = ', '.join(fields) values_string = ', '.join(values) field_list = [] for key, value in data.items(): if key != 'id': # Creating a special exception for the id field. field_list.append('`'+str(key) + '` = :' + str(key)) set_values_string = ', '.join(field_list) sql_insert_or_update = text(f""" INSERT INTO `{table_name}` ({fields_string}) VALUES ({values_string}) ON DUPLICATE KEY UPDATE {set_values_string} ; """) log.setLevel(logging.DEBUG) log.debug(f""" INSERT INTO `{table_name}` ({fields_string}) VALUES ({values_string}) ON DUPLICATE KEY UPDATE {set_values_string} ; """) trans = db.begin() try: log.debug(data) result_insert_or_update = db.execute(sql_insert_or_update, data) trans.commit() except IntegrityError as e: # Specifically want to capture duplicate entry attempts # http://sqlalche.me/e/14/gkpj # Need a check for this: sqlalchemy.exc.IntegrityError: (MySQLdb._exceptions.IntegrityError) (1062, "Duplicate entry 'z-yyyy-xxxx-wwww for key 'PRIMARY'" trans.rollback() log.error('An integrity error exception happened. This is likely because there was an attempt to create a duplicate entry. Returning None') log.exception('**** *** ** * ### BEGIN ### Integrity Error Exception Happened: Returning None * ** *** ****') log.error('^^^^ ^^^ ^^ ^ ### END ### Integrity Error Exception ^ ^^ ^^^ ^^^^') log.error('**** *** ** * ### BEGIN ### Integrity Error Exception Details: * ** *** ****') log.error('**** *** ** * SQL Statement: * ** *** ****') log.error(e.statement) log.error('**** *** ** * SQL Parameters: * ** *** ****') log.error(e.params) log.error('**** *** ** * SQL Origin Message: * ** *** ****') log.error(e.orig) log.error('**** *** ** * ### END ### Integrity Error Exception Details: * ** *** ****') return None except OperationalError as e: # Likely an unknown field or related trans.rollback() log.error('An operational error exception happened. This is likely because there was an unknown field or similar included. Returning False') log.exception('**** *** ** * ### BEGIN ### Operational Error Exception Happened: Returning False * ** *** ****') log.error('^^^^ ^^^ ^^ ^ ### END ### Operational Error Exception ^ ^^ ^^^ ^^^^') log.error('**** *** ** * ### BEGIN ### Operational Error Exception Details: * ** *** ****') log.error('**** *** ** * SQL Statement: * ** *** ****') log.error(e.statement) log.error('**** *** ** * SQL Parameters: * ** *** ****') log.error(e.params) log.error('**** *** ** * SQL Origin Message: * ** *** ****') log.error(e.orig) log.error('**** *** ** * ### END ### Operational Error Exception Details: * ** *** ****') return False except Exception as e: trans.rollback() log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') log.error('^^^^ ^^^ ^^ ^ ### END ### Exception ^ ^^ ^^^ ^^^^') log.error('**** *** ** * ### BEGIN ### Exception Details: * ** *** ****') log.error('**** *** ** * SQL Statement: * ** *** ****') log.error(e.statement) log.error('**** *** ** * SQL Parameters: * ** *** ****') log.error(e.params) log.error('**** *** ** * SQL Origin Message: * ** *** ****') log.error(e.orig) log.error('**** *** ** * ### END ### Exception Details: * ** *** ****') return False else: log.debug(result_insert_or_update) log.debug(f'rowcount = {result_insert_or_update.rowcount}; lastrowid = {result_insert_or_update.lastrowid}') if result_insert_or_update.rowcount == 1 and result_insert_or_update.lastrowid > 0: # insert record_id = result_insert_or_update.lastrowid log.info(f'Insert record: {record_id}') return record_id elif result_insert_or_update.rowcount == 1 and result_insert_or_update.lastrowid == 0: # update with no change log.info('Update record with no change') return True elif result_insert_or_update.rowcount == 2 and result_insert_or_update.lastrowid > 0: # update with change record_id = result_insert_or_update.lastrowid log.info(f'Update record with changes: {record_id}') return record_id else: log.debug(result_insert_or_update) log.debug(vars(result_insert_or_update)) log.debug(dir(result_insert_or_update)) log.debug(result_insert_or_update.rowcount) # returns 1 on insert and 2 on update with change log.debug(result_insert_or_update.lastrowid) # returns last row ID on insert and update with a change and returns 0 if nothing changed return False return False # ### END ### Core Help CRUD ### sql_insert_or_update() ### # ### BEGIN ### Core Help CRUD ### sql_select() ### # The catch all SQL SELECT function - STI 2021-02-17 # This one does it all for SQL SELECT queries. It now works with limit and offset! - STI 2023-06-29 # Updated 2023-06-29 @logger_reset def sql_select( table_name: str|None = None, record_id: int|None = None, record_id_random: str|None = None, field_name: str|None = None, field_value = None, enabled: str|None = None, # enabled, disabled, all hidden: str|None = None, # hidden, not_hidden, all qry_dict_li: dict|None = None, # NEW 2024-08-14 fulltext_qry_dict: dict|None = None, and_qry_dict: dict|None = None, and_like_dict: dict|None = None, or_like_dict: dict|None = None, and_in_dict_li: dict|None = None, search_query: Any|None = None, # NEW 2026-01-02 (SearchQuery model) searchable_fields: List[str]|None = None, # NEW 2026-01-03 fulltext_qry_field_li: list|None = None, # ['field_name_1', 'field_name_2'] fulltext_qry_str: str|None = None, # 'search string' order_by_li: dict|None = None, # {"the_field_name": "DESC"} limit: int = 9999999, offset: int = 0, sql: str|None = None, data: dict|None = None, rm_id_random: bool = False, as_dict: bool|None = True, as_list: bool|None = False, max_count: int = 100000, log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL ) -> None|bool|dict|list: log.setLevel(log_lvl) log.debug(locals()) if limit >= 0 and offset >= 0: log.info(f'Creating partial SQL string for LIMIT and OFFSET. Limit: {limit}; Offset: {offset}') sql_limit_offset = f'LIMIT {limit} OFFSET {offset}' else: sql_limit_offset = '' sql_order_by = '' # order_by_li = {'created_on': 'DESC', 'updated_on': 'DESC'} log.debug(order_by_li) if order_by_li and isinstance(order_by_li, dict): # This should be a list order_by_str_li = [] for key, value in order_by_li.items(): order_by_str_li.append(f'`{table_name}`.`{key}` {value}') # log.debug(order_by_str_li) # if isinstance(value, dict) or isinstance(value, list): # data[key] = json.dumps(value) log.debug(order_by_str_li) order_by_string = ', '.join(order_by_str_li) sql_order_by = f'ORDER BY {order_by_string}' else: sql_order_by = '' log.debug(sql_order_by) # NOTE: Version 1 of the fulltext search # NOTE: This version works fine, but can only do one MATCH AGAINST at a time. - STI 2023-11-29 # sql_fulltext_match_against = '' # log.debug(fulltext_qry_field_li) # if fulltext_qry_field_li and isinstance(fulltext_qry_field_li, list) and fulltext_qry_str: # fulltext_qry_field_li should be a list # fulltext_qry_field_string = ', '.join(fulltext_qry_field_li) # sql_fulltext_match_against = f'AND MATCH( {fulltext_qry_field_string} ) AGAINST( :fulltext_qry_str IN BOOLEAN MODE )' # else: # sql_fulltext_match_against = '' # log.debug(sql_fulltext_match_against) # NOTE: Version 2 of the fulltext search # NOTE: This version works well and can do multiple MATCH AGAINST at a time. - STI 2023-11-29 # sql_fulltext_match_against = '' # log.debug(fulltext_qry_field_li) # if fulltext_qry_field_li and isinstance(fulltext_qry_field_li, list) and fulltext_qry_str: # fulltext_qry_field_li should be a list # log.info('Creating partial SQL string for fulltext search.') # fulltext_qry_field_li_str = [] # for value in fulltext_qry_field_li: # log.debug(value) # fulltext_qry_field_li_str.append(f'MATCH( {value} ) AGAINST( :fulltext_qry_str IN BOOLEAN MODE )') # fulltext_qry_field_string = ' OR '.join(fulltext_qry_field_li_str) # sql_fulltext_match_against = f'AND ({fulltext_qry_field_string})' # log.debug(sql_fulltext_match_against) # log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL # if enabled: # sql_enabled = sql_enable_part(table_name=table_name, enabled=enabled) # Reasonably safe return str # else: # sql_enabled = '' # if hidden: # sql_hidden = sql_hidden_part(table_name=table_name, hidden=hidden) # Reasonably safe return str # else: # sql_hidden = '' if table_name and not (record_id or record_id_random or field_name or field_value or sql or data): # Select all records from a table # Updated 2023-07-06 log.info('Select all records from a table') # NOTE: This is new and currently only working with the API CRUD list endpoint and the sql_select function calls. -2023-07-06 # NOTE: This call (without field_name, field_value) may need more testing. data = {} if enabled: sql_enabled, data['enabled'] = sql_enable_part(table_name=table_name, enabled=enabled) # Reasonably safe return str else: sql_enabled = '' # data['enabled'] = '' if hidden: sql_hidden, data['hidden'] = sql_hidden_part(table_name=table_name, hidden=hidden) # Reasonably safe return str else: sql_hidden = '' # data['hidden'] = '' sql_search_qry = '' if search_query: log.info('Creating partial SQL string for complex SearchQuery.') sql_search_qry, data_search = sql_search_qry_part(search_query, searchable_fields=searchable_fields) data = {**data, **data_search} sql = text( f""" SELECT * FROM `{table_name}` WHERE 1=1 {sql_search_qry} {sql_enabled} {sql_hidden} {sql_order_by} {sql_limit_offset} ; """ ) elif table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data): # Select all records from a table with an ID (auto or random) log.info('Select all records from a table with an ID (auto or random)') data = {} if record_id: data['record_id'] = record_id sql = text( f""" SELECT * FROM `{table_name}` WHERE `{table_name}`.id = :record_id {sql_order_by} {sql_limit_offset} ; """ ) elif record_id_random: data['record_id_random'] = record_id_random sql = text( f""" SELECT * FROM `{table_name}` WHERE `{table_name}`.id_random = :record_id_random {sql_order_by} {sql_limit_offset} ; """ ) elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data): # Select all records from a table with a specific field and field value # Updated 2024-10-08 # log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.info('Select all records from a table with a specific field and field value') if not data: data = {} # This is the new catch all version for building the WHERE clause for the SQL SELECT statement. -2024-08-14 sql_where_qry = '' if qry_dict_li: log.info('Creating partial SQL string for search.') sql_where_qry, data_qry = sql_where_qry_part(qry_dict_li) # NOTE: Merge the data_qry result with the data dict data = {**data, **data_qry} sql_fulltext_match_against = '' if fulltext_qry_dict: log.info('Creating partial SQL string for fulltext search.') sql_fulltext_match_against, data_qry = sql_fulltext_qry_part(fulltext_qry_dict) # NOTE: Merge the data_qry result with the data dict data = {**data, **data_qry} sql_and_qry = '' if and_qry_dict: log.info('Creating partial SQL string for AND search (equals).') sql_and_qry, data_qry = sql_and_qry_part(and_qry_dict) # NOTE: Merge the data_qry result with the data dict data = {**data, **data_qry} sql_and_like = '' if and_like_dict: log.info('Creating partial SQL string for AND search (LIKE).') sql_and_like, data_qry = sql_and_like_part(and_like_dict) # NOTE: Merge the data_qry result with the data dict data = {**data, **data_qry} sql_or_like = '' if or_like_dict: log.info('Creating partial SQL string for OR search (LIKE).') sql_or_like, data_qry = sql_or_like_part(or_like_dict) # NOTE: Merge the data_qry result with the data dict data = {**data, **data_qry} sql_and_in_dict_li = '' if and_in_dict_li: log.info('Creating partial SQL string for AND search (IN).') sql_and_in_dict_li, data_qry = sql_and_in_dict_li_part(and_in_dict_li) # NOTE: Merge the data_qry result with the data dict data = {**data, **data_qry} sql_search_qry = '' if search_query: log.info('Creating partial SQL string for complex SearchQuery.') sql_search_qry, data_search = sql_search_qry_part(search_query, searchable_fields=searchable_fields) data = {**data, **data_search} # # NOTE: Version 3 of the fulltext search # sql_fulltext_match_against = '' # log.debug(fulltext_qry_dict) # if fulltext_qry_dict and isinstance(fulltext_qry_dict, dict): # fulltext_qry_dict should be a dict # log.info('Creating partial SQL string for fulltext search.') # fulltext_qry_dict_str = [] # # if not data: # # data = {} # for key, value in fulltext_qry_dict.items(): # log.debug(f'Key = {key}; Value = {value}') # fulltext_qry_dict_str.append(f'MATCH( {key} ) AGAINST( :ft_{key} IN BOOLEAN MODE )') # # fulltext_qry_dict_str.append(f'MATCH( {key} ) AGAINST( :{key} IN BOOLEAN MODE )') # data[f'ft_{key}'] = value # # data[key] = 'temp value' # # log.debug(data) # # data[key] = value # log.debug(data) # fulltext_qry_field_string = ' OR '.join(fulltext_qry_dict_str) # sql_fulltext_match_against = f'AND ({fulltext_qry_field_string})' # log.debug(sql_fulltext_match_against) # NOTE: This is new and currently only working with the API CRUD list endpoint and the sql_select function calls. -2023-07-06 # NOTE: This may need more testing. # if not data: # data = {} data[field_name] = field_value # if sql_fulltext_match_against: # data['fulltext_qry_str'] = fulltext_qry_str if enabled: sql_enabled, data['enabled'] = sql_enable_part(table_name=table_name, enabled=enabled) # Reasonably safe return str else: sql_enabled = '' # data['enabled'] = '' if hidden: sql_hidden, data['hidden'] = sql_hidden_part(table_name=table_name, hidden=hidden) # Reasonably safe return str else: sql_hidden = '' # data['hidden'] = '' # if sql_enabled: # data['enable'] = sql_enable_part(table_name=table_name, enabled=enabled) # Reasonably safe return str and bool # if sql_hidden: # data['hidden'] = sql_hidden_part(table_name=table_name, hidden=hidden) # Reasonably safe return str and bool # sql_enabled, data['enable'] = sql_enable_part(table_name=table_name, enabled=enabled) # Reasonably safe return str and bool # # sql_hidden, data['hidden'] = sql_hidden_part(table_name=table_name, hidden=hidden) # Reasonably safe return str and bool sql = text( f""" SELECT * FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name} {sql_where_qry} {sql_fulltext_match_against} {sql_and_qry} {sql_and_like} {sql_or_like} {sql_and_in_dict_li} {sql_search_qry} {sql_enabled} {sql_hidden} {sql_order_by} {sql_limit_offset} ; """ ) elif table_name and data and not (record_id or record_id_random or field_name or field_value or sql): # Select all records from a table with a specific list of fields and field values (list of dicts) log.info('Select all records from a table with a specific list of fields and field values (list of dicts)') if rm_id_random: data = lookup_id_random_pop(obj_data=data) log.debug(data) sql_where = [] for field_name in data: log.debug(field_name) sql_where_line = f"""`{table_name}`.{field_name} = :{field_name}""" sql_where.append(sql_where_line) sql_where_string = ' AND '.join(sql_where) log.debug(sql_where_string) sql = text( f""" SELECT * FROM `{table_name}` WHERE {sql_where_string} {sql_order_by} {sql_limit_offset} ; """ ) elif sql and not (table_name or record_id or record_id_random or field_name or field_value or data): # Select records based on the SQL statement given log.info('Select records based on the SQL statement given') sql = text(sql) elif sql and data and not (table_name or record_id or record_id_random or field_name or field_value): # Select records based on the SQL statement given and with the matching data dict fields and values log.info('Select records based on the SQL statement given and with the matching data dict fields and values') if rm_id_random: data = lookup_id_random_pop(obj_data=data) sql = text(sql) else: # Nothing matched the expected combination of parameters passed to this function log.warning('Nothing matched the expected combination of parameters passed to this function') return False # Not successful # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(sql) log.debug(data) # log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL result = run_sql_select(sql=sql, data=data) # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL if result: log.debug(f'Row count: {result.rowcount}') # log.debug(vars(result)) # log.debug(dir(result)) # NOTE: as_dict defaults to True for this function # NOTE: as_list defaults to False for this function # NOTE: After testing, this method is the fastest way to convert to a dict - STI 2021-03-09 # NOTE: My custom sql_result_proxy_to_dict_simple(result_proxy=result.first()) is slower than using dict(). # NOTE: list(result) was tested seems to be the slowest. Slower than my custom function. if result and result.rowcount == 1: log.info(f'Found one record. as_dict={as_dict}, as_list={as_list}') if as_dict: record = dict(result.first()) else: record = result.first() if as_list: record_li = [] record_li.append(record) log.debug(record_li) return record_li # Successful else: log.debug(record) return record # Successful elif result and result.rowcount > 1: log.info(f'Found {result.rowcount} records. as_dict={as_dict}, as_list={as_list}') if as_dict: #timer_1_start = timer() record_li = [dict(record) for record in result.fetchall()] #log.debug(record_li) #log.debug(type(record_li)) #log.debug(type(record_li[0])) #timer_1_end = timer() #log.debug( round((timer_1_end - timer_1_start), 8) ) else: record_li = result.fetchall() log.debug(record_li) return record_li # Successful else: if as_list: log.info('No records found. Returning an empty list.') log.debug(result) return [] # Successful even though no results else: log.info('No records found. Returning None.') log.debug(result) return None # Successful even though no results # ### END ### Core Help CRUD ### sql_select() ### # ### BEGIN ### Core Help CRUD ### run_sql_select() ### # Updated 2023-11-29 @logger_reset def run_sql_select( sql: str|None = None, data: dict|None = None, commit: bool = False, log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL ) -> None|bool|dict|list: log.setLevel(log_lvl) #log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug('*** ** * ** ***') log.debug(sql) log.debug('*** ** * ** ***') log.debug(data) log.debug('*** ** * ** ***') log.debug(vars(sql)) log.debug('*** ** * ** ***') log.debug(dir(sql)) log.debug('*** ** * ** ***') if not db: log.exception('The database connection is not available!!! Returning False.') return False try: # https://docs.sqlalchemy.org/en/13/core/tutorial.html#using-textual-sql # https://docs.sqlalchemy.org/en/13/core/sqlelement.html#sqlalchemy.sql.expression.TextClause.columns # https://docs.sqlalchemy.org/en/13/core/type_basics.html # Use the columns method to specify the data types for the columns. This may need to be done for other column names. if commit: trans = db.begin() sql = sql.columns(recurring_start_time=Time, recurring_end_time=Time) if data: log.info('Executing with SQL statement and data...') result = db.execute(sql, data) else: log.info('Executing with SQL statement only...') result = db.execute(sql) if commit: trans.commit() except OperationalError as e: log.error('An operational error exception happened. This is likely a "MySQL server has gone away" error. Going to try again...') log.exception('**** *** ** * ### BEGIN ### Operational Exception Happened: Trying again... * ** *** ****') log.error('^^^^ ^^^ ^^ ^ ### END ### Operational Exception ^ ^^ ^^^ ^^^^') # log.error('SQL Alchemy DB URI:') # log.error(settings.SQLALCHEMY_DB_URI) sql_connect(current_db=db) # log.info('Trying to recreate the pool...') # log.debug('############## ############') # log.debug(dir(db)) # log.debug(vars(db)) # log.debug('############## ############') # log.debug(dir(db.engine)) # log.debug(vars(db.engine)) # log.debug('############## ############') # log.debug(dir(db.engine.pool)) # log.debug(vars(db.engine.pool)) # log.debug('############## ############') # db.engine.dispose() log.info('Now trying the query again...') try: if commit: trans = db.begin() if data: log.info('2x Executing with SQL statement and data...') result = db.execute(sql, data) else: log.info('2x Executing with SQL statement only...') result = db.execute(sql) if commit: trans.commit() except Exception as e: log.error('Tried again an exception was raised again. Not going to try again.') log.exception('**** *** ** * ### BEGIN ### (2x) Second Exception Happened: Returning False * ** *** ****') log.error('^^^^ ^^^ ^^ ^ ### END ### Exception ^ ^^ ^^^ ^^^^') return False # Not successful else: log.info('Successfully executed the SQL on the second try.') pass except ProgrammingError as e: log.error('A programming error exception happened. This may be related to multithreading. It may also be related to a DB connection issue. Going to try again...') log.exception('**** *** ** * ### BEGIN ### Programming Exception Happened: Trying again... * ** *** ****') log.error('^^^^ ^^^ ^^ ^ ### END ### Programming Exception ^ ^^ ^^^ ^^^^') # log.error('SQL Alchemy DB URI:') # log.error(settings.SQLALCHEMY_DB_URI) sql_connect(current_db=db) log.info('Now trying the query again...') try: if data: log.info('2x Executing with SQL statement and data...') result = db.execute(sql, data) else: log.info('2x Executing with SQL statement only...') result = db.execute(sql) except Exception as e: log.error('Tried again an exception was raised again. Not going to try again.') log.exception('**** *** ** * ### BEGIN ### (2x) Second Exception Happened: Returning False * ** *** ****') log.error('^^^^ ^^^ ^^ ^ ### END ### Exception ^ ^^ ^^^ ^^^^') return False else: log.info('Successfully executed the SQL on the second try.') pass except Exception as e: log.error('An unknown exception happened. Returning False.') log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') log.error('^^^^ ^^^ ^^ ^ ### END ### Exception ^ ^^ ^^^ ^^^^') log.error('**** *** ** * ### BEGIN ### Exception Details: * ** *** ****') log.error('**** *** ** * SQL Statement: * ** *** ****') log.error(e.statement) log.error('**** *** ** * SQL Parameters: * ** *** ****') log.error(e.params) log.error('**** *** ** * SQL Origin Message: * ** *** ****') log.error(e.orig) log.error('**** *** ** * ### END ### Exception Details: * ** *** ****') return False # Not successful else: log.info('Successfully executed the SQL on the first try.') pass return result # ### END ### Core Help CRUD ### run_sql_select() ### # ### BEGIN ### Core Help CRUD ### sql_delete() ### # The catch all SQL DELETE function - STI 2021-02-17 # This one does it all for SQL DELETE queries # Updated 2022-02-15 @logger_reset def sql_delete( table_name: str|None = None, record_id: int|None = None, record_id_random: str|None = None, field_name: str|None = None, field_value = None, sql: str|None = None, data: dict|None = None, log_lvl: int = logging.INFO, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL ) -> None|bool: print('HERE SQL DELETE BEGIN') log.setLevel(log_lvl) if table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data): # Delete all records from a table with an ID (auto or random) log.info('Delete all records from a table with an ID (auto or random)') data = {} if record_id: data['record_id'] = record_id sql = text( f""" DELETE FROM `{table_name}` WHERE `{table_name}`.id = :record_id """ ) elif record_id_random: data['record_id_random'] = record_id_random sql = text( f""" DELETE FROM `{table_name}` WHERE `{table_name}`.id_random = :record_id_random """ ) elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data): # Delete all records from a table with a specific field and field value log.info('Delete all records from a table with a specific field and field value') data = {} data[field_name] = field_value sql = text( f""" DELETE FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name} """ ) elif table_name and data and not (record_id or record_id_random or field_name or field_value or sql): # Delete all records from a table with a specific list of fields and field values (list of dicts) log.info('Delete all records from a table with a specific list of fields and field values (list of dicts)') sql_where = [] for field_name in data: sql_where_line = f"""`{table_name}`.{field_name} = :{field_name}""" sql_where.append(sql_where_line) sql_where_string = ' AND '.join(sql_where) log.debug(sql_where_string) sql = text( f""" DELETE FROM `{table_name}` WHERE {sql_where_string} """ ) elif sql and not (table_name or record_id or record_id_random or field_name or field_value or data): # Delete records based on the SQL statement given log.info('Delete records based on the SQL statement given') sql = text(sql) elif sql and data and not (table_name or record_id or record_id_random or field_name or field_value): # Delete records based on the SQL statement given and with the matching data dict fields and values log.info('Delete records based on the SQL statement given and with the matching data dict fields and values') sql = text(sql) else: # Nothing matched the expected combination of parameters passed to this function log.warning('Nothing matched the expected combination of parameters passed to this function') return False # Not successful log.debug(sql) # return False try: # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL if data: log.info('Executing with SQL (DELETE) statement and data...') result = db.execute(sql, data) else: log.info('Executing with SQL (DELETE?) statement only...') result = db.execute(sql) log.debug(result) log.debug(dir(result)) log.debug(vars(result)) except OperationalError as e: log.error('An operational error exception happened. This is likely a "MySQL server has gone away" error. Going to try again...') log.exception('**** *** ** * ### BEGIN ### Operational Exception Happened: Trying again... * ** *** ****') log.error('^^^^ ^^^ ^^ ^ ### END ### Operational Exception ^ ^^ ^^^ ^^^^') sql_connect(current_db=db) # log.info('Trying to recreate the pool...') # log.debug('############## ############') # log.debug(dir(db)) # log.debug(vars(db)) # log.debug('############## ############') # log.debug(dir(db.engine)) # log.debug(vars(db.engine)) # log.debug('############## ############') # log.debug(dir(db.engine.pool)) # log.debug(vars(db.engine.pool)) # log.debug('############## ############') # db.engine.dispose() log.info('Now trying the query again...') try: # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL if data: log.info('2x Executing with SQL statement and data...') result = db.execute(sql, data) else: log.info('2x Executing with SQL statement only...') result = db.execute(sql) log.debug(result) except Exception as e: log.error('Tried again an exception was raised again. Not going to try again.') log.exception('**** *** ** * ### BEGIN ### (2x) Second Exception Happened: Returning False * ** *** ****') log.error('^^^^ ^^^ ^^ ^ ### END ### Exception ^ ^^ ^^^ ^^^^') return False # Not successful else: log.info('Successfully executed the SQL on the second try.') pass except Exception as e: log.error('An unknown exception happened. Returning False.') log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') log.error('^^^^ ^^^ ^^ ^ ### END ### Exception ^ ^^ ^^^ ^^^^') log.error('**** *** ** * ### BEGIN ### Exception Details: * ** *** ****') log.error('**** *** ** * SQL Statement: * ** *** ****') log.error(e.statement) log.error('**** *** ** * SQL Parameters: * ** *** ****') log.error(e.params) log.error('**** *** ** * SQL Origin Message: * ** *** ****') log.error(e.orig) log.error('**** *** ** * ### END ### Exception Details: * ** *** ****') return False # Not successful else: log.info('Successfully executed the SQL on the first try.') pass # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(f'Row count: {result.rowcount}') # log.debug(vars(result)) # log.debug(dir(result)) if result.rowcount == 1: log.info(f'Deleted one record. Returning True.') return True # Successful elif result.rowcount > 1: log.info(f'Deleted {result.rowcount} records. Returning True.') return True # Successful else: log.info('No records deleted. Returning None.') return None # Successful even though no results # ### END ### Core Help CRUD ### sql_delete() ### # ### BEGIN ### API DB SQL ### redis_lookup_id_random() ### # Just return the value if it is an integer # Check if the id_random value is a string and the correct length # Attempt to look up id_random key in Redis # If success then return the ID number # If not success and there is a table_name then check the database table passed # If found in database table then store in Redis and return the ID number # Updated 2024-10-08 @logger_reset def redis_lookup_id_random( record_id_random: int|str, table_name: str, check_int_id: bool = False, log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL minutes: int = 30, # Expire the Redis key after 8 minutes reset_rate: int = 10, # 1 in 10 chance of resetting the Redis key ) -> str|int|bool|None: log.setLevel(log_lvl) if isinstance(record_id_random, str) and len(record_id_random) >= 11 and len(record_id_random) <= 22: pass elif isinstance(record_id_random, int): record_id = record_id_random if check_int_id: log.info(f'Checking the int ID if exists. Table Name: {table_name} ID: {record_id}') if get_id_random_result := get_id_random( record_id = record_id, table_name = table_name, ): log.info(f'The int ID exists. Returning the int ID. ID Random: {get_id_random_result}') return record_id else: log.info(f'The int ID does not exists. Returning False. Table Name: {table_name} ID: {record_id}') return False else: log.debug(f'Not checking if the int ID exists. Returning the int ID. ID: {record_id}') return record_id elif record_id_random is None: log.info(f'No record ID was passed. Returning None') return None else: log.error(f'Unexpected data type or string format: {type(record_id_random)} Expected type is a string 11 or 22 characters long.') return False if record_id_random and table_name: # WARNING: The record_id_random string length should be checked just in case? if len(record_id_random) < 11: log.error(f'The length of id_random is too short: {record_id_random} ({len(record_id_random)} chars)') return False elif len(record_id_random) > 22: log.error(f'The length of id_random is too long: {record_id_random} ({len(record_id_random)} chars)') return False else: pass elif record_id_random: log.error(f'Missing table_name to select from for id_random "{record_id_random}"') return False elif table_name: log.error(f'Missing id_random to select from table "{table_name}"') return False else: log.error('Missing table_name and record_id_random') return False # r = redis.Redis(host='localhost', port=6379, db=7, password=None, decode_responses=True) r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=7, password=None, decode_responses=True) # key_name = 'record_id:'+record_id_random key_name = f'{table_name}:{record_id_random}' record_id = r.get(key_name) # log.debug(f'Record ID found: {record_id}') # WARNING WARNING WARNING # WARNING WARNING WARNING # WARNING WARNING WARNING # log.info(f'Looking up ID in Redis is being partially bypassed. Key="{key_name}" value="{record_id}" TTL={r.ttl(key_name)} seconds') # record_id = None # Uncomment this line to bypass the Redis lookup... trust? # WARNING WARNING WARNING # WARNING WARNING WARNING # WARNING WARNING WARNING # To help prevent corrupt data from being stored in Redis for too long, we will set the record_id to None 25% of the time. This will force the function to look up the record_id in the database. This is a temporary solution. Why is the data from SQL not correct in the first place??? -2024-10-08 if record_id and random.randint(1, reset_rate) == 1: log.warning(f'Redis: Randomly (1/{reset_rate}) setting record_id to None. Key="{key_name}" value="{record_id}" TTL={r.ttl(key_name)} seconds') record_id = None if record_id: r.setex(key_name, datetime.timedelta(minutes=minutes), value=record_id) log.info(f'Redis: Entry found for: Key="{key_name}" value="{record_id}" TTL={r.ttl(key_name)} seconds') return int(record_id) elif table_name: data = { 'id_random': record_id_random } sql = f""" SELECT id FROM `{table_name}` AS `table` WHERE `table`.id_random = :id_random; """ if select_results := sql_select(sql=sql, data=data): log.debug(f'SQL: SELECT result: {select_results}') # log.debug(type(select_results)) if isinstance(select_results, dict): log.info(f"""SQL: Found ID Random for: {str(record_id_random)} = {str(select_results.get('id'))}""") if record_id := select_results.get('id'): r.setex(key_name, datetime.timedelta(minutes=minutes), value=record_id) return int(record_id) else: log.error('The SQL result was not what was expected. The ID field was not found.') return False else: log.error(f'SQL: More than one record may have been found in the table "{table_name}". There may be a duplicate id_random value in this table. This should not happen!') log.error(select_results) # Try again... log.warning(f'SQL: ID Random "{record_id_random}" was not found in table "{table_name}". Trying again...') new_result = redis_lookup_id_random(record_id_random=record_id_random, table_name=table_name) return new_result # return False else: log.warning(f'SQL: ID Random "{record_id_random}" was not found in table "{table_name}". Returning None.') return None log.error('We should not be here. Something unexpected happened.') return False # ### END ### API DB SQL ### redis_lookup_id_random() ### # ### BEGIN ### API DB SQL ### get_id_random() ### # Updated 2022-01-06 @logger_reset def get_id_random( record_id: int, table_name: str, log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL ) -> str|bool|None: log.setLevel(log_lvl) data = { 'id': record_id } sql = f""" SELECT id_random FROM `{table_name}` AS `table` WHERE `table`.id = :id; """ if select_results := sql_select(sql=sql, data=data): log.debug(select_results) log.debug(type(select_results)) if isinstance(select_results, dict): # log.info(f"""Record ID found: {select_results['id_random']}""") # DOES UNCOMMENTING THIS BREAK STUFF??? if record_id_random := select_results.get('id_random'): return str(record_id_random) else: log.error('The SQL result was not what was expected.') return False elif isinstance(select_results, list): log.exception('More than one record may have been found. There may be a duplicate id. This should never happen.') log.error(select_results) return False else: log.exception(f'Got an unexpected result while trying to look up the ID. Is the table name correct? {table_name} Is the record ID valid? {record_id}') log.error(select_results) return False elif select_results is None: log.warning(f'No results with: Table Name: {table_name} ID: {record_id}') log.debug(select_results) return None else: # False or something else not True log.error(f'Something went wrong while trying to look up the ID. Is the table name correct? {table_name} Is the record ID valid? {record_id}') log.error(select_results) return False # ### END ### API DB SQL ### get_id_random() ### @logger_reset def reset_redis(): r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=7, password=None, decode_responses=True) r.flushdb() return True # ### BEGIN ### API DB SQL ### lookup_id_random_pop() ### # Look up and resolve id_random values to their id # Remove the unneeded *_id_random key from the dict # This really needs to be simplified... Use a list of dicts instead. Can store as JSON in the DB. # Updated 2023-07-06 @logger_reset def lookup_id_random_pop( obj_data: dict, log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL ): log.setLevel(log_lvl) if 'account_id_random' in obj_data: obj_data['account_id'] = redis_lookup_id_random(record_id_random=obj_data['account_id_random'], table_name='account') obj_data.pop('account_id_random') if 'activity_log_id_random' in obj_data: obj_data['activity_log_id'] = redis_lookup_id_random(record_id_random=obj_data['activity_log_id_random'], table_name='activity_log') obj_data.pop('address_id_random') if 'address_id_random' in obj_data: obj_data['address_id'] = redis_lookup_id_random(record_id_random=obj_data['address_id_random'], table_name='address') obj_data.pop('address_id_random') if 'address_location_id_random' in obj_data: obj_data['address_location_id'] = redis_lookup_id_random(record_id_random=obj_data['address_location_id_random'], table_name='address') obj_data.pop('address_location_id_random') if 'archive_id_random' in obj_data: obj_data['archive_id'] = redis_lookup_id_random(record_id_random=obj_data['archive_id_random'], table_name='archive') obj_data.pop('archive_id_random') if 'contact_id_random' in obj_data: obj_data['contact_id'] = redis_lookup_id_random(record_id_random=obj_data['contact_id_random'], table_name='contact') obj_data.pop('contact_id_random') if 'contact_1_id_random' in obj_data: obj_data['contact_1_id'] = redis_lookup_id_random(record_id_random=obj_data['contact_1_id_random'], table_name='contact') obj_data.pop('contact_1_id_random') if 'contact_2_id_random' in obj_data: obj_data['contact_2_id'] = redis_lookup_id_random(record_id_random=obj_data['contact_2_id_random'], table_name='contact') obj_data.pop('contact_2_id_random') if 'cont_edu_cert_id_random' in obj_data: obj_data['cont_edu_cert_id'] = redis_lookup_id_random(record_id_random=obj_data['cont_edu_cert_id_random'], table_name='cont_edu_cert') obj_data.pop('cont_edu_cert_id_random') if 'cont_edu_cert_person_id_random' in obj_data: obj_data['cont_edu_cert_person_id'] = redis_lookup_id_random(record_id_random=obj_data['cont_edu_cert_person_id_random'], table_name='cont_edu_cert_person') obj_data.pop('cont_edu_cert_person_id_random') if 'event_id_random' in obj_data: obj_data['event_id'] = redis_lookup_id_random(record_id_random=obj_data.get('event_id_random', None), table_name='event') obj_data.pop('event_id_random') if 'event_id_random_only' in obj_data: obj_data['event_id_only'] = redis_lookup_id_random(record_id_random=obj_data.get('event_id_random_only', None), table_name='event') obj_data.pop('event_id_random_only') if 'event_abstract_id_random' in obj_data: obj_data['event_abstract_id'] = redis_lookup_id_random(record_id_random=obj_data.get('event_abstract_id_random', None), table_name='event_abstract') obj_data.pop('event_abstract_id_random') if 'event_badge_id_random' in obj_data: obj_data['event_badge_id'] = redis_lookup_id_random(record_id_random=obj_data.get('event_badge_id_random', None), table_name='event_badge') obj_data.pop('event_badge_id_random') if 'event_badge_template_id_random' in obj_data: obj_data['event_badge_template_id'] = redis_lookup_id_random(record_id_random=obj_data.get('event_badge_template_id_random', None), table_name='event_badge_template') obj_data.pop('event_badge_template_id_random') if 'event_exhibit_id_random' in obj_data: obj_data['event_exhibit_id'] = redis_lookup_id_random(record_id_random=obj_data.get('event_exhibit_id_random', None), table_name='event_exhibit') obj_data.pop('event_exhibit_id_random') if 'event_file_id_random' in obj_data: obj_data['event_file_id'] = redis_lookup_id_random(record_id_random=obj_data.get('event_file_id_random', None), table_name='event_file') obj_data.pop('event_file_id_random') if 'event_location_id_random' in obj_data: obj_data['event_location_id'] = redis_lookup_id_random(record_id_random=obj_data.get('event_location_id_random', None), table_name='event_location') obj_data.pop('event_location_id_random') if 'event_person_id_random' in obj_data: obj_data['event_person_id'] = redis_lookup_id_random(record_id_random=obj_data.get('event_person_id_random', None), table_name='event_person') obj_data.pop('event_person_id_random') if 'event_person_profile_id_random' in obj_data: obj_data['event_person_profile_id'] = redis_lookup_id_random(record_id_random=obj_data.get('event_person_profile_id_random', None), table_name='event_person_profile') obj_data.pop('event_person_profile_id_random') if 'event_presentation_id_random' in obj_data: obj_data['event_presentation_id'] = redis_lookup_id_random(record_id_random=obj_data.get('event_presentation_id_random', None), table_name='event_presentation') obj_data.pop('event_presentation_id_random') if 'event_presenter_id_random' in obj_data: obj_data['event_presenter_id'] = redis_lookup_id_random(record_id_random=obj_data.get('event_presenter_id_random', None), table_name='event_presenter') obj_data.pop('event_presenter_id_random') if 'event_registration_id_random' in obj_data: obj_data['event_registration_id'] = redis_lookup_id_random(record_id_random=obj_data.get('event_registration_id_random', None), table_name='event_registration') obj_data.pop('event_registration_id_random') if 'event_session_id_random' in obj_data: obj_data['event_session_id'] = redis_lookup_id_random(record_id_random=obj_data.get('event_session_id_random', None), table_name='event_session') obj_data.pop('event_session_id_random') if 'event_track_id_random' in obj_data: obj_data['event_track_id'] = redis_lookup_id_random(record_id_random=obj_data.get('event_track_id_random', None), table_name='event_track') obj_data.pop('event_track_id_random') if 'grant_id_random' in obj_data: obj_data['grant_id'] = redis_lookup_id_random(record_id_random=obj_data.get('grant_id_random', None), table_name='grant') obj_data.pop('grant_id_random') if 'hosted_file_id_random' in obj_data: obj_data['hosted_file_id'] = redis_lookup_id_random(record_id_random=obj_data.get('hosted_file_id_random', None), table_name='hosted_file') obj_data.pop('hosted_file_id_random') if 'journal_id_random' in obj_data: obj_data['journal_id'] = redis_lookup_id_random(record_id_random=obj_data.get('journal_id_random', None), table_name='journal') obj_data.pop('journal_id_random') if 'journal_entry_id_random' in obj_data: obj_data['journal_entry_id'] = redis_lookup_id_random(record_id_random=obj_data.get('journal_entry_id_random', None), table_name='journal_entry') obj_data.pop('journal_entry_id_random') if 'membership_group_id_random' in obj_data: obj_data['membership_group_id'] = redis_lookup_id_random(record_id_random=obj_data.get('membership_group_id_random', None), table_name='membership_group') obj_data.pop('membership_group_id_random') if 'membership_person_group_id_random' in obj_data: obj_data['membership_person_group_id'] = redis_lookup_id_random(record_id_random=obj_data.get('membership_person_group_id_random', None), table_name='membership_person_group') obj_data.pop('membership_person_group_id_random') if 'membership_person_id_random' in obj_data: obj_data['membership_person_id'] = redis_lookup_id_random(record_id_random=obj_data.get('membership_person_id_random', None), table_name='membership_person') obj_data.pop('membership_person_id_random') if 'membership_type_id_random' in obj_data: obj_data['membership_type_id'] = redis_lookup_id_random(record_id_random=obj_data.get('membership_type_id_random', None), table_name='membership_type') obj_data.pop('membership_type_id_random') if 'membership_person_type_id_random' in obj_data: obj_data['membership_person_type_id'] = redis_lookup_id_random(record_id_random=obj_data.get('membership_person_type_id_random', None), table_name='membership_person_type') obj_data.pop('membership_person_type_id_random') if 'order_id_random' in obj_data: obj_data['order_id'] = redis_lookup_id_random(record_id_random=obj_data.get('order_id_random', None), table_name='order') obj_data.pop('order_id_random') if 'order_line_id_random' in obj_data: obj_data['order_line_id'] = redis_lookup_id_random(record_id_random=obj_data.get('order_line_id_random', None), table_name='order_line') obj_data.pop('order_line_id_random') if 'order_cart_id_random' in obj_data: obj_data['order_cart_id'] = redis_lookup_id_random(record_id_random=obj_data.get('order_cart_id_random', None), table_name='order_cart') obj_data.pop('order_cart_id_random') if 'order_cart_line_id_random' in obj_data: obj_data['order_cart_line_id'] = redis_lookup_id_random(record_id_random=obj_data.get('order_cart_line_id_random', None), table_name='order_cart_line') obj_data.pop('order_cart_line_id_random') if 'organization_id_random' in obj_data: obj_data['organization_id'] = redis_lookup_id_random(record_id_random=obj_data.get('organization_id_random', None), table_name='organization') obj_data.pop('organization_id_random') if 'page_id_random' in obj_data: obj_data['page_id'] = redis_lookup_id_random(record_id_random=obj_data['page_id_random'], table_name='page') obj_data.pop('page_id_random') if 'person_id_random' in obj_data: obj_data['person_id'] = redis_lookup_id_random(record_id_random=obj_data['person_id_random'], table_name='person') obj_data.pop('person_id_random') if 'poc_event_person_id_random' in obj_data: obj_data['poc_event_person_id'] = redis_lookup_id_random(record_id_random=obj_data['poc_event_person_id_random'], table_name='event_person') obj_data.pop('poc_event_person_id_random') if 'poc_person_id_random' in obj_data: obj_data['poc_person_id'] = redis_lookup_id_random(record_id_random=obj_data['poc_person_id_random'], table_name='person') obj_data.pop('poc_person_id_random') if 'post_id_random' in obj_data: obj_data['post_id'] = redis_lookup_id_random(record_id_random=obj_data.get('post_id_random', None), table_name='post') obj_data.pop('post_id_random') if 'product_id_random' in obj_data: obj_data['product_id'] = redis_lookup_id_random(record_id_random=obj_data['product_id_random'], table_name='product') obj_data.pop('product_id_random') if 'sponsorship_id_random' in obj_data: obj_data['sponsorship_id'] = redis_lookup_id_random(record_id_random=obj_data['sponsorship_id_random'], table_name='sponsorship') obj_data.pop('sponsorship_id_random') if 'sponsorship_cfg_id_random' in obj_data: obj_data['sponsorship_cfg_id'] = redis_lookup_id_random(record_id_random=obj_data['sponsorship_cfg_id_random'], table_name='sponsorship_cfg') obj_data.pop('sponsorship_cfg_id_random') if 'site_id_random' in obj_data: obj_data['site_id'] = redis_lookup_id_random(record_id_random=obj_data['site_id_random'], table_name='site') obj_data.pop('site_id_random') if 'user_id_random' in obj_data: obj_data['user_id'] = redis_lookup_id_random(record_id_random=obj_data['user_id_random'], table_name='user') obj_data.pop('user_id_random') if 'for_type' in obj_data and 'for_id_random' in obj_data: obj_data['for_id'] = redis_lookup_id_random(record_id_random=obj_data.get('for_id_random', None), table_name=obj_data.get('for_type', None)) obj_data.pop('for_id_random') #log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(obj_data) elif 'for_id_random' in obj_data: # In case for_id_random was passed without for_type log.warn('for_id_random was passed without for_type') obj_data.pop('for_id_random') if 'link_to_type' in obj_data and 'link_to_id_random' in obj_data: obj_data['link_to_id'] = redis_lookup_id_random(record_id_random=obj_data.get('link_to_id_random', None), table_name=obj_data.get('link_to_type', None)) obj_data.pop('link_to_id_random') #log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(obj_data) elif 'link_to_id_random' in obj_data: # In case link_to_id_random was passed without link_to_type log.warn('link_to_id_random was passed without link_to_type') obj_data.pop('link_to_id_random') if 'object_type' in obj_data and 'object_id_random' in obj_data: obj_data['object_id'] = redis_lookup_id_random(record_id_random=obj_data.get('object_id_random', None), table_name=obj_data.get('object_type', None)) obj_data.pop('object_id_random') #log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(obj_data) elif 'object_id_random' in obj_data: # In case object_id_random was passed without object_type log.warn('object_id_random was passed without object_type') obj_data.pop('object_id_random') if 'to_object_type' in obj_data and 'to_object_id_random' in obj_data: obj_data['to_object_id'] = redis_lookup_id_random(record_id_random=obj_data.get('to_object_id_random', None), table_name=obj_data.get('to_object_type', None)) obj_data.pop('to_object_id_random') #log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(obj_data) elif 'to_object_id_random' in obj_data: # In case to_object_id_random was passed without to_object_type log.warn('to_object_id_random was passed without to_object_type') obj_data.pop('to_object_id_random') if 'from_object_type' in obj_data and 'from_object_id_random' in obj_data: obj_data['from_object_id'] = redis_lookup_id_random(record_id_random=obj_data.get('from_object_id_random', None), table_name=obj_data.get('from_object_type', None)) obj_data.pop('from_object_id_random') #log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(obj_data) elif 'from_object_id_random' in obj_data: # In case from_object_id_random was passed without from_object_type log.warn('from_object_id_random was passed without from_object_type') obj_data.pop('from_object_id_random') return obj_data # ### END ### API DB SQL ### lookup_id_random_pop() ### # ### BEGIN ### API DB SQL Methods ### get_account_id_w_for_type_id() ### # NOTE: This is only useful for a few tables that have account_id as a field or views that have it included. # address, contact, event, people, user # Updated 2022-01-07 @logger_reset def get_account_id_w_for_type_id( for_type: str, # This is the table name for_id: int|str, ) -> bool|int|None: log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if for_id := redis_lookup_id_random(record_id_random=for_id, table_name=for_type): pass else: return False data = {} data['for_type'] = for_type data['for_id'] = for_id sql = f""" SELECT `tbl`.id AS 'for_id', `tbl`.id_random AS 'for_id_random', `tbl`.account_id AS account_id FROM `{for_type}` AS `tbl` WHERE `tbl`.id = :for_id LIMIT 1; """ log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL if for_data_result := sql_select(data=data, sql=sql): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(for_data_result) if account_id := for_data_result.get('account_id', None): return account_id else: return False else: return None # ### END ### API DB SQL Methods ### get_account_id_w_for_type_id() ### # ### BEGIN ### API DB SQL Methods ### sql_where_qry_part() ### # Example JSON data # jp: { # qry: [ # { # type: "AND", # field: "enable", # operator: "=", # value: TRUE # }, # { # type: "AND", # field: "example", # operator: ">=", # value: 2 # }, # { # type: "OR", # field: "test", # operator: "LIKE", # value: "%xyz%" # }, # ] # } # Updated 2024-08-14 def sql_where_qry_part( qry_dict_li: list, # JSON data ) -> bool|str: log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) data = {} sql_where_qry = '' if qry_dict_li and isinstance(qry_dict_li, list): log.info('Creating partial SQL string for WHERE queries.') sql_where_qry_str = [] for qry in qry_dict_li: log.debug(qry) if qry.get('type') == '': if qry.get("operator") == 'MATCH': # sql_where_qry_str.append(f'{qry.get("field")} {qry.get("operator")} AGAINST(:{qry.get("field")} IN BOOLEAN MODE)') sql_where_qry_str.append(f'AND MATCH( {qry.get("field")} ) AGAINST( :{qry.get("field")} IN BOOLEAN MODE )') else: sql_where_qry_str.append(f'AND {qry.get("field")} {qry.get("operator")} :{qry.get("field")}') data[qry.get('field')] = qry.get('value') elif qry.get('type') == 'AND': if qry.get("operator") == 'MATCH': # sql_where_qry_str.append(f'AND {qry.get("field")} {qry.get("operator")} AGAINST(:{qry.get("field")} IN BOOLEAN MODE)') sql_where_qry_str.append(f'AND MATCH( {qry.get("field")} ) AGAINST( :{qry.get("field")} IN BOOLEAN MODE )') else: sql_where_qry_str.append(f'AND {qry.get("field")} {qry.get("operator")} :{qry.get("field")}') data[qry.get('field')] = qry.get('value') elif qry.get('type') == 'OR': if qry.get("operator") == 'MATCH': # sql_where_qry_str.append(f'OR {qry.get("field")} {qry.get("operator")} AGAINST(:{qry.get("field")} IN BOOLEAN MODE)') sql_where_qry_str.append(f'OR MATCH( {qry.get("field")} ) AGAINST( :{qry.get("field")} IN BOOLEAN MODE )') else: sql_where_qry_str.append(f'OR {qry.get("field")} {qry.get("operator")} :{qry.get("field")}') data[qry.get('field')] = qry.get('value') else: log.error(f'Unknown query type: {qry.get("type")}') return False # Should this WHERE part also be surrounded by parentheses??? # sql_where_qry = 'AND ('+' '.join(sql_where_qry_str)+')' # sql_where_qry = sql_where_qry_str sql_where_qry = ' '.join(sql_where_qry_str) log.debug(sql_where_qry) return sql_where_qry, data # ### END ### API DB SQL Methods ### sql_where_qry_part() ### # ### BEGIN ### API DB SQL Methods ### sql_fulltext_qry_part() ### # Updated 2023-11-30 @logger_reset def sql_fulltext_qry_part( fulltext_qry_dict: dict, # One or more key value pairs. key = field name; value = search string ) -> bool|dict: log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) # NOTE: Version 3 of the fulltext search data = {} sql_fulltext_match_against = '' log.debug(fulltext_qry_dict) if fulltext_qry_dict and isinstance(fulltext_qry_dict, dict): log.info('Creating partial SQL string for fulltext search.') fulltext_qry_dict_str = [] for key, value in fulltext_qry_dict.items(): log.debug(f'Key = {key}; Value = {value}') fulltext_qry_dict_str.append(f'MATCH( {key} ) AGAINST( :ft_{key} IN BOOLEAN MODE )') # fulltext_qry_dict_str.append(f'MATCH( {key} ) AGAINST( :ft_{key} IN NATURAL LANGUAGE MODE )') data[f'ft_{key}'] = value fulltext_qry_field_string = ' OR '.join(fulltext_qry_dict_str) sql_fulltext_match_against = f'AND ({fulltext_qry_field_string})' log.debug(sql_fulltext_match_against) log.debug(data) return sql_fulltext_match_against, data # ### BEGIN ### API DB SQL Methods ### sql_and_qry_part() ### # Updated 2023-11-30 @logger_reset def sql_and_qry_part( and_qry_dict_obj: dict, # One or more key value pairs. key = field name; value = search string ) -> bool|dict: log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) # NOTE: Version 3 of the fulltext search data = {} sql_and_qry = '' log.debug(and_qry_dict_obj) if and_qry_dict_obj and isinstance(and_qry_dict_obj, dict): log.info('Creating partial SQL string for additional AND queries.') and_qry_dict_obj_str = [] for key, value in and_qry_dict_obj.items(): log.debug(f'Key = {key}; Value = {value}') and_qry_dict_obj_str.append(f'{key} = :and_{key}') data[f'and_{key}'] = value and_qry_field_string = ' AND '.join(and_qry_dict_obj_str) sql_and_qry = f'AND ({and_qry_field_string})' log.debug(sql_and_qry) log.debug(data) return sql_and_qry, data # ### BEGIN ### API DB SQL Methods ### sql_and_like_part() ### # Updated 2024-04-07 @logger_reset def sql_and_like_part( and_like_dict_obj: dict, # One or more key value pairs. key = field name; value = search string ) -> bool|dict: log.setLevel(logging.INFO) log.debug(locals()) data = {} sql_and_like = '' log.debug(and_like_dict_obj) if and_like_dict_obj and isinstance(and_like_dict_obj, dict): log.info('Creating partial SQL string for additional AND LIKE queries.') and_like_dict_obj_str = [] for key, value in and_like_dict_obj.items(): log.debug(f'Key = {key}; Value = {value}') and_like_dict_obj_str.append(f'{key} LIKE :and_like_{key}') # For now not surrounding with %... may need to be added back in later # data[f'and_like_{key}'] = f'%{value}%' data[f'and_like_{key}'] = f'{value}' and_like_field_string = ' AND '.join(and_like_dict_obj_str) sql_and_like = f'AND ({and_like_field_string})' log.debug(sql_and_like) log.debug(data) return sql_and_like, data # ### BEGIN ### API DB SQL Methods ### sql_or_like_part() ### # Updated 2024-06-21 @logger_reset def sql_or_like_part( or_like_dict_obj: dict, # One or more key value pairs. key = field name; value = search string ) -> bool|dict: log.setLevel(logging.INFO) log.debug(locals()) data = {} sql_or_like = '' log.debug(or_like_dict_obj) if or_like_dict_obj and isinstance(or_like_dict_obj, dict): log.info('Creating partial SQL string for additional OR LIKE queries.') or_like_dict_obj_str = [] for key, value in or_like_dict_obj.items(): log.debug(f'Key = {key}; Value = {value}') or_like_dict_obj_str.append(f'{key} LIKE :or_like_{key}') # For now not surrounding with %... may need to be added back in later # data[f'or_like_{key}'] = f'%{value}%' data[f'or_like_{key}'] = f'{value}' or_like_field_string = ' OR '.join(or_like_dict_obj_str) sql_or_like = f'AND ({or_like_field_string})' log.debug(sql_or_like) log.debug(data) return sql_or_like, data # ### END ### API DB SQL Methods ### sql_or_like_part() ### # ### BEGIN ### API DB SQL Methods ### sql_and_in_dict_li_part() ### # This function takes a list of values and formats them to be used in a SQL IN statement. This may contain one or more fields to use with the IN statement. # Example: sql_and_in_dict_li_part({'field1': [1, 2, 3], 'field2': ['hello', 'world', 'day']}) # Updated 2024-03-15 @logger_reset def sql_and_in_dict_li_part( and_in_dict_li_dict_obj: dict ) -> bool|dict: log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) data = {} sql_and_in_dict_li = '' log.debug(and_in_dict_li_dict_obj) if and_in_dict_li_dict_obj and isinstance(and_in_dict_li_dict_obj, dict): log.info('Creating partial SQL string for additional AND IN queries.') and_in_dict_li_dict_obj_str = [] for key, value in and_in_dict_li_dict_obj.items(): log.debug(f'Key = {key}; Value = {value}') and_in_dict_li_dict_obj_str.append(f'{key} IN :and_in_{key}') # and_in_dict_li_dict_obj_str.append(f'{key} IN ( :and_in_{key} )') data[f'and_in_{key}'] = value and_in_dict_li_field_string = ' AND '.join(and_in_dict_li_dict_obj_str) sql_and_in_dict_li = f'AND ({and_in_dict_li_field_string})' log.debug(sql_and_in_dict_li) log.debug(data) return sql_and_in_dict_li, data # ### BEGIN ### API DB SQL Methods ### sql_enable_part() ### # Updated 2022-01-17 @logger_reset def sql_enable_part(table_name: str, enabled: str) -> bool|dict: log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if not table_name: return False if enabled in ['enabled', 'disabled', 'all']: log.info(f'Creating partial SQL string for "enabled" check. Enabled: {enabled}') if enabled == 'enabled': # sql = f'AND `person`.enable = :enable' sql = f'AND `{table_name}`.enable = true' enable = True elif enabled == 'disabled': # sql = f'AND `person`.enable = :enable' sql = f'AND `{table_name}`.enable = false' enable = False elif enabled == 'all': sql = f'AND (`{table_name}`.enable = true OR `{table_name}`.enable = false OR `{table_name}`.enable IS NULL)' enable = None log.debug(sql) return sql, enable else: return False # ### END ### API DB SQL Methods ### sql_enable_part() ### # ### BEGIN ### API DB SQL Methods ### sql_hidden_part() ### # Updated 2022-01-17 @logger_reset def sql_hidden_part(table_name: str, hidden: str) -> bool|dict: log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if not table_name: return False if hidden in ['hidden', 'not_hidden', 'all']: log.info(f'Creating partial SQL string for "hidden" check. Hide: {hidden}') if hidden == 'hidden': sql = f'AND `{table_name}`.hide = true' hide = True elif hidden == 'not_hidden': sql = f'AND (`{table_name}`.hide = false OR `{table_name}`.hide IS NULL)' hide = False elif hidden == 'all': sql = f'AND (`{table_name}`.hide = true OR `{table_name}`.hide = false OR `{table_name}`.hide IS NULL)' hide = None log.debug(sql) return sql, hide else: return False # ### END ### API DB SQL Methods ### sql_enable_part() ### # ### BEGIN ### API DB SQL Methods ### sql_limit_offset_part() ### # Updated 2022-01-17 @logger_reset def sql_limit_offset_part(limit: int, offset: int = 0) -> bool|str: log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if limit >= 0 and offset >= 0: log.info(f'Creating partial SQL string for LIMIT and OFFSET. Limit: {limit}; Offset: {offset}') sql = f'LIMIT {limit} OFFSET {offset}' return sql else: return False # ### END ### API DB SQL Methods ### sql_limit_offset_part() ### # ### BEGIN ### API DB SQL Methods ### sql_search_qry_part() ### # NEW 2026-01-02 # Updated to support complex POST-based searches with recursive logical grouping. @logger_reset def sql_search_qry_part( search_query: Any, # SearchQuery model instance searchable_fields: List[str]|None = None, # List of allowed fields max_depth: int = 5, # Maximum recursion depth ) -> tuple[str, dict]: """ Recursively builds a SQL WHERE clause from a SearchQuery model. Uses unique parameter names to prevent collisions. Enforces security via field allowlist and recursion depth limits. """ log.setLevel(logging.INFO) log.debug(locals()) data = {} param_counter = [0] def get_param_name(): param_counter[0] += 1 return f"sp_{param_counter[0]}" operator_map = { "eq": "=", "ne": "!=", "gt": ">", "gte": ">=", "lt": "<", "lte": "<=", "like": "LIKE", "in": "IN", "is_null": "IS NULL", "is_not_null": "IS NOT NULL", "contains": "LIKE", "icontains": "LIKE", "startswith": "LIKE", "istartswith": "LIKE", "endswith": "LIKE", "iendswith": "LIKE" } def process_node(query_node, current_depth: int) -> str: if current_depth > max_depth: raise HTTPException(status_code=400, detail=f"Search query too complex (max depth {max_depth} reached).") clauses = [] # Process 'query_string' (Standardized Full-Text Search) if hasattr(query_node, 'query_string') and query_node.query_string: p_name = get_param_name() clauses.append(f"MATCH( default_qry_str ) AGAINST( :{p_name} IN BOOLEAN MODE )") data[p_name] = query_node.query_string # Process 'and' filters if hasattr(query_node, 'and_filters') and query_node.and_filters: and_clauses = [] for item in query_node.and_filters: if hasattr(item, 'field'): # SearchFilter clause, item_data = process_filter(item) and_clauses.append(clause) data.update(item_data) else: # Nested SearchQuery and_clauses.append(f"({process_node(item, current_depth + 1)})") if and_clauses: clauses.append(f"({' AND '.join(and_clauses)})") # Process 'or' filters if hasattr(query_node, 'or_filters') and query_node.or_filters: or_clauses = [] for item in query_node.or_filters: if hasattr(item, 'field'): # SearchFilter clause, item_data = process_filter(item) or_clauses.append(clause) data.update(item_data) else: # Nested SearchQuery or_clauses.append(f"({process_node(item, current_depth + 1)})") if or_clauses: clauses.append(f"({' OR '.join(or_clauses)})") return ' AND '.join(clauses) def process_filter(f) -> tuple[str, dict]: # Field Validation: Check against allowlist if searchable_fields is not None and f.field not in searchable_fields: raise HTTPException(status_code=400, detail=f"Searching on field '{f.field}' is not permitted.") op_lower = f.op.lower() sql_op = operator_map.get(op_lower) if not sql_op: raise HTTPException(status_code=400, detail=f"Unsupported search operator: {f.op}") filter_data = {} if op_lower in ['is_null', 'is_not_null']: clause = f"`{f.field}` {sql_op}" elif op_lower == 'in': p_name = get_param_name() clause = f"`{f.field}` IN (:{p_name})" filter_data[p_name] = f.value elif op_lower in ['contains', 'icontains']: p_name = get_param_name() clause = f"`{f.field}` LIKE :{p_name}" filter_data[p_name] = f"%{f.value}%" elif op_lower in ['startswith', 'istartswith']: p_name = get_param_name() clause = f"`{f.field}` LIKE :{p_name}" filter_data[p_name] = f"{f.value}%" elif op_lower in ['endswith', 'iendswith']: p_name = get_param_name() clause = f"`{f.field}` LIKE :{p_name}" filter_data[p_name] = f"%{f.value}" else: p_name = get_param_name() clause = f"`{f.field}` {sql_op} :{p_name}" filter_data[p_name] = f.value return clause, filter_data # Initial processing sql_where = process_node(search_query, 1) if sql_where: return f"AND ({sql_where})", data return "", {} # ### END ### API DB SQL Methods ### sql_search_qry_part() ###