Settings page gets an editable textarea (POST /settings/email-allowlist)
so users can view and update their per-user regex allowlist without
touching the raw JSON file.
Files panel gains a "Settings" group containing email_allowlist.json as
a raw JSON editor backup — served from home/{user}/ via files.py USER_FILES.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
259 lines
9.3 KiB
Python
259 lines
9.3 KiB
Python
"""
|
|
Account settings router.

Routes:
  GET  /settings                  → show account settings page (requires auth)
  POST /settings/password         → change password
  POST /settings/username         → rename the user account (forces re-login)
  POST /settings/gemini-key       → save or remove the per-user Gemini API key
  POST /settings/persona/rename   → rename a persona directory
  POST /settings/email-allowlist  → replace the per-user email allowlist
|
|
"""
|
|
|
|
import html as _html
|
|
import json
|
|
import logging
|
|
import re
|
|
from pathlib import Path
|
|
|
|
import jwt
|
|
from fastapi import APIRouter, Form, Request
|
|
from fastapi.responses import HTMLResponse, RedirectResponse
|
|
|
|
from auth_utils import COOKIE_NAME, decode_token, check_credentials, set_password, _read_auth, _write_auth
|
|
from persona import list_user_personas
|
|
from config import settings as app_settings
|
|
|
|
# Accepted shape for new user and persona names: a lowercase letter or
# underscore followed by up to 31 lowercase letters, digits, "_" or "-".
_SLUG_RE = re.compile(r"^[a-z_][a-z0-9_-]{0,31}$")

logger = logging.getLogger(__name__)
router = APIRouter()

# Directory the settings.html template is read from.
_STATIC = Path(__file__).parent.parent / "static"

# Cookie consulted when choosing which persona the "back" link points to.
_LAST_PERSONA_COOKIE = "cx_last_persona"
|
|
|
|
|
|
def _get_session_user(request: Request) -> str | None:
    """Return the user decoded from the session cookie, or ``None``.

    ``None`` is returned when the cookie is missing or empty, and when
    ``decode_token`` rejects the token with ``jwt.InvalidTokenError``.
    """
    session_token = request.cookies.get(COOKIE_NAME)
    if session_token:
        try:
            return decode_token(session_token)
        except jwt.InvalidTokenError:
            # Fall through: an invalid token is treated like no token at all.
            pass
    return None
|
|
|
|
|
|
def _preferred_persona(request: Request, username: str) -> str:
    """Choose the persona that navigation links should lead back to.

    Returns the value of the last-persona cookie when it names one of the
    user's personas, otherwise the first persona reported by
    ``list_user_personas``, or ``""`` when the user has none.
    """
    available = list_user_personas(username)
    if not available:
        return ""
    remembered = request.cookies.get(_LAST_PERSONA_COOKIE, "")
    return remembered if remembered in available else available[0]
|
|
|
|
|
|
def _allowlist_textarea_value(username: str) -> str:
    """Return the user's stored allowlist as HTML-escaped, newline-joined text.

    Rendering the settings page must not fail because of this file, so every
    read/parse problem degrades to an empty textarea. A missing file is the
    normal "nothing saved yet" case; anything else is logged so a corrupt
    file does not go unnoticed.
    """
    path = app_settings.home_root() / username / "email_allowlist.json"
    try:
        stored = json.loads(path.read_text())
    except FileNotFoundError:
        return ""
    except Exception:
        logger.warning("could not read email allowlist for %s", username, exc_info=True)
        return ""
    if not isinstance(stored, list):
        # The file is expected to hold a JSON array; iterating a dict or a
        # string would render keys / single characters as "patterns".
        logger.warning("email allowlist for %s is not a JSON list; ignoring", username)
        return ""
    entries = (str(item) for item in stored)
    return _html.escape("\n".join(entry for entry in entries if entry.strip()))


