import datetime
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Path, Query, Response, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union

from app.lib_general import log, logging, sign_jwt, decode_jwt
from app.config import settings
from app.db_sql import sql_insert, sql_update, sql_insert_or_update, sql_select, sql_delete, redis_lookup_id_random
from app.routers.api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template
from app.models.api_models import Api_Base
from app.models.response_models import Resp_Body_Base, mk_resp

router = APIRouter()


# ### BEGIN ### API API ### request_jwt() ###

# This can be used to generate JWTs for various purposes:
# * for end client browser API access
# * for proof of sign in
# * newer/better version of sign in by URL

# Generate (sign) a JWT using the Aether platform super secret key, or the
# x_aether_signing_key secret if one is passed. The Aether platform super
# secret JWT signing key must be used for API access tokens.
# If x_aether_api_key is passed then set a higher TTL.
# Design intent (not implemented by the current handler): if an old and valid
# JWT is passed then decode it and decrease the renew count by 1.
# Updated 2023-03-24

# Verify JWT using the API public key's associated API private key
# API server or trusted app can generate JWTs
# JWT contains:
# * client_token (to request a new short term client token)
# * iat
# * eat
# * account_id
# * client_id
# * order_cart_id
# * person_id
# * user_id
# API server verifies JWTs
# Updated 2021-07-14

@router.get('/request_jwt', response_model=Resp_Body_Base)
async def request_jwt(
    x_aether_signing_key: Optional[str] = Header(None, min_length=22, max_length=22),
    x_aether_api_key: Optional[str] = Header(None, min_length=22, max_length=22),
    x_aether_jwt: Optional[str] = Header(None),
    account_id: Optional[str] = None,  # Handle this different because it is special
    json_str: Optional[str] = None,  # This is what should be stored
    b64_str: Optional[str] = None,  # This is what should be stored
    # I would like payload to be a dict, but then we have to use POST instead of GET...
    # Maybe base64 encode and decode?
    max_ttl: int = 300,  # Seconds: 3600 = 1 hr; 300 = 5 min
    max_renew: int = 5,
    response: Response = Response,
):
    """Sign a JWT whose payload is ``account_id``, ``json_str`` and ``b64_str``.

    Args:
        x_aether_signing_key: The (secret) signing key. Keep safe! When passed
            it is used to sign the JWT; otherwise ``settings.JWT_KEY`` is used.
        x_aether_api_key: The client side API key. Passed to ``sign_jwt`` as
            ``public_key``. When it is given without a signing key, ``max_ttl``
            is overridden to 3600 and ``max_renew`` to 5.
        x_aether_jwt: A previously issued JWT. Accepted, but not read by the
            current implementation.
        account_id: Stored in the JWT payload.
        json_str: Stored in the JWT payload.
        b64_str: Stored in the JWT payload.
        max_ttl: Number of seconds to live, forwarded to ``sign_jwt`` as ``ttl``.
        max_renew: Renewal count forwarded to ``sign_jwt``.
        response: FastAPI response object handed to ``mk_resp`` on errors.

    Returns:
        ``mk_resp(data={'jwt': token})`` on success, or a 400 ``mk_resp`` when
        none of ``account_id`` / ``json_str`` / ``b64_str`` is supplied or when
        no signing key is available.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    # NOTE(review): locals() includes the signing key and API key; this would
    # write them to the log if the level is ever lowered to DEBUG.
    log.debug(locals())

    # One of these is required
    if not (account_id or json_str or b64_str):
        return mk_resp(data=False, status_code=400, response=response)  # Bad Request

    # Possible overrides and checks go here
    # NOTE(review): the original indentation was lost; both overrides are
    # assumed to belong to the "API key without signing key" branch - confirm.
    if not x_aether_signing_key and x_aether_api_key:
        max_ttl = 3600
        max_renew = 5

    # Prefer the caller-supplied signing key, then the platform key.
    signing_key = x_aether_signing_key or settings.JWT_KEY
    if not signing_key:
        log.error('No key found to sign the JWT with!')
        return mk_resp(data=False, status_code=400, response=response)  # Bad Request

    payload = {
        'account_id': account_id,
        'json_str': json_str,
        'b64_str': b64_str,
    }
    token = sign_jwt(secret_key=signing_key, public_key=x_aether_api_key, ttl=max_ttl, max_renew=max_renew, **payload)

    # A legacy flow used to follow this return: it looked up the api_key
    # record, checked its enable window and renewed a previously issued JWT.
    # It was unreachable and referenced names that are no longer parameters
    # (x_aether_secret_key, x_aether_api_public_key, x_aether_api_token,
    # person_id, user_id), so it has been removed.
    return mk_resp(data={'jwt': token})

# ### END ### API API ### request_jwt() ###


@router.get('/temp_token', response_model=Resp_Body_Base)
async def get_api_temp_token(
    x_aether_api_key: Optional[str] = Header(None),
    x_aether_api_token: Optional[str] = Header(None),
    x_aether_api_token_expire_on: Optional[str] = Header(None),
    response: Response = Response,
):
    """Look up the ``api_key`` record whose ``secret_key`` equals ``x_aether_api_key``.

    ``x_aether_api_token`` and ``x_aether_api_token_expire_on`` are accepted
    but not read by this handler.

    Returns:
        The record serialised through ``Api_Base`` (by alias, unset fields
        included) wrapped by ``mk_resp``; a 400 ``mk_resp`` when the header is
        missing; a 404 ``mk_resp`` when ``sql_select`` returns nothing.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    if not x_aether_api_key:
        return mk_resp(data=False, status_code=400, response=response)  # Bad Request

    # NOTE(review): this logs the key value itself when DEBUG is enabled.
    log.debug(f'Contains a value in x_aether_api_key: {x_aether_api_key}')
    sql_result = sql_select(table_name='api_key', field_name='secret_key', field_value=x_aether_api_key)

    if not sql_result:
        log.debug(sql_result)
        return mk_resp(data=False, status_code=404, response=response)

    log.debug(type(sql_result))
    log.debug(sql_result)
    base_name = Api_Base
    log.debug(base_name)
    resp_data = base_name(**sql_result).dict(by_alias=True, exclude_unset=False)
    log.debug(resp_data)
    return mk_resp(data=resp_data)


