Files
OSIT-AE-API-FastAPI/app/lib_general.py
2024-03-28 12:56:24 -04:00

611 lines
25 KiB
Python

# from __future__ import annotations
import datetime, html2text, jwt, os, pandas, pathlib, pytz, redis, time
from passlib.hash import argon2
# Import smtplib for the actual sending function
import smtplib, ssl
# Import the email package modules needed
from email.message import EmailMessage
from email.headerregistry import Address
from email.utils import make_msgid
from fastapi import APIRouter, Depends, Header, HTTPException, Response, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from app.log import log, logging, logger_reset
from app.config import settings
from app.db_sql import redis_lookup_id_random, sql_select
# ### BEGIN ### API Lib General ### async get_token_header() ###
def get_token_header(x_token: str = Header(...)):
if x_token != 'fake-super-secret-token':
raise HTTPException(status_code=400, detail='X-Token header invalid')
# ### END ### API Lib General ### async get_token_header() ###
# ### BEGIN ### API Lib General ### class Common_Route_Params ###
# Updated 2023-01-30
class Common_Route_Params_No_Account_ID:
def __init__(
self,
x_account_id: int|None = None,
x_account_id_random: str|None = None,
x_no_account_id_token: str|None = None,
enabled: str = 'enabled',
limit: int = 10,
offset: int = 0,
by_alias: bool = True,
exclude_unset: bool = False,
response = None,
):
self.x_account_id = x_account_id
self.x_account_id_random = x_account_id_random
self.x_no_account_id_token = x_no_account_id_token
self.enabled = enabled
self.limit = limit
self.offset = offset
self.by_alias = by_alias
self.exclude_unset = exclude_unset
self.response = response
# log.debug(response)
# ### END ### API Lib General ### class Common_Route_Params ###
# ### BEGIN ### API Lib General ### common_route_params() ###
# Updated 2023-01-30
@logger_reset # This breaks things for some reason when the function is async. Do not use async def common_route_params()!
def common_route_params_no_account_id(
x_account_id: str = Header(None, min_length=11, max_length=22),
enabled: str = 'enabled', # all, enabled, disabled
limit: int = 100,
offset: int = 0,
by_alias: bool = True,
exclude_none: Optional[bool] = True,
exclude_unset: bool = False,
# NOTE: Uncommenting either exclude or include breaks the JSON body format. I do not know why? Should be: {} Becomes this: {"obj_name": {"data_name": "data_value"}} -STI 2022-01-05
# exclude: Optional[list] = [], # Leaving this and include commented out
# include: Optional[list] = [], # Leaving this and exclude commented out
response: Response = Response,
log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
) -> Common_Route_Params_No_Account_ID:
log.setLevel(log_lvl)
log.debug(locals())
log.info(f'Setting commons values: x_account_id, x_account_id_random, limit, offset, enabled, by_alias, exclude_unset, response')
x_account_id_random = x_account_id
if x_account_id := redis_lookup_id_random(table_name='account', record_id_random=x_account_id):
log.info(f'Found the x-account-id header with the value: {x_account_id}')
elif x_account_id is None:
log.warning(f'No x-account-id header value passed')
else:
log.warning(f'The x-account-id header was found, but the Account ID was not found or is not valid. Account ID: {x_account_id}')
raise HTTPException(status_code=403, detail='The x-account-id Account ID was not found.') # Forbidden
commons = Common_Route_Params_No_Account_ID( x_account_id=x_account_id, x_account_id_random=x_account_id_random, limit=limit, offset=offset, enabled=enabled, by_alias=by_alias, exclude_unset=exclude_unset, response=response )
log.debug(commons)
return commons
# ### END ### API Lib General ### async common_route_params() ###
# ### BEGIN ### API Lib General ### class Common_Route_Params ###
# Updated 2022-01-05
class Common_Route_Params:
def __init__(
self,
x_account_id: int,
x_account_id_random: str,
x_no_account_id_token: str|None = None,
enabled: str = 'enabled',
limit: int = 10,
offset: int = 0,
by_alias: bool = True,
exclude_unset: bool = False,
response = None,
):
self.x_account_id = x_account_id
self.x_account_id_random = x_account_id_random
self.x_no_account_id_token = x_no_account_id_token
self.enabled = enabled
self.limit = limit
self.offset = offset
self.by_alias = by_alias
self.exclude_unset = exclude_unset
self.response = response
# log.debug(response)
# ### END ### API Lib General ### class Common_Route_Params ###
# ### BEGIN ### API Lib General ### common_route_params() ###
# Updated 2022-02-15
@logger_reset # This breaks things for some reason when the function is async. Do not use async def common_route_params()!
def common_route_params(
# x_account_id: str = Header(..., min_length=11, max_length=22), # NOTE WARNING: Commented out 2023-08-17
x_account_id: str = Header(None, min_length=11, max_length=22), # NOTE WARNING: Changed to this 2023-08-17
x_no_account_id: str = Header(None, min_length=11, max_length=22), # NOTE WARNING: Changed to this 2023-08-17
# x_aether_api_key: Optional[str] = Header(..., min_length=11, max_length=22),
# x_aether_api_token: Optional[str] = Header(..., min_length=11, max_length=22),
# x_aether_jwt_token: Optional[str] = Header(..., min_length=11, max_length=50),
x_no_account_id_token: str|None = None, # NOTE: Not a header value! Added 2023-08-17
enabled: str = 'enabled', # all, enabled, disabled
limit: int = 100,
offset: int = 0,
by_alias: bool = True,
exclude_unset: bool = False,
exclude_defaults: Optional[bool] = False,
exclude_none: Optional[bool] = False,
# NOTE: Uncommenting either exclude or include breaks the JSON body format. I do not know why? Should be: {} Becomes this: {"obj_name": {"data_name": "data_value"}} -STI 2022-01-05
# exclude: Optional[list] = [], # Leaving this and include commented out
# include: Optional[list] = [], # Leaving this and exclude commented out
response: Response = Response,
log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
) -> Common_Route_Params|Common_Route_Params_No_Account_ID:
log.setLevel(log_lvl)
log.debug(locals())
log.info(f'Setting commons values: x_account_id, x_account_id_random, limit, offset, enabled, by_alias, exclude_unset, response')
x_account_id_random = x_account_id
if x_account_id:
if x_account_id := redis_lookup_id_random(table_name='account', record_id_random=x_account_id):
log.info(f'Found the x-account-id header with the value: {x_account_id}')
else:
log.warning(f'The x-account-id header was found, but the Account ID was not found or is not valid. Account ID: {x_account_id}')
raise HTTPException(status_code=403, detail='The x-account-id Account ID was not found.') # Forbidden
elif x_no_account_id and len(x_no_account_id) > 10:
log.warning(f'Found the x_no_account_id header param with the value: {x_no_account_id}')
x_account_id = None
x_account_id_random = '--- NOT SET ---'
elif x_no_account_id_token and len(x_no_account_id_token) > 10: # NOTE: Not a header value!
# NOTE WARNING: This token should be verified and able to be disabled quickly.
log.warning(f'Found the x_no_account_id_token URL param with the value: {x_no_account_id_token}')
if x_account_id := redis_lookup_id_random(table_name='account', record_id_random=x_no_account_id_token):
log.info(f'Found the x-account-id header with the value: {x_account_id}')
x_account_id_random = x_no_account_id_token
else:
x_account_id = 0
x_account_id_random = ''
x_account_id = 0
x_account_id_random = '--- NOT SET ---'
else:
log.warning(f'The x-account-id and x-no-account-id-token headers were not found.')
raise HTTPException(status_code=403, detail='The x-account-id and x-no-account-id-token headers were not found.') # Forbidden
if x_account_id:
commons = Common_Route_Params( x_account_id=x_account_id, x_account_id_random=x_account_id_random, x_no_account_id_token=x_no_account_id_token, limit=limit, offset=offset, enabled=enabled, by_alias=by_alias, exclude_unset=exclude_unset, response=response )
else:
commons = Common_Route_Params_No_Account_ID( x_account_id=None, x_account_id_random=None, x_no_account_id_token=x_no_account_id_token, limit=limit, offset=offset, enabled=enabled, by_alias=by_alias, exclude_unset=exclude_unset, response=response )
log.debug(commons)
return commons
# ### END ### API Lib General ### async common_route_params() ###
# ### BEGIN ### API Lib General ### class Common_Route_Params_Min ###
# Updated 2022-01-05
# NOTE: Is this essentially the same as Common_Route_Params_No_Account_ID above?
class Common_Route_Params_Min:
def __init__(
self,
x_account_id: int = None,
x_account_id_random: str = None,
enabled: str = 'enabled',
limit: int = 10,
offset: int = 0,
by_alias: bool = True,
exclude_unset: bool = False,
response = None,
):
self.x_account_id = x_account_id
self.x_account_id_random = x_account_id_random
self.enabled = enabled
self.limit = limit
self.offset = offset
self.by_alias = by_alias
self.exclude_unset = exclude_unset
self.response = response
# log.debug(response)
# ### END ### API Lib General ### class Common_Route_Params_Min ###
# ### BEGIN ### API Lib General ### common_route_params_min() ###
# Updated 2022-02-15
# NOTE: Is this essentially the same as common_route_params_no_account_id above?
@logger_reset # This breaks things for some reason when the function is async. Do not use async def common_route_params()!
def common_route_params_min(
x_account_id: str = Header(None, min_length=11, max_length=22),
enabled: str = 'enabled', # all, enabled, disabled
limit: int = 100,
offset: int = 0,
by_alias: bool = True,
exclude_none: Optional[bool] = True,
exclude_unset: bool = False,
# NOTE: Uncommenting either exclude or include breaks the JSON body format. I do not know why? Should be: {} Becomes this: {"obj_name": {"data_name": "data_value"}} -STI 2022-01-05
# exclude: Optional[list] = [], # Leaving this and include commented out
# include: Optional[list] = [], # Leaving this and exclude commented out
response: Response = Response,
log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
) -> Common_Route_Params:
log.setLevel(log_lvl)
log.debug(locals())
log.info(f'Setting commons values: x_account_id, x_account_id_random, limit, offset, enabled, by_alias, exclude_unset, response')
log.debug(f'X Account ID: {x_account_id}')
if x_account_id:
x_account_id_random = x_account_id
if x_account_id := redis_lookup_id_random(table_name='account', record_id_random=x_account_id):
log.info(f'Found the x-account-id header with the value: {x_account_id}')
else:
log.warning(f'The x-account-id header was found, but the Account ID was not found or is not valid. Account ID: {x_account_id}')
raise HTTPException(status_code=403, detail='The x-account-id Account ID was not found.') # Forbidden
else: x_account_id_random = None
commons = Common_Route_Params_Min( x_account_id=x_account_id, x_account_id_random=x_account_id_random, limit=limit, offset=offset, enabled=enabled, by_alias=by_alias, exclude_unset=exclude_unset, response=response )
log.debug(commons)
return commons
# ### END ### API Lib General ### async common_route_params_min() ###
def secure_hash_string(string: str):
string_hash = argon2.using(rounds=14, memory_cost=1536, parallelism=2).hash(string)
return string_hash
def verify_secure_hash_string(string: str, string_hash: str):
if argon2.verify(string, string_hash):
return True
else:
return False
# ### BEGIN ### API Lib General ### sign_jwt() ###
# Updated 2021-07-14
@logger_reset
def sign_jwt(
secret_key: str, # Secret/Private/Password
ttl: int = 60, # Default to 60 seconds
max_renew: int = 0, # Default to 0
public_key: str = None, # Will be part of the token. Use to look up secret when verifying.???
account_id: str = None,
person_id: str = None,
user_id: str = None,
json_str: str = None,
b64_str: str = None,
) -> Dict[str, str]:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
payload = {
'iat': time.time(), # Issued at
'eat': time.time() + ttl, # Expires at
'max_renew': max_renew, # Number of times allowed to request renew without API secret key
'public_key': public_key, # Use to lookup the secret/private/password key when verifying
'account_id': account_id,
'person_id': person_id,
'user_id': user_id,
'json_str': json_str,
'b64_str': b64_str,
}
secret = secret_key
algorithm = 'HS256'
token = jwt.encode(payload, secret, algorithm=algorithm)
log.debug(token)
return token
# ### END ### API Lib General ### sign_jwt() ###
# ### BEGIN ### API Lib General ### decode_jwt() ###
# Updated 2021-07-14
@logger_reset
def decode_jwt(
secret_key: str,
token: str,
) -> dict:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
secret = secret_key
algorithm = 'HS256'
try:
decoded_token = jwt.decode(token, secret, algorithms=[algorithm])
log.debug(decoded_token)
if decoded_token['eat'] >= time.time(): return decoded_token
else: return False
except:
return None
# ### END ### API Lib General ### decode_jwt() ###
# ### BEGIN ### API Lib General ### create_export() ###
# Updated 2021-11-23
@logger_reset
def create_export_file(
data_dict_list: list,
subdir_path: str,
filename: str,
column_name_li: list = [],
rm_id: bool = True,
export_type: str = 'CSV', # CSV, Excel
) -> bool|str:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
hosted_tmp_path = settings.FILES_PATH['hosted_tmp_root']
# hosted_tmp_path = 'admin/temp'
log.info(f'Hosted Temp Path: {hosted_tmp_path}')
subdirectory_dest = os.path.join(hosted_tmp_path, subdir_path)
log.debug(subdirectory_dest)
pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True)
file_dest_w_subdir = os.path.join(subdirectory_dest, filename)
log.info(f'File Dest With Subdir: {file_dest_w_subdir}')
if column_name_li:
log.info('Using column name list passed')
else:
log.info('Using an auto generated column name list')
column_name_li = list(data_dict_list[0].keys())
log.debug(column_name_li)
if rm_id:
for column_name in list(column_name_li):
# log.info(f'Checking column name: {column_name}')
if column_name.endswith('_id'):
column_name_li.remove(column_name)
log.info(f'Removing column name: {column_name}')
log.debug(column_name_li)
# column_name_li = ['order_line_id_random', 'order_id_random', 'product_id_random', 'product_type', 'product_name', 'quantity', 'amount', 'dollar_amount', 'message', 'person_id_random', 'person_given_name', 'person_family_name', 'person_display_name', 'person_full_name', 'person_contact_email', 'person_contact_cc_email', 'person_contact_address_name', 'person_contact_address_organization_name', 'person_contact_address_line_1', 'person_contact_address_line_2', 'person_contact_address_line_3', 'person_contact_address_city', 'person_contact_address_country_subdivision_code', 'person_contact_address_state_province', 'person_contact_address_postal_code', 'person_contact_address_country_alpha_2_code', 'person_contact_address_country_name', 'person_contact_address_country', 'order_status', 'order_created_on', 'order_updated_on']
data_dataframe = pandas.DataFrame(data_dict_list)
log.debug(data_dataframe)
try:
if export_type == 'CSV':
log.info('Saving dataframe to CSV file')
full_dest_path = file_dest_w_subdir+'.csv'
filename_w_ext = filename+'.csv'
tmp_file_path = os.path.join(subdir_path,filename_w_ext)
data_dataframe.to_csv(full_dest_path, columns=column_name_li, index=False)
elif export_type == 'Excel':
log.info('Saving dataframe to Excel file')
full_dest_path = file_dest_w_subdir+'.xlsx'
filename_w_ext = filename+'.xlsx'
tmp_file_path = os.path.join(subdir_path,filename_w_ext)
data_dataframe.to_excel(full_dest_path, columns=column_name_li, index=False) # sheet_name='Sheet_name_1'
except:
log.exception('Something went wrong while trying to save the export file.')
return False
log.info(f'Temp File Path: {tmp_file_path}')
return tmp_file_path # True
# ### END ### API Lib General ### create_export() ###
# ### BEGIN ### API Lib General ### return_full_tmp_path() ###
# This is for using with return FileResponse(path=full_tmp_path, filename=filename)
# Updated 2022-04-22
@logger_reset
def return_full_tmp_path(
full_tmp_path: str = None,
subdir_path: str = None,
filename: str = None,
) -> bool|str:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
hosted_tmp_path = settings.FILES_PATH['hosted_tmp_root']
# hosted_tmp_path = 'admin/temp'
log.info(f'Hosted Temp Path: {hosted_tmp_path}')
if full_tmp_path:
file_dest = os.path.join(hosted_tmp_path, full_tmp_path)
return file_dest
elif subdir_path and filename:
subdirectory_dest = os.path.join(hosted_tmp_path, subdir_path)
log.debug(subdirectory_dest)
pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True)
file_dest_w_subdir = os.path.join(subdirectory_dest, filename)
log.info(f'File Dest With Subdir: {file_dest_w_subdir}')
return file_dest_w_subdir
else:
return False
# ### END ### API Lib General ### return_full_tmp_path() ###
# ### BEGIN ### API Lib General ### send_email() ###
# Updated 2021-12-02
@logger_reset
def send_email(
from_email: str,
to_email: str,
subject: str,
body_html: str,
from_name: str = '',
reply_to_email: str = '',
reply_to_name: str = '',
to_name: str = '',
cc_email: str = '',
cc_name: str = '',
bcc_email: str = '',
bcc_name: str = '',
body_text: str = '',
test: bool = False
):
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
message = EmailMessage()
if subject:
message['Subject'] = subject
else:
return False
if from_email and from_name:
#message['From'] = Address(display_name=from_name, username=from_email.split('@')[0], domain=from_email.split('@')[1])
try:
message['From'] = Address(display_name=from_name, addr_spec=from_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
elif from_email:
try:
message['From'] = Address(display_name=from_email, addr_spec=from_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
else:
return False
if reply_to_email and reply_to_name:
try:
message['Reply-To'] = Address(display_name=reply_to_name, addr_spec=reply_to_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
elif reply_to_email:
try:
message['Reply-To'] = Address(display_name=reply_to_email, addr_spec=reply_to_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
if to_email and to_name:
try:
message['To'] = Address(display_name=to_name, addr_spec=to_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
elif to_email:
try:
message['To'] = Address(display_name=to_email, addr_spec=to_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
else:
return False
if cc_email and cc_name:
try:
message['Cc'] = Address(display_name=cc_name, addr_spec=cc_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
elif cc_email:
try:
message['Cc'] = Address(display_name=cc_email, addr_spec=cc_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
if bcc_email and bcc_name:
try:
message['Bcc'] = Address(display_name=bcc_name, addr_spec=bcc_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
elif bcc_email:
try:
message['Bcc'] = Address(display_name=bcc_email, addr_spec=bcc_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
html_version = """\
<html>
<body>
"""+body_html+"""
</body>
</html>
"""
# html_version = """\
# <html>
# <body>
# <div>
# """+body_html+"""
# </div>
# </body>
# </html>
# """
if body_text:
text_version = body_text
else:
text_version = html2text.html2text(html_version)
message.set_content(text_version)
message.add_alternative(html_version, subtype='html')
log.info('Sending email...')
log.debug(settings.SMTP)
log.info(f'Subject: {subject}')
log.info(f'From: {from_email} Reply To: {reply_to_email} To: {to_email} CC: {cc_email} BCC: {bcc_email}')
log.debug('Message:')
log.debug(message.as_string())
log.info('Creating SMTP SSL connection...')
context = ssl.create_default_context()
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.info('SMTP configuration, connect, and send')
if test:
try:
with smtplib.SMTP_SSL(settings.SMTP['server'], settings.SMTP['port'], context=context) as server:
log.debug('[TEST] SMTP log in...')
server.login(settings.SMTP['username'], settings.SMTP['password'])
log.debug('[TEST] SMTP send message... [WILL NOT SEND IN TEST MODE]')
# server.send_message(message)
log.info('[TEST] Email sent!')
return True
except:
#except SMTPException:
log.error('[TEST] Error: unable to send email')
return False
try:
with smtplib.SMTP_SSL(settings.SMTP['server'], settings.SMTP['port'], context=context) as server:
log.debug('SMTP log in...')
server.login(settings.SMTP['username'], settings.SMTP['password'])
log.debug('SMTP send message...')
server.send_message(message)
log.info('Email sent!')
return True
except:
#except SMTPException:
log.error('Error: unable to send email')
return False
# ### END ### API Lib General ### send_email() ###