From 0ebb04f0a2fa4247292ae0a6cddcba86ffbf23f2 Mon Sep 17 00:00:00 2001 From: Daniel Dolezal Date: Mon, 6 Apr 2026 02:05:56 +0200 Subject: [PATCH] change websocket and game publish to redis backend with pub sub --- server/Server.py | 155 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 143 insertions(+), 12 deletions(-) diff --git a/server/Server.py b/server/Server.py index f2260fc..6081a64 100644 --- a/server/Server.py +++ b/server/Server.py @@ -22,7 +22,7 @@ from quart import ( send_from_directory, websocket, ) -import asyncio, signal, logging, json, os, re, time +import asyncio, signal, inspect, logging, json, os, re, time from typing import cast class Server: @@ -53,6 +53,8 @@ class Server: ttl_seconds=game_state_ttl_sec, ) metrics_backend_normalized = (metrics_backend or 'memory').strip().lower() + self.metrics_backend_normalized = metrics_backend_normalized + self.metrics_redis_url = metrics_redis_url self.stale_game_timeout_sec = self._get_stale_game_timeout_sec() self.running_games:dict[str, GameBoard] = {} @@ -64,6 +66,12 @@ class Server: self.dashboard_ws_tasks_lock = asyncio.Lock() self.dashboard_ws_shutdown_event = asyncio.Event() self.dashboard_ws_shutdown_message = json.dumps({'type': 'dashboard_ws_shutdown'}) + self.dashboard_event_origin = f'worker-{os.getpid()}-{int(time.time() * 1000)}' + self.dashboard_events_channel = os.getenv('DASHBOARD_EVENTS_CHANNEL', 'snake:dashboard:events') + self.dashboard_events_enabled = (self.metrics_backend_normalized == 'redis' and self._env_bool('DASHBOARD_EVENTS_ENABLED', True)) + self.dashboard_events_listener_task:asyncio.Task | None = None + self.dashboard_events_redis = None + self.dashboard_events_pubsub = None self.metrics_collector = MetricsCollector( metrics_manager=MetricsStoreBuilder.build( @@ -179,9 +187,11 @@ class Server: should_clear = await self.metrics_collector.should_clear_worker_metrics_on_startup(self.worker_metrics_startup_lock_ttl_sec) if should_clear: await self.metrics_collector.clear_worker_metrics() + await self._start_dashboard_events_listener() @self.app.after_serving async def shutdown_state_storage(): + await self._stop_dashboard_events_listener() await self.game_state_store.close() await self.metrics_collector.close() if self.gameplay_database is not None: @@ -218,13 +228,6 @@ class Server: initial_games=initial_games, ) - @self.app.get('/dashboard/game/') - async def dashboard_game_replay(game_id:str): - replay = await self._get_dashboard_game_replay(game_id) - if replay is None: - return jsonify({'error': 'game_not_found', 'game_id': game_id}), 404 - return jsonify(replay) - @self.app.get('/dashboard/customizations/') async def dashboard_customizations_asset(asset_path: str): customization_root = os.path.join(self.data_path, 'server', 'static', 'customizations') @@ -569,16 +572,139 @@ class Server: except asyncio.QueueFull: continue - async def _build_dashboard_games_event(self, game_state:dict|None=None) -> dict: + async def _start_dashboard_events_listener(self) -> None: + if not self.dashboard_events_enabled: + return + if self.dashboard_events_listener_task is not None: + return + + try: + import redis.asyncio as aioredis # type: ignore[import-not-found] + + self.dashboard_events_redis = aioredis.from_url(self.metrics_redis_url) + self.dashboard_events_pubsub = self.dashboard_events_redis.pubsub() + await self.dashboard_events_pubsub.subscribe(self.dashboard_events_channel) + self.dashboard_events_listener_task = asyncio.create_task( + self._dashboard_events_listener_loop() + ) + except Exception as error: + self.dashboard_events_listener_task = None + self.dashboard_events_pubsub = None + self.dashboard_events_redis = None + await await_log(self.logger.warning(f'Dashboard events listener disabled (redis unavailable): {error}')) + + async def _stop_dashboard_events_listener(self) -> None: + listener_task = self.dashboard_events_listener_task + self.dashboard_events_listener_task = None + if listener_task is not None: + listener_task.cancel() + await asyncio.gather(listener_task, return_exceptions=True) + + pubsub = self.dashboard_events_pubsub + self.dashboard_events_pubsub = None + if pubsub is not None: + try: + await pubsub.unsubscribe(self.dashboard_events_channel) + except Exception: + pass + close_method = getattr(pubsub, 'aclose', None) + if callable(close_method): + try: + maybe_result = close_method() + if inspect.isawaitable(maybe_result): + await maybe_result + except Exception: + pass + + redis_client = self.dashboard_events_redis + self.dashboard_events_redis = None + if redis_client is not None: + close_method = getattr(redis_client, 'aclose', None) + if callable(close_method): + try: + maybe_result = close_method() + if inspect.isawaitable(maybe_result): + await maybe_result + except Exception: + pass + + async def _dashboard_events_listener_loop(self) -> None: + pubsub = self.dashboard_events_pubsub + if pubsub is None: + return + + try: + while not self.dashboard_ws_shutdown_event.is_set(): + message = await pubsub.get_message( + ignore_subscribe_messages=True, + timeout=1.0, + ) + if message is None: + continue + + raw_data = message.get('data') + if isinstance(raw_data, bytes): + payload_raw = raw_data.decode('utf-8', errors='replace') + else: + payload_raw = str(raw_data) + + try: + payload = json.loads(payload_raw) + except json.JSONDecodeError: + continue + + if not isinstance(payload, dict): + continue + if payload.get('type') != 'dashboard_games_update_notice': + continue + if payload.get('origin') == self.dashboard_event_origin: + continue + notice_trigger = str(payload.get('trigger') or 'game_saved') + + await self._push_dashboard_games_update( + game_state=None, + publish_cluster=False, + trigger=notice_trigger, + ) + except asyncio.CancelledError: + pass + except Exception as error: + await await_log( + self.logger.warning(f'Dashboard events listener stopped unexpectedly: {error}')) + + async def _publish_dashboard_games_update_notice(self, trigger: str) -> None: + if not self.dashboard_events_enabled: + return + if self.dashboard_events_redis is None: + return + if trigger not in {'game_saved', 'stale_finalized', 'manual'}: + return + + message = { + 'type': 'dashboard_games_update_notice', + 'origin': self.dashboard_event_origin, + 'trigger': trigger, + 'sent_at': int(time.time()), + } + try: + await self.dashboard_events_redis.publish( + self.dashboard_events_channel, + json.dumps(message), + ) + except Exception as error: + await await_log(self.logger.warning(f'Dashboard events publish failed: {error}')) + + async def _build_dashboard_games_event(self, game_state:dict|None=None, trigger_override:str|None=None) -> dict: games_payload = await self._get_dashboard_games(limit=100) summary_payload = await self._get_dashboard_summary() game_id = None if game_state is not None: game_id = game_state.get('game', {}).get('id') + trigger = trigger_override or ('game_saved' if game_id else 'snapshot') return { 'type': 'dashboard_games_update', - 'trigger': 'game_saved' if game_id else 'snapshot', + 'trigger': trigger, 'games': games_payload, 'summary': summary_payload, } @@ -630,11 +756,16 @@ class Server: request_id=request_id, ) - async def _push_dashboard_games_update(self, game_state:dict|None=None) -> None: + async def _push_dashboard_games_update(self, game_state:dict|None=None, publish_cluster:bool=True, trigger:str|None=None) -> None: if self.gameplay_database is None: return - event_payload = await self._build_dashboard_games_event(game_state) + event_payload = await self._build_dashboard_games_event( + game_state, + trigger_override=trigger, + ) await self._broadcast_dashboard_game_event(event_payload) + if publish_cluster: + await self._publish_dashboard_games_update_notice(trigger=str(event_payload.get('trigger') or '')) async def _get_dashboard_summary(self) -> dict: if self.gameplay_database is None: