web_read(url, max_chars=16000) — fetches a URL and extracts clean article text via trafilatura, stripping ads/nav/boilerplate. Returns markdown. session_read(date) — reads a full session log by YYYY-MM-DD date; lists available dates if the requested one is not found. http_fetch gains a max_chars param (default 8192, max 32768) so the cap is configurable instead of hardcoded. Tool count: 45 → 47. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
395 lines
14 KiB
Python
395 lines
14 KiB
Python
"""
|
|
File read/write/search tools — restricted to known-safe directory roots.
|
|
|
|
Lets the orchestrator read local files (documentation, notes, config references)
|
|
and search past session logs without exposing arbitrary filesystem access.
|
|
All paths are resolved and checked against an allowlist of roots before any
|
|
read or write is performed.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import re
|
|
from pathlib import Path
|
|
|
|
from google.genai import types
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Directories the orchestrator is allowed to read from.
# Every root is expanded (~) and resolved (symlinks followed) once, at import
# time, so it can be compared against request paths that are resolved the same way.
def _build_allowed_roots() -> list[Path]:
    """Return the resolved directory roots that the read tools may access.

    The fixed roots live under the current user's home directory.  The Cortex
    home root from ``config.settings`` is appended when it can be loaded;
    failing to load it is not fatal and simply leaves that root out.

    Returns:
        Absolute, symlink-resolved, de-duplicated ``Path`` objects.  Request
        paths are resolved before they are checked, so a root left unresolved
        (for example a symlinked sync folder) would never match anything.
    """
    home = Path.home()
    candidates: list[Path] = [
        home / "agents_sync",
        home / "OSIT_dev",
        home / "DgrZone_Nextcloud",
        home / "OSIT_Nextcloud",
    ]
    try:
        from config import settings

        candidates.append(Path(settings.home_root()))
    except Exception as exc:
        # Best-effort: the config module may be absent or misconfigured.
        # Record why the root is missing instead of swallowing it silently.
        logging.getLogger(__name__).debug("Cortex home root unavailable: %s", exc)

    roots: list[Path] = []
    for candidate in candidates:
        try:
            resolved = candidate.expanduser().resolve()
        except (OSError, RuntimeError):
            # Resolution failed (e.g. symlink loop): keep the root as given.
            resolved = candidate
        if resolved not in roots:
            roots.append(resolved)
    return roots


_ALLOWED_ROOTS: list[Path] = _build_allowed_roots()
|
|
|
|
# Hard caps on the text returned by file_read, to prevent accidental context blowout.
# NOTE: despite its name, _MAX_BYTES is compared against the character count of
# the decoded text in _sync_file_read, not against the encoded byte length.
_MAX_BYTES = 50_000  # ~50 KB
_MAX_LINES = 500  # default line limit; caller-supplied limits are clamped to it
|
|
|
|
|
|
async def file_read(path: str, max_lines: int | None = None) -> str:
    """Return the text of a local file, or an explanatory message on failure.

    The blocking filesystem work is handed to ``_sync_file_read`` on a worker
    thread so the event loop is not stalled.

    Only files within allowed directories can be read:
      ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/

    Args:
        path: Absolute or home-relative location (e.g. ~/agents_sync/CLAUDE.md).
        max_lines: Optional cap on the number of lines returned (default 500,
            which is also the hard cap).

    Returns:
        The file contents (truncated if over the limits), or an error message.
    """
    contents = await asyncio.to_thread(_sync_file_read, path, max_lines)
    return contents
|
|
|
|
|
|
def _sync_file_read(path: str, max_lines: int | None) -> str:
    """Blocking implementation of ``file_read``.

    Args:
        path: Absolute or ``~``-relative path supplied by the caller.
        max_lines: Requested line limit.  ``None``, zero, negative or
            non-numeric values fall back to ``_MAX_LINES``; larger values are
            clamped to ``_MAX_LINES``.

    Returns:
        The file text (cut to the line limit and then to ``_MAX_BYTES``
        characters, with a trailing notice when anything was cut), a directory
        listing when the path is not a regular file, or a human-readable error
        message.  Failures are reported in the return value, not raised.
    """
    # Expand ~ and resolve symlinks so the allow-list check sees the real target.
    try:
        resolved = Path(path).expanduser().resolve()
    except Exception as e:
        return f"Invalid path: {e}"

    # Security check — must be under an allowed root.
    if not _is_allowed(resolved):
        allowed_str = ", ".join(str(r) for r in _ALLOWED_ROOTS)
        return (
            f"Access denied: {resolved}\n"
            f"Allowed directories: {allowed_str}"
        )

    if not resolved.exists():
        return f"File not found: {resolved}"

    if not resolved.is_file():
        # Not a regular file: treat it as a directory and list it instead.
        max_entries = 100
        try:
            entries = sorted(resolved.iterdir())
            names = [e.name + ("/" if e.is_dir() else "") for e in entries[:max_entries]]
        except Exception as e:
            return f"Cannot list directory: {e}"
        listing = f"Directory listing for {resolved}:\n" + "\n".join(names)
        if len(entries) > max_entries:
            # Tell the caller the listing is partial rather than cutting it silently.
            listing += f"\n… ({len(entries) - max_entries} more entries not shown)"
        return listing

    # Read the file.
    try:
        raw = resolved.read_bytes()
    except Exception as e:
        return f"Read error: {e}"

    # Anything that is not valid UTF-8 is reported as binary.
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        return f"Binary file (not readable as text): {resolved} [{len(raw)} bytes]"

    # Work out the effective line limit.  A negative value must not reach the
    # slice below (lines[:-5] would silently drop the *last* five lines), and
    # a non-integer value would make the slice raise.
    try:
        requested = int(max_lines) if max_lines is not None else 0
    except (TypeError, ValueError, OverflowError):
        requested = 0
    limit = min(requested, _MAX_LINES) if requested > 0 else _MAX_LINES

    all_lines = text.splitlines()
    total_lines = len(all_lines)
    truncated = total_lines > limit
    result = "\n".join(all_lines[:limit])

    # Final safety net on overall size (counted in characters of decoded text).
    if len(result) > _MAX_BYTES:
        result = result[:_MAX_BYTES]
        truncated = True

    if truncated:
        result += f"\n\n… [truncated — file has {total_lines} lines total]"

    return result
|
|
|
|
|
|
def _is_allowed(resolved: Path) -> bool:
    """Report whether *resolved* is one of the allowed roots or lies beneath one.

    The comparison is purely lexical (path components), so callers are
    expected to pass an already-resolved path.
    """
    return any(resolved.is_relative_to(root) for root in _ALLOWED_ROOTS)
|
|
|
|
|
|
# Write is restricted to a tighter set of paths to limit blast radius.
# Roots are resolved because candidate paths are resolved before they are
# checked; an unresolved (e.g. symlinked) root could never match.
_WRITE_ROOTS: list[Path] = [
    (Path.home() / "agents_sync").resolve(),
]


def _is_write_allowed(resolved: Path) -> bool:
    """Report whether *resolved* may be written to.

    Args:
        resolved: An absolute, already-resolved target path.

    Returns:
        True when the path is at or beneath one of ``_WRITE_ROOTS`` or the
        Cortex home root reported by ``config.settings.home_root()``;
        False otherwise, including when the config cannot be loaded.
    """
    if any(resolved.is_relative_to(root) for root in _WRITE_ROOTS):
        return True

    # Also allow the Cortex home/ directory (persona memory, tasks, etc.).
    # The import stays inside the function so a missing or broken config only
    # removes this extra root instead of breaking the module at import time.
    try:
        from config import settings

        cortex_home = Path(settings.home_root()).expanduser().resolve()
    except Exception:
        return False
    return resolved.is_relative_to(cortex_home)
|
|
|
|
|
|
async def file_list(path: str) -> str:
    """Return a listing of the directory at *path*.

    Each entry is reported by name with a type indicator (``/`` for
    subdirectories).  The path is checked against the same allow-list as
    ``file_read``; the blocking work runs in a worker thread.
    """
    listing = await asyncio.to_thread(_sync_file_list, path)
    return listing
|
|
|
|
|
|
def _sync_file_list(path: str) -> str:
    """Blocking implementation of ``file_list``.

    Resolves *path*, rejects it unless it is inside an allowed root, and
    returns up to 200 entries: directories first, then everything else, each
    group ordered case-insensitively by name.  Regular files carry their size
    in bytes.  Every failure is reported as a message string.
    """
    shown_cap = 200

    def _label(entry: Path) -> str:
        # Directories get a trailing slash, regular files their size,
        # anything else (e.g. a dangling symlink) just its name.
        if entry.is_dir():
            return f"{entry.name}/"
        if entry.is_file():
            return f"{entry.name} ({entry.stat().st_size} bytes)"
        return f"{entry.name}"

    try:
        target = Path(path).expanduser().resolve()
    except Exception as err:
        return f"Invalid path: {err}"

    if not _is_allowed(target):
        roots_text = ", ".join(map(str, _ALLOWED_ROOTS))
        return f"Access denied: {target}\nAllowed directories: {roots_text}"
    if not target.exists():
        return f"Path not found: {target}"
    if target.is_file():
        return f"{target} is a file, not a directory. Use file_read to read it."

    try:
        children = list(target.iterdir())
        children.sort(key=lambda child: (child.is_file(), child.name.lower()))
        body = "\n".join(_label(child) for child in children[:shown_cap])
        hidden = len(children) - shown_cap
        if hidden > 0:
            body += f"\n… ({hidden} more entries not shown)"
    except Exception as err:
        return f"Cannot list directory: {err}"
    return f"Contents of {target}:\n\n{body}"
|
|
|
|
|
|
async def file_write(path: str, content: str, mode: str = "overwrite") -> str:
    """Write *content* to the file at *path* and return a status message.

    Args:
        path: Absolute or home-relative destination.
        content: Text to write.
        mode: 'overwrite' (default) replaces the file; 'append' adds to the end.

    Write-allowed paths: ~/agents_sync/ and the Cortex home/ directory.
    Missing parent directories are created.  The blocking work runs in a
    worker thread.
    """
    outcome = await asyncio.to_thread(_sync_file_write, path, content, mode)
    return outcome
|
|
|
|
|
|
def _sync_file_write(path: str, content: str, mode: str) -> str:
    """Blocking implementation of ``file_write``.

    Checks, in order: the path can be resolved, the resolved path is inside a
    write-allowed root, and *mode* is 'overwrite' or 'append'.  Parent
    directories are then created and the text is written as UTF-8.  The result
    is always a message string; I/O failures are logged and reported, not raised.
    """
    try:
        target = Path(path).expanduser().resolve()
    except Exception as err:
        return f"Invalid path: {err}"

    if not _is_write_allowed(target):
        return (
            f"Write access denied: {target}\n"
            f"Allowed write roots: ~/agents_sync/ and the Cortex home/ directory."
        )

    if mode not in ("overwrite", "append"):
        return f"Invalid mode '{mode}' — use 'overwrite' or 'append'."

    appending = mode == "append"
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        if not appending:
            target.write_text(content, encoding="utf-8")
            return f"Wrote {len(content)} chars to {target}"
        with target.open("a", encoding="utf-8") as handle:
            handle.write(content)
        return f"Appended {len(content)} chars to {target}"
    except Exception as err:
        logger.error("file_write error for %s: %s", target, err)
        return f"Write error: {err}"
|
|
|
|
|
|
# Characters of surrounding context kept on each side of a match when
# _sync_session_search builds an excerpt.
_SEARCH_EXCERPT_CHARS = 150
|
|
|
|
|
|
async def session_read(date: str) -> str:
    """Return the full session log for *date* (YYYY-MM-DD).

    Surrounding whitespace in *date* is stripped before the lookup.  When no
    log exists for that date, the most recent available dates are listed
    instead.  The sessions directory is taken from ``persona_path()``, which
    is documented as scoping reads to the current user's own sessions.
    """
    requested = date.strip()
    return await asyncio.to_thread(_sync_session_read, requested)
|
|
|
|
|
|
def _sync_session_read(date: str) -> str:
    """Blocking implementation of ``session_read``.

    Args:
        date: Session identifier, expected in YYYY-MM-DD form; it is used as
            the stem of a ``.md`` file inside the persona's sessions directory.

    Returns:
        The log text prefixed with a one-line header, or — when no matching
        log exists directly inside the sessions directory — the 15 most recent
        available dates.  Read failures are reported as a message string.
    """
    from persona import persona_path

    sessions_dir = persona_path() / "sessions"
    if not sessions_dir.exists():
        return "No session logs found."

    target = sessions_dir / f"{date}.md"
    # `date` is caller-controlled.  Without the parent check, values such as
    # "../../other/sessions/2026-01-01" or an absolute path would make
    # `target` point outside this persona's sessions directory.  Only files
    # that sit directly inside sessions_dir are ever read.
    if target.parent == sessions_dir and target.is_file():
        try:
            # Explicit UTF-8 (matching what file_write produces) instead of
            # the platform default; undecodable bytes are replaced, not fatal.
            content = target.read_text(encoding="utf-8", errors="replace")
        except OSError as e:
            return f"Could not read session log for '{date}': {e}"
        return f"Session log for {date} ({len(content)} chars):\n\n{content}"

    available = sorted((f.stem for f in sessions_dir.glob("*.md")), reverse=True)
    if not available:
        return "No session logs found."
    recent = "\n".join(f"  {d}" for d in available[:15])
    return f"No session log found for '{date}'. Available dates (most recent first):\n{recent}"
|
|
|
|
|
|
async def session_search(query: str, limit: int = 5) -> str:
    """Look for a keyword or phrase in past session logs.

    Produces at most `limit` matching excerpts, each tagged with its session
    date, newest sessions first.  The sessions directory is taken from
    ``persona_path()``, which is documented as scoping the search to the
    current user's own sessions.  The blocking work runs in a worker thread.
    """
    found = await asyncio.to_thread(_sync_session_search, query, limit)
    return found
|
|
|
|
|
|
def _sync_session_search(query: str, limit: int) -> str:
    """Blocking implementation of ``session_search``.

    Args:
        query: Literal text to look for (matched case-insensitively; regex
            metacharacters are escaped).
        limit: Maximum number of excerpts; clamped to 1..20.  A value that is
            not an integer falls back to 5.

    Returns:
        A header line followed by the excerpts, or a message when there are no
        logs, no matches, or the query is empty.
    """
    from persona import persona_path

    sessions_dir = persona_path() / "sessions"
    if not sessions_dir.exists():
        return "No session logs found."

    # An empty pattern matches at every position, which would fill the result
    # with arbitrary excerpts from the newest log.
    if not query or not query.strip():
        return "Empty search query — provide a keyword or phrase to search for."

    try:
        limit = int(limit)
    except (TypeError, ValueError):
        limit = 5
    limit = max(1, min(limit, 20))

    pattern = re.compile(re.escape(query), re.IGNORECASE)
    # File names are date stems, so reverse lexical order is newest first.
    session_files = sorted(sessions_dir.glob("*.md"), reverse=True)

    matches: list[str] = []
    for sf in session_files:
        if len(matches) >= limit:
            break
        try:
            # Explicit UTF-8 with replacement so one badly encoded log cannot
            # abort the whole search with a UnicodeDecodeError.
            text = sf.read_text(encoding="utf-8", errors="replace")
        except OSError:
            continue
        text_len = len(text)
        for m in pattern.finditer(text):
            if len(matches) >= limit:
                break
            start = max(0, m.start() - _SEARCH_EXCERPT_CHARS)
            end = min(text_len, m.end() + _SEARCH_EXCERPT_CHARS)
            excerpt = text[start:end].strip()
            # Ellipses mark excerpts that were cut out of a longer text.
            if start > 0:
                excerpt = "…" + excerpt
            if end < text_len:
                excerpt = excerpt + "…"
            matches.append(f"[{sf.stem}] {excerpt}")

    if not matches:
        return f"No matches for '{query}' across {len(session_files)} session logs."

    header = f"Session search: '{query}' — {len(matches)} match(es) across {len(session_files)} logs\n"
    return header + "\n\n".join(matches)
|
|
|
|
|
|
# Function declarations (name, description, parameter schema) for the tools
# implemented in this module, one entry per public coroutine above.
# The descriptions are shown to the model, so they must agree with what the
# implementations actually allow.
DECLARATIONS = [
    types.FunctionDeclaration(
        name="file_read",
        description=(
            "Read a local file and return its contents. "
            "Allowed directories: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/, "
            "and the Cortex home/ directory (persona memory, tool audit logs, etc.). "
            "Use this to read documentation, notes, CLAUDE.md files, config references, "
            "or tool audit logs at home/{user}/tool_audit/YYYY-MM-DD.jsonl. "
            "If given a directory path, returns a directory listing instead."
        ),
        parameters=types.Schema(
            type=types.Type.OBJECT,
            properties={
                "path": types.Schema(type=types.Type.STRING, description="Absolute or home-relative path to the file (e.g. ~/agents_sync/CLAUDE.md or /home/scott/agents_sync/tasks/01_todo/)"),
                # 500 is both the default and the ceiling: larger requests are clamped.
                "max_lines": types.Schema(type=types.Type.INTEGER, description="Optional line limit (default 500, which is also the maximum)"),
            },
            required=["path"],
        ),
    ),
    types.FunctionDeclaration(
        name="file_list",
        description=(
            "List the files and subdirectories in a directory. "
            # file_list shares file_read's allow-list, which includes the Cortex home root.
            "Allowed paths: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/, "
            "and the Cortex home/ directory. "
            "ADMIN ONLY."
        ),
        parameters=types.Schema(
            type=types.Type.OBJECT,
            properties={
                "path": types.Schema(type=types.Type.STRING, description="Absolute or home-relative path to the directory"),
            },
            required=["path"],
        ),
    ),
    types.FunctionDeclaration(
        name="file_write",
        description=(
            "Write or append content to a file. "
            "Write-allowed paths: ~/agents_sync/ and the Cortex home/ directory. "
            "Creates parent directories if needed. "
            "ADMIN ONLY. Requires user confirmation before executing."
        ),
        parameters=types.Schema(
            type=types.Type.OBJECT,
            properties={
                "path": types.Schema(type=types.Type.STRING, description="Absolute or home-relative path to write to"),
                "content": types.Schema(type=types.Type.STRING, description="Content to write"),
                "mode": types.Schema(type=types.Type.STRING, description="'overwrite' (default, replaces file) or 'append' (adds to end)"),
            },
            required=["path", "content"],
        ),
    ),
    types.FunctionDeclaration(
        name="session_read",
        description=(
            "Read a full session log by date (YYYY-MM-DD). Returns the complete conversation "
            "from that session — useful for continuity, recalling decisions, or reviewing "
            "what was discussed on a specific day. If the date is not found, lists available dates. "
            "Only reads this user's own sessions."
        ),
        parameters=types.Schema(
            type=types.Type.OBJECT,
            properties={
                "date": types.Schema(type=types.Type.STRING, description="Date in YYYY-MM-DD format (e.g. '2026-05-08')"),
            },
            required=["date"],
        ),
    ),
    types.FunctionDeclaration(
        name="session_search",
        description=(
            "Search past conversation session logs for a keyword or phrase. "
            "Use this to recall what was discussed in previous sessions — "
            "e.g. 'what did we decide about X?', 'when did we set up Y?'. "
            "Returns matching excerpts with session dates, newest first. "
            "Only searches this user's own sessions."
        ),
        parameters=types.Schema(
            type=types.Type.OBJECT,
            properties={
                "query": types.Schema(type=types.Type.STRING, description="Keyword or phrase to search for"),
                "limit": types.Schema(type=types.Type.INTEGER, description="Max results to return (default 5, max 20)"),
            },
            required=["query"],
        ),
    ),
]
|