Files
Cortex-Inara/cortex/model_registry.py
Scott Idem d9a322164a feat: OpenAI-compatible orchestrator + backend auto-routing
- openai_orchestrator.py — new ReAct tool loop engine for any
  OpenAI-compatible endpoint (OpenRouter, Open WebUI, Ollama, LiteLLM);
  model handles both tool loop and final response, no Claude handoff needed
- tools/__init__.py — auto-derive OpenAI JSON Schema from existing Gemini
  FunctionDeclarations so tool definitions have a single source of truth
- routers/orchestrator.py — route to openai_orchestrator when model registry
  "orchestrator" role resolves to a local_openai type host
- routers/chat.py — pass role to _backend_label(); fix fallback_used logic
  (only meaningful for explicit backend overrides, not auto-routing)
- static/app.js — add null/"auto" to backend cycle; fetch local model hint
  without overriding the auto default on page load
- model_registry.py — _normalize() back-fills host_type on old registry files
- requirements.txt — add openai>=1.0.0
- ARCH__BACKENDS.md — document OpenAI-compat backend and routing logic

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-08 19:18:18 -04:00

468 lines
17 KiB
Python

"""
Per-user unified model registry.
Stored in: home/{user}/model_registry.json
Schema:
{
"version": 1,
"hosts": [{"id", "label", "api_url", "api_key",
"host_type": "openwebui" | "openai"}, ...],
#
# host_type controls the API path layout:
# "openwebui" (default) — Open WebUI / Ollama:
# chat: POST {url}/api/chat/completions
# models: GET {url}/api/models
# "openai" — OpenRouter, LiteLLM, Anthropic-compatible, etc.:
# chat: POST {url}/chat/completions
# models: GET {url}/models
# Set api_url to the base path that ends just before /chat/completions,
# e.g. https://openrouter.ai/api/v1 for OpenRouter.
"models": [
{
"id": str, # unique within this registry
"type": str, # "local_openai" | "claude_cli" | "gemini_cli" | "gemini_api"
"label": str, # human-readable display name
"model_name": str, # model identifier sent to the API
"host_id": str | null, # only for local_openai — references hosts[].id
"context_k": int, # context window in thousands of tokens (informational)
"tags": [str], # user-defined capability tags
},
],
"roles": {
"<role>": {
"primary": "<model_id>" | null,
"backup_1": "<model_id>" | null,
"backup_2": "<model_id>" | null,
"backup_3": "<model_id>" | null,
"backup_4": "<model_id>" | null,
},
},
}
Built-in model IDs (always resolvable, no registry entry required):
"claude_cli" — Claude CLI subprocess (~/.claude/.credentials.json)
"gemini_cli" — Gemini CLI subprocess
"gemini_api" — Gemini API (google-genai SDK; used by orchestrator engine, not llm_client)
Standard roles are defined by settings.defined_roles (default: chat,orchestrator,distill,coder,research).
Additional custom roles can be added freely to roles{}.
Resolution for get_model_for_role(username, role):
1. User registry: roles[role].primary → backup_1 → backup_2 → backup_3 → backup_4
2. .env default: ROLE_<ROLE>=<builtin_id> (e.g. ROLE_CHAT=claude_cli)
3. Hardcoded last-resort defaults per role (claude_cli for any role without an entry)
"""
import json
import logging
import secrets
from pathlib import Path
from config import settings
logger = logging.getLogger(__name__)
# ── Built-in model definitions ────────────────────────────────────────────────
# These IDs are always resolvable without a registry entry.
def _builtins() -> dict[str, dict]:
    """Build the table of built-in model definitions, keyed by model ID.

    The table is rebuilt on every call (rather than once at import time) so
    the ``settings`` values embedded in labels and model names are read at
    call time.
    """
    claude_model = settings.default_model
    orchestrator_model = settings.orchestrator_model
    entries = [
        {
            "id": "claude_cli",
            "type": "claude_cli",
            "label": f"Claude (CLI) — {claude_model}",
            "model_name": claude_model,
            "context_k": 200,
            "tags": ["chat", "persona", "creative"],
        },
        {
            "id": "gemini_cli",
            "type": "gemini_cli",
            "label": "Gemini (CLI)",
            "model_name": "",
            "context_k": 1000,
            "tags": ["chat", "research", "long_context"],
        },
        {
            "id": "gemini_api",
            "type": "gemini_api",
            "label": f"Gemini API — {orchestrator_model}",
            "model_name": orchestrator_model,
            "context_k": 1000,
            "tags": ["orchestrator", "research", "long_context", "tools"],
        },
    ]
    # Each entry's own "id" doubles as its lookup key.
    return {entry["id"]: entry for entry in entries}
