"""
Read-only concurrent stress test against V3 list endpoints.

Fires N workers each making R sequential requests across a set of list
endpoints, then prints per-endpoint latency stats and an overall error summary.

Usage (from project root):
    ./environment/bin/python3 tests/tools/stress_list_queries.py
    ./environment/bin/python3 tests/tools/stress_list_queries.py --workers 20 --requests 10
    ./environment/bin/python3 tests/tools/stress_list_queries.py --base-url https://api.oneskyit.com --workers 5
"""

import argparse
import math
import statistics
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

DEFAULT_BASE_URL = "https://test-api.oneskyit.com"
# NOTE(review): these credentials are committed in source. Confirm they are
# demo-only and consider loading them from the environment instead.
API_KEY = "nT0jPeiCfxSifkiDZur9jA"
ACCOUNT_ID = "_XY7DXtc9MY"  # One Sky IT Demo

HEADERS = {
    "x-aether-api-key": API_KEY,
    "x-account-id": ACCOUNT_ID,
}

# Per-request timeout in seconds, passed to session.get().
REQUEST_TIMEOUT_S = 15

# HTTP statuses counted as a successful response by the report.
OK_STATUSES = frozenset({200, 201, 404})

# Read-only list endpoints to hammer. Each is a (label, path) tuple.
ENDPOINTS = [
    ("event list", "/v3/crud/event/"),
    ("event_session list", "/v3/crud/event_session/"),
    ("event_badge list", "/v3/crud/event_badge/"),
    ("event_file list", "/v3/crud/event_file/"),
    ("person list", "/v3/crud/person/"),
    ("journal list", "/v3/crud/journal/"),
    ("hosted_file list", "/v3/crud/hosted_file/"),
    ("data_store list", "/v3/crud/data_store/"),
]


def percentile(sorted_times: list[float], pct: float) -> float:
    """Return the pct-th percentile of a pre-sorted list (0–100).

    Uses linear interpolation between the two nearest ranks. An empty list
    yields 0.0, and ``pct`` is clamped to the 0–100 range so an out-of-range
    value cannot index past the ends of the list.
    """
    if not sorted_times:
        return 0.0
    pct = min(max(pct, 0), 100)
    k = (len(sorted_times) - 1) * pct / 100
    lo, hi = math.floor(k), math.ceil(k)
    return sorted_times[lo] + (sorted_times[hi] - sorted_times[lo]) * (k - lo)


def build_url(base_url: str, path: str, limit: int) -> str:
    """Join the base URL and an endpoint path, appending ``?limit=``.

    A trailing slash on ``base_url`` is dropped so the result never contains
    a double slash before the path.
    """
    return f"{base_url.rstrip('/')}{path}?limit={limit}"


def is_success(result: dict) -> bool:
    """Return True when a result dict has no error and an accepted status."""
    return not result["error"] and result["status"] in OK_STATUSES


def do_request(label: str, url: str, session: requests.Session) -> dict:
    """Issue one GET and return a result dict; never raises.

    The returned dict has the keys ``label``, ``status`` (0 when the request
    raised), ``ms`` (elapsed wall time in milliseconds) and ``error`` (None on
    a completed request, otherwise a description of the exception).
    """
    status = 0
    error = None
    t0 = time.perf_counter()
    try:
        status = session.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT_S).status_code
    except Exception as e:  # deliberately broad: a failure is recorded, not raised
        # str(e) can be empty; fall back to the class name so the failure is
        # never mistaken for "no error" by the truthiness checks downstream.
        error = str(e) or type(e).__name__
    elapsed = (time.perf_counter() - t0) * 1000
    return {"label": label, "status": status, "ms": elapsed, "error": error}


def worker(worker_id: int, requests_per_worker: int, base_url: str, limit: int) -> list[dict]:
    """Run ``requests_per_worker`` rounds over every endpoint on one session.

    ``worker_id`` is accepted for the caller's bookkeeping and is not used
    here. Returns one result dict per request, in the order they were issued.
    """
    results = []
    with requests.Session() as session:
        for _ in range(requests_per_worker):
            for label, path in ENDPOINTS:
                results.append(do_request(label, build_url(base_url, path, limit), session))
    return results


