diff --git a/cortex/tools/__init__.py b/cortex/tools/__init__.py index f275ea5..11f8dac 100644 --- a/cortex/tools/__init__.py +++ b/cortex/tools/__init__.py @@ -30,7 +30,18 @@ from tools.ae_knowledge import ( journal_entry_prepend as _ae_journal_entry_prepend, ) from tools.ae_tasks import task_list as _ae_task_list -from tools.files import file_read as _file_read, file_list as _file_list, file_write as _file_write, session_search as _session_search, session_read as _session_read +from tools.files import ( + project_file_read as _project_file_read, + project_file_list as _project_file_list, + file_stat as _file_stat, + file_grep as _file_grep, + file_syntax_check as _file_syntax_check, + file_read as _file_read, + file_list as _file_list, + file_write as _file_write, + session_read as _session_read, + session_search as _session_search, +) from tools.system import ( shell_exec as _shell_exec, claude_allow_dir as _claude_allow_dir, @@ -97,7 +108,8 @@ import tools.homeassistant as _mod_homeassistant TOOL_CATEGORIES: dict[str, list[str]] = { "Web": ["web_search", "http_fetch", "web_read", "http_post"], - "Files": ["file_read", "file_list", "file_write", "session_read", "session_search"], + "Project Files": ["project_file_read", "project_file_list", "file_stat", "file_grep", "file_syntax_check"], + "System Files": ["file_read", "file_list", "file_write", "session_read", "session_search"], "Shell": ["shell_exec", "claude_allow_dir"], "System": ["cortex_restart", "cortex_logs", "cortex_status", "cortex_update"], "Tasks": ["task_list", "task_create", "task_update", "task_complete"], @@ -135,6 +147,11 @@ _CALLABLES: dict[str, callable] = { "ae_journal_entry_append": _ae_journal_entry_append, "ae_journal_entry_prepend": _ae_journal_entry_prepend, "ae_task_list": _ae_task_list, + "project_file_read": _project_file_read, + "project_file_list": _project_file_list, + "file_stat": _file_stat, + "file_grep": _file_grep, + "file_syntax_check": _file_syntax_check, "file_read": _file_read, "file_list": _file_list, "file_write": _file_write, diff --git a/cortex/tools/files.py b/cortex/tools/files.py index 6493ac1..ca95288 100644 --- a/cortex/tools/files.py +++ b/cortex/tools/files.py @@ -1,23 +1,44 @@ """ -File read/write/search tools — restricted to known-safe directory roots. +File read/write/search tools — two access scopes. -Lets the orchestrator read local files (documentation, notes, config references) -and search past session logs without exposing arbitrary filesystem access. -All paths are resolved and checked against an allowlist of roots before any -read or write is performed. + Project scope (no admin required): + project_file_read — read a file with optional line-range (offset) + project_file_list — list a directory with sizes + timestamps + file_stat — size, modified time, line count for a path + file_grep — regex search with context lines; up to 50 matches + file_syntax_check — py_compile (.py) or json.loads (.json) check + + System scope (admin-only): + file_read — read a file from ~/agents_sync/, ~/OSIT_dev/, etc. + file_list — list a directory (same roots) + file_write — write/append (~/agents_sync/ + Cortex home/) + + Session tools (user-level, persona-isolated): + session_read — read a session log by date + session_search — keyword search across session logs + +All project-scope tools are restricted to the Cortex project root: + ~/agents_sync/projects/Cortex_and_Inara_dev/ """ import asyncio +import json import logging import re +import subprocess +from datetime import datetime from pathlib import Path from google.genai import types logger = logging.getLogger(__name__) -# Directories the orchestrator is allowed to read from. -# Paths are resolved (symlinks followed, ~ expanded) at import time. +# ── Access roots ────────────────────────────────────────────────────────────── + +# Project root: two levels up from cortex/tools/files.py → Cortex_and_Inara_dev/ +_PROJECT_ROOT: Path = Path(__file__).parent.parent.parent.resolve() + +# System-wide read roots def _build_allowed_roots() -> list[Path]: roots = [ Path.home() / "agents_sync", @@ -34,88 +55,24 @@ def _build_allowed_roots() -> list[Path]: _ALLOWED_ROOTS: list[Path] = _build_allowed_roots() -# Hard cap on file size to prevent accidental context blowout -_MAX_BYTES = 50_000 # ~50 KB -_MAX_LINES = 500 +# Write is tighter +_WRITE_ROOTS: list[Path] = [Path.home() / "agents_sync"] + +# Size limits +_MAX_BYTES = 50_000 +_MAX_LINES = 500 +_MAX_GREP_MATCHES = 50 -async def file_read(path: str, max_lines: int | None = None) -> str: - """Read a local file and return its contents as a string. - - Only files within allowed directories can be read: - ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/ - - Args: - path: Absolute or home-relative path to the file (e.g. ~/agents_sync/CLAUDE.md). - max_lines: Optional line limit (default 500, hard cap). Use for large files. - - Returns the file contents (truncated if over the size limit), or an error message. - """ - return await asyncio.to_thread(_sync_file_read, path, max_lines) - - -def _sync_file_read(path: str, max_lines: int | None) -> str: - # Expand ~ and resolve to absolute path +def _is_project_allowed(resolved: Path) -> bool: try: - resolved = Path(path).expanduser().resolve() - except Exception as e: - return f"Invalid path: {e}" - - # Security check — must be under an allowed root - if not _is_allowed(resolved): - allowed_str = ", ".join(str(r) for r in _ALLOWED_ROOTS) - return ( - f"Access denied: {resolved}\n" - f"Allowed directories: {allowed_str}" - ) - - if not resolved.exists(): - return f"File not found: {resolved}" - - if not resolved.is_file(): - # If it's a directory, list its contents instead - try: - entries = sorted(resolved.iterdir()) - names = [e.name + ("/" if e.is_dir() else "") for e in entries[:100]] - return f"Directory listing for {resolved}:\n" + "\n".join(names) - except Exception as e: - return f"Cannot list directory: {e}" - - # Read the file - try: - raw = resolved.read_bytes() - except Exception as e: - return f"Read error: {e}" - - # Binary files - try: - text = raw.decode("utf-8") - except UnicodeDecodeError: - return f"Binary file (not readable as text): {resolved} [{len(raw)} bytes]" - - # Apply line limit - limit = min(max_lines or _MAX_LINES, _MAX_LINES) - lines = text.splitlines() - truncated = False - - if len(lines) > limit: - lines = lines[:limit] - truncated = True - - # Apply byte cap as a final safety net - result = "\n".join(lines) - if len(result) > _MAX_BYTES: - result = result[:_MAX_BYTES] - truncated = True - - if truncated: - result += f"\n\n… [truncated — file has {len(text.splitlines())} lines total]" - - return result + resolved.relative_to(_PROJECT_ROOT) + return True + except ValueError: + return False def _is_allowed(resolved: Path) -> bool: - """Check that resolved path is under one of the allowed roots.""" for root in _ALLOWED_ROOTS: try: resolved.relative_to(root) @@ -125,12 +82,6 @@ def _is_allowed(resolved: Path) -> bool: return False -# Write is restricted to a tighter set of paths to limit blast radius. -_WRITE_ROOTS: list[Path] = [ - Path.home() / "agents_sync", -] - - def _is_write_allowed(resolved: Path) -> bool: for root in _WRITE_ROOTS: try: @@ -138,63 +89,321 @@ def _is_write_allowed(resolved: Path) -> bool: return True except ValueError: continue - # Also allow the Cortex home/ directory (persona memory, tasks, etc.) try: from config import settings - cortex_home = settings.home_root() - resolved.relative_to(cortex_home) + resolved.relative_to(settings.home_root()) return True except (ValueError, Exception): pass return False -async def file_list(path: str) -> str: - """List the contents of a directory. +# ── Shared implementations ──────────────────────────────────────────────────── - Returns names of files and subdirectories with type indicators (/ for dirs). - Same allow-list as file_read. - """ - return await asyncio.to_thread(_sync_file_list, path) - - -def _sync_file_list(path: str) -> str: +def _read_impl(path_str: str, offset: int | None, max_lines: int | None, is_allowed_fn) -> str: try: - resolved = Path(path).expanduser().resolve() + resolved = Path(path_str).expanduser().resolve() except Exception as e: return f"Invalid path: {e}" - if not _is_allowed(resolved): - allowed_str = ", ".join(str(r) for r in _ALLOWED_ROOTS) - return f"Access denied: {resolved}\nAllowed directories: {allowed_str}" + if not is_allowed_fn(resolved): + return f"Access denied: {resolved}" + + if not resolved.exists(): + return f"File not found: {resolved}" + + if not resolved.is_file(): + try: + entries = sorted(resolved.iterdir()) + names = [e.name + ("/" if e.is_dir() else "") for e in entries[:100]] + return f"Directory listing for {resolved}:\n" + "\n".join(names) + except Exception as e: + return f"Cannot list directory: {e}" + + try: + raw = resolved.read_bytes() + except Exception as e: + return f"Read error: {e}" + + try: + text = raw.decode("utf-8") + except UnicodeDecodeError: + return f"Binary file (not readable as text): {resolved} [{len(raw)} bytes]" + + all_lines = text.splitlines() + total = len(all_lines) + + # offset is 1-based; default = start of file + start = max(0, (offset or 1) - 1) + working = all_lines[start:] + + limit = min(max_lines or _MAX_LINES, _MAX_LINES) + truncated = False + if len(working) > limit: + working = working[:limit] + truncated = True + + result = "\n".join(working) + if len(result) > _MAX_BYTES: + result = result[:_MAX_BYTES] + truncated = True + + end_line = start + len(working) + header = f"[Lines {start + 1}–{end_line} of {total}]\n" if (start > 0 or truncated) else "" + trailer = f"\n\n… [truncated — file has {total} lines; use offset={end_line + 1} to read more]" if truncated else "" + + return header + result + trailer + + +def _list_impl(path_str: str, is_allowed_fn) -> str: + try: + resolved = Path(path_str).expanduser().resolve() + except Exception as e: + return f"Invalid path: {e}" + + if not is_allowed_fn(resolved): + return f"Access denied: {resolved}" if not resolved.exists(): return f"Path not found: {resolved}" if resolved.is_file(): - return f"{resolved} is a file, not a directory. Use file_read to read it." + return f"{resolved} is a file. Use file_read / project_file_read to read it." try: entries = sorted(resolved.iterdir(), key=lambda e: (e.is_file(), e.name.lower())) lines = [] for e in entries[:200]: - suffix = "/" if e.is_dir() else f" ({e.stat().st_size} bytes)" if e.is_file() else "" + if e.is_dir(): + suffix = "/" + else: + try: + st = e.stat() + mtime = datetime.fromtimestamp(st.st_mtime).strftime("%Y-%m-%d %H:%M") + suffix = f" ({st.st_size:,} B, {mtime})" + except Exception: + suffix = "" lines.append(f"{e.name}{suffix}") result = "\n".join(lines) if len(entries) > 200: - result += f"\n… ({len(entries) - 200} more entries not shown)" + result += f"\n… ({len(entries) - 200} more not shown)" return f"Contents of {resolved}:\n\n{result}" except Exception as e: return f"Cannot list directory: {e}" -async def file_write(path: str, content: str, mode: str = "overwrite") -> str: - """Write content to a file. +# ── Project-scoped tools ────────────────────────────────────────────────────── - mode: 'overwrite' (default) replaces the file; 'append' adds to the end. - Write-allowed paths: ~/agents_sync/ and the Cortex home/ directory. - Parent directories are created if they don't exist. - """ +async def project_file_read(path: str, offset: int | None = None, max_lines: int | None = None) -> str: + """Read a file within the Cortex project directory, with optional line range.""" + return await asyncio.to_thread(_read_impl, path, offset, max_lines, _is_project_allowed) + + +async def project_file_list(path: str) -> str: + """List directory contents within the Cortex project directory, with sizes and timestamps.""" + return await asyncio.to_thread(_list_impl, path, _is_project_allowed) + + +async def file_stat(path: str) -> str: + """Return metadata for a file or directory: type, size, modified time, line count.""" + return await asyncio.to_thread(_sync_file_stat, path) + + +def _sync_file_stat(path_str: str) -> str: + try: + resolved = Path(path_str).expanduser().resolve() + except Exception as e: + return f"Invalid path: {e}" + + if not _is_project_allowed(resolved): + return f"Access denied: {resolved}\nProject root: {_PROJECT_ROOT}" + + if not resolved.exists(): + return f"Path not found: {resolved}" + + try: + st = resolved.stat() + except Exception as e: + return f"Cannot stat: {e}" + + modified = datetime.fromtimestamp(st.st_mtime).strftime("%Y-%m-%d %H:%M:%S") + lines = [ + f"Path: {resolved}", + f"Type: {'directory' if resolved.is_dir() else 'file'}", + f"Size: {st.st_size:,} bytes", + f"Modified: {modified}", + ] + + if resolved.is_file(): + try: + raw = resolved.read_bytes() + if b'\x00' not in raw[:1024]: + lines.append(f"Lines: {len(raw.decode('utf-8', errors='replace').splitlines())}") + except Exception: + pass + elif resolved.is_dir(): + try: + entries = list(resolved.iterdir()) + n_files = sum(1 for e in entries if e.is_file()) + n_dirs = sum(1 for e in entries if e.is_dir()) + lines.append(f"Contents: {n_files} file(s), {n_dirs} subdirector{'y' if n_dirs == 1 else 'ies'}") + except Exception: + pass + + return "\n".join(lines) + + +async def file_grep(path: str, pattern: str, context_lines: int = 2, recursive: bool = True) -> str: + """Search for a regex pattern in a file or directory, returning matching lines with context.""" + return await asyncio.to_thread(_sync_file_grep, path, pattern, context_lines, recursive) + + +def _sync_file_grep(path_str: str, pattern: str, context_lines: int, recursive: bool) -> str: + try: + resolved = Path(path_str).expanduser().resolve() + except Exception as e: + return f"Invalid path: {e}" + + if not _is_project_allowed(resolved): + return f"Access denied: {resolved}\nProject root: {_PROJECT_ROOT}" + + if not resolved.exists(): + return f"Path not found: {resolved}" + + try: + regex = re.compile(pattern, re.IGNORECASE) + except re.error as e: + return f"Invalid regex pattern: {e}" + + ctx = max(0, min(context_lines, 5)) + + if resolved.is_file(): + files_to_search = [resolved] + elif recursive: + files_to_search = sorted(f for f in resolved.rglob("*") if f.is_file()) + else: + files_to_search = sorted(f for f in resolved.iterdir() if f.is_file()) + + total_matches = 0 + sections: list[str] = [] + capped = False + + for fp in files_to_search: + if total_matches >= _MAX_GREP_MATCHES: + capped = True + break + try: + raw = fp.read_bytes() + except OSError: + continue + if b'\x00' in raw[:1024]: + continue # skip binary + try: + text = raw.decode("utf-8", errors="replace") + except Exception: + continue + + file_lines = text.splitlines() + match_indices = [i for i, line in enumerate(file_lines) if regex.search(line)] + if not match_indices: + continue + + total_matches += len(match_indices) + + try: + label = str(fp.relative_to(_PROJECT_ROOT)) + except ValueError: + label = str(fp) + + file_output = [f"── {label} ──"] + printed: set[int] = set() + + for mi in match_indices: + start = max(0, mi - ctx) + end = min(len(file_lines), mi + ctx + 1) + if printed and start > max(printed) + 1: + file_output.append(" ···") + for j in range(start, end): + if j not in printed: + marker = "►" if j == mi else " " + file_output.append(f" {j + 1:4d}{marker} {file_lines[j]}") + printed.add(j) + + sections.append("\n".join(file_output)) + + if not sections: + return f"No matches for '{pattern}' in {resolved}" + + cap_note = f" (capped at {_MAX_GREP_MATCHES})" if capped else "" + header = f"grep '{pattern}' — {total_matches} match(es){cap_note}:" + return header + "\n\n" + "\n\n".join(sections) + + +async def file_syntax_check(path: str) -> str: + """Check syntax of a Python (.py) or JSON (.json) file.""" + return await asyncio.to_thread(_sync_file_syntax_check, path) + + +def _sync_file_syntax_check(path_str: str) -> str: + try: + resolved = Path(path_str).expanduser().resolve() + except Exception as e: + return f"Invalid path: {e}" + + if not _is_project_allowed(resolved): + return f"Access denied: {resolved}\nProject root: {_PROJECT_ROOT}" + + if not resolved.exists(): + return f"File not found: {resolved}" + + if not resolved.is_file(): + return f"Not a file: {resolved}" + + suffix = resolved.suffix.lower() + + if suffix == ".py": + try: + result = subprocess.run( + ["python3", "-m", "py_compile", str(resolved)], + capture_output=True, text=True, timeout=15, + ) + if result.returncode == 0: + return f"OK — {resolved.name}: syntax valid" + err = (result.stderr or result.stdout).strip() + return f"Syntax error in {resolved.name}:\n{err}" + except subprocess.TimeoutExpired: + return f"Timeout running py_compile on {resolved.name}" + except Exception as e: + return f"Error: {e}" + + elif suffix == ".json": + try: + text = resolved.read_text(encoding="utf-8") + json.loads(text) + return f"OK — {resolved.name}: valid JSON" + except json.JSONDecodeError as e: + return f"JSON error in {resolved.name}: {e}" + except Exception as e: + return f"Error reading {resolved.name}: {e}" + + else: + return f"Syntax check not supported for '{suffix}' files. Supported: .py, .json" + + +# ── System-scoped tools ─────────────────────────────────────────────────────── + +async def file_read(path: str, offset: int | None = None, max_lines: int | None = None) -> str: + """Read a local file from the broader system. Allowed: ~/agents_sync/, ~/OSIT_dev/, etc. ADMIN ONLY.""" + return await asyncio.to_thread(_read_impl, path, offset, max_lines, _is_allowed) + + +async def file_list(path: str) -> str: + """List directory contents from the broader system. ADMIN ONLY.""" + return await asyncio.to_thread(_list_impl, path, _is_allowed) + + +async def file_write(path: str, content: str, mode: str = "overwrite") -> str: + """Write or append content to a file. Write roots: ~/agents_sync/ and Cortex home/. ADMIN ONLY.""" return await asyncio.to_thread(_sync_file_write, path, content, mode) @@ -227,16 +436,13 @@ def _sync_file_write(path: str, content: str, mode: str) -> str: return f"Write error: {e}" +# ── Session tools ───────────────────────────────────────────────────────────── + _SEARCH_EXCERPT_CHARS = 150 async def session_read(date: str) -> str: - """Read a full session log by date (YYYY-MM-DD). - - Returns the complete session log for that date. If the date is not found, - lists the most recent available dates instead. - Only reads the current user's own sessions (per-persona isolation via ContextVars). - """ + """Read a full session log by date (YYYY-MM-DD).""" return await asyncio.to_thread(_sync_session_read, date.strip()) @@ -259,11 +465,7 @@ def _sync_session_read(date: str) -> str: async def session_search(query: str, limit: int = 5) -> str: - """Search past session logs for a keyword or phrase. - - Returns up to `limit` matching excerpts with session dates, newest first. - Only searches the current user's own sessions (per-persona isolation via ContextVars). - """ + """Search past session logs for a keyword or phrase.""" return await asyncio.to_thread(_sync_session_search, query, limit) @@ -273,7 +475,7 @@ def _sync_session_search(query: str, limit: int) -> str: if not sessions_dir.exists(): return "No session logs found." - limit = max(1, min(limit, 20)) + limit = max(1, min(limit, 20)) pattern = re.compile(re.escape(query), re.IGNORECASE) session_files = sorted(sessions_dir.glob("*.md"), reverse=True) @@ -288,8 +490,8 @@ def _sync_session_search(query: str, limit: int) -> str: for m in pattern.finditer(text): if len(matches) >= limit: break - start = max(0, m.start() - _SEARCH_EXCERPT_CHARS) - end = min(len(text), m.end() + _SEARCH_EXCERPT_CHARS) + start = max(0, m.start() - _SEARCH_EXCERPT_CHARS) + end = min(len(text), m.end() + _SEARCH_EXCERPT_CHARS) excerpt = text[start:end].strip() if start > 0: excerpt = "…" + excerpt @@ -299,27 +501,152 @@ def _sync_session_search(query: str, limit: int) -> str: if not matches: return f"No matches for '{query}' across {len(session_files)} session logs." - header = f"Session search: '{query}' — {len(matches)} match(es) across {len(session_files)} logs\n" return header + "\n\n".join(matches) +# ── Declarations ────────────────────────────────────────────────────────────── + DECLARATIONS = [ + # Project-scoped types.FunctionDeclaration( - name="file_read", + name="project_file_read", description=( - "Read a local file and return its contents. " - "Allowed directories: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/, " - "and the Cortex home/ directory (persona memory, tool audit logs, etc.). " - "Use this to read documentation, notes, CLAUDE.md files, config references, " - "or tool audit logs at home/{user}/tool_audit/YYYY-MM-DD.jsonl. " - "If given a directory path, returns a directory listing instead." + "Read a file within the Cortex project directory (source code, docs, config, persona files). " + "Supports reading a specific line range via offset — use to page through large files " + "without re-reading from the top. If given a directory path, returns a listing instead. " + "Project root: ~/agents_sync/projects/Cortex_and_Inara_dev/" ), parameters=types.Schema( type=types.Type.OBJECT, properties={ - "path": types.Schema(type=types.Type.STRING, description="Absolute or home-relative path to the file (e.g. ~/agents_sync/CLAUDE.md or /home/scott/agents_sync/tasks/01_todo/)"), - "max_lines": types.Schema(type=types.Type.INTEGER, description="Optional line limit (default 500)"), + "path": types.Schema( + type=types.Type.STRING, + description="Absolute or ~/... path to the file", + ), + "offset": types.Schema( + type=types.Type.INTEGER, + description="Start reading from this line number (1-based). Omit to read from the top.", + ), + "max_lines": types.Schema( + type=types.Type.INTEGER, + description="Maximum lines to return (default 500)", + ), + }, + required=["path"], + ), + ), + types.FunctionDeclaration( + name="project_file_list", + description=( + "List files and subdirectories within the Cortex project directory. " + "Shows file sizes and modified timestamps. " + "Project root: ~/agents_sync/projects/Cortex_and_Inara_dev/" + ), + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "path": types.Schema( + type=types.Type.STRING, + description="Absolute or ~/... path to the directory", + ), + }, + required=["path"], + ), + ), + types.FunctionDeclaration( + name="file_stat", + description=( + "Get metadata for a file or directory: type, size, modified timestamp, line count (for text files) " + "or entry counts (for directories). Use before reading to check recency or size. " + "Restricted to the Cortex project directory." + ), + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "path": types.Schema( + type=types.Type.STRING, + description="Absolute or ~/... path to the file or directory", + ), + }, + required=["path"], + ), + ), + types.FunctionDeclaration( + name="file_grep", + description=( + "Search for a regex pattern in a file or directory, returning matching lines with surrounding " + "context. Much more efficient than reading an entire source file — use this to find function " + "definitions, variable names, TODO comments, imports, error strings, etc. " + "Searches recursively by default. Capped at 50 matches. Skips binary files. " + "Case-insensitive. Restricted to the Cortex project directory." + ), + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "path": types.Schema( + type=types.Type.STRING, + description="File or directory to search (e.g. ~/agents_sync/projects/Cortex_and_Inara_dev/cortex/)", + ), + "pattern": types.Schema( + type=types.Type.STRING, + description="Regex pattern to search for (case-insensitive). Examples: 'def ha_', 'import httpx', 'TODO'", + ), + "context_lines": types.Schema( + type=types.Type.INTEGER, + description="Lines of context before/after each match (default 2, max 5)", + ), + "recursive": types.Schema( + type=types.Type.BOOLEAN, + description="Search subdirectories recursively (default true)", + ), + }, + required=["path", "pattern"], + ), + ), + types.FunctionDeclaration( + name="file_syntax_check", + description=( + "Check the syntax of a Python (.py) or JSON (.json) file without executing it. " + "Returns OK or the error with line number. " + "Use after editing a file before restarting Cortex. " + "Restricted to the Cortex project directory." + ), + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "path": types.Schema( + type=types.Type.STRING, + description="Path to the .py or .json file to check", + ), + }, + required=["path"], + ), + ), + # System-scoped + types.FunctionDeclaration( + name="file_read", + description=( + "Read a local file from the broader system (~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, " + "~/OSIT_Nextcloud/, Cortex home/). Supports offset for reading specific line ranges. " + "For files within the Cortex project, prefer project_file_read instead. " + "ADMIN ONLY." + ), + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "path": types.Schema( + type=types.Type.STRING, + description="Absolute or ~/... path to the file", + ), + "offset": types.Schema( + type=types.Type.INTEGER, + description="Start reading from this line number (1-based)", + ), + "max_lines": types.Schema( + type=types.Type.INTEGER, + description="Maximum lines to return (default 500)", + ), }, required=["path"], ), @@ -327,14 +654,18 @@ DECLARATIONS = [ types.FunctionDeclaration( name="file_list", description=( - "List the files and subdirectories in a directory. " - "Allowed paths: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/. " + "List files and subdirectories from the broader system. " + "Shows sizes and modified timestamps. " + "Allowed: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/. " "ADMIN ONLY." ), parameters=types.Schema( type=types.Type.OBJECT, properties={ - "path": types.Schema(type=types.Type.STRING, description="Absolute or home-relative path to the directory"), + "path": types.Schema( + type=types.Type.STRING, + description="Absolute or ~/... path to the directory", + ), }, required=["path"], ), @@ -350,9 +681,18 @@ DECLARATIONS = [ parameters=types.Schema( type=types.Type.OBJECT, properties={ - "path": types.Schema(type=types.Type.STRING, description="Absolute or home-relative path to write to"), - "content": types.Schema(type=types.Type.STRING, description="Content to write"), - "mode": types.Schema(type=types.Type.STRING, description="'overwrite' (default, replaces file) or 'append' (adds to end)"), + "path": types.Schema( + type=types.Type.STRING, + description="Absolute or ~/... path to write to", + ), + "content": types.Schema( + type=types.Type.STRING, + description="Content to write", + ), + "mode": types.Schema( + type=types.Type.STRING, + description="'overwrite' (default, replaces file) or 'append' (adds to end)", + ), }, required=["path", "content"], ), @@ -360,15 +700,18 @@ DECLARATIONS = [ types.FunctionDeclaration( name="session_read", description=( - "Read a full session log by date (YYYY-MM-DD). Returns the complete conversation " - "from that session — useful for continuity, recalling decisions, or reviewing " - "what was discussed on a specific day. If the date is not found, lists available dates. " + "Read a full conversation session log by date (YYYY-MM-DD). " + "Useful for continuity and recalling past decisions. " + "If the date is not found, lists available dates. " "Only reads this user's own sessions." ), parameters=types.Schema( type=types.Type.OBJECT, properties={ - "date": types.Schema(type=types.Type.STRING, description="Date in YYYY-MM-DD format (e.g. '2026-05-08')"), + "date": types.Schema( + type=types.Type.STRING, + description="Date in YYYY-MM-DD format (e.g. '2026-05-08')", + ), }, required=["date"], ), @@ -377,16 +720,20 @@ DECLARATIONS = [ name="session_search", description=( "Search past conversation session logs for a keyword or phrase. " - "Use this to recall what was discussed in previous sessions — " - "e.g. 'what did we decide about X?', 'when did we set up Y?'. " "Returns matching excerpts with session dates, newest first. " "Only searches this user's own sessions." ), parameters=types.Schema( type=types.Type.OBJECT, properties={ - "query": types.Schema(type=types.Type.STRING, description="Keyword or phrase to search for"), - "limit": types.Schema(type=types.Type.INTEGER, description="Max results to return (default 5, max 20)"), + "query": types.Schema( + type=types.Type.STRING, + description="Keyword or phrase to search for", + ), + "limit": types.Schema( + type=types.Type.INTEGER, + description="Max results to return (default 5, max 20)", + ), }, required=["query"], ),