From 0eb775826f69f49848230aad55d11effd34f424e Mon Sep 17 00:00:00 2001 From: Scott Idem Date: Fri, 7 Apr 2023 00:37:57 -0400 Subject: [PATCH] Work on websockets. A lot... --- app/main.py | 6 +- app/routers/hosted_file.py | 117 +++++- app/routers/websockets.py | 507 +++++++++++++++++++--- app/routers/websockets_redis.py | 715 ++++++++++++++++++++++++++++++++ 4 files changed, 1273 insertions(+), 72 deletions(-) create mode 100644 app/routers/websockets_redis.py diff --git a/app/main.py b/app/main.py index 96eb2dc..854bdac 100644 --- a/app/main.py +++ b/app/main.py @@ -18,7 +18,7 @@ from . import config from app.log import log, logging # Import the routers here first: -from app.routers import aether_cfg, api_crud, api, importing, sql, account, activity_log, address, archive, archive_content, contact, cont_edu_cert, cont_edu_cert_person, data_store, event, event_abstract, event_badge, event_badge_importing, event_badge_template, event_device, event_exhibit, event_exhibit_tracking, event_file, event_importing, event_location, event_person, event_person_detail, event_person_tracking, event_presentation, event_presenter, event_registration, event_session, flask_cfg, fundraising, hosted_file, journal, journal_entry, log_client_viewing, lookup, membership_cfg, membership_group, membership_person_group, membership_person, membership_person_profile, membership_type, membership_person_type, order, order_v3, order_line, order_cart, organization, page, person, person_user, post, post_comment, product, qr, site, site_domain, user, websockets, e_cvent, c_idaa, e_impexium, e_stripe +from app.routers import aether_cfg, api_crud, api, importing, sql, account, activity_log, address, archive, archive_content, contact, cont_edu_cert, cont_edu_cert_person, data_store, event, event_abstract, event_badge, event_badge_importing, event_badge_template, event_device, event_exhibit, event_exhibit_tracking, event_file, event_importing, event_location, event_person, event_person_detail, event_person_tracking, event_presentation, event_presenter, event_registration, event_session, flask_cfg, fundraising, hosted_file, journal, journal_entry, log_client_viewing, lookup, membership_cfg, membership_group, membership_person_group, membership_person, membership_person_profile, membership_type, membership_person_type, order, order_v3, order_line, order_cart, organization, page, person, person_user, post, post_comment, product, qr, site, site_domain, user, websockets, websockets_redis, e_cvent, c_idaa, e_impexium, e_stripe from app.db_sql import sql_select # , sql_connect @@ -392,6 +392,10 @@ app.include_router( #dependencies=[Depends(get_token_header)], #responses={404: {'description': 'Not found'}}, ) +app.include_router( + websockets_redis.router, + tags=['Websockets Redis'], +) app.include_router( e_cvent.router, diff --git a/app/routers/hosted_file.py b/app/routers/hosted_file.py index 7f8d2ff..c669727 100644 --- a/app/routers/hosted_file.py +++ b/app/routers/hosted_file.py @@ -3,6 +3,7 @@ from fastapi import APIRouter, Body, Depends, File, Form, Header, HTTPException, from fastapi.responses import FileResponse from pydantic import BaseModel, EmailStr, Field from typing import Dict, List, Optional, Set, Union +from pdf2image import convert_from_path from app.lib_general import log, logging, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min from app.config import settings @@ -699,20 +700,114 @@ async def get_hosted_file_obj( # ### BEGIN ### API Hosted File ### download_tmp() ### -# Updated 2021-11-23 -@router.get('/download/tmp/{filename}', response_model=Resp_Body_Base) +# Updated 2023-04-05 +@router.get('/tmp/{subdirectory}/{filename}/download', response_model=Resp_Body_Base) async def download_tmp( - filename: str = Query(..., min_length=4, max_length=100), - # x_account_id: str = Header(...), - by_alias: Optional[bool] = True, - exclude_unset: Optional[bool] = True, - response: Response = Response, + subdirectory: str = Query(..., min_length=1, max_length=100), + filename: str = Query(..., min_length=4, max_length=120), + + commons: Common_Route_Params = Depends(common_route_params), ): - log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL + log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) - full_dest_path = 'admin/temp/order_line/order_line_list_2021-11-23_1310.xlsx' - filename = 'text.xlsx' + # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING + time.sleep(3.5) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING + # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING - return FileResponse(full_dest_path, filename=filename) + hosted_tmp_path = settings.FILES_PATH['hosted_tmp_root'] + log.info(f'Hosted Tmp Path: {hosted_tmp_path}') + log.debug(shutil.disk_usage(hosted_tmp_path)) + + hosted_tmp_w_subdir = os.path.join(hosted_tmp_path, subdirectory) + # if pathlib.Path(hosted_tmp_w_subdir): + if os.path.exists(hosted_tmp_w_subdir): + log.info('Hosted tmp with subdirectory found') + else: + log.info('Hosted tmp with subdirectory not found') + return mk_resp(data=False, status_code=404, response=commons.response, status_message='The hosted tmp file subdirectory was not found.') # Not Found + + hosted_tmp_w_subdir_filename = os.path.join(hosted_tmp_path, subdirectory, filename) + # if pathlib.Path(hosted_tmp_w_subdir_filename): + if os.path.exists(hosted_tmp_w_subdir_filename): + log.info('Hosted tmp with subdirectory and filename found') + else: + log.info('Hosted tmp with subdirectory and filename not found') + return mk_resp(data=False, status_code=404, response=commons.response, status_message='The hosted tmp file was not found.') # Not Found + + return FileResponse(hosted_tmp_w_subdir_filename, filename=filename) # ### END ### API Hosted File ### download_tmp() ### + + +# ### BEGIN ### API Hosted File Route ### convert_file() ### +# This just needs to return the correct model for a new hosted_file +# Updated 2023-04-04 +@router.get('/{hosted_file_id}/convert_file') +async def convert_file( + hosted_file_id: str = Query(..., min_length=11, max_length=22), + + from_type: str = 'pdf', + to_type: str = 'webp', + pdf_opt1: bool = False, + pdf_opt2: str = 'test', + + commons: Common_Route_Params = Depends(common_route_params), + ): + log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL + log.debug(locals()) + + # Need to look up file_hash for hosted_file_id + + # file_hash = '0080f0b03144927c173694745483894a09208d9444fdaccab054493f699361be' + file_hash = '279312d1738fd3a8a2f136b48295e28664d38b18de66c55de56b8886b9454784' + file_hash_filename = f'{file_hash}.file' + + hosted_files_path = settings.FILES_PATH['hosted_files_root'] + log.info(f'Hosted Files Path: {hosted_files_path}') + log.debug(shutil.disk_usage(hosted_files_path)) + + file_subdirectory = file_hash[0:2] + full_file_path = os.path.join(hosted_files_path, file_subdirectory, file_hash_filename) + log.info(f'File Hash with Subdirectory: {full_file_path}') + + hosted_tmp_path = settings.FILES_PATH['hosted_tmp_root'] + log.info(f'Hosted Tmp Path: {hosted_tmp_path}') + log.debug(shutil.disk_usage(hosted_tmp_path)) + + hosted_tmp_convert_file_path = os.path.join(hosted_tmp_path, 'convert_file') + if pathlib.Path(hosted_tmp_convert_file_path): + log.info('Hosted tmp convert file path found') + else: + log.info('Creating hosted tmp convert file path') + pathlib.Path(hosted_tmp_convert_file_path).mkdir(parents=True, exist_ok=True) + + images = convert_from_path(full_file_path, size=(2160, None)) + for image in images: + save_path = os.path.join(hosted_tmp_convert_file_path, 'converted_2160px_lossless_90q.webp') + + # image.save('testing_2625px_9.png', compress_level=9) + + # Lossy WebP takes about 25% of the time as WebP lossless compression with 100 level effort + # .46 seconds vs 2.1 seconds with example PDF + + # image.save('testing_2625px_80q.webp', quality=80) # default + # timer_2a_start = timer() + image.save(save_path, lossless=False, quality=90) # default quality is 80 + # timer_2a_end = timer() + # print( round((timer_2a_end - timer_2a_start), 8) ) + + # timer_2b_start = timer() + # image.save('testing_2160px_lossless_100q.webp', lossless=True, quality=100) # quality is level of effort + # timer_2b_end = timer() + # print( round((timer_2b_end - timer_2b_start), 8) ) + + # file_info = await save_file( + # file = file_obj, + # account_id = account_id, + # account_id_random = account_id_random, + # link_to_type = link_to_type, + # link_to_id = link_to_id, + # link_to_id_random = link_to_id_random, + # check_allowed_extension = False, + # ) + # if file_info['saved']: pass \ No newline at end of file diff --git a/app/routers/websockets.py b/app/routers/websockets.py index 65ccdf4..7ac219b 100644 --- a/app/routers/websockets.py +++ b/app/routers/websockets.py @@ -3,6 +3,12 @@ from fastapi.responses import HTMLResponse from pydantic import BaseModel, EmailStr, Field from typing import Dict, List, Optional, Set, Union import redis, asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time +# import asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time +# import aioredis, redis, asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time +# from aioredis import from_url, Redis +# import asyncio +# import aioredis +# import async_timeout from app.lib_general import log, logging, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min from app.config import settings @@ -71,11 +77,127 @@ class ConnectionManager: async def connect(self, websocket: WebSocket): await websocket.accept() - log.info('WS connect') + log.setLevel(logging.DEBUG) + log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key')) + # log.debug(dir(websocket)) + # log.debug(websocket.client) # the Address(host=ip, port=number) + # log.info('WS Key: '+websocket.headers.get('sec-websocket-key')) + # log.debug(websocket.headers) # Most useful + # log.debug(websocket.app) # Nope + # log.debug(websocket.app()) # Nope + # log.debug(websocket.session) # Nope + # log.debug(websocket.session()) # Nope + # log.debug(websocket.user) # Nope + # log.debug(websocket.values()) # Not sure if useful + self.active_connections.append(websocket) log.debug(self.active_connections) + redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) + + # message = {'hello': 'world', 'goodbye': 'test'} + # data = json.loads(message) + + ws_header_key = websocket.headers.get('sec-websocket-key') + key_name = f'ws:sec_key:{ws_header_key}' + data = {'ws_header_key': ws_header_key} + + redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + + r_entry = json.loads(redis_conn.get(key_name)) # For testing... + r_ttl = redis_conn.ttl(key_name) # For testing... + + # log.debug(r_entry) # For testing... + # log.debug(r_entry.get('ws_header_key')) # For testing... + # log.debug(r_ttl) # For testing... + + + + # r_entries = redis.scan() + # log.debug(r_entries) + + redis_conn.close() + # await redis.close() + + async def connect_w_client_id(self, websocket: WebSocket, client_id: str, group_id = None): + await websocket.accept() + log.setLevel(logging.DEBUG) + log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key')) + + self.active_connections.append(websocket) + log.debug(self.active_connections) + + redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) + + # # ### Redis ### Testing One + # ws_header_key = websocket.headers.get('sec-websocket-key') + # key_name = f'ws:sec_key:{ws_header_key}' + # data = {'ws_header_key': ws_header_key} + + # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + + # r_entry = json.loads(redis_conn.get(key_name)) # For testing... + # r_ttl = redis_conn.ttl(key_name) # For testing... + + # # log.debug(r_entry) # For testing... + # # log.debug(r_entry.get('ws_header_key')) # For testing... + # # log.debug(r_ttl) # For testing... + + # # ### Redis ### Testing Two!! + # key_name = f'ws:conn:{ws_header_key}' + # data = websocket + + # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + + # r_entry = json.loads(redis_conn.get(key_name)) # For testing... + + # log.debug(r_entry) # For testing... + + + # ### Redis ### Testing Three!!! + ws_header_key = websocket.headers.get('sec-websocket-key') + + # # Set WS connection in Redis based on WS header security key + # key_name = f'ws:header_key:{ws_header_key}' + # data = {'ws_conn': websocket} + + # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + + # Get new ws_header_key and group_id + ws_header_key = websocket.headers.get('sec-websocket-key') + group_id = websocket.path_params.get('group_id') + + # Figure out key_name + key_name = f'ws:group_id:{group_id}' + + # Get the current list from Redis entry + + # group_id_ws_conn_li = [] + # group_id_ws_conn_li = redis_conn.get(key_name, []) + + # Add the new WS connection's header key to the list + redis_conn.lpush(key_name, ws_header_key) + + # group_id_ws_conn_li.append(ws_header_key) + + # data = group_id_ws_conn_li + + # redis_conn.hsetex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + + + # r_entry = json.loads(redis_conn.lget(key_name)) # For testing... + log.debug('BEGIN: Loop through list key') + log.debug(redis_conn.llen(key_name)) + + # for i in range(0, redis_conn.llen(key_name)): + # log.debug(redis_conn.lindex(key_name, i)) + + log.debug('END: Loop through list key') + + redis_conn.close() + def disconnect(self, websocket: WebSocket): + log.setLevel(logging.DEBUG) log.info('WS disconnect') self.active_connections.remove(websocket) @@ -105,13 +227,52 @@ class ConnectionManager: log.setLevel(logging.INFO) log.debug(locals()) - for connection in self.active_connections: - log.debug(vars(connection)) - # websocket.path_params.get('client_id') - # if connection.scope.get('path') == group_id: - if connection.path_params.get('group_id') == group_id: - log.info('Found matching Group ID') - await connection.send_json(data) + # Single process version... + # for connection in self.active_connections: + # log.debug(vars(connection)) + # # websocket.path_params.get('client_id') # Testing... + # # if connection.scope.get('path') == group_id: # Testing... + + # if connection.path_params.get('group_id') == group_id: + # log.info('Found matching Group ID') + # await connection.send_json(data) + + # Redis version... + redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) + + # Figure out key_name + key_name = f'ws:group_id:{group_id}' + + log.debug('BEGIN: Loop through Redis list entry') + log.debug(redis_conn.llen(key_name)) + + # ws_header_key = connection.headers.get('sec-websocket-key') + # ws_header_key = f'ws:header_key:{ws_header_key}' + # log.debug(f'Looking for: {ws_header_key}') + + for i in range(0, redis_conn.llen(key_name)): + log.debug(f'{i}: {redis_conn.lindex(key_name, i)}') + redis_ws_group_ws_header_key = redis_conn.lindex(key_name, i) + + # Check if they key + # redis_key_name_ws_conn = f'ws:header_key:{redis_ws_group_ws_header_key}' + # ws_conn_data = json.loads(redis_conn.get('redis_key_name_ws_conn')) + # active_ws_conn = ws_conn_data.get('ws_conn') + # log.debug(active_ws_conn) + for active_ws_conn in self.active_connections: + log.debug(vars(active_ws_conn)) + + ws_header_key = active_ws_conn.headers.get('sec-websocket-key') + + if redis_ws_group_ws_header_key == ws_header_key: + log.info(f'Found key in Redis list!') + await active_ws_conn.send_json(data) + else: + log.debug(f'Key not found in Redis list') + + log.debug('END: Loop through Redis list entry') + + redis_conn.close() # NOTE: Same as group, but no filter based on path async def broadcast(self, message: str): @@ -138,12 +299,289 @@ class ConnectionManager: manager = ConnectionManager() + +# class AIOConnectionManager: +# def __init__(self): +# # NOTE: The active_connections list should be in Redis +# self.active_connections: List[WebSocket] = [] + +# async def connect(self, websocket: WebSocket): +# await websocket.accept() +# log.setLevel(logging.DEBUG) +# log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key')) +# # log.debug(websocket.client) # the Address(host=ip, port=number) +# # log.info('WS Key: '+websocket.headers.get('sec-websocket-key')) +# # log.debug(websocket.headers) # Most useful + +# self.active_connections.append(websocket) +# log.debug(self.active_connections) + +# redis_conn = aioredis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) + +# # message = {'hello': 'world', 'goodbye': 'test'} +# # data = json.loads(message) + +# ws_header_key = websocket.headers.get('sec-websocket-key') +# key_name = f'ws:sec_key:{ws_header_key}' +# data = {'ws_header_key': ws_header_key} + +# await redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + +# r_entry = json.loads(await redis_conn.get(key_name)) # For testing... +# r_ttl = await redis_conn.ttl(key_name) # For testing... + +# # log.debug(r_entry) # For testing... +# # log.debug(r_entry.get('ws_header_key')) # For testing... +# # log.debug(r_ttl) # For testing... + + + +# # r_entries = redis.scan() +# # log.debug(r_entries) + +# redis_conn.close() +# # await redis.close() + +# async def connect_w_client_id(self, websocket: WebSocket, client_id: str, group_id = None): +# await websocket.accept() +# log.setLevel(logging.DEBUG) +# log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key')) + +# self.active_connections.append(websocket) +# log.debug(self.active_connections) + +# redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) + +# # # ### Redis ### Testing One +# # ws_header_key = websocket.headers.get('sec-websocket-key') +# # key_name = f'ws:sec_key:{ws_header_key}' +# # data = {'ws_header_key': ws_header_key} + +# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + +# # r_entry = json.loads(redis_conn.get(key_name)) # For testing... +# # r_ttl = redis_conn.ttl(key_name) # For testing... + +# # # log.debug(r_entry) # For testing... +# # # log.debug(r_entry.get('ws_header_key')) # For testing... +# # # log.debug(r_ttl) # For testing... + +# # # ### Redis ### Testing Two!! +# # key_name = f'ws:conn:{ws_header_key}' +# # data = websocket + +# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + +# # r_entry = json.loads(redis_conn.get(key_name)) # For testing... + +# # log.debug(r_entry) # For testing... + + +# # ### Redis ### Testing Three!!! +# ws_header_key = websocket.headers.get('sec-websocket-key') + +# # # Set WS connection in Redis based on WS header security key +# # key_name = f'ws:header_key:{ws_header_key}' +# # data = {'ws_conn': websocket} + +# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + +# # Get new ws_header_key and group_id +# ws_header_key = websocket.headers.get('sec-websocket-key') +# group_id = websocket.path_params.get('group_id') + +# # Figure out key_name +# key_name = f'ws:group_id:{group_id}' + +# # Get the current list from Redis entry + +# # group_id_ws_conn_li = [] +# # group_id_ws_conn_li = redis_conn.get(key_name, []) + +# # Add the new WS connection's header key to the list +# redis_conn.lpush(key_name, ws_header_key) + +# # group_id_ws_conn_li.append(ws_header_key) + +# # data = group_id_ws_conn_li + +# # redis_conn.hsetex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + + +# # r_entry = json.loads(redis_conn.lget(key_name)) # For testing... +# log.debug('BEGIN: Loop through list key') +# log.debug(redis_conn.llen(key_name)) + +# # for i in range(0, redis_conn.llen(key_name)): +# # log.debug(redis_conn.lindex(key_name, i)) + +# log.debug('END: Loop through list key') + +# redis_conn.close() + +# def disconnect(self, websocket: WebSocket): +# log.setLevel(logging.DEBUG) +# log.info('WS disconnect') +# self.active_connections.remove(websocket) + + +# # Targets: echo, direct, group, broadcast +# # send_ text, bytes, json +# # receive_ text, bytes, json +# async def echo(self, message: str, websocket: WebSocket): +# log.setLevel(logging.DEBUG) +# # log.debug(dir(websocket)) +# log.debug(vars(websocket)) +# log.debug(websocket.url) +# log.debug(websocket.client) +# log.debug(websocket.client_state) +# log.debug(websocket.headers['sec-websocket-key']) +# # log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8')) +# await websocket.send_text(message) + +# async def direct(self, from_client_id: str, to_client_id: str, data: dict): +# log.setLevel(logging.DEBUG) +# for connection in self.active_connections: +# log.debug(vars(connection)) +# log.debug(connection) +# await connection.send_text(message) + +# async def group(self, group_id: str, data: dict): +# log.setLevel(logging.INFO) +# log.debug(locals()) + +# # Single process version... +# # for connection in self.active_connections: +# # log.debug(vars(connection)) +# # # websocket.path_params.get('client_id') # Testing... +# # # if connection.scope.get('path') == group_id: # Testing... + +# # if connection.path_params.get('group_id') == group_id: +# # log.info('Found matching Group ID') +# # await connection.send_json(data) + +# # Redis version... +# redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) + +# # Figure out key_name +# key_name = f'ws:group_id:{group_id}' + +# log.debug('BEGIN: Loop through Redis list entry') +# log.debug(redis_conn.llen(key_name)) + +# # ws_header_key = connection.headers.get('sec-websocket-key') +# # ws_header_key = f'ws:header_key:{ws_header_key}' +# # log.debug(f'Looking for: {ws_header_key}') + +# for i in range(0, redis_conn.llen(key_name)): +# log.debug(f'{i}: {redis_conn.lindex(key_name, i)}') +# redis_ws_group_ws_header_key = redis_conn.lindex(key_name, i) + +# # Check if they key +# # redis_key_name_ws_conn = f'ws:header_key:{redis_ws_group_ws_header_key}' +# # ws_conn_data = json.loads(redis_conn.get('redis_key_name_ws_conn')) +# # active_ws_conn = ws_conn_data.get('ws_conn') +# # log.debug(active_ws_conn) +# for active_ws_conn in self.active_connections: +# log.debug(vars(active_ws_conn)) + +# ws_header_key = active_ws_conn.headers.get('sec-websocket-key') + +# if redis_ws_group_ws_header_key == ws_header_key: +# log.info(f'Found key in Redis list!') +# await active_ws_conn.send_json(data) +# else: +# log.debug(f'Key not found in Redis list') + +# log.debug('END: Loop through Redis list entry') + +# redis_conn.close() + +# # NOTE: Same as group, but no filter based on path +# async def broadcast(self, message: str): +# log.setLevel(logging.INFO) +# log.debug(locals()) + +# for connection in self.active_connections: +# log.debug(vars(connection)) +# await connection.send_text(message) + +# async def send_personal_message(self, message: str, websocket: WebSocket): +# log.setLevel(logging.DEBUG) +# # log.debug(dir(websocket)) +# log.debug(vars(websocket)) +# log.debug(websocket.url) +# log.debug(websocket.client) +# log.debug(websocket.client_state) +# log.debug(websocket.headers['sec-websocket-key']) +# # log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8')) +# await websocket.send_text(message) + + +# aio_manager = AIOConnectionManager() + + # Endpoints for??? # /room/ (just a group of clients; for a related group like a poster presenter or session room) # /client/ (for one specific client/browser; something specific to a browser???) # /person/ (for one specific person; handles send and receiving their messages) + + +@router.websocket('/ws_single/group/{group_id}/client/{client_id}') +async def single_ws_client_id( + websocket: WebSocket, + group_id: str, + client_id: str, + ): + await manager.connect_w_client_id(websocket, client_id, group_id) + # await aio_manager.connect_w_client_id(websocket, client_id, group_id) + try: + while True: + data = await websocket.receive_json() # Returns dict + # log.debug(data) + + # group_path_id = f'/ws/group/{group_id}' + # client_id = data.get('client_id') + # echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?) + msg_type = data.get('type') + cmd = data.get('cmd') + msg = data.get('msg') + + log.setLevel(logging.INFO) + log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};') + log.debug(f'Command: {cmd}') + log.debug(f'Message: {msg}') + + data['client_id'] = client_id + data['group_id'] = group_id + + await manager.group(group_id=group_id, data=data) + # await aio_manager.group(group_id=group_id, data=data) + + # if msg_type: + # if msg_type == 'echo': + # await manager.echo(f'Echo: {data}', websocket) + # elif msg_type == 'dm': + # await manager.direct(f'DM: {msg}') + # elif msg_type == 'group': + # await manager.group(group_id=group_id, f'Group: {data}') + # elif msg_type == 'all': + # await manager.broadcast(f'All: {data}') + # elif msg_type == 'cmd': + # await manager.broadcast(f'Command: {data}') + # else: + # await manager.broadcast(f'Unknown: {data}') + # else: + # await manager.broadcast(f'MSG: {data}') + + except WebSocketDisconnect: + manager.disconnect(websocket) + # aio_manager.disconnect(websocket) + # await manager.broadcast(f'Client #{client_id} left') + + @router.websocket('/ws/client/{client_id}') async def ws_client_id( websocket: WebSocket, @@ -188,57 +626,6 @@ async def ws_client_id( # await manager.broadcast(f'Client #{client_id} left') -@router.websocket('/ws/group/{group_id}/client/{client_id}') -async def ws_client_id( - websocket: WebSocket, - group_id: str, - client_id: str, - ): - await manager.connect(websocket) - try: - while True: - data = await websocket.receive_json() # Returns dict - # log.debug(data) - - # group_path_id = f'/ws/group/{group_id}' - # client_id = data.get('client_id') - # echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?) - msg_type = data.get('type') - cmd = data.get('cmd') - msg = data.get('msg') - - log.setLevel(logging.INFO) - log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};') - log.debug(f'Command: {cmd}') - log.debug(f'Message: {msg}') - - data['client_id'] = client_id - data['group_id'] = group_id - - await manager.group(group_id=group_id, data=data) - - # if msg_type: - # if msg_type == 'echo': - # await manager.echo(f'Echo: {data}', websocket) - # elif msg_type == 'dm': - # await manager.direct(f'DM: {msg}') - # elif msg_type == 'group': - # await manager.group(group_id=group_id, f'Group: {data}') - # elif msg_type == 'all': - # await manager.broadcast(f'All: {data}') - # elif msg_type == 'cmd': - # await manager.broadcast(f'Command: {data}') - # else: - # await manager.broadcast(f'Unknown: {data}') - # else: - # await manager.broadcast(f'MSG: {data}') - - except WebSocketDisconnect: - manager.disconnect(websocket) - # await manager.broadcast(f'Client #{client_id} left') - - - @router.websocket('/ws/{client_id}') async def ws_id( websocket: WebSocket, @@ -259,7 +646,6 @@ async def ws_id( await manager.broadcast(f'Client #{client_id} left') - # @router.websocket('/ws/room/{room_id}') # async def ws_room_id( # websocket: WebSocket, @@ -362,6 +748,7 @@ async def redis_connector( # redis = await redis.create_pool(redis_url) # Redis client bound to pool of connections (auto-reconnecting). + # r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=7, password=None, decode_responses=True) redis = redis.from_url( redis_url, encoding="utf-8", decode_responses=True ) diff --git a/app/routers/websockets_redis.py b/app/routers/websockets_redis.py new file mode 100644 index 0000000..48973c0 --- /dev/null +++ b/app/routers/websockets_redis.py @@ -0,0 +1,715 @@ +from fastapi import APIRouter, FastAPI, Response, WebSocket, WebSocketDisconnect +from fastapi.responses import HTMLResponse +from pydantic import BaseModel, EmailStr, Field +from typing import Dict, List, Optional, Set, Union +import asyncio, base64, datetime, hashlib, json, os, pathlib, redis, shutil, time +# import aioredis, redis, asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time +# from aioredis import from_url, Redis +# import asyncio +# import aioredis +# import async_timeout + +from app.lib_general import log, logging, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min +from app.config import settings + +router = APIRouter() + + +class ConnectionManager: + def __init__(self): + # NOTE: The active_connections list should be in Redis + self.active_connections: List[WebSocket] = [] + + async def connect(self, websocket: WebSocket): + await websocket.accept() + log.setLevel(logging.DEBUG) + log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key')) + # log.debug(dir(websocket)) + # log.debug(websocket.client) # the Address(host=ip, port=number) + # log.info('WS Key: '+websocket.headers.get('sec-websocket-key')) + # log.debug(websocket.headers) # Most useful + # log.debug(websocket.app) # Nope + # log.debug(websocket.app()) # Nope + # log.debug(websocket.session) # Nope + # log.debug(websocket.session()) # Nope + # log.debug(websocket.user) # Nope + # log.debug(websocket.values()) # Not sure if useful + + self.active_connections.append(websocket) + log.debug(self.active_connections) + + redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) + + # message = {'hello': 'world', 'goodbye': 'test'} + # data = json.loads(message) + + ws_header_key = websocket.headers.get('sec-websocket-key') + key_name = f'ws:sec_key:{ws_header_key}' + data = {'ws_header_key': ws_header_key} + + redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + + r_entry = json.loads(redis_conn.get(key_name)) # For testing... + r_ttl = redis_conn.ttl(key_name) # For testing... + + # log.debug(r_entry) # For testing... + # log.debug(r_entry.get('ws_header_key')) # For testing... + # log.debug(r_ttl) # For testing... + + + + # r_entries = redis.scan() + # log.debug(r_entries) + + redis_conn.close() + # await redis.close() + + async def connect_w_client_id(self, websocket: WebSocket, client_id: str, group_id = None): + await websocket.accept() + log.setLevel(logging.DEBUG) + log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key')) + + self.active_connections.append(websocket) + log.debug(self.active_connections) + + redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) + + # # ### Redis ### Testing One + # ws_header_key = websocket.headers.get('sec-websocket-key') + # key_name = f'ws:sec_key:{ws_header_key}' + # data = {'ws_header_key': ws_header_key} + + # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + + # r_entry = json.loads(redis_conn.get(key_name)) # For testing... + # r_ttl = redis_conn.ttl(key_name) # For testing... + + # # log.debug(r_entry) # For testing... + # # log.debug(r_entry.get('ws_header_key')) # For testing... + # # log.debug(r_ttl) # For testing... + + # # ### Redis ### Testing Two!! + # key_name = f'ws:conn:{ws_header_key}' + # data = websocket + + # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + + # r_entry = json.loads(redis_conn.get(key_name)) # For testing... + + # log.debug(r_entry) # For testing... + + + # ### Redis ### Testing Three!!! + ws_header_key = websocket.headers.get('sec-websocket-key') + + # # Set WS connection in Redis based on WS header security key + # key_name = f'ws:header_key:{ws_header_key}' + # data = {'ws_conn': websocket} + + # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + + # Get new ws_header_key and group_id + ws_header_key = websocket.headers.get('sec-websocket-key') + group_id = websocket.path_params.get('group_id') + + # Figure out key_name + key_name = f'ws:group_id:{group_id}' + + # Get the current list from Redis entry + + # group_id_ws_conn_li = [] + # group_id_ws_conn_li = redis_conn.get(key_name, []) + + # Add the new WS connection's header key to the list + redis_conn.lpush(key_name, ws_header_key) + + # group_id_ws_conn_li.append(ws_header_key) + + # data = group_id_ws_conn_li + + # redis_conn.hsetex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + + + # r_entry = json.loads(redis_conn.lget(key_name)) # For testing... + log.debug('BEGIN: Loop through list key') + log.debug(redis_conn.llen(key_name)) + + # for i in range(0, redis_conn.llen(key_name)): + # log.debug(redis_conn.lindex(key_name, i)) + + log.debug('END: Loop through list key') + + redis_conn.close() + + def disconnect(self, websocket: WebSocket): + log.setLevel(logging.DEBUG) + log.info('WS disconnect') + self.active_connections.remove(websocket) + + + # Targets: echo, direct, group, broadcast + # send_ text, bytes, json + # receive_ text, bytes, json + async def echo(self, message: str, websocket: WebSocket): + log.setLevel(logging.DEBUG) + # log.debug(dir(websocket)) + log.debug(vars(websocket)) + log.debug(websocket.url) + log.debug(websocket.client) + log.debug(websocket.client_state) + log.debug(websocket.headers['sec-websocket-key']) + # log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8')) + await websocket.send_text(message) + + async def direct(self, from_client_id: str, to_client_id: str, data: dict): + log.setLevel(logging.DEBUG) + for connection in self.active_connections: + log.debug(vars(connection)) + log.debug(connection) + await connection.send_text(message) + + async def group(self, group_id: str, data: dict): + log.setLevel(logging.INFO) + log.debug(locals()) + + # Single process version... + # for connection in self.active_connections: + # log.debug(vars(connection)) + # # websocket.path_params.get('client_id') # Testing... + # # if connection.scope.get('path') == group_id: # Testing... + + # if connection.path_params.get('group_id') == group_id: + # log.info('Found matching Group ID') + # await connection.send_json(data) + + # Redis version... + redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) + + # Figure out key_name + key_name = f'ws:group_id:{group_id}' + + log.debug('BEGIN: Loop through Redis list entry') + log.debug(redis_conn.llen(key_name)) + + # ws_header_key = connection.headers.get('sec-websocket-key') + # ws_header_key = f'ws:header_key:{ws_header_key}' + # log.debug(f'Looking for: {ws_header_key}') + + for i in range(0, redis_conn.llen(key_name)): + log.debug(f'{i}: {redis_conn.lindex(key_name, i)}') + redis_ws_group_ws_header_key = redis_conn.lindex(key_name, i) + + # Check if they key + # redis_key_name_ws_conn = f'ws:header_key:{redis_ws_group_ws_header_key}' + # ws_conn_data = json.loads(redis_conn.get('redis_key_name_ws_conn')) + # active_ws_conn = ws_conn_data.get('ws_conn') + # log.debug(active_ws_conn) + for active_ws_conn in self.active_connections: + log.debug(vars(active_ws_conn)) + + ws_header_key = active_ws_conn.headers.get('sec-websocket-key') + + if redis_ws_group_ws_header_key == ws_header_key: + log.info(f'Found key in Redis list!') + await active_ws_conn.send_json(data) + else: + log.debug(f'Key not found in Redis list') + + log.debug('END: Loop through Redis list entry') + + redis_conn.close() + + # NOTE: Same as group, but no filter based on path + async def broadcast(self, message: str): + log.setLevel(logging.INFO) + log.debug(locals()) + + for connection in self.active_connections: + log.debug(vars(connection)) + await connection.send_text(message) + + async def send_personal_message(self, message: str, websocket: WebSocket): + log.setLevel(logging.DEBUG) + # log.debug(dir(websocket)) + log.debug(vars(websocket)) + log.debug(websocket.url) + log.debug(websocket.client) + log.debug(websocket.client_state) + log.debug(websocket.headers['sec-websocket-key']) + # log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8')) + await websocket.send_text(message) + + + +# manager = ConnectionManager() + + + +# class AIOConnectionManager: +# def __init__(self): +# # NOTE: The active_connections list should be in Redis +# self.active_connections: List[WebSocket] = [] + +# async def connect(self, websocket: WebSocket): +# await websocket.accept() +# log.setLevel(logging.DEBUG) +# log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key')) +# # log.debug(websocket.client) # the Address(host=ip, port=number) +# # log.info('WS Key: '+websocket.headers.get('sec-websocket-key')) +# # log.debug(websocket.headers) # Most useful + +# self.active_connections.append(websocket) +# log.debug(self.active_connections) + +# redis_conn = aioredis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) + +# # message = {'hello': 'world', 'goodbye': 'test'} +# # data = json.loads(message) + +# ws_header_key = websocket.headers.get('sec-websocket-key') +# key_name = f'ws:sec_key:{ws_header_key}' +# data = {'ws_header_key': ws_header_key} + +# await redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + +# r_entry = json.loads(await redis_conn.get(key_name)) # For testing... +# r_ttl = await redis_conn.ttl(key_name) # For testing... + +# # log.debug(r_entry) # For testing... +# # log.debug(r_entry.get('ws_header_key')) # For testing... +# # log.debug(r_ttl) # For testing... + + + +# # r_entries = redis.scan() +# # log.debug(r_entries) + +# redis_conn.close() +# # await redis.close() + +# async def connect_w_client_id(self, websocket: WebSocket, client_id: str, group_id = None): +# await websocket.accept() +# log.setLevel(logging.DEBUG) +# log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key')) + +# self.active_connections.append(websocket) +# log.debug(self.active_connections) + +# redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) + +# # # ### Redis ### Testing One +# # ws_header_key = websocket.headers.get('sec-websocket-key') +# # key_name = f'ws:sec_key:{ws_header_key}' +# # data = {'ws_header_key': ws_header_key} + +# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + +# # r_entry = json.loads(redis_conn.get(key_name)) # For testing... +# # r_ttl = redis_conn.ttl(key_name) # For testing... + +# # # log.debug(r_entry) # For testing... +# # # log.debug(r_entry.get('ws_header_key')) # For testing... +# # # log.debug(r_ttl) # For testing... + +# # # ### Redis ### Testing Two!! +# # key_name = f'ws:conn:{ws_header_key}' +# # data = websocket + +# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + +# # r_entry = json.loads(redis_conn.get(key_name)) # For testing... + +# # log.debug(r_entry) # For testing... + + +# # ### Redis ### Testing Three!!! +# ws_header_key = websocket.headers.get('sec-websocket-key') + +# # # Set WS connection in Redis based on WS header security key +# # key_name = f'ws:header_key:{ws_header_key}' +# # data = {'ws_conn': websocket} + +# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + +# # Get new ws_header_key and group_id +# ws_header_key = websocket.headers.get('sec-websocket-key') +# group_id = websocket.path_params.get('group_id') + +# # Figure out key_name +# key_name = f'ws:group_id:{group_id}' + +# # Get the current list from Redis entry + +# # group_id_ws_conn_li = [] +# # group_id_ws_conn_li = redis_conn.get(key_name, []) + +# # Add the new WS connection's header key to the list +# redis_conn.lpush(key_name, ws_header_key) + +# # group_id_ws_conn_li.append(ws_header_key) + +# # data = group_id_ws_conn_li + +# # redis_conn.hsetex(key_name, datetime.timedelta(minutes=90), json.dumps(data)) + + +# # r_entry = json.loads(redis_conn.lget(key_name)) # For testing... +# log.debug('BEGIN: Loop through list key') +# log.debug(redis_conn.llen(key_name)) + +# # for i in range(0, redis_conn.llen(key_name)): +# # log.debug(redis_conn.lindex(key_name, i)) + +# log.debug('END: Loop through list key') + +# redis_conn.close() + +# def disconnect(self, websocket: WebSocket): +# log.setLevel(logging.DEBUG) +# log.info('WS disconnect') +# self.active_connections.remove(websocket) + + +# # Targets: echo, direct, group, broadcast +# # send_ text, bytes, json +# # receive_ text, bytes, json +# async def echo(self, message: str, websocket: WebSocket): +# log.setLevel(logging.DEBUG) +# # log.debug(dir(websocket)) +# log.debug(vars(websocket)) +# log.debug(websocket.url) +# log.debug(websocket.client) +# log.debug(websocket.client_state) +# log.debug(websocket.headers['sec-websocket-key']) +# # log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8')) +# await websocket.send_text(message) + +# async def direct(self, from_client_id: str, to_client_id: str, data: dict): +# log.setLevel(logging.DEBUG) +# for connection in self.active_connections: +# log.debug(vars(connection)) +# log.debug(connection) +# await connection.send_text(message) + +# async def group(self, group_id: str, data: dict): +# log.setLevel(logging.INFO) +# log.debug(locals()) + +# # Single process version... +# # for connection in self.active_connections: +# # log.debug(vars(connection)) +# # # websocket.path_params.get('client_id') # Testing... +# # # if connection.scope.get('path') == group_id: # Testing... + +# # if connection.path_params.get('group_id') == group_id: +# # log.info('Found matching Group ID') +# # await connection.send_json(data) + +# # Redis version... +# redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) + +# # Figure out key_name +# key_name = f'ws:group_id:{group_id}' + +# log.debug('BEGIN: Loop through Redis list entry') +# log.debug(redis_conn.llen(key_name)) + +# # ws_header_key = connection.headers.get('sec-websocket-key') +# # ws_header_key = f'ws:header_key:{ws_header_key}' +# # log.debug(f'Looking for: {ws_header_key}') + +# for i in range(0, redis_conn.llen(key_name)): +# log.debug(f'{i}: {redis_conn.lindex(key_name, i)}') +# redis_ws_group_ws_header_key = redis_conn.lindex(key_name, i) + +# # Check if they key +# # redis_key_name_ws_conn = f'ws:header_key:{redis_ws_group_ws_header_key}' +# # ws_conn_data = json.loads(redis_conn.get('redis_key_name_ws_conn')) +# # active_ws_conn = ws_conn_data.get('ws_conn') +# # log.debug(active_ws_conn) +# for active_ws_conn in self.active_connections: +# log.debug(vars(active_ws_conn)) + +# ws_header_key = active_ws_conn.headers.get('sec-websocket-key') + +# if redis_ws_group_ws_header_key == ws_header_key: +# log.info(f'Found key in Redis list!') +# await active_ws_conn.send_json(data) +# else: +# log.debug(f'Key not found in Redis list') + +# log.debug('END: Loop through Redis list entry') + +# redis_conn.close() + +# # NOTE: Same as group, but no filter based on path +# async def broadcast(self, message: str): +# log.setLevel(logging.INFO) +# log.debug(locals()) + +# for connection in self.active_connections: +# log.debug(vars(connection)) +# await connection.send_text(message) + +# async def send_personal_message(self, message: str, websocket: WebSocket): +# log.setLevel(logging.DEBUG) +# # log.debug(dir(websocket)) +# log.debug(vars(websocket)) +# log.debug(websocket.url) +# log.debug(websocket.client) +# log.debug(websocket.client_state) +# log.debug(websocket.headers['sec-websocket-key']) +# # log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8')) +# await websocket.send_text(message) + + +# aio_manager = AIOConnectionManager() + + +# Endpoints for??? +# /room/ (just a group of clients; for a related group like a poster presenter or session room) +# /client/ (for one specific client/browser; something specific to a browser???) +# /person/ (for one specific person; handles send and receiving their messages) + + + + +# @router.websocket('/ws_redis/group/{group_id}/client/{client_id}') +# async def ws_client_id( +# websocket: WebSocket, +# group_id: str, +# client_id: str, +# ): +# await manager.connect_w_client_id(websocket, client_id, group_id) +# # await aio_manager.connect_w_client_id(websocket, client_id, group_id) +# try: +# while True: +# data = await websocket.receive_json() # Returns dict +# # log.debug(data) + +# # group_path_id = f'/ws/group/{group_id}' +# # client_id = data.get('client_id') +# # echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?) +# msg_type = data.get('type') +# cmd = data.get('cmd') +# msg = data.get('msg') + +# log.setLevel(logging.INFO) +# log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};') +# log.debug(f'Command: {cmd}') +# log.debug(f'Message: {msg}') + +# data['client_id'] = client_id +# data['group_id'] = group_id + +# await manager.group(group_id=group_id, data=data) +# # await aio_manager.group(group_id=group_id, data=data) + +# await redis_connector(websocket) + +# except WebSocketDisconnect: +# manager.disconnect(websocket) +# # aio_manager.disconnect(websocket) +# # await manager.broadcast(f'Client #{client_id} left') + + + + + + + +# @router.websocket('/ws_redis/looping') +# async def ws_looping( +# websocket: WebSocket, +# ): +# await manager.connect(websocket) +# # await manager.broadcast(f'Welcome to looping') +# try: +# while True: +# # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING +# # await time.sleep(3.5) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING +# # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING +# # data = await websocket.receive_json() +# # log.debug(data) +# # data_dict = data +# # data_dict = json.loads(data) +# # log.debug(data_dict['client_id']) +# # await manager.send_personal_message(f'You wrote: {data}', websocket) +# await manager.broadcast(f'Loop!!!') +# except WebSocketDisconnect: +# manager.disconnect(websocket) +# await manager.broadcast(f'Client left looping') + + +# @router.websocket("/ws_redis/{client_id}") +# async def websocket_endpoint( +# websocket: WebSocket, +# client_id: int, +# response: Response = Response, +# ): +# log.setLevel(logging.DEBUG) +# log.debug(locals()) + +# log.info('Root of ws. Waiting to accept a websocket and then the redis_connector') + +# await websocket.accept() +# await redis_connector(websocket) + + + + +@router.websocket('/ws/group/{group_id}/client/{client_id}') +@router.websocket('/ws_redis/group/{group_id}/client/{client_id}') +async def redis_ws_client_id( + websocket: WebSocket, + group_id: str, + client_id: str, + ): + log.setLevel(logging.DEBUG) + log.debug(locals()) + + log.info('WS Redis: Group ID and Client ID. Waiting to "accept" a websocket and then the "redis_connector"') + + await websocket.accept() + + await redis_connector(websocket, client_id, group_id) + + + # await manager.connect_w_client_id(websocket, client_id, group_id) + # # await aio_manager.connect_w_client_id(websocket, client_id, group_id) + # try: + # while True: + # data = await websocket.receive_json() # Returns dict + # # log.debug(data) + + # # group_path_id = f'/ws/group/{group_id}' + # # client_id = data.get('client_id') + # # echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?) + # msg_type = data.get('type') + # cmd = data.get('cmd') + # msg = data.get('msg') + + # log.setLevel(logging.INFO) + # log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};') + # log.debug(f'Command: {cmd}') + # log.debug(f'Message: {msg}') + + # data['client_id'] = client_id + # data['group_id'] = group_id + + # await manager.group(group_id=group_id, data=data) + # # await aio_manager.group(group_id=group_id, data=data) + + # await redis_connector(websocket) + + # except WebSocketDisconnect: + # manager.disconnect(websocket) + # # aio_manager.disconnect(websocket) + # # await manager.broadcast(f'Client #{client_id} left') + + +async def redis_connector( + ws_conn: WebSocket, + client_id: str, + group_id = None + ): + log.setLevel(logging.DEBUG) + log.debug(locals()) + + async def consumer_handler(ws_conn: WebSocket, r, client_id, group_id): + log.info('consumer_handler()') + try: + while True: + data = await ws_conn.receive_json() # Returns dict + log.debug(data) + + # echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?) + msg_type = data.get('type') + cmd = data.get('cmd') + msg = data.get('msg') + + log.setLevel(logging.INFO) + log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};') + log.debug(f'Command: {cmd}') + log.debug(f'Message: {msg}') + + data['client_id'] = client_id + data['group_id'] = group_id + log.debug(data) + + if data: + #logging.info(ws) + #logging.info(dir(message)) + # data = json.loads(message) + #await r.publish("chat:c", message) + #await r.publish("chat:c", str(data['message'])) + + # causes: TypeError: object int can't be used in 'await' expression + r.publish("chat:c", str(data.get('client_id'))) + r.publish("chat:c", str(data)) + # r.publish("chat:c", str({'test': 'xxxx'})) + # r.publish("chat:c", str('data goes here...')) + except WebSocketDisconnect as exc: + # TODO this needs handling better + log.error(exc) + + async def producer_handler(r, ws_conn: redis.client.PubSub, client_id, group_id): + log.info('producer_handler()') + (channel,) = await r.subscribe("chat:c") + assert isinstance(channel, redis.Channel) + try: + while True: + data = await channel.get() + if data: + await ws_conn.send_json(data) + except Exception as exc: + # TODO this needs handling better + log.error(exc) + + # redis_conn = await redis.create_pool(redis_url) + # Redis client bound to pool of connections (auto-reconnecting). + # r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=7, password=None, decode_responses=True) + log.info('Create Redis connection') + redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True) + # redis = redis.from_url( + # redis_url, encoding="utf-8", decode_responses=True + # ) + redis_conn_pubsub = redis_conn.pubsub() + + log.info('Run consumer_task') + # consumer_task = consumer_handler(ws_conn, redis_conn, client_id, group_id) + + # future_consumer = asyncio.create_task(consumer_task()) + future_consumer = asyncio.create_task(consumer_handler(ws_conn, redis_conn, client_id, group_id)) + + log.info('Run producer_task') + # producer_task = producer_handler(redis_conn_pubsub, ws_conn, client_id, group_id) + + # future_producer = asyncio.create_task(producer_task()) + future_producer = asyncio.create_task(producer_handler(redis_conn_pubsub, ws_conn, client_id, group_id)) + + await future_consumer + await future_producer + + # pending_tasks = asyncio.all_tasks(loop=event_loop) + + # # cancel each pending task + # for task in pending_tasks: + # logger.info(f'killing task {task.get_coro()}') + # event_loop.call_soon_threadsafe(task.cancel) + + # # This does not work for some reason. + # # "Passing coroutines is forbidden, use tasks explicitly." + # log.info('Start asyncio wait') + # done, pending = await asyncio.wait( + # [consumer_task, producer_task], + # return_when = asyncio.FIRST_COMPLETED, + # ) + # log.debug(f'Done task: {done}') + + # for task in pending: + # log.debug(f'Canceling task: {task}') + # task.cancel() + # await redis_conn.close() + # await redis_conn.wait_closed() +