"""Reproduction script that fires batches of CRUD API requests concurrently.

Each batch issues four requests (one nested GET, two search POSTs and one
filtered list GET) against ``BASE_URL``.  Several batches are run on a small
thread pool and the script prints how many requests returned HTTP 200.
"""
import concurrent.futures
import json
import time

import requests

# Configuration
# NOTE(review): the API key and account ID below are committed in source.
# Consider loading them from the environment before sharing this script.
BASE_URL = "https://dev-api.oneskyit.com/v3/crud"
API_KEY = "IDF68Em5X4HTZlswRNgepQ"
ACCOUNT_ID = "_XY7DXtc9MY"
JOURNAL_ID = "DCAV-06-35-85"
EVENT_ID = "pjrcghqwert"

HEADERS = {
    "X-Aether-API-Key": API_KEY,
    "X-Account-ID": ACCOUNT_ID,
    "Content-Type": "application/json",
}

# Per-request timeout, in seconds, passed to ``requests``.
REQUEST_TIMEOUT_SECONDS = 15


def hit_endpoint(name, method, url, body=None, params=None):
    """Send one request, print its status and latency, and report success.

    Args:
        name: Label printed in square brackets at the start of each line.
        method: HTTP verb such as ``"GET"`` or ``"POST"``.  The verb is sent
            as given (upper-cased) rather than being coerced to POST.
        url: Full URL to request.
        body: Optional payload sent as JSON.  It is not sent on GET requests.
        params: Optional mapping of query-string parameters.

    Returns:
        ``True`` when the response status code is exactly 200.  ``False`` for
        any other status code, or when the request itself raised.
    """
    verb = method.upper()
    # GET requests never carry the JSON body, even if one was supplied.
    payload = None if verb == "GET" else body

    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments while the request is in flight.
    start = time.perf_counter()
    try:
        resp = requests.request(
            verb,
            url,
            headers=HEADERS,
            json=payload,
            params=params,
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
    except Exception as e:
        # Deliberately broad: this is a best-effort probe, and any failure
        # (timeout, connection error, ...) must count as a failed request
        # instead of aborting the whole batch.
        print(f"[{name}] ❌ EXCEPTION: {e}")
        return False
    duration = time.perf_counter() - start

    status = resp.status_code
    print(f"[{name}] Status: {status} | Duration: {duration:.3f}s")
    if status != 200:
        # Only the first 200 characters of the response body are shown.
        print(f" ❌ ERROR: {resp.text[:200]}")
        return False
    return True


def test_batch():
    """Run the four endpoint checks once, sequentially and in a fixed order.

    Returns:
        A ``(success_count, total)`` tuple: the number of checks for which
        ``hit_endpoint`` returned ``True`` and the number of checks attempted.
    """
    # Each entry is (label, HTTP method, URL, JSON body, query parameters).
    checks = [
        # 1. Journal Entries (Nested GET)
        (
            "Journal Entries (Nested GET)",
            "GET",
            f"{BASE_URL}/journal/{JOURNAL_ID}/journal_entry/",
            None,
            None,
        ),
        # 2. Event Badges (Search POST)
        (
            "Event Badges (Search POST)",
            "POST",
            f"{BASE_URL}/event_badge/search",
            {"q": "%"},
            None,
        ),
        # 3. Event Sessions (Search POST)
        (
            "Event Sessions (Search POST)",
            "POST",
            f"{BASE_URL}/event_session/search",
            {"q": "%"},
            None,
        ),
        # 4. Event Sessions (List GET with parent)
        (
            "Event Sessions (List GET)",
            "GET",
            f"{BASE_URL}/event_session/",
            None,
            {"for_obj_type": "event", "for_obj_id": EVENT_ID},
        ),
    ]

    success_count = 0
    for label, method, url, body, params in checks:
        if hit_endpoint(label, method, url, body=body, params=params):
            success_count += 1
    return success_count, len(checks)


def main():
    """Run several batches on a thread pool and print the aggregate result."""
    print(f"Reproduction script started against {BASE_URL}")
    print(f"Using Account: {ACCOUNT_ID}\n")

    grand_success = 0
    grand_total = 0
    iterations = 5

    # Running 20 requests total (5 batches of 4)
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(test_batch) for _ in range(iterations)]
        # Tally each batch as soon as it finishes, in completion order.
        for future in concurrent.futures.as_completed(futures):
            s, t = future.result()
            grand_success += s
            grand_total += t

    print(f"\nFinal Results: {grand_success}/{grand_total} requests succeeded.")
    if grand_success == grand_total:
        print("🎉 SUCCESS! No more timeouts or 403s.")
    else:
        print("❌ Reproduced some failures!")


if __name__ == "__main__":
    main()