Files
Cortex-Inara/cortex/routers/nextcloud_talk.py
Scott Idem 3455c7a09c Add SSE real-time Talk activity, file editor UI, and identity file API
- event_bus.py: in-process asyncio pub/sub (one Queue per SSE client)
- nextcloud_talk.py: publishes nct_message/nct_response events to bus
- chat.py: GET /events SSE endpoint streams Talk activity to browser
- routers/files.py: whitelist-protected GET/PUT for Inara identity .md files
- main.py: register files router
- static/index.html: real-time Talk feed, blue badge on Sessions btn,
  Files modal with preview/edit toggle and Ctrl+S save

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 21:10:07 -04:00

172 lines
6.0 KiB
Python

import asyncio
import hashlib
import hmac
import json
import logging
import secrets
import httpx
from fastapi import APIRouter, BackgroundTasks, HTTPException, Request, Response
from config import settings
from context_loader import load_context
from llm_client import complete
from session_logger import log_turn
from session_store import load as load_session, save as save_session
import event_bus
# Module-level logger for this router; records at DEBUG and above are handled.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Attach a dedicated stream handler only when the logger has none yet, so the
# handler is not added a second time if this code runs again.
if not logger.handlers:
    _h = logging.StreamHandler()
    _h.setFormatter(logging.Formatter("%(levelname)s:%(name)s: %(message)s"))
    logger.addHandler(_h)
    # With our own handler attached, stop records from also reaching the
    # handlers of ancestor loggers.
    logger.propagate = False
# Router that the Talk webhook endpoint in this module is registered on.
router = APIRouter()
def _verify_signature(body: bytes, random_header: str, sig_header: str) -> bool:
    """Return True when *sig_header* is a valid signature for *body*.

    Nextcloud signs requests with HMAC-SHA256(key=secret, msg=random+body) and
    sends the hex digest in the signature header.

    Args:
        body: Raw request body exactly as received.
        random_header: Value of the per-request random header.
        sig_header: Hex digest supplied by the sender (case-insensitive).

    Returns:
        True if the digest matches, False otherwise. Never raises on a
        malformed (e.g. non-ASCII) signature header.
    """
    # Sign the raw bytes: decoding with errors="replace" and re-encoding would
    # alter any body that is not valid UTF-8 and verify different data than
    # what was actually sent.
    expected = hmac.new(
        settings.nextcloud_talk_bot_secret.encode(),
        random_header.encode() + body,
        hashlib.sha256,
    ).hexdigest()
    # compare_digest() raises TypeError for str arguments containing non-ASCII
    # characters, so compare bytes; an attacker-supplied header must yield
    # False, not an unhandled exception.
    supplied = sig_header.lower().encode("utf-8", errors="replace")
    return hmac.compare_digest(expected.encode("ascii"), supplied)
async def _send_reply(conversation_token: str, message: str) -> None:
    """Post a message to Nextcloud Talk as the bot.

    HTTP failures (transport errors and error status codes) are logged rather
    than raised, so a failed reply does not propagate to the caller.

    Args:
        conversation_token: Token of the Talk conversation to post into.
        message: Text of the chat message.
    """
    # Tolerate a configured base URL with a trailing slash ("…/" + "/ocs" → "//ocs").
    base_url = settings.nextcloud_url.rstrip("/")
    url = (
        f"{base_url}/ocs/v2.php/apps/spreed/api/v1"
        f"/bot/{conversation_token}/message"
    )
    # NC Talk verifies HMAC over (random + message_text), NOT the raw body.
    # See BotController::getBotFromHeaders → checksumVerificationService::validateRequest($random, $sig, $secret, $message)
    body_bytes = json.dumps({"message": message}, ensure_ascii=False).encode("utf-8")
    random_str = secrets.token_hex(32)
    sig = hmac.new(
        settings.nextcloud_talk_bot_secret.encode(),
        (random_str + message).encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()
    logger.info("NCT _send_reply → %s (body: %s)", url, body_bytes.decode())
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                url,
                content=body_bytes,
                headers={
                    "Content-Type": "application/json",
                    "OCS-APIRequest": "true",
                    "X-Nextcloud-Talk-Bot-Random": random_str,
                    "X-Nextcloud-Talk-Bot-Signature": sig,
                },
                timeout=15,
            )
        # Only a bounded prefix of the response body is logged.
        if resp.status_code >= 400:
            # A rejected reply should stand out instead of looking like success.
            logger.warning("NCT reply rejected: %s %s", resp.status_code, resp.text[:400])
        else:
            logger.info("NCT reply: %s %s", resp.status_code, resp.text[:400])
    except Exception as exc:
        # Best-effort delivery. %r plus the traceback keeps the log useful even
        # for exceptions whose str() is empty (common for timeouts).
        logger.exception("NCT reply error: %r", exc)
