Files
OSIT-AE-API-FastAPI/app/lib_jwt.py
Scott Idem 43ac62b561 feat(auth): consolidate and secure V3 authentication flow
- Re-apply safe guest auth and passcode-to-JWT endpoint
- Consolidate AccountContext with token_payload and role flags
- Restore documentation for new guest flows and public read whitelists
- Fix 403 error in get_obj_li by allowing optional account context
2026-01-20 18:42:43 -05:00

83 lines
2.7 KiB
Python

import logging
import time
from typing import Dict, Optional, Union

import jwt

from app.log import logger_reset
log = logging.getLogger(__name__)
# ### BEGIN ### API Lib JWT ### sign_jwt() ###
# Moved from lib_general.py 2026-01-07
@logger_reset
def sign_jwt(
    secret_key: str,  # Secret/Private/Password used as the HS256 signing key
    ttl: int = 60,  # Default to 60 seconds
    max_renew: int = 0,  # Default to 0
    public_key: Optional[str] = None,  # Embedded in the token; intended for looking up the secret when verifying
    account_id: Optional[str] = None,
    person_id: Optional[str] = None,
    user_id: Optional[str] = None,
    json_str: Optional[str] = None,
    b64_str: Optional[str] = None,
    **kwargs  # Allow arbitrary claims
) -> str:
    """Build and sign an HS256 JWT carrying the given identity claims.

    The payload always contains ``iat`` (issued-at) and ``eat`` (expires-at,
    ``iat + ttl``), both as epoch seconds, plus ``max_renew``, ``public_key``,
    ``account_id``, ``person_id``, ``user_id``, ``json_str`` and ``b64_str``
    (``None`` when not supplied). Any extra keyword arguments are merged in
    last, so they can add claims or override the built-in ones.

    Args:
        secret_key: Shared secret used to sign the token.
        ttl: Lifetime in seconds, added to the issue time to produce ``eat``.
        max_renew: Number of times a renew may be requested without the API
            secret key; stored in the token as-is.
        public_key: Stored in the token as-is.
        account_id: Account identifier claim.
        person_id: Person identifier claim.
        user_id: User identifier claim.
        json_str: Opaque string claim.
        b64_str: Opaque string claim.
        **kwargs: Additional claims merged into the payload.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, CRITICAL
    # Mask the signing secret so it never reaches a log record, even at DEBUG.
    log.debug({**locals(), 'secret_key': '***'})

    # SECURITY CHECK: Ensure we are not signing numeric IDs
    id_claims = (
        ('account_id', account_id),
        ('person_id', person_id),
        ('user_id', user_id),
    )
    for label, val in id_claims:
        if val is None:
            continue
        if isinstance(val, int) or (isinstance(val, str) and val.isdigit()):
            log.critical(f"SECURITY BREACH: Attempted to sign a numeric ID for {label}='{val}'. Only random string IDs allowed.")
            # For now we log and proceed, but in Phase 3 we should raise an Exception
            # raise ValueError(f"Numeric IDs cannot be signed in JWTs.")

    # Read the clock once so that eat - iat is exactly ttl.
    issued_at = time.time()
    payload = {
        'iat': issued_at,  # Issued at
        'eat': issued_at + ttl,  # Expires at
        'max_renew': max_renew,  # Number of times allowed to request renew without API secret key
        'public_key': public_key,  # Use to lookup the secret/private/password key when verifying
        'account_id': account_id,
        'person_id': person_id,
        'user_id': user_id,
        'json_str': json_str,
        'b64_str': b64_str,
    }
    # Merge additional claims (applied last, so they win on key collisions).
    payload.update(kwargs)

    token = jwt.encode(payload, secret_key, algorithm='HS256')
    log.debug(token)
    return token
# ### END ### API Lib JWT ### sign_jwt() ###
# ### BEGIN ### API Lib JWT ### decode_jwt() ###
# Moved from lib_general.py 2026-01-07
@logger_reset
def decode_jwt(
    secret_key: str,
    token: str,
) -> Optional[Union[dict, bool]]:
    """Verify an HS256 JWT and return its payload if it has not expired.

    Expiry is checked here against the custom ``eat`` (expires-at, epoch
    seconds) claim rather than delegated to the JWT library.

    Args:
        secret_key: Shared secret the token is expected to be signed with.
        token: The encoded JWT.

    Returns:
        The decoded payload dict when the signature verifies and ``eat`` is
        not in the past; ``False`` when the token verifies but has expired;
        ``None`` when the token cannot be decoded or has no usable ``eat``
        claim. Callers may rely on the ``False`` / ``None`` distinction.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, CRITICAL
    # Mask the signing secret so it never reaches a log record, even at DEBUG.
    log.debug({**locals(), 'secret_key': '***'})

    try:
        decoded_token = jwt.decode(token, secret_key, algorithms=['HS256'])
        log.debug(decoded_token)
        still_valid = decoded_token['eat'] >= time.time()
    except Exception:
        # Best-effort contract: any decode/validation failure (bad signature,
        # malformed token, missing or non-numeric 'eat') yields None.
        # Catching Exception rather than using a bare except lets
        # KeyboardInterrupt, SystemExit and task cancellation propagate.
        log.debug('JWT could not be decoded or validated', exc_info=True)
        return None

    if still_valid:
        return decoded_token
    return False
# ### END ### API Lib JWT ### decode_jwt() ###