Files
Cortex-Inara/cortex/event_bus.py
Scott Idem 3455c7a09c Add SSE real-time Talk activity, file editor UI, and identity file API
- event_bus.py: in-process asyncio pub/sub (one Queue per SSE client)
- nextcloud_talk.py: publishes nct_message/nct_response events to bus
- chat.py: GET /events SSE endpoint streams Talk activity to browser
- routers/files.py: whitelist-protected GET/PUT for Inara identity .md files
- main.py: register files router
- static/index.html: real-time Talk feed, blue badge on Sessions btn,
  Files modal with preview/edit toggle and Ctrl+S save

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 21:10:07 -04:00

34 lines
737 B
Python

"""
Simple in-process pub/sub for server-sent events.
Usage:
# Publisher (e.g. nextcloud_talk router)
await event_bus.publish({"type": "nct_message", ...})
# Consumer (SSE endpoint in chat router)
q = event_bus.subscribe()
try:
event = await asyncio.wait_for(q.get(), timeout=20)
finally:
event_bus.unsubscribe(q)
"""
import asyncio
from typing import Any
_subscribers: set[asyncio.Queue] = set()
def subscribe() -> asyncio.Queue:
q: asyncio.Queue = asyncio.Queue()
_subscribers.add(q)
return q
def unsubscribe(q: asyncio.Queue) -> None:
_subscribers.discard(q)
async def publish(event: dict[str, Any]) -> None:
for q in list(_subscribers):
await q.put(event)