from __future__ import annotations import datetime, jwt, pytz, redis, time from passlib.hash import argon2 #from datetime import datetime, time, timedelta from fastapi import APIRouter, Depends, Header, HTTPException, Response, status from pydantic import BaseModel, EmailStr, Field from typing import Dict, List, Optional, Set, Union from app.log import * from app.db_sql import sql_select # ### BEGIN ### API Lib General ### async get_token_header() ### async def get_token_header(x_token:str = Header(...)): if x_token != 'fake-super-secret-token': raise HTTPException(status_code=400, detail='X-Token header invalid') # ### END ### API Lib General ### async get_token_header() ### # ### BEGIN ### API Lib General ### async get_account_header() ### async def get_account_header(x_account_id:str = Header(...)): log.setLevel(logging.WARNING) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if len(x_account_id): log.info('The x-account-id header has a value.') if account_id := redis_lookup_id_random(table_name='account', record_id_random=x_account_id): log.setLevel(logging.DEBUG) log.info('Found the account_id with the account_id_random value: '+x_account_id) account = { 'id': account_id, 'id_random': x_account_id } else: log.setLevel(logging.DEBUG) log.info('The x-account-id was invalid and not empty...') #raise HTTPException(status_code=500) raise HTTPException(status_code=400) # or 404? #return False elif x_account_id == '': log.info('The x-account-id header was empty.') account = { 'id': None, 'id_random': None } #account = { 'id': 0, 'id_random': 'abcdef123456' } return account # ### END ### API Lib General ### async get_account_header() ### def secure_hash_string(string:str): string_hash = argon2.using(rounds=14, memory_cost=1536, parallelism=2).hash(string) return string_hash def verify_secure_hash_string(string:str, string_hash:str): if argon2.verify(string, string_hash): return True else: return False # Updated 2021-07-14 def sign_jwt( secret_key: str, # Secret/Private/Password public_key: str, # Will be part of the token. Use to look up secret when verifying. ttl: int = 60, # Default to 60 seconds max_renew: int = 0, # Default to 0 account_id: str = None, person_id: str = None, user_id: str = None, ) -> Dict[str, str]: log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) payload = { 'iat': time.time(), # Issued at 'eat': time.time() + ttl, # Expires at 'max_renew': max_renew, # Number of times allowed to request renew without API secret key 'public_key': public_key, # Use to lookup the secret/private/password key when verifying 'account_id': account_id, 'person_id': person_id, 'user_id': user_id, } secret = secret_key algorithm = 'HS256' token = jwt.encode(payload, secret, algorithm=algorithm) log.debug(token) return token # Updated 2021-07-14 def decode_jwt( secret_key: str, token: str, ) -> dict: log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) secret = secret_key algorithm = 'HS256' try: decoded_token = jwt.decode(token, secret, algorithms=[algorithm]) log.debug(decoded_token) if decoded_token['eat'] >= time.time(): return decoded_token else: return False except: return None