From d4e46a4a97adc10cefb114d796e629ed1a9e77ba Mon Sep 17 00:00:00 2001 From: Scott Idem Date: Tue, 20 Jan 2026 17:51:54 -0500 Subject: [PATCH] feat(auth): implement site-based passcode-to-JWT endpoint - Add POST /api/authenticate_passcode to verify site access codes - Refactor sign_jwt to support arbitrary role flags (super, admin, etc.) - Update dependencies_v3 to extract role flags from JWT payloads - Add E2E test for passcode auth verification --- app/lib_jwt.py | 6 ++ app/routers/api.py | 85 +++++++++++++++++++++++++++++ app/routers/dependencies_v3.py | 15 ++++- tests/e2e/test_e2e_passcode_auth.py | 80 +++++++++++++++++++++++++++ 4 files changed, 183 insertions(+), 3 deletions(-) create mode 100644 tests/e2e/test_e2e_passcode_auth.py diff --git a/app/lib_jwt.py b/app/lib_jwt.py index 0759945..4c83396 100644 --- a/app/lib_jwt.py +++ b/app/lib_jwt.py @@ -20,6 +20,7 @@ def sign_jwt( user_id: str = None, json_str: str = None, b64_str: str = None, + **kwargs # Allow arbitrary claims (e.g. administrator, manager, super) ) -> str: log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) @@ -43,6 +44,11 @@ def sign_jwt( 'json_str': json_str, 'b64_str': b64_str, } + + # Merge any additional claims provided via kwargs + if kwargs: + payload.update(kwargs) + secret = secret_key algorithm = 'HS256' token = jwt.encode(payload, secret, algorithm=algorithm) diff --git a/app/routers/api.py b/app/routers/api.py index d5636a7..7fa2645 100644 --- a/app/routers/api.py +++ b/app/routers/api.py @@ -2,6 +2,7 @@ from fastapi import APIRouter, Body, Depends, Header, HTTPException, Path, Query from pydantic import BaseModel, EmailStr, Field from typing import Dict, List, Optional, Set, Union from sqlalchemy import text +import json from app.db_connection import db from app.lib_general import log, logging, sign_jwt, decode_jwt, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min @@ -17,6 +18,90 @@ from app.models.response_models import Resp_Body_Base, mk_resp router = APIRouter() +class PasscodeAuthRequest(BaseModel): + """Request model for site-based passcode authentication.""" + site_id: str = Field(..., description="The random string ID of the site") + passcode: str = Field(..., description="The passcode to verify") + + +@router.post('/authenticate_passcode', response_model=Resp_Body_Base) +async def authenticate_passcode( + auth_req: PasscodeAuthRequest, + response: Response = Response, + ): + """ + Passcode-to-JWT Endpoint. + Verifies a passcode against site.access_code_kv_json. + Returns a signed JWT with the site's account context and role flags. + """ + from app.db_sql import get_id_random + log.setLevel(logging.DEBUG) + log.debug(locals()) + + site_id = auth_req.site_id + passcode = auth_req.passcode + + # 1. Look up the site record + search_data = {'id_random': site_id} + if record := sql_select(table_name='site', data=search_data): + log.debug(f"Record found for site {site_id}") + + # 2. Parse access codes + access_codes_raw = record.get('access_code_kv_json') + log.debug(f"Access Codes Raw: {access_codes_raw}") + + access_codes = {} + if access_codes_raw: + try: + access_codes = json.loads(access_codes_raw) if isinstance(access_codes_raw, str) else access_codes_raw + except Exception as e: + log.error(f"Failed to parse access_code_kv_json for site {site_id}: {e}") + + # 3. Verify Passcode and Resolve Role + matched_role = None + for role, code in access_codes.items(): + if str(code) == str(passcode): + matched_role = role + break + + if matched_role: + log.info(f"Auth Success: Verified '{matched_role}' passcode for site {site_id}") + + # 4. Resolve Account Context (Ensure it is the random string ID) + account_id_random = record.get('account_id_random') + if not account_id_random: + # If the record came from the physical table, it only has account_id (int) + if account_id_int := record.get('account_id'): + account_id_random = get_id_random(record_id=account_id_int, table_name='account') + + # 5. Mint JWT + payload = { + 'account_id': account_id_random, + 'administrator': (matched_role == 'administrator'), + 'manager': (matched_role == 'manager'), + 'super': (matched_role == 'super'), + 'json_str': json.dumps({ + 'auth_type': 'passcode', + 'site_id': site_id, + 'role': matched_role + }) + } + + token = sign_jwt( + secret_key=settings.JWT_KEY, + ttl=3600 * 24, # 24 hour session + **payload + ) + + return mk_resp(data={'jwt': token, 'account_id': account_id_random, 'role': matched_role}, response=response) + else: + log.warning(f"Auth Failed: Invalid passcode for site {site_id}. Provided: {passcode}") + return mk_resp(data=False, status_code=401, response=response, status_message="Invalid passcode.") + else: + log.warning(f"Auth Failed: Site {site_id} not found.") + return mk_resp(data=False, status_code=404, response=response, status_message="Site not found.") + + # ### BEGIN ### API API ### request_jwt() ### # This can be used to generate JWTs for various purposes: # * for end client browser API access diff --git a/app/routers/dependencies_v3.py b/app/routers/dependencies_v3.py index 19c7470..90de6bc 100644 --- a/app/routers/dependencies_v3.py +++ b/app/routers/dependencies_v3.py @@ -89,13 +89,22 @@ def get_account_context_optional( resolved_account_id_random = '--- NO ACCOUNT ---' auth_method = 'bypass' + is_admin = (auth_method == 'bypass') + is_manager = (auth_method == 'bypass') + is_super = (auth_method == 'bypass') + + if resolved_token_payload: + if resolved_token_payload.get('administrator'): is_admin = True + if resolved_token_payload.get('manager'): is_manager = True + if resolved_token_payload.get('super'): is_super = True + return AccountContext( account_id=resolved_account_id, account_id_random=resolved_account_id_random, auth_method=auth_method, - administrator=(auth_method == 'bypass'), - manager=(auth_method == 'bypass'), - super=(auth_method == 'bypass'), + administrator=is_admin, + manager=is_manager, + super=is_super, token_payload=resolved_token_payload ) diff --git a/tests/e2e/test_e2e_passcode_auth.py b/tests/e2e/test_e2e_passcode_auth.py new file mode 100644 index 0000000..8d9b255 --- /dev/null +++ b/tests/e2e/test_e2e_passcode_auth.py @@ -0,0 +1,80 @@ +import requests +import json +import jwt +import sys + +# Configuration +BASE_URL = "https://dev-api.oneskyit.com" +SITE_ID = "ltOdfNtjZLo" # Found from DB +VALID_PASSCODE = "10241024" # 'super' role for this site +INVALID_PASSCODE = "wrong-code-123" + +def test_passcode_authentication(): + print(f"\n--- Testing Passcode Authentication for Site: {SITE_ID} ---") + + url = f"{BASE_URL}/api/authenticate_passcode" + payload = { + "site_id": SITE_ID, + "passcode": VALID_PASSCODE + } + + try: + # 1. Test Valid Auth + print(f"[1] Requesting JWT with VALID passcode...") + response = requests.post(url, json=payload) + print(f"Status: {response.status_code}") + + if response.status_code != 200: + print(f"❌ Auth Failed! Response: {response.text}") + return None + + data = response.json() + token = data.get('data', {}).get('jwt') + role = data.get('data', {}).get('role') + + if not token: + print("❌ No token in response.") + return None + + print(f"✅ Success! Token received for role: '{role}'") + + # 2. Inspect JWT Payload + print("\n[2] Inspecting JWT Payload (Unverified)...") + decoded = jwt.decode(token, options={"verify_signature": False}) + print(f"Payload: {json.dumps(decoded, indent=2)}") + + # Check for role flags + if decoded.get('super') is True: + print("✅ SUCCESS: 'super' flag is correctly set in JWT.") + else: + print("❌ FAILURE: 'super' flag missing or False in JWT.") + sys.exit(1) + + # 3. Test Invalid Auth + print("\n[3] Requesting JWT with INVALID passcode...") + payload_bad = { + "site_id": SITE_ID, + "passcode": INVALID_PASSCODE + } + resp_bad = requests.post(url, json=payload_bad) + print(f"Status: {resp_bad.status_code}") + + if resp_bad.status_code == 401: + print("✅ SUCCESS: Invalid passcode correctly rejected (401).") + else: + print(f"❌ FAILURE: Unexpected status for bad passcode: {resp_bad.status_code}") + sys.exit(1) + + return token + + except Exception as e: + print(f"❌ Error during test: {e}") + return None + +if __name__ == "__main__": + token = test_passcode_authentication() + if token: + print("\n🎉 Passcode Authentication E2E Test Passed!") + else: + print("\n❌ Test FAILED.") + sys.exit(1)