Files
Cortex-Inara/cortex/tools/ae_knowledge.py
Scott Idem 77327d97ad feat: improve AE Journal read toolset
- ae_journal_entry_read: expose full entry content by id_random (title,
  journal, tags, summary, full content with configurable truncation)
- ae_journal_entries_list: browse all entries in a journal newest-first,
  numbered with id/title/tags/summary/date and pagination support
- ae_journal_search: richer output — tags, updated date, 400-char preview
  (was 200), show summary OR preview (not both when summary exists)

_get_entry() was already implemented; read tool just exposes it properly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 19:47:59 -04:00

510 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Aether Platform knowledge tools — journal search, listing, and entry management.
These tools give the orchestrator read/write access to the AE Journals module,
which serves as the primary long-term knowledge base.
Auth: x-aether-api-key + x-account-id headers (same pattern as agents_sync scripts).
API: V3 CRUD — POST /v3/crud/journal_entry/search, POST /v3/crud/journal/{id}/journal_entry/
PATCH /v3/crud/journal_entry/{entry_id}, GET /v3/crud/journal_entry/{entry_id}
"""
import asyncio
import logging
from config import settings
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def _headers() -> dict:
return {
"x-aether-api-key": settings.ae_api_key,
"x-account-id": settings.ae_account_id,
"Content-Type": "application/json",
}
def _check_config() -> str | None:
"""Return an error string if AE API is not configured, else None."""
if not settings.ae_api_key or not settings.ae_account_id:
return (
"AE API not configured. Set AE_API_KEY and AE_ACCOUNT_ID in .env. "
"Values are the same as agents_sync/mcp/.env."
)
return None
# ---------------------------------------------------------------------------
# Tool: ae_journal_search
# ---------------------------------------------------------------------------
async def journal_search(query: str, journal_id: str | None = None, max_results: int = 10) -> str:
"""Search AE Journal entries by keyword.
Searches across the default_qry_str field (title + content excerpt).
Optionally scoped to a specific journal by journal_id (id_random).
Returns a markdown-formatted list of matching entries.
"""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_search, query, journal_id, max_results)
def _sync_journal_search(query: str, journal_id: str | None, max_results: int) -> str:
import requests
url = f"{settings.ae_api_url}/v3/crud/journal_entry/search"
search_body = {
"and_filters": [
{"field": "default_qry_str", "op": "icontains", "value": query}
],
"page_size": max_results,
}
params = {}
if journal_id:
params["for_obj_type"] = "journal"
params["for_obj_id"] = journal_id
try:
resp = requests.post(
url,
headers=_headers(),
params=params,
json=search_body,
timeout=settings.ae_api_timeout,
)
resp.raise_for_status()
data = resp.json()
except Exception as e:
logger.warning("ae_journal_search failed: %s", e)
return f"Journal search error: {e}"
entries = data.get("data", [])
if not entries:
return f"No journal entries found matching: {query}"
lines = [f"Journal entries matching **{query}** ({len(entries)} result(s)):\n"]
for entry in entries:
title = entry.get("name") or "(untitled)"
entry_id = entry.get("id_random", "")
journal_name = entry.get("journal_name") or entry.get("parent_name") or ""
summary = entry.get("summary") or ""
tags = entry.get("tags") or []
updated = (entry.get("updated_at") or entry.get("created_at") or "")[:10]
content_preview = (entry.get("content") or "")[:400].replace("\n", " ")
header = f"**{title}**"
if journal_name:
header += f" ({journal_name})"
header += f" — id: `{entry_id}`"
if updated:
header += f" [{updated}]"
lines.append(header)
if tags:
lines.append(f" Tags: {', '.join(tags)}")
if summary:
lines.append(f" Summary: {summary}")
elif content_preview:
lines.append(f" {content_preview}{'' if len(entry.get('content','')) > 400 else ''}")
lines.append("")
return "\n".join(lines).strip()
# ---------------------------------------------------------------------------
# Tool: ae_journal_list
# ---------------------------------------------------------------------------
async def journal_list() -> str:
"""List all journals accessible to the configured AE account."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_list)
def _sync_journal_list() -> str:
import requests
url = f"{settings.ae_api_url}/v3/crud/journal/search"
try:
resp = requests.post(
url,
headers=_headers(),
json={"page_size": 100},
timeout=settings.ae_api_timeout,
)
resp.raise_for_status()
data = resp.json()
except Exception as e:
logger.warning("ae_journal_list failed: %s", e)
return f"Journal list error: {e}"
journals = data.get("data", [])
if not journals:
return "No journals found for this account."
lines = [f"Journals ({len(journals)}):\n"]
for j in journals:
jid = j.get("journal_id") or j.get("id_random") or j.get("id") or "?"
name = j.get("name") or "(untitled)"
desc = j.get("description") or ""
line = f"- **{name}** — id: `{jid}`"
if desc:
line += f"\n {desc}"
lines.append(line)
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_create
# ---------------------------------------------------------------------------
async def journal_entry_create(
journal_id: str,
title: str,
content: str,
summary: str = "",
tags: str = "",
) -> str:
"""Create a new entry in an AE Journal.
Args:
journal_id: The id_random of the target journal (use ae_journal_search to find it,
or ask the user which journal to write to).
title: Entry title (name field).
content: Full entry content (markdown supported).
summary: Optional short summary (1-2 sentences).
tags: Optional comma-separated tags.
Returns a confirmation with the new entry's id_random, or an error message.
"""
err = _check_config()
if err:
return err
return await asyncio.to_thread(
_sync_journal_entry_create, journal_id, title, content, summary, tags
)
def _sync_journal_entry_create(
journal_id: str, title: str, content: str, summary: str, tags: str
) -> str:
import requests
url = f"{settings.ae_api_url}/v3/crud/journal/{journal_id}/journal_entry/"
data: dict = {"name": title, "content": content}
if summary:
data["summary"] = summary
if tags:
data["tags"] = [t.strip() for t in tags.split(",") if t.strip()]
try:
resp = requests.post(
url,
headers=_headers(),
json=data,
timeout=settings.ae_api_timeout,
)
resp.raise_for_status()
result = resp.json()
except Exception as e:
logger.warning("ae_journal_entry_create failed: %s", e)
return f"Journal entry creation error: {e}"
entry_id = (
result.get("data", {}).get("journal_entry_id")
or result.get("data", {}).get("id_random")
or result.get("id_random")
or "unknown"
)
return f"Journal entry created. id: `{entry_id}`, title: \"{title}\", journal: `{journal_id}`"
# ---------------------------------------------------------------------------
# Shared helper: fetch a single journal entry by id
# ---------------------------------------------------------------------------
def _get_entry(entry_id: str) -> dict | str:
"""Return the entry dict, or an error string on failure."""
import requests
url = f"{settings.ae_api_url}/v3/crud/journal_entry/{entry_id}"
try:
resp = requests.get(url, headers=_headers(), timeout=settings.ae_api_timeout)
resp.raise_for_status()
data = resp.json()
entry = data.get("data") or data
if not isinstance(entry, dict):
return f"Unexpected response shape for entry {entry_id}"
return entry
except Exception as e:
logger.warning("_get_entry %s failed: %s", entry_id, e)
return f"Error fetching entry {entry_id}: {e}"
def _patch_entry(entry_id: str, payload: dict) -> str:
"""PATCH a journal entry. Returns a success/error string."""
import requests
url = f"{settings.ae_api_url}/v3/crud/journal_entry/{entry_id}"
try:
resp = requests.patch(
url,
headers=_headers(),
json=payload,
timeout=settings.ae_api_timeout,
)
resp.raise_for_status()
return "ok"
except Exception as e:
logger.warning("_patch_entry %s failed: %s", entry_id, e)
return f"Error updating entry {entry_id}: {e}"
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_read
# ---------------------------------------------------------------------------
async def journal_entry_read(entry_id: str, max_content_chars: int = 4000) -> str:
"""Return the full content of a single journal entry by its id_random."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_entry_read, entry_id, max_content_chars)
def _sync_journal_entry_read(entry_id: str, max_content_chars: int) -> str:
entry = _get_entry(entry_id)
if isinstance(entry, str):
return entry
title = entry.get("name") or "(untitled)"
journal = entry.get("journal_name") or entry.get("parent_name") or ""
summary = entry.get("summary") or ""
tags = entry.get("tags") or []
content = entry.get("content") or ""
updated = (entry.get("updated_at") or entry.get("created_at") or "")[:19].replace("T", " ")
enabled = entry.get("enable", True)
lines = [f"# {title}"]
meta: list[str] = [f"id: `{entry_id}`"]
if journal:
meta.append(f"journal: {journal}")
if updated:
meta.append(f"updated: {updated}")
if not enabled:
meta.append("**DISABLED**")
lines.append(" ".join(meta))
if tags:
lines.append(f"Tags: {', '.join(tags)}")
if summary:
lines.append(f"\nSummary: {summary}")
lines.append("\n---\n")
truncated = len(content) > max_content_chars
lines.append(content[:max_content_chars])
if truncated:
lines.append(
f"\n\n[Content truncated at {max_content_chars} chars — "
f"{len(content)} total. Call again with a higher max_content_chars to read more.]"
)
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Tool: ae_journal_entries_list
# ---------------------------------------------------------------------------
async def journal_entries_list(journal_id: str, max_results: int = 20, page: int = 1) -> str:
"""List entries in a specific journal, newest first."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_entries_list, journal_id, max_results, page)
def _sync_journal_entries_list(journal_id: str, max_results: int, page: int) -> str:
import requests
url = f"{settings.ae_api_url}/v3/crud/journal_entry/search"
search_body: dict = {
"page_size": max_results,
"page": page,
"order_by": "-updated_at",
}
params = {"for_obj_type": "journal", "for_obj_id": journal_id}
try:
resp = requests.post(
url,
headers=_headers(),
params=params,
json=search_body,
timeout=settings.ae_api_timeout,
)
resp.raise_for_status()
data = resp.json()
except Exception as e:
logger.warning("ae_journal_entries_list failed: %s", e)
return f"Journal entries list error: {e}"
entries = data.get("data", [])
total = data.get("total") or data.get("count") or len(entries)
if not entries:
return f"No entries found in journal `{journal_id}`."
offset = (page - 1) * max_results + 1
lines = [f"Entries in journal `{journal_id}` — showing {offset}{offset + len(entries) - 1} of {total}:\n"]
for i, entry in enumerate(entries, offset):
title = entry.get("name") or "(untitled)"
entry_id = entry.get("id_random", "")
tags = entry.get("tags") or []
summary = entry.get("summary") or ""
updated = (entry.get("updated_at") or entry.get("created_at") or "")[:10]
enabled = entry.get("enable", True)
status = "" if enabled else " [disabled]"
date_str = f" [{updated}]" if updated else ""
lines.append(f"{i}. **{title}**{status} — id: `{entry_id}`{date_str}")
if tags:
lines.append(f" Tags: {', '.join(tags)}")
if summary:
lines.append(f" {summary[:150]}{'' if len(summary) > 150 else ''}")
lines.append("")
if total > offset + len(entries) - 1:
lines.append(f"(More entries available — call again with page={page + 1})")
return "\n".join(lines).rstrip()
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_update
# ---------------------------------------------------------------------------
async def journal_entry_update(
entry_id: str,
title: str = "",
content: str = "",
summary: str = "",
tags: str = "",
enable: bool | None = None,
) -> str:
"""Update fields on an existing journal entry. Only provided fields are changed."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_entry_update, entry_id, title, content, summary, tags, enable)
def _sync_journal_entry_update(
entry_id: str,
title: str,
content: str,
summary: str,
tags: str,
enable: bool | None,
) -> str:
payload: dict = {}
if title:
payload["name"] = title
if content:
payload["content"] = content
if summary:
payload["summary"] = summary
if tags:
payload["tags"] = [t.strip() for t in tags.split(",") if t.strip()]
if enable is not None:
payload["enable"] = enable
if not payload:
return "Nothing to update — no fields provided."
result = _patch_entry(entry_id, payload)
if result != "ok":
return result
updated = ", ".join(payload.keys())
return f"Journal entry `{entry_id}` updated. Fields changed: {updated}"
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_disable
# ---------------------------------------------------------------------------
async def journal_entry_disable(entry_id: str) -> str:
"""Soft-delete a journal entry by setting enable=false."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_patch_entry, entry_id, {"enable": False})
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_append
# ---------------------------------------------------------------------------
async def journal_entry_append(entry_id: str, content: str, heading: str = "") -> str:
"""Append a timestamped section to the bottom of a journal entry's content."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_entry_append, entry_id, content, heading)
def _sync_journal_entry_append(entry_id: str, content: str, heading: str) -> str:
from datetime import datetime, timezone
entry = _get_entry(entry_id)
if isinstance(entry, str):
return entry
existing = (entry.get("content") or "").rstrip()
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
section_heading = heading or ts
new_content = f"{existing}\n\n### {section_heading}\n{content.strip()}"
result = _patch_entry(entry_id, {"content": new_content})
if result != "ok":
return result
return f"Appended to journal entry `{entry_id}` under heading \"{section_heading}\"."
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_prepend
# ---------------------------------------------------------------------------
async def journal_entry_prepend(entry_id: str, content: str, heading: str = "") -> str:
"""Prepend a timestamped section to the top of a journal entry's content."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_entry_prepend, entry_id, content, heading)
def _sync_journal_entry_prepend(entry_id: str, content: str, heading: str) -> str:
from datetime import datetime, timezone
entry = _get_entry(entry_id)
if isinstance(entry, str):
return entry
existing = (entry.get("content") or "").lstrip()
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
section_heading = heading or ts
new_content = f"### {section_heading}\n{content.strip()}\n\n{existing}"
result = _patch_entry(entry_id, {"content": new_content})
if result != "ok":
return result
return f"Prepended to journal entry `{entry_id}` under heading \"{section_heading}\"."