- New /settings/tools page: max_risk selector (low/medium/high) + per-tool override dropdowns (Default / Force include / Force exclude) for all 58 tools grouped by category with color-coded risk badges; JS updates Auto status live - get_tools_for_role() + get_openai_tools_for_role() now accept max_risk, whitelist, blacklist; _apply_risk_policy() handles the filtering logic - get_risk_policy() helper in auth_utils reads from tool_policy.json - Risk policy wired through orchestrator.py, openai_orchestrator.py, orchestrator_engine.py, nextcloud_talk.py, homeassistant.py - Tools nav link added to settings.html and notifications.html - CLAUDE.md and ARCH__SYSTEM.md updated: tool count 50→58, risk system docs, tool access control three-layer model documented Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
261 lines
8.7 KiB
Python
261 lines
8.7 KiB
Python
"""
|
|
Authentication utilities — password hashing and JWT session tokens.
|
|
|
|
Passwords are stored as bcrypt hashes in home/{username}/auth.json.
|
|
Sessions are JWT cookies signed with JWT_SECRET from settings.
|
|
|
|
Usage:
|
|
set_password("scott", "mypassword") # admin setup
|
|
check_credentials("scott", "mypassword") # login validation
|
|
create_token("scott") # returns JWT string
|
|
decode_token(token) # returns username or raises
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import secrets
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
|
|
import bcrypt
|
|
import jwt
|
|
|
|
from config import settings
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
# Presumably the name of the browser cookie that carries the session JWT —
# confirm against the code that sets the cookie.
COOKIE_NAME = "cortex_session"
|
|
# Signing algorithm handed to jwt.encode / jwt.decode by the JWT helpers in this module.
ALGORITHM = "HS256"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# auth.json helpers — read/write without clobbering unrelated fields
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _auth_path(username: str) -> Path:
    """Return the path of ``auth.json`` inside *username*'s home directory."""
    user_home = settings.home_root() / username
    return user_home / "auth.json"
