Files
Cortex-Inara/cortex/tools/files.py
Scott Idem 750cde489d feat: session_search tool + tool expansion docs update
session_search (tools/files.py):
- Full-text search across past session logs, exposed to the orchestrator
- Params: query (required), limit (default 5, max 20)
- Returns dated excerpts, newest first; own sessions only via ContextVars
- User-level — no TOOL_ROLES gating needed
- Registered in __init__.py callables + TOOL_CATEGORIES["Files"]

ARCH__FUTURE.md §2: updated tool count to 44, marked prior tools complete,
added Round 2 planned tools table (session_search now done, reminders due dates,
http_post, nc_talk_history, task_list priority filter, http_fetch max_chars),
noted datetime_now is not needed (already in system prompt via context_loader)

TODO__Agents.md: session_search checked off, Round 2 task list added

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-08 21:41:26 -04:00

351 lines
12 KiB
Python

"""
File read/write/search tools — restricted to known-safe directory roots.
Lets the orchestrator read local files (documentation, notes, config references)
and search past session logs without exposing arbitrary filesystem access.
All paths are resolved and checked against an allowlist of roots before any
read or write is performed.
"""
import asyncio
import logging
import re
from pathlib import Path
from google.genai import types
logger = logging.getLogger(__name__)
# Directories the orchestrator is allowed to read from.
# Roots are resolved (symlinks followed) once, at import time, so that they
# compare correctly against the fully-resolved target paths that the read/list
# helpers check with Path.relative_to().
def _build_allowed_roots() -> list[Path]:
    """Build the list of directory roots that may be read.

    Returns:
        Resolved absolute paths for the fixed home-relative roots, followed by
        the Cortex home directory when ``config.settings`` can be imported and
        queried.
    """
    home = Path.home()
    roots = [
        (home / name).resolve()
        for name in (
            "agents_sync",
            "OSIT_dev",
            "DgrZone_Nextcloud",
            "OSIT_Nextcloud",
        )
    ]
    try:
        from config import settings

        roots.append(Path(settings.home_root()).expanduser().resolve())
    except Exception as exc:
        # Best-effort: the fixed roots still work when the config module is
        # unavailable, so record the reason instead of failing the import.
        logging.getLogger(__name__).debug(
            "Cortex home root unavailable, skipping: %s", exc
        )
    return roots


_ALLOWED_ROOTS: list[Path] = _build_allowed_roots()
# Caps on how much file content file_read returns, to prevent accidental
# context blowout.
# NOTE: _MAX_BYTES is compared against len() of the decoded text, so in
# practice it limits characters rather than encoded bytes.
_MAX_BYTES = 50_000 # ~50 KB
# Maximum number of lines returned; also used as the default line limit.
_MAX_LINES = 500
async def file_read(path: str, max_lines: int | None = None) -> str:
    """Return the text of a local file, read on a worker thread.

    The blocking filesystem work is delegated to ``_sync_file_read`` via
    ``asyncio.to_thread``. Only paths under the allowed roots are readable:
    ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/, plus
    the Cortex home directory when the config module provides one.

    Args:
        path: Absolute or home-relative path (e.g. ~/agents_sync/CLAUDE.md).
            A directory path yields a directory listing instead.
        max_lines: Optional line limit; values above 500 are capped at 500.

    Returns:
        The file contents (truncated when over the line/size limits), or a
        human-readable error message.
    """
    contents = await asyncio.to_thread(_sync_file_read, path, max_lines)
    return contents
