153 lines
5.9 KiB
Python
153 lines
5.9 KiB
Python
"""
|
||
Read-only concurrent stress test against V3 list endpoints.
|
||
Fires N workers each making R sequential requests across a set of
|
||
list endpoints, then prints per-endpoint latency stats and an
|
||
overall error summary.
|
||
|
||
Usage (from project root):
|
||
./environment/bin/python3 tests/tools/stress_list_queries.py
|
||
./environment/bin/python3 tests/tools/stress_list_queries.py --workers 20 --requests 10
|
||
./environment/bin/python3 tests/tools/stress_list_queries.py --base-url https://api.oneskyit.com --workers 5
|
||
"""
|
||
import argparse
|
||
import math
|
||
import statistics
|
||
import sys
|
||
import time
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
|
||
import requests
|
||
|
||
DEFAULT_BASE_URL = "https://test-api.oneskyit.com"
|
||
API_KEY = "nT0jPeiCfxSifkiDZur9jA"
|
||
ACCOUNT_ID = "_XY7DXtc9MY" # One Sky IT Demo
|
||
|
||
HEADERS = {
|
||
"x-aether-api-key": API_KEY,
|
||
"x-account-id": ACCOUNT_ID,
|
||
}
|
||
|
||
# Read-only list endpoints to hammer. Each is a (label, path) tuple.
|
||
ENDPOINTS = [
|
||
("event list", "/v3/crud/event/"),
|
||
("event_session list", "/v3/crud/event_session/"),
|
||
("event_badge list", "/v3/crud/event_badge/"),
|
||
("event_file list", "/v3/crud/event_file/"),
|
||
("person list", "/v3/crud/person/"),
|
||
("journal list", "/v3/crud/journal/"),
|
||
("hosted_file list", "/v3/crud/hosted_file/"),
|
||
("data_store list", "/v3/crud/data_store/"),
|
||
]
|
||
|
||
|
||
def percentile(sorted_times: list[float], pct: float) -> float:
|
||
"""Return the pct-th percentile of a pre-sorted list (0–100)."""
|
||
if not sorted_times:
|
||
return 0.0
|
||
k = (len(sorted_times) - 1) * pct / 100
|
||
lo, hi = int(math.floor(k)), int(math.ceil(k))
|
||
return sorted_times[lo] + (sorted_times[hi] - sorted_times[lo]) * (k - lo)
|
||
|
||
|
||
def do_request(label: str, url: str, session: requests.Session) -> dict:
|
||
t0 = time.perf_counter()
|
||
try:
|
||
r = session.get(url, headers=HEADERS, timeout=15)
|
||
elapsed = (time.perf_counter() - t0) * 1000
|
||
return {"label": label, "status": r.status_code, "ms": elapsed, "error": None}
|
||
except Exception as e:
|
||
elapsed = (time.perf_counter() - t0) * 1000
|
||
return {"label": label, "status": 0, "ms": elapsed, "error": str(e)}
|
||
|
||
|
||
def worker(worker_id: int, requests_per_worker: int, base_url: str, limit: int) -> list[dict]:
|
||
results = []
|
||
with requests.Session() as session:
|
||
for _ in range(requests_per_worker):
|
||
for label, path in ENDPOINTS:
|
||
url = f"{base_url}{path}?limit={limit}"
|
||
results.append(do_request(label, url, session))
|
||
return results
|
||
|
||
|
||
def print_result(label, success, message=""):
|
||
icon = "✅" if success else "❌"
|
||
suffix = f" — {message}" if message else ""
|
||
print(f" [{icon}] {label}{suffix}")
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description="Concurrent read-only stress test")
|
||
parser.add_argument("--workers", type=int, default=10, help="Concurrent worker threads (default: 10)")
|
||
parser.add_argument("--requests", type=int, default=5, help="Requests per worker per endpoint (default: 5)")
|
||
parser.add_argument("--limit", type=int, default=20, help="?limit= param on each list request (default: 20)")
|
||
parser.add_argument("--base-url", type=str, default=DEFAULT_BASE_URL, help=f"API base URL (default: {DEFAULT_BASE_URL})")
|
||
args = parser.parse_args()
|
||
|
||
total_requests = args.workers * args.requests * len(ENDPOINTS)
|
||
print(f"\n🔥 Stress Test: {args.workers} workers × {args.requests} rounds × {len(ENDPOINTS)} endpoints = {total_requests} total requests")
|
||
print(f" Target: {args.base_url} limit={args.limit}\n")
|
||
|
||
all_results: list[dict] = []
|
||
suite_start = time.perf_counter()
|
||
|
||
with ThreadPoolExecutor(max_workers=args.workers) as pool:
|
||
futures = [pool.submit(worker, wid, args.requests, args.base_url, args.limit) for wid in range(args.workers)]
|
||
for f in as_completed(futures):
|
||
all_results.extend(f.result())
|
||
|
||
suite_elapsed = time.perf_counter() - suite_start
|
||
|
||
# --- Per-endpoint stats ---
|
||
print("─" * 60)
|
||
print(f"{'Endpoint':<35} {'OK':>5} {'ERR':>5} {'p50ms':>7} {'p95ms':>7} {'maxms':>7}")
|
||
print("─" * 60)
|
||
|
||
by_label: dict[str, list[dict]] = {}
|
||
for r in all_results:
|
||
by_label.setdefault(r["label"], []).append(r)
|
||
|
||
any_fail = False
|
||
for label, _ in ENDPOINTS:
|
||
rows = by_label.get(label, [])
|
||
ok = [r for r in rows if r["status"] in (200, 201, 404) and not r["error"]]
|
||
err = [r for r in rows if r not in ok]
|
||
times = sorted(r["ms"] for r in ok)
|
||
p50 = statistics.median(times) if times else 0
|
||
p95 = percentile(times, 95)
|
||
mx = max(times) if times else 0
|
||
flag = "" if not err else " ⚠"
|
||
if err:
|
||
any_fail = True
|
||
print(f" {label:<33} {len(ok):>5} {len(err):>5} {p50:>7.0f} {p95:>7.0f} {mx:>7.0f}{flag}")
|
||
|
||
print("─" * 60)
|
||
|
||
# --- Error detail ---
|
||
errors = [r for r in all_results if r["error"] or r["status"] not in (200, 201, 404)]
|
||
if errors:
|
||
print(f"\n⚠ {len(errors)} errors encountered:")
|
||
seen = set()
|
||
for r in errors:
|
||
key = (r["label"], r["status"], r["error"])
|
||
if key not in seen:
|
||
seen.add(key)
|
||
print(f" [{r['status']}] {r['label']}: {r['error'] or 'non-2xx/404'}")
|
||
else:
|
||
print("\n✅ Zero errors.")
|
||
|
||
# --- Overall summary ---
|
||
all_times = sorted(r["ms"] for r in all_results if not r["error"])
|
||
rps = total_requests / suite_elapsed
|
||
print(f"\n🏁 {total_requests} requests in {suite_elapsed:.2f}s ({rps:.1f} req/s)")
|
||
if all_times:
|
||
print(f" p50={statistics.median(all_times):.0f}ms "
|
||
f"p95={percentile(all_times, 95):.0f}ms "
|
||
f"max={max(all_times):.0f}ms\n")
|
||
|
||
sys.exit(1 if any_fail else 0)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|