Adds an "Audit Log" section (collapsed by default) at the bottom of the Files panel showing tool_audit/YYYY-MM-DD.jsonl files for the current user. - GET /api/audit/files — lists available dates (newest first, any auth user) - GET /api/audit/day — returns entries for one date as JSON (any auth user) - tool_audit.read_day() — reads a single day's JSONL file chronologically - Clicking a date renders a read-only table: time / tool / status / args / result - Status cells are colour-coded (green ok, red error, amber denied) - Edit/Raw/Preview/Save buttons are hidden in audit view, restored on file switch - Audit group starts collapsed; expands on click like other file groups Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
123 lines
3.8 KiB
Python
123 lines
3.8 KiB
Python
"""
Tool audit log endpoints.

Self-service (any authenticated user, own data):
  GET /api/audit/files                → list of available date strings (newest first)
  GET /api/audit/day?date=YYYY-MM-DD  → entries for one day

Admin-only (cross-user aggregation):
  GET /api/audit/recent?user=scott&days=7&limit=200
  GET /api/audit/stats?user=scott&days=7
"""
|
|
import jwt
|
|
from collections import Counter
|
|
from datetime import date, timedelta
|
|
from fastapi import APIRouter, HTTPException, Query, Request
|
|
|
|
from auth_utils import COOKIE_NAME, decode_token, get_user_role
|
|
from config import settings
|
|
import tool_audit
|
|
from persona import list_users
|
|
|
|
router = APIRouter(prefix="/api/audit")
|
|
|
|
|
|
def _session_user(request: Request) -> str:
    """Resolve the logged-in username from the session cookie.

    Raises:
        HTTPException: 401 when the cookie is absent, or when decoding its
            token raises ``jwt.InvalidTokenError``.
    """
    session_token = request.cookies.get(COOKIE_NAME)
    if session_token:
        try:
            username = decode_token(session_token)
        except jwt.InvalidTokenError:
            raise HTTPException(status_code=401, detail="Invalid session")
        return username
    # No (or empty) cookie value: the caller never logged in.
    raise HTTPException(status_code=401, detail="Not authenticated")
|
|
|
|
|
|
def _require_admin(request: Request) -> str:
    """Return the session username, rejecting non-admin callers.

    Raises:
        HTTPException: 403 when the caller's role is not ``"admin"``;
            authentication errors from ``_session_user`` propagate unchanged.
    """
    caller = _session_user(request)
    is_admin = get_user_role(caller) == "admin"
    if not is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")
    return caller
|
|
|
|
|
|
@router.get("/files")
async def audit_files(request: Request) -> dict:
    """Return the current user's audit-log dates, newest first.

    Each date is the stem of a ``*.jsonl`` file inside the user's
    ``tool_audit`` directory. A missing directory yields an empty list.
    """
    username = _session_user(request)
    audit_dir = settings.home_root() / username / "tool_audit"

    dates: list = []
    if audit_dir.exists():
        dates = [entry.stem for entry in audit_dir.glob("*.jsonl") if entry.stem]
        # Descending string order puts the most recent YYYY-MM-DD name first.
        dates.sort(reverse=True)
    return {"dates": dates}
|
|
|
|
|
|
@router.get("/day")
async def audit_day(
    request: Request,
    date: str = Query(..., description="YYYY-MM-DD"),
) -> dict:
    """Return all entries for a specific day (current user only).

    Args:
        request: Incoming request; its session cookie identifies the user.
        date: Calendar day to read, as an ISO date string.

    Returns:
        ``{"date": <YYYY-MM-DD>, "entries": ...}`` where ``entries`` is the
        value returned by ``tool_audit.read_day`` for that user and day.

    Raises:
        HTTPException: 401 if the caller is not authenticated; 400 if
            ``date`` cannot be parsed as an ISO date.
    """
    # The ``date`` query parameter shadows the module-level ``datetime.date``
    # import, so the class is imported locally under a different name.
    from datetime import date as _date

    username = _session_user(request)
    try:
        # Re-serialise the parsed value: on Python 3.11+ ``fromisoformat``
        # also accepts non-hyphenated forms such as ``20240115``, which would
        # validate but not match the YYYY-MM-DD form used for log file names.
        day = _date.fromisoformat(date).isoformat()
    except ValueError:
        raise HTTPException(
            status_code=400, detail="date must be YYYY-MM-DD"
        ) from None

    entries = tool_audit.read_day(username, day)
    return {"date": day, "entries": entries}
|
|
|
|
|
|
@router.get("/recent")
async def audit_recent(
    request: Request,
    user: str = Query(None, description="Username to filter (omit for all users)"),
    days: int = Query(7, ge=1, le=90),
    limit: int = Query(200, ge=1, le=1000),
) -> dict:
    """Admin-only: return recent audit entries for one user or for all users.

    Raises:
        HTTPException: 401/403 from ``_require_admin``; 404 when ``user`` is
            given but is not in ``list_users()``.
    """
    _require_admin(request)

    if not user:
        # No filter supplied: aggregate across every user.
        entries = tool_audit.read_recent_all_users(days=days, limit=limit)
    elif user in list_users():
        entries = tool_audit.read_recent(user, days=days, limit=limit)
    else:
        raise HTTPException(status_code=404, detail=f"User not found: {user}")

    return {"entries": entries, "count": len(entries), "days": days}
|
|
|
|
|
|
@router.get("/stats")
async def audit_stats(
    request: Request,
    user: str = Query(None),
    days: int = Query(7, ge=1, le=90),
) -> dict:
    """Admin-only: summarise recent audit entries by tool, status and user.

    At most 10000 entries are requested from ``tool_audit``, so ``total`` and
    the counts describe that fetched set.

    Raises:
        HTTPException: 401/403 from ``_require_admin``; 404 when ``user`` is
            given but is not in ``list_users()``.
    """
    _require_admin(request)

    if not user:
        # No filter supplied: aggregate across every user.
        entries = tool_audit.read_recent_all_users(days=days, limit=10000)
    elif user in list_users():
        entries = tool_audit.read_recent(user, days=days, limit=10000)
    else:
        raise HTTPException(status_code=404, detail=f"User not found: {user}")

    # Entries missing a field are counted under the "?" bucket.
    by_tool: Counter = Counter(entry.get("tool", "?") for entry in entries)
    by_status: Counter = Counter(entry.get("status", "?") for entry in entries)
    by_user: Counter = Counter(entry.get("user", "?") for entry in entries)

    return {
        "total": len(entries),
        "days": days,
        "by_tool": dict(by_tool.most_common()),
        "by_status": dict(by_status),
        "by_user": dict(by_user.most_common()),
    }
|