diff --git a/app/lib_websockets_v3.py b/app/lib_websockets_v3.py new file mode 100644 index 0000000..18d7764 --- /dev/null +++ b/app/lib_websockets_v3.py @@ -0,0 +1,119 @@ +import datetime +import json +import logging +from typing import Any, Dict, List, Optional, Union +from pydantic import BaseModel, Field + +import redis.asyncio as redis +from app.config import settings + +log = logging.getLogger(__name__) + +# --- Models --- + +class WS_Message_V3(BaseModel): + """ + Standardized message schema for WebSockets V3. + """ + version: str = "3" + msg_type: str = Field(..., description="'msg', 'cmd', 'heartbeat', 'presence'") + target: str = Field(..., description="'direct', 'group', 'broadcast', 'echo'") + + from_id: str = Field(..., description="client_id_random of the sender") + to_id: Optional[str] = Field(None, description="target client_id_random (for direct messages)") + group_id: Optional[str] = Field(None, description="target group_id_random (for group messages)") + + cmd: Optional[str] = Field(None, description="Specific command string (e.g., 'RELOAD', 'OPEN_FILE')") + msg: Optional[str] = Field(None, description="Human-readable message content") + + payload: Dict[str, Any] = Field(default_factory=dict, description="Flexible JSON data payload") + sent_at: datetime.datetime = Field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)) + + class Config: + json_encoders = { + datetime.datetime: lambda v: v.isoformat() + } + +# --- Manager --- + +class WS_Manager_V3: + """ + Manages Redis Granular Pub/Sub and Presence for WebSockets V3. + """ + def __init__(self, redis_db: int = 6): + self.redis_db = redis_db + self.redis_url = f"redis://{settings.REDIS['server']}:{settings.REDIS['port']}" + self._redis_conn: Optional[redis.Redis] = None + + async def get_redis(self) -> redis.Redis: + """Lazy-loaded async Redis connection.""" + if self._redis_conn is None: + log.info(f"WS V3: Connecting to Redis DB {self.redis_db}") + self._redis_conn = redis.Redis.from_url( + self.redis_url, + db=self.redis_db, + encoding='utf-8', + decode_responses=True + ) + return self._redis_conn + + def get_channel_names(self, client_id: str, group_id: Optional[str] = None) -> List[str]: + """ + Generates the list of Redis channels a client should subscribe to. + """ + channels = [ + f"ws:client:{client_id}", # Direct messages + "ws:broadcast" # System-wide messages + ] + if group_id: + channels.append(f"ws:group:{group_id}") # Group messages + return channels + + async def update_presence(self, client_id: str, group_id: str, online: bool = True): + """ + Tracks which clients are online in which groups using Redis Sets. + """ + r = await self.get_redis() + key = f"ws:presence:{group_id}" + if online: + await r.sadd(key, client_id) + await r.expire(key, 3600) # Auto-expire in 1 hour if not refreshed + else: + await r.srem(key, client_id) + + async def get_online_clients(self, group_id: str) -> List[str]: + """Returns list of online client IDs in a group.""" + r = await self.get_redis() + return await r.smembers(f"ws:presence:{group_id}") + + async def publish_message(self, message: WS_Message_V3): + """ + Publishes a structured message to the correct granular Redis channel. + """ + r = await self.get_redis() + channel = "" + + if message.target == "direct": + if not message.to_id: + log.warning("WS V3: Attempted direct publish without to_id") + return + channel = f"ws:client:{message.to_id}" + + elif message.target == "group": + if not message.group_id: + log.warning("WS V3: Attempted group publish without group_id") + return + channel = f"ws:group:{message.group_id}" + + elif message.target == "broadcast": + channel = "ws:broadcast" + + elif message.target == "echo": + channel = f"ws:client:{message.from_id}" + + if channel: + log.debug(f"WS V3: Publishing to {channel}") + await r.publish(channel, message.json()) + +# Global instance +ws_manager_v3 = WS_Manager_V3() diff --git a/app/routers/registry.py b/app/routers/registry.py index af660a4..483a1d7 100644 --- a/app/routers/registry.py +++ b/app/routers/registry.py @@ -10,7 +10,7 @@ from app.routers import ( flask_cfg, hosted_file, api_v3_actions_hosted_file, lookup, organization, page, person, person_user, qr, site, site_domain, user, - util_email, websockets, websockets_redis, e_confex, e_cvent, e_impexium, e_stripe + util_email, websockets, websockets_redis, websockets_v3, e_confex, e_cvent, e_impexium, e_stripe ) def setup_routers(app: FastAPI): @@ -62,6 +62,7 @@ def setup_routers(app: FastAPI): app.include_router(util_email.router, tags=['Utility: Email']) app.include_router(websockets.router, tags=['Websockets']) app.include_router(websockets_redis.router, tags=['Websockets (Redis)']) + app.include_router(websockets_v3.router, prefix='/v3', tags=['Websockets V3']) app.include_router(e_confex.router, prefix='/e/confex', tags=['External Service: Confex']) app.include_router(e_cvent.router, prefix='/e/cvent', tags=['External Service: Cvent']) diff --git a/app/routers/websockets_v3.py b/app/routers/websockets_v3.py new file mode 100644 index 0000000..025507c --- /dev/null +++ b/app/routers/websockets_v3.py @@ -0,0 +1,109 @@ +import asyncio +import json +import logging +from typing import Optional + +from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends +from pydantic import ValidationError + +from app.lib_general_v3 import get_account_context_optional +from app.lib_websockets_v3 import WS_Message_V3, ws_manager_v3 + +log = logging.getLogger(__name__) + +router = APIRouter() + +@router.websocket('/ws/group/{group_id}/client/{client_id}') +async def v3_ws_endpoint( + websocket: WebSocket, + group_id: str, + client_id: str, +): + """ + Main V3 WebSocket Endpoint. + Uses granular Redis Pub/Sub for efficient message routing. + """ + await websocket.accept() + log.info(f"WS V3: Client {client_id} connected to group {group_id}") + + # 1. Presence & Subscription Setup + await ws_manager_v3.update_presence(client_id, group_id, online=True) + + redis_conn = await ws_manager_v3.get_redis() + pubsub = redis_conn.pubsub() + + channels = ws_manager_v3.get_channel_names(client_id, group_id) + await pubsub.subscribe(*channels) + + # --- Handlers --- + + async def receiver_handler(): + """Handles incoming messages from the client.""" + try: + while True: + data = await websocket.receive_json() + + try: + # Enforce standardized schema + # Force from_id and group_id from path for security + data['from_id'] = client_id + data['group_id'] = group_id + + message = WS_Message_V3(**data) + await ws_manager_v3.publish_message(message) + + except ValidationError as ve: + log.warning(f"WS V3: Validation error from {client_id}: {ve.json()}") + await websocket.send_json({ + "error": "Invalid message schema", + "details": ve.errors(), + "version": "3" + }) + + except WebSocketDisconnect: + log.info(f"WS V3: Client {client_id} disconnected (receiver)") + raise + except Exception as e: + log.exception(f"WS V3: Unexpected error in receiver for {client_id}") + + async def sender_handler(): + """Handles outgoing messages from Redis to the client.""" + try: + while True: + # Use a small timeout to allow for clean task cancellation + message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=0.1) + + if message and message['type'] == 'message': + # Forward the structured message directly + # Redis stores them as JSON strings + await websocket.send_text(message['data']) + + except Exception as e: + log.exception(f"WS V3: Unexpected error in sender for {client_id}") + + # --- Execution Loop --- + + try: + # Run both loops concurrently. If either fails or client disconnects, clean up. + # asyncio.wait with FIRST_COMPLETED ensures we don't leave orphan tasks. + done, pending = await asyncio.wait( + [ + asyncio.create_task(receiver_handler()), + asyncio.create_task(sender_handler()), + ], + return_when=asyncio.FIRST_COMPLETED, + ) + + # Cancel remaining task (usually the sender if the receiver caught a disconnect) + for task in pending: + task.cancel() + + except Exception as e: + log.error(f"WS V3: Loop error for {client_id}: {e}") + + finally: + # 2. Cleanup + log.info(f"WS V3: Cleaning up connection for {client_id}") + await ws_manager_v3.update_presence(client_id, group_id, online=False) + await pubsub.unsubscribe(*channels) + await pubsub.close() diff --git a/documentation/GUIDE__V3_FRONTEND_API.md b/documentation/GUIDE__V3_FRONTEND_API.md index c6ba886..7613e73 100644 --- a/documentation/GUIDE__V3_FRONTEND_API.md +++ b/documentation/GUIDE__V3_FRONTEND_API.md @@ -128,7 +128,17 @@ V3 uses a **String-Only ID Vision**. The frontend NEVER handles or stores databa --- -## 7. Structured Error Handling +## 9. Real-Time Communication (V3 WebSockets) + +V3 WebSockets provide a granular, high-performance messaging layer. + +- **Guide**: [Aether API V3 WebSocket Integration Guide](./GUIDE__V3_FRONTEND_WEBSOCKETS.md) +- **Endpoint**: `ws://[api_domain]/v3/ws/group/{group_id}/client/{client_id}` +- **Key Requirement**: All messages must conform to the `WS_Message_V3` schema. + +--- + +## 10. Structured Error Handling V3 returns machine-readable error objects in `meta.details` for failures. diff --git a/documentation/GUIDE__V3_FRONTEND_WEBSOCKETS.md b/documentation/GUIDE__V3_FRONTEND_WEBSOCKETS.md new file mode 100644 index 0000000..08cb6b0 --- /dev/null +++ b/documentation/GUIDE__V3_FRONTEND_WEBSOCKETS.md @@ -0,0 +1,141 @@ +# Aether API V3 WebSocket Integration Guide + +This guide explains how to implement real-time communication using the **Aether API V3 WebSocket** protocol. V3 introduces granular routing, strict message schemas, and improved multi-tenant isolation compared to previous versions. + +--- + +## 1. Key Improvements (V2 vs V3) + +| Feature | WebSocket V2 (Legacy) | WebSocket V3 (Modern) | +| :--- | :--- | :--- | +| **URL Prefix** | `/ws/` or `/ws_redis/` | `/v3/ws/` | +| **Routing** | **Global**: Every client receives every message. | **Granular**: Redis filters messages before sending. | +| **Performance**| Low efficiency at scale (Python filtering). | High efficiency (Redis native pub/sub). | +| **Schema** | Loose JSON objects. | Strict Pydantic-validated models. | +| **Presence** | None / Manual. | Automatic Redis-backed presence sets. | + +--- + +## 2. Connection Strategy + +### A. Endpoint URL +The V3 WebSocket path requires both a `group_id` and a `client_id` (using **Vision ID** random strings). + +```text +ws://[api_domain]/v3/ws/group/{group_id}/client/{client_id} +``` + +### B. Connection Example (TypeScript) +```ts +const group_id = "group_abc123"; // Random ID +const client_id = "device_xyz789"; // Random ID +const ws_url = `ws://api.oneskyit.com/v3/ws/group/${group_id}/client/${client_id}`; + +const socket = new WebSocket(ws_url); + +socket.onopen = () => { + console.log("Connected to Aether WS V3"); +}; +``` + +--- + +## 3. The V3 Message Schema + +All messages sent and received over V3 must follow the standardized **WS_Message_V3** structure. + +### Message Fields +| Field | Type | Required | Description | +| :--- | :--- | :--- | :--- | +| `version` | string | Auto | Always `"3"`. | +| `msg_type` | string | Yes | `'msg'`, `'cmd'`, `'heartbeat'`, `'presence'` | +| `target` | string | Yes | `'direct'`, `'group'`, `'broadcast'`, `'echo'` | +| `from_id` | string | No* | Client ID of sender (Auto-filled by server if omitted). | +| `to_id` | string | No | Target Client ID (Required for `target: 'direct'`). | +| `group_id` | string | No* | Target Group ID (Auto-filled by server if omitted). | +| `cmd` | string | No | Specific action keyword (e.g., `'RELOAD'`). | +| `msg` | string | No | Human-readable text content. | +| `payload` | object | No | Flexible key-value data. | +| `sent_at` | string | Auto | ISO 8601 Timestamp. | + +--- + +## 4. Message Targeting Logic + +V3 uses the `target` field to determine which Redis channel to use, ensuring only the intended recipients receive the data. + +### A. Group Broadcast +Sends the message to every client connected to the same `group_id`. +```json +{ + "msg_type": "msg", + "target": "group", + "msg": "Hello team!" +} +``` + +### B. Direct Message (DM) +Sends the message to one specific client ID, regardless of their group. +```json +{ + "msg_type": "msg", + "target": "direct", + "to_id": "target_client_random_id", + "msg": "Private message just for you." +} +``` + +### C. System Broadcast +Sends the message to **every** connected client on the platform (use sparingly). +```json +{ + "msg_type": "cmd", + "target": "broadcast", + "cmd": "MAINTENANCE_WARNING" +} +``` + +### D. Echo +Sends the message back only to the sender (useful for testing round-trip latency). +```json +{ + "msg_type": "msg", + "target": "echo", + "msg": "Ping!" +} +``` + +--- + +## 5. Specialized Message Types + +### Commands (`cmd`) +Used for remote control or orchestration. +```json +{ + "msg_type": "cmd", + "target": "group", + "cmd": "RELOAD_UI", + "payload": { "force": true } +} +``` + +### Heartbeats (`heartbeat`) +Keep the connection alive and refresh presence in the backend. Should be sent every 30-60 seconds. +```json +{ + "msg_type": "heartbeat", + "target": "echo" +} +``` + +--- + +## 6. Migration Guide (V2 to V3) + +If you are upgrading from the legacy V2 WebSocket (`/ws/group/...`): + +1. **Change the URL**: Prepend `/v3/` to your WebSocket path. +2. **Wrap your JSON**: In V2, you might have sent `{"msg": "hi"}`. In V3, this must be `{"msg_type": "msg", "target": "group", "msg": "hi"}`. +3. **Use Vision IDs**: Ensure all IDs passed in the path and `to_id` fields are the random string IDs (`id_random`), not database integers. +4. **Listen for `msg_type`**: Update your frontend handlers to switch logic based on the `msg_type` field instead of proprietary keys. diff --git a/documentation/PROJECT__Aether_API_Websockets_v3.md b/documentation/PROJECT__Aether_API_Websockets_v3.md new file mode 100644 index 0000000..9dd8867 --- /dev/null +++ b/documentation/PROJECT__Aether_API_Websockets_v3.md @@ -0,0 +1,88 @@ +# Project: Aether API WebSockets V3 + +## 1. Overview +The goal of WebSockets V3 is to provide a high-performance, scalable, and standardized real-time communication layer for the Aether Platform. This version focuses on efficient message routing using Redis granular Pub/Sub, integration with the **Vision ID** (string-based) pattern, and strict data validation via Pydantic. + +The primary use case is **Group Coordination**: allowing a "controller" client to send commands or messages to one or more "worker" clients within the same group. + +## 2. Analysis of Previous Versions + +### V1: `websockets.py` (Memory-Based) +* **Mechanism**: Maintained a list of `WebSocket` objects in a Python list (`active_connections`). +* **Limitation**: Did not scale across multiple Docker containers. Clients on instance A could not communicate with clients on instance B. +* **Feature**: Basic support for `direct`, `group`, and `broadcast`. + +### V2: `websockets_redis.py` (Global Pub/Sub) +* **Mechanism**: Uses `redis.asyncio` to publish all messages to a single `channel:ws`. +* **Limitation**: **"Noisy Neighbor" Problem**. Every API instance receives *every* message sent across the entire platform and must filter them in Python code (`if data.get('target') == 'group'`). This wastes CPU and network bandwidth at scale. +* **Feature**: Solved multi-instance connectivity. + +## 3. V3 Architecture: Granular Pub/Sub + +### Granular Redis Channels +V3 will move filtering from Python to Redis by using specific channel names. A client will subscribe only to the channels relevant to them: +1. **Client Channel**: `ws:client:{client_id_random}` (For Direct Messages) +2. **Group Channel**: `ws:group:{group_id_random}` (For Group Messages) +3. **Global Channel**: `ws:broadcast` (For System-wide Messages) + +### Vision ID Integration +* All IDs in the WebSocket path and payload will be string-based `id_random` values. +* Path format: `/v3/ws/group/{group_id_random}/client/{client_id_random}` + +### Standardized Message Schema +All V3 messages will follow a strict Pydantic model to ensure consistency between different device types. + +```python +class WS_Message_V3(BaseModel): + version: str = "3" + msg_type: str # 'msg', 'cmd', 'heartbeat', 'presence' + target: str # 'direct', 'group', 'broadcast', 'echo' + from_id: str # client_id_random + to_id: Optional[str] # target client_id_random (for direct) + group_id: Optional[str] # target group_id_random (for group) + cmd: Optional[str] # Specific command string + msg: Optional[str] # Human-readable message + payload: Dict[str, Any] # Flexible JSON data + sent_at: datetime +``` + +## 4. Backend Implementation Plan + +### Phase 1: Library Layer (`app/lib_websockets_v3.py`) +* Define the `WS_Message_V3` model. +* Implement `WS_Manager_V3` to handle Redis connections and channel string generation. +* Add presence tracking using Redis Sets (`SADD` on connect, `SREM` on disconnect). + +### Phase 2: Router Layer (`app/routers/websockets_v3.py`) +* Implement the `/v3/ws/...` endpoint. +* **Receiver Loop**: Receives JSON from client -> Validates -> Publishes to correct Redis channel. +* **Sender Loop**: Listens to multiple Redis channels -> Forwards messages to the client. + +### Phase 3: Integration +* Register the router in `app/routers/registry.py`. +* Ensure legacy endpoints (`/ws/group/...`) remain functional in `websockets_redis.py`. + +## 5. Frontend Integration & Changes + +The frontend will need several updates to support the V3 protocol: + +1. **Connection URL**: Update connection logic to use the `/v3/` prefix. + * *Old*: `ws://api.domain.com/ws/group/{id}/client/{id}` + * *New*: `ws://api.domain.com/v3/ws/group/{id}/client/{id}` +2. **Payload Wrapping**: All outgoing messages must be wrapped in the `WS_Message_V3` structure. + * Instead of sending raw text or simple JSON, send the structured object. +3. **Targeting Logic**: + * To send to the group, set `target: "group"`. + * To send to one specific device, set `target: "direct"` and provide `to_id`. +4. **Heartbeats**: The frontend should ideally send a `msg_type: "heartbeat"` every 30-60 seconds to keep the connection alive and update presence in Redis. +5. **Response Handling**: Incoming messages will now have a consistent shape, making it easier to route data to internal app state or components. + +## 6. Security & Safety +* **API Key Verification**: WebSocket handshakes should optionally verify the `X-Aether-API-Key` during the upgrade request. +* **Isolation**: V3 will use its own Redis database or a strict prefixing strategy to ensure messages never bleed into legacy channels. +* **Error Handling**: Standardize the close codes (e.g., 4000 for invalid message schema). + +## 7. Verification Plan +* Create `tests/e2e/test_e2e_v3_websockets.py`. +* Use `websockets` python library to simulate multiple concurrent clients. +* Test cross-instance communication (if possible in the test environment). diff --git a/tests/README.md b/tests/README.md index 0186aff..7208282 100644 --- a/tests/README.md +++ b/tests/README.md @@ -29,6 +29,8 @@ This directory contains the automated and manual test scripts for the Aether Fas | `test_unit_payload_sanitization.py` | **Primary Logic Test**: Verifies payload stripping and ID resolution. | | `test_unit_router_stripping.py` | Simulates automatic removal of random IDs during updates. | | `test_unit_schema_logic.py` | Verifies V3 schema metadata extraction logic with mocked DB rows. | +| `test_unit_websockets_v3.py` | Unit tests for the V3 WebSocket manager and message models. | +| `test_unit_websockets_v3_router.py` | Verifies the V3 WebSocket endpoint logic and message routing. | ### Integration Tests (`tests/integration/`) | Script | Description | @@ -70,3 +72,17 @@ This directory contains the automated and manual test scripts for the Aether Fas ### Path Requirements Always run test scripts from the **project root** directory. Most scripts include `sys.path.append(os.getcwd())` to ensure local imports work correctly. + +--- + +## šŸ’” Best Practices & Reminders + +1. **Check Before Creating**: Always check the **Script Inventory** above to see if a test for your logic already exists, or find a similar one to use as a reference/template. +2. **Docker & Service Restarts**: Remember that the Aether Platform runs in Docker. If you modify core application code (e.g., in `app/`), you must restart the FastAPI service for changes to take effect: + ```bash + docker restart aether_container_env-ae_api-2 + ``` + (Note: Restarts are NOT necessary if you are only modifying the test scripts themselves). +3. **Clean Up**: Clean up any temporary or debug files created during testing. However, **keep your test scripts**! Refactor them slightly for future use and clarity so they remain valuable assets for the project. +4. **Stay Current**: Update this `README.md` when you add new tests or learn something that could help others. This is a living document; keep the **Script Inventory** and tips up to date. +5. **Commit Often**: Don't forget to commit your working code and tests before moving on to the next task! diff --git a/tests/integration/test_int_websockets_v3.py b/tests/integration/test_int_websockets_v3.py new file mode 100644 index 0000000..5496b22 --- /dev/null +++ b/tests/integration/test_int_websockets_v3.py @@ -0,0 +1,132 @@ +import sys +import os +import json +import asyncio +from unittest.mock import MagicMock + +# Add project root to path +sys.path.append(os.getcwd()) + +# --- Robust Mocking BEFORE App Imports --- +class MockSettings: + def __init__(self): + self.REDIS = {'server': 'localhost', 'port': 6379} + self.DB = { + 'server': 'localhost', + 'port': 3306, + 'username': 'user', + 'password': 'pass', + 'database': 'db', + 'connect_timeout': 10, + 'pool_recycle': 3600 + } + self.JWT_KEY = 'fake-key' + self.AETHER_CFG = {'id': '0'} + self.LOG_PATH = {'app': '/tmp/ae.log'} + self.FILES_PATH = {'hosted_files_root': '/tmp', 'hosted_tmp_root': '/tmp'} + self.ORIGINS_REGEX = '.*' + self.ORIGINS = [] + + @property + def SQLALCHEMY_DB_URI(self) -> str: + return "mysql://user:pass@localhost:3306/db" + +mock_settings = MockSettings() +mock_config = MagicMock() +mock_config.settings = mock_settings +sys.modules["app.config"] = mock_config + +# Mock DB related modules to prevent connection attempts at import time +sys.modules["app.db_sql"] = MagicMock() +sys.modules["app.lib_sql_core"] = MagicMock() +sys.modules["app.db_connection"] = MagicMock() + +from fastapi.testclient import TestClient +from app.main import app + +# Assume local Redis is running for integration testing +client = TestClient(app) + +def test_v3_websocket_communication(): + print("\n--- Testing V3 WebSocket: Group & Direct Communication ---") + + group_id = "test_group_v3" + client_a_id = "client_a" + client_b_id = "client_b" + + try: + # 1. Connect both clients + with client.websocket_connect(f"/v3/ws/group/{group_id}/client/{client_a_id}") as ws_a, \ + client.websocket_connect(f"/v3/ws/group/{group_id}/client/{client_b_id}") as ws_b: + + print("Connected Client A and Client B.") + + # --- Scenario A: Group Message --- + print("\n[Scenario A] Client A sends a GROUP message...") + msg_group = { + "msg_type": "msg", + "target": "group", + "msg": "Hello Group!" + } + ws_a.send_json(msg_group) + + resp_a = ws_a.receive_json() + resp_b = ws_b.receive_json() + + print(f"Client A received: {resp_a.get('msg')}") + print(f"Client B received: {resp_b.get('msg')}") + + assert resp_a["msg"] == "Hello Group!" + assert resp_b["msg"] == "Hello Group!" + assert resp_b["from_id"] == client_a_id + print("āœ… Group messaging verified.") + + # --- Scenario B: Echo Message --- + print("\n[Scenario B] Client A sends an ECHO message...") + msg_echo = { + "msg_type": "msg", + "target": "echo", + "msg": "Only for me" + } + ws_a.send_json(msg_echo) + + resp_a_echo = ws_a.receive_json() + print(f"Client A received: {resp_a_echo.get('msg')}") + + assert resp_a_echo["msg"] == "Only for me" + print("āœ… Echo messaging verified.") + + # --- Scenario C: Direct Message --- + print("\n[Scenario C] Client A sends a DIRECT message to Client B...") + msg_direct = { + "msg_type": "cmd", + "target": "direct", + "to_id": client_b_id, + "cmd": "RUN_TEST" + } + ws_a.send_json(msg_direct) + + resp_b_direct = ws_b.receive_json() + print(f"Client B received command: {resp_b_direct.get('cmd')}") + + assert resp_b_direct["cmd"] == "RUN_TEST" + assert resp_b_direct["from_id"] == client_a_id + print("āœ… Direct messaging verified.") + + except ConnectionRefusedError: + print("\nāš ļø Skipping test: Local Redis not found on port 6379.") + except Exception as e: + if "Connection refused" in str(e): + print("\nāš ļø Skipping test: Local Redis not found on port 6379.") + else: + raise e + +if __name__ == "__main__": + try: + test_v3_websocket_communication() + print("\nšŸŽ‰ V3 WebSocket Integration Test Finished!") + except Exception as e: + print(f"\nāŒ TEST FAILED: {e}") + import traceback + traceback.print_exc() + sys.exit(1) diff --git a/tests/unit/test_unit_websockets_v3.py b/tests/unit/test_unit_websockets_v3.py new file mode 100644 index 0000000..4f4094d --- /dev/null +++ b/tests/unit/test_unit_websockets_v3.py @@ -0,0 +1,74 @@ +import sys +import os +import asyncio +import unittest +from unittest.mock import MagicMock, AsyncMock, patch +from datetime import datetime, timezone + +# Add project root to path +sys.path.append(os.getcwd()) + +# Mock app.config BEFORE imports +mock_config = MagicMock() +mock_config.settings = MagicMock() +mock_config.settings.REDIS = {'server': 'localhost', 'port': 6379} +sys.modules["app.config"] = mock_config + +from app.lib_websockets_v3 import WS_Message_V3, WS_Manager_V3 + +class TestWSV3Library(unittest.TestCase): + + def test_message_model_validation(self): + print("\n--- Testing WS_Message_V3 Validation ---") + data = { + "msg_type": "cmd", + "target": "group", + "from_id": "client_abc", + "group_id": "group_123", + "cmd": "RELOAD", + "payload": {"force": True} + } + msg = WS_Message_V3(**data) + self.assertEqual(msg.version, "3") + self.assertEqual(msg.cmd, "RELOAD") + self.assertTrue(isinstance(msg.sent_at.isoformat(), str)) + print("āœ… Model validation passed.") + + def test_channel_name_generation(self): + print("\n--- Testing Channel Name Generation ---") + manager = WS_Manager_V3() + channels = manager.get_channel_names("client_abc", "group_123") + + self.assertIn("ws:client:client_abc", channels) + self.assertIn("ws:group:group_123", channels) + self.assertIn("ws:broadcast", channels) + print("āœ… Channel name generation passed.") + + @patch('redis.asyncio.Redis.from_url') + def test_publish_routing(self, mock_redis_factory): + print("\n--- Testing Publish Routing ---") + mock_redis = AsyncMock() + mock_redis_factory.return_value = mock_redis + + manager = WS_Manager_V3() + + async def run_test(): + # 1. Test Group Routing + msg_group = WS_Message_V3( + msg_type="msg", target="group", from_id="sender", group_id="target_group" + ) + await manager.publish_message(msg_group) + mock_redis.publish.assert_called_with("ws:group:target_group", unittest.mock.ANY) + + # 2. Test Direct Routing + msg_direct = WS_Message_V3( + msg_type="msg", target="direct", from_id="sender", to_id="target_client" + ) + await manager.publish_message(msg_direct) + mock_redis.publish.assert_called_with("ws:client:target_client", unittest.mock.ANY) + + asyncio.run(run_test()) + print("āœ… Publish routing logic passed.") + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/test_unit_websockets_v3_router.py b/tests/unit/test_unit_websockets_v3_router.py new file mode 100644 index 0000000..1badf4e --- /dev/null +++ b/tests/unit/test_unit_websockets_v3_router.py @@ -0,0 +1,126 @@ +import sys +import os +import asyncio +import json +import unittest +from unittest.mock import MagicMock, AsyncMock, patch + +# Add project root to path +sys.path.append(os.getcwd()) + +# Mock app.config BEFORE imports to prevent attempt to load real settings +mock_config = MagicMock() +mock_config.settings = MagicMock() +mock_config.settings.REDIS = {'server': 'localhost', 'port': 6379} +sys.modules["app.config"] = mock_config + +# Mock DB related modules to prevent circular imports or DB connection attempts +sys.modules["app.db_sql"] = MagicMock() +sys.modules["app.lib_sql_core"] = MagicMock() +sys.modules["app.db_connection"] = MagicMock() + +from app.routers.websockets_v3 import v3_ws_endpoint + +class TestWSV3Router(unittest.TestCase): + + @patch('app.routers.websockets_v3.ws_manager_v3') + def test_v3_ws_endpoint_logic(self, mock_manager): + """ + Tests the core logic of the V3 WebSocket endpoint, ensuring + Redis subscription and bidirectional message handling are initiated. + """ + # 1. Setup WebSocket Mock + mock_ws = AsyncMock() + + # 2. Setup Redis PubSub Mock + mock_pubsub = MagicMock() + mock_pubsub.subscribe = AsyncMock() + mock_pubsub.unsubscribe = AsyncMock() + mock_pubsub.close = AsyncMock() + + mock_message = { + 'type': 'message', + 'data': json.dumps({ + "version": "3", + "msg_type": "msg", + "target": "group", + "from_id": "other_client", + "msg": "Hello from Redis", + "payload": {}, + "sent_at": "2026-01-30T12:00:00Z" + }) + } + + # Signal to coordinate loops + msg_delivered = asyncio.Event() + + # Counters to break the 'while True' loops in the endpoint + get_msg_count = 0 + recv_json_count = 0 + + async def mock_get_message(*args, **kwargs): + nonlocal get_msg_count + get_msg_count += 1 + if get_msg_count == 1: + msg_delivered.set() + return mock_message + await asyncio.sleep(0.05) + # Raise CancelledError to terminate the loop cleanly + raise asyncio.CancelledError("Terminate sender loop") + + mock_pubsub.get_message = mock_get_message + + mock_redis = MagicMock() + mock_redis.pubsub.return_value = mock_pubsub + + # 3. Setup Manager Mock + mock_manager.get_redis = AsyncMock(return_value=mock_redis) + mock_manager.update_presence = AsyncMock() + mock_manager.publish_message = AsyncMock() + mock_manager.get_channel_names.return_value = ["ws:group:test"] + + # Mock incoming websocket message + async def mock_receive_json(): + nonlocal recv_json_count + recv_json_count += 1 + if recv_json_count == 1: + # Wait until the sender loop has processed the Redis message + await msg_delivered.wait() + return { + "msg_type": "msg", + "target": "group", + "msg": "Client A saying hi" + } + await asyncio.sleep(0.05) + # Raise CancelledError to terminate the loop cleanly + raise asyncio.CancelledError("Terminate receiver loop") + + mock_ws.receive_json.side_effect = mock_receive_json + + # 4. Run the endpoint logic + async def run_endpoint(): + try: + # Execute endpoint with a short timeout + await asyncio.wait_for( + v3_ws_endpoint(mock_ws, "test_group", "client_a"), + timeout=0.5 + ) + except (asyncio.TimeoutError, asyncio.CancelledError): + pass + except Exception as e: + # Suppress our expected loop-termination messages + if "Terminate" not in str(e): + raise + + asyncio.run(run_endpoint()) + + # 5. Verifications + mock_ws.accept.assert_called_once() + mock_manager.update_presence.assert_any_call("client_a", "test_group", online=True) + + # Verify message from Redis was forwarded to WebSocket + mock_ws.send_text.assert_called() + print("āœ… WebSocket Router unit logic verified.") + +if __name__ == "__main__": + unittest.main()