import secrets
from datetime import timedelta
from app.config import settings
from .log import *

from sqlalchemy import create_engine, text
from sqlalchemy.exc import IntegrityError, OperationalError
#from sqlalchemy.ext.declarative import declarative_base
#from sqlalchemy.orm import sessionmaker, session

db_uri = settings.SQLALCHEMY_DATABASE_URI
connection_string = db_uri

engine = create_engine(
    name_or_url=connection_string,
    pool_size=10,
    pool_recycle=120,
    pool_pre_ping=True,
    echo=True,
    echo_pool=True,
    isolation_level='READ COMMITTED',
)
# NOTE: The default isolation_level is 'REPEATABLE READ'. This can sometimes not show updated data.

#SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
#Base = declarative_base()

# Single module-level connection shared by every helper in this file.
db = engine.connect()

# Dependency
#def get_db():
    #db = SessionLocal()
    #try:
        #yield db
    #finally:
        #db.close()


def _report_insert_error(exc, headline, hint=None):
    """Print the diagnostic banner that sql_insert() emits for a failed insert."""
    print(headline)
    if hint is not None:
        print(hint)
    print(repr(exc))
    print('***')
    print(str(exc))
    print('^^^ exception ^^^')


# Insert a new record with values given.
def sql_insert(table_name=None, record=None, sql=None, data=None, id_random_length=None):
    """Insert one row, either from a column/value dict or from custom SQL.

    The statement is chosen in this order:

    1. ``table_name`` and a non-empty ``record``: INSERT the record's keys as
       columns, binding its values as named parameters.
    2. ``table_name`` only: INSERT a row made entirely of column defaults.
    3. ``sql`` only: run the caller's statement, binding ``record`` if given,
       otherwise ``data``.

    Args:
        table_name: Target table. Interpolated into the SQL text (only wrapped
            in backticks), so it must come from trusted code, not user input.
        record: Mapping of column name -> value. Column names are interpolated
            into the SQL text as well; values are bound as parameters. When
            ``id_random_length`` is 8 or 16 this dict is mutated in place to
            add an ``id_random`` token.
        sql: Custom INSERT statement, used only when ``table_name`` is falsy.
        data: Bind parameters for the custom ``sql`` statement.
        id_random_length: 8 or 16 to generate ``record['id_random']`` with
            ``secrets.token_urlsafe``; any other value leaves the record as is.

    Returns:
        The new row's ``lastrowid`` when it is non-zero; ``True`` when the
        insert succeeded but ``lastrowid`` is 0; ``True`` when an
        ``IntegrityError`` occurred (existing behaviour, kept for callers);
        ``False`` for missing arguments or any other exception.
    """
    print('** sql_insert() ***')

    params = None
    if table_name and record:
        #record.pop('id')
        if id_random_length in [8, 16]:
            record['id_random'] = secrets.token_urlsafe(id_random_length)
        fields_string = ', '.join('`' + str(key) + '`' for key in record)
        values_string = ', '.join(':' + str(key) for key in record)
        statement = text(
            """ INSERT INTO `""" + table_name + """` (""" + fields_string + """) VALUES (""" + values_string + """); """
        )
        params = record
    elif table_name:
        statement = text(""" INSERT INTO `""" + table_name + """` () VALUES (); """)
    elif sql:
        statement = text(sql)
        # Custom SQL may carry its bind values in either argument; previously
        # `data` was accepted but silently ignored.
        params = record or data
    else:
        print('One or more required fields are missing')
        return False

    trans = db.begin()
    try:
        if params:
            result_insert = db.execute(statement, params)
        else:
            result_insert = db.execute(statement)
        trans.commit()
    except OperationalError as e:
        trans.rollback()
        _report_insert_error(
            e,
            '*** An exception happened: OperationalError ***',
            '* This is likely because a field that does not exist. *',
        )
        return False
    except IntegrityError as e:
        trans.rollback()
        _report_insert_error(
            e,
            '*** An exception happened: IntegrityError ***',
            '* This is likely because of a duplicate entry for a primary or unique field. *',
        )
        return True # NOTE: This is returning True even though there was an exception
    except Exception as e:
        trans.rollback()
        _report_insert_error(e, '*** An exception happened: catch all ***')
        return False
    else:
        record_id = result_insert.lastrowid
        if record_id == 0:
            # The insert succeeded but lastrowid carries no usable id.
            return True
        return record_id


