Files
Cortex-Inara/cortex/tools/__init__.py
Scott Idem 1cc7988953 feat: add shell_exec tool and fix orchestrator model name resolution
- Add shell_exec to orchestrator tool suite (system.py + __init__.py)
  Runs arbitrary shell commands on the Cortex host with timeout (1–120s),
  combined stdout/stderr output, optional working_dir, and exit code reporting.
  Enables system diagnostics (df, ls, ps, journalctl, etc.) from Agent mode.

- Fix orchestrator_engine.run() to use model_name from resolved registry entry
  Previously used settings.orchestrator_model (.env hardcode) regardless of
  what model was assigned to the orchestrator role. Now accepts model_name param
  and falls back to settings value only when registry has no model_name.

- Update ARCH__FUTURE.md: date, running host, local orchestrator status,
  model registry V2 progress, added Cortex Mesh concept (section 9)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 20:29:46 -04:00

643 lines
22 KiB
Python

"""
Orchestrator tool registry.
Each tool has two parts:
1. A Gemini FunctionDeclaration — tells the model what the tool does and what args it takes
2. A Python async callable — the actual implementation
To add a new tool:
1. Implement it in a tools/<domain>.py module
2. Import it here and register its callable in _CALLABLES under the tool's name
3. Add a FunctionDeclaration below and include it in TOOL_DECLARATIONS
IMPORTANT: These tools are separate from the ae_* MCP tools used by the fleet agents.
Do not modify the ae_* MCP server to support orchestrator needs.
"""
from google.genai import types
from tools.web import search as _web_search
from tools.ae_knowledge import journal_search as _ae_journal_search
from tools.ae_knowledge import journal_entry_create as _ae_journal_entry_create
from tools.ae_tasks import task_list as _ae_task_list
from tools.files import file_read as _file_read
from tools.system import claude_allow_dir as _claude_allow_dir, shell_exec as _shell_exec
from tools.tasks import task_list as _task_list, task_create as _task_create
from tools.tasks import task_update as _task_update, task_complete as _task_complete
from tools.cron import (
cron_list as _cron_list,
cron_add as _cron_add,
cron_remove as _cron_remove,
cron_toggle as _cron_toggle,
)
from tools.reminders import (
reminders_add as _reminders_add,
reminders_list as _reminders_list,
reminders_clear as _reminders_clear,
)
from tools.scratch import (
scratch_read as _scratch_read,
scratch_write as _scratch_write,
scratch_append as _scratch_append,
scratch_clear as _scratch_clear,
)
# ---------------------------------------------------------------------------
# Gemini function declarations
# ---------------------------------------------------------------------------
# Declaration for the `web_search` tool: a required `query` string plus an
# optional `max_results` integer.
_web_search_declaration = types.FunctionDeclaration(
    name="web_search",
    description=(
        "Search the web for current information. Use this when you need up-to-date "
        "facts, news, documentation, or anything not in your training data."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "query": types.Schema(type=types.Type.STRING, description="The search query string"),
            "max_results": types.Schema(
                type=types.Type.INTEGER,
                description="Number of results to return (default 5, max 10)",
            ),
        },
        required=["query"],
    ),
)
# Declaration for the `ae_journal_search` tool: keyword search with an optional
# journal scope and result cap; only `query` is required.
_ae_journal_search_declaration = types.FunctionDeclaration(
    name="ae_journal_search",
    description=(
        "Search the Aether Journals knowledge base by keyword. "
        "Use this to look up notes, documentation, meeting summaries, or any saved knowledge. "
        "Always search before creating a new entry to avoid duplicates."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "query": types.Schema(type=types.Type.STRING, description="Keyword or phrase to search for"),
            "journal_id": types.Schema(
                type=types.Type.STRING,
                description=(
                    "Optional: scope search to a specific journal by its id_random. "
                    "Omit to search all journals."
                ),
            ),
            "max_results": types.Schema(
                type=types.Type.INTEGER,
                description="Maximum number of entries to return (default 10)",
            ),
        },
        required=["query"],
    ),
)
# Declaration for the `ae_journal_entry_create` tool: `journal_id`, `title` and
# `content` are required; `summary` and `tags` are optional strings.
_ae_journal_entry_create_declaration = types.FunctionDeclaration(
    name="ae_journal_entry_create",
    description=(
        "Create a new entry in an Aether Journal. "
        "Use this to save notes, summaries, or any content the user wants to store. "
        "Always call ae_journal_search first to check for existing entries on the same topic."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "journal_id": types.Schema(
                type=types.Type.STRING,
                description=(
                    "The id_random of the target journal. "
                    "Ask the user which journal to write to if not specified."
                ),
            ),
            "title": types.Schema(type=types.Type.STRING, description="Entry title"),
            "content": types.Schema(
                type=types.Type.STRING,
                description="Full entry content (markdown supported)",
            ),
            "summary": types.Schema(
                type=types.Type.STRING,
                description="Optional short summary (1-2 sentences)",
            ),
            "tags": types.Schema(
                type=types.Type.STRING,
                description="Optional comma-separated tags (e.g. 'wireguard, networking, homelab')",
            ),
        },
        required=["journal_id", "title", "content"],
    ),
)
# Declaration for the `ae_task_list` tool: no required arguments, one optional
# `include_done` boolean.
_ae_task_list_declaration = types.FunctionDeclaration(
    name="ae_task_list",
    description=(
        "List tasks from the agents_sync Kanban board (todo and in-progress). "
        "Use this when asked about current work, pending tasks, or project status."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "include_done": types.Schema(
                type=types.Type.BOOLEAN,
                description="If true, also include completed tasks (default false)",
            ),
        },
    ),
)
# Declaration for the `file_read` tool: a required `path` and an optional
# `max_lines` integer.
_file_read_declaration = types.FunctionDeclaration(
    name="file_read",
    description=(
        "Read a local file and return its contents. "
        "Allowed directories: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/. "
        "Use this to read documentation, notes, CLAUDE.md files, or config references. "
        "If given a directory path, returns a directory listing instead."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "path": types.Schema(
                type=types.Type.STRING,
                description=(
                    "Absolute or home-relative path to the file "
                    "(e.g. ~/agents_sync/CLAUDE.md or /home/scott/agents_sync/tasks/01_todo/)"
                ),
            ),
            "max_lines": types.Schema(type=types.Type.INTEGER, description="Optional line limit (default 500)"),
        },
        required=["path"],
    ),
)
# ---------------------------------------------------------------------------
# Registry: maps tool name → async callable
# ---------------------------------------------------------------------------
# Dispatch table used by call_tool(): tool name -> imported implementation.
# Keys must match the `name` of the corresponding FunctionDeclaration.
# NOTE(review): `callable` in the annotation is the builtin predicate, not a
# type; a typing-level Callable would describe the values more precisely.
_CALLABLES: dict[str, callable] = {
    # Web, Aether knowledge base and agents_sync board
    "web_search": _web_search,
    "ae_journal_search": _ae_journal_search,
    "ae_journal_entry_create": _ae_journal_entry_create,
    "ae_task_list": _ae_task_list,
    # Local files and host system
    "file_read": _file_read,
    "claude_allow_dir": _claude_allow_dir,
    "shell_exec": _shell_exec,
    # Personal task list
    "task_list": _task_list,
    "task_create": _task_create,
    "task_update": _task_update,
    "task_complete": _task_complete,
    # Scheduled jobs
    "cron_list": _cron_list,
    "cron_add": _cron_add,
    "cron_remove": _cron_remove,
    "cron_toggle": _cron_toggle,
    # Reminders
    "reminders_add": _reminders_add,
    "reminders_list": _reminders_list,
    "reminders_clear": _reminders_clear,
    # Scratchpad
    "scratch_read": _scratch_read,
    "scratch_write": _scratch_write,
    "scratch_append": _scratch_append,
    "scratch_clear": _scratch_clear,
}
# Declaration for the `claude_allow_dir` tool: a required directory `path` and
# an optional permission `mode` string.
_claude_allow_dir_declaration = types.FunctionDeclaration(
    name="claude_allow_dir",
    description=(
        "Add a directory to Claude Code's auto-allow list so Claude can read or write "
        "files there without prompting. Edits ~/.claude/settings.json on the local machine. "
        "Use this when Claude is silently hanging or being blocked from accessing a directory. "
        "Changes take effect in the next Claude Code session."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "path": types.Schema(
                type=types.Type.STRING,
                description=(
                    "Absolute or home-relative path to the directory "
                    "(e.g. ~/OSIT_dev/aether_api_fastapi or /home/scott/agents_sync)"
                ),
            ),
            "mode": types.Schema(
                type=types.Type.STRING,
                description="Permission mode: 'r' (read-only), 'w' (write-only), or 'rw' (both). Default: rw",
            ),
        },
        required=["path"],
    ),
)
# Declaration for the `shell_exec` tool: a required `command` string, with
# optional `working_dir` and `timeout` arguments.
_shell_exec_declaration = types.FunctionDeclaration(
    name="shell_exec",
    description=(
        "Execute a shell command on the Cortex host machine and return its output. "
        "Use for system diagnostics: disk usage (df -h), process status (ps aux), "
        "directory listings (ls), memory (free -h), uptime, network info, log tails, etc. "
        "Commands run as the Cortex service user. Timeout enforced (default 30s, max 120s). "
        "Avoid destructive commands — prefer read-only system queries."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "command": types.Schema(
                type=types.Type.STRING,
                description="Shell command to run (e.g. 'df -h', 'ls ~/agents_sync/', 'journalctl --user -u cortex -n 50')",
            ),
            "working_dir": types.Schema(
                type=types.Type.STRING,
                description="Optional working directory (e.g. '~/agents_sync/projects'). Defaults to home directory.",
            ),
            "timeout": types.Schema(
                type=types.Type.INTEGER,
                description="Timeout in seconds (default 30, max 120)",
            ),
        },
        required=["command"],
    ),
)
# Declaration for the `task_list` tool: no required arguments, one optional
# `status` filter.
_task_list_declaration = types.FunctionDeclaration(
    name="task_list",
    description=(
        "List personal tasks from Inara's task list. "
        "Use this to check what's on the list, review pending work, or find a task ID. "
        "Optionally filter by status: 'todo', 'in_progress', or 'done'."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "status": types.Schema(
                type=types.Type.STRING,
                description="Filter by status: 'todo', 'in_progress', or 'done'. Omit to list all.",
            ),
        },
    ),
)
# Declaration for the `task_create` tool: a required `title`, with optional
# `description` and `priority` strings.
_task_create_declaration = types.FunctionDeclaration(
    name="task_create",
    description=(
        "Add a new task to Inara's personal task list. "
        "Use this when the user asks to remember something, add a to-do, or track a follow-up."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "title": types.Schema(type=types.Type.STRING, description="Short task title"),
            "description": types.Schema(
                type=types.Type.STRING,
                description="Optional longer description or context",
            ),
            "priority": types.Schema(
                type=types.Type.STRING,
                description="Priority: 'low', 'normal', or 'high'. Default: normal.",
            ),
        },
        required=["title"],
    ),
)
# Declaration for the `task_update` tool: a required `task_id`; every other
# field (`status`, `title`, `description`, `priority`) is optional.
_task_update_declaration = types.FunctionDeclaration(
    name="task_update",
    description=(
        "Update an existing task. Use task_list first to get the task ID. "
        "Can update status, title, description, or priority. "
        "To just mark complete, use task_complete instead."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "task_id": types.Schema(
                type=types.Type.STRING,
                description="Task ID (e.g. t_abc123) — get from task_list",
            ),
            "status": types.Schema(
                type=types.Type.STRING,
                description="New status: 'todo', 'in_progress', or 'done'",
            ),
            "title": types.Schema(type=types.Type.STRING, description="Updated title"),
            "description": types.Schema(type=types.Type.STRING, description="Updated description"),
            "priority": types.Schema(
                type=types.Type.STRING,
                description="Updated priority: 'low', 'normal', or 'high'",
            ),
        },
        required=["task_id"],
    ),
)
# Declaration for the `task_complete` tool: takes only the required `task_id`.
_task_complete_declaration = types.FunctionDeclaration(
    name="task_complete",
    description=(
        "Mark a task as done. Use task_list first to get the task ID. "
        "Shorthand for task_update with status='done'."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "task_id": types.Schema(
                type=types.Type.STRING,
                description="Task ID (e.g. t_abc123) — get from task_list",
            ),
        },
        required=["task_id"],
    ),
)
# Declaration for the `cron_list` tool: takes no arguments (empty object schema).
_cron_list_declaration = types.FunctionDeclaration(
    name="cron_list",
    description=(
        "List all scheduled cron jobs — their ID, label, schedule, type, and last run time. "
        "Use this to see what's scheduled before adding or removing jobs."
    ),
    parameters=types.Schema(type=types.Type.OBJECT, properties={}),
)
# Declaration for the `cron_add` tool: `label`, `schedule`, `job_type` and
# `payload` are all required strings.
# The worked example in the description must use the declared parameter names
# exactly: the model tends to copy it verbatim, and call_tool() forwards the
# arguments as keyword arguments, so an undeclared name such as `type` would
# not match the `job_type` parameter.
_cron_add_declaration = types.FunctionDeclaration(
    name="cron_add",
    description=(
        "Create a new scheduled cron job and register it immediately (no restart needed). "
        "Two types: 'remind' writes to the pending reminders queue (Inara sees it automatically "
        "in context next session); 'note' appends to the scratchpad. "
        "Schedule formats: 'hourly' | 'daily' | 'daily:HH:MM' | 'weekly:DOW' | 'weekly:DOW:HH:MM'. "
        "Example: schedule='daily:09:00', job_type='remind', payload='Check in with Scott.'"
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "label": types.Schema(
                type=types.Type.STRING,
                description="Short human-readable name for this job (e.g. 'Morning check-in')",
            ),
            "schedule": types.Schema(
                type=types.Type.STRING,
                description=(
                    "When to run. Formats: hourly | daily | daily:HH:MM | "
                    "weekly:DOW | weekly:DOW:HH:MM (e.g. 'weekly:mon:09:00')"
                ),
            ),
            "job_type": types.Schema(
                type=types.Type.STRING,
                description="'remind' (→ REMINDERS.md, auto-surfaced in context) or 'note' (→ SCRATCH.md)",
            ),
            "payload": types.Schema(
                type=types.Type.STRING,
                description="The text to write when the job fires",
            ),
        },
        required=["label", "schedule", "job_type", "payload"],
    ),
)
# Declaration for the `cron_remove` tool: takes only the required `cron_id`.
_cron_remove_declaration = types.FunctionDeclaration(
    name="cron_remove",
    description=(
        "Permanently delete a scheduled cron job. Use cron_list first to get the ID. "
        "To temporarily disable without deleting, use cron_toggle instead."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "cron_id": types.Schema(
                type=types.Type.STRING,
                description="Job ID (e.g. c_abc123) — get from cron_list",
            ),
        },
        required=["cron_id"],
    ),
)
# Declaration for the `cron_toggle` tool: takes only the required `cron_id`.
_cron_toggle_declaration = types.FunctionDeclaration(
    name="cron_toggle",
    description=(
        "Pause a running cron job, or resume a paused one. "
        "The job stays in the list and can be re-enabled later. "
        "Use cron_list to see current enabled/paused state."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "cron_id": types.Schema(
                type=types.Type.STRING,
                description="Job ID (e.g. c_abc123) — get from cron_list",
            ),
        },
        required=["cron_id"],
    ),
)
# Declaration for the `reminders_add` tool: a required `text` and an optional
# `label` heading.
_reminders_add_declaration = types.FunctionDeclaration(
    name="reminders_add",
    description=(
        "Add a new reminder to REMINDERS.md. Reminders are automatically surfaced "
        "in your context at the start of each session (Tier 2+). "
        "Use this when the user asks you to remember something, follow up on something, "
        "or surface a note at the next session."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "text": types.Schema(type=types.Type.STRING, description="The reminder text to add"),
            "label": types.Schema(
                type=types.Type.STRING,
                description="Optional heading for this reminder (e.g. 'Follow up on NC Talk'). Defaults to current timestamp.",
            ),
        },
        required=["text"],
    ),
)
# Declaration for the `reminders_list` tool: takes no arguments (empty object schema).
_reminders_list_declaration = types.FunctionDeclaration(
    name="reminders_list",
    description=(
        "Read all current pending reminders from REMINDERS.md. "
        "Use this to check what reminders are queued before adding duplicates, "
        "or to show the user what's pending."
    ),
    parameters=types.Schema(type=types.Type.OBJECT, properties={}),
)
# Declaration for the `reminders_clear` tool: takes no arguments (empty object schema).
_reminders_clear_declaration = types.FunctionDeclaration(
    name="reminders_clear",
    description=(
        "Erase all pending reminders from REMINDERS.md. "
        "Use this after you have acknowledged and acted on the reminders shown in your context."
    ),
    parameters=types.Schema(type=types.Type.OBJECT, properties={}),
)
# Declaration for the `scratch_read` tool: takes no arguments (empty object schema).
_scratch_read_declaration = types.FunctionDeclaration(
    name="scratch_read",
    description=(
        "Read the full contents of the scratchpad. "
        "Use this to recall working notes, mid-task context, or anything previously jotted down. "
        "The scratchpad is transient — nothing here is distilled or archived."
    ),
    parameters=types.Schema(type=types.Type.OBJECT, properties={}),
)
# Declaration for the `scratch_write` tool: takes only the required `content`.
_scratch_write_declaration = types.FunctionDeclaration(
    name="scratch_write",
    description=(
        "Replace the entire scratchpad with new content. "
        "Use this to set a clean working note, replacing whatever was there before. "
        "For adding without replacing, use scratch_append instead."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "content": types.Schema(
                type=types.Type.STRING,
                description="The new scratchpad content (markdown supported)",
            ),
        },
        required=["content"],
    ),
)
# Declaration for the `scratch_append` tool: a required `content` and an
# optional section `heading`.
_scratch_append_declaration = types.FunctionDeclaration(
    name="scratch_append",
    description=(
        "Add a new section to the bottom of the scratchpad without replacing existing content. "
        "Each section gets a timestamp heading unless you supply one."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "content": types.Schema(
                type=types.Type.STRING,
                description="The content to append (markdown supported)",
            ),
            "heading": types.Schema(
                type=types.Type.STRING,
                description="Optional section heading. Defaults to current UTC timestamp.",
            ),
        },
        required=["content"],
    ),
)
# Declaration for the `scratch_clear` tool: takes no arguments (empty object schema).
_scratch_clear_declaration = types.FunctionDeclaration(
    name="scratch_clear",
    description="Erase everything in the scratchpad. Use when the working notes are no longer needed.",
    parameters=types.Schema(type=types.Type.OBJECT, properties={}),
)
# Gemini Tool object — pass this to GenerateContentConfig.
# A one-element list: a single types.Tool bundling every function declaration
# defined in this module, in the order they are advertised to the model.
TOOL_DECLARATIONS = [
    types.Tool(
        function_declarations=[
            _web_search_declaration,
            _ae_journal_search_declaration,
            _ae_journal_entry_create_declaration,
            _ae_task_list_declaration,
            _file_read_declaration,
            _claude_allow_dir_declaration,
            _shell_exec_declaration,
            _task_list_declaration,
            _task_create_declaration,
            _task_update_declaration,
            _task_complete_declaration,
            _cron_list_declaration,
            _cron_add_declaration,
            _cron_remove_declaration,
            _cron_toggle_declaration,
            _reminders_add_declaration,
            _reminders_list_declaration,
            _reminders_clear_declaration,
            _scratch_read_declaration,
            _scratch_write_declaration,
            _scratch_append_declaration,
            _scratch_clear_declaration,
        ],
    ),
]
async def call_tool(name: str, args: dict) -> str:
    """Dispatch a tool call by name and return the tool's result.

    Args:
        name: Tool name; looked up in ``_CALLABLES``.
        args: Keyword arguments forwarded to the tool as ``fn(**args)``.
            ``None`` (or an empty mapping) means "no arguments".

    Returns:
        Whatever the awaited tool returns, or ``"Unknown tool: <name>"`` when
        no tool is registered under ``name``.

    Raises:
        Any exception raised by the tool itself, including ``TypeError`` when
        ``args`` contains a keyword the tool does not accept.
    """
    fn = _CALLABLES.get(name)
    if fn is None:
        return f"Unknown tool: {name}"
    # A function call for a zero-argument tool may arrive with args=None;
    # ``**None`` would raise TypeError before the tool is even invoked.
    return await fn(**(args or {}))
