Files
Cortex-Inara/cortex/tools/__init__.py
Scott Idem ed472ce9a0 feat: Intelligence Layer Phase 1 — orchestrator service
Adds the Gemini API orchestrator (ReAct tool loop → Claude responder):

Orchestrator engine + router:
- orchestrator_engine.py: Gemini API tool loop, Claude CLI handoff
- routers/orchestrator.py: POST /orchestrate (async job queue), GET /orchestrate/{job_id}

Tools (cortex/tools/):
- web.py: DuckDuckGo web search (no key required)
- ae_knowledge.py: ae_journal_search + ae_journal_entry_create (AE V3 API)
- ae_tasks.py: ae_task_list (reads agents_sync Kanban filesystem)
- files.py: file_read (path-allowlisted to safe dirs)

Config + deps:
- config.py: orchestrator, DuckDuckGo, and AE API settings
- requirements.txt: google-genai, duckduckgo-search
- .env.default: reference config with all new keys documented

Docs:
- CLAUDE.md, README.md, documentation/ added to repo
- Port references updated 7331 → 8000 throughout
- Default model updated to gemini-2.5-flash

Tested: ae_task_list, ae_journal_search, web_search all working end-to-end.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-18 19:37:49 -04:00

194 lines
6.7 KiB
Python

"""
Orchestrator tool registry.
Each tool has two parts:
1. A Gemini FunctionDeclaration — tells the model what the tool does and what args it takes
2. A Python async callable — the actual implementation
To add a new tool:
1. Implement it in a tools/<domain>.py module
2. Import it here and register its callable in _CALLABLES, keyed by the tool's declared name
3. Add a FunctionDeclaration below and include it in TOOL_DECLARATIONS
IMPORTANT: These tools are separate from the ae_* MCP tools used by the fleet agents.
Do not modify the ae_* MCP server to support orchestrator needs.
"""
from collections.abc import Awaitable, Callable

from google.genai import types

