"""
E2E Tests: V3 User Action Routes (app/routers/api_v3_actions_user.py)
======================================================================
Covers the new V3 action endpoints under /v3/action/user/:
  - POST /v3/action/user/authenticate
  - POST /v3/action/user/verify_password
  - POST /v3/action/user/{user_id}/change_password
  - GET  /v3/action/user/{user_id}/new_auth_key
  - GET  /v3/action/user/{user_id}/email_auth_key_url

Setup: creates a temporary test user via V3 CRUD; tears down on completion
(teardown also runs when a test raises or the run is interrupted).

Configuration may be overridden through the environment variables
AETHER_API_ROOT, AETHER_API_KEY and AETHER_ACCOUNT_ID; the built-in values
are used when they are unset.

Run from project root:
    ./environment/bin/python3 tests/e2e/test_e2e_v3_user_action_routes.py

The process exits with status 0 when every test passes and 1 otherwise.
"""

import os
import sys
import time

import requests

sys.path.append(os.getcwd())

# --- Configuration ---
API_ROOT = os.environ.get("AETHER_API_ROOT", "https://dev-api.oneskyit.com")
# NOTE(review): a credential is committed here as the fallback value; prefer
# supplying AETHER_API_KEY from the environment and rotating this key.
API_KEY = os.environ.get("AETHER_API_KEY", "PMM4n50teUCaOMMTN8qOJA")
ACCOUNT_ID = os.environ.get("AETHER_ACCOUNT_ID", "_XY7DXtc9MY")  # One Sky IT Demo account

V3_HEADERS = {
    "x-aether-api-key": API_KEY,
    "x-account-id": ACCOUNT_ID,
}

TEST_PASSWORD = "TestAction1234!"  # >= 10 chars
NEW_PASSWORD = "NewAction5678!"  # used after change_password tests

# Seconds to wait for any single HTTP call; without a timeout `requests`
# blocks forever on an unresponsive server.
REQUEST_TIMEOUT = 30

# Populated during setup
_test_user_id = None  # Vision ID (random string)
_test_username = None
_test_email = None


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def print_result(label, success, message=""):
    """Print one PASS/FAIL line for *label*, with an optional detail message."""
    status = "✅ PASS" if success else "❌ FAIL"
    print(f" [{status}] {label}" + (f" — {message}" if message else ""))


def assert_vision_id(obj, field_name="user_id"):
    """Returns True if field is a non-empty string of length 11–22 (Vision ID)."""
    val = obj.get(field_name) if isinstance(obj, dict) else None
    return isinstance(val, str) and 11 <= len(val) <= 22


def _call(method, path, *, headers=None, **kwargs):
    """Send *method* to API_ROOT + *path* with the suite's timeout.

    V3_HEADERS is used unless *headers* is given explicitly; remaining
    keyword arguments (``json=``, ``params=``) are passed to ``requests``.
    """
    return requests.request(
        method,
        f"{API_ROOT}{path}",
        headers=V3_HEADERS if headers is None else headers,
        timeout=REQUEST_TIMEOUT,
        **kwargs,
    )


def _payload(resp):
    """Return the ``data`` member of a JSON response, or None.

    None is returned when the body is not JSON, is not a JSON object, or has
    no ``data`` key, so callers never crash on an unexpected error page.
    """
    try:
        body = resp.json()
    except ValueError:  # requests' JSON decode errors derive from ValueError
        return None
    return body.get("data") if isinstance(body, dict) else None


def _field(resp, name):
    """Return ``data[name]`` from the response, or None if unavailable."""
    data = _payload(resp)
    return data.get(name) if isinstance(data, dict) else None


def _result(resp):
    """Return the action result; a primitive may arrive wrapped as {"result": ...}."""
    data = _payload(resp)
    return data.get("result") if isinstance(data, dict) else data


def _run_test(test_fn):
    """Run one test callable, turning any exception into a reported failure."""
    try:
        return bool(test_fn())
    except Exception as exc:  # runner boundary: report and keep going
        print_result(test_fn.__name__, False, f"{type(exc).__name__}: {exc}")
        return False


# ---------------------------------------------------------------------------
# Setup / Teardown
# ---------------------------------------------------------------------------

