Files
Cortex-Inara/cortex/cron_runner.py
Scott Idem 70665fadff feat: schedules UI, task cron type, monthly/yearly schedules, AE DB tools, integrations page
- Schedules web UI (/settings/crons): list, add, edit, pause/resume, delete jobs
- cron task type: full orchestrator tool loop on a schedule, result → notification channel
- parse_schedule: monthly/yearly formats (monthly:DD:HH:MM, yearly:MM:DD:HH:MM)
- HA inbound webhook tools toggle: orchestrator loop vs. direct LLM, configurable in UI
- ae_db_query/describe/show_view: SELECT-only Aether MariaDB access (admin, per-user creds)
- /settings/integrations: admin-only page for Aether DB credentials
- Schedules nav link added to all settings pages
- pymysql added to requirements
- Docs updated: HELP.md, MASTER.md, CLAUDE.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 21:06:43 -04:00

310 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Cron job storage and execution.
Handles reading/writing CRONS.json and running jobs when they fire.
Imported by scheduler.py (to load jobs at startup) and tools/cron.py
(to add/remove jobs at runtime).
Job schema:
{
"id": "c_abc123",
"label": "Human-readable name",
"schedule": "daily:09:00", # see parse_schedule() for all formats
"type": "remind" | "note" | "message" | "brief" | "task",
"payload": "Text or prompt when the job fires",
"channel": null | "nextcloud" | "google_chat", # for message/brief/task types
"enabled": true,
"created_at": "ISO 8601",
"last_run": null | "ISO 8601"
}
Job types:
  remind  → appends to REMINDERS.md (auto-loaded into context at tier 2+)
  note    → appends to SCRATCH.md (read on demand via scratch_read)
  message → sends payload as-is to notification channel
  brief   → calls LLM (no tools) with payload as prompt, sends response
            (good for morning briefings, summaries, proactive check-ins)
  task    → runs full orchestrator tool loop with payload as the user request,
            sends Claude's response to notification channel
            (good for agentic scheduled work: research, file updates, checks)
            Tools that require confirmation are skipped — pre-approve them
            in Settings → Tools to allow them in scheduled tasks.
