Files
OSIT-AE-API-FastAPI/app/lib_general.py

159 lines
5.5 KiB
Python

from __future__ import annotations
import datetime, jwt, os, pandas, pathlib, pytz, redis, time
from passlib.hash import argon2
from fastapi import APIRouter, Depends, Header, HTTPException, Response, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from app.log import log, logging, logger_reset
from app.db_sql import redis_lookup_id_random, sql_select
# ### BEGIN ### API Lib General ### async get_token_header() ###
async def get_token_header(x_token:str = Header(...)):
    """Reject the request unless the X-Token header carries the expected token.

    Args:
        x_token: Value of the required ``X-Token`` request header.

    Raises:
        HTTPException: 400 when the header value does not match.
    """
    # NOTE(review): the expected token is a hard-coded placeholder literal;
    # presumably this is meant to come from configuration - confirm.
    token_matches = x_token == 'fake-super-secret-token'
    if not token_matches:
        raise HTTPException(status_code=400, detail='X-Token header invalid')
# ### END ### API Lib General ### async get_token_header() ###
# ### BEGIN ### API Lib General ### async get_account_header() ###
# Updated 2021-08-23
async def get_account_header(x_account_id:str = Header(...)) -> dict:
    """Translate the x-account-id header into an account dict.

    Args:
        x_account_id: Value of the required ``x-account-id`` request header;
            an empty string is accepted.

    Returns:
        ``{'id': ..., 'id_random': ...}``. Both values are ``None`` when the
        header is empty; otherwise ``id`` is the value returned by
        ``redis_lookup_id_random`` and ``id_random`` is the header value.

    Raises:
        HTTPException: 400 when the header is non-empty but the lookup
            returns a falsy value.
    """
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    if len(x_account_id):
        log.info(f'The x-account-id header has a value. x-account-id: {x_account_id}')
        resolved_id = redis_lookup_id_random(table_name='account', record_id_random=x_account_id)
        if not resolved_id:
            # Any falsy lookup result (including 0) is treated as "not found".
            log.warning('The x-account-id Account ID was not found or it was invalid...')
            raise HTTPException(status_code=400) # or 404?
        # NOTE(review): this leaves the logger at DEBUG after a successful
        # lookup, and this function is not wrapped in @logger_reset like the
        # others in this file - confirm that is intended.
        log.setLevel(logging.DEBUG)
        log.info('Found the account_id with the account_id_random value: '+x_account_id)
        account = { 'id': resolved_id, 'id_random': x_account_id }
    elif x_account_id == '':
        log.info('The x-account-id header was empty.')
        account = { 'id': None, 'id_random': None }
    return account
# ### END ### API Lib General ### async get_account_header() ###
def secure_hash_string(string:str):
    """Return an Argon2 hash of ``string``.

    The hasher is passlib's ``argon2`` configured with rounds=14,
    memory_cost=1536 and parallelism=2.
    """
    hasher = argon2.using(rounds=14, memory_cost=1536, parallelism=2)
    return hasher.hash(string)
def verify_secure_hash_string(string:str, string_hash:str):
    """Check ``string`` against a previously stored Argon2 hash.

    Returns:
        ``True`` when ``argon2.verify`` reports a match, ``False`` otherwise.
    """
    return bool(argon2.verify(string, string_hash))
# ### BEGIN ### API Lib General ### sign_jwt() ###
# Updated 2021-07-14
@logger_reset
def sign_jwt(
    secret_key: str, # Secret/Private/Password
    public_key: str, # Will be part of the token. Use to look up secret when verifying.
    ttl: int = 60, # Default to 60 seconds
    max_renew: int = 0, # Default to 0
    account_id: Optional[str] = None,
    person_id: Optional[str] = None,
    user_id: Optional[str] = None,
) -> str:
    """Build and sign an HS256 JWT carrying the given identifiers.

    Args:
        secret_key: Key used to sign the token.
        public_key: Stored in the payload so the matching secret can be
            looked up when the token is verified.
        ttl: Lifetime in seconds; the payload's ``eat`` is ``iat + ttl``.
        max_renew: Stored in the payload; number of times a renew may be
            requested without the API secret key.
        account_id: Optional account identifier stored in the payload.
        person_id: Optional person identifier stored in the payload.
        user_id: Optional user identifier stored in the payload.

    Returns:
        The encoded token exactly as returned by ``jwt.encode``
        (a ``str`` with PyJWT 2.x; PyJWT 1.x returned ``bytes`` - TODO
        confirm the installed version).
    """
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    # NOTE(review): locals() contains secret_key, and the token is logged
    # below; both only surface if the level above is lowered to DEBUG.
    log.debug(locals())
    # Read the clock once so that eat - iat is exactly ttl.
    issued_at = time.time()
    payload = {
        'iat': issued_at, # Issued at
        'eat': issued_at + ttl, # Expires at (custom claim, checked by decode_jwt)
        'max_renew': max_renew, # Number of times allowed to request renew without API secret key
        'public_key': public_key, # Use to lookup the secret/private/password key when verifying
        'account_id': account_id,
        'person_id': person_id,
        'user_id': user_id,
    }
    token = jwt.encode(payload, secret_key, algorithm='HS256')
    log.debug(token)
    return token
