Files
OSIT-AE-API-FastAPI/tests/e2e/test_e2e_v3_search_engine.py
Scott Idem 37c84de57b chore(tests): consolidate E2E test suite into standardized primary scripts
- Combined 10+ one-off tests into 4 primary functional suites (Search, Auth, Lifecycle, Vision).
- Archived original scripts to tests/archive/.
- Updated README with the new standardized inventory.
- Applied clean output formatting across the new suite.
2026-02-03 16:50:18 -05:00

81 lines
3.3 KiB
Python

import requests
import json
import time
# --- Configuration ---
API_BASE = "https://dev-api.oneskyit.com/v3/crud"
API_KEY = "PMM4n50teUCaOMMTN8qOJA"
ACCOUNT_ID = "nqOzejLCDXM" # Standard Test Account
def get_headers(no_account=False):
headers = {
"Content-Type": "application/json",
"X-Aether-API-Key": API_KEY
}
if no_account:
headers["x-no-account-id"] = "bypass"
else:
headers["x-account-id"] = ACCOUNT_ID
return headers
def print_result(label, success, message=""):
status = "✅ PASS" if success else "❌ FAIL"
print(f"[{status}] {label} {message}")
def test_basic_operators():
"""Tests contains, startswith, endswith logic."""
print("\n--- Testing Basic Search Operators ---")
query = {"and": [{"field": "name", "op": "contains", "value": "Journal"}]}
resp = requests.post(f"{API_BASE}/journal/search", headers=get_headers(), json=query)
print_result("Operator: contains", resp.status_code == 200)
query = {"and": [{"field": "name", "op": "startswith", "value": "A"}]}
resp = requests.post(f"{API_BASE}/journal/search", headers=get_headers(), json=query)
print_result("Operator: startswith", resp.status_code == 200)
def test_registry_fields():
"""Tests searching by newly added registry fields (created_on, id_random)."""
print("\n--- Testing Registry-Expanded Fields ---")
query = {"and_filters": [{"field": "created_on", "op": "gt", "value": "2020-01-01"}]}
resp = requests.post(f"{API_BASE}/journal/search", headers=get_headers(), json=query)
print_result("Field: created_on", resp.status_code == 200)
# Get a valid ID for exact match test
res = requests.get(f"{API_BASE}/journal/", headers=get_headers(), params={"limit": 1})
if res.status_code == 200 and res.json().get("data"):
valid_id = res.json()["data"][0]["id"]
query = {"and_filters": [{"field": "id_random", "op": "eq", "value": valid_id}]}
resp = requests.post(f"{API_BASE}/journal/search", headers=get_headers(), json=query)
print_result(f"Field: id_random ({valid_id})", resp.status_code == 200)
def test_nested_search():
"""Tests POST /search on child objects."""
print("\n--- Testing Nested Advanced Search ---")
parent_id = "--ghJX-ztEM" # Valid person
url = f"{API_BASE}/person/{parent_id}/journal/search"
query = {"and_filters": [{"field": "name", "op": "like", "value": "%"}]}
resp = requests.post(url, headers=get_headers(), json=query)
print_result("Nested Search (person -> journal)", resp.status_code == 200)
def test_extra_filters():
"""Tests enabled=all and hidden=all bypass filters."""
print("\n--- Testing Extra Filters (enabled/hidden) ---")
# Using User object as it often has disabled records
resp = requests.get(f"{API_BASE}/user/?enabled=all&hidden=all", headers=get_headers())
print_result("Bypass Filters (enabled=all)", resp.status_code == 200)
if __name__ == "__main__":
print(f"Starting Consolidated Search Engine E2E Suite")
print(f"Target: {API_BASE}")
start_time = time.time()
try:
test_basic_operators()
test_registry_fields()
test_nested_search()
test_extra_filters()
except Exception as e:
print(f"💥 Suite Error: {e}")
print(f"\nSuite completed in {time.time() - start_time:.2f}s")