Files
Cortex-Inara/cortex/orchestrator_engine.py
Scott Idem 334e7f0dea feat: role-based tool access, confirmation gates, and new orchestrator tools
- auth_utils: get_user_role() reads role from auth.json (admin|user, default user)
- manage_passwords: new `role` command to promote/demote users (admin-only by convention)
- tools/__init__: TOOL_ROLES map, CONFIRM_REQUIRED set, get_tools_for_role(),
  get_openai_tools_for_role() — both orchestrators now filter tools by caller's role
- tools/system: cortex_restart (detached subprocess, 5s delay), cortex_logs (admin-only)
- tools/web: http_fetch — direct URL fetch, distinct from web_search
- tools/files: file_list (directory listing), file_write (restricted paths, admin-only)
- tools/notify: nc_talk_send — proactive outbound via notification.py
- orchestrator_engine + openai_orchestrator: user_role param; CONFIRM_REQUIRED tools
  return a confirmation-request result instead of executing — loop breaks after Claude
  asks user to confirm in a follow-up message
- home/scott/auth.json: role set to admin

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 19:23:53 -04:00

285 lines
11 KiB
Python

"""
Orchestrator engine — two-brain architecture.
Flow:
1. Gemini API runs a ReAct tool loop (reason → act → observe → repeat)
2. When Gemini has gathered enough context, it produces a final summary
3. That enriched context is handed off to Claude for the user-facing response
Why this split:
- Gemini API has native structured tool calling (Gemini CLI subprocess does not)
- Claude produces higher-quality user-facing prose and reasoning
- Claude Pro subscription has no API cost; Gemini free tier handles orchestration load
For direct chat (no tools needed), this engine is not invoked — the chat router
calls llm_client.complete() directly, which is faster and has no orchestration overhead.
"""
import asyncio
import json
import logging
from dataclasses import dataclass, field
from google import genai
from google.genai import types
from config import settings
from llm_client import complete
from tools import TOOL_DECLARATIONS, call_tool, get_tools_for_role, CONFIRM_REQUIRED
logger = logging.getLogger(__name__)
# System prompt for the Gemini side of the two-brain flow. run() passes it as
# `system_instruction` on every generate_content call of the tool loop.
# Gemini's job is information gathering and planning — NOT writing the final
# response; the summary it produces is handed to the user-facing model.
_ORCHESTRATOR_SYSTEM = """You are an intelligent orchestrator. Your job is to:
1. Understand the user's request
2. Call tools to gather the information needed to answer it
3. Once you have enough information, produce a concise summary of:
- What the user asked
- What you found (tool results, key facts)
- Any important context that would help generate a good answer
Do NOT write a polished final answer — a human-facing AI will do that next.
Keep your summary factual and complete. Include relevant URLs, data, and specifics.
If no tools are needed, return an empty summary."""
@dataclass
class OrchestratorResult:
    """Everything one orchestration run hands back to its caller."""

    # Final user-facing response text.
    response: str
    # Log of tool invocations, one dict per call: {tool, args, result}.
    tool_calls: list[dict] = field(default_factory=list)
    # Name of the model that produced `response`.
    backend: str = "claude"
    # The summary Gemini produced for the handoff (kept for debug/display).
    gemini_summary: str = ""
async def _generate(client, model: str, contents: list, tool_declarations):
    """Run one blocking Gemini generate_content call in a worker thread."""
    return await asyncio.to_thread(
        client.models.generate_content,
        model=model,
        contents=contents,
        config=types.GenerateContentConfig(
            tools=tool_declarations,
            system_instruction=_ORCHESTRATOR_SYSTEM,
        ),
    )


def _first_candidate_content(response):
    """Return the first candidate's content, or None if there is no candidate."""
    candidates = response.candidates
    if not candidates:
        return None
    return candidates[0].content


def _collect_text(parts: list) -> str:
    """Join the non-empty ``text`` fields of the given response parts."""
    return "".join(
        p.text for p in parts if hasattr(p, "text") and p.text
    ).strip()


