"""
OpenAI-compatible orchestrator engine.

Implements the same ReAct tool loop as orchestrator_engine.py but uses the
OpenAI tool calling format, which works with any OpenAI-compatible endpoint:
OpenRouter, LiteLLM, Open WebUI, Ollama (tool-capable models), etc.

The model both runs the tool loop AND writes the final user-facing response —
no separate handoff step needed when a single capable model handles everything.

Flow:
  1. POST to {api_url}/chat/completions with tools + user message
  2. If finish_reason == "tool_calls": execute tools, feed results back, repeat
  3. If finish_reason == "stop": final assistant message is the user-facing response

Used when the "orchestrator" role in the model registry resolves to a
local_openai type model. The Gemini engine (orchestrator_engine.py) is used
otherwise.
"""
import asyncio
import json
import logging

from openai import AsyncOpenAI

from config import settings
from orchestrator_engine import OrchestrateCheckpoint, OrchestratorResult
from tools import OPENAI_TOOL_SCHEMAS, call_tool, get_openai_tools_for_role, get_tools_for_role, CONFIRM_REQUIRED
import tool_audit

logger = logging.getLogger(__name__)

# Appended to the persona system prompt so the model knows it has tools.
# Kept brief — capable models handle tool use without much coaching.
_TOOL_INSTRUCTION = (
    "\n\nYou have access to tools. Use them when you need current information, "
    "need to read files, or need to take actions on the user's behalf. "
    "Respond naturally after gathering what you need."
)


async def run(
    task: str,
    system_prompt: str = "",
    session_messages: list[dict] | None = None,
    model_cfg: dict | None = None,
    respond_with_final: bool = True,
    user_role: str = "user",
    tool_list: list[str] | None = None,
    confirm_allow: set[str] | None = None,
    confirm_deny: set[str] | None = None,
) -> OrchestratorResult:
    """
    Run a tool-enabled task using an OpenAI-compatible API.

    Args:
        task: The user's request (plain text)
        system_prompt: Persona system prompt from context_loader (passed through)
        session_messages: Recent conversation history for session continuity;
            only the last 6 entries are sent, reduced to role + content
        model_cfg: Resolved model config from model_registry (local_openai type)
        respond_with_final: If False, return just the tool-loop summary without a
            full persona-voiced response (faster; for cron/background).
            In this module the flag is only carried into the checkpoint.
        user_role: Role forwarded to the role-based tool filters in ``tools``
        tool_list: Optional tool-name list forwarded to the same filters
        confirm_allow: Tools to bypass the confirmation gate for this user
        confirm_deny: Tools to always block for this user

    Returns:
        OrchestratorResult — if checkpoint is set, the job is awaiting confirmation

    Raises:
        RuntimeError: if ``model_cfg`` is missing, or lacks api_url / model_name.
    """
    if not model_cfg:
        raise RuntimeError("model_cfg is required for the OpenAI orchestrator")

    _confirm_allow = frozenset(confirm_allow or ())
    _confirm_deny = frozenset(confirm_deny or ())
    # Deny wins over allow: a tool listed in both still requires confirmation.
    effective_confirm = (CONFIRM_REQUIRED - set(_confirm_allow)) | set(_confirm_deny)

    client, model_name, active_tools = _build_client(model_cfg, user_role, tool_list)
    tool_audit.set_context("openai", model_cfg.get("label") or model_name)

    sys_content = (system_prompt or "") + _TOOL_INSTRUCTION
    messages: list[dict] = [{"role": "system", "content": sys_content}]
    if session_messages:
        messages.extend(
            {"role": m["role"], "content": m["content"]}
            for m in session_messages[-6:]
        )
    messages.append({"role": "user", "content": task})

    tool_call_log: list[dict] = []
    final_response, checkpoint = await _run_from_messages(
        client=client,
        messages=messages,
        active_tools=active_tools,
        tool_call_log=tool_call_log,
        effective_confirm=effective_confirm,
        model_name=model_name,
        task=task,
        model_cfg=model_cfg,
        respond_with_final=respond_with_final,
        user_role=user_role,
        tool_list=tool_list,
        confirm_allow=_confirm_allow,
        confirm_deny=_confirm_deny,
        starting_round=0,
    )

    if checkpoint:
        return OrchestratorResult(
            response=final_response,
            tool_calls=list(tool_call_log),
            backend="local",
            gemini_summary=final_response,
            checkpoint=checkpoint,
        )

    model_label = model_cfg.get("label") or model_name
    logger.info("OpenAI orchestrator complete — model=%s tools=%d", model_label, len(tool_call_log))
    return OrchestratorResult(
        response=final_response,
        tool_calls=tool_call_log,
        backend="local",
        gemini_summary=final_response,
    )


