Files
OSIT-AE-API-FastAPI/app/db_sql.py

1309 lines
59 KiB
Python

from __future__ import annotations
import datetime, pytz, redis, secrets
from timeit import default_timer as timer
from app.config import settings
from app.log import log, logging, logger_reset
from sqlalchemy import create_engine, text, Time
from sqlalchemy.exc import IntegrityError, OperationalError
# The database URI comes from the application settings; both names are kept
# because either may be referenced elsewhere.
db_uri = settings.SQLALCHEMY_DATABASE_URI
connection_string = db_uri

# NOTE: isolation_level is set explicitly because the default ('REPEATABLE READ')
#       can sometimes not show updated data.
# NOTE: Setting "echo" to True shows the SQL queries.
engine = create_engine(
    url=connection_string,
    pool_size=25,
    pool_recycle=60,
    pool_pre_ping=True,
    echo=False,
    echo_pool=True,
    isolation_level='READ COMMITTED',
)

# One connection is opened at import time and used by the sql_* helpers in this module.
db = engine.connect()
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
# log.debug(db)
# log.debug(vars(db))
# log.debug(dir(db))
# #### ### ## # BEGIN SQL # ## ### ####
# Create, Read/Get, Update, Delete
# CRUD or CGUD
# ### BEGIN ### Core Help CRUD ### sql_insert() ###
# Updated 2021-09-07
@logger_reset
def sql_insert(
    sql: str|None = None,
    data: dict|None = None,
    table_name: str|None = None,
    rm_id_random: bool = False,
    id_random_length: int = 8,
):
    """Run a SQL INSERT inside a transaction and report the new row ID.

    Calling modes (checked in this order):

    * ``sql`` is given: the string is executed as-is, with ``data`` (when
      non-empty) as the bound parameters.
    * ``table_name`` and ``data`` are given: an ``INSERT`` statement is built
      from the keys of ``data``. The ``id`` key is never written so the
      auto increment column is left to the database.

    In the second mode ``data`` is first passed through
    ``lookup_id_random_pop()`` when ``rm_id_random`` is true. If the dict has
    no truthy ``id_random`` and ``id_random_length`` is truthy, a new
    ``secrets.token_urlsafe(id_random_length)`` value is stored in the dict
    (the dict is modified, not copied). ``id_random_length`` is the number of
    random bytes, not the number of characters.

    Returns:
        int: ``lastrowid`` when exactly one row was inserted and
            ``lastrowid`` is greater than zero.
        None: an ``IntegrityError`` was raised (likely a duplicate entry).
        False: any other exception, missing arguments, or an unexpected
            rowcount/lastrowid combination.
    """
    log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    if sql:
        log.info('SQL INSERT using sql string.')
        log.debug(sql)
        statement = text(sql)
    elif table_name and data:
        log.info(f'SQL INSERT using table_name and data. Table Name: {table_name}')
        if rm_id_random:
            data = lookup_id_random_pop(obj_data=data)
        if not data.get('id_random', None) and id_random_length:
            data['id_random'] = secrets.token_urlsafe(id_random_length)
        log.debug(data)

        # A special exception for the id auto increment field: never insert it.
        columns = [str(key) for key in data if key != 'id']
        fields_string = ', '.join(f'`{column}`' for column in columns)
        values_string = ', '.join(f':{column}' for column in columns)
        log.debug(fields_string)
        log.debug(values_string)

        insert_sql = f"""
            INSERT INTO `{table_name}` ({fields_string}) VALUES ({values_string});
        """
        log.debug(insert_sql)
        statement = text(insert_sql)
    else:
        # Without this guard the statement would be unbound and the function
        # would crash with an UnboundLocalError further down.
        log.warning('Something was missing from the sql_insert function call.')
        return False

    log.debug(statement)
    log.debug(data)

    trans = db.begin()
    try:
        # Only pass parameters when there are any (same pattern as sql_select).
        if data:
            result_insert = db.execute(statement, data)
        else:
            result_insert = db.execute(statement)
        trans.commit()
    except Exception as e:
        trans.rollback()
        if isinstance(e, IntegrityError): # Specifically want to capture duplicate entry attempts
            # http://sqlalche.me/e/14/gkpj
            label, outcome = 'Integrity Error Exception', None
            log.error('An integrity error exception happened. This is likely because there was an attempt to create a duplicate entry. Returning None')
        elif isinstance(e, OperationalError): # Likely an unknown field or related
            label, outcome = 'Operational Error Exception', False
            log.error('An operational error exception happened. This is likely because there was an unknown field or similar included. Returning False')
        else:
            label, outcome = 'Exception', False
            log.error('An unknown exception happened. Returning False')
        log.exception(f'**** *** ** * ### BEGIN ### {label} Happened: Returning {outcome} * ** *** ****')
        log.error(f'^^^^ ^^^ ^^ ^ ### END ### {label} ^ ^^ ^^^ ^^^^')
        log.error(f'**** *** ** * ### BEGIN ### {label} Details: * ** *** ****')
        for title, attr in (
            ('SQL Statement', 'statement'),
            ('SQL Parameters', 'params'),
            ('SQL Origin Message', 'orig'),
        ):
            log.error(f'**** *** ** * {title}: * ** *** ****')
            # Not every exception carries these attributes; reading them
            # directly would raise AttributeError inside this handler.
            log.error(getattr(e, attr, None))
        log.error(f'**** *** ** * ### END ### {label} Details: * ** *** ****')
        return outcome

    log.debug(result_insert)
    log.debug(f'rowcount = {result_insert.rowcount}; lastrowid = {result_insert.lastrowid}')
    if result_insert.rowcount == 1 and result_insert.lastrowid > 0: # insert
        record_id = result_insert.lastrowid
        log.info(f'Insert record: {record_id}')
        return record_id

    # Anything else is treated as a failure; dump the result for debugging.
    log.debug(vars(result_insert))
    log.debug(dir(result_insert))
    log.debug(result_insert.rowcount) # returns 1 on insert and 2 on update with change
    log.debug(result_insert.lastrowid) # returns last row ID on insert and update with a change and returns 0 if nothing changed
    return False
