""" Authentication utilities — password hashing and JWT session tokens. Passwords are stored as bcrypt hashes in home/{username}/auth.json. Sessions are JWT cookies signed with JWT_SECRET from settings. Usage: set_password("scott", "mypassword") # admin setup check_credentials("scott", "mypassword") # login validation create_token("scott") # returns JWT string decode_token(token) # returns username or raises """ import json import logging import secrets from datetime import datetime, timedelta, timezone from pathlib import Path import bcrypt import jwt from config import settings logger = logging.getLogger(__name__) COOKIE_NAME = "cortex_session" ALGORITHM = "HS256" # --------------------------------------------------------------------------- # auth.json helpers — read/write without clobbering unrelated fields # --------------------------------------------------------------------------- def _auth_path(username: str) -> Path: return settings.home_root() / username / "auth.json" def _read_auth(username: str) -> dict: path = _auth_path(username) if not path.exists(): return {} try: return json.loads(path.read_text()) except Exception: return {} def _write_auth(username: str, data: dict) -> None: path = _auth_path(username) path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(data, indent=2) + "\n") # --------------------------------------------------------------------------- # Password helpers # --------------------------------------------------------------------------- def set_password(username: str, password: str) -> None: """Hash and store a password. Preserves any existing fields in auth.json.""" data = _read_auth(username) data["password_hash"] = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() _write_auth(username, data) logger.info("password set for user: %s", username) def check_credentials(username: str, password: str) -> bool: """Return True if username+password are valid, False otherwise.""" try: stored = _read_auth(username).get("password_hash", "").encode() if not stored: return False return bcrypt.checkpw(password.encode(), stored) except Exception: return False # --------------------------------------------------------------------------- # Google OAuth helpers # --------------------------------------------------------------------------- def find_user_by_google(sub: str, email: str) -> str | None: """ Scan all users for one whose auth.json matches the given Google sub or email. Sub match takes priority (stable); email match is a fallback for first sign-in. Returns the username, or None if no match. """ root = settings.home_root() if not root.exists(): return None for user_dir in sorted(root.iterdir()): if not user_dir.is_dir(): continue data = _read_auth(user_dir.name) if not data: continue if sub and data.get("google_sub") == sub: return user_dir.name if email and data.get("google_email", "").lower() == email.lower(): return user_dir.name return None def link_google(username: str, sub: str, email: str) -> None: """Store / update Google sub and email in a user's auth.json.""" data = _read_auth(username) data["google_sub"] = sub data["google_email"] = email _write_auth(username, data) logger.info("Google account linked for user: %s (%s)", username, email) def get_user_gemini_key(username: str) -> str | None: """Return the user's personal Gemini API key, or None to use the server key.""" return _read_auth(username).get("gemini_api_key") or None def get_user_role(username: str) -> str: """Return the user's role: 'admin' or 'user' (default). Role is stored as auth.json["role"]. Any unrecognised value falls back to 'user'. Set via: manage_passwords.py role admin|user """ role = _read_auth(username).get("role", "user") return role if role in ("admin", "user") else "user" # --------------------------------------------------------------------------- # JWT helpers # --------------------------------------------------------------------------- def create_token(username: str) -> str: """Return a signed JWT encoding the username.""" expire = datetime.now(timezone.utc) + timedelta(days=settings.jwt_expire_days) payload = {"sub": username, "exp": expire} return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM) def decode_token(token: str) -> str: """Decode a JWT and return the username. Raises jwt.InvalidTokenError on failure.""" payload = jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM]) return payload["sub"] # --------------------------------------------------------------------------- # Invite tokens — one-time setup links for new users # --------------------------------------------------------------------------- def _invite_path(username: str) -> Path: return settings.home_root() / username / "invite.json" def create_invite(username: str, expire_hours: int = 72) -> str: """ Generate a one-time invite token for a user and save it to invite.json. Returns the raw token string (embed in a URL). """ token = secrets.token_urlsafe(32) expires = (datetime.now(timezone.utc) + timedelta(hours=expire_hours)).isoformat() user_dir = settings.home_root() / username user_dir.mkdir(parents=True, exist_ok=True) _invite_path(username).write_text( json.dumps({"token": token, "expires_at": expires, "used": False}) + "\n" ) logger.info("invite created for user: %s (expires %s)", username, expires[:10]) return token def validate_invite(token: str) -> str | None: """ Check an invite token across all users. Returns the username if valid and unused, None otherwise. """ root = settings.home_root() if not root.exists(): return None for user_dir in root.iterdir(): if not user_dir.is_dir(): continue inv_path = user_dir / "invite.json" if not inv_path.exists(): continue try: data = json.loads(inv_path.read_text()) except Exception: continue if data.get("used"): continue if data.get("token") != token: continue expires = datetime.fromisoformat(data["expires_at"]) if datetime.now(timezone.utc) > expires: continue return user_dir.name return None def consume_invite(username: str) -> None: """Mark the invite token for a user as used.""" path = _invite_path(username) if path.exists(): try: data = json.loads(path.read_text()) data["used"] = True path.write_text(json.dumps(data) + "\n") except Exception: pass # --------------------------------------------------------------------------- # Per-user channel config # --------------------------------------------------------------------------- def _channels_path(username: str) -> Path: return settings.home_root() / username / "channels.json" def get_user_channels(username: str) -> dict: """Return the parsed channels.json for a user, or {} if not found.""" path = _channels_path(username) if not path.exists(): return {} try: return json.loads(path.read_text()) except Exception: return {} def get_tool_policy(username: str) -> dict: """Return the parsed tool_policy.json for a user. Confirmation-gate keys (existing): allow — tools in CONFIRM_REQUIRED that this user has pre-approved (skip gate) deny — tools always blocked for this user regardless of global CONFIRM_REQUIRED Risk-policy keys (new): max_risk — auto-include all tools at/below this level ("low"|"medium"|"high") whitelist — force-include specific tools above max_risk blacklist — force-exclude specific tools regardless of max_risk """ path = settings.home_root() / username / "tool_policy.json" try: return json.loads(path.read_text()) except Exception: return {} def get_risk_policy(username: str) -> tuple[str | None, list[str], list[str]]: """Return (max_risk, whitelist, blacklist) from the user's tool policy.""" policy = get_tool_policy(username) return ( policy.get("max_risk") or None, policy.get("whitelist") or [], policy.get("blacklist") or [], ) def save_tool_policy(username: str, data: dict) -> None: path = settings.home_root() / username / "tool_policy.json" path.write_text(json.dumps(data, indent=2) + "\n")