Files
Cortex-Inara/cortex/tools/ae_knowledge.py
Scott Idem 1fa5151d8a fix: correct V3 search filter key and response field names in ae_knowledge
- Filter key is "and" not "and_filters" (V3 API format)
- Entry IDs use journal_entry_id/id, not id_random (id_random is None)
- Dates use updated_on/created_on, not updated_at/created_at
- Total count lives in meta.data_list_count, not top-level total/count
- Inject query_string="%" when and filters present but no query, since
  the V3 search engine requires query_string for filters to apply
- Normalize tags from string to list in both entry_read and entries_list
- Fix order_by to use updated_on (not updated_at) in entries_list
- Correct ARCH__AE_INTEGRATION.md: and_filters → and, or_filters → or

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 21:12:44 -04:00

583 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Aether Platform knowledge tools — journal search, listing, and entry management.
These tools give the orchestrator read/write access to the AE Journals module,
which serves as the primary long-term knowledge base.
Auth: x-aether-api-key + x-account-id headers (same pattern as agents_sync scripts).
API: V3 CRUD — POST /v3/crud/journal_entry/search, POST /v3/crud/journal/{id}/journal_entry/
PATCH /v3/crud/journal_entry/{entry_id}, GET /v3/crud/journal_entry/{entry_id}
"""
import asyncio
import logging
from config import settings
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def _headers() -> dict:
    """Build the HTTP headers sent with every AE API request.

    The API key and account id are read from ``settings`` on each call, and
    the request body is declared as JSON.
    """
    api_key = settings.ae_api_key
    account_id = settings.ae_account_id
    return {
        "x-aether-api-key": api_key,
        "x-account-id": account_id,
        "Content-Type": "application/json",
    }
def _check_config() -> str | None:
    """Verify that the AE API credentials are present in ``settings``.

    Returns:
        ``None`` when both ``settings.ae_api_key`` and
        ``settings.ae_account_id`` are truthy; otherwise a human-readable
        message naming the environment variables to set.
    """
    if settings.ae_api_key and settings.ae_account_id:
        return None
    return (
        "AE API not configured. Set AE_API_KEY and AE_ACCOUNT_ID in .env. "
        "Values are the same as agents_sync/mcp/.env."
    )
# ---------------------------------------------------------------------------
# Tool: ae_journal_search
# ---------------------------------------------------------------------------
async def journal_search(
    query: str = "",
    journal_id: str = "",
    tags: str = "",
    type_code: str = "",
    topic_code: str = "",
    date_from: str = "",
    date_to: str = "",
    sort_by: str = "updated",
    sort_order: str = "desc",
    status: int | None = None,
    priority: int | None = None,
    max_results: int = 10,
    page: int = 1,
) -> str:
    """Search AE Journal entries without blocking the event loop.

    At least one of query, tags, type_code, topic_code, date_from, or
    journal_id should be provided. All filters combine with AND.

    The configuration is checked first; the blocking HTTP search then runs in
    a worker thread via ``asyncio.to_thread``.

    Returns:
        The formatted search result text, or an error message when the AE API
        is not configured or the request fails.
    """
    if config_error := _check_config():
        return config_error
    return await asyncio.to_thread(
        _sync_journal_search,
        query=query,
        journal_id=journal_id,
        tags=tags,
        type_code=type_code,
        topic_code=topic_code,
        date_from=date_from,
        date_to=date_to,
        sort_by=sort_by,
        sort_order=sort_order,
        status=status,
        priority=priority,
        max_results=max_results,
        page=page,
    )
def _format_search_entry(entry: dict) -> list[str]:
    """Render one search hit as output lines: header, tags, summary or preview."""
    title = entry.get("name") or "(untitled)"
    entry_id = entry.get("journal_entry_id") or entry.get("id") or ""
    journal_name = entry.get("journal_name") or entry.get("parent_name") or ""
    summary = entry.get("summary") or ""
    raw_tags = entry.get("tags") or []
    content = entry.get("content") or ""
    # Keep only the date part (first 10 characters) of the timestamp.
    updated = (entry.get("updated_on") or entry.get("created_on") or "")[:10]

    header = f"**{title}**"
    if journal_name:
        header += f" ({journal_name})"
    header += f" — id: `{entry_id}`"
    if updated:
        header += f" [{updated}]"
    lines = [header]

    # Tags may arrive as a list or as a comma-separated string; empty
    # fragments are dropped so stray commas do not produce blank tags.
    if isinstance(raw_tags, list):
        tag_list = raw_tags
    else:
        tag_list = [t.strip() for t in str(raw_tags).split(",") if t.strip()]
    if tag_list:
        lines.append(f" Tags: {', '.join(tag_list)}")

    if summary:
        lines.append(f" {summary}")
    elif content:
        # Fall back to a single-line preview and mark it when it was cut.
        preview = content[:400].replace("\n", " ")
        marker = "..." if len(content) > 400 else ""
        lines.append(f" {preview}{marker}")
    lines.append("")
    return lines


def _sync_journal_search(
    query: str,
    journal_id: str,
    tags: str,
    type_code: str,
    topic_code: str,
    date_from: str,
    date_to: str,
    sort_by: str,
    sort_order: str,
    status: int | None,
    priority: int | None,
    max_results: int,
    page: int,
) -> str:
    """Run a blocking journal-entry search and format the hits as text.

    Args:
        query: Full-text keyword, sent as ``query_string``.
        journal_id: When set, the search is scoped with the
            ``for_obj_type=journal`` / ``for_obj_id`` query parameters.
        tags: Value matched with ``icontains`` against the ``tags`` field.
        type_code: Exact match on ``type_code``.
        topic_code: Exact match on ``topic_code``.
        date_from: Lower bound (``gte``) on ``created_on``.
        date_to: Upper bound (``lte``) on ``created_on``.
        sort_by: One of ``updated``, ``created``, ``name``, ``priority``;
            any other value falls back to ``updated_on``.
        sort_order: ``"desc"`` prefixes the sort field with ``-``; any other
            value sends the field without a prefix.
        status: Exact match on ``status`` when not ``None``.
        priority: Exact match on ``priority`` when not ``None``.
        max_results: Sent as ``page_size``.
        page: Sent as ``page``.

    Returns:
        Formatted result text, a "no entries" message, or an error message
        when the HTTP request or JSON decoding fails.
    """
    import requests

    # Build sort field
    sort_field_map = {
        "updated": "updated_on",
        "created": "created_on",
        "name": "name",
        "priority": "priority",
    }
    sort_field = sort_field_map.get(sort_by, "updated_on")
    order_by = f"-{sort_field}" if sort_order == "desc" else sort_field

    search_body: dict = {"page_size": max_results, "page": page, "order_by": order_by}

    # Fulltext keyword — uses MATCH/AGAINST index
    if query:
        search_body["query_string"] = query

    # Additional AND filters
    and_filters: list[dict] = []
    if tags:
        and_filters.append({"field": "tags", "op": "icontains", "value": tags})
    if type_code:
        and_filters.append({"field": "type_code", "op": "eq", "value": type_code})
    if topic_code:
        and_filters.append({"field": "topic_code", "op": "eq", "value": topic_code})
    if date_from:
        and_filters.append({"field": "created_on", "op": "gte", "value": date_from})
    if date_to:
        and_filters.append({"field": "created_on", "op": "lte", "value": date_to})
    if status is not None:
        and_filters.append({"field": "status", "op": "eq", "value": status})
    if priority is not None:
        and_filters.append({"field": "priority", "op": "eq", "value": priority})

    if and_filters:
        search_body["and"] = and_filters
        # query_string must be present for `and` filters to apply
        if "query_string" not in search_body:
            search_body["query_string"] = "%"

    params: dict = {}
    if journal_id:
        params["for_obj_type"] = "journal"
        params["for_obj_id"] = journal_id

    url = f"{settings.ae_api_url}/v3/crud/journal_entry/search"
    try:
        resp = requests.post(
            url,
            headers=_headers(),
            params=params,
            json=search_body,
            timeout=settings.ae_api_timeout,
        )
        resp.raise_for_status()
        data = resp.json()
    except Exception as e:
        logger.warning("ae_journal_search failed: %s", e)
        return f"Journal search error: {e}"

    # `or []` also covers an explicit `"data": null`, which would otherwise
    # make len(entries) below raise.
    entries = data.get("data") or []
    total = (data.get("meta") or {}).get("data_list_count") or len(entries)

    if not entries:
        desc = (
            query
            or tags
            or type_code
            or topic_code
            or (f"journal {journal_id}" if journal_id else "the given filters")
        )
        return f"No journal entries found for: {desc}"

    label = query or tags or f"{len(entries)} entries"
    lines = [f"Journal entries — **{label}** ({total} total, page {page}):\n"]
    for entry in entries:
        lines.extend(_format_search_entry(entry))

    if total > page * max_results:
        lines.append(f"(More results — call again with page={page + 1})")
    return "\n".join(lines).strip()
# ---------------------------------------------------------------------------
# Tool: ae_journal_list
# ---------------------------------------------------------------------------
async def journal_list() -> str:
    """List journals accessible to the configured AE account.

    Returns the configuration error message when the AE API is not set up;
    otherwise runs the blocking listing in a worker thread and returns its
    formatted text.
    """
    if config_error := _check_config():
        return config_error
    listing = await asyncio.to_thread(_sync_journal_list)
    return listing
def _sync_journal_list() -> str:
    """Fetch journals from the AE API and format them as a markdown list.

    A single page of up to 100 journals is requested. Each journal is shown
    with its name, its id (``journal_id``, then ``id_random``, then ``id``,
    else ``?``) and, when present, its description on a following line.

    Returns:
        The formatted list, a "no journals" message, or an error message when
        the HTTP request or JSON decoding fails.
    """
    import requests

    endpoint = f"{settings.ae_api_url}/v3/crud/journal/search"
    try:
        response = requests.post(
            endpoint,
            headers=_headers(),
            json={"page_size": 100},
            timeout=settings.ae_api_timeout,
        )
        response.raise_for_status()
        body = response.json()
    except Exception as exc:
        logger.warning("ae_journal_list failed: %s", exc)
        return f"Journal list error: {exc}"

    journals = body.get("data", [])
    if not journals:
        return "No journals found for this account."

    def _render(journal: dict) -> str:
        # One bullet per journal; the description, if any, goes on its own line.
        identifier = (
            journal.get("journal_id")
            or journal.get("id_random")
            or journal.get("id")
            or "?"
        )
        label = journal.get("name") or "(untitled)"
        description = journal.get("description") or ""
        bullet = f"- **{label}** — id: `{identifier}`"
        return f"{bullet}\n {description}" if description else bullet

    output = [f"Journals ({len(journals)}):\n"]
    output.extend(_render(journal) for journal in journals)
    return "\n".join(output)
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_create
# ---------------------------------------------------------------------------
async def journal_entry_create(
    journal_id: str,
    title: str,
    content: str,
    summary: str = "",
    tags: str = "",
) -> str:
    """Create a new entry in an AE Journal.

    Args:
        journal_id: Id of the target journal, as shown by the journal listing
            tool (or ask the user which journal to write to).
        title: Entry title (stored in the ``name`` field).
        content: Full entry content (markdown supported).
        summary: Optional short summary (1-2 sentences).
        tags: Optional comma-separated tags.

    Returns:
        A confirmation containing the new entry's id, or an error message.
    """
    if config_error := _check_config():
        return config_error
    return await asyncio.to_thread(
        _sync_journal_entry_create,
        journal_id=journal_id,
        title=title,
        content=content,
        summary=summary,
        tags=tags,
    )
def _sync_journal_entry_create(
    journal_id: str, title: str, content: str, summary: str, tags: str
) -> str:
    """POST a new journal entry and report the id the API returned.

    Args:
        journal_id: Id of the journal the entry is created under (part of
            the request URL).
        title: Sent as the ``name`` field.
        content: Sent as the ``content`` field.
        summary: Sent as ``summary`` only when non-empty.
        tags: Comma-separated tags; sent as a list of stripped, non-empty
            items only when non-empty.

    Returns:
        A confirmation string with the new entry's id (``unknown`` if the
        response carries none), or an error message when the HTTP request or
        JSON decoding fails.
    """
    import requests

    url = f"{settings.ae_api_url}/v3/crud/journal/{journal_id}/journal_entry/"
    data: dict = {"name": title, "content": content}
    if summary:
        data["summary"] = summary
    if tags:
        data["tags"] = [t.strip() for t in tags.split(",") if t.strip()]

    try:
        resp = requests.post(
            url,
            headers=_headers(),
            json=data,
            timeout=settings.ae_api_timeout,
        )
        resp.raise_for_status()
        result = resp.json()
    except Exception as e:
        logger.warning("ae_journal_entry_create failed: %s", e)
        return f"Journal entry creation error: {e}"

    # The entry already exists at this point, so an unexpected response shape
    # (non-dict body, `"data": null`, list payload) must not raise; it only
    # degrades the reported id to "unknown".
    body = result if isinstance(result, dict) else {}
    record = body.get("data")
    if not isinstance(record, dict):
        record = {}
    entry_id = (
        record.get("journal_entry_id")
        or record.get("id_random")
        or record.get("id")
        or body.get("id_random")
        or "unknown"
    )
    return f"Journal entry created. id: `{entry_id}`, title: \"{title}\", journal: `{journal_id}`"
# ---------------------------------------------------------------------------
# Shared helper: fetch a single journal entry by id
# ---------------------------------------------------------------------------
def _get_entry(entry_id: str) -> dict | str:
    """Fetch one journal entry by id.

    Returns:
        The entry dict (the response's ``data`` member when truthy, otherwise
        the response body itself), or an error string when the request fails
        or the result is not a dict.
    """
    import requests

    endpoint = f"{settings.ae_api_url}/v3/crud/journal_entry/{entry_id}"
    try:
        response = requests.get(
            endpoint,
            headers=_headers(),
            timeout=settings.ae_api_timeout,
        )
        response.raise_for_status()
        body = response.json()
        record = body.get("data") or body
        if isinstance(record, dict):
            return record
        return f"Unexpected response shape for entry {entry_id}"
    except Exception as exc:
        logger.warning("_get_entry %s failed: %s", entry_id, exc)
        return f"Error fetching entry {entry_id}: {exc}"
def _patch_entry(entry_id: str, payload: dict) -> str:
    """Send a PATCH with ``payload`` as the JSON body to a journal entry.

    Returns:
        The literal string ``"ok"`` on success, otherwise an error message
        describing the failure.
    """
    import requests

    endpoint = f"{settings.ae_api_url}/v3/crud/journal_entry/{entry_id}"
    try:
        response = requests.patch(
            endpoint,
            headers=_headers(),
            json=payload,
            timeout=settings.ae_api_timeout,
        )
        response.raise_for_status()
    except Exception as exc:
        logger.warning("_patch_entry %s failed: %s", entry_id, exc)
        return f"Error updating entry {entry_id}: {exc}"
    return "ok"
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_read
# ---------------------------------------------------------------------------
async def journal_entry_read(entry_id: str, max_content_chars: int = 4000) -> str:
    """Return the formatted content of a single journal entry by its id.

    Content longer than ``max_content_chars`` is cut and followed by a
    truncation notice. Returns an error message when the AE API is not
    configured or the entry cannot be fetched.
    """
    if config_error := _check_config():
        return config_error
    return await asyncio.to_thread(
        _sync_journal_entry_read,
        entry_id=entry_id,
        max_content_chars=max_content_chars,
    )
def _sync_journal_entry_read(entry_id: str, max_content_chars: int) -> str:
    """Fetch a journal entry and render it as a markdown document.

    The output is a title heading, a metadata line (id, journal, updated
    timestamp, disabled marker), optional tags and summary, a horizontal
    rule, then the content cut to ``max_content_chars`` with a notice when
    it was truncated. Returns the fetch error string unchanged on failure.
    """
    record = _get_entry(entry_id)
    if isinstance(record, str):
        # _get_entry reports failures as a plain string.
        return record

    heading = record.get("name") or "(untitled)"
    journal_name = record.get("journal_name") or record.get("parent_name") or ""
    abstract = record.get("summary") or ""
    tag_source = record.get("tags") or []
    if isinstance(tag_source, list):
        tag_names = tag_source
    else:
        tag_names = [piece.strip() for piece in str(tag_source).split(",") if piece.strip()]
    body = record.get("content") or ""
    # Keep the first 19 characters of the timestamp and replace the "T" separator.
    stamp = (record.get("updated_on") or record.get("created_on") or "")[:19].replace("T", " ")
    is_enabled = record.get("enable", True)

    details: list[str] = [f"id: `{entry_id}`"]
    if journal_name:
        details.append(f"journal: {journal_name}")
    if stamp:
        details.append(f"updated: {stamp}")
    if not is_enabled:
        details.append("**DISABLED**")

    output = [f"# {heading}", " ".join(details)]
    if tag_names:
        output.append(f"Tags: {', '.join(tag_names)}")
    if abstract:
        output.append(f"\nSummary: {abstract}")
    output.append("\n---\n")
    output.append(body[:max_content_chars])
    if len(body) > max_content_chars:
        output.append(
            f"\n\n[Content truncated at {max_content_chars} chars — "
            f"{len(body)} total. Call again with a higher max_content_chars to read more.]"
        )
    return "\n".join(output)
# ---------------------------------------------------------------------------
# Tool: ae_journal_entries_list
# ---------------------------------------------------------------------------
async def journal_entries_list(journal_id: str, max_results: int = 20, page: int = 1) -> str:
    """List entries in a specific journal, most recently updated first.

    Returns the formatted page of entries, or an error message when the AE
    API is not configured or the request fails.
    """
    if config_error := _check_config():
        return config_error
    return await asyncio.to_thread(
        _sync_journal_entries_list,
        journal_id=journal_id,
        max_results=max_results,
        page=page,
    )
def _sync_journal_entries_list(journal_id: str, max_results: int, page: int) -> str:
    """Fetch one page of a journal's entries and format them as a numbered list.

    Args:
        journal_id: Journal to list; sent as ``for_obj_id`` with
            ``for_obj_type=journal``.
        max_results: Sent as ``page_size``.
        page: 1-based page number; also used to compute the displayed
            position of each entry.

    Returns:
        The formatted list (ordered by ``-updated_on``), a "no entries"
        message, or an error message when the HTTP request or JSON decoding
        fails.
    """
    import requests

    url = f"{settings.ae_api_url}/v3/crud/journal_entry/search"
    search_body: dict = {
        "page_size": max_results,
        "page": page,
        "order_by": "-updated_on",
    }
    params = {"for_obj_type": "journal", "for_obj_id": journal_id}

    try:
        resp = requests.post(
            url,
            headers=_headers(),
            params=params,
            json=search_body,
            timeout=settings.ae_api_timeout,
        )
        resp.raise_for_status()
        data = resp.json()
    except Exception as e:
        logger.warning("ae_journal_entries_list failed: %s", e)
        return f"Journal entries list error: {e}"

    # `or []` also covers an explicit `"data": null`, which would otherwise
    # make len(entries) below raise.
    entries = data.get("data") or []
    total = (data.get("meta") or {}).get("data_list_count") or len(entries)

    if not entries:
        return f"No entries found in journal `{journal_id}`."

    # Positions of the first and last entry on this page across the whole listing.
    first = (page - 1) * max_results + 1
    last = first + len(entries) - 1
    lines = [f"Entries in journal `{journal_id}` — showing {first}-{last} of {total}:\n"]

    for i, entry in enumerate(entries, first):
        title = entry.get("name") or "(untitled)"
        entry_id = entry.get("journal_entry_id") or entry.get("id") or ""
        raw_tags = entry.get("tags") or []
        # Tags may arrive as a list or as a comma-separated string.
        tags = raw_tags if isinstance(raw_tags, list) else [t.strip() for t in str(raw_tags).split(",") if t.strip()]
        summary = entry.get("summary") or ""
        updated = (entry.get("updated_on") or entry.get("created_on") or "")[:10]
        enabled = entry.get("enable", True)
        status = "" if enabled else " [disabled]"
        date_str = f" [{updated}]" if updated else ""

        lines.append(f"{i}. **{title}**{status} — id: `{entry_id}`{date_str}")
        if tags:
            lines.append(f" Tags: {', '.join(tags)}")
        if summary:
            # Mark summaries that were cut to 150 characters.
            marker = "..." if len(summary) > 150 else ""
            lines.append(f" {summary[:150]}{marker}")
        lines.append("")

    if total > last:
        lines.append(f"(More entries available — call again with page={page + 1})")
    return "\n".join(lines).rstrip()
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_update
# ---------------------------------------------------------------------------
async def journal_entry_update(
    entry_id: str,
    title: str = "",
    content: str = "",
    summary: str = "",
    tags: str = "",
    enable: bool | None = None,
) -> str:
    """Update fields on an existing journal entry. Only provided fields are changed.

    Empty strings and ``enable=None`` mean "leave this field alone". Returns
    a confirmation listing the changed fields, or an error message.
    """
    if config_error := _check_config():
        return config_error
    return await asyncio.to_thread(
        _sync_journal_entry_update,
        entry_id=entry_id,
        title=title,
        content=content,
        summary=summary,
        tags=tags,
        enable=enable,
    )
def _sync_journal_entry_update(
entry_id: str,
title: str,
content: str,
summary: str,
tags: str,
enable: bool | None,
) -> str:
payload: dict = {}
if title:
payload["name"] = title
if content:
payload["content"] = content
if summary:
payload["summary"] = summary
if tags:
payload["tags"] = [t.strip() for t in tags.split(",") if t.strip()]
if enable is not None:
payload["enable"] = enable
if not payload:
return "Nothing to update — no fields provided."
result = _patch_entry(entry_id, payload)
if result != "ok":
return result
updated = ", ".join(payload.keys())
return f"Journal entry `{entry_id}` updated. Fields changed: {updated}"
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_disable
# ---------------------------------------------------------------------------
async def journal_entry_disable(entry_id: str) -> str:
    """Soft-delete a journal entry by setting enable=false.

    Returns the PATCH helper's result: ``"ok"`` on success or an error
    message, or the configuration error when the AE API is not set up.
    """
    if config_error := _check_config():
        return config_error
    return await asyncio.to_thread(
        _patch_entry,
        entry_id=entry_id,
        payload={"enable": False},
    )
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_append
# ---------------------------------------------------------------------------
async def journal_entry_append(entry_id: str, content: str, heading: str = "") -> str:
    """Append a new section to the bottom of a journal entry's content.

    The section heading is ``heading`` when given, otherwise a UTC timestamp.
    Returns a confirmation or an error message.
    """
    if config_error := _check_config():
        return config_error
    return await asyncio.to_thread(
        _sync_journal_entry_append,
        entry_id=entry_id,
        content=content,
        heading=heading,
    )
def _sync_journal_entry_append(entry_id: str, content: str, heading: str) -> str:
    """Fetch an entry, add a ``###`` section after its content, and PATCH it back.

    Args:
        entry_id: Id of the entry to modify.
        content: Section body; surrounding whitespace is stripped.
        heading: Section heading; when empty, the current UTC time formatted
            as ``YYYY-MM-DD HH:MM UTC`` is used instead.

    Returns:
        A confirmation string, or the error string from the fetch or PATCH
        helper.
    """
    from datetime import datetime, timezone

    entry = _get_entry(entry_id)
    if isinstance(entry, str):
        return entry

    existing = (entry.get("content") or "").rstrip()
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    section_heading = heading or ts
    section = f"### {section_heading}\n{content.strip()}"
    # Only insert the blank-line separator when there is content to separate
    # from; otherwise an empty entry would start with stray newlines.
    new_content = f"{existing}\n\n{section}" if existing else section

    result = _patch_entry(entry_id, {"content": new_content})
    if result != "ok":
        return result
    return f"Appended to journal entry `{entry_id}` under heading \"{section_heading}\"."
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_prepend
# ---------------------------------------------------------------------------
async def journal_entry_prepend(entry_id: str, content: str, heading: str = "") -> str:
    """Prepend a new section to the top of a journal entry's content.

    The section heading is ``heading`` when given, otherwise a UTC timestamp.
    Returns a confirmation or an error message.
    """
    if config_error := _check_config():
        return config_error
    return await asyncio.to_thread(
        _sync_journal_entry_prepend,
        entry_id=entry_id,
        content=content,
        heading=heading,
    )
def _sync_journal_entry_prepend(entry_id: str, content: str, heading: str) -> str:
    """Fetch an entry, add a ``###`` section before its content, and PATCH it back.

    Args:
        entry_id: Id of the entry to modify.
        content: Section body; surrounding whitespace is stripped.
        heading: Section heading; when empty, the current UTC time formatted
            as ``YYYY-MM-DD HH:MM UTC`` is used instead.

    Returns:
        A confirmation string, or the error string from the fetch or PATCH
        helper.
    """
    from datetime import datetime, timezone

    entry = _get_entry(entry_id)
    if isinstance(entry, str):
        return entry

    existing = (entry.get("content") or "").lstrip()
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    section_heading = heading or ts
    section = f"### {section_heading}\n{content.strip()}"
    # Only insert the blank-line separator when there is content to separate
    # from; otherwise an empty entry would end with stray newlines.
    new_content = f"{section}\n\n{existing}" if existing else section

    result = _patch_entry(entry_id, {"content": new_content})
    if result != "ok":
        return result
    return f"Prepended to journal entry `{entry_id}` under heading \"{section_heading}\"."