diff --git a/.gitignore b/.gitignore index add7b1a..9eea54b 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,7 @@ home/**/sessions/ home/**/auth.json home/**/invite.json home/**/profile.json +home/**/channels.json # Syncthing Metadata .stfolder/ diff --git a/cortex/auth_utils.py b/cortex/auth_utils.py index dc0b986..f80acd9 100644 --- a/cortex/auth_utils.py +++ b/cortex/auth_utils.py @@ -195,3 +195,22 @@ def consume_invite(username: str) -> None: path.write_text(json.dumps(data) + "\n") except Exception: pass + + +# --------------------------------------------------------------------------- +# Per-user channel config +# --------------------------------------------------------------------------- + +def _channels_path(username: str) -> Path: + return settings.home_root() / username / "channels.json" + + +def get_user_channels(username: str) -> dict: + """Return the parsed channels.json for a user, or {} if not found.""" + path = _channels_path(username) + if not path.exists(): + return {} + try: + return json.loads(path.read_text()) + except Exception: + return {} diff --git a/cortex/config.py b/cortex/config.py index e16eb3a..8184e7f 100644 --- a/cortex/config.py +++ b/cortex/config.py @@ -45,21 +45,6 @@ class Settings(BaseSettings): timeout_gemini: int = 120 # frequently slow under load timeout_local: int = 300 # local models may need to load first - # Google Chat - # JWT audience (aud) claim to verify on inbound webhook requests. - # Google Chat sets aud = the Google Cloud project number (e.g. "741112865538"). - # Set to "" to disable verification (dev/testing only). - google_chat_audience: str = "" - # Google Chat must receive a response within 30s or shows an error to the user - google_chat_timeout: int = 25 - # Backend forced for Google Chat — Claude is more reliable within the 25s deadline - google_chat_backend: str = "claude" - - # Nextcloud Talk bot - nextcloud_url: str = "https://cloud.dgrzone.com" - nextcloud_talk_bot_secret: str = "" # set in .env - nextcloud_talk_timeout: int = 55 - # Auto-distillation schedule — override in .env # AUTO_DISTILL=false disables entirely scheduler_timezone: str = "America/New_York" # IANA tz — override in .env if needed diff --git a/cortex/routers/google_chat.py b/cortex/routers/google_chat.py index 391f621..85738b0 100644 --- a/cortex/routers/google_chat.py +++ b/cortex/routers/google_chat.py @@ -3,14 +3,16 @@ import logging from fastapi import APIRouter, HTTPException, Request, Response from google.auth.transport import requests as google_requests from google.oauth2 import id_token +from auth_utils import get_user_channels from context_loader import load_context from llm_client import complete +from persona import set_context from session_logger import log_turn from session_store import load as load_session, save as save_session from config import settings logger = logging.getLogger(__name__) -router = APIRouter(prefix="/channels/google-chat") +router = APIRouter() # Workspace Add-on Chat apps: JWT is issued by accounts.google.com. # (Legacy standalone Chat bots used chat@system.gserviceaccount.com — different format.) @@ -35,7 +37,7 @@ def _msg(text: str) -> dict: } -def _verify_system_id_token(token: str) -> None: +def _verify_system_id_token(token: str, audience: str) -> None: """Verify the systemIdToken from authorizationEventObject. For Workspace Add-on Chat apps Google sends the token in the request body @@ -44,13 +46,13 @@ def _verify_system_id_token(token: str) -> None: Claims verified: iss = "https://accounts.google.com" - aud = settings.google_chat_audience (the endpoint URL) + aud = the per-user audience from channels.json (the endpoint URL) """ try: claims = id_token.verify_oauth2_token( token, google_requests.Request(), - audience=settings.google_chat_audience, + audience=audience, ) except Exception as exc: logger.warning("Google Chat JWT verification failed: %s", exc) @@ -60,17 +62,30 @@ def _verify_system_id_token(token: str) -> None: raise HTTPException(status_code=401, detail="Wrong issuer") -@router.post("") -async def receive(request: Request): +@router.post("/channels/google-chat/{username}") +async def receive(username: str, request: Request): + channels = get_user_channels(username) + cfg = channels.get("google_chat") + if not cfg: + logger.warning("Google Chat: no channel config for user %r", username) + raise HTTPException(status_code=404, detail="Channel not configured for this user") + + persona_name = cfg.get("persona", "inara") + audience = cfg.get("audience", "") + backend = cfg.get("backend", settings.primary_backend) + timeout = cfg.get("timeout", 25) + + set_context(username, persona_name) + body = await request.json() # Verify the systemIdToken embedded in the request body - if settings.google_chat_audience: + if audience: token = body.get("authorizationEventObject", {}).get("systemIdToken", "") if not token: - logger.warning("Google Chat: missing systemIdToken") + logger.warning("Google Chat: missing systemIdToken for %s", username) raise HTTPException(status_code=401, detail="Missing token") - _verify_system_id_token(token) + _verify_system_id_token(token, audience) chat = body.get("chat", {}) @@ -79,8 +94,8 @@ async def receive(request: Request): if "addedToSpacePayload" in chat: space_type = chat["addedToSpacePayload"].get("space", {}).get("type", "") if space_type == "DM": - return _msg(f"✨ Hello! I'm {settings.agent_name}. What can I help you with?") - return _msg(f"✨ Hello! I'm {settings.agent_name}. Send me a message and I'll do my best to help.") + return _msg(f"✨ Hello! I'm {persona_name.capitalize()}. What can I help you with?") + return _msg(f"✨ Hello! I'm {persona_name.capitalize()}. Send me a message and I'll do my best to help.") if "removedFromSpacePayload" in chat: return Response(status_code=200) @@ -89,10 +104,10 @@ async def receive(request: Request): logger.info("Google Chat: unhandled event keys: %s", list(chat.keys())) return Response(status_code=200) - payload = chat["messagePayload"] - message = payload.get("message", {}) - space = payload.get("space", {}) - user = chat.get("user", {}) + payload = chat["messagePayload"] + message = payload.get("message", {}) + space = payload.get("space", {}) + user = chat.get("user", {}) # argumentText strips @BotName mentions in Spaces; fall back to full text in DMs user_text = (message.get("argumentText") or message.get("text", "")).strip() @@ -107,7 +122,7 @@ async def receive(request: Request): logger.warning("Google Chat: empty user_text, ignoring") return Response(status_code=200) - session_id = "gc_" + space_name.replace("/", "_") + session_id = f"gc_{username}_{space_name.replace('/', '_')}" system_prompt = load_context(settings.default_tier) history = load_session(session_id) history.append({"role": "user", "content": user_text}) @@ -117,9 +132,9 @@ async def receive(request: Request): complete( system_prompt=system_prompt, messages=history, - model=settings.google_chat_backend, + model=backend, ), - timeout=settings.google_chat_timeout, + timeout=timeout, ) except asyncio.TimeoutError: logger.warning("Google Chat request timed out for session %s", session_id) diff --git a/cortex/routers/nextcloud_talk.py b/cortex/routers/nextcloud_talk.py index cfe5ffd..d2ded63 100644 --- a/cortex/routers/nextcloud_talk.py +++ b/cortex/routers/nextcloud_talk.py @@ -8,11 +8,13 @@ import secrets import httpx from fastapi import APIRouter, BackgroundTasks, HTTPException, Request, Response -from config import settings +from auth_utils import get_user_channels from context_loader import load_context from llm_client import complete +from persona import set_context from session_logger import log_turn from session_store import load as load_session, save as save_session +from config import settings import event_bus logger = logging.getLogger(__name__) @@ -26,29 +28,29 @@ if not logger.handlers: router = APIRouter() -def _verify_signature(body: bytes, random_header: str, sig_header: str) -> bool: +def _verify_signature(body: bytes, random_header: str, sig_header: str, secret: str) -> bool: """Nextcloud signs requests with HMAC-SHA256(key=secret, msg=random+body).""" expected = hmac.new( - settings.nextcloud_talk_bot_secret.encode(), + secret.encode(), (random_header + body.decode("utf-8", errors="replace")).encode(), hashlib.sha256, ).hexdigest() return hmac.compare_digest(expected, sig_header.lower()) -async def _send_reply(conversation_token: str, message: str) -> None: +async def _send_reply(conversation_token: str, message: str, nextcloud_url: str, secret: str) -> None: """Post a message to Nextcloud Talk as the bot.""" url = ( - f"{settings.nextcloud_url}/ocs/v2.php/apps/spreed/api/v1" + f"{nextcloud_url}/ocs/v2.php/apps/spreed/api/v1" f"/bot/{conversation_token}/message" ) # NC Talk verifies HMAC over (random + message_text), NOT the raw body. # See BotController::getBotFromHeaders → checksumVerificationService::validateRequest($random, $sig, $secret, $message) - body_dict = {"message": message} + body_dict = {"message": message} body_bytes = json.dumps(body_dict, ensure_ascii=False).encode("utf-8") random_str = secrets.token_hex(32) sig = hmac.new( - settings.nextcloud_talk_bot_secret.encode(), + secret.encode(), (random_str + message).encode("utf-8"), hashlib.sha256, ).hexdigest() @@ -72,9 +74,21 @@ async def _send_reply(conversation_token: str, message: str) -> None: logger.error("NCT reply error: %s", e) -async def _process_message(conversation_token: str, user_text: str, actor_name: str) -> None: +async def _process_message( + conversation_token: str, + user_text: str, + actor_name: str, + username: str, + persona_name: str, + nextcloud_url: str, + secret: str, + timeout: int, +) -> None: logger.info("NCT process: token=%s user=%s text=%r", conversation_token, actor_name, user_text) - session_id = f"nct_{conversation_token}" + + set_context(username, persona_name) + + session_id = f"nct_{username}_{conversation_token}" system_prompt = load_context(settings.default_tier) history = load_session(session_id) history.append({"role": "user", "content": user_text}) @@ -90,15 +104,15 @@ async def _process_message(conversation_token: str, user_text: str, actor_name: try: response_text, backend = await asyncio.wait_for( complete(system_prompt=system_prompt, messages=history), - timeout=settings.nextcloud_talk_timeout, + timeout=timeout, ) except asyncio.TimeoutError: logger.warning("NCT timeout for %s", conversation_token) - await _send_reply(conversation_token, "⏳ Still thinking — this is taking longer than usual.") + await _send_reply(conversation_token, "⏳ Still thinking — this is taking longer than usual.", nextcloud_url, secret) return except Exception as e: logger.error("NCT LLM error for %s: %s", conversation_token, e) - await _send_reply(conversation_token, "⚠️ Something went wrong on my end.") + await _send_reply(conversation_token, "⚠️ Something went wrong on my end.", nextcloud_url, secret) return logger.info("NCT LLM responded via %s (%d chars)", backend, len(response_text)) @@ -114,22 +128,33 @@ async def _process_message(conversation_token: str, user_text: str, actor_name: "backend": backend, }) - await _send_reply(conversation_token, response_text) + await _send_reply(conversation_token, response_text, nextcloud_url, secret) -@router.post("/inara-nextcloud-talk-webhook") -async def nextcloud_talk_webhook(request: Request, background_tasks: BackgroundTasks): - body = await request.body() +@router.post("/webhook/nextcloud/{username}") +async def nextcloud_talk_webhook(username: str, request: Request, background_tasks: BackgroundTasks): + channels = get_user_channels(username) + cfg = channels.get("nextcloud") + if not cfg: + logger.warning("NCT webhook: no channel config for user %r", username) + raise HTTPException(status_code=404, detail="Channel not configured for this user") - if not settings.nextcloud_talk_bot_secret: - logger.error("nextcloud_talk_bot_secret not configured") + persona_name = cfg.get("persona", "inara") + nextcloud_url = cfg.get("url", "") + secret = cfg.get("bot_secret", "") + timeout = cfg.get("timeout", 55) + + if not secret: + logger.error("NCT webhook: bot_secret missing for user %r", username) return Response(status_code=500) + body = await request.body() + random_header = request.headers.get("X-Nextcloud-Talk-Random", "") sig_header = request.headers.get("X-Nextcloud-Talk-Signature", "") - if not _verify_signature(body, random_header, sig_header): - logger.warning("NCT webhook: signature mismatch") + if not _verify_signature(body, random_header, sig_header, secret): + logger.warning("NCT webhook: signature mismatch for %s", username) raise HTTPException(status_code=401, detail="Invalid signature") try: @@ -153,12 +178,12 @@ async def nextcloud_talk_webhook(request: Request, background_tasks: BackgroundT conversation_token = target.get("id", "") try: - content = json.loads(obj.get("content", "{}")) + content = json.loads(obj.get("content", "{}")) user_text = content.get("message", "").strip() except (json.JSONDecodeError, AttributeError): user_text = (obj.get("name") or obj.get("content", "")).strip() - mention_prefix = f"@{settings.agent_name.lower()}" + mention_prefix = f"@{persona_name.lower()}" if user_text.lower().startswith(mention_prefix): user_text = user_text[len(mention_prefix):].strip() @@ -168,5 +193,9 @@ async def nextcloud_talk_webhook(request: Request, background_tasks: BackgroundT actor_name = actor.get("name", "User") logger.info("NCT message from %s in %s: %r", actor_name, conversation_token, user_text[:60]) - background_tasks.add_task(_process_message, conversation_token, user_text, actor_name) + background_tasks.add_task( + _process_message, + conversation_token, user_text, actor_name, + username, persona_name, nextcloud_url, secret, timeout, + ) return Response(status_code=200)