import asyncio
import logging

from config import settings

logger = logging.getLogger(__name__)

# Maps a registry model "type" to the backend name understood by _dispatch().
_TYPE_TO_BACKEND = {
    "local_openai": "local",
    "anthropic_api": "anthropic_api",
}

# Backend to try when the primary fails. Every entry is None, so no automatic
# fallback is configured at the moment.
_FALLBACK: dict[str, str | None] = {
    "local": None,
    "anthropic_api": None,
}

# The event loop keeps only weak references to tasks, so a fire-and-forget
# task may be garbage-collected before it finishes. Holding the tasks here
# until they complete prevents usage records from being dropped.
_background_tasks: set = set()


def _record_usage(backend: str, model_name: str, prompt_tokens, completion_tokens) -> None:
    """Schedule usage_tracker.record() in the background for the current user.

    The task is kept in ``_background_tasks`` until it is done so it cannot
    be collected mid-execution. Must be called from a running event loop.
    """
    # Imported lazily, as everywhere else in this module.
    import usage_tracker
    from persona import _user

    task = asyncio.create_task(usage_tracker.record(
        username=_user.get(),
        backend=backend,
        model_name=model_name,
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
    ))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


async def complete(
    system_prompt: str,
    messages: list[dict],
    model: str | None = None,
    role: str = "chat",
    slot: str | None = None,
    max_tokens: int = 2048,
    attachment: dict | None = None,
    token_sink=None,
) -> tuple[str, str]:
    """
    Returns (response_text, actual_backend_used).

    slot:  explicit role slot ("primary" | "backup_1" | "backup_2").
           Tried first; if the registry has no model for that slot, resolution
           falls through to the role-based lookup below.
    role:  registry role used for auto routing (default: "chat").
    model: ignored — kept for API compatibility; routing is via slot/role only.
    max_tokens: accepted for API compatibility; not forwarded to any backend
           by this function.
    attachment: optional image dict, forwarded to the local backend only.
    token_sink: optional async callable; when given, the streaming variant of
           the backend is used and each text chunk is awaited through it.

    Raises RuntimeError when no model is configured for the role, and
    re-raises whatever the backend raised when the call fails.
    """
    import model_registry as _reg
    from persona import _user

    username = _user.get()

    resolved_cfg: dict | None = None
    if slot is not None:
        resolved_cfg = _reg.get_model_for_slot(username, role, slot)
    if not resolved_cfg:
        # No slot requested, or the slot is empty: use role-based routing.
        resolved_cfg = _reg.get_model_for_role(username, role)
    if not resolved_cfg:
        raise RuntimeError(
            f"No model configured for role '{role}'. "
            "Add one at /settings/models."
        )

    primary = _TYPE_TO_BACKEND.get(resolved_cfg["type"], "local")
    fallback = _FALLBACK.get(primary)

    try:
        response = await _dispatch(
            primary, system_prompt, messages, resolved_cfg,
            attachment=attachment, token_sink=token_sink,
        )
        return response, primary
    except Exception as e:
        # resolved_cfg is always set by this point (we raised above otherwise),
        # so an explicitly configured model never falls back; the fallback
        # code below is kept for when unconfigured routing is reintroduced.
        if resolved_cfg is not None:
            logger.error("%s failed (no fallback — model explicitly configured): %s", primary, e)
            raise
        if not fallback:
            logger.error("%s failed (no fallback configured): %s", primary, e)
            raise
        logger.warning("%s failed (%s) — falling back to %s", primary, e, fallback)
        response = await _dispatch(fallback, system_prompt, messages, None, token_sink=token_sink)
        return response, fallback


async def _dispatch(
    backend: str,
    system_prompt: str,
    messages: list[dict],
    model_cfg: dict | None,
    attachment: dict | None = None,
    token_sink=None,
) -> str:
    """Route one completion call to the named backend.

    With a token_sink the streaming implementation is used, otherwise the
    one-shot implementation. The attachment is passed to the local backend in
    both modes; the Anthropic backends do not take one.

    Raises RuntimeError for an unknown backend name.
    """
    if backend == "local":
        if token_sink:
            return await _local_streaming(
                token_sink, system_prompt, messages, model_cfg, attachment=attachment
            )
        return await _local(system_prompt, messages, model_cfg, attachment=attachment)

    if backend == "anthropic_api":
        if token_sink:
            return await _anthropic_api_streaming(token_sink, system_prompt, messages, model_cfg)
        return await _anthropic_api(system_prompt, messages, model_cfg)

    raise RuntimeError(f"Unknown backend '{backend}' — check model type in registry")


