Work on websockets. Finally 95.75% working?
This commit is contained in:
@@ -764,3 +764,89 @@ async def redis_connector(
|
||||
task.cancel()
|
||||
await redis.close()
|
||||
# await redis.wait_closed()
|
||||
|
||||
|
||||
|
||||
#### ####
|
||||
|
||||
# @app.on_event('shutdown')
|
||||
# async def shutdown():
|
||||
# log.info('The Aether FastAPI API is shutting down...')
|
||||
# # await ws_conn.close()
|
||||
# await redis_conn.close()
|
||||
|
||||
# pending_tasks = asyncio.all_tasks(loop=event_loop)
|
||||
|
||||
# # cancel each pending task
|
||||
# for task in pending_tasks:
|
||||
# logger.info(f'killing task {task.get_coro()}')
|
||||
# event_loop.call_soon_threadsafe(task.cancel)
|
||||
|
||||
# # This does not work for some reason.
|
||||
# # "Passing coroutines is forbidden, use tasks explicitly."
|
||||
# log.info('Start asyncio wait')
|
||||
# done, pending = await asyncio.wait(
|
||||
# [receiver_task, producer_task],
|
||||
# return_when = asyncio.FIRST_COMPLETED,
|
||||
# )
|
||||
# log.debug(f'Done task: {done}')
|
||||
|
||||
# for task in pending:
|
||||
# log.debug(f'Canceling task: {task}')
|
||||
# task.cancel()
|
||||
# await redis_conn.close()
|
||||
# await redis_conn.wait_closed()
|
||||
|
||||
|
||||
|
||||
# (channel,) = await r.subscribe('chat:c')
|
||||
# assert isinstance(channel, redis.Channel)
|
||||
# try:
|
||||
# while True:
|
||||
# data = await channel.get()
|
||||
# if data:
|
||||
# await ws_conn.send_json(data)
|
||||
# except Exception as exc:
|
||||
# # TODO this needs handling better
|
||||
# log.error(exc)
|
||||
|
||||
# receiver_task = receiver_handler(ws_conn, redis_conn, client_id, group_id)
|
||||
|
||||
# future_receiver = asyncio.create_task(receiver_task())
|
||||
|
||||
# sender_task = sender_handler(redis_conn_pubsub, ws_conn, client_id, group_id)
|
||||
|
||||
# future_sender = asyncio.create_task(sender_task())
|
||||
|
||||
|
||||
# async def reader(channel: redis.client.PubSub):
|
||||
# while True:
|
||||
# message = await channel.get_message(ignore_subscribe_messages=True)
|
||||
# if message is not None:
|
||||
# log.debug(f'(Reader) Message Received: {message}')
|
||||
# log.info('WHAT NOW???')
|
||||
# if message['data'].decode() == STOPWORD:
|
||||
# log.debug('(Reader) STOP')
|
||||
# break
|
||||
|
||||
# r = redis.from_url('redis://localhost')
|
||||
# r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
|
||||
# async with r.pubsub() as pubsub:
|
||||
# await pubsub.subscribe('channel:1', 'channel:2', 'chat:c')
|
||||
|
||||
# future = asyncio.create_task(reader(pubsub))
|
||||
|
||||
# await r.publish('channel:1', 'Hello')
|
||||
# await r.publish('channel:2', 'World')
|
||||
# await r.publish('channel:1', STOPWORD)
|
||||
|
||||
# await future
|
||||
|
||||
# @app.on_event('shutdown')
|
||||
# async def shutdown():
|
||||
# log.setLevel(logging.INFO) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
# log.debug(locals())
|
||||
|
||||
# log.info('The Aether FastAPI API is shutting down...')
|
||||
# #await database.disconnect()
|
||||
# r.close()
|
||||
@@ -614,11 +614,9 @@ async def redis_connector(
|
||||
log.setLevel(logging.DEBUG)
|
||||
log.debug(locals())
|
||||
|
||||
# The consumer is receiving WS JSON data
|
||||
# ws_receiver_json_handler()
|
||||
# receiver_handler() # RENAME TO THIS
|
||||
async def consumer_handler(ws_conn: WebSocket, r_conn, client_id, group_id):
|
||||
log.info('consumer_handler()')
|
||||
# The receives WS JSON data. Then publishes to Redis channel.
|
||||
async def receiver_handler(ws_conn: WebSocket, r_conn, client_id, group_id):
|
||||
log.info('receiver_handler()')
|
||||
|
||||
# Loop until the WS is disconnected
|
||||
# Publish data as messages to Redis
|
||||
@@ -646,24 +644,19 @@ async def redis_connector(
|
||||
if data:
|
||||
#logging.info(ws)
|
||||
#logging.info(dir(message))
|
||||
# data = json.loads(message)
|
||||
#await r.publish("chat:c", message)
|
||||
#await r.publish("chat:c", str(data['message']))
|
||||
# log.debug(await r.get('chat:c'))
|
||||
|
||||
# causes: TypeError: object int can't be used in 'await' expression
|
||||
# await r.publish('chat:c', json.dumps(data.get('client_id')))
|
||||
# log.debug(await r.get('chat:c'))
|
||||
await r_conn.publish('chat:c', json.dumps(data))
|
||||
# log.debug(await r.get('chat:c'))
|
||||
# r.publish('chat:c', str({'test': 'xxxx'}))
|
||||
# r.publish('chat:c', str('data goes here...'))
|
||||
|
||||
# log.debug(await r_conn.get_message())
|
||||
|
||||
except WebSocketDisconnect as exc:
|
||||
# TODO: This needs handling better?
|
||||
# FUTURE: Remove from list of connected clients?
|
||||
log.error(exc)
|
||||
await r_conn.close()
|
||||
log.error(exc) # usually 1001?
|
||||
# await r_conn.close() # Uncommenting causes an exception below (sender_handler)
|
||||
except Exception as e:
|
||||
log.exception('Aether Exception!')
|
||||
log.exception('Exception in Aether!')
|
||||
log.error(e)
|
||||
await r_conn.close()
|
||||
|
||||
@@ -671,10 +664,8 @@ async def redis_connector(
|
||||
|
||||
# Also called the Redis reader for a channel subscription
|
||||
# This watches a Redis channel. Then sends out the message.
|
||||
# redis_watcher_ws_sender_json_handler()
|
||||
# sender_handler() # RENAME TO THIS
|
||||
async def producer_handler(r_channel, ws_conn: WebSocket, client_id, group_id):
|
||||
log.info('producer_handler()')
|
||||
async def sender_handler(r_channel, ws_conn: WebSocket, client_id, group_id):
|
||||
log.info('sender_handler()')
|
||||
|
||||
try:
|
||||
while True:
|
||||
@@ -683,125 +674,69 @@ async def redis_connector(
|
||||
log.debug(f'(Redis Reader) Message Received: {message}')
|
||||
data = json.loads(message['data'])
|
||||
|
||||
if group_id and group_id == data.get('group_id'):
|
||||
log.info('Sending to matching group ID')
|
||||
await ws_conn.send_json(data)
|
||||
elif group_id:
|
||||
log.info('Skip sending to client')
|
||||
pass
|
||||
else:
|
||||
log.info('Sending to all')
|
||||
await ws_conn.send_json(data)
|
||||
# Need to make sure the ws_connection is still open before trying to send.
|
||||
# log.debug(ws_conn.open())
|
||||
# log.debug(ws_conn.closed)
|
||||
log.debug(vars(ws_conn))
|
||||
|
||||
if data == STOPWORD:
|
||||
log.debug(ws_conn.client_state)
|
||||
log.debug(vars(ws_conn.client_state))
|
||||
|
||||
log.debug(ws_conn.application_state)
|
||||
log.debug(vars(ws_conn.application_state))
|
||||
|
||||
# 'WebSocketState.CONNECTED' or 'WebSocketState.DISCONNECTED'
|
||||
if ws_conn.client_state == 'WebSocketState.CONNECTED':
|
||||
if group_id and group_id == data.get('group_id'):
|
||||
log.info('Sending to matching group ID')
|
||||
await ws_conn.send_json(data)
|
||||
elif group_id:
|
||||
log.info('Skip sending to client')
|
||||
pass
|
||||
else:
|
||||
log.info('Sending to all')
|
||||
await ws_conn.send_json(data)
|
||||
else:
|
||||
log.warning(f'WS client ({client_id}) no longer connected!? They have just missed a message.')
|
||||
|
||||
if data == 'STOP':
|
||||
log.debug('(Reader) STOP')
|
||||
break
|
||||
except ConnectionError as conn_e:
|
||||
# TODO: This needs handling better?
|
||||
log.error(conn_e)
|
||||
# await r_channel.unsubscribe()
|
||||
await r_channel.close()
|
||||
# except ConnectionClosedError as conn_close_e:
|
||||
# log.error(conn_close_e)
|
||||
# except redis.exceptions.ConnectionError as e:
|
||||
# TODO: This needs handling better?
|
||||
# log.error(e)
|
||||
except Exception as e:
|
||||
# raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
|
||||
# redis.exceptions.ConnectionError: Connection closed by server.
|
||||
log.exception('Exception in Aether!')
|
||||
log.error(e)
|
||||
|
||||
|
||||
# (channel,) = await r.subscribe('chat:c')
|
||||
# assert isinstance(channel, redis.Channel)
|
||||
# try:
|
||||
# while True:
|
||||
# data = await channel.get()
|
||||
# if data:
|
||||
# await ws_conn.send_json(data)
|
||||
# except Exception as exc:
|
||||
# # TODO this needs handling better
|
||||
# log.error(exc)
|
||||
# raise RuntimeError(msg % message_type)
|
||||
# RuntimeError: Unexpected ASGI message 'websocket.send', after sending 'websocket.close'.
|
||||
|
||||
# raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
|
||||
# redis.exceptions.ConnectionError: Connection closed by server.
|
||||
|
||||
# redis_conn = await redis.create_pool(redis_url)
|
||||
# Redis client bound to pool of connections (auto-reconnecting).
|
||||
# r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=7, password=None, decode_responses=True)
|
||||
log.info('Create Redis connection')
|
||||
log.info('Create async Redis connection')
|
||||
redis_conn = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
|
||||
# redis = redis.from_url(
|
||||
# redis_url, encoding="utf-8", decode_responses=True
|
||||
# )
|
||||
|
||||
redis_conn_pubsub = redis_conn.pubsub()
|
||||
|
||||
await redis_conn_pubsub.subscribe('channel:1', 'channel:2', 'chat:c')
|
||||
|
||||
log.info('Run consumer_task')
|
||||
# consumer_task = consumer_handler(ws_conn, redis_conn, client_id, group_id)
|
||||
log.info('Run receiver_task')
|
||||
future_receiver = asyncio.create_task(receiver_handler(ws_conn, redis_conn, client_id, group_id))
|
||||
|
||||
# future_consumer = asyncio.create_task(consumer_task())
|
||||
future_consumer = asyncio.create_task(consumer_handler(ws_conn, redis_conn, client_id, group_id))
|
||||
log.info('Run sender_task')
|
||||
|
||||
log.info('Run producer_task')
|
||||
# producer_task = producer_handler(redis_conn_pubsub, ws_conn, client_id, group_id)
|
||||
future_sender = asyncio.create_task(sender_handler(redis_conn_pubsub, ws_conn, client_id, group_id))
|
||||
|
||||
# future_producer = asyncio.create_task(producer_task())
|
||||
future_producer = asyncio.create_task(producer_handler(redis_conn_pubsub, ws_conn, client_id, group_id))
|
||||
|
||||
await future_consumer # Get WS messages and set in Redis
|
||||
await future_producer # Send out messages from Redis subscribe
|
||||
|
||||
# @app.on_event('shutdown')
|
||||
# async def shutdown():
|
||||
# log.info('The Aether FastAPI API is shutting down...')
|
||||
# # await ws_conn.close()
|
||||
# await redis_conn.close()
|
||||
|
||||
# pending_tasks = asyncio.all_tasks(loop=event_loop)
|
||||
|
||||
# # cancel each pending task
|
||||
# for task in pending_tasks:
|
||||
# logger.info(f'killing task {task.get_coro()}')
|
||||
# event_loop.call_soon_threadsafe(task.cancel)
|
||||
|
||||
# # This does not work for some reason.
|
||||
# # "Passing coroutines is forbidden, use tasks explicitly."
|
||||
# log.info('Start asyncio wait')
|
||||
# done, pending = await asyncio.wait(
|
||||
# [consumer_task, producer_task],
|
||||
# return_when = asyncio.FIRST_COMPLETED,
|
||||
# )
|
||||
# log.debug(f'Done task: {done}')
|
||||
|
||||
# for task in pending:
|
||||
# log.debug(f'Canceling task: {task}')
|
||||
# task.cancel()
|
||||
# await redis_conn.close()
|
||||
# await redis_conn.wait_closed()
|
||||
|
||||
|
||||
STOPWORD = 'STOP'
|
||||
|
||||
# async def reader(channel: redis.client.PubSub):
|
||||
# while True:
|
||||
# message = await channel.get_message(ignore_subscribe_messages=True)
|
||||
# if message is not None:
|
||||
# log.debug(f'(Reader) Message Received: {message}')
|
||||
# log.info('WHAT NOW???')
|
||||
# if message['data'].decode() == STOPWORD:
|
||||
# log.debug('(Reader) STOP')
|
||||
# break
|
||||
|
||||
# r = redis.from_url('redis://localhost')
|
||||
# r = redis.Redis(host=settings.REDIS['server'], port=settings.REDIS['port'], db=6, password=None, encoding='utf-8', decode_responses=True)
|
||||
# async with r.pubsub() as pubsub:
|
||||
# await pubsub.subscribe('channel:1', 'channel:2', 'chat:c')
|
||||
|
||||
# future = asyncio.create_task(reader(pubsub))
|
||||
|
||||
# await r.publish('channel:1', 'Hello')
|
||||
# await r.publish('channel:2', 'World')
|
||||
# await r.publish('channel:1', STOPWORD)
|
||||
|
||||
# await future
|
||||
|
||||
# @app.on_event('shutdown')
|
||||
# async def shutdown():
|
||||
# log.setLevel(logging.INFO) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
# log.debug(locals())
|
||||
|
||||
# log.info('The Aether FastAPI API is shutting down...')
|
||||
# #await database.disconnect()
|
||||
# r.close()
|
||||
await future_receiver # Get WS messages and set in Redis
|
||||
await future_sender # Send out messages from Redis subscribe
|
||||
|
||||
Reference in New Issue
Block a user