# from __future__ import annotations
import datetime, html2text, jwt, os, pandas, pathlib, pytz, redis, time
from passlib.hash import argon2
# Import smtplib for the actual sending function
import smtplib, ssl
# Import the email package modules needed
from email.message import EmailMessage
from email.headerregistry import Address
from email.utils import make_msgid

from fastapi import APIRouter, Depends, Header, HTTPException, Response, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union

from app.log import log, logging, logger_reset
from app.config import settings
from app.db_sql import redis_lookup_id_random, sql_select


# ### BEGIN ### API Lib General ### async get_token_header() ###
def get_token_header(x_token: str = Header(...)):
    """Reject the request unless the ``X-Token`` header carries the expected token.

    Args:
        x_token: Value of the required ``X-Token`` request header.

    Raises:
        HTTPException: 400 when the header value does not match.
    """
    import hmac  # Local import: only needed for the constant-time comparison.

    # NOTE(review): the expected token is hard-coded in source; presumably it
    # should come from settings -- confirm before relying on this dependency.
    expected_token = 'fake-super-secret-token'
    # Compare as bytes because compare_digest() rejects non-ASCII str values,
    # and use it (rather than !=) so timing does not leak a partial match.
    if not hmac.compare_digest(x_token.encode('utf-8'), expected_token.encode('utf-8')):
        raise HTTPException(status_code=400, detail='X-Token header invalid')
# ### END ### API Lib General ### async get_token_header() ###


# ### BEGIN ### API Lib General ### class Common_Route_Params ###
# Updated 2023-01-30
class Common_Route_Params_No_Account_ID:
    """Plain container for the common route values when an account ID is optional.

    Every constructor argument is stored unchanged on the attribute of the
    same name; no validation happens here.
    """

    def __init__(
            self,
            x_account_id: int|None = None,
            x_account_id_random: str|None = None,
            x_no_account_id_token: str|None = None,
            enabled: str = 'enabled',
            limit: int = 10,
            offset: int = 0,
            by_alias: bool = True,
            exclude_unset: bool = False,
            response = None,
        ):
        self.x_account_id = x_account_id
        self.x_account_id_random = x_account_id_random
        self.x_no_account_id_token = x_no_account_id_token
        self.enabled = enabled
        self.limit = limit
        self.offset = offset
        self.by_alias = by_alias
        self.exclude_unset = exclude_unset
        self.response = response
        # log.debug(response)

    def __repr__(self) -> str:
        # Show every stored attribute so logging an instance is readable
        # instead of printing only the object's memory address.
        fields = ', '.join(f'{name}={value!r}' for name, value in vars(self).items())
        return f'{type(self).__name__}({fields})'
# ### END ### API Lib General ### class Common_Route_Params ###
# ### BEGIN ### API Lib General ### common_route_params() ###
# Updated 2023-01-30
@logger_reset # This breaks things for some reason when the function is async. Do not use async def common_route_params()!
def common_route_params_no_account_id(
        x_account_id: str = Header(None, min_length=11, max_length=22),
        enabled: str = 'enabled', # all, enabled, disabled
        limit: int = 100,
        offset: int = 0,
        by_alias: bool = True,
        exclude_none: Optional[bool] = True,
        exclude_unset: bool = False,
        # NOTE: Uncommenting either exclude or include breaks the JSON body format. I do not know why? Should be: {} Becomes this: {"obj_name": {"data_name": "data_value"}} -STI 2022-01-05
        # exclude: Optional[list] = [], # Leaving this and include commented out
        # include: Optional[list] = [], # Leaving this and exclude commented out
        response: Response = Response,
        log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    ) -> Common_Route_Params_No_Account_ID:
    """Collect the common route parameters for routes where the account is optional.

    The ``x-account-id`` header, when present, is resolved through
    ``redis_lookup_id_random(table_name='account', ...)``. A missing header is
    accepted; a header that does not resolve is rejected.

    Args:
        x_account_id: Optional ``x-account-id`` header (11-22 characters).
        enabled, limit, offset, by_alias, exclude_unset, response: Copied
            unchanged onto the returned object.
        exclude_none: Accepted but not used by this function.
        log_lvl: Level applied to the shared ``log`` for this call.

    Returns:
        Common_Route_Params_No_Account_ID with ``x_account_id`` set to the
        lookup result (or None) and ``x_account_id_random`` set to the raw
        header value (or None).

    Raises:
        HTTPException: 403 when the header is present but the lookup returns
            a falsy value.
    """
    log.setLevel(log_lvl)
    log.debug(locals())
    log.info('Setting commons values: x_account_id, x_account_id_random, limit, offset, enabled, by_alias, exclude_unset, response')

    # Keep the raw header value: the original code tested `x_account_id is None`
    # only after overwriting x_account_id with the lookup result, so a failed
    # lookup that returned None was reported as "no header" and never rejected.
    x_account_id_random = x_account_id
    if x_account_id_random is None:
        log.warning('No x-account-id header value passed')
        x_account_id = None
    else:
        x_account_id = redis_lookup_id_random(table_name='account', record_id_random=x_account_id_random)
        if not x_account_id:
            log.warning(f'The x-account-id header was found, but the Account ID was not found or is not valid. Account ID: {x_account_id_random}')
            raise HTTPException(status_code=403, detail='The x-account-id Account ID was not found.') # Forbidden
        log.info(f'Found the x-account-id header with the value: {x_account_id}')

    commons = Common_Route_Params_No_Account_ID(
        x_account_id=x_account_id,
        x_account_id_random=x_account_id_random,
        limit=limit,
        offset=offset,
        enabled=enabled,
        by_alias=by_alias,
        exclude_unset=exclude_unset,
        response=response
    )
    log.debug(commons)
    return commons
# ### END ### API Lib General ### async common_route_params() ###


# ### BEGIN ### API Lib General ### class Common_Route_Params ###
# Updated 2022-01-05
class Common_Route_Params:
    """Plain container for the common route values when an account ID is required.

    Every constructor argument is stored unchanged on the attribute of the
    same name; no validation happens here.
    """

    def __init__(
            self,
            x_account_id: int,
            x_account_id_random: str,
            x_no_account_id_token: str|None = None,
            enabled: str = 'enabled',
            limit: int = 10,
            offset: int = 0,
            by_alias: bool = True,
            exclude_unset: bool = False,
            response = None,
        ):
        self.x_account_id = x_account_id
        self.x_account_id_random = x_account_id_random
        self.x_no_account_id_token = x_no_account_id_token
        self.enabled = enabled
        self.limit = limit
        self.offset = offset
        self.by_alias = by_alias
        self.exclude_unset = exclude_unset
        self.response = response
        # log.debug(response)

    def __repr__(self) -> str:
        # Show every stored attribute so logging an instance is readable
        # instead of printing only the object's memory address.
        fields = ', '.join(f'{name}={value!r}' for name, value in vars(self).items())
        return f'{type(self).__name__}({fields})'
# ### END ### API Lib General ### class Common_Route_Params ###
# ### BEGIN ### API Lib General ### common_route_params() ###
# Updated 2022-02-15
@logger_reset # This breaks things for some reason when the function is async. Do not use async def common_route_params()!
def common_route_params(
        # x_account_id: str = Header(..., min_length=11, max_length=22), # NOTE WARNING: Commented out 2023-08-17
        x_account_id: str = Header(None, min_length=11, max_length=22), # NOTE WARNING: Changed to this 2023-08-17
        x_no_account_id: str = Header(None, min_length=11, max_length=22), # NOTE WARNING: Changed to this 2023-08-17
        # x_aether_api_key: Optional[str] = Header(..., min_length=11, max_length=22),
        # x_aether_api_token: Optional[str] = Header(..., min_length=11, max_length=22),
        # x_aether_jwt_token: Optional[str] = Header(..., min_length=11, max_length=50),
        x_no_account_id_token: str|None = None, # NOTE: Not a header value! Added 2023-08-17
        enabled: str = 'enabled', # all, enabled, disabled
        limit: int = 100,
        offset: int = 0,
        by_alias: bool = True,
        exclude_unset: bool = False,
        exclude_defaults: Optional[bool] = False,
        exclude_none: Optional[bool] = False,
        # NOTE: Uncommenting either exclude or include breaks the JSON body format. I do not know why? Should be: {} Becomes this: {"obj_name": {"data_name": "data_value"}} -STI 2022-01-05
        # exclude: Optional[list] = [], # Leaving this and include commented out
        # include: Optional[list] = [], # Leaving this and exclude commented out
        response: Response = Response,
        log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    ) -> Common_Route_Params|Common_Route_Params_No_Account_ID:
    """Collect the common route parameters, resolving the caller's account.

    The account is taken from the first of these that is supplied:

    1. ``x-account-id`` header: resolved via ``redis_lookup_id_random``;
       403 if the lookup returns a falsy value.
    2. ``x-no-account-id`` header longer than 10 characters: no account.
    3. ``x_no_account_id_token`` (not a header) longer than 10 characters:
       resolved via the same lookup; a failed lookup means no account.

    If none is supplied the request is rejected with 403.

    Args:
        enabled, limit, offset, by_alias, exclude_unset, response: Copied
            unchanged onto the returned object.
        exclude_defaults, exclude_none: Accepted but not used here.
        log_lvl: Level applied to the shared ``log`` for this call.

    Returns:
        Common_Route_Params when an account ID was resolved, otherwise
        Common_Route_Params_No_Account_ID with both account fields None.

    Raises:
        HTTPException: 403 as described above.
    """
    log.setLevel(log_lvl)
    log.debug(locals())
    log.info('Setting commons values: x_account_id, x_account_id_random, limit, offset, enabled, by_alias, exclude_unset, response')

    # Keep the raw header value; x_account_id is replaced by the lookup result.
    x_account_id_random = x_account_id
    if x_account_id:
        x_account_id = redis_lookup_id_random(table_name='account', record_id_random=x_account_id_random)
        if not x_account_id:
            # Log the submitted ID, not the (falsy) lookup result.
            log.warning(f'The x-account-id header was found, but the Account ID was not found or is not valid. Account ID: {x_account_id_random}')
            raise HTTPException(status_code=403, detail='The x-account-id Account ID was not found.') # Forbidden
        log.info(f'Found the x-account-id header with the value: {x_account_id}')
    elif x_no_account_id and len(x_no_account_id) > 10:
        log.warning(f'Found the x_no_account_id header param with the value: {x_no_account_id}')
        x_account_id = None
        x_account_id_random = '--- NOT SET ---'
    elif x_no_account_id_token and len(x_no_account_id_token) > 10: # NOTE: Not a header value!
        # NOTE WARNING: This token should be verified and able to be disabled quickly.
        log.warning(f'Found the x_no_account_id_token URL param with the value: {x_no_account_id_token}')
        if x_account_id := redis_lookup_id_random(table_name='account', record_id_random=x_no_account_id_token):
            log.info(f'Found the x-account-id header with the value: {x_account_id}')
            x_account_id_random = x_no_account_id_token
        else:
            # NOTE(review): the source assigned x_account_id = 0 twice here
            # (with x_account_id_random = '' and then '--- NOT SET ---'), and
            # its original indentation is not recoverable. This assumes both
            # pairs applied only to a failed lookup -- confirm that a
            # successful token lookup is really meant to grant the account.
            x_account_id = 0
            x_account_id_random = '--- NOT SET ---'
    else:
        log.warning('The x-account-id and x-no-account-id-token headers were not found.')
        raise HTTPException(status_code=403, detail='The x-account-id and x-no-account-id-token headers were not found.') # Forbidden

    if x_account_id:
        commons = Common_Route_Params(
            x_account_id=x_account_id,
            x_account_id_random=x_account_id_random,
            x_no_account_id_token=x_no_account_id_token,
            limit=limit,
            offset=offset,
            enabled=enabled,
            by_alias=by_alias,
            exclude_unset=exclude_unset,
            response=response
        )
    else:
        commons = Common_Route_Params_No_Account_ID(
            x_account_id=None,
            x_account_id_random=None,
            x_no_account_id_token=x_no_account_id_token,
            limit=limit,
            offset=offset,
            enabled=enabled,
            by_alias=by_alias,
            exclude_unset=exclude_unset,
            response=response
        )
    log.debug(commons)
    return commons
