Files
OSIT-AE-API-FastAPI/app/db.py
2020-09-14 12:41:02 -04:00

172 lines
5.4 KiB
Python

from app.config import settings
from sqlalchemy import create_engine, text
from sqlalchemy.exc import IntegrityError, OperationalError
#from app import db
# --- Database connection settings -------------------------------------------
# SECURITY NOTE(review): the credentials below are hard-coded in source
# control. `settings` is imported from app.config at the top of this file but
# is not used in this section; presumably these values belong there or in
# environment variables -- TODO confirm and move them out of the code.
AMS_DB_SERVER = 'linode.oneskyit.com'
AMS_DB_PORT = '3306' # default = 3306
AMS_DB_NAME = 'aether_dev' #onesky_ams_dev
AMS_DB_USERNAME = 'onesky_aether'
AMS_DB_PASSWORD = '$onesky.Aether.2020'

# The port is part of the URL so that editing AMS_DB_PORT actually takes
# effect (it used to be defined but never used).
connection_string = (
    'mysql://' + AMS_DB_USERNAME + ':' + AMS_DB_PASSWORD
    + '@' + AMS_DB_SERVER + ':' + AMS_DB_PORT
    + '/' + AMS_DB_NAME
)

# The URL is passed positionally: the keyword name of this first argument
# differs between SQLAlchemy release lines, the position does not.
engine = create_engine(
    connection_string,
    pool_size=10,
    pool_recycle=120,
    pool_pre_ping=True,
    echo=True,
    echo_pool=True,
    isolation_level='READ COMMITTED',
)
# NOTE: The default isolation_level is 'REPEATABLE READ'. This can sometimes not show updated data.

# A single connection is opened at import time and shared by the helper
# functions in this module.
db = engine.connect()
def sql_insert(table_name=None, record=None, sql=None, data=None):
    """Insert a new record and report the outcome.

    The statement is chosen from the arguments, in this order of precedence:

    1. ``table_name`` and ``record``: insert one row whose columns are the
       keys of ``record``. The ``id`` key is skipped because it is the
       auto-increment field.
    2. ``table_name`` only: insert a row made of column defaults.
    3. ``sql``: run the custom statement, bound with ``data`` (or with
       ``record`` when no ``data`` is given).

    Args:
        table_name: Name of the table to insert into.
        record: Mapping of column name to value.
        sql: Custom SQL text; may contain ``:name`` bind parameters.
        data: Bind-parameter values for ``sql``.

    Returns:
        The ``lastrowid`` reported for the insert when it is non-zero,
        ``True`` when the insert succeeded without a usable row id, and
        ``False`` when required arguments are missing or the statement
        failed. An ``IntegrityError`` is rolled back but still returns
        ``True``.
    """
    print('** sql_insert() ***')

    params = None
    if table_name and record:
        # Double any backtick so the identifier cannot terminate its quoting.
        quoted_table = '`' + str(table_name).replace('`', '``') + '`'
        columns = []
        placeholders = []
        params = {}
        for key, value in record.items():
            if key == 'id':  # A special exception for the id auto increment field.
                continue
            # Generated bind names keep arbitrary column names out of the
            # VALUES clause; only the escaped identifier carries the key.
            bind_name = 'p' + str(len(params))
            columns.append('`' + str(key).replace('`', '``') + '`')
            placeholders.append(':' + bind_name)
            params[bind_name] = value
        statement = text(
            'INSERT INTO ' + quoted_table
            + ' (' + ', '.join(columns) + ')'
            + ' VALUES (' + ', '.join(placeholders) + ');'
        )
    elif table_name:
        quoted_table = '`' + str(table_name).replace('`', '``') + '`'
        statement = text('INSERT INTO ' + quoted_table + ' () VALUES ();')
    elif sql:
        statement = text(sql)
        # `data` carries the bind values for custom SQL; fall back to
        # `record`, which is what used to be bound in this branch.
        params = data if data else record
    else:
        print('One or more required fields are missing')
        return False

    trans = db.begin()
    try:
        if params:
            result_insert = db.execute(statement, params)
        else:
            result_insert = db.execute(statement)
        trans.commit()
    except OperationalError as e:
        trans.rollback()
        print('*** An exception happened: OperationalError ***')
        print('* This is likely because a field that does not exist. *')
        print(repr(e))
        print('***')
        print(str(e))
        print('^^^ exception ^^^')
        return False
    except IntegrityError as e:
        trans.rollback()
        print('*** An exception happened: IntegrityError ***')
        print('* This is likely because of a duplicate entry for a primary or unique field. *')
        print(repr(e))
        print('***')
        print(str(e))
        print('^^^ exception ^^^')
        return True  # NOTE: This is returning True even though there was an exception
    except Exception as e:
        trans.rollback()
        print('*** An exception happened: catch all ***')
        print(repr(e))
        print('***')
        print(str(e))
        print('^^^ exception ^^^')
        return False

    record_id = result_insert.lastrowid
    # 0 (or no value at all) means there is no row id to hand back; report
    # plain success so callers never get a falsy value after a commit.
    if not record_id:
        return True
    return record_id
