import asyncio
import json


class DashboardWebSocketHub:
    """Fan-out hub that pushes JSON-encoded payloads to subscriber queues.

    Every subscriber is an ``asyncio.Queue[str]``. Delivery never blocks:
    when a bounded queue is full, its oldest entry is discarded to make room
    for the newest message. The hub also keeps a set of websocket tasks and
    exposes a shutdown event plus a pre-encoded shutdown message.
    """

    def __init__(self):
        # Subscriber queues, guarded by ``subscribers_lock`` in async paths.
        self.subscribers: set[asyncio.Queue[str]] = set()
        self.subscribers_lock = asyncio.Lock()
        # Registered websocket tasks, guarded by ``ws_tasks_lock``.
        self.ws_tasks: set[asyncio.Task] = set()
        self.ws_tasks_lock = asyncio.Lock()
        # Set exactly once by ``request_shutdown``.
        self.shutdown_event = asyncio.Event()
        # Encoded once up front so shutdown does no serialization work.
        self.shutdown_message = json.dumps({"type": "dashboard_ws_shutdown"})

    async def register_subscriber(self, subscriber_queue: asyncio.Queue[str]) -> None:
        """Add ``subscriber_queue`` to the set of broadcast recipients."""
        async with self.subscribers_lock:
            self.subscribers.add(subscriber_queue)

    async def unregister_subscriber(self, subscriber_queue: asyncio.Queue[str]) -> None:
        """Remove ``subscriber_queue``; a queue that is not registered is ignored."""
        async with self.subscribers_lock:
            self.subscribers.discard(subscriber_queue)

    async def register_task(self, websocket_task: asyncio.Task) -> None:
        """Add ``websocket_task`` to the tracked task set."""
        async with self.ws_tasks_lock:
            self.ws_tasks.add(websocket_task)

    async def unregister_task(self, websocket_task: asyncio.Task) -> None:
        """Remove ``websocket_task``; a task that is not tracked is ignored."""
        async with self.ws_tasks_lock:
            self.ws_tasks.discard(websocket_task)

    @staticmethod
    def _offer_latest(subscriber_queue: asyncio.Queue[str], message: str) -> None:
        """Enqueue ``message`` without blocking, dropping the oldest entry if full."""
        if subscriber_queue.full():
            try:
                subscriber_queue.get_nowait()
            except asyncio.QueueEmpty:
                # The queue was drained in the meantime; nothing to evict.
                pass
        try:
            subscriber_queue.put_nowait(message)
        except asyncio.QueueFull:
            # Still no room: give up on this subscriber for this message
            # rather than blocking the caller.
            pass

    async def broadcast_payload(self, payload: dict) -> None:
        """Serialize ``payload`` to JSON and offer it to every subscriber.

        Nothing is delivered once shutdown has been requested.

        Raises:
            TypeError: if ``payload`` is not JSON-serializable
                (raised by ``json.dumps``).
        """
        encoded_payload = json.dumps(payload)
        # Snapshot under the lock so the delivery loop iterates a stable tuple
        # even if subscribers register or unregister afterwards.
        async with self.subscribers_lock:
            subscribers = tuple(self.subscribers)
        # Checked after the lock because shutdown may have been requested
        # while waiting for it. Delivering past this point could make the
        # drop-oldest policy evict the shutdown message from a full bounded
        # queue before its consumer has read it.
        if self.shutdown_event.is_set():
            return
        for subscriber_queue in subscribers:
            self._offer_latest(subscriber_queue, encoded_payload)

    def request_shutdown(self) -> None:
        """Set the shutdown event and offer the shutdown message to every subscriber.

        Only the first call has any effect; later calls return immediately.
        """
        if self.shutdown_event.is_set():
            return
        self.shutdown_event.set()
        # Synchronous method, so the async lock cannot be taken here; iterate
        # over a snapshot of the subscriber set instead.
        for subscriber_queue in tuple(self.subscribers):
            self._offer_latest(subscriber_queue, self.shutdown_message)