Updates and clean up. No longer uses 100% CPU.

This commit is contained in:
Scott Idem
2024-05-21 18:47:57 -04:00
parent 3d13dc1829
commit a8aa6bf950

View File

@@ -36,10 +36,10 @@ async def redis_connector(
log.setLevel(logging.INFO) log.setLevel(logging.INFO)
log.debug(locals()) log.debug(locals())
# The receives WS JSON data. Then publishes to Redis channel. # This receives WS JSON data. Then publishes to Redis channel.
async def receiver_handler( async def receiver_handler(
ws_conn: WebSocket, ws_conn: WebSocket,
r_conn, r_conn, # Redis connection
client_id = None, client_id = None,
group_id = None, group_id = None,
): ):
@@ -51,9 +51,12 @@ async def redis_connector(
# Close Redis connection on disconnect # Close Redis connection on disconnect
try: try:
while True: while True:
# Likely properties of the data dict:
# type, cmd, msg, client_id, group_id
data = await ws_conn.receive_json() # Returns dict data = await ws_conn.receive_json() # Returns dict
log.debug(data) log.debug(data)
# Likely msg_type values:
# echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?) # echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?)
msg_type = data.get('type') msg_type = data.get('type')
cmd = data.get('cmd') cmd = data.get('cmd')
@@ -71,6 +74,8 @@ async def redis_connector(
log.debug(data) log.debug(data)
if data: if data:
# Publish to Redis channel
# The channel should probably be a variable that can be set outside of this API.
await r_conn.publish('channel:ws', json.dumps(data)) await r_conn.publish('channel:ws', json.dumps(data))
except WebSocketDisconnect as exc: except WebSocketDisconnect as exc:
# TODO: This needs handling better? # TODO: This needs handling better?
@@ -85,7 +90,7 @@ async def redis_connector(
# Also called the Redis reader for a channel subscription # Also called the Redis reader for a channel subscription
# This watches a Redis channel. Then sends out the message. # This watches a Redis channel. Then sends out the message.
async def sender_handler( async def sender_handler(
r_channel, r_channel, # Redis channel
ws_conn: WebSocket, ws_conn: WebSocket,
client_id = None, client_id = None,
group_id = None, group_id = None,
@@ -95,7 +100,12 @@ async def redis_connector(
try: try:
while True: while True:
message = await r_channel.get_message(ignore_subscribe_messages=True) # log.setLevel(logging.DEBUG)
# WARNING: If the timeout is not defined in the function call to something greater than 0 (the default if not set?) or None, the get_message() method will cause high CPU usage and or just not work. Or at least that specific thread will just hang. This is because the timeout is set to 0. This is not the same as None.
# The get_message() method is a blocking method (timeout=None) that waits for a message to be published to the channel. This seems to be a good solution for now. The other option is to set a timeout of something like 1 second or .01 seconds to do other things (like send a heartbeat after each timeout).
message = await r_channel.get_message(ignore_subscribe_messages=True, timeout=None) # Timeout should be None. Other options are something like: 1 second, .1 seconds, or .01 seconds
log.debug(f'(Redis Reader) Message received or timeout reached: {message}')
if message is not None: if message is not None:
log.debug(f'(Redis Reader) Message Received: {message}') log.debug(f'(Redis Reader) Message Received: {message}')
data = json.loads(message['data']) data = json.loads(message['data'])
@@ -107,7 +117,9 @@ async def redis_connector(
log.debug(ws_conn.client_state.name) log.debug(ws_conn.client_state.name)
# We only want to send to the WS marked as connected. # We only want to send to the WS marked as connected.
# 'WebSocketState.CONNECTED' or 'WebSocketState.DISCONNECTED' # if ws_conn.client_state == 'WebSocketState.CONNECTED'
# if ws_conn.client_state == 'WebSocketState.DISCONNECTED'
if ws_conn.client_state.name == 'CONNECTED': if ws_conn.client_state.name == 'CONNECTED':
log.info(f'WS client ({client_id}) connected!') log.info(f'WS client ({client_id}) connected!')
@@ -119,15 +131,15 @@ async def redis_connector(
# await ws_conn.send_json(data) # await ws_conn.send_json(data)
send_flag = True send_flag = True
elif group_id: elif group_id:
log.info('Skip sending to client') log.info('WS group: Skip sending to client')
send_flag = False send_flag = False
# continue # Do not send # continue # Do not send
if data.get('target') == 'echo' and client_id == data.get('client_id'): if data.get('target') == 'echo' and client_id == data.get('client_id'):
log.info('Echo') log.info('Sending echo to client')
send_flag = True send_flag = True
elif data.get('target') == 'echo': elif data.get('target') == 'echo':
log.info('Skip sending to client') log.info('WS echo: Skip sending to client')
send_flag = False send_flag = False
# continue # continue
@@ -142,8 +154,9 @@ async def redis_connector(
else: else:
log.info('Not sending') log.info('Not sending')
continue continue
else: else: # == 'WebSocketState.DISCONNECTED'
log.warning(f'WS client ({client_id}) no longer connected!? They have just missed a message.') log.warning(f'WS Redis client {client_id}: Skipping not connected (orphan) WS client. They missed this message. They should be removed from the Redis list of connected clients.') # Use unsubscribe to remove from list of connected clients.
await r_channel.unsubscribe() # Added line 2024-05-21
if data == 'STOP': if data == 'STOP':
log.debug('(Reader) STOP') log.debug('(Reader) STOP')