Files
OSIT-AE-API-FastAPI/app/lib_websockets_v3.py
Scott Idem 48c3ce76f0 feat(websockets): implement WebSockets V3 with granular Redis Pub/Sub
- Introduced WS_Message_V3 standardized Pydantic model and WS_Manager_V3.
- Implemented /v3/ws/ endpoint with granular Redis routing to solve "noisy neighbor" scaling issues.
- Added presence tracking using Redis Sets for group coordination.
- Comprehensive test suite added (unit and integration) covering models, manager, and routing logic.
- Documentation: Created V3 Frontend WebSocket Guide and Project design spec.
- Updated main Frontend API guide and tests README with new standards.
2026-01-30 14:44:02 -05:00

120 lines
4.2 KiB
Python

import datetime
import json
import logging
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel, Field
import redis.asyncio as redis
from app.config import settings
log = logging.getLogger(__name__)
# --- Models ---
class WS_Message_V3(BaseModel):
    """
    Message envelope used by WebSockets V3.

    Only ``msg_type``, ``target`` and ``from_id`` are required. The
    addressing fields (``to_id`` / ``group_id``) and the content fields
    (``cmd`` / ``msg``) default to ``None``, ``payload`` defaults to a new
    empty dict, and ``sent_at`` defaults to the current UTC time at the
    moment the model instance is created.
    """

    # Schema version tag carried by every message.
    version: str = "3"

    # --- Classification and routing ---
    msg_type: str = Field(
        ...,
        description="'msg', 'cmd', 'heartbeat', 'presence'",
    )
    target: str = Field(
        ...,
        description="'direct', 'group', 'broadcast', 'echo'",
    )

    # --- Addressing ---
    from_id: str = Field(
        ...,
        description="client_id_random of the sender",
    )
    to_id: Optional[str] = Field(
        default=None,
        description="target client_id_random (for direct messages)",
    )
    group_id: Optional[str] = Field(
        default=None,
        description="target group_id_random (for group messages)",
    )

    # --- Content ---
    cmd: Optional[str] = Field(
        default=None,
        description="Specific command string (e.g., 'RELOAD', 'OPEN_FILE')",
    )
    msg: Optional[str] = Field(
        default=None,
        description="Human-readable message content",
    )
    payload: Dict[str, Any] = Field(
        default_factory=dict,
        description="Flexible JSON data payload",
    )

    # Timezone-aware (UTC) creation timestamp.
    sent_at: datetime.datetime = Field(
        default_factory=lambda: datetime.datetime.now(tz=datetime.timezone.utc),
    )

    class Config:
        # Render datetimes as ISO-8601 strings when encoding to JSON.
        json_encoders = {
            datetime.datetime: lambda moment: moment.isoformat(),
        }
# --- Manager ---
class WS_Manager_V3:
    """
    Manages Redis granular Pub/Sub channels and presence sets for WebSockets V3.

    Redis keys / channels built by this class:
      * ``ws:client:<client_id>``  - direct and echo messages
      * ``ws:group:<group_id>``    - group messages
      * ``ws:broadcast``           - system-wide messages
      * ``ws:presence:<group_id>`` - Redis Set of client IDs online in a group
    """

    # Lifetime (seconds) of a group's presence set; re-armed every time a
    # client is marked online in that group.
    PRESENCE_TTL_SECONDS = 3600

    def __init__(self, redis_db: int = 6):
        """
        Store connection parameters; no Redis client is created here.

        Args:
            redis_db: Redis database index passed to the client on creation.
        """
        self.redis_db = redis_db
        self.redis_url = f"redis://{settings.REDIS['server']}:{settings.REDIS['port']}"
        # Created on first call to get_redis().
        self._redis_conn: Optional[redis.Redis] = None

    async def get_redis(self) -> "redis.Redis":
        """
        Return the shared async Redis client, creating it on first use.

        The client is built with ``decode_responses=True``, so values read
        back from Redis are ``str`` rather than ``bytes``.
        """
        if self._redis_conn is None:
            log.info("WS V3: Connecting to Redis DB %s", self.redis_db)
            self._redis_conn = redis.Redis.from_url(
                self.redis_url,
                db=self.redis_db,
                encoding='utf-8',
                decode_responses=True,
            )
        return self._redis_conn

    def get_channel_names(self, client_id: str, group_id: Optional[str] = None) -> List[str]:
        """
        Generate the list of Redis channels a client should subscribe to.

        Args:
            client_id: ID used to build the client's direct-message channel.
            group_id: Optional group ID; when truthy, the group channel is
                appended.

        Returns:
            ``[direct channel, broadcast channel]`` followed by the group
            channel when ``group_id`` is given.
        """
        channels = [
            f"ws:client:{client_id}",  # Direct messages
            "ws:broadcast",            # System-wide messages
        ]
        if group_id:
            channels.append(f"ws:group:{group_id}")  # Group messages
        return channels

    async def update_presence(self, client_id: str, group_id: str, online: bool = True):
        """
        Track which clients are online in which groups using Redis Sets.

        Args:
            client_id: Client to add to / remove from the group's set.
            group_id: Group whose presence set is updated.
            online: ``True`` adds the client and refreshes the set's TTL;
                ``False`` removes the client.
        """
        r = await self.get_redis()
        key = f"ws:presence:{group_id}"
        if online:
            await r.sadd(key, client_id)
            # The TTL applies to the whole set, not to individual members.
            await r.expire(key, self.PRESENCE_TTL_SECONDS)
        else:
            await r.srem(key, client_id)

    async def get_online_clients(self, group_id: str) -> List[str]:
        """
        Return the client IDs currently recorded as online in a group.

        Redis returns set members as an unordered collection; the result is
        converted to a sorted list so it matches the declared return type,
        is JSON-serializable and has a deterministic order.
        """
        r = await self.get_redis()
        members = await r.smembers(f"ws:presence:{group_id}")
        return sorted(members)

    @staticmethod
    def _resolve_channel(message: "WS_Message_V3") -> Optional[str]:
        """Map a message's target to its Redis channel, or None if unroutable."""
        target = message.target
        if target == "direct":
            if not message.to_id:
                log.warning("WS V3: Attempted direct publish without to_id")
                return None
            return f"ws:client:{message.to_id}"
        if target == "group":
            if not message.group_id:
                log.warning("WS V3: Attempted group publish without group_id")
                return None
            return f"ws:group:{message.group_id}"
        if target == "broadcast":
            return "ws:broadcast"
        if target == "echo":
            # Echo goes back to the sender's own direct channel.
            return f"ws:client:{message.from_id}"
        # Previously dropped without a trace; make the drop visible.
        log.warning("WS V3: Unknown target %r; message not published", target)
        return None

    async def publish_message(self, message: "WS_Message_V3"):
        """
        Publish a structured message to the correct granular Redis channel.

        Routing by ``message.target``:
          * ``direct``    -> ``ws:client:<to_id>`` (requires ``to_id``)
          * ``group``     -> ``ws:group:<group_id>`` (requires ``group_id``)
          * ``broadcast`` -> ``ws:broadcast``
          * ``echo``      -> ``ws:client:<from_id>``

        Messages with a missing required ID or an unrecognized target are
        logged at warning level and not published.
        """
        # Resolve the channel first so unroutable messages never touch Redis.
        channel = self._resolve_channel(message)
        if not channel:
            return
        r = await self.get_redis()
        log.debug("WS V3: Publishing to %s", channel)
        await r.publish(channel, message.json())
# Module-level shared manager instance, constructed at import time with the
# default arguments of WS_Manager_V3.
ws_manager_v3 = WS_Manager_V3()