feat: usage tracking + knowledge import script

- usage_tracker.py: daily token/call buckets per user (home/{user}/usage.json)
- Hook into local backend (OpenAI usage field) and Gemini API (usage_metadata)
- Claude/Gemini CLI backends produce no structured token data and are not tracked
- Fix CLAUDE.md stale tool count (27 → 39) and refresh tool list
- scripts/import_knowledge.py: walk markdown dirs, chunk by H2, call local LLM
  for summaries, create AE journal entries with path-derived tags; resumable via
  state file; --dry-run and --limit flags for safe testing

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Scott Idem
2026-05-02 20:38:31 -04:00
parent eab92d876d
commit 8d4aa4094c
5 changed files with 524 additions and 3 deletions

407
scripts/import_knowledge.py Executable file
View File

@@ -0,0 +1,407 @@
#!/usr/bin/env python3
"""
Knowledge base importer — walks a markdown directory and creates AE journal entries.
Uses a local LLM to generate a 1-2 sentence summary for each chunk.
Tracks progress in a state file so interrupted runs can be resumed.
Usage:
python import_knowledge.py --source ~/DgrZone_Nextcloud --journal <journal_id>
python import_knowledge.py --source ~/OSIT_Nextcloud --journal <journal_id> --dry-run
python import_knowledge.py --source ~/DgrZone_Nextcloud/Notes --journal <journal_id> --limit 5
Reads credentials from cortex/.env (relative to this script's parent directory)
or from environment variables:
AE_API_URL, AE_API_KEY, AE_ACCOUNT_ID
LOCAL_API_URL, LOCAL_API_KEY, LOCAL_MODEL
"""
import argparse
import hashlib
import json
import os
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
# ── Bootstrap: load .env from cortex/.env if not already set ─────────────────
_ENV_PATH = Path(__file__).parent.parent / "cortex" / ".env"
def _load_env(path: Path) -> None:
if not path.exists():
return
for line in path.read_text().splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
key = key.strip()
if key not in os.environ:
os.environ[key] = val.strip().strip('"').strip("'")
_load_env(_ENV_PATH)
# ── Constants ─────────────────────────────────────────────────────────────────
# Dirs to skip regardless of source
_DEFAULT_EXCLUDE = {
"temp", "Temp", "tmp", "Tmp", "test", "Test",
"Temporary Share", "Test Share", ".obsidian", "media", "Photos",
}
# Max characters per journal entry chunk
_DEFAULT_MAX_CHUNK = 8_000
# Delay between API calls (seconds) to avoid hammering the LLM
_LLM_DELAY = 1.0
_AE_DELAY = 0.3
# ── Path / tag utilities ──────────────────────────────────────────────────────
def _path_tags(source_root: Path, file_path: Path) -> list[str]:
"""Derive tags from path components relative to the source root."""
rel = file_path.relative_to(source_root)
parts = list(rel.parts[:-1]) # exclude filename itself
tags = []
for part in parts:
cleaned = re.sub(r"[^a-zA-Z0-9 ]", " ", part).strip().lower()
words = cleaned.split()
tags.extend(w for w in words if len(w) > 2)
# Add source root name as top-level tag
tags.insert(0, source_root.name.lower().replace("_nextcloud", "").replace("_", ""))
return list(dict.fromkeys(tags)) # deduplicate preserving order
def _file_title(file_path: Path, content: str) -> str:
"""Extract the first H1 heading or fall back to the filename stem."""
m = re.search(r"^# (.+)$", content, re.MULTILINE)
if m:
return m.group(1).strip()
return file_path.stem.replace("_", " ").replace("-", " ")
# ── Chunking ──────────────────────────────────────────────────────────────────
def _chunk_content(
file_path: Path,
content: str,
source_root: Path,
max_size: int,
) -> list[dict]:
"""
Returns a list of chunk dicts:
{
"chunk_key": str, # unique ID for state tracking
"title": str,
"content": str,
"tags": list[str],
"path": str,
}
"""
base_title = _file_title(file_path, content)
base_tags = _path_tags(source_root, file_path)
rel_path = str(file_path.relative_to(source_root))
if len(content) <= max_size:
h = hashlib.sha1(content.encode()).hexdigest()[:12]
return [{
"chunk_key": f"{rel_path}#{h}",
"title": base_title,
"content": content,
"tags": base_tags,
"path": rel_path,
}]
# Split by H2 headings
sections = re.split(r"^(## .+)$", content, flags=re.MULTILINE)
# sections alternates: [preamble, heading, body, heading, body, ...]
chunks = []
preamble = sections[0].strip()
pairs = list(zip(sections[1::2], sections[2::2]))
if not pairs:
# No H2 found — hard split by max_size
for i in range(0, len(content), max_size):
part = content[i:i + max_size]
h = hashlib.sha1(part.encode()).hexdigest()[:12]
chunks.append({
"chunk_key": f"{rel_path}#part{i}_{h}",
"title": f"{base_title} (part {i // max_size + 1})",
"content": part,
"tags": base_tags,
"path": rel_path,
})
return chunks
for heading, body in pairs:
section_title = heading.lstrip("#").strip()
section_content = f"{heading}\n{body}".strip()
# Prepend preamble to first section if it has meaningful content
if preamble and chunks == []:
section_content = f"{preamble}\n\n{section_content}"
# If section itself is too big, hard split it
if len(section_content) > max_size:
for i in range(0, len(section_content), max_size):
part = section_content[i:i + max_size]
h = hashlib.sha1(part.encode()).hexdigest()[:12]
chunks.append({
"chunk_key": f"{rel_path}#{section_title}#part{i}_{h}",
"title": f"{base_title}{section_title} (part {i // max_size + 1})",
"content": part,
"tags": base_tags + [section_title.lower()[:40]],
"path": rel_path,
})
else:
h = hashlib.sha1(section_content.encode()).hexdigest()[:12]
chunks.append({
"chunk_key": f"{rel_path}#{section_title}#{h}",
"title": f"{base_title}{section_title}",
"content": section_content,
"tags": base_tags + [section_title.lower()[:40]],
"path": rel_path,
})
return chunks
# ── LLM summarization ─────────────────────────────────────────────────────────
def _summarize(content: str, llm_url: str, llm_key: str, llm_model: str) -> str:
"""Call the local LLM to generate a 1-2 sentence summary. Returns "" on failure."""
import urllib.request
# Truncate for prompt economy
snippet = content[:3000] if len(content) > 3000 else content
prompt = (
"Summarize the following note in 1-2 sentences. "
"Be specific and factual. Do not start with 'This note' or 'This document'.\n\n"
f"{snippet}"
)
payload = json.dumps({
"model": llm_model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 150,
"temperature": 0.3,
}).encode()
headers = {"Content-Type": "application/json"}
if llm_key:
headers["Authorization"] = f"Bearer {llm_key}"
# Try Open WebUI path first, fall back to standard OpenAI path
for path in ("/api/chat/completions", "/chat/completions"):
url = llm_url.rstrip("/") + path
req = urllib.request.Request(url, data=payload, headers=headers, method="POST")
try:
with urllib.request.urlopen(req, timeout=60) as resp:
data = json.loads(resp.read())
return data["choices"][0]["message"]["content"].strip()
except Exception:
continue
return ""
# ── AE API ────────────────────────────────────────────────────────────────────
def _ae_create_entry(
journal_id: str,
title: str,
content: str,
summary: str,
tags: list[str],
ae_url: str,
ae_key: str,
ae_account: str,
) -> str:
"""POST to AE API. Returns the new entry's id_random or raises on error."""
import urllib.request
url = f"{ae_url.rstrip('/')}/v3/crud/journal/{journal_id}/journal_entry/"
payload = json.dumps({
"name": title,
"content": content,
"summary": summary,
"tags": tags,
}).encode()
headers = {
"Content-Type": "application/json",
"x-aether-api-key": ae_key,
"x-account-id": ae_account,
}
req = urllib.request.Request(url, data=payload, headers=headers, method="POST")
with urllib.request.urlopen(req, timeout=30) as resp:
data = json.loads(resp.read())
return (
data.get("data", {}).get("journal_entry_id")
or data.get("data", {}).get("id_random")
or data.get("id_random")
or "?"
)
# ── State file ────────────────────────────────────────────────────────────────
def _load_state(state_file: Path) -> dict:
if state_file.exists():
try:
return json.loads(state_file.read_text())
except Exception:
pass
return {"imported": {}}
def _save_state(state_file: Path, state: dict) -> None:
state_file.write_text(json.dumps(state, indent=2))
# ── File walker ───────────────────────────────────────────────────────────────
def _walk_markdown(source: Path, exclude: set[str]) -> list[Path]:
files = []
for f in sorted(source.rglob("*.md")):
if any(part in exclude for part in f.parts):
continue
if f.stat().st_size < 50: # skip near-empty files
continue
files.append(f)
return files
# ── Main ──────────────────────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(description="Import markdown notes into AE Journal")
parser.add_argument("--source", required=True, help="Root directory to import from")
parser.add_argument("--journal", required=True, help="Target AE journal id_random")
parser.add_argument("--dry-run", action="store_true", help="Preview without creating entries")
parser.add_argument("--limit", type=int, default=0, help="Stop after N chunks (0 = unlimited)")
parser.add_argument("--max-chunk", type=int, default=_DEFAULT_MAX_CHUNK, help="Max chars per chunk")
parser.add_argument("--exclude", default="", help="Extra dir names to skip (comma-separated)")
parser.add_argument("--state-file", default="import_state.json", help="State tracking file")
parser.add_argument("--no-llm", action="store_true", help="Skip LLM summarization (faster)")
parser.add_argument("--ae-url", default=os.environ.get("AE_API_URL", ""), help="AE API URL")
parser.add_argument("--ae-key", default=os.environ.get("AE_API_KEY", ""), help="AE API key")
parser.add_argument("--ae-account", default=os.environ.get("AE_ACCOUNT_ID", ""), help="AE account ID")
parser.add_argument("--llm-url", default=os.environ.get("LOCAL_API_URL", ""), help="Local LLM API URL")
parser.add_argument("--llm-key", default=os.environ.get("LOCAL_API_KEY", ""), help="Local LLM API key")
parser.add_argument("--llm-model", default=os.environ.get("LOCAL_MODEL", ""), help="Local LLM model name")
args = parser.parse_args()
source = Path(args.source).expanduser().resolve()
if not source.exists():
print(f"ERROR: source directory not found: {source}", file=sys.stderr)
sys.exit(1)
if not args.dry_run:
if not args.ae_url or not args.ae_key or not args.ae_account:
print("ERROR: AE_API_URL, AE_API_KEY, and AE_ACCOUNT_ID are required (or use --dry-run)", file=sys.stderr)
sys.exit(1)
use_llm = not args.no_llm and bool(args.llm_url) and bool(args.llm_model)
if not use_llm and not args.no_llm:
print("NOTE: LLM summarization disabled (LOCAL_API_URL or LOCAL_MODEL not set). Use --no-llm to silence this.")
exclude = _DEFAULT_EXCLUDE | {d.strip() for d in args.exclude.split(",") if d.strip()}
state_file = Path(args.state_file)
state = _load_state(state_file)
print(f"Source: {source}")
print(f"Journal: {args.journal}")
print(f"Dry run: {args.dry_run}")
print(f"LLM: {'enabled (' + args.llm_model + ')' if use_llm else 'disabled'}")
print(f"State file: {state_file} ({len(state['imported'])} already imported)")
print()
files = _walk_markdown(source, exclude)
print(f"Found {len(files)} markdown files")
created = 0
skipped = 0
errors = 0
total_chunks = 0
for file_path in files:
try:
content = file_path.read_text(encoding="utf-8", errors="replace")
except Exception as e:
print(f" SKIP (read error): {file_path.name}{e}")
errors += 1
continue
chunks = _chunk_content(file_path, content, source, args.max_chunk)
total_chunks += len(chunks)
for chunk in chunks:
key = chunk["chunk_key"]
if key in state["imported"]:
skipped += 1
continue
print(f" {'[DRY RUN] ' if args.dry_run else ''}IMPORT: {chunk['title'][:70]}")
summary = ""
if use_llm:
try:
summary = _summarize(chunk["content"], args.llm_url, args.llm_key, args.llm_model)
time.sleep(_LLM_DELAY)
except Exception as e:
print(f" LLM error (continuing without summary): {e}")
if not args.dry_run:
try:
entry_id = _ae_create_entry(
journal_id=args.journal,
title=chunk["title"],
content=chunk["content"],
summary=summary,
tags=chunk["tags"],
ae_url=args.ae_url,
ae_key=args.ae_key,
ae_account=args.ae_account,
)
state["imported"][key] = {
"entry_id": entry_id,
"imported_at": datetime.now(timezone.utc).isoformat(),
"path": chunk["path"],
"title": chunk["title"],
}
_save_state(state_file, state)
time.sleep(_AE_DELAY)
created += 1
except Exception as e:
print(f" AE API error: {e}")
errors += 1
else:
created += 1
if args.limit and created >= args.limit:
print(f"\nReached --limit {args.limit}. Stopping.")
_print_summary(created, skipped, errors, total_chunks, args.dry_run)
return
_print_summary(created, skipped, errors, total_chunks, args.dry_run)
def _print_summary(created: int, skipped: int, errors: int, total: int, dry_run: bool) -> None:
label = "Would create" if dry_run else "Created"
print(f"\n{'=' * 50}")
print(f"{label}: {created} entries")
print(f"Skipped (already imported): {skipped}")
print(f"Errors: {errors}")
print(f"Total chunks processed: {total}")
if __name__ == "__main__":
main()