Files
Cortex-Inara/cortex/routers/tools_settings.py
Scott Idem 70665fadff feat: schedules UI, task cron type, monthly/yearly schedules, AE DB tools, integrations page
- Schedules web UI (/settings/crons): list, add, edit, pause/resume, delete jobs
- cron task type: full orchestrator tool loop on a schedule, result → notification channel
- parse_schedule: monthly/yearly formats (monthly:DD:HH:MM, yearly:MM:DD:HH:MM)
- HA inbound webhook tools toggle: orchestrator loop vs. direct LLM, configurable in UI
- ae_db_query/describe/show_view: SELECT-only Aether MariaDB access (admin, per-user creds)
- /settings/integrations: admin-only page for Aether DB credentials
- Schedules nav link added to all settings pages
- pymysql added to requirements
- Docs updated: HELP.md, MASTER.md, CLAUDE.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 21:06:43 -04:00

194 lines
7.0 KiB
Python

"""
Tool settings router.
Routes:
GET /settings/tools → tool risk policy page
POST /settings/tools → save max_risk + per-tool overrides
"""
import html as _html
import json
import logging
from pathlib import Path
import jwt
from fastapi import APIRouter, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from auth_utils import COOKIE_NAME, decode_token, get_tool_policy, save_tool_policy, _read_auth
from persona import list_user_personas
from tools import TOOL_CATEGORIES, TOOL_RISK, CONFIRM_REQUIRED
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Router that the /settings/tools GET and POST handlers below are registered on.
router = APIRouter()
# "static" directory located one level above the directory containing this file;
# _tools_page reads the tools_settings.html template from here.
_STATIC = Path(__file__).parent.parent / "static"
# Name of the cookie consulted by _preferred_persona to pick the persona
# used for the page's back/help links.
_LAST_PERSONA_COOKIE = "cx_last_persona"
def _get_session_user(request: Request) -> str | None:
    """Resolve the session user from the auth cookie on *request*.

    Returns the value produced by ``decode_token`` for the cookie named
    ``COOKIE_NAME``, or ``None`` when the cookie is missing/empty or the
    token is rejected with ``jwt.InvalidTokenError``.
    """
    raw_token = request.cookies.get(COOKIE_NAME)
    if not raw_token:
        return None
    try:
        user = decode_token(raw_token)
    except jwt.InvalidTokenError:
        # An invalid token is treated the same as having no session.
        user = None
    return user
def _preferred_persona(request: Request, username: str) -> str:
    """Choose which of *username*'s personas the page should link back to.

    The persona named in the ``_LAST_PERSONA_COOKIE`` cookie wins when it is
    one of the names returned by ``list_user_personas``; otherwise the first
    listed persona is used. Returns ``""`` when the user has no personas.
    """
    personas = list_user_personas(username)
    if not personas:
        return ""
    remembered = request.cookies.get(_LAST_PERSONA_COOKIE, "")
    if remembered in personas:
        return remembered
    # Cookie absent or naming a persona not in the list: fall back to the first.
    return personas[0]
