Files
Cortex-Inara/cortex/routers/nextcloud_talk.py
Scott Idem fe854ee534 Add Nextcloud Talk bot integration (Inara)
- New routers/nextcloud_talk.py: webhook handler verifies incoming HMAC,
  calls LLM via BackgroundTasks, posts reply with correctly computed
  signature (random + message_text, not raw body)
- llm_client.py: read Claude OAuth token live from
  ~/.claude/.credentials.json to avoid stale systemd env tokens;
  strip conflicting ANTHROPIC_API_KEY
- config.py: add nextcloud_url, nextcloud_talk_bot_secret,
  nextcloud_talk_timeout settings
- main.py: register nextcloud_talk router, add logging setup
- docs/NEXTCLOUD_TALK_BOT.md: installation guide + HMAC signing gotcha

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-16 23:04:26 -04:00

154 lines
5.6 KiB
Python

import asyncio
import hashlib
import hmac
import json
import logging
import secrets
import httpx
from fastapi import APIRouter, BackgroundTasks, HTTPException, Request, Response
from config import settings
from context_loader import load_context
from llm_client import complete
from session_logger import log_turn
from session_store import load as load_session, save as save_session
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
if not logger.handlers:
_h = logging.StreamHandler()
_h.setFormatter(logging.Formatter("%(levelname)s:%(name)s: %(message)s"))
logger.addHandler(_h)
logger.propagate = False
router = APIRouter()
def _verify_signature(body: bytes, random_header: str, sig_header: str) -> bool:
"""Nextcloud signs requests with HMAC-SHA256(key=secret, msg=random+body)."""
expected = hmac.new(
settings.nextcloud_talk_bot_secret.encode(),
(random_header + body.decode("utf-8", errors="replace")).encode(),
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(expected, sig_header.lower())
async def _send_reply(conversation_token: str, message: str) -> None:
"""Post a message to Nextcloud Talk as the bot."""
url = (
f"{settings.nextcloud_url}/ocs/v2.php/apps/spreed/api/v1"
f"/bot/{conversation_token}/message"
)
# NC Talk verifies HMAC over (random + message_text), NOT the raw body.
# See BotController::getBotFromHeaders → checksumVerificationService::validateRequest($random, $sig, $secret, $message)
body_dict = {"message": message}
body_bytes = json.dumps(body_dict, ensure_ascii=False).encode("utf-8")
random_str = secrets.token_hex(32)
sig = hmac.new(
settings.nextcloud_talk_bot_secret.encode(),
(random_str + message).encode("utf-8"),
hashlib.sha256,
).hexdigest()
logger.info("NCT _send_reply → %s (body: %s)", url, body_bytes.decode())
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
url,
content=body_bytes,
headers={
"Content-Type": "application/json",
"OCS-APIRequest": "true",
"X-Nextcloud-Talk-Bot-Random": random_str,
"X-Nextcloud-Talk-Bot-Signature": sig,
},
timeout=15,
)
logger.info("NCT reply: %s%s", resp.status_code, resp.text[:400])
except Exception as e:
logger.error("NCT reply error: %s", e)
async def _process_message(conversation_token: str, user_text: str, actor_name: str) -> None:
logger.info("NCT process: token=%s user=%s text=%r", conversation_token, actor_name, user_text)
session_id = f"nct_{conversation_token}"
system_prompt = load_context(settings.default_tier)
history = load_session(session_id)
history.append({"role": "user", "content": user_text})
try:
response_text, backend = await asyncio.wait_for(
complete(system_prompt=system_prompt, messages=history),
timeout=settings.nextcloud_talk_timeout,
)
except asyncio.TimeoutError:
logger.warning("NCT timeout for %s", conversation_token)
await _send_reply(conversation_token, "⏳ Still thinking — this is taking longer than usual.")
return
except Exception as e:
logger.error("NCT LLM error for %s: %s", conversation_token, e)
await _send_reply(conversation_token, "⚠️ Something went wrong on my end.")
return
logger.info("NCT LLM responded via %s (%d chars)", backend, len(response_text))
history.append({"role": "assistant", "content": response_text})
save_session(session_id, history)
log_turn(session_id, user_text, response_text)
await _send_reply(conversation_token, response_text)
@router.post("/inara-nextcloud-talk-webhook")
async def nextcloud_talk_webhook(request: Request, background_tasks: BackgroundTasks):
body = await request.body()
if not settings.nextcloud_talk_bot_secret:
logger.error("nextcloud_talk_bot_secret not configured")
return Response(status_code=500)
random_header = request.headers.get("X-Nextcloud-Talk-Random", "")
sig_header = request.headers.get("X-Nextcloud-Talk-Signature", "")
if not _verify_signature(body, random_header, sig_header):
logger.warning("NCT webhook: signature mismatch")
raise HTTPException(status_code=401, detail="Invalid signature")
try:
payload = json.loads(body)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid JSON")
if payload.get("type") != "Create":
return Response(status_code=200)
obj = payload.get("object", {})
if obj.get("type") != "Note":
return Response(status_code=200)
actor = payload.get("actor", {})
target = payload.get("target", {})
if actor.get("type") == "bots":
return Response(status_code=200)
conversation_token = target.get("id", "")
try:
content = json.loads(obj.get("content", "{}"))
user_text = content.get("message", "").strip()
except (json.JSONDecodeError, AttributeError):
user_text = (obj.get("name") or obj.get("content", "")).strip()
if user_text.lower().startswith("@inara"):
user_text = user_text[6:].strip()
if not user_text:
return Response(status_code=200)
actor_name = actor.get("name", "User")
logger.info("NCT message from %s in %s: %r", actor_name, conversation_token, user_text[:60])
background_tasks.add_task(_process_message, conversation_token, user_text, actor_name)
return Response(status_code=200)