Files
OSIT-AE-API-FastAPI/tests/e2e/test_e2e_jitsi_token.py
Scott Idem 2a1f270db6 feat(jitsi): add JWT token E2E test suite and improve api.py comments
- Add tests/e2e/test_e2e_jitsi_token.py: verifies moderator/attendee claims,
  room isolation, input validation, and exp claim correctness
- Update Jitsi section comment in api.py with actionable secret rotation TODO
  (must update JWT_APP_SECRET here AND in dgr_zone_jitsi .env, then restart
  prosody + jicofo)
2026-04-02 12:57:44 -04:00

186 lines
5.8 KiB
Python

"""
Jitsi JWT Token E2E Test Suite
Tests the /api/jitsi_token endpoint to verify:
- Moderator tokens contain moderator=true in the JWT payload
- Attendee tokens contain moderator=false in the JWT payload
- Room claim is correctly scoped per request
- Basic validation rejects malformed input
Run from project root:
./environment/bin/python3 tests/e2e/test_e2e_jitsi_token.py
"""
import sys
import os
import json
import base64
import time
import requests
# Put the current working directory on the import path (the module docstring
# says to run this script from the project root).
# NOTE(review): no project-local import is visible in this file — confirm this
# line is still needed.
sys.path.append(os.getcwd())
# --- Configuration ---
# Base URL of the API deployment the suite talks to (presumably the dev
# environment, judging by the hostname — confirm before pointing elsewhere).
API_ROOT = "https://dev-api.oneskyit.com"
# Full URL of the token endpoint that every test POSTs to.
JITSI_ENDPOINT = f"{API_ROOT}/api/jitsi_token"
# Default room, display name and email sent in the request payloads.
TEST_ROOM = "idaa-test-room-001"
TEST_NAME = "E2E Test User"
TEST_EMAIL = "e2e-test@oneskyit.com"
def print_result(label: str, success: bool, message: str = "") -> None:
    """Print one PASS/FAIL line for a single check.

    Args:
        label: Short description of the check being reported.
        success: Truthy when the check passed.
        message: Optional detail (e.g. the observed value). When given it is
            appended after the label, separated by " — ".
    """
    status = "✅ PASS" if success else "❌ FAIL"
    # Separate the detail from the label; without a separator the output ran
    # together, e.g. "HTTP 200status=200".
    suffix = f" — {message}" if message else ""
    print(f" [{status}] {label}{suffix}")
def decode_jwt_payload(token: str) -> dict:
    """Decode a JWT payload without signature verification (for inspection).

    Args:
        token: Compact JWT of the form ``header.payload.signature``.

    Returns:
        The payload claims as a dict. An empty dict is returned when the
        token is not a string, does not have exactly three dot-separated
        segments, is not valid base64url/JSON, or decodes to something other
        than a JSON object — so callers can always chain ``.get()``.
    """
    if not isinstance(token, str):
        return {}
    segments = token.split(".")
    if len(segments) != 3:
        return {}
    body = segments[1]
    # JWT segments are base64url with the trailing "=" stripped; restore only
    # the padding actually missing (0-3 characters).
    padded = body + "=" * (-len(body) % 4)
    try:
        claims = json.loads(base64.urlsafe_b64decode(padded))
    except (ValueError, RecursionError):
        # ValueError covers binascii.Error, JSONDecodeError and
        # UnicodeDecodeError; RecursionError guards absurdly nested JSON.
        return {}
    # A payload such as `1` or `[...]` is valid JSON but not a claims object.
    return claims if isinstance(claims, dict) else {}
