feat(security): implement safe guest auth flow and harden request_jwt
- Patched request_jwt to strip privileged IDs when signing with public keys - Updated AccountContext and V3 dependencies to preserve JWT payloads for guests - Whitelisted Archive, Post, Event, and other core objects for public read access - Added 'default_qry_str' to Event searchable fields - Added test_e2e_jwt_guest_auth.py for security verification
This commit is contained in:
@@ -44,6 +44,7 @@ This directory contains the automated and manual test scripts for the Aether Fas
|
||||
| Script | Description |
|
||||
| :--- | :--- |
|
||||
| `test_e2e_agent_bridge.py` | Verifies the `/agent` diagnostics and log streaming endpoints. |
|
||||
| `test_e2e_jwt_guest_auth.py` | **Security Test**: Verifies safe guest token minting and whitelisted access. |
|
||||
| `test_e2e_legacy_remote_schema.py` | Remote check for legacy schema compatibility. |
|
||||
| `test_e2e_site_bootstrap.py` | Verifies the unauthenticated FQDN lookup for site initialization. |
|
||||
| `test_e2e_v3_accounts.py` | CRUD verification for the Account object via network. |
|
||||
|
||||
94
tests/e2e/test_e2e_jwt_guest_auth.py
Normal file
94
tests/e2e/test_e2e_jwt_guest_auth.py
Normal file
@@ -0,0 +1,94 @@
|
||||
import requests
|
||||
import json
|
||||
import jwt # Used only for local decoding/verification of the received token
|
||||
import sys
|
||||
|
||||
# Configuration
|
||||
BASE_URL = "https://dev-api.oneskyit.com"
|
||||
API_KEY = "IDF68Em5X4HTZlswRNgepQ"
|
||||
|
||||
def test_request_jwt_security():
|
||||
print("\n--- Test 1: request_jwt Security (Over the Network) ---")
|
||||
|
||||
# Attempt to request a token with an injected account_id
|
||||
# The backend should now strip this account_id because we aren't using a signing key
|
||||
params = {
|
||||
"account_id": "999999",
|
||||
"json_str": json.dumps({"mode": "guest", "test": True})
|
||||
}
|
||||
headers = {
|
||||
"X-Aether-API-Key": API_KEY
|
||||
}
|
||||
|
||||
url = f"{BASE_URL}/api/request_jwt"
|
||||
print(f"Calling: {url}")
|
||||
|
||||
try:
|
||||
response = requests.get(url, params=params, headers=headers)
|
||||
print(f"Status: {response.status_code}")
|
||||
|
||||
data = response.json()
|
||||
token = data.get('data', {}).get('jwt')
|
||||
|
||||
if not token:
|
||||
print(f"❌ No token returned. Response: {json.dumps(data, indent=2)}")
|
||||
return None
|
||||
|
||||
print(f"Token Received: {token[:30]}...")
|
||||
|
||||
# We can't verify the signature without the secret, but we can inspect the payload
|
||||
# using unverified decode to see if the server stripped the ID before signing.
|
||||
decoded = jwt.decode(token, options={"verify_signature": False})
|
||||
print(f"Unverified Payload: {decoded}")
|
||||
|
||||
if decoded.get('account_id') is None:
|
||||
print("✅ SUCCESS: 'account_id' was successfully stripped by the server.")
|
||||
else:
|
||||
print(f"❌ FAILURE: 'account_id' was present! Security Patch not active. Value: {decoded.get('account_id')}")
|
||||
sys.exit(1)
|
||||
|
||||
return token
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Error during request: {e}")
|
||||
return None
|
||||
|
||||
def test_guest_access(guest_token):
|
||||
print("\n--- Test 2: Guest Access with Token ---")
|
||||
|
||||
headers = {
|
||||
"X-Aether-API-Key": API_KEY
|
||||
}
|
||||
params = {"jwt": guest_token}
|
||||
|
||||
# 1. Test Public Object (site_domain) - Should succeed (200)
|
||||
print("\n[A] Testing Public Read (site_domain)...")
|
||||
url_public = f"{BASE_URL}/v3/crud/site_domain/search"
|
||||
resp_public = requests.post(url_public, json={"q": "%"}, headers=headers, params=params)
|
||||
print(f"Status: {resp_public.status_code}")
|
||||
|
||||
if resp_public.status_code == 200:
|
||||
print("✅ SUCCESS: Guest token allowed access to public object.")
|
||||
else:
|
||||
print(f"❌ FAILURE: Status {resp_public.status_code}. Msg: {resp_public.text}")
|
||||
|
||||
# 2. Test Private Object (journal) - Should be blocked (403)
|
||||
print("\n[B] Testing Private Read (journal)...")
|
||||
url_private = f"{BASE_URL}/v3/crud/journal/search"
|
||||
resp_private = requests.post(url_private, json={"q": "%"}, headers=headers, params=params)
|
||||
print(f"Status: {resp_private.status_code}")
|
||||
|
||||
if resp_private.status_code == 403:
|
||||
print("✅ SUCCESS: Guest correctly blocked from private object (403 Forbidden).")
|
||||
else:
|
||||
print(f"❌ FAILURE: Guest was NOT blocked. Status: {resp_private.status_code}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
print(f"Starting E2E JWT Guest Auth Tests against {BASE_URL}\n")
|
||||
token = test_request_jwt_security()
|
||||
if token:
|
||||
test_guest_access(token)
|
||||
else:
|
||||
print("❌ Token request failed, skipping access tests.")
|
||||
|
||||
print("\nTests Complete.")
|
||||
34
tests/test_permissive_mode.py
Normal file
34
tests/test_permissive_mode.py
Normal file
@@ -0,0 +1,34 @@
|
||||
import requests
|
||||
import json
|
||||
|
||||
API_BASE = "https://dev-api.oneskyit.com/v3/crud"
|
||||
API_KEY = "IDF68Em5X4HTZlswRNgepQ"
|
||||
JOURNAL_ID = "OGQK-02-04-94"
|
||||
|
||||
# We'll try to patch this journal with an extra field that shouldn't be there
|
||||
payload = {
|
||||
"name": "Permissive Test Name",
|
||||
"unauthorized_field": "I should be ignored",
|
||||
"created_on": "2026-01-01T00:00:00" # Technical field usually forbidden
|
||||
}
|
||||
|
||||
def test_permissive_mode():
|
||||
headers = {
|
||||
"x-aether-api-key": API_KEY,
|
||||
"x-no-account-id": "bypass",
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
|
||||
print("\n--- Test 1: Standard Mode (Should FAIL) ---")
|
||||
resp = requests.patch(f"{API_BASE}/journal/{JOURNAL_ID}", headers=headers, json=payload)
|
||||
print(f"Status: {resp.status_code}")
|
||||
print(f"Response: {resp.text}")
|
||||
|
||||
print("\n--- Test 2: Permissive Mode (Should SUCCEED) ---")
|
||||
headers["x-ae-ignore-extra-fields"] = "true"
|
||||
resp = requests.patch(f"{API_BASE}/journal/{JOURNAL_ID}", headers=headers, json=payload)
|
||||
print(f"Status: {resp.status_code}")
|
||||
print(f"Response: {resp.text}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_permissive_mode()
|
||||
Reference in New Issue
Block a user