agent_manager.py (new):
- AgentRecord dataclass: agent_id, level (1/2/3), role, task, status, started, parent_id (lineage), finished, result, notify, _task_ref
- register() / finish() / cancel_agent() / list_agents() / get() / set_task_ref()
- Calls notification.notify() on completion when notify=True (same channel as reminders and cron completions)
- 24-hour pruning of completed records on each new registration

spawn_agent (tools/agents.py):
- background=True: fires asyncio.create_task(), registers in agent_manager, returns agent_id string immediately — sync path unchanged (no regression)
- notify=True: push/Talk notification when the background task completes
- Level enforcement: _agent_level param tracks hierarchy depth; when spawning from Level 2, child automatically gets spawn_agent + aider_run denied so Level 3 agents cannot delegate further

New lifecycle tools (tools/agents.py + __init__.py):
- agent_status(agent_id) — status, role, level, elapsed, task, result preview; user-level
- agent_list(status, limit) — all agents for current user, newest first; user-level
- agent_cancel(agent_id) — kills background task; admin-only, confirm-required

tests/test_agent_manager.py (new, 41 tests):
- agent_manager CRUD, pruning, notification hook
- spawn_agent background: returns immediately, completes async, timeout, failure
- Level enforcement: L1→L2 permits spawn, L2→L3 auto-denies; explicit tool_list path
- agent_status / agent_list / agent_cancel output formatting
- aider_run background: returns agent_id, completes async, sync path unchanged
- All tests run without browser or Cortex service (~2.5s total)

Run: cd cortex && .venv/bin/python -m pytest tests/test_agent_manager.py -v