def _persona_list_item(username: str, persona: str) -> str:
    """Render one persona ``<li>``: link, rename toggle and hidden rename form."""
    # Escape once up front; both values land in attribute and text contexts.
    user = _html.escape(username)
    name = _html.escape(persona)
    return f'''<li>
  <a href="/{user}/{name}" class="persona-link">{name}</a>
  <button class="persona-rename-toggle" data-persona="{name}" title="Rename">✏</button>
  <form class="persona-rename-form" data-persona="{name}"
        method="POST" action="/settings/persona/rename" style="display:none">
    <input type="hidden" name="old_name" value="{name}">
    <input type="text" name="new_name" value="{name}"
           pattern="[a-z_][a-z0-9_\\-]{{0,31}}" required>
    <button type="submit">Save</button>
    <button type="button" class="persona-rename-cancel">Cancel</button>
  </form>
</li>'''


def _settings_page(username: str, personas: list[str], back_persona: str = "", success: str = "", error: str = "") -> str:
    """Fill the ``settings.html`` template and return the resulting HTML.

    Args:
        username: Account whose settings are shown.
        personas: Persona names to list, each with a rename form.
        back_persona: Persona used for the back/help links; defaults to the
            first entry of ``personas`` when empty.
        success: Plain-text success banner (omitted when empty).
        error: Plain-text error banner (omitted when empty).

    Every dynamic value is HTML-escaped before substitution. ``success`` and
    ``error`` are therefore treated as plain text, not markup: several callers
    build these messages from submitted form fields, which previously allowed
    markup injection into the page.
    """
    page = (_STATIC / "settings.html").read_text()

    auth_data = _read_auth(username)
    google_email = auth_data.get("google_email") or ""
    role = auth_data.get("role", "user")

    if not back_persona:
        back_persona = personas[0] if personas else ""
    if back_persona:
        back_href = f"/{username}/{back_persona}"
        help_href = f"/help?persona={back_persona}"
    else:
        back_href, help_href = "/", "/help"

    persona_items = "\n".join(_persona_list_item(username, p) for p in personas)

    # Applied in insertion order, matching the template's original fill order.
    substitutions = {
        "{{ username }}": _html.escape(username),
        "{{ google_email }}": _html.escape(str(google_email)),
        "{{ user_role }}": _html.escape(str(role)),
        "{{ email_allowlist }}": _allowlist_textarea_value(username),
        "{{ persona_items }}": persona_items or "<li><em>No personas yet.</em></li>",
        "{{ back_href }}": _html.escape(back_href),
        "{{ help_href }}": _html.escape(help_href),
    }
    # Banner markers are only replaced when there is a message to show.
    if success:
        substitutions["<!-- SUCCESS -->"] = f'<p class="success">{_html.escape(success)}</p>'
    if error:
        substitutions["<!-- ERROR -->"] = f'<p class="error">{_html.escape(error)}</p>'

    for placeholder, value in substitutions.items():
        page = page.replace(placeholder, value)
    return page
|
|
|
|
|
|
@router.get("/settings", include_in_schema=False)
async def settings_page(request: Request):
    """Serve the account settings page, or redirect to ``/login`` without a session."""
    current_user = _get_session_user(request)
    if current_user:
        page = _settings_page(
            current_user,
            list_user_personas(current_user),
            back_persona=_preferred_persona(request, current_user),
        )
        return HTMLResponse(page)
    return RedirectResponse("/login", status_code=302)
