OSIT-AE-API-FastAPI/documentation/PROJECT__Aether_API_Websockets_v3.md
Scott Idem 48c3ce76f0 feat(websockets): implement WebSockets V3 with granular Redis Pub/Sub
- Introduced WS_Message_V3 standardized Pydantic model and WS_Manager_V3.
- Implemented /v3/ws/ endpoint with granular Redis routing to solve "noisy neighbor" scaling issues.
- Added presence tracking using Redis Sets for group coordination.
- Comprehensive test suite added (unit and integration) covering models, manager, and routing logic.
- Documentation: Created V3 Frontend WebSocket Guide and Project design spec.
- Updated main Frontend API guide and tests README with new standards.
2026-01-30 14:44:02 -05:00


# Project: Aether API WebSockets V3
## 1. Overview
The goal of WebSockets V3 is to provide a high-performance, scalable, and standardized real-time communication layer for the Aether Platform. This version focuses on efficient message routing through granular Redis Pub/Sub channels, integration with the **Vision ID** (string-based) pattern, and strict data validation via Pydantic.
The primary use case is **Group Coordination**: allowing a "controller" client to send commands or messages to one or more "worker" clients within the same group.
## 2. Analysis of Previous Versions
### V1: `websockets.py` (Memory-Based)
* **Mechanism**: Kept active `WebSocket` objects in an in-memory Python list (`active_connections`).
* **Limitation**: Did not scale across multiple Docker containers. Clients on instance A could not communicate with clients on instance B.
* **Feature**: Basic support for `direct`, `group`, and `broadcast`.
### V2: `websockets_redis.py` (Global Pub/Sub)
* **Mechanism**: Uses `redis.asyncio` to publish all messages to a single `channel:ws`.
* **Limitation**: **"Noisy Neighbor" Problem**. Every API instance receives *every* message sent across the entire platform and must filter them in Python code (`if data.get('target') == 'group'`). This wastes CPU and network bandwidth at scale.
* **Feature**: Solved multi-instance connectivity.
## 3. V3 Architecture: Granular Pub/Sub
### Granular Redis Channels
V3 will move filtering from Python to Redis by giving each destination its own channel name. Each connection will subscribe only to the channels relevant to its client:
1. **Client Channel**: `ws:client:{client_id_random}` (For Direct Messages)
2. **Group Channel**: `ws:group:{group_id_random}` (For Group Messages)
3. **Global Channel**: `ws:broadcast` (For System-wide Messages)
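
The channel naming above can be sketched as a few small helpers. This is a minimal illustration rather than the actual `WS_Manager_V3` code: the function names (`client_channel`, `group_channel`, `channels_for`) are hypothetical, and only the channel strings come from this spec.

```python
from typing import List

# Channel for system-wide messages, as named in this spec.
BROADCAST_CHANNEL = "ws:broadcast"


def client_channel(client_id_random: str) -> str:
    """Channel that carries direct messages for one client."""
    return f"ws:client:{client_id_random}"


def group_channel(group_id_random: str) -> str:
    """Channel that carries messages for every client in a group."""
    return f"ws:group:{group_id_random}"


def channels_for(group_id_random: str, client_id_random: str) -> List[str]:
    """All channels a single connection subscribes to (hypothetical helper)."""
    return [
        client_channel(client_id_random),
        group_channel(group_id_random),
        BROADCAST_CHANNEL,
    ]
```

Because each connection subscribes to exactly these three channels, an API instance only receives traffic addressed to clients it is actually serving.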
### Vision ID Integration
* All IDs in the WebSocket path and payload will be string-based `id_random` values.
* Path format: `/v3/ws/group/{group_id_random}/client/{client_id_random}`
### Standardized Message Schema
All V3 messages will follow a strict Pydantic model to ensure consistency between different device types.
```python
from datetime import datetime
from typing import Any, Dict, Optional

from pydantic import BaseModel


class WS_Message_V3(BaseModel):
    version: str = "3"
    msg_type: str  # 'msg', 'cmd', 'heartbeat', 'presence'
    target: str  # 'direct', 'group', 'broadcast', 'echo'
    from_id: str  # client_id_random
    to_id: Optional[str] = None  # target client_id_random (for direct)
    group_id: Optional[str] = None  # target group_id_random (for group)
    cmd: Optional[str] = None  # Specific command string
    msg: Optional[str] = None  # Human-readable message
    payload: Dict[str, Any]  # Flexible JSON data
    sent_at: datetime
```
## 4. Backend Implementation Plan
### Phase 1: Library Layer (`app/lib_websockets_v3.py`)
* Define the `WS_Message_V3` model.
* Implement `WS_Manager_V3` to handle Redis connections and channel string generation.
* Add presence tracking using Redis Sets (`SADD` on connect, `SREM` on disconnect).
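
The presence tracking described above could look roughly like the sketch below. It assumes an async Redis client exposing `sadd`, `srem`, and `smembers` (as `redis.asyncio` does) and created with `decode_responses=True`. The key format `ws:presence:{group_id_random}` and the helper names are assumptions, not part of this spec, and `FakeRedis` is an in-memory stand-in used only so the example runs without a server.

```python
import asyncio
from typing import Dict, Set


def presence_key(group_id_random: str) -> str:
    # Hypothetical key format; the spec does not name the presence key.
    return f"ws:presence:{group_id_random}"


async def mark_connected(redis, group_id_random: str, client_id_random: str) -> None:
    """SADD the client to its group's presence set on connect."""
    await redis.sadd(presence_key(group_id_random), client_id_random)


async def mark_disconnected(redis, group_id_random: str, client_id_random: str) -> None:
    """SREM the client from its group's presence set on disconnect."""
    await redis.srem(presence_key(group_id_random), client_id_random)


async def group_members(redis, group_id_random: str) -> Set[str]:
    """Return the client IDs currently present in the group."""
    return set(await redis.smembers(presence_key(group_id_random)))


class FakeRedis:
    """In-memory stand-in exposing only the three set commands used above."""

    def __init__(self) -> None:
        self._sets: Dict[str, Set[str]] = {}

    async def sadd(self, name: str, *values: str) -> int:
        members = self._sets.setdefault(name, set())
        before = len(members)
        members.update(values)
        return len(members) - before

    async def srem(self, name: str, *values: str) -> int:
        members = self._sets.get(name, set())
        before = len(members)
        members.difference_update(values)
        return before - len(members)

    async def smembers(self, name: str) -> Set[str]:
        return set(self._sets.get(name, set()))


async def demo() -> Set[str]:
    redis = FakeRedis()
    await mark_connected(redis, "grp1", "controller")
    await mark_connected(redis, "grp1", "worker-a")
    await mark_disconnected(redis, "grp1", "controller")
    return await group_members(redis, "grp1")
```

Running `asyncio.run(demo())` leaves only `worker-a` in the set for `grp1`.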
### Phase 2: Router Layer (`app/routers/websockets_v3.py`)
* Implement the `/v3/ws/...` endpoint.
* **Receiver Loop**: Receives JSON from client -> Validates -> Publishes to correct Redis channel.
* **Sender Loop**: Listens to multiple Redis channels -> Forwards messages to the client.
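
The "publishes to correct Redis channel" step of the receiver loop can be sketched as a pure function over an already-validated message. The names `resolve_channel` and `RoutingError` are hypothetical. Routing `echo` back to the sender's own client channel is an assumption, since this spec does not define how `echo` is delivered.

```python
from typing import Any, Dict


class RoutingError(ValueError):
    """Raised when a message cannot be mapped to a channel (hypothetical)."""


def resolve_channel(message: Dict[str, Any]) -> str:
    """Map a validated V3 message to the Redis channel it is published on."""
    target = message.get("target")
    if target == "direct":
        to_id = message.get("to_id")
        if not to_id:
            raise RoutingError("direct messages require to_id")
        return f"ws:client:{to_id}"
    if target == "group":
        group_id = message.get("group_id")
        if not group_id:
            raise RoutingError("group messages require group_id")
        return f"ws:group:{group_id}"
    if target == "broadcast":
        return "ws:broadcast"
    if target == "echo":
        # Assumption: echo is sent back on the sender's own client channel.
        return f"ws:client:{message['from_id']}"
    raise RoutingError(f"unknown target: {target!r}")
```

Keeping this step free of I/O makes the routing rules easy to unit test separately from the Redis connection.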
### Phase 3: Integration
* Register the router in `app/routers/registry.py`.
* Ensure legacy endpoints (`/ws/group/...`) remain functional in `websockets_redis.py`.
## 5. Frontend Integration & Changes
The frontend will need several updates to support the V3 protocol:
1. **Connection URL**: Update connection logic to use the `/v3/` prefix.
    * *Old*: `ws://api.domain.com/ws/group/{id}/client/{id}`
    * *New*: `ws://api.domain.com/v3/ws/group/{id}/client/{id}`
2. **Payload Wrapping**: All outgoing messages must be wrapped in the `WS_Message_V3` structure.
    * Instead of sending raw text or simple JSON, send the structured object.
3. **Targeting Logic**:
    * To send to the group, set `target: "group"`.
    * To send to one specific device, set `target: "direct"` and provide `to_id`.
4. **Heartbeats**: The frontend should ideally send a `msg_type: "heartbeat"` every 30-60 seconds to keep the connection alive and update presence in Redis.
5. **Response Handling**: Incoming messages will now have a consistent shape, making it easier to route data to internal app state or components.
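
The client-side changes above (new URL, payload wrapping, targeting) can be illustrated with a small sketch. It is written in Python to match the rest of this document, for example for a test client, and a browser frontend would do the equivalent in its own language. The helper names `build_url` and `build_message` are hypothetical. The field names and URL shape come from this spec.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_url(host: str, group_id_random: str, client_id_random: str) -> str:
    """V3 connection URL using the /v3/ prefix."""
    return f"ws://{host}/v3/ws/group/{group_id_random}/client/{client_id_random}"


def build_message(
    from_id: str,
    target: str,
    msg_type: str = "msg",
    *,
    to_id: Optional[str] = None,
    group_id: Optional[str] = None,
    cmd: Optional[str] = None,
    msg: Optional[str] = None,
    payload: Optional[Dict[str, Any]] = None,
) -> str:
    """Wrap outgoing data in the WS_Message_V3 shape and serialize to JSON."""
    envelope = {
        "version": "3",
        "msg_type": msg_type,
        "target": target,
        "from_id": from_id,
        "to_id": to_id,
        "group_id": group_id,
        "cmd": cmd,
        "msg": msg,
        "payload": payload or {},
        # ISO 8601 timestamp in UTC.
        "sent_at": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(envelope)


# A controller sending a command to one specific worker.
direct_cmd = build_message(
    "controller", "direct", "cmd", to_id="worker-a", cmd="reload"
)

# A heartbeat, which the frontend would send every 30 to 60 seconds.
heartbeat = build_message("controller", "group", "heartbeat", group_id="grp1")
```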
## 6. Security & Safety
* **API Key Verification**: The WebSocket handshake should support optional verification of the `X-Aether-API-Key` during the upgrade request.
* **Isolation**: V3 will use its own Redis database or a strict prefixing strategy to ensure messages never bleed into legacy channels.
* **Error Handling**: Standardize the close codes (e.g., 4000 for invalid message schema).
## 7. Verification Plan
* Create `tests/e2e/test_e2e_v3_websockets.py`.
* Use the `websockets` Python library to simulate multiple concurrent clients.
* Test cross-instance communication (if possible in the test environment).