Docs: ARCH__FUTURE.md §13 (full design), ROADMAP.md, TODO__Agents.md, MASTER.md, HELP.md (orchestrator description corrected, tool schema line updated to reflect keyword routing), CLAUDE.md tool count 66→69.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
447 lines
18 KiB
Python
447 lines
18 KiB
Python
"""
|
|
Agent spawning and lifecycle tools.
|
|
|
|
spawn_agent — synchronous or background sub-agent via any configured role model.
|
|
agent_status / agent_list / agent_cancel — lifecycle management for background agents.
|
|
|
|
Sub-agents run using the model and tools assigned to the given role. The three-level
|
|
hierarchy (Persona → Specialized → Support) is enforced by denying spawn_agent and
|
|
aider_run at the L2→L3 boundary — Level 3 agents cannot delegate further.
|
|
|
|
Supported model types for sub-agents: local_openai, gemini_api.
|
|
claude_cli / gemini_cli are chat-only and do not support tool-enabled sub-agents.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
from google.genai import types
|
|
|
|
import agent_manager
|
|
|
|
logger = logging.getLogger(__name__)

# Tools stripped from every agent created at Level 3 (i.e. spawned by a Level 2
# agent) so the delegation hierarchy stops there.
_L3_DENY_TOOLS = ["spawn_agent", "aider_run"]

# Concurrency limiters shared by all spawns in this process, keyed by
# "host:<host_id>" when the model names a host, otherwise "type:<model_type>".
# Entries are created lazily by _get_semaphore() and never removed.
_semaphores: dict[str, asyncio.Semaphore] = dict()

# Held by _get_semaphore() while it looks up / creates an entry in _semaphores.
_sem_lock = asyncio.Lock()
|
|
|
|
|
|
async def _get_semaphore(key: str, max_concurrent: int) -> asyncio.Semaphore:
    """
    Return the shared semaphore for *key*, creating it on first request.

    The concurrency limit is fixed when the semaphore is first created; later
    calls for the same key receive the existing semaphore regardless of the
    *max_concurrent* they pass.
    """
    async with _sem_lock:
        existing = _semaphores.get(key)
        if existing is None:
            existing = _semaphores[key] = asyncio.Semaphore(max_concurrent)
        return existing
|
|
|
|
|
|
async def spawn_agent(
    task: str,
    role: str = "chat",
    tier: int = 1,
    timeout: int = 120,
    max_rounds: int | None = None,
    allow_tools: list[str] | None = None,
    deny_tools: list[str] | None = None,
    background: bool = False,
    notify: bool = False,
    _agent_level: int = 2,
) -> str:
    """
    Spawn a sub-agent to complete a task.

    In synchronous mode (background=False, the default): blocks until done and returns
    the result string.

    In background mode (background=True): registers the agent, fires it as an asyncio
    background task, and returns an agent_id string immediately. Use agent_status() to
    poll, or set notify=True to receive a push notification on completion.

    Level enforcement: this agent (level _agent_level) spawns children at level+1.
    Children at level 3 automatically have spawn_agent and aider_run denied so they
    cannot delegate further. A background child is registered with that child level.

    Args:
        task: Complete task description handed to the sub-agent.
        role: Role whose configured model and tool set the sub-agent uses.
        tier: Context tier passed to load_context(); tiers >= 2 also request the
            long/mid/short context sections.
        timeout: Seconds allowed for the sub-agent run. Time spent waiting for a
            free concurrency slot (the per-host/per-type semaphore) is not counted.
        max_rounds: Optional override of the model config's "max_rounds".
        allow_tools: If given, restricts the sub-agent to these tools (intersected
            with the role's tool list when the role defines one).
        deny_tools: If given, removes these tools from the sub-agent.
        background: Run as a background task and return an agent_id message at once.
        notify: Forwarded to agent_manager.register() for background runs.
        _agent_level: Hierarchy level of the *calling* agent (not exposed in the
            tool schema).

    Returns:
        The sub-agent's response (sync), a "started in background" message containing
        the agent_id (background), or a message string describing a configuration
        problem, timeout or sub-agent failure.
    """
    import model_registry
    from context_loader import load_context
    from auth_utils import get_user_role, get_tool_policy
    from persona import get_user

    # NOTE(review): this falls back to "scott" when no user is bound, whereas
    # agent_status/agent_list/agent_cancel fall back to "unknown" — a background
    # agent registered under this fallback would not be visible to them. Confirm.
    user = get_user() or "scott"

    role_cfg = model_registry.get_role_config(user, role)
    model_cfg = model_registry.get_model_for_role(user, role)

    if not model_cfg:
        return f"spawn_agent: no model configured for role '{role}'"

    model_type = model_cfg.get("type", "unknown")

    # Only these backends run a tool loop; CLI backends are chat-only.
    if model_type not in ("local_openai", "gemini_api"):
        return (
            f"spawn_agent: model type '{model_type}' does not support tool-enabled sub-agents. "
            f"Assign a local_openai or gemini_api model to role '{role}'."
        )

    # Concurrency is capped per configured host when the model names one
    # (host "max_concurrent", default 3), otherwise per model type.
    host_id = model_cfg.get("host_id")
    if host_id:
        registry = model_registry.get_registry(user)
        # .get() so a malformed host entry without an "id" is skipped, not a KeyError.
        host = next(
            (h for h in registry.get("hosts", []) if h.get("id") == host_id),
            None,
        )
        max_concurrent = (host or {}).get("max_concurrent", 3)
        sem_key = f"host:{host_id}"
    else:
        max_concurrent = 5 if model_type == "gemini_api" else 3
        sem_key = f"type:{model_type}"

    sem = await _get_semaphore(sem_key, max_concurrent)

    system_prompt = load_context(
        tier=tier,
        include_long=(tier >= 2),
        include_mid=(tier >= 2),
        include_short=(tier >= 2),
        role_append=role_cfg.get("system_append", ""),
        inject_datetime=role_cfg.get("inject_datetime", True),
    )

    user_role = get_user_role(user)
    tool_list = role_cfg.get("tools")
    policy = get_tool_policy(user)
    confirm_allow = set(policy.get("allow", []))
    confirm_deny = set(policy.get("deny", []))

    # Per-call tool restrictions — role config remains the authoritative ceiling.
    # With an explicit role tool list the restrictions filter that list; without
    # one, denials are merged into confirm_deny for the orchestrator instead.
    if allow_tools is not None:
        if tool_list is not None:
            tool_list = [t for t in tool_list if t in allow_tools]
        else:
            tool_list = list(allow_tools)

    if deny_tools is not None:
        deny_set = set(deny_tools)
        if tool_list is not None:
            tool_list = [t for t in tool_list if t not in deny_set]
        else:
            confirm_deny = confirm_deny | deny_set

    # Level enforcement: children of this agent are at level _agent_level + 1.
    # Level 3 children cannot delegate — auto-deny the spawning tools.
    child_level = _agent_level + 1
    if child_level >= 3:
        l3_deny = set(_L3_DENY_TOOLS)
        if tool_list is not None:
            tool_list = [t for t in tool_list if t not in l3_deny]
        else:
            confirm_deny = confirm_deny | l3_deny

    if max_rounds is not None:
        # Copy first so the override does not mutate the dict returned by
        # model_registry (which may be shared).
        model_cfg = dict(model_cfg)
        model_cfg["max_rounds"] = max_rounds

    async def _run() -> str:
        """Run the sub-agent on the backend matching model_type; return its text."""
        if model_type == "local_openai":
            import openai_orchestrator
            result = await openai_orchestrator.run(
                task=task,
                system_prompt=system_prompt,
                model_cfg=model_cfg,
                respond_with_final=True,
                user_role=user_role,
                tool_list=tool_list,
                confirm_allow=confirm_allow,
                confirm_deny=confirm_deny,
            )
            if result.checkpoint:
                return (
                    "Sub-agent requires user confirmation — "
                    "confirmation gates are not supported inside spawn_agent. "
                    "Pre-allow the tool in the user's tool policy or use a different role."
                )
            return result.response or "(sub-agent returned no output)"

        # gemini_api
        import orchestrator_engine
        from auth_utils import get_user_gemini_key
        gemini_key = model_cfg.get("api_key") or get_user_gemini_key(user)
        result = await orchestrator_engine.run(
            task=task,
            system_prompt=system_prompt,
            session_messages=None,
            respond_with_claude=True,
            gemini_api_key=gemini_key,
            model_name=model_cfg.get("model_name"),
            response_role=role,
            user_role=user_role,
            tool_list=tool_list,
            confirm_allow=confirm_allow,
            confirm_deny=confirm_deny,
            max_rounds=model_cfg.get("max_rounds"),
        )
        if result.checkpoint:
            return (
                "Sub-agent requires user confirmation — "
                "confirmation gates are not supported inside spawn_agent."
            )
        return result.response or "(sub-agent returned no output)"

    if background:
        # The record describes the spawned child, so it carries the child's level
        # (the same level the tool restrictions above were computed for).
        rec = await agent_manager.register(
            user=user,
            role=role,
            task=task,
            level=child_level,
            notify=notify,
        )

        async def _bg_task() -> None:
            """Run the child under the semaphore and always close out its record."""
            # The try also covers acquiring the semaphore: an agent cancelled (or
            # failing) while still queued for a concurrency slot must be finished
            # too, otherwise its record would stay "running".
            try:
                async with sem:
                    logger.info(
                        "spawn_agent [bg]: %s role=%s level=%d timeout=%ds",
                        rec.agent_id[:8], role, child_level, timeout,
                    )
                    result = await asyncio.wait_for(_run(), timeout=float(timeout))
                    await agent_manager.finish(rec.agent_id, result, "done")
                    logger.info("spawn_agent [bg]: done %s", rec.agent_id[:8])
            except asyncio.CancelledError:
                await agent_manager.finish(rec.agent_id, "Cancelled.", "cancelled")
                raise
            except asyncio.TimeoutError:
                msg = f"Sub-agent timed out after {timeout}s (role={role})"
                logger.warning("spawn_agent [bg]: timeout %s", rec.agent_id[:8])
                await agent_manager.finish(rec.agent_id, msg, "timeout")
            except Exception as e:
                logger.exception("spawn_agent [bg]: failed %s", rec.agent_id[:8])
                await agent_manager.finish(rec.agent_id, str(e), "failed")

        bg = asyncio.create_task(_bg_task())
        # Keep the task reference in the manager so agent_cancel can reach it.
        agent_manager.set_task_ref(rec.agent_id, bg)
        return f"Agent started in background. ID: {rec.agent_id}\nUse agent_status('{rec.agent_id}') to check progress."

    # Synchronous path — unchanged behaviour
    async with sem:
        try:
            logger.info(
                "spawn_agent: role=%s tier=%d timeout=%ds task=%.80s",
                role, tier, timeout, task,
            )
            response = await asyncio.wait_for(_run(), timeout=float(timeout))
            logger.info("spawn_agent: done role=%s response=%d chars", role, len(response))
            return response
        except asyncio.TimeoutError:
            logger.warning("spawn_agent: timed out after %ds role=%s", timeout, role)
            return f"Sub-agent timed out after {timeout}s (role={role})"
        except Exception as e:
            logger.exception("spawn_agent: failed role=%s", role)
            return f"Sub-agent error ({role}): {e}"
|
|
|
|
|
|
# ── Agent lifecycle tools ─────────────────────────────────────────────────────
|
|
|
|
async def agent_status(agent_id: str) -> str:
    """
    Describe one background agent belonging to the current user.

    Returns a multi-line report (status, role and level, elapsed seconds, start
    time, task, plus the parent ID and the first 300 characters of the result
    when present). Unknown IDs yield "No agent found with ID: …" and agents
    owned by a different user yield "Access denied.".
    """
    from persona import get_user

    current_user = get_user() or "unknown"
    record = agent_manager.get(agent_id)

    if not record:
        return f"No agent found with ID: {agent_id}"
    if record.user != current_user:
        return "Access denied."

    # Without a finish timestamp, elapsed time is measured up to "now".
    finished_at = record.finished or datetime.now()
    elapsed_s = int((finished_at - record.started).total_seconds())
    started_txt = record.started.strftime('%Y-%m-%d %H:%M:%S')

    report = [f"Agent {record.agent_id[:8]}…"]
    report.append(f" Status: {record.status}")
    report.append(f" Role: {record.role} (Level {record.level})")
    report.append(f" Elapsed: {elapsed_s}s")
    report.append(f" Started: {started_txt}")
    report.append(f" Task: {record.task}")
    if record.parent_id:
        report.append(f" Parent: {record.parent_id[:8]}…")
    if record.result is not None:
        report.append(f" Result: {record.result[:300]}")
    return "\n".join(report)
|
|
|
|
|
|
async def agent_list(status: str | None = None, limit: int = 10) -> str:
    """
    List background agents for the current user, one line per agent.

    Each line shows the short ID, status, level, role, elapsed seconds, a
    60-character task preview and, when a result exists, a 50-character result
    preview. Newlines in both previews are flattened to spaces so multi-line
    tasks or results cannot break the one-agent-per-line layout.

    Args:
        status: Optional status filter forwarded to agent_manager.list_agents().
        limit: Maximum number of agents to return, clamped to 1..50. A null
            (None) value from the tool call falls back to the default of 10.

    Returns:
        A header line followed by one line per agent, or "No agents found."
        (with the active filter noted) when nothing matches.
    """
    from persona import get_user

    user = get_user() or "unknown"
    if limit is None:
        limit = 10
    limit = min(max(int(limit), 1), 50)
    records = agent_manager.list_agents(user, status=status, limit=limit)

    if not records:
        suffix = f" (filter: status={status})" if status else ""
        return f"No agents found.{suffix}"

    now = datetime.now()
    lines = []
    for rec in records:
        # Without a finish timestamp, elapsed time is measured up to "now".
        end = rec.finished or now
        elapsed = int((end - rec.started).total_seconds())
        preview = rec.task[:60].replace("\n", " ")
        result_hint = ""
        if rec.result:
            # Flatten like the task preview so the row stays on one line.
            result_hint = " → " + rec.result[:50].replace("\n", " ")
        lines.append(
            f"[{rec.agent_id[:8]}] {rec.status:<10s} L{rec.level} "
            f"{rec.role:<12s} {elapsed:>5}s {preview}{result_hint}"
        )

    header = f"{len(records)} agent(s)" + (f" (status={status})" if status else "") + ":"
    return header + "\n" + "\n".join(lines)
|
|
|
|
|
|
async def agent_cancel(agent_id: str) -> str:
    """
    Cancel a running background agent.

    Thin wrapper: passes the agent ID and the caller's user name ("unknown"
    when no user is bound) to agent_manager.cancel_agent() and returns its
    reply unchanged. The admin-only / confirmation requirement stated in the
    tool declaration is not enforced inside this function.
    """
    from persona import get_user

    return await agent_manager.cancel_agent(agent_id, get_user() or "unknown")
|
|
|
|
|
|
# ── Declarations ──────────────────────────────────────────────────────────────


def _string_param(description: str) -> types.Schema:
    """Schema for a plain string tool argument."""
    return types.Schema(type=types.Type.STRING, description=description)


def _integer_param(description: str) -> types.Schema:
    """Schema for an integer tool argument."""
    return types.Schema(type=types.Type.INTEGER, description=description)


def _boolean_param(description: str) -> types.Schema:
    """Schema for a boolean tool argument."""
    return types.Schema(type=types.Type.BOOLEAN, description=description)


def _string_list_param(description: str) -> types.Schema:
    """Schema for an array-of-strings tool argument."""
    return types.Schema(
        type=types.Type.ARRAY,
        items=types.Schema(type=types.Type.STRING),
        description=description,
    )


def _object_params(
    properties: dict[str, types.Schema],
    required: list[str] | None = None,
) -> types.Schema:
    """Schema for a tool's argument object; `required` is only set when given."""
    if required is None:
        return types.Schema(type=types.Type.OBJECT, properties=properties)
    return types.Schema(type=types.Type.OBJECT, properties=properties, required=required)