from tools.ae_knowledge import journal_entry_create as _ae_journal_entry_create
from tools.ae_knowledge import journal_search as _ae_journal_search
from tools.ae_tasks import task_list as _ae_task_list
from tools.files import file_read as _file_read
from tools.web import search as _web_search
# ---------------------------------------------------------------------------
# Gemini function declarations
# ---------------------------------------------------------------------------
# Schema advertised to the model for the `web_search` tool. Only `query` is
# mandatory; `max_results` is optional. The matching implementation is
# registered under the same name in _CALLABLES.
_web_search_declaration = types.FunctionDeclaration(
    name="web_search",
    description=(
        "Search the web for current information. "
        "Use this when you need up-to-date facts, news, documentation, "
        "or anything not in your training data."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        required=["query"],
        properties={
            "query": types.Schema(
                description="The search query string",
                type=types.Type.STRING,
            ),
            "max_results": types.Schema(
                description="Number of results to return (default 5, max 10)",
                type=types.Type.INTEGER,
            ),
        },
    ),
)
# Schema advertised to the model for the `ae_journal_search` tool. `query`
# is the only required argument; `journal_id` narrows the search to one
# journal and `max_results` caps the number of entries returned.
_ae_journal_search_declaration = types.FunctionDeclaration(
    name="ae_journal_search",
    description=(
        "Search the Aether Journals knowledge base by keyword. Use this to look up "
        "notes, documentation, meeting summaries, or any saved knowledge. Always "
        "search before creating a new entry to avoid duplicates."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        required=["query"],
        properties={
            "query": types.Schema(
                description="Keyword or phrase to search for",
                type=types.Type.STRING,
            ),
            "journal_id": types.Schema(
                description=(
                    "Optional: scope search to a specific journal by its "
                    "id_random. Omit to search all journals."
                ),
                type=types.Type.STRING,
            ),
            "max_results": types.Schema(
                description="Maximum number of entries to return (default 10)",
                type=types.Type.INTEGER,
            ),
        },
    ),
)
# Schema advertised to the model for the `ae_journal_entry_create` tool.
# The target journal, a title and the content are required; `summary` and
# `tags` are optional. Note that `tags` is a single comma-separated string,
# not an array.
_ae_journal_entry_create_declaration = types.FunctionDeclaration(
    name="ae_journal_entry_create",
    description=(
        "Create a new entry in an Aether Journal. Use this to save notes, "
        "summaries, or any content the user wants to store. Always call "
        "ae_journal_search first to check for existing entries on the same topic."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        required=["journal_id", "title", "content"],
        properties={
            "journal_id": types.Schema(
                description=(
                    "The id_random of the target journal. Ask the user which "
                    "journal to write to if not specified."
                ),
                type=types.Type.STRING,
            ),
            "title": types.Schema(
                description="Entry title",
                type=types.Type.STRING,
            ),
            "content": types.Schema(
                description="Full entry content (markdown supported)",
                type=types.Type.STRING,
            ),
            "summary": types.Schema(
                description="Optional short summary (1-2 sentences)",
                type=types.Type.STRING,
            ),
            "tags": types.Schema(
                description="Optional comma-separated tags (e.g. 'wireguard, networking, homelab')",
                type=types.Type.STRING,
            ),
        },
    ),
)
# Schema advertised to the model for the `ae_task_list` tool. It declares no
# required arguments, so the model may call it with no arguments at all;
# `include_done` is an optional flag.
_ae_task_list_declaration = types.FunctionDeclaration(
    name="ae_task_list",
    description=(
        "List tasks from the agents_sync Kanban board (todo and in-progress). Use "
        "this when asked about current work, pending tasks, or project status."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "include_done": types.Schema(
                description="If true, also include completed tasks (default false)",
                type=types.Type.BOOLEAN,
            ),
        },
    ),
)
# Schema advertised to the model for the `file_read` tool. `path` is required
# and `max_lines` is optional. The directory allowlist quoted in the
# description is only text shown to the model here; any enforcement is
# presumably done by the implementation (not visible in this module).
_file_read_declaration = types.FunctionDeclaration(
    name="file_read",
    description=(
        "Read a local file and return its contents. Allowed directories: "
        "~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/. "
        "Use this to read documentation, notes, CLAUDE.md files, or config "
        "references. If given a directory path, returns a directory listing instead."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        required=["path"],
        properties={
            "path": types.Schema(
                description=(
                    "Absolute or home-relative path to the file (e.g. "
                    "~/agents_sync/CLAUDE.md or /home/scott/agents_sync/tasks/01_todo/)"
                ),
                type=types.Type.STRING,
            ),
            "max_lines": types.Schema(
                description="Optional line limit (default 500)",
                type=types.Type.INTEGER,
            ),
        },
    ),
)
# ---------------------------------------------------------------------------
# Registry: maps tool name → async callable
# ---------------------------------------------------------------------------
# Each key is the `name` given in the corresponding FunctionDeclaration;
# call_tool() looks handlers up by that name and awaits the result.
# The value type is spelled with Callable/Awaitable because the builtin
# `callable` is a function, not a type, and is not a valid annotation.
_CALLABLES: dict[str, Callable[..., Awaitable[str]]] = {
    "web_search": _web_search,
    "ae_journal_search": _ae_journal_search,
    "ae_journal_entry_create": _ae_journal_entry_create,
    "ae_task_list": _ae_task_list,
    "file_read": _file_read,
}
# One-element list whose single Gemini Tool bundles every function
# declaration defined in this module — pass this list to GenerateContentConfig.
TOOL_DECLARATIONS = [
    types.Tool(
        function_declarations=[
            _web_search_declaration,
            _ae_journal_search_declaration,
            _ae_journal_entry_create_declaration,
            _ae_task_list_declaration,
            _file_read_declaration,
        ],
    ),
]
async def call_tool(name: str, args: dict) -> str:
    """Dispatch a tool call by name and return the tool's result.

    Args:
        name: Tool name to look up in the ``_CALLABLES`` registry.
        args: Keyword arguments forwarded to the tool. ``None`` or an empty
            mapping is treated as "no arguments", so tools without required
            parameters can be invoked by a call that carries no args.

    Returns:
        The value produced by awaiting the registered tool, or the string
        ``"Unknown tool: <name>"`` when no tool is registered under ``name``.

    Raises:
        Nothing is caught here: any exception raised by the tool itself
        (including ``TypeError`` for unexpected keyword arguments)
        propagates to the caller.
    """
    handler = _CALLABLES.get(name)
    if handler is None:
        return f"Unknown tool: {name}"
    # Unpacking ``**None`` raises TypeError before the tool ever runs, so
    # normalise a missing argument mapping to an empty one.
    kwargs = args or {}
    return await handler(**kwargs)