"""Smoke-test the V3 CRUD API's account-scoping (security hardening) rules.

Runs four HTTP checks against ``API_ROOT`` and prints a PASS/FAIL line for
each one:

1. A journal search without any account context returns no records.
2. A GET of a known private journal without account context returns 403.
3. A ``site_domain`` search (public read) still works without account context.
4. A GET of the private journal with ``x-account-id`` set returns 200.

The process exits with status 1 if any check fails or raises, 0 otherwise.
"""
import json
import os
import sys

import requests

# --- Configuration ---
API_ROOT = "https://dev-api.oneskyit.com"
# Using the key provided in your examples
# NOTE(review): this is a credential committed to source control; consider
# loading it from the environment instead.
API_KEY = "dFP6J9DVj9hUgIMn-fNIqg"
# A known private journal ID from account 1
PRIVATE_JOURNAL_ID = "SWFK-48-89-90"
# A known public object type/ID
PUBLIC_FQDN = "dev-app.oneskyit.com"
# A known valid account ID random for testing restoration of access
VALID_ACCOUNT_ID_RAND = "_XY7DXtc9MY"
# Upper bound (seconds) for each HTTP call. Without an explicit timeout,
# requests waits forever on an unresponsive server.
REQUEST_TIMEOUT_SECONDS = 30


def print_result(label, success, message=""):
    """Print one ``[status] label message`` line for a finished check.

    Args:
        label: Short name of the check.
        success: Whether the check passed.
        message: Optional detail appended after the label.
    """
    status = "✅ PASS" if success else "❌ FAIL"
    print(f"[{status}] {label} {message}")


def _records(resp):
    """Return the ``data`` entry of a JSON response, or ``[]`` if missing/null."""
    return resp.json().get("data") or []


def test_hardened_search_leak() -> bool:
    """Verify that search NO LONGER leaks private data when account_id is missing.

    Returns:
        True when the search answers 200 with zero records, False otherwise.
    """
    print("\n--- Test 1: Global Leak Prevention (Search) ---")
    url = f"{API_ROOT}/v3/crud/journal/search"
    headers = {"x-aether-api-key": API_KEY}  # NO account header, NO JWT
    payload = {"and": []}

    resp = requests.post(
        url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT_SECONDS
    )

    if resp.status_code != 200:
        print_result(
            "Leak Blocked (Journal Search)",
            False,
            f"- Unexpected Status: {resp.status_code}",
        )
        return False

    data = _records(resp)
    # Should be 0 because all journals in DB have an account_id,
    # and we are now strictly filtering for account_id IS NULL.
    success = not data
    print_result(
        "Leak Blocked (Journal Search)",
        success,
        f"- Found {len(data)} records (Expected 0)",
    )
    return success


def test_strict_id_block() -> bool:
    """Verify that GET ID correctly blocks private records when account_id is missing.

    Returns:
        True when the API answers 403, False otherwise.
    """
    print("\n--- Test 2: Strict Access Control (GET ID) ---")
    url = f"{API_ROOT}/v3/crud/journal/{PRIVATE_JOURNAL_ID}"
    headers = {"x-aether-api-key": API_KEY}  # NO account header, NO JWT

    resp = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS)

    success = resp.status_code == 403
    print_result(
        "Access Denied (Journal GET ID)",
        success,
        f"- Status: {resp.status_code} (Expected 403)",
    )
    return success


def test_bootstrap_exception() -> bool:
    """Verify that public_read objects still work for bootstrapping.

    Returns:
        True when the search answers 200 with at least one record.
    """
    print("\n--- Test 3: Bootstrap Exception (Public Read) ---")
    url = f"{API_ROOT}/v3/crud/site_domain/search"
    headers = {"x-aether-api-key": API_KEY}
    payload = {"and": [{"field": "fqdn", "op": "eq", "value": PUBLIC_FQDN}]}

    resp = requests.post(
        url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT_SECONDS
    )

    # Only parse the body on 200; an error response need not be JSON.
    success = resp.status_code == 200 and len(_records(resp)) > 0
    print_result(
        "Bootstrap Allowed (Site Domain)",
        success,
        f"- Status: {resp.status_code}",
    )
    return success


def test_restored_access() -> bool:
    """Verify that providing the correct x-account-id restores access.

    Returns:
        True when the API answers 200, False otherwise.
    """
    print("\n--- Test 4: Restored Access (With Context) ---")
    url = f"{API_ROOT}/v3/crud/journal/{PRIVATE_JOURNAL_ID}"
    headers = {
        "x-aether-api-key": API_KEY,
        "x-account-id": VALID_ACCOUNT_ID_RAND,
    }

    resp = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS)

    success = resp.status_code == 200
    print_result(
        "Access Restored (Journal with Header)",
        success,
        f"- Status: {resp.status_code}",
    )
    return success


def main() -> int:
    """Run every check in order and return the process exit code.

    An exception raised by a check (network error, non-JSON body, ...) is
    reported and aborts the remaining checks.

    Returns:
        0 if all checks ran and passed, 1 otherwise.
    """
    print("Starting V3 Security Hardening Verification")
    print(f"Target: {API_ROOT}")

    checks = (
        test_hardened_search_leak,
        test_strict_id_block,
        test_bootstrap_exception,
        test_restored_access,
    )

    all_passed = True
    try:
        for check in checks:
            # Call the check first so a failure never short-circuits it.
            passed = check()
            all_passed = passed and all_passed
    except Exception as e:  # top-level boundary: report, then fail the run
        print(f"\n❌ ERROR during test execution: {e}")
        all_passed = False

    print("\nVerification completed.")
    return 0 if all_passed else 1


if __name__ == "__main__":
    sys.exit(main())