Files
Cortex-Inara/cortex/tools/ae_knowledge.py
Scott Idem 44f215c764 feat: add ae_journal_list tool
Lists all Aether Journals for the configured account via
POST /v3/crud/journal/search with no filters (account scoped by header).
Returns name + id_random for each journal so the agent can discover
available journals before searching or writing entries.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 21:50:02 -04:00

224 lines
6.9 KiB
Python

"""
Aether Platform knowledge tools — journal listing, journal entry search, and entry creation.
These tools give the orchestrator read/write access to the AE Journals module,
which serves as the primary long-term knowledge base.
Auth: x-aether-api-key + x-account-id headers (same pattern as agents_sync scripts).
API: V3 CRUD — POST /v3/crud/journal/search, POST /v3/crud/journal_entry/search,
POST /v3/crud/journal/{id}/journal_entry/
"""
import asyncio
import logging
from config import settings
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def _headers() -> dict:
    """Build the HTTP headers attached to every AE API request.

    The API key and account id are read from ``settings`` at call time, so a
    fresh dict is produced on each call.
    """
    request_headers: dict = {}
    request_headers["x-aether-api-key"] = settings.ae_api_key
    request_headers["x-account-id"] = settings.ae_account_id
    request_headers["Content-Type"] = "application/json"
    return request_headers
def _check_config() -> str | None:
    """Verify that the AE API credentials are present in ``settings``.

    Returns:
        ``None`` when both the API key and the account id are set (truthy);
        otherwise a human-readable message explaining what to configure.
    """
    # Happy path first: both credentials present means nothing to report.
    if settings.ae_api_key and settings.ae_account_id:
        return None
    return (
        "AE API not configured. Set AE_API_KEY and AE_ACCOUNT_ID in .env. "
        "Values are the same as agents_sync/mcp/.env."
    )
# ---------------------------------------------------------------------------
# Tool: ae_journal_search
# ---------------------------------------------------------------------------
async def journal_search(query: str, journal_id: str | None = None, max_results: int = 10) -> str:
    """Find AE Journal entries whose text contains a keyword.

    Matching is done against the ``default_qry_str`` field (title + content
    excerpt). Pass ``journal_id`` (a journal's id_random) to restrict the
    search to a single journal.

    Returns:
        A markdown-formatted list of matching entries, or the configuration
        error message when the AE API credentials are missing.
    """
    # Bail out before touching the network when credentials are absent.
    if config_problem := _check_config():
        return config_problem
    # The search uses blocking HTTP, so it is pushed onto a worker thread to
    # keep the event loop free.
    search_job = asyncio.to_thread(_sync_journal_search, query, journal_id, max_results)
    return await search_job
def _sync_journal_search(query: str, journal_id: str | None, max_results: int) -> str:
    """Blocking implementation of ``journal_search``; meant to run in a worker thread.

    POSTs a case-insensitive "contains" filter on ``default_qry_str`` to the
    V3 ``journal_entry`` search endpoint and renders the hits as markdown.

    Args:
        query: Keyword text to look for.
        journal_id: Optional id_random of a journal. When given, it is sent as
            the ``for_obj_type`` / ``for_obj_id`` query parameters to scope
            the search to that journal.
        max_results: Sent to the API as ``page_size``.

    Returns:
        A markdown-formatted result list, a "no entries found" message, or a
        ``Journal search error: ...`` message. HTTP, JSON, and response-shape
        failures are reported as an error string rather than raised.
    """
    import requests

    url = f"{settings.ae_api_url}/v3/crud/journal_entry/search"
    search_body = {
        "and_filters": [
            {"field": "default_qry_str", "op": "icontains", "value": query}
        ],
        "page_size": max_results,
    }
    params = {}
    if journal_id:
        params["for_obj_type"] = "journal"
        params["for_obj_id"] = journal_id

    try:
        resp = requests.post(
            url,
            headers=_headers(),
            params=params,
            json=search_body,
            timeout=settings.ae_api_timeout,
        )
        resp.raise_for_status()
        data = resp.json()
    except Exception as e:
        logger.warning("ae_journal_search failed: %s", e)
        return f"Journal search error: {e}"

    # A syntactically valid JSON body is not necessarily the expected
    # {"data": [...]} envelope; validate it so a surprising payload becomes an
    # error message instead of an AttributeError escaping to the caller.
    raw_entries = (data.get("data") or []) if isinstance(data, dict) else None
    if not isinstance(raw_entries, list):
        logger.warning(
            "ae_journal_search: unexpected response shape (%s)", type(data).__name__
        )
        return "Journal search error: unexpected response format from AE API"

    # Ignore anything in the list that is not an entry object.
    entries = [item for item in raw_entries if isinstance(item, dict)]
    if not entries:
        return f"No journal entries found matching: {query}"

    lines = [f"Journal entries matching **{query}** ({len(entries)} result(s)):\n"]
    for entry in entries:
        title = entry.get("name") or "(untitled)"
        entry_id = entry.get("id_random", "")
        journal_name = entry.get("journal_name") or entry.get("parent_name") or ""
        summary = entry.get("summary") or ""
        # str() keeps the slice safe if the API ever returns non-text content;
        # the preview is capped at 200 characters and flattened to one line.
        content_preview = str(entry.get("content") or "")[:200].replace("\n", " ")

        header = f"**{title}**"
        if journal_name:
            header += f" ({journal_name})"
        if entry_id:
            header += f" — id: `{entry_id}`"
        lines.append(header)
        if summary:
            lines.append(f" Summary: {summary}")
        if content_preview:
            lines.append(f" {content_preview}")
        lines.append("")  # blank line between entries
    return "\n".join(lines).strip()
