Work on websockets. Finally 95% working?
This commit is contained in:
@@ -2,7 +2,8 @@ from fastapi import APIRouter, FastAPI, Response, WebSocket, WebSocketDisconnect
|
||||
from fastapi.responses import HTMLResponse
|
||||
from pydantic import BaseModel, EmailStr, Field
|
||||
from typing import Dict, List, Optional, Set, Union
|
||||
import asyncio, base64, datetime, hashlib, json, os, pathlib, redis, shutil, time
|
||||
import asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time
|
||||
import redis.asyncio as redis
|
||||
# import aioredis, redis, asyncio, base64, datetime, hashlib, json, os, pathlib, shutil, time
|
||||
# from aioredis import from_url, Redis
|
||||
# import asyncio
|
||||
@@ -555,26 +556,6 @@ class ConnectionManager:
|
||||
# await websocket.accept()
|
||||
# await redis_connector(websocket)
|
||||
|
||||
|
||||
|
||||
|
||||
@router.websocket('/ws/group/{group_id}/client/{client_id}')
|
||||
@router.websocket('/ws_redis/group/{group_id}/client/{client_id}')
|
||||
async def redis_ws_client_id(
|
||||
websocket: WebSocket,
|
||||
group_id: str,
|
||||
client_id: str,
|
||||
):
|
||||
log.setLevel(logging.DEBUG)
|
||||
log.debug(locals())
|
||||
|
||||
log.info('WS Redis: Group ID and Client ID. Waiting to "accept" a websocket and then the "redis_connector"')
|
||||
|
||||
await websocket.accept()
|
||||
|
||||
await redis_connector(websocket, client_id, group_id)
|
||||
|
||||
|
||||
# await manager.connect_w_client_id(websocket, client_id, group_id)
|
||||
# # await aio_manager.connect_w_client_id(websocket, client_id, group_id)
|
||||
# try:
|
||||
@@ -608,6 +589,23 @@ async def redis_ws_client_id(
|
||||
# # await manager.broadcast(f'Client #{client_id} left')
|
||||
|
||||
|
||||
@router.websocket('/ws/group/{group_id}/client/{client_id}')
|
||||
@router.websocket('/ws_redis/group/{group_id}/client/{client_id}')
|
||||
async def redis_ws_client_id(
|
||||
websocket: WebSocket,
|
||||
group_id: str,
|
||||
client_id: str,
|
||||
):
|
||||
log.setLevel(logging.DEBUG)
|
||||
log.debug(locals())
|
||||
|
||||
log.info('WS Redis: Group ID and Client ID. Waiting to "accept" a websocket and then the "redis_connector"')
|
||||
|
||||
await websocket.accept()
|
||||
|
||||
await redis_connector(websocket, client_id, group_id)
|
||||
|
||||
|
||||
async def redis_connector(
|
||||
ws_conn: WebSocket,
|
||||
client_id: str,
|
||||
@@ -616,8 +614,13 @@ async def redis_connector(
|
||||
log.setLevel(logging.DEBUG)
|
||||
log.debug(locals())
|
||||
|
||||
async def consumer_handler(ws_conn: WebSocket, r, client_id, group_id):
|
||||
# The consumer is receiving WS JSON data
|
||||
async def consumer_handler(ws_conn: WebSocket, r_conn, client_id, group_id):
|
||||
log.info('consumer_handler()')
|
||||
|
||||
# Loop until the WS is disconnected
|
||||
# Publish data as messages to Redis
|
||||
# Close Redis connection on disconnect
|
||||
try:
|
||||
while True:
|
||||
data = await ws_conn.receive_json() # Returns dict
|
||||
@@ -635,6 +638,7 @@ async def redis_connector(
|
||||
|
||||
data['client_id'] = client_id
|
||||
data['group_id'] = group_id
|
||||
log.setLevel(logging.DEBUG)
|
||||
log.debug(data)
|
||||
|
||||
if data:
|
||||
@@ -643,28 +647,70 @@ async def redis_connector(
|
||||
# data = json.loads(message)
|
||||
#await r.publish("chat:c", message)
|
||||
#await r.publish("chat:c", str(data['message']))
|
||||
|
||||
# causes: TypeError: object int can't be used in 'await' expression
|
||||
r.publish("chat:c", str(data.get('client_id')))
|
||||
r.publish("chat:c", str(data))
|
||||
# r.publish("chat:c", str({'test': 'xxxx'}))
|
||||
# r.publish("chat:c", str('data goes here...'))
|
||||
except WebSocketDisconnect as exc:
|
||||
# TODO this needs handling better
|
||||
log.error(exc)
|
||||
|
||||
async def producer_handler(r, ws_conn: redis.client.PubSub, client_id, group_id):
|
||||
# causes: TypeError: object int can't be used in 'await' expression
|
||||
# await r.publish('chat:c', json.dumps(data.get('client_id')))
|
||||
# log.debug(await r.get('chat:c'))
|
||||
await r_conn.publish('chat:c', json.dumps(data))
|
||||
# log.debug(await r.get('chat:c'))
|
||||
# r.publish('chat:c', str({'test': 'xxxx'}))
|
||||
# r.publish('chat:c', str('data goes here...'))
|
||||
except WebSocketDisconnect as exc:
|
||||
# FUTURE: Remove from list of connected clients?
|
||||
log.error(exc)
|
||||
await r_conn.close()
|
||||
except Exception as e:
|
||||
log.exception('Aether Exception!')
|
||||
log.error(e)
|
||||
await r_conn.close()
|
||||
|
||||
# log.debug(await r.get('chat:c'))
|
||||
|
||||
# Also called the Redis reader for a channel subscription
|
||||
# This watches a Redis channel. Then sends out the message.
|
||||
async def producer_handler(r_channel, ws_conn: WebSocket, client_id, group_id):
|
||||
log.info('producer_handler()')
|
||||
(channel,) = await r.subscribe("chat:c")
|
||||
assert isinstance(channel, redis.Channel)
|
||||
|
||||
try:
|
||||
while True:
|
||||
data = await channel.get()
|
||||
if data:
|
||||
await ws_conn.send_json(data)
|
||||
except Exception as exc:
|
||||
message = await r_channel.get_message(ignore_subscribe_messages=True)
|
||||
if message is not None:
|
||||
log.debug(f'(Redis Reader) Message Received: {message}')
|
||||
data = json.loads(message['data'])
|
||||
|
||||
if group_id and group_id == data.get('group_id'):
|
||||
log.info('Sending to matching group ID')
|
||||
await ws_conn.send_json(data)
|
||||
elif group_id:
|
||||
log.info('Skip sending to client')
|
||||
pass
|
||||
else:
|
||||
log.info('Sending to all')
|
||||
await ws_conn.send_json(data)
|
||||
|
||||
if data == STOPWORD:
|
||||
log.debug('(Reader) STOP')
|
||||
break
|
||||
except ConnectionError as conn_e:
|
||||
log.error(conn_e)
|
||||
except Exception as e:
|
||||
# raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
|
||||
# redis.exceptions.ConnectionError: Connection closed by server.
|
||||
|
||||
# TODO this needs handling better
|
||||
log.error(exc)
|
||||
log.error(e)
|
||||
|
||||
|
||||
# (channel,) = await r.subscribe('chat:c')
|
||||
# assert isinstance(channel, redis.Channel)
|
||||
# try:
|
||||
# while True:
|
||||
# data = await channel.get()
|
||||
# if data:
|
||||
# await ws_conn.send_json(data)
|
||||
# except Exception as exc:
|
||||
# # TODO this needs handling better
|
||||
# log.error(exc)
|
||||
|
||||
# redis_conn = await redis.create_pool(redis_url)
|
||||
# Redis client bound to pool of connections (auto-reconnecting).
|
||||
@@ -676,6 +722,8 @@ async def redis_connector(
|
||||
# )
|
||||
redis_conn_pubsub = redis_conn.pubsub()
|
||||
|
||||
await redis_conn_pubsub.subscribe('channel:1', 'channel:2', 'chat:c')
|
||||
|
||||
log.info('Run consumer_task')
|
||||
# consumer_task = consumer_handler(ws_conn, redis_conn, client_id, group_id)
|
||||
|
||||
@@ -684,12 +732,18 @@ async def redis_connector(
|
||||
|
||||
log.info('Run producer_task')
|
||||
# producer_task = producer_handler(redis_conn_pubsub, ws_conn, client_id, group_id)
|
||||
|
||||
|
||||
# future_producer = asyncio.create_task(producer_task())
|
||||
future_producer = asyncio.create_task(producer_handler(redis_conn_pubsub, ws_conn, client_id, group_id))
|
||||
|
||||
await future_consumer
|
||||
await future_producer
|
||||
|
||||
await future_consumer # Get WS messages and set in Redis
|
||||
await future_producer # Send out messages from Redis subscribe
|
||||
|
||||
# @app.on_event('shutdown')
|
||||
# async def shutdown():
|
||||
# log.info('The Aether FastAPI API is shutting down...')
|
||||
# # await ws_conn.close()
|
||||
# await redis_conn.close()
|
||||
|
||||
# pending_tasks = asyncio.all_tasks(loop=event_loop)
|
||||
|
||||
@@ -713,3 +767,37 @@ async def redis_connector(
|
||||
# await redis_conn.close()
|
||||
# await redis_conn.wait_closed()
|
||||
|
||||
|
||||
STOPWORD = 'STOP'
|
||||
|
||||
# async def reader(channel: redis.client.PubSub):
|
||||
# while True:
|
||||
# message = await channel.get_message(ignore_subscribe_messages=True)
|
||||
# if message is not None:
|
||||
# log.debug(f'(Reader) Message Received: {message}')
|
||||
# log.info('WHAT NOW???')
|
||||
# if message['data'].decode() == STOPWORD:
|
||||
# log.debug('(Reader) STOP')
|
||||
# break
|
||||
|
||||
# r = redis.from_url('redis://localhost')
|
||||
# r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
|
||||
# async with r.pubsub() as pubsub:
|
||||
# await pubsub.subscribe('channel:1', 'channel:2', 'chat:c')
|
||||
|
||||
# future = asyncio.create_task(reader(pubsub))
|
||||
|
||||
# await r.publish('channel:1', 'Hello')
|
||||
# await r.publish('channel:2', 'World')
|
||||
# await r.publish('channel:1', STOPWORD)
|
||||
|
||||
# await future
|
||||
|
||||
# @app.on_event('shutdown')
|
||||
# async def shutdown():
|
||||
# log.setLevel(logging.INFO) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
# log.debug(locals())
|
||||
|
||||
# log.info('The Aether FastAPI API is shutting down...')
|
||||
# #await database.disconnect()
|
||||
# r.close()
|
||||
Reference in New Issue
Block a user