Files
snake-python/server/services/dashboard_ws_hub.py
T

62 lines
1.9 KiB
Python

import asyncio, json
class DashboardWebSocketHub:
    """Fan out JSON-encoded messages to a set of per-subscriber queues.

    Each subscriber is an ``asyncio.Queue[str]`` registered with
    :meth:`register_subscriber`. :meth:`broadcast_payload` encodes a dict once
    and offers it to every registered queue; when a bounded queue is full the
    oldest entry is dropped so the newest message always fits.
    :meth:`request_shutdown` sets ``shutdown_event`` and offers a fixed
    shutdown message to every queue; broadcasts are ignored from then on.

    The hub also keeps a set of tasks (``ws_tasks``). Nothing in this class
    reads that set beyond add/discard.
    NOTE(review): presumably the owner cancels/awaits these tasks on
    shutdown -- confirm against the caller.
    """

    def __init__(self):
        # Queues that receive every broadcast message.
        self.subscribers: set[asyncio.Queue[str]] = set()
        # Serializes add/discard/snapshot of ``subscribers`` across coroutines.
        self.subscribers_lock = asyncio.Lock()
        # Tasks registered via register_task(); only tracked here.
        self.ws_tasks: set[asyncio.Task] = set()
        # Serializes add/discard of ``ws_tasks`` across coroutines.
        self.ws_tasks_lock = asyncio.Lock()
        # Set exactly once by request_shutdown().
        self.shutdown_event = asyncio.Event()
        # Pre-encoded message offered to every subscriber on shutdown.
        self.shutdown_message = json.dumps({"type": "dashboard_ws_shutdown"})

    @staticmethod
    def _offer_latest(subscriber_queue: asyncio.Queue[str], message: str) -> None:
        """Put ``message`` on the queue without blocking, dropping the oldest entry if full."""
        if subscriber_queue.full():
            try:
                # Make room by discarding the oldest queued message.
                subscriber_queue.get_nowait()
            except asyncio.QueueEmpty:
                pass
        try:
            subscriber_queue.put_nowait(message)
        except asyncio.QueueFull:
            # Best effort: skip this subscriber rather than block or raise.
            pass

    async def register_subscriber(self, subscriber_queue: asyncio.Queue[str]) -> None:
        """Add ``subscriber_queue`` to the set of broadcast recipients."""
        async with self.subscribers_lock:
            self.subscribers.add(subscriber_queue)

    async def unregister_subscriber(self, subscriber_queue: asyncio.Queue[str]) -> None:
        """Remove ``subscriber_queue`` from the recipients; no error if absent."""
        async with self.subscribers_lock:
            self.subscribers.discard(subscriber_queue)

    async def register_task(self, websocket_task: asyncio.Task) -> None:
        """Start tracking ``websocket_task`` in ``ws_tasks``."""
        async with self.ws_tasks_lock:
            self.ws_tasks.add(websocket_task)

    async def unregister_task(self, websocket_task: asyncio.Task) -> None:
        """Stop tracking ``websocket_task``; no error if it was not tracked."""
        async with self.ws_tasks_lock:
            self.ws_tasks.discard(websocket_task)

    async def broadcast_payload(self, payload: dict) -> None:
        """JSON-encode ``payload`` and offer it to every registered subscriber.

        Delivery never blocks: a full queue loses its oldest entry to make
        room. Once shutdown has been requested the call is a no-op, so a late
        broadcast cannot evict the shutdown message from a full queue.

        Raises:
            TypeError / ValueError: propagated from ``json.dumps`` when
                ``payload`` is not serializable.
        """
        # Encode first so serialization errors surface regardless of state.
        encoded_payload = json.dumps(payload)
        async with self.subscribers_lock:
            # Snapshot under the lock; deliver outside it.
            subscribers = tuple(self.subscribers)
        # Checked after the last await so a shutdown that happened while
        # waiting for the lock is respected.
        if self.shutdown_event.is_set():
            return
        for subscriber_queue in subscribers:
            self._offer_latest(subscriber_queue, encoded_payload)

    def request_shutdown(self) -> None:
        """Set ``shutdown_event`` and offer the shutdown message to every subscriber.

        Idempotent: only the first call has any effect. This method is
        synchronous, so it snapshots ``subscribers`` without taking
        ``subscribers_lock``.
        """
        if self.shutdown_event.is_set():
            return
        self.shutdown_event.set()
        for subscriber_queue in tuple(self.subscribers):
            self._offer_latest(subscriber_queue, self.shutdown_message)