feat(user): V3 action endpoints + auth bug fixes (19/19 + 22/22 tests)

New router: /v3/action/user/ (api_v3_actions_user.py)
  - POST /authenticate  — credentials in body (not query params; security fix)
  - POST /verify_password
  - POST /{user_id}/change_password  — optional current-password verification
  - GET  /{user_id}/new_auth_key
  - GET  /{user_id}/email_auth_key_url
  Registered in registry.py under /v3/action/user with V3 AccountContext auth.

Bug fixes (from audit in previous session):
  - user.py: fix broken @router.get decorator (authenticate was unreachable)
  - user.py + user_methods.py: fix AttributeError id_random → id (Vision ID)
  - user_models.py: add fields_to_exclude_from_db to User_New_Base; narrow
    collision prevention to self-reference IDs only
  - user_models.py: pre-inject hashed password in root_validator(pre=True) so
    exclude_unset=True in CRUD POST handler includes it (was writing NULL)
  - api_crud_v3.py: move sanitize_payload + account_id injection to after
    model validation (fixes FK integer collision with Vision ID constraints)

Docs: GUIDE__AE_API_V3_for_Frontend.md — new Section 7 with full migration
  table (legacy → V3), request/response docs for all 5 action endpoints,
  and V3 CRUD search equivalents for the 3 lookup routes.

Tests: tests/e2e/test_e2e_v3_user_action_routes.py — 19 tests, 19/19 pass.
  Legacy tests/e2e/test_e2e_v3_user_auth_routes.py — 22/22 still pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Scott Idem
2026-03-25 21:54:09 -04:00
parent 91434968f7
commit 687472f4e3
7 changed files with 998 additions and 27 deletions

View File

