from fastapi import APIRouter, Body, Depends, Header, HTTPException, Path, Query, Response, status from pydantic import BaseModel, EmailStr, Field from typing import Dict, List, Optional, Set, Union from sqlalchemy import text import json import time import secrets import jwt as pyjwt # Avoid conflict with app.lib_jwt from app.db_connection import db from app.lib_general import sign_jwt, decode_jwt, log, logging from app.config import settings from app.db_sql import sql_insert, sql_update, sql_select, redis_lookup_id_random, get_id_random from app.routers.api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template from app.routers.dependencies_v3 import DeprecationParams from app.models.api_models import Api_Base from app.models.response_models import Resp_Body_Base, mk_resp router = APIRouter() # --- Passcode Authentication --- class PasscodeAuthRequest(BaseModel): """Request model for site-based passcode authentication.""" site_id: str = Field(..., description="The random string ID of the site") passcode: str = Field(..., description="The passcode to verify") @router.post('/authenticate_passcode', response_model=Resp_Body_Base) async def authenticate_passcode( auth_req: PasscodeAuthRequest, response: Response = Response, ): """ Passcode-to-JWT Endpoint. Verifies a passcode against site.access_code_kv_json. Returns a signed JWT with the site's account context and role flags. """ log.setLevel(logging.INFO) log.debug(locals()) site_id = auth_req.site_id passcode = auth_req.passcode # 1. Look up the site record search_data = {'id_random': site_id} if record := sql_select(table_name='site', data=search_data): # 2. Parse access codes access_codes_raw = record.get('access_code_kv_json') access_codes = {} if access_codes_raw: try: access_codes = json.loads(access_codes_raw) if isinstance(access_codes_raw, str) else access_codes_raw except Exception as e: log.error(f"Failed to parse access_code_kv_json for site {site_id}: {e}") # 3. Verify Passcode and Resolve Role matched_role = None for role, code in access_codes.items(): if str(code) == str(passcode): matched_role = role break if matched_role: log.info(f"Auth Success: Verified '{matched_role}' passcode for site {site_id}") # 4. Resolve Account Context account_id_random = record.get('account_id_random') if not account_id_random: if account_id_int := record.get('account_id'): account_id_random = get_id_random(record_id=account_id_int, table_name='account') # 5. Mint JWT payload = { 'account_id': account_id_random, 'administrator': (matched_role == 'administrator'), 'manager': (matched_role == 'manager'), 'super': (matched_role == 'super'), 'json_str': json.dumps({ 'auth_type': 'passcode', 'site_id': site_id, 'role': matched_role }) } token = sign_jwt( secret_key=settings.JWT_KEY, ttl=3600 * 24, # 24 hour session **payload ) return mk_resp(data={'jwt': token, 'account_id': account_id_random, 'role': matched_role}, response=response) else: log.warning(f"Auth Failed: Invalid passcode for site {site_id}") return mk_resp(data=False, status_code=401, response=response, status_message="Invalid passcode.") else: log.warning(f"Auth Failed: Site {site_id} not found.") return mk_resp(data=False, status_code=404, response=response, status_message="Site not found.") # --- JWT Request --- @router.get('/request_jwt', response_model=Resp_Body_Base, dependencies=[Depends(DeprecationParams)]) async def request_jwt( x_aether_signing_key: Optional[str] = Header(None, min_length=22, max_length=22), x_aether_api_key: Optional[str] = Header(None, min_length=22, max_length=22), x_aether_jwt: Optional[str] = Header(None), account_id: str = None, person_id: str = None, user_id: str = None, json_str: str = None, b64_str: str = None, max_ttl: int = 300, max_renew: int = 5, response: Response = Response, ): log.setLevel(logging.WARNING) log.debug(locals()) if account_id or json_str or b64_str: pass else: return mk_resp(data=False, status_code=400, response=response) if x_aether_signing_key: pass elif x_aether_api_key: max_ttl = 3600 max_renew = 5 signing_key = None if x_aether_signing_key: signing_key = x_aether_signing_key elif settings.JWT_KEY: signing_key = settings.JWT_KEY else: log.error('No key found to sign the JWT with!') return mk_resp(data=False, status_code=400, response=response) # SECURITY PATCH: Prevent public API key from minting privileged tokens if not x_aether_signing_key and not x_aether_jwt: if account_id or person_id or user_id: log.warning("Security: Attempt to mint privileged JWT without signing key. Downgrading to Guest.") account_id = None person_id = None user_id = None payload = { 'account_id': account_id, 'person_id': person_id, 'user_id': user_id, 'json_str': json_str, 'b64_str': b64_str, } token = sign_jwt(secret_key=signing_key, public_key=x_aether_api_key, ttl=max_ttl, max_renew=max_renew, **payload) return mk_resp(data={ 'jwt': token }) @router.get('/temp_token', response_model=Resp_Body_Base, dependencies=[Depends(DeprecationParams)]) async def get_api_temp_token( x_aether_api_key: Optional[str] = Header(None), response: Response = Response, ): log.setLevel(logging.WARNING) table_name_select = 'api_key' if x_aether_api_key: sql_result = sql_select(table_name=table_name_select, field_name='secret_key', field_value=x_aether_api_key) if sql_result: resp_data = Api_Base(**sql_result).dict(by_alias=True, exclude_unset=False) return mk_resp(data=resp_data) return mk_resp(data=False, status_code=404, response=response) # --- Jitsi Token --- # NOTE: This is still actively used by IDAA for their video conferences using self hosted Jitsi. Thi is actually live. We do need to change the app secret once things have stabilized. JWT_APP_ID = "my_jitsi_app_id" JWT_APP_SECRET = "my_jitsi_app_secret-9876543210" JITSI_DOMAIN = "jitsi.dgrzone.com" class JitsiTokenRequest(BaseModel): room: str = Field(..., description="The name of the Jitsi room.") name: str = Field(..., description="The display name of the user.") email: EmailStr = Field(..., description="The email of the user.") is_moderator: bool = Field(..., description="Whether the user should be a moderator.") user: Optional[Dict[str, Union[str, bool]]] = Field(None) features: Optional[Dict[str, bool]] = Field(None) settings: Optional[Dict[str, bool]] = Field(None) config: Optional[Dict] = Field(None) @router.post("/jitsi_token") async def create_jitsi_jwt(request_data: JitsiTokenRequest = Body(...)): log.setLevel(logging.INFO) try: payload = { "aud": JWT_APP_ID, "iss": JWT_APP_ID, "sub": JITSI_DOMAIN, "room": request_data.room, "exp": int(time.time()) + 3600, "config": request_data.config or {}, "context": { "user": { "id": request_data.user['id'] if request_data.user else "guest", "name": request_data.name, "email": request_data.email, "moderator": request_data.is_moderator, }, "features": request_data.features or {}, "settings": request_data.settings or {}, } } token = pyjwt.encode(payload, JWT_APP_SECRET, algorithm="HS256") return {"token": token} except Exception as e: log.exception("Failed to create Jitsi JWT") raise HTTPException(status_code=500, detail=f"Failed to create JWT: {str(e)}") # --- Api_Base CRUD --- # LEGACY (disabled) - superseded by V3 CRUD: /v3/crud/api/ # @router.post('', response_model=Resp_Body_Base) # async def post_api_obj(obj: Api_Base, x_account_id: str = Header(...)): # return post_obj_template(obj_type='api', data=obj.dict(by_alias=False, exclude_unset=True), return_obj=True) # @router.patch('/{obj_id}', response_model=Resp_Body_Base) # async def patch_api_obj(obj_id: str, obj: Api_Base, x_account_id: str = Header(...)): # data = obj.dict(by_alias=False, exclude_unset=True) # data['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name='api') # return patch_obj_template(obj_type='api', data=data, obj_id=obj_id, return_obj=True) # @router.get('/list', response_model=Resp_Body_Base) # async def get_api_obj_li(for_obj_type: Optional[str] = Query(None), for_obj_id: Optional[str] = Query(None), x_account_id: str = Header(...)): # return get_obj_li_template(obj_type='api', for_obj_type=for_obj_type, for_obj_id=for_obj_id) # @router.get('/{obj_id}', response_model=Resp_Body_Base) # async def get_api_obj(obj_id: str, x_account_id: str = Header(...)): # return get_obj_template(obj_type='api', obj_id=obj_id) # @router.delete('/{obj_id}', response_model=Resp_Body_Base) # async def delete_api_obj(obj_id: str, x_account_id: str = Header(...)): # return delete_obj_template(obj_type='api', obj_id=obj_id) # LEGACY (disabled) - exposes internal integer IDs, breaks id_random abstraction # @router.get('/get_id/{object_type}/{object_id_random}', response_model=Resp_Body_Base) # async def get_api_object_id(object_type: str, object_id_random: str): # if object_id := redis_lookup_id_random(record_id_random=object_id_random, table_name=object_type): # return mk_resp(data={ 'object_id': object_id}) # return mk_resp(data=None, status_code=404) # LEGACY (disabled) - testing/debug endpoint # @router.get('/sql_test', tags=['Testing']) # async def sql_test(response: Response = Response): # sql = text("SELECT NOW() as current_time, VERSION() as version") # try: # result = db.execute(sql).fetchone() # return mk_resp(data={"current_time": str(result[0]), "version": result[1]}) # except Exception as e: # return mk_resp(data=False, status_code=500, details=str(e), response=response)