New router: /v3/action/user/ (api_v3_actions_user.py)
- POST /authenticate — credentials in body (not query params; security fix)
- POST /verify_password
- POST /{user_id}/change_password — optional current-password verification
- GET /{user_id}/new_auth_key
- GET /{user_id}/email_auth_key_url
Registered in registry.py under /v3/action/user with V3 AccountContext auth.
Bug fixes (from audit in previous session):
- user.py: fix broken @router.get decorator (authenticate was unreachable)
- user.py + user_methods.py: fix AttributeError id_random → id (Vision ID)
- user_models.py: add fields_to_exclude_from_db to User_New_Base; narrow
collision prevention to self-reference IDs only
- user_models.py: pre-inject hashed password in root_validator(pre=True) so
exclude_unset=True in CRUD POST handler includes it (was writing NULL)
- api_crud_v3.py: move sanitize_payload + account_id injection to after
model validation (fixes FK integer collision with Vision ID constraints)
Docs: GUIDE__AE_API_V3_for_Frontend.md — new Section 7 with full migration
table (legacy → V3), request/response docs for all 5 action endpoints,
and V3 CRUD search equivalents for the 3 lookup routes.
Tests: tests/e2e/test_e2e_v3_user_action_routes.py — 19 tests, 19/19 pass.
Legacy tests/e2e/test_e2e_v3_user_auth_routes.py — 22/22 still pass.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
499 lines
18 KiB
Python
499 lines
18 KiB
Python
"""
|
||
E2E Tests: V3 User Action Routes (app/routers/api_v3_actions_user.py)
|
||
======================================================================
|
||
Covers the new V3 action endpoints under /v3/action/user/:
|
||
- POST /v3/action/user/authenticate
|
||
- POST /v3/action/user/verify_password
|
||
- POST /v3/action/user/{user_id}/change_password
|
||
- GET /v3/action/user/{user_id}/new_auth_key
|
||
- GET /v3/action/user/{user_id}/email_auth_key_url
|
||
|
||
Setup: creates a temporary test user via V3 CRUD; tears down on completion.
|
||
|
||
Run from project root:
|
||
./environment/bin/python3 tests/e2e/test_e2e_v3_user_action_routes.py
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import time
|
||
import requests
|
||
|
||
sys.path.append(os.getcwd())
|
||
|
||
# --- Configuration ---
# Each connection setting can be overridden through an environment variable so
# the suite can target another deployment and so credentials can be supplied
# without editing this file. The literals are the original values and are used
# whenever the corresponding variable is unset.
# NOTE(review): the API_KEY default is a credential committed to source
# control — consider rotating it and supplying it only via AE_E2E_API_KEY.
API_ROOT = os.environ.get("AE_E2E_API_ROOT", "https://dev-api.oneskyit.com")
API_KEY = os.environ.get("AE_E2E_API_KEY", "PMM4n50teUCaOMMTN8qOJA")
ACCOUNT_ID = os.environ.get("AE_E2E_ACCOUNT_ID", "_XY7DXtc9MY")  # default: One Sky IT Demo account

# Headers sent with every authenticated V3 request in this suite.
V3_HEADERS = {
    "x-aether-api-key": API_KEY,
    "x-account-id": ACCOUNT_ID,
}

TEST_PASSWORD = "TestAction1234!"  # >= 10 chars
NEW_PASSWORD = "NewAction5678!"  # used after change_password tests

# Populated during setup
_test_user_id = None  # Vision ID (random string)
_test_username = None
_test_email = None
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def print_result(label, success, message=""):
    """Print one pass/fail line for the check named *label*.

    A truthy *success* prints the PASS badge, anything else the FAIL badge.
    A non-empty *message* is appended after an em dash.
    """
    if success:
        badge = "✅ PASS"
    else:
        badge = "❌ FAIL"

    line = f" [{badge}] {label}"
    if message:
        line += f" — {message}"
    print(line)
|
||
|
||
|
||
def assert_vision_id(obj, field_name="user_id"):
    """Report whether ``obj[field_name]`` looks like a Vision ID.

    Despite the name, nothing is asserted: the function returns True when
    *obj* is a dict whose *field_name* value is a string of length 11 to 22
    inclusive, and False in every other case (non-dict *obj*, missing key,
    non-string value, out-of-range length).
    """
    if not isinstance(obj, dict):
        return False

    candidate = obj.get(field_name)
    if not isinstance(candidate, str):
        return False

    return 11 <= len(candidate) <= 22
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Setup / Teardown
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def setup_test_user():
    """Create a temporary test user via V3 CRUD.

    Stores the generated username/email and the returned user ID in the
    module-level ``_test_*`` globals so the individual checks can use them.

    Returns:
        The new user's ID as returned by the API, or None when the request
        did not return HTTP 200, the body was not a usable JSON object, or
        no ID was present in it.
    """
    global _test_user_id, _test_username, _test_email

    # A timestamp suffix keeps the username and email unique between runs.
    ts = int(time.time())
    _test_username = f"test_v3act_e2e_{ts}"
    _test_email = f"test_v3act_e2e_{ts}@test.invalid"

    payload = {
        "account_id": ACCOUNT_ID,
        "username": _test_username,
        "name": "E2E V3 Action Test User",
        "email": _test_email,
        "new_password": TEST_PASSWORD,
        "enable": True,
        "allow_auth_key": True,
    }

    resp = requests.post(
        f"{API_ROOT}/v3/crud/user/",
        json=payload,
        headers=V3_HEADERS,
        timeout=30,  # seconds; without a timeout requests can block forever
    )

    if resp.status_code != 200:
        print(f" [SETUP ❌] Failed to create test user — HTTP {resp.status_code}")
        print(f" {resp.text[:300]}")
        return None

    try:
        data = resp.json().get("data", {})
    except (ValueError, AttributeError):
        # Body was not JSON, or not a JSON object: report it as a missing ID
        # below instead of crashing the run.
        data = {}

    # "data" may be null or a non-object; only read IDs from a dict.
    fields = data if isinstance(data, dict) else {}
    _test_user_id = fields.get("user_id") or fields.get("id")

    if not _test_user_id:
        print(f" [SETUP ❌] Test user created but no Vision ID returned: {data}")
        return None

    print(f" [SETUP ✅] Test user created — user_id={_test_user_id} username={_test_username}")
    return _test_user_id
|
||
|
||
|
||
def teardown_test_user(user_id):
    """Delete the test user via V3 CRUD and print the outcome.

    Does nothing when *user_id* is falsy. Cleanup is best-effort: a network
    error is reported on stdout rather than raised, so a failed teardown
    cannot mask the result of the run that preceded it.
    """
    if not user_id:
        return

    try:
        resp = requests.delete(
            f"{API_ROOT}/v3/crud/user/{user_id}",
            headers=V3_HEADERS,
            timeout=30,  # seconds; without a timeout requests can block forever
        )
    except requests.RequestException as exc:
        print(f" [TEARDOWN ❌] Failed to delete test user — {type(exc).__name__}: {exc}")
        return

    if resp.status_code == 200:
        print(f" [TEARDOWN ✅] Test user deleted — user_id={user_id}")
    else:
        print(f" [TEARDOWN ❌] Failed to delete test user — HTTP {resp.status_code} {resp.text[:200]}")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# authenticate
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def test_authenticate_username_password():
    """POST /v3/action/user/authenticate — valid username + password.

    Passes when the response is HTTP 200 and ``data.user_id`` has the shape
    of a Vision ID. Returns True on pass, False otherwise.
    """
    print("\n--- authenticate ---")

    resp = requests.post(
        f"{API_ROOT}/v3/action/user/authenticate",
        json={"username": _test_username, "password": TEST_PASSWORD},
        headers=V3_HEADERS,
        timeout=30,  # seconds; without a timeout requests can block forever
    )
    try:
        data = resp.json().get("data", {})
    except (ValueError, AttributeError):
        # Non-JSON or non-object body (e.g. a gateway error page) is a
        # failed check, not a reason to crash the suite.
        data = {}
    vision_ok = assert_vision_id(data, "user_id")
    success = resp.status_code == 200 and vision_ok
    print_result("Valid username+password", success,
                 f"HTTP {resp.status_code}" + ("" if vision_ok else " — missing Vision ID"))
    return success
|
||
|
||
|
||
def test_authenticate_wrong_password():
    """POST /v3/action/user/authenticate — wrong password → 403.

    Returns True when the response status is 403, False otherwise.
    """
    resp = requests.post(
        f"{API_ROOT}/v3/action/user/authenticate",
        json={"username": _test_username, "password": "WrongPassword999!"},
        headers=V3_HEADERS,
        timeout=30,  # seconds; without a timeout requests can block forever
    )
    success = resp.status_code == 403
    print_result("Wrong password → 403", success, f"HTTP {resp.status_code}")
    return success
|
||
|
||
|
||
def test_authenticate_unknown_user():
    """POST /v3/action/user/authenticate — unknown username → 404.

    Returns True when the response status is 404, False otherwise.
    """
    resp = requests.post(
        f"{API_ROOT}/v3/action/user/authenticate",
        json={"username": "no_such_user_xyzzy", "password": TEST_PASSWORD},
        headers=V3_HEADERS,
        timeout=30,  # seconds; without a timeout requests can block forever
    )
    success = resp.status_code == 404
    print_result("Unknown username → 404", success, f"HTTP {resp.status_code}")
    return success
|
||
|
||
|
||
def test_authenticate_missing_fields():
    """POST /v3/action/user/authenticate — incomplete credentials → 400.

    Sends a username with no password (and no auth_key). Returns True when
    the response status is 400, False otherwise.
    """
    resp = requests.post(
        f"{API_ROOT}/v3/action/user/authenticate",
        json={"username": _test_username},  # password missing
        headers=V3_HEADERS,
        timeout=30,  # seconds; without a timeout requests can block forever
    )
    success = resp.status_code == 400
    print_result("Missing credentials → 400", success, f"HTTP {resp.status_code}")
    return success
|
||
|
||
|
||
def test_authenticate_auth_key_flow():
    """
    Full auth-key flow:
      1. GET new_auth_key → get a key
      2. POST authenticate with user_id + auth_key → success
      3. POST authenticate again with same key → 404 (key cleared)

    Returns True only when steps 2 and 3 both behave as expected; returns
    False early if step 1 does not yield a key.
    """
    print("\n--- authenticate (auth_key flow) ---")

    # Step 1: generate key
    resp1 = requests.get(
        f"{API_ROOT}/v3/action/user/{_test_user_id}/new_auth_key",
        headers=V3_HEADERS,
        timeout=30,  # seconds; without a timeout requests can block forever
    )
    if resp1.status_code != 200:
        print_result("Auth key flow — generate key", False, f"HTTP {resp1.status_code}")
        return False
    try:
        # "data" may be null, so fall back to {} before reading the key.
        key = (resp1.json().get("data") or {}).get("auth_key")
    except (ValueError, AttributeError):
        # Non-JSON or unexpectedly shaped body: handled as "no key" below.
        key = None
    if not key:
        print_result("Auth key flow — generate key", False, "No auth_key in response")
        return False

    credentials = {"user_id": _test_user_id, "auth_key": key}

    # Step 2: authenticate with key
    resp2 = requests.post(
        f"{API_ROOT}/v3/action/user/authenticate",
        json=credentials,
        headers=V3_HEADERS,
        timeout=30,
    )
    try:
        data2 = resp2.json().get("data", {})
    except (ValueError, AttributeError):
        data2 = {}
    step2_ok = resp2.status_code == 200 and assert_vision_id(data2, "user_id")
    print_result("Auth key flow — first use succeeds", step2_ok,
                 f"HTTP {resp2.status_code}")

    # Step 3: replay must fail (key is cleared)
    resp3 = requests.post(
        f"{API_ROOT}/v3/action/user/authenticate",
        json=credentials,
        headers=V3_HEADERS,
        timeout=30,
    )
    step3_ok = resp3.status_code == 404
    print_result("Auth key flow — replay → 404 (one-time-use)", step3_ok,
                 f"HTTP {resp3.status_code}")

    return step2_ok and step3_ok
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# verify_password
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def test_verify_password_by_user_id():
    """POST /v3/action/user/verify_password — correct password by user_id.

    Passes when the response is HTTP 200 and the payload is True, either as
    a bare ``data`` value or wrapped as ``{"result": True}``. Returns True
    on pass, False otherwise.
    """
    print("\n--- verify_password ---")

    resp = requests.post(
        f"{API_ROOT}/v3/action/user/verify_password",
        json={"user_id": _test_user_id, "current_password": TEST_PASSWORD},
        headers=V3_HEADERS,
        timeout=30,  # seconds; without a timeout requests can block forever
    )
    try:
        data = resp.json().get("data")
    except (ValueError, AttributeError):
        # Non-JSON or non-object body is a failed check, not a crash.
        data = None
    # Primitive True is wrapped as {"result": True}
    result = data.get("result") if isinstance(data, dict) else data
    success = resp.status_code == 200 and result is True
    print_result("Correct password by user_id → True", success, f"HTTP {resp.status_code}")
    return success
|
||
|
||
|
||
def test_verify_password_by_username():
    """POST /v3/action/user/verify_password — correct password by username.

    Passes when the response is HTTP 200 and the payload is True, either as
    a bare ``data`` value or wrapped as ``{"result": True}``. Returns True
    on pass, False otherwise.
    """
    resp = requests.post(
        f"{API_ROOT}/v3/action/user/verify_password",
        json={"username": _test_username, "current_password": TEST_PASSWORD},
        headers=V3_HEADERS,
        timeout=30,  # seconds; without a timeout requests can block forever
    )
    try:
        data = resp.json().get("data")
    except (ValueError, AttributeError):
        # Non-JSON or non-object body is a failed check, not a crash.
        data = None
    result = data.get("result") if isinstance(data, dict) else data
    success = resp.status_code == 200 and result is True
    print_result("Correct password by username → True", success, f"HTTP {resp.status_code}")
    return success
|
||
|
||
|
||
def test_verify_password_wrong():
    """POST /v3/action/user/verify_password — wrong password → 403.

    Returns True when the response status is 403, False otherwise.
    """
    resp = requests.post(
        f"{API_ROOT}/v3/action/user/verify_password",
        json={"user_id": _test_user_id, "current_password": "WrongPassword999!"},
        headers=V3_HEADERS,
        timeout=30,  # seconds; without a timeout requests can block forever
    )
    success = resp.status_code == 403
    print_result("Wrong password → 403", success, f"HTTP {resp.status_code}")
    return success
|
||
|
||
|
||
def test_verify_password_no_identifier():
    """POST /v3/action/user/verify_password — no user_id or username → 400.

    Returns True when the response status is 400, False otherwise.
    """
    resp = requests.post(
        f"{API_ROOT}/v3/action/user/verify_password",
        json={"current_password": TEST_PASSWORD},
        headers=V3_HEADERS,
        timeout=30,  # seconds; without a timeout requests can block forever
    )
    success = resp.status_code == 400
    print_result("No identifier → 400", success, f"HTTP {resp.status_code}")
    return success
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# change_password
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def test_change_password_no_verification():
    """POST /v3/action/user/{id}/change_password — no current_password (admin reset).

    Changes the password to NEW_PASSWORD without supplying the current one,
    then confirms via verify_password that the new password is accepted.
    Returns True only when both requests succeed with a True payload.
    """
    print("\n--- change_password ---")

    resp = requests.post(
        f"{API_ROOT}/v3/action/user/{_test_user_id}/change_password",
        json={"new_password": NEW_PASSWORD},
        headers=V3_HEADERS,
        timeout=30,  # seconds; without a timeout requests can block forever
    )
    try:
        data = resp.json().get("data")
    except (ValueError, AttributeError):
        # Non-JSON or non-object body is a failed check, not a crash.
        data = None
    # The payload is either a bare True or wrapped as {"result": True}.
    result = data.get("result") if isinstance(data, dict) else data
    success = resp.status_code == 200 and result is True
    print_result("Change password (no verification)", success, f"HTTP {resp.status_code}")

    # Verify the new password works
    resp2 = requests.post(
        f"{API_ROOT}/v3/action/user/verify_password",
        json={"user_id": _test_user_id, "current_password": NEW_PASSWORD},
        headers=V3_HEADERS,
        timeout=30,
    )
    try:
        data2 = resp2.json().get("data")
    except (ValueError, AttributeError):
        data2 = None
    r2 = data2.get("result") if isinstance(data2, dict) else data2
    verify_ok = resp2.status_code == 200 and r2 is True
    print_result("New password accepted by verify_password", verify_ok,
                 f"HTTP {resp2.status_code}")

    return success and verify_ok
|
||
|
||
|
||
def test_change_password_with_verification():
    """POST /v3/action/user/{id}/change_password — with correct current_password.

    Sends NEW_PASSWORD as the current password and changes it back to
    TEST_PASSWORD. Returns True when the response is HTTP 200 with a True
    payload, False otherwise.
    """
    # Password is currently NEW_PASSWORD (set by previous test)
    resp = requests.post(
        f"{API_ROOT}/v3/action/user/{_test_user_id}/change_password",
        json={"current_password": NEW_PASSWORD, "new_password": TEST_PASSWORD},
        headers=V3_HEADERS,
        timeout=30,  # seconds; without a timeout requests can block forever
    )
    try:
        data = resp.json().get("data")
    except (ValueError, AttributeError):
        # Non-JSON or non-object body is a failed check, not a crash.
        data = None
    # The payload is either a bare True or wrapped as {"result": True}.
    result = data.get("result") if isinstance(data, dict) else data
    success = resp.status_code == 200 and result is True
    print_result("Change password with correct current_password", success,
                 f"HTTP {resp.status_code}")
    return success
|
||
|
||
|
||
def test_change_password_wrong_current():
    """POST /v3/action/user/{id}/change_password — wrong current_password → 403.

    Returns True when the response status is 403, False otherwise.
    """
    resp = requests.post(
        f"{API_ROOT}/v3/action/user/{_test_user_id}/change_password",
        json={"current_password": "WrongPassword999!", "new_password": NEW_PASSWORD},
        headers=V3_HEADERS,
        timeout=30,  # seconds; without a timeout requests can block forever
    )
    success = resp.status_code == 403
    print_result("Wrong current_password → 403", success, f"HTTP {resp.status_code}")
    return success
|
||
|
||
|
||
def test_change_password_too_short():
    """POST /v3/action/user/{id}/change_password — new_password < 10 chars → 422.

    Returns True when the response status is 422, False otherwise.
    """
    resp = requests.post(
        f"{API_ROOT}/v3/action/user/{_test_user_id}/change_password",
        json={"new_password": "short"},
        headers=V3_HEADERS,
        timeout=30,  # seconds; without a timeout requests can block forever
    )
    # Pydantic validation rejects min_length constraint with 422 Unprocessable Entity
    success = resp.status_code == 422
    print_result("new_password too short → 422", success, f"HTTP {resp.status_code}")
    return success
|
||
|
||
|
||
def test_change_password_bad_user():
    """POST /v3/action/user/{id}/change_password — invalid user_id → 404.

    Uses the placeholder ID ``AAAAAAAAAAA``. Returns True when the response
    status is 404, False otherwise.
    """
    resp = requests.post(
        f"{API_ROOT}/v3/action/user/AAAAAAAAAAA/change_password",
        json={"new_password": "ValidPassword123!"},
        headers=V3_HEADERS,
        timeout=30,  # seconds; without a timeout requests can block forever
    )
    success = resp.status_code == 404
    print_result("Invalid user_id → 404", success, f"HTTP {resp.status_code}")
    return success
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# new_auth_key
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def test_new_auth_key():
    """GET /v3/action/user/{user_id}/new_auth_key — generates and returns key.

    Passes when the response is HTTP 200 and ``data.auth_key`` is a string
    of at least 11 characters. Returns True on pass, False otherwise.
    """
    print("\n--- new_auth_key ---")

    resp = requests.get(
        f"{API_ROOT}/v3/action/user/{_test_user_id}/new_auth_key",
        headers=V3_HEADERS,
        timeout=30,  # seconds; without a timeout requests can block forever
    )
    try:
        data = resp.json().get("data", {})
    except (ValueError, AttributeError):
        # Non-JSON or non-object body is a failed check, not a crash.
        data = {}
    key = data.get("auth_key") if isinstance(data, dict) else None
    success = resp.status_code == 200 and isinstance(key, str) and len(key) >= 11
    print_result("Returns new auth_key string", success,
                 f"HTTP {resp.status_code}" + (f" key={key!r}" if success else ""))
    return success
|
||
|
||
|
||
def test_new_auth_key_bad_user():
    """GET /v3/action/user/{user_id}/new_auth_key — invalid user → 404.

    Uses the placeholder ID ``AAAAAAAAAAA``. Returns True when the response
    status is 404, False otherwise.
    """
    resp = requests.get(
        f"{API_ROOT}/v3/action/user/AAAAAAAAAAA/new_auth_key",
        headers=V3_HEADERS,
        timeout=30,  # seconds; without a timeout requests can block forever
    )
    success = resp.status_code == 404
    print_result("Invalid user_id → 404", success, f"HTTP {resp.status_code}")
    return success
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# email_auth_key_url
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def test_email_auth_key_url():
    """GET /v3/action/user/{user_id}/email_auth_key_url — sends or fails gracefully.

    The test user's address is on the reserved ``.invalid`` domain, so the
    check accepts either HTTP 200 or HTTP 500. Returns True for those two
    statuses, False otherwise.
    """
    print("\n--- email_auth_key_url ---")

    resp = requests.get(
        f"{API_ROOT}/v3/action/user/{_test_user_id}/email_auth_key_url",
        params={"root_url": "https://test.invalid/login"},
        headers=V3_HEADERS,
        timeout=30,  # seconds; without a timeout requests can block forever
    )
    # 200 = email sent; 500 = delivery failed (.invalid domain) — both are acceptable.
    success = resp.status_code in (200, 500)
    print_result(
        "email_auth_key_url (200=sent, 500=delivery failed — both OK for .invalid domain)",
        success, f"HTTP {resp.status_code}"
    )
    return success
|
||
|
||
|
||
def test_email_auth_key_url_bad_user():
    """GET /v3/action/user/{user_id}/email_auth_key_url — invalid user → 404.

    Uses the placeholder ID ``AAAAAAAAAAA``. Returns True when the response
    status is 404, False otherwise.
    """
    resp = requests.get(
        f"{API_ROOT}/v3/action/user/AAAAAAAAAAA/email_auth_key_url",
        params={"root_url": "https://test.invalid/login"},
        headers=V3_HEADERS,
        timeout=30,  # seconds; without a timeout requests can block forever
    )
    success = resp.status_code == 404
    print_result("Invalid user_id → 404", success, f"HTTP {resp.status_code}")
    return success
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Auth guard checks
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def test_no_api_key():
    """All V3 action endpoints require x-aether-api-key — missing → 403.

    Only the authenticate endpoint is probed here, with the account header
    present but no API key. Returns True when the response status is 403,
    False otherwise.
    """
    print("\n--- auth guards ---")

    resp = requests.post(
        f"{API_ROOT}/v3/action/user/authenticate",
        json={"username": _test_username, "password": TEST_PASSWORD},
        headers={"x-account-id": ACCOUNT_ID},  # no API key
        timeout=30,  # seconds; without a timeout requests can block forever
    )
    success = resp.status_code == 403
    print_result("No API key → 403", success, f"HTTP {resp.status_code}")
    return success
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Runner
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def run_suite():
    """Run every E2E check in order and print a pass/fail summary.

    Creates the temporary user, runs the checks, and always attempts
    teardown — even when a check raises or the run is interrupted — so a
    crashed run does not leave the test user behind. A check that raises is
    reported as a failure and the remaining checks still run.

    Returns:
        True when setup succeeded and every check passed, False otherwise.
    """
    start = time.time()
    print("=" * 60)
    print("E2E: V3 User Action Routes")
    print("=" * 60)

    if not setup_test_user():
        print("\n[ABORT] Setup failed — cannot run tests.\n")
        return False

    checks = (
        # authenticate
        test_authenticate_username_password,
        test_authenticate_wrong_password,
        test_authenticate_unknown_user,
        test_authenticate_missing_fields,
        test_authenticate_auth_key_flow,
        # verify_password
        test_verify_password_by_user_id,
        test_verify_password_by_username,
        test_verify_password_wrong,
        test_verify_password_no_identifier,
        # change_password (order matters — each test assumes the password left by the previous)
        test_change_password_no_verification,    # TEST → NEW
        test_change_password_with_verification,  # NEW → TEST
        test_change_password_wrong_current,      # bad → 403 (no change)
        test_change_password_too_short,          # bad → 422
        test_change_password_bad_user,           # 404
        # new_auth_key
        test_new_auth_key,
        test_new_auth_key_bad_user,
        # email_auth_key_url
        test_email_auth_key_url,
        test_email_auth_key_url_bad_user,
        # auth guards
        test_no_api_key,
    )

    results = []
    try:
        for check in checks:
            try:
                results.append(bool(check()))
            except Exception as exc:
                # Top-level boundary: a network error or malformed response in
                # one check is recorded as a failure so the rest still run.
                print_result(check.__name__, False,
                             f"raised {type(exc).__name__}: {exc}")
                results.append(False)
    finally:
        # Runs on normal completion and on interruption alike.
        teardown_test_user(_test_user_id)

    elapsed = time.time() - start
    passed = sum(results)
    total = len(results)
    print(f"\n{'=' * 60}")
    print(f"Results: {passed}/{total} passed ({elapsed:.2f}s)")
    print("=" * 60)
    return passed == total


if __name__ == "__main__":
    # A non-zero exit status lets shells and CI detect a failed run.
    sys.exit(0 if run_suite() else 1)
|