From a8aa6bf95035b110601f37ff12533519d6165cf3 Mon Sep 17 00:00:00 2001 From: Scott Idem Date: Tue, 21 May 2024 18:47:57 -0400 Subject: [PATCH] Updates and clean up. No longer uses 100% CPU. --- app/routers/websockets_redis.py | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/app/routers/websockets_redis.py b/app/routers/websockets_redis.py index 915252a..e60fc1d 100644 --- a/app/routers/websockets_redis.py +++ b/app/routers/websockets_redis.py @@ -36,10 +36,10 @@ async def redis_connector( log.setLevel(logging.INFO) log.debug(locals()) - # The receives WS JSON data. Then publishes to Redis channel. + # This receives WS JSON data. Then publishes to Redis channel. async def receiver_handler( ws_conn: WebSocket, - r_conn, + r_conn, # Redis connection client_id = None, group_id = None, ): @@ -51,9 +51,12 @@ async def redis_connector( # Close Redis connection on disconnect try: while True: + # Likely properties of the data dict: + # type, cmd, msg, client_id, group_id data = await ws_conn.receive_json() # Returns dict log.debug(data) + # Likely msg_type values: # echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?) msg_type = data.get('type') cmd = data.get('cmd') @@ -71,6 +74,8 @@ async def redis_connector( log.debug(data) if data: + # Publish to Redis channel + # The channel should probably be a variable that can be set outside of this API. await r_conn.publish('channel:ws', json.dumps(data)) except WebSocketDisconnect as exc: # TODO: This needs handling better? @@ -85,7 +90,7 @@ async def redis_connector( # Also called the Redis reader for a channel subscription # This watches a Redis channel. Then sends out the message. async def sender_handler( - r_channel, + r_channel, # Redis channel ws_conn: WebSocket, client_id = None, group_id = None, @@ -95,7 +100,12 @@ async def redis_connector( try: while True: - message = await r_channel.get_message(ignore_subscribe_messages=True) + # log.setLevel(logging.DEBUG) + + # WARNING: If the timeout is not defined in the function call to something greater than 0 (the default if not set?) or None, the get_message() method will cause high CPU usage and or just not work. Or at least that specific thread will just hang. This is because the timeout is set to 0. This is not the same as None. + # The get_message() method is a blocking method (timeout=None) that waits for a message to be published to the channel. This seems to be a good solution for now. The other option is to set a timeout of something like 1 second or .01 seconds to do other things (like send a heartbeat after each timeout). + message = await r_channel.get_message(ignore_subscribe_messages=True, timeout=None) # Timeout should be None. Other options are something like: 1 second, .1 seconds, or .01 seconds + log.debug(f'(Redis Reader) Message received or timeout reached: {message}') if message is not None: log.debug(f'(Redis Reader) Message Received: {message}') data = json.loads(message['data']) @@ -107,7 +117,9 @@ async def redis_connector( log.debug(ws_conn.client_state.name) # We only want to send to the WS marked as connected. - # 'WebSocketState.CONNECTED' or 'WebSocketState.DISCONNECTED' + # if ws_conn.client_state == 'WebSocketState.CONNECTED' + # if ws_conn.client_state == 'WebSocketState.DISCONNECTED' + if ws_conn.client_state.name == 'CONNECTED': log.info(f'WS client ({client_id}) connected!') @@ -119,15 +131,15 @@ async def redis_connector( # await ws_conn.send_json(data) send_flag = True elif group_id: - log.info('Skip sending to client') + log.info('WS group: Skip sending to client') send_flag = False # continue # Do not send if data.get('target') == 'echo' and client_id == data.get('client_id'): - log.info('Echo') + log.info('Sending echo to client') send_flag = True elif data.get('target') == 'echo': - log.info('Skip sending to client') + log.info('WS echo: Skip sending to client') send_flag = False # continue @@ -142,8 +154,9 @@ async def redis_connector( else: log.info('Not sending') continue - else: - log.warning(f'WS client ({client_id}) no longer connected!? They have just missed a message.') + else: # == 'WebSocketState.DISCONNECTED' + log.warning(f'WS Redis client {client_id}: Skipping not connected (orphan) WS client. They missed this message. They should be removed from the Redis list of connected clients.') # Use unsubscribe to remove from list of connected clients. + await r_channel.unsubscribe() # Added line 2024-05-21 if data == 'STOP': log.debug('(Reader) STOP')