# NOTE: Select records using custom SQL SELECT statements.
def sql_select(sql=None, data=None, table_name=None, record_id=None, record_id_random=None, field_name=None, field_value=None, as_list=False):
    """Run a SELECT and return the matching row(s) as plain dicts.

    The statement is chosen in this order (first match wins):

    1. ``record_id`` and ``table_name``: rows whose ``id`` equals ``record_id``.
    2. ``record_id_random`` and ``table_name``: rows whose ``id_random`` matches.
    3. ``field_name``, ``field_value`` and ``table_name``: rows where that
       column equals ``field_value`` (falsy values such as ``0`` or ``''`` are
       valid filters; only ``None`` means "not given").
    4. ``table_name`` only: every row of the table.
    5. ``sql`` only: the caller's statement, with ``data`` bound if provided.

    Args:
        sql: Custom SELECT statement, used only when ``table_name`` is falsy.
        data: Bind parameters for the custom ``sql`` statement.
        table_name: Table to read. Interpolated into the SQL text (only
            wrapped in backticks), so it must come from trusted code.
        record_id: Value matched against the ``id`` column.
        record_id_random: Value matched against the ``id_random`` column.
        field_name: Column to filter on. Interpolated into the SQL text
            unquoted, so it must come from trusted code.
        field_value: Value bound for the ``field_name`` comparison.
        as_list: Force a list result even for zero or one row.

    Returns:
        ``False`` when arguments are missing or the query raised;
        a dict for exactly one row (``[dict]`` if ``as_list``);
        a list of dicts for several rows;
        ``None`` for no rows (``[None]`` if ``as_list``).
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    custom_sql = False
    params = None
    if record_id and table_name:
        sql = text(
            """ SELECT * FROM `""" + table_name + """` WHERE `""" + table_name + """`.id = :record_id """
        )
        params = {'record_id': record_id}
    elif record_id_random and table_name:
        sql = text(
            """ SELECT * FROM `""" + table_name + """` WHERE `""" + table_name + """`.id_random = :record_id_random """
        )
        params = {'record_id_random': record_id_random}
    elif field_name and field_value is not None and table_name:
        # `is not None` rather than truthiness: filtering on 0 or '' must not
        # fall through to the unfiltered "whole table" branch below.
        sql = text(
            """ SELECT * FROM `""" + table_name + """` WHERE `""" + table_name + """`.""" + field_name + """ = :field_value """
        )
        params = {'field_value': field_value}
    elif table_name:
        sql = text(""" SELECT * FROM `""" + table_name + """` """)
    elif sql:
        log.info('SQL found')
        custom_sql = True
        sql = text(sql)
        if data:
            params = data
    else:
        log.warning('One or more required fields are missing')
        return False

    try:
        if not custom_sql:
            log.info('Executing a simple SQL select with no extra data dict...')
        elif params is not None:
            log.info('Executing a custom SQL select and including the data dict...')
        else:
            log.info('Executing a custom SQL select with no extra data dict...')

        # Bind values go in a single dict holding only what the statement uses.
        if params is not None:
            result = db.execute(sql, params)
        else:
            result = db.execute(sql)
    except Exception as e:
        log.error('*** An exception happened. ***')
        log.error(repr(e))
        log.error('***')
        log.error(str(e))
        log.error('^^^ exception ^^^')
        return False
    else:
        # Count the fetched rows instead of trusting result.rowcount, which
        # is not guaranteed to be meaningful for SELECT statements.
        records = [dict(row) for row in result.fetchall()]
        found = len(records)

        if found == 1 and as_list:
            log.info('Found one record. Returning as a list.')
            return records
        if found == 1:
            log.info('Found one record. Returning as a dict.')
            return records[0]
        if found > 1:
            log.info('Found more than one record. Returning as a list of dicts.')
            return records
        if as_list:
            log.info('No records found. Returning as a list.')
            return [None]
        log.info('No records found. Returning None.')
        return None