Files
OSIT-AE-API-FastAPI/app/lib_general.py
2021-03-18 22:34:35 +00:00

254 lines
12 KiB
Python

from __future__ import annotations
import datetime, pytz, redis
from passlib.hash import argon2
#from datetime import datetime, time, timedelta
from fastapi import APIRouter, Depends, Header, HTTPException, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from .log import *
from .db_sql import sql_select
# ### BEGIN ### API Lib General ### async get_token_header() ###
async def get_token_header(x_token:str = Header(...)):
if x_token != 'fake-super-secret-token':
raise HTTPException(status_code=400, detail='X-Token header invalid')
# ### END ### API Lib General ### async get_token_header() ###
# ### BEGIN ### API Lib General ### async get_account_header() ###
async def get_account_header(x_account_id:str = Header(...)):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if len(x_account_id):
log.info('The x-account-id header has a value.')
if account_id := redis_lookup_id_random(table_name='account', record_id_random=x_account_id):
log.setLevel(logging.DEBUG)
log.info('Found the account_id with the account_id_random value: '+x_account_id)
account = { 'id': account_id, 'id_random': x_account_id }
else:
log.setLevel(logging.DEBUG)
log.info('The x-account-id was invalid and not empty...')
#raise HTTPException(status_code=500)
raise HTTPException(status_code=400) # or 404?
#return False
elif x_account_id == '':
log.info('The x-account-id header was empty.')
account = { 'id': None, 'id_random': None }
return account
# ### END ### API Lib General ### async get_account_header() ###
# ### BEGIN ### API Lib General ### redis_lookup_id_random() ###
# Just return the value if it is an integer
# Check if the id_random value is a string and the correct length
# Attempt to look up id_random key in Redis
# If success then return the ID number
# If not success and there is a table_name then check the database table passed
# If found in database table then store in Redis and return the ID number
def redis_lookup_id_random(record_id_random:int|str, table_name:str):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if isinstance(record_id_random, str): pass
elif isinstance(record_id_random, int): return record_id_random
else:
log.warning(f'Unexpected data type: {str(type(record_id_random))} Expected type is a string 11 or 22 characters long.')
return False
if record_id_random and table_name:
# WARNING: The record_id_random string length should be checked just in case?
if len(record_id_random) < 11:
log.warning(f'The length of id_random is too short: {str(record_id_random)} ({len(record_id_random)} chars)')
return False
elif len(record_id_random) > 22:
log.warning(f'The length of id_random is too long {str(record_id_random)} ({len(record_id_random)} chars)')
return False
else:
pass
else:
log.warning('Missing table_name to select from for id_random')
return False
r = redis.Redis(host='localhost', port=6379, db=7, password=None, decode_responses=True)
key_name = 'record_id:'+record_id_random
record_id = r.get(key_name)
log.debug(f'Record ID? {str(record_id)}')
if record_id:
log.info('The record ID was found using the record_id_random value.')
log.info(f'TTL for: {key_name} : {str(record_id)} is {str(r.ttl(key_name))} seconds')
return int(record_id)
elif table_name:
data = { 'id_random': record_id_random }
sql = f"""
SELECT id
FROM `{table_name}` AS `table`
WHERE `table`.id_random = :id_random;
"""
if select_results := sql_select(sql=sql, data=data):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(select_results)
log.debug(type(select_results))
if isinstance(select_results, dict):
log.info(f"""Record ID random found: {str(select_results['id'])}""")
if record_id := select_results.get('id'):
r.setex(key_name, datetime.timedelta(minutes=90), value=record_id)
return int(record_id)
else:
log.setLevel(logging.ERROR) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.error('The SQL result was not what was expected.')
return False
else:
log.setLevel(logging.ERROR) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.error('More than one record may have been found. There may be a duplicate id_random.')
log.error(select_results)
return False
else:
#log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.info('Record ID random was not found')
return None
log.setLevel(logging.ERROR) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.error('We should not be here. Something unexpected happened.')
return False # Just in case
# ### BEGIN ### API Lib General ### redis_lookup_id_random() ###
# ### BEGIN ### API Lib General ### lookup_id_random_pop() ###
# Look up and resolve id_random values to their id
# Remove the unneeded *_id_random key from the dict
def lookup_id_random_pop(obj_data:dict):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if 'account_id_random' in obj_data:
obj_data['account_id'] = redis_lookup_id_random(record_id_random=obj_data['account_id_random'], table_name='account')
obj_data.pop('account_id_random')
if 'address_id_random' in obj_data:
obj_data['address_id'] = redis_lookup_id_random(record_id_random=obj_data['address_id_random'], table_name='address')
obj_data.pop('address_id_random')
if 'address_location_id_random' in obj_data:
obj_data['address_location_id'] = redis_lookup_id_random(record_id_random=obj_data['address_location_id_random'], table_name='address')
obj_data.pop('address_location_id_random')
if 'archive_id_random' in obj_data:
obj_data['archive_id'] = redis_lookup_id_random(record_id_random=obj_data['archive_id_random'], table_name='archive')
obj_data.pop('archive_id_random')
if 'contact_id_random' in obj_data:
obj_data['contact_id'] = redis_lookup_id_random(record_id_random=obj_data['contact_id_random'], table_name='contact')
obj_data.pop('contact_id_random')
if 'event_id_random' in obj_data:
obj_data['event_id'] = redis_lookup_id_random(record_id_random=obj_data.get('event_id_random', None), table_name='event')
obj_data.pop('event_id_random')
if 'event_exhibit_id_random' in obj_data:
obj_data['event_exhibit_id'] = redis_lookup_id_random(record_id_random=obj_data.get('event_exhibit_id_random', None), table_name='event_exhibit')
obj_data.pop('event_exhibit_id_random')
if 'hosted_file_id_random' in obj_data:
obj_data['hosted_file_id'] = redis_lookup_id_random(record_id_random=obj_data.get('hosted_file_id_random', None), table_name='hosted_file')
obj_data.pop('hosted_file_id_random')
if 'order_id_random' in obj_data:
obj_data['order_id'] = redis_lookup_id_random(record_id_random=obj_data.get('order_id_random', None), table_name='order')
obj_data.pop('order_id_random')
if 'order_line_id_random' in obj_data:
obj_data['order_line_id'] = redis_lookup_id_random(record_id_random=obj_data.get('order_line_id_random', None), table_name='order_line')
obj_data.pop('order_line_id_random')
if 'order_cart_id_random' in obj_data:
obj_data['order_cart_id'] = redis_lookup_id_random(record_id_random=obj_data.get('order_cart_id_random', None), table_name='order_cart')
obj_data.pop('order_cart_id_random')
if 'order_cart_line_id_random' in obj_data:
obj_data['order_cart_line_id'] = redis_lookup_id_random(record_id_random=obj_data.get('order_cart_line_id_random', None), table_name='order_cart_line')
obj_data.pop('order_cart_line_id_random')
if 'organization_id_random' in obj_data:
obj_data['organization_id'] = redis_lookup_id_random(record_id_random=obj_data.get('organization_id_random', None), table_name='organization')
obj_data.pop('organization_id_random')
if 'person_id_random' in obj_data:
obj_data['person_id'] = redis_lookup_id_random(record_id_random=obj_data['person_id_random'], table_name='person')
obj_data.pop('person_id_random')
if 'post_id_random' in obj_data:
obj_data['post_id'] = redis_lookup_id_random(record_id_random=obj_data.get('post_id_random', None), table_name='post')
obj_data.pop('post_id_random')
if 'product_id_random' in obj_data:
obj_data['product_id'] = redis_lookup_id_random(record_id_random=obj_data['product_id_random'], table_name='product')
obj_data.pop('product_id_random')
if 'user_id_random' in obj_data:
obj_data['user_id'] = redis_lookup_id_random(record_id_random=obj_data['user_id_random'], table_name='user')
obj_data.pop('user_id_random')
if 'for_type' in obj_data and 'for_id_random' in obj_data:
obj_data['for_id'] = redis_lookup_id_random(record_id_random=obj_data.get('for_id_random', None), table_name=obj_data.get('for_type', None))
obj_data.pop('for_id_random')
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(obj_data)
elif 'for_id_random' in obj_data:
# In case for_id_random was passed without for_type
log.warn('for_id_random was passed without for_type')
obj_data.pop('for_id_random')
if 'object_type' in obj_data and 'object_id_random' in obj_data:
obj_data['object_id'] = redis_lookup_id_random(record_id_random=obj_data.get('object_id_random', None), table_name=obj_data.get('object_type', None))
obj_data.pop('object_id_random')
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(obj_data)
elif 'object_id_random' in obj_data:
# In case object_id_random was passed without object_type
log.warn('object_id_random was passed without object_type')
obj_data.pop('object_id_random')
if 'to_object_type' in obj_data and 'to_object_id_random' in obj_data:
obj_data['to_object_id'] = redis_lookup_id_random(record_id_random=obj_data.get('to_object_id_random', None), table_name=obj_data.get('to_object_type', None))
obj_data.pop('to_object_id_random')
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(obj_data)
elif 'to_object_id_random' in obj_data:
# In case to_object_id_random was passed without to_object_type
log.warn('to_object_id_random was passed without to_object_type')
obj_data.pop('to_object_id_random')
if 'from_object_type' in obj_data and 'from_object_id_random' in obj_data:
obj_data['from_object_id'] = redis_lookup_id_random(record_id_random=obj_data.get('from_object_id_random', None), table_name=obj_data.get('from_object_type', None))
obj_data.pop('from_object_id_random')
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(obj_data)
elif 'from_object_id_random' in obj_data:
# In case from_object_id_random was passed without from_object_type
log.warn('from_object_id_random was passed without from_object_type')
obj_data.pop('from_object_id_random')
return obj_data
# ### END ### API Lib General ### lookup_id_random_pop() ###
def secure_hash_string(string:str):
string_hash = argon2.using(rounds=14, memory_cost=1536, parallelism=2).hash(string)
return string_hash
def verify_secure_hash_string(string:str, string_hash:str):
if argon2.verify(string, string_hash):
return True
else:
return False