Adds task_list, task_create, task_update, task_complete orchestrator tools backed by inara/TASKS.json — private to each agent instance. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
136 lines
4.0 KiB
Python
136 lines
4.0 KiB
Python
"""
|
|
Personal task management tools for Inara.
|
|
|
|
Tasks are stored in inara/TASKS.json — private to each agent instance.
|
|
Schema per task:
|
|
{
|
|
"id": short random string,
|
|
"title": str,
|
|
"description": str | None,
|
|
"status": "todo" | "in_progress" | "done",
|
|
"priority": "low" | "normal" | "high",
|
|
"created_at": ISO 8601,
|
|
"updated_at": ISO 8601
|
|
}
|
|
"""
|
|
|
|
import json
|
|
import secrets
|
|
import asyncio
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from config import settings
|
|
|
|
|
|
def _tasks_path() -> Path:
|
|
return settings.inara_path() / "TASKS.json"
|
|
|
|
|
|
def _now() -> str:
|
|
return datetime.now(timezone.utc).isoformat()
|
|
|
|
|
|
def _load() -> list[dict]:
|
|
p = _tasks_path()
|
|
if not p.exists():
|
|
return []
|
|
try:
|
|
return json.loads(p.read_text())
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
def _save(tasks: list[dict]) -> None:
|
|
_tasks_path().write_text(json.dumps(tasks, indent=2) + "\n")
|
|
|
|
|
|
def _short_id() -> str:
|
|
return "t_" + secrets.token_urlsafe(6)
|
|
|
|
|
|
def _format_task(t: dict) -> str:
|
|
pri = f"[{t['priority']}]" if t.get("priority") != "normal" else ""
|
|
desc = f"\n {t['description']}" if t.get("description") else ""
|
|
return f"• [{t['status']}] {t['id']} {pri} {t['title']}{desc}".strip()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Sync implementations — called via asyncio.to_thread
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _task_list(status: str | None) -> str:
|
|
tasks = _load()
|
|
if status:
|
|
tasks = [t for t in tasks if t["status"] == status]
|
|
if not tasks:
|
|
label = f"No {status} tasks." if status else "No tasks yet."
|
|
return label
|
|
lines = [f"Tasks ({len(tasks)}):\n"]
|
|
for t in tasks:
|
|
lines.append(_format_task(t))
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _task_create(title: str, description: str | None, priority: str) -> str:
|
|
if priority not in ("low", "normal", "high"):
|
|
priority = "normal"
|
|
tasks = _load()
|
|
task = {
|
|
"id": _short_id(),
|
|
"title": title,
|
|
"description": description,
|
|
"status": "todo",
|
|
"priority": priority,
|
|
"created_at": _now(),
|
|
"updated_at": _now(),
|
|
}
|
|
tasks.append(task)
|
|
_save(tasks)
|
|
return f"Created: {_format_task(task)}"
|
|
|
|
|
|
def _task_update(task_id: str, status: str | None, title: str | None,
|
|
description: str | None, priority: str | None) -> str:
|
|
tasks = _load()
|
|
for t in tasks:
|
|
if t["id"] == task_id:
|
|
if status and status in ("todo", "in_progress", "done"):
|
|
t["status"] = status
|
|
if title:
|
|
t["title"] = title
|
|
if description is not None:
|
|
t["description"] = description
|
|
if priority and priority in ("low", "normal", "high"):
|
|
t["priority"] = priority
|
|
t["updated_at"] = _now()
|
|
_save(tasks)
|
|
return f"Updated: {_format_task(t)}"
|
|
return f"Task not found: {task_id}"
|
|
|
|
|
|
def _task_complete(task_id: str) -> str:
|
|
return _task_update(task_id, status="done", title=None, description=None, priority=None)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Async wrappers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def task_list(status: str | None = None) -> str:
|
|
return await asyncio.to_thread(_task_list, status)
|
|
|
|
|
|
async def task_create(title: str, description: str | None = None,
|
|
priority: str = "normal") -> str:
|
|
return await asyncio.to_thread(_task_create, title, description, priority)
|
|
|
|
|
|
async def task_update(task_id: str, status: str | None = None, title: str | None = None,
|
|
description: str | None = None, priority: str | None = None) -> str:
|
|
return await asyncio.to_thread(_task_update, task_id, status, title, description, priority)
|
|
|
|
|
|
async def task_complete(task_id: str) -> str:
|
|
return await asyncio.to_thread(_task_complete, task_id)
|