From c9fa280c572190763c6205cf77fb0be3f81c8da3 Mon Sep 17 00:00:00 2001 From: Scott Idem Date: Fri, 7 Apr 2023 15:43:08 -0400 Subject: [PATCH] Work on websockets. Finally 95.75% working? --- app/routers/websockets.py | 86 +++++++++++++++ app/routers/websockets_redis.py | 181 ++++++++++---------------------- 2 files changed, 144 insertions(+), 123 deletions(-) diff --git a/app/routers/websockets.py b/app/routers/websockets.py index 7ac219b..8ff6f56 100644 --- a/app/routers/websockets.py +++ b/app/routers/websockets.py @@ -764,3 +764,89 @@ async def redis_connector( task.cancel() await redis.close() # await redis.wait_closed() + + + +#### #### + +# @app.on_event('shutdown') + # async def shutdown(): + # log.info('The Aether FastAPI API is shutting down...') + # # await ws_conn.close() + # await redis_conn.close() + + # pending_tasks = asyncio.all_tasks(loop=event_loop) + + # # cancel each pending task + # for task in pending_tasks: + # logger.info(f'killing task {task.get_coro()}') + # event_loop.call_soon_threadsafe(task.cancel) + + # # This does not work for some reason. + # # "Passing coroutines is forbidden, use tasks explicitly." + # log.info('Start asyncio wait') + # done, pending = await asyncio.wait( + # [receiver_task, producer_task], + # return_when = asyncio.FIRST_COMPLETED, + # ) + # log.debug(f'Done task: {done}') + + # for task in pending: + # log.debug(f'Canceling task: {task}') + # task.cancel() + # await redis_conn.close() + # await redis_conn.wait_closed() + + + + # (channel,) = await r.subscribe('chat:c') + # assert isinstance(channel, redis.Channel) + # try: + # while True: + # data = await channel.get() + # if data: + # await ws_conn.send_json(data) + # except Exception as exc: + # # TODO this needs handling better + # log.error(exc) + + # receiver_task = receiver_handler(ws_conn, redis_conn, client_id, group_id) + + # future_receiver = asyncio.create_task(receiver_task()) + + # sender_task = sender_handler(redis_conn_pubsub, ws_conn, client_id, group_id) + + # future_sender = asyncio.create_task(sender_task()) + + +# async def reader(channel: redis.client.PubSub): +# while True: +# message = await channel.get_message(ignore_subscribe_messages=True) +# if message is not None: +# log.debug(f'(Reader) Message Received: {message}') +# log.info('WHAT NOW???') +# if message['data'].decode() == STOPWORD: +# log.debug('(Reader) STOP') +# break + +# r = redis.from_url('redis://localhost') +# r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) +# async with r.pubsub() as pubsub: + # await pubsub.subscribe('channel:1', 'channel:2', 'chat:c') + + # future = asyncio.create_task(reader(pubsub)) + + # await r.publish('channel:1', 'Hello') + # await r.publish('channel:2', 'World') + # await r.publish('channel:1', STOPWORD) + + # await future + +# @app.on_event('shutdown') +# async def shutdown(): +# log.setLevel(logging.INFO) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL +# log.debug(locals()) + +# log.info('The Aether FastAPI API is shutting down...') +# #await database.disconnect() +# r.close() \ No newline at end of file diff --git a/app/routers/websockets_redis.py b/app/routers/websockets_redis.py index 970291c..8406af7 100644 --- a/app/routers/websockets_redis.py +++ b/app/routers/websockets_redis.py @@ -614,11 +614,9 @@ async def redis_connector( log.setLevel(logging.DEBUG) log.debug(locals()) - # The consumer is receiving WS JSON data - # ws_receiver_json_handler() - # receiver_handler() # RENAME TO THIS - async def consumer_handler(ws_conn: WebSocket, r_conn, client_id, group_id): - log.info('consumer_handler()') + # The receives WS JSON data. Then publishes to Redis channel. + async def receiver_handler(ws_conn: WebSocket, r_conn, client_id, group_id): + log.info('receiver_handler()') # Loop until the WS is disconnected # Publish data as messages to Redis @@ -646,24 +644,19 @@ async def redis_connector( if data: #logging.info(ws) #logging.info(dir(message)) - # data = json.loads(message) - #await r.publish("chat:c", message) - #await r.publish("chat:c", str(data['message'])) + # log.debug(await r.get('chat:c')) - # causes: TypeError: object int can't be used in 'await' expression - # await r.publish('chat:c', json.dumps(data.get('client_id'))) - # log.debug(await r.get('chat:c')) await r_conn.publish('chat:c', json.dumps(data)) - # log.debug(await r.get('chat:c')) - # r.publish('chat:c', str({'test': 'xxxx'})) - # r.publish('chat:c', str('data goes here...')) + + # log.debug(await r_conn.get_message()) + except WebSocketDisconnect as exc: # TODO: This needs handling better? # FUTURE: Remove from list of connected clients? - log.error(exc) - await r_conn.close() + log.error(exc) # usually 1001? + # await r_conn.close() # Uncommenting causes an exception below (sender_handler) except Exception as e: - log.exception('Aether Exception!') + log.exception('Exception in Aether!') log.error(e) await r_conn.close() @@ -671,10 +664,8 @@ async def redis_connector( # Also called the Redis reader for a channel subscription # This watches a Redis channel. Then sends out the message. - # redis_watcher_ws_sender_json_handler() - # sender_handler() # RENAME TO THIS - async def producer_handler(r_channel, ws_conn: WebSocket, client_id, group_id): - log.info('producer_handler()') + async def sender_handler(r_channel, ws_conn: WebSocket, client_id, group_id): + log.info('sender_handler()') try: while True: @@ -683,125 +674,69 @@ async def redis_connector( log.debug(f'(Redis Reader) Message Received: {message}') data = json.loads(message['data']) - if group_id and group_id == data.get('group_id'): - log.info('Sending to matching group ID') - await ws_conn.send_json(data) - elif group_id: - log.info('Skip sending to client') - pass - else: - log.info('Sending to all') - await ws_conn.send_json(data) + # Need to make sure the ws_connection is still open before trying to send. + # log.debug(ws_conn.open()) + # log.debug(ws_conn.closed) + log.debug(vars(ws_conn)) - if data == STOPWORD: + log.debug(ws_conn.client_state) + log.debug(vars(ws_conn.client_state)) + + log.debug(ws_conn.application_state) + log.debug(vars(ws_conn.application_state)) + + # 'WebSocketState.CONNECTED' or 'WebSocketState.DISCONNECTED' + if ws_conn.client_state == 'WebSocketState.CONNECTED': + if group_id and group_id == data.get('group_id'): + log.info('Sending to matching group ID') + await ws_conn.send_json(data) + elif group_id: + log.info('Skip sending to client') + pass + else: + log.info('Sending to all') + await ws_conn.send_json(data) + else: + log.warning(f'WS client ({client_id}) no longer connected!? They have just missed a message.') + + if data == 'STOP': log.debug('(Reader) STOP') break except ConnectionError as conn_e: # TODO: This needs handling better? log.error(conn_e) + # await r_channel.unsubscribe() + await r_channel.close() + # except ConnectionClosedError as conn_close_e: + # log.error(conn_close_e) + # except redis.exceptions.ConnectionError as e: + # TODO: This needs handling better? + # log.error(e) except Exception as e: - # raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None - # redis.exceptions.ConnectionError: Connection closed by server. + log.exception('Exception in Aether!') log.error(e) - # (channel,) = await r.subscribe('chat:c') - # assert isinstance(channel, redis.Channel) - # try: - # while True: - # data = await channel.get() - # if data: - # await ws_conn.send_json(data) - # except Exception as exc: - # # TODO this needs handling better - # log.error(exc) + # raise RuntimeError(msg % message_type) + # RuntimeError: Unexpected ASGI message 'websocket.send', after sending 'websocket.close'. + + # raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None + # redis.exceptions.ConnectionError: Connection closed by server. - # redis_conn = await redis.create_pool(redis_url) # Redis client bound to pool of connections (auto-reconnecting). - # r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=7, password=None, decode_responses=True) - log.info('Create Redis connection') + log.info('Create async Redis connection') redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) - # redis = redis.from_url( - # redis_url, encoding="utf-8", decode_responses=True - # ) + redis_conn_pubsub = redis_conn.pubsub() await redis_conn_pubsub.subscribe('channel:1', 'channel:2', 'chat:c') - log.info('Run consumer_task') - # consumer_task = consumer_handler(ws_conn, redis_conn, client_id, group_id) + log.info('Run receiver_task') + future_receiver = asyncio.create_task(receiver_handler(ws_conn, redis_conn, client_id, group_id)) - # future_consumer = asyncio.create_task(consumer_task()) - future_consumer = asyncio.create_task(consumer_handler(ws_conn, redis_conn, client_id, group_id)) + log.info('Run sender_task') - log.info('Run producer_task') - # producer_task = producer_handler(redis_conn_pubsub, ws_conn, client_id, group_id) + future_sender = asyncio.create_task(sender_handler(redis_conn_pubsub, ws_conn, client_id, group_id)) - # future_producer = asyncio.create_task(producer_task()) - future_producer = asyncio.create_task(producer_handler(redis_conn_pubsub, ws_conn, client_id, group_id)) - - await future_consumer # Get WS messages and set in Redis - await future_producer # Send out messages from Redis subscribe - - # @app.on_event('shutdown') - # async def shutdown(): - # log.info('The Aether FastAPI API is shutting down...') - # # await ws_conn.close() - # await redis_conn.close() - - # pending_tasks = asyncio.all_tasks(loop=event_loop) - - # # cancel each pending task - # for task in pending_tasks: - # logger.info(f'killing task {task.get_coro()}') - # event_loop.call_soon_threadsafe(task.cancel) - - # # This does not work for some reason. - # # "Passing coroutines is forbidden, use tasks explicitly." - # log.info('Start asyncio wait') - # done, pending = await asyncio.wait( - # [consumer_task, producer_task], - # return_when = asyncio.FIRST_COMPLETED, - # ) - # log.debug(f'Done task: {done}') - - # for task in pending: - # log.debug(f'Canceling task: {task}') - # task.cancel() - # await redis_conn.close() - # await redis_conn.wait_closed() - - -STOPWORD = 'STOP' - -# async def reader(channel: redis.client.PubSub): -# while True: -# message = await channel.get_message(ignore_subscribe_messages=True) -# if message is not None: -# log.debug(f'(Reader) Message Received: {message}') -# log.info('WHAT NOW???') -# if message['data'].decode() == STOPWORD: -# log.debug('(Reader) STOP') -# break - -# r = redis.from_url('redis://localhost') -# r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) -# async with r.pubsub() as pubsub: - # await pubsub.subscribe('channel:1', 'channel:2', 'chat:c') - - # future = asyncio.create_task(reader(pubsub)) - - # await r.publish('channel:1', 'Hello') - # await r.publish('channel:2', 'World') - # await r.publish('channel:1', STOPWORD) - - # await future - -# @app.on_event('shutdown') -# async def shutdown(): -# log.setLevel(logging.INFO) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL -# log.debug(locals()) - -# log.info('The Aether FastAPI API is shutting down...') -# #await database.disconnect() -# r.close() \ No newline at end of file + await future_receiver # Get WS messages and set in Redis + await future_sender # Send out messages from Redis subscribe