Files
Cortex-Inara/cortex/memory_distiller.py
Scott Idem 508fb638ad feat: distill safeguards — rolling backups + sanity checks
Before any memory file is overwritten, _rotate_backup() keeps 2 rolling
backups: MEMORY_*.bak1.md (most recent) and MEMORY_*.bak2.md (older).

_sanity_check() now also guards against size anomalies: the new content
must be between 40% and 250% of the old file size — anything outside that
range looks like truncation or runaway output and aborts the write.
Existing checks (min length, refusal phrases) still apply.

Backup files exposed in the Files panel (ALLOWED set) so they can be
reviewed and manually restored if needed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-05 18:54:27 -04:00

285 lines
10 KiB
Python

"""
Tiered memory distillation.
distill_short() — roll recent session logs → MEMORY_SHORT.md (no LLM)
distill_mid() — summarize MEMORY_SHORT → MEMORY_MID.md (LLM)
distill_long() — integrate MEMORY_MID → MEMORY_LONG.md (LLM)
Before any file is overwritten, two rolling backups are kept:
MEMORY_*.bak1.md — most recent backup (created just before last write)
MEMORY_*.bak2.md — backup before that
LLM responses are sanity-checked before writing. If the response looks like
a refusal, is too short, or is obviously not memory content, the distill is
aborted and the original file is left untouched.
"""
import logging
from datetime import datetime
from pathlib import Path
from config import settings
from persona import persona_path as _persona_path
logger = logging.getLogger(__name__)
# Rough chars-per-token estimate for budget enforcement
_CHARS_PER_TOKEN = 4
# Phrases that indicate the LLM refused or misunderstood the task
_REFUSAL_PREFIXES = (
"i'm sorry",
"i am sorry",
"i can't",
"i cannot",
"i'm unable",
"i am unable",
"as an ai",
"as a language model",
"i don't have access",
"i do not have access",
"i'm not able",
"i am not able",
)
# Minimum characters for a valid mid/long distill response
_MIN_RESPONSE_CHARS = 80
def _budget_chars(tokens: int) -> int:
return tokens * _CHARS_PER_TOKEN
def _read(path: Path) -> str:
return path.read_text() if path.exists() else ""
def _rotate_backup(path: Path, n: int = 2) -> None:
"""Rotate up to n rolling backups of path before a write.
MEMORY_LONG.md → MEMORY_LONG.bak1.md (most recent), MEMORY_LONG.bak2.md (older)
"""
if not path.exists():
return
# Shift older backups down: bak(n-1) → bak(n), …, bak1 stays as bak1 source
for i in range(n, 1, -1):
older = path.parent / f"{path.stem}.bak{i}.md"
newer = path.parent / f"{path.stem}.bak{i - 1}.md"
if newer.exists():
older.write_text(newer.read_text())
# Current file → bak1
bak1 = path.parent / f"{path.stem}.bak1.md"
bak1.write_text(path.read_text())
def _sanity_check(response_text: str, context: str, existing_content: str = "") -> str | None:
"""Return an error string if the LLM response looks invalid, else None.
Checks:
- Minimum absolute length
- Refusal / AI preamble phrases
- Size shrinkage: new content must be at least 40% of the old (catches truncation)
- Size explosion: new content must not exceed 250% of the old (catches runaway output)
(Both bounds only apply when an existing file is present and reasonably sized.)
"""
stripped = response_text.strip()
if len(stripped) < _MIN_RESPONSE_CHARS:
return f"{context}: response too short ({len(stripped)} chars) — not writing"
first_line = stripped.lower().splitlines()[0]
if any(first_line.startswith(p) for p in _REFUSAL_PREFIXES):
return f"{context}: response looks like a refusal — not writing"
if existing_content:
old_len = len(existing_content.strip())
new_len = len(stripped)
if old_len >= _MIN_RESPONSE_CHARS * 4: # only compare when old file has real content
ratio = new_len / old_len
if ratio < 0.40:
return (
f"{context}: new content is only {ratio:.0%} of the old "
f"({new_len} vs {old_len} chars) — looks truncated, not writing"
)
if ratio > 2.50:
return (
f"{context}: new content is {ratio:.0%} of the old "
f"({new_len} vs {old_len} chars) — looks like runaway output, not writing"
)
return None
def distill_short(username: str, persona: str) -> dict:
"""
Roll the most recent session log files into MEMORY_SHORT.md.
No LLM involved — pure aggregation with budget truncation.
Files are included newest-first until the budget is reached,
then written in chronological order (oldest first).
"""
inara_dir = _persona_path(username, persona)
sessions_dir = inara_dir / "sessions"
budget = _budget_chars(settings.memory_budget_short)
session_files = (
sorted(sessions_dir.glob("*.md"), reverse=True)
if sessions_dir.exists()
else []
)
parts = []
total_chars = 0
for sf in session_files:
content = sf.read_text()
if total_chars + len(content) > budget and parts:
break # always include at least one file
parts.append((sf.name, content))
total_chars += len(content)
if total_chars >= budget:
break
now = datetime.now().strftime("%Y-%m-%d %H:%M")
header = (
f"# MEMORY_SHORT.md — Recent Session Digest\n\n"
f"*Auto-generated: {now}. {len(parts)} session file(s).*\n\n---\n\n"
)
# Write in chronological order (oldest first)
body = "\n\n".join(
f"--- {name} ---\n{content}" for name, content in reversed(parts)
)
out_path = inara_dir / "MEMORY_SHORT.md"
_rotate_backup(out_path)
out_path.write_text(header + body)
logger.info("distill_short [%s/%s]: wrote %d chars from %d files", username, persona, len(header) + len(body), len(parts))
return {
"files_included": len(parts),
"chars_written": len(header) + len(body),
"budget_chars": budget,
}
async def distill_mid(username: str, persona: str) -> dict:
"""
Ask the LLM to summarize MEMORY_SHORT.md → MEMORY_MID.md.
Backs up the current MEMORY_MID.md before overwriting.
"""
from llm_client import complete
from persona import set_context
u, p = username, persona
set_context(u, p)
inara_dir = _persona_path(u, p)
short_content = _read(inara_dir / "MEMORY_SHORT.md")
existing_mid = _read(inara_dir / "MEMORY_MID.md")
if not short_content.strip() or "Not yet populated" in short_content:
return {"error": "MEMORY_SHORT.md is empty — run distill/short first"}
budget_tokens = settings.memory_budget_mid
persona_name = p.title()
user_name = u.title()
system_prompt = (
f"You are {persona_name}'s memory distillation system. "
"Summarize the following recent session logs into a concise mid-term memory digest. "
f"Target length: under {budget_tokens} tokens. "
"Focus on: recurring themes, important decisions made, ongoing projects, "
f"{user_name}'s current state and priorities, and anything that should persist into future sessions. "
f"Write in first person as {persona_name} (e.g. '{user_name} and I worked on...'). "
"Use markdown headings. Be specific and concrete — no filler."
)
response_text, backend = await complete(
system_prompt=system_prompt,
messages=[{"role": "user", "content": short_content}],
role="distill",
)
err = _sanity_check(response_text, "distill_mid", existing_mid)
if err:
logger.warning(err)
return {"error": err}
now = datetime.now().strftime("%Y-%m-%d %H:%M")
header = (
f"# MEMORY_MID.md — Mid-Term Memory Digest\n\n"
f"*Auto-distilled: {now} via {backend}.*\n\n---\n\n"
)
out_path = inara_dir / "MEMORY_MID.md"
_rotate_backup(out_path)
out_path.write_text(header + response_text)
logger.info("distill_mid [%s/%s]: wrote %d chars via %s", u, p, len(header) + len(response_text), backend)
return {
"username": u,
"backend": backend,
"chars_written": len(header) + len(response_text),
"budget_tokens": budget_tokens,
}
async def distill_long(username: str, persona: str) -> dict:
"""
Ask the LLM to integrate MEMORY_MID.md into MEMORY_LONG.md.
Backs up the current MEMORY_LONG.md before overwriting.
"""
from llm_client import complete
from persona import set_context
u, p = username, persona
set_context(u, p)
inara_dir = _persona_path(u, p)
long_content = _read(inara_dir / "MEMORY_LONG.md")
mid_content = _read(inara_dir / "MEMORY_MID.md")
if not mid_content.strip() or "Not yet populated" in mid_content:
return {"error": "MEMORY_MID.md is empty — run distill/mid first"}
budget_tokens = settings.memory_budget_long
persona_name = p.title()
system_prompt = (
f"You are {persona_name}'s long-term memory curator. "
"You will receive the current long-term memory and a recent mid-term digest. "
f"Integrate the new information into the long-term memory. Target: under {budget_tokens} tokens. "
"Rules: preserve important historical facts; update or replace stale information; "
"absorb recurring themes from the mid-term digest; remove things no longer relevant. "
"Return ONLY the updated MEMORY_LONG.md content in markdown. No preamble or commentary."
)
user_content = (
f"## Current MEMORY_LONG.md\n\n{long_content}\n\n"
f"## Recent MEMORY_MID.md to integrate\n\n{mid_content}"
)
response_text, backend = await complete(
system_prompt=system_prompt,
messages=[{"role": "user", "content": user_content}],
role="distill",
)
err = _sanity_check(response_text, "distill_long", long_content)
if err:
logger.warning(err)
return {"error": err}
# Ensure the file has the right header if the LLM dropped it
now = datetime.now().strftime("%Y-%m-%d %H:%M")
if not response_text.lstrip().startswith("# MEMORY_LONG"):
response_text = (
f"# MEMORY_LONG.md — {persona_name} Long-Term Memory\n\n"
f"*Last distilled: {now} via {backend}.*\n\n---\n\n"
+ response_text
)
out_path = inara_dir / "MEMORY_LONG.md"
_rotate_backup(out_path)
out_path.write_text(response_text)
logger.info("distill_long [%s/%s]: wrote %d chars via %s", u, p, len(response_text), backend)
return {
"username": u,
"backend": backend,
"chars_written": len(response_text),
"budget_tokens": budget_tokens,
}