# ---------------------------------------------------------------------------
# Tool: ae_journal_list
# ---------------------------------------------------------------------------
async def journal_list() -> str:
    """Enumerate the journals visible to the configured AE account.

    Returns:
        The text produced by the blocking listing helper, or the
        configuration error message when the AE API credentials are missing.
    """
    # Skip the request entirely when credentials are absent.
    if config_problem := _check_config():
        return config_problem
    # Blocking HTTP is delegated to a worker thread.
    listing_job = asyncio.to_thread(_sync_journal_list)
    return await listing_job
def _sync_journal_list() -> str:
    """Blocking implementation of ``journal_list``; meant to run in a worker thread.

    POSTs an unfiltered search to the V3 ``journal`` search endpoint and
    renders each journal as a markdown bullet with its name, id, and
    (when present) description.

    Returns:
        A markdown list of journals, a "no journals" message, or a
        ``Journal list error: ...`` message. HTTP, JSON, and response-shape
        failures are reported as an error string rather than raised.
    """
    import requests

    url = f"{settings.ae_api_url}/v3/crud/journal/search"
    try:
        resp = requests.post(
            url,
            headers=_headers(),
            # No filters are sent; only a single page of up to 100 journals
            # is requested.
            json={"page_size": 100},
            timeout=settings.ae_api_timeout,
        )
        resp.raise_for_status()
        data = resp.json()
    except Exception as e:
        logger.warning("ae_journal_list failed: %s", e)
        return f"Journal list error: {e}"

    # Validate the {"data": [...]} envelope so an unexpected payload yields an
    # error message instead of an AttributeError escaping to the caller.
    raw_journals = (data.get("data") or []) if isinstance(data, dict) else None
    if not isinstance(raw_journals, list):
        logger.warning(
            "ae_journal_list: unexpected response shape (%s)", type(data).__name__
        )
        return "Journal list error: unexpected response format from AE API"

    # Ignore anything in the list that is not a journal object.
    journals = [item for item in raw_journals if isinstance(item, dict)]
    if not journals:
        return "No journals found for this account."

    lines = [f"Journals ({len(journals)}):\n"]
    for j in journals:
        # Take the first identifier field that is populated; "?" if none is.
        jid = j.get("journal_id") or j.get("id_random") or j.get("id") or "?"
        name = j.get("name") or "(untitled)"
        desc = j.get("description") or ""
        line = f"- **{name}** — id: `{jid}`"
        if desc:
            line += f"\n {desc}"
        lines.append(line)
    return "\n".join(lines)
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_create
# ---------------------------------------------------------------------------
async def journal_entry_create(
    journal_id: str,
    title: str,
    content: str,
    summary: str = "",
    tags: str = "",
) -> str:
    """Add a new entry to an AE Journal.

    Args:
        journal_id: The id_random of the journal to write to (ae_journal_list
            shows the available journals and their ids; otherwise ask the
            user which journal to use).
        title: Title of the entry (stored in the ``name`` field).
        content: Full body of the entry (markdown supported).
        summary: Optional short summary (1-2 sentences).
        tags: Optional comma-separated tags.

    Returns:
        A confirmation containing the new entry's id_random, or an error
        message (including the configuration error when the AE API
        credentials are missing).
    """
    # Refuse early when credentials are absent.
    if config_problem := _check_config():
        return config_problem
    # The HTTP call is blocking, so hand it to a worker thread.
    create_job = asyncio.to_thread(
        _sync_journal_entry_create, journal_id, title, content, summary, tags
    )
    return await create_job
def _sync_journal_entry_create(
    journal_id: str, title: str, content: str, summary: str, tags: str
) -> str:
    """Blocking implementation of ``journal_entry_create``; meant to run in a worker thread.

    POSTs the new entry to ``/v3/crud/journal/{journal_id}/journal_entry/``.

    Args:
        journal_id: id_random of the target journal; percent-encoded before
            being placed in the URL path.
        title: Entry title, sent as ``name``.
        content: Entry body, sent as ``content``.
        summary: Sent as ``summary`` only when non-empty.
        tags: Comma-separated tags; split, stripped, and sent as a list under
            ``tags`` only when non-empty.

    Returns:
        A confirmation string with the new entry's id (``unknown`` if the
        response does not expose one), or a
        ``Journal entry creation error: ...`` message.
    """
    import requests
    from urllib.parse import quote

    # A blank id would yield a malformed ".../journal//journal_entry/" URL.
    if not journal_id or not journal_id.strip():
        return "Journal entry creation error: journal_id is required"

    # journal_id is caller-supplied; encode it as a single path segment so
    # characters such as "/", "?", "#" or ".." cannot retarget the request.
    journal_segment = quote(journal_id.strip(), safe="")
    url = f"{settings.ae_api_url}/v3/crud/journal/{journal_segment}/journal_entry/"

    data: dict = {"name": title, "content": content}
    if summary:
        data["summary"] = summary
    if tags:
        data["tags"] = [t.strip() for t in tags.split(",") if t.strip()]

    try:
        resp = requests.post(
            url,
            headers=_headers(),
            json=data,
            timeout=settings.ae_api_timeout,
        )
        resp.raise_for_status()
        result = resp.json()
    except Exception as e:
        logger.warning("ae_journal_entry_create failed: %s", e)
        return f"Journal entry creation error: {e}"

    # The request succeeded at this point, so never let an unexpected response
    # shape (non-dict body, "data": null, ...) turn a successful create into an
    # exception; fall back to "unknown" for the id instead.
    entry_id = None
    if isinstance(result, dict):
        payload = result.get("data")
        if isinstance(payload, dict):
            entry_id = payload.get("id_random")
        entry_id = entry_id or result.get("id_random")
    entry_id = entry_id or "unknown"
    return f"Journal entry created. id: `{entry_id}`, title: \"{title}\", journal: `{journal_id}`"