Files
Cortex-Inara/cortex/tools/agents.py
Scott Idem f336ae9687 feat: task_list priority filter, session delete confirm, spawn_agent tool restrictions
- task_list: add priority param ('low'/'normal'/'high') alongside existing status filter
- Session delete: inline confirm row (Delete / Cancel) instead of immediate delete
- spawn_agent: allow_tools and deny_tools per-call params; role config remains ceiling;
  deny_tools falls back to confirm_deny gate when no explicit tool_list is set

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 21:09:50 -04:00

242 lines
9.1 KiB
Python

"""
Agent spawning tool — lets the orchestrator launch sub-agents synchronously.
Sub-agents run using the model assigned to the specified role. The call blocks
until the sub-agent completes or times out.
Supported model types: local_openai, gemini_api.
claude_cli / gemini_cli are chat-only and do not support sub-agent tool loops.
"""
import asyncio
import logging
from google.genai import types
# Module-level logger used by spawn_agent for start/finish/timeout/failure records.
logger = logging.getLogger(__name__)

# Concurrency limiters, keyed by "host:<host_id>" or "type:<model_type>".
# Entries are created lazily on first use by _get_semaphore() and are never
# deleted (module-level singletons).
_semaphores: dict[str, asyncio.Semaphore] = {}
# Held by _get_semaphore() around its check-then-create step.
_sem_lock = asyncio.Lock()
async def _get_semaphore(key: str, max_concurrent: int) -> asyncio.Semaphore:
    """Look up the shared semaphore for ``key``, creating it on first request.

    ``max_concurrent`` is only used when the semaphore is created; later calls
    with the same key get the already-registered semaphore back unchanged.
    """
    async with _sem_lock:
        semaphore = _semaphores.get(key)
        if semaphore is None:
            # First request for this key: register a new limiter.
            semaphore = asyncio.Semaphore(max_concurrent)
            _semaphores[key] = semaphore
        return semaphore
async def spawn_agent(
    task: str,
    role: str = "chat",
    tier: int = 1,
    timeout: int = 120,
    max_rounds: int | None = None,
    allow_tools: list[str] | None = None,
    deny_tools: list[str] | None = None,
) -> str:
    """
    Spawn a sub-agent to complete a task synchronously.

    The sub-agent uses the model and tools assigned to the given role. Only
    "local_openai" and "gemini_api" model types are accepted. Runs that share
    a host (or, when the model has no host_id, a model type) are limited by a
    shared semaphore.

    Args:
        task: Task text handed to the sub-agent.
        role: Role used to look up the model and the role config.
        tier: Context tier passed to load_context; tiers >= 2 also switch on
            its long/mid/short includes.
        timeout: Seconds allowed for the sub-agent run itself. Time spent
            waiting for a concurrency slot is not counted.
        max_rounds: If given, overrides "max_rounds" in a copy of the model
            config for this call only.
        allow_tools: If given, narrows the role's tool list to these names, or
            becomes the tool list when the role defines none. A bare string is
            treated as a single tool name.
        deny_tools: If given, removes these names from the tool list; when no
            tool list is in effect they are added to confirm_deny instead. A
            bare string is treated as a single tool name.

    Returns:
        The sub-agent's response, or a human-readable message when the role
        has no usable model, or when the run needs a confirmation, times out,
        or raises.
    """
    import model_registry
    from context_loader import load_context
    from auth_utils import get_user_role, get_tool_policy
    from persona import get_user

    user = get_user() or "scott"
    role_cfg = model_registry.get_role_config(user, role)
    model_cfg = model_registry.get_model_for_role(user, role)
    if not model_cfg:
        return f"spawn_agent: no model configured for role '{role}'"

    model_type = model_cfg.get("type", "unknown")
    if model_type not in ("local_openai", "gemini_api"):
        return (
            f"spawn_agent: model type '{model_type}' does not support tool-enabled sub-agents. "
            f"Assign a local_openai or gemini_api model to role '{role}'."
        )

    # Determine concurrency key and semaphore limit
    host_id = model_cfg.get("host_id")
    if host_id:
        registry = model_registry.get_registry(user)
        # .get() so a host entry without an "id" is skipped instead of raising.
        host = next(
            (h for h in registry.get("hosts", []) if h.get("id") == host_id),
            None,
        )
        max_concurrent = (host or {}).get("max_concurrent", 3)
        sem_key = f"host:{host_id}"
    else:
        max_concurrent = 5 if model_type == "gemini_api" else 3
        sem_key = f"type:{model_type}"
    sem = await _get_semaphore(sem_key, max_concurrent)

    system_prompt = load_context(
        tier=tier,
        include_long=(tier >= 2),
        include_mid=(tier >= 2),
        include_short=(tier >= 2),
        role_append=role_cfg.get("system_append", ""),
        inject_datetime=role_cfg.get("inject_datetime", True),
    )

    user_role = get_user_role(user)
    tool_list = role_cfg.get("tools")
    policy = get_tool_policy(user)
    confirm_allow = set(policy.get("allow", []))
    confirm_deny = set(policy.get("deny", []))

    # A bare string would otherwise be iterated character by character:
    # set("shell_exec") denies nothing, and `t in "web_search"` is a substring
    # test. Treat it as a single tool name instead.
    if isinstance(allow_tools, str):
        allow_tools = [allow_tools]
    if isinstance(deny_tools, str):
        deny_tools = [deny_tools]

    # Per-call tool restrictions — role config remains the authoritative ceiling
    if allow_tools is not None:
        if tool_list is not None:
            allowed = set(allow_tools)
            tool_list = [t for t in tool_list if t in allowed]
        else:
            tool_list = list(allow_tools)
    if deny_tools is not None:
        deny_set = set(deny_tools)
        if tool_list is not None:
            tool_list = [t for t in tool_list if t not in deny_set]
        else:
            # tool_list is unrestricted — block via confirm_deny so the gate fires
            confirm_deny = confirm_deny | deny_set

    if max_rounds is not None:
        # Copy first so the override does not leak into the registry's dict.
        model_cfg = dict(model_cfg)
        model_cfg["max_rounds"] = max_rounds

    async def _run() -> str:
        """Run the sub-agent on the orchestrator matching model_type."""
        if model_type == "local_openai":
            import openai_orchestrator

            result = await openai_orchestrator.run(
                task=task,
                system_prompt=system_prompt,
                model_cfg=model_cfg,
                respond_with_final=True,
                user_role=user_role,
                tool_list=tool_list,
                confirm_allow=confirm_allow,
                confirm_deny=confirm_deny,
            )
            if result.checkpoint:
                return (
                    "Sub-agent requires user confirmation — "
                    "confirmation gates are not supported inside spawn_agent. "
                    "Pre-allow the tool in the user's tool policy or use a different role."
                )
            return result.response or "(sub-agent returned no output)"

        # gemini_api
        import orchestrator_engine
        from auth_utils import get_user_gemini_key

        gemini_key = model_cfg.get("api_key") or get_user_gemini_key(user)
        result = await orchestrator_engine.run(
            task=task,
            system_prompt=system_prompt,
            session_messages=None,
            respond_with_claude=True,
            gemini_api_key=gemini_key,
            model_name=model_cfg.get("model_name"),
            response_role=role,
            user_role=user_role,
            tool_list=tool_list,
            confirm_allow=confirm_allow,
            confirm_deny=confirm_deny,
            max_rounds=model_cfg.get("max_rounds"),
        )
        if result.checkpoint:
            return (
                "Sub-agent requires user confirmation — "
                "confirmation gates are not supported inside spawn_agent."
            )
        return result.response or "(sub-agent returned no output)"

    # The slot is held for the whole run; only _run() itself is subject to
    # the timeout, not the wait for the semaphore.
    async with sem:
        try:
            logger.info(
                "spawn_agent: role=%s tier=%d timeout=%ds task=%.80s",
                role, tier, timeout, task,
            )
            response = await asyncio.wait_for(_run(), timeout=float(timeout))
            logger.info("spawn_agent: done role=%s response=%d chars", role, len(response))
            return response
        except asyncio.TimeoutError:
            logger.warning("spawn_agent: timed out after %ds role=%s", timeout, role)
            return f"Sub-agent timed out after {timeout}s (role={role})"
        except Exception as e:
            # Tool boundary: report the failure to the caller as text rather
            # than propagating it.
            logger.exception("spawn_agent: failed role=%s", role)
            return f"Sub-agent error ({role}): {e}"