|
|
|
|
|
|
@router.post("/settings/password", include_in_schema=False)
async def change_password(
    request: Request,
    current_password: str = Form(...),
    new_password: str = Form(...),
    confirm_password: str = Form(...),
):
    """Change the signed-in user's password.

    Redirects to ``/login`` without a session. Otherwise re-renders the
    settings page with an error when the current password is wrong, the new
    one is shorter than 8 characters, or the confirmation differs; on success
    the password is stored via ``set_password`` and a success banner is shown.
    """
    user = _get_session_user(request)
    if not user:
        return RedirectResponse("/login", status_code=302)

    persona_names = list_user_personas(user)
    back_to = _preferred_persona(request, user)

    # Checks run in this order; the first failing one decides the message.
    if not check_credentials(user, current_password):
        problem = "Current password is incorrect."
    elif len(new_password) < 8:
        problem = "New password must be at least 8 characters."
    elif new_password != confirm_password:
        problem = "New passwords do not match."
    else:
        problem = ""

    if problem:
        return HTMLResponse(_settings_page(user, persona_names, back_to, error=problem))

    set_password(user, new_password)
    logger.info("password changed: %s", user)
    return HTMLResponse(
        _settings_page(user, persona_names, back_to, success="Password updated successfully.")
    )
|
|
|
|
|
|
@router.post("/settings/username", include_in_schema=False)
async def rename_username(
    request: Request,
    new_username: str = Form(...),
):
    """Rename the signed-in user's account by renaming their home directory.

    The submitted name is stripped, lower-cased and must match ``_SLUG_RE``.
    Submitting the current name redirects back to ``/settings``. On success
    the auth cookie is cleared and the browser is sent to the login page.
    Validation problems, a name that is already taken, and filesystem
    failures are all reported on the re-rendered settings page.
    """
    username = _get_session_user(request)
    if not username:
        return RedirectResponse("/login", status_code=302)

    personas = list_user_personas(username)
    back_persona = _preferred_persona(request, username)
    new_username = new_username.strip().lower()

    if not _SLUG_RE.match(new_username):
        return HTMLResponse(_settings_page(
            username, personas, back_persona,
            error="Invalid username. Use lowercase letters, digits, _ or - only."))

    if new_username == username:
        return RedirectResponse("/settings", status_code=302)

    home_root = app_settings.home_root()
    old_dir = home_root / username
    new_dir = home_root / new_username

    if new_dir.exists():
        return HTMLResponse(_settings_page(
            username, personas, back_persona,
            error=f"Username '{new_username}' is already taken."))

    # The exists() check above is only advisory: the target can appear (or the
    # source vanish) before rename() runs, and the rename itself can fail for
    # permission/filesystem reasons. Report that instead of returning a 500.
    try:
        old_dir.rename(new_dir)
    except OSError:
        logger.exception("user rename failed: %s → %s", username, new_username)
        return HTMLResponse(_settings_page(
            username, personas, back_persona,
            error="Could not rename the account. Please try again."))
    logger.info("user renamed: %s → %s", username, new_username)

    # Clear the auth cookie — old JWT now refers to a non-existent user
    resp = RedirectResponse("/login?msg=username_changed", status_code=302)
    resp.delete_cookie(COOKIE_NAME)
    return resp
|
|
|
|
|
|
@router.post("/settings/gemini-key", include_in_schema=False)
async def save_gemini_key(
    request: Request,
    gemini_api_key: str = Form(...),
):
    """Store or clear the signed-in user's Gemini API key.

    A non-blank value (after stripping) is written to the user's auth data
    under ``gemini_api_key``; a blank value removes that entry. The settings
    page is re-rendered with a matching success message. Requests without a
    session are redirected to ``/login``.
    """
    user = _get_session_user(request)
    if not user:
        return RedirectResponse("/login", status_code=302)

    persona_names = list_user_personas(user)
    back_to = _preferred_persona(request, user)
    key = gemini_api_key.strip()

    auth_record = _read_auth(user)
    if not key:
        auth_record.pop("gemini_api_key", None)
        notice = "Gemini API key removed — using server key."
    else:
        auth_record["gemini_api_key"] = key
        notice = "Gemini API key saved."
    _write_auth(user, auth_record)

    logger.info("gemini key updated: %s", user)
    return HTMLResponse(_settings_page(user, persona_names, back_to, success=notice))