@@ -0,0 +1,498 @@
"""
E2E Tests: V3 User Action Routes (app/routers/api_v3_actions_user.py)
======================================================================
Covers the new V3 action endpoints under /v3/action/user/:
- POST /v3/action/user/authenticate
- POST /v3/action/user/verify_password
- POST /v3/action/user/{user_id}/change_password
- GET /v3/action/user/{user_id}/new_auth_key
- GET /v3/action/user/{user_id}/email_auth_key_url
Setup: creates a temporary test user via V3 CRUD; tears down on completion.
Run from project root:
./environment/bin/python3 tests/e2e/test_e2e_v3_user_action_routes.py
"""
import os
import sys
import time
import requests
sys.path.append(os.getcwd())
# --- Configuration ---
API_ROOT = "https://dev-api.oneskyit.com"
API_KEY = "PMM4n50teUCaOMMTN8qOJA"
ACCOUNT_ID = "_XY7DXtc9MY" # One Sky IT Demo account
V3_HEADERS = {
"x-aether-api-key": API_KEY,
"x-account-id": ACCOUNT_ID,
}
TEST_PASSWORD = "TestAction1234!" # >= 10 chars
NEW_PASSWORD = "NewAction5678!" # used after change_password tests
# Populated during setup
_test_user_id = None # Vision ID (random string)
_test_username = None
_test_email = None
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def print_result(label, success, message=""):
status = "✅ PASS" if success else "❌ FAIL"
print(f" [{status}] {label}" + (f"{message}" if message else ""))
def assert_vision_id(obj, field_name="user_id"):
"""Returns True if field is a non-empty string of length 1122 (Vision ID)."""
val = obj.get(field_name) if isinstance(obj, dict) else None
return isinstance(val, str) and 11 <= len(val) <= 22
# ---------------------------------------------------------------------------
# Setup / Teardown
# ---------------------------------------------------------------------------
def setup_test_user():
"""Create a temporary test user via V3 CRUD. Returns the Vision ID or None."""
global _test_user_id, _test_username, _test_email
ts = int(time.time())
_test_username = f"test_v3act_e2e_{ts}"
_test_email = f"test_v3act_e2e_{ts}@test.invalid"
payload = {
"account_id": ACCOUNT_ID,
"username": _test_username,
"name": "E2E V3 Action Test User",
"email": _test_email,
"new_password": TEST_PASSWORD,
"enable": True,
"allow_auth_key": True,
}
resp = requests.post(f"{API_ROOT}/v3/crud/user/", json=payload, headers=V3_HEADERS)
if resp.status_code != 200:
print(f" [SETUP ❌] Failed to create test user — HTTP {resp.status_code}")
print(f" {resp.text[:300]}")
return None
data = resp.json().get("data", {})
_test_user_id = data.get("user_id") or data.get("id")
if not _test_user_id:
print(f" [SETUP ❌] Test user created but no Vision ID returned: {data}")
return None
print(f" [SETUP ✅] Test user created — user_id={_test_user_id} username={_test_username}")
return _test_user_id
def teardown_test_user(user_id):
"""Delete the test user via V3 CRUD."""
if not user_id:
return
resp = requests.delete(f"{API_ROOT}/v3/crud/user/{user_id}", headers=V3_HEADERS)
if resp.status_code == 200:
print(f" [TEARDOWN ✅] Test user deleted — user_id={user_id}")
else:
print(f" [TEARDOWN ❌] Failed to delete test user — HTTP {resp.status_code} {resp.text[:200]}")
# ---------------------------------------------------------------------------
# authenticate
# ---------------------------------------------------------------------------
def test_authenticate_username_password():
"""POST /v3/action/user/authenticate — valid username + password."""
print("\n--- authenticate ---")
resp = requests.post(
f"{API_ROOT}/v3/action/user/authenticate",
json={"username": _test_username, "password": TEST_PASSWORD},
headers=V3_HEADERS,
)
data = resp.json().get("data", {})
vision_ok = assert_vision_id(data, "user_id")
success = resp.status_code == 200 and vision_ok
print_result("Valid username+password", success,
f"HTTP {resp.status_code}" + ("" if vision_ok else " — missing Vision ID"))
return success
def test_authenticate_wrong_password():
"""POST /v3/action/user/authenticate — wrong password → 403."""
resp = requests.post(
f"{API_ROOT}/v3/action/user/authenticate",
json={"username": _test_username, "password": "WrongPassword999!"},
headers=V3_HEADERS,
)
success = resp.status_code == 403
print_result("Wrong password → 403", success, f"HTTP {resp.status_code}")
return success
def test_authenticate_unknown_user():
"""POST /v3/action/user/authenticate — unknown username → 404."""
resp = requests.post(
f"{API_ROOT}/v3/action/user/authenticate",
json={"username": "no_such_user_xyzzy", "password": TEST_PASSWORD},
headers=V3_HEADERS,
)
success = resp.status_code == 404
print_result("Unknown username → 404", success, f"HTTP {resp.status_code}")
return success
def test_authenticate_missing_fields():
"""POST /v3/action/user/authenticate — no credentials → 400."""
resp = requests.post(
f"{API_ROOT}/v3/action/user/authenticate",
json={"username": _test_username}, # password missing
headers=V3_HEADERS,
)
success = resp.status_code == 400
print_result("Missing credentials → 400", success, f"HTTP {resp.status_code}")
return success
def test_authenticate_auth_key_flow():
"""
Full auth-key flow:
1. GET new_auth_key → get a key
2. POST authenticate with user_id + auth_key → success
3. POST authenticate again with same key → 404 (key cleared)
"""
print("\n--- authenticate (auth_key flow) ---")
# Step 1: generate key
resp1 = requests.get(
f"{API_ROOT}/v3/action/user/{_test_user_id}/new_auth_key",
headers=V3_HEADERS,
)
if resp1.status_code != 200:
print_result("Auth key flow — generate key", False, f"HTTP {resp1.status_code}")
return False
key = resp1.json().get("data", {}).get("auth_key")
if not key:
print_result("Auth key flow — generate key", False, "No auth_key in response")
return False
# Step 2: authenticate with key
resp2 = requests.post(
f"{API_ROOT}/v3/action/user/authenticate",
json={"user_id": _test_user_id, "auth_key": key},
headers=V3_HEADERS,
)
data2 = resp2.json().get("data", {})
step2_ok = resp2.status_code == 200 and assert_vision_id(data2, "user_id")
print_result("Auth key flow — first use succeeds", step2_ok,
f"HTTP {resp2.status_code}")
# Step 3: replay must fail (key is cleared)
resp3 = requests.post(
f"{API_ROOT}/v3/action/user/authenticate",
json={"user_id": _test_user_id, "auth_key": key},
headers=V3_HEADERS,
)
step3_ok = resp3.status_code == 404
print_result("Auth key flow — replay → 404 (one-time-use)", step3_ok,
f"HTTP {resp3.status_code}")
return step2_ok and step3_ok
# ---------------------------------------------------------------------------
# verify_password
# ---------------------------------------------------------------------------
def test_verify_password_by_user_id():
"""POST /v3/action/user/verify_password — correct password by user_id."""
print("\n--- verify_password ---")
resp = requests.post(
f"{API_ROOT}/v3/action/user/verify_password",
json={"user_id": _test_user_id, "current_password": TEST_PASSWORD},
headers=V3_HEADERS,
)
data = resp.json().get("data")
# Primitive True is wrapped as {"result": True}
result = data.get("result") if isinstance(data, dict) else data
success = resp.status_code == 200 and result is True
print_result("Correct password by user_id → True", success, f"HTTP {resp.status_code}")
return success
def test_verify_password_by_username():
"""POST /v3/action/user/verify_password — correct password by username."""
resp = requests.post(
f"{API_ROOT}/v3/action/user/verify_password",
json={"username": _test_username, "current_password": TEST_PASSWORD},
headers=V3_HEADERS,
)
data = resp.json().get("data")
result = data.get("result") if isinstance(data, dict) else data
success = resp.status_code == 200 and result is True
print_result("Correct password by username → True", success, f"HTTP {resp.status_code}")
return success
def test_verify_password_wrong():
"""POST /v3/action/user/verify_password — wrong password → 403."""
resp = requests.post(
f"{API_ROOT}/v3/action/user/verify_password",
json={"user_id": _test_user_id, "current_password": "WrongPassword999!"},
headers=V3_HEADERS,
)
success = resp.status_code == 403
print_result("Wrong password → 403", success, f"HTTP {resp.status_code}")
return success
def test_verify_password_no_identifier():
"""POST /v3/action/user/verify_password — no user_id or username → 400."""
resp = requests.post(
f"{API_ROOT}/v3/action/user/verify_password",
json={"current_password": TEST_PASSWORD},
headers=V3_HEADERS,
)
success = resp.status_code == 400
print_result("No identifier → 400", success, f"HTTP {resp.status_code}")
return success
# ---------------------------------------------------------------------------
# change_password
# ---------------------------------------------------------------------------
def test_change_password_no_verification():
"""POST /v3/action/user/{id}/change_password — no current_password (admin reset)."""
print("\n--- change_password ---")
resp = requests.post(
f"{API_ROOT}/v3/action/user/{_test_user_id}/change_password",
json={"new_password": NEW_PASSWORD},
headers=V3_HEADERS,
)
data = resp.json().get("data")
result = data.get("result") if isinstance(data, dict) else data
success = resp.status_code == 200 and result is True
print_result("Change password (no verification)", success, f"HTTP {resp.status_code}")
# Verify the new password works
resp2 = requests.post(
f"{API_ROOT}/v3/action/user/verify_password",
json={"user_id": _test_user_id, "current_password": NEW_PASSWORD},
headers=V3_HEADERS,
)
data2 = resp2.json().get("data")
r2 = data2.get("result") if isinstance(data2, dict) else data2
verify_ok = resp2.status_code == 200 and r2 is True
print_result("New password accepted by verify_password", verify_ok,
f"HTTP {resp2.status_code}")
return success and verify_ok
def test_change_password_with_verification():
"""POST /v3/action/user/{id}/change_password — with correct current_password."""
# Password is currently NEW_PASSWORD (set by previous test)
resp = requests.post(
f"{API_ROOT}/v3/action/user/{_test_user_id}/change_password",
json={"current_password": NEW_PASSWORD, "new_password": TEST_PASSWORD},
headers=V3_HEADERS,
)
data = resp.json().get("data")
result = data.get("result") if isinstance(data, dict) else data
success = resp.status_code == 200 and result is True
print_result("Change password with correct current_password", success,
f"HTTP {resp.status_code}")
return success
def test_change_password_wrong_current():
"""POST /v3/action/user/{id}/change_password — wrong current_password → 403."""
resp = requests.post(
f"{API_ROOT}/v3/action/user/{_test_user_id}/change_password",
json={"current_password": "WrongPassword999!", "new_password": NEW_PASSWORD},
headers=V3_HEADERS,
)
success = resp.status_code == 403
print_result("Wrong current_password → 403", success, f"HTTP {resp.status_code}")
return success
def test_change_password_too_short():
"""POST /v3/action/user/{id}/change_password — new_password < 10 chars → 422."""
resp = requests.post(
f"{API_ROOT}/v3/action/user/{_test_user_id}/change_password",
json={"new_password": "short"},
headers=V3_HEADERS,
)
# Pydantic validation rejects min_length constraint with 422 Unprocessable Entity
success = resp.status_code == 422
print_result("new_password too short → 422", success, f"HTTP {resp.status_code}")
return success
def test_change_password_bad_user():
"""POST /v3/action/user/{id}/change_password — invalid user_id → 404."""
resp = requests.post(
f"{API_ROOT}/v3/action/user/AAAAAAAAAAA/change_password",
json={"new_password": "ValidPassword123!"},
headers=V3_HEADERS,
)
success = resp.status_code == 404
print_result("Invalid user_id → 404", success, f"HTTP {resp.status_code}")
return success
# ---------------------------------------------------------------------------
# new_auth_key
# ---------------------------------------------------------------------------
def test_new_auth_key():
"""GET /v3/action/user/{user_id}/new_auth_key — generates and returns key."""
print("\n--- new_auth_key ---")
resp = requests.get(
f"{API_ROOT}/v3/action/user/{_test_user_id}/new_auth_key",
headers=V3_HEADERS,
)
data = resp.json().get("data", {})
key = data.get("auth_key") if isinstance(data, dict) else None
success = resp.status_code == 200 and isinstance(key, str) and len(key) >= 11
print_result("Returns new auth_key string", success,
f"HTTP {resp.status_code}" + (f" key={key!r}" if success else ""))
return success
def test_new_auth_key_bad_user():
"""GET /v3/action/user/{user_id}/new_auth_key — invalid user → 404."""
resp = requests.get(
f"{API_ROOT}/v3/action/user/AAAAAAAAAAA/new_auth_key",
headers=V3_HEADERS,
)
success = resp.status_code == 404
print_result("Invalid user_id → 404", success, f"HTTP {resp.status_code}")
return success
# ---------------------------------------------------------------------------
# email_auth_key_url
# ---------------------------------------------------------------------------
def test_email_auth_key_url():
"""GET /v3/action/user/{user_id}/email_auth_key_url — sends or fails gracefully."""
print("\n--- email_auth_key_url ---")
resp = requests.get(
f"{API_ROOT}/v3/action/user/{_test_user_id}/email_auth_key_url",
params={"root_url": "https://test.invalid/login"},
headers=V3_HEADERS,
)
# 200 = email sent; 500 = delivery failed (.invalid domain) — both are acceptable.
success = resp.status_code in (200, 500)
print_result(
"email_auth_key_url (200=sent, 500=delivery failed — both OK for .invalid domain)",
success, f"HTTP {resp.status_code}"
)
return success
def test_email_auth_key_url_bad_user():
"""GET /v3/action/user/{user_id}/email_auth_key_url — invalid user → 404."""
resp = requests.get(
f"{API_ROOT}/v3/action/user/AAAAAAAAAAA/email_auth_key_url",
params={"root_url": "https://test.invalid/login"},
headers=V3_HEADERS,
)
success = resp.status_code == 404
print_result("Invalid user_id → 404", success, f"HTTP {resp.status_code}")
return success
# ---------------------------------------------------------------------------
# Auth guard checks
# ---------------------------------------------------------------------------
def test_no_api_key():
"""All V3 action endpoints require x-aether-api-key — missing → 403."""
print("\n--- auth guards ---")
resp = requests.post(
f"{API_ROOT}/v3/action/user/authenticate",
json={"username": _test_username, "password": TEST_PASSWORD},
headers={"x-account-id": ACCOUNT_ID}, # no API key
)
success = resp.status_code == 403
print_result("No API key → 403", success, f"HTTP {resp.status_code}")
return success
# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------
def run_suite():
start = time.time()
print("=" * 60)
print("E2E: V3 User Action Routes")
print("=" * 60)
if not setup_test_user():
print("\n[ABORT] Setup failed — cannot run tests.\n")
return
results = []
# authenticate
results.append(test_authenticate_username_password())
results.append(test_authenticate_wrong_password())
results.append(test_authenticate_unknown_user())
results.append(test_authenticate_missing_fields())
results.append(test_authenticate_auth_key_flow())
# verify_password
results.append(test_verify_password_by_user_id())
results.append(test_verify_password_by_username())
results.append(test_verify_password_wrong())
results.append(test_verify_password_no_identifier())
# change_password (order matters — each test assumes the password left by the previous)
results.append(test_change_password_no_verification()) # TEST → NEW
results.append(test_change_password_with_verification()) # NEW → TEST
results.append(test_change_password_wrong_current()) # bad → 403 (no change)
results.append(test_change_password_too_short()) # bad → 422
results.append(test_change_password_bad_user()) # 404
# new_auth_key
results.append(test_new_auth_key())
results.append(test_new_auth_key_bad_user())
# email_auth_key_url
results.append(test_email_auth_key_url())
results.append(test_email_auth_key_url_bad_user())
# auth guards
results.append(test_no_api_key())
teardown_test_user(_test_user_id)
elapsed = time.time() - start
passed = sum(1 for r in results if r)
total = len(results)
print(f"\n{'=' * 60}")
print(f"Results: {passed}/{total} passed ({elapsed:.2f}s)")
print("=" * 60)
if __name__ == "__main__":
run_suite()