# ### END ### API Lib General ### async common_route_params() ###


# ### BEGIN ### API Lib General ### class Common_Route_Params_Min ###
# Updated 2022-01-05
# NOTE: Is this essentially the same as Common_Route_Params_No_Account_ID above?
class Common_Route_Params_Min: def __init__( self, x_account_id: int = None, x_account_id_random: str = None, enabled: str = 'enabled', limit: int = 10, offset: int = 0, by_alias: bool = True, exclude_unset: bool = False, response = None, ): self.x_account_id = x_account_id self.x_account_id_random = x_account_id_random self.enabled = enabled self.limit = limit self.offset = offset self.by_alias = by_alias self.exclude_unset = exclude_unset self.response = response # log.debug(response) # ### END ### API Lib General ### class Common_Route_Params_Min ### # ### BEGIN ### API Lib General ### common_route_params_min() ### # Updated 2022-02-15 # NOTE: Is this essentially the same as common_route_params_no_account_id above? @logger_reset # This breaks things for some reason when the function is async. Do not use async def common_route_params()! def common_route_params_min( x_account_id: str = Header(None, min_length=11, max_length=22), enabled: str = 'enabled', # all, enabled, disabled limit: int = 100, offset: int = 0, by_alias: bool = True, exclude_none: Optional[bool] = True, exclude_unset: bool = False, # NOTE: Uncommenting either exclude or include breaks the JSON body format. I do not know why? 
# ### BEGIN ### API Lib General ### common_route_params_min() ###
# Updated 2022-02-15
# NOTE: Is this essentially the same as common_route_params_no_account_id above?
@logger_reset # This breaks things for some reason when the function is async. Do not use async def common_route_params()!
def common_route_params_min(
        x_account_id: str = Header(None, min_length=11, max_length=22),
        enabled: str = 'enabled', # all, enabled, disabled
        limit: int = 100,
        offset: int = 0,
        by_alias: bool = True,
        exclude_none: Optional[bool] = True,
        exclude_unset: bool = False,
        # NOTE: Uncommenting either exclude or include breaks the JSON body format. I do not know why? Should be: {} Becomes this: {"obj_name": {"data_name": "data_value"}} -STI 2022-01-05
        # exclude: Optional[list] = [], # Leaving this and include commented out
        # include: Optional[list] = [], # Leaving this and exclude commented out
        response: Response = Response,
        log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    ) -> Common_Route_Params_Min:
    """Collect the minimal common route parameters; the account header is optional.

    Args:
        x_account_id: Optional ``x-account-id`` header (11-22 characters),
            resolved via ``redis_lookup_id_random`` when supplied.
        enabled, limit, offset, by_alias, exclude_unset, response: Copied
            unchanged onto the returned object.
        exclude_none: Accepted but not used by this function.
        log_lvl: Level applied to the shared ``log`` for this call.

    Returns:
        Common_Route_Params_Min. Without a header, ``x_account_id`` is passed
        through as received and ``x_account_id_random`` is None.

    Raises:
        HTTPException: 403 when the header is present but the lookup returns
            a falsy value.
    """
    log.setLevel(log_lvl)
    log.debug(locals())
    log.info('Setting commons values: x_account_id, x_account_id_random, limit, offset, enabled, by_alias, exclude_unset, response')

    log.debug(f'X Account ID: {x_account_id}')
    # Keep the raw header value; x_account_id is replaced by the lookup result.
    x_account_id_random = x_account_id if x_account_id else None
    if x_account_id_random:
        x_account_id = redis_lookup_id_random(table_name='account', record_id_random=x_account_id_random)
        if not x_account_id:
            # Log the submitted ID, not the (falsy) lookup result.
            log.warning(f'The x-account-id header was found, but the Account ID was not found or is not valid. Account ID: {x_account_id_random}')
            raise HTTPException(status_code=403, detail='The x-account-id Account ID was not found.') # Forbidden
        log.info(f'Found the x-account-id header with the value: {x_account_id}')

    commons = Common_Route_Params_Min(
        x_account_id=x_account_id,
        x_account_id_random=x_account_id_random,
        limit=limit,
        offset=offset,
        enabled=enabled,
        by_alias=by_alias,
        exclude_unset=exclude_unset,
        response=response
    )
    log.debug(commons)
    return commons
