import redis
from datetime import datetime, time, timedelta
from fastapi import APIRouter, Depends, Header, HTTPException, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union

from .log import *
from .db import *


async def get_token_header(x_token: str = Header(...)):
    """Reject the request unless the ``x_token`` header carries the expected token.

    Raises:
        HTTPException: status 400 with detail ``'X-Token header invalid'``
            when the supplied value differs from the expected token.
    """
    # NOTE(review): the expected token is a hard-coded literal - presumably a
    # placeholder; confirm before relying on it for real authentication.
    token_matches = x_token == 'fake-super-secret-token'
    if not token_matches:
        raise HTTPException(status_code=400, detail='X-Token header invalid')


async def get_account_header(x_account_id: str = Header(...)):
    """Translate the ``x_account_id`` header into an account dictionary.

    An empty header yields ``{'id': None, 'id_random': None}``. A non-empty
    header is resolved through ``redis_lookup_id_random`` against the
    ``'account'`` table; on success the result holds the resolved id under
    ``'id'`` and the header value under ``'id_random'``.

    Raises:
        HTTPException: status 400 when a non-empty header cannot be resolved.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    print('get_account_header(): '+x_account_id)

    # Guard clause: an empty header means "no account", which is not an error.
    if x_account_id == '':
        log.info('The x-account-id header was empty.')
        return {'id': None, 'id_random': None}

    log.info('The x-account-id header has a value.')
    resolved_id = redis_lookup_id_random(
        table_name='account',
        record_id_random=x_account_id,
    )
    log.setLevel(logging.DEBUG)

    if not resolved_id:
        log.info('The x-account-id was invalid and not empty...')
        raise HTTPException(status_code=400)  # or 404?

    log.info('Found the account_id with the account_id_random value: '+x_account_id)
    return {'id': resolved_id, 'id_random': x_account_id}


#Add the processing time to the response header.
#@app.middleware('http')
#async def add_process_time_header(request: Request, call_next):
#    import time
#    start_time = time.time()
#    response = await call_next(request)
#    process_time = time.time() - start_time
#    response.headers['X-Process-Time'] = str(process_time)
#    return response

#async def get_token_header(x_token: str = Header(...)):
#    if x_token != 'fake-super-secret-token':
#        raise HTTPException(status_code=400, detail='X-Token header invalid')

#async def get_account_header(x_account_id: str = Header(...)):

#@app.middleware("http")
#async def get_account_header(x_account_id: str = Header(...)):
#    return x_account_id
#    x_account_id: str = Header(...)
#    x_account_id = 'static random ID...'
#    response = await call_next(request)
#    print(x_account_id)
#    return x_account_id

#async def get_account_header(x_account_id: str = Header(...)):
#    print('get_account_header(): '+x_account_id+'z9999z')
#    return x_account_id+'z9999z'


# Attempt to look up id_random key
# If success then return the id number
# If not success and there is a table_name then check the database table passed
# If found in database table then store in Redis
def redis_lookup_id_random(record_id_random=None, table_name=None):
    """Resolve a random record identifier to its record id, caching hits in Redis.

    The Redis key ``'record_id:<record_id_random>'`` is checked first. On a
    miss, and when ``table_name`` is given, ``sql_select`` is called with the
    table name and identifier; a found id is stored in Redis with a
    two-minute expiry before being returned.

    Args:
        record_id_random: The random identifier to resolve.
        table_name: Table passed to ``sql_select`` when the cache misses.

    Returns:
        The record id when found (from Redis or from ``sql_select``);
        ``None`` when ``record_id_random`` is ``None`` or ``sql_select``
        finds nothing; ``False`` when the cache misses and no ``table_name``
        was supplied.
    """
    # Without an identifier there is nothing to look up. Previously this
    # raised TypeError while building the cache key from None.
    if record_id_random is None:
        return None

    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    r = redis.Redis(host='localhost', port=6379, db=7, password=None, decode_responses=True)
    # NOTE(review): the key does not include table_name, so identical
    # id_random values from different tables would share one cache entry -
    # confirm identifiers are unique across tables.
    key_name = f'record_id:{record_id_random}'

    record_id = r.get(key_name)
    if record_id:
        # NOTE(review): cached values come back from Redis rather than from
        # sql_select, so their type may differ from the database path -
        # confirm callers accept both.
        print('TTL for: '+key_name+' : '+str(record_id)+' is '+str(r.ttl(key_name))+' seconds')
        return record_id

    if not table_name:
        print('Missing table_name to select from for id_random')
        return False

    select_results = sql_select(table_name=table_name, record_id_random=record_id_random)
    if not select_results:
        return None

    record_id = select_results['id']
    # Keep the mapping only briefly so stale ids age out of the cache.
    r.setex(key_name, timedelta(minutes=2), value=record_id)
    return record_id