def _spawn_agent_declaration() -> "types.FunctionDeclaration":
    """Assemble the function declaration describing the ``spawn_agent`` tool."""
    Schema, Type = types.Schema, types.Type

    def text(description: str):
        # Single string-valued parameter.
        return Schema(type=Type.STRING, description=description)

    def integer(description: str):
        # Single integer-valued parameter.
        return Schema(type=Type.INTEGER, description=description)

    def text_array(description: str):
        # Array-of-strings parameter (used for the tool-name lists).
        return Schema(
            type=Type.ARRAY,
            items=Schema(type=Type.STRING),
            description=description,
        )

    properties = {
        "task": text("The complete task description for the sub-agent."),
        "role": text(
            "Role determining the model and tools. "
            "E.g. 'research' for web lookups, 'coder' for code tasks, "
            "'distill' for summarization. Defaults to 'chat'."
        ),
        "tier": integer(
            "Context tier: 1 = minimal (fast, identity only), "
            "2 = standard (+ memory), 3 = + last 2 session logs. "
            "Use 1 for pure processing tasks."
        ),
        "timeout": integer("Max seconds to wait (default 120)."),
        "max_rounds": integer("Override max tool-loop iterations for this call."),
        "allow_tools": text_array(
            "Restrict the sub-agent to only these tools. "
            "Intersected with the role's tool set — cannot grant more than the role allows. "
            "Omit to give the sub-agent the role's full tool set. "
            "Example: ['web_search', 'web_read'] for a pure research agent."
        ),
        "deny_tools": text_array(
            "Block these tools from the sub-agent regardless of role config. "
            "Use to prevent destructive operations in sensitive sub-tasks. "
            "Example: ['shell_exec', 'file_write', 'cortex_restart']."
        ),
    }

    return types.FunctionDeclaration(
        name="spawn_agent",
        description=(
            "Spawn a sub-agent to complete a task synchronously. "
            "The sub-agent uses the model and tool set assigned to the given role. "
            "Use for processing pipelines, parallel analysis, or delegating "
            "specialized work (research, coding, data migration, etc.)."
        ),
        parameters=Schema(
            type=Type.OBJECT,
            properties=properties,
            required=["task"],  # every other parameter is optional
        ),
    )


# Tool declarations exported by this module.
DECLARATIONS = [_spawn_agent_declaration()]