Files
OSIT-AE-API-FastAPI/app/routers/api_v3_actions_user.py
Scott Idem b590bc09a0 fix: require root_url on email_auth_key_url; correct frontend guide for user auth endpoints
- Make root_url a required query param on GET /v3/action/user/{id}/email_auth_key_url
  (previously Optional[str]=None, which produced a malformed link in the emailed URL)
- Update GUIDE__AE_API_V3_for_Frontend.md: document root_url as required, add magic link
  URL format, note valid_email=True side effect, add 404 error, expand 403 conditions
  for authenticate, add 400 for verify_password when no password is set
- Add test_e2e_v3_user_action_routes.py and test_e2e_v3_user_auth_routes.py to tests/README.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-25 12:34:49 -04:00

300 lines
13 KiB
Python

"""
Aether API V3 - User Action Router
------------------------------------
Handles secure, stateful user account operations that are not standard CRUD.
Routes:
POST /authenticate — username+password or user_id+auth_key (body, not query params)
POST /verify_password — verify a user's current password without changing it
POST /{user_id}/change_password — change password (with optional current-password verification)
GET /{user_id}/new_auth_key — generate a new one-time login auth key
GET /{user_id}/email_auth_key_url — email a one-time login link to the user
Security improvements over legacy /user/* routes:
- Credentials are in the POST body, never in query params (no URL logging exposure).
- Uses V3 AccountContext (x-aether-api-key mandatory).
- HTTPException for all error paths (native FastAPI status codes).
"""
from fastapi import APIRouter, Body, Depends, HTTPException, Path, Query, status
import datetime
import logging
from typing import Optional
from pydantic import BaseModel, Field
from app.db_sql import redis_lookup_id_random, sql_select, sql_update
from app.lib_general import secure_hash_string, verify_secure_hash_string
from app.lib_general_v3 import AccountContext, get_account_context
from app.methods.user_methods import email_user_auth_key_url, load_user_obj
from app.models.common_field_schema import default_num_bytes
from app.models.response_models import Resp_Body_Base, mk_resp
log = logging.getLogger(__name__)
router = APIRouter()
# --- Request Body Models ---
class ChangePasswordRequest(BaseModel):
new_password: str = Field(..., min_length=10, max_length=100)
current_password: Optional[str] = Field(None, description="If provided, verified before applying the change.")
class AuthenticateRequest(BaseModel):
"""Provide either username+password or user_id+auth_key."""
username: Optional[str] = Field(None, min_length=3, max_length=50)
password: Optional[str] = Field(None, min_length=8, max_length=100)
user_id: Optional[str] = Field(None, min_length=11, max_length=22, description="Vision ID (id_random) of the user.")
auth_key: Optional[str] = Field(None, min_length=11, max_length=22)
valid_email: Optional[bool] = Field(None, description="If True, marks email_verified=True on successful auth.")
class VerifyPasswordRequest(BaseModel):
"""Provide user_id (Vision ID) or username, plus the password to verify."""
current_password: str = Field(..., min_length=1, max_length=100)
user_id: Optional[str] = Field(None, min_length=11, max_length=22)
username: Optional[str] = Field(None, min_length=2, max_length=50)
# --- Internal Helper ---
def _check_user_enabled(rec: dict) -> Optional[str]:
"""
Returns an error message string if the user account is not currently active, None if OK.
Checks: enable flag, enable_from, enable_to (all treated as UTC).
"""
if not rec.get('enable'):
return 'This user account is not enabled.'
now = datetime.datetime.now(datetime.timezone.utc)
if enable_from := rec.get('enable_from'):
ef = enable_from.replace(tzinfo=datetime.timezone.utc)
if ef > now:
return f'This user account is not yet enabled (active from: {ef}).'
if enable_to := rec.get('enable_to'):
et = enable_to.replace(tzinfo=datetime.timezone.utc)
if et < now:
return f'This user account has expired (expired: {et}).'
return None
# --- Routes ---
@router.post('/authenticate', response_model=Resp_Body_Base)
async def action_authenticate(
body: AuthenticateRequest = Body(...),
inc_user_role_list: bool = Query(False),
account: AccountContext = Depends(get_account_context),
):
"""
Authenticate a user by username+password or user_id+auth_key.
- Credentials are in the POST body (not query params) — safe from URL logging.
- Auth key is one-time-use: cleared on successful authentication.
- On success: stamps logged_in_on, returns the full user object.
- Provide x-account-id to scope username lookups to the correct account.
"""
account_id = account.account_id
if body.username and body.password:
sql = """
SELECT id AS user_id, id_random AS user_id_random, password,
enable, enable_from, enable_to
FROM `user`
WHERE account_id = :account_id AND username = :username
LIMIT 1
"""
rec = sql_select(sql=sql, data={'account_id': account_id, 'username': body.username})
if not rec:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail='User not found for this account and username.')
if not rec.get('password'):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail='No password is set for this user.')
if not verify_secure_hash_string(string=body.password, string_hash=rec['password']):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail='Password did not match.')
if err := _check_user_enabled(rec):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=err)
db_user_id = rec['user_id']
update_data = {'id': db_user_id, 'logged_in_on': datetime.datetime.utcnow()}
if body.valid_email:
update_data['email_verified'] = True
sql_update(table_name='user', data=update_data)
elif body.user_id and body.auth_key:
sql = """
SELECT id AS user_id, id_random AS user_id_random, password,
enable, enable_from, enable_to
FROM `user`
WHERE id_random = :user_id_random
AND auth_key = :auth_key
AND allow_auth_key = 1
LIMIT 1
"""
rec = sql_select(sql=sql, data={'user_id_random': body.user_id, 'auth_key': body.auth_key})
if not rec:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail='User + auth key combination not found.')
if err := _check_user_enabled(rec):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=err)
db_user_id = rec['user_id']
# Auth key is one-time-use — clear it immediately.
update_data = {'id': db_user_id, 'auth_key': None, 'logged_in_on': datetime.datetime.utcnow()}
if body.valid_email:
update_data['email_verified'] = True
sql_update(table_name='user', data=update_data)
else:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail='Provide either username+password or user_id+auth_key.')
user_obj = load_user_obj(user_id=db_user_id, inc_user_role_list=inc_user_role_list)
if not user_obj:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='Authentication succeeded but user record could not be loaded.')
return mk_resp(data=user_obj.dict(by_alias=True), status_message='Authentication successful.')
@router.post('/verify_password', response_model=Resp_Body_Base)
async def action_verify_password(
body: VerifyPasswordRequest = Body(...),
account: AccountContext = Depends(get_account_context),
):
"""
Verify a user's current password without changing it.
Provide user_id (Vision ID) or username + current_password.
Returns data=True on match, 403 on mismatch.
"""
account_id = account.account_id
if body.user_id:
sql = """
SELECT id AS user_id, username, password
FROM `user`
WHERE id_random = :user_id_random
LIMIT 1
"""
rec = sql_select(sql=sql, data={'user_id_random': body.user_id})
elif body.username:
sql = """
SELECT id AS user_id, username, password
FROM `user`
WHERE account_id = :account_id AND username = :username
LIMIT 1
"""
rec = sql_select(sql=sql, data={'account_id': account_id, 'username': body.username})
else:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail='Provide user_id or username.')
if not rec:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='User not found.')
if not rec.get('password'):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail='No password is set for this user.')
if not verify_secure_hash_string(string=body.current_password, string_hash=rec['password']):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Password did not match.')
return mk_resp(data=True, status_message='Password verified.')
@router.post('/{user_id}/change_password', response_model=Resp_Body_Base)
async def action_change_password(
user_id: str = Path(min_length=11, max_length=22),
body: ChangePasswordRequest = Body(...),
account: AccountContext = Depends(get_account_context),
):
"""
Change a user's password.
- new_password is required (min 10 chars).
- If current_password is provided, it is verified before the change is applied.
- Stamps password_set_on on success.
"""
db_user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
if not db_user_id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='User not found.')
if body.current_password:
sql = "SELECT password FROM `user` WHERE id = :uid LIMIT 1"
rec = sql_select(sql=sql, data={'uid': db_user_id})
if not rec or not rec.get('password'):
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail='User not found or password not set.')
if not verify_secure_hash_string(string=body.current_password, string_hash=rec['password']):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail='Current password is incorrect.')
update_data = {
'id': db_user_id,
'password': secure_hash_string(string=body.new_password),
'password_set_on': datetime.datetime.utcnow(),
}
if not sql_update(table_name='user', data=update_data):
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='Password update failed.')
return mk_resp(data=True, status_message='Password changed successfully.')
@router.get('/{user_id}/new_auth_key', response_model=Resp_Body_Base)
async def action_new_auth_key(
user_id: str = Path(min_length=11, max_length=22),
account: AccountContext = Depends(get_account_context),
):
"""
Generate a new one-time-use auth key for the user.
The key is written to the DB and returned in the response body.
The user record must have allow_auth_key=1 for the key to be usable
with the /authenticate endpoint.
"""
import secrets
db_user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
if not db_user_id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='User not found.')
new_key = secrets.token_urlsafe(default_num_bytes)
update_data = {'id': db_user_id, 'auth_key': new_key}
if not sql_update(table_name='user', data=update_data):
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='Failed to write auth key.')
return mk_resp(data={'auth_key': new_key}, status_message='New auth key generated.')
@router.get('/{user_id}/email_auth_key_url', response_model=Resp_Body_Base)
async def action_email_auth_key_url(
user_id: str = Path(min_length=11, max_length=22),
root_url: str = Query(..., min_length=10, max_length=200),
key_param_name: str = Query('auth_key', min_length=2, max_length=30),
account: AccountContext = Depends(get_account_context),
):
"""
Generate a new auth key and email a one-time login URL to the user.
root_url is the base URL the login link will be built from.
key_param_name controls the query param name used for the auth key in the link (default: auth_key).
Returns data=True on success (email sent), 500 if delivery failed.
"""
db_user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
if not db_user_id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='User not found.')
result = email_user_auth_key_url(
account_id=account.account_id,
user_id=db_user_id,
root_url=root_url,
key_param_name=key_param_name,
)
if not result:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='Auth key email could not be sent. Check account email config and user enable status.')
return mk_resp(data=True, status_message='Auth key email sent.')