def test_moderator_token():
    """Request a moderator JWT and verify the claim is set correctly.

    POSTs ``is_moderator=True`` for ``TEST_ROOM`` to ``JITSI_ENDPOINT``,
    decodes the returned token's payload (no signature verification) and
    reports whether ``context.user.moderator`` is ``True`` and ``room``
    equals ``TEST_ROOM``.

    Returns:
        The raw token string when the endpoint answered 200 with a token
        (regardless of whether the claim checks passed); ``None`` when the
        request failed, the status was not 200, or no token was returned.
    """
    print("\n--- Test: Moderator Token ---")
    payload = {
        "room": TEST_ROOM,
        "name": TEST_NAME,
        "email": TEST_EMAIL,
        "is_moderator": True,
    }
    try:
        # requests applies no timeout by default; bound the wait so an
        # unresponsive server cannot hang the suite.
        resp = requests.post(JITSI_ENDPOINT, json=payload, timeout=10)
    except requests.RequestException as exc:
        # Report transport failures as a FAIL instead of aborting the run.
        print_result("HTTP 200", False, f"request failed: {exc}")
        return None
    print_result("HTTP 200", resp.status_code == 200, f"status={resp.status_code}")
    if resp.status_code != 200:
        print(f" Response: {resp.text}")
        return None
    token = resp.json().get("token")
    print_result("Token returned", bool(token))
    if not token:
        return None
    decoded = decode_jwt_payload(token)
    print(f" Decoded payload: {json.dumps(decoded, indent=6)}")
    moderator_claim = decoded.get("context", {}).get("user", {}).get("moderator")
    room_claim = decoded.get("room")
    # Identity check: the claim must be the boolean True, not merely truthy.
    print_result("moderator == True", moderator_claim is True, f"got: {moderator_claim!r}")
    print_result("room scoped correctly", room_claim == TEST_ROOM, f"got: {room_claim!r}")
    return token
def test_attendee_token():
    """Request a non-moderator JWT and verify the claim is False.

    POSTs ``is_moderator=False`` for ``TEST_ROOM`` to ``JITSI_ENDPOINT``,
    decodes the returned token's payload (no signature verification) and
    reports whether ``context.user.moderator`` is exactly ``False``.

    Returns:
        The raw token string when the endpoint answered 200 with a token
        (regardless of whether the claim check passed); ``None`` when the
        request failed, the status was not 200, or no token was returned.
    """
    print("\n--- Test: Attendee Token (is_moderator=False) ---")
    payload = {
        "room": TEST_ROOM,
        "name": TEST_NAME,
        "email": TEST_EMAIL,
        "is_moderator": False,
    }
    try:
        # requests applies no timeout by default; bound the wait so an
        # unresponsive server cannot hang the suite.
        resp = requests.post(JITSI_ENDPOINT, json=payload, timeout=10)
    except requests.RequestException as exc:
        # Report transport failures as a FAIL instead of aborting the run.
        print_result("HTTP 200", False, f"request failed: {exc}")
        return None
    print_result("HTTP 200", resp.status_code == 200, f"status={resp.status_code}")
    if resp.status_code != 200:
        print(f" Response: {resp.text}")
        return None
    token = resp.json().get("token")
    print_result("Token returned", bool(token))
    if not token:
        return None
    decoded = decode_jwt_payload(token)
    print(f" Decoded payload: {json.dumps(decoded, indent=6)}")
    moderator_claim = decoded.get("context", {}).get("user", {}).get("moderator")
    # Identity check: a missing claim (None) must not count as False.
    print_result("moderator == False", moderator_claim is False, f"got: {moderator_claim!r}")
    return token
def test_room_isolation():
    """Verify two requests for different rooms produce different room claims.

    Requests an attendee token for ``room-alpha`` and ``room-beta`` and
    reports whether each decoded payload's ``room`` claim matches the room
    that was requested, and that the two claims differ. If either request
    does not return HTTP 200 (or fails at the transport level), a single
    FAIL line is printed instead of the claim checks.
    """
    print("\n--- Test: Room Isolation ---")
    rooms = ["room-alpha", "room-beta"]
    tokens = []  # (requested room, decoded payload) for each 200 response
    for room in rooms:
        body = {
            "room": room,
            "name": TEST_NAME,
            "email": TEST_EMAIL,
            "is_moderator": False,
        }
        try:
            # requests applies no timeout by default; bound the wait so an
            # unresponsive server cannot hang the suite.
            resp = requests.post(JITSI_ENDPOINT, json=body, timeout=10)
        except requests.RequestException as exc:
            # Report transport failures instead of aborting the run.
            print_result(f"request for {room}", False, f"request failed: {exc}")
            continue
        if resp.status_code == 200:
            tokens.append((room, decode_jwt_payload(resp.json().get("token", ""))))
    if len(tokens) == 2:
        match_0 = tokens[0][1].get("room") == tokens[0][0]
        match_1 = tokens[1][1].get("room") == tokens[1][0]
        print_result("room-alpha scoped", match_0, f"got: {tokens[0][1].get('room')!r}")
        print_result("room-beta scoped", match_1, f"got: {tokens[1][1].get('room')!r}")
        print_result("Rooms differ", tokens[0][1].get("room") != tokens[1][1].get("room"))
    else:
        print_result("Both requests succeeded", False, "could not get both tokens")
