""" Aether Platform knowledge tools — journal search and entry creation. These tools give the orchestrator read/write access to the AE Journals module, which serves as the primary long-term knowledge base. Auth: x-aether-api-key + x-account-id headers (same pattern as agents_sync scripts). API: V3 CRUD — POST /v3/crud/journal_entry/search, POST /v3/crud/journal/{id}/journal_entry/ """ import asyncio import logging from config import settings logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- def _headers() -> dict: return { "x-aether-api-key": settings.ae_api_key, "x-account-id": settings.ae_account_id, "Content-Type": "application/json", } def _check_config() -> str | None: """Return an error string if AE API is not configured, else None.""" if not settings.ae_api_key or not settings.ae_account_id: return ( "AE API not configured. Set AE_API_KEY and AE_ACCOUNT_ID in .env. " "Values are the same as agents_sync/mcp/.env." ) return None # --------------------------------------------------------------------------- # Tool: ae_journal_search # --------------------------------------------------------------------------- async def journal_search(query: str, journal_id: str | None = None, max_results: int = 10) -> str: """Search AE Journal entries by keyword. Searches across the default_qry_str field (title + content excerpt). Optionally scoped to a specific journal by journal_id (id_random). Returns a markdown-formatted list of matching entries. """ err = _check_config() if err: return err return await asyncio.to_thread(_sync_journal_search, query, journal_id, max_results) def _sync_journal_search(query: str, journal_id: str | None, max_results: int) -> str: import requests url = f"{settings.ae_api_url}/v3/crud/journal_entry/search" search_body = { "and_filters": [ {"field": "default_qry_str", "op": "icontains", "value": query} ], "page_size": max_results, } params = {} if journal_id: params["for_obj_type"] = "journal" params["for_obj_id"] = journal_id try: resp = requests.post( url, headers=_headers(), params=params, json=search_body, timeout=settings.ae_api_timeout, ) resp.raise_for_status() data = resp.json() except Exception as e: logger.warning("ae_journal_search failed: %s", e) return f"Journal search error: {e}" entries = data.get("data", []) if not entries: return f"No journal entries found matching: {query}" lines = [f"Journal entries matching **{query}** ({len(entries)} result(s)):\n"] for entry in entries: title = entry.get("name") or "(untitled)" entry_id = entry.get("id_random", "") journal_name = entry.get("journal_name") or entry.get("parent_name") or "" summary = entry.get("summary") or "" content_preview = (entry.get("content") or "")[:200].replace("\n", " ") header = f"**{title}**" if journal_name: header += f" ({journal_name})" if entry_id: header += f" — id: `{entry_id}`" lines.append(header) if summary: lines.append(f" Summary: {summary}") if content_preview: lines.append(f" {content_preview}…") lines.append("") return "\n".join(lines).strip() # --------------------------------------------------------------------------- # Tool: ae_journal_entry_create # --------------------------------------------------------------------------- async def journal_entry_create( journal_id: str, title: str, content: str, summary: str = "", tags: str = "", ) -> str: """Create a new entry in an AE Journal. Args: journal_id: The id_random of the target journal (use ae_journal_search to find it, or ask the user which journal to write to). title: Entry title (name field). content: Full entry content (markdown supported). summary: Optional short summary (1-2 sentences). tags: Optional comma-separated tags. Returns a confirmation with the new entry's id_random, or an error message. """ err = _check_config() if err: return err return await asyncio.to_thread( _sync_journal_entry_create, journal_id, title, content, summary, tags ) def _sync_journal_entry_create( journal_id: str, title: str, content: str, summary: str, tags: str ) -> str: import requests url = f"{settings.ae_api_url}/v3/crud/journal/{journal_id}/journal_entry/" data: dict = {"name": title, "content": content} if summary: data["summary"] = summary if tags: data["tags"] = [t.strip() for t in tags.split(",") if t.strip()] try: resp = requests.post( url, headers=_headers(), json=data, timeout=settings.ae_api_timeout, ) resp.raise_for_status() result = resp.json() except Exception as e: logger.warning("ae_journal_entry_create failed: %s", e) return f"Journal entry creation error: {e}" entry_id = ( result.get("data", {}).get("id_random") or result.get("id_random") or "unknown" ) return f"Journal entry created. id: `{entry_id}`, title: \"{title}\", journal: `{journal_id}`"