Files
Cortex-Inara/cortex/janitor.py
Scott Idem 67f5db70a3 feat: janitor role — session checkpoint compaction
New cortex/janitor.py runs before each orchestrator dispatch. When a session
exceeds 20 user turns or ~12K estimated tokens, the oldest half is summarized
by the janitor role model and replaced with a compact checkpoint message.
Fail-safe: always returns original history if the model call fails.

Config: JANITOR_TURN_THRESHOLD, JANITOR_TOKEN_THRESHOLD in .env.
Assign Gemma E4B or Haiku 4.5 to the janitor role for effectively-free compaction.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-17 21:32:54 -04:00

118 lines
4.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Session checkpoint compaction ("janitor").
Called before each orchestrator run. When a session exceeds the configured turn
or token threshold, the oldest half of the history is summarized by the janitor
role model and replaced with a compact checkpoint message. This keeps the token
count passed to the orchestrator lean while preserving a faithful record of what
happened earlier in the session.
The janitor role should be assigned a cheap, fast model — a small local model
(Gemma E4B) or a lightweight cloud model (Haiku 4.5). It has no tools and the
task is simple enough that quality matters less than speed and cost.
Thresholds (configurable in .env):
JANITOR_TURN_THRESHOLD — compact after N user turns (default: 20)
JANITOR_TOKEN_THRESHOLD — compact after ~N estimated tokens (default: 12000)
"""
import logging
from config import settings
logger = logging.getLogger(__name__)
_SYSTEM = "You are a concise summarizer. Write only the summary — no preamble, no labels."
_PROMPT_TMPL = """\
Summarize the conversation below in 38 sentences. Capture what was discussed, \
any decisions or conclusions reached, and key specifics (names, values, file paths, etc.). \
Write only the summary paragraph.
CONVERSATION:
{conversation}"""
def _format_messages(messages: list[dict]) -> str:
lines = []
for m in messages:
role = m.get("role", "unknown").upper()
content = (m.get("content") or "").strip()
if not content:
continue
# Cap individual messages so the prompt stays manageable for small models
if len(content) > 600:
content = content[:600] + ""
lines.append(f"[{role}]: {content}")
return "\n".join(lines)
async def maybe_checkpoint(session_id: str) -> list[dict]:
"""
Load the session, compact if thresholds are exceeded, and return the
message list to use for the upcoming orchestrator run.
Always returns a list — returns the original (unchanged) list if:
- the session does not exist yet
- thresholds are not met
- the janitor model call fails (fail-safe: never discard history)
"""
from session_store import load, save
messages = load(session_id)
if not messages:
return []
turn_count = sum(1 for m in messages if m["role"] == "user")
estimated_tokens = sum(len(m.get("content") or "") for m in messages) // 4
if (turn_count < settings.janitor_turn_threshold
and estimated_tokens < settings.janitor_token_threshold):
return messages
# Walk back to a clean turn boundary so we never split mid-exchange.
# midpoint lands on an "assistant" message boundary.
midpoint = len(messages) // 2
while midpoint > 0 and messages[midpoint - 1].get("role") != "assistant":
midpoint -= 1
if midpoint < 4:
# Too short to compact meaningfully — threshold likely set very low
return messages
old_messages = messages[:midpoint]
recent_messages = messages[midpoint:]
conversation_text = _format_messages(old_messages)
summary_prompt = _PROMPT_TMPL.format(conversation=conversation_text)
try:
from llm_client import complete as llm_complete
summary, backend = await llm_complete(
system_prompt=_SYSTEM,
messages=[{"role": "user", "content": summary_prompt}],
role="janitor",
)
checkpoint_msg = {
"role": "assistant",
"content": (
f"[Session checkpoint — {len(old_messages)} messages summarized "
f"via {backend}]\n\n{summary.strip()}"
),
}
compacted = [checkpoint_msg] + recent_messages
save(session_id, compacted)
logger.info(
"Janitor: session=%s compacted %d%d messages (turns=%d ~%d tokens) via %s",
session_id, len(messages), len(compacted), turn_count, estimated_tokens, backend,
)
return compacted
except Exception as exc:
# Fail-safe: never lose history because the janitor model is unavailable
logger.warning("Janitor skipped for session %s: %s", session_id, exc)
return messages