# ### END ### Core Help CRUD ### sql_insert() ###
# ### BEGIN ### Core Help CRUD ### sql_update() ###
# Updated 2021-09-07
@logger_reset
def sql_update(
    sql: str|None = None,
    data: dict|None = None,
    table_name: str|None = None,
    record_id: int|None = None,
    record_id_random: str|None = None,
    rm_id_random: bool = False,
    id_random_length: None|int = None
):
    """Run a SQL UPDATE inside a transaction.

    Calling modes (checked in this order):

    * ``sql`` is given: the string is executed as-is, with ``data`` (when
      non-empty) as the bound parameters.
    * ``table_name`` and ``data`` are given: an ``UPDATE ... SET ...`` is
      built from every key of ``data`` except ``id``. The row is selected by
      the first of these that applies: ``record_id``, ``record_id_random``,
      ``data['id']``, ``data['id_random']``.

    In the second mode ``data`` is first passed through
    ``lookup_id_random_pop()`` when ``rm_id_random`` is true, and a new
    ``id_random`` token is generated when the dict has no truthy
    ``id_random`` and ``id_random_length`` is truthy. ``record_id`` and
    ``record_id_random`` are written into the dict (under ``id`` and
    ``id_random``) so they can be bound in the WHERE clause; the dict is
    modified, not copied.

    NOTE: For now it is not possible to change ``id_random`` while also using
    ``id_random`` to select the record, because both use the same bind name.

    Returns:
        True: at least one row was affected and ``lastrowid`` is 0.
        int: ``lastrowid`` when rowcount is 2 and ``lastrowid`` is positive.
        None: an ``IntegrityError`` was raised, or ``data`` had no field
            other than ``id`` to update.
        False: any other exception, missing arguments, no way to identify the
            record, or an unexpected rowcount/lastrowid combination.
    """
    log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    if sql:
        statement = text(sql)
    elif table_name and data:
        if rm_id_random:
            data = lookup_id_random_pop(obj_data=data)
        if not data.get('id_random', None) and id_random_length:
            data['id_random'] = secrets.token_urlsafe(id_random_length)
        log.debug(data)

        # Creating a special exception for the id field: it is never part of SET.
        sql_set = ', '.join(f'`{key}` = :{key}' for key in data if key != 'id')
        log.debug(sql_set)
        if not sql_set:
            # NOTE: Returning None instead of False since technically the SQL query did not fail. Just that nothing was updated related to that record. I have been returning False if there is a SQL query problem. STI 2022-01-19
            log.warning('The SQL SET is unexpectedly short and may not have data. Returning None')
            return None

        if record_id:
            log.info(f'Update record with ID: {record_id}')
            data['id'] = record_id
            sql_where = 'id = :id'
        elif record_id_random:
            log.info(f'Update record with ID random: {record_id_random}')
            data['id_random'] = record_id_random
            sql_where = 'id_random = :id_random'
        elif 'id' in data:
            log.info(f"Update record with ID in data dict: {data['id']}")
            sql_where = 'id = :id'
        elif 'id_random' in data:
            # NOTE: In the future record_id_random=True could be used as a special case SQL UPDATE.
            log.info(f"Update record with ID in data dict: {data['id_random']}")
            sql_where = 'id_random = :id_random'
        else:
            log.warning('Something was missing from the sql_update function call.')
            return False
        statement = text(f'UPDATE `{table_name}` SET {sql_set} WHERE {sql_where}')
    else:
        # Without this guard the statement would be unbound and the function
        # would crash with an UnboundLocalError further down.
        log.warning('Something was missing from the sql_update function call.')
        return False

    log.debug(statement)

    trans = db.begin()
    try:
        log.info('Trying to execute the SQL UPDATE query...')
        # Only pass parameters when there are any (same pattern as sql_select).
        if data:
            result_update = db.execute(statement, data)
        else:
            result_update = db.execute(statement)
        trans.commit()
    except Exception as e:
        trans.rollback()
        if isinstance(e, IntegrityError): # Specifically want to capture duplicate entry attempts
            # http://sqlalche.me/e/14/gkpj
            label, outcome = 'Integrity Error Exception', None
            log.error('An integrity error exception happened. This is likely because there was an attempt to create a duplicate entry. Returning None')
        elif isinstance(e, OperationalError): # Likely an unknown field or related
            label, outcome = 'Operational Error Exception', False
            log.error('An operational error exception happened. This is likely because there was an unknown field or similar included. Returning False')
        else:
            label, outcome = 'Exception', False
            log.error('An unknown exception happened. Returning False')
        log.exception(f'**** *** ** * ### BEGIN ### {label} Happened: Returning {outcome} * ** *** ****')
        log.error(f'^^^^ ^^^ ^^ ^ ### END ### {label} ^ ^^ ^^^ ^^^^')
        log.error(f'**** *** ** * ### BEGIN ### {label} Details: * ** *** ****')
        for title, attr in (
            ('SQL Statement', 'statement'),
            ('SQL Parameters', 'params'),
            ('SQL Origin Message', 'orig'),
        ):
            log.error(f'**** *** ** * {title}: * ** *** ****')
            # Not every exception carries these attributes; reading them
            # directly would raise AttributeError inside this handler.
            log.error(getattr(e, attr, None))
        log.error(f'**** *** ** * ### END ### {label} Details: * ** *** ****')
        return outcome

    log.debug(result_update)
    log.debug(f'rowcount = {result_update.rowcount}; lastrowid = {result_update.lastrowid}')
    if result_update.rowcount >= 1 and result_update.lastrowid == 0: # update with no change
        log.info(f'Updated {result_update.rowcount} records (with no changes?)') # With SQL UPDATE this record may have actually changed
        return True
    if result_update.rowcount == 2 and result_update.lastrowid > 0: # update with change
        log.warning('Should we be here???')
        log.info('Update record with changes')
        record_id = result_update.lastrowid
        return record_id

    # Anything else is treated as a failure; dump the result for debugging.
    log.debug(result_update)
    log.debug(vars(result_update))
    log.debug(dir(result_update))
    log.debug(result_update.rowcount) # returns 1 on insert and 2 on update with change
    log.debug(result_update.lastrowid) # returns last row ID on insert and update with a change and returns 0 if nothing changed
    return False
