Files
Cortex-Inara/cortex/routers/settings.py
Scott Idem 348ca120c1 feat: full channels.json UI + http_allowlist settings
Notifications page:
- NC Talk section expanded: url, bot_secret, notification_room,
  nc_username, nc_app_password — all fields from channels.json now editable
- Per-channel sections use <details>/<summary> collapsibles; auto-open
  when values are present
- Secrets use type=password with "leave blank to keep" semantics
- Google Chat outbound webhook in its own collapsible section

Account settings:
- HTTP POST Allowlist section added (same textarea pattern as email allowlist)
- POST /settings/http-allowlist route saves home/{user}/http_allowlist.json
- Example placeholder shows ha.dgrzone.com and n8n patterns

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 13:57:18 -04:00

419 lines
16 KiB
Python

"""
Account settings router.
Routes:
GET /settings → show account settings page (requires auth)
POST /settings/password → change password
POST /settings/username → rename the user account (forces re-login)
POST /settings/persona/rename → rename a persona directory
"""
import html as _html
import json
import logging
import re
from pathlib import Path
import jwt
from fastapi import APIRouter, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from auth_utils import COOKIE_NAME, decode_token, check_credentials, set_password, _read_auth, _write_auth, get_user_channels, get_tool_policy, save_tool_policy
from tools import CONFIRM_REQUIRED
from persona import list_user_personas
from config import settings as app_settings
_SLUG_RE = re.compile(r"^[a-z_][a-z0-9_-]{0,31}$")
logger = logging.getLogger(__name__)
router = APIRouter()
_STATIC = Path(__file__).parent.parent / "static"
_LAST_PERSONA_COOKIE = "cx_last_persona"
def _get_session_user(request: Request) -> str | None:
token = request.cookies.get(COOKIE_NAME)
if not token:
return None
try:
return decode_token(token)
except jwt.InvalidTokenError:
return None
def _preferred_persona(request: Request, username: str) -> str:
names = list_user_personas(username)
if not names:
return ""
cookie_val = request.cookies.get(_LAST_PERSONA_COOKIE, "")
if cookie_val in names:
return cookie_val
return names[0]
def _notifications_page(username: str, back_persona: str = "", success: str = "", error: str = "") -> str:
html = (_STATIC / "notifications.html").read_text()
channels = get_user_channels(username)
nct = channels.get("nextcloud") or {}
notify_ch = _html.escape(channels.get("notification_channel", "") or "")
notify_email = _html.escape(channels.get("notification_email", "") or "")
nc_url = _html.escape(nct.get("url", "") or "")
nc_bot_secret = _html.escape(nct.get("bot_secret", "") or "")
nc_room = _html.escape(nct.get("notification_room", "") or "")
nc_username = _html.escape(nct.get("nc_username", "") or "")
nc_app_password = _html.escape(nct.get("nc_app_password", "") or "")
gc_webhook = _html.escape((channels.get("google_chat") or {}).get("outbound_webhook", "") or "")
html = html.replace("{{ notify_channel }}", notify_ch)
html = html.replace("{{ notify_email_override }}", notify_email)
html = html.replace("{{ nc_url }}", nc_url)
html = html.replace("{{ nc_bot_secret }}", nc_bot_secret)
html = html.replace("{{ nc_notify_room }}", nc_room)
html = html.replace("{{ nc_username }}", nc_username)
html = html.replace("{{ nc_app_password }}", nc_app_password)
html = html.replace("{{ gc_webhook }}", gc_webhook)
html = html.replace("{{ back_href }}", f"/{username}/{back_persona}" if back_persona else "/")
html = html.replace("{{ help_href }}", f"/help?persona={back_persona}" if back_persona else "/help")
if success:
html = html.replace("<!-- SUCCESS -->", f'<p class="success">{success}</p>')
if error:
html = html.replace("<!-- ERROR -->", f'<p class="error">{error}</p>')
return html
def _settings_page(username: str, personas: list[str], back_persona: str = "", success: str = "", error: str = "") -> str:
html = (_STATIC / "settings.html").read_text()
html = html.replace("{{ username }}", username)
# Connected Google account (OAuth sign-in)
auth_data = _read_auth(username)
google_email = auth_data.get("google_email") or ""
html = html.replace("{{ google_email }}", google_email)
role = auth_data.get("role", "user")
html = html.replace("{{ user_role }}", role)
al_path = app_settings.home_root() / username / "email_allowlist.json"
try:
patterns = json.loads(al_path.read_text())
allowlist_text = _html.escape("\n".join(str(p) for p in patterns if str(p).strip()))
except Exception:
allowlist_text = ""
html = html.replace("{{ email_allowlist }}", allowlist_text)
http_al_path = app_settings.home_root() / username / "http_allowlist.json"
try:
http_prefixes = json.loads(http_al_path.read_text())
http_allowlist_text = _html.escape("\n".join(str(p) for p in http_prefixes if str(p).strip()))
except Exception:
http_allowlist_text = ""
html = html.replace("{{ http_allowlist }}", http_allowlist_text)
# Tool permission policy
policy = get_tool_policy(username)
tool_allow_text = _html.escape("\n".join(policy.get("allow", [])))
tool_deny_text = _html.escape("\n".join(policy.get("deny", [])))
confirm_tools_list = _html.escape(", ".join(sorted(CONFIRM_REQUIRED)))
html = html.replace("{{ tool_allow }}", tool_allow_text)
html = html.replace("{{ tool_deny }}", tool_deny_text)
html = html.replace("{{ confirm_required_tools }}", confirm_tools_list)
persona_items = "\n".join(
f'''<li>
<a href="/{username}/{p}" class="persona-link">{p}</a>
<button class="persona-rename-toggle" data-persona="{p}" title="Rename">✏</button>
<form class="persona-rename-form" data-persona="{p}"
method="POST" action="/settings/persona/rename" style="display:none">
<input type="hidden" name="old_name" value="{p}">
<input type="text" name="new_name" value="{p}"
pattern="[a-z_][a-z0-9_\\-]{{0,31}}" required>
<button type="submit">Save</button>
<button type="button" class="persona-rename-cancel">Cancel</button>
</form>
</li>''' for p in personas
)
html = html.replace("{{ persona_items }}", persona_items or "<li><em>No personas yet.</em></li>")
if not back_persona:
back_persona = personas[0] if personas else ""
html = html.replace("{{ back_href }}", f"/{username}/{back_persona}" if back_persona else "/")
html = html.replace("{{ help_href }}", f"/help?persona={back_persona}" if back_persona else "/help")
if success:
html = html.replace("<!-- SUCCESS -->", f'<p class="success">{success}</p>')
if error:
html = html.replace("<!-- ERROR -->", f'<p class="error">{error}</p>')
return html
@router.get("/settings", include_in_schema=False)
async def settings_page(request: Request):
username = _get_session_user(request)
if not username:
return RedirectResponse("/login", status_code=302)
personas = list_user_personas(username)
back_persona = _preferred_persona(request, username)
return HTMLResponse(_settings_page(username, personas, back_persona=back_persona))
@router.post("/settings/password", include_in_schema=False)
async def change_password(
request: Request,
current_password: str = Form(...),
new_password: str = Form(...),
confirm_password: str = Form(...),
):
username = _get_session_user(request)
if not username:
return RedirectResponse("/login", status_code=302)
personas = list_user_personas(username)
back_persona = _preferred_persona(request, username)
if not check_credentials(username, current_password):
return HTMLResponse(_settings_page(username, personas, back_persona, error="Current password is incorrect."))
if len(new_password) < 8:
return HTMLResponse(_settings_page(username, personas, back_persona, error="New password must be at least 8 characters."))
if new_password != confirm_password:
return HTMLResponse(_settings_page(username, personas, back_persona, error="New passwords do not match."))
set_password(username, new_password)
logger.info("password changed: %s", username)
return HTMLResponse(_settings_page(username, personas, back_persona, success="Password updated successfully."))
@router.post("/settings/username", include_in_schema=False)
async def rename_username(
request: Request,
new_username: str = Form(...),
):
username = _get_session_user(request)
if not username:
return RedirectResponse("/login", status_code=302)
personas = list_user_personas(username)
back_persona = _preferred_persona(request, username)
new_username = new_username.strip().lower()
if not _SLUG_RE.match(new_username):
return HTMLResponse(_settings_page(
username, personas, back_persona,
error="Invalid username. Use lowercase letters, digits, _ or - only."))
if new_username == username:
return RedirectResponse("/settings", status_code=302)
home_root = app_settings.home_root()
old_dir = home_root / username
new_dir = home_root / new_username
if new_dir.exists():
return HTMLResponse(_settings_page(
username, personas, back_persona,
error=f"Username '{new_username}' is already taken."))
old_dir.rename(new_dir)
logger.info("user renamed: %s%s", username, new_username)
# Clear the auth cookie — old JWT now refers to a non-existent user
resp = RedirectResponse("/login?msg=username_changed", status_code=302)
resp.delete_cookie(COOKIE_NAME)
return resp
@router.post("/settings/gemini-key", include_in_schema=False)
async def save_gemini_key(
request: Request,
gemini_api_key: str = Form(...),
):
username = _get_session_user(request)
if not username:
return RedirectResponse("/login", status_code=302)
personas = list_user_personas(username)
back_persona = _preferred_persona(request, username)
gemini_api_key = gemini_api_key.strip()
data = _read_auth(username)
if gemini_api_key:
data["gemini_api_key"] = gemini_api_key
msg = "Gemini API key saved."
else:
data.pop("gemini_api_key", None)
msg = "Gemini API key removed — using server key."
_write_auth(username, data)
logger.info("gemini key updated: %s", username)
return HTMLResponse(_settings_page(username, personas, back_persona, success=msg))
@router.post("/settings/persona/rename", include_in_schema=False)
async def rename_persona(
request: Request,
old_name: str = Form(...),
new_name: str = Form(...),
):
username = _get_session_user(request)
if not username:
return RedirectResponse("/login", status_code=302)
personas = list_user_personas(username)
back_persona = _preferred_persona(request, username)
new_name = new_name.strip().lower()
if not _SLUG_RE.match(new_name):
return HTMLResponse(_settings_page(
username, personas, back_persona,
error="Invalid name. Use lowercase letters, digits, _ or - only."))
if new_name == old_name:
return RedirectResponse("/settings", status_code=302)
persona_root = app_settings.home_root() / username / "persona"
old_dir = persona_root / old_name
new_dir = persona_root / new_name
if not old_dir.exists():
return HTMLResponse(_settings_page(username, personas, back_persona, error=f"Persona '{old_name}' not found."))
if new_dir.exists():
return HTMLResponse(_settings_page(
username, personas, back_persona,
error=f"A persona named '{new_name}' already exists."))
old_dir.rename(new_dir)
logger.info("persona renamed: %s/%s%s", username, old_name, new_name)
return RedirectResponse("/settings", status_code=302)
@router.get("/settings/notifications", include_in_schema=False)
async def notifications_page(request: Request):
username = _get_session_user(request)
if not username:
return RedirectResponse("/login", status_code=302)
back_persona = _preferred_persona(request, username)
return HTMLResponse(_notifications_page(username, back_persona))
@router.post("/settings/notifications", include_in_schema=False)
async def save_notifications(
request: Request,
notification_channel: str = Form(""),
notification_email: str = Form(""),
nc_url: str = Form(""),
nc_bot_secret: str = Form(""),
nc_notification_room: str = Form(""),
nc_username: str = Form(""),
nc_app_password: str = Form(""),
gc_outbound_webhook: str = Form(""),
):
username = _get_session_user(request)
if not username:
return RedirectResponse("/login", status_code=302)
back_persona = _preferred_persona(request, username)
channels_path = app_settings.home_root() / username / "channels.json"
try:
channels = json.loads(channels_path.read_text())
except Exception:
channels = {}
# Top-level notification preference
notification_channel = notification_channel.strip()
if notification_channel in ("web_push", "email", "nextcloud", "google_chat"):
channels["notification_channel"] = notification_channel
else:
channels.pop("notification_channel", None)
# Optional email address override (blank = use login email)
notification_email = notification_email.strip()
if notification_email:
channels["notification_email"] = notification_email
else:
channels.pop("notification_email", None)
# Nextcloud Talk — full config nested under "nextcloud"
if "nextcloud" not in channels:
channels["nextcloud"] = {}
nct = channels["nextcloud"]
if nc_url.strip():
nct["url"] = nc_url.strip().rstrip("/")
# Only overwrite secrets if a new value was provided (blank = keep existing)
if nc_bot_secret.strip():
nct["bot_secret"] = nc_bot_secret.strip()
nct["notification_room"] = nc_notification_room.strip()
if nc_username.strip():
nct["nc_username"] = nc_username.strip()
if nc_app_password.strip():
nct["nc_app_password"] = nc_app_password.strip()
# Google Chat outbound webhook — nested under "google_chat"
if "google_chat" not in channels:
channels["google_chat"] = {}
channels["google_chat"]["outbound_webhook"] = gc_outbound_webhook.strip()
channels_path.write_text(json.dumps(channels, indent=2) + "\n")
logger.info("notifications updated for %s (channel=%s)", username, notification_channel or "none")
return HTMLResponse(_notifications_page(username, back_persona, success="Notification settings saved."))
@router.post("/settings/tool-policy", include_in_schema=False)
async def save_tool_policy_route(
request: Request,
allow_list: str = Form(""),
deny_list: str = Form(""),
):
username = _get_session_user(request)
if not username:
return RedirectResponse("/login", status_code=302)
personas = list_user_personas(username)
back_persona = _preferred_persona(request, username)
allow_tools = [ln.strip() for ln in allow_list.splitlines() if ln.strip()]
deny_tools = [ln.strip() for ln in deny_list.splitlines() if ln.strip()]
save_tool_policy(username, {"allow": allow_tools, "deny": deny_tools})
logger.info("tool policy updated for %s (allow=%d deny=%d)", username, len(allow_tools), len(deny_tools))
return HTMLResponse(_settings_page(username, personas, back_persona,
success="Tool permission policy saved."))
@router.post("/settings/email-allowlist", include_in_schema=False)
async def save_email_allowlist(
request: Request,
patterns: str = Form(""),
):
username = _get_session_user(request)
if not username:
return RedirectResponse("/login", status_code=302)
personas = list_user_personas(username)
back_persona = _preferred_persona(request, username)
lines = [ln.strip() for ln in patterns.splitlines() if ln.strip()]
path = app_settings.home_root() / username / "email_allowlist.json"
path.write_text(json.dumps(lines, indent=2))
logger.info("email allowlist updated for %s (%d patterns)", username, len(lines))
return HTMLResponse(_settings_page(username, personas, back_persona, success=f"Email allowlist saved ({len(lines)} pattern{'s' if len(lines) != 1 else ''})."))
@router.post("/settings/http-allowlist", include_in_schema=False)
async def save_http_allowlist(
request: Request,
prefixes: str = Form(""),
):
username = _get_session_user(request)
if not username:
return RedirectResponse("/login", status_code=302)
personas = list_user_personas(username)
back_persona = _preferred_persona(request, username)
lines = [ln.strip() for ln in prefixes.splitlines() if ln.strip()]
path = app_settings.home_root() / username / "http_allowlist.json"
path.write_text(json.dumps(lines, indent=2))
logger.info("http allowlist updated for %s (%d prefixes)", username, len(lines))
return HTMLResponse(_settings_page(username, personas, back_persona, success=f"HTTP allowlist saved ({len(lines)} prefix{'es' if len(lines) != 1 else ''})."))