# File: OSIT-AE-API-FastAPI/tests/e2e/test_e2e_v3_security_audit.py
# Viewer metadata: 96 lines, 4.0 KiB, Python

import json
import os
import time

import requests
# --- Configuration ---
# The API root and key may be overridden through the environment
# (AETHER_API_ROOT / AETHER_API_KEY) so the audit can be pointed at another
# deployment or credential without editing this file. An unset or empty
# variable falls back to the value below.
# A trailing slash is stripped so URLs built as f"{API_ROOT}/v3/..." never
# contain a double slash.
API_ROOT = (
    os.environ.get("AETHER_API_ROOT") or "https://dev-api.oneskyit.com"
).rstrip("/")
# NOTE(review): an API key is committed here as the fallback value. Consider
# rotating it and supplying the key only through AETHER_API_KEY.
API_KEY = os.environ.get("AETHER_API_KEY") or "dFP6J9DVj9hUgIMn-fNIqg"

# Test Matrix: (obj_type, id_random, is_public)
TEST_OBJECTS = [
    ("journal", "SWFK-48-89-90", False),
    ("journal_entry", "w1p-OE-Hm-zz", False),
    ("event", "aJ0KmgvU62Q", True),
    ("post", "-qTmbMlEjAY", True),
]

# Primary Account Context for Auth tests (Account 1)
ACCOUNT_A_RAND = "_XY7DXtc9MY"
# Secondary Account Context (Account 2 - assumed to exist and to NOT own any
# of the Account 1 records)
ACCOUNT_B_RAND = "nqOzejLCDXM"
def print_result(label, success, message=""):
    """Print a single audit result line to stdout.

    The line has the form ``[<status>] <label> <message>``, where the status
    is the PASS marker when ``success`` is truthy and the FAIL marker
    otherwise. With the default empty ``message`` the line ends with a
    trailing space.

    Args:
        label: Short name of the check being reported.
        success: Truthy when the check passed.
        message: Optional detail appended after the label.
    """
    if success:
        badge = "✅ PASS"
    else:
        badge = "❌ FAIL"
    line = "[{}] {} {}".format(badge, label, message)
    print(line)
def check_id_vision(data, path=""):
    """Recursively collect ID-named fields that hold an integer value.

    A dict key is treated as ID-named when it is exactly ``"id"`` or ends
    with ``"_id"`` or ``"_id_random"``. Dicts and lists are walked
    recursively; any other value is a leaf.

    Args:
        data: Parsed JSON-like value (dict, list, or scalar) to inspect.
        path: Dotted/indexed path of ``data`` within the outer document,
            used as the prefix in reported leaks. Empty for the root.

    Returns:
        A list of strings of the form ``"<path>: <value> (Type: int)"``, one
        per offending field, in traversal order. Empty when nothing leaks.
    """
    leaks = []
    if isinstance(data, dict):
        for key, value in data.items():
            current_path = f"{path}.{key}" if path else str(key)
            # Only string keys can look like ID fields; the isinstance guard
            # keeps non-string keys from raising on .endswith().
            is_id_key = isinstance(key, str) and (
                key == "id" or key.endswith(("_id", "_id_random"))
            )
            # bool is a subclass of int in Python, so JSON true/false must be
            # excluded explicitly or flags such as "has_id" are misreported.
            if (
                is_id_key
                and isinstance(value, int)
                and not isinstance(value, bool)
            ):
                leaks.append(f"{current_path}: {value} (Type: int)")
            # Recurse regardless of the key name so nested structures are
            # always inspected.
            leaks.extend(check_id_vision(value, current_path))
    elif isinstance(data, list):
        for index, item in enumerate(data):
            leaks.extend(check_id_vision(item, f"{path}[{index}]"))
    return leaks
def _json_body(response):
    """Return the parsed JSON body of ``response``, or None if it is not JSON."""
    try:
        return response.json()
    except ValueError:
        # Response.json() raises a ValueError subclass on an invalid body.
        return None


def _json_dict(response):
    """Return the JSON body of ``response`` as a dict, or ``{}`` otherwise."""
    body = _json_body(response)
    return body if isinstance(body, dict) else {}


