Files
Cortex-Inara/cortex/openai_orchestrator.py
Scott Idem 49123cdd5c feat: per-role tool lists and system prompt overlays
Each role in model_registry.json can now carry two optional keys:
  system_append — injected into the system prompt at position 7 (after
                  memory, closest to the turn) for the active chat_role
  tools         — explicit tool allow-list; intersected with the user's
                  access-level filter so it can only restrict, never elevate

No changes needed for existing users — missing keys fall back to current
behavior. Add keys to a role to give it a specialty focus:

  "coder": {
    "primary": "claude_cli",
    "system_append": "You are in code-specialist mode...",
    "tools": ["web_search", "file_read", "shell_exec", "scratch_write"]
  }

Changes:
- model_registry.py: get_role_config() returns system_append + tools
- context_loader.py: role_append param appended as "--- Role Context ---"
- tools/__init__.py: get_tools_for_role/get_openai_tools_for_role accept
  optional tool_list and intersect with access-level filter
- orchestrator_engine.py: tool_list threaded through run/resume/checkpoint
- openai_orchestrator.py: tool_list threaded through run/resume/checkpoint;
  _build_client now calls get_openai_tools_for_role instead of returning
  unfiltered OPENAI_TOOL_SCHEMAS
- routers/orchestrator.py: pulls role_cfg for chat_role, passes both
  role_append and tool_list to context loader and engine

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 20:00:38 -04:00

374 lines
14 KiB
Python