def _sync_file_read(path: str, max_lines: int | None) -> str:
    """Blocking implementation behind ``file_read``.

    Resolves ``path``, checks it against the allowed roots, and returns the
    file's text capped by line count and overall length. A directory path
    produces a listing of its first 100 entries. Failures are reported as
    message strings rather than raised.

    Args:
        path: Absolute or home-relative path to a file or directory.
        max_lines: Requested line limit. ``None``, zero and negative values
            fall back to ``_MAX_LINES``; larger values are clamped to it.

    Returns:
        File contents, a directory listing, or an error message.
    """
    # Expand ~ and resolve to absolute path
    try:
        resolved = Path(path).expanduser().resolve()
    except Exception as e:
        return f"Invalid path: {e}"

    # Security check — must be under an allowed root
    if not _is_allowed(resolved):
        allowed_str = ", ".join(str(r) for r in _ALLOWED_ROOTS)
        return (
            f"Access denied: {resolved}\n"
            f"Allowed directories: {allowed_str}"
        )

    if not resolved.exists():
        return f"File not found: {resolved}"

    if not resolved.is_file():
        # If it's a directory, list its contents instead
        listing_cap = 100
        try:
            entries = sorted(resolved.iterdir())
            names = [
                e.name + ("/" if e.is_dir() else "")
                for e in entries[:listing_cap]
            ]
        except Exception as e:
            return f"Cannot list directory: {e}"
        listing = f"Directory listing for {resolved}:\n" + "\n".join(names)
        if len(entries) > listing_cap:
            # Make the cut-off visible so the listing is not mistaken for complete.
            listing += f"\n… ({len(entries) - listing_cap} more entries not shown)"
        return listing

    # Read the file
    try:
        raw = resolved.read_bytes()
    except Exception as e:
        return f"Read error: {e}"

    # Binary files
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        return f"Binary file (not readable as text): {resolved} [{len(raw)} bytes]"

    # Apply line limit. A negative limit would otherwise slice lines off the
    # END of the file (lines[:-n]), so non-positive values use the default.
    if max_lines is None or max_lines < 1:
        limit = _MAX_LINES
    else:
        limit = min(max_lines, _MAX_LINES)

    lines = text.splitlines()
    total_lines = len(lines)
    truncated = False
    if total_lines > limit:
        lines = lines[:limit]
        truncated = True

    # Apply the overall length cap as a final safety net
    result = "\n".join(lines)
    if len(result) > _MAX_BYTES:
        result = result[:_MAX_BYTES]
        truncated = True

    if truncated:
        result += f"\n\n… [truncated — file has {total_lines} lines total]"
    return result
def _is_allowed(resolved: Path) -> bool:
    """Return True when ``resolved`` lies under any entry of ``_ALLOWED_ROOTS``."""

    def _under(root) -> bool:
        # relative_to() raises ValueError when the path is outside ``root``.
        try:
            resolved.relative_to(root)
        except ValueError:
            return False
        return True

    return any(_under(root) for root in _ALLOWED_ROOTS)
# Write is restricted to a tighter set of paths to limit blast radius.
# Roots are resolved so they compare correctly against the already-resolved
# target paths passed to _is_write_allowed().
_WRITE_ROOTS: list[Path] = [
    (Path.home() / "agents_sync").resolve(),
]


def _is_write_allowed(resolved: Path) -> bool:
    """Return True when ``resolved`` may be written to.

    A path is writable when it lies under one of ``_WRITE_ROOTS`` or under the
    Cortex home directory reported by ``config.settings.home_root()``.

    Args:
        resolved: Fully resolved absolute target path.

    Returns:
        True if the path is inside a write-allowed root, otherwise False.
        Also False when the path is outside ``_WRITE_ROOTS`` and the Cortex
        home directory cannot be determined.
    """
    if any(resolved.is_relative_to(root) for root in _WRITE_ROOTS):
        return True

    # Also allow the Cortex home/ directory (persona memory, tasks, etc.)
    try:
        from config import settings

        cortex_home = Path(settings.home_root()).expanduser().resolve()
    except Exception as exc:
        # Best-effort: without a usable config only _WRITE_ROOTS apply.
        logging.getLogger(__name__).debug(
            "Cortex home root unavailable for write check: %s", exc
        )
        return False
    return resolved.is_relative_to(cortex_home)
async def file_list(path: str) -> str:
    """List a directory's entries without blocking the event loop.

    Delegates to ``_sync_file_list`` on a worker thread. Subdirectories are
    marked with a trailing ``/`` and regular files carry their size in bytes.
    The same read allow-list as ``file_read`` applies.

    Args:
        path: Absolute or home-relative path to the directory.

    Returns:
        The formatted listing, or an error message.
    """
    listing = await asyncio.to_thread(_sync_file_list, path)
    return listing