# Hardcoded last-resort defaults per role (used only if .env is also unset)
_ROLE_LAST_RESORT: dict[str, str] = dict(
    chat="claude_cli",
    orchestrator="gemini_api",
    distill="claude_cli",
    coder="claude_cli",
    research="gemini_api",
)
# Slot names of a role's priority chain, in the order they are tried.
PRIORITY_KEYS = ["primary"] + [f"backup_{n}" for n in range(1, 5)]
# ── Storage ───────────────────────────────────────────────────────────────────
def _registry_path(username: str) -> Path:
    """Return the path of *username*'s model_registry.json under the home root."""
    user_home = settings.home_root() / username
    return user_home / "model_registry.json"
def _local_llm_path(username: str) -> Path:
    """Return the path of *username*'s legacy local_llm.json under the home root."""
    user_home = settings.home_root() / username
    return user_home / "local_llm.json"
def _empty() -> dict:
    """Return a brand-new version-1 registry with no hosts, models or roles.

    A fresh dict (with fresh containers) is built on every call, so callers
    may mutate the result freely.
    """
    return dict(version=1, hosts=[], models=[], roles={})
def _normalize(data: dict) -> dict:
    """Back-fill any missing fields introduced by schema additions.

    The dict is updated in place and also returned.

    * Top-level ``hosts`` / ``models`` / ``roles`` are created when absent
      or null, because the write API indexes ``data["hosts"]`` and
      ``data["models"]`` directly and would otherwise raise ``KeyError``.
    * Every host dict without a ``host_type`` gets ``"openwebui"``.
    """
    for key, factory in (("hosts", list), ("models", list), ("roles", dict)):
        if data.get(key) is None:
            data[key] = factory()
    for host in data["hosts"]:
        host.setdefault("host_type", "openwebui")
    return data
def _load(username: str) -> dict:
    """Load *username*'s registry, migrating or starting empty when needed.

    * A registry file that parses to a dict containing "version" is
      normalized and returned.
    * A registry file that cannot be read or is not valid JSON logs a warning
      and yields an empty registry.
    * Otherwise (no registry file, or one without the expected shape) a legacy
      local_llm.json, if present, is migrated, saved as the new registry and
      returned.
    * With neither file usable, an empty registry is returned.
    """
    registry_file = _registry_path(username)
    if registry_file.exists():
        try:
            parsed = json.loads(registry_file.read_text())
        except (json.JSONDecodeError, OSError):
            logger.warning("model_registry.json for %s is unreadable — starting fresh", username)
            return _empty()
        if isinstance(parsed, dict) and "version" in parsed:
            return _normalize(parsed)

    # No usable registry — try migrating from local_llm.json
    legacy_file = _local_llm_path(username)
    if not legacy_file.exists():
        return _empty()

    migrated = _migrate_from_local_llm(username, legacy_file)
    _save(username, migrated)
    logger.info("Migrated local_llm.json → model_registry.json for %s", username)
    return migrated
def _save(username: str, data: dict) -> None:
    """Overwrite *username*'s registry file with *data* as 2-space-indented JSON."""
    target = _registry_path(username)
    serialized = json.dumps(data, indent=2)
    target.write_text(serialized)
