Files
snake-python/server/services/dashboard_events.py
T

128 lines
3.9 KiB
Python

from quart_common.web.logger import await_log
from typing import Awaitable, Callable
import asyncio, inspect, json, time
class DashboardEventsService:
def __init__(self, enabled:bool, redis_url:str, channel:str, event_origin:str, shutdown_event:asyncio.Event, on_notice:Callable[[str], Awaitable[None]], logger):
self.enabled = enabled
self.redis_url = redis_url
self.channel = channel
self.event_origin = event_origin
self.shutdown_event = shutdown_event
self.on_notice = on_notice
self.logger = logger
self.listener_task:asyncio.Task|None=None
self.redis = None
self.pubsub = None
async def start_listener(self) -> None:
if not self.enabled:
return
if self.listener_task is not None:
return
try:
import redis.asyncio as aioredis # type: ignore[import-not-found]
self.redis = aioredis.from_url(self.redis_url)
self.pubsub = self.redis.pubsub()
await self.pubsub.subscribe(self.channel)
self.listener_task = asyncio.create_task(self._listener_loop())
except Exception as error:
self.listener_task = None
self.pubsub = None
self.redis = None
await await_log(self.logger.warning(f'Dashboard events listener disabled (redis unavailable): {error}'))
async def stop_listener(self) -> None:
listener_task = self.listener_task
self.listener_task = None
if listener_task is not None:
listener_task.cancel()
await asyncio.gather(listener_task, return_exceptions=True)
pubsub = self.pubsub
self.pubsub = None
if pubsub is not None:
try:
await pubsub.unsubscribe(self.channel)
except Exception:
pass
close_method = getattr(pubsub, 'aclose', None)
if callable(close_method):
try:
maybe_result = close_method()
if inspect.isawaitable(maybe_result):
await maybe_result
except Exception:
pass
redis_client = self.redis
self.redis = None
if redis_client is not None:
close_method = getattr(redis_client, 'aclose', None)
if callable(close_method):
try:
maybe_result = close_method()
if inspect.isawaitable(maybe_result):
await maybe_result
except Exception:
pass
async def publish_notice(self, trigger:str) -> None:
if not self.enabled:
return
if self.redis is None:
return
if trigger not in {'game_saved', 'stale_finalized', 'manual'}:
return
message = {
'type': 'dashboard_games_update_notice',
'origin': self.event_origin,
'trigger': trigger,
'sent_at': int(time.time()),
}
try:
await self.redis.publish(self.channel, json.dumps(message))
except Exception as error:
await await_log(self.logger.warning(f'Dashboard events publish failed: {error}'))
async def _listener_loop(self) -> None:
pubsub = self.pubsub
if pubsub is None:
return
try:
while not self.shutdown_event.is_set():
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
if message is None:
continue
raw_data = message.get('data')
if isinstance(raw_data, bytes):
payload_raw = raw_data.decode('utf-8', errors='replace')
else:
payload_raw = str(raw_data)
try:
payload = json.loads(payload_raw)
except json.JSONDecodeError:
continue
if not isinstance(payload, dict):
continue
if payload.get('type') != 'dashboard_games_update_notice':
continue
if payload.get('origin') == self.event_origin:
continue
notice_trigger = str(payload.get('trigger') or 'game_saved')
await self.on_notice(notice_trigger)
except asyncio.CancelledError:
pass
except Exception as error:
await await_log(self.logger.warning(f'Dashboard events listener stopped unexpectedly: {error}'))