Files
OSIT-AE-API-FastAPI/tests/integration/test_ws_v3_ping.py
Scott Idem 39391e5949 test(websockets): add real-world integration and ping tests for V3
- Added test_ws_v3_ping.py for gateway connectivity verification.
- Added test_int_websockets_v3_real.py for multi-client routing isolation verification.
- Updated Script Inventory in tests/README.md.
2026-01-30 17:55:45 -05:00

40 lines
1.4 KiB
Python

import asyncio
import websockets
import json
async def test_ping():
# Using fastapi.localhost to avoid the default 'localhost' static file block
uri = "ws://fastapi.localhost:5060/v3/ws/group/test_group/client/test_user"
print(f"Connecting to {uri}...")
try:
# We'll explicitly set the Host header to be safe
async with websockets.connect(uri) as websocket:
print("✅ Connection established!")
ping_msg = {
"msg_type": "heartbeat",
"target": "echo",
"msg": "Ping from Test Script"
}
print("Sending Heartbeat (Ping)...")
await websocket.send(json.dumps(ping_msg))
# Wait for the echo back
print("Waiting for response...")
response = await asyncio.wait_for(websocket.recv(), timeout=3.0)
data = json.loads(response)
if data.get("msg_type") == "heartbeat":
print(f"✅ Echo received successfully!")
print(f"Server Timestamp: {data.get('sent_at')}")
print("WS V3 is confirmed working through the gateway.")
else:
print(f"❓ Received unexpected message type: {data.get('msg_type')}")
print(f"Full payload: {data}")
except Exception as e:
print(f"❌ Failed: {e}")
if __name__ == "__main__":
asyncio.run(test_ping())