async def _process_message(conversation_token: str, user_text: str, actor_name: str) -> None:
    """Answer one Talk message: query the LLM, persist the turn, post the reply.

    The incoming message and the LLM answer are both published on the event
    bus. If the LLM call times out or fails, a short notice is sent to the
    conversation instead and nothing is saved.

    Args:
        conversation_token: Talk conversation token; also used to derive the
            session id.
        user_text: Message text to send to the LLM.
        actor_name: Display name of the sender, included in the published event.
    """
    logger.info("NCT process: token=%s user=%s text=%r", conversation_token, actor_name, user_text)

    session_id = "nct_{}".format(conversation_token)
    system_prompt = load_context(settings.default_tier)
    turns = load_session(session_id)
    turns.append({"role": "user", "content": user_text})

    incoming_event = {
        "type": "nct_message",
        "session_id": session_id,
        "role": "user",
        "content": user_text,
        "actor": actor_name,
    }
    await event_bus.publish(incoming_event)

    try:
        pending = complete(system_prompt=system_prompt, messages=turns)
        response_text, backend = await asyncio.wait_for(
            pending,
            timeout=settings.nextcloud_talk_timeout,
        )
    except asyncio.TimeoutError:
        logger.warning("NCT timeout for %s", conversation_token)
        await _send_reply(conversation_token, "⏳ Still thinking — this is taking longer than usual.")
    except Exception as e:
        logger.error("NCT LLM error for %s: %s", conversation_token, e)
        await _send_reply(conversation_token, "⚠️ Something went wrong on my end.")
    else:
        # Success path: record the assistant turn before announcing it.
        logger.info("NCT LLM responded via %s (%d chars)", backend, len(response_text))
        turns.append({"role": "assistant", "content": response_text})
        save_session(session_id, turns)
        log_turn(session_id, user_text, response_text)

        outgoing_event = {
            "type": "nct_response",
            "session_id": session_id,
            "role": "assistant",
            "content": response_text,
            "backend": backend,
        }
        await event_bus.publish(outgoing_event)
        await _send_reply(conversation_token, response_text)
def _dict_or_empty(value) -> dict:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _extract_user_text(obj: dict) -> str:
    """Return the stripped message text carried by a webhook "object" entry.

    The "content" field is parsed as JSON and its "message" value is used. If
    that fails, the "name" field (or the raw "content") is used instead.
    Returns "" when no text can be obtained.
    """
    try:
        content = json.loads(obj.get("content", "{}"))
        return content.get("message", "").strip()
    except (TypeError, ValueError, AttributeError):
        # TypeError: "content" is null / not a string.
        # ValueError: not valid JSON (covers json.JSONDecodeError).
        # AttributeError: decoded JSON is not an object, or "message" is not a string.
        fallback = obj.get("name") or obj.get("content") or ""
        return fallback.strip() if isinstance(fallback, str) else ""


@router.post("/inara-nextcloud-talk-webhook")
async def nextcloud_talk_webhook(request: Request, background_tasks: BackgroundTasks):
    """Receive a Nextcloud Talk bot webhook and schedule an LLM reply.

    Responses:
        500 if the bot secret is not configured.
        401 if the request signature does not verify.
        400 if the body is not a JSON object.
        200 otherwise, including for events that are deliberately ignored
        (non-"Create" events, non-"Note" objects, bot authors, empty text,
        missing conversation token).

    The reply itself is produced in a background task after the 200 response.
    """
    body = await request.body()

    if not settings.nextcloud_talk_bot_secret:
        logger.error("nextcloud_talk_bot_secret not configured")
        return Response(status_code=500)

    random_header = request.headers.get("X-Nextcloud-Talk-Random", "")
    sig_header = request.headers.get("X-Nextcloud-Talk-Signature", "")
    if not _verify_signature(body, random_header, sig_header):
        logger.warning("NCT webhook: signature mismatch")
        raise HTTPException(status_code=401, detail="Invalid signature")

    try:
        payload = json.loads(body)
    except ValueError:
        # ValueError covers both json.JSONDecodeError and the
        # UnicodeDecodeError raised for bodies that are not valid UTF-8.
        raise HTTPException(status_code=400, detail="Invalid JSON") from None
    if not isinstance(payload, dict):
        # Valid JSON, but not an object (e.g. a list or a bare string).
        raise HTTPException(status_code=400, detail="Invalid JSON")

    if payload.get("type") != "Create":
        return Response(status_code=200)

    obj = _dict_or_empty(payload.get("object"))
    if obj.get("type") != "Note":
        return Response(status_code=200)

    actor = _dict_or_empty(payload.get("actor"))
    target = _dict_or_empty(payload.get("target"))

    # Never answer bots, to avoid reply loops.
    # NOTE(review): the id-prefix check is defensive and assumes Talk actor ids
    # look like "<actorType>/<id>" (e.g. "bots/…") — confirm against the Talk
    # bot documentation.
    actor_id = actor.get("id")
    if actor.get("type") == "bots" or (isinstance(actor_id, str) and actor_id.startswith("bots/")):
        return Response(status_code=200)

    conversation_token = target.get("id", "")
    if not conversation_token:
        # Without a token there is no conversation to reply to.
        logger.warning("NCT webhook: message without conversation token, ignoring")
        return Response(status_code=200)

    user_text = _extract_user_text(obj)

    # Drop a leading "@inara" mention (case-insensitive) before prompting.
    mention = "@inara"
    if user_text.lower().startswith(mention):
        user_text = user_text[len(mention):].strip()

    if not user_text:
        return Response(status_code=200)

    actor_name = actor.get("name") or "User"
    logger.info("NCT message from %s in %s: %r", actor_name, conversation_token, user_text[:60])

    background_tasks.add_task(_process_message, conversation_token, user_text, actor_name)
    return Response(status_code=200)