204 lines
9.3 KiB
Python
204 lines
9.3 KiB
Python
from fastapi import APIRouter, FastAPI, Response, WebSocket, WebSocketDisconnect
|
|
from fastapi.responses import HTMLResponse
|
|
from pydantic import BaseModel, EmailStr, Field
|
|
from typing import Dict, List, Optional, Set, Union
|
|
import asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time
|
|
import redis.asyncio as redis
|
|
|
|
from app.lib_general import log, logging, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min
|
|
from app.config import settings
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.websocket('/ws/group/{group_id}/client/{client_id}')
|
|
@router.websocket('/ws_redis/group/{group_id}/client/{client_id}')
|
|
async def redis_ws_client_id(
|
|
websocket: WebSocket,
|
|
group_id: str,
|
|
client_id: str,
|
|
):
|
|
log.setLevel(logging.INFO)
|
|
log.debug(locals())
|
|
|
|
log.info('WS Redis: Group ID and Client ID. Waiting to "accept" a websocket and then the "redis_connector"')
|
|
|
|
await websocket.accept()
|
|
|
|
await redis_connector(websocket, client_id, group_id)
|
|
|
|
|
|
async def redis_connector(
|
|
ws_conn: WebSocket,
|
|
client_id: str,
|
|
group_id = None
|
|
):
|
|
log.setLevel(logging.INFO)
|
|
log.debug(locals())
|
|
|
|
# This receives WS JSON data. Then publishes to Redis channel.
|
|
async def receiver_handler(
|
|
ws_conn: WebSocket,
|
|
r_conn, # Redis connection
|
|
client_id = None,
|
|
group_id = None,
|
|
):
|
|
log.setLevel(logging.INFO)
|
|
log.info('receiver_handler()')
|
|
|
|
# Loop until the WS is disconnected
|
|
# Publish data as messages to Redis
|
|
# Close Redis connection on disconnect
|
|
try:
|
|
while True:
|
|
# Likely properties of the data dict:
|
|
# type, cmd, msg, client_id, group_id
|
|
data = await ws_conn.receive_json() # Returns dict
|
|
log.debug(data)
|
|
|
|
# Likely msg_type values:
|
|
# echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?)
|
|
msg_type = data.get('type')
|
|
cmd = data.get('cmd')
|
|
msg = data.get('msg')
|
|
|
|
# log.setLevel(logging.INFO)
|
|
log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};')
|
|
log.debug(f'Command: {cmd}')
|
|
log.debug(f'Message: {msg}')
|
|
|
|
if client_id:
|
|
data['client_id'] = client_id
|
|
if group_id:
|
|
data['group_id'] = group_id
|
|
log.debug(data)
|
|
|
|
if data:
|
|
# Publish to Redis channel
|
|
# The channel should probably be a variable that can be set outside of this API.
|
|
await r_conn.publish('channel:ws', json.dumps(data))
|
|
except WebSocketDisconnect as exc:
|
|
# TODO: This needs handling better?
|
|
# FUTURE: Remove from list of connected clients?
|
|
log.error(exc) # usually 1001?
|
|
# await r_conn.close() # Uncommenting causes an exception below in sender_handler()
|
|
except Exception as e:
|
|
log.exception('Exception in Aether!')
|
|
log.error(e)
|
|
await r_conn.close()
|
|
|
|
# Also called the Redis reader for a channel subscription
|
|
# This watches a Redis channel. Then sends out the message.
|
|
async def sender_handler(
|
|
r_channel, # Redis channel
|
|
ws_conn: WebSocket,
|
|
client_id = None,
|
|
group_id = None,
|
|
):
|
|
log.setLevel(logging.INFO)
|
|
log.info(f'sender_handler() {r_channel} {ws_conn} {client_id} {group_id}')
|
|
|
|
try:
|
|
while True:
|
|
# log.setLevel(logging.DEBUG)
|
|
|
|
# WARNING: If the timeout is not defined in the function call to something greater than 0 (the default if not set?) or None, the get_message() method will cause high CPU usage and or just not work. Or at least that specific thread will just hang. This is because the timeout is set to 0. This is not the same as None.
|
|
# The get_message() method is a blocking method (timeout=None) that waits for a message to be published to the channel. This seems to be a good solution for now. The other option is to set a timeout of something like 1 second or .01 seconds to do other things (like send a heartbeat after each timeout).
|
|
# message = await r_channel.get_message(ignore_subscribe_messages=True, timeout=None) # Timeout should be None. Other options are something like: 1 second, .1 seconds, or .01 seconds
|
|
message = await r_channel.get_message(ignore_subscribe_messages=True, timeout=.1) # Timeout should be None. Other options are something like: 1 second, .1 seconds, or .01 seconds
|
|
log.debug(f'(Redis Reader) Message received or timeout reached: {message}')
|
|
if message is not None:
|
|
log.debug(f'(Redis Reader) Message Received: {message}')
|
|
data = json.loads(message['data'])
|
|
|
|
# Need to make sure the ws_connection is still open before trying to send.
|
|
log.debug(vars(ws_conn))
|
|
|
|
# log.debug(ws_conn.client_state)
|
|
log.debug(ws_conn.client_state.name)
|
|
|
|
# We only want to send to the WS marked as connected.
|
|
# if ws_conn.client_state == 'WebSocketState.CONNECTED'
|
|
# if ws_conn.client_state == 'WebSocketState.DISCONNECTED'
|
|
|
|
if ws_conn.client_state.name == 'CONNECTED':
|
|
log.info(f'WS client ({client_id}) connected!')
|
|
|
|
send_flag = False
|
|
|
|
# Figure out which WS client(s) get the data.
|
|
if data.get('target') == 'group' and group_id and group_id == data.get('group_id'):
|
|
log.info('Sending to matching group ID')
|
|
# await ws_conn.send_json(data)
|
|
send_flag = True
|
|
elif group_id:
|
|
log.debug('WS group: Skip sending to client')
|
|
send_flag = False
|
|
# continue # Do not send
|
|
|
|
if data.get('target') == 'echo' and client_id == data.get('client_id'):
|
|
log.info('Sending echo to client')
|
|
send_flag = True
|
|
elif data.get('target') == 'echo':
|
|
log.debug('WS echo: Skip sending to client')
|
|
send_flag = False
|
|
# continue
|
|
|
|
if data.get('target') == 'all':
|
|
send_flag = True
|
|
log.info('Sending to all')
|
|
# await ws_conn.send_json(data)
|
|
|
|
if send_flag:
|
|
log.debug('Sending!')
|
|
await ws_conn.send_json(data)
|
|
else:
|
|
log.info('Not sending')
|
|
continue
|
|
else: # == 'WebSocketState.DISCONNECTED'
|
|
log.warning(f'WS Redis client {client_id}: Skipping not connected (orphan) WS client. They missed this message. They should be removed from the Redis list of connected clients.') # Use unsubscribe to remove from list of connected clients.
|
|
await r_channel.unsubscribe() # Added line 2024-05-21
|
|
|
|
if data == 'STOP':
|
|
log.debug('(Reader) STOP')
|
|
break
|
|
except ConnectionError as conn_e:
|
|
# TODO: This needs handling better?
|
|
log.error(conn_e)
|
|
# await r_channel.unsubscribe()
|
|
await r_channel.close()
|
|
# except ConnectionClosedError as conn_close_e:
|
|
# log.error(conn_close_e)
|
|
# except redis.exceptions.ConnectionError as e:
|
|
# TODO: This needs handling better?
|
|
# log.error(e)
|
|
# except WebSocketDisconnect as ws_disconn_e:
|
|
# log.error(ws_disconn_e)
|
|
except Exception as e:
|
|
log.exception('Exception in Aether!')
|
|
log.error(e)
|
|
|
|
# raise RuntimeError(msg % message_type)
|
|
# RuntimeError: Unexpected ASGI message 'websocket.send', after sending 'websocket.close'.
|
|
|
|
# raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
|
|
# redis.exceptions.ConnectionError: Connection closed by server.
|
|
|
|
|
|
# Redis client bound to pool of connections (auto-reconnecting).
|
|
log.info('Create async Redis connection')
|
|
redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
|
|
|
|
redis_conn_pubsub = redis_conn.pubsub()
|
|
|
|
await redis_conn_pubsub.subscribe('channel:ws', 'channel:1', 'channel:2', 'chat:c')
|
|
|
|
log.info('Create receiver task')
|
|
future_receiver = asyncio.create_task(receiver_handler(ws_conn, redis_conn, client_id, group_id))
|
|
|
|
log.info('Create sender task')
|
|
future_sender = asyncio.create_task(sender_handler(redis_conn_pubsub, ws_conn, client_id, group_id))
|
|
|
|
await future_receiver # Get WS messages and set in Redis
|
|
await future_sender # Send out messages from Redis subscribe
|