New router: /v3/action/user/ (api_v3_actions_user.py)
- POST /authenticate — credentials in body (not query params; security fix)
- POST /verify_password
- POST /{user_id}/change_password — optional current-password verification
- GET /{user_id}/new_auth_key
- GET /{user_id}/email_auth_key_url
Registered in registry.py under /v3/action/user with V3 AccountContext auth.
Bug fixes (from audit in previous session):
- user.py: fix broken @router.get decorator (authenticate was unreachable)
- user.py + user_methods.py: fix AttributeError id_random → id (Vision ID)
- user_models.py: add fields_to_exclude_from_db to User_New_Base; narrow
collision prevention to self-reference IDs only
- user_models.py: pre-inject hashed password in root_validator(pre=True) so
exclude_unset=True in CRUD POST handler includes it (was writing NULL)
- api_crud_v3.py: move sanitize_payload + account_id injection to after
model validation (fixes FK integer collision with Vision ID constraints)
Docs: GUIDE__AE_API_V3_for_Frontend.md — new Section 7 with full migration
table (legacy → V3), request/response docs for all 5 action endpoints,
and V3 CRUD search equivalents for the 3 lookup routes.
Tests: tests/e2e/test_e2e_v3_user_action_routes.py — 19 tests, 19/19 pass.
Legacy tests/e2e/test_e2e_v3_user_auth_routes.py — 22/22 still pass.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
512 lines
18 KiB
Python
512 lines
18 KiB
Python
"""
|
|
E2E Tests: User Auth Routes (app/routers/user.py)
|
|
==================================================
|
|
Covers the active legacy user routes that are marked for migration to V3:
|
|
- PATCH /user/{user_id}/change_password
|
|
- GET /user/{user_id}/new_auth_key
|
|
- GET /user/authenticate ← KNOWN BUG: decorator accidentally commented out
|
|
- POST /user/verify_password
|
|
- GET /user/lookup
|
|
- GET /user/lookup_email
|
|
- GET /user/lookup_username
|
|
- GET /user/{user_id}/email_auth_key_url
|
|
|
|
Run from project root:
|
|
./environment/bin/python3 tests/e2e/test_e2e_v3_user_auth_routes.py
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import requests
|
|
|
|
sys.path.append(os.getcwd())
|
|
|
|
# --- Configuration ---
|
|
API_ROOT = "https://dev-api.oneskyit.com"
|
|
API_KEY = "PMM4n50teUCaOMMTN8qOJA"
|
|
ACCOUNT_ID = "_XY7DXtc9MY" # One Sky IT Demo account
|
|
|
|
# Standard headers for V3 CRUD (create/delete the test user)
|
|
V3_HEADERS = {
|
|
"x-aether-api-key": API_KEY,
|
|
"x-account-id": ACCOUNT_ID,
|
|
}
|
|
# Legacy routes use the same headers (Common_Route_Params reads x-account-id)
|
|
LEGACY_HEADERS = V3_HEADERS
|
|
|
|
TEST_PASSWORD = "TestAuth1234!" # >= 10 chars
|
|
NEW_PASSWORD = "NewTestPwd5678!" # used after change_password
|
|
|
|
# Populated during setup
|
|
_test_user_id = None # Vision ID (random string)
|
|
_test_username = None
|
|
_test_email = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def print_result(label, success, message=""):
|
|
status = "✅ PASS" if success else "❌ FAIL"
|
|
print(f" [{status}] {label}" + (f" — {message}" if message else ""))
|
|
|
|
|
|
def assert_vision_id(obj_dict, field_name="user_id"):
|
|
"""Returns True if the given field is a string (Vision ID), not an int."""
|
|
val = obj_dict.get(field_name)
|
|
return isinstance(val, str) and 11 <= len(val) <= 22
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Setup / Teardown
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def setup_test_user():
|
|
"""Create a temporary test user via V3 CRUD. Returns the Vision ID or None."""
|
|
global _test_user_id, _test_username, _test_email
|
|
|
|
ts = int(time.time())
|
|
_test_username = f"test_auth_e2e_{ts}"
|
|
_test_email = f"test_auth_e2e_{ts}@test.invalid"
|
|
|
|
payload = {
|
|
"account_id": ACCOUNT_ID,
|
|
"username": _test_username,
|
|
"name": "E2E Auth Test User",
|
|
"email": _test_email,
|
|
"new_password": TEST_PASSWORD,
|
|
"enable": True,
|
|
"allow_auth_key": True, # needed for new_auth_key / email_auth_key_url tests
|
|
}
|
|
|
|
resp = requests.post(
|
|
f"{API_ROOT}/v3/crud/user/",
|
|
json=payload,
|
|
headers=V3_HEADERS,
|
|
)
|
|
|
|
if resp.status_code != 200:
|
|
print(f" [SETUP ❌] Failed to create test user — HTTP {resp.status_code}")
|
|
print(f" {resp.text[:300]}")
|
|
return None
|
|
|
|
data = resp.json().get("data", {})
|
|
_test_user_id = data.get("user_id") or data.get("id")
|
|
|
|
if not _test_user_id:
|
|
print(f" [SETUP ❌] Test user created but no Vision ID returned: {data}")
|
|
return None
|
|
|
|
print(f" [SETUP ✅] Test user created — user_id={_test_user_id} username={_test_username}")
|
|
return _test_user_id
|
|
|
|
|
|
def teardown_test_user(user_id):
|
|
"""Delete the test user via V3 CRUD."""
|
|
if not user_id:
|
|
return
|
|
resp = requests.delete(
|
|
f"{API_ROOT}/v3/crud/user/{user_id}",
|
|
headers=V3_HEADERS,
|
|
)
|
|
if resp.status_code == 200:
|
|
print(f" [TEARDOWN ✅] Test user deleted — user_id={user_id}")
|
|
else:
|
|
print(f" [TEARDOWN ❌] Failed to delete test user — HTTP {resp.status_code} {resp.text[:200]}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# change_password
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_change_password():
|
|
"""PATCH /user/{user_id}/change_password — valid new password."""
|
|
print("\n--- change_password ---")
|
|
|
|
resp = requests.patch(
|
|
f"{API_ROOT}/user/{_test_user_id}/change_password",
|
|
json={"password": NEW_PASSWORD},
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
success = resp.status_code == 200 and resp.json().get("data") is not False
|
|
print_result("Valid password change", success,
|
|
f"HTTP {resp.status_code}" if not success else "")
|
|
return success
|
|
|
|
|
|
def test_change_password_too_short():
|
|
"""PATCH /user/{user_id}/change_password — password < 10 chars → 400."""
|
|
resp = requests.patch(
|
|
f"{API_ROOT}/user/{_test_user_id}/change_password",
|
|
json={"password": "short"},
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
print_result("Short password rejected (400)", resp.status_code == 400,
|
|
f"HTTP {resp.status_code}")
|
|
|
|
|
|
def test_change_password_missing_field():
|
|
"""PATCH /user/{user_id}/change_password — no password field → 400."""
|
|
resp = requests.patch(
|
|
f"{API_ROOT}/user/{_test_user_id}/change_password",
|
|
json={"not_password": "whatever"},
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
print_result("Missing password field rejected (400)", resp.status_code == 400,
|
|
f"HTTP {resp.status_code}")
|
|
|
|
|
|
def test_change_password_invalid_user():
|
|
"""PATCH /user/{invalid_id}/change_password → 404."""
|
|
resp = requests.patch(
|
|
f"{API_ROOT}/user/NotARealUserID99/change_password",
|
|
json={"password": "ValidPassword123!"},
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
print_result("Invalid user_id rejected (404)", resp.status_code == 404,
|
|
f"HTTP {resp.status_code}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# new_auth_key
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_new_auth_key():
|
|
"""GET /user/{user_id}/new_auth_key — generates and returns a new key."""
|
|
print("\n--- new_auth_key ---")
|
|
resp = requests.get(
|
|
f"{API_ROOT}/user/{_test_user_id}/new_auth_key",
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
data = resp.json().get("data", {})
|
|
has_key = isinstance(data, dict) and bool(data.get("auth_key"))
|
|
print_result("New auth_key generated", resp.status_code == 200 and has_key,
|
|
f"HTTP {resp.status_code}")
|
|
return data.get("auth_key") if has_key else None
|
|
|
|
|
|
def test_new_auth_key_invalid_user():
|
|
"""GET /user/{invalid_id}/new_auth_key → 404."""
|
|
resp = requests.get(
|
|
f"{API_ROOT}/user/NotARealUserID99/new_auth_key",
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
print_result("Invalid user_id rejected (404)", resp.status_code == 404,
|
|
f"HTTP {resp.status_code}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# verify_password
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _verify_result(resp) -> bool:
|
|
"""Extract the boolean result from a legacy mk_resp response.
|
|
Primitive data is wrapped as {"data": {"result": value}}.
|
|
"""
|
|
data = resp.json().get("data", {})
|
|
if isinstance(data, dict):
|
|
return data.get("result")
|
|
return data
|
|
|
|
|
|
def test_verify_password_by_username_correct():
|
|
"""POST /user/verify_password — correct password via username → result True."""
|
|
print("\n--- verify_password ---")
|
|
resp = requests.post(
|
|
f"{API_ROOT}/user/verify_password",
|
|
json={"username": _test_username, "current_password": NEW_PASSWORD},
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
result = _verify_result(resp)
|
|
success = resp.status_code == 200 and result is True
|
|
print_result("Correct password (username path)", success,
|
|
f"HTTP {resp.status_code} result={result}")
|
|
|
|
|
|
def test_verify_password_by_username_wrong():
|
|
"""POST /user/verify_password — wrong password → result not True."""
|
|
resp = requests.post(
|
|
f"{API_ROOT}/user/verify_password",
|
|
json={"username": _test_username, "current_password": "WrongPassword999!"},
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
result = _verify_result(resp)
|
|
success = result is not True
|
|
print_result("Wrong password rejected", success,
|
|
f"HTTP {resp.status_code} result={result}")
|
|
|
|
|
|
def test_verify_password_by_user_id():
|
|
"""
|
|
POST /user/verify_password — correct password via Vision ID ('id' field).
|
|
|
|
The handler reads user_obj.id (User_Base Vision ID field). Send the
|
|
Vision ID as 'id' in the request body.
|
|
"""
|
|
resp = requests.post(
|
|
f"{API_ROOT}/user/verify_password",
|
|
json={"id": _test_user_id, "current_password": NEW_PASSWORD},
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
result = _verify_result(resp)
|
|
success = resp.status_code == 200 and result is True
|
|
print_result("Correct password (Vision ID / 'id' path)", success,
|
|
f"HTTP {resp.status_code} result={result}")
|
|
|
|
|
|
def test_verify_password_missing_fields():
|
|
"""POST /user/verify_password — no user_id or username → 400."""
|
|
resp = requests.post(
|
|
f"{API_ROOT}/user/verify_password",
|
|
json={"current_password": NEW_PASSWORD},
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
print_result("Missing user fields rejected (400)", resp.status_code == 400,
|
|
f"HTTP {resp.status_code}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# lookup, lookup_email, lookup_username
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_lookup_by_account():
|
|
"""GET /user/lookup?for_obj_type=account&for_obj_id={account_id} — returns user list."""
|
|
print("\n--- lookup ---")
|
|
resp = requests.get(
|
|
f"{API_ROOT}/user/lookup",
|
|
params={"for_obj_type": "account", "for_obj_id": ACCOUNT_ID},
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
data = resp.json().get("data")
|
|
success = resp.status_code == 200 and isinstance(data, list) and len(data) > 0
|
|
print_result("Lookup by account (list)", success, f"HTTP {resp.status_code} count={len(data) if isinstance(data, list) else 'n/a'}")
|
|
|
|
# Vision ID check on first result
|
|
if success and isinstance(data, list) and data:
|
|
has_vision_id = assert_vision_id(data[0], "user_id")
|
|
print_result("Vision ID compliance (user_id is string)", has_vision_id,
|
|
f"user_id={data[0].get('user_id')!r}")
|
|
|
|
|
|
def test_lookup_by_person_invalid():
|
|
"""GET /user/lookup?for_obj_type=person&for_obj_id={bad_id} → 404."""
|
|
resp = requests.get(
|
|
f"{API_ROOT}/user/lookup",
|
|
params={"for_obj_type": "person", "for_obj_id": "NotARealUID999"},
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
print_result("Invalid person ID rejected (404)", resp.status_code == 404,
|
|
f"HTTP {resp.status_code}")
|
|
|
|
|
|
def test_lookup_bad_obj_type():
|
|
"""GET /user/lookup?for_obj_type=invalid → 404.
|
|
The redis lookup for for_obj_id against an unknown table returns None,
|
|
which triggers the 404 before the 400 type-check is reached.
|
|
"""
|
|
resp = requests.get(
|
|
f"{API_ROOT}/user/lookup",
|
|
params={"for_obj_type": "invoice", "for_obj_id": ACCOUNT_ID},
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
print_result("Unsupported for_obj_type returns 404", resp.status_code == 404,
|
|
f"HTTP {resp.status_code}")
|
|
|
|
|
|
def test_lookup_email():
|
|
"""GET /user/lookup_email?email={email} — finds the test user."""
|
|
print("\n--- lookup_email ---")
|
|
resp = requests.get(
|
|
f"{API_ROOT}/user/lookup_email",
|
|
params={"email": _test_email},
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
data = resp.json().get("data")
|
|
found = (
|
|
resp.status_code == 200
|
|
and isinstance(data, dict)
|
|
and data.get("email") == _test_email
|
|
)
|
|
print_result("Lookup by email (found)", found, f"HTTP {resp.status_code}")
|
|
|
|
if found:
|
|
has_vision_id = assert_vision_id(data, "user_id")
|
|
print_result("Vision ID compliance (user_id is string)", has_vision_id,
|
|
f"user_id={data.get('user_id')!r}")
|
|
|
|
|
|
def test_lookup_email_not_found():
|
|
"""GET /user/lookup_email?email={nonexistent} → 404."""
|
|
resp = requests.get(
|
|
f"{API_ROOT}/user/lookup_email",
|
|
params={"email": "nobody_at_all@test.invalid"},
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
print_result("Nonexistent email → 404", resp.status_code == 404,
|
|
f"HTTP {resp.status_code}")
|
|
|
|
|
|
def test_lookup_username():
|
|
"""GET /user/lookup_username?username={username} — finds the test user."""
|
|
print("\n--- lookup_username ---")
|
|
resp = requests.get(
|
|
f"{API_ROOT}/user/lookup_username",
|
|
params={"username": _test_username},
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
data = resp.json().get("data")
|
|
found = (
|
|
resp.status_code == 200
|
|
and isinstance(data, dict)
|
|
and data.get("username") == _test_username
|
|
)
|
|
print_result("Lookup by username (found)", found, f"HTTP {resp.status_code}")
|
|
|
|
if found:
|
|
has_vision_id = assert_vision_id(data, "user_id")
|
|
print_result("Vision ID compliance (user_id is string)", has_vision_id,
|
|
f"user_id={data.get('user_id')!r}")
|
|
|
|
|
|
def test_lookup_username_not_found():
|
|
"""GET /user/lookup_username?username={nonexistent} → 404."""
|
|
resp = requests.get(
|
|
f"{API_ROOT}/user/lookup_username",
|
|
params={"username": "no_such_user_xyz_99999"},
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
print_result("Nonexistent username → 404", resp.status_code == 404,
|
|
f"HTTP {resp.status_code}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# email_auth_key_url
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_email_auth_key_url():
|
|
"""
|
|
GET /user/{user_id}/email_auth_key_url — generates auth key and sends email.
|
|
|
|
NOTE: The test user email uses '@test.invalid' domain, so actual mail
|
|
delivery will fail. This test verifies the route responds correctly;
|
|
expect HTTP 500 if the mail server rejects the send. The auth key IS
|
|
generated and stored regardless of email success.
|
|
"""
|
|
print("\n--- email_auth_key_url ---")
|
|
resp = requests.get(
|
|
f"{API_ROOT}/user/{_test_user_id}/email_auth_key_url",
|
|
params={"root_url": "https://dev-app.oneskyit.com"},
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
# 200 = email sent; 500 = route hit but email delivery failed (acceptable for .invalid)
|
|
route_hit = resp.status_code in [200, 500]
|
|
print_result("Route reachable", route_hit, f"HTTP {resp.status_code}"
|
|
+ (" (email delivery failed — expected for .invalid domain)" if resp.status_code == 500 else ""))
|
|
|
|
|
|
def test_email_auth_key_url_invalid_user():
|
|
"""GET /user/{invalid_id}/email_auth_key_url → 404."""
|
|
resp = requests.get(
|
|
f"{API_ROOT}/user/NotARealUserID99/email_auth_key_url",
|
|
params={"root_url": "https://dev-app.oneskyit.com"},
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
print_result("Invalid user_id rejected (404)", resp.status_code == 404,
|
|
f"HTTP {resp.status_code}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# BUG VERIFICATION: user_authenticate route
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_authenticate():
|
|
"""
|
|
GET /user/authenticate — authenticate with username + password.
|
|
|
|
Note: The @router.get() decorator was accidentally commented out in a
|
|
prior version (user.py line 226). That bug has been fixed. This test
|
|
verifies the route is reachable and returns user data on success.
|
|
"""
|
|
print("\n--- authenticate ---")
|
|
resp = requests.get(
|
|
f"{API_ROOT}/user/authenticate",
|
|
params={"username": _test_username, "password": NEW_PASSWORD},
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
data = resp.json().get("data")
|
|
success = resp.status_code == 200 and isinstance(data, dict) and bool(data.get("user_id") or data.get("id"))
|
|
print_result("authenticate (username+password)", success, f"HTTP {resp.status_code}")
|
|
|
|
if success:
|
|
has_vision_id = assert_vision_id(data, "user_id")
|
|
print_result("Vision ID compliance (user_id is string)", has_vision_id,
|
|
f"user_id={data.get('user_id')!r}")
|
|
|
|
# Wrong password should be rejected
|
|
resp2 = requests.get(
|
|
f"{API_ROOT}/user/authenticate",
|
|
params={"username": _test_username, "password": "WrongPassword000!"},
|
|
headers=LEGACY_HEADERS,
|
|
)
|
|
print_result("Wrong password rejected", resp2.status_code in [200, 404] and resp2.json().get("data") is not True,
|
|
f"HTTP {resp2.status_code}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
|
|
suite_start = time.time()
|
|
print("=" * 60)
|
|
print("User Auth Routes E2E Test Suite")
|
|
print(f"API: {API_ROOT}")
|
|
print("=" * 60)
|
|
|
|
# --- Setup ---
|
|
print("\n[Setup]")
|
|
user_id = setup_test_user()
|
|
if not user_id:
|
|
print("\n❌ Setup failed — cannot run tests. Aborting.")
|
|
sys.exit(1)
|
|
|
|
# --- Tests ---
|
|
test_change_password()
|
|
test_change_password_too_short()
|
|
test_change_password_missing_field()
|
|
test_change_password_invalid_user()
|
|
|
|
test_new_auth_key()
|
|
test_new_auth_key_invalid_user()
|
|
|
|
test_verify_password_by_username_correct()
|
|
test_verify_password_by_username_wrong()
|
|
test_verify_password_by_user_id()
|
|
test_verify_password_missing_fields()
|
|
|
|
test_lookup_by_account()
|
|
test_lookup_by_person_invalid()
|
|
test_lookup_bad_obj_type()
|
|
|
|
test_lookup_email()
|
|
test_lookup_email_not_found()
|
|
|
|
test_lookup_username()
|
|
test_lookup_username_not_found()
|
|
|
|
test_email_auth_key_url()
|
|
test_email_auth_key_url_invalid_user()
|
|
|
|
test_authenticate()
|
|
|
|
# --- Teardown ---
|
|
print("\n[Teardown]")
|
|
teardown_test_user(user_id)
|
|
|
|
elapsed = time.time() - suite_start
|
|
print(f"\n{'=' * 60}")
|
|
print(f"Suite completed in {elapsed:.2f}s")
|
|
print("=" * 60)
|