# ### END ### Core Help CRUD ### sql_update() ###
# ### BEGIN ### Core Help CRUD ### sql_insert_or_update() ###
# The catch all SQL INSERT or UPDATE function - STI 2021-02-17
# This one does it all for SQL INSERT and UPDATE queries
# Updated 2021-09-07
@logger_reset
def sql_insert_or_update(
    sql: str|None = None,
    data: dict|None = None,
    table_name: str|None = None,
    rm_id_random: bool = False,
    id_random_length: int|None = None,
):
    """Run an ``INSERT ... ON DUPLICATE KEY UPDATE`` inside a transaction.

    Calling modes (checked in this order):

    * ``sql`` is given: the string is executed as-is, with ``data`` (when
      non-empty) as the bound parameters.
    * ``table_name`` and ``data`` are given: the statement is built from the
      keys of ``data``. The ``id`` key is left out of both the INSERT column
      list and the UPDATE assignments.

    In the second mode ``data`` is first passed through
    ``lookup_id_random_pop()`` when ``rm_id_random`` is true, and a new
    ``id_random`` token is stored in the dict when it has no truthy
    ``id_random`` and ``id_random_length`` is truthy (the dict is modified,
    not copied).

    Returns:
        int: ``lastrowid`` for an insert (rowcount 1, lastrowid > 0) or for an
            update with changes (rowcount 2, lastrowid > 0).
        True: an update with no change (rowcount 1, lastrowid 0).
        None: an ``IntegrityError`` was raised.
        False: any other exception, missing arguments, or an unexpected
            rowcount/lastrowid combination.
    """
    log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    if sql:
        statement = text(sql)
    elif table_name and data:
        if rm_id_random:
            data = lookup_id_random_pop(obj_data=data)
        if not data.get('id_random', None) and id_random_length:
            data['id_random'] = secrets.token_urlsafe(id_random_length)

        # A special exception for the id auto increment field: never write it.
        columns = [str(key) for key in data if key != 'id']
        fields_string = ', '.join(f'`{column}`' for column in columns)
        values_string = ', '.join(f':{column}' for column in columns)
        set_values_string = ', '.join(f'`{column}` = :{column}' for column in columns)

        upsert_sql = f"""
            INSERT INTO `{table_name}` ({fields_string}) VALUES ({values_string})
            ON DUPLICATE KEY UPDATE
            {set_values_string}
            ;
        """
        statement = text(upsert_sql)
        # NOTE(review): this raises the log level to DEBUG for the rest of the
        # call; it looks like a debugging leftover -- confirm before removing.
        log.setLevel(logging.DEBUG)
        log.debug(upsert_sql)
    else:
        # Without this guard the statement would be unbound and the function
        # would crash with an UnboundLocalError further down.
        log.warning('Something was missing from the sql_insert_or_update function call.')
        return False

    trans = db.begin()
    try:
        log.debug(data)
        # Only pass parameters when there are any (same pattern as sql_select).
        if data:
            result_insert_or_update = db.execute(statement, data)
        else:
            result_insert_or_update = db.execute(statement)
        trans.commit()
    except Exception as e:
        trans.rollback()
        if isinstance(e, IntegrityError): # Specifically want to capture duplicate entry attempts
            # http://sqlalche.me/e/14/gkpj
            label, outcome = 'Integrity Error Exception', None
            log.error('An integrity error exception happened. This is likely because there was an attempt to create a duplicate entry. Returning None')
        elif isinstance(e, OperationalError): # Likely an unknown field or related
            label, outcome = 'Operational Error Exception', False
            log.error('An operational error exception happened. This is likely because there was an unknown field or similar included. Returning False')
        else:
            label, outcome = 'Exception', False
            log.error('An unknown exception happened. Returning False')
        log.exception(f'**** *** ** * ### BEGIN ### {label} Happened: Returning {outcome} * ** *** ****')
        log.error(f'^^^^ ^^^ ^^ ^ ### END ### {label} ^ ^^ ^^^ ^^^^')
        log.error(f'**** *** ** * ### BEGIN ### {label} Details: * ** *** ****')
        for title, attr in (
            ('SQL Statement', 'statement'),
            ('SQL Parameters', 'params'),
            ('SQL Origin Message', 'orig'),
        ):
            log.error(f'**** *** ** * {title}: * ** *** ****')
            # Not every exception carries these attributes; reading them
            # directly would raise AttributeError inside this handler.
            log.error(getattr(e, attr, None))
        log.error(f'**** *** ** * ### END ### {label} Details: * ** *** ****')
        return outcome

    log.debug(result_insert_or_update)
    rowcount = result_insert_or_update.rowcount
    lastrowid = result_insert_or_update.lastrowid
    log.debug(f'rowcount = {rowcount}; lastrowid = {lastrowid}')
    if rowcount == 1 and lastrowid > 0: # insert
        log.info(f'Insert record: {lastrowid}')
        return lastrowid
    if rowcount == 1 and lastrowid == 0: # update with no change
        log.info('Update record with no change')
        return True
    if rowcount == 2 and lastrowid > 0: # update with change
        log.info(f'Update record with changes: {lastrowid}')
        return lastrowid

    # Anything else is treated as a failure; dump the result for debugging.
    log.debug(result_insert_or_update)
    log.debug(vars(result_insert_or_update))
    log.debug(dir(result_insert_or_update))
    log.debug(rowcount) # returns 1 on insert and 2 on update with change
    log.debug(lastrowid) # returns last row ID on insert and update with a change and returns 0 if nothing changed
    return False
