"""
Auto memory distillation scheduler.

Default schedule (all overridable via .env flags):
  short — daily at 03:00 (no LLM — fast)
  mid   — weekly Sun at 03:30 (LLM call)
  long  — monthly 1st at 04:00 (LLM call — off by default)

Set AUTO_DISTILL=false to disable every built-in distillation job
(user-defined cron jobs are still loaded and the scheduler still starts).
Set AUTO_DISTILL_LONG=true to enable monthly long-term integration.
"""
import logging
from zoneinfo import ZoneInfo

from apscheduler.schedulers.asyncio import AsyncIOScheduler

from config import settings

logger = logging.getLogger(__name__)

# Module-level scheduler; None until start() has been called.
_scheduler: AsyncIOScheduler | None = None


def _all_personas() -> list[tuple[str, str]]:
    """Return [(username, persona_name)] for every persona on this instance."""
    # Imported lazily, matching the rest of this module.
    from persona import list_users, list_user_personas

    return [(u, p) for u in list_users() for p in list_user_personas(u)]


async def _run_short() -> None:
    """Run the short distillation for every persona.

    A failure for one persona is logged and does not stop the others.
    """
    from memory_distiller import distill_short

    for u, p in _all_personas():
        try:
            result = distill_short(u, p)
            logger.info(
                "auto distill short [%s/%s]: %d files, %d chars",
                u, p, result["files_included"], result["chars_written"],
            )
        except Exception as e:
            logger.error("auto distill short [%s/%s] failed: %s", u, p, e)


async def _run_mid() -> None:
    """Run the mid distillation for every persona and notify the user.

    A result containing an "error" key is logged as skipped; otherwise the
    user is notified with the size and backend reported in the result.
    """
    from memory_distiller import distill_mid
    from notification import notify

    for u, p in _all_personas():
        try:
            result = await distill_mid(u, p)
            if "error" in result:
                logger.warning(
                    "auto distill mid [%s/%s] skipped: %s", u, p, result["error"]
                )
            else:
                logger.info(
                    "auto distill mid [%s/%s]: %d chars via %s",
                    u, p, result["chars_written"], result["backend"],
                )
                await notify(u, f"📝 Weekly memory digest complete ({result['chars_written']} chars via {result['backend']}).")
        except Exception as e:
            logger.error("auto distill mid [%s/%s] failed: %s", u, p, e)


async def _run_long() -> None:
    """Run the long distillation for every persona and notify the user.

    A result containing an "error" key is logged as skipped; otherwise the
    user is notified with the size and backend reported in the result.
    """
    from memory_distiller import distill_long
    from notification import notify

    for u, p in _all_personas():
        try:
            result = await distill_long(u, p)
            if "error" in result:
                logger.warning(
                    "auto distill long [%s/%s] skipped: %s", u, p, result["error"]
                )
            else:
                logger.info(
                    "auto distill long [%s/%s]: %d chars via %s",
                    u, p, result["chars_written"], result["backend"],
                )
                await notify(u, f"🧠 Monthly long-term memory integration complete ({result['chars_written']} chars via {result['backend']}). Worth a quick review.")
        except Exception as e:
            logger.error("auto distill long [%s/%s] failed: %s", u, p, e)


async def _run_user_cron(job: dict) -> None:
    """Await a single user-defined cron job.

    This is a coroutine function on purpose: the asyncio scheduler runs
    coroutine functions on its event loop, whereas a plain callable is
    handed to a worker thread where no event loop is available.
    """
    from cron_runner import run_job

    await run_job(job)


def get_scheduler() -> AsyncIOScheduler | None:
    """Return the module-level scheduler, or None if start() was never called.

    Used by cron tools for live add/remove.
    """
    return _scheduler


def start() -> None:
    """Create the scheduler, register built-in and user jobs, and start it.

    The built-in distillation jobs are registered only when
    settings.auto_distill is true, each further gated by its own flag.
    User-defined cron jobs are loaded regardless of that master switch.
    """
    global _scheduler
    _scheduler = AsyncIOScheduler(timezone=ZoneInfo(settings.scheduler_timezone))

    if not settings.auto_distill:
        # Master switch off: skip every built-in job instead of only logging.
        logger.info("auto distillation disabled (AUTO_DISTILL=false)")
    else:
        if settings.auto_distill_short:
            _scheduler.add_job(
                _run_short, "cron", hour=3, minute=0, id="distill_short"
            )
            logger.info("scheduled: distill_short daily at 03:00")

        if settings.auto_distill_mid:
            _scheduler.add_job(
                _run_mid, "cron", day_of_week="sun", hour=3, minute=30,
                id="distill_mid",
            )
            logger.info("scheduled: distill_mid weekly Sun at 03:30")

        if settings.auto_distill_long:
            _scheduler.add_job(
                _run_long, "cron", day=1, hour=4, minute=0, id="distill_long"
            )
            logger.info("scheduled: distill_long monthly on 1st at 04:00")

    # Load user-defined cron jobs from CRONS.json
    _load_user_crons()

    _scheduler.start()
    logger.info("scheduler started (%d jobs)", len(_scheduler.get_jobs()))


def _load_user_crons() -> None:
    """Register all enabled user-defined cron jobs across all users and personas.

    Jobs whose "enabled" key is false are ignored. A job that cannot be
    parsed or registered is logged and skipped without affecting the rest.
    """
    if _scheduler is None:
        logger.warning("user crons not loaded: scheduler not created yet")
        return

    try:
        from cron_runner import load_crons, parse_schedule
        from persona import list_users, list_user_personas
    except ImportError as e:
        logger.warning("could not import cron modules: %s", e)
        return

    total = 0
    persona_count = 0
    for username in list_users():
        for persona_name in list_user_personas(username):
            persona_count += 1
            for job in load_crons(username, persona_name):
                if not job.get("enabled", True):
                    continue
                # Ensure user + persona are stamped on the job for run_job()
                # path resolution
                job.setdefault("user", username)
                job.setdefault("persona", persona_name)
                try:
                    kwargs = parse_schedule(job["schedule"])
                    sched_id = f"{username}:{persona_name}:{job['id']}"
                    # Pass the coroutine function itself (not a lambda that
                    # calls asyncio.ensure_future) so it runs on the loop.
                    _scheduler.add_job(
                        _run_user_cron,
                        "cron",
                        args=[job],
                        id=sched_id,
                        replace_existing=True,
                        **kwargs,
                    )
                    total += 1
                except Exception as e:
                    logger.warning(
                        "cron %s/%s/%s skipped: %s",
                        username, persona_name, job.get("id"), e,
                    )

    if total:
        logger.info(
            "loaded %d user cron job(s) across %d persona(s)", total, persona_count
        )


def stop() -> None:
    """Shut the scheduler down without waiting, if it is currently running."""
    if _scheduler and _scheduler.running:
        _scheduler.shutdown(wait=False)
        logger.info("auto distillation scheduler stopped")


def status() -> list[dict]:
    """Return next-run info for all scheduled jobs.

    Each entry has the job "id" and "next_run" as an ISO-8601 string, or None
    when the job has no next run time. Returns an empty list when the
    scheduler is missing or not running.
    """
    if not _scheduler or not _scheduler.running:
        return []
    return [
        {
            "id": job.id,
            "next_run": job.next_run_time.isoformat() if job.next_run_time else None,
        }
        for job in _scheduler.get_jobs()
    ]