Compare commits

...

9 Commits

Author SHA1 Message Date
Scott Idem
77997bc4ae feat: add cortex_status and cortex_update tools
cortex_status: git branch/commit/ahead-behind + systemctl state — read-only
cortex_update: git pull + syntax check all .py files + report; does NOT auto-restart.
  If syntax errors are found after pull, warns and blocks restart suggestion.
  Call cortex_restart separately to apply a clean update.

Both are admin-only. cortex_update is confirm-required (modifies files on disk).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 22:01:42 -04:00
Scott Idem
1ffa846edd docs: sync HELP.md tools table and files list with current implementation
- Add reminders_remove (targeted single-reminder removal, no confirm needed)
- Add ae_journal_entry_read, ae_journal_entries_list to AE Journals row
- Add email_send (admin-only) to Notifications row
- Remove TASKS.json from Files table (not in the Files panel)
- Add email_allowlist.json to Files table (Settings group in Files panel)
- Update last-updated date

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 21:54:50 -04:00
Scott Idem
98546abe21 docs: update ARCH__AE_INTEGRATION with verified API behavior
- query_string required for and/or filters to apply; use "%" as wildcard
- Total count is in meta.data_list_count, not top-level
- id_random is None in responses; Vision ID convention uses {obj_type}_id
- tags comes back as string on read, not list — normalize before joining
- Replace stale "Planned: Search Improvements" with current signature + notes
- Clarify date_to boundary (lte midnight, use next day to include full day)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 21:17:19 -04:00
Scott Idem
1fa5151d8a fix: correct V3 search filter key and response field names in ae_knowledge
- Filter key is "and" not "and_filters" (V3 API format)
- Entry IDs use journal_entry_id/id, not id_random (id_random is None)
- Dates use updated_on/created_on, not updated_at/created_at
- Total count lives in meta.data_list_count, not top-level total/count
- Inject query_string="%" when and filters present but no query, since
  the V3 search engine requires query_string for filters to apply
- Normalize tags from string to list in both entry_read and entries_list
- Fix order_by to use updated_on (not updated_at) in entries_list
- Correct ARCH__AE_INTEGRATION.md: and_filters → and, or_filters → or

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 21:12:44 -04:00
Scott Idem
71e472bebe feat: improved ae_journal_search + AE integration docs
Search improvements:
- Switched from LIKE on default_qry_str to query_string path (fulltext
  MATCH/AGAINST IN BOOLEAN MODE — uses the index, supports +/- boolean ops)
- Added tag filter (icontains on tags field)
- Added date_from / date_to filters (created_on gte/lte)
- Added type_code / topic_code exact-match filters
- Added sort_by / sort_order control (updated, created, name, priority)
- Added status / priority filters
- Added page parameter for pagination
- Richer output: updated date, tags, pagination hint
- Updated Gemini tool declaration with all new params

Docs:
- documentation/ARCH__AE_INTEGRATION.md — journal_entry full schema,
  search operator reference, current tool inventory, planned phases
  (broader AE integration: tasks, people, calendar, knowledge import)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 20:10:04 -04:00
Scott Idem
77327d97ad feat: improve AE Journal read toolset
- ae_journal_entry_read: expose full entry content by id_random (title,
  journal, tags, summary, full content with configurable truncation)
- ae_journal_entries_list: browse all entries in a journal newest-first,
  numbered with id/title/tags/summary/date and pagination support
- ae_journal_search: richer output — tags, updated date, 400-char preview
  (was 200), show summary OR preview (not both when summary exists)

_get_entry() was already implemented; read tool just exposes it properly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 19:47:59 -04:00
Scott Idem
36fdda6728 feat: add reminders_remove tool for single-reminder removal
- reminders_remove(index) removes one reminder by 1-based index
- reminders_list now returns numbered output (1. heading / body)
  so any model can easily identify which index to pass
- _parse_sections() / _sections_to_text() helpers for clean round-trip
- Not in CONFIRM_REQUIRED — targeted removal is safe without a gate

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 19:27:53 -04:00
Scott Idem
6405dd338d feat: proper confirmation-resume flow + per-user tool policy
Fixes the broken confirmation gate where users had no way to approve
or deny a blocked tool call in the web UI.

Changes:
- orchestrator_engine.py: add OrchestrateCheckpoint dataclass, extract
  loop into _run_from_contents(), add resume() function
- openai_orchestrator.py: same treatment — _run_from_messages(), resume()
- routers/orchestrator.py: POST /{job_id}/confirm and /deny endpoints,
  separate _checkpoints store, _resume_job() + _finalize_job() helpers,
  "awaiting_confirmation" job status with pending_confirmation payload
- auth_utils.py: get_tool_policy() and save_tool_policy() helpers reading
  home/{user}/tool_policy.json (allow/deny lists)
- routers/orchestrator.py: loads tool_policy per user and passes
  confirm_allow/confirm_deny to both engines
- app.js: poll loop handles awaiting_confirmation — shows Confirm/Deny
  buttons inline, resumes polling after user action
- settings.html + settings.py: Tool Permissions section with allow/deny
  textareas, POST /settings/tool-policy route
- style.css: .confirm-gate, .confirm-btn, .deny-btn styles

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 19:14:53 -04:00
Scott Idem
bce7de647c feat: proactive notifications — email, NC Talk, Google Chat per user
notification.py now handles all three outbound channels. Email defaults
to the user's login address (google_email from auth.json); an optional
override can be set in channels.json. Google Chat uses an incoming
webhook URL. NC Talk was already wired, just needs notification_room set.

Settings page gains a Notifications section: channel dropdown, optional
email override, NC room token, and Google Chat webhook URL. All stored
in per-user channels.json.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 22:32:22 -04:00
15 changed files with 1718 additions and 222 deletions

View File

@@ -224,3 +224,22 @@ def get_user_channels(username: str) -> dict:
return json.loads(path.read_text())
except Exception:
return {}
def get_tool_policy(username: str) -> dict:
"""Return the parsed tool_policy.json for a user.
Keys:
allow — tools in CONFIRM_REQUIRED that this user has pre-approved (skip gate)
deny — tools always blocked for this user regardless of global CONFIRM_REQUIRED
"""
path = settings.home_root() / username / "tool_policy.json"
try:
return json.loads(path.read_text())
except Exception:
return {}
def save_tool_policy(username: str, data: dict) -> None:
path = settings.home_root() / username / "tool_policy.json"
path.write_text(json.dumps(data, indent=2) + "\n")

View File

@@ -1,22 +1,21 @@
"""
Outbound notification helpers — send messages to user channels proactively.
Channel config lives in home/{user}/channels.json.
Each channel that supports proactive notifications needs a notification_channel
set to its key name (e.g. "nextcloud", "google_chat") in the user's channels.json:
Channel config lives in home/{user}/channels.json:
{
"notification_channel": "nextcloud",
"notification_channel": "email" | "nextcloud" | "google_chat",
"notification_email": "<override address — defaults to login email>",
"nextcloud": {
"url": "https://cloud.example.com",
"bot_secret": "...",
"notification_room": "<room-token>",
...
"url": "...", "bot_secret": "...", "notification_room": "<token>", ...
},
"google_chat": {
"outbound_webhook": "https://chat.googleapis.com/v1/spaces/...", ...
}
}
If notification_channel is absent, defaults to "nextcloud" if configured.
If notification_room (for NCT) is absent, notifications are silently skipped.
"""
import asyncio
import hashlib
import hmac
import json
@@ -73,34 +72,89 @@ async def _notify_nct(nct: dict, message: str, username: str) -> None:
await _send_nct_message(url, secret, room, message)
async def _notify_email(username: str, message: str, email_override: str | None = None) -> None:
"""Send notification via email. Address = override → google_email from auth.json."""
from auth_utils import _read_auth
from email_utils import send_email
to_addr = email_override or _read_auth(username).get("google_email", "").strip()
if not to_addr:
logger.warning("notify: no email address for %s — set notification_email in channels.json", username)
return
ok = await asyncio.to_thread(
send_email,
to_email=to_addr,
subject="Cortex",
body_text=message,
body_html=message.replace("\n", "<br>"),
)
if ok:
logger.info("notify email → %s (%d chars)", to_addr, len(message))
else:
logger.warning("notify: email send failed for %s", username)
async def _notify_google_chat(webhook_url: str, message: str, username: str) -> None:
"""POST a message to a Google Chat space via incoming webhook."""
body = json.dumps({"text": message}, ensure_ascii=False).encode("utf-8")
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
webhook_url,
content=body,
headers={"Content-Type": "application/json"},
timeout=15,
)
if resp.status_code not in (200, 201):
logger.warning("notify Google Chat %s → HTTP %d: %s", username, resp.status_code, resp.text[:200])
else:
logger.info("notify Google Chat → %s (%d chars)", username, len(message))
except Exception as e:
logger.error("notify Google Chat error for %s: %s", username, e)
async def notify(username: str, message: str, channel: str | None = None) -> None:
"""Send a notification to the user's preferred outbound channel.
Channel resolution order:
1. `channel` parameter if provided
2. `notification_channel` key in channels.json
3. "nextcloud" if configured
3. "nextcloud" if notification_room is configured
4. Silent no-op
To configure: set `notification_channel` in home/{user}/channels.json.
For NCT: also set `notification_room` in the nextcloud section.
Configure via home/{user}/channels.json — see module docstring.
"""
from auth_utils import get_user_channels
channels = get_user_channels(username)
target = channel or channels.get("notification_channel", "").strip()
if not target:
# Auto-detect: use nextcloud if configured
if "nextcloud" in channels:
# Auto-detect: nextcloud if a notification_room is set
nct = channels.get("nextcloud", {})
if nct.get("notification_room", "").strip():
target = "nextcloud"
else:
return
if target == "nextcloud":
if target == "email":
email_override = channels.get("notification_email", "").strip() or None
await _notify_email(username, message, email_override=email_override)
elif target == "nextcloud":
nct = channels.get("nextcloud")
if not nct:
logger.debug("notify: nextcloud not configured for %s", username)
return
await _notify_nct(nct, message, username)
elif target == "google_chat":
gc = channels.get("google_chat", {})
webhook = gc.get("outbound_webhook", "").strip()
if not webhook:
logger.debug("notify: google_chat outbound_webhook not set for %s", username)
return
await _notify_google_chat(webhook, message, username)
else:
logger.debug("notify: channel %r not yet supported for outbound (user %s)", target, username)
logger.debug("notify: channel %r not supported for outbound (user %s)", target, username)

View File

