V3 WebSocket: wire auth dependency, add heartbeat presence refresh, update frontend guide (wss://, auth query params, schema clarifications)
This commit is contained in:
@@ -6,7 +6,7 @@ from typing import Optional
|
|||||||
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
|
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
|
||||||
from pydantic import ValidationError
|
from pydantic import ValidationError
|
||||||
|
|
||||||
from app.lib_general_v3 import get_account_context_optional
|
from app.lib_general_v3 import AccountContext, get_account_context_optional
|
||||||
from app.lib_websockets_v3 import WS_Message_V3, ws_manager_v3
|
from app.lib_websockets_v3 import WS_Message_V3, ws_manager_v3
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
@@ -18,20 +18,23 @@ async def v3_ws_endpoint(
|
|||||||
websocket: WebSocket,
|
websocket: WebSocket,
|
||||||
group_id: str,
|
group_id: str,
|
||||||
client_id: str,
|
client_id: str,
|
||||||
|
account: AccountContext = Depends(get_account_context_optional),
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Main V3 WebSocket Endpoint.
|
Main V3 WebSocket Endpoint.
|
||||||
Uses granular Redis Pub/Sub for efficient message routing.
|
Uses granular Redis Pub/Sub for efficient message routing.
|
||||||
"""
|
"""
|
||||||
|
# Auth: optional — guests can connect but will be limited by downstream logic.
|
||||||
|
# Pass api_key and jwt as query params since browsers cannot set custom WS headers.
|
||||||
|
log.info(f"WS V3: Client {client_id} connected to group {group_id} (auth={account.auth_method})")
|
||||||
await websocket.accept()
|
await websocket.accept()
|
||||||
log.info(f"WS V3: Client {client_id} connected to group {group_id}")
|
|
||||||
|
|
||||||
# 1. Presence & Subscription Setup
|
# 1. Presence & Subscription Setup
|
||||||
await ws_manager_v3.update_presence(client_id, group_id, online=True)
|
await ws_manager_v3.update_presence(client_id, group_id, online=True)
|
||||||
|
|
||||||
redis_conn = await ws_manager_v3.get_redis()
|
redis_conn = await ws_manager_v3.get_redis()
|
||||||
pubsub = redis_conn.pubsub()
|
pubsub = redis_conn.pubsub()
|
||||||
|
|
||||||
channels = ws_manager_v3.get_channel_names(client_id, group_id)
|
channels = ws_manager_v3.get_channel_names(client_id, group_id)
|
||||||
await pubsub.subscribe(*channels)
|
await pubsub.subscribe(*channels)
|
||||||
|
|
||||||
@@ -42,16 +45,22 @@ async def v3_ws_endpoint(
|
|||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
data = await websocket.receive_json()
|
data = await websocket.receive_json()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Enforce standardized schema
|
# Enforce standardized schema
|
||||||
# Force from_id and group_id from path for security
|
# Force from_id and group_id from path for security
|
||||||
data['from_id'] = client_id
|
data['from_id'] = client_id
|
||||||
data['group_id'] = group_id
|
data['group_id'] = group_id
|
||||||
|
|
||||||
message = WS_Message_V3(**data)
|
message = WS_Message_V3(**data)
|
||||||
|
|
||||||
|
# Refresh presence TTL on every heartbeat so long-lived clients
|
||||||
|
# don't drop out of the presence set before they disconnect.
|
||||||
|
if message.msg_type == 'heartbeat':
|
||||||
|
await ws_manager_v3.update_presence(client_id, group_id, online=True)
|
||||||
|
|
||||||
await ws_manager_v3.publish_message(message)
|
await ws_manager_v3.publish_message(message)
|
||||||
|
|
||||||
except ValidationError as ve:
|
except ValidationError as ve:
|
||||||
log.warning(f"WS V3: Validation error from {client_id}: {ve.json()}")
|
log.warning(f"WS V3: Validation error from {client_id}: {ve.json()}")
|
||||||
await websocket.send_json({
|
await websocket.send_json({
|
||||||
@@ -59,7 +68,7 @@ async def v3_ws_endpoint(
|
|||||||
"details": ve.errors(),
|
"details": ve.errors(),
|
||||||
"version": "3"
|
"version": "3"
|
||||||
})
|
})
|
||||||
|
|
||||||
except WebSocketDisconnect:
|
except WebSocketDisconnect:
|
||||||
log.info(f"WS V3: Client {client_id} disconnected (receiver)")
|
log.info(f"WS V3: Client {client_id} disconnected (receiver)")
|
||||||
raise
|
raise
|
||||||
@@ -72,17 +81,17 @@ async def v3_ws_endpoint(
|
|||||||
while True:
|
while True:
|
||||||
# Use a small timeout to allow for clean task cancellation
|
# Use a small timeout to allow for clean task cancellation
|
||||||
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=0.1)
|
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=0.1)
|
||||||
|
|
||||||
if message and message['type'] == 'message':
|
if message and message['type'] == 'message':
|
||||||
# Forward the structured message directly
|
# Forward the structured message directly
|
||||||
# Redis stores them as JSON strings
|
# Redis stores them as JSON strings
|
||||||
await websocket.send_text(message['data'])
|
await websocket.send_text(message['data'])
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.exception(f"WS V3: Unexpected error in sender for {client_id}")
|
log.exception(f"WS V3: Unexpected error in sender for {client_id}")
|
||||||
|
|
||||||
# --- Execution Loop ---
|
# --- Execution Loop ---
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Run both loops concurrently. If either fails or client disconnects, clean up.
|
# Run both loops concurrently. If either fails or client disconnects, clean up.
|
||||||
# asyncio.wait with FIRST_COMPLETED ensures we don't leave orphan tasks.
|
# asyncio.wait with FIRST_COMPLETED ensures we don't leave orphan tasks.
|
||||||
@@ -93,14 +102,14 @@ async def v3_ws_endpoint(
|
|||||||
],
|
],
|
||||||
return_when=asyncio.FIRST_COMPLETED,
|
return_when=asyncio.FIRST_COMPLETED,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Cancel remaining task (usually the sender if the receiver caught a disconnect)
|
# Cancel remaining task (usually the sender if the receiver caught a disconnect)
|
||||||
for task in pending:
|
for task in pending:
|
||||||
task.cancel()
|
task.cancel()
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.error(f"WS V3: Loop error for {client_id}: {e}")
|
log.error(f"WS V3: Loop error for {client_id}: {e}")
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# 2. Cleanup
|
# 2. Cleanup
|
||||||
log.info(f"WS V3: Cleaning up connection for {client_id}")
|
log.info(f"WS V3: Cleaning up connection for {client_id}")
|
||||||
|
|||||||
@@ -22,14 +22,33 @@ This guide explains how to implement real-time communication using the **Aether
|
|||||||
The V3 WebSocket path requires both a `group_id` and a `client_id` (using **Vision ID** random strings).
|
The V3 WebSocket path requires both a `group_id` and a `client_id` (using **Vision ID** random strings).
|
||||||
|
|
||||||
```text
|
```text
|
||||||
ws://[api_domain]/v3/ws/group/{group_id}/client/{client_id}
|
wss://[api_domain]/v3/ws/group/{group_id}/client/{client_id}
|
||||||
```
|
```
|
||||||
|
|
||||||
### B. Connection Example (TypeScript)
|
> Use `ws://` for local development and `wss://` in production (any HTTPS site). The Nginx config must include the Upgrade block — see Section 6.
|
||||||
|
|
||||||
|
### B. Authentication
|
||||||
|
Browsers **cannot** set custom HTTP headers on WebSocket connections. Pass the API Key and account context as **query parameters** instead:
|
||||||
|
|
||||||
|
| Parameter | Purpose | Example |
|
||||||
|
| :--- | :--- | :--- |
|
||||||
|
| `api_key` | Entry Ticket (machine auth) | `?api_key=<your_app_key>` |
|
||||||
|
| `jwt` | Visa (user / account context) | `&jwt=<token>` |
|
||||||
|
| `x_account_id` | Alt account context | `&x_account_id=<account_id>` |
|
||||||
|
|
||||||
|
**Full example URL:**
|
||||||
|
```text
|
||||||
|
wss://dev-api.oneskyit.com/v3/ws/group/{group_id}/client/{client_id}?api_key=<key>&jwt=<token>
|
||||||
|
```
|
||||||
|
|
||||||
|
### C. Connection Example (TypeScript)
|
||||||
```ts
|
```ts
|
||||||
const group_id = "group_abc123"; // Random ID
|
const group_id = "group_abc123"; // Random Vision ID
|
||||||
const client_id = "device_xyz789"; // Random ID
|
const client_id = "device_xyz789"; // Random Vision ID
|
||||||
const ws_url = `ws://api.oneskyit.com/v3/ws/group/${group_id}/client/${client_id}`;
|
const api_key = import.meta.env.VITE_API_KEY;
|
||||||
|
const jwt = getSessionToken(); // your JWT helper
|
||||||
|
|
||||||
|
const ws_url = `wss://dev-api.oneskyit.com/v3/ws/group/${group_id}/client/${client_id}?api_key=${api_key}&jwt=${jwt}`;
|
||||||
|
|
||||||
const socket = new WebSocket(ws_url);
|
const socket = new WebSocket(ws_url);
|
||||||
|
|
||||||
@@ -47,16 +66,18 @@ All messages sent and received over V3 must follow the standardized **WS_Message
|
|||||||
### Message Fields
|
### Message Fields
|
||||||
| Field | Type | Required | Description |
|
| Field | Type | Required | Description |
|
||||||
| :--- | :--- | :--- | :--- |
|
| :--- | :--- | :--- | :--- |
|
||||||
| `version` | string | Auto | Always `"3"`. |
|
| `version` | string | Auto | Always `"3"`. Set by server. |
|
||||||
| `msg_type` | string | Yes | `'msg'`, `'cmd'`, `'heartbeat'`, `'presence'` |
|
| `msg_type` | string | Yes | `'msg'`, `'cmd'`, `'heartbeat'`, `'presence'` |
|
||||||
| `target` | string | Yes | `'direct'`, `'group'`, `'broadcast'`, `'echo'` |
|
| `target` | string | Yes | `'direct'`, `'group'`, `'broadcast'`, `'echo'` |
|
||||||
| `from_id` | string | No* | Client ID of sender (Auto-filled by server if omitted). |
|
| `from_id` | string | Auto | **Server fills this from the URL path** — do not send. |
|
||||||
| `to_id` | string | No | Target Client ID (Required for `target: 'direct'`). |
|
| `to_id` | string | Conditional | Target Client ID. Required when `target` is `'direct'`. |
|
||||||
| `group_id` | string | No* | Target Group ID (Auto-filled by server if omitted). |
|
| `group_id` | string | Auto | **Server fills this from the URL path** — do not send. |
|
||||||
| `cmd` | string | No | Specific action keyword (e.g., `'RELOAD'`). |
|
| `cmd` | string | No | Specific action keyword (e.g., `'RELOAD'`). |
|
||||||
| `msg` | string | No | Human-readable text content. |
|
| `msg` | string | No | Human-readable text content. |
|
||||||
| `payload` | object | No | Flexible key-value data. |
|
| `payload` | object | No | Flexible key-value data. |
|
||||||
| `sent_at` | string | Auto | ISO 8601 Timestamp. |
|
| `sent_at` | string | Auto | ISO 8601 Timestamp. Set by server. |
|
||||||
|
|
||||||
|
> **Frontend tip:** Only send `msg_type`, `target`, and whatever content fields you need (`msg`, `cmd`, `payload`, `to_id`). The server enforces `from_id`, `group_id`, and `sent_at` from the connection context, preventing spoofing.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@@ -121,7 +142,12 @@ Used for remote control or orchestration.
|
|||||||
```
|
```
|
||||||
|
|
||||||
### Heartbeats (`heartbeat`)
|
### Heartbeats (`heartbeat`)
|
||||||
Keep the connection alive and refresh presence in the backend. Should be sent every 30-60 seconds.
|
Keep the connection alive and **refresh presence** in the backend. Should be sent every 30-60 seconds.
|
||||||
|
|
||||||
|
- The server intercepts `heartbeat` messages and refreshes the Redis presence TTL (1 hour window) before echoing back.
|
||||||
|
- Without periodic heartbeats, a client idle for >1 hour may disappear from the presence set even while still connected.
|
||||||
|
- Use `target: 'echo'` so the server sends the heartbeat straight back — useful for measuring round-trip latency.
|
||||||
|
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"msg_type": "heartbeat",
|
"msg_type": "heartbeat",
|
||||||
|
|||||||
Reference in New Issue
Block a user