Files
Cortex-Inara/cortex/openai_orchestrator.py
Scott Idem d9a322164a feat: OpenAI-compatible orchestrator + backend auto-routing
- openai_orchestrator.py — new ReAct tool loop engine for any
  OpenAI-compatible endpoint (OpenRouter, Open WebUI, Ollama, LiteLLM);
  model handles both tool loop and final response, no Claude handoff needed
- tools/__init__.py — auto-derive OpenAI JSON Schema from existing Gemini
  FunctionDeclarations so tool definitions have a single source of truth
- routers/orchestrator.py — route to openai_orchestrator when model registry
  "orchestrator" role resolves to a local_openai type host
- routers/chat.py — pass role to _backend_label(); fix fallback_used logic
  (only meaningful for explicit backend overrides, not auto-routing)
- static/app.js — add null/"auto" to backend cycle; fetch local model hint
  without overriding the auto default on page load
- model_registry.py — _normalize() back-fills host_type on old registry files
- requirements.txt — add openai>=1.0.0
- ARCH__BACKENDS.md — document OpenAI-compat backend and routing logic

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-08 19:18:18 -04:00

197 lines
7.3 KiB
Python

"""
OpenAI-compatible orchestrator engine.
Implements the same ReAct tool loop as orchestrator_engine.py but uses the
OpenAI tool calling format, which works with any OpenAI-compatible endpoint:
OpenRouter, LiteLLM, Open WebUI, Ollama (tool-capable models), etc.
The model both runs the tool loop AND writes the final user-facing response —
no separate handoff step needed when a single capable model handles everything.
Flow:
1. POST to {api_url}/chat/completions with tools + user message
2. If finish_reason == "tool_calls": execute tools, feed results back, repeat
3. If finish_reason == "stop": final assistant message is the user-facing response
Used when the "orchestrator" role in the model registry resolves to a local_openai
type model. The Gemini engine (orchestrator_engine.py) is used otherwise.
"""
import asyncio
import json
import logging
from openai import AsyncOpenAI
from config import settings
from orchestrator_engine import OrchestratorResult
from tools import OPENAI_TOOL_SCHEMAS, call_tool
# Module-level logger shared by run() and the tool helpers in this module.
logger = logging.getLogger(__name__)

# Appended to the persona system prompt so the model knows it has tools.
# Kept brief — capable models handle tool use without much coaching.
# Starts with "\n\n" because run() concatenates it directly onto the end of
# system_prompt; the blank line keeps it in its own paragraph.
_TOOL_INSTRUCTION = (
    "\n\nYou have access to tools. Use them when you need current information, "
    "need to read files, or need to take actions on the user's behalf. "
    "Respond naturally after gathering what you need."
)
async def run(
    task: str,
    system_prompt: str = "",
    session_messages: list[dict] | None = None,
    model_cfg: dict | None = None,
    respond_with_final: bool = True,
) -> OrchestratorResult:
    """
    Run a tool-enabled task using an OpenAI-compatible API.

    ReAct loop: each round sends the full message history plus the tool
    schemas. If the reply carries tool calls, they are executed concurrently,
    their results are appended as ``tool`` messages, and another round starts.
    The first reply without tool calls is the final user-facing response. If
    ``settings.orchestrator_max_rounds`` rounds pass without one, a digest of
    the gathered tool results (each truncated to 500 chars) is returned instead.

    Args:
        task: The user's request (plain text).
        system_prompt: Persona system prompt from context_loader (passed
            through); a short tool-use instruction is appended to it.
        session_messages: Recent conversation history for session continuity;
            only the last 6 messages are sent.
        model_cfg: Resolved model config from model_registry (local_openai
            type). Reads ``api_url`` and ``model_name`` (required), plus
            ``api_key`` and ``label`` (optional).
        respond_with_final: Accepted for signature compatibility with the
            Gemini engine; currently ignored. In single-model mode the model's
            final message already is the persona-voiced response.

    Returns:
        OrchestratorResult — same shape as the Gemini engine for drop-in
        compatibility. ``gemini_summary`` mirrors ``response``.

    Raises:
        RuntimeError: if model_cfg is missing or lacks api_url / model_name.
        Errors raised by the OpenAI client (network, auth, HTTP status) are
        not caught here and propagate to the caller.
    """
    if not model_cfg:
        raise RuntimeError("model_cfg is required for the OpenAI orchestrator")
    api_url = model_cfg.get("api_url", "")
    # Placeholder so the client is always given a non-empty key, even for
    # local servers that don't check one.
    api_key = model_cfg.get("api_key", "") or "none"
    model_name = model_cfg.get("model_name", "")
    if not api_url or not model_name:
        # Identify the config by label, falling back to its key NAMES only —
        # formatting the whole dict would put api_key into the error text.
        ident = model_cfg.get("label") or list(model_cfg)
        raise RuntimeError(f"model_cfg missing api_url or model_name: {ident}")

    model_label = model_cfg.get("label") or model_name
    max_rounds = settings.orchestrator_max_rounds

    # Build messages: [system (persona + tool hint), ...recent session, task]
    messages: list[dict] = [
        {"role": "system", "content": (system_prompt or "") + _TOOL_INSTRUCTION}
    ]
    if session_messages:
        messages.extend(session_messages[-6:])  # last 6 messages (~3 turns)
    messages.append({"role": "user", "content": task})

    tool_call_log: list[dict] = []
    final_response = ""

    # A fresh client is created per call; the context manager closes it (and
    # its HTTP connection pool) on exit instead of leaking it.
    async with AsyncOpenAI(base_url=api_url, api_key=api_key) as client:
        for round_num in range(max_rounds):
            logger.info("OpenAI orchestrator round %d / %d model=%s",
                        round_num + 1, max_rounds, model_name)
            response = await client.chat.completions.create(
                model=model_name,
                messages=messages,
                tools=OPENAI_TOOL_SCHEMAS,  # must be re-sent on every request
                tool_choice="auto",
            )
            msg = response.choices[0].message
            messages.append(_assistant_message(msg))

            if msg.tool_calls:
                # Key on the presence of tool calls, not on finish_reason: some
                # OpenAI-compatible servers return tool calls with
                # finish_reason "stop". Those calls would otherwise be dropped,
                # leaving an empty final response.
                await _run_tool_calls(msg.tool_calls, messages, tool_call_log)
                continue

            # No tool calls — the model is done.
            final_response = msg.content or ""
            logger.info(
                "OpenAI orchestrator done after %d round(s). Tools used: %d",
                round_num + 1, len(tool_call_log),
            )
            break
        else:
            # Round limit hit without a final answer: return what was gathered.
            logger.warning("OpenAI orchestrator hit max rounds (%d)", max_rounds)
            final_response = (
                f"Reached the tool iteration limit ({max_rounds} rounds). "
                "Here is what was gathered:\n\n"
                + "\n\n".join(
                    f"**{t['tool']}**: {t['result'][:500]}" for t in tool_call_log
                )
            )

    logger.info("OpenAI orchestrator complete — model=%s tools=%d", model_label, len(tool_call_log))
    return OrchestratorResult(
        response=final_response,
        tool_calls=tool_call_log,
        backend="local",
        gemini_summary=final_response,  # reused for UI display; same content in single-model mode
    )


