import asyncio import websockets import json async def test_ping(): # Using fastapi.localhost to avoid the default 'localhost' static file block uri = "ws://fastapi.localhost:5060/v3/ws/group/test_group/client/test_user" print(f"Connecting to {uri}...") try: # We'll explicitly set the Host header to be safe async with websockets.connect(uri) as websocket: print("✅ Connection established!") ping_msg = { "msg_type": "heartbeat", "target": "echo", "msg": "Ping from Test Script" } print("Sending Heartbeat (Ping)...") await websocket.send(json.dumps(ping_msg)) # Wait for the echo back print("Waiting for response...") response = await asyncio.wait_for(websocket.recv(), timeout=3.0) data = json.loads(response) if data.get("msg_type") == "heartbeat": print(f"✅ Echo received successfully!") print(f"Server Timestamp: {data.get('sent_at')}") print("WS V3 is confirmed working through the gateway.") else: print(f"❓ Received unexpected message type: {data.get('msg_type')}") print(f"Full payload: {data}") except Exception as e: print(f"❌ Failed: {e}") if __name__ == "__main__": asyncio.run(test_ping())