Files
OSIT-AE-API-FastAPI/app/routers/websockets_redis.py
2023-04-07 18:09:02 -04:00

190 lines
7.5 KiB
Python

from fastapi import APIRouter, FastAPI, Response, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
import asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time
import redis.asyncio as redis
from app.lib_general import log, logging, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min
from app.config import settings
# Module-level router; the WebSocket endpoints in this file are registered on it
# through the @router.websocket(...) decorators.
router = APIRouter()
@router.websocket('/ws/group/{group_id}/client/{client_id}')
@router.websocket('/ws_redis/group/{group_id}/client/{client_id}')
async def redis_ws_client_id(websocket: WebSocket, group_id: str, client_id: str):
    """WebSocket endpoint for a client that belongs to a group.

    Served under both the ``/ws/...`` and ``/ws_redis/...`` paths. The socket
    is accepted first and then handed over to ``redis_connector`` together
    with the client and group identifiers taken from the URL path.
    """
    log.setLevel(logging.INFO)
    log.debug(locals())
    log.info('WS Redis: Group ID and Client ID. Waiting to "accept" a websocket and then the "redis_connector"')

    # Complete the WebSocket handshake before any Redis work starts.
    await websocket.accept()

    # Hand the accepted socket over to the Redis bridge.
    await redis_connector(ws_conn=websocket, client_id=client_id, group_id=group_id)
