feat(websockets): implement WebSockets V3 with granular Redis Pub/Sub
- Introduced the standardized `WS_Message_V3` Pydantic model and `WS_Manager_V3`.
- Implemented the `/v3/ws/` endpoint with granular Redis routing to solve "noisy neighbor" scaling issues.
- Added presence tracking using Redis Sets for group coordination.
- Added a comprehensive test suite (unit and integration) covering the models, the manager, and the routing logic.
- Documentation: created the V3 Frontend WebSocket Guide and the project design spec.
- Updated the main Frontend API guide and the tests README with the new standards.
This commit is contained in:
119
app/lib_websockets_v3.py
Normal file
119
app/lib_websockets_v3.py
Normal file
@@ -0,0 +1,119 @@
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
import redis.asyncio as redis
|
||||
from app.config import settings
|
||||
|
||||
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
|
||||
|
||||
# --- Models ---
|
||||
|
||||
def _utc_now() -> datetime.datetime:
    # Timezone-aware "now" in UTC; used as the default for `sent_at`.
    return datetime.datetime.now(datetime.timezone.utc)


class WS_Message_V3(BaseModel):
    """
    Standardized message schema for WebSockets V3.
    """
    # Schema version tag carried by every message.
    version: str = "3"

    # Routing envelope: what kind of message this is and where it should go.
    msg_type: str = Field(
        ...,
        description="'msg', 'cmd', 'heartbeat', 'presence'",
    )
    target: str = Field(
        ...,
        description="'direct', 'group', 'broadcast', 'echo'",
    )

    # Addressing: the sender is mandatory, the two recipients are optional.
    from_id: str = Field(
        ...,
        description="client_id_random of the sender",
    )
    to_id: Optional[str] = Field(
        default=None,
        description="target client_id_random (for direct messages)",
    )
    group_id: Optional[str] = Field(
        default=None,
        description="target group_id_random (for group messages)",
    )

    # Content: an optional command string and/or human-readable text.
    cmd: Optional[str] = Field(
        default=None,
        description="Specific command string (e.g., 'RELOAD', 'OPEN_FILE')",
    )
    msg: Optional[str] = Field(
        default=None,
        description="Human-readable message content",
    )

    # Free-form data plus the creation timestamp (UTC, timezone-aware).
    payload: Dict[str, Any] = Field(
        default_factory=dict,
        description="Flexible JSON data payload",
    )
    sent_at: datetime.datetime = Field(default_factory=_utc_now)

    class Config:
        # Serialize datetimes as ISO-8601 strings when dumping to JSON.
        json_encoders = {
            datetime.datetime: lambda value: value.isoformat(),
        }