# ── Migration ─────────────────────────────────────────────────────────────────
def _migrate_from_local_llm(username: str, path: Path) -> dict:
    """Convert local_llm.json (hosts/models/active_model_id) → model_registry format.

    Two legacy layouts are handled:

    * v0 flat: top-level ``api_url`` / ``api_key`` / ``model``; missing values
      fall back to ``settings.local_api_url`` / ``local_api_key`` /
      ``local_model``.
    * hosts / models / active_model_id.

    Args:
        username: Owner of the file; used only in log messages.
        path: Location of the legacy local_llm.json.

    Returns:
        A registry dict. It is empty when the file is unreadable, is not a
        JSON object, or is a v0 file for which no API URL can be determined.
    """
    try:
        old = json.loads(path.read_text())
    except (OSError, ValueError):
        # ValueError covers both malformed JSON and undecodable bytes.
        logger.warning("local_llm.json for %s is unreadable — skipping migration", username)
        return _empty()

    data = _empty()
    if not isinstance(old, dict):
        logger.warning("local_llm.json for %s is not a JSON object — skipping migration", username)
        return data

    # Handle v0 flat format
    if "hosts" not in old:
        api_url = old.get("api_url") or settings.local_api_url
        api_key = old.get("api_key") or settings.local_api_key
        model_name = old.get("model") or settings.local_model
        if not api_url:
            return data
        host_id = secrets.token_hex(4)
        models = []
        if model_name:
            models.append({
                "id": secrets.token_hex(4),
                "host_id": host_id,
                "label": model_name,
                "model_name": model_name,
            })
        old = {
            "hosts": [{"id": host_id, "label": "Local Model Server", "api_url": api_url, "api_key": api_key}],
            "models": models,
            "active_model_id": models[0]["id"] if models else None,
        }

    for host in old.get("hosts") or []:
        if not isinstance(host, dict):
            continue
        entry = dict(host)
        # Legacy hosts predate host_type; use the same default _normalize applies.
        entry.setdefault("host_type", "openwebui")
        data["hosts"].append(entry)

    for m in old.get("models") or []:
        if not isinstance(m, dict) or not m.get("id"):
            # An entry without an id cannot be referenced by any role; skip it
            # instead of aborting the whole migration.
            logger.warning("Skipping malformed model entry in local_llm.json for %s", username)
            continue
        data["models"].append({
            "id": m["id"],
            "type": "local_openai",
            "label": m.get("label") or m.get("model_name", ""),
            "model_name": m.get("model_name", ""),
            "host_id": m.get("host_id"),
            "context_k": 0,
            "tags": [],
        })

    # Build initial role assignments — only when the active model was actually
    # migrated, so no role points at a model that does not exist.
    active_id = old.get("active_model_id")
    roles: dict[str, dict] = {}
    if active_id and any(m["id"] == active_id for m in data["models"]):
        roles["chat"] = {"primary": active_id}
        if settings.distill_backend_mid == "local":
            roles["distill"] = {"primary": active_id}
    data["roles"] = roles
    return data
# ── Model resolution ──────────────────────────────────────────────────────────
def _resolve_model(registry: dict, model_id: str) -> dict | None:
    """Look up *model_id* and return a copy of its full config, or None.

    Built-in IDs win over user-defined entries with the same ID. For a
    ``local_openai`` model the referenced host's ``api_url``, ``api_key`` and
    ``host_type`` are merged into the result; if that host is missing a
    warning is logged and None is returned.
    """
    builtin = _builtins().get(model_id)
    if builtin is not None:
        return dict(builtin)

    for candidate in registry.get("models", []):
        if candidate["id"] == model_id:
            entry = candidate
            break
    else:
        return None

    if entry.get("type") != "local_openai":
        return dict(entry)

    wanted_host = entry.get("host_id")
    for candidate_host in registry.get("hosts", []):
        if candidate_host["id"] == wanted_host:
            host = candidate_host
            break
    else:
        logger.warning("model %s references missing host_id %s", model_id, wanted_host)
        return None

    merged = dict(entry)
    merged.update(
        api_url=host.get("api_url", ""),
        api_key=host.get("api_key", ""),
        host_type=host.get("host_type", "openwebui"),
    )
    return merged
def get_model_for_role(username: str, role: str) -> dict | None:
    """
    Return the resolved model config for the given role.

    Candidates are tried in this order and the first that resolves wins:
    1. User registry: roles[role].primary → backup_1 → ... → backup_4
    2. settings.get_role_default(role), when it names a built-in model ID
    3. Hardcoded last-resort default for the role
    4. claude_cli (used when the role has no last-resort entry)
    """
    registry = _load(username)
    slots = registry.get("roles", {}).get(role, {})
    for slot in PRIORITY_KEYS:
        candidate_id = slots.get(slot)
        if candidate_id:
            model = _resolve_model(registry, candidate_id)
            if model:
                return model
            logger.debug("role %s.%s = %s but model not found", role, slot, candidate_id)

    # .env default
    env_default = settings.get_role_default(role)
    table = _builtins()
    if env_default and env_default in table:
        return dict(table[env_default])

    # Hardcoded last resort
    fallback_id = _ROLE_LAST_RESORT.get(role, "claude_cli")
    chosen = table.get(fallback_id, table["claude_cli"])
    return dict(chosen)
