chore(tests): organize test scripts and beautify account creation email
- Moved scattered Python test scripts from root and 'admin/development/' to 'tests/'. - Beautified the HTML email body for account creation links in 'app/methods/person_methods.py' with a modern responsive design.
This commit is contained in:
13
tests/check_db_schema.py
Normal file
13
tests/check_db_schema.py
Normal file
@@ -0,0 +1,13 @@
|
||||
from app.db_sql import db
|
||||
from sqlalchemy import text
|
||||
|
||||
def check_columns():
|
||||
try:
|
||||
result = db.execute(text("DESCRIBE account"))
|
||||
columns = [row[0] for row in result.fetchall()]
|
||||
print(f"Columns in 'account': {columns}")
|
||||
except Exception as e:
|
||||
print(f"Error describing 'account': {e}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
check_columns()
|
||||
19
tests/check_site_schema.py
Normal file
19
tests/check_site_schema.py
Normal file
@@ -0,0 +1,19 @@
|
||||
from app.db_sql import db
|
||||
from sqlalchemy import text
|
||||
|
||||
def check_schema(name):
|
||||
print(f"--- Schema for {name} ---")
|
||||
try:
|
||||
result = db.execute(text(f"DESCRIBE `{name}`"))
|
||||
for row in result.fetchall():
|
||||
print(f" {row[0]} ({row[1]})")
|
||||
except Exception as e:
|
||||
print(f" Error: {e}")
|
||||
print("-" * 30)
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Check tables
|
||||
check_schema("site")
|
||||
check_schema("site_domain")
|
||||
# Check views used in CRUD V3
|
||||
check_schema("v_site_domain")
|
||||
25
tests/check_site_schema_remote.py
Normal file
25
tests/check_site_schema_remote.py
Normal file
@@ -0,0 +1,25 @@
|
||||
import requests
|
||||
import json
|
||||
|
||||
BASE_URL = "https://dev-api.oneskyit.com/v3/crud"
|
||||
ACCOUNT_ID = "nqOzejLCDXM"
|
||||
|
||||
def get_headers():
|
||||
return {"X-Account-ID": ACCOUNT_ID}
|
||||
|
||||
def check_schema(obj):
|
||||
print(f"--- Schema for {obj} ---")
|
||||
url = f"{BASE_URL}/{obj}/schema"
|
||||
r = requests.get(url, headers=get_headers())
|
||||
if r.status_code == 200:
|
||||
data = r.json()['data']
|
||||
cols = [c['field'] for col in [data['database']['columns']] for c in col]
|
||||
print(f"Columns: {cols}")
|
||||
fields = list(data['model']['fields'].keys())
|
||||
print(f"Pydantic Fields: {fields}")
|
||||
else:
|
||||
print(f"Error {r.status_code}: {r.text}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
check_schema("site")
|
||||
check_schema("site_domain")
|
||||
54
tests/diagnose_boot.py
Normal file
54
tests/diagnose_boot.py
Normal file
@@ -0,0 +1,54 @@
|
||||
import sys
|
||||
import os
|
||||
|
||||
print("--- Starting Import Diagnosis ---")
|
||||
|
||||
try:
|
||||
print("1. Importing app.config...")
|
||||
from app import config
|
||||
print(f" Success. Settings found: {hasattr(config, 'settings')}")
|
||||
except Exception as e:
|
||||
print(f" FAILED: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
print("2. Importing app.log...")
|
||||
import app.log
|
||||
print(" Success.")
|
||||
except Exception as e:
|
||||
print(f" FAILED: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
print("3. Importing app.db_connection...")
|
||||
import app.db_connection
|
||||
print(" Success.")
|
||||
except Exception as e:
|
||||
print(f" FAILED: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
print("4. Importing app.db_sql...")
|
||||
import app.db_sql
|
||||
print(" Success.")
|
||||
except Exception as e:
|
||||
print(f" FAILED: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
print("5. Importing app.main...")
|
||||
import app.main
|
||||
print(" Success.")
|
||||
except Exception as e:
|
||||
print(f" FAILED: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
sys.exit(1)
|
||||
|
||||
print("--- Diagnosis Complete: No top-level import errors found in local env ---")
|
||||
60
tests/mcp_docker_explorer.py
Normal file
60
tests/mcp_docker_explorer.py
Normal file
@@ -0,0 +1,60 @@
|
||||
import asyncio
|
||||
import sys
|
||||
import json
|
||||
from mcp import ClientSession, StdioServerParameters
|
||||
from mcp.client.stdio import stdio_client
|
||||
|
||||
async def run_docker_mcp_explorer():
|
||||
# Define the server parameters to run the Docker MCP server via npx
|
||||
# Using the official Docker MCP server from the Model Context Protocol organization
|
||||
server_params = StdioServerParameters(
|
||||
command="npx",
|
||||
args=["-y", "@modelcontextprotocol/server-docker"],
|
||||
env=None
|
||||
)
|
||||
|
||||
print("--- Connecting to Docker MCP Server ---")
|
||||
|
||||
try:
|
||||
async with stdio_client(server_params) as (read, write):
|
||||
async with ClientSession(read, write) as session:
|
||||
# Initialize the session
|
||||
print("Initializing session...")
|
||||
await session.initialize()
|
||||
|
||||
# 1. List available tools
|
||||
print("\n--- Available Tools ---")
|
||||
tools_result = await session.list_tools()
|
||||
for tool in tools_result.tools:
|
||||
print(f"- {tool.name}: {tool.description}")
|
||||
|
||||
# 2. Call 'docker_list_containers'
|
||||
print("\n--- Calling 'docker_list_containers' ---")
|
||||
# The official server tool name is 'docker_list_containers'
|
||||
# It doesn't require arguments for a basic list
|
||||
containers_result = await session.call_tool("docker_list_containers", arguments={})
|
||||
|
||||
# The result comes back as a list of Content objects
|
||||
if containers_result.content:
|
||||
for item in containers_result.content:
|
||||
if item.type == 'text':
|
||||
# Parse the text (which is usually a JSON string for this tool)
|
||||
try:
|
||||
containers = json.loads(item.text)
|
||||
print(f"Found {len(containers)} containers:")
|
||||
for c in containers:
|
||||
status = c.get('Status', 'Unknown')
|
||||
names = ", ".join(c.get('Names', []))
|
||||
print(f" [{c.get('Id')[:12]}] {names} ({status})")
|
||||
except json.JSONDecodeError:
|
||||
print(item.text)
|
||||
else:
|
||||
print("No containers found or empty response.")
|
||||
|
||||
except Exception as e:
|
||||
print(f"\nError: {e}")
|
||||
if "npx" in str(e):
|
||||
print("Ensure 'npx' is installed and available in your PATH.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(run_docker_mcp_explorer())
|
||||
55
tests/test_agent_bridge.py
Normal file
55
tests/test_agent_bridge.py
Normal file
@@ -0,0 +1,55 @@
|
||||
import requests
|
||||
import json
|
||||
|
||||
# Configuration
|
||||
BASE_URL = "https://dev-api.oneskyit.com/agent"
|
||||
|
||||
def get_headers():
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"X-No-Account-ID": "testing-bypass"
|
||||
}
|
||||
return headers
|
||||
|
||||
def test_endpoint(method, path, description, params=None):
|
||||
"""
|
||||
Helper to run a test and print results.
|
||||
"""
|
||||
print(f"--- Testing: {description} ---")
|
||||
url = f"{BASE_URL}{path}"
|
||||
|
||||
request_headers = get_headers()
|
||||
|
||||
try:
|
||||
if method == "GET":
|
||||
response = requests.get(url, headers=request_headers, params=params)
|
||||
|
||||
print(f"URL: {response.url}")
|
||||
print(f"Status Code: {response.status_code}")
|
||||
|
||||
data = response.json()
|
||||
|
||||
# Check if the result is a list or single object
|
||||
result_data = data.get('data')
|
||||
if isinstance(result_data, dict):
|
||||
print(f"Result Data: {json.dumps(result_data, indent=2)}")
|
||||
else:
|
||||
print(f"Result Data (truncated): {str(result_data)[:200]}...")
|
||||
|
||||
if response.status_code != 200:
|
||||
print(f"Error Message: {data.get('status_message')}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error during test: {e}")
|
||||
print("-" * 40 + "\n")
|
||||
|
||||
if __name__ == "__main__":
|
||||
print(f"Starting Aether Agent Bridge Tests against {BASE_URL}\n")
|
||||
|
||||
# 1. Get Status
|
||||
test_endpoint("GET", "/status", "Get Container Status")
|
||||
|
||||
# 2. Get Logs (last 5 lines)
|
||||
test_endpoint("GET", "/logs", "Get Latest Logs", params={"lines": 5})
|
||||
|
||||
print("Tests Complete.")
|
||||
47
tests/test_site_lookup.py
Normal file
47
tests/test_site_lookup.py
Normal file
@@ -0,0 +1,47 @@
|
||||
import requests
|
||||
import json
|
||||
import sys
|
||||
|
||||
# Configuration
|
||||
BASE_URL = "https://dev-api.oneskyit.com/v3/crud"
|
||||
FQDN_TO_TEST = "dev-app.oneskyit.com" # Example domain that should exist
|
||||
|
||||
def test_site_domain_lookup():
|
||||
print(f"--- Testing Public Site Domain Lookup: {FQDN_TO_TEST} ---")
|
||||
url = f"{BASE_URL}/site_domain/search"
|
||||
|
||||
# Payload: Search for FQDN, explicitly WITHOUT authentication
|
||||
# The frontend typically sends this to bootstrap
|
||||
query = {
|
||||
"and": [{"field": "fqdn", "op": "eq", "value": FQDN_TO_TEST}]
|
||||
}
|
||||
|
||||
# NO AUTH HEADERS (Simulating unauthenticated bootstrapping)
|
||||
headers = {
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.post(url, headers=headers, json=query)
|
||||
print(f"URL: {response.url}")
|
||||
print(f"Status Code: {response.status_code}")
|
||||
|
||||
data = response.json()
|
||||
|
||||
if response.status_code == 200:
|
||||
result_data = data.get('data')
|
||||
if isinstance(result_data, list):
|
||||
print(f"Success! Found {len(result_data)} records.")
|
||||
if len(result_data) > 0:
|
||||
print(f"Data: {json.dumps(result_data[0], indent=2)}")
|
||||
else:
|
||||
print(f"Unexpected data format: {type(result_data)}")
|
||||
else:
|
||||
print(f"Failure (Expected before fix). Message: {data.get('status_message')}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error during test: {e}")
|
||||
print("-" * 40 + "\n")
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_site_domain_lookup()
|
||||
67
tests/test_v3_accounts.py
Normal file
67
tests/test_v3_accounts.py
Normal file
@@ -0,0 +1,67 @@
|
||||
import requests
|
||||
import json
|
||||
import sys
|
||||
|
||||
# Configuration
|
||||
BASE_URL = "https://dev-api.oneskyit.com/v3/crud"
|
||||
|
||||
# --- AUTHENTICATION CONFIG ---
|
||||
ACCOUNT_ID = "nqOzejLCDXM" # Legacy Header Fallback
|
||||
|
||||
def get_headers():
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"X-Account-ID": ACCOUNT_ID
|
||||
}
|
||||
return headers
|
||||
|
||||
def test_endpoint(method, path, description, query=None, params=None):
|
||||
"""
|
||||
Helper to run a test and print results.
|
||||
"""
|
||||
print(f"--- Testing: {description} ---")
|
||||
url = f"{BASE_URL}{path}"
|
||||
|
||||
request_headers = get_headers()
|
||||
|
||||
try:
|
||||
if method == "GET":
|
||||
response = requests.get(url, headers=request_headers, params=params)
|
||||
elif method == "POST":
|
||||
response = requests.post(url, headers=request_headers, json=query, params=params)
|
||||
|
||||
print(f"URL: {response.url}")
|
||||
print(f"Status Code: {response.status_code}")
|
||||
|
||||
data = response.json()
|
||||
|
||||
# Check if the result is a list or single object
|
||||
result_data = data.get('data')
|
||||
if isinstance(result_data, list):
|
||||
print(f"Result Count: {len(result_data)}")
|
||||
if len(result_data) > 0:
|
||||
print(f"First Item Example: {json.dumps(result_data[0], indent=2)}")
|
||||
else:
|
||||
print(f"Result Type: {type(result_data)}")
|
||||
print(f"Result Data: {json.dumps(result_data, indent=2)}")
|
||||
|
||||
if response.status_code != 200:
|
||||
print(f"Error Message: {data.get('status_message')}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error during test: {e}")
|
||||
print("-" * 40 + "\n")
|
||||
|
||||
if __name__ == "__main__":
|
||||
print(f"Starting Aether V3 Account Tests against {BASE_URL}\n")
|
||||
|
||||
# 1. List Accounts
|
||||
test_endpoint("GET", "/account/", "List Accounts (GET)")
|
||||
|
||||
# 2. Search Accounts (Full Text)
|
||||
test_endpoint("POST", "/account/search", "Search Accounts (POST - All)", query={"q": "%"})
|
||||
|
||||
# 3. Search Accounts (Specific Name)
|
||||
test_endpoint("POST", "/account/search", "Search Accounts (POST - Specific)", query={"and": [{"field": "name", "op": "icontains", "value": "Sky"}]})
|
||||
|
||||
print("Tests Complete.")
|
||||
50
tests/test_v3_auth_isolation.py
Normal file
50
tests/test_v3_auth_isolation.py
Normal file
@@ -0,0 +1,50 @@
|
||||
|
||||
import sys
|
||||
import os
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
# Add the project root to sys.path so we can import 'app'
|
||||
sys.path.append(os.getcwd())
|
||||
|
||||
from app.main import app
|
||||
|
||||
client = TestClient(app)
|
||||
|
||||
def test_site_domain_unauthenticated_search():
|
||||
"""Test that searching site_domain works without authentication."""
|
||||
print("Testing unauthenticated site_domain search...")
|
||||
# Using a simple search query that would typically be used to resolve FQDN
|
||||
search_payload = {
|
||||
"and_filters": [
|
||||
{"field": "fqdn", "op": "eq", "value": "aether.osit.dev"}
|
||||
]
|
||||
}
|
||||
response = client.post("/v3/crud/site_domain/search", json=search_payload)
|
||||
print(f"Response Status: {response.status_code}")
|
||||
print(f"Response Body: {response.json()}")
|
||||
|
||||
# We expect 200 OK (even if empty results, the point is it's not 403)
|
||||
assert response.status_code == 200
|
||||
assert response.json()["status"] == "success"
|
||||
|
||||
def test_account_unauthenticated_search_blocked():
|
||||
"""Test that searching other objects (e.g., account) is blocked without authentication."""
|
||||
print("\nTesting unauthenticated account search (should be blocked)...")
|
||||
search_payload = {
|
||||
"and_filters": []
|
||||
}
|
||||
response = client.post("/v3/crud/account/search", json=search_payload)
|
||||
print(f"Response Status: {response.status_code}")
|
||||
|
||||
# We expect 403 Forbidden
|
||||
assert response.status_code == 403
|
||||
assert "Authentication required" in response.json()["status_message"]
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
test_site_domain_unauthenticated_search()
|
||||
test_account_unauthenticated_search_blocked()
|
||||
print("\nSUCCESS: V3 Auth Isolation bypass for site_domain is working correctly.")
|
||||
except Exception as e:
|
||||
print(f"\nFAILURE: {e}")
|
||||
sys.exit(1)
|
||||
55
tests/test_v3_extra.py
Normal file
55
tests/test_v3_extra.py
Normal file
@@ -0,0 +1,55 @@
|
||||
import requests
|
||||
import json
|
||||
|
||||
# Configuration
|
||||
BASE_URL = "https://dev-api.oneskyit.com/v3/crud"
|
||||
ACCOUNT_ID = "nqOzejLCDXM" # Legacy Header Fallback
|
||||
|
||||
def get_headers(include_account=True):
|
||||
headers = {
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
if include_account:
|
||||
headers["X-Account-ID"] = ACCOUNT_ID
|
||||
return headers
|
||||
|
||||
def test_endpoint(path, description, include_account=True):
|
||||
print(f"--- Testing: {description} ---")
|
||||
url = f"{BASE_URL}{path}"
|
||||
try:
|
||||
response = requests.get(url, headers=get_headers(include_account))
|
||||
print(f"URL: {response.url}")
|
||||
print(f"Status Code: {response.status_code}")
|
||||
|
||||
data = response.json()
|
||||
result_data = data.get('data')
|
||||
|
||||
if isinstance(result_data, list):
|
||||
print(f"Result Count: {len(result_data)}")
|
||||
if len(result_data) > 0:
|
||||
print(f"First Item: {json.dumps(result_data[0], indent=2)[:300]}...")
|
||||
else:
|
||||
print(f"Result Data: {json.dumps(result_data, indent=2)}")
|
||||
|
||||
if response.status_code != 200:
|
||||
print(f"Error Message: {data.get('status_message')}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error during test: {e}")
|
||||
print("-" * 40 + "\n")
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Test Users
|
||||
test_endpoint("/user/", "List Users (Default filter)")
|
||||
test_endpoint("/user/?enabled=all&hidden=all", "List Users (Bypass filter)")
|
||||
|
||||
# Test Sites
|
||||
test_endpoint(f"/account/{ACCOUNT_ID}/site/", "List Sites (Account Filter)")
|
||||
|
||||
# Test Site Domains
|
||||
test_endpoint("/site_domain/", "List Site Domains (Default filter)")
|
||||
test_endpoint("/site_domain/?enabled=all&hidden=all", "List Site Domains (Bypass filter)")
|
||||
|
||||
# Test Legacy Site Domain Lookup (Initial frontend request)
|
||||
# This route is in api_crud.py (v1) and needs to work without an account ID header
|
||||
test_endpoint("/../../crud/site/domain/scott.localhost:5173?use_alt_table=true&use_alt_base=true", "Legacy Site Domain Lookup", include_account=False)
|
||||
49
tests/test_v3_schema.py
Normal file
49
tests/test_v3_schema.py
Normal file
@@ -0,0 +1,49 @@
|
||||
import requests
|
||||
import json
|
||||
|
||||
# Configuration
|
||||
BASE_URL = "https://dev-api.oneskyit.com/v3/crud"
|
||||
ACCOUNT_ID = "nqOzejLCDXM" # Legacy Header Fallback
|
||||
|
||||
def get_headers():
|
||||
return {
|
||||
"Content-Type": "application/json",
|
||||
"X-Account-ID": ACCOUNT_ID
|
||||
}
|
||||
|
||||
def test_schema(obj_type):
|
||||
print(f"--- Testing Schema: {obj_type} ---")
|
||||
url = f"{BASE_URL}/{obj_type}/schema"
|
||||
try:
|
||||
response = requests.get(url, headers=get_headers())
|
||||
print(f"URL: {response.url}")
|
||||
print(f"Status Code: {response.status_code}")
|
||||
|
||||
data = response.json()
|
||||
if response.status_code == 200:
|
||||
print(f"Object Type: {data['data']['object_type']}")
|
||||
print(f"Database Table: {data['data']['database']['table_name']}")
|
||||
print(f"Column Count: {len(data['data']['database']['columns'])}")
|
||||
print(f"Model Name: {data['data']['model']['name']}")
|
||||
print(f"Model Field Count: {len(data['data']['model']['fields'])}")
|
||||
|
||||
# Print a few columns and fields as example
|
||||
print("\nExample Columns:")
|
||||
for col in data['data']['database']['columns'][:3]:
|
||||
print(f" - {col['field']} ({col['type']})")
|
||||
|
||||
print("\nExample Model Fields:")
|
||||
fields = list(data['data']['model']['fields'].keys())
|
||||
for field in fields[:3]:
|
||||
f_info = data['data']['model']['fields'][field]
|
||||
print(f" - {field} (alias: {f_info['alias']}, type: {f_info['type']})")
|
||||
else:
|
||||
print(f"Error: {data.get('status_message')}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error during test: {e}")
|
||||
print("-" * 40 + "\n")
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_schema("account")
|
||||
test_schema("event_badge")
|
||||
120
tests/test_v3_search.py
Normal file
120
tests/test_v3_search.py
Normal file
@@ -0,0 +1,120 @@
|
||||
import requests
|
||||
import json
|
||||
import sys
|
||||
|
||||
# Configuration
|
||||
BASE_URL = "https://dev-api.oneskyit.com/v3/crud"
|
||||
|
||||
# --- AUTHENTICATION CONFIG ---
|
||||
# Option 1: Modern JWT (Preferred)
|
||||
JWT_TOKEN = "" # PASTE YOUR JWT TOKEN HERE
|
||||
|
||||
# Option 2: Legacy Header Fallback
|
||||
ACCOUNT_ID = "nqOzejLCDXM"
|
||||
|
||||
def get_headers(use_jwt=True):
|
||||
headers = {
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
if use_jwt and JWT_TOKEN:
|
||||
headers["Authorization"] = f"Bearer {JWT_TOKEN}"
|
||||
else:
|
||||
headers["X-Account-ID"] = ACCOUNT_ID
|
||||
return headers
|
||||
|
||||
def test_search(obj_type, query, description, params=None, use_jwt_query=False):
|
||||
"""
|
||||
Helper to run a search test and print results.
|
||||
"""
|
||||
print(f"--- Testing: {description} ---")
|
||||
url = f"{BASE_URL}/{obj_type}/search"
|
||||
|
||||
# Handle JWT in query parameter if requested
|
||||
request_params = params.copy() if params else {}
|
||||
request_headers = get_headers(use_jwt=not use_jwt_query)
|
||||
|
||||
if use_jwt_query and JWT_TOKEN:
|
||||
request_params["jwt"] = JWT_TOKEN
|
||||
|
||||
try:
|
||||
response = requests.post(url, headers=request_headers, json=query, params=request_params)
|
||||
print(f"URL: {response.url}")
|
||||
print(f"Auth Method: {'JWT Query' if use_jwt_query else ('JWT Header' if JWT_TOKEN else 'Legacy Header')}")
|
||||
print(f"Status Code: {response.status_code}")
|
||||
|
||||
data = response.json()
|
||||
|
||||
# Check if the result is a list or single object
|
||||
result_data = data.get('data')
|
||||
if isinstance(result_data, list):
|
||||
print(f"Result Count: {len(result_data)}")
|
||||
if len(result_data) > 0:
|
||||
# Print first item as example
|
||||
print(f"First Item Example: {json.dumps(result_data[0], indent=2)[:200]}...")
|
||||
else:
|
||||
print(f"Result Type: {type(result_data)}")
|
||||
print(f"Result Data: {json.dumps(result_data, indent=2)}")
|
||||
|
||||
if response.status_code != 200:
|
||||
print(f"Error Message: {data.get('status_message')}")
|
||||
print(f"Meta Details: {json.dumps(data.get('meta', {}), indent=2)}")
|
||||
|
||||
except requests.exceptions.ConnectionError:
|
||||
print(f"Error: Could not connect to {BASE_URL}. Is the API running?")
|
||||
except Exception as e:
|
||||
print(f"Error during test: {e}")
|
||||
print("-" * 40 + "\n")
|
||||
|
||||
if __name__ == "__main__":
|
||||
if not JWT_TOKEN:
|
||||
print("WARNING: JWT_TOKEN is empty. Falling back to Legacy ACCOUNT_ID auth.")
|
||||
print("To test JWT, please paste a token into the JWT_TOKEN variable.\n")
|
||||
|
||||
print(f"Starting Aether V3 Search & JWT Tests against {BASE_URL}\n")
|
||||
|
||||
# 1. JWT Header Authentication Test
|
||||
test_search("journal", {"q": "%"}, "Auth: JWT via Authorization Header")
|
||||
|
||||
# 2. JWT Query Parameter Authentication Test
|
||||
if JWT_TOKEN:
|
||||
test_search("journal", {"q": "%"}, "Auth: JWT via 'jwt' Query Parameter", use_jwt_query=True)
|
||||
|
||||
# 3. New Operator: contains / icontains
|
||||
query_contains = {
|
||||
"and": [{"field": "name", "op": "contains", "value": "Journal"}]
|
||||
}
|
||||
test_search("journal", query_contains, "Operator: contains (automatically adds %%")
|
||||
|
||||
# 4. New Operator: startswith / istartswith
|
||||
query_start = {
|
||||
"and": [{"field": "name", "op": "startswith", "value": "A"}]
|
||||
}
|
||||
test_search("journal", query_start, "Operator: startswith (automatically adds % at end)")
|
||||
|
||||
# 5. New Operator: endswith / iendswith
|
||||
query_end = {
|
||||
"and": [{"field": "name", "op": "endswith", "value": "Test"}]
|
||||
}
|
||||
test_search("journal", query_end, "Operator: endswith (automatically adds % at start)")
|
||||
|
||||
# 6. Error Handling: Unsupported Operator
|
||||
query_bad = {
|
||||
"and": [{"field": "name", "op": "invalid_op", "value": "test"}]
|
||||
}
|
||||
test_search("journal", query_bad, "Error Handling: Unsupported Operator (Should return 400)")
|
||||
|
||||
# 7. Complex Nested Logic (Recap)
|
||||
query_nested = {
|
||||
"and": [
|
||||
{"field": "enable", "op": "eq", "value": True},
|
||||
{
|
||||
"or": [
|
||||
{"field": "name", "op": "icontains", "value": "aether"},
|
||||
{"field": "summary", "op": "is_not_null"}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
test_search("journal", query_nested, "Complex Logic: Nested AND/OR + icontains")
|
||||
|
||||
print("Tests Complete.")
|
||||
Reference in New Issue
Block a user