Files
Cortex-Inara/cortex/cron_runner.py
Scott Idem 6a1a1c2686 feat: unified model registry with role-based routing
Introduces model_registry.py as the single source of truth for all LLM
backend configuration. Replaces scattered backend settings across user_settings,
config distill_backend_*, and the UI toggle.

model_registry.py:
- Per-user home/{user}/model_registry.json with version, hosts, models, roles
- Models have: type (local_openai|claude_cli|gemini_cli|gemini_api), label,
  model_name, host_id, context_k (tokens), tags (capability labels)
- Roles map to priority chains: primary, backup_1..backup_4
- Built-in IDs (claude_cli, gemini_cli, gemini_api) always resolvable
- Auto-migrates existing local_llm.json on first access
- CRUD: save_host, remove_host, save_model, remove_model, set_role
- get_model_for_role(): registry → .env default → hardcoded fallback

config.py:
- role_chat/orchestrator/distill/coder/research .env defaults
- defined_roles: comma-separated standard role list (extensible)
- get_defined_roles() and get_role_default() helper methods

llm_client.complete():
- New role= parameter (default "chat") for registry-based routing
- model= still accepted for explicit UI toggle override
- _claude() and _local() accept model_cfg dict instead of raw string
- _local() uses pre-resolved config from registry

memory_distiller.py:
- distill_mid/long now use role="distill" (no more distill_backend_* .env vars needed)

cron_runner.py:
- brief jobs use role="chat"

routers/chat.py + auth.py:
- Use model_registry instead of user_settings for local model info

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-05 21:25:18 -04:00

203 lines
6.9 KiB
Python

