- Add tests/e2e/test_e2e_jitsi_token.py: verifies moderator/attendee claims, room isolation, input validation, and exp claim correctness - Update Jitsi section comment in api.py with actionable secret rotation TODO (must update JWT_APP_SECRET here AND in dgr_zone_jitsi .env, then restart prosody + jicofo)
186 lines
5.8 KiB
Python
186 lines
5.8 KiB
Python
"""
|
|
Jitsi JWT Token E2E Test Suite
|
|
|
|
Tests the /api/jitsi_token endpoint to verify:
|
|
- Moderator tokens contain moderator=true in the JWT payload
|
|
- Attendee tokens contain moderator=false in the JWT payload
|
|
- Room claim is correctly scoped per request
|
|
- Basic validation rejects malformed input
|
|
|
|
Run from project root:
|
|
./environment/bin/python3 tests/e2e/test_e2e_jitsi_token.py
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import base64
|
|
import time
|
|
import requests
|
|
|
|
sys.path.append(os.getcwd())
|
|
|
|
# --- Configuration ---
# The API root defaults to the dev environment. Set the JITSI_API_ROOT
# environment variable to point the suite at another deployment without
# editing this file; a trailing slash is stripped so the endpoint URL never
# contains a double slash.
API_ROOT = (os.environ.get("JITSI_API_ROOT") or "https://dev-api.oneskyit.com").rstrip("/")
JITSI_ENDPOINT = f"{API_ROOT}/api/jitsi_token"

# Default fixture values used in the token requests.
TEST_ROOM = "idaa-test-room-001"
TEST_NAME = "E2E Test User"
TEST_EMAIL = "e2e-test@oneskyit.com"
|
|
|
|
|
|
def print_result(label, success, message=""):
    """Print one formatted PASS/FAIL line for a single check.

    Args:
        label: Short description of the check being reported.
        success: Truthy when the check passed, falsy otherwise.
        message: Optional detail appended after an em dash when non-empty.
    """
    if success:
        status = "✅ PASS"
    else:
        status = "❌ FAIL"

    line = f" [{status}] {label}"
    if message:
        line += f" — {message}"
    print(line)
|
|
|
|
|
|
def decode_jwt_payload(token: str) -> dict:
    """Decode a JWT payload without signature verification (for inspection).

    Args:
        token: Compact-serialized JWT of the form "header.payload.signature".

    Returns:
        The payload claims as a dict. An empty dict is returned when the
        token is not a string, does not have exactly three dot-separated
        segments, or its payload segment does not decode to a JSON object.
    """
    if not isinstance(token, str):
        return {}

    segments = token.split(".")
    if len(segments) != 3:
        return {}

    body = segments[1]
    # JWT segments are base64url with the padding stripped; restore only the
    # 0-3 "=" characters that are actually missing.
    body += "=" * (-len(body) % 4)

    try:
        claims = json.loads(base64.urlsafe_b64decode(body))
    except ValueError:
        # binascii.Error, UnicodeDecodeError and json.JSONDecodeError are all
        # ValueError subclasses.
        return {}

    # Callers chain .get() on the result, so never hand back a list/str/number.
    return claims if isinstance(claims, dict) else {}
|
|
|
|
|
|
def test_moderator_token():
    """Request a moderator JWT and verify the claim is set correctly.

    Posts a token request with is_moderator=True, then checks that the
    decoded payload carries context.user.moderator == True and a room claim
    equal to TEST_ROOM.

    Returns:
        The raw token string when one was returned (even if a claim check
        failed), or None when the request did not return HTTP 200 or the
        response contained no token.
    """
    print("\n--- Test: Moderator Token ---")
    payload = {
        "room": TEST_ROOM,
        "name": TEST_NAME,
        "email": TEST_EMAIL,
        "is_moderator": True,
    }
    # requests has no default timeout; bound the call so an unresponsive
    # endpoint raises instead of hanging the suite forever.
    resp = requests.post(JITSI_ENDPOINT, json=payload, timeout=10)
    print_result("HTTP 200", resp.status_code == 200, f"status={resp.status_code}")

    if resp.status_code != 200:
        print(f" Response: {resp.text}")
        return None

    token = resp.json().get("token")
    print_result("Token returned", bool(token))
    if not token:
        return None

    decoded = decode_jwt_payload(token)
    print(f" Decoded payload: {json.dumps(decoded, indent=6)}")

    moderator_claim = decoded.get("context", {}).get("user", {}).get("moderator")
    room_claim = decoded.get("room")

    # Identity check: the claim must be the boolean True, not merely truthy.
    print_result("moderator == True", moderator_claim is True, f"got: {moderator_claim!r}")
    print_result("room scoped correctly", room_claim == TEST_ROOM, f"got: {room_claim!r}")

    return token
|
|
|
|
|
|
def test_attendee_token():
    """Request a non-moderator JWT and verify the claim is False.

    Posts a token request with is_moderator=False, then checks that the
    decoded payload carries context.user.moderator == False.

    Returns:
        The raw token string when one was returned (even if the claim check
        failed), or None when the request did not return HTTP 200 or the
        response contained no token.
    """
    print("\n--- Test: Attendee Token (is_moderator=False) ---")
    payload = {
        "room": TEST_ROOM,
        "name": TEST_NAME,
        "email": TEST_EMAIL,
        "is_moderator": False,
    }
    # requests has no default timeout; bound the call so an unresponsive
    # endpoint raises instead of hanging the suite forever.
    resp = requests.post(JITSI_ENDPOINT, json=payload, timeout=10)
    print_result("HTTP 200", resp.status_code == 200, f"status={resp.status_code}")

    if resp.status_code != 200:
        print(f" Response: {resp.text}")
        return None

    token = resp.json().get("token")
    print_result("Token returned", bool(token))
    if not token:
        return None

    decoded = decode_jwt_payload(token)
    print(f" Decoded payload: {json.dumps(decoded, indent=6)}")

    moderator_claim = decoded.get("context", {}).get("user", {}).get("moderator")
    # Identity check: a missing claim (None) must not count as False.
    print_result("moderator == False", moderator_claim is False, f"got: {moderator_claim!r}")

    return token
|
|
|
|
|
|
def test_room_isolation():
    """Verify two requests for different rooms produce different room claims.

    Requests an attendee token for each of two room names and checks that
    each decoded payload's room claim equals the room that was requested and
    that the two claims differ. If either request does not return HTTP 200,
    a single failing result is reported instead.
    """
    print("\n--- Test: Room Isolation ---")
    rooms = ["room-alpha", "room-beta"]
    tokens = []
    for room in rooms:
        # requests has no default timeout; bound the call so an unresponsive
        # endpoint raises instead of hanging the suite forever.
        resp = requests.post(
            JITSI_ENDPOINT,
            json={"room": room, "name": TEST_NAME, "email": TEST_EMAIL, "is_moderator": False},
            timeout=10,
        )
        if resp.status_code == 200:
            # Keep the requested room next to the decoded claims for comparison.
            tokens.append((room, decode_jwt_payload(resp.json().get("token", ""))))

    if len(tokens) == 2:
        match_0 = tokens[0][1].get("room") == tokens[0][0]
        match_1 = tokens[1][1].get("room") == tokens[1][0]
        print_result("room-alpha scoped", match_0, f"got: {tokens[0][1].get('room')!r}")
        print_result("room-beta scoped", match_1, f"got: {tokens[1][1].get('room')!r}")
        print_result("Rooms differ", tokens[0][1].get("room") != tokens[1][1].get("room"))
    else:
        print_result("Both requests succeeded", False, "could not get both tokens")
|
|
|
|
|
|
def test_invalid_email():
    """Verify that a malformed email is rejected with 422.

    Posts a token request whose email field is not a valid address and
    reports whether the endpoint answered with HTTP 422.
    """
    print("\n--- Test: Input Validation (bad email) ---")
    payload = {
        "room": TEST_ROOM,
        "name": TEST_NAME,
        "email": "not-an-email",
        "is_moderator": False,
    }
    # requests has no default timeout; bound the call so an unresponsive
    # endpoint raises instead of hanging the suite forever.
    resp = requests.post(JITSI_ENDPOINT, json=payload, timeout=10)
    print_result("422 on bad email", resp.status_code == 422, f"status={resp.status_code}")
|
|
|
|
|
|
def test_token_expiry():
    """Verify the exp claim is approximately 1 hour from now.

    Requests an attendee token, decodes it, and checks that the remaining
    lifetime (exp minus the local clock) falls in the range (3550, 3600]
    seconds. A missing exp claim is treated as a lifetime of 0 and fails
    the check. The check is skipped with a failing result when the request
    does not return HTTP 200.
    """
    print("\n--- Test: Token Expiry (exp claim) ---")
    payload = {
        "room": TEST_ROOM, "name": TEST_NAME, "email": TEST_EMAIL, "is_moderator": False
    }
    # requests has no default timeout; bound the call so an unresponsive
    # endpoint raises instead of hanging the suite forever.
    resp = requests.post(JITSI_ENDPOINT, json=payload, timeout=10)
    if resp.status_code != 200:
        print_result("HTTP 200 (skipping exp check)", False)
        return

    decoded = decode_jwt_payload(resp.json().get("token", ""))
    exp = decoded.get("exp")
    now = int(time.time())
    ttl = exp - now if exp else 0
    # Should be ~3600s (allow a 50s window below that for test runtime)
    ok = 3550 < ttl <= 3600
    print_result("exp ≈ now + 3600s", ok, f"ttl={ttl}s")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
suite_start = time.time()
|
|
print("=" * 55)
|
|
print(" Jitsi JWT Token — E2E Test Suite")
|
|
print(f" Endpoint: {JITSI_ENDPOINT}")
|
|
print("=" * 55)
|
|
|
|
test_moderator_token()
|
|
test_attendee_token()
|
|
test_room_isolation()
|
|
test_invalid_email()
|
|
test_token_expiry()
|
|
|
|
elapsed = time.time() - suite_start
|
|
print(f"\n{'=' * 55}")
|
|
print(f" Suite completed in {elapsed:.2f}s")
|
|
print("=" * 55)
|