128 lines
3.9 KiB
Python
128 lines
3.9 KiB
Python
from quart_common.web.logger import await_log
|
|
|
|
from typing import Awaitable, Callable
|
|
import asyncio, inspect, json, time
|
|
|
|
class DashboardEventsService:
|
|
def __init__(self, enabled:bool, redis_url:str, channel:str, event_origin:str, shutdown_event:asyncio.Event, on_notice:Callable[[str], Awaitable[None]], logger):
|
|
self.enabled = enabled
|
|
self.redis_url = redis_url
|
|
self.channel = channel
|
|
self.event_origin = event_origin
|
|
self.shutdown_event = shutdown_event
|
|
self.on_notice = on_notice
|
|
self.logger = logger
|
|
|
|
self.listener_task:asyncio.Task|None=None
|
|
self.redis = None
|
|
self.pubsub = None
|
|
|
|
async def start_listener(self) -> None:
|
|
if not self.enabled:
|
|
return
|
|
if self.listener_task is not None:
|
|
return
|
|
|
|
try:
|
|
import redis.asyncio as aioredis # type: ignore[import-not-found]
|
|
|
|
self.redis = aioredis.from_url(self.redis_url)
|
|
self.pubsub = self.redis.pubsub()
|
|
await self.pubsub.subscribe(self.channel)
|
|
self.listener_task = asyncio.create_task(self._listener_loop())
|
|
except Exception as error:
|
|
self.listener_task = None
|
|
self.pubsub = None
|
|
self.redis = None
|
|
await await_log(self.logger.warning(f'Dashboard events listener disabled (redis unavailable): {error}'))
|
|
|
|
async def stop_listener(self) -> None:
|
|
listener_task = self.listener_task
|
|
self.listener_task = None
|
|
if listener_task is not None:
|
|
listener_task.cancel()
|
|
await asyncio.gather(listener_task, return_exceptions=True)
|
|
|
|
pubsub = self.pubsub
|
|
self.pubsub = None
|
|
if pubsub is not None:
|
|
try:
|
|
await pubsub.unsubscribe(self.channel)
|
|
except Exception:
|
|
pass
|
|
close_method = getattr(pubsub, 'aclose', None)
|
|
if callable(close_method):
|
|
try:
|
|
maybe_result = close_method()
|
|
if inspect.isawaitable(maybe_result):
|
|
await maybe_result
|
|
except Exception:
|
|
pass
|
|
|
|
redis_client = self.redis
|
|
self.redis = None
|
|
if redis_client is not None:
|
|
close_method = getattr(redis_client, 'aclose', None)
|
|
if callable(close_method):
|
|
try:
|
|
maybe_result = close_method()
|
|
if inspect.isawaitable(maybe_result):
|
|
await maybe_result
|
|
except Exception:
|
|
pass
|
|
|
|
async def publish_notice(self, trigger:str) -> None:
|
|
if not self.enabled:
|
|
return
|
|
if self.redis is None:
|
|
return
|
|
if trigger not in {'game_saved', 'stale_finalized', 'manual'}:
|
|
return
|
|
|
|
message = {
|
|
'type': 'dashboard_games_update_notice',
|
|
'origin': self.event_origin,
|
|
'trigger': trigger,
|
|
'sent_at': int(time.time()),
|
|
}
|
|
try:
|
|
await self.redis.publish(self.channel, json.dumps(message))
|
|
except Exception as error:
|
|
await await_log(self.logger.warning(f'Dashboard events publish failed: {error}'))
|
|
|
|
async def _listener_loop(self) -> None:
|
|
pubsub = self.pubsub
|
|
if pubsub is None:
|
|
return
|
|
|
|
try:
|
|
while not self.shutdown_event.is_set():
|
|
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
|
|
if message is None:
|
|
continue
|
|
|
|
raw_data = message.get('data')
|
|
if isinstance(raw_data, bytes):
|
|
payload_raw = raw_data.decode('utf-8', errors='replace')
|
|
else:
|
|
payload_raw = str(raw_data)
|
|
|
|
try:
|
|
payload = json.loads(payload_raw)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
if not isinstance(payload, dict):
|
|
continue
|
|
if payload.get('type') != 'dashboard_games_update_notice':
|
|
continue
|
|
if payload.get('origin') == self.event_origin:
|
|
continue
|
|
|
|
notice_trigger = str(payload.get('trigger') or 'game_saved')
|
|
await self.on_notice(notice_trigger)
|
|
except asyncio.CancelledError:
|
|
pass
|
|
except Exception as error:
|
|
await await_log(self.logger.warning(f'Dashboard events listener stopped unexpectedly: {error}'))
|