async def resume(checkpoint: OrchestrateCheckpoint, confirmed: bool) -> OrchestratorResult:
    """
    Continue an OpenAI orchestrator job that was paused at a confirmation gate.

    Rebuilds the conversation from the checkpoint snapshot, replays the tool
    results that were already produced in the paused round, then either
    executes the pending tools (``confirmed=True``) or answers each of them
    with a denial message, and finally re-enters the tool loop.

    Args:
        checkpoint: State captured by ``_run_from_messages`` when it paused.
        confirmed: True if the user approved the pending tool calls.

    Returns:
        OrchestratorResult — may carry a new checkpoint if a later round hits
        another confirmation gate.
    """
    client, model_name, active_tools = _build_client(
        checkpoint.model_cfg, checkpoint.user_role, checkpoint.tool_list
    )
    model_label = (checkpoint.model_cfg or {}).get("label") or model_name
    # Mirror run(): set the audit context before any tool executes, so tools
    # run on resume are recorded under the same engine/model label.
    tool_audit.set_context("openai", model_label)

    effective_confirm = (CONFIRM_REQUIRED - set(checkpoint.confirm_allow)) | set(checkpoint.confirm_deny)

    messages = list(checkpoint.pre_fn_state)
    # Drop the placeholder entries logged while waiting; real outcomes are
    # appended below.
    tool_call_log = [t for t in checkpoint.tool_call_log if t["result"] != "[awaiting confirmation]"]

    # Build tool responses for this round
    for er in checkpoint.executed_results:
        messages.append({
            "role": "tool",
            "tool_call_id": er.get("tool_call_id", er["name"]),
            "content": er["result"],
        })

    for pt in checkpoint.pending_tools:
        if confirmed:
            result_str = await _execute_tool_dict(
                pt["name"], pt["args"], checkpoint.user_role, checkpoint.tool_list
            )
            logger.info("Confirmed tool %s → %d chars", pt["name"], len(result_str))
        else:
            result_str = "Action denied by user."
            logger.info("Tool %s denied by user", pt["name"])
        tool_call_log.append({"tool": pt["name"], "args": pt["args"], "result": result_str})
        messages.append({
            "role": "tool",
            "tool_call_id": pt.get("tool_call_id", pt["name"]),
            "content": result_str,
        })

    final_response, new_checkpoint = await _run_from_messages(
        client=client,
        messages=messages,
        active_tools=active_tools,
        tool_call_log=tool_call_log,
        effective_confirm=effective_confirm,
        model_name=model_name,
        task=checkpoint.task,
        model_cfg=checkpoint.model_cfg,
        respond_with_final=checkpoint.respond_with_final,
        user_role=checkpoint.user_role,
        tool_list=checkpoint.tool_list,
        confirm_allow=checkpoint.confirm_allow,
        confirm_deny=checkpoint.confirm_deny,
        starting_round=checkpoint.rounds_used,
    )

    if new_checkpoint:
        return OrchestratorResult(
            response=final_response,
            tool_calls=list(tool_call_log),
            backend="local",
            gemini_summary=final_response,
            checkpoint=new_checkpoint,
        )

    logger.info("OpenAI orchestrator resumed — model=%s tools=%d", model_label, len(tool_call_log))
    return OrchestratorResult(
        response=final_response,
        tool_calls=tool_call_log,
        backend="local",
        gemini_summary=final_response,
    )


