""" Agent spawning and lifecycle tools. spawn_agent — synchronous or background sub-agent via any configured role model. agent_status / agent_list / agent_cancel — lifecycle management for background agents. Sub-agents run using the model and tools assigned to the given role. The three-level hierarchy (Persona → Specialized → Support) is enforced by denying spawn_agent and aider_run at the L2→L3 boundary — Level 3 agents cannot delegate further. Supported model types for sub-agents: local_openai, gemini_api. claude_cli / gemini_cli are chat-only and do not support tool-enabled sub-agents. """ import asyncio import logging from datetime import datetime from google.genai import types import agent_manager logger = logging.getLogger(__name__) # Per-host semaphores — keyed by "host:" or "type:" # Created lazily on first use; never deleted (module-level singletons) _semaphores: dict[str, asyncio.Semaphore] = {} _sem_lock = asyncio.Lock() # Tools denied at the L2→L3 boundary so Level 3 agents cannot delegate further. _L3_DENY_TOOLS = ["spawn_agent", "aider_run"] async def _get_semaphore(key: str, max_concurrent: int) -> asyncio.Semaphore: """Return (or create) the semaphore for a given host/type key.""" async with _sem_lock: if key not in _semaphores: _semaphores[key] = asyncio.Semaphore(max_concurrent) return _semaphores[key] async def spawn_agent( task: str, role: str = "chat", tier: int = 1, timeout: int = 120, max_rounds: int | None = None, allow_tools: list[str] | None = None, deny_tools: list[str] | None = None, background: bool = False, notify: bool = False, _agent_level: int = 2, ) -> str: """ Spawn a sub-agent to complete a task. In synchronous mode (background=False, the default): blocks until done and returns the result string. In background mode (background=True): registers the agent, fires it as an asyncio background task, and returns an agent_id string immediately. Use agent_status() to poll, or set notify=True to receive a push notification on completion. Level enforcement: this agent (level _agent_level) spawns children at level+1. Children at level 3 automatically have spawn_agent and aider_run denied so they cannot delegate further. """ import model_registry from context_loader import load_context from auth_utils import get_user_role, get_tool_policy from persona import get_user user = get_user() or "scott" role_cfg = model_registry.get_role_config(user, role) model_cfg = model_registry.get_model_for_role(user, role) if not model_cfg: return f"spawn_agent: no model configured for role '{role}'" model_type = model_cfg.get("type", "unknown") if model_type not in ("local_openai", "gemini_api"): return ( f"spawn_agent: model type '{model_type}' does not support tool-enabled sub-agents. " f"Assign a local_openai or gemini_api model to role '{role}'." ) # Determine concurrency key and semaphore limit host_id = model_cfg.get("host_id") if host_id: registry = model_registry.get_registry(user) host = next((h for h in registry.get("hosts", []) if h["id"] == host_id), None) max_concurrent = (host or {}).get("max_concurrent", 3) sem_key = f"host:{host_id}" else: max_concurrent = 5 if model_type == "gemini_api" else 3 sem_key = f"type:{model_type}" sem = await _get_semaphore(sem_key, max_concurrent) system_prompt = load_context( tier=tier, include_long=(tier >= 2), include_mid=(tier >= 2), include_short=(tier >= 2), role_append=role_cfg.get("system_append", ""), inject_datetime=role_cfg.get("inject_datetime", True), ) user_role = get_user_role(user) tool_list = role_cfg.get("tools") policy = get_tool_policy(user) confirm_allow = set(policy.get("allow", [])) confirm_deny = set(policy.get("deny", [])) # Per-call tool restrictions — role config remains the authoritative ceiling if allow_tools is not None: if tool_list is not None: tool_list = [t for t in tool_list if t in allow_tools] else: tool_list = list(allow_tools) if deny_tools is not None: deny_set = set(deny_tools) if tool_list is not None: tool_list = [t for t in tool_list if t not in deny_set] else: confirm_deny = confirm_deny | deny_set # Level enforcement: children of this agent are at level _agent_level + 1. # Level 3 children cannot delegate — auto-deny the spawning tools. child_level = _agent_level + 1 if child_level >= 3: l3_deny = set(_L3_DENY_TOOLS) if tool_list is not None: tool_list = [t for t in tool_list if t not in l3_deny] else: confirm_deny = confirm_deny | l3_deny if max_rounds is not None: model_cfg = dict(model_cfg) model_cfg["max_rounds"] = max_rounds async def _run() -> str: if model_type == "local_openai": import openai_orchestrator result = await openai_orchestrator.run( task=task, system_prompt=system_prompt, model_cfg=model_cfg, respond_with_final=True, user_role=user_role, tool_list=tool_list, confirm_allow=confirm_allow, confirm_deny=confirm_deny, ) if result.checkpoint: return ( "Sub-agent requires user confirmation — " "confirmation gates are not supported inside spawn_agent. " "Pre-allow the tool in the user's tool policy or use a different role." ) return result.response or "(sub-agent returned no output)" # gemini_api import orchestrator_engine from auth_utils import get_user_gemini_key gemini_key = model_cfg.get("api_key") or get_user_gemini_key(user) result = await orchestrator_engine.run( task=task, system_prompt=system_prompt, session_messages=None, respond_with_claude=True, gemini_api_key=gemini_key, model_name=model_cfg.get("model_name"), response_role=role, user_role=user_role, tool_list=tool_list, confirm_allow=confirm_allow, confirm_deny=confirm_deny, max_rounds=model_cfg.get("max_rounds"), ) if result.checkpoint: return ( "Sub-agent requires user confirmation — " "confirmation gates are not supported inside spawn_agent." ) return result.response or "(sub-agent returned no output)" if background: rec = await agent_manager.register( user=user, role=role, task=task, level=_agent_level, notify=notify, ) async def _bg_task() -> None: async with sem: try: logger.info( "spawn_agent [bg]: %s role=%s level=%d timeout=%ds", rec.agent_id[:8], role, _agent_level, timeout, ) result = await asyncio.wait_for(_run(), timeout=float(timeout)) await agent_manager.finish(rec.agent_id, result, "done") logger.info("spawn_agent [bg]: done %s", rec.agent_id[:8]) except asyncio.CancelledError: await agent_manager.finish(rec.agent_id, "Cancelled.", "cancelled") raise except asyncio.TimeoutError: msg = f"Sub-agent timed out after {timeout}s (role={role})" logger.warning("spawn_agent [bg]: timeout %s", rec.agent_id[:8]) await agent_manager.finish(rec.agent_id, msg, "timeout") except Exception as e: logger.exception("spawn_agent [bg]: failed %s", rec.agent_id[:8]) await agent_manager.finish(rec.agent_id, str(e), "failed") bg = asyncio.create_task(_bg_task()) agent_manager.set_task_ref(rec.agent_id, bg) return f"Agent started in background. ID: {rec.agent_id}\nUse agent_status('{rec.agent_id}') to check progress." # Synchronous path — unchanged behaviour async with sem: try: logger.info( "spawn_agent: role=%s tier=%d timeout=%ds task=%.80s", role, tier, timeout, task, ) response = await asyncio.wait_for(_run(), timeout=float(timeout)) logger.info("spawn_agent: done role=%s response=%d chars", role, len(response)) return response except asyncio.TimeoutError: logger.warning("spawn_agent: timed out after %ds role=%s", timeout, role) return f"Sub-agent timed out after {timeout}s (role={role})" except Exception as e: logger.exception("spawn_agent: failed role=%s", role) return f"Sub-agent error ({role}): {e}" # ── Agent lifecycle tools ───────────────────────────────────────────────────── async def agent_status(agent_id: str) -> str: """Return the status and result preview of a background agent.""" from persona import get_user user = get_user() or "unknown" rec = agent_manager.get(agent_id) if not rec: return f"No agent found with ID: {agent_id}" if rec.user != user: return "Access denied." now = datetime.now() end = rec.finished or now elapsed = int((end - rec.started).total_seconds()) lines = [ f"Agent {rec.agent_id[:8]}…", f" Status: {rec.status}", f" Role: {rec.role} (Level {rec.level})", f" Elapsed: {elapsed}s", f" Started: {rec.started.strftime('%Y-%m-%d %H:%M:%S')}", f" Task: {rec.task}", ] if rec.parent_id: lines.append(f" Parent: {rec.parent_id[:8]}…") if rec.result is not None: lines.append(f" Result: {rec.result[:300]}") return "\n".join(lines) async def agent_list(status: str | None = None, limit: int = 10) -> str: """List background agents for the current user.""" from persona import get_user user = get_user() or "unknown" limit = min(max(int(limit), 1), 50) records = agent_manager.list_agents(user, status=status, limit=limit) if not records: suffix = f" (filter: status={status})" if status else "" return f"No agents found.{suffix}" now = datetime.now() lines = [] for rec in records: end = rec.finished or now elapsed = int((end - rec.started).total_seconds()) preview = rec.task[:60].replace("\n", " ") result_hint = f" → {rec.result[:50]}" if rec.result else "" lines.append( f"[{rec.agent_id[:8]}] {rec.status:<10s} L{rec.level} " f"{rec.role:<12s} {elapsed:>5}s {preview}{result_hint}" ) header = f"{len(records)} agent(s)" + (f" (status={status})" if status else "") + ":" return header + "\n" + "\n".join(lines) async def agent_cancel(agent_id: str) -> str: """Cancel a running background agent.""" from persona import get_user user = get_user() or "unknown" return await agent_manager.cancel_agent(agent_id, user) # ── Declarations ────────────────────────────────────────────────────────────── DECLARATIONS = [ types.FunctionDeclaration( name="spawn_agent", description=( "Spawn a sub-agent to complete a task. " "In synchronous mode (default): blocks until the sub-agent finishes and returns its response. " "In background mode (background=True): fires the agent asynchronously and returns an agent_id " "immediately — use agent_status() to check progress or set notify=True for a completion alert. " "The sub-agent uses the model and tool set assigned to the given role. " "Use for processing pipelines, parallel analysis, or delegating specialized work " "(research, coding, data migration, etc.)." ), parameters=types.Schema( type=types.Type.OBJECT, properties={ "task": types.Schema( type=types.Type.STRING, description="The complete task description for the sub-agent.", ), "role": types.Schema( type=types.Type.STRING, description=( "Role determining the model and tools. " "E.g. 'research' for web lookups, 'coder' for code tasks, " "'distill' for summarization. Defaults to 'chat'." ), ), "tier": types.Schema( type=types.Type.INTEGER, description=( "Context tier: 1 = minimal (fast, identity only), " "2 = standard (+ memory), 3 = + last 2 session logs. " "Use 1 for pure processing tasks." ), ), "timeout": types.Schema( type=types.Type.INTEGER, description="Max seconds to wait (default 120). Applies in both sync and background mode.", ), "max_rounds": types.Schema( type=types.Type.INTEGER, description="Override max tool-loop iterations for this call.", ), "allow_tools": types.Schema( type=types.Type.ARRAY, items=types.Schema(type=types.Type.STRING), description=( "Restrict the sub-agent to only these tools. " "Intersected with the role's tool set — cannot grant more than the role allows. " "Example: ['web_search', 'web_read'] for a pure research agent." ), ), "deny_tools": types.Schema( type=types.Type.ARRAY, items=types.Schema(type=types.Type.STRING), description=( "Block these tools from the sub-agent regardless of role config. " "Example: ['shell_exec', 'file_write', 'cortex_restart']." ), ), "background": types.Schema( type=types.Type.BOOLEAN, description=( "Run asynchronously in the background (default: false). " "When true, returns an agent_id immediately instead of blocking for the result. " "Use agent_status(agent_id) to check progress. " "Best for tasks that take more than ~30 seconds." ), ), "notify": types.Schema( type=types.Type.BOOLEAN, description=( "Send a push/Talk notification when the background agent completes (default: false). " "Only meaningful when background=true." ), ), }, required=["task"], ), ), types.FunctionDeclaration( name="agent_status", description=( "Get the current status of a background agent by ID. " "Returns status (running/done/failed/cancelled/timeout), role, elapsed time, " "task description, and result preview." ), parameters=types.Schema( type=types.Type.OBJECT, properties={ "agent_id": types.Schema( type=types.Type.STRING, description="The agent ID returned by spawn_agent(background=True) or aider_run(background=True).", ), }, required=["agent_id"], ), ), types.FunctionDeclaration( name="agent_list", description=( "List background agents for the current user. " "Returns recent agents with ID, status, role, level, elapsed time, and task preview. " "Use to survey what's running or recently completed." ), parameters=types.Schema( type=types.Type.OBJECT, properties={ "status": types.Schema( type=types.Type.STRING, description="Filter by status: 'running', 'done', 'failed', 'cancelled', 'timeout'. Omit for all.", ), "limit": types.Schema( type=types.Type.INTEGER, description="Max agents to return (default 10, max 50).", ), }, ), ), types.FunctionDeclaration( name="agent_cancel", description=( "Cancel a running background agent. ADMIN ONLY. Requires confirmation. " "Use agent_list() to find the agent ID first." ), parameters=types.Schema( type=types.Type.OBJECT, properties={ "agent_id": types.Schema( type=types.Type.STRING, description="The agent ID to cancel.", ), }, required=["agent_id"], ), ), ]