# ---------------------------------------------------------------------------
# OpenAI JSON Schema format — auto-derived from the Gemini declarations above
# so there is a single source of truth for tool definitions.
# ---------------------------------------------------------------------------
# Gemini type enum member name -> JSON Schema type keyword.
_GEMINI_TYPE_TO_JSON = {
    "OBJECT": "object",
    "STRING": "string",
    "INTEGER": "integer",
    "NUMBER": "number",
    "BOOLEAN": "boolean",
    "ARRAY": "array",
}


def _schema_to_json(schema) -> dict:
    """Recursively convert a Gemini types.Schema to a JSON Schema dict.

    The schema is read purely through ``getattr``, so any object exposing the
    same attribute names works; missing attributes are treated as unset.

    Args:
        schema: Object with optional ``type`` (whose ``.name`` is the Gemini
            type name), ``description``, ``enum``, ``properties``,
            ``required`` and ``items`` attributes.

    Returns:
        A dict with ``type`` always present (an unset or unrecognised type
        maps to ``"string"``), plus ``description`` and ``enum`` when set.
        Object schemas always carry ``properties`` (possibly empty) and carry
        ``required`` when non-empty; array schemas carry ``items`` when the
        element schema is set.
    """
    type_name = getattr(getattr(schema, "type", None), "name", "STRING")
    result: dict = {"type": _GEMINI_TYPE_TO_JSON.get(type_name, "string")}

    description = getattr(schema, "description", None)
    if description:
        result["description"] = description

    # Carry allowed-value constraints across instead of silently dropping them.
    enum_values = getattr(schema, "enum", None)
    if enum_values:
        result["enum"] = list(enum_values)

    if result["type"] == "object":
        props = getattr(schema, "properties", None) or {}
        result["properties"] = {k: _schema_to_json(v) for k, v in props.items()}
        req = getattr(schema, "required", None)
        if req:
            result["required"] = list(req)
    elif result["type"] == "array":
        # An array schema without its element schema is incomplete, so recurse
        # into ``items`` whenever it is set.
        items = getattr(schema, "items", None)
        if items is not None:
            result["items"] = _schema_to_json(items)
    return result
