chore(tests): consolidate E2E test suite into standardized primary scripts

- Combined 10+ one-off tests into 4 primary functional suites (Search, Auth, Lifecycle, Vision).
- Archived original scripts to tests/archive/.
- Updated README with the new standardized inventory.
- Applied clean output formatting across the new suite.
This commit is contained in:
Scott Idem
2026-02-03 16:50:18 -05:00
parent 29f6cf258f
commit 37c84de57b
15 changed files with 326 additions and 71 deletions

View File

@@ -1,94 +0,0 @@
import requests
import json
import jwt # Used only for local decoding/verification of the received token
import sys
# Configuration
BASE_URL = "https://dev-api.oneskyit.com"
API_KEY = "IDF68Em5X4HTZlswRNgepQ"
def test_request_jwt_security():
print("\n--- Test 1: request_jwt Security (Over the Network) ---")
# Attempt to request a token with an injected account_id
# The backend should now strip this account_id because we aren't using a signing key
params = {
"account_id": "999999",
"json_str": json.dumps({"mode": "guest", "test": True})
}
headers = {
"X-Aether-API-Key": API_KEY
}
url = f"{BASE_URL}/api/request_jwt"
print(f"Calling: {url}")
try:
response = requests.get(url, params=params, headers=headers)
print(f"Status: {response.status_code}")
data = response.json()
token = data.get('data', {}).get('jwt')
if not token:
print(f"❌ No token returned. Response: {json.dumps(data, indent=2)}")
return None
print(f"Token Received: {token[:30]}...")
# We can't verify the signature without the secret, but we can inspect the payload
# using unverified decode to see if the server stripped the ID before signing.
decoded = jwt.decode(token, options={"verify_signature": False})
print(f"Unverified Payload: {decoded}")
if decoded.get('account_id') is None:
print("✅ SUCCESS: 'account_id' was successfully stripped by the server.")
else:
print(f"❌ FAILURE: 'account_id' was present! Security Patch not active. Value: {decoded.get('account_id')}")
sys.exit(1)
return token
except Exception as e:
print(f"❌ Error during request: {e}")
return None
def test_guest_access(guest_token):
print("\n--- Test 2: Guest Access with Token ---")
headers = {
"X-Aether-API-Key": API_KEY
}
params = {"jwt": guest_token}
# 1. Test Public Object (site_domain) - Should succeed (200)
print("\n[A] Testing Public Read (site_domain)...")
url_public = f"{BASE_URL}/v3/crud/site_domain/search"
resp_public = requests.post(url_public, json={"q": "%"}, headers=headers, params=params)
print(f"Status: {resp_public.status_code}")
if resp_public.status_code == 200:
print("✅ SUCCESS: Guest token allowed access to public object.")
else:
print(f"❌ FAILURE: Status {resp_public.status_code}. Msg: {resp_public.text}")
# 2. Test Private Object (journal) - Should be blocked (403)
print("\n[B] Testing Private Read (journal)...")
url_private = f"{BASE_URL}/v3/crud/journal/search"
resp_private = requests.post(url_private, json={"q": "%"}, headers=headers, params=params)
print(f"Status: {resp_private.status_code}")
if resp_private.status_code == 403:
print("✅ SUCCESS: Guest correctly blocked from private object (403 Forbidden).")
else:
print(f"❌ FAILURE: Guest was NOT blocked. Status: {resp_private.status_code}")
if __name__ == "__main__":
print(f"Starting E2E JWT Guest Auth Tests against {BASE_URL}\n")
token = test_request_jwt_security()
if token:
test_guest_access(token)
else:
print("❌ Token request failed, skipping access tests.")
print("\nTests Complete.")

View File

