190 lines
7.5 KiB
Python
190 lines
7.5 KiB
Python
from fastapi import APIRouter, FastAPI, Response, WebSocket, WebSocketDisconnect
|
|
from fastapi.responses import HTMLResponse
|
|
from pydantic import BaseModel, EmailStr, Field
|
|
from typing import Dict, List, Optional, Set, Union
|
|
import asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time
|
|
import redis.asyncio as redis
|
|
|
|
from app.lib_general import log, logging, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min
|
|
from app.config import settings
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.websocket('/ws/group/{group_id}/client/{client_id}')
|
|
@router.websocket('/ws_redis/group/{group_id}/client/{client_id}')
|
|
async def redis_ws_client_id(
|
|
websocket: WebSocket,
|
|
group_id: str,
|
|
client_id: str,
|
|
):
|
|
log.setLevel(logging.INFO)
|
|
log.debug(locals())
|
|
|
|
log.info('WS Redis: Group ID and Client ID. Waiting to "accept" a websocket and then the "redis_connector"')
|
|
|
|
await websocket.accept()
|
|
|
|
await redis_connector(websocket, client_id, group_id)
|
|
|
|
|
|
async def redis_connector(
|
|
ws_conn: WebSocket,
|
|
client_id: str,
|
|
group_id = None
|
|
):
|
|
log.setLevel(logging.INFO)
|
|
log.debug(locals())
|
|
|
|
# The receives WS JSON data. Then publishes to Redis channel.
|
|
async def receiver_handler(
|
|
ws_conn: WebSocket,
|
|
r_conn,
|
|
client_id = None,
|
|
group_id = None,
|
|
):
|
|
log.setLevel(logging.INFO)
|
|
log.info('receiver_handler()')
|
|
|
|
# Loop until the WS is disconnected
|
|
# Publish data as messages to Redis
|
|
# Close Redis connection on disconnect
|
|
try:
|
|
while True:
|
|
data = await ws_conn.receive_json() # Returns dict
|
|
log.debug(data)
|
|
|
|
# echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?)
|
|
msg_type = data.get('type')
|
|
cmd = data.get('cmd')
|
|
msg = data.get('msg')
|
|
|
|
log.setLevel(logging.INFO)
|
|
log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};')
|
|
log.debug(f'Command: {cmd}')
|
|
log.debug(f'Message: {msg}')
|
|
|
|
if client_id:
|
|
data['client_id'] = client_id
|
|
if group_id:
|
|
data['group_id'] = group_id
|
|
log.debug(data)
|
|
|
|
if data:
|
|
await r_conn.publish('channel:ws', json.dumps(data))
|
|
except WebSocketDisconnect as exc:
|
|
# TODO: This needs handling better?
|
|
# FUTURE: Remove from list of connected clients?
|
|
log.error(exc) # usually 1001?
|
|
# await r_conn.close() # Uncommenting causes an exception below in sender_handler()
|
|
except Exception as e:
|
|
log.exception('Exception in Aether!')
|
|
log.error(e)
|
|
await r_conn.close()
|
|
|
|
# Also called the Redis reader for a channel subscription
|
|
# This watches a Redis channel. Then sends out the message.
|
|
async def sender_handler(
|
|
r_channel,
|
|
ws_conn: WebSocket,
|
|
client_id = None,
|
|
group_id = None,
|
|
):
|
|
log.setLevel(logging.INFO)
|
|
log.info('sender_handler()')
|
|
|
|
try:
|
|
while True:
|
|
message = await r_channel.get_message(ignore_subscribe_messages=True)
|
|
if message is not None:
|
|
log.debug(f'(Redis Reader) Message Received: {message}')
|
|
data = json.loads(message['data'])
|
|
|
|
# Need to make sure the ws_connection is still open before trying to send.
|
|
log.debug(vars(ws_conn))
|
|
|
|
# log.debug(ws_conn.client_state)
|
|
log.debug(ws_conn.client_state.name)
|
|
|
|
# We only want to send to the WS marked as connected.
|
|
# 'WebSocketState.CONNECTED' or 'WebSocketState.DISCONNECTED'
|
|
if ws_conn.client_state.name == 'CONNECTED':
|
|
log.info(f'WS client ({client_id}) connected!')
|
|
|
|
send_flag = False
|
|
|
|
# Figure out which WS client(s) get the data.
|
|
if data.get('target') == 'group' and group_id and group_id == data.get('group_id'):
|
|
log.info('Sending to matching group ID')
|
|
# await ws_conn.send_json(data)
|
|
send_flag = True
|
|
elif group_id:
|
|
log.info('Skip sending to client')
|
|
send_flag = False
|
|
# continue # Do not send
|
|
|
|
if data.get('target') == 'echo' and client_id == data.get('client_id'):
|
|
log.info('Echo')
|
|
send_flag = True
|
|
elif data.get('target') == 'echo':
|
|
log.info('Skip sending to client')
|
|
send_flag = False
|
|
# continue
|
|
|
|
if data.get('target') == 'all':
|
|
send_flag = True
|
|
log.info('Sending to all')
|
|
# await ws_conn.send_json(data)
|
|
|
|
if send_flag:
|
|
log.info('Sending!')
|
|
await ws_conn.send_json(data)
|
|
else:
|
|
log.info('Not sending')
|
|
continue
|
|
else:
|
|
log.warning(f'WS client ({client_id}) no longer connected!? They have just missed a message.')
|
|
|
|
if data == 'STOP':
|
|
log.debug('(Reader) STOP')
|
|
break
|
|
except ConnectionError as conn_e:
|
|
# TODO: This needs handling better?
|
|
log.error(conn_e)
|
|
# await r_channel.unsubscribe()
|
|
await r_channel.close()
|
|
# except ConnectionClosedError as conn_close_e:
|
|
# log.error(conn_close_e)
|
|
# except redis.exceptions.ConnectionError as e:
|
|
# TODO: This needs handling better?
|
|
# log.error(e)
|
|
# except WebSocketDisconnect as ws_disconn_e:
|
|
# log.error(ws_disconn_e)
|
|
except Exception as e:
|
|
log.exception('Exception in Aether!')
|
|
log.error(e)
|
|
|
|
# raise RuntimeError(msg % message_type)
|
|
# RuntimeError: Unexpected ASGI message 'websocket.send', after sending 'websocket.close'.
|
|
|
|
# raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
|
|
# redis.exceptions.ConnectionError: Connection closed by server.
|
|
|
|
|
|
# Redis client bound to pool of connections (auto-reconnecting).
|
|
log.info('Create async Redis connection')
|
|
redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
|
|
|
|
redis_conn_pubsub = redis_conn.pubsub()
|
|
|
|
await redis_conn_pubsub.subscribe('channel:ws', 'channel:1', 'channel:2', 'chat:c')
|
|
|
|
log.info('Create receiver task')
|
|
future_receiver = asyncio.create_task(receiver_handler(ws_conn, redis_conn, client_id, group_id))
|
|
|
|
log.info('Create sender task')
|
|
future_sender = asyncio.create_task(sender_handler(redis_conn_pubsub, ws_conn, client_id, group_id))
|
|
|
|
await future_receiver # Get WS messages and set in Redis
|
|
await future_sender # Send out messages from Redis subscribe
|