import datetime, jwt, time from fastapi import APIRouter, Body, Depends, Header, HTTPException, Path, Query, Response, status from pydantic import BaseModel, EmailStr, Field from typing import Dict, List, Optional, Set, Union from app.lib_general import log, logging, sign_jwt, decode_jwt, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min from app.config import settings from app.db_sql import sql_insert, sql_update, sql_insert_or_update, sql_select, sql_delete, redis_lookup_id_random from app.routers.api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template from app.models.api_models import Api_Base from app.models.response_models import Resp_Body_Base, mk_resp router = APIRouter() # ### BEGIN ### API API ### request_jwt() ### # This can be used to generate JWTs for various purposes: # * for end client browser API access # * for proof of sign in # * newer/better version of sign in by URL # Generate (sign) JWT using Aether platform super secret key or x_aether_signing_key sort of secret key if passed. The Aether platform super secret JWT signing key must be used API access token # If x_aether_api_key is passed then set higher TTL # If old and valid x_aether_api_jwt_token is passed then decode and decrease TTL by 1 # Updated 2023-03-24 # Verify JWT using the API public key's associated API private key # API server or trusted app can generate JWTs # JWT contains: # * client_token (to request a new short term client token) # * iat # * eat # * account_id # * client_id # * order_cart_id # * person_id # * user_id # API server verifies JWTs # Updated 2021-07-14 @router.get('/request_jwt', response_model=Resp_Body_Base) async def request_jwt( x_aether_signing_key: Optional[str] = Header(None, min_length=22, max_length=22), # The (secret) signing key. Keep safe!!! If passed then use to sign JWT. Otherwise need to get from system/environment. # x_aether_secret_key: Optional[str] = Header(None, min_length=22, max_length=22), # The Aether secret key. Keep safe!!! If passed then can also set TTL x_aether_api_key: Optional[str] = Header(None, min_length=22, max_length=22), # The client side API key. This should be kept secret by the client. If passed then store with JWT and can set TTL. # x_aether_api_public_key: Optional[str] = Header(None, min_length=22, max_length=22), # Used to look up the API secret if not given x_aether_jwt: Optional[str] = Header(None), # A JWT that was created and given to client browser or server in the past. It may or may not be valid. If the x_aether_signing_key was not passed, then assume it was signed with the Aether super secret key. account_id: str = None, # Handle this different because it is special json_str: str = None, # This is what should be stored b64_str: str = None, # This is what should be stored # I would like payload to be a dict, but then we have to use POST instead of GET... # Maybe base64 encode and decode? # session_id: str = None, # End client (web browser) # client_id: str = None, # End client (web browser) # person_id: str = None, # user_id: str = None, max_ttl: int = 300, # Number of seconds to live. Only use if given the API secret key. # Seconds: 3600 = 1 hr; 300 = 5 min max_renew: int = 5, # Decrease count by 1 until 0 if only sent a current API token. response: Response = Response, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) # One of these is required if account_id or json_str or b64_str: pass else: return mk_resp(data=False, status_code=400, response=response) # Bad Request # Possible overrides and checks go here if x_aether_signing_key: pass elif x_aether_api_key: # Override any if for API JWT??? max_ttl = 3600 max_renew = 5 # if not x_aether_secret_key: max_renew = 5 # Override any max_rewnew if no API secret # api_secret_key = x_aether_secret_key signing_key = None if x_aether_signing_key: signing_key = x_aether_signing_key elif settings.JWT_KEY: signing_key = settings.JWT_KEY else: log.error('No key found to sign the JWT with!') return mk_resp(data=False, status_code=400, response=response) # Bad Request payload = {} payload['account_id'] = account_id payload['json_str'] = json_str payload['b64_str'] = b64_str token = sign_jwt(secret_key=signing_key, public_key=x_aether_api_key, ttl=max_ttl, max_renew=max_renew, **payload) response_data = { 'jwt': token } return mk_resp(data=response_data) if x_aether_secret_key: log.debug(f'Contains a value in x_aether_secret_key: {x_aether_secret_key}') table_name_select = 'api_key' field_name = 'secret_key' field_value = api_secret_key if api_key_rec_select_result := sql_select(table_name=table_name_select, field_name=field_name, field_value=field_value): pass else: log.warning('No results when looking up the API secret key') return mk_resp(data=False, status_code=401, response=response) # Unauthorized elif x_aether_api_public_key and x_aether_api_token: table_name_select = 'api_key' field_name = 'public_key' field_value = x_aether_api_public_key if api_key_rec_select_result := sql_select(table_name=table_name_select, field_name=field_name, field_value=field_value): pass else: log.warning('No results when looking up the API public key') return mk_resp(data=False, status_code=401, response=response) # Unauthorized # Check if the API keys are valid if api_key_rec_select_result.get('enable', None): api_key_rec = api_key_rec_select_result else: log.warning('API secret key not enabled') return mk_resp(data=False, status_code=401, response=response) # Unauthorized current_datetime = datetime.datetime.utcnow() # datetime.datetime.now() Gets server local datetime if api_key_rec.get('enable_from', None) <= current_datetime and api_key_rec.get('enable_to', None) >= current_datetime: pass else: log.warning('API secret key expired') return mk_resp(data=False, status_code=401, response=response) # Unauthorized if api_secret_key := api_key_rec.get('secret_key', None): pass else: log.warning('Secret key was not found') return mk_resp(data=False, status_code=400, response=response) # Bad Request if api_public_key := api_key_rec.get('public_key', None): pass else: log.warning('Public key was not found') return mk_resp(data=False, status_code=400, response=response) # Bad Request # Decode the JWT if an API token was sent and the API secret key was sent/found. if x_aether_api_token and api_public_key and api_secret_key: if current_token := decode_jwt(secret_key=api_secret_key, token=x_aether_api_token): if current_token.get('max_renew', 0) > 0: pass else: message = 'The JWT sent is out of allowed renewals. Try again with a current JWT or just the API secret key.' log.warning(message) return mk_resp(data=False, status_code=401, status_message=message) # Unauthorized max_renew = current_token.get('max_renew', 0) - 1 if not account_id: account_id = current_token.get('account_id', None) if not person_id: person_id = current_token.get('person_id', None) if not user_id: user_id = current_token.get('user_id', None) else: message = 'The JWT sent is either expired or otherwise invalid. Try again with a current JWT or just the API secret key.' log.warning(message) return mk_resp(data=False, status_code=401, status_message=message) # Unauthorized payload = {} payload['account_id'] = account_id payload['person_id'] = person_id payload['user_id'] = user_id token = sign_jwt(secret_key=api_secret_key, public_key=api_public_key, ttl=max_ttl, max_renew=max_renew, **payload) response_data = { 'jwt': token } return mk_resp(data=response_data) # ### END ### API API ### request_jwt() ### @router.get('/temp_token', response_model=Resp_Body_Base) async def get_api_temp_token( x_aether_api_key: Optional[str] = Header(None), x_aether_api_token: Optional[str] = Header(None), x_aether_api_token_expire_on: Optional[str] = Header(None), response: Response = Response, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) table_name_select = 'api_key' field_name = 'secret_key' field_value = x_aether_api_key if x_aether_api_key: log.debug(f'Contains a value in x_aether_api_key: {x_aether_api_key}') sql_result = sql_select(table_name=table_name_select, field_name=field_name, field_value=field_value) else: return mk_resp(data=False, status_code=400, response=response) # Bad Request # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL if sql_result: log.debug(type(sql_result)) log.debug(sql_result) base_name = Api_Base log.debug(base_name) resp_data = base_name(**sql_result).dict(by_alias=True, exclude_unset=False) log.debug(resp_data) return mk_resp(data=resp_data) else: log.debug(sql_result) return mk_resp(data=False, status_code=404, response=response) # Updated 2025-09-18 # It's best practice to import settings from a config file or environment variables # For this example, we'll hardcode them, but you should use your actual values # from your .env file JWT_APP_ID = "my_jitsi_app_id" JWT_APP_SECRET = "my_jitsi_app_secret-9876543210" # Define the data model for the incoming request body from the client class JitsiTokenRequest(BaseModel): room: str = Field(..., description="The name of the Jitsi room.") name: str = Field(..., description="The display name of the user.") email: EmailStr = Field(..., description="The email of the user.") is_moderator: bool = Field(..., description="Whether the user should be a moderator.") features: Optional[Dict[str, bool]] = Field( None, description="Optional features to enable for the Jitsi meeting, such as livestreaming, recording, etc." ) # Optional settings for the Jitsi meeting # settings: Optional[Dict[str, Union[bool, int]]] = Field( # None, # description="Optional settings for the Jitsi meeting, such as startAudioMuted, startVideoMuted, etc." # ) # A simple endpoint to generate the Jitsi-specific JWT @router.post("/jitsi_token") async def create_jitsi_jwt( request_data: JitsiTokenRequest = Body(...), # commons: Common_Route_Params_Min = Depends(common_route_params_min), ): """ Generates a Jitsi-specific JWT token for authentication. The token includes claims to set the user's name, email, and moderator status. """ log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) log.info("Generating Jitsi JWT...") try: # Define the JWT payload with all the required claims for Jitsi. # This is where we securely set the moderator and user info. payload = { "aud": "jitsi", "iss": JWT_APP_ID, "sub": "jitsi.dgrzone.com", # Your Jitsi base domain "room": request_data.room, "exp": int(time.time()) + 3600, # Token expires in 1 hour "context": { "user": { "name": request_data.name, "email": request_data.email, "moderator": "true" if request_data.is_moderator else "false" }, "features": request_data.features if request_data.features else { "livestreaming": False, "recording": False, "transcription": False, "outbound-call": False, "sip-outbound-call": False, }, # "features": { # "livestreaming": True, # "recording": True, # "transcription": True, # "outbound-call": True, # "sip-outbound-call": True, # }, # "settings": request_data.settings if hasattr(request_data, 'settings') else None, # { # "disableAudioLevels": False, # "startAudioMuted": request_data.settings.startAudioMuted, # "startVideoMuted": request_data.settings.startVideoMuted, # "startMuted": request_data.settings.startMuted, # "startHidden": request_data.settings.startHidden, # "followMe": request_data.settings.followMe, # "reactionsMuted": request_data.settings.reactionsMuted # } } } log.debug(payload) # Sign the JWT with your secret key # The algorithm must be the same as configured in your Prosody setup (HS256) token = jwt.encode(payload, JWT_APP_SECRET, algorithm="HS256") log.info("Jitsi JWT generated successfully.") log.debug(token) return {"token": token} except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to create JWT: {str(e)}") @router.post('', response_model=Resp_Body_Base) async def post_api_obj( obj: Api_Base, x_account_id: str = Header(...), return_obj: Optional[bool] = True, by_alias: Optional[bool] = True, exclude_unset: Optional[bool] = True, response: Response = Response, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) obj_type = 'api' obj_data_dict = obj.dict(by_alias=False, exclude_unset=True) result = post_obj_template( obj_type=obj_type, data=obj_data_dict, return_obj=True, by_alias=True, exclude_unset=True, ) return result @router.patch('/{obj_id}', response_model=Resp_Body_Base) async def patch_api_obj( obj_id: str, obj: Api_Base = None, x_account_id: Optional[str] = Header(..., ), return_obj: Optional[bool] = True, by_alias: Optional[bool] = True, exclude_unset: Optional[bool] = True, response: Response = Response, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) obj_type = 'api' obj_data_dict = obj.dict(by_alias=False, exclude_unset=True) obj_data_dict['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type) obj_data_dict['id_random'] = obj_id result = patch_obj_template( obj_type=obj_type, data=obj_data_dict, obj_id=obj_id, return_obj=True, by_alias=True, exclude_unset=True, ) return result @router.get('/list', response_model=Resp_Body_Base) async def get_api_obj_li( for_obj_type: Optional[str] = Query(None, min_length=2, max_length=50), for_obj_id: Optional[str] = Query(None, min_length=1, max_length=22), x_account_id: str = Header(...), by_alias: Optional[bool] = True, exclude_unset: Optional[bool] = True, response: Response = Response, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) obj_type = 'api' result = get_obj_li_template( obj_type=obj_type, for_obj_type=for_obj_type, for_obj_id=for_obj_id, by_alias=True, exclude_unset=True, ) return result @router.get('/{obj_id}', response_model=Resp_Body_Base) async def get_api_obj( obj_id: str, x_account_id: str = Header(...), by_alias: Optional[bool] = True, exclude_unset: Optional[bool] = True, response: Response = Response, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) obj_type = 'api' result = get_obj_template( obj_type=obj_type, obj_id=obj_id, by_alias=True, exclude_unset=True, ) return result @router.delete('/{obj_id}', response_model=Resp_Body_Base) async def delete_api_obj( obj_id: str, x_account_id: str = Header(...), response: Response = Response, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) obj_type = 'api' result = delete_obj_template( obj_type=obj_type, obj_id=obj_id, ) return result @router.get('/get_id/{object_type}/{object_id_random}', response_model=Resp_Body_Base) async def get_api_object_id( object_type: str, object_id_random: str, x_account_id: str = Header(...), by_alias: Optional[bool] = True, exclude_unset: Optional[bool] = True, response: Response = Response, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if object_id := redis_lookup_id_random(record_id_random=object_id_random, table_name=object_type): return mk_resp(data={ 'object_id': object_id}, status_code=400) else: return mk_resp(data=None, status_code=400)