def _build_local_messages(
    system_prompt: str,
    messages: list[dict],
    attachment: dict | None = None,
) -> list[dict]:
    """Build the OpenAI-style message list for the local backend.

    Only "role" and "content" are copied from each message, which strips any
    non-standard metadata fields. When an attachment is given and the last
    message is a user message, that message becomes a two-part content list
    (text + image_url) with attachment["data"] used as the image URL.
    """
    msgs: list[dict] = []
    if system_prompt:
        msgs.append({"role": "system", "content": system_prompt})

    last_index = len(messages) - 1
    for i, m in enumerate(messages):
        if attachment and i == last_index and m["role"] == "user":
            msgs.append({
                "role": "user",
                "content": [
                    {"type": "text", "text": m["content"]},
                    {"type": "image_url", "image_url": {"url": attachment["data"]}},
                ],
            })
        else:
            msgs.append({"role": m["role"], "content": m["content"]})
    return msgs


def _local_chat_url(api_url: str, host_type: str) -> str:
    """Return the chat-completions URL for a local host.

    "openai" uses the standard OpenAI layout; anything else (default
    "openwebui") uses the Open WebUI/Ollama path layout.
    """
    chat_path = "/chat/completions" if host_type == "openai" else "/api/chat/completions"
    return api_url.rstrip("/") + chat_path


def _local_headers(api_key: str | None) -> dict[str, str]:
    """Return request headers; a bearer token is added only when a key is set."""
    return {"Authorization": f"Bearer {api_key}"} if api_key else {}


async def _local(
    system_prompt: str,
    messages: list[dict],
    model_cfg: dict | None = None,
    attachment: dict | None = None,
) -> str:
    """OpenAI-compatible backend — Open WebUI / Ollama.

    model_cfg is pre-resolved by complete() via model_registry.
    Falls back to registry lookup if not provided.
    attachment: optional image dict {filename, mime_type, data} for vision calls.

    Returns the stripped response text. Raises RuntimeError when no model /
    URL is configured or the model returns an empty response; HTTP errors
    propagate from httpx.
    """
    import httpx

    cfg = model_cfg
    if not cfg:
        # Fallback: resolve directly from registry
        import model_registry as _reg
        from persona import _user
        cfg = _reg.get_best_local_model(_user.get())
        if not cfg:
            raise RuntimeError("No local model configured — add one at /settings/models")

    # .get() so an incomplete config yields the friendly errors below rather
    # than a bare KeyError.
    api_url = cfg.get("api_url")
    api_key = cfg.get("api_key")
    model = cfg.get("model_name")

    if not api_url:
        raise RuntimeError("local_api_url not configured — set LOCAL_API_URL in .env or add a host at /settings/models")
    if not model:
        raise RuntimeError("local_model not configured — add a model at /settings/models")

    host_type = cfg.get("host_type", "openwebui")
    logger.info("local backend (%s): %s @ %s", host_type, model, api_url)

    url = _local_chat_url(api_url, host_type)
    payload = {
        "model": model,
        "messages": _build_local_messages(system_prompt, messages, attachment),
    }

    async with httpx.AsyncClient(timeout=settings.timeout_local) as client:
        resp = await client.post(url, json=payload, headers=_local_headers(api_key))
        resp.raise_for_status()
        data = resp.json()

    text = data["choices"][0]["message"]["content"]
    if not text or not text.strip():
        raise RuntimeError("Local model returned an empty response")

    # Usage is optional in the response; record it only when token counts exist.
    usage = data.get("usage") or {}
    if usage.get("prompt_tokens") is not None:
        _record_usage(
            "local",
            model,
            usage.get("prompt_tokens", 0),
            usage.get("completion_tokens", 0),
        )

    return text.strip()


