Compare commits

...

4 Commits

Author SHA1 Message Date
Scott Idem
e8819773ee docs: update TODO — mark completed items from 2026-06-03 session
- aider_run async/notify: done
- L2→L3 boundary enforcement: done (default _agent_level=2)
- aider_run multi-provider credentials: done
- Added remaining item: pass _agent_level=1 from main orchestrators

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-03 23:06:47 -04:00
Scott Idem
0c1cf3989a feat: aider multi-provider credentials + test suite green (182/182)
aider_run multi-provider credentials (tools/aider.py):
- _resolve_credentials() — general credential resolver; replaces the previous
  OpenRouter-only injection; resolution priority: Anthropic model hint → explicit
  host_label → model prefix (openrouter/*, groq/*, deepseek/*, …) → OpenRouter
  default → Anthropic API key → any keyed cloud host → local/generic host
- _host_flags() — generates --api-key slug=key for known cloud providers (OpenRouter,
  OpenAI, Groq, Together, Fireworks, X.ai, DeepSeek, Mistral); generates
  --openai-api-base + --openai-api-key for generic/local hosts (Open WebUI, Ollama);
  appends /api suffix for openwebui host_type; auto-prefixes model with 'openai/'
  for generic endpoints when model has no / prefix
- Anthropic API keys from providers.anthropic.credentials (not a host entry)
- host_label param added to aider_run and FunctionDeclaration — pick a configured
  host by partial label match (e.g. 'OpenRouter', 'Local', 'scott-lt-i7-rtx')
- 16 unit tests for _resolve_credentials covering all resolution paths

main.py: move @app.get("/health") before app.include_router(ui.router) — the
/{username} catch-all in ui.router was swallowing the /health path

Test suite: 37 pre-existing failures → 182/182 passing
- test_tools.py: _task_list() missing priority arg (6 callsites); cron ID regex
  c_\w+ → c_[\w-]+ (token_urlsafe includes '-', causing intermittent truncation)
- test_webhooks.py: rewritten for per-user channel config architecture —
  patch routers.nextcloud_talk/google_chat.get_user_channels instead of removed
  settings fields; corrected endpoints /webhook/nextcloud/scott and
  /channels/google-chat/scott; non-empty cfg dicts so falsy-guard passes
- test_health.py: test_unknown_route_404 now uses 3-segment path (/{u}/{p}/x)
  since single-segment paths hit the /{username} UI catch-all
- test_api_files.py: removed '../config.py' from not-in-allowed test (ASGI
  normalizes it to /config.py which hits /{username} catch-all, not files router)
- test_security.py: same webhook patch target fix; per-user endpoint URLs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-03 23:00:45 -04:00
Scott Idem
658c508925 feat: multi-level agent management — background agents, lifecycle tools, 3-level hierarchy
agent_manager.py (new):
- AgentRecord dataclass: agent_id, level (1/2/3), role, task, status, started,
  parent_id (lineage), finished, result, notify, _task_ref
- register() / finish() / cancel_agent() / list_agents() / get() / set_task_ref()
- Calls notification.notify() on completion when notify=True (same channel as
  reminders and cron completions)
- 24-hour pruning of completed records on each new registration

spawn_agent (tools/agents.py):
- background=True: fires asyncio.create_task(), registers in agent_manager, returns
  agent_id string immediately — sync path unchanged (no regression)
- notify=True: push/Talk notification when the background task completes
- Level enforcement: _agent_level param tracks hierarchy depth; when spawning from
  Level 2, child automatically gets spawn_agent + aider_run denied so Level 3 agents
  cannot delegate further

New lifecycle tools (tools/agents.py + __init__.py):
- agent_status(agent_id) — status, role, level, elapsed, task, result preview; user-level
- agent_list(status, limit) — all agents for current user, newest first; user-level
- agent_cancel(agent_id) — kills background task; admin-only, confirm-required

tests/test_agent_manager.py (new, 41 tests):
- agent_manager CRUD, pruning, notification hook
- spawn_agent background: returns immediately, completes async, timeout, failure
- Level enforcement: L1→L2 permits spawn, L2→L3 auto-denies; explicit tool_list path
- agent_status / agent_list / agent_cancel output formatting
- aider_run background: returns agent_id, completes async, sync path unchanged
- All tests run without browser or Cortex service (~2.5s total)
  Run: cd cortex && .venv/bin/python -m pytest tests/test_agent_manager.py -v

Docs: ARCH__FUTURE.md §13 (full design), ROADMAP.md, TODO__Agents.md, MASTER.md,
HELP.md (orchestrator description corrected, tool schema line updated to reflect
keyword routing), CLAUDE.md tool count 66→69.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-03 22:40:20 -04:00
Scott Idem
29d8aa4aae feat: tool schema optimization, keyword routing, aider_run coding agent
Tool schema optimization (PLAN__Tool_Schema_Optimization.md Phases 1-3):
- model_registry.py: ROLE_DEFAULT_TOOLS — distill gets [], research/coder get
  narrow tool lists by default; applied in get_role_config() when user hasn't
  configured a custom list
- openai_orchestrator.py: keyword routing via narrow_tools_by_keywords() — scans
  user message + last assistant turn; narrows active schemas to matched categories
  only (e.g. "weather" → 3 web tools instead of 69); zero tools sent for pure chat
- openai_orchestrator.py: _get_cached_tools() — module-level schema cache keyed by
  (role, sorted_tool_list, risk_params); eliminates redundant schema rebuilds
- openai_orchestrator.py: _TOOL_SCHEMA_OVERHEAD 3000 → 500 tokens (schemas now
  excluded from the per-call fixed estimate since they're cached separately)
- tools/__init__.py: CATEGORY_TOOL_MAP + _KEYWORD_CATEGORY_MAP + classify_tool_categories()
  + narrow_tools_by_keywords() — the classifier logic lives here so both orchestrators
  can share it

aider_run tool (cortex/tools/aider.py):
- Invokes Aider as a subprocess with --message --yes-always --no-pretty --no-stream
- Project aliases: cortex / aether_api / aether_frontend / aether_container
- Auto-injects OpenRouter API key from Cortex model registry (no ~/.env needed)
- background=True fires async + registers in agent_manager; notify=True sends push
  notification on completion
- admin-only, confirm-required, TOOL_RISK=high
- .gitignore: added .aider.chat.history.md / .aider.input.history / .aider.llm.history

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-03 22:39:44 -04:00
21 changed files with 2532 additions and 75 deletions

6
.gitignore vendored
View File

@@ -25,5 +25,11 @@ tmp/
*.tmp
*.log
# Aider — history files are personal/ephemeral; .aider.conf.yml is project config and IS tracked
.aider.chat.history.md
.aider.input.history
.aider.llm.history
# System files
.DS_Store
.aider*

View File

@@ -281,7 +281,7 @@ Cortex is running and stable. All channels are live:
Active users: scott (inara), holly (tina), brian (wintermute)
**65 orchestrator tools** across 17 domain modules:
**69 orchestrator tools** across 17 domain modules:
web_search/http_fetch/web_read/http_post,
project_file_read/list + file_stat/grep/diff/syntax_check (project-scoped),
file_read/list/write/session_read/session_search (system-scoped, admin),
@@ -293,7 +293,8 @@ reminders_add/list/remove/clear, scratch_read/write/append/clear,
web_push/email_send/nc_talk_send/nc_talk_history,
ae_journal_list/search/entries_list/entry_read/create/update/disable/append/prepend,
ae_task_list, ae_db_query/describe/show_view (SELECT-only MariaDB access, admin; disable requires confirm),
agent_notes_read/write/append/clear, spawn_agent,
agent_notes_read/write/append/clear, spawn_agent/aider_run (admin; aider_run requires confirm),
agent_status/agent_list (user-level)/agent_cancel (admin, confirm-required),
ha_get_state/ha_get_states/ha_call_service.
Each tool has a `TOOL_RISK` rating (low/medium/high). Configure access at `/settings/tools`

158
cortex/agent_manager.py Normal file
View File

@@ -0,0 +1,158 @@
"""
Agent lifecycle manager — registry for background spawn_agent and aider_run tasks.
Tracks running and recently completed agents in-process. On completion, fires
notification.notify() if notify=True (same channel used by reminders and cron jobs).
Records are kept for 24 hours after completion, then pruned on next registration.
"""
import asyncio
import logging
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
_PRUNE_AFTER = timedelta(hours=24)
_RESULT_PREVIEW_CHARS = 500
_TASK_PREVIEW_CHARS = 200
@dataclass
class AgentRecord:
agent_id: str
level: int # 1 = persona, 2 = specialized sub-agent, 3 = support agent
role: str # e.g. "coder", "research", "chat"
task: str # first _TASK_PREVIEW_CHARS of the task
status: str # running / done / failed / cancelled / timeout
started: datetime
user: str
parent_id: str | None = None # agent_id of the spawner (lineage tracking)
finished: datetime | None = None
result: str | None = None # first _RESULT_PREVIEW_CHARS on completion
notify: bool = False # push notification on completion
_task_ref: "asyncio.Task | None" = field(default=None, repr=False)
# Module-level registry — in-process only, not persisted across restarts.
_agents: dict[str, AgentRecord] = {}
_lock = asyncio.Lock()
async def register(
user: str,
role: str,
task: str,
level: int = 2,
parent_id: str | None = None,
notify: bool = False,
) -> AgentRecord:
"""Create and register a new running agent. Returns the record (agent_id is set)."""
agent_id = str(uuid.uuid4())
rec = AgentRecord(
agent_id=agent_id,
level=level,
role=role,
task=task[:_TASK_PREVIEW_CHARS],
status="running",
started=datetime.now(),
user=user,
parent_id=parent_id,
notify=notify,
)
async with _lock:
_prune_locked()
_agents[agent_id] = rec
logger.info(
"agent_manager: registered %s role=%s level=%d user=%s task=%.60s",
agent_id[:8], role, level, user, task,
)
return rec
def set_task_ref(agent_id: str, task_ref: "asyncio.Task") -> None:
"""Store the asyncio.Task reference so it can be cancelled later.
Call immediately after asyncio.create_task() — before the event loop yields.
"""
rec = _agents.get(agent_id)
if rec:
rec._task_ref = task_ref
async def finish(agent_id: str, result: str, status: str = "done") -> None:
"""Mark an agent complete, store the result, and notify the user if requested."""
async with _lock:
rec = _agents.get(agent_id)
if not rec:
return
rec.status = status
rec.finished = datetime.now()
rec.result = (result or "")[:_RESULT_PREVIEW_CHARS]
logger.info("agent_manager: finished %s status=%s", agent_id[:8], status)
if rec.notify and status != "cancelled":
try:
from notification import notify as _notify
elapsed = int((rec.finished - rec.started).total_seconds())
emoji = "" if status == "done" else "⚠️"
preview = (rec.result or "(no output)")[:200]
msg = f"{emoji} Agent done [{rec.role}, {elapsed}s]: {preview}"
await _notify(rec.user, msg)
except Exception as e:
logger.warning("agent_manager: notification failed for %s: %s", agent_id[:8], e)
async def cancel_agent(agent_id: str, user: str) -> str:
"""Cancel a running background agent. Returns a human-readable status message."""
async with _lock:
rec = _agents.get(agent_id)
if not rec:
return f"No agent found: {agent_id}"
if rec.user != user:
return "Access denied."
if rec.status != "running":
return f"Agent {agent_id[:8]}… is already {rec.status}."
task_ref = rec._task_ref
rec.status = "cancelled"
rec.finished = datetime.now()
if task_ref and not task_ref.done():
task_ref.cancel()
logger.info("agent_manager: cancelled %s by user=%s", agent_id[:8], user)
return f"Agent {agent_id[:8]}… cancelled."
def get(agent_id: str) -> AgentRecord | None:
"""Look up an agent record by ID."""
return _agents.get(agent_id)
def list_agents(user: str, status: str | None = None, limit: int = 10) -> list[AgentRecord]:
"""Return recent agents for a user, newest first.
Does not acquire the lock — safe for read-only listing (Python dict iteration is
thread-safe for reads; we don't care about racing with a concurrent registration).
"""
records = [r for r in _agents.values() if r.user == user]
if status:
records = [r for r in records if r.status == status]
records.sort(key=lambda r: r.started, reverse=True)
return records[:limit]
def _prune_locked() -> None:
"""Remove completed agents older than _PRUNE_AFTER. Must be called inside _lock."""
cutoff = datetime.now() - _PRUNE_AFTER
stale = [
aid for aid, r in _agents.items()
if r.status != "running" and r.finished and r.finished < cutoff
]
for aid in stale:
del _agents[aid]
if stale:
logger.debug("agent_manager: pruned %d stale records", len(stale))

View File

@@ -58,15 +58,16 @@ app.include_router(crons.router)
# Help page
app.include_router(help.router)
# UI router (login + /{user}/{persona} — must be last to avoid swallowing API paths)
app.include_router(ui.router)
# Health check — must be before ui.router so /{username} catch-all doesn't swallow it.
@app.get("/health")
async def health() -> dict:
return {"status": "ok"}
# UI router (login + /{user}/{persona} — must be last to avoid swallowing API paths)
app.include_router(ui.router)
if __name__ == "__main__":
uvicorn.run(
"main:app",

View File

@@ -81,6 +81,24 @@ from config import settings
logger = logging.getLogger(__name__)
# ── Role-level tool defaults ───────────────────────────────────────────────────
# Applied when a user hasn't configured a custom tool list for a role.
# None = no restriction (all accessible tools); [] = no tools (pure text processing).
# "chat" is intentionally absent: the /chat endpoint never sends tool schemas anyway,
# and the orchestrator uses chat_role="chat" as its default — restricting it here
# would block all tools from every default orchestration request.
# "orchestrator" is intentionally absent — Phase 2 keyword routing narrows it per message.
ROLE_DEFAULT_TOOLS: dict[str, list[str] | None] = {
"distill": [], # pure text processing — no tools needed
"research": ["web_search", "web_read", "http_fetch"],
"coder": [
"project_file_read", "project_file_list", "file_stat", "file_grep",
"file_diff", "file_syntax_check", "file_read", "file_list", "file_write",
"git_status", "git_log", "git_diff", "shell_exec",
],
}
# ── Provider model catalogs ───────────────────────────────────────────────────
# Server-side defaults. Update here when providers release new models.
# Users can add entries via the settings UI (Phase 2).
@@ -482,9 +500,16 @@ def get_role_config(username: str, role: str) -> dict:
"""
registry = _load(username)
role_cfg = registry.get("roles", {}).get(role, {})
user_tools = role_cfg.get("tools")
if user_tools is None:
# No user-configured list — fall back to system defaults for this role
effective_tools: list[str] | None = ROLE_DEFAULT_TOOLS.get(role)
else:
# User has configured tools; preserve their setting (empty list → no restriction)
effective_tools = user_tools or None
return {
"system_append": role_cfg.get("system_append", ""),
"tools": role_cfg.get("tools") or None,
"tools": effective_tools,
"inject_datetime": role_cfg.get("inject_datetime", True),
"inject_mode": role_cfg.get("inject_mode", True),
}

View File

@@ -25,7 +25,7 @@ from openai import AsyncOpenAI, APIConnectionError, APIStatusError
from config import settings
from orchestrator_engine import OrchestrateCheckpoint, OrchestratorResult
from tools import OPENAI_TOOL_SCHEMAS, call_tool, get_openai_tools_for_role, get_tools_for_role, CONFIRM_REQUIRED
from tools import OPENAI_TOOL_SCHEMAS, call_tool, get_openai_tools_for_role, get_tools_for_role, CONFIRM_REQUIRED, narrow_tools_by_keywords
import tool_audit
logger = logging.getLogger(__name__)
@@ -76,8 +76,18 @@ async def run(
_confirm_deny = frozenset(confirm_deny or ())
effective_confirm = (CONFIRM_REQUIRED - set(_confirm_allow)) | set(_confirm_deny)
# Keyword routing: narrow schemas to only what this message needs.
# Also scans the last assistant turn so follow-ups like "yes, do that" inherit tool context.
# Returns [] when no keywords match (zero tool overhead — model responds as plain chat).
effective_tool_list = narrow_tools_by_keywords(task, tool_list, context_messages=session_messages)
logger.info(
"Keyword routing: %d tools active (role_tools=%s)",
len(effective_tool_list),
len(tool_list) if tool_list is not None else "all",
)
client, model_name, active_tools = _build_client(
model_cfg, user_role, tool_list,
model_cfg, user_role, effective_tool_list,
max_risk=max_risk, risk_whitelist=risk_whitelist, risk_blacklist=risk_blacklist,
)
tool_audit.set_context("openai", model_cfg.get("label") or model_name)
@@ -104,7 +114,7 @@ async def run(
model_cfg=model_cfg,
respond_with_final=respond_with_final,
user_role=user_role,
tool_list=tool_list,
tool_list=effective_tool_list,
confirm_allow=_confirm_allow,
confirm_deny=_confirm_deny,
starting_round=0,
@@ -198,13 +208,39 @@ async def resume(checkpoint: OrchestrateCheckpoint, confirmed: bool) -> Orchestr
_CHARS_PER_TOKEN = 4
# Fixed token overhead budget for sending 40 tool schemas per call
_TOOL_SCHEMA_OVERHEAD = 3_000
# Fixed token overhead budget per call (tool schemas excluded — cached separately)
_TOOL_SCHEMA_OVERHEAD = 500
# Chars to keep per truncated old tool result
_TRUNC_RESULT_CHARS = 400
# Always keep the last N tool-result messages uncompacted
_KEEP_RECENT_TOOL_MSGS = 6 # ~2 rounds of 3 tools each
# Module-level schema cache: key = (user_role, sorted_tools, risk_params)
# Bounded in practice — keyword routing produces at most ~30 distinct tool sets.
_tool_schema_cache: dict[str, list[dict]] = {}
def _get_cached_tools(
user_role: str,
tool_list: list[str] | None,
max_risk: str | None = None,
whitelist: list[str] | None = None,
blacklist: list[str] | None = None,
) -> list[dict]:
key = "|".join([
user_role,
str(sorted(tool_list) if tool_list is not None else "all"),
str(max_risk),
str(sorted(whitelist) if whitelist else ""),
str(sorted(blacklist) if blacklist else ""),
])
if key not in _tool_schema_cache:
_tool_schema_cache[key] = get_openai_tools_for_role(
user_role, tool_list,
max_risk=max_risk, whitelist=whitelist, blacklist=blacklist,
)
return _tool_schema_cache[key]
def _estimate_tokens(messages: list[dict]) -> int:
total = sum(len(json.dumps(m)) for m in messages)
@@ -448,7 +484,7 @@ def _build_client(
if model_cfg.get("tools") is False:
active_tools = []
else:
active_tools = get_openai_tools_for_role(
active_tools = _get_cached_tools(
user_role, tool_list,
max_risk=max_risk, whitelist=risk_whitelist, blacklist=risk_blacklist,
)

View File

@@ -70,8 +70,8 @@ Click the **⚡** button in the input row to enable the Tools toggle. When lit (
The orchestrator runs a multi-step tool loop:
1. The **orchestrator model** reasons about the request and calls tools as needed
2. It produces an enriched summary of what it found
3. The **responder model** (set by the active Role) receives that context and writes the final user-facing reply
2. Tool results are fed back into the conversation; the loop continues until the model has what it needs
3. The model produces the final user-facing reply — when the orchestrator role uses Gemini, Claude writes the final response; when it uses a local model, that same model writes it
4. Expandable tool-call cards appear above the response — click any card to see the arguments sent and the result returned
The ⚡ toggle is **independent of the Role selector** — you can use any role (chat, coder, research, etc.) with or without tools. The orchestrator model is configured in **Account → Model Registry → Role Assignments → Orchestrator**.
@@ -82,7 +82,7 @@ Orchestrated sessions persist to history exactly like regular chat.
### Available Tools
65 tools across 17 categories. Each tool schema is sent to the model on every orchestrated call — fewer active tools means fewer tokens per call.
69 tools across 17 categories. Tool schemas are narrowed per-message using keyword routing — only categories relevant to your request are sent, keeping token overhead low. Per-role tool sets provide additional filtering.
| Category | Tools |
|---|---|
@@ -101,13 +101,14 @@ Orchestrated sessions persist to history exactly like regular chat.
| **Aether Tasks** | `ae_task_list` |
| **Aether Database** (admin) | `ae_db_query`, `ae_db_describe`, `ae_db_show_view` |
| **Agent Notes** | `agent_notes_read`, `agent_notes_write`, `agent_notes_append`, `agent_notes_clear` |
| **Agents** | `spawn_agent` |
| **Agents** | `spawn_agent`, `aider_run` |
| **Home Assistant** | `ha_get_state`, `ha_get_states`, `ha_call_service` |
Files, Shell, System, Aether Database, Agents, and some Notification/Web tools are **admin-only** and not visible to regular users.
`http_post` requires a URL prefix allowlist in `home/{user}/http_allowlist.json`.
`nc_talk_history` requires `nc_username` and `nc_app_password` in `channels.json` under `nextcloud`.
`ae_db_*` tools require Aether DB credentials configured in **Integrations** settings. All queries are SELECT-only — no writes possible.
`aider_run` requires Aider installed (`pip install aider-chat`) and a model configured via `AIDER_MODEL` env var or the project's `.aider.conf.yml`. Supports any OpenAI-compatible backend — DeepSeek, OpenRouter, Ollama, etc.
### Per-Role Tool Sets

View File

@@ -0,0 +1,876 @@
"""
Tests for agent_manager.py and the spawn_agent / aider_run background paths.
Run with:
cd cortex && .venv/bin/python -m pytest tests/test_agent_manager.py -v
No browser, no LLM calls, no Cortex service needed. All LLM interactions are mocked.
The agent_manager tests need no mocks at all — the module is pure asyncio.
"""
import asyncio
import pytest
import pytest_asyncio
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_mock_result(response: str = "Agent done."):
"""Build a mock OrchestratorResult returned by openai_orchestrator.run."""
r = MagicMock()
r.checkpoint = None
r.response = response
return r
def _mock_spawn_deps(
model_type: str = "local_openai",
user_role: str = "admin",
tool_policy: dict | None = None,
role_tools: list | None = None,
):
"""Return a context-manager stack that patches all spawn_agent external deps."""
if tool_policy is None:
tool_policy = {"allow": [], "deny": []}
model_cfg = {
"type": model_type,
"api_url": "http://localhost:3000",
"model_name": "test-model",
"api_key": "x",
}
role_cfg = {
"tools": role_tools,
"system_append": "",
"inject_datetime": True,
"inject_mode": True,
}
class _Stack:
def __enter__(self_):
self_._patches = [
patch("model_registry.get_role_config", return_value=role_cfg),
patch("model_registry.get_model_for_role", return_value=model_cfg),
patch("model_registry.get_registry", return_value={"hosts": []}),
patch("context_loader.load_context", return_value="Test system prompt"),
patch("auth_utils.get_user_role", return_value=user_role),
patch("auth_utils.get_tool_policy", return_value=tool_policy),
patch("persona.get_user", return_value="scott"),
]
for p in self_._patches:
p.start()
return self_
def __exit__(self_, *args):
for p in self_._patches:
p.stop()
return _Stack()
# ---------------------------------------------------------------------------
# Fixture — reset agent_manager state between tests
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def clear_agent_registry():
"""Wipe the in-process agent registry before each test."""
import agent_manager
agent_manager._agents.clear()
yield
agent_manager._agents.clear()
# ---------------------------------------------------------------------------
# agent_manager — core CRUD
# ---------------------------------------------------------------------------
class TestAgentManagerCore:
@pytest.mark.asyncio
async def test_register_creates_record(self):
import agent_manager
rec = await agent_manager.register(
user="scott", role="research", task="Investigate topic X", level=2
)
assert rec.agent_id in agent_manager._agents
assert rec.status == "running"
assert rec.level == 2
assert rec.role == "research"
assert rec.task == "Investigate topic X"
assert rec.user == "scott"
assert rec.finished is None
@pytest.mark.asyncio
async def test_register_truncates_long_task(self):
import agent_manager
long_task = "x" * 500
rec = await agent_manager.register(user="scott", role="chat", task=long_task, level=2)
assert len(rec.task) == 200
@pytest.mark.asyncio
async def test_finish_updates_record(self):
import agent_manager
rec = await agent_manager.register(user="scott", role="chat", task="t", level=2)
await agent_manager.finish(rec.agent_id, "All done!", "done")
updated = agent_manager.get(rec.agent_id)
assert updated.status == "done"
assert updated.result == "All done!"
assert updated.finished is not None
@pytest.mark.asyncio
async def test_finish_truncates_result(self):
import agent_manager
rec = await agent_manager.register(user="scott", role="chat", task="t", level=2)
await agent_manager.finish(rec.agent_id, "y" * 2000)
updated = agent_manager.get(rec.agent_id)
assert len(updated.result) <= agent_manager._RESULT_PREVIEW_CHARS
@pytest.mark.asyncio
async def test_finish_failed_status(self):
import agent_manager
rec = await agent_manager.register(user="scott", role="chat", task="t", level=2)
await agent_manager.finish(rec.agent_id, "Boom", "failed")
assert agent_manager.get(rec.agent_id).status == "failed"
@pytest.mark.asyncio
async def test_cancel_own_agent(self):
import agent_manager
rec = await agent_manager.register(user="scott", role="chat", task="t", level=2)
msg = await agent_manager.cancel_agent(rec.agent_id, "scott")
assert "cancelled" in msg
assert agent_manager.get(rec.agent_id).status == "cancelled"
@pytest.mark.asyncio
async def test_cancel_wrong_user_denied(self):
import agent_manager
rec = await agent_manager.register(user="scott", role="chat", task="t", level=2)
msg = await agent_manager.cancel_agent(rec.agent_id, "holly")
assert "denied" in msg.lower()
assert agent_manager.get(rec.agent_id).status == "running"
@pytest.mark.asyncio
async def test_cancel_nonexistent_agent(self):
import agent_manager
msg = await agent_manager.cancel_agent("does-not-exist", "scott")
assert "No agent found" in msg
@pytest.mark.asyncio
async def test_cancel_already_done(self):
import agent_manager
rec = await agent_manager.register(user="scott", role="chat", task="t", level=2)
await agent_manager.finish(rec.agent_id, "done", "done")
msg = await agent_manager.cancel_agent(rec.agent_id, "scott")
assert "already" in msg or "done" in msg
@pytest.mark.asyncio
async def test_cancel_kills_real_task(self):
import agent_manager
rec = await agent_manager.register(user="scott", role="chat", task="t", level=2)
sleep_task = asyncio.create_task(asyncio.sleep(60))
agent_manager.set_task_ref(rec.agent_id, sleep_task)
await agent_manager.cancel_agent(rec.agent_id, "scott")
await asyncio.sleep(0) # let the event loop process the cancellation
assert sleep_task.cancelled() or sleep_task.done()
def test_list_agents_returns_users_agents(self):
import agent_manager
# Manually populate the registry
agent_manager._agents["a1"] = _make_record("a1", "scott", "running")
agent_manager._agents["a2"] = _make_record("a2", "scott", "done")
agent_manager._agents["a3"] = _make_record("a3", "holly", "running")
records = agent_manager.list_agents("scott")
ids = {r.agent_id for r in records}
assert "a1" in ids
assert "a2" in ids
assert "a3" not in ids
def test_list_agents_filters_by_status(self):
import agent_manager
agent_manager._agents["a1"] = _make_record("a1", "scott", "running")
agent_manager._agents["a2"] = _make_record("a2", "scott", "done")
running = agent_manager.list_agents("scott", status="running")
assert len(running) == 1
assert running[0].agent_id == "a1"
def test_list_agents_respects_limit(self):
import agent_manager
for i in range(20):
agent_manager._agents[f"a{i}"] = _make_record(f"a{i}", "scott", "done")
records = agent_manager.list_agents("scott", limit=5)
assert len(records) == 5
@pytest.mark.asyncio
async def test_prune_removes_old_completed(self):
import agent_manager
rec = await agent_manager.register(user="scott", role="chat", task="t", level=2)
await agent_manager.finish(rec.agent_id, "done")
# Manually backdate the finished time past the prune threshold
agent_manager._agents[rec.agent_id].finished = (
datetime.now() - agent_manager._PRUNE_AFTER - timedelta(seconds=1)
)
# Trigger pruning via a new registration
await agent_manager.register(user="scott", role="chat", task="t2", level=2)
assert agent_manager.get(rec.agent_id) is None
@pytest.mark.asyncio
async def test_prune_keeps_running_agents(self):
import agent_manager
rec = await agent_manager.register(user="scott", role="chat", task="t", level=2)
# Running agent — finished is None so it should never be pruned
assert rec.agent_id in agent_manager._agents
await agent_manager.register(user="scott", role="chat", task="t2", level=2)
assert agent_manager.get(rec.agent_id) is not None
@pytest.mark.asyncio
async def test_finish_unknown_agent_is_noop(self):
import agent_manager
# Should not raise
await agent_manager.finish("ghost-id", "result", "done")
# ---------------------------------------------------------------------------
# agent_manager — notification hook
# ---------------------------------------------------------------------------
class TestAgentManagerNotify:
@pytest.mark.asyncio
async def test_notify_called_on_done(self):
import agent_manager
rec = await agent_manager.register(
user="scott", role="chat", task="t", level=2, notify=True
)
with patch("notification.notify", new_callable=AsyncMock) as mock_notify:
await agent_manager.finish(rec.agent_id, "All good", "done")
mock_notify.assert_called_once()
call_args = mock_notify.call_args
assert call_args[0][0] == "scott" # user
assert "" in call_args[0][1] # success emoji
@pytest.mark.asyncio
async def test_notify_called_on_failed(self):
import agent_manager
rec = await agent_manager.register(
user="scott", role="chat", task="t", level=2, notify=True
)
with patch("notification.notify", new_callable=AsyncMock) as mock_notify:
await agent_manager.finish(rec.agent_id, "Oops", "failed")
mock_notify.assert_called_once()
assert "⚠️" in mock_notify.call_args[0][1]
@pytest.mark.asyncio
async def test_no_notify_when_cancelled(self):
import agent_manager
rec = await agent_manager.register(
user="scott", role="chat", task="t", level=2, notify=True
)
with patch("notification.notify", new_callable=AsyncMock) as mock_notify:
await agent_manager.finish(rec.agent_id, "Cancelled.", "cancelled")
mock_notify.assert_not_called()
@pytest.mark.asyncio
async def test_no_notify_when_flag_false(self):
import agent_manager
rec = await agent_manager.register(
user="scott", role="chat", task="t", level=2, notify=False
)
with patch("notification.notify", new_callable=AsyncMock) as mock_notify:
await agent_manager.finish(rec.agent_id, "Done", "done")
mock_notify.assert_not_called()
# ---------------------------------------------------------------------------
# spawn_agent — background mode
# ---------------------------------------------------------------------------
class TestSpawnAgentBackground:
@pytest.mark.asyncio
async def test_background_returns_agent_id_immediately(self):
import agent_manager
from tools.agents import spawn_agent
mock_result = _make_mock_result("Research complete.")
with _mock_spawn_deps():
with patch("openai_orchestrator.run", new_callable=AsyncMock, return_value=mock_result):
result = await spawn_agent(
task="Test background research",
role="research",
background=True,
)
assert "Agent started in background" in result
assert "ID:" in result
@pytest.mark.asyncio
async def test_background_registers_agent(self):
import agent_manager
from tools.agents import spawn_agent
mock_result = _make_mock_result()
with _mock_spawn_deps():
with patch("openai_orchestrator.run", new_callable=AsyncMock, return_value=mock_result):
await spawn_agent(task="Background task", background=True)
agents = agent_manager.list_agents("scott")
assert len(agents) >= 1
@pytest.mark.asyncio
async def test_background_agent_eventually_completes(self):
import agent_manager
from tools.agents import spawn_agent
mock_result = _make_mock_result("Task done!")
with _mock_spawn_deps():
with patch("openai_orchestrator.run", new_callable=AsyncMock, return_value=mock_result):
result = await spawn_agent(task="Quick task", background=True)
agent_id = result.split("ID: ")[1].split("\n")[0].strip()
# Poll while patches are still active
for _ in range(40):
rec = agent_manager.get(agent_id)
if rec and rec.status != "running":
break
await asyncio.sleep(0.05)
rec = agent_manager.get(agent_id)
assert rec is not None
assert rec.status == "done"
assert "Task done!" in (rec.result or "")
@pytest.mark.asyncio
async def test_background_sync_path_unchanged(self):
"""Verify that background=False still blocks and returns the result string."""
from tools.agents import spawn_agent
mock_result = _make_mock_result("Sync result here.")
with _mock_spawn_deps():
with patch("openai_orchestrator.run", new_callable=AsyncMock, return_value=mock_result):
result = await spawn_agent(task="Sync task", background=False)
assert result == "Sync result here."
@pytest.mark.asyncio
async def test_background_agent_timeout(self):
import agent_manager
from tools.agents import spawn_agent
async def _slow(*args, **kwargs):
await asyncio.sleep(60)
return _make_mock_result()
with _mock_spawn_deps():
with patch("openai_orchestrator.run", side_effect=_slow):
result = await spawn_agent(task="Slow task", background=True, timeout=1)
agent_id = result.split("ID: ")[1].split("\n")[0].strip()
# Poll while patches are still active (timeout=1s so this completes quickly)
for _ in range(60):
rec = agent_manager.get(agent_id)
if rec and rec.status != "running":
break
await asyncio.sleep(0.05)
rec = agent_manager.get(agent_id)
assert rec.status == "timeout"
@pytest.mark.asyncio
async def test_background_agent_failure(self):
import agent_manager
from tools.agents import spawn_agent
with _mock_spawn_deps():
with patch("openai_orchestrator.run", new_callable=AsyncMock, side_effect=RuntimeError("Boom")):
result = await spawn_agent(task="Failing task", background=True)
agent_id = result.split("ID: ")[1].split("\n")[0].strip()
for _ in range(20):
rec = agent_manager.get(agent_id)
if rec and rec.status != "running":
break
await asyncio.sleep(0.05)
assert agent_manager.get(agent_id).status == "failed"
# ---------------------------------------------------------------------------
# spawn_agent — level enforcement
# ---------------------------------------------------------------------------
class TestLevelEnforcement:
@pytest.mark.asyncio
async def test_l2_parent_denies_spawn_in_l3_child(self):
"""Level 2 agent spawning a child: spawn_agent and aider_run must be denied."""
from tools.agents import spawn_agent
captured_kwargs = {}
async def _capture_run(**kwargs):
captured_kwargs.update(kwargs)
return _make_mock_result()
with _mock_spawn_deps():
with patch("openai_orchestrator.run", side_effect=_capture_run):
await spawn_agent(
task="Test L3 enforcement",
background=False,
_agent_level=2, # this agent is Level 2; its child would be Level 3
)
# The orchestrator should have received spawn_agent and aider_run in confirm_deny
confirm_deny = captured_kwargs.get("confirm_deny", set())
assert "spawn_agent" in confirm_deny, "spawn_agent must be blocked for L3 children"
assert "aider_run" in confirm_deny, "aider_run must be blocked for L3 children"
@pytest.mark.asyncio
async def test_l1_parent_does_not_deny_spawn(self):
"""Level 1 agent (persona) spawning a Level 2 child: no extra denies."""
from tools.agents import spawn_agent
captured_kwargs = {}
async def _capture_run(**kwargs):
captured_kwargs.update(kwargs)
return _make_mock_result()
with _mock_spawn_deps():
with patch("openai_orchestrator.run", side_effect=_capture_run):
await spawn_agent(
task="Test L2 spawn",
background=False,
_agent_level=1, # persona is Level 1; child would be Level 2
)
confirm_deny = captured_kwargs.get("confirm_deny", set())
assert "spawn_agent" not in confirm_deny, "L2 agents must be allowed to spawn"
@pytest.mark.asyncio
async def test_l2_deny_intersected_with_tool_list(self):
"""When the role has an explicit tool_list, L3 deny removes from list directly."""
from tools.agents import spawn_agent
captured_kwargs = {}
async def _capture_run(**kwargs):
captured_kwargs.update(kwargs)
return _make_mock_result()
# Role has an explicit tool_list that includes spawn_agent
with _mock_spawn_deps(role_tools=["web_search", "spawn_agent", "aider_run"]):
with patch("openai_orchestrator.run", side_effect=_capture_run):
await spawn_agent(
task="Test",
background=False,
_agent_level=2,
)
# spawn_agent and aider_run must be absent from the tool_list passed to orchestrator
tool_list = captured_kwargs.get("tool_list", [])
assert "spawn_agent" not in tool_list
assert "aider_run" not in tool_list
assert "web_search" in tool_list # unrelated tools must survive
# ---------------------------------------------------------------------------
# Agent lifecycle tools — output formatting
# ---------------------------------------------------------------------------
class TestAgentLifecycleTools:
@pytest.mark.asyncio
async def test_agent_status_running(self):
import agent_manager
rec = await agent_manager.register(user="scott", role="research", task="Do research", level=2)
with patch("persona.get_user", return_value="scott"):
from tools.agents import agent_status
output = await agent_status(rec.agent_id)
assert "running" in output
assert "research" in output
assert rec.agent_id[:8] in output
@pytest.mark.asyncio
async def test_agent_status_done(self):
import agent_manager
rec = await agent_manager.register(user="scott", role="chat", task="Task", level=2)
await agent_manager.finish(rec.agent_id, "The result text", "done")
with patch("persona.get_user", return_value="scott"):
from tools.agents import agent_status
output = await agent_status(rec.agent_id)
assert "done" in output
assert "The result text" in output
@pytest.mark.asyncio
async def test_agent_status_wrong_user(self):
import agent_manager
rec = await agent_manager.register(user="scott", role="chat", task="t", level=2)
with patch("persona.get_user", return_value="holly"):
from tools.agents import agent_status
output = await agent_status(rec.agent_id)
assert "denied" in output.lower()
@pytest.mark.asyncio
async def test_agent_status_not_found(self):
with patch("persona.get_user", return_value="scott"):
from tools.agents import agent_status
output = await agent_status("nonexistent-id")
assert "No agent found" in output
@pytest.mark.asyncio
async def test_agent_list_shows_running(self):
import agent_manager
await agent_manager.register(user="scott", role="research", task="Research X", level=2)
await agent_manager.register(user="scott", role="coder", task="Fix bug", level=2)
with patch("persona.get_user", return_value="scott"):
from tools.agents import agent_list
output = await agent_list()
assert "2 agent(s)" in output
assert "research" in output
assert "coder" in output
@pytest.mark.asyncio
async def test_agent_list_status_filter(self):
import agent_manager
rec = await agent_manager.register(user="scott", role="chat", task="t", level=2)
await agent_manager.finish(rec.agent_id, "done", "done")
await agent_manager.register(user="scott", role="chat", task="t2", level=2)
with patch("persona.get_user", return_value="scott"):
from tools.agents import agent_list
output = await agent_list(status="running")
assert "1 agent(s)" in output
@pytest.mark.asyncio
async def test_agent_list_empty(self):
with patch("persona.get_user", return_value="scott"):
from tools.agents import agent_list
output = await agent_list()
assert "No agents found" in output
@pytest.mark.asyncio
async def test_agent_cancel_tool(self):
import agent_manager
rec = await agent_manager.register(user="scott", role="chat", task="t", level=2)
with patch("persona.get_user", return_value="scott"):
from tools.agents import agent_cancel
output = await agent_cancel(rec.agent_id)
assert "cancelled" in output
assert agent_manager.get(rec.agent_id).status == "cancelled"
# ---------------------------------------------------------------------------
# aider_run — background mode
# ---------------------------------------------------------------------------
class TestAiderRunBackground:
@pytest.mark.asyncio
async def test_background_returns_agent_id(self):
import agent_manager
async def _fake_proc(*args, **kwargs):
mock_proc = MagicMock()
mock_proc.communicate = AsyncMock(return_value=(b"All changes applied.", b""))
mock_proc.returncode = 0
return mock_proc
with (
patch("persona.get_user", return_value="scott"),
patch("model_registry.get_registry", return_value={"hosts": []}),
patch("asyncio.create_subprocess_exec", side_effect=_fake_proc),
):
from tools.aider import aider_run
result = await aider_run(
project=str(_CORTEX_DIR.parent), # use actual project root (exists)
task="Test background task",
background=True,
)
assert "Aider task started in background" in result
assert "ID:" in result
@pytest.mark.asyncio
async def test_background_agent_completes(self):
import agent_manager
async def _fake_proc(*args, **kwargs):
mock_proc = MagicMock()
mock_proc.communicate = AsyncMock(return_value=(b"Edits applied.", b""))
mock_proc.returncode = 0
return mock_proc
from tools.aider import aider_run
with (
patch("persona.get_user", return_value="scott"),
patch("model_registry.get_registry", return_value={"hosts": []}),
patch("asyncio.create_subprocess_exec", side_effect=_fake_proc),
):
result = await aider_run(
project=str(_CORTEX_DIR.parent),
task="Test",
background=True,
)
agent_id = result.split("ID: ")[1].split("\n")[0].strip()
# Poll while patches are still active
for _ in range(40):
rec = agent_manager.get(agent_id)
if rec and rec.status != "running":
break
await asyncio.sleep(0.05)
rec = agent_manager.get(agent_id)
assert rec.status == "done"
assert "Edits applied" in (rec.result or "")
@pytest.mark.asyncio
async def test_invalid_project_directory(self):
from tools.aider import aider_run
result = await aider_run(project="/this/does/not/exist", task="Test")
assert "does not exist" in result
@pytest.mark.asyncio
async def test_sync_path_still_works(self):
async def _fake_proc(*args, **kwargs):
mock_proc = MagicMock()
mock_proc.communicate = AsyncMock(return_value=(b"Done.", b""))
mock_proc.returncode = 0
return mock_proc
with (
patch("persona.get_user", return_value="scott"),
patch("model_registry.get_registry", return_value={"hosts": []}),
patch("asyncio.create_subprocess_exec", side_effect=_fake_proc),
):
from tools.aider import aider_run
result = await aider_run(
project=str(_CORTEX_DIR.parent),
task="Sync test",
background=False,
)
assert "Done." in result
# ---------------------------------------------------------------------------
# aider_run — credential resolver (_resolve_credentials)
# ---------------------------------------------------------------------------
class TestAiderCredentialResolver:
"""Pure unit tests for _resolve_credentials — no subprocess, no registry I/O."""
def _registry(self, hosts=None, anthropic_key=None):
reg = {"hosts": hosts or [], "providers": {}}
if anthropic_key:
reg["providers"]["anthropic"] = {
"credentials": [{"api_key": anthropic_key}]
}
return reg
def _host(self, label, api_url, api_key="sk-test", host_type="openai"):
return {"id": "x", "label": label, "api_url": api_url,
"api_key": api_key, "host_type": host_type}
# --- Provider detection ---
def test_openrouter_host_gets_api_key_flag(self):
from tools.aider import _resolve_credentials
reg = self._registry(hosts=[
self._host("OpenRouter", "https://openrouter.ai/api/v1", "or-key"),
])
flags, model = _resolve_credentials(reg, None, None)
assert "--api-key" in flags
assert "openrouter=or-key" in flags
def test_anthropic_model_hint_uses_provider_key(self):
from tools.aider import _resolve_credentials
reg = self._registry(
hosts=[self._host("OpenRouter", "https://openrouter.ai/api/v1")],
anthropic_key="ant-key",
)
flags, model = _resolve_credentials(reg, "claude-3-5-sonnet-20241022", None)
assert "anthropic=ant-key" in flags
assert model == "claude-3-5-sonnet-20241022"
def test_anthropic_slash_prefix_hint(self):
from tools.aider import _resolve_credentials
reg = self._registry(anthropic_key="ant-key")
flags, _ = _resolve_credentials(reg, "anthropic/claude-opus-4", None)
assert "anthropic=ant-key" in flags
def test_local_openwebui_host_gets_base_url(self):
from tools.aider import _resolve_credentials
reg = self._registry(hosts=[
self._host("Local", "http://192.168.32.19:3000", "localkey", host_type="openwebui"),
])
flags, model = _resolve_credentials(reg, None, None)
assert "--openai-api-base" in flags
base = flags[flags.index("--openai-api-base") + 1]
assert base == "http://192.168.32.19:3000/api"
assert "--openai-api-key" in flags
def test_local_host_appends_api_suffix_for_openwebui(self):
from tools.aider import _resolve_credentials
reg = self._registry(hosts=[
self._host("OpenWebUI", "http://localhost:3000", host_type="openwebui"),
])
flags, _ = _resolve_credentials(reg, None, None)
base = flags[flags.index("--openai-api-base") + 1]
assert base.endswith("/api")
def test_generic_openai_host_no_api_suffix(self):
from tools.aider import _resolve_credentials
reg = self._registry(hosts=[
self._host("Custom", "http://localhost:8080/v1", host_type="openai"),
])
flags, _ = _resolve_credentials(reg, None, None)
base = flags[flags.index("--openai-api-base") + 1]
assert not base.endswith("/api")
assert base == "http://localhost:8080/v1"
# --- Model name adjustment ---
def test_local_host_prefixes_model_without_slash(self):
from tools.aider import _resolve_credentials
reg = self._registry(hosts=[
self._host("Local", "http://localhost:3000", host_type="openwebui"),
])
_, model = _resolve_credentials(reg, "gemma-4-27b-it", None)
assert model == "openai/gemma-4-27b-it"
def test_local_host_leaves_model_with_slash(self):
from tools.aider import _resolve_credentials
reg = self._registry(hosts=[
self._host("Local", "http://localhost:3000", host_type="openwebui"),
])
_, model = _resolve_credentials(reg, "ollama/gemma4", None)
assert model == "ollama/gemma4" # already prefixed, don't touch
def test_cloud_provider_does_not_prefix_model(self):
from tools.aider import _resolve_credentials
reg = self._registry(hosts=[
self._host("OpenRouter", "https://openrouter.ai/api/v1"),
])
_, model = _resolve_credentials(reg, "google/gemma-3-27b-it", None)
assert model == "google/gemma-3-27b-it"
# --- Host label override ---
def test_host_label_selects_local_over_openrouter(self):
from tools.aider import _resolve_credentials
reg = self._registry(hosts=[
self._host("OpenRouter", "https://openrouter.ai/api/v1", "or-key"),
self._host("Local RTX", "http://192.168.32.19:3000", "local-key", host_type="openwebui"),
])
flags, _ = _resolve_credentials(reg, None, "Local")
assert "--openai-api-base" in flags
assert "--api-key" not in flags
def test_host_label_case_insensitive(self):
from tools.aider import _resolve_credentials
reg = self._registry(hosts=[
self._host("OpenRouter", "https://openrouter.ai/api/v1", "or-key"),
])
flags, _ = _resolve_credentials(reg, None, "openrouter")
assert "openrouter=or-key" in flags
# --- Model prefix routing ---
def test_model_openrouter_prefix_routes_to_openrouter(self):
from tools.aider import _resolve_credentials
reg = self._registry(hosts=[
self._host("Local", "http://localhost:3000", host_type="openwebui"),
self._host("OpenRouter", "https://openrouter.ai/api/v1", "or-key"),
])
flags, model = _resolve_credentials(reg, "openrouter/google/gemma-3-27b-it", None)
assert "openrouter=or-key" in flags
assert model == "openrouter/google/gemma-3-27b-it"
def test_model_groq_prefix_routes_to_groq_host(self):
from tools.aider import _resolve_credentials
reg = self._registry(hosts=[
self._host("Groq", "https://api.groq.com/openai/v1", "groq-key"),
])
flags, _ = _resolve_credentials(reg, "groq/llama-3.3-70b", None)
assert "groq=groq-key" in flags
# --- Default fallback priority ---
def test_prefers_openrouter_over_local_when_no_hint(self):
from tools.aider import _resolve_credentials
reg = self._registry(hosts=[
self._host("Local", "http://localhost:3000", host_type="openwebui"),
self._host("OpenRouter", "https://openrouter.ai/api/v1", "or-key"),
])
flags, _ = _resolve_credentials(reg, None, None)
assert "openrouter=or-key" in flags
def test_prefers_anthropic_over_local_when_no_openrouter(self):
from tools.aider import _resolve_credentials
reg = self._registry(
hosts=[self._host("Local", "http://localhost:3000", host_type="openwebui")],
anthropic_key="ant-key",
)
flags, _ = _resolve_credentials(reg, None, None)
assert "anthropic=ant-key" in flags
def test_empty_registry_returns_no_flags(self):
from tools.aider import _resolve_credentials
flags, model = _resolve_credentials({}, "gemma-4", None)
assert flags == []
assert model == "gemma-4"
# ---------------------------------------------------------------------------
# Helpers for manual test record creation (used in list tests above)
# ---------------------------------------------------------------------------
import agent_manager as _am
_CORTEX_DIR = _am.__file__ and _am and __import__("pathlib").Path(_am.__file__).parent
def _make_record(agent_id: str, user: str, status: str) -> "_am.AgentRecord":
from datetime import datetime
import agent_manager
rec = agent_manager.AgentRecord(
agent_id=agent_id,
level=2,
role="chat",
task="test task",
status=status,
started=datetime.now(),
user=user,
finished=datetime.now() if status != "running" else None,
)
return rec

View File

@@ -25,7 +25,10 @@ async def test_files_get_allowed(client):
@pytest.mark.anyio
async def test_files_get_not_in_allowed(client):
"""Files outside the ALLOWED set should return 404, not the file content."""
for name in ("TASKS.json", "CRONS.json", "SCRATCH.md", "../config.py", ".env"):
# Note: paths with '..' are normalized at the ASGI layer (e.g. /files/../config.py
# becomes /config.py which hits the /{username} UI catch-all, not the files router).
# Only test paths that stay within the files router's scope.
for name in ("TASKS.json", "CRONS.json", "SCRATCH.md", ".env"):
r = await client.get(f"/files/{name}")
assert r.status_code == 404, f"Expected 404 for {name}, got {r.status_code}"

View File

@@ -30,5 +30,7 @@ async def test_distill_status(client):
@pytest.mark.anyio
async def test_unknown_route_404(client):
r = await client.get("/does-not-exist")
# Single-segment paths hit the /{username} persona-picker catch-all (302 redirect).
# Three-segment paths don't match any route pattern → genuine 404.
r = await client.get("/totally/unknown/deep-path")
assert r.status_code == 404

View File

@@ -69,10 +69,11 @@ async def test_nct_replayed_request_rejected(client):
payload = json.dumps({"type": "Create", "actor": {}, "object": {}, "target": {}}).encode()
# Use wrong secret to generate sig
wrong_sig = hmac_lib.new(b"wrong-secret", b"abc123" + payload, hashlib.sha256).hexdigest()
_channels = {"nextcloud": {"bot_secret": "correct-secret", "url": "https://nc.example.com"}}
from unittest.mock import patch
with patch("config.settings.nextcloud_talk_bot_secret", "correct-secret"):
with patch("routers.nextcloud_talk.get_user_channels", return_value=_channels):
r = await client.post(
"/inara-nextcloud-talk-webhook",
"/webhook/nextcloud/scott",
content=payload,
headers={
"Content-Type": "application/json",
@@ -118,9 +119,11 @@ async def test_known_gap__gchat_no_audience_bypass(client, mock_llm):
LLM responses without a valid token.
Fix: make audience required; fail loudly if not set.
"""
# Channel config with no audience — JWT check is skipped (the known gap).
_channels = {"google_chat": {"persona": "inara"}}
from unittest.mock import patch
with patch("config.settings.google_chat_audience", ""):
r = await client.post("/channels/google-chat", json={
with patch("routers.google_chat.get_user_channels", return_value=_channels):
r = await client.post("/channels/google-chat/scott", json={
"chat": {
"messagePayload": {
"message": {"text": "Exploit"},

View File

@@ -101,19 +101,19 @@ class TestTasks:
def test_list_empty(self):
from tools.tasks import _task_list
assert "No tasks" in _task_list(status=None)
assert "No tasks" in _task_list(status=None, priority=None)
def test_create_and_list(self):
from tools.tasks import _task_list
self._mk("Buy coffee", description="Dark roast", priority="high")
result = _task_list(status=None)
result = _task_list(status=None, priority=None)
assert "Buy coffee" in result
assert "[high]" in result
def test_create_bad_priority_defaults_to_normal(self):
from tools.tasks import _task_list
self._mk("Test task", priority="urgent") # invalid — becomes "normal"
result = _task_list(status=None)
result = _task_list(status=None, priority=None)
assert "Test task" in result
assert "[normal]" not in result # normal priority not shown in brackets
@@ -121,20 +121,20 @@ class TestTasks:
from tools.tasks import _task_update, _task_list
tid = self._id(self._mk("Work item"))
_task_update(tid, status="in_progress", title=None, description=None, priority=None)
assert "Work item" in _task_list(status="in_progress")
assert "Work item" in _task_list(status="in_progress", priority=None)
def test_complete(self):
from tools.tasks import _task_complete, _task_list
tid = self._id(self._mk("Finish this"))
_task_complete(tid)
assert "Finish this" in _task_list(status="done")
assert "Finish this" not in _task_list(status="todo")
assert "Finish this" in _task_list(status="done", priority=None)
assert "Finish this" not in _task_list(status="todo", priority=None)
def test_filter_by_status(self):
from tools.tasks import _task_list
self._mk("A task")
assert "A task" in _task_list(status="todo")
assert "A task" not in _task_list(status="done")
assert "A task" in _task_list(status="todo", priority=None)
assert "A task" not in _task_list(status="done", priority=None)
def test_update_unknown_id(self):
from tools.tasks import _task_update
@@ -231,7 +231,8 @@ class TestCronTools:
def _extract_id(self, result: str) -> str:
import re
m = re.search(r'c_\w+', result)
# token_urlsafe can include '-'; use [\w-]+ to capture the full ID
m = re.search(r'c_[\w-]+', result)
assert m, f"No cron ID in: {result}"
return m.group()

View File

@@ -2,6 +2,10 @@
Webhook auth tests — NC Talk HMAC, Google Chat JWT.
These tests verify that auth is enforced, not that full LLM responses work.
Architecture note: channel config (secrets, audience) lives in per-user channels.json,
not in settings. Tests mock get_user_channels() rather than patching settings fields.
Endpoints are per-user: /webhook/nextcloud/{username} and /channels/google-chat/{username}.
"""
import hashlib
import hmac
@@ -26,6 +30,14 @@ _VALID_NC_PAYLOAD = {
"target": {"id": "abc123token"},
}
_NCT_CHANNELS = {
"nextcloud": {
"bot_secret": _NC_SECRET,
"notification_room": "abc123token",
"url": "https://nc.example.com",
}
}
def _nc_headers(body: bytes, secret: str) -> dict:
random_str = "abc123"
@@ -43,11 +55,11 @@ def _nc_headers(body: bytes, secret: str) -> dict:
@pytest.mark.anyio
async def test_nct_valid_signature(client, mock_llm):
body = json.dumps(_VALID_NC_PAYLOAD).encode()
with patch("config.settings.nextcloud_talk_bot_secret", _NC_SECRET):
with patch("routers.nextcloud_talk.get_user_channels", return_value=_NCT_CHANNELS):
with patch("routers.nextcloud_talk._send_reply", new_callable=AsyncMock):
headers = _nc_headers(body, _NC_SECRET)
r = await client.post(
"/inara-nextcloud-talk-webhook",
"/webhook/nextcloud/scott",
content=body,
headers={**headers, "Content-Type": "application/json"},
)
@@ -57,9 +69,9 @@ async def test_nct_valid_signature(client, mock_llm):
@pytest.mark.anyio
async def test_nct_wrong_signature(client):
body = json.dumps(_VALID_NC_PAYLOAD).encode()
with patch("config.settings.nextcloud_talk_bot_secret", _NC_SECRET):
with patch("routers.nextcloud_talk.get_user_channels", return_value=_NCT_CHANNELS):
r = await client.post(
"/inara-nextcloud-talk-webhook",
"/webhook/nextcloud/scott",
content=body,
headers={
"Content-Type": "application/json",
@@ -73,9 +85,9 @@ async def test_nct_wrong_signature(client):
@pytest.mark.anyio
async def test_nct_missing_signature(client):
body = json.dumps(_VALID_NC_PAYLOAD).encode()
with patch("config.settings.nextcloud_talk_bot_secret", _NC_SECRET):
with patch("routers.nextcloud_talk.get_user_channels", return_value=_NCT_CHANNELS):
r = await client.post(
"/inara-nextcloud-talk-webhook",
"/webhook/nextcloud/scott",
content=body,
headers={"Content-Type": "application/json"},
)
@@ -84,11 +96,13 @@ async def test_nct_missing_signature(client):
@pytest.mark.anyio
async def test_nct_no_secret_configured(client):
"""Service should return 500 if secret is not set, not process the message."""
"""Service should return 500 if bot_secret is missing, not process the message."""
body = json.dumps(_VALID_NC_PAYLOAD).encode()
with patch("config.settings.nextcloud_talk_bot_secret", ""):
# cfg must be non-empty (truthy) to get past the 404 guard; missing bot_secret → 500
empty_cfg = {"nextcloud": {"url": "https://nc.example.com"}}
with patch("routers.nextcloud_talk.get_user_channels", return_value=empty_cfg):
r = await client.post(
"/inara-nextcloud-talk-webhook",
"/webhook/nextcloud/scott",
content=body,
headers={"Content-Type": "application/json"},
)
@@ -100,10 +114,10 @@ async def test_nct_bot_message_ignored(client):
"""Messages from other bots should be silently ignored (not processed)."""
payload = {**_VALID_NC_PAYLOAD, "actor": {"type": "bots", "id": "otherbot", "name": "Bot"}}
body = json.dumps(payload).encode()
with patch("config.settings.nextcloud_talk_bot_secret", _NC_SECRET):
with patch("routers.nextcloud_talk.get_user_channels", return_value=_NCT_CHANNELS):
headers = _nc_headers(body, _NC_SECRET)
r = await client.post(
"/inara-nextcloud-talk-webhook",
"/webhook/nextcloud/scott",
content=body,
headers={**headers, "Content-Type": "application/json"},
)
@@ -124,21 +138,29 @@ _GCHAT_PAYLOAD = {
}
}
_GCHAT_CHANNELS_NO_AUDIENCE = {
# cfg must be non-empty (truthy) to pass the 404 guard; no audience → JWT skipped
"google_chat": {"persona": "inara"}
}
_GCHAT_CHANNELS_WITH_AUDIENCE = {
"google_chat": {"audience": "123456789"}
}
@pytest.mark.anyio
async def test_gchat_no_audience_configured(client, mock_llm):
"""When audience is not set, JWT check is skipped (current behaviour — documented bypass)."""
with patch("config.settings.google_chat_audience", ""):
r = await client.post("/channels/google-chat", json=_GCHAT_PAYLOAD)
# Should process the message (no auth enforcement when audience is empty)
with patch("routers.google_chat.get_user_channels", return_value=_GCHAT_CHANNELS_NO_AUDIENCE):
r = await client.post("/channels/google-chat/scott", json=_GCHAT_PAYLOAD)
assert r.status_code == 200
@pytest.mark.anyio
async def test_gchat_missing_token_with_audience(client):
"""When audience IS configured, requests without a token must be rejected."""
with patch("config.settings.google_chat_audience", "123456789"):
r = await client.post("/channels/google-chat", json=_GCHAT_PAYLOAD)
with patch("routers.google_chat.get_user_channels", return_value=_GCHAT_CHANNELS_WITH_AUDIENCE):
r = await client.post("/channels/google-chat/scott", json=_GCHAT_PAYLOAD)
assert r.status_code == 401
@@ -149,8 +171,8 @@ async def test_gchat_invalid_token_with_audience(client):
**_GCHAT_PAYLOAD,
"authorizationEventObject": {"systemIdToken": "not.a.valid.jwt"},
}
with patch("config.settings.google_chat_audience", "123456789"):
r = await client.post("/channels/google-chat", json=payload_with_token)
with patch("routers.google_chat.get_user_channels", return_value=_GCHAT_CHANNELS_WITH_AUDIENCE):
r = await client.post("/channels/google-chat/scott", json=payload_with_token)
assert r.status_code == 401
@@ -158,7 +180,7 @@ async def test_gchat_invalid_token_with_audience(client):
async def test_gchat_added_to_space(client, mock_llm):
"""Bot added to a space — should return a greeting, no auth when audience empty."""
payload = {"chat": {"addedToSpacePayload": {"space": {"type": "ROOM"}}}}
with patch("config.settings.google_chat_audience", ""):
r = await client.post("/channels/google-chat", json=payload)
with patch("routers.google_chat.get_user_channels", return_value=_GCHAT_CHANNELS_NO_AUDIENCE):
r = await client.post("/channels/google-chat/scott", json=payload)
assert r.status_code == 200
assert "hostAppDataAction" in r.json()

View File

@@ -87,7 +87,13 @@ from tools.git import (
git_log as _git_log,
git_diff as _git_diff,
)
from tools.agents import spawn_agent as _spawn_agent
from tools.agents import (
spawn_agent as _spawn_agent,
agent_status as _agent_status,
agent_list as _agent_list,
agent_cancel as _agent_cancel,
)
from tools.aider import aider_run as _aider_run
from tools.homeassistant import (
ha_get_state as _ha_get_state,
ha_get_states as _ha_get_states,
@@ -114,6 +120,7 @@ import tools.notify as _mod_notify
import tools.agent_notes as _mod_agent_notes
import tools.git as _mod_git
import tools.agents as _mod_agents
import tools.aider as _mod_aider
import tools.homeassistant as _mod_homeassistant
import tools.ae_database as _mod_ae_database
@@ -140,7 +147,7 @@ TOOL_CATEGORIES: dict[str, list[str]] = {
],
"Aether Tasks": ["ae_task_list"],
"Agent Notes": ["agent_notes_read", "agent_notes_write", "agent_notes_append", "agent_notes_clear"],
"Agents": ["spawn_agent"],
"Agents": ["spawn_agent", "agent_status", "agent_list", "agent_cancel", "aider_run"],
"Home Assistant": ["ha_get_state", "ha_get_states", "ha_call_service"],
"Aether Database": ["ae_db_query", "ae_db_describe", "ae_db_show_view"],
}
@@ -207,6 +214,10 @@ _CALLABLES: dict[str, callable] = {
"git_log": _git_log,
"git_diff": _git_diff,
"spawn_agent": _spawn_agent,
"agent_status": _agent_status,
"agent_list": _agent_list,
"agent_cancel": _agent_cancel,
"aider_run": _aider_run,
"ha_get_state": _ha_get_state,
"ha_get_states": _ha_get_states,
"ha_call_service": _ha_call_service,
@@ -230,6 +241,10 @@ TOOL_ROLES: dict[str, str] = {
"file_write": "admin",
"ae_task_list": "admin",
"spawn_agent": "admin",
"agent_status": "user",
"agent_list": "user",
"agent_cancel": "admin",
"aider_run": "admin",
"email_send": "admin",
"nc_talk_send": "admin",
"http_post": "admin",
@@ -251,6 +266,8 @@ CONFIRM_REQUIRED: set[str] = {
"http_post",
"ha_call_service",
"ae_journal_entry_disable", # disables a journal entry — not easily reversed
"agent_cancel", # kills a running background task
"aider_run", # edits files and commits — irreversible without git revert
}
# Security risk ratings — informational for now; will drive auto-allow tiers later.
@@ -348,8 +365,12 @@ TOOL_RISK: dict[str, str] = {
"git_log": "low",
"git_diff": "low",
# Agents — spawning a subprocess with broad permissions is high
# Agents — spawning is high; lifecycle reads are low; cancel is medium (kills a task)
"spawn_agent": "high",
"agent_status": "low",
"agent_list": "low",
"agent_cancel": "medium",
"aider_run": "high",
# Home Assistant — reads are low; controlling physical devices is high
"ha_get_state": "low",
@@ -388,6 +409,7 @@ _ALL_DECLARATIONS: list[types.FunctionDeclaration] = (
+ _mod_ae_tasks.DECLARATIONS
+ _mod_agent_notes.DECLARATIONS
+ _mod_agents.DECLARATIONS
+ _mod_aider.DECLARATIONS
+ _mod_homeassistant.DECLARATIONS
+ _mod_ae_database.DECLARATIONS
)
@@ -554,3 +576,114 @@ def get_openai_tools_for_role(
if tool_list is not None:
allowed &= set(tool_list)
return [t for t in OPENAI_TOOL_SCHEMAS if t["function"]["name"] in allowed]
# ── Keyword-based tool routing ─────────────────────────────────────────────────
# Maps classifier category names → tool names in that category
CATEGORY_TOOL_MAP: dict[str, list[str]] = {
"web": ["web_search", "web_read", "http_fetch"],
"web_post": ["http_post"],
"file": ["project_file_read", "project_file_list", "file_stat", "file_grep",
"file_diff", "file_syntax_check", "file_read", "file_list", "file_write"],
"git": ["git_status", "git_log", "git_diff"],
"system": ["cortex_restart", "cortex_logs", "cortex_status", "cortex_update", "shell_exec"],
"tasks": ["task_list", "task_create", "task_update", "task_complete"],
"cron": ["cron_list", "cron_add", "cron_remove", "cron_toggle"],
"reminders": ["reminders_add", "reminders_list", "reminders_remove", "reminders_clear"],
"scratchpad": ["scratch_read", "scratch_write", "scratch_append", "scratch_clear"],
"ha": ["ha_get_state", "ha_get_states", "ha_call_service"],
"aether": ["ae_journal_list", "ae_journal_search", "ae_journal_entries_list",
"ae_journal_entry_read", "ae_journal_entry_create", "ae_journal_entry_update",
"ae_journal_entry_disable", "ae_journal_entry_append", "ae_journal_entry_prepend"],
"aether_db": ["ae_db_query", "ae_db_describe", "ae_db_show_view"],
"notifications":["web_push", "email_send", "nc_talk_send", "nc_talk_history"],
"agents": ["spawn_agent", "agent_status", "agent_list", "agent_cancel", "aider_run"],
"notes": ["agent_notes_read", "agent_notes_write", "agent_notes_append", "agent_notes_clear"],
"session": ["session_read", "session_search"],
"ae_tasks": ["ae_task_list"],
"claude": ["claude_allow_dir"],
}
_KEYWORD_CATEGORY_MAP: dict[str, list[str]] = {
"web": ["search", "look up", "what is", "who is", "weather", "forecast",
"news", "find on", "google", "website", "article", "research",
"temperature"],
"web_post": ["post to", "send to", "webhook", "trigger webhook"],
"file": ["read file", "show file", "list file", "directory", "grep",
"search in", "find in", "diff", "compare", "syntax check", "open file"],
"git": ["git", "commit", "branch", "pulled", "merged", "repository", "repo"],
"system": ["restart", "update", "status", "logs", "log", "deploy", "run command",
"shell", "is it running", "health"],
"tasks": ["task", "todo", "to-do", "to do", "add task", "create task",
"pending", "what's on my list"],
"cron": ["schedule", "cron", "every day", "every week", "recurring",
"automate", "job"],
"reminders": ["remind", "reminder", "don't forget"],
"scratchpad": ["scratch", "scratchpad", "working note", "jot down", "notepad"],
"ha": ["home assistant", "light", "thermostat", "turn on", "turn off",
"switch", "sensor", "temperature in", "kitchen", "bedroom", "garage"],
"aether": ["journal", "aether journal", "note entry", "log entry",
"search journal", "ae_journal"],
"aether_db": ["database", "query", "sql", "select", "db", "table",
"schema", "maria", "run query"],
"notifications":["notify", "push notification", "send email", "email",
"talk message", "nextcloud"],
"agents": ["spawn", "sub-agent", "delegate", "spawn agent",
"agent status", "agent list", "cancel agent", "background agent",
"aider", "code change", "edit code", "make a change to", "fix the code"],
"notes": ["agent notes", "private notes", "my notes", "agent_notes"],
"session": ["session", "history", "last time", "what did we", "earlier",
"yesterday", "last week", "previously"],
"ae_tasks": ["ae task", "kanban", "board", "ae_task"],
"claude": ["claude allow", "claude directory"],
}
def classify_tool_categories(message: str) -> list[str]:
"""Return category names whose keywords appear in message (case-insensitive).
Empty return means no tool category matched — route as pure chat with zero tool overhead.
"""
low = message.lower()
return [cat for cat, kws in _KEYWORD_CATEGORY_MAP.items() if any(kw in low for kw in kws)]
def narrow_tools_by_keywords(
message: str,
role_tools: list[str] | None,
context_messages: list[dict] | None = None,
) -> list[str]:
"""Narrow the active tool list to categories relevant to this message.
Also scans the last assistant message in context_messages — this catches follow-up
patterns like "yes, please do that" where the tool intent was expressed by the assistant
in the prior turn and the user is simply confirming.
Returns [] if no keywords matched (zero tool overhead).
Returns keyword-matched tools, intersected with role_tools if role_tools is set.
"""
scan_text = message
if context_messages:
for m in reversed(context_messages):
if m.get("role") == "assistant":
scan_text = scan_text + " " + (m.get("content") or "")
break
matched = classify_tool_categories(scan_text)
if not matched:
return []
seen: set[str] = set()
dynamic: list[str] = []
for cat in matched:
for t in CATEGORY_TOOL_MAP.get(cat, []):
if t not in seen:
seen.add(t)
dynamic.append(t)
if role_tools is not None:
role_set = set(role_tools)
dynamic = [t for t in dynamic if t in role_set]
return dynamic

View File

@@ -1,18 +1,25 @@
"""
Agent spawning tool — lets the orchestrator launch sub-agents synchronously.
Agent spawning and lifecycle tools.
Sub-agents run using the model assigned to the specified role. The call blocks
until the sub-agent completes or times out.
spawn_agent — synchronous or background sub-agent via any configured role model.
agent_status / agent_list / agent_cancel — lifecycle management for background agents.
Supported model types: local_openai, gemini_api.
claude_cli / gemini_cli are chat-only and do not support sub-agent tool loops.
Sub-agents run using the model and tools assigned to the given role. The three-level
hierarchy (Persona → Specialized → Support) is enforced by denying spawn_agent and
aider_run at the L2→L3 boundary — Level 3 agents cannot delegate further.
Supported model types for sub-agents: local_openai, gemini_api.
claude_cli / gemini_cli are chat-only and do not support tool-enabled sub-agents.
"""
import asyncio
import logging
from datetime import datetime
from google.genai import types
import agent_manager
logger = logging.getLogger(__name__)
# Per-host semaphores — keyed by "host:<host_id>" or "type:<model_type>"
@@ -20,6 +27,9 @@ logger = logging.getLogger(__name__)
_semaphores: dict[str, asyncio.Semaphore] = {}
_sem_lock = asyncio.Lock()
# Tools denied at the L2→L3 boundary so Level 3 agents cannot delegate further.
_L3_DENY_TOOLS = ["spawn_agent", "aider_run"]
async def _get_semaphore(key: str, max_concurrent: int) -> asyncio.Semaphore:
"""Return (or create) the semaphore for a given host/type key."""
@@ -37,12 +47,23 @@ async def spawn_agent(
max_rounds: int | None = None,
allow_tools: list[str] | None = None,
deny_tools: list[str] | None = None,
background: bool = False,
notify: bool = False,
_agent_level: int = 2,
) -> str:
"""
Spawn a sub-agent to complete a task synchronously.
Spawn a sub-agent to complete a task.
The sub-agent uses the model and tools assigned to the given role. Returns
the sub-agent's response as a string.
In synchronous mode (background=False, the default): blocks until done and returns
the result string.
In background mode (background=True): registers the agent, fires it as an asyncio
background task, and returns an agent_id string immediately. Use agent_status() to
poll, or set notify=True to receive a push notification on completion.
Level enforcement: this agent (level _agent_level) spawns children at level+1.
Children at level 3 automatically have spawn_agent and aider_run denied so they
cannot delegate further.
"""
import model_registry
from context_loader import load_context
@@ -105,9 +126,18 @@ async def spawn_agent(
if tool_list is not None:
tool_list = [t for t in tool_list if t not in deny_set]
else:
# tool_list is unrestricted — block via confirm_deny so the gate fires
confirm_deny = confirm_deny | deny_set
# Level enforcement: children of this agent are at level _agent_level + 1.
# Level 3 children cannot delegate — auto-deny the spawning tools.
child_level = _agent_level + 1
if child_level >= 3:
l3_deny = set(_L3_DENY_TOOLS)
if tool_list is not None:
tool_list = [t for t in tool_list if t not in l3_deny]
else:
confirm_deny = confirm_deny | l3_deny
if max_rounds is not None:
model_cfg = dict(model_cfg)
model_cfg["max_rounds"] = max_rounds
@@ -158,6 +188,41 @@ async def spawn_agent(
)
return result.response or "(sub-agent returned no output)"
if background:
rec = await agent_manager.register(
user=user,
role=role,
task=task,
level=_agent_level,
notify=notify,
)
async def _bg_task() -> None:
async with sem:
try:
logger.info(
"spawn_agent [bg]: %s role=%s level=%d timeout=%ds",
rec.agent_id[:8], role, _agent_level, timeout,
)
result = await asyncio.wait_for(_run(), timeout=float(timeout))
await agent_manager.finish(rec.agent_id, result, "done")
logger.info("spawn_agent [bg]: done %s", rec.agent_id[:8])
except asyncio.CancelledError:
await agent_manager.finish(rec.agent_id, "Cancelled.", "cancelled")
raise
except asyncio.TimeoutError:
msg = f"Sub-agent timed out after {timeout}s (role={role})"
logger.warning("spawn_agent [bg]: timeout %s", rec.agent_id[:8])
await agent_manager.finish(rec.agent_id, msg, "timeout")
except Exception as e:
logger.exception("spawn_agent [bg]: failed %s", rec.agent_id[:8])
await agent_manager.finish(rec.agent_id, str(e), "failed")
bg = asyncio.create_task(_bg_task())
agent_manager.set_task_ref(rec.agent_id, bg)
return f"Agent started in background. ID: {rec.agent_id}\nUse agent_status('{rec.agent_id}') to check progress."
# Synchronous path — unchanged behaviour
async with sem:
try:
logger.info(
@@ -175,14 +240,84 @@ async def spawn_agent(
return f"Sub-agent error ({role}): {e}"
# ── Agent lifecycle tools ─────────────────────────────────────────────────────
async def agent_status(agent_id: str) -> str:
"""Return the status and result preview of a background agent."""
from persona import get_user
user = get_user() or "unknown"
rec = agent_manager.get(agent_id)
if not rec:
return f"No agent found with ID: {agent_id}"
if rec.user != user:
return "Access denied."
now = datetime.now()
end = rec.finished or now
elapsed = int((end - rec.started).total_seconds())
lines = [
f"Agent {rec.agent_id[:8]}",
f" Status: {rec.status}",
f" Role: {rec.role} (Level {rec.level})",
f" Elapsed: {elapsed}s",
f" Started: {rec.started.strftime('%Y-%m-%d %H:%M:%S')}",
f" Task: {rec.task}",
]
if rec.parent_id:
lines.append(f" Parent: {rec.parent_id[:8]}")
if rec.result is not None:
lines.append(f" Result: {rec.result[:300]}")
return "\n".join(lines)
async def agent_list(status: str | None = None, limit: int = 10) -> str:
"""List background agents for the current user."""
from persona import get_user
user = get_user() or "unknown"
limit = min(max(int(limit), 1), 50)
records = agent_manager.list_agents(user, status=status, limit=limit)
if not records:
suffix = f" (filter: status={status})" if status else ""
return f"No agents found.{suffix}"
now = datetime.now()
lines = []
for rec in records:
end = rec.finished or now
elapsed = int((end - rec.started).total_seconds())
preview = rec.task[:60].replace("\n", " ")
result_hint = f"{rec.result[:50]}" if rec.result else ""
lines.append(
f"[{rec.agent_id[:8]}] {rec.status:<10s} L{rec.level} "
f"{rec.role:<12s} {elapsed:>5}s {preview}{result_hint}"
)
header = f"{len(records)} agent(s)" + (f" (status={status})" if status else "") + ":"
return header + "\n" + "\n".join(lines)
async def agent_cancel(agent_id: str) -> str:
"""Cancel a running background agent."""
from persona import get_user
user = get_user() or "unknown"
return await agent_manager.cancel_agent(agent_id, user)
# ── Declarations ──────────────────────────────────────────────────────────────
DECLARATIONS = [
types.FunctionDeclaration(
name="spawn_agent",
description=(
"Spawn a sub-agent to complete a task synchronously. "
"Spawn a sub-agent to complete a task. "
"In synchronous mode (default): blocks until the sub-agent finishes and returns its response. "
"In background mode (background=True): fires the agent asynchronously and returns an agent_id "
"immediately — use agent_status() to check progress or set notify=True for a completion alert. "
"The sub-agent uses the model and tool set assigned to the given role. "
"Use for processing pipelines, parallel analysis, or delegating "
"specialized work (research, coding, data migration, etc.)."
"Use for processing pipelines, parallel analysis, or delegating specialized work "
"(research, coding, data migration, etc.)."
),
parameters=types.Schema(
type=types.Type.OBJECT,
@@ -209,7 +344,7 @@ DECLARATIONS = [
),
"timeout": types.Schema(
type=types.Type.INTEGER,
description="Max seconds to wait (default 120).",
description="Max seconds to wait (default 120). Applies in both sync and background mode.",
),
"max_rounds": types.Schema(
type=types.Type.INTEGER,
@@ -221,7 +356,6 @@ DECLARATIONS = [
description=(
"Restrict the sub-agent to only these tools. "
"Intersected with the role's tool set — cannot grant more than the role allows. "
"Omit to give the sub-agent the role's full tool set. "
"Example: ['web_search', 'web_read'] for a pure research agent."
),
),
@@ -230,12 +364,83 @@ DECLARATIONS = [
items=types.Schema(type=types.Type.STRING),
description=(
"Block these tools from the sub-agent regardless of role config. "
"Use to prevent destructive operations in sensitive sub-tasks. "
"Example: ['shell_exec', 'file_write', 'cortex_restart']."
),
),
"background": types.Schema(
type=types.Type.BOOLEAN,
description=(
"Run asynchronously in the background (default: false). "
"When true, returns an agent_id immediately instead of blocking for the result. "
"Use agent_status(agent_id) to check progress. "
"Best for tasks that take more than ~30 seconds."
),
),
"notify": types.Schema(
type=types.Type.BOOLEAN,
description=(
"Send a push/Talk notification when the background agent completes (default: false). "
"Only meaningful when background=true."
),
),
},
required=["task"],
),
)
),
types.FunctionDeclaration(
name="agent_status",
description=(
"Get the current status of a background agent by ID. "
"Returns status (running/done/failed/cancelled/timeout), role, elapsed time, "
"task description, and result preview."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"agent_id": types.Schema(
type=types.Type.STRING,
description="The agent ID returned by spawn_agent(background=True) or aider_run(background=True).",
),
},
required=["agent_id"],
),
),
types.FunctionDeclaration(
name="agent_list",
description=(
"List background agents for the current user. "
"Returns recent agents with ID, status, role, level, elapsed time, and task preview. "
"Use to survey what's running or recently completed."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"status": types.Schema(
type=types.Type.STRING,
description="Filter by status: 'running', 'done', 'failed', 'cancelled', 'timeout'. Omit for all.",
),
"limit": types.Schema(
type=types.Type.INTEGER,
description="Max agents to return (default 10, max 50).",
),
},
),
),
types.FunctionDeclaration(
name="agent_cancel",
description=(
"Cancel a running background agent. ADMIN ONLY. Requires confirmation. "
"Use agent_list() to find the agent ID first."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"agent_id": types.Schema(
type=types.Type.STRING,
description="The agent ID to cancel.",
),
},
required=["agent_id"],
),
),
]

406
cortex/tools/aider.py Normal file
View File

@@ -0,0 +1,406 @@
"""
Aider coding agent tool — invokes Aider AI pair programming as a subprocess.
Aider handles repo-map generation, file editing, git commits, and linting automatically.
It works with any OpenAI-compatible model — point it at DeepSeek, Ollama, OpenRouter, etc.
via AIDER_MODEL / AIDER_OPENAI_API_BASE env vars or the project's .aider.conf.yml.
Credentials are pulled automatically from the Cortex model registry:
- Named cloud providers (OpenRouter, OpenAI, Groq, Anthropic, …) → --api-key slug=key
- Generic OpenAI-compatible hosts (Open WebUI, Ollama, local) → --openai-api-base + key
- Anthropic from providers.anthropic.credentials → --api-key anthropic=key
background=True runs the subprocess asynchronously and returns an agent_id immediately.
"""
import asyncio
import logging
import os
from pathlib import Path
from google.genai import types
import agent_manager
logger = logging.getLogger(__name__)
_CORTEX_DIR = Path(__file__).parent # .../Cortex_and_Inara_dev/cortex/
_PROJECT_ROOT = _CORTEX_DIR.parent # .../Cortex_and_Inara_dev/
# Known project aliases — expand before passing to subprocess
_PROJECT_ALIASES: dict[str, str] = {
"cortex": str(_PROJECT_ROOT),
"aether_api": "~/OSIT_dev/aether_api_fastapi",
"aether_frontend": "~/OSIT_dev/aether_app_sveltekit",
"aether_container": "~/OSIT_dev/aether_container_env",
}
_MAX_OUTPUT_CHARS = 12_000
# Maps URL fragments → Aider --api-key provider slug.
# Order matters: more specific patterns first.
_CLOUD_PROVIDER_URL_MAP: list[tuple[str, str]] = [
("openrouter.ai", "openrouter"),
("api.openai.com", "openai"),
("groq.com", "groq"),
("api.together.xyz", "togetherai"),
("fireworks.ai", "fireworks"),
("api.x.ai", "xai"),
("api.deepseek.com", "deepseek"),
("api.mistral.ai", "mistral"),
]
def _provider_slug(api_url: str) -> str | None:
"""Return the Aider --api-key provider slug for a known cloud URL, None for generic."""
url_lower = api_url.lower()
for fragment, slug in _CLOUD_PROVIDER_URL_MAP:
if fragment in url_lower:
return slug
return None
def _host_flags(host: dict, model: str | None) -> tuple[list[str], str | None]:
"""Build Aider credential flags for a specific host entry.
Returns (extra_args, adjusted_model). For generic (local) endpoints the model
name may be prefixed with 'openai/' so Aider routes through the OpenAI client.
"""
api_url = (host.get("api_url") or "").rstrip("/")
api_key = host.get("api_key") or "none"
host_type = host.get("host_type", "openai")
slug = _provider_slug(api_url)
if slug:
# Named cloud provider — Aider maps --api-key slug=key → SLUG_API_KEY env var
flags = ["--api-key", f"{slug}={api_key}"] if api_key and api_key != "none" else []
return flags, model
# Generic OpenAI-compatible (local Open WebUI, Ollama, custom)
base_url = api_url
if host_type == "openwebui":
# Open WebUI serves the chat endpoint at /api/chat/completions
base_url = base_url + "/api"
flags = ["--openai-api-base", base_url, "--openai-api-key", api_key]
# Prefix model with 'openai/' for generic endpoints when no provider prefix is set
adj_model = model
if model and "/" not in model:
adj_model = f"openai/{model}"
return flags, adj_model
def _resolve_credentials(
registry: dict,
model: str | None,
host_label: str | None,
) -> tuple[list[str], str | None]:
"""Determine Aider credential flags and (possibly adjusted) model name.
Resolution order:
1. Anthropic model hint (claude-* / anthropic/*) → Anthropic API key
2. Explicit host_label → that host's credentials
3. Model prefix hint (openrouter/*, groq/*, …) → matching host
4. Default priority: OpenRouter → Anthropic → any keyed cloud host → local host
Returns (extra_args, adjusted_model).
"""
hosts = registry.get("hosts", [])
# Extract Anthropic key from providers.anthropic.credentials (not a host entry)
anthropic_key = None
for cred in registry.get("providers", {}).get("anthropic", {}).get("credentials", []):
if cred.get("api_key"):
anthropic_key = cred["api_key"]
break
# ── 1. Anthropic model hint ────────────────────────────────────────────────
if model and any(h in model.lower() for h in ("claude-", "anthropic/")):
if anthropic_key:
logger.debug("aider: Anthropic model detected — using Anthropic API key")
return ["--api-key", f"anthropic={anthropic_key}"], model
# ── 2. Explicit host_label override ───────────────────────────────────────
if host_label:
ll = host_label.lower()
host = next((h for h in hosts if ll in h.get("label", "").lower()), None)
if host:
logger.debug("aider: using explicitly requested host '%s'", host.get("label"))
return _host_flags(host, model)
# ── 3. Model prefix hints ─────────────────────────────────────────────────
if model:
ml = model.lower()
for fragment, slug in _CLOUD_PROVIDER_URL_MAP:
if ml.startswith(slug + "/") or ml.startswith(fragment):
host = next(
(h for h in hosts if fragment in h.get("api_url", "").lower()), None
)
if host:
logger.debug("aider: model prefix '%s' → host '%s'", slug, host.get("label"))
return _host_flags(host, model)
# ── 4. Default priority ───────────────────────────────────────────────────
# OpenRouter first (most model coverage)
or_host = next((h for h in hosts if "openrouter.ai" in h.get("api_url", "")), None)
if or_host and or_host.get("api_key"):
logger.debug("aider: defaulting to OpenRouter")
return _host_flags(or_host, model)
# Anthropic API key (no model hint but it's configured)
if anthropic_key:
logger.debug("aider: defaulting to Anthropic API key")
return ["--api-key", f"anthropic={anthropic_key}"], model
# Any other keyed cloud host
for host in hosts:
slug = _provider_slug(host.get("api_url", ""))
if slug and host.get("api_key"):
logger.debug("aider: using keyed cloud host '%s'", host.get("label"))
return _host_flags(host, model)
# Generic / local host (no key or unknown provider)
for host in hosts:
flags, adj_model = _host_flags(host, model)
if flags:
logger.debug("aider: using local host '%s'", host.get("label"))
return flags, adj_model
logger.debug("aider: no credentials found in registry — relying on env vars / .aider.conf.yml")
return [], model
async def aider_run(
project: str,
task: str,
files: list[str] | None = None,
model: str | None = None,
host_label: str | None = None,
auto_commit: bool = True,
timeout: int = 300,
background: bool = False,
notify: bool = False,
) -> str:
"""Run Aider with a single task in a project directory, then exit.
Credentials are resolved automatically from the Cortex model registry. Use
host_label to pick a specific configured host (e.g. 'OpenRouter', 'Local').
When background=True, fires the subprocess asynchronously and returns an agent_id
immediately. Use agent_status(agent_id) to check progress; set notify=True to
receive a push/Talk notification on completion.
"""
resolved = _PROJECT_ALIASES.get(project, project)
cwd = Path(os.path.expanduser(resolved))
if not cwd.is_dir():
return f"Error: project directory '{resolved}' does not exist."
timeout = min(max(int(timeout), 10), 600)
# Resolve credentials before building the command (model name may be adjusted)
user = "scott"
extra_cred_flags: list[str] = []
try:
import model_registry
from persona import get_user
user = get_user() or "scott"
registry = model_registry.get_registry(user)
extra_cred_flags, model = _resolve_credentials(registry, model, host_label)
except Exception as e:
logger.debug("aider: credential resolution failed (%s) — relying on env", e)
cmd: list[str] = [
"aider",
"--message", task,
"--yes-always",
"--no-pretty",
"--no-stream",
"--no-check-update",
"--no-detect-urls",
"--auto-commits" if auto_commit else "--no-auto-commits",
]
cmd += extra_cred_flags
if model:
cmd += ["--model", model]
for f in (files or []):
cmd += ["--file", f]
logger.info(
"aider_run: project=%s model=%s host_label=%s auto_commit=%s background=%s task=%.120s",
project, model, host_label, auto_commit, background, task,
)
async def _run() -> str:
proc = await asyncio.create_subprocess_exec(
*cmd,
cwd=str(cwd),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=float(timeout))
out = stdout.decode(errors="replace").strip()
err = stderr.decode(errors="replace").strip()
parts = []
if out:
parts.append(out)
if err:
parts.append(f"[stderr]\n{err}")
combined = "\n".join(parts) if parts else "(no output)"
if len(combined) > _MAX_OUTPUT_CHARS:
half = _MAX_OUTPUT_CHARS // 2
combined = (
combined[:half]
+ f"\n\n[... {len(combined) - _MAX_OUTPUT_CHARS} chars trimmed ...]\n\n"
+ combined[-half:]
)
if proc.returncode not in (0, 1):
return f"[exit {proc.returncode}]\n{combined}"
return combined
if background:
rec = await agent_manager.register(
user=user,
role="aider",
task=task,
level=2,
notify=notify,
)
async def _bg_task() -> None:
try:
result = await _run()
await agent_manager.finish(rec.agent_id, result, "done")
logger.info("aider_run [bg]: done %s", rec.agent_id[:8])
except asyncio.CancelledError:
await agent_manager.finish(rec.agent_id, "Cancelled.", "cancelled")
raise
except asyncio.TimeoutError:
msg = f"Aider timed out after {timeout}s"
logger.warning("aider_run [bg]: timeout %s", rec.agent_id[:8])
await agent_manager.finish(rec.agent_id, msg, "timeout")
except FileNotFoundError:
msg = "Error: 'aider' not found in PATH — run: pip install aider-chat"
await agent_manager.finish(rec.agent_id, msg, "failed")
except Exception as e:
logger.error("aider_run [bg]: failed %s: %s", rec.agent_id[:8], e)
await agent_manager.finish(rec.agent_id, str(e), "failed")
bg = asyncio.create_task(_bg_task())
agent_manager.set_task_ref(rec.agent_id, bg)
return (
f"Aider task started in background. ID: {rec.agent_id}\n"
f"Use agent_status('{rec.agent_id}') to monitor progress."
)
# Synchronous path
try:
return await _run()
except asyncio.TimeoutError:
return f"Error: aider timed out after {timeout}s"
except FileNotFoundError:
return "Error: 'aider' not found in PATH — run: pip install aider-chat"
except Exception as e:
logger.error("aider_run error: %s", e)
return f"Error: {e}"
DECLARATIONS = [
types.FunctionDeclaration(
name="aider_run",
description=(
"Run the Aider AI coding agent on a project with a single task, then exit. "
"Aider maps the repo, edits files, runs lint checks, and optionally commits. "
"Credentials are resolved automatically from the Cortex model registry — "
"OpenRouter, local Open WebUI/Ollama, Anthropic API, and other configured hosts "
"are all supported. Use host_label to pick a specific host. "
"Set background=True for long tasks — returns an agent_id immediately and sends "
"a notification when done. ADMIN ONLY. Requires confirmation."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"project": types.Schema(
type=types.Type.STRING,
description=(
"Project alias or absolute path. Known aliases: "
"'cortex' (this project), 'aether_api', 'aether_frontend', "
"'aether_container'. Or provide an absolute path."
),
),
"task": types.Schema(
type=types.Type.STRING,
description=(
"Full task description sent to Aider as --message. "
"Be specific — include file names, what to change, and why."
),
),
"files": types.Schema(
type=types.Type.ARRAY,
items=types.Schema(type=types.Type.STRING),
description=(
"Optional files to add explicitly to the editing context "
"(paths relative to project root). Aider builds a repo map "
"automatically — these get priority."
),
),
"model": types.Schema(
type=types.Type.STRING,
description=(
"Optional model override. Format depends on the provider: "
"'openrouter/anthropic/claude-3-5-haiku-20241022' (OpenRouter), "
"'claude-3-5-sonnet-20241022' (Anthropic direct), "
"'gemma-4-27b-it' or 'openai/gemma-4-27b-it' (local Open WebUI), "
"'deepseek/deepseek-chat' (DeepSeek via OpenRouter). "
"Defaults to the project's .aider.conf.yml model or AIDER_MODEL env var."
),
),
"host_label": types.Schema(
type=types.Type.STRING,
description=(
"Pick a specific configured host by label (partial match, case-insensitive). "
"Examples: 'OpenRouter', 'Local', 'scott-lt-i7-rtx'. "
"Overrides automatic credential resolution. "
"Omit to let credentials be chosen automatically."
),
),
"auto_commit": types.Schema(
type=types.Type.BOOLEAN,
description=(
"Auto-commit changes after edits (default: true). "
"Set to false to review diffs before committing manually."
),
),
"timeout": types.Schema(
type=types.Type.INTEGER,
description="Max seconds to wait for Aider to finish (default 300, max 600).",
),
"background": types.Schema(
type=types.Type.BOOLEAN,
description=(
"Run asynchronously in the background (default: false). "
"Returns an agent_id immediately; use agent_status(agent_id) to monitor. "
"Recommended for tasks expected to take more than ~60 seconds."
),
),
"notify": types.Schema(
type=types.Type.BOOLEAN,
description=(
"Send a push/Talk notification when the background task completes "
"(default: false). Only applies when background=true."
),
),
},
required=["project", "task"],
),
)
]

View File

@@ -317,6 +317,149 @@ This pattern maps naturally to several existing concepts:
---
## 13. Multi-Level Agent Management
**Status:** Design complete — implementation not yet started. See `TODO__Agents.md` for the task breakdown.
Cortex personas can spawn specialized sub-agents to handle parallel or long-running work.
Sub-agents can in turn spawn lightweight support agents for simple subtasks. The hierarchy
is capped at three levels to prevent runaway delegation.
### Level Definitions
| Level | Name | Created by | Can spawn | Tool scope |
|---|---|---|---|---|
| **1** | Cortex Persona (Inara) | HTTP request / cron | Level 2 | Full orchestrator tool set |
| **2** | Specialized Sub-Agent | Level 1 `spawn_agent` | Level 3 only | Role-scoped; `spawn_agent` auto-restricted so children are Level 3 |
| **3** | Basic Support Agent | Level 2 `spawn_agent` | Nothing | Narrow tool set; `spawn_agent` and `aider_run` denied |
**Examples:**
- Level 1 spawns a Level 2 **Coder** agent (has file + git + shell tools; can spawn a Level 3 syntax-checker)
- Level 1 spawns a Level 2 **Research** agent (web tools only; can spawn a Level 3 web reader for parallel page fetches)
- Level 2 spawns a Level 3 **Support** agent for a focused subtask (web_search only, no writes, no further delegation)
### Core Problem: Everything is Currently Synchronous
Both `spawn_agent` and `aider_run` block the calling coroutine for their full duration
(default 120s / 300s respectively). Level 1 (Inara) cannot respond to the user, send
notifications, or inspect other agents while waiting. For 5-minute Aider runs or multi-step
research agents this is unusable — the user sees nothing until completion or timeout.
### Design
#### 1. Agent Manager (`cortex/agent_manager.py`)
A lightweight in-process registry of running and recently completed agents. Module-level
dict protected by `asyncio.Lock()`:
```python
@dataclass
class AgentRecord:
agent_id: str # UUID
level: int # 1 / 2 / 3
role: str # e.g. "coder", "research"
task: str # first 200 chars of the task
status: str # running / done / failed / cancelled / timeout
started: datetime
finished: datetime | None
parent_id: str | None # lineage — which agent spawned this one
result: str | None # populated on completion (first 500 chars)
notify: bool # fire web_push/NC Talk notification on completion
user: str
_agents: dict[str, AgentRecord] = {}
_lock = asyncio.Lock()
```
On completion, the manager calls `notification.py notify()` if `notify=True` — the same
function used by reminder checks and cron completions. Completed agents stay in the
registry for 24 hours then are pruned on next access.
#### 2. Background Mode for `spawn_agent`
Add `background: bool = False` and `notify: bool = False` to `spawn_agent`. When
`background=False` (default): existing synchronous blocking behaviour — unchanged, no
regression. When `background=True`: wraps the run in `asyncio.create_task()`, registers
in the agent manager, returns an `agent_id` string immediately.
```python
# Level 1 — non-blocking delegation:
agent_id = await spawn_agent(
task="Research Zigbee mesh repeaters; summarize findings to my journal",
role="research",
background=True,
notify=True, # web_push + NC Talk when done
)
# Returns "550e8400-..." immediately. Inara continues responding to the user.
```
#### 3. Agent Lifecycle Tools
Three new tools, wired into `cortex/tools/__init__.py` under the "Agents" category:
| Tool | Params | Description |
|---|---|---|
| `agent_status(agent_id)` | `agent_id: str` | Status, role, task, elapsed, result preview |
| `agent_list(status=None, limit=10)` | `status: str \| None` | All agents for current user; filter by status |
| `agent_cancel(agent_id)` | `agent_id: str` | Cancel a running background agent (admin, confirm-required) |
Level 1 can call these between tool rounds to check on delegated work without blocking.
#### 4. Level Enforcement
`agent_level` is passed through `spawn_agent` calls as a ContextVar so each agent knows
where it sits in the hierarchy. Enforcement is automatic and simple:
- **L1 → spawns L2:** `spawn_agent` called normally. Child agent inherits role tools.
- **L2 → spawns L3:** `spawn_agent` automatically adds `deny_tools=["spawn_agent", "aider_run"]`
to the child's effective tool set. Level 3 agents cannot further delegate.
- **Level 3:** `spawn_agent` and `aider_run` are never in the tool list.
Level is stored in `AgentRecord.level` — the lineage (`parent_id`) provides a full call tree.
#### 5. `aider_run` Background Mode
Add `background: bool = False` and `notify: bool = False` to `aider_run`. When `True`,
runs the Aider subprocess via `asyncio.create_task()`, registers in the agent manager,
returns `agent_id` immediately. When called in background mode, `aider_run` is removed
from `CONFIRM_REQUIRED` — the user is not blocking on a confirmation gate since the call
returns instantly.
```python
# Level 1 or 2 — fire and forget a code change:
agent_id = await aider_run(
project="cortex",
task="Add max_chars param to http_fetch in tools/web.py, cap at 32768",
background=True,
notify=True,
)
```
### Implementation Order
1. **`agent_manager.py`** — AgentRecord + registry CRUD + completion notification hook.
Foundation for everything else; ~100 lines.
2. **`spawn_agent` background mode** — `background` + `notify` + `agent_level` params;
`asyncio.create_task()`; registers in manager. Existing sync path unchanged.
3. **`agent_status` / `agent_list` / `agent_cancel`** — wire into `__init__.py`; add to
`TOOL_CATEGORIES["Agents"]`, `TOOL_ROLES` (cancel = admin), `CONFIRM_REQUIRED` (cancel).
4. **Level enforcement**`agent_level` ContextVar; auto `deny_tools` at L2→L3 boundary.
5. **`aider_run` background mode** — same pattern as step 2.
### Files to Create/Modify
| File | Change |
|---|---|
| `cortex/agent_manager.py` | **New** — AgentRecord, registry dict, start/finish/cancel/list functions |
| `cortex/tools/agents.py` | Add `background`, `notify`, `agent_level` to `spawn_agent`; add `agent_status`, `agent_list`, `agent_cancel` functions + declarations |
| `cortex/tools/aider.py` | Add `background`, `notify` params; register with agent_manager when background |
| `cortex/tools/__init__.py` | Register new agent tools; update TOOL_CATEGORIES, TOOL_ROLES, CONFIRM_REQUIRED |
See §12 for the existing `allow_tools` / `deny_tools` per-call restrictions that level
enforcement builds on.
---
## 12. Spawner-Level Tool Restrictions — `spawn_agent` Permission Control
**Status:** Design complete, not yet built.

View File

@@ -1,7 +1,7 @@
# Cortex — Master Index
> Start here. This document is a map, not a manual.
> Last updated: 2026-05-13
> Last updated: 2026-06-03
>
> **Documentation philosophy:** Cortex is a no-black-box system. Docs must match reality.
> Update docs before implementing significant changes. Verify they still match after.
@@ -26,7 +26,7 @@ Cortex is a self-hosted personal AI platform. It routes messages from any input
| Claude backend | ✅ Live | Primary — via Claude Code CLI |
| Gemini backend | ✅ Live | Fallback — via Gemini CLI |
| Local backend | ✅ Live | Open WebUI/Ollama on scott_gaming; per-user multi-model config |
| Gemini orchestrator | ✅ Live | Tool loop → Claude response, ⚡ toggle in UI (62 tools) |
| Gemini orchestrator | ✅ Live | Tool loop → Claude response, ⚡ toggle in UI (66 tools) |
| Local orchestrator | ✅ Live | OpenAI-compatible ReAct loop; used when orchestrator role → local model |
| Model registry V2 | ✅ Live | Providers (Anthropic/Google/Local), multi-account Gemini, role assignments |
| Memory distillation | ✅ Live | Short (daily) / Mid (weekly) / Long (monthly) |
@@ -38,12 +38,13 @@ Cortex is a self-hosted personal AI platform. It routes messages from any input
| Token usage tracking | ✅ Live | Per-user daily buckets in `home/{user}/usage.json`; visible in Settings |
| Web push notifications | ✅ Live | VAPID push; `web_push` orchestrator tool; subscribe via ☰ menu |
| Proactive notifications | ✅ Live | Daily reminder check (09:00); distill/cron completion alerts; dedicated `/settings/notifications` page |
| Sub-agent spawning | ✅ Live | `spawn_agent` tool — synchronous sub-agents via any configured model |
| Sub-agent spawning | ✅ Live | `spawn_agent` tool — sync or background; `agent_status`/`agent_list`/`agent_cancel`; 3-level hierarchy (L2→L3 enforcement built in) |
| Aider coding agent | ✅ Live | `aider_run` tool — Aider subprocess; model-agnostic (DeepSeek, Ollama, OpenRouter, etc.) |
| Agent private notes | ✅ Live | `AGENT_NOTES.md` — orchestrator-only notepad; 3 rolling backups; user-visible as read-only |
| Distill safety | ✅ Live | Per-persona asyncio lock, per-endpoint cooldowns, Rebuild option |
| Guided onboarding | ✅ Live | Setup Step 3 for OpenRouter; existing-user banner; settings quick-link |
**65 orchestrator tools** across 17 domain modules — added 2026-05-12: `file_diff`, `git_status` / `git_log` / `git_diff` (read-only git inspection), `ae_db_query` / `ae_db_describe` / `ae_db_show_view` (SELECT-only Aether MariaDB access, admin, per-user credentials). `/settings/integrations` page added (admin-only). File attachments in chat (images for vision-capable local models; text/code files for all backends). Settings pages unified under `pg.css`. Added 2026-05-13: `task` cron type (full orchestrator loop on a schedule); monthly/yearly schedule formats (`monthly`, `monthly:DD:HH:MM`, `yearly:MM:DD:HH:MM`); Schedules web UI at `/settings/crons` (list, add, edit, pause, delete); HA inbound webhook tools toggle (orchestrator vs. direct LLM); Anthropic API key backend (`anthropic_api` model type via Anthropic SDK — alternative to CLI OAuth); Cloud APIs catalog in Model Registry — named provider picker (OpenRouter, OpenAI, Groq, X.ai/Grok, Together.ai, Fireworks.ai, Custom) with auto-filled URLs; hosts split into Cloud APIs / Local Hosts sections. Added 2026-05-15: Per-user custom roles — three required roles (`chat`, `orchestrator`, `distill`) are always present; users can add/remove custom roles (e.g. `coder`, `research`) via the Model Registry UI; existing `.env`-defined roles auto-migrated. Settings pages (`local_llm.html` + all settings pages) migrated to Tailwind CSS CDN (no build step); `preflight: false` preserves `pg.css` base styles; `input[type=checkbox/radio]` global width fix in `pg.css`; `btn-submit` now responsive (`w-full md:w-96`).
**69 orchestrator tools** across 17 domain modules — added 2026-06-03: `agent_status`/`agent_list` (user-level)/`agent_cancel` (admin, confirm-required); background mode for `spawn_agent` (`background=True` returns agent_id immediately; `notify=True` sends push on completion); `agent_manager.py` registry with lineage tracking and 24h pruning; L2→L3 level enforcement auto-denies `spawn_agent`/`aider_run` in Level 3 children. Added 2026-05-23: `aider_run` (Aider coding agent subprocess; project aliases for cortex/aether_api/aether_frontend/aether_container; model-agnostic via `.aider.conf.yml` or env vars; admin-only, confirm-required). `.aider.conf.yml` added to project root (read-only context, Python lint-cmd, auto-commits). Added 2026-05-12: `file_diff`, `git_status` / `git_log` / `git_diff` (read-only git inspection), `ae_db_query` / `ae_db_describe` / `ae_db_show_view` (SELECT-only Aether MariaDB access, admin, per-user credentials). `/settings/integrations` page added (admin-only). File attachments in chat (images for vision-capable local models; text/code files for all backends). Settings pages unified under `pg.css`. Added 2026-05-13: `task` cron type (full orchestrator loop on a schedule); monthly/yearly schedule formats (`monthly`, `monthly:DD:HH:MM`, `yearly:MM:DD:HH:MM`); Schedules web UI at `/settings/crons` (list, add, edit, pause, delete); HA inbound webhook tools toggle (orchestrator vs. direct LLM); Anthropic API key backend (`anthropic_api` model type via Anthropic SDK — alternative to CLI OAuth); Cloud APIs catalog in Model Registry — named provider picker (OpenRouter, OpenAI, Groq, X.ai/Grok, Together.ai, Fireworks.ai, Custom) with auto-filled URLs; hosts split into Cloud APIs / Local Hosts sections. Added 2026-05-15: Per-user custom roles — three required roles (`chat`, `orchestrator`, `distill`) are always present; users can add/remove custom roles (e.g. `coder`, `research`) via the Model Registry UI; existing `.env`-defined roles auto-migrated. Settings pages (`local_llm.html` + all settings pages) migrated to Tailwind CSS CDN (no build step); `preflight: false` preserves `pg.css` base styles; `input[type=checkbox/radio]` global width fix in `pg.css`; `btn-submit` now responsive (`w-full md:w-96`).
**Active users / personas:** scott/inara, holly/tina, brian/wintermute

View File

@@ -0,0 +1,362 @@
# PLAN — Reduce Tool Schema Overhead in Cortex
**Goal:** Eliminate the per-round, per-message transmission of all 45 tool definitions.
Drop overhead from ~8K-10K tokens per round to near zero for casual chat, and to a
relevant subset for orchestrated work.
**Status:** Draft — ready for Claude Code implementation.
---
## Background
Every orchestrated (⚡ toggled on) message triggers a ReAct tool loop. The full 45-tool
schema is rebuilt and transmitted **on every round of every call** — including rounds
where no tool is invoked and messages where no tool is needed at all. This wastes
thousands of tokens per interaction.
The architecture already has the building blocks for a fix: role configs support a
`tools` allow-list, and `get_openai_tools_for_role()` already accepts filtering
parameters. They're just not being wired together effectively.
---
## Phase 1 — Role-Based Tool Filtering (Foundation)
**Effort:** Small. **Impact:** High.
### What
Define which tools each role actually needs, then enforce the filtering so roles
only receive their relevant tool subset.
### Implementation
**1. Audit every role and define tool lists.**
| Role | Tools needed | Approx count |
|------|-------------|-------------|
| `chat` | None (zero tools — should never be in the orchestration loop) | 0 |
| `orchestrator` | web, file (admin), shell (admin), tasks, cron, reminders, scratchpad, Aether journals, agent notes, system (admin), spawn_agent, HA, ae_db, git, file_diff, file_syntax_check, notifications (admin) | 25-30 |
| `distill` | None (pure text processing) | 0 |
| `coder` | file (admin), shell (admin), git, file_diff, file_syntax_check | 8-10 |
| `research` | web_search, web_read, http_fetch | 3 |
| `admin` (role) | All 45 (admin-level access) | 45 |
**2. Store tool lists per role in `config.yaml` or the model registry defaults.**
The role config already has a `tools` field — populate it with the lists above.
**3. Enforce in `get_openai_tools_for_role()`.**
The function is called from `openai_orchestrator.py` around line 451. Currently if
`tools` is empty/missing it returns all tools. Change so that:
- If role config has a `tools` list → return only those tools
- If role config has `tools: false` → return empty list
- If role config has no `tools` field → return all (backward compat)
At the call site (`_run_from_messages`), pass the role's tool allow-list into
`get_openai_tools_for_role()` via the `tool_list` parameter that already exists.
### Files to change
- `cortex/openai_orchestrator.py` — wire role config `tools` into the call to
`get_openai_tools_for_role()`
- `cortex/model_registry.py` — ensure `get_role_config()` returns the `tools` field
(it does already, line 487)
- `cortex/config.py` or `home/{user}/model_registry.json` — define the tool lists
per default role
---
## Phase 2 — Dynamic Keyword-Based Tool Routing (High Impact)
**Effort:** Small. **Impact:** Very High.
### What
Before entering the ReAct tool loop, scan the user's message with a lightweight
keyword classifier to determine which tool categories are relevant. Only include
tools from matched categories — typically 3-8 tools instead of 45.
This is the **core optimization.** For the 80%+ of messages that only need a narrow
set of tools (or none at all), this eliminates the bulk of schema overhead on every
round.
### The Hybrid Stack
```
User message
[1] Role filter (Phase 1) — narrows 45 tools → ~25 for orchestrator role
[2] Keyword classifier (Phase 2) — narrows ~25 → 3-8 relevant tools
[3] ReAct loop — only transmitting the relevant subset each round
```
If the keyword classifier matches nothing (e.g. "good morning", "test", "what do you
think?"), it returns an empty tool set — effectively routing the message as a pure
chat interaction with zero tool overhead.
### Keyword Category Map
Each category maps keywords → tool names. Simple regex/contains matching.
| Category | Trigger keywords | Tools included |
|----------|-----------------|---------------|
| `web` | search, google, look up, what is, who is, weather, forecast, temperature, news, article, website, find, research | web_search, web_read, http_fetch |
| `web_post` | post to, send to, webhook, trigger, notify | http_post |
| `file` | read file, show file, open file, list files, directory, grep, find in, search in, diff, compare, syntax check | file_read, file_list, file_write, file_diff, file_grep, file_syntax_check, file_stat |
| `git` | git, commit, branch, pushed, pulled, merge, repo, repository | git_status, git_log, git_diff |
| `system` | restart, update, status, logs, deploy, shell, command, run, health, is it running | cortex_status, cortex_logs, cortex_restart, cortex_update, shell_exec |
| `tasks` | task, todo, to-do, to do, add task, create task, what's on my list, pending | task_list, task_create, task_update, task_complete |
| `cron` | schedule, cron, every day, every week, recurring, automate, job | cron_list, cron_add, cron_remove, cron_toggle |
| `reminders` | remind, reminder, remember, don't forget | reminders_add, reminders_list, reminders_remove, reminders_clear |
| `scratchpad` | scratch, scratchpad, working notes, jot down, notepad | scratch_read, scratch_write, scratch_append, scratch_clear |
| `ha` | home assistant, light, thermostat, turn on, turn off, kitchen, bedroom, switch, sensor, temperature | ha_get_state, ha_get_states, ha_call_service |
| `aether` | journal, aether, note entry, log entry, search journals, ae_ | ae_journal_list, ae_journal_search, ae_journal_entry_read, ae_journal_entries_list, ae_journal_entry_create, ae_journal_entry_update, ae_journal_entry_disable, ae_journal_entry_append, ae_journal_entry_prepend |
| `aether_db` | database, query, sql, select, db, table, schema, maria | ae_db_query, ae_db_describe, ae_db_show_view |
| `notifications` | notify, push, send email, email, message, talk, nextcloud | web_push, email_send, nc_talk_send, nc_talk_history |
| `agents` | spawn, sub-agent, delegate, agent | spawn_agent |
| `notes` | agent notes, private notes, my notes | agent_notes_read, agent_notes_write, agent_notes_append, agent_notes_clear |
| `session` | remember, session, history, last time, what did we, earlier, yesterday, last week | session_read, session_search |
| `ae_tasks` | ae task, kanban, board | ae_task_list |
| `claude` | claude, allow directory, permissions | claude_allow_dir |
### Implementation
In `openai_orchestrator.py`, before the ReAct loop starts:
```python
def _classify_tool_categories(user_message: str) -> list[str]:
"""Classify a user message into tool categories based on keywords.
Returns a list of category names whose tools should be included.
Returns empty list if no categories match (pure chat).
"""
message_lower = user_message.lower()
category_keywords = {
"web": ["search", "look up", "what is", "who is", "weather",
"forecast", "news", "find on", "google", "website",
"article", "research", "temperature"],
"web_post": ["post to", "send to", "webhook", "trigger webhook"],
"file": ["read file", "show file", "list file", "directory",
"grep", "search in", "find in", "diff", "compare",
"syntax check", "open file"],
"git": ["git", "commit", "branch", "pulled", "merged",
"repository", "repo"],
"system": ["restart", "update", "status", "logs", "deploy",
"run command", "shell", "is it running", "health"],
"tasks": ["task", "todo", "to-do", "to do", "add task",
"create task", "pending", "what's on my list"],
"cron": ["schedule", "cron", "every day", "every week",
"recurring", "automate", "job"],
"reminders": ["remind", "reminder", "remember", "don't forget"],
"scratchpad": ["scratch", "scratchpad", "working note", "jot down",
"notepad"],
"ha": ["home assistant", "light", "thermostat", "turn on",
"turn off", "switch", "sensor", "temperature in",
"kitchen", "bedroom", "garage"],
"aether": ["journal", "aether journal", "note entry", "log entry",
"search journal", "ae_journal"],
"aether_db": ["database", "query", "sql", "select", "db", "table",
"schema", "maria", "run query"],
"notifications":["notify", "push notification", "send email", "email",
"talk message", "nextcloud"],
"agents": ["spawn", "sub-agent", "delegate", "spawn agent"],
"notes": ["agent notes", "private notes", "my notes",
"agent_notes"],
"session": ["remember", "session", "history", "last time",
"what did we", "earlier", "yesterday", "last week",
"previously"],
"ae_tasks": ["ae task", "kanban", "board", "ae_task"],
"claude": ["claude allow", "claude directory"],
}
matched = []
for category, keywords in category_keywords.items():
if any(kw in message_lower for kw in keywords):
matched.append(category)
return matched
```
Then at the orchestration entry point, after determining the role's base tool list
(Phase 1), apply the keyword filter:
```python
# Phase 1: Get role's base tool list
role_tools = get_role_config(username, role).get("tools")
# Phase 2: Dynamically narrow based on message content
matched_categories = _classify_tool_categories(user_message)
if matched_categories:
category_tool_map = { ... } # defined at module level
dynamic_tools = []
for cat in matched_categories:
dynamic_tools.extend(category_tool_map.get(cat, []))
# Intersect with role_tools so we never grant more than the role allows
if role_tools:
dynamic_tools = [t for t in dynamic_tools if t in role_tools]
active_tools = get_openai_tools_for_role(
role=user_role,
tool_list=dynamic_tools or None
)
else:
# No keywords matched — likely causal chat route to /chat
# or use empty tool list
active_tools = []
```
### Edge Cases to Handle
1. **Multiple categories match:** Union all matched tool sets. The `for cat in matched_categories` loop handles this naturally.
2. **No categories match:** Return empty tool set. The orchestrator loop won't start — this effectively becomes a chat message without incurring the schema tax. If the LLM needs tools anyway, it will respond with a natural language request, and the user can rephrase.
3. **Ambiguous short messages:** "Hey can you check something" — matches nothing, falls through to empty tools. This is correct behavior; the LLM will ask "what do you want me to check?" and the next message will have a clear intent.
4. **Over-broad keywords:** "search" in "search journals" could trigger both `web` and `aether`. The union handles this — both categories' tools are included, which is what you want.
### File to change
- `cortex/openai_orchestrator.py` — add `_classify_tool_categories()` function and
wire it into the orchestration entry point before the ReAct loop
---
## Phase 3 — Cache Tool Schema per Session
**Effort:** Medium. **Impact:** Medium.
### What
The tool schema doesn't change between rounds of the same session for a given role.
After Phase 2 narrows it to, say, 5 tools, those 5 tool definitions are identical
every round. Cache them.
### Implementation
Add a session-scoped cache in `openai_orchestrator.py`:
```python
# Module-level cache: key = f"{session_id}:{role}:{sorted_tool_list}"
_tool_schema_cache: dict[str, list[dict]] = {}
def _get_cached_tool_schema(session_id: str, role: str, tool_list: list[str] | None) -> list[dict]:
key = f"{session_id}:{role}:{sorted(tool_list) if tool_list else 'all'}"
if key in _tool_schema_cache:
return _tool_schema_cache[key]
schemas = get_openai_tools_for_role(role=role, tool_list=tool_list)
_tool_schema_cache[key] = schemas
return schemas
```
Invalidation: Cache key includes the tool list, so if the dynamic classifier returns
different categories on the next message, it gets a fresh cache entry. No explicit
invalidation needed.
### File to change
- `cortex/openai_orchestrator.py` — add cache dict and lookup before calling
`get_openai_tools_for_role()`
---
## Phase 4 — Reduce Default Max Rounds
**Effort:** Trivial. **Impact:** Low-to-medium.
### What
Most requests resolve in 1-3 tool calls. A global cap of 10 means up to 7 wasted
schema transmissions on edge cases.
### Implementation
1. Make `max_rounds` configurable per model in the model registry (it already exists
in some model configs — see `home/brian/model_registry.json` line 42).
2. Read it from the model config during orchestration instead of using the global
`.env` value.
3. Lower the default from 10 to 5.
### Files to change
- `cortex/.env` — change `ORCHESTRATOR_MAX_ROUNDS=10` to `=5`
- `cortex/openai_orchestrator.py` — read per-model `max_rounds` from `model_cfg`
instead of only from settings
---
## Phase 5 — UI Improvements (Independent)
**Effort:** Small. **Impact:** Medium (UX).
### What
Make the tool mode indicator more obvious so the user can quickly tell whether
they're incurring the tool tax.
### Ideas
- Change ⚡ color: green when tools are on, gray when off
- Swap icon: ⚡ (tools) vs. 💬 (chat only)
- Add tooltip: "Tools enabled — all 45 tool schemas sent with each message"
- Optional: add a "Quick Question" button that sends to `/chat` directly, bypassing
the orchestrator entirely
### Files to change
- Svelte UI component — likely `ChatInput.svelte` or the chat mode toggle component
---
## Recommended Execution Order
1. **Phase 1** (role filtering) — foundation. Defines the base tool set per role.
2. **Phase 2** (keyword routing) — **the big one.** Slashes 45 tools → 3-8 for the
vast majority of messages. Builds on Phase 1's role filtering.
3. **Phase 4** (lower max_rounds) — trivial change, do alongside Phase 2.
4. **Phase 3** (schema caching) — more involved, compounds savings from Phase 2.
5. **Phase 5** (UI) — independent UX polish, can be done any time.
### Quick Win Path (Recommended First Session)
Phases 1 + 2 + 4 can be done in a single Claude Code session. They're all in
`openai_orchestrator.py` and `model_registry.py` — the same few files. Estimated
effort: 45-60 minutes of coding.
Phase 3 (caching) is a separate, focused session afterward.
---
## Appendix A: Code Locations (from grep audit 2026-05-15)
| What | File | Line |
|------|------|------|
| `get_openai_tools_for_role` definition | `cortex/tools.py` | ~540 |
| Call site (decides active_tools) | `cortex/openai_orchestrator.py` | ~449 |
| `_run_from_messages()` tool loop | `cortex/openai_orchestrator.py` | ~260 |
| Role config tools field | `cortex/model_registry.py` | ~487 |
| `get_role_config()` | `cortex/model_registry.py` | ~473 |
| `save_role_config()` (tools allow-list) | `cortex/model_registry.py` | ~455 |
| Global `ORCHESTRATOR_MAX_ROUNDS` | `cortex/.env` | 35 |
| `REQUIRED_ROLES` | `cortex/model_registry.py` | 163 |
| `DEFINED_ROLES` config | `cortex/config.py` | 80 |
| Per-model `max_rounds` example | `home/brian/model_registry.json` | 42 |
## Appendix B: Token Savings Estimate
| Scenario | Before (per round) | After Phase 1 | After Phase 1+2 | After All Phases |
|----------|-------------------|--------------|-----------------|-----------------|
| "What's the weather?" | ~9K tokens | ~5K (25 tools) | ~600 (3 web tools) | ~600 (cached) |
| "Good morning" | ~9K tokens | ~5K (25 tools) | 0 (routed to chat) | 0 |
| "Turn off kitchen lights" | ~9K tokens | ~5K (25 tools) | ~600 (3 HA tools) | ~600 (cached) |
| "Search journals for X" | ~9K tokens | ~5K (25 tools) | ~2K (10 aether tools) | ~2K (cached) |
| "Create a task" | ~9K tokens | ~5K (25 tools) | ~800 (4 task tools) | ~800 (cached) |
| "Run a SQL query" | ~9K tokens | ~5K (25 tools) | ~600 (3 db tools) | ~600 (cached) |
At 3 rounds per request and 50 requests/day, that's roughly **1.3M tokens/day saved**
vs. **~13K/day after all optimizations** — a 99% reduction for casual chat, ~90% for
most tool-using queries.

View File

@@ -48,6 +48,8 @@
-`http_post` — POST to external URLs with per-user URL prefix allowlist; admin-only, confirm-required
-`nc_talk_history` — read recent NC Talk messages; requires nc_username + nc_app_password in channels.json
- ✅ Local orchestrator retry — exponential backoff on 429/5xx/connection errors (3 attempts)
- ✅ Multi-level agent management — `agent_manager.py` (registry + lifecycle), background `spawn_agent`, `agent_status`/`agent_list`/`agent_cancel` tools, 3-level hierarchy enforcement (see `ARCH__FUTURE.md` §13)
-`aider_run` background mode — background task + push notification on completion; sync path unchanged
- [ ] Knowledge import — markdown → AE Journals (import script)
- [ ] Dev agent pipeline — specialist agents + supervisor + approval gate
- [ ] Gitea webhook integration + Actions CI

View File

@@ -67,6 +67,59 @@ automatically. Remaining work is quality/reliability parity, not ground-up desig
- [x] **`email_send`** — SMTP via email_utils, per-user regex allowlist in `home/{user}/email_allowlist.json`, managed via Settings UI textarea + Files panel raw editor — 2026-04-29
- [x] **`web_push`** — VAPID push via pywebpush; subscriptions in `home/{user}/push_subscriptions.json`; "Enable notifications" toggle in ☰ menu; sw.js push+notificationclick handlers — 2026-05-05
### [Agents] Multi-Level Agent Management
Design: `documentation/ARCH__FUTURE.md` §13
Three-level hierarchy: Level 1 = Cortex Persona; Level 2 = Specialized Sub-Agent
(can spawn Level 3); Level 3 = Basic Support Agent (cannot spawn). All spawning is
currently synchronous and blocking — this makes long-running agents (Aider, research
pipelines) unusable without freezing the orchestrator.
**Phase 1 — Foundation (build first):**
- [x] **`cortex/agent_manager.py`** — `AgentRecord` dataclass (agent_id, level, role,
task, status, started, parent_id, result, notify, user); module-level registry dict
with `asyncio.Lock()`; `register()`, `finish()`, `cancel_agent()`,
`list_agents(user, status)` functions; calls `notification.notify()` on completion
when `notify=True`; prune records older than 24 hours on next register — 2026-06-03
- [x] **Background mode for `spawn_agent`** — added `background: bool = False` and
`notify: bool = False` params; when `background=True`, wraps `_run()` in
`asyncio.create_task()`, registers in agent_manager, returns agent_id immediately;
existing sync path unchanged — 2026-06-03
- [x] **`agent_status(agent_id)` tool** — returns status, role, task excerpt, elapsed
seconds, result preview (first 300 chars); user-level — 2026-06-03
- [x] **`agent_list(status=None, limit=10)` tool** — returns running + recent agents for
current user; filter by `status`; user-level — 2026-06-03
- [x] **`agent_cancel(agent_id)` tool** — cancels background task via stored
`asyncio.Task` reference; admin-only, confirm-required — 2026-06-03
**Phase 2 — Level enforcement:**
- [x] **L2→L3 boundary enforcement**`spawn_agent` param `_agent_level` (default 2);
when `child_level >= 3`, auto-adds `spawn_agent` + `aider_run` to deny_tools so
Level 3 children cannot delegate; level stored in AgentRecord — 2026-06-03
- [ ] **`_agent_level=1` from main orchestrators** — Gemini and OpenAI orchestrators
should pass `_agent_level=1` when calling spawn_agent so the hierarchy is rooted
correctly; currently defaults to 2 (children become Level 3, which is safe but
means Level 1 cannot spawn Level 2 that itself spawns Level 3)
**Phase 3 — `aider_run` async:**
- [x] **`aider_run` background mode** — added `background: bool = False` and
`notify: bool = False` params; runs subprocess via `asyncio.create_task()`, registers
in agent_manager, returns agent_id immediately; confirmation still required (correct
— user confirms before the tool runs, not during) — 2026-06-03
- [x] **Register new tools in `__init__.py`**`agent_status`, `agent_list`, `agent_cancel`
in `TOOL_CATEGORIES["Agents"]`; `agent_cancel` in `TOOL_ROLES` (admin) and
`CONFIRM_REQUIRED`; added to `_CALLABLES` and `_ALL_DECLARATIONS` — 2026-06-03
**Tests:**
- [x] **`cortex/tests/test_agent_manager.py`** — 41 tests covering: agent_manager CRUD,
prune, notify hook, spawn_agent background mode (returns immediately, completes async,
timeout, failure), level enforcement (L1→L2 permits, L2→L3 auto-denies), agent
lifecycle tools output, aider_run background mode — 2026-06-03
Run: `cd cortex && .venv/bin/python -m pytest tests/test_agent_manager.py -v`
---
### [Tools] Orchestrator tool expansions — Round 2
Next additions identified 2026-05-08. See `ARCH__FUTURE.md` §2 for design notes.
@@ -227,11 +280,28 @@ Every orchestrator tool invocation logged to `home/{user}/tool_audit/YYYY-MM-DD.
### [Intelligence] Dev agent pipeline
See `ARCH__Intelligence_Layer.md`. Full design not yet started.
`aider_run` (2026-05-23) provides the execution layer — Cortex dispatches to Aider as
the coding worker. Aider is model-agnostic (DeepSeek, Ollama, OpenRouter, etc.) and
fully scriptable via `--message --yes-always`. This replaces the Claude Code subprocess
dependency for coding tasks. Per-project `.aider.conf.yml` holds read-only context files
and lint commands; model/key come from env vars (not committed).
- [x] **`aider_run` tool** — `cortex/tools/aider.py`; project aliases + subprocess with `--message --yes-always`; admin-only, confirm-required, high risk — 2026-05-23
- [x] **`aider_run` async/notify** — background=True fires subprocess via asyncio.create_task(), registers in agent_manager, returns agent_id immediately; notify=True sends push/Talk on completion — 2026-06-03
- [x] **`.aider.conf.yml`** — project-level Aider config: `read: [CLAUDE.md]`, Python lint-cmd, auto-commits — 2026-05-23
- [x] **`aider_run` multi-provider credentials** — `_resolve_credentials()` pulls from
all configured hosts: OpenRouter/OpenAI/Groq/etc. → `--api-key slug=key`;
local Open WebUI/Ollama → `--openai-api-base + key`; Anthropic from
`providers.anthropic.credentials`; `host_label` param for explicit host selection;
auto-prefixes model with `openai/` for generic endpoints — 2026-06-03
- [x] **`.gitignore`** — added `.aider.chat.history.md`, `.aider.input.history`, `.aider.llm.history` — 2026-05-23
- [ ] Specialist agent: frontend (SvelteKit) code changes
- [ ] Specialist agent: backend (FastAPI) code changes
- [ ] Supervisor agent: diff review, syntax check, test runner
- [ ] Gitea webhook integration: trigger on push/PR, report back
- [ ] Human approval gate before commit
- [ ] `.aider.conf.yml` for aether_api, aether_frontend, aether_container projects
### [Intelligence] Supervisor agent
- Runs `py_compile`, `svelte-check`, unit tests after specialist agent work