feat: Intelligence Layer Phase 1 — orchestrator service

Adds the Gemini API orchestrator (ReAct tool loop → Claude responder):

Orchestrator engine + router:
- orchestrator_engine.py: Gemini API tool loop, Claude CLI handoff
- routers/orchestrator.py: POST /orchestrate (async job queue), GET /orchestrate/{job_id}

Tools (cortex/tools/):
- web.py: DuckDuckGo web search (no key required)
- ae_knowledge.py: ae_journal_search + ae_journal_entry_create (AE V3 API)
- ae_tasks.py: ae_task_list (reads agents_sync Kanban filesystem)
- files.py: file_read (path-allowlisted to safe dirs)

Config + deps:
- config.py: orchestrator, DuckDuckGo, and AE API settings
- requirements.txt: google-genai, duckduckgo-search
- .env.default: reference config with all new keys documented

Docs:
- CLAUDE.md, README.md, documentation/ added to repo
- Port references updated 7331 → 8000 throughout
- Default model updated to gemini-2.5-flash

Tested: ae_task_list, ae_journal_search, web_search all working end-to-end.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Scott Idem
2026-03-18 19:37:49 -04:00
parent 23f8659aaa
commit ed472ce9a0
15 changed files with 1840 additions and 1 deletions

View File

@@ -4,6 +4,24 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
anthropic_api_key: str | None = None # not used — claude CLI handles auth
# Orchestrator (Gemini API — separate from Gemini CLI)
# Get a key at: https://aistudio.google.com/apikey (free tier is sufficient)
gemini_api_key: str | None = None
orchestrator_model: str = "gemini-2.5-flash" # model used for tool loop
orchestrator_max_rounds: int = 10 # safety cap on tool iterations
# DuckDuckGo search (used by orchestrator web_search tool)
# Leave blank to use the free unauthenticated tier; set to your API key for higher limits
ddg_api_key: str | None = None
ddg_max_results: int = 5
# Aether Platform API (used by orchestrator ae_journal_* and ae_task_list tools)
ae_api_url: str = "https://dev-api.oneskyit.com"
ae_api_key: str = "" # x-aether-api-key header
ae_account_id: str = "" # x-account-id header
ae_api_timeout: int = 15 # per-request timeout in seconds
inara_dir: Path = Path("../inara")
sessions_dir: Path = Path("./data/sessions")
default_model: str = "claude-sonnet-4-6"

View File

@@ -8,7 +8,7 @@ import uvicorn
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s")
from config import settings
from routers import chat, google_chat, nextcloud_talk, files, distill, auth
from routers import chat, google_chat, nextcloud_talk, files, distill, auth, orchestrator
@asynccontextmanager
@@ -29,6 +29,7 @@ app.include_router(nextcloud_talk.router)
app.include_router(files.router)
app.include_router(distill.router)
app.include_router(auth.router)
app.include_router(orchestrator.router)
app.mount("/static", StaticFiles(directory="static"), name="static")

View File

