""" Per-user unified model registry — V2. Stored in: home/{user}/model_registry.json V2 Schema: { "version": 2, # Per-provider accounts / credentials (user-configured) "providers": { "anthropic": { "credentials": [ {"id": "cli", "label": "Claude CLI (OAuth)", "type": "cli"} ] }, "google": { "accounts": [ {"id": "", "label": "My Google account", "api_key": "AIza..."} ] } }, # Local OpenAI-compatible hosts (unchanged from V1) "hosts": [{"id", "label", "api_url", "api_key", "host_type"}, ...], # User-registered model entries (all providers) "models": [ { "id": str, # unique within this registry "type": str, # see TYPES below "label": str, # human-readable "model_name": str, # identifier sent to the API / CLI "provider": str | null, # "anthropic" | "google" | "local" | null "host_id": str | null, # local_openai only — references hosts[].id "credential_id":str | null, # claude_cli only — references providers.anthropic.credentials "account_id": str | null, # gemini_api only — references providers.google.accounts "context_k": int, # context window in k tokens (informational) "max_rounds": int | null, # per-model tool-loop cap; null = use orchestrator_max_rounds global "tags": [str], # user-defined capability tags }, ], # Role assignments — any model (any provider) can fill any role "roles": { "": { "primary": "" | null, "backup_1": "" | null, ... "backup_4": "" | null, }, }, } Types: "claude_cli" — Claude CLI subprocess (~/.claude/.credentials.json) "gemini_cli" — Gemini CLI subprocess "gemini_api" — Gemini API (google-genai SDK); account_id → api_key from providers.google "local_openai" — OpenAI-compatible endpoint; host_id → api_url/api_key from hosts[] "anthropic_api" — Anthropic SDK direct; credential_id → api_key from providers.anthropic.credentials Built-in model IDs (always resolvable without a registry entry): "claude_cli" — resolves to the default Claude CLI model "gemini_cli" — resolves to Gemini CLI "gemini_api" — resolves to Gemini API using GEMINI_API_KEY from .env Role resolution for get_model_for_role(username, role): 1. User registry: roles[role].primary → backup_1 → ... → backup_4 2. .env default: ROLE_= 3. Hardcoded last-resort defaults per role 4. claude_cli (absolute fallback) """ import json import logging import secrets from pathlib import Path from config import settings logger = logging.getLogger(__name__) # ── Provider model catalogs ─────────────────────────────────────────────────── # Server-side defaults. Update here when providers release new models. # Users can add entries via the settings UI (Phase 2). ANTHROPIC_CATALOG: list[dict] = [ # Latest {"id": "claude-opus-4-7", "label": "Claude Opus 4.7", "context_k": 1000}, {"id": "claude-sonnet-4-6", "label": "Claude Sonnet 4.6", "context_k": 1000}, {"id": "claude-haiku-4-5-20251001", "label": "Claude Haiku 4.5", "context_k": 200}, # Previous versions (still available, not deprecated) {"id": "claude-opus-4-6", "label": "Claude Opus 4.6", "context_k": 1000}, {"id": "claude-sonnet-4-5", "label": "Claude Sonnet 4.5", "context_k": 200}, ] GOOGLE_CATALOG: list[dict] = [ # Stable / generally available {"id": "gemini-2.5-pro", "label": "Gemini 2.5 Pro", "context_k": 1000}, {"id": "gemini-2.5-flash", "label": "Gemini 2.5 Flash", "context_k": 1000}, {"id": "gemini-2.5-flash-lite", "label": "Gemini 2.5 Flash-Lite", "context_k": 1000}, # Preview {"id": "gemini-3.1-pro-preview", "label": "Gemini 3.1 Pro (preview)", "context_k": 1000}, {"id": "gemini-3-flash-preview", "label": "Gemini 3 Flash (preview)", "context_k": 1000}, {"id": "gemini-3.1-flash-lite-preview", "label": "Gemini 3.1 Flash-Lite (preview)", "context_k": 1000}, ] # Known OpenAI-compatible cloud inference services. # All use host_type "openai" (/chat/completions + /models paths). CLOUD_API_CATALOG: list[dict] = [ {"id": "openrouter", "label": "OpenRouter", "api_url": "https://openrouter.ai/api/v1"}, {"id": "openai", "label": "OpenAI", "api_url": "https://api.openai.com/v1"}, {"id": "groq", "label": "Groq", "api_url": "https://api.groq.com/openai/v1"}, {"id": "xai", "label": "X.ai / Grok", "api_url": "https://api.x.ai/v1"}, {"id": "together", "label": "Together.ai", "api_url": "https://api.together.xyz/v1"}, {"id": "fireworks", "label": "Fireworks.ai", "api_url": "https://api.fireworks.ai/inference/v1"}, {"id": "custom", "label": "Custom", "api_url": ""}, ] # ── Built-in model definitions ──────────────────────────────────────────────── def _builtins() -> dict[str, dict]: return { "claude_cli": { "id": "claude_cli", "type": "claude_cli", "label": f"Claude (CLI) — {settings.default_model}", "model_name": settings.default_model, "context_k": 200, "tags": ["chat", "persona", "creative"], }, "gemini_cli": { "id": "gemini_cli", "type": "gemini_cli", "label": "Gemini (CLI)", "model_name": "", "context_k": 1000, "tags": ["chat", "research", "long_context"], }, "gemini_api": { "id": "gemini_api", "type": "gemini_api", "label": f"Gemini API — {settings.orchestrator_model}", "model_name": settings.orchestrator_model, "context_k": 1000, "tags": ["orchestrator", "research", "long_context", "tools"], }, } _ROLE_LAST_RESORT: dict[str, str] = { "chat": "claude_cli", "orchestrator": "gemini_api", "distill": "claude_cli", "coder": "claude_cli", "research": "gemini_api", } PRIORITY_KEYS = ["primary", "backup_1", "backup_2", "backup_3", "backup_4"] REQUIRED_ROLES: list[str] = ["chat", "orchestrator", "distill"] # ── Storage ─────────────────────────────────────────────────────────────────── def _registry_path(username: str) -> Path: return settings.home_root() / username / "model_registry.json" def _local_llm_path(username: str) -> Path: return settings.home_root() / username / "local_llm.json" def _auth_path(username: str) -> Path: return settings.home_root() / username / "auth.json" def _empty() -> dict: return { "version": 2, "providers": _default_providers(), "hosts": [], "models": [], "roles": {}, } def _default_providers() -> dict: return { "anthropic": { "credentials": [ {"id": "cli", "label": "Claude CLI (OAuth)", "type": "cli"} ] }, "google": { "accounts": [] }, } def _normalize(data: dict) -> dict: """Back-fill missing fields introduced by schema additions.""" for h in data.get("hosts", []): h.setdefault("host_type", "openwebui") h.setdefault("max_concurrent", 3) data.setdefault("providers", _default_providers()) data["providers"].setdefault("anthropic", {"credentials": [{"id": "cli", "label": "Claude CLI (OAuth)", "type": "cli"}]}) data["providers"].setdefault("google", {"accounts": []}) return data def _load(username: str) -> dict: path = _registry_path(username) if path.exists(): try: data = json.loads(path.read_text()) if isinstance(data, dict) and "version" in data: if data["version"] == 1: data = _migrate_v1_to_v2(username, data) _save(username, data) return _normalize(data) except (json.JSONDecodeError, OSError): logger.warning("model_registry.json for %s is unreadable — starting fresh", username) return _empty() # No registry — try migrating from local_llm.json legacy = _local_llm_path(username) if legacy.exists(): data = _migrate_from_local_llm(username, legacy) _save(username, data) logger.info("Migrated local_llm.json → model_registry.json for %s", username) return data return _empty() def _save(username: str, data: dict) -> None: _registry_path(username).write_text(json.dumps(data, indent=2)) # ── Migration ───────────────────────────────────────────────────────────────── def _migrate_v1_to_v2(username: str, data: dict) -> dict: """ Upgrade a V1 registry to V2. Changes: - Adds providers section with default structure - Migrates gemini_api_key from auth.json → first Google account entry - Sets version to 2 """ logger.info("Migrating model_registry.json V1 → V2 for %s", username) data["version"] = 2 if "providers" not in data: data["providers"] = _default_providers() else: data["providers"].setdefault("anthropic", {"credentials": [{"id": "cli", "label": "Claude CLI (OAuth)", "type": "cli"}]}) data["providers"].setdefault("google", {"accounts": []}) # Pull existing Gemini key from auth.json (stored there in V1) → first account entry accounts = data["providers"]["google"]["accounts"] if not accounts: try: auth = json.loads(_auth_path(username).read_text()) existing_key = auth.get("gemini_api_key") if existing_key: accounts.append({ "id": secrets.token_hex(4), "label": "Gemini API Key", "api_key": existing_key, }) logger.info("Migrated gemini_api_key from auth.json → providers.google.accounts for %s", username) except (OSError, json.JSONDecodeError): pass return data def _migrate_from_local_llm(username: str, path: Path) -> dict: """Convert local_llm.json → V2 model_registry format.""" try: old = json.loads(path.read_text()) except Exception: return _empty() data = _empty() # Handle v0 flat format if "hosts" not in old: api_url = old.get("api_url") or settings.local_api_url api_key = old.get("api_key") or settings.local_api_key model_name = old.get("model") or settings.local_model if not api_url: return data host_id = secrets.token_hex(4) old = { "hosts": [{"id": host_id, "label": "Local Model Server", "api_url": api_url, "api_key": api_key}], "models": [{"id": secrets.token_hex(4), "host_id": host_id, "label": model_name, "model_name": model_name}] if model_name else [], "active_model_id": None, } if old["models"]: old["active_model_id"] = old["models"][0]["id"] data["hosts"] = old.get("hosts", []) for m in old.get("models", []): data["models"].append({ "id": m["id"], "type": "local_openai", "label": m.get("label") or m.get("model_name", ""), "model_name": m.get("model_name", ""), "provider": "local", "host_id": m.get("host_id"), "context_k": 0, "tags": [], }) active_id = old.get("active_model_id") if active_id and any(m["id"] == active_id for m in data["models"]): data["roles"]["chat"] = {"primary": active_id} if settings.distill_backend_mid == "local": data["roles"]["distill"] = {"primary": active_id} # Migrate Gemini key from auth.json data = _migrate_v1_to_v2(username, {"version": 1, **data}) return data # ── Model resolution ────────────────────────────────────────────────────────── def _resolve_model(registry: dict, model_id: str) -> dict | None: """Resolve a model_id to its full config dict (credentials merged in), or None.""" builtins = _builtins() # Built-in IDs take priority over user-defined entries with the same ID if model_id in builtins: return dict(builtins[model_id]) model = next((m for m in registry.get("models", []) if m["id"] == model_id), None) if not model: return None model_type = model.get("type") if model_type == "local_openai": host_id = model.get("host_id") host = next((h for h in registry.get("hosts", []) if h["id"] == host_id), None) if not host: logger.warning("model %s references missing host_id %s", model_id, host_id) return None return { **model, "api_url": host.get("api_url", ""), "api_key": host.get("api_key", ""), "host_type": host.get("host_type", "openwebui"), } if model_type == "gemini_api": account_id = model.get("account_id") if account_id: accounts = registry.get("providers", {}).get("google", {}).get("accounts", []) account = next((a for a in accounts if a["id"] == account_id), None) if account: return {**model, "api_key": account.get("api_key", "")} logger.warning("model %s references missing account_id %s", model_id, account_id) return dict(model) if model_type == "anthropic_api": credential_id = model.get("credential_id") if credential_id: creds = registry.get("providers", {}).get("anthropic", {}).get("credentials", []) cred = next((c for c in creds if c["id"] == credential_id), None) if cred and cred.get("api_key"): return {**model, "api_key": cred["api_key"]} logger.warning("model %s references missing/keyless credential_id %s", model_id, credential_id) return dict(model) if model_type == "claude_cli": return dict(model) return dict(model) def get_model_for_role(username: str, role: str) -> dict | None: """ Return the resolved model config for the given role. Resolution order: 1. User registry: roles[role].primary → backup_1 → ... → backup_4 2. .env: ROLE_ = builtin model ID 3. Hardcoded last-resort default per role 4. claude_cli (absolute fallback) """ registry = _load(username) role_cfg = registry.get("roles", {}).get(role, {}) for key in PRIORITY_KEYS: model_id = role_cfg.get(key) if not model_id: continue resolved = _resolve_model(registry, model_id) if resolved: return resolved logger.debug("role %s.%s = %s but model not found", role, key, model_id) # .env default env_type = settings.get_role_default(role) builtins = _builtins() if env_type and env_type in builtins: return dict(builtins[env_type]) # Hardcoded last resort fallback_id = _ROLE_LAST_RESORT.get(role, "claude_cli") return dict(builtins.get(fallback_id, builtins["claude_cli"])) def get_best_local_model(username: str, role: str = "chat") -> dict | None: """ Return the best available local_openai model for the given role. Used when the user explicitly selects "local" backend in the UI. """ registry = _load(username) role_cfg = registry.get("roles", {}).get(role, {}) for key in PRIORITY_KEYS: model_id = role_cfg.get(key) if not model_id: continue resolved = _resolve_model(registry, model_id) if resolved and resolved.get("type") == "local_openai": return resolved for model in registry.get("models", []): if model.get("type") == "local_openai": resolved = _resolve_model(registry, model["id"]) if resolved: return resolved return None def set_role_config( username: str, role: str, system_append: str, tools: list[str] | None, inject_datetime: bool = True, inject_mode: bool = True, ) -> None: """Save system_append, tools allow-list, and per-injection flags for a role. tools=None clears the allow-list (role uses all accessible tools). inject_datetime=False suppresses the date/time header for pure processing roles. inject_mode=False suppresses the session mode (OTR) line for pure processing roles. """ data = _load(username) roles = data.setdefault("roles", {}) if role not in roles: roles[role] = {} roles[role]["system_append"] = system_append.strip() roles[role]["inject_datetime"] = inject_datetime roles[role]["inject_mode"] = inject_mode if tools is None: roles[role].pop("tools", None) else: roles[role]["tools"] = [t for t in tools if t] _save(username, data) def get_role_config(username: str, role: str) -> dict: """ Return supplemental config for a role: system_append, tools, and injection flags. All keys are optional in the registry — missing means "use defaults": system_append: str — appended to the system prompt for this role tools: list[str] | None — explicit tool allow-list (None = no restriction) inject_datetime: bool — whether to inject current date/time (default True) inject_mode: bool — whether to inject session mode (OTR) line (default True) """ registry = _load(username) role_cfg = registry.get("roles", {}).get(role, {}) return { "system_append": role_cfg.get("system_append", ""), "tools": role_cfg.get("tools") or None, "inject_datetime": role_cfg.get("inject_datetime", True), "inject_mode": role_cfg.get("inject_mode", True), } def get_model_for_slot(username: str, role: str, slot: str) -> dict | None: """ Resolve a single named priority slot from a role without walking the fallback chain. Used by Phase 3 explicit slot selection — the user has pinned a specific model; don't silently redirect to another slot if this one is empty or broken. Returns None if the slot is unset or the model can't be resolved. """ if slot not in PRIORITY_KEYS: return None registry = _load(username) model_id = registry.get("roles", {}).get(role, {}).get(slot) if not model_id: return None return _resolve_model(registry, model_id) def get_google_api_key(username: str, account_id: str | None = None) -> str | None: """ Return the best available Gemini API key for the user. If account_id is specified, returns that account's key (or None if not found). Otherwise returns the first configured account key, falling back to the server-level GEMINI_API_KEY from .env. """ registry = _load(username) accounts = registry.get("providers", {}).get("google", {}).get("accounts", []) if account_id: account = next((a for a in accounts if a["id"] == account_id), None) return account.get("api_key") if account else None # First configured account if accounts: return accounts[0].get("api_key") or None # Fall back to .env server key return settings.gemini_api_key or None # ── Read API ────────────────────────────────────────────────────────────────── def get_registry(username: str) -> dict: """Return the full registry (providers + hosts + models + roles).""" return _load(username) def get_all_models(username: str) -> list[dict]: """Return all user-defined models (resolved — credentials/hosts merged in).""" registry = _load(username) out = [] for m in registry.get("models", []): resolved = _resolve_model(registry, m["id"]) if resolved: out.append(resolved) return out def get_defined_roles(username: str) -> dict[str, dict]: """Return the roles section, filling gaps with empty dicts.""" registry = _load(username) roles = registry.get("roles", {}) return {role: roles.get(role, {}) for role in settings.get_defined_roles()} def get_google_accounts(username: str) -> list[dict]: """Return Google account entries (api_key masked for display).""" registry = _load(username) accounts = registry.get("providers", {}).get("google", {}).get("accounts", []) return [ { "id": a["id"], "label": a.get("label", ""), "hint": (a.get("api_key") or "")[:8] + "…" if a.get("api_key") else "", } for a in accounts ] def get_catalog(provider: str, username: str | None = None) -> list[dict]: """ Return the model catalog for a provider. For now returns server defaults. Phase 2 will merge in per-user additions. """ if provider == "anthropic": return list(ANTHROPIC_CATALOG) if provider == "google": return list(GOOGLE_CATALOG) if provider == "cloud": return list(CLOUD_API_CATALOG) return [] # ── Write API — Google accounts ─────────────────────────────────────────────── def save_google_account(username: str, account_id: str | None, label: str, api_key: str) -> str: """Create or update a Google account entry. Returns the account ID.""" data = _load(username) accounts = data["providers"]["google"]["accounts"] if account_id: for a in accounts: if a["id"] == account_id: a["label"] = label.strip() if api_key.strip(): a["api_key"] = api_key.strip() _save(username, data) return account_id account_id = secrets.token_hex(4) accounts.append({ "id": account_id, "label": label.strip(), "api_key": api_key.strip(), }) _save(username, data) return account_id def remove_google_account(username: str, account_id: str) -> bool: """Remove a Google account. Clears any model entries that reference it.""" data = _load(username) accounts = data["providers"]["google"]["accounts"] before = len(accounts) data["providers"]["google"]["accounts"] = [a for a in accounts if a["id"] != account_id] # Clear role assignments for models that referenced this account removed_model_ids = { m["id"] for m in data.get("models", []) if m.get("account_id") == account_id } data["models"] = [m for m in data.get("models", []) if m["id"] not in removed_model_ids] for role_cfg in data.get("roles", {}).values(): for key in PRIORITY_KEYS: if role_cfg.get(key) in removed_model_ids: role_cfg[key] = None _save(username, data) return len(data["providers"]["google"]["accounts"]) < before # ── Write API — Anthropic API keys ─────────────────────────────────────────── def get_anthropic_api_keys(username: str) -> list[dict]: """Return Anthropic API key credentials (type='api_key') with key masked for display.""" registry = _load(username) creds = registry.get("providers", {}).get("anthropic", {}).get("credentials", []) return [ { "id": c["id"], "label": c.get("label", ""), "hint": (c.get("api_key") or "")[:8] + "…" if c.get("api_key") else "no key", } for c in creds if c.get("type") == "api_key" ] def save_anthropic_api_key(username: str, key_id: str | None, label: str, api_key: str) -> str: """Create or update an Anthropic API key credential. Returns the credential ID.""" data = _load(username) creds = data["providers"]["anthropic"]["credentials"] if key_id: for c in creds: if c["id"] == key_id and c.get("type") == "api_key": c["label"] = label.strip() or c.get("label", "API Key") if api_key.strip(): c["api_key"] = api_key.strip() _save(username, data) return key_id key_id = secrets.token_hex(4) creds.append({ "id": key_id, "label": label.strip() or "API Key", "type": "api_key", "api_key": api_key.strip(), }) _save(username, data) return key_id def remove_anthropic_api_key(username: str, key_id: str) -> bool: """Remove an Anthropic API key credential. Clears model entries that reference it.""" data = _load(username) creds = data["providers"]["anthropic"]["credentials"] before = len(creds) data["providers"]["anthropic"]["credentials"] = [ c for c in creds if c["id"] != key_id ] removed_model_ids = { m["id"] for m in data.get("models", []) if m.get("credential_id") == key_id } data["models"] = [m for m in data.get("models", []) if m["id"] not in removed_model_ids] for role_cfg in data.get("roles", {}).values(): for key in PRIORITY_KEYS: if role_cfg.get(key) in removed_model_ids: role_cfg[key] = None _save(username, data) return len(data["providers"]["anthropic"]["credentials"]) < before # ── Write API — Hosts ───────────────────────────────────────────────────────── def save_host(username: str, host_id: str | None, label: str, api_url: str, api_key: str, host_type: str = "openwebui", max_concurrent: int = 3) -> str: """Create or update a host. Returns the host ID.""" data = _load(username) host_type = host_type if host_type in ("openwebui", "openai") else "openwebui" max_concurrent = max(1, min(int(max_concurrent), 20)) if host_id: for h in data["hosts"]: if h["id"] == host_id: h["label"] = label.strip() h["api_url"] = api_url.strip() h["host_type"] = host_type h["max_concurrent"] = max_concurrent if api_key.strip(): h["api_key"] = api_key.strip() _save(username, data) return host_id host_id = None host_id = secrets.token_hex(4) data["hosts"].append({ "id": host_id, "label": label.strip(), "api_url": api_url.strip(), "api_key": api_key.strip(), "host_type": host_type, "max_concurrent": max_concurrent, }) _save(username, data) return host_id def remove_host(username: str, host_id: str) -> bool: """Remove a host and all models that reference it.""" data = _load(username) before = len(data["hosts"]) removed_model_ids = {m["id"] for m in data["models"] if m.get("host_id") == host_id} data["hosts"] = [h for h in data["hosts"] if h["id"] != host_id] data["models"] = [m for m in data["models"] if m.get("host_id") != host_id] for role_cfg in data.get("roles", {}).values(): for key in PRIORITY_KEYS: if role_cfg.get(key) in removed_model_ids: role_cfg[key] = None _save(username, data) return len(data["hosts"]) < before # ── Write API — Models ──────────────────────────────────────────────────────── def save_model(username: str, model_id: str | None, host_id: str, label: str, model_name: str, context_k: int = 0, tags: list[str] | None = None, max_rounds: int | None = None, tools: bool = True, reasoning_budget_tokens: int | None = None) -> str: """Create or update a local_openai model entry. Returns the model ID.""" data = _load(username) tags = tags or [] if model_id: for m in data["models"]: if m["id"] == model_id: m["host_id"] = host_id m["label"] = label.strip() or model_name.strip() m["model_name"] = model_name.strip() m["context_k"] = context_k m["max_rounds"] = max_rounds m["tools"] = tools m["tags"] = tags m["reasoning_budget_tokens"] = reasoning_budget_tokens _save(username, data) return model_id model_id = None model_id = secrets.token_hex(4) data["models"].append({ "id": model_id, "type": "local_openai", "label": label.strip() or model_name.strip(), "model_name": model_name.strip(), "provider": "local", "host_id": host_id, "context_k": context_k, "max_rounds": max_rounds, "tools": tools, "tags": tags, "reasoning_budget_tokens": reasoning_budget_tokens, }) _save(username, data) return model_id def save_cloud_model(username: str, model_id: str | None, provider: str, model_name: str, label: str, account_id: str | None = None, credential_id: str | None = None, context_k: int = 0, tags: list[str] | None = None, max_rounds: int | None = None, tools: bool = True) -> str: """ Create or update an Anthropic or Google model entry. Returns the model ID. provider: "anthropic" | "google" account_id: Google only — references providers.google.accounts[].id credential_id: Anthropic only — "cli" for OAuth CLI, or a hex ID for an API key credential """ data = _load(username) # Determine model type from credential (anthropic only) if provider == "anthropic": creds = data.get("providers", {}).get("anthropic", {}).get("credentials", []) cred = next((c for c in creds if c["id"] == credential_id), None) if credential_id else None entry_type = "anthropic_api" if (cred and cred.get("type") == "api_key") else "claude_cli" elif provider == "google": entry_type = "gemini_api" else: entry_type = "claude_cli" tags = tags or [] entry: dict = { "type": entry_type, "label": label.strip() or model_name.strip(), "model_name": model_name.strip(), "provider": provider, "context_k": context_k, "max_rounds": max_rounds, "tools": tools, "tags": tags, } if account_id: entry["account_id"] = account_id if credential_id: entry["credential_id"] = credential_id if model_id: for m in data["models"]: if m["id"] == model_id: m.update(entry) _save(username, data) return model_id model_id = None model_id = secrets.token_hex(4) entry["id"] = model_id data["models"].append(entry) _save(username, data) return model_id def remove_model(username: str, model_id: str) -> bool: """Remove a model and clear any role assignments pointing to it.""" data = _load(username) before = len(data["models"]) data["models"] = [m for m in data["models"] if m["id"] != model_id] for role_cfg in data.get("roles", {}).values(): for key in PRIORITY_KEYS: if role_cfg.get(key) == model_id: role_cfg[key] = None _save(username, data) return len(data["models"]) < before def get_custom_roles(username: str) -> list[str]: """ Return the user's custom (non-required) roles. Falls back to config-defined roles minus required ones for migration. """ registry = _load(username) if "custom_roles" in registry: return [r for r in registry["custom_roles"] if r and r not in REQUIRED_ROLES] from config import settings as _cfg return [r for r in _cfg.get_defined_roles() if r not in REQUIRED_ROLES] def get_all_roles(username: str) -> list[str]: """Return required roles followed by the user's custom roles.""" return list(REQUIRED_ROLES) + get_custom_roles(username) def add_custom_role(username: str, role_name: str) -> bool: """Add a custom role. Returns False if the name is invalid or already a required role.""" role_name = role_name.strip().lower() if not role_name or role_name in REQUIRED_ROLES: return False data = _load(username) if "custom_roles" not in data: from config import settings as _cfg data["custom_roles"] = [r for r in _cfg.get_defined_roles() if r not in REQUIRED_ROLES] if role_name not in data["custom_roles"]: data["custom_roles"].append(role_name) _save(username, data) return True def remove_custom_role(username: str, role_name: str) -> bool: """Remove a custom role. Required roles cannot be removed.""" if role_name in REQUIRED_ROLES: return False data = _load(username) if "custom_roles" not in data: from config import settings as _cfg data["custom_roles"] = [r for r in _cfg.get_defined_roles() if r not in REQUIRED_ROLES] if role_name in data["custom_roles"]: data["custom_roles"].remove(role_name) _save(username, data) return True def set_role(username: str, role: str, priority: str, model_id: str | None) -> bool: """ Assign a model to a role priority slot. priority must be one of: primary, backup_1, backup_2, backup_3, backup_4 model_id None clears the slot. Built-in IDs (claude_cli, gemini_cli, gemini_api) are always valid. """ if priority not in PRIORITY_KEYS: return False data = _load(username) if model_id and model_id not in _builtins(): if not any(m["id"] == model_id for m in data["models"]): return False roles = data.setdefault("roles", {}) if role not in roles: roles[role] = {} roles[role][priority] = model_id or None _save(username, data) return True # ── Utility ─────────────────────────────────────────────────────────────────── def fetch_models_from_host(api_url: str, api_key: str, host_type: str = "openwebui") -> list[str]: """Synchronously fetch the model list from an OpenAI-compatible host.""" import httpx path = "/api/models" if host_type == "openwebui" else "/models" url = api_url.rstrip("/") + path headers = {"Authorization": f"Bearer {api_key}"} if api_key else {} resp = httpx.get(url, headers=headers, timeout=10) resp.raise_for_status() data = resp.json() models = data.get("data", []) return sorted(m.get("id", m.get("name", "")) for m in models if m.get("id") or m.get("name"))