"""
import logging
from datetime import datetime, timezone
from pathlib import Path
from persona import persona_path as _persona_path
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Time of day used when a schedule string does not spell out HH:MM.
_DEFAULT_HOUR = 9
_DEFAULT_MINUTE = 0

# Accepted day-of-week spellings (lower-case) mapped to the three-letter
# abbreviation that parse_schedule() returns under "day_of_week".
_DOW = {
    **{abbr: abbr for abbr in ("mon", "tue", "wed", "thu", "fri", "sat", "sun")},
    **{
        full: full[:3]
        for full in (
            "monday", "tuesday", "wednesday", "thursday",
            "friday", "saturday", "sunday",
        )
    },
}
# ---------------------------------------------------------------------------
# Storage
# ---------------------------------------------------------------------------
def crons_path(username: str | None = None, persona: str | None = None) -> Path:
    """
    Return the path of the CRONS.json file for a user/persona pair.

    Both arguments are handed unchanged to ``_persona_path``; the file name
    ``CRONS.json`` is appended to whatever directory it returns.
    """
    persona_dir = _persona_path(username, persona)
    return persona_dir / "CRONS.json"
def load_crons(username: str | None = None, persona: str | None = None) -> list[dict]:
    """
    Load the job list stored in CRONS.json for a user/persona pair.

    Returns an empty list when the file does not exist. An unreadable file,
    invalid JSON, or a JSON document whose root is not a list also yields an
    empty list, but is logged as a warning so that a damaged file is not
    mistaken for "no jobs configured".
    """
    import json

    p = crons_path(username, persona)
    try:
        data = json.loads(p.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # No jobs have been saved yet — the normal first-run case.
        return []
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("cron: could not load %s: %s", p, e)
        return []
    if not isinstance(data, list):
        logger.warning(
            "cron: %s does not contain a JSON list (got %s); ignoring it",
            p, type(data).__name__,
        )
        return []
    return data
def save_crons(crons: list[dict],
               username: str | None = None,
               persona: str | None = None) -> None:
    """
    Write *crons* to CRONS.json for a user/persona pair.

    The JSON is first written to a sibling ``CRONS.json.tmp`` file and then
    moved over the real file, so an interrupted write cannot leave a
    half-written CRONS.json behind (load_crons() would read that as an empty
    job list).

    Raises:
        OSError: if the temporary file cannot be written or moved into place.
        TypeError: if *crons* contains values json cannot serialise.
    """
    import json

    target = crons_path(username, persona)
    # Serialise before touching the disk so a bad value never creates a file.
    text = json.dumps(crons, indent=2) + "\n"
    tmp = target.with_name(target.name + ".tmp")
    try:
        tmp.write_text(text, encoding="utf-8")
        tmp.replace(target)
    except OSError:
        # Best-effort cleanup of the partial temp file; the error still propagates.
        tmp.unlink(missing_ok=True)
        raise
# ---------------------------------------------------------------------------
# Schedule parsing
# ---------------------------------------------------------------------------
def parse_schedule(schedule: str) -> dict:
    """
    Convert a human schedule string to APScheduler cron kwargs.

    Matching is case-insensitive and ignores surrounding whitespace.

    Formats:
      "hourly"              → every hour at :00
      "daily"               → every day at 09:00
      "daily:HH:MM"         → every day at HH:MM
      "weekly:DOW"          → every DOW at 09:00
      "weekly:DOW:HH:MM"    → every DOW at HH:MM
      "monthly"             → 1st of every month at 09:00
      "monthly:DD"          → day DD of every month at 09:00
      "monthly:DD:HH:MM"    → day DD of every month at HH:MM
      "yearly:MM:DD"        → every year on MM/DD at 09:00 (birthdays, anniversaries)
      "yearly:MM:DD:HH:MM"  → every year on MM/DD at HH:MM

    Returns:
        A dict with some of the keys "month", "day", "day_of_week", "hour",
        "minute".

    Raises:
        ValueError: if the string matches none of the formats above. This
            includes a time portion that is not exactly HH:MM — such input is
            rejected rather than silently replaced by the 09:00 default.
    """
    s = schedule.strip().lower()

    if s == "hourly":
        return {"minute": 0}

    if s == "daily":
        return {"hour": _DEFAULT_HOUR, "minute": _DEFAULT_MINUTE}

    if s.startswith("daily:"):
        h, m = _parse_hhmm(s[6:], schedule)
        return {"hour": h, "minute": m}

    if s.startswith("weekly:"):
        dow_name, *time_parts = s[7:].split(":")
        dow = _DOW.get(dow_name)
        if not dow:
            raise ValueError(
                f"Unknown day of week {dow_name!r}. "
                f"Use: mon tue wed thu fri sat sun"
            )
        h, m = _optional_hhmm(time_parts, schedule, "weekly:DOW or weekly:DOW:HH:MM")
        return {"day_of_week": dow, "hour": h, "minute": m}

    # Require the exact keyword or the "monthly:" prefix so that strings such
    # as "monthly15" are reported as unrecognised instead of being parsed.
    if s == "monthly" or s.startswith("monthly:"):
        rest = s[8:]
        if not rest:
            return {"day": 1, "hour": _DEFAULT_HOUR, "minute": _DEFAULT_MINUTE}
        day_text, *time_parts = rest.split(":")
        day = _parse_day(day_text, schedule)
        h, m = _optional_hhmm(
            time_parts, schedule, "monthly | monthly:DD | monthly:DD:HH:MM"
        )
        return {"day": day, "hour": h, "minute": m}

    if s.startswith("yearly:"):
        rest = s[7:].split(":")
        if len(rest) < 2:
            raise ValueError(
                f"yearly requires at least MM:DD in {schedule!r}. "
                f"Example: yearly:03:15 or yearly:03:15:09:00"
            )
        month_text, day_text, *time_parts = rest
        month = _parse_month(month_text, schedule)
        day = _parse_day(day_text, schedule)
        h, m = _optional_hhmm(
            time_parts, schedule, "yearly:MM:DD or yearly:MM:DD:HH:MM"
        )
        return {"month": month, "day": day, "hour": h, "minute": m}

    raise ValueError(
        f"Unrecognised schedule {schedule!r}. "
        f"Valid formats: hourly | daily | daily:HH:MM | weekly:DOW | weekly:DOW:HH:MM | "
        f"monthly | monthly:DD | monthly:DD:HH:MM | yearly:MM:DD | yearly:MM:DD:HH:MM"
    )


def _optional_hhmm(time_parts: list[str], original: str, usage: str) -> tuple[int, int]:
    """Return (hour, minute) from an optional trailing HH:MM; default when absent."""
    if not time_parts:
        return _DEFAULT_HOUR, _DEFAULT_MINUTE
    if len(time_parts) != 2:
        raise ValueError(f"Expected {usage} in {original!r}")
    return _parse_hhmm(":".join(time_parts), original)
def _parse_hhmm(s: str, original: str) -> tuple[int, int]:
    """
    Parse an "HH:MM" string into an (hour, minute) tuple.

    Args:
        s: the time portion, e.g. "09:30".
        original: the full schedule string, quoted in error messages only.

    Raises:
        ValueError: if *s* is not two colon-separated integers, or the hour
            is outside 0-23, or the minute is outside 0-59.
    """
    parts = s.split(":")
    if len(parts) != 2:
        raise ValueError(f"Expected HH:MM in {original!r}, got {s!r}")
    try:
        hour, minute = int(parts[0]), int(parts[1])
    except ValueError:
        # Replace int()'s "invalid literal" text with one naming the schedule.
        raise ValueError(f"Expected HH:MM in {original!r}, got {s!r}") from None
    if not 0 <= hour <= 23:
        raise ValueError(f"Hour must be 0-23 in {original!r}, got {hour}")
    if not 0 <= minute <= 59:
        raise ValueError(f"Minute must be 0-59 in {original!r}, got {minute}")
    return hour, minute
def _parse_day(s: str, original: str) -> int:
    """
    Parse a day-of-month string and return it as an int in 1-31.

    Args:
        s: the day portion, e.g. "15".
        original: the full schedule string, quoted in error messages only.

    Raises:
        ValueError: if *s* is not an integer or is outside 1-31.
    """
    try:
        d = int(s)
    except ValueError:
        raise ValueError(
            f"Expected day number (1-31) in {original!r}, got {s!r}"
        ) from None
    if not 1 <= d <= 31:
        raise ValueError(f"Day must be 1-31 in {original!r}, got {d}")
    return d
def _parse_month(s: str, original: str) -> int:
    """
    Parse a month-number string and return it as an int in 1-12.

    Args:
        s: the month portion, e.g. "03".
        original: the full schedule string, quoted in error messages only.

    Raises:
        ValueError: if *s* is not an integer or is outside 1-12.
    """
    try:
        m = int(s)
    except ValueError:
        raise ValueError(
            f"Expected month number (1-12) in {original!r}, got {s!r}"
        ) from None
    if not 1 <= m <= 12:
        raise ValueError(f"Month must be 1-12 in {original!r}, got {m}")
    return m
# ---------------------------------------------------------------------------
# Execution
# ---------------------------------------------------------------------------
def _now_label() -> str:
    """Return the current UTC time formatted as 'YYYY-MM-DD HH:MM UTC'."""
    utc_now = datetime.now(tz=timezone.utc)
    return f"{utc_now:%Y-%m-%d %H:%M} UTC"
async def run_job(job: dict) -> None:
    """
    Execute a cron job. Called by APScheduler when the job fires.

    Dispatches on ``job["type"]``:
      remind  → append a dated section to REMINDERS.md in the persona directory
      note    → append a dated section to SCRATCH.md in the persona directory
      message → send the payload unchanged through ``notify``
      brief   → send the payload to the LLM and notify with its reply
      task    → run the orchestrator with the payload as the task and notify
                with its response, or with a "requires confirmation" notice
                when the run stops at a checkpoint

    After any known type has been handled, ``last_run`` is updated for the
    matching entry in CRONS.json. An unknown type is logged and nothing is
    recorded.

    NOTE: errors inside the ``brief`` and ``task`` branches are logged, not
    raised, and ``last_run`` is still recorded afterwards.
    """
    job_type = job.get("type")
    # `or ""` also covers a payload that is present but null.
    payload = (job.get("payload") or "").strip()
    label = job.get("label", job.get("id", "cron"))
    p_root = _persona_path(job.get("user"), job.get("persona"))

    if job_type == "remind":
        _append_section(p_root / "REMINDERS.md", label, payload)
        logger.info("cron [remind] fired: %s", label)

    elif job_type == "note":
        _append_section(p_root / "SCRATCH.md", label, payload)
        logger.info("cron [note] fired: %s", label)

    elif job_type == "message":
        # Send payload text directly to the user's notification channel
        from notification import notify
        from config import settings as _s

        # Same fallback user as the brief/task branches (was a hard-coded name).
        username = job.get("user") or _s.user_name.lower()
        channel = job.get("channel") or None
        await notify(username, payload, channel=channel)
        logger.info("cron [message] sent: %s", label)

    elif job_type == "brief":
        # Run LLM with payload as the prompt, send response to notification channel.
        # Great for morning briefings, reminders, proactive check-ins.
        from context_loader import load_context
        from llm_client import complete
        from notification import notify
        from persona import set_context
        from config import settings as _s

        username = job.get("user") or _s.user_name.lower()
        persona_nm = job.get("persona") or _s.agent_name.lower()
        channel = job.get("channel") or None
        set_context(username, persona_nm)
        system_prompt = load_context(2)  # tier 2: identity + memory + user profile
        try:
            response_text, backend = await complete(
                system_prompt=system_prompt,
                messages=[{"role": "user", "content": payload}],
                role="chat",
            )
            await notify(username, response_text, channel=channel)
            logger.info("cron [brief] sent via %s: %s", backend, label)
        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception("cron [brief] LLM error for %s: %s", label, e)

    elif job_type == "task":
        # Run the full orchestrator tool loop, send Claude's response to the
        # notification channel. Tools that require confirmation are skipped in
        # cron context — the user is notified to pre-approve them.
        from orchestrator_engine import run as _orch_run
        from context_loader import load_context
        from notification import notify
        from persona import set_context
        from auth_utils import get_user_gemini_key, get_tool_policy, get_risk_policy
        from config import settings as _s

        username = job.get("user") or _s.user_name.lower()
        persona_nm = job.get("persona") or _s.agent_name.lower()
        channel = job.get("channel") or None
        set_context(username, persona_nm)
        system_prompt = load_context(2)
        policy = get_tool_policy(username)
        max_risk, whitelist, blacklist = get_risk_policy(username)
        gemini_key = get_user_gemini_key(username)
        try:
            result = await _orch_run(
                task=payload,
                system_prompt=system_prompt,
                gemini_api_key=gemini_key,
                respond_with_claude=True,
                confirm_allow=set(policy.get("allow") or []),
                confirm_deny=set(policy.get("deny") or []),
                max_risk=max_risk,
                risk_whitelist=whitelist,
                risk_blacklist=blacklist,
            )
            if result.checkpoint:
                tool_name = (result.checkpoint.pending_calls[0].name
                             if result.checkpoint.pending_calls else "unknown tool")
                msg = (
                    f"Scheduled task '{label}' paused — "
                    f"'{tool_name}' requires confirmation. "
                    "Pre-approve it in Settings → Tools to allow it in scheduled tasks."
                )
                await notify(username, msg, channel=channel)
                logger.warning("cron [task] %s: confirmation required for %s", label, tool_name)
            else:
                await notify(username, result.response, channel=channel)
                logger.info("cron [task] completed via %s: %s", result.backend, label)
        except Exception as e:
            logger.exception("cron [task] error for %s: %s", label, e)

    else:
        logger.warning("cron: unknown type %r (job %s)", job_type, job.get("id"))
        return

    _record_last_run(job)


def _append_section(path: Path, label: str, payload: str) -> None:
    """Append a '## label — timestamp' section containing *payload* to *path*."""
    section = f"\n## {label} — {_now_label()}\n\n{payload}\n"
    existing = path.read_text(encoding="utf-8") if path.exists() else ""
    path.write_text(existing.rstrip() + "\n" + section, encoding="utf-8")


def _record_last_run(job: dict) -> None:
    """Stamp the current UTC time as last_run on the job's entry in CRONS.json."""
    u, p = job.get("user"), job.get("persona")
    job_id = job.get("id")
    if job_id is None:
        logger.warning("cron: job has no id; last_run not recorded")
        return
    crons = load_crons(u, p)
    for c in crons:
        if isinstance(c, dict) and c.get("id") == job_id:
            c["last_run"] = datetime.now(timezone.utc).isoformat()
            save_crons(crons, u, p)
            return
    # Nothing matched: leave the file alone. Writing here would replace it
    # with an empty list whenever load_crons() could not read it.
    logger.warning("cron: job %s not found in CRONS.json; last_run not recorded", job_id)