@@ -0,0 +1,243 @@
"""
Orchestrator engine — two-brain architecture.
Flow:
1. Gemini API runs a ReAct tool loop (reason → act → observe → repeat)
2. When Gemini has gathered enough context, it produces a final summary
3. That enriched context is handed off to Claude for the user-facing response
Why this split:
- Gemini API has native structured tool calling (Gemini CLI subprocess does not)
- Claude produces higher-quality user-facing prose and reasoning
- Claude Pro subscription has no API cost; Gemini free tier handles orchestration load
For direct chat (no tools needed), this engine is not invoked — the chat router
calls llm_client.complete() directly, which is faster and has no orchestration overhead.
"""
import asyncio
import logging
from dataclasses import dataclass, field
from google import genai
from google.genai import types
from config import settings
from llm_client import complete
from tools import TOOL_DECLARATIONS, call_tool
logger = logging.getLogger(__name__)
# System prompt given to Gemini during the tool loop.
# Gemini's job is information gathering and planning — NOT writing the final response.
_ORCHESTRATOR_SYSTEM = """You are an intelligent orchestrator. Your job is to:
1. Understand the user's request
2. Call tools to gather the information needed to answer it
3. Once you have enough information, produce a concise summary of:
- What the user asked
- What you found (tool results, key facts)
- Any important context that would help generate a good answer
Do NOT write a polished final answer — a human-facing AI will do that next.
Keep your summary factual and complete. Include relevant URLs, data, and specifics.
If no tools are needed, return an empty summary."""
@dataclass
class OrchestratorResult:
response: str # final user-facing response (from Claude)
tool_calls: list[dict] = field(default_factory=list) # [{tool, args, result}]
backend: str = "claude" # model that produced the final response
gemini_summary: str = "" # what Gemini handed to Claude (debug/display)
async def run(
task: str,
system_prompt: str = "",
session_messages: list[dict] | None = None,
respond_with_claude: bool = True,
) -> OrchestratorResult:
"""
Run the full orchestration loop for a task.
Args:
task: The user's request (plain text)
system_prompt: Inara's system prompt (from context_loader) — passed to Claude
session_messages: Prior conversation history for session continuity
respond_with_claude: If False, return Gemini's summary as the response (useful for
background/cron tasks where a polished reply isn't needed)
Returns:
OrchestratorResult with response, tool call log, backend used, and Gemini summary
"""
if not settings.gemini_api_key:
raise RuntimeError(
"GEMINI_API_KEY not set — orchestrator requires Gemini API. "
"Get a free key at https://aistudio.google.com/apikey and add it to .env"
)
client = genai.Client(api_key=settings.gemini_api_key)
# Seed Gemini with the task — include recent session context if available
task_with_context = _build_task_prompt(task, session_messages)
contents: list[types.Content] = [
types.Content(role="user", parts=[types.Part(text=task_with_context)])
]
tool_call_log: list[dict] = []
gemini_summary = ""
# --- ReAct tool loop ---
for round_num in range(settings.orchestrator_max_rounds):
logger.info("Orchestrator round %d for task: %.80s", round_num + 1, task)
response = await asyncio.to_thread(
client.models.generate_content,
model=settings.orchestrator_model,
contents=contents,
config=types.GenerateContentConfig(
tools=TOOL_DECLARATIONS,
system_instruction=_ORCHESTRATOR_SYSTEM,
),
)
candidate = response.candidates[0]
parts = candidate.content.parts if candidate.content else []
# Check if Gemini wants to call any tools
tool_call_parts = [
p for p in parts
if hasattr(p, "function_call") and p.function_call and p.function_call.name
]
if not tool_call_parts:
# No more tool calls — extract Gemini's text summary
gemini_summary = "".join(
p.text for p in parts if hasattr(p, "text") and p.text
).strip()
logger.info("Orchestrator done after %d round(s). Tools used: %d",
round_num + 1, len(tool_call_log))
break
# Add Gemini's response (with function calls) to the conversation
contents.append(candidate.content)
# Execute all tool calls in parallel
tool_tasks = [
_execute_tool(fc.function_call.name, dict(fc.function_call.args))
for fc in tool_call_parts
]
tool_results = await asyncio.gather(*tool_tasks, return_exceptions=True)
# Build function response parts and update log
response_parts: list[types.Part] = []
for fc_part, result in zip(tool_call_parts, tool_results):
fc = fc_part.function_call
result_str = str(result) if not isinstance(result, Exception) else f"Error: {result}"
logger.info("Tool %s%d chars", fc.name, len(result_str))
tool_call_log.append({
"tool": fc.name,
"args": dict(fc.args),
"result": result_str,
})
response_parts.append(
types.Part(
function_response=types.FunctionResponse(
name=fc.name,
response={"result": result_str},
)
)
)
contents.append(types.Content(role="user", parts=response_parts))
else:
# Hit the round limit — use whatever Gemini produced last
logger.warning("Orchestrator hit max rounds (%d)", settings.orchestrator_max_rounds)
gemini_summary = (
f"Reached the tool iteration limit ({settings.orchestrator_max_rounds} rounds). "
"Here is what was gathered so far:\n\n"
+ "\n\n".join(f"**{t['tool']}**: {t['result'][:500]}" for t in tool_call_log)
)
# --- Claude handoff ---
if respond_with_claude:
claude_prompt = _build_claude_prompt(task, tool_call_log, gemini_summary)
# Merge with session history so Claude has conversation context
messages = list(session_messages or [])
messages.append({"role": "user", "content": claude_prompt})
response_text, backend = await complete(
system_prompt=system_prompt,
messages=messages,
model="claude",
)
else:
# Cron/background tasks: return Gemini's summary directly, no Claude call
response_text = gemini_summary or "No information gathered."
backend = "gemini"
return OrchestratorResult(
response=response_text,
tool_calls=tool_call_log,
backend=backend,
gemini_summary=gemini_summary,
)
async def _execute_tool(name: str, args: dict) -> str:
"""Execute a single tool call, catching all exceptions."""
try:
return await call_tool(name, args)
except Exception as e:
logger.warning("Tool %s failed: %s", name, e)
return f"Tool error: {e}"
def _build_task_prompt(task: str, session_messages: list[dict] | None) -> str:
"""Prepend recent session context so Gemini understands the conversation."""
if not session_messages:
return task
# Include last few turns for context (don't send the full history to keep tokens low)
recent = session_messages[-6:] # last 3 turns
history_lines = []
for msg in recent:
label = "User" if msg["role"] == "user" else "Assistant"
history_lines.append(f"{label}: {msg['content'][:300]}") # truncate long messages
context = "\n".join(history_lines)
return f"<recent_conversation>\n{context}\n</recent_conversation>\n\nCurrent request: {task}"
def _build_claude_prompt(
task: str,
tool_calls: list[dict],
gemini_summary: str,
) -> str:
"""Build the enriched context handed from Gemini to Claude."""
parts = [f"User request: {task}\n"]
if tool_calls:
parts.append("## Research gathered\n")
for tc in tool_calls:
parts.append(f"### {tc['tool']}({_format_args(tc['args'])})")
# Truncate very long results — Claude gets the gist
result = tc["result"]
if len(result) > 2000:
result = result[:2000] + "\n… [truncated]"
parts.append(result)
parts.append("")
if gemini_summary:
parts.append("## Summary of findings\n")
parts.append(gemini_summary)
return "\n".join(parts)
def _format_args(args: dict) -> str:
"""Format tool args as a compact string for display."""
return ", ".join(f"{k}={repr(v)}" for k, v in args.items())