# ### END ### API Lib General ### async common_route_params_min() ###


def secure_hash_string(string: str):
    """Return an argon2 hash of ``string`` (rounds=14, memory_cost=1536, parallelism=2)."""
    hasher = argon2.using(rounds=14, memory_cost=1536, parallelism=2)
    return hasher.hash(string)


def verify_secure_hash_string(string: str, string_hash: str) -> bool:
    """Return True when ``string`` verifies against the argon2 hash ``string_hash``."""
    return bool(argon2.verify(string, string_hash))
# ### BEGIN ### API Lib General ### sign_jwt() ###
# Updated 2021-07-14
@logger_reset
def sign_jwt(
        secret_key: str, # Secret/Private/Password
        ttl: int = 60, # Default to 60 seconds
        max_renew: int = 0, # Default to 0
        public_key: str|None = None, # Will be part of the token. Use to look up secret when verifying.???
        account_id: str|None = None,
        person_id: str|None = None,
        user_id: str|None = None,
        json_str: str|None = None,
        b64_str: str|None = None,
    ) -> str:
    """Build and sign an HS256 JWT carrying the given identifiers.

    The payload holds ``iat`` (issue time), ``eat`` (issue time + ``ttl``
    seconds), ``max_renew`` and the remaining arguments under their own names.
    Expiry is stored in the custom ``eat`` claim, which decode_jwt() checks.

    Returns:
        The encoded token exactly as returned by ``jwt.encode``.
        NOTE(review): annotated ``str`` on the assumption that PyJWT 2.x is
        installed (1.x returned bytes) -- confirm.
    """
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # Read the clock once so 'eat' is exactly 'iat' + ttl.
    issued_at = time.time()
    payload = {
        'iat': issued_at, # Issued at
        'eat': issued_at + ttl, # Expires at
        'max_renew': max_renew, # Number of times allowed to request renew without API secret key
        'public_key': public_key, # Use to lookup the secret/private/password key when verifying
        'account_id': account_id,
        'person_id': person_id,
        'user_id': user_id,
        'json_str': json_str,
        'b64_str': b64_str,
    }

    token = jwt.encode(payload, secret_key, algorithm='HS256')
    log.debug(token)
    return token
# ### END ### API Lib General ### sign_jwt() ###


# ### BEGIN ### API Lib General ### decode_jwt() ###
# Updated 2021-07-14
@logger_reset
def decode_jwt(
        secret_key: str,
        token: str,
    ) -> dict|bool|None:
    """Verify an HS256 token and return its payload if it has not expired.

    Returns:
        The decoded payload dict when its ``eat`` claim is still in the
        future, False when ``eat`` has passed, and None when the token cannot
        be decoded or has no ``eat`` claim.
    """
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    try:
        decoded_token = jwt.decode(token, secret_key, algorithms=['HS256'])
        log.debug(decoded_token)
        still_valid = decoded_token['eat'] >= time.time()
    except Exception:
        # Was a bare `except:`; Exception keeps the "any failure -> None"
        # contract without also swallowing KeyboardInterrupt/SystemExit.
        log.debug('Token could not be decoded or is missing the eat claim.', exc_info=True)
        return None
    return decoded_token if still_valid else False