@router.post('', response_model=Resp_Body_Base)
async def post_api_obj(
    obj: Api_Base,
    x_account_id: str = Header(...),
    return_obj: Optional[bool] = True,
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
    response: Response = Response,
):
    """Create an ``api`` object through ``post_obj_template``.

    The request body is converted with ``obj.dict(by_alias=False,
    exclude_unset=True)``. ``return_obj``, ``by_alias`` and ``exclude_unset``
    are forwarded to the template (they were previously ignored in favour of
    hard-coded ``True``; the defaults keep that behaviour).

    Returns:
        Whatever ``post_obj_template`` returns.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    return post_obj_template(
        obj_type='api',
        data=obj.dict(by_alias=False, exclude_unset=True),
        return_obj=return_obj,
        by_alias=by_alias,
        exclude_unset=exclude_unset,
    )


@router.patch('/{obj_id}', response_model=Resp_Body_Base)
async def patch_api_obj(
    obj_id: str,
    obj: Optional[Api_Base] = None,
    x_account_id: Optional[str] = Header(..., ),
    return_obj: Optional[bool] = True,
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
    response: Response = Response,
):
    """Update the ``api`` object identified by the random id ``obj_id``.

    The body is converted with ``obj.dict(by_alias=False, exclude_unset=True)``;
    ``id`` is filled from ``redis_lookup_id_random`` and ``id_random`` from
    ``obj_id`` before the data is handed to ``patch_obj_template``.
    ``return_obj``, ``by_alias`` and ``exclude_unset`` are forwarded to the
    template (defaults match the previously hard-coded ``True``).

    Returns:
        Whatever ``patch_obj_template`` returns, or a 400 ``mk_resp`` when no
        request body was sent.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # The body is optional in the signature; without it there is nothing to
    # patch and obj.dict() would raise AttributeError.
    if obj is None:
        return mk_resp(data=False, status_code=400, response=response)  # Bad Request

    obj_type = 'api'
    obj_data_dict = obj.dict(by_alias=False, exclude_unset=True)
    obj_data_dict['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type)
    obj_data_dict['id_random'] = obj_id

    return patch_obj_template(
        obj_type=obj_type,
        data=obj_data_dict,
        obj_id=obj_id,
        return_obj=return_obj,
        by_alias=by_alias,
        exclude_unset=exclude_unset,
    )


@router.get('/list', response_model=Resp_Body_Base)
async def get_api_obj_li(
    for_obj_type: Optional[str] = Query(None, min_length=2, max_length=50),
    for_obj_id: Optional[str] = Query(None, min_length=1, max_length=22),
    x_account_id: str = Header(...),
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
    response: Response = Response,
):
    """List ``api`` objects through ``get_obj_li_template``.

    ``for_obj_type`` / ``for_obj_id`` are passed to the template unchanged, as
    are ``by_alias`` and ``exclude_unset`` (defaults match the previously
    hard-coded ``True``).

    Returns:
        Whatever ``get_obj_li_template`` returns.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    return get_obj_li_template(
        obj_type='api',
        for_obj_type=for_obj_type,
        for_obj_id=for_obj_id,
        by_alias=by_alias,
        exclude_unset=exclude_unset,
    )


@router.get('/{obj_id}', response_model=Resp_Body_Base)
async def get_api_obj(
    obj_id: str,
    x_account_id: str = Header(...),
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
    response: Response = Response,
):
    """Fetch a single ``api`` object through ``get_obj_template``.

    ``by_alias`` and ``exclude_unset`` are forwarded to the template (defaults
    match the previously hard-coded ``True``).

    Returns:
        Whatever ``get_obj_template`` returns.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    return get_obj_template(
        obj_type='api',
        obj_id=obj_id,
        by_alias=by_alias,
        exclude_unset=exclude_unset,
    )


@router.delete('/{obj_id}', response_model=Resp_Body_Base)
async def delete_api_obj(
    obj_id: str,
    x_account_id: str = Header(...),
    response: Response = Response,
):
    """Delete an ``api`` object through ``delete_obj_template``.

    Returns:
        Whatever ``delete_obj_template`` returns.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    return delete_obj_template(
        obj_type='api',
        obj_id=obj_id,
    )


@router.get('/get_id/{object_type}/{object_id_random}', response_model=Resp_Body_Base)
async def get_api_object_id(
    object_type: str,
    object_id_random: str,
    x_account_id: str = Header(...),
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
    response: Response = Response,
):
    """Resolve a random id to its record id via ``redis_lookup_id_random``.

    ``object_type`` is used as the table name for the lookup. ``by_alias`` and
    ``exclude_unset`` are accepted but not read by this handler.

    Returns:
        ``mk_resp(data={'object_id': ...})`` when the lookup yields a truthy
        id, otherwise ``mk_resp(data=None, status_code=400)``.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    object_id = redis_lookup_id_random(record_id_random=object_id_random, table_name=object_type)
    if not object_id:
        return mk_resp(data=None, status_code=400)

    # Success: use mk_resp's default status like the other success paths in
    # this module (this branch previously reported status_code=400).
    return mk_resp(data={'object_id': object_id})