Compare commits

...

9 Commits

Author SHA1 Message Date
Scott Idem
77997bc4ae feat: add cortex_status and cortex_update tools
cortex_status: git branch/commit/ahead-behind + systemctl state — read-only
cortex_update: git pull + syntax check all .py files + report; does NOT auto-restart.
  If syntax errors are found after pull, warns and blocks restart suggestion.
  Call cortex_restart separately to apply a clean update.

Both are admin-only. cortex_update is confirm-required (modifies files on disk).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 22:01:42 -04:00
Scott Idem
1ffa846edd docs: sync HELP.md tools table and files list with current implementation
- Add reminders_remove (targeted single-reminder removal, no confirm needed)
- Add ae_journal_entry_read, ae_journal_entries_list to AE Journals row
- Add email_send (admin-only) to Notifications row
- Remove TASKS.json from Files table (not in the Files panel)
- Add email_allowlist.json to Files table (Settings group in Files panel)
- Update last-updated date

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 21:54:50 -04:00
Scott Idem
98546abe21 docs: update ARCH__AE_INTEGRATION with verified API behavior
- query_string required for and/or filters to apply; use "%" as wildcard
- Total count is in meta.data_list_count, not top-level
- id_random is None in responses; Vision ID convention uses {obj_type}_id
- tags comes back as string on read, not list — normalize before joining
- Replace stale "Planned: Search Improvements" with current signature + notes
- Clarify date_to boundary (lte midnight, use next day to include full day)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 21:17:19 -04:00
Scott Idem
1fa5151d8a fix: correct V3 search filter key and response field names in ae_knowledge
- Filter key is "and" not "and_filters" (V3 API format)
- Entry IDs use journal_entry_id/id, not id_random (id_random is None)
- Dates use updated_on/created_on, not updated_at/created_at
- Total count lives in meta.data_list_count, not top-level total/count
- Inject query_string="%" when and filters present but no query, since
  the V3 search engine requires query_string for filters to apply
- Normalize tags from string to list in both entry_read and entries_list
- Fix order_by to use updated_on (not updated_at) in entries_list
- Correct ARCH__AE_INTEGRATION.md: and_filters → and, or_filters → or

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 21:12:44 -04:00
Scott Idem
71e472bebe feat: improved ae_journal_search + AE integration docs
Search improvements:
- Switched from LIKE on default_qry_str to query_string path (fulltext
  MATCH/AGAINST IN BOOLEAN MODE — uses the index, supports +/- boolean ops)
- Added tag filter (icontains on tags field)
- Added date_from / date_to filters (created_on gte/lte)
- Added type_code / topic_code exact-match filters
- Added sort_by / sort_order control (updated, created, name, priority)
- Added status / priority filters
- Added page parameter for pagination
- Richer output: updated date, tags, pagination hint
- Updated Gemini tool declaration with all new params

Docs:
- documentation/ARCH__AE_INTEGRATION.md — journal_entry full schema,
  search operator reference, current tool inventory, planned phases
  (broader AE integration: tasks, people, calendar, knowledge import)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 20:10:04 -04:00
Scott Idem
77327d97ad feat: improve AE Journal read toolset
- ae_journal_entry_read: expose full entry content by id_random (title,
  journal, tags, summary, full content with configurable truncation)
- ae_journal_entries_list: browse all entries in a journal newest-first,
  numbered with id/title/tags/summary/date and pagination support
- ae_journal_search: richer output — tags, updated date, 400-char preview
  (was 200), show summary OR preview (not both when summary exists)

_get_entry() was already implemented; read tool just exposes it properly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 19:47:59 -04:00
Scott Idem
36fdda6728 feat: add reminders_remove tool for single-reminder removal
- reminders_remove(index) removes one reminder by 1-based index
- reminders_list now returns numbered output (1. heading / body)
  so any model can easily identify which index to pass
- _parse_sections() / _sections_to_text() helpers for clean round-trip
- Not in CONFIRM_REQUIRED — targeted removal is safe without a gate

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 19:27:53 -04:00
Scott Idem
6405dd338d feat: proper confirmation-resume flow + per-user tool policy
Fixes the broken confirmation gate where users had no way to approve
or deny a blocked tool call in the web UI.

Changes:
- orchestrator_engine.py: add OrchestrateCheckpoint dataclass, extract
  loop into _run_from_contents(), add resume() function
- openai_orchestrator.py: same treatment — _run_from_messages(), resume()
- routers/orchestrator.py: POST /{job_id}/confirm and /deny endpoints,
  separate _checkpoints store, _resume_job() + _finalize_job() helpers,
  "awaiting_confirmation" job status with pending_confirmation payload
- auth_utils.py: get_tool_policy() and save_tool_policy() helpers reading
  home/{user}/tool_policy.json (allow/deny lists)
- routers/orchestrator.py: loads tool_policy per user and passes
  confirm_allow/confirm_deny to both engines
- app.js: poll loop handles awaiting_confirmation — shows Confirm/Deny
  buttons inline, resumes polling after user action
- settings.html + settings.py: Tool Permissions section with allow/deny
  textareas, POST /settings/tool-policy route
- style.css: .confirm-gate, .confirm-btn, .deny-btn styles

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 19:14:53 -04:00
Scott Idem
bce7de647c feat: proactive notifications — email, NC Talk, Google Chat per user
notification.py now handles all three outbound channels. Email defaults
to the user's login address (google_email from auth.json); an optional
override can be set in channels.json. Google Chat uses an incoming
webhook URL. NC Talk was already wired, just needs notification_room set.

Settings page gains a Notifications section: channel dropdown, optional
email override, NC room token, and Google Chat webhook URL. All stored
in per-user channels.json.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 22:32:22 -04:00
15 changed files with 1718 additions and 222 deletions

View File

@@ -224,3 +224,22 @@ def get_user_channels(username: str) -> dict:
return json.loads(path.read_text()) return json.loads(path.read_text())
except Exception: except Exception:
return {} return {}
def get_tool_policy(username: str) -> dict:
"""Return the parsed tool_policy.json for a user.
Keys:
allow — tools in CONFIRM_REQUIRED that this user has pre-approved (skip gate)
deny — tools always blocked for this user regardless of global CONFIRM_REQUIRED
"""
path = settings.home_root() / username / "tool_policy.json"
try:
return json.loads(path.read_text())
except Exception:
return {}
def save_tool_policy(username: str, data: dict) -> None:
path = settings.home_root() / username / "tool_policy.json"
path.write_text(json.dumps(data, indent=2) + "\n")

View File

@@ -1,22 +1,21 @@
""" """
Outbound notification helpers — send messages to user channels proactively. Outbound notification helpers — send messages to user channels proactively.
Channel config lives in home/{user}/channels.json. Channel config lives in home/{user}/channels.json:
Each channel that supports proactive notifications needs a notification_channel
set to its key name (e.g. "nextcloud", "google_chat") in the user's channels.json:
{ {
"notification_channel": "nextcloud", "notification_channel": "email" | "nextcloud" | "google_chat",
"notification_email": "<override address — defaults to login email>",
"nextcloud": { "nextcloud": {
"url": "https://cloud.example.com", "url": "...", "bot_secret": "...", "notification_room": "<token>", ...
"bot_secret": "...", },
"notification_room": "<room-token>", "google_chat": {
... "outbound_webhook": "https://chat.googleapis.com/v1/spaces/...", ...
} }
} }
If notification_channel is absent, defaults to "nextcloud" if configured. If notification_channel is absent, defaults to "nextcloud" if configured.
If notification_room (for NCT) is absent, notifications are silently skipped.
""" """
import asyncio
import hashlib import hashlib
import hmac import hmac
import json import json
@@ -73,34 +72,89 @@ async def _notify_nct(nct: dict, message: str, username: str) -> None:
await _send_nct_message(url, secret, room, message) await _send_nct_message(url, secret, room, message)
async def _notify_email(username: str, message: str, email_override: str | None = None) -> None:
"""Send notification via email. Address = override → google_email from auth.json."""
from auth_utils import _read_auth
from email_utils import send_email
to_addr = email_override or _read_auth(username).get("google_email", "").strip()
if not to_addr:
logger.warning("notify: no email address for %s — set notification_email in channels.json", username)
return
ok = await asyncio.to_thread(
send_email,
to_email=to_addr,
subject="Cortex",
body_text=message,
body_html=message.replace("\n", "<br>"),
)
if ok:
logger.info("notify email → %s (%d chars)", to_addr, len(message))
else:
logger.warning("notify: email send failed for %s", username)
async def _notify_google_chat(webhook_url: str, message: str, username: str) -> None:
"""POST a message to a Google Chat space via incoming webhook."""
body = json.dumps({"text": message}, ensure_ascii=False).encode("utf-8")
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
webhook_url,
content=body,
headers={"Content-Type": "application/json"},
timeout=15,
)
if resp.status_code not in (200, 201):
logger.warning("notify Google Chat %s → HTTP %d: %s", username, resp.status_code, resp.text[:200])
else:
logger.info("notify Google Chat → %s (%d chars)", username, len(message))
except Exception as e:
logger.error("notify Google Chat error for %s: %s", username, e)
async def notify(username: str, message: str, channel: str | None = None) -> None: async def notify(username: str, message: str, channel: str | None = None) -> None:
"""Send a notification to the user's preferred outbound channel. """Send a notification to the user's preferred outbound channel.
Channel resolution order: Channel resolution order:
1. `channel` parameter if provided 1. `channel` parameter if provided
2. `notification_channel` key in channels.json 2. `notification_channel` key in channels.json
3. "nextcloud" if configured 3. "nextcloud" if notification_room is configured
4. Silent no-op 4. Silent no-op
To configure: set `notification_channel` in home/{user}/channels.json. Configure via home/{user}/channels.json — see module docstring.
For NCT: also set `notification_room` in the nextcloud section.
""" """
from auth_utils import get_user_channels from auth_utils import get_user_channels
channels = get_user_channels(username) channels = get_user_channels(username)
target = channel or channels.get("notification_channel", "").strip() target = channel or channels.get("notification_channel", "").strip()
if not target: if not target:
# Auto-detect: use nextcloud if configured # Auto-detect: nextcloud if a notification_room is set
if "nextcloud" in channels: nct = channels.get("nextcloud", {})
if nct.get("notification_room", "").strip():
target = "nextcloud" target = "nextcloud"
else: else:
return return
if target == "nextcloud": if target == "email":
email_override = channels.get("notification_email", "").strip() or None
await _notify_email(username, message, email_override=email_override)
elif target == "nextcloud":
nct = channels.get("nextcloud") nct = channels.get("nextcloud")
if not nct: if not nct:
logger.debug("notify: nextcloud not configured for %s", username) logger.debug("notify: nextcloud not configured for %s", username)
return return
await _notify_nct(nct, message, username) await _notify_nct(nct, message, username)
elif target == "google_chat":
gc = channels.get("google_chat", {})
webhook = gc.get("outbound_webhook", "").strip()
if not webhook:
logger.debug("notify: google_chat outbound_webhook not set for %s", username)
return
await _notify_google_chat(webhook, message, username)
else: else:
logger.debug("notify: channel %r not yet supported for outbound (user %s)", target, username) logger.debug("notify: channel %r not supported for outbound (user %s)", target, username)

View File

