""" Simple in-process pub/sub for server-sent events. Usage: # Publisher (e.g. nextcloud_talk router) await event_bus.publish({"type": "nct_message", ...}) # Consumer (SSE endpoint in chat router) q = event_bus.subscribe() try: event = await asyncio.wait_for(q.get(), timeout=20) finally: event_bus.unsubscribe(q) """ import asyncio from typing import Any _subscribers: set[asyncio.Queue] = set() def subscribe() -> asyncio.Queue: q: asyncio.Queue = asyncio.Queue() _subscribers.add(q) return q def unsubscribe(q: asyncio.Queue) -> None: _subscribers.discard(q) async def publish(event: dict[str, Any]) -> None: for q in list(_subscribers): await q.put(event)