Files
OSIT-AE-API-FastAPI/tests/e2e/test_e2e_v3_data_store_lookup.py

101 lines
3.4 KiB
Python

import requests
import json
import time
# --- Configuration ---
BASE_URL = "https://dev-api.oneskyit.com"
AGENT_API_KEY = "PMM4n50teUCaOMMTN8qOJA"
# Default Contexts for Testing
CONTEXTS = {
"account_1": "_XY7DXtc9MY", # Standard Test Account
"account_5": "xFP7AhU8Zlc",
"event_1": "nmBfuGFeR0k"
}
def get_headers(account_id=None):
headers = {
"X-Aether-API-Key": AGENT_API_KEY,
"Content-Type": "application/json"
}
if account_id:
headers["x-account-id"] = account_id
else:
headers["x-no-account-id"] = "bypass"
return headers
def print_result(label, success, message=""):
"""Standardized output helper."""
status = "✅ PASS" if success else "❌ FAIL"
print(f"[{status}] {label} {message}")
def test_standard_lookup():
"""Verifies the cascading lookup (Object > Account > Global)."""
print("\n--- Testing Standard Cascading Lookup ---")
code = "event_launcher_main_info"
url = f"{BASE_URL}/v3/data_store/code/{code}"
# Test Account Lookup
resp = requests.get(url, headers=get_headers(CONTEXTS["account_1"]))
print_result("Lookup: Account Context", resp.status_code == 200)
# Test Object Override Lookup
params = {"for_type": "event", "for_id": CONTEXTS["event_1"]}
resp = requests.get(url, headers=get_headers(CONTEXTS["account_1"]), params=params)
print_result("Lookup: Object Context Override", resp.status_code == 200)
def test_delay_simulation():
"""Verifies X-Delay-ms header and delay_ms query param."""
print("\n--- Testing Latency Simulation ---")
code = "event_launcher_main_info"
url = f"{BASE_URL}/v3/data_store/code/{code}"
delay_ms = 500
start_time = time.time()
resp = requests.get(url, headers=get_headers(CONTEXTS["account_1"]), params={"delay_ms": delay_ms})
duration = (time.time() - start_time) * 1000
# Allow for some network overhead but check if it's at least the delay
print_result(f"Delay: query param ({delay_ms}ms)", resp.status_code == 200 and duration >= delay_ms)
def test_advanced_search():
"""Verifies POST /search with hierarchical logic and ID Vision."""
print("\n--- Testing Advanced POST Search ---")
code = "event_launcher_main_info"
url = f"{BASE_URL}/v3/data_store/code/{code}/search"
search_query = {
"and_filters": [
{"field": "name", "op": "like", "value": "%"}
]
}
resp = requests.post(url, headers=get_headers(), json=search_query)
success = resp.status_code == 200
vision_ok = True
if success:
data = resp.json().get('data', [])
if data:
# Check ID Vision (strings, not ints)
item = data[0]
if not isinstance(item.get('id'), str) or not isinstance(item.get('account_id'), (str, type(None))):
vision_ok = False
print(f" ❌ ID Vision Failure: id={item.get('id')}, account_id={item.get('account_id')}")
print_result("Search: POST /search with ID Vision", success and vision_ok)
if __name__ == "__main__":
print(f"=== Aether Data Store V3 E2E Suite ===")
print(f"Target: {BASE_URL}")
start_time = time.time()
try:
test_standard_lookup()
test_delay_simulation()
test_advanced_search()
except Exception as e:
print(f"💥 Suite Error: {e}")
print(f"\nSuite completed in {time.time() - start_time:.2f}s")