"""
Tool call audit log.

One JSONL file per user per day:
  home/{user}/tool_audit/YYYY-MM-DD.jsonl

Each line is a JSON object:
  ts             ISO timestamp (seconds)
  user           username
  engine         engine tag set via set_context() ("" if unset)
  model          model tag set via set_context() ("" if unset)
  tool           tool name
  args           call arguments (string values truncated at ARG_MAX chars)
  status         "ok" | "error" | "denied"
  result_chars   length of full result string
  result_snippet first SNIPPET_MAX chars of result
"""

import asyncio
import json
import logging
from contextvars import ContextVar
from datetime import date, datetime, timedelta
from pathlib import Path

from config import settings

logger = logging.getLogger(__name__)

_ARG_MAX = 500      # truncate individual arg string values longer than this
_SNIPPET_MAX = 300  # chars of result to keep as snippet

# Per-file write locks — prevents interleaved lines under concurrent tool calls
_locks: dict[str, asyncio.Lock] = {}

# ContextVars set by orchestrators before their tool loop runs
_audit_engine: ContextVar[str] = ContextVar("_audit_engine", default="")
_audit_model: ContextVar[str] = ContextVar("_audit_model", default="")


def set_context(engine: str, model: str) -> None:
    """Call at the start of each orchestrator run to tag subsequent tool calls."""
    _audit_engine.set(engine)
    _audit_model.set(model)


def _truncate_args(args: dict) -> dict:
    """Return a copy of *args* with over-long string values shortened.

    String values longer than ``_ARG_MAX`` are cut to that length and get a
    suffix stating the original length. All other values are passed through
    unchanged.
    """
    out = {}
    for k, v in args.items():
        if isinstance(v, str) and len(v) > _ARG_MAX:
            out[k] = v[:_ARG_MAX] + f" …[{len(v)} chars total]"
        else:
            out[k] = v
    return out


def _day_file(user: str, day: date) -> Path:
    """Return the audit file path for *user* on *day* without touching disk."""
    return settings.home_root() / user / "tool_audit" / f"{day.isoformat()}.jsonl"


def _audit_path(user: str, day: date | None = None) -> Path:
    """Return the audit file path for writing, creating its directory.

    *day* defaults to today's local date.
    """
    path = _day_file(user, day or date.today())
    path.parent.mkdir(parents=True, exist_ok=True)
    return path


def _read_entries(path: Path) -> list[dict]:
    """Parse one JSONL audit file into a list of dicts, in file order.

    A missing file yields an empty list. Any other read failure is logged and
    also yields an empty list. Blank lines, lines that are not valid JSON,
    and JSON values that are not objects are skipped.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return []
    except Exception as e:
        # Reading is best-effort: one unreadable file must not break callers.
        logger.warning("audit log read failed for %s: %s", path, e)
        return []

    entries: list[dict] = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
        except (ValueError, RecursionError):
            # json.JSONDecodeError is a ValueError; skip the corrupt line.
            continue
        # Callers rely on dict methods (e.g. .get), so drop scalars/lists.
        if isinstance(obj, dict):
            entries.append(obj)
    return entries


async def record(
    user: str,
    tool: str,
    args: dict,
    status: str,  # "ok" | "error" | "denied"
    result: str = "",
) -> None:
    """Append one audit entry. Fire with asyncio.create_task — never awaited directly.

    Auditing is best-effort: any failure (directory creation, serialization,
    write) is logged as a warning and never raised, so a fire-and-forget task
    does not end with an unretrieved exception.
    """
    try:
        # Tolerate a missing or non-string result instead of raising.
        if result is None:
            text = ""
        elif isinstance(result, str):
            text = result
        else:
            text = str(result)

        entry = {
            "ts": datetime.now().isoformat(timespec="seconds"),
            "user": user,
            "engine": _audit_engine.get(),
            "model": _audit_model.get(),
            "tool": tool,
            "args": _truncate_args(args or {}),
            "status": status,
            "result_chars": len(text),
            "result_snippet": text[:_SNIPPET_MAX],
        }
        # default=str: a non-JSON-serializable arg value is stringified
        # rather than causing the whole entry to be dropped.
        line = json.dumps(entry, default=str) + "\n"

        # _audit_path creates the directory and may raise; keep it inside
        # the try so the failure is logged like any other write failure.
        path = _audit_path(user)
        key = str(path)
        lock = _locks.get(key)
        if lock is None:
            lock = _locks[key] = asyncio.Lock()

        async with lock:
            with path.open("a", encoding="utf-8") as f:
                f.write(line)
    except Exception as e:
        logger.warning("audit log write failed for %s: %s", user, e)


def read_recent(user: str, days: int = 7, limit: int = 200) -> list[dict]:
    """Read the most recent `limit` entries across the last `days` days.

    Days are visited from today backwards, and each day's entries are taken
    in reverse file order, so the result is newest-first as long as lines
    were appended chronologically. Returns an empty list when `limit` is not
    positive.
    """
    if limit <= 0:
        return []

    today = date.today()
    entries: list[dict] = []
    for offset in range(days):
        day = today - timedelta(days=offset)
        day_entries = _read_entries(_day_file(user, day))
        # Newest within the day first
        entries.extend(reversed(day_entries))
        if len(entries) >= limit:
            break
    return entries[:limit]


def read_day(user: str, day_str: str) -> list[dict]:
    """Read all entries for a specific date string (YYYY-MM-DD), chronological order.

    `day_str` must be a canonical ISO date. Anything else returns an empty
    list, so a crafted value such as one containing path separators can never
    be used to build a file path.
    """
    try:
        day = date.fromisoformat(day_str)
    except (TypeError, ValueError):
        return []
    # Newer Pythons accept other ISO spellings (e.g. "20240101"); only the
    # canonical form corresponds to a file name this module writes.
    if day.isoformat() != day_str:
        return []
    return _read_entries(_day_file(user, day))


def read_recent_all_users(days: int = 7, limit: int = 500) -> list[dict]:
    """Read recent entries across all users, sorted newest-first."""
    # Imported lazily, as in the original (presumably to avoid an import
    # cycle with persona — verify before hoisting).
    from persona import list_users

    all_entries: list[dict] = []
    for user in list_users():
        all_entries.extend(read_recent(user, days=days, limit=limit))
    # Coerce ts to str so a missing or non-string value cannot break the sort.
    all_entries.sort(key=lambda e: str(e.get("ts") or ""), reverse=True)
    return all_entries[: max(limit, 0)]