- Added test_ws_v3_ping.py for gateway connectivity verification. - Added test_int_websockets_v3_real.py for multi-client routing isolation verification. - Updated Script Inventory in tests/README.md.
40 lines
1.4 KiB
Python
40 lines
1.4 KiB
Python
import asyncio
|
|
import websockets
|
|
import json
|
|
|
|
async def test_ping():
|
|
# Using fastapi.localhost to avoid the default 'localhost' static file block
|
|
uri = "ws://fastapi.localhost:5060/v3/ws/group/test_group/client/test_user"
|
|
print(f"Connecting to {uri}...")
|
|
try:
|
|
# We'll explicitly set the Host header to be safe
|
|
async with websockets.connect(uri) as websocket:
|
|
print("✅ Connection established!")
|
|
|
|
ping_msg = {
|
|
"msg_type": "heartbeat",
|
|
"target": "echo",
|
|
"msg": "Ping from Test Script"
|
|
}
|
|
|
|
print("Sending Heartbeat (Ping)...")
|
|
await websocket.send(json.dumps(ping_msg))
|
|
|
|
# Wait for the echo back
|
|
print("Waiting for response...")
|
|
response = await asyncio.wait_for(websocket.recv(), timeout=3.0)
|
|
data = json.loads(response)
|
|
|
|
if data.get("msg_type") == "heartbeat":
|
|
print(f"✅ Echo received successfully!")
|
|
print(f"Server Timestamp: {data.get('sent_at')}")
|
|
print("WS V3 is confirmed working through the gateway.")
|
|
else:
|
|
print(f"❓ Received unexpected message type: {data.get('msg_type')}")
|
|
print(f"Full payload: {data}")
|
|
|
|
except Exception as e:
|
|
print(f"❌ Failed: {e}")
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(test_ping()) |