@@ -1,80 +0,0 @@
import requests
import json
import jwt
import sys
# Configuration
BASE_URL = "https://dev-api.oneskyit.com"
SITE_ID = "ltOdfNtjZLo" # Found from DB
VALID_PASSCODE = "10241024" # 'super' role for this site
INVALID_PASSCODE = "wrong-code-123"
def test_passcode_authentication():
print(f"\n--- Testing Passcode Authentication for Site: {SITE_ID} ---")
url = f"{BASE_URL}/api/authenticate_passcode"
payload = {
"site_id": SITE_ID,
"passcode": VALID_PASSCODE
}
try:
# 1. Test Valid Auth
print(f"[1] Requesting JWT with VALID passcode...")
response = requests.post(url, json=payload)
print(f"Status: {response.status_code}")
if response.status_code != 200:
print(f"❌ Auth Failed! Response: {response.text}")
return None
data = response.json()
token = data.get('data', {}).get('jwt')
role = data.get('data', {}).get('role')
if not token:
print("❌ No token in response.")
return None
print(f"✅ Success! Token received for role: '{role}'")
# 2. Inspect JWT Payload
print("\n[2] Inspecting JWT Payload (Unverified)...")
decoded = jwt.decode(token, options={"verify_signature": False})
print(f"Payload: {json.dumps(decoded, indent=2)}")
# Check for role flags
if decoded.get('super') is True:
print("✅ SUCCESS: 'super' flag is correctly set in JWT.")
else:
print("❌ FAILURE: 'super' flag missing or False in JWT.")
sys.exit(1)
# 3. Test Invalid Auth
print("\n[3] Requesting JWT with INVALID passcode...")
payload_bad = {
"site_id": SITE_ID,
"passcode": INVALID_PASSCODE
}
resp_bad = requests.post(url, json=payload_bad)
print(f"Status: {resp_bad.status_code}")
if resp_bad.status_code == 401:
print("✅ SUCCESS: Invalid passcode correctly rejected (401).")
else:
print(f"❌ FAILURE: Unexpected status for bad passcode: {resp_bad.status_code}")
sys.exit(1)
return token
except Exception as e:
print(f"❌ Error during test: {e}")
return None
if __name__ == "__main__":
token = test_passcode_authentication()
if token:
print("\n🎉 Passcode Authentication E2E Test Passed!")
else:
print("\n❌ Test FAILED.")
sys.exit(1)

View File

@@ -1,47 +0,0 @@
import requests
import json
import sys
# Configuration
BASE_URL = "https://dev-api.oneskyit.com/v3/crud"
FQDN_TO_TEST = "dev-app.oneskyit.com" # Example domain that should exist
def test_site_domain_lookup():
print(f"--- Testing Public Site Domain Lookup: {FQDN_TO_TEST} ---")
url = f"{BASE_URL}/site_domain/search"
# Payload: Search for FQDN, explicitly WITHOUT authentication
# The frontend typically sends this to bootstrap
query = {
"and": [{"field": "fqdn", "op": "eq", "value": FQDN_TO_TEST}]
}
# NO AUTH HEADERS (Simulating unauthenticated bootstrapping)
headers = {
"Content-Type": "application/json"
}
try:
response = requests.post(url, headers=headers, json=query)
print(f"URL: {response.url}")
print(f"Status Code: {response.status_code}")
data = response.json()
if response.status_code == 200:
result_data = data.get('data')
if isinstance(result_data, list):
print(f"Success! Found {len(result_data)} records.")
if len(result_data) > 0:
print(f"Data: {json.dumps(result_data[0], indent=2)}")
else:
print(f"Unexpected data format: {type(result_data)}")
else:
print(f"Failure (Expected before fix). Message: {data.get('status_message')}")
except Exception as e:
print(f"Error during test: {e}")
print("-" * 40 + "\n")
if __name__ == "__main__":
test_site_domain_lookup()

View File