def _sync_file_list(path: str) -> str:
    """Blocking implementation behind ``file_list``.

    Resolves ``path``, enforces the read allow-list, and formats up to 200
    entries: directories first, then files, each group ordered
    case-insensitively by name.

    Args:
        path: Absolute or home-relative path to the directory.

    Returns:
        The formatted listing, or an error message.
    """
    try:
        target = Path(path).expanduser().resolve()
    except Exception as exc:
        return f"Invalid path: {exc}"

    if not _is_allowed(target):
        roots = ", ".join(map(str, _ALLOWED_ROOTS))
        return f"Access denied: {target}\nAllowed directories: {roots}"
    if not target.exists():
        return f"Path not found: {target}"
    if target.is_file():
        return f"{target} is a file, not a directory. Use file_read to read it."

    shown_max = 200
    try:
        children = list(target.iterdir())
        # False sorts before True, so non-files (directories) come first.
        children.sort(key=lambda child: (child.is_file(), child.name.lower()))

        rows = []
        for child in children[:shown_max]:
            if child.is_dir():
                rows.append(f"{child.name}/")
            elif child.is_file():
                rows.append(f"{child.name} ({child.stat().st_size} bytes)")
            else:
                # Neither a directory nor a regular file: name only.
                rows.append(f"{child.name}")

        body = "\n".join(rows)
        hidden = len(children) - shown_max
        if hidden > 0:
            body += f"\n… ({hidden} more entries not shown)"
        return f"Contents of {target}:\n\n{body}"
    except Exception as exc:
        return f"Cannot list directory: {exc}"
async def file_write(path: str, content: str, mode: str = "overwrite") -> str:
    """Write text to a file without blocking the event loop.

    Delegates to ``_sync_file_write`` on a worker thread. Writes are limited
    to ~/agents_sync/ and the Cortex home/ directory; missing parent
    directories are created.

    Args:
        path: Absolute or home-relative path to write to.
        content: Text to write.
        mode: 'overwrite' (default) replaces the file; 'append' adds to the end.

    Returns:
        A confirmation message, or an error message.
    """
    outcome = await asyncio.to_thread(_sync_file_write, path, content, mode)
    return outcome
def _sync_file_write(path: str, content: str, mode: str) -> str:
    """Blocking implementation behind ``file_write``.

    Resolves ``path``, enforces the write allow-list, validates ``mode``, then
    writes ``content`` as UTF-8, creating parent directories as needed.

    Args:
        path: Absolute or home-relative path to write to.
        content: Text to write.
        mode: Either 'overwrite' or 'append'.

    Returns:
        A confirmation message, or an error message. Write failures are also
        logged at error level.
    """
    try:
        target = Path(path).expanduser().resolve()
    except Exception as exc:
        return f"Invalid path: {exc}"

    # The access check runs before mode validation.
    if not _is_write_allowed(target):
        return (
            f"Write access denied: {target}\n"
            "Allowed write roots: ~/agents_sync/ and the Cortex home/ directory."
        )
    if mode != "overwrite" and mode != "append":
        return f"Invalid mode '{mode}' — use 'overwrite' or 'append'."

    appending = mode == "append"
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        if not appending:
            target.write_text(content, encoding="utf-8")
            return f"Wrote {len(content)} chars to {target}"
        with target.open("a", encoding="utf-8") as handle:
            handle.write(content)
        return f"Appended {len(content)} chars to {target}"
    except Exception as exc:
        logger.error("file_write error for %s: %s", target, exc)
        return f"Write error: {exc}"
# Number of characters of surrounding context kept on each side of a match
# when building a session_search excerpt.
_SEARCH_EXCERPT_CHARS = 150
async def session_search(query: str, limit: int = 5) -> str:
    """Search past session logs for a keyword or phrase.

    Delegates to ``_sync_session_search`` on a worker thread. The search is
    scoped to the ``sessions`` directory under ``persona_path()``, which is
    intended to isolate each user's own sessions (per-persona isolation via
    ContextVars).

    Args:
        query: Keyword or phrase to look for.
        limit: Maximum number of excerpts to return (default 5).

    Returns:
        Up to ``limit`` matching excerpts labelled with their session file
        name, newest first, or a message when nothing matches.
    """
    report = await asyncio.to_thread(_sync_session_search, query, limit)
    return report