def sql_select(
    sql=None,
    data=None,
    table_name=None,
    record_id=None,
    record_id_random=None,
    field_name=None,
    field_value=None,
    as_list=False,
):
    """Select records from a table or with a custom SQL SELECT statement.

    The query is chosen from the arguments, in this order of precedence:

    1. ``record_id`` and ``table_name``: rows whose ``id`` matches.
    2. ``record_id_random`` and ``table_name``: rows whose ``id_random``
       matches.
    3. ``field_name``, ``field_value`` and ``table_name``: rows where that
       column equals the value.
    4. ``table_name`` only: every row of the table.
    5. ``sql``: the custom statement, which may use ``:name`` bind
       parameters supplied through ``data``.

    Args:
        sql: Custom SQL text.
        data: Mapping of bind-parameter values for ``sql``.
        table_name: Table to read from.
        record_id: Value matched against the ``id`` column.
        record_id_random: Value matched against the ``id_random`` column.
        field_name: Column name to filter on (bare or backtick-quoted).
        field_value: Value the column must equal.
        as_list: When exactly one row matches, return it wrapped in a list
            instead of as a single row.

    Returns:
        A single row (one match and ``as_list`` false), a list of rows, or
        ``False`` when arguments are missing, the query failed, or nothing
        matched.
    """
    print('*** sql_select() ***')

    quoted_table = None
    if table_name:
        # Double any backtick so the identifier cannot terminate its quoting.
        quoted_table = '`' + str(table_name).replace('`', '``') + '`'

    if record_id and table_name:
        sql = text(
            'SELECT * FROM ' + quoted_table
            + ' WHERE ' + quoted_table + '.id = :record_id'
        )
    elif record_id_random and table_name:
        sql = text(
            'SELECT * FROM ' + quoted_table
            + ' WHERE ' + quoted_table + '.id_random = :record_id_random'
        )
    elif field_name and field_value and table_name:
        # Accept a bare or an already backtick-quoted column name, then
        # quote it ourselves so it cannot carry extra SQL.
        bare_field = str(field_name).strip('`')
        quoted_field = '`' + bare_field.replace('`', '``') + '`'
        sql = text(
            'SELECT * FROM ' + quoted_table
            + ' WHERE ' + quoted_table + '.' + quoted_field + ' = :field_value'
        )
    elif table_name:
        sql = text('SELECT * FROM ' + quoted_table)
    elif sql:
        print('SQL found')
        sql = text(sql)
    else:
        print('One or more required fields are missing')
        return False

    # The named arguments stay available as bind parameters, as before.
    params = {
        'data': data,
        'record_id': record_id,
        'record_id_random': record_id_random,
        'table_name': table_name,
        'field_name': field_name,
        'field_value': field_value,
    }
    # Entries of `data` become bind parameters too, so custom SQL can use
    # its own `:name` placeholders. An explicitly passed named argument
    # wins over a same-named entry in `data`.
    if isinstance(data, dict):
        for key, value in data.items():
            if params.get(key) is None:
                params[key] = value

    try:
        print('Executing SQL...')
        result = db.execute(sql, params)
        # Count the rows actually fetched; rowcount is not dependable for
        # SELECT statements. A statement that returns no rows yields [].
        rows = result.fetchall() if result.returns_rows else []
    except Exception as e:
        print('*** An exception happened. ***')
        print(repr(e))
        print('***')
        print(str(e))
        print('^^^ exception ^^^')
        return False

    if len(rows) == 1:
        if as_list:
            print('Single as list')
            return rows
        print('Single as single')
        return rows[0]
    if len(rows) > 1:
        print('List as list')
        return rows
    return False