import requests import json import time # --- Configuration --- API_ROOT = "https://dev-api.oneskyit.com" API_KEY = "dFP6J9DVj9hUgIMn-fNIqg" # Test Matrix: (obj_type, id_random, is_public) TEST_OBJECTS = [ ("journal", "SWFK-48-89-90", False), ("journal_entry", "w1p-OE-Hm-zz", False), ("event", "aJ0KmgvU62Q", True), ("post", "-qTmbMlEjAY", True), ] # Primary Account Context for Auth tests (Account 1) ACCOUNT_A_RAND = "_XY7DXtc9MY" # Secondary Account Context (Account 2 - assume this exists and doesn't own Account 1 records) ACCOUNT_B_RAND = "nqOzejLCDXM" def print_result(label, success, message=""): status = "āœ… PASS" if success else "āŒ FAIL" print(f"[{status}] {label} {message}") def check_id_vision(data, path=""): """Recursively check that no integers are leaked in ID-named fields.""" leaks = [] if isinstance(data, dict): for k, v in data.items(): current_path = f"{path}.{k}" if path else k # Check fields that look like IDs if k == "id" or k.endswith("_id") or k.endswith("_id_random"): if isinstance(v, int): leaks.append(f"{current_path}: {v} (Type: int)") # Recurse leaks.extend(check_id_vision(v, current_path)) elif isinstance(data, list): for i, item in enumerate(data): leaks.extend(check_id_vision(item, f"{path}[{i}]")) return leaks def run_security_audit(): print("====================================================") print(" V3 COMPREHENSIVE SECURITY & VISION AUDIT ") print(f" Target: {API_ROOT}") print("====================================================") for obj_type, obj_id, is_public in TEST_OBJECTS: print(f"\n--- Testing Object Type: {obj_type} ---") # 1. READ LEAK CHECK (No context) url = f"{API_ROOT}/v3/crud/{obj_type}/{obj_id}" headers_unauth = {"x-aether-api-key": API_KEY} resp_unauth = requests.get(url, headers=headers_unauth) if is_public: print_result(f"Unauth GET {obj_type}", resp_unauth.status_code == 200) else: print_result(f"Unauth GET {obj_type}", resp_unauth.status_code == 403, f"- Blocked: {resp_unauth.json().get('meta', {}).get('status_message')}") # 2. VISION COMPLIANCE CHECK (With context) headers_auth = {"x-aether-api-key": API_KEY, "x-account-id": ACCOUNT_A_RAND} resp_auth = requests.get(url, headers=headers_auth) if resp_auth.status_code == 200: data = resp_auth.json().get('data', {}) leaks = check_id_vision(data) print_result(f"Vision Compliance {obj_type}", len(leaks) == 0, f"- Leaks: {leaks if leaks else 'None'}") else: print_result(f"Auth GET {obj_type}", False, f"- Failed to get record for Vision check: {resp_auth.status_code}") # 3. WRITE ISOLATION CHECK (PATCH someone else's record) if not is_public: headers_wrong_account = {"x-aether-api-key": API_KEY, "x-account-id": ACCOUNT_B_RAND} # Attempt to rename a journal that doesn't belong to Account B resp_patch = requests.patch(url, headers=headers_wrong_account, json={"notes": "Hacked!"}) print_result(f"Cross-Account Write Block {obj_type}", resp_patch.status_code == 403, f"- Status: {resp_patch.status_code}") # 4. SEARCH LEAK CHECK (Wide open search) print("\n--- Search Leakage Audit ---") resp_search = requests.post(f"{API_ROOT}/v3/crud/journal/search", headers=headers_unauth, json={"and": []}) count = len(resp_search.json().get('data', [])) print_result("Search Leakage (Journal)", count == 0, f"- Found {count} records (Expected 0)") if __name__ == "__main__": start_time = time.time() try: run_security_audit() except Exception as e: print(f"\nāŒ AUDIT CRASHED: {e}") print("\n====================================================") print(f"Audit completed in {time.time() - start_time:.2f}s") print("====================================================")