Files
Cortex-Inara/cortex/tools/ae_knowledge.py
Scott Idem eab92d876d refactor: split tool declarations into domain files + role config UI
tools/__init__.py shrinks from 1,137 → 250 lines. Each domain file now
owns both its callables and its FunctionDeclarations (DECLARATIONS list),
so adding a new tool only touches one file.

New TOOL_CATEGORIES dict exported from __init__ — used by the UI for
grouped tool checkboxes.

Role config UI (Settings → Model Registry → Role Assignments):
- ⚙ button per role expands an inline configure panel
- Textarea for system_append (injected into system prompt for this role)
- Grouped checkboxes for tool allow-list (all checked = no restriction)
- POST /api/models/role-config saves both fields; updates ROLE_CONFIG_DATA
  in-page so re-open reflects current state without a page reload

Backend:
- model_registry.set_role_config() writes system_append + tools to registry
- TOOL_CATEGORIES exported from tools/__init__ for UI rendering
- TOOLS.md header updated: 30 → 39 tools (ae_journal_* and cortex_* additions)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 20:40:50 -04:00

748 lines
29 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Aether Platform knowledge tools — journal search, listing, and entry management.
These tools give the orchestrator read/write access to the AE Journals module,
which serves as the primary long-term knowledge base.
Auth: x-aether-api-key + x-account-id headers (same pattern as agents_sync scripts).
API: V3 CRUD — POST /v3/crud/journal_entry/search, POST /v3/crud/journal/{id}/journal_entry/
PATCH /v3/crud/journal_entry/{entry_id}, GET /v3/crud/journal_entry/{entry_id}
"""
import asyncio
import logging
from google.genai import types
from config import settings
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def _headers() -> dict:
return {
"x-aether-api-key": settings.ae_api_key,
"x-account-id": settings.ae_account_id,
"Content-Type": "application/json",
}
def _check_config() -> str | None:
"""Return an error string if AE API is not configured, else None."""
if not settings.ae_api_key or not settings.ae_account_id:
return (
"AE API not configured. Set AE_API_KEY and AE_ACCOUNT_ID in .env. "
"Values are the same as agents_sync/mcp/.env."
)
return None
# ---------------------------------------------------------------------------
# Tool: ae_journal_search
# ---------------------------------------------------------------------------
async def journal_search(
query: str = "",
journal_id: str = "",
tags: str = "",
type_code: str = "",
topic_code: str = "",
date_from: str = "",
date_to: str = "",
sort_by: str = "updated",
sort_order: str = "desc",
status: int | None = None,
priority: int | None = None,
max_results: int = 10,
page: int = 1,
) -> str:
"""Search AE Journal entries.
At least one of query, tags, type_code, topic_code, date_from, or journal_id
should be provided. All filters combine with AND.
"""
err = _check_config()
if err:
return err
return await asyncio.to_thread(
_sync_journal_search,
query, journal_id, tags, type_code, topic_code,
date_from, date_to, sort_by, sort_order,
status, priority, max_results, page,
)
def _sync_journal_search(
query: str,
journal_id: str,
tags: str,
type_code: str,
topic_code: str,
date_from: str,
date_to: str,
sort_by: str,
sort_order: str,
status: int | None,
priority: int | None,
max_results: int,
page: int,
) -> str:
import requests
# Build sort field
sort_field_map = {
"updated": "updated_on",
"created": "created_on",
"name": "name",
"priority": "priority",
}
sort_field = sort_field_map.get(sort_by, "updated_on")
order_by = f"{'-' if sort_order == 'desc' else ''}{sort_field}"
search_body: dict = {"page_size": max_results, "page": page, "order_by": order_by}
# Fulltext keyword — uses MATCH/AGAINST index
if query:
search_body["query_string"] = query
# Additional AND filters
and_filters: list[dict] = []
if tags:
and_filters.append({"field": "tags", "op": "icontains", "value": tags})
if type_code:
and_filters.append({"field": "type_code", "op": "eq", "value": type_code})
if topic_code:
and_filters.append({"field": "topic_code", "op": "eq", "value": topic_code})
if date_from:
and_filters.append({"field": "created_on", "op": "gte", "value": date_from})
if date_to:
and_filters.append({"field": "created_on", "op": "lte", "value": date_to})
if status is not None:
and_filters.append({"field": "status", "op": "eq", "value": status})
if priority is not None:
and_filters.append({"field": "priority", "op": "eq", "value": priority})
if and_filters:
search_body["and"] = and_filters
# query_string must be present for `and` filters to apply
if "query_string" not in search_body:
search_body["query_string"] = "%"
params: dict = {}
if journal_id:
params["for_obj_type"] = "journal"
params["for_obj_id"] = journal_id
url = f"{settings.ae_api_url}/v3/crud/journal_entry/search"
try:
resp = requests.post(
url,
headers=_headers(),
params=params,
json=search_body,
timeout=settings.ae_api_timeout,
)
resp.raise_for_status()
data = resp.json()
except Exception as e:
logger.warning("ae_journal_search failed: %s", e)
return f"Journal search error: {e}"
entries = data.get("data", [])
total = (data.get("meta") or {}).get("data_list_count") or len(entries)
if not entries:
desc = query or tags or type_code or topic_code or f"journal {journal_id}"
return f"No journal entries found for: {desc}"
label = query or tags or f"{len(entries)} entries"
lines = [f"Journal entries — **{label}** ({total} total, page {page}):\n"]
for entry in entries:
title = entry.get("name") or "(untitled)"
entry_id = entry.get("journal_entry_id") or entry.get("id") or ""
journal_name = entry.get("journal_name") or entry.get("parent_name") or ""
summary = entry.get("summary") or ""
entry_tags = entry.get("tags") or []
updated = (entry.get("updated_on") or entry.get("created_on") or "")[:10]
content_preview = (entry.get("content") or "")[:400].replace("\n", " ")
header = f"**{title}**"
if journal_name:
header += f" ({journal_name})"
header += f" — id: `{entry_id}`"
if updated:
header += f" [{updated}]"
lines.append(header)
if entry_tags:
tag_list = entry_tags if isinstance(entry_tags, list) else [t.strip() for t in str(entry_tags).split(",")]
lines.append(f" Tags: {', '.join(tag_list)}")
if summary:
lines.append(f" {summary}")
elif content_preview:
lines.append(f" {content_preview}{'' if len(entry.get('content', '')) > 400 else ''}")
lines.append("")
if total > page * max_results:
lines.append(f"(More results — call again with page={page + 1})")
return "\n".join(lines).strip()
# ---------------------------------------------------------------------------
# Tool: ae_journal_list
# ---------------------------------------------------------------------------
async def journal_list() -> str:
"""List all journals accessible to the configured AE account."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_list)
def _sync_journal_list() -> str:
import requests
url = f"{settings.ae_api_url}/v3/crud/journal/search"
try:
resp = requests.post(
url,
headers=_headers(),
json={"page_size": 100},
timeout=settings.ae_api_timeout,
)
resp.raise_for_status()
data = resp.json()
except Exception as e:
logger.warning("ae_journal_list failed: %s", e)
return f"Journal list error: {e}"
journals = data.get("data", [])
if not journals:
return "No journals found for this account."
lines = [f"Journals ({len(journals)}):\n"]
for j in journals:
jid = j.get("journal_id") or j.get("id_random") or j.get("id") or "?"
name = j.get("name") or "(untitled)"
desc = j.get("description") or ""
line = f"- **{name}** — id: `{jid}`"
if desc:
line += f"\n {desc}"
lines.append(line)
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_create
# ---------------------------------------------------------------------------
async def journal_entry_create(
journal_id: str,
title: str,
content: str,
summary: str = "",
tags: str = "",
) -> str:
"""Create a new entry in an AE Journal.
Args:
journal_id: The id_random of the target journal (use ae_journal_search to find it,
or ask the user which journal to write to).
title: Entry title (name field).
content: Full entry content (markdown supported).
summary: Optional short summary (1-2 sentences).
tags: Optional comma-separated tags.
Returns a confirmation with the new entry's id_random, or an error message.
"""
err = _check_config()
if err:
return err
return await asyncio.to_thread(
_sync_journal_entry_create, journal_id, title, content, summary, tags
)
def _sync_journal_entry_create(
journal_id: str, title: str, content: str, summary: str, tags: str
) -> str:
import requests
url = f"{settings.ae_api_url}/v3/crud/journal/{journal_id}/journal_entry/"
data: dict = {"name": title, "content": content}
if summary:
data["summary"] = summary
if tags:
data["tags"] = [t.strip() for t in tags.split(",") if t.strip()]
try:
resp = requests.post(
url,
headers=_headers(),
json=data,
timeout=settings.ae_api_timeout,
)
resp.raise_for_status()
result = resp.json()
except Exception as e:
logger.warning("ae_journal_entry_create failed: %s", e)
return f"Journal entry creation error: {e}"
entry_id = (
result.get("data", {}).get("journal_entry_id")
or result.get("data", {}).get("id_random")
or result.get("id_random")
or "unknown"
)
return f"Journal entry created. id: `{entry_id}`, title: \"{title}\", journal: `{journal_id}`"
# ---------------------------------------------------------------------------
# Shared helper: fetch a single journal entry by id
# ---------------------------------------------------------------------------
def _get_entry(entry_id: str) -> dict | str:
"""Return the entry dict, or an error string on failure."""
import requests
url = f"{settings.ae_api_url}/v3/crud/journal_entry/{entry_id}"
try:
resp = requests.get(url, headers=_headers(), timeout=settings.ae_api_timeout)
resp.raise_for_status()
data = resp.json()
entry = data.get("data") or data
if not isinstance(entry, dict):
return f"Unexpected response shape for entry {entry_id}"
return entry
except Exception as e:
logger.warning("_get_entry %s failed: %s", entry_id, e)
return f"Error fetching entry {entry_id}: {e}"
def _patch_entry(entry_id: str, payload: dict) -> str:
"""PATCH a journal entry. Returns a success/error string."""
import requests
url = f"{settings.ae_api_url}/v3/crud/journal_entry/{entry_id}"
try:
resp = requests.patch(
url,
headers=_headers(),
json=payload,
timeout=settings.ae_api_timeout,
)
resp.raise_for_status()
return "ok"
except Exception as e:
logger.warning("_patch_entry %s failed: %s", entry_id, e)
return f"Error updating entry {entry_id}: {e}"
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_read
# ---------------------------------------------------------------------------
async def journal_entry_read(entry_id: str, max_content_chars: int = 4000) -> str:
"""Return the full content of a single journal entry by its id_random."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_entry_read, entry_id, max_content_chars)
def _sync_journal_entry_read(entry_id: str, max_content_chars: int) -> str:
entry = _get_entry(entry_id)
if isinstance(entry, str):
return entry
title = entry.get("name") or "(untitled)"
journal = entry.get("journal_name") or entry.get("parent_name") or ""
summary = entry.get("summary") or ""
raw_tags = entry.get("tags") or []
tags = raw_tags if isinstance(raw_tags, list) else [t.strip() for t in str(raw_tags).split(",") if t.strip()]
content = entry.get("content") or ""
updated = (entry.get("updated_on") or entry.get("created_on") or "")[:19].replace("T", " ")
enabled = entry.get("enable", True)
lines = [f"# {title}"]
meta: list[str] = [f"id: `{entry_id}`"]
if journal:
meta.append(f"journal: {journal}")
if updated:
meta.append(f"updated: {updated}")
if not enabled:
meta.append("**DISABLED**")
lines.append(" ".join(meta))
if tags:
lines.append(f"Tags: {', '.join(tags)}")
if summary:
lines.append(f"\nSummary: {summary}")
lines.append("\n---\n")
truncated = len(content) > max_content_chars
lines.append(content[:max_content_chars])
if truncated:
lines.append(
f"\n\n[Content truncated at {max_content_chars} chars — "
f"{len(content)} total. Call again with a higher max_content_chars to read more.]"
)
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Tool: ae_journal_entries_list
# ---------------------------------------------------------------------------
async def journal_entries_list(journal_id: str, max_results: int = 20, page: int = 1) -> str:
"""List entries in a specific journal, newest first."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_entries_list, journal_id, max_results, page)
def _sync_journal_entries_list(journal_id: str, max_results: int, page: int) -> str:
import requests
url = f"{settings.ae_api_url}/v3/crud/journal_entry/search"
search_body: dict = {
"page_size": max_results,
"page": page,
"order_by": "-updated_on",
}
params = {"for_obj_type": "journal", "for_obj_id": journal_id}
try:
resp = requests.post(
url,
headers=_headers(),
params=params,
json=search_body,
timeout=settings.ae_api_timeout,
)
resp.raise_for_status()
data = resp.json()
except Exception as e:
logger.warning("ae_journal_entries_list failed: %s", e)
return f"Journal entries list error: {e}"
entries = data.get("data", [])
total = (data.get("meta") or {}).get("data_list_count") or len(entries)
if not entries:
return f"No entries found in journal `{journal_id}`."
offset = (page - 1) * max_results + 1
lines = [f"Entries in journal `{journal_id}` — showing {offset}{offset + len(entries) - 1} of {total}:\n"]
for i, entry in enumerate(entries, offset):
title = entry.get("name") or "(untitled)"
entry_id = entry.get("journal_entry_id") or entry.get("id") or ""
raw_tags = entry.get("tags") or []
tags = raw_tags if isinstance(raw_tags, list) else [t.strip() for t in str(raw_tags).split(",") if t.strip()]
summary = entry.get("summary") or ""
updated = (entry.get("updated_on") or entry.get("created_on") or "")[:10]
enabled = entry.get("enable", True)
status = "" if enabled else " [disabled]"
date_str = f" [{updated}]" if updated else ""
lines.append(f"{i}. **{title}**{status} — id: `{entry_id}`{date_str}")
if tags:
lines.append(f" Tags: {', '.join(tags)}")
if summary:
lines.append(f" {summary[:150]}{'' if len(summary) > 150 else ''}")
lines.append("")
if total > offset + len(entries) - 1:
lines.append(f"(More entries available — call again with page={page + 1})")
return "\n".join(lines).rstrip()
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_update
# ---------------------------------------------------------------------------
async def journal_entry_update(
entry_id: str,
title: str = "",
content: str = "",
summary: str = "",
tags: str = "",
enable: bool | None = None,
) -> str:
"""Update fields on an existing journal entry. Only provided fields are changed."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_entry_update, entry_id, title, content, summary, tags, enable)
def _sync_journal_entry_update(
entry_id: str,
title: str,
content: str,
summary: str,
tags: str,
enable: bool | None,
) -> str:
payload: dict = {}
if title:
payload["name"] = title
if content:
payload["content"] = content
if summary:
payload["summary"] = summary
if tags:
payload["tags"] = [t.strip() for t in tags.split(",") if t.strip()]
if enable is not None:
payload["enable"] = enable
if not payload:
return "Nothing to update — no fields provided."
result = _patch_entry(entry_id, payload)
if result != "ok":
return result
updated = ", ".join(payload.keys())
return f"Journal entry `{entry_id}` updated. Fields changed: {updated}"
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_disable
# ---------------------------------------------------------------------------
async def journal_entry_disable(entry_id: str) -> str:
"""Soft-delete a journal entry by setting enable=false."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_patch_entry, entry_id, {"enable": False})
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_append
# ---------------------------------------------------------------------------
async def journal_entry_append(entry_id: str, content: str, heading: str = "") -> str:
"""Append a timestamped section to the bottom of a journal entry's content."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_entry_append, entry_id, content, heading)
def _sync_journal_entry_append(entry_id: str, content: str, heading: str) -> str:
from datetime import datetime, timezone
entry = _get_entry(entry_id)
if isinstance(entry, str):
return entry
existing = (entry.get("content") or "").rstrip()
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
section_heading = heading or ts
new_content = f"{existing}\n\n### {section_heading}\n{content.strip()}"
result = _patch_entry(entry_id, {"content": new_content})
if result != "ok":
return result
return f"Appended to journal entry `{entry_id}` under heading \"{section_heading}\"."
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_prepend
# ---------------------------------------------------------------------------
async def journal_entry_prepend(entry_id: str, content: str, heading: str = "") -> str:
"""Prepend a timestamped section to the top of a journal entry's content."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_entry_prepend, entry_id, content, heading)
def _sync_journal_entry_prepend(entry_id: str, content: str, heading: str) -> str:
from datetime import datetime, timezone
entry = _get_entry(entry_id)
if isinstance(entry, str):
return entry
existing = (entry.get("content") or "").lstrip()
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
section_heading = heading or ts
new_content = f"### {section_heading}\n{content.strip()}\n\n{existing}"
result = _patch_entry(entry_id, {"content": new_content})
if result != "ok":
return result
return f"Prepended to journal entry `{entry_id}` under heading \"{section_heading}\"."
DECLARATIONS = [
types.FunctionDeclaration(
name="ae_journal_list",
description=(
"List all Aether Journals available for this account. "
"Returns each journal's name and id_random. "
"Call this first when you need to write a new entry or scope a search to a specific journal "
"and don't already know the journal's id."
),
parameters=types.Schema(type=types.Type.OBJECT, properties={}),
),
types.FunctionDeclaration(
name="ae_journal_search",
description=(
"Search Aether Journal entries. All parameters are optional — combine freely. "
"Use 'query' for fulltext keyword search (supports boolean: +required -excluded \"phrase\"). "
"Use 'tags' to filter by tag substring. Use 'date_from'/'date_to' for date ranges (YYYY-MM-DD). "
"Always search before creating a new entry to avoid duplicates."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"query": types.Schema(type=types.Type.STRING, description="Fulltext keyword search. Supports boolean mode: +required -excluded \"exact phrase\"."),
"journal_id": types.Schema(type=types.Type.STRING, description="Scope results to a specific journal by its id_random. Omit to search all journals."),
"tags": types.Schema(type=types.Type.STRING, description="Filter by tag substring (e.g. 'networking' matches entries tagged 'networking' or 'home-networking')."),
"type_code": types.Schema(type=types.Type.STRING, description="Filter by exact type_code (e.g. 'note', 'meeting', 'log')."),
"topic_code": types.Schema(type=types.Type.STRING, description="Filter by exact topic_code."),
"date_from": types.Schema(type=types.Type.STRING, description="Return entries created on or after this date (YYYY-MM-DD)."),
"date_to": types.Schema(type=types.Type.STRING, description="Return entries created on or before this date (YYYY-MM-DD)."),
"sort_by": types.Schema(type=types.Type.STRING, description="Sort field: 'updated' (default), 'created', 'name', or 'priority'."),
"sort_order": types.Schema(type=types.Type.STRING, description="Sort direction: 'desc' (default, newest first) or 'asc'."),
"status": types.Schema(type=types.Type.INTEGER, description="Filter by exact status code."),
"priority": types.Schema(type=types.Type.INTEGER, description="Filter by exact priority (1=low, 5=high)."),
"max_results": types.Schema(type=types.Type.INTEGER, description="Number of results per page (default 10)."),
"page": types.Schema(type=types.Type.INTEGER, description="Page number for pagination (default 1)."),
},
required=[],
),
),
types.FunctionDeclaration(
name="ae_journal_entry_read",
description=(
"Fetch the full content of a single journal entry by its id_random. "
"Use this when you need to read an entry before editing it, or when search results "
"don't show enough content. Returns title, journal, tags, summary, and full content."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"entry_id": types.Schema(type=types.Type.STRING, description="The id_random of the journal entry to read."),
"max_content_chars": types.Schema(type=types.Type.INTEGER, description="Maximum characters of content to return (default 4000). Increase for long entries."),
},
required=["entry_id"],
),
),
types.FunctionDeclaration(
name="ae_journal_entries_list",
description=(
"List entries in a specific journal, newest first. "
"Use this to browse what's in a journal when you don't have a search keyword, "
"or to find entries by browsing rather than searching. "
"Returns numbered entries with id, title, tags, summary, and date."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"journal_id": types.Schema(type=types.Type.STRING, description="The id_random of the journal to list entries from."),
"max_results": types.Schema(type=types.Type.INTEGER, description="Number of entries to return (default 20, max 50)."),
"page": types.Schema(type=types.Type.INTEGER, description="Page number for pagination (default 1)."),
},
required=["journal_id"],
),
),
types.FunctionDeclaration(
name="ae_journal_entry_create",
description=(
"Create a new entry in an Aether Journal. "
"Use this to save notes, summaries, or any content the user wants to store. "
"Always call ae_journal_search first to check for existing entries on the same topic."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"journal_id": types.Schema(type=types.Type.STRING, description="The id_random of the target journal. Ask the user which journal to write to if not specified."),
"title": types.Schema(type=types.Type.STRING, description="Entry title"),
"content": types.Schema(type=types.Type.STRING, description="Full entry content (markdown supported)"),
"summary": types.Schema(type=types.Type.STRING, description="Optional short summary (1-2 sentences)"),
"tags": types.Schema(type=types.Type.STRING, description="Optional comma-separated tags (e.g. 'wireguard, networking, homelab')"),
},
required=["journal_id", "title", "content"],
),
),
types.FunctionDeclaration(
name="ae_journal_entry_update",
description=(
"Update fields on an existing journal entry. Only the fields you provide are changed — "
"omitted fields are left as-is. Use ae_journal_search to find the entry_id first. "
"To soft-delete, use ae_journal_entry_disable instead."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"entry_id": types.Schema(type=types.Type.STRING, description="Journal entry id_random"),
"title": types.Schema(type=types.Type.STRING, description="New title"),
"content": types.Schema(type=types.Type.STRING, description="Replacement content (full, markdown supported)"),
"summary": types.Schema(type=types.Type.STRING, description="New summary"),
"tags": types.Schema(type=types.Type.STRING, description="Replacement comma-separated tags"),
"enable": types.Schema(type=types.Type.BOOLEAN, description="Set false to hide/disable the entry"),
},
required=["entry_id"],
),
),
types.FunctionDeclaration(
name="ae_journal_entry_disable",
description=(
"Soft-delete a journal entry by setting enable=false. "
"The entry is hidden but not permanently removed. "
"Use ae_journal_search to find the entry_id first."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"entry_id": types.Schema(type=types.Type.STRING, description="Journal entry id_random"),
},
required=["entry_id"],
),
),
types.FunctionDeclaration(
name="ae_journal_entry_append",
description=(
"Append a new section to the bottom of a journal entry's content. "
"Each section gets a UTC timestamp heading unless you provide one. "
"Ideal for timestamped logs, running notes, or data logs."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"entry_id": types.Schema(type=types.Type.STRING, description="Journal entry id_random"),
"content": types.Schema(type=types.Type.STRING, description="The text to append (markdown supported)"),
"heading": types.Schema(type=types.Type.STRING, description="Optional section heading (defaults to current UTC timestamp)"),
},
required=["entry_id", "content"],
),
),
types.FunctionDeclaration(
name="ae_journal_entry_prepend",
description=(
"Prepend a new section to the top of a journal entry's content. "
"Each section gets a UTC timestamp heading unless you provide one. "
"Useful for most-recent-first logs."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"entry_id": types.Schema(type=types.Type.STRING, description="Journal entry id_random"),
"content": types.Schema(type=types.Type.STRING, description="The text to prepend (markdown supported)"),
"heading": types.Schema(type=types.Type.STRING, description="Optional section heading (defaults to current UTC timestamp)"),
},
required=["entry_id", "content"],
),
),
]