From 6172626254b6ec229460fdae6f77bee7733252da Mon Sep 17 00:00:00 2001 From: Scott Idem Date: Fri, 7 Apr 2023 17:09:51 -0400 Subject: [PATCH] Work on websockets. Finally 97.25% working? --- app/main.py | 18 +- app/methods/data_store_methods.py | 4 +- app/routers/websockets.py | 86 ---- app/routers/websockets_redis.py | 646 ++---------------------------- 4 files changed, 51 insertions(+), 703 deletions(-) diff --git a/app/main.py b/app/main.py index 854bdac..2d60a4a 100644 --- a/app/main.py +++ b/app/main.py @@ -18,7 +18,7 @@ from . import config from app.log import log, logging # Import the routers here first: -from app.routers import aether_cfg, api_crud, api, importing, sql, account, activity_log, address, archive, archive_content, contact, cont_edu_cert, cont_edu_cert_person, data_store, event, event_abstract, event_badge, event_badge_importing, event_badge_template, event_device, event_exhibit, event_exhibit_tracking, event_file, event_importing, event_location, event_person, event_person_detail, event_person_tracking, event_presentation, event_presenter, event_registration, event_session, flask_cfg, fundraising, hosted_file, journal, journal_entry, log_client_viewing, lookup, membership_cfg, membership_group, membership_person_group, membership_person, membership_person_profile, membership_type, membership_person_type, order, order_v3, order_line, order_cart, organization, page, person, person_user, post, post_comment, product, qr, site, site_domain, user, websockets, websockets_redis, e_cvent, c_idaa, e_impexium, e_stripe +from app.routers import aether_cfg, api_crud, api, importing, sql, account, activity_log, address, archive, archive_content, contact, cont_edu_cert, cont_edu_cert_person, data_store, event, event_abstract, event_badge, event_badge_importing, event_badge_template, event_device, event_exhibit, event_exhibit_tracking, event_file, event_importing, event_location, event_person, event_person_detail, event_person_tracking, event_presentation, event_presenter, event_registration, event_session, flask_cfg, fundraising, hosted_file, journal, journal_entry, log_client_viewing, lookup, membership_cfg, membership_group, membership_person_group, membership_person, membership_person_profile, membership_type, membership_person_type, order, order_v3, order_line, order_cart, organization, page, person, person_user, post, post_comment, product, qr, site, site_domain, user, websockets_redis, e_cvent, c_idaa, e_impexium, e_stripe from app.db_sql import sql_select # , sql_connect @@ -385,16 +385,16 @@ app.include_router( user.router, tags=['User'], ) -app.include_router( - websockets.router, - #prefix='/websocket', - tags=['Websockets'], - #dependencies=[Depends(get_token_header)], - #responses={404: {'description': 'Not found'}}, -) +# app.include_router( +# websockets.router, +# # prefix='/websocket', +# tags=['Websockets'], +# # dependencies=[Depends(get_token_header)], +# # responses={404: {'description': 'Not found'}}, +# ) app.include_router( websockets_redis.router, - tags=['Websockets Redis'], + tags=['Websockets (Redis)'], ) app.include_router( diff --git a/app/methods/data_store_methods.py b/app/methods/data_store_methods.py index 65b1d86..2ced95b 100644 --- a/app/methods/data_store_methods.py +++ b/app/methods/data_store_methods.py @@ -70,13 +70,13 @@ def load_data_store_obj_w_code( data['code'] = code log.debug(data) - log.warning(f'Can we get past this?????????? {code}') + # log.warning(f'Can we get past this?????????? {code}') sql_enabled, data['enable'] = sql_enable_part(table_name='data_store', enabled=enabled) # Reasonably safe return str and bool sql_limit = sql_limit_offset_part(limit=limit, offset=offset) # Reasonably safe return str log.debug(data) - log.warning(f'Where are we now??????????? {code}') + # log.warning(f'Where are we now??????????? {code}') sql = f""" SELECT * diff --git a/app/routers/websockets.py b/app/routers/websockets.py index 8ff6f56..7ac219b 100644 --- a/app/routers/websockets.py +++ b/app/routers/websockets.py @@ -764,89 +764,3 @@ async def redis_connector( task.cancel() await redis.close() # await redis.wait_closed() - - - -#### #### - -# @app.on_event('shutdown') - # async def shutdown(): - # log.info('The Aether FastAPI API is shutting down...') - # # await ws_conn.close() - # await redis_conn.close() - - # pending_tasks = asyncio.all_tasks(loop=event_loop) - - # # cancel each pending task - # for task in pending_tasks: - # logger.info(f'killing task {task.get_coro()}') - # event_loop.call_soon_threadsafe(task.cancel) - - # # This does not work for some reason. - # # "Passing coroutines is forbidden, use tasks explicitly." - # log.info('Start asyncio wait') - # done, pending = await asyncio.wait( - # [receiver_task, producer_task], - # return_when = asyncio.FIRST_COMPLETED, - # ) - # log.debug(f'Done task: {done}') - - # for task in pending: - # log.debug(f'Canceling task: {task}') - # task.cancel() - # await redis_conn.close() - # await redis_conn.wait_closed() - - - - # (channel,) = await r.subscribe('chat:c') - # assert isinstance(channel, redis.Channel) - # try: - # while True: - # data = await channel.get() - # if data: - # await ws_conn.send_json(data) - # except Exception as exc: - # # TODO this needs handling better - # log.error(exc) - - # receiver_task = receiver_handler(ws_conn, redis_conn, client_id, group_id) - - # future_receiver = asyncio.create_task(receiver_task()) - - # sender_task = sender_handler(redis_conn_pubsub, ws_conn, client_id, group_id) - - # future_sender = asyncio.create_task(sender_task()) - - -# async def reader(channel: redis.client.PubSub): -# while True: -# message = await channel.get_message(ignore_subscribe_messages=True) -# if message is not None: -# log.debug(f'(Reader) Message Received: {message}') -# log.info('WHAT NOW???') -# if message['data'].decode() == STOPWORD: -# log.debug('(Reader) STOP') -# break - -# r = redis.from_url('redis://localhost') -# r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) -# async with r.pubsub() as pubsub: - # await pubsub.subscribe('channel:1', 'channel:2', 'chat:c') - - # future = asyncio.create_task(reader(pubsub)) - - # await r.publish('channel:1', 'Hello') - # await r.publish('channel:2', 'World') - # await r.publish('channel:1', STOPWORD) - - # await future - -# @app.on_event('shutdown') -# async def shutdown(): -# log.setLevel(logging.INFO) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL -# log.debug(locals()) - -# log.info('The Aether FastAPI API is shutting down...') -# #await database.disconnect() -# r.close() \ No newline at end of file diff --git a/app/routers/websockets_redis.py b/app/routers/websockets_redis.py index 8406af7..cdf55f6 100644 --- a/app/routers/websockets_redis.py +++ b/app/routers/websockets_redis.py @@ -4,11 +4,6 @@ from pydantic import BaseModel, EmailStr, Field from typing import Dict, List, Optional, Set, Union import asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time import redis.asyncio as redis -# import aioredis, redis, asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time -# from aioredis import from_url, Redis -# import asyncio -# import aioredis -# import async_timeout from app.lib_general import log, logging, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min from app.config import settings @@ -16,579 +11,6 @@ from app.config import settings router = APIRouter() -class ConnectionManager: - def __init__(self): - # NOTE: The active_connections list should be in Redis - self.active_connections: List[WebSocket] = [] - - async def connect(self, websocket: WebSocket): - await websocket.accept() - log.setLevel(logging.DEBUG) - log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key')) - # log.debug(dir(websocket)) - # log.debug(websocket.client) # the Address(host=ip, port=number) - # log.info('WS Key: '+websocket.headers.get('sec-websocket-key')) - # log.debug(websocket.headers) # Most useful - # log.debug(websocket.app) # Nope - # log.debug(websocket.app()) # Nope - # log.debug(websocket.session) # Nope - # log.debug(websocket.session()) # Nope - # log.debug(websocket.user) # Nope - # log.debug(websocket.values()) # Not sure if useful - - self.active_connections.append(websocket) - log.debug(self.active_connections) - - redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) - - # message = {'hello': 'world', 'goodbye': 'test'} - # data = json.loads(message) - - ws_header_key = websocket.headers.get('sec-websocket-key') - key_name = f'ws:sec_key:{ws_header_key}' - data = {'ws_header_key': ws_header_key} - - redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) - - r_entry = json.loads(redis_conn.get(key_name)) # For testing... - r_ttl = redis_conn.ttl(key_name) # For testing... - - # log.debug(r_entry) # For testing... - # log.debug(r_entry.get('ws_header_key')) # For testing... - # log.debug(r_ttl) # For testing... - - - - # r_entries = redis.scan() - # log.debug(r_entries) - - redis_conn.close() - # await redis.close() - - async def connect_w_client_id(self, websocket: WebSocket, client_id: str, group_id = None): - await websocket.accept() - log.setLevel(logging.DEBUG) - log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key')) - - self.active_connections.append(websocket) - log.debug(self.active_connections) - - redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) - - # # ### Redis ### Testing One - # ws_header_key = websocket.headers.get('sec-websocket-key') - # key_name = f'ws:sec_key:{ws_header_key}' - # data = {'ws_header_key': ws_header_key} - - # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) - - # r_entry = json.loads(redis_conn.get(key_name)) # For testing... - # r_ttl = redis_conn.ttl(key_name) # For testing... - - # # log.debug(r_entry) # For testing... - # # log.debug(r_entry.get('ws_header_key')) # For testing... - # # log.debug(r_ttl) # For testing... - - # # ### Redis ### Testing Two!! - # key_name = f'ws:conn:{ws_header_key}' - # data = websocket - - # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) - - # r_entry = json.loads(redis_conn.get(key_name)) # For testing... - - # log.debug(r_entry) # For testing... - - - # ### Redis ### Testing Three!!! - ws_header_key = websocket.headers.get('sec-websocket-key') - - # # Set WS connection in Redis based on WS header security key - # key_name = f'ws:header_key:{ws_header_key}' - # data = {'ws_conn': websocket} - - # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) - - # Get new ws_header_key and group_id - ws_header_key = websocket.headers.get('sec-websocket-key') - group_id = websocket.path_params.get('group_id') - - # Figure out key_name - key_name = f'ws:group_id:{group_id}' - - # Get the current list from Redis entry - - # group_id_ws_conn_li = [] - # group_id_ws_conn_li = redis_conn.get(key_name, []) - - # Add the new WS connection's header key to the list - redis_conn.lpush(key_name, ws_header_key) - - # group_id_ws_conn_li.append(ws_header_key) - - # data = group_id_ws_conn_li - - # redis_conn.hsetex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) - - - # r_entry = json.loads(redis_conn.lget(key_name)) # For testing... - log.debug('BEGIN: Loop through list key') - log.debug(redis_conn.llen(key_name)) - - # for i in range(0, redis_conn.llen(key_name)): - # log.debug(redis_conn.lindex(key_name, i)) - - log.debug('END: Loop through list key') - - redis_conn.close() - - def disconnect(self, websocket: WebSocket): - log.setLevel(logging.DEBUG) - log.info('WS disconnect') - self.active_connections.remove(websocket) - - - # Targets: echo, direct, group, broadcast - # send_ text, bytes, json - # receive_ text, bytes, json - async def echo(self, message: str, websocket: WebSocket): - log.setLevel(logging.DEBUG) - # log.debug(dir(websocket)) - log.debug(vars(websocket)) - log.debug(websocket.url) - log.debug(websocket.client) - log.debug(websocket.client_state) - log.debug(websocket.headers['sec-websocket-key']) - # log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8')) - await websocket.send_text(message) - - async def direct(self, from_client_id: str, to_client_id: str, data: dict): - log.setLevel(logging.DEBUG) - for connection in self.active_connections: - log.debug(vars(connection)) - log.debug(connection) - await connection.send_text(message) - - async def group(self, group_id: str, data: dict): - log.setLevel(logging.INFO) - log.debug(locals()) - - # Single process version... - # for connection in self.active_connections: - # log.debug(vars(connection)) - # # websocket.path_params.get('client_id') # Testing... - # # if connection.scope.get('path') == group_id: # Testing... - - # if connection.path_params.get('group_id') == group_id: - # log.info('Found matching Group ID') - # await connection.send_json(data) - - # Redis version... - redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) - - # Figure out key_name - key_name = f'ws:group_id:{group_id}' - - log.debug('BEGIN: Loop through Redis list entry') - log.debug(redis_conn.llen(key_name)) - - # ws_header_key = connection.headers.get('sec-websocket-key') - # ws_header_key = f'ws:header_key:{ws_header_key}' - # log.debug(f'Looking for: {ws_header_key}') - - for i in range(0, redis_conn.llen(key_name)): - log.debug(f'{i}: {redis_conn.lindex(key_name, i)}') - redis_ws_group_ws_header_key = redis_conn.lindex(key_name, i) - - # Check if they key - # redis_key_name_ws_conn = f'ws:header_key:{redis_ws_group_ws_header_key}' - # ws_conn_data = json.loads(redis_conn.get('redis_key_name_ws_conn')) - # active_ws_conn = ws_conn_data.get('ws_conn') - # log.debug(active_ws_conn) - for active_ws_conn in self.active_connections: - log.debug(vars(active_ws_conn)) - - ws_header_key = active_ws_conn.headers.get('sec-websocket-key') - - if redis_ws_group_ws_header_key == ws_header_key: - log.info(f'Found key in Redis list!') - await active_ws_conn.send_json(data) - else: - log.debug(f'Key not found in Redis list') - - log.debug('END: Loop through Redis list entry') - - redis_conn.close() - - # NOTE: Same as group, but no filter based on path - async def broadcast(self, message: str): - log.setLevel(logging.INFO) - log.debug(locals()) - - for connection in self.active_connections: - log.debug(vars(connection)) - await connection.send_text(message) - - async def send_personal_message(self, message: str, websocket: WebSocket): - log.setLevel(logging.DEBUG) - # log.debug(dir(websocket)) - log.debug(vars(websocket)) - log.debug(websocket.url) - log.debug(websocket.client) - log.debug(websocket.client_state) - log.debug(websocket.headers['sec-websocket-key']) - # log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8')) - await websocket.send_text(message) - - - -# manager = ConnectionManager() - - - -# class AIOConnectionManager: -# def __init__(self): -# # NOTE: The active_connections list should be in Redis -# self.active_connections: List[WebSocket] = [] - -# async def connect(self, websocket: WebSocket): -# await websocket.accept() -# log.setLevel(logging.DEBUG) -# log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key')) -# # log.debug(websocket.client) # the Address(host=ip, port=number) -# # log.info('WS Key: '+websocket.headers.get('sec-websocket-key')) -# # log.debug(websocket.headers) # Most useful - -# self.active_connections.append(websocket) -# log.debug(self.active_connections) - -# redis_conn = aioredis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) - -# # message = {'hello': 'world', 'goodbye': 'test'} -# # data = json.loads(message) - -# ws_header_key = websocket.headers.get('sec-websocket-key') -# key_name = f'ws:sec_key:{ws_header_key}' -# data = {'ws_header_key': ws_header_key} - -# await redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) - -# r_entry = json.loads(await redis_conn.get(key_name)) # For testing... -# r_ttl = await redis_conn.ttl(key_name) # For testing... - -# # log.debug(r_entry) # For testing... -# # log.debug(r_entry.get('ws_header_key')) # For testing... -# # log.debug(r_ttl) # For testing... - - - -# # r_entries = redis.scan() -# # log.debug(r_entries) - -# redis_conn.close() -# # await redis.close() - -# async def connect_w_client_id(self, websocket: WebSocket, client_id: str, group_id = None): -# await websocket.accept() -# log.setLevel(logging.DEBUG) -# log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key')) - -# self.active_connections.append(websocket) -# log.debug(self.active_connections) - -# redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) - -# # # ### Redis ### Testing One -# # ws_header_key = websocket.headers.get('sec-websocket-key') -# # key_name = f'ws:sec_key:{ws_header_key}' -# # data = {'ws_header_key': ws_header_key} - -# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) - -# # r_entry = json.loads(redis_conn.get(key_name)) # For testing... -# # r_ttl = redis_conn.ttl(key_name) # For testing... - -# # # log.debug(r_entry) # For testing... -# # # log.debug(r_entry.get('ws_header_key')) # For testing... -# # # log.debug(r_ttl) # For testing... - -# # # ### Redis ### Testing Two!! -# # key_name = f'ws:conn:{ws_header_key}' -# # data = websocket - -# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) - -# # r_entry = json.loads(redis_conn.get(key_name)) # For testing... - -# # log.debug(r_entry) # For testing... - - -# # ### Redis ### Testing Three!!! -# ws_header_key = websocket.headers.get('sec-websocket-key') - -# # # Set WS connection in Redis based on WS header security key -# # key_name = f'ws:header_key:{ws_header_key}' -# # data = {'ws_conn': websocket} - -# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) - -# # Get new ws_header_key and group_id -# ws_header_key = websocket.headers.get('sec-websocket-key') -# group_id = websocket.path_params.get('group_id') - -# # Figure out key_name -# key_name = f'ws:group_id:{group_id}' - -# # Get the current list from Redis entry - -# # group_id_ws_conn_li = [] -# # group_id_ws_conn_li = redis_conn.get(key_name, []) - -# # Add the new WS connection's header key to the list -# redis_conn.lpush(key_name, ws_header_key) - -# # group_id_ws_conn_li.append(ws_header_key) - -# # data = group_id_ws_conn_li - -# # redis_conn.hsetex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) - - -# # r_entry = json.loads(redis_conn.lget(key_name)) # For testing... -# log.debug('BEGIN: Loop through list key') -# log.debug(redis_conn.llen(key_name)) - -# # for i in range(0, redis_conn.llen(key_name)): -# # log.debug(redis_conn.lindex(key_name, i)) - -# log.debug('END: Loop through list key') - -# redis_conn.close() - -# def disconnect(self, websocket: WebSocket): -# log.setLevel(logging.DEBUG) -# log.info('WS disconnect') -# self.active_connections.remove(websocket) - - -# # Targets: echo, direct, group, broadcast -# # send_ text, bytes, json -# # receive_ text, bytes, json -# async def echo(self, message: str, websocket: WebSocket): -# log.setLevel(logging.DEBUG) -# # log.debug(dir(websocket)) -# log.debug(vars(websocket)) -# log.debug(websocket.url) -# log.debug(websocket.client) -# log.debug(websocket.client_state) -# log.debug(websocket.headers['sec-websocket-key']) -# # log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8')) -# await websocket.send_text(message) - -# async def direct(self, from_client_id: str, to_client_id: str, data: dict): -# log.setLevel(logging.DEBUG) -# for connection in self.active_connections: -# log.debug(vars(connection)) -# log.debug(connection) -# await connection.send_text(message) - -# async def group(self, group_id: str, data: dict): -# log.setLevel(logging.INFO) -# log.debug(locals()) - -# # Single process version... -# # for connection in self.active_connections: -# # log.debug(vars(connection)) -# # # websocket.path_params.get('client_id') # Testing... -# # # if connection.scope.get('path') == group_id: # Testing... - -# # if connection.path_params.get('group_id') == group_id: -# # log.info('Found matching Group ID') -# # await connection.send_json(data) - -# # Redis version... -# redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) - -# # Figure out key_name -# key_name = f'ws:group_id:{group_id}' - -# log.debug('BEGIN: Loop through Redis list entry') -# log.debug(redis_conn.llen(key_name)) - -# # ws_header_key = connection.headers.get('sec-websocket-key') -# # ws_header_key = f'ws:header_key:{ws_header_key}' -# # log.debug(f'Looking for: {ws_header_key}') - -# for i in range(0, redis_conn.llen(key_name)): -# log.debug(f'{i}: {redis_conn.lindex(key_name, i)}') -# redis_ws_group_ws_header_key = redis_conn.lindex(key_name, i) - -# # Check if they key -# # redis_key_name_ws_conn = f'ws:header_key:{redis_ws_group_ws_header_key}' -# # ws_conn_data = json.loads(redis_conn.get('redis_key_name_ws_conn')) -# # active_ws_conn = ws_conn_data.get('ws_conn') -# # log.debug(active_ws_conn) -# for active_ws_conn in self.active_connections: -# log.debug(vars(active_ws_conn)) - -# ws_header_key = active_ws_conn.headers.get('sec-websocket-key') - -# if redis_ws_group_ws_header_key == ws_header_key: -# log.info(f'Found key in Redis list!') -# await active_ws_conn.send_json(data) -# else: -# log.debug(f'Key not found in Redis list') - -# log.debug('END: Loop through Redis list entry') - -# redis_conn.close() - -# # NOTE: Same as group, but no filter based on path -# async def broadcast(self, message: str): -# log.setLevel(logging.INFO) -# log.debug(locals()) - -# for connection in self.active_connections: -# log.debug(vars(connection)) -# await connection.send_text(message) - -# async def send_personal_message(self, message: str, websocket: WebSocket): -# log.setLevel(logging.DEBUG) -# # log.debug(dir(websocket)) -# log.debug(vars(websocket)) -# log.debug(websocket.url) -# log.debug(websocket.client) -# log.debug(websocket.client_state) -# log.debug(websocket.headers['sec-websocket-key']) -# # log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8')) -# await websocket.send_text(message) - - -# aio_manager = AIOConnectionManager() - - -# Endpoints for??? -# /room/ (just a group of clients; for a related group like a poster presenter or session room) -# /client/ (for one specific client/browser; something specific to a browser???) -# /person/ (for one specific person; handles send and receiving their messages) - - - - -# @router.websocket('/ws_redis/group/{group_id}/client/{client_id}') -# async def ws_client_id( -# websocket: WebSocket, -# group_id: str, -# client_id: str, -# ): -# await manager.connect_w_client_id(websocket, client_id, group_id) -# # await aio_manager.connect_w_client_id(websocket, client_id, group_id) -# try: -# while True: -# data = await websocket.receive_json() # Returns dict -# # log.debug(data) - -# # group_path_id = f'/ws/group/{group_id}' -# # client_id = data.get('client_id') -# # echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?) -# msg_type = data.get('type') -# cmd = data.get('cmd') -# msg = data.get('msg') - -# log.setLevel(logging.INFO) -# log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};') -# log.debug(f'Command: {cmd}') -# log.debug(f'Message: {msg}') - -# data['client_id'] = client_id -# data['group_id'] = group_id - -# await manager.group(group_id=group_id, data=data) -# # await aio_manager.group(group_id=group_id, data=data) - -# await redis_connector(websocket) - -# except WebSocketDisconnect: -# manager.disconnect(websocket) -# # aio_manager.disconnect(websocket) -# # await manager.broadcast(f'Client #{client_id} left') - - - - - - - -# @router.websocket('/ws_redis/looping') -# async def ws_looping( -# websocket: WebSocket, -# ): -# await manager.connect(websocket) -# # await manager.broadcast(f'Welcome to looping') -# try: -# while True: -# # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING -# # await time.sleep(3.5) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING -# # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING -# # data = await websocket.receive_json() -# # log.debug(data) -# # data_dict = data -# # data_dict = json.loads(data) -# # log.debug(data_dict['client_id']) -# # await manager.send_personal_message(f'You wrote: {data}', websocket) -# await manager.broadcast(f'Loop!!!') -# except WebSocketDisconnect: -# manager.disconnect(websocket) -# await manager.broadcast(f'Client left looping') - - -# @router.websocket("/ws_redis/{client_id}") -# async def websocket_endpoint( -# websocket: WebSocket, -# client_id: int, -# response: Response = Response, -# ): -# log.setLevel(logging.DEBUG) -# log.debug(locals()) - -# log.info('Root of ws. Waiting to accept a websocket and then the redis_connector') - -# await websocket.accept() -# await redis_connector(websocket) - - # await manager.connect_w_client_id(websocket, client_id, group_id) - # # await aio_manager.connect_w_client_id(websocket, client_id, group_id) - # try: - # while True: - # data = await websocket.receive_json() # Returns dict - # # log.debug(data) - - # # group_path_id = f'/ws/group/{group_id}' - # # client_id = data.get('client_id') - # # echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?) - # msg_type = data.get('type') - # cmd = data.get('cmd') - # msg = data.get('msg') - - # log.setLevel(logging.INFO) - # log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};') - # log.debug(f'Command: {cmd}') - # log.debug(f'Message: {msg}') - - # data['client_id'] = client_id - # data['group_id'] = group_id - - # await manager.group(group_id=group_id, data=data) - # # await aio_manager.group(group_id=group_id, data=data) - - # await redis_connector(websocket) - - # except WebSocketDisconnect: - # manager.disconnect(websocket) - # # aio_manager.disconnect(websocket) - # # await manager.broadcast(f'Client #{client_id} left') - - @router.websocket('/ws/group/{group_id}/client/{client_id}') @router.websocket('/ws_redis/group/{group_id}/client/{client_id}') async def redis_ws_client_id( @@ -596,7 +18,7 @@ async def redis_ws_client_id( group_id: str, client_id: str, ): - log.setLevel(logging.DEBUG) + log.setLevel(logging.INFO) log.debug(locals()) log.info('WS Redis: Group ID and Client ID. Waiting to "accept" a websocket and then the "redis_connector"') @@ -611,11 +33,17 @@ async def redis_connector( client_id: str, group_id = None ): - log.setLevel(logging.DEBUG) + log.setLevel(logging.INFO) log.debug(locals()) # The receives WS JSON data. Then publishes to Redis channel. - async def receiver_handler(ws_conn: WebSocket, r_conn, client_id, group_id): + async def receiver_handler( + ws_conn: WebSocket, + r_conn, + client_id = None, + group_id = None, + ): + log.setLevel(logging.INFO) log.info('receiver_handler()') # Loop until the WS is disconnected @@ -636,20 +64,20 @@ async def redis_connector( log.debug(f'Command: {cmd}') log.debug(f'Message: {msg}') - data['client_id'] = client_id - data['group_id'] = group_id - log.setLevel(logging.DEBUG) + if client_id: + data['client_id'] = client_id + if group_id: + data['group_id'] = group_id + # log.setLevel(logging.DEBUG) log.debug(data) if data: - #logging.info(ws) - #logging.info(dir(message)) - # log.debug(await r.get('chat:c')) - - await r_conn.publish('chat:c', json.dumps(data)) - + # logging.info(ws) + # logging.info(dir(message)) + # log.debug(await r_conn.get('channel:ws')) # log.debug(await r_conn.get_message()) + await r_conn.publish('channel:ws', json.dumps(data)) except WebSocketDisconnect as exc: # TODO: This needs handling better? # FUTURE: Remove from list of connected clients? @@ -660,33 +88,38 @@ async def redis_connector( log.error(e) await r_conn.close() - # log.debug(await r.get('chat:c')) - # Also called the Redis reader for a channel subscription # This watches a Redis channel. Then sends out the message. - async def sender_handler(r_channel, ws_conn: WebSocket, client_id, group_id): + async def sender_handler( + r_channel, + ws_conn: WebSocket, + client_id = None, + group_id = None, + ): + log.setLevel(logging.INFO) log.info('sender_handler()') try: while True: + message = await r_channel.get_message(ignore_subscribe_messages=True) if message is not None: log.debug(f'(Redis Reader) Message Received: {message}') data = json.loads(message['data']) # Need to make sure the ws_connection is still open before trying to send. - # log.debug(ws_conn.open()) - # log.debug(ws_conn.closed) log.debug(vars(ws_conn)) - log.debug(ws_conn.client_state) - log.debug(vars(ws_conn.client_state)) - - log.debug(ws_conn.application_state) - log.debug(vars(ws_conn.application_state)) + # log.debug(ws_conn.client_state) + log.debug(ws_conn.client_state.name) # 'WebSocketState.CONNECTED' or 'WebSocketState.DISCONNECTED' - if ws_conn.client_state == 'WebSocketState.CONNECTED': + # if ws_conn.client_state.CONNECTED: + # if ws_conn.client_state.value == 'WebSocketState.CONNECTED': + # if ws_conn.client_state == WebSocketState.CONNECTED: + if ws_conn.client_state.name == 'CONNECTED': + log.info(f'WS client ({client_id}) connected!') + if group_id and group_id == data.get('group_id'): log.info('Sending to matching group ID') await ws_conn.send_json(data) @@ -712,30 +145,31 @@ async def redis_connector( # except redis.exceptions.ConnectionError as e: # TODO: This needs handling better? # log.error(e) + # except WebSocketDisconnect as ws_disconn_e: + # log.error(ws_disconn_e) except Exception as e: log.exception('Exception in Aether!') log.error(e) - # raise RuntimeError(msg % message_type) # RuntimeError: Unexpected ASGI message 'websocket.send', after sending 'websocket.close'. # raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None # redis.exceptions.ConnectionError: Connection closed by server. + # Redis client bound to pool of connections (auto-reconnecting). log.info('Create async Redis connection') redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) redis_conn_pubsub = redis_conn.pubsub() - await redis_conn_pubsub.subscribe('channel:1', 'channel:2', 'chat:c') + await redis_conn_pubsub.subscribe('channel:ws', 'channel:1', 'channel:2', 'chat:c') - log.info('Run receiver_task') + log.info('Create receiver task') future_receiver = asyncio.create_task(receiver_handler(ws_conn, redis_conn, client_id, group_id)) - log.info('Run sender_task') - + log.info('Create sender task') future_sender = asyncio.create_task(sender_handler(redis_conn_pubsub, ws_conn, client_id, group_id)) await future_receiver # Get WS messages and set in Redis