|
||||
|
||||
# --- Manager ---
|
||||
|
||||
class WS_Manager_V3:
    """
    Manages Redis granular Pub/Sub routing and presence for WebSockets V3.

    Channel layout used by this class:
        ws:client:<client_id>    direct (and echo) messages for one client
        ws:group:<group_id>      messages for every client of a group
        ws:broadcast             system-wide messages

    Presence is stored in one Redis Set per group, under
    ``ws:presence:<group_id>``.
    """

    # Lifetime (seconds) of a group's presence set after its last refresh.
    PRESENCE_TTL_SECONDS = 3600

    def __init__(self, redis_db: int = 6):
        """
        Store connection settings; no Redis connection is created here.

        Args:
            redis_db: Redis logical database index to use.
        """
        self.redis_db = redis_db
        self.redis_url = f"redis://{settings.REDIS['server']}:{settings.REDIS['port']}"
        self._redis_conn: Optional[redis.Redis] = None

    async def get_redis(self) -> "redis.Redis":
        """
        Return the shared async Redis client, creating it on first use.

        The client is built with ``decode_responses=True``, so values read
        back from Redis are ``str`` rather than ``bytes``.
        """
        if self._redis_conn is None:
            log.info("WS V3: Connecting to Redis DB %s", self.redis_db)
            self._redis_conn = redis.Redis.from_url(
                self.redis_url,
                db=self.redis_db,
                encoding='utf-8',
                decode_responses=True,
            )
        return self._redis_conn

    def get_channel_names(self, client_id: str, group_id: Optional[str] = None) -> List[str]:
        """
        Generate the list of Redis channels a client should subscribe to.

        Args:
            client_id: Identifier of the connecting client.
            group_id: Optional group identifier; when falsy, no group
                channel is included.

        Returns:
            The client's direct channel, the broadcast channel and, when a
            group is given, that group's channel (in that order).
        """
        channels = [
            f"ws:client:{client_id}",  # Direct messages
            "ws:broadcast",            # System-wide messages
        ]
        if group_id:
            channels.append(f"ws:group:{group_id}")  # Group messages
        return channels

    async def update_presence(self, client_id: str, group_id: str, online: bool = True):
        """
        Track which clients are online in which groups using Redis Sets.

        Args:
            client_id: Client to add to / remove from the group's set.
            group_id: Group whose presence set is updated.
            online: ``True`` adds the client and refreshes the set's TTL,
                ``False`` removes the client.
        """
        r = await self.get_redis()
        key = f"ws:presence:{group_id}"
        if online:
            await r.sadd(key, client_id)
            # The TTL is set on the whole set, not per member: the group's
            # presence disappears if nobody refreshes it within the TTL.
            await r.expire(key, self.PRESENCE_TTL_SECONDS)
        else:
            await r.srem(key, client_id)

    async def get_online_clients(self, group_id: str) -> List[str]:
        """
        Return the client IDs currently recorded as online in a group.

        SMEMBERS yields an unordered set; it is converted to a sorted list
        so the result matches the declared ``List[str]`` return type and is
        deterministic.
        """
        r = await self.get_redis()
        members = await r.smembers(f"ws:presence:{group_id}")
        return sorted(members)

    def _resolve_channel(self, message: "WS_Message_V3") -> Optional[str]:
        """Map a message's target to its Redis channel, or None if unroutable."""
        target = message.target

        if target == "direct":
            if not message.to_id:
                log.warning("WS V3: Attempted direct publish without to_id")
                return None
            return f"ws:client:{message.to_id}"

        if target == "group":
            if not message.group_id:
                log.warning("WS V3: Attempted group publish without group_id")
                return None
            return f"ws:group:{message.group_id}"

        if target == "broadcast":
            return "ws:broadcast"

        if target == "echo":
            # Echo goes back to the sender's own direct channel.
            return f"ws:client:{message.from_id}"

        # Unknown targets are dropped, but with a log entry.
        log.warning("WS V3: Dropping message with unknown target %r", target)
        return None

    async def publish_message(self, message: "WS_Message_V3"):
        """
        Publish a structured message to the correct granular Redis channel.

        Messages that cannot be routed (missing recipient for their target,
        or an unrecognised target) are logged and dropped; nothing is
        raised for them.

        Args:
            message: The validated message to publish; it is serialized
                with ``message.json()``.
        """
        channel = self._resolve_channel(message)
        if not channel:
            return

        r = await self.get_redis()
        log.debug("WS V3: Publishing to %s", channel)
        await r.publish(channel, message.json())
