feat: project-scoped file tools — grep, stat, syntax_check, offset reads

Add five project-scoped tools (user-level, no admin required):
  project_file_read — read with 1-based offset for paging large files
  project_file_list — list with sizes + timestamps
  file_stat         — size, modified time, line count / entry count
  file_grep         — regex search with context lines, up to 50 matches
  file_syntax_check — py_compile (.py) or json.loads (.json)

Also add offset support to existing file_read (system scope).
Rename "Files" tool category to "System Files"; add "Project Files" category.
Project scope restricted to Cortex_and_Inara_dev/ project root.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Scott Idem
2026-05-11 22:23:50 -04:00
parent fc6600c33e
commit ac06b3bc7b
2 changed files with 518 additions and 154 deletions

View File

@@ -30,7 +30,18 @@ from tools.ae_knowledge import (
journal_entry_prepend as _ae_journal_entry_prepend,
)
from tools.ae_tasks import task_list as _ae_task_list
from tools.files import file_read as _file_read, file_list as _file_list, file_write as _file_write, session_search as _session_search, session_read as _session_read
from tools.files import (
project_file_read as _project_file_read,
project_file_list as _project_file_list,
file_stat as _file_stat,
file_grep as _file_grep,
file_syntax_check as _file_syntax_check,
file_read as _file_read,
file_list as _file_list,
file_write as _file_write,
session_read as _session_read,
session_search as _session_search,
)
from tools.system import (
shell_exec as _shell_exec,
claude_allow_dir as _claude_allow_dir,
@@ -97,7 +108,8 @@ import tools.homeassistant as _mod_homeassistant
TOOL_CATEGORIES: dict[str, list[str]] = {
"Web": ["web_search", "http_fetch", "web_read", "http_post"],
"Files": ["file_read", "file_list", "file_write", "session_read", "session_search"],
"Project Files": ["project_file_read", "project_file_list", "file_stat", "file_grep", "file_syntax_check"],
"System Files": ["file_read", "file_list", "file_write", "session_read", "session_search"],
"Shell": ["shell_exec", "claude_allow_dir"],
"System": ["cortex_restart", "cortex_logs", "cortex_status", "cortex_update"],
"Tasks": ["task_list", "task_create", "task_update", "task_complete"],
@@ -135,6 +147,11 @@ _CALLABLES: dict[str, callable] = {
"ae_journal_entry_append": _ae_journal_entry_append,
"ae_journal_entry_prepend": _ae_journal_entry_prepend,
"ae_task_list": _ae_task_list,
"project_file_read": _project_file_read,
"project_file_list": _project_file_list,
"file_stat": _file_stat,
"file_grep": _file_grep,
"file_syntax_check": _file_syntax_check,
"file_read": _file_read,
"file_list": _file_list,
"file_write": _file_write,

View File

@@ -1,23 +1,44 @@
"""
File read/write/search tools — restricted to known-safe directory roots.
File read/write/search tools — two access scopes.
Lets the orchestrator read local files (documentation, notes, config references)
and search past session logs without exposing arbitrary filesystem access.
All paths are resolved and checked against an allowlist of roots before any
read or write is performed.
Project scope (no admin required):
project_file_read — read a file with optional line-range (offset)
project_file_list — list a directory with sizes + timestamps
file_stat — size, modified time, line count for a path
file_grep — regex search with context lines; up to 50 matches
file_syntax_check — py_compile (.py) or json.loads (.json) check
System scope (admin-only):
file_read — read a file from ~/agents_sync/, ~/OSIT_dev/, etc.
file_list — list a directory (same roots)
file_write — write/append (~/agents_sync/ + Cortex home/)
Session tools (user-level, persona-isolated):
session_read — read a session log by date
session_search — keyword search across session logs
All project-scope tools are restricted to the Cortex project root:
~/agents_sync/projects/Cortex_and_Inara_dev/
"""
import asyncio
import json
import logging
import re
import subprocess
from datetime import datetime
from pathlib import Path
from google.genai import types
logger = logging.getLogger(__name__)
# Directories the orchestrator is allowed to read from.
# Paths are resolved (symlinks followed, ~ expanded) at import time.
# ── Access roots ──────────────────────────────────────────────────────────────
# Project root: two levels up from cortex/tools/files.py → Cortex_and_Inara_dev/
_PROJECT_ROOT: Path = Path(__file__).parent.parent.parent.resolve()
# System-wide read roots
def _build_allowed_roots() -> list[Path]:
roots = [
Path.home() / "agents_sync",
@@ -34,88 +55,24 @@ def _build_allowed_roots() -> list[Path]:
_ALLOWED_ROOTS: list[Path] = _build_allowed_roots()
# Hard cap on file size to prevent accidental context blowout
_MAX_BYTES = 50_000 # ~50 KB
# Write is tighter
_WRITE_ROOTS: list[Path] = [Path.home() / "agents_sync"]
# Size limits
_MAX_BYTES = 50_000
_MAX_LINES = 500
_MAX_GREP_MATCHES = 50
async def file_read(path: str, max_lines: int | None = None) -> str:
"""Read a local file and return its contents as a string.
Only files within allowed directories can be read:
~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/
Args:
path: Absolute or home-relative path to the file (e.g. ~/agents_sync/CLAUDE.md).
max_lines: Optional line limit (default 500, hard cap). Use for large files.
Returns the file contents (truncated if over the size limit), or an error message.
"""
return await asyncio.to_thread(_sync_file_read, path, max_lines)
def _sync_file_read(path: str, max_lines: int | None) -> str:
# Expand ~ and resolve to absolute path
def _is_project_allowed(resolved: Path) -> bool:
try:
resolved = Path(path).expanduser().resolve()
except Exception as e:
return f"Invalid path: {e}"
# Security check — must be under an allowed root
if not _is_allowed(resolved):
allowed_str = ", ".join(str(r) for r in _ALLOWED_ROOTS)
return (
f"Access denied: {resolved}\n"
f"Allowed directories: {allowed_str}"
)
if not resolved.exists():
return f"File not found: {resolved}"
if not resolved.is_file():
# If it's a directory, list its contents instead
try:
entries = sorted(resolved.iterdir())
names = [e.name + ("/" if e.is_dir() else "") for e in entries[:100]]
return f"Directory listing for {resolved}:\n" + "\n".join(names)
except Exception as e:
return f"Cannot list directory: {e}"
# Read the file
try:
raw = resolved.read_bytes()
except Exception as e:
return f"Read error: {e}"
# Binary files
try:
text = raw.decode("utf-8")
except UnicodeDecodeError:
return f"Binary file (not readable as text): {resolved} [{len(raw)} bytes]"
# Apply line limit
limit = min(max_lines or _MAX_LINES, _MAX_LINES)
lines = text.splitlines()
truncated = False
if len(lines) > limit:
lines = lines[:limit]
truncated = True
# Apply byte cap as a final safety net
result = "\n".join(lines)
if len(result) > _MAX_BYTES:
result = result[:_MAX_BYTES]
truncated = True
if truncated:
result += f"\n\n… [truncated — file has {len(text.splitlines())} lines total]"
return result
resolved.relative_to(_PROJECT_ROOT)
return True
except ValueError:
return False
def _is_allowed(resolved: Path) -> bool:
"""Check that resolved path is under one of the allowed roots."""
for root in _ALLOWED_ROOTS:
try:
resolved.relative_to(root)
@@ -125,12 +82,6 @@ def _is_allowed(resolved: Path) -> bool:
return False
# Write is restricted to a tighter set of paths to limit blast radius.
_WRITE_ROOTS: list[Path] = [
Path.home() / "agents_sync",
]
def _is_write_allowed(resolved: Path) -> bool:
for root in _WRITE_ROOTS:
try:
@@ -138,63 +89,321 @@ def _is_write_allowed(resolved: Path) -> bool:
return True
except ValueError:
continue
# Also allow the Cortex home/ directory (persona memory, tasks, etc.)
try:
from config import settings
cortex_home = settings.home_root()
resolved.relative_to(cortex_home)
resolved.relative_to(settings.home_root())
return True
except (ValueError, Exception):
pass
return False
async def file_list(path: str) -> str:
"""List the contents of a directory.
# ── Shared implementations ────────────────────────────────────────────────────
Returns names of files and subdirectories with type indicators (/ for dirs).
Same allow-list as file_read.
"""
return await asyncio.to_thread(_sync_file_list, path)
def _sync_file_list(path: str) -> str:
def _read_impl(path_str: str, offset: int | None, max_lines: int | None, is_allowed_fn) -> str:
try:
resolved = Path(path).expanduser().resolve()
resolved = Path(path_str).expanduser().resolve()
except Exception as e:
return f"Invalid path: {e}"
if not _is_allowed(resolved):
allowed_str = ", ".join(str(r) for r in _ALLOWED_ROOTS)
return f"Access denied: {resolved}\nAllowed directories: {allowed_str}"
if not is_allowed_fn(resolved):
return f"Access denied: {resolved}"
if not resolved.exists():
return f"File not found: {resolved}"
if not resolved.is_file():
try:
entries = sorted(resolved.iterdir())
names = [e.name + ("/" if e.is_dir() else "") for e in entries[:100]]
return f"Directory listing for {resolved}:\n" + "\n".join(names)
except Exception as e:
return f"Cannot list directory: {e}"
try:
raw = resolved.read_bytes()
except Exception as e:
return f"Read error: {e}"
try:
text = raw.decode("utf-8")
except UnicodeDecodeError:
return f"Binary file (not readable as text): {resolved} [{len(raw)} bytes]"
all_lines = text.splitlines()
total = len(all_lines)
# offset is 1-based; default = start of file
start = max(0, (offset or 1) - 1)
working = all_lines[start:]
limit = min(max_lines or _MAX_LINES, _MAX_LINES)
truncated = False
if len(working) > limit:
working = working[:limit]
truncated = True
result = "\n".join(working)
if len(result) > _MAX_BYTES:
result = result[:_MAX_BYTES]
truncated = True
end_line = start + len(working)
header = f"[Lines {start + 1}{end_line} of {total}]\n" if (start > 0 or truncated) else ""
trailer = f"\n\n… [truncated — file has {total} lines; use offset={end_line + 1} to read more]" if truncated else ""
return header + result + trailer
def _list_impl(path_str: str, is_allowed_fn) -> str:
try:
resolved = Path(path_str).expanduser().resolve()
except Exception as e:
return f"Invalid path: {e}"
if not is_allowed_fn(resolved):
return f"Access denied: {resolved}"
if not resolved.exists():
return f"Path not found: {resolved}"
if resolved.is_file():
return f"{resolved} is a file, not a directory. Use file_read to read it."
return f"{resolved} is a file. Use file_read / project_file_read to read it."
try:
entries = sorted(resolved.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
lines = []
for e in entries[:200]:
suffix = "/" if e.is_dir() else f" ({e.stat().st_size} bytes)" if e.is_file() else ""
if e.is_dir():
suffix = "/"
else:
try:
st = e.stat()
mtime = datetime.fromtimestamp(st.st_mtime).strftime("%Y-%m-%d %H:%M")
suffix = f" ({st.st_size:,} B, {mtime})"
except Exception:
suffix = ""
lines.append(f"{e.name}{suffix}")
result = "\n".join(lines)
if len(entries) > 200:
result += f"\n… ({len(entries) - 200} more entries not shown)"
result += f"\n… ({len(entries) - 200} more not shown)"
return f"Contents of {resolved}:\n\n{result}"
except Exception as e:
return f"Cannot list directory: {e}"
async def file_write(path: str, content: str, mode: str = "overwrite") -> str:
"""Write content to a file.
# ── Project-scoped tools ──────────────────────────────────────────────────────
mode: 'overwrite' (default) replaces the file; 'append' adds to the end.
Write-allowed paths: ~/agents_sync/ and the Cortex home/ directory.
Parent directories are created if they don't exist.
"""
async def project_file_read(path: str, offset: int | None = None, max_lines: int | None = None) -> str:
"""Read a file within the Cortex project directory, with optional line range."""
return await asyncio.to_thread(_read_impl, path, offset, max_lines, _is_project_allowed)
async def project_file_list(path: str) -> str:
"""List directory contents within the Cortex project directory, with sizes and timestamps."""
return await asyncio.to_thread(_list_impl, path, _is_project_allowed)
async def file_stat(path: str) -> str:
"""Return metadata for a file or directory: type, size, modified time, line count."""
return await asyncio.to_thread(_sync_file_stat, path)
def _sync_file_stat(path_str: str) -> str:
try:
resolved = Path(path_str).expanduser().resolve()
except Exception as e:
return f"Invalid path: {e}"
if not _is_project_allowed(resolved):
return f"Access denied: {resolved}\nProject root: {_PROJECT_ROOT}"
if not resolved.exists():
return f"Path not found: {resolved}"
try:
st = resolved.stat()
except Exception as e:
return f"Cannot stat: {e}"
modified = datetime.fromtimestamp(st.st_mtime).strftime("%Y-%m-%d %H:%M:%S")
lines = [
f"Path: {resolved}",
f"Type: {'directory' if resolved.is_dir() else 'file'}",
f"Size: {st.st_size:,} bytes",
f"Modified: {modified}",
]
if resolved.is_file():
try:
raw = resolved.read_bytes()
if b'\x00' not in raw[:1024]:
lines.append(f"Lines: {len(raw.decode('utf-8', errors='replace').splitlines())}")
except Exception:
pass
elif resolved.is_dir():
try:
entries = list(resolved.iterdir())
n_files = sum(1 for e in entries if e.is_file())
n_dirs = sum(1 for e in entries if e.is_dir())
lines.append(f"Contents: {n_files} file(s), {n_dirs} subdirector{'y' if n_dirs == 1 else 'ies'}")
except Exception:
pass
return "\n".join(lines)
async def file_grep(path: str, pattern: str, context_lines: int = 2, recursive: bool = True) -> str:
"""Search for a regex pattern in a file or directory, returning matching lines with context."""
return await asyncio.to_thread(_sync_file_grep, path, pattern, context_lines, recursive)
def _sync_file_grep(path_str: str, pattern: str, context_lines: int, recursive: bool) -> str:
try:
resolved = Path(path_str).expanduser().resolve()
except Exception as e:
return f"Invalid path: {e}"
if not _is_project_allowed(resolved):
return f"Access denied: {resolved}\nProject root: {_PROJECT_ROOT}"
if not resolved.exists():
return f"Path not found: {resolved}"
try:
regex = re.compile(pattern, re.IGNORECASE)
except re.error as e:
return f"Invalid regex pattern: {e}"
ctx = max(0, min(context_lines, 5))
if resolved.is_file():
files_to_search = [resolved]
elif recursive:
files_to_search = sorted(f for f in resolved.rglob("*") if f.is_file())
else:
files_to_search = sorted(f for f in resolved.iterdir() if f.is_file())
total_matches = 0
sections: list[str] = []
capped = False
for fp in files_to_search:
if total_matches >= _MAX_GREP_MATCHES:
capped = True
break
try:
raw = fp.read_bytes()
except OSError:
continue
if b'\x00' in raw[:1024]:
continue # skip binary
try:
text = raw.decode("utf-8", errors="replace")
except Exception:
continue
file_lines = text.splitlines()
match_indices = [i for i, line in enumerate(file_lines) if regex.search(line)]
if not match_indices:
continue
total_matches += len(match_indices)
try:
label = str(fp.relative_to(_PROJECT_ROOT))
except ValueError:
label = str(fp)
file_output = [f"── {label} ──"]
printed: set[int] = set()
for mi in match_indices:
start = max(0, mi - ctx)
end = min(len(file_lines), mi + ctx + 1)
if printed and start > max(printed) + 1:
file_output.append(" ···")
for j in range(start, end):
if j not in printed:
marker = "" if j == mi else " "
file_output.append(f" {j + 1:4d}{marker} {file_lines[j]}")
printed.add(j)
sections.append("\n".join(file_output))
if not sections:
return f"No matches for '{pattern}' in {resolved}"
cap_note = f" (capped at {_MAX_GREP_MATCHES})" if capped else ""
header = f"grep '{pattern}'{total_matches} match(es){cap_note}:"
return header + "\n\n" + "\n\n".join(sections)
async def file_syntax_check(path: str) -> str:
"""Check syntax of a Python (.py) or JSON (.json) file."""
return await asyncio.to_thread(_sync_file_syntax_check, path)
def _sync_file_syntax_check(path_str: str) -> str:
try:
resolved = Path(path_str).expanduser().resolve()
except Exception as e:
return f"Invalid path: {e}"
if not _is_project_allowed(resolved):
return f"Access denied: {resolved}\nProject root: {_PROJECT_ROOT}"
if not resolved.exists():
return f"File not found: {resolved}"
if not resolved.is_file():
return f"Not a file: {resolved}"
suffix = resolved.suffix.lower()
if suffix == ".py":
try:
result = subprocess.run(
["python3", "-m", "py_compile", str(resolved)],
capture_output=True, text=True, timeout=15,
)
if result.returncode == 0:
return f"OK — {resolved.name}: syntax valid"
err = (result.stderr or result.stdout).strip()
return f"Syntax error in {resolved.name}:\n{err}"
except subprocess.TimeoutExpired:
return f"Timeout running py_compile on {resolved.name}"
except Exception as e:
return f"Error: {e}"
elif suffix == ".json":
try:
text = resolved.read_text(encoding="utf-8")
json.loads(text)
return f"OK — {resolved.name}: valid JSON"
except json.JSONDecodeError as e:
return f"JSON error in {resolved.name}: {e}"
except Exception as e:
return f"Error reading {resolved.name}: {e}"
else:
return f"Syntax check not supported for '{suffix}' files. Supported: .py, .json"
# ── System-scoped tools ───────────────────────────────────────────────────────
async def file_read(path: str, offset: int | None = None, max_lines: int | None = None) -> str:
"""Read a local file from the broader system. Allowed: ~/agents_sync/, ~/OSIT_dev/, etc. ADMIN ONLY."""
return await asyncio.to_thread(_read_impl, path, offset, max_lines, _is_allowed)
async def file_list(path: str) -> str:
"""List directory contents from the broader system. ADMIN ONLY."""
return await asyncio.to_thread(_list_impl, path, _is_allowed)
async def file_write(path: str, content: str, mode: str = "overwrite") -> str:
"""Write or append content to a file. Write roots: ~/agents_sync/ and Cortex home/. ADMIN ONLY."""
return await asyncio.to_thread(_sync_file_write, path, content, mode)
@@ -227,16 +436,13 @@ def _sync_file_write(path: str, content: str, mode: str) -> str:
return f"Write error: {e}"
# ── Session tools ─────────────────────────────────────────────────────────────
_SEARCH_EXCERPT_CHARS = 150
async def session_read(date: str) -> str:
"""Read a full session log by date (YYYY-MM-DD).
Returns the complete session log for that date. If the date is not found,
lists the most recent available dates instead.
Only reads the current user's own sessions (per-persona isolation via ContextVars).
"""
"""Read a full session log by date (YYYY-MM-DD)."""
return await asyncio.to_thread(_sync_session_read, date.strip())
@@ -259,11 +465,7 @@ def _sync_session_read(date: str) -> str:
async def session_search(query: str, limit: int = 5) -> str:
"""Search past session logs for a keyword or phrase.
Returns up to `limit` matching excerpts with session dates, newest first.
Only searches the current user's own sessions (per-persona isolation via ContextVars).
"""
"""Search past session logs for a keyword or phrase."""
return await asyncio.to_thread(_sync_session_search, query, limit)
@@ -299,27 +501,152 @@ def _sync_session_search(query: str, limit: int) -> str:
if not matches:
return f"No matches for '{query}' across {len(session_files)} session logs."
header = f"Session search: '{query}'{len(matches)} match(es) across {len(session_files)} logs\n"
return header + "\n\n".join(matches)
# ── Declarations ──────────────────────────────────────────────────────────────
DECLARATIONS = [
# Project-scoped
types.FunctionDeclaration(
name="file_read",
name="project_file_read",
description=(
"Read a local file and return its contents. "
"Allowed directories: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/, "
"and the Cortex home/ directory (persona memory, tool audit logs, etc.). "
"Use this to read documentation, notes, CLAUDE.md files, config references, "
"or tool audit logs at home/{user}/tool_audit/YYYY-MM-DD.jsonl. "
"If given a directory path, returns a directory listing instead."
"Read a file within the Cortex project directory (source code, docs, config, persona files). "
"Supports reading a specific line range via offset — use to page through large files "
"without re-reading from the top. If given a directory path, returns a listing instead. "
"Project root: ~/agents_sync/projects/Cortex_and_Inara_dev/"
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"path": types.Schema(type=types.Type.STRING, description="Absolute or home-relative path to the file (e.g. ~/agents_sync/CLAUDE.md or /home/scott/agents_sync/tasks/01_todo/)"),
"max_lines": types.Schema(type=types.Type.INTEGER, description="Optional line limit (default 500)"),
"path": types.Schema(
type=types.Type.STRING,
description="Absolute or ~/... path to the file",
),
"offset": types.Schema(
type=types.Type.INTEGER,
description="Start reading from this line number (1-based). Omit to read from the top.",
),
"max_lines": types.Schema(
type=types.Type.INTEGER,
description="Maximum lines to return (default 500)",
),
},
required=["path"],
),
),
types.FunctionDeclaration(
name="project_file_list",
description=(
"List files and subdirectories within the Cortex project directory. "
"Shows file sizes and modified timestamps. "
"Project root: ~/agents_sync/projects/Cortex_and_Inara_dev/"
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"path": types.Schema(
type=types.Type.STRING,
description="Absolute or ~/... path to the directory",
),
},
required=["path"],
),
),
types.FunctionDeclaration(
name="file_stat",
description=(
"Get metadata for a file or directory: type, size, modified timestamp, line count (for text files) "
"or entry counts (for directories). Use before reading to check recency or size. "
"Restricted to the Cortex project directory."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"path": types.Schema(
type=types.Type.STRING,
description="Absolute or ~/... path to the file or directory",
),
},
required=["path"],
),
),
types.FunctionDeclaration(
name="file_grep",
description=(
"Search for a regex pattern in a file or directory, returning matching lines with surrounding "
"context. Much more efficient than reading an entire source file — use this to find function "
"definitions, variable names, TODO comments, imports, error strings, etc. "
"Searches recursively by default. Capped at 50 matches. Skips binary files. "
"Case-insensitive. Restricted to the Cortex project directory."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"path": types.Schema(
type=types.Type.STRING,
description="File or directory to search (e.g. ~/agents_sync/projects/Cortex_and_Inara_dev/cortex/)",
),
"pattern": types.Schema(
type=types.Type.STRING,
description="Regex pattern to search for (case-insensitive). Examples: 'def ha_', 'import httpx', 'TODO'",
),
"context_lines": types.Schema(
type=types.Type.INTEGER,
description="Lines of context before/after each match (default 2, max 5)",
),
"recursive": types.Schema(
type=types.Type.BOOLEAN,
description="Search subdirectories recursively (default true)",
),
},
required=["path", "pattern"],
),
),
types.FunctionDeclaration(
name="file_syntax_check",
description=(
"Check the syntax of a Python (.py) or JSON (.json) file without executing it. "
"Returns OK or the error with line number. "
"Use after editing a file before restarting Cortex. "
"Restricted to the Cortex project directory."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"path": types.Schema(
type=types.Type.STRING,
description="Path to the .py or .json file to check",
),
},
required=["path"],
),
),
# System-scoped
types.FunctionDeclaration(
name="file_read",
description=(
"Read a local file from the broader system (~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, "
"~/OSIT_Nextcloud/, Cortex home/). Supports offset for reading specific line ranges. "
"For files within the Cortex project, prefer project_file_read instead. "
"ADMIN ONLY."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"path": types.Schema(
type=types.Type.STRING,
description="Absolute or ~/... path to the file",
),
"offset": types.Schema(
type=types.Type.INTEGER,
description="Start reading from this line number (1-based)",
),
"max_lines": types.Schema(
type=types.Type.INTEGER,
description="Maximum lines to return (default 500)",
),
},
required=["path"],
),
@@ -327,14 +654,18 @@ DECLARATIONS = [
types.FunctionDeclaration(
name="file_list",
description=(
"List the files and subdirectories in a directory. "
"Allowed paths: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/. "
"List files and subdirectories from the broader system. "
"Shows sizes and modified timestamps. "
"Allowed: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/. "
"ADMIN ONLY."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"path": types.Schema(type=types.Type.STRING, description="Absolute or home-relative path to the directory"),
"path": types.Schema(
type=types.Type.STRING,
description="Absolute or ~/... path to the directory",
),
},
required=["path"],
),
@@ -350,9 +681,18 @@ DECLARATIONS = [
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"path": types.Schema(type=types.Type.STRING, description="Absolute or home-relative path to write to"),
"content": types.Schema(type=types.Type.STRING, description="Content to write"),
"mode": types.Schema(type=types.Type.STRING, description="'overwrite' (default, replaces file) or 'append' (adds to end)"),
"path": types.Schema(
type=types.Type.STRING,
description="Absolute or ~/... path to write to",
),
"content": types.Schema(
type=types.Type.STRING,
description="Content to write",
),
"mode": types.Schema(
type=types.Type.STRING,
description="'overwrite' (default, replaces file) or 'append' (adds to end)",
),
},
required=["path", "content"],
),
@@ -360,15 +700,18 @@ DECLARATIONS = [
types.FunctionDeclaration(
name="session_read",
description=(
"Read a full session log by date (YYYY-MM-DD). Returns the complete conversation "
"from that session — useful for continuity, recalling decisions, or reviewing "
"what was discussed on a specific day. If the date is not found, lists available dates. "
"Read a full conversation session log by date (YYYY-MM-DD). "
"Useful for continuity and recalling past decisions. "
"If the date is not found, lists available dates. "
"Only reads this user's own sessions."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"date": types.Schema(type=types.Type.STRING, description="Date in YYYY-MM-DD format (e.g. '2026-05-08')"),
"date": types.Schema(
type=types.Type.STRING,
description="Date in YYYY-MM-DD format (e.g. '2026-05-08')",
),
},
required=["date"],
),
@@ -377,16 +720,20 @@ DECLARATIONS = [
name="session_search",
description=(
"Search past conversation session logs for a keyword or phrase. "
"Use this to recall what was discussed in previous sessions — "
"e.g. 'what did we decide about X?', 'when did we set up Y?'. "
"Returns matching excerpts with session dates, newest first. "
"Only searches this user's own sessions."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"query": types.Schema(type=types.Type.STRING, description="Keyword or phrase to search for"),
"limit": types.Schema(type=types.Type.INTEGER, description="Max results to return (default 5, max 20)"),
"query": types.Schema(
type=types.Type.STRING,
description="Keyword or phrase to search for",
),
"limit": types.Schema(
type=types.Type.INTEGER,
description="Max results to return (default 5, max 20)",
),
},
required=["query"],
),