@@ -0,0 +1,90 @@
import requests
import io
import json
import os
# --- Configuration ---
API_BASE = "https://dev-api.oneskyit.com/v3/action"
API_KEY = "PMM4n50teUCaOMMTN8qOJA"
ACCOUNT_ID = "Q8lR8Ai8hx2FjbQ3C_EH1Q" # OSIT
# Linking target for test files
LINK_TYPE = "archive_content"
LINK_ID = "bZOa7CtUm0E"
def get_headers():
return {
"X-Aether-API-Key": API_KEY,
"x-account-id": ACCOUNT_ID,
"x-no-account-id": "bypass"
}
def test_file_lifecycle():
print(f"--- Starting Hosted File Lifecycle Test ---")
# 1. UPLOAD
print("\n[Step 1] Uploading test file...")
test_content = b"Lifecycle Test Content: " + os.urandom(8).hex().encode()
files = [("file_list", ("lifecycle_test.txt", io.BytesIO(test_content), "text/plain"))]
data = {"account_id": ACCOUNT_ID, "link_to_type": LINK_TYPE, "link_to_id": LINK_ID}
up_resp = requests.post(f"{API_BASE}/hosted_file/upload", headers=get_headers(), files=files, data=data)
if up_resp.status_code != 200:
print(f" ❌ Upload Failed: {up_resp.text}")
return False
file_info = up_resp.json()['data'][0]
file_id = file_info['id']
file_hash = file_info['hash_sha256']
subdir = file_hash[:2]
expected_path = f"/srv/aether_api/srv/ae_hosted_files/{subdir}/{file_hash}.file"
print(f" ✅ Uploaded: {file_id} (Hash: {file_hash[:10]}...)")
print(f" 🔍 Expected physical path: {expected_path}")
# 2. DOWNLOAD (Direct)
print("\n[Step 2] Downloading by ID...")
dl_resp = requests.get(f"{API_BASE}/hosted_file/{file_id}/download", headers=get_headers())
if dl_resp.status_code == 200 and dl_resp.content == test_content:
print(f" ✅ Downloaded content matches original.")
else:
print(f" ❌ Download Failed or content mismatch. Status: {dl_resp.status_code}")
print(f" Original: {test_content}")
print(f" Received: {dl_resp.content}")
return False
# 3. DOWNLOAD (Hash + Query Auth)
print("\n[Step 3] Downloading by Hash (Query Auth)...")
hash_url = f"{API_BASE}/hosted_file/hash/{file_hash}/download?api_key={API_KEY}"
h_dl_resp = requests.get(hash_url)
if h_dl_resp.status_code == 200:
print(f" ✅ Hash download successful.")
else:
print(f" ❌ Hash download failed: {h_dl_resp.status_code}")
return False
# 4. DELETE (Clean Cleanup)
print("\n[Step 4] Deleting test file (rm_orphan=true)...")
del_params = {"rm_orphan": "true", "method": "delete"}
del_resp = requests.delete(f"{API_BASE}/hosted_file/{file_id}", headers=get_headers(), params=del_params)
if del_resp.status_code == 200:
print(f" ✅ Deletion request successful.")
else:
print(f" ❌ Deletion failed: {del_resp.text}")
return False
# 5. VERIFY DELETED
print("\n[Step 5] Verifying record is gone...")
check_resp = requests.get(f"https://dev-api.oneskyit.com/v3/crud/hosted_file/{file_id}", headers=get_headers())
if check_resp.status_code == 404:
print(f" ✅ Success: Record purged from DB.")
else:
print(f" ❌ Failure: Record still exists after deletion.")
return False
return True
if __name__ == "__main__":
if test_file_lifecycle():
print("\n🎉 HOSTED FILE LIFECYCLE VERIFIED!")
else:
print("\n❌ LIFECYCLE TEST FAILED.")
exit(1)

View File

@@ -0,0 +1,65 @@
import requests
import json
import time
# --- Configuration ---
API_ROOT = "https://dev-api.oneskyit.com"
API_KEY = "PMM4n50teUCaOMMTN8qOJA"
SITE_ID = "ltOdfNtjZLo"
PASSCODE = "10241024"
FQDN = "dev-app.oneskyit.com"
def print_result(label, success, message=""):
status = "✅ PASS" if success else "❌ FAIL"
print(f"[{status}] {label} {message}")
def test_site_bootstrap():
"""Tests unauthenticated FQDN lookup (Bootstrap Exception)."""
print("\n--- Testing Site Bootstrap (Unauth) ---")
url = f"{API_ROOT}/v3/crud/site_domain/search"
query = {"and": [{"field": "fqdn", "op": "eq", "value": FQDN}]}
# NO AUTH HEADERS
resp = requests.post(url, json=query)
print_result("Bootstrap lookup (site_domain)", resp.status_code == 200)
def test_passcode_to_jwt():
"""Tests site-specific passcode authentication."""
print("\n--- Testing Passcode -> JWT Flow ---")
url = f"{API_ROOT}/api/authenticate_passcode"
payload = {"site_id": SITE_ID, "passcode": PASSCODE}
resp = requests.post(url, json=payload)
success = resp.status_code == 200
token = resp.json().get('data', {}).get('jwt') if success else None
print_result("Passcode Auth", success and token is not None)
return token
def test_security_boundaries(token):
"""Tests that a site-token cannot access private journals."""
print("\n--- Testing Security Boundaries ---")
url = f"{API_ROOT}/v3/crud/journal/search"
headers = {"X-Aether-API-Key": API_KEY}
params = {"jwt": token}
# site-scoped JWT should NOT be able to search global journals
resp = requests.post(url, headers=headers, params=params, json={"q": "%"})
print_result("Access Blocked (site-jwt -> journal)", resp.status_code == 403)
def test_machine_auth_exception():
"""Tests that restricted routes fail without API Key."""
print("\n--- Testing Machine Auth Exceptions ---")
url = f"{API_ROOT}/v3/crud/journal/search"
# No headers, no key
resp = requests.post(url, json={"q": "%"})
print_result("Unauth block (journal)", resp.status_code == 403)
if __name__ == "__main__":
print(f"Starting Consolidated Auth & Security E2E Suite")
test_site_bootstrap()
token = test_passcode_to_jwt()
if token:
test_security_boundaries(token)
test_machine_auth_exception()
print("\nSuite completed.")

