"""V3 security hardening verification script.

Runs a small set of live HTTP checks against ``API_ROOT`` to confirm that:

* journal search/GET do not expose account-owned records when no account
  context is supplied,
* public-read objects (``site_domain``) are still reachable for bootstrapping,
* supplying ``x-account-id`` restores access, and
* ``site_domain`` lookups honour the ``access_key`` filter.

Each check prints a PASS/FAIL line. When run as a script the process exits
with status 1 if any check failed or raised, and 0 otherwise.
"""
import json
import os
import sys

import requests

# --- Configuration ---
API_ROOT = "https://dev-api.oneskyit.com"
# Using the key provided in your examples
# NOTE(review): credentials are hard-coded in source; confirm this key is a
# disposable dev key and not something that must stay secret.
API_KEY = "dFP6J9DVj9hUgIMn-fNIqg"

# A known private journal ID from account 1
PRIVATE_JOURNAL_ID = "SWFK-48-89-90"

# A known public object type/ID
PUBLIC_FQDN = "dev-app.oneskyit.com"

# A known-valid account random ID, used to test restoration of access
VALID_ACCOUNT_ID_RAND = "_XY7DXtc9MY"

# Seconds to wait for the server before giving up. Without an explicit
# timeout, requests will wait indefinitely on an unresponsive host.
REQUEST_TIMEOUT = 30

# Labels of every check reported as failed; drives the process exit status.
_FAILED_LABELS = []


def print_result(label, success, message=""):
    """Print one PASS/FAIL line and remember failures for the exit status.

    Args:
        label: Short name of the check.
        success: Truthy when the check passed.
        message: Optional detail appended after the label.
    """
    status = "✅ PASS" if success else "❌ FAIL"
    if not success:
        _FAILED_LABELS.append(label)
    print(f"[{status}] {label} {message}")


def _response_records(resp):
    """Return the ``data`` list from a JSON response body.

    Returns ``None`` when the body is not JSON, is not a JSON object, or its
    ``data`` value is not a list, so callers can report a failure instead of
    crashing or mistaking an unusable body for "zero records".
    """
    try:
        body = resp.json()
    except ValueError:  # requests' JSON decode errors subclass ValueError
        return None
    if not isinstance(body, dict):
        return None
    records = body.get("data", [])
    return records if isinstance(records, list) else None


def test_hardened_search_leak():
    """Verify that search NO LONGER leaks private data when account_id is missing."""
    print("\n--- Test 1: Global Leak Prevention (Search) ---")
    label = "Leak Blocked (Journal Search)"
    url = f"{API_ROOT}/v3/crud/journal/search"
    headers = {"x-aether-api-key": API_KEY}  # NO account header, NO JWT
    payload = {"and": []}

    resp = requests.post(url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT)
    if resp.status_code != 200:
        print_result(label, False, f"- Unexpected Status: {resp.status_code}")
        return

    records = _response_records(resp)
    if records is None:
        # An unreadable body must not be counted as "no records leaked".
        print_result(label, False, "- Response body had no usable 'data' list")
        return

    # Should be 0 because all journals in DB have an account_id,
    # and we are now strictly filtering for account_id IS NULL.
    print_result(label, not records, f"- Found {len(records)} records (Expected 0)")


def test_strict_id_block():
    """Verify that GET ID correctly blocks private records when account_id is missing."""
    print("\n--- Test 2: Strict Access Control (GET ID) ---")
    url = f"{API_ROOT}/v3/crud/journal/{PRIVATE_JOURNAL_ID}"
    headers = {"x-aether-api-key": API_KEY}  # NO account header, NO JWT

    resp = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
    print_result(
        "Access Denied (Journal GET ID)",
        resp.status_code == 403,
        f"- Status: {resp.status_code} (Expected 403)",
    )


def test_bootstrap_exception():
    """Verify that public_read objects still work for bootstrapping."""
    print("\n--- Test 3: Bootstrap Exception (Public Read) ---")
    url = f"{API_ROOT}/v3/crud/site_domain/search"
    headers = {"x-aether-api-key": API_KEY}
    payload = {"and": [{"field": "fqdn", "op": "eq", "value": PUBLIC_FQDN}]}

    resp = requests.post(url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT)
    # Only parse the body on 200; a missing/unusable body counts as failure.
    records = _response_records(resp) if resp.status_code == 200 else None
    print_result(
        "Bootstrap Allowed (Site Domain)",
        bool(records),
        f"- Status: {resp.status_code}",
    )


