diff --git a/app/routers/api.py b/app/routers/api.py index dd463ac..7e42b14 100644 --- a/app/routers/api.py +++ b/app/routers/api.py @@ -168,7 +168,9 @@ async def get_api_temp_token( # --- Jitsi Token --- -# NOTE: This is still actively used by IDAA for their video conferences using self hosted Jitsi. Thi is actually live. We do need to change the app secret once things have stabilized. +# NOTE: Actively used by IDAA for video conferences on self-hosted Jitsi (jitsi.dgrzone.com). +# JWT_APP_ID and JWT_APP_SECRET must match the values in the Jitsi server .env file. +# TODO: Rotate JWT_APP_SECRET — update it here AND in /mnt/nfs_dgr_storage/env/dgr_zone_jitsi/.env (JWT_APP_SECRET) then restart prosody + jicofo. JWT_APP_ID = "my_jitsi_app_id" JWT_APP_SECRET = "my_jitsi_app_secret-9876543210" diff --git a/tests/e2e/test_e2e_jitsi_token.py b/tests/e2e/test_e2e_jitsi_token.py new file mode 100644 index 0000000..611bb1a --- /dev/null +++ b/tests/e2e/test_e2e_jitsi_token.py @@ -0,0 +1,185 @@ +""" +Jitsi JWT Token E2E Test Suite + +Tests the /api/jitsi_token endpoint to verify: + - Moderator tokens contain moderator=true in the JWT payload + - Attendee tokens contain moderator=false in the JWT payload + - Room claim is correctly scoped per request + - Basic validation rejects malformed input + +Run from project root: + ./environment/bin/python3 tests/e2e/test_e2e_jitsi_token.py +""" + +import sys +import os +import json +import base64 +import time +import requests + +sys.path.append(os.getcwd()) + +# --- Configuration --- +API_ROOT = "https://dev-api.oneskyit.com" +JITSI_ENDPOINT = f"{API_ROOT}/api/jitsi_token" + +TEST_ROOM = "idaa-test-room-001" +TEST_NAME = "E2E Test User" +TEST_EMAIL = "e2e-test@oneskyit.com" + + +def print_result(label, success, message=""): + status = "✅ PASS" if success else "❌ FAIL" + suffix = f" — {message}" if message else "" + print(f" [{status}] {label}{suffix}") + + +def decode_jwt_payload(token: str) -> dict: + """Decode a JWT payload without signature verification (for inspection).""" + try: + parts = token.split(".") + if len(parts) != 3: + return {} + # Add padding + padded = parts[1] + "=" * (4 - len(parts[1]) % 4) + return json.loads(base64.urlsafe_b64decode(padded)) + except Exception: + return {} + + +def test_moderator_token(): + """Request a moderator JWT and verify the claim is set correctly.""" + print("\n--- Test: Moderator Token ---") + payload = { + "room": TEST_ROOM, + "name": TEST_NAME, + "email": TEST_EMAIL, + "is_moderator": True, + } + resp = requests.post(JITSI_ENDPOINT, json=payload) + print_result("HTTP 200", resp.status_code == 200, f"status={resp.status_code}") + + if resp.status_code != 200: + print(f" Response: {resp.text}") + return None + + token = resp.json().get("token") + print_result("Token returned", bool(token)) + if not token: + return None + + decoded = decode_jwt_payload(token) + print(f" Decoded payload: {json.dumps(decoded, indent=6)}") + + moderator_claim = decoded.get("context", {}).get("user", {}).get("moderator") + room_claim = decoded.get("room") + + print_result("moderator == True", moderator_claim is True, f"got: {moderator_claim!r}") + print_result("room scoped correctly", room_claim == TEST_ROOM, f"got: {room_claim!r}") + + return token + + +def test_attendee_token(): + """Request a non-moderator JWT and verify the claim is False.""" + print("\n--- Test: Attendee Token (is_moderator=False) ---") + payload = { + "room": TEST_ROOM, + "name": TEST_NAME, + "email": TEST_EMAIL, + "is_moderator": False, + } + resp = requests.post(JITSI_ENDPOINT, json=payload) + print_result("HTTP 200", resp.status_code == 200, f"status={resp.status_code}") + + if resp.status_code != 200: + print(f" Response: {resp.text}") + return None + + token = resp.json().get("token") + print_result("Token returned", bool(token)) + if not token: + return None + + decoded = decode_jwt_payload(token) + print(f" Decoded payload: {json.dumps(decoded, indent=6)}") + + moderator_claim = decoded.get("context", {}).get("user", {}).get("moderator") + print_result("moderator == False", moderator_claim is False, f"got: {moderator_claim!r}") + + return token + + +def test_room_isolation(): + """Verify two requests for different rooms produce different room claims.""" + print("\n--- Test: Room Isolation ---") + rooms = ["room-alpha", "room-beta"] + tokens = [] + for room in rooms: + resp = requests.post(JITSI_ENDPOINT, json={ + "room": room, "name": TEST_NAME, "email": TEST_EMAIL, "is_moderator": False + }) + if resp.status_code == 200: + tokens.append((room, decode_jwt_payload(resp.json().get("token", "")))) + + if len(tokens) == 2: + match_0 = tokens[0][1].get("room") == tokens[0][0] + match_1 = tokens[1][1].get("room") == tokens[1][0] + print_result("room-alpha scoped", match_0, f"got: {tokens[0][1].get('room')!r}") + print_result("room-beta scoped", match_1, f"got: {tokens[1][1].get('room')!r}") + print_result("Rooms differ", tokens[0][1].get("room") != tokens[1][1].get("room")) + else: + print_result("Both requests succeeded", False, "could not get both tokens") + + +def test_invalid_email(): + """Verify that a malformed email is rejected with 422.""" + print("\n--- Test: Input Validation (bad email) ---") + payload = { + "room": TEST_ROOM, + "name": TEST_NAME, + "email": "not-an-email", + "is_moderator": False, + } + resp = requests.post(JITSI_ENDPOINT, json=payload) + print_result("422 on bad email", resp.status_code == 422, f"status={resp.status_code}") + + +def test_token_expiry(): + """Verify the exp claim is approximately 1 hour from now.""" + print("\n--- Test: Token Expiry (exp claim) ---") + payload = { + "room": TEST_ROOM, "name": TEST_NAME, "email": TEST_EMAIL, "is_moderator": False + } + resp = requests.post(JITSI_ENDPOINT, json=payload) + if resp.status_code != 200: + print_result("HTTP 200 (skipping exp check)", False) + return + + decoded = decode_jwt_payload(resp.json().get("token", "")) + exp = decoded.get("exp") + now = int(time.time()) + ttl = exp - now if exp else 0 + # Should be ~3600s (allow 30s window for test runtime) + ok = 3550 < ttl <= 3600 + print_result("exp ≈ now + 3600s", ok, f"ttl={ttl}s") + + +if __name__ == "__main__": + suite_start = time.time() + print("=" * 55) + print(" Jitsi JWT Token — E2E Test Suite") + print(f" Endpoint: {JITSI_ENDPOINT}") + print("=" * 55) + + test_moderator_token() + test_attendee_token() + test_room_isolation() + test_invalid_email() + test_token_expiry() + + elapsed = time.time() - suite_start + print(f"\n{'=' * 55}") + print(f" Suite completed in {elapsed:.2f}s") + print("=" * 55)