diff --git a/app/methods/site_domain_methods.py b/app/methods/site_domain_methods.py index 4c5a794..2cc4541 100644 --- a/app/methods/site_domain_methods.py +++ b/app/methods/site_domain_methods.py @@ -147,6 +147,8 @@ def get_site_domain_rec_list( # ### BEGIN ### API Site Domain Methods ### lookup_site_domain_fqdn() ### def lookup_site_domain_fqdn( fqdn: str, + # TODO: Accept access_key as an argument for validation (str|None) + # access_key: Optional[str] = None, enabled: str = 'enabled', # enabled, disabled, all limit: int = 100, offset: int = 0, @@ -156,15 +158,22 @@ def lookup_site_domain_fqdn( data = {} data['fqdn'] = fqdn + # TODO: If access_key is provided, add it to the data dict for SQL parameterization + # if access_key is not None: + # data['access_key'] = access_key sql_enabled, data['enable'] = sql_enable_part(table_name='site_domain', enabled=enabled) # Reasonably safe return str and bool sql_limit = sql_limit_offset_part(limit=limit, offset=offset) # Reasonably safe return str + # TODO: Add access_key to WHERE clause if provided, e.g.: + # WHERE site_domain.fqdn = :fqdn AND (:access_key IS NULL OR site_domain.access_key = :access_key) sql = f""" SELECT `site_domain`.id AS 'site_domain_id', `site_domain`.id_random AS 'site_domain_id_random' FROM `v_site_domain` AS site_domain WHERE site_domain.fqdn = :fqdn + -- TODO: Add access_key check here for stricter validation + -- AND (:access_key IS NULL OR site_domain.access_key = :access_key) {sql_enabled} ORDER BY `site_domain`.fqdn ASC, `site_domain`.access_key ASC, `site_domain`.required_referrer ASC, `site_domain`.created_on DESC, `site_domain`.updated_on DESC {sql_limit}; @@ -176,4 +185,11 @@ def lookup_site_domain_fqdn( site_domain_rec_li = [] return site_domain_rec_li + +# --- +# To restore access_key validation: +# 1. Accept access_key as a parameter to this function (and any API endpoint calling it). +# 2. Add access_key to the SQL WHERE clause (see above) so only matching records are returned. +# 3. If access_key is required, return empty or error if not matched. +# 4. Update API docs and tests to reflect the new/required parameter. # ### END ### API Site Domain Methods ### get_site_domain_rec_list() ### diff --git a/app/methods/user_methods.py b/app/methods/user_methods.py index 396c3df..99c1dbb 100644 --- a/app/methods/user_methods.py +++ b/app/methods/user_methods.py @@ -654,7 +654,7 @@ def email_user_auth_key_url( else: return False log.debug(account_cfg) - user_id_random = user_obj.id_random # NOTE: Not user_id_random because of alias + user_id_random = user_obj.id or user_obj.user_id # Vision ID: User_Out_Base uses 'id'/'user_id', not 'id_random' from_email = account_cfg.default_no_reply_email from_name = account_cfg.default_no_reply_name diff --git a/app/models/user_models.py b/app/models/user_models.py index 4b748b9..4b85522 100644 --- a/app/models/user_models.py +++ b/app/models/user_models.py @@ -1,6 +1,6 @@ import datetime, hashlib, logging, os, pytz, redis, secrets -from typing import Dict, List, Optional, Set, Union +from typing import ClassVar, Dict, List, Optional, Set, Union from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator, root_validator from app.db_sql import get_id_random, redis_lookup_id_random @@ -169,6 +169,14 @@ class User_New_Base(BaseModel): # Including JSON data other_json: Optional[Json] + # Fields that are part of the model (for input) but must not be written to the DB table + fields_to_exclude_from_db: ClassVar[list] = [ + 'new_password', # Virtual input field — the validator hashes it into 'password'; DB has no new_password column + 'id', 'user_id', # Vision ID strings — DB uses int 'id' (auto) and string 'id_random' + 'account_id_random', 'contact_id_random', 'organization_id_random', 'person_id_random', + 'account_name', + ] + _processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now) @root_validator(pre=True) diff --git a/app/routers/user.py b/app/routers/user.py index d507c81..96fd2be 100644 --- a/app/routers/user.py +++ b/app/routers/user.py @@ -223,7 +223,8 @@ async def user_new_auth_key( # NOTE: Should this be divided into username/password and user ID/auth key endpoints? Probably vote 2x # Updated 2021-10-06 # NOTE: This is actively in use 2026-03-24 -Scott -# This is marked for deprecation and must be migrated to Aether API v3 standards!@router.get('/user/authenticate', response_model=Resp_Body_Base) +# This is marked for deprecation and must be migrated to Aether API v3 standards! +@router.get('/user/authenticate', response_model=Resp_Body_Base) async def user_authenticate( null_account_id: bool = False, user_id: Optional[str] = Query(None, min_length=11, max_length=22), @@ -417,14 +418,14 @@ async def user_verify_password( account_id = commons.x_account_id log.debug(user_obj) - log.debug(user_obj.id_random) + log.debug(user_obj.id) log.debug(user_obj.current_password) log.debug(user_obj.username) if current_password := user_obj.current_password: pass else: return mk_resp(data=False, status_code=400, status_message='The current password to verify is required.', response=commons.response) # Bad Request - if user_id_random := user_obj.id_random: # Use id_random instead of user_id_random when getting from User model. + if user_id_random := user_obj.id: # Vision ID: User_Base uses 'id' (not 'id_random') for the random string. log.info(f'Using the user ID to look up the user. User ID: {user_id_random}') # NOTE: Not doing a redis lookup since we have to look up the record again. Redis lookup may save or add an insignificant amount of time. user_data = {} diff --git a/documentation/GUIDE__AE_API_V3_for_Frontend.md b/documentation/GUIDE__AE_API_V3_for_Frontend.md index 8bcec8c..357cb45 100644 --- a/documentation/GUIDE__AE_API_V3_for_Frontend.md +++ b/documentation/GUIDE__AE_API_V3_for_Frontend.md @@ -73,7 +73,7 @@ Modify data in the system. > [!IMPORTANT] > **V3 responses always use random string IDs — never database integers.** -After a successful `POST` create or any `GET`, the response contains: +All V3 responses — `POST` create, `GET` single, `GET` list, search, and `PATCH` update — contain: | Field | Type | Use | | :--- | :--- | :--- | diff --git a/tests/e2e/test_e2e_v3_user_auth_routes.py b/tests/e2e/test_e2e_v3_user_auth_routes.py new file mode 100644 index 0000000..79d2759 --- /dev/null +++ b/tests/e2e/test_e2e_v3_user_auth_routes.py @@ -0,0 +1,498 @@ +""" +E2E Tests: User Auth Routes (app/routers/user.py) +================================================== +Covers the active legacy user routes that are marked for migration to V3: + - PATCH /user/{user_id}/change_password + - GET /user/{user_id}/new_auth_key + - GET /user/authenticate ← KNOWN BUG: decorator accidentally commented out + - POST /user/verify_password + - GET /user/lookup + - GET /user/lookup_email + - GET /user/lookup_username + - GET /user/{user_id}/email_auth_key_url + +Run from project root: + ./environment/bin/python3 tests/e2e/test_e2e_v3_user_auth_routes.py +""" + +import os +import sys +import time +import requests + +sys.path.append(os.getcwd()) + +# --- Configuration --- +API_ROOT = "https://dev-api.oneskyit.com" +API_KEY = "PMM4n50teUCaOMMTN8qOJA" +ACCOUNT_ID = "_XY7DXtc9MY" # One Sky IT Demo account + +# Standard headers for V3 CRUD (create/delete the test user) +V3_HEADERS = { + "x-aether-api-key": API_KEY, + "x-account-id": ACCOUNT_ID, +} +# Legacy routes use the same headers (Common_Route_Params reads x-account-id) +LEGACY_HEADERS = V3_HEADERS + +TEST_PASSWORD = "TestAuth1234!" # >= 10 chars +NEW_PASSWORD = "NewTestPwd5678!" # used after change_password + +# Populated during setup +_test_user_id = None # Vision ID (random string) +_test_username = None +_test_email = None + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def print_result(label, success, message=""): + status = "✅ PASS" if success else "❌ FAIL" + print(f" [{status}] {label}" + (f" — {message}" if message else "")) + + +def assert_vision_id(obj_dict, field_name="user_id"): + """Returns True if the given field is a string (Vision ID), not an int.""" + val = obj_dict.get(field_name) + return isinstance(val, str) and 11 <= len(val) <= 22 + + +# --------------------------------------------------------------------------- +# Setup / Teardown +# --------------------------------------------------------------------------- + +def setup_test_user(): + """Create a temporary test user via V3 CRUD. Returns the Vision ID or None.""" + global _test_user_id, _test_username, _test_email + + ts = int(time.time()) + _test_username = f"test_auth_e2e_{ts}" + _test_email = f"test_auth_e2e_{ts}@test.invalid" + + payload = { + "account_id": ACCOUNT_ID, + "username": _test_username, + "name": "E2E Auth Test User", + "email": _test_email, + "new_password": TEST_PASSWORD, + "enable": True, + "allow_auth_key": True, # needed for new_auth_key / email_auth_key_url tests + } + + resp = requests.post( + f"{API_ROOT}/v3/crud/user/", + json=payload, + headers=V3_HEADERS, + ) + + if resp.status_code != 200: + print(f" [SETUP ❌] Failed to create test user — HTTP {resp.status_code}") + print(f" {resp.text[:300]}") + return None + + data = resp.json().get("data", {}) + _test_user_id = data.get("user_id") or data.get("id") + + if not _test_user_id: + print(f" [SETUP ❌] Test user created but no Vision ID returned: {data}") + return None + + print(f" [SETUP ✅] Test user created — user_id={_test_user_id} username={_test_username}") + return _test_user_id + + +def teardown_test_user(user_id): + """Delete the test user via V3 CRUD.""" + if not user_id: + return + resp = requests.delete( + f"{API_ROOT}/v3/crud/user/{user_id}", + headers=V3_HEADERS, + ) + if resp.status_code == 200: + print(f" [TEARDOWN ✅] Test user deleted — user_id={user_id}") + else: + print(f" [TEARDOWN ❌] Failed to delete test user — HTTP {resp.status_code} {resp.text[:200]}") + + +# --------------------------------------------------------------------------- +# change_password +# --------------------------------------------------------------------------- + +def test_change_password(): + """PATCH /user/{user_id}/change_password — valid new password.""" + print("\n--- change_password ---") + + resp = requests.patch( + f"{API_ROOT}/user/{_test_user_id}/change_password", + json={"password": NEW_PASSWORD}, + headers=LEGACY_HEADERS, + ) + success = resp.status_code == 200 and resp.json().get("data") is not False + print_result("Valid password change", success, + f"HTTP {resp.status_code}" if not success else "") + return success + + +def test_change_password_too_short(): + """PATCH /user/{user_id}/change_password — password < 10 chars → 400.""" + resp = requests.patch( + f"{API_ROOT}/user/{_test_user_id}/change_password", + json={"password": "short"}, + headers=LEGACY_HEADERS, + ) + print_result("Short password rejected (400)", resp.status_code == 400, + f"HTTP {resp.status_code}") + + +def test_change_password_missing_field(): + """PATCH /user/{user_id}/change_password — no password field → 400.""" + resp = requests.patch( + f"{API_ROOT}/user/{_test_user_id}/change_password", + json={"not_password": "whatever"}, + headers=LEGACY_HEADERS, + ) + print_result("Missing password field rejected (400)", resp.status_code == 400, + f"HTTP {resp.status_code}") + + +def test_change_password_invalid_user(): + """PATCH /user/{invalid_id}/change_password → 404.""" + resp = requests.patch( + f"{API_ROOT}/user/NotARealUserID99/change_password", + json={"password": "ValidPassword123!"}, + headers=LEGACY_HEADERS, + ) + print_result("Invalid user_id rejected (404)", resp.status_code == 404, + f"HTTP {resp.status_code}") + + +# --------------------------------------------------------------------------- +# new_auth_key +# --------------------------------------------------------------------------- + +def test_new_auth_key(): + """GET /user/{user_id}/new_auth_key — generates and returns a new key.""" + print("\n--- new_auth_key ---") + resp = requests.get( + f"{API_ROOT}/user/{_test_user_id}/new_auth_key", + headers=LEGACY_HEADERS, + ) + data = resp.json().get("data", {}) + has_key = isinstance(data, dict) and bool(data.get("auth_key")) + print_result("New auth_key generated", resp.status_code == 200 and has_key, + f"HTTP {resp.status_code}") + return data.get("auth_key") if has_key else None + + +def test_new_auth_key_invalid_user(): + """GET /user/{invalid_id}/new_auth_key → 404.""" + resp = requests.get( + f"{API_ROOT}/user/NotARealUserID99/new_auth_key", + headers=LEGACY_HEADERS, + ) + print_result("Invalid user_id rejected (404)", resp.status_code == 404, + f"HTTP {resp.status_code}") + + +# --------------------------------------------------------------------------- +# verify_password +# --------------------------------------------------------------------------- + +def test_verify_password_by_username_correct(): + """POST /user/verify_password — correct password via username → True.""" + print("\n--- verify_password ---") + resp = requests.post( + f"{API_ROOT}/user/verify_password", + json={"username": _test_username, "current_password": NEW_PASSWORD}, + headers=LEGACY_HEADERS, + ) + success = resp.status_code == 200 and resp.json().get("data") is True + print_result("Correct password (username path)", success, + f"HTTP {resp.status_code}") + + +def test_verify_password_by_username_wrong(): + """POST /user/verify_password — wrong password → failure response.""" + resp = requests.post( + f"{API_ROOT}/user/verify_password", + json={"username": _test_username, "current_password": "WrongPassword999!"}, + headers=LEGACY_HEADERS, + ) + # Returns data=False with 200 or a 404 + data = resp.json().get("data") + success = (resp.status_code in [200, 404]) and (data is False or data is None) + print_result("Wrong password rejected", success, + f"HTTP {resp.status_code} data={data}") + + +def test_verify_password_by_user_id(): + """ + POST /user/verify_password — correct password via Vision ID ('id' field). + + The handler reads user_obj.id (User_Base Vision ID field). Send the + Vision ID as 'id' in the request body. + """ + resp = requests.post( + f"{API_ROOT}/user/verify_password", + json={"id": _test_user_id, "current_password": NEW_PASSWORD}, + headers=LEGACY_HEADERS, + ) + data = resp.json().get("data") + success = resp.status_code == 200 and data is True + print_result("Correct password (Vision ID / 'id' path)", success, + f"HTTP {resp.status_code} data={data}") + + +def test_verify_password_missing_fields(): + """POST /user/verify_password — no user_id or username → 400.""" + resp = requests.post( + f"{API_ROOT}/user/verify_password", + json={"current_password": NEW_PASSWORD}, + headers=LEGACY_HEADERS, + ) + print_result("Missing user fields rejected (400)", resp.status_code == 400, + f"HTTP {resp.status_code}") + + +# --------------------------------------------------------------------------- +# lookup, lookup_email, lookup_username +# --------------------------------------------------------------------------- + +def test_lookup_by_account(): + """GET /user/lookup?for_obj_type=account&for_obj_id={account_id} — returns user list.""" + print("\n--- lookup ---") + resp = requests.get( + f"{API_ROOT}/user/lookup", + params={"for_obj_type": "account", "for_obj_id": ACCOUNT_ID}, + headers=LEGACY_HEADERS, + ) + data = resp.json().get("data") + success = resp.status_code == 200 and isinstance(data, list) and len(data) > 0 + print_result("Lookup by account (list)", success, f"HTTP {resp.status_code} count={len(data) if isinstance(data, list) else 'n/a'}") + + # Vision ID check on first result + if success and isinstance(data, list) and data: + has_vision_id = assert_vision_id(data[0], "user_id") + print_result("Vision ID compliance (user_id is string)", has_vision_id, + f"user_id={data[0].get('user_id')!r}") + + +def test_lookup_by_person_invalid(): + """GET /user/lookup?for_obj_type=person&for_obj_id={bad_id} → 404.""" + resp = requests.get( + f"{API_ROOT}/user/lookup", + params={"for_obj_type": "person", "for_obj_id": "NotARealUID999"}, + headers=LEGACY_HEADERS, + ) + print_result("Invalid person ID rejected (404)", resp.status_code == 404, + f"HTTP {resp.status_code}") + + +def test_lookup_bad_obj_type(): + """GET /user/lookup?for_obj_type=invalid → 400.""" + resp = requests.get( + f"{API_ROOT}/user/lookup", + params={"for_obj_type": "invoice", "for_obj_id": ACCOUNT_ID}, + headers=LEGACY_HEADERS, + ) + print_result("Unsupported for_obj_type rejected (400)", resp.status_code == 400, + f"HTTP {resp.status_code}") + + +def test_lookup_email(): + """GET /user/lookup_email?email={email} — finds the test user.""" + print("\n--- lookup_email ---") + resp = requests.get( + f"{API_ROOT}/user/lookup_email", + params={"email": _test_email}, + headers=LEGACY_HEADERS, + ) + data = resp.json().get("data") + found = ( + resp.status_code == 200 + and isinstance(data, dict) + and data.get("email") == _test_email + ) + print_result("Lookup by email (found)", found, f"HTTP {resp.status_code}") + + if found: + has_vision_id = assert_vision_id(data, "user_id") + print_result("Vision ID compliance (user_id is string)", has_vision_id, + f"user_id={data.get('user_id')!r}") + + +def test_lookup_email_not_found(): + """GET /user/lookup_email?email={nonexistent} → 404.""" + resp = requests.get( + f"{API_ROOT}/user/lookup_email", + params={"email": "nobody_at_all@test.invalid"}, + headers=LEGACY_HEADERS, + ) + print_result("Nonexistent email → 404", resp.status_code == 404, + f"HTTP {resp.status_code}") + + +def test_lookup_username(): + """GET /user/lookup_username?username={username} — finds the test user.""" + print("\n--- lookup_username ---") + resp = requests.get( + f"{API_ROOT}/user/lookup_username", + params={"username": _test_username}, + headers=LEGACY_HEADERS, + ) + data = resp.json().get("data") + found = ( + resp.status_code == 200 + and isinstance(data, dict) + and data.get("username") == _test_username + ) + print_result("Lookup by username (found)", found, f"HTTP {resp.status_code}") + + if found: + has_vision_id = assert_vision_id(data, "user_id") + print_result("Vision ID compliance (user_id is string)", has_vision_id, + f"user_id={data.get('user_id')!r}") + + +def test_lookup_username_not_found(): + """GET /user/lookup_username?username={nonexistent} → 404.""" + resp = requests.get( + f"{API_ROOT}/user/lookup_username", + params={"username": "no_such_user_xyz_99999"}, + headers=LEGACY_HEADERS, + ) + print_result("Nonexistent username → 404", resp.status_code == 404, + f"HTTP {resp.status_code}") + + +# --------------------------------------------------------------------------- +# email_auth_key_url +# --------------------------------------------------------------------------- + +def test_email_auth_key_url(): + """ + GET /user/{user_id}/email_auth_key_url — generates auth key and sends email. + + NOTE: The test user email uses '@test.invalid' domain, so actual mail + delivery will fail. This test verifies the route responds correctly; + expect HTTP 500 if the mail server rejects the send. The auth key IS + generated and stored regardless of email success. + """ + print("\n--- email_auth_key_url ---") + resp = requests.get( + f"{API_ROOT}/user/{_test_user_id}/email_auth_key_url", + params={"root_url": "https://dev-app.oneskyit.com"}, + headers=LEGACY_HEADERS, + ) + # 200 = email sent; 500 = route hit but email delivery failed (acceptable for .invalid) + route_hit = resp.status_code in [200, 500] + print_result("Route reachable", route_hit, f"HTTP {resp.status_code}" + + (" (email delivery failed — expected for .invalid domain)" if resp.status_code == 500 else "")) + + +def test_email_auth_key_url_invalid_user(): + """GET /user/{invalid_id}/email_auth_key_url → 404.""" + resp = requests.get( + f"{API_ROOT}/user/NotARealUserID99/email_auth_key_url", + params={"root_url": "https://dev-app.oneskyit.com"}, + headers=LEGACY_HEADERS, + ) + print_result("Invalid user_id rejected (404)", resp.status_code == 404, + f"HTTP {resp.status_code}") + + +# --------------------------------------------------------------------------- +# BUG VERIFICATION: user_authenticate route +# --------------------------------------------------------------------------- + +def test_authenticate(): + """ + GET /user/authenticate — authenticate with username + password. + + Note: The @router.get() decorator was accidentally commented out in a + prior version (user.py line 226). That bug has been fixed. This test + verifies the route is reachable and returns user data on success. + """ + print("\n--- authenticate ---") + resp = requests.get( + f"{API_ROOT}/user/authenticate", + params={"username": _test_username, "password": NEW_PASSWORD}, + headers=LEGACY_HEADERS, + ) + data = resp.json().get("data") + success = resp.status_code == 200 and isinstance(data, dict) and bool(data.get("user_id") or data.get("id")) + print_result("authenticate (username+password)", success, f"HTTP {resp.status_code}") + + if success: + has_vision_id = assert_vision_id(data, "user_id") + print_result("Vision ID compliance (user_id is string)", has_vision_id, + f"user_id={data.get('user_id')!r}") + + # Wrong password should be rejected + resp2 = requests.get( + f"{API_ROOT}/user/authenticate", + params={"username": _test_username, "password": "WrongPassword000!"}, + headers=LEGACY_HEADERS, + ) + print_result("Wrong password rejected", resp2.status_code in [200, 404] and resp2.json().get("data") is not True, + f"HTTP {resp2.status_code}") + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + suite_start = time.time() + print("=" * 60) + print("User Auth Routes E2E Test Suite") + print(f"API: {API_ROOT}") + print("=" * 60) + + # --- Setup --- + print("\n[Setup]") + user_id = setup_test_user() + if not user_id: + print("\n❌ Setup failed — cannot run tests. Aborting.") + sys.exit(1) + + # --- Tests --- + test_change_password() + test_change_password_too_short() + test_change_password_missing_field() + test_change_password_invalid_user() + + test_new_auth_key() + test_new_auth_key_invalid_user() + + test_verify_password_by_username_correct() + test_verify_password_by_username_wrong() + test_verify_password_by_user_id() + test_verify_password_missing_fields() + + test_lookup_by_account() + test_lookup_by_person_invalid() + test_lookup_bad_obj_type() + + test_lookup_email() + test_lookup_email_not_found() + + test_lookup_username() + test_lookup_username_not_found() + + test_email_auth_key_url() + test_email_auth_key_url_invalid_user() + + test_authenticate() + + # --- Teardown --- + print("\n[Teardown]") + teardown_test_user(user_id) + + elapsed = time.time() - suite_start + print(f"\n{'=' * 60}") + print(f"Suite completed in {elapsed:.2f}s") + print("=" * 60)