""" Outbound notification helpers — send messages to user channels proactively. Channel config lives in home/{user}/channels.json: { "notification_channel": "email" | "nextcloud" | "google_chat", "notification_email": "", "nextcloud": { "url": "...", "bot_secret": "...", "notification_room": "", ... }, "google_chat": { "outbound_webhook": "https://chat.googleapis.com/v1/spaces/...", ... } } If notification_channel is absent, defaults to "nextcloud" if configured. """ import asyncio import hashlib import hmac import json import logging import secrets import httpx logger = logging.getLogger(__name__) async def _send_nct_message(url: str, secret: str, room: str, message: str) -> None: """Post a message to a Nextcloud Talk room as the bot.""" endpoint = f"{url}/ocs/v2.php/apps/spreed/api/v1/bot/{room}/message" random_str = secrets.token_hex(32) sig = hmac.new( secret.encode(), (random_str + message).encode("utf-8"), hashlib.sha256, ).hexdigest() body = json.dumps({"message": message}, ensure_ascii=False).encode("utf-8") try: async with httpx.AsyncClient() as client: resp = await client.post( endpoint, content=body, headers={ "Content-Type": "application/json", "OCS-APIRequest": "true", "X-Nextcloud-Talk-Bot-Random": random_str, "X-Nextcloud-Talk-Bot-Signature": sig, }, timeout=15, ) if resp.status_code not in (200, 201): logger.warning("notify NCT %s → HTTP %d: %s", room, resp.status_code, resp.text[:200]) else: logger.info("notify NCT → %s (%d chars)", room, len(message)) except Exception as e: logger.error("notify NCT error: %s", e) async def _notify_nct(nct: dict, message: str, username: str) -> None: room = nct.get("notification_room", "").strip() url = nct.get("url", "").rstrip("/") secret = nct.get("bot_secret", "") if not room: logger.debug("notify: NCT notification_room not set for %s — skipping", username) return if not url or not secret: logger.warning("notify: NCT config incomplete for %s (missing url or secret)", username) return await _send_nct_message(url, secret, room, message) async def _notify_email(username: str, message: str, email_override: str | None = None) -> None: """Send notification via email. Address = override → google_email from auth.json.""" from auth_utils import _read_auth from email_utils import send_email to_addr = email_override or _read_auth(username).get("google_email", "").strip() if not to_addr: logger.warning("notify: no email address for %s — set notification_email in channels.json", username) return ok = await asyncio.to_thread( send_email, to_email=to_addr, subject="Cortex", body_text=message, body_html=message.replace("\n", "
"), ) if ok: logger.info("notify email → %s (%d chars)", to_addr, len(message)) else: logger.warning("notify: email send failed for %s", username) async def _notify_google_chat(webhook_url: str, message: str, username: str) -> None: """POST a message to a Google Chat space via incoming webhook.""" body = json.dumps({"text": message}, ensure_ascii=False).encode("utf-8") try: async with httpx.AsyncClient() as client: resp = await client.post( webhook_url, content=body, headers={"Content-Type": "application/json"}, timeout=15, ) if resp.status_code not in (200, 201): logger.warning("notify Google Chat %s → HTTP %d: %s", username, resp.status_code, resp.text[:200]) else: logger.info("notify Google Chat → %s (%d chars)", username, len(message)) except Exception as e: logger.error("notify Google Chat error for %s: %s", username, e) async def notify(username: str, message: str, channel: str | None = None) -> None: """Send a notification to the user's preferred outbound channel. Channel resolution order: 1. `channel` parameter if provided 2. `notification_channel` key in channels.json 3. "nextcloud" if notification_room is configured 4. Silent no-op Configure via home/{user}/channels.json — see module docstring. """ from auth_utils import get_user_channels channels = get_user_channels(username) target = channel or channels.get("notification_channel", "").strip() if not target: # Auto-detect: nextcloud if a notification_room is set nct = channels.get("nextcloud", {}) if nct.get("notification_room", "").strip(): target = "nextcloud" else: return if target == "email": email_override = channels.get("notification_email", "").strip() or None await _notify_email(username, message, email_override=email_override) elif target == "nextcloud": nct = channels.get("nextcloud") if not nct: logger.debug("notify: nextcloud not configured for %s", username) return await _notify_nct(nct, message, username) elif target == "google_chat": gc = channels.get("google_chat", {}) webhook = gc.get("outbound_webhook", "").strip() if not webhook: logger.debug("notify: google_chat outbound_webhook not set for %s", username) return await _notify_google_chat(webhook, message, username) else: logger.debug("notify: channel %r not supported for outbound (user %s)", target, username)