- inject_mode: per-role toggle (parallel to inject_datetime) gates the "Current mode: Off The Record" line in the system prompt; wired through model_registry, context_loader, chat router, orchestrator router, and local_llm settings UI - OTR orchestrator fix: OrchestrateRequest now carries off_record; _finalize_job stores it per message and gates log_turn on it; JS orchestrate payload sends off_record correctly - Per-message hover metadata: removed always-visible .model-tag; replaced with .msg-meta strip in the action bar (hover-only); shows model label, host, fallback indicator, and OTR badge; stored in session JSON - Send/stop button tooltip: shows role + model and (when tools on) separate orchestrator model + engine label; live elapsed timer on stop button via startRunTimer/stopRunTimer - OrchestratorResult.backend_label: new field; openai_orchestrator fills it; finalize_job propagates it to job dict and session messages - GET /backend: exposes orchestrator_model label so the frontend tooltip can show both models separately - TODO: session delete confirmation added Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
473 lines
18 KiB
Python
473 lines
18 KiB
Python
"""
|
|
OpenAI-compatible orchestrator engine.
|
|
|
|
Implements the same ReAct tool loop as orchestrator_engine.py but uses the
|
|
OpenAI tool calling format, which works with any OpenAI-compatible endpoint:
|
|
OpenRouter, LiteLLM, Open WebUI, Ollama (tool-capable models), etc.
|
|
|
|
The model both runs the tool loop AND writes the final user-facing response —
|
|
no separate handoff step needed when a single capable model handles everything.
|
|
|
|
Flow:
|
|
1. POST to {api_url}/chat/completions with tools + user message
|
|
2. If finish_reason == "tool_calls": execute tools, feed results back, repeat
|
|
3. If finish_reason == "stop": final assistant message is the user-facing response
|
|
|
|
Used when the "orchestrator" role in the model registry resolves to a local_openai
|
|
type model. The Gemini engine (orchestrator_engine.py) is used otherwise.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
|
|
from openai import AsyncOpenAI, APIConnectionError, APIStatusError
|
|
|
|
from config import settings
|
|
from orchestrator_engine import OrchestrateCheckpoint, OrchestratorResult
|
|
from tools import OPENAI_TOOL_SCHEMAS, call_tool, get_openai_tools_for_role, get_tools_for_role, CONFIRM_REQUIRED
|
|
import tool_audit
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Appended to the persona system prompt so the model knows it has tools.
# Kept brief — capable models handle tool use without much coaching.
# The leading blank line ("\n\n") keeps it visually separate from whatever
# prompt text it is concatenated onto.
_TOOL_INSTRUCTION = (
    "\n\nYou have access to tools. Use them when you need current information, "
    "need to read files, or need to take actions on the user's behalf. "
    "Respond naturally after gathering what you need."
)
|
|
|
|
|
|
async def run(
    task: str,
    system_prompt: str = "",
    session_messages: list[dict] | None = None,
    model_cfg: dict | None = None,
    respond_with_final: bool = True,
    user_role: str = "user",
    tool_list: list[str] | None = None,
    confirm_allow: set[str] | None = None,
    confirm_deny: set[str] | None = None,
) -> OrchestratorResult:
    """
    Run a tool-enabled task using an OpenAI-compatible API.

    Args:
        task: The user's request (plain text); sent as the final user message
        system_prompt: Persona system prompt from context_loader (passed through,
                       with the tool instruction appended)
        session_messages: Recent conversation history for session continuity;
                          only the role/content of the last 6 entries are sent
        model_cfg: Resolved model config from model_registry (local_openai type)
        respond_with_final: If False, return just the tool-loop summary without a
                            full persona-voiced response (faster; for cron/background).
                            NOTE(review): in this module the flag is only forwarded
                            to the loop and stored on checkpoints — confirm where
                            it actually takes effect.
        user_role: Role used when selecting the tools offered to the model
        tool_list: Optional list of tool names forwarded to the tool selection
                   helpers
        confirm_allow: Tools to bypass the confirmation gate for this user
        confirm_deny: Tools to always block for this user

    Returns:
        OrchestratorResult — if checkpoint is set, the job is awaiting confirmation

    Raises:
        RuntimeError: if model_cfg is missing or empty
    """
    if not model_cfg:
        raise RuntimeError("model_cfg is required for the OpenAI orchestrator")

    # Frozen copies so the sets stored on a checkpoint cannot be mutated later.
    _confirm_allow = frozenset(confirm_allow or ())
    _confirm_deny = frozenset(confirm_deny or ())
    # Tools needing confirmation: the global set minus per-user allows, plus
    # per-user denies (deny wins because it is applied last).
    effective_confirm = (CONFIRM_REQUIRED - set(_confirm_allow)) | set(_confirm_deny)

    client, model_name, active_tools = _build_client(model_cfg, user_role, tool_list)
    # Human-readable label for logs, audit records and the result metadata.
    model_label = model_cfg.get("label") or model_name
    tool_audit.set_context("openai", model_label)

    sys_content = (system_prompt or "") + _TOOL_INSTRUCTION
    messages: list[dict] = [{"role": "system", "content": sys_content}]
    if session_messages:
        # Only role/content are forwarded; any extra per-message metadata is dropped.
        messages.extend(
            {"role": m["role"], "content": m["content"]}
            for m in session_messages[-6:]
        )
    messages.append({"role": "user", "content": task})

    tool_call_log: list[dict] = []

    final_response, checkpoint = await _run_from_messages(
        client=client,
        messages=messages,
        active_tools=active_tools,
        tool_call_log=tool_call_log,
        effective_confirm=effective_confirm,
        model_name=model_name,
        task=task,
        model_cfg=model_cfg,
        respond_with_final=respond_with_final,
        user_role=user_role,
        tool_list=tool_list,
        confirm_allow=_confirm_allow,
        confirm_deny=_confirm_deny,
        starting_round=0,
    )

    if checkpoint:
        # Paused at a confirmation gate: the response is the model's prompt to
        # the user. The label is included so this message carries the same
        # model metadata as a completed one.
        return OrchestratorResult(
            response=final_response,
            tool_calls=list(tool_call_log),
            backend="local",
            backend_label=model_label,
            gemini_summary=final_response,
            checkpoint=checkpoint,
        )

    logger.info("OpenAI orchestrator complete — model=%s tools=%d", model_label, len(tool_call_log))
    return OrchestratorResult(
        response=final_response,
        tool_calls=tool_call_log,
        backend="local",
        backend_label=model_label,
        gemini_summary=final_response,
    )
|
|
|
|
|
|
async def resume(checkpoint: OrchestrateCheckpoint, confirmed: bool) -> OrchestratorResult:
    """
    Continue an OpenAI orchestrator job that was paused at a confirmation gate.

    The conversation is rebuilt from the checkpoint snapshot: tool responses are
    re-appended for the tools that already ran, then each pending tool is either
    executed (confirmed) or answered with a denial message, and the ReAct loop
    continues from the round recorded in the checkpoint.

    Args:
        checkpoint: State captured when the job paused
        confirmed: True to execute the pending tools, False to deny them

    Returns:
        OrchestratorResult — checkpoint is set again if another confirmation
        gate is hit while continuing

    Raises:
        RuntimeError: from _build_client if the checkpoint's model_cfg is unusable
    """
    client, model_name, active_tools = _build_client(checkpoint.model_cfg, checkpoint.user_role, checkpoint.tool_list)
    model_label = (checkpoint.model_cfg or {}).get("label") or model_name
    # Same audit context as run(), so confirmed tool executions are attributed
    # to this engine/model as well.
    tool_audit.set_context("openai", model_label)

    effective_confirm = (CONFIRM_REQUIRED - set(checkpoint.confirm_allow)) | set(checkpoint.confirm_deny)

    # Copy so the checkpoint's snapshot is not mutated by the appends below.
    messages = list(checkpoint.pre_fn_state)
    # Drop the placeholder entries logged for the pending tools; real results
    # are appended below.
    tool_call_log = [t for t in checkpoint.tool_call_log if t["result"] != "[awaiting confirmation]"]

    # Build tool responses for this round: first the tools that already ran...
    for er in checkpoint.executed_results:
        messages.append({
            "role": "tool",
            "tool_call_id": er.get("tool_call_id", er["name"]),
            "content": er["result"],
        })

    # ...then the gated tools, executed or denied according to the user's answer.
    for pt in checkpoint.pending_tools:
        if confirmed:
            result_str = await _execute_tool_dict(pt["name"], pt["args"], checkpoint.user_role, checkpoint.tool_list)
            logger.info("Confirmed tool %s → %d chars", pt["name"], len(result_str))
        else:
            result_str = "Action denied by user."
            logger.info("Tool %s denied by user", pt["name"])
        tool_call_log.append({"tool": pt["name"], "args": pt["args"], "result": result_str})
        messages.append({
            "role": "tool",
            "tool_call_id": pt.get("tool_call_id", pt["name"]),
            "content": result_str,
        })

    final_response, new_checkpoint = await _run_from_messages(
        client=client,
        messages=messages,
        active_tools=active_tools,
        tool_call_log=tool_call_log,
        effective_confirm=effective_confirm,
        model_name=model_name,
        task=checkpoint.task,
        model_cfg=checkpoint.model_cfg,
        respond_with_final=checkpoint.respond_with_final,
        user_role=checkpoint.user_role,
        tool_list=checkpoint.tool_list,
        confirm_allow=checkpoint.confirm_allow,
        confirm_deny=checkpoint.confirm_deny,
        starting_round=checkpoint.rounds_used,
    )

    if new_checkpoint:
        return OrchestratorResult(
            response=final_response,
            tool_calls=list(tool_call_log),
            backend="local",
            backend_label=model_label,
            gemini_summary=final_response,
            checkpoint=new_checkpoint,
        )

    logger.info("OpenAI orchestrator resumed — model=%s tools=%d", model_label, len(tool_call_log))
    return OrchestratorResult(
        response=final_response,
        tool_calls=tool_call_log,
        backend="local",
        # Mirrors run(): without this, resumed jobs lose their model label.
        backend_label=model_label,
        gemini_summary=final_response,
    )
|
|
|
|
|
|
# Rough characters-per-token ratio used to turn serialized message length
# into a token estimate.
_CHARS_PER_TOKEN = 4
# Fixed token overhead budget for sending 40 tool schemas per call
# (added to every estimate, regardless of how many tools are active).
_TOOL_SCHEMA_OVERHEAD = 3_000
# Chars to keep per truncated old tool result
_TRUNC_RESULT_CHARS = 400
# Always keep the last N tool-result messages uncompacted
_KEEP_RECENT_TOOL_MSGS = 6  # ~2 rounds of 3 tools each
|
|
|
|
|
|
def _estimate_tokens(messages: list[dict]) -> int:
    """
    Return a rough token estimate for a message list.

    Each message is serialized to JSON and its character length counted; the
    total is divided by the chars-per-token ratio and the fixed tool-schema
    overhead is added on top.
    """
    serialized_chars = 0
    for message in messages:
        serialized_chars += len(json.dumps(message))
    message_tokens = serialized_chars // _CHARS_PER_TOKEN
    return message_tokens + _TOOL_SCHEMA_OVERHEAD
|
|
|
|
|
|
def _compact_messages(messages: list[dict], budget_tokens: int) -> list[dict]:
    """
    Truncate old tool result content when approaching the context budget.

    Strategy: keep system message, recent assistant/tool rounds, and the
    original user task intact. Truncate content of old tool results in the
    middle of the conversation — the model only needs recent results to reason.

    Args:
        messages: Conversation in OpenAI chat format
        budget_tokens: Estimated-token threshold above which compaction runs

    Returns:
        The same list object when nothing needs (or can) be trimmed, otherwise
        a new list in which old tool messages are replaced by truncated copies.
        Input message dicts are never mutated.
    """
    if _estimate_tokens(messages) <= budget_tokens:
        return messages

    tool_indices = [i for i, m in enumerate(messages) if m.get("role") == "tool"]
    n_to_compact = max(0, len(tool_indices) - _KEEP_RECENT_TOOL_MSGS)
    if n_to_compact == 0:
        return messages  # nothing old enough to trim

    compact_set = set(tool_indices[:n_to_compact])
    marker_tail = " chars omitted]"
    result = []
    chars_saved = 0
    for i, msg in enumerate(messages):
        if i in compact_set:
            content = msg.get("content")
            if isinstance(content, str) and len(content) > _TRUNC_RESULT_CHARS:
                # A result truncated on an earlier pass is still longer than the
                # limit (kept chars + marker). Re-truncating it would overwrite
                # the real omitted count with a meaningless one, so skip it.
                already_compacted = (
                    content[_TRUNC_RESULT_CHARS:].startswith(" …[")
                    and content.endswith(marker_tail)
                )
                if not already_compacted:
                    omitted = len(content) - _TRUNC_RESULT_CHARS
                    chars_saved += omitted
                    # Copy before editing so the caller's dict (which may also
                    # live in a checkpoint snapshot) stays untouched.
                    msg = dict(msg)
                    msg["content"] = (
                        content[:_TRUNC_RESULT_CHARS]
                        + f" …[{omitted} chars omitted]"
                    )
        result.append(msg)

    if chars_saved:
        new_est = _estimate_tokens(result)
        logger.info(
            "context compaction: saved ~%d tokens (%d chars), now ~%d / %d tokens",
            chars_saved // _CHARS_PER_TOKEN, chars_saved, new_est, budget_tokens,
        )
    return result
|
|
|
|
|
|
def _context_budget(model_cfg: dict | None) -> int:
|
|
"""Return the usable token budget (75% of context window, min 16k, default 32k)."""
|
|
context_k = (model_cfg or {}).get("context_k") or 32
|
|
return max(16_000, int(context_k * 1000 * 0.75))
|
|
|
|
|
|
async def _run_from_messages(
    client,
    messages: list[dict],
    active_tools: list,
    tool_call_log: list[dict],
    effective_confirm: set[str],
    model_name: str,
    task: str,
    model_cfg: dict | None,
    respond_with_final: bool,
    user_role: str,
    confirm_allow: frozenset,
    confirm_deny: frozenset,
    starting_round: int = 0,
    tool_list: list[str] | None = None,
) -> tuple[str, OrchestrateCheckpoint | None]:
    """
    Run the OpenAI ReAct loop from the current messages state.
    Returns (final_response, checkpoint) — checkpoint is set if confirmation is needed.

    Each round sends the (possibly compacted) conversation to the model. If the
    reply carries tool calls, tools outside ``effective_confirm`` are executed
    and their results appended; tools inside it are held back. When any tool is
    held back, the model is asked once more (with tool use disabled) to phrase
    a confirmation prompt, and a checkpoint capturing the state is returned.
    A reply without tool calls ends the loop with its text as the response.

    Args:
        client: Object exposing ``chat.completions.create`` (AsyncOpenAI here)
        messages: Conversation so far; appended to in place until the first
            compaction replaces the local list
        active_tools: Tool schemas offered to the model (empty disables tools)
        tool_call_log: Mutated in place with one entry per tool call
        effective_confirm: Tool names that require user confirmation
        model_name: Model identifier sent to the API
        task, model_cfg, respond_with_final, user_role, tool_list,
        confirm_allow, confirm_deny: Stored on the checkpoint so resume() can
            rebuild the same configuration
        starting_round: Round index to start from (non-zero when resuming)
    """
    final_response = ""
    budget = _context_budget(model_cfg)

    # A per-model limit may lower the round cap but never raise it above the
    # global setting.
    per_model_limit = (model_cfg or {}).get("max_rounds") or settings.orchestrator_max_rounds
    effective_limit = min(per_model_limit, settings.orchestrator_max_rounds)

    for round_num in range(starting_round, effective_limit):
        messages = _compact_messages(messages, budget)
        est = _estimate_tokens(messages)
        logger.info("OpenAI orchestrator round %d / %d  model=%s  ~%d tokens",
                    round_num + 1, effective_limit, model_name, est)

        call_kwargs: dict = {"model": model_name, "messages": messages}
        if active_tools:
            call_kwargs["tools"] = active_tools
            call_kwargs["tool_choice"] = "auto"
        response = await _chat_with_retry(client, **call_kwargs)

        choice = response.choices[0]
        msg = choice.message

        # Echo the assistant turn back into the history in plain-dict form.
        assistant_msg: dict = {"role": "assistant"}
        if msg.content:
            assistant_msg["content"] = msg.content
        if msg.tool_calls:
            assistant_msg["tool_calls"] = [
                {
                    "id": tc.id,
                    "type": "function",
                    "function": {"name": tc.function.name, "arguments": tc.function.arguments},
                }
                for tc in msg.tool_calls
            ]
        messages.append(assistant_msg)

        # Some models set finish_reason="stop" even when tool_calls are present
        if msg.tool_calls and (choice.finish_reason in ("tool_calls", "stop", None)):
            # Snapshot state before tool responses for potential checkpoint
            pre_fn_state = list(messages)

            pending_tools: list[dict] = []
            executed_results: list[dict] = []

            for tc in msg.tool_calls:
                name = tc.function.name
                raw_args = tc.function.arguments or "{}"
                try:
                    args_parsed = json.loads(raw_args)
                    if not isinstance(args_parsed, dict):
                        raise ValueError("args must be a JSON object")
                except (json.JSONDecodeError, ValueError, TypeError) as e:
                    # Malformed arguments degrade to an empty dict rather than
                    # aborting the whole job; the tool reports what is missing.
                    logger.warning("Malformed tool args for %s: %s — args: %.200s", name, e, raw_args)
                    args_parsed = {}

                if name in effective_confirm:
                    pending_tools.append({"name": name, "args": args_parsed, "tool_call_id": tc.id})
                    logger.info("Tool %s blocked — confirmation required", name)
                else:
                    # Execute with the validated dict. Re-parsing the raw string
                    # here would crash on a missing (None) arguments value and
                    # could run the tool with args that differ from what is logged.
                    result_str = await _execute_tool_dict(name, args_parsed, user_role, tool_list)
                    logger.info("Tool %s → %d chars", name, len(result_str))
                    executed_results.append({"name": name, "args": args_parsed, "result": result_str, "tool_call_id": tc.id})
                    tool_call_log.append({"tool": name, "args": args_parsed, "result": result_str})
                    messages.append({"role": "tool", "tool_call_id": tc.id, "content": result_str})

            if pending_tools:
                # Add placeholder responses so every tool_call id has a matching
                # tool message before the model is called again.
                for pt in pending_tools:
                    placeholder = f"[AWAITING USER CONFIRMATION for {pt['name']}]"
                    tool_call_log.append({"tool": pt["name"], "args": pt["args"], "result": "[awaiting confirmation]"})
                    messages.append({"role": "tool", "tool_call_id": pt["tool_call_id"], "content": placeholder})

                # Ask the model to word the confirmation request; tool_choice
                # "none" prevents it from issuing further tool calls here.
                messages = _compact_messages(messages, budget)
                conf_call: dict = {"model": model_name, "messages": messages, "tool_choice": "none"}
                if active_tools:
                    conf_call["tools"] = active_tools
                conf_resp = await _chat_with_retry(client, **conf_call)
                final_response = conf_resp.choices[0].message.content or (
                    "This action requires your explicit confirmation before it can proceed."
                )

                checkpoint = OrchestrateCheckpoint(
                    engine="openai",
                    pre_fn_state=pre_fn_state,
                    executed_results=executed_results,
                    pending_tools=pending_tools,
                    tool_call_log=list(tool_call_log),
                    task=task,
                    model_cfg=model_cfg,
                    respond_with_final=respond_with_final,
                    user_role=user_role,
                    tool_list=tool_list,
                    confirm_allow=confirm_allow,
                    confirm_deny=confirm_deny,
                    # resume() restarts the loop at this index.
                    # NOTE(review): +2 presumably counts the confirmation-prompt
                    # call as a round in addition to this one — confirm.
                    rounds_used=round_num + 2,
                )
                return final_response, checkpoint

        else:
            final_response = msg.content or ""
            logger.info(
                "OpenAI orchestrator done after %d round(s). Tools used: %d",
                round_num + 1, len(tool_call_log),
            )
            break

    else:
        # for/else: reached only when the loop ran out of rounds without a
        # final answer (including an empty range when resuming at the limit).
        logger.warning("OpenAI orchestrator hit max rounds (%d)", effective_limit)
        final_response = (
            f"Reached the tool iteration limit ({effective_limit} rounds). "
            "Here is what was gathered:\n\n"
            + "\n\n".join(f"**{t['tool']}**: {t['result'][:500]}" for t in tool_call_log)
        )

    return final_response, None
|
|
|
|
|
|
# HTTP status codes treated as transient and therefore retried.
_RETRY_STATUSES = {429, 500, 502, 503, 504}
# Total number of attempts per call (first try included).
_MAX_API_RETRIES = 3


async def _chat_with_retry(client, **kwargs):
    """
    Call ``client.chat.completions.create(**kwargs)``, retrying transient failures.

    Connection errors and status errors whose code is in ``_RETRY_STATUSES``
    are retried with exponential backoff (2**attempt seconds between tries);
    any other status error propagates immediately. Once the attempts are
    exhausted, the last retryable error is re-raised.
    """
    failure: Exception = RuntimeError("No attempts made")
    final_attempt = _MAX_API_RETRIES - 1
    for attempt in range(_MAX_API_RETRIES):
        try:
            return await client.chat.completions.create(**kwargs)
        except APIConnectionError as conn_err:
            failure = conn_err
            logger.warning("OpenAI connection error (attempt %d/%d): %s", attempt + 1, _MAX_API_RETRIES, conn_err)
        except APIStatusError as status_err:
            if status_err.status_code not in _RETRY_STATUSES:
                raise  # not transient — retrying would not help
            failure = status_err
            logger.warning("OpenAI status %d (attempt %d/%d): %s", status_err.status_code, attempt + 1, _MAX_API_RETRIES, status_err)
        # No point sleeping after the last attempt; fall through to the raise.
        if attempt != final_attempt:
            await asyncio.sleep(2 ** attempt)  # 1s, 2s
    raise failure
|
|
|
|
|
|
def _build_client(
|
|
model_cfg: dict | None,
|
|
user_role: str = "user",
|
|
tool_list: list[str] | None = None,
|
|
) -> tuple:
|
|
"""Build AsyncOpenAI client and return (client, model_name, active_tools)."""
|
|
if not model_cfg:
|
|
raise RuntimeError("model_cfg is required for the OpenAI orchestrator")
|
|
api_url = model_cfg.get("api_url", "")
|
|
api_key = model_cfg.get("api_key", "") or "none"
|
|
model_name = model_cfg.get("model_name", "")
|
|
host_type = model_cfg.get("host_type", "openwebui")
|
|
if not api_url or not model_name:
|
|
raise RuntimeError(
|
|
f"model_cfg missing api_url or model_name: {model_cfg.get('label', model_cfg)}"
|
|
)
|
|
base_url = api_url.rstrip("/")
|
|
if host_type == "openwebui":
|
|
base_url = base_url + "/api"
|
|
client = AsyncOpenAI(base_url=base_url, api_key=api_key, timeout=settings.timeout_local)
|
|
if model_cfg.get("tools") is False:
|
|
active_tools = []
|
|
else:
|
|
active_tools = get_openai_tools_for_role(user_role, tool_list)
|
|
return client, model_name, active_tools
|
|
|
|
|
|
async def _execute_tool(
    name: str,
    arguments_json: str,
    user_role: str = "user",
    tool_list: list[str] | None = None,
) -> str:
    """
    Parse tool arguments and execute with role-filtered callables.

    Args:
        name: Tool name as requested by the model
        arguments_json: JSON-encoded argument object. A missing (None/empty),
            malformed, or non-object value is treated as no arguments.
        user_role: Role passed to the tool lookup
        tool_list: Optional tool-name list passed to the tool lookup

    Returns:
        The tool's result string, or "Tool error: ..." if the tool raised.
    """
    try:
        # `or "{}"` covers None and "", which json.loads would reject.
        args = json.loads(arguments_json or "{}")
    except (ValueError, TypeError) as e:
        # ValueError covers json.JSONDecodeError; TypeError covers a non-string
        # payload. Either way, fall back to empty args instead of raising.
        logger.warning("Malformed tool args for %s: %s — args: %.200s", name, e, arguments_json)
        args = {}
    if not isinstance(args, dict):
        # Valid JSON but not an object (e.g. a list or bare string): tools
        # take keyword-style arguments, so this cannot be used.
        logger.warning("Malformed tool args for %s: %s — args: %.200s", name, "args must be a JSON object", arguments_json)
        args = {}
    # Single execution/error-handling path shared with the pre-parsed variant.
    return await _execute_tool_dict(name, args, user_role, tool_list)
|
|
|
|
|
|
async def _execute_tool_dict(
    name: str,
    args: dict,
    user_role: str = "user",
    tool_list: list[str] | None = None,
) -> str:
    """
    Execute a tool from a pre-parsed args dict.

    The callables are looked up via get_tools_for_role for the given role and
    tool list. Any exception raised by the tool call is logged and converted
    into a "Tool error: ..." string so the model can see the failure instead
    of the job aborting. Errors from the lookup itself are not caught.
    """
    _schemas, role_callables = get_tools_for_role(user_role, tool_list)
    try:
        outcome = await call_tool(name, args, role_callables)
    except Exception as exc:
        logger.warning("Tool %s failed: %s", name, exc)
        return f"Tool error: {exc}"
    return outcome
|