""" File read tool — restricted to known-safe directory roots. Lets the orchestrator read local files (documentation, notes, config references) without exposing arbitrary filesystem access. All paths are resolved and checked against an allowlist of roots before any read is performed. """ import asyncio import logging from pathlib import Path logger = logging.getLogger(__name__) # Directories the orchestrator is allowed to read from. # Paths are resolved (symlinks followed, ~ expanded) at import time. _ALLOWED_ROOTS: list[Path] = [ Path.home() / "agents_sync", Path.home() / "OSIT_dev", Path.home() / "DgrZone_Nextcloud", Path.home() / "OSIT_Nextcloud", ] # Hard cap on file size to prevent accidental context blowout _MAX_BYTES = 50_000 # ~50 KB _MAX_LINES = 500 async def file_read(path: str, max_lines: int | None = None) -> str: """Read a local file and return its contents as a string. Only files within allowed directories can be read: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/ Args: path: Absolute or home-relative path to the file (e.g. ~/agents_sync/CLAUDE.md). max_lines: Optional line limit (default 500, hard cap). Use for large files. Returns the file contents (truncated if over the size limit), or an error message. """ return await asyncio.to_thread(_sync_file_read, path, max_lines) def _sync_file_read(path: str, max_lines: int | None) -> str: # Expand ~ and resolve to absolute path try: resolved = Path(path).expanduser().resolve() except Exception as e: return f"Invalid path: {e}" # Security check — must be under an allowed root if not _is_allowed(resolved): allowed_str = ", ".join(str(r) for r in _ALLOWED_ROOTS) return ( f"Access denied: {resolved}\n" f"Allowed directories: {allowed_str}" ) if not resolved.exists(): return f"File not found: {resolved}" if not resolved.is_file(): # If it's a directory, list its contents instead try: entries = sorted(resolved.iterdir()) names = [e.name + ("/" if e.is_dir() else "") for e in entries[:100]] return f"Directory listing for {resolved}:\n" + "\n".join(names) except Exception as e: return f"Cannot list directory: {e}" # Read the file try: raw = resolved.read_bytes() except Exception as e: return f"Read error: {e}" # Binary files try: text = raw.decode("utf-8") except UnicodeDecodeError: return f"Binary file (not readable as text): {resolved} [{len(raw)} bytes]" # Apply line limit limit = min(max_lines or _MAX_LINES, _MAX_LINES) lines = text.splitlines() truncated = False if len(lines) > limit: lines = lines[:limit] truncated = True # Apply byte cap as a final safety net result = "\n".join(lines) if len(result) > _MAX_BYTES: result = result[:_MAX_BYTES] truncated = True if truncated: result += f"\n\n… [truncated — file has {len(text.splitlines())} lines total]" return result def _is_allowed(resolved: Path) -> bool: """Check that resolved path is under one of the allowed roots.""" for root in _ALLOWED_ROOTS: try: resolved.relative_to(root) return True except ValueError: continue return False