import sys import os import json import asyncio from unittest.mock import MagicMock # Add project root to path sys.path.append(os.getcwd()) # --- Robust Mocking BEFORE App Imports --- class MockSettings: def __init__(self): self.REDIS = {'server': 'localhost', 'port': 6379} self.DB = { 'server': 'localhost', 'port': 3306, 'username': 'user', 'password': 'pass', 'database': 'db', 'connect_timeout': 10, 'pool_recycle': 3600 } self.JWT_KEY = 'fake-key' self.AETHER_CFG = {'id': '0'} self.LOG_PATH = {'app': '/tmp/ae.log'} self.FILES_PATH = {'hosted_files_root': '/tmp', 'hosted_tmp_root': '/tmp'} self.ORIGINS_REGEX = '.*' self.ORIGINS = [] @property def SQLALCHEMY_DB_URI(self) -> str: return "mysql://user:pass@localhost:3306/db" mock_settings = MockSettings() mock_config = MagicMock() mock_config.settings = mock_settings sys.modules["app.config"] = mock_config # Mock DB related modules to prevent connection attempts at import time sys.modules["app.db_sql"] = MagicMock() sys.modules["app.lib_sql_core"] = MagicMock() sys.modules["app.db_connection"] = MagicMock() from fastapi.testclient import TestClient from app.main import app # Assume local Redis is running for integration testing client = TestClient(app) def test_v3_websocket_communication(): print("\n--- Testing V3 WebSocket: Group & Direct Communication ---") group_id = "test_group_v3" client_a_id = "client_a" client_b_id = "client_b" try: # 1. Connect both clients with client.websocket_connect(f"/v3/ws/group/{group_id}/client/{client_a_id}") as ws_a, \ client.websocket_connect(f"/v3/ws/group/{group_id}/client/{client_b_id}") as ws_b: print("Connected Client A and Client B.") # --- Scenario A: Group Message --- print("\n[Scenario A] Client A sends a GROUP message...") msg_group = { "msg_type": "msg", "target": "group", "msg": "Hello Group!" } ws_a.send_json(msg_group) resp_a = ws_a.receive_json() resp_b = ws_b.receive_json() print(f"Client A received: {resp_a.get('msg')}") print(f"Client B received: {resp_b.get('msg')}") assert resp_a["msg"] == "Hello Group!" assert resp_b["msg"] == "Hello Group!" assert resp_b["from_id"] == client_a_id print("✅ Group messaging verified.") # --- Scenario B: Echo Message --- print("\n[Scenario B] Client A sends an ECHO message...") msg_echo = { "msg_type": "msg", "target": "echo", "msg": "Only for me" } ws_a.send_json(msg_echo) resp_a_echo = ws_a.receive_json() print(f"Client A received: {resp_a_echo.get('msg')}") assert resp_a_echo["msg"] == "Only for me" print("✅ Echo messaging verified.") # --- Scenario C: Direct Message --- print("\n[Scenario C] Client A sends a DIRECT message to Client B...") msg_direct = { "msg_type": "cmd", "target": "direct", "to_id": client_b_id, "cmd": "RUN_TEST" } ws_a.send_json(msg_direct) resp_b_direct = ws_b.receive_json() print(f"Client B received command: {resp_b_direct.get('cmd')}") assert resp_b_direct["cmd"] == "RUN_TEST" assert resp_b_direct["from_id"] == client_a_id print("✅ Direct messaging verified.") except ConnectionRefusedError: print("\n⚠️ Skipping test: Local Redis not found on port 6379.") except Exception as e: if "Connection refused" in str(e): print("\n⚠️ Skipping test: Local Redis not found on port 6379.") else: raise e if __name__ == "__main__": try: test_v3_websocket_communication() print("\n🎉 V3 WebSocket Integration Test Finished!") except Exception as e: print(f"\n❌ TEST FAILED: {e}") import traceback traceback.print_exc() sys.exit(1)