Saving things while they work again!!! Still working on adding a special exception or something for site domain search.

This commit is contained in:
Scott Idem
2026-01-07 16:25:18 -05:00
parent cf96d93246
commit caf2868d02
5 changed files with 489 additions and 786 deletions

View File

@@ -7,7 +7,6 @@ that are relevant to the v3 API, while removing unused or outdated functionaliti
# Standard library imports
import time
import logging
import jwt
from typing import (
Any,
Dict,
@@ -31,146 +30,70 @@ from pydantic import (
BaseModel,
Field,
ValidationError,
computed_field,
model_validator,
)
# Internal imports (from this project)
from app.config import settings
from app.db_sql import redis_lookup_id_random
from app.log import get_logger
logger = logging.getLogger(__name__)
logger = get_logger(__name__)
def decode_jwt(
secret_key: str,
token: str,
) -> dict:
"""
Decodes and validates a JWT token.
Ported from lib_general.py to break circular dependencies.
"""
algorithm = 'HS256'
try:
decoded_token = jwt.decode(token, secret_key, algorithms=[algorithm])
if decoded_token['eat'] >= time.time():
return decoded_token
else:
return False
except Exception:
return None
# --- Pydantic Model for Account Context ---
class AccountContext(BaseModel):
account_id: Optional[int]
account_id_random: Optional[str]
# --- Pydantic Model for Authentication Context ---
class AuthContext(BaseModel):
account_id: Optional[int] = None
account_id_random: Optional[str] = None
user_id: Optional[int] = None
person_id: Optional[int] = None
administrator: bool = False
manager: bool = False
super: bool = False
auth_method: str = 'none' # 'jwt_header', 'jwt_query', 'legacy_header', 'bypass'
# Alias for backward compatibility with initial V3 implementation
AccountContext = AuthContext
# --- Dependency Function for V3 Authentication ---
def get_v3_auth_context(
request: Request,
authorization: Optional[str] = Header(None, description="Bearer <jwt_token>"),
jwt_query: Optional[str] = Query(None, alias="jwt", description="JWT token for URL-based auth (e.g., file downloads)"),
x_account_id: Optional[str] = Header(None, min_length=11, max_length=22, description="Legacy X-Account-ID header"),
x_no_account_id: Optional[str] = Header(None, min_length=3, max_length=100, description="Bypass account context header"),
) -> AuthContext:
"""
Standardized V3 Authentication Dependency.
Supports JWT in Authorization header (Bearer) OR 'jwt' query parameter.
Falls back to legacy headers for backward compatibility.
"""
# Defer imports to break circular dependency
from app.db_sql import redis_lookup_id_random
from app.methods.user_methods import load_user_obj
# 1. Check for JWT (Header preferred, then Query for downloads)
token = None
method = 'none'
if authorization and authorization.startswith("Bearer "):
token = authorization.split(" ")[1]
method = 'jwt_header'
elif jwt_query:
token = jwt_query
method = 'jwt_query'
if token:
payload = decode_jwt(settings.JWT_KEY, token)
if payload:
logger.info(f"JWT Validated ({method}). User: {payload.get('user_id')}, Account: {payload.get('account_id')}")
# Initialize AuthContext
ctx = AuthContext(
account_id=payload.get('account_id'),
account_id_random=payload.get('public_key'), # existing sign_jwt uses public_key for id_random
user_id=payload.get('user_id'),
person_id=payload.get('person_id'),
auth_method=method
)
# Populate roles if user_id is present
if ctx.user_id:
try:
user = load_user_obj(user_id=ctx.user_id)
if user:
ctx.administrator = getattr(user, 'administrator', False)
ctx.manager = getattr(user, 'manager', False)
ctx.super = getattr(user, 'super', False)
except Exception as e:
logger.warning(f"Failed to load user roles for user {ctx.user_id}: {e}")
return ctx
else:
logger.warning(f"Invalid or expired JWT provided via {method}")
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired authentication token.")
# 2. Legacy / Testing Fallback: x_account_id
if x_account_id:
if looked_up_id := redis_lookup_id_random(table_name='account', record_id_random=x_account_id):
logger.info(f"Authenticated via legacy header: {looked_up_id}")
return AuthContext(
account_id=looked_up_id,
account_id_random=x_account_id,
auth_method='legacy_header'
)
else:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid X-Account-ID header.")
# 3. Bypass Fallback
if x_no_account_id:
logger.info("Authentication bypassed via X-No-Account-ID")
return AuthContext(
account_id_random='--- NO ACCOUNT ---',
auth_method='bypass',
administrator=True, # Bypass usually implies admin for dev/utility
manager=True,
super=True
)
# 4. No Auth Found
logger.warning("No authentication provided for V3 endpoint.")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Authentication required. Provide Authorization header or 'jwt' query parameter."
)
# --- Legacy wrapper to avoid breaking current V3 code ---
# --- Dependency Function for Account Context ---
def get_account_context(
auth: AuthContext = Depends(get_v3_auth_context)
) -> AuthContext:
x_account_id: Optional[str] = Header(None, min_length=11, max_length=22),
x_no_account_id: Optional[str] = Header(None, min_length=3, max_length=100), # Assuming 'bypass' or similar string
x_no_account_id_token: Optional[str] = Query(None, min_length=11, max_length=22),
) -> AccountContext:
"""
Alias for the new auth dependency to maintain compatibility
with existing V3 routes.
Resolves the account context from headers/query parameters with defined precedence.
Precedence: x_account_id (header) > x_no_account_id_token (query) > x_no_account_id (header flag)
Raises HTTPException 403 if no valid account is found and no bypass is indicated.
"""
return auth
logger.setLevel(logging.DEBUG) # Adjust as needed
logger.debug(locals())
resolved_account_id = None
resolved_account_id_random = None
if x_account_id:
# Primary check: x_account_id header
resolved_account_id_random = x_account_id
if looked_up_id := redis_lookup_id_random(table_name='account', record_id_random=x_account_id):
resolved_account_id = looked_up_id
logger.info(f'Found account from x_account_id header: {resolved_account_id}')
else:
logger.warning(f'Invalid x_account_id header provided: {x_account_id}')
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Invalid X-Account-ID header.')
elif x_no_account_id_token:
# Secondary check: x_no_account_id_token query parameter
resolved_account_id_random = x_no_account_id_token
if looked_up_id := redis_lookup_id_random(table_name='account', record_id_random=x_no_account_id_token):
resolved_account_id = looked_up_id
logger.info(f'Found account from x_no_account_id_token query: {resolved_account_id}')
else:
logger.warning(f'Invalid x_no_account_id_token query provided: {x_no_account_id_token}')
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Invalid X-No-Account-ID-Token query parameter.')
elif x_no_account_id:
# Tertiary check: x_no_account_id header for bypass
# For now, just presence indicates bypass. Can add a specific value check later if needed.
logger.info(f'X-No-Account-ID header found: {x_no_account_id}. Proceeding without specific account context.')
resolved_account_id = None # Explicitly None for "no specific account"
resolved_account_id_random = '--- NO ACCOUNT ---'
else:
logger.warning('No valid account context provided via X-Account-ID, X-No-Account-ID-Token, or X-No-Account-ID.')
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Account context required. Please provide X-Account-ID, X-No-Account-ID-Token, or X-No-Account-ID.')
return AccountContext(account_id=resolved_account_id, account_id_random=resolved_account_id_random)
# --- Pydantic Model for Pagination ---

View File

@@ -123,12 +123,12 @@ app.include_router(
)
# Deferred import to avoid circular dependencies and ensure environment is ready
from app.routers import agent_bridge
app.include_router(
agent_bridge.router,
prefix='/agent',
tags=['Agent Bridge'],
)
# from app.routers import agent_bridge
# app.include_router(
# agent_bridge.router,
# prefix='/agent',
# tags=['Agent Bridge'],
# )
app.include_router(
api.router,

File diff suppressed because it is too large Load Diff