"""Nextcloud Talk bot webhook: verifies signed events and replies via the LLM backends."""

import asyncio
import hashlib
import hmac
import json
import logging

from fastapi import APIRouter, BackgroundTasks, HTTPException, Request, Response

from auth_utils import get_user_channels, get_user_gemini_key, get_user_role, get_tool_policy, get_risk_policy
from context_loader import load_context
from llm_client import complete
from notification import _send_nct_message
from persona import set_context
from session_logger import log_turn
from session_store import load as load_session, save as save_session
from config import settings
import event_bus
import model_registry
import orchestrator_engine
import openai_orchestrator

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
if not logger.handlers:
    # Attach a dedicated stream handler once and stop propagation so records
    # are not emitted a second time by ancestor loggers.
    _h = logging.StreamHandler()
    _h.setFormatter(logging.Formatter("%(levelname)s:%(name)s: %(message)s"))
    logger.addHandler(_h)
    logger.propagate = False

router = APIRouter()


def _verify_signature(body: bytes, random_header: str, sig_header: str, secret: str) -> bool:
    """Nextcloud signs requests with HMAC-SHA256(key=secret, msg=random+body).

    Args:
        body: Raw request body exactly as received.
        random_header: Value of the ``X-Nextcloud-Talk-Random`` header.
        sig_header: Value of the ``X-Nextcloud-Talk-Signature`` header (hex digest).
        secret: Shared bot secret.

    Returns:
        True when the supplied signature matches the expected digest.
    """
    # Hash the raw body bytes: decoding with errors="replace" and re-encoding
    # would alter any body that is not valid UTF-8 and change the digest.
    expected = hmac.new(
        secret.encode(),
        random_header.encode() + body,
        hashlib.sha256,
    ).hexdigest()
    # Compare as bytes: compare_digest raises TypeError for str arguments that
    # contain non-ASCII characters, and the header is untrusted input.
    supplied = sig_header.lower().encode("utf-8", errors="replace")
    return hmac.compare_digest(expected.encode("ascii"), supplied)


async def _send_reply(conversation_token: str, message: str, nextcloud_url: str, secret: str) -> None:
    """Post a message to Nextcloud Talk as the bot."""
    logger.info("NCT _send_reply → room %s (%d chars)", conversation_token, len(message))
    await _send_nct_message(nextcloud_url, secret, conversation_token, message)


async def _process_message(
    conversation_token: str,
    user_text: str,
    actor_name: str,
    username: str,
    persona_name: str,
    nextcloud_url: str,
    secret: str,
    timeout: int,
    cfg: dict,
) -> None:
    """Generate a reply for one Talk message and post it back to the room.

    Loads the session history, publishes the incoming message on the event
    bus, obtains a response (through an orchestrator when ``cfg["tools"]`` is
    truthy, otherwise through a plain completion), persists the turn, and
    sends the reply. On timeout or any other failure an apology message is
    sent instead and the session is left unchanged.

    Args:
        conversation_token: Talk room token the message came from.
        user_text: Message text with any leading bot mention already removed.
        actor_name: Display name of the sender (used for logging/events).
        username: Local user whose channel configuration is in effect.
        persona_name: Persona passed to ``set_context``.
        nextcloud_url: Base URL used when posting replies.
        secret: Shared bot secret used when posting replies.
        timeout: Seconds allowed for the plain-completion path only; the
            orchestrator path is not wrapped in a timeout here.
        cfg: The user's ``nextcloud`` channel configuration.
    """
    logger.info("NCT process: token=%s user=%s text=%r", conversation_token, actor_name, user_text)
    set_context(username, persona_name)

    tier = cfg.get("tier") or settings.default_tier
    role = cfg.get("role", "chat")
    use_tools = cfg.get("tools", False)

    session_id = f"nct_{username}_{conversation_token}"
    history = load_session(session_id)
    session_msgs = list(history)  # snapshot before we append

    await event_bus.publish({
        "type": "nct_message",
        "session_id": session_id,
        "role": "user",
        "content": user_text,
        "actor": actor_name,
    })

    backend = "unknown"
    try:
        if use_tools:
            # Acknowledge immediately; the orchestrator run has no timeout.
            await _send_reply(conversation_token, "⏳ Working on it…", nextcloud_url, secret)
            role_cfg = model_registry.get_role_config(username, role)
            system_prompt = load_context(
                tier,
                role_append=role_cfg.get("system_append", ""),
                inject_datetime=role_cfg.get("inject_datetime", True),
                inject_mode=role_cfg.get("inject_mode", True),
            )
            orch_model = model_registry.get_model_for_role(username, "orchestrator")
            user_role_val = get_user_role(username)
            tool_list = role_cfg.get("tools")
            policy = get_tool_policy(username)
            c_allow = set(policy.get("allow", []))
            c_deny = set(policy.get("deny", []))
            max_risk, risk_wl, risk_bl = get_risk_policy(username)

            if orch_model and orch_model.get("type") == "local_openai":
                result = await openai_orchestrator.run(
                    task=user_text,
                    system_prompt=system_prompt,
                    session_messages=session_msgs or None,
                    model_cfg=orch_model,
                    user_role=user_role_val,
                    tool_list=tool_list,
                    confirm_allow=c_allow,
                    confirm_deny=c_deny,
                    max_risk=max_risk,
                    risk_whitelist=risk_wl,
                    risk_blacklist=risk_bl,
                )
            else:
                # Prefer a key configured on the model, else the user's own key.
                gemini_key = (
                    (orch_model.get("api_key") if orch_model else None)
                    or get_user_gemini_key(username)
                )
                result = await orchestrator_engine.run(
                    task=user_text,
                    system_prompt=system_prompt,
                    session_messages=session_msgs or None,
                    respond_with_claude=True,
                    gemini_api_key=gemini_key,
                    model_name=orch_model.get("model_name") if orch_model else None,
                    response_role=role,
                    user_role=user_role_val,
                    tool_list=tool_list,
                    confirm_allow=c_allow,
                    confirm_deny=c_deny,
                    max_risk=max_risk,
                    risk_whitelist=risk_wl,
                    risk_blacklist=risk_bl,
                )

            response_text = result.response
            backend = result.backend
            if result.checkpoint:
                response_text += "\n\n_(This action requires confirmation — use the web UI to approve or deny.)_"
        else:
            system_prompt = load_context(tier)
            history_for_llm = list(session_msgs) + [{"role": "user", "content": user_text}]
            response_text, backend = await asyncio.wait_for(
                complete(system_prompt=system_prompt, messages=history_for_llm),
                timeout=timeout,
            )
    except asyncio.TimeoutError:
        logger.warning("NCT timeout for %s", conversation_token)
        await _send_reply(
            conversation_token,
            "⏳ Still thinking — this is taking longer than usual.",
            nextcloud_url,
            secret,
        )
        return
    except Exception as e:
        # Top-level boundary of a background task: record the traceback and
        # tell the user, rather than letting the failure vanish.
        logger.exception("NCT LLM error for %s: %s", conversation_token, e)
        await _send_reply(conversation_token, "⚠️ Something went wrong on my end.", nextcloud_url, secret)
        return

    logger.info("NCT LLM responded via %s (%d chars)", backend, len(response_text))

    history.append({"role": "user", "content": user_text})
    history.append({"role": "assistant", "content": response_text})
    save_session(session_id, history)
    log_turn(session_id, user_text, response_text)

    await event_bus.publish({
        "type": "nct_response",
        "session_id": session_id,
        "role": "assistant",
        "content": response_text,
        "backend": backend,
    })

    await _send_reply(conversation_token, response_text, nextcloud_url, secret)