async def redis_connector(
    ws_conn: WebSocket,
    client_id: str,
    group_id = None
):
    """Bridge one accepted WebSocket to the Redis pub/sub channels.

    Two tasks run for the lifetime of the WebSocket:

    * the receiver reads JSON objects from the WebSocket, stamps them with
      ``client_id`` / ``group_id`` and publishes them on ``channel:ws``;
    * the sender reads messages from the subscribed Redis channels and
      forwards the ones addressed to this client back over the WebSocket.

    The connection is considered finished when the receiver ends (the client
    disconnected or the receiver failed). At that point the sender is
    cancelled and the pub/sub object and the Redis client are closed, so a
    disconnected client leaves no task or Redis connection behind.

    Args:
        ws_conn: An already accepted WebSocket.
        client_id: Identifier of this client; added to every published message.
        group_id: Optional group identifier; added to every published message
            when set and used to route ``target == 'group'`` messages.
    """
    log.setLevel(logging.INFO)
    log.debug(locals())

    # How long (seconds) one get_message() call waits for a Redis message.
    # A non-zero timeout keeps the sender loop from spinning when idle.
    poll_timeout = 1.0

    def should_forward(data, client_id, group_id):
        """Return True when a decoded Redis message is addressed to this client."""
        target = data.get('target')
        if target == 'all':
            return True
        if target == 'echo':
            # Echo goes back only to the client that published it.
            return client_id == data.get('client_id')
        if target == 'group':
            return bool(group_id) and group_id == data.get('group_id')
        return False

    # This receives WS JSON data. Then publishes to Redis channel.
    async def receiver_handler(
        ws_conn: WebSocket,
        r_conn,
        client_id = None,
        group_id = None,
    ):
        """Publish every JSON object received on the WebSocket to Redis.

        Returns when the WebSocket disconnects or an unexpected error occurs;
        closing the Redis objects is left to the enclosing connector.
        """
        log.info('receiver_handler()')
        try:
            # Loop until the WS is disconnected.
            while True:
                try:
                    data = await ws_conn.receive_json()
                except json.JSONDecodeError as exc:
                    # One malformed frame should not tear down the connection.
                    log.warning(f'WS client ({client_id}) sent invalid JSON; ignoring: {exc}')
                    continue
                if not isinstance(data, dict):
                    log.warning(f'WS client ({client_id}) sent a non-object JSON payload; ignoring')
                    continue

                log.debug(data)
                # echo (echo message), dm (direct message), group (group message),
                # all (broadcast message to all), cmd, group_cmd(?)
                # NOTE(review): 'type' is only logged here, while the sender
                # routes on 'target' -- confirm clients set both as intended.
                msg_type = data.get('type')
                cmd = data.get('cmd')
                msg = data.get('msg')
                log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};')
                log.debug(f'Command: {cmd}')
                log.debug(f'Message: {msg}')

                if client_id:
                    data['client_id'] = client_id
                if group_id:
                    data['group_id'] = group_id
                log.debug(data)

                if data:
                    await r_conn.publish('channel:ws', json.dumps(data))
        except WebSocketDisconnect as exc:
            # FUTURE: Remove from list of connected clients?
            log.error(exc)  # usually 1001?
        except Exception as e:
            log.exception('Exception in Aether!')
            log.error(e)

    # Also called the Redis reader for a channel subscription.
    # This watches the Redis channels. Then sends out the message.
    async def sender_handler(
        r_channel,
        ws_conn: WebSocket,
        client_id = None,
        group_id = None,
    ):
        """Forward Redis messages addressed to this client over the WebSocket.

        Returns on a ``STOP`` message, when the WebSocket is no longer
        connected, or on a connection/unexpected error. Malformed payloads
        are logged and skipped.
        """
        log.info('sender_handler()')
        try:
            while True:
                message = await r_channel.get_message(
                    ignore_subscribe_messages=True,
                    timeout=poll_timeout,
                )
                if message is None:
                    continue
                log.debug(f'(Redis Reader) Message Received: {message}')

                raw = message['data']
                # Handle the stop sentinel before attempting to parse JSON.
                if raw == 'STOP':
                    log.debug('(Reader) STOP')
                    break
                try:
                    data = json.loads(raw)
                except (TypeError, ValueError) as exc:
                    log.warning(f'(Redis Reader) Ignoring undecodable message: {exc}')
                    continue
                if data == 'STOP':
                    log.debug('(Reader) STOP')
                    break

                # We only want to send to the WS marked as connected.
                # 'WebSocketState.CONNECTED' or 'WebSocketState.DISCONNECTED'
                log.debug(ws_conn.client_state.name)
                if ws_conn.client_state.name != 'CONNECTED':
                    log.warning(f'WS client ({client_id}) no longer connected!? They have just missed a message.')
                    break
                log.info(f'WS client ({client_id}) connected!')

                if not isinstance(data, dict):
                    log.warning('(Redis Reader) Ignoring non-object message')
                    continue

                # Figure out whether this WS client gets the data.
                if should_forward(data, client_id, group_id):
                    log.info('Sending!')
                    await ws_conn.send_json(data)
                else:
                    log.info('Not sending')
        except ConnectionError as conn_e:
            log.error(conn_e)
        except Exception as e:
            # Seen here in practice:
            #   RuntimeError: Unexpected ASGI message 'websocket.send', after sending 'websocket.close'.
            #   redis.exceptions.ConnectionError: Connection closed by server.
            log.exception('Exception in Aether!')
            log.error(e)

    # Redis client bound to pool of connections (auto-reconnecting).
    log.info('Create async Redis connection')
    redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
    redis_conn_pubsub = redis_conn.pubsub()

    tasks = []
    try:
        await redis_conn_pubsub.subscribe('channel:ws', 'channel:1', 'channel:2', 'chat:c')

        log.info('Create receiver task')
        future_receiver = asyncio.create_task(receiver_handler(ws_conn, redis_conn, client_id, group_id))
        tasks.append(future_receiver)

        log.info('Create sender task')
        future_sender = asyncio.create_task(sender_handler(redis_conn_pubsub, ws_conn, client_id, group_id))
        tasks.append(future_sender)

        # The receiver defines the lifetime of the connection: it ends when
        # the client disconnects.
        await future_receiver
    finally:
        # Stop whatever is still running (normally the sender) so it does not
        # outlive the WebSocket.
        for task in tasks:
            if not task.done():
                task.cancel()
        try:
            await asyncio.gather(*tasks, return_exceptions=True)
        finally:
            # Release the pub/sub connection first, then the client itself.
            for closer in (redis_conn_pubsub.close, redis_conn.close):
                try:
                    await closer()
                except Exception as e:
                    log.error(e)