def _assistant_message(msg) -> dict:
    """
    Convert an SDK assistant message into a request-ready history entry.

    tool_calls must be echoed back verbatim: each later ``tool`` message
    refers to one of these calls by ``tool_call_id``, so the history has to
    contain the matching assistant turn for the next request to be valid.
    """
    entry: dict = {"role": "assistant"}
    if msg.content:
        entry["content"] = msg.content
    if msg.tool_calls:
        entry["tool_calls"] = [
            {
                "id": tc.id,
                "type": "function",
                "function": {
                    "name": tc.function.name,
                    "arguments": tc.function.arguments,
                },
            }
            for tc in msg.tool_calls
        ]
    return entry


async def _run_tool_calls(tool_calls, messages: list[dict], tool_call_log: list[dict]) -> None:
    """
    Execute one round of tool calls concurrently and record every result.

    For each call, in the order the model issued them, appends a
    ``{"tool", "args", "result"}`` entry to *tool_call_log* and a ``tool``
    message (linked by ``tool_call_id``) to *messages*.
    """
    results = await asyncio.gather(
        *(_execute_tool(tc.function.name, tc.function.arguments) for tc in tool_calls),
        return_exceptions=True,
    )
    for tc, result in zip(tool_calls, results):
        # BaseException, not Exception: with return_exceptions=True a cancelled
        # tool task comes back as asyncio.CancelledError, which is not an
        # Exception subclass.
        if isinstance(result, BaseException):
            result_str = f"Tool error: {result}"
        else:
            result_str = str(result)
        logger.info("Tool %s → %d chars", tc.function.name, len(result_str))
        try:
            args_parsed = json.loads(tc.function.arguments)
        except (json.JSONDecodeError, TypeError):
            # TypeError covers arguments that are None rather than a string.
            args_parsed = {"raw": tc.function.arguments}
        tool_call_log.append({
            "tool": tc.function.name,
            "args": args_parsed,
            "result": result_str,
        })
        messages.append({
            "role": "tool",
            "tool_call_id": tc.id,
            "content": result_str,
        })
async def _execute_tool(name: str, arguments_json: str | None) -> str:
    """
    Parse a tool call's JSON arguments and run the tool via ``call_tool``.

    Problems on the tool side never raise out of here. Invalid arguments and
    exceptions from ``call_tool`` are both returned as a ``"Tool error: ..."``
    string, so the model can see what went wrong and retry.

    Args:
        name: Tool name as emitted by the model.
        arguments_json: The call's ``function.arguments`` string. ``None`` or
            a blank string means "no arguments" and maps to ``{}``.

    Returns:
        The tool's result, or an error description prefixed with
        ``"Tool error:"``.
    """
    raw = (arguments_json or "").strip()
    if not raw:
        args = {}
    else:
        try:
            args = json.loads(raw)
        except json.JSONDecodeError as e:
            # Report the error rather than running the tool with empty
            # arguments: an action tool invoked with unintended defaults is
            # worse than asking the model to retry.
            logger.warning("Tool %s: invalid JSON arguments: %s", name, e)
            return f"Tool error: arguments are not valid JSON ({e})"
    if not isinstance(args, dict):
        logger.warning("Tool %s: arguments are not a JSON object", name)
        return (
            "Tool error: arguments must be a JSON object, "
            f"got {type(args).__name__}"
        )
    try:
        return await call_tool(name, args)
    except Exception as e:
        logger.warning("Tool %s failed: %s", name, e)
        return f"Tool error: {e}"