Files
OSIT-AE-API-FastAPI/app/db_sql.py
2021-03-05 17:27:16 -05:00

500 lines
19 KiB
Python

from app.config import settings
from .log import *
from sqlalchemy import create_engine, text
from sqlalchemy.exc import IntegrityError, OperationalError
db_uri = settings.SQLALCHEMY_DATABASE_URI
connection_string = db_uri
engine = create_engine(name_or_url=connection_string, pool_size=25, pool_recycle=60, pool_pre_ping=True, echo=True, echo_pool=True, isolation_level='READ COMMITTED')
# NOTE: The default isolation_level is 'REPEATABLE READ'. This can sometimes not show updated data.
db = engine.connect()
# #### ### ## # BEGIN SQL # ## ### ####
# Create, Read/Get, Update, Delete
# CRUD or CGUD
# ### BEGIN ### Core Help CRUD ### sql_insert() ###
def sql_insert(sql=None, data=None, table_name=None, rm_id_random=None, id_random_length=None):
log.setLevel(logging.ERROR) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
return False
# ### END ### Core Help CRUD ### sql_insert() ###
# ### BEGIN ### Core Help CRUD ### sql_update() ###
def sql_update(sql=None, data=None, table_name=None, rm_id_random=None, id_random_length=None):
log.setLevel(logging.ERROR) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
return False
# ### END ### Core Help CRUD ### sql_update() ###
# ### BEGIN ### Core Help CRUD ### sql_insert_or_update() ###
# The catch all SQL INSERT or UPDATE function - STI 2021-02-17
# This one does it all for SQL INSERT and UPDATE queries
def sql_insert_or_update(sql:str=None, data:dict=None, table_name:str=None, rm_id_random:bool=None, id_random_length:int=None):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
#if sql: pass
#else:
#log.error('SQL text is missing')
#return False
if sql:
sql_insert_or_update = text(sql)
elif table_name and data:
if rm_id_random:
data = lookup_id_random_pop(obj_data=data)
if not data.get('id_random', None) and id_random_length:
data['id_random'] = secrets.token_urlsafe(id_random_length)
fields = []
values = []
for key, value in data.items():
if key != 'id': # A special exception for the id auto increment field.
fields.append('`'+str(key)+'`')
values.append(':'+str(key))
fields_string = ', '.join(fields)
values_string = ', '.join(values)
field_list = []
for key, value in data.items():
if key != 'id': # Creating a special exception for the id field.
field_list.append('`'+str(key) + '` = :' + str(key))
set_values_string = ', '.join(field_list)
sql_insert_or_update = text(
f"""
INSERT INTO `{table_name}` ({fields_string}) VALUES ({values_string})
ON DUPLICATE KEY UPDATE
{set_values_string}
;
"""
)
log.debug(f"""
INSERT INTO `{table_name}` ({fields_string}) VALUES ({values_string})
ON DUPLICATE KEY UPDATE
{set_values_string}
;
""")
trans = db.begin()
try:
result_insert = db.execute(sql_insert_or_update, data)
trans.commit()
except Exception as e:
trans.rollback()
log.exception('*** An exception happened. ***')
log.exception(repr(e))
log.exception('***')
log.exception(str(e))
log.exception('^^^ exception ^^^')
return False
else:
if result_insert.rowcount == 1 and result_insert.lastrowid > 0: # insert
log.info('Insert record')
record_id = result_insert.lastrowid
return record_id
elif result_insert.rowcount == 1 and result_insert.lastrowid == 0: # update with no change
log.info('Update record with no change')
return True
elif result_insert.rowcount == 2 and result_insert.lastrowid > 0: # update with change
log.info('Update record with changes')
record_id = result_insert.lastrowid
return record_id
else:
log.debug(result_insert)
log.debug(vars(result_insert))
log.debug(dir(result_insert))
log.debug(result_insert.rowcount) # returns 1 on insert and 2 on update with change
log.debug(result_insert.lastrowid) # returns last row ID on insert and update with a change and returns 0 if nothing changed
return False
return False
# ### END ### Core Help CRUD ### sql_insert_or_update() ###
# ### BEGIN ### Core Help CRUD ### sql_select() ###
# The catch all SQL SELECT function - STI 2021-02-17
# This one does it all for SQL SELECT queries
def sql_select(table_name=None, record_id=None, record_id_random=None, field_name=None, field_value=None, sql=None, data=None, rm_id_random=None, as_dict=True, as_list=None):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if table_name and not (record_id or record_id_random or field_name or field_value or sql or data):
# Select all records from a table
log.info('Select all records from a table')
sql = text(
f"""
SELECT *
FROM `{table_name}`
;
"""
)
elif table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data):
# Select all records from a table with an ID (auto or random)
log.info('Select all records from a table with an ID (auto or random)')
data = {}
if record_id:
data['record_id'] = record_id
sql = text(
f"""
SELECT *
FROM `{table_name}`
WHERE `{table_name}`.id = :record_id
;
"""
)
elif record_id_random:
data['record_id_random'] = record_id_random
sql = text(
f"""
SELECT *
FROM `{table_name}`
WHERE `{table_name}`.id_random = :record_id_random
;
"""
)
elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data):
# Select all records from a table with a specific field and field value
log.info('Select all records from a table with a specific field and field value')
data = {}
data[field_name] = field_value
sql = text(
f"""
SELECT *
FROM `{table_name}`
WHERE `{table_name}`.{field_name} = :{field_name}
;
"""
)
elif table_name and data and not (record_id or record_id_random or field_name or field_value or sql):
# Select all records from a table with a specific list of fields and field values (list of dicts)
log.info('Select all records from a table with a specific list of fields and field values (list of dicts)')
if rm_id_random:
data = lookup_id_random_pop(obj_data=data)
sql_where = []
for field_name in data:
sql_where_line = f"""`{table_name}`.{field_name} = :{field_name}"""
sql_where.append(sql_where_line)
sql_where_string = ' AND '.join(sql_where)
log.debug(sql_where_string)
sql = text(
f"""
SELECT *
FROM `{table_name}`
WHERE {sql_where_string}
;
"""
)
elif sql and not (table_name or record_id or record_id_random or field_name or field_value or data):
# Select records based on the SQL statement given
log.info('Select records based on the SQL statement given')
sql = text(sql)
elif sql and data and not (table_name or record_id or record_id_random or field_name or field_value):
# Select records based on the SQL statement given and with the matching data dict fields and values
if rm_id_random:
data = lookup_id_random_pop(obj_data=data)
log.info('Select records based on the SQL statement given and with the matching data dict fields and values')
sql = text(sql)
else:
# Nothing matched the expected combination of parameters passed to this function
log.warning('Nothing matched the expected combination of parameters passed to this function')
return False # Not successful
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(sql)
log.debug(data)
try:
if data:
log.info('Executing with SQL statement and data...')
result = db.execute(sql, data)
else:
log.info('Executing with SQL statement only...')
result = db.execute(sql)
except OperationalError as e:
log.warning('*** An exception happened: OperationalError ***')
log.warning('* This is likely a "MySQL server has gone away" error. Going to try again... *')
log.warning(repr(e))
log.warning('***')
log.warning(str(e))
log.warning('^^^ exception ^^^')
log.warning('Trying to recreate the pool...')
log.debug('############## ############')
log.debug(dir(db))
log.debug(vars(db))
log.debug('############## ############')
log.debug(dir(db.engine))
log.debug(vars(db.engine))
log.debug('############## ############')
log.debug(dir(db.engine.pool))
log.debug(vars(db.engine.pool))
log.debug('############## ############')
db.engine.dispose()
log.warning('Now trying the query again...')
try:
if data:
log.warning('2x Executing with SQL statement and data...')
result = db.execute(sql, data)
else:
log.warning('2x Executing with SQL statement only...')
result = db.execute(sql)
except Exception as e:
log.warning('2x A *second* exception happened. Returning False.')
log.exception(repr(e))
log.exception('***')
log.exception(str(e))
log.exception('^^^ exception ^^^')
return False # Not successful
else:
log.info('Successfully executed the SQL on the second try.')
pass
except Exception as e:
log.info('An exception happened. Returning False.')
log.exception(repr(e))
log.exception('***')
log.exception(str(e))
log.exception('^^^ exception ^^^')
return False # Not successful
else:
log.info('Successfully executed the SQL on the first try.')
pass
#log.debug(result.fetchall()) # Uncommenting this breaks things?
# BEGIN NOTE: Check this out later! ###
#header = result.keys()
#for row in result:
# yield dict(zip(header, row))
# END NOTE: Check this out later! ###
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(result.rowcount)
log.debug(vars(result))
log.debug(dir(result))
if result.rowcount == 1:
log.info(f'Found one record. as_dict={as_dict}, as_list={as_list}')
if as_dict:
record = sql_result_proxy_to_dict_simple(result_proxy=result.first())
else:
record = result.first()
if as_list:
record_li = []
record_li.append(record)
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(record_li)
return record_li # Successful
else:
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(record)
return record # Successful
elif result.rowcount > 1:
log.info(f'Found {result.rowcount} records. as_dict={as_dict}, as_list={as_list}')
#log.info('Found more than one record. Returning as a list of dicts.')
if as_dict:
record_li = sql_result_proxy_to_dict_simple(result_proxy=result.fetchall())
else:
record_li = result.fetchall()
log.debug(record_li)
return record_li # Successful
else:
log.info('No records found. Returning None.')
log.debug(result)
return None # Successful
# ### END ### Core Help CRUD ### sql_select() ###
# ### BEGIN ### Core Help CRUD ### sql_delete() ###
# The catch all SQL DELETE function - STI 2021-02-17
# This one does it all for SQL DELETE queries
def sql_delete(table_name:str=None, record_id:int=None, record_id_random:str=None, field_name:str=None, field_value=None, sql:str=None, data:dict=None):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data):
# Delete all records from a table with an ID (auto or random)
log.info('Delete all records from a table with an ID (auto or random)')
data = {}
if record_id:
data['record_id'] = record_id
sql = text(
f"""
DELETE FROM `{table_name}`
WHERE `{table_name}`.id = :record_id
"""
)
elif record_id_random:
data['record_id_random'] = record_id_random
sql = text(
f"""
DELETE FROM `{table_name}`
WHERE `{table_name}`.id_random = :record_id_random
"""
)
elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data):
# Delete all records from a table with a specific field and field value
log.info('Delete all records from a table with a specific field and field value')
data = {}
data[field_name] = field_value
sql = text(
f"""
DELETE FROM `{table_name}`
WHERE `{table_name}`.{field_name} = :{field_name}
"""
)
elif table_name and data and not (record_id or record_id_random or field_name or field_value or sql):
# Delete all records from a table with a specific list of fields and field values (list of dicts)
log.info('Delete all records from a table with a specific list of fields and field values (list of dicts)')
sql_where = []
for field_name in data:
sql_where_line = f"""`{table_name}`.{field_name} = :{field_name}"""
sql_where.append(sql_where_line)
sql_where_string = ' AND '.join(sql_where)
log.debug(sql_where_string)
sql = text(
f"""
DELETE FROM `{table_name}`
WHERE {sql_where_string}
"""
)
log.debug(sql)
elif sql and not (table_name or record_id or record_id_random or field_name or field_value or data):
# Delete records based on the SQL statement given
log.info('Delete records based on the SQL statement given')
sql = text(sql)
elif sql and data and not (table_name or record_id or record_id_random or field_name or field_value):
# Delete records based on the SQL statement given and with the matching data dict fields and values
log.info('Delete records based on the SQL statement given and with the matching data dict fields and values')
sql = text(sql)
else:
# Nothing matched the expected combination of parameters passed to this function
log.warning('Nothing matched the expected combination of parameters passed to this function')
return False # Not successful
log.debug(sql)
try:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
if data:
log.info('Executing with SQL (DELETE) statement and data...')
result = db.execute(sql, data)
else:
log.info('Executing with SQL (DELETE?) statement only...')
result = db.execute(sql)
log.debug(result)
log.debug(dir(result))
log.debug(vars(result))
except OperationalError as e:
log.warning('*** An exception happened: OperationalError ***')
log.warning('* This is likely a "MySQL server has gone away" error. Going to try again... *')
log.warning(repr(e))
log.warning('***')
log.warning(str(e))
log.warning('^^^ exception ^^^')
log.warning('Trying to recreate the pool...')
log.debug('############## ############')
log.debug(dir(db))
log.debug(vars(db))
log.debug('############## ############')
log.debug(dir(db.engine))
log.debug(vars(db.engine))
log.debug('############## ############')
log.debug(dir(db.engine.pool))
log.debug(vars(db.engine.pool))
log.debug('############## ############')
db.engine.dispose()
log.warning('Now trying the query again...')
try:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
if data:
log.warning('2x Executing with SQL statement and data...')
result = db.execute(sql, data)
else:
log.warning('2x Executing with SQL statement only...')
result = db.execute(sql)
log.debug(result)
except Exception as e:
log.warning('2x A *second* exception happened. Returning False.')
log.exception(repr(e))
log.exception('***')
log.exception(str(e))
log.exception('^^^ exception ^^^')
return False # Not successful
else:
log.info('Successfully executed the SQL on the second try.')
pass
except Exception as e:
log.info('An exception happened. Returning False.')
log.exception(repr(e))
log.exception('***')
log.exception(str(e))
log.exception('^^^ exception ^^^')
return False # Not successful
else:
log.info('Successfully executed the SQL on the first try.')
pass
# NOTE: Need to deal with 0 rows affected when the WHERE clause was not satisfied and there was no error.
return True # Successful
# NOTE WARNING: This is a near duplicate of what is under lib_rest (was lib_general). WARNING
# Change SQL SELECT result RowProxy record to a dict (named key/value)
# Change SQL SELECT list result RowProxy records to a list of dicts (named key/value)
def sql_result_proxy_to_dict_simple(result_proxy=None):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
log.debug(type(result_proxy))
if isinstance(result_proxy, list):
log.info('Processing a SQL list...')
record_li = []
for row_proxy in result_proxy:
log.debug(row_proxy)
record = {}
for key, value in row_proxy.items():
record[key] = value
record_li.append(record)
return record_li
# Must import sqlalchemy to check the type correctly.
# Or convert it to a string and compare.
if str(type(result_proxy)) == '<class \'sqlalchemy.engine.result.RowProxy\'>':
#if isinstance(result_proxy, sqlalchemy.engine.result.RowProxy):
log.info('Processing a SQL record (sqlalchemy.engine.result.RowProxy)')
row_proxy = result_proxy
record = {}
for key, value in row_proxy.items():
record[key] = value
return record
return False