def _sync_session_search(query: str, limit: int) -> str:
    """Blocking implementation behind ``session_search``.

    Scans the ``*.md`` files in the persona's ``sessions`` directory in
    reverse filename order and collects case-insensitive literal matches of
    ``query``, each with ``_SEARCH_EXCERPT_CHARS`` characters of context on
    either side.

    Args:
        query: Literal keyword or phrase (regex metacharacters are escaped).
        limit: Maximum number of excerpts; clamped to the range 1..20.

    Returns:
        A header line followed by the excerpts, each prefixed with the session
        file stem, or a message when the query is empty, no logs exist, or
        nothing matches.
    """
    # Imported lazily, as in the original; presumably avoids an import cycle.
    from persona import persona_path

    # An empty pattern matches at every position and would return meaningless
    # excerpts, so reject it up front.
    if not query or not query.strip():
        return "Empty search query — provide a keyword or phrase to search for."

    sessions_dir = persona_path() / "sessions"
    if not sessions_dir.exists():
        return "No session logs found."

    limit = max(1, min(limit, 20))
    pattern = re.compile(re.escape(query), re.IGNORECASE)
    # Reverse filename order; newest-first assumes date-based file names.
    session_files = sorted(sessions_dir.glob("*.md"), reverse=True)

    matches: list[str] = []
    for sf in session_files:
        if len(matches) >= limit:
            break
        try:
            # Decode explicitly and tolerate bad bytes: UnicodeDecodeError is
            # not an OSError and would otherwise abort the whole search.
            text = sf.read_text(encoding="utf-8", errors="replace")
        except OSError:
            continue
        for m in pattern.finditer(text):
            if len(matches) >= limit:
                break
            start = max(0, m.start() - _SEARCH_EXCERPT_CHARS)
            end = min(len(text), m.end() + _SEARCH_EXCERPT_CHARS)
            excerpt = text[start:end].strip()
            # Mark excerpts that were cut out of a longer log.
            if start > 0:
                excerpt = "…" + excerpt
            if end < len(text):
                excerpt = excerpt + "…"
            matches.append(f"[{sf.stem}] {excerpt}")

    if not matches:
        return f"No matches for '{query}' across {len(session_files)} session logs."

    header = (
        f"Session search: '{query}' — {len(matches)} match(es) "
        f"across {len(session_files)} logs\n"
    )
    return header + "\n\n".join(matches)
def _text_param(description: str) -> "types.Schema":
    """Build a STRING parameter schema with the given description."""
    return types.Schema(type=types.Type.STRING, description=description)


def _int_param(description: str) -> "types.Schema":
    """Build an INTEGER parameter schema with the given description."""
    return types.Schema(type=types.Type.INTEGER, description=description)


def _tool(name: str, description: str, properties: dict, required: list[str]) -> "types.FunctionDeclaration":
    """Build a function declaration whose parameters form one OBJECT schema."""
    return types.FunctionDeclaration(
        name=name,
        description=description,
        parameters=types.Schema(
            type=types.Type.OBJECT,
            properties=properties,
            required=required,
        ),
    )


# Function declarations for the four file tools, in this order:
# file_read, file_list, file_write, session_search.
DECLARATIONS = [
    _tool(
        "file_read",
        (
            "Read a local file and return its contents. "
            "Allowed directories: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/, "
            "and the Cortex home/ directory (persona memory, tool audit logs, etc.). "
            "Use this to read documentation, notes, CLAUDE.md files, config references, "
            "or tool audit logs at home/{user}/tool_audit/YYYY-MM-DD.jsonl. "
            "If given a directory path, returns a directory listing instead."
        ),
        {
            "path": _text_param("Absolute or home-relative path to the file (e.g. ~/agents_sync/CLAUDE.md or /home/scott/agents_sync/tasks/01_todo/)"),
            "max_lines": _int_param("Optional line limit (default 500)"),
        },
        ["path"],
    ),
    _tool(
        "file_list",
        (
            "List the files and subdirectories in a directory. "
            "Allowed paths: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/. "
            "ADMIN ONLY."
        ),
        {
            "path": _text_param("Absolute or home-relative path to the directory"),
        },
        ["path"],
    ),
    _tool(
        "file_write",
        (
            "Write or append content to a file. "
            "Write-allowed paths: ~/agents_sync/ and the Cortex home/ directory. "
            "Creates parent directories if needed. "
            "ADMIN ONLY. Requires user confirmation before executing."
        ),
        {
            "path": _text_param("Absolute or home-relative path to write to"),
            "content": _text_param("Content to write"),
            "mode": _text_param("'overwrite' (default, replaces file) or 'append' (adds to end)"),
        },
        ["path", "content"],
    ),
    _tool(
        "session_search",
        (
            "Search past conversation session logs for a keyword or phrase. "
            "Use this to recall what was discussed in previous sessions — "
            "e.g. 'what did we decide about X?', 'when did we set up Y?'. "
            "Returns matching excerpts with session dates, newest first. "
            "Only searches this user's own sessions."
        ),
        {
            "query": _text_param("Keyword or phrase to search for"),
            "limit": _int_param("Max results to return (default 5, max 20)"),
        },
        ["query"],
    ),
]