def summarize_endpoint(rows: list[dict]) -> tuple[int, int, float, float, float]:
    """Return ``(ok_count, err_count, p50_ms, p95_ms, max_ms)`` for one endpoint.

    Latency figures are computed from successful rows only and are 0.0 when
    there are none.
    """
    ok_times = sorted(r["ms"] for r in rows if is_success(r))
    err_count = len(rows) - len(ok_times)
    if not ok_times:
        return 0, err_count, 0.0, 0.0, 0.0
    return (
        len(ok_times),
        err_count,
        statistics.median(ok_times),
        percentile(ok_times, 95),
        ok_times[-1],
    )


def print_result(label, success, message=""):
    """Print a single pass/fail line with an optional trailing message."""
    icon = "✅" if success else "❌"
    suffix = f" — {message}" if message else ""
    print(f" [{icon}] {label}{suffix}")


def main(argv=None):
    """Parse arguments, run the stress test, print the report, and exit.

    ``argv`` defaults to ``sys.argv[1:]``. The process exits with status 1
    when any endpoint recorded an error and 0 otherwise.
    """
    parser = argparse.ArgumentParser(description="Concurrent read-only stress test")
    parser.add_argument("--workers", type=int, default=10,
                        help="Concurrent worker threads (default: 10)")
    parser.add_argument("--requests", type=int, default=5,
                        help="Requests per worker per endpoint (default: 5)")
    parser.add_argument("--limit", type=int, default=20,
                        help="?limit= param on each list request (default: 20)")
    parser.add_argument("--base-url", type=str, default=DEFAULT_BASE_URL,
                        help=f"API base URL (default: {DEFAULT_BASE_URL})")
    args = parser.parse_args(argv)

    # ThreadPoolExecutor raises ValueError for max_workers <= 0; report it as
    # a usage error instead of a traceback.
    if args.workers < 1:
        parser.error("--workers must be at least 1")
    if args.requests < 0:
        parser.error("--requests must not be negative")

    total_requests = args.workers * args.requests * len(ENDPOINTS)
    print(f"\n🔥 Stress Test: {args.workers} workers × {args.requests} rounds × "
          f"{len(ENDPOINTS)} endpoints = {total_requests} total requests")
    print(f" Target: {args.base_url} limit={args.limit}\n")

    all_results: list[dict] = []
    suite_start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=args.workers) as pool:
        futures = [
            pool.submit(worker, wid, args.requests, args.base_url, args.limit)
            for wid in range(args.workers)
        ]
        for f in as_completed(futures):
            all_results.extend(f.result())
    suite_elapsed = time.perf_counter() - suite_start

    # --- Per-endpoint stats ---
    print("─" * 60)
    print(f"{'Endpoint':<35} {'OK':>5} {'ERR':>5} {'p50ms':>7} {'p95ms':>7} {'maxms':>7}")
    print("─" * 60)

    by_label: dict[str, list[dict]] = {}
    for r in all_results:
        by_label.setdefault(r["label"], []).append(r)

    any_fail = False
    for label, _ in ENDPOINTS:
        ok_count, err_count, p50, p95, mx = summarize_endpoint(by_label.get(label, []))
        flag = " ⚠" if err_count else ""
        if err_count:
            any_fail = True
        # Two-space indent + 33-wide label matches the 35-wide header column.
        print(f"  {label:<33} {ok_count:>5} {err_count:>5} {p50:>7.0f} {p95:>7.0f} {mx:>7.0f}{flag}")

    print("─" * 60)

    # --- Error detail ---
    failures = [r for r in all_results if not is_success(r)]
    if failures:
        print(f"\n⚠ {len(failures)} errors encountered:")
        # dict.fromkeys de-duplicates while keeping first-seen order.
        distinct = dict.fromkeys((r["label"], r["status"], r["error"]) for r in failures)
        for label, status, error in distinct:
            print(f" [{status}] {label}: {error or 'non-2xx/404'}")
    else:
        print("\n✅ Zero errors.")

    # --- Overall summary ---
    # Includes every request that completed without raising, whatever its
    # HTTP status (unlike the per-endpoint stats, which use OK rows only).
    all_times = sorted(r["ms"] for r in all_results if not r["error"])
    rps = total_requests / suite_elapsed if suite_elapsed > 0 else 0.0
    print(f"\n🏁 {total_requests} requests in {suite_elapsed:.2f}s ({rps:.1f} req/s)")
    if all_times:
        print(f" p50={statistics.median(all_times):.0f}ms "
              f"p95={percentile(all_times, 95):.0f}ms "
              f"max={max(all_times):.0f}ms\n")

    sys.exit(1 if any_fail else 0)


if __name__ == "__main__":
    main()