""" OpenAI-compatible orchestrator engine. Implements the same ReAct tool loop as orchestrator_engine.py but uses the OpenAI tool calling format, which works with any OpenAI-compatible endpoint: OpenRouter, LiteLLM, Open WebUI, Ollama (tool-capable models), etc. The model both runs the tool loop AND writes the final user-facing response — no separate handoff step needed when a single capable model handles everything. Flow: 1. POST to {api_url}/chat/completions with tools + user message 2. If finish_reason == "tool_calls": execute tools, feed results back, repeat 3. If finish_reason == "stop": final assistant message is the user-facing response Used when the "orchestrator" role in the model registry resolves to a local_openai type model. The Gemini engine (orchestrator_engine.py) is used otherwise. """ import asyncio import json import logging from openai import AsyncOpenAI from config import settings from orchestrator_engine import OrchestratorResult from tools import OPENAI_TOOL_SCHEMAS, call_tool logger = logging.getLogger(__name__) # Appended to the persona system prompt so the model knows it has tools. # Kept brief — capable models handle tool use without much coaching. _TOOL_INSTRUCTION = ( "\n\nYou have access to tools. Use them when you need current information, " "need to read files, or need to take actions on the user's behalf. " "Respond naturally after gathering what you need." ) async def run( task: str, system_prompt: str = "", session_messages: list[dict] | None = None, model_cfg: dict | None = None, respond_with_final: bool = True, ) -> OrchestratorResult: """ Run a tool-enabled task using an OpenAI-compatible API. Args: task: The user's request (plain text) system_prompt: Persona system prompt from context_loader (passed through) session_messages: Recent conversation history for session continuity model_cfg: Resolved model config from model_registry (local_openai type) respond_with_final: If False, return just the tool-loop summary without a full persona-voiced response (faster; for cron/background) Returns: OrchestratorResult — same shape as the Gemini engine for drop-in compatibility """ if not model_cfg: raise RuntimeError("model_cfg is required for the OpenAI orchestrator") api_url = model_cfg.get("api_url", "") api_key = model_cfg.get("api_key", "") or "none" model_name = model_cfg.get("model_name", "") if not api_url or not model_name: raise RuntimeError( f"model_cfg missing api_url or model_name: {model_cfg.get('label', model_cfg)}" ) client = AsyncOpenAI(base_url=api_url, api_key=api_key) # System prompt: persona context + brief tool instruction sys_content = (system_prompt or "") + _TOOL_INSTRUCTION # Build messages: [system, ...recent_session, current_task] messages: list[dict] = [{"role": "system", "content": sys_content}] if session_messages: messages.extend(session_messages[-6:]) # last 3 turns for context messages.append({"role": "user", "content": task}) tool_call_log: list[dict] = [] final_response = "" for round_num in range(settings.orchestrator_max_rounds): logger.info("OpenAI orchestrator round %d / %d model=%s", round_num + 1, settings.orchestrator_max_rounds, model_name) response = await client.chat.completions.create( model=model_name, messages=messages, tools=OPENAI_TOOL_SCHEMAS, tool_choice="auto", ) choice = response.choices[0] msg = choice.message # Append the assistant turn (MUST include tool_calls if present so the # next request is valid — OpenAI requires the full history to be consistent) assistant_msg: dict = {"role": "assistant"} if msg.content: assistant_msg["content"] = msg.content if msg.tool_calls: assistant_msg["tool_calls"] = [ { "id": tc.id, "type": "function", "function": { "name": tc.function.name, "arguments": tc.function.arguments, }, } for tc in msg.tool_calls ] messages.append(assistant_msg) if choice.finish_reason == "tool_calls" and msg.tool_calls: # Execute all tool calls in parallel, then feed results back tool_tasks = [ _execute_tool(tc.function.name, tc.function.arguments) for tc in msg.tool_calls ] results = await asyncio.gather(*tool_tasks, return_exceptions=True) for tc, result in zip(msg.tool_calls, results): result_str = ( str(result) if not isinstance(result, Exception) else f"Tool error: {result}" ) logger.info("Tool %s → %d chars", tc.function.name, len(result_str)) try: args_parsed = json.loads(tc.function.arguments) except json.JSONDecodeError: args_parsed = {"raw": tc.function.arguments} tool_call_log.append({ "tool": tc.function.name, "args": args_parsed, "result": result_str, }) # Tool result message — tools array must be re-sent on every request messages.append({ "role": "tool", "tool_call_id": tc.id, "content": result_str, }) else: # finish_reason == "stop" (or no tool_calls) — model is done final_response = msg.content or "" logger.info( "OpenAI orchestrator done after %d round(s). Tools used: %d", round_num + 1, len(tool_call_log), ) break else: # Hit the round limit logger.warning("OpenAI orchestrator hit max rounds (%d)", settings.orchestrator_max_rounds) final_response = ( f"Reached the tool iteration limit ({settings.orchestrator_max_rounds} rounds). " "Here is what was gathered:\n\n" + "\n\n".join( f"**{t['tool']}**: {t['result'][:500]}" for t in tool_call_log ) ) model_label = model_cfg.get("label") or model_name logger.info("OpenAI orchestrator complete — model=%s tools=%d", model_label, len(tool_call_log)) return OrchestratorResult( response=final_response, tool_calls=tool_call_log, backend="local", gemini_summary=final_response, # reused for UI display; same content in single-model mode ) async def _execute_tool(name: str, arguments_json: str) -> str: """Parse tool arguments and execute, returning a string result.""" try: args = json.loads(arguments_json) except json.JSONDecodeError: args = {} try: return await call_tool(name, args) except Exception as e: logger.warning("Tool %s failed: %s", name, e) return f"Tool error: {e}"