def _build_tool_table(policy: dict) -> str:
    """Render the per-tool override table as an HTML string.

    Tools are grouped under one header row per entry of ``TOOL_CATEGORIES``.
    Each tool row shows its name, its risk level from ``TOOL_RISK``
    (``"medium"`` when the tool has no entry), an empty "auto status" pill,
    and a ``<select>`` pre-set from the policy's ``whitelist`` /
    ``blacklist`` lists. A tool present in both lists is shown as
    whitelisted.
    """
    forced_in = set(policy.get("whitelist") or [])
    forced_out = set(policy.get("blacklist") or [])

    # (form value, visible label) for every override choice, in display order.
    choices = (
        ("default", "Default (auto)"),
        ("whitelist", "Force include"),
        ("blacklist", "Force exclude"),
    )

    body_rows: list[str] = []
    for category, tools in TOOL_CATEGORIES.items():
        # Category header spanning all four columns.
        body_rows.append(
            '<tr class="tool-cat-row"><td colspan="4">'
            + _html.escape(category)
            + '</td></tr>'
        )
        for tool in tools:
            risk = TOOL_RISK.get(tool, "medium")
            safe_risk = _html.escape(risk)
            safe_tool = _html.escape(tool)

            # Whitelist membership is checked first, so it wins over blacklist.
            if tool in forced_in:
                current = "whitelist"
            elif tool in forced_out:
                current = "blacklist"
            else:
                current = "default"

            option_tags = []
            for value, label in choices:
                marker = 'selected' if value == current else ''
                option_tags.append(
                    f'<option value="{value}" {marker}>{label}</option>'
                )

            select_html = "".join([
                f'<select name="override_{safe_tool}" ',
                f'class="override-sel" data-tool="{safe_tool}">',
                *option_tags,
                '</select>',
            ])
            risk_badge = f'<span class="risk risk-{risk}">{safe_risk}</span>'

            body_rows.append("".join([
                f'<tr data-tool-risk="{safe_risk}">',
                f'<td class="tool-name">{safe_tool}</td>',
                f'<td>{risk_badge}</td>',
                '<td><span class="auto-pill"></span></td>',
                f'<td>{select_html}</td>',
                '</tr>',
            ]))

    tbody = "\n".join(body_rows)
    header = (
        '<thead><tr>'
        '<th>Tool</th><th>Risk</th><th>Auto status</th><th>Override</th>'
        '</tr></thead>'
    )
    return f'<table class="tool-table">{header}<tbody>{tbody}</tbody></table>'
def _tools_page(
    username: str,
    back_persona: str = "",
    success: str = "",
    error: str = "",
) -> str:
    """Render the tool settings page for *username* and return it as HTML.

    Loads ``tools_settings.html`` from ``_STATIC`` and fills its
    ``{{ ... }}`` placeholders from the user's tool policy
    (``get_tool_policy``), ``TOOL_RISK`` and ``CONFIRM_REQUIRED``.

    Args:
        username: User whose policy is displayed; also used in the back link.
        back_persona: Persona used for the back/help links. When empty the
            links point at ``/`` and ``/help``.
        success: Plain-text message substituted for the ``<!-- SUCCESS -->``
            marker. HTML-escaped before insertion.
        error: Plain-text message substituted for the ``<!-- ERROR -->``
            marker. HTML-escaped before insertion.

    Returns:
        The page markup with all placeholders substituted.
    """
    # Function-scope import so this block does not rely on a file-level change.
    from urllib.parse import quote

    # Read explicitly as UTF-8 rather than the platform's default encoding.
    html = (_STATIC / "tools_settings.html").read_text(encoding="utf-8")
    policy = get_tool_policy(username)
    max_risk = policy.get("max_risk") or ""

    # Mark the current max-risk choice as selected; "" is the "none" option.
    for placeholder, value in (
        ("{{ sel_none }}", ""),
        ("{{ sel_low }}", "low"),
        ("{{ sel_medium }}", "medium"),
        ("{{ sel_high }}", "high"),
    ):
        html = html.replace(placeholder, "selected" if max_risk == value else "")

    html = html.replace("{{ tool_table_html }}", _build_tool_table(policy))
    html = html.replace("{{ tool_risk_json }}", json.dumps(TOOL_RISK))
    html = html.replace(
        "{{ confirm_required_tools }}",
        _html.escape(", ".join(sorted(CONFIRM_REQUIRED))),
    )
    html = html.replace(
        "{{ tool_allow }}", _html.escape("\n".join(policy.get("allow") or []))
    )
    html = html.replace(
        "{{ tool_deny }}", _html.escape("\n".join(policy.get("deny") or []))
    )

    # Percent-encode the dynamic URL parts so spaces, quotes, '&' or '/' in a
    # username/persona cannot break the href attribute or the query string.
    if back_persona:
        persona_q = quote(back_persona, safe="")
        back_href = f"/{quote(username, safe='')}/{persona_q}"
        help_href = f"/help?persona={persona_q}"
    else:
        back_href = "/"
        help_href = "/help"
    html = html.replace("{{ back_href }}", back_href)
    html = html.replace("{{ help_href }}", help_href)

    # The Integrations link is rendered only when the stored role is "admin".
    is_admin = _read_auth(username).get("role", "user") == "admin"
    nav = (
        '<a href="/settings/integrations" class="nav-link">Integrations</a>'
        if is_admin
        else ""
    )
    html = html.replace("{{ integrations_nav }}", nav)

    # Status messages are treated as plain text and escaped before insertion.
    if success:
        html = html.replace(
            "<!-- SUCCESS -->", f'<p class="success">{_html.escape(success)}</p>'
        )
    if error:
        html = html.replace(
            "<!-- ERROR -->", f'<p class="error">{_html.escape(error)}</p>'
        )
    return html