@@ -24,8 +24,8 @@ import logging
from openai import AsyncOpenAI
from config import settings
from orchestrator_engine import OrchestratorResult
from tools import OPENAI_TOOL_SCHEMAS, call_tool, get_openai_tools_for_role, CONFIRM_REQUIRED
from orchestrator_engine import OrchestrateCheckpoint, OrchestratorResult
from tools import OPENAI_TOOL_SCHEMAS, call_tool, get_openai_tools_for_role, get_tools_for_role, CONFIRM_REQUIRED
logger = logging.getLogger(__name__)
@@ -45,6 +45,8 @@ async def run(
model_cfg: dict | None = None,
respond_with_final: bool = True,
user_role: str = "user",
confirm_allow: set[str] | None = None,
confirm_deny: set[str] | None = None,
) -> OrchestratorResult:
"""
Run a tool-enabled task using an OpenAI-compatible API.
@@ -56,36 +58,22 @@ async def run(
model_cfg: Resolved model config from model_registry (local_openai type)
respond_with_final: If False, return just the tool-loop summary without a
full persona-voiced response (faster; for cron/background)
confirm_allow: Tools to bypass the confirmation gate for this user
confirm_deny: Tools to always block for this user
Returns:
OrchestratorResult — same shape as the Gemini engine for drop-in compatibility
OrchestratorResult — if checkpoint is set, the job is awaiting confirmation
"""
if not model_cfg:
raise RuntimeError("model_cfg is required for the OpenAI orchestrator")
api_url = model_cfg.get("api_url", "")
api_key = model_cfg.get("api_key", "") or "none"
model_name = model_cfg.get("model_name", "")
host_type = model_cfg.get("host_type", "openwebui")
_confirm_allow = frozenset(confirm_allow or ())
_confirm_deny = frozenset(confirm_deny or ())
effective_confirm = (CONFIRM_REQUIRED - set(_confirm_allow)) | set(_confirm_deny)
if not api_url or not model_name:
raise RuntimeError(
f"model_cfg missing api_url or model_name: {model_cfg.get('label', model_cfg)}"
)
client, model_name, active_tools = _build_client(model_cfg)
# Open WebUI's OpenAI-compatible endpoint lives at /api/chat/completions,
# so the SDK base_url needs the /api prefix; standard OpenAI-layout hosts don't.
base_url = api_url.rstrip("/")
if host_type == "openwebui":
base_url = base_url + "/api"
client = AsyncOpenAI(base_url=base_url, api_key=api_key)
# System prompt: persona context + brief tool instruction
sys_content = (system_prompt or "") + _TOOL_INSTRUCTION
# Build messages: [system, ...recent_session, current_task]
# Strip non-standard metadata fields (backend, host, etc.) before sending.
messages: list[dict] = [{"role": "system", "content": sys_content}]
if session_messages:
messages.extend(
@@ -94,13 +82,132 @@ async def run(
)
messages.append({"role": "user", "content": task})
active_tools = get_openai_tools_for_role(user_role)
active_callables: dict | None = None # resolved lazily below
tool_call_log: list[dict] = []
final_response, checkpoint = await _run_from_messages(
client=client,
messages=messages,
active_tools=active_tools,
tool_call_log=tool_call_log,
effective_confirm=effective_confirm,
model_name=model_name,
task=task,
model_cfg=model_cfg,
respond_with_final=respond_with_final,
user_role=user_role,
confirm_allow=_confirm_allow,
confirm_deny=_confirm_deny,
starting_round=0,
)
if checkpoint:
return OrchestratorResult(
response=final_response,
tool_calls=list(tool_call_log),
backend="local",
gemini_summary=final_response,
checkpoint=checkpoint,
)
model_label = model_cfg.get("label") or model_name
logger.info("OpenAI orchestrator complete — model=%s tools=%d", model_label, len(tool_call_log))
return OrchestratorResult(
response=final_response,
tool_calls=tool_call_log,
backend="local",
gemini_summary=final_response,
)
async def resume(checkpoint: OrchestrateCheckpoint, confirmed: bool) -> OrchestratorResult:
"""Continue an OpenAI orchestrator job that was paused at a confirmation gate."""
client, model_name, active_tools = _build_client(checkpoint.model_cfg)
effective_confirm = (CONFIRM_REQUIRED - set(checkpoint.confirm_allow)) | set(checkpoint.confirm_deny)
messages = list(checkpoint.pre_fn_state)
tool_call_log = [t for t in checkpoint.tool_call_log if t["result"] != "[awaiting confirmation]"]
# Build tool responses for this round
for er in checkpoint.executed_results:
messages.append({
"role": "tool",
"tool_call_id": er.get("tool_call_id", er["name"]),
"content": er["result"],
})
for pt in checkpoint.pending_tools:
if confirmed:
_, callables = get_tools_for_role(checkpoint.user_role)
result_str = await _execute_tool_dict(pt["name"], pt["args"], checkpoint.user_role)
logger.info("Confirmed tool %s%d chars", pt["name"], len(result_str))
else:
result_str = "Action denied by user."
logger.info("Tool %s denied by user", pt["name"])
tool_call_log.append({"tool": pt["name"], "args": pt["args"], "result": result_str})
messages.append({
"role": "tool",
"tool_call_id": pt.get("tool_call_id", pt["name"]),
"content": result_str,
})
final_response, new_checkpoint = await _run_from_messages(
client=client,
messages=messages,
active_tools=active_tools,
tool_call_log=tool_call_log,
effective_confirm=effective_confirm,
model_name=model_name,
task=checkpoint.task,
model_cfg=checkpoint.model_cfg,
respond_with_final=checkpoint.respond_with_final,
user_role=checkpoint.user_role,
confirm_allow=checkpoint.confirm_allow,
confirm_deny=checkpoint.confirm_deny,
starting_round=checkpoint.rounds_used,
)
if new_checkpoint:
return OrchestratorResult(
response=final_response,
tool_calls=list(tool_call_log),
backend="local",
gemini_summary=final_response,
checkpoint=new_checkpoint,
)
model_label = (checkpoint.model_cfg or {}).get("label") or model_name
logger.info("OpenAI orchestrator resumed — model=%s tools=%d", model_label, len(tool_call_log))
return OrchestratorResult(
response=final_response,
tool_calls=tool_call_log,
backend="local",
gemini_summary=final_response,
)
async def _run_from_messages(
client,
messages: list[dict],
active_tools: list,
tool_call_log: list[dict],
effective_confirm: set[str],
model_name: str,
task: str,
model_cfg: dict | None,
respond_with_final: bool,
user_role: str,
confirm_allow: frozenset,
confirm_deny: frozenset,
starting_round: int = 0,
) -> tuple[str, OrchestrateCheckpoint | None]:
"""
Run the OpenAI ReAct loop from the current messages state.
Returns (final_response, checkpoint) — checkpoint is set if confirmation is needed.
"""
final_response = ""
for round_num in range(settings.orchestrator_max_rounds):
for round_num in range(starting_round, settings.orchestrator_max_rounds):
logger.info("OpenAI orchestrator round %d / %d model=%s",
round_num + 1, settings.orchestrator_max_rounds, model_name)
@@ -112,29 +219,28 @@ async def run(
)
choice = response.choices[0]
msg = choice.message
msg = choice.message
# Append the assistant turn (MUST include tool_calls if present so the
# next request is valid — OpenAI requires the full history to be consistent)
assistant_msg: dict = {"role": "assistant"}
if msg.content:
assistant_msg["content"] = msg.content
if msg.tool_calls:
assistant_msg["tool_calls"] = [
{
"id": tc.id,
"id": tc.id,
"type": "function",
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments,
},
"function": {"name": tc.function.name, "arguments": tc.function.arguments},
}
for tc in msg.tool_calls
]
messages.append(assistant_msg)
if choice.finish_reason == "tool_calls" and msg.tool_calls:
confirm_requested = False
# Snapshot state before tool responses for potential checkpoint
pre_fn_state = list(messages)
pending_tools: list[dict] = []
executed_results: list[dict] = []
for tc in msg.tool_calls:
name = tc.function.name
@@ -143,34 +249,23 @@ async def run(
except json.JSONDecodeError:
args_parsed = {"raw": tc.function.arguments}
if name in CONFIRM_REQUIRED:
args_str = json.dumps(args_parsed, indent=2) if args_parsed else "(no arguments)"
result_str = (
f"⚠️ CONFIRMATION REQUIRED ⚠️\n"
f"Tool: {name}\nArguments:\n{args_str}\n\n"
f"Do NOT call this tool again. Tell the user exactly what you were "
f"about to do, explain the potential impact, and ask them to confirm "
f"by sending a follow-up message before you proceed."
)
confirm_requested = True
if name in effective_confirm:
pending_tools.append({"name": name, "args": args_parsed, "tool_call_id": tc.id})
logger.info("Tool %s blocked — confirmation required", name)
else:
result_str = await _execute_tool(name, tc.function.arguments, user_role)
logger.info("Tool %s%d chars", name, len(result_str))
executed_results.append({"name": name, "args": args_parsed, "result": result_str, "tool_call_id": tc.id})
tool_call_log.append({"tool": name, "args": args_parsed, "result": result_str})
messages.append({"role": "tool", "tool_call_id": tc.id, "content": result_str})
tool_call_log.append({
"tool": name,
"args": args_parsed,
"result": "[awaiting confirmation]" if name in CONFIRM_REQUIRED else result_str,
})
messages.append({
"role": "tool",
"tool_call_id": tc.id,
"content": result_str,
})
if pending_tools:
# Add placeholder responses
for pt in pending_tools:
placeholder = f"[AWAITING USER CONFIRMATION for {pt['name']}]"
tool_call_log.append({"tool": pt["name"], "args": pt["args"], "result": "[awaiting confirmation]"})
messages.append({"role": "tool", "tool_call_id": pt["tool_call_id"], "content": placeholder})
if confirm_requested:
# One more model round to produce the confirmation-request message, then stop.
conf_resp = await client.chat.completions.create(
model=model_name,
messages=messages,
@@ -180,10 +275,24 @@ async def run(
final_response = conf_resp.choices[0].message.content or (
"This action requires your explicit confirmation before it can proceed."
)
break
checkpoint = OrchestrateCheckpoint(
engine="openai",
pre_fn_state=pre_fn_state,
executed_results=executed_results,
pending_tools=pending_tools,
tool_call_log=list(tool_call_log),
task=task,
model_cfg=model_cfg,
respond_with_final=respond_with_final,
user_role=user_role,
confirm_allow=confirm_allow,
confirm_deny=confirm_deny,
rounds_used=round_num + 2,
)
return final_response, checkpoint
else:
# finish_reason == "stop" (or no tool_calls) — model is done
final_response = msg.content or ""
logger.info(
"OpenAI orchestrator done after %d round(s). Tools used: %d",
@@ -192,30 +301,37 @@ async def run(
break
else:
# Hit the round limit
logger.warning("OpenAI orchestrator hit max rounds (%d)", settings.orchestrator_max_rounds)
final_response = (
f"Reached the tool iteration limit ({settings.orchestrator_max_rounds} rounds). "
"Here is what was gathered:\n\n"
+ "\n\n".join(
f"**{t['tool']}**: {t['result'][:500]}" for t in tool_call_log
)
+ "\n\n".join(f"**{t['tool']}**: {t['result'][:500]}" for t in tool_call_log)
)
model_label = model_cfg.get("label") or model_name
logger.info("OpenAI orchestrator complete — model=%s tools=%d", model_label, len(tool_call_log))
return final_response, None
return OrchestratorResult(
response=final_response,
tool_calls=tool_call_log,
backend="local",
gemini_summary=final_response, # reused for UI display; same content in single-model mode
)
def _build_client(model_cfg: dict | None) -> tuple:
"""Build AsyncOpenAI client and return (client, model_name, active_tools)."""
if not model_cfg:
raise RuntimeError("model_cfg is required for the OpenAI orchestrator")
api_url = model_cfg.get("api_url", "")
api_key = model_cfg.get("api_key", "") or "none"
model_name = model_cfg.get("model_name", "")
host_type = model_cfg.get("host_type", "openwebui")
if not api_url or not model_name:
raise RuntimeError(
f"model_cfg missing api_url or model_name: {model_cfg.get('label', model_cfg)}"
)
base_url = api_url.rstrip("/")
if host_type == "openwebui":
base_url = base_url + "/api"
client = AsyncOpenAI(base_url=base_url, api_key=api_key)
return client, model_name, OPENAI_TOOL_SCHEMAS
async def _execute_tool(name: str, arguments_json: str, user_role: str = "user") -> str:
"""Parse tool arguments and execute with role-filtered callables."""
from tools import get_tools_for_role
_, callables = get_tools_for_role(user_role)
try:
args = json.loads(arguments_json)
@@ -226,3 +342,13 @@ async def _execute_tool(name: str, arguments_json: str, user_role: str = "user")
except Exception as e:
logger.warning("Tool %s failed: %s", name, e)
return f"Tool error: {e}"
async def _execute_tool_dict(name: str, args: dict, user_role: str = "user") -> str:
"""Execute a tool from a pre-parsed args dict."""
_, callables = get_tools_for_role(user_role)
try:
return await call_tool(name, args, callables)
except Exception as e:
logger.warning("Tool %s failed: %s", name, e)
return f"Tool error: {e}"

View File

@@ -44,12 +44,39 @@ Keep your summary factual and complete. Include relevant URLs, data, and specifi
If no tools are needed, return an empty summary."""
@dataclass
class OrchestrateCheckpoint:
"""Saved execution state for a job paused at a confirmation gate."""
engine: str # "gemini" | "openai"
pre_fn_state: list # conversation state before function responses
executed_results: list[dict] # tools that already ran this round
pending_tools: list[dict] # [{name, args}] awaiting confirmation
tool_call_log: list[dict] # all tool calls so far
task: str
# Gemini-specific config (unused by openai engine)
system_prompt: str = ""
session_messages: list | None = None
model_name: str | None = None
gemini_api_key: str | None = None
respond_with_claude: bool = True
response_role: str = "chat"
# OpenAI-specific config (unused by gemini engine)
model_cfg: dict | None = None
respond_with_final: bool = True
# Common
user_role: str = "user"
confirm_allow: frozenset = field(default_factory=frozenset)
confirm_deny: frozenset = field(default_factory=frozenset)
rounds_used: int = 0
@dataclass
class OrchestratorResult:
response: str # final user-facing response (from Claude)
tool_calls: list[dict] = field(default_factory=list) # [{tool, args, result}]
backend: str = "claude" # model that produced the final response
gemini_summary: str = "" # what Gemini handed to Claude (debug/display)
checkpoint: OrchestrateCheckpoint | None = None # set when awaiting confirmation
async def run(
@@ -61,6 +88,8 @@ async def run(
model_name: str | None = None,
response_role: str = "chat",
user_role: str = "user",
confirm_allow: set[str] | None = None,
confirm_deny: set[str] | None = None,
) -> OrchestratorResult:
"""
Run the full orchestration loop for a task.
@@ -72,9 +101,11 @@ async def run(
respond_with_claude: If False, return Gemini's summary as the response (useful for
background/cron tasks where a polished reply isn't needed)
gemini_api_key: Per-user Gemini API key (falls back to GEMINI_API_KEY in .env)
confirm_allow: Tools to bypass the confirmation gate for this user
confirm_deny: Tools to always block for this user
Returns:
OrchestratorResult with response, tool call log, backend used, and Gemini summary
OrchestratorResult — if checkpoint is set, the job is awaiting confirmation
"""
api_key = gemini_api_key or settings.gemini_api_key
if not api_key:
@@ -85,19 +116,157 @@ async def run(
client = genai.Client(api_key=api_key)
# Seed Gemini with the task — include recent session context if available
_confirm_allow = frozenset(confirm_allow or ())
_confirm_deny = frozenset(confirm_deny or ())
effective_confirm = (CONFIRM_REQUIRED - set(_confirm_allow)) | set(_confirm_deny)
task_with_context = _build_task_prompt(task, session_messages)
contents: list[types.Content] = [
types.Content(role="user", parts=[types.Part(text=task_with_context)])
]
tool_declarations, tool_callables = get_tools_for_role(user_role)
tool_call_log: list[dict] = []
gemini_summary, checkpoint = await _run_from_contents(
client=client,
contents=contents,
tool_declarations=tool_declarations,
tool_callables=tool_callables,
tool_call_log=tool_call_log,
effective_confirm=effective_confirm,
model_name=model_name,
task=task,
system_prompt=system_prompt,
session_messages=session_messages,
respond_with_claude=respond_with_claude,
response_role=response_role,
user_role=user_role,
confirm_allow=_confirm_allow,
confirm_deny=_confirm_deny,
starting_round=0,
gemini_api_key=api_key,
)
if checkpoint:
return OrchestratorResult(
response=gemini_summary,
tool_calls=list(tool_call_log),
backend="gemini",
gemini_summary=gemini_summary,
checkpoint=checkpoint,
)
return await _claude_handoff(
task=task,
tool_call_log=tool_call_log,
gemini_summary=gemini_summary,
system_prompt=system_prompt,
session_messages=session_messages,
respond_with_claude=respond_with_claude,
response_role=response_role,
)
async def resume(checkpoint: OrchestrateCheckpoint, confirmed: bool) -> OrchestratorResult:
"""Continue a job that was paused at a confirmation gate."""
api_key = checkpoint.gemini_api_key or settings.gemini_api_key
client = genai.Client(api_key=api_key)
tool_declarations, tool_callables = get_tools_for_role(checkpoint.user_role)
effective_confirm = (CONFIRM_REQUIRED - set(checkpoint.confirm_allow)) | set(checkpoint.confirm_deny)
# Rebuild from saved state — strip "[awaiting confirmation]" placeholders
contents = list(checkpoint.pre_fn_state)
tool_call_log = [t for t in checkpoint.tool_call_log if t["result"] != "[awaiting confirmation]"]
# Build function responses for this round
response_parts: list[types.Part] = []
for er in checkpoint.executed_results:
response_parts.append(types.Part(function_response=types.FunctionResponse(
name=er["name"], response={"result": er["result"]}
)))
for pt in checkpoint.pending_tools:
if confirmed:
result_str = await _execute_tool(pt["name"], pt["args"], tool_callables)
logger.info("Confirmed tool %s%d chars", pt["name"], len(result_str))
else:
result_str = "Action denied by user."
logger.info("Tool %s denied by user", pt["name"])
tool_call_log.append({"tool": pt["name"], "args": pt["args"], "result": result_str})
response_parts.append(types.Part(function_response=types.FunctionResponse(
name=pt["name"], response={"result": result_str}
)))
contents.append(types.Content(role="user", parts=response_parts))
gemini_summary, new_checkpoint = await _run_from_contents(
client=client,
contents=contents,
tool_declarations=tool_declarations,
tool_callables=tool_callables,
tool_call_log=tool_call_log,
effective_confirm=effective_confirm,
model_name=checkpoint.model_name,
task=checkpoint.task,
system_prompt=checkpoint.system_prompt,
session_messages=checkpoint.session_messages,
respond_with_claude=checkpoint.respond_with_claude,
response_role=checkpoint.response_role,
user_role=checkpoint.user_role,
confirm_allow=checkpoint.confirm_allow,
confirm_deny=checkpoint.confirm_deny,
starting_round=checkpoint.rounds_used,
gemini_api_key=api_key,
)
if new_checkpoint:
return OrchestratorResult(
response=gemini_summary,
tool_calls=list(tool_call_log),
backend="gemini",
gemini_summary=gemini_summary,
checkpoint=new_checkpoint,
)
return await _claude_handoff(
task=checkpoint.task,
tool_call_log=tool_call_log,
gemini_summary=gemini_summary,
system_prompt=checkpoint.system_prompt,
session_messages=checkpoint.session_messages,
respond_with_claude=checkpoint.respond_with_claude,
response_role=checkpoint.response_role,
)
async def _run_from_contents(
client,
contents: list,
tool_declarations: list,
tool_callables: dict,
tool_call_log: list[dict],
effective_confirm: set[str],
model_name: str | None,
task: str,
system_prompt: str,
session_messages: list[dict] | None,
respond_with_claude: bool,
response_role: str,
user_role: str,
confirm_allow: frozenset,
confirm_deny: frozenset,
starting_round: int = 0,
gemini_api_key: str | None = None,
) -> tuple[str, OrchestrateCheckpoint | None]:
"""
Run the ReAct loop from the current contents state.
Returns (gemini_summary, checkpoint) — checkpoint is set if confirmation is needed.
"""
gemini_summary = ""
# --- ReAct tool loop ---
for round_num in range(settings.orchestrator_max_rounds):
for round_num in range(starting_round, settings.orchestrator_max_rounds):
logger.info("Orchestrator round %d for task: %.80s", round_num + 1, task)
response = await asyncio.to_thread(
@@ -113,67 +282,56 @@ async def run(
candidate = response.candidates[0]
parts = candidate.content.parts if candidate.content else []
# Check if Gemini wants to call any tools
tool_call_parts = [
p for p in parts
if hasattr(p, "function_call") and p.function_call and p.function_call.name
]
if not tool_call_parts:
# No more tool calls — extract Gemini's text summary
gemini_summary = "".join(
p.text for p in parts if hasattr(p, "text") and p.text
).strip()
logger.info("Orchestrator done after %d round(s). Tools used: %d",
round_num + 1, len(tool_call_log))
break
return gemini_summary, None
# Add Gemini's response (with function calls) to the conversation
contents.append(candidate.content)
# Execute tool calls — check confirmation requirement before calling
# Snapshot state before function responses — used if a checkpoint is needed
pre_fn_state = list(contents)
response_parts: list[types.Part] = []
confirm_requested = False
pending_tools: list[dict] = []
executed_results: list[dict] = []
for fc_part in tool_call_parts:
fc = fc_part.function_call
name = fc.name
args = dict(fc.args)
if name in CONFIRM_REQUIRED:
args_str = json.dumps(args, indent=2) if args else "(no arguments)"
result_str = (
f"⚠️ CONFIRMATION REQUIRED ⚠️\n"
f"Tool: {name}\nArguments:\n{args_str}\n\n"
f"Do NOT call this tool again. Tell the user exactly what you were "
f"about to do, explain the potential impact, and ask them to confirm "
f"by sending a follow-up message before you proceed."
)
confirm_requested = True
if name in effective_confirm:
pending_tools.append({"name": name, "args": args})
logger.info("Tool %s blocked — confirmation required", name)
else:
result_str = await _execute_tool(name, args, tool_callables)
logger.info("Tool %s%d chars", name, len(result_str))
executed_results.append({"name": name, "args": args, "result": result_str})
tool_call_log.append({"tool": name, "args": args, "result": result_str})
response_parts.append(types.Part(function_response=types.FunctionResponse(
name=name, response={"result": result_str}
)))
tool_call_log.append({
"tool": name,
"args": args,
"result": "[awaiting confirmation]" if name in CONFIRM_REQUIRED else result_str,
})
response_parts.append(
types.Part(
function_response=types.FunctionResponse(
name=name,
response={"result": result_str},
)
)
)
if pending_tools:
# Add placeholder responses and get Gemini to produce the confirmation message
for pt in pending_tools:
placeholder = f"[AWAITING USER CONFIRMATION for {pt['name']}]"
response_parts.append(types.Part(function_response=types.FunctionResponse(
name=pt["name"], response={"result": placeholder}
)))
tool_call_log.append({"tool": pt["name"], "args": pt["args"], "result": "[awaiting confirmation]"})
contents.append(types.Content(role="user", parts=response_parts))
contents.append(types.Content(role="user", parts=response_parts))
if confirm_requested:
# Allow one more Gemini round to produce the confirmation-request message,
# then break — tool is not executed until user confirms in a follow-up.
conf_response = await asyncio.to_thread(
client.models.generate_content,
model=model_name or settings.orchestrator_model,
@@ -191,10 +349,30 @@ async def run(
gemini_summary = "".join(
p.text for p in conf_parts if hasattr(p, "text") and p.text
).strip() or "This action requires your explicit confirmation before it can proceed."
break
checkpoint = OrchestrateCheckpoint(
engine="gemini",
pre_fn_state=pre_fn_state,
executed_results=executed_results,
pending_tools=pending_tools,
tool_call_log=list(tool_call_log),
task=task,
system_prompt=system_prompt,
session_messages=session_messages,
model_name=model_name,
gemini_api_key=gemini_api_key,
respond_with_claude=respond_with_claude,
response_role=response_role,
user_role=user_role,
confirm_allow=confirm_allow,
confirm_deny=confirm_deny,
rounds_used=round_num + 2,
)
return gemini_summary, checkpoint
contents.append(types.Content(role="user", parts=response_parts))
else:
# Hit the round limit — use whatever Gemini produced last
logger.warning("Orchestrator hit max rounds (%d)", settings.orchestrator_max_rounds)
gemini_summary = (
f"Reached the tool iteration limit ({settings.orchestrator_max_rounds} rounds). "
@@ -202,21 +380,28 @@ async def run(
+ "\n\n".join(f"**{t['tool']}**: {t['result'][:500]}" for t in tool_call_log)
)
# --- Claude handoff ---
return gemini_summary, None
async def _claude_handoff(
task: str,
tool_call_log: list[dict],
gemini_summary: str,
system_prompt: str,
session_messages: list[dict] | None,
respond_with_claude: bool,
response_role: str,
) -> OrchestratorResult:
if respond_with_claude:
claude_prompt = _build_claude_prompt(task, tool_call_log, gemini_summary)
# Merge with session history so Claude has conversation context
messages = list(session_messages or [])
messages.append({"role": "user", "content": claude_prompt})
response_text, backend = await complete(
system_prompt=system_prompt,
messages=messages,
role=response_role,
)
else:
# Cron/background tasks: return Gemini's summary directly, no Claude call
response_text = gemini_summary or "No information gathered."
backend = "gemini"
@@ -242,12 +427,11 @@ def _build_task_prompt(task: str, session_messages: list[dict] | None) -> str:
if not session_messages:
return task
# Include last few turns for context (don't send the full history to keep tokens low)
recent = session_messages[-6:] # last 3 turns
recent = session_messages[-6:]
history_lines = []
for msg in recent:
label = "User" if msg["role"] == "user" else "Assistant"
history_lines.append(f"{label}: {msg['content'][:300]}") # truncate long messages
history_lines.append(f"{label}: {msg['content'][:300]}")
context = "\n".join(history_lines)
return f"<recent_conversation>\n{context}\n</recent_conversation>\n\nCurrent request: {task}"
@@ -265,7 +449,6 @@ def _build_claude_prompt(
parts.append("## Research gathered\n")
for tc in tool_calls:
parts.append(f"### {tc['tool']}({_format_args(tc['args'])})")
# Truncate very long results — Claude gets the gist
result = tc["result"]
if len(result) > 2000:
result = result[:2000] + "\n… [truncated]"

View File

@@ -15,10 +15,10 @@ import logging
import uuid
from datetime import datetime, timezone
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from auth_utils import get_user_gemini_key, get_user_role
from auth_utils import get_user_gemini_key, get_user_role, get_tool_policy
from config import settings
from context_loader import load_context
from persona import set_context, validate as validate_persona
@@ -31,12 +31,16 @@ router = APIRouter(prefix="/orchestrate", tags=["orchestrator"])
# ---------------------------------------------------------------------------
# In-memory job store
# Jobs are keyed by UUID. For this phase, memory is fine — jobs are short-lived.
# ---------------------------------------------------------------------------
_jobs: dict[str, dict] = {}
_jobs_lock = asyncio.Lock()
# Checkpoints are stored separately — they hold Python objects (types.Content, etc.)
# that can't be included in the JSON-serializable job dict.
_checkpoints: dict[str, orchestrator_engine.OrchestrateCheckpoint] = {}
_checkpoints_lock = asyncio.Lock()
# ---------------------------------------------------------------------------
# Request / response models
@@ -57,7 +61,7 @@ class OrchestrateRequest(BaseModel):
class OrchestrateResponse(BaseModel):
job_id: str
status: str # "queued" | "running" | "complete" | "error"
status: str # "queued" | "running" | "complete" | "error" | "awaiting_confirmation"
class JobStatusResponse(BaseModel):
@@ -72,6 +76,7 @@ class JobStatusResponse(BaseModel):
backend: str | None = None
gemini_summary: str | None = None
error: str | None = None
pending_confirmation: dict | None = None # {tools: [{name, args}], message: str}
# ---------------------------------------------------------------------------
@@ -85,7 +90,6 @@ async def orchestrate(req: OrchestrateRequest) -> OrchestrateResponse:
user, persona = validate_persona(req.user, req.persona)
set_context(user, persona)
except ValueError as e:
from fastapi import HTTPException
raise HTTPException(status_code=400, detail=str(e))
job_id = str(uuid.uuid4())
@@ -97,17 +101,19 @@ async def orchestrate(req: OrchestrateRequest) -> OrchestrateResponse:
"task": req.task,
"created_at": now,
"completed_at": None,
"session_id": None,
"response": None,
"tool_calls": None,
"backend": None,
"gemini_summary": None,
"error": None,
"pending_confirmation": None,
"_user": user,
}
async with _jobs_lock:
_jobs[job_id] = job
# Run in background — caller polls GET /orchestrate/{job_id}
asyncio.create_task(_run_job(job_id, req, user))
logger.info("Orchestrator job queued: %s%.80s", job_id, req.task)
return OrchestrateResponse(job_id=job_id, status="queued")
@@ -120,10 +126,9 @@ async def job_status(job_id: str) -> JobStatusResponse:
job = _jobs.get(job_id)
if job is None:
from fastapi import HTTPException
raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
return JobStatusResponse(**job)
return JobStatusResponse(**{k: v for k, v in job.items() if not k.startswith("_")})
@router.get("", response_model=list[JobStatusResponse])
@@ -131,11 +136,55 @@ async def list_jobs() -> list[JobStatusResponse]:
"""List all jobs (most recent first). Useful for debugging."""
async with _jobs_lock:
jobs = sorted(_jobs.values(), key=lambda j: j["created_at"], reverse=True)
return [JobStatusResponse(**j) for j in jobs]
return [JobStatusResponse(**{k: v for k, v in j.items() if not k.startswith("_")}) for j in jobs]
@router.post("/{job_id}/confirm", response_model=OrchestrateResponse)
async def confirm_job(job_id: str) -> OrchestrateResponse:
"""Confirm a pending tool call — the blocked tool will execute and the job continues."""
async with _checkpoints_lock:
checkpoint = _checkpoints.pop(job_id, None)
if checkpoint is None:
raise HTTPException(status_code=404, detail="No pending confirmation for this job")
async with _jobs_lock:
job = _jobs.get(job_id)
if not job or job["status"] != "awaiting_confirmation":
raise HTTPException(status_code=409, detail="Job is not awaiting confirmation")
_jobs[job_id]["status"] = "running"
_jobs[job_id]["pending_confirmation"] = None
user = job.get("_user", "scott")
asyncio.create_task(_resume_job(job_id, checkpoint, confirmed=True, user=user))
logger.info("Orchestrator job %s confirmed — resuming", job_id)
return OrchestrateResponse(job_id=job_id, status="running")
@router.post("/{job_id}/deny", response_model=OrchestrateResponse)
async def deny_job(job_id: str) -> OrchestrateResponse:
"""Deny a pending tool call — the tool is skipped and the job produces a final response."""
async with _checkpoints_lock:
checkpoint = _checkpoints.pop(job_id, None)
if checkpoint is None:
raise HTTPException(status_code=404, detail="No pending confirmation for this job")
async with _jobs_lock:
job = _jobs.get(job_id)
if not job or job["status"] != "awaiting_confirmation":
raise HTTPException(status_code=409, detail="Job is not awaiting confirmation")
_jobs[job_id]["status"] = "running"
_jobs[job_id]["pending_confirmation"] = None
user = job.get("_user", "scott")
asyncio.create_task(_resume_job(job_id, checkpoint, confirmed=False, user=user))
logger.info("Orchestrator job %s denied — resuming with skip", job_id)
return OrchestrateResponse(job_id=job_id, status="running")
# ---------------------------------------------------------------------------
# Background runner
# Background runners
# ---------------------------------------------------------------------------
async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None:
@@ -146,7 +195,6 @@ async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None:
try:
from session_store import load as load_session, save as save_session, generate_session_id
# Load Inara's system prompt (same as the chat router does)
tier = req.tier or settings.default_tier
system_prompt = load_context(
tier,
@@ -155,16 +203,17 @@ async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None:
include_short=req.include_short,
)
# Load session history if a session_id was provided
session_id = req.session_id or generate_session_id()
history = load_session(session_id)
session_messages = history or None
# Choose engine based on the orchestrator role in the model registry
orch_model = model_registry.get_model_for_role(user, "orchestrator")
user_role = get_user_role(user)
policy = get_tool_policy(user)
confirm_allow = set(policy.get("allow", []))
confirm_deny = set(policy.get("deny", []))
if orch_model and orch_model.get("type") == "local_openai":
result = await openai_orchestrator.run(
task=req.task,
@@ -173,10 +222,10 @@ async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None:
model_cfg=orch_model,
respond_with_final=req.respond_with_claude,
user_role=user_role,
confirm_allow=confirm_allow,
confirm_deny=confirm_deny,
)
else:
# Use the API key embedded in the resolved model config (V2 registry with
# account_id), then fall back to the per-user key from auth.json, then .env.
gemini_key = (
(orch_model.get("api_key") if orch_model else None)
or get_user_gemini_key(user)
@@ -190,28 +239,31 @@ async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None:
model_name=orch_model.get("model_name") if orch_model else None,
response_role=req.chat_role,
user_role=user_role,
confirm_allow=confirm_allow,
confirm_deny=confirm_deny,
)
# Save the turn to the session store so it survives a page refresh
history.append({"role": "user", "content": req.task})
history.append({"role": "assistant", "content": result.response})
save_session(session_id, history)
if result.checkpoint:
async with _checkpoints_lock:
_checkpoints[job_id] = result.checkpoint
async with _jobs_lock:
_jobs[job_id].update({
"status": "awaiting_confirmation",
"response": result.response,
"tool_calls": result.tool_calls,
"backend": result.backend,
"gemini_summary": result.gemini_summary,
"session_id": session_id,
"pending_confirmation": {
"tools": result.checkpoint.pending_tools,
"message": result.response,
},
})
logger.info("Orchestrator job %s awaiting confirmation — %d tool(s) blocked",
job_id, len(result.checkpoint.pending_tools))
return
from session_logger import log_turn
log_turn(session_id, req.task, result.response)
now = datetime.now(timezone.utc).isoformat()
async with _jobs_lock:
_jobs[job_id].update({
"status": "complete",
"completed_at": now,
"session_id": session_id,
"response": result.response,
"tool_calls": result.tool_calls,
"backend": result.backend,
"gemini_summary": result.gemini_summary,
})
logger.info("Orchestrator job complete: %s (%d tool calls)", job_id, len(result.tool_calls))
await _finalize_job(job_id, result, session_id, req.task, history)
except Exception as e:
logger.exception("Orchestrator job failed: %s", job_id)
@@ -222,3 +274,87 @@ async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None:
"completed_at": now,
"error": str(e),
})
async def _resume_job(
job_id: str,
checkpoint: orchestrator_engine.OrchestrateCheckpoint,
confirmed: bool,
user: str,
) -> None:
"""Resume a job after the user confirms or denies a pending tool call."""
try:
if checkpoint.engine == "gemini":
result = await orchestrator_engine.resume(checkpoint, confirmed)
else:
result = await openai_orchestrator.resume(checkpoint, confirmed)
if result.checkpoint:
# Another confirmation needed (chained gates)
async with _checkpoints_lock:
_checkpoints[job_id] = result.checkpoint
async with _jobs_lock:
_jobs[job_id].update({
"status": "awaiting_confirmation",
"response": result.response,
"tool_calls": result.tool_calls,
"backend": result.backend,
"gemini_summary": result.gemini_summary,
"pending_confirmation": {
"tools": result.checkpoint.pending_tools,
"message": result.response,
},
})
logger.info("Orchestrator job %s awaiting another confirmation", job_id)
return
async with _jobs_lock:
session_id = _jobs[job_id].get("session_id") or ""
task = _jobs[job_id].get("task", "")
from session_store import load as load_session
history = load_session(session_id) if session_id else []
await _finalize_job(job_id, result, session_id, task, history)
except Exception as e:
logger.exception("Orchestrator resume failed: %s", job_id)
now = datetime.now(timezone.utc).isoformat()
async with _jobs_lock:
_jobs[job_id].update({
"status": "error",
"completed_at": now,
"error": str(e),
})
async def _finalize_job(
job_id: str,
result: orchestrator_engine.OrchestratorResult,
session_id: str,
task: str,
history: list,
) -> None:
"""Save session, log the turn, and mark the job complete."""
from session_store import save as save_session, generate_session_id
from session_logger import log_turn
if not session_id:
session_id = generate_session_id()
history.append({"role": "user", "content": task})
history.append({"role": "assistant", "content": result.response})
save_session(session_id, history)
log_turn(session_id, task, result.response)
now = datetime.now(timezone.utc).isoformat()
async with _jobs_lock:
_jobs[job_id].update({
"status": "complete",
"completed_at": now,
"session_id": session_id,
"response": result.response,
"tool_calls": result.tool_calls,
"backend": result.backend,
"gemini_summary": result.gemini_summary,
})
logger.info("Orchestrator job complete: %s (%d tool calls)", job_id, len(result.tool_calls))

View File

@@ -18,7 +18,8 @@ import jwt
from fastapi import APIRouter, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from auth_utils import COOKIE_NAME, decode_token, check_credentials, set_password, _read_auth, _write_auth
from auth_utils import COOKIE_NAME, decode_token, check_credentials, set_password, _read_auth, _write_auth, get_user_channels, get_tool_policy, save_tool_policy
from tools import CONFIRM_REQUIRED
from persona import list_user_personas
from config import settings as app_settings
@@ -73,6 +74,26 @@ def _settings_page(username: str, personas: list[str], back_persona: str = "", s
allowlist_text = ""
html = html.replace("{{ email_allowlist }}", allowlist_text)
# Notification channel settings
channels = get_user_channels(username)
notify_ch = _html.escape(channels.get("notification_channel", "") or "")
notify_email = _html.escape(channels.get("notification_email", "") or "")
nc_room = _html.escape((channels.get("nextcloud") or {}).get("notification_room", "") or "")
gc_webhook = _html.escape((channels.get("google_chat") or {}).get("outbound_webhook", "") or "")
html = html.replace("{{ notify_channel }}", notify_ch)
html = html.replace("{{ notify_email_override }}", notify_email)
html = html.replace("{{ nc_notify_room }}", nc_room)
html = html.replace("{{ gc_webhook }}", gc_webhook)
# Tool permission policy
policy = get_tool_policy(username)
tool_allow_text = _html.escape("\n".join(policy.get("allow", [])))
tool_deny_text = _html.escape("\n".join(policy.get("deny", [])))
confirm_tools_list = _html.escape(", ".join(sorted(CONFIRM_REQUIRED)))
html = html.replace("{{ tool_allow }}", tool_allow_text)
html = html.replace("{{ tool_deny }}", tool_deny_text)
html = html.replace("{{ confirm_required_tools }}", confirm_tools_list)
persona_items = "\n".join(
f'''<li>
<a href="/{username}/{p}" class="persona-link">{p}</a>
@@ -240,6 +261,78 @@ async def rename_persona(
return RedirectResponse("/settings", status_code=302)
@router.post("/settings/notifications", include_in_schema=False)
async def save_notifications(
request: Request,
notification_channel: str = Form(""),
notification_email: str = Form(""),
nc_notification_room: str = Form(""),
gc_outbound_webhook: str = Form(""),
):
username = _get_session_user(request)
if not username:
return RedirectResponse("/login", status_code=302)
personas = list_user_personas(username)
back_persona = _preferred_persona(request, username)
channels_path = app_settings.home_root() / username / "channels.json"
try:
channels = json.loads(channels_path.read_text())
except Exception:
channels = {}
# Top-level notification preference
notification_channel = notification_channel.strip()
if notification_channel in ("email", "nextcloud", "google_chat"):
channels["notification_channel"] = notification_channel
else:
channels.pop("notification_channel", None)
# Optional email address override (blank = use login email)
notification_email = notification_email.strip()
if notification_email:
channels["notification_email"] = notification_email
else:
channels.pop("notification_email", None)
# NC Talk notification room — nested under "nextcloud"
if "nextcloud" not in channels:
channels["nextcloud"] = {}
channels["nextcloud"]["notification_room"] = nc_notification_room.strip()
# Google Chat outbound webhook — nested under "google_chat"
if "google_chat" not in channels:
channels["google_chat"] = {}
channels["google_chat"]["outbound_webhook"] = gc_outbound_webhook.strip()
channels_path.write_text(json.dumps(channels, indent=2) + "\n")
logger.info("notifications updated for %s (channel=%s)", username, notification_channel or "none")
return HTMLResponse(_settings_page(username, personas, back_persona,
success="Notification settings saved."))
@router.post("/settings/tool-policy", include_in_schema=False)
async def save_tool_policy_route(
request: Request,
allow_list: str = Form(""),
deny_list: str = Form(""),
):
username = _get_session_user(request)
if not username:
return RedirectResponse("/login", status_code=302)
personas = list_user_personas(username)
back_persona = _preferred_persona(request, username)
allow_tools = [ln.strip() for ln in allow_list.splitlines() if ln.strip()]
deny_tools = [ln.strip() for ln in deny_list.splitlines() if ln.strip()]
save_tool_policy(username, {"allow": allow_tools, "deny": deny_tools})
logger.info("tool policy updated for %s (allow=%d deny=%d)", username, len(allow_tools), len(deny_tools))
return HTMLResponse(_settings_page(username, personas, back_persona,
success="Tool permission policy saved."))
@router.post("/settings/email-allowlist", include_in_schema=False)
async def save_email_allowlist(
request: Request,

View File

@@ -6,7 +6,7 @@
and are appended automatically by help.html when present.
-->
*Last updated: 2026-04-29*
*Last updated: 2026-04-30*
---
@@ -66,13 +66,13 @@ The ⚡ toggle is **independent of the Role selector** — you can use any role
| Web | `web_search`, `http_fetch` |
| Files | `file_read` ¹, `file_list` ¹, `file_write` ¹ ² |
| Shell | `shell_exec` ¹ ², `claude_allow_dir` ¹ |
| System | `cortex_restart` ¹ ², `cortex_logs` ¹ |
| System | `cortex_restart` ¹ ², `cortex_logs` ¹, `cortex_status` ¹, `cortex_update` ¹ ² |
| Tasks | `task_list`, `task_create`, `task_update`, `task_complete` |
| Cron | `cron_list`, `cron_add`, `cron_remove` ², `cron_toggle` |
| Reminders | `reminders_add`, `reminders_list`, `reminders_clear` ² |
| Reminders | `reminders_add`, `reminders_list`, `reminders_remove`, `reminders_clear` ² |
| Scratchpad | `scratch_read`, `scratch_write`, `scratch_append`, `scratch_clear` |
| Notifications | `nc_talk_send` ¹ |
| Aether Journals | `ae_journal_list`, `ae_journal_search`, `ae_journal_entry_create`, `ae_journal_entry_update`, `ae_journal_entry_disable`, `ae_journal_entry_append`, `ae_journal_entry_prepend` |
| Notifications | `email_send` ¹, `nc_talk_send` ¹ |
| Aether Journals | `ae_journal_list`, `ae_journal_search`, `ae_journal_entries_list`, `ae_journal_entry_read`, `ae_journal_entry_create`, `ae_journal_entry_update`, `ae_journal_entry_disable`, `ae_journal_entry_append`, `ae_journal_entry_prepend` |
| Aether Tasks | `ae_task_list` ¹ |
¹ **Admin only** — requires the `admin` role. These tools are invisible to regular users.
@@ -234,8 +234,8 @@ The **Files** button opens an editor for your persona's identity and memory file
| `MEMORY_LONG.md` | Permanent curated long-term memory |
| `MEMORY_MID.md` | Rolling mid-term digest (LLM-distilled) |
| `MEMORY_SHORT.md` | Recent session rollup (auto-aggregated) |
| `TASKS.json` | Personal task list (managed via Agent mode) |
| `HELP.md` | This file |
| `HELP.md` | This file — persona-specific additions appended below |
| `email_allowlist.json` | Regex patterns for permitted `email_send` recipients (one per line) |
Toggle **preview** / **edit** to switch between rendered markdown and raw text. **Ctrl+S** saves, **Esc** closes.

View File

@@ -1192,6 +1192,37 @@
: '⚡ working…';
continue;
}
if (job.status === 'awaiting_confirmation') {
const pc = job.pending_confirmation || {};
const toolNames = (pc.tools || []).map(t => t.name).join(', ');
thinkingDiv.className = 'message assistant';
thinkingDiv.innerHTML = `<div class="confirm-gate">
<p>${escapeHtml(pc.message || 'Confirm this action?')}</p>
<p class="confirm-tools">Tool${(pc.tools||[]).length !== 1 ? 's' : ''}: <code>${escapeHtml(toolNames)}</code></p>
<div class="confirm-actions">
<button class="confirm-btn">Confirm</button>
<button class="deny-btn">Deny</button>
</div>
</div>`;
const confirmed = await new Promise(resolve => {
thinkingDiv.querySelector('.confirm-btn').onclick = () => resolve(true);
thinkingDiv.querySelector('.deny-btn').onclick = () => resolve(false);
});
thinkingDiv.className = 'message assistant thinking';
thinkingDiv.textContent = confirmed ? '⚡ confirmed — continuing…' : '⚡ denied — finishing…';
const action = confirmed ? 'confirm' : 'deny';
const resumeRes = await fetch(`/orchestrate/${job_id}/${action}`, {
method: 'POST',
signal: activeController.signal,
});
if (!resumeRes.ok) throw new Error(`Resume failed: HTTP ${resumeRes.status}`);
continue;
}
break;
}

View File

@@ -344,6 +344,84 @@
</form>
</div>
<!-- Notifications -->
<div class="section">
<h2>Notifications</h2>
<p style="font-size:0.8rem; color:var(--pg-muted); margin-bottom:0.85rem; line-height:1.55;">
Choose how Inara reaches out proactively — cron jobs, briefs, and future alerts.
Email defaults to your login address when no override is set.
</p>
<form method="POST" action="/settings/notifications">
<div class="field">
<label for="notification_channel">Notification channel</label>
<select id="notification_channel" name="notification_channel"
data-value="{{ notify_channel }}"
style="width:100%; padding:0.65rem 0.85rem; background:var(--pg-bg);
border:1px solid var(--pg-border); border-radius:6px;
color:var(--pg-text); font-size:0.95rem; outline:none;
transition:border-color 0.15s;">
<option value="">None (disabled)</option>
<option value="email">Email</option>
<option value="nextcloud">Nextcloud Talk</option>
<option value="google_chat">Google Chat</option>
</select>
</div>
<div class="field">
<label for="notification_email">Email override
<span style="color:var(--pg-dim); font-weight:400;">(optional)</span>
</label>
<input type="email" id="notification_email" name="notification_email"
value="{{ notify_email_override }}"
placeholder="Leave blank to use login email"
autocomplete="off">
</div>
<div class="field">
<label for="nc_notification_room">Nextcloud Talk room token</label>
<input type="text" id="nc_notification_room" name="nc_notification_room"
value="{{ nc_notify_room }}"
placeholder="Token from the Talk room URL"
autocomplete="off" spellcheck="false">
</div>
<div class="field">
<label for="gc_outbound_webhook">Google Chat webhook URL</label>
<input type="url" id="gc_outbound_webhook" name="gc_outbound_webhook"
value="{{ gc_webhook }}"
placeholder="https://chat.googleapis.com/v1/spaces/…"
autocomplete="off" spellcheck="false">
</div>
<button type="submit">Save notification settings</button>
</form>
</div>
<!-- Tool Permissions -->
<div class="section">
<h2>Tool Permissions</h2>
<p style="font-size:0.8rem; color:var(--pg-muted); margin-bottom:0.5rem; line-height:1.55;">
Override the default confirmation gate for orchestrator tools.
<strong>Allow list</strong> — tools that run without asking for confirmation.
<strong>Deny list</strong> — tools that are always blocked for your account.
One tool name per line.
</p>
<p style="font-size:0.78rem; color:var(--pg-muted); margin-bottom:0.85rem;">
Tools requiring confirmation by default: <code>{{ confirm_required_tools }}</code>
</p>
<form method="POST" action="/settings/tool-policy">
<div class="form-group">
<label for="allow_list">Allow list (bypass confirmation)</label>
<textarea id="allow_list" name="allow_list" rows="3"
placeholder="reminders_clear&#10;cron_remove"
autocomplete="off" spellcheck="false">{{ tool_allow }}</textarea>
</div>
<div class="form-group">
<label for="deny_list">Deny list (always block)</label>
<textarea id="deny_list" name="deny_list" rows="3"
placeholder="shell_exec&#10;file_write"
autocomplete="off" spellcheck="false">{{ tool_deny }}</textarea>
</div>
<button type="submit">Save tool permissions</button>
</form>
</div>
<!-- Browser cache -->
<div class="section">
<h2>Browser Cache</h2>
@@ -411,6 +489,12 @@
</div>
<script>
// Restore notification channel dropdown from injected value
(function() {
const sel = document.getElementById('notification_channel');
if (sel) sel.value = sel.dataset.value || '';
})();
// Password confirmation check
document.getElementById('password-form').addEventListener('submit', e => {
const np = document.getElementById('new_password').value;

View File

@@ -546,6 +546,25 @@
.message.thinking { color: var(--muted); font-style: italic; }
/* Confirmation gate */
.confirm-gate { display: flex; flex-direction: column; gap: 0.6rem; }
.confirm-gate p { margin: 0; }
.confirm-tools { font-size: 0.82rem; color: var(--muted); }
.confirm-actions { display: flex; gap: 0.5rem; margin-top: 0.25rem; }
.confirm-btn, .deny-btn {
padding: 0.35rem 0.9rem;
border-radius: 6px;
border: none;
font-size: 0.85rem;
font-weight: 600;
cursor: pointer;
transition: opacity 0.15s;
}
.confirm-btn { background: #16a34a; color: #fff; }
.confirm-btn:hover { opacity: 0.85; }
.deny-btn { background: var(--surface); border: 1px solid var(--border); color: var(--text); }
.deny-btn:hover { border-color: var(--muted); }
/* Copy button */
.message.assistant, .message.user { position: relative; }

View File

@@ -18,6 +18,8 @@ from google.genai import types
from tools.web import search as _web_search
from tools.ae_knowledge import journal_search as _ae_journal_search
from tools.ae_knowledge import journal_list as _ae_journal_list
from tools.ae_knowledge import journal_entry_read as _ae_journal_entry_read
from tools.ae_knowledge import journal_entries_list as _ae_journal_entries_list
from tools.ae_knowledge import journal_entry_create as _ae_journal_entry_create
from tools.ae_knowledge import journal_entry_update as _ae_journal_entry_update
from tools.ae_knowledge import journal_entry_disable as _ae_journal_entry_disable
@@ -37,6 +39,7 @@ from tools.cron import (
from tools.reminders import (
reminders_add as _reminders_add,
reminders_list as _reminders_list,
reminders_remove as _reminders_remove,
reminders_clear as _reminders_clear,
)
from tools.scratch import (
@@ -45,7 +48,12 @@ from tools.scratch import (
scratch_append as _scratch_append,
scratch_clear as _scratch_clear,
)
from tools.system import cortex_restart as _cortex_restart, cortex_logs as _cortex_logs
from tools.system import (
cortex_restart as _cortex_restart,
cortex_logs as _cortex_logs,
cortex_status as _cortex_status,
cortex_update as _cortex_update,
)
from tools.web import http_fetch as _http_fetch
from tools.files import file_list as _file_list, file_write as _file_write
from tools.notify import nc_talk_send as _nc_talk_send, email_send as _email_send
@@ -91,8 +99,9 @@ _ae_journal_list_declaration = types.FunctionDeclaration(
_ae_journal_search_declaration = types.FunctionDeclaration(
name="ae_journal_search",
description=(
"Search the Aether Journals knowledge base by keyword. "
"Use this to look up notes, documentation, meeting summaries, or any saved knowledge. "
"Search Aether Journal entries. All parameters are optional — combine freely. "
"Use 'query' for fulltext keyword search (supports boolean: +required -excluded \"phrase\"). "
"Use 'tags' to filter by tag substring. Use 'date_from'/'date_to' for date ranges (YYYY-MM-DD). "
"Always search before creating a new entry to avoid duplicates."
),
parameters=types.Schema(
@@ -100,21 +109,109 @@ _ae_journal_search_declaration = types.FunctionDeclaration(
properties={
"query": types.Schema(
type=types.Type.STRING,
description="Keyword or phrase to search for",
description="Fulltext keyword search. Supports boolean mode: +required -excluded \"exact phrase\".",
),
"journal_id": types.Schema(
type=types.Type.STRING,
description=(
"Optional: scope search to a specific journal by its id_random. "
"Omit to search all journals."
),
description="Scope results to a specific journal by its id_random. Omit to search all journals.",
),
"tags": types.Schema(
type=types.Type.STRING,
description="Filter by tag substring (e.g. 'networking' matches entries tagged 'networking' or 'home-networking').",
),
"type_code": types.Schema(
type=types.Type.STRING,
description="Filter by exact type_code (e.g. 'note', 'meeting', 'log').",
),
"topic_code": types.Schema(
type=types.Type.STRING,
description="Filter by exact topic_code.",
),
"date_from": types.Schema(
type=types.Type.STRING,
description="Return entries created on or after this date (YYYY-MM-DD).",
),
"date_to": types.Schema(
type=types.Type.STRING,
description="Return entries created on or before this date (YYYY-MM-DD).",
),
"sort_by": types.Schema(
type=types.Type.STRING,
description="Sort field: 'updated' (default), 'created', 'name', or 'priority'.",
),
"sort_order": types.Schema(
type=types.Type.STRING,
description="Sort direction: 'desc' (default, newest first) or 'asc'.",
),
"status": types.Schema(
type=types.Type.INTEGER,
description="Filter by exact status code.",
),
"priority": types.Schema(
type=types.Type.INTEGER,
description="Filter by exact priority (1=low, 5=high).",
),
"max_results": types.Schema(
type=types.Type.INTEGER,
description="Maximum number of entries to return (default 10)",
description="Number of results per page (default 10).",
),
"page": types.Schema(
type=types.Type.INTEGER,
description="Page number for pagination (default 1).",
),
},
required=["query"],
required=[],
),
)
_ae_journal_entry_read_declaration = types.FunctionDeclaration(
name="ae_journal_entry_read",
description=(
"Fetch the full content of a single journal entry by its id_random. "
"Use this when you need to read an entry before editing it, or when search results "
"don't show enough content. Returns title, journal, tags, summary, and full content."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"entry_id": types.Schema(
type=types.Type.STRING,
description="The id_random of the journal entry to read.",
),
"max_content_chars": types.Schema(
type=types.Type.INTEGER,
description="Maximum characters of content to return (default 4000). Increase for long entries.",
),
},
required=["entry_id"],
),
)
_ae_journal_entries_list_declaration = types.FunctionDeclaration(
name="ae_journal_entries_list",
description=(
"List entries in a specific journal, newest first. "
"Use this to browse what's in a journal when you don't have a search keyword, "
"or to find entries by browsing rather than searching. "
"Returns numbered entries with id, title, tags, summary, and date."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"journal_id": types.Schema(
type=types.Type.STRING,
description="The id_random of the journal to list entries from.",
),
"max_results": types.Schema(
type=types.Type.INTEGER,
description="Number of entries to return (default 20, max 50).",
),
"page": types.Schema(
type=types.Type.INTEGER,
description="Page number for pagination (default 1).",
),
},
required=["journal_id"],
),
)
@@ -282,6 +379,8 @@ _CALLABLES: dict[str, callable] = {
"web_search": _web_search,
"ae_journal_list": _ae_journal_list,
"ae_journal_search": _ae_journal_search,
"ae_journal_entry_read": _ae_journal_entry_read,
"ae_journal_entries_list": _ae_journal_entries_list,
"ae_journal_entry_create": _ae_journal_entry_create,
"ae_journal_entry_update": _ae_journal_entry_update,
"ae_journal_entry_disable": _ae_journal_entry_disable,
@@ -295,6 +394,8 @@ _CALLABLES: dict[str, callable] = {
"shell_exec": _shell_exec,
"cortex_restart": _cortex_restart,
"cortex_logs": _cortex_logs,
"cortex_status": _cortex_status,
"cortex_update": _cortex_update,
"http_fetch": _http_fetch,
"email_send": _email_send,
"nc_talk_send": _nc_talk_send,
@@ -308,6 +409,7 @@ _CALLABLES: dict[str, callable] = {
"cron_toggle": _cron_toggle,
"reminders_add": _reminders_add,
"reminders_list": _reminders_list,
"reminders_remove": _reminders_remove,
"reminders_clear": _reminders_clear,
"scratch_read": _scratch_read,
"scratch_write": _scratch_write,
@@ -584,6 +686,24 @@ _reminders_list_declaration = types.FunctionDeclaration(
parameters=types.Schema(type=types.Type.OBJECT, properties={}),
)
_reminders_remove_declaration = types.FunctionDeclaration(
name="reminders_remove",
description=(
"Remove a single reminder by its number. "
"Call reminders_list first to get the numbered list, then pass the number of the reminder to remove."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"index": types.Schema(
type=types.Type.INTEGER,
description="The number of the reminder to remove (1 = first item in reminders_list output).",
),
},
required=["index"],
),
)
_reminders_clear_declaration = types.FunctionDeclaration(
name="reminders_clear",
description=(
@@ -680,6 +800,28 @@ _cortex_logs_declaration = types.FunctionDeclaration(
),
)
_cortex_status_declaration = types.FunctionDeclaration(
name="cortex_status",
description=(
"Return Cortex service status: current git branch and commit, how many commits "
"ahead/behind the remote, and the systemctl service state. "
"Use to check what version is running or whether the service is healthy. "
"ADMIN ONLY."
),
parameters=types.Schema(type=types.Type.OBJECT, properties={}),
)
_cortex_update_declaration = types.FunctionDeclaration(
name="cortex_update",
description=(
"Pull the latest code from git, run a syntax check on all Python files, and report "
"what changed. Does NOT restart automatically — call cortex_restart separately after "
"reviewing the output. Will report syntax errors if the pull introduces broken code. "
"ADMIN ONLY. Requires confirmation."
),
parameters=types.Schema(type=types.Type.OBJECT, properties={}),
)
_http_fetch_declaration = types.FunctionDeclaration(
name="http_fetch",
description=(
@@ -806,6 +948,8 @@ TOOL_ROLES: dict[str, str] = {
"claude_allow_dir": "admin",
"cortex_restart": "admin",
"cortex_logs": "admin",
"cortex_status": "admin",
"cortex_update": "admin",
"file_read": "admin",
"file_list": "admin",
"file_write": "admin",
@@ -819,6 +963,7 @@ TOOL_ROLES: dict[str, str] = {
# the tool, prompting Claude to ask the user to confirm in a follow-up message.
CONFIRM_REQUIRED: set[str] = {
"cortex_restart",
"cortex_update",
"file_write",
"shell_exec",
"cron_remove",
@@ -838,6 +983,8 @@ _ALL_DECLARATIONS: list[types.FunctionDeclaration] = [
_web_search_declaration,
_ae_journal_list_declaration,
_ae_journal_search_declaration,
_ae_journal_entry_read_declaration,
_ae_journal_entries_list_declaration,
_ae_journal_entry_create_declaration,
_ae_journal_entry_update_declaration,
_ae_journal_entry_disable_declaration,
@@ -851,6 +998,8 @@ _ALL_DECLARATIONS: list[types.FunctionDeclaration] = [
_shell_exec_declaration,
_cortex_restart_declaration,
_cortex_logs_declaration,
_cortex_status_declaration,
_cortex_update_declaration,
_http_fetch_declaration,
_email_send_declaration,
_nc_talk_send_declaration,
@@ -864,6 +1013,7 @@ _ALL_DECLARATIONS: list[types.FunctionDeclaration] = [
_cron_toggle_declaration,
_reminders_add_declaration,
_reminders_list_declaration,
_reminders_remove_declaration,
_reminders_clear_declaration,
_scratch_read_declaration,
_scratch_write_declaration,

View File

@@ -41,36 +41,98 @@ def _check_config() -> str | None:
# Tool: ae_journal_search
# ---------------------------------------------------------------------------
async def journal_search(query: str, journal_id: str | None = None, max_results: int = 10) -> str:
"""Search AE Journal entries by keyword.
async def journal_search(
query: str = "",
journal_id: str = "",
tags: str = "",
type_code: str = "",
topic_code: str = "",
date_from: str = "",
date_to: str = "",
sort_by: str = "updated",
sort_order: str = "desc",
status: int | None = None,
priority: int | None = None,
max_results: int = 10,
page: int = 1,
) -> str:
"""Search AE Journal entries.
Searches across the default_qry_str field (title + content excerpt).
Optionally scoped to a specific journal by journal_id (id_random).
Returns a markdown-formatted list of matching entries.
At least one of query, tags, type_code, topic_code, date_from, or journal_id
should be provided. All filters combine with AND.
"""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_search, query, journal_id, max_results)
return await asyncio.to_thread(
_sync_journal_search,
query, journal_id, tags, type_code, topic_code,
date_from, date_to, sort_by, sort_order,
status, priority, max_results, page,
)
def _sync_journal_search(query: str, journal_id: str | None, max_results: int) -> str:
def _sync_journal_search(
query: str,
journal_id: str,
tags: str,
type_code: str,
topic_code: str,
date_from: str,
date_to: str,
sort_by: str,
sort_order: str,
status: int | None,
priority: int | None,
max_results: int,
page: int,
) -> str:
import requests
url = f"{settings.ae_api_url}/v3/crud/journal_entry/search"
search_body = {
"and_filters": [
{"field": "default_qry_str", "op": "icontains", "value": query}
],
"page_size": max_results,
# Build sort field
sort_field_map = {
"updated": "updated_on",
"created": "created_on",
"name": "name",
"priority": "priority",
}
sort_field = sort_field_map.get(sort_by, "updated_on")
order_by = f"{'-' if sort_order == 'desc' else ''}{sort_field}"
params = {}
search_body: dict = {"page_size": max_results, "page": page, "order_by": order_by}
# Fulltext keyword — uses MATCH/AGAINST index
if query:
search_body["query_string"] = query
# Additional AND filters
and_filters: list[dict] = []
if tags:
and_filters.append({"field": "tags", "op": "icontains", "value": tags})
if type_code:
and_filters.append({"field": "type_code", "op": "eq", "value": type_code})
if topic_code:
and_filters.append({"field": "topic_code", "op": "eq", "value": topic_code})
if date_from:
and_filters.append({"field": "created_on", "op": "gte", "value": date_from})
if date_to:
and_filters.append({"field": "created_on", "op": "lte", "value": date_to})
if status is not None:
and_filters.append({"field": "status", "op": "eq", "value": status})
if priority is not None:
and_filters.append({"field": "priority", "op": "eq", "value": priority})
if and_filters:
search_body["and"] = and_filters
# query_string must be present for `and` filters to apply
if "query_string" not in search_body:
search_body["query_string"] = "%"
params: dict = {}
if journal_id:
params["for_obj_type"] = "journal"
params["for_obj_id"] = journal_id
url = f"{settings.ae_api_url}/v3/crud/journal_entry/search"
try:
resp = requests.post(
url,
@@ -86,30 +148,43 @@ def _sync_journal_search(query: str, journal_id: str | None, max_results: int) -
return f"Journal search error: {e}"
entries = data.get("data", [])
if not entries:
return f"No journal entries found matching: {query}"
total = (data.get("meta") or {}).get("data_list_count") or len(entries)
if not entries:
desc = query or tags or type_code or topic_code or f"journal {journal_id}"
return f"No journal entries found for: {desc}"
label = query or tags or f"{len(entries)} entries"
lines = [f"Journal entries — **{label}** ({total} total, page {page}):\n"]
lines = [f"Journal entries matching **{query}** ({len(entries)} result(s)):\n"]
for entry in entries:
title = entry.get("name") or "(untitled)"
entry_id = entry.get("id_random", "")
title = entry.get("name") or "(untitled)"
entry_id = entry.get("journal_entry_id") or entry.get("id") or ""
journal_name = entry.get("journal_name") or entry.get("parent_name") or ""
summary = entry.get("summary") or ""
content_preview = (entry.get("content") or "")[:200].replace("\n", " ")
summary = entry.get("summary") or ""
entry_tags = entry.get("tags") or []
updated = (entry.get("updated_on") or entry.get("created_on") or "")[:10]
content_preview = (entry.get("content") or "")[:400].replace("\n", " ")
header = f"**{title}**"
if journal_name:
header += f" ({journal_name})"
if entry_id:
header += f" — id: `{entry_id}`"
header += f" — id: `{entry_id}`"
if updated:
header += f" [{updated}]"
lines.append(header)
if entry_tags:
tag_list = entry_tags if isinstance(entry_tags, list) else [t.strip() for t in str(entry_tags).split(",")]
lines.append(f" Tags: {', '.join(tag_list)}")
if summary:
lines.append(f" Summary: {summary}")
if content_preview:
lines.append(f" {content_preview}")
lines.append(f" {summary}")
elif content_preview:
lines.append(f" {content_preview}{'' if len(entry.get('content', '')) > 400 else ''}")
lines.append("")
if total > page * max_results:
lines.append(f"(More results — call again with page={page + 1})")
return "\n".join(lines).strip()
@@ -264,6 +339,127 @@ def _patch_entry(entry_id: str, payload: dict) -> str:
return f"Error updating entry {entry_id}: {e}"
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_read
# ---------------------------------------------------------------------------
async def journal_entry_read(entry_id: str, max_content_chars: int = 4000) -> str:
"""Return the full content of a single journal entry by its id_random."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_entry_read, entry_id, max_content_chars)
def _sync_journal_entry_read(entry_id: str, max_content_chars: int) -> str:
entry = _get_entry(entry_id)
if isinstance(entry, str):
return entry
title = entry.get("name") or "(untitled)"
journal = entry.get("journal_name") or entry.get("parent_name") or ""
summary = entry.get("summary") or ""
raw_tags = entry.get("tags") or []
tags = raw_tags if isinstance(raw_tags, list) else [t.strip() for t in str(raw_tags).split(",") if t.strip()]
content = entry.get("content") or ""
updated = (entry.get("updated_on") or entry.get("created_on") or "")[:19].replace("T", " ")
enabled = entry.get("enable", True)
lines = [f"# {title}"]
meta: list[str] = [f"id: `{entry_id}`"]
if journal:
meta.append(f"journal: {journal}")
if updated:
meta.append(f"updated: {updated}")
if not enabled:
meta.append("**DISABLED**")
lines.append(" ".join(meta))
if tags:
lines.append(f"Tags: {', '.join(tags)}")
if summary:
lines.append(f"\nSummary: {summary}")
lines.append("\n---\n")
truncated = len(content) > max_content_chars
lines.append(content[:max_content_chars])
if truncated:
lines.append(
f"\n\n[Content truncated at {max_content_chars} chars — "
f"{len(content)} total. Call again with a higher max_content_chars to read more.]"
)
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Tool: ae_journal_entries_list
# ---------------------------------------------------------------------------
async def journal_entries_list(journal_id: str, max_results: int = 20, page: int = 1) -> str:
"""List entries in a specific journal, newest first."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_entries_list, journal_id, max_results, page)
def _sync_journal_entries_list(journal_id: str, max_results: int, page: int) -> str:
import requests
url = f"{settings.ae_api_url}/v3/crud/journal_entry/search"
search_body: dict = {
"page_size": max_results,
"page": page,
"order_by": "-updated_on",
}
params = {"for_obj_type": "journal", "for_obj_id": journal_id}
try:
resp = requests.post(
url,
headers=_headers(),
params=params,
json=search_body,
timeout=settings.ae_api_timeout,
)
resp.raise_for_status()
data = resp.json()
except Exception as e:
logger.warning("ae_journal_entries_list failed: %s", e)
return f"Journal entries list error: {e}"
entries = data.get("data", [])
total = (data.get("meta") or {}).get("data_list_count") or len(entries)
if not entries:
return f"No entries found in journal `{journal_id}`."
offset = (page - 1) * max_results + 1
lines = [f"Entries in journal `{journal_id}` — showing {offset}{offset + len(entries) - 1} of {total}:\n"]
for i, entry in enumerate(entries, offset):
title = entry.get("name") or "(untitled)"
entry_id = entry.get("journal_entry_id") or entry.get("id") or ""
raw_tags = entry.get("tags") or []
tags = raw_tags if isinstance(raw_tags, list) else [t.strip() for t in str(raw_tags).split(",") if t.strip()]
summary = entry.get("summary") or ""
updated = (entry.get("updated_on") or entry.get("created_on") or "")[:10]
enabled = entry.get("enable", True)
status = "" if enabled else " [disabled]"
date_str = f" [{updated}]" if updated else ""
lines.append(f"{i}. **{title}**{status} — id: `{entry_id}`{date_str}")
if tags:
lines.append(f" Tags: {', '.join(tags)}")
if summary:
lines.append(f" {summary[:150]}{'' if len(summary) > 150 else ''}")
lines.append("")
if total > offset + len(entries) - 1:
lines.append(f"(More entries available — call again with page={page + 1})")
return "\n".join(lines).rstrip()
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_update
# ---------------------------------------------------------------------------

View File

@@ -27,6 +27,28 @@ def _now_label() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
def _parse_sections(text: str) -> list[tuple[str, str]]:
"""Split REMINDERS.md into (heading, body) tuples, one per ## section."""
sections: list[tuple[str, str]] = []
heading: str | None = None
body_lines: list[str] = []
for line in text.splitlines():
if line.startswith("## "):
if heading is not None:
sections.append((heading, "\n".join(body_lines).strip()))
heading = line[3:].strip()
body_lines = []
elif heading is not None:
body_lines.append(line)
if heading is not None:
sections.append((heading, "\n".join(body_lines).strip()))
return sections
def _sections_to_text(sections: list[tuple[str, str]]) -> str:
return "".join(f"\n## {h}\n\n{b}\n" for h, b in sections)
# ---------------------------------------------------------------------------
# Sync implementations
# ---------------------------------------------------------------------------
@@ -35,7 +57,18 @@ def _reminders_list() -> str:
p = _reminders_path()
if not p.exists() or not p.read_text().strip():
return "No pending reminders."
return p.read_text()
sections = _parse_sections(p.read_text())
if not sections:
return "No pending reminders."
lines = []
for i, (heading, body) in enumerate(sections, 1):
lines.append(f"{i}. {heading}")
if body:
# Indent body so it reads as belonging to the numbered item
for bline in body.splitlines()[:4]: # cap at 4 lines for brevity
lines.append(f" {bline}")
lines.append("")
return "\n".join(lines).rstrip()
def _reminders_add(text: str, label: str | None = None) -> str:
@@ -47,6 +80,26 @@ def _reminders_add(text: str, label: str | None = None) -> str:
return f"Reminder added: {heading}"
def _reminders_remove(index: int) -> str:
p = _reminders_path()
if not p.exists() or not p.read_text().strip():
return "No reminders to remove."
sections = _parse_sections(p.read_text())
if not sections:
return "No reminders to remove."
if index < 1 or index > len(sections):
return (
f"Index {index} is out of range. "
f"There {'is' if len(sections) == 1 else 'are'} {len(sections)} "
f"reminder{'s' if len(sections) != 1 else ''} (1{len(sections)}). "
"Call reminders_list to see them."
)
removed_heading = sections[index - 1][0]
sections.pop(index - 1)
p.write_text(_sections_to_text(sections))
return f"Removed reminder {index}: {removed_heading}"
def _reminders_clear() -> str:
p = _reminders_path()
p.write_text("")
@@ -65,5 +118,9 @@ async def reminders_add(text: str, label: str | None = None) -> str:
return await asyncio.to_thread(_reminders_add, text, label)
async def reminders_remove(index: int) -> str:
return await asyncio.to_thread(_reminders_remove, index)
async def reminders_clear() -> str:
return await asyncio.to_thread(_reminders_clear)

View File

@@ -2,16 +2,21 @@
System tools — local machine operations.
These tools affect the host system directly. Use with care.
cortex_restart and cortex_logs require admin role.
All tools in this module require the admin role.
"""
import asyncio
import logging
import os
import subprocess
from pathlib import Path
logger = logging.getLogger(__name__)
# Absolute paths — resolved relative to this file so they work regardless of cwd
_CORTEX_DIR = Path(__file__).parent # .../Cortex_and_Inara_dev/cortex/
_PROJECT_ROOT = _CORTEX_DIR.parent # .../Cortex_and_Inara_dev/
ALLOW_SCRIPT = "/home/scott/.local/bin/claude-allow-dir"
@@ -124,3 +129,120 @@ async def cortex_logs(lines: int = 50) -> str:
except Exception as e:
logger.error("cortex_logs error: %s", e)
return f"Error: {e}"
async def cortex_status() -> str:
"""Return Cortex service status: git branch/commit, ahead/behind remote, and systemctl state."""
lines = []
async def _git(*args: str) -> str:
proc = await asyncio.create_subprocess_exec(
"git", "-C", str(_PROJECT_ROOT), *args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL,
)
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
return stdout.decode(errors="replace").strip()
try:
branch = await _git("rev-parse", "--abbrev-ref", "HEAD")
commit = await _git("log", "--oneline", "-1")
# fetch quietly so ahead/behind is current
await asyncio.create_subprocess_exec(
"git", "-C", str(_PROJECT_ROOT), "fetch", "--quiet",
stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL,
)
ahead_behind = await _git("rev-list", "--left-right", "--count", f"HEAD...origin/{branch}")
ahead, behind = (ahead_behind.split() + ["?", "?"])[:2]
lines.append(f"**Branch:** {branch} | ahead {ahead} / behind {behind}")
lines.append(f"**Commit:** {commit}")
except Exception as e:
lines.append(f"Git info unavailable: {e}")
lines.append("")
try:
proc = await asyncio.create_subprocess_exec(
"systemctl", "--user", "status", "cortex", "--no-pager", "-l",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
# First 15 lines of systemctl output is enough — avoids log flood
status_lines = stdout.decode(errors="replace").splitlines()[:15]
lines.extend(status_lines)
except Exception as e:
lines.append(f"systemctl status unavailable: {e}")
return "\n".join(lines)
async def cortex_update() -> str:
"""Pull the latest code from git, syntax-check all Python files, and report.
Does NOT restart automatically — call cortex_restart separately after reviewing
the output if you want to apply changes.
"""
lines = []
async def _run(*cmd: str, cwd: Path = _PROJECT_ROOT, timeout: int = 30) -> tuple[int, str]:
proc = await asyncio.create_subprocess_exec(
*cmd, cwd=str(cwd),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout)
return proc.returncode, stdout.decode(errors="replace").strip()
# 1. Check for incoming commits before pulling
try:
await _run("git", "fetch", "--quiet")
rc, incoming = await _run("git", "log", "--oneline", "HEAD..origin/HEAD")
if rc == 0 and not incoming:
# Double-check with branch name in case origin/HEAD isn't set
branch_rc, branch = await _run("git", "rev-parse", "--abbrev-ref", "HEAD")
_, incoming = await _run("git", "log", "--oneline", f"HEAD..origin/{branch.strip()}")
except asyncio.TimeoutError:
return "Error: git fetch timed out — check network connectivity."
except Exception as e:
return f"Error during git fetch: {e}"
if not incoming:
rc2, current = await _run("git", "log", "--oneline", "-1")
return f"Already up to date.\n\nCurrent commit: {current}"
lines.append(f"**Incoming commits:**\n{incoming}\n")
# 2. Pull
try:
rc, pull_out = await _run("git", "pull", "--ff-only")
except asyncio.TimeoutError:
return "Error: git pull timed out."
except Exception as e:
return f"Error during git pull: {e}"
if rc != 0:
return f"git pull failed (exit {rc}):\n{pull_out}"
lines.append(f"**git pull:**\n{pull_out}\n")
# 3. Syntax check all Python files under cortex/
py_files = sorted(_CORTEX_DIR.rglob("*.py"))
errors = []
for f in py_files:
result = subprocess.run(
["python3", "-m", "py_compile", str(f)],
capture_output=True, text=True,
)
if result.returncode != 0:
errors.append(f" {f.relative_to(_PROJECT_ROOT)}: {result.stderr.strip()}")
if errors:
lines.append(f"**Syntax errors — do NOT restart until fixed:**")
lines.extend(errors)
else:
lines.append(f"**Syntax check:** {len(py_files)} files — all OK.")
lines.append("Call `cortex_restart` to apply the update.")
return "\n".join(lines)

View File

@@ -0,0 +1,226 @@
# Aether Platform Integration — Cortex Tool Layer
> Last updated: 2026-04-30
> Status: Journal toolset complete — broader AE integration planned
This doc covers how Cortex/Inara integrates with the Aether Platform API, what's
implemented, what the data model looks like, and what's planned next.
---
## Overview
Cortex connects to the Aether Platform V3 API to give the orchestrator read/write
access to the user's knowledge base (Journals) and task data. Auth uses the same
`x-aether-api-key` + `x-account-id` headers as every other Aether client.
Config lives in `.env`:
```
AE_API_URL=https://dev-api.oneskyit.com
AE_API_KEY=...
AE_ACCOUNT_ID=...
AE_API_TIMEOUT=15
```
Tool implementation: `cortex/tools/ae_knowledge.py`
Tool registrations: `cortex/tools/__init__.py`
---
## V3 Search Engine
### Endpoint
```
POST /v3/crud/{obj_type}/search
```
For nested objects (journal_entry scoped to a journal):
```
POST /v3/crud/journal_entry/search
?for_obj_type=journal&for_obj_id={journal_id}
```
### Search body
```json
{
"query_string": "fulltext search term",
"and": [
{ "field": "tags", "op": "icontains", "value": "networking" },
{ "field": "created_on", "op": "gte", "value": "2026-01-01" }
],
"or": [...],
"page_size": 20,
"page": 1,
"order_by": "-updated_on"
}
```
**`query_string` vs `and` filters on `default_qry_str`:**
- `query_string` → triggers `MATCH(default_qry_str) AGAINST(... IN BOOLEAN MODE)` — uses the
FULLTEXT index. Faster and supports boolean operators (`+word`, `-word`, `"phrase"`).
- `and` with `icontains` on `default_qry_str` → plain `LIKE '%term%'`. Slower, no index.
**Important:** `query_string` must be present for `and`/`or` filters to apply. When using
filters without a keyword query, pass `query_string: "%"` as a wildcard to activate the
filter path without restricting by keyword.
### Supported operators
| Operator | SQL | Notes |
|---|---|---|
| `eq` | `=` | exact match |
| `ne` | `!=` | not equal |
| `gt` / `gte` | `>` / `>=` | numeric, dates |
| `lt` / `lte` | `<` / `<=` | numeric, dates |
| `contains` / `icontains` | `LIKE '%v%'` | substring; both case-insensitive on MariaDB |
| `startswith` / `istartswith` | `LIKE 'v%'` | |
| `endswith` / `iendswith` | `LIKE '%v'` | |
| `like` | `LIKE` | raw LIKE pattern |
| `in` | `IN (...)` | value is a list |
| `is_null` / `is_not_null` | `IS NULL` / `IS NOT NULL` | no value needed |
### Sorting
`order_by` accepts any indexed field name. Prefix with `-` for descending:
- `-updated_on` (default for listing)
- `-created_on`
- `name`
- `-priority`
### Pagination
`page_size` (default 10, max ~100) + `page` (1-based).
Total count is in `response["meta"]["data_list_count"]` — not a top-level key.
---
## journal_entry Schema
Full table schema from `ae_describe journal_entry --detailed`:
| Field | Type | Indexed | Notes |
|---|---|---|---|
| `id_random` | varchar(22) | UNI | DB public ID field — but API responses return this as `journal_entry_id` (the Vision ID convention: `{obj_type}_id`). `id_random` key is `None` in responses. |
| `journal_id` | int | MUL | FK — use `for_obj_id` param in search |
| `name` | varchar(250) | MUL | Entry title |
| `short_name` | varchar(25) | | |
| `summary` | text | | Short summary (12 sentences) |
| `content` | text | | Full markdown content |
| `content_html` | text | | HTML version |
| `content_json` | longtext | | Structured content (editor format) |
| `content_encrypted` | longtext | | Optional encrypted content |
| `tags` | varchar(255) | MUL | Comma-separated string — filter with `icontains` |
| `type` / `type_code` | varchar | | Classification: type |
| `topic` / `topic_code` | varchar | | Classification: topic |
| `activity` / `activity_code` | varchar | | Classification: activity |
| `category_code` | varchar(25) | | Classification: category |
| `code` | varchar(20) | | Short entry code |
| `start_datetime` | datetime | MUL | Optional event start |
| `end_datetime` | datetime | | Optional event end |
| `seconds` / `hours` | int/decimal | | Duration |
| `priority` | tinyint | MUL | 1=low → 5=high |
| `status` | int | MUL | Status code (domain-specific) |
| `private` / `public` / `personal` / `professional` | tinyint | MUL | Visibility flags |
| `billable` | tinyint | | Billing flag |
| `enable` | tinyint NOT NULL | MUL | Soft-delete flag (default 1) |
| `hide` | tinyint | MUL | UI hide flag |
| `archive` | tinyint | MUL | Archived flag |
| `default_qry_str` | text | FULLTEXT | Auto-generated search target (name + content) |
| `data_json` | longtext | | Arbitrary structured data |
| `notes` | text | | Internal notes |
| `created_on` | timestamp NOT NULL | MUL | Auto-set on create |
| `updated_on` | timestamp | MUL | Auto-updated on change |
### journal Schema (top-level)
| Field | Type | Notes |
|---|---|---|
| `id_random` | varchar(22) | DB field — returned in API as `journal_id` |
| `name` | varchar(250) | Journal name |
| `summary` / `description` | text | |
| `type_code` | varchar(25) | Journal type |
| `enable` | tinyint | |
| `created_on` / `updated_on` | timestamp | |
---
## Current Tool Inventory
| Tool | Status | Notes |
|---|---|---|
| `ae_journal_list` | ✅ | Lists journals with id + name |
| `ae_journal_search` | ✅ | Fulltext + tag/date/type/status/priority filters; paginated |
| `ae_journal_entry_read` | ✅ | Full content by entry_id; configurable truncation |
| `ae_journal_entries_list` | ✅ | Browse a journal newest-first; paginated |
| `ae_journal_entry_create` | ✅ | Create with title, content, tags, summary |
| `ae_journal_entry_update` | ✅ | Patch any fields (title, content, tags, summary, enable) |
| `ae_journal_entry_disable` | ✅ | Soft-delete (enable=false) |
| `ae_journal_entry_append` | ✅ | Timestamped append to bottom |
| `ae_journal_entry_prepend` | ✅ | Timestamped prepend to top |
| `ae_task_list` | ✅ | agents_sync Kanban (admin only) |
---
## ae_journal_search — Current Signature
All filters are optional and combine with AND. At least one should be provided.
```python
ae_journal_search(
query: str = "", # fulltext via query_string (MATCH/AGAINST)
journal_id: str = "", # scope to a specific journal
tags: str = "", # icontains on tags field
type_code: str = "", # eq on type_code
topic_code: str = "", # eq on topic_code
date_from: str = "", # created_on gte (YYYY-MM-DD)
date_to: str = "", # created_on lte (YYYY-MM-DD, exclusive of time — use next day to include full day)
sort_by: str = "updated", # updated | created | name | priority
sort_order: str = "desc",
status: int | None = None,
priority: int | None = None,
max_results: int = 10,
page: int = 1,
)
```
**date_to boundary note:** `date_to='2026-01-17'` means `<= 2026-01-17 00:00:00`, which
excludes entries created later that day. Use `date_to='2026-01-18'` to include all of Jan 17.
---
## Planned: Broader AE Platform Integration
### Phase 1 — Journal Toolset (current)
Complete read/write/search for Journals and Journal Entries.
### Phase 2 — Tasks & Projects
- `ae_task_create` / `ae_task_update` / `ae_task_complete` on Aether tasks (not just agents_sync Kanban)
- Read project/task hierarchy
### Phase 3 — Knowledge Import Pipeline
- Script to walk markdown dirs, chunk by H2, create Journal entries
- Dedup via search-before-create pattern
- Tag and classify entries automatically via orchestrator
### Phase 4 — People & Contacts
- Read contact records (person, organization)
- Link journal entries to contacts
### Phase 5 — Calendar / Events
- `start_datetime` / `end_datetime` already on journal_entry
- Could expose time-scoped journal queries as a calendar view
---
## Notes on `tags` field
`tags` is stored as a raw comma-separated varchar(255), not a JSON array.
The API accepts a Python list on write (the `tags` PATCH key takes a list and the backend joins it).
On read, it comes back as a **string** (e.g. `"shelterluv, api"`), not a list — normalize before
displaying: `[t.strip() for t in tags_str.split(",") if t.strip()]`.
For filtering: use `icontains` on `tags` inside the `"and"` list, e.g.:
`{"field": "tags", "op": "icontains", "value": "networking"}`.
A tag search for "net" matches "networking" AND "subnet" — acceptable for now.
True per-tag filtering would require a tags junction table.
## Notes on `default_qry_str`
Auto-populated by the backend from `name` + content fields. Do not write to it directly.
FULLTEXT index supports boolean mode: `+required -excluded "exact phrase"`.
The `query_string` key in the search body triggers this path automatically.