@@ -24,8 +24,8 @@ import logging
from openai import AsyncOpenAI from openai import AsyncOpenAI
from config import settings from config import settings
from orchestrator_engine import OrchestratorResult from orchestrator_engine import OrchestrateCheckpoint, OrchestratorResult
from tools import OPENAI_TOOL_SCHEMAS, call_tool, get_openai_tools_for_role, CONFIRM_REQUIRED from tools import OPENAI_TOOL_SCHEMAS, call_tool, get_openai_tools_for_role, get_tools_for_role, CONFIRM_REQUIRED
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -45,6 +45,8 @@ async def run(
model_cfg: dict | None = None, model_cfg: dict | None = None,
respond_with_final: bool = True, respond_with_final: bool = True,
user_role: str = "user", user_role: str = "user",
confirm_allow: set[str] | None = None,
confirm_deny: set[str] | None = None,
) -> OrchestratorResult: ) -> OrchestratorResult:
""" """
Run a tool-enabled task using an OpenAI-compatible API. Run a tool-enabled task using an OpenAI-compatible API.
@@ -56,36 +58,22 @@ async def run(
model_cfg: Resolved model config from model_registry (local_openai type) model_cfg: Resolved model config from model_registry (local_openai type)
respond_with_final: If False, return just the tool-loop summary without a respond_with_final: If False, return just the tool-loop summary without a
full persona-voiced response (faster; for cron/background) full persona-voiced response (faster; for cron/background)
confirm_allow: Tools to bypass the confirmation gate for this user
confirm_deny: Tools to always block for this user
Returns: Returns:
OrchestratorResult — same shape as the Gemini engine for drop-in compatibility OrchestratorResult — if checkpoint is set, the job is awaiting confirmation
""" """
if not model_cfg: if not model_cfg:
raise RuntimeError("model_cfg is required for the OpenAI orchestrator") raise RuntimeError("model_cfg is required for the OpenAI orchestrator")
api_url = model_cfg.get("api_url", "") _confirm_allow = frozenset(confirm_allow or ())
api_key = model_cfg.get("api_key", "") or "none" _confirm_deny = frozenset(confirm_deny or ())
model_name = model_cfg.get("model_name", "") effective_confirm = (CONFIRM_REQUIRED - set(_confirm_allow)) | set(_confirm_deny)
host_type = model_cfg.get("host_type", "openwebui")
if not api_url or not model_name: client, model_name, active_tools = _build_client(model_cfg)
raise RuntimeError(
f"model_cfg missing api_url or model_name: {model_cfg.get('label', model_cfg)}"
)
# Open WebUI's OpenAI-compatible endpoint lives at /api/chat/completions,
# so the SDK base_url needs the /api prefix; standard OpenAI-layout hosts don't.
base_url = api_url.rstrip("/")
if host_type == "openwebui":
base_url = base_url + "/api"
client = AsyncOpenAI(base_url=base_url, api_key=api_key)
# System prompt: persona context + brief tool instruction
sys_content = (system_prompt or "") + _TOOL_INSTRUCTION sys_content = (system_prompt or "") + _TOOL_INSTRUCTION
# Build messages: [system, ...recent_session, current_task]
# Strip non-standard metadata fields (backend, host, etc.) before sending.
messages: list[dict] = [{"role": "system", "content": sys_content}] messages: list[dict] = [{"role": "system", "content": sys_content}]
if session_messages: if session_messages:
messages.extend( messages.extend(
@@ -94,13 +82,132 @@ async def run(
) )
messages.append({"role": "user", "content": task}) messages.append({"role": "user", "content": task})
active_tools = get_openai_tools_for_role(user_role)
active_callables: dict | None = None # resolved lazily below
tool_call_log: list[dict] = [] tool_call_log: list[dict] = []
final_response, checkpoint = await _run_from_messages(
client=client,
messages=messages,
active_tools=active_tools,
tool_call_log=tool_call_log,
effective_confirm=effective_confirm,
model_name=model_name,
task=task,
model_cfg=model_cfg,
respond_with_final=respond_with_final,
user_role=user_role,
confirm_allow=_confirm_allow,
confirm_deny=_confirm_deny,
starting_round=0,
)
if checkpoint:
return OrchestratorResult(
response=final_response,
tool_calls=list(tool_call_log),
backend="local",
gemini_summary=final_response,
checkpoint=checkpoint,
)
model_label = model_cfg.get("label") or model_name
logger.info("OpenAI orchestrator complete — model=%s tools=%d", model_label, len(tool_call_log))
return OrchestratorResult(
response=final_response,
tool_calls=tool_call_log,
backend="local",
gemini_summary=final_response,
)
async def resume(checkpoint: OrchestrateCheckpoint, confirmed: bool) -> OrchestratorResult:
"""Continue an OpenAI orchestrator job that was paused at a confirmation gate."""
client, model_name, active_tools = _build_client(checkpoint.model_cfg)
effective_confirm = (CONFIRM_REQUIRED - set(checkpoint.confirm_allow)) | set(checkpoint.confirm_deny)
messages = list(checkpoint.pre_fn_state)
tool_call_log = [t for t in checkpoint.tool_call_log if t["result"] != "[awaiting confirmation]"]
# Build tool responses for this round
for er in checkpoint.executed_results:
messages.append({
"role": "tool",
"tool_call_id": er.get("tool_call_id", er["name"]),
"content": er["result"],
})
for pt in checkpoint.pending_tools:
if confirmed:
_, callables = get_tools_for_role(checkpoint.user_role)
result_str = await _execute_tool_dict(pt["name"], pt["args"], checkpoint.user_role)
logger.info("Confirmed tool %s%d chars", pt["name"], len(result_str))
else:
result_str = "Action denied by user."
logger.info("Tool %s denied by user", pt["name"])
tool_call_log.append({"tool": pt["name"], "args": pt["args"], "result": result_str})
messages.append({
"role": "tool",
"tool_call_id": pt.get("tool_call_id", pt["name"]),
"content": result_str,
})
final_response, new_checkpoint = await _run_from_messages(
client=client,
messages=messages,
active_tools=active_tools,
tool_call_log=tool_call_log,
effective_confirm=effective_confirm,
model_name=model_name,
task=checkpoint.task,
model_cfg=checkpoint.model_cfg,
respond_with_final=checkpoint.respond_with_final,
user_role=checkpoint.user_role,
confirm_allow=checkpoint.confirm_allow,
confirm_deny=checkpoint.confirm_deny,
starting_round=checkpoint.rounds_used,
)
if new_checkpoint:
return OrchestratorResult(
response=final_response,
tool_calls=list(tool_call_log),
backend="local",
gemini_summary=final_response,
checkpoint=new_checkpoint,
)
model_label = (checkpoint.model_cfg or {}).get("label") or model_name
logger.info("OpenAI orchestrator resumed — model=%s tools=%d", model_label, len(tool_call_log))
return OrchestratorResult(
response=final_response,
tool_calls=tool_call_log,
backend="local",
gemini_summary=final_response,
)
async def _run_from_messages(
client,
messages: list[dict],
active_tools: list,
tool_call_log: list[dict],
effective_confirm: set[str],
model_name: str,
task: str,
model_cfg: dict | None,
respond_with_final: bool,
user_role: str,
confirm_allow: frozenset,
confirm_deny: frozenset,
starting_round: int = 0,
) -> tuple[str, OrchestrateCheckpoint | None]:
"""
Run the OpenAI ReAct loop from the current messages state.
Returns (final_response, checkpoint) — checkpoint is set if confirmation is needed.
"""
final_response = "" final_response = ""
for round_num in range(settings.orchestrator_max_rounds): for round_num in range(starting_round, settings.orchestrator_max_rounds):
logger.info("OpenAI orchestrator round %d / %d model=%s", logger.info("OpenAI orchestrator round %d / %d model=%s",
round_num + 1, settings.orchestrator_max_rounds, model_name) round_num + 1, settings.orchestrator_max_rounds, model_name)
@@ -112,29 +219,28 @@ async def run(
) )
choice = response.choices[0] choice = response.choices[0]
msg = choice.message msg = choice.message
# Append the assistant turn (MUST include tool_calls if present so the
# next request is valid — OpenAI requires the full history to be consistent)
assistant_msg: dict = {"role": "assistant"} assistant_msg: dict = {"role": "assistant"}
if msg.content: if msg.content:
assistant_msg["content"] = msg.content assistant_msg["content"] = msg.content
if msg.tool_calls: if msg.tool_calls:
assistant_msg["tool_calls"] = [ assistant_msg["tool_calls"] = [
{ {
"id": tc.id, "id": tc.id,
"type": "function", "type": "function",
"function": { "function": {"name": tc.function.name, "arguments": tc.function.arguments},
"name": tc.function.name,
"arguments": tc.function.arguments,
},
} }
for tc in msg.tool_calls for tc in msg.tool_calls
] ]
messages.append(assistant_msg) messages.append(assistant_msg)
if choice.finish_reason == "tool_calls" and msg.tool_calls: if choice.finish_reason == "tool_calls" and msg.tool_calls:
confirm_requested = False # Snapshot state before tool responses for potential checkpoint
pre_fn_state = list(messages)
pending_tools: list[dict] = []
executed_results: list[dict] = []
for tc in msg.tool_calls: for tc in msg.tool_calls:
name = tc.function.name name = tc.function.name
@@ -143,34 +249,23 @@ async def run(
except json.JSONDecodeError: except json.JSONDecodeError:
args_parsed = {"raw": tc.function.arguments} args_parsed = {"raw": tc.function.arguments}
if name in CONFIRM_REQUIRED: if name in effective_confirm:
args_str = json.dumps(args_parsed, indent=2) if args_parsed else "(no arguments)" pending_tools.append({"name": name, "args": args_parsed, "tool_call_id": tc.id})
result_str = (
f"⚠️ CONFIRMATION REQUIRED ⚠️\n"
f"Tool: {name}\nArguments:\n{args_str}\n\n"
f"Do NOT call this tool again. Tell the user exactly what you were "
f"about to do, explain the potential impact, and ask them to confirm "
f"by sending a follow-up message before you proceed."
)
confirm_requested = True
logger.info("Tool %s blocked — confirmation required", name) logger.info("Tool %s blocked — confirmation required", name)
else: else:
result_str = await _execute_tool(name, tc.function.arguments, user_role) result_str = await _execute_tool(name, tc.function.arguments, user_role)
logger.info("Tool %s%d chars", name, len(result_str)) logger.info("Tool %s%d chars", name, len(result_str))
executed_results.append({"name": name, "args": args_parsed, "result": result_str, "tool_call_id": tc.id})
tool_call_log.append({"tool": name, "args": args_parsed, "result": result_str})
messages.append({"role": "tool", "tool_call_id": tc.id, "content": result_str})
tool_call_log.append({ if pending_tools:
"tool": name, # Add placeholder responses
"args": args_parsed, for pt in pending_tools:
"result": "[awaiting confirmation]" if name in CONFIRM_REQUIRED else result_str, placeholder = f"[AWAITING USER CONFIRMATION for {pt['name']}]"
}) tool_call_log.append({"tool": pt["name"], "args": pt["args"], "result": "[awaiting confirmation]"})
messages.append({ messages.append({"role": "tool", "tool_call_id": pt["tool_call_id"], "content": placeholder})
"role": "tool",
"tool_call_id": tc.id,
"content": result_str,
})
if confirm_requested:
# One more model round to produce the confirmation-request message, then stop.
conf_resp = await client.chat.completions.create( conf_resp = await client.chat.completions.create(
model=model_name, model=model_name,
messages=messages, messages=messages,
@@ -180,10 +275,24 @@ async def run(
final_response = conf_resp.choices[0].message.content or ( final_response = conf_resp.choices[0].message.content or (
"This action requires your explicit confirmation before it can proceed." "This action requires your explicit confirmation before it can proceed."
) )
break
checkpoint = OrchestrateCheckpoint(
engine="openai",
pre_fn_state=pre_fn_state,
executed_results=executed_results,
pending_tools=pending_tools,
tool_call_log=list(tool_call_log),
task=task,
model_cfg=model_cfg,
respond_with_final=respond_with_final,
user_role=user_role,
confirm_allow=confirm_allow,
confirm_deny=confirm_deny,
rounds_used=round_num + 2,
)
return final_response, checkpoint
else: else:
# finish_reason == "stop" (or no tool_calls) — model is done
final_response = msg.content or "" final_response = msg.content or ""
logger.info( logger.info(
"OpenAI orchestrator done after %d round(s). Tools used: %d", "OpenAI orchestrator done after %d round(s). Tools used: %d",
@@ -192,30 +301,37 @@ async def run(
break break
else: else:
# Hit the round limit
logger.warning("OpenAI orchestrator hit max rounds (%d)", settings.orchestrator_max_rounds) logger.warning("OpenAI orchestrator hit max rounds (%d)", settings.orchestrator_max_rounds)
final_response = ( final_response = (
f"Reached the tool iteration limit ({settings.orchestrator_max_rounds} rounds). " f"Reached the tool iteration limit ({settings.orchestrator_max_rounds} rounds). "
"Here is what was gathered:\n\n" "Here is what was gathered:\n\n"
+ "\n\n".join( + "\n\n".join(f"**{t['tool']}**: {t['result'][:500]}" for t in tool_call_log)
f"**{t['tool']}**: {t['result'][:500]}" for t in tool_call_log
)
) )
model_label = model_cfg.get("label") or model_name return final_response, None
logger.info("OpenAI orchestrator complete — model=%s tools=%d", model_label, len(tool_call_log))
return OrchestratorResult(
response=final_response, def _build_client(model_cfg: dict | None) -> tuple:
tool_calls=tool_call_log, """Build AsyncOpenAI client and return (client, model_name, active_tools)."""
backend="local", if not model_cfg:
gemini_summary=final_response, # reused for UI display; same content in single-model mode raise RuntimeError("model_cfg is required for the OpenAI orchestrator")
) api_url = model_cfg.get("api_url", "")
api_key = model_cfg.get("api_key", "") or "none"
model_name = model_cfg.get("model_name", "")
host_type = model_cfg.get("host_type", "openwebui")
if not api_url or not model_name:
raise RuntimeError(
f"model_cfg missing api_url or model_name: {model_cfg.get('label', model_cfg)}"
)
base_url = api_url.rstrip("/")
if host_type == "openwebui":
base_url = base_url + "/api"
client = AsyncOpenAI(base_url=base_url, api_key=api_key)
return client, model_name, OPENAI_TOOL_SCHEMAS
async def _execute_tool(name: str, arguments_json: str, user_role: str = "user") -> str: async def _execute_tool(name: str, arguments_json: str, user_role: str = "user") -> str:
"""Parse tool arguments and execute with role-filtered callables.""" """Parse tool arguments and execute with role-filtered callables."""
from tools import get_tools_for_role
_, callables = get_tools_for_role(user_role) _, callables = get_tools_for_role(user_role)
try: try:
args = json.loads(arguments_json) args = json.loads(arguments_json)
@@ -226,3 +342,13 @@ async def _execute_tool(name: str, arguments_json: str, user_role: str = "user")
except Exception as e: except Exception as e:
logger.warning("Tool %s failed: %s", name, e) logger.warning("Tool %s failed: %s", name, e)
return f"Tool error: {e}" return f"Tool error: {e}"
async def _execute_tool_dict(name: str, args: dict, user_role: str = "user") -> str:
"""Execute a tool from a pre-parsed args dict."""
_, callables = get_tools_for_role(user_role)
try:
return await call_tool(name, args, callables)
except Exception as e:
logger.warning("Tool %s failed: %s", name, e)
return f"Tool error: {e}"

View File

@@ -44,12 +44,39 @@ Keep your summary factual and complete. Include relevant URLs, data, and specifi
If no tools are needed, return an empty summary.""" If no tools are needed, return an empty summary."""
@dataclass
class OrchestrateCheckpoint:
"""Saved execution state for a job paused at a confirmation gate."""
engine: str # "gemini" | "openai"
pre_fn_state: list # conversation state before function responses
executed_results: list[dict] # tools that already ran this round
pending_tools: list[dict] # [{name, args}] awaiting confirmation
tool_call_log: list[dict] # all tool calls so far
task: str
# Gemini-specific config (unused by openai engine)
system_prompt: str = ""
session_messages: list | None = None
model_name: str | None = None
gemini_api_key: str | None = None
respond_with_claude: bool = True
response_role: str = "chat"
# OpenAI-specific config (unused by gemini engine)
model_cfg: dict | None = None
respond_with_final: bool = True
# Common
user_role: str = "user"
confirm_allow: frozenset = field(default_factory=frozenset)
confirm_deny: frozenset = field(default_factory=frozenset)
rounds_used: int = 0
@dataclass @dataclass
class OrchestratorResult: class OrchestratorResult:
response: str # final user-facing response (from Claude) response: str # final user-facing response (from Claude)
tool_calls: list[dict] = field(default_factory=list) # [{tool, args, result}] tool_calls: list[dict] = field(default_factory=list) # [{tool, args, result}]
backend: str = "claude" # model that produced the final response backend: str = "claude" # model that produced the final response
gemini_summary: str = "" # what Gemini handed to Claude (debug/display) gemini_summary: str = "" # what Gemini handed to Claude (debug/display)
checkpoint: OrchestrateCheckpoint | None = None # set when awaiting confirmation
async def run( async def run(
@@ -61,6 +88,8 @@ async def run(
model_name: str | None = None, model_name: str | None = None,
response_role: str = "chat", response_role: str = "chat",
user_role: str = "user", user_role: str = "user",
confirm_allow: set[str] | None = None,
confirm_deny: set[str] | None = None,
) -> OrchestratorResult: ) -> OrchestratorResult:
""" """
Run the full orchestration loop for a task. Run the full orchestration loop for a task.
@@ -72,9 +101,11 @@ async def run(
respond_with_claude: If False, return Gemini's summary as the response (useful for respond_with_claude: If False, return Gemini's summary as the response (useful for
background/cron tasks where a polished reply isn't needed) background/cron tasks where a polished reply isn't needed)
gemini_api_key: Per-user Gemini API key (falls back to GEMINI_API_KEY in .env) gemini_api_key: Per-user Gemini API key (falls back to GEMINI_API_KEY in .env)
confirm_allow: Tools to bypass the confirmation gate for this user
confirm_deny: Tools to always block for this user
Returns: Returns:
OrchestratorResult with response, tool call log, backend used, and Gemini summary OrchestratorResult — if checkpoint is set, the job is awaiting confirmation
""" """
api_key = gemini_api_key or settings.gemini_api_key api_key = gemini_api_key or settings.gemini_api_key
if not api_key: if not api_key:
@@ -85,19 +116,157 @@ async def run(
client = genai.Client(api_key=api_key) client = genai.Client(api_key=api_key)
# Seed Gemini with the task — include recent session context if available _confirm_allow = frozenset(confirm_allow or ())
_confirm_deny = frozenset(confirm_deny or ())
effective_confirm = (CONFIRM_REQUIRED - set(_confirm_allow)) | set(_confirm_deny)
task_with_context = _build_task_prompt(task, session_messages) task_with_context = _build_task_prompt(task, session_messages)
contents: list[types.Content] = [ contents: list[types.Content] = [
types.Content(role="user", parts=[types.Part(text=task_with_context)]) types.Content(role="user", parts=[types.Part(text=task_with_context)])
] ]
tool_declarations, tool_callables = get_tools_for_role(user_role) tool_declarations, tool_callables = get_tools_for_role(user_role)
tool_call_log: list[dict] = [] tool_call_log: list[dict] = []
gemini_summary, checkpoint = await _run_from_contents(
client=client,
contents=contents,
tool_declarations=tool_declarations,
tool_callables=tool_callables,
tool_call_log=tool_call_log,
effective_confirm=effective_confirm,
model_name=model_name,
task=task,
system_prompt=system_prompt,
session_messages=session_messages,
respond_with_claude=respond_with_claude,
response_role=response_role,
user_role=user_role,
confirm_allow=_confirm_allow,
confirm_deny=_confirm_deny,
starting_round=0,
gemini_api_key=api_key,
)
if checkpoint:
return OrchestratorResult(
response=gemini_summary,
tool_calls=list(tool_call_log),
backend="gemini",
gemini_summary=gemini_summary,
checkpoint=checkpoint,
)
return await _claude_handoff(
task=task,
tool_call_log=tool_call_log,
gemini_summary=gemini_summary,
system_prompt=system_prompt,
session_messages=session_messages,
respond_with_claude=respond_with_claude,
response_role=response_role,
)
async def resume(checkpoint: OrchestrateCheckpoint, confirmed: bool) -> OrchestratorResult:
"""Continue a job that was paused at a confirmation gate."""
api_key = checkpoint.gemini_api_key or settings.gemini_api_key
client = genai.Client(api_key=api_key)
tool_declarations, tool_callables = get_tools_for_role(checkpoint.user_role)
effective_confirm = (CONFIRM_REQUIRED - set(checkpoint.confirm_allow)) | set(checkpoint.confirm_deny)
# Rebuild from saved state — strip "[awaiting confirmation]" placeholders
contents = list(checkpoint.pre_fn_state)
tool_call_log = [t for t in checkpoint.tool_call_log if t["result"] != "[awaiting confirmation]"]
# Build function responses for this round
response_parts: list[types.Part] = []
for er in checkpoint.executed_results:
response_parts.append(types.Part(function_response=types.FunctionResponse(
name=er["name"], response={"result": er["result"]}
)))
for pt in checkpoint.pending_tools:
if confirmed:
result_str = await _execute_tool(pt["name"], pt["args"], tool_callables)
logger.info("Confirmed tool %s%d chars", pt["name"], len(result_str))
else:
result_str = "Action denied by user."
logger.info("Tool %s denied by user", pt["name"])
tool_call_log.append({"tool": pt["name"], "args": pt["args"], "result": result_str})
response_parts.append(types.Part(function_response=types.FunctionResponse(
name=pt["name"], response={"result": result_str}
)))
contents.append(types.Content(role="user", parts=response_parts))
gemini_summary, new_checkpoint = await _run_from_contents(
client=client,
contents=contents,
tool_declarations=tool_declarations,
tool_callables=tool_callables,
tool_call_log=tool_call_log,
effective_confirm=effective_confirm,
model_name=checkpoint.model_name,
task=checkpoint.task,
system_prompt=checkpoint.system_prompt,
session_messages=checkpoint.session_messages,
respond_with_claude=checkpoint.respond_with_claude,
response_role=checkpoint.response_role,
user_role=checkpoint.user_role,
confirm_allow=checkpoint.confirm_allow,
confirm_deny=checkpoint.confirm_deny,
starting_round=checkpoint.rounds_used,
gemini_api_key=api_key,
)
if new_checkpoint:
return OrchestratorResult(
response=gemini_summary,
tool_calls=list(tool_call_log),
backend="gemini",
gemini_summary=gemini_summary,
checkpoint=new_checkpoint,
)
return await _claude_handoff(
task=checkpoint.task,
tool_call_log=tool_call_log,
gemini_summary=gemini_summary,
system_prompt=checkpoint.system_prompt,
session_messages=checkpoint.session_messages,
respond_with_claude=checkpoint.respond_with_claude,
response_role=checkpoint.response_role,
)
async def _run_from_contents(
client,
contents: list,
tool_declarations: list,
tool_callables: dict,
tool_call_log: list[dict],
effective_confirm: set[str],
model_name: str | None,
task: str,
system_prompt: str,
session_messages: list[dict] | None,
respond_with_claude: bool,
response_role: str,
user_role: str,
confirm_allow: frozenset,
confirm_deny: frozenset,
starting_round: int = 0,
gemini_api_key: str | None = None,
) -> tuple[str, OrchestrateCheckpoint | None]:
"""
Run the ReAct loop from the current contents state.
Returns (gemini_summary, checkpoint) — checkpoint is set if confirmation is needed.
"""
gemini_summary = "" gemini_summary = ""
# --- ReAct tool loop --- for round_num in range(starting_round, settings.orchestrator_max_rounds):
for round_num in range(settings.orchestrator_max_rounds):
logger.info("Orchestrator round %d for task: %.80s", round_num + 1, task) logger.info("Orchestrator round %d for task: %.80s", round_num + 1, task)
response = await asyncio.to_thread( response = await asyncio.to_thread(
@@ -113,67 +282,56 @@ async def run(
candidate = response.candidates[0] candidate = response.candidates[0]
parts = candidate.content.parts if candidate.content else [] parts = candidate.content.parts if candidate.content else []
# Check if Gemini wants to call any tools
tool_call_parts = [ tool_call_parts = [
p for p in parts p for p in parts
if hasattr(p, "function_call") and p.function_call and p.function_call.name if hasattr(p, "function_call") and p.function_call and p.function_call.name
] ]
if not tool_call_parts: if not tool_call_parts:
# No more tool calls — extract Gemini's text summary
gemini_summary = "".join( gemini_summary = "".join(
p.text for p in parts if hasattr(p, "text") and p.text p.text for p in parts if hasattr(p, "text") and p.text
).strip() ).strip()
logger.info("Orchestrator done after %d round(s). Tools used: %d", logger.info("Orchestrator done after %d round(s). Tools used: %d",
round_num + 1, len(tool_call_log)) round_num + 1, len(tool_call_log))
break return gemini_summary, None
# Add Gemini's response (with function calls) to the conversation
contents.append(candidate.content) contents.append(candidate.content)
# Execute tool calls — check confirmation requirement before calling # Snapshot state before function responses — used if a checkpoint is needed
pre_fn_state = list(contents)
response_parts: list[types.Part] = [] response_parts: list[types.Part] = []
confirm_requested = False pending_tools: list[dict] = []
executed_results: list[dict] = []
for fc_part in tool_call_parts: for fc_part in tool_call_parts:
fc = fc_part.function_call fc = fc_part.function_call
name = fc.name name = fc.name
args = dict(fc.args) args = dict(fc.args)
if name in CONFIRM_REQUIRED: if name in effective_confirm:
args_str = json.dumps(args, indent=2) if args else "(no arguments)" pending_tools.append({"name": name, "args": args})
result_str = (
f"⚠️ CONFIRMATION REQUIRED ⚠️\n"
f"Tool: {name}\nArguments:\n{args_str}\n\n"
f"Do NOT call this tool again. Tell the user exactly what you were "
f"about to do, explain the potential impact, and ask them to confirm "
f"by sending a follow-up message before you proceed."
)
confirm_requested = True
logger.info("Tool %s blocked — confirmation required", name) logger.info("Tool %s blocked — confirmation required", name)
else: else:
result_str = await _execute_tool(name, args, tool_callables) result_str = await _execute_tool(name, args, tool_callables)
logger.info("Tool %s%d chars", name, len(result_str)) logger.info("Tool %s%d chars", name, len(result_str))
executed_results.append({"name": name, "args": args, "result": result_str})
tool_call_log.append({"tool": name, "args": args, "result": result_str})
response_parts.append(types.Part(function_response=types.FunctionResponse(
name=name, response={"result": result_str}
)))
tool_call_log.append({ if pending_tools:
"tool": name, # Add placeholder responses and get Gemini to produce the confirmation message
"args": args, for pt in pending_tools:
"result": "[awaiting confirmation]" if name in CONFIRM_REQUIRED else result_str, placeholder = f"[AWAITING USER CONFIRMATION for {pt['name']}]"
}) response_parts.append(types.Part(function_response=types.FunctionResponse(
response_parts.append( name=pt["name"], response={"result": placeholder}
types.Part( )))
function_response=types.FunctionResponse( tool_call_log.append({"tool": pt["name"], "args": pt["args"], "result": "[awaiting confirmation]"})
name=name,
response={"result": result_str},
)
)
)
contents.append(types.Content(role="user", parts=response_parts)) contents.append(types.Content(role="user", parts=response_parts))
if confirm_requested:
# Allow one more Gemini round to produce the confirmation-request message,
# then break — tool is not executed until user confirms in a follow-up.
conf_response = await asyncio.to_thread( conf_response = await asyncio.to_thread(
client.models.generate_content, client.models.generate_content,
model=model_name or settings.orchestrator_model, model=model_name or settings.orchestrator_model,
@@ -191,10 +349,30 @@ async def run(
gemini_summary = "".join( gemini_summary = "".join(
p.text for p in conf_parts if hasattr(p, "text") and p.text p.text for p in conf_parts if hasattr(p, "text") and p.text
).strip() or "This action requires your explicit confirmation before it can proceed." ).strip() or "This action requires your explicit confirmation before it can proceed."
break
checkpoint = OrchestrateCheckpoint(
engine="gemini",
pre_fn_state=pre_fn_state,
executed_results=executed_results,
pending_tools=pending_tools,
tool_call_log=list(tool_call_log),
task=task,
system_prompt=system_prompt,
session_messages=session_messages,
model_name=model_name,
gemini_api_key=gemini_api_key,
respond_with_claude=respond_with_claude,
response_role=response_role,
user_role=user_role,
confirm_allow=confirm_allow,
confirm_deny=confirm_deny,
rounds_used=round_num + 2,
)
return gemini_summary, checkpoint
contents.append(types.Content(role="user", parts=response_parts))
else: else:
# Hit the round limit — use whatever Gemini produced last
logger.warning("Orchestrator hit max rounds (%d)", settings.orchestrator_max_rounds) logger.warning("Orchestrator hit max rounds (%d)", settings.orchestrator_max_rounds)
gemini_summary = ( gemini_summary = (
f"Reached the tool iteration limit ({settings.orchestrator_max_rounds} rounds). " f"Reached the tool iteration limit ({settings.orchestrator_max_rounds} rounds). "
@@ -202,21 +380,28 @@ async def run(
+ "\n\n".join(f"**{t['tool']}**: {t['result'][:500]}" for t in tool_call_log) + "\n\n".join(f"**{t['tool']}**: {t['result'][:500]}" for t in tool_call_log)
) )
# --- Claude handoff --- return gemini_summary, None
async def _claude_handoff(
task: str,
tool_call_log: list[dict],
gemini_summary: str,
system_prompt: str,
session_messages: list[dict] | None,
respond_with_claude: bool,
response_role: str,
) -> OrchestratorResult:
if respond_with_claude: if respond_with_claude:
claude_prompt = _build_claude_prompt(task, tool_call_log, gemini_summary) claude_prompt = _build_claude_prompt(task, tool_call_log, gemini_summary)
# Merge with session history so Claude has conversation context
messages = list(session_messages or []) messages = list(session_messages or [])
messages.append({"role": "user", "content": claude_prompt}) messages.append({"role": "user", "content": claude_prompt})
response_text, backend = await complete( response_text, backend = await complete(
system_prompt=system_prompt, system_prompt=system_prompt,
messages=messages, messages=messages,
role=response_role, role=response_role,
) )
else: else:
# Cron/background tasks: return Gemini's summary directly, no Claude call
response_text = gemini_summary or "No information gathered." response_text = gemini_summary or "No information gathered."
backend = "gemini" backend = "gemini"
@@ -242,12 +427,11 @@ def _build_task_prompt(task: str, session_messages: list[dict] | None) -> str:
if not session_messages: if not session_messages:
return task return task
# Include last few turns for context (don't send the full history to keep tokens low) recent = session_messages[-6:]
recent = session_messages[-6:] # last 3 turns
history_lines = [] history_lines = []
for msg in recent: for msg in recent:
label = "User" if msg["role"] == "user" else "Assistant" label = "User" if msg["role"] == "user" else "Assistant"
history_lines.append(f"{label}: {msg['content'][:300]}") # truncate long messages history_lines.append(f"{label}: {msg['content'][:300]}")
context = "\n".join(history_lines) context = "\n".join(history_lines)
return f"<recent_conversation>\n{context}\n</recent_conversation>\n\nCurrent request: {task}" return f"<recent_conversation>\n{context}\n</recent_conversation>\n\nCurrent request: {task}"
@@ -265,7 +449,6 @@ def _build_claude_prompt(
parts.append("## Research gathered\n") parts.append("## Research gathered\n")
for tc in tool_calls: for tc in tool_calls:
parts.append(f"### {tc['tool']}({_format_args(tc['args'])})") parts.append(f"### {tc['tool']}({_format_args(tc['args'])})")
# Truncate very long results — Claude gets the gist
result = tc["result"] result = tc["result"]
if len(result) > 2000: if len(result) > 2000:
result = result[:2000] + "\n… [truncated]" result = result[:2000] + "\n… [truncated]"

View File

@@ -15,10 +15,10 @@ import logging
import uuid import uuid
from datetime import datetime, timezone from datetime import datetime, timezone
from fastapi import APIRouter from fastapi import APIRouter, HTTPException
from pydantic import BaseModel from pydantic import BaseModel
from auth_utils import get_user_gemini_key, get_user_role from auth_utils import get_user_gemini_key, get_user_role, get_tool_policy
from config import settings from config import settings
from context_loader import load_context from context_loader import load_context
from persona import set_context, validate as validate_persona from persona import set_context, validate as validate_persona
@@ -31,12 +31,16 @@ router = APIRouter(prefix="/orchestrate", tags=["orchestrator"])
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# In-memory job store # In-memory job store
# Jobs are keyed by UUID. For this phase, memory is fine — jobs are short-lived.
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
_jobs: dict[str, dict] = {} _jobs: dict[str, dict] = {}
_jobs_lock = asyncio.Lock() _jobs_lock = asyncio.Lock()
# Checkpoints are stored separately — they hold Python objects (types.Content, etc.)
# that can't be included in the JSON-serializable job dict.
_checkpoints: dict[str, orchestrator_engine.OrchestrateCheckpoint] = {}
_checkpoints_lock = asyncio.Lock()
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Request / response models # Request / response models
@@ -57,7 +61,7 @@ class OrchestrateRequest(BaseModel):
class OrchestrateResponse(BaseModel): class OrchestrateResponse(BaseModel):
job_id: str job_id: str
status: str # "queued" | "running" | "complete" | "error" status: str # "queued" | "running" | "complete" | "error" | "awaiting_confirmation"
class JobStatusResponse(BaseModel): class JobStatusResponse(BaseModel):
@@ -72,6 +76,7 @@ class JobStatusResponse(BaseModel):
backend: str | None = None backend: str | None = None
gemini_summary: str | None = None gemini_summary: str | None = None
error: str | None = None error: str | None = None
pending_confirmation: dict | None = None # {tools: [{name, args}], message: str}
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -85,7 +90,6 @@ async def orchestrate(req: OrchestrateRequest) -> OrchestrateResponse:
user, persona = validate_persona(req.user, req.persona) user, persona = validate_persona(req.user, req.persona)
set_context(user, persona) set_context(user, persona)
except ValueError as e: except ValueError as e:
from fastapi import HTTPException
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))
job_id = str(uuid.uuid4()) job_id = str(uuid.uuid4())
@@ -97,17 +101,19 @@ async def orchestrate(req: OrchestrateRequest) -> OrchestrateResponse:
"task": req.task, "task": req.task,
"created_at": now, "created_at": now,
"completed_at": None, "completed_at": None,
"session_id": None,
"response": None, "response": None,
"tool_calls": None, "tool_calls": None,
"backend": None, "backend": None,
"gemini_summary": None, "gemini_summary": None,
"error": None, "error": None,
"pending_confirmation": None,
"_user": user,
} }
async with _jobs_lock: async with _jobs_lock:
_jobs[job_id] = job _jobs[job_id] = job
# Run in background — caller polls GET /orchestrate/{job_id}
asyncio.create_task(_run_job(job_id, req, user)) asyncio.create_task(_run_job(job_id, req, user))
logger.info("Orchestrator job queued: %s%.80s", job_id, req.task) logger.info("Orchestrator job queued: %s%.80s", job_id, req.task)
return OrchestrateResponse(job_id=job_id, status="queued") return OrchestrateResponse(job_id=job_id, status="queued")
@@ -120,10 +126,9 @@ async def job_status(job_id: str) -> JobStatusResponse:
job = _jobs.get(job_id) job = _jobs.get(job_id)
if job is None: if job is None:
from fastapi import HTTPException
raise HTTPException(status_code=404, detail=f"Job {job_id} not found") raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
return JobStatusResponse(**job) return JobStatusResponse(**{k: v for k, v in job.items() if not k.startswith("_")})
@router.get("", response_model=list[JobStatusResponse]) @router.get("", response_model=list[JobStatusResponse])
@@ -131,11 +136,55 @@ async def list_jobs() -> list[JobStatusResponse]:
"""List all jobs (most recent first). Useful for debugging.""" """List all jobs (most recent first). Useful for debugging."""
async with _jobs_lock: async with _jobs_lock:
jobs = sorted(_jobs.values(), key=lambda j: j["created_at"], reverse=True) jobs = sorted(_jobs.values(), key=lambda j: j["created_at"], reverse=True)
return [JobStatusResponse(**j) for j in jobs] return [JobStatusResponse(**{k: v for k, v in j.items() if not k.startswith("_")}) for j in jobs]
@router.post("/{job_id}/confirm", response_model=OrchestrateResponse)
async def confirm_job(job_id: str) -> OrchestrateResponse:
"""Confirm a pending tool call — the blocked tool will execute and the job continues."""
async with _checkpoints_lock:
checkpoint = _checkpoints.pop(job_id, None)
if checkpoint is None:
raise HTTPException(status_code=404, detail="No pending confirmation for this job")
async with _jobs_lock:
job = _jobs.get(job_id)
if not job or job["status"] != "awaiting_confirmation":
raise HTTPException(status_code=409, detail="Job is not awaiting confirmation")
_jobs[job_id]["status"] = "running"
_jobs[job_id]["pending_confirmation"] = None
user = job.get("_user", "scott")
asyncio.create_task(_resume_job(job_id, checkpoint, confirmed=True, user=user))
logger.info("Orchestrator job %s confirmed — resuming", job_id)
return OrchestrateResponse(job_id=job_id, status="running")
@router.post("/{job_id}/deny", response_model=OrchestrateResponse)
async def deny_job(job_id: str) -> OrchestrateResponse:
"""Deny a pending tool call — the tool is skipped and the job produces a final response."""
async with _checkpoints_lock:
checkpoint = _checkpoints.pop(job_id, None)
if checkpoint is None:
raise HTTPException(status_code=404, detail="No pending confirmation for this job")
async with _jobs_lock:
job = _jobs.get(job_id)
if not job or job["status"] != "awaiting_confirmation":
raise HTTPException(status_code=409, detail="Job is not awaiting confirmation")
_jobs[job_id]["status"] = "running"
_jobs[job_id]["pending_confirmation"] = None
user = job.get("_user", "scott")
asyncio.create_task(_resume_job(job_id, checkpoint, confirmed=False, user=user))
logger.info("Orchestrator job %s denied — resuming with skip", job_id)
return OrchestrateResponse(job_id=job_id, status="running")
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Background runner # Background runners
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None: async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None:
@@ -146,7 +195,6 @@ async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None:
try: try:
from session_store import load as load_session, save as save_session, generate_session_id from session_store import load as load_session, save as save_session, generate_session_id
# Load Inara's system prompt (same as the chat router does)
tier = req.tier or settings.default_tier tier = req.tier or settings.default_tier
system_prompt = load_context( system_prompt = load_context(
tier, tier,
@@ -155,16 +203,17 @@ async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None:
include_short=req.include_short, include_short=req.include_short,
) )
# Load session history if a session_id was provided
session_id = req.session_id or generate_session_id() session_id = req.session_id or generate_session_id()
history = load_session(session_id) history = load_session(session_id)
session_messages = history or None session_messages = history or None
# Choose engine based on the orchestrator role in the model registry
orch_model = model_registry.get_model_for_role(user, "orchestrator") orch_model = model_registry.get_model_for_role(user, "orchestrator")
user_role = get_user_role(user) user_role = get_user_role(user)
policy = get_tool_policy(user)
confirm_allow = set(policy.get("allow", []))
confirm_deny = set(policy.get("deny", []))
if orch_model and orch_model.get("type") == "local_openai": if orch_model and orch_model.get("type") == "local_openai":
result = await openai_orchestrator.run( result = await openai_orchestrator.run(
task=req.task, task=req.task,
@@ -173,10 +222,10 @@ async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None:
model_cfg=orch_model, model_cfg=orch_model,
respond_with_final=req.respond_with_claude, respond_with_final=req.respond_with_claude,
user_role=user_role, user_role=user_role,
confirm_allow=confirm_allow,
confirm_deny=confirm_deny,
) )
else: else:
# Use the API key embedded in the resolved model config (V2 registry with
# account_id), then fall back to the per-user key from auth.json, then .env.
gemini_key = ( gemini_key = (
(orch_model.get("api_key") if orch_model else None) (orch_model.get("api_key") if orch_model else None)
or get_user_gemini_key(user) or get_user_gemini_key(user)
@@ -190,28 +239,31 @@ async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None:
model_name=orch_model.get("model_name") if orch_model else None, model_name=orch_model.get("model_name") if orch_model else None,
response_role=req.chat_role, response_role=req.chat_role,
user_role=user_role, user_role=user_role,
confirm_allow=confirm_allow,
confirm_deny=confirm_deny,
) )
# Save the turn to the session store so it survives a page refresh if result.checkpoint:
history.append({"role": "user", "content": req.task}) async with _checkpoints_lock:
history.append({"role": "assistant", "content": result.response}) _checkpoints[job_id] = result.checkpoint
save_session(session_id, history) async with _jobs_lock:
_jobs[job_id].update({
"status": "awaiting_confirmation",
"response": result.response,
"tool_calls": result.tool_calls,
"backend": result.backend,
"gemini_summary": result.gemini_summary,
"session_id": session_id,
"pending_confirmation": {
"tools": result.checkpoint.pending_tools,
"message": result.response,
},
})
logger.info("Orchestrator job %s awaiting confirmation — %d tool(s) blocked",
job_id, len(result.checkpoint.pending_tools))
return
from session_logger import log_turn await _finalize_job(job_id, result, session_id, req.task, history)
log_turn(session_id, req.task, result.response)
now = datetime.now(timezone.utc).isoformat()
async with _jobs_lock:
_jobs[job_id].update({
"status": "complete",
"completed_at": now,
"session_id": session_id,
"response": result.response,
"tool_calls": result.tool_calls,
"backend": result.backend,
"gemini_summary": result.gemini_summary,
})
logger.info("Orchestrator job complete: %s (%d tool calls)", job_id, len(result.tool_calls))
except Exception as e: except Exception as e:
logger.exception("Orchestrator job failed: %s", job_id) logger.exception("Orchestrator job failed: %s", job_id)
@@ -222,3 +274,87 @@ async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None:
"completed_at": now, "completed_at": now,
"error": str(e), "error": str(e),
}) })
async def _resume_job(
job_id: str,
checkpoint: orchestrator_engine.OrchestrateCheckpoint,
confirmed: bool,
user: str,
) -> None:
"""Resume a job after the user confirms or denies a pending tool call."""
try:
if checkpoint.engine == "gemini":
result = await orchestrator_engine.resume(checkpoint, confirmed)
else:
result = await openai_orchestrator.resume(checkpoint, confirmed)
if result.checkpoint:
# Another confirmation needed (chained gates)
async with _checkpoints_lock:
_checkpoints[job_id] = result.checkpoint
async with _jobs_lock:
_jobs[job_id].update({
"status": "awaiting_confirmation",
"response": result.response,
"tool_calls": result.tool_calls,
"backend": result.backend,
"gemini_summary": result.gemini_summary,
"pending_confirmation": {
"tools": result.checkpoint.pending_tools,
"message": result.response,
},
})
logger.info("Orchestrator job %s awaiting another confirmation", job_id)
return
async with _jobs_lock:
session_id = _jobs[job_id].get("session_id") or ""
task = _jobs[job_id].get("task", "")
from session_store import load as load_session
history = load_session(session_id) if session_id else []
await _finalize_job(job_id, result, session_id, task, history)
except Exception as e:
logger.exception("Orchestrator resume failed: %s", job_id)
now = datetime.now(timezone.utc).isoformat()
async with _jobs_lock:
_jobs[job_id].update({
"status": "error",
"completed_at": now,
"error": str(e),
})
async def _finalize_job(
job_id: str,
result: orchestrator_engine.OrchestratorResult,
session_id: str,
task: str,
history: list,
) -> None:
"""Save session, log the turn, and mark the job complete."""
from session_store import save as save_session, generate_session_id
from session_logger import log_turn
if not session_id:
session_id = generate_session_id()
history.append({"role": "user", "content": task})
history.append({"role": "assistant", "content": result.response})
save_session(session_id, history)
log_turn(session_id, task, result.response)
now = datetime.now(timezone.utc).isoformat()
async with _jobs_lock:
_jobs[job_id].update({
"status": "complete",
"completed_at": now,
"session_id": session_id,
"response": result.response,
"tool_calls": result.tool_calls,
"backend": result.backend,
"gemini_summary": result.gemini_summary,
})
logger.info("Orchestrator job complete: %s (%d tool calls)", job_id, len(result.tool_calls))

View File

@@ -18,7 +18,8 @@ import jwt
from fastapi import APIRouter, Form, Request from fastapi import APIRouter, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.responses import HTMLResponse, RedirectResponse
from auth_utils import COOKIE_NAME, decode_token, check_credentials, set_password, _read_auth, _write_auth from auth_utils import COOKIE_NAME, decode_token, check_credentials, set_password, _read_auth, _write_auth, get_user_channels, get_tool_policy, save_tool_policy
from tools import CONFIRM_REQUIRED
from persona import list_user_personas from persona import list_user_personas
from config import settings as app_settings from config import settings as app_settings
@@ -73,6 +74,26 @@ def _settings_page(username: str, personas: list[str], back_persona: str = "", s
allowlist_text = "" allowlist_text = ""
html = html.replace("{{ email_allowlist }}", allowlist_text) html = html.replace("{{ email_allowlist }}", allowlist_text)
# Notification channel settings
channels = get_user_channels(username)
notify_ch = _html.escape(channels.get("notification_channel", "") or "")
notify_email = _html.escape(channels.get("notification_email", "") or "")
nc_room = _html.escape((channels.get("nextcloud") or {}).get("notification_room", "") or "")
gc_webhook = _html.escape((channels.get("google_chat") or {}).get("outbound_webhook", "") or "")
html = html.replace("{{ notify_channel }}", notify_ch)
html = html.replace("{{ notify_email_override }}", notify_email)
html = html.replace("{{ nc_notify_room }}", nc_room)
html = html.replace("{{ gc_webhook }}", gc_webhook)
# Tool permission policy
policy = get_tool_policy(username)
tool_allow_text = _html.escape("\n".join(policy.get("allow", [])))
tool_deny_text = _html.escape("\n".join(policy.get("deny", [])))
confirm_tools_list = _html.escape(", ".join(sorted(CONFIRM_REQUIRED)))
html = html.replace("{{ tool_allow }}", tool_allow_text)
html = html.replace("{{ tool_deny }}", tool_deny_text)
html = html.replace("{{ confirm_required_tools }}", confirm_tools_list)
persona_items = "\n".join( persona_items = "\n".join(
f'''<li> f'''<li>
<a href="/{username}/{p}" class="persona-link">{p}</a> <a href="/{username}/{p}" class="persona-link">{p}</a>
@@ -240,6 +261,78 @@ async def rename_persona(
return RedirectResponse("/settings", status_code=302) return RedirectResponse("/settings", status_code=302)
@router.post("/settings/notifications", include_in_schema=False)
async def save_notifications(
request: Request,
notification_channel: str = Form(""),
notification_email: str = Form(""),
nc_notification_room: str = Form(""),
gc_outbound_webhook: str = Form(""),
):
username = _get_session_user(request)
if not username:
return RedirectResponse("/login", status_code=302)
personas = list_user_personas(username)
back_persona = _preferred_persona(request, username)
channels_path = app_settings.home_root() / username / "channels.json"
try:
channels = json.loads(channels_path.read_text())
except Exception:
channels = {}
# Top-level notification preference
notification_channel = notification_channel.strip()
if notification_channel in ("email", "nextcloud", "google_chat"):
channels["notification_channel"] = notification_channel
else:
channels.pop("notification_channel", None)
# Optional email address override (blank = use login email)
notification_email = notification_email.strip()
if notification_email:
channels["notification_email"] = notification_email
else:
channels.pop("notification_email", None)
# NC Talk notification room — nested under "nextcloud"
if "nextcloud" not in channels:
channels["nextcloud"] = {}
channels["nextcloud"]["notification_room"] = nc_notification_room.strip()
# Google Chat outbound webhook — nested under "google_chat"
if "google_chat" not in channels:
channels["google_chat"] = {}
channels["google_chat"]["outbound_webhook"] = gc_outbound_webhook.strip()
channels_path.write_text(json.dumps(channels, indent=2) + "\n")
logger.info("notifications updated for %s (channel=%s)", username, notification_channel or "none")
return HTMLResponse(_settings_page(username, personas, back_persona,
success="Notification settings saved."))
@router.post("/settings/tool-policy", include_in_schema=False)
async def save_tool_policy_route(
request: Request,
allow_list: str = Form(""),
deny_list: str = Form(""),
):
username = _get_session_user(request)
if not username:
return RedirectResponse("/login", status_code=302)
personas = list_user_personas(username)
back_persona = _preferred_persona(request, username)
allow_tools = [ln.strip() for ln in allow_list.splitlines() if ln.strip()]
deny_tools = [ln.strip() for ln in deny_list.splitlines() if ln.strip()]
save_tool_policy(username, {"allow": allow_tools, "deny": deny_tools})
logger.info("tool policy updated for %s (allow=%d deny=%d)", username, len(allow_tools), len(deny_tools))
return HTMLResponse(_settings_page(username, personas, back_persona,
success="Tool permission policy saved."))
@router.post("/settings/email-allowlist", include_in_schema=False) @router.post("/settings/email-allowlist", include_in_schema=False)
async def save_email_allowlist( async def save_email_allowlist(
request: Request, request: Request,

View File

@@ -6,7 +6,7 @@
and are appended automatically by help.html when present. and are appended automatically by help.html when present.
--> -->
*Last updated: 2026-04-29* *Last updated: 2026-04-30*
--- ---
@@ -66,13 +66,13 @@ The ⚡ toggle is **independent of the Role selector** — you can use any role
| Web | `web_search`, `http_fetch` | | Web | `web_search`, `http_fetch` |
| Files | `file_read` ¹, `file_list` ¹, `file_write` ¹ ² | | Files | `file_read` ¹, `file_list` ¹, `file_write` ¹ ² |
| Shell | `shell_exec` ¹ ², `claude_allow_dir` ¹ | | Shell | `shell_exec` ¹ ², `claude_allow_dir` ¹ |
| System | `cortex_restart` ¹ ², `cortex_logs` ¹ | | System | `cortex_restart` ¹ ², `cortex_logs` ¹, `cortex_status` ¹, `cortex_update` ¹ ² |
| Tasks | `task_list`, `task_create`, `task_update`, `task_complete` | | Tasks | `task_list`, `task_create`, `task_update`, `task_complete` |
| Cron | `cron_list`, `cron_add`, `cron_remove` ², `cron_toggle` | | Cron | `cron_list`, `cron_add`, `cron_remove` ², `cron_toggle` |
| Reminders | `reminders_add`, `reminders_list`, `reminders_clear` ² | | Reminders | `reminders_add`, `reminders_list`, `reminders_remove`, `reminders_clear` ² |
| Scratchpad | `scratch_read`, `scratch_write`, `scratch_append`, `scratch_clear` | | Scratchpad | `scratch_read`, `scratch_write`, `scratch_append`, `scratch_clear` |
| Notifications | `nc_talk_send` ¹ | | Notifications | `email_send` ¹, `nc_talk_send` ¹ |
| Aether Journals | `ae_journal_list`, `ae_journal_search`, `ae_journal_entry_create`, `ae_journal_entry_update`, `ae_journal_entry_disable`, `ae_journal_entry_append`, `ae_journal_entry_prepend` | | Aether Journals | `ae_journal_list`, `ae_journal_search`, `ae_journal_entries_list`, `ae_journal_entry_read`, `ae_journal_entry_create`, `ae_journal_entry_update`, `ae_journal_entry_disable`, `ae_journal_entry_append`, `ae_journal_entry_prepend` |
| Aether Tasks | `ae_task_list` ¹ | | Aether Tasks | `ae_task_list` ¹ |
¹ **Admin only** — requires the `admin` role. These tools are invisible to regular users. ¹ **Admin only** — requires the `admin` role. These tools are invisible to regular users.
@@ -234,8 +234,8 @@ The **Files** button opens an editor for your persona's identity and memory file
| `MEMORY_LONG.md` | Permanent curated long-term memory | | `MEMORY_LONG.md` | Permanent curated long-term memory |
| `MEMORY_MID.md` | Rolling mid-term digest (LLM-distilled) | | `MEMORY_MID.md` | Rolling mid-term digest (LLM-distilled) |
| `MEMORY_SHORT.md` | Recent session rollup (auto-aggregated) | | `MEMORY_SHORT.md` | Recent session rollup (auto-aggregated) |
| `TASKS.json` | Personal task list (managed via Agent mode) | | `HELP.md` | This file — persona-specific additions appended below |
| `HELP.md` | This file | | `email_allowlist.json` | Regex patterns for permitted `email_send` recipients (one per line) |
Toggle **preview** / **edit** to switch between rendered markdown and raw text. **Ctrl+S** saves, **Esc** closes. Toggle **preview** / **edit** to switch between rendered markdown and raw text. **Ctrl+S** saves, **Esc** closes.

View File

@@ -1192,6 +1192,37 @@
: '⚡ working…'; : '⚡ working…';
continue; continue;
} }
if (job.status === 'awaiting_confirmation') {
const pc = job.pending_confirmation || {};
const toolNames = (pc.tools || []).map(t => t.name).join(', ');
thinkingDiv.className = 'message assistant';
thinkingDiv.innerHTML = `<div class="confirm-gate">
<p>${escapeHtml(pc.message || 'Confirm this action?')}</p>
<p class="confirm-tools">Tool${(pc.tools||[]).length !== 1 ? 's' : ''}: <code>${escapeHtml(toolNames)}</code></p>
<div class="confirm-actions">
<button class="confirm-btn">Confirm</button>
<button class="deny-btn">Deny</button>
</div>
</div>`;
const confirmed = await new Promise(resolve => {
thinkingDiv.querySelector('.confirm-btn').onclick = () => resolve(true);
thinkingDiv.querySelector('.deny-btn').onclick = () => resolve(false);
});
thinkingDiv.className = 'message assistant thinking';
thinkingDiv.textContent = confirmed ? '⚡ confirmed — continuing…' : '⚡ denied — finishing…';
const action = confirmed ? 'confirm' : 'deny';
const resumeRes = await fetch(`/orchestrate/${job_id}/${action}`, {
method: 'POST',
signal: activeController.signal,
});
if (!resumeRes.ok) throw new Error(`Resume failed: HTTP ${resumeRes.status}`);
continue;
}
break; break;
} }

View File

@@ -344,6 +344,84 @@
</form> </form>
</div> </div>
<!-- Notifications -->
<div class="section">
<h2>Notifications</h2>
<p style="font-size:0.8rem; color:var(--pg-muted); margin-bottom:0.85rem; line-height:1.55;">
Choose how Inara reaches out proactively — cron jobs, briefs, and future alerts.
Email defaults to your login address when no override is set.
</p>
<form method="POST" action="/settings/notifications">
<div class="field">
<label for="notification_channel">Notification channel</label>
<select id="notification_channel" name="notification_channel"
data-value="{{ notify_channel }}"
style="width:100%; padding:0.65rem 0.85rem; background:var(--pg-bg);
border:1px solid var(--pg-border); border-radius:6px;
color:var(--pg-text); font-size:0.95rem; outline:none;
transition:border-color 0.15s;">
<option value="">None (disabled)</option>
<option value="email">Email</option>
<option value="nextcloud">Nextcloud Talk</option>
<option value="google_chat">Google Chat</option>
</select>
</div>
<div class="field">
<label for="notification_email">Email override
<span style="color:var(--pg-dim); font-weight:400;">(optional)</span>
</label>
<input type="email" id="notification_email" name="notification_email"
value="{{ notify_email_override }}"
placeholder="Leave blank to use login email"
autocomplete="off">
</div>
<div class="field">
<label for="nc_notification_room">Nextcloud Talk room token</label>
<input type="text" id="nc_notification_room" name="nc_notification_room"
value="{{ nc_notify_room }}"
placeholder="Token from the Talk room URL"
autocomplete="off" spellcheck="false">
</div>
<div class="field">
<label for="gc_outbound_webhook">Google Chat webhook URL</label>
<input type="url" id="gc_outbound_webhook" name="gc_outbound_webhook"
value="{{ gc_webhook }}"
placeholder="https://chat.googleapis.com/v1/spaces/…"
autocomplete="off" spellcheck="false">
</div>
<button type="submit">Save notification settings</button>
</form>
</div>
<!-- Tool Permissions -->
<div class="section">
<h2>Tool Permissions</h2>
<p style="font-size:0.8rem; color:var(--pg-muted); margin-bottom:0.5rem; line-height:1.55;">
Override the default confirmation gate for orchestrator tools.
<strong>Allow list</strong> — tools that run without asking for confirmation.
<strong>Deny list</strong> — tools that are always blocked for your account.
One tool name per line.
</p>
<p style="font-size:0.78rem; color:var(--pg-muted); margin-bottom:0.85rem;">
Tools requiring confirmation by default: <code>{{ confirm_required_tools }}</code>
</p>
<form method="POST" action="/settings/tool-policy">
<div class="form-group">
<label for="allow_list">Allow list (bypass confirmation)</label>
<textarea id="allow_list" name="allow_list" rows="3"
placeholder="reminders_clear&#10;cron_remove"
autocomplete="off" spellcheck="false">{{ tool_allow }}</textarea>
</div>
<div class="form-group">
<label for="deny_list">Deny list (always block)</label>
<textarea id="deny_list" name="deny_list" rows="3"
placeholder="shell_exec&#10;file_write"
autocomplete="off" spellcheck="false">{{ tool_deny }}</textarea>
</div>
<button type="submit">Save tool permissions</button>
</form>
</div>
<!-- Browser cache --> <!-- Browser cache -->
<div class="section"> <div class="section">
<h2>Browser Cache</h2> <h2>Browser Cache</h2>
@@ -411,6 +489,12 @@
</div> </div>
<script> <script>
// Restore notification channel dropdown from injected value
(function() {
const sel = document.getElementById('notification_channel');
if (sel) sel.value = sel.dataset.value || '';
})();
// Password confirmation check // Password confirmation check
document.getElementById('password-form').addEventListener('submit', e => { document.getElementById('password-form').addEventListener('submit', e => {
const np = document.getElementById('new_password').value; const np = document.getElementById('new_password').value;

View File

@@ -546,6 +546,25 @@
.message.thinking { color: var(--muted); font-style: italic; } .message.thinking { color: var(--muted); font-style: italic; }
/* Confirmation gate */
.confirm-gate { display: flex; flex-direction: column; gap: 0.6rem; }
.confirm-gate p { margin: 0; }
.confirm-tools { font-size: 0.82rem; color: var(--muted); }
.confirm-actions { display: flex; gap: 0.5rem; margin-top: 0.25rem; }
.confirm-btn, .deny-btn {
padding: 0.35rem 0.9rem;
border-radius: 6px;
border: none;
font-size: 0.85rem;
font-weight: 600;
cursor: pointer;
transition: opacity 0.15s;
}
.confirm-btn { background: #16a34a; color: #fff; }
.confirm-btn:hover { opacity: 0.85; }
.deny-btn { background: var(--surface); border: 1px solid var(--border); color: var(--text); }
.deny-btn:hover { border-color: var(--muted); }
/* Copy button */ /* Copy button */
.message.assistant, .message.user { position: relative; } .message.assistant, .message.user { position: relative; }

View File

@@ -18,6 +18,8 @@ from google.genai import types
from tools.web import search as _web_search from tools.web import search as _web_search
from tools.ae_knowledge import journal_search as _ae_journal_search from tools.ae_knowledge import journal_search as _ae_journal_search
from tools.ae_knowledge import journal_list as _ae_journal_list from tools.ae_knowledge import journal_list as _ae_journal_list
from tools.ae_knowledge import journal_entry_read as _ae_journal_entry_read
from tools.ae_knowledge import journal_entries_list as _ae_journal_entries_list
from tools.ae_knowledge import journal_entry_create as _ae_journal_entry_create from tools.ae_knowledge import journal_entry_create as _ae_journal_entry_create
from tools.ae_knowledge import journal_entry_update as _ae_journal_entry_update from tools.ae_knowledge import journal_entry_update as _ae_journal_entry_update
from tools.ae_knowledge import journal_entry_disable as _ae_journal_entry_disable from tools.ae_knowledge import journal_entry_disable as _ae_journal_entry_disable
@@ -37,6 +39,7 @@ from tools.cron import (
from tools.reminders import ( from tools.reminders import (
reminders_add as _reminders_add, reminders_add as _reminders_add,
reminders_list as _reminders_list, reminders_list as _reminders_list,
reminders_remove as _reminders_remove,
reminders_clear as _reminders_clear, reminders_clear as _reminders_clear,
) )
from tools.scratch import ( from tools.scratch import (
@@ -45,7 +48,12 @@ from tools.scratch import (
scratch_append as _scratch_append, scratch_append as _scratch_append,
scratch_clear as _scratch_clear, scratch_clear as _scratch_clear,
) )
from tools.system import cortex_restart as _cortex_restart, cortex_logs as _cortex_logs from tools.system import (
cortex_restart as _cortex_restart,
cortex_logs as _cortex_logs,
cortex_status as _cortex_status,
cortex_update as _cortex_update,
)
from tools.web import http_fetch as _http_fetch from tools.web import http_fetch as _http_fetch
from tools.files import file_list as _file_list, file_write as _file_write from tools.files import file_list as _file_list, file_write as _file_write
from tools.notify import nc_talk_send as _nc_talk_send, email_send as _email_send from tools.notify import nc_talk_send as _nc_talk_send, email_send as _email_send
@@ -91,8 +99,9 @@ _ae_journal_list_declaration = types.FunctionDeclaration(
_ae_journal_search_declaration = types.FunctionDeclaration( _ae_journal_search_declaration = types.FunctionDeclaration(
name="ae_journal_search", name="ae_journal_search",
description=( description=(
"Search the Aether Journals knowledge base by keyword. " "Search Aether Journal entries. All parameters are optional — combine freely. "
"Use this to look up notes, documentation, meeting summaries, or any saved knowledge. " "Use 'query' for fulltext keyword search (supports boolean: +required -excluded \"phrase\"). "
"Use 'tags' to filter by tag substring. Use 'date_from'/'date_to' for date ranges (YYYY-MM-DD). "
"Always search before creating a new entry to avoid duplicates." "Always search before creating a new entry to avoid duplicates."
), ),
parameters=types.Schema( parameters=types.Schema(
@@ -100,21 +109,109 @@ _ae_journal_search_declaration = types.FunctionDeclaration(
properties={ properties={
"query": types.Schema( "query": types.Schema(
type=types.Type.STRING, type=types.Type.STRING,
description="Keyword or phrase to search for", description="Fulltext keyword search. Supports boolean mode: +required -excluded \"exact phrase\".",
), ),
"journal_id": types.Schema( "journal_id": types.Schema(
type=types.Type.STRING, type=types.Type.STRING,
description=( description="Scope results to a specific journal by its id_random. Omit to search all journals.",
"Optional: scope search to a specific journal by its id_random. " ),
"Omit to search all journals." "tags": types.Schema(
), type=types.Type.STRING,
description="Filter by tag substring (e.g. 'networking' matches entries tagged 'networking' or 'home-networking').",
),
"type_code": types.Schema(
type=types.Type.STRING,
description="Filter by exact type_code (e.g. 'note', 'meeting', 'log').",
),
"topic_code": types.Schema(
type=types.Type.STRING,
description="Filter by exact topic_code.",
),
"date_from": types.Schema(
type=types.Type.STRING,
description="Return entries created on or after this date (YYYY-MM-DD).",
),
"date_to": types.Schema(
type=types.Type.STRING,
description="Return entries created on or before this date (YYYY-MM-DD).",
),
"sort_by": types.Schema(
type=types.Type.STRING,
description="Sort field: 'updated' (default), 'created', 'name', or 'priority'.",
),
"sort_order": types.Schema(
type=types.Type.STRING,
description="Sort direction: 'desc' (default, newest first) or 'asc'.",
),
"status": types.Schema(
type=types.Type.INTEGER,
description="Filter by exact status code.",
),
"priority": types.Schema(
type=types.Type.INTEGER,
description="Filter by exact priority (1=low, 5=high).",
), ),
"max_results": types.Schema( "max_results": types.Schema(
type=types.Type.INTEGER, type=types.Type.INTEGER,
description="Maximum number of entries to return (default 10)", description="Number of results per page (default 10).",
),
"page": types.Schema(
type=types.Type.INTEGER,
description="Page number for pagination (default 1).",
), ),
}, },
required=["query"], required=[],
),
)
_ae_journal_entry_read_declaration = types.FunctionDeclaration(
name="ae_journal_entry_read",
description=(
"Fetch the full content of a single journal entry by its id_random. "
"Use this when you need to read an entry before editing it, or when search results "
"don't show enough content. Returns title, journal, tags, summary, and full content."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"entry_id": types.Schema(
type=types.Type.STRING,
description="The id_random of the journal entry to read.",
),
"max_content_chars": types.Schema(
type=types.Type.INTEGER,
description="Maximum characters of content to return (default 4000). Increase for long entries.",
),
},
required=["entry_id"],
),
)
_ae_journal_entries_list_declaration = types.FunctionDeclaration(
name="ae_journal_entries_list",
description=(
"List entries in a specific journal, newest first. "
"Use this to browse what's in a journal when you don't have a search keyword, "
"or to find entries by browsing rather than searching. "
"Returns numbered entries with id, title, tags, summary, and date."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"journal_id": types.Schema(
type=types.Type.STRING,
description="The id_random of the journal to list entries from.",
),
"max_results": types.Schema(
type=types.Type.INTEGER,
description="Number of entries to return (default 20, max 50).",
),
"page": types.Schema(
type=types.Type.INTEGER,
description="Page number for pagination (default 1).",
),
},
required=["journal_id"],
), ),
) )
@@ -282,6 +379,8 @@ _CALLABLES: dict[str, callable] = {
"web_search": _web_search, "web_search": _web_search,
"ae_journal_list": _ae_journal_list, "ae_journal_list": _ae_journal_list,
"ae_journal_search": _ae_journal_search, "ae_journal_search": _ae_journal_search,
"ae_journal_entry_read": _ae_journal_entry_read,
"ae_journal_entries_list": _ae_journal_entries_list,
"ae_journal_entry_create": _ae_journal_entry_create, "ae_journal_entry_create": _ae_journal_entry_create,
"ae_journal_entry_update": _ae_journal_entry_update, "ae_journal_entry_update": _ae_journal_entry_update,
"ae_journal_entry_disable": _ae_journal_entry_disable, "ae_journal_entry_disable": _ae_journal_entry_disable,
@@ -295,6 +394,8 @@ _CALLABLES: dict[str, callable] = {
"shell_exec": _shell_exec, "shell_exec": _shell_exec,
"cortex_restart": _cortex_restart, "cortex_restart": _cortex_restart,
"cortex_logs": _cortex_logs, "cortex_logs": _cortex_logs,
"cortex_status": _cortex_status,
"cortex_update": _cortex_update,
"http_fetch": _http_fetch, "http_fetch": _http_fetch,
"email_send": _email_send, "email_send": _email_send,
"nc_talk_send": _nc_talk_send, "nc_talk_send": _nc_talk_send,
@@ -308,6 +409,7 @@ _CALLABLES: dict[str, callable] = {
"cron_toggle": _cron_toggle, "cron_toggle": _cron_toggle,
"reminders_add": _reminders_add, "reminders_add": _reminders_add,
"reminders_list": _reminders_list, "reminders_list": _reminders_list,
"reminders_remove": _reminders_remove,
"reminders_clear": _reminders_clear, "reminders_clear": _reminders_clear,
"scratch_read": _scratch_read, "scratch_read": _scratch_read,
"scratch_write": _scratch_write, "scratch_write": _scratch_write,
@@ -584,6 +686,24 @@ _reminders_list_declaration = types.FunctionDeclaration(
parameters=types.Schema(type=types.Type.OBJECT, properties={}), parameters=types.Schema(type=types.Type.OBJECT, properties={}),
) )
_reminders_remove_declaration = types.FunctionDeclaration(
name="reminders_remove",
description=(
"Remove a single reminder by its number. "
"Call reminders_list first to get the numbered list, then pass the number of the reminder to remove."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"index": types.Schema(
type=types.Type.INTEGER,
description="The number of the reminder to remove (1 = first item in reminders_list output).",
),
},
required=["index"],
),
)
_reminders_clear_declaration = types.FunctionDeclaration( _reminders_clear_declaration = types.FunctionDeclaration(
name="reminders_clear", name="reminders_clear",
description=( description=(
@@ -680,6 +800,28 @@ _cortex_logs_declaration = types.FunctionDeclaration(
), ),
) )
_cortex_status_declaration = types.FunctionDeclaration(
name="cortex_status",
description=(
"Return Cortex service status: current git branch and commit, how many commits "
"ahead/behind the remote, and the systemctl service state. "
"Use to check what version is running or whether the service is healthy. "
"ADMIN ONLY."
),
parameters=types.Schema(type=types.Type.OBJECT, properties={}),
)
_cortex_update_declaration = types.FunctionDeclaration(
name="cortex_update",
description=(
"Pull the latest code from git, run a syntax check on all Python files, and report "
"what changed. Does NOT restart automatically — call cortex_restart separately after "
"reviewing the output. Will report syntax errors if the pull introduces broken code. "
"ADMIN ONLY. Requires confirmation."
),
parameters=types.Schema(type=types.Type.OBJECT, properties={}),
)
_http_fetch_declaration = types.FunctionDeclaration( _http_fetch_declaration = types.FunctionDeclaration(
name="http_fetch", name="http_fetch",
description=( description=(
@@ -806,6 +948,8 @@ TOOL_ROLES: dict[str, str] = {
"claude_allow_dir": "admin", "claude_allow_dir": "admin",
"cortex_restart": "admin", "cortex_restart": "admin",
"cortex_logs": "admin", "cortex_logs": "admin",
"cortex_status": "admin",
"cortex_update": "admin",
"file_read": "admin", "file_read": "admin",
"file_list": "admin", "file_list": "admin",
"file_write": "admin", "file_write": "admin",
@@ -819,6 +963,7 @@ TOOL_ROLES: dict[str, str] = {
# the tool, prompting Claude to ask the user to confirm in a follow-up message. # the tool, prompting Claude to ask the user to confirm in a follow-up message.
CONFIRM_REQUIRED: set[str] = { CONFIRM_REQUIRED: set[str] = {
"cortex_restart", "cortex_restart",
"cortex_update",
"file_write", "file_write",
"shell_exec", "shell_exec",
"cron_remove", "cron_remove",
@@ -838,6 +983,8 @@ _ALL_DECLARATIONS: list[types.FunctionDeclaration] = [
_web_search_declaration, _web_search_declaration,
_ae_journal_list_declaration, _ae_journal_list_declaration,
_ae_journal_search_declaration, _ae_journal_search_declaration,
_ae_journal_entry_read_declaration,
_ae_journal_entries_list_declaration,
_ae_journal_entry_create_declaration, _ae_journal_entry_create_declaration,
_ae_journal_entry_update_declaration, _ae_journal_entry_update_declaration,
_ae_journal_entry_disable_declaration, _ae_journal_entry_disable_declaration,
@@ -851,6 +998,8 @@ _ALL_DECLARATIONS: list[types.FunctionDeclaration] = [
_shell_exec_declaration, _shell_exec_declaration,
_cortex_restart_declaration, _cortex_restart_declaration,
_cortex_logs_declaration, _cortex_logs_declaration,
_cortex_status_declaration,
_cortex_update_declaration,
_http_fetch_declaration, _http_fetch_declaration,
_email_send_declaration, _email_send_declaration,
_nc_talk_send_declaration, _nc_talk_send_declaration,
@@ -864,6 +1013,7 @@ _ALL_DECLARATIONS: list[types.FunctionDeclaration] = [
_cron_toggle_declaration, _cron_toggle_declaration,
_reminders_add_declaration, _reminders_add_declaration,
_reminders_list_declaration, _reminders_list_declaration,
_reminders_remove_declaration,
_reminders_clear_declaration, _reminders_clear_declaration,
_scratch_read_declaration, _scratch_read_declaration,
_scratch_write_declaration, _scratch_write_declaration,

View File

@@ -41,36 +41,98 @@ def _check_config() -> str | None:
# Tool: ae_journal_search # Tool: ae_journal_search
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def journal_search(query: str, journal_id: str | None = None, max_results: int = 10) -> str: async def journal_search(
"""Search AE Journal entries by keyword. query: str = "",
journal_id: str = "",
tags: str = "",
type_code: str = "",
topic_code: str = "",
date_from: str = "",
date_to: str = "",
sort_by: str = "updated",
sort_order: str = "desc",
status: int | None = None,
priority: int | None = None,
max_results: int = 10,
page: int = 1,
) -> str:
"""Search AE Journal entries.
Searches across the default_qry_str field (title + content excerpt). At least one of query, tags, type_code, topic_code, date_from, or journal_id
Optionally scoped to a specific journal by journal_id (id_random). should be provided. All filters combine with AND.
Returns a markdown-formatted list of matching entries.
""" """
err = _check_config() err = _check_config()
if err: if err:
return err return err
return await asyncio.to_thread(
return await asyncio.to_thread(_sync_journal_search, query, journal_id, max_results) _sync_journal_search,
query, journal_id, tags, type_code, topic_code,
date_from, date_to, sort_by, sort_order,
status, priority, max_results, page,
)
def _sync_journal_search(query: str, journal_id: str | None, max_results: int) -> str: def _sync_journal_search(
query: str,
journal_id: str,
tags: str,
type_code: str,
topic_code: str,
date_from: str,
date_to: str,
sort_by: str,
sort_order: str,
status: int | None,
priority: int | None,
max_results: int,
page: int,
) -> str:
import requests import requests
url = f"{settings.ae_api_url}/v3/crud/journal_entry/search" # Build sort field
search_body = { sort_field_map = {
"and_filters": [ "updated": "updated_on",
{"field": "default_qry_str", "op": "icontains", "value": query} "created": "created_on",
], "name": "name",
"page_size": max_results, "priority": "priority",
} }
sort_field = sort_field_map.get(sort_by, "updated_on")
order_by = f"{'-' if sort_order == 'desc' else ''}{sort_field}"
params = {} search_body: dict = {"page_size": max_results, "page": page, "order_by": order_by}
# Fulltext keyword — uses MATCH/AGAINST index
if query:
search_body["query_string"] = query
# Additional AND filters
and_filters: list[dict] = []
if tags:
and_filters.append({"field": "tags", "op": "icontains", "value": tags})
if type_code:
and_filters.append({"field": "type_code", "op": "eq", "value": type_code})
if topic_code:
and_filters.append({"field": "topic_code", "op": "eq", "value": topic_code})
if date_from:
and_filters.append({"field": "created_on", "op": "gte", "value": date_from})
if date_to:
and_filters.append({"field": "created_on", "op": "lte", "value": date_to})
if status is not None:
and_filters.append({"field": "status", "op": "eq", "value": status})
if priority is not None:
and_filters.append({"field": "priority", "op": "eq", "value": priority})
if and_filters:
search_body["and"] = and_filters
# query_string must be present for `and` filters to apply
if "query_string" not in search_body:
search_body["query_string"] = "%"
params: dict = {}
if journal_id: if journal_id:
params["for_obj_type"] = "journal" params["for_obj_type"] = "journal"
params["for_obj_id"] = journal_id params["for_obj_id"] = journal_id
url = f"{settings.ae_api_url}/v3/crud/journal_entry/search"
try: try:
resp = requests.post( resp = requests.post(
url, url,
@@ -86,30 +148,43 @@ def _sync_journal_search(query: str, journal_id: str | None, max_results: int) -
return f"Journal search error: {e}" return f"Journal search error: {e}"
entries = data.get("data", []) entries = data.get("data", [])
if not entries: total = (data.get("meta") or {}).get("data_list_count") or len(entries)
return f"No journal entries found matching: {query}"
if not entries:
desc = query or tags or type_code or topic_code or f"journal {journal_id}"
return f"No journal entries found for: {desc}"
label = query or tags or f"{len(entries)} entries"
lines = [f"Journal entries — **{label}** ({total} total, page {page}):\n"]
lines = [f"Journal entries matching **{query}** ({len(entries)} result(s)):\n"]
for entry in entries: for entry in entries:
title = entry.get("name") or "(untitled)" title = entry.get("name") or "(untitled)"
entry_id = entry.get("id_random", "") entry_id = entry.get("journal_entry_id") or entry.get("id") or ""
journal_name = entry.get("journal_name") or entry.get("parent_name") or "" journal_name = entry.get("journal_name") or entry.get("parent_name") or ""
summary = entry.get("summary") or "" summary = entry.get("summary") or ""
content_preview = (entry.get("content") or "")[:200].replace("\n", " ") entry_tags = entry.get("tags") or []
updated = (entry.get("updated_on") or entry.get("created_on") or "")[:10]
content_preview = (entry.get("content") or "")[:400].replace("\n", " ")
header = f"**{title}**" header = f"**{title}**"
if journal_name: if journal_name:
header += f" ({journal_name})" header += f" ({journal_name})"
if entry_id: header += f" — id: `{entry_id}`"
header += f" — id: `{entry_id}`" if updated:
header += f" [{updated}]"
lines.append(header) lines.append(header)
if entry_tags:
tag_list = entry_tags if isinstance(entry_tags, list) else [t.strip() for t in str(entry_tags).split(",")]
lines.append(f" Tags: {', '.join(tag_list)}")
if summary: if summary:
lines.append(f" Summary: {summary}") lines.append(f" {summary}")
if content_preview: elif content_preview:
lines.append(f" {content_preview}") lines.append(f" {content_preview}{'' if len(entry.get('content', '')) > 400 else ''}")
lines.append("") lines.append("")
if total > page * max_results:
lines.append(f"(More results — call again with page={page + 1})")
return "\n".join(lines).strip() return "\n".join(lines).strip()
@@ -264,6 +339,127 @@ def _patch_entry(entry_id: str, payload: dict) -> str:
return f"Error updating entry {entry_id}: {e}" return f"Error updating entry {entry_id}: {e}"
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_read
# ---------------------------------------------------------------------------
async def journal_entry_read(entry_id: str, max_content_chars: int = 4000) -> str:
"""Return the full content of a single journal entry by its id_random."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_entry_read, entry_id, max_content_chars)
def _sync_journal_entry_read(entry_id: str, max_content_chars: int) -> str:
entry = _get_entry(entry_id)
if isinstance(entry, str):
return entry
title = entry.get("name") or "(untitled)"
journal = entry.get("journal_name") or entry.get("parent_name") or ""
summary = entry.get("summary") or ""
raw_tags = entry.get("tags") or []
tags = raw_tags if isinstance(raw_tags, list) else [t.strip() for t in str(raw_tags).split(",") if t.strip()]
content = entry.get("content") or ""
updated = (entry.get("updated_on") or entry.get("created_on") or "")[:19].replace("T", " ")
enabled = entry.get("enable", True)
lines = [f"# {title}"]
meta: list[str] = [f"id: `{entry_id}`"]
if journal:
meta.append(f"journal: {journal}")
if updated:
meta.append(f"updated: {updated}")
if not enabled:
meta.append("**DISABLED**")
lines.append(" ".join(meta))
if tags:
lines.append(f"Tags: {', '.join(tags)}")
if summary:
lines.append(f"\nSummary: {summary}")
lines.append("\n---\n")
truncated = len(content) > max_content_chars
lines.append(content[:max_content_chars])
if truncated:
lines.append(
f"\n\n[Content truncated at {max_content_chars} chars — "
f"{len(content)} total. Call again with a higher max_content_chars to read more.]"
)
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Tool: ae_journal_entries_list
# ---------------------------------------------------------------------------
async def journal_entries_list(journal_id: str, max_results: int = 20, page: int = 1) -> str:
"""List entries in a specific journal, newest first."""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_entries_list, journal_id, max_results, page)
def _sync_journal_entries_list(journal_id: str, max_results: int, page: int) -> str:
import requests
url = f"{settings.ae_api_url}/v3/crud/journal_entry/search"
search_body: dict = {
"page_size": max_results,
"page": page,
"order_by": "-updated_on",
}
params = {"for_obj_type": "journal", "for_obj_id": journal_id}
try:
resp = requests.post(
url,
headers=_headers(),
params=params,
json=search_body,
timeout=settings.ae_api_timeout,
)
resp.raise_for_status()
data = resp.json()
except Exception as e:
logger.warning("ae_journal_entries_list failed: %s", e)
return f"Journal entries list error: {e}"
entries = data.get("data", [])
total = (data.get("meta") or {}).get("data_list_count") or len(entries)
if not entries:
return f"No entries found in journal `{journal_id}`."
offset = (page - 1) * max_results + 1
lines = [f"Entries in journal `{journal_id}` — showing {offset}{offset + len(entries) - 1} of {total}:\n"]
for i, entry in enumerate(entries, offset):
title = entry.get("name") or "(untitled)"
entry_id = entry.get("journal_entry_id") or entry.get("id") or ""
raw_tags = entry.get("tags") or []
tags = raw_tags if isinstance(raw_tags, list) else [t.strip() for t in str(raw_tags).split(",") if t.strip()]
summary = entry.get("summary") or ""
updated = (entry.get("updated_on") or entry.get("created_on") or "")[:10]
enabled = entry.get("enable", True)
status = "" if enabled else " [disabled]"
date_str = f" [{updated}]" if updated else ""
lines.append(f"{i}. **{title}**{status} — id: `{entry_id}`{date_str}")
if tags:
lines.append(f" Tags: {', '.join(tags)}")
if summary:
lines.append(f" {summary[:150]}{'' if len(summary) > 150 else ''}")
lines.append("")
if total > offset + len(entries) - 1:
lines.append(f"(More entries available — call again with page={page + 1})")
return "\n".join(lines).rstrip()
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Tool: ae_journal_entry_update # Tool: ae_journal_entry_update
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------

View File

@@ -27,6 +27,28 @@ def _now_label() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
def _parse_sections(text: str) -> list[tuple[str, str]]:
"""Split REMINDERS.md into (heading, body) tuples, one per ## section."""
sections: list[tuple[str, str]] = []
heading: str | None = None
body_lines: list[str] = []
for line in text.splitlines():
if line.startswith("## "):
if heading is not None:
sections.append((heading, "\n".join(body_lines).strip()))
heading = line[3:].strip()
body_lines = []
elif heading is not None:
body_lines.append(line)
if heading is not None:
sections.append((heading, "\n".join(body_lines).strip()))
return sections
def _sections_to_text(sections: list[tuple[str, str]]) -> str:
return "".join(f"\n## {h}\n\n{b}\n" for h, b in sections)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Sync implementations # Sync implementations
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -35,7 +57,18 @@ def _reminders_list() -> str:
p = _reminders_path() p = _reminders_path()
if not p.exists() or not p.read_text().strip(): if not p.exists() or not p.read_text().strip():
return "No pending reminders." return "No pending reminders."
return p.read_text() sections = _parse_sections(p.read_text())
if not sections:
return "No pending reminders."
lines = []
for i, (heading, body) in enumerate(sections, 1):
lines.append(f"{i}. {heading}")
if body:
# Indent body so it reads as belonging to the numbered item
for bline in body.splitlines()[:4]: # cap at 4 lines for brevity
lines.append(f" {bline}")
lines.append("")
return "\n".join(lines).rstrip()
def _reminders_add(text: str, label: str | None = None) -> str: def _reminders_add(text: str, label: str | None = None) -> str:
@@ -47,6 +80,26 @@ def _reminders_add(text: str, label: str | None = None) -> str:
return f"Reminder added: {heading}" return f"Reminder added: {heading}"
def _reminders_remove(index: int) -> str:
p = _reminders_path()
if not p.exists() or not p.read_text().strip():
return "No reminders to remove."
sections = _parse_sections(p.read_text())
if not sections:
return "No reminders to remove."
if index < 1 or index > len(sections):
return (
f"Index {index} is out of range. "
f"There {'is' if len(sections) == 1 else 'are'} {len(sections)} "
f"reminder{'s' if len(sections) != 1 else ''} (1{len(sections)}). "
"Call reminders_list to see them."
)
removed_heading = sections[index - 1][0]
sections.pop(index - 1)
p.write_text(_sections_to_text(sections))
return f"Removed reminder {index}: {removed_heading}"
def _reminders_clear() -> str: def _reminders_clear() -> str:
p = _reminders_path() p = _reminders_path()
p.write_text("") p.write_text("")
@@ -65,5 +118,9 @@ async def reminders_add(text: str, label: str | None = None) -> str:
return await asyncio.to_thread(_reminders_add, text, label) return await asyncio.to_thread(_reminders_add, text, label)
async def reminders_remove(index: int) -> str:
return await asyncio.to_thread(_reminders_remove, index)
async def reminders_clear() -> str: async def reminders_clear() -> str:
return await asyncio.to_thread(_reminders_clear) return await asyncio.to_thread(_reminders_clear)

View File

@@ -2,16 +2,21 @@
System tools — local machine operations. System tools — local machine operations.
These tools affect the host system directly. Use with care. These tools affect the host system directly. Use with care.
cortex_restart and cortex_logs require admin role. All tools in this module require the admin role.
""" """
import asyncio import asyncio
import logging import logging
import os import os
import subprocess import subprocess
from pathlib import Path
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Absolute paths — resolved relative to this file so they work regardless of cwd
_CORTEX_DIR = Path(__file__).parent # .../Cortex_and_Inara_dev/cortex/
_PROJECT_ROOT = _CORTEX_DIR.parent # .../Cortex_and_Inara_dev/
ALLOW_SCRIPT = "/home/scott/.local/bin/claude-allow-dir" ALLOW_SCRIPT = "/home/scott/.local/bin/claude-allow-dir"
@@ -124,3 +129,120 @@ async def cortex_logs(lines: int = 50) -> str:
except Exception as e: except Exception as e:
logger.error("cortex_logs error: %s", e) logger.error("cortex_logs error: %s", e)
return f"Error: {e}" return f"Error: {e}"
async def cortex_status() -> str:
"""Return Cortex service status: git branch/commit, ahead/behind remote, and systemctl state."""
lines = []
async def _git(*args: str) -> str:
proc = await asyncio.create_subprocess_exec(
"git", "-C", str(_PROJECT_ROOT), *args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL,
)
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
return stdout.decode(errors="replace").strip()
try:
branch = await _git("rev-parse", "--abbrev-ref", "HEAD")
commit = await _git("log", "--oneline", "-1")
# fetch quietly so ahead/behind is current
await asyncio.create_subprocess_exec(
"git", "-C", str(_PROJECT_ROOT), "fetch", "--quiet",
stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL,
)
ahead_behind = await _git("rev-list", "--left-right", "--count", f"HEAD...origin/{branch}")
ahead, behind = (ahead_behind.split() + ["?", "?"])[:2]
lines.append(f"**Branch:** {branch} | ahead {ahead} / behind {behind}")
lines.append(f"**Commit:** {commit}")
except Exception as e:
lines.append(f"Git info unavailable: {e}")
lines.append("")
try:
proc = await asyncio.create_subprocess_exec(
"systemctl", "--user", "status", "cortex", "--no-pager", "-l",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
# First 15 lines of systemctl output is enough — avoids log flood
status_lines = stdout.decode(errors="replace").splitlines()[:15]
lines.extend(status_lines)
except Exception as e:
lines.append(f"systemctl status unavailable: {e}")
return "\n".join(lines)
async def cortex_update() -> str:
"""Pull the latest code from git, syntax-check all Python files, and report.
Does NOT restart automatically — call cortex_restart separately after reviewing
the output if you want to apply changes.
"""
lines = []
async def _run(*cmd: str, cwd: Path = _PROJECT_ROOT, timeout: int = 30) -> tuple[int, str]:
proc = await asyncio.create_subprocess_exec(
*cmd, cwd=str(cwd),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout)
return proc.returncode, stdout.decode(errors="replace").strip()
# 1. Check for incoming commits before pulling
try:
await _run("git", "fetch", "--quiet")
rc, incoming = await _run("git", "log", "--oneline", "HEAD..origin/HEAD")
if rc == 0 and not incoming:
# Double-check with branch name in case origin/HEAD isn't set
branch_rc, branch = await _run("git", "rev-parse", "--abbrev-ref", "HEAD")
_, incoming = await _run("git", "log", "--oneline", f"HEAD..origin/{branch.strip()}")
except asyncio.TimeoutError:
return "Error: git fetch timed out — check network connectivity."
except Exception as e:
return f"Error during git fetch: {e}"
if not incoming:
rc2, current = await _run("git", "log", "--oneline", "-1")
return f"Already up to date.\n\nCurrent commit: {current}"
lines.append(f"**Incoming commits:**\n{incoming}\n")
# 2. Pull
try:
rc, pull_out = await _run("git", "pull", "--ff-only")
except asyncio.TimeoutError:
return "Error: git pull timed out."
except Exception as e:
return f"Error during git pull: {e}"
if rc != 0:
return f"git pull failed (exit {rc}):\n{pull_out}"
lines.append(f"**git pull:**\n{pull_out}\n")
# 3. Syntax check all Python files under cortex/
py_files = sorted(_CORTEX_DIR.rglob("*.py"))
errors = []
for f in py_files:
result = subprocess.run(
["python3", "-m", "py_compile", str(f)],
capture_output=True, text=True,
)
if result.returncode != 0:
errors.append(f" {f.relative_to(_PROJECT_ROOT)}: {result.stderr.strip()}")
if errors:
lines.append(f"**Syntax errors — do NOT restart until fixed:**")
lines.extend(errors)
else:
lines.append(f"**Syntax check:** {len(py_files)} files — all OK.")
lines.append("Call `cortex_restart` to apply the update.")
return "\n".join(lines)

View File

@@ -0,0 +1,226 @@
# Aether Platform Integration — Cortex Tool Layer
> Last updated: 2026-04-30
> Status: Journal toolset complete — broader AE integration planned
This doc covers how Cortex/Inara integrates with the Aether Platform API, what's
implemented, what the data model looks like, and what's planned next.
---
## Overview
Cortex connects to the Aether Platform V3 API to give the orchestrator read/write
access to the user's knowledge base (Journals) and task data. Auth uses the same
`x-aether-api-key` + `x-account-id` headers as every other Aether client.
Config lives in `.env`:
```
AE_API_URL=https://dev-api.oneskyit.com
AE_API_KEY=...
AE_ACCOUNT_ID=...
AE_API_TIMEOUT=15
```
Tool implementation: `cortex/tools/ae_knowledge.py`
Tool registrations: `cortex/tools/__init__.py`
---
## V3 Search Engine
### Endpoint
```
POST /v3/crud/{obj_type}/search
```
For nested objects (journal_entry scoped to a journal):
```
POST /v3/crud/journal_entry/search
?for_obj_type=journal&for_obj_id={journal_id}
```
### Search body
```json
{
"query_string": "fulltext search term",
"and": [
{ "field": "tags", "op": "icontains", "value": "networking" },
{ "field": "created_on", "op": "gte", "value": "2026-01-01" }
],
"or": [...],
"page_size": 20,
"page": 1,
"order_by": "-updated_on"
}
```
**`query_string` vs `and` filters on `default_qry_str`:**
- `query_string` → triggers `MATCH(default_qry_str) AGAINST(... IN BOOLEAN MODE)` — uses the
FULLTEXT index. Faster and supports boolean operators (`+word`, `-word`, `"phrase"`).
- `and` with `icontains` on `default_qry_str` → plain `LIKE '%term%'`. Slower, no index.
**Important:** `query_string` must be present for `and`/`or` filters to apply. When using
filters without a keyword query, pass `query_string: "%"` as a wildcard to activate the
filter path without restricting by keyword.
### Supported operators
| Operator | SQL | Notes |
|---|---|---|
| `eq` | `=` | exact match |
| `ne` | `!=` | not equal |
| `gt` / `gte` | `>` / `>=` | numeric, dates |
| `lt` / `lte` | `<` / `<=` | numeric, dates |
| `contains` / `icontains` | `LIKE '%v%'` | substring; both case-insensitive on MariaDB |
| `startswith` / `istartswith` | `LIKE 'v%'` | |
| `endswith` / `iendswith` | `LIKE '%v'` | |
| `like` | `LIKE` | raw LIKE pattern |
| `in` | `IN (...)` | value is a list |
| `is_null` / `is_not_null` | `IS NULL` / `IS NOT NULL` | no value needed |
### Sorting
`order_by` accepts any indexed field name. Prefix with `-` for descending:
- `-updated_on` (default for listing)
- `-created_on`
- `name`
- `-priority`
### Pagination
`page_size` (default 10, max ~100) + `page` (1-based).
Total count is in `response["meta"]["data_list_count"]` — not a top-level key.
---
## journal_entry Schema
Full table schema from `ae_describe journal_entry --detailed`:
| Field | Type | Indexed | Notes |
|---|---|---|---|
| `id_random` | varchar(22) | UNI | DB public ID field — but API responses return this as `journal_entry_id` (the Vision ID convention: `{obj_type}_id`). `id_random` key is `None` in responses. |
| `journal_id` | int | MUL | FK — use `for_obj_id` param in search |
| `name` | varchar(250) | MUL | Entry title |
| `short_name` | varchar(25) | | |
| `summary` | text | | Short summary (12 sentences) |
| `content` | text | | Full markdown content |
| `content_html` | text | | HTML version |
| `content_json` | longtext | | Structured content (editor format) |
| `content_encrypted` | longtext | | Optional encrypted content |
| `tags` | varchar(255) | MUL | Comma-separated string — filter with `icontains` |
| `type` / `type_code` | varchar | | Classification: type |
| `topic` / `topic_code` | varchar | | Classification: topic |
| `activity` / `activity_code` | varchar | | Classification: activity |
| `category_code` | varchar(25) | | Classification: category |
| `code` | varchar(20) | | Short entry code |
| `start_datetime` | datetime | MUL | Optional event start |
| `end_datetime` | datetime | | Optional event end |
| `seconds` / `hours` | int/decimal | | Duration |
| `priority` | tinyint | MUL | 1=low → 5=high |
| `status` | int | MUL | Status code (domain-specific) |
| `private` / `public` / `personal` / `professional` | tinyint | MUL | Visibility flags |
| `billable` | tinyint | | Billing flag |
| `enable` | tinyint NOT NULL | MUL | Soft-delete flag (default 1) |
| `hide` | tinyint | MUL | UI hide flag |
| `archive` | tinyint | MUL | Archived flag |
| `default_qry_str` | text | FULLTEXT | Auto-generated search target (name + content) |
| `data_json` | longtext | | Arbitrary structured data |
| `notes` | text | | Internal notes |
| `created_on` | timestamp NOT NULL | MUL | Auto-set on create |
| `updated_on` | timestamp | MUL | Auto-updated on change |
### journal Schema (top-level)
| Field | Type | Notes |
|---|---|---|
| `id_random` | varchar(22) | DB field — returned in API as `journal_id` |
| `name` | varchar(250) | Journal name |
| `summary` / `description` | text | |
| `type_code` | varchar(25) | Journal type |
| `enable` | tinyint | |
| `created_on` / `updated_on` | timestamp | |
---
## Current Tool Inventory
| Tool | Status | Notes |
|---|---|---|
| `ae_journal_list` | ✅ | Lists journals with id + name |
| `ae_journal_search` | ✅ | Fulltext + tag/date/type/status/priority filters; paginated |
| `ae_journal_entry_read` | ✅ | Full content by entry_id; configurable truncation |
| `ae_journal_entries_list` | ✅ | Browse a journal newest-first; paginated |
| `ae_journal_entry_create` | ✅ | Create with title, content, tags, summary |
| `ae_journal_entry_update` | ✅ | Patch any fields (title, content, tags, summary, enable) |
| `ae_journal_entry_disable` | ✅ | Soft-delete (enable=false) |
| `ae_journal_entry_append` | ✅ | Timestamped append to bottom |
| `ae_journal_entry_prepend` | ✅ | Timestamped prepend to top |
| `ae_task_list` | ✅ | agents_sync Kanban (admin only) |
---
## ae_journal_search — Current Signature
All filters are optional and combine with AND. At least one should be provided.
```python
ae_journal_search(
query: str = "", # fulltext via query_string (MATCH/AGAINST)
journal_id: str = "", # scope to a specific journal
tags: str = "", # icontains on tags field
type_code: str = "", # eq on type_code
topic_code: str = "", # eq on topic_code
date_from: str = "", # created_on gte (YYYY-MM-DD)
date_to: str = "", # created_on lte (YYYY-MM-DD, exclusive of time — use next day to include full day)
sort_by: str = "updated", # updated | created | name | priority
sort_order: str = "desc",
status: int | None = None,
priority: int | None = None,
max_results: int = 10,
page: int = 1,
)
```
**date_to boundary note:** `date_to='2026-01-17'` means `<= 2026-01-17 00:00:00`, which
excludes entries created later that day. Use `date_to='2026-01-18'` to include all of Jan 17.
---
## Planned: Broader AE Platform Integration
### Phase 1 — Journal Toolset (current)
Complete read/write/search for Journals and Journal Entries.
### Phase 2 — Tasks & Projects
- `ae_task_create` / `ae_task_update` / `ae_task_complete` on Aether tasks (not just agents_sync Kanban)
- Read project/task hierarchy
### Phase 3 — Knowledge Import Pipeline
- Script to walk markdown dirs, chunk by H2, create Journal entries
- Dedup via search-before-create pattern
- Tag and classify entries automatically via orchestrator
### Phase 4 — People & Contacts
- Read contact records (person, organization)
- Link journal entries to contacts
### Phase 5 — Calendar / Events
- `start_datetime` / `end_datetime` already on journal_entry
- Could expose time-scoped journal queries as a calendar view
---
## Notes on `tags` field
`tags` is stored as a raw comma-separated varchar(255), not a JSON array.
The API accepts a Python list on write (the `tags` PATCH key takes a list and the backend joins it).
On read, it comes back as a **string** (e.g. `"shelterluv, api"`), not a list — normalize before
displaying: `[t.strip() for t in tags_str.split(",") if t.strip()]`.
For filtering: use `icontains` on `tags` inside the `"and"` list, e.g.:
`{"field": "tags", "op": "icontains", "value": "networking"}`.
A tag search for "net" matches "networking" AND "subnet" — acceptable for now.
True per-tag filtering would require a tags junction table.
## Notes on `default_qry_str`
Auto-populated by the backend from `name` + content fields. Do not write to it directly.
FULLTEXT index supports boolean mode: `+required -excluded "exact phrase"`.
The `query_string` key in the search body triggers this path automatically.