"""
Cron job management tools for Inara.

Jobs are stored in inara/CRONS.json and registered with the live APScheduler
instance so they survive restarts and take effect immediately without a restart.

Tools:
  cron_list       — show all scheduled jobs
  cron_add        — create a job and register it immediately
  cron_remove     — delete a job and unschedule it
  cron_toggle     — pause or resume a job without deleting it
  reminders_clear — erase inara/REMINDERS.md (dismiss all pending reminders)
"""

import asyncio
import secrets
from datetime import datetime, timezone
from pathlib import Path

from persona import persona_path, get_user, get_persona
from cron_runner import load_crons, save_crons, parse_schedule


def _now() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _short_id() -> str:
    """Return a new job id: the prefix ``c_`` plus a URL-safe token of 6 random bytes."""
    return "c_" + secrets.token_urlsafe(6)


# ---------------------------------------------------------------------------
# Sync implementations
# ---------------------------------------------------------------------------

def _cron_list() -> str:
    """Return a human-readable listing of the current user/persona's crons.

    One line is produced per stored job. Fields missing from a stored record
    are shown as ``?`` (or an empty label) so that a single malformed entry
    does not make the whole listing fail.
    """
    crons = load_crons(get_user(), get_persona())
    if not crons:
        return "No crons scheduled."

    lines = [f"Crons ({len(crons)}):\n"]
    for c in crons:
        # "PAUSED " carries a trailing space so both statuses are 7 chars wide.
        status = "enabled" if c.get("enabled", True) else "PAUSED "
        last = c.get("last_run")
        # Only the leading date portion of the stored timestamp is shown.
        last_str = str(last)[:10] if last else "never"
        cron_id = c.get("id", "?")
        schedule = str(c.get("schedule", "?"))
        job_type = str(c.get("type", "?"))
        label = c.get("label", "")
        lines.append(
            f" {cron_id} [{status}] {schedule:<18} "
            f"{job_type:<7} {label} (last: {last_str})"
        )
    return "\n".join(lines)


def _cron_add(label: str, schedule: str, job_type: str, payload: str) -> str:
    """Create a job, persist it, and register it with the live scheduler.

    Args:
        label: Display name stored with the job.
        schedule: Schedule expression; validated via ``parse_schedule``.
        job_type: Must be ``"remind"`` or ``"note"``.
        payload: Stored with the job unchanged.

    Returns:
        A confirmation message, or a ``Bad schedule: ...`` / ``Bad type: ...``
        message when validation fails (nothing is saved in that case).
    """
    # Validate schedule first — raises ValueError with a clear message on bad input
    try:
        sched_kwargs = parse_schedule(schedule)
    except ValueError as e:
        return f"Bad schedule: {e}"

    if job_type not in ("remind", "note"):
        return "Bad type: must be 'remind' or 'note'."

    current_user = get_user()
    current_persona = get_persona()
    crons = load_crons(current_user, current_persona)
    job = {
        "id": _short_id(),
        "user": current_user,
        "persona": current_persona,
        "label": label,
        "schedule": schedule,
        "type": job_type,
        "payload": payload,
        "enabled": True,
        "created_at": _now(),
        "last_run": None,
    }
    crons.append(job)
    save_crons(crons, current_user, current_persona)

    # Register with the live scheduler
    _scheduler_add(job, sched_kwargs)

    return f"Created and scheduled: {job['id']} {schedule} [{job_type}] {label} (user: {current_user}, persona: {current_persona})"


def _cron_remove(cron_id: str) -> str:
    """Delete the job with id ``cron_id`` from storage and unschedule it.

    Returns ``Not found: <id>`` (and saves nothing) when no stored job has
    that id.
    """
    user = get_user()
    persona = get_persona()
    crons = load_crons(user, persona)

    # .get() so a stored record lacking an "id" is simply kept, not a KeyError.
    remaining = [c for c in crons if c.get("id") != cron_id]
    if len(remaining) == len(crons):
        return f"Not found: {cron_id}"

    save_crons(remaining, user, persona)
    _scheduler_remove(f"{user}:{persona}:{cron_id}")
    return f"Removed: {cron_id}"


def _cron_toggle(cron_id: str) -> str:
    """Flip the ``enabled`` flag of a job and pause/resume it in the scheduler.

    Returns ``Resumed: ...`` or ``Paused: ...`` on success, or
    ``Not found: <id>`` when no stored job has that id.
    """
    user = get_user()
    persona = get_persona()
    crons = load_crons(user, persona)

    for c in crons:
        if c.get("id") != cron_id:
            continue

        c["enabled"] = not c.get("enabled", True)
        save_crons(crons, user, persona)

        sched_id = f"{user}:{persona}:{cron_id}"
        # NOTE(review): resuming presumably requires the job to already be
        # registered with the scheduler — confirm paused jobs are registered
        # at startup.
        if c["enabled"]:
            action = "resumed"
            _scheduler_resume(sched_id)
        else:
            action = "paused"
            _scheduler_pause(sched_id)
        return f"{action.capitalize()}: {cron_id} {c.get('label', '')}"

    return f"Not found: {cron_id}"


