import asyncio
import json
# logging.DEBUG is referenced below; import it explicitly instead of relying
# on it leaking in through one of the star imports.
import logging
from typing import List

import aioredis
from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse

from ..lib_general import *
from ..log import *
from app.config import settings
from app.db import *

router = APIRouter()

# Body returned by GET /ws_test.
# NOTE(review): as seen here the text contains no HTML/JS markup -- confirm
# that the intended page source is actually present in the file.
html = """ Chat

WebSocket Chat

Your ID:

"""


@router.get("/ws_test")
async def get():
    """Return the ``html`` constant wrapped in an ``HTMLResponse``."""
    log.setLevel(logging.DEBUG)
    log.debug(locals())
    return HTMLResponse(html)


@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept the WebSocket, then hand it to :func:`redis_connector`.

    Returns only once ``redis_connector`` has returned.
    """
    log.setLevel(logging.DEBUG)
    log.debug(locals())
    log.info('Root of ws. Waiting to accept a websocket and then the redis_connector')
    await websocket.accept()
    await redis_connector(websocket)


async def redis_connector(
    websocket: WebSocket, redis_uri: str = "redis://localhost:6379"
):
    """Relay messages between ``websocket`` and the Redis channel ``chat:c``.

    Two tasks run concurrently: a consumer that publishes what the socket
    sends, and a producer that forwards what the channel delivers. As soon
    as either task finishes, the other one is cancelled and the Redis pool
    is closed.

    Args:
        websocket: An already-accepted WebSocket connection.
        redis_uri: Address passed to ``aioredis.create_redis_pool``.
    """
    log.setLevel(logging.DEBUG)
    log.debug(locals())

    async def consumer_handler(ws: WebSocket, r):
        """Publish every non-empty text frame from ``ws`` to ``chat:c``."""
        try:
            while True:
                message = await ws.receive_text()
                if message:
                    data = json.loads(message)
                    # Two publishes per frame: the sender id on its own,
                    # then the str() of the whole decoded payload.
                    await r.publish("chat:c", str(data['client_id']))
                    await r.publish("chat:c", str(data))
        except WebSocketDisconnect as exc:
            # TODO this needs handling better
            log.error(exc)

    async def producer_handler(r, ws: WebSocket):
        """Forward every message received on ``chat:c`` to ``ws`` as text."""
        (channel,) = await r.subscribe("chat:c")
        assert isinstance(channel, aioredis.Channel)
        try:
            while True:
                message = await channel.get()
                if message:
                    await ws.send_text(message.decode("utf-8"))
        except Exception as exc:
            # TODO this needs handling better
            log.error(exc)

    redis = await aioredis.create_redis_pool(redis_uri)
    # asyncio.wait() must be given Tasks/Futures: bare coroutines are
    # deprecated since Python 3.8 and rejected with TypeError from 3.11.
    tasks = [
        asyncio.ensure_future(consumer_handler(websocket, redis)),
        asyncio.ensure_future(producer_handler(redis, websocket)),
    ]
    try:
        done, _pending = await asyncio.wait(
            tasks,
            return_when=asyncio.FIRST_COMPLETED,
        )
        log.debug(f"Done task: {done}")
        for task in done:
            # Retrieve the exception so a failure inside a handler (e.g.
            # invalid JSON or a missing 'client_id') is logged rather than
            # reported later as "Task exception was never retrieved".
            if not task.cancelled() and task.exception() is not None:
                log.error(task.exception())
    finally:
        # Runs on normal completion, on error, and when this coroutine is
        # itself cancelled, so neither the tasks nor the pool are leaked.
        for task in tasks:
            if not task.done():
                log.debug(f"Canceling task: {task}")
                task.cancel()
        # Let the cancellations finish before tearing down the pool the
        # tasks are still using.
        await asyncio.gather(*tasks, return_exceptions=True)
        redis.close()
        await redis.wait_closed()