""" Session checkpoint compaction ("janitor"). Called before each orchestrator run. When a session exceeds the configured turn or token threshold, the oldest half of the history is summarized by the janitor role model and replaced with a compact checkpoint message. This keeps the token count passed to the orchestrator lean while preserving a faithful record of what happened earlier in the session. The janitor role should be assigned a cheap, fast model — a small local model (Gemma E4B) or a lightweight cloud model (Haiku 4.5). It has no tools and the task is simple enough that quality matters less than speed and cost. Thresholds (configurable in .env): JANITOR_TURN_THRESHOLD — compact after N user turns (default: 20) JANITOR_TOKEN_THRESHOLD — compact after ~N estimated tokens (default: 12000) """ import logging from config import settings logger = logging.getLogger(__name__) _SYSTEM = "You are a concise summarizer. Write only the summary — no preamble, no labels." _PROMPT_TMPL = """\ Summarize the conversation below in 3–8 sentences. Capture what was discussed, \ any decisions or conclusions reached, and key specifics (names, values, file paths, etc.). \ Write only the summary paragraph. CONVERSATION: {conversation}""" def _format_messages(messages: list[dict]) -> str: lines = [] for m in messages: role = m.get("role", "unknown").upper() content = (m.get("content") or "").strip() if not content: continue # Cap individual messages so the prompt stays manageable for small models if len(content) > 600: content = content[:600] + "…" lines.append(f"[{role}]: {content}") return "\n".join(lines) async def maybe_checkpoint(session_id: str) -> list[dict]: """ Load the session, compact if thresholds are exceeded, and return the message list to use for the upcoming orchestrator run. Always returns a list — returns the original (unchanged) list if: - the session does not exist yet - thresholds are not met - the janitor model call fails (fail-safe: never discard history) """ from session_store import load, save messages = load(session_id) if not messages: return [] turn_count = sum(1 for m in messages if m["role"] == "user") estimated_tokens = sum(len(m.get("content") or "") for m in messages) // 4 if (turn_count < settings.janitor_turn_threshold and estimated_tokens < settings.janitor_token_threshold): return messages # Walk back to a clean turn boundary so we never split mid-exchange. # midpoint lands on an "assistant" message boundary. midpoint = len(messages) // 2 while midpoint > 0 and messages[midpoint - 1].get("role") != "assistant": midpoint -= 1 if midpoint < 4: # Too short to compact meaningfully — threshold likely set very low return messages old_messages = messages[:midpoint] recent_messages = messages[midpoint:] conversation_text = _format_messages(old_messages) summary_prompt = _PROMPT_TMPL.format(conversation=conversation_text) try: from llm_client import complete as llm_complete summary, backend = await llm_complete( system_prompt=_SYSTEM, messages=[{"role": "user", "content": summary_prompt}], role="janitor", ) checkpoint_msg = { "role": "assistant", "content": ( f"[Session checkpoint — {len(old_messages)} messages summarized " f"via {backend}]\n\n{summary.strip()}" ), } compacted = [checkpoint_msg] + recent_messages save(session_id, compacted) logger.info( "Janitor: session=%s compacted %d→%d messages (turns=%d ~%d tokens) via %s", session_id, len(messages), len(compacted), turn_count, estimated_tokens, backend, ) return compacted except Exception as exc: # Fail-safe: never lose history because the janitor model is unavailable logger.warning("Janitor skipped for session %s: %s", session_id, exc) return messages