Files
Cortex-Inara/cortex/tools/ae_tasks.py
Scott Idem ed472ce9a0 feat: Intelligence Layer Phase 1 — orchestrator service
Adds the Gemini API orchestrator (ReAct tool loop → Claude responder):

Orchestrator engine + router:
- orchestrator_engine.py: Gemini API tool loop, Claude CLI handoff
- routers/orchestrator.py: POST /orchestrate (async job queue), GET /orchestrate/{job_id}

Tools (cortex/tools/):
- web.py: DuckDuckGo web search (no key required)
- ae_knowledge.py: ae_journal_search + ae_journal_entry_create (AE V3 API)
- ae_tasks.py: ae_task_list (reads agents_sync Kanban filesystem)
- files.py: file_read (path-allowlisted to safe dirs)

Config + deps:
- config.py: orchestrator, DuckDuckGo, and AE API settings
- requirements.txt: google-genai, duckduckgo-search
- .env.default: reference config with all new keys documented

Docs:
- CLAUDE.md, README.md, documentation/ added to repo
- Port references updated 7331 → 8000 throughout
- Default model updated to gemini-2.5-flash

Tested: ae_task_list, ae_journal_search, web_search all working end-to-end.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-18 19:37:49 -04:00

101 lines
3.1 KiB
Python

"""
Aether task list tool — reads the agents_sync Kanban board.
Reads task JSON files directly from the agents_sync filesystem rather than
making an HTTP call, since the tasks directory is always locally available
(synced via Syncthing). This avoids needing a separate API endpoint for tasks.
Structure:
agents_sync/tasks/01_todo/ — pending tasks
agents_sync/tasks/02_in_progress/ — active tasks
agents_sync/tasks/03_done/ — completed tasks (not included by default)
"""
import asyncio
import json
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
# Resolved at import time — agents_sync is always at ~/agents_sync on this machine.
# If the path doesn't exist the tool returns a helpful error rather than crashing.
_AGENTS_SYNC = Path.home() / "agents_sync"
_TASKS_ROOT = _AGENTS_SYNC / "tasks"
async def task_list(include_done: bool = False) -> str:
"""List tasks from the agents_sync Kanban board.
Reads the todo and in_progress buckets (and optionally done).
Returns a markdown summary grouped by status.
Args:
include_done: If True, also include completed tasks (can be noisy).
"""
return await asyncio.to_thread(_sync_task_list, include_done)
def _sync_task_list(include_done: bool) -> str:
if not _TASKS_ROOT.exists():
return f"Task directory not found: {_TASKS_ROOT}"
buckets = [
("01_todo", "Todo"),
("02_in_progress", "In Progress"),
]
if include_done:
buckets.append(("03_done", "Done"))
sections: list[str] = []
total = 0
for dir_name, label in buckets:
bucket_dir = _TASKS_ROOT / dir_name
if not bucket_dir.exists():
continue
tasks = _read_bucket(bucket_dir)
total += len(tasks)
if not tasks:
continue
lines = [f"## {label} ({len(tasks)})\n"]
for task in tasks:
title = task.get("title") or task.get("name") or "(untitled)"
assigned = task.get("assigned_to") or ""
task_id = task.get("id") or ""
desc = task.get("description") or ""
header = f"- **{title}**"
if assigned:
header += f" (assigned: {assigned})"
if task_id:
header += f" — `{task_id}`"
lines.append(header)
if desc:
# First sentence / 120 chars of description
short = desc.split(".")[0][:120]
lines.append(f" {short}")
sections.append("\n".join(lines))
if not sections:
return "No tasks found on the Kanban board."
header_line = f"# Kanban Board — {total} task(s)\n"
return header_line + "\n\n".join(sections)
def _read_bucket(bucket_dir: Path) -> list[dict]:
"""Read and parse all JSON task files in a bucket directory."""
tasks = []
for path in sorted(bucket_dir.glob("*.json")):
try:
data = json.loads(path.read_text())
tasks.append(data)
except Exception as e:
logger.warning("Failed to read task file %s: %s", path, e)
return tasks