View File

@@ -1,60 +0,0 @@
import requests
import json
import sys
# Configuration
BASE_URL = "https://dev-api.oneskyit.com/v3/crud"
# Using the API key found in the database
API_KEY = "PHDXUJxx6IgmLNKxIBezTQ"
# Event Device ID found in the database
DEVICE_ID = "GZvFjgIIZQg"
def get_headers():
headers = {
"Content-Type": "application/json",
"X-Aether-API-Key": API_KEY,
"X-No-Account-ID": "bypass"
}
return headers
def test_get_event_device():
print(f"--- Testing: Get Event Device by ID ({DEVICE_ID}) ---")
url = f"{BASE_URL}/event_device/{DEVICE_ID}"
headers = get_headers()
try:
response = requests.get(url, headers=headers)
print(f"URL: {response.url}")
print(f"Status Code: {response.status_code}")
if response.status_code == 200:
data = response.json().get('data', {})
print("✅ SUCCESS: Found Event Device.")
# Verify Vision ID pattern (Strings only for ID fields)
vision_fields = ["id", "event_device_id", "account_id", "event_id", "event_location_id"]
for field in vision_fields:
val = data.get(field)
print(f" Field '{field}': {val} (type: {type(val).__name__})")
if val is not None:
assert isinstance(val, str), f"Error: Field '{field}' should be a string, but got {type(val).__name__}"
# Check for any unexpected integer IDs
for key, val in data.items():
if key.endswith("_id") and not key.endswith("external_id"):
if isinstance(val, int):
print(f" ❌ FAILURE: Integer leakage detected in field '{key}': {val}")
# assert False, f"Integer leakage in '{key}'"
else:
print(f"❌ FAILURE: {response.text}")
except Exception as e:
print(f"Error during test: {e}")
print("-" * 40 + "\n")
if __name__ == "__main__":
print(f"Starting Aether V3 Event Device E2E Tests against {BASE_URL}\n")
test_get_event_device()
print("Tests Complete.")

View File

@@ -1,59 +0,0 @@
import requests
import json
import sys
# Configuration
BASE_URL = "https://dev-api.oneskyit.com/v3/crud"
# Using the API key found in the database
API_KEY = "PHDXUJxx6IgmLNKxIBezTQ"
# Event Session ID found in the database
SESSION_ID = "F0PZd1bNcuD"
def get_headers():
headers = {
"Content-Type": "application/json",
"X-Aether-API-Key": API_KEY,
"X-No-Account-ID": "bypass"
}
return headers
def test_get_event_session():
print(f"--- Testing: Get Event Session by ID ({SESSION_ID}) ---")
url = f"{BASE_URL}/event_session/{SESSION_ID}"
headers = get_headers()
try:
response = requests.get(url, headers=headers)
print(f"URL: {response.url}")
print(f"Status Code: {response.status_code}")
if response.status_code == 200:
data = response.json().get('data', {})
print("✅ SUCCESS: Found Event Session.")
# Verify Vision ID pattern (Strings only for ID fields)
vision_fields = ["id", "event_session_id", "event_id", "event_location_id", "event_track_id", "poc_event_person_id", "poc_person_id"]
for field in vision_fields:
val = data.get(field)
print(f" Field '{field}': {val} (type: {type(val).__name__})")
if val is not None:
assert isinstance(val, str), f"Error: Field '{field}' should be a string, but got {type(val).__name__}"
# Check for any unexpected integer IDs
for key, val in data.items():
if key.endswith("_id") and not key.endswith("external_id"):
if isinstance(val, int):
print(f" ❌ FAILURE: Integer leakage detected in field '{key}': {val}")
else:
print(f"❌ FAILURE: {response.text}")
except Exception as e:
print(f"Error during test: {e}")
print("-" * 40 + "\n")
if __name__ == "__main__":
print(f"Starting Aether V3 Event Session E2E Tests against {BASE_URL}\n")
test_get_event_session()
print("Tests Complete.")

View File