View File

@@ -4,5 +4,9 @@ uvicorn[standard]>=0.30.0
pydantic-settings>=2.0.0
python-dotenv>=1.0.0
# Orchestrator — Gemini API (native tool calling) + web search
google-genai>=1.0.0
duckduckgo-search>=6.3.0
# anthropic SDK not needed — using claude CLI subprocess for auth
# anthropic>=0.40.0

View File

@@ -0,0 +1,174 @@
"""
Orchestrator router — POST /orchestrate, GET /orchestrate/{job_id}
Accepts a task description, runs it through the orchestrator engine
(Gemini tool loop → Claude response), and returns the result.
Designed to be triggered from:
- The Cortex web UI (future "Agent mode" toggle)
- Cron jobs: curl -X POST http://localhost:8000/orchestrate -d '{"task":"..."}'
- Webhooks: Gitea, Aether events, etc.
"""
import asyncio
import logging
import uuid
from datetime import datetime, timezone
from fastapi import APIRouter
from pydantic import BaseModel
from config import settings
from context_loader import load_context
import orchestrator_engine
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/orchestrate", tags=["orchestrator"])
# ---------------------------------------------------------------------------
# In-memory job store
# Jobs are keyed by UUID. For this phase, memory is fine — jobs are short-lived.
# ---------------------------------------------------------------------------
_jobs: dict[str, dict] = {}
_jobs_lock = asyncio.Lock()
# ---------------------------------------------------------------------------
# Request / response models
# ---------------------------------------------------------------------------
class OrchestrateRequest(BaseModel):
task: str
session_id: str | None = None # include session history in context
tier: int | None = None # Inara context tier (default from settings)
respond_with_claude: bool = True # False = return Gemini summary only (faster, for cron)
include_long: bool = True
include_mid: bool = True
include_short: bool = True
class OrchestrateResponse(BaseModel):
job_id: str
status: str # "queued" | "running" | "complete" | "error"
class JobStatusResponse(BaseModel):
job_id: str
status: str
task: str
created_at: str
completed_at: str | None = None
response: str | None = None
tool_calls: list[dict] | None = None
backend: str | None = None
gemini_summary: str | None = None
error: str | None = None
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@router.post("", response_model=OrchestrateResponse)
async def orchestrate(req: OrchestrateRequest) -> OrchestrateResponse:
"""Submit a task to the orchestrator. Returns a job_id to poll."""
job_id = str(uuid.uuid4())
now = datetime.now(timezone.utc).isoformat()
job: dict = {
"job_id": job_id,
"status": "queued",
"task": req.task,
"created_at": now,
"completed_at": None,
"response": None,
"tool_calls": None,
"backend": None,
"gemini_summary": None,
"error": None,
}
async with _jobs_lock:
_jobs[job_id] = job
# Run in background — caller polls GET /orchestrate/{job_id}
asyncio.create_task(_run_job(job_id, req))
logger.info("Orchestrator job queued: %s%.80s", job_id, req.task)
return OrchestrateResponse(job_id=job_id, status="queued")
@router.get("/{job_id}", response_model=JobStatusResponse)
async def job_status(job_id: str) -> JobStatusResponse:
"""Poll the status of an orchestrator job."""
async with _jobs_lock:
job = _jobs.get(job_id)
if job is None:
from fastapi import HTTPException
raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
return JobStatusResponse(**job)
@router.get("", response_model=list[JobStatusResponse])
async def list_jobs() -> list[JobStatusResponse]:
"""List all jobs (most recent first). Useful for debugging."""
async with _jobs_lock:
jobs = sorted(_jobs.values(), key=lambda j: j["created_at"], reverse=True)
return [JobStatusResponse(**j) for j in jobs]
# ---------------------------------------------------------------------------
# Background runner
# ---------------------------------------------------------------------------
async def _run_job(job_id: str, req: OrchestrateRequest) -> None:
"""Execute the orchestration job and update the job store."""
async with _jobs_lock:
_jobs[job_id]["status"] = "running"
try:
# Load Inara's system prompt (same as the chat router does)
tier = req.tier or settings.default_tier
system_prompt = load_context(
tier,
include_long=req.include_long,
include_mid=req.include_mid,
include_short=req.include_short,
)
# Load session history if a session_id was provided
session_messages: list[dict] | None = None
if req.session_id:
from session_store import load as load_session
session_messages = load_session(req.session_id) or None
result = await orchestrator_engine.run(
task=req.task,
system_prompt=system_prompt,
session_messages=session_messages,
respond_with_claude=req.respond_with_claude,
)
now = datetime.now(timezone.utc).isoformat()
async with _jobs_lock:
_jobs[job_id].update({
"status": "complete",
"completed_at": now,
"response": result.response,
"tool_calls": result.tool_calls,
"backend": result.backend,
"gemini_summary": result.gemini_summary,
})
logger.info("Orchestrator job complete: %s (%d tool calls)", job_id, len(result.tool_calls))
except Exception as e:
logger.exception("Orchestrator job failed: %s", job_id)
now = datetime.now(timezone.utc).isoformat()
async with _jobs_lock:
_jobs[job_id].update({
"status": "error",
"completed_at": now,
"error": str(e),
})

