feat(auth): implement site-based passcode-to-JWT endpoint

- Add POST /api/authenticate_passcode to verify site access codes
- Refactor sign_jwt to support arbitrary role flags (super, admin, etc.)
- Update dependencies_v3 to extract role flags from JWT payloads
- Add E2E test for passcode auth verification
This commit is contained in:
Scott Idem
2026-01-20 17:51:54 -05:00
parent e16fbaa34b
commit d4e46a4a97
4 changed files with 183 additions and 3 deletions

View File

@@ -20,6 +20,7 @@ def sign_jwt(
user_id: str = None,
json_str: str = None,
b64_str: str = None,
**kwargs # Allow arbitrary claims (e.g. administrator, manager, super)
) -> str:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
@@ -43,6 +44,11 @@ def sign_jwt(
'json_str': json_str,
'b64_str': b64_str,
}
# Merge any additional claims provided via kwargs
if kwargs:
payload.update(kwargs)
secret = secret_key
algorithm = 'HS256'
token = jwt.encode(payload, secret, algorithm=algorithm)

View File

@@ -2,6 +2,7 @@ from fastapi import APIRouter, Body, Depends, Header, HTTPException, Path, Query
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from sqlalchemy import text
import json
from app.db_connection import db
from app.lib_general import log, logging, sign_jwt, decode_jwt, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min
@@ -17,6 +18,90 @@ from app.models.response_models import Resp_Body_Base, mk_resp
router = APIRouter()
class PasscodeAuthRequest(BaseModel):
"""Request model for site-based passcode authentication."""
site_id: str = Field(..., description="The random string ID of the site")
passcode: str = Field(..., description="The passcode to verify")
@router.post('/authenticate_passcode', response_model=Resp_Body_Base)
async def authenticate_passcode(
auth_req: PasscodeAuthRequest,
response: Response = Response,
):
"""
Passcode-to-JWT Endpoint.
Verifies a passcode against site.access_code_kv_json.
Returns a signed JWT with the site's account context and role flags.
"""
from app.db_sql import get_id_random
log.setLevel(logging.DEBUG)
log.debug(locals())
site_id = auth_req.site_id
passcode = auth_req.passcode
# 1. Look up the site record
search_data = {'id_random': site_id}
if record := sql_select(table_name='site', data=search_data):
log.debug(f"Record found for site {site_id}")
# 2. Parse access codes
access_codes_raw = record.get('access_code_kv_json')
log.debug(f"Access Codes Raw: {access_codes_raw}")
access_codes = {}
if access_codes_raw:
try:
access_codes = json.loads(access_codes_raw) if isinstance(access_codes_raw, str) else access_codes_raw
except Exception as e:
log.error(f"Failed to parse access_code_kv_json for site {site_id}: {e}")
# 3. Verify Passcode and Resolve Role
matched_role = None
for role, code in access_codes.items():
if str(code) == str(passcode):
matched_role = role
break
if matched_role:
log.info(f"Auth Success: Verified '{matched_role}' passcode for site {site_id}")
# 4. Resolve Account Context (Ensure it is the random string ID)
account_id_random = record.get('account_id_random')
if not account_id_random:
# If the record came from the physical table, it only has account_id (int)
if account_id_int := record.get('account_id'):
account_id_random = get_id_random(record_id=account_id_int, table_name='account')
# 5. Mint JWT
payload = {
'account_id': account_id_random,
'administrator': (matched_role == 'administrator'),
'manager': (matched_role == 'manager'),
'super': (matched_role == 'super'),
'json_str': json.dumps({
'auth_type': 'passcode',
'site_id': site_id,
'role': matched_role
})
}
token = sign_jwt(
secret_key=settings.JWT_KEY,
ttl=3600 * 24, # 24 hour session
**payload
)
return mk_resp(data={'jwt': token, 'account_id': account_id_random, 'role': matched_role}, response=response)
else:
log.warning(f"Auth Failed: Invalid passcode for site {site_id}. Provided: {passcode}")
return mk_resp(data=False, status_code=401, response=response, status_message="Invalid passcode.")
else:
log.warning(f"Auth Failed: Site {site_id} not found.")
return mk_resp(data=False, status_code=404, response=response, status_message="Site not found.")
# ### BEGIN ### API API ### request_jwt() ###
# This can be used to generate JWTs for various purposes:
# * for end client browser API access

View File

@@ -89,13 +89,22 @@ def get_account_context_optional(
resolved_account_id_random = '--- NO ACCOUNT ---'
auth_method = 'bypass'
is_admin = (auth_method == 'bypass')
is_manager = (auth_method == 'bypass')
is_super = (auth_method == 'bypass')
if resolved_token_payload:
if resolved_token_payload.get('administrator'): is_admin = True
if resolved_token_payload.get('manager'): is_manager = True
if resolved_token_payload.get('super'): is_super = True
return AccountContext(
account_id=resolved_account_id,
account_id_random=resolved_account_id_random,
auth_method=auth_method,
administrator=(auth_method == 'bypass'),
manager=(auth_method == 'bypass'),
super=(auth_method == 'bypass'),
administrator=is_admin,
manager=is_manager,
super=is_super,
token_payload=resolved_token_payload
)

View File

@@ -0,0 +1,80 @@
import requests
import json
import jwt
import sys
# Configuration
BASE_URL = "https://dev-api.oneskyit.com"
SITE_ID = "ltOdfNtjZLo" # Found from DB
VALID_PASSCODE = "10241024" # 'super' role for this site
INVALID_PASSCODE = "wrong-code-123"
def test_passcode_authentication():
print(f"\n--- Testing Passcode Authentication for Site: {SITE_ID} ---")
url = f"{BASE_URL}/api/authenticate_passcode"
payload = {
"site_id": SITE_ID,
"passcode": VALID_PASSCODE
}
try:
# 1. Test Valid Auth
print(f"[1] Requesting JWT with VALID passcode...")
response = requests.post(url, json=payload)
print(f"Status: {response.status_code}")
if response.status_code != 200:
print(f"❌ Auth Failed! Response: {response.text}")
return None
data = response.json()
token = data.get('data', {}).get('jwt')
role = data.get('data', {}).get('role')
if not token:
print("❌ No token in response.")
return None
print(f"✅ Success! Token received for role: '{role}'")
# 2. Inspect JWT Payload
print("\n[2] Inspecting JWT Payload (Unverified)...")
decoded = jwt.decode(token, options={"verify_signature": False})
print(f"Payload: {json.dumps(decoded, indent=2)}")
# Check for role flags
if decoded.get('super') is True:
print("✅ SUCCESS: 'super' flag is correctly set in JWT.")
else:
print("❌ FAILURE: 'super' flag missing or False in JWT.")
sys.exit(1)
# 3. Test Invalid Auth
print("\n[3] Requesting JWT with INVALID passcode...")
payload_bad = {
"site_id": SITE_ID,
"passcode": INVALID_PASSCODE
}
resp_bad = requests.post(url, json=payload_bad)
print(f"Status: {resp_bad.status_code}")
if resp_bad.status_code == 401:
print("✅ SUCCESS: Invalid passcode correctly rejected (401).")
else:
print(f"❌ FAILURE: Unexpected status for bad passcode: {resp_bad.status_code}")
sys.exit(1)
return token
except Exception as e:
print(f"❌ Error during test: {e}")
return None
if __name__ == "__main__":
token = test_passcode_authentication()
if token:
print("\n🎉 Passcode Authentication E2E Test Passed!")
else:
print("\n❌ Test FAILED.")
sys.exit(1)