Files
Cortex-Inara/cortex/tools/ae_knowledge.py
Scott Idem 71e472bebe feat: improved ae_journal_search + AE integration docs
Search improvements:
- Switched from LIKE on default_qry_str to query_string path (fulltext
  MATCH/AGAINST IN BOOLEAN MODE — uses the index, supports +/- boolean ops)
- Added tag filter (icontains on tags field)
- Added date_from / date_to filters (created_on gte/lte)
- Added type_code / topic_code exact-match filters
- Added sort_by / sort_order control (updated, created, name, priority)
- Added status / priority filters
- Added page parameter for pagination
- Richer output: updated date, tags, pagination hint
- Updated Gemini tool declaration with all new params

Docs:
- documentation/ARCH__AE_INTEGRATION.md — journal_entry full schema,
  search operator reference, current tool inventory, planned phases
  (broader AE integration: tasks, people, calendar, knowledge import)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 20:10:04 -04:00

579 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Aether Platform knowledge tools — journal search, listing, and entry management.
These tools give the orchestrator read/write access to the AE Journals module,
which serves as the primary long-term knowledge base.
Auth: x-aether-api-key + x-account-id headers (same pattern as agents_sync scripts).
API: V3 CRUD — POST /v3/crud/journal_entry/search, POST /v3/crud/journal/{id}/journal_entry/
PATCH /v3/crud/journal_entry/{entry_id}, GET /v3/crud/journal_entry/{entry_id}
"""
import asyncio
import logging
from config import settings
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def _headers() -> dict:
return {
"x-aether-api-key": settings.ae_api_key,
"x-account-id": settings.ae_account_id,
"Content-Type": "application/json",
}
def _check_config() -> str | None:
"""Return an error string if AE API is not configured, else None."""
if not settings.ae_api_key or not settings.ae_account_id:
return (
"AE API not configured. Set AE_API_KEY and AE_ACCOUNT_ID in .env. "
"Values are the same as agents_sync/mcp/.env."
)
return None
# ---------------------------------------------------------------------------
# Tool: ae_journal_search
# ---------------------------------------------------------------------------
async def journal_search(
query: str = "",
journal_id: str = "",
tags: str = "",
type_code: str = "",
topic_code: str = "",
date_from: str = "",
date_to: str = "",
sort_by: str = "updated",
sort_order: str = "desc",
status: int | None = None,
priority: int | None = None,
max_results: int = 10,
page: int = 1,
) -> str:
"""Search AE Journal entries.
At least one of query, tags, type_code, topic_code, date_from, or journal_id
should be provided. All filters combine with AND.
"""
err = _check_config()
if err:
return err
return await asyncio.to_thread(
_sync_journal_search,
query, journal_id, tags, type_code, topic_code,
date_from, date_to, sort_by, sort_order,
status, priority, max_results, page,
)
def _sync_journal_search(
query: str,
journal_id: str,
tags: str,
type_code: str,
topic_code: str,
date_from: str,
date_to: str,
sort_by: str,
sort_order: str,
status: int | None,
priority: int | None,
max_results: int,
page: int,
) -> str:
import requests
# Build sort field
sort_field_map = {
"updated": "updated_on",
"created": "created_on",
"name": "name",
"priority": "priority",
}
sort_field = sort_field_map.get(sort_by, "updated_on")
order_by = f"{'-' if sort_order == 'desc' else ''}{sort_field}"
search_body: dict = {"page_size": max_results, "page": page, "order_by": order_by}
# Fulltext keyword — uses MATCH/AGAINST index
if query:
search_body["query_string"] = query
# Additional AND filters
and_filters: list[dict] = []
if tags:
and_filters.append({"field": "tags", "op": "icontains", "value": tags})
if type_code:
and_filters.append({"field": "type_code", "op": "eq", "value": type_code})
if topic_code:
and_filters.append({"field": "topic_code", "op": "eq", "value": topic_code})
if date_from:
and_filters.append({"field": "created_on", "op": "gte", "value": date_from})
if date_to:
and_filters.append({"field": "created_on", "op": "lte", "value": date_to})
if status is not None:
and_filters.append({"field": "status", "op": "eq", "value": status})
if priority is not None:
and_filters.append({"field": "priority", "op": "eq", "value": priority})
if and_filters:
search_body["and_filters"] = and_filters
params: dict = {}
if journal_id:
params["for_obj_type"] = "journal"
params["for_obj_id"] = journal_id
url = f"{settings.ae_api_url}/v3/crud/journal_entry/search"
try:
resp = requests.post(
url,
headers=_headers(),
params=params,
json=search_body,
timeout=settings.ae_api_timeout,
)
resp.raise_for_status()
data = resp.json()
except Exception as e:
logger.warning("ae_journal_search failed: %s", e)
return f"Journal search error: {e}"
entries = data.get("data", [])
total = data.get("total") or data.get("count") or len(entries)
if not entries:
desc = query or tags or type_code or topic_code or f"journal {journal_id}"
return f"No journal entries found for: {desc}"
label = query or tags or f"{len(entries)} entries"
lines = [f"Journal entries — **{label}** ({total} total, page {page}):\n"]
for entry in entries:
title = entry.get("name") or "(untitled)"
entry_id = entry.get("id_random", "")
journal_name = entry.get("journal_name") or entry.get("parent_name") or ""
summary = entry.get("summary") or ""
entry_tags = entry.get("tags") or []
updated = (entry.get("updated_on") or entry.get("updated_at") or
entry.get("created_on") or entry.get("created_at") or "")[:10]
content_preview = (entry.get("content") or "")[:400].replace("\n", " ")
header = f"**{title}**"
if journal_name:
header += f" ({journal_name})"
header += f" — id: `{entry_id}`"
if updated:
header += f" [{updated}]"
lines.append(header)
if entry_tags:
tag_list = entry_tags if isinstance(entry_tags, list) else [t.strip() for t in str(entry_tags).split(",")]
lines.append(f" Tags: {', '.join(tag_list)}")
if summary:
lines.append(f" {summary}")
elif content_preview:
lines.append(f" {content_preview}{'' if len(entry.get('content', '')) > 400 else ''}")
lines.append("")
if total > page * max_results:
lines.append(f"(More results — call again with page={page + 1})")
return "\n".join(lines).strip()
# ---------------------------------------------------------------------------
# Tool: ae_journal_list
# ---------------------------------------------------------------------------
async def journal_list() -> str:
"""List all journals accessible to the configured AE account."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_list)
def _sync_journal_list() -> str:
import requests
url = f"{settings.ae_api_url}/v3/crud/journal/search"
try:
resp = requests.post(
url,
headers=_headers(),
json={"page_size": 100},
timeout=settings.ae_api_timeout,
)
resp.raise_for_status()
data = resp.json()
except Exception as e:
logger.warning("ae_journal_list failed: %s", e)
return f"Journal list error: {e}"
journals = data.get("data", [])
if not journals:
return "No journals found for this account."
lines = [f"Journals ({len(journals)}):\n"]
for j in journals:
jid = j.get("journal_id") or j.get("id_random") or j.get("id") or "?"
name = j.get("name") or "(untitled)"
desc = j.get("description") or ""
line = f"- **{name}** — id: `{jid}`"
if desc:
line += f"\n {desc}"
lines.append(line)
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_create
# ---------------------------------------------------------------------------
async def journal_entry_create(
journal_id: str,
title: str,
content: str,
summary: str = "",
tags: str = "",
) -> str:
"""Create a new entry in an AE Journal.
Args:
journal_id: The id_random of the target journal (use ae_journal_search to find it,
or ask the user which journal to write to).
title: Entry title (name field).
content: Full entry content (markdown supported).
summary: Optional short summary (1-2 sentences).
tags: Optional comma-separated tags.
Returns a confirmation with the new entry's id_random, or an error message.
"""
err = _check_config()
if err:
return err
return await asyncio.to_thread(
_sync_journal_entry_create, journal_id, title, content, summary, tags
)
def _sync_journal_entry_create(
journal_id: str, title: str, content: str, summary: str, tags: str
) -> str:
import requests
url = f"{settings.ae_api_url}/v3/crud/journal/{journal_id}/journal_entry/"
data: dict = {"name": title, "content": content}
if summary:
data["summary"] = summary
if tags:
data["tags"] = [t.strip() for t in tags.split(",") if t.strip()]
try:
resp = requests.post(
url,
headers=_headers(),
json=data,
timeout=settings.ae_api_timeout,
)
resp.raise_for_status()
result = resp.json()
except Exception as e:
logger.warning("ae_journal_entry_create failed: %s", e)
return f"Journal entry creation error: {e}"
entry_id = (
result.get("data", {}).get("journal_entry_id")
or result.get("data", {}).get("id_random")
or result.get("id_random")
or "unknown"
)
return f"Journal entry created. id: `{entry_id}`, title: \"{title}\", journal: `{journal_id}`"
# ---------------------------------------------------------------------------
# Shared helper: fetch a single journal entry by id
# ---------------------------------------------------------------------------
def _get_entry(entry_id: str) -> dict | str:
"""Return the entry dict, or an error string on failure."""
import requests
url = f"{settings.ae_api_url}/v3/crud/journal_entry/{entry_id}"
try:
resp = requests.get(url, headers=_headers(), timeout=settings.ae_api_timeout)
resp.raise_for_status()
data = resp.json()
entry = data.get("data") or data
if not isinstance(entry, dict):
return f"Unexpected response shape for entry {entry_id}"
return entry
except Exception as e:
logger.warning("_get_entry %s failed: %s", entry_id, e)
return f"Error fetching entry {entry_id}: {e}"
def _patch_entry(entry_id: str, payload: dict) -> str:
"""PATCH a journal entry. Returns a success/error string."""
import requests
url = f"{settings.ae_api_url}/v3/crud/journal_entry/{entry_id}"
try:
resp = requests.patch(
url,
headers=_headers(),
json=payload,
timeout=settings.ae_api_timeout,
)
resp.raise_for_status()
return "ok"
except Exception as e:
logger.warning("_patch_entry %s failed: %s", entry_id, e)
return f"Error updating entry {entry_id}: {e}"
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_read
# ---------------------------------------------------------------------------
async def journal_entry_read(entry_id: str, max_content_chars: int = 4000) -> str:
"""Return the full content of a single journal entry by its id_random."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_entry_read, entry_id, max_content_chars)
def _sync_journal_entry_read(entry_id: str, max_content_chars: int) -> str:
entry = _get_entry(entry_id)
if isinstance(entry, str):
return entry
title = entry.get("name") or "(untitled)"
journal = entry.get("journal_name") or entry.get("parent_name") or ""
summary = entry.get("summary") or ""
tags = entry.get("tags") or []
content = entry.get("content") or ""
updated = (entry.get("updated_at") or entry.get("created_at") or "")[:19].replace("T", " ")
enabled = entry.get("enable", True)
lines = [f"# {title}"]
meta: list[str] = [f"id: `{entry_id}`"]
if journal:
meta.append(f"journal: {journal}")
if updated:
meta.append(f"updated: {updated}")
if not enabled:
meta.append("**DISABLED**")
lines.append(" ".join(meta))
if tags:
lines.append(f"Tags: {', '.join(tags)}")
if summary:
lines.append(f"\nSummary: {summary}")
lines.append("\n---\n")
truncated = len(content) > max_content_chars
lines.append(content[:max_content_chars])
if truncated:
lines.append(
f"\n\n[Content truncated at {max_content_chars} chars — "
f"{len(content)} total. Call again with a higher max_content_chars to read more.]"
)
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Tool: ae_journal_entries_list
# ---------------------------------------------------------------------------
async def journal_entries_list(journal_id: str, max_results: int = 20, page: int = 1) -> str:
"""List entries in a specific journal, newest first."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_entries_list, journal_id, max_results, page)
def _sync_journal_entries_list(journal_id: str, max_results: int, page: int) -> str:
import requests
url = f"{settings.ae_api_url}/v3/crud/journal_entry/search"
search_body: dict = {
"page_size": max_results,
"page": page,
"order_by": "-updated_at",
}
params = {"for_obj_type": "journal", "for_obj_id": journal_id}
try:
resp = requests.post(
url,
headers=_headers(),
params=params,
json=search_body,
timeout=settings.ae_api_timeout,
)
resp.raise_for_status()
data = resp.json()
except Exception as e:
logger.warning("ae_journal_entries_list failed: %s", e)
return f"Journal entries list error: {e}"
entries = data.get("data", [])
total = data.get("total") or data.get("count") or len(entries)
if not entries:
return f"No entries found in journal `{journal_id}`."
offset = (page - 1) * max_results + 1
lines = [f"Entries in journal `{journal_id}` — showing {offset}{offset + len(entries) - 1} of {total}:\n"]
for i, entry in enumerate(entries, offset):
title = entry.get("name") or "(untitled)"
entry_id = entry.get("id_random", "")
tags = entry.get("tags") or []
summary = entry.get("summary") or ""
updated = (entry.get("updated_at") or entry.get("created_at") or "")[:10]
enabled = entry.get("enable", True)
status = "" if enabled else " [disabled]"
date_str = f" [{updated}]" if updated else ""
lines.append(f"{i}. **{title}**{status} — id: `{entry_id}`{date_str}")
if tags:
lines.append(f" Tags: {', '.join(tags)}")
if summary:
lines.append(f" {summary[:150]}{'' if len(summary) > 150 else ''}")
lines.append("")
if total > offset + len(entries) - 1:
lines.append(f"(More entries available — call again with page={page + 1})")
return "\n".join(lines).rstrip()
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_update
# ---------------------------------------------------------------------------
async def journal_entry_update(
entry_id: str,
title: str = "",
content: str = "",
summary: str = "",
tags: str = "",
enable: bool | None = None,
) -> str:
"""Update fields on an existing journal entry. Only provided fields are changed."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_entry_update, entry_id, title, content, summary, tags, enable)
def _sync_journal_entry_update(
entry_id: str,
title: str,
content: str,
summary: str,
tags: str,
enable: bool | None,
) -> str:
payload: dict = {}
if title:
payload["name"] = title
if content:
payload["content"] = content
if summary:
payload["summary"] = summary
if tags:
payload["tags"] = [t.strip() for t in tags.split(",") if t.strip()]
if enable is not None:
payload["enable"] = enable
if not payload:
return "Nothing to update — no fields provided."
result = _patch_entry(entry_id, payload)
if result != "ok":
return result
updated = ", ".join(payload.keys())
return f"Journal entry `{entry_id}` updated. Fields changed: {updated}"
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_disable
# ---------------------------------------------------------------------------
async def journal_entry_disable(entry_id: str) -> str:
"""Soft-delete a journal entry by setting enable=false."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_patch_entry, entry_id, {"enable": False})
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_append
# ---------------------------------------------------------------------------
async def journal_entry_append(entry_id: str, content: str, heading: str = "") -> str:
"""Append a timestamped section to the bottom of a journal entry's content."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_entry_append, entry_id, content, heading)
def _sync_journal_entry_append(entry_id: str, content: str, heading: str) -> str:
from datetime import datetime, timezone
entry = _get_entry(entry_id)
if isinstance(entry, str):
return entry
existing = (entry.get("content") or "").rstrip()
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
section_heading = heading or ts
new_content = f"{existing}\n\n### {section_heading}\n{content.strip()}"
result = _patch_entry(entry_id, {"content": new_content})
if result != "ok":
return result
return f"Appended to journal entry `{entry_id}` under heading \"{section_heading}\"."
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_prepend
# ---------------------------------------------------------------------------
async def journal_entry_prepend(entry_id: str, content: str, heading: str = "") -> str:
"""Prepend a timestamped section to the top of a journal entry's content."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_entry_prepend, entry_id, content, heading)
def _sync_journal_entry_prepend(entry_id: str, content: str, heading: str) -> str:
from datetime import datetime, timezone
entry = _get_entry(entry_id)
if isinstance(entry, str):
return entry
existing = (entry.get("content") or "").lstrip()
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
section_heading = heading or ts
new_content = f"### {section_heading}\n{content.strip()}\n\n{existing}"
result = _patch_entry(entry_id, {"content": new_content})
if result != "ok":
return result
return f"Prepended to journal entry `{entry_id}` under heading \"{section_heading}\"."