347 lines
13 KiB
Python
347 lines
13 KiB
Python
# from __future__ import annotations
|
|
import datetime, html2text, jwt, os, pandas, pathlib, pytz, redis, time
|
|
from passlib.hash import argon2
|
|
|
|
# Import smtplib for the actual sending function
|
|
import smtplib, ssl
|
|
|
|
# Import the email package modules needed
|
|
from email.message import EmailMessage
|
|
from email.headerregistry import Address
|
|
from email.utils import make_msgid
|
|
|
|
from fastapi import APIRouter, Depends, Header, HTTPException, Response, status
|
|
from pydantic import BaseModel, EmailStr, Field
|
|
from typing import Dict, List, Optional, Set, Union
|
|
|
|
from app.log import log, logging, logger_reset
|
|
from app.config import settings
|
|
from app.db_sql import redis_lookup_id_random, sql_select
|
|
|
|
|
|
# ### BEGIN ### API Lib General ### async get_token_header() ###
|
|
def get_token_header(x_token: str = Header(...)):
|
|
if x_token != 'fake-super-secret-token':
|
|
raise HTTPException(status_code=400, detail='X-Token header invalid')
|
|
# ### END ### API Lib General ### async get_token_header() ###
|
|
|
|
|
|
# ### BEGIN ### API Lib General ### class Common_Route_Params ###
|
|
# Updated 2022-01-05
|
|
class Common_Route_Params:
|
|
def __init__(
|
|
self,
|
|
x_account_id: int,
|
|
x_account_id_random: str,
|
|
enabled: str = 'enabled',
|
|
limit: int = 10,
|
|
offset: int = 0,
|
|
by_alias: bool = True,
|
|
exclude_unset: bool = True,
|
|
response = None,
|
|
):
|
|
self.x_account_id = x_account_id
|
|
self.x_account_id_random = x_account_id_random
|
|
self.enabled = enabled
|
|
self.limit = limit
|
|
self.offset = offset
|
|
self.by_alias = by_alias
|
|
self.exclude_unset = exclude_unset
|
|
self.response = response
|
|
# log.debug(response)
|
|
# ### END ### API Lib General ### class Common_Route_Params ###
|
|
|
|
|
|
# ### BEGIN ### API Lib General ### common_route_params() ###
|
|
# Updated 2022-02-15
|
|
@logger_reset # This breaks things for some reason when the function is async. Do not use async def common_route_params()!
|
|
def common_route_params(
|
|
x_account_id: str = Header(..., min_length=11, max_length=22),
|
|
enabled: str = 'enabled', # all, enabled, disabled
|
|
limit: int = 100,
|
|
offset: int = 0,
|
|
by_alias: bool = True,
|
|
exclude_none: Optional[bool] = True,
|
|
exclude_unset: bool = True,
|
|
# NOTE: Uncommenting either exclude or include breaks the JSON body format. I do not know why? Should be: {} Becomes this: {"obj_name": {"data_name": "data_value"}} -STI 2022-01-05
|
|
# exclude: Optional[list] = [], # Leaving this and include commented out
|
|
# include: Optional[list] = [], # Leaving this and exclude commented out
|
|
response: Response = Response,
|
|
log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
) -> Common_Route_Params:
|
|
log.setLevel(log_lvl)
|
|
log.debug(locals())
|
|
|
|
log.info(f'Setting commons values: x_account_id, x_account_id_random, limit, offset, enabled, by_alias, exclude_unset, response')
|
|
|
|
x_account_id_random = x_account_id
|
|
|
|
if x_account_id := redis_lookup_id_random(table_name='account', record_id_random=x_account_id):
|
|
log.info(f'Found the x-account-id header with the value: {x_account_id}')
|
|
else:
|
|
log.warning(f'The x-account-id header was found, but the Account ID was not found or is not valid. Account ID: {x_account_id}')
|
|
raise HTTPException(status_code=403, detail='The x-account-id Account ID was not found.') # Forbidden
|
|
|
|
commons = Common_Route_Params( x_account_id=x_account_id, x_account_id_random=x_account_id_random, limit=limit, offset=offset, enabled=enabled, by_alias=by_alias, exclude_unset=exclude_unset, response=response )
|
|
|
|
log.debug(commons)
|
|
|
|
return commons
|
|
# ### END ### API Lib General ### async common_route_params() ###
|
|
|
|
|
|
def secure_hash_string(string: str):
|
|
string_hash = argon2.using(rounds=14, memory_cost=1536, parallelism=2).hash(string)
|
|
|
|
return string_hash
|
|
|
|
|
|
def verify_secure_hash_string(string: str, string_hash: str):
|
|
if argon2.verify(string, string_hash):
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
|
|
# ### BEGIN ### API Lib General ### sign_jwt() ###
|
|
# Updated 2021-07-14
|
|
@logger_reset
|
|
def sign_jwt(
|
|
secret_key: str, # Secret/Private/Password
|
|
public_key: str, # Will be part of the token. Use to look up secret when verifying.
|
|
ttl: int = 60, # Default to 60 seconds
|
|
max_renew: int = 0, # Default to 0
|
|
account_id: str = None,
|
|
person_id: str = None,
|
|
user_id: str = None,
|
|
) -> Dict[str, str]:
|
|
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
payload = {
|
|
'iat': time.time(), # Issued at
|
|
'eat': time.time() + ttl, # Expires at
|
|
'max_renew': max_renew, # Number of times allowed to request renew without API secret key
|
|
'public_key': public_key, # Use to lookup the secret/private/password key when verifying
|
|
'account_id': account_id,
|
|
'person_id': person_id,
|
|
'user_id': user_id,
|
|
}
|
|
secret = secret_key
|
|
algorithm = 'HS256'
|
|
token = jwt.encode(payload, secret, algorithm=algorithm)
|
|
|
|
log.debug(token)
|
|
|
|
return token
|
|
# ### END ### API Lib General ### sign_jwt() ###
|
|
|
|
|
|
# ### BEGIN ### API Lib General ### decode_jwt() ###
|
|
# Updated 2021-07-14
|
|
@logger_reset
|
|
def decode_jwt(
|
|
secret_key: str,
|
|
token: str,
|
|
) -> dict:
|
|
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
secret = secret_key
|
|
algorithm = 'HS256'
|
|
|
|
try:
|
|
decoded_token = jwt.decode(token, secret, algorithms=[algorithm])
|
|
log.debug(decoded_token)
|
|
if decoded_token['eat'] >= time.time(): return decoded_token
|
|
else: return False
|
|
except:
|
|
return None
|
|
# ### END ### API Lib General ### decode_jwt() ###
|
|
|
|
|
|
# ### BEGIN ### API Lib General ### create_export() ###
|
|
# Updated 2021-11-23
|
|
@logger_reset
|
|
def create_export_file(
|
|
data_dict_list: list,
|
|
subdir_path: str,
|
|
filename: str,
|
|
column_name_li: list = [],
|
|
rm_id: bool = True,
|
|
export_type: str = 'CSV', # CSV, Excel
|
|
) -> bool|str:
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
hosted_tmp_path = settings.PATH_HOSTED_TMP_ROOT
|
|
# hosted_tmp_path = 'admin/temp'
|
|
log.info(f'Hosted Temp Path: {hosted_tmp_path}')
|
|
|
|
subdirectory_dest = os.path.join(hosted_tmp_path, subdir_path)
|
|
log.debug(subdirectory_dest)
|
|
pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True)
|
|
file_dest_w_subdir = os.path.join(subdirectory_dest, filename)
|
|
log.info(f'File Dest With Subdir: {file_dest_w_subdir}')
|
|
|
|
if column_name_li:
|
|
log.info('Using column name list passed')
|
|
else:
|
|
log.info('Using an auto generated column name list')
|
|
column_name_li = list(data_dict_list[0].keys())
|
|
log.debug(column_name_li)
|
|
|
|
if rm_id:
|
|
for column_name in list(column_name_li):
|
|
# log.info(f'Checking column name: {column_name}')
|
|
if column_name.endswith('_id'):
|
|
column_name_li.remove(column_name)
|
|
log.info(f'Removing column name: {column_name}')
|
|
log.debug(column_name_li)
|
|
|
|
|
|
# column_name_li = ['order_line_id_random', 'order_id_random', 'product_id_random', 'product_type', 'product_name', 'quantity', 'amount', 'dollar_amount', 'message', 'person_id_random', 'person_given_name', 'person_family_name', 'person_display_name', 'person_full_name', 'person_contact_email', 'person_contact_cc_email', 'person_contact_address_name', 'person_contact_address_organization_name', 'person_contact_address_line_1', 'person_contact_address_line_2', 'person_contact_address_line_3', 'person_contact_address_city', 'person_contact_address_country_subdivision_code', 'person_contact_address_state_province', 'person_contact_address_postal_code', 'person_contact_address_country_alpha_2_code', 'person_contact_address_country_name', 'person_contact_address_country', 'order_status', 'order_created_on', 'order_updated_on']
|
|
|
|
data_dataframe = pandas.DataFrame(data_dict_list)
|
|
log.debug(data_dataframe)
|
|
|
|
try:
|
|
if export_type == 'CSV':
|
|
log.info('Saving dataframe to CSV file')
|
|
full_dest_path = file_dest_w_subdir+'.csv'
|
|
filename_w_ext = filename+'.csv'
|
|
tmp_file_path = os.path.join(subdir_path,filename_w_ext)
|
|
data_dataframe.to_csv(full_dest_path, columns=column_name_li, index=False)
|
|
elif export_type == 'Excel':
|
|
log.info('Saving dataframe to Excel file')
|
|
full_dest_path = file_dest_w_subdir+'.xlsx'
|
|
filename_w_ext = filename+'.xlsx'
|
|
tmp_file_path = os.path.join(subdir_path,filename_w_ext)
|
|
data_dataframe.to_excel(full_dest_path, columns=column_name_li, index=False) # sheet_name='Sheet_name_1'
|
|
except:
|
|
log.exception('Something went wrong while trying to save the export file.')
|
|
return False
|
|
|
|
log.info(f'Temp File Path: {tmp_file_path}')
|
|
|
|
return tmp_file_path # True
|
|
# ### END ### API Lib General ### create_export() ###
|
|
|
|
# ### BEGIN ### API Lib General ### return_full_tmp_path() ###
|
|
# This is for using with return FileResponse(path=full_tmp_path, filename=filename)
|
|
# Updated 2022-04-22
|
|
@logger_reset
|
|
def return_full_tmp_path(
|
|
full_tmp_path: str = None,
|
|
subdir_path: str = None,
|
|
filename: str = None,
|
|
) -> bool|str:
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
hosted_tmp_path = settings.PATH_HOSTED_TMP_ROOT
|
|
# hosted_tmp_path = 'admin/temp'
|
|
log.info(f'Hosted Temp Path: {hosted_tmp_path}')
|
|
|
|
if full_tmp_path:
|
|
file_dest = os.path.join(hosted_tmp_path, full_tmp_path)
|
|
return file_dest
|
|
elif subdir_path and filename:
|
|
subdirectory_dest = os.path.join(hosted_tmp_path, subdir_path)
|
|
log.debug(subdirectory_dest)
|
|
pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True)
|
|
file_dest_w_subdir = os.path.join(subdirectory_dest, filename)
|
|
log.info(f'File Dest With Subdir: {file_dest_w_subdir}')
|
|
|
|
return file_dest_w_subdir
|
|
else:
|
|
return False
|
|
# ### END ### API Lib General ### return_full_tmp_path() ###
|
|
|
|
|
|
# ### BEGIN ### API Lib General ### send_email() ###
|
|
# Updated 2021-12-02
|
|
@logger_reset
|
|
def send_email(
|
|
from_email: str,
|
|
to_email: str,
|
|
subject: str,
|
|
body_html: str,
|
|
from_name: str = '',
|
|
reply_to_email: str = '',
|
|
reply_to_name: str = '',
|
|
to_name: str = '',
|
|
cc_email: str = '',
|
|
cc_name: str = '',
|
|
bcc_email: str = '',
|
|
bcc_name: str = '',
|
|
body_text: str = '',
|
|
):
|
|
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
message = EmailMessage()
|
|
if subject:
|
|
message['Subject'] = subject
|
|
else:
|
|
return False
|
|
if from_email:
|
|
#message['From'] = Address(display_name=from_name, username=from_email.split('@')[0], domain=from_email.split('@')[1])
|
|
message['From'] = Address(display_name=from_name, addr_spec=from_email)
|
|
else:
|
|
return False
|
|
if reply_to_email:
|
|
message['Reply-To'] = Address(display_name=reply_to_name, addr_spec=reply_to_email)
|
|
if to_email:
|
|
message['To'] = Address(display_name=to_name, addr_spec=to_email)
|
|
else:
|
|
return False
|
|
if cc_email:
|
|
message['Cc'] = Address(display_name=cc_name, addr_spec=cc_email)
|
|
if bcc_email:
|
|
message['Bcc'] = Address(display_name=bcc_name, addr_spec=bcc_email)
|
|
|
|
html_version = """\
|
|
<html>
|
|
<body>
|
|
<div>
|
|
"""+body_html+"""
|
|
</div>
|
|
</body>
|
|
</html>
|
|
"""
|
|
|
|
if body_text:
|
|
text_version = body_text
|
|
else:
|
|
text_version = html2text.html2text(html_version)
|
|
|
|
message.set_content(text_version)
|
|
|
|
message.add_alternative(html_version, subtype='html')
|
|
|
|
log.info('Sending email...')
|
|
log.debug(settings.SMTP)
|
|
|
|
log.info(f'Subject: {subject}')
|
|
log.info(f'From: {from_email} Reply To: {reply_to_email} To: {to_email} CC: {cc_email} BCC: {bcc_email}')
|
|
|
|
log.debug('Message:')
|
|
log.debug(message.as_string())
|
|
|
|
log.info('Creating SMTP SSL connection...')
|
|
context = ssl.create_default_context()
|
|
|
|
log.info('SMTP configuration, connect, and send')
|
|
try:
|
|
with smtplib.SMTP_SSL(settings.SMTP['server_name'], settings.SMTP['server_port'], context=context) as server:
|
|
server.login(settings.SMTP['username'], settings.SMTP['password'])
|
|
server.send_message(message)
|
|
log.info('Email sent')
|
|
return True
|
|
except:
|
|
#except SMTPException:
|
|
log.error('Error: unable to send email')
|
|
return False
|
|
# ### END ### API Lib General ### send_email() ###
|