Work on websockets. Finally 97.25% working?

This commit is contained in:
Scott Idem
2023-04-07 17:09:51 -04:00
parent c9fa280c57
commit 6172626254
4 changed files with 51 additions and 703 deletions

View File

@@ -18,7 +18,7 @@ from . import config
from app.log import log, logging from app.log import log, logging
# Import the routers here first: # Import the routers here first:
from app.routers import aether_cfg, api_crud, api, importing, sql, account, activity_log, address, archive, archive_content, contact, cont_edu_cert, cont_edu_cert_person, data_store, event, event_abstract, event_badge, event_badge_importing, event_badge_template, event_device, event_exhibit, event_exhibit_tracking, event_file, event_importing, event_location, event_person, event_person_detail, event_person_tracking, event_presentation, event_presenter, event_registration, event_session, flask_cfg, fundraising, hosted_file, journal, journal_entry, log_client_viewing, lookup, membership_cfg, membership_group, membership_person_group, membership_person, membership_person_profile, membership_type, membership_person_type, order, order_v3, order_line, order_cart, organization, page, person, person_user, post, post_comment, product, qr, site, site_domain, user, websockets, websockets_redis, e_cvent, c_idaa, e_impexium, e_stripe from app.routers import aether_cfg, api_crud, api, importing, sql, account, activity_log, address, archive, archive_content, contact, cont_edu_cert, cont_edu_cert_person, data_store, event, event_abstract, event_badge, event_badge_importing, event_badge_template, event_device, event_exhibit, event_exhibit_tracking, event_file, event_importing, event_location, event_person, event_person_detail, event_person_tracking, event_presentation, event_presenter, event_registration, event_session, flask_cfg, fundraising, hosted_file, journal, journal_entry, log_client_viewing, lookup, membership_cfg, membership_group, membership_person_group, membership_person, membership_person_profile, membership_type, membership_person_type, order, order_v3, order_line, order_cart, organization, page, person, person_user, post, post_comment, product, qr, site, site_domain, user, websockets_redis, e_cvent, c_idaa, e_impexium, e_stripe
from app.db_sql import sql_select # , sql_connect from app.db_sql import sql_select # , sql_connect
@@ -385,16 +385,16 @@ app.include_router(
user.router, user.router,
tags=['User'], tags=['User'],
) )
app.include_router( # app.include_router(
websockets.router, # websockets.router,
#prefix='/websocket', # # prefix='/websocket',
tags=['Websockets'], # tags=['Websockets'],
#dependencies=[Depends(get_token_header)], # # dependencies=[Depends(get_token_header)],
#responses={404: {'description': 'Not found'}}, # # responses={404: {'description': 'Not found'}},
) # )
app.include_router( app.include_router(
websockets_redis.router, websockets_redis.router,
tags=['Websockets Redis'], tags=['Websockets (Redis)'],
) )
app.include_router( app.include_router(

View File

@@ -70,13 +70,13 @@ def load_data_store_obj_w_code(
data['code'] = code data['code'] = code
log.debug(data) log.debug(data)
log.warning(f'Can we get past this?????????? {code}') # log.warning(f'Can we get past this?????????? {code}')
sql_enabled, data['enable'] = sql_enable_part(table_name='data_store', enabled=enabled) # Reasonably safe return str and bool sql_enabled, data['enable'] = sql_enable_part(table_name='data_store', enabled=enabled) # Reasonably safe return str and bool
sql_limit = sql_limit_offset_part(limit=limit, offset=offset) # Reasonably safe return str sql_limit = sql_limit_offset_part(limit=limit, offset=offset) # Reasonably safe return str
log.debug(data) log.debug(data)
log.warning(f'Where are we now??????????? {code}') # log.warning(f'Where are we now??????????? {code}')
sql = f""" sql = f"""
SELECT * SELECT *

View File

@@ -764,89 +764,3 @@ async def redis_connector(
task.cancel() task.cancel()
await redis.close() await redis.close()
# await redis.wait_closed() # await redis.wait_closed()
#### ####
# @app.on_event('shutdown')
# async def shutdown():
# log.info('The Aether FastAPI API is shutting down...')
# # await ws_conn.close()
# await redis_conn.close()
# pending_tasks = asyncio.all_tasks(loop=event_loop)
# # cancel each pending task
# for task in pending_tasks:
# logger.info(f'killing task {task.get_coro()}')
# event_loop.call_soon_threadsafe(task.cancel)
# # This does not work for some reason.
# # "Passing coroutines is forbidden, use tasks explicitly."
# log.info('Start asyncio wait')
# done, pending = await asyncio.wait(
# [receiver_task, producer_task],
# return_when = asyncio.FIRST_COMPLETED,
# )
# log.debug(f'Done task: {done}')
# for task in pending:
# log.debug(f'Canceling task: {task}')
# task.cancel()
# await redis_conn.close()
# await redis_conn.wait_closed()
# (channel,) = await r.subscribe('chat:c')
# assert isinstance(channel, redis.Channel)
# try:
# while True:
# data = await channel.get()
# if data:
# await ws_conn.send_json(data)
# except Exception as exc:
# # TODO this needs handling better
# log.error(exc)
# receiver_task = receiver_handler(ws_conn, redis_conn, client_id, group_id)
# future_receiver = asyncio.create_task(receiver_task())
# sender_task = sender_handler(redis_conn_pubsub, ws_conn, client_id, group_id)
# future_sender = asyncio.create_task(sender_task())
# async def reader(channel: redis.client.PubSub):
# while True:
# message = await channel.get_message(ignore_subscribe_messages=True)
# if message is not None:
# log.debug(f'(Reader) Message Received: {message}')
# log.info('WHAT NOW???')
# if message['data'].decode() == STOPWORD:
# log.debug('(Reader) STOP')
# break
# r = redis.from_url('redis://localhost')
# r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# async with r.pubsub() as pubsub:
# await pubsub.subscribe('channel:1', 'channel:2', 'chat:c')
# future = asyncio.create_task(reader(pubsub))
# await r.publish('channel:1', 'Hello')
# await r.publish('channel:2', 'World')
# await r.publish('channel:1', STOPWORD)
# await future
# @app.on_event('shutdown')
# async def shutdown():
# log.setLevel(logging.INFO) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
# log.debug(locals())
# log.info('The Aether FastAPI API is shutting down...')
# #await database.disconnect()
# r.close()

View File

@@ -4,11 +4,6 @@ from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union from typing import Dict, List, Optional, Set, Union
import asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time import asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time
import redis.asyncio as redis import redis.asyncio as redis
# import aioredis, redis, asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time
# from aioredis import from_url, Redis
# import asyncio
# import aioredis
# import async_timeout
from app.lib_general import log, logging, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min from app.lib_general import log, logging, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min
from app.config import settings from app.config import settings
@@ -16,579 +11,6 @@ from app.config import settings
router = APIRouter() router = APIRouter()
class ConnectionManager:
def __init__(self):
# NOTE: The active_connections list should be in Redis
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
log.setLevel(logging.DEBUG)
log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key'))
# log.debug(dir(websocket))
# log.debug(websocket.client) # the Address(host=ip, port=number)
# log.info('WS Key: '+websocket.headers.get('sec-websocket-key'))
# log.debug(websocket.headers) # Most useful
# log.debug(websocket.app) # Nope
# log.debug(websocket.app()) # Nope
# log.debug(websocket.session) # Nope
# log.debug(websocket.session()) # Nope
# log.debug(websocket.user) # Nope
# log.debug(websocket.values()) # Not sure if useful
self.active_connections.append(websocket)
log.debug(self.active_connections)
redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# message = {'hello': 'world', 'goodbye': 'test'}
# data = json.loads(message)
ws_header_key = websocket.headers.get('sec-websocket-key')
key_name = f'ws:sec_key:{ws_header_key}'
data = {'ws_header_key': ws_header_key}
redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
r_entry = json.loads(redis_conn.get(key_name)) # For testing...
r_ttl = redis_conn.ttl(key_name) # For testing...
# log.debug(r_entry) # For testing...
# log.debug(r_entry.get('ws_header_key')) # For testing...
# log.debug(r_ttl) # For testing...
# r_entries = redis.scan()
# log.debug(r_entries)
redis_conn.close()
# await redis.close()
async def connect_w_client_id(self, websocket: WebSocket, client_id: str, group_id = None):
await websocket.accept()
log.setLevel(logging.DEBUG)
log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key'))
self.active_connections.append(websocket)
log.debug(self.active_connections)
redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# # ### Redis ### Testing One
# ws_header_key = websocket.headers.get('sec-websocket-key')
# key_name = f'ws:sec_key:{ws_header_key}'
# data = {'ws_header_key': ws_header_key}
# redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# r_entry = json.loads(redis_conn.get(key_name)) # For testing...
# r_ttl = redis_conn.ttl(key_name) # For testing...
# # log.debug(r_entry) # For testing...
# # log.debug(r_entry.get('ws_header_key')) # For testing...
# # log.debug(r_ttl) # For testing...
# # ### Redis ### Testing Two!!
# key_name = f'ws:conn:{ws_header_key}'
# data = websocket
# redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# r_entry = json.loads(redis_conn.get(key_name)) # For testing...
# log.debug(r_entry) # For testing...
# ### Redis ### Testing Three!!!
ws_header_key = websocket.headers.get('sec-websocket-key')
# # Set WS connection in Redis based on WS header security key
# key_name = f'ws:header_key:{ws_header_key}'
# data = {'ws_conn': websocket}
# redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# Get new ws_header_key and group_id
ws_header_key = websocket.headers.get('sec-websocket-key')
group_id = websocket.path_params.get('group_id')
# Figure out key_name
key_name = f'ws:group_id:{group_id}'
# Get the current list from Redis entry
# group_id_ws_conn_li = []
# group_id_ws_conn_li = redis_conn.get(key_name, [])
# Add the new WS connection's header key to the list
redis_conn.lpush(key_name, ws_header_key)
# group_id_ws_conn_li.append(ws_header_key)
# data = group_id_ws_conn_li
# redis_conn.hsetex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# r_entry = json.loads(redis_conn.lget(key_name)) # For testing...
log.debug('BEGIN: Loop through list key')
log.debug(redis_conn.llen(key_name))
# for i in range(0, redis_conn.llen(key_name)):
# log.debug(redis_conn.lindex(key_name, i))
log.debug('END: Loop through list key')
redis_conn.close()
def disconnect(self, websocket: WebSocket):
log.setLevel(logging.DEBUG)
log.info('WS disconnect')
self.active_connections.remove(websocket)
# Targets: echo, direct, group, broadcast
# send_ text, bytes, json
# receive_ text, bytes, json
async def echo(self, message: str, websocket: WebSocket):
log.setLevel(logging.DEBUG)
# log.debug(dir(websocket))
log.debug(vars(websocket))
log.debug(websocket.url)
log.debug(websocket.client)
log.debug(websocket.client_state)
log.debug(websocket.headers['sec-websocket-key'])
# log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8'))
await websocket.send_text(message)
async def direct(self, from_client_id: str, to_client_id: str, data: dict):
log.setLevel(logging.DEBUG)
for connection in self.active_connections:
log.debug(vars(connection))
log.debug(connection)
await connection.send_text(message)
async def group(self, group_id: str, data: dict):
log.setLevel(logging.INFO)
log.debug(locals())
# Single process version...
# for connection in self.active_connections:
# log.debug(vars(connection))
# # websocket.path_params.get('client_id') # Testing...
# # if connection.scope.get('path') == group_id: # Testing...
# if connection.path_params.get('group_id') == group_id:
# log.info('Found matching Group ID')
# await connection.send_json(data)
# Redis version...
redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# Figure out key_name
key_name = f'ws:group_id:{group_id}'
log.debug('BEGIN: Loop through Redis list entry')
log.debug(redis_conn.llen(key_name))
# ws_header_key = connection.headers.get('sec-websocket-key')
# ws_header_key = f'ws:header_key:{ws_header_key}'
# log.debug(f'Looking for: {ws_header_key}')
for i in range(0, redis_conn.llen(key_name)):
log.debug(f'{i}: {redis_conn.lindex(key_name, i)}')
redis_ws_group_ws_header_key = redis_conn.lindex(key_name, i)
# Check if they key
# redis_key_name_ws_conn = f'ws:header_key:{redis_ws_group_ws_header_key}'
# ws_conn_data = json.loads(redis_conn.get('redis_key_name_ws_conn'))
# active_ws_conn = ws_conn_data.get('ws_conn')
# log.debug(active_ws_conn)
for active_ws_conn in self.active_connections:
log.debug(vars(active_ws_conn))
ws_header_key = active_ws_conn.headers.get('sec-websocket-key')
if redis_ws_group_ws_header_key == ws_header_key:
log.info(f'Found key in Redis list!')
await active_ws_conn.send_json(data)
else:
log.debug(f'Key not found in Redis list')
log.debug('END: Loop through Redis list entry')
redis_conn.close()
# NOTE: Same as group, but no filter based on path
async def broadcast(self, message: str):
log.setLevel(logging.INFO)
log.debug(locals())
for connection in self.active_connections:
log.debug(vars(connection))
await connection.send_text(message)
async def send_personal_message(self, message: str, websocket: WebSocket):
log.setLevel(logging.DEBUG)
# log.debug(dir(websocket))
log.debug(vars(websocket))
log.debug(websocket.url)
log.debug(websocket.client)
log.debug(websocket.client_state)
log.debug(websocket.headers['sec-websocket-key'])
# log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8'))
await websocket.send_text(message)
# manager = ConnectionManager()
# class AIOConnectionManager:
# def __init__(self):
# # NOTE: The active_connections list should be in Redis
# self.active_connections: List[WebSocket] = []
# async def connect(self, websocket: WebSocket):
# await websocket.accept()
# log.setLevel(logging.DEBUG)
# log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key'))
# # log.debug(websocket.client) # the Address(host=ip, port=number)
# # log.info('WS Key: '+websocket.headers.get('sec-websocket-key'))
# # log.debug(websocket.headers) # Most useful
# self.active_connections.append(websocket)
# log.debug(self.active_connections)
# redis_conn = aioredis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# # message = {'hello': 'world', 'goodbye': 'test'}
# # data = json.loads(message)
# ws_header_key = websocket.headers.get('sec-websocket-key')
# key_name = f'ws:sec_key:{ws_header_key}'
# data = {'ws_header_key': ws_header_key}
# await redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# r_entry = json.loads(await redis_conn.get(key_name)) # For testing...
# r_ttl = await redis_conn.ttl(key_name) # For testing...
# # log.debug(r_entry) # For testing...
# # log.debug(r_entry.get('ws_header_key')) # For testing...
# # log.debug(r_ttl) # For testing...
# # r_entries = redis.scan()
# # log.debug(r_entries)
# redis_conn.close()
# # await redis.close()
# async def connect_w_client_id(self, websocket: WebSocket, client_id: str, group_id = None):
# await websocket.accept()
# log.setLevel(logging.DEBUG)
# log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key'))
# self.active_connections.append(websocket)
# log.debug(self.active_connections)
# redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# # # ### Redis ### Testing One
# # ws_header_key = websocket.headers.get('sec-websocket-key')
# # key_name = f'ws:sec_key:{ws_header_key}'
# # data = {'ws_header_key': ws_header_key}
# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# # r_entry = json.loads(redis_conn.get(key_name)) # For testing...
# # r_ttl = redis_conn.ttl(key_name) # For testing...
# # # log.debug(r_entry) # For testing...
# # # log.debug(r_entry.get('ws_header_key')) # For testing...
# # # log.debug(r_ttl) # For testing...
# # # ### Redis ### Testing Two!!
# # key_name = f'ws:conn:{ws_header_key}'
# # data = websocket
# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# # r_entry = json.loads(redis_conn.get(key_name)) # For testing...
# # log.debug(r_entry) # For testing...
# # ### Redis ### Testing Three!!!
# ws_header_key = websocket.headers.get('sec-websocket-key')
# # # Set WS connection in Redis based on WS header security key
# # key_name = f'ws:header_key:{ws_header_key}'
# # data = {'ws_conn': websocket}
# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# # Get new ws_header_key and group_id
# ws_header_key = websocket.headers.get('sec-websocket-key')
# group_id = websocket.path_params.get('group_id')
# # Figure out key_name
# key_name = f'ws:group_id:{group_id}'
# # Get the current list from Redis entry
# # group_id_ws_conn_li = []
# # group_id_ws_conn_li = redis_conn.get(key_name, [])
# # Add the new WS connection's header key to the list
# redis_conn.lpush(key_name, ws_header_key)
# # group_id_ws_conn_li.append(ws_header_key)
# # data = group_id_ws_conn_li
# # redis_conn.hsetex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# # r_entry = json.loads(redis_conn.lget(key_name)) # For testing...
# log.debug('BEGIN: Loop through list key')
# log.debug(redis_conn.llen(key_name))
# # for i in range(0, redis_conn.llen(key_name)):
# # log.debug(redis_conn.lindex(key_name, i))
# log.debug('END: Loop through list key')
# redis_conn.close()
# def disconnect(self, websocket: WebSocket):
# log.setLevel(logging.DEBUG)
# log.info('WS disconnect')
# self.active_connections.remove(websocket)
# # Targets: echo, direct, group, broadcast
# # send_ text, bytes, json
# # receive_ text, bytes, json
# async def echo(self, message: str, websocket: WebSocket):
# log.setLevel(logging.DEBUG)
# # log.debug(dir(websocket))
# log.debug(vars(websocket))
# log.debug(websocket.url)
# log.debug(websocket.client)
# log.debug(websocket.client_state)
# log.debug(websocket.headers['sec-websocket-key'])
# # log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8'))
# await websocket.send_text(message)
# async def direct(self, from_client_id: str, to_client_id: str, data: dict):
# log.setLevel(logging.DEBUG)
# for connection in self.active_connections:
# log.debug(vars(connection))
# log.debug(connection)
# await connection.send_text(message)
# async def group(self, group_id: str, data: dict):
# log.setLevel(logging.INFO)
# log.debug(locals())
# # Single process version...
# # for connection in self.active_connections:
# # log.debug(vars(connection))
# # # websocket.path_params.get('client_id') # Testing...
# # # if connection.scope.get('path') == group_id: # Testing...
# # if connection.path_params.get('group_id') == group_id:
# # log.info('Found matching Group ID')
# # await connection.send_json(data)
# # Redis version...
# redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# # Figure out key_name
# key_name = f'ws:group_id:{group_id}'
# log.debug('BEGIN: Loop through Redis list entry')
# log.debug(redis_conn.llen(key_name))
# # ws_header_key = connection.headers.get('sec-websocket-key')
# # ws_header_key = f'ws:header_key:{ws_header_key}'
# # log.debug(f'Looking for: {ws_header_key}')
# for i in range(0, redis_conn.llen(key_name)):
# log.debug(f'{i}: {redis_conn.lindex(key_name, i)}')
# redis_ws_group_ws_header_key = redis_conn.lindex(key_name, i)
# # Check if they key
# # redis_key_name_ws_conn = f'ws:header_key:{redis_ws_group_ws_header_key}'
# # ws_conn_data = json.loads(redis_conn.get('redis_key_name_ws_conn'))
# # active_ws_conn = ws_conn_data.get('ws_conn')
# # log.debug(active_ws_conn)
# for active_ws_conn in self.active_connections:
# log.debug(vars(active_ws_conn))
# ws_header_key = active_ws_conn.headers.get('sec-websocket-key')
# if redis_ws_group_ws_header_key == ws_header_key:
# log.info(f'Found key in Redis list!')
# await active_ws_conn.send_json(data)
# else:
# log.debug(f'Key not found in Redis list')
# log.debug('END: Loop through Redis list entry')
# redis_conn.close()
# # NOTE: Same as group, but no filter based on path
# async def broadcast(self, message: str):
# log.setLevel(logging.INFO)
# log.debug(locals())
# for connection in self.active_connections:
# log.debug(vars(connection))
# await connection.send_text(message)
# async def send_personal_message(self, message: str, websocket: WebSocket):
# log.setLevel(logging.DEBUG)
# # log.debug(dir(websocket))
# log.debug(vars(websocket))
# log.debug(websocket.url)
# log.debug(websocket.client)
# log.debug(websocket.client_state)
# log.debug(websocket.headers['sec-websocket-key'])
# # log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8'))
# await websocket.send_text(message)
# aio_manager = AIOConnectionManager()
# Endpoints for???
# /room/<id> (just a group of clients; for a related group like a poster presenter or session room)
# /client/<id> (for one specific client/browser; something specific to a browser???)
# /person/<id> (for one specific person; handles send and receiving their messages)
# @router.websocket('/ws_redis/group/{group_id}/client/{client_id}')
# async def ws_client_id(
# websocket: WebSocket,
# group_id: str,
# client_id: str,
# ):
# await manager.connect_w_client_id(websocket, client_id, group_id)
# # await aio_manager.connect_w_client_id(websocket, client_id, group_id)
# try:
# while True:
# data = await websocket.receive_json() # Returns dict
# # log.debug(data)
# # group_path_id = f'/ws/group/{group_id}'
# # client_id = data.get('client_id')
# # echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?)
# msg_type = data.get('type')
# cmd = data.get('cmd')
# msg = data.get('msg')
# log.setLevel(logging.INFO)
# log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};')
# log.debug(f'Command: {cmd}')
# log.debug(f'Message: {msg}')
# data['client_id'] = client_id
# data['group_id'] = group_id
# await manager.group(group_id=group_id, data=data)
# # await aio_manager.group(group_id=group_id, data=data)
# await redis_connector(websocket)
# except WebSocketDisconnect:
# manager.disconnect(websocket)
# # aio_manager.disconnect(websocket)
# # await manager.broadcast(f'Client #{client_id} left')
# @router.websocket('/ws_redis/looping')
# async def ws_looping(
# websocket: WebSocket,
# ):
# await manager.connect(websocket)
# # await manager.broadcast(f'Welcome to looping')
# try:
# while True:
# # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING
# # await time.sleep(3.5) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING
# # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING
# # data = await websocket.receive_json()
# # log.debug(data)
# # data_dict = data
# # data_dict = json.loads(data)
# # log.debug(data_dict['client_id'])
# # await manager.send_personal_message(f'You wrote: {data}', websocket)
# await manager.broadcast(f'Loop!!!')
# except WebSocketDisconnect:
# manager.disconnect(websocket)
# await manager.broadcast(f'Client left looping')
# @router.websocket("/ws_redis/{client_id}")
# async def websocket_endpoint(
# websocket: WebSocket,
# client_id: int,
# response: Response = Response,
# ):
# log.setLevel(logging.DEBUG)
# log.debug(locals())
# log.info('Root of ws. Waiting to accept a websocket and then the redis_connector')
# await websocket.accept()
# await redis_connector(websocket)
# await manager.connect_w_client_id(websocket, client_id, group_id)
# # await aio_manager.connect_w_client_id(websocket, client_id, group_id)
# try:
# while True:
# data = await websocket.receive_json() # Returns dict
# # log.debug(data)
# # group_path_id = f'/ws/group/{group_id}'
# # client_id = data.get('client_id')
# # echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?)
# msg_type = data.get('type')
# cmd = data.get('cmd')
# msg = data.get('msg')
# log.setLevel(logging.INFO)
# log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};')
# log.debug(f'Command: {cmd}')
# log.debug(f'Message: {msg}')
# data['client_id'] = client_id
# data['group_id'] = group_id
# await manager.group(group_id=group_id, data=data)
# # await aio_manager.group(group_id=group_id, data=data)
# await redis_connector(websocket)
# except WebSocketDisconnect:
# manager.disconnect(websocket)
# # aio_manager.disconnect(websocket)
# # await manager.broadcast(f'Client #{client_id} left')
@router.websocket('/ws/group/{group_id}/client/{client_id}') @router.websocket('/ws/group/{group_id}/client/{client_id}')
@router.websocket('/ws_redis/group/{group_id}/client/{client_id}') @router.websocket('/ws_redis/group/{group_id}/client/{client_id}')
async def redis_ws_client_id( async def redis_ws_client_id(
@@ -596,7 +18,7 @@ async def redis_ws_client_id(
group_id: str, group_id: str,
client_id: str, client_id: str,
): ):
log.setLevel(logging.DEBUG) log.setLevel(logging.INFO)
log.debug(locals()) log.debug(locals())
log.info('WS Redis: Group ID and Client ID. Waiting to "accept" a websocket and then the "redis_connector"') log.info('WS Redis: Group ID and Client ID. Waiting to "accept" a websocket and then the "redis_connector"')
@@ -611,11 +33,17 @@ async def redis_connector(
client_id: str, client_id: str,
group_id = None group_id = None
): ):
log.setLevel(logging.DEBUG) log.setLevel(logging.INFO)
log.debug(locals()) log.debug(locals())
# The receives WS JSON data. Then publishes to Redis channel. # The receives WS JSON data. Then publishes to Redis channel.
async def receiver_handler(ws_conn: WebSocket, r_conn, client_id, group_id): async def receiver_handler(
ws_conn: WebSocket,
r_conn,
client_id = None,
group_id = None,
):
log.setLevel(logging.INFO)
log.info('receiver_handler()') log.info('receiver_handler()')
# Loop until the WS is disconnected # Loop until the WS is disconnected
@@ -636,20 +64,20 @@ async def redis_connector(
log.debug(f'Command: {cmd}') log.debug(f'Command: {cmd}')
log.debug(f'Message: {msg}') log.debug(f'Message: {msg}')
data['client_id'] = client_id if client_id:
data['group_id'] = group_id data['client_id'] = client_id
log.setLevel(logging.DEBUG) if group_id:
data['group_id'] = group_id
# log.setLevel(logging.DEBUG)
log.debug(data) log.debug(data)
if data: if data:
#logging.info(ws) # logging.info(ws)
#logging.info(dir(message)) # logging.info(dir(message))
# log.debug(await r.get('chat:c')) # log.debug(await r_conn.get('channel:ws'))
await r_conn.publish('chat:c', json.dumps(data))
# log.debug(await r_conn.get_message()) # log.debug(await r_conn.get_message())
await r_conn.publish('channel:ws', json.dumps(data))
except WebSocketDisconnect as exc: except WebSocketDisconnect as exc:
# TODO: This needs handling better? # TODO: This needs handling better?
# FUTURE: Remove from list of connected clients? # FUTURE: Remove from list of connected clients?
@@ -660,33 +88,38 @@ async def redis_connector(
log.error(e) log.error(e)
await r_conn.close() await r_conn.close()
# log.debug(await r.get('chat:c'))
# Also called the Redis reader for a channel subscription # Also called the Redis reader for a channel subscription
# This watches a Redis channel. Then sends out the message. # This watches a Redis channel. Then sends out the message.
async def sender_handler(r_channel, ws_conn: WebSocket, client_id, group_id): async def sender_handler(
r_channel,
ws_conn: WebSocket,
client_id = None,
group_id = None,
):
log.setLevel(logging.INFO)
log.info('sender_handler()') log.info('sender_handler()')
try: try:
while True: while True:
message = await r_channel.get_message(ignore_subscribe_messages=True) message = await r_channel.get_message(ignore_subscribe_messages=True)
if message is not None: if message is not None:
log.debug(f'(Redis Reader) Message Received: {message}') log.debug(f'(Redis Reader) Message Received: {message}')
data = json.loads(message['data']) data = json.loads(message['data'])
# Need to make sure the ws_connection is still open before trying to send. # Need to make sure the ws_connection is still open before trying to send.
# log.debug(ws_conn.open())
# log.debug(ws_conn.closed)
log.debug(vars(ws_conn)) log.debug(vars(ws_conn))
log.debug(ws_conn.client_state) # log.debug(ws_conn.client_state)
log.debug(vars(ws_conn.client_state)) log.debug(ws_conn.client_state.name)
log.debug(ws_conn.application_state)
log.debug(vars(ws_conn.application_state))
# 'WebSocketState.CONNECTED' or 'WebSocketState.DISCONNECTED' # 'WebSocketState.CONNECTED' or 'WebSocketState.DISCONNECTED'
if ws_conn.client_state == 'WebSocketState.CONNECTED': # if ws_conn.client_state.CONNECTED:
# if ws_conn.client_state.value == 'WebSocketState.CONNECTED':
# if ws_conn.client_state == WebSocketState.CONNECTED:
if ws_conn.client_state.name == 'CONNECTED':
log.info(f'WS client ({client_id}) connected!')
if group_id and group_id == data.get('group_id'): if group_id and group_id == data.get('group_id'):
log.info('Sending to matching group ID') log.info('Sending to matching group ID')
await ws_conn.send_json(data) await ws_conn.send_json(data)
@@ -712,30 +145,31 @@ async def redis_connector(
# except redis.exceptions.ConnectionError as e: # except redis.exceptions.ConnectionError as e:
# TODO: This needs handling better? # TODO: This needs handling better?
# log.error(e) # log.error(e)
# except WebSocketDisconnect as ws_disconn_e:
# log.error(ws_disconn_e)
except Exception as e: except Exception as e:
log.exception('Exception in Aether!') log.exception('Exception in Aether!')
log.error(e) log.error(e)
# raise RuntimeError(msg % message_type) # raise RuntimeError(msg % message_type)
# RuntimeError: Unexpected ASGI message 'websocket.send', after sending 'websocket.close'. # RuntimeError: Unexpected ASGI message 'websocket.send', after sending 'websocket.close'.
# raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None # raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
# redis.exceptions.ConnectionError: Connection closed by server. # redis.exceptions.ConnectionError: Connection closed by server.
# Redis client bound to pool of connections (auto-reconnecting). # Redis client bound to pool of connections (auto-reconnecting).
log.info('Create async Redis connection') log.info('Create async Redis connection')
redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
redis_conn_pubsub = redis_conn.pubsub() redis_conn_pubsub = redis_conn.pubsub()
await redis_conn_pubsub.subscribe('channel:1', 'channel:2', 'chat:c') await redis_conn_pubsub.subscribe('channel:ws', 'channel:1', 'channel:2', 'chat:c')
log.info('Run receiver_task') log.info('Create receiver task')
future_receiver = asyncio.create_task(receiver_handler(ws_conn, redis_conn, client_id, group_id)) future_receiver = asyncio.create_task(receiver_handler(ws_conn, redis_conn, client_id, group_id))
log.info('Run sender_task') log.info('Create sender task')
future_sender = asyncio.create_task(sender_handler(redis_conn_pubsub, ws_conn, client_id, group_id)) future_sender = asyncio.create_task(sender_handler(redis_conn_pubsub, ws_conn, client_id, group_id))
await future_receiver # Get WS messages and set in Redis await future_receiver # Get WS messages and set in Redis