# Function declarations exposed to the model for this module's four tools.
DECLARATIONS = [
    types.FunctionDeclaration(
        name="spawn_agent",
        description=(
            "Spawn a sub-agent to complete a task. "
            "In synchronous mode (default): blocks until the sub-agent finishes and returns its response. "
            "In background mode (background=True): fires the agent asynchronously and returns an agent_id "
            "immediately — use agent_status() to check progress or set notify=True for a completion alert. "
            "The sub-agent uses the model and tool set assigned to the given role. "
            "Use for processing pipelines, parallel analysis, or delegating specialized work "
            "(research, coding, data migration, etc.)."
        ),
        parameters=_object_params(
            {
                "task": _string_param("The complete task description for the sub-agent."),
                "role": _string_param(
                    "Role determining the model and tools. "
                    "E.g. 'research' for web lookups, 'coder' for code tasks, "
                    "'distill' for summarization. Defaults to 'chat'."
                ),
                "tier": _integer_param(
                    "Context tier: 1 = minimal (fast, identity only), "
                    "2 = standard (+ memory), 3 = + last 2 session logs. "
                    "Use 1 for pure processing tasks."
                ),
                "timeout": _integer_param(
                    "Max seconds to wait (default 120). Applies in both sync and background mode."
                ),
                "max_rounds": _integer_param("Override max tool-loop iterations for this call."),
                "allow_tools": _string_list_param(
                    "Restrict the sub-agent to only these tools. "
                    "Intersected with the role's tool set — cannot grant more than the role allows. "
                    "Example: ['web_search', 'web_read'] for a pure research agent."
                ),
                "deny_tools": _string_list_param(
                    "Block these tools from the sub-agent regardless of role config. "
                    "Example: ['shell_exec', 'file_write', 'cortex_restart']."
                ),
                "background": _boolean_param(
                    "Run asynchronously in the background (default: false). "
                    "When true, returns an agent_id immediately instead of blocking for the result. "
                    "Use agent_status(agent_id) to check progress. "
                    "Best for tasks that take more than ~30 seconds."
                ),
                "notify": _boolean_param(
                    "Send a push/Talk notification when the background agent completes (default: false). "
                    "Only meaningful when background=true."
                ),
            },
            required=["task"],
        ),
    ),
    types.FunctionDeclaration(
        name="agent_status",
        description=(
            "Get the current status of a background agent by ID. "
            "Returns status (running/done/failed/cancelled/timeout), role, elapsed time, "
            "task description, and result preview."
        ),
        parameters=_object_params(
            {
                "agent_id": _string_param(
                    "The agent ID returned by spawn_agent(background=True) or aider_run(background=True)."
                ),
            },
            required=["agent_id"],
        ),
    ),
    types.FunctionDeclaration(
        name="agent_list",
        description=(
            "List background agents for the current user. "
            "Returns recent agents with ID, status, role, level, elapsed time, and task preview. "
            "Use to survey what's running or recently completed."
        ),
        # No required arguments: both the filter and the limit are optional.
        parameters=_object_params(
            {
                "status": _string_param(
                    "Filter by status: 'running', 'done', 'failed', 'cancelled', 'timeout'. Omit for all."
                ),
                "limit": _integer_param("Max agents to return (default 10, max 50)."),
            },
        ),
    ),
    types.FunctionDeclaration(
        name="agent_cancel",
        description=(
            "Cancel a running background agent. ADMIN ONLY. Requires confirmation. "
            "Use agent_list() to find the agent ID first."
        ),
        parameters=_object_params(
            {
                "agent_id": _string_param("The agent ID to cancel."),
            },
            required=["agent_id"],
        ),
    ),
]
|