diff --git a/cortex/config.py b/cortex/config.py
index 4b591a3..2678117 100644
--- a/cortex/config.py
+++ b/cortex/config.py
@@ -3,7 +3,7 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
- anthropic_api_key: str | None = None # not used — claude CLI handles auth
+ anthropic_api_key: str | None = None # not used — configure via model registry
# Google OAuth — "Sign in with Google" for all users
# Create credentials at console.cloud.google.com → APIs & Services → Credentials
@@ -38,7 +38,6 @@ class Settings(BaseSettings):
default_model: str = "claude-sonnet-4-6"
default_tier: int = 2
max_history_messages: int = 40 # rolling window — 20 turns (user + assistant)
- primary_backend: str = "claude" # "claude" | "local" — gemini CLI removed June 2026
# Local model backend — OpenAI-compatible API (Open WebUI / Ollama)
# Set LOCAL_API_URL in .env to enable; leave blank to disable
@@ -46,9 +45,6 @@ class Settings(BaseSettings):
local_api_key: str = "" # sk-... from Open WebUI → Settings → Account → API Keys
local_model: str = "" # workspace or model name, e.g. test-agent-simple
- # Per-backend timeouts in seconds
- timeout_claude: int = 60
- timeout_gemini: int = 120 # frequently slow under load
timeout_local: int = 300 # local models may need to load first
# Auto-distillation schedule — override in .env
@@ -66,14 +62,13 @@ class Settings(BaseSettings):
distill_backend_long: str = ""
# Model registry: default backend type per role when user registry has no entry.
- # Values: "claude_cli" | "gemini_cli" | "gemini_api" (builtin IDs)
- # Override in .env: ROLE_CHAT=claude_cli ROLE_DISTILL=gemini_api etc.
- role_chat: str = "claude_cli"
- role_orchestrator: str = "gemini_api"
- role_distill: str = "claude_cli"
- role_janitor: str = "claude_cli" # assign a cheap/fast model: Haiku 4.5, local Gemma E4B
- role_coder: str = "claude_cli"
- role_research: str = "gemini_api"
+ # All roles must be configured via /settings/models — no built-in fallback.
+ role_chat: str = ""
+ role_orchestrator: str = ""
+ role_distill: str = ""
+ role_janitor: str = ""
+ role_coder: str = ""
+ role_research: str = ""
# Comma-separated list of standard roles shown in the model settings UI.
# Add custom roles here to extend the UI without code changes.
@@ -122,8 +117,8 @@ class Settings(BaseSettings):
return [r.strip() for r in self.defined_roles.split(",") if r.strip()]
def get_role_default(self, role: str) -> str:
- """Return the .env default backend type for a role (e.g. 'claude_cli')."""
- return getattr(self, f"role_{role.replace('-', '_')}", "claude_cli")
+ """Return the .env default backend type for a role, or '' if unconfigured."""
+ return getattr(self, f"role_{role.replace('-', '_')}", "")
def home_root(self) -> Path:
"""Resolve home_dir relative to this file's location if not absolute."""
diff --git a/cortex/llm_client.py b/cortex/llm_client.py
index 1a0ddbc..98b30b6 100644
--- a/cortex/llm_client.py
+++ b/cortex/llm_client.py
@@ -1,50 +1,18 @@
import asyncio
import logging
-import os
-import signal
-import subprocess
from config import settings
-import event_bus
logger = logging.getLogger(__name__)
-# Track active Gemini process group IDs so we can kill them on shutdown
-_active_pgroups: set[int] = set()
-
-
-def _register_pgroup(pid: int) -> None:
- _active_pgroups.add(pid)
-
-
-def _unregister_pgroup(pid: int) -> None:
- _active_pgroups.discard(pid)
-
-
-async def cleanup() -> None:
- """Kill any lingering Gemini process groups. Call from lifespan shutdown."""
- for pid in list(_active_pgroups):
- try:
- os.killpg(pid, signal.SIGKILL)
- logger.info("Shutdown: killed Gemini process group %d", pid)
- except ProcessLookupError:
- pass
- _active_pgroups.clear()
-
-
-# Map from registry model type → dispatch function key
_TYPE_TO_BACKEND = {
- "claude_cli": "claude",
- "gemini_cli": "gemini", # Gemini CLI is being replaced by Antigravity CLI (June 2026)
- "gemini_api": "gemini", # routes to CLI subprocess — no users configured; kept for compat
"local_openai": "local",
"anthropic_api": "anthropic_api",
}
-# Explicit UI toggle values (kept for backward compat)
-_EXPLICIT_BACKENDS = ("claude", "gemini", "local")
-# Gemini CLI removed from the claude fallback — it's shutting down June 18 2026.
-# claude failures now surface directly; gemini backend still falls back to claude.
-_FALLBACK: dict[str, str | None] = {"claude": None, "gemini": "claude", "local": "claude", "anthropic_api": "claude"}
+_FALLBACK: dict[str, str | None] = {
+ "local": None,
+ "anthropic_api": None,
+}
async def complete(
@@ -55,16 +23,15 @@ async def complete(
slot: str | None = None,
max_tokens: int = 2048,
attachment: dict | None = None,
- token_sink=None, # async (str) -> None; if set, stream tokens as they arrive
+ token_sink=None,
) -> tuple[str, str]:
"""
Returns (response_text, actual_backend_used).
- slot: Phase 3 — specific role slot ("primary" | "backup_1" | "backup_2").
- Resolves that exact slot, no fallback chain. Takes priority over model.
- model: legacy backend override ("claude" | "gemini" | "local") from old toggle.
- None = resolve via model registry for the given role.
- role: registry role used for slot/auto routing (default: "chat").
+ slot: explicit role slot ("primary" | "backup_1" | "backup_2").
+ Resolves that exact slot, no fallback chain. Takes priority over role.
+ role: registry role used for auto routing (default: "chat").
+ model: ignored — kept for API compatibility; routing is via slot/role only.
"""
import model_registry as _reg
from persona import _user
@@ -73,46 +40,33 @@ async def complete(
resolved_cfg: dict | None = None
if slot is not None:
- # Phase 3: explicit slot selection — no fallback within the role
resolved_cfg = _reg.get_model_for_slot(username, role, slot)
if resolved_cfg:
- primary = _TYPE_TO_BACKEND.get(resolved_cfg["type"], "claude")
+ primary = _TYPE_TO_BACKEND.get(resolved_cfg["type"], "local")
else:
- # Slot not configured — fall through to auto routing
slot = None
if slot is None:
- if model in _EXPLICIT_BACKENDS:
- # Legacy: explicit backend override from old UI toggle
- if model == "local":
- resolved_cfg = _reg.get_best_local_model(username, role)
- if not resolved_cfg:
- raise RuntimeError("No local model configured — add one at /settings/models")
- primary = model
+ resolved = _reg.get_model_for_role(username, role)
+ if resolved:
+ resolved_cfg = resolved
+ primary = _TYPE_TO_BACKEND.get(resolved["type"], "local")
else:
- # Auto: role-based routing via model registry
- resolved = _reg.get_model_for_role(username, role)
- if resolved:
- resolved_cfg = resolved
- primary = _TYPE_TO_BACKEND.get(resolved["type"], "claude")
- else:
- primary = settings.primary_backend
+ raise RuntimeError(
+ f"No model configured for role '{role}'. "
+ "Add one at /settings/models."
+ )
- fallback = _FALLBACK.get(primary, "claude")
+ fallback = _FALLBACK.get(primary)
try:
response = await _dispatch(primary, system_prompt, messages, resolved_cfg,
attachment=attachment, token_sink=token_sink)
return response, primary
except Exception as e:
- err_str = str(e)
- if primary == "claude" and any(k in err_str for k in ("401", "authenticate", "expired", "OAuth")):
- await event_bus.publish({"type": "claude_auth_expired"})
- # Surface errors when a model is explicitly configured or a specific slot was pinned.
if resolved_cfg is not None:
logger.error("%s failed (no fallback — model explicitly configured): %s", primary, e)
raise
- # No fallback defined for this backend — surface the error directly.
if not fallback:
logger.error("%s failed (no fallback configured): %s", primary, e)
raise
@@ -129,9 +83,7 @@ async def _dispatch(
attachment: dict | None = None,
token_sink=None,
) -> str:
- if backend == "gemini":
- text = await _gemini(system_prompt, messages)
- elif backend == "local":
+ if backend == "local":
if token_sink:
return await _local_streaming(token_sink, system_prompt, messages, model_cfg)
text = await _local(system_prompt, messages, model_cfg, attachment=attachment)
@@ -140,55 +92,12 @@ async def _dispatch(
return await _anthropic_api_streaming(token_sink, system_prompt, messages, model_cfg)
text = await _anthropic_api(system_prompt, messages, model_cfg)
else:
- text = await _claude(system_prompt, messages, model_cfg)
- # For non-streaming backends when token_sink is provided, emit the full text as one chunk.
+ raise RuntimeError(f"Unknown backend '{backend}' — check model type in registry")
if token_sink and text:
await token_sink(text)
return text
-def _fresh_claude_token() -> str | None:
- """Read the current OAuth access token from the Claude credentials file.
-
- The token in the systemd .env goes stale (it rotates on each login).
- Reading directly from ~/.claude/.credentials.json always gets the latest.
- """
- import json as _json
- creds_path = os.path.expanduser("~/.claude/.credentials.json")
- try:
- with open(creds_path) as f:
- data = _json.load(f)
- return data["claudeAiOauth"]["accessToken"]
- except Exception as e:
- logger.debug("Could not read Claude credentials file: %s", e)
- return None
-
-
-async def _claude(system_prompt: str, messages: list[dict], model_cfg: dict | None) -> str:
- model_name = (model_cfg or {}).get("model_name") if model_cfg else None
- cmd = [
- "claude", "--print",
- "--no-session-persistence",
- "--output-format", "text",
- ]
- # Only pass --model if it's a real model name (not a backend type string)
- if model_name and model_name not in ("claude", "gemini", "local", ""):
- cmd.extend(["--model", model_name])
- if system_prompt:
- cmd.extend(["--system-prompt", system_prompt])
- cmd.append(_build_conversation(messages))
-
- # Always use the freshest token from the credentials file so the systemd
- # service doesn't break when the env-var token rotates after a login.
- env = os.environ.copy()
- token = _fresh_claude_token()
- if token:
- env["CLAUDE_CODE_OAUTH_TOKEN"] = token
- env.pop("ANTHROPIC_API_KEY", None) # never let a stale API key override OAuth
-
- return await _run(cmd, timeout=settings.timeout_claude, env=env)
-
-
async def _local(
system_prompt: str,
messages: list[dict],
@@ -413,106 +322,3 @@ async def _local_streaming(
return full_text.strip()
-async def _gemini(system_prompt: str, messages: list[dict]) -> str:
- # Gemini CLI spawns MCP child processes that keep stdout pipes open after responding.
- # start_new_session=True puts the whole tree in its own process group so
- # os.killpg kills everything at once on timeout.
- cmd = [
- "gemini",
- "--output-format", "text",
- "--extensions", "", # disable all extensions — prevents MCP child processes
- "-p", _build_prompt(system_prompt, messages),
- ]
-
- try:
- proc = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- start_new_session=True,
- )
- except FileNotFoundError:
- raise RuntimeError("gemini not found in PATH")
-
- _register_pgroup(proc.pid)
- timeout = settings.timeout_gemini
- try:
- stdout_bytes, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout)
- raw = stdout_bytes.decode()
- except asyncio.TimeoutError:
- try:
- os.killpg(proc.pid, signal.SIGKILL)
- except ProcessLookupError:
- pass
- raise RuntimeError(f"Gemini timed out after {timeout}s")
- except asyncio.CancelledError:
- try:
- os.killpg(proc.pid, signal.SIGKILL)
- except ProcessLookupError:
- pass
- raise
- finally:
- _unregister_pgroup(proc.pid)
-
- clean = _clean_gemini_output(raw)
- if not clean:
- raise RuntimeError("Gemini returned an empty response")
- return clean
-
-
-# Lines Gemini CLI writes to stdout that are not part of the actual response
-_GEMINI_NOISE = (
- "Loaded cached credentials",
- "Loading extension:",
- "Server '",
- "Listening for",
- "Model is overloaded",
- "High demand",
- "Retrying",
- "retrying",
- "429",
- "quota",
-)
-
-
-def _clean_gemini_output(text: str) -> str:
- lines = [
- line for line in text.splitlines()
- if not any(line.strip().startswith(p) for p in _GEMINI_NOISE)
- ]
- return "\n".join(lines).strip()
-
-
-async def _run(cmd: list[str], timeout: int = 60, env: dict | None = None) -> str:
- loop = asyncio.get_running_loop()
- result = await loop.run_in_executor(
- None,
- lambda: subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, env=env),
- )
- if result.returncode != 0:
- detail = result.stderr.strip() or result.stdout.strip() or f"exit code {result.returncode}"
- raise RuntimeError(f"{cmd[0]} failed: {detail}")
- return result.stdout.strip()
-
-
-def _build_conversation(messages: list[dict]) -> str:
- """Conversation only — used for Claude (system prompt passed separately)."""
- parts = []
- prior = messages[:-1]
- if prior:
- history_lines = []
- for msg in prior:
- label = settings.user_name if msg["role"] == "user" else settings.agent_name
- history_lines.append(f"{label}: {msg['content']}")
- parts.append("