@router.get("/settings/tools", include_in_schema=False)
async def tools_page(request: Request):
    """Serve the tool settings page (GET /settings/tools).

    Redirects to ``/login`` with status 302 when no session user can be
    resolved from the request; otherwise returns the rendered page.
    """
    user = _get_session_user(request)
    if user:
        persona = _preferred_persona(request, user)
        return HTMLResponse(_tools_page(user, persona))
    return RedirectResponse("/login", status_code=302)
@router.post("/settings/tools", include_in_schema=False)
async def save_tools(request: Request):
    """Persist the submitted tool policy (POST /settings/tools).

    Reads ``max_risk``, one ``override_<tool>`` field per tool listed in
    ``TOOL_CATEGORIES``, and the free-text ``allow_list`` / ``deny_list``
    fields from the form, merges them into the user's existing policy,
    saves it, and re-renders the settings page with a success message.
    Redirects to ``/login`` (302) when there is no session user.
    """
    username = _get_session_user(request)
    if not username:
        return RedirectResponse("/login", status_code=302)
    back_persona = _preferred_persona(request, username)
    form = await request.form()

    # Any value outside the known risk levels is treated as "no limit".
    requested_risk = (form.get("max_risk") or "").strip()
    max_risk = requested_risk if requested_risk in ("", "low", "medium", "high") else ""

    # Collect each known tool's override choice once, then split by choice.
    selections = [
        (name, (form.get(f"override_{name}") or "").strip())
        for names in TOOL_CATEGORIES.values()
        for name in names
    ]
    whitelist: list[str] = [name for name, choice in selections if choice == "whitelist"]
    blacklist: list[str] = [name for name, choice in selections if choice == "blacklist"]

    def _nonblank_lines(field: str) -> list[str]:
        """Return the stripped, non-empty lines of a textarea form field."""
        stripped = (ln.strip() for ln in (form.get(field) or "").splitlines())
        return [ln for ln in stripped if ln]

    allow_tools = _nonblank_lines("allow_list")
    deny_tools = _nonblank_lines("deny_list")

    # Start from the stored policy so keys not handled here are carried over.
    policy = get_tool_policy(username)
    if max_risk:
        policy["max_risk"] = max_risk
    else:
        policy.pop("max_risk", None)
    policy["whitelist"] = whitelist
    policy["blacklist"] = blacklist
    policy["allow"] = allow_tools
    policy["deny"] = deny_tools
    save_tool_policy(username, policy)

    logger.info(
        "tool policy saved for %s: max_risk=%s whitelist=%d blacklist=%d allow=%d deny=%d",
        username,
        max_risk or "none",
        len(whitelist),
        len(blacklist),
        len(allow_tools),
        len(deny_tools),
    )

    message = (
        f"Tool policy saved — max risk: {max_risk or 'none'}, "
        f"{len(whitelist)} whitelisted, {len(blacklist)} blacklisted."
    )
    return HTMLResponse(_tools_page(username, back_persona, success=message))