Files
OSIT-AE-API-FastAPI/app/routers/api.py
2026-04-02 15:57:36 -04:00

256 lines
11 KiB
Python

import json
import os
import secrets
import time
from typing import Dict, List, Optional, Set, Union

import jwt as pyjwt # Avoid conflict with app.lib_jwt
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Path, Query, Response, status
from pydantic import BaseModel, EmailStr, Field
from sqlalchemy import text

from app.db_connection import db
from app.lib_general import sign_jwt, decode_jwt, log, logging
from app.config import settings
from app.db_sql import sql_insert, sql_update, sql_select, redis_lookup_id_random, get_id_random
from app.routers.api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template
from app.routers.dependencies_v3 import DeprecationParams
from app.models.api_models import Api_Base
from app.models.response_models import Resp_Body_Base, mk_resp
# Router object for this module; the route decorators below register their handlers on it
router = APIRouter()
# --- Passcode Authentication ---
class PasscodeAuthRequest(BaseModel):
    """Request model for site-based passcode authentication."""
    # Used by authenticate_passcode as the `id_random` value when selecting from the `site` table
    site_id: str = Field(..., description="The random string ID of the site")
    # Compared as a string against each value of the site's access_code_kv_json
    passcode: str = Field(..., description="The passcode to verify")
def _resolve_passcode_role(access_codes, passcode):
    """Return the first role whose configured code equals *passcode*, or None.

    Args:
        access_codes: Parsed ``access_code_kv_json`` value, expected to be a
            mapping of role name -> code. Anything that is not a dict matches
            nothing.
        passcode: The passcode supplied by the caller.

    Returns:
        The matching role key, or None when nothing matches.
    """
    if not isinstance(access_codes, dict) or passcode is None:
        return None
    supplied = str(passcode).encode('utf-8')
    if not supplied:
        # An empty passcode must never authenticate, even against an empty code
        return None
    matched_role = None
    for role, code in access_codes.items():
        # An unset code (JSON null / empty string) is "no passcode configured";
        # without this guard str(None) == "None" would be a valid passcode
        if code is None or code == '':
            continue
        # Compare as bytes: compare_digest rejects non-ASCII str arguments
        is_match = secrets.compare_digest(str(code).encode('utf-8'), supplied)
        # Keep scanning after a hit so timing does not reveal which role matched;
        # the first match still wins, as before
        if is_match and matched_role is None:
            matched_role = role
    return matched_role


@router.post('/authenticate_passcode', response_model=Resp_Body_Base)
async def authenticate_passcode(
    auth_req: PasscodeAuthRequest,
    response: Response = Response,
):
    """
    Passcode-to-JWT Endpoint.
    Verifies a passcode against site.access_code_kv_json.
    Returns a signed JWT with the site's account context and role flags.

    Args:
        auth_req: Site ID (id_random) and the passcode to verify.
        response: Response object handed to mk_resp.

    Returns:
        The result of mk_resp: on success its data holds 'jwt', 'account_id'
        and 'role'; data is False with status_code 401 for a wrong passcode
        and 404 for an unknown site.
    """
    # NOTE(review): this changes the level of the shared `log` object on every call
    log.setLevel(logging.INFO)
    site_id = auth_req.site_id
    passcode = auth_req.passcode
    # Log only the site ID; dumping locals() would put the passcode in the logs
    log.debug(f"authenticate_passcode called for site {site_id}")

    # 1. Look up the site record
    record = sql_select(table_name='site', data={'id_random': site_id})
    if not record:
        log.warning(f"Auth Failed: Site {site_id} not found.")
        return mk_resp(data=False, status_code=404, response=response, status_message="Site not found.")

    # 2. Parse access codes (stored either as a JSON string or already decoded)
    access_codes = {}
    access_codes_raw = record.get('access_code_kv_json')
    if access_codes_raw:
        try:
            access_codes = json.loads(access_codes_raw) if isinstance(access_codes_raw, str) else access_codes_raw
        except (ValueError, RecursionError) as e:
            log.error(f"Failed to parse access_code_kv_json for site {site_id}: {e}")
    if not isinstance(access_codes, dict):
        # A JSON list/scalar has no role -> code pairs; treat it as "no codes" instead of crashing
        log.error(f"access_code_kv_json for site {site_id} is not a JSON object; ignoring it")
        access_codes = {}

    # 3. Verify Passcode and Resolve Role
    matched_role = _resolve_passcode_role(access_codes, passcode)
    if not matched_role:
        log.warning(f"Auth Failed: Invalid passcode for site {site_id}")
        return mk_resp(data=False, status_code=401, response=response, status_message="Invalid passcode.")
    log.info(f"Auth Success: Verified '{matched_role}' passcode for site {site_id}")

    # 4. Resolve Account Context (fall back to translating the integer account_id)
    account_id_random = record.get('account_id_random')
    if not account_id_random:
        if account_id_int := record.get('account_id'):
            account_id_random = get_id_random(record_id=account_id_int, table_name='account')

    # 5. Mint JWT
    payload = {
        'account_id': account_id_random,
        'administrator': (matched_role == 'administrator'),
        'manager': (matched_role == 'manager'),
        'super': (matched_role == 'super'),
        'json_str': json.dumps({
            'auth_type': 'passcode',
            'site_id': site_id,
            'role': matched_role
        })
    }
    token = sign_jwt(
        secret_key=settings.JWT_KEY,
        ttl=3600 * 24,  # 24 hour session
        **payload
    )
    return mk_resp(data={'jwt': token, 'account_id': account_id_random, 'role': matched_role}, response=response)
