Files
OSIT-AE-API-FastAPI/tests/e2e/test_e2e_v3_user_auth_routes.py

499 lines
17 KiB
Python

"""
E2E Tests: User Auth Routes (app/routers/user.py)
==================================================
Covers the active legacy user routes that are marked for migration to V3:
- PATCH /user/{user_id}/change_password
- GET /user/{user_id}/new_auth_key
- GET /user/authenticate (its route decorator was accidentally commented out in a prior version; since fixed)
- POST /user/verify_password
- GET /user/lookup
- GET /user/lookup_email
- GET /user/lookup_username
- GET /user/{user_id}/email_auth_key_url
Run from project root:
./environment/bin/python3 tests/e2e/test_e2e_v3_user_auth_routes.py
"""
import os
import sys
import time
import requests
sys.path.append(os.getcwd())
# --- Configuration ---
# Each connection setting can be overridden through an environment variable so
# the suite can target another deployment without editing this file. The
# defaults are the original dev-environment values.
# NOTE(review): the default API key below is a credential committed to source
# control — consider rotating it and supplying it only via E2E_API_KEY.
API_ROOT = os.environ.get("E2E_API_ROOT", "https://dev-api.oneskyit.com")
API_KEY = os.environ.get("E2E_API_KEY", "PMM4n50teUCaOMMTN8qOJA")
ACCOUNT_ID = os.environ.get("E2E_ACCOUNT_ID", "_XY7DXtc9MY")  # default: One Sky IT Demo account

# Standard headers for V3 CRUD (create/delete the test user)
V3_HEADERS = {
    "x-aether-api-key": API_KEY,
    "x-account-id": ACCOUNT_ID,
}

# Legacy routes use the same headers (Common_Route_Params reads x-account-id).
# This is deliberately the same dict object, not a copy.
LEGACY_HEADERS = V3_HEADERS

TEST_PASSWORD = "TestAuth1234!"  # >= 10 chars
NEW_PASSWORD = "NewTestPwd5678!"  # used after change_password

# Populated during setup
_test_user_id = None  # Vision ID (random string)
_test_username = None
_test_email = None
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def print_result(label, success, message=""):
    """Print a one-line PASS/FAIL report for a single check.

    Args:
        label: Short description of the check being reported.
        success: Truthy when the check passed.
        message: Optional detail (typically the HTTP status). When given it
            is appended after the label, separated by an em dash so the two
            do not run together.
    """
    status = "✅ PASS" if success else "❌ FAIL"
    detail = f" — {message}" if message else ""
    print(f" [{status}] {label}{detail}")
def assert_vision_id(obj_dict, field_name="user_id"):
    """Report whether a field of a response object looks like a Vision ID.

    Despite the name, this never raises; it returns a bool.

    Args:
        obj_dict: The decoded JSON object to inspect. Anything that is not a
            dict (e.g. a list element of unexpected type) yields False.
        field_name: Key whose value is checked.

    Returns:
        True only when the value is a str of 11 to 22 characters; False for
        ints, missing keys, or strings outside that length range.
    """
    if not isinstance(obj_dict, dict):
        return False
    val = obj_dict.get(field_name)
    return isinstance(val, str) and 11 <= len(val) <= 22
