diff --git a/cortex/routers/usage.py b/cortex/routers/usage.py new file mode 100644 index 0000000..966f580 --- /dev/null +++ b/cortex/routers/usage.py @@ -0,0 +1,104 @@ +""" +Usage / token-tracking endpoints. + +Self-service (any authenticated user, own data): + GET /api/usage → full usage dict {date: {model_key: {calls, prompt_tokens, completion_tokens}}} + GET /api/usage/summary → aggregate totals per model key, with friendly labels resolved from registry + +Admin-only (cross-user aggregation): + GET /api/usage/all → summary for every user {username: summary_dict} +""" +import jwt +from fastapi import APIRouter, HTTPException, Request + +from auth_utils import COOKIE_NAME, decode_token, get_user_role +from persona import list_users +import model_registry +import usage_tracker + +router = APIRouter(prefix="/api/usage") + + +def _session_user(request: Request) -> str: + token = request.cookies.get(COOKIE_NAME) + if not token: + raise HTTPException(status_code=401, detail="Not authenticated") + try: + return decode_token(token) + except jwt.InvalidTokenError: + raise HTTPException(status_code=401, detail="Invalid session") + + +def _build_label_map(username: str) -> dict[str, str]: + """Build a map from usage key (backend/model_name) → registered label.""" + label_map: dict[str, str] = {} + try: + for m in model_registry.get_all_models(username): + model_name = m.get("model_name", "") + label = m.get("label", "") + host_type = m.get("host_type", "") + if not model_name or not label: + continue + # local models: key is "local/{model_name}" + if host_type in ("openwebui", "ollama", "openai_compatible"): + label_map[f"local/{model_name}"] = label + # cloud Gemini: key is "gemini_api/{model_name}" + elif host_type == "google": + label_map[f"gemini_api/{model_name}"] = label + except Exception: + pass + return label_map + + +def _summarize(data: dict, label_map: dict[str, str] | None = None) -> list[dict]: + """Collapse date-keyed usage dict into per-model totals, sorted by total tokens desc.""" + totals: dict[str, dict] = {} + for _date, models in data.items(): + for key, counts in models.items(): + t = totals.setdefault(key, {"calls": 0, "prompt_tokens": 0, "completion_tokens": 0}) + t["calls"] += counts.get("calls", 0) + t["prompt_tokens"] += counts.get("prompt_tokens", 0) + t["completion_tokens"] += counts.get("completion_tokens", 0) + + result = [] + for key, counts in totals.items(): + entry = { + "key": key, + "label": (label_map or {}).get(key) or key, + "calls": counts["calls"], + "prompt_tokens": counts["prompt_tokens"], + "completion_tokens": counts["completion_tokens"], + "total_tokens": counts["prompt_tokens"] + counts["completion_tokens"], + } + result.append(entry) + + result.sort(key=lambda x: x["total_tokens"], reverse=True) + return result + + +@router.get("") +async def get_usage(request: Request) -> dict: + """Return the raw daily usage log for the authenticated user.""" + username = _session_user(request) + return usage_tracker.read_usage(username) + + +@router.get("/summary") +async def get_usage_summary(request: Request) -> list: + """Return per-model totals (all time) for the authenticated user, with friendly labels.""" + username = _session_user(request) + label_map = _build_label_map(username) + return _summarize(usage_tracker.read_usage(username), label_map) + + +@router.get("/all") +async def get_all_usage(request: Request) -> dict: + """Admin: return per-model summary for every user.""" + username = _session_user(request) + if get_user_role(username) != "admin": + raise HTTPException(status_code=403, detail="Admin access required") + result = {} + for user in list_users(): + label_map = _build_label_map(user) + result[user] = _summarize(usage_tracker.read_usage(user), label_map) + return result diff --git a/cortex/tools/agent_notes.py b/cortex/tools/agent_notes.py new file mode 100644 index 0000000..9bde55c --- /dev/null +++ b/cortex/tools/agent_notes.py @@ -0,0 +1,155 @@ +""" +Agent private notes — AGENT_NOTES.md. + +A persistent notepad only the orchestrator can write to. The file itself is +never exposed in the Files panel or loaded into user-facing context tiers. +Up to 3 rolling backups are kept automatically before each write so past +versions can be reviewed. + +Use for: observations about the user's patterns, working hypotheses, +long-running goals, things to remember across sessions that shouldn't +be part of the distilled memory visible to the user. +""" + +import asyncio +from datetime import datetime, timezone +from pathlib import Path + +from google.genai import types +from persona import persona_path + + +_FILENAME = "AGENT_NOTES.md" +_N_BACKUPS = 3 + + +def _notes_path() -> Path: + return persona_path() / _FILENAME + + +def _now_label() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + + +def _rotate(path: Path) -> None: + """Rotate up to _N_BACKUPS rolling backups before a write.""" + if not path.exists(): + return + for i in range(_N_BACKUPS, 1, -1): + older = path.parent / f"{path.stem}.bak{i}.md" + newer = path.parent / f"{path.stem}.bak{i - 1}.md" + if newer.exists(): + older.write_text(newer.read_text()) + bak1 = path.parent / f"{path.stem}.bak1.md" + bak1.write_text(path.read_text()) + + +# ── Sync implementations ──────────────────────────────────────────────────── + +def _agent_notes_read() -> str: + p = _notes_path() + if not p.exists() or not p.read_text().strip(): + return "Agent notes are empty." + return p.read_text() + + +def _agent_notes_write(content: str) -> str: + p = _notes_path() + _rotate(p) + p.write_text(content.rstrip() + "\n") + return "Agent notes updated." + + +def _agent_notes_append(content: str, heading: str | None = None) -> str: + p = _notes_path() + _rotate(p) + existing = p.read_text() if p.exists() else "" + label = heading or _now_label() + section = f"\n## {label}\n\n{content.strip()}\n" + p.write_text(existing.rstrip() + "\n" + section) + return f"Appended to agent notes: {label}" + + +def _agent_notes_clear() -> str: + p = _notes_path() + _rotate(p) + p.write_text("") + return "Agent notes cleared." + + +# ── Async wrappers ─────────────────────────────────────────────────────────── + +async def agent_notes_read() -> str: + return await asyncio.to_thread(_agent_notes_read) + +async def agent_notes_write(content: str) -> str: + return await asyncio.to_thread(_agent_notes_write, content) + +async def agent_notes_append(content: str, heading: str | None = None) -> str: + return await asyncio.to_thread(_agent_notes_append, content, heading) + +async def agent_notes_clear() -> str: + return await asyncio.to_thread(_agent_notes_clear) + + +# ── Gemini FunctionDeclarations ────────────────────────────────────────────── + +DECLARATIONS = [ + types.FunctionDeclaration( + name="agent_notes_read", + description=( + "Read your private agent notes — a persistent notepad only you can write to. " + "Use this to recall observations, working hypotheses, long-running goals, or " + "anything you want to remember across sessions without surfacing it to the user. " + "This file is never shown in the user's Files panel." + ), + parameters=types.Schema(type=types.Type.OBJECT, properties={}), + ), + types.FunctionDeclaration( + name="agent_notes_write", + description=( + "Replace your private agent notes with new content. " + "A backup is saved automatically before writing. " + "Use agent_notes_append to add without replacing." + ), + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "content": types.Schema( + type=types.Type.STRING, + description="The new notes content (markdown supported).", + ), + }, + required=["content"], + ), + ), + types.FunctionDeclaration( + name="agent_notes_append", + description=( + "Add a new section to your private agent notes without replacing existing content. " + "A backup is saved automatically before writing. " + "Each section gets a UTC timestamp heading unless you supply one." + ), + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "content": types.Schema( + type=types.Type.STRING, + description="The content to append (markdown supported).", + ), + "heading": types.Schema( + type=types.Type.STRING, + description="Optional section heading. Defaults to current UTC timestamp.", + ), + }, + required=["content"], + ), + ), + types.FunctionDeclaration( + name="agent_notes_clear", + description=( + "Erase all private agent notes. A backup is saved automatically before clearing." + ), + parameters=types.Schema(type=types.Type.OBJECT, properties={}), + ), +]