Files
Cortex-Inara/cortex/tools/files.py
Scott Idem 7c3291960a feat: web_read (trafilatura), session_read, http_fetch max_chars
web_read(url, max_chars=16000) — fetches a URL and extracts clean article
text via trafilatura, stripping ads/nav/boilerplate. Returns markdown.

session_read(date) — reads a full session log by YYYY-MM-DD date; lists
available dates if the requested one is not found.

http_fetch gains a max_chars param (default 8192, max 32768) so the cap
is configurable instead of hardcoded.

Tool count: 45 → 47.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 13:04:24 -04:00

395 lines
14 KiB
Python

"""
File read/write/search tools — restricted to known-safe directory roots.
Lets the orchestrator read local files (documentation, notes, config references)
and search past session logs without exposing arbitrary filesystem access.
All paths are resolved and checked against an allowlist of roots before any
read or write is performed.
"""
import asyncio
import logging
import re
from pathlib import Path
from google.genai import types
logger = logging.getLogger(__name__)
# Directories the orchestrator is allowed to read from.
# Paths are resolved (symlinks followed, ~ expanded) at import time.
def _build_allowed_roots() -> list[Path]:
roots = [
Path.home() / "agents_sync",
Path.home() / "OSIT_dev",
Path.home() / "DgrZone_Nextcloud",
Path.home() / "OSIT_Nextcloud",
]
try:
from config import settings
roots.append(settings.home_root())
except Exception:
pass
return roots
_ALLOWED_ROOTS: list[Path] = _build_allowed_roots()
# Hard cap on file size to prevent accidental context blowout
_MAX_BYTES = 50_000 # ~50 KB
_MAX_LINES = 500
async def file_read(path: str, max_lines: int | None = None) -> str:
"""Read a local file and return its contents as a string.
Only files within allowed directories can be read:
~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/
Args:
path: Absolute or home-relative path to the file (e.g. ~/agents_sync/CLAUDE.md).
max_lines: Optional line limit (default 500, hard cap). Use for large files.
Returns the file contents (truncated if over the size limit), or an error message.
"""
return await asyncio.to_thread(_sync_file_read, path, max_lines)
def _sync_file_read(path: str, max_lines: int | None) -> str:
# Expand ~ and resolve to absolute path
try:
resolved = Path(path).expanduser().resolve()
except Exception as e:
return f"Invalid path: {e}"
# Security check — must be under an allowed root
if not _is_allowed(resolved):
allowed_str = ", ".join(str(r) for r in _ALLOWED_ROOTS)
return (
f"Access denied: {resolved}\n"
f"Allowed directories: {allowed_str}"
)
if not resolved.exists():
return f"File not found: {resolved}"
if not resolved.is_file():
# If it's a directory, list its contents instead
try:
entries = sorted(resolved.iterdir())
names = [e.name + ("/" if e.is_dir() else "") for e in entries[:100]]
return f"Directory listing for {resolved}:\n" + "\n".join(names)
except Exception as e:
return f"Cannot list directory: {e}"
# Read the file
try:
raw = resolved.read_bytes()
except Exception as e:
return f"Read error: {e}"
# Binary files
try:
text = raw.decode("utf-8")
except UnicodeDecodeError:
return f"Binary file (not readable as text): {resolved} [{len(raw)} bytes]"
# Apply line limit
limit = min(max_lines or _MAX_LINES, _MAX_LINES)
lines = text.splitlines()
truncated = False
if len(lines) > limit:
lines = lines[:limit]
truncated = True
# Apply byte cap as a final safety net
result = "\n".join(lines)
if len(result) > _MAX_BYTES:
result = result[:_MAX_BYTES]
truncated = True
if truncated:
result += f"\n\n… [truncated — file has {len(text.splitlines())} lines total]"
return result
def _is_allowed(resolved: Path) -> bool:
"""Check that resolved path is under one of the allowed roots."""
for root in _ALLOWED_ROOTS:
try:
resolved.relative_to(root)
return True
except ValueError:
continue
return False
# Write is restricted to a tighter set of paths to limit blast radius.
_WRITE_ROOTS: list[Path] = [
Path.home() / "agents_sync",
]
def _is_write_allowed(resolved: Path) -> bool:
for root in _WRITE_ROOTS:
try:
resolved.relative_to(root)
return True
except ValueError:
continue
# Also allow the Cortex home/ directory (persona memory, tasks, etc.)
try:
from config import settings
cortex_home = settings.home_root()
resolved.relative_to(cortex_home)
return True
except (ValueError, Exception):
pass
return False
async def file_list(path: str) -> str:
"""List the contents of a directory.
Returns names of files and subdirectories with type indicators (/ for dirs).
Same allow-list as file_read.
"""
return await asyncio.to_thread(_sync_file_list, path)
def _sync_file_list(path: str) -> str:
try:
resolved = Path(path).expanduser().resolve()
except Exception as e:
return f"Invalid path: {e}"
if not _is_allowed(resolved):
allowed_str = ", ".join(str(r) for r in _ALLOWED_ROOTS)
return f"Access denied: {resolved}\nAllowed directories: {allowed_str}"
if not resolved.exists():
return f"Path not found: {resolved}"
if resolved.is_file():
return f"{resolved} is a file, not a directory. Use file_read to read it."
try:
entries = sorted(resolved.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
lines = []
for e in entries[:200]:
suffix = "/" if e.is_dir() else f" ({e.stat().st_size} bytes)" if e.is_file() else ""
lines.append(f"{e.name}{suffix}")
result = "\n".join(lines)
if len(entries) > 200:
result += f"\n… ({len(entries) - 200} more entries not shown)"
return f"Contents of {resolved}:\n\n{result}"
except Exception as e:
return f"Cannot list directory: {e}"
async def file_write(path: str, content: str, mode: str = "overwrite") -> str:
"""Write content to a file.
mode: 'overwrite' (default) replaces the file; 'append' adds to the end.
Write-allowed paths: ~/agents_sync/ and the Cortex home/ directory.
Parent directories are created if they don't exist.
"""
return await asyncio.to_thread(_sync_file_write, path, content, mode)
def _sync_file_write(path: str, content: str, mode: str) -> str:
try:
resolved = Path(path).expanduser().resolve()
except Exception as e:
return f"Invalid path: {e}"
if not _is_write_allowed(resolved):
return (
f"Write access denied: {resolved}\n"
f"Allowed write roots: ~/agents_sync/ and the Cortex home/ directory."
)
if mode not in ("overwrite", "append"):
return f"Invalid mode '{mode}' — use 'overwrite' or 'append'."
try:
resolved.parent.mkdir(parents=True, exist_ok=True)
if mode == "append":
with resolved.open("a", encoding="utf-8") as f:
f.write(content)
return f"Appended {len(content)} chars to {resolved}"
else:
resolved.write_text(content, encoding="utf-8")
return f"Wrote {len(content)} chars to {resolved}"
except Exception as e:
logger.error("file_write error for %s: %s", resolved, e)
return f"Write error: {e}"
_SEARCH_EXCERPT_CHARS = 150
async def session_read(date: str) -> str:
"""Read a full session log by date (YYYY-MM-DD).
Returns the complete session log for that date. If the date is not found,
lists the most recent available dates instead.
Only reads the current user's own sessions (per-persona isolation via ContextVars).
"""
return await asyncio.to_thread(_sync_session_read, date.strip())
def _sync_session_read(date: str) -> str:
from persona import persona_path
sessions_dir = persona_path() / "sessions"
if not sessions_dir.exists():
return "No session logs found."
target = sessions_dir / f"{date}.md"
if target.exists():
content = target.read_text()
return f"Session log for {date} ({len(content)} chars):\n\n{content}"
available = sorted([f.stem for f in sessions_dir.glob("*.md")], reverse=True)
if not available:
return "No session logs found."
recent = "\n".join(f" {d}" for d in available[:15])
return f"No session log found for '{date}'. Available dates (most recent first):\n{recent}"
async def session_search(query: str, limit: int = 5) -> str:
"""Search past session logs for a keyword or phrase.
Returns up to `limit` matching excerpts with session dates, newest first.
Only searches the current user's own sessions (per-persona isolation via ContextVars).
"""
return await asyncio.to_thread(_sync_session_search, query, limit)
def _sync_session_search(query: str, limit: int) -> str:
from persona import persona_path
sessions_dir = persona_path() / "sessions"
if not sessions_dir.exists():
return "No session logs found."
limit = max(1, min(limit, 20))
pattern = re.compile(re.escape(query), re.IGNORECASE)
session_files = sorted(sessions_dir.glob("*.md"), reverse=True)
matches = []
for sf in session_files:
if len(matches) >= limit:
break
try:
text = sf.read_text()
except OSError:
continue
for m in pattern.finditer(text):
if len(matches) >= limit:
break
start = max(0, m.start() - _SEARCH_EXCERPT_CHARS)
end = min(len(text), m.end() + _SEARCH_EXCERPT_CHARS)
excerpt = text[start:end].strip()
if start > 0:
excerpt = "" + excerpt
if end < len(text):
excerpt = excerpt + ""
matches.append(f"[{sf.stem}] {excerpt}")
if not matches:
return f"No matches for '{query}' across {len(session_files)} session logs."
header = f"Session search: '{query}'{len(matches)} match(es) across {len(session_files)} logs\n"
return header + "\n\n".join(matches)
DECLARATIONS = [
types.FunctionDeclaration(
name="file_read",
description=(
"Read a local file and return its contents. "
"Allowed directories: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/, "
"and the Cortex home/ directory (persona memory, tool audit logs, etc.). "
"Use this to read documentation, notes, CLAUDE.md files, config references, "
"or tool audit logs at home/{user}/tool_audit/YYYY-MM-DD.jsonl. "
"If given a directory path, returns a directory listing instead."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"path": types.Schema(type=types.Type.STRING, description="Absolute or home-relative path to the file (e.g. ~/agents_sync/CLAUDE.md or /home/scott/agents_sync/tasks/01_todo/)"),
"max_lines": types.Schema(type=types.Type.INTEGER, description="Optional line limit (default 500)"),
},
required=["path"],
),
),
types.FunctionDeclaration(
name="file_list",
description=(
"List the files and subdirectories in a directory. "
"Allowed paths: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/. "
"ADMIN ONLY."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"path": types.Schema(type=types.Type.STRING, description="Absolute or home-relative path to the directory"),
},
required=["path"],
),
),
types.FunctionDeclaration(
name="file_write",
description=(
"Write or append content to a file. "
"Write-allowed paths: ~/agents_sync/ and the Cortex home/ directory. "
"Creates parent directories if needed. "
"ADMIN ONLY. Requires user confirmation before executing."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"path": types.Schema(type=types.Type.STRING, description="Absolute or home-relative path to write to"),
"content": types.Schema(type=types.Type.STRING, description="Content to write"),
"mode": types.Schema(type=types.Type.STRING, description="'overwrite' (default, replaces file) or 'append' (adds to end)"),
},
required=["path", "content"],
),
),
types.FunctionDeclaration(
name="session_read",
description=(
"Read a full session log by date (YYYY-MM-DD). Returns the complete conversation "
"from that session — useful for continuity, recalling decisions, or reviewing "
"what was discussed on a specific day. If the date is not found, lists available dates. "
"Only reads this user's own sessions."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"date": types.Schema(type=types.Type.STRING, description="Date in YYYY-MM-DD format (e.g. '2026-05-08')"),
},
required=["date"],
),
),
types.FunctionDeclaration(
name="session_search",
description=(
"Search past conversation session logs for a keyword or phrase. "
"Use this to recall what was discussed in previous sessions — "
"e.g. 'what did we decide about X?', 'when did we set up Y?'. "
"Returns matching excerpts with session dates, newest first. "
"Only searches this user's own sessions."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"query": types.Schema(type=types.Type.STRING, description="Keyword or phrase to search for"),
"limit": types.Schema(type=types.Type.INTEGER, description="Max results to return (default 5, max 20)"),
},
required=["query"],
),
),
]