Add auto memory distillation scheduler (APScheduler)

- scheduler.py: AsyncIOScheduler with three cron jobs
    short  daily     03:00 (no LLM, always fast)
    mid    weekly    Sun 03:30 (LLM)
    long   monthly   1st 04:00 (LLM — off by default)
- config.py: AUTO_DISTILL, AUTO_DISTILL_SHORT/MID/LONG .env flags
- main.py: start/stop scheduler in FastAPI lifespan
- routers/distill.py: GET /distill/status — next run times + config
- requirements.txt: apscheduler>=3.10
- HELP.md: updated planned items, added /distill/status to API table

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Scott Idem
2026-03-17 22:31:38 -04:00
parent 9fb5b8ae63
commit 4253e69c0b
6 changed files with 128 additions and 1 deletions

99
cortex/scheduler.py Normal file
View File

@@ -0,0 +1,99 @@
"""
Auto memory distillation scheduler.
Default schedule (all overridable via .env flags):
short — daily at 03:00 (no LLM — fast)
mid — weekly Sun at 03:30 (LLM call)
long — monthly 1st at 04:00 (LLM call — off by default)
Set AUTO_DISTILL=false to disable entirely.
Set AUTO_DISTILL_LONG=true to enable monthly long-term integration.
"""
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from config import settings
logger = logging.getLogger(__name__)
_scheduler: AsyncIOScheduler | None = None
async def _run_short() -> None:
from memory_distiller import distill_short
try:
result = distill_short()
logger.info("auto distill short: %d files, %d chars", result["files_included"], result["chars_written"])
except Exception as e:
logger.error("auto distill short failed: %s", e)
async def _run_mid() -> None:
from memory_distiller import distill_mid
try:
result = await distill_mid()
if "error" in result:
logger.warning("auto distill mid skipped: %s", result["error"])
else:
logger.info("auto distill mid: %d chars via %s", result["chars_written"], result["backend"])
except Exception as e:
logger.error("auto distill mid failed: %s", e)
async def _run_long() -> None:
from memory_distiller import distill_long
try:
result = await distill_long()
if "error" in result:
logger.warning("auto distill long skipped: %s", result["error"])
else:
logger.info("auto distill long: %d chars via %s", result["chars_written"], result["backend"])
except Exception as e:
logger.error("auto distill long failed: %s", e)
def start() -> None:
global _scheduler
if not settings.auto_distill:
logger.info("auto distillation disabled (AUTO_DISTILL=false)")
return
_scheduler = AsyncIOScheduler(timezone="local")
if settings.auto_distill_short:
_scheduler.add_job(_run_short, "cron", hour=3, minute=0, id="distill_short")
logger.info("scheduled: distill_short daily at 03:00")
if settings.auto_distill_mid:
_scheduler.add_job(_run_mid, "cron", day_of_week="sun", hour=3, minute=30, id="distill_mid")
logger.info("scheduled: distill_mid weekly Sun at 03:30")
if settings.auto_distill_long:
_scheduler.add_job(_run_long, "cron", day=1, hour=4, minute=0, id="distill_long")
logger.info("scheduled: distill_long monthly on 1st at 04:00")
if _scheduler.get_jobs():
_scheduler.start()
logger.info("auto distillation scheduler started (%d jobs)", len(_scheduler.get_jobs()))
else:
logger.info("auto distillation: no jobs enabled")
def stop() -> None:
global _scheduler
if _scheduler and _scheduler.running:
_scheduler.shutdown(wait=False)
logger.info("auto distillation scheduler stopped")
def status() -> list[dict]:
"""Return next-run info for all scheduled jobs."""
if not _scheduler or not _scheduler.running:
return []
jobs = []
for job in _scheduler.get_jobs():
next_run = job.next_run_time
jobs.append({
"id": job.id,
"next_run": next_run.isoformat() if next_run else None,
})
return jobs