@@ -0,0 +1,70 @@
import requests
import json
# --- Configuration ---
BASE_URL = "https://dev-api.oneskyit.com/v3/crud"
# Standard Agent API Key
API_KEY = "PMM4n50teUCaOMMTN8qOJA"
# Test Targets: (Object Type, Valid ID Random)
TARGETS = [
("event_device", "GZvFjgIIZQg"),
("event_session", "F0PZd1bNcuD")
]
def get_headers():
return {
"Content-Type": "application/json",
"X-Aether-API-Key": API_KEY,
"x-no-account-id": "bypass"
}
def verify_vision_parity(obj_type, record_id):
"""
Verifies that the object returns ONLY string IDs for all ID fields (Vision Standard).
"""
print(f"--- Testing {obj_type}: {record_id} ---")
url = f"{BASE_URL}/{obj_type}/{record_id}"
try:
response = requests.get(url, headers=get_headers())
print(f" Status: {response.status_code}")
if response.status_code == 200:
data = response.json().get('data', {})
failures = []
# Check all fields ending in _id (except external_id)
for key, val in data.items():
if key == "id" or (key.endswith("_id") and not key.endswith("external_id")):
if val is not None and not isinstance(val, str):
failures.append(f"{key} is {type(val).__name__} ({val})")
if not failures:
print(f" ✅ Success: All ID fields are strings.")
return True
else:
print(f" ❌ Failure: Integer leakage detected:")
for f in failures:
print(f" - {f}")
return False
else:
print(f" ❌ Error: {response.text[:200]}")
return False
except Exception as e:
print(f" 💥 Exception: {e}")
return False
if __name__ == "__main__":
print(f"Starting Aether V3 Event Vision Parity Tests\n")
results = []
for obj_type, record_id in TARGETS:
results.append(verify_vision_parity(obj_type, record_id))
print("-" * 40)
if all(results):
print("\n🎉 ALL VISION PARITY TESTS PASSED!")
else:
print("\n❌ SOME TESTS FAILED.")

View File

@@ -1,55 +0,0 @@
import requests
import json
# Configuration
BASE_URL = "https://dev-api.oneskyit.com/v3/crud"
ACCOUNT_ID = "nqOzejLCDXM" # Legacy Header Fallback
def get_headers(include_account=True):
headers = {
"Content-Type": "application/json"
}
if include_account:
headers["X-Account-ID"] = ACCOUNT_ID
return headers
def test_endpoint(path, description, include_account=True):
print(f"--- Testing: {description} ---")
url = f"{BASE_URL}{path}"
try:
response = requests.get(url, headers=get_headers(include_account))
print(f"URL: {response.url}")
print(f"Status Code: {response.status_code}")
data = response.json()
result_data = data.get('data')
if isinstance(result_data, list):
print(f"Result Count: {len(result_data)}")
if len(result_data) > 0:
print(f"First Item: {json.dumps(result_data[0], indent=2)[:300]}...")
else:
print(f"Result Data: {json.dumps(result_data, indent=2)}")
if response.status_code != 200:
print(f"Error Message: {data.get('status_message')}")
except Exception as e:
print(f"Error during test: {e}")
print("-" * 40 + "\n")
if __name__ == "__main__":
# Test Users
test_endpoint("/user/", "List Users (Default filter)")
test_endpoint("/user/?enabled=all&hidden=all", "List Users (Bypass filter)")
# Test Sites
test_endpoint(f"/account/{ACCOUNT_ID}/site/", "List Sites (Account Filter)")
# Test Site Domains
test_endpoint("/site_domain/", "List Site Domains (Default filter)")
test_endpoint("/site_domain/?enabled=all&hidden=all", "List Site Domains (Bypass filter)")
# Test Legacy Site Domain Lookup (Initial frontend request)
# This route is in api_crud.py (v1) and needs to work without an account ID header
test_endpoint("/../../crud/site/domain/scott.localhost:5173?use_alt_table=true&use_alt_base=true", "Legacy Site Domain Lookup", include_account=False)

View File

