Files
Cortex-Inara/cortex/auth_utils.py
Scott Idem 334e7f0dea feat: role-based tool access, confirmation gates, and new orchestrator tools
- auth_utils: get_user_role() reads role from auth.json (admin|user, default user)
- manage_passwords: new `role` command to promote/demote users (admin-only by convention)
- tools/__init__: TOOL_ROLES map, CONFIRM_REQUIRED set, get_tools_for_role(),
  get_openai_tools_for_role() — both orchestrators now filter tools by caller's role
- tools/system: cortex_restart (detached subprocess, 5s delay), cortex_logs (admin-only)
- tools/web: http_fetch — direct URL fetch, distinct from web_search
- tools/files: file_list (directory listing), file_write (restricted paths, admin-only)
- tools/notify: nc_talk_send — proactive outbound via notification.py
- orchestrator_engine + openai_orchestrator: user_role param; CONFIRM_REQUIRED tools
  return a confirmation-request result instead of executing — loop breaks after Claude
  asks user to confirm in a follow-up message
- home/scott/auth.json: role set to admin

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 19:23:53 -04:00

227 lines
7.5 KiB
Python

"""
Authentication utilities — password hashing and JWT session tokens.
Passwords are stored as bcrypt hashes in home/{username}/auth.json.
Sessions are JWT cookies signed with JWT_SECRET from settings.
Usage:
set_password("scott", "mypassword") # admin setup
check_credentials("scott", "mypassword") # login validation
create_token("scott") # returns JWT string
decode_token(token) # returns username or raises
"""
import json
import logging
import secrets
from datetime import datetime, timedelta, timezone
from pathlib import Path
import bcrypt
import jwt
from config import settings
logger = logging.getLogger(__name__)
COOKIE_NAME = "cortex_session"
ALGORITHM = "HS256"
# ---------------------------------------------------------------------------
# auth.json helpers — read/write without clobbering unrelated fields
# ---------------------------------------------------------------------------
def _auth_path(username: str) -> Path:
return settings.home_root() / username / "auth.json"
def _read_auth(username: str) -> dict:
path = _auth_path(username)
if not path.exists():
return {}
try:
return json.loads(path.read_text())
except Exception:
return {}
def _write_auth(username: str, data: dict) -> None:
path = _auth_path(username)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(data, indent=2) + "\n")
# ---------------------------------------------------------------------------
# Password helpers
# ---------------------------------------------------------------------------
def set_password(username: str, password: str) -> None:
"""Hash and store a password. Preserves any existing fields in auth.json."""
data = _read_auth(username)
data["password_hash"] = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
_write_auth(username, data)
logger.info("password set for user: %s", username)
def check_credentials(username: str, password: str) -> bool:
"""Return True if username+password are valid, False otherwise."""
try:
stored = _read_auth(username).get("password_hash", "").encode()
if not stored:
return False
return bcrypt.checkpw(password.encode(), stored)
except Exception:
return False
# ---------------------------------------------------------------------------
# Google OAuth helpers
# ---------------------------------------------------------------------------
def find_user_by_google(sub: str, email: str) -> str | None:
"""
Scan all users for one whose auth.json matches the given Google sub or email.
Sub match takes priority (stable); email match is a fallback for first sign-in.
Returns the username, or None if no match.
"""
root = settings.home_root()
if not root.exists():
return None
for user_dir in sorted(root.iterdir()):
if not user_dir.is_dir():
continue
data = _read_auth(user_dir.name)
if not data:
continue
if sub and data.get("google_sub") == sub:
return user_dir.name
if email and data.get("google_email", "").lower() == email.lower():
return user_dir.name
return None
def link_google(username: str, sub: str, email: str) -> None:
"""Store / update Google sub and email in a user's auth.json."""
data = _read_auth(username)
data["google_sub"] = sub
data["google_email"] = email
_write_auth(username, data)
logger.info("Google account linked for user: %s (%s)", username, email)
def get_user_gemini_key(username: str) -> str | None:
"""Return the user's personal Gemini API key, or None to use the server key."""
return _read_auth(username).get("gemini_api_key") or None
def get_user_role(username: str) -> str:
"""Return the user's role: 'admin' or 'user' (default).
Role is stored as auth.json["role"]. Any unrecognised value falls back to 'user'.
Set via: manage_passwords.py role <username> admin|user
"""
role = _read_auth(username).get("role", "user")
return role if role in ("admin", "user") else "user"
# ---------------------------------------------------------------------------
# JWT helpers
# ---------------------------------------------------------------------------
def create_token(username: str) -> str:
"""Return a signed JWT encoding the username."""
expire = datetime.now(timezone.utc) + timedelta(days=settings.jwt_expire_days)
payload = {"sub": username, "exp": expire}
return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM)
def decode_token(token: str) -> str:
"""Decode a JWT and return the username. Raises jwt.InvalidTokenError on failure."""
payload = jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])
return payload["sub"]
# ---------------------------------------------------------------------------
# Invite tokens — one-time setup links for new users
# ---------------------------------------------------------------------------
def _invite_path(username: str) -> Path:
return settings.home_root() / username / "invite.json"
def create_invite(username: str, expire_hours: int = 72) -> str:
"""
Generate a one-time invite token for a user and save it to invite.json.
Returns the raw token string (embed in a URL).
"""
token = secrets.token_urlsafe(32)
expires = (datetime.now(timezone.utc) + timedelta(hours=expire_hours)).isoformat()
user_dir = settings.home_root() / username
user_dir.mkdir(parents=True, exist_ok=True)
_invite_path(username).write_text(
json.dumps({"token": token, "expires_at": expires, "used": False}) + "\n"
)
logger.info("invite created for user: %s (expires %s)", username, expires[:10])
return token
def validate_invite(token: str) -> str | None:
"""
Check an invite token across all users.
Returns the username if valid and unused, None otherwise.
"""
root = settings.home_root()
if not root.exists():
return None
for user_dir in root.iterdir():
if not user_dir.is_dir():
continue
inv_path = user_dir / "invite.json"
if not inv_path.exists():
continue
try:
data = json.loads(inv_path.read_text())
except Exception:
continue
if data.get("used"):
continue
if data.get("token") != token:
continue
expires = datetime.fromisoformat(data["expires_at"])
if datetime.now(timezone.utc) > expires:
continue
return user_dir.name
return None
def consume_invite(username: str) -> None:
"""Mark the invite token for a user as used."""
path = _invite_path(username)
if path.exists():
try:
data = json.loads(path.read_text())
data["used"] = True
path.write_text(json.dumps(data) + "\n")
except Exception:
pass
# ---------------------------------------------------------------------------
# Per-user channel config
# ---------------------------------------------------------------------------
def _channels_path(username: str) -> Path:
return settings.home_root() / username / "channels.json"
def get_user_channels(username: str) -> dict:
"""Return the parsed channels.json for a user, or {} if not found."""
path = _channels_path(username)
if not path.exists():
return {}
try:
return json.loads(path.read_text())
except Exception:
return {}