"""
Agent spawning tool — lets the orchestrator launch sub-agents synchronously.

Sub-agents run using the model assigned to the specified role.
The call blocks until the sub-agent completes or times out.

Supported model types: local_openai, gemini_api.
claude_cli / gemini_cli are chat-only and do not support sub-agent tool loops.
"""
import asyncio
import logging

from google.genai import types

logger = logging.getLogger(__name__)

# Concurrency limits used when no (valid) per-host limit is configured.
_DEFAULT_MAX_CONCURRENT = 3
_GEMINI_API_MAX_CONCURRENT = 5

# Per-host semaphores — keyed by "host:<host_id>" or "type:<model_type>".
# Created lazily on first use; never deleted (module-level singletons).
_semaphores: dict[str, asyncio.Semaphore] = {}
_sem_lock = asyncio.Lock()


async def _get_semaphore(key: str, max_concurrent: int) -> asyncio.Semaphore:
    """Return (or create) the semaphore for a given host/type key.

    ``max_concurrent`` is only applied when the semaphore is first created;
    later calls with the same key get the existing semaphore unchanged.
    """
    # The lock serializes check-and-create so callers share one semaphore per key.
    async with _sem_lock:
        if key not in _semaphores:
            _semaphores[key] = asyncio.Semaphore(max_concurrent)
        return _semaphores[key]


def _valid_limit(value: object, default: int, key: str) -> int:
    """Coerce a configured concurrency limit to an int >= 1, else ``default``.

    A limit of 0 would create a semaphore that can never be acquired (the
    caller would block forever), and ``None`` / non-numeric values make
    ``asyncio.Semaphore`` raise, so anything unusable falls back to the default.
    """
    try:
        limit = int(value)  # type: ignore[call-overload]
    except (TypeError, ValueError):
        limit = 0
    if limit < 1:
        logger.warning(
            "spawn_agent: invalid max_concurrent=%r for %s; using %d",
            value, key, default,
        )
        return default
    return limit


def _resolve_concurrency(
    registry_module, user: str, model_cfg: dict, model_type: str
) -> tuple[str, int]:
    """Work out the semaphore key and concurrency limit for a model config.

    Models with a ``host_id`` share a per-host semaphore whose limit comes from
    the host's ``max_concurrent`` registry entry; all others share a per-type
    semaphore with a built-in limit.
    """
    host_id = model_cfg.get("host_id")
    if not host_id:
        limit = (
            _GEMINI_API_MAX_CONCURRENT
            if model_type == "gemini_api"
            else _DEFAULT_MAX_CONCURRENT
        )
        return f"type:{model_type}", limit

    sem_key = f"host:{host_id}"
    registry = registry_module.get_registry(user) or {}
    hosts = registry.get("hosts") or []
    # .get("id") so a malformed host entry without an id is skipped, not fatal.
    host = next((h for h in hosts if h.get("id") == host_id), None)
    configured = (host or {}).get("max_concurrent", _DEFAULT_MAX_CONCURRENT)
    return sem_key, _valid_limit(configured, _DEFAULT_MAX_CONCURRENT, sem_key)