# --- JWT Request ---
@router.get('/request_jwt', response_model=Resp_Body_Base, dependencies=[Depends(DeprecationParams)])
async def request_jwt(
    x_aether_signing_key: Optional[str] = Header(None, min_length=22, max_length=22),
    x_aether_api_key: Optional[str] = Header(None, min_length=22, max_length=22),
    x_aether_jwt: Optional[str] = Header(None),
    account_id: str = None,
    person_id: str = None,
    user_id: str = None,
    json_str: str = None,
    b64_str: str = None,
    max_ttl: int = 300,
    max_renew: int = 5,
    response: Response = Response,
):
    """Sign and return a JWT built from the supplied identity / payload values.

    The token is signed with the X-Aether-Signing-Key header when present,
    otherwise with settings.JWT_KEY. A request that sends only an API key has
    its ttl / renew limits forced to 3600 / 5. A request carrying neither a
    signing key nor an X-Aether-JWT header has account_id, person_id and
    user_id cleared before signing.

    Returns:
        mk_resp result whose data is {'jwt': token}; data=False with
        status_code 400 when none of account_id / json_str / b64_str is given
        or when no signing key is available.
    """
    log.setLevel(logging.WARNING)
    log.debug(locals())

    # At least one payload-bearing argument is required
    if not (account_id or json_str or b64_str):
        return mk_resp(data=False, status_code=400, response=response)

    # API-key-only callers get fixed limits regardless of what they asked for
    if not x_aether_signing_key and x_aether_api_key:
        max_ttl = 3600
        max_renew = 5

    # Caller-supplied signing key wins; otherwise use the configured key
    signing_key = x_aether_signing_key or settings.JWT_KEY
    if not signing_key:
        log.error('No key found to sign the JWT with!')
        return mk_resp(data=False, status_code=400, response=response)

    # SECURITY PATCH: Prevent public API key from minting privileged tokens
    # NOTE(review): x_aether_jwt is only tested for presence here, never decoded;
    # confirm it is validated elsewhere before it is trusted to skip the downgrade
    has_credential = bool(x_aether_signing_key or x_aether_jwt)
    if not has_credential and (account_id or person_id or user_id):
        log.warning("Security: Attempt to mint privileged JWT without signing key. Downgrading to Guest.")
        account_id = person_id = user_id = None

    payload = dict(
        account_id=account_id,
        person_id=person_id,
        user_id=user_id,
        json_str=json_str,
        b64_str=b64_str,
    )
    token = sign_jwt(secret_key=signing_key, public_key=x_aether_api_key, ttl=max_ttl, max_renew=max_renew, **payload)
    return mk_resp(data={ 'jwt': token })
@router.get('/temp_token', response_model=Resp_Body_Base, dependencies=[Depends(DeprecationParams)])
async def get_api_temp_token(
    x_aether_api_key: Optional[str] = Header(None),
    response: Response = Response,
):
    """Return the api_key record whose secret_key equals the X-Aether-API-Key header.

    Returns:
        mk_resp result carrying the record serialized through Api_Base
        (by alias, unset fields included), or data=False with status_code 404
        when the header is missing or no record matches.
    """
    log.setLevel(logging.WARNING)

    # Only query when a key was actually sent
    api_key_row = None
    if x_aether_api_key:
        api_key_row = sql_select(table_name='api_key', field_name='secret_key', field_value=x_aether_api_key)

    if not api_key_row:
        return mk_resp(data=False, status_code=404, response=response)

    serialized = Api_Base(**api_key_row).dict(by_alias=True, exclude_unset=False)
    return mk_resp(data=serialized)
# --- Jitsi Token ---
# NOTE: Actively used by IDAA for video conferences on self-hosted Jitsi (jitsi.dgrzone.com).
# JWT_APP_ID and JWT_APP_SECRET must match the values in the Jitsi server .env file.
# TODO: Rotate JWT_APP_SECRET — set it via the JITSI_JWT_APP_SECRET environment variable (or update the fallback here) AND in /mnt/nfs_dgr_storage/env/dgr_zone_jitsi/.env (JWT_APP_SECRET) then restart prosody + jicofo.
# Each value may be overridden through the environment so the signing secret does not
# have to live in source control; the literals are the fallbacks used when the variable
# is unset or empty, which keeps existing deployments working unchanged.
JWT_APP_ID = os.environ.get("JITSI_JWT_APP_ID") or "my_jitsi_app_id"
JWT_APP_SECRET = os.environ.get("JITSI_JWT_APP_SECRET") or "my_jitsi_app_secret-9876543210"
JITSI_DOMAIN = os.environ.get("JITSI_DOMAIN") or "jitsi.dgrzone.com"
class JitsiTokenRequest(BaseModel):
    """Request body for the /jitsi_token endpoint.

    create_jitsi_jwt copies these values into the claims of the token it signs.
    """
    room: str = Field(..., description="The name of the Jitsi room.")
    name: str = Field(..., description="The display name of the user.")
    email: EmailStr = Field(..., description="The email of the user.")
    is_moderator: bool = Field(..., description="Whether the user should be a moderator.")
    # Optional extra user info; create_jitsi_jwt reads only its 'id' key
    user: Optional[Dict[str, Union[str, bool]]] = Field(None)
    # Copied into the token's context.features ({} when omitted)
    features: Optional[Dict[str, bool]] = Field(None)
    # Copied into the token's context.settings ({} when omitted).
    # This is a model field; it is unrelated to the `settings` object imported from app.config
    settings: Optional[Dict[str, bool]] = Field(None)
    # Copied into the token's top-level "config" claim ({} when omitted)
    config: Optional[Dict] = Field(None)
