114 lines
3.9 KiB
Python
114 lines
3.9 KiB
Python
from __future__ import annotations
|
|
import datetime, jwt, pytz, redis, time
|
|
from passlib.hash import argon2
|
|
|
|
from fastapi import APIRouter, Depends, Header, HTTPException, Response, status
|
|
from pydantic import BaseModel, EmailStr, Field
|
|
from typing import Dict, List, Optional, Set, Union
|
|
|
|
from app.log import log, logging, logger_reset
|
|
from app.db_sql import redis_lookup_id_random, sql_select
|
|
|
|
|
|
# ### BEGIN ### API Lib General ### async get_token_header() ###
|
|
async def get_token_header(x_token:str = Header(...)):
    """Validate the value received in the ``X-Token`` request header.

    The header is declared with ``Header(...)``, i.e. without a default value.

    Raises:
        HTTPException: status 400 with detail ``'X-Token header invalid'``
            when the header value does not match the expected token.
    """
    # NOTE(review): the expected token is a hard-coded placeholder value.
    expected_token = 'fake-super-secret-token'
    if x_token == expected_token:
        return
    raise HTTPException(status_code=400, detail='X-Token header invalid')
|
|
# ### END ### API Lib General ### async get_token_header() ###
|
|
|
|
|
|
# ### BEGIN ### API Lib General ### async get_account_header() ###
|
|
# Updated 2021-08-23
|
|
async def get_account_header(x_account_id:str = Header(...)) -> dict:
    """Turn the ``x-account-id`` header value into an account dict.

    Returns:
        ``{'id': <lookup result>, 'id_random': x_account_id}`` when the lookup
        through ``redis_lookup_id_random`` yields a truthy value, or
        ``{'id': None, 'id_random': None}`` when the header is an empty string.

    Raises:
        HTTPException: status 400 when the header has a value but the lookup
            returns nothing truthy.
    """
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # Empty header: nothing to look up, report an "anonymous" account.
    if x_account_id == '':
        log.info('The x-account-id header was empty.')
        return { 'id': None, 'id_random': None }

    log.info(f'The x-account-id header has a value. x-account-id: {x_account_id}')

    # presumably maps the public random id to the internal account id; verify against app.db_sql
    account_id = redis_lookup_id_random(table_name='account', record_id_random=x_account_id)
    if not account_id:
        log.warning('The x-account-id Account ID was not found or it was invalid...')
        raise HTTPException(status_code=400) # or 404?

    # NOTE(review): this switches the logger to DEBUG just before the success
    # message and nothing in this function switches it back - confirm intended.
    log.setLevel(logging.DEBUG)
    log.info('Found the account_id with the account_id_random value: '+x_account_id)
    return { 'id': account_id, 'id_random': x_account_id }
|
|
# ### END ### API Lib General ### async get_account_header() ###
|
|
|
|
|
|
def secure_hash_string(string:str):
    """Hash *string* with passlib's ``argon2`` handler and return the result.

    The handler is configured with ``rounds=14``, ``memory_cost=1536`` and
    ``parallelism=2`` before hashing.
    """
    hasher = argon2.using(rounds=14, memory_cost=1536, parallelism=2)
    return hasher.hash(string)
|
|
|
|
|
|
def verify_secure_hash_string(string:str, string_hash:str):
    """Check *string* against *string_hash* using ``argon2.verify``.

    Returns:
        ``True`` when ``argon2.verify`` reports a match, otherwise ``False``.
    """
    return True if argon2.verify(string, string_hash) else False
|
|
|
|
|
|
# ### BEGIN ### API Lib General ### sign_jwt() ###
|
|
# Updated 2021-07-14
|
|
@logger_reset
def sign_jwt(
    secret_key: str,  # Secret/Private/Password
    public_key: str,  # Will be part of the token. Use to look up secret when verifying.
    ttl: int = 60,  # Default to 60 seconds
    max_renew: int = 0,  # Default to 0
    account_id: Optional[str] = None,
    person_id: Optional[str] = None,
    user_id: Optional[str] = None,
) -> str:
    """Build and sign an HS256 JSON Web Token.

    Args:
        secret_key: Key used to sign the token.
        public_key: Stored in the payload so the matching secret can be
            looked up when the token is verified.
        ttl: Lifetime in seconds, added to the issue time to form ``eat``.
        max_renew: Number of times allowed to request renew without the
            API secret key; stored in the payload as-is.
        account_id: Optional value stored in the payload.
        person_id: Optional value stored in the payload.
        user_id: Optional value stored in the payload.

    Returns:
        The encoded token as returned by ``jwt.encode``.
        NOTE(review): PyJWT >= 2.0 returns ``str``; 1.x returned ``bytes``.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    # NOTE(review): locals() contains secret_key, and the token is logged below;
    # both reach the log if the level is ever lowered to DEBUG.
    log.debug(locals())

    # Sample the clock once so that eat - iat is exactly ttl.
    issued_at = time.time()
    payload = {
        'iat': issued_at,  # Issued at
        'eat': issued_at + ttl,  # Expires at (custom claim, checked by decode_jwt)
        'max_renew': max_renew,  # Number of times allowed to request renew without API secret key
        'public_key': public_key,  # Use to lookup the secret/private/password key when verifying
        'account_id': account_id,
        'person_id': person_id,
        'user_id': user_id,
    }
    token = jwt.encode(payload, secret_key, algorithm='HS256')

    log.debug(token)

    return token
|
|
# ### END ### API Lib General ### sign_jwt() ###
|
|
|
|
|
|
# ### BEGIN ### API Lib General ### decode_jwt() ###
|
|
# Updated 2021-07-14
|
|
@logger_reset
def decode_jwt(
    secret_key: str,
    token: str,
) -> Union[dict, bool, None]:
    """Verify an HS256 token and return its payload if it has not expired.

    Expiry is taken from the custom ``eat`` claim in the payload and compared
    against ``time.time()``.

    Args:
        secret_key: Key the token is expected to be signed with.
        token: The encoded JWT.

    Returns:
        The decoded payload dict when ``eat`` is now or in the future,
        ``False`` when ``eat`` is in the past, and ``None`` when decoding or
        the expiry check fails for any reason (bad signature, malformed
        token, missing or non-numeric ``eat``).
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    try:
        decoded_token = jwt.decode(token, secret_key, algorithms=['HS256'])
        log.debug(decoded_token)
        still_valid = decoded_token['eat'] >= time.time()
    except Exception as exc:
        # Keep the best-effort contract (any failure -> None) but no longer
        # swallow KeyboardInterrupt / SystemExit as the bare except did.
        log.debug(f'JWT decode failed: {exc!r}')
        return None

    return decoded_token if still_valid else False
|
|
# ### END ### API Lib General ### decode_jwt() ###
|