Files
Cortex-Inara/cortex/llm_client.py
Scott Idem 97438f1a0f feat: multi-instance support — agent_name and user_name configurable
All hardcoded "Inara"/"Scott" strings replaced with settings.agent_name
and settings.user_name, read from .env at startup:

- config.py: AGENT_NAME and USER_NAME settings (defaults: Inara / Scott)
- llm_client.py: conversation labels in prompt builder
- session_logger.py: **Name:** labels in session log markdown
- memory_distiller.py: distillation system prompts (mid + long)
- routers/nextcloud_talk.py: @mention prefix strip
- routers/google_chat.py: greeting message

Second instance scaffolding:
- holly/: identity directory with placeholder files (USER_NAME=Holly,
  AGENT_NAME to be chosen by Holly)
- cortex/.env.holly: config for Holly's instance on port 8001
- cortex-holly.service: systemd unit for the second instance

No behavioural change to the Inara/Scott instance — defaults unchanged.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-18 20:13:11 -04:00

210 lines
6.5 KiB
Python

import asyncio
import logging
import os
import signal
import subprocess
from config import settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Track active Gemini process group IDs so we can kill them on shutdown.
# Entries are added by _register_pgroup(), removed by _unregister_pgroup(),
# and swept (killed, then cleared) by cleanup().
_active_pgroups: set[int] = set()
def _register_pgroup(pid: int) -> None:
    """Add *pid* to the set of process groups tracked for shutdown cleanup."""
    _active_pgroups.add(pid)
def _unregister_pgroup(pid: int) -> None:
    """Stop tracking *pid*; a pid that was never registered is ignored."""
    _active_pgroups.discard(pid)
async def cleanup() -> None:
    """Kill any lingering Gemini process groups. Call from lifespan shutdown.

    Every tracked group is sent SIGKILL. A group that has already exited is
    skipped silently; any other OS-level failure (for example a permission
    error) is logged and the sweep continues, so one bad entry can neither
    leave the remaining groups running nor prevent the registry from being
    cleared.
    """
    # Iterate over a snapshot: the set may be mutated by in-flight requests.
    for pgid in list(_active_pgroups):
        try:
            os.killpg(pgid, signal.SIGKILL)
        except ProcessLookupError:
            # Group already gone — nothing to do.
            continue
        except OSError as err:
            logger.warning(
                "Shutdown: could not kill Gemini process group %d: %s", pgid, err
            )
        else:
            logger.info("Shutdown: killed Gemini process group %d", pgid)
    _active_pgroups.clear()
async def complete(
    system_prompt: str,
    messages: list[dict],
    model: str | None = None,
    max_tokens: int = 2048,
) -> tuple[str, str]:
    """Get a completion from the preferred backend, retrying once on the other.

    ``model`` may be the literal ``"claude"`` or ``"gemini"`` to force that
    backend as the first choice; for any other value the first choice is
    ``settings.primary_backend`` and ``model`` is forwarded to the first
    attempt unchanged. The fallback attempt never receives a model.
    ``max_tokens`` is accepted but not consulted by this function.

    Returns (response_text, actual_backend_used).
    """
    first_choice = model if model in ("claude", "gemini") else settings.primary_backend
    second_choice = "claude" if first_choice != "claude" else "gemini"

    try:
        text = await _dispatch(first_choice, system_prompt, messages, model)
    except Exception as err:
        logger.warning("%s failed (%s) — falling back to %s", first_choice, err, second_choice)
        # Retried inside the handler so a fallback failure keeps the primary
        # failure as its exception context.
        text = await _dispatch(second_choice, system_prompt, messages, None)
        return text, second_choice
    return text, first_choice
async def _dispatch(
    backend: str,
    system_prompt: str,
    messages: list[dict],
    model: str | None,
) -> str:
    """Send the request to the CLI wrapper for *backend* and return its text.

    Only the exact name ``"gemini"`` selects Gemini; every other value is
    routed to Claude. ``model`` is passed on to Claude only.
    """
    if backend != "gemini":
        return await _claude(system_prompt, messages, model)
    return await _gemini(system_prompt, messages)
def _fresh_claude_token() -> str | None:
"""Read the current OAuth access token from the Claude credentials file.
The token in the systemd .env goes stale (it rotates on each login).
Reading directly from ~/.claude/.credentials.json always gets the latest.
"""
import json as _json
creds_path = os.path.expanduser("~/.claude/.credentials.json")
try:
with open(creds_path) as f:
data = _json.load(f)
return data["claudeAiOauth"]["accessToken"]
except Exception as e:
logger.debug("Could not read Claude credentials file: %s", e)
return None
async def _claude(system_prompt: str, messages: list[dict], model: str | None) -> str:
    """Run the Claude CLI once in print mode and return its text output.

    The system prompt (when non-empty) is passed with ``--system-prompt`` and
    the rendered conversation is the final positional argument. ``model`` is
    forwarded as ``--model`` unless it is empty or one of the backend
    selector names ``"claude"`` / ``"gemini"``.
    """
    argv = ["claude", "--print", "--no-session-persistence", "--output-format", "text"]
    forward_model = bool(model) and model not in ("claude", "gemini")
    if forward_model:
        argv += ["--model", model]
    if system_prompt:
        argv += ["--system-prompt", system_prompt]
    argv.append(_build_conversation(messages))

    # Always use the freshest token from the credentials file so the systemd
    # service doesn't break when the env-var token rotates after a login.
    child_env = dict(os.environ)
    fresh_token = _fresh_claude_token()
    if fresh_token:
        child_env["CLAUDE_CODE_OAUTH_TOKEN"] = fresh_token
    # never let a stale API key override OAuth
    child_env.pop("ANTHROPIC_API_KEY", None)

    return await _run(argv, timeout=settings.timeout_claude, env=child_env)
