""" Cron job storage and execution. Handles reading/writing CRONS.json and running jobs when they fire. Imported by scheduler.py (to load jobs at startup) and tools/cron.py (to add/remove jobs at runtime). Job schema: { "id": "c_abc123", "label": "Human-readable name", "schedule": "daily:09:00", # see parse_schedule() for all formats "type": "remind" | "note" | "message" | "brief", "payload": "Text or prompt when the job fires", "channel": null | "nextcloud" | "google_chat", # for message/brief types "enabled": true, "created_at": "ISO 8601", "last_run": null | "ISO 8601" } Job types: remind → appends to REMINDERS.md (auto-loaded into context at tier 2+) note → appends to SCRATCH.md (read on demand via scratch_read) message → sends payload as-is to NC Talk notification_room brief → runs LLM with payload as the prompt, sends response to NC Talk (good for morning briefings, summaries, proactive check-ins) """ import logging from datetime import datetime, timezone from pathlib import Path from persona import persona_path as _persona_path logger = logging.getLogger(__name__) _DEFAULT_HOUR = 9 _DEFAULT_MINUTE = 0 _DOW = { "mon": "mon", "tue": "tue", "wed": "wed", "thu": "thu", "fri": "fri", "sat": "sat", "sun": "sun", "monday": "mon", "tuesday": "tue", "wednesday": "wed", "thursday": "thu", "friday": "fri", "saturday": "sat", "sunday": "sun", } # --------------------------------------------------------------------------- # Storage # --------------------------------------------------------------------------- def crons_path(username: str | None = None, persona: str | None = None) -> Path: return _persona_path(username, persona) / "CRONS.json" def load_crons(username: str | None = None, persona: str | None = None) -> list[dict]: p = crons_path(username, persona) if not p.exists(): return [] try: import json return json.loads(p.read_text()) except Exception: return [] def save_crons(crons: list[dict], username: str | None = None, persona: str | None = None) -> None: import json crons_path(username, persona).write_text(json.dumps(crons, indent=2) + "\n") # --------------------------------------------------------------------------- # Schedule parsing # --------------------------------------------------------------------------- def parse_schedule(schedule: str) -> dict: """ Convert a human schedule string to APScheduler cron kwargs. Formats: "hourly" → every hour at :00 "daily" → every day at 09:00 "daily:HH:MM" → every day at HH:MM "weekly:DOW" → every DOW at 09:00 "weekly:DOW:HH:MM" → every DOW at HH:MM """ s = schedule.strip().lower() if s == "hourly": return {"minute": 0} if s == "daily": return {"hour": _DEFAULT_HOUR, "minute": _DEFAULT_MINUTE} if s.startswith("daily:"): h, m = _parse_hhmm(s[6:], schedule) return {"hour": h, "minute": m} if s.startswith("weekly:"): rest = s[7:].split(":") dow = _DOW.get(rest[0]) if not dow: raise ValueError( f"Unknown day of week {rest[0]!r}. " f"Use: mon tue wed thu fri sat sun" ) if len(rest) == 3: h, m = _parse_hhmm(f"{rest[1]}:{rest[2]}", schedule) else: h, m = _DEFAULT_HOUR, _DEFAULT_MINUTE return {"day_of_week": dow, "hour": h, "minute": m} raise ValueError( f"Unrecognised schedule {schedule!r}. " f"Valid formats: hourly | daily | daily:HH:MM | weekly:DOW | weekly:DOW:HH:MM" ) def _parse_hhmm(s: str, original: str) -> tuple[int, int]: parts = s.split(":") if len(parts) != 2: raise ValueError(f"Expected HH:MM in {original!r}, got {s!r}") return int(parts[0]), int(parts[1]) # --------------------------------------------------------------------------- # Execution # --------------------------------------------------------------------------- def _now_label() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") async def run_job(job: dict) -> None: """Execute a cron job. Called by APScheduler when the job fires.""" job_type = job.get("type") payload = job.get("payload", "").strip() label = job.get("label", job.get("id", "cron")) section = f"\n## {label} — {_now_label()}\n\n{payload}\n" p_root = _persona_path(job.get("user"), job.get("persona")) if job_type == "remind": p = p_root / "REMINDERS.md" existing = p.read_text() if p.exists() else "" p.write_text(existing.rstrip() + "\n" + section) logger.info("cron [remind] fired: %s", label) elif job_type == "note": p = p_root / "SCRATCH.md" existing = p.read_text() if p.exists() else "" p.write_text(existing.rstrip() + "\n" + section) logger.info("cron [note] fired: %s", label) elif job_type == "message": # Send payload text directly to the user's notification channel from notification import notify username = job.get("user") or "scott" channel = job.get("channel") or None await notify(username, payload, channel=channel) logger.info("cron [message] sent: %s", label) elif job_type == "brief": # Run LLM with payload as the prompt, send response to notification channel. # Great for morning briefings, reminders, proactive check-ins. from context_loader import load_context from llm_client import complete from notification import notify from persona import set_context from config import settings as _s username = job.get("user") or _s.user_name.lower() persona_nm = job.get("persona") or _s.agent_name.lower() channel = job.get("channel") or None set_context(username, persona_nm) system_prompt = load_context(2) # tier 2: identity + memory + user profile try: response_text, backend = await complete( system_prompt=system_prompt, messages=[{"role": "user", "content": payload}], role="chat", ) await notify(username, response_text, channel=channel) logger.info("cron [brief] sent via %s: %s", backend, label) except Exception as e: logger.error("cron [brief] LLM error for %s: %s", label, e) else: logger.warning("cron: unknown type %r (job %s)", job_type, job.get("id")) return # Record last_run in the right persona's CRONS.json u, p = job.get("user"), job.get("persona") crons = load_crons(u, p) for c in crons: if c["id"] == job["id"]: c["last_run"] = datetime.now(timezone.utc).isoformat() break save_crons(crons, u, p)