feat: tool schema optimization, keyword routing, aider_run coding agent

Tool schema optimization (PLAN__Tool_Schema_Optimization.md Phases 1-3):
- model_registry.py: ROLE_DEFAULT_TOOLS — distill gets [], research/coder get
  narrow tool lists by default; applied in get_role_config() when user hasn't
  configured a custom list
- openai_orchestrator.py: keyword routing via narrow_tools_by_keywords() — scans
  user message + last assistant turn; narrows active schemas to matched categories
  only (e.g. "weather" → 3 web tools instead of 69); zero tools sent for pure chat
- openai_orchestrator.py: _get_cached_tools() — module-level schema cache keyed by
  (role, sorted_tool_list, risk_params); eliminates redundant schema rebuilds
- openai_orchestrator.py: _TOOL_SCHEMA_OVERHEAD 3000 → 500 tokens (schemas now
  excluded from the per-call fixed estimate since they're cached separately)
- tools/__init__.py: CATEGORY_TOOL_MAP + _KEYWORD_CATEGORY_MAP + classify_tool_categories()
  + narrow_tools_by_keywords() — the classifier logic lives here so both orchestrators
  can share it

aider_run tool (cortex/tools/aider.py):
- Invokes Aider as a subprocess with --message --yes-always --no-pretty --no-stream
- Project aliases: cortex / aether_api / aether_frontend / aether_container
- Auto-injects OpenRouter API key from Cortex model registry (no ~/.env needed)
- background=True fires async + registers in agent_manager; notify=True sends push
  notification on completion
- admin-only, confirm-required, TOOL_RISK=high
- .gitignore: added .aider.chat.history.md / .aider.input.history / .aider.llm.history

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Scott Idem
2026-06-03 22:39:44 -04:00
parent 29940c299b
commit 29d8aa4aae
6 changed files with 830 additions and 10 deletions

View File

@@ -81,6 +81,24 @@ from config import settings
logger = logging.getLogger(__name__)
# ── Role-level tool defaults ───────────────────────────────────────────────────
# Applied when a user hasn't configured a custom tool list for a role.
# None = no restriction (all accessible tools); [] = no tools (pure text processing).
# "chat" is intentionally absent: the /chat endpoint never sends tool schemas anyway,
# and the orchestrator uses chat_role="chat" as its default — restricting it here
# would block all tools from every default orchestration request.
# "orchestrator" is intentionally absent — Phase 2 keyword routing narrows it per message.
ROLE_DEFAULT_TOOLS: dict[str, list[str] | None] = {
"distill": [], # pure text processing — no tools needed
"research": ["web_search", "web_read", "http_fetch"],
"coder": [
"project_file_read", "project_file_list", "file_stat", "file_grep",
"file_diff", "file_syntax_check", "file_read", "file_list", "file_write",
"git_status", "git_log", "git_diff", "shell_exec",
],
}
# ── Provider model catalogs ───────────────────────────────────────────────────
# Server-side defaults. Update here when providers release new models.
# Users can add entries via the settings UI (Phase 2).
@@ -482,9 +500,16 @@ def get_role_config(username: str, role: str) -> dict:
"""
registry = _load(username)
role_cfg = registry.get("roles", {}).get(role, {})
user_tools = role_cfg.get("tools")
if user_tools is None:
# No user-configured list — fall back to system defaults for this role
effective_tools: list[str] | None = ROLE_DEFAULT_TOOLS.get(role)
else:
# User has configured tools; preserve their setting (empty list → no restriction)
effective_tools = user_tools or None
return {
"system_append": role_cfg.get("system_append", ""),
"tools": role_cfg.get("tools") or None,
"tools": effective_tools,
"inject_datetime": role_cfg.get("inject_datetime", True),
"inject_mode": role_cfg.get("inject_mode", True),
}

View File

@@ -25,7 +25,7 @@ from openai import AsyncOpenAI, APIConnectionError, APIStatusError
from config import settings
from orchestrator_engine import OrchestrateCheckpoint, OrchestratorResult
from tools import OPENAI_TOOL_SCHEMAS, call_tool, get_openai_tools_for_role, get_tools_for_role, CONFIRM_REQUIRED
from tools import OPENAI_TOOL_SCHEMAS, call_tool, get_openai_tools_for_role, get_tools_for_role, CONFIRM_REQUIRED, narrow_tools_by_keywords
import tool_audit
logger = logging.getLogger(__name__)
@@ -76,8 +76,18 @@ async def run(
_confirm_deny = frozenset(confirm_deny or ())
effective_confirm = (CONFIRM_REQUIRED - set(_confirm_allow)) | set(_confirm_deny)
# Keyword routing: narrow schemas to only what this message needs.
# Also scans the last assistant turn so follow-ups like "yes, do that" inherit tool context.
# Returns [] when no keywords match (zero tool overhead — model responds as plain chat).
effective_tool_list = narrow_tools_by_keywords(task, tool_list, context_messages=session_messages)
logger.info(
"Keyword routing: %d tools active (role_tools=%s)",
len(effective_tool_list),
len(tool_list) if tool_list is not None else "all",
)
client, model_name, active_tools = _build_client(
model_cfg, user_role, tool_list,
model_cfg, user_role, effective_tool_list,
max_risk=max_risk, risk_whitelist=risk_whitelist, risk_blacklist=risk_blacklist,
)
tool_audit.set_context("openai", model_cfg.get("label") or model_name)
@@ -104,7 +114,7 @@ async def run(
model_cfg=model_cfg,
respond_with_final=respond_with_final,
user_role=user_role,
tool_list=tool_list,
tool_list=effective_tool_list,
confirm_allow=_confirm_allow,
confirm_deny=_confirm_deny,
starting_round=0,
@@ -198,13 +208,39 @@ async def resume(checkpoint: OrchestrateCheckpoint, confirmed: bool) -> Orchestr
_CHARS_PER_TOKEN = 4
# Fixed token overhead budget for sending 40 tool schemas per call
_TOOL_SCHEMA_OVERHEAD = 3_000
# Fixed token overhead budget per call (tool schemas excluded — cached separately)
_TOOL_SCHEMA_OVERHEAD = 500
# Chars to keep per truncated old tool result
_TRUNC_RESULT_CHARS = 400
# Always keep the last N tool-result messages uncompacted
_KEEP_RECENT_TOOL_MSGS = 6 # ~2 rounds of 3 tools each
# Module-level schema cache: key = (user_role, sorted_tools, risk_params)
# Bounded in practice — keyword routing produces at most ~30 distinct tool sets.
_tool_schema_cache: dict[str, list[dict]] = {}
def _get_cached_tools(
user_role: str,
tool_list: list[str] | None,
max_risk: str | None = None,
whitelist: list[str] | None = None,
blacklist: list[str] | None = None,
) -> list[dict]:
key = "|".join([
user_role,
str(sorted(tool_list) if tool_list is not None else "all"),
str(max_risk),
str(sorted(whitelist) if whitelist else ""),
str(sorted(blacklist) if blacklist else ""),
])
if key not in _tool_schema_cache:
_tool_schema_cache[key] = get_openai_tools_for_role(
user_role, tool_list,
max_risk=max_risk, whitelist=whitelist, blacklist=blacklist,
)
return _tool_schema_cache[key]
def _estimate_tokens(messages: list[dict]) -> int:
total = sum(len(json.dumps(m)) for m in messages)
@@ -448,7 +484,7 @@ def _build_client(
if model_cfg.get("tools") is False:
active_tools = []
else:
active_tools = get_openai_tools_for_role(
active_tools = _get_cached_tools(
user_role, tool_list,
max_risk=max_risk, whitelist=risk_whitelist, blacklist=risk_blacklist,
)

View File

@@ -87,7 +87,13 @@ from tools.git import (
git_log as _git_log,
git_diff as _git_diff,
)
from tools.agents import spawn_agent as _spawn_agent
from tools.agents import (
spawn_agent as _spawn_agent,
agent_status as _agent_status,
agent_list as _agent_list,
agent_cancel as _agent_cancel,
)
from tools.aider import aider_run as _aider_run
from tools.homeassistant import (
ha_get_state as _ha_get_state,
ha_get_states as _ha_get_states,
@@ -114,6 +120,7 @@ import tools.notify as _mod_notify
import tools.agent_notes as _mod_agent_notes
import tools.git as _mod_git
import tools.agents as _mod_agents
import tools.aider as _mod_aider
import tools.homeassistant as _mod_homeassistant
import tools.ae_database as _mod_ae_database
@@ -140,7 +147,7 @@ TOOL_CATEGORIES: dict[str, list[str]] = {
],
"Aether Tasks": ["ae_task_list"],
"Agent Notes": ["agent_notes_read", "agent_notes_write", "agent_notes_append", "agent_notes_clear"],
"Agents": ["spawn_agent"],
"Agents": ["spawn_agent", "agent_status", "agent_list", "agent_cancel", "aider_run"],
"Home Assistant": ["ha_get_state", "ha_get_states", "ha_call_service"],
"Aether Database": ["ae_db_query", "ae_db_describe", "ae_db_show_view"],
}
@@ -207,6 +214,10 @@ _CALLABLES: dict[str, callable] = {
"git_log": _git_log,
"git_diff": _git_diff,
"spawn_agent": _spawn_agent,
"agent_status": _agent_status,
"agent_list": _agent_list,
"agent_cancel": _agent_cancel,
"aider_run": _aider_run,
"ha_get_state": _ha_get_state,
"ha_get_states": _ha_get_states,
"ha_call_service": _ha_call_service,
@@ -230,6 +241,10 @@ TOOL_ROLES: dict[str, str] = {
"file_write": "admin",
"ae_task_list": "admin",
"spawn_agent": "admin",
"agent_status": "user",
"agent_list": "user",
"agent_cancel": "admin",
"aider_run": "admin",
"email_send": "admin",
"nc_talk_send": "admin",
"http_post": "admin",
@@ -251,6 +266,8 @@ CONFIRM_REQUIRED: set[str] = {
"http_post",
"ha_call_service",
"ae_journal_entry_disable", # disables a journal entry — not easily reversed
"agent_cancel", # kills a running background task
"aider_run", # edits files and commits — irreversible without git revert
}
# Security risk ratings — informational for now; will drive auto-allow tiers later.
@@ -348,8 +365,12 @@ TOOL_RISK: dict[str, str] = {
"git_log": "low",
"git_diff": "low",
# Agents — spawning a subprocess with broad permissions is high
# Agents — spawning is high; lifecycle reads are low; cancel is medium (kills a task)
"spawn_agent": "high",
"agent_status": "low",
"agent_list": "low",
"agent_cancel": "medium",
"aider_run": "high",
# Home Assistant — reads are low; controlling physical devices is high
"ha_get_state": "low",
@@ -388,6 +409,7 @@ _ALL_DECLARATIONS: list[types.FunctionDeclaration] = (
+ _mod_ae_tasks.DECLARATIONS
+ _mod_agent_notes.DECLARATIONS
+ _mod_agents.DECLARATIONS
+ _mod_aider.DECLARATIONS
+ _mod_homeassistant.DECLARATIONS
+ _mod_ae_database.DECLARATIONS
)
@@ -554,3 +576,114 @@ def get_openai_tools_for_role(
if tool_list is not None:
allowed &= set(tool_list)
return [t for t in OPENAI_TOOL_SCHEMAS if t["function"]["name"] in allowed]
# ── Keyword-based tool routing ─────────────────────────────────────────────────
# Maps classifier category names → tool names in that category
CATEGORY_TOOL_MAP: dict[str, list[str]] = {
"web": ["web_search", "web_read", "http_fetch"],
"web_post": ["http_post"],
"file": ["project_file_read", "project_file_list", "file_stat", "file_grep",
"file_diff", "file_syntax_check", "file_read", "file_list", "file_write"],
"git": ["git_status", "git_log", "git_diff"],
"system": ["cortex_restart", "cortex_logs", "cortex_status", "cortex_update", "shell_exec"],
"tasks": ["task_list", "task_create", "task_update", "task_complete"],
"cron": ["cron_list", "cron_add", "cron_remove", "cron_toggle"],
"reminders": ["reminders_add", "reminders_list", "reminders_remove", "reminders_clear"],
"scratchpad": ["scratch_read", "scratch_write", "scratch_append", "scratch_clear"],
"ha": ["ha_get_state", "ha_get_states", "ha_call_service"],
"aether": ["ae_journal_list", "ae_journal_search", "ae_journal_entries_list",
"ae_journal_entry_read", "ae_journal_entry_create", "ae_journal_entry_update",
"ae_journal_entry_disable", "ae_journal_entry_append", "ae_journal_entry_prepend"],
"aether_db": ["ae_db_query", "ae_db_describe", "ae_db_show_view"],
"notifications":["web_push", "email_send", "nc_talk_send", "nc_talk_history"],
"agents": ["spawn_agent", "agent_status", "agent_list", "agent_cancel", "aider_run"],
"notes": ["agent_notes_read", "agent_notes_write", "agent_notes_append", "agent_notes_clear"],
"session": ["session_read", "session_search"],
"ae_tasks": ["ae_task_list"],
"claude": ["claude_allow_dir"],
}
_KEYWORD_CATEGORY_MAP: dict[str, list[str]] = {
"web": ["search", "look up", "what is", "who is", "weather", "forecast",
"news", "find on", "google", "website", "article", "research",
"temperature"],
"web_post": ["post to", "send to", "webhook", "trigger webhook"],
"file": ["read file", "show file", "list file", "directory", "grep",
"search in", "find in", "diff", "compare", "syntax check", "open file"],
"git": ["git", "commit", "branch", "pulled", "merged", "repository", "repo"],
"system": ["restart", "update", "status", "logs", "log", "deploy", "run command",
"shell", "is it running", "health"],
"tasks": ["task", "todo", "to-do", "to do", "add task", "create task",
"pending", "what's on my list"],
"cron": ["schedule", "cron", "every day", "every week", "recurring",
"automate", "job"],
"reminders": ["remind", "reminder", "don't forget"],
"scratchpad": ["scratch", "scratchpad", "working note", "jot down", "notepad"],
"ha": ["home assistant", "light", "thermostat", "turn on", "turn off",
"switch", "sensor", "temperature in", "kitchen", "bedroom", "garage"],
"aether": ["journal", "aether journal", "note entry", "log entry",
"search journal", "ae_journal"],
"aether_db": ["database", "query", "sql", "select", "db", "table",
"schema", "maria", "run query"],
"notifications":["notify", "push notification", "send email", "email",
"talk message", "nextcloud"],
"agents": ["spawn", "sub-agent", "delegate", "spawn agent",
"agent status", "agent list", "cancel agent", "background agent",
"aider", "code change", "edit code", "make a change to", "fix the code"],
"notes": ["agent notes", "private notes", "my notes", "agent_notes"],
"session": ["session", "history", "last time", "what did we", "earlier",
"yesterday", "last week", "previously"],
"ae_tasks": ["ae task", "kanban", "board", "ae_task"],
"claude": ["claude allow", "claude directory"],
}
def classify_tool_categories(message: str) -> list[str]:
"""Return category names whose keywords appear in message (case-insensitive).
Empty return means no tool category matched — route as pure chat with zero tool overhead.
"""
low = message.lower()
return [cat for cat, kws in _KEYWORD_CATEGORY_MAP.items() if any(kw in low for kw in kws)]
def narrow_tools_by_keywords(
message: str,
role_tools: list[str] | None,
context_messages: list[dict] | None = None,
) -> list[str]:
"""Narrow the active tool list to categories relevant to this message.
Also scans the last assistant message in context_messages — this catches follow-up
patterns like "yes, please do that" where the tool intent was expressed by the assistant
in the prior turn and the user is simply confirming.
Returns [] if no keywords matched (zero tool overhead).
Returns keyword-matched tools, intersected with role_tools if role_tools is set.
"""
scan_text = message
if context_messages:
for m in reversed(context_messages):
if m.get("role") == "assistant":
scan_text = scan_text + " " + (m.get("content") or "")
break
matched = classify_tool_categories(scan_text)
if not matched:
return []
seen: set[str] = set()
dynamic: list[str] = []
for cat in matched:
for t in CATEGORY_TOOL_MAP.get(cat, []):
if t not in seen:
seen.add(t)
dynamic.append(t)
if role_tools is not None:
role_set = set(role_tools)
dynamic = [t for t in dynamic if t in role_set]
return dynamic

258
cortex/tools/aider.py Normal file
View File

@@ -0,0 +1,258 @@
"""
Aider coding agent tool — invokes Aider AI pair programming as a subprocess.
Aider handles repo-map generation, file editing, git commits, and linting automatically.
It works with any OpenAI-compatible model — point it at DeepSeek, Ollama, OpenRouter, etc.
via AIDER_MODEL / AIDER_OPENAI_API_BASE env vars or the project's .aider.conf.yml.
background=True runs the subprocess asynchronously and returns an agent_id immediately.
The caller can poll via agent_status() or request a push notification via notify=True.
"""
import asyncio
import logging
import os
from pathlib import Path
from google.genai import types
import agent_manager
logger = logging.getLogger(__name__)
_CORTEX_DIR = Path(__file__).parent # .../Cortex_and_Inara_dev/cortex/
_PROJECT_ROOT = _CORTEX_DIR.parent # .../Cortex_and_Inara_dev/
# Known project aliases — expand before passing to subprocess
_PROJECT_ALIASES: dict[str, str] = {
"cortex": str(_PROJECT_ROOT),
"aether_api": "~/OSIT_dev/aether_api_fastapi",
"aether_frontend": "~/OSIT_dev/aether_app_sveltekit",
"aether_container": "~/OSIT_dev/aether_container_env",
}
_MAX_OUTPUT_CHARS = 12_000
async def aider_run(
project: str,
task: str,
files: list[str] | None = None,
model: str | None = None,
auto_commit: bool = True,
timeout: int = 300,
background: bool = False,
notify: bool = False,
) -> str:
"""Run Aider with a single task in a project directory, then exit.
When background=True, fires the subprocess asynchronously and returns an agent_id
immediately. Use agent_status(agent_id) to check progress; set notify=True to
receive a push/Talk notification on completion.
"""
resolved = _PROJECT_ALIASES.get(project, project)
cwd = Path(os.path.expanduser(resolved))
if not cwd.is_dir():
return f"Error: project directory '{resolved}' does not exist."
timeout = min(max(int(timeout), 10), 600)
cmd: list[str] = [
"aider",
"--message", task,
"--yes-always",
"--no-pretty",
"--no-stream",
"--no-check-update",
"--no-detect-urls",
"--auto-commits" if auto_commit else "--no-auto-commits",
]
# Inject OpenRouter credentials from the Cortex model registry if available.
# Aider's subprocess inherits Cortex's environment, which doesn't include keys
# stored in ~/.env or shell profiles. Pulling from the registry keeps it self-contained.
try:
import model_registry
from persona import get_user
user = get_user() or "scott"
registry = model_registry.get_registry(user)
or_host = next(
(h for h in registry.get("hosts", []) if "openrouter.ai" in h.get("api_url", "")),
None,
)
if or_host and or_host.get("api_key"):
cmd += ["--api-key", f"openrouter={or_host['api_key']}"]
except Exception:
user = "scott" # non-fatal — user may have key via env or .aider.conf.yml
if model:
cmd += ["--model", model]
for f in (files or []):
cmd += ["--file", f]
logger.info(
"aider_run: project=%s model=%s auto_commit=%s files=%s background=%s task=%.120s",
project, model, auto_commit, files, background, task,
)
async def _run() -> str:
proc = await asyncio.create_subprocess_exec(
*cmd,
cwd=str(cwd),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=float(timeout))
out = stdout.decode(errors="replace").strip()
err = stderr.decode(errors="replace").strip()
parts = []
if out:
parts.append(out)
if err:
parts.append(f"[stderr]\n{err}")
combined = "\n".join(parts) if parts else "(no output)"
if len(combined) > _MAX_OUTPUT_CHARS:
half = _MAX_OUTPUT_CHARS // 2
combined = (
combined[:half]
+ f"\n\n[... {len(combined) - _MAX_OUTPUT_CHARS} chars trimmed ...]\n\n"
+ combined[-half:]
)
if proc.returncode not in (0, 1):
return f"[exit {proc.returncode}]\n{combined}"
return combined
if background:
rec = await agent_manager.register(
user=user,
role="aider",
task=task,
level=2,
notify=notify,
)
async def _bg_task() -> None:
try:
result = await _run()
await agent_manager.finish(rec.agent_id, result, "done")
logger.info("aider_run [bg]: done %s", rec.agent_id[:8])
except asyncio.CancelledError:
await agent_manager.finish(rec.agent_id, "Cancelled.", "cancelled")
raise
except asyncio.TimeoutError:
msg = f"Aider timed out after {timeout}s"
logger.warning("aider_run [bg]: timeout %s", rec.agent_id[:8])
await agent_manager.finish(rec.agent_id, msg, "timeout")
except FileNotFoundError:
msg = "Error: 'aider' not found in PATH — run: pip install aider-chat"
await agent_manager.finish(rec.agent_id, msg, "failed")
except Exception as e:
logger.error("aider_run [bg]: failed %s: %s", rec.agent_id[:8], e)
await agent_manager.finish(rec.agent_id, str(e), "failed")
bg = asyncio.create_task(_bg_task())
agent_manager.set_task_ref(rec.agent_id, bg)
return (
f"Aider task started in background. ID: {rec.agent_id}\n"
f"Use agent_status('{rec.agent_id}') to monitor progress."
)
# Synchronous path
try:
return await _run()
except asyncio.TimeoutError:
return f"Error: aider timed out after {timeout}s"
except FileNotFoundError:
return "Error: 'aider' not found in PATH — run: pip install aider-chat"
except Exception as e:
logger.error("aider_run error: %s", e)
return f"Error: {e}"
DECLARATIONS = [
types.FunctionDeclaration(
name="aider_run",
description=(
"Run the Aider AI coding agent on a project with a single task, then exit. "
"Aider maps the repo, edits files, runs lint checks, and optionally commits. "
"Use for code changes, bug fixes, refactoring, or new features across any "
"configured project. Model is set via AIDER_MODEL env var or .aider.conf.yml "
"in the project directory — no API key needed if the project is already configured. "
"Set background=True for long tasks — returns an agent_id immediately and sends "
"a notification when done. ADMIN ONLY. Requires confirmation."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"project": types.Schema(
type=types.Type.STRING,
description=(
"Project alias or absolute path. Known aliases: "
"'cortex' (this project), 'aether_api', 'aether_frontend', "
"'aether_container'. Or provide an absolute path like "
"'/home/scott/OSIT_dev/aether_api_fastapi'."
),
),
"task": types.Schema(
type=types.Type.STRING,
description=(
"Full task description sent to Aider as --message. "
"Be specific — include file names, what to change, and why. "
"Example: 'In cortex/tools/web.py, add a max_chars parameter "
"to web_read() capped at 32768.'"
),
),
"files": types.Schema(
type=types.Type.ARRAY,
items=types.Schema(type=types.Type.STRING),
description=(
"Optional list of files to add explicitly to the editing context "
"(paths relative to the project root). "
"Aider also builds a repo map automatically — these get priority."
),
),
"model": types.Schema(
type=types.Type.STRING,
description=(
"Optional model override. Examples: 'deepseek/deepseek-chat', "
"'openrouter/anthropic/claude-3-5-haiku-20241022'. "
"Defaults to the project's .aider.conf.yml model or AIDER_MODEL env var."
),
),
"auto_commit": types.Schema(
type=types.Type.BOOLEAN,
description=(
"Auto-commit changes after edits (default: true). "
"Set to false to review diffs before committing manually."
),
),
"timeout": types.Schema(
type=types.Type.INTEGER,
description="Max seconds to wait for Aider to finish (default 300, max 600).",
),
"background": types.Schema(
type=types.Type.BOOLEAN,
description=(
"Run asynchronously in the background (default: false). "
"Returns an agent_id immediately; use agent_status(agent_id) to monitor. "
"Recommended for tasks expected to take more than ~60 seconds."
),
),
"notify": types.Schema(
type=types.Type.BOOLEAN,
description=(
"Send a push/Talk notification when the background task completes "
"(default: false). Only applies when background=true."
),
),
},
required=["project", "task"],
),
)
]