|
|
|
|
|
|
def _read_auth(username: str) -> dict:
    """Load a user's ``auth.json`` as a dict.

    Returns an empty dict when the file is missing or unreadable, when it is
    not valid JSON, or when its top-level JSON value is not an object. The
    last case matters because every caller immediately uses ``.get()`` /
    item assignment on the result.
    """
    path = _auth_path(username)
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: file missing / unreadable (no separate exists() check, so
        # there is no window between the check and the read).
        # ValueError: json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return data if isinstance(data, dict) else {}
|
|
|
|
|
|
def _write_auth(username: str, data: dict) -> None:
    """Serialise *data* to the user's ``auth.json``, creating the directory if needed.

    The JSON is written to a sibling temporary file first and then moved over
    the real file with ``Path.replace``. An interrupted write therefore never
    leaves a truncated ``auth.json`` behind (which ``_read_auth`` would treat
    as empty, causing the next write to drop every stored field).

    Raises:
        TypeError / ValueError: if *data* is not JSON-serialisable.
        OSError: if the directory or file cannot be written.
    """
    path = _auth_path(username)
    path.parent.mkdir(parents=True, exist_ok=True)

    # Serialise before touching the disk so bad data cannot leave a stray temp file.
    payload = json.dumps(data, indent=2) + "\n"

    tmp_path = path.with_name(path.name + ".tmp")
    tmp_path.write_text(payload, encoding="utf-8")
    tmp_path.replace(path)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Password helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def set_password(username: str, password: str) -> None:
    """Store a bcrypt hash of *password* for *username*.

    The existing auth.json contents are loaded first and only the
    ``password_hash`` entry is replaced, so other fields survive.
    """
    record = _read_auth(username)
    salt = bcrypt.gensalt()
    hashed = bcrypt.hashpw(password.encode(), salt)
    record["password_hash"] = hashed.decode()
    _write_auth(username, record)
    logger.info("password set for user: %s", username)
|
|
|
|
|
|
def check_credentials(username: str, password: str) -> bool:
    """Validate a username/password pair against the stored bcrypt hash.

    Returns False when no hash is stored or when any error occurs while
    reading or checking it; True only when bcrypt confirms the password.
    """
    try:
        hash_text = _read_auth(username).get("password_hash", "")
        hash_bytes = hash_text.encode()
        if hash_bytes:
            return bcrypt.checkpw(password.encode(), hash_bytes)
        return False
    except Exception:
        # Any failure (bad hash format, wrong stored type, ...) counts as a failed login.
        return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Google OAuth helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def find_user_by_google(sub: str, email: str) -> str | None:
    """Find the local user linked to a Google account.

    Scans every user directory under the home root (in sorted order) and
    compares the ``google_sub`` / ``google_email`` fields of its auth.json.

    A sub match always wins, regardless of directory order, because the sub
    is the stable identifier. An email match (case-insensitive) is only used
    as a fallback when no user matches by sub, e.g. on first sign-in. When
    several users match by email, the first in sorted order is returned.

    Returns:
        The matching username, or None if nothing matches or the home root
        does not exist.
    """
    root = settings.home_root()
    if not root.exists():
        return None

    wanted_email = email.lower() if email else ""
    email_match: str | None = None

    for user_dir in sorted(root.iterdir()):
        if not user_dir.is_dir():
            continue
        data = _read_auth(user_dir.name)
        if not data:
            continue
        if sub and data.get("google_sub") == sub:
            return user_dir.name
        # Remember the first email match but keep scanning: a later user may
        # still match by sub, which takes priority.
        if email_match is None and wanted_email:
            stored_email = data.get("google_email")
            # isinstance guards against a JSON null / non-string value.
            if isinstance(stored_email, str) and stored_email.lower() == wanted_email:
                email_match = user_dir.name

    return email_match
|
|
|
|
|
|
def link_google(username: str, sub: str, email: str) -> None:
    """Record the Google sub and email for *username* in their auth.json.

    Existing fields in auth.json are kept; only ``google_sub`` and
    ``google_email`` are set or overwritten.
    """
    record = _read_auth(username)
    record.update({"google_sub": sub, "google_email": email})
    _write_auth(username, record)
    logger.info("Google account linked for user: %s (%s)", username, email)
|
|
|
|
|
|
def get_user_gemini_key(username: str) -> str | None:
    """Look up the user's own Gemini API key in auth.json.

    Returns None when the key is absent or empty, which signals the caller
    to use the server key instead.
    """
    personal_key = _read_auth(username).get("gemini_api_key")
    if personal_key:
        return personal_key
    return None
|
|
|
|
|
|
def get_user_role(username: str) -> str:
    """Return the role stored for *username*: ``'admin'`` or ``'user'``.

    The role lives in auth.json under ``"role"``. A missing entry, or any
    value other than the two recognised roles, yields ``'user'``.
    Set via: manage_passwords.py role <username> admin|user
    """
    stored = _read_auth(username).get("role", "user")
    if stored in ("admin", "user"):
        return stored
    return "user"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# JWT helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_token(username: str) -> str:
    """Build a signed session JWT for *username*.

    The token carries the username in ``sub`` and expires
    ``settings.jwt_expire_days`` days from now (UTC).
    """
    lifetime = timedelta(days=settings.jwt_expire_days)
    claims = {
        "sub": username,
        "exp": datetime.now(timezone.utc) + lifetime,
    }
    return jwt.encode(claims, settings.jwt_secret, algorithm=ALGORITHM)
|
|
|
|
|
|
def decode_token(token: str) -> str:
    """Decode a session JWT and return the username stored in its ``sub`` claim.

    Raises:
        jwt.InvalidTokenError: if the signature, expiry or format is invalid,
            or if the token carries no ``sub`` claim. The missing-claim case
            previously surfaced as a KeyError, which callers catching only
            InvalidTokenError would not handle.
    """
    payload = jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])
    try:
        return payload["sub"]
    except KeyError as exc:
        raise jwt.InvalidTokenError("token has no 'sub' claim") from exc
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Invite tokens — one-time setup links for new users
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _invite_path(username: str) -> Path:
    """Return the path of ``invite.json`` inside *username*'s home directory."""
    user_home = settings.home_root() / username
    return user_home / "invite.json"
|
|
|
|
|
|
def create_invite(username: str, expire_hours: int = 72) -> str:
    """Create a fresh one-time invite for *username* and persist it.

    A random URL-safe token is written to the user's invite.json together
    with its UTC expiry timestamp and ``used: False``. The user's home
    directory is created if it does not exist yet.

    Returns:
        The raw token string, suitable for embedding in a URL.
    """
    token = secrets.token_urlsafe(32)
    deadline = datetime.now(timezone.utc) + timedelta(hours=expire_hours)
    expires = deadline.isoformat()

    invite_file = _invite_path(username)
    invite_file.parent.mkdir(parents=True, exist_ok=True)

    record = {"token": token, "expires_at": expires, "used": False}
    invite_file.write_text(json.dumps(record) + "\n")

    # Only the date part (first 10 chars of the ISO string) goes to the log.
    logger.info("invite created for user: %s (expires %s)", username, expires[:10])
    return token
