97 lines
2.9 KiB
Python
97 lines
2.9 KiB
Python
import requests
|
|
import json
|
|
import argparse
|
|
import time
|
|
|
|
# --- Configuration ---
|
|
BASE_URL = "https://dev-api.oneskyit.com"
|
|
AGENT_API_KEY = "PMM4n50teUCaOMMTN8qOJA"
|
|
|
|
# Default Contexts for Testing
|
|
CONTEXTS = {
|
|
"account_1": "_XY7DXtc9MY",
|
|
"account_5": "xFP7AhU8Zlc",
|
|
"event_1358": "nmBfuGFeR0k"
|
|
}
|
|
|
|
def run_lookup(code, description, account_id=None, for_type=None, for_id=None, limit=1, delay_ms=0):
|
|
"""
|
|
Performs a Data Store lookup and prints standardized results.
|
|
"""
|
|
print(f"[V3] {description} (Limit: {limit}, Delay: {delay_ms}ms)")
|
|
|
|
headers = {
|
|
"X-Aether-API-Key": AGENT_API_KEY,
|
|
"Content-Type": "application/json"
|
|
}
|
|
|
|
if account_id:
|
|
headers["x-account-id"] = account_id
|
|
else:
|
|
headers["x-no-account-id"] = "bypass"
|
|
|
|
url = f"{BASE_URL}/v3/data_store/code/{code}"
|
|
params = {"for_type": for_type, "for_id": for_id, "limit": limit, "delay_ms": delay_ms}
|
|
|
|
start_time = time.time()
|
|
response = requests.get(url, headers=headers, params=params)
|
|
duration = (time.time() - start_time) * 1000
|
|
|
|
print(f" Status: {response.status_code} ({duration:.0f}ms)")
|
|
|
|
if response.status_code == 200:
|
|
data = response.json().get('data')
|
|
if data:
|
|
if isinstance(data, list):
|
|
print(f" Result: SUCCESS (List of {len(data)})")
|
|
else:
|
|
print(f" Result: SUCCESS (Single Object)")
|
|
else:
|
|
print(f" Result: NULL (No record found)")
|
|
else:
|
|
print(f" Result: ERROR - {response.text[:200]}")
|
|
print("-" * 60)
|
|
|
|
def run_search(code, description):
|
|
"""
|
|
Tests the new POST /search endpoint for Data Store codes.
|
|
"""
|
|
print(f"[V3-SEARCH] {description}")
|
|
|
|
url = f"{BASE_URL}/v3/data_store/code/{code}/search"
|
|
headers = {
|
|
"X-Aether-API-Key": AGENT_API_KEY,
|
|
"x-no-account-id": "bypass",
|
|
"Content-Type": "application/json"
|
|
}
|
|
|
|
search_query = {
|
|
"and_filters": [
|
|
{"field": "name", "op": "like", "value": "%"}
|
|
]
|
|
}
|
|
|
|
response = requests.post(url, headers=headers, json=search_query)
|
|
print(f" Status: {response.status_code}")
|
|
|
|
if response.status_code == 200:
|
|
data = response.json().get('data', [])
|
|
print(f" Result: SUCCESS (Found {len(data)} results via POST Search)")
|
|
else:
|
|
print(f" Result: ERROR - {response.text[:200]}")
|
|
print("-" * 60)
|
|
|
|
if __name__ == "__main__":
|
|
CODE = "event_launcher_main_info"
|
|
print(f"=== Aether Data Store V3 Parity Tester ===\n")
|
|
|
|
# 1. Standard Lookup
|
|
run_lookup(CODE, "Scenario: Standard Lookup", account_id=CONTEXTS["account_1"])
|
|
|
|
# 2. Delay Simulation
|
|
run_lookup(CODE, "Scenario: Delay Simulation (500ms)", account_id=CONTEXTS["account_1"], delay_ms=500)
|
|
|
|
# 3. Advanced Search (POST)
|
|
run_search(CODE, "Scenario: Advanced Search via POST")
|
|
|
|
print("\nTests Complete.") |