from __future__ import annotations import secrets from timeit import default_timer as timer from app.config import settings from .log import * #from .lib_general import lookup_id_random_pop from sqlalchemy import create_engine, text from sqlalchemy.exc import IntegrityError, OperationalError db_uri = settings.SQLALCHEMY_DATABASE_URI connection_string = db_uri engine = create_engine(url=connection_string, pool_size=25, pool_recycle=60, pool_pre_ping=True, echo=False, echo_pool=True, isolation_level='READ COMMITTED') # NOTE: The default isolation_level is 'REPEATABLE READ'. This can sometimes not show updated data. # NOTE: The "echo" set to True option shows the SQL queries. db = engine.connect() # #### ### ## # BEGIN SQL # ## ### #### # Create, Read/Get, Update, Delete # CRUD or CGUD # ### BEGIN ### Core Help CRUD ### sql_insert() ### def sql_insert(sql:str=None, data:dict=None, table_name:str=None, id_random_length:int=8): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if sql: sql_insert = text(sql) elif table_name and data: #if rm_id_random: #data = lookup_id_random_pop(obj_data=data) if not data.get('id_random', None) and id_random_length: data['id_random'] = secrets.token_urlsafe(id_random_length) log.debug(data) fields = [] values = [] for key, value in data.items(): if key != 'id': # A special exception for the id auto increment field. fields.append('`'+str(key)+'`') values.append(':'+str(key)) fields_string = ', '.join(fields) values_string = ', '.join(values) log.debug(fields_string) log.debug(values_string) field_list = [] for key, value in data.items(): if key != 'id': # Creating a special exception for the id field. field_list.append('`'+str(key) + '` = :' + str(key)) set_values_string = ', '.join(field_list) sql_insert = text(f""" INSERT INTO `{table_name}` ({fields_string}) VALUES ({values_string}); """ ) print(sql_insert) log.debug(f""" INSERT INTO `{table_name}` ({fields_string}) VALUES ({values_string}); """ ) trans = db.begin() try: result_insert = db.execute(sql_insert, data) trans.commit() except Exception as e: trans.rollback() log.exception('*** An exception happened. ***') log.exception(repr(e)) log.exception('***') log.exception(str(e)) log.exception('^^^ exception ^^^') return False else: log.debug(result_insert) log.debug(f'rowcount = {result_insert.rowcount}; lastrowid = {result_insert.lastrowid}') if result_insert.rowcount == 1 and result_insert.lastrowid > 0: # insert log.info('Insert record') log.debug(result_insert.lastrowid) record_id = result_insert.lastrowid return record_id #elif result_insert.rowcount == 1 and result_insert.lastrowid == 0: # update with no change #log.info('Update record with no change') #return True #elif result_insert.rowcount == 2 and result_insert.lastrowid > 0: # update with change #log.info('Update record with changes') #record_id = result_insert.lastrowid #return record_id else: log.debug(result_insert) log.debug(vars(result_insert)) log.debug(dir(result_insert)) log.debug(result_insert.rowcount) # returns 1 on insert and 2 on update with change log.debug(result_insert.lastrowid) # returns last row ID on insert and update with a change and returns 0 if nothing changed return False return False # ### END ### Core Help CRUD ### sql_insert() ### # ### BEGIN ### Core Help CRUD ### sql_update() ### def sql_update(sql:str=None, data:dict=None, table_name:str=None, record_id:int=None, record_id_random:str=None, rm_id_random=None, id_random_length:None|int=8): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if sql: sql_update = text(sql) elif table_name and data: #if rm_id_random: #data = lookup_id_random_pop(obj_data=data) if not data.get('id_random', None) and id_random_length: data['id_random'] = secrets.token_urlsafe(id_random_length) log.debug(data) fields_string = [] for key, value in data.items(): if key != 'id': # Creating a special exception for the id field. fields_string.append('`'+str(key) + '` = :' + str(key)) sql_set = ', '.join(fields_string) if record_id: log.info('Update record with ID') data['id'] = record_id sql = 'UPDATE `'+table_name+'` SET '+ sql_set + ' WHERE id = :id' elif record_id_random: log.info('Update record with ID random') data['id_random'] = record_id_random sql = 'UPDATE `'+table_name+'` SET '+ sql_set + ' WHERE id_random = :id_random' elif 'id' in data: log.info('Update record with ID') sql = 'UPDATE `'+table_name+'` SET '+ sql_set + ' WHERE id = :id' elif 'id_random' in data: # NOTE: For now it is not possible to update the id_random when supplying the id_random as the primary key for a record. # NOTE: In the future I can use record_id_random=True as a special case SQL UPDATE. log.info('Update record with ID random') sql = 'UPDATE `'+table_name+'` SET '+ sql_set + ' WHERE id_random = :id_random' else: return False sql_update = text(sql) trans = db.begin() try: result_update = db.execute(sql_update, data) trans.commit() except Exception as e: trans.rollback() log.exception('*** An exception happened. ***') log.exception(repr(e)) log.exception('***') log.exception(str(e)) log.exception('^^^ exception ^^^') return False else: log.debug(result_update) log.debug(f'rowcount = {result_update.rowcount}; lastrowid = {result_update.lastrowid}') if result_update.rowcount == 1 and result_update.lastrowid == 0: # update with no change log.info('Update record (with no change???)') # With SQL UPDATE this record may have actually changed return True elif result_update.rowcount == 2 and result_update.lastrowid > 0: # update with change log.warning('Should we be here???') log.info('Update record with changes') record_id = result_update.lastrowid return record_id else: log.debug(result_update) log.debug(vars(result_update)) log.debug(dir(result_update)) log.debug(result_update.rowcount) # returns 1 on insert and 2 on update with change log.debug(result_update.lastrowid) # returns last row ID on insert and update with a change and returns 0 if nothing changed return False return False # ### END ### Core Help CRUD ### sql_update() ### # ### BEGIN ### Core Help CRUD ### sql_insert_or_update() ### # The catch all SQL INSERT or UPDATE function - STI 2021-02-17 # This one does it all for SQL INSERT and UPDATE queries def sql_insert_or_update(sql:str=None, data:dict=None, table_name:str=None, rm_id_random:bool=None, id_random_length:int=None): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) #if sql: pass #else: #log.error('SQL text is missing') #return False if sql: sql_insert_or_update = text(sql) elif table_name and data: if rm_id_random: data = lookup_id_random_pop(obj_data=data) if not data.get('id_random', None) and id_random_length: data['id_random'] = secrets.token_urlsafe(id_random_length) fields = [] values = [] for key, value in data.items(): if key != 'id': # A special exception for the id auto increment field. fields.append('`'+str(key)+'`') values.append(':'+str(key)) fields_string = ', '.join(fields) values_string = ', '.join(values) field_list = [] for key, value in data.items(): if key != 'id': # Creating a special exception for the id field. field_list.append('`'+str(key) + '` = :' + str(key)) set_values_string = ', '.join(field_list) sql_insert_or_update = text(f""" INSERT INTO `{table_name}` ({fields_string}) VALUES ({values_string}) ON DUPLICATE KEY UPDATE {set_values_string} ; """) log.debug(f""" INSERT INTO `{table_name}` ({fields_string}) VALUES ({values_string}) ON DUPLICATE KEY UPDATE {set_values_string} ; """) trans = db.begin() try: result_insert_or_update = db.execute(sql_insert_or_update, data) trans.commit() except Exception as e: trans.rollback() log.exception('*** An exception happened. ***') log.exception(repr(e)) log.exception('***') log.exception(str(e)) log.exception('^^^ exception ^^^') return False else: log.debug(result_insert_or_update) log.debug(f'rowcount = {result_insert_or_update.rowcount}; lastrowid = {result_insert_or_update.lastrowid}') if result_insert_or_update.rowcount == 1 and result_insert_or_update.lastrowid > 0: # insert log.info('Insert record') record_id = result_insert_or_update.lastrowid return record_id elif result_insert_or_update.rowcount == 1 and result_insert_or_update.lastrowid == 0: # update with no change log.info('Update record with no change') return True elif result_insert_or_update.rowcount == 2 and result_insert_or_update.lastrowid > 0: # update with change log.info('Update record with changes') record_id = result_insert_or_update.lastrowid return record_id else: log.debug(result_insert_or_update) log.debug(vars(result_insert_or_update)) log.debug(dir(result_insert_or_update)) log.debug(result_insert_or_update.rowcount) # returns 1 on insert and 2 on update with change log.debug(result_insert_or_update.lastrowid) # returns last row ID on insert and update with a change and returns 0 if nothing changed return False return False # ### END ### Core Help CRUD ### sql_insert_or_update() ### # ### BEGIN ### Core Help CRUD ### sql_select() ### # The catch all SQL SELECT function - STI 2021-02-17 # This one does it all for SQL SELECT queries def sql_select( table_name=None, record_id=None, record_id_random=None, field_name=None, field_value=None, sql=None, data=None, rm_id_random=None, as_dict=True, as_list=None ): #log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if table_name and not (record_id or record_id_random or field_name or field_value or sql or data): # Select all records from a table log.info('Select all records from a table') sql = text( f""" SELECT * FROM `{table_name}` ; """ ) elif table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data): # Select all records from a table with an ID (auto or random) log.info('Select all records from a table with an ID (auto or random)') data = {} if record_id: data['record_id'] = record_id sql = text( f""" SELECT * FROM `{table_name}` WHERE `{table_name}`.id = :record_id ; """ ) elif record_id_random: data['record_id_random'] = record_id_random sql = text( f""" SELECT * FROM `{table_name}` WHERE `{table_name}`.id_random = :record_id_random ; """ ) elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data): # Select all records from a table with a specific field and field value log.info('Select all records from a table with a specific field and field value') data = {} data[field_name] = field_value sql = text( f""" SELECT * FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name} ; """ ) elif table_name and data and not (record_id or record_id_random or field_name or field_value or sql): # Select all records from a table with a specific list of fields and field values (list of dicts) log.info('Select all records from a table with a specific list of fields and field values (list of dicts)') if rm_id_random: data = lookup_id_random_pop(obj_data=data) sql_where = [] for field_name in data: sql_where_line = f"""`{table_name}`.{field_name} = :{field_name}""" sql_where.append(sql_where_line) sql_where_string = ' AND '.join(sql_where) log.debug(sql_where_string) sql = text( f""" SELECT * FROM `{table_name}` WHERE {sql_where_string} ; """ ) elif sql and not (table_name or record_id or record_id_random or field_name or field_value or data): # Select records based on the SQL statement given log.info('Select records based on the SQL statement given') sql = text(sql) elif sql and data and not (table_name or record_id or record_id_random or field_name or field_value): # Select records based on the SQL statement given and with the matching data dict fields and values if rm_id_random: data = lookup_id_random_pop(obj_data=data) log.info('Select records based on the SQL statement given and with the matching data dict fields and values') sql = text(sql) else: # Nothing matched the expected combination of parameters passed to this function log.warning('Nothing matched the expected combination of parameters passed to this function') return False # Not successful #log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(sql) log.debug(data) try: if data: log.info('Executing with SQL statement and data...') result = db.execute(sql, data) else: log.info('Executing with SQL statement only...') result = db.execute(sql) except OperationalError as e: log.warning('*** An exception happened: OperationalError ***') log.warning('* This is likely a "MySQL server has gone away" error. Going to try again... *') log.warning(repr(e)) log.warning('***') log.warning(str(e)) log.warning('^^^ exception ^^^') log.warning('Trying to recreate the pool...') log.debug('############## ############') log.debug(dir(db)) log.debug(vars(db)) log.debug('############## ############') log.debug(dir(db.engine)) log.debug(vars(db.engine)) log.debug('############## ############') log.debug(dir(db.engine.pool)) log.debug(vars(db.engine.pool)) log.debug('############## ############') db.engine.dispose() log.warning('Now trying the query again...') try: if data: log.warning('2x Executing with SQL statement and data...') result = db.execute(sql, data) else: log.warning('2x Executing with SQL statement only...') result = db.execute(sql) except Exception as e: log.warning('2x A *second* exception happened. Returning False.') log.exception(repr(e)) log.exception('***') log.exception(str(e)) log.exception('^^^ exception ^^^') return False # Not successful else: log.info('Successfully executed the SQL on the second try.') pass except Exception as e: log.info('An exception happened. Returning False.') log.exception(repr(e)) log.exception('***') log.exception(str(e)) log.exception('^^^ exception ^^^') return False # Not successful else: log.info('Successfully executed the SQL on the first try.') pass #log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(result.rowcount) log.debug(vars(result)) log.debug(dir(result)) if result.rowcount == 1: log.info(f'Found one record. as_dict={as_dict}, as_list={as_list}') if as_dict: # After testing, this method is the fastest way to convert to a dict - STI 2021-03-09 # my custom sql_result_proxy_to_dict_simple(result_proxy=result.first()) is slower record = dict(result.first()) else: record = result.first() if as_list: record_li = [] record_li.append(record) #log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(record_li) return record_li # Successful else: #log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(record) return record # Successful elif result.rowcount > 1: log.info(f'Found {result.rowcount} records. as_dict={as_dict}, as_list={as_list}') #log.info('Found more than one record. Returning as a list of dicts.') if as_dict: # After testing, this method is the fastest way to convert to a list of dicts - STI 2021-03-09 # list(result) was tested and seems to be the slowest # my custom sql_result_proxy_to_dict_simple(result_proxy=result.fetchall()) was tested and is only slightly faster than list(result) #timer_1_start = timer() record_li = [dict(record) for record in result.fetchall()] #log.debug(record_li) #log.debug(type(record_li)) #log.debug(type(record_li[0])) #timer_1_end = timer() #log.debug( round((timer_1_end - timer_1_start), 8) ) else: record_li = result.fetchall() log.debug(record_li) return record_li # Successful else: log.info('No records found. Returning None.') log.debug(result) return None # Successful # ### END ### Core Help CRUD ### sql_select() ### # ### BEGIN ### Core Help CRUD ### sql_delete() ### # The catch all SQL DELETE function - STI 2021-02-17 # This one does it all for SQL DELETE queries def sql_delete( table_name:str=None, record_id:int=None, record_id_random:str=None, field_name:str=None, field_value=None, sql:str=None, data:dict=None ): log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data): # Delete all records from a table with an ID (auto or random) log.info('Delete all records from a table with an ID (auto or random)') data = {} if record_id: data['record_id'] = record_id sql = text( f""" DELETE FROM `{table_name}` WHERE `{table_name}`.id = :record_id """ ) elif record_id_random: data['record_id_random'] = record_id_random sql = text( f""" DELETE FROM `{table_name}` WHERE `{table_name}`.id_random = :record_id_random """ ) elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data): # Delete all records from a table with a specific field and field value log.info('Delete all records from a table with a specific field and field value') data = {} data[field_name] = field_value sql = text( f""" DELETE FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name} """ ) elif table_name and data and not (record_id or record_id_random or field_name or field_value or sql): # Delete all records from a table with a specific list of fields and field values (list of dicts) log.info('Delete all records from a table with a specific list of fields and field values (list of dicts)') sql_where = [] for field_name in data: sql_where_line = f"""`{table_name}`.{field_name} = :{field_name}""" sql_where.append(sql_where_line) sql_where_string = ' AND '.join(sql_where) log.debug(sql_where_string) sql = text( f""" DELETE FROM `{table_name}` WHERE {sql_where_string} """ ) log.debug(sql) elif sql and not (table_name or record_id or record_id_random or field_name or field_value or data): # Delete records based on the SQL statement given log.info('Delete records based on the SQL statement given') sql = text(sql) elif sql and data and not (table_name or record_id or record_id_random or field_name or field_value): # Delete records based on the SQL statement given and with the matching data dict fields and values log.info('Delete records based on the SQL statement given and with the matching data dict fields and values') sql = text(sql) else: # Nothing matched the expected combination of parameters passed to this function log.warning('Nothing matched the expected combination of parameters passed to this function') return False # Not successful log.debug(sql) try: log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL if data: log.info('Executing with SQL (DELETE) statement and data...') result = db.execute(sql, data) else: log.info('Executing with SQL (DELETE?) statement only...') result = db.execute(sql) log.debug(result) log.debug(dir(result)) log.debug(vars(result)) except OperationalError as e: log.warning('*** An exception happened: OperationalError ***') log.warning('* This is likely a "MySQL server has gone away" error. Going to try again... *') log.warning(repr(e)) log.warning('***') log.warning(str(e)) log.warning('^^^ exception ^^^') log.warning('Trying to recreate the pool...') log.debug('############## ############') log.debug(dir(db)) log.debug(vars(db)) log.debug('############## ############') log.debug(dir(db.engine)) log.debug(vars(db.engine)) log.debug('############## ############') log.debug(dir(db.engine.pool)) log.debug(vars(db.engine.pool)) log.debug('############## ############') db.engine.dispose() log.warning('Now trying the query again...') try: log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL if data: log.warning('2x Executing with SQL statement and data...') result = db.execute(sql, data) else: log.warning('2x Executing with SQL statement only...') result = db.execute(sql) log.debug(result) except Exception as e: log.warning('2x A *second* exception happened. Returning False.') log.exception(repr(e)) log.exception('***') log.exception(str(e)) log.exception('^^^ exception ^^^') return False # Not successful else: log.info('Successfully executed the SQL on the second try.') pass except Exception as e: log.info('An exception happened. Returning False.') log.exception(repr(e)) log.exception('***') log.exception(str(e)) log.exception('^^^ exception ^^^') return False # Not successful else: log.info('Successfully executed the SQL on the first try.') pass # NOTE: Need to deal with 0 rows affected when the WHERE clause was not satisfied and there was no error. return True # Successful