def _confirmation_notice(name: str, args: dict) -> str:
    """Build the tool result returned in place of running a gated tool."""
    args_str = json.dumps(args, indent=2) if args else "(no arguments)"
    return (
        f"⚠️ CONFIRMATION REQUIRED ⚠️\n"
        f"Tool: {name}\nArguments:\n{args_str}\n\n"
        f"Do NOT call this tool again. Tell the user exactly what you were "
        f"about to do, explain the potential impact, and ask them to confirm "
        f"by sending a follow-up message before you proceed."
    )


async def run(
    task: str,
    system_prompt: str = "",
    session_messages: list[dict] | None = None,
    respond_with_claude: bool = True,
    gemini_api_key: str | None = None,
    model_name: str | None = None,
    response_role: str = "chat",
    user_role: str = "user",
) -> OrchestratorResult:
    """
    Run the full orchestration loop for a task.

    Args:
        task: The user's request (plain text)
        system_prompt: Inara's system prompt (from context_loader) — passed to Claude
        session_messages: Prior conversation history for session continuity
        respond_with_claude: If False, return Gemini's summary as the response (useful for
            background/cron tasks where a polished reply isn't needed)
        gemini_api_key: Per-user Gemini API key (falls back to GEMINI_API_KEY in .env)
        model_name: Gemini model for the tool loop (falls back to
            settings.orchestrator_model)
        response_role: Forwarded to llm_client.complete() as ``role`` for the
            final response
        user_role: Caller's role; selects the tool declarations and callables
            via get_tools_for_role()

    Returns:
        OrchestratorResult with response, tool call log, backend used, and Gemini summary

    Raises:
        RuntimeError: If neither a per-user nor a configured Gemini API key is available.
    """
    api_key = gemini_api_key or settings.gemini_api_key
    if not api_key:
        raise RuntimeError(
            "No Gemini API key available — set GEMINI_API_KEY in .env or add a personal key "
            "via: manage_passwords.py gemini-key <username> <key>"
        )
    client = genai.Client(api_key=api_key)
    model = model_name or settings.orchestrator_model

    # Seed Gemini with the task — include recent session context if available
    task_with_context = _build_task_prompt(task, session_messages)
    contents: list[types.Content] = [
        types.Content(role="user", parts=[types.Part(text=task_with_context)])
    ]
    tool_declarations, tool_callables = get_tools_for_role(user_role)
    tool_call_log: list[dict] = []
    gemini_summary = ""

    # --- ReAct tool loop ---
    for round_num in range(settings.orchestrator_max_rounds):
        logger.info("Orchestrator round %d for task: %.80s", round_num + 1, task)
        response = await _generate(client, model, contents, tool_declarations)

        # An empty candidate list or missing content/parts is treated as
        # "no tool calls, no text" so the run degrades instead of crashing.
        content = _first_candidate_content(response)
        if content is None:
            logger.warning("Gemini returned no usable candidate in round %d", round_num + 1)
        parts = (content.parts or []) if content else []

        # Check if Gemini wants to call any tools
        tool_call_parts = [
            p for p in parts
            if hasattr(p, "function_call") and p.function_call and p.function_call.name
        ]
        if not tool_call_parts:
            # No more tool calls — extract Gemini's text summary
            gemini_summary = _collect_text(parts)
            logger.info("Orchestrator done after %d round(s). Tools used: %d",
                        round_num + 1, len(tool_call_log))
            break

        # Add Gemini's response (with function calls) to the conversation
        contents.append(content)

        # Execute tool calls — check confirmation requirement before calling
        response_parts: list[types.Part] = []
        confirm_requested = False
        for fc_part in tool_call_parts:
            fc = fc_part.function_call
            name = fc.name
            # A call without arguments may carry args=None rather than {}.
            args = dict(fc.args or {})
            needs_confirmation = name in CONFIRM_REQUIRED
            if needs_confirmation:
                # Gated tools are never executed here; Gemini receives a notice
                # telling it to ask the user for confirmation instead.
                result_str = _confirmation_notice(name, args)
                confirm_requested = True
                logger.info("Tool %s blocked — confirmation required", name)
            else:
                result_str = await _execute_tool(name, args, tool_callables)
                logger.info("Tool %s returned %d chars", name, len(result_str))
            tool_call_log.append({
                "tool": name,
                "args": args,
                "result": "[awaiting confirmation]" if needs_confirmation else result_str,
            })
            response_parts.append(
                types.Part(
                    function_response=types.FunctionResponse(
                        name=name,
                        response={"result": result_str},
                    )
                )
            )
        contents.append(types.Content(role="user", parts=response_parts))

        if confirm_requested:
            # Allow one more Gemini round to produce the confirmation-request message,
            # then break — tool is not executed until user confirms in a follow-up.
            conf_response = await _generate(client, model, contents, tool_declarations)
            conf_content = _first_candidate_content(conf_response)
            conf_parts = (conf_content.parts or []) if conf_content else []
            gemini_summary = (
                _collect_text(conf_parts)
                or "This action requires your explicit confirmation before it can proceed."
            )
            break
    else:
        # Hit the round limit — use whatever Gemini produced last
        logger.warning("Orchestrator hit max rounds (%d)", settings.orchestrator_max_rounds)
        gemini_summary = (
            f"Reached the tool iteration limit ({settings.orchestrator_max_rounds} rounds). "
            "Here is what was gathered so far:\n\n"
            + "\n\n".join(f"**{t['tool']}**: {t['result'][:500]}" for t in tool_call_log)
        )

    # --- Claude handoff ---
    if respond_with_claude:
        claude_prompt = _build_claude_prompt(task, tool_call_log, gemini_summary)
        # Merge with session history so Claude has conversation context
        messages = list(session_messages or [])
        messages.append({"role": "user", "content": claude_prompt})
        response_text, backend = await complete(
            system_prompt=system_prompt,
            messages=messages,
            role=response_role,
        )
    else:
        # Cron/background tasks: return Gemini's summary directly, no Claude call
        response_text = gemini_summary or "No information gathered."
        backend = "gemini"

    return OrchestratorResult(
        response=response_text,
        tool_calls=tool_call_log,
        backend=backend,
        gemini_summary=gemini_summary,
    )
