import requests import json import time # --- Configuration --- BASE_URL = "https://dev-api.oneskyit.com" AGENT_API_KEY = "PMM4n50teUCaOMMTN8qOJA" # Default Contexts for Testing CONTEXTS = { "account_1": "_XY7DXtc9MY", # Standard Test Account "account_5": "xFP7AhU8Zlc", "event_1": "nmBfuGFeR0k" } def get_headers(account_id=None): headers = { "X-Aether-API-Key": AGENT_API_KEY, "Content-Type": "application/json" } if account_id: headers["x-account-id"] = account_id else: headers["x-no-account-id"] = "bypass" return headers def print_result(label, success, message=""): """Standardized output helper.""" status = "✅ PASS" if success else "❌ FAIL" print(f"[{status}] {label} {message}") def test_standard_lookup(): """Verifies the cascading lookup (Object > Account > Global).""" print("\n--- Testing Standard Cascading Lookup ---") code = "event_launcher_main_info" url = f"{BASE_URL}/v3/data_store/code/{code}" # Test Account Lookup resp = requests.get(url, headers=get_headers(CONTEXTS["account_1"])) print_result("Lookup: Account Context", resp.status_code == 200) # Test Object Override Lookup params = {"for_type": "event", "for_id": CONTEXTS["event_1"]} resp = requests.get(url, headers=get_headers(CONTEXTS["account_1"]), params=params) print_result("Lookup: Object Context Override", resp.status_code == 200) def test_delay_simulation(): """Verifies X-Delay-ms header and delay_ms query param.""" print("\n--- Testing Latency Simulation ---") code = "event_launcher_main_info" url = f"{BASE_URL}/v3/data_store/code/{code}" delay_ms = 500 start_time = time.time() resp = requests.get(url, headers=get_headers(CONTEXTS["account_1"]), params={"delay_ms": delay_ms}) duration = (time.time() - start_time) * 1000 # Allow for some network overhead but check if it's at least the delay print_result(f"Delay: query param ({delay_ms}ms)", resp.status_code == 200 and duration >= delay_ms) def test_advanced_search(): """Verifies POST /search with hierarchical logic and ID Vision.""" print("\n--- Testing Advanced POST Search ---") code = "event_launcher_main_info" url = f"{BASE_URL}/v3/data_store/code/{code}/search" search_query = { "and_filters": [ {"field": "name", "op": "like", "value": "%"} ] } resp = requests.post(url, headers=get_headers(), json=search_query) success = resp.status_code == 200 vision_ok = True if success: data = resp.json().get('data', []) if data: # Check ID Vision (strings, not ints) item = data[0] if not isinstance(item.get('id'), str) or not isinstance(item.get('account_id'), (str, type(None))): vision_ok = False print(f" ❌ ID Vision Failure: id={item.get('id')}, account_id={item.get('account_id')}") print_result("Search: POST /search with ID Vision", success and vision_ok) if __name__ == "__main__": print(f"=== Aether Data Store V3 E2E Suite ===") print(f"Target: {BASE_URL}") start_time = time.time() try: test_standard_lookup() test_delay_simulation() test_advanced_search() except Exception as e: print(f"💥 Suite Error: {e}") print(f"\nSuite completed in {time.time() - start_time:.2f}s")