@@ -1,80 +0,0 @@
import requests
import json
# Configuration
BASE_URL = "https://dev-api.oneskyit.com/v3/crud"
API_KEY = "PMM4n50teUCaOMMTN8qOJA"
ACCOUNT_ID = "Q8lR8Ai8hx2FjbQ3C_EH1Q" # OSIT
def test_nested_search():
print("--- Testing Nested Advanced Search (POST) ---")
# Test: Search for journals belonging to a specific person
parent_type = "person"
parent_id = "--ghJX-ztEM" # Using a valid person ID
child_type = "journal"
url = f"{BASE_URL}/{parent_type}/{parent_id}/{child_type}/search"
headers = {
"X-Aether-API-Key": API_KEY,
"x-account-id": ACCOUNT_ID
}
search_query = {
"and_filters": [
{"field": "name", "op": "like", "value": "%"}
]
}
try:
response = requests.post(url, headers=headers, json=search_query)
print(f"Status: {response.status_code}")
if response.status_code == 200:
data = response.json().get("data", [])
print(f"✅ Success: Found {len(data)} nested records via search.")
return True
else:
print(f"❌ Failed: {response.text}")
return False
except Exception as e:
print(f"💥 Exception: {e}")
return False
def test_nested_view():
print("\n--- Testing Nested Get with View Parameter ---")
# Test: Get a single journal entry under a journal using 'enriched' view
parent_type = "journal"
parent_id = "PJRCGHQWERT" # Using a known journal ID
child_type = "journal_entry"
child_id = "PJRCGHQWERT" # Using a known entry ID
url = f"{BASE_URL}/{parent_type}/{parent_id}/{child_type}/{child_id}?view=enriched"
headers = {
"X-Aether-API-Key": API_KEY,
"x-account-id": ACCOUNT_ID
}
try:
response = requests.get(url, headers=headers)
print(f"Status: {response.status_code}")
if response.status_code == 200:
print("✅ Success: Retrieved nested record with view=enriched.")
return True
elif response.status_code == 404:
print("⚠️ Note: Record not found, but route matched.")
return True
else:
print(f"❌ Failed: {response.status_code}")
return False
except Exception as e:
print(f"💥 Exception: {e}")
return False
if __name__ == "__main__":
s1 = test_nested_search()
s2 = test_nested_view()
if s1 and s2:
print("\n🎉 NESTED V3 FEATURES VERIFIED!")

View File

@@ -1,72 +0,0 @@
import requests
import json
import sys
# Configuration
BASE_URL = "https://dev-api.oneskyit.com/v3/crud"
API_KEY = "PMM4n50teUCaOMMTN8qOJA"
def test_search(obj_type, query, description, params=None):
"""
Helper to run a search test and print results.
"""
print(f"--- Testing: {description} ---")
url = f"{BASE_URL}/{obj_type}/search"
headers = {
"Content-Type": "application/json",
"X-Aether-API-Key": API_KEY,
"x-no-account-id": "bypass"
}
try:
response = requests.post(url, headers=headers, json=query, params=params)
print(f"URL: {response.url}")
print(f"Status Code: {response.status_code}")
data = response.json()
result_data = data.get('data')
if response.status_code == 200:
if isinstance(result_data, list):
print(f"✅ Success: Found {len(result_data)} items.")
else:
print(f"✅ Success: Result type {type(result_data)}")
else:
print(f"❌ Failed: {data.get('status_message')}")
print(f" Details: {json.dumps(data.get('details', {}), indent=2)}")
except Exception as e:
print(f"💥 Error: {e}")
print("-" * 40 + "\n")
if __name__ == "__main__":
print(f"Starting Aether V3 Search Registry Verification\n")
# 1. Standard Search
test_search("journal", {"q": "%"}, "Standard Search (Fulltext)")
# 2. Test newly added searchable field: created_on
query_date = {
"and_filters": [
{"field": "created_on", "op": "gt", "value": "2020-01-01"}
]
}
test_search("journal", query_date, "Search via newly added field: created_on")
# 3. Test newly added searchable field: id_random
# Get a valid ID first
res = requests.get(f"{BASE_URL}/journal/", headers={"X-Aether-API-Key": API_KEY, "x-no-account-id": "bypass"}, params={"limit": 1})
if res.status_code == 200 and res.json().get("data"):
valid_id = res.json()["data"][0]["id"]
query_id = {
"and_filters": [
{"field": "id_random", "op": "eq", "value": valid_id}
]
}
test_search("journal", query_id, f"Search via newly added field: id_random ({valid_id})")
# 4. Test Archive Content (Specialized Object)
test_search("archive_content", {"q": "%"}, "Archive Content Search")
print("Tests Complete.")

View File