193
cortex/tools/__init__.py Normal file
View File

@@ -0,0 +1,193 @@
"""
Orchestrator tool registry.
Each tool has two parts:
1. A Gemini FunctionDeclaration — tells the model what the tool does and what args it takes
2. A Python async callable — the actual implementation
To add a new tool:
1. Implement it in a tools/<domain>.py module
2. Import it here and add (declaration, callable) to _REGISTRY
3. Add a FunctionDeclaration below and include it in TOOL_DECLARATIONS
IMPORTANT: These tools are separate from the ae_* MCP tools used by the fleet agents.
Do not modify the ae_* MCP server to support orchestrator needs.
"""
from google.genai import types
from tools.web import search as _web_search
from tools.ae_knowledge import journal_search as _ae_journal_search
from tools.ae_knowledge import journal_entry_create as _ae_journal_entry_create
from tools.ae_tasks import task_list as _ae_task_list
from tools.files import file_read as _file_read
# ---------------------------------------------------------------------------
# Gemini function declarations
# ---------------------------------------------------------------------------
_web_search_declaration = types.FunctionDeclaration(
name="web_search",
description=(
"Search the web for current information. Use this when you need up-to-date "
"facts, news, documentation, or anything not in your training data."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"query": types.Schema(
type=types.Type.STRING,
description="The search query string",
),
"max_results": types.Schema(
type=types.Type.INTEGER,
description="Number of results to return (default 5, max 10)",
),
},
required=["query"],
),
)
_ae_journal_search_declaration = types.FunctionDeclaration(
name="ae_journal_search",
description=(
"Search the Aether Journals knowledge base by keyword. "
"Use this to look up notes, documentation, meeting summaries, or any saved knowledge. "
"Always search before creating a new entry to avoid duplicates."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"query": types.Schema(
type=types.Type.STRING,
description="Keyword or phrase to search for",
),
"journal_id": types.Schema(
type=types.Type.STRING,
description=(
"Optional: scope search to a specific journal by its id_random. "
"Omit to search all journals."
),
),
"max_results": types.Schema(
type=types.Type.INTEGER,
description="Maximum number of entries to return (default 10)",
),
},
required=["query"],
),
)
_ae_journal_entry_create_declaration = types.FunctionDeclaration(
name="ae_journal_entry_create",
description=(
"Create a new entry in an Aether Journal. "
"Use this to save notes, summaries, or any content the user wants to store. "
"Always call ae_journal_search first to check for existing entries on the same topic."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"journal_id": types.Schema(
type=types.Type.STRING,
description=(
"The id_random of the target journal. "
"Ask the user which journal to write to if not specified."
),
),
"title": types.Schema(
type=types.Type.STRING,
description="Entry title",
),
"content": types.Schema(
type=types.Type.STRING,
description="Full entry content (markdown supported)",
),
"summary": types.Schema(
type=types.Type.STRING,
description="Optional short summary (1-2 sentences)",
),
"tags": types.Schema(
type=types.Type.STRING,
description="Optional comma-separated tags (e.g. 'wireguard, networking, homelab')",
),
},
required=["journal_id", "title", "content"],
),
)
_ae_task_list_declaration = types.FunctionDeclaration(
name="ae_task_list",
description=(
"List tasks from the agents_sync Kanban board (todo and in-progress). "
"Use this when asked about current work, pending tasks, or project status."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"include_done": types.Schema(
type=types.Type.BOOLEAN,
description="If true, also include completed tasks (default false)",
),
},
),
)
_file_read_declaration = types.FunctionDeclaration(
name="file_read",
description=(
"Read a local file and return its contents. "
"Allowed directories: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/. "
"Use this to read documentation, notes, CLAUDE.md files, or config references. "
"If given a directory path, returns a directory listing instead."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"path": types.Schema(
type=types.Type.STRING,
description=(
"Absolute or home-relative path to the file "
"(e.g. ~/agents_sync/CLAUDE.md or /home/scott/agents_sync/tasks/01_todo/)"
),
),
"max_lines": types.Schema(
type=types.Type.INTEGER,
description="Optional line limit (default 500)",
),
},
required=["path"],
),
)
# ---------------------------------------------------------------------------
# Registry: maps tool name → async callable
# ---------------------------------------------------------------------------
_CALLABLES: dict[str, callable] = {
"web_search": _web_search,
"ae_journal_search": _ae_journal_search,
"ae_journal_entry_create": _ae_journal_entry_create,
"ae_task_list": _ae_task_list,
"file_read": _file_read,
}
# Gemini Tool object — pass this to GenerateContentConfig
TOOL_DECLARATIONS = [
types.Tool(function_declarations=[
_web_search_declaration,
_ae_journal_search_declaration,
_ae_journal_entry_create_declaration,
_ae_task_list_declaration,
_file_read_declaration,
])
]
async def call_tool(name: str, args: dict) -> str:
"""Dispatch a tool call by name. Returns result as a string."""
fn = _CALLABLES.get(name)
if fn is None:
return f"Unknown tool: {name}"
return await fn(**args)

