import requests import json import argparse import time # --- Configuration --- BASE_URL = "https://dev-api.oneskyit.com" AGENT_API_KEY = "PMM4n50teUCaOMMTN8qOJA" # Default Contexts for Testing CONTEXTS = { "account_1": "_XY7DXtc9MY", "account_5": "xFP7AhU8Zlc", "event_1358": "nmBfuGFeR0k" } def run_lookup(code, description, account_id=None, for_type=None, for_id=None, limit=1, delay_ms=0): """ Performs a Data Store lookup and prints standardized results. """ print(f"[V3] {description} (Limit: {limit}, Delay: {delay_ms}ms)") headers = { "X-Aether-API-Key": AGENT_API_KEY, "Content-Type": "application/json" } if account_id: headers["x-account-id"] = account_id else: headers["x-no-account-id"] = "bypass" url = f"{BASE_URL}/v3/data_store/code/{code}" params = {"for_type": for_type, "for_id": for_id, "limit": limit, "delay_ms": delay_ms} start_time = time.time() response = requests.get(url, headers=headers, params=params) duration = (time.time() - start_time) * 1000 print(f" Status: {response.status_code} ({duration:.0f}ms)") if response.status_code == 200: data = response.json().get('data') if data: if isinstance(data, list): print(f" Result: SUCCESS (List of {len(data)})") else: print(f" Result: SUCCESS (Single Object)") else: print(f" Result: NULL (No record found)") else: print(f" Result: ERROR - {response.text[:200]}") print("-" * 60) def run_search(code, description): """ Tests the new POST /search endpoint for Data Store codes. """ print(f"[V3-SEARCH] {description}") url = f"{BASE_URL}/v3/data_store/code/{code}/search" headers = { "X-Aether-API-Key": AGENT_API_KEY, "x-no-account-id": "bypass", "Content-Type": "application/json" } search_query = { "and_filters": [ {"field": "name", "op": "like", "value": "%"} ] } response = requests.post(url, headers=headers, json=search_query) print(f" Status: {response.status_code}") if response.status_code == 200: data = response.json().get('data', []) print(f" Result: SUCCESS (Found {len(data)} results via POST Search)") else: print(f" Result: ERROR - {response.text[:200]}") print("-" * 60) if __name__ == "__main__": CODE = "event_launcher_main_info" print(f"=== Aether Data Store V3 Parity Tester ===\n") # 1. Standard Lookup run_lookup(CODE, "Scenario: Standard Lookup", account_id=CONTEXTS["account_1"]) # 2. Delay Simulation run_lookup(CODE, "Scenario: Delay Simulation (500ms)", account_id=CONTEXTS["account_1"], delay_ms=500) # 3. Advanced Search (POST) run_search(CODE, "Scenario: Advanced Search via POST") print("\nTests Complete.")