# ### END ### API Lib General ### decode_jwt() ###
# ### BEGIN ### API Lib General ### create_export() ###
# Updated 2021-11-23
@logger_reset
def create_export_file(
        data_dict_list: list,
        subdir_path: str,
        filename: str,
        column_name_li: list|None = None,
        rm_id: bool = True,
        export_type: str = 'CSV', # CSV, Excel
    ) -> bool|str:
    """Write ``data_dict_list`` to a CSV or Excel file under the hosted temp root.

    The file is written to
    ``settings.FILES_PATH['hosted_tmp_root']/<subdir_path>/<filename><ext>``;
    the sub-directory is created if needed.

    Args:
        data_dict_list: Rows to export, passed to ``pandas.DataFrame``.
        subdir_path: Sub-directory below the hosted temp root.
        filename: File name without extension.
        column_name_li: Columns to export. When empty or None, the keys of the
            first row are used. The caller's list is never modified.
        rm_id: When True, drop every column whose name ends with ``_id``.
        export_type: ``'CSV'`` (``.csv``) or ``'Excel'`` (``.xlsx``).

    Returns:
        ``subdir_path/filename<ext>`` (relative to the temp root) on success;
        False when there is nothing to derive columns from, the export type is
        unsupported, or writing the file fails.
    """
    log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    hosted_tmp_path = settings.FILES_PATH['hosted_tmp_root'] # hosted_tmp_path = 'admin/temp'
    log.info(f'Hosted Temp Path: {hosted_tmp_path}')

    subdirectory_dest = os.path.join(hosted_tmp_path, subdir_path)
    log.debug(subdirectory_dest)
    pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True)

    file_dest_w_subdir = os.path.join(subdirectory_dest, filename)
    log.info(f'File Dest With Subdir: {file_dest_w_subdir}')

    if column_name_li:
        log.info('Using column name list passed')
        # Work on a copy so the caller's list is not changed by the filter below.
        column_name_li = list(column_name_li)
    elif data_dict_list:
        log.info('Using an auto generated column name list')
        column_name_li = list(data_dict_list[0].keys())
    else:
        # Previously raised IndexError on data_dict_list[0].
        log.error('No data and no column name list passed; nothing to export.')
        return False
    log.debug(column_name_li)

    if rm_id:
        kept_column_names = []
        for column_name in column_name_li:
            # log.info(f'Checking column name: {column_name}')
            if column_name.endswith('_id'):
                log.info(f'Removing column name: {column_name}')
            else:
                kept_column_names.append(column_name)
        column_name_li = kept_column_names
    log.info(column_name_li)

    # column_name_li = ['order_line_id_random', 'order_id_random', 'product_id_random', 'product_type', 'product_name', 'quantity', 'amount', 'dollar_amount', 'message', 'person_id_random', 'person_given_name', 'person_family_name', 'person_display_name', 'person_full_name', 'person_contact_email', 'person_contact_cc_email', 'person_contact_address_name', 'person_contact_address_organization_name', 'person_contact_address_line_1', 'person_contact_address_line_2', 'person_contact_address_line_3', 'person_contact_address_city', 'person_contact_address_country_subdivision_code', 'person_contact_address_state_province', 'person_contact_address_postal_code', 'person_contact_address_country_alpha_2_code', 'person_contact_address_country_name', 'person_contact_address_country', 'order_status', 'order_created_on', 'order_updated_on']

    extension = {'CSV': '.csv', 'Excel': '.xlsx'}.get(export_type)
    if extension is None:
        # Previously fell through and failed on the undefined tmp_file_path.
        log.error(f'Unsupported export type: {export_type}')
        return False

    full_dest_path = file_dest_w_subdir + extension
    tmp_file_path = os.path.join(subdir_path, filename + extension)

    data_dataframe = pandas.DataFrame(data_dict_list)
    log.debug(data_dataframe)

    try:
        if export_type == 'CSV':
            log.info('Saving dataframe to CSV file')
            data_dataframe.to_csv(
                full_dest_path,
                # na_rep='NULL',
                columns=column_name_li,
                index=False,
                # errors='ignore',
            )
        else:
            log.info('Saving dataframe to Excel file')
            # This should ignore unknown columns
            data_dataframe.to_excel(
                full_dest_path,
                # na_rep='NULL',
                columns=column_name_li,
                index=False,
                # engine='openpyxl',
                # errors='ignore',
            )
    except Exception:
        # Was a bare `except:`; the best-effort "return False" is kept.
        log.exception('Something went wrong while trying to save the export file.')
        return False

    log.info(f'Temp File Path: {tmp_file_path}')
    return tmp_file_path # True
