Work on websockets. A lot...

This commit is contained in:
Scott Idem
2023-04-07 00:37:57 -04:00
parent b369d00b3e
commit 0eb775826f
4 changed files with 1273 additions and 72 deletions

View File

@@ -18,7 +18,7 @@ from . import config
from app.log import log, logging
# Import the routers here first:
from app.routers import aether_cfg, api_crud, api, importing, sql, account, activity_log, address, archive, archive_content, contact, cont_edu_cert, cont_edu_cert_person, data_store, event, event_abstract, event_badge, event_badge_importing, event_badge_template, event_device, event_exhibit, event_exhibit_tracking, event_file, event_importing, event_location, event_person, event_person_detail, event_person_tracking, event_presentation, event_presenter, event_registration, event_session, flask_cfg, fundraising, hosted_file, journal, journal_entry, log_client_viewing, lookup, membership_cfg, membership_group, membership_person_group, membership_person, membership_person_profile, membership_type, membership_person_type, order, order_v3, order_line, order_cart, organization, page, person, person_user, post, post_comment, product, qr, site, site_domain, user, websockets, e_cvent, c_idaa, e_impexium, e_stripe
from app.routers import aether_cfg, api_crud, api, importing, sql, account, activity_log, address, archive, archive_content, contact, cont_edu_cert, cont_edu_cert_person, data_store, event, event_abstract, event_badge, event_badge_importing, event_badge_template, event_device, event_exhibit, event_exhibit_tracking, event_file, event_importing, event_location, event_person, event_person_detail, event_person_tracking, event_presentation, event_presenter, event_registration, event_session, flask_cfg, fundraising, hosted_file, journal, journal_entry, log_client_viewing, lookup, membership_cfg, membership_group, membership_person_group, membership_person, membership_person_profile, membership_type, membership_person_type, order, order_v3, order_line, order_cart, organization, page, person, person_user, post, post_comment, product, qr, site, site_domain, user, websockets, websockets_redis, e_cvent, c_idaa, e_impexium, e_stripe
from app.db_sql import sql_select # , sql_connect
@@ -392,6 +392,10 @@ app.include_router(
#dependencies=[Depends(get_token_header)],
#responses={404: {'description': 'Not found'}},
)
app.include_router(
websockets_redis.router,
tags=['Websockets Redis'],
)
app.include_router(
e_cvent.router,

View File

@@ -3,6 +3,7 @@ from fastapi import APIRouter, Body, Depends, File, Form, Header, HTTPException,
from fastapi.responses import FileResponse
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from pdf2image import convert_from_path
from app.lib_general import log, logging, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min
from app.config import settings
@@ -699,20 +700,114 @@ async def get_hosted_file_obj(
# ### BEGIN ### API Hosted File ### download_tmp() ###
# Updated 2021-11-23
@router.get('/download/tmp/{filename}', response_model=Resp_Body_Base)
# Updated 2023-04-05
@router.get('/tmp/{subdirectory}/{filename}/download', response_model=Resp_Body_Base)
async def download_tmp(
filename: str = Query(..., min_length=4, max_length=100),
# x_account_id: str = Header(...),
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
response: Response = Response,
subdirectory: str = Query(..., min_length=1, max_length=100),
filename: str = Query(..., min_length=4, max_length=120),
commons: Common_Route_Params = Depends(common_route_params),
):
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
full_dest_path = 'admin/temp/order_line/order_line_list_2021-11-23_1310.xlsx'
filename = 'text.xlsx'
# NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING
time.sleep(3.5) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING
# NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING
return FileResponse(full_dest_path, filename=filename)
hosted_tmp_path = settings.FILES_PATH['hosted_tmp_root']
log.info(f'Hosted Tmp Path: {hosted_tmp_path}')
log.debug(shutil.disk_usage(hosted_tmp_path))
hosted_tmp_w_subdir = os.path.join(hosted_tmp_path, subdirectory)
# if pathlib.Path(hosted_tmp_w_subdir):
if os.path.exists(hosted_tmp_w_subdir):
log.info('Hosted tmp with subdirectory found')
else:
log.info('Hosted tmp with subdirectory not found')
return mk_resp(data=False, status_code=404, response=commons.response, status_message='The hosted tmp file subdirectory was not found.') # Not Found
hosted_tmp_w_subdir_filename = os.path.join(hosted_tmp_path, subdirectory, filename)
# if pathlib.Path(hosted_tmp_w_subdir_filename):
if os.path.exists(hosted_tmp_w_subdir_filename):
log.info('Hosted tmp with subdirectory and filename found')
else:
log.info('Hosted tmp with subdirectory and filename not found')
return mk_resp(data=False, status_code=404, response=commons.response, status_message='The hosted tmp file was not found.') # Not Found
return FileResponse(hosted_tmp_w_subdir_filename, filename=filename)
# ### END ### API Hosted File ### download_tmp() ###
# ### BEGIN ### API Hosted File Route ### convert_file() ###
# This just needs to return the correct model for a new hosted_file
# Updated 2023-04-04
@router.get('/{hosted_file_id}/convert_file')
async def convert_file(
hosted_file_id: str = Query(..., min_length=11, max_length=22),
from_type: str = 'pdf',
to_type: str = 'webp',
pdf_opt1: bool = False,
pdf_opt2: str = 'test',
commons: Common_Route_Params = Depends(common_route_params),
):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
# Need to look up file_hash for hosted_file_id
# file_hash = '0080f0b03144927c173694745483894a09208d9444fdaccab054493f699361be'
file_hash = '279312d1738fd3a8a2f136b48295e28664d38b18de66c55de56b8886b9454784'
file_hash_filename = f'{file_hash}.file'
hosted_files_path = settings.FILES_PATH['hosted_files_root']
log.info(f'Hosted Files Path: {hosted_files_path}')
log.debug(shutil.disk_usage(hosted_files_path))
file_subdirectory = file_hash[0:2]
full_file_path = os.path.join(hosted_files_path, file_subdirectory, file_hash_filename)
log.info(f'File Hash with Subdirectory: {full_file_path}')
hosted_tmp_path = settings.FILES_PATH['hosted_tmp_root']
log.info(f'Hosted Tmp Path: {hosted_tmp_path}')
log.debug(shutil.disk_usage(hosted_tmp_path))
hosted_tmp_convert_file_path = os.path.join(hosted_tmp_path, 'convert_file')
if pathlib.Path(hosted_tmp_convert_file_path):
log.info('Hosted tmp convert file path found')
else:
log.info('Creating hosted tmp convert file path')
pathlib.Path(hosted_tmp_convert_file_path).mkdir(parents=True, exist_ok=True)
images = convert_from_path(full_file_path, size=(2160, None))
for image in images:
save_path = os.path.join(hosted_tmp_convert_file_path, 'converted_2160px_lossless_90q.webp')
# image.save('testing_2625px_9.png', compress_level=9)
# Lossy WebP takes about 25% of the time as WebP lossless compression with 100 level effort
# .46 seconds vs 2.1 seconds with example PDF
# image.save('testing_2625px_80q.webp', quality=80) # default
# timer_2a_start = timer()
image.save(save_path, lossless=False, quality=90) # default quality is 80
# timer_2a_end = timer()
# print( round((timer_2a_end - timer_2a_start), 8) )
# timer_2b_start = timer()
# image.save('testing_2160px_lossless_100q.webp', lossless=True, quality=100) # quality is level of effort
# timer_2b_end = timer()
# print( round((timer_2b_end - timer_2b_start), 8) )
# file_info = await save_file(
# file = file_obj,
# account_id = account_id,
# account_id_random = account_id_random,
# link_to_type = link_to_type,
# link_to_id = link_to_id,
# link_to_id_random = link_to_id_random,
# check_allowed_extension = False,
# )
# if file_info['saved']: pass

View File

@@ -3,6 +3,12 @@ from fastapi.responses import HTMLResponse
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
import redis, asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time
# import asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time
# import aioredis, redis, asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time
# from aioredis import from_url, Redis
# import asyncio
# import aioredis
# import async_timeout
from app.lib_general import log, logging, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min
from app.config import settings
@@ -71,11 +77,127 @@ class ConnectionManager:
async def connect(self, websocket: WebSocket):
await websocket.accept()
log.info('WS connect')
log.setLevel(logging.DEBUG)
log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key'))
# log.debug(dir(websocket))
# log.debug(websocket.client) # the Address(host=ip, port=number)
# log.info('WS Key: '+websocket.headers.get('sec-websocket-key'))
# log.debug(websocket.headers) # Most useful
# log.debug(websocket.app) # Nope
# log.debug(websocket.app()) # Nope
# log.debug(websocket.session) # Nope
# log.debug(websocket.session()) # Nope
# log.debug(websocket.user) # Nope
# log.debug(websocket.values()) # Not sure if useful
self.active_connections.append(websocket)
log.debug(self.active_connections)
redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# message = {'hello': 'world', 'goodbye': 'test'}
# data = json.loads(message)
ws_header_key = websocket.headers.get('sec-websocket-key')
key_name = f'ws:sec_key:{ws_header_key}'
data = {'ws_header_key': ws_header_key}
redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
r_entry = json.loads(redis_conn.get(key_name)) # For testing...
r_ttl = redis_conn.ttl(key_name) # For testing...
# log.debug(r_entry) # For testing...
# log.debug(r_entry.get('ws_header_key')) # For testing...
# log.debug(r_ttl) # For testing...
# r_entries = redis.scan()
# log.debug(r_entries)
redis_conn.close()
# await redis.close()
async def connect_w_client_id(self, websocket: WebSocket, client_id: str, group_id = None):
await websocket.accept()
log.setLevel(logging.DEBUG)
log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key'))
self.active_connections.append(websocket)
log.debug(self.active_connections)
redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# # ### Redis ### Testing One
# ws_header_key = websocket.headers.get('sec-websocket-key')
# key_name = f'ws:sec_key:{ws_header_key}'
# data = {'ws_header_key': ws_header_key}
# redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# r_entry = json.loads(redis_conn.get(key_name)) # For testing...
# r_ttl = redis_conn.ttl(key_name) # For testing...
# # log.debug(r_entry) # For testing...
# # log.debug(r_entry.get('ws_header_key')) # For testing...
# # log.debug(r_ttl) # For testing...
# # ### Redis ### Testing Two!!
# key_name = f'ws:conn:{ws_header_key}'
# data = websocket
# redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# r_entry = json.loads(redis_conn.get(key_name)) # For testing...
# log.debug(r_entry) # For testing...
# ### Redis ### Testing Three!!!
ws_header_key = websocket.headers.get('sec-websocket-key')
# # Set WS connection in Redis based on WS header security key
# key_name = f'ws:header_key:{ws_header_key}'
# data = {'ws_conn': websocket}
# redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# Get new ws_header_key and group_id
ws_header_key = websocket.headers.get('sec-websocket-key')
group_id = websocket.path_params.get('group_id')
# Figure out key_name
key_name = f'ws:group_id:{group_id}'
# Get the current list from Redis entry
# group_id_ws_conn_li = []
# group_id_ws_conn_li = redis_conn.get(key_name, [])
# Add the new WS connection's header key to the list
redis_conn.lpush(key_name, ws_header_key)
# group_id_ws_conn_li.append(ws_header_key)
# data = group_id_ws_conn_li
# redis_conn.hsetex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# r_entry = json.loads(redis_conn.lget(key_name)) # For testing...
log.debug('BEGIN: Loop through list key')
log.debug(redis_conn.llen(key_name))
# for i in range(0, redis_conn.llen(key_name)):
# log.debug(redis_conn.lindex(key_name, i))
log.debug('END: Loop through list key')
redis_conn.close()
def disconnect(self, websocket: WebSocket):
log.setLevel(logging.DEBUG)
log.info('WS disconnect')
self.active_connections.remove(websocket)
@@ -105,13 +227,52 @@ class ConnectionManager:
log.setLevel(logging.INFO)
log.debug(locals())
for connection in self.active_connections:
log.debug(vars(connection))
# websocket.path_params.get('client_id')
# if connection.scope.get('path') == group_id:
if connection.path_params.get('group_id') == group_id:
log.info('Found matching Group ID')
await connection.send_json(data)
# Single process version...
# for connection in self.active_connections:
# log.debug(vars(connection))
# # websocket.path_params.get('client_id') # Testing...
# # if connection.scope.get('path') == group_id: # Testing...
# if connection.path_params.get('group_id') == group_id:
# log.info('Found matching Group ID')
# await connection.send_json(data)
# Redis version...
redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# Figure out key_name
key_name = f'ws:group_id:{group_id}'
log.debug('BEGIN: Loop through Redis list entry')
log.debug(redis_conn.llen(key_name))
# ws_header_key = connection.headers.get('sec-websocket-key')
# ws_header_key = f'ws:header_key:{ws_header_key}'
# log.debug(f'Looking for: {ws_header_key}')
for i in range(0, redis_conn.llen(key_name)):
log.debug(f'{i}: {redis_conn.lindex(key_name, i)}')
redis_ws_group_ws_header_key = redis_conn.lindex(key_name, i)
# Check if they key
# redis_key_name_ws_conn = f'ws:header_key:{redis_ws_group_ws_header_key}'
# ws_conn_data = json.loads(redis_conn.get('redis_key_name_ws_conn'))
# active_ws_conn = ws_conn_data.get('ws_conn')
# log.debug(active_ws_conn)
for active_ws_conn in self.active_connections:
log.debug(vars(active_ws_conn))
ws_header_key = active_ws_conn.headers.get('sec-websocket-key')
if redis_ws_group_ws_header_key == ws_header_key:
log.info(f'Found key in Redis list!')
await active_ws_conn.send_json(data)
else:
log.debug(f'Key not found in Redis list')
log.debug('END: Loop through Redis list entry')
redis_conn.close()
# NOTE: Same as group, but no filter based on path
async def broadcast(self, message: str):
@@ -138,12 +299,289 @@ class ConnectionManager:
manager = ConnectionManager()
# class AIOConnectionManager:
# def __init__(self):
# # NOTE: The active_connections list should be in Redis
# self.active_connections: List[WebSocket] = []
# async def connect(self, websocket: WebSocket):
# await websocket.accept()
# log.setLevel(logging.DEBUG)
# log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key'))
# # log.debug(websocket.client) # the Address(host=ip, port=number)
# # log.info('WS Key: '+websocket.headers.get('sec-websocket-key'))
# # log.debug(websocket.headers) # Most useful
# self.active_connections.append(websocket)
# log.debug(self.active_connections)
# redis_conn = aioredis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# # message = {'hello': 'world', 'goodbye': 'test'}
# # data = json.loads(message)
# ws_header_key = websocket.headers.get('sec-websocket-key')
# key_name = f'ws:sec_key:{ws_header_key}'
# data = {'ws_header_key': ws_header_key}
# await redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# r_entry = json.loads(await redis_conn.get(key_name)) # For testing...
# r_ttl = await redis_conn.ttl(key_name) # For testing...
# # log.debug(r_entry) # For testing...
# # log.debug(r_entry.get('ws_header_key')) # For testing...
# # log.debug(r_ttl) # For testing...
# # r_entries = redis.scan()
# # log.debug(r_entries)
# redis_conn.close()
# # await redis.close()
# async def connect_w_client_id(self, websocket: WebSocket, client_id: str, group_id = None):
# await websocket.accept()
# log.setLevel(logging.DEBUG)
# log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key'))
# self.active_connections.append(websocket)
# log.debug(self.active_connections)
# redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# # # ### Redis ### Testing One
# # ws_header_key = websocket.headers.get('sec-websocket-key')
# # key_name = f'ws:sec_key:{ws_header_key}'
# # data = {'ws_header_key': ws_header_key}
# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# # r_entry = json.loads(redis_conn.get(key_name)) # For testing...
# # r_ttl = redis_conn.ttl(key_name) # For testing...
# # # log.debug(r_entry) # For testing...
# # # log.debug(r_entry.get('ws_header_key')) # For testing...
# # # log.debug(r_ttl) # For testing...
# # # ### Redis ### Testing Two!!
# # key_name = f'ws:conn:{ws_header_key}'
# # data = websocket
# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# # r_entry = json.loads(redis_conn.get(key_name)) # For testing...
# # log.debug(r_entry) # For testing...
# # ### Redis ### Testing Three!!!
# ws_header_key = websocket.headers.get('sec-websocket-key')
# # # Set WS connection in Redis based on WS header security key
# # key_name = f'ws:header_key:{ws_header_key}'
# # data = {'ws_conn': websocket}
# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# # Get new ws_header_key and group_id
# ws_header_key = websocket.headers.get('sec-websocket-key')
# group_id = websocket.path_params.get('group_id')
# # Figure out key_name
# key_name = f'ws:group_id:{group_id}'
# # Get the current list from Redis entry
# # group_id_ws_conn_li = []
# # group_id_ws_conn_li = redis_conn.get(key_name, [])
# # Add the new WS connection's header key to the list
# redis_conn.lpush(key_name, ws_header_key)
# # group_id_ws_conn_li.append(ws_header_key)
# # data = group_id_ws_conn_li
# # redis_conn.hsetex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# # r_entry = json.loads(redis_conn.lget(key_name)) # For testing...
# log.debug('BEGIN: Loop through list key')
# log.debug(redis_conn.llen(key_name))
# # for i in range(0, redis_conn.llen(key_name)):
# # log.debug(redis_conn.lindex(key_name, i))
# log.debug('END: Loop through list key')
# redis_conn.close()
# def disconnect(self, websocket: WebSocket):
# log.setLevel(logging.DEBUG)
# log.info('WS disconnect')
# self.active_connections.remove(websocket)
# # Targets: echo, direct, group, broadcast
# # send_ text, bytes, json
# # receive_ text, bytes, json
# async def echo(self, message: str, websocket: WebSocket):
# log.setLevel(logging.DEBUG)
# # log.debug(dir(websocket))
# log.debug(vars(websocket))
# log.debug(websocket.url)
# log.debug(websocket.client)
# log.debug(websocket.client_state)
# log.debug(websocket.headers['sec-websocket-key'])
# # log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8'))
# await websocket.send_text(message)
# async def direct(self, from_client_id: str, to_client_id: str, data: dict):
# log.setLevel(logging.DEBUG)
# for connection in self.active_connections:
# log.debug(vars(connection))
# log.debug(connection)
# await connection.send_text(message)
# async def group(self, group_id: str, data: dict):
# log.setLevel(logging.INFO)
# log.debug(locals())
# # Single process version...
# # for connection in self.active_connections:
# # log.debug(vars(connection))
# # # websocket.path_params.get('client_id') # Testing...
# # # if connection.scope.get('path') == group_id: # Testing...
# # if connection.path_params.get('group_id') == group_id:
# # log.info('Found matching Group ID')
# # await connection.send_json(data)
# # Redis version...
# redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# # Figure out key_name
# key_name = f'ws:group_id:{group_id}'
# log.debug('BEGIN: Loop through Redis list entry')
# log.debug(redis_conn.llen(key_name))
# # ws_header_key = connection.headers.get('sec-websocket-key')
# # ws_header_key = f'ws:header_key:{ws_header_key}'
# # log.debug(f'Looking for: {ws_header_key}')
# for i in range(0, redis_conn.llen(key_name)):
# log.debug(f'{i}: {redis_conn.lindex(key_name, i)}')
# redis_ws_group_ws_header_key = redis_conn.lindex(key_name, i)
# # Check if they key
# # redis_key_name_ws_conn = f'ws:header_key:{redis_ws_group_ws_header_key}'
# # ws_conn_data = json.loads(redis_conn.get('redis_key_name_ws_conn'))
# # active_ws_conn = ws_conn_data.get('ws_conn')
# # log.debug(active_ws_conn)
# for active_ws_conn in self.active_connections:
# log.debug(vars(active_ws_conn))
# ws_header_key = active_ws_conn.headers.get('sec-websocket-key')
# if redis_ws_group_ws_header_key == ws_header_key:
# log.info(f'Found key in Redis list!')
# await active_ws_conn.send_json(data)
# else:
# log.debug(f'Key not found in Redis list')
# log.debug('END: Loop through Redis list entry')
# redis_conn.close()
# # NOTE: Same as group, but no filter based on path
# async def broadcast(self, message: str):
# log.setLevel(logging.INFO)
# log.debug(locals())
# for connection in self.active_connections:
# log.debug(vars(connection))
# await connection.send_text(message)
# async def send_personal_message(self, message: str, websocket: WebSocket):
# log.setLevel(logging.DEBUG)
# # log.debug(dir(websocket))
# log.debug(vars(websocket))
# log.debug(websocket.url)
# log.debug(websocket.client)
# log.debug(websocket.client_state)
# log.debug(websocket.headers['sec-websocket-key'])
# # log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8'))
# await websocket.send_text(message)
# aio_manager = AIOConnectionManager()
# Endpoints for???
# /room/<id> (just a group of clients; for a related group like a poster presenter or session room)
# /client/<id> (for one specific client/browser; something specific to a browser???)
# /person/<id> (for one specific person; handles send and receiving their messages)
@router.websocket('/ws_single/group/{group_id}/client/{client_id}')
async def single_ws_client_id(
websocket: WebSocket,
group_id: str,
client_id: str,
):
await manager.connect_w_client_id(websocket, client_id, group_id)
# await aio_manager.connect_w_client_id(websocket, client_id, group_id)
try:
while True:
data = await websocket.receive_json() # Returns dict
# log.debug(data)
# group_path_id = f'/ws/group/{group_id}'
# client_id = data.get('client_id')
# echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?)
msg_type = data.get('type')
cmd = data.get('cmd')
msg = data.get('msg')
log.setLevel(logging.INFO)
log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};')
log.debug(f'Command: {cmd}')
log.debug(f'Message: {msg}')
data['client_id'] = client_id
data['group_id'] = group_id
await manager.group(group_id=group_id, data=data)
# await aio_manager.group(group_id=group_id, data=data)
# if msg_type:
# if msg_type == 'echo':
# await manager.echo(f'Echo: {data}', websocket)
# elif msg_type == 'dm':
# await manager.direct(f'DM: {msg}')
# elif msg_type == 'group':
# await manager.group(group_id=group_id, f'Group: {data}')
# elif msg_type == 'all':
# await manager.broadcast(f'All: {data}')
# elif msg_type == 'cmd':
# await manager.broadcast(f'Command: {data}')
# else:
# await manager.broadcast(f'Unknown: {data}')
# else:
# await manager.broadcast(f'MSG: {data}')
except WebSocketDisconnect:
manager.disconnect(websocket)
# aio_manager.disconnect(websocket)
# await manager.broadcast(f'Client #{client_id} left')
@router.websocket('/ws/client/{client_id}')
async def ws_client_id(
websocket: WebSocket,
@@ -188,57 +626,6 @@ async def ws_client_id(
# await manager.broadcast(f'Client #{client_id} left')
@router.websocket('/ws/group/{group_id}/client/{client_id}')
async def ws_client_id(
websocket: WebSocket,
group_id: str,
client_id: str,
):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_json() # Returns dict
# log.debug(data)
# group_path_id = f'/ws/group/{group_id}'
# client_id = data.get('client_id')
# echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?)
msg_type = data.get('type')
cmd = data.get('cmd')
msg = data.get('msg')
log.setLevel(logging.INFO)
log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};')
log.debug(f'Command: {cmd}')
log.debug(f'Message: {msg}')
data['client_id'] = client_id
data['group_id'] = group_id
await manager.group(group_id=group_id, data=data)
# if msg_type:
# if msg_type == 'echo':
# await manager.echo(f'Echo: {data}', websocket)
# elif msg_type == 'dm':
# await manager.direct(f'DM: {msg}')
# elif msg_type == 'group':
# await manager.group(group_id=group_id, f'Group: {data}')
# elif msg_type == 'all':
# await manager.broadcast(f'All: {data}')
# elif msg_type == 'cmd':
# await manager.broadcast(f'Command: {data}')
# else:
# await manager.broadcast(f'Unknown: {data}')
# else:
# await manager.broadcast(f'MSG: {data}')
except WebSocketDisconnect:
manager.disconnect(websocket)
# await manager.broadcast(f'Client #{client_id} left')
@router.websocket('/ws/{client_id}')
async def ws_id(
websocket: WebSocket,
@@ -259,7 +646,6 @@ async def ws_id(
await manager.broadcast(f'Client #{client_id} left')
# @router.websocket('/ws/room/{room_id}')
# async def ws_room_id(
# websocket: WebSocket,
@@ -362,6 +748,7 @@ async def redis_connector(
# redis = await redis.create_pool(redis_url)
# Redis client bound to pool of connections (auto-reconnecting).
# r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=7, password=None, decode_responses=True)
redis = redis.from_url(
redis_url, encoding="utf-8", decode_responses=True
)

View File

@@ -0,0 +1,715 @@
from fastapi import APIRouter, FastAPI, Response, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
import asyncio, base64, datetime, hashlib, json, os, pathlib, redis, shutil, time
# import aioredis, redis, asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time
# from aioredis import from_url, Redis
# import asyncio
# import aioredis
# import async_timeout
from app.lib_general import log, logging, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min
from app.config import settings
router = APIRouter()
class ConnectionManager:
def __init__(self):
# NOTE: The active_connections list should be in Redis
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
log.setLevel(logging.DEBUG)
log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key'))
# log.debug(dir(websocket))
# log.debug(websocket.client) # the Address(host=ip, port=number)
# log.info('WS Key: '+websocket.headers.get('sec-websocket-key'))
# log.debug(websocket.headers) # Most useful
# log.debug(websocket.app) # Nope
# log.debug(websocket.app()) # Nope
# log.debug(websocket.session) # Nope
# log.debug(websocket.session()) # Nope
# log.debug(websocket.user) # Nope
# log.debug(websocket.values()) # Not sure if useful
self.active_connections.append(websocket)
log.debug(self.active_connections)
redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# message = {'hello': 'world', 'goodbye': 'test'}
# data = json.loads(message)
ws_header_key = websocket.headers.get('sec-websocket-key')
key_name = f'ws:sec_key:{ws_header_key}'
data = {'ws_header_key': ws_header_key}
redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
r_entry = json.loads(redis_conn.get(key_name)) # For testing...
r_ttl = redis_conn.ttl(key_name) # For testing...
# log.debug(r_entry) # For testing...
# log.debug(r_entry.get('ws_header_key')) # For testing...
# log.debug(r_ttl) # For testing...
# r_entries = redis.scan()
# log.debug(r_entries)
redis_conn.close()
# await redis.close()
async def connect_w_client_id(self, websocket: WebSocket, client_id: str, group_id = None):
await websocket.accept()
log.setLevel(logging.DEBUG)
log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key'))
self.active_connections.append(websocket)
log.debug(self.active_connections)
redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# # ### Redis ### Testing One
# ws_header_key = websocket.headers.get('sec-websocket-key')
# key_name = f'ws:sec_key:{ws_header_key}'
# data = {'ws_header_key': ws_header_key}
# redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# r_entry = json.loads(redis_conn.get(key_name)) # For testing...
# r_ttl = redis_conn.ttl(key_name) # For testing...
# # log.debug(r_entry) # For testing...
# # log.debug(r_entry.get('ws_header_key')) # For testing...
# # log.debug(r_ttl) # For testing...
# # ### Redis ### Testing Two!!
# key_name = f'ws:conn:{ws_header_key}'
# data = websocket
# redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# r_entry = json.loads(redis_conn.get(key_name)) # For testing...
# log.debug(r_entry) # For testing...
# ### Redis ### Testing Three!!!
ws_header_key = websocket.headers.get('sec-websocket-key')
# # Set WS connection in Redis based on WS header security key
# key_name = f'ws:header_key:{ws_header_key}'
# data = {'ws_conn': websocket}
# redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# Get new ws_header_key and group_id
ws_header_key = websocket.headers.get('sec-websocket-key')
group_id = websocket.path_params.get('group_id')
# Figure out key_name
key_name = f'ws:group_id:{group_id}'
# Get the current list from Redis entry
# group_id_ws_conn_li = []
# group_id_ws_conn_li = redis_conn.get(key_name, [])
# Add the new WS connection's header key to the list
redis_conn.lpush(key_name, ws_header_key)
# group_id_ws_conn_li.append(ws_header_key)
# data = group_id_ws_conn_li
# redis_conn.hsetex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# r_entry = json.loads(redis_conn.lget(key_name)) # For testing...
log.debug('BEGIN: Loop through list key')
log.debug(redis_conn.llen(key_name))
# for i in range(0, redis_conn.llen(key_name)):
# log.debug(redis_conn.lindex(key_name, i))
log.debug('END: Loop through list key')
redis_conn.close()
def disconnect(self, websocket: WebSocket):
log.setLevel(logging.DEBUG)
log.info('WS disconnect')
self.active_connections.remove(websocket)
# Targets: echo, direct, group, broadcast
# send_ text, bytes, json
# receive_ text, bytes, json
async def echo(self, message: str, websocket: WebSocket):
log.setLevel(logging.DEBUG)
# log.debug(dir(websocket))
log.debug(vars(websocket))
log.debug(websocket.url)
log.debug(websocket.client)
log.debug(websocket.client_state)
log.debug(websocket.headers['sec-websocket-key'])
# log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8'))
await websocket.send_text(message)
async def direct(self, from_client_id: str, to_client_id: str, data: dict):
log.setLevel(logging.DEBUG)
for connection in self.active_connections:
log.debug(vars(connection))
log.debug(connection)
await connection.send_text(message)
async def group(self, group_id: str, data: dict):
log.setLevel(logging.INFO)
log.debug(locals())
# Single process version...
# for connection in self.active_connections:
# log.debug(vars(connection))
# # websocket.path_params.get('client_id') # Testing...
# # if connection.scope.get('path') == group_id: # Testing...
# if connection.path_params.get('group_id') == group_id:
# log.info('Found matching Group ID')
# await connection.send_json(data)
# Redis version...
redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# Figure out key_name
key_name = f'ws:group_id:{group_id}'
log.debug('BEGIN: Loop through Redis list entry')
log.debug(redis_conn.llen(key_name))
# ws_header_key = connection.headers.get('sec-websocket-key')
# ws_header_key = f'ws:header_key:{ws_header_key}'
# log.debug(f'Looking for: {ws_header_key}')
for i in range(0, redis_conn.llen(key_name)):
log.debug(f'{i}: {redis_conn.lindex(key_name, i)}')
redis_ws_group_ws_header_key = redis_conn.lindex(key_name, i)
# Check if they key
# redis_key_name_ws_conn = f'ws:header_key:{redis_ws_group_ws_header_key}'
# ws_conn_data = json.loads(redis_conn.get('redis_key_name_ws_conn'))
# active_ws_conn = ws_conn_data.get('ws_conn')
# log.debug(active_ws_conn)
for active_ws_conn in self.active_connections:
log.debug(vars(active_ws_conn))
ws_header_key = active_ws_conn.headers.get('sec-websocket-key')
if redis_ws_group_ws_header_key == ws_header_key:
log.info(f'Found key in Redis list!')
await active_ws_conn.send_json(data)
else:
log.debug(f'Key not found in Redis list')
log.debug('END: Loop through Redis list entry')
redis_conn.close()
# NOTE: Same as group, but no filter based on path
async def broadcast(self, message: str):
log.setLevel(logging.INFO)
log.debug(locals())
for connection in self.active_connections:
log.debug(vars(connection))
await connection.send_text(message)
async def send_personal_message(self, message: str, websocket: WebSocket):
log.setLevel(logging.DEBUG)
# log.debug(dir(websocket))
log.debug(vars(websocket))
log.debug(websocket.url)
log.debug(websocket.client)
log.debug(websocket.client_state)
log.debug(websocket.headers['sec-websocket-key'])
# log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8'))
await websocket.send_text(message)
# manager = ConnectionManager()
# class AIOConnectionManager:
# def __init__(self):
# # NOTE: The active_connections list should be in Redis
# self.active_connections: List[WebSocket] = []
# async def connect(self, websocket: WebSocket):
# await websocket.accept()
# log.setLevel(logging.DEBUG)
# log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key'))
# # log.debug(websocket.client) # the Address(host=ip, port=number)
# # log.info('WS Key: '+websocket.headers.get('sec-websocket-key'))
# # log.debug(websocket.headers) # Most useful
# self.active_connections.append(websocket)
# log.debug(self.active_connections)
# redis_conn = aioredis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# # message = {'hello': 'world', 'goodbye': 'test'}
# # data = json.loads(message)
# ws_header_key = websocket.headers.get('sec-websocket-key')
# key_name = f'ws:sec_key:{ws_header_key}'
# data = {'ws_header_key': ws_header_key}
# await redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# r_entry = json.loads(await redis_conn.get(key_name)) # For testing...
# r_ttl = await redis_conn.ttl(key_name) # For testing...
# # log.debug(r_entry) # For testing...
# # log.debug(r_entry.get('ws_header_key')) # For testing...
# # log.debug(r_ttl) # For testing...
# # r_entries = redis.scan()
# # log.debug(r_entries)
# redis_conn.close()
# # await redis.close()
# async def connect_w_client_id(self, websocket: WebSocket, client_id: str, group_id = None):
# await websocket.accept()
# log.setLevel(logging.DEBUG)
# log.info('WS connect: Header Security Key:'+websocket.headers.get('sec-websocket-key'))
# self.active_connections.append(websocket)
# log.debug(self.active_connections)
# redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# # # ### Redis ### Testing One
# # ws_header_key = websocket.headers.get('sec-websocket-key')
# # key_name = f'ws:sec_key:{ws_header_key}'
# # data = {'ws_header_key': ws_header_key}
# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# # r_entry = json.loads(redis_conn.get(key_name)) # For testing...
# # r_ttl = redis_conn.ttl(key_name) # For testing...
# # # log.debug(r_entry) # For testing...
# # # log.debug(r_entry.get('ws_header_key')) # For testing...
# # # log.debug(r_ttl) # For testing...
# # # ### Redis ### Testing Two!!
# # key_name = f'ws:conn:{ws_header_key}'
# # data = websocket
# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# # r_entry = json.loads(redis_conn.get(key_name)) # For testing...
# # log.debug(r_entry) # For testing...
# # ### Redis ### Testing Three!!!
# ws_header_key = websocket.headers.get('sec-websocket-key')
# # # Set WS connection in Redis based on WS header security key
# # key_name = f'ws:header_key:{ws_header_key}'
# # data = {'ws_conn': websocket}
# # redis_conn.setex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# # Get new ws_header_key and group_id
# ws_header_key = websocket.headers.get('sec-websocket-key')
# group_id = websocket.path_params.get('group_id')
# # Figure out key_name
# key_name = f'ws:group_id:{group_id}'
# # Get the current list from Redis entry
# # group_id_ws_conn_li = []
# # group_id_ws_conn_li = redis_conn.get(key_name, [])
# # Add the new WS connection's header key to the list
# redis_conn.lpush(key_name, ws_header_key)
# # group_id_ws_conn_li.append(ws_header_key)
# # data = group_id_ws_conn_li
# # redis_conn.hsetex(key_name, datetime.timedelta(minutes=90), json.dumps(data))
# # r_entry = json.loads(redis_conn.lget(key_name)) # For testing...
# log.debug('BEGIN: Loop through list key')
# log.debug(redis_conn.llen(key_name))
# # for i in range(0, redis_conn.llen(key_name)):
# # log.debug(redis_conn.lindex(key_name, i))
# log.debug('END: Loop through list key')
# redis_conn.close()
# def disconnect(self, websocket: WebSocket):
# log.setLevel(logging.DEBUG)
# log.info('WS disconnect')
# self.active_connections.remove(websocket)
# # Targets: echo, direct, group, broadcast
# # send_ text, bytes, json
# # receive_ text, bytes, json
# async def echo(self, message: str, websocket: WebSocket):
# log.setLevel(logging.DEBUG)
# # log.debug(dir(websocket))
# log.debug(vars(websocket))
# log.debug(websocket.url)
# log.debug(websocket.client)
# log.debug(websocket.client_state)
# log.debug(websocket.headers['sec-websocket-key'])
# # log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8'))
# await websocket.send_text(message)
# async def direct(self, from_client_id: str, to_client_id: str, data: dict):
# log.setLevel(logging.DEBUG)
# for connection in self.active_connections:
# log.debug(vars(connection))
# log.debug(connection)
# await connection.send_text(message)
# async def group(self, group_id: str, data: dict):
# log.setLevel(logging.INFO)
# log.debug(locals())
# # Single process version...
# # for connection in self.active_connections:
# # log.debug(vars(connection))
# # # websocket.path_params.get('client_id') # Testing...
# # # if connection.scope.get('path') == group_id: # Testing...
# # if connection.path_params.get('group_id') == group_id:
# # log.info('Found matching Group ID')
# # await connection.send_json(data)
# # Redis version...
# redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# # Figure out key_name
# key_name = f'ws:group_id:{group_id}'
# log.debug('BEGIN: Loop through Redis list entry')
# log.debug(redis_conn.llen(key_name))
# # ws_header_key = connection.headers.get('sec-websocket-key')
# # ws_header_key = f'ws:header_key:{ws_header_key}'
# # log.debug(f'Looking for: {ws_header_key}')
# for i in range(0, redis_conn.llen(key_name)):
# log.debug(f'{i}: {redis_conn.lindex(key_name, i)}')
# redis_ws_group_ws_header_key = redis_conn.lindex(key_name, i)
# # Check if they key
# # redis_key_name_ws_conn = f'ws:header_key:{redis_ws_group_ws_header_key}'
# # ws_conn_data = json.loads(redis_conn.get('redis_key_name_ws_conn'))
# # active_ws_conn = ws_conn_data.get('ws_conn')
# # log.debug(active_ws_conn)
# for active_ws_conn in self.active_connections:
# log.debug(vars(active_ws_conn))
# ws_header_key = active_ws_conn.headers.get('sec-websocket-key')
# if redis_ws_group_ws_header_key == ws_header_key:
# log.info(f'Found key in Redis list!')
# await active_ws_conn.send_json(data)
# else:
# log.debug(f'Key not found in Redis list')
# log.debug('END: Loop through Redis list entry')
# redis_conn.close()
# # NOTE: Same as group, but no filter based on path
# async def broadcast(self, message: str):
# log.setLevel(logging.INFO)
# log.debug(locals())
# for connection in self.active_connections:
# log.debug(vars(connection))
# await connection.send_text(message)
# async def send_personal_message(self, message: str, websocket: WebSocket):
# log.setLevel(logging.DEBUG)
# # log.debug(dir(websocket))
# log.debug(vars(websocket))
# log.debug(websocket.url)
# log.debug(websocket.client)
# log.debug(websocket.client_state)
# log.debug(websocket.headers['sec-websocket-key'])
# # log.debug(base64.decode(bytes(websocket.headers['sec-websocket-key']), 'utf-8'))
# await websocket.send_text(message)
# aio_manager = AIOConnectionManager()
# Endpoints for???
# /room/<id> (just a group of clients; for a related group like a poster presenter or session room)
# /client/<id> (for one specific client/browser; something specific to a browser???)
# /person/<id> (for one specific person; handles send and receiving their messages)
# @router.websocket('/ws_redis/group/{group_id}/client/{client_id}')
# async def ws_client_id(
# websocket: WebSocket,
# group_id: str,
# client_id: str,
# ):
# await manager.connect_w_client_id(websocket, client_id, group_id)
# # await aio_manager.connect_w_client_id(websocket, client_id, group_id)
# try:
# while True:
# data = await websocket.receive_json() # Returns dict
# # log.debug(data)
# # group_path_id = f'/ws/group/{group_id}'
# # client_id = data.get('client_id')
# # echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?)
# msg_type = data.get('type')
# cmd = data.get('cmd')
# msg = data.get('msg')
# log.setLevel(logging.INFO)
# log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};')
# log.debug(f'Command: {cmd}')
# log.debug(f'Message: {msg}')
# data['client_id'] = client_id
# data['group_id'] = group_id
# await manager.group(group_id=group_id, data=data)
# # await aio_manager.group(group_id=group_id, data=data)
# await redis_connector(websocket)
# except WebSocketDisconnect:
# manager.disconnect(websocket)
# # aio_manager.disconnect(websocket)
# # await manager.broadcast(f'Client #{client_id} left')
# @router.websocket('/ws_redis/looping')
# async def ws_looping(
# websocket: WebSocket,
# ):
# await manager.connect(websocket)
# # await manager.broadcast(f'Welcome to looping')
# try:
# while True:
# # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING
# # await time.sleep(3.5) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING
# # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING
# # data = await websocket.receive_json()
# # log.debug(data)
# # data_dict = data
# # data_dict = json.loads(data)
# # log.debug(data_dict['client_id'])
# # await manager.send_personal_message(f'You wrote: {data}', websocket)
# await manager.broadcast(f'Loop!!!')
# except WebSocketDisconnect:
# manager.disconnect(websocket)
# await manager.broadcast(f'Client left looping')
# @router.websocket("/ws_redis/{client_id}")
# async def websocket_endpoint(
# websocket: WebSocket,
# client_id: int,
# response: Response = Response,
# ):
# log.setLevel(logging.DEBUG)
# log.debug(locals())
# log.info('Root of ws. Waiting to accept a websocket and then the redis_connector')
# await websocket.accept()
# await redis_connector(websocket)
@router.websocket('/ws/group/{group_id}/client/{client_id}')
@router.websocket('/ws_redis/group/{group_id}/client/{client_id}')
async def redis_ws_client_id(
websocket: WebSocket,
group_id: str,
client_id: str,
):
log.setLevel(logging.DEBUG)
log.debug(locals())
log.info('WS Redis: Group ID and Client ID. Waiting to "accept" a websocket and then the "redis_connector"')
await websocket.accept()
await redis_connector(websocket, client_id, group_id)
# await manager.connect_w_client_id(websocket, client_id, group_id)
# # await aio_manager.connect_w_client_id(websocket, client_id, group_id)
# try:
# while True:
# data = await websocket.receive_json() # Returns dict
# # log.debug(data)
# # group_path_id = f'/ws/group/{group_id}'
# # client_id = data.get('client_id')
# # echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?)
# msg_type = data.get('type')
# cmd = data.get('cmd')
# msg = data.get('msg')
# log.setLevel(logging.INFO)
# log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};')
# log.debug(f'Command: {cmd}')
# log.debug(f'Message: {msg}')
# data['client_id'] = client_id
# data['group_id'] = group_id
# await manager.group(group_id=group_id, data=data)
# # await aio_manager.group(group_id=group_id, data=data)
# await redis_connector(websocket)
# except WebSocketDisconnect:
# manager.disconnect(websocket)
# # aio_manager.disconnect(websocket)
# # await manager.broadcast(f'Client #{client_id} left')
async def redis_connector(
ws_conn: WebSocket,
client_id: str,
group_id = None
):
log.setLevel(logging.DEBUG)
log.debug(locals())
async def consumer_handler(ws_conn: WebSocket, r, client_id, group_id):
log.info('consumer_handler()')
try:
while True:
data = await ws_conn.receive_json() # Returns dict
log.debug(data)
# echo (echo message), dm (direct message), group (group message), all (broadcast message to all), cmd, group_cmd(?)
msg_type = data.get('type')
cmd = data.get('cmd')
msg = data.get('msg')
log.setLevel(logging.INFO)
log.info(f'Group ID: {group_id}; Client ID: {client_id}; Type: {msg_type};')
log.debug(f'Command: {cmd}')
log.debug(f'Message: {msg}')
data['client_id'] = client_id
data['group_id'] = group_id
log.debug(data)
if data:
#logging.info(ws)
#logging.info(dir(message))
# data = json.loads(message)
#await r.publish("chat:c", message)
#await r.publish("chat:c", str(data['message']))
# causes: TypeError: object int can't be used in 'await' expression
r.publish("chat:c", str(data.get('client_id')))
r.publish("chat:c", str(data))
# r.publish("chat:c", str({'test': 'xxxx'}))
# r.publish("chat:c", str('data goes here...'))
except WebSocketDisconnect as exc:
# TODO this needs handling better
log.error(exc)
async def producer_handler(r, ws_conn: redis.client.PubSub, client_id, group_id):
log.info('producer_handler()')
(channel,) = await r.subscribe("chat:c")
assert isinstance(channel, redis.Channel)
try:
while True:
data = await channel.get()
if data:
await ws_conn.send_json(data)
except Exception as exc:
# TODO this needs handling better
log.error(exc)
# redis_conn = await redis.create_pool(redis_url)
# Redis client bound to pool of connections (auto-reconnecting).
# r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=7, password=None, decode_responses=True)
log.info('Create Redis connection')
redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
# redis = redis.from_url(
# redis_url, encoding="utf-8", decode_responses=True
# )
redis_conn_pubsub = redis_conn.pubsub()
log.info('Run consumer_task')
# consumer_task = consumer_handler(ws_conn, redis_conn, client_id, group_id)
# future_consumer = asyncio.create_task(consumer_task())
future_consumer = asyncio.create_task(consumer_handler(ws_conn, redis_conn, client_id, group_id))
log.info('Run producer_task')
# producer_task = producer_handler(redis_conn_pubsub, ws_conn, client_id, group_id)
# future_producer = asyncio.create_task(producer_task())
future_producer = asyncio.create_task(producer_handler(redis_conn_pubsub, ws_conn, client_id, group_id))
await future_consumer
await future_producer
# pending_tasks = asyncio.all_tasks(loop=event_loop)
# # cancel each pending task
# for task in pending_tasks:
# logger.info(f'killing task {task.get_coro()}')
# event_loop.call_soon_threadsafe(task.cancel)
# # This does not work for some reason.
# # "Passing coroutines is forbidden, use tasks explicitly."
# log.info('Start asyncio wait')
# done, pending = await asyncio.wait(
# [consumer_task, producer_task],
# return_when = asyncio.FIRST_COMPLETED,
# )
# log.debug(f'Done task: {done}')
# for task in pending:
# log.debug(f'Canceling task: {task}')
# task.cancel()
# await redis_conn.close()
# await redis_conn.wait_closed()