|
|
|
|
|
|
@router.post("/settings/persona/rename", include_in_schema=False)
async def rename_persona(
    request: Request,
    old_name: str = Form(...),
    new_name: str = Form(...),
):
    """Rename one of the signed-in user's persona directories.

    ``new_name`` is stripped, lower-cased and must match ``_SLUG_RE``.
    ``old_name`` must be a single path component, so the source of the rename
    can never point outside the user's persona directory. Redirects to
    ``/settings`` on success or when the name is unchanged; all other outcomes
    re-render the settings page with an error message.
    """
    username = _get_session_user(request)
    if not username:
        return RedirectResponse("/login", status_code=302)

    personas = list_user_personas(username)
    back_persona = _preferred_persona(request, username)
    new_name = new_name.strip().lower()

    if not _SLUG_RE.match(new_name):
        return HTMLResponse(_settings_page(
            username, personas, back_persona,
            error="Invalid name. Use lowercase letters, digits, _ or - only."))

    if new_name == old_name:
        return RedirectResponse("/settings", status_code=302)

    not_found = HTMLResponse(_settings_page(
        username, personas, back_persona,
        error=f"Persona '{old_name}' not found."))

    # old_name is raw form input. Anything containing a separator, "..", or an
    # absolute path would let the rename below move directories from outside
    # this user's persona root, so only a plain single component is accepted.
    if old_name in ("", ".", "..") or Path(old_name).name != old_name:
        return not_found

    persona_root = app_settings.home_root() / username / "persona"
    old_dir = persona_root / old_name
    new_dir = persona_root / new_name

    if not old_dir.exists():
        return not_found

    if new_dir.exists():
        return HTMLResponse(_settings_page(
            username, personas, back_persona,
            error=f"A persona named '{new_name}' already exists."))

    # The existence checks above can be outdated by the time rename() runs,
    # and the rename can fail for filesystem reasons; report it on the page.
    try:
        old_dir.rename(new_dir)
    except OSError:
        logger.exception("persona rename failed: %s/%s → %s", username, old_name, new_name)
        return HTMLResponse(_settings_page(
            username, personas, back_persona,
            error="Could not rename the persona. Please try again."))
    logger.info("persona renamed: %s/%s → %s", username, old_name, new_name)
    return RedirectResponse("/settings", status_code=302)
|
|
|
|
|
|
@router.post("/settings/email-allowlist", include_in_schema=False)
async def save_email_allowlist(
    request: Request,
    patterns: str = Form(""),
):
    """Replace the signed-in user's email allowlist with the submitted lines.

    ``patterns`` is the textarea content, one regular expression per line.
    Lines are stripped and blank lines dropped. Every remaining line must
    compile with ``re.compile``; if one does not, nothing is written and the
    page is re-rendered with an error naming the offending line number. On
    success the list is stored as a JSON array in ``email_allowlist.json``
    under the user's home directory.
    """
    username = _get_session_user(request)
    if not username:
        return RedirectResponse("/login", status_code=302)

    personas = list_user_personas(username)
    back_persona = _preferred_persona(request, username)

    lines = []
    for lineno, raw in enumerate(patterns.splitlines(), start=1):
        candidate = raw.strip()
        if not candidate:
            continue
        try:
            re.compile(candidate)
        except (re.error, OverflowError, RecursionError):
            # Only the line number is reported: neither the pattern nor the
            # parser's message (which can quote it) is echoed into the page.
            return HTMLResponse(_settings_page(
                username, personas, back_persona,
                error=f"Line {lineno} is not a valid regular expression. The allowlist was not saved."))
        lines.append(candidate)

    path = app_settings.home_root() / username / "email_allowlist.json"
    path.write_text(json.dumps(lines, indent=2))
    logger.info("email allowlist updated for %s (%d patterns)", username, len(lines))
    return HTMLResponse(_settings_page(username, personas, back_persona, success=f"Email allowlist saved ({len(lines)} pattern{'s' if len(lines) != 1 else ''})."))
|