diff --git a/app/lib_email.py b/app/lib_email.py new file mode 100644 index 0000000..b2bdcb8 --- /dev/null +++ b/app/lib_email.py @@ -0,0 +1,168 @@ +import html2text +import smtplib, ssl +import logging +from email.message import EmailMessage +from email.headerregistry import Address +from typing import Optional + +from app.log import logger_reset +from app.config import settings + +log = logging.getLogger(__name__) + +# ### BEGIN ### API Lib Email ### send_email() ### +# Moved from lib_general.py 2026-01-07 +@logger_reset +def send_email( + from_email: str, + to_email: str, + subject: str, + body_html: str, + + from_name: str = '', + reply_to_email: str = '', + reply_to_name: str = '', + to_name: str = '', + cc_email: str = '', + cc_name: str = '', + bcc_email: str = '', + bcc_name: str = '', + body_text: str = '', + + test: bool = False, + log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL + ): + log.setLevel(log_lvl) + log.debug(locals()) + + if test: + log.setLevel(logging.DEBUG) + log.debug('[TESTING] Running with send_email() in TEST mode') + + message = EmailMessage() + if subject: + message['Subject'] = subject + else: + return False + + if from_email and from_name: + try: + message['From'] = Address(display_name=from_name, addr_spec=from_email) + except Exception as e: + log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') + return False + elif from_email: + try: + message['From'] = Address(display_name=from_email, addr_spec=from_email) + except Exception as e: + log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') + return False + else: + return False + + if reply_to_email and reply_to_name: + try: + message['Reply-To'] = Address(display_name=reply_to_name, addr_spec=reply_to_email) + except Exception as e: + log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') + return False + elif reply_to_email: + try: + message['Reply-To'] = Address(display_name=reply_to_email, addr_spec=reply_to_email) + except Exception as e: + log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') + return False + + if to_email and to_name: + try: + message['To'] = Address(display_name=to_name, addr_spec=to_email) + except Exception as e: + log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') + return False + elif to_email: + try: + message['To'] = Address(display_name=to_email, addr_spec=to_email) + except Exception as e: + log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') + return False + else: + return False + + if cc_email and cc_name: + try: + message['Cc'] = Address(display_name=cc_name, addr_spec=cc_email) + except Exception as e: + log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') + return False + elif cc_email: + try: + message['Cc'] = Address(display_name=cc_email, addr_spec=cc_email) + except Exception as e: + log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') + return False + + if bcc_email and bcc_name: + try: + message['Bcc'] = Address(display_name=bcc_name, addr_spec=bcc_email) + except Exception as e: + log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') + return False + elif bcc_email: + try: + message['Bcc'] = Address(display_name=bcc_email, addr_spec=bcc_email) + except Exception as e: + log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') + return False + + html_version = """ + +
+ +"""+body_html+""" + + + + """ + + if body_text: + text_version = body_text + else: + text_version = html2text.html2text(html_version) + + message.set_content(text_version) + message.add_alternative(html_version, subtype='html') + + log.info('Sending email...') + log.debug(settings.SMTP) + + log.info(f'Subject: {subject}') + log.info(f'From: {from_email} Reply To: {reply_to_email} To: {to_email} CC: {cc_email} BCC: {bcc_email}') + + log.debug('Message:') + log.debug(message.as_string()) + + log.info('Creating SMTP SSL connection...') + context = ssl.create_default_context() + + log.info('SMTP configuration, connect, and send') + log.info(f'Server: {settings.SMTP["server"]} Port: {settings.SMTP["port"]} Username: {settings.SMTP["username"]}') + + log.info('Trying smtplib.SMTP_SSL in send_email()...') + if test: + log.info('[TESTING] Email will NOT actually be sent! [TEST MODE]') + try: + with smtplib.SMTP_SSL(settings.SMTP['server'], settings.SMTP['port'], context=context) as server: + log.info('SMTP log in...') + log.debug(f'Server: {settings.SMTP["server"]} Port: {settings.SMTP["port"]} Username: {settings.SMTP["username"]} Password: {settings.SMTP["password"]}') + server.login(settings.SMTP['username'], settings.SMTP['password']) + log.info('SMTP send message...') + if not test: + log.info('Email sent! Returning True') + server.send_message(message) + else: + log.info('[TESTING] Email (NOT) sent! Returning True [TEST MODE]') + return True + except: + log.error('Error: Unable to send email. Returning False') + return False +# ### END ### API Lib Email ### send_email() ### diff --git a/app/lib_export.py b/app/lib_export.py new file mode 100644 index 0000000..cc5f325 --- /dev/null +++ b/app/lib_export.py @@ -0,0 +1,116 @@ +import os +import pandas +import pathlib +import logging +from typing import Optional, Union + +from app.log import logger_reset +from app.config import settings + +log = logging.getLogger(__name__) + +# ### BEGIN ### API Lib Export ### create_export_file() ### +# Moved from lib_general.py 2026-01-07 +@logger_reset +def create_export_file( + data_dict_list: list, + subdir_path: str, + filename: str, + column_name_li: list = [], + rm_id: bool = True, + export_type: str = 'CSV', # CSV, Excel + ) -> Union[bool, str]: + log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL + log.debug(locals()) + + hosted_tmp_path = settings.FILES_PATH['hosted_tmp_root'] + log.info(f'Hosted Temp Path: {hosted_tmp_path}') + + subdirectory_dest = os.path.join(hosted_tmp_path, subdir_path) + log.debug(subdirectory_dest) + pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True) + file_dest_w_subdir = os.path.join(subdirectory_dest, filename) + log.info(f'File Dest With Subdir: {file_dest_w_subdir}') + + if column_name_li: + log.info('Using column name list passed') + else: + log.info('Using an auto generated column name list') + column_name_li = list(data_dict_list[0].keys()) + log.debug(column_name_li) + + if rm_id: + for column_name in list(column_name_li): + if column_name.endswith('_id'): + column_name_li.remove(column_name) + log.info(f'Removing column name: {column_name}') + log.info(column_name_li) + + data_dataframe = pandas.DataFrame(data_dict_list) + log.debug(data_dataframe) + + missing_cols = [col for col in column_name_li if col not in data_dataframe.columns] + if missing_cols: + column_name_li = [col for col in column_name_li if col not in missing_cols] + + try: + if export_type == 'CSV': + log.info('Saving dataframe to CSV file') + full_dest_path = file_dest_w_subdir+'.csv' + filename_w_ext = filename+'.csv' + tmp_file_path = os.path.join(subdir_path,filename_w_ext) + data_dataframe.to_csv( + full_dest_path, + na_rep='NULL', + columns=column_name_li, + index=False, + ) + elif export_type == 'Excel': + log.info('Saving dataframe to Excel file') + full_dest_path = file_dest_w_subdir+'.xlsx' + filename_w_ext = filename+'.xlsx' + tmp_file_path = os.path.join(subdir_path,filename_w_ext) + data_dataframe.to_excel( + full_dest_path, + na_rep='NULL', + columns=column_name_li, + index=False, + ) + except: + log.exception('Something went wrong while trying to save the export file.') + return False + + log.info(f'Temp File Path: {tmp_file_path}') + + return tmp_file_path +# ### END ### API Lib Export ### create_export_file() ### + +# ### BEGIN ### API Lib Export ### return_full_tmp_path() ### +# This is for using with return FileResponse(path=full_tmp_path, filename=filename) +# Moved from lib_general.py 2026-01-07 +@logger_reset +def return_full_tmp_path( + full_tmp_path: str = None, + subdir_path: str = None, + filename: str = None, + ) -> Union[bool, str]: + log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL + log.debug(locals()) + + hosted_tmp_path = settings.FILES_PATH['hosted_tmp_root'] + log.info(f'Hosted Temp Path: {hosted_tmp_path}') + + if full_tmp_path: + file_dest = os.path.join(hosted_tmp_path, full_tmp_path) + return file_dest + elif subdir_path and filename: + subdirectory_dest = os.path.join(hosted_tmp_path, subdir_path) + log.debug(subdirectory_dest) + pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True) + file_dest_w_subdir = os.path.join(subdirectory_dest, filename) + log.info(f'File Dest With Subdir: {file_dest_w_subdir}') + + return file_dest_w_subdir + else: + return False +# ### END ### API Lib Export ### return_full_tmp_path() ### diff --git a/app/lib_general.py b/app/lib_general.py index 29b39bd..34355d7 100644 --- a/app/lib_general.py +++ b/app/lib_general.py @@ -1,26 +1,21 @@ -import html2text, jwt, os, pandas, pathlib, time -# No longer needed here: datetime, pytz, redis -from passlib.hash import argon2 - -# Import smtplib for the actual sending function -import smtplib, ssl - -# Import the email package modules needed -from email.message import EmailMessage -from email.headerregistry import Address -# from email.utils import make_msgid - -from fastapi import APIRouter, Depends, Header, HTTPException, Response, status -# from pydantic import BaseModel, EmailStr, Field +import html2text, time from typing import Dict, List, Optional, Set, Union - import logging -from app.log import logger_reset -log = logging.getLogger(__name__) +from fastapi import Header, HTTPException, Response, status + +from app.log import logger_reset from app.config import settings from app.db_sql import redis_lookup_id_random, sql_select +from app.lib_email import send_email +from app.lib_export import create_export_file, return_full_tmp_path +from app.lib_jwt import sign_jwt, decode_jwt +from app.lib_hash import secure_hash_string, verify_secure_hash_string + +log = logging.getLogger(__name__) + + # ### BEGIN ### API Lib General ### async get_token_header() ### def get_token_header(x_token: str = Header(...)): @@ -267,388 +262,4 @@ def common_route_params_min( # ### END ### API Lib General ### async common_route_params_min() ### -def secure_hash_string(string: str): - string_hash = argon2.using(rounds=14, memory_cost=1536, parallelism=2).hash(string) - return string_hash - - -def verify_secure_hash_string(string: str, string_hash: str): - if argon2.verify(string, string_hash): - return True - else: - return False - - -# ### BEGIN ### API Lib General ### sign_jwt() ### -# Updated 2021-07-14 -@logger_reset -def sign_jwt( - secret_key: str, # Secret/Private/Password - ttl: int = 60, # Default to 60 seconds - max_renew: int = 0, # Default to 0 - public_key: str = None, # Will be part of the token. Use to look up secret when verifying.??? - account_id: str = None, - person_id: str = None, - user_id: str = None, - json_str: str = None, - b64_str: str = None, - ) -> Dict[str, str]: - log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - - payload = { - 'iat': time.time(), # Issued at - 'eat': time.time() + ttl, # Expires at - 'max_renew': max_renew, # Number of times allowed to request renew without API secret key - 'public_key': public_key, # Use to lookup the secret/private/password key when verifying - 'account_id': account_id, - 'person_id': person_id, - 'user_id': user_id, - 'json_str': json_str, - 'b64_str': b64_str, - } - secret = secret_key - algorithm = 'HS256' - token = jwt.encode(payload, secret, algorithm=algorithm) - - log.debug(token) - - return token -# ### END ### API Lib General ### sign_jwt() ### - - -# ### BEGIN ### API Lib General ### decode_jwt() ### -# Updated 2021-07-14 -@logger_reset -def decode_jwt( - secret_key: str, - token: str, - ) -> dict: - log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - - secret = secret_key - algorithm = 'HS256' - - try: - decoded_token = jwt.decode(token, secret, algorithms=[algorithm]) - log.debug(decoded_token) - if decoded_token['eat'] >= time.time(): return decoded_token - else: return False - except: - return None -# ### END ### API Lib General ### decode_jwt() ### - - -# ### BEGIN ### API Lib General ### create_export() ### -# Updated 2021-11-23 -@logger_reset -def create_export_file( - data_dict_list: list, - subdir_path: str, - filename: str, - column_name_li: list = [], - rm_id: bool = True, - export_type: str = 'CSV', # CSV, Excel - ) -> bool|str: - log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - - hosted_tmp_path = settings.FILES_PATH['hosted_tmp_root'] - # hosted_tmp_path = 'admin/temp' - log.info(f'Hosted Temp Path: {hosted_tmp_path}') - - subdirectory_dest = os.path.join(hosted_tmp_path, subdir_path) - log.debug(subdirectory_dest) - pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True) - file_dest_w_subdir = os.path.join(subdirectory_dest, filename) - log.info(f'File Dest With Subdir: {file_dest_w_subdir}') - - if column_name_li: - log.info('Using column name list passed') - else: - log.info('Using an auto generated column name list') - column_name_li = list(data_dict_list[0].keys()) - log.debug(column_name_li) - - if rm_id: - for column_name in list(column_name_li): - # log.info(f'Checking column name: {column_name}') - if column_name.endswith('_id'): - column_name_li.remove(column_name) - log.info(f'Removing column name: {column_name}') - log.info(column_name_li) - - # column_name_li = ['order_line_id_random', 'order_id_random', 'product_id_random', 'product_type', 'product_name', 'quantity', 'amount', 'dollar_amount', 'message', 'person_id_random', 'person_given_name', 'person_family_name', 'person_display_name', 'person_full_name', 'person_contact_email', 'person_contact_cc_email', 'person_contact_address_name', 'person_contact_address_organization_name', 'person_contact_address_line_1', 'person_contact_address_line_2', 'person_contact_address_line_3', 'person_contact_address_city', 'person_contact_address_country_subdivision_code', 'person_contact_address_state_province', 'person_contact_address_postal_code', 'person_contact_address_country_alpha_2_code', 'person_contact_address_country_name', 'person_contact_address_country', 'order_status', 'order_created_on', 'order_updated_on'] - - data_dataframe = pandas.DataFrame(data_dict_list) - log.debug(data_dataframe) - - # Need to deal with this error when a field is empty (null/None) for every record in the export: KeyError("Not all names specified in 'columns' are found") - # There does not seem to be a Panda option to ignore missing columns - missing_cols = [col for col in column_name_li if col not in data_dataframe.columns] - if missing_cols: - # Remove the missing columns from the column list - column_name_li = [col for col in column_name_li if col not in missing_cols] - # raise KeyError(f"The following columns are not found in the DataFrame: {', '.join(missing_cols)}") - - - try: - if export_type == 'CSV': - log.info('Saving dataframe to CSV file') - full_dest_path = file_dest_w_subdir+'.csv' - filename_w_ext = filename+'.csv' - tmp_file_path = os.path.join(subdir_path,filename_w_ext) - data_dataframe.to_csv( - full_dest_path, - na_rep='NULL', - columns=column_name_li, - index=False, - # errors='ignore', - ) - elif export_type == 'Excel': - log.info('Saving dataframe to Excel file') - full_dest_path = file_dest_w_subdir+'.xlsx' - filename_w_ext = filename+'.xlsx' - tmp_file_path = os.path.join(subdir_path,filename_w_ext) - # This should ignore unknown columns - data_dataframe.to_excel( - full_dest_path, - na_rep='NULL', - columns=column_name_li, - index=False, - # engine='openpyxl', - # errors='ignore', - ) - except: - log.exception('Something went wrong while trying to save the export file.') - return False - - log.info(f'Temp File Path: {tmp_file_path}') - - return tmp_file_path # True -# ### END ### API Lib General ### create_export() ### - -# ### BEGIN ### API Lib General ### return_full_tmp_path() ### -# This is for using with return FileResponse(path=full_tmp_path, filename=filename) -# Updated 2022-04-22 -@logger_reset -def return_full_tmp_path( - full_tmp_path: str = None, - subdir_path: str = None, - filename: str = None, - ) -> bool|str: - log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - - hosted_tmp_path = settings.FILES_PATH['hosted_tmp_root'] - # hosted_tmp_path = 'admin/temp' - log.info(f'Hosted Temp Path: {hosted_tmp_path}') - - if full_tmp_path: - file_dest = os.path.join(hosted_tmp_path, full_tmp_path) - return file_dest - elif subdir_path and filename: - subdirectory_dest = os.path.join(hosted_tmp_path, subdir_path) - log.debug(subdirectory_dest) - pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True) - file_dest_w_subdir = os.path.join(subdirectory_dest, filename) - log.info(f'File Dest With Subdir: {file_dest_w_subdir}') - - return file_dest_w_subdir - else: - return False -# ### END ### API Lib General ### return_full_tmp_path() ### - - -# ### BEGIN ### API Lib General ### send_email() ### -# Updated 2021-12-02 -@logger_reset -def send_email( - from_email: str, - to_email: str, - subject: str, - body_html: str, - - from_name: str = '', - reply_to_email: str = '', - reply_to_name: str = '', - to_name: str = '', - cc_email: str = '', - cc_name: str = '', - bcc_email: str = '', - bcc_name: str = '', - body_text: str = '', - - test: bool = False, - log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - ): - log.setLevel(log_lvl) - log.debug(locals()) - - if test: - log.setLevel(logging.DEBUG) - log.debug('[TESTING] Running with send_email() in TEST mode') - - message = EmailMessage() - if subject: - message['Subject'] = subject - else: - return False - - if from_email and from_name: - #message['From'] = Address(display_name=from_name, username=from_email.split('@')[0], domain=from_email.split('@')[1]) - try: - message['From'] = Address(display_name=from_name, addr_spec=from_email) - except Exception as e: - log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') - return False - elif from_email: - try: - message['From'] = Address(display_name=from_email, addr_spec=from_email) - except Exception as e: - log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') - return False - else: - return False - - if reply_to_email and reply_to_name: - try: - message['Reply-To'] = Address(display_name=reply_to_name, addr_spec=reply_to_email) - except Exception as e: - log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') - return False - elif reply_to_email: - try: - message['Reply-To'] = Address(display_name=reply_to_email, addr_spec=reply_to_email) - except Exception as e: - log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') - return False - - if to_email and to_name: - try: - message['To'] = Address(display_name=to_name, addr_spec=to_email) - except Exception as e: - log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') - return False - elif to_email: - try: - message['To'] = Address(display_name=to_email, addr_spec=to_email) - except Exception as e: - log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') - return False - else: - return False - - if cc_email and cc_name: - try: - message['Cc'] = Address(display_name=cc_name, addr_spec=cc_email) - except Exception as e: - log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') - return False - elif cc_email: - try: - message['Cc'] = Address(display_name=cc_email, addr_spec=cc_email) - except Exception as e: - log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') - return False - - if bcc_email and bcc_name: - try: - message['Bcc'] = Address(display_name=bcc_name, addr_spec=bcc_email) - except Exception as e: - log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') - return False - elif bcc_email: - try: - message['Bcc'] = Address(display_name=bcc_email, addr_spec=bcc_email) - except Exception as e: - log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') - return False - - html_version = """\ - - - -"""+body_html+""" - - - - """ - - # html_version = """\ - # - # - #