import asyncio
import json
import uuid

import websockets


async def _recv_json(ws, timeout: float = 2.0):
    """Wait up to *timeout* seconds for one frame on *ws* and JSON-decode it."""
    raw = await asyncio.wait_for(ws.recv(), timeout=timeout)
    return json.loads(raw)


async def test_ws_v3_real() -> bool:
    """Run the v3 WebSocket smoke scenarios against a live server.

    Three clients are connected: A and B join one randomly named "alpha"
    group, C joins a randomly named "beta" group. The scenarios are:

    1. A sends a group message; B and A are expected to receive it, and C
       must receive nothing within 0.5 s.
    2. A sends a direct message addressed to C; C is expected to receive it.
    3. B sends a broadcast command; A, B and C are each expected to receive
       a frame.

    Received payloads are printed, not compared against expected values.

    Returns:
        True when every scenario completed, False when any step failed
        (connection error, receive timeout, invalid JSON, or client C
        receiving the Alpha group message). Failures are reported on stdout
        and never propagate as exceptions.
    """
    # Use fastapi.localhost to hit the correct Nginx proxy block
    uri_base = "ws://fastapi.localhost:5060/v3/ws"

    # Random suffixes keep group names unique from one run to the next.
    group_alpha = f"test_group_alpha_{uuid.uuid4().hex[:6]}"
    group_beta = f"test_group_beta_{uuid.uuid4().hex[:6]}"

    client_a_id = "client_a"
    client_b_id = "client_b"
    client_c_id = "client_c"

    print(f"Connecting to {uri_base}...")

    try:
        async with websockets.connect(f"{uri_base}/group/{group_alpha}/client/{client_a_id}") as ws_a, \
                   websockets.connect(f"{uri_base}/group/{group_alpha}/client/{client_b_id}") as ws_b, \
                   websockets.connect(f"{uri_base}/group/{group_beta}/client/{client_c_id}") as ws_c:

            print("✅ All 3 clients connected to real API through Nginx proxy.")

            # --- 1. GROUP MESSAGE ---
            print("\nScenario 1: Group Message (Alpha)")
            msg_group = {
                "msg_type": "msg",
                "target": "group",
                "msg": "Hello Alpha Squad",
            }
            await ws_a.send(json.dumps(msg_group))

            # Client B (same group) should get it
            data_b = await _recv_json(ws_b)
            print(f"✅ Client B received: {data_b.get('msg')}")

            # Client A (sender) should ALSO get it via Redis echo
            data_a = await _recv_json(ws_a)
            print(f"✅ Client A received own message: {data_a.get('msg')}")

            # Client C (Beta) should NOT get it: silence until the timeout
            # is the passing outcome here.
            try:
                await asyncio.wait_for(ws_c.recv(), timeout=0.5)
            except asyncio.TimeoutError:
                print("✅ Client C correctly ignored Alpha message.")
            else:
                print("❌ ERROR: Client C received Alpha message!")
                # Abort the run: without this the script would go on to
                # report that all scenarios passed despite the leak.
                raise AssertionError(
                    "group isolation violated: client C received a message "
                    "sent to the Alpha group"
                )

            # --- 2. DIRECT MESSAGE ---
            print("\nScenario 2: Direct Message (A -> C)")
            msg_direct = {
                "msg_type": "msg",
                "target": "direct",
                "to_id": client_c_id,
                "msg": "Secret code 123",
            }
            await ws_a.send(json.dumps(msg_direct))

            data_c = await _recv_json(ws_c)
            print(f"✅ Client C received direct: {data_c.get('msg')}")

            # --- 3. BROADCAST ---
            print("\nScenario 3: Global Broadcast")
            msg_bcast = {
                "msg_type": "cmd",
                "target": "broadcast",
                "cmd": "SYSTEM_PING",
            }
            await ws_b.send(json.dumps(msg_bcast))

            for ws, name in [(ws_a, "A"), (ws_b, "B"), (ws_c, "C")]:
                data = await _recv_json(ws)
                print(f"✅ Client {name} received broadcast: {data.get('cmd')}")

            print("\n🎉 ALL SCENARIOS PASSED ON REAL API!")
            return True

    except Exception as e:
        # Top-level boundary of the script: report the failure and signal it
        # through the return value instead of silently succeeding.
        print(f"❌ TEST FAILED: {e}")
        return False


if __name__ == "__main__":
    # Propagate the outcome as the process exit status so that shells and
    # CI jobs can detect a failed run.
    if not asyncio.run(test_ws_v3_real()):
        raise SystemExit(1)