Replaces nginx basic auth with a proper per-user session system:
- auth_utils.py: bcrypt password hashing, JWT cookie creation/decode
- auth_middleware.py: validates JWT cookie on all routes except /login,
/health, /static/, and webhook endpoints (/channels/, /webhook/)
- routers/ui.py: GET /login, POST /login, POST /logout,
GET /{username}/{persona} — serves index.html with CORTEX_CONFIG injected
- static/login.html: minimal login form (dark theme, matches UI)
- main.py: registers SessionAuthMiddleware + ui.router
- config.py: jwt_secret, jwt_expire_days settings
- manage_passwords.py: CLI tool to set/check/list user passwords
- app.js: reads window.CORTEX_CONFIG (user + persona), sends both on
every /chat and /orchestrate request; persona name shown in header;
logout button (⏏) added to header
- requirements.txt: bcrypt, PyJWT, python-multipart
- .env.default: JWT_SECRET, JWT_EXPIRE_DAYS documented
- tests: client fixture injects a JWT cookie; security test assertions
updated for URL-normalized path-traversal requests (still secure; only the status codes differ)
All 80 tests pass.
Setup for a new user:
python manage_passwords.py set scott
python manage_passwords.py set holly
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
73 lines
2.4 KiB
Python
73 lines
2.4 KiB
Python
"""
|
|
Authentication utilities — password hashing and JWT session tokens.
|
|
|
|
Passwords are stored as bcrypt hashes in home/{username}/auth.json.
|
|
Sessions are JWT cookies signed with JWT_SECRET from settings.
|
|
|
|
Usage:
|
|
set_password("scott", "mypassword") # admin setup
|
|
check_credentials("scott", "mypassword") # login validation
|
|
create_token("scott") # returns JWT string
|
|
decode_token(token) # returns username or raises
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
|
|
import bcrypt
|
|
import jwt
|
|
|
|
from config import settings
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
# Name of the session cookie. Not referenced by the functions in this module;
# presumably consumed by the login routes / auth middleware — TODO confirm.
COOKIE_NAME = "cortex_session"
|
|
# JWT signing algorithm handed to jwt.encode() and jwt.decode() in the JWT helpers.
ALGORITHM = "HS256"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Password helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _auth_path(username: str) -> Path:
    """Return the location of a user's credential file.

    The result is ``<home root>/<username>/auth.json``, where the home root
    is whatever ``settings.home_root()`` returns. ``username`` is joined
    verbatim with no validation here, so callers must only pass a single,
    safe path component.
    """
    user_dir = settings.home_root() / username
    return user_dir / "auth.json"
|
|
|
|
|
|
def set_password(username: str, password: str) -> None:
    """Store a freshly salted bcrypt hash of ``password`` for ``username``.

    The user's ``auth.json`` is created or overwritten with a single JSON
    object of the form ``{"password_hash": "<bcrypt hash>"}`` followed by a
    newline. The file's parent directory is not created here.
    """
    secret = password.encode()
    digest = bcrypt.hashpw(secret, bcrypt.gensalt())
    record = {"password_hash": digest.decode()}

    target = _auth_path(username)
    target.write_text(json.dumps(record) + "\n")

    logger.info("password set for user: %s", username)
|
|
|
|
|
|
def _is_safe_username(username: object) -> bool:
    """Return True if ``username`` is a string usable as one directory name.

    Rejects non-strings, the empty string, ``.`` and ``..``, and anything
    containing a path separator or NUL byte, so the value cannot redirect a
    path join outside the home root.
    """
    if not isinstance(username, str) or username in ("", ".", ".."):
        return False
    return not any(ch in username for ch in ("/", "\\", "\x00"))


def check_credentials(username: str, password: str) -> bool:
    """Return True if ``username`` and ``password`` match the stored hash.

    ``username`` is treated as untrusted input: values that are not a single
    plain path component are rejected before any filesystem access, because
    the name is joined directly into the path of the user's ``auth.json``.

    Returns False (never raises for bad input or bad stored data) when the
    username is unsafe, the user has no ``auth.json``, the file is unreadable
    or malformed, or the password does not match.
    """
    if not _is_safe_username(username):
        return False

    path = _auth_path(username)
    try:
        data = json.loads(path.read_text())
        stored = data.get("password_hash", "").encode()
        if not stored:
            # No hash on record: nothing can match.
            return False
        return bcrypt.checkpw(password.encode(), stored)
    except FileNotFoundError:
        # Unknown user (or password never set) — an ordinary failed login.
        # NOTE(review): this path skips bcrypt, so response time may reveal
        # whether an account exists.
        return False
    except Exception:
        # Fail closed, but leave a trace: a corrupt auth.json or an I/O error
        # would otherwise look exactly like a wrong password.
        logger.warning(
            "credential check failed for user: %r", username, exc_info=True
        )
        return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# JWT helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_token(username: str) -> str:
    """Issue a signed session JWT for ``username``.

    The token carries the username in its ``sub`` claim and an ``exp`` claim
    set ``settings.jwt_expire_days`` days after the current UTC time. It is
    signed with ``settings.jwt_secret`` using ``ALGORITHM``.
    """
    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(days=settings.jwt_expire_days)
    claims = {
        "sub": username,
        "exp": expires_at,
    }
    return jwt.encode(claims, settings.jwt_secret, algorithm=ALGORITHM)
|
|
|
|
|
|
def decode_token(token: str) -> str:
    """Verify a session JWT and return the username it was issued for.

    The token is verified with ``settings.jwt_secret`` and restricted to
    ``ALGORITHM``; the username is read from the ``sub`` claim.

    Raises:
        jwt.InvalidTokenError: If ``jwt.decode`` rejects the token, or if the
            decoded payload has no non-empty string ``sub`` claim. A missing
            ``sub`` previously escaped as ``KeyError``, contradicting the
            documented contract.
    """
    payload = jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])

    username = payload.get("sub")
    if not isinstance(username, str) or not username:
        # Keep every failure inside the one exception type callers handle.
        raise jwt.InvalidTokenError("token has no usable 'sub' claim")
    return username
|