def get_best_local_model(username: str, role: str = "chat") -> dict | None:
    """
    Pick a local_openai model for *role*, or None when none is usable.

    The role's priority chain is walked first and the first entry that
    resolves to a local_openai model is returned. Failing that, the first
    registry model marked local_openai that resolves is returned.
    """
    registry = _load(username)
    slots = registry.get("roles", {}).get(role, {})

    assigned_ids = (slots.get(slot) for slot in PRIORITY_KEYS)
    for model_id in filter(None, assigned_ids):
        candidate = _resolve_model(registry, model_id)
        if candidate and candidate.get("type") == "local_openai":
            return candidate

    # Fall back to first configured local model
    local_entries = (
        m for m in registry.get("models", []) if m.get("type") == "local_openai"
    )
    for entry in local_entries:
        candidate = _resolve_model(registry, entry["id"])
        if candidate:
            return candidate
    return None
# ── Read API (for UI and callers) ─────────────────────────────────────────────
def get_registry(username: str) -> dict:
    """Return the user's registry exactly as ``_load`` produces it.

    Built-in models are not added here; the result holds only the hosts,
    models and roles that ``_load`` returns.
    """
    registry = _load(username)
    return registry
def get_all_models(username: str) -> list[dict]:
    """List every registry model that resolves, with host details merged in.

    Entries that fail to resolve (e.g. their host is missing) are left out.
    """
    registry = _load(username)
    resolved = (
        _resolve_model(registry, entry["id"]) for entry in registry.get("models", [])
    )
    return [cfg for cfg in resolved if cfg]
def get_defined_roles(username: str) -> dict[str, dict]:
    """Map each role from ``settings.get_defined_roles()`` to its stored config.

    Roles with no entry in the user's registry map to an empty dict.
    """
    stored = _load(username).get("roles", {})
    return {name: stored.get(name, {}) for name in settings.get_defined_roles()}
# ── Write API (CRUD) ──────────────────────────────────────────────────────────
def save_host(username: str, host_id: str | None,
              label: str, api_url: str, api_key: str,
              host_type: str = "openwebui") -> str:
    """Create or update a host and persist the registry. Returns the host ID.

    host_type: "openwebui" (default) or "openai" (OpenRouter, LiteLLM, etc.);
    any other value is treated as "openwebui".

    When *host_id* matches an existing host it is updated in place; a blank
    *api_key* leaves the stored key untouched. When *host_id* is empty or
    matches nothing, a new host with a freshly generated ID is appended.
    """
    data = _load(username)
    if host_type not in ("openwebui", "openai"):
        host_type = "openwebui"

    hosts = data["hosts"]
    clean_label = label.strip()
    clean_url = api_url.strip()
    clean_key = api_key.strip()

    existing = None
    if host_id:
        existing = next((h for h in hosts if h["id"] == host_id), None)

    if existing is not None:
        existing["label"] = clean_label
        existing["api_url"] = clean_url
        existing["host_type"] = host_type
        if clean_key:
            existing["api_key"] = clean_key
        _save(username, data)
        return host_id

    new_id = secrets.token_hex(4)
    hosts.append({
        "id": new_id,
        "label": clean_label,
        "api_url": clean_url,
        "api_key": clean_key,
        "host_type": host_type,
    })
    _save(username, data)
    return new_id
def remove_host(username: str, host_id: str) -> bool:
    """Remove a host and all models that reference it.

    Role slots that pointed at any of the removed models are reset to None.
    The registry is saved whether or not the host existed.

    Returns:
        True if a host with *host_id* was found and removed.
    """
    data = _load(username)
    before = len(data["hosts"])

    # Collect the doomed model IDs BEFORE filtering the model list; computing
    # them afterwards always yields an empty set and leaves dangling role refs.
    removed_ids = {m["id"] for m in data["models"] if m.get("host_id") == host_id}

    data["hosts"] = [h for h in data["hosts"] if h["id"] != host_id]
    data["models"] = [m for m in data["models"] if m.get("host_id") != host_id]

    # Clear any role assignments that pointed to removed models
    for role_cfg in data.get("roles", {}).values():
        for key in PRIORITY_KEYS:
            if role_cfg.get(key) in removed_ids:
                role_cfg[key] = None

    _save(username, data)
    return len(data["hosts"]) < before