# ### END ### Core Help CRUD ### sql_insert_or_update() ###
# ### BEGIN ### Core Help CRUD ### sql_select() ###
# The catch all SQL SELECT function - STI 2021-02-17
# This one does it all for SQL SELECT queries
# Updated 2021-09-07
@logger_reset
def sql_select(
    table_name: str|None = None,
    record_id: int|None = None,
    record_id_random: str|None = None,
    field_name: str|None = None,
    field_value = None,
    sql: str|None = None,
    data: dict|None = None,
    rm_id_random: bool = False,
    as_dict: bool|None = True,
    as_list: bool|None = False,
    max_count: int = 100000,
) -> None|bool|dict|list:
    """Run a SQL SELECT and return the matching record(s).

    Exactly one of these argument combinations must be used; every other
    selector argument must be falsy:

    * ``table_name`` only: all records of the table.
    * ``table_name`` + ``record_id`` or ``record_id_random``: match on ``id``
      or ``id_random``.
    * ``table_name`` + ``field_name`` + ``field_value``: match one field.
      A falsy ``field_value`` (0, '', False) does not select this mode.
    * ``table_name`` + ``data``: match every key/value of ``data`` (ANDed).
    * ``sql`` only, or ``sql`` + ``data``: run the given statement, with
      ``data`` as the bound parameters.

    When ``rm_id_random`` is true, ``data`` is passed through
    ``lookup_id_random_pop()`` first (only in the modes that take ``data``).
    ``max_count`` is accepted but not used by this function.

    If the first execution raises an ``OperationalError`` the engine pool is
    disposed and the query is tried one more time.

    Returns:
        dict | row: the single record when exactly one row matched and
            ``as_list`` is false (a dict when ``as_dict`` is true).
        list: the records when more than one row matched, or when ``as_list``
            is true (an empty list when nothing matched).
        None: nothing matched and ``as_list`` is false.
        False: the arguments matched no mode, or the query failed.
    """
    log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    if table_name and not (record_id or record_id_random or field_name or field_value or sql or data):
        # Select all records from a table
        log.info('Select all records from a table')
        sql = text(
            f"""
            SELECT *
            FROM `{table_name}`
            ;
            """
        )
    elif table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data):
        # Select all records from a table with an ID (auto or random)
        log.info('Select all records from a table with an ID (auto or random)')
        data = {}
        if record_id:
            data['record_id'] = record_id
            sql = text(
                f"""
                SELECT *
                FROM `{table_name}`
                WHERE `{table_name}`.id = :record_id
                ;
                """
            )
        elif record_id_random:
            data['record_id_random'] = record_id_random
            sql = text(
                f"""
                SELECT *
                FROM `{table_name}`
                WHERE `{table_name}`.id_random = :record_id_random
                ;
                """
            )
    elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data):
        # Select all records from a table with a specific field and field value
        log.info('Select all records from a table with a specific field and field value')
        data = {field_name: field_value}
        sql = text(
            f"""
            SELECT *
            FROM `{table_name}`
            WHERE `{table_name}`.{field_name} = :{field_name}
            ;
            """
        )
    elif table_name and data and not (record_id or record_id_random or field_name or field_value or sql):
        # Select all records from a table with a specific list of fields and field values (list of dicts)
        log.info('Select all records from a table with a specific list of fields and field values (list of dicts)')
        if rm_id_random:
            data = lookup_id_random_pop(obj_data=data)
        log.debug(data)
        sql_where_string = ' AND '.join(
            f"""`{table_name}`.{key} = :{key}""" for key in data
        )
        log.debug(sql_where_string)
        sql = text(
            f"""
            SELECT *
            FROM `{table_name}`
            WHERE {sql_where_string}
            ;
            """
        )
    elif sql and not (table_name or record_id or record_id_random or field_name or field_value or data):
        # Select records based on the SQL statement given
        log.info('Select records based on the SQL statement given')
        sql = text(sql)
    elif sql and data and not (table_name or record_id or record_id_random or field_name or field_value):
        # Select records based on the SQL statement given and with the matching data dict fields and values
        if rm_id_random:
            data = lookup_id_random_pop(obj_data=data)
        log.info('Select records based on the SQL statement given and with the matching data dict fields and values')
        sql = text(sql)
    else:
        # Nothing matched the expected combination of parameters passed to this function
        log.warning('Nothing matched the expected combination of parameters passed to this function')
        return False # Not successful

    for debug_item in (sql, data, vars(sql), dir(sql)):
        log.debug('*** ** * ** ***')
        log.debug(debug_item)
    log.debug('*** ** * ** ***')

    try:
        # https://docs.sqlalchemy.org/en/13/core/tutorial.html#using-textual-sql
        # https://docs.sqlalchemy.org/en/13/core/sqlelement.html#sqlalchemy.sql.expression.TextClause.columns
        # https://docs.sqlalchemy.org/en/13/core/type_basics.html
        # Declare the two recurring time columns as Time on the text statement.
        sql = sql.columns(recurring_start_time=Time, recurring_end_time=Time)
        if data:
            log.info('Executing with SQL statement and data...')
            result = db.execute(sql, data)
        else:
            log.info('Executing with SQL statement only...')
            result = db.execute(sql)
    except OperationalError as e:
        log.error('An operational error exception happended. This is likely a "MySQL server has gone away" error. Going to try again...')
        log.exception('**** *** ** * ### BEGIN ### Operational Exception Happened: Trying again... * ** *** ****')
        log.error('^^^^ ^^^ ^^ ^ ### END ### Operational Exception ^ ^^ ^^^ ^^^^')
        log.info('Trying to recreate the pool...')
        for debug_target in (db, db.engine, db.engine.pool):
            log.debug('############## ############')
            log.debug(dir(debug_target))
            log.debug(vars(debug_target))
        log.debug('############## ############')
        db.engine.dispose()
        log.info('Now trying the query again...')
        try:
            if data:
                log.info('2x Executing with SQL statement and data...')
                result = db.execute(sql, data)
            else:
                log.info('2x Executing with SQL statement only...')
                result = db.execute(sql)
        except Exception as e:
            log.error('Tried again an exception was raised again. Not going to try again.')
            log.exception('**** *** ** * ### BEGIN ### (2x) Second Exception Happened: Returning False * ** *** ****')
            log.error('^^^^ ^^^ ^^ ^ ### END ### Exception ^ ^^ ^^^ ^^^^')
            return False # Not successful
        else:
            log.info('Successfully executed the SQL on the second try.')
    except Exception as e:
        log.error('An unknown exception happened. Returning False.')
        log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
        log.error('^^^^ ^^^ ^^ ^ ### END ### Exception ^ ^^ ^^^ ^^^^')
        log.error('**** *** ** * ### BEGIN ### Exception Details: * ** *** ****')
        for title, attr in (
            ('SQL Statement', 'statement'),
            ('SQL Parameters', 'params'),
            ('SQL Origin Message', 'orig'),
        ):
            log.error(f'**** *** ** * {title}: * ** *** ****')
            # Not every exception carries these attributes; reading them
            # directly would raise AttributeError inside this handler.
            log.error(getattr(e, attr, None))
        log.error('**** *** ** * ### END ### Exception Details: * ** *** ****')
        return False # Not successful
    else:
        log.debug('Successfully executed the SQL on the first try.')

    log.debug(f'Row count: {result.rowcount}')

    if result.rowcount == 1:
        log.info(f'Found one record. as_dict={as_dict}, as_list={as_list}')
        if as_dict:
            # After testing, this method is the fastest way to convert to a dict - STI 2021-03-09
            record = dict(result.first())
        else:
            record = result.first()
        if as_list:
            record_li = [record]
            log.debug(record_li)
            return record_li # Successful
        log.debug(record)
        return record # Successful

    if result.rowcount > 1:
        log.info(f'Found {result.rowcount} records. as_dict={as_dict}, as_list={as_list}')
        if as_dict:
            # After testing, this method is the fastest way to convert to a list of dicts - STI 2021-03-09
            record_li = [dict(record) for record in result.fetchall()]
        else:
            record_li = result.fetchall()
        log.debug(record_li)
        return record_li # Successful

    if as_list:
        log.info('No records found. Returning an empty list.')
        log.debug(result)
        return [] # Successful
    log.info('No records found. Returning None.')
    log.debug(result)
    return None # Successful