_CHARS_PER_TOKEN = 4
# Fixed token overhead budget for sending 40 tool schemas per call
_TOOL_SCHEMA_OVERHEAD = 3_000
# Chars to keep per truncated old tool result
_TRUNC_RESULT_CHARS = 400
# Always keep the last N tool-result messages uncompacted
_KEEP_RECENT_TOOL_MSGS = 6  # ~2 rounds of 3 tools each


def _estimate_tokens(messages: list[dict]) -> int:
    """Roughly estimate prompt tokens: JSON length / 4 plus a fixed schema overhead."""
    total = sum(len(json.dumps(m)) for m in messages)
    return total // _CHARS_PER_TOKEN + _TOOL_SCHEMA_OVERHEAD


def _compact_messages(messages: list[dict], budget_tokens: int) -> list[dict]:
    """
    Truncate old tool result content when approaching the context budget.

    Strategy: keep system message, recent assistant/tool rounds, and the
    original user task intact. Truncate content of old tool results in the
    middle of the conversation — the model only needs recent results to reason.

    Returns the input list itself when nothing needs trimming; otherwise a new
    list in which truncated messages are copies (the originals are not mutated).
    """
    if _estimate_tokens(messages) <= budget_tokens:
        return messages

    tool_indices = [i for i, m in enumerate(messages) if m.get("role") == "tool"]
    n_to_compact = max(0, len(tool_indices) - _KEEP_RECENT_TOOL_MSGS)
    if n_to_compact == 0:
        return messages  # nothing old enough to trim

    compact_set = set(tool_indices[:n_to_compact])
    result = []
    chars_saved = 0
    for i, msg in enumerate(messages):
        if i in compact_set:
            content = msg.get("content", "")
            if len(content) > _TRUNC_RESULT_CHARS:
                saved = len(content) - _TRUNC_RESULT_CHARS
                chars_saved += saved
                msg = dict(msg)  # copy so the caller's snapshot keeps full content
                msg["content"] = content[:_TRUNC_RESULT_CHARS] + f" …[{saved} chars omitted]"
        result.append(msg)

    new_est = _estimate_tokens(result)
    logger.info(
        "context compaction: saved ~%d tokens (%d chars), now ~%d / %d tokens",
        chars_saved // _CHARS_PER_TOKEN, chars_saved, new_est, budget_tokens,
    )
    return result


def _context_budget(model_cfg: dict | None) -> int:
    """Return the usable token budget (75% of context window, min 16k, default 32k)."""
    context_k = (model_cfg or {}).get("context_k") or 32
    return max(16_000, int(context_k * 1000 * 0.75))


def _parse_tool_args(name: str, raw_args: str | None) -> dict:
    """
    Decode a tool call's JSON ``arguments`` into a dict.

    Missing/empty arguments yield ``{}``. Anything that is not valid JSON, is
    not a string at all, or does not decode to a JSON object is logged and
    replaced with ``{}`` so the tool loop keeps running instead of crashing.
    """
    try:
        parsed = json.loads(raw_args or "{}")
        if not isinstance(parsed, dict):
            raise ValueError("args must be a JSON object")
    except (TypeError, ValueError) as e:  # JSONDecodeError is a ValueError
        logger.warning("Malformed tool args for %s: %s — args: %.200s", name, e, raw_args)
        return {}
    return parsed


