Files
OSIT-AE-API-FastAPI/tests/e2e/test_e2e_v3_search.py
Scott Idem b2384f2869 Tests: Reorganize test suite into functional subdirectories
- Categorized scripts into tests/unit/, tests/integration/, tests/e2e/, and tests/tools/.
- Adopted consistent naming prefixes (test_unit_*, test_int_*, test_e2e_*, tool_*).
- Renamed conftest_mock.py to mock_config_helper.py for clarity.
- Updated test_int_boot_diagnosis.py with sys.path setup for root-level execution.
2026-01-16 10:46:19 -05:00

120 lines
4.2 KiB
Python

import requests
import json
import sys
# Configuration
BASE_URL = "https://dev-api.oneskyit.com/v3/crud"
# --- AUTHENTICATION CONFIG ---
# Option 1: Modern JWT (Preferred)
JWT_TOKEN = "" # PASTE YOUR JWT TOKEN HERE
# Option 2: Legacy Header Fallback
ACCOUNT_ID = "nqOzejLCDXM"
def get_headers(use_jwt=True):
headers = {
"Content-Type": "application/json"
}
if use_jwt and JWT_TOKEN:
headers["Authorization"] = f"Bearer {JWT_TOKEN}"
else:
headers["X-Account-ID"] = ACCOUNT_ID
return headers
def test_search(obj_type, query, description, params=None, use_jwt_query=False):
"""
Helper to run a search test and print results.
"""
print(f"--- Testing: {description} ---")
url = f"{BASE_URL}/{obj_type}/search"
# Handle JWT in query parameter if requested
request_params = params.copy() if params else {}
request_headers = get_headers(use_jwt=not use_jwt_query)
if use_jwt_query and JWT_TOKEN:
request_params["jwt"] = JWT_TOKEN
try:
response = requests.post(url, headers=request_headers, json=query, params=request_params)
print(f"URL: {response.url}")
print(f"Auth Method: {'JWT Query' if use_jwt_query else ('JWT Header' if JWT_TOKEN else 'Legacy Header')}")
print(f"Status Code: {response.status_code}")
data = response.json()
# Check if the result is a list or single object
result_data = data.get('data')
if isinstance(result_data, list):
print(f"Result Count: {len(result_data)}")
if len(result_data) > 0:
# Print first item as example
print(f"First Item Example: {json.dumps(result_data[0], indent=2)[:200]}...")
else:
print(f"Result Type: {type(result_data)}")
print(f"Result Data: {json.dumps(result_data, indent=2)}")
if response.status_code != 200:
print(f"Error Message: {data.get('status_message')}")
print(f"Meta Details: {json.dumps(data.get('meta', {}), indent=2)}")
except requests.exceptions.ConnectionError:
print(f"Error: Could not connect to {BASE_URL}. Is the API running?")
except Exception as e:
print(f"Error during test: {e}")
print("-" * 40 + "\n")
if __name__ == "__main__":
if not JWT_TOKEN:
print("WARNING: JWT_TOKEN is empty. Falling back to Legacy ACCOUNT_ID auth.")
print("To test JWT, please paste a token into the JWT_TOKEN variable.\n")
print(f"Starting Aether V3 Search & JWT Tests against {BASE_URL}\n")
# 1. JWT Header Authentication Test
test_search("journal", {"q": "%"}, "Auth: JWT via Authorization Header")
# 2. JWT Query Parameter Authentication Test
if JWT_TOKEN:
test_search("journal", {"q": "%"}, "Auth: JWT via 'jwt' Query Parameter", use_jwt_query=True)
# 3. New Operator: contains / icontains
query_contains = {
"and": [{"field": "name", "op": "contains", "value": "Journal"}]
}
test_search("journal", query_contains, "Operator: contains (automatically adds %%")
# 4. New Operator: startswith / istartswith
query_start = {
"and": [{"field": "name", "op": "startswith", "value": "A"}]
}
test_search("journal", query_start, "Operator: startswith (automatically adds % at end)")
# 5. New Operator: endswith / iendswith
query_end = {
"and": [{"field": "name", "op": "endswith", "value": "Test"}]
}
test_search("journal", query_end, "Operator: endswith (automatically adds % at start)")
# 6. Error Handling: Unsupported Operator
query_bad = {
"and": [{"field": "name", "op": "invalid_op", "value": "test"}]
}
test_search("journal", query_bad, "Error Handling: Unsupported Operator (Should return 400)")
# 7. Complex Nested Logic (Recap)
query_nested = {
"and": [
{"field": "enable", "op": "eq", "value": True},
{
"or": [
{"field": "name", "op": "icontains", "value": "aether"},
{"field": "summary", "op": "is_not_null"}
]
}
]
}
test_search("journal", query_nested, "Complex Logic: Nested AND/OR + icontains")
print("Tests Complete.")