From 584ae679a6d219b21970d87c32de4787544915c9 Mon Sep 17 00:00:00 2001 From: Scott Idem Date: Tue, 5 May 2026 19:55:59 -0400 Subject: [PATCH] feat: tool call audit log MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Every orchestrator tool invocation is recorded to home/{user}/tool_audit/YYYY-MM-DD.jsonl. Each entry captures: timestamp, user, tool, args (truncated), status (ok/error/denied), result length, and a 300-char result snippet. - tool_audit.py: JSONL writer with per-file asyncio locks; read_recent / read_recent_all_users helpers - tools/__init__.py: hook in call_tool() — fire-and-forget record on every dispatch - routers/audit.py: GET /api/audit/recent and /api/audit/stats (admin-only) - tools/files.py: add home_root() to file_read allowed roots so agents can read audit JSONL Co-Authored-By: Claude Sonnet 4.6 --- cortex/main.py | 3 +- cortex/routers/audit.py | 83 ++++++++++++++++++++++++++ cortex/tool_audit.py | 123 +++++++++++++++++++++++++++++++++++++++ cortex/tools/__init__.py | 18 +++++- cortex/tools/files.py | 27 ++++++--- 5 files changed, 244 insertions(+), 10 deletions(-) create mode 100644 cortex/routers/audit.py create mode 100644 cortex/tool_audit.py diff --git a/cortex/main.py b/cortex/main.py index c010bda..69ae419 100644 --- a/cortex/main.py +++ b/cortex/main.py @@ -9,7 +9,7 @@ logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(messag from config import settings from auth_middleware import SessionAuthMiddleware from routers import chat, google_chat, nextcloud_talk, files, distill, auth, orchestrator -from routers import ui, onboarding, settings, help, auth_google, local_llm, push +from routers import ui, onboarding, settings, help, auth_google, local_llm, push, audit @asynccontextmanager @@ -35,6 +35,7 @@ app.include_router(distill.router) app.include_router(auth.router) app.include_router(orchestrator.router) app.include_router(push.router) +app.include_router(audit.router) # Static files — must be mounted BEFORE ui.router so /static/* is matched first. # ui.router has a wildcard /{username}/{persona} that would otherwise catch /static/style.css etc. diff --git a/cortex/routers/audit.py b/cortex/routers/audit.py new file mode 100644 index 0000000..55bfb4f --- /dev/null +++ b/cortex/routers/audit.py @@ -0,0 +1,83 @@ +""" +Tool audit log endpoints — admin only. + + GET /api/audit/recent?user=scott&days=7&limit=200 + Returns recent tool call entries for one user (or all users if user omitted). + + GET /api/audit/stats?user=scott&days=7 + Returns aggregate counts by tool and status. +""" +import jwt +from collections import Counter +from fastapi import APIRouter, HTTPException, Query, Request + +from auth_utils import COOKIE_NAME, decode_token, get_user_role +import tool_audit +from persona import list_users + +router = APIRouter(prefix="/api/audit") + + +def _require_admin(request: Request) -> str: + token = request.cookies.get(COOKIE_NAME) + if not token: + raise HTTPException(status_code=401, detail="Not authenticated") + try: + username = decode_token(token) + except jwt.InvalidTokenError: + raise HTTPException(status_code=401, detail="Invalid session") + if get_user_role(username) != "admin": + raise HTTPException(status_code=403, detail="Admin access required") + return username + + +@router.get("/recent") +async def audit_recent( + request: Request, + user: str = Query(None, description="Username to filter (omit for all users)"), + days: int = Query(7, ge=1, le=90), + limit: int = Query(200, ge=1, le=1000), +) -> dict: + _require_admin(request) + + if user: + if user not in list_users(): + raise HTTPException(status_code=404, detail=f"User not found: {user}") + entries = tool_audit.read_recent(user, days=days, limit=limit) + else: + entries = tool_audit.read_recent_all_users(days=days, limit=limit) + + return {"entries": entries, "count": len(entries), "days": days} + + +@router.get("/stats") +async def audit_stats( + request: Request, + user: str = Query(None), + days: int = Query(7, ge=1, le=90), +) -> dict: + _require_admin(request) + + if user: + if user not in list_users(): + raise HTTPException(status_code=404, detail=f"User not found: {user}") + entries = tool_audit.read_recent(user, days=days, limit=10000) + else: + entries = tool_audit.read_recent_all_users(days=days, limit=10000) + + tool_counts: Counter = Counter() + status_counts: Counter = Counter() + user_counts: Counter = Counter() + + for e in entries: + tool_counts[e.get("tool", "?")] += 1 + status_counts[e.get("status", "?")] += 1 + user_counts[e.get("user", "?")] += 1 + + return { + "total": len(entries), + "days": days, + "by_tool": dict(tool_counts.most_common()), + "by_status": dict(status_counts), + "by_user": dict(user_counts.most_common()), + } diff --git a/cortex/tool_audit.py b/cortex/tool_audit.py new file mode 100644 index 0000000..a824e90 --- /dev/null +++ b/cortex/tool_audit.py @@ -0,0 +1,123 @@ +""" +Tool call audit log. + +One JSONL file per user per day: + home/{user}/tool_audit/YYYY-MM-DD.jsonl + +Each line is a JSON object: + ts ISO timestamp (seconds) + user username + tool tool name + args call arguments (string values truncated at ARG_MAX chars) + status "ok" | "error" | "denied" + result_chars length of full result string + result_snippet first SNIPPET_MAX chars of result +""" +import asyncio +import json +import logging +from datetime import datetime, date +from pathlib import Path + +from config import settings + +logger = logging.getLogger(__name__) + +_ARG_MAX = 500 # truncate individual arg string values longer than this +_SNIPPET_MAX = 300 # chars of result to keep as snippet + +# Per-file write locks — prevents interleaved lines under concurrent tool calls +_locks: dict[str, asyncio.Lock] = {} + + +def _truncate_args(args: dict) -> dict: + out = {} + for k, v in args.items(): + if isinstance(v, str) and len(v) > _ARG_MAX: + out[k] = v[:_ARG_MAX] + f" …[{len(v)} chars total]" + else: + out[k] = v + return out + + +def _audit_path(user: str, day: date | None = None) -> Path: + d = day or date.today() + audit_dir = settings.home_root() / user / "tool_audit" + audit_dir.mkdir(parents=True, exist_ok=True) + return audit_dir / f"{d.isoformat()}.jsonl" + + +async def record( + user: str, + tool: str, + args: dict, + status: str, # "ok" | "error" | "denied" + result: str = "", +) -> None: + """Append one audit entry. Fire with asyncio.create_task — never awaited directly.""" + path = _audit_path(user) + key = str(path) + if key not in _locks: + _locks[key] = asyncio.Lock() + + entry = { + "ts": datetime.now().isoformat(timespec="seconds"), + "user": user, + "tool": tool, + "args": _truncate_args(args), + "status": status, + "result_chars": len(result), + "result_snippet": result[:_SNIPPET_MAX], + } + + async with _locks[key]: + try: + with path.open("a", encoding="utf-8") as f: + f.write(json.dumps(entry) + "\n") + except Exception as e: + logger.warning("audit log write failed for %s: %s", user, e) + + +def read_recent(user: str, days: int = 7, limit: int = 200) -> list[dict]: + """Read the most recent `limit` entries across the last `days` days. + + Returns entries sorted newest-first (by ts field, file order within a day). + """ + from datetime import timedelta + today = date.today() + entries: list[dict] = [] + + for offset in range(days): + day = today - timedelta(days=offset) + path = settings.home_root() / user / "tool_audit" / f"{day.isoformat()}.jsonl" + if not path.exists(): + continue + try: + lines = path.read_text(encoding="utf-8").splitlines() + except Exception: + continue + day_entries = [] + for line in lines: + line = line.strip() + if not line: + continue + try: + day_entries.append(json.loads(line)) + except json.JSONDecodeError: + pass + # Newest within the day first + entries.extend(reversed(day_entries)) + if len(entries) >= limit: + break + + return entries[:limit] + + +def read_recent_all_users(days: int = 7, limit: int = 500) -> list[dict]: + """Read recent entries across all users, sorted newest-first.""" + from persona import list_users + all_entries: list[dict] = [] + for user in list_users(): + all_entries.extend(read_recent(user, days=days, limit=limit)) + all_entries.sort(key=lambda e: e.get("ts", ""), reverse=True) + return all_entries[:limit] diff --git a/cortex/tools/__init__.py b/cortex/tools/__init__.py index 19e45f4..4e9cd7e 100644 --- a/cortex/tools/__init__.py +++ b/cortex/tools/__init__.py @@ -207,12 +207,28 @@ async def call_tool(name: str, args: dict, callables: dict | None = None) -> str Pass `callables` (from get_tools_for_role) to enforce role restrictions. Falls back to the full _CALLABLES dict if omitted. + + Every call is recorded to the tool audit log (tool_audit.py). """ + import asyncio + import tool_audit + from persona import get_user + + user = get_user() or "unknown" dispatch = callables if callables is not None else _CALLABLES fn = dispatch.get(name) + if fn is None: + asyncio.create_task(tool_audit.record(user, name, args, "denied")) return f"Tool not available or access denied: {name}" - return await fn(**args) + + try: + result = await fn(**args) + asyncio.create_task(tool_audit.record(user, name, args, "ok", result)) + return result + except Exception as e: + asyncio.create_task(tool_audit.record(user, name, args, "error", str(e))) + raise # ── OpenAI JSON Schema conversion ──────────────────────────────────────────── diff --git a/cortex/tools/files.py b/cortex/tools/files.py index 156c17e..332ed31 100644 --- a/cortex/tools/files.py +++ b/cortex/tools/files.py @@ -16,12 +16,21 @@ logger = logging.getLogger(__name__) # Directories the orchestrator is allowed to read from. # Paths are resolved (symlinks followed, ~ expanded) at import time. -_ALLOWED_ROOTS: list[Path] = [ - Path.home() / "agents_sync", - Path.home() / "OSIT_dev", - Path.home() / "DgrZone_Nextcloud", - Path.home() / "OSIT_Nextcloud", -] +def _build_allowed_roots() -> list[Path]: + roots = [ + Path.home() / "agents_sync", + Path.home() / "OSIT_dev", + Path.home() / "DgrZone_Nextcloud", + Path.home() / "OSIT_Nextcloud", + ] + try: + from config import settings + roots.append(settings.home_root()) + except Exception: + pass + return roots + +_ALLOWED_ROOTS: list[Path] = _build_allowed_roots() # Hard cap on file size to prevent accidental context blowout _MAX_BYTES = 50_000 # ~50 KB @@ -221,8 +230,10 @@ DECLARATIONS = [ name="file_read", description=( "Read a local file and return its contents. " - "Allowed directories: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/. " - "Use this to read documentation, notes, CLAUDE.md files, or config references. " + "Allowed directories: ~/agents_sync/, ~/OSIT_dev/, ~/DgrZone_Nextcloud/, ~/OSIT_Nextcloud/, " + "and the Cortex home/ directory (persona memory, tool audit logs, etc.). " + "Use this to read documentation, notes, CLAUDE.md files, config references, " + "or tool audit logs at home/{user}/tool_audit/YYYY-MM-DD.jsonl. " "If given a directory path, returns a directory listing instead." ), parameters=types.Schema(