async def _run_from_messages(
    client,
    messages: list[dict],
    active_tools: list,
    tool_call_log: list[dict],
    effective_confirm: set[str],
    model_name: str,
    task: str,
    model_cfg: dict | None,
    respond_with_final: bool,
    user_role: str,
    confirm_allow: frozenset,
    confirm_deny: frozenset,
    starting_round: int = 0,
    tool_list: list[str] | None = None,
) -> tuple[str, OrchestrateCheckpoint | None]:
    """
    Run the OpenAI ReAct loop from the current messages state.

    Each round sends ``messages`` to the model. If the reply carries tool
    calls, tools not in ``effective_confirm`` are executed immediately and
    their results appended; tools in ``effective_confirm`` are held back and
    the loop pauses with a checkpoint. A reply without tool calls ends the loop.

    ``tool_call_log`` is appended to in place so the caller sees every call.

    Returns (final_response, checkpoint) — checkpoint is set if confirmation is needed.
    """
    final_response = ""
    budget = _context_budget(model_cfg)
    per_model_limit = (model_cfg or {}).get("max_rounds") or settings.orchestrator_max_rounds
    # A per-model limit can lower, but never raise, the global round cap.
    effective_limit = min(per_model_limit, settings.orchestrator_max_rounds)

    for round_num in range(starting_round, effective_limit):
        messages = _compact_messages(messages, budget)
        est = _estimate_tokens(messages)
        logger.info(
            "OpenAI orchestrator round %d / %d model=%s ~%d tokens",
            round_num + 1, effective_limit, model_name, est,
        )

        call_kwargs: dict = {"model": model_name, "messages": messages}
        if active_tools:
            call_kwargs["tools"] = active_tools
            call_kwargs["tool_choice"] = "auto"
        response = await client.chat.completions.create(**call_kwargs)

        choice = response.choices[0]
        msg = choice.message

        # Re-serialise the assistant turn as a plain dict for the next request.
        assistant_msg: dict = {"role": "assistant"}
        if msg.content:
            assistant_msg["content"] = msg.content
        if msg.tool_calls:
            assistant_msg["tool_calls"] = [
                {
                    "id": tc.id,
                    "type": "function",
                    "function": {"name": tc.function.name, "arguments": tc.function.arguments},
                }
                for tc in msg.tool_calls
            ]
        messages.append(assistant_msg)

        # Some models set finish_reason="stop" even when tool_calls are present
        if msg.tool_calls and (choice.finish_reason in ("tool_calls", "stop", None)):
            # Snapshot state before tool responses for potential checkpoint
            pre_fn_state = list(messages)
            pending_tools: list[dict] = []
            executed_results: list[dict] = []

            for tc in msg.tool_calls:
                name = tc.function.name
                args_parsed = _parse_tool_args(name, tc.function.arguments)

                if name in effective_confirm:
                    pending_tools.append({"name": name, "args": args_parsed, "tool_call_id": tc.id})
                    logger.info("Tool %s blocked — confirmation required", name)
                else:
                    # Execute with the validated dict rather than re-parsing the
                    # raw string: the raw value may be None or non-object JSON,
                    # and the logged args must match what was actually run.
                    result_str = await _execute_tool_dict(name, args_parsed, user_role, tool_list)
                    logger.info("Tool %s → %d chars", name, len(result_str))
                    executed_results.append({
                        "name": name,
                        "args": args_parsed,
                        "result": result_str,
                        "tool_call_id": tc.id,
                    })
                    tool_call_log.append({"tool": name, "args": args_parsed, "result": result_str})
                    messages.append({"role": "tool", "tool_call_id": tc.id, "content": result_str})

            if pending_tools:
                # Add placeholder responses so every tool_call_id has a reply
                for pt in pending_tools:
                    placeholder = f"[AWAITING USER CONFIRMATION for {pt['name']}]"
                    tool_call_log.append({
                        "tool": pt["name"],
                        "args": pt["args"],
                        "result": "[awaiting confirmation]",
                    })
                    messages.append({
                        "role": "tool",
                        "tool_call_id": pt["tool_call_id"],
                        "content": placeholder,
                    })

                # Ask the model to phrase the confirmation request; tool calls
                # are disabled for this turn. tool_choice is only sent together
                # with tools — strict endpoints reject it otherwise.
                messages = _compact_messages(messages, budget)
                conf_call: dict = {"model": model_name, "messages": messages}
                if active_tools:
                    conf_call["tools"] = active_tools
                    conf_call["tool_choice"] = "none"
                conf_resp = await client.chat.completions.create(**conf_call)
                final_response = conf_resp.choices[0].message.content or (
                    "This action requires your explicit confirmation before it can proceed."
                )

                checkpoint = OrchestrateCheckpoint(
                    engine="openai",
                    pre_fn_state=pre_fn_state,
                    executed_results=executed_results,
                    pending_tools=pending_tools,
                    tool_call_log=list(tool_call_log),
                    task=task,
                    model_cfg=model_cfg,
                    respond_with_final=respond_with_final,
                    user_role=user_role,
                    tool_list=tool_list,
                    confirm_allow=confirm_allow,
                    confirm_deny=confirm_deny,
                    # NOTE(review): +2 rather than +1 — presumably the extra
                    # confirmation-prompt completion is counted as a round; confirm.
                    rounds_used=round_num + 2,
                )
                return final_response, checkpoint
        else:
            final_response = msg.content or ""
            logger.info(
                "OpenAI orchestrator done after %d round(s). Tools used: %d",
                round_num + 1, len(tool_call_log),
            )
            break
    else:
        # Loop exhausted without a final answer (also reached when
        # starting_round is already at or past the limit).
        logger.warning("OpenAI orchestrator hit max rounds (%d)", effective_limit)
        final_response = (
            f"Reached the tool iteration limit ({effective_limit} rounds). "
            "Here is what was gathered:\n\n"
            + "\n\n".join(f"**{t['tool']}**: {t['result'][:500]}" for t in tool_call_log)
        )

    return final_response, None


