""" Account settings router. Routes: GET /settings → show account settings page (requires auth) POST /settings/password → change password POST /settings/username → rename the user account (forces re-login) POST /settings/persona/rename → rename a persona directory """ import html as _html import json import logging import re from pathlib import Path import jwt from fastapi import APIRouter, Form, Request from fastapi.responses import HTMLResponse, RedirectResponse from auth_utils import COOKIE_NAME, decode_token, check_credentials, set_password, _read_auth, _write_auth, get_user_channels, get_tool_policy, save_tool_policy from tools import CONFIRM_REQUIRED from persona import list_user_personas from config import settings as app_settings _SLUG_RE = re.compile(r"^[a-z_][a-z0-9_-]{0,31}$") logger = logging.getLogger(__name__) router = APIRouter() _STATIC = Path(__file__).parent.parent / "static" _LAST_PERSONA_COOKIE = "cx_last_persona" def _get_session_user(request: Request) -> str | None: token = request.cookies.get(COOKIE_NAME) if not token: return None try: return decode_token(token) except jwt.InvalidTokenError: return None def _preferred_persona(request: Request, username: str) -> str: names = list_user_personas(username) if not names: return "" cookie_val = request.cookies.get(_LAST_PERSONA_COOKIE, "") if cookie_val in names: return cookie_val return names[0] def _notifications_page(username: str, back_persona: str = "", success: str = "", error: str = "") -> str: html = (_STATIC / "notifications.html").read_text() channels = get_user_channels(username) nct = channels.get("nextcloud") or {} notify_ch = _html.escape(channels.get("notification_channel", "") or "") notify_email = _html.escape(channels.get("notification_email", "") or "") nc_url = _html.escape(nct.get("url", "") or "") nc_bot_secret = _html.escape(nct.get("bot_secret", "") or "") nc_room = _html.escape(nct.get("notification_room", "") or "") nc_username = _html.escape(nct.get("nc_username", "") or "") nc_app_password = _html.escape(nct.get("nc_app_password", "") or "") gc_webhook = _html.escape((channels.get("google_chat") or {}).get("outbound_webhook", "") or "") ha = channels.get("homeassistant") or {} ha_url = _html.escape(ha.get("url", "") or "") ha_webhook_id = _html.escape(ha.get("webhook_id", "") or "") html = html.replace("{{ notify_channel }}", notify_ch) html = html.replace("{{ notify_email_override }}", notify_email) html = html.replace("{{ nc_url }}", nc_url) html = html.replace("{{ nc_bot_secret }}", nc_bot_secret) html = html.replace("{{ nc_notify_room }}", nc_room) html = html.replace("{{ nc_username }}", nc_username) html = html.replace("{{ nc_app_password }}", nc_app_password) html = html.replace("{{ gc_webhook }}", gc_webhook) html = html.replace("{{ ha_url }}", ha_url) html = html.replace("{{ ha_webhook_id }}", ha_webhook_id) html = html.replace("{{ ha_username }}", username) html = html.replace("{{ back_href }}", f"/{username}/{back_persona}" if back_persona else "/") html = html.replace("{{ help_href }}", f"/help?persona={back_persona}" if back_persona else "/help") if success: html = html.replace("", f'

{success}

') if error: html = html.replace("", f'

{error}

') return html def _settings_page(username: str, personas: list[str], back_persona: str = "", success: str = "", error: str = "") -> str: html = (_STATIC / "settings.html").read_text() html = html.replace("{{ username }}", username) # Connected Google account (OAuth sign-in) auth_data = _read_auth(username) google_email = auth_data.get("google_email") or "" html = html.replace("{{ google_email }}", google_email) role = auth_data.get("role", "user") html = html.replace("{{ user_role }}", role) al_path = app_settings.home_root() / username / "email_allowlist.json" try: patterns = json.loads(al_path.read_text()) allowlist_text = _html.escape("\n".join(str(p) for p in patterns if str(p).strip())) except Exception: allowlist_text = "" html = html.replace("{{ email_allowlist }}", allowlist_text) http_al_path = app_settings.home_root() / username / "http_allowlist.json" try: http_prefixes = json.loads(http_al_path.read_text()) http_allowlist_text = _html.escape("\n".join(str(p) for p in http_prefixes if str(p).strip())) except Exception: http_allowlist_text = "" html = html.replace("{{ http_allowlist }}", http_allowlist_text) # Tool permission policy policy = get_tool_policy(username) tool_allow_text = _html.escape("\n".join(policy.get("allow", []))) tool_deny_text = _html.escape("\n".join(policy.get("deny", []))) confirm_tools_list = _html.escape(", ".join(sorted(CONFIRM_REQUIRED))) html = html.replace("{{ tool_allow }}", tool_allow_text) html = html.replace("{{ tool_deny }}", tool_deny_text) html = html.replace("{{ confirm_required_tools }}", confirm_tools_list) persona_items = "\n".join( f'''
  • {p}
  • ''' for p in personas ) html = html.replace("{{ persona_items }}", persona_items or "
  • No personas yet.
  • ") if not back_persona: back_persona = personas[0] if personas else "" html = html.replace("{{ back_href }}", f"/{username}/{back_persona}" if back_persona else "/") html = html.replace("{{ help_href }}", f"/help?persona={back_persona}" if back_persona else "/help") if success: html = html.replace("", f'

    {success}

    ') if error: html = html.replace("", f'

    {error}

    ') return html @router.get("/settings", include_in_schema=False) async def settings_page(request: Request): username = _get_session_user(request) if not username: return RedirectResponse("/login", status_code=302) personas = list_user_personas(username) back_persona = _preferred_persona(request, username) return HTMLResponse(_settings_page(username, personas, back_persona=back_persona)) @router.post("/settings/password", include_in_schema=False) async def change_password( request: Request, current_password: str = Form(...), new_password: str = Form(...), confirm_password: str = Form(...), ): username = _get_session_user(request) if not username: return RedirectResponse("/login", status_code=302) personas = list_user_personas(username) back_persona = _preferred_persona(request, username) if not check_credentials(username, current_password): return HTMLResponse(_settings_page(username, personas, back_persona, error="Current password is incorrect.")) if len(new_password) < 8: return HTMLResponse(_settings_page(username, personas, back_persona, error="New password must be at least 8 characters.")) if new_password != confirm_password: return HTMLResponse(_settings_page(username, personas, back_persona, error="New passwords do not match.")) set_password(username, new_password) logger.info("password changed: %s", username) return HTMLResponse(_settings_page(username, personas, back_persona, success="Password updated successfully.")) @router.post("/settings/username", include_in_schema=False) async def rename_username( request: Request, new_username: str = Form(...), ): username = _get_session_user(request) if not username: return RedirectResponse("/login", status_code=302) personas = list_user_personas(username) back_persona = _preferred_persona(request, username) new_username = new_username.strip().lower() if not _SLUG_RE.match(new_username): return HTMLResponse(_settings_page( username, personas, back_persona, error="Invalid username. Use lowercase letters, digits, _ or - only.")) if new_username == username: return RedirectResponse("/settings", status_code=302) home_root = app_settings.home_root() old_dir = home_root / username new_dir = home_root / new_username if new_dir.exists(): return HTMLResponse(_settings_page( username, personas, back_persona, error=f"Username '{new_username}' is already taken.")) old_dir.rename(new_dir) logger.info("user renamed: %s → %s", username, new_username) # Clear the auth cookie — old JWT now refers to a non-existent user resp = RedirectResponse("/login?msg=username_changed", status_code=302) resp.delete_cookie(COOKIE_NAME) return resp @router.post("/settings/gemini-key", include_in_schema=False) async def save_gemini_key( request: Request, gemini_api_key: str = Form(...), ): username = _get_session_user(request) if not username: return RedirectResponse("/login", status_code=302) personas = list_user_personas(username) back_persona = _preferred_persona(request, username) gemini_api_key = gemini_api_key.strip() data = _read_auth(username) if gemini_api_key: data["gemini_api_key"] = gemini_api_key msg = "Gemini API key saved." else: data.pop("gemini_api_key", None) msg = "Gemini API key removed — using server key." _write_auth(username, data) logger.info("gemini key updated: %s", username) return HTMLResponse(_settings_page(username, personas, back_persona, success=msg)) @router.post("/settings/persona/rename", include_in_schema=False) async def rename_persona( request: Request, old_name: str = Form(...), new_name: str = Form(...), ): username = _get_session_user(request) if not username: return RedirectResponse("/login", status_code=302) personas = list_user_personas(username) back_persona = _preferred_persona(request, username) new_name = new_name.strip().lower() if not _SLUG_RE.match(new_name): return HTMLResponse(_settings_page( username, personas, back_persona, error="Invalid name. Use lowercase letters, digits, _ or - only.")) if new_name == old_name: return RedirectResponse("/settings", status_code=302) persona_root = app_settings.home_root() / username / "persona" old_dir = persona_root / old_name new_dir = persona_root / new_name if not old_dir.exists(): return HTMLResponse(_settings_page(username, personas, back_persona, error=f"Persona '{old_name}' not found.")) if new_dir.exists(): return HTMLResponse(_settings_page( username, personas, back_persona, error=f"A persona named '{new_name}' already exists.")) old_dir.rename(new_dir) logger.info("persona renamed: %s/%s → %s", username, old_name, new_name) return RedirectResponse("/settings", status_code=302) @router.get("/settings/notifications", include_in_schema=False) async def notifications_page(request: Request): username = _get_session_user(request) if not username: return RedirectResponse("/login", status_code=302) back_persona = _preferred_persona(request, username) return HTMLResponse(_notifications_page(username, back_persona)) @router.post("/settings/notifications", include_in_schema=False) async def save_notifications( request: Request, notification_channel: str = Form(""), notification_email: str = Form(""), nc_url: str = Form(""), nc_bot_secret: str = Form(""), nc_notification_room: str = Form(""), nc_username: str = Form(""), nc_app_password: str = Form(""), gc_outbound_webhook: str = Form(""), ha_url: str = Form(""), ha_token: str = Form(""), ha_webhook_id: str = Form(""), ): username = _get_session_user(request) if not username: return RedirectResponse("/login", status_code=302) back_persona = _preferred_persona(request, username) channels_path = app_settings.home_root() / username / "channels.json" try: channels = json.loads(channels_path.read_text()) except Exception: channels = {} # Top-level notification preference notification_channel = notification_channel.strip() if notification_channel in ("web_push", "email", "nextcloud", "google_chat"): channels["notification_channel"] = notification_channel else: channels.pop("notification_channel", None) # Optional email address override (blank = use login email) notification_email = notification_email.strip() if notification_email: channels["notification_email"] = notification_email else: channels.pop("notification_email", None) # Nextcloud Talk — full config nested under "nextcloud" if "nextcloud" not in channels: channels["nextcloud"] = {} nct = channels["nextcloud"] if nc_url.strip(): nct["url"] = nc_url.strip().rstrip("/") # Only overwrite secrets if a new value was provided (blank = keep existing) if nc_bot_secret.strip(): nct["bot_secret"] = nc_bot_secret.strip() nct["notification_room"] = nc_notification_room.strip() if nc_username.strip(): nct["nc_username"] = nc_username.strip() if nc_app_password.strip(): nct["nc_app_password"] = nc_app_password.strip() # Google Chat outbound webhook — nested under "google_chat" if "google_chat" not in channels: channels["google_chat"] = {} channels["google_chat"]["outbound_webhook"] = gc_outbound_webhook.strip() # Home Assistant — nested under "homeassistant" if "homeassistant" not in channels: channels["homeassistant"] = {} ha = channels["homeassistant"] if ha_url.strip(): ha["url"] = ha_url.strip().rstrip("/") if ha_token.strip(): ha["token"] = ha_token.strip() if ha_webhook_id.strip(): ha["webhook_id"] = ha_webhook_id.strip() channels_path.write_text(json.dumps(channels, indent=2) + "\n") logger.info("notifications updated for %s (channel=%s)", username, notification_channel or "none") return HTMLResponse(_notifications_page(username, back_persona, success="Notification settings saved.")) @router.post("/settings/tool-policy", include_in_schema=False) async def save_tool_policy_route( request: Request, allow_list: str = Form(""), deny_list: str = Form(""), ): username = _get_session_user(request) if not username: return RedirectResponse("/login", status_code=302) personas = list_user_personas(username) back_persona = _preferred_persona(request, username) allow_tools = [ln.strip() for ln in allow_list.splitlines() if ln.strip()] deny_tools = [ln.strip() for ln in deny_list.splitlines() if ln.strip()] save_tool_policy(username, {"allow": allow_tools, "deny": deny_tools}) logger.info("tool policy updated for %s (allow=%d deny=%d)", username, len(allow_tools), len(deny_tools)) return HTMLResponse(_settings_page(username, personas, back_persona, success="Tool permission policy saved.")) @router.post("/settings/email-allowlist", include_in_schema=False) async def save_email_allowlist( request: Request, patterns: str = Form(""), ): username = _get_session_user(request) if not username: return RedirectResponse("/login", status_code=302) personas = list_user_personas(username) back_persona = _preferred_persona(request, username) lines = [ln.strip() for ln in patterns.splitlines() if ln.strip()] path = app_settings.home_root() / username / "email_allowlist.json" path.write_text(json.dumps(lines, indent=2)) logger.info("email allowlist updated for %s (%d patterns)", username, len(lines)) return HTMLResponse(_settings_page(username, personas, back_persona, success=f"Email allowlist saved ({len(lines)} pattern{'s' if len(lines) != 1 else ''}).")) @router.post("/settings/http-allowlist", include_in_schema=False) async def save_http_allowlist( request: Request, prefixes: str = Form(""), ): username = _get_session_user(request) if not username: return RedirectResponse("/login", status_code=302) personas = list_user_personas(username) back_persona = _preferred_persona(request, username) lines = [ln.strip() for ln in prefixes.splitlines() if ln.strip()] path = app_settings.home_root() / username / "http_allowlist.json" path.write_text(json.dumps(lines, indent=2)) logger.info("http allowlist updated for %s (%d prefixes)", username, len(lines)) return HTMLResponse(_settings_page(username, personas, back_persona, success=f"HTTP allowlist saved ({len(lines)} prefix{'es' if len(lines) != 1 else ''})."))