async def _gemini(system_prompt: str, messages: list[dict]) -> str:
    """Run the Gemini CLI on the combined prompt and return its cleaned stdout.

    Raises:
        RuntimeError: if ``gemini`` is not on PATH, the call exceeds
            ``settings.timeout_gemini`` seconds, or nothing but noise lines
            came back. In the last case the CLI's stderr and exit code are
            appended to the message when stderr is non-empty.
        asyncio.CancelledError: re-raised after the process group is killed.
    """
    # Gemini CLI spawns MCP child processes that keep stdout pipes open after responding.
    # start_new_session=True puts the whole tree in its own process group so
    # os.killpg kills everything at once on timeout.
    cmd = [
        "gemini",
        "--output-format", "text",
        "--extensions", "",  # disable all extensions — prevents MCP child processes
        "-p", _build_prompt(system_prompt, messages),
    ]
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            start_new_session=True,
        )
    except FileNotFoundError as err:
        raise RuntimeError("gemini not found in PATH") from err

    # Tracked so cleanup() can kill the group if shutdown happens mid-request.
    _register_pgroup(proc.pid)
    timeout = settings.timeout_gemini
    try:
        stdout_bytes, stderr_bytes = await asyncio.wait_for(
            proc.communicate(), timeout=timeout
        )
    except asyncio.TimeoutError as err:
        _kill_gemini_group(proc.pid)
        raise RuntimeError(f"Gemini timed out after {timeout}s") from err
    except asyncio.CancelledError:
        _kill_gemini_group(proc.pid)
        raise
    finally:
        _unregister_pgroup(proc.pid)

    # errors="replace": a stray invalid byte must not turn a usable answer
    # into a UnicodeDecodeError.
    clean = _clean_gemini_output(stdout_bytes.decode(errors="replace"))
    if not clean:
        # Surface whatever the CLI said on stderr so the failure is diagnosable.
        detail = stderr_bytes.decode(errors="replace").strip()
        if detail:
            raise RuntimeError(
                f"Gemini returned an empty response (exit code {proc.returncode}): {detail}"
            )
        raise RuntimeError("Gemini returned an empty response")
    return clean


def _kill_gemini_group(pgid: int) -> None:
    """Send SIGKILL to process group *pgid*; a group that is already gone is ignored."""
    try:
        os.killpg(pgid, signal.SIGKILL)
    except ProcessLookupError:
        pass
# Lines Gemini CLI writes to stdout that are not part of the actual response.
# Matched as prefixes of the whitespace-stripped line (case-sensitive).
_GEMINI_NOISE = (
    "Loaded cached credentials",
    "Loading extension:",
    "Server '",
    "Listening for",
    "Model is overloaded",
    "High demand",
    "Retrying",
    "retrying",
    "429",
    "quota",
)


def _clean_gemini_output(text: str) -> str:
    """Remove Gemini CLI status lines from *text* and return the remainder.

    A line is dropped when, after stripping surrounding whitespace, it starts
    with any prefix in ``_GEMINI_NOISE``. Surviving lines keep their original
    content, are re-joined with ``"\\n"``, and the result is stripped.
    """
    kept = [
        line
        for line in text.splitlines()
        # str.startswith takes a tuple, testing every prefix in one call.
        if not line.strip().startswith(_GEMINI_NOISE)
    ]
    return "\n".join(kept).strip()
async def _run(cmd: list[str], timeout: int = 60, env: dict | None = None) -> str:
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None,
lambda: subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, env=env),
)
if result.returncode != 0:
detail = result.stderr.strip() or result.stdout.strip() or f"exit code {result.returncode}"
raise RuntimeError(f"{cmd[0]} failed: {detail}")
return result.stdout.strip()
def _build_conversation(messages: list[dict]) -> str:
    """Conversation only — used for Claude (system prompt passed separately).

    Every message except the last is rendered as ``"<name>: <content>"``
    (``settings.user_name`` for role ``"user"``, ``settings.agent_name`` for
    anything else) inside a ``<conversation>`` element. The last message's
    content follows unlabelled. An empty list yields an empty string.
    """
    if not messages:
        return ""

    earlier, latest = messages[:-1], messages[-1]
    if not earlier:
        return latest["content"]

    rendered = "\n\n".join(
        f"{settings.user_name if turn['role'] == 'user' else settings.agent_name}: {turn['content']}"
        for turn in earlier
    )
    history_block = "<conversation>\n" + rendered + "\n</conversation>"
    return "\n\n".join([history_block, latest["content"]])
def _build_prompt(system_prompt: str, messages: list[dict]) -> str:
    """Full prompt with system context embedded — used for Gemini.

    A non-empty system prompt is wrapped in a ``<system>`` element and placed
    before the rendered conversation, separated by a blank line. With no
    system prompt the rendered conversation is returned as-is.
    """
    conversation = _build_conversation(messages)
    if not system_prompt:
        return conversation
    system_block = f"<system>\n{system_prompt}\n</system>"
    return system_block + "\n\n" + conversation