"""
OpenAI-compatible orchestrator engine.
Implements the same ReAct tool loop as orchestrator_engine.py but uses the
OpenAI tool calling format, which works with any OpenAI-compatible endpoint:
OpenRouter, LiteLLM, Open WebUI, Ollama (tool-capable models), etc.
The model both runs the tool loop AND writes the final user-facing response —
no separate handoff step needed when a single capable model handles everything.
Flow:
1. POST to {api_url}/chat/completions with tools + user message
2. If finish_reason == "tool_calls": execute tools, feed results back, repeat
3. If finish_reason == "stop": final assistant message is the user-facing response
Used when the "orchestrator" role in the model registry resolves to a local_openai
type model. The Gemini engine (orchestrator_engine.py) is used otherwise.
"""
import asyncio
import json
import logging
from openai import AsyncOpenAI
from config import settings
from orchestrator_engine import OrchestrateCheckpoint, OrchestratorResult
from tools import OPENAI_TOOL_SCHEMAS, call_tool, get_openai_tools_for_role, get_tools_for_role, CONFIRM_REQUIRED
logger = logging.getLogger(__name__)
# Appended to the persona system prompt so the model knows it has tools.
# Kept brief — capable models handle tool use without much coaching.
_TOOL_INSTRUCTION = (
"\n\nYou have access to tools. Use them when you need current information, "
"need to read files, or need to take actions on the user's behalf. "
"Respond naturally after gathering what you need."
)
async def run(
task: str,
system_prompt: str = "",
session_messages: list[dict] | None = None,
model_cfg: dict | None = None,
respond_with_final: bool = True,
user_role: str = "user",
tool_list: list[str] | None = None,
confirm_allow: set[str] | None = None,
confirm_deny: set[str] | None = None,
) -> OrchestratorResult:
"""
Run a tool-enabled task using an OpenAI-compatible API.
Args:
task: The user's request (plain text)
system_prompt: Persona system prompt from context_loader (passed through)
session_messages: Recent conversation history for session continuity
model_cfg: Resolved model config from model_registry (local_openai type)
respond_with_final: If False, return just the tool-loop summary without a
full persona-voiced response (faster; for cron/background)
confirm_allow: Tools to bypass the confirmation gate for this user
confirm_deny: Tools to always block for this user
Returns:
OrchestratorResult — if checkpoint is set, the job is awaiting confirmation
"""
if not model_cfg:
raise RuntimeError("model_cfg is required for the OpenAI orchestrator")
_confirm_allow = frozenset(confirm_allow or ())
_confirm_deny = frozenset(confirm_deny or ())
effective_confirm = (CONFIRM_REQUIRED - set(_confirm_allow)) | set(_confirm_deny)
client, model_name, active_tools = _build_client(model_cfg, user_role, tool_list)
sys_content = (system_prompt or "") + _TOOL_INSTRUCTION
messages: list[dict] = [{"role": "system", "content": sys_content}]
if session_messages:
messages.extend(
{"role": m["role"], "content": m["content"]}
for m in session_messages[-6:]
)
messages.append({"role": "user", "content": task})
tool_call_log: list[dict] = []
final_response, checkpoint = await _run_from_messages(
client=client,
messages=messages,
active_tools=active_tools,
tool_call_log=tool_call_log,
effective_confirm=effective_confirm,
model_name=model_name,
task=task,
model_cfg=model_cfg,
respond_with_final=respond_with_final,
user_role=user_role,
tool_list=tool_list,
confirm_allow=_confirm_allow,
confirm_deny=_confirm_deny,
starting_round=0,
)
if checkpoint:
return OrchestratorResult(
response=final_response,
tool_calls=list(tool_call_log),
backend="local",
gemini_summary=final_response,
checkpoint=checkpoint,
)
model_label = model_cfg.get("label") or model_name
logger.info("OpenAI orchestrator complete — model=%s tools=%d", model_label, len(tool_call_log))
return OrchestratorResult(
response=final_response,
tool_calls=tool_call_log,
backend="local",
gemini_summary=final_response,
)
async def resume(checkpoint: OrchestrateCheckpoint, confirmed: bool) -> OrchestratorResult:
"""Continue an OpenAI orchestrator job that was paused at a confirmation gate."""
client, model_name, active_tools = _build_client(checkpoint.model_cfg, checkpoint.user_role, checkpoint.tool_list)
effective_confirm = (CONFIRM_REQUIRED - set(checkpoint.confirm_allow)) | set(checkpoint.confirm_deny)
messages = list(checkpoint.pre_fn_state)
tool_call_log = [t for t in checkpoint.tool_call_log if t["result"] != "[awaiting confirmation]"]
# Build tool responses for this round
for er in checkpoint.executed_results:
messages.append({
"role": "tool",
"tool_call_id": er.get("tool_call_id", er["name"]),
"content": er["result"],
})
for pt in checkpoint.pending_tools:
if confirmed:
result_str = await _execute_tool_dict(pt["name"], pt["args"], checkpoint.user_role, checkpoint.tool_list)
logger.info("Confirmed tool %s%d chars", pt["name"], len(result_str))
else:
result_str = "Action denied by user."
logger.info("Tool %s denied by user", pt["name"])
tool_call_log.append({"tool": pt["name"], "args": pt["args"], "result": result_str})
messages.append({
"role": "tool",
"tool_call_id": pt.get("tool_call_id", pt["name"]),
"content": result_str,
})
final_response, new_checkpoint = await _run_from_messages(
client=client,
messages=messages,
active_tools=active_tools,
tool_call_log=tool_call_log,
effective_confirm=effective_confirm,
model_name=model_name,
task=checkpoint.task,
model_cfg=checkpoint.model_cfg,
respond_with_final=checkpoint.respond_with_final,
user_role=checkpoint.user_role,
tool_list=checkpoint.tool_list,
confirm_allow=checkpoint.confirm_allow,
confirm_deny=checkpoint.confirm_deny,
starting_round=checkpoint.rounds_used,
)
if new_checkpoint:
return OrchestratorResult(
response=final_response,
tool_calls=list(tool_call_log),
backend="local",
gemini_summary=final_response,
checkpoint=new_checkpoint,
)
model_label = (checkpoint.model_cfg or {}).get("label") or model_name
logger.info("OpenAI orchestrator resumed — model=%s tools=%d", model_label, len(tool_call_log))
return OrchestratorResult(
response=final_response,
tool_calls=tool_call_log,
backend="local",
gemini_summary=final_response,
)
async def _run_from_messages(
client,
messages: list[dict],
active_tools: list,
tool_call_log: list[dict],
effective_confirm: set[str],
model_name: str,
task: str,
model_cfg: dict | None,
respond_with_final: bool,
user_role: str,
confirm_allow: frozenset,
confirm_deny: frozenset,
starting_round: int = 0,
tool_list: list[str] | None = None,
) -> tuple[str, OrchestrateCheckpoint | None]:
"""
Run the OpenAI ReAct loop from the current messages state.
Returns (final_response, checkpoint) — checkpoint is set if confirmation is needed.
"""
final_response = ""
for round_num in range(starting_round, settings.orchestrator_max_rounds):
logger.info("OpenAI orchestrator round %d / %d model=%s",
round_num + 1, settings.orchestrator_max_rounds, model_name)
response = await client.chat.completions.create(
model=model_name,
messages=messages,
tools=active_tools,
tool_choice="auto",
)
choice = response.choices[0]
msg = choice.message
assistant_msg: dict = {"role": "assistant"}
if msg.content:
assistant_msg["content"] = msg.content
if msg.tool_calls:
assistant_msg["tool_calls"] = [
{
"id": tc.id,
"type": "function",
"function": {"name": tc.function.name, "arguments": tc.function.arguments},
}
for tc in msg.tool_calls
]
messages.append(assistant_msg)
if choice.finish_reason == "tool_calls" and msg.tool_calls:
# Snapshot state before tool responses for potential checkpoint
pre_fn_state = list(messages)
pending_tools: list[dict] = []
executed_results: list[dict] = []
for tc in msg.tool_calls:
name = tc.function.name
try:
args_parsed = json.loads(tc.function.arguments)
except json.JSONDecodeError:
args_parsed = {"raw": tc.function.arguments}
if name in effective_confirm:
pending_tools.append({"name": name, "args": args_parsed, "tool_call_id": tc.id})
logger.info("Tool %s blocked — confirmation required", name)
else:
result_str = await _execute_tool(name, tc.function.arguments, user_role, tool_list)
logger.info("Tool %s%d chars", name, len(result_str))
executed_results.append({"name": name, "args": args_parsed, "result": result_str, "tool_call_id": tc.id})
tool_call_log.append({"tool": name, "args": args_parsed, "result": result_str})
messages.append({"role": "tool", "tool_call_id": tc.id, "content": result_str})
if pending_tools:
# Add placeholder responses
for pt in pending_tools:
placeholder = f"[AWAITING USER CONFIRMATION for {pt['name']}]"
tool_call_log.append({"tool": pt["name"], "args": pt["args"], "result": "[awaiting confirmation]"})
messages.append({"role": "tool", "tool_call_id": pt["tool_call_id"], "content": placeholder})
conf_resp = await client.chat.completions.create(
model=model_name,
messages=messages,
tools=active_tools,
tool_choice="none",
)
final_response = conf_resp.choices[0].message.content or (
"This action requires your explicit confirmation before it can proceed."
)
checkpoint = OrchestrateCheckpoint(
engine="openai",
pre_fn_state=pre_fn_state,
executed_results=executed_results,
pending_tools=pending_tools,
tool_call_log=list(tool_call_log),
task=task,
model_cfg=model_cfg,
respond_with_final=respond_with_final,
user_role=user_role,
tool_list=tool_list,
confirm_allow=confirm_allow,
confirm_deny=confirm_deny,
rounds_used=round_num + 2,
)
return final_response, checkpoint
else:
final_response = msg.content or ""
logger.info(
"OpenAI orchestrator done after %d round(s). Tools used: %d",
round_num + 1, len(tool_call_log),
)
break
else:
logger.warning("OpenAI orchestrator hit max rounds (%d)", settings.orchestrator_max_rounds)
final_response = (
f"Reached the tool iteration limit ({settings.orchestrator_max_rounds} rounds). "
"Here is what was gathered:\n\n"
+ "\n\n".join(f"**{t['tool']}**: {t['result'][:500]}" for t in tool_call_log)
)
return final_response, None
def _build_client(
model_cfg: dict | None,
user_role: str = "user",
tool_list: list[str] | None = None,
) -> tuple:
"""Build AsyncOpenAI client and return (client, model_name, active_tools)."""
if not model_cfg:
raise RuntimeError("model_cfg is required for the OpenAI orchestrator")
api_url = model_cfg.get("api_url", "")
api_key = model_cfg.get("api_key", "") or "none"
model_name = model_cfg.get("model_name", "")
host_type = model_cfg.get("host_type", "openwebui")
if not api_url or not model_name:
raise RuntimeError(
f"model_cfg missing api_url or model_name: {model_cfg.get('label', model_cfg)}"
)
base_url = api_url.rstrip("/")
if host_type == "openwebui":
base_url = base_url + "/api"
client = AsyncOpenAI(base_url=base_url, api_key=api_key)
active_tools = get_openai_tools_for_role(user_role, tool_list)
return client, model_name, active_tools
async def _execute_tool(
name: str,
arguments_json: str,
user_role: str = "user",
tool_list: list[str] | None = None,
) -> str:
"""Parse tool arguments and execute with role-filtered callables."""
_, callables = get_tools_for_role(user_role, tool_list)
try:
args = json.loads(arguments_json)
except json.JSONDecodeError:
args = {}
try:
return await call_tool(name, args, callables)
except Exception as e:
logger.warning("Tool %s failed: %s", name, e)
return f"Tool error: {e}"
async def _execute_tool_dict(
name: str,
args: dict,
user_role: str = "user",
tool_list: list[str] | None = None,
) -> str:
"""Execute a tool from a pre-parsed args dict."""
_, callables = get_tools_for_role(user_role, tool_list)
try:
return await call_tool(name, args, callables)
except Exception as e:
logger.warning("Tool %s failed: %s", name, e)
return f"Tool error: {e}"