|
|
|
|
|
|
def validate_invite(token: str) -> str | None:
    """Look up which user, if any, an invite token belongs to.

    Every user directory under the home root is checked for an invite.json
    whose token equals *token*, that is not marked used, and whose
    ``expires_at`` timestamp lies in the future.

    Malformed invite files (unreadable, not JSON, not an object, missing or
    unparsable ``expires_at``) are skipped instead of aborting the scan.
    Tokens are compared in constant time because *token* comes from the
    request URL.

    Returns:
        The username owning the invite, or None if the token is empty,
        unknown, already used, or expired.
    """
    root = settings.home_root()
    if not token or not root.exists():
        return None

    # compare_digest only accepts ASCII str, so compare the encoded bytes.
    candidate = token.encode()
    now = datetime.now(timezone.utc)

    for user_dir in root.iterdir():
        if not user_dir.is_dir():
            continue
        try:
            data = json.loads((user_dir / "invite.json").read_text())
        except (OSError, ValueError):
            # No invite file for this user, or it is unreadable / not JSON.
            continue
        if not isinstance(data, dict) or data.get("used"):
            continue

        stored = data.get("token")
        if not isinstance(stored, str):
            continue
        if not secrets.compare_digest(stored.encode(), candidate):
            continue

        try:
            expired = now > datetime.fromisoformat(data["expires_at"])
        except (KeyError, TypeError, ValueError):
            # Missing, non-string, unparsable, or timezone-naive timestamp:
            # treat the invite as invalid rather than crashing.
            continue
        if expired:
            continue

        return user_dir.name

    return None
|
|
|
|
|
|
def consume_invite(username: str) -> None:
    """Mark the invite stored for *username* as used.

    Does nothing when the user has no invite.json. The operation stays
    best-effort (it never raises), but a failure is logged with its
    traceback: an invite that could not be marked used remains redeemable,
    so the failure must not go unnoticed.
    """
    path = _invite_path(username)
    if not path.exists():
        return
    try:
        data = json.loads(path.read_text())
        data["used"] = True
        path.write_text(json.dumps(data) + "\n")
    except Exception:
        logger.exception("failed to mark invite as used for user: %s", username)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Per-user channel config
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _channels_path(username: str) -> Path:
    """Return the path of ``channels.json`` inside *username*'s home directory."""
    user_home = settings.home_root() / username
    return user_home / "channels.json"
|
|
|
|
|
|
def get_user_channels(username: str) -> dict:
    """Load and parse the user's channels.json.

    Returns ``{}`` when the file does not exist or cannot be read or parsed.
    """
    channels_file = _channels_path(username)
    if not channels_file.exists():
        return {}
    try:
        raw = channels_file.read_text()
        parsed = json.loads(raw)
    except Exception:
        # Unreadable or malformed file is treated the same as no file.
        parsed = {}
    return parsed
|
|
|
|
|
|
def get_tool_policy(username: str) -> dict:
    """Load and parse the user's tool_policy.json.

    Returns ``{}`` when the file is missing, unreadable, or not valid JSON.

    Confirmation-gate keys (existing):
        allow     — tools in CONFIRM_REQUIRED that this user has pre-approved (skip gate)
        deny      — tools always blocked for this user regardless of global CONFIRM_REQUIRED

    Risk-policy keys (new):
        max_risk  — auto-include all tools at/below this level ("low"|"medium"|"high")
        whitelist — force-include specific tools above max_risk
        blacklist — force-exclude specific tools regardless of max_risk
    """
    policy_file = settings.home_root() / username / "tool_policy.json"
    try:
        raw = policy_file.read_text()
        return json.loads(raw)
    except Exception:
        return {}
|
|
|
|
|
|
def get_risk_policy(username: str) -> tuple[str | None, list[str], list[str]]:
    """Extract the risk-policy triple from the user's tool policy.

    Returns:
        ``(max_risk, whitelist, blacklist)``. A missing or empty ``max_risk``
        becomes None; a missing or empty list becomes ``[]``.
    """
    policy = get_tool_policy(username)
    max_risk = policy.get("max_risk") or None
    whitelist = policy.get("whitelist") or []
    blacklist = policy.get("blacklist") or []
    return max_risk, whitelist, blacklist
|
|
|
|
|
|
def save_tool_policy(username: str, data: dict) -> None:
    """Write *data* to the user's tool_policy.json as indented JSON.

    The user's home directory is created when it does not exist yet, matching
    what ``_write_auth`` does for auth.json; without this, saving a policy
    for a user with no home directory raised FileNotFoundError.

    Raises:
        TypeError / ValueError: if *data* is not JSON-serialisable.
        OSError: if the directory or file cannot be written.
    """
    path = settings.home_root() / username / "tool_policy.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(data, indent=2) + "\n")
|