View File

@@ -0,0 +1,177 @@
"""
Aether Platform knowledge tools — journal search and entry creation.
These tools give the orchestrator read/write access to the AE Journals module,
which serves as the primary long-term knowledge base.
Auth: x-aether-api-key + x-account-id headers (same pattern as agents_sync scripts).
API: V3 CRUD — POST /v3/crud/journal_entry/search, POST /v3/crud/journal/{id}/journal_entry/
"""
import asyncio
import logging
from config import settings
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def _headers() -> dict:
return {
"x-aether-api-key": settings.ae_api_key,
"x-account-id": settings.ae_account_id,
"Content-Type": "application/json",
}
def _check_config() -> str | None:
"""Return an error string if AE API is not configured, else None."""
if not settings.ae_api_key or not settings.ae_account_id:
return (
"AE API not configured. Set AE_API_KEY and AE_ACCOUNT_ID in .env. "
"Values are the same as agents_sync/mcp/.env."
)
return None
# ---------------------------------------------------------------------------
# Tool: ae_journal_search
# ---------------------------------------------------------------------------
async def journal_search(query: str, journal_id: str | None = None, max_results: int = 10) -> str:
"""Search AE Journal entries by keyword.
Searches across the default_qry_str field (title + content excerpt).
Optionally scoped to a specific journal by journal_id (id_random).
Returns a markdown-formatted list of matching entries.
"""
err = _check_config()
if err:
return err
return await asyncio.to_thread(_sync_journal_search, query, journal_id, max_results)
def _sync_journal_search(query: str, journal_id: str | None, max_results: int) -> str:
import requests
url = f"{settings.ae_api_url}/v3/crud/journal_entry/search"
search_body = {
"and_filters": [
{"field": "default_qry_str", "op": "icontains", "value": query}
],
"page_size": max_results,
}
params = {}
if journal_id:
params["for_obj_type"] = "journal"
params["for_obj_id"] = journal_id
try:
resp = requests.post(
url,
headers=_headers(),
params=params,
json=search_body,
timeout=settings.ae_api_timeout,
)
resp.raise_for_status()
data = resp.json()
except Exception as e:
logger.warning("ae_journal_search failed: %s", e)
return f"Journal search error: {e}"
entries = data.get("data", [])
if not entries:
return f"No journal entries found matching: {query}"
lines = [f"Journal entries matching **{query}** ({len(entries)} result(s)):\n"]
for entry in entries:
title = entry.get("name") or "(untitled)"
entry_id = entry.get("id_random", "")
journal_name = entry.get("journal_name") or entry.get("parent_name") or ""
summary = entry.get("summary") or ""
content_preview = (entry.get("content") or "")[:200].replace("\n", " ")
header = f"**{title}**"
if journal_name:
header += f" ({journal_name})"
if entry_id:
header += f" — id: `{entry_id}`"
lines.append(header)
if summary:
lines.append(f" Summary: {summary}")
if content_preview:
lines.append(f" {content_preview}")
lines.append("")
return "\n".join(lines).strip()
# ---------------------------------------------------------------------------
# Tool: ae_journal_entry_create
# ---------------------------------------------------------------------------
async def journal_entry_create(
journal_id: str,
title: str,
content: str,
summary: str = "",
tags: str = "",
) -> str:
"""Create a new entry in an AE Journal.
Args:
journal_id: The id_random of the target journal (use ae_journal_search to find it,
or ask the user which journal to write to).
title: Entry title (name field).
content: Full entry content (markdown supported).
summary: Optional short summary (1-2 sentences).
tags: Optional comma-separated tags.
Returns a confirmation with the new entry's id_random, or an error message.
"""
err = _check_config()
if err:
return err
return await asyncio.to_thread(
_sync_journal_entry_create, journal_id, title, content, summary, tags
)
def _sync_journal_entry_create(
journal_id: str, title: str, content: str, summary: str, tags: str
) -> str:
import requests
url = f"{settings.ae_api_url}/v3/crud/journal/{journal_id}/journal_entry/"
data: dict = {"name": title, "content": content}
if summary:
data["summary"] = summary
if tags:
data["tags"] = [t.strip() for t in tags.split(",") if t.strip()]
try:
resp = requests.post(
url,
headers=_headers(),
json=data,
timeout=settings.ae_api_timeout,
)
resp.raise_for_status()
result = resp.json()
except Exception as e:
logger.warning("ae_journal_entry_create failed: %s", e)
return f"Journal entry creation error: {e}"
entry_id = (
result.get("data", {}).get("id_random")
or result.get("id_random")
or "unknown"
)
return f"Journal entry created. id: `{entry_id}`, title: \"{title}\", journal: `{journal_id}`"

