Files
OSIT-AE-API-FastAPI/app/routers/websockets_redis.py
2023-04-07 14:09:48 -04:00

807 lines
31 KiB
Python

from fastapi import APIRouter, FastAPI, Response, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
import asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time
import redis.asyncio as redis
# import aioredis, redis, asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time
# from aioredis import from_url, Redis
# import asyncio
# import aioredis
# import async_timeout
from app.lib_general import log, logging, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min
from app.config import settings
# Router on which this module's WebSocket endpoints are registered (see the @router.websocket decorators below).
router = APIRouter()
class ConnectionManager:
    """Track the WebSockets accepted by this process and fan messages out to them.

    The sockets themselves live in the in-memory ``active_connections`` list.
    Redis (db 6) only stores each socket's ``sec-websocket-key`` header so that
    group membership can be looked up by key.  ``redis`` in this module is
    ``redis.asyncio``, so every Redis command is a coroutine and must be awaited.
    """

    def __init__(self):
        # NOTE: The active_connections list should be in Redis
        self.active_connections: List[WebSocket] = []

    @staticmethod
    def _redis_client():
        """Build an asyncio Redis client for db 6 from the configured server and port."""
        return redis.Redis(
            host=settings.REDIS['server'],
            port=settings.REDIS['port'],
            db=6,
            password=None,
            encoding='utf-8',
            decode_responses=True,
        )

    async def connect(self, websocket: WebSocket):
        """Accept *websocket*, track it, and record its header key in Redis for 90 minutes."""
        await websocket.accept()
        log.setLevel(logging.DEBUG)
        ws_header_key = websocket.headers.get('sec-websocket-key')
        log.info(f'WS connect: Header Security Key:{ws_header_key}')
        self.active_connections.append(websocket)
        log.debug(self.active_connections)

        key_name = f'ws:sec_key:{ws_header_key}'
        data = {'ws_header_key': ws_header_key}
        redis_conn = self._redis_client()
        try:
            await redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
        finally:
            # Always release the client, even when the write fails.
            await redis_conn.close()

    async def connect_w_client_id(self, websocket: WebSocket, client_id: str, group_id = None):
        """Accept *websocket*, track it, and push its header key onto the group's Redis list.

        Args:
            websocket: The incoming connection.
            client_id: Identifier of the connecting client (not used by this method).
            group_id: Group to join.  When ``None`` the ``group_id`` path
                parameter of the WebSocket route is used instead.
        """
        await websocket.accept()
        log.setLevel(logging.DEBUG)
        ws_header_key = websocket.headers.get('sec-websocket-key')
        log.info(f'WS connect: Header Security Key:{ws_header_key}')
        self.active_connections.append(websocket)
        log.debug(self.active_connections)

        if group_id is None:
            group_id = websocket.path_params.get('group_id')
        key_name = f'ws:group_id:{group_id}'

        redis_conn = self._redis_client()
        try:
            # Add the new WS connection's header key to the group's list.
            # NOTE: nothing removes the key again on disconnect, so the list only grows.
            await redis_conn.lpush(key_name, ws_header_key)
            log.debug('BEGIN: Loop through list key')
            log.debug(await redis_conn.llen(key_name))
            log.debug('END: Loop through list key')
        finally:
            await redis_conn.close()

    def disconnect(self, websocket: WebSocket):
        """Stop tracking *websocket*; a socket that is not tracked is ignored."""
        log.setLevel(logging.DEBUG)
        log.info('WS disconnect')
        try:
            self.active_connections.remove(websocket)
        except ValueError:
            # Already removed (or never connected through this manager).
            log.debug('WS disconnect: connection was not being tracked')

    # Targets: echo, direct, group, broadcast
    # send_ text, bytes, json
    # receive_ text, bytes, json
    async def echo(self, message: str, websocket: WebSocket):
        """Send *message* back as text on *websocket*, logging connection details at debug level."""
        log.setLevel(logging.DEBUG)
        log.debug(vars(websocket))
        log.debug(websocket.url)
        log.debug(websocket.client)
        log.debug(websocket.client_state)
        # .get() so a missing header cannot turn a debug line into a failed send.
        log.debug(websocket.headers.get('sec-websocket-key'))
        await websocket.send_text(message)

    async def direct(self, from_client_id: str, to_client_id: str, data: dict):
        """Send *data* as JSON to the tracked connection(s) of client *to_client_id*.

        A connection matches when the ``client_id`` path parameter of its route
        equals *to_client_id*.
        NOTE(review): assumes connections come from routes that declare
        ``{client_id}`` in their path; confirm against the registered routes.
        """
        log.setLevel(logging.DEBUG)
        # Iterate over a snapshot: the list can change while a send is awaited.
        for connection in tuple(self.active_connections):
            log.debug(vars(connection))
            log.debug(connection)
            if connection.path_params.get('client_id') == to_client_id:
                await connection.send_json(data)

    async def group(self, group_id: str, data: dict):
        """Send *data* as JSON to every local connection whose header key is in the group's Redis list."""
        log.setLevel(logging.INFO)
        log.debug(locals())

        key_name = f'ws:group_id:{group_id}'
        redis_conn = self._redis_client()
        try:
            # One LRANGE round trip instead of LLEN plus one LINDEX per entry.
            group_keys = set(await redis_conn.lrange(key_name, 0, -1))
        finally:
            await redis_conn.close()

        log.debug('BEGIN: Loop through Redis list entry')
        log.debug(len(group_keys))
        # Iterate over a snapshot: the list can change while a send is awaited.
        for active_ws_conn in tuple(self.active_connections):
            log.debug(vars(active_ws_conn))
            ws_header_key = active_ws_conn.headers.get('sec-websocket-key')
            if ws_header_key in group_keys:
                log.info('Found key in Redis list!')
                await active_ws_conn.send_json(data)
            else:
                log.debug('Key not found in Redis list')
        log.debug('END: Loop through Redis list entry')

    # NOTE: Same as group, but no filter based on path
    async def broadcast(self, message: str):
        """Send *message* as text to every tracked connection."""
        log.setLevel(logging.INFO)
        log.debug(locals())
        # Iterate over a snapshot: the list can change while a send is awaited.
        for connection in tuple(self.active_connections):
            log.debug(vars(connection))
            await connection.send_text(message)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send *message* as text to a single *websocket* (same behaviour as ``echo``)."""
        await self.echo(message, websocket)
# manager = ConnectionManager()
# class AIOConnectionManager:
# def __init__(self):
# # NOTE: The active_connections list should be in Redis
# self.active_connections: List[WebSocket] = []
# async def connect(self, websocket: WebSocket):
# await websocket.accept()
# log.setLevel(logging.DEBUG)
# log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key'))
# # log.debug(websocket.client) # the Address(host=ip, port=number)
# # log.info('WS Key: '+websocket.headers.get('sec-websocket-key'))
# # log.debug(websocket.headers) # Most useful
# self.active_connections.append(websocket)
# log.debug(self.active_connections)
# redis_conn = aioredis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# # message = {'hello': 'world', 'goodbye': 'test'}
# # data = json.loads(message)
# ws_header_key = websocket.headers.get('sec-websocket-key')
# key_name = f'ws:sec_key:{ws_header_key}'
# data = {'ws_header_key': ws_header_key}
# await redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# r_entry = json.loads(await redis_conn.get(key_name)) # For testing...
# r_ttl = await redis_conn.ttl(key_name) # For testing...
# # log.debug(r_entry) # For testing...
# # log.debug(r_entry.get('ws_header_key')) # For testing...
# # log.debug(r_ttl) # For testing...
# # r_entries = redis.scan()
# # log.debug(r_entries)
# redis_conn.close()
# # await redis.close()
# async def connect_w_client_id(self, websocket: WebSocket, client_id: str, group_id = None):
# await websocket.accept()
# log.setLevel(logging.DEBUG)
# log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key'))
# self.active_connections.append(websocket)
# log.debug(self.active_connections)
# redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# # # ### Redis ### Testing One
# # ws_header_key = websocket.headers.get('sec-websocket-key')
# # key_name = f'ws:sec_key:{ws_header_key}'
# # data = {'ws_header_key': ws_header_key}
# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# # r_entry = json.loads(redis_conn.get(key_name)) # For testing...
# # r_ttl = redis_conn.ttl(key_name) # For testing...
# # # log.debug(r_entry) # For testing...
# # # log.debug(r_entry.get('ws_header_key')) # For testing...
# # # log.debug(r_ttl) # For testing...
# # # ### Redis ### Testing Two!!
# # key_name = f'ws:conn:{ws_header_key}'
# # data = websocket
# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# # r_entry = json.loads(redis_conn.get(key_name)) # For testing...
# # log.debug(r_entry) # For testing...
# # ### Redis ### Testing Three!!!
# ws_header_key = websocket.headers.get('sec-websocket-key')
# # # Set WS connection in Redis based on WS header security key
# # key_name = f'ws:header_key:{ws_header_key}'
# # data = {'ws_conn': websocket}
# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# # Get new ws_header_key and group_id
# ws_header_key = websocket.headers.get('sec-websocket-key')
# group_id = websocket.path_params.get('group_id')
# # Figure out key_name
# key_name = f'ws:group_id:{group_id}'
# # Get the current list from Redis entry
# # group_id_ws_conn_li = []
# # group_id_ws_conn_li = redis_conn.get(key_name, [])
# # Add the new WS connection's header key to the list
# redis_conn.lpush(key_name, ws_header_key)
# # group_id_ws_conn_li.append(ws_header_key)
# # data = group_id_ws_conn_li
# # redis_conn.hsetex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# # r_entry = json.loads(redis_conn.lget(key_name)) # For testing...
# log.debug('BEGIN: Loop through list key')
# log.debug(redis_conn.llen(key_name))
# # for i in range(0, redis_conn.llen(key_name)):
# # log.debug(redis_conn.lindex(key_name, i))
# log.debug('END: Loop through list key')
# redis_conn.close()
# def disconnect(self, websocket: WebSocket):
# log.setLevel(logging.DEBUG)
# log.info('WS disconnect')
# self.active_connections.remove(websocket)
# # Targets: echo, direct, group, broadcast
# # send_ text, bytes, json
# # receive_ text, bytes, json
# async def echo(self, message: str, websocket: WebSocket):
# log.setLevel(logging.DEBUG)
# # log.debug(dir(websocket))
# log.debug(vars(websocket))
# log.debug(websocket.url)
# log.debug(websocket.client)
# log.debug(websocket.client_state)
# log.debug(websocket.headers['sec-websocket-key'])
# # log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8'))
# await websocket.send_text(message)
# async def direct(self, from_client_id: str, to_client_id: str, data: dict):
# log.setLevel(logging.DEBUG)
# for connection in self.active_connections:
# log.debug(vars(connection))
# log.debug(connection)
# await connection.send_text(message)
# async def group(self, group_id: str, data: dict):
# log.setLevel(logging.INFO)
# log.debug(locals())
# # Single process version...
# # for connection in self.active_connections:
# # log.debug(vars(connection))
# # # websocket.path_params.get('client_id') # Testing...
# # # if connection.scope.get('path') == group_id: # Testing...
# # if connection.path_params.get('group_id') == group_id:
# # log.info('Found matching Group ID')
# # await connection.send_json(data)
# # Redis version...
# redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# # Figure out key_name
# key_name = f'ws:group_id:{group_id}'
# log.debug('BEGIN: Loop through Redis list entry')
# log.debug(redis_conn.llen(key_name))
# # ws_header_key = connection.headers.get('sec-websocket-key')
# # ws_header_key = f'ws:header_key:{ws_header_key}'
# # log.debug(f'Looking for: {ws_header_key}')
# for i in range(0, redis_conn.llen(key_name)):
# log.debug(f'{i}: {redis_conn.lindex(key_name, i)}')
# redis_ws_group_ws_header_key = redis_conn.lindex(key_name, i)
# # Check if they key
# # redis_key_name_ws_conn = f'ws:header_key:{redis_ws_group_ws_header_key}'
# # ws_conn_data = json.loads(redis_conn.get('redis_key_name_ws_conn'))
# # active_ws_conn = ws_conn_data.get('ws_conn')
# # log.debug(active_ws_conn)
# for active_ws_conn in self.active_connections:
# log.debug(vars(active_ws_conn))
# ws_header_key = active_ws_conn.headers.get('sec-websocket-key')
# if redis_ws_group_ws_header_key == ws_header_key:
# log.info(f'Found key in Redis list!')
# await active_ws_conn.send_json(data)
# else:
# log.debug(f'Key not found in Redis list')
# log.debug('END: Loop through Redis list entry')
# redis_conn.close()
# # NOTE: Same as group, but no filter based on path
# async def broadcast(self, message: str):
# log.setLevel(logging.INFO)
# log.debug(locals())
# for connection in self.active_connections:
# log.debug(vars(connection))
# await connection.send_text(message)
# async def send_personal_message(self, message: str, websocket: WebSocket):
# log.setLevel(logging.DEBUG)
# # log.debug(dir(websocket))
# log.debug(vars(websocket))
# log.debug(websocket.url)
# log.debug(websocket.client)
# log.debug(websocket.client_state)
# log.debug(websocket.headers['sec-websocket-key'])
# # log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8'))
# await websocket.send_text(message)
# aio_manager = AIOConnectionManager()
# Endpoints for???
# /room/<id> (just a group of clients; for a related group like a poster presenter or session room)
# /client/<id> (for one specific client/browser; something specific to a browser???)
# /person/<id> (for one specific person; handles send and receiving their messages)
# @router.websocket('/ws_redis/group/{group_id}/client/{client_id}')
# async def ws_client_id(
# websocket: WebSocket,
# group_id: str,
# client_id: str,
# ):
# await manager.connect_w_client_id(websocket, client_id, group_id)
# # await aio_manager.connect_w_client_id(websocket, client_id, group_id)
# try:
# while True:
# data = await websocket.receive_json() # Returns dict
# # log.debug(data)
# # group_path_id = f'/ws/group/{group_id}'
# # client_id = data.get('client_id')
# # echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?)
# msg_type = data.get('type')
# cmd = data.get('cmd')
# msg = data.get('msg')
# log.setLevel(logging.INFO)
# log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};')
# log.debug(f'Command: {cmd}')
# log.debug(f'Message: {msg}')
# data['client_id'] = client_id
# data['group_id'] = group_id
# await manager.group(group_id=group_id, data=data)
# # await aio_manager.group(group_id=group_id, data=data)
# await redis_connector(websocket)
# except WebSocketDisconnect:
# manager.disconnect(websocket)
# # aio_manager.disconnect(websocket)
# # await manager.broadcast(f'Client #{client_id} left')
# @router.websocket('/ws_redis/looping')
# async def ws_looping(
# websocket: WebSocket,
# ):
# await manager.connect(websocket)
# # await manager.broadcast(f'Welcome to looping')
# try:
# while True:
# # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING
# # await time.sleep(3.5) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING
# # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING
# # data = await websocket.receive_json()
# # log.debug(data)
# # data_dict = data
# # data_dict = json.loads(data)
# # log.debug(data_dict['client_id'])
# # await manager.send_personal_message(f'You wrote: {data}', websocket)
# await manager.broadcast(f'Loop!!!')
# except WebSocketDisconnect:
# manager.disconnect(websocket)
# await manager.broadcast(f'Client left looping')
# @router.websocket("/ws_redis/{client_id}")
# async def websocket_endpoint(
# websocket: WebSocket,
# client_id: int,
# response: Response = Response,
# ):
# log.setLevel(logging.DEBUG)
# log.debug(locals())
# log.info('Root of ws. Waiting to accept a websocket and then the redis_connector')
# await websocket.accept()
# await redis_connector(websocket)
# await manager.connect_w_client_id(websocket, client_id, group_id)
# # await aio_manager.connect_w_client_id(websocket, client_id, group_id)
# try:
# while True:
# data = await websocket.receive_json() # Returns dict
# # log.debug(data)
# # group_path_id = f'/ws/group/{group_id}'
# # client_id = data.get('client_id')
# # echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?)
# msg_type = data.get('type')
# cmd = data.get('cmd')
# msg = data.get('msg')
# log.setLevel(logging.INFO)
# log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};')
# log.debug(f'Command: {cmd}')
# log.debug(f'Message: {msg}')
# data['client_id'] = client_id
# data['group_id'] = group_id
# await manager.group(group_id=group_id, data=data)
# # await aio_manager.group(group_id=group_id, data=data)
# await redis_connector(websocket)
# except WebSocketDisconnect:
# manager.disconnect(websocket)
# # aio_manager.disconnect(websocket)
# # await manager.broadcast(f'Client #{client_id} left')
@router.websocket('/ws/group/{group_id}/client/{client_id}')
@router.websocket('/ws_redis/group/{group_id}/client/{client_id}')
async def redis_ws_client_id(
    websocket: WebSocket,
    group_id: str,
    client_id: str,
):
    """WebSocket endpoint for one client inside one group.

    Accepts the handshake and then delegates the whole connection lifetime to
    ``redis_connector``.  Registered under both the ``/ws/...`` and the
    ``/ws_redis/...`` path.
    """
    log.setLevel(logging.DEBUG)
    log.debug(locals())
    log.info(
        'WS Redis: Group ID and Client ID. '
        'Waiting to "accept" a websocket and then the "redis_connector"'
    )
    await websocket.accept()
    await redis_connector(ws_conn=websocket, client_id=client_id, group_id=group_id)
async def redis_connector(
    ws_conn: WebSocket,
    client_id: str,
    group_id = None
):
    """Bridge an accepted WebSocket to Redis pub/sub until either side stops.

    Two tasks run concurrently:

    * the consumer reads JSON from the WebSocket and publishes it on the
      ``chat:c`` channel, tagged with ``client_id`` and ``group_id``;
    * the producer reads the subscribed channels and forwards matching
      messages to the WebSocket.

    As soon as one task finishes (client disconnect, stop word, error) the
    other one is cancelled and the pub/sub object and Redis client are closed,
    so no task or connection outlives the WebSocket.

    Args:
        ws_conn: An already accepted WebSocket.
        client_id: Identifier stamped onto every message received from the client.
        group_id: When set, only messages carrying the same ``group_id`` are
            forwarded to the client; when falsy, every message is forwarded.
    """
    log.setLevel(logging.DEBUG)
    log.debug(locals())

    # The consumer is receiving WS JSON data
    async def consumer_handler(ws_conn: WebSocket, r_conn, client_id, group_id):
        """Publish every JSON message received on the WebSocket to Redis."""
        log.info('consumer_handler()')
        try:
            while True:
                data = await ws_conn.receive_json()  # Returns dict
                log.debug(data)
                # echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?)
                msg_type = data.get('type')
                cmd = data.get('cmd')
                msg = data.get('msg')
                # The level is lowered to INFO here, so the two debug lines
                # below are not emitted; it is raised again afterwards.
                log.setLevel(logging.INFO)
                log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};')
                log.debug(f'Command: {cmd}')
                log.debug(f'Message: {msg}')
                data['client_id'] = client_id
                data['group_id'] = group_id
                log.setLevel(logging.DEBUG)
                log.debug(data)
                await r_conn.publish('chat:c', json.dumps(data))
        except WebSocketDisconnect as exc:
            # FUTURE: Remove from list of connected clients?
            log.error(exc)
        except Exception as e:
            log.exception('Aether Exception!')
            log.error(e)
        # The Redis client is closed by redis_connector once this task ends.

    # Also called the Redis reader for a channel subscription
    # This watches a Redis channel. Then sends out the message.
    async def producer_handler(r_channel, ws_conn: WebSocket, client_id, group_id):
        """Forward messages from the subscribed Redis channels to the WebSocket."""
        log.info('producer_handler()')
        try:
            while True:
                # A non-zero timeout makes this wait for data instead of
                # returning None immediately and spinning the event loop.
                message = await r_channel.get_message(ignore_subscribe_messages=True, timeout=1.0)
                if message is None:
                    continue
                log.debug(f'(Redis Reader) Message Received: {message}')
                raw = message['data']
                # Check the stop word before decoding: a bare STOP is not valid JSON.
                if raw == STOPWORD:
                    log.debug('(Reader) STOP')
                    break
                try:
                    data = json.loads(raw)
                except (TypeError, ValueError) as json_e:
                    # One malformed message must not end delivery for this client.
                    log.error(json_e)
                    continue
                if data == STOPWORD:
                    log.debug('(Reader) STOP')
                    break
                # Only dict payloads can carry a group_id.
                msg_group_id = data.get('group_id') if isinstance(data, dict) else None
                if group_id and group_id == msg_group_id:
                    log.info('Sending to matching group ID')
                    await ws_conn.send_json(data)
                elif group_id:
                    log.info('Skip sending to client')
                else:
                    log.info('Sending to all')
                    await ws_conn.send_json(data)
        except ConnectionError as conn_e:
            # TODO: This needs handling better?
            log.error(conn_e)
        except Exception as e:
            # redis.exceptions.ConnectionError: Connection closed by server.
            log.error(e)

    log.info('Create Redis connection')
    redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
    redis_conn_pubsub = redis_conn.pubsub()

    tasks = []
    try:
        await redis_conn_pubsub.subscribe('channel:1', 'channel:2', 'chat:c')

        log.info('Run consumer_task')
        tasks.append(asyncio.create_task(consumer_handler(ws_conn, redis_conn, client_id, group_id)))
        log.info('Run producer_task')
        tasks.append(asyncio.create_task(producer_handler(redis_conn_pubsub, ws_conn, client_id, group_id)))

        # Stop as soon as either direction ends; asyncio.wait needs tasks, not bare coroutines.
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finally:
        for task in tasks:
            if not task.done():
                log.debug(f'Canceling task: {task}')
                task.cancel()
        if tasks:
            # Let cancelled tasks unwind; their CancelledError is collected, not raised.
            await asyncio.gather(*tasks, return_exceptions=True)
        for closer in (redis_conn_pubsub.close, redis_conn.close):
            try:
                await closer()
            except Exception as e:
                # Best-effort cleanup: the connection may already be gone.
                log.error(e)
# @app.on_event('shutdown')
# async def shutdown():
# log.info('The Aether FastAPI API is shutting down...')
# # await ws_conn.close()
# await redis_conn.close()
# pending_tasks = asyncio.all_tasks(loop=event_loop)
# # cancel each pending task
# for task in pending_tasks:
# logger.info(f'killing task {task.get_coro()}')
# event_loop.call_soon_threadsafe(task.cancel)
# # This does not work for some reason.
# # "Passing coroutines is forbidden, use tasks explicitly."
# log.info('Start asyncio wait')
# done, pending = await asyncio.wait(
# [consumer_task, producer_task],
# return_when = asyncio.FIRST_COMPLETED,
# )
# log.debug(f'Done task: {done}')
# for task in pending:
# log.debug(f'Canceling task: {task}')
# task.cancel()
# await redis_conn.close()
# await redis_conn.wait_closed()
# Sentinel value: producer_handler (inside redis_connector) leaves its read loop when a pub/sub message's data equals this.
STOPWORD = 'STOP'
# async def reader(channel: redis.client.PubSub):
# while True:
# message = await channel.get_message(ignore_subscribe_messages=True)
# if message is not None:
# log.debug(f'(Reader) Message Received: {message}')
# log.info('WHAT NOW???')
# if message['data'].decode() == STOPWORD:
# log.debug('(Reader) STOP')
# break
# r = redis.from_url('redis://localhost')
# r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# async with r.pubsub() as pubsub:
# await pubsub.subscribe('channel:1', 'channel:2', 'chat:c')
# future = asyncio.create_task(reader(pubsub))
# await r.publish('channel:1', 'Hello')
# await r.publish('channel:2', 'World')
# await r.publish('channel:1', STOPWORD)
# await future
# @app.on_event('shutdown')
# async def shutdown():
# log.setLevel(logging.INFO) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
# log.debug(locals())
# log.info('The Aether FastAPI API is shutting down...')
# #await database.disconnect()
# r.close()