""" System tools — local machine operations. These tools affect the host system directly. Use with care. All tools in this module require the admin role. """ import asyncio import logging import os import subprocess from pathlib import Path from google.genai import types logger = logging.getLogger(__name__) # Absolute paths — resolved relative to this file so they work regardless of cwd _CORTEX_DIR = Path(__file__).parent # .../Cortex_and_Inara_dev/cortex/ _PROJECT_ROOT = _CORTEX_DIR.parent # .../Cortex_and_Inara_dev/ ALLOW_SCRIPT = "/home/scott/.local/bin/claude-allow-dir" async def claude_allow_dir(path: str, mode: str = "rw") -> str: """Add Read/Edit allow rules to ~/.claude/settings.json for a directory. Calls the claude-allow-dir script, which edits settings.json directly. Changes take effect in the next Claude Code session (or after /hooks reload). """ if mode not in ("r", "w", "rw"): return f"Error: mode must be r, w, or rw (got '{mode}')" try: proc = await asyncio.create_subprocess_exec( "python3", ALLOW_SCRIPT, path, mode, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10) output = stdout.decode().strip() err = stderr.decode().strip() if proc.returncode != 0: logger.warning("claude-allow-dir failed (rc=%d): %s", proc.returncode, err) return f"Failed (exit {proc.returncode}): {err or output}" return output or "Done." except asyncio.TimeoutError: return "Error: script timed out" except Exception as e: logger.error("claude_allow_dir error: %s", e) return f"Error: {e}" async def shell_exec(command: str, working_dir: str | None = None, timeout: int = 30) -> str: """Execute a shell command on the Cortex host and return combined stdout/stderr.""" timeout = min(max(timeout, 1), 120) cwd = None if working_dir: cwd = os.path.expanduser(working_dir) if not os.path.isdir(cwd): return f"Error: working_dir '{working_dir}' does not exist or is not a directory" try: proc = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=cwd, ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) out = stdout.decode(errors="replace").strip() err = stderr.decode(errors="replace").strip() parts = [] if out: parts.append(out) if err: parts.append(f"[stderr]\n{err}") combined = "\n".join(parts) if parts else "(no output)" if proc.returncode != 0: return f"Exit {proc.returncode}:\n{combined}" return combined except asyncio.TimeoutError: return f"Error: command timed out after {timeout}s" except Exception as e: logger.error("shell_exec error: %s", e) return f"Error: {e}" async def cortex_restart() -> str: """Schedule a Cortex service restart 5 seconds from now. Uses a detached subprocess so the restart survives the current process being terminated by systemd. The calling session will drop — user should refresh. """ subprocess.Popen( ["bash", "-c", "sleep 5 && systemctl --user restart cortex"], start_new_session=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, close_fds=True, ) logger.info("cortex_restart: restart scheduled in 5 seconds") return ( "Cortex restart scheduled in 5 seconds. " "The current connection will drop — please refresh the page after a moment." ) async def cortex_logs(lines: int = 50) -> str: """Return recent lines from the Cortex systemd journal.""" n = min(max(int(lines), 1), 200) try: proc = await asyncio.create_subprocess_exec( "journalctl", "--user", "-u", "cortex", f"-n{n}", "--no-pager", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15) out = stdout.decode(errors="replace").strip() return out or stderr.decode(errors="replace").strip() or "No log output." except asyncio.TimeoutError: return "Error: journalctl timed out" except Exception as e: logger.error("cortex_logs error: %s", e) return f"Error: {e}" async def cortex_status() -> str: """Return Cortex service status: git branch/commit, ahead/behind remote, and systemctl state.""" lines = [] async def _git(*args: str) -> str: proc = await asyncio.create_subprocess_exec( "git", "-C", str(_PROJECT_ROOT), *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, ) stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10) return stdout.decode(errors="replace").strip() try: branch = await _git("rev-parse", "--abbrev-ref", "HEAD") commit = await _git("log", "--oneline", "-1") # fetch quietly so ahead/behind is current await asyncio.create_subprocess_exec( "git", "-C", str(_PROJECT_ROOT), "fetch", "--quiet", stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL, ) ahead_behind = await _git("rev-list", "--left-right", "--count", f"HEAD...origin/{branch}") ahead, behind = (ahead_behind.split() + ["?", "?"])[:2] lines.append(f"**Branch:** {branch} | ahead {ahead} / behind {behind}") lines.append(f"**Commit:** {commit}") except Exception as e: lines.append(f"Git info unavailable: {e}") lines.append("") try: proc = await asyncio.create_subprocess_exec( "systemctl", "--user", "status", "cortex", "--no-pager", "-l", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10) # First 15 lines of systemctl output is enough — avoids log flood status_lines = stdout.decode(errors="replace").splitlines()[:15] lines.extend(status_lines) except Exception as e: lines.append(f"systemctl status unavailable: {e}") return "\n".join(lines) async def cortex_update() -> str: """Pull the latest code from git, syntax-check all Python files, and report. Does NOT restart automatically — call cortex_restart separately after reviewing the output if you want to apply changes. """ lines = [] async def _run(*cmd: str, cwd: Path = _PROJECT_ROOT, timeout: int = 30) -> tuple[int, str]: proc = await asyncio.create_subprocess_exec( *cmd, cwd=str(cwd), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout) return proc.returncode, stdout.decode(errors="replace").strip() # 1. Check for incoming commits before pulling try: await _run("git", "fetch", "--quiet") rc, incoming = await _run("git", "log", "--oneline", "HEAD..origin/HEAD") if rc == 0 and not incoming: # Double-check with branch name in case origin/HEAD isn't set branch_rc, branch = await _run("git", "rev-parse", "--abbrev-ref", "HEAD") _, incoming = await _run("git", "log", "--oneline", f"HEAD..origin/{branch.strip()}") except asyncio.TimeoutError: return "Error: git fetch timed out — check network connectivity." except Exception as e: return f"Error during git fetch: {e}" if not incoming: rc2, current = await _run("git", "log", "--oneline", "-1") return f"Already up to date.\n\nCurrent commit: {current}" lines.append(f"**Incoming commits:**\n{incoming}\n") # 2. Pull try: rc, pull_out = await _run("git", "pull", "--ff-only") except asyncio.TimeoutError: return "Error: git pull timed out." except Exception as e: return f"Error during git pull: {e}" if rc != 0: return f"git pull failed (exit {rc}):\n{pull_out}" lines.append(f"**git pull:**\n{pull_out}\n") # 3. Syntax check all Python files under cortex/ py_files = sorted(_CORTEX_DIR.rglob("*.py")) errors = [] for f in py_files: result = subprocess.run( ["python3", "-m", "py_compile", str(f)], capture_output=True, text=True, ) if result.returncode != 0: errors.append(f" {f.relative_to(_PROJECT_ROOT)}: {result.stderr.strip()}") if errors: lines.append(f"**Syntax errors — do NOT restart until fixed:**") lines.extend(errors) else: lines.append(f"**Syntax check:** {len(py_files)} files — all OK.") lines.append("Call `cortex_restart` to apply the update.") return "\n".join(lines) DECLARATIONS = [ types.FunctionDeclaration( name="shell_exec", description=( "Execute a shell command on the Cortex host machine and return its output. " "Use for system diagnostics: disk usage (df -h), process status (ps aux), " "directory listings (ls), memory (free -h), uptime, network info, log tails, etc. " "Commands run as the Cortex service user. Timeout enforced (default 30s, max 120s). " "Avoid destructive commands — prefer read-only system queries." ), parameters=types.Schema( type=types.Type.OBJECT, properties={ "command": types.Schema(type=types.Type.STRING, description="Shell command to run (e.g. 'df -h', 'ls ~/agents_sync/', 'journalctl --user -u cortex -n 50')"), "working_dir": types.Schema(type=types.Type.STRING, description="Optional working directory (e.g. '~/agents_sync/projects'). Defaults to home directory."), "timeout": types.Schema(type=types.Type.INTEGER, description="Timeout in seconds (default 30, max 120)"), }, required=["command"], ), ), types.FunctionDeclaration( name="claude_allow_dir", description=( "Add a directory to Claude Code's auto-allow list so Claude can read or write " "files there without prompting. Edits ~/.claude/settings.json on the local machine. " "Use this when Claude is silently hanging or being blocked from accessing a directory. " "Changes take effect in the next Claude Code session." ), parameters=types.Schema( type=types.Type.OBJECT, properties={ "path": types.Schema(type=types.Type.STRING, description="Absolute or home-relative path to the directory (e.g. ~/OSIT_dev/aether_api_fastapi or /home/scott/agents_sync)"), "mode": types.Schema(type=types.Type.STRING, description="Permission mode: 'r' (read-only), 'w' (write-only), or 'rw' (both). Default: rw"), }, required=["path"], ), ), types.FunctionDeclaration( name="cortex_restart", description=( "Restart the Cortex service via systemd. Schedules a restart 5 seconds from now. " "The current connection will drop — inform the user to refresh the page. " "Use after config changes, memory edits, or when the service needs a fresh start. " "ADMIN ONLY." ), parameters=types.Schema(type=types.Type.OBJECT, properties={}), ), types.FunctionDeclaration( name="cortex_logs", description=( "Fetch recent lines from the Cortex systemd service journal. " "Use for debugging errors, checking startup status, or reviewing recent activity. " "ADMIN ONLY." ), parameters=types.Schema( type=types.Type.OBJECT, properties={ "lines": types.Schema(type=types.Type.INTEGER, description="Number of log lines to return (default 50, max 200)"), }, ), ), types.FunctionDeclaration( name="cortex_status", description=( "Return Cortex service status: current git branch and commit, how many commits " "ahead/behind the remote, and the systemctl service state. " "Use to check what version is running or whether the service is healthy. " "ADMIN ONLY." ), parameters=types.Schema(type=types.Type.OBJECT, properties={}), ), types.FunctionDeclaration( name="cortex_update", description=( "Pull the latest code from git, run a syntax check on all Python files, and report " "what changed. Does NOT restart automatically — call cortex_restart separately after " "reviewing the output. Will report syntax errors if the pull introduces broken code. " "ADMIN ONLY. Requires confirmation." ), parameters=types.Schema(type=types.Type.OBJECT, properties={}), ), ]