@@ -1,120 +0,0 @@
import requests
import json
import sys
# Configuration
BASE_URL = "https://dev-api.oneskyit.com/v3/crud"
# --- AUTHENTICATION CONFIG ---
# Option 1: Modern JWT (Preferred)
JWT_TOKEN = "" # PASTE YOUR JWT TOKEN HERE
# Option 2: Legacy Header Fallback
ACCOUNT_ID = "nqOzejLCDXM"
def get_headers(use_jwt=True):
headers = {
"Content-Type": "application/json"
}
if use_jwt and JWT_TOKEN:
headers["Authorization"] = f"Bearer {JWT_TOKEN}"
else:
headers["X-Account-ID"] = ACCOUNT_ID
return headers
def test_search(obj_type, query, description, params=None, use_jwt_query=False):
"""
Helper to run a search test and print results.
"""
print(f"--- Testing: {description} ---")
url = f"{BASE_URL}/{obj_type}/search"
# Handle JWT in query parameter if requested
request_params = params.copy() if params else {}
request_headers = get_headers(use_jwt=not use_jwt_query)
if use_jwt_query and JWT_TOKEN:
request_params["jwt"] = JWT_TOKEN
try:
response = requests.post(url, headers=request_headers, json=query, params=request_params)
print(f"URL: {response.url}")
print(f"Auth Method: {'JWT Query' if use_jwt_query else ('JWT Header' if JWT_TOKEN else 'Legacy Header')}")
print(f"Status Code: {response.status_code}")
data = response.json()
# Check if the result is a list or single object
result_data = data.get('data')
if isinstance(result_data, list):
print(f"Result Count: {len(result_data)}")
if len(result_data) > 0:
# Print first item as example
print(f"First Item Example: {json.dumps(result_data[0], indent=2)[:200]}...")
else:
print(f"Result Type: {type(result_data)}")
print(f"Result Data: {json.dumps(result_data, indent=2)}")
if response.status_code != 200:
print(f"Error Message: {data.get('status_message')}")
print(f"Meta Details: {json.dumps(data.get('meta', {}), indent=2)}")
except requests.exceptions.ConnectionError:
print(f"Error: Could not connect to {BASE_URL}. Is the API running?")
except Exception as e:
print(f"Error during test: {e}")
print("-" * 40 + "\n")
if __name__ == "__main__":
if not JWT_TOKEN:
print("WARNING: JWT_TOKEN is empty. Falling back to Legacy ACCOUNT_ID auth.")
print("To test JWT, please paste a token into the JWT_TOKEN variable.\n")
print(f"Starting Aether V3 Search & JWT Tests against {BASE_URL}\n")
# 1. JWT Header Authentication Test
test_search("journal", {"q": "%"}, "Auth: JWT via Authorization Header")
# 2. JWT Query Parameter Authentication Test
if JWT_TOKEN:
test_search("journal", {"q": "%"}, "Auth: JWT via 'jwt' Query Parameter", use_jwt_query=True)
# 3. New Operator: contains / icontains
query_contains = {
"and": [{"field": "name", "op": "contains", "value": "Journal"}]
}
test_search("journal", query_contains, "Operator: contains (automatically adds %%")
# 4. New Operator: startswith / istartswith
query_start = {
"and": [{"field": "name", "op": "startswith", "value": "A"}]
}
test_search("journal", query_start, "Operator: startswith (automatically adds % at end)")
# 5. New Operator: endswith / iendswith
query_end = {
"and": [{"field": "name", "op": "endswith", "value": "Test"}]
}
test_search("journal", query_end, "Operator: endswith (automatically adds % at start)")
# 6. Error Handling: Unsupported Operator
query_bad = {
"and": [{"field": "name", "op": "invalid_op", "value": "test"}]
}
test_search("journal", query_bad, "Error Handling: Unsupported Operator (Should return 400)")
# 7. Complex Nested Logic (Recap)
query_nested = {
"and": [
{"field": "enable", "op": "eq", "value": True},
{
"or": [
{"field": "name", "op": "icontains", "value": "aether"},
{"field": "summary", "op": "is_not_null"}
]
}
]
}
test_search("journal", query_nested, "Complex Logic: Nested AND/OR + icontains")
print("Tests Complete.")

View File

