Project: Aether API WebSockets V3
1. Overview
The goal of WebSockets V3 is to provide a high-performance, scalable, and standardized real-time communication layer for the Aether Platform. This version focuses on efficient message routing using granular Redis Pub/Sub channels, integration with the Vision ID (string-based) pattern, and strict data validation via Pydantic.
The primary use case is Group Coordination: allowing a "controller" client to send commands or messages to one or more "worker" clients within the same group.
2. Analysis of Previous Versions
V1: websockets.py (Memory-Based)
- Mechanism: Maintained a list of WebSocket objects in a Python list (active_connections).
- Limitation: Did not scale across multiple Docker containers. Clients on instance A could not communicate with clients on instance B.
- Feature: Basic support for direct, group, and broadcast.
V2: websockets_redis.py (Global Pub/Sub)
- Mechanism: Uses redis.asyncio to publish all messages to a single channel:ws.
- Limitation: "Noisy Neighbor" Problem. Every API instance receives every message sent across the entire platform and must filter them in Python code (if data.get('target') == 'group'). This wastes CPU and network bandwidth at scale.
- Feature: Solved multi-instance connectivity.
3. V3 Architecture: Granular Pub/Sub
Granular Redis Channels
V3 will move filtering from Python to Redis by using specific channel names. A client will subscribe only to the channels relevant to them:
- Client Channel: ws:client:{client_id_random} (For Direct Messages)
- Group Channel: ws:group:{group_id_random} (For Group Messages)
- Global Channel: ws:broadcast (For System-wide Messages)
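The channel layout above can be sketched as a few string helpers. This is a minimal illustration, not the planned WS_Manager_V3 code; the function names (client_channel, group_channel, channels_for) and the sample IDs are hypothetical.

```python
from typing import List

# Channel name for system-wide messages, as listed above.
BROADCAST_CHANNEL = "ws:broadcast"


def client_channel(client_id_random: str) -> str:
    """Channel that carries direct messages for one client."""
    return f"ws:client:{client_id_random}"


def group_channel(group_id_random: str) -> str:
    """Channel that carries messages for every client in one group."""
    return f"ws:group:{group_id_random}"


def channels_for(client_id_random: str, group_id_random: str) -> List[str]:
    """Every channel a single connected client would subscribe to."""
    return [
        client_channel(client_id_random),
        group_channel(group_id_random),
        BROADCAST_CHANNEL,
    ]


# Hypothetical IDs, for illustration only.
print(channels_for("c_9f2k", "g_41ab"))
```

Because each instance subscribes only to the channels of its connected clients, Redis delivers a group message only to instances that host a member of that group.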
Vision ID Integration
- All IDs in the WebSocket path and payload will be string-based id_random values.
- Path format: /v3/ws/group/{group_id_random}/client/{client_id_random}
Standardized Message Schema
All V3 messages will follow a strict Pydantic model to ensure consistency between different device types.
from datetime import datetime
from typing import Any, Dict, Optional

from pydantic import BaseModel


class WS_Message_V3(BaseModel):
    version: str = "3"
    msg_type: str                    # 'msg', 'cmd', 'heartbeat', 'presence'
    target: str                      # 'direct', 'group', 'broadcast', 'echo'
    from_id: str                     # client_id_random
    # Optional fields default to None so senders may omit them.
    to_id: Optional[str] = None      # target client_id_random (for direct)
    group_id: Optional[str] = None   # target group_id_random (for group)
    cmd: Optional[str] = None        # Specific command string
    msg: Optional[str] = None        # Human-readable message
    payload: Dict[str, Any]          # Flexible JSON data
    sent_at: datetime
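The type annotations alone do not tie target to the ID fields that go with it. The sketch below shows one way such a cross-field check could look. It works on plain dicts so that it runs without Pydantic, and the same logic could live in a model validator. The function name targeting_errors is hypothetical, and requiring group_id for group messages is an assumption (the server could also take the group from the connection path).

```python
from typing import Any, Dict, List

# Target values listed in the schema comment.
VALID_TARGETS = {"direct", "group", "broadcast", "echo"}


def targeting_errors(message: Dict[str, Any]) -> List[str]:
    """Return a list of targeting problems; an empty list means the message is consistent."""
    errors: List[str] = []
    target = message.get("target")
    if target not in VALID_TARGETS:
        errors.append(f"unknown target: {target!r}")
    elif target == "direct" and not message.get("to_id"):
        errors.append("target 'direct' requires to_id")
    elif target == "group" and not message.get("group_id"):
        # Assumption: group messages must name their group explicitly.
        errors.append("target 'group' requires group_id")
    return errors


print(targeting_errors({"target": "direct"}))
# prints ["target 'direct' requires to_id"]
```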
4. Backend Implementation Plan
Phase 1: Library Layer (app/lib_websockets_v3.py)
- Define the WS_Message_V3 model.
- Implement WS_Manager_V3 to handle Redis connections and channel string generation.
- Add presence tracking using Redis Sets (SADD on connect, SREM on disconnect).
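The presence tracking step could look roughly like the sketch below. The key layout (ws:presence:group:{group_id_random}) and the function names are assumptions, not part of the plan. FakeRedis is an in-memory stand-in so the example runs without a server; it mimics only the sadd, srem, and smembers calls, which a redis.asyncio client also exposes as awaitables.

```python
import asyncio
from collections import defaultdict
from typing import Set


def presence_key(group_id_random: str) -> str:
    """Hypothetical key layout: one Redis Set of client IDs per group."""
    return f"ws:presence:group:{group_id_random}"


async def mark_connected(redis, group_id_random: str, client_id_random: str) -> None:
    """SADD the client to its group's presence set on connect."""
    await redis.sadd(presence_key(group_id_random), client_id_random)


async def mark_disconnected(redis, group_id_random: str, client_id_random: str) -> None:
    """SREM the client from its group's presence set on disconnect."""
    await redis.srem(presence_key(group_id_random), client_id_random)


class FakeRedis:
    """In-memory stand-in that implements only the three set commands used here."""

    def __init__(self) -> None:
        self._sets = defaultdict(set)

    async def sadd(self, key: str, *members: str) -> int:
        before = len(self._sets[key])
        self._sets[key].update(members)
        return len(self._sets[key]) - before

    async def srem(self, key: str, *members: str) -> int:
        removed = len(self._sets[key] & set(members))
        self._sets[key].difference_update(members)
        return removed

    async def smembers(self, key: str) -> Set[str]:
        return set(self._sets[key])


async def demo() -> Set[str]:
    redis = FakeRedis()
    await mark_connected(redis, "g1", "controller")
    await mark_connected(redis, "g1", "worker-1")
    await mark_disconnected(redis, "g1", "controller")
    return await redis.smembers(presence_key("g1"))


print(asyncio.run(demo()))  # prints {'worker-1'}
```

With a real client, members come back as bytes unless the connection is created with decode_responses=True. An instance that crashes never runs its SREM calls, so stale members would need a separate cleanup mechanism.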
Phase 2: Router Layer (app/routers/websockets_v3.py)
- Implement the /v3/ws/... endpoint.
- Receiver Loop: Receives JSON from client -> Validates -> Publishes to correct Redis channel.
- Sender Loop: Listens to multiple Redis channels -> Forwards messages to the client.
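The "publishes to correct Redis channel" step of the receiver loop amounts to mapping a validated message to one channel name. A minimal sketch follows; the function name publish_channel is hypothetical, and the handling of echo (routing back to the sender's own client channel) is an assumption, since the echo semantics are not specified here.

```python
from typing import Any, Dict


def publish_channel(message: Dict[str, Any]) -> str:
    """Pick the Redis channel for an already validated V3 message."""
    target = message["target"]
    if target == "direct":
        return f"ws:client:{message['to_id']}"
    if target == "group":
        return f"ws:group:{message['group_id']}"
    if target == "broadcast":
        return "ws:broadcast"
    if target == "echo":
        # Assumption: echo is delivered back to the sender's own channel.
        return f"ws:client:{message['from_id']}"
    raise ValueError(f"unknown target: {target!r}")


# Hypothetical controller-to-group command.
command = {
    "target": "group",
    "from_id": "controller",
    "group_id": "g_41ab",
    "cmd": "start",
}
print(publish_channel(command))  # prints ws:group:g_41ab
```

The receiver loop would then publish the serialized message to the returned channel, and the sender loop on whichever instance holds a subscriber forwards it to the socket.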
Phase 3: Integration
- Register the router in app/routers/registry.py.
- Ensure legacy endpoints (/ws/group/...) remain functional in websockets_redis.py.
5. Frontend Integration & Changes
The frontend will need several updates to support the V3 protocol:
- Connection URL: Update connection logic to use the /v3/ prefix.
  - Old: ws://api.domain.com/ws/group/{id}/client/{id}
  - New: ws://api.domain.com/v3/ws/group/{id}/client/{id}
- Payload Wrapping: All outgoing messages must be wrapped in the WS_Message_V3 structure.
  - Instead of sending raw text or simple JSON, send the structured object.
- Targeting Logic:
  - To send to the group, set target: "group".
  - To send to one specific device, set target: "direct" and provide to_id.
- Heartbeats: The frontend should ideally send a msg_type: "heartbeat" every 30-60 seconds to keep the connection alive and update presence in Redis.
- Response Handling: Incoming messages will now have a consistent shape, making it easier to route data to internal app state or components.
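The wrapping and targeting rules above can be sketched as a small client-side builder. It is written in Python rather than the frontend's own language, which also makes it usable from a Python test client. The function name build_message and its defaults are hypothetical, as is the choice to send heartbeats with target "echo" and an empty payload.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_message(
    from_id: str,
    target: str,
    msg_type: str = "msg",
    to_id: Optional[str] = None,
    group_id: Optional[str] = None,
    cmd: Optional[str] = None,
    msg: Optional[str] = None,
    payload: Optional[Dict[str, Any]] = None,
) -> str:
    """Serialize an outgoing message in the WS_Message_V3 shape."""
    if target == "direct" and not to_id:
        raise ValueError("target 'direct' requires to_id")
    body = {
        "version": "3",
        "msg_type": msg_type,
        "target": target,
        "from_id": from_id,
        "to_id": to_id,
        "group_id": group_id,
        "cmd": cmd,
        "msg": msg,
        "payload": payload or {},
        # Timezone-aware UTC timestamp in ISO 8601 form.
        "sent_at": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(body)


# Group command from a controller, then a heartbeat (hypothetical IDs).
print(build_message("controller", "group", msg_type="cmd", group_id="g_41ab", cmd="start"))
print(build_message("controller", "echo", msg_type="heartbeat"))
```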
6. Security & Safety
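- API Key Verification: WebSocket handshakes should optionally verify the X-Aether-API-Key during the upgrade request.
- Isolation: V3 will use its own Redis instance or a strict channel prefixing strategy to ensure messages never bleed into legacy channels. A separate Redis database number is not enough on its own, because Pub/Sub channels are shared across all databases of an instance; it would isolate only keys such as the presence sets.
- Error Handling: Standardize the close codes (e.g., 4000 for invalid message schema).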
7. Verification Plan
- Create tests/e2e/test_e2e_v3_websockets.py.
- Use the websockets Python library to simulate multiple concurrent clients.
- Test cross-instance communication (if possible in the test environment).