Files
Cortex-Inara/cortex/routers/audit.py
Scott Idem 02accefe8f feat: audit log in Files panel sidebar
Adds an "Audit Log" section (collapsed by default) at the bottom of the Files
panel showing tool_audit/YYYY-MM-DD.jsonl files for the current user.

- GET /api/audit/files  — lists available dates (newest first, any auth user)
- GET /api/audit/day    — returns entries for one date as JSON (any auth user)
- tool_audit.read_day() — reads a single day's JSONL file chronologically
- Clicking a date renders a read-only table: time / tool / status / args / result
- Status cells are colour-coded (green ok, red error, amber denied)
- Edit/Raw/Preview/Save buttons are hidden in audit view, restored on file switch
- Audit group starts collapsed; expands on click like other file groups

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-05 20:36:08 -04:00

123 lines
3.8 KiB
Python

"""
Tool audit log endpoints.
Self-service (any authenticated user, own data):
GET /api/audit/files → list of available date strings (newest first)
GET /api/audit/day?date=YYYY-MM-DD → entries for one day
Admin-only (cross-user aggregation):
GET /api/audit/recent?user=scott&days=7&limit=200
GET /api/audit/stats?user=scott&days=7
"""
import jwt
from collections import Counter
from datetime import date, timedelta
from fastapi import APIRouter, HTTPException, Query, Request
from auth_utils import COOKIE_NAME, decode_token, get_user_role
from config import settings
import tool_audit
from persona import list_users
# Router for the tool-audit endpoints; every route declared on it is served
# under the /api/audit prefix.
router = APIRouter(prefix="/api/audit")
def _session_user(request: Request) -> str:
    """Return the authenticated username or raise 401.

    The session token is read from the cookie named by ``COOKIE_NAME`` and
    decoded with ``decode_token``; whatever ``decode_token`` returns is handed
    back to the caller as the username.

    Raises:
        HTTPException: 401 "Not authenticated" when the cookie is missing or
            empty; 401 "Invalid session" when decoding raises
            ``jwt.InvalidTokenError``.
    """
    token = request.cookies.get(COOKIE_NAME)
    if not token:
        raise HTTPException(status_code=401, detail="Not authenticated")
    try:
        username = decode_token(token)
    except jwt.InvalidTokenError as exc:
        # Chain explicitly so the original JWT failure stays attached as the
        # cause instead of showing up as an error raised "during handling".
        raise HTTPException(status_code=401, detail="Invalid session") from exc
    return username
def _require_admin(request: Request) -> str:
    """Return the caller's username, rejecting anyone who is not an admin.

    Authentication is delegated to ``_session_user`` (which raises 401 on its
    own); the role reported by ``get_user_role`` must then be exactly
    ``"admin"``.

    Raises:
        HTTPException: 403 "Admin access required" for any other role.
    """
    caller = _session_user(request)
    is_admin = get_user_role(caller) == "admin"
    if is_admin:
        return caller
    raise HTTPException(status_code=403, detail="Admin access required")
@router.get("/files")
async def audit_files(request: Request) -> dict:
    """List the current user's audit log dates, newest first.

    Looks in ``<home_root>/<username>/tool_audit`` and reports the stem of
    every ``*.jsonl`` file found there, sorted in descending string order.
    A missing directory yields an empty list rather than an error.

    Returns:
        ``{"dates": [...]}`` with one string per log file.
    """
    owner = _session_user(request)
    log_dir = settings.home_root() / owner / "tool_audit"
    stems = []
    if log_dir.exists():
        for log_file in log_dir.glob("*.jsonl"):
            stem = log_file.stem
            if stem:
                stems.append(stem)
        # Descending string order; for YYYY-MM-DD names this is newest first.
        stems.sort(reverse=True)
    return {"dates": stems}