# ### END ### API Lib General ### create_export() ###


# ### BEGIN ### API Lib General ### return_full_tmp_path() ###
# This is for using with return FileResponse(path=full_tmp_path, filename=filename)
# Updated 2022-04-22
@logger_reset
def return_full_tmp_path(
        full_tmp_path: str|None = None,
        subdir_path: str|None = None,
        filename: str|None = None,
    ) -> bool|str:
    """Resolve a path below ``settings.FILES_PATH['hosted_tmp_root']``.

    Args:
        full_tmp_path: Path relative to the temp root; takes precedence.
        subdir_path: Sub-directory below the temp root (created if missing);
            only used together with ``filename``.
        filename: File name inside ``subdir_path``.

    Returns:
        The joined path, or False when neither ``full_tmp_path`` nor both
        ``subdir_path`` and ``filename`` are given.
    """
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    hosted_tmp_path = settings.FILES_PATH['hosted_tmp_root'] # hosted_tmp_path = 'admin/temp'
    log.info(f'Hosted Temp Path: {hosted_tmp_path}')

    if full_tmp_path:
        return os.path.join(hosted_tmp_path, full_tmp_path)

    if not (subdir_path and filename):
        return False

    subdirectory_dest = os.path.join(hosted_tmp_path, subdir_path)
    log.debug(subdirectory_dest)
    pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True)

    file_dest_w_subdir = os.path.join(subdirectory_dest, filename)
    log.info(f'File Dest With Subdir: {file_dest_w_subdir}')
    return file_dest_w_subdir
# ### END ### API Lib General ### return_full_tmp_path() ###
if from_email and from_name: #message['From'] = Address(display_name=from_name, username=from_email.split('@')[0], domain=from_email.split('@')[1]) try: message['From'] = Address(display_name=from_name, addr_spec=from_email) except Exception as e: log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') return False elif from_email: try: message['From'] = Address(display_name=from_email, addr_spec=from_email) except Exception as e: log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') return False else: return False if reply_to_email and reply_to_name: try: message['Reply-To'] = Address(display_name=reply_to_name, addr_spec=reply_to_email) except Exception as e: log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') return False elif reply_to_email: try: message['Reply-To'] = Address(display_name=reply_to_email, addr_spec=reply_to_email) except Exception as e: log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') return False if to_email and to_name: try: message['To'] = Address(display_name=to_name, addr_spec=to_email) except Exception as e: log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') return False elif to_email: try: message['To'] = Address(display_name=to_email, addr_spec=to_email) except Exception as e: log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') return False else: return False if cc_email and cc_name: try: message['Cc'] = Address(display_name=cc_name, addr_spec=cc_email) except Exception as e: log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') return False elif cc_email: try: message['Cc'] = Address(display_name=cc_email, addr_spec=cc_email) except Exception as e: log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') 
return False if bcc_email and bcc_name: try: message['Bcc'] = Address(display_name=bcc_name, addr_spec=bcc_email) except Exception as e: log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') return False elif bcc_email: try: message['Bcc'] = Address(display_name=bcc_email, addr_spec=bcc_email) except Exception as e: log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') return False html_version = """\
"""+body_html+""" """ # html_version = """\ # # #