Runs diff -u on two project-scoped files. Low risk, no admin required. Covers code review, config comparison, and before/after verification. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
805 lines
29 KiB
Python
805 lines
29 KiB
Python
"""
|
||
File read/write/search tools — two access scopes.
|
||
|
||
Project scope (no admin required):
|
||
project_file_read — read a file with optional line-range (offset)
|
||
project_file_list — list a directory with sizes + timestamps
|
||
file_stat — size, modified time, line count for a path
|
||
file_grep — regex search with context lines; up to 50 matches
|
||
file_syntax_check — py_compile (.py) or json.loads (.json) check
|
||
|
||
System scope (admin-only):
|
||
file_read — read a file from ~/agents_sync/, ~/OSIT_dev/, etc.
|
||
file_list — list a directory (same roots)
|
||
file_write — write/append (~/agents_sync/ + Cortex home/)
|
||
|
||
Session tools (user-level, persona-isolated):
|
||
session_read — read a session log by date
|
||
session_search — keyword search across session logs
|
||
|
||
All project-scope tools are restricted to the Cortex project root:
|
||
~/agents_sync/projects/Cortex_and_Inara_dev/
|
||
"""
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import re
|
||
import subprocess
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
from google.genai import types
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# ── Access roots ──────────────────────────────────────────────────────────────
|
||
|
||
# Project root: two levels up from cortex/tools/files.py → Cortex_and_Inara_dev/
|
||
_PROJECT_ROOT: Path = Path(__file__).parent.parent.parent.resolve()
|
||
|
||
# System-wide read roots
|
||
def _build_allowed_roots() -> list[Path]:
|
||
roots = [
|
||
Path.home() / "agents_sync",
|
||
Path.home() / "OSIT_dev",
|
||
Path.home() / "DgrZone_Nextcloud",
|
||
Path.home() / "OSIT_Nextcloud",
|
||
]
|
||
try:
|
||
from config import settings
|
||
roots.append(settings.home_root())
|
||
except Exception:
|
||
pass
|
||
return roots
|
||
|
||
_ALLOWED_ROOTS: list[Path] = _build_allowed_roots()
|
||
|
||
# Write is tighter
|
||
_WRITE_ROOTS: list[Path] = [Path.home() / "agents_sync"]
|
||
|
||
# Size limits
|
||
_MAX_BYTES = 50_000
|
||
_MAX_LINES = 500
|
||
_MAX_GREP_MATCHES = 50
|
||
|
||
|
||
def _is_project_allowed(resolved: Path) -> bool:
|
||
try:
|
||
resolved.relative_to(_PROJECT_ROOT)
|
||
return True
|
||
except ValueError:
|
||
return False
|
||
|
||
|
||
def _is_allowed(resolved: Path) -> bool:
|
||
for root in _ALLOWED_ROOTS:
|
||
try:
|
||
resolved.relative_to(root)
|
||
return True
|
||
except ValueError:
|
||
continue
|
||
return False
|
||
|
||
|
||
def _is_write_allowed(resolved: Path) -> bool:
|
||
for root in _WRITE_ROOTS:
|
||
try:
|
||
resolved.relative_to(root)
|
||
return True
|
||
except ValueError:
|
||
continue
|
||
try:
|
||
from config import settings
|
||
resolved.relative_to(settings.home_root())
|
||
return True
|
||
except (ValueError, Exception):
|
||
pass
|
||
return False
|
||
|
||
|
||
# ── Shared implementations ────────────────────────────────────────────────────
|
||
|
||
def _read_impl(path_str: str, offset: int | None, max_lines: int | None, is_allowed_fn) -> str:
|
||
try:
|
||
resolved = Path(path_str).expanduser().resolve()
|
||
except Exception as e:
|
||
return f"Invalid path: {e}"
|
||
|
||
if not is_allowed_fn(resolved):
|
||
return f"Access denied: {resolved}"
|
||
|
||
if not resolved.exists():
|
||
return f"File not found: {resolved}"
|
||
|
||
if not resolved.is_file():
|
||
try:
|
||
entries = sorted(resolved.iterdir())
|
||
names = [e.name + ("/" if e.is_dir() else "") for e in entries[:100]]
|
||
return f"Directory listing for {resolved}:\n" + "\n".join(names)
|
||
except Exception as e:
|
||
return f"Cannot list directory: {e}"
|
||
|
||
try:
|
||
raw = resolved.read_bytes()
|
||
except Exception as e:
|
||
return f"Read error: {e}"
|
||
|
||
try:
|
||
text = raw.decode("utf-8")
|
||
except UnicodeDecodeError:
|
||
return f"Binary file (not readable as text): {resolved} [{len(raw)} bytes]"
|
||
|
||
all_lines = text.splitlines()
|
||
total = len(all_lines)
|
||
|
||
# offset is 1-based; default = start of file
|
||
start = max(0, (offset or 1) - 1)
|
||
working = all_lines[start:]
|
||
|
||
limit = min(max_lines or _MAX_LINES, _MAX_LINES)
|
||
truncated = False
|
||
if len(working) > limit:
|
||
working = working[:limit]
|
||
truncated = True
|
||
|
||
result = "\n".join(working)
|
||
if len(result) > _MAX_BYTES:
|
||
result = result[:_MAX_BYTES]
|
||
truncated = True
|
||
|
||
end_line = start + len(working)
|
||
header = f"[Lines {start + 1}–{end_line} of {total}]\n" if (start > 0 or truncated) else ""
|
||
trailer = f"\n\n… [truncated — file has {total} lines; use offset={end_line + 1} to read more]" if truncated else ""
|
||
|
||
return header + result + trailer
|
||
|
||
|
||
def _list_impl(path_str: str, is_allowed_fn) -> str:
|
||
try:
|
||
resolved = Path(path_str).expanduser().resolve()
|
||
except Exception as e:
|
||
return f"Invalid path: {e}"
|
||
|
||
if not is_allowed_fn(resolved):
|
||
return f"Access denied: {resolved}"
|
||
|
||
if not resolved.exists():
|
||
return f"Path not found: {resolved}"
|
||
|
||
if resolved.is_file():
|
||
return f"{resolved} is a file. Use file_read / project_file_read to read it."
|
||
|
||
try:
|
||
entries = sorted(resolved.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
|
||
lines = []
|
||
for e in entries[:200]:
|
||
if e.is_dir():
|
||
suffix = "/"
|
||
else:
|
||
try:
|
||
st = e.stat()
|
||
mtime = datetime.fromtimestamp(st.st_mtime).strftime("%Y-%m-%d %H:%M")
|
||
suffix = f" ({st.st_size:,} B, {mtime})"
|
||
except Exception:
|
||
suffix = ""
|
||
lines.append(f"{e.name}{suffix}")
|
||
result = "\n".join(lines)
|
||
if len(entries) > 200:
|
||
result += f"\n… ({len(entries) - 200} more not shown)"
|
||
return f"Contents of {resolved}:\n\n{result}"
|
||
except Exception as e:
|
||
return f"Cannot list directory: {e}"
|
||
|
||
|
||
# ── Project-scoped tools ──────────────────────────────────────────────────────
|
||
|
||
async def project_file_read(path: str, offset: int | None = None, max_lines: int | None = None) -> str:
|
||
"""Read a file within the Cortex project directory, with optional line range."""
|
||
return await asyncio.to_thread(_read_impl, path, offset, max_lines, _is_project_allowed)
|
||
|
||
|
||
async def project_file_list(path: str) -> str:
|
||
"""List directory contents within the Cortex project directory, with sizes and timestamps."""
|
||
return await asyncio.to_thread(_list_impl, path, _is_project_allowed)
|
||
|
||
|
||
async def file_stat(path: str) -> str:
|
||
"""Return metadata for a file or directory: type, size, modified time, line count."""
|
||
return await asyncio.to_thread(_sync_file_stat, path)
|
||
|
||
|
||
def _sync_file_stat(path_str: str) -> str:
|
||
try:
|
||
resolved = Path(path_str).expanduser().resolve()
|
||
except Exception as e:
|
||
return f"Invalid path: {e}"
|
||
|
||
if not _is_project_allowed(resolved):
|
||
return f"Access denied: {resolved}\nProject root: {_PROJECT_ROOT}"
|
||
|
||
if not resolved.exists():
|
||
return f"Path not found: {resolved}"
|
||
|
||
try:
|
||
st = resolved.stat()
|
||
except Exception as e:
|
||
return f"Cannot stat: {e}"
|
||
|
||
modified = datetime.fromtimestamp(st.st_mtime).strftime("%Y-%m-%d %H:%M:%S")
|
||
lines = [
|
||
f"Path: {resolved}",
|
||
f"Type: {'directory' if resolved.is_dir() else 'file'}",
|
||
f"Size: {st.st_size:,} bytes",
|
||
f"Modified: {modified}",
|
||
]
|
||
|
||
if resolved.is_file():
|
||
try:
|
||
raw = resolved.read_bytes()
|
||
if b'\x00' not in raw[:1024]:
|
||
lines.append(f"Lines: {len(raw.decode('utf-8', errors='replace').splitlines())}")
|
||
except Exception:
|
||
pass
|
||
elif resolved.is_dir():
|
||
try:
|
||
entries = list(resolved.iterdir())
|
||
n_files = sum(1 for e in entries if e.is_file())
|
||
n_dirs = sum(1 for e in entries if e.is_dir())
|
||
lines.append(f"Contents: {n_files} file(s), {n_dirs} subdirector{'y' if n_dirs == 1 else 'ies'}")
|
||
except Exception:
|
||
pass
|
||
|
||
return "\n".join(lines)
|
||
|
||
|
||
async def file_grep(path: str, pattern: str, context_lines: int = 2, recursive: bool = True) -> str:
|
||
"""Search for a regex pattern in a file or directory, returning matching lines with context."""
|
||
return await asyncio.to_thread(_sync_file_grep, path, pattern, context_lines, recursive)
|
||
|
||
|
||
def _sync_file_grep(path_str: str, pattern: str, context_lines: int, recursive: bool) -> str:
|
||
try:
|
||
resolved = Path(path_str).expanduser().resolve()
|
||
except Exception as e:
|
||
return f"Invalid path: {e}"
|
||
|
||
if not _is_project_allowed(resolved):
|
||
return f"Access denied: {resolved}\nProject root: {_PROJECT_ROOT}"
|
||
|
||
if not resolved.exists():
|
||
return f"Path not found: {resolved}"
|
||
|
||
try:
|
||
regex = re.compile(pattern, re.IGNORECASE)
|
||
except re.error as e:
|
||
return f"Invalid regex pattern: {e}"
|
||
|
||
ctx = max(0, min(context_lines, 5))
|
||
|
||
if resolved.is_file():
|
||
files_to_search = [resolved]
|
||
elif recursive:
|
||
files_to_search = sorted(f for f in resolved.rglob("*") if f.is_file())
|
||
else:
|
||
files_to_search = sorted(f for f in resolved.iterdir() if f.is_file())
|
||
|
||
total_matches = 0
|
||
sections: list[str] = []
|
||
capped = False
|
||
|
||
for fp in files_to_search:
|
||
if total_matches >= _MAX_GREP_MATCHES:
|
||
capped = True
|
||
break
|
||
try:
|
||
raw = fp.read_bytes()
|
||
except OSError:
|
||
continue
|
||
if b'\x00' in raw[:1024]:
|
||
continue # skip binary
|
||
try:
|
||
text = raw.decode("utf-8", errors="replace")
|
||
except Exception:
|
||
continue
|
||
|
||
file_lines = text.splitlines()
|
||
match_indices = [i for i, line in enumerate(file_lines) if regex.search(line)]
|
||
if not match_indices:
|
||
continue
|
||
|
||
total_matches += len(match_indices)
|
||
|
||
try:
|
||
label = str(fp.relative_to(_PROJECT_ROOT))
|
||
except ValueError:
|
||
label = str(fp)
|
||
|
||
file_output = [f"── {label} ──"]
|
||
printed: set[int] = set()
|
||
|
||
for mi in match_indices:
|
||
start = max(0, mi - ctx)
|
||
end = min(len(file_lines), mi + ctx + 1)
|
||
if printed and start > max(printed) + 1:
|
||
file_output.append(" ···")
|
||
for j in range(start, end):
|
||
if j not in printed:
|
||
marker = "►" if j == mi else " "
|
||
file_output.append(f" {j + 1:4d}{marker} {file_lines[j]}")
|
||
printed.add(j)
|
||
|
||
sections.append("\n".join(file_output))
|
||
|
||
if not sections:
|
||
return f"No matches for '{pattern}' in {resolved}"
|
||
|
||
cap_note = f" (capped at {_MAX_GREP_MATCHES})" if capped else ""
|
||
header = f"grep '{pattern}' — {total_matches} match(es){cap_note}:"
|
||
return header + "\n\n" + "\n\n".join(sections)
|
||
|
||
|
||
async def file_diff(path_a: str, path_b: str) -> str:
|
||
"""Compare two files and return a unified diff."""
|
||
return await asyncio.to_thread(_sync_file_diff, path_a, path_b)
|
||
|
||
|
||
def _sync_file_diff(path_a: str, path_b: str) -> str:
|
||
try:
|
||
resolved_a = Path(path_a).expanduser().resolve()
|
||
resolved_b = Path(path_b).expanduser().resolve()
|
||
except Exception as e:
|
||
return f"Invalid path: {e}"
|
||
|
||
for resolved in (resolved_a, resolved_b):
|
||
if not _is_project_allowed(resolved):
|
||
return f"Access denied: {resolved}"
|
||
if not resolved.exists():
|
||
return f"File not found: {resolved}"
|
||
if not resolved.is_file():
|
||
return f"Not a file: {resolved}"
|
||
|
||
try:
|
||
result = subprocess.run(
|
||
["diff", "-u", str(resolved_a), str(resolved_b)],
|
||
capture_output=True, text=True, timeout=15,
|
||
)
|
||
if result.returncode == 0:
|
||
return f"Files are identical: {resolved_a.name} vs {resolved_b.name}"
|
||
output = result.stdout
|
||
if not output:
|
||
return f"diff returned no output (exit {result.returncode}): {result.stderr}"
|
||
if len(output) > _MAX_BYTES:
|
||
output = output[:_MAX_BYTES] + "\n… [truncated]"
|
||
return output
|
||
except subprocess.TimeoutExpired:
|
||
return "Timeout running diff"
|
||
except Exception as e:
|
||
return f"Error: {e}"
|
||
|
||
|
||
async def file_syntax_check(path: str) -> str:
|
||
"""Check syntax of a Python (.py) or JSON (.json) file."""
|
||
return await asyncio.to_thread(_sync_file_syntax_check, path)
|
||
|
||
|
||
def _sync_file_syntax_check(path_str: str) -> str:
|
||
try:
|
||
resolved = Path(path_str).expanduser().resolve()
|
||
except Exception as e:
|
||
return f"Invalid path: {e}"
|
||
|
||
if not _is_project_allowed(resolved):
|
||
return f"Access denied: {resolved}\nProject root: {_PROJECT_ROOT}"
|
||
|
||
if not resolved.exists():
|
||
return f"File not found: {resolved}"
|
||
|
||
if not resolved.is_file():
|
||
return f"Not a file: {resolved}"
|
||
|
||
suffix = resolved.suffix.lower()
|
||
|
||
if suffix == ".py":
|
||
try:
|
||
result = subprocess.run(
|
||
["python3", "-m", "py_compile", str(resolved)],
|
||
capture_output=True, text=True, timeout=15,
|
||
)
|
||
if result.returncode == 0:
|
||
return f"OK — {resolved.name}: syntax valid"
|
||
err = (result.stderr or result.stdout).strip()
|
||
return f"Syntax error in {resolved.name}:\n{err}"
|
||
except subprocess.TimeoutExpired:
|
||
return f"Timeout running py_compile on {resolved.name}"
|
||
except Exception as e:
|
||
return f"Error: {e}"
|
||
|
||
elif suffix == ".json":
|
||
try:
|
||
text = resolved.read_text(encoding="utf-8")
|
||
json.loads(text)
|
||
return f"OK — {resolved.name}: valid JSON"
|
||
except json.JSONDecodeError as e:
|
||
return f"JSON error in {resolved.name}: {e}"
|
||
except Exception as e:
|
||
return f"Error reading {resolved.name}: {e}"
|
||
|
||
else:
|
||
return f"Syntax check not supported for '{suffix}' files. Supported: .py, .json"
|
||
|
||
|
||
# ── System-scoped tools ───────────────────────────────────────────────────────
|
||
|
||
async def file_read(path: str, offset: int | None = None, max_lines: int | None = None) -> str:
|
||
"""Read a local file from the broader system. Allowed: ~/agents_sync/, ~/OSIT_dev/, etc. ADMIN ONLY."""
|
||
return await asyncio.to_thread(_read_impl, path, offset, max_lines, _is_allowed)
|
||
|
||
|
||
async def file_list(path: str) -> str:
|
||
"""List directory contents from the broader system. ADMIN ONLY."""
|
||
return await asyncio.to_thread(_list_impl, path, _is_allowed)
|
||
|
||
|
||
async def file_write(path: str, content: str, mode: str = "overwrite") -> str:
|
||
"""Write or append content to a file. Write roots: ~/agents_sync/ and Cortex home/. ADMIN ONLY."""
|
||
return await asyncio.to_thread(_sync_file_write, path, content, mode)
|
||
|
||
|
||
def _sync_file_write(path: str, content: str, mode: str) -> str:
|
||
try:
|
||
resolved = Path(path).expanduser().resolve()
|
||
except Exception as e:
|
||
return f"Invalid path: {e}"
|
||
|
||
if not _is_write_allowed(resolved):
|
||
return (
|
||
f"Write access denied: {resolved}\n"
|
||
f"Allowed write roots: ~/agents_sync/ and the Cortex home/ directory."
|
||
)
|
||
|
||
if mode not in ("overwrite", "append"):
|
||
return f"Invalid mode '{mode}' — use 'overwrite' or 'append'."
|
||
|
||
try:
|
||
resolved.parent.mkdir(parents=True, exist_ok=True)
|
||
if mode == "append":
|
||
with resolved.open("a", encoding="utf-8") as f:
|
||
f.write(content)
|
||
return f"Appended {len(content)} chars to {resolved}"
|
||
else:
|
||
resolved.write_text(content, encoding="utf-8")
|
||
return f"Wrote {len(content)} chars to {resolved}"
|
||
except Exception as e:
|
||
logger.error("file_write error for %s: %s", resolved, e)
|
||
return f"Write error: {e}"
|
||
|
||
|
||
# ── Session tools ─────────────────────────────────────────────────────────────
|
||
|
||
_SEARCH_EXCERPT_CHARS = 150
|
||
|
||
|
||
async def session_read(date: str) -> str:
|
||
"""Read a full session log by date (YYYY-MM-DD)."""
|
||
return await asyncio.to_thread(_sync_session_read, date.strip())
|
||
|
||
|
||
def _sync_session_read(date: str) -> str:
|
||
from persona import persona_path
|
||
sessions_dir = persona_path() / "sessions"
|
||
if not sessions_dir.exists():
|
||
return "No session logs found."
|
||
|
||
target = sessions_dir / f"{date}.md"
|
||
if target.exists():
|
||
content = target.read_text()
|
||
return f"Session log for {date} ({len(content)} chars):\n\n{content}"
|
||
|
||
available = sorted([f.stem for f in sessions_dir.glob("*.md")], reverse=True)
|
||
if not available:
|
||
return "No session logs found."
|
||
recent = "\n".join(f" {d}" for d in available[:15])
|
||
return f"No session log found for '{date}'. Available dates (most recent first):\n{recent}"
|
||
|
||
|
||
async def session_search(query: str, limit: int = 5) -> str:
|
||
"""Search past session logs for a keyword or phrase."""
|
||
return await asyncio.to_thread(_sync_session_search, query, limit)
|
||
|
||
|
||
def _sync_session_search(query: str, limit: int) -> str:
|
||
from persona import persona_path
|
||
sessions_dir = persona_path() / "sessions"
|
||
if not sessions_dir.exists():
|
||
return "No session logs found."
|
||
|
||
limit = max(1, min(limit, 20))
|
||
pattern = re.compile(re.escape(query), re.IGNORECASE)
|
||
session_files = sorted(sessions_dir.glob("*.md"), reverse=True)
|
||
|
||
matches = []
|
||
for sf in session_files:
|
||
if len(matches) >= limit:
|
||
break
|
||
try:
|
||
text = sf.read_text()
|
||
except OSError:
|
||
continue
|
||
for m in pattern.finditer(text):
|
||
if len(matches) >= limit:
|
||
break
|
||
start = max(0, m.start() - _SEARCH_EXCERPT_CHARS)
|
||
end = min(len(text), m.end() + _SEARCH_EXCERPT_CHARS)
|
||
excerpt = text[start:end].strip()
|
||
if start > 0:
|
||
excerpt = "…" + excerpt
|
||
if end < len(text):
|
||
excerpt = excerpt + "…"
|
||
matches.append(f"[{sf.stem}] {excerpt}")
|
||
|
||
if not matches:
|
||
return f"No matches for '{query}' across {len(session_files)} session logs."
|
||
header = f"Session search: '{query}' — {len(matches)} match(es) across {len(session_files)} logs\n"
|
||
return header + "\n\n".join(matches)
|
||
|
||
|
||
# ── Declarations ──────────────────────────────────────────────────────────────
|
||
|
||
DECLARATIONS = [
|
||
# Project-scoped
|
||
types.FunctionDeclaration(
|
||
name="project_file_read",
|
||
description=(
|
||
"Read a file within the Cortex project directory (source code, docs, config, persona files). "
|
||
"Supports reading a specific line range via offset — use to page through large files "
|
||
"without re-reading from the top. If given a directory path, returns a listing instead. "
|
||
"Project root: ~/agents_sync/projects/Cortex_and_Inara_dev/"
|
||
),
|
||
parameters=types.Schema(
|
||
type=types.Type.OBJECT,
|
||
properties={
|
||
"path": types.Schema(
|
||
type=types.Type.STRING,
|
||
description="Absolute or ~/... path to the file",
|
||
),
|
||
"offset": types.Schema(
|
||
type=types.Type.INTEGER,
|
||
description="Start reading from this line number (1-based). Omit to read from the top.",
|
||
),
|
||
"max_lines": types.Schema(
|
||
type=types.Type.INTEGER,
|
||
description="Maximum lines to return (default 500)",
|
||
),
|
||
},
|
||
required=["path"],
|
||
),
|
||
),
|
||
types.FunctionDeclaration(
|
||
name="project_file_list",
|
||
description=(
|
||
"List files and subdirectories within the Cortex project directory. "
|
||
"Shows file sizes and modified timestamps. "
|
||
"Project root: ~/agents_sync/projects/Cortex_and_Inara_dev/"
|
||
),
|
||
parameters=types.Schema(
|
||
type=types.Type.OBJECT,
|
||
properties={
|
||
"path": types.Schema(
|
||
type=types.Type.STRING,
|
||
description="Absolute or ~/... path to the directory",
|
||
),
|
||
},
|
||
required=["path"],
|
||
),
|
||
),
|
||
types.FunctionDeclaration(
|
||
name="file_stat",
|
||
description=(
|
||
"Get metadata for a file or directory: type, size, modified timestamp, line count (for text files) "
|
||
"or entry counts (for directories). Use before reading to check recency or size. "
|
||
"Restricted to the Cortex project directory."
|
||
),
|
||
parameters=types.Schema(
|
||
type=types.Type.OBJECT,
|
||
properties={
|
||
"path": types.Schema(
|
||
type=types.Type.STRING,
|
||
description="Absolute or ~/... path to the file or directory",
|
||
),
|
||
},
|
||
required=["path"],
|
||
),
|
||
),
|
||
types.FunctionDeclaration(
|
||
name="file_grep",
|
||
description=(
|
||
"Search for a regex pattern in a file or directory, returning matching lines with surrounding "
|
||
"context. Much more efficient than reading an entire source file — use this to find function "
|
||
"definitions, variable names, TODO comments, imports, error strings, etc. "
|
||
"Searches recursively by default. Capped at 50 matches. Skips binary files. "
|
||
"Case-insensitive. Restricted to the Cortex project directory."
|
||
),
|
||
parameters=types.Schema(
|
||
type=types.Type.OBJECT,
|
||
properties={
|
||
"path": types.Schema(
|
||
type=types.Type.STRING,
|
||
description="File or directory to search (e.g. ~/agents_sync/projects/Cortex_and_Inara_dev/cortex/)",
|
||
),
|
||
"pattern": types.Schema(
|
||
type=types.Type.STRING,
|
||
description="Regex pattern to search for (case-insensitive). Examples: 'def ha_', 'import httpx', 'TODO'",
|
||
),
|
||
"context_lines": types.Schema(
|
||
type=types.Type.INTEGER,
|
||
description="Lines of context before/after each match (default 2, max 5)",
|
||
),
|
||
"recursive": types.Schema(
|
||
type=types.Type.BOOLEAN,
|
||
description="Search subdirectories recursively (default true)",
|
||
),
|
||
},
|
||
required=["path", "pattern"],
|
||
),
|
||
),
|
||
types.FunctionDeclaration(
|
||
name="file_diff",
|
||
description=(
|
||
"Compare two files and return a unified diff (diff -u). "
|
||
"Use for code review, verifying what changed between two versions of a file, "
|
||
"or comparing config files side-by-side. "
|
||
"Returns 'Files are identical' if there are no differences. "
|
||
"Restricted to the Cortex project directory."
|
||
),
|
||
parameters=types.Schema(
|
||
type=types.Type.OBJECT,
|
||
properties={
|
||
"path_a": types.Schema(
|
||
type=types.Type.STRING,
|
||
description="Path to the first file (the 'before' or reference file)",
|
||
),
|
||
"path_b": types.Schema(
|
||
type=types.Type.STRING,
|
||
description="Path to the second file (the 'after' or comparison file)",
|
||
),
|
||
},
|
||
required=["path_a", "path_b"],
|
||
),
|
||
),
|
||
types.FunctionDeclaration(
|
||
name="file_syntax_check",
|
||
description=(
|
||
"Check the syntax of a Python (.py) or JSON (.json) file without executing it. "
|
||
"Returns OK or the error with line number. "
|
||
"Use after editing a file before restarting Cortex. "
|
||
"Restricted to the Cortex project directory."
|
||
),
|
||
parameters=types.Schema(
|
||
type=types.Type.OBJECT,
|
||
properties={
|
||
"path": types.Schema(
|
||
type=types.Type.STRING,
|
||
description="Path to the .py or .json file to check",
|
||
),
|
||
},
|
||
required=["path"],
|
||
),
|
||
),
|
||
# System-scoped
|
||
types.FunctionDeclaration(
|
||
name="file_read",
|
||
description=(
|
||
"Read a local file from the broader system (~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, "
|
||
"~/OSIT_Nextcloud/, Cortex home/). Supports offset for reading specific line ranges. "
|
||
"For files within the Cortex project, prefer project_file_read instead. "
|
||
"ADMIN ONLY."
|
||
),
|
||
parameters=types.Schema(
|
||
type=types.Type.OBJECT,
|
||
properties={
|
||
"path": types.Schema(
|
||
type=types.Type.STRING,
|
||
description="Absolute or ~/... path to the file",
|
||
),
|
||
"offset": types.Schema(
|
||
type=types.Type.INTEGER,
|
||
description="Start reading from this line number (1-based)",
|
||
),
|
||
"max_lines": types.Schema(
|
||
type=types.Type.INTEGER,
|
||
description="Maximum lines to return (default 500)",
|
||
),
|
||
},
|
||
required=["path"],
|
||
),
|
||
),
|
||
types.FunctionDeclaration(
|
||
name="file_list",
|
||
description=(
|
||
"List files and subdirectories from the broader system. "
|
||
"Shows sizes and modified timestamps. "
|
||
"Allowed: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/. "
|
||
"ADMIN ONLY."
|
||
),
|
||
parameters=types.Schema(
|
||
type=types.Type.OBJECT,
|
||
properties={
|
||
"path": types.Schema(
|
||
type=types.Type.STRING,
|
||
description="Absolute or ~/... path to the directory",
|
||
),
|
||
},
|
||
required=["path"],
|
||
),
|
||
),
|
||
types.FunctionDeclaration(
|
||
name="file_write",
|
||
description=(
|
||
"Write or append content to a file. "
|
||
"Write-allowed paths: ~/agents_sync/ and the Cortex home/ directory. "
|
||
"Creates parent directories if needed. "
|
||
"ADMIN ONLY. Requires user confirmation before executing."
|
||
),
|
||
parameters=types.Schema(
|
||
type=types.Type.OBJECT,
|
||
properties={
|
||
"path": types.Schema(
|
||
type=types.Type.STRING,
|
||
description="Absolute or ~/... path to write to",
|
||
),
|
||
"content": types.Schema(
|
||
type=types.Type.STRING,
|
||
description="Content to write",
|
||
),
|
||
"mode": types.Schema(
|
||
type=types.Type.STRING,
|
||
description="'overwrite' (default, replaces file) or 'append' (adds to end)",
|
||
),
|
||
},
|
||
required=["path", "content"],
|
||
),
|
||
),
|
||
types.FunctionDeclaration(
|
||
name="session_read",
|
||
description=(
|
||
"Read a full conversation session log by date (YYYY-MM-DD). "
|
||
"Useful for continuity and recalling past decisions. "
|
||
"If the date is not found, lists available dates. "
|
||
"Only reads this user's own sessions."
|
||
),
|
||
parameters=types.Schema(
|
||
type=types.Type.OBJECT,
|
||
properties={
|
||
"date": types.Schema(
|
||
type=types.Type.STRING,
|
||
description="Date in YYYY-MM-DD format (e.g. '2026-05-08')",
|
||
),
|
||
},
|
||
required=["date"],
|
||
),
|
||
),
|
||
types.FunctionDeclaration(
|
||
name="session_search",
|
||
description=(
|
||
"Search past conversation session logs for a keyword or phrase. "
|
||
"Returns matching excerpts with session dates, newest first. "
|
||
"Only searches this user's own sessions."
|
||
),
|
||
parameters=types.Schema(
|
||
type=types.Type.OBJECT,
|
||
properties={
|
||
"query": types.Schema(
|
||
type=types.Type.STRING,
|
||
description="Keyword or phrase to search for",
|
||
),
|
||
"limit": types.Schema(
|
||
type=types.Type.INTEGER,
|
||
description="Max results to return (default 5, max 20)",
|
||
),
|
||
},
|
||
required=["query"],
|
||
),
|
||
),
|
||
]
|