100
cortex/tools/ae_tasks.py Normal file
View File

@@ -0,0 +1,100 @@
"""
Aether task list tool — reads the agents_sync Kanban board.
Reads task JSON files directly from the agents_sync filesystem rather than
making an HTTP call, since the tasks directory is always locally available
(synced via Syncthing). This avoids needing a separate API endpoint for tasks.
Structure:
agents_sync/tasks/01_todo/ — pending tasks
agents_sync/tasks/02_in_progress/ — active tasks
agents_sync/tasks/03_done/ — completed tasks (not included by default)
"""
import asyncio
import json
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
# Resolved at import time — agents_sync is always at ~/agents_sync on this machine.
# If the path doesn't exist the tool returns a helpful error rather than crashing.
_AGENTS_SYNC = Path.home() / "agents_sync"
_TASKS_ROOT = _AGENTS_SYNC / "tasks"
async def task_list(include_done: bool = False) -> str:
"""List tasks from the agents_sync Kanban board.
Reads the todo and in_progress buckets (and optionally done).
Returns a markdown summary grouped by status.
Args:
include_done: If True, also include completed tasks (can be noisy).
"""
return await asyncio.to_thread(_sync_task_list, include_done)
def _sync_task_list(include_done: bool) -> str:
if not _TASKS_ROOT.exists():
return f"Task directory not found: {_TASKS_ROOT}"
buckets = [
("01_todo", "Todo"),
("02_in_progress", "In Progress"),
]
if include_done:
buckets.append(("03_done", "Done"))
sections: list[str] = []
total = 0
for dir_name, label in buckets:
bucket_dir = _TASKS_ROOT / dir_name
if not bucket_dir.exists():
continue
tasks = _read_bucket(bucket_dir)
total += len(tasks)
if not tasks:
continue
lines = [f"## {label} ({len(tasks)})\n"]
for task in tasks:
title = task.get("title") or task.get("name") or "(untitled)"
assigned = task.get("assigned_to") or ""
task_id = task.get("id") or ""
desc = task.get("description") or ""
header = f"- **{title}**"
if assigned:
header += f" (assigned: {assigned})"
if task_id:
header += f" — `{task_id}`"
lines.append(header)
if desc:
# First sentence / 120 chars of description
short = desc.split(".")[0][:120]
lines.append(f" {short}")
sections.append("\n".join(lines))
if not sections:
return "No tasks found on the Kanban board."
header_line = f"# Kanban Board — {total} task(s)\n"
return header_line + "\n\n".join(sections)
def _read_bucket(bucket_dir: Path) -> list[dict]:
"""Read and parse all JSON task files in a bucket directory."""
tasks = []
for path in sorted(bucket_dir.glob("*.json")):
try:
data = json.loads(path.read_text())
tasks.append(data)
except Exception as e:
logger.warning("Failed to read task file %s: %s", path, e)
return tasks