def save_model(username: str, model_id: str | None, host_id: str,
               label: str, model_name: str, context_k: int = 0,
               tags: list[str] | None = None) -> str:
    """Create or update a model entry and persist the registry. Returns the model ID.

    When *model_id* matches an existing model it is updated in place. When it
    is empty or matches nothing, a new ``local_openai`` model with a freshly
    generated ID is appended. A blank *label* falls back to the model name.
    """
    data = _load(username)
    tags = tags or []

    models = data["models"]
    clean_label = label.strip()
    clean_name = model_name.strip()
    display_label = clean_label or clean_name

    existing = None
    if model_id:
        existing = next((m for m in models if m["id"] == model_id), None)

    if existing is not None:
        existing["host_id"] = host_id
        existing["label"] = display_label
        existing["model_name"] = clean_name
        existing["context_k"] = context_k
        existing["tags"] = tags
        _save(username, data)
        return model_id

    new_id = secrets.token_hex(4)
    models.append({
        "id": new_id,
        "type": "local_openai",
        "label": display_label,
        "model_name": clean_name,
        "host_id": host_id,
        "context_k": context_k,
        "tags": tags,
    })
    _save(username, data)
    return new_id
def remove_model(username: str, model_id: str) -> bool:
    """Delete a model, blank out role slots that referenced it, and save.

    Returns True when at least one model entry was removed.
    """
    data = _load(username)
    original_count = len(data["models"])
    survivors = [m for m in data["models"] if m["id"] != model_id]
    data["models"] = survivors

    for slots in data.get("roles", {}).values():
        stale = [key for key in PRIORITY_KEYS if slots.get(key) == model_id]
        for key in stale:
            slots[key] = None

    _save(username, data)
    return len(survivors) < original_count
def set_role(username: str, role: str, priority: str, model_id: str | None) -> bool:
    """
    Put a model into one priority slot of a role and save the registry.

    priority must be one of: primary, backup_1, backup_2, backup_3, backup_4
    model_id None (or empty) clears the slot.
    Built-in IDs ("claude_cli" / "gemini_cli" / "gemini_api") are accepted
    without a registry entry; any other ID must exist in the user's models.

    Returns False, without saving, for an unknown priority or unknown model_id.
    """
    if priority not in PRIORITY_KEYS:
        return False

    data = _load(username)
    needs_lookup = bool(model_id) and model_id not in _builtins()
    if needs_lookup:
        known = any(m["id"] == model_id for m in data["models"])
        if not known:
            return False

    role_slots = data.setdefault("roles", {}).setdefault(role, {})
    role_slots[priority] = model_id or None
    _save(username, data)
    return True
def fetch_models_from_host(api_url: str, api_key: str,
                           host_type: str = "openwebui") -> list[str]:
    """Synchronously fetch the model list from an OpenAI-compatible host.

    Args:
        api_url: Base URL of the host; a trailing slash is ignored.
        api_key: Bearer token; no Authorization header is sent when empty.
        host_type: API path layout. "openai" requests ``{url}/models``;
            anything else (default "openwebui") requests ``{url}/api/models``,
            which is what this function always used before the parameter
            existed.

    Returns:
        Sorted model identifiers, taking each entry's ``id`` or, when that is
        missing or empty, its ``name``.

    Raises:
        httpx.HTTPStatusError: on a non-2xx response (via raise_for_status).
    """
    import httpx

    models_path = "/models" if host_type == "openai" else "/api/models"
    url = api_url.rstrip("/") + models_path
    headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
    resp = httpx.get(url, headers=headers, timeout=10)
    resp.raise_for_status()

    payload = resp.json()
    if isinstance(payload, dict):
        entries = payload.get("data") or []
    elif isinstance(payload, list):
        entries = payload
    else:
        entries = []

    # Use `or` (not a .get default) so an empty/None "id" falls through to
    # "name" instead of producing "" or None in the result.
    names = (m.get("id") or m.get("name") for m in entries if isinstance(m, dict))
    return sorted(name for name in names if name)