def _build_openai_tools(tools=None) -> list[dict]:
    """Convert Gemini tool declarations to OpenAI tool schemas.

    Args:
        tools: Iterable of Gemini ``Tool``-like objects, each exposing
            ``function_declarations``. Defaults to the module-level
            ``TOOL_DECLARATIONS``.

    Returns:
        One ``{"type": "function", "function": {...}}`` dict per function
        declaration, in declaration order across all tools. A declaration
        without parameters gets an empty object schema, and a missing
        description becomes ``""``.
    """
    if tools is None:
        tools = TOOL_DECLARATIONS
    out = []
    # Walk every Tool rather than only the first, so declarations placed in
    # additional Tool entries are not silently left out.
    for tool in tools:
        # Tolerate a Tool whose function_declarations is unset.
        for decl in getattr(tool, "function_declarations", None) or []:
            params = (
                _schema_to_json(decl.parameters)
                if decl.parameters
                else {"type": "object", "properties": {}}
            )
            out.append({
                "type": "function",
                "function": {
                    "name": decl.name,
                    "description": decl.description or "",
                    "parameters": params,
                },
            })
    return out
# OpenAI-format tool list — pass to client.chat.completions.create(tools=...)
# Built once, when this module is imported, by converting the Gemini
# declarations; it is a plain list snapshot and is not rebuilt afterwards.
OPENAI_TOOL_SCHEMAS: list[dict] = _build_openai_tools()