112
cortex/tools/files.py Normal file
View File

@@ -0,0 +1,112 @@
"""
File read tool — restricted to known-safe directory roots.
Lets the orchestrator read local files (documentation, notes, config references)
without exposing arbitrary filesystem access. All paths are resolved and checked
against an allowlist of roots before any read is performed.
"""
import asyncio
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
# Directories the orchestrator is allowed to read from.
# Paths are resolved (symlinks followed, ~ expanded) at import time.
_ALLOWED_ROOTS: list[Path] = [
Path.home() / "agents_sync",
Path.home() / "OSIT_dev",
Path.home() / "DgrZone_Nextcloud",
Path.home() / "OSIT_Nextcloud",
]
# Hard cap on file size to prevent accidental context blowout
_MAX_BYTES = 50_000 # ~50 KB
_MAX_LINES = 500
async def file_read(path: str, max_lines: int | None = None) -> str:
"""Read a local file and return its contents as a string.
Only files within allowed directories can be read:
~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/
Args:
path: Absolute or home-relative path to the file (e.g. ~/agents_sync/CLAUDE.md).
max_lines: Optional line limit (default 500, hard cap). Use for large files.
Returns the file contents (truncated if over the size limit), or an error message.
"""
return await asyncio.to_thread(_sync_file_read, path, max_lines)
def _sync_file_read(path: str, max_lines: int | None) -> str:
# Expand ~ and resolve to absolute path
try:
resolved = Path(path).expanduser().resolve()
except Exception as e:
return f"Invalid path: {e}"
# Security check — must be under an allowed root
if not _is_allowed(resolved):
allowed_str = ", ".join(str(r) for r in _ALLOWED_ROOTS)
return (
f"Access denied: {resolved}\n"
f"Allowed directories: {allowed_str}"
)
if not resolved.exists():
return f"File not found: {resolved}"
if not resolved.is_file():
# If it's a directory, list its contents instead
try:
entries = sorted(resolved.iterdir())
names = [e.name + ("/" if e.is_dir() else "") for e in entries[:100]]
return f"Directory listing for {resolved}:\n" + "\n".join(names)
except Exception as e:
return f"Cannot list directory: {e}"
# Read the file
try:
raw = resolved.read_bytes()
except Exception as e:
return f"Read error: {e}"
# Binary files
try:
text = raw.decode("utf-8")
except UnicodeDecodeError:
return f"Binary file (not readable as text): {resolved} [{len(raw)} bytes]"
# Apply line limit
limit = min(max_lines or _MAX_LINES, _MAX_LINES)
lines = text.splitlines()
truncated = False
if len(lines) > limit:
lines = lines[:limit]
truncated = True
# Apply byte cap as a final safety net
result = "\n".join(lines)
if len(result) > _MAX_BYTES:
result = result[:_MAX_BYTES]
truncated = True
if truncated:
result += f"\n\n… [truncated — file has {len(text.splitlines())} lines total]"
return result
def _is_allowed(resolved: Path) -> bool:
"""Check that resolved path is under one of the allowed roots."""
for root in _ALLOWED_ROOTS:
try:
resolved.relative_to(root)
return True
except ValueError:
continue
return False