def test_restored_access():
    """Verify that providing the correct x-account-id restores access."""
    print("\n--- Test 4: Restored Access (With Context) ---")
    url = f"{API_ROOT}/v3/crud/journal/{PRIVATE_JOURNAL_ID}"
    headers = {
        "x-aether-api-key": API_KEY,
        "x-account-id": VALID_ACCOUNT_ID_RAND,
    }

    resp = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
    print_result(
        "Access Restored (Journal with Header)",
        resp.status_code == 200,
        f"- Status: {resp.status_code}",
    )


def test_site_domain_access_key():
    """
    Verify site_domain lookup respects access_key.

    The frontend reads the 'key' query param from the browser URL and forwards it
    as 'access_key' in the POST body. No key means a public domain is expected.

    Valid (should return a result):
      https://dev-demo.oneskyit.com                             — public, no key needed
      http://idaa.localhost:5173/?key=restricted                — correct key
      https://dev-idaa.oneskyit.com/?key=restricted-access      — correct key
      https://sk-idaa.oneskyit.com/?key=8VTOJ0X5hvT6JdiTJsGEzQ  — correct key

    Invalid (should return empty):
      http://idaa.localhost:5173/                               — key required, none given
      http://idaa.localhost:5173/?key=bad-key-example           — wrong key
      https://dev-idaa.oneskyit.com/                            — key required, none given
      https://dev-idaa.oneskyit.com/?key=                       — empty key treated as none
      https://dev-idaa.oneskyit.com/?key=any-wrong-key          — wrong key
      https://sk-idaa.oneskyit.com/                             — key required, none given
      https://sk-idaa.oneskyit.com/?key=another-bad-key-example — wrong key
    """
    print("\n--- Test 5: Site Domain Access Key Behavior ---")
    url = f"{API_ROOT}/v3/crud/site_domain/search"
    headers = {"x-aether-api-key": API_KEY}

    cases = [
        # (fqdn, key, should_pass, label)
        # --- valid ---
        ("dev-demo.oneskyit.com", None, True, "public domain, no key"),
        ("idaa.localhost:5173", "restricted", True, "correct key"),
        ("dev-idaa.oneskyit.com", "restricted-access", True, "correct key"),
        ("sk-idaa.oneskyit.com", "8VTOJ0X5hvT6JdiTJsGEzQ", True, "correct key"),
        # --- invalid ---
        ("idaa.localhost:5173", None, False, "key required, none given"),
        ("idaa.localhost:5173", "bad-key-example", False, "wrong key"),
        ("dev-idaa.oneskyit.com", None, False, "key required, none given"),
        ("dev-idaa.oneskyit.com", "", False, "empty key treated as none"),
        ("dev-idaa.oneskyit.com", "any-wrong-key", False, "wrong key"),
        ("sk-idaa.oneskyit.com", None, False, "key required, none given"),
        ("sk-idaa.oneskyit.com", "another-bad-key-example", False, "wrong key"),
    ]

    for fqdn, key, should_pass, label in cases:
        payload = {"and": [{"field": "fqdn", "op": "eq", "value": fqdn}]}
        # Omit access_key entirely when None (no key in URL); send it when present (even if empty)
        if key is not None:
            payload["and"].append({"field": "access_key", "op": "eq", "value": key})

        tag = "VALID  " if should_pass else "INVALID"
        try:
            resp = requests.post(
                url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT
            )
        except requests.RequestException as e:
            # A transport failure on one case must not stop the remaining cases.
            print_result(f"[{tag}] {fqdn} key={key!r}", False, f"- Exception: {e}")
            continue

        records = _response_records(resp) if resp.status_code == 200 else None
        if records is None:
            # Non-200 status or unusable body: always a failure for this case.
            success = False
            detail = f"- Count: 0 (Status: {resp.status_code})"
        else:
            # A case passes when "got at least one record" matches expectation.
            success = bool(records) == should_pass
            detail = f"- Count: {len(records)}"
        print_result(f"[{tag}] {fqdn} key={key!r:30} ({label})", success, detail)


def main():
    """Run every verification test and return a process exit code.

    Each test runs in its own ``try`` so that an error in one (for example a
    connection failure) does not prevent the others from running.

    Returns:
        0 when every check passed, 1 when any check failed or raised.
    """
    print("Starting V3 Security Hardening Verification")
    print(f"Target: {API_ROOT}")

    tests = (
        test_hardened_search_leak,
        test_strict_id_block,
        test_bootstrap_exception,
        test_restored_access,
        test_site_domain_access_key,
    )
    errored = 0
    for test in tests:
        try:
            test()
        except Exception as e:  # top-level boundary: report and keep going
            errored += 1
            print(f"\n❌ ERROR during test execution: {e}")

    print("\nVerification completed.")
    return 1 if (errored or _FAILED_LABELS) else 0


if __name__ == "__main__":
    sys.exit(main())