77 lines
2.6 KiB
Python
77 lines
2.6 KiB
Python
import jwt
|
|
import time
|
|
import logging
|
|
from typing import Dict, Optional
|
|
|
|
from app.log import logger_reset
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# ### BEGIN ### API Lib JWT ### sign_jwt() ###
|
|
# Moved from lib_general.py 2026-01-07
|
|
@logger_reset
|
|
def sign_jwt(
|
|
secret_key: str, # Secret/Private/Password
|
|
ttl: int = 60, # Default to 60 seconds
|
|
max_renew: int = 0, # Default to 0
|
|
public_key: str = None, # Will be part of the token. Use to look up secret when verifying.???
|
|
account_id: str = None,
|
|
person_id: str = None,
|
|
user_id: str = None,
|
|
json_str: str = None,
|
|
b64_str: str = None,
|
|
) -> str:
|
|
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
# SECURITY CHECK: Ensure we are not signing numeric IDs
|
|
for label, val in [('account_id', account_id), ('person_id', person_id), ('user_id', user_id)]:
|
|
if val is not None:
|
|
if isinstance(val, int) or (isinstance(val, str) and val.isdigit()):
|
|
log.critical(f"SECURITY BREACH: Attempted to sign a numeric ID for {label}='{val}'. Only random string IDs allowed.")
|
|
# For now we log and proceed, but in Phase 3 we should raise an Exception
|
|
# raise ValueError(f"Numeric IDs cannot be signed in JWTs.")
|
|
|
|
payload = {
|
|
'iat': time.time(), # Issued at
|
|
'eat': time.time() + ttl, # Expires at
|
|
'max_renew': max_renew, # Number of times allowed to request renew without API secret key
|
|
'public_key': public_key, # Use to lookup the secret/private/password key when verifying
|
|
'account_id': account_id,
|
|
'person_id': person_id,
|
|
'user_id': user_id,
|
|
'json_str': json_str,
|
|
'b64_str': b64_str,
|
|
}
|
|
secret = secret_key
|
|
algorithm = 'HS256'
|
|
token = jwt.encode(payload, secret, algorithm=algorithm)
|
|
|
|
log.debug(token)
|
|
|
|
return token
|
|
# ### END ### API Lib JWT ### sign_jwt() ###
|
|
|
|
|
|
# ### BEGIN ### API Lib JWT ### decode_jwt() ###
|
|
# Moved from lib_general.py 2026-01-07
|
|
@logger_reset
|
|
def decode_jwt(
|
|
secret_key: str,
|
|
token: str,
|
|
) -> Optional[dict]:
|
|
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
secret = secret_key
|
|
algorithm = 'HS256'
|
|
|
|
try:
|
|
decoded_token = jwt.decode(token, secret, algorithms=[algorithm])
|
|
log.debug(decoded_token)
|
|
if decoded_token['eat'] >= time.time(): return decoded_token
|
|
else: return False
|
|
except:
|
|
return None
|
|
# ### END ### API Lib JWT ### decode_jwt() ###
|