"""
Cron job storage and execution.
Handles reading/writing CRONS.json and running jobs when they fire.
Imported by scheduler.py (to load jobs at startup) and tools/cron.py
(to add/remove jobs at runtime).
Job schema:
{
"id": "c_abc123",
"label": "Human-readable name",
"schedule": "daily:09:00", # see parse_schedule() for all formats
"type": "remind" | "note" | "message" | "brief",
"payload": "Text or prompt when the job fires",
"channel": null | "nextcloud" | "google_chat", # for message/brief types
"enabled": true,
"created_at": "ISO 8601",
"last_run": null | "ISO 8601"
}
Job types:
remind → appends to REMINDERS.md (auto-loaded into context at tier 2+)
note → appends to SCRATCH.md (read on demand via scratch_read)
message → sends payload as-is to NC Talk notification_room
brief → runs LLM with payload as the prompt, sends response to NC Talk
(good for morning briefings, summaries, proactive check-ins)
"""
import logging
from datetime import datetime, timezone
from pathlib import Path
from persona import persona_path as _persona_path
logger = logging.getLogger(__name__)
_DEFAULT_HOUR = 9
_DEFAULT_MINUTE = 0
_DOW = {
"mon": "mon", "tue": "tue", "wed": "wed", "thu": "thu",
"fri": "fri", "sat": "sat", "sun": "sun",
"monday": "mon", "tuesday": "tue", "wednesday": "wed",
"thursday": "thu", "friday": "fri", "saturday": "sat", "sunday": "sun",
}
# ---------------------------------------------------------------------------
# Storage
# ---------------------------------------------------------------------------
def crons_path(username: str | None = None, persona: str | None = None) -> Path:
return _persona_path(username, persona) / "CRONS.json"
def load_crons(username: str | None = None, persona: str | None = None) -> list[dict]:
p = crons_path(username, persona)
if not p.exists():
return []
try:
import json
return json.loads(p.read_text())
except Exception:
return []
def save_crons(crons: list[dict],
username: str | None = None,
persona: str | None = None) -> None:
import json
crons_path(username, persona).write_text(json.dumps(crons, indent=2) + "\n")
# ---------------------------------------------------------------------------
# Schedule parsing
# ---------------------------------------------------------------------------
def parse_schedule(schedule: str) -> dict:
"""
Convert a human schedule string to APScheduler cron kwargs.
Formats:
"hourly" → every hour at :00
"daily" → every day at 09:00
"daily:HH:MM" → every day at HH:MM
"weekly:DOW" → every DOW at 09:00
"weekly:DOW:HH:MM" → every DOW at HH:MM
"""
s = schedule.strip().lower()
if s == "hourly":
return {"minute": 0}
if s == "daily":
return {"hour": _DEFAULT_HOUR, "minute": _DEFAULT_MINUTE}
if s.startswith("daily:"):
h, m = _parse_hhmm(s[6:], schedule)
return {"hour": h, "minute": m}
if s.startswith("weekly:"):
rest = s[7:].split(":")
dow = _DOW.get(rest[0])
if not dow:
raise ValueError(
f"Unknown day of week {rest[0]!r}. "
f"Use: mon tue wed thu fri sat sun"
)
if len(rest) == 3:
h, m = _parse_hhmm(f"{rest[1]}:{rest[2]}", schedule)
else:
h, m = _DEFAULT_HOUR, _DEFAULT_MINUTE
return {"day_of_week": dow, "hour": h, "minute": m}
raise ValueError(
f"Unrecognised schedule {schedule!r}. "
f"Valid formats: hourly | daily | daily:HH:MM | weekly:DOW | weekly:DOW:HH:MM"
)
def _parse_hhmm(s: str, original: str) -> tuple[int, int]:
parts = s.split(":")
if len(parts) != 2:
raise ValueError(f"Expected HH:MM in {original!r}, got {s!r}")
return int(parts[0]), int(parts[1])
# ---------------------------------------------------------------------------
# Execution
# ---------------------------------------------------------------------------
def _now_label() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
async def run_job(job: dict) -> None:
"""Execute a cron job. Called by APScheduler when the job fires."""
job_type = job.get("type")
payload = job.get("payload", "").strip()
label = job.get("label", job.get("id", "cron"))
section = f"\n## {label}{_now_label()}\n\n{payload}\n"
p_root = _persona_path(job.get("user"), job.get("persona"))
if job_type == "remind":
p = p_root / "REMINDERS.md"
existing = p.read_text() if p.exists() else ""
p.write_text(existing.rstrip() + "\n" + section)
logger.info("cron [remind] fired: %s", label)
elif job_type == "note":
p = p_root / "SCRATCH.md"
existing = p.read_text() if p.exists() else ""
p.write_text(existing.rstrip() + "\n" + section)
logger.info("cron [note] fired: %s", label)
elif job_type == "message":
# Send payload text directly to the user's notification channel
from notification import notify
username = job.get("user") or "scott"
channel = job.get("channel") or None
await notify(username, payload, channel=channel)
logger.info("cron [message] sent: %s", label)
elif job_type == "brief":
# Run LLM with payload as the prompt, send response to notification channel.
# Great for morning briefings, reminders, proactive check-ins.
from context_loader import load_context
from llm_client import complete
from notification import notify
from persona import set_context
from config import settings as _s
username = job.get("user") or _s.user_name.lower()
persona_nm = job.get("persona") or _s.agent_name.lower()
channel = job.get("channel") or None
set_context(username, persona_nm)
system_prompt = load_context(2) # tier 2: identity + memory + user profile
try:
response_text, backend = await complete(
system_prompt=system_prompt,
messages=[{"role": "user", "content": payload}],
role="chat",
)
await notify(username, response_text, channel=channel)
logger.info("cron [brief] sent via %s: %s", backend, label)
except Exception as e:
logger.error("cron [brief] LLM error for %s: %s", label, e)
else:
logger.warning("cron: unknown type %r (job %s)", job_type, job.get("id"))
return
# Record last_run in the right persona's CRONS.json
u, p = job.get("user"), job.get("persona")
crons = load_crons(u, p)
for c in crons:
if c["id"] == job["id"]:
c["last_run"] = datetime.now(timezone.utc).isoformat()
break
save_crons(crons, u, p)