# ---------------------------------------------------------------------------
# Setup / Teardown
# ---------------------------------------------------------------------------
def setup_test_user():
    """Create a temporary test user via V3 CRUD.

    Sets the module-level _test_username and _test_email (made unique with
    the current Unix timestamp) and, on success, _test_user_id.

    Returns:
        The new user's ID as returned by the API ("user_id", falling back to
        "id"), or None when the request did not return HTTP 200 or the
        response carried no ID.
    """
    global _test_user_id, _test_username, _test_email
    ts = int(time.time())
    _test_username = f"test_auth_e2e_{ts}"
    _test_email = f"test_auth_e2e_{ts}@test.invalid"
    payload = {
        "account_id": ACCOUNT_ID,
        "username": _test_username,
        "name": "E2E Auth Test User",
        "email": _test_email,
        "new_password": TEST_PASSWORD,
        "enable": True,
        "allow_auth_key": True,  # needed for new_auth_key / email_auth_key_url tests
    }
    resp = requests.post(
        f"{API_ROOT}/v3/crud/user/",
        json=payload,
        headers=V3_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    if resp.status_code != 200:
        print(f" [SETUP ❌] Failed to create test user — HTTP {resp.status_code}")
        print(f" {resp.text[:300]}")
        return None

    # A 200 with a non-JSON body, or with a null/non-object "data", must be
    # reported as a setup failure rather than crash with a traceback.
    try:
        body = resp.json()
    except ValueError:
        body = None
    data = body.get("data") if isinstance(body, dict) else None
    if not isinstance(data, dict):
        data = {}

    _test_user_id = data.get("user_id") or data.get("id")
    if not _test_user_id:
        print(f" [SETUP ❌] Test user created but no Vision ID returned: {data}")
        return None
    print(f" [SETUP ✅] Test user created — user_id={_test_user_id} username={_test_username}")
    return _test_user_id
def teardown_test_user(user_id):
    """Delete the test user via V3 CRUD and print the outcome.

    Args:
        user_id: ID of the user to delete. A falsy value is a no-op, so this
            is safe to call when setup never produced a user.
    """
    if not user_id:
        return
    resp = requests.delete(
        f"{API_ROOT}/v3/crud/user/{user_id}",
        headers=V3_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    if resp.status_code == 200:
        print(f" [TEARDOWN ✅] Test user deleted — user_id={user_id}")
    else:
        print(f" [TEARDOWN ❌] Failed to delete test user — HTTP {resp.status_code} {resp.text[:200]}")
# ---------------------------------------------------------------------------
# change_password
# ---------------------------------------------------------------------------
def test_change_password():
    """PATCH /user/{user_id}/change_password — valid new password.

    Changes the test user's password to NEW_PASSWORD. The check passes when
    the API answers HTTP 200 with a JSON object whose "data" is not False.

    Returns:
        True when the check passed, otherwise False.
    """
    print("\n--- change_password ---")
    resp = requests.patch(
        f"{API_ROOT}/user/{_test_user_id}/change_password",
        json={"password": NEW_PASSWORD},
        headers=LEGACY_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    success = False
    if resp.status_code == 200:
        # A non-JSON or non-object body counts as a failure, not a crash.
        try:
            body = resp.json()
        except ValueError:
            body = None
        success = isinstance(body, dict) and body.get("data") is not False
    print_result(
        "Valid password change",
        success,
        "" if success else f"HTTP {resp.status_code}",
    )
    return success
def test_change_password_too_short():
    """PATCH /user/{user_id}/change_password — password < 10 chars → 400.

    Sends the 5-character password "short" and reports PASS only when the
    API answers HTTP 400.
    """
    resp = requests.patch(
        f"{API_ROOT}/user/{_test_user_id}/change_password",
        json={"password": "short"},
        headers=LEGACY_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    print_result(
        "Short password rejected (400)",
        resp.status_code == 400,
        f"HTTP {resp.status_code}",
    )
def test_change_password_missing_field():
    """PATCH /user/{user_id}/change_password — no password field → 400.

    Sends a body that lacks the "password" key and reports PASS only when
    the API answers HTTP 400.
    """
    resp = requests.patch(
        f"{API_ROOT}/user/{_test_user_id}/change_password",
        json={"not_password": "whatever"},
        headers=LEGACY_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    print_result(
        "Missing password field rejected (400)",
        resp.status_code == 400,
        f"HTTP {resp.status_code}",
    )
def test_change_password_invalid_user():
    """PATCH /user/{invalid_id}/change_password → 404.

    Uses a made-up user ID with an otherwise valid password and reports
    PASS only when the API answers HTTP 404.
    """
    resp = requests.patch(
        f"{API_ROOT}/user/NotARealUserID99/change_password",
        json={"password": "ValidPassword123!"},
        headers=LEGACY_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    print_result(
        "Invalid user_id rejected (404)",
        resp.status_code == 404,
        f"HTTP {resp.status_code}",
    )
# ---------------------------------------------------------------------------
# new_auth_key
# ---------------------------------------------------------------------------
def test_new_auth_key():
    """GET /user/{user_id}/new_auth_key — generates and returns a new key.

    The check passes when the API answers HTTP 200 and the response's "data"
    object carries a non-empty "auth_key".

    Returns:
        The "auth_key" value when one was present in the response, else None.
    """
    print("\n--- new_auth_key ---")
    resp = requests.get(
        f"{API_ROOT}/user/{_test_user_id}/new_auth_key",
        headers=LEGACY_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    # The body is parsed before the status is known, so an error page that is
    # not JSON must be tolerated here rather than abort the suite.
    try:
        body = resp.json()
    except ValueError:
        body = None
    data = body.get("data", {}) if isinstance(body, dict) else {}
    has_key = isinstance(data, dict) and bool(data.get("auth_key"))
    print_result(
        "New auth_key generated",
        resp.status_code == 200 and has_key,
        f"HTTP {resp.status_code}",
    )
    return data.get("auth_key") if has_key else None
def test_new_auth_key_invalid_user():
    """GET /user/{invalid_id}/new_auth_key → 404.

    Uses a made-up user ID and reports PASS only when the API answers
    HTTP 404.
    """
    resp = requests.get(
        f"{API_ROOT}/user/NotARealUserID99/new_auth_key",
        headers=LEGACY_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    print_result(
        "Invalid user_id rejected (404)",
        resp.status_code == 404,
        f"HTTP {resp.status_code}",
    )
# ---------------------------------------------------------------------------
# verify_password
# ---------------------------------------------------------------------------
def test_verify_password_by_username_correct():
    """POST /user/verify_password — correct password via username → True.

    Sends the test username with NEW_PASSWORD (the password set by the
    change_password test). The check passes when the API answers HTTP 200
    with "data" exactly True.
    """
    print("\n--- verify_password ---")
    resp = requests.post(
        f"{API_ROOT}/user/verify_password",
        json={"username": _test_username, "current_password": NEW_PASSWORD},
        headers=LEGACY_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    success = False
    if resp.status_code == 200:
        # A non-JSON or non-object body counts as a failure, not a crash.
        try:
            body = resp.json()
        except ValueError:
            body = None
        success = isinstance(body, dict) and body.get("data") is True
    print_result(
        "Correct password (username path)",
        success,
        f"HTTP {resp.status_code}",
    )
def test_verify_password_by_username_wrong():
    """POST /user/verify_password — wrong password → failure response.

    The check passes when the API answers HTTP 200 or 404 and "data" is
    False or absent/None.
    """
    resp = requests.post(
        f"{API_ROOT}/user/verify_password",
        json={"username": _test_username, "current_password": "WrongPassword999!"},
        headers=LEGACY_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    # Returns data=False with 200 or a 404. An error body that is not JSON
    # is treated as carrying no data instead of raising.
    try:
        body = resp.json()
    except ValueError:
        body = None
    data = body.get("data") if isinstance(body, dict) else None
    success = resp.status_code in (200, 404) and (data is False or data is None)
    print_result(
        "Wrong password rejected",
        success,
        f"HTTP {resp.status_code} data={data}",
    )
def test_verify_password_by_user_id():
    """POST /user/verify_password — correct password via Vision ID ('id' field).

    The handler reads user_obj.id (User_Base Vision ID field), so the Vision
    ID is sent as 'id' in the request body. The check passes when the API
    answers HTTP 200 with "data" exactly True.
    """
    resp = requests.post(
        f"{API_ROOT}/user/verify_password",
        json={"id": _test_user_id, "current_password": NEW_PASSWORD},
        headers=LEGACY_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    # Tolerate a non-JSON or non-object body: report it as a failed check.
    try:
        body = resp.json()
    except ValueError:
        body = None
    data = body.get("data") if isinstance(body, dict) else None
    success = resp.status_code == 200 and data is True
    print_result(
        "Correct password (Vision ID / 'id' path)",
        success,
        f"HTTP {resp.status_code} data={data}",
    )
def test_verify_password_missing_fields():
    """POST /user/verify_password — no user_id or username → 400.

    Sends only "current_password" and reports PASS only when the API
    answers HTTP 400.
    """
    resp = requests.post(
        f"{API_ROOT}/user/verify_password",
        json={"current_password": NEW_PASSWORD},
        headers=LEGACY_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    print_result(
        "Missing user fields rejected (400)",
        resp.status_code == 400,
        f"HTTP {resp.status_code}",
    )
# ---------------------------------------------------------------------------
# lookup, lookup_email, lookup_username
# ---------------------------------------------------------------------------
def test_lookup_by_account():
    """GET /user/lookup?for_obj_type=account&for_obj_id={account_id} — returns user list.

    The first check passes when the API answers HTTP 200 with a non-empty
    list in "data". When it does, a second check verifies that the first
    entry's "user_id" is a Vision-ID-shaped string.
    """
    print("\n--- lookup ---")
    resp = requests.get(
        f"{API_ROOT}/user/lookup",
        params={"for_obj_type": "account", "for_obj_id": ACCOUNT_ID},
        headers=LEGACY_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    # Tolerate a non-JSON or non-object body: report it as a failed check.
    try:
        body = resp.json()
    except ValueError:
        body = None
    data = body.get("data") if isinstance(body, dict) else None

    is_list = isinstance(data, list)
    success = resp.status_code == 200 and is_list and len(data) > 0
    count = len(data) if is_list else "n/a"
    print_result("Lookup by account (list)", success, f"HTTP {resp.status_code} count={count}")

    # Vision ID check on first result
    if success:
        first = data[0]
        # A list element that is not an object fails the check instead of
        # raising AttributeError.
        first_is_dict = isinstance(first, dict)
        shown_id = first.get("user_id") if first_is_dict else None
        has_vision_id = first_is_dict and assert_vision_id(first, "user_id")
        print_result(
            "Vision ID compliance (user_id is string)",
            has_vision_id,
            f"user_id={shown_id!r}",
        )
def test_lookup_by_person_invalid():
    """GET /user/lookup?for_obj_type=person&for_obj_id={bad_id} → 404.

    Looks up users for a made-up person ID and reports PASS only when the
    API answers HTTP 404.
    """
    resp = requests.get(
        f"{API_ROOT}/user/lookup",
        params={"for_obj_type": "person", "for_obj_id": "NotARealUID999"},
        headers=LEGACY_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    print_result(
        "Invalid person ID rejected (404)",
        resp.status_code == 404,
        f"HTTP {resp.status_code}",
    )
def test_lookup_bad_obj_type():
    """GET /user/lookup?for_obj_type=invalid → 400.

    Uses "invoice" as the object type and reports PASS only when the API
    answers HTTP 400.
    """
    resp = requests.get(
        f"{API_ROOT}/user/lookup",
        params={"for_obj_type": "invoice", "for_obj_id": ACCOUNT_ID},
        headers=LEGACY_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    print_result(
        "Unsupported for_obj_type rejected (400)",
        resp.status_code == 400,
        f"HTTP {resp.status_code}",
    )
def test_lookup_email():
    """GET /user/lookup_email?email={email} — finds the test user.

    The first check passes when the API answers HTTP 200 with a "data"
    object whose "email" equals the test user's email. When it does, a
    second check verifies that "user_id" is a Vision-ID-shaped string.
    """
    print("\n--- lookup_email ---")
    resp = requests.get(
        f"{API_ROOT}/user/lookup_email",
        params={"email": _test_email},
        headers=LEGACY_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    # Tolerate a non-JSON or non-object body: report it as a failed check.
    try:
        body = resp.json()
    except ValueError:
        body = None
    data = body.get("data") if isinstance(body, dict) else None

    found = (
        resp.status_code == 200
        and isinstance(data, dict)
        and data.get("email") == _test_email
    )
    print_result("Lookup by email (found)", found, f"HTTP {resp.status_code}")
    if found:
        has_vision_id = assert_vision_id(data, "user_id")
        print_result(
            "Vision ID compliance (user_id is string)",
            has_vision_id,
            f"user_id={data.get('user_id')!r}",
        )
def test_lookup_email_not_found():
    """GET /user/lookup_email?email={nonexistent} → 404.

    Looks up an address that should not exist and reports PASS only when
    the API answers HTTP 404.
    """
    resp = requests.get(
        f"{API_ROOT}/user/lookup_email",
        params={"email": "nobody_at_all@test.invalid"},
        headers=LEGACY_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    print_result(
        "Nonexistent email → 404",
        resp.status_code == 404,
        f"HTTP {resp.status_code}",
    )
def test_lookup_username():
    """GET /user/lookup_username?username={username} — finds the test user.

    The first check passes when the API answers HTTP 200 with a "data"
    object whose "username" equals the test user's username. When it does,
    a second check verifies that "user_id" is a Vision-ID-shaped string.
    """
    print("\n--- lookup_username ---")
    resp = requests.get(
        f"{API_ROOT}/user/lookup_username",
        params={"username": _test_username},
        headers=LEGACY_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    # Tolerate a non-JSON or non-object body: report it as a failed check.
    try:
        body = resp.json()
    except ValueError:
        body = None
    data = body.get("data") if isinstance(body, dict) else None

    found = (
        resp.status_code == 200
        and isinstance(data, dict)
        and data.get("username") == _test_username
    )
    print_result("Lookup by username (found)", found, f"HTTP {resp.status_code}")
    if found:
        has_vision_id = assert_vision_id(data, "user_id")
        print_result(
            "Vision ID compliance (user_id is string)",
            has_vision_id,
            f"user_id={data.get('user_id')!r}",
        )
def test_lookup_username_not_found():
    """GET /user/lookup_username?username={nonexistent} → 404.

    Looks up a username that should not exist and reports PASS only when
    the API answers HTTP 404.
    """
    resp = requests.get(
        f"{API_ROOT}/user/lookup_username",
        params={"username": "no_such_user_xyz_99999"},
        headers=LEGACY_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    print_result(
        "Nonexistent username → 404",
        resp.status_code == 404,
        f"HTTP {resp.status_code}",
    )
# ---------------------------------------------------------------------------
# email_auth_key_url
# ---------------------------------------------------------------------------
def test_email_auth_key_url():
    """GET /user/{user_id}/email_auth_key_url — generates auth key and sends email.

    NOTE: The test user email uses the '@test.invalid' domain, so actual mail
    delivery will fail. This test verifies the route responds correctly;
    expect HTTP 500 if the mail server rejects the send. The auth key IS
    generated and stored regardless of email success.

    The check therefore passes on either HTTP 200 or HTTP 500.
    """
    print("\n--- email_auth_key_url ---")
    resp = requests.get(
        f"{API_ROOT}/user/{_test_user_id}/email_auth_key_url",
        params={"root_url": "https://dev-app.oneskyit.com"},
        headers=LEGACY_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    # 200 = email sent; 500 = route hit but email delivery failed (acceptable for .invalid)
    route_hit = resp.status_code in (200, 500)
    message = f"HTTP {resp.status_code}"
    if resp.status_code == 500:
        message += " (email delivery failed — expected for .invalid domain)"
    print_result("Route reachable", route_hit, message)
def test_email_auth_key_url_invalid_user():
    """GET /user/{invalid_id}/email_auth_key_url → 404.

    Uses a made-up user ID and reports PASS only when the API answers
    HTTP 404.
    """
    resp = requests.get(
        f"{API_ROOT}/user/NotARealUserID99/email_auth_key_url",
        params={"root_url": "https://dev-app.oneskyit.com"},
        headers=LEGACY_HEADERS,
        timeout=30,  # fail fast instead of hanging on a stalled connection
    )
    print_result(
        "Invalid user_id rejected (404)",
        resp.status_code == 404,
        f"HTTP {resp.status_code}",
    )
# ---------------------------------------------------------------------------
# BUG VERIFICATION: user_authenticate route
# ---------------------------------------------------------------------------
def test_authenticate():
    """GET /user/authenticate — authenticate with username + password.

    Note: The @router.get() decorator was accidentally commented out in a
    prior version (user.py line 226). That bug has been fixed. This test
    verifies the route is reachable and returns user data on success.

    Three checks are reported:
      1. Correct credentials: HTTP 200 and a "data" object carrying a
         "user_id" (or "id").
      2. If (1) passed: "user_id" is a Vision-ID-shaped string.
      3. Wrong password: HTTP 200 or 404 and "data" that is neither True
         nor a user object.
    """

    def _authenticate(password):
        """Call the route and return (status_code, data); data is None if the body is unusable."""
        resp = requests.get(
            f"{API_ROOT}/user/authenticate",
            params={"username": _test_username, "password": password},
            headers=LEGACY_HEADERS,
            timeout=30,  # fail fast instead of hanging on a stalled connection
        )
        try:
            body = resp.json()
        except ValueError:
            body = None
        return resp.status_code, (body.get("data") if isinstance(body, dict) else None)

    def _is_user(data):
        """True when data looks like the user object returned on success."""
        return isinstance(data, dict) and bool(data.get("user_id") or data.get("id"))

    print("\n--- authenticate ---")
    status, data = _authenticate(NEW_PASSWORD)
    success = status == 200 and _is_user(data)
    print_result("authenticate (username+password)", success, f"HTTP {status}")
    if success:
        has_vision_id = assert_vision_id(data, "user_id")
        print_result(
            "Vision ID compliance (user_id is string)",
            has_vision_id,
            f"user_id={data.get('user_id')!r}",
        )

    # Wrong password should be rejected. Success returns a user object, not
    # the literal True, so the rejection check must also refuse a user
    # object; testing only "data is not True" would let a wrong password
    # that authenticated slip through as a PASS.
    status2, data2 = _authenticate("WrongPassword000!")
    rejected = status2 in (200, 404) and data2 is not True and not _is_user(data2)
    print_result("Wrong password rejected", rejected, f"HTTP {status2}")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Script entry point: create a throwaway user, run every check in order,
    # then delete the user and print the elapsed time.
    suite_start = time.time()
    print("=" * 60)
    print("User Auth Routes E2E Test Suite")
    print(f"API: {API_ROOT}")
    print("=" * 60)

    # --- Setup ---
    print("\n[Setup]")
    user_id = setup_test_user()
    if not user_id:
        print("\n❌ Setup failed — cannot run tests. Aborting.")
        sys.exit(1)

    # --- Tests ---
    # Order matters: test_change_password switches the user to NEW_PASSWORD,
    # which the verify_password and authenticate checks then rely on.
    tests_in_order = (
        test_change_password,
        test_change_password_too_short,
        test_change_password_missing_field,
        test_change_password_invalid_user,
        test_new_auth_key,
        test_new_auth_key_invalid_user,
        test_verify_password_by_username_correct,
        test_verify_password_by_username_wrong,
        test_verify_password_by_user_id,
        test_verify_password_missing_fields,
        test_lookup_by_account,
        test_lookup_by_person_invalid,
        test_lookup_bad_obj_type,
        test_lookup_email,
        test_lookup_email_not_found,
        test_lookup_username,
        test_lookup_username_not_found,
        test_email_auth_key_url,
        test_email_auth_key_url_invalid_user,
        test_authenticate,
    )
    try:
        for run_test in tests_in_order:
            run_test()
    finally:
        # --- Teardown ---
        # Runs even if a test raised (network error, unexpected payload), so
        # the temporary user is not left behind on the server.
        print("\n[Teardown]")
        teardown_test_user(user_id)

    elapsed = time.time() - suite_start
    print(f"\n{'=' * 60}")
    print(f"Suite completed in {elapsed:.2f}s")
    print("=" * 60)