50
cortex/tools/web.py Normal file
View File

@@ -0,0 +1,50 @@
"""
Web search tool — DuckDuckGo backend.
Uses the duckduckgo-search library. Set DDG_API_KEY in .env for a paid account
(higher rate limits). The free unauthenticated tier works for moderate usage.
"""
import asyncio
import logging
from config import settings
logger = logging.getLogger(__name__)
async def search(query: str, max_results: int | None = None) -> str:
"""Search DuckDuckGo and return results as a formatted string.
Returns a markdown-formatted list of results: title, URL, and snippet.
The orchestrator includes this in the context it passes to Claude.
"""
n = min(max_results or settings.ddg_max_results, 10)
results = await asyncio.to_thread(_sync_search, query, n)
if not results:
return f"No results found for: {query}"
lines = [f"Search results for: **{query}**\n"]
for i, r in enumerate(results, 1):
lines.append(f"{i}. [{r['title']}]({r['href']})")
if r.get("body"):
lines.append(f" {r['body']}")
lines.append("")
return "\n".join(lines).strip()
def _sync_search(query: str, max_results: int) -> list[dict]:
"""Synchronous DuckDuckGo search — run via asyncio.to_thread."""
from duckduckgo_search import DDGS
kwargs = {}
if settings.ddg_api_key:
# Paid account — pass token for higher rate limits
kwargs["headers"] = {"Authorization": f"Bearer {settings.ddg_api_key}"}
try:
with DDGS(**kwargs) as ddgs:
return list(ddgs.text(query, max_results=max_results))
except Exception as e:
logger.warning("DuckDuckGo search error: %s", e)
return []