Files
OSIT-AE-API-FastAPI/tests/e2e/repro_intermittent_errors.py
Scott Idem 6ca79e9a02 chore(api): stabilize SQL core and enhance searchability
- Refactor SQL CRUD to use engine.connect() context managers for thread safety
- Optimize connection pooling in lib_sql_core
- Clean up app/routers/api.py to fix duplicate definitions and OpenAPI KeyError
- Add 'default_qry_str' to searchable_fields for Event, Session, Presentation, Presenter, Badge, and Journal
- Add 'event_location_name' to searchable_fields for Event Session
- Verified 20/20 E2E success via repro_intermittent_errors.py
2026-01-21 15:23:04 -05:00

88 lines
2.8 KiB
Python

import concurrent.futures
import json
import os
import time

import requests
# Configuration
#
# Every value can be overridden through an environment variable so the script
# can be pointed at another deployment or account without editing the source.
# The fallbacks are the values this script has always used.
# NOTE(review): the fallback API key is a credential committed to source
# control -- consider rotating it and supplying it only via AETHER_API_KEY.
BASE_URL = os.environ.get("AETHER_BASE_URL", "https://dev-api.oneskyit.com/v3/crud")
API_KEY = os.environ.get("AETHER_API_KEY", "IDF68Em5X4HTZlswRNgepQ")
ACCOUNT_ID = os.environ.get("AETHER_ACCOUNT_ID", "_XY7DXtc9MY")
JOURNAL_ID = os.environ.get("AETHER_JOURNAL_ID", "DCAV-06-35-85")
EVENT_ID = os.environ.get("AETHER_EVENT_ID", "pjrcghqwert")

# Headers attached to every request issued by this script.
HEADERS = {
    "X-Aether-API-Key": API_KEY,
    "X-Account-ID": ACCOUNT_ID,
    "Content-Type": "application/json",
}
def hit_endpoint(name, method, url, body=None, params=None):
    """Send one request and report whether it came back with HTTP 200.

    A status/duration line is printed for every completed request; for a
    non-200 response the first 200 characters of the response text are
    printed as well.

    Args:
        name: Label used as the prefix of the printed lines.
        method: "GET" issues a GET request; any other value issues a POST.
        url: Target URL.
        body: JSON payload sent with POST requests; not used for GET.
        params: Optional query-string parameters.

    Returns:
        True if the response status is exactly 200. False for any other
        status, or if any exception is raised while performing or
        reporting the request.
    """
    try:
        # perf_counter() is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments the way time.time() deltas can.
        start = time.perf_counter()
        if method == "GET":
            resp = requests.get(url, headers=HEADERS, params=params, timeout=15)
        else:
            resp = requests.post(
                url, headers=HEADERS, json=body, params=params, timeout=15
            )
        duration = time.perf_counter() - start
        status = resp.status_code
        print(f"[{name}] Status: {status} | Duration: {duration:.3f}s")
        if status != 200:
            print(f" ❌ ERROR: {resp.text[:200]}")
            return False
        return True
    except Exception as e:
        # Deliberately broad: any failure (timeout, connection error, ...)
        # is counted as a failed request instead of aborting the caller.
        print(f"[{name}] ❌ EXCEPTION: {e}")
        return False
def test_batch():
    """Fire the four probe requests once, in order, and tally the outcome.

    Returns:
        A ``(success_count, total)`` tuple: how many of the requests
        ``hit_endpoint`` reported as successful, and how many were made.
    """
    # Each probe: (label, HTTP method, URL, extra keyword arguments).
    probes = (
        # 1. Journal Entries (Nested GET)
        (
            "Journal Entries (Nested GET)",
            "GET",
            f"{BASE_URL}/journal/{JOURNAL_ID}/journal_entry/",
            {},
        ),
        # 2. Event Badges (Search POST)
        (
            "Event Badges (Search POST)",
            "POST",
            f"{BASE_URL}/event_badge/search",
            {"body": {"q": "%"}},
        ),
        # 3. Event Sessions (Search POST)
        (
            "Event Sessions (Search POST)",
            "POST",
            f"{BASE_URL}/event_session/search",
            {"body": {"q": "%"}},
        ),
        # 4. Event Sessions (List GET with parent)
        (
            "Event Sessions (List GET)",
            "GET",
            f"{BASE_URL}/event_session/",
            {"params": {"for_obj_type": "event", "for_obj_id": EVENT_ID}},
        ),
    )

    outcomes = [
        hit_endpoint(label, verb, target, **extra)
        for label, verb, target, extra in probes
    ]
    passed = sum(1 for ok in outcomes if ok)
    return passed, len(outcomes)
def main(iterations=5, max_workers=3):
    """Run several request batches concurrently and print an overall tally.

    Args:
        iterations: Number of times ``test_batch`` is submitted to the pool.
        max_workers: Number of worker threads executing the batches.

    Returns:
        0 if every request succeeded, 1 otherwise, so the value can be used
        directly as the process exit status.
    """
    print(f"Reproduction script started against {BASE_URL}")
    print(f"Using Account: {ACCOUNT_ID}\n")

    grand_success = 0
    grand_total = 0
    # With the defaults this submits 5 batches on 3 worker threads.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(test_batch) for _ in range(iterations)]
        for future in concurrent.futures.as_completed(futures):
            s, t = future.result()
            grand_success += s
            grand_total += t

    print(f"\nFinal Results: {grand_success}/{grand_total} requests succeeded.")
    if grand_success == grand_total:
        print("🎉 SUCCESS! No more timeouts or 403s.")
        return 0
    print("❌ Reproduced some failures!")
    return 1


if __name__ == "__main__":
    # Propagate the outcome as the exit status so shells and CI jobs can
    # tell a clean run from one that reproduced failures.
    raise SystemExit(main())