Four new tools for full journal entry lifecycle management: - ae_journal_entry_update — PATCH any combination of fields (title, content, summary, tags, enable); only provided fields are changed - ae_journal_entry_disable — soft-delete via enable=false - ae_journal_entry_append — fetch entry, append timestamped section to the bottom (ideal for running logs / data logs) - ae_journal_entry_prepend — fetch entry, prepend timestamped section to the top (most-recent-first pattern) Shared _get_entry / _patch_entry helpers keep the read-modify-write logic DRY. Also fixed journal_entry_create to prefer the canonical journal_entry_id field over the legacy id_random alias. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
387 lines
13 KiB
Python
387 lines
13 KiB
Python
"""
|
|
Aether Platform knowledge tools — journal search, listing, and entry management.
|
|
|
|
These tools give the orchestrator read/write access to the AE Journals module,
|
|
which serves as the primary long-term knowledge base.
|
|
|
|
Auth: x-aether-api-key + x-account-id headers (same pattern as agents_sync scripts).
|
|
API: V3 CRUD — POST /v3/crud/journal_entry/search, POST /v3/crud/journal/{id}/journal_entry/
|
|
PATCH /v3/crud/journal_entry/{entry_id}, GET /v3/crud/journal_entry/{entry_id}
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _headers() -> dict:
    """Build the HTTP headers sent with every AE API request.

    The API key and account id are read from ``settings`` on each call, and
    the body is always declared as JSON.
    """
    auth = {
        "x-aether-api-key": settings.ae_api_key,
        "x-account-id": settings.ae_account_id,
    }
    return {**auth, "Content-Type": "application/json"}
|
|
|
|
|
|
def _check_config() -> str | None:
    """Report whether the AE API credentials are present.

    Returns:
        ``None`` when both ``settings.ae_api_key`` and
        ``settings.ae_account_id`` are truthy; otherwise a message telling
        the caller which environment variables to set.
    """
    if settings.ae_api_key and settings.ae_account_id:
        return None
    return (
        "AE API not configured. Set AE_API_KEY and AE_ACCOUNT_ID in .env. "
        "Values are the same as agents_sync/mcp/.env."
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool: ae_journal_search
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def journal_search(query: str, journal_id: str | None = None, max_results: int = 10) -> str:
    """Keyword search over AE Journal entries.

    The match runs against the ``default_qry_str`` field (title + content
    excerpt). Passing ``journal_id`` (the journal's id_random) restricts the
    search to that journal.

    Returns a markdown-formatted list of matching entries, or the
    configuration error message when the AE API credentials are missing.
    """
    if config_error := _check_config():
        return config_error

    # The HTTP client is blocking, so run it off the event loop.
    return await asyncio.to_thread(_sync_journal_search, query, journal_id, max_results)
|
|
|
|
|
|
def _sync_journal_search(query: str, journal_id: str | None, max_results: int) -> str:
    """Blocking implementation of the journal entry search.

    Posts an ``icontains`` filter on ``default_qry_str`` to the
    ``journal_entry/search`` endpoint and renders the hits as markdown.

    Args:
        query: Keyword to look for.
        journal_id: When truthy, sent as ``for_obj_id`` (together with
            ``for_obj_type="journal"``) to scope the search to one journal.
        max_results: Sent as the request's ``page_size``.

    Returns:
        A markdown list of matches, a "no entries found" message, or an
        error string if the request or JSON decoding fails.
    """
    import requests

    url = f"{settings.ae_api_url}/v3/crud/journal_entry/search"
    search_body = {
        "and_filters": [
            {"field": "default_qry_str", "op": "icontains", "value": query}
        ],
        "page_size": max_results,
    }

    params = {}
    if journal_id:
        params["for_obj_type"] = "journal"
        params["for_obj_id"] = journal_id

    try:
        resp = requests.post(
            url,
            headers=_headers(),
            params=params,
            json=search_body,
            timeout=settings.ae_api_timeout,
        )
        resp.raise_for_status()
        data = resp.json()
    except Exception as e:
        logger.warning("ae_journal_search failed: %s", e)
        return f"Journal search error: {e}"

    entries = data.get("data", [])
    if not entries:
        return f"No journal entries found matching: {query}"

    lines = [f"Journal entries matching **{query}** ({len(entries)} result(s)):\n"]
    for entry in entries:
        title = entry.get("name") or "(untitled)"
        # Prefer the canonical id and fall back to the legacy alias, matching
        # how the entry-creation path reads the id.
        entry_id = entry.get("journal_entry_id") or entry.get("id_random") or ""
        journal_name = entry.get("journal_name") or entry.get("parent_name") or ""
        summary = entry.get("summary") or ""
        content = entry.get("content") or ""
        content_preview = content[:200].replace("\n", " ")
        # Only mark the preview as elided when text was actually cut off.
        if len(content) > 200:
            content_preview += "…"

        header = f"**{title}**"
        if journal_name:
            header += f" ({journal_name})"
        if entry_id:
            header += f" — id: `{entry_id}`"

        lines.append(header)
        if summary:
            lines.append(f" Summary: {summary}")
        if content_preview:
            lines.append(f" {content_preview}")
        lines.append("")

    return "\n".join(lines).strip()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool: ae_journal_list
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def journal_list() -> str:
    """Return a listing of the journals visible to the configured AE account.

    Yields the configuration error message instead when the AE API
    credentials are missing.
    """
    if config_error := _check_config():
        return config_error
    # The HTTP client is blocking, so run it off the event loop.
    return await asyncio.to_thread(_sync_journal_list)
|
|
|
|
|
|
def _sync_journal_list() -> str:
    """Blocking implementation of the journal listing.

    Requests a single page of up to 100 journals from the ``journal/search``
    endpoint and renders one markdown bullet per journal.

    Returns:
        The markdown listing, a "no journals" message, or an error string if
        the request or JSON decoding fails.
    """
    import requests

    endpoint = f"{settings.ae_api_url}/v3/crud/journal/search"
    try:
        response = requests.post(
            endpoint,
            headers=_headers(),
            json={"page_size": 100},
            timeout=settings.ae_api_timeout,
        )
        response.raise_for_status()
        body = response.json()
    except Exception as exc:
        logger.warning("ae_journal_list failed: %s", exc)
        return f"Journal list error: {exc}"

    journals = body.get("data", [])
    if not journals:
        return "No journals found for this account."

    def _render(journal) -> str:
        # First truthy identifier wins; "?" marks a journal with none.
        identifier = (
            journal.get("journal_id")
            or journal.get("id_random")
            or journal.get("id")
            or "?"
        )
        label = journal.get("name") or "(untitled)"
        bullet = f"- **{label}** — id: `{identifier}`"
        description = journal.get("description") or ""
        return f"{bullet}\n {description}" if description else bullet

    rendered = [f"Journals ({len(journals)}):\n"]
    rendered.extend(_render(journal) for journal in journals)
    return "\n".join(rendered)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool: ae_journal_entry_create
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def journal_entry_create(
    journal_id: str,
    title: str,
    content: str,
    summary: str = "",
    tags: str = "",
) -> str:
    """Add a new entry to an AE Journal.

    Args:
        journal_id: id_random of the journal to write to (find it with
            ae_journal_search, or ask the user which journal to use).
        title: Title of the entry; stored in the ``name`` field.
        content: Full body of the entry (markdown supported).
        summary: Optional short summary (1-2 sentences).
        tags: Optional comma-separated tags.

    Returns a confirmation containing the new entry's id, or an error message.
    """
    if config_error := _check_config():
        return config_error

    # The HTTP client is blocking, so run it off the event loop.
    create_args = (journal_id, title, content, summary, tags)
    return await asyncio.to_thread(_sync_journal_entry_create, *create_args)
|
|
|
|
|
|
def _sync_journal_entry_create(
    journal_id: str, title: str, content: str, summary: str, tags: str
) -> str:
    """Blocking implementation of journal entry creation.

    Posts the entry to ``/v3/crud/journal/{journal_id}/journal_entry/``.

    Args:
        journal_id: Identifier of the journal, interpolated into the URL.
        title: Sent as the ``name`` field.
        content: Sent as the ``content`` field.
        summary: Sent as ``summary`` only when non-empty.
        tags: Comma-separated tags; sent as a list of stripped, non-empty
            items only when the string is non-empty.

    Returns:
        A confirmation string with the new entry id (``"unknown"`` when the
        response carries none), or an error string if the request fails.
    """
    import requests

    url = f"{settings.ae_api_url}/v3/crud/journal/{journal_id}/journal_entry/"
    data: dict = {"name": title, "content": content}
    if summary:
        data["summary"] = summary
    if tags:
        data["tags"] = [t.strip() for t in tags.split(",") if t.strip()]

    try:
        resp = requests.post(
            url,
            headers=_headers(),
            json=data,
            timeout=settings.ae_api_timeout,
        )
        resp.raise_for_status()
        result = resp.json()
    except Exception as e:
        logger.warning("ae_journal_entry_create failed: %s", e)
        return f"Journal entry creation error: {e}"

    # The POST has already succeeded at this point, so an odd response shape
    # (null / non-dict "data", non-dict body) must not raise: a crash here
    # would hide the fact that the entry exists and invite a duplicate retry.
    envelope = result if isinstance(result, dict) else {}
    created = envelope.get("data")
    if not isinstance(created, dict):
        created = {}

    entry_id = (
        created.get("journal_entry_id")
        or created.get("id_random")
        or envelope.get("id_random")
        or "unknown"
    )
    return f"Journal entry created. id: `{entry_id}`, title: \"{title}\", journal: `{journal_id}`"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared helper: fetch a single journal entry by id
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _get_entry(entry_id: str) -> dict | str:
    """Fetch one journal entry by id.

    Returns:
        The entry as a dict (the response's ``data`` member when it is
        truthy, otherwise the whole response body), or an error string when
        the request fails or the result is not a dict.
    """
    import requests

    endpoint = f"{settings.ae_api_url}/v3/crud/journal_entry/{entry_id}"
    try:
        response = requests.get(
            endpoint,
            headers=_headers(),
            timeout=settings.ae_api_timeout,
        )
        response.raise_for_status()
        body = response.json()
        record = body.get("data") or body
    except Exception as exc:
        logger.warning("_get_entry %s failed: %s", entry_id, exc)
        return f"Error fetching entry {entry_id}: {exc}"

    if isinstance(record, dict):
        return record
    return f"Unexpected response shape for entry {entry_id}"
|
|
|
|
|
|
def _patch_entry(entry_id: str, payload: dict) -> str:
    """Send a PATCH with ``payload`` to a journal entry.

    Returns:
        The literal string ``"ok"`` on success, or an error string describing
        the failure.
    """
    import requests

    endpoint = f"{settings.ae_api_url}/v3/crud/journal_entry/{entry_id}"
    try:
        response = requests.patch(
            endpoint,
            headers=_headers(),
            json=payload,
            timeout=settings.ae_api_timeout,
        )
        response.raise_for_status()
    except Exception as exc:
        logger.warning("_patch_entry %s failed: %s", entry_id, exc)
        return f"Error updating entry {entry_id}: {exc}"
    else:
        return "ok"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool: ae_journal_entry_update
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def journal_entry_update(
    entry_id: str,
    title: str = "",
    content: str = "",
    summary: str = "",
    tags: str = "",
    enable: bool | None = None,
) -> str:
    """Change selected fields of an existing journal entry.

    Arguments left at their defaults (empty string, or ``None`` for
    ``enable``) are not sent, so the corresponding fields stay untouched.

    Returns a confirmation or error message, or the configuration error when
    the AE API credentials are missing.
    """
    if config_error := _check_config():
        return config_error

    # The HTTP client is blocking, so run it off the event loop.
    update_args = (entry_id, title, content, summary, tags, enable)
    return await asyncio.to_thread(_sync_journal_entry_update, *update_args)
|
|
|
|
|
|
def _sync_journal_entry_update(
|
|
entry_id: str,
|
|
title: str,
|
|
content: str,
|
|
summary: str,
|
|
tags: str,
|
|
enable: bool | None,
|
|
) -> str:
|
|
payload: dict = {}
|
|
if title:
|
|
payload["name"] = title
|
|
if content:
|
|
payload["content"] = content
|
|
if summary:
|
|
payload["summary"] = summary
|
|
if tags:
|
|
payload["tags"] = [t.strip() for t in tags.split(",") if t.strip()]
|
|
if enable is not None:
|
|
payload["enable"] = enable
|
|
|
|
if not payload:
|
|
return "Nothing to update — no fields provided."
|
|
|
|
result = _patch_entry(entry_id, payload)
|
|
if result != "ok":
|
|
return result
|
|
|
|
updated = ", ".join(payload.keys())
|
|
return f"Journal entry `{entry_id}` updated. Fields changed: {updated}"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool: ae_journal_entry_disable
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def journal_entry_disable(entry_id: str) -> str:
    """Soft-delete a journal entry by setting enable=false.

    Args:
        entry_id: Identifier of the entry to disable.

    Returns:
        A confirmation naming the entry, the error string from the PATCH
        helper, or the configuration error when the AE API credentials are
        missing.
    """
    err = _check_config()
    if err:
        return err

    # The HTTP client is blocking, so run it off the event loop.
    result = await asyncio.to_thread(_patch_entry, entry_id, {"enable": False})
    if result != "ok":
        return result
    # Return a descriptive confirmation like the other entry tools do, rather
    # than leaking the helper's internal "ok" sentinel to the caller.
    return f"Journal entry `{entry_id}` disabled."
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool: ae_journal_entry_append
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def journal_entry_append(entry_id: str, content: str, heading: str = "") -> str:
    """Add a new section at the bottom of a journal entry's content.

    The section is titled with ``heading`` when given, otherwise with the
    current UTC timestamp.

    Returns a confirmation or error message, or the configuration error when
    the AE API credentials are missing.
    """
    if config_error := _check_config():
        return config_error
    # The fetch and PATCH calls are blocking, so run them off the event loop.
    return await asyncio.to_thread(_sync_journal_entry_append, entry_id, content, heading)
|
|
|
|
|
|
def _sync_journal_entry_append(entry_id: str, content: str, heading: str) -> str:
    """Blocking read-modify-write that appends a section to an entry.

    Fetches the entry, adds a ``### <heading>`` section after the existing
    content, and PATCHes the combined text back.

    Args:
        entry_id: Identifier of the entry to extend.
        content: Text of the new section; surrounding whitespace is stripped.
        heading: Section heading; when empty, the current UTC time formatted
            as ``YYYY-MM-DD HH:MM UTC`` is used instead.

    Returns:
        A confirmation naming the heading used, or the error string from the
        fetch or the PATCH.
    """
    from datetime import datetime, timezone

    entry = _get_entry(entry_id)
    if isinstance(entry, str):
        # _get_entry signals failure by returning the error text.
        return entry

    existing = (entry.get("content") or "").rstrip()
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    section_heading = heading or ts
    section = f"### {section_heading}\n{content.strip()}"
    # Only insert the blank-line separator when there is something to
    # separate from; otherwise an empty entry would start with "\n\n".
    new_content = f"{existing}\n\n{section}" if existing else section

    result = _patch_entry(entry_id, {"content": new_content})
    if result != "ok":
        return result
    return f"Appended to journal entry `{entry_id}` under heading \"{section_heading}\"."
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool: ae_journal_entry_prepend
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def journal_entry_prepend(entry_id: str, content: str, heading: str = "") -> str:
    """Add a new section at the top of a journal entry's content.

    The section is titled with ``heading`` when given, otherwise with the
    current UTC timestamp.

    Returns a confirmation or error message, or the configuration error when
    the AE API credentials are missing.
    """
    if config_error := _check_config():
        return config_error
    # The fetch and PATCH calls are blocking, so run them off the event loop.
    return await asyncio.to_thread(_sync_journal_entry_prepend, entry_id, content, heading)
|
|
|
|
|
|
def _sync_journal_entry_prepend(entry_id: str, content: str, heading: str) -> str:
    """Blocking read-modify-write that prepends a section to an entry.

    Fetches the entry, places a ``### <heading>`` section before the existing
    content, and PATCHes the combined text back.

    Args:
        entry_id: Identifier of the entry to extend.
        content: Text of the new section; surrounding whitespace is stripped.
        heading: Section heading; when empty, the current UTC time formatted
            as ``YYYY-MM-DD HH:MM UTC`` is used instead.

    Returns:
        A confirmation naming the heading used, or the error string from the
        fetch or the PATCH.
    """
    from datetime import datetime, timezone

    entry = _get_entry(entry_id)
    if isinstance(entry, str):
        # _get_entry signals failure by returning the error text.
        return entry

    existing = (entry.get("content") or "").lstrip()
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    section_heading = heading or ts
    section = f"### {section_heading}\n{content.strip()}"
    # Only insert the blank-line separator when there is something to
    # separate from; otherwise an empty entry would end with a dangling "\n\n".
    new_content = f"{section}\n\n{existing}" if existing else section

    result = _patch_entry(entry_id, {"content": new_content})
    if result != "ok":
        return result
    return f"Prepended to journal entry `{entry_id}` under heading \"{section_heading}\"."
|