Files
Cortex-Inara/cortex/tools/__init__.py
Scott Idem fd0fb76c08 feat: add email_send orchestrator tool
Wraps the existing email_utils.send_email helper as an admin-only tool.
Accepts to, subject, body (plain text); newlines converted to <br> for HTML part.
Registered in _CALLABLES, _ALL_DECLARATIONS, and TOOL_ROLES (admin).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 21:35:29 -04:00

969 lines
35 KiB
Python

"""
Orchestrator tool registry.
Each tool has two parts:
1. A Gemini FunctionDeclaration — tells the model what the tool does and what args it takes
2. A Python async callable — the actual implementation
To add a new tool:
1. Implement it in a tools/<domain>.py module
2. Import it here and add (declaration, callable) to _REGISTRY
3. Add a FunctionDeclaration below and include it in TOOL_DECLARATIONS
IMPORTANT: These tools are separate from the ae_* MCP tools used by the fleet agents.
Do not modify the ae_* MCP server to support orchestrator needs.
"""
from google.genai import types
from tools.web import search as _web_search
from tools.ae_knowledge import journal_search as _ae_journal_search
from tools.ae_knowledge import journal_list as _ae_journal_list
from tools.ae_knowledge import journal_entry_create as _ae_journal_entry_create
from tools.ae_knowledge import journal_entry_update as _ae_journal_entry_update
from tools.ae_knowledge import journal_entry_disable as _ae_journal_entry_disable
from tools.ae_knowledge import journal_entry_append as _ae_journal_entry_append
from tools.ae_knowledge import journal_entry_prepend as _ae_journal_entry_prepend
from tools.ae_tasks import task_list as _ae_task_list
from tools.files import file_read as _file_read
from tools.system import claude_allow_dir as _claude_allow_dir, shell_exec as _shell_exec
from tools.tasks import task_list as _task_list, task_create as _task_create
from tools.tasks import task_update as _task_update, task_complete as _task_complete
from tools.cron import (
cron_list as _cron_list,
cron_add as _cron_add,
cron_remove as _cron_remove,
cron_toggle as _cron_toggle,
)
from tools.reminders import (
reminders_add as _reminders_add,
reminders_list as _reminders_list,
reminders_clear as _reminders_clear,
)
from tools.scratch import (
scratch_read as _scratch_read,
scratch_write as _scratch_write,
scratch_append as _scratch_append,
scratch_clear as _scratch_clear,
)
from tools.system import cortex_restart as _cortex_restart, cortex_logs as _cortex_logs
from tools.web import http_fetch as _http_fetch
from tools.files import file_list as _file_list, file_write as _file_write
from tools.notify import nc_talk_send as _nc_talk_send, email_send as _email_send
# ---------------------------------------------------------------------------
# Gemini function declarations
# ---------------------------------------------------------------------------
_web_search_declaration = types.FunctionDeclaration(
name="web_search",
description=(
"Search the web for current information. Use this when you need up-to-date "
"facts, news, documentation, or anything not in your training data."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"query": types.Schema(
type=types.Type.STRING,
description="The search query string",
),
"max_results": types.Schema(
type=types.Type.INTEGER,
description="Number of results to return (default 5, max 10)",
),
},
required=["query"],
),
)
_ae_journal_list_declaration = types.FunctionDeclaration(
name="ae_journal_list",
description=(
"List all Aether Journals available for this account. "
"Returns each journal's name and id_random. "
"Call this first when you need to write a new entry or scope a search to a specific journal "
"and don't already know the journal's id."
),
parameters=types.Schema(type=types.Type.OBJECT, properties={}),
)
_ae_journal_search_declaration = types.FunctionDeclaration(
name="ae_journal_search",
description=(
"Search the Aether Journals knowledge base by keyword. "
"Use this to look up notes, documentation, meeting summaries, or any saved knowledge. "
"Always search before creating a new entry to avoid duplicates."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"query": types.Schema(
type=types.Type.STRING,
description="Keyword or phrase to search for",
),
"journal_id": types.Schema(
type=types.Type.STRING,
description=(
"Optional: scope search to a specific journal by its id_random. "
"Omit to search all journals."
),
),
"max_results": types.Schema(
type=types.Type.INTEGER,
description="Maximum number of entries to return (default 10)",
),
},
required=["query"],
),
)
_ae_journal_entry_create_declaration = types.FunctionDeclaration(
name="ae_journal_entry_create",
description=(
"Create a new entry in an Aether Journal. "
"Use this to save notes, summaries, or any content the user wants to store. "
"Always call ae_journal_search first to check for existing entries on the same topic."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"journal_id": types.Schema(
type=types.Type.STRING,
description=(
"The id_random of the target journal. "
"Ask the user which journal to write to if not specified."
),
),
"title": types.Schema(
type=types.Type.STRING,
description="Entry title",
),
"content": types.Schema(
type=types.Type.STRING,
description="Full entry content (markdown supported)",
),
"summary": types.Schema(
type=types.Type.STRING,
description="Optional short summary (1-2 sentences)",
),
"tags": types.Schema(
type=types.Type.STRING,
description="Optional comma-separated tags (e.g. 'wireguard, networking, homelab')",
),
},
required=["journal_id", "title", "content"],
),
)
_ae_journal_entry_update_declaration = types.FunctionDeclaration(
name="ae_journal_entry_update",
description=(
"Update fields on an existing journal entry. Only the fields you provide are changed — "
"omitted fields are left as-is. Use ae_journal_search to find the entry_id first. "
"To soft-delete, use ae_journal_entry_disable instead."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"entry_id": types.Schema(type=types.Type.STRING, description="Journal entry id_random"),
"title": types.Schema(type=types.Type.STRING, description="New title"),
"content": types.Schema(type=types.Type.STRING, description="Replacement content (full, markdown supported)"),
"summary": types.Schema(type=types.Type.STRING, description="New summary"),
"tags": types.Schema(type=types.Type.STRING, description="Replacement comma-separated tags"),
"enable": types.Schema(type=types.Type.BOOLEAN, description="Set false to hide/disable the entry"),
},
required=["entry_id"],
),
)
_ae_journal_entry_disable_declaration = types.FunctionDeclaration(
name="ae_journal_entry_disable",
description=(
"Soft-delete a journal entry by setting enable=false. "
"The entry is hidden but not permanently removed. "
"Use ae_journal_search to find the entry_id first."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"entry_id": types.Schema(type=types.Type.STRING, description="Journal entry id_random"),
},
required=["entry_id"],
),
)
_ae_journal_entry_append_declaration = types.FunctionDeclaration(
name="ae_journal_entry_append",
description=(
"Append a new section to the bottom of a journal entry's content. "
"Each section gets a UTC timestamp heading unless you provide one. "
"Ideal for timestamped logs, running notes, or data logs."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"entry_id": types.Schema(type=types.Type.STRING, description="Journal entry id_random"),
"content": types.Schema(type=types.Type.STRING, description="The text to append (markdown supported)"),
"heading": types.Schema(type=types.Type.STRING, description="Optional section heading (defaults to current UTC timestamp)"),
},
required=["entry_id", "content"],
),
)
_ae_journal_entry_prepend_declaration = types.FunctionDeclaration(
name="ae_journal_entry_prepend",
description=(
"Prepend a new section to the top of a journal entry's content. "
"Each section gets a UTC timestamp heading unless you provide one. "
"Useful for most-recent-first logs."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"entry_id": types.Schema(type=types.Type.STRING, description="Journal entry id_random"),
"content": types.Schema(type=types.Type.STRING, description="The text to prepend (markdown supported)"),
"heading": types.Schema(type=types.Type.STRING, description="Optional section heading (defaults to current UTC timestamp)"),
},
required=["entry_id", "content"],
),
)
_ae_task_list_declaration = types.FunctionDeclaration(
name="ae_task_list",
description=(
"List tasks from the agents_sync Kanban board (todo and in-progress). "
"Use this when asked about current work, pending tasks, or project status."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"include_done": types.Schema(
type=types.Type.BOOLEAN,
description="If true, also include completed tasks (default false)",
),
},
),
)
_file_read_declaration = types.FunctionDeclaration(
name="file_read",
description=(
"Read a local file and return its contents. "
"Allowed directories: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/. "
"Use this to read documentation, notes, CLAUDE.md files, or config references. "
"If given a directory path, returns a directory listing instead."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"path": types.Schema(
type=types.Type.STRING,
description=(
"Absolute or home-relative path to the file "
"(e.g. ~/agents_sync/CLAUDE.md or /home/scott/agents_sync/tasks/01_todo/)"
),
),
"max_lines": types.Schema(
type=types.Type.INTEGER,
description="Optional line limit (default 500)",
),
},
required=["path"],
),
)
# ---------------------------------------------------------------------------
# Registry: maps tool name → async callable
# ---------------------------------------------------------------------------
_CALLABLES: dict[str, callable] = {
"web_search": _web_search,
"ae_journal_list": _ae_journal_list,
"ae_journal_search": _ae_journal_search,
"ae_journal_entry_create": _ae_journal_entry_create,
"ae_journal_entry_update": _ae_journal_entry_update,
"ae_journal_entry_disable": _ae_journal_entry_disable,
"ae_journal_entry_append": _ae_journal_entry_append,
"ae_journal_entry_prepend": _ae_journal_entry_prepend,
"ae_task_list": _ae_task_list,
"file_read": _file_read,
"file_list": _file_list,
"file_write": _file_write,
"claude_allow_dir": _claude_allow_dir,
"shell_exec": _shell_exec,
"cortex_restart": _cortex_restart,
"cortex_logs": _cortex_logs,
"http_fetch": _http_fetch,
"email_send": _email_send,
"nc_talk_send": _nc_talk_send,
"task_list": _task_list,
"task_create": _task_create,
"task_update": _task_update,
"task_complete": _task_complete,
"cron_list": _cron_list,
"cron_add": _cron_add,
"cron_remove": _cron_remove,
"cron_toggle": _cron_toggle,
"reminders_add": _reminders_add,
"reminders_list": _reminders_list,
"reminders_clear": _reminders_clear,
"scratch_read": _scratch_read,
"scratch_write": _scratch_write,
"scratch_append": _scratch_append,
"scratch_clear": _scratch_clear,
}
_claude_allow_dir_declaration = types.FunctionDeclaration(
name="claude_allow_dir",
description=(
"Add a directory to Claude Code's auto-allow list so Claude can read or write "
"files there without prompting. Edits ~/.claude/settings.json on the local machine. "
"Use this when Claude is silently hanging or being blocked from accessing a directory. "
"Changes take effect in the next Claude Code session."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"path": types.Schema(
type=types.Type.STRING,
description=(
"Absolute or home-relative path to the directory "
"(e.g. ~/OSIT_dev/aether_api_fastapi or /home/scott/agents_sync)"
),
),
"mode": types.Schema(
type=types.Type.STRING,
description="Permission mode: 'r' (read-only), 'w' (write-only), or 'rw' (both). Default: rw",
),
},
required=["path"],
),
)
_shell_exec_declaration = types.FunctionDeclaration(
name="shell_exec",
description=(
"Execute a shell command on the Cortex host machine and return its output. "
"Use for system diagnostics: disk usage (df -h), process status (ps aux), "
"directory listings (ls), memory (free -h), uptime, network info, log tails, etc. "
"Commands run as the Cortex service user. Timeout enforced (default 30s, max 120s). "
"Avoid destructive commands — prefer read-only system queries."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"command": types.Schema(
type=types.Type.STRING,
description="Shell command to run (e.g. 'df -h', 'ls ~/agents_sync/', 'journalctl --user -u cortex -n 50')",
),
"working_dir": types.Schema(
type=types.Type.STRING,
description="Optional working directory (e.g. '~/agents_sync/projects'). Defaults to home directory.",
),
"timeout": types.Schema(
type=types.Type.INTEGER,
description="Timeout in seconds (default 30, max 120)",
),
},
required=["command"],
),
)
_task_list_declaration = types.FunctionDeclaration(
name="task_list",
description=(
"List personal tasks from Inara's task list. "
"Use this to check what's on the list, review pending work, or find a task ID. "
"Optionally filter by status: 'todo', 'in_progress', or 'done'."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"status": types.Schema(
type=types.Type.STRING,
description="Filter by status: 'todo', 'in_progress', or 'done'. Omit to list all.",
),
},
),
)
_task_create_declaration = types.FunctionDeclaration(
name="task_create",
description=(
"Add a new task to Inara's personal task list. "
"Use this when the user asks to remember something, add a to-do, or track a follow-up."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"title": types.Schema(
type=types.Type.STRING,
description="Short task title",
),
"description": types.Schema(
type=types.Type.STRING,
description="Optional longer description or context",
),
"priority": types.Schema(
type=types.Type.STRING,
description="Priority: 'low', 'normal', or 'high'. Default: normal.",
),
},
required=["title"],
),
)
_task_update_declaration = types.FunctionDeclaration(
name="task_update",
description=(
"Update an existing task. Use task_list first to get the task ID. "
"Can update status, title, description, or priority. "
"To just mark complete, use task_complete instead."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"task_id": types.Schema(
type=types.Type.STRING,
description="Task ID (e.g. t_abc123) — get from task_list",
),
"status": types.Schema(
type=types.Type.STRING,
description="New status: 'todo', 'in_progress', or 'done'",
),
"title": types.Schema(
type=types.Type.STRING,
description="Updated title",
),
"description": types.Schema(
type=types.Type.STRING,
description="Updated description",
),
"priority": types.Schema(
type=types.Type.STRING,
description="Updated priority: 'low', 'normal', or 'high'",
),
},
required=["task_id"],
),
)
_task_complete_declaration = types.FunctionDeclaration(
name="task_complete",
description=(
"Mark a task as done. Use task_list first to get the task ID. "
"Shorthand for task_update with status='done'."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"task_id": types.Schema(
type=types.Type.STRING,
description="Task ID (e.g. t_abc123) — get from task_list",
),
},
required=["task_id"],
),
)
_cron_list_declaration = types.FunctionDeclaration(
name="cron_list",
description=(
"List all scheduled cron jobs — their ID, label, schedule, type, and last run time. "
"Use this to see what's scheduled before adding or removing jobs."
),
parameters=types.Schema(type=types.Type.OBJECT, properties={}),
)
_cron_add_declaration = types.FunctionDeclaration(
name="cron_add",
description=(
"Create a new scheduled cron job and register it immediately (no restart needed). "
"Two types: 'remind' writes to the pending reminders queue (Inara sees it automatically "
"in context next session); 'note' appends to the scratchpad. "
"Schedule formats: 'hourly' | 'daily' | 'daily:HH:MM' | 'weekly:DOW' | 'weekly:DOW:HH:MM'. "
"Example: schedule='daily:09:00', type='remind', payload='Check in with Scott.'"
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"label": types.Schema(
type=types.Type.STRING,
description="Short human-readable name for this job (e.g. 'Morning check-in')",
),
"schedule": types.Schema(
type=types.Type.STRING,
description=(
"When to run. Formats: hourly | daily | daily:HH:MM | "
"weekly:DOW | weekly:DOW:HH:MM (e.g. 'weekly:mon:09:00')"
),
),
"job_type": types.Schema(
type=types.Type.STRING,
description="'remind' (→ REMINDERS.md, auto-surfaced in context) or 'note' (→ SCRATCH.md)",
),
"payload": types.Schema(
type=types.Type.STRING,
description="The text to write when the job fires",
),
},
required=["label", "schedule", "job_type", "payload"],
),
)
_cron_remove_declaration = types.FunctionDeclaration(
name="cron_remove",
description=(
"Permanently delete a scheduled cron job. Use cron_list first to get the ID. "
"To temporarily disable without deleting, use cron_toggle instead."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"cron_id": types.Schema(
type=types.Type.STRING,
description="Job ID (e.g. c_abc123) — get from cron_list",
),
},
required=["cron_id"],
),
)
_cron_toggle_declaration = types.FunctionDeclaration(
name="cron_toggle",
description=(
"Pause a running cron job, or resume a paused one. "
"The job stays in the list and can be re-enabled later. "
"Use cron_list to see current enabled/paused state."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"cron_id": types.Schema(
type=types.Type.STRING,
description="Job ID (e.g. c_abc123) — get from cron_list",
),
},
required=["cron_id"],
),
)
_reminders_add_declaration = types.FunctionDeclaration(
name="reminders_add",
description=(
"Add a new reminder to REMINDERS.md. Reminders are automatically surfaced "
"in your context at the start of each session (Tier 2+). "
"Use this when the user asks you to remember something, follow up on something, "
"or surface a note at the next session."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"text": types.Schema(
type=types.Type.STRING,
description="The reminder text to add",
),
"label": types.Schema(
type=types.Type.STRING,
description="Optional heading for this reminder (e.g. 'Follow up on NC Talk'). Defaults to current timestamp.",
),
},
required=["text"],
),
)
_reminders_list_declaration = types.FunctionDeclaration(
name="reminders_list",
description=(
"Read all current pending reminders from REMINDERS.md. "
"Use this to check what reminders are queued before adding duplicates, "
"or to show the user what's pending."
),
parameters=types.Schema(type=types.Type.OBJECT, properties={}),
)
_reminders_clear_declaration = types.FunctionDeclaration(
name="reminders_clear",
description=(
"Erase all pending reminders from REMINDERS.md. "
"Use this after you have acknowledged and acted on the reminders shown in your context."
),
parameters=types.Schema(type=types.Type.OBJECT, properties={}),
)
_scratch_read_declaration = types.FunctionDeclaration(
name="scratch_read",
description=(
"Read the full contents of the scratchpad. "
"Use this to recall working notes, mid-task context, or anything previously jotted down. "
"The scratchpad is transient — nothing here is distilled or archived."
),
parameters=types.Schema(type=types.Type.OBJECT, properties={}),
)
_scratch_write_declaration = types.FunctionDeclaration(
name="scratch_write",
description=(
"Replace the entire scratchpad with new content. "
"Use this to set a clean working note, replacing whatever was there before. "
"For adding without replacing, use scratch_append instead."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"content": types.Schema(
type=types.Type.STRING,
description="The new scratchpad content (markdown supported)",
),
},
required=["content"],
),
)
_scratch_append_declaration = types.FunctionDeclaration(
name="scratch_append",
description=(
"Add a new section to the bottom of the scratchpad without replacing existing content. "
"Each section gets a timestamp heading unless you supply one."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"content": types.Schema(
type=types.Type.STRING,
description="The content to append (markdown supported)",
),
"heading": types.Schema(
type=types.Type.STRING,
description="Optional section heading. Defaults to current UTC timestamp.",
),
},
required=["content"],
),
)
_scratch_clear_declaration = types.FunctionDeclaration(
name="scratch_clear",
description="Erase everything in the scratchpad. Use when the working notes are no longer needed.",
parameters=types.Schema(type=types.Type.OBJECT, properties={}),
)
_cortex_restart_declaration = types.FunctionDeclaration(
name="cortex_restart",
description=(
"Restart the Cortex service via systemd. Schedules a restart 5 seconds from now. "
"The current connection will drop — inform the user to refresh the page. "
"Use after config changes, memory edits, or when the service needs a fresh start. "
"ADMIN ONLY."
),
parameters=types.Schema(type=types.Type.OBJECT, properties={}),
)
_cortex_logs_declaration = types.FunctionDeclaration(
name="cortex_logs",
description=(
"Fetch recent lines from the Cortex systemd service journal. "
"Use for debugging errors, checking startup status, or reviewing recent activity. "
"ADMIN ONLY."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"lines": types.Schema(
type=types.Type.INTEGER,
description="Number of log lines to return (default 50, max 200)",
),
},
),
)
_http_fetch_declaration = types.FunctionDeclaration(
name="http_fetch",
description=(
"Fetch a specific URL and return the response. Unlike web_search, this hits "
"a direct URL — useful for health checks, JSON API endpoints, webhook testing, "
"or reading a specific page when you already know the URL. "
"Response body is capped at 8 KB."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"url": types.Schema(type=types.Type.STRING, description="Full URL to fetch"),
"method": types.Schema(
type=types.Type.STRING,
description="HTTP method: GET (default), POST, HEAD",
),
"body": types.Schema(
type=types.Type.STRING,
description="Optional request body (for POST requests)",
),
"timeout": types.Schema(
type=types.Type.INTEGER,
description="Request timeout in seconds (default 15, max 60)",
),
},
required=["url"],
),
)
_file_list_declaration = types.FunctionDeclaration(
name="file_list",
description=(
"List the files and subdirectories in a directory. "
"Allowed paths: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/. "
"ADMIN ONLY."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"path": types.Schema(
type=types.Type.STRING,
description="Absolute or home-relative path to the directory",
),
},
required=["path"],
),
)
_file_write_declaration = types.FunctionDeclaration(
name="file_write",
description=(
"Write or append content to a file. "
"Write-allowed paths: ~/agents_sync/ and the Cortex home/ directory. "
"Creates parent directories if needed. "
"ADMIN ONLY. Requires user confirmation before executing."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"path": types.Schema(
type=types.Type.STRING,
description="Absolute or home-relative path to write to",
),
"content": types.Schema(
type=types.Type.STRING,
description="Content to write",
),
"mode": types.Schema(
type=types.Type.STRING,
description="'overwrite' (default, replaces file) or 'append' (adds to end)",
),
},
required=["path", "content"],
),
)
_email_send_declaration = types.FunctionDeclaration(
name="email_send",
description=(
"Send an email from the server's configured SMTP account. Use for delivering "
"summaries, reports, reminders, or any content the user wants emailed. "
"body is plain text; newlines are preserved."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"to": types.Schema(type=types.Type.STRING, description="Recipient email address"),
"subject": types.Schema(type=types.Type.STRING, description="Email subject line"),
"body": types.Schema(type=types.Type.STRING, description="Plain-text email body"),
},
required=["to", "subject", "body"],
),
)
_nc_talk_send_declaration = types.FunctionDeclaration(
name="nc_talk_send",
description=(
"Send a proactive message to the user via their configured notification channel "
"(Nextcloud Talk by default). Use this to notify the user of completed background "
"tasks, important events, or anything they should know between sessions. "
"Requires notification_channel and notification_room set in channels.json."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"message": types.Schema(
type=types.Type.STRING,
description="The message to send to the user",
),
},
required=["message"],
),
)
# ---------------------------------------------------------------------------
# Role-based access control
# ---------------------------------------------------------------------------
# Minimum role required to use each tool. Unlisted tools default to "user".
TOOL_ROLES: dict[str, str] = {
# Admin-only — system-level or broad filesystem access
"shell_exec": "admin",
"claude_allow_dir": "admin",
"cortex_restart": "admin",
"cortex_logs": "admin",
"file_read": "admin",
"file_list": "admin",
"file_write": "admin",
"ae_task_list": "admin", # reads agents_sync kanban
"email_send": "admin", # sends from server SMTP account
"nc_talk_send": "admin",
}
# Tools that require explicit user confirmation before executing.
# The orchestrator injects a CONFIRMATION_REQUIRED result instead of calling
# the tool, prompting Claude to ask the user to confirm in a follow-up message.
CONFIRM_REQUIRED: set[str] = {
"cortex_restart",
"file_write",
"shell_exec",
"cron_remove",
"reminders_clear",
}
_ROLE_RANK: dict[str, int] = {"user": 0, "admin": 1}
def _role_allowed(tool_name: str, role: str) -> bool:
required = TOOL_ROLES.get(tool_name, "user")
return _ROLE_RANK.get(role, 0) >= _ROLE_RANK.get(required, 0)
# Flat list of all declarations — single source of truth for both Gemini and OpenAI formats.
_ALL_DECLARATIONS: list[types.FunctionDeclaration] = [
_web_search_declaration,
_ae_journal_list_declaration,
_ae_journal_search_declaration,
_ae_journal_entry_create_declaration,
_ae_journal_entry_update_declaration,
_ae_journal_entry_disable_declaration,
_ae_journal_entry_append_declaration,
_ae_journal_entry_prepend_declaration,
_ae_task_list_declaration,
_file_read_declaration,
_file_list_declaration,
_file_write_declaration,
_claude_allow_dir_declaration,
_shell_exec_declaration,
_cortex_restart_declaration,
_cortex_logs_declaration,
_http_fetch_declaration,
_email_send_declaration,
_nc_talk_send_declaration,
_task_list_declaration,
_task_create_declaration,
_task_update_declaration,
_task_complete_declaration,
_cron_list_declaration,
_cron_add_declaration,
_cron_remove_declaration,
_cron_toggle_declaration,
_reminders_add_declaration,
_reminders_list_declaration,
_reminders_clear_declaration,
_scratch_read_declaration,
_scratch_write_declaration,
_scratch_append_declaration,
_scratch_clear_declaration,
]
# Full Gemini Tool object (all tools — use get_tools_for_role() in production)
TOOL_DECLARATIONS = [types.Tool(function_declarations=_ALL_DECLARATIONS)]
async def call_tool(name: str, args: dict, callables: dict | None = None) -> str:
"""Dispatch a tool call by name. Returns result as a string.
Pass `callables` (from get_tools_for_role) to enforce role restrictions.
Falls back to the full _CALLABLES dict if omitted.
"""
dispatch = callables if callables is not None else _CALLABLES
fn = dispatch.get(name)
if fn is None:
return f"Tool not available or access denied: {name}"
return await fn(**args)
# ---------------------------------------------------------------------------
# OpenAI JSON Schema format — auto-derived from the Gemini declarations above
# so there is a single source of truth for tool definitions.
# ---------------------------------------------------------------------------
_GEMINI_TYPE_TO_JSON = {
"OBJECT": "object",
"STRING": "string",
"INTEGER": "integer",
"NUMBER": "number",
"BOOLEAN": "boolean",
"ARRAY": "array",
}
def _schema_to_json(schema) -> dict:
"""Recursively convert a Gemini types.Schema to a JSON Schema dict."""
type_name = getattr(getattr(schema, "type", None), "name", "STRING")
result: dict = {"type": _GEMINI_TYPE_TO_JSON.get(type_name, "string")}
if getattr(schema, "description", None):
result["description"] = schema.description
props = getattr(schema, "properties", None) or {}
if result["type"] == "object":
result["properties"] = {k: _schema_to_json(v) for k, v in props.items()}
req = getattr(schema, "required", None)
if req:
result["required"] = list(req)
return result
def _build_openai_tools() -> list[dict]:
"""Convert _ALL_DECLARATIONS (Gemini format) to OpenAI tool schemas."""
out = []
for decl in _ALL_DECLARATIONS:
params = (
_schema_to_json(decl.parameters)
if decl.parameters
else {"type": "object", "properties": {}}
)
out.append({
"type": "function",
"function": {
"name": decl.name,
"description": decl.description or "",
"parameters": params,
},
})
return out
# OpenAI-format tool list — all tools (use get_openai_tools_for_role() in production)
OPENAI_TOOL_SCHEMAS: list[dict] = _build_openai_tools()
# ---------------------------------------------------------------------------
# Role-filtered tool access
# ---------------------------------------------------------------------------
def get_tools_for_role(role: str) -> tuple[list, dict]:
"""Return (gemini_tool_declarations, callables_dict) filtered to tools the role can use.
Usage in orchestrator:
tool_declarations, tool_callables = get_tools_for_role(user_role)
"""
allowed = {name for name in _CALLABLES if _role_allowed(name, role)}
decls = [d for d in _ALL_DECLARATIONS if d.name in allowed]
callables = {k: v for k, v in _CALLABLES.items() if k in allowed}
return [types.Tool(function_declarations=decls)], callables
def get_openai_tools_for_role(role: str) -> list[dict]:
"""Return OpenAI tool schemas filtered to tools the role can use."""
allowed = {name for name in _CALLABLES if _role_allowed(name, role)}
return [t for t in OPENAI_TOOL_SCHEMAS if t["function"]["name"] in allowed]