def _reminders_clear() -> str:
    """Truncate REMINDERS.md under ``persona_path()`` to an empty file."""
    p = persona_path() / "REMINDERS.md"
    p.write_text("", encoding="utf-8")
    return "Reminders cleared."


# ---------------------------------------------------------------------------
# Scheduler bridge — thin wrappers so the tool layer never touches APScheduler
# directly, keeping it swappable
# ---------------------------------------------------------------------------

def _bridge_logger():
    """Return this module's logger (imported lazily, only on failure paths)."""
    import logging

    return logging.getLogger(__name__)


def _live_scheduler():
    """Return the scheduler from ``scheduler.get_scheduler()`` if it is running.

    Returns ``None`` when there is no scheduler or it is not running. The
    ``scheduler`` module is imported lazily, so an import failure surfaces
    as an exception here; callers invoke this inside their ``try`` block.
    """
    import scheduler as sched_module

    s = sched_module.get_scheduler()
    return s if s and s.running else None


def _scheduler_job_op(operation: str, job_id: str) -> None:
    """Best-effort call of ``<scheduler>.<operation>(job_id)``.

    Does nothing when no scheduler is running. Any failure is logged as a
    warning and never propagated.
    """
    try:
        s = _live_scheduler()
        if s is not None:
            getattr(s, operation)(job_id)
    except Exception as e:
        _bridge_logger().warning("scheduler %s failed for %s: %s", operation, job_id, e)


def _scheduler_add(job: dict, sched_kwargs: dict) -> None:
    """Register ``job`` with the running scheduler as a ``"cron"`` trigger job.

    Args:
        job: Job record; must contain ``"id"``. ``"user"`` and ``"persona"``
            fall back to ``'scott'`` / ``'inara'`` when absent.
        sched_kwargs: Keyword arguments forwarded to ``add_job``.

    Best-effort: does nothing when no scheduler is running; any failure is
    logged as a warning and never propagated.
    """
    try:
        s = _live_scheduler()
        from cron_runner import run_job

        if s is None:
            return

        sched_id = f"{job.get('user', 'scott')}:{job.get('persona', 'inara')}:{job['id']}"
        # NOTE(review): the scheduled callable is a plain lambda, not a
        # coroutine function; ensure_future needs an event loop in whichever
        # thread the scheduler's executor invokes it on — confirm.
        s.add_job(
            lambda j=job: asyncio.ensure_future(run_job(j)),  # j=job binds the record now
            "cron",
            id=sched_id,
            replace_existing=True,
            **sched_kwargs,
        )
    except Exception as e:
        _bridge_logger().warning("scheduler_add failed: %s", e)


def _scheduler_remove(job_id: str) -> None:
    """Best-effort removal of ``job_id`` from the running scheduler."""
    _scheduler_job_op("remove_job", job_id)


def _scheduler_pause(job_id: str) -> None:
    """Best-effort pause of ``job_id`` in the running scheduler."""
    _scheduler_job_op("pause_job", job_id)


def _scheduler_resume(job_id: str) -> None:
    """Best-effort resume of ``job_id`` in the running scheduler."""
    _scheduler_job_op("resume_job", job_id)


# ---------------------------------------------------------------------------
# Async wrappers
# ---------------------------------------------------------------------------

async def cron_list() -> str:
    """Run ``_cron_list`` in a worker thread and return its message."""
    return await asyncio.to_thread(_cron_list)


async def cron_add(label: str, schedule: str, job_type: str, payload: str) -> str:
    """Run ``_cron_add`` in a worker thread and return its message."""
    return await asyncio.to_thread(_cron_add, label, schedule, job_type, payload)


async def cron_remove(cron_id: str) -> str:
    """Run ``_cron_remove`` in a worker thread and return its message."""
    return await asyncio.to_thread(_cron_remove, cron_id)


async def cron_toggle(cron_id: str) -> str:
    """Run ``_cron_toggle`` in a worker thread and return its message."""
    return await asyncio.to_thread(_cron_toggle, cron_id)


async def reminders_clear() -> str:
    """Run ``_reminders_clear`` in a worker thread and return its message."""
    return await asyncio.to_thread(_reminders_clear)