def setup_test_user():
    """Create a temporary test user via V3 CRUD. Returns the Vision ID or None."""
    global _test_user_id, _test_username, _test_email

    ts = int(time.time())
    _test_username = f"test_v3act_e2e_{ts}"
    _test_email = f"test_v3act_e2e_{ts}@test.invalid"

    payload = {
        "account_id": ACCOUNT_ID,
        "username": _test_username,
        "name": "E2E V3 Action Test User",
        "email": _test_email,
        "new_password": TEST_PASSWORD,
        "enable": True,
        "allow_auth_key": True,
    }
    try:
        resp = _call("POST", "/v3/crud/user/", json=payload)
    except requests.RequestException as exc:
        print(f" [SETUP ❌] Failed to create test user — {exc}")
        return None

    if resp.status_code != 200:
        print(f" [SETUP ❌] Failed to create test user — HTTP {resp.status_code}")
        print(f" {resp.text[:300]}")
        return None

    data = _payload(resp)
    if isinstance(data, dict):
        _test_user_id = data.get("user_id") or data.get("id")
    if not _test_user_id:
        print(f" [SETUP ❌] Test user created but no Vision ID returned: {data}")
        return None

    print(f" [SETUP ✅] Test user created — user_id={_test_user_id} username={_test_username}")
    return _test_user_id


def teardown_test_user(user_id):
    """Delete the test user via V3 CRUD."""
    if not user_id:
        return
    try:
        resp = _call("DELETE", f"/v3/crud/user/{user_id}")
    except requests.RequestException as exc:
        print(f" [TEARDOWN ❌] Failed to delete test user — {exc}")
        return
    if resp.status_code == 200:
        print(f" [TEARDOWN ✅] Test user deleted — user_id={user_id}")
    else:
        print(f" [TEARDOWN ❌] Failed to delete test user — HTTP {resp.status_code} {resp.text[:200]}")


# ---------------------------------------------------------------------------
# authenticate
# ---------------------------------------------------------------------------

def test_authenticate_username_password():
    """POST /v3/action/user/authenticate — valid username + password."""
    print("\n--- authenticate ---")
    resp = _call(
        "POST",
        "/v3/action/user/authenticate",
        json={"username": _test_username, "password": TEST_PASSWORD},
    )
    vision_ok = assert_vision_id(_payload(resp), "user_id")
    success = resp.status_code == 200 and vision_ok
    print_result(
        "Valid username+password",
        success,
        f"HTTP {resp.status_code}" + ("" if vision_ok else " — missing Vision ID"),
    )
    return success


def test_authenticate_wrong_password():
    """POST /v3/action/user/authenticate — wrong password → 403."""
    resp = _call(
        "POST",
        "/v3/action/user/authenticate",
        json={"username": _test_username, "password": "WrongPassword999!"},
    )
    success = resp.status_code == 403
    print_result("Wrong password → 403", success, f"HTTP {resp.status_code}")
    return success


def test_authenticate_unknown_user():
    """POST /v3/action/user/authenticate — unknown username → 404."""
    resp = _call(
        "POST",
        "/v3/action/user/authenticate",
        json={"username": "no_such_user_xyzzy", "password": TEST_PASSWORD},
    )
    success = resp.status_code == 404
    print_result("Unknown username → 404", success, f"HTTP {resp.status_code}")
    return success


def test_authenticate_missing_fields():
    """POST /v3/action/user/authenticate — no credentials → 400."""
    resp = _call(
        "POST",
        "/v3/action/user/authenticate",
        json={"username": _test_username},  # password missing
    )
    success = resp.status_code == 400
    print_result("Missing credentials → 400", success, f"HTTP {resp.status_code}")
    return success


def test_authenticate_auth_key_flow():
    """
    Full auth-key flow:
      1. GET new_auth_key → get a key
      2. POST authenticate with user_id + auth_key → success
      3. POST authenticate again with same key → 404 (key cleared)
    """
    print("\n--- authenticate (auth_key flow) ---")

    # Step 1: generate key
    resp1 = _call("GET", f"/v3/action/user/{_test_user_id}/new_auth_key")
    if resp1.status_code != 200:
        print_result("Auth key flow — generate key", False, f"HTTP {resp1.status_code}")
        return False
    key = _field(resp1, "auth_key")
    if not key:
        print_result("Auth key flow — generate key", False, "No auth_key in response")
        return False

    credentials = {"user_id": _test_user_id, "auth_key": key}

    # Step 2: authenticate with key
    resp2 = _call("POST", "/v3/action/user/authenticate", json=credentials)
    step2_ok = resp2.status_code == 200 and assert_vision_id(_payload(resp2), "user_id")
    print_result("Auth key flow — first use succeeds", step2_ok, f"HTTP {resp2.status_code}")

    # Step 3: replay must fail (key is cleared)
    resp3 = _call("POST", "/v3/action/user/authenticate", json=credentials)
    step3_ok = resp3.status_code == 404
    print_result("Auth key flow — replay → 404 (one-time-use)", step3_ok, f"HTTP {resp3.status_code}")

    return step2_ok and step3_ok