# ### END ### API Lib General ### sign_jwt() ###
# ### BEGIN ### API Lib General ### decode_jwt() ###
# Updated 2021-07-14
@logger_reset
def decode_jwt(
    secret_key: str,
    token: str,
) -> Union[dict, bool, None]:
    """Decode an HS256 JWT and check its custom ``eat`` expiry claim.

    Args:
        secret_key: Key the token is expected to have been signed with.
        token: The encoded JWT.

    Returns:
        The decoded payload dict when the token decodes and ``eat`` is not
        in the past; ``False`` when it decodes but ``eat`` has passed;
        ``None`` when decoding fails or ``eat`` is missing or not comparable
        to a timestamp.
    """
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    try:
        decoded_token = jwt.decode(token, secret_key, algorithms=['HS256'])
        log.debug(decoded_token)
        not_expired = decoded_token['eat'] >= time.time()
    except Exception:
        # Deliberate best-effort: any decode/claim failure means "no valid
        # token". Unlike a bare except, this lets KeyboardInterrupt and
        # SystemExit propagate.
        log.debug('The token could not be decoded or its "eat" claim was unusable.')
        return None
    return decoded_token if not_expired else False
# ### END ### API Lib General ### decode_jwt() ###
# ### BEGIN ### API Lib General ### create_export_file() ###
# Updated 2021-07-14
@logger_reset
def create_export_file(
    data_dict_list: list,
    subdir_path: str,
    filename: str,
    export_type: str = 'CSV', # CSV, Excel
) -> bool:
    """Write a list of row dicts to a CSV or Excel file under ``admin/temp``.

    The destination is ``admin/temp/<subdir_path>/<filename>`` plus ``.csv``
    or ``.xlsx``; the directory is created if it does not exist. Only the
    columns named by the keys of the first row are written, in that order.

    Args:
        data_dict_list: Rows to export, one dict per row.
        subdir_path: Subdirectory (below ``admin/temp``) to write into.
        filename: File name without extension.
        export_type: ``'CSV'`` or ``'Excel'``.

    Returns:
        ``True`` when the file was written; ``False`` when there were no
        rows, ``export_type`` is not supported, or saving failed.
    """
    log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    hosted_temp_path = 'admin/temp'
    log.info(f'Hosted Temp Path: {hosted_temp_path}')
    subdirectory_dest = os.path.join(hosted_temp_path, subdir_path)
    log.debug(subdirectory_dest)
    pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True)
    file_dest_w_subdir = os.path.join(subdirectory_dest, filename)
    log.info(f'File Dest With Subdir: {file_dest_w_subdir}')
    if not data_dict_list:
        # Without a first row there are no column names to export.
        log.warning('No rows were provided; the export file was not created.')
        return False
    # Materialise the keys view so pandas receives a plain list of columns.
    column_name_li = list(data_dict_list[0].keys())
    log.debug(column_name_li)
    data_dataframe = pandas.DataFrame(data_dict_list)
    log.debug(data_dataframe)
    try:
        if export_type == 'CSV':
            log.info('Saving dataframe to CSV file')
            full_dest_path = file_dest_w_subdir+'.csv'
            data_dataframe.to_csv(full_dest_path, columns=column_name_li, index=False)
        elif export_type == 'Excel':
            log.info('Saving dataframe to Excel file')
            full_dest_path = file_dest_w_subdir+'.xlsx'
            data_dataframe.to_excel(full_dest_path, columns=column_name_li, index=False) # sheet_name='Sheet_name_1'
        else:
            # Nothing is written for an unknown type, so do not report success.
            log.error(f'Unsupported export type: {export_type}')
            return False
    except Exception:
        log.exception('Something went wrong while trying to save the export file.')
        return False
    return True
# ### END ### API Lib General ### create_export_file() ###