"""Auto memory distillation scheduler.

Default schedule (all overridable via .env flags):

    short — daily at 03:00        (no LLM — fast)
    mid   — weekly Sun at 03:30   (LLM call)
    long  — monthly 1st at 04:00  (LLM call — off by default)

Set AUTO_DISTILL=false to disable entirely.
Set AUTO_DISTILL_LONG=true to enable monthly long-term integration.
"""

import logging
from zoneinfo import ZoneInfo

from apscheduler.schedulers.asyncio import AsyncIOScheduler

from config import settings

logger = logging.getLogger(__name__)

# Module-wide scheduler instance; None until start() has created one.
_scheduler: AsyncIOScheduler | None = None


async def _run_short() -> None:
    """Run the short-term distillation job and log its outcome.

    Any exception is logged (with traceback) and swallowed so that a failing
    run never propagates into the scheduler.
    """
    # Imported at call time rather than module load
    # (presumably to avoid an import cycle or startup cost — confirm).
    from memory_distiller import distill_short

    try:
        # NOTE(review): distill_short() is called synchronously (no await), so
        # it runs on the event-loop thread; the module docstring describes
        # this tier as fast / LLM-free.
        result = distill_short()
        logger.info(
            "auto distill short: %d files, %d chars",
            result["files_included"],
            result["chars_written"],
        )
    except Exception as e:
        # logger.exception keeps the original message and adds the traceback.
        logger.exception("auto distill short failed: %s", e)


async def _run_mid() -> None:
    """Run the mid-term distillation job and log its outcome.

    A result dict containing an ``"error"`` key is reported as a skipped run
    (warning); otherwise the written size and backend are logged. Exceptions
    are logged with traceback and swallowed.
    """
    from memory_distiller import distill_mid

    try:
        result = await distill_mid()
        if "error" in result:
            logger.warning("auto distill mid skipped: %s", result["error"])
        else:
            logger.info(
                "auto distill mid: %d chars via %s",
                result["chars_written"],
                result["backend"],
            )
    except Exception as e:
        logger.exception("auto distill mid failed: %s", e)


async def _run_long() -> None:
    """Run the long-term distillation job and log its outcome.

    A result dict containing an ``"error"`` key is reported as a skipped run
    (warning); otherwise the written size and backend are logged. Exceptions
    are logged with traceback and swallowed.
    """
    from memory_distiller import distill_long

    try:
        result = await distill_long()
        if "error" in result:
            logger.warning("auto distill long skipped: %s", result["error"])
        else:
            logger.info(
                "auto distill long: %d chars via %s",
                result["chars_written"],
                result["backend"],
            )
    except Exception as e:
        logger.exception("auto distill long failed: %s", e)


def start() -> None:
    """Create the scheduler, register the enabled jobs and start it.

    Does nothing (beyond logging) when ``settings.auto_distill`` is false or
    when a scheduler created by an earlier call is still running. The
    scheduler is only started if at least one job was registered.
    """
    global _scheduler

    if not settings.auto_distill:
        logger.info("auto distillation disabled (AUTO_DISTILL=false)")
        return

    # Guard against a second start(): replacing a running scheduler would
    # orphan it (stop() could no longer reach it) and every job would fire
    # twice.
    if _scheduler is not None and _scheduler.running:
        logger.warning("auto distillation scheduler already running; start() ignored")
        return

    _scheduler = AsyncIOScheduler(timezone=ZoneInfo(settings.scheduler_timezone))

    if settings.auto_distill_short:
        _scheduler.add_job(_run_short, "cron", hour=3, minute=0, id="distill_short")
        logger.info("scheduled: distill_short daily at 03:00")

    if settings.auto_distill_mid:
        _scheduler.add_job(
            _run_mid, "cron", day_of_week="sun", hour=3, minute=30, id="distill_mid"
        )
        logger.info("scheduled: distill_mid weekly Sun at 03:30")

    if settings.auto_distill_long:
        _scheduler.add_job(
            _run_long, "cron", day=1, hour=4, minute=0, id="distill_long"
        )
        logger.info("scheduled: distill_long monthly on 1st at 04:00")

    jobs = _scheduler.get_jobs()
    if jobs:
        _scheduler.start()
        logger.info("auto distillation scheduler started (%d jobs)", len(jobs))
    else:
        logger.info("auto distillation: no jobs enabled")


def stop() -> None:
    """Shut the scheduler down without waiting for running jobs.

    Safe to call when the scheduler was never created or is not running.
    """
    global _scheduler

    if _scheduler and _scheduler.running:
        _scheduler.shutdown(wait=False)
        logger.info("auto distillation scheduler stopped")


def status() -> list[dict]:
    """Return next-run info for all scheduled jobs.

    Returns:
        One ``{"id": ..., "next_run": ...}`` dict per job, where ``next_run``
        is an ISO-8601 string or ``None`` when the job has no next run time.
        An empty list when the scheduler does not exist or is not running.
    """
    if not _scheduler or not _scheduler.running:
        return []

    return [
        {
            "id": job.id,
            "next_run": job.next_run_time.isoformat() if job.next_run_time else None,
        }
        for job in _scheduler.get_jobs()
    ]