Files
Cortex-Inara/cortex/tools/files.py
Scott Idem 584ae679a6 feat: tool call audit log
Every orchestrator tool invocation is recorded to home/{user}/tool_audit/YYYY-MM-DD.jsonl.
Each entry captures: timestamp, user, tool, args (truncated), status (ok/error/denied),
result length, and a 300-char result snippet.

- tool_audit.py: JSONL writer with per-file asyncio locks; read_recent / read_recent_all_users helpers
- tools/__init__.py: hook in call_tool() — fire-and-forget record on every dispatch
- routers/audit.py: GET /api/audit/recent and /api/audit/stats (admin-only)
- tools/files.py: add home_root() to file_read allowed roots so agents can read audit JSONL

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-05 19:55:59 -04:00

282 lines
9.5 KiB
Python

"""
File read tool — restricted to known-safe directory roots.
Lets the orchestrator read local files (documentation, notes, config references)
without exposing arbitrary filesystem access. All paths are resolved and checked
against an allowlist of roots before any read is performed.
"""
import asyncio
import logging
from pathlib import Path
from google.genai import types
logger = logging.getLogger(__name__)
# Directories the orchestrator is allowed to read from.
# Paths are resolved (symlinks followed, ~ expanded) at import time.
def _build_allowed_roots() -> list[Path]:
roots = [
Path.home() / "agents_sync",
Path.home() / "OSIT_dev",
Path.home() / "DgrZone_Nextcloud",
Path.home() / "OSIT_Nextcloud",
]
try:
from config import settings
roots.append(settings.home_root())
except Exception:
pass
return roots
_ALLOWED_ROOTS: list[Path] = _build_allowed_roots()
# Hard cap on file size to prevent accidental context blowout
_MAX_BYTES = 50_000 # ~50 KB
_MAX_LINES = 500
async def file_read(path: str, max_lines: int | None = None) -> str:
"""Read a local file and return its contents as a string.
Only files within allowed directories can be read:
~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/
Args:
path: Absolute or home-relative path to the file (e.g. ~/agents_sync/CLAUDE.md).
max_lines: Optional line limit (default 500, hard cap). Use for large files.
Returns the file contents (truncated if over the size limit), or an error message.
"""
return await asyncio.to_thread(_sync_file_read, path, max_lines)
def _sync_file_read(path: str, max_lines: int | None) -> str:
# Expand ~ and resolve to absolute path
try:
resolved = Path(path).expanduser().resolve()
except Exception as e:
return f"Invalid path: {e}"
# Security check — must be under an allowed root
if not _is_allowed(resolved):
allowed_str = ", ".join(str(r) for r in _ALLOWED_ROOTS)
return (
f"Access denied: {resolved}\n"
f"Allowed directories: {allowed_str}"
)
if not resolved.exists():
return f"File not found: {resolved}"
if not resolved.is_file():
# If it's a directory, list its contents instead
try:
entries = sorted(resolved.iterdir())
names = [e.name + ("/" if e.is_dir() else "") for e in entries[:100]]
return f"Directory listing for {resolved}:\n" + "\n".join(names)
except Exception as e:
return f"Cannot list directory: {e}"
# Read the file
try:
raw = resolved.read_bytes()
except Exception as e:
return f"Read error: {e}"
# Binary files
try:
text = raw.decode("utf-8")
except UnicodeDecodeError:
return f"Binary file (not readable as text): {resolved} [{len(raw)} bytes]"
# Apply line limit
limit = min(max_lines or _MAX_LINES, _MAX_LINES)
lines = text.splitlines()
truncated = False
if len(lines) > limit:
lines = lines[:limit]
truncated = True
# Apply byte cap as a final safety net
result = "\n".join(lines)
if len(result) > _MAX_BYTES:
result = result[:_MAX_BYTES]
truncated = True
if truncated:
result += f"\n\n… [truncated — file has {len(text.splitlines())} lines total]"
return result
def _is_allowed(resolved: Path) -> bool:
"""Check that resolved path is under one of the allowed roots."""
for root in _ALLOWED_ROOTS:
try:
resolved.relative_to(root)
return True
except ValueError:
continue
return False
# Write is restricted to a tighter set of paths to limit blast radius.
_WRITE_ROOTS: list[Path] = [
Path.home() / "agents_sync",
]
def _is_write_allowed(resolved: Path) -> bool:
for root in _WRITE_ROOTS:
try:
resolved.relative_to(root)
return True
except ValueError:
continue
# Also allow the Cortex home/ directory (persona memory, tasks, etc.)
try:
from config import settings
cortex_home = settings.home_root()
resolved.relative_to(cortex_home)
return True
except (ValueError, Exception):
pass
return False
async def file_list(path: str) -> str:
"""List the contents of a directory.
Returns names of files and subdirectories with type indicators (/ for dirs).
Same allow-list as file_read.
"""
return await asyncio.to_thread(_sync_file_list, path)
def _sync_file_list(path: str) -> str:
try:
resolved = Path(path).expanduser().resolve()
except Exception as e:
return f"Invalid path: {e}"
if not _is_allowed(resolved):
allowed_str = ", ".join(str(r) for r in _ALLOWED_ROOTS)
return f"Access denied: {resolved}\nAllowed directories: {allowed_str}"
if not resolved.exists():
return f"Path not found: {resolved}"
if resolved.is_file():
return f"{resolved} is a file, not a directory. Use file_read to read it."
try:
entries = sorted(resolved.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
lines = []
for e in entries[:200]:
suffix = "/" if e.is_dir() else f" ({e.stat().st_size} bytes)" if e.is_file() else ""
lines.append(f"{e.name}{suffix}")
result = "\n".join(lines)
if len(entries) > 200:
result += f"\n… ({len(entries) - 200} more entries not shown)"
return f"Contents of {resolved}:\n\n{result}"
except Exception as e:
return f"Cannot list directory: {e}"
async def file_write(path: str, content: str, mode: str = "overwrite") -> str:
"""Write content to a file.
mode: 'overwrite' (default) replaces the file; 'append' adds to the end.
Write-allowed paths: ~/agents_sync/ and the Cortex home/ directory.
Parent directories are created if they don't exist.
"""
return await asyncio.to_thread(_sync_file_write, path, content, mode)
def _sync_file_write(path: str, content: str, mode: str) -> str:
try:
resolved = Path(path).expanduser().resolve()
except Exception as e:
return f"Invalid path: {e}"
if not _is_write_allowed(resolved):
return (
f"Write access denied: {resolved}\n"
f"Allowed write roots: ~/agents_sync/ and the Cortex home/ directory."
)
if mode not in ("overwrite", "append"):
return f"Invalid mode '{mode}' — use 'overwrite' or 'append'."
try:
resolved.parent.mkdir(parents=True, exist_ok=True)
if mode == "append":
with resolved.open("a", encoding="utf-8") as f:
f.write(content)
return f"Appended {len(content)} chars to {resolved}"
else:
resolved.write_text(content, encoding="utf-8")
return f"Wrote {len(content)} chars to {resolved}"
except Exception as e:
logger.error("file_write error for %s: %s", resolved, e)
return f"Write error: {e}"
DECLARATIONS = [
types.FunctionDeclaration(
name="file_read",
description=(
"Read a local file and return its contents. "
"Allowed directories: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/, "
"and the Cortex home/ directory (persona memory, tool audit logs, etc.). "
"Use this to read documentation, notes, CLAUDE.md files, config references, "
"or tool audit logs at home/{user}/tool_audit/YYYY-MM-DD.jsonl. "
"If given a directory path, returns a directory listing instead."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"path": types.Schema(type=types.Type.STRING, description="Absolute or home-relative path to the file (e.g. ~/agents_sync/CLAUDE.md or /home/scott/agents_sync/tasks/01_todo/)"),
"max_lines": types.Schema(type=types.Type.INTEGER, description="Optional line limit (default 500)"),
},
required=["path"],
),
),
types.FunctionDeclaration(
name="file_list",
description=(
"List the files and subdirectories in a directory. "
"Allowed paths: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/. "
"ADMIN ONLY."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"path": types.Schema(type=types.Type.STRING, description="Absolute or home-relative path to the directory"),
},
required=["path"],
),
),
types.FunctionDeclaration(
name="file_write",
description=(
"Write or append content to a file. "
"Write-allowed paths: ~/agents_sync/ and the Cortex home/ directory. "
"Creates parent directories if needed. "
"ADMIN ONLY. Requires user confirmation before executing."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"path": types.Schema(type=types.Type.STRING, description="Absolute or home-relative path to write to"),
"content": types.Schema(type=types.Type.STRING, description="Content to write"),
"mode": types.Schema(type=types.Type.STRING, description="'overwrite' (default, replaces file) or 'append' (adds to end)"),
},
required=["path", "content"],
),
),
]