Files
Cortex-Inara/cortex/tools/files.py
Scott Idem 334e7f0dea feat: role-based tool access, confirmation gates, and new orchestrator tools
- auth_utils: get_user_role() reads role from auth.json (admin|user, default user)
- manage_passwords: new `role` command to promote/demote users (admin-only by convention)
- tools/__init__: TOOL_ROLES map, CONFIRM_REQUIRED set, get_tools_for_role(),
  get_openai_tools_for_role() — both orchestrators now filter tools by caller's role
- tools/system: cortex_restart (detached subprocess, 5s delay), cortex_logs (admin-only)
- tools/web: http_fetch — direct URL fetch, distinct from web_search
- tools/files: file_list (directory listing), file_write (restricted paths, admin-only)
- tools/notify: nc_talk_send — proactive outbound via notification.py
- orchestrator_engine + openai_orchestrator: user_role param; CONFIRM_REQUIRED tools
  return a confirmation-request result instead of executing — loop breaks after Claude
  asks user to confirm in a follow-up message
- home/scott/auth.json: role set to admin

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 19:23:53 -04:00

215 lines
6.7 KiB
Python

"""
File read tool — restricted to known-safe directory roots.
Lets the orchestrator read local files (documentation, notes, config references)
without exposing arbitrary filesystem access. All paths are resolved and checked
against an allowlist of roots before any read is performed.
"""
import asyncio
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
# Directories the orchestrator is allowed to read from.
# Paths are resolved (symlinks followed, ~ expanded) at import time.
_ALLOWED_ROOTS: list[Path] = [
Path.home() / "agents_sync",
Path.home() / "OSIT_dev",
Path.home() / "DgrZone_Nextcloud",
Path.home() / "OSIT_Nextcloud",
]
# Hard cap on file size to prevent accidental context blowout
_MAX_BYTES = 50_000 # ~50 KB
_MAX_LINES = 500
async def file_read(path: str, max_lines: int | None = None) -> str:
"""Read a local file and return its contents as a string.
Only files within allowed directories can be read:
~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/
Args:
path: Absolute or home-relative path to the file (e.g. ~/agents_sync/CLAUDE.md).
max_lines: Optional line limit (default 500, hard cap). Use for large files.
Returns the file contents (truncated if over the size limit), or an error message.
"""
return await asyncio.to_thread(_sync_file_read, path, max_lines)
def _sync_file_read(path: str, max_lines: int | None) -> str:
# Expand ~ and resolve to absolute path
try:
resolved = Path(path).expanduser().resolve()
except Exception as e:
return f"Invalid path: {e}"
# Security check — must be under an allowed root
if not _is_allowed(resolved):
allowed_str = ", ".join(str(r) for r in _ALLOWED_ROOTS)
return (
f"Access denied: {resolved}\n"
f"Allowed directories: {allowed_str}"
)
if not resolved.exists():
return f"File not found: {resolved}"
if not resolved.is_file():
# If it's a directory, list its contents instead
try:
entries = sorted(resolved.iterdir())
names = [e.name + ("/" if e.is_dir() else "") for e in entries[:100]]
return f"Directory listing for {resolved}:\n" + "\n".join(names)
except Exception as e:
return f"Cannot list directory: {e}"
# Read the file
try:
raw = resolved.read_bytes()
except Exception as e:
return f"Read error: {e}"
# Binary files
try:
text = raw.decode("utf-8")
except UnicodeDecodeError:
return f"Binary file (not readable as text): {resolved} [{len(raw)} bytes]"
# Apply line limit
limit = min(max_lines or _MAX_LINES, _MAX_LINES)
lines = text.splitlines()
truncated = False
if len(lines) > limit:
lines = lines[:limit]
truncated = True
# Apply byte cap as a final safety net
result = "\n".join(lines)
if len(result) > _MAX_BYTES:
result = result[:_MAX_BYTES]
truncated = True
if truncated:
result += f"\n\n… [truncated — file has {len(text.splitlines())} lines total]"
return result
def _is_allowed(resolved: Path) -> bool:
"""Check that resolved path is under one of the allowed roots."""
for root in _ALLOWED_ROOTS:
try:
resolved.relative_to(root)
return True
except ValueError:
continue
return False
# Write is restricted to a tighter set of paths to limit blast radius.
_WRITE_ROOTS: list[Path] = [
Path.home() / "agents_sync",
]
def _is_write_allowed(resolved: Path) -> bool:
for root in _WRITE_ROOTS:
try:
resolved.relative_to(root)
return True
except ValueError:
continue
# Also allow the Cortex home/ directory (persona memory, tasks, etc.)
try:
from config import settings
cortex_home = settings.home_root()
resolved.relative_to(cortex_home)
return True
except (ValueError, Exception):
pass
return False
async def file_list(path: str) -> str:
"""List the contents of a directory.
Returns names of files and subdirectories with type indicators (/ for dirs).
Same allow-list as file_read.
"""
return await asyncio.to_thread(_sync_file_list, path)
def _sync_file_list(path: str) -> str:
try:
resolved = Path(path).expanduser().resolve()
except Exception as e:
return f"Invalid path: {e}"
if not _is_allowed(resolved):
allowed_str = ", ".join(str(r) for r in _ALLOWED_ROOTS)
return f"Access denied: {resolved}\nAllowed directories: {allowed_str}"
if not resolved.exists():
return f"Path not found: {resolved}"
if resolved.is_file():
return f"{resolved} is a file, not a directory. Use file_read to read it."
try:
entries = sorted(resolved.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
lines = []
for e in entries[:200]:
suffix = "/" if e.is_dir() else f" ({e.stat().st_size} bytes)" if e.is_file() else ""
lines.append(f"{e.name}{suffix}")
result = "\n".join(lines)
if len(entries) > 200:
result += f"\n… ({len(entries) - 200} more entries not shown)"
return f"Contents of {resolved}:\n\n{result}"
except Exception as e:
return f"Cannot list directory: {e}"
async def file_write(path: str, content: str, mode: str = "overwrite") -> str:
"""Write content to a file.
mode: 'overwrite' (default) replaces the file; 'append' adds to the end.
Write-allowed paths: ~/agents_sync/ and the Cortex home/ directory.
Parent directories are created if they don't exist.
"""
return await asyncio.to_thread(_sync_file_write, path, content, mode)
def _sync_file_write(path: str, content: str, mode: str) -> str:
try:
resolved = Path(path).expanduser().resolve()
except Exception as e:
return f"Invalid path: {e}"
if not _is_write_allowed(resolved):
return (
f"Write access denied: {resolved}\n"
f"Allowed write roots: ~/agents_sync/ and the Cortex home/ directory."
)
if mode not in ("overwrite", "append"):
return f"Invalid mode '{mode}' — use 'overwrite' or 'append'."
try:
resolved.parent.mkdir(parents=True, exist_ok=True)
if mode == "append":
with resolved.open("a", encoding="utf-8") as f:
f.write(content)
return f"Appended {len(content)} chars to {resolved}"
else:
resolved.write_text(content, encoding="utf-8")
return f"Wrote {len(content)} chars to {resolved}"
except Exception as e:
logger.error("file_write error for %s: %s", resolved, e)
return f"Write error: {e}"