- Schedules web UI (/settings/crons): list, add, edit, pause/resume, delete jobs - cron task type: full orchestrator tool loop on a schedule, result → notification channel - parse_schedule: monthly/yearly formats (monthly:DD:HH:MM, yearly:MM:DD:HH:MM) - HA inbound webhook tools toggle: orchestrator loop vs. direct LLM, configurable in UI - ae_db_query/describe/show_view: SELECT-only Aether MariaDB access (admin, per-user creds) - /settings/integrations: admin-only page for Aether DB credentials - Schedules nav link added to all settings pages - pymysql added to requirements - Docs updated: HELP.md, MASTER.md, CLAUDE.md Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
310 lines
12 KiB
Python
310 lines
12 KiB
Python
"""
|
||
Cron job storage and execution.
|
||
|
||
Handles reading/writing CRONS.json and running jobs when they fire.
|
||
Imported by scheduler.py (to load jobs at startup) and tools/cron.py
|
||
(to add/remove jobs at runtime).
|
||
|
||
Job schema:
|
||
{
|
||
"id": "c_abc123",
|
||
"label": "Human-readable name",
|
||
"schedule": "daily:09:00", # see parse_schedule() for all formats
|
||
"type": "remind" | "note" | "message" | "brief" | "task",
|
||
"payload": "Text or prompt when the job fires",
|
||
"channel": null | "nextcloud" | "google_chat", # for message/brief/task types
|
||
"enabled": true,
|
||
"created_at": "ISO 8601",
|
||
"last_run": null | "ISO 8601"
|
||
}
|
||
|
||
Job types:
|
||
remind → appends to REMINDERS.md (auto-loaded into context at tier 2+)
|
||
note → appends to SCRATCH.md (read on demand via scratch_read)
|
||
message → sends payload as-is to notification channel
|
||
brief → calls LLM (no tools) with payload as prompt, sends response
|
||
(good for morning briefings, summaries, proactive check-ins)
|
||
task → runs full orchestrator tool loop with payload as the user request,
|
||
sends Claude's response to notification channel
|
||
(good for agentic scheduled work: research, file updates, checks)
|
||
Tools that require confirmation are skipped — pre-approve them
|
||
in Settings → Tools to allow them in scheduled tasks.
|
||
"""
|
||
|
||
import logging
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
|
||
from persona import persona_path as _persona_path
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Time of day used by parse_schedule() when a schedule string gives no HH:MM.
_DEFAULT_HOUR = 9
_DEFAULT_MINUTE = 0

