diff --git a/cortex/auth_utils.py b/cortex/auth_utils.py index f80acd9..e78b054 100644 --- a/cortex/auth_utils.py +++ b/cortex/auth_utils.py @@ -115,6 +115,16 @@ def get_user_gemini_key(username: str) -> str | None: return _read_auth(username).get("gemini_api_key") or None +def get_user_role(username: str) -> str: + """Return the user's role: 'admin' or 'user' (default). + + Role is stored as auth.json["role"]. Any unrecognised value falls back to 'user'. + Set via: manage_passwords.py role admin|user + """ + role = _read_auth(username).get("role", "user") + return role if role in ("admin", "user") else "user" + + # --------------------------------------------------------------------------- # JWT helpers # --------------------------------------------------------------------------- diff --git a/cortex/manage_passwords.py b/cortex/manage_passwords.py index 04eb34e..d4b4814 100644 --- a/cortex/manage_passwords.py +++ b/cortex/manage_passwords.py @@ -170,6 +170,25 @@ def cmd_google_add(args): print(f"They can now sign in at {settings.cortex_base_url}/login using that Google account.") +def cmd_role(args): + if len(args) < 2: + print("Usage: manage_passwords.py role admin|user") + sys.exit(1) + username, role = args[0], args[1].lower().strip() + if role not in ("admin", "user"): + print("Role must be 'admin' or 'user'.") + sys.exit(1) + from auth_utils import _read_auth, _write_auth + data = _read_auth(username) + if not data: + print(f"User '{username}' not found — no auth.json.") + sys.exit(1) + old_role = data.get("role", "user") + data["role"] = role + _write_auth(username, data) + print(f"Role for '{username}': {old_role} → {role}") + + if __name__ == "__main__": if len(sys.argv) < 2: print(__doc__) @@ -190,6 +209,8 @@ if __name__ == "__main__": cmd_invite(rest) elif command == "google-add": cmd_google_add(rest) + elif command == "role": + cmd_role(rest) else: print(f"Unknown command: {command}") print(__doc__) diff --git a/cortex/openai_orchestrator.py b/cortex/openai_orchestrator.py index 0041634..d8caa38 100644 --- a/cortex/openai_orchestrator.py +++ b/cortex/openai_orchestrator.py @@ -25,7 +25,7 @@ from openai import AsyncOpenAI from config import settings from orchestrator_engine import OrchestratorResult -from tools import OPENAI_TOOL_SCHEMAS, call_tool +from tools import OPENAI_TOOL_SCHEMAS, call_tool, get_openai_tools_for_role, CONFIRM_REQUIRED logger = logging.getLogger(__name__) @@ -44,6 +44,7 @@ async def run( session_messages: list[dict] | None = None, model_cfg: dict | None = None, respond_with_final: bool = True, + user_role: str = "user", ) -> OrchestratorResult: """ Run a tool-enabled task using an OpenAI-compatible API. @@ -93,6 +94,9 @@ async def run( ) messages.append({"role": "user", "content": task}) + active_tools = get_openai_tools_for_role(user_role) + active_callables: dict | None = None # resolved lazily below + tool_call_log: list[dict] = [] final_response = "" @@ -103,7 +107,7 @@ async def run( response = await client.chat.completions.create( model=model_name, messages=messages, - tools=OPENAI_TOOL_SCHEMAS, + tools=active_tools, tool_choice="auto", ) @@ -130,39 +134,54 @@ async def run( messages.append(assistant_msg) if choice.finish_reason == "tool_calls" and msg.tool_calls: - # Execute all tool calls in parallel, then feed results back - tool_tasks = [ - _execute_tool(tc.function.name, tc.function.arguments) - for tc in msg.tool_calls - ] - results = await asyncio.gather(*tool_tasks, return_exceptions=True) - - for tc, result in zip(msg.tool_calls, results): - result_str = ( - str(result) - if not isinstance(result, Exception) - else f"Tool error: {result}" - ) - logger.info("Tool %s → %d chars", tc.function.name, len(result_str)) + confirm_requested = False + for tc in msg.tool_calls: + name = tc.function.name try: args_parsed = json.loads(tc.function.arguments) except json.JSONDecodeError: args_parsed = {"raw": tc.function.arguments} - tool_call_log.append({ - "tool": tc.function.name, - "args": args_parsed, - "result": result_str, - }) + if name in CONFIRM_REQUIRED: + args_str = json.dumps(args_parsed, indent=2) if args_parsed else "(no arguments)" + result_str = ( + f"⚠️ CONFIRMATION REQUIRED ⚠️\n" + f"Tool: {name}\nArguments:\n{args_str}\n\n" + f"Do NOT call this tool again. Tell the user exactly what you were " + f"about to do, explain the potential impact, and ask them to confirm " + f"by sending a follow-up message before you proceed." + ) + confirm_requested = True + logger.info("Tool %s blocked — confirmation required", name) + else: + result_str = await _execute_tool(name, tc.function.arguments, user_role) + logger.info("Tool %s → %d chars", name, len(result_str)) - # Tool result message — tools array must be re-sent on every request + tool_call_log.append({ + "tool": name, + "args": args_parsed, + "result": "[awaiting confirmation]" if name in CONFIRM_REQUIRED else result_str, + }) messages.append({ "role": "tool", "tool_call_id": tc.id, "content": result_str, }) + if confirm_requested: + # One more model round to produce the confirmation-request message, then stop. + conf_resp = await client.chat.completions.create( + model=model_name, + messages=messages, + tools=active_tools, + tool_choice="none", + ) + final_response = conf_resp.choices[0].message.content or ( + "This action requires your explicit confirmation before it can proceed." + ) + break + else: # finish_reason == "stop" (or no tool_calls) — model is done final_response = msg.content or "" @@ -194,14 +213,16 @@ async def run( ) -async def _execute_tool(name: str, arguments_json: str) -> str: - """Parse tool arguments and execute, returning a string result.""" +async def _execute_tool(name: str, arguments_json: str, user_role: str = "user") -> str: + """Parse tool arguments and execute with role-filtered callables.""" + from tools import get_tools_for_role + _, callables = get_tools_for_role(user_role) try: args = json.loads(arguments_json) except json.JSONDecodeError: args = {} try: - return await call_tool(name, args) + return await call_tool(name, args, callables) except Exception as e: logger.warning("Tool %s failed: %s", name, e) return f"Tool error: {e}" diff --git a/cortex/orchestrator_engine.py b/cortex/orchestrator_engine.py index 13e2c53..f9afd13 100644 --- a/cortex/orchestrator_engine.py +++ b/cortex/orchestrator_engine.py @@ -16,6 +16,7 @@ calls llm_client.complete() directly, which is faster and has no orchestration o """ import asyncio +import json import logging from dataclasses import dataclass, field @@ -24,7 +25,7 @@ from google.genai import types from config import settings from llm_client import complete -from tools import TOOL_DECLARATIONS, call_tool +from tools import TOOL_DECLARATIONS, call_tool, get_tools_for_role, CONFIRM_REQUIRED logger = logging.getLogger(__name__) @@ -59,6 +60,7 @@ async def run( gemini_api_key: str | None = None, model_name: str | None = None, response_role: str = "chat", + user_role: str = "user", ) -> OrchestratorResult: """ Run the full orchestration loop for a task. @@ -89,6 +91,8 @@ async def run( types.Content(role="user", parts=[types.Part(text=task_with_context)]) ] + tool_declarations, tool_callables = get_tools_for_role(user_role) + tool_call_log: list[dict] = [] gemini_summary = "" @@ -101,7 +105,7 @@ async def run( model=model_name or settings.orchestrator_model, contents=contents, config=types.GenerateContentConfig( - tools=TOOL_DECLARATIONS, + tools=tool_declarations, system_instruction=_ORCHESTRATOR_SYSTEM, ), ) @@ -127,29 +131,39 @@ async def run( # Add Gemini's response (with function calls) to the conversation contents.append(candidate.content) - # Execute all tool calls in parallel - tool_tasks = [ - _execute_tool(fc.function_call.name, dict(fc.function_call.args)) - for fc in tool_call_parts - ] - tool_results = await asyncio.gather(*tool_tasks, return_exceptions=True) - - # Build function response parts and update log + # Execute tool calls — check confirmation requirement before calling response_parts: list[types.Part] = [] - for fc_part, result in zip(tool_call_parts, tool_results): + confirm_requested = False + + for fc_part in tool_call_parts: fc = fc_part.function_call - result_str = str(result) if not isinstance(result, Exception) else f"Error: {result}" - logger.info("Tool %s → %d chars", fc.name, len(result_str)) + name = fc.name + args = dict(fc.args) + + if name in CONFIRM_REQUIRED: + args_str = json.dumps(args, indent=2) if args else "(no arguments)" + result_str = ( + f"⚠️ CONFIRMATION REQUIRED ⚠️\n" + f"Tool: {name}\nArguments:\n{args_str}\n\n" + f"Do NOT call this tool again. Tell the user exactly what you were " + f"about to do, explain the potential impact, and ask them to confirm " + f"by sending a follow-up message before you proceed." + ) + confirm_requested = True + logger.info("Tool %s blocked — confirmation required", name) + else: + result_str = await _execute_tool(name, args, tool_callables) + logger.info("Tool %s → %d chars", name, len(result_str)) tool_call_log.append({ - "tool": fc.name, - "args": dict(fc.args), - "result": result_str, + "tool": name, + "args": args, + "result": "[awaiting confirmation]" if name in CONFIRM_REQUIRED else result_str, }) response_parts.append( types.Part( function_response=types.FunctionResponse( - name=fc.name, + name=name, response={"result": result_str}, ) ) @@ -157,6 +171,28 @@ async def run( contents.append(types.Content(role="user", parts=response_parts)) + if confirm_requested: + # Allow one more Gemini round to produce the confirmation-request message, + # then break — tool is not executed until user confirms in a follow-up. + conf_response = await asyncio.to_thread( + client.models.generate_content, + model=model_name or settings.orchestrator_model, + contents=contents, + config=types.GenerateContentConfig( + tools=tool_declarations, + system_instruction=_ORCHESTRATOR_SYSTEM, + ), + ) + conf_parts = ( + conf_response.candidates[0].content.parts + if conf_response.candidates and conf_response.candidates[0].content + else [] + ) + gemini_summary = "".join( + p.text for p in conf_parts if hasattr(p, "text") and p.text + ).strip() or "This action requires your explicit confirmation before it can proceed." + break + else: # Hit the round limit — use whatever Gemini produced last logger.warning("Orchestrator hit max rounds (%d)", settings.orchestrator_max_rounds) @@ -192,10 +228,10 @@ async def run( ) -async def _execute_tool(name: str, args: dict) -> str: +async def _execute_tool(name: str, args: dict, callables: dict | None = None) -> str: """Execute a single tool call, catching all exceptions.""" try: - return await call_tool(name, args) + return await call_tool(name, args, callables) except Exception as e: logger.warning("Tool %s failed: %s", name, e) return f"Tool error: {e}" diff --git a/cortex/routers/orchestrator.py b/cortex/routers/orchestrator.py index 4e185c4..e6a8baa 100644 --- a/cortex/routers/orchestrator.py +++ b/cortex/routers/orchestrator.py @@ -18,7 +18,7 @@ from datetime import datetime, timezone from fastapi import APIRouter from pydantic import BaseModel -from auth_utils import get_user_gemini_key +from auth_utils import get_user_gemini_key, get_user_role from config import settings from context_loader import load_context from persona import set_context, validate as validate_persona @@ -163,6 +163,8 @@ async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None: # Choose engine based on the orchestrator role in the model registry orch_model = model_registry.get_model_for_role(user, "orchestrator") + user_role = get_user_role(user) + if orch_model and orch_model.get("type") == "local_openai": result = await openai_orchestrator.run( task=req.task, @@ -170,6 +172,7 @@ async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None: session_messages=session_messages, model_cfg=orch_model, respond_with_final=req.respond_with_claude, + user_role=user_role, ) else: # Use the API key embedded in the resolved model config (V2 registry with @@ -186,6 +189,7 @@ async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None: gemini_api_key=gemini_key, model_name=orch_model.get("model_name") if orch_model else None, response_role=req.chat_role, + user_role=user_role, ) # Save the turn to the session store so it survives a page refresh diff --git a/cortex/tools/__init__.py b/cortex/tools/__init__.py index d8026c8..65469cc 100644 --- a/cortex/tools/__init__.py +++ b/cortex/tools/__init__.py @@ -45,6 +45,10 @@ from tools.scratch import ( scratch_append as _scratch_append, scratch_clear as _scratch_clear, ) +from tools.system import cortex_restart as _cortex_restart, cortex_logs as _cortex_logs +from tools.web import http_fetch as _http_fetch +from tools.files import file_list as _file_list, file_write as _file_write +from tools.notify import nc_talk_send as _nc_talk_send # --------------------------------------------------------------------------- @@ -285,8 +289,14 @@ _CALLABLES: dict[str, callable] = { "ae_journal_entry_prepend": _ae_journal_entry_prepend, "ae_task_list": _ae_task_list, "file_read": _file_read, + "file_list": _file_list, + "file_write": _file_write, "claude_allow_dir": _claude_allow_dir, "shell_exec": _shell_exec, + "cortex_restart": _cortex_restart, + "cortex_logs": _cortex_logs, + "http_fetch": _http_fetch, + "nc_talk_send": _nc_talk_send, "task_list": _task_list, "task_create": _task_create, "task_update": _task_update, @@ -640,46 +650,219 @@ _scratch_clear_declaration = types.FunctionDeclaration( parameters=types.Schema(type=types.Type.OBJECT, properties={}), ) +_cortex_restart_declaration = types.FunctionDeclaration( + name="cortex_restart", + description=( + "Restart the Cortex service via systemd. Schedules a restart 5 seconds from now. " + "The current connection will drop — inform the user to refresh the page. " + "Use after config changes, memory edits, or when the service needs a fresh start. " + "ADMIN ONLY." + ), + parameters=types.Schema(type=types.Type.OBJECT, properties={}), +) -# Gemini Tool object — pass this to GenerateContentConfig -TOOL_DECLARATIONS = [ - types.Tool(function_declarations=[ - _web_search_declaration, - _ae_journal_list_declaration, - _ae_journal_search_declaration, - _ae_journal_entry_create_declaration, - _ae_journal_entry_update_declaration, - _ae_journal_entry_disable_declaration, - _ae_journal_entry_append_declaration, - _ae_journal_entry_prepend_declaration, - _ae_task_list_declaration, - _file_read_declaration, - _claude_allow_dir_declaration, - _shell_exec_declaration, - _task_list_declaration, - _task_create_declaration, - _task_update_declaration, - _task_complete_declaration, - _cron_list_declaration, - _cron_add_declaration, - _cron_remove_declaration, - _cron_toggle_declaration, - _reminders_add_declaration, - _reminders_list_declaration, - _reminders_clear_declaration, - _scratch_read_declaration, - _scratch_write_declaration, - _scratch_append_declaration, - _scratch_clear_declaration, - ]) +_cortex_logs_declaration = types.FunctionDeclaration( + name="cortex_logs", + description=( + "Fetch recent lines from the Cortex systemd service journal. " + "Use for debugging errors, checking startup status, or reviewing recent activity. " + "ADMIN ONLY." + ), + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "lines": types.Schema( + type=types.Type.INTEGER, + description="Number of log lines to return (default 50, max 200)", + ), + }, + ), +) + +_http_fetch_declaration = types.FunctionDeclaration( + name="http_fetch", + description=( + "Fetch a specific URL and return the response. Unlike web_search, this hits " + "a direct URL — useful for health checks, JSON API endpoints, webhook testing, " + "or reading a specific page when you already know the URL. " + "Response body is capped at 8 KB." + ), + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "url": types.Schema(type=types.Type.STRING, description="Full URL to fetch"), + "method": types.Schema( + type=types.Type.STRING, + description="HTTP method: GET (default), POST, HEAD", + ), + "body": types.Schema( + type=types.Type.STRING, + description="Optional request body (for POST requests)", + ), + "timeout": types.Schema( + type=types.Type.INTEGER, + description="Request timeout in seconds (default 15, max 60)", + ), + }, + required=["url"], + ), +) + +_file_list_declaration = types.FunctionDeclaration( + name="file_list", + description=( + "List the files and subdirectories in a directory. " + "Allowed paths: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/. " + "ADMIN ONLY." + ), + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "path": types.Schema( + type=types.Type.STRING, + description="Absolute or home-relative path to the directory", + ), + }, + required=["path"], + ), +) + +_file_write_declaration = types.FunctionDeclaration( + name="file_write", + description=( + "Write or append content to a file. " + "Write-allowed paths: ~/agents_sync/ and the Cortex home/ directory. " + "Creates parent directories if needed. " + "ADMIN ONLY. Requires user confirmation before executing." + ), + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "path": types.Schema( + type=types.Type.STRING, + description="Absolute or home-relative path to write to", + ), + "content": types.Schema( + type=types.Type.STRING, + description="Content to write", + ), + "mode": types.Schema( + type=types.Type.STRING, + description="'overwrite' (default, replaces file) or 'append' (adds to end)", + ), + }, + required=["path", "content"], + ), +) + +_nc_talk_send_declaration = types.FunctionDeclaration( + name="nc_talk_send", + description=( + "Send a proactive message to the user via their configured notification channel " + "(Nextcloud Talk by default). Use this to notify the user of completed background " + "tasks, important events, or anything they should know between sessions. " + "Requires notification_channel and notification_room set in channels.json." + ), + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "message": types.Schema( + type=types.Type.STRING, + description="The message to send to the user", + ), + }, + required=["message"], + ), +) + + +# --------------------------------------------------------------------------- +# Role-based access control +# --------------------------------------------------------------------------- + +# Minimum role required to use each tool. Unlisted tools default to "user". +TOOL_ROLES: dict[str, str] = { + # Admin-only — system-level or broad filesystem access + "shell_exec": "admin", + "claude_allow_dir": "admin", + "cortex_restart": "admin", + "cortex_logs": "admin", + "file_read": "admin", + "file_list": "admin", + "file_write": "admin", + "ae_task_list": "admin", # reads agents_sync kanban +} + +# Tools that require explicit user confirmation before executing. +# The orchestrator injects a CONFIRMATION_REQUIRED result instead of calling +# the tool, prompting Claude to ask the user to confirm in a follow-up message. +CONFIRM_REQUIRED: set[str] = { + "cortex_restart", + "file_write", + "shell_exec", + "cron_remove", + "reminders_clear", +} + +_ROLE_RANK: dict[str, int] = {"user": 0, "admin": 1} + + +def _role_allowed(tool_name: str, role: str) -> bool: + required = TOOL_ROLES.get(tool_name, "user") + return _ROLE_RANK.get(role, 0) >= _ROLE_RANK.get(required, 0) + + +# Flat list of all declarations — single source of truth for both Gemini and OpenAI formats. +_ALL_DECLARATIONS: list[types.FunctionDeclaration] = [ + _web_search_declaration, + _ae_journal_list_declaration, + _ae_journal_search_declaration, + _ae_journal_entry_create_declaration, + _ae_journal_entry_update_declaration, + _ae_journal_entry_disable_declaration, + _ae_journal_entry_append_declaration, + _ae_journal_entry_prepend_declaration, + _ae_task_list_declaration, + _file_read_declaration, + _file_list_declaration, + _file_write_declaration, + _claude_allow_dir_declaration, + _shell_exec_declaration, + _cortex_restart_declaration, + _cortex_logs_declaration, + _http_fetch_declaration, + _nc_talk_send_declaration, + _task_list_declaration, + _task_create_declaration, + _task_update_declaration, + _task_complete_declaration, + _cron_list_declaration, + _cron_add_declaration, + _cron_remove_declaration, + _cron_toggle_declaration, + _reminders_add_declaration, + _reminders_list_declaration, + _reminders_clear_declaration, + _scratch_read_declaration, + _scratch_write_declaration, + _scratch_append_declaration, + _scratch_clear_declaration, ] +# Full Gemini Tool object (all tools — use get_tools_for_role() in production) +TOOL_DECLARATIONS = [types.Tool(function_declarations=_ALL_DECLARATIONS)] -async def call_tool(name: str, args: dict) -> str: - """Dispatch a tool call by name. Returns result as a string.""" - fn = _CALLABLES.get(name) + +async def call_tool(name: str, args: dict, callables: dict | None = None) -> str: + """Dispatch a tool call by name. Returns result as a string. + + Pass `callables` (from get_tools_for_role) to enforce role restrictions. + Falls back to the full _CALLABLES dict if omitted. + """ + dispatch = callables if callables is not None else _CALLABLES + fn = dispatch.get(name) if fn is None: - return f"Unknown tool: {name}" + return f"Tool not available or access denied: {name}" return await fn(**args) @@ -718,9 +901,9 @@ def _schema_to_json(schema) -> dict: def _build_openai_tools() -> list[dict]: - """Convert TOOL_DECLARATIONS (Gemini format) to OpenAI tool schemas.""" + """Convert _ALL_DECLARATIONS (Gemini format) to OpenAI tool schemas.""" out = [] - for decl in TOOL_DECLARATIONS[0].function_declarations: + for decl in _ALL_DECLARATIONS: params = ( _schema_to_json(decl.parameters) if decl.parameters @@ -737,5 +920,27 @@ def _build_openai_tools() -> list[dict]: return out -# OpenAI-format tool list — pass to client.chat.completions.create(tools=...) +# OpenAI-format tool list — all tools (use get_openai_tools_for_role() in production) OPENAI_TOOL_SCHEMAS: list[dict] = _build_openai_tools() + + +# --------------------------------------------------------------------------- +# Role-filtered tool access +# --------------------------------------------------------------------------- + +def get_tools_for_role(role: str) -> tuple[list, dict]: + """Return (gemini_tool_declarations, callables_dict) filtered to tools the role can use. + + Usage in orchestrator: + tool_declarations, tool_callables = get_tools_for_role(user_role) + """ + allowed = {name for name in _CALLABLES if _role_allowed(name, role)} + decls = [d for d in _ALL_DECLARATIONS if d.name in allowed] + callables = {k: v for k, v in _CALLABLES.items() if k in allowed} + return [types.Tool(function_declarations=decls)], callables + + +def get_openai_tools_for_role(role: str) -> list[dict]: + """Return OpenAI tool schemas filtered to tools the role can use.""" + allowed = {name for name in _CALLABLES if _role_allowed(name, role)} + return [t for t in OPENAI_TOOL_SCHEMAS if t["function"]["name"] in allowed] diff --git a/cortex/tools/files.py b/cortex/tools/files.py index e4dd8e0..949483b 100644 --- a/cortex/tools/files.py +++ b/cortex/tools/files.py @@ -110,3 +110,105 @@ def _is_allowed(resolved: Path) -> bool: except ValueError: continue return False + + +# Write is restricted to a tighter set of paths to limit blast radius. +_WRITE_ROOTS: list[Path] = [ + Path.home() / "agents_sync", +] + + +def _is_write_allowed(resolved: Path) -> bool: + for root in _WRITE_ROOTS: + try: + resolved.relative_to(root) + return True + except ValueError: + continue + # Also allow the Cortex home/ directory (persona memory, tasks, etc.) + try: + from config import settings + cortex_home = settings.home_root() + resolved.relative_to(cortex_home) + return True + except (ValueError, Exception): + pass + return False + + +async def file_list(path: str) -> str: + """List the contents of a directory. + + Returns names of files and subdirectories with type indicators (/ for dirs). + Same allow-list as file_read. + """ + return await asyncio.to_thread(_sync_file_list, path) + + +def _sync_file_list(path: str) -> str: + try: + resolved = Path(path).expanduser().resolve() + except Exception as e: + return f"Invalid path: {e}" + + if not _is_allowed(resolved): + allowed_str = ", ".join(str(r) for r in _ALLOWED_ROOTS) + return f"Access denied: {resolved}\nAllowed directories: {allowed_str}" + + if not resolved.exists(): + return f"Path not found: {resolved}" + + if resolved.is_file(): + return f"{resolved} is a file, not a directory. Use file_read to read it." + + try: + entries = sorted(resolved.iterdir(), key=lambda e: (e.is_file(), e.name.lower())) + lines = [] + for e in entries[:200]: + suffix = "/" if e.is_dir() else f" ({e.stat().st_size} bytes)" if e.is_file() else "" + lines.append(f"{e.name}{suffix}") + result = "\n".join(lines) + if len(entries) > 200: + result += f"\n… ({len(entries) - 200} more entries not shown)" + return f"Contents of {resolved}:\n\n{result}" + except Exception as e: + return f"Cannot list directory: {e}" + + +async def file_write(path: str, content: str, mode: str = "overwrite") -> str: + """Write content to a file. + + mode: 'overwrite' (default) replaces the file; 'append' adds to the end. + Write-allowed paths: ~/agents_sync/ and the Cortex home/ directory. + Parent directories are created if they don't exist. + """ + return await asyncio.to_thread(_sync_file_write, path, content, mode) + + +def _sync_file_write(path: str, content: str, mode: str) -> str: + try: + resolved = Path(path).expanduser().resolve() + except Exception as e: + return f"Invalid path: {e}" + + if not _is_write_allowed(resolved): + return ( + f"Write access denied: {resolved}\n" + f"Allowed write roots: ~/agents_sync/ and the Cortex home/ directory." + ) + + if mode not in ("overwrite", "append"): + return f"Invalid mode '{mode}' — use 'overwrite' or 'append'." + + try: + resolved.parent.mkdir(parents=True, exist_ok=True) + if mode == "append": + with resolved.open("a", encoding="utf-8") as f: + f.write(content) + return f"Appended {len(content)} chars to {resolved}" + else: + resolved.write_text(content, encoding="utf-8") + return f"Wrote {len(content)} chars to {resolved}" + except Exception as e: + logger.error("file_write error for %s: %s", resolved, e) + return f"Write error: {e}" diff --git a/cortex/tools/notify.py b/cortex/tools/notify.py new file mode 100644 index 0000000..fdc2c14 --- /dev/null +++ b/cortex/tools/notify.py @@ -0,0 +1,28 @@ +""" +Notification tools — proactively send messages to user channels. + +nc_talk_send routes through notification.py → channels.json. +Requires notification_channel and notification_room set in the user's channels.json. +""" + +import logging + +from persona import get_user + +logger = logging.getLogger(__name__) + + +async def nc_talk_send(message: str) -> str: + """Send a message to the user via their configured notification channel. + + Channel is resolved from the user's channels.json (notification_channel key). + Falls back to Nextcloud Talk if configured. No-op if no channel is set. + """ + from notification import notify + username = get_user() + try: + await notify(username, message) + return f"Message sent to {username}'s notification channel." + except Exception as e: + logger.warning("nc_talk_send error for %s: %s", username, e) + return f"Failed to send notification: {e}" diff --git a/cortex/tools/system.py b/cortex/tools/system.py index 48dfbf6..a0e4ea3 100644 --- a/cortex/tools/system.py +++ b/cortex/tools/system.py @@ -2,11 +2,13 @@ System tools — local machine operations. These tools affect the host system directly. Use with care. +cortex_restart and cortex_logs require admin role. """ import asyncio import logging import os +import subprocess logger = logging.getLogger(__name__) @@ -83,3 +85,42 @@ async def shell_exec(command: str, working_dir: str | None = None, timeout: int except Exception as e: logger.error("shell_exec error: %s", e) return f"Error: {e}" + + +async def cortex_restart() -> str: + """Schedule a Cortex service restart 5 seconds from now. + + Uses a detached subprocess so the restart survives the current process being + terminated by systemd. The calling session will drop — user should refresh. + """ + subprocess.Popen( + ["bash", "-c", "sleep 5 && systemctl --user restart cortex"], + start_new_session=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + close_fds=True, + ) + logger.info("cortex_restart: restart scheduled in 5 seconds") + return ( + "Cortex restart scheduled in 5 seconds. " + "The current connection will drop — please refresh the page after a moment." + ) + + +async def cortex_logs(lines: int = 50) -> str: + """Return recent lines from the Cortex systemd journal.""" + n = min(max(int(lines), 1), 200) + try: + proc = await asyncio.create_subprocess_exec( + "journalctl", "--user", "-u", "cortex", f"-n{n}", "--no-pager", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15) + out = stdout.decode(errors="replace").strip() + return out or stderr.decode(errors="replace").strip() or "No log output." + except asyncio.TimeoutError: + return "Error: journalctl timed out" + except Exception as e: + logger.error("cortex_logs error: %s", e) + return f"Error: {e}" diff --git a/cortex/tools/web.py b/cortex/tools/web.py index 5d87c2b..4ff177a 100644 --- a/cortex/tools/web.py +++ b/cortex/tools/web.py @@ -1,12 +1,12 @@ """ -Web search tool — DuckDuckGo backend. - -Uses the duckduckgo-search library. Set DDG_API_KEY in .env for a paid account -(higher rate limits). The free unauthenticated tier works for moderate usage. +Web tools — search (DuckDuckGo) and direct HTTP fetch. """ import asyncio import logging + +import httpx + from config import settings logger = logging.getLogger(__name__) @@ -48,3 +48,29 @@ def _sync_search(query: str, max_results: int) -> list[dict]: except Exception as e: logger.warning("DuckDuckGo search error: %s", e) return [] + + +async def http_fetch( + url: str, + method: str = "GET", + body: str | None = None, + timeout: int = 15, +) -> str: + """Fetch a URL directly and return the response body. + + Unlike web_search, this hits a specific URL — useful for health checks, + API probing, JSON endpoints, webhook testing, etc. + Response body is capped at 8 KB. + """ + method = method.upper() + timeout = min(max(int(timeout), 1), 60) + try: + async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client: + resp = await client.request(method, url, content=body) + body_text = resp.text[:8192] + return f"HTTP {resp.status_code} {resp.url}\n\n{body_text}" + except httpx.HTTPError as e: + return f"HTTP error: {e}" + except Exception as e: + logger.warning("http_fetch error for %s: %s", url, e) + return f"Error: {e}"