Files
OSIT-AE-API-FastAPI/tests/e2e/test_e2e_v3_user_action_routes.py
Scott Idem 687472f4e3 feat(user): V3 action endpoints + auth bug fixes (19/19 + 22/22 tests)
New router: /v3/action/user/ (api_v3_actions_user.py)
  - POST /authenticate  — credentials in body (not query params; security fix)
  - POST /verify_password
  - POST /{user_id}/change_password  — optional current-password verification
  - GET  /{user_id}/new_auth_key
  - GET  /{user_id}/email_auth_key_url
  Registered in registry.py under /v3/action/user with V3 AccountContext auth.

Bug fixes (from audit in previous session):
  - user.py: fix broken @router.get decorator (authenticate was unreachable)
  - user.py + user_methods.py: fix AttributeError id_random → id (Vision ID)
  - user_models.py: add fields_to_exclude_from_db to User_New_Base; narrow
    collision prevention to self-reference IDs only
  - user_models.py: pre-inject hashed password in root_validator(pre=True) so
    exclude_unset=True in CRUD POST handler includes it (was writing NULL)
  - api_crud_v3.py: move sanitize_payload + account_id injection to after
    model validation (fixes FK integer collision with Vision ID constraints)

Docs: GUIDE__AE_API_V3_for_Frontend.md — new Section 7 with full migration
  table (legacy → V3), request/response docs for all 5 action endpoints,
  and V3 CRUD search equivalents for the 3 lookup routes.

Tests: tests/e2e/test_e2e_v3_user_action_routes.py — 19 tests, 19/19 pass.
  Legacy tests/e2e/test_e2e_v3_user_auth_routes.py — 22/22 still pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-25 21:54:09 -04:00