@router.get("/day")
async def audit_day(
    request: Request,
    date: str = Query(..., description="YYYY-MM-DD"),
) -> dict:
    """Return all entries for a specific day (current user only).

    The caller is authenticated first, then ``date`` is validated, and only
    then is it passed to ``tool_audit.read_day`` for the session user.

    Args:
        request: Incoming request carrying the session cookie.
        date: Day to read, strictly in ``YYYY-MM-DD`` form.

    Returns:
        ``{"date": <date>, "entries": <result of tool_audit.read_day>}``.

    Raises:
        HTTPException: 401 from ``_session_user`` when unauthenticated;
            400 "date must be YYYY-MM-DD" when ``date`` is not a valid
            calendar date written in exactly that form.
    """
    # The ``date`` query parameter shadows the module-level ``datetime.date``
    # import, so the class is re-imported locally under a private alias.
    from datetime import date as _date

    username = _session_user(request)
    try:
        # ``fromisoformat`` alone is too permissive: on Python 3.11+ it also
        # accepts spellings such as "20260505" or "2026-W19-2". Requiring the
        # input to round-trip through ``isoformat()`` pins it to YYYY-MM-DD,
        # so only the documented form ever reaches ``read_day``.
        is_canonical = _date.fromisoformat(date).isoformat() == date
    except ValueError:
        is_canonical = False
    if not is_canonical:
        raise HTTPException(status_code=400, detail="date must be YYYY-MM-DD")
    entries = tool_audit.read_day(username, date)
    return {"date": date, "entries": entries}
@router.get("/recent")
async def audit_recent(
    request: Request,
    user: str = Query(None, description="Username to filter (omit for all users)"),
    days: int = Query(7, ge=1, le=90),
    limit: int = Query(200, ge=1, le=1000),
) -> dict:
    """Return recent audit entries for one user or for all users (admin only).

    Args:
        request: Incoming request; the caller must pass ``_require_admin``.
        user: Username to restrict to. When omitted or empty, entries are
            read across all users via ``tool_audit.read_recent_all_users``.
        days: Look-back window forwarded to the reader (1-90, default 7).
        limit: Entry cap forwarded to the reader (1-1000, default 200).

    Returns:
        ``{"entries": [...], "count": len(entries), "days": days}``.

    Raises:
        HTTPException: 401/403 from ``_require_admin``; 404 when ``user`` is
            given but is not in ``list_users()``.
    """
    _require_admin(request)
    if not user:
        rows = tool_audit.read_recent_all_users(days=days, limit=limit)
    elif user in list_users():
        rows = tool_audit.read_recent(user, days=days, limit=limit)
    else:
        raise HTTPException(status_code=404, detail=f"User not found: {user}")
    return {"entries": rows, "count": len(rows), "days": days}
@router.get("/stats")
async def audit_stats(
    request: Request,
    user: str = Query(None),
    days: int = Query(7, ge=1, le=90),
) -> dict:
    """Summarise recent audit entries as per-tool, per-status and per-user counts (admin only).

    Entries are read for a single user when ``user`` is given, otherwise
    across all users. Both readers are called with ``limit=10000``, so the
    totals describe at most that many entries.

    Returns:
        A dict with ``total`` (number of entries read), ``days``, and three
        count mappings: ``by_tool`` and ``by_user`` ordered by descending
        count, ``by_status`` in first-seen order. An entry lacking the
        relevant key is counted under ``"?"``.

    Raises:
        HTTPException: 401/403 from ``_require_admin``; 404 when ``user`` is
            given but is not in ``list_users()``.
    """
    _require_admin(request)
    if not user:
        rows = tool_audit.read_recent_all_users(days=days, limit=10000)
    elif user in list_users():
        rows = tool_audit.read_recent(user, days=days, limit=10000)
    else:
        raise HTTPException(status_code=404, detail=f"User not found: {user}")

    # One Counter per dimension; each entry contributes exactly once to each.
    per_tool = Counter(row.get("tool", "?") for row in rows)
    per_status = Counter(row.get("status", "?") for row in rows)
    per_user = Counter(row.get("user", "?") for row in rows)

    summary = {
        "total": len(rows),
        "days": days,
        "by_tool": dict(per_tool.most_common()),
        "by_status": dict(per_status),
        "by_user": dict(per_user.most_common()),
    }
    return summary