"""
E2E tests for the Novi-Mailman Bridge — API connection checks.

Verifies that both the Novi AMS API and Mailman 3 REST API are reachable and
that the credentials stored in IDAA site cfg_json are valid.

Run from project root:
    ./environment/bin/python3 tests/e2e/test_e2e_v3_action_novi_mailman.py

Environment:
    AE_API_BASE — override the target API base URL
                  (default: https://dev-api.oneskyit.com)

Related tests:
    test_e2e_v3_action_novi_mailman_lists.py — member read/subscribe/unsubscribe lifecycle
    test_e2e_v3_action_novi_mailman_sync.py  — full Novi → Mailman mirror sync (TODO)

Credential storage:
    All Novi and Mailman credentials live in IDAA site cfg_json
    (id_random='58_gJESdlUh'). See project memory for key names.
"""

import os
import sys
import time

import requests

BASE_URL = os.environ.get('AE_API_BASE', 'https://dev-api.oneskyit.com')
ACTION_BASE = f"{BASE_URL}/v3/action/e_novi_mailman"

# NOTE(review): this API key is committed in source. Consider loading it from
# the environment or a secret store instead — confirm with the key's owner.
API_KEY = "PMM4n50teUCaOMMTN8qOJA"

AUTH_HEADERS = {
    "X-Aether-API-Key": API_KEY,
    "x-no-account-id": "bypass",
}

# Running totals, updated by print_result() and reported by main().
pass_count = 0
fail_count = 0


def print_result(label: str, success: bool, message: str = ""):
    """Print one PASS/FAIL line and bump the matching module-level counter.

    Args:
        label: Short name of the check being reported.
        success: True to record a pass, False to record a failure.
        message: Optional detail appended after the label.
    """
    global pass_count, fail_count
    status = "✅ PASS" if success else "❌ FAIL"
    msg = f" {message}" if message else ""
    print(f" [{status}] {label}{msg}")
    if success:
        pass_count += 1
    else:
        fail_count += 1


def _response_data(resp) -> dict:
    """Return the ``data`` object of a JSON response, or ``{}`` if unavailable.

    A non-JSON body, a non-object JSON document, or a missing/null/non-object
    ``data`` member all yield an empty dict, so callers can keep classifying
    the response by its HTTP status instead of crashing on the body.
    """
    try:
        payload = resp.json()
    except ValueError:
        # requests raises a ValueError subclass when the body is not JSON.
        return {}
    data = payload.get('data') if isinstance(payload, dict) else None
    return data if isinstance(data, dict) else {}


def _run_connection_check(endpoint: str, label: str, *,
                          describe_ok=None, unauthorized_hint: str = ""):
    """GET ``test_connection/<endpoint>`` and report the outcome via print_result.

    Args:
        endpoint: Last path segment under ``{ACTION_BASE}/test_connection/``.
        label: Label passed to print_result for every outcome.
        describe_ok: Optional callable taking the response's ``data`` dict and
            returning the message to print on success.
        unauthorized_hint: Text appended verbatim to the 401 failure message.
    """
    try:
        resp = requests.get(
            f"{ACTION_BASE}/test_connection/{endpoint}",
            headers=AUTH_HEADERS,
            timeout=15,
        )
        data = _response_data(resp)
        if resp.status_code == 200 and data.get('ok'):
            print_result(label, True, describe_ok(data) if describe_ok else "")
        elif resp.status_code == 401:
            print_result(
                label, False,
                f"401 — {data.get('error', resp.text[:120])}{unauthorized_hint}",
            )
        else:
            print_result(label, False,
                         f"HTTP {resp.status_code}: {resp.text[:120]}")
    except Exception as e:
        # Reporting boundary: any failure (network, TLS, timeout, ...) is
        # recorded as a failed check rather than aborting the whole run.
        print_result(label, False, f"Exception: {e}")


def test_novi_connection():
    """Check that the bridge can authenticate against the Novi API."""
    print("\n[1] Novi API Connection")
    _run_connection_check("novi", "Novi credentials valid")


def test_mailman_connection():
    """Check that the bridge can authenticate against the Mailman 3 API."""
    print("\n[2] Mailman 3 API Connection")
    _run_connection_check(
        "mailman",
        "Mailman credentials valid",
        describe_ok=lambda data: f"version={data.get('version', 'unknown')}",
        unauthorized_hint=" (add mailman_* keys to IDAA cfg_json)",
    )


def main() -> int:
    """Run both connection checks, print a summary, and return the exit code.

    Returns:
        0 when no check failed, 1 otherwise.
    """
    print("=" * 60)
    print(" Novi-Mailman Bridge — Connection Tests")
    print(f" Target: {ACTION_BASE}")
    print("=" * 60)

    t_start = time.time()
    test_novi_connection()
    test_mailman_connection()
    elapsed = time.time() - t_start

    print(f"\n{'=' * 60}")
    print(f" Results: {pass_count} passed, {fail_count} failed ({elapsed:.2f}s)")
    print("=" * 60)
    return 0 if fail_count == 0 else 1


if __name__ == "__main__":
    sys.exit(main())