def run_security_audit(timeout=30):
    """Run the V3 security and ID-vision audit against ``API_ROOT``.

    For every entry of ``TEST_OBJECTS`` the audit performs:

    1. a GET with only the API key, expecting 200 for public objects and
       403 for private ones;
    2. a GET with the Account A context, checking the returned ``data`` for
       integer IDs via ``check_id_vision``;
    3. for private objects, a PATCH with the Account B context, expecting 403.

    Afterwards a single open journal search is issued with only the API key,
    expecting zero records. Each outcome is reported through
    ``print_result``.

    Args:
        timeout: Seconds passed as ``timeout`` to every HTTP request so an
            unresponsive server cannot hang the audit.

    Returns:
        True when every reported check passed, False otherwise.

    Raises:
        requests.exceptions.RequestException: Connection errors and timeouts
            are not caught here and propagate to the caller.
    """
    print("====================================================")
    print(" V3 COMPREHENSIVE SECURITY & VISION AUDIT ")
    print(f" Target: {API_ROOT}")
    print("====================================================")

    all_passed = True

    def record(label, success, message=""):
        # Report one check and remember whether any check has failed.
        nonlocal all_passed
        all_passed = all_passed and bool(success)
        print_result(label, success, message)

    # Header sets are loop-invariant. Defining them up front also keeps the
    # final search check working when TEST_OBJECTS is empty.
    headers_unauth = {"x-aether-api-key": API_KEY}
    headers_auth = {"x-aether-api-key": API_KEY, "x-account-id": ACCOUNT_A_RAND}
    headers_wrong_account = {
        "x-aether-api-key": API_KEY,
        "x-account-id": ACCOUNT_B_RAND,
    }

    for obj_type, obj_id, is_public in TEST_OBJECTS:
        print(f"\n--- Testing Object Type: {obj_type} ---")
        url = f"{API_ROOT}/v3/crud/{obj_type}/{obj_id}"

        # 1. READ LEAK CHECK (No context)
        resp_unauth = requests.get(url, headers=headers_unauth, timeout=timeout)
        if is_public:
            record(f"Unauth GET {obj_type}", resp_unauth.status_code == 200)
        else:
            # The status message is informational only; a missing or
            # non-JSON body must not abort the audit.
            meta = _json_dict(resp_unauth).get("meta")
            status_message = (
                meta.get("status_message") if isinstance(meta, dict) else None
            )
            record(
                f"Unauth GET {obj_type}",
                resp_unauth.status_code == 403,
                f"- Blocked: {status_message}",
            )

        # 2. VISION COMPLIANCE CHECK (With context)
        resp_auth = requests.get(url, headers=headers_auth, timeout=timeout)
        if resp_auth.status_code == 200:
            data = _json_dict(resp_auth).get("data", {})
            leaks = check_id_vision(data)
            record(
                f"Vision Compliance {obj_type}",
                not leaks,
                f"- Leaks: {leaks if leaks else 'None'}",
            )
        else:
            record(
                f"Auth GET {obj_type}",
                False,
                f"- Failed to get record for Vision check: {resp_auth.status_code}",
            )

        # 3. WRITE ISOLATION CHECK (PATCH someone else's record)
        if not is_public:
            # Attempt to modify a record that doesn't belong to Account B
            resp_patch = requests.patch(
                url,
                headers=headers_wrong_account,
                json={"notes": "Hacked!"},
                timeout=timeout,
            )
            record(
                f"Cross-Account Write Block {obj_type}",
                resp_patch.status_code == 403,
                f"- Status: {resp_patch.status_code}",
            )

    # 4. SEARCH LEAK CHECK (Wide open search)
    print("\n--- Search Leakage Audit ---")
    resp_search = requests.post(
        f"{API_ROOT}/v3/crud/journal/search",
        headers=headers_unauth,
        json={"and": []},
        timeout=timeout,
    )
    body = _json_body(resp_search)
    if isinstance(body, dict):
        # A null or missing "data" counts as zero records.
        found = body.get("data") or []
    elif isinstance(body, list):
        # A bare JSON array is itself the leaked record list.
        found = body
    else:
        found = []
    count = len(found)
    record(
        "Search Leakage (Journal)",
        count == 0,
        f"- Found {count} records (Expected 0)",
    )
    return all_passed
if __name__ == "__main__":
    # perf_counter() is monotonic, so the reported duration is unaffected by
    # system clock adjustments.
    start_time = time.perf_counter()
    exit_code = 0
    try:
        # Only an explicit False result counts as a failed audit; a None
        # result (no return value) is not treated as a failure.
        if run_security_audit() is False:
            exit_code = 1
    except Exception as e:
        # Top-level boundary: report the crash, still print the summary
        # below, then exit non-zero so automation can detect the failure.
        print(f"\n❌ AUDIT CRASHED: {e}")
        exit_code = 1
    print("\n====================================================")
    print(f"Audit completed in {time.perf_counter() - start_time:.2f}s")
    print("====================================================")
    raise SystemExit(exit_code)