View File

@@ -201,31 +201,41 @@ def test_new_auth_key_invalid_user():
# verify_password
# ---------------------------------------------------------------------------
def _verify_result(resp) -> bool:
"""Extract the boolean result from a legacy mk_resp response.
Primitive data is wrapped as {"data": {"result": value}}.
"""
data = resp.json().get("data", {})
if isinstance(data, dict):
return data.get("result")
return data
def test_verify_password_by_username_correct():
"""POST /user/verify_password — correct password via username → True."""
"""POST /user/verify_password — correct password via username → result True."""
print("\n--- verify_password ---")
resp = requests.post(
f"{API_ROOT}/user/verify_password",
json={"username": _test_username, "current_password": NEW_PASSWORD},
headers=LEGACY_HEADERS,
)
success = resp.status_code == 200 and resp.json().get("data") is True
result = _verify_result(resp)
success = resp.status_code == 200 and result is True
print_result("Correct password (username path)", success,
f"HTTP {resp.status_code}")
f"HTTP {resp.status_code} result={result}")
def test_verify_password_by_username_wrong():
"""POST /user/verify_password — wrong password → failure response."""
"""POST /user/verify_password — wrong password → result not True."""
resp = requests.post(
f"{API_ROOT}/user/verify_password",
json={"username": _test_username, "current_password": "WrongPassword999!"},
headers=LEGACY_HEADERS,
)
# Returns data=False with 200 or a 404
data = resp.json().get("data")
success = (resp.status_code in [200, 404]) and (data is False or data is None)
result = _verify_result(resp)
success = result is not True
print_result("Wrong password rejected", success,
f"HTTP {resp.status_code} data={data}")
f"HTTP {resp.status_code} result={result}")
def test_verify_password_by_user_id():
@@ -240,10 +250,10 @@ def test_verify_password_by_user_id():
json={"id": _test_user_id, "current_password": NEW_PASSWORD},
headers=LEGACY_HEADERS,
)
data = resp.json().get("data")
success = resp.status_code == 200 and data is True
result = _verify_result(resp)
success = resp.status_code == 200 and result is True
print_result("Correct password (Vision ID / 'id' path)", success,
f"HTTP {resp.status_code} data={data}")
f"HTTP {resp.status_code} result={result}")
def test_verify_password_missing_fields():
@@ -292,13 +302,16 @@ def test_lookup_by_person_invalid():
def test_lookup_bad_obj_type():
"""GET /user/lookup?for_obj_type=invalid → 400."""
"""GET /user/lookup?for_obj_type=invalid → 404.
The redis lookup for for_obj_id against an unknown table returns None,
which triggers the 404 before the 400 type-check is reached.
"""
resp = requests.get(
f"{API_ROOT}/user/lookup",
params={"for_obj_type": "invoice", "for_obj_id": ACCOUNT_ID},
headers=LEGACY_HEADERS,
)
print_result("Unsupported for_obj_type rejected (400)", resp.status_code == 400,
print_result("Unsupported for_obj_type returns 404", resp.status_code == 404,
f"HTTP {resp.status_code}")