From 8d4aa4094c2af3e4c45d02616d49eeff2ecd5e01 Mon Sep 17 00:00:00 2001 From: Scott Idem Date: Sat, 2 May 2026 20:38:31 -0400 Subject: [PATCH] feat: usage tracking + knowledge import script MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - usage_tracker.py: daily token/call buckets per user (home/{user}/usage.json) - Hook into local backend (OpenAI usage field) and Gemini API (usage_metadata) - Claude/Gemini CLI backends produce no structured token data and are not tracked - Fix CLAUDE.md stale tool count (27 → 39) and refresh tool list - scripts/import_knowledge.py: walk markdown dirs, chunk by H2, call local LLM for summaries, create AE journal entries with path-derived tags; resumable via state file; --dry-run and --limit flags for safe testing Co-Authored-By: Claude Sonnet 4.6 --- CLAUDE.md | 9 +- cortex/llm_client.py | 13 ++ cortex/orchestrator_engine.py | 23 ++ cortex/usage_tracker.py | 75 +++++++ scripts/import_knowledge.py | 407 ++++++++++++++++++++++++++++++++++ 5 files changed, 524 insertions(+), 3 deletions(-) create mode 100644 cortex/usage_tracker.py create mode 100755 scripts/import_knowledge.py diff --git a/CLAUDE.md b/CLAUDE.md index a27cac2..6ee8a99 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -226,10 +226,13 @@ Cortex is running and stable. 
All channels are live: Active users: scott (inara, developer), holly (tina), brian (wintermute) -**27 orchestrator tools:** web_search, file_read, shell_exec, claude_allow_dir, +**39 orchestrator tools:** web_search, http_fetch, +file_read/list/write, shell_exec, claude_allow_dir, +cortex_restart/logs/status/update, task_list/create/update/complete, cron_list/add/remove/toggle, -reminders_add/list/clear, scratch_read/write/append/clear, -ae_journal_list/search/entry_create/entry_update/entry_disable/entry_append/entry_prepend, +reminders_add/list/remove/clear, scratch_read/write/append/clear, +email_send, nc_talk_send, +ae_journal_list/search/entries_list/entry_read/entry_create/entry_update/entry_disable/entry_append/entry_prepend, ae_task_list. See `documentation/TODO__Agents.md` for the active task list. diff --git a/cortex/llm_client.py b/cortex/llm_client.py index 28c19d9..0b2be5b 100644 --- a/cortex/llm_client.py +++ b/cortex/llm_client.py @@ -218,6 +218,19 @@ async def _local(system_prompt: str, messages: list[dict], model_cfg: dict | Non text = data["choices"][0]["message"]["content"] if not text or not text.strip(): raise RuntimeError("Local model returned an empty response") + + usage = data.get("usage") or {} + if usage.get("prompt_tokens") is not None: + import usage_tracker + from persona import _user + asyncio.create_task(usage_tracker.record( + username=_user.get(), + backend="local", + model_name=model, + prompt_tokens=usage.get("prompt_tokens", 0), + completion_tokens=usage.get("completion_tokens", 0), + )) + return text.strip() diff --git a/cortex/orchestrator_engine.py b/cortex/orchestrator_engine.py index 3f7037a..bfde052 100644 --- a/cortex/orchestrator_engine.py +++ b/cortex/orchestrator_engine.py @@ -26,6 +26,8 @@ from google.genai import types from config import settings from llm_client import complete from tools import TOOL_DECLARATIONS, call_tool, get_tools_for_role, CONFIRM_REQUIRED +import usage_tracker +from persona import _user logger 
= logging.getLogger(__name__) @@ -44,6 +46,25 @@ Keep your summary factual and complete. Include relevant URLs, data, and specifi If no tools are needed, return an empty summary.""" +def _track_gemini_usage(response, model_name: str | None) -> None: + meta = getattr(response, "usage_metadata", None) + if not meta: + return + prompt_tokens = getattr(meta, "prompt_token_count", 0) or 0 + completion_tokens = getattr(meta, "candidates_token_count", 0) or 0 + if prompt_tokens or completion_tokens: + try: + asyncio.create_task(usage_tracker.record( + username=_user.get(), + backend="gemini_api", + model_name=model_name or settings.orchestrator_model, + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + )) + except Exception: + pass + + @dataclass class OrchestrateCheckpoint: """Saved execution state for a job paused at a confirmation gate.""" @@ -285,6 +306,7 @@ async def _run_from_contents( system_instruction=_ORCHESTRATOR_SYSTEM, ), ) + _track_gemini_usage(response, model_name) candidate = response.candidates[0] parts = candidate.content.parts if candidate.content else [] @@ -348,6 +370,7 @@ async def _run_from_contents( system_instruction=_ORCHESTRATOR_SYSTEM, ), ) + _track_gemini_usage(conf_response, model_name) conf_parts = ( conf_response.candidates[0].content.parts if conf_response.candidates and conf_response.candidates[0].content diff --git a/cortex/usage_tracker.py b/cortex/usage_tracker.py new file mode 100644 index 0000000..6953eab --- /dev/null +++ b/cortex/usage_tracker.py @@ -0,0 +1,75 @@ +""" +API usage and token tracking. + +Writes daily buckets to home/{username}/usage.json: + + { + "2026-05-01": { + "gemini_api/gemini-2.0-flash": {"calls": 3, "prompt_tokens": 8400, "completion_tokens": 520}, + "local/llama3.2:latest": {"calls": 2, "prompt_tokens": 1200, "completion_tokens": 310} + } + } + +Claude CLI and Gemini CLI backends produce no structured token data and are not tracked. 
# Serializes read-modify-write cycles on usage files within this process.
_LOCK = asyncio.Lock()


def _usage_path(username: str) -> Path:
    """Return the per-user usage file: ``<home_root>/<username>/usage.json``."""
    return settings.home_root() / username / "usage.json"


def _read_usage_file(path: Path) -> dict:
    """Load the usage mapping stored at *path*.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or holds something other than a JSON object.
    """
    try:
        loaded = json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as e:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        logger.warning("Ignoring unreadable usage data in %s: %s", path, e)
        return {}
    return loaded if isinstance(loaded, dict) else {}


async def record(
    username: str,
    backend: str,
    model_name: str,
    prompt_tokens: int,
    completion_tokens: int,
) -> None:
    """Add one call's token counts to the daily usage bucket for this user.

    The bucket is keyed by the local calendar date (``date.today()``) and then
    by ``"<backend>/<model_name>"``.  Write failures are logged, not raised.

    Args:
        username: User whose ``usage.json`` is updated.
        backend: ``"gemini_api"`` or ``"local"``.
        model_name: Exact model string (e.g. ``"gemini-2.0-flash"``,
            ``"llama3.2:latest"``).
        prompt_tokens: Prompt tokens for this call; ``None`` counts as 0.
        completion_tokens: Completion tokens for this call; ``None`` counts
            as 0.
    """
    path = _usage_path(username)
    today = date.today().isoformat()
    key = f"{backend}/{model_name}"
    increments = (
        ("calls", 1),
        ("prompt_tokens", prompt_tokens or 0),
        ("completion_tokens", completion_tokens or 0),
    )

    async with _LOCK:
        data = _read_usage_file(path)

        # Tolerate hand-edited or damaged buckets instead of raising.
        day = data.get(today)
        if not isinstance(day, dict):
            day = data[today] = {}
        entry = day.get(key)
        if not isinstance(entry, dict):
            entry = day[key] = {}

        for field, amount in increments:
            current = entry.get(field, 0)
            entry[field] = (current if isinstance(current, int) else 0) + amount

        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            # Write to a sibling temp file, then rename over the target, so an
            # interrupted write cannot leave a truncated usage.json behind.
            tmp = path.with_name(path.name + ".tmp")
            tmp.write_text(json.dumps(data, indent=2), encoding="utf-8")
            tmp.replace(path)
        except (OSError, TypeError, ValueError) as e:
            logger.warning("Failed to write usage data to %s: %s", path, e)


def read_usage(username: str) -> dict:
    """Return the full usage dict for this user, or ``{}`` if none is usable."""
    return _read_usage_file(_usage_path(username))
+ +Usage: + python import_knowledge.py --source ~/DgrZone_Nextcloud --journal + python import_knowledge.py --source ~/OSIT_Nextcloud --journal --dry-run + python import_knowledge.py --source ~/DgrZone_Nextcloud/Notes --journal --limit 5 + +Reads credentials from cortex/.env (relative to this script's parent directory) +or from environment variables: + AE_API_URL, AE_API_KEY, AE_ACCOUNT_ID + LOCAL_API_URL, LOCAL_API_KEY, LOCAL_MODEL +""" + +import argparse +import hashlib +import json +import os +import re +import sys +import time +from datetime import datetime, timezone +from pathlib import Path + +# ── Bootstrap: load .env from cortex/.env if not already set ───────────────── + +_ENV_PATH = Path(__file__).parent.parent / "cortex" / ".env" + +def _load_env(path: Path) -> None: + if not path.exists(): + return + for line in path.read_text().splitlines(): + line = line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, _, val = line.partition("=") + key = key.strip() + if key not in os.environ: + os.environ[key] = val.strip().strip('"').strip("'") + +_load_env(_ENV_PATH) + +# ── Constants ───────────────────────────────────────────────────────────────── + +# Dirs to skip regardless of source +_DEFAULT_EXCLUDE = { + "temp", "Temp", "tmp", "Tmp", "test", "Test", + "Temporary Share", "Test Share", ".obsidian", "media", "Photos", +} + +# Max characters per journal entry chunk +_DEFAULT_MAX_CHUNK = 8_000 + +# Delay between API calls (seconds) to avoid hammering the LLM +_LLM_DELAY = 1.0 +_AE_DELAY = 0.3 + + +# ── Path / tag utilities ────────────────────────────────────────────────────── + +def _path_tags(source_root: Path, file_path: Path) -> list[str]: + """Derive tags from path components relative to the source root.""" + rel = file_path.relative_to(source_root) + parts = list(rel.parts[:-1]) # exclude filename itself + tags = [] + for part in parts: + cleaned = re.sub(r"[^a-zA-Z0-9 ]", " ", part).strip().lower() + words = 
cleaned.split() + tags.extend(w for w in words if len(w) > 2) + # Add source root name as top-level tag + tags.insert(0, source_root.name.lower().replace("_nextcloud", "").replace("_", "")) + return list(dict.fromkeys(tags)) # deduplicate preserving order + + +def _file_title(file_path: Path, content: str) -> str: + """Extract the first H1 heading or fall back to the filename stem.""" + m = re.search(r"^# (.+)$", content, re.MULTILINE) + if m: + return m.group(1).strip() + return file_path.stem.replace("_", " ").replace("-", " ") + + +# ── Chunking ────────────────────────────────────────────────────────────────── + +def _chunk_content( + file_path: Path, + content: str, + source_root: Path, + max_size: int, +) -> list[dict]: + """ + Returns a list of chunk dicts: + { + "chunk_key": str, # unique ID for state tracking + "title": str, + "content": str, + "tags": list[str], + "path": str, + } + """ + base_title = _file_title(file_path, content) + base_tags = _path_tags(source_root, file_path) + rel_path = str(file_path.relative_to(source_root)) + + if len(content) <= max_size: + h = hashlib.sha1(content.encode()).hexdigest()[:12] + return [{ + "chunk_key": f"{rel_path}#{h}", + "title": base_title, + "content": content, + "tags": base_tags, + "path": rel_path, + }] + + # Split by H2 headings + sections = re.split(r"^(## .+)$", content, flags=re.MULTILINE) + # sections alternates: [preamble, heading, body, heading, body, ...] 
+ + chunks = [] + preamble = sections[0].strip() + pairs = list(zip(sections[1::2], sections[2::2])) + + if not pairs: + # No H2 found — hard split by max_size + for i in range(0, len(content), max_size): + part = content[i:i + max_size] + h = hashlib.sha1(part.encode()).hexdigest()[:12] + chunks.append({ + "chunk_key": f"{rel_path}#part{i}_{h}", + "title": f"{base_title} (part {i // max_size + 1})", + "content": part, + "tags": base_tags, + "path": rel_path, + }) + return chunks + + for heading, body in pairs: + section_title = heading.lstrip("#").strip() + section_content = f"{heading}\n{body}".strip() + + # Prepend preamble to first section if it has meaningful content + if preamble and chunks == []: + section_content = f"{preamble}\n\n{section_content}" + + # If section itself is too big, hard split it + if len(section_content) > max_size: + for i in range(0, len(section_content), max_size): + part = section_content[i:i + max_size] + h = hashlib.sha1(part.encode()).hexdigest()[:12] + chunks.append({ + "chunk_key": f"{rel_path}#{section_title}#part{i}_{h}", + "title": f"{base_title} — {section_title} (part {i // max_size + 1})", + "content": part, + "tags": base_tags + [section_title.lower()[:40]], + "path": rel_path, + }) + else: + h = hashlib.sha1(section_content.encode()).hexdigest()[:12] + chunks.append({ + "chunk_key": f"{rel_path}#{section_title}#{h}", + "title": f"{base_title} — {section_title}", + "content": section_content, + "tags": base_tags + [section_title.lower()[:40]], + "path": rel_path, + }) + + return chunks + + +# ── LLM summarization ───────────────────────────────────────────────────────── + +def _summarize(content: str, llm_url: str, llm_key: str, llm_model: str) -> str: + """Call the local LLM to generate a 1-2 sentence summary. Returns "" on failure.""" + import urllib.request + + # Truncate for prompt economy + snippet = content[:3000] if len(content) > 3000 else content + + prompt = ( + "Summarize the following note in 1-2 sentences. 
" + "Be specific and factual. Do not start with 'This note' or 'This document'.\n\n" + f"{snippet}" + ) + + payload = json.dumps({ + "model": llm_model, + "messages": [{"role": "user", "content": prompt}], + "max_tokens": 150, + "temperature": 0.3, + }).encode() + + headers = {"Content-Type": "application/json"} + if llm_key: + headers["Authorization"] = f"Bearer {llm_key}" + + # Try Open WebUI path first, fall back to standard OpenAI path + for path in ("/api/chat/completions", "/chat/completions"): + url = llm_url.rstrip("/") + path + req = urllib.request.Request(url, data=payload, headers=headers, method="POST") + try: + with urllib.request.urlopen(req, timeout=60) as resp: + data = json.loads(resp.read()) + return data["choices"][0]["message"]["content"].strip() + except Exception: + continue + + return "" + + +# ── AE API ──────────────────────────────────────────────────────────────────── + +def _ae_create_entry( + journal_id: str, + title: str, + content: str, + summary: str, + tags: list[str], + ae_url: str, + ae_key: str, + ae_account: str, +) -> str: + """POST to AE API. Returns the new entry's id_random or raises on error.""" + import urllib.request + + url = f"{ae_url.rstrip('/')}/v3/crud/journal/{journal_id}/journal_entry/" + payload = json.dumps({ + "name": title, + "content": content, + "summary": summary, + "tags": tags, + }).encode() + headers = { + "Content-Type": "application/json", + "x-aether-api-key": ae_key, + "x-account-id": ae_account, + } + req = urllib.request.Request(url, data=payload, headers=headers, method="POST") + with urllib.request.urlopen(req, timeout=30) as resp: + data = json.loads(resp.read()) + + return ( + data.get("data", {}).get("journal_entry_id") + or data.get("data", {}).get("id_random") + or data.get("id_random") + or "?" 
+ ) + + +# ── State file ──────────────────────────────────────────────────────────────── + +def _load_state(state_file: Path) -> dict: + if state_file.exists(): + try: + return json.loads(state_file.read_text()) + except Exception: + pass + return {"imported": {}} + + +def _save_state(state_file: Path, state: dict) -> None: + state_file.write_text(json.dumps(state, indent=2)) + + +# ── File walker ─────────────────────────────────────────────────────────────── + +def _walk_markdown(source: Path, exclude: set[str]) -> list[Path]: + files = [] + for f in sorted(source.rglob("*.md")): + if any(part in exclude for part in f.parts): + continue + if f.stat().st_size < 50: # skip near-empty files + continue + files.append(f) + return files + + +# ── Main ────────────────────────────────────────────────────────────────────── + +def main() -> None: + parser = argparse.ArgumentParser(description="Import markdown notes into AE Journal") + parser.add_argument("--source", required=True, help="Root directory to import from") + parser.add_argument("--journal", required=True, help="Target AE journal id_random") + parser.add_argument("--dry-run", action="store_true", help="Preview without creating entries") + parser.add_argument("--limit", type=int, default=0, help="Stop after N chunks (0 = unlimited)") + parser.add_argument("--max-chunk", type=int, default=_DEFAULT_MAX_CHUNK, help="Max chars per chunk") + parser.add_argument("--exclude", default="", help="Extra dir names to skip (comma-separated)") + parser.add_argument("--state-file", default="import_state.json", help="State tracking file") + parser.add_argument("--no-llm", action="store_true", help="Skip LLM summarization (faster)") + parser.add_argument("--ae-url", default=os.environ.get("AE_API_URL", ""), help="AE API URL") + parser.add_argument("--ae-key", default=os.environ.get("AE_API_KEY", ""), help="AE API key") + parser.add_argument("--ae-account", default=os.environ.get("AE_ACCOUNT_ID", ""), help="AE account ID") + 
parser.add_argument("--llm-url", default=os.environ.get("LOCAL_API_URL", ""), help="Local LLM API URL") + parser.add_argument("--llm-key", default=os.environ.get("LOCAL_API_KEY", ""), help="Local LLM API key") + parser.add_argument("--llm-model", default=os.environ.get("LOCAL_MODEL", ""), help="Local LLM model name") + args = parser.parse_args() + + source = Path(args.source).expanduser().resolve() + if not source.exists(): + print(f"ERROR: source directory not found: {source}", file=sys.stderr) + sys.exit(1) + + if not args.dry_run: + if not args.ae_url or not args.ae_key or not args.ae_account: + print("ERROR: AE_API_URL, AE_API_KEY, and AE_ACCOUNT_ID are required (or use --dry-run)", file=sys.stderr) + sys.exit(1) + + use_llm = not args.no_llm and bool(args.llm_url) and bool(args.llm_model) + if not use_llm and not args.no_llm: + print("NOTE: LLM summarization disabled (LOCAL_API_URL or LOCAL_MODEL not set). Use --no-llm to silence this.") + + exclude = _DEFAULT_EXCLUDE | {d.strip() for d in args.exclude.split(",") if d.strip()} + state_file = Path(args.state_file) + state = _load_state(state_file) + + print(f"Source: {source}") + print(f"Journal: {args.journal}") + print(f"Dry run: {args.dry_run}") + print(f"LLM: {'enabled (' + args.llm_model + ')' if use_llm else 'disabled'}") + print(f"State file: {state_file} ({len(state['imported'])} already imported)") + print() + + files = _walk_markdown(source, exclude) + print(f"Found {len(files)} markdown files") + + created = 0 + skipped = 0 + errors = 0 + total_chunks = 0 + + for file_path in files: + try: + content = file_path.read_text(encoding="utf-8", errors="replace") + except Exception as e: + print(f" SKIP (read error): {file_path.name} — {e}") + errors += 1 + continue + + chunks = _chunk_content(file_path, content, source, args.max_chunk) + total_chunks += len(chunks) + + for chunk in chunks: + key = chunk["chunk_key"] + + if key in state["imported"]: + skipped += 1 + continue + + print(f" {'[DRY RUN] ' if 
args.dry_run else ''}IMPORT: {chunk['title'][:70]}") + + summary = "" + if use_llm: + try: + summary = _summarize(chunk["content"], args.llm_url, args.llm_key, args.llm_model) + time.sleep(_LLM_DELAY) + except Exception as e: + print(f" LLM error (continuing without summary): {e}") + + if not args.dry_run: + try: + entry_id = _ae_create_entry( + journal_id=args.journal, + title=chunk["title"], + content=chunk["content"], + summary=summary, + tags=chunk["tags"], + ae_url=args.ae_url, + ae_key=args.ae_key, + ae_account=args.ae_account, + ) + state["imported"][key] = { + "entry_id": entry_id, + "imported_at": datetime.now(timezone.utc).isoformat(), + "path": chunk["path"], + "title": chunk["title"], + } + _save_state(state_file, state) + time.sleep(_AE_DELAY) + created += 1 + except Exception as e: + print(f" AE API error: {e}") + errors += 1 + else: + created += 1 + + if args.limit and created >= args.limit: + print(f"\nReached --limit {args.limit}. Stopping.") + _print_summary(created, skipped, errors, total_chunks, args.dry_run) + return + + _print_summary(created, skipped, errors, total_chunks, args.dry_run) + + +def _print_summary(created: int, skipped: int, errors: int, total: int, dry_run: bool) -> None: + label = "Would create" if dry_run else "Created" + print(f"\n{'=' * 50}") + print(f"{label}: {created} entries") + print(f"Skipped (already imported): {skipped}") + print(f"Errors: {errors}") + print(f"Total chunks processed: {total}") + + +if __name__ == "__main__": + main()