# ---------------------------------------------------------------------------
# verify_password
# ---------------------------------------------------------------------------

def test_verify_password_by_user_id():
    """POST /v3/action/user/verify_password — correct password by user_id."""
    print("\n--- verify_password ---")
    resp = _call(
        "POST",
        "/v3/action/user/verify_password",
        json={"user_id": _test_user_id, "current_password": TEST_PASSWORD},
    )
    success = resp.status_code == 200 and _result(resp) is True
    print_result("Correct password by user_id → True", success, f"HTTP {resp.status_code}")
    return success


def test_verify_password_by_username():
    """POST /v3/action/user/verify_password — correct password by username."""
    resp = _call(
        "POST",
        "/v3/action/user/verify_password",
        json={"username": _test_username, "current_password": TEST_PASSWORD},
    )
    success = resp.status_code == 200 and _result(resp) is True
    print_result("Correct password by username → True", success, f"HTTP {resp.status_code}")
    return success


def test_verify_password_wrong():
    """POST /v3/action/user/verify_password — wrong password → 403."""
    resp = _call(
        "POST",
        "/v3/action/user/verify_password",
        json={"user_id": _test_user_id, "current_password": "WrongPassword999!"},
    )
    success = resp.status_code == 403
    print_result("Wrong password → 403", success, f"HTTP {resp.status_code}")
    return success


def test_verify_password_no_identifier():
    """POST /v3/action/user/verify_password — no user_id or username → 400."""
    resp = _call(
        "POST",
        "/v3/action/user/verify_password",
        json={"current_password": TEST_PASSWORD},
    )
    success = resp.status_code == 400
    print_result("No identifier → 400", success, f"HTTP {resp.status_code}")
    return success


# ---------------------------------------------------------------------------
# change_password
# ---------------------------------------------------------------------------

def test_change_password_no_verification():
    """POST /v3/action/user/{id}/change_password — no current_password (admin reset)."""
    print("\n--- change_password ---")
    resp = _call(
        "POST",
        f"/v3/action/user/{_test_user_id}/change_password",
        json={"new_password": NEW_PASSWORD},
    )
    success = resp.status_code == 200 and _result(resp) is True
    print_result("Change password (no verification)", success, f"HTTP {resp.status_code}")

    # Verify the new password works
    resp2 = _call(
        "POST",
        "/v3/action/user/verify_password",
        json={"user_id": _test_user_id, "current_password": NEW_PASSWORD},
    )
    verify_ok = resp2.status_code == 200 and _result(resp2) is True
    print_result("New password accepted by verify_password", verify_ok, f"HTTP {resp2.status_code}")

    return success and verify_ok


def test_change_password_with_verification():
    """POST /v3/action/user/{id}/change_password — with correct current_password."""
    # Password is currently NEW_PASSWORD (set by previous test)
    resp = _call(
        "POST",
        f"/v3/action/user/{_test_user_id}/change_password",
        json={"current_password": NEW_PASSWORD, "new_password": TEST_PASSWORD},
    )
    success = resp.status_code == 200 and _result(resp) is True
    print_result("Change password with correct current_password", success, f"HTTP {resp.status_code}")
    return success


def test_change_password_wrong_current():
    """POST /v3/action/user/{id}/change_password — wrong current_password → 403."""
    resp = _call(
        "POST",
        f"/v3/action/user/{_test_user_id}/change_password",
        json={"current_password": "WrongPassword999!", "new_password": NEW_PASSWORD},
    )
    success = resp.status_code == 403
    print_result("Wrong current_password → 403", success, f"HTTP {resp.status_code}")
    return success


def test_change_password_too_short():
    """POST /v3/action/user/{id}/change_password — new_password < 10 chars → 422."""
    resp = _call(
        "POST",
        f"/v3/action/user/{_test_user_id}/change_password",
        json={"new_password": "short"},
    )
    # Pydantic validation rejects min_length constraint with 422 Unprocessable Entity
    success = resp.status_code == 422
    print_result("new_password too short → 422", success, f"HTTP {resp.status_code}")
    return success


def test_change_password_bad_user():
    """POST /v3/action/user/{id}/change_password — invalid user_id → 404."""
    resp = _call(
        "POST",
        "/v3/action/user/AAAAAAAAAAA/change_password",
        json={"new_password": "ValidPassword123!"},
    )
    success = resp.status_code == 404
    print_result("Invalid user_id → 404", success, f"HTTP {resp.status_code}")
    return success


# ---------------------------------------------------------------------------
# new_auth_key
# ---------------------------------------------------------------------------