|
||||
|
||||
# Global instance
|
||||
# Created at import time with the default constructor arguments; the V3
# WebSocket router imports this name.
ws_manager_v3 = WS_Manager_V3()
|
||||
@@ -10,7 +10,7 @@ from app.routers import (
|
||||
flask_cfg, hosted_file, api_v3_actions_hosted_file, lookup,
|
||||
organization, page, person,
|
||||
person_user, qr, site, site_domain, user,
|
||||
util_email, websockets, websockets_redis, e_confex, e_cvent, e_impexium, e_stripe
|
||||
util_email, websockets, websockets_redis, websockets_v3, e_confex, e_cvent, e_impexium, e_stripe
|
||||
)
|
||||
|
||||
def setup_routers(app: FastAPI):
|
||||
@@ -62,6 +62,7 @@ def setup_routers(app: FastAPI):
|
||||
app.include_router(util_email.router, tags=['Utility: Email'])
|
||||
app.include_router(websockets.router, tags=['Websockets'])
|
||||
app.include_router(websockets_redis.router, tags=['Websockets (Redis)'])
|
||||
app.include_router(websockets_v3.router, prefix='/v3', tags=['Websockets V3'])
|
||||
|
||||
app.include_router(e_confex.router, prefix='/e/confex', tags=['External Service: Confex'])
|
||||
app.include_router(e_cvent.router, prefix='/e/cvent', tags=['External Service: Cvent'])
|
||||
|
||||
109
app/routers/websockets_v3.py
Normal file
109
app/routers/websockets_v3.py
Normal file
@@ -0,0 +1,109 @@
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
|
||||
from pydantic import ValidationError
|
||||
|
||||
from app.lib_general_v3 import get_account_context_optional
|
||||
from app.lib_websockets_v3 import WS_Message_V3, ws_manager_v3
|
||||
|
||||
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
|
||||
|
||||
# Router on which this module's V3 WebSocket route is registered.
router = APIRouter()
|
||||
|
||||
@router.websocket('/ws/group/{group_id}/client/{client_id}')
async def v3_ws_endpoint(
    websocket: WebSocket,
    group_id: str,
    client_id: str,
):
    """
    Main V3 WebSocket Endpoint.
    Uses granular Redis Pub/Sub for efficient message routing.

    Lifecycle:
        1. Accept the socket, mark the client present in its group and
           subscribe to the client's Redis channels.
        2. Run two loops concurrently: a receiver (client -> Redis) and a
           sender (Redis -> client). When either one finishes, the other
           is cancelled.
        3. Always clear presence, unsubscribe and close the pubsub.

    Args:
        websocket: The incoming WebSocket connection.
        group_id: Group identifier taken from the URL path.
        client_id: Client identifier taken from the URL path.

    NOTE(review): the connection is accepted without any authentication
    check in this function, and both IDs are taken from the URL as-is.
    Confirm that access control is enforced elsewhere.
    """
    await websocket.accept()
    log.info("WS V3: Client %s connected to group %s", client_id, group_id)

    channels = ws_manager_v3.get_channel_names(client_id, group_id)
    pubsub = None  # Created during setup; stays None if setup fails early.
    tasks = []

    def schema_error(details):
        """Build the error frame sent back for a rejected client message."""
        return {
            "error": "Invalid message schema",
            "details": details,
            "version": "3",
        }

    # --- Handlers ---

    async def receiver_handler():
        """Handles incoming messages from the client."""
        try:
            while True:
                try:
                    data = await websocket.receive_json()
                except ValueError as exc:
                    # json.JSONDecodeError is a ValueError: a malformed
                    # frame is reported back instead of ending the session.
                    log.warning("WS V3: Malformed JSON from %s: %s", client_id, exc)
                    await websocket.send_json(
                        schema_error([{"msg": f"Malformed JSON: {exc}"}])
                    )
                    continue

                if not isinstance(data, dict):
                    # A JSON array/scalar cannot carry the message fields.
                    log.warning("WS V3: Non-object message from %s", client_id)
                    await websocket.send_json(
                        schema_error([{"msg": "Message must be a JSON object"}])
                    )
                    continue

                # Enforce standardized schema.
                # Force from_id and group_id from path for security.
                data['from_id'] = client_id
                data['group_id'] = group_id

                try:
                    message = WS_Message_V3(**data)
                except ValidationError as ve:
                    errors_json = ve.json()
                    log.warning(
                        "WS V3: Validation error from %s: %s", client_id, errors_json
                    )
                    # Round-trip through ve.json() so the details are
                    # guaranteed to be JSON-serialisable.
                    await websocket.send_json(schema_error(json.loads(errors_json)))
                    continue

                if message.msg_type == "heartbeat":
                    # Refresh the presence entry so that it does not expire
                    # while the client is still connected.
                    await ws_manager_v3.update_presence(
                        client_id, group_id, online=True
                    )

                await ws_manager_v3.publish_message(message)

        except WebSocketDisconnect:
            # Normal end of session: returning completes this task, which
            # wakes the execution loop below. Re-raising here would only
            # leave an exception nobody retrieves.
            log.info("WS V3: Client %s disconnected (receiver)", client_id)
        except Exception:
            log.exception("WS V3: Unexpected error in receiver for %s", client_id)

    async def sender_handler():
        """Handles outgoing messages from Redis to the client."""
        try:
            while True:
                # Use a small timeout to allow for clean task cancellation
                message = await pubsub.get_message(
                    ignore_subscribe_messages=True, timeout=0.1
                )
                if message and message['type'] == 'message':
                    # Forward the structured message directly;
                    # Redis stores them as JSON strings.
                    await websocket.send_text(message['data'])
        except WebSocketDisconnect:
            log.info("WS V3: Client %s disconnected (sender)", client_id)
        except Exception:
            log.exception("WS V3: Unexpected error in sender for %s", client_id)

    # --- Execution Loop ---

    try:
        # 1. Presence & Subscription Setup (inside the try so that a
        # failure here still runs the cleanup below).
        await ws_manager_v3.update_presence(client_id, group_id, online=True)
        redis_conn = await ws_manager_v3.get_redis()
        pubsub = redis_conn.pubsub()
        await pubsub.subscribe(*channels)

        # Run both loops concurrently. If either fails or the client
        # disconnects, fall through to cleanup.
        tasks = [
            asyncio.create_task(receiver_handler()),
            asyncio.create_task(sender_handler()),
        ]
        try:
            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        except Exception as e:
            log.error("WS V3: Loop error for %s: %s", client_id, e)

    finally:
        # 2. Cleanup
        log.info("WS V3: Cleaning up connection for %s", client_id)

        # Cancel whatever is still running (a no-op for finished tasks)
        # and wait for it to stop BEFORE closing the pubsub it is polling.
        # gather(..., return_exceptions=True) also retrieves any task
        # exception, so none is reported as "never retrieved".
        for task in tasks:
            task.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

        try:
            await ws_manager_v3.update_presence(client_id, group_id, online=False)
        finally:
            # Release the pubsub even if the presence update failed.
            if pubsub is not None:
                try:
                    await pubsub.unsubscribe(*channels)
                finally:
                    await pubsub.close()
|
||||
Reference in New Issue
Block a user