Files
OSIT-AE-API-FastAPI/app/lib_general.py
2022-01-24 13:06:27 -05:00

319 lines
12 KiB
Python

from __future__ import annotations
import datetime, html2text, jwt, os, pandas, pathlib, pytz, redis, time
from passlib.hash import argon2
# Import smtplib for the actual sending function
import smtplib, ssl
# Import the email package modules needed
from email.message import EmailMessage
from email.headerregistry import Address
from email.utils import make_msgid
from fastapi import APIRouter, Depends, Header, HTTPException, Response, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from app.log import log, logging, logger_reset
from app.config import settings
from app.db_sql import redis_lookup_id_random, sql_select
# ### BEGIN ### API Lib General ### async get_token_header() ###
async def get_token_header(x_token: str = Header(...)):
if x_token != 'fake-super-secret-token':
raise HTTPException(status_code=400, detail='X-Token header invalid')
# ### END ### API Lib General ### async get_token_header() ###
# ### BEGIN ### API Lib General ### class Common_Route_Params ###
# Updated 2022-01-05
class Common_Route_Params:
def __init__(
self,
x_account_id: int,
x_account_id_random: str,
enabled: str = 'enabled',
limit: int = 10,
offset: int = 0,
by_alias: bool = True,
exclude_unset: bool = True,
response = None,
):
self.x_account_id = x_account_id
self.x_account_id_random = x_account_id_random
self.enabled = enabled
self.limit = limit
self.offset = offset
self.by_alias = by_alias
self.exclude_unset = exclude_unset
self.response = response
# log.debug(response)
# ### END ### API Lib General ### class Common_Route_Params ###
# ### BEGIN ### API Lib General ### async route_commons() ###
# Updated 2022-01-05
async def common_route_params(
x_account_id: str = Header(..., min_length=11, max_length=22),
enabled: str = 'enabled', # all, enabled, disabled
limit: int = 100,
offset: int = 0,
by_alias: bool = True,
exclude_none: Optional[bool] = True,
exclude_unset: bool = True,
# NOTE: Uncommenting either exclude or include breaks the JSON body format. I do not know why? Should be: {} Becomes this: {"obj_name": {"data_name": "data_value"}} -STI 2022-01-05
# exclude: Optional[list] = [], # Leaving this and include commented out
# include: Optional[list] = [], # Leaving this and exclude commented out
response: Response = Response,
) -> Common_Route_Params:
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
log.info(f'Setting commons values')
log.info(f'The x-account-id header has a value. x-account-id: {x_account_id}')
x_account_id_random = x_account_id
if x_account_id := redis_lookup_id_random(table_name='account', record_id_random=x_account_id):
log.info(f'Found the x-account-id with the value: {x_account_id}')
# x_account = { 'id': account_id, 'id_random': x_account_id }
# log.debug(x_account)
# return x_account
else:
log.warning(f'The x-account-id Account ID was not found. Account ID: {x_account_id}')
raise HTTPException(status_code=403, detail='The x-account-id Account ID was not found.') # Forbidden
commons = Common_Route_Params( x_account_id=x_account_id, x_account_id_random=x_account_id_random, limit=limit, offset=offset, enabled=enabled, by_alias=by_alias, exclude_unset=exclude_unset, response=response )
log.debug(commons)
return commons
# ### END ### API Lib General ### async route_commons() ###
def secure_hash_string(string: str):
string_hash = argon2.using(rounds=14, memory_cost=1536, parallelism=2).hash(string)
return string_hash
def verify_secure_hash_string(string: str, string_hash: str):
if argon2.verify(string, string_hash):
return True
else:
return False
# ### BEGIN ### API Lib General ### sign_jwt() ###
# Updated 2021-07-14
@logger_reset
def sign_jwt(
secret_key: str, # Secret/Private/Password
public_key: str, # Will be part of the token. Use to look up secret when verifying.
ttl: int = 60, # Default to 60 seconds
max_renew: int = 0, # Default to 0
account_id: str = None,
person_id: str = None,
user_id: str = None,
) -> Dict[str, str]:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
payload = {
'iat': time.time(), # Issued at
'eat': time.time() + ttl, # Expires at
'max_renew': max_renew, # Number of times allowed to request renew without API secret key
'public_key': public_key, # Use to lookup the secret/private/password key when verifying
'account_id': account_id,
'person_id': person_id,
'user_id': user_id,
}
secret = secret_key
algorithm = 'HS256'
token = jwt.encode(payload, secret, algorithm=algorithm)
log.debug(token)
return token
# ### END ### API Lib General ### sign_jwt() ###
# ### BEGIN ### API Lib General ### decode_jwt() ###
# Updated 2021-07-14
@logger_reset
def decode_jwt(
secret_key: str,
token: str,
) -> dict:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
secret = secret_key
algorithm = 'HS256'
try:
decoded_token = jwt.decode(token, secret, algorithms=[algorithm])
log.debug(decoded_token)
if decoded_token['eat'] >= time.time(): return decoded_token
else: return False
except:
return None
# ### END ### API Lib General ### decode_jwt() ###
# ### BEGIN ### API Lib General ### create_export() ###
# Updated 2021-11-23
@logger_reset
def create_export_file(
data_dict_list: list,
subdir_path: str,
filename: str,
column_name_li: list = [],
rm_id: bool = True,
export_type: str = 'CSV', # CSV, Excel
) -> bool|str:
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
hosted_tmp_path = settings.PATH_HOSTED_TMP_ROOT
# hosted_tmp_path = 'admin/temp'
log.info(f'Hosted Temp Path: {hosted_tmp_path}')
subdirectory_dest = os.path.join(hosted_tmp_path, subdir_path)
log.debug(subdirectory_dest)
pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True)
file_dest_w_subdir = os.path.join(subdirectory_dest, filename)
log.info(f'File Dest With Subdir: {file_dest_w_subdir}')
if column_name_li:
log.info('Using column name list passed')
else:
log.info('Using an auto generated column name list')
column_name_li = list(data_dict_list[0].keys())
log.debug(column_name_li)
if rm_id:
for column_name in list(column_name_li):
# log.info(f'Checking column name: {column_name}')
if column_name.endswith('_id'):
column_name_li.remove(column_name)
log.info(f'Removing column name: {column_name}')
log.debug(column_name_li)
# column_name_li = ['order_line_id_random', 'order_id_random', 'product_id_random', 'product_type', 'product_name', 'quantity', 'amount', 'dollar_amount', 'message', 'person_id_random', 'person_given_name', 'person_family_name', 'person_display_name', 'person_full_name', 'person_contact_email', 'person_contact_cc_email', 'person_contact_address_name', 'person_contact_address_organization_name', 'person_contact_address_line_1', 'person_contact_address_line_2', 'person_contact_address_line_3', 'person_contact_address_city', 'person_contact_address_country_subdivision_code', 'person_contact_address_state_province', 'person_contact_address_postal_code', 'person_contact_address_country_alpha_2_code', 'person_contact_address_country_name', 'person_contact_address_country', 'order_status', 'order_created_on', 'order_updated_on']
data_dataframe = pandas.DataFrame(data_dict_list)
log.debug(data_dataframe)
try:
if export_type == 'CSV':
log.info('Saving dataframe to CSV file')
full_dest_path = file_dest_w_subdir+'.csv'
filename_w_ext = filename+'.csv'
tmp_file_path = os.path.join(subdir_path,filename_w_ext)
data_dataframe.to_csv(full_dest_path, columns=column_name_li, index=False)
elif export_type == 'Excel':
log.info('Saving dataframe to Excel file')
full_dest_path = file_dest_w_subdir+'.xlsx'
filename_w_ext = filename+'.xlsx'
tmp_file_path = os.path.join(subdir_path,filename_w_ext)
data_dataframe.to_excel(full_dest_path, columns=column_name_li, index=False) # sheet_name='Sheet_name_1'
except:
log.exception('Something went wrong while trying to save the export file.')
return False
log.info(f'Temp File Path: {tmp_file_path}')
return tmp_file_path # True
# ### END ### API Lib General ### create_export() ###
# ### BEGIN ### API Lib General ### send_email() ###
# Updated 2021-12-02
@logger_reset
def send_email(
from_email: str,
to_email: str,
subject: str,
body_html: str,
from_name: str = '',
reply_to_email: str = '',
reply_to_name: str = '',
to_name: str = '',
cc_email: str = '',
cc_name: str = '',
bcc_email: str = '',
bcc_name: str = '',
body_text: str = '',
):
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
message = EmailMessage()
if subject:
message['Subject'] = subject
else:
return False
if from_email:
#message['From'] = Address(display_name=from_name, username=from_email.split('@')[0], domain=from_email.split('@')[1])
message['From'] = Address(display_name=from_name, addr_spec=from_email)
else:
return False
if reply_to_email:
message['Reply-To'] = Address(display_name=reply_to_name, addr_spec=reply_to_email)
if to_email:
message['To'] = Address(display_name=to_name, addr_spec=to_email)
else:
return False
if cc_email:
message['Cc'] = Address(display_name=cc_name, addr_spec=cc_email)
if bcc_email:
message['Bcc'] = Address(display_name=bcc_name, addr_spec=bcc_email)
html_version = """\
<html>
<body>
<div>
"""+body_html+"""
</div>
</body>
</html>
"""
if body_text:
text_version = body_text
else:
text_version = html2text.html2text(html_version)
message.set_content(text_version)
message.add_alternative(html_version, subtype='html')
log.info('Sending email...')
log.debug(settings.SMTP)
log.info(f'Subject: {subject}')
log.info(f'From: {from_email} Reply To: {reply_to_email} To: {to_email} CC: {cc_email} BCC: {bcc_email}')
log.debug('Message:')
log.debug(message.as_string())
log.info('Creating SMTP SSL connection...')
context = ssl.create_default_context()
log.info('SMTP configuration, connect, and send')
try:
with smtplib.SMTP_SSL(settings.SMTP['server_name'], settings.SMTP['server_port'], context=context) as server:
server.login(settings.SMTP['username'], settings.SMTP['password'])
server.send_message(message)
log.info('Email sent')
return True
except:
#except SMTPException:
log.error('Error: unable to send email')
return False
# ### END ### API Lib General ### send_email() ###