499 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
E2E Tests: V3 User Action Routes (app/routers/api_v3_actions_user.py)
======================================================================
Covers the new V3 action endpoints under /v3/action/user/:
- POST /v3/action/user/authenticate
- POST /v3/action/user/verify_password
- POST /v3/action/user/{user_id}/change_password
- GET /v3/action/user/{user_id}/new_auth_key
- GET /v3/action/user/{user_id}/email_auth_key_url
Setup: creates a temporary test user via V3 CRUD; tears down on completion.
Run from project root:
./environment/bin/python3 tests/e2e/test_e2e_v3_user_action_routes.py
"""
import os
import sys
import time
import requests
sys.path.append(os.getcwd())
# --- Configuration ---
API_ROOT = "https://dev-api.oneskyit.com"
API_KEY = "PMM4n50teUCaOMMTN8qOJA"
ACCOUNT_ID = "_XY7DXtc9MY" # One Sky IT Demo account
V3_HEADERS = {
"x-aether-api-key": API_KEY,
"x-account-id": ACCOUNT_ID,
}
TEST_PASSWORD = "TestAction1234!" # >= 10 chars
NEW_PASSWORD = "NewAction5678!" # used after change_password tests
# Populated during setup
_test_user_id = None # Vision ID (random string)
_test_username = None
_test_email = None
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def print_result(label, success, message=""):
status = "✅ PASS" if success else "❌ FAIL"
print(f" [{status}] {label}" + (f"{message}" if message else ""))
def assert_vision_id(obj, field_name="user_id"):
"""Returns True if field is a non-empty string of length 1122 (Vision ID)."""
val = obj.get(field_name) if isinstance(obj, dict) else None
return isinstance(val, str) and 11 <= len(val) <= 22
# ---------------------------------------------------------------------------
# Setup / Teardown
# ---------------------------------------------------------------------------
def setup_test_user():
"""Create a temporary test user via V3 CRUD. Returns the Vision ID or None."""
global _test_user_id, _test_username, _test_email
ts = int(time.time())
_test_username = f"test_v3act_e2e_{ts}"
_test_email = f"test_v3act_e2e_{ts}@test.invalid"
payload = {
"account_id": ACCOUNT_ID,
"username": _test_username,
"name": "E2E V3 Action Test User",
"email": _test_email,
"new_password": TEST_PASSWORD,
"enable": True,
"allow_auth_key": True,
}
resp = requests.post(f"{API_ROOT}/v3/crud/user/", json=payload, headers=V3_HEADERS)
if resp.status_code != 200:
print(f" [SETUP ❌] Failed to create test user — HTTP {resp.status_code}")
print(f" {resp.text[:300]}")
return None
data = resp.json().get("data", {})
_test_user_id = data.get("user_id") or data.get("id")
if not _test_user_id:
print(f" [SETUP ❌] Test user created but no Vision ID returned: {data}")
return None
print(f" [SETUP ✅] Test user created — user_id={_test_user_id} username={_test_username}")
return _test_user_id
def teardown_test_user(user_id):
"""Delete the test user via V3 CRUD."""
if not user_id:
return
resp = requests.delete(f"{API_ROOT}/v3/crud/user/{user_id}", headers=V3_HEADERS)
if resp.status_code == 200:
print(f" [TEARDOWN ✅] Test user deleted — user_id={user_id}")
else:
print(f" [TEARDOWN ❌] Failed to delete test user — HTTP {resp.status_code} {resp.text[:200]}")
# ---------------------------------------------------------------------------
# authenticate
# ---------------------------------------------------------------------------
def test_authenticate_username_password():
"""POST /v3/action/user/authenticate — valid username + password."""
print("\n--- authenticate ---")
resp = requests.post(
f"{API_ROOT}/v3/action/user/authenticate",
json={"username": _test_username, "password": TEST_PASSWORD},
headers=V3_HEADERS,
)
data = resp.json().get("data", {})
vision_ok = assert_vision_id(data, "user_id")
success = resp.status_code == 200 and vision_ok
print_result("Valid username+password", success,
f"HTTP {resp.status_code}" + ("" if vision_ok else " — missing Vision ID"))
return success
def test_authenticate_wrong_password():
"""POST /v3/action/user/authenticate — wrong password → 403."""
resp = requests.post(
f"{API_ROOT}/v3/action/user/authenticate",
json={"username": _test_username, "password": "WrongPassword999!"},
headers=V3_HEADERS,
)
success = resp.status_code == 403
print_result("Wrong password → 403", success, f"HTTP {resp.status_code}")
return success
def test_authenticate_unknown_user():
"""POST /v3/action/user/authenticate — unknown username → 404."""
resp = requests.post(
f"{API_ROOT}/v3/action/user/authenticate",
json={"username": "no_such_user_xyzzy", "password": TEST_PASSWORD},
headers=V3_HEADERS,
)
success = resp.status_code == 404
print_result("Unknown username → 404", success, f"HTTP {resp.status_code}")
return success
def test_authenticate_missing_fields():
"""POST /v3/action/user/authenticate — no credentials → 400."""
resp = requests.post(
f"{API_ROOT}/v3/action/user/authenticate",
json={"username": _test_username}, # password missing
headers=V3_HEADERS,
)
success = resp.status_code == 400
print_result("Missing credentials → 400", success, f"HTTP {resp.status_code}")
return success
def test_authenticate_auth_key_flow():
"""
Full auth-key flow:
1. GET new_auth_key → get a key
2. POST authenticate with user_id + auth_key → success
3. POST authenticate again with same key → 404 (key cleared)
"""
print("\n--- authenticate (auth_key flow) ---")
# Step 1: generate key
resp1 = requests.get(
f"{API_ROOT}/v3/action/user/{_test_user_id}/new_auth_key",
headers=V3_HEADERS,
)
if resp1.status_code != 200:
print_result("Auth key flow — generate key", False, f"HTTP {resp1.status_code}")
return False
key = resp1.json().get("data", {}).get("auth_key")
if not key:
print_result("Auth key flow — generate key", False, "No auth_key in response")
return False
# Step 2: authenticate with key
resp2 = requests.post(
f"{API_ROOT}/v3/action/user/authenticate",
json={"user_id": _test_user_id, "auth_key": key},
headers=V3_HEADERS,
)
data2 = resp2.json().get("data", {})
step2_ok = resp2.status_code == 200 and assert_vision_id(data2, "user_id")
print_result("Auth key flow — first use succeeds", step2_ok,
f"HTTP {resp2.status_code}")
# Step 3: replay must fail (key is cleared)
resp3 = requests.post(
f"{API_ROOT}/v3/action/user/authenticate",
json={"user_id": _test_user_id, "auth_key": key},
headers=V3_HEADERS,
)
step3_ok = resp3.status_code == 404
print_result("Auth key flow — replay → 404 (one-time-use)", step3_ok,
f"HTTP {resp3.status_code}")
return step2_ok and step3_ok
# ---------------------------------------------------------------------------
# verify_password
# ---------------------------------------------------------------------------
def test_verify_password_by_user_id():
"""POST /v3/action/user/verify_password — correct password by user_id."""
print("\n--- verify_password ---")
resp = requests.post(
f"{API_ROOT}/v3/action/user/verify_password",
json={"user_id": _test_user_id, "current_password": TEST_PASSWORD},
headers=V3_HEADERS,
)
data = resp.json().get("data")
# Primitive True is wrapped as {"result": True}
result = data.get("result") if isinstance(data, dict) else data
success = resp.status_code == 200 and result is True
print_result("Correct password by user_id → True", success, f"HTTP {resp.status_code}")
return success
def test_verify_password_by_username():
"""POST /v3/action/user/verify_password — correct password by username."""
resp = requests.post(
f"{API_ROOT}/v3/action/user/verify_password",
json={"username": _test_username, "current_password": TEST_PASSWORD},
headers=V3_HEADERS,
)
data = resp.json().get("data")
result = data.get("result") if isinstance(data, dict) else data
success = resp.status_code == 200 and result is True
print_result("Correct password by username → True", success, f"HTTP {resp.status_code}")
return success
def test_verify_password_wrong():
"""POST /v3/action/user/verify_password — wrong password → 403."""
resp = requests.post(
f"{API_ROOT}/v3/action/user/verify_password",
json={"user_id": _test_user_id, "current_password": "WrongPassword999!"},
headers=V3_HEADERS,
)
success = resp.status_code == 403
print_result("Wrong password → 403", success, f"HTTP {resp.status_code}")
return success
def test_verify_password_no_identifier():
"""POST /v3/action/user/verify_password — no user_id or username → 400."""
resp = requests.post(
f"{API_ROOT}/v3/action/user/verify_password",
json={"current_password": TEST_PASSWORD},
headers=V3_HEADERS,
)
success = resp.status_code == 400
print_result("No identifier → 400", success, f"HTTP {resp.status_code}")
return success
# ---------------------------------------------------------------------------
# change_password
# ---------------------------------------------------------------------------
def test_change_password_no_verification():
"""POST /v3/action/user/{id}/change_password — no current_password (admin reset)."""
print("\n--- change_password ---")
resp = requests.post(
f"{API_ROOT}/v3/action/user/{_test_user_id}/change_password",
json={"new_password": NEW_PASSWORD},
headers=V3_HEADERS,
)
data = resp.json().get("data")
result = data.get("result") if isinstance(data, dict) else data
success = resp.status_code == 200 and result is True
print_result("Change password (no verification)", success, f"HTTP {resp.status_code}")
# Verify the new password works
resp2 = requests.post(
f"{API_ROOT}/v3/action/user/verify_password",
json={"user_id": _test_user_id, "current_password": NEW_PASSWORD},
headers=V3_HEADERS,
)
data2 = resp2.json().get("data")
r2 = data2.get("result") if isinstance(data2, dict) else data2
verify_ok = resp2.status_code == 200 and r2 is True
print_result("New password accepted by verify_password", verify_ok,
f"HTTP {resp2.status_code}")
return success and verify_ok
def test_change_password_with_verification():
"""POST /v3/action/user/{id}/change_password — with correct current_password."""
# Password is currently NEW_PASSWORD (set by previous test)
resp = requests.post(
f"{API_ROOT}/v3/action/user/{_test_user_id}/change_password",
json={"current_password": NEW_PASSWORD, "new_password": TEST_PASSWORD},
headers=V3_HEADERS,
)
data = resp.json().get("data")
result = data.get("result") if isinstance(data, dict) else data
success = resp.status_code == 200 and result is True
print_result("Change password with correct current_password", success,
f"HTTP {resp.status_code}")
return success
def test_change_password_wrong_current():
"""POST /v3/action/user/{id}/change_password — wrong current_password → 403."""
resp = requests.post(
f"{API_ROOT}/v3/action/user/{_test_user_id}/change_password",
json={"current_password": "WrongPassword999!", "new_password": NEW_PASSWORD},
headers=V3_HEADERS,
)
success = resp.status_code == 403
print_result("Wrong current_password → 403", success, f"HTTP {resp.status_code}")
return success
def test_change_password_too_short():
"""POST /v3/action/user/{id}/change_password — new_password < 10 chars → 422."""
resp = requests.post(
f"{API_ROOT}/v3/action/user/{_test_user_id}/change_password",
json={"new_password": "short"},
headers=V3_HEADERS,
)
# Pydantic validation rejects min_length constraint with 422 Unprocessable Entity
success = resp.status_code == 422
print_result("new_password too short → 422", success, f"HTTP {resp.status_code}")
return success
def test_change_password_bad_user():
"""POST /v3/action/user/{id}/change_password — invalid user_id → 404."""
resp = requests.post(
f"{API_ROOT}/v3/action/user/AAAAAAAAAAA/change_password",
json={"new_password": "ValidPassword123!"},
headers=V3_HEADERS,
)
success = resp.status_code == 404
print_result("Invalid user_id → 404", success, f"HTTP {resp.status_code}")
return success
# ---------------------------------------------------------------------------
# new_auth_key
# ---------------------------------------------------------------------------
def test_new_auth_key():
"""GET /v3/action/user/{user_id}/new_auth_key — generates and returns key."""
print("\n--- new_auth_key ---")
resp = requests.get(
f"{API_ROOT}/v3/action/user/{_test_user_id}/new_auth_key",
headers=V3_HEADERS,
)
data = resp.json().get("data", {})
key = data.get("auth_key") if isinstance(data, dict) else None
success = resp.status_code == 200 and isinstance(key, str) and len(key) >= 11
print_result("Returns new auth_key string", success,
f"HTTP {resp.status_code}" + (f" key={key!r}" if success else ""))
return success
def test_new_auth_key_bad_user():
"""GET /v3/action/user/{user_id}/new_auth_key — invalid user → 404."""
resp = requests.get(
f"{API_ROOT}/v3/action/user/AAAAAAAAAAA/new_auth_key",
headers=V3_HEADERS,
)
success = resp.status_code == 404
print_result("Invalid user_id → 404", success, f"HTTP {resp.status_code}")
return success
# ---------------------------------------------------------------------------
# email_auth_key_url
# ---------------------------------------------------------------------------
def test_email_auth_key_url():
"""GET /v3/action/user/{user_id}/email_auth_key_url — sends or fails gracefully."""
print("\n--- email_auth_key_url ---")
resp = requests.get(
f"{API_ROOT}/v3/action/user/{_test_user_id}/email_auth_key_url",
params={"root_url": "https://test.invalid/login"},
headers=V3_HEADERS,
)
# 200 = email sent; 500 = delivery failed (.invalid domain) — both are acceptable.
success = resp.status_code in (200, 500)
print_result(
"email_auth_key_url (200=sent, 500=delivery failed — both OK for .invalid domain)",
success, f"HTTP {resp.status_code}"
)
return success
def test_email_auth_key_url_bad_user():
"""GET /v3/action/user/{user_id}/email_auth_key_url — invalid user → 404."""
resp = requests.get(
f"{API_ROOT}/v3/action/user/AAAAAAAAAAA/email_auth_key_url",
params={"root_url": "https://test.invalid/login"},
headers=V3_HEADERS,
)
success = resp.status_code == 404
print_result("Invalid user_id → 404", success, f"HTTP {resp.status_code}")
return success
# ---------------------------------------------------------------------------
# Auth guard checks
# ---------------------------------------------------------------------------
def test_no_api_key():
"""All V3 action endpoints require x-aether-api-key — missing → 403."""
print("\n--- auth guards ---")
resp = requests.post(
f"{API_ROOT}/v3/action/user/authenticate",
json={"username": _test_username, "password": TEST_PASSWORD},
headers={"x-account-id": ACCOUNT_ID}, # no API key
)
success = resp.status_code == 403
print_result("No API key → 403", success, f"HTTP {resp.status_code}")
return success
# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------
def run_suite():
start = time.time()
print("=" * 60)
print("E2E: V3 User Action Routes")
print("=" * 60)
if not setup_test_user():
print("\n[ABORT] Setup failed — cannot run tests.\n")
return
results = []
# authenticate
results.append(test_authenticate_username_password())
results.append(test_authenticate_wrong_password())
results.append(test_authenticate_unknown_user())
results.append(test_authenticate_missing_fields())
results.append(test_authenticate_auth_key_flow())
# verify_password
results.append(test_verify_password_by_user_id())
results.append(test_verify_password_by_username())
results.append(test_verify_password_wrong())
results.append(test_verify_password_no_identifier())
# change_password (order matters — each test assumes the password left by the previous)
results.append(test_change_password_no_verification()) # TEST → NEW
results.append(test_change_password_with_verification()) # NEW → TEST
results.append(test_change_password_wrong_current()) # bad → 403 (no change)
results.append(test_change_password_too_short()) # bad → 422
results.append(test_change_password_bad_user()) # 404
# new_auth_key
results.append(test_new_auth_key())
results.append(test_new_auth_key_bad_user())
# email_auth_key_url
results.append(test_email_auth_key_url())
results.append(test_email_auth_key_url_bad_user())
# auth guards
results.append(test_no_api_key())
teardown_test_user(_test_user_id)
elapsed = time.time() - start
passed = sum(1 for r in results if r)
total = len(results)
print(f"\n{'=' * 60}")
print(f"Results: {passed}/{total} passed ({elapsed:.2f}s)")
print("=" * 60)
if __name__ == "__main__":
run_suite()