def _build_client(
    model_cfg: dict | None,
    user_role: str = "user",
    tool_list: list[str] | None = None,
) -> tuple:
    """
    Build AsyncOpenAI client and return (client, model_name, active_tools).

    ``active_tools`` is empty when ``model_cfg["tools"]`` is explicitly False;
    otherwise it is the role-filtered OpenAI tool schema list.

    Raises:
        RuntimeError: if ``model_cfg`` is missing or lacks api_url / model_name.
    """
    if not model_cfg:
        raise RuntimeError("model_cfg is required for the OpenAI orchestrator")

    api_url = model_cfg.get("api_url", "")
    api_key = model_cfg.get("api_key", "") or "none"
    model_name = model_cfg.get("model_name", "")
    host_type = model_cfg.get("host_type", "openwebui")

    if not api_url or not model_name:
        # Identify the config by label; without one, show the config minus
        # api_key so the secret never ends up in an exception message or log.
        ident = model_cfg.get("label") or {k: v for k, v in model_cfg.items() if k != "api_key"}
        raise RuntimeError(f"model_cfg missing api_url or model_name: {ident}")

    base_url = api_url.rstrip("/")
    if host_type == "openwebui":
        base_url = base_url + "/api"

    client = AsyncOpenAI(base_url=base_url, api_key=api_key, timeout=settings.timeout_local)
    if model_cfg.get("tools") is False:
        active_tools = []
    else:
        active_tools = get_openai_tools_for_role(user_role, tool_list)
    return client, model_name, active_tools


async def _execute_tool(
    name: str,
    arguments_json: str,
    user_role: str = "user",
    tool_list: list[str] | None = None,
) -> str:
    """
    Parse tool arguments and execute with role-filtered callables.

    Malformed, missing, or non-object ``arguments_json`` is treated as ``{}``.
    Tool exceptions are returned as a ``"Tool error: ..."`` string.
    """
    args = _parse_tool_args(name, arguments_json)
    return await _execute_tool_dict(name, args, user_role, tool_list)


async def _execute_tool_dict(
    name: str,
    args: dict,
    user_role: str = "user",
    tool_list: list[str] | None = None,
) -> str:
    """
    Execute a tool from a pre-parsed args dict.

    Any exception raised by the tool is logged and returned as a
    ``"Tool error: ..."`` string so the model can see the failure.
    """
    _, callables = get_tools_for_role(user_role, tool_list)
    try:
        return await call_tool(name, args, callables)
    except Exception as e:
        logger.warning("Tool %s failed: %s", name, e)
        return f"Tool error: {e}"