def _anthropic_request(
    system_prompt: str,
    messages: list[dict],
    model_cfg: dict | None,
) -> "tuple[anthropic.AsyncAnthropic, dict, str]":
    """Prepare an Anthropic call: returns (client, request kwargs, model name).

    The model name comes from model_cfg["model_name"], falling back to
    settings.default_model. Raises RuntimeError when the anthropic SDK is not
    installed or no API key is configured.
    """
    try:
        import anthropic
    except ImportError as err:
        raise RuntimeError("anthropic SDK not installed — run: pip install 'anthropic>=0.40.0'") from err

    cfg = model_cfg or {}
    api_key = cfg.get("api_key", "")
    model_name = cfg.get("model_name") or settings.default_model

    if not api_key:
        raise RuntimeError("No Anthropic API key — add one at /settings/models")

    client = anthropic.AsyncAnthropic(api_key=api_key)
    kwargs: dict = {
        "model": model_name,
        "max_tokens": 4096,
        # Copy only role/content so extra metadata fields are not sent.
        "messages": [{"role": m["role"], "content": m["content"]} for m in messages],
    }
    if system_prompt:
        kwargs["system"] = system_prompt
    return client, kwargs, model_name


async def _anthropic_api(system_prompt: str, messages: list[dict], model_cfg: dict | None) -> str:
    """Direct Anthropic API backend using the anthropic SDK.

    Returns the stripped text of the first content block. Raises RuntimeError
    when the SDK or API key is missing, or when the response text is empty.
    """
    client, kwargs, model_name = _anthropic_request(system_prompt, messages, model_cfg)

    resp = await client.messages.create(**kwargs)
    text = resp.content[0].text if resp.content else ""
    if not text.strip():
        raise RuntimeError("Anthropic API returned an empty response")

    if resp.usage:
        _record_usage(
            "anthropic_api",
            model_name,
            resp.usage.input_tokens,
            resp.usage.output_tokens,
        )

    return text.strip()


async def _anthropic_api_streaming(
    token_sink, system_prompt: str, messages: list[dict], model_cfg: dict | None
) -> str:
    """Streaming Anthropic API backend.

    Each text chunk is awaited through token_sink as it arrives. Returns the
    stripped concatenation of all chunks. Raises RuntimeError when the SDK or
    API key is missing; exceptions raised by token_sink propagate.
    """
    client, kwargs, model_name = _anthropic_request(system_prompt, messages, model_cfg)

    pieces: list[str] = []
    async with client.messages.stream(**kwargs) as stream:
        async for chunk in stream.text_stream:
            await token_sink(chunk)
            pieces.append(chunk)
        final_msg = await stream.get_final_message()

    if final_msg.usage:
        _record_usage(
            "anthropic_api",
            model_name,
            final_msg.usage.input_tokens,
            final_msg.usage.output_tokens,
        )

    return "".join(pieces).strip()


async def _local_streaming(
    token_sink,
    system_prompt: str,
    messages: list[dict],
    model_cfg: dict | None,
    attachment: dict | None = None,
) -> str:
    """Streaming OpenAI-compatible backend — Open WebUI / Ollama.

    Sends the request with "stream": True and reads the server-sent-event
    lines. Each non-empty content delta is awaited through token_sink.
    attachment: optional image dict, injected into the last user message
    exactly as in the non-streaming path.

    Returns the stripped concatenation of all deltas. Raises RuntimeError when
    the URL or model name is missing; HTTP errors propagate from httpx and
    exceptions raised by token_sink propagate to the caller.
    """
    import httpx
    import json as _json

    cfg = model_cfg or {}
    api_url = cfg.get("api_url", "")
    api_key = cfg.get("api_key", "")
    model = cfg.get("model_name", "")
    host_type = cfg.get("host_type", "openwebui")

    if not api_url:
        raise RuntimeError("local_api_url not configured")
    if not model:
        raise RuntimeError("local_model not configured")

    url = _local_chat_url(api_url, host_type)
    payload = {
        "model": model,
        "messages": _build_local_messages(system_prompt, messages, attachment),
        "stream": True,
    }

    pieces: list[str] = []
    async with httpx.AsyncClient(timeout=settings.timeout_local) as client:
        async with client.stream("POST", url, json=payload, headers=_local_headers(api_key)) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                # SSE data lines are "data:" followed by an optional space.
                if not line.startswith("data:"):
                    continue
                data_str = line[5:].strip()
                if data_str == "[DONE]":
                    break

                # Only parsing is best-effort: a malformed or content-less
                # chunk is skipped, but sink failures must not be hidden.
                try:
                    chunk = _json.loads(data_str)
                    delta = chunk["choices"][0]["delta"].get("content") or ""
                except (ValueError, KeyError, IndexError, TypeError, AttributeError) as e:
                    logger.debug("local stream: skipping unparseable chunk (%s)", e)
                    continue

                if delta and isinstance(delta, str):
                    await token_sink(delta)
                    pieces.append(delta)

    return "".join(pieces).strip()