diff --git a/admin/requirements.txt b/admin/requirements.txt index f1b3c19..1266347 100644 --- a/admin/requirements.txt +++ b/admin/requirements.txt @@ -4,8 +4,9 @@ fastapi[all] pydantic SQLAlchemy mysqlclient -redis -aioredis +# redis +redis[hiredis] +# aioredis html2text pytz stripe diff --git a/app/routers/websockets_redis.py b/app/routers/websockets_redis.py index cdf55f6..915252a 100644 --- a/app/routers/websockets_redis.py +++ b/app/routers/websockets_redis.py @@ -68,21 +68,15 @@ async def redis_connector( data['client_id'] = client_id if group_id: data['group_id'] = group_id - # log.setLevel(logging.DEBUG) log.debug(data) if data: - # logging.info(ws) - # logging.info(dir(message)) - # log.debug(await r_conn.get('channel:ws')) - # log.debug(await r_conn.get_message()) - await r_conn.publish('channel:ws', json.dumps(data)) except WebSocketDisconnect as exc: # TODO: This needs handling better? # FUTURE: Remove from list of connected clients? log.error(exc) # usually 1001? - # await r_conn.close() # Uncommenting causes an exception below (sender_handler) + # await r_conn.close() # Uncommenting causes an exception below in sender_handler() except Exception as e: log.exception('Exception in Aether!') log.error(e) @@ -101,7 +95,6 @@ async def redis_connector( try: while True: - message = await r_channel.get_message(ignore_subscribe_messages=True) if message is not None: log.debug(f'(Redis Reader) Message Received: {message}') @@ -113,22 +106,42 @@ async def redis_connector( # log.debug(ws_conn.client_state) log.debug(ws_conn.client_state.name) + # We only want to send to the WS marked as connected. # 'WebSocketState.CONNECTED' or 'WebSocketState.DISCONNECTED' - # if ws_conn.client_state.CONNECTED: - # if ws_conn.client_state.value == 'WebSocketState.CONNECTED': - # if ws_conn.client_state == WebSocketState.CONNECTED: if ws_conn.client_state.name == 'CONNECTED': log.info(f'WS client ({client_id}) connected!') - if group_id and group_id == data.get('group_id'): + send_flag = False + + # Figure out which WS client(s) get the data. + if data.get('target') == 'group' and group_id and group_id == data.get('group_id'): log.info('Sending to matching group ID') - await ws_conn.send_json(data) + # await ws_conn.send_json(data) + send_flag = True elif group_id: log.info('Skip sending to client') - pass - else: + send_flag = False + # continue # Do not send + + if data.get('target') == 'echo' and client_id == data.get('client_id'): + log.info('Echo') + send_flag = True + elif data.get('target') == 'echo': + log.info('Skip sending to client') + send_flag = False + # continue + + if data.get('target') == 'all': + send_flag = True log.info('Sending to all') + # await ws_conn.send_json(data) + + if send_flag: + log.info('Sending!') await ws_conn.send_json(data) + else: + log.info('Not sending') + continue else: log.warning(f'WS client ({client_id}) no longer connected!? They have just missed a message.')