Files
OSIT-AE-API-FastAPI/app/lib_general.py
2021-07-14 17:12:20 -04:00

110 lines
3.6 KiB
Python

from __future__ import annotations
import datetime, jwt, pytz, redis, time
from passlib.hash import argon2
#from datetime import datetime, time, timedelta
from fastapi import APIRouter, Depends, Header, HTTPException, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from app.log import *
from app.db_sql import sql_select
# ### BEGIN ### API Lib General ### async get_token_header() ###
async def get_token_header(x_token:str = Header(...)):
if x_token != 'fake-super-secret-token':
raise HTTPException(status_code=400, detail='X-Token header invalid')
# ### END ### API Lib General ### async get_token_header() ###
# ### BEGIN ### API Lib General ### async get_account_header() ###
async def get_account_header(x_account_id:str = Header(...)):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if len(x_account_id):
log.info('The x-account-id header has a value.')
if account_id := redis_lookup_id_random(table_name='account', record_id_random=x_account_id):
log.setLevel(logging.DEBUG)
log.info('Found the account_id with the account_id_random value: '+x_account_id)
account = { 'id': account_id, 'id_random': x_account_id }
else:
log.setLevel(logging.DEBUG)
log.info('The x-account-id was invalid and not empty...')
#raise HTTPException(status_code=500)
raise HTTPException(status_code=400) # or 404?
#return False
elif x_account_id == '':
log.info('The x-account-id header was empty.')
account = { 'id': None, 'id_random': None }
#account = { 'id': 0, 'id_random': 'abcdef123456' }
return account
# ### END ### API Lib General ### async get_account_header() ###
def secure_hash_string(string:str):
string_hash = argon2.using(rounds=14, memory_cost=1536, parallelism=2).hash(string)
return string_hash
def verify_secure_hash_string(string:str, string_hash:str):
if argon2.verify(string, string_hash):
return True
else:
return False
# Updated 2021-07-14
def sign_jwt(
secret_key: str, # Secret/Private/Password
public_key: str, # Will be part of the token. Use to look up secret when verifying.
ttl: int = 60, # Default to 60 seconds
max_renew: int = 0, # Default to 0
account_id: str = None,
person_id: str = None,
user_id: str = None,
) -> Dict[str, str]:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
payload = {
'iat': time.time(), # Issued at
'eat': time.time() + ttl, # Expires at
'max_renew': max_renew, # Number of times allowed to request renew without API secret key
'public_key': public_key, # Use to lookup the secret/private/password key when verifying
'account_id': account_id,
'person_id': person_id,
'user_id': user_id,
}
secret = secret_key
algorithm = 'HS256'
token = jwt.encode(payload, secret, algorithm=algorithm)
log.debug(token)
return token
# Updated 2021-07-14
def decode_jwt(
secret_key: str,
token: str,
) -> dict:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
secret = secret_key
algorithm = 'HS256'
try:
decoded_token = jwt.decode(token, secret, algorithms=[algorithm])
log.debug(decoded_token)
if decoded_token['eat'] >= time.time(): return decoded_token
else: return False
except:
return None