#!/usr/bin/env python3 """ Knowledge base importer — walks a markdown directory and creates AE journal entries. Uses a local LLM to generate a 1-2 sentence summary for each chunk. Tracks progress in a state file so interrupted runs can be resumed. Usage: python import_knowledge.py --source ~/DgrZone_Nextcloud --journal python import_knowledge.py --source ~/OSIT_Nextcloud --journal --dry-run python import_knowledge.py --source ~/DgrZone_Nextcloud/Notes --journal --limit 5 Reads credentials from cortex/.env (relative to this script's parent directory) or from environment variables: AE_API_URL, AE_API_KEY, AE_ACCOUNT_ID LOCAL_API_URL, LOCAL_API_KEY, LOCAL_MODEL """ import argparse import hashlib import json import os import re import sys import time from datetime import datetime, timezone from pathlib import Path # ── Bootstrap: load .env from cortex/.env if not already set ───────────────── _ENV_PATH = Path(__file__).parent.parent / "cortex" / ".env" def _load_env(path: Path) -> None: if not path.exists(): return for line in path.read_text().splitlines(): line = line.strip() if not line or line.startswith("#") or "=" not in line: continue key, _, val = line.partition("=") key = key.strip() if key not in os.environ: os.environ[key] = val.strip().strip('"').strip("'") _load_env(_ENV_PATH) # ── Constants ───────────────────────────────────────────────────────────────── # Dirs to skip regardless of source _DEFAULT_EXCLUDE = { "temp", "Temp", "tmp", "Tmp", "test", "Test", "Temporary Share", "Test Share", ".obsidian", "media", "Photos", } # Max characters per journal entry chunk _DEFAULT_MAX_CHUNK = 8_000 # Delay between API calls (seconds) to avoid hammering the LLM _LLM_DELAY = 1.0 _AE_DELAY = 0.3 # ── Path / tag utilities ────────────────────────────────────────────────────── def _path_tags(source_root: Path, file_path: Path) -> list[str]: """Derive tags from path components relative to the source root.""" rel = file_path.relative_to(source_root) parts = list(rel.parts[:-1]) # exclude filename itself tags = [] for part in parts: cleaned = re.sub(r"[^a-zA-Z0-9 ]", " ", part).strip().lower() words = cleaned.split() tags.extend(w for w in words if len(w) > 2) # Add source root name as top-level tag tags.insert(0, source_root.name.lower().replace("_nextcloud", "").replace("_", "")) return list(dict.fromkeys(tags)) # deduplicate preserving order def _file_title(file_path: Path, content: str) -> str: """Extract the first H1 heading or fall back to the filename stem.""" m = re.search(r"^# (.+)$", content, re.MULTILINE) if m: return m.group(1).strip() return file_path.stem.replace("_", " ").replace("-", " ") # ── Chunking ────────────────────────────────────────────────────────────────── def _chunk_content( file_path: Path, content: str, source_root: Path, max_size: int, ) -> list[dict]: """ Returns a list of chunk dicts: { "chunk_key": str, # unique ID for state tracking "title": str, "content": str, "tags": list[str], "path": str, } """ base_title = _file_title(file_path, content) base_tags = _path_tags(source_root, file_path) rel_path = str(file_path.relative_to(source_root)) if len(content) <= max_size: h = hashlib.sha1(content.encode()).hexdigest()[:12] return [{ "chunk_key": f"{rel_path}#{h}", "title": base_title, "content": content, "tags": base_tags, "path": rel_path, }] # Split by H2 headings sections = re.split(r"^(## .+)$", content, flags=re.MULTILINE) # sections alternates: [preamble, heading, body, heading, body, ...] chunks = [] preamble = sections[0].strip() pairs = list(zip(sections[1::2], sections[2::2])) if not pairs: # No H2 found — hard split by max_size for i in range(0, len(content), max_size): part = content[i:i + max_size] h = hashlib.sha1(part.encode()).hexdigest()[:12] chunks.append({ "chunk_key": f"{rel_path}#part{i}_{h}", "title": f"{base_title} (part {i // max_size + 1})", "content": part, "tags": base_tags, "path": rel_path, }) return chunks for heading, body in pairs: section_title = heading.lstrip("#").strip() section_content = f"{heading}\n{body}".strip() # Prepend preamble to first section if it has meaningful content if preamble and chunks == []: section_content = f"{preamble}\n\n{section_content}" # If section itself is too big, hard split it if len(section_content) > max_size: for i in range(0, len(section_content), max_size): part = section_content[i:i + max_size] h = hashlib.sha1(part.encode()).hexdigest()[:12] chunks.append({ "chunk_key": f"{rel_path}#{section_title}#part{i}_{h}", "title": f"{base_title} — {section_title} (part {i // max_size + 1})", "content": part, "tags": base_tags + [section_title.lower()[:40]], "path": rel_path, }) else: h = hashlib.sha1(section_content.encode()).hexdigest()[:12] chunks.append({ "chunk_key": f"{rel_path}#{section_title}#{h}", "title": f"{base_title} — {section_title}", "content": section_content, "tags": base_tags + [section_title.lower()[:40]], "path": rel_path, }) return chunks # ── LLM summarization ───────────────────────────────────────────────────────── def _summarize(content: str, llm_url: str, llm_key: str, llm_model: str) -> str: """Call the local LLM to generate a 1-2 sentence summary. Returns "" on failure.""" import urllib.request # Truncate for prompt economy snippet = content[:3000] if len(content) > 3000 else content prompt = ( "Summarize the following note in 1-2 sentences. " "Be specific and factual. Do not start with 'This note' or 'This document'.\n\n" f"{snippet}" ) payload = json.dumps({ "model": llm_model, "messages": [{"role": "user", "content": prompt}], "max_tokens": 150, "temperature": 0.3, }).encode() headers = {"Content-Type": "application/json"} if llm_key: headers["Authorization"] = f"Bearer {llm_key}" # Try Open WebUI path first, fall back to standard OpenAI path for path in ("/api/chat/completions", "/chat/completions"): url = llm_url.rstrip("/") + path req = urllib.request.Request(url, data=payload, headers=headers, method="POST") try: with urllib.request.urlopen(req, timeout=60) as resp: data = json.loads(resp.read()) return data["choices"][0]["message"]["content"].strip() except Exception: continue return "" # ── AE API ──────────────────────────────────────────────────────────────────── def _ae_create_entry( journal_id: str, title: str, content: str, summary: str, tags: list[str], ae_url: str, ae_key: str, ae_account: str, ) -> str: """POST to AE API. Returns the new entry's id_random or raises on error.""" import urllib.request url = f"{ae_url.rstrip('/')}/v3/crud/journal/{journal_id}/journal_entry/" payload = json.dumps({ "name": title, "content": content, "summary": summary, "tags": tags, }).encode() headers = { "Content-Type": "application/json", "x-aether-api-key": ae_key, "x-account-id": ae_account, } req = urllib.request.Request(url, data=payload, headers=headers, method="POST") with urllib.request.urlopen(req, timeout=30) as resp: data = json.loads(resp.read()) return ( data.get("data", {}).get("journal_entry_id") or data.get("data", {}).get("id_random") or data.get("id_random") or "?" ) # ── State file ──────────────────────────────────────────────────────────────── def _load_state(state_file: Path) -> dict: if state_file.exists(): try: return json.loads(state_file.read_text()) except Exception: pass return {"imported": {}} def _save_state(state_file: Path, state: dict) -> None: state_file.write_text(json.dumps(state, indent=2)) # ── File walker ─────────────────────────────────────────────────────────────── def _walk_markdown(source: Path, exclude: set[str]) -> list[Path]: files = [] for f in sorted(source.rglob("*.md")): if any(part in exclude for part in f.parts): continue if f.stat().st_size < 50: # skip near-empty files continue files.append(f) return files # ── Main ────────────────────────────────────────────────────────────────────── def main() -> None: parser = argparse.ArgumentParser(description="Import markdown notes into AE Journal") parser.add_argument("--source", required=True, help="Root directory to import from") parser.add_argument("--journal", required=True, help="Target AE journal id_random") parser.add_argument("--dry-run", action="store_true", help="Preview without creating entries") parser.add_argument("--limit", type=int, default=0, help="Stop after N chunks (0 = unlimited)") parser.add_argument("--max-chunk", type=int, default=_DEFAULT_MAX_CHUNK, help="Max chars per chunk") parser.add_argument("--exclude", default="", help="Extra dir names to skip (comma-separated)") parser.add_argument("--state-file", default="import_state.json", help="State tracking file") parser.add_argument("--no-llm", action="store_true", help="Skip LLM summarization (faster)") parser.add_argument("--ae-url", default=os.environ.get("AE_API_URL", ""), help="AE API URL") parser.add_argument("--ae-key", default=os.environ.get("AE_API_KEY", ""), help="AE API key") parser.add_argument("--ae-account", default=os.environ.get("AE_ACCOUNT_ID", ""), help="AE account ID") parser.add_argument("--llm-url", default=os.environ.get("LOCAL_API_URL", ""), help="Local LLM API URL") parser.add_argument("--llm-key", default=os.environ.get("LOCAL_API_KEY", ""), help="Local LLM API key") parser.add_argument("--llm-model", default=os.environ.get("LOCAL_MODEL", ""), help="Local LLM model name") args = parser.parse_args() source = Path(args.source).expanduser().resolve() if not source.exists(): print(f"ERROR: source directory not found: {source}", file=sys.stderr) sys.exit(1) if not args.dry_run: if not args.ae_url or not args.ae_key or not args.ae_account: print("ERROR: AE_API_URL, AE_API_KEY, and AE_ACCOUNT_ID are required (or use --dry-run)", file=sys.stderr) sys.exit(1) use_llm = not args.no_llm and bool(args.llm_url) and bool(args.llm_model) if not use_llm and not args.no_llm: print("NOTE: LLM summarization disabled (LOCAL_API_URL or LOCAL_MODEL not set). Use --no-llm to silence this.") exclude = _DEFAULT_EXCLUDE | {d.strip() for d in args.exclude.split(",") if d.strip()} state_file = Path(args.state_file) state = _load_state(state_file) print(f"Source: {source}") print(f"Journal: {args.journal}") print(f"Dry run: {args.dry_run}") print(f"LLM: {'enabled (' + args.llm_model + ')' if use_llm else 'disabled'}") print(f"State file: {state_file} ({len(state['imported'])} already imported)") print() files = _walk_markdown(source, exclude) print(f"Found {len(files)} markdown files") created = 0 skipped = 0 errors = 0 total_chunks = 0 for file_path in files: try: content = file_path.read_text(encoding="utf-8", errors="replace") except Exception as e: print(f" SKIP (read error): {file_path.name} — {e}") errors += 1 continue chunks = _chunk_content(file_path, content, source, args.max_chunk) total_chunks += len(chunks) for chunk in chunks: key = chunk["chunk_key"] if key in state["imported"]: skipped += 1 continue print(f" {'[DRY RUN] ' if args.dry_run else ''}IMPORT: {chunk['title'][:70]}") summary = "" if use_llm: try: summary = _summarize(chunk["content"], args.llm_url, args.llm_key, args.llm_model) time.sleep(_LLM_DELAY) except Exception as e: print(f" LLM error (continuing without summary): {e}") if not args.dry_run: try: entry_id = _ae_create_entry( journal_id=args.journal, title=chunk["title"], content=chunk["content"], summary=summary, tags=chunk["tags"], ae_url=args.ae_url, ae_key=args.ae_key, ae_account=args.ae_account, ) state["imported"][key] = { "entry_id": entry_id, "imported_at": datetime.now(timezone.utc).isoformat(), "path": chunk["path"], "title": chunk["title"], } _save_state(state_file, state) time.sleep(_AE_DELAY) created += 1 except Exception as e: print(f" AE API error: {e}") errors += 1 else: created += 1 if args.limit and created >= args.limit: print(f"\nReached --limit {args.limit}. Stopping.") _print_summary(created, skipped, errors, total_chunks, args.dry_run) return _print_summary(created, skipped, errors, total_chunks, args.dry_run) def _print_summary(created: int, skipped: int, errors: int, total: int, dry_run: bool) -> None: label = "Would create" if dry_run else "Created" print(f"\n{'=' * 50}") print(f"{label}: {created} entries") print(f"Skipped (already imported): {skipped}") print(f"Errors: {errors}") print(f"Total chunks processed: {total}") if __name__ == "__main__": main()