@router.post("/webhook/nextcloud/{username}")
async def nextcloud_talk_webhook(username: str, request: Request, background_tasks: BackgroundTasks):
    """Receive a Nextcloud Talk bot event and queue a reply for chat messages.

    Responds 200 immediately for accepted or ignored events; the actual reply
    is produced by ``_process_message`` as a background task.

    Raises:
        HTTPException: 404 when the user has no ``nextcloud`` channel config,
            401 when the signature does not verify, 400 when the body is not
            a JSON object.

    Returns:
        A bare ``Response``: 500 when the channel has no ``bot_secret``,
        otherwise 200.
    """
    channels = get_user_channels(username)
    cfg = channels.get("nextcloud")
    if not cfg:
        logger.warning("NCT webhook: no channel config for user %r", username)
        raise HTTPException(status_code=404, detail="Channel not configured for this user")

    persona_name = cfg.get("persona", "inara")
    nextcloud_url = cfg.get("url", "")
    secret = cfg.get("bot_secret", "")
    timeout = cfg.get("timeout", 55)

    if not secret:
        logger.error("NCT webhook: bot_secret missing for user %r", username)
        return Response(status_code=500)

    body = await request.body()
    random_header = request.headers.get("X-Nextcloud-Talk-Random", "")
    sig_header = request.headers.get("X-Nextcloud-Talk-Signature", "")

    if not _verify_signature(body, random_header, sig_header, secret):
        logger.warning("NCT webhook: signature mismatch for %s", username)
        raise HTTPException(status_code=401, detail="Invalid signature")

    try:
        payload = json.loads(body)
    except ValueError:
        # ValueError covers json.JSONDecodeError and the UnicodeDecodeError
        # raised when the body bytes are not valid UTF-8/16/32.
        raise HTTPException(status_code=400, detail="Invalid JSON")
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Invalid JSON")

    # Only "Create" activities carrying a "Note" object are handled.
    if payload.get("type") != "Create":
        return Response(status_code=200)

    obj = payload.get("object", {})
    if obj.get("type") != "Note":
        return Response(status_code=200)

    actor = payload.get("actor", {})
    target = payload.get("target", {})

    # Ignore messages authored by bots.
    if actor.get("type") == "bots":
        return Response(status_code=200)

    conversation_token = target.get("id", "")

    try:
        # "content" is expected to be a JSON-encoded string with a "message" key.
        content = json.loads(obj.get("content", "{}"))
        user_text = content.get("message", "").strip()
    except (ValueError, TypeError, AttributeError):
        # Not JSON, or not the expected shape: fall back to the raw text fields.
        fallback = obj.get("name") or obj.get("content", "")
        user_text = fallback.strip() if isinstance(fallback, str) else ""

    # Drop a leading "@<persona>" mention so the model sees only the request.
    mention_prefix = f"@{persona_name.lower()}"
    if user_text.lower().startswith(mention_prefix):
        user_text = user_text[len(mention_prefix):].strip()

    if not user_text:
        return Response(status_code=200)

    actor_name = actor.get("name", "User")
    logger.info("NCT message from %s in %s: %r", actor_name, conversation_token, user_text[:60])

    background_tasks.add_task(
        _process_message,
        conversation_token,
        user_text,
        actor_name,
        username,
        persona_name,
        nextcloud_url,
        secret,
        timeout,
        cfg,
    )
    return Response(status_code=200)