diff --git a/app/routers/websockets_redis.py b/app/routers/websockets_redis.py index 48973c0..5f8eebd 100644 --- a/app/routers/websockets_redis.py +++ b/app/routers/websockets_redis.py @@ -2,7 +2,8 @@ from fastapi import APIRouter, FastAPI, Response, WebSocket, WebSocketDisconnect from fastapi.responses import HTMLResponse from pydantic import BaseModel, EmailStr, Field from typing import Dict, List, Optional, Set, Union -import asyncio, base64, datetime, hashlib, json, os, pathlib, redis, shutil, time +import asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time +import redis.asyncio as redis # import aioredis, redis, asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time # from aioredis import from_url, Redis # import asyncio @@ -555,26 +556,6 @@ class ConnectionManager: # await websocket.accept() # await redis_connector(websocket) - - - -@router.websocket('/ws/group/{group_id}/client/{client_id}') -@router.websocket('/ws_redis/group/{group_id}/client/{client_id}') -async def redis_ws_client_id( - websocket: WebSocket, - group_id: str, - client_id: str, - ): - log.setLevel(logging.DEBUG) - log.debug(locals()) - - log.info('WS Redis: Group ID and Client ID. Waiting to "accept" a websocket and then the "redis_connector"') - - await websocket.accept() - - await redis_connector(websocket, client_id, group_id) - - # await manager.connect_w_client_id(websocket, client_id, group_id) # # await aio_manager.connect_w_client_id(websocket, client_id, group_id) # try: @@ -608,6 +589,23 @@ async def redis_ws_client_id( # # await manager.broadcast(f'Client #{client_id} left') +@router.websocket('/ws/group/{group_id}/client/{client_id}') +@router.websocket('/ws_redis/group/{group_id}/client/{client_id}') +async def redis_ws_client_id( + websocket: WebSocket, + group_id: str, + client_id: str, + ): + log.setLevel(logging.DEBUG) + log.debug(locals()) + + log.info('WS Redis: Group ID and Client ID. Waiting to "accept" a websocket and then the "redis_connector"') + + await websocket.accept() + + await redis_connector(websocket, client_id, group_id) + + async def redis_connector( ws_conn: WebSocket, client_id: str, @@ -616,8 +614,13 @@ async def redis_connector( log.setLevel(logging.DEBUG) log.debug(locals()) - async def consumer_handler(ws_conn: WebSocket, r, client_id, group_id): + # The consumer is receiving WS JSON data + async def consumer_handler(ws_conn: WebSocket, r_conn, client_id, group_id): log.info('consumer_handler()') + + # Loop until the WS is disconnected + # Publish data as messages to Redis + # Close Redis connection on disconnect try: while True: data = await ws_conn.receive_json() # Returns dict @@ -635,6 +638,7 @@ async def redis_connector( data['client_id'] = client_id data['group_id'] = group_id + log.setLevel(logging.DEBUG) log.debug(data) if data: @@ -643,28 +647,70 @@ async def redis_connector( # data = json.loads(message) #await r.publish("chat:c", message) #await r.publish("chat:c", str(data['message'])) - - # causes: TypeError: object int can't be used in 'await' expression - r.publish("chat:c", str(data.get('client_id'))) - r.publish("chat:c", str(data)) - # r.publish("chat:c", str({'test': 'xxxx'})) - # r.publish("chat:c", str('data goes here...')) - except WebSocketDisconnect as exc: - # TODO this needs handling better - log.error(exc) - async def producer_handler(r, ws_conn: redis.client.PubSub, client_id, group_id): + # causes: TypeError: object int can't be used in 'await' expression + # await r.publish('chat:c', json.dumps(data.get('client_id'))) + # log.debug(await r.get('chat:c')) + await r_conn.publish('chat:c', json.dumps(data)) + # log.debug(await r.get('chat:c')) + # r.publish('chat:c', str({'test': 'xxxx'})) + # r.publish('chat:c', str('data goes here...')) + except WebSocketDisconnect as exc: + # FUTURE: Remove from list of connected clients? + log.error(exc) + await r_conn.close() + except Exception as e: + log.exception('Aether Exception!') + log.error(e) + await r_conn.close() + + # log.debug(await r.get('chat:c')) + + # Also called the Redis reader for a channel subscription + # This watches a Redis channel. Then sends out the message. + async def producer_handler(r_channel, ws_conn: WebSocket, client_id, group_id): log.info('producer_handler()') - (channel,) = await r.subscribe("chat:c") - assert isinstance(channel, redis.Channel) + try: while True: - data = await channel.get() - if data: - await ws_conn.send_json(data) - except Exception as exc: + message = await r_channel.get_message(ignore_subscribe_messages=True) + if message is not None: + log.debug(f'(Redis Reader) Message Received: {message}') + data = json.loads(message['data']) + + if group_id and group_id == data.get('group_id'): + log.info('Sending to matching group ID') + await ws_conn.send_json(data) + elif group_id: + log.info('Skip sending to client') + pass + else: + log.info('Sending to all') + await ws_conn.send_json(data) + + if data == STOPWORD: + log.debug('(Reader) STOP') + break + except ConnectionError as conn_e: + log.error(conn_e) + except Exception as e: + # raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None + # redis.exceptions.ConnectionError: Connection closed by server. + # TODO this needs handling better - log.error(exc) + log.error(e) + + + # (channel,) = await r.subscribe('chat:c') + # assert isinstance(channel, redis.Channel) + # try: + # while True: + # data = await channel.get() + # if data: + # await ws_conn.send_json(data) + # except Exception as exc: + # # TODO this needs handling better + # log.error(exc) # redis_conn = await redis.create_pool(redis_url) # Redis client bound to pool of connections (auto-reconnecting). @@ -676,6 +722,8 @@ async def redis_connector( # ) redis_conn_pubsub = redis_conn.pubsub() + await redis_conn_pubsub.subscribe('channel:1', 'channel:2', 'chat:c') + log.info('Run consumer_task') # consumer_task = consumer_handler(ws_conn, redis_conn, client_id, group_id) @@ -684,12 +732,18 @@ async def redis_connector( log.info('Run producer_task') # producer_task = producer_handler(redis_conn_pubsub, ws_conn, client_id, group_id) - + # future_producer = asyncio.create_task(producer_task()) future_producer = asyncio.create_task(producer_handler(redis_conn_pubsub, ws_conn, client_id, group_id)) - - await future_consumer - await future_producer + + await future_consumer # Get WS messages and set in Redis + await future_producer # Send out messages from Redis subscribe + + # @app.on_event('shutdown') + # async def shutdown(): + # log.info('The Aether FastAPI API is shutting down...') + # # await ws_conn.close() + # await redis_conn.close() # pending_tasks = asyncio.all_tasks(loop=event_loop) @@ -713,3 +767,37 @@ async def redis_connector( # await redis_conn.close() # await redis_conn.wait_closed() + +STOPWORD = 'STOP' + +# async def reader(channel: redis.client.PubSub): +# while True: +# message = await channel.get_message(ignore_subscribe_messages=True) +# if message is not None: +# log.debug(f'(Reader) Message Received: {message}') +# log.info('WHAT NOW???') +# if message['data'].decode() == STOPWORD: +# log.debug('(Reader) STOP') +# break + +# r = redis.from_url('redis://localhost') +# r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) +# async with r.pubsub() as pubsub: + # await pubsub.subscribe('channel:1', 'channel:2', 'chat:c') + + # future = asyncio.create_task(reader(pubsub)) + + # await r.publish('channel:1', 'Hello') + # await r.publish('channel:2', 'World') + # await r.publish('channel:1', STOPWORD) + + # await future + +# @app.on_event('shutdown') +# async def shutdown(): +# log.setLevel(logging.INFO) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL +# log.debug(locals()) + +# log.info('The Aether FastAPI API is shutting down...') +# #await database.disconnect() +# r.close() \ No newline at end of file