feat(auth): add /v3/action/auth/authenticate_passcode endpoint

Moves passcode→JWT auth into the V3 action namespace. The new router
(api_v3_actions_auth.py) is a clean re-registration of the existing
working logic. The legacy /api/authenticate_passcode is kept but marked
deprecated via DeprecationParams while the frontend transitions.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Scott Idem
2026-06-18 15:48:48 -04:00
parent 0bb65efb7c
commit 50b1e00903
3 changed files with 100 additions and 1 deletions

View File

@@ -0,0 +1,97 @@
from fastapi import APIRouter, Response
from pydantic import BaseModel, Field
import json
from app.lib_general import sign_jwt, log
from app.config import settings
from app.db_sql import sql_select, get_id_random
from app.models.response_models import Resp_Body_Base, mk_resp
router = APIRouter()
ROLE_PRIORITY = ['super', 'manager', 'administrator', 'trusted', 'public', 'authenticated']
ROLE_TTL = {
'super': 8 * 3600,
'manager': 24 * 3600,
'administrator': 48 * 3600,
'trusted': 48 * 3600,
'public': 24 * 3600,
'authenticated': 12 * 3600,
}
class PasscodeAuthRequest(BaseModel):
"""Request model for site-based passcode authentication."""
site_id: str = Field(..., description="Random string ID of the site")
passcode: str = Field(..., min_length=5, description="The passcode to verify")
@router.post('/authenticate_passcode', response_model=Resp_Body_Base)
async def authenticate_passcode(
auth_req: PasscodeAuthRequest,
response: Response = Response,
):
"""
Passcode-to-JWT. Verifies a passcode against site.access_code_kv_json.
Returns a signed JWT with the site's account context, full role flags,
and a per-role TTL. jwt.json_str.auth_type='passcode' distinguishes this
token from a user login JWT.
"""
site_id = auth_req.site_id
passcode = auth_req.passcode
search_data = {'id_random': site_id}
if record := sql_select(table_name='site', data=search_data):
access_codes_raw = record.get('access_code_kv_json')
access_codes = {}
if access_codes_raw:
try:
access_codes = json.loads(access_codes_raw) if isinstance(access_codes_raw, str) else access_codes_raw
except Exception as e:
log.error(f"Failed to parse access_code_kv_json for site {site_id}: {e}")
matched_role = None
for role in ROLE_PRIORITY:
code = access_codes.get(role)
if code and str(code) == str(passcode):
matched_role = role
break
if matched_role:
log.info(f"Auth Success: Verified '{matched_role}' passcode for site {site_id}")
account_id_random = record.get('account_id_random')
if not account_id_random:
if account_id_int := record.get('account_id'):
account_id_random = get_id_random(record_id=account_id_int, table_name='account')
payload = {
'account_id': account_id_random,
'super': (matched_role == 'super'),
'manager': (matched_role == 'manager'),
'administrator': (matched_role == 'administrator'),
'trusted': (matched_role == 'trusted'),
'public': (matched_role == 'public'),
'authenticated': (matched_role == 'authenticated'),
'json_str': json.dumps({
'auth_type': 'passcode',
'site_id': site_id,
'role': matched_role
})
}
token = sign_jwt(
secret_key=settings.JWT_KEY,
ttl=ROLE_TTL[matched_role],
**payload
)
return mk_resp(
data={'jwt': token, 'account_id': account_id_random, 'role': matched_role},
response=response
)
else:
log.warning(f"Auth Failed: Invalid passcode for site {site_id}")
return mk_resp(data=False, status_code=401, response=response, status_message="Invalid passcode.")
else:
log.warning(f"Auth Failed: Site {site_id} not found.")
return mk_resp(data=False, status_code=404, response=response, status_message="Site not found.")