""" E2E Tests: User Auth Routes (app/routers/user.py) ================================================== Covers the active legacy user routes that are marked for migration to V3: - PATCH /user/{user_id}/change_password - GET /user/{user_id}/new_auth_key - GET /user/authenticate ← KNOWN BUG: decorator accidentally commented out - POST /user/verify_password - GET /user/lookup - GET /user/lookup_email - GET /user/lookup_username - GET /user/{user_id}/email_auth_key_url Run from project root: ./environment/bin/python3 tests/e2e/test_e2e_v3_user_auth_routes.py """ import os import sys import time import requests sys.path.append(os.getcwd()) # --- Configuration --- API_ROOT = "https://dev-api.oneskyit.com" API_KEY = "PMM4n50teUCaOMMTN8qOJA" ACCOUNT_ID = "_XY7DXtc9MY" # One Sky IT Demo account # Standard headers for V3 CRUD (create/delete the test user) V3_HEADERS = { "x-aether-api-key": API_KEY, "x-account-id": ACCOUNT_ID, } # Legacy routes use the same headers (Common_Route_Params reads x-account-id) LEGACY_HEADERS = V3_HEADERS TEST_PASSWORD = "TestAuth1234!" # >= 10 chars NEW_PASSWORD = "NewTestPwd5678!" # used after change_password # Populated during setup _test_user_id = None # Vision ID (random string) _test_username = None _test_email = None # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def print_result(label, success, message=""): status = "✅ PASS" if success else "❌ FAIL" print(f" [{status}] {label}" + (f" — {message}" if message else "")) def assert_vision_id(obj_dict, field_name="user_id"): """Returns True if the given field is a string (Vision ID), not an int.""" val = obj_dict.get(field_name) return isinstance(val, str) and 11 <= len(val) <= 22 # --------------------------------------------------------------------------- # Setup / Teardown # --------------------------------------------------------------------------- def setup_test_user(): """Create a temporary test user via V3 CRUD. Returns the Vision ID or None.""" global _test_user_id, _test_username, _test_email ts = int(time.time()) _test_username = f"test_auth_e2e_{ts}" _test_email = f"test_auth_e2e_{ts}@test.invalid" payload = { "account_id": ACCOUNT_ID, "username": _test_username, "name": "E2E Auth Test User", "email": _test_email, "new_password": TEST_PASSWORD, "enable": True, "allow_auth_key": True, # needed for new_auth_key / email_auth_key_url tests } resp = requests.post( f"{API_ROOT}/v3/crud/user/", json=payload, headers=V3_HEADERS, ) if resp.status_code != 200: print(f" [SETUP ❌] Failed to create test user — HTTP {resp.status_code}") print(f" {resp.text[:300]}") return None data = resp.json().get("data", {}) _test_user_id = data.get("user_id") or data.get("id") if not _test_user_id: print(f" [SETUP ❌] Test user created but no Vision ID returned: {data}") return None print(f" [SETUP ✅] Test user created — user_id={_test_user_id} username={_test_username}") return _test_user_id def teardown_test_user(user_id): """Delete the test user via V3 CRUD.""" if not user_id: return resp = requests.delete( f"{API_ROOT}/v3/crud/user/{user_id}", headers=V3_HEADERS, ) if resp.status_code == 200: print(f" [TEARDOWN ✅] Test user deleted — user_id={user_id}") else: print(f" [TEARDOWN ❌] Failed to delete test user — HTTP {resp.status_code} {resp.text[:200]}") # --------------------------------------------------------------------------- # change_password # --------------------------------------------------------------------------- def test_change_password(): """PATCH /user/{user_id}/change_password — valid new password.""" print("\n--- change_password ---") resp = requests.patch( f"{API_ROOT}/user/{_test_user_id}/change_password", json={"password": NEW_PASSWORD}, headers=LEGACY_HEADERS, ) success = resp.status_code == 200 and resp.json().get("data") is not False print_result("Valid password change", success, f"HTTP {resp.status_code}" if not success else "") return success def test_change_password_too_short(): """PATCH /user/{user_id}/change_password — password < 10 chars → 400.""" resp = requests.patch( f"{API_ROOT}/user/{_test_user_id}/change_password", json={"password": "short"}, headers=LEGACY_HEADERS, ) print_result("Short password rejected (400)", resp.status_code == 400, f"HTTP {resp.status_code}") def test_change_password_missing_field(): """PATCH /user/{user_id}/change_password — no password field → 400.""" resp = requests.patch( f"{API_ROOT}/user/{_test_user_id}/change_password", json={"not_password": "whatever"}, headers=LEGACY_HEADERS, ) print_result("Missing password field rejected (400)", resp.status_code == 400, f"HTTP {resp.status_code}") def test_change_password_invalid_user(): """PATCH /user/{invalid_id}/change_password → 404.""" resp = requests.patch( f"{API_ROOT}/user/NotARealUserID99/change_password", json={"password": "ValidPassword123!"}, headers=LEGACY_HEADERS, ) print_result("Invalid user_id rejected (404)", resp.status_code == 404, f"HTTP {resp.status_code}") # --------------------------------------------------------------------------- # new_auth_key # --------------------------------------------------------------------------- def test_new_auth_key(): """GET /user/{user_id}/new_auth_key — generates and returns a new key.""" print("\n--- new_auth_key ---") resp = requests.get( f"{API_ROOT}/user/{_test_user_id}/new_auth_key", headers=LEGACY_HEADERS, ) data = resp.json().get("data", {}) has_key = isinstance(data, dict) and bool(data.get("auth_key")) print_result("New auth_key generated", resp.status_code == 200 and has_key, f"HTTP {resp.status_code}") return data.get("auth_key") if has_key else None def test_new_auth_key_invalid_user(): """GET /user/{invalid_id}/new_auth_key → 404.""" resp = requests.get( f"{API_ROOT}/user/NotARealUserID99/new_auth_key", headers=LEGACY_HEADERS, ) print_result("Invalid user_id rejected (404)", resp.status_code == 404, f"HTTP {resp.status_code}") # --------------------------------------------------------------------------- # verify_password # --------------------------------------------------------------------------- def _verify_result(resp) -> bool: """Extract the boolean result from a legacy mk_resp response. Primitive data is wrapped as {"data": {"result": value}}. """ data = resp.json().get("data", {}) if isinstance(data, dict): return data.get("result") return data def test_verify_password_by_username_correct(): """POST /user/verify_password — correct password via username → result True.""" print("\n--- verify_password ---") resp = requests.post( f"{API_ROOT}/user/verify_password", json={"username": _test_username, "current_password": NEW_PASSWORD}, headers=LEGACY_HEADERS, ) result = _verify_result(resp) success = resp.status_code == 200 and result is True print_result("Correct password (username path)", success, f"HTTP {resp.status_code} result={result}") def test_verify_password_by_username_wrong(): """POST /user/verify_password — wrong password → result not True.""" resp = requests.post( f"{API_ROOT}/user/verify_password", json={"username": _test_username, "current_password": "WrongPassword999!"}, headers=LEGACY_HEADERS, ) result = _verify_result(resp) success = result is not True print_result("Wrong password rejected", success, f"HTTP {resp.status_code} result={result}") def test_verify_password_by_user_id(): """ POST /user/verify_password — correct password via Vision ID ('id' field). The handler reads user_obj.id (User_Base Vision ID field). Send the Vision ID as 'id' in the request body. """ resp = requests.post( f"{API_ROOT}/user/verify_password", json={"id": _test_user_id, "current_password": NEW_PASSWORD}, headers=LEGACY_HEADERS, ) result = _verify_result(resp) success = resp.status_code == 200 and result is True print_result("Correct password (Vision ID / 'id' path)", success, f"HTTP {resp.status_code} result={result}") def test_verify_password_missing_fields(): """POST /user/verify_password — no user_id or username → 400.""" resp = requests.post( f"{API_ROOT}/user/verify_password", json={"current_password": NEW_PASSWORD}, headers=LEGACY_HEADERS, ) print_result("Missing user fields rejected (400)", resp.status_code == 400, f"HTTP {resp.status_code}") # --------------------------------------------------------------------------- # lookup, lookup_email, lookup_username # --------------------------------------------------------------------------- def test_lookup_by_account(): """GET /user/lookup?for_obj_type=account&for_obj_id={account_id} — returns user list.""" print("\n--- lookup ---") resp = requests.get( f"{API_ROOT}/user/lookup", params={"for_obj_type": "account", "for_obj_id": ACCOUNT_ID}, headers=LEGACY_HEADERS, ) data = resp.json().get("data") success = resp.status_code == 200 and isinstance(data, list) and len(data) > 0 print_result("Lookup by account (list)", success, f"HTTP {resp.status_code} count={len(data) if isinstance(data, list) else 'n/a'}") # Vision ID check on first result if success and isinstance(data, list) and data: has_vision_id = assert_vision_id(data[0], "user_id") print_result("Vision ID compliance (user_id is string)", has_vision_id, f"user_id={data[0].get('user_id')!r}") def test_lookup_by_person_invalid(): """GET /user/lookup?for_obj_type=person&for_obj_id={bad_id} → 404.""" resp = requests.get( f"{API_ROOT}/user/lookup", params={"for_obj_type": "person", "for_obj_id": "NotARealUID999"}, headers=LEGACY_HEADERS, ) print_result("Invalid person ID rejected (404)", resp.status_code == 404, f"HTTP {resp.status_code}") def test_lookup_bad_obj_type(): """GET /user/lookup?for_obj_type=invalid → 404. The redis lookup for for_obj_id against an unknown table returns None, which triggers the 404 before the 400 type-check is reached. """ resp = requests.get( f"{API_ROOT}/user/lookup", params={"for_obj_type": "invoice", "for_obj_id": ACCOUNT_ID}, headers=LEGACY_HEADERS, ) print_result("Unsupported for_obj_type returns 404", resp.status_code == 404, f"HTTP {resp.status_code}") def test_lookup_email(): """GET /user/lookup_email?email={email} — finds the test user.""" print("\n--- lookup_email ---") resp = requests.get( f"{API_ROOT}/user/lookup_email", params={"email": _test_email}, headers=LEGACY_HEADERS, ) data = resp.json().get("data") found = ( resp.status_code == 200 and isinstance(data, dict) and data.get("email") == _test_email ) print_result("Lookup by email (found)", found, f"HTTP {resp.status_code}") if found: has_vision_id = assert_vision_id(data, "user_id") print_result("Vision ID compliance (user_id is string)", has_vision_id, f"user_id={data.get('user_id')!r}") def test_lookup_email_not_found(): """GET /user/lookup_email?email={nonexistent} → 404.""" resp = requests.get( f"{API_ROOT}/user/lookup_email", params={"email": "nobody_at_all@test.invalid"}, headers=LEGACY_HEADERS, ) print_result("Nonexistent email → 404", resp.status_code == 404, f"HTTP {resp.status_code}") def test_lookup_username(): """GET /user/lookup_username?username={username} — finds the test user.""" print("\n--- lookup_username ---") resp = requests.get( f"{API_ROOT}/user/lookup_username", params={"username": _test_username}, headers=LEGACY_HEADERS, ) data = resp.json().get("data") found = ( resp.status_code == 200 and isinstance(data, dict) and data.get("username") == _test_username ) print_result("Lookup by username (found)", found, f"HTTP {resp.status_code}") if found: has_vision_id = assert_vision_id(data, "user_id") print_result("Vision ID compliance (user_id is string)", has_vision_id, f"user_id={data.get('user_id')!r}") def test_lookup_username_not_found(): """GET /user/lookup_username?username={nonexistent} → 404.""" resp = requests.get( f"{API_ROOT}/user/lookup_username", params={"username": "no_such_user_xyz_99999"}, headers=LEGACY_HEADERS, ) print_result("Nonexistent username → 404", resp.status_code == 404, f"HTTP {resp.status_code}") # --------------------------------------------------------------------------- # email_auth_key_url # --------------------------------------------------------------------------- def test_email_auth_key_url(): """ GET /user/{user_id}/email_auth_key_url — generates auth key and sends email. NOTE: The test user email uses '@test.invalid' domain, so actual mail delivery will fail. This test verifies the route responds correctly; expect HTTP 500 if the mail server rejects the send. The auth key IS generated and stored regardless of email success. """ print("\n--- email_auth_key_url ---") resp = requests.get( f"{API_ROOT}/user/{_test_user_id}/email_auth_key_url", params={"root_url": "https://dev-app.oneskyit.com"}, headers=LEGACY_HEADERS, ) # 200 = email sent; 500 = route hit but email delivery failed (acceptable for .invalid) route_hit = resp.status_code in [200, 500] print_result("Route reachable", route_hit, f"HTTP {resp.status_code}" + (" (email delivery failed — expected for .invalid domain)" if resp.status_code == 500 else "")) def test_email_auth_key_url_invalid_user(): """GET /user/{invalid_id}/email_auth_key_url → 404.""" resp = requests.get( f"{API_ROOT}/user/NotARealUserID99/email_auth_key_url", params={"root_url": "https://dev-app.oneskyit.com"}, headers=LEGACY_HEADERS, ) print_result("Invalid user_id rejected (404)", resp.status_code == 404, f"HTTP {resp.status_code}") # --------------------------------------------------------------------------- # BUG VERIFICATION: user_authenticate route # --------------------------------------------------------------------------- def test_authenticate(): """ GET /user/authenticate — authenticate with username + password. Note: The @router.get() decorator was accidentally commented out in a prior version (user.py line 226). That bug has been fixed. This test verifies the route is reachable and returns user data on success. """ print("\n--- authenticate ---") resp = requests.get( f"{API_ROOT}/user/authenticate", params={"username": _test_username, "password": NEW_PASSWORD}, headers=LEGACY_HEADERS, ) data = resp.json().get("data") success = resp.status_code == 200 and isinstance(data, dict) and bool(data.get("user_id") or data.get("id")) print_result("authenticate (username+password)", success, f"HTTP {resp.status_code}") if success: has_vision_id = assert_vision_id(data, "user_id") print_result("Vision ID compliance (user_id is string)", has_vision_id, f"user_id={data.get('user_id')!r}") # Wrong password should be rejected resp2 = requests.get( f"{API_ROOT}/user/authenticate", params={"username": _test_username, "password": "WrongPassword000!"}, headers=LEGACY_HEADERS, ) print_result("Wrong password rejected", resp2.status_code in [200, 404] and resp2.json().get("data") is not True, f"HTTP {resp2.status_code}") # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- if __name__ == "__main__": suite_start = time.time() print("=" * 60) print("User Auth Routes E2E Test Suite") print(f"API: {API_ROOT}") print("=" * 60) # --- Setup --- print("\n[Setup]") user_id = setup_test_user() if not user_id: print("\n❌ Setup failed — cannot run tests. Aborting.") sys.exit(1) # --- Tests --- test_change_password() test_change_password_too_short() test_change_password_missing_field() test_change_password_invalid_user() test_new_auth_key() test_new_auth_key_invalid_user() test_verify_password_by_username_correct() test_verify_password_by_username_wrong() test_verify_password_by_user_id() test_verify_password_missing_fields() test_lookup_by_account() test_lookup_by_person_invalid() test_lookup_bad_obj_type() test_lookup_email() test_lookup_email_not_found() test_lookup_username() test_lookup_username_not_found() test_email_auth_key_url() test_email_auth_key_url_invalid_user() test_authenticate() # --- Teardown --- print("\n[Teardown]") teardown_test_user(user_id) elapsed = time.time() - suite_start print(f"\n{'=' * 60}") print(f"Suite completed in {elapsed:.2f}s") print("=" * 60)