def test_invalid_email():
    """Verify that a malformed email is rejected with 422.

    POSTs an otherwise valid payload whose ``email`` is ``"not-an-email"``
    and reports whether the endpoint answered with HTTP 422.
    """
    print("\n--- Test: Input Validation (bad email) ---")
    payload = {
        "room": TEST_ROOM,
        "name": TEST_NAME,
        "email": "not-an-email",
        "is_moderator": False,
    }
    try:
        # requests applies no timeout by default; bound the wait so an
        # unresponsive server cannot hang the suite.
        resp = requests.post(JITSI_ENDPOINT, json=payload, timeout=10)
    except requests.RequestException as exc:
        # Report transport failures as a FAIL instead of aborting the run.
        print_result("422 on bad email", False, f"request failed: {exc}")
        return
    print_result("422 on bad email", resp.status_code == 422, f"status={resp.status_code}")
def test_token_expiry():
    """Verify the exp claim is approximately 1 hour from now.

    Requests an attendee token, decodes its payload (no signature
    verification) and reports whether ``exp - now`` falls in the window
    ``(3550, 3600]`` seconds, where ``now`` is the local clock read after
    the response arrives. A missing or non-numeric ``exp`` is reported as a
    FAIL.
    """
    print("\n--- Test: Token Expiry (exp claim) ---")
    payload = {
        "room": TEST_ROOM,
        "name": TEST_NAME,
        "email": TEST_EMAIL,
        "is_moderator": False,
    }
    try:
        # requests applies no timeout by default; bound the wait so an
        # unresponsive server cannot hang the suite.
        resp = requests.post(JITSI_ENDPOINT, json=payload, timeout=10)
    except requests.RequestException as exc:
        # Report transport failures as a FAIL instead of aborting the run.
        print_result("HTTP 200 (skipping exp check)", False, f"request failed: {exc}")
        return
    if resp.status_code != 200:
        print_result("HTTP 200 (skipping exp check)", False)
        return
    decoded = decode_jwt_payload(resp.json().get("token", ""))
    exp = decoded.get("exp")
    if isinstance(exp, bool) or not isinstance(exp, (int, float)):
        # Missing or malformed claim: fail explicitly instead of raising
        # TypeError on the subtraction below.
        print_result("exp ≈ now + 3600s", False, f"exp missing or non-numeric: {exp!r}")
        return
    now = int(time.time())
    ttl = exp - now
    # Should be ~3600s; accept up to 50s of elapsed request/test time.
    # NOTE(review): the upper bound assumes the local clock is not behind the
    # server's clock — confirm if this check turns out to be flaky.
    expected_ttl = 3600
    tolerance = 50
    ok = expected_ttl - tolerance < ttl <= expected_ttl
    print_result("exp ≈ now + 3600s", ok, f"ttl={ttl}s")
if __name__ == "__main__":
    # Script entry point: print a banner, run every test case in order, then
    # print how long the whole suite took.
    started_at = time.time()
    rule = "=" * 55
    print(rule)
    print(" Jitsi JWT Token — E2E Test Suite")
    print(f" Endpoint: {JITSI_ENDPOINT}")
    print(rule)
    # Order matters only for the readability of the printed report.
    for run_case in (
        test_moderator_token,
        test_attendee_token,
        test_room_isolation,
        test_invalid_email,
        test_token_expiry,
    ):
        run_case()
    duration = time.time() - started_at
    print(f"\n{rule}")
    print(f" Suite completed in {duration:.2f}s")
    print(rule)