Implement V3 API security hardening and multi-tenant data isolation

- Enhanced AuthContext with role-aware fields (administrator, manager, super).
- Implemented deferred database lookups for user roles in get_v3_auth_context.
- Added global account isolation in api_crud_v3.py using check_account_access and apply_forced_account_filter.
- Hardened all V3 CRUD endpoints (GET, POST, PATCH, DELETE) and nested routes with ownership verification.
- Enforced forced account filtering at the SQL level for Listing and Searching.
- Updated documentation with details on the new security and data isolation architecture.
This commit is contained in:
Scott Idem
2026-01-07 13:34:38 -05:00
parent 270712f905
commit 6d13b952c4
3 changed files with 157 additions and 5 deletions

View File

@@ -64,6 +64,9 @@ class AuthContext(BaseModel):
account_id_random: Optional[str] = None
user_id: Optional[int] = None
person_id: Optional[int] = None
administrator: bool = False
manager: bool = False
super: bool = False
auth_method: str = 'none' # 'jwt_header', 'jwt_query', 'legacy_header', 'bypass'
# Alias for backward compatibility with initial V3 implementation
@@ -83,8 +86,9 @@ def get_v3_auth_context(
Supports JWT in Authorization header (Bearer) OR 'jwt' query parameter.
Falls back to legacy headers for backward compatibility.
"""
# Defer import to break circular dependency
# Defer imports to break circular dependency
from app.db_sql import redis_lookup_id_random
from app.methods.user_methods import load_user_obj
# 1. Check for JWT (Header preferred, then Query for downloads)
token = None
@@ -101,13 +105,28 @@ def get_v3_auth_context(
payload = decode_jwt(settings.JWT_KEY, token)
if payload:
logger.info(f"JWT Validated ({method}). User: {payload.get('user_id')}, Account: {payload.get('account_id')}")
return AuthContext(
# Initialize AuthContext
ctx = AuthContext(
account_id=payload.get('account_id'),
account_id_random=payload.get('public_key'), # existing sign_jwt uses public_key for id_random
user_id=payload.get('user_id'),
person_id=payload.get('person_id'),
auth_method=method
)
# Populate roles if user_id is present
if ctx.user_id:
try:
user = load_user_obj(user_id=ctx.user_id)
if user:
ctx.administrator = getattr(user, 'administrator', False)
ctx.manager = getattr(user, 'manager', False)
ctx.super = getattr(user, 'super', False)
except Exception as e:
logger.warning(f"Failed to load user roles for user {ctx.user_id}: {e}")
return ctx
else:
logger.warning(f"Invalid or expired JWT provided via {method}")
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired authentication token.")
@@ -129,7 +148,10 @@ def get_v3_auth_context(
logger.info("Authentication bypassed via X-No-Account-ID")
return AuthContext(
account_id_random='--- NO ACCOUNT ---',
auth_method='bypass'
auth_method='bypass',
administrator=True, # Bypass usually implies admin for dev/utility
manager=True,
super=True
)
# 4. No Auth Found