Files
Cortex-Inara/cortex/tools/files.py
Scott Idem ac06b3bc7b feat: project-scoped file tools — grep, stat, syntax_check, offset reads
Add five project-scoped tools (user-level, no admin required):
  project_file_read — read with 1-based offset for paging large files
  project_file_list — list with sizes + timestamps
  file_stat         — size, modified time, line count / entry count
  file_grep         — regex search with context lines, up to 50 matches
  file_syntax_check — py_compile (.py) or json.loads (.json)

Also add offset support to existing file_read (system scope).
Rename "Files" tool category to "System Files"; add "Project Files" category.
Project scope restricted to Cortex_and_Inara_dev/ project root.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-11 22:23:50 -04:00

742 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
File read/write/search tools — two access scopes.
Project scope (no admin required):
project_file_read — read a file with optional line-range (offset)
project_file_list — list a directory with sizes + timestamps
file_stat — size, modified time, line count for a path
file_grep — regex search with context lines; up to 50 matches
file_syntax_check — py_compile (.py) or json.loads (.json) check
System scope (admin-only):
file_read — read a file from ~/agents_sync/, ~/OSIT_dev/, etc.
file_list — list a directory (same roots)
file_write — write/append (~/agents_sync/ + Cortex home/)
Session tools (user-level, persona-isolated):
session_read — read a session log by date
session_search — keyword search across session logs
All project-scope tools are restricted to the Cortex project root:
~/agents_sync/projects/Cortex_and_Inara_dev/
"""
import asyncio
import json
import logging
import re
import subprocess
from datetime import datetime
from pathlib import Path
from google.genai import types
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# ── Access roots ──────────────────────────────────────────────────────────────
# Project root: three `.parent` hops from this file (files.py → tools/ → cortex/ →
# project), i.e. two directories above cortex/tools/ → Cortex_and_Inara_dev/.
# Resolved once at import time; _is_project_allowed() compares resolved paths against it.
_PROJECT_ROOT: Path = Path(__file__).parent.parent.parent.resolve()
# System-wide read roots
def _build_allowed_roots() -> list[Path]:
    """Build the list of directory roots the system-scope read tools may access.

    The fixed roots live under the current user's home directory. The Cortex
    home root from ``config.settings.home_root()`` is appended when that
    module can be imported; failure to obtain it is best-effort and only
    logged at debug level.

    Every root is resolved, because callers resolve candidate paths before
    comparing them. Without this, a root that is (or sits below) a symlink
    would never match any resolved candidate.

    Returns:
        Resolved root directories, fixed roots first.
    """
    roots = [
        Path.home() / "agents_sync",
        Path.home() / "OSIT_dev",
        Path.home() / "DgrZone_Nextcloud",
        Path.home() / "OSIT_Nextcloud",
    ]
    try:
        from config import settings

        # Path(...) tolerates home_root() returning either a str or a Path.
        roots.append(Path(settings.home_root()))
    except Exception as exc:
        # Deliberately best-effort: the fixed roots still work without config.
        logging.getLogger(__name__).debug("Cortex home root unavailable: %s", exc)

    resolved_roots: list[Path] = []
    for root in roots:
        try:
            resolved_roots.append(root.expanduser().resolve())
        except (OSError, RuntimeError):
            # Keep the unresolved root rather than dropping it entirely.
            resolved_roots.append(root)
    return resolved_roots
# Read roots for the system-scope tools; computed once at import time and
# consulted by _is_allowed().
_ALLOWED_ROOTS: list[Path] = _build_allowed_roots()
# Write is tighter (_is_write_allowed() additionally accepts settings.home_root()).
_WRITE_ROOTS: list[Path] = [Path.home() / "agents_sync"]
# Size limits
# Cap on the text returned by _read_impl(); it is compared against len() of a
# str, so it counts characters rather than encoded bytes.
_MAX_BYTES = 50_000
# Upper bound (and default) for the number of lines returned by one read.
_MAX_LINES = 500
# Match budget for a single file_grep call.
_MAX_GREP_MATCHES = 50
def _is_project_allowed(resolved: Path) -> bool:
    """Return True when *resolved* is the project root or lies beneath it.

    The comparison is purely lexical, so callers are expected to pass an
    already-resolved path.
    """
    return resolved.is_relative_to(_PROJECT_ROOT)
def _is_allowed(resolved: Path) -> bool:
    """Return True when *resolved* lies under any of the system read roots.

    Each entry of ``_ALLOWED_ROOTS`` is tried in turn; the first root that
    contains the path grants access.
    """
    return any(resolved.is_relative_to(root) for root in _ALLOWED_ROOTS)
def _is_write_allowed(resolved: Path) -> bool:
    """Return True when *resolved* may be written to.

    A path is writable if it lies under one of ``_WRITE_ROOTS`` or under the
    Cortex home root reported by ``config.settings.home_root()``. Any failure
    while consulting the config (import error, lookup error, or the path not
    being under that root) counts as "not allowed".
    """
    if any(resolved.is_relative_to(root) for root in _WRITE_ROOTS):
        return True
    try:
        from config import settings

        home_root = settings.home_root()
        resolved.relative_to(home_root)
    except Exception:
        # Covers ValueError (outside the home root) as well as config problems.
        return False
    return True
# ── Shared implementations ────────────────────────────────────────────────────
def _read_impl(path_str: str, offset: int | None, max_lines: int | None, is_allowed_fn) -> str:
try:
resolved = Path(path_str).expanduser().resolve()
except Exception as e:
return f"Invalid path: {e}"
if not is_allowed_fn(resolved):
return f"Access denied: {resolved}"
if not resolved.exists():
return f"File not found: {resolved}"
if not resolved.is_file():
try:
entries = sorted(resolved.iterdir())
names = [e.name + ("/" if e.is_dir() else "") for e in entries[:100]]
return f"Directory listing for {resolved}:\n" + "\n".join(names)
except Exception as e:
return f"Cannot list directory: {e}"
try:
raw = resolved.read_bytes()
except Exception as e:
return f"Read error: {e}"
try:
text = raw.decode("utf-8")
except UnicodeDecodeError:
return f"Binary file (not readable as text): {resolved} [{len(raw)} bytes]"
all_lines = text.splitlines()
total = len(all_lines)
# offset is 1-based; default = start of file
start = max(0, (offset or 1) - 1)
working = all_lines[start:]
limit = min(max_lines or _MAX_LINES, _MAX_LINES)
truncated = False
if len(working) > limit:
working = working[:limit]
truncated = True
result = "\n".join(working)
if len(result) > _MAX_BYTES:
result = result[:_MAX_BYTES]
truncated = True
end_line = start + len(working)
header = f"[Lines {start + 1}{end_line} of {total}]\n" if (start > 0 or truncated) else ""
trailer = f"\n\n… [truncated — file has {total} lines; use offset={end_line + 1} to read more]" if truncated else ""
return header + result + trailer
def _list_impl(path_str: str, is_allowed_fn) -> str:
    """List a directory after an access check, directories first.

    Args:
        path_str: Directory path; ``~`` is expanded and the path is resolved
            before ``is_allowed_fn`` is consulted.
        is_allowed_fn: Callable taking the resolved ``Path`` and returning a
            truthy value when access is permitted.

    Returns:
        A ``Contents of <path>:`` report with one line per entry (at most 200;
        files carry size and modification time), or a one-line error message.
    """
    try:
        target = Path(path_str).expanduser().resolve()
    except Exception as exc:
        return f"Invalid path: {exc}"
    if not is_allowed_fn(target):
        return f"Access denied: {target}"
    if not target.exists():
        return f"Path not found: {target}"
    if target.is_file():
        return f"{target} is a file. Use file_read / project_file_read to read it."

    def _describe(entry: Path) -> str:
        # Directories get a trailing slash; files get "(size, mtime)" when stat works.
        if entry.is_dir():
            return f"{entry.name}/"
        try:
            info = entry.stat()
            stamp = datetime.fromtimestamp(info.st_mtime).strftime("%Y-%m-%d %H:%M")
            detail = f" ({info.st_size:,} B, {stamp})"
        except Exception:
            detail = ""
        return f"{entry.name}{detail}"

    try:
        # (False, name) sorts before (True, name): non-files first, then files,
        # each group case-insensitively by name.
        children = sorted(target.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
        body = "\n".join(_describe(child) for child in children[:200])
        hidden = len(children) - 200
        if hidden > 0:
            body += f"\n… ({hidden} more not shown)"
        return f"Contents of {target}:\n\n{body}"
    except Exception as exc:
        return f"Cannot list directory: {exc}"
# ── Project-scoped tools ──────────────────────────────────────────────────────
async def project_file_read(path: str, offset: int | None = None, max_lines: int | None = None) -> str:
    """Read a file inside the Cortex project directory, optionally from a line offset.

    Runs ``_read_impl`` in a worker thread with ``_is_project_allowed`` as the
    access check, so the blocking file I/O stays off the event loop.
    """
    text = await asyncio.to_thread(
        _read_impl,
        path,
        offset,
        max_lines,
        _is_project_allowed,
    )
    return text
async def project_file_list(path: str) -> str:
    """List a directory inside the Cortex project directory with sizes and timestamps.

    Runs ``_list_impl`` in a worker thread with ``_is_project_allowed`` as the
    access check.
    """
    listing = await asyncio.to_thread(_list_impl, path, _is_project_allowed)
    return listing
async def file_stat(path: str) -> str:
    """Return type, size, modified time and line/entry count for a project path.

    Delegates to ``_sync_file_stat`` in a worker thread.
    """
    report = await asyncio.to_thread(_sync_file_stat, path)
    return report
def _sync_file_stat(path_str: str) -> str:
    """Describe a project-scoped path as a small multi-line report.

    The report always has ``Path``, ``Type``, ``Size`` and ``Modified`` lines.
    Files that do not look binary (no NUL in the first 1024 bytes) also get a
    ``Lines`` count; directories get a ``Contents`` summary. Failures while
    gathering those extras are ignored and the line is simply omitted.

    Returns:
        The report, or a one-line/two-line error message when the path is
        invalid, outside the project root, missing, or cannot be stat'ed.
    """
    try:
        target = Path(path_str).expanduser().resolve()
    except Exception as exc:
        return f"Invalid path: {exc}"
    if not _is_project_allowed(target):
        return f"Access denied: {target}\nProject root: {_PROJECT_ROOT}"
    if not target.exists():
        return f"Path not found: {target}"
    try:
        info = target.stat()
    except Exception as exc:
        return f"Cannot stat: {exc}"

    stamp = datetime.fromtimestamp(info.st_mtime).strftime("%Y-%m-%d %H:%M:%S")
    kind = "directory" if target.is_dir() else "file"
    report = [
        f"Path: {target}",
        f"Type: {kind}",
        f"Size: {info.st_size:,} bytes",
        f"Modified: {stamp}",
    ]

    if target.is_file():
        try:
            data = target.read_bytes()
            looks_binary = b'\x00' in data[:1024]
            if not looks_binary:
                line_count = len(data.decode('utf-8', errors='replace').splitlines())
                report.append(f"Lines: {line_count}")
        except Exception:
            pass  # best-effort extra; the basic report is still returned
    elif target.is_dir():
        try:
            children = list(target.iterdir())
            file_total = len([c for c in children if c.is_file()])
            dir_total = len([c for c in children if c.is_dir()])
            ending = "y" if dir_total == 1 else "ies"
            report.append(f"Contents: {file_total} file(s), {dir_total} subdirector{ending}")
        except Exception:
            pass  # best-effort extra; the basic report is still returned
    return "\n".join(report)
async def file_grep(path: str, pattern: str, context_lines: int = 2, recursive: bool = True) -> str:
    """Search a project file or directory for a regex, returning matches with context.

    Delegates to ``_sync_file_grep`` in a worker thread.
    """
    found = await asyncio.to_thread(
        _sync_file_grep,
        path,
        pattern,
        context_lines,
        recursive,
    )
    return found
def _sync_file_grep(path_str: str, pattern: str, context_lines: int, recursive: bool) -> str:
    """Case-insensitive regex search over one file or a directory tree.

    Args:
        path_str: File or directory to search; must resolve inside the
            project root.
        pattern: Regular expression, compiled with ``re.IGNORECASE``.
        context_lines: Lines of context on each side of a match, clamped to 0..5.
        recursive: For a directory, descend into subdirectories when true;
            otherwise search only its direct files.

    Returns:
        A header with the match count followed by one section per file.
        Matching lines are marked with ``>`` after the line number, context
        lines with a space, and ``···`` separates non-adjacent excerpts.
        At most ``_MAX_GREP_MATCHES`` matches are reported in total. Errors
        are returned as messages rather than raised.
    """
    try:
        resolved = Path(path_str).expanduser().resolve()
    except Exception as e:
        return f"Invalid path: {e}"
    if not _is_project_allowed(resolved):
        return f"Access denied: {resolved}\nProject root: {_PROJECT_ROOT}"
    if not resolved.exists():
        return f"Path not found: {resolved}"
    try:
        regex = re.compile(pattern, re.IGNORECASE)
    except re.error as e:
        return f"Invalid regex pattern: {e}"
    ctx = max(0, min(context_lines, 5))

    try:
        if resolved.is_file():
            files_to_search = [resolved]
        elif recursive:
            files_to_search = sorted(f for f in resolved.rglob("*") if f.is_file())
        else:
            files_to_search = sorted(f for f in resolved.iterdir() if f.is_file())
    except OSError as e:
        return f"Cannot search {resolved}: {e}"

    total_matches = 0
    sections: list[str] = []
    capped = False
    for fp in files_to_search:
        remaining = _MAX_GREP_MATCHES - total_matches
        if remaining <= 0:
            capped = True
            break
        try:
            # A symlink inside the tree may point outside the project root;
            # re-check the real location before reading anything.
            if not _is_project_allowed(fp.resolve()):
                continue
            raw = fp.read_bytes()
        except (OSError, RuntimeError):
            continue
        if b'\x00' in raw[:1024]:
            continue  # skip binary
        file_lines = raw.decode("utf-8", errors="replace").splitlines()
        match_indices = [i for i, line in enumerate(file_lines) if regex.search(line)]
        if not match_indices:
            continue
        if len(match_indices) > remaining:
            # Enforce the cap inside a single file too, not just between files.
            match_indices = match_indices[:remaining]
            capped = True
        total_matches += len(match_indices)
        try:
            label = str(fp.relative_to(_PROJECT_ROOT))
        except ValueError:
            label = str(fp)

        file_output = [f"── {label} ──"]
        matched = set(match_indices)
        last_printed = -1  # index of the last line emitted for this file
        for mi in match_indices:
            start = max(0, mi - ctx)
            end = min(len(file_lines), mi + ctx + 1)
            if last_printed >= 0 and start > last_printed + 1:
                file_output.append(" ···")
            # Match indices ascend, so anything <= last_printed is already out.
            for j in range(max(start, last_printed + 1), end):
                # Mark by membership so a match first shown as context of an
                # earlier match is still flagged.
                marker = ">" if j in matched else " "
                file_output.append(f" {j + 1:4d}{marker} {file_lines[j]}")
                last_printed = j
        sections.append("\n".join(file_output))

    if not sections:
        return f"No matches for '{pattern}' in {resolved}"
    cap_note = f" (capped at {_MAX_GREP_MATCHES})" if capped else ""
    header = f"grep '{pattern}' — {total_matches} match(es){cap_note}:"
    return header + "\n\n" + "\n\n".join(sections)
async def file_syntax_check(path: str) -> str:
    """Check the syntax of a Python (.py) or JSON (.json) file in the project.

    Delegates to ``_sync_file_syntax_check`` in a worker thread.
    """
    verdict = await asyncio.to_thread(_sync_file_syntax_check, path)
    return verdict
def _sync_file_syntax_check(path_str: str) -> str:
    """Syntax-check a project-scoped ``.py`` or ``.json`` file.

    Python files are compiled with ``py_compile`` in a child process (15 s
    timeout); JSON files are parsed with ``json.loads``. Other suffixes are
    reported as unsupported.

    Returns:
        ``OK — <name>: ...`` on success, otherwise a message describing the
        syntax error, the access problem, or the failure to run the check.
    """
    try:
        resolved = Path(path_str).expanduser().resolve()
    except Exception as e:
        return f"Invalid path: {e}"
    if not _is_project_allowed(resolved):
        return f"Access denied: {resolved}\nProject root: {_PROJECT_ROOT}"
    if not resolved.exists():
        return f"File not found: {resolved}"
    if not resolved.is_file():
        return f"Not a file: {resolved}"

    suffix = resolved.suffix.lower()
    if suffix == ".py":
        import sys

        # Use the interpreter that is running this process: a bare "python3"
        # from PATH may be a different version (or missing inside a venv or
        # service environment) and would judge the syntax by the wrong grammar.
        interpreter = sys.executable or "python3"
        try:
            # NOTE: py_compile writes a cached .pyc next to the source as a side effect.
            result = subprocess.run(
                [interpreter, "-m", "py_compile", str(resolved)],
                capture_output=True, text=True, timeout=15,
            )
        except subprocess.TimeoutExpired:
            return f"Timeout running py_compile on {resolved.name}"
        except Exception as e:
            return f"Error: {e}"
        if result.returncode == 0:
            return f"OK — {resolved.name}: syntax valid"
        err = (result.stderr or result.stdout).strip()
        return f"Syntax error in {resolved.name}:\n{err}"

    if suffix == ".json":
        try:
            text = resolved.read_text(encoding="utf-8")
            json.loads(text)
        except json.JSONDecodeError as e:
            return f"JSON error in {resolved.name}: {e}"
        except Exception as e:
            return f"Error reading {resolved.name}: {e}"
        return f"OK — {resolved.name}: valid JSON"

    return f"Syntax check not supported for '{suffix}' files. Supported: .py, .json"
# ── System-scoped tools ───────────────────────────────────────────────────────
async def file_read(path: str, offset: int | None = None, max_lines: int | None = None) -> str:
    """Read a local file from the broader system roots (~/agents_sync/, ~/OSIT_dev/, etc.). ADMIN ONLY.

    Runs ``_read_impl`` in a worker thread with ``_is_allowed`` as the access
    check. The admin restriction is not enforced here — presumably the tool
    dispatcher does that; confirm against the caller.
    """
    text = await asyncio.to_thread(
        _read_impl,
        path,
        offset,
        max_lines,
        _is_allowed,
    )
    return text
async def file_list(path: str) -> str:
    """List a directory from the broader system roots. ADMIN ONLY.

    Runs ``_list_impl`` in a worker thread with ``_is_allowed`` as the access
    check. The admin restriction is not enforced here — presumably the tool
    dispatcher does that; confirm against the caller.
    """
    listing = await asyncio.to_thread(_list_impl, path, _is_allowed)
    return listing
async def file_write(path: str, content: str, mode: str = "overwrite") -> str:
    """Write or append content to a file under ~/agents_sync/ or the Cortex home/. ADMIN ONLY.

    Delegates to ``_sync_file_write`` in a worker thread. The admin
    restriction is not enforced here — presumably the tool dispatcher does
    that; confirm against the caller.
    """
    outcome = await asyncio.to_thread(_sync_file_write, path, content, mode)
    return outcome
def _sync_file_write(path: str, content: str, mode: str) -> str:
    """Write *content* to *path*, replacing or appending according to *mode*.

    Args:
        path: Destination; ``~`` is expanded and the path is resolved before
            the write-permission check.
        content: Text to write, encoded as UTF-8.
        mode: ``"overwrite"`` to replace the file or ``"append"`` to add to its end.

    Returns:
        A confirmation naming the character count and resolved path, or a
        message describing why the write was refused or failed. Missing parent
        directories are created. I/O failures are logged and reported, not raised.
    """
    try:
        target = Path(path).expanduser().resolve()
    except Exception as exc:
        return f"Invalid path: {exc}"

    if not _is_write_allowed(target):
        denial = [
            f"Write access denied: {target}",
            "Allowed write roots: ~/agents_sync/ and the Cortex home/ directory.",
        ]
        return "\n".join(denial)
    if mode not in ("overwrite", "append"):
        return f"Invalid mode '{mode}' — use 'overwrite' or 'append'."

    appending = mode == "append"
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        with target.open("a" if appending else "w", encoding="utf-8") as handle:
            handle.write(content)
        verb = "Appended" if appending else "Wrote"
        return f"{verb} {len(content)} chars to {target}"
    except Exception as exc:
        logger.error("file_write error for %s: %s", target, exc)
        return f"Write error: {exc}"
# ── Session tools ─────────────────────────────────────────────────────────────
# Characters of surrounding text kept on each side of a session_search match.
_SEARCH_EXCERPT_CHARS = 150
async def session_read(date: str) -> str:
    """Read a full session log by date (YYYY-MM-DD).

    Surrounding whitespace is stripped from *date* before delegating to
    ``_sync_session_read`` in a worker thread.
    """
    wanted = date.strip()
    log_text = await asyncio.to_thread(_sync_session_read, wanted)
    return log_text
def _sync_session_read(date: str) -> str:
    """Return the session log named ``<date>.md`` from the persona's sessions directory.

    Args:
        date: Log name, normally ``YYYY-MM-DD``. It comes from a tool call, so
            it is treated as untrusted: anything that is not a plain file name
            (path separators, ``..`` components, absolute paths) is never
            opened and is handled like a date with no log.

    Returns:
        The log content with a short header, or — when no such log exists —
        a message listing up to 15 available dates, newest first.
    """
    from persona import persona_path

    sessions_dir = persona_path() / "sessions"
    if not sessions_dir.exists():
        return "No session logs found."

    file_name = f"{date}.md"
    # Path(...).name differs from the input whenever it contains a directory
    # part, which is exactly what a traversal attempt needs.
    is_plain_name = Path(file_name).name == file_name
    target = sessions_dir / file_name
    if is_plain_name and target.is_file():
        content = target.read_text()
        return f"Session log for {date} ({len(content)} chars):\n\n{content}"

    available = sorted((f.stem for f in sessions_dir.glob("*.md")), reverse=True)
    if not available:
        return "No session logs found."
    recent = "\n".join(f" {d}" for d in available[:15])
    return f"No session log found for '{date}'. Available dates (most recent first):\n{recent}"
async def session_search(query: str, limit: int = 5) -> str:
    """Search past session logs for a keyword or phrase.

    Delegates to ``_sync_session_search`` in a worker thread.
    """
    hits = await asyncio.to_thread(_sync_session_search, query, limit)
    return hits
def _sync_session_search(query: str, limit: int) -> str:
    """Find literal, case-insensitive occurrences of *query* in the session logs.

    Args:
        query: Text to look for; it is escaped, so regex metacharacters are
            matched literally. A blank query is rejected because it would
            match at every position.
        limit: Maximum number of excerpts, clamped to 1..20.

    Returns:
        A header plus one ``[<log name>] <excerpt>`` entry per hit, newest log
        first. Each excerpt keeps ``_SEARCH_EXCERPT_CHARS`` characters on each
        side of the hit and is marked with ``…`` where text was cut off.
        Returns a plain message when nothing matches or no logs exist.
    """
    from persona import persona_path

    sessions_dir = persona_path() / "sessions"
    if not sessions_dir.exists():
        return "No session logs found."
    if not query or not query.strip():
        return "Empty search query: provide a keyword or phrase."

    limit = max(1, min(limit, 20))
    pattern = re.compile(re.escape(query), re.IGNORECASE)
    session_files = sorted(sessions_dir.glob("*.md"), reverse=True)
    matches = []
    for sf in session_files:
        if len(matches) >= limit:
            break
        try:
            text = sf.read_text()
        except (OSError, UnicodeError):
            # One unreadable or mis-encoded log must not abort the whole search.
            continue
        for m in pattern.finditer(text):
            if len(matches) >= limit:
                break
            start = max(0, m.start() - _SEARCH_EXCERPT_CHARS)
            end = min(len(text), m.end() + _SEARCH_EXCERPT_CHARS)
            excerpt = text[start:end].strip()
            if start > 0:
                excerpt = "…" + excerpt
            if end < len(text):
                excerpt = excerpt + "…"
            matches.append(f"[{sf.stem}] {excerpt}")

    if not matches:
        return f"No matches for '{query}' across {len(session_files)} session logs."
    header = f"Session search: '{query}' — {len(matches)} match(es) across {len(session_files)} logs\n"
    return header + "\n\n".join(matches)
# ── Declarations ──────────────────────────────────────────────────────────────
# Tool schemas, one google.genai FunctionDeclaration per async tool function
# defined in this module. Each `name` matches the function of the same name and
# each parameter schema mirrors that function's signature; parameters that have
# defaults in the signature are left out of `required`.
DECLARATIONS = [
    # Project-scoped
    types.FunctionDeclaration(
        name="project_file_read",
        description=(
            "Read a file within the Cortex project directory (source code, docs, config, persona files). "
            "Supports reading a specific line range via offset — use to page through large files "
            "without re-reading from the top. If given a directory path, returns a listing instead. "
            "Project root: ~/agents_sync/projects/Cortex_and_Inara_dev/"
        ),
        parameters=types.Schema(
            type=types.Type.OBJECT,
            properties={
                "path": types.Schema(
                    type=types.Type.STRING,
                    description="Absolute or ~/... path to the file",
                ),
                "offset": types.Schema(
                    type=types.Type.INTEGER,
                    description="Start reading from this line number (1-based). Omit to read from the top.",
                ),
                "max_lines": types.Schema(
                    type=types.Type.INTEGER,
                    description="Maximum lines to return (default 500)",
                ),
            },
            required=["path"],
        ),
    ),
    types.FunctionDeclaration(
        name="project_file_list",
        description=(
            "List files and subdirectories within the Cortex project directory. "
            "Shows file sizes and modified timestamps. "
            "Project root: ~/agents_sync/projects/Cortex_and_Inara_dev/"
        ),
        parameters=types.Schema(
            type=types.Type.OBJECT,
            properties={
                "path": types.Schema(
                    type=types.Type.STRING,
                    description="Absolute or ~/... path to the directory",
                ),
            },
            required=["path"],
        ),
    ),
    types.FunctionDeclaration(
        name="file_stat",
        description=(
            "Get metadata for a file or directory: type, size, modified timestamp, line count (for text files) "
            "or entry counts (for directories). Use before reading to check recency or size. "
            "Restricted to the Cortex project directory."
        ),
        parameters=types.Schema(
            type=types.Type.OBJECT,
            properties={
                "path": types.Schema(
                    type=types.Type.STRING,
                    description="Absolute or ~/... path to the file or directory",
                ),
            },
            required=["path"],
        ),
    ),
    types.FunctionDeclaration(
        name="file_grep",
        description=(
            "Search for a regex pattern in a file or directory, returning matching lines with surrounding "
            "context. Much more efficient than reading an entire source file — use this to find function "
            "definitions, variable names, TODO comments, imports, error strings, etc. "
            "Searches recursively by default. Capped at 50 matches. Skips binary files. "
            "Case-insensitive. Restricted to the Cortex project directory."
        ),
        parameters=types.Schema(
            type=types.Type.OBJECT,
            properties={
                "path": types.Schema(
                    type=types.Type.STRING,
                    description="File or directory to search (e.g. ~/agents_sync/projects/Cortex_and_Inara_dev/cortex/)",
                ),
                "pattern": types.Schema(
                    type=types.Type.STRING,
                    description="Regex pattern to search for (case-insensitive). Examples: 'def ha_', 'import httpx', 'TODO'",
                ),
                "context_lines": types.Schema(
                    type=types.Type.INTEGER,
                    description="Lines of context before/after each match (default 2, max 5)",
                ),
                "recursive": types.Schema(
                    type=types.Type.BOOLEAN,
                    description="Search subdirectories recursively (default true)",
                ),
            },
            required=["path", "pattern"],
        ),
    ),
    types.FunctionDeclaration(
        name="file_syntax_check",
        description=(
            "Check the syntax of a Python (.py) or JSON (.json) file without executing it. "
            "Returns OK or the error with line number. "
            "Use after editing a file before restarting Cortex. "
            "Restricted to the Cortex project directory."
        ),
        parameters=types.Schema(
            type=types.Type.OBJECT,
            properties={
                "path": types.Schema(
                    type=types.Type.STRING,
                    description="Path to the .py or .json file to check",
                ),
            },
            required=["path"],
        ),
    ),
    # System-scoped
    types.FunctionDeclaration(
        name="file_read",
        description=(
            "Read a local file from the broader system (~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, "
            "~/OSIT_Nextcloud/, Cortex home/). Supports offset for reading specific line ranges. "
            "For files within the Cortex project, prefer project_file_read instead. "
            "ADMIN ONLY."
        ),
        parameters=types.Schema(
            type=types.Type.OBJECT,
            properties={
                "path": types.Schema(
                    type=types.Type.STRING,
                    description="Absolute or ~/... path to the file",
                ),
                "offset": types.Schema(
                    type=types.Type.INTEGER,
                    description="Start reading from this line number (1-based)",
                ),
                "max_lines": types.Schema(
                    type=types.Type.INTEGER,
                    description="Maximum lines to return (default 500)",
                ),
            },
            required=["path"],
        ),
    ),
    types.FunctionDeclaration(
        name="file_list",
        description=(
            "List files and subdirectories from the broader system. "
            "Shows sizes and modified timestamps. "
            "Allowed: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/. "
            "ADMIN ONLY."
        ),
        parameters=types.Schema(
            type=types.Type.OBJECT,
            properties={
                "path": types.Schema(
                    type=types.Type.STRING,
                    description="Absolute or ~/... path to the directory",
                ),
            },
            required=["path"],
        ),
    ),
    types.FunctionDeclaration(
        name="file_write",
        description=(
            "Write or append content to a file. "
            "Write-allowed paths: ~/agents_sync/ and the Cortex home/ directory. "
            "Creates parent directories if needed. "
            "ADMIN ONLY. Requires user confirmation before executing."
        ),
        parameters=types.Schema(
            type=types.Type.OBJECT,
            properties={
                "path": types.Schema(
                    type=types.Type.STRING,
                    description="Absolute or ~/... path to write to",
                ),
                "content": types.Schema(
                    type=types.Type.STRING,
                    description="Content to write",
                ),
                "mode": types.Schema(
                    type=types.Type.STRING,
                    description="'overwrite' (default, replaces file) or 'append' (adds to end)",
                ),
            },
            required=["path", "content"],
        ),
    ),
    # Session tools
    types.FunctionDeclaration(
        name="session_read",
        description=(
            "Read a full conversation session log by date (YYYY-MM-DD). "
            "Useful for continuity and recalling past decisions. "
            "If the date is not found, lists available dates. "
            "Only reads this user's own sessions."
        ),
        parameters=types.Schema(
            type=types.Type.OBJECT,
            properties={
                "date": types.Schema(
                    type=types.Type.STRING,
                    description="Date in YYYY-MM-DD format (e.g. '2026-05-08')",
                ),
            },
            required=["date"],
        ),
    ),
    types.FunctionDeclaration(
        name="session_search",
        description=(
            "Search past conversation session logs for a keyword or phrase. "
            "Returns matching excerpts with session dates, newest first. "
            "Only searches this user's own sessions."
        ),
        parameters=types.Schema(
            type=types.Type.OBJECT,
            properties={
                "query": types.Schema(
                    type=types.Type.STRING,
                    description="Keyword or phrase to search for",
                ),
                "limit": types.Schema(
                    type=types.Type.INTEGER,
                    description="Max results to return (default 5, max 20)",
                ),
            },
            required=["query"],
        ),
    ),
]