# Accepted day-of-week spellings (parse_schedule() lower-cases its input
# before the lookup) mapped to the three-letter abbreviation that is returned
# under the "day_of_week" key.
_DOW = {
    "mon": "mon", "tue": "tue", "wed": "wed", "thu": "thu",
    "fri": "fri", "sat": "sat", "sun": "sun",
    "monday": "mon", "tuesday": "tue", "wednesday": "wed",
    "thursday": "thu", "friday": "fri", "saturday": "sat", "sunday": "sun",
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Storage
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def crons_path(username: str | None = None, persona: str | None = None) -> Path:
    """Return the path of CRONS.json inside the given persona's directory.

    Both arguments are forwarded unchanged to ``persona_path``; how ``None``
    values are resolved is decided there.
    """
    persona_dir = _persona_path(username, persona)
    return persona_dir / "CRONS.json"
|
||
|
||
|
||
def load_crons(username: str | None = None, persona: str | None = None) -> list[dict]:
    """Load the list of cron jobs from the persona's CRONS.json.

    Loading is best-effort: an empty list is returned when the file does not
    exist, cannot be read, is not valid JSON, or does not contain a JSON
    array. Every case except "file does not exist" is logged so that a
    damaged file does not silently look like "no jobs configured".

    Args:
        username: Owner of the persona directory (forwarded to crons_path).
        persona: Persona name (forwarded to crons_path).

    Returns:
        The parsed job list, or ``[]`` on any of the failures above.
    """
    import json

    path = crons_path(username, persona)
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # No jobs have been saved for this persona yet.
        return []
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("cron: could not load %s: %s", path, exc)
        return []

    if not isinstance(data, list):
        logger.warning(
            "cron: %s holds a JSON %s, expected a list; ignoring it",
            path, type(data).__name__,
        )
        return []
    return data
|
||
|
||
|
||
def save_crons(crons: list[dict],
               username: str | None = None,
               persona: str | None = None) -> None:
    """Write the cron job list to the persona's CRONS.json.

    The JSON is written to a sibling temporary file first and then moved over
    the real file, so an interrupted write cannot leave a truncated
    CRONS.json behind.

    Args:
        crons: Job dictionaries to persist (must be JSON-serialisable).
        username: Owner of the persona directory (forwarded to crons_path).
        persona: Persona name (forwarded to crons_path).

    Raises:
        TypeError: If ``crons`` contains values json cannot serialise.
        OSError: If the file cannot be written or replaced.
    """
    import json

    target = crons_path(username, persona)
    # Serialise before touching the disk so a bad value cannot corrupt the file.
    serialized = json.dumps(crons, indent=2) + "\n"

    tmp = target.with_name(target.name + ".tmp")
    try:
        tmp.write_text(serialized, encoding="utf-8")
        tmp.replace(target)
    except OSError:
        # Do not leave a stray temp file next to CRONS.json.
        tmp.unlink(missing_ok=True)
        raise
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Schedule parsing
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def parse_schedule(schedule: str) -> dict:
    """
    Convert a human schedule string to APScheduler cron kwargs.

    Matching is case-insensitive and ignores surrounding whitespace.

    Formats:
      "hourly"                → every hour at :00
      "daily"                 → every day at 09:00
      "daily:HH:MM"           → every day at HH:MM
      "weekly:DOW"            → every DOW at 09:00
      "weekly:DOW:HH:MM"      → every DOW at HH:MM
      "monthly"               → 1st of every month at 09:00
      "monthly:DD"            → day DD of every month at 09:00
      "monthly:DD:HH:MM"      → day DD of every month at HH:MM
      "yearly:MM:DD"          → every year on MM/DD at 09:00 (birthdays, anniversaries)
      "yearly:MM:DD:HH:MM"    → every year on MM/DD at HH:MM

    Returns:
        A dict with some of the keys "month", "day", "day_of_week", "hour"
        and "minute".

    Raises:
        ValueError: If the string matches none of the formats above. This
            includes strings with a partial time such as "monthly:15:10",
            which are rejected rather than silently run at the default time.
    """
    s = schedule.strip().lower()

    if s == "hourly":
        return {"minute": 0}

    if s == "daily":
        return {"hour": _DEFAULT_HOUR, "minute": _DEFAULT_MINUTE}

    if s.startswith("daily:"):
        h, m = _parse_hhmm(s[6:], schedule)
        return {"hour": h, "minute": m}

    if s.startswith("weekly:"):
        rest = s[7:].split(":")
        # Either DOW alone or DOW plus a complete HH:MM.
        if len(rest) not in (1, 3):
            raise ValueError(
                f"Expected weekly:DOW or weekly:DOW:HH:MM, got {schedule!r}"
            )
        dow = _DOW.get(rest[0])
        if not dow:
            raise ValueError(
                f"Unknown day of week {rest[0]!r}. "
                f"Use: mon tue wed thu fri sat sun"
            )
        if len(rest) == 3:
            h, m = _parse_hhmm(f"{rest[1]}:{rest[2]}", schedule)
        else:
            h, m = _DEFAULT_HOUR, _DEFAULT_MINUTE
        return {"day_of_week": dow, "hour": h, "minute": m}

    # A bare "monthly:" (nothing after the colon) is treated like "monthly".
    if s in ("monthly", "monthly:"):
        return {"day": 1, "hour": _DEFAULT_HOUR, "minute": _DEFAULT_MINUTE}

    if s.startswith("monthly:"):
        parts = s[8:].split(":")
        # Either DD alone or DD plus a complete HH:MM.
        if len(parts) not in (1, 3):
            raise ValueError(
                f"Expected monthly:DD or monthly:DD:HH:MM, got {schedule!r}"
            )
        day = _parse_day(parts[0], schedule)
        if len(parts) == 3:
            h, m = _parse_hhmm(f"{parts[1]}:{parts[2]}", schedule)
        else:
            h, m = _DEFAULT_HOUR, _DEFAULT_MINUTE
        return {"day": day, "hour": h, "minute": m}

    if s.startswith("yearly:"):
        rest = s[7:].split(":")
        if len(rest) < 2:
            raise ValueError(
                f"yearly requires at least MM:DD in {schedule!r}. "
                f"Example: yearly:03:15 or yearly:03:15:09:00"
            )
        # Either MM:DD alone or MM:DD plus a complete HH:MM.
        if len(rest) not in (2, 4):
            raise ValueError(
                f"Expected yearly:MM:DD or yearly:MM:DD:HH:MM, got {schedule!r}"
            )
        month = _parse_month(rest[0], schedule)
        day = _parse_day(rest[1], schedule)
        if len(rest) == 4:
            h, m = _parse_hhmm(f"{rest[2]}:{rest[3]}", schedule)
        else:
            h, m = _DEFAULT_HOUR, _DEFAULT_MINUTE
        return {"month": month, "day": day, "hour": h, "minute": m}

    raise ValueError(
        f"Unrecognised schedule {schedule!r}. "
        f"Valid formats: hourly | daily | daily:HH:MM | weekly:DOW | weekly:DOW:HH:MM | "
        f"monthly | monthly:DD | monthly:DD:HH:MM | yearly:MM:DD | yearly:MM:DD:HH:MM"
    )
|
||
|
||
|
||
def _parse_hhmm(s: str, original: str) -> tuple[int, int]:
|
||
parts = s.split(":")
|
||
if len(parts) != 2:
|
||
raise ValueError(f"Expected HH:MM in {original!r}, got {s!r}")
|
||
return int(parts[0]), int(parts[1])
|
||
|
||
|
||
def _parse_day(s: str, original: str) -> int:
|
||
try:
|
||
d = int(s)
|
||
except ValueError:
|
||
raise ValueError(f"Expected day number (1–31) in {original!r}, got {s!r}")
|
||
if not 1 <= d <= 31:
|
||
raise ValueError(f"Day must be 1–31 in {original!r}, got {d}")
|
||
return d
|
||
|
||
|
||
def _parse_month(s: str, original: str) -> int:
|
||
try:
|
||
m = int(s)
|
||
except ValueError:
|
||
raise ValueError(f"Expected month number (1–12) in {original!r}, got {s!r}")
|
||
if not 1 <= m <= 12:
|
||
raise ValueError(f"Month must be 1–12 in {original!r}, got {m}")
|
||
return m
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Execution
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _now_label() -> str:
|
||
return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
|
||
|
||
|
||
async def run_job(job: dict) -> None:
    """Execute a cron job. Called by APScheduler when the job fires.

    Dispatches on ``job["type"]``:

      remind  → append a dated section to the persona's REMINDERS.md
      note    → append a dated section to the persona's SCRATCH.md
      message → send the payload as-is via notify()
      brief   → call the LLM with the payload as prompt, send the response
      task    → run the orchestrator tool loop with the payload as the task
                and send the response; if the run stops at a confirmation
                checkpoint, send a notice naming the first pending tool

    Errors raised inside the ``brief`` and ``task`` branches are logged and
    swallowed so a failing job does not propagate into the scheduler. For
    every known job type ``last_run`` is then recorded in CRONS.json, even
    when such an error was logged. Unknown types are logged and skipped.

    Args:
        job: A job dict following the schema in the module docstring, plus
            optional "user" and "persona" keys selecting the persona.
    """
    job_type = job.get("type")
    # `or ""` guards against a job stored with "payload": null.
    payload = (job.get("payload") or "").strip()
    label = job.get("label", job.get("id", "cron"))
    section = f"\n## {label} — {_now_label()}\n\n{payload}\n"

    p_root = _persona_path(job.get("user"), job.get("persona"))

    if job_type == "remind":
        _append_section(p_root / "REMINDERS.md", section)
        logger.info("cron [remind] fired: %s", label)

    elif job_type == "note":
        _append_section(p_root / "SCRATCH.md", section)
        logger.info("cron [note] fired: %s", label)

    elif job_type == "message":
        # Send payload text directly to the user's notification channel
        from notification import notify
        from config import settings as _s

        # Same fallback as the brief/task branches: the configured user.
        username = job.get("user") or _s.user_name.lower()
        channel = job.get("channel") or None
        await notify(username, payload, channel=channel)
        logger.info("cron [message] sent: %s", label)

    elif job_type == "brief":
        # Run LLM with payload as the prompt, send response to notification channel.
        # Great for morning briefings, reminders, proactive check-ins.
        from context_loader import load_context
        from llm_client import complete
        from notification import notify
        from persona import set_context
        from config import settings as _s

        username = job.get("user") or _s.user_name.lower()
        persona_nm = job.get("persona") or _s.agent_name.lower()
        channel = job.get("channel") or None
        set_context(username, persona_nm)

        system_prompt = load_context(2)  # tier 2: identity + memory + user profile
        try:
            response_text, backend = await complete(
                system_prompt=system_prompt,
                messages=[{"role": "user", "content": payload}],
                role="chat",
            )
            await notify(username, response_text, channel=channel)
            logger.info("cron [brief] sent via %s: %s", backend, label)
        except Exception as e:
            # Boundary handler: keep the scheduler alive, but keep the traceback.
            logger.error("cron [brief] LLM error for %s: %s", label, e, exc_info=True)

    elif job_type == "task":
        # Run the full orchestrator tool loop, send Claude's response to the
        # notification channel. Tools that require confirmation are skipped in
        # cron context — the user is notified to pre-approve them.
        from orchestrator_engine import run as _orch_run
        from context_loader import load_context
        from notification import notify
        from persona import set_context
        from auth_utils import get_user_gemini_key, get_tool_policy, get_risk_policy
        from config import settings as _s

        username = job.get("user") or _s.user_name.lower()
        persona_nm = job.get("persona") or _s.agent_name.lower()
        channel = job.get("channel") or None
        set_context(username, persona_nm)

        system_prompt = load_context(2)
        policy = get_tool_policy(username)
        max_risk, whitelist, blacklist = get_risk_policy(username)
        gemini_key = get_user_gemini_key(username)

        try:
            result = await _orch_run(
                task=payload,
                system_prompt=system_prompt,
                gemini_api_key=gemini_key,
                respond_with_claude=True,
                confirm_allow=set(policy.get("allow") or []),
                confirm_deny=set(policy.get("deny") or []),
                max_risk=max_risk,
                risk_whitelist=whitelist,
                risk_blacklist=blacklist,
            )
            if result.checkpoint:
                tool_name = (result.checkpoint.pending_calls[0].name
                             if result.checkpoint.pending_calls else "unknown tool")
                msg = (
                    f"Scheduled task '{label}' paused — "
                    f"'{tool_name}' requires confirmation. "
                    "Pre-approve it in Settings → Tools to allow it in scheduled tasks."
                )
                await notify(username, msg, channel=channel)
                logger.warning("cron [task] %s: confirmation required for %s", label, tool_name)
            else:
                await notify(username, result.response, channel=channel)
                logger.info("cron [task] completed via %s: %s", result.backend, label)
        except Exception as e:
            # Boundary handler: keep the scheduler alive, but keep the traceback.
            logger.error("cron [task] error for %s: %s", label, e, exc_info=True)

    else:
        logger.warning("cron: unknown type %r (job %s)", job_type, job.get("id"))
        return

    _record_last_run(job)


def _append_section(path: Path, section: str) -> None:
    """Append *section* to the text file at *path*, creating the file if needed."""
    existing = path.read_text() if path.exists() else ""
    path.write_text(existing.rstrip() + "\n" + section)


def _record_last_run(job: dict) -> None:
    """Stamp the current UTC time as ``last_run`` on *job*'s entry in CRONS.json.

    The file is rewritten only when the job's entry is found. If the list
    could not be loaded or the job is no longer in it, nothing is written, so
    a failed load cannot overwrite CRONS.json with an empty list.
    """
    user, persona = job.get("user"), job.get("persona")
    job_id = job.get("id")
    if job_id is None:
        logger.warning("cron: job has no id; last_run not recorded")
        return

    crons = load_crons(user, persona)
    for entry in crons:
        if isinstance(entry, dict) and entry.get("id") == job_id:
            entry["last_run"] = datetime.now(timezone.utc).isoformat()
            save_crons(crons, user, persona)
            return
    logger.warning("cron: job %s not found in CRONS.json; last_run not recorded", job_id)
|