async def _execute_tool(name: str, args: dict, callables: dict | None = None) -> str:
    """Run one tool via call_tool; on any exception, log it and return an error string."""
    try:
        outcome = await call_tool(name, args, callables)
    except Exception as exc:
        # Tool boundary: a failing tool must not abort the orchestration loop,
        # so the failure is reported back as the tool's textual result.
        logger.warning("Tool %s failed: %s", name, exc)
        outcome = f"Tool error: {exc}"
    return outcome
def _build_task_prompt(task: str, session_messages: list[dict] | None) -> str:
"""Prepend recent session context so Gemini understands the conversation."""
if not session_messages:
return task
# Include last few turns for context (don't send the full history to keep tokens low)
recent = session_messages[-6:] # last 3 turns
history_lines = []
for msg in recent:
label = "User" if msg["role"] == "user" else "Assistant"
history_lines.append(f"{label}: {msg['content'][:300]}") # truncate long messages
context = "\n".join(history_lines)
return f"<recent_conversation>\n{context}\n</recent_conversation>\n\nCurrent request: {task}"
def _build_claude_prompt(
task: str,
tool_calls: list[dict],
gemini_summary: str,
) -> str:
"""Build the enriched context handed from Gemini to Claude."""
parts = [f"User request: {task}\n"]
if tool_calls:
parts.append("## Research gathered\n")
for tc in tool_calls:
parts.append(f"### {tc['tool']}({_format_args(tc['args'])})")
# Truncate very long results — Claude gets the gist
result = tc["result"]
if len(result) > 2000:
result = result[:2000] + "\n… [truncated]"
parts.append(result)
parts.append("")
if gemini_summary:
parts.append("## Summary of findings\n")
parts.append(gemini_summary)
return "\n".join(parts)
def _format_args(args: dict) -> str:
"""Format tool args as a compact string for display."""
return ", ".join(f"{k}={repr(v)}" for k, v in args.items())