""" Cron job storage and execution. Handles reading/writing CRONS.json and running jobs when they fire. Imported by scheduler.py (to load jobs at startup) and tools/cron.py (to add/remove jobs at runtime). Job schema: { "id": "c_abc123", "label": "Human-readable name", "schedule": "daily:09:00", # see parse_schedule() for all formats "type": "remind" | "note" | "message" | "brief" | "task", "payload": "Text or prompt when the job fires", "channel": null | "nextcloud" | "google_chat", # for message/brief/task types "enabled": true, "created_at": "ISO 8601", "last_run": null | "ISO 8601" } Job types: remind → appends to REMINDERS.md (auto-loaded into context at tier 2+) note → appends to SCRATCH.md (read on demand via scratch_read) message → sends payload as-is to notification channel brief → calls LLM (no tools) with payload as prompt, sends response (good for morning briefings, summaries, proactive check-ins) task → runs full orchestrator tool loop with payload as the user request, sends Claude's response to notification channel (good for agentic scheduled work: research, file updates, checks) Tools that require confirmation are skipped — pre-approve them in Settings → Tools to allow them in scheduled tasks. """ import logging from datetime import datetime, timezone from pathlib import Path from persona import persona_path as _persona_path logger = logging.getLogger(__name__) _DEFAULT_HOUR = 9 _DEFAULT_MINUTE = 0 _DOW = { "mon": "mon", "tue": "tue", "wed": "wed", "thu": "thu", "fri": "fri", "sat": "sat", "sun": "sun", "monday": "mon", "tuesday": "tue", "wednesday": "wed", "thursday": "thu", "friday": "fri", "saturday": "sat", "sunday": "sun", } # --------------------------------------------------------------------------- # Storage # --------------------------------------------------------------------------- def crons_path(username: str | None = None, persona: str | None = None) -> Path: return _persona_path(username, persona) / "CRONS.json" def load_crons(username: str | None = None, persona: str | None = None) -> list[dict]: p = crons_path(username, persona) if not p.exists(): return [] try: import json return json.loads(p.read_text()) except Exception: return [] def save_crons(crons: list[dict], username: str | None = None, persona: str | None = None) -> None: import json crons_path(username, persona).write_text(json.dumps(crons, indent=2) + "\n") # --------------------------------------------------------------------------- # Schedule parsing # --------------------------------------------------------------------------- def parse_schedule(schedule: str) -> dict: """ Convert a human schedule string to APScheduler cron kwargs. Formats: "hourly" → every hour at :00 "daily" → every day at 09:00 "daily:HH:MM" → every day at HH:MM "weekly:DOW" → every DOW at 09:00 "weekly:DOW:HH:MM" → every DOW at HH:MM "monthly" → 1st of every month at 09:00 "monthly:DD" → day DD of every month at 09:00 "monthly:DD:HH:MM" → day DD of every month at HH:MM "yearly:MM:DD" → every year on MM/DD at 09:00 (birthdays, anniversaries) "yearly:MM:DD:HH:MM" → every year on MM/DD at HH:MM """ s = schedule.strip().lower() if s == "hourly": return {"minute": 0} if s == "daily": return {"hour": _DEFAULT_HOUR, "minute": _DEFAULT_MINUTE} if s.startswith("daily:"): h, m = _parse_hhmm(s[6:], schedule) return {"hour": h, "minute": m} if s.startswith("weekly:"): rest = s[7:].split(":") dow = _DOW.get(rest[0]) if not dow: raise ValueError( f"Unknown day of week {rest[0]!r}. " f"Use: mon tue wed thu fri sat sun" ) if len(rest) == 3: h, m = _parse_hhmm(f"{rest[1]}:{rest[2]}", schedule) else: h, m = _DEFAULT_HOUR, _DEFAULT_MINUTE return {"day_of_week": dow, "hour": h, "minute": m} if s.startswith("monthly"): rest = s[7:].lstrip(":") if not rest: return {"day": 1, "hour": _DEFAULT_HOUR, "minute": _DEFAULT_MINUTE} parts = rest.split(":") day = _parse_day(parts[0], schedule) if len(parts) == 3: h, m = _parse_hhmm(f"{parts[1]}:{parts[2]}", schedule) else: h, m = _DEFAULT_HOUR, _DEFAULT_MINUTE return {"day": day, "hour": h, "minute": m} if s.startswith("yearly:"): rest = s[7:].split(":") if len(rest) < 2: raise ValueError( f"yearly requires at least MM:DD in {schedule!r}. " f"Example: yearly:03:15 or yearly:03:15:09:00" ) month = _parse_month(rest[0], schedule) day = _parse_day(rest[1], schedule) if len(rest) == 4: h, m = _parse_hhmm(f"{rest[2]}:{rest[3]}", schedule) else: h, m = _DEFAULT_HOUR, _DEFAULT_MINUTE return {"month": month, "day": day, "hour": h, "minute": m} raise ValueError( f"Unrecognised schedule {schedule!r}. " f"Valid formats: hourly | daily | daily:HH:MM | weekly:DOW | weekly:DOW:HH:MM | " f"monthly | monthly:DD | monthly:DD:HH:MM | yearly:MM:DD | yearly:MM:DD:HH:MM" ) def _parse_hhmm(s: str, original: str) -> tuple[int, int]: parts = s.split(":") if len(parts) != 2: raise ValueError(f"Expected HH:MM in {original!r}, got {s!r}") return int(parts[0]), int(parts[1]) def _parse_day(s: str, original: str) -> int: try: d = int(s) except ValueError: raise ValueError(f"Expected day number (1–31) in {original!r}, got {s!r}") if not 1 <= d <= 31: raise ValueError(f"Day must be 1–31 in {original!r}, got {d}") return d def _parse_month(s: str, original: str) -> int: try: m = int(s) except ValueError: raise ValueError(f"Expected month number (1–12) in {original!r}, got {s!r}") if not 1 <= m <= 12: raise ValueError(f"Month must be 1–12 in {original!r}, got {m}") return m # --------------------------------------------------------------------------- # Execution # --------------------------------------------------------------------------- def _now_label() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") async def run_job(job: dict) -> None: """Execute a cron job. Called by APScheduler when the job fires.""" job_type = job.get("type") payload = job.get("payload", "").strip() label = job.get("label", job.get("id", "cron")) section = f"\n## {label} — {_now_label()}\n\n{payload}\n" p_root = _persona_path(job.get("user"), job.get("persona")) if job_type == "remind": p = p_root / "REMINDERS.md" existing = p.read_text() if p.exists() else "" p.write_text(existing.rstrip() + "\n" + section) logger.info("cron [remind] fired: %s", label) elif job_type == "note": p = p_root / "SCRATCH.md" existing = p.read_text() if p.exists() else "" p.write_text(existing.rstrip() + "\n" + section) logger.info("cron [note] fired: %s", label) elif job_type == "message": # Send payload text directly to the user's notification channel from notification import notify username = job.get("user") or "scott" channel = job.get("channel") or None await notify(username, payload, channel=channel) logger.info("cron [message] sent: %s", label) elif job_type == "brief": # Run LLM with payload as the prompt, send response to notification channel. # Great for morning briefings, reminders, proactive check-ins. from context_loader import load_context from llm_client import complete from notification import notify from persona import set_context from config import settings as _s username = job.get("user") or _s.user_name.lower() persona_nm = job.get("persona") or _s.agent_name.lower() channel = job.get("channel") or None set_context(username, persona_nm) system_prompt = load_context(2) # tier 2: identity + memory + user profile try: response_text, backend = await complete( system_prompt=system_prompt, messages=[{"role": "user", "content": payload}], role="chat", ) await notify(username, response_text, channel=channel) logger.info("cron [brief] sent via %s: %s", backend, label) except Exception as e: logger.error("cron [brief] LLM error for %s: %s", label, e) elif job_type == "task": # Run the full orchestrator tool loop, send Claude's response to the # notification channel. Tools that require confirmation are skipped in # cron context — the user is notified to pre-approve them. from orchestrator_engine import run as _orch_run from context_loader import load_context from notification import notify from persona import set_context from auth_utils import get_user_gemini_key, get_tool_policy, get_risk_policy from config import settings as _s username = job.get("user") or _s.user_name.lower() persona_nm = job.get("persona") or _s.agent_name.lower() channel = job.get("channel") or None set_context(username, persona_nm) system_prompt = load_context(2) policy = get_tool_policy(username) max_risk, whitelist, blacklist = get_risk_policy(username) gemini_key = get_user_gemini_key(username) try: result = await _orch_run( task=payload, system_prompt=system_prompt, gemini_api_key=gemini_key, respond_with_claude=True, confirm_allow=set(policy.get("allow") or []), confirm_deny=set(policy.get("deny") or []), max_risk=max_risk, risk_whitelist=whitelist, risk_blacklist=blacklist, ) if result.checkpoint: tool_name = (result.checkpoint.pending_calls[0].name if result.checkpoint.pending_calls else "unknown tool") msg = ( f"Scheduled task '{label}' paused — " f"'{tool_name}' requires confirmation. " "Pre-approve it in Settings → Tools to allow it in scheduled tasks." ) await notify(username, msg, channel=channel) logger.warning("cron [task] %s: confirmation required for %s", label, tool_name) else: await notify(username, result.response, channel=channel) logger.info("cron [task] completed via %s: %s", result.backend, label) except Exception as e: logger.error("cron [task] error for %s: %s", label, e) else: logger.warning("cron: unknown type %r (job %s)", job_type, job.get("id")) return # Record last_run in the right persona's CRONS.json u, p = job.get("user"), job.get("persona") crons = load_crons(u, p) for c in crons: if c["id"] == job["id"]: c["last_run"] = datetime.now(timezone.utc).isoformat() break save_crons(crons, u, p)