# ### END ### Core Help CRUD ### sql_select() ###
# ### BEGIN ### Core Help CRUD ### sql_delete() ###
# The catch all SQL DELETE function - STI 2021-02-17
# This one does it all for SQL DELETE queries
# Updated 2021-09-07
@logger_reset
def sql_delete(
    table_name: str|None = None,
    record_id: int|None = None,
    record_id_random: str|None = None,
    field_name: str|None = None,
    field_value = None,
    sql: str|None = None,
    data: dict|None = None
):
    """Run a SQL DELETE.

    Exactly one of these argument combinations must be used; every other
    selector argument must be falsy:

    * ``table_name`` + ``record_id`` or ``record_id_random``: match on ``id``
      or ``id_random``.
    * ``table_name`` + ``field_name`` + ``field_value``: match one field.
      A falsy ``field_value`` (0, '', False) does not select this mode.
    * ``table_name`` + ``data``: match every key/value of ``data`` (ANDed).
    * ``sql`` only, or ``sql`` + ``data``: run the given statement, with
      ``data`` as the bound parameters.

    There is no "delete everything in the table" mode: ``table_name`` alone
    matches nothing and returns False.

    If the first execution raises an ``OperationalError`` the engine pool is
    disposed and the statement is tried one more time.

    Returns:
        True: the statement executed without raising (even when no rows
            matched the WHERE clause).
        False: the arguments matched no mode, or the statement failed.
    """
    log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    if table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data):
        # Delete all records from a table with an ID (auto or random)
        log.info('Delete all records from a table with an ID (auto or random)')
        data = {}
        if record_id:
            data['record_id'] = record_id
            sql = text(
                f"""
                DELETE FROM `{table_name}`
                WHERE `{table_name}`.id = :record_id
                """
            )
        elif record_id_random:
            data['record_id_random'] = record_id_random
            sql = text(
                f"""
                DELETE FROM `{table_name}`
                WHERE `{table_name}`.id_random = :record_id_random
                """
            )
    elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data):
        # Delete all records from a table with a specific field and field value
        log.info('Delete all records from a table with a specific field and field value')
        data = {field_name: field_value}
        sql = text(
            f"""
            DELETE FROM `{table_name}`
            WHERE `{table_name}`.{field_name} = :{field_name}
            """
        )
    elif table_name and data and not (record_id or record_id_random or field_name or field_value or sql):
        # Delete all records from a table with a specific list of fields and field values (list of dicts)
        log.info('Delete all records from a table with a specific list of fields and field values (list of dicts)')
        sql_where_string = ' AND '.join(
            f"""`{table_name}`.{key} = :{key}""" for key in data
        )
        log.debug(sql_where_string)
        sql = text(
            f"""
            DELETE FROM `{table_name}`
            WHERE {sql_where_string}
            """
        )
    elif sql and not (table_name or record_id or record_id_random or field_name or field_value or data):
        # Delete records based on the SQL statement given
        log.info('Delete records based on the SQL statement given')
        sql = text(sql)
    elif sql and data and not (table_name or record_id or record_id_random or field_name or field_value):
        # Delete records based on the SQL statement given and with the matching data dict fields and values
        log.info('Delete records based on the SQL statement given and with the matching data dict fields and values')
        sql = text(sql)
    else:
        # Nothing matched the expected combination of parameters passed to this function
        log.warning('Nothing matched the expected combination of parameters passed to this function')
        return False # Not successful

    log.debug(sql)

    try:
        log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
        if data:
            log.info('Executing with SQL (DELETE) statement and data...')
            result = db.execute(sql, data)
        else:
            log.info('Executing with SQL (DELETE?) statement only...')
            result = db.execute(sql)
        log.debug(result)
        log.debug(dir(result))
        log.debug(vars(result))
    except OperationalError as e:
        log.error('An operational error exception happended. This is likely a "MySQL server has gone away" error. Going to try again...')
        log.exception('**** *** ** * ### BEGIN ### Operational Exception Happened: Trying again... * ** *** ****')
        log.error('^^^^ ^^^ ^^ ^ ### END ### Operational Exception ^ ^^ ^^^ ^^^^')
        log.info('Trying to recreate the pool...')
        for debug_target in (db, db.engine, db.engine.pool):
            log.debug('############## ############')
            log.debug(dir(debug_target))
            log.debug(vars(debug_target))
        log.debug('############## ############')
        db.engine.dispose()
        log.info('Now trying the query again...')
        try:
            log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
            if data:
                log.info('2x Executing with SQL statement and data...')
                result = db.execute(sql, data)
            else:
                log.info('2x Executing with SQL statement only...')
                result = db.execute(sql)
            log.debug(result)
        except Exception as e:
            log.error('Tried again an exception was raised again. Not going to try again.')
            log.exception('**** *** ** * ### BEGIN ### (2x) Second Exception Happened: Returning False * ** *** ****')
            log.error('^^^^ ^^^ ^^ ^ ### END ### Exception ^ ^^ ^^^ ^^^^')
            return False # Not successful
        else:
            log.info('Successfully executed the SQL on the second try.')
    except Exception as e:
        log.error('An unknown exception happened. Returning False.')
        log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
        log.error('^^^^ ^^^ ^^ ^ ### END ### Exception ^ ^^ ^^^ ^^^^')
        log.error('**** *** ** * ### BEGIN ### Exception Details: * ** *** ****')
        for title, attr in (
            ('SQL Statement', 'statement'),
            ('SQL Parameters', 'params'),
            ('SQL Origin Message', 'orig'),
        ):
            log.error(f'**** *** ** * {title}: * ** *** ****')
            # Not every exception carries these attributes; reading them
            # directly would raise AttributeError inside this handler.
            log.error(getattr(e, attr, None))
        log.error('**** *** ** * ### END ### Exception Details: * ** *** ****')
        return False # Not successful
    else:
        log.info('Successfully executed the SQL on the first try.')

    # NOTE: Need to deal with 0 rows affected when the WHERE clause was not satisfied and there was no error.
    return True # Successful
