Files
Cortex-Inara/cortex/tools/__init__.py
Scott Idem 584ae679a6 feat: tool call audit log
Every orchestrator tool invocation is recorded to home/{user}/tool_audit/YYYY-MM-DD.jsonl.
Each entry captures: timestamp, user, tool, args (truncated), status (ok/error/denied),
result length, and a 300-char result snippet.

- tool_audit.py: JSONL writer with per-file asyncio locks; read_recent / read_recent_all_users helpers
- tools/__init__.py: hook in call_tool() — fire-and-forget record on every dispatch
- routers/audit.py: GET /api/audit/recent and /api/audit/stats (admin-only)
- tools/files.py: add home_root() to file_read allowed roots so agents can read audit JSONL

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-05 19:55:59 -04:00

322 lines
13 KiB
Python

"""
Orchestrator tool registry.
Declarations live in each domain module alongside their callables.
This file assembles them into the unified registry used by both engines.
To add a new tool:
1. Implement it in tools/<domain>.py — add the async callable + append to DECLARATIONS
2. Import the callable here and add it to _CALLABLES
3. If admin-only, add it to TOOL_ROLES; if confirmation needed, add to CONFIRM_REQUIRED
IMPORTANT: These tools are separate from the ae_* MCP tools used by the fleet agents.
Do not modify the ae_* MCP server to support orchestrator needs.
"""
from google.genai import types
# ── Callable imports ──────────────────────────────────────────────────────────
from tools.web import search as _web_search, http_fetch as _http_fetch
from tools.ae_knowledge import (
journal_list as _ae_journal_list,
journal_search as _ae_journal_search,
journal_entry_read as _ae_journal_entry_read,
journal_entries_list as _ae_journal_entries_list,
journal_entry_create as _ae_journal_entry_create,
journal_entry_update as _ae_journal_entry_update,
journal_entry_disable as _ae_journal_entry_disable,
journal_entry_append as _ae_journal_entry_append,
journal_entry_prepend as _ae_journal_entry_prepend,
)
from tools.ae_tasks import task_list as _ae_task_list
from tools.files import file_read as _file_read, file_list as _file_list, file_write as _file_write
from tools.system import (
shell_exec as _shell_exec,
claude_allow_dir as _claude_allow_dir,
cortex_restart as _cortex_restart,
cortex_logs as _cortex_logs,
cortex_status as _cortex_status,
cortex_update as _cortex_update,
)
from tools.tasks import (
task_list as _task_list,
task_create as _task_create,
task_update as _task_update,
task_complete as _task_complete,
)
from tools.cron import (
cron_list as _cron_list,
cron_add as _cron_add,
cron_remove as _cron_remove,
cron_toggle as _cron_toggle,
)
from tools.reminders import (
reminders_add as _reminders_add,
reminders_list as _reminders_list,
reminders_remove as _reminders_remove,
reminders_clear as _reminders_clear,
)
from tools.scratch import (
scratch_read as _scratch_read,
scratch_write as _scratch_write,
scratch_append as _scratch_append,
scratch_clear as _scratch_clear,
)
from tools.notify import nc_talk_send as _nc_talk_send, email_send as _email_send, web_push as _web_push
# ── Declaration imports ───────────────────────────────────────────────────────
import tools.web as _mod_web
import tools.ae_knowledge as _mod_ae_knowledge
import tools.ae_tasks as _mod_ae_tasks
import tools.files as _mod_files
import tools.system as _mod_system
import tools.tasks as _mod_tasks
import tools.cron as _mod_cron
import tools.reminders as _mod_reminders
import tools.scratch as _mod_scratch
import tools.notify as _mod_notify
# ── Tool categories — used by the Model Registry UI for grouped checkboxes ───
TOOL_CATEGORIES: dict[str, list[str]] = {
"Web": ["web_search", "http_fetch"],
"Files": ["file_read", "file_list", "file_write"],
"Shell": ["shell_exec", "claude_allow_dir"],
"System": ["cortex_restart", "cortex_logs", "cortex_status", "cortex_update"],
"Tasks": ["task_list", "task_create", "task_update", "task_complete"],
"Cron": ["cron_list", "cron_add", "cron_remove", "cron_toggle"],
"Reminders": ["reminders_add", "reminders_list", "reminders_remove", "reminders_clear"],
"Scratchpad": ["scratch_read", "scratch_write", "scratch_append", "scratch_clear"],
"Notifications": ["web_push", "email_send", "nc_talk_send"],
"Aether Journals": [
"ae_journal_list", "ae_journal_search",
"ae_journal_entries_list", "ae_journal_entry_read",
"ae_journal_entry_create", "ae_journal_entry_update",
"ae_journal_entry_disable", "ae_journal_entry_append",
"ae_journal_entry_prepend",
],
"Aether Tasks": ["ae_task_list"],
}
# ── Callable registry ─────────────────────────────────────────────────────────
_CALLABLES: dict[str, callable] = {
"web_search": _web_search,
"http_fetch": _http_fetch,
"ae_journal_list": _ae_journal_list,
"ae_journal_search": _ae_journal_search,
"ae_journal_entry_read": _ae_journal_entry_read,
"ae_journal_entries_list": _ae_journal_entries_list,
"ae_journal_entry_create": _ae_journal_entry_create,
"ae_journal_entry_update": _ae_journal_entry_update,
"ae_journal_entry_disable": _ae_journal_entry_disable,
"ae_journal_entry_append": _ae_journal_entry_append,
"ae_journal_entry_prepend": _ae_journal_entry_prepend,
"ae_task_list": _ae_task_list,
"file_read": _file_read,
"file_list": _file_list,
"file_write": _file_write,
"shell_exec": _shell_exec,
"claude_allow_dir": _claude_allow_dir,
"cortex_restart": _cortex_restart,
"cortex_logs": _cortex_logs,
"cortex_status": _cortex_status,
"cortex_update": _cortex_update,
"task_list": _task_list,
"task_create": _task_create,
"task_update": _task_update,
"task_complete": _task_complete,
"cron_list": _cron_list,
"cron_add": _cron_add,
"cron_remove": _cron_remove,
"cron_toggle": _cron_toggle,
"reminders_add": _reminders_add,
"reminders_list": _reminders_list,
"reminders_remove": _reminders_remove,
"reminders_clear": _reminders_clear,
"scratch_read": _scratch_read,
"scratch_write": _scratch_write,
"scratch_append": _scratch_append,
"scratch_clear": _scratch_clear,
"email_send": _email_send,
"nc_talk_send": _nc_talk_send,
"web_push": _web_push,
}
# ── Role-based access control ─────────────────────────────────────────────────
# Minimum role required to use each tool. Unlisted tools default to "user".
TOOL_ROLES: dict[str, str] = {
"shell_exec": "admin",
"claude_allow_dir": "admin",
"cortex_restart": "admin",
"cortex_logs": "admin",
"cortex_status": "admin",
"cortex_update": "admin",
"file_read": "admin",
"file_list": "admin",
"file_write": "admin",
"ae_task_list": "admin",
"email_send": "admin",
"nc_talk_send": "admin",
}
# Tools that require explicit user confirmation before executing.
CONFIRM_REQUIRED: set[str] = {
"cortex_restart",
"cortex_update",
"file_write",
"shell_exec",
"cron_remove",
"reminders_clear",
}
_ROLE_RANK: dict[str, int] = {"user": 0, "admin": 1}
def _role_allowed(tool_name: str, role: str) -> bool:
required = TOOL_ROLES.get(tool_name, "user")
return _ROLE_RANK.get(role, 0) >= _ROLE_RANK.get(required, 0)
# ── Declaration assembly ──────────────────────────────────────────────────────
_ALL_DECLARATIONS: list[types.FunctionDeclaration] = (
_mod_web.DECLARATIONS
+ _mod_files.DECLARATIONS
+ _mod_system.DECLARATIONS
+ _mod_tasks.DECLARATIONS
+ _mod_cron.DECLARATIONS
+ _mod_reminders.DECLARATIONS
+ _mod_scratch.DECLARATIONS
+ _mod_notify.DECLARATIONS
+ _mod_ae_knowledge.DECLARATIONS
+ _mod_ae_tasks.DECLARATIONS
)
# Full Gemini Tool object (all tools — use get_tools_for_role() in production)
TOOL_DECLARATIONS = [types.Tool(function_declarations=_ALL_DECLARATIONS)]
# ── Tool dispatch ─────────────────────────────────────────────────────────────
async def call_tool(name: str, args: dict, callables: dict | None = None) -> str:
"""Dispatch a tool call by name. Returns result as a string.
Pass `callables` (from get_tools_for_role) to enforce role restrictions.
Falls back to the full _CALLABLES dict if omitted.
Every call is recorded to the tool audit log (tool_audit.py).
"""
import asyncio
import tool_audit
from persona import get_user
user = get_user() or "unknown"
dispatch = callables if callables is not None else _CALLABLES
fn = dispatch.get(name)
if fn is None:
asyncio.create_task(tool_audit.record(user, name, args, "denied"))
return f"Tool not available or access denied: {name}"
try:
result = await fn(**args)
asyncio.create_task(tool_audit.record(user, name, args, "ok", result))
return result
except Exception as e:
asyncio.create_task(tool_audit.record(user, name, args, "error", str(e)))
raise
# ── OpenAI JSON Schema conversion ────────────────────────────────────────────
_GEMINI_TYPE_TO_JSON = {
"OBJECT": "object",
"STRING": "string",
"INTEGER": "integer",
"NUMBER": "number",
"BOOLEAN": "boolean",
"ARRAY": "array",
}
def _schema_to_json(schema) -> dict:
"""Recursively convert a Gemini types.Schema to a JSON Schema dict."""
type_name = getattr(getattr(schema, "type", None), "name", "STRING")
result: dict = {"type": _GEMINI_TYPE_TO_JSON.get(type_name, "string")}
if getattr(schema, "description", None):
result["description"] = schema.description
props = getattr(schema, "properties", None) or {}
if result["type"] == "object":
result["properties"] = {k: _schema_to_json(v) for k, v in props.items()}
req = getattr(schema, "required", None)
if req:
result["required"] = list(req)
return result
def _build_openai_tools() -> list[dict]:
out = []
for decl in _ALL_DECLARATIONS:
params = (
_schema_to_json(decl.parameters)
if decl.parameters
else {"type": "object", "properties": {}}
)
out.append({
"type": "function",
"function": {
"name": decl.name,
"description": decl.description or "",
"parameters": params,
},
})
return out
# OpenAI-format tool list — all tools (use get_openai_tools_for_role() in production)
OPENAI_TOOL_SCHEMAS: list[dict] = _build_openai_tools()
# ── Role-filtered tool access ─────────────────────────────────────────────────
def get_tools_for_role(
role: str,
tool_list: list[str] | None = None,
) -> tuple[list, dict]:
"""Return (gemini_tool_declarations, callables_dict) filtered to tools the role can use.
role — user access level ("user" | "admin"); gates admin-only tools
tool_list — optional explicit allow-list from role config (e.g. coder role);
intersected with the access-level filter so it can only restrict,
never elevate privileges
"""
allowed = {name for name in _CALLABLES if _role_allowed(name, role)}
if tool_list is not None:
allowed &= set(tool_list)
decls = [d for d in _ALL_DECLARATIONS if d.name in allowed]
callables = {k: v for k, v in _CALLABLES.items() if k in allowed}
return [types.Tool(function_declarations=decls)], callables
def get_openai_tools_for_role(
role: str,
tool_list: list[str] | None = None,
) -> list[dict]:
"""Return OpenAI tool schemas filtered to tools the role can use.
role — user access level ("user" | "admin")
tool_list — optional explicit allow-list from role config
"""
allowed = {name for name in _CALLABLES if _role_allowed(name, role)}
if tool_list is not None:
allowed &= set(tool_list)
return [t for t in OPENAI_TOOL_SCHEMAS if t["function"]["name"] in allowed]