def test_new_auth_key():
    """GET /v3/action/user/{user_id}/new_auth_key — generates and returns key."""
    print("\n--- new_auth_key ---")
    resp = _call("GET", f"/v3/action/user/{_test_user_id}/new_auth_key")
    key = _field(resp, "auth_key")
    success = resp.status_code == 200 and isinstance(key, str) and len(key) >= 11
    print_result(
        "Returns new auth_key string",
        success,
        f"HTTP {resp.status_code}" + (f" key={key!r}" if success else ""),
    )
    return success


def test_new_auth_key_bad_user():
    """GET /v3/action/user/{user_id}/new_auth_key — invalid user → 404."""
    resp = _call("GET", "/v3/action/user/AAAAAAAAAAA/new_auth_key")
    success = resp.status_code == 404
    print_result("Invalid user_id → 404", success, f"HTTP {resp.status_code}")
    return success


# ---------------------------------------------------------------------------
# email_auth_key_url
# ---------------------------------------------------------------------------

def test_email_auth_key_url():
    """GET /v3/action/user/{user_id}/email_auth_key_url — sends or fails gracefully."""
    print("\n--- email_auth_key_url ---")
    resp = _call(
        "GET",
        f"/v3/action/user/{_test_user_id}/email_auth_key_url",
        params={"root_url": "https://test.invalid/login"},
    )
    # 200 = email sent; 500 = delivery failed (.invalid domain) — both are acceptable.
    success = resp.status_code in (200, 500)
    print_result(
        "email_auth_key_url (200=sent, 500=delivery failed — both OK for .invalid domain)",
        success,
        f"HTTP {resp.status_code}",
    )
    return success


def test_email_auth_key_url_bad_user():
    """GET /v3/action/user/{user_id}/email_auth_key_url — invalid user → 404."""
    resp = _call(
        "GET",
        "/v3/action/user/AAAAAAAAAAA/email_auth_key_url",
        params={"root_url": "https://test.invalid/login"},
    )
    success = resp.status_code == 404
    print_result("Invalid user_id → 404", success, f"HTTP {resp.status_code}")
    return success


# ---------------------------------------------------------------------------
# Auth guard checks
# ---------------------------------------------------------------------------

def test_no_api_key():
    """All V3 action endpoints require x-aether-api-key — missing → 403."""
    print("\n--- auth guards ---")
    resp = _call(
        "POST",
        "/v3/action/user/authenticate",
        json={"username": _test_username, "password": TEST_PASSWORD},
        headers={"x-account-id": ACCOUNT_ID},  # no API key
    )
    success = resp.status_code == 403
    print_result("No API key → 403", success, f"HTTP {resp.status_code}")
    return success


# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------

def run_suite():
    """Run every test in order and print a summary.

    Returns True when all tests passed, False otherwise (including when
    setup fails). The temporary user is deleted even if a test raises or
    the run is interrupted.
    """
    start = time.time()
    print("=" * 60)
    print("E2E: V3 User Action Routes")
    print("=" * 60)

    if not setup_test_user():
        print("\n[ABORT] Setup failed — cannot run tests.\n")
        return False

    tests = [
        # authenticate
        test_authenticate_username_password,
        test_authenticate_wrong_password,
        test_authenticate_unknown_user,
        test_authenticate_missing_fields,
        test_authenticate_auth_key_flow,
        # verify_password
        test_verify_password_by_user_id,
        test_verify_password_by_username,
        test_verify_password_wrong,
        test_verify_password_no_identifier,
        # change_password (order matters — each test assumes the password left by the previous)
        test_change_password_no_verification,  # TEST → NEW
        test_change_password_with_verification,  # NEW → TEST
        test_change_password_wrong_current,  # bad → 403 (no change)
        test_change_password_too_short,  # bad → 422
        test_change_password_bad_user,  # 404
        # new_auth_key
        test_new_auth_key,
        test_new_auth_key_bad_user,
        # email_auth_key_url
        test_email_auth_key_url,
        test_email_auth_key_url_bad_user,
        # auth guards
        test_no_api_key,
    ]

    results = []
    try:
        for test_fn in tests:
            results.append(_run_test(test_fn))
    finally:
        # Always remove the temporary user so aborted runs do not leave
        # stray accounts behind on the shared dev environment.
        teardown_test_user(_test_user_id)

    elapsed = time.time() - start
    passed = sum(1 for r in results if r)
    total = len(results)
    print(f"\n{'=' * 60}")
    print(f"Results: {passed}/{total} passed ({elapsed:.2f}s)")
    print("=" * 60)
    return passed == total


if __name__ == "__main__":
    sys.exit(0 if run_suite() else 1)