# ### END ### Core Help CRUD ### sql_delete() ###
# ### BEGIN ### API DB SQL ### redis_lookup_id_random() ###
# Just return the value if it is an integer
# Check if the id_random value is a string and the correct length
# Attempt to look up id_random key in Redis
# If success then return the ID number
# If not success and there is a table_name then check the database table passed
# If found in database table then store in Redis and return the ID number
@logger_reset
def redis_lookup_id_random(
    record_id_random: int|str,
    table_name: str,
    check_int_id: bool = False,
) -> str|int|bool|None:
    """Resolve an ``id_random`` string (or pass through an integer ID) to a record ID.

    Integer input is returned as-is, optionally after confirming through
    ``get_id_random()`` that a row with that ID exists. String input must be
    11 to 22 characters long; it is looked up in Redis first and, on a cache
    miss, in ``table_name``. A database hit is cached in Redis for 90 minutes.

    Args:
        record_id_random: The ``id_random`` string, an integer ID, or None.
        table_name: Table to query when the value is not cached.
        check_int_id: When True, verify that an integer ID exists in
            ``table_name`` before returning it.

    Returns:
        The integer record ID on success; None when no value was passed or
        the query returned nothing; False on invalid input or an unexpected
        query result.
    """
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    if record_id_random is None:
        log.info(f'No record ID was passed. Returning None')
        return None

    # Integer IDs are already resolved; optionally confirm that the row exists.
    if isinstance(record_id_random, int):
        record_id = record_id_random
        if not check_int_id:
            log.info(f'Not checking if the int ID exists. Returning the int ID. ID: {record_id}')
            return record_id
        log.info(f'Checking the int ID if exists. Table Name: {table_name} ID: {record_id}')
        if get_id_random_result := get_id_random(
            record_id = record_id,
            table_name = table_name,
        ):
            log.info(f'The int ID exists. Returning the int ID. ID Random: {get_id_random_result}')
            return record_id
        log.info(f'The int ID does not exists. Returning False. Table Name: {table_name} ID: {record_id}')
        return False

    # Anything else must be an id_random string of the expected length.
    if not (isinstance(record_id_random, str) and 11 <= len(record_id_random) <= 22):
        log.warning(f'Unexpected data type or string format: {str(type(record_id_random))} Expected type is a string 11 or 22 characters long.')
        return False

    if not table_name:
        log.warning('Missing table_name to select from for id_random')
        return False

    # The table name is interpolated into a backtick-quoted identifier below,
    # so a backtick inside it would break out of the quoting.
    if not isinstance(table_name, str) or '`' in table_name:
        log.warning(f'Invalid table_name for id_random lookup: {table_name!r}')
        return False

    # NOTE(review): the cache key does not include table_name, so a cached
    # id_random is returned regardless of which table was asked for. Confirm
    # this is intended.
    r = redis.Redis(host='localhost', port=6379, db=7, password=None, decode_responses=True)
    key_name = 'record_id:'+record_id_random
    record_id = r.get(key_name)
    log.debug(f'Record ID? {str(record_id)}')
    if record_id:
        log.info('The record ID was found using the record_id_random value.')
        # NOTE: the f-string is built eagerly, so r.ttl() is an extra Redis
        # call on every cache hit even when INFO logging is disabled.
        log.info(f'TTL for: {key_name} : {str(r.ttl(key_name))} is {str(r.ttl(key_name))} seconds') if False else log.info(f'TTL for: {key_name} : {str(record_id)} is {str(r.ttl(key_name))} seconds')
        return int(record_id)

    # Cache miss: fall back to the database table.
    data = { 'id_random': record_id_random }
    sql = f"""
SELECT id
FROM `{table_name}` AS `table`
WHERE `table`.id_random = :id_random;
"""
    select_results = sql_select(sql=sql, data=data)
    if not select_results:
        log.info('Record ID random was not found')
        return None

    log.debug(select_results)
    log.debug(type(select_results))
    if not isinstance(select_results, dict):
        log.error('More than one record may have been found. There may be a duplicate id_random.')
        log.error(select_results)
        return False

    record_id = select_results.get('id')
    if not record_id:
        log.error('The SQL result was not what was expected.')
        return False

    log.info(f"""Record ID random found: {str(record_id)}""")
    r.setex(key_name, datetime.timedelta(minutes=90), value=record_id)
    return int(record_id)
