# Viewer metadata (not part of the program): 101 lines, 3.4 KiB, Python
import json
import os
import time

import requests
|
|
|
|
# --- Configuration ---
# Base URL that every request in this suite is built from.
BASE_URL = "https://dev-api.oneskyit.com"

# API key sent in the X-Aether-API-Key header. Setting the
# AETHER_AGENT_API_KEY environment variable overrides the built-in key so
# other credentials never have to be committed to this file; an unset or
# empty variable falls back to the built-in key.
AGENT_API_KEY = os.environ.get("AETHER_AGENT_API_KEY") or "PMM4n50teUCaOMMTN8qOJA"

# Default Contexts for Testing
CONTEXTS = {
    "account_1": "_XY7DXtc9MY",  # Standard Test Account
    "account_5": "xFP7AhU8Zlc",
    "event_1": "nmBfuGFeR0k",
}
|
|
|
|
def get_headers(account_id=None):
    """Build the request headers for one API call.

    Args:
        account_id: Account identifier to send as ``x-account-id``. When it
            is falsy (``None``, empty string), ``x-no-account-id: bypass``
            is sent instead.

    Returns:
        A new dict holding the API key header, the JSON content type and
        exactly one of the two account headers.
    """
    if account_id:
        account_scope = {"x-account-id": account_id}
    else:
        account_scope = {"x-no-account-id": "bypass"}

    return {
        "X-Aether-API-Key": AGENT_API_KEY,
        "Content-Type": "application/json",
        **account_scope,
    }
|
|
|
|
def print_result(label, success, message=""):
    """Print one uniformly formatted PASS/FAIL line for a check.

    Args:
        label: Short name of the check.
        success: Truthy for a PASS line, falsy for a FAIL line.
        message: Optional extra text appended after the label.
    """
    if success:
        verdict = "✅ PASS"
    else:
        verdict = "❌ FAIL"
    print("[{}] {} {}".format(verdict, label, message))
|
|
|
|
def test_standard_lookup():
    """Verifies the cascading lookup (Object > Account > Global).

    Issues two GET requests for the same data-store code: one carrying only
    the account header, and one that additionally passes ``for_type`` /
    ``for_id`` query parameters for an event. Each request is reported as
    PASS when it returns HTTP 200; the status code is included in the output
    so failures can be diagnosed from the log alone.
    """
    print("\n--- Testing Standard Cascading Lookup ---")
    code = "event_launcher_main_info"
    url = f"{BASE_URL}/v3/data_store/code/{code}"
    # requests does not time out by default; bound every call so that an
    # unresponsive server cannot hang the suite.
    timeout_s = 30

    # Test Account Lookup
    resp = requests.get(
        url,
        headers=get_headers(CONTEXTS["account_1"]),
        timeout=timeout_s,
    )
    print_result(
        "Lookup: Account Context",
        resp.status_code == 200,
        f"(HTTP {resp.status_code})",
    )

    # Test Object Override Lookup
    params = {"for_type": "event", "for_id": CONTEXTS["event_1"]}
    resp = requests.get(
        url,
        headers=get_headers(CONTEXTS["account_1"]),
        params=params,
        timeout=timeout_s,
    )
    print_result(
        "Lookup: Object Context Override",
        resp.status_code == 200,
        f"(HTTP {resp.status_code})",
    )
|
|
|
|
def test_delay_simulation():
    """Verifies that the delay_ms query param delays the response.

    Sends one GET with ``delay_ms`` set and reports PASS when the response is
    HTTP 200 and the measured round trip took at least that many
    milliseconds.

    NOTE(review): the previous docstring also mentioned an X-Delay-ms header,
    but only the query parameter is exercised here -- TODO add header
    coverage once the expected header behavior is confirmed.
    """
    print("\n--- Testing Latency Simulation ---")
    code = "event_launcher_main_info"
    url = f"{BASE_URL}/v3/data_store/code/{code}"

    delay_ms = 500
    # requests does not time out by default; bound the call so that an
    # unresponsive server cannot hang the suite.
    timeout_s = 30

    # perf_counter is monotonic, so the measured interval cannot be skewed
    # by wall-clock adjustments the way time.time() can.
    started = time.perf_counter()
    resp = requests.get(
        url,
        headers=get_headers(CONTEXTS["account_1"]),
        params={"delay_ms": delay_ms},
        timeout=timeout_s,
    )
    duration = (time.perf_counter() - started) * 1000

    # Allow for some network overhead but check if it's at least the delay
    print_result(
        f"Delay: query param ({delay_ms}ms)",
        resp.status_code == 200 and duration >= delay_ms,
        f"(HTTP {resp.status_code}, took {duration:.0f}ms)",
    )
|
|
|
|
def test_advanced_search():
    """Verifies POST /search with hierarchical logic and ID Vision.

    Posts an ``and_filters`` query matching any ``name`` and reports PASS
    when the response is HTTP 200 and, if any rows come back, the first row
    exposes ``id`` as a string and ``account_id`` as a string or ``None``
    (the "ID Vision" check: identifiers must not be raw integers).

    A body that is not JSON, or not shaped as an object with a list of
    objects under ``data``, is reported as a failure rather than raising.
    """
    print("\n--- Testing Advanced POST Search ---")
    code = "event_launcher_main_info"
    url = f"{BASE_URL}/v3/data_store/code/{code}/search"

    search_query = {
        "and_filters": [
            {"field": "name", "op": "like", "value": "%"},
        ]
    }
    # requests does not time out by default; bound the call so that an
    # unresponsive server cannot hang the suite.
    timeout_s = 30

    resp = requests.post(
        url,
        headers=get_headers(),
        json=search_query,
        timeout=timeout_s,
    )
    success = resp.status_code == 200

    vision_ok = True
    if success:
        try:
            payload = resp.json()
        except ValueError:
            # requests raises a ValueError subclass for a non-JSON body.
            payload = None

        if not isinstance(payload, dict):
            vision_ok = False
            print(" ❌ ID Vision Failure: response body is not a JSON object")
        else:
            data = payload.get('data', [])
            if data:
                # Check ID Vision (strings, not ints)
                item = data[0] if isinstance(data, list) else None
                if not isinstance(item, dict):
                    vision_ok = False
                    print(" ❌ ID Vision Failure: 'data' is not a list of objects")
                elif not isinstance(item.get('id'), str) or not isinstance(item.get('account_id'), (str, type(None))):
                    vision_ok = False
                    print(f" ❌ ID Vision Failure: id={item.get('id')}, account_id={item.get('account_id')}")

    print_result(
        "Search: POST /search with ID Vision",
        success and vision_ok,
        f"(HTTP {resp.status_code})",
    )
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the three checks in order and report the
    # total elapsed time.
    print("=== Aether Data Store V3 E2E Suite ===")
    print(f"Target: {BASE_URL}")

    suite_crashed = False
    # Monotonic clock: the elapsed time cannot be skewed by wall-clock changes.
    start_time = time.perf_counter()
    try:
        test_standard_lookup()
        test_delay_simulation()
        test_advanced_search()
    except Exception as e:
        # Top-level boundary: report the crash instead of dumping a
        # traceback, but remember it so the exit status reflects it.
        suite_crashed = True
        print(f"💥 Suite Error: {e}")

    print(f"\nSuite completed in {time.perf_counter() - start_time:.2f}s")

    # Exit non-zero when a test raised, so CI does not treat a crashed run
    # as green. Individual PASS/FAIL lines do not affect the exit status.
    if suite_crashed:
        raise SystemExit(1)
|