@@ -0,0 +1,80 @@
import requests
import json
import time
# --- Configuration ---
API_BASE = "https://dev-api.oneskyit.com/v3/crud"
API_KEY = "PMM4n50teUCaOMMTN8qOJA"
ACCOUNT_ID = "nqOzejLCDXM" # Standard Test Account
def get_headers(no_account=False):
headers = {
"Content-Type": "application/json",
"X-Aether-API-Key": API_KEY
}
if no_account:
headers["x-no-account-id"] = "bypass"
else:
headers["x-account-id"] = ACCOUNT_ID
return headers
def print_result(label, success, message=""):
status = "✅ PASS" if success else "❌ FAIL"
print(f"[{status}] {label} {message}")
def test_basic_operators():
"""Tests contains, startswith, endswith logic."""
print("\n--- Testing Basic Search Operators ---")
query = {"and": [{"field": "name", "op": "contains", "value": "Journal"}]}
resp = requests.post(f"{API_BASE}/journal/search", headers=get_headers(), json=query)
print_result("Operator: contains", resp.status_code == 200)
query = {"and": [{"field": "name", "op": "startswith", "value": "A"}]}
resp = requests.post(f"{API_BASE}/journal/search", headers=get_headers(), json=query)
print_result("Operator: startswith", resp.status_code == 200)
def test_registry_fields():
"""Tests searching by newly added registry fields (created_on, id_random)."""
print("\n--- Testing Registry-Expanded Fields ---")
query = {"and_filters": [{"field": "created_on", "op": "gt", "value": "2020-01-01"}]}
resp = requests.post(f"{API_BASE}/journal/search", headers=get_headers(), json=query)
print_result("Field: created_on", resp.status_code == 200)
# Get a valid ID for exact match test
res = requests.get(f"{API_BASE}/journal/", headers=get_headers(), params={"limit": 1})
if res.status_code == 200 and res.json().get("data"):
valid_id = res.json()["data"][0]["id"]
query = {"and_filters": [{"field": "id_random", "op": "eq", "value": valid_id}]}
resp = requests.post(f"{API_BASE}/journal/search", headers=get_headers(), json=query)
print_result(f"Field: id_random ({valid_id})", resp.status_code == 200)
def test_nested_search():
"""Tests POST /search on child objects."""
print("\n--- Testing Nested Advanced Search ---")
parent_id = "--ghJX-ztEM" # Valid person
url = f"{API_BASE}/person/{parent_id}/journal/search"
query = {"and_filters": [{"field": "name", "op": "like", "value": "%"}]}
resp = requests.post(url, headers=get_headers(), json=query)
print_result("Nested Search (person -> journal)", resp.status_code == 200)
def test_extra_filters():
"""Tests enabled=all and hidden=all bypass filters."""
print("\n--- Testing Extra Filters (enabled/hidden) ---")
# Using User object as it often has disabled records
resp = requests.get(f"{API_BASE}/user/?enabled=all&hidden=all", headers=get_headers())
print_result("Bypass Filters (enabled=all)", resp.status_code == 200)
if __name__ == "__main__":
print(f"Starting Consolidated Search Engine E2E Suite")
print(f"Target: {API_BASE}")
start_time = time.time()
try:
test_basic_operators()
test_registry_fields()
test_nested_search()
test_extra_filters()
except Exception as e:
print(f"💥 Suite Error: {e}")
print(f"\nSuite completed in {time.time() - start_time:.2f}s")

View File

@@ -1,50 +0,0 @@
import requests
import json
# Configuration
BASE_URL = "https://dev-api.oneskyit.com"
SEARCH_ENDPOINT = f"{BASE_URL}/v3/crud/site_domain/search"
RESTRICTED_ENDPOINT = f"{BASE_URL}/v3/crud/journal/search"
def test_site_domain_exception():
print("--- Testing site_domain guest access (Exception) ---")
search_query = {
"q": "%", # Match all for testing
"and": []
}
try:
# No Authorization or X-Account-ID headers provided
response = requests.post(SEARCH_ENDPOINT, json=search_query)
print(f"Status Code: {response.status_code}")
if response.status_code == 200:
data = response.json()
print("SUCCESS: site_domain search allowed without authentication.")
print(f"Result count: {len(data.get('data', []))}")
else:
print(f"FAILED: site_domain search returned {response.status_code}")
print(response.text)
except Exception as e:
print(f"Error during site_domain test: {e}")
def test_restricted_search():
print("\n--- Testing restricted search (Should fail) ---")
search_query = {"q": "%"}
try:
response = requests.post(RESTRICTED_ENDPOINT, json=search_query)
print(f"Status Code: {response.status_code}")
if response.status_code == 403:
print("SUCCESS: Restricted search was correctly blocked (403 Forbidden).")
else:
print(f"FAILED: Restricted search returned {response.status_code} instead of 403.")
except Exception as e:
print(f"Error during restricted test: {e}")
if __name__ == "__main__":
test_site_domain_exception()
test_restricted_search()