# ### END ### API DB SQL ### redis_lookup_id_random() ###
# ### BEGIN ### API DB SQL ### get_id_random() ###
# Updated 2022-01-06
@logger_reset
def get_id_random(
    record_id: int,
    table_name: str
) -> str|bool|None:
    """Look up the ``id_random`` value of the row with the given integer ID.

    Args:
        record_id: Value matched against the ``id`` column.
        table_name: Table to select from.

    Returns:
        The ``id_random`` as a string when a single row with a non-empty
        value is returned; None when ``sql_select()`` returns None; False
        for an invalid table name or any other unexpected result.
    """
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # The table name is interpolated into a backtick-quoted identifier below,
    # so a backtick inside it would break out of the quoting.
    if not table_name or not isinstance(table_name, str) or '`' in table_name:
        log.error(f'Invalid table_name for ID lookup: {table_name!r}')
        return False

    data = { 'id': record_id }
    sql = f"""
SELECT id_random
FROM `{table_name}` AS `table`
WHERE `table`.id = :id;
"""
    select_results = sql_select(sql=sql, data=data)

    if select_results is None:
        log.warning(f'No results with: Table Name: {table_name} ID: {record_id}')
        log.debug(select_results)
        return None

    if not select_results: # False or something else not True
        log.error(f'Something went wrong while trying to look up the ID. Is the table name correct? {table_name} Is the record ID valid? {record_id}')
        log.error(select_results)
        return False

    log.debug(select_results)
    log.debug(type(select_results))

    if isinstance(select_results, dict):
        if record_id_random := select_results.get('id_random'):
            return str(record_id_random)
        log.error('The SQL result was not what was expected.')
        return False

    # log.error (not log.exception) is used below: no exception is being
    # handled here, so there is no traceback to attach.
    if isinstance(select_results, list):
        log.error('More than one record may have been found. There may be a duplicate id. This should never happen.')
        log.error(select_results)
        return False

    log.error(f'Got an unexpected result while trying to look up the ID. Is the table name correct? {table_name} Is the record ID valid? {record_id}')
    log.error(select_results)
    return False
# ### END ### API DB SQL ### get_id_random() ###
# ### BEGIN ### API DB SQL ### lookup_id_random_pop() ###
# Look up and resolve id_random values to their id
# Remove the unneeded *_id_random key from the dict
# This really needs to be simplified... Use a list of dicts instead. Can store as JSON in the DB.
# (key prefix, table name) pairs resolved by lookup_id_random_pop(), in order.
# For each pair, `<prefix>_id_random` is looked up in `<table name>` and the
# result is stored under `<prefix>_id`.
# NOTE(review): `address_location` resolves against the `address` table, as in
# the original code. Confirm that is the intended table.
_ID_RANDOM_TABLES = (
    ('account', 'account'),
    ('address', 'address'),
    ('address_location', 'address'),
    ('archive', 'archive'),
    ('contact', 'contact'),
    ('contact_1', 'contact'),
    ('contact_2', 'contact'),
    ('cont_edu_cert', 'cont_edu_cert'),
    ('cont_edu_cert_person', 'cont_edu_cert_person'),
    ('event', 'event'),
    ('event_abstract', 'event_abstract'),
    ('event_badge', 'event_badge'),
    ('event_exhibit', 'event_exhibit'),
    ('event_file', 'event_file'),
    ('event_location', 'event_location'),
    ('event_person', 'event_person'),
    ('event_presentation', 'event_presentation'),
    ('event_presenter', 'event_presenter'),
    ('event_registration', 'event_registration'),
    ('event_session', 'event_session'),
    ('event_track', 'event_track'),
    ('hosted_file', 'hosted_file'),
    ('journal', 'journal'),
    ('journal_entry', 'journal_entry'),
    ('membership_group', 'membership_group'),
    ('membership_person_group', 'membership_person_group'),
    ('membership_person', 'membership_person'),
    ('membership_type', 'membership_type'),
    ('membership_person_type', 'membership_person_type'),
    ('order', 'order'),
    ('order_line', 'order_line'),
    ('order_cart', 'order_cart'),
    ('order_cart_line', 'order_cart_line'),
    ('organization', 'organization'),
    ('page', 'page'),
    ('person', 'person'),
    ('poc_event_person', 'event_person'),
    ('poc_person', 'person'),
    ('post', 'post'),
    ('product', 'product'),
    ('site', 'site'),
    ('user', 'user'),
)

# (type key, key prefix) pairs for references whose table name is carried in
# the data itself: `<prefix>_id_random` is looked up in the table named by the
# value stored under the type key.
_ID_RANDOM_TYPED = (
    ('for_type', 'for'),
    ('link_to_type', 'link_to'),
    ('object_type', 'object'),
    ('to_object_type', 'to_object'),
    ('from_object_type', 'from_object'),
)