@router.post("/jitsi_token")
async def create_jitsi_jwt(request_data: JitsiTokenRequest = Body(...)):
    """Build and sign (HS256, JWT_APP_SECRET) a JWT for joining a Jitsi room.

    Args:
        request_data: Room, display name, email, moderator flag and optional
            user / features / settings / config dictionaries.

    Returns:
        {"token": <encoded JWT>}; the token expires 2 hours after creation.

    Raises:
        HTTPException: status 500 when building or signing the token fails.
    """
    # NOTE(review): no authentication dependency is visible on this route, and the
    # moderator flag comes straight from the request body — confirm this is intended
    log.setLevel(logging.INFO)
    try:
        # A `user` dict sent without an 'id' key falls back to "guest" instead of
        # raising KeyError, which previously surfaced to the client as a 500
        user_id = (request_data.user or {}).get("id", "guest")
        payload = {
            "aud": JWT_APP_ID, "iss": JWT_APP_ID, "sub": JITSI_DOMAIN,
            "room": request_data.room,
            "exp": int(time.time()) + 7200,  # 2 hour expiry
            "config": request_data.config or {},
            "context": {
                "user": {
                    "id": user_id,
                    "name": request_data.name,
                    "email": request_data.email,
                    "moderator": request_data.is_moderator,
                },
                "features": request_data.features or {},
                "settings": request_data.settings or {},
            }
        }
        token = pyjwt.encode(payload, JWT_APP_SECRET, algorithm="HS256")
        return {"token": token}
    except Exception as e:
        log.exception("Failed to create Jitsi JWT")
        # Chain the cause so the original traceback stays attached to the HTTP error
        raise HTTPException(status_code=500, detail=f"Failed to create JWT: {str(e)}") from e
# --- Api_Base CRUD ---
# LEGACY (disabled) - superseded by V3 CRUD: /v3/crud/api/
# @router.post('', response_model=Resp_Body_Base)
# async def post_api_obj(obj: Api_Base, x_account_id: str = Header(...)):
# return post_obj_template(obj_type='api', data=obj.dict(by_alias=False, exclude_unset=True), return_obj=True)
# @router.patch('/{obj_id}', response_model=Resp_Body_Base)
# async def patch_api_obj(obj_id: str, obj: Api_Base, x_account_id: str = Header(...)):
# data = obj.dict(by_alias=False, exclude_unset=True)
# data['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name='api')
# return patch_obj_template(obj_type='api', data=data, obj_id=obj_id, return_obj=True)
# @router.get('/list', response_model=Resp_Body_Base)
# async def get_api_obj_li(for_obj_type: Optional[str] = Query(None), for_obj_id: Optional[str] = Query(None), x_account_id: str = Header(...)):
# return get_obj_li_template(obj_type='api', for_obj_type=for_obj_type, for_obj_id=for_obj_id)
# @router.get('/{obj_id}', response_model=Resp_Body_Base)
# async def get_api_obj(obj_id: str, x_account_id: str = Header(...)):
# return get_obj_template(obj_type='api', obj_id=obj_id)
# @router.delete('/{obj_id}', response_model=Resp_Body_Base)
# async def delete_api_obj(obj_id: str, x_account_id: str = Header(...)):
# return delete_obj_template(obj_type='api', obj_id=obj_id)
# LEGACY (disabled) - exposes internal integer IDs, breaks id_random abstraction
# @router.get('/get_id/{object_type}/{object_id_random}', response_model=Resp_Body_Base)
# async def get_api_object_id(object_type: str, object_id_random: str):
# if object_id := redis_lookup_id_random(record_id_random=object_id_random, table_name=object_type):
# return mk_resp(data={ 'object_id': object_id})
# return mk_resp(data=None, status_code=404)
# LEGACY (disabled) - testing/debug endpoint
# @router.get('/sql_test', tags=['Testing'])
# async def sql_test(response: Response = Response):
# sql = text("SELECT NOW() as current_time, VERSION() as version")
# try:
# result = db.execute(sql).fetchone()
# return mk_resp(data={"current_time": str(result[0]), "version": result[1]})
# except Exception as e:
# return mk_resp(data=False, status_code=500, details=str(e), response=response)