From 658c5089258d6edde35daf648a6630d120e84b53 Mon Sep 17 00:00:00 2001 From: Scott Idem Date: Wed, 3 Jun 2026 22:40:20 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20multi-level=20agent=20management=20?= =?UTF-8?q?=E2=80=94=20background=20agents,=20lifecycle=20tools,=203-level?= =?UTF-8?q?=20hierarchy?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit agent_manager.py (new): - AgentRecord dataclass: agent_id, level (1/2/3), role, task, status, started, parent_id (lineage), finished, result, notify, _task_ref - register() / finish() / cancel_agent() / list_agents() / get() / set_task_ref() - Calls notification.notify() on completion when notify=True (same channel as reminders and cron completions) - 24-hour pruning of completed records on each new registration spawn_agent (tools/agents.py): - background=True: fires asyncio.create_task(), registers in agent_manager, returns agent_id string immediately — sync path unchanged (no regression) - notify=True: push/Talk notification when the background task completes - Level enforcement: _agent_level param tracks hierarchy depth; when spawning from Level 2, child automatically gets spawn_agent + aider_run denied so Level 3 agents cannot delegate further New lifecycle tools (tools/agents.py + __init__.py): - agent_status(agent_id) — status, role, level, elapsed, task, result preview; user-level - agent_list(status, limit) — all agents for current user, newest first; user-level - agent_cancel(agent_id) — kills background task; admin-only, confirm-required tests/test_agent_manager.py (new, 41 tests): - agent_manager CRUD, pruning, notification hook - spawn_agent background: returns immediately, completes async, timeout, failure - Level enforcement: L1→L2 permits spawn, L2→L3 auto-denies; explicit tool_list path - agent_status / agent_list / agent_cancel output formatting - aider_run background: returns agent_id, completes async, sync path unchanged - All tests run without browser or Cortex service (~2.5s total) Run: cd cortex && .venv/bin/python -m pytest tests/test_agent_manager.py -v Docs: ARCH__FUTURE.md §13 (full design), ROADMAP.md, TODO__Agents.md, MASTER.md, HELP.md (orchestrator description corrected, tool schema line updated to reflect keyword routing), CLAUDE.md tool count 66→69. Co-Authored-By: Claude Sonnet 4.6 --- CLAUDE.md | 5 +- cortex/agent_manager.py | 158 +++++++ cortex/static/HELP.md | 9 +- cortex/tests/test_agent_manager.py | 707 +++++++++++++++++++++++++++++ cortex/tools/agents.py | 237 +++++++++- documentation/ARCH__FUTURE.md | 143 ++++++ documentation/MASTER.md | 9 +- documentation/ROADMAP.md | 2 + documentation/TODO__Agents.md | 63 +++ 9 files changed, 1307 insertions(+), 26 deletions(-) create mode 100644 cortex/agent_manager.py create mode 100644 cortex/tests/test_agent_manager.py diff --git a/CLAUDE.md b/CLAUDE.md index 66ad9e0..c455ed8 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -281,7 +281,7 @@ Cortex is running and stable. All channels are live: Active users: scott (inara), holly (tina), brian (wintermute) -**65 orchestrator tools** across 17 domain modules: +**69 orchestrator tools** across 17 domain modules: web_search/http_fetch/web_read/http_post, project_file_read/list + file_stat/grep/diff/syntax_check (project-scoped), file_read/list/write/session_read/session_search (system-scoped, admin), @@ -293,7 +293,8 @@ reminders_add/list/remove/clear, scratch_read/write/append/clear, web_push/email_send/nc_talk_send/nc_talk_history, ae_journal_list/search/entries_list/entry_read/create/update/disable/append/prepend, ae_task_list, ae_db_query/describe/show_view (SELECT-only MariaDB access, admin; disable requires confirm), -agent_notes_read/write/append/clear, spawn_agent, +agent_notes_read/write/append/clear, spawn_agent/aider_run (admin; aider_run requires confirm), +agent_status/agent_list (user-level)/agent_cancel (admin, confirm-required), ha_get_state/ha_get_states/ha_call_service. Each tool has a `TOOL_RISK` rating (low/medium/high). Configure access at `/settings/tools` diff --git a/cortex/agent_manager.py b/cortex/agent_manager.py new file mode 100644 index 0000000..31b790b --- /dev/null +++ b/cortex/agent_manager.py @@ -0,0 +1,158 @@ +""" +Agent lifecycle manager — registry for background spawn_agent and aider_run tasks. + +Tracks running and recently completed agents in-process. On completion, fires +notification.notify() if notify=True (same channel used by reminders and cron jobs). + +Records are kept for 24 hours after completion, then pruned on next registration. +""" + +import asyncio +import logging +import uuid +from dataclasses import dataclass, field +from datetime import datetime, timedelta + +logger = logging.getLogger(__name__) + +_PRUNE_AFTER = timedelta(hours=24) +_RESULT_PREVIEW_CHARS = 500 +_TASK_PREVIEW_CHARS = 200 + + +@dataclass +class AgentRecord: + agent_id: str + level: int # 1 = persona, 2 = specialized sub-agent, 3 = support agent + role: str # e.g. "coder", "research", "chat" + task: str # first _TASK_PREVIEW_CHARS of the task + status: str # running / done / failed / cancelled / timeout + started: datetime + user: str + parent_id: str | None = None # agent_id of the spawner (lineage tracking) + finished: datetime | None = None + result: str | None = None # first _RESULT_PREVIEW_CHARS on completion + notify: bool = False # push notification on completion + _task_ref: "asyncio.Task | None" = field(default=None, repr=False) + + +# Module-level registry — in-process only, not persisted across restarts. +_agents: dict[str, AgentRecord] = {} +_lock = asyncio.Lock() + + +async def register( + user: str, + role: str, + task: str, + level: int = 2, + parent_id: str | None = None, + notify: bool = False, +) -> AgentRecord: + """Create and register a new running agent. Returns the record (agent_id is set).""" + agent_id = str(uuid.uuid4()) + rec = AgentRecord( + agent_id=agent_id, + level=level, + role=role, + task=task[:_TASK_PREVIEW_CHARS], + status="running", + started=datetime.now(), + user=user, + parent_id=parent_id, + notify=notify, + ) + async with _lock: + _prune_locked() + _agents[agent_id] = rec + logger.info( + "agent_manager: registered %s role=%s level=%d user=%s task=%.60s", + agent_id[:8], role, level, user, task, + ) + return rec + + +def set_task_ref(agent_id: str, task_ref: "asyncio.Task") -> None: + """Store the asyncio.Task reference so it can be cancelled later. + + Call immediately after asyncio.create_task() — before the event loop yields. + """ + rec = _agents.get(agent_id) + if rec: + rec._task_ref = task_ref + + +async def finish(agent_id: str, result: str, status: str = "done") -> None: + """Mark an agent complete, store the result, and notify the user if requested.""" + async with _lock: + rec = _agents.get(agent_id) + if not rec: + return + rec.status = status + rec.finished = datetime.now() + rec.result = (result or "")[:_RESULT_PREVIEW_CHARS] + + logger.info("agent_manager: finished %s status=%s", agent_id[:8], status) + + if rec.notify and status != "cancelled": + try: + from notification import notify as _notify + elapsed = int((rec.finished - rec.started).total_seconds()) + emoji = "✅" if status == "done" else "⚠️" + preview = (rec.result or "(no output)")[:200] + msg = f"{emoji} Agent done [{rec.role}, {elapsed}s]: {preview}" + await _notify(rec.user, msg) + except Exception as e: + logger.warning("agent_manager: notification failed for %s: %s", agent_id[:8], e) + + +async def cancel_agent(agent_id: str, user: str) -> str: + """Cancel a running background agent. Returns a human-readable status message.""" + async with _lock: + rec = _agents.get(agent_id) + if not rec: + return f"No agent found: {agent_id}" + if rec.user != user: + return "Access denied." + if rec.status != "running": + return f"Agent {agent_id[:8]}… is already {rec.status}." + task_ref = rec._task_ref + rec.status = "cancelled" + rec.finished = datetime.now() + + if task_ref and not task_ref.done(): + task_ref.cancel() + + logger.info("agent_manager: cancelled %s by user=%s", agent_id[:8], user) + return f"Agent {agent_id[:8]}… cancelled." + + +def get(agent_id: str) -> AgentRecord | None: + """Look up an agent record by ID.""" + return _agents.get(agent_id) + + +def list_agents(user: str, status: str | None = None, limit: int = 10) -> list[AgentRecord]: + """Return recent agents for a user, newest first. + + Does not acquire the lock — safe for read-only listing (Python dict iteration is + thread-safe for reads; we don't care about racing with a concurrent registration). + """ + records = [r for r in _agents.values() if r.user == user] + if status: + records = [r for r in records if r.status == status] + records.sort(key=lambda r: r.started, reverse=True) + return records[:limit] + + +def _prune_locked() -> None: + """Remove completed agents older than _PRUNE_AFTER. Must be called inside _lock.""" + cutoff = datetime.now() - _PRUNE_AFTER + stale = [ + aid for aid, r in _agents.items() + if r.status != "running" and r.finished and r.finished < cutoff + ] + for aid in stale: + del _agents[aid] + if stale: + logger.debug("agent_manager: pruned %d stale records", len(stale)) diff --git a/cortex/static/HELP.md b/cortex/static/HELP.md index efbdb30..64e6989 100644 --- a/cortex/static/HELP.md +++ b/cortex/static/HELP.md @@ -70,8 +70,8 @@ Click the **⚡** button in the input row to enable the Tools toggle. When lit ( The orchestrator runs a multi-step tool loop: 1. The **orchestrator model** reasons about the request and calls tools as needed -2. It produces an enriched summary of what it found -3. The **responder model** (set by the active Role) receives that context and writes the final user-facing reply +2. Tool results are fed back into the conversation; the loop continues until the model has what it needs +3. The model produces the final user-facing reply — when the orchestrator role uses Gemini, Claude writes the final response; when it uses a local model, that same model writes it 4. Expandable tool-call cards appear above the response — click any card to see the arguments sent and the result returned The ⚡ toggle is **independent of the Role selector** — you can use any role (chat, coder, research, etc.) with or without tools. The orchestrator model is configured in **Account → Model Registry → Role Assignments → Orchestrator**. @@ -82,7 +82,7 @@ Orchestrated sessions persist to history exactly like regular chat. ### Available Tools -65 tools across 17 categories. Each tool schema is sent to the model on every orchestrated call — fewer active tools means fewer tokens per call. +69 tools across 17 categories. Tool schemas are narrowed per-message using keyword routing — only categories relevant to your request are sent, keeping token overhead low. Per-role tool sets provide additional filtering. | Category | Tools | |---|---| @@ -101,13 +101,14 @@ Orchestrated sessions persist to history exactly like regular chat. | **Aether Tasks** | `ae_task_list` | | **Aether Database** (admin) | `ae_db_query`, `ae_db_describe`, `ae_db_show_view` | | **Agent Notes** | `agent_notes_read`, `agent_notes_write`, `agent_notes_append`, `agent_notes_clear` | -| **Agents** | `spawn_agent` | +| **Agents** | `spawn_agent`, `aider_run` | | **Home Assistant** | `ha_get_state`, `ha_get_states`, `ha_call_service` | Files, Shell, System, Aether Database, Agents, and some Notification/Web tools are **admin-only** and not visible to regular users. `http_post` requires a URL prefix allowlist in `home/{user}/http_allowlist.json`. `nc_talk_history` requires `nc_username` and `nc_app_password` in `channels.json` under `nextcloud`. `ae_db_*` tools require Aether DB credentials configured in **Integrations** settings. All queries are SELECT-only — no writes possible. +`aider_run` requires Aider installed (`pip install aider-chat`) and a model configured via `AIDER_MODEL` env var or the project's `.aider.conf.yml`. Supports any OpenAI-compatible backend — DeepSeek, OpenRouter, Ollama, etc. ### Per-Role Tool Sets diff --git a/cortex/tests/test_agent_manager.py b/cortex/tests/test_agent_manager.py new file mode 100644 index 0000000..91e3d6c --- /dev/null +++ b/cortex/tests/test_agent_manager.py @@ -0,0 +1,707 @@ +""" +Tests for agent_manager.py and the spawn_agent / aider_run background paths. + +Run with: + cd cortex && .venv/bin/python -m pytest tests/test_agent_manager.py -v + +No browser, no LLM calls, no Cortex service needed. All LLM interactions are mocked. +The agent_manager tests need no mocks at all — the module is pure asyncio. +""" + +import asyncio +import pytest +import pytest_asyncio +from datetime import datetime, timedelta +from unittest.mock import AsyncMock, MagicMock, patch + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_mock_result(response: str = "Agent done."): + """Build a mock OrchestratorResult returned by openai_orchestrator.run.""" + r = MagicMock() + r.checkpoint = None + r.response = response + return r + + +def _mock_spawn_deps( + model_type: str = "local_openai", + user_role: str = "admin", + tool_policy: dict | None = None, + role_tools: list | None = None, +): + """Return a context-manager stack that patches all spawn_agent external deps.""" + if tool_policy is None: + tool_policy = {"allow": [], "deny": []} + model_cfg = { + "type": model_type, + "api_url": "http://localhost:3000", + "model_name": "test-model", + "api_key": "x", + } + role_cfg = { + "tools": role_tools, + "system_append": "", + "inject_datetime": True, + "inject_mode": True, + } + + class _Stack: + def __enter__(self_): + self_._patches = [ + patch("model_registry.get_role_config", return_value=role_cfg), + patch("model_registry.get_model_for_role", return_value=model_cfg), + patch("model_registry.get_registry", return_value={"hosts": []}), + patch("context_loader.load_context", return_value="Test system prompt"), + patch("auth_utils.get_user_role", return_value=user_role), + patch("auth_utils.get_tool_policy", return_value=tool_policy), + patch("persona.get_user", return_value="scott"), + ] + for p in self_._patches: + p.start() + return self_ + + def __exit__(self_, *args): + for p in self_._patches: + p.stop() + + return _Stack() + + +# --------------------------------------------------------------------------- +# Fixture — reset agent_manager state between tests +# --------------------------------------------------------------------------- + +@pytest.fixture(autouse=True) +def clear_agent_registry(): + """Wipe the in-process agent registry before each test.""" + import agent_manager + agent_manager._agents.clear() + yield + agent_manager._agents.clear() + + +# --------------------------------------------------------------------------- +# agent_manager — core CRUD +# --------------------------------------------------------------------------- + +class TestAgentManagerCore: + + @pytest.mark.asyncio + async def test_register_creates_record(self): + import agent_manager + rec = await agent_manager.register( + user="scott", role="research", task="Investigate topic X", level=2 + ) + assert rec.agent_id in agent_manager._agents + assert rec.status == "running" + assert rec.level == 2 + assert rec.role == "research" + assert rec.task == "Investigate topic X" + assert rec.user == "scott" + assert rec.finished is None + + @pytest.mark.asyncio + async def test_register_truncates_long_task(self): + import agent_manager + long_task = "x" * 500 + rec = await agent_manager.register(user="scott", role="chat", task=long_task, level=2) + assert len(rec.task) == 200 + + @pytest.mark.asyncio + async def test_finish_updates_record(self): + import agent_manager + rec = await agent_manager.register(user="scott", role="chat", task="t", level=2) + await agent_manager.finish(rec.agent_id, "All done!", "done") + + updated = agent_manager.get(rec.agent_id) + assert updated.status == "done" + assert updated.result == "All done!" + assert updated.finished is not None + + @pytest.mark.asyncio + async def test_finish_truncates_result(self): + import agent_manager + rec = await agent_manager.register(user="scott", role="chat", task="t", level=2) + await agent_manager.finish(rec.agent_id, "y" * 2000) + + updated = agent_manager.get(rec.agent_id) + assert len(updated.result) <= agent_manager._RESULT_PREVIEW_CHARS + + @pytest.mark.asyncio + async def test_finish_failed_status(self): + import agent_manager + rec = await agent_manager.register(user="scott", role="chat", task="t", level=2) + await agent_manager.finish(rec.agent_id, "Boom", "failed") + assert agent_manager.get(rec.agent_id).status == "failed" + + @pytest.mark.asyncio + async def test_cancel_own_agent(self): + import agent_manager + rec = await agent_manager.register(user="scott", role="chat", task="t", level=2) + msg = await agent_manager.cancel_agent(rec.agent_id, "scott") + assert "cancelled" in msg + assert agent_manager.get(rec.agent_id).status == "cancelled" + + @pytest.mark.asyncio + async def test_cancel_wrong_user_denied(self): + import agent_manager + rec = await agent_manager.register(user="scott", role="chat", task="t", level=2) + msg = await agent_manager.cancel_agent(rec.agent_id, "holly") + assert "denied" in msg.lower() + assert agent_manager.get(rec.agent_id).status == "running" + + @pytest.mark.asyncio + async def test_cancel_nonexistent_agent(self): + import agent_manager + msg = await agent_manager.cancel_agent("does-not-exist", "scott") + assert "No agent found" in msg + + @pytest.mark.asyncio + async def test_cancel_already_done(self): + import agent_manager + rec = await agent_manager.register(user="scott", role="chat", task="t", level=2) + await agent_manager.finish(rec.agent_id, "done", "done") + msg = await agent_manager.cancel_agent(rec.agent_id, "scott") + assert "already" in msg or "done" in msg + + @pytest.mark.asyncio + async def test_cancel_kills_real_task(self): + import agent_manager + rec = await agent_manager.register(user="scott", role="chat", task="t", level=2) + + sleep_task = asyncio.create_task(asyncio.sleep(60)) + agent_manager.set_task_ref(rec.agent_id, sleep_task) + + await agent_manager.cancel_agent(rec.agent_id, "scott") + await asyncio.sleep(0) # let the event loop process the cancellation + + assert sleep_task.cancelled() or sleep_task.done() + + def test_list_agents_returns_users_agents(self): + import agent_manager + # Manually populate the registry + agent_manager._agents["a1"] = _make_record("a1", "scott", "running") + agent_manager._agents["a2"] = _make_record("a2", "scott", "done") + agent_manager._agents["a3"] = _make_record("a3", "holly", "running") + + records = agent_manager.list_agents("scott") + ids = {r.agent_id for r in records} + assert "a1" in ids + assert "a2" in ids + assert "a3" not in ids + + def test_list_agents_filters_by_status(self): + import agent_manager + agent_manager._agents["a1"] = _make_record("a1", "scott", "running") + agent_manager._agents["a2"] = _make_record("a2", "scott", "done") + + running = agent_manager.list_agents("scott", status="running") + assert len(running) == 1 + assert running[0].agent_id == "a1" + + def test_list_agents_respects_limit(self): + import agent_manager + for i in range(20): + agent_manager._agents[f"a{i}"] = _make_record(f"a{i}", "scott", "done") + + records = agent_manager.list_agents("scott", limit=5) + assert len(records) == 5 + + @pytest.mark.asyncio + async def test_prune_removes_old_completed(self): + import agent_manager + rec = await agent_manager.register(user="scott", role="chat", task="t", level=2) + await agent_manager.finish(rec.agent_id, "done") + + # Manually backdate the finished time past the prune threshold + agent_manager._agents[rec.agent_id].finished = ( + datetime.now() - agent_manager._PRUNE_AFTER - timedelta(seconds=1) + ) + + # Trigger pruning via a new registration + await agent_manager.register(user="scott", role="chat", task="t2", level=2) + + assert agent_manager.get(rec.agent_id) is None + + @pytest.mark.asyncio + async def test_prune_keeps_running_agents(self): + import agent_manager + rec = await agent_manager.register(user="scott", role="chat", task="t", level=2) + # Running agent — finished is None so it should never be pruned + assert rec.agent_id in agent_manager._agents + + await agent_manager.register(user="scott", role="chat", task="t2", level=2) + assert agent_manager.get(rec.agent_id) is not None + + @pytest.mark.asyncio + async def test_finish_unknown_agent_is_noop(self): + import agent_manager + # Should not raise + await agent_manager.finish("ghost-id", "result", "done") + + +# --------------------------------------------------------------------------- +# agent_manager — notification hook +# --------------------------------------------------------------------------- + +class TestAgentManagerNotify: + + @pytest.mark.asyncio + async def test_notify_called_on_done(self): + import agent_manager + rec = await agent_manager.register( + user="scott", role="chat", task="t", level=2, notify=True + ) + with patch("notification.notify", new_callable=AsyncMock) as mock_notify: + await agent_manager.finish(rec.agent_id, "All good", "done") + mock_notify.assert_called_once() + call_args = mock_notify.call_args + assert call_args[0][0] == "scott" # user + assert "✅" in call_args[0][1] # success emoji + + @pytest.mark.asyncio + async def test_notify_called_on_failed(self): + import agent_manager + rec = await agent_manager.register( + user="scott", role="chat", task="t", level=2, notify=True + ) + with patch("notification.notify", new_callable=AsyncMock) as mock_notify: + await agent_manager.finish(rec.agent_id, "Oops", "failed") + mock_notify.assert_called_once() + assert "⚠️" in mock_notify.call_args[0][1] + + @pytest.mark.asyncio + async def test_no_notify_when_cancelled(self): + import agent_manager + rec = await agent_manager.register( + user="scott", role="chat", task="t", level=2, notify=True + ) + with patch("notification.notify", new_callable=AsyncMock) as mock_notify: + await agent_manager.finish(rec.agent_id, "Cancelled.", "cancelled") + mock_notify.assert_not_called() + + @pytest.mark.asyncio + async def test_no_notify_when_flag_false(self): + import agent_manager + rec = await agent_manager.register( + user="scott", role="chat", task="t", level=2, notify=False + ) + with patch("notification.notify", new_callable=AsyncMock) as mock_notify: + await agent_manager.finish(rec.agent_id, "Done", "done") + mock_notify.assert_not_called() + + +# --------------------------------------------------------------------------- +# spawn_agent — background mode +# --------------------------------------------------------------------------- + +class TestSpawnAgentBackground: + + @pytest.mark.asyncio + async def test_background_returns_agent_id_immediately(self): + import agent_manager + from tools.agents import spawn_agent + + mock_result = _make_mock_result("Research complete.") + with _mock_spawn_deps(): + with patch("openai_orchestrator.run", new_callable=AsyncMock, return_value=mock_result): + result = await spawn_agent( + task="Test background research", + role="research", + background=True, + ) + + assert "Agent started in background" in result + assert "ID:" in result + + @pytest.mark.asyncio + async def test_background_registers_agent(self): + import agent_manager + from tools.agents import spawn_agent + + mock_result = _make_mock_result() + with _mock_spawn_deps(): + with patch("openai_orchestrator.run", new_callable=AsyncMock, return_value=mock_result): + await spawn_agent(task="Background task", background=True) + + agents = agent_manager.list_agents("scott") + assert len(agents) >= 1 + + @pytest.mark.asyncio + async def test_background_agent_eventually_completes(self): + import agent_manager + from tools.agents import spawn_agent + + mock_result = _make_mock_result("Task done!") + with _mock_spawn_deps(): + with patch("openai_orchestrator.run", new_callable=AsyncMock, return_value=mock_result): + result = await spawn_agent(task="Quick task", background=True) + agent_id = result.split("ID: ")[1].split("\n")[0].strip() + + # Poll while patches are still active + for _ in range(40): + rec = agent_manager.get(agent_id) + if rec and rec.status != "running": + break + await asyncio.sleep(0.05) + + rec = agent_manager.get(agent_id) + assert rec is not None + assert rec.status == "done" + assert "Task done!" in (rec.result or "") + + @pytest.mark.asyncio + async def test_background_sync_path_unchanged(self): + """Verify that background=False still blocks and returns the result string.""" + from tools.agents import spawn_agent + + mock_result = _make_mock_result("Sync result here.") + with _mock_spawn_deps(): + with patch("openai_orchestrator.run", new_callable=AsyncMock, return_value=mock_result): + result = await spawn_agent(task="Sync task", background=False) + + assert result == "Sync result here." + + @pytest.mark.asyncio + async def test_background_agent_timeout(self): + import agent_manager + from tools.agents import spawn_agent + + async def _slow(*args, **kwargs): + await asyncio.sleep(60) + return _make_mock_result() + + with _mock_spawn_deps(): + with patch("openai_orchestrator.run", side_effect=_slow): + result = await spawn_agent(task="Slow task", background=True, timeout=1) + agent_id = result.split("ID: ")[1].split("\n")[0].strip() + + # Poll while patches are still active (timeout=1s so this completes quickly) + for _ in range(60): + rec = agent_manager.get(agent_id) + if rec and rec.status != "running": + break + await asyncio.sleep(0.05) + + rec = agent_manager.get(agent_id) + assert rec.status == "timeout" + + @pytest.mark.asyncio + async def test_background_agent_failure(self): + import agent_manager + from tools.agents import spawn_agent + + with _mock_spawn_deps(): + with patch("openai_orchestrator.run", new_callable=AsyncMock, side_effect=RuntimeError("Boom")): + result = await spawn_agent(task="Failing task", background=True) + + agent_id = result.split("ID: ")[1].split("\n")[0].strip() + + for _ in range(20): + rec = agent_manager.get(agent_id) + if rec and rec.status != "running": + break + await asyncio.sleep(0.05) + + assert agent_manager.get(agent_id).status == "failed" + + +# --------------------------------------------------------------------------- +# spawn_agent — level enforcement +# --------------------------------------------------------------------------- + +class TestLevelEnforcement: + + @pytest.mark.asyncio + async def test_l2_parent_denies_spawn_in_l3_child(self): + """Level 2 agent spawning a child: spawn_agent and aider_run must be denied.""" + from tools.agents import spawn_agent + + captured_kwargs = {} + + async def _capture_run(**kwargs): + captured_kwargs.update(kwargs) + return _make_mock_result() + + with _mock_spawn_deps(): + with patch("openai_orchestrator.run", side_effect=_capture_run): + await spawn_agent( + task="Test L3 enforcement", + background=False, + _agent_level=2, # this agent is Level 2; its child would be Level 3 + ) + + # The orchestrator should have received spawn_agent and aider_run in confirm_deny + confirm_deny = captured_kwargs.get("confirm_deny", set()) + assert "spawn_agent" in confirm_deny, "spawn_agent must be blocked for L3 children" + assert "aider_run" in confirm_deny, "aider_run must be blocked for L3 children" + + @pytest.mark.asyncio + async def test_l1_parent_does_not_deny_spawn(self): + """Level 1 agent (persona) spawning a Level 2 child: no extra denies.""" + from tools.agents import spawn_agent + + captured_kwargs = {} + + async def _capture_run(**kwargs): + captured_kwargs.update(kwargs) + return _make_mock_result() + + with _mock_spawn_deps(): + with patch("openai_orchestrator.run", side_effect=_capture_run): + await spawn_agent( + task="Test L2 spawn", + background=False, + _agent_level=1, # persona is Level 1; child would be Level 2 + ) + + confirm_deny = captured_kwargs.get("confirm_deny", set()) + assert "spawn_agent" not in confirm_deny, "L2 agents must be allowed to spawn" + + @pytest.mark.asyncio + async def test_l2_deny_intersected_with_tool_list(self): + """When the role has an explicit tool_list, L3 deny removes from list directly.""" + from tools.agents import spawn_agent + + captured_kwargs = {} + + async def _capture_run(**kwargs): + captured_kwargs.update(kwargs) + return _make_mock_result() + + # Role has an explicit tool_list that includes spawn_agent + with _mock_spawn_deps(role_tools=["web_search", "spawn_agent", "aider_run"]): + with patch("openai_orchestrator.run", side_effect=_capture_run): + await spawn_agent( + task="Test", + background=False, + _agent_level=2, + ) + + # spawn_agent and aider_run must be absent from the tool_list passed to orchestrator + tool_list = captured_kwargs.get("tool_list", []) + assert "spawn_agent" not in tool_list + assert "aider_run" not in tool_list + assert "web_search" in tool_list # unrelated tools must survive + + +# --------------------------------------------------------------------------- +# Agent lifecycle tools — output formatting +# --------------------------------------------------------------------------- + +class TestAgentLifecycleTools: + + @pytest.mark.asyncio + async def test_agent_status_running(self): + import agent_manager + rec = await agent_manager.register(user="scott", role="research", task="Do research", level=2) + + with patch("persona.get_user", return_value="scott"): + from tools.agents import agent_status + output = await agent_status(rec.agent_id) + + assert "running" in output + assert "research" in output + assert rec.agent_id[:8] in output + + @pytest.mark.asyncio + async def test_agent_status_done(self): + import agent_manager + rec = await agent_manager.register(user="scott", role="chat", task="Task", level=2) + await agent_manager.finish(rec.agent_id, "The result text", "done") + + with patch("persona.get_user", return_value="scott"): + from tools.agents import agent_status + output = await agent_status(rec.agent_id) + + assert "done" in output + assert "The result text" in output + + @pytest.mark.asyncio + async def test_agent_status_wrong_user(self): + import agent_manager + rec = await agent_manager.register(user="scott", role="chat", task="t", level=2) + + with patch("persona.get_user", return_value="holly"): + from tools.agents import agent_status + output = await agent_status(rec.agent_id) + + assert "denied" in output.lower() + + @pytest.mark.asyncio + async def test_agent_status_not_found(self): + with patch("persona.get_user", return_value="scott"): + from tools.agents import agent_status + output = await agent_status("nonexistent-id") + + assert "No agent found" in output + + @pytest.mark.asyncio + async def test_agent_list_shows_running(self): + import agent_manager + await agent_manager.register(user="scott", role="research", task="Research X", level=2) + await agent_manager.register(user="scott", role="coder", task="Fix bug", level=2) + + with patch("persona.get_user", return_value="scott"): + from tools.agents import agent_list + output = await agent_list() + + assert "2 agent(s)" in output + assert "research" in output + assert "coder" in output + + @pytest.mark.asyncio + async def test_agent_list_status_filter(self): + import agent_manager + rec = await agent_manager.register(user="scott", role="chat", task="t", level=2) + await agent_manager.finish(rec.agent_id, "done", "done") + await agent_manager.register(user="scott", role="chat", task="t2", level=2) + + with patch("persona.get_user", return_value="scott"): + from tools.agents import agent_list + output = await agent_list(status="running") + + assert "1 agent(s)" in output + + @pytest.mark.asyncio + async def test_agent_list_empty(self): + with patch("persona.get_user", return_value="scott"): + from tools.agents import agent_list + output = await agent_list() + + assert "No agents found" in output + + @pytest.mark.asyncio + async def test_agent_cancel_tool(self): + import agent_manager + rec = await agent_manager.register(user="scott", role="chat", task="t", level=2) + + with patch("persona.get_user", return_value="scott"): + from tools.agents import agent_cancel + output = await agent_cancel(rec.agent_id) + + assert "cancelled" in output + assert agent_manager.get(rec.agent_id).status == "cancelled" + + +# --------------------------------------------------------------------------- +# aider_run — background mode +# --------------------------------------------------------------------------- + +class TestAiderRunBackground: + + @pytest.mark.asyncio + async def test_background_returns_agent_id(self): + import agent_manager + + async def _fake_proc(*args, **kwargs): + mock_proc = MagicMock() + mock_proc.communicate = AsyncMock(return_value=(b"All changes applied.", b"")) + mock_proc.returncode = 0 + return mock_proc + + with ( + patch("persona.get_user", return_value="scott"), + patch("model_registry.get_registry", return_value={"hosts": []}), + patch("asyncio.create_subprocess_exec", side_effect=_fake_proc), + ): + from tools.aider import aider_run + result = await aider_run( + project=str(_CORTEX_DIR.parent), # use actual project root (exists) + task="Test background task", + background=True, + ) + + assert "Aider task started in background" in result + assert "ID:" in result + + @pytest.mark.asyncio + async def test_background_agent_completes(self): + import agent_manager + + async def _fake_proc(*args, **kwargs): + mock_proc = MagicMock() + mock_proc.communicate = AsyncMock(return_value=(b"Edits applied.", b"")) + mock_proc.returncode = 0 + return mock_proc + + from tools.aider import aider_run + with ( + patch("persona.get_user", return_value="scott"), + patch("model_registry.get_registry", return_value={"hosts": []}), + patch("asyncio.create_subprocess_exec", side_effect=_fake_proc), + ): + result = await aider_run( + project=str(_CORTEX_DIR.parent), + task="Test", + background=True, + ) + agent_id = result.split("ID: ")[1].split("\n")[0].strip() + + # Poll while patches are still active + for _ in range(40): + rec = agent_manager.get(agent_id) + if rec and rec.status != "running": + break + await asyncio.sleep(0.05) + + rec = agent_manager.get(agent_id) + assert rec.status == "done" + assert "Edits applied" in (rec.result or "") + + @pytest.mark.asyncio + async def test_invalid_project_directory(self): + from tools.aider import aider_run + result = await aider_run(project="/this/does/not/exist", task="Test") + assert "does not exist" in result + + @pytest.mark.asyncio + async def test_sync_path_still_works(self): + async def _fake_proc(*args, **kwargs): + mock_proc = MagicMock() + mock_proc.communicate = AsyncMock(return_value=(b"Done.", b"")) + mock_proc.returncode = 0 + return mock_proc + + with ( + patch("persona.get_user", return_value="scott"), + patch("model_registry.get_registry", return_value={"hosts": []}), + patch("asyncio.create_subprocess_exec", side_effect=_fake_proc), + ): + from tools.aider import aider_run + result = await aider_run( + project=str(_CORTEX_DIR.parent), + task="Sync test", + background=False, + ) + + assert "Done." in result + + +# --------------------------------------------------------------------------- +# Helpers for manual test record creation (used in list tests above) +# --------------------------------------------------------------------------- + +import agent_manager as _am + +_CORTEX_DIR = _am.__file__ and _am and __import__("pathlib").Path(_am.__file__).parent + + +def _make_record(agent_id: str, user: str, status: str) -> "_am.AgentRecord": + from datetime import datetime + import agent_manager + rec = agent_manager.AgentRecord( + agent_id=agent_id, + level=2, + role="chat", + task="test task", + status=status, + started=datetime.now(), + user=user, + finished=datetime.now() if status != "running" else None, + ) + return rec diff --git a/cortex/tools/agents.py b/cortex/tools/agents.py index e6538c5..bb834df 100644 --- a/cortex/tools/agents.py +++ b/cortex/tools/agents.py @@ -1,18 +1,25 @@ """ -Agent spawning tool — lets the orchestrator launch sub-agents synchronously. +Agent spawning and lifecycle tools. -Sub-agents run using the model assigned to the specified role. The call blocks -until the sub-agent completes or times out. +spawn_agent — synchronous or background sub-agent via any configured role model. +agent_status / agent_list / agent_cancel — lifecycle management for background agents. -Supported model types: local_openai, gemini_api. -claude_cli / gemini_cli are chat-only and do not support sub-agent tool loops. +Sub-agents run using the model and tools assigned to the given role. The three-level +hierarchy (Persona → Specialized → Support) is enforced by denying spawn_agent and +aider_run at the L2→L3 boundary — Level 3 agents cannot delegate further. + +Supported model types for sub-agents: local_openai, gemini_api. +claude_cli / gemini_cli are chat-only and do not support tool-enabled sub-agents. """ import asyncio import logging +from datetime import datetime from google.genai import types +import agent_manager + logger = logging.getLogger(__name__) # Per-host semaphores — keyed by "host:" or "type:" @@ -20,6 +27,9 @@ logger = logging.getLogger(__name__) _semaphores: dict[str, asyncio.Semaphore] = {} _sem_lock = asyncio.Lock() +# Tools denied at the L2→L3 boundary so Level 3 agents cannot delegate further. +_L3_DENY_TOOLS = ["spawn_agent", "aider_run"] + async def _get_semaphore(key: str, max_concurrent: int) -> asyncio.Semaphore: """Return (or create) the semaphore for a given host/type key.""" @@ -37,12 +47,23 @@ async def spawn_agent( max_rounds: int | None = None, allow_tools: list[str] | None = None, deny_tools: list[str] | None = None, + background: bool = False, + notify: bool = False, + _agent_level: int = 2, ) -> str: """ - Spawn a sub-agent to complete a task synchronously. + Spawn a sub-agent to complete a task. - The sub-agent uses the model and tools assigned to the given role. Returns - the sub-agent's response as a string. + In synchronous mode (background=False, the default): blocks until done and returns + the result string. + + In background mode (background=True): registers the agent, fires it as an asyncio + background task, and returns an agent_id string immediately. Use agent_status() to + poll, or set notify=True to receive a push notification on completion. + + Level enforcement: this agent (level _agent_level) spawns children at level+1. + Children at level 3 automatically have spawn_agent and aider_run denied so they + cannot delegate further. """ import model_registry from context_loader import load_context @@ -105,9 +126,18 @@ async def spawn_agent( if tool_list is not None: tool_list = [t for t in tool_list if t not in deny_set] else: - # tool_list is unrestricted — block via confirm_deny so the gate fires confirm_deny = confirm_deny | deny_set + # Level enforcement: children of this agent are at level _agent_level + 1. + # Level 3 children cannot delegate — auto-deny the spawning tools. + child_level = _agent_level + 1 + if child_level >= 3: + l3_deny = set(_L3_DENY_TOOLS) + if tool_list is not None: + tool_list = [t for t in tool_list if t not in l3_deny] + else: + confirm_deny = confirm_deny | l3_deny + if max_rounds is not None: model_cfg = dict(model_cfg) model_cfg["max_rounds"] = max_rounds @@ -158,6 +188,41 @@ async def spawn_agent( ) return result.response or "(sub-agent returned no output)" + if background: + rec = await agent_manager.register( + user=user, + role=role, + task=task, + level=_agent_level, + notify=notify, + ) + + async def _bg_task() -> None: + async with sem: + try: + logger.info( + "spawn_agent [bg]: %s role=%s level=%d timeout=%ds", + rec.agent_id[:8], role, _agent_level, timeout, + ) + result = await asyncio.wait_for(_run(), timeout=float(timeout)) + await agent_manager.finish(rec.agent_id, result, "done") + logger.info("spawn_agent [bg]: done %s", rec.agent_id[:8]) + except asyncio.CancelledError: + await agent_manager.finish(rec.agent_id, "Cancelled.", "cancelled") + raise + except asyncio.TimeoutError: + msg = f"Sub-agent timed out after {timeout}s (role={role})" + logger.warning("spawn_agent [bg]: timeout %s", rec.agent_id[:8]) + await agent_manager.finish(rec.agent_id, msg, "timeout") + except Exception as e: + logger.exception("spawn_agent [bg]: failed %s", rec.agent_id[:8]) + await agent_manager.finish(rec.agent_id, str(e), "failed") + + bg = asyncio.create_task(_bg_task()) + agent_manager.set_task_ref(rec.agent_id, bg) + return f"Agent started in background. ID: {rec.agent_id}\nUse agent_status('{rec.agent_id}') to check progress." + + # Synchronous path — unchanged behaviour async with sem: try: logger.info( @@ -175,14 +240,84 @@ async def spawn_agent( return f"Sub-agent error ({role}): {e}" +# ── Agent lifecycle tools ───────────────────────────────────────────────────── + +async def agent_status(agent_id: str) -> str: + """Return the status and result preview of a background agent.""" + from persona import get_user + user = get_user() or "unknown" + rec = agent_manager.get(agent_id) + if not rec: + return f"No agent found with ID: {agent_id}" + if rec.user != user: + return "Access denied." + + now = datetime.now() + end = rec.finished or now + elapsed = int((end - rec.started).total_seconds()) + + lines = [ + f"Agent {rec.agent_id[:8]}…", + f" Status: {rec.status}", + f" Role: {rec.role} (Level {rec.level})", + f" Elapsed: {elapsed}s", + f" Started: {rec.started.strftime('%Y-%m-%d %H:%M:%S')}", + f" Task: {rec.task}", + ] + if rec.parent_id: + lines.append(f" Parent: {rec.parent_id[:8]}…") + if rec.result is not None: + lines.append(f" Result: {rec.result[:300]}") + return "\n".join(lines) + + +async def agent_list(status: str | None = None, limit: int = 10) -> str: + """List background agents for the current user.""" + from persona import get_user + user = get_user() or "unknown" + limit = min(max(int(limit), 1), 50) + records = agent_manager.list_agents(user, status=status, limit=limit) + + if not records: + suffix = f" (filter: status={status})" if status else "" + return f"No agents found.{suffix}" + + now = datetime.now() + lines = [] + for rec in records: + end = rec.finished or now + elapsed = int((end - rec.started).total_seconds()) + preview = rec.task[:60].replace("\n", " ") + result_hint = f" → {rec.result[:50]}" if rec.result else "" + lines.append( + f"[{rec.agent_id[:8]}] {rec.status:<10s} L{rec.level} " + f"{rec.role:<12s} {elapsed:>5}s {preview}{result_hint}" + ) + + header = f"{len(records)} agent(s)" + (f" (status={status})" if status else "") + ":" + return header + "\n" + "\n".join(lines) + + +async def agent_cancel(agent_id: str) -> str: + """Cancel a running background agent.""" + from persona import get_user + user = get_user() or "unknown" + return await agent_manager.cancel_agent(agent_id, user) + + +# ── Declarations ────────────────────────────────────────────────────────────── + DECLARATIONS = [ types.FunctionDeclaration( name="spawn_agent", description=( - "Spawn a sub-agent to complete a task synchronously. " + "Spawn a sub-agent to complete a task. " + "In synchronous mode (default): blocks until the sub-agent finishes and returns its response. " + "In background mode (background=True): fires the agent asynchronously and returns an agent_id " + "immediately — use agent_status() to check progress or set notify=True for a completion alert. " "The sub-agent uses the model and tool set assigned to the given role. " - "Use for processing pipelines, parallel analysis, or delegating " - "specialized work (research, coding, data migration, etc.)." + "Use for processing pipelines, parallel analysis, or delegating specialized work " + "(research, coding, data migration, etc.)." ), parameters=types.Schema( type=types.Type.OBJECT, @@ -209,7 +344,7 @@ DECLARATIONS = [ ), "timeout": types.Schema( type=types.Type.INTEGER, - description="Max seconds to wait (default 120).", + description="Max seconds to wait (default 120). Applies in both sync and background mode.", ), "max_rounds": types.Schema( type=types.Type.INTEGER, @@ -221,7 +356,6 @@ DECLARATIONS = [ description=( "Restrict the sub-agent to only these tools. " "Intersected with the role's tool set — cannot grant more than the role allows. " - "Omit to give the sub-agent the role's full tool set. " "Example: ['web_search', 'web_read'] for a pure research agent." ), ), @@ -230,12 +364,83 @@ DECLARATIONS = [ items=types.Schema(type=types.Type.STRING), description=( "Block these tools from the sub-agent regardless of role config. " - "Use to prevent destructive operations in sensitive sub-tasks. " "Example: ['shell_exec', 'file_write', 'cortex_restart']." ), ), + "background": types.Schema( + type=types.Type.BOOLEAN, + description=( + "Run asynchronously in the background (default: false). " + "When true, returns an agent_id immediately instead of blocking for the result. " + "Use agent_status(agent_id) to check progress. " + "Best for tasks that take more than ~30 seconds." + ), + ), + "notify": types.Schema( + type=types.Type.BOOLEAN, + description=( + "Send a push/Talk notification when the background agent completes (default: false). " + "Only meaningful when background=true." + ), + ), }, required=["task"], ), - ) + ), + types.FunctionDeclaration( + name="agent_status", + description=( + "Get the current status of a background agent by ID. " + "Returns status (running/done/failed/cancelled/timeout), role, elapsed time, " + "task description, and result preview." + ), + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "agent_id": types.Schema( + type=types.Type.STRING, + description="The agent ID returned by spawn_agent(background=True) or aider_run(background=True).", + ), + }, + required=["agent_id"], + ), + ), + types.FunctionDeclaration( + name="agent_list", + description=( + "List background agents for the current user. " + "Returns recent agents with ID, status, role, level, elapsed time, and task preview. " + "Use to survey what's running or recently completed." + ), + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "status": types.Schema( + type=types.Type.STRING, + description="Filter by status: 'running', 'done', 'failed', 'cancelled', 'timeout'. Omit for all.", + ), + "limit": types.Schema( + type=types.Type.INTEGER, + description="Max agents to return (default 10, max 50).", + ), + }, + ), + ), + types.FunctionDeclaration( + name="agent_cancel", + description=( + "Cancel a running background agent. ADMIN ONLY. Requires confirmation. " + "Use agent_list() to find the agent ID first." + ), + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "agent_id": types.Schema( + type=types.Type.STRING, + description="The agent ID to cancel.", + ), + }, + required=["agent_id"], + ), + ), ] diff --git a/documentation/ARCH__FUTURE.md b/documentation/ARCH__FUTURE.md index 0f25895..6731f78 100644 --- a/documentation/ARCH__FUTURE.md +++ b/documentation/ARCH__FUTURE.md @@ -317,6 +317,149 @@ This pattern maps naturally to several existing concepts: --- +## 13. Multi-Level Agent Management + +**Status:** Design complete — implementation not yet started. See `TODO__Agents.md` for the task breakdown. + +Cortex personas can spawn specialized sub-agents to handle parallel or long-running work. +Sub-agents can in turn spawn lightweight support agents for simple subtasks. The hierarchy +is capped at three levels to prevent runaway delegation. + +### Level Definitions + +| Level | Name | Created by | Can spawn | Tool scope | +|---|---|---|---|---| +| **1** | Cortex Persona (Inara) | HTTP request / cron | Level 2 | Full orchestrator tool set | +| **2** | Specialized Sub-Agent | Level 1 `spawn_agent` | Level 3 only | Role-scoped; `spawn_agent` auto-restricted so children are Level 3 | +| **3** | Basic Support Agent | Level 2 `spawn_agent` | Nothing | Narrow tool set; `spawn_agent` and `aider_run` denied | + +**Examples:** +- Level 1 spawns a Level 2 **Coder** agent (has file + git + shell tools; can spawn a Level 3 syntax-checker) +- Level 1 spawns a Level 2 **Research** agent (web tools only; can spawn a Level 3 web reader for parallel page fetches) +- Level 2 spawns a Level 3 **Support** agent for a focused subtask (web_search only, no writes, no further delegation) + +### Core Problem: Everything is Currently Synchronous + +Both `spawn_agent` and `aider_run` block the calling coroutine for their full duration +(default 120s / 300s respectively). Level 1 (Inara) cannot respond to the user, send +notifications, or inspect other agents while waiting. For 5-minute Aider runs or multi-step +research agents this is unusable — the user sees nothing until completion or timeout. + +### Design + +#### 1. Agent Manager (`cortex/agent_manager.py`) + +A lightweight in-process registry of running and recently completed agents. Module-level +dict protected by `asyncio.Lock()`: + +```python +@dataclass +class AgentRecord: + agent_id: str # UUID + level: int # 1 / 2 / 3 + role: str # e.g. "coder", "research" + task: str # first 200 chars of the task + status: str # running / done / failed / cancelled / timeout + started: datetime + finished: datetime | None + parent_id: str | None # lineage — which agent spawned this one + result: str | None # populated on completion (first 500 chars) + notify: bool # fire web_push/NC Talk notification on completion + user: str + +_agents: dict[str, AgentRecord] = {} +_lock = asyncio.Lock() +``` + +On completion, the manager calls `notification.py notify()` if `notify=True` — the same +function used by reminder checks and cron completions. Completed agents stay in the +registry for 24 hours then are pruned on next access. + +#### 2. Background Mode for `spawn_agent` + +Add `background: bool = False` and `notify: bool = False` to `spawn_agent`. When +`background=False` (default): existing synchronous blocking behaviour — unchanged, no +regression. When `background=True`: wraps the run in `asyncio.create_task()`, registers +in the agent manager, returns an `agent_id` string immediately. + +```python +# Level 1 — non-blocking delegation: +agent_id = await spawn_agent( + task="Research Zigbee mesh repeaters; summarize findings to my journal", + role="research", + background=True, + notify=True, # web_push + NC Talk when done +) +# Returns "550e8400-..." immediately. Inara continues responding to the user. +``` + +#### 3. Agent Lifecycle Tools + +Three new tools, wired into `cortex/tools/__init__.py` under the "Agents" category: + +| Tool | Params | Description | +|---|---|---| +| `agent_status(agent_id)` | `agent_id: str` | Status, role, task, elapsed, result preview | +| `agent_list(status=None, limit=10)` | `status: str \| None` | All agents for current user; filter by status | +| `agent_cancel(agent_id)` | `agent_id: str` | Cancel a running background agent (admin, confirm-required) | + +Level 1 can call these between tool rounds to check on delegated work without blocking. + +#### 4. Level Enforcement + +`agent_level` is passed through `spawn_agent` calls as a ContextVar so each agent knows +where it sits in the hierarchy. Enforcement is automatic and simple: + +- **L1 → spawns L2:** `spawn_agent` called normally. Child agent inherits role tools. +- **L2 → spawns L3:** `spawn_agent` automatically adds `deny_tools=["spawn_agent", "aider_run"]` + to the child's effective tool set. Level 3 agents cannot further delegate. +- **Level 3:** `spawn_agent` and `aider_run` are never in the tool list. + +Level is stored in `AgentRecord.level` — the lineage (`parent_id`) provides a full call tree. + +#### 5. `aider_run` Background Mode + +Add `background: bool = False` and `notify: bool = False` to `aider_run`. When `True`, +runs the Aider subprocess via `asyncio.create_task()`, registers in the agent manager, +returns `agent_id` immediately. When called in background mode, `aider_run` is removed +from `CONFIRM_REQUIRED` — the user is not blocking on a confirmation gate since the call +returns instantly. + +```python +# Level 1 or 2 — fire and forget a code change: +agent_id = await aider_run( + project="cortex", + task="Add max_chars param to http_fetch in tools/web.py, cap at 32768", + background=True, + notify=True, +) +``` + +### Implementation Order + +1. **`agent_manager.py`** — AgentRecord + registry CRUD + completion notification hook. + Foundation for everything else; ~100 lines. +2. **`spawn_agent` background mode** — `background` + `notify` + `agent_level` params; + `asyncio.create_task()`; registers in manager. Existing sync path unchanged. +3. **`agent_status` / `agent_list` / `agent_cancel`** — wire into `__init__.py`; add to + `TOOL_CATEGORIES["Agents"]`, `TOOL_ROLES` (cancel = admin), `CONFIRM_REQUIRED` (cancel). +4. **Level enforcement** — `agent_level` ContextVar; auto `deny_tools` at L2→L3 boundary. +5. **`aider_run` background mode** — same pattern as step 2. + +### Files to Create/Modify + +| File | Change | +|---|---| +| `cortex/agent_manager.py` | **New** — AgentRecord, registry dict, start/finish/cancel/list functions | +| `cortex/tools/agents.py` | Add `background`, `notify`, `agent_level` to `spawn_agent`; add `agent_status`, `agent_list`, `agent_cancel` functions + declarations | +| `cortex/tools/aider.py` | Add `background`, `notify` params; register with agent_manager when background | +| `cortex/tools/__init__.py` | Register new agent tools; update TOOL_CATEGORIES, TOOL_ROLES, CONFIRM_REQUIRED | + +See §12 for the existing `allow_tools` / `deny_tools` per-call restrictions that level +enforcement builds on. + +--- + ## 12. Spawner-Level Tool Restrictions — `spawn_agent` Permission Control **Status:** Design complete, not yet built. diff --git a/documentation/MASTER.md b/documentation/MASTER.md index d009f23..bb8cbb8 100644 --- a/documentation/MASTER.md +++ b/documentation/MASTER.md @@ -1,7 +1,7 @@ # Cortex — Master Index > Start here. This document is a map, not a manual. -> Last updated: 2026-05-13 +> Last updated: 2026-06-03 > > **Documentation philosophy:** Cortex is a no-black-box system. Docs must match reality. > Update docs before implementing significant changes. Verify they still match after. @@ -26,7 +26,7 @@ Cortex is a self-hosted personal AI platform. It routes messages from any input | Claude backend | ✅ Live | Primary — via Claude Code CLI | | Gemini backend | ✅ Live | Fallback — via Gemini CLI | | Local backend | ✅ Live | Open WebUI/Ollama on scott_gaming; per-user multi-model config | -| Gemini orchestrator | ✅ Live | Tool loop → Claude response, ⚡ toggle in UI (62 tools) | +| Gemini orchestrator | ✅ Live | Tool loop → Claude response, ⚡ toggle in UI (66 tools) | | Local orchestrator | ✅ Live | OpenAI-compatible ReAct loop; used when orchestrator role → local model | | Model registry V2 | ✅ Live | Providers (Anthropic/Google/Local), multi-account Gemini, role assignments | | Memory distillation | ✅ Live | Short (daily) / Mid (weekly) / Long (monthly) | @@ -38,12 +38,13 @@ Cortex is a self-hosted personal AI platform. It routes messages from any input | Token usage tracking | ✅ Live | Per-user daily buckets in `home/{user}/usage.json`; visible in Settings | | Web push notifications | ✅ Live | VAPID push; `web_push` orchestrator tool; subscribe via ☰ menu | | Proactive notifications | ✅ Live | Daily reminder check (09:00); distill/cron completion alerts; dedicated `/settings/notifications` page | -| Sub-agent spawning | ✅ Live | `spawn_agent` tool — synchronous sub-agents via any configured model | +| Sub-agent spawning | ✅ Live | `spawn_agent` tool — sync or background; `agent_status`/`agent_list`/`agent_cancel`; 3-level hierarchy (L2→L3 enforcement built in) | +| Aider coding agent | ✅ Live | `aider_run` tool — Aider subprocess; model-agnostic (DeepSeek, Ollama, OpenRouter, etc.) | | Agent private notes | ✅ Live | `AGENT_NOTES.md` — orchestrator-only notepad; 3 rolling backups; user-visible as read-only | | Distill safety | ✅ Live | Per-persona asyncio lock, per-endpoint cooldowns, Rebuild option | | Guided onboarding | ✅ Live | Setup Step 3 for OpenRouter; existing-user banner; settings quick-link | -**65 orchestrator tools** across 17 domain modules — added 2026-05-12: `file_diff`, `git_status` / `git_log` / `git_diff` (read-only git inspection), `ae_db_query` / `ae_db_describe` / `ae_db_show_view` (SELECT-only Aether MariaDB access, admin, per-user credentials). `/settings/integrations` page added (admin-only). File attachments in chat (images for vision-capable local models; text/code files for all backends). Settings pages unified under `pg.css`. Added 2026-05-13: `task` cron type (full orchestrator loop on a schedule); monthly/yearly schedule formats (`monthly`, `monthly:DD:HH:MM`, `yearly:MM:DD:HH:MM`); Schedules web UI at `/settings/crons` (list, add, edit, pause, delete); HA inbound webhook tools toggle (orchestrator vs. direct LLM); Anthropic API key backend (`anthropic_api` model type via Anthropic SDK — alternative to CLI OAuth); Cloud APIs catalog in Model Registry — named provider picker (OpenRouter, OpenAI, Groq, X.ai/Grok, Together.ai, Fireworks.ai, Custom) with auto-filled URLs; hosts split into Cloud APIs / Local Hosts sections. Added 2026-05-15: Per-user custom roles — three required roles (`chat`, `orchestrator`, `distill`) are always present; users can add/remove custom roles (e.g. `coder`, `research`) via the Model Registry UI; existing `.env`-defined roles auto-migrated. Settings pages (`local_llm.html` + all settings pages) migrated to Tailwind CSS CDN (no build step); `preflight: false` preserves `pg.css` base styles; `input[type=checkbox/radio]` global width fix in `pg.css`; `btn-submit` now responsive (`w-full md:w-96`). +**69 orchestrator tools** across 17 domain modules — added 2026-06-03: `agent_status`/`agent_list` (user-level)/`agent_cancel` (admin, confirm-required); background mode for `spawn_agent` (`background=True` returns agent_id immediately; `notify=True` sends push on completion); `agent_manager.py` registry with lineage tracking and 24h pruning; L2→L3 level enforcement auto-denies `spawn_agent`/`aider_run` in Level 3 children. Added 2026-05-23: `aider_run` (Aider coding agent subprocess; project aliases for cortex/aether_api/aether_frontend/aether_container; model-agnostic via `.aider.conf.yml` or env vars; admin-only, confirm-required). `.aider.conf.yml` added to project root (read-only context, Python lint-cmd, auto-commits). Added 2026-05-12: `file_diff`, `git_status` / `git_log` / `git_diff` (read-only git inspection), `ae_db_query` / `ae_db_describe` / `ae_db_show_view` (SELECT-only Aether MariaDB access, admin, per-user credentials). `/settings/integrations` page added (admin-only). File attachments in chat (images for vision-capable local models; text/code files for all backends). Settings pages unified under `pg.css`. Added 2026-05-13: `task` cron type (full orchestrator loop on a schedule); monthly/yearly schedule formats (`monthly`, `monthly:DD:HH:MM`, `yearly:MM:DD:HH:MM`); Schedules web UI at `/settings/crons` (list, add, edit, pause, delete); HA inbound webhook tools toggle (orchestrator vs. direct LLM); Anthropic API key backend (`anthropic_api` model type via Anthropic SDK — alternative to CLI OAuth); Cloud APIs catalog in Model Registry — named provider picker (OpenRouter, OpenAI, Groq, X.ai/Grok, Together.ai, Fireworks.ai, Custom) with auto-filled URLs; hosts split into Cloud APIs / Local Hosts sections. Added 2026-05-15: Per-user custom roles — three required roles (`chat`, `orchestrator`, `distill`) are always present; users can add/remove custom roles (e.g. `coder`, `research`) via the Model Registry UI; existing `.env`-defined roles auto-migrated. Settings pages (`local_llm.html` + all settings pages) migrated to Tailwind CSS CDN (no build step); `preflight: false` preserves `pg.css` base styles; `input[type=checkbox/radio]` global width fix in `pg.css`; `btn-submit` now responsive (`w-full md:w-96`). **Active users / personas:** scott/inara, holly/tina, brian/wintermute diff --git a/documentation/ROADMAP.md b/documentation/ROADMAP.md index 0600435..7020631 100644 --- a/documentation/ROADMAP.md +++ b/documentation/ROADMAP.md @@ -48,6 +48,8 @@ - ✅ `http_post` — POST to external URLs with per-user URL prefix allowlist; admin-only, confirm-required - ✅ `nc_talk_history` — read recent NC Talk messages; requires nc_username + nc_app_password in channels.json - ✅ Local orchestrator retry — exponential backoff on 429/5xx/connection errors (3 attempts) +- ✅ Multi-level agent management — `agent_manager.py` (registry + lifecycle), background `spawn_agent`, `agent_status`/`agent_list`/`agent_cancel` tools, 3-level hierarchy enforcement (see `ARCH__FUTURE.md` §13) +- ✅ `aider_run` background mode — background task + push notification on completion; sync path unchanged - [ ] Knowledge import — markdown → AE Journals (import script) - [ ] Dev agent pipeline — specialist agents + supervisor + approval gate - [ ] Gitea webhook integration + Actions CI diff --git a/documentation/TODO__Agents.md b/documentation/TODO__Agents.md index 471ca12..138260a 100644 --- a/documentation/TODO__Agents.md +++ b/documentation/TODO__Agents.md @@ -67,6 +67,57 @@ automatically. Remaining work is quality/reliability parity, not ground-up desig - [x] **`email_send`** — SMTP via email_utils, per-user regex allowlist in `home/{user}/email_allowlist.json`, managed via Settings UI textarea + Files panel raw editor — 2026-04-29 - [x] **`web_push`** — VAPID push via pywebpush; subscriptions in `home/{user}/push_subscriptions.json`; "Enable notifications" toggle in ☰ menu; sw.js push+notificationclick handlers — 2026-05-05 +### [Agents] Multi-Level Agent Management + +Design: `documentation/ARCH__FUTURE.md` §13 + +Three-level hierarchy: Level 1 = Cortex Persona; Level 2 = Specialized Sub-Agent +(can spawn Level 3); Level 3 = Basic Support Agent (cannot spawn). All spawning is +currently synchronous and blocking — this makes long-running agents (Aider, research +pipelines) unusable without freezing the orchestrator. + +**Phase 1 — Foundation (build first):** +- [x] **`cortex/agent_manager.py`** — `AgentRecord` dataclass (agent_id, level, role, + task, status, started, parent_id, result, notify, user); module-level registry dict + with `asyncio.Lock()`; `register()`, `finish()`, `cancel_agent()`, + `list_agents(user, status)` functions; calls `notification.notify()` on completion + when `notify=True`; prune records older than 24 hours on next register — 2026-06-03 +- [x] **Background mode for `spawn_agent`** — added `background: bool = False` and + `notify: bool = False` params; when `background=True`, wraps `_run()` in + `asyncio.create_task()`, registers in agent_manager, returns agent_id immediately; + existing sync path unchanged — 2026-06-03 +- [x] **`agent_status(agent_id)` tool** — returns status, role, task excerpt, elapsed + seconds, result preview (first 300 chars); user-level — 2026-06-03 +- [x] **`agent_list(status=None, limit=10)` tool** — returns running + recent agents for + current user; filter by `status`; user-level — 2026-06-03 +- [x] **`agent_cancel(agent_id)` tool** — cancels background task via stored + `asyncio.Task` reference; admin-only, confirm-required — 2026-06-03 + +**Phase 2 — Level enforcement:** +- [ ] **`agent_level` ContextVar** — set to 1 in the main orchestrators; passed into + `spawn_agent` so sub-agents know their level +- [ ] **Auto-deny at L2→L3 boundary** — when Level 2 calls `spawn_agent`, automatically + add `deny_tools=["spawn_agent", "aider_run"]` so the Level 3 child cannot delegate; + store level in `AgentRecord` for lineage tracking + +**Phase 3 — `aider_run` async:** +- [x] **`aider_run` background mode** — added `background: bool = False` and + `notify: bool = False` params; runs subprocess via `asyncio.create_task()`, registers + in agent_manager, returns agent_id immediately; confirmation still required (correct + — user confirms before the tool runs, not during) — 2026-06-03 +- [x] **Register new tools in `__init__.py`** — `agent_status`, `agent_list`, `agent_cancel` + in `TOOL_CATEGORIES["Agents"]`; `agent_cancel` in `TOOL_ROLES` (admin) and + `CONFIRM_REQUIRED`; added to `_CALLABLES` and `_ALL_DECLARATIONS` — 2026-06-03 + +**Tests:** +- [x] **`cortex/tests/test_agent_manager.py`** — 41 tests covering: agent_manager CRUD, + prune, notify hook, spawn_agent background mode (returns immediately, completes async, + timeout, failure), level enforcement (L1→L2 permits, L2→L3 auto-denies), agent + lifecycle tools output, aider_run background mode — 2026-06-03 + Run: `cd cortex && .venv/bin/python -m pytest tests/test_agent_manager.py -v` + +--- + ### [Tools] Orchestrator tool expansions — Round 2 Next additions identified 2026-05-08. See `ARCH__FUTURE.md` §2 for design notes. @@ -227,11 +278,23 @@ Every orchestrator tool invocation logged to `home/{user}/tool_audit/YYYY-MM-DD. ### [Intelligence] Dev agent pipeline See `ARCH__Intelligence_Layer.md`. Full design not yet started. + +`aider_run` (2026-05-23) provides the execution layer — Cortex dispatches to Aider as +the coding worker. Aider is model-agnostic (DeepSeek, Ollama, OpenRouter, etc.) and +fully scriptable via `--message --yes-always`. This replaces the Claude Code subprocess +dependency for coding tasks. Per-project `.aider.conf.yml` holds read-only context files +and lint commands; model/key come from env vars (not committed). + +- [x] **`aider_run` tool** — `cortex/tools/aider.py`; project aliases + subprocess with `--message --yes-always`; admin-only, confirm-required, high risk — 2026-05-23 +- [ ] **`aider_run` async/notify** — current implementation blocks for up to 5 min with no UI feedback; convert to background task + web_push/NC Talk notification on completion (same pattern as `/orchestrate` jobs); optionally add `aider_status` tool for mid-task polling +- [x] **`.aider.conf.yml`** — project-level Aider config: `read: [CLAUDE.md]`, Python lint-cmd, auto-commits — 2026-05-23 +- [x] **`.gitignore`** — added `.aider.chat.history.md`, `.aider.input.history`, `.aider.llm.history` — 2026-05-23 - [ ] Specialist agent: frontend (SvelteKit) code changes - [ ] Specialist agent: backend (FastAPI) code changes - [ ] Supervisor agent: diff review, syntax check, test runner - [ ] Gitea webhook integration: trigger on push/PR, report back - [ ] Human approval gate before commit +- [ ] `.aider.conf.yml` for aether_api, aether_frontend, aether_container projects ### [Intelligence] Supervisor agent - Runs `py_compile`, `svelte-check`, unit tests after specialist agent work