Files
Cortex-Inara/cortex/scheduler.py
Scott Idem a4daebdc9b feat: local LLM multi-model, session search, cron proactive types, notifications, docs overhaul
Local LLM:
- user_settings.py: per-user hosts/models config (local_llm.json)
- routers/local_llm.py + static/local_llm.html: dedicated settings page
- llm_client.py: local OpenAI-compatible backend via httpx
- config.py: LOCAL_API_URL/KEY/MODEL + per-backend timeouts
- Active model shown near backend toggle (amber hint text)

Memory distillation:
- memory_distiller.py: DISTILL_BACKEND_MID/LONG .env overrides
- scheduler.py + notification.py: notify NC Talk after mid/long distill
- notification.py: outbound channel abstraction (NC Talk, extensible)

Session search:
- routers/files.py: GET /sessions/search?q= with excerpts grouped by date
- static/index.html + app.js: search UI in file sidebar with highlight
- _esc() helper to prevent XSS in search results

Proactive cron:
- cron_runner.py: new job types — message (send directly) and brief (LLM + send)
- Both support optional per-job channel override

Channels:
- routers/nextcloud_talk.py: consolidated using notification._send_nct_message()
- routers/auth.py: local backend status in /auth/status
- routers/chat.py: /backend returns {primary, fallback, local_model} object

UI / UX:
- Copy button for user messages (matching assistant)
- Autocomplete disabled on sensitive form fields
- settings.html: local model section replaced with link to /settings/local

Docs overhaul:
- MASTER.md hub + ARCH__SYSTEM/BACKENDS/PERSONA/CHANNELS/FUTURE.md
- ARCH__Intelligence_Layer.md replaced with redirect table
- CORTEX.md trimmed to vision only; README updated
- OPEN_WEBUI_API.md added to docs/

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-05 20:53:06 -04:00

148 lines
5.4 KiB
Python

"""
Auto memory distillation scheduler.
Default schedule (all overridable via .env flags):
short — daily at 03:00 (no LLM — fast)
mid — weekly Sun at 03:30 (LLM call)
long — monthly 1st at 04:00 (LLM call — off by default)
Set AUTO_DISTILL=false to disable entirely.
Set AUTO_DISTILL_LONG=true to enable monthly long-term integration.
"""
import logging
from zoneinfo import ZoneInfo
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from config import settings
logger = logging.getLogger(__name__)
_scheduler: AsyncIOScheduler | None = None
async def _run_short() -> None:
from memory_distiller import distill_short
try:
result = distill_short()
logger.info("auto distill short: %d files, %d chars", result["files_included"], result["chars_written"])
except Exception as e:
logger.error("auto distill short failed: %s", e)
async def _run_mid() -> None:
from memory_distiller import distill_mid
from notification import notify
try:
result = await distill_mid()
if "error" in result:
logger.warning("auto distill mid skipped: %s", result["error"])
else:
logger.info("auto distill mid: %d chars via %s", result["chars_written"], result["backend"])
await notify(result["username"], f"📝 Weekly memory digest complete ({result['chars_written']} chars via {result['backend']}).")
except Exception as e:
logger.error("auto distill mid failed: %s", e)
async def _run_long() -> None:
from memory_distiller import distill_long
from notification import notify
try:
result = await distill_long()
if "error" in result:
logger.warning("auto distill long skipped: %s", result["error"])
else:
logger.info("auto distill long: %d chars via %s", result["chars_written"], result["backend"])
await notify(result["username"], f"🧠 Monthly long-term memory integration complete ({result['chars_written']} chars via {result['backend']}). Worth a quick review.")
except Exception as e:
logger.error("auto distill long failed: %s", e)
def get_scheduler() -> AsyncIOScheduler | None:
"""Return the running scheduler instance (used by cron tools for live add/remove)."""
return _scheduler
def start() -> None:
global _scheduler
_scheduler = AsyncIOScheduler(timezone=ZoneInfo(settings.scheduler_timezone))
if not settings.auto_distill:
logger.info("auto distillation disabled (AUTO_DISTILL=false)")
if settings.auto_distill_short:
_scheduler.add_job(_run_short, "cron", hour=3, minute=0, id="distill_short")
logger.info("scheduled: distill_short daily at 03:00")
if settings.auto_distill_mid:
_scheduler.add_job(_run_mid, "cron", day_of_week="sun", hour=3, minute=30, id="distill_mid")
logger.info("scheduled: distill_mid weekly Sun at 03:30")
if settings.auto_distill_long:
_scheduler.add_job(_run_long, "cron", day=1, hour=4, minute=0, id="distill_long")
logger.info("scheduled: distill_long monthly on 1st at 04:00")
# Load user-defined cron jobs from CRONS.json
_load_user_crons()
_scheduler.start()
logger.info("scheduler started (%d jobs)", len(_scheduler.get_jobs()))
def _load_user_crons() -> None:
"""Register all enabled user-defined cron jobs across all users and personas."""
import asyncio
try:
from cron_runner import load_crons, parse_schedule, run_job
from persona import list_users, list_user_personas
except ImportError as e:
logger.warning("could not import cron modules: %s", e)
return
total = 0
persona_count = 0
for username in list_users():
for persona_name in list_user_personas(username):
persona_count += 1
for job in load_crons(username, persona_name):
if not job.get("enabled", True):
continue
# Ensure user + persona are stamped on the job for run_job() path resolution
job.setdefault("user", username)
job.setdefault("persona", persona_name)
try:
kwargs = parse_schedule(job["schedule"])
sched_id = f"{username}:{persona_name}:{job['id']}"
_scheduler.add_job(
lambda j=job: asyncio.ensure_future(run_job(j)),
"cron",
id=sched_id,
replace_existing=True,
**kwargs,
)
total += 1
except Exception as e:
logger.warning("cron %s/%s/%s skipped: %s", username, persona_name, job.get("id"), e)
if total:
logger.info("loaded %d user cron job(s) across %d persona(s)", total, persona_count)
def stop() -> None:
global _scheduler
if _scheduler and _scheduler.running:
_scheduler.shutdown(wait=False)
logger.info("auto distillation scheduler stopped")
def status() -> list[dict]:
"""Return next-run info for all scheduled jobs."""
if not _scheduler or not _scheduler.running:
return []
jobs = []
for job in _scheduler.get_jobs():
next_run = job.next_run_time
jobs.append({
"id": job.id,
"next_run": next_run.isoformat() if next_run else None,
})
return jobs