"""
Tool audit log endpoints.

Self-service (any authenticated user, own data):
  GET /api/audit/files                → list of available date strings (newest first)
  GET /api/audit/day?date=YYYY-MM-DD  → entries for one day

Admin-only (cross-user aggregation):
  GET /api/audit/recent?user=scott&days=7&limit=200
  GET /api/audit/stats?user=scott&days=7
"""
import jwt
from collections import Counter
from datetime import date, timedelta
from typing import Optional

from fastapi import APIRouter, HTTPException, Query, Request

from auth_utils import COOKIE_NAME, decode_token, get_user_role
from config import settings
import tool_audit
from persona import list_users

router = APIRouter(prefix="/api/audit")

# `limit` handed to the tool_audit readers when aggregating for /stats.
# NOTE(review): presumably this caps how many entries are returned, in which
# case the reported totals are truncated at this value — confirm in tool_audit.
_STATS_ENTRY_LIMIT = 10000


def _session_user(request: Request) -> str:
    """Return the authenticated username or raise 401.

    Reads the session token from the ``COOKIE_NAME`` cookie and passes it to
    ``decode_token``.

    Raises:
        HTTPException: 401 when the cookie is missing/empty, or when
            ``decode_token`` raises ``jwt.InvalidTokenError``.
    """
    token = request.cookies.get(COOKIE_NAME)
    if not token:
        raise HTTPException(status_code=401, detail="Not authenticated")
    try:
        return decode_token(token)
    except jwt.InvalidTokenError as exc:
        # Chain the cause so the underlying JWT failure stays visible in logs.
        raise HTTPException(status_code=401, detail="Invalid session") from exc


def _require_admin(request: Request) -> str:
    """Return the authenticated username, or raise 403 unless its role is "admin".

    Raises:
        HTTPException: 401 from ``_session_user`` when there is no valid
            session; 403 when ``get_user_role`` is not ``"admin"``.
    """
    username = _session_user(request)
    if get_user_role(username) != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")
    return username


def _parse_day(value: str) -> date:
    """Parse an ISO date string, raising a 400 ``HTTPException`` if invalid.

    Lives at module level because the ``/day`` endpoint's query parameter is
    itself named ``date`` and would shadow ``datetime.date`` inside the handler.
    """
    try:
        return date.fromisoformat(value)
    except ValueError as exc:
        raise HTTPException(
            status_code=400, detail="date must be YYYY-MM-DD"
        ) from exc


def _load_entries(user: Optional[str], days: int, limit: int):
    """Fetch recent audit entries for one user, or for all users.

    Args:
        user: Username to restrict to; any falsy value (``None`` or ``""``)
            selects every user.
        days: Forwarded as ``days`` to the ``tool_audit`` reader.
        limit: Forwarded as ``limit`` to the ``tool_audit`` reader.

    Raises:
        HTTPException: 404 when ``user`` is given but is not in ``list_users()``.
    """
    if not user:
        return tool_audit.read_recent_all_users(days=days, limit=limit)
    if user not in list_users():
        raise HTTPException(status_code=404, detail=f"User not found: {user}")
    return tool_audit.read_recent(user, days=days, limit=limit)


@router.get("/files")
async def audit_files(request: Request) -> dict:
    """List available audit log dates for the current user (newest first).

    The dates are the stems of the ``*.jsonl`` files found in the user's
    ``tool_audit`` directory under ``settings.home_root()``, sorted in
    descending string order. Returns ``{"dates": []}`` when that directory
    does not exist.
    """
    username = _session_user(request)
    audit_dir = settings.home_root() / username / "tool_audit"
    if not audit_dir.is_dir():
        return {"dates": []}
    dates = sorted(
        (p.stem for p in audit_dir.glob("*.jsonl") if p.stem),
        reverse=True,
    )
    return {"dates": dates}


@router.get("/day")
async def audit_day(
    request: Request,
    date: str = Query(..., description="YYYY-MM-DD"),
) -> dict:
    """Return all entries for a specific day (current user only).

    Raises:
        HTTPException: 401 when not authenticated; 400 when ``date`` is not a
            valid ISO date.
    """
    # Authenticate first so an unauthenticated caller gets 401 rather than 400.
    username = _session_user(request)
    # Hand the canonical YYYY-MM-DD form to the reader: on Python 3.11+
    # date.fromisoformat also accepts variants such as "20240101", which
    # would otherwise be forwarded verbatim.
    day = _parse_day(date).isoformat()
    entries = tool_audit.read_day(username, day)
    return {"date": day, "entries": entries}


@router.get("/recent")
async def audit_recent(
    request: Request,
    user: str = Query(None, description="Username to filter (omit for all users)"),
    days: int = Query(7, ge=1, le=90),
    limit: int = Query(200, ge=1, le=1000),
) -> dict:
    """Return recent audit entries for one user or all users (admin only).

    Raises:
        HTTPException: 401/403 from the admin check; 404 when ``user`` is
            given but unknown.
    """
    _require_admin(request)
    entries = _load_entries(user, days, limit)
    return {"entries": entries, "count": len(entries), "days": days}


@router.get("/stats")
async def audit_stats(
    request: Request,
    user: str = Query(None),
    days: int = Query(7, ge=1, le=90),
) -> dict:
    """Return per-tool, per-status and per-user entry counts (admin only).

    Entries are fetched with ``limit=_STATS_ENTRY_LIMIT``; a missing
    ``tool``/``status``/``user`` key on an entry is counted under ``"?"``.

    Raises:
        HTTPException: 401/403 from the admin check; 404 when ``user`` is
            given but unknown.
    """
    _require_admin(request)
    entries = _load_entries(user, days, _STATS_ENTRY_LIMIT)

    tool_counts = Counter(e.get("tool", "?") for e in entries)
    status_counts = Counter(e.get("status", "?") for e in entries)
    user_counts = Counter(e.get("user", "?") for e in entries)

    return {
        "total": len(entries),
        "days": days,
        "by_tool": dict(tool_counts.most_common()),
        "by_status": dict(status_counts),
        "by_user": dict(user_counts.most_common()),
    }