@logger_reset
def lookup_id_random_pop(
    obj_data: dict
) -> dict:
    """Replace known ``*_id_random`` keys in ``obj_data`` with resolved ``*_id`` keys.

    Each recognised ``<prefix>_id_random`` key is passed to
    ``redis_lookup_id_random()``, the result is stored under ``<prefix>_id``
    exactly as returned (including None or False), and the ``*_id_random``
    key is removed. Typed references (``for``, ``link_to``, ``object``,
    ``to_object``, ``from_object``) take their table name from the matching
    ``*_type`` key; if that key is missing, the ``*_id_random`` key is
    dropped with a warning and no ``*_id`` key is written.

    Args:
        obj_data: Dict to update. It is modified in place.

    Returns:
        The same ``obj_data`` dict.
    """
    log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # References whose table is fixed by the key name.
    for prefix, table_name in _ID_RANDOM_TABLES:
        random_key = f'{prefix}_id_random'
        if random_key in obj_data:
            obj_data[f'{prefix}_id'] = redis_lookup_id_random(record_id_random=obj_data[random_key], table_name=table_name)
            obj_data.pop(random_key)

    # References whose table is named by a sibling `*_type` key.
    for type_key, prefix in _ID_RANDOM_TYPED:
        random_key = f'{prefix}_id_random'
        if random_key not in obj_data:
            continue
        if type_key in obj_data:
            obj_data[f'{prefix}_id'] = redis_lookup_id_random(record_id_random=obj_data.get(random_key, None), table_name=obj_data.get(type_key, None))
            obj_data.pop(random_key)
            log.debug(obj_data)
        else:
            # The *_id_random value cannot be resolved without its table name.
            log.warning(f'{random_key} was passed without {type_key}')
            obj_data.pop(random_key)

    return obj_data
# ### END ### API DB SQL ### lookup_id_random_pop() ###
# ### BEGIN ### API DB SQL Methods ### get_account_id_w_for_type_id() ###
# NOTE: This is only useful for a few tables that have account_id as a field or views that have it included.
# address, contact, event, people, user
# Updated 2022-01-07
@logger_reset
def get_account_id_w_for_type_id(
    for_type: str, # This is the table name
    for_id: int|str,
) -> bool|int|None:
    """Return the ``account_id`` of the row identified by ``for_type`` and ``for_id``.

    ``for_id`` is first resolved through ``redis_lookup_id_random()``, then
    the row is selected from the ``for_type`` table by its ``id``.

    Returns:
        The ``account_id`` when the row has a truthy one; False when
        ``for_id`` cannot be resolved or the row's ``account_id`` is falsy;
        None when the select returns nothing.
    """
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    for_id = redis_lookup_id_random(record_id_random=for_id, table_name=for_type)
    if not for_id:
        return False

    params = {'for_type': for_type, 'for_id': for_id}
    sql = f"""
SELECT `tbl`.id AS 'for_id', `tbl`.id_random AS 'for_id_random', `tbl`.account_id AS account_id
FROM `{for_type}` AS `tbl`
WHERE `tbl`.id = :for_id
LIMIT 1;
"""
    # The level is set again after each decorated helper call, as in the
    # original. Presumably the helper's @logger_reset can change it (verify
    # against app.log).
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    row = sql_select(data=params, sql=sql)
    if not row:
        return None

    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(row)
    account_id = row.get('account_id', None)
    if account_id:
        return account_id
    return False
# ### END ### API DB SQL Methods ### get_account_id_w_for_type_id() ###
# ### BEGIN ### API DB SQL Methods ### sql_enable_part() ###
# Updated 2022-01-17
@logger_reset
def sql_enable_part(table_name: str, enabled: str) -> bool|tuple[str, bool|None]:
    """Build the partial ``AND ...`` SQL clause that filters on a table's ``enable`` column.

    Args:
        table_name: Table (or alias) whose ``enable`` column is filtered.
        enabled: One of ``'enabled'``, ``'disabled'`` or ``'all'``.

    Returns:
        A ``(sql, enable)`` tuple, where ``sql`` is the clause text and
        ``enable`` is True, False or None for the three modes respectively.
        False when ``table_name`` is empty or ``enabled`` is not a
        recognised mode.
    """
    log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    if not table_name:
        return False
    if enabled not in ('enabled', 'disabled', 'all'):
        return False

    log.info(f'Creating partial SQL string for "enabled" check. Enabled: {enabled}')
    if enabled == 'enabled':
        sql = f'AND `{table_name}`.enable = true'
        enable = True
    elif enabled == 'disabled':
        sql = f'AND `{table_name}`.enable = false'
        enable = False
    else: # 'all': match every row, including those where enable IS NULL
        sql = f'AND (`{table_name}`.enable = true OR `{table_name}`.enable = false OR `{table_name}`.enable IS NULL)'
        enable = None

    log.debug(sql)
    log.debug(enable)
    return sql, enable
# ### END ### API DB SQL Methods ### sql_enable_part() ###
# ### BEGIN ### API DB SQL Methods ### sql_limit_offset_part() ###
# Updated 2022-01-17
@logger_reset
def sql_limit_offset_part(limit: int, offset: int = 0) -> bool|str:
    """Build a ``LIMIT ... OFFSET ...`` SQL fragment.

    Returns:
        The fragment as a string when both ``limit`` and ``offset`` are
        greater than or equal to zero; otherwise False.
    """
    log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # Guard clause: reject anything that is not a non-negative pair.
    if not (limit >= 0 and offset >= 0):
        return False

    log.info(f'Creating partial SQL string for LIMIT and OFFSET. Limit: {limit}; Offset: {offset}')
    return f'LIMIT {limit} OFFSET {offset}'
# ### END ### API DB SQL Methods ### sql_limit_offset_part() ###