async def spawn_agent(
    task: str,
    role: str = "chat",
    tier: int = 1,
    timeout: int = 120,
    max_rounds: int | None = None,
) -> str:
    """
    Spawn a sub-agent to complete a task synchronously.

    The sub-agent uses the model and tools assigned to the given role.
    Returns the sub-agent's response as a string.

    Args:
        task: Task description handed to the sub-agent.
        role: Role used to look up the model config and role config.
        tier: Context tier passed to ``load_context``; tiers >= 2 also enable
            the long/mid/short includes.
        timeout: Seconds allowed for the sub-agent run itself.
        max_rounds: If given, overrides ``max_rounds`` in a copy of the model
            config for this call.

    Returns:
        The sub-agent's response, or a human-readable message when no model is
        configured, the model type is unsupported, the sub-agent hits a
        confirmation checkpoint, the run times out, or the run raises.
    """
    import model_registry
    from context_loader import load_context
    from auth_utils import get_user_role, get_tool_policy
    from persona import get_user

    user = get_user() or "scott"
    # Tolerate a missing role config instead of failing on ``None.get``.
    role_cfg = model_registry.get_role_config(user, role) or {}
    model_cfg = model_registry.get_model_for_role(user, role)

    if not model_cfg:
        return f"spawn_agent: no model configured for role '{role}'"

    model_type = model_cfg.get("type", "unknown")
    if model_type not in ("local_openai", "gemini_api"):
        return (
            f"spawn_agent: model type '{model_type}' does not support tool-enabled sub-agents. "
            f"Assign a local_openai or gemini_api model to role '{role}'."
        )

    # Determine concurrency key and semaphore limit
    sem_key, max_concurrent = _resolve_concurrency(
        model_registry, user, model_cfg, model_type
    )
    sem = await _get_semaphore(sem_key, max_concurrent)

    with_memory = tier >= 2
    system_prompt = load_context(
        tier=tier,
        include_long=with_memory,
        include_mid=with_memory,
        include_short=with_memory,
        role_append=role_cfg.get("system_append", ""),
        inject_datetime=role_cfg.get("inject_datetime", True),
    )

    user_role = get_user_role(user)
    tool_list = role_cfg.get("tools")
    policy = get_tool_policy(user)
    confirm_allow = set(policy.get("allow", []))
    confirm_deny = set(policy.get("deny", []))

    if max_rounds is not None:
        # Copy first so the override does not leak into the registry's config.
        model_cfg = dict(model_cfg)
        model_cfg["max_rounds"] = max_rounds

    async def _run() -> str:
        """Run the sub-agent on the backend matching ``model_type``."""
        if model_type == "local_openai":
            import openai_orchestrator

            result = await openai_orchestrator.run(
                task=task,
                system_prompt=system_prompt,
                model_cfg=model_cfg,
                respond_with_final=True,
                user_role=user_role,
                tool_list=tool_list,
                confirm_allow=confirm_allow,
                confirm_deny=confirm_deny,
            )
            if result.checkpoint:
                return (
                    "Sub-agent requires user confirmation — "
                    "confirmation gates are not supported inside spawn_agent. "
                    "Pre-allow the tool in the user's tool policy or use a different role."
                )
            return result.response or "(sub-agent returned no output)"

        # gemini_api
        import orchestrator_engine
        from auth_utils import get_user_gemini_key

        # A key on the model config takes precedence over the user's key.
        gemini_key = model_cfg.get("api_key") or get_user_gemini_key(user)
        result = await orchestrator_engine.run(
            task=task,
            system_prompt=system_prompt,
            session_messages=None,
            respond_with_claude=True,
            gemini_api_key=gemini_key,
            model_name=model_cfg.get("model_name"),
            response_role=role,
            user_role=user_role,
            tool_list=tool_list,
            confirm_allow=confirm_allow,
            confirm_deny=confirm_deny,
        )
        if result.checkpoint:
            return (
                "Sub-agent requires user confirmation — "
                "confirmation gates are not supported inside spawn_agent."
            )
        return result.response or "(sub-agent returned no output)"

    # NOTE: the wait for a free semaphore slot happens outside wait_for, so
    # `timeout` bounds only the sub-agent run, not time spent queued.
    async with sem:
        try:
            logger.info(
                "spawn_agent: role=%s tier=%d timeout=%ds task=%.80s",
                role, tier, timeout, task,
            )
            response = await asyncio.wait_for(_run(), timeout=float(timeout))
            logger.info("spawn_agent: done role=%s response=%d chars", role, len(response))
            return response
        except asyncio.TimeoutError:
            logger.warning("spawn_agent: timed out after %ds role=%s", timeout, role)
            return f"Sub-agent timed out after {timeout}s (role={role})"
        except Exception as e:
            # Tool boundary: report the failure as text instead of raising.
            logger.exception("spawn_agent: failed role=%s", role)
            return f"Sub-agent error ({role}): {e}"


# Function-calling schema describing spawn_agent's arguments; only `task` is required.
DECLARATIONS = [
    types.FunctionDeclaration(
        name="spawn_agent",
        description=(
            "Spawn a sub-agent to complete a task synchronously. "
            "The sub-agent uses the model and tool set assigned to the given role. "
            "Use for processing pipelines, parallel analysis, or delegating "
            "specialized work (research, coding, data migration, etc.)."
        ),
        parameters=types.Schema(
            type=types.Type.OBJECT,
            properties={
                "task": types.Schema(
                    type=types.Type.STRING,
                    description="The complete task description for the sub-agent.",
                ),
                "role": types.Schema(
                    type=types.Type.STRING,
                    description=(
                        "Role determining the model and tools. "
                        "E.g. 'research' for web lookups, 'coder' for code tasks, "
                        "'distill' for summarization. Defaults to 'chat'."
                    ),
                ),
                "tier": types.Schema(
                    type=types.Type.INTEGER,
                    description=(
                        "Context tier: 1 = minimal (fast, identity only), "
                        "2 = standard (+ memory), 3 = + last 2 session logs. "
                        "Use 1 for pure processing tasks."
                    ),
                ),
                "timeout": types.Schema(
                    type=types.Type.INTEGER,
                    description="Max seconds to wait (default 120).",
                ),
                "max_rounds": types.Schema(
                    type=types.Type.INTEGER,
                    description="Override max tool-loop iterations for this call.",
                ),
            },
            required=["task"],
        ),
    )
]