change websocket and game publish to redis backend with pub sub
Build and Push Docker Container / build-and-push (push) Successful in 1m59s
Build and Push Docker Container / build-and-push (push) Successful in 1m59s
This commit is contained in:
+143
-12
@@ -22,7 +22,7 @@ from quart import (
|
||||
send_from_directory,
|
||||
websocket,
|
||||
)
|
||||
import asyncio, signal, logging, json, os, re, time
|
||||
import asyncio, signal, inspect, logging, json, os, re, time
|
||||
from typing import cast
|
||||
|
||||
class Server:
|
||||
@@ -53,6 +53,8 @@ class Server:
|
||||
ttl_seconds=game_state_ttl_sec,
|
||||
)
|
||||
metrics_backend_normalized = (metrics_backend or 'memory').strip().lower()
|
||||
self.metrics_backend_normalized = metrics_backend_normalized
|
||||
self.metrics_redis_url = metrics_redis_url
|
||||
self.stale_game_timeout_sec = self._get_stale_game_timeout_sec()
|
||||
|
||||
self.running_games:dict[str, GameBoard] = {}
|
||||
@@ -64,6 +66,12 @@ class Server:
|
||||
self.dashboard_ws_tasks_lock = asyncio.Lock()
|
||||
self.dashboard_ws_shutdown_event = asyncio.Event()
|
||||
self.dashboard_ws_shutdown_message = json.dumps({'type': 'dashboard_ws_shutdown'})
|
||||
self.dashboard_event_origin = f'worker-{os.getpid()}-{int(time.time() * 1000)}'
|
||||
self.dashboard_events_channel = os.getenv('DASHBOARD_EVENTS_CHANNEL', 'snake:dashboard:events')
|
||||
self.dashboard_events_enabled = (self.metrics_backend_normalized == 'redis' and self._env_bool('DASHBOARD_EVENTS_ENABLED', True))
|
||||
self.dashboard_events_listener_task:asyncio.Task | None = None
|
||||
self.dashboard_events_redis = None
|
||||
self.dashboard_events_pubsub = None
|
||||
|
||||
self.metrics_collector = MetricsCollector(
|
||||
metrics_manager=MetricsStoreBuilder.build(
|
||||
@@ -179,9 +187,11 @@ class Server:
|
||||
should_clear = await self.metrics_collector.should_clear_worker_metrics_on_startup(self.worker_metrics_startup_lock_ttl_sec)
|
||||
if should_clear:
|
||||
await self.metrics_collector.clear_worker_metrics()
|
||||
await self._start_dashboard_events_listener()
|
||||
|
||||
@self.app.after_serving
|
||||
async def shutdown_state_storage():
|
||||
await self._stop_dashboard_events_listener()
|
||||
await self.game_state_store.close()
|
||||
await self.metrics_collector.close()
|
||||
if self.gameplay_database is not None:
|
||||
@@ -218,13 +228,6 @@ class Server:
|
||||
initial_games=initial_games,
|
||||
)
|
||||
|
||||
@self.app.get('/dashboard/game/<string:game_id>')
|
||||
async def dashboard_game_replay(game_id:str):
|
||||
replay = await self._get_dashboard_game_replay(game_id)
|
||||
if replay is None:
|
||||
return jsonify({'error': 'game_not_found', 'game_id': game_id}), 404
|
||||
return jsonify(replay)
|
||||
|
||||
@self.app.get('/dashboard/customizations/<path:asset_path>')
|
||||
async def dashboard_customizations_asset(asset_path: str):
|
||||
customization_root = os.path.join(self.data_path, 'server', 'static', 'customizations')
|
||||
@@ -569,16 +572,139 @@ class Server:
|
||||
except asyncio.QueueFull:
|
||||
continue
|
||||
|
||||
async def _build_dashboard_games_event(self, game_state:dict|None=None) -> dict:
|
||||
async def _start_dashboard_events_listener(self) -> None:
|
||||
if not self.dashboard_events_enabled:
|
||||
return
|
||||
if self.dashboard_events_listener_task is not None:
|
||||
return
|
||||
|
||||
try:
|
||||
import redis.asyncio as aioredis # type: ignore[import-not-found]
|
||||
|
||||
self.dashboard_events_redis = aioredis.from_url(self.metrics_redis_url)
|
||||
self.dashboard_events_pubsub = self.dashboard_events_redis.pubsub()
|
||||
await self.dashboard_events_pubsub.subscribe(self.dashboard_events_channel)
|
||||
self.dashboard_events_listener_task = asyncio.create_task(
|
||||
self._dashboard_events_listener_loop()
|
||||
)
|
||||
except Exception as error:
|
||||
self.dashboard_events_listener_task = None
|
||||
self.dashboard_events_pubsub = None
|
||||
self.dashboard_events_redis = None
|
||||
await await_log(self.logger.warning(f'Dashboard events listener disabled (redis unavailable): {error}'))
|
||||
|
||||
async def _stop_dashboard_events_listener(self) -> None:
|
||||
listener_task = self.dashboard_events_listener_task
|
||||
self.dashboard_events_listener_task = None
|
||||
if listener_task is not None:
|
||||
listener_task.cancel()
|
||||
await asyncio.gather(listener_task, return_exceptions=True)
|
||||
|
||||
pubsub = self.dashboard_events_pubsub
|
||||
self.dashboard_events_pubsub = None
|
||||
if pubsub is not None:
|
||||
try:
|
||||
await pubsub.unsubscribe(self.dashboard_events_channel)
|
||||
except Exception:
|
||||
pass
|
||||
close_method = getattr(pubsub, 'aclose', None)
|
||||
if callable(close_method):
|
||||
try:
|
||||
maybe_result = close_method()
|
||||
if inspect.isawaitable(maybe_result):
|
||||
await maybe_result
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
redis_client = self.dashboard_events_redis
|
||||
self.dashboard_events_redis = None
|
||||
if redis_client is not None:
|
||||
close_method = getattr(redis_client, 'aclose', None)
|
||||
if callable(close_method):
|
||||
try:
|
||||
maybe_result = close_method()
|
||||
if inspect.isawaitable(maybe_result):
|
||||
await maybe_result
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def _dashboard_events_listener_loop(self) -> None:
|
||||
pubsub = self.dashboard_events_pubsub
|
||||
if pubsub is None:
|
||||
return
|
||||
|
||||
try:
|
||||
while not self.dashboard_ws_shutdown_event.is_set():
|
||||
message = await pubsub.get_message(
|
||||
ignore_subscribe_messages=True,
|
||||
timeout=1.0,
|
||||
)
|
||||
if message is None:
|
||||
continue
|
||||
|
||||
raw_data = message.get('data')
|
||||
if isinstance(raw_data, bytes):
|
||||
payload_raw = raw_data.decode('utf-8', errors='replace')
|
||||
else:
|
||||
payload_raw = str(raw_data)
|
||||
|
||||
try:
|
||||
payload = json.loads(payload_raw)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
if not isinstance(payload, dict):
|
||||
continue
|
||||
if payload.get('type') != 'dashboard_games_update_notice':
|
||||
continue
|
||||
if payload.get('origin') == self.dashboard_event_origin:
|
||||
continue
|
||||
notice_trigger = str(payload.get('trigger') or 'game_saved')
|
||||
|
||||
await self._push_dashboard_games_update(
|
||||
game_state=None,
|
||||
publish_cluster=False,
|
||||
trigger=notice_trigger,
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as error:
|
||||
await await_log(
|
||||
self.logger.warning(f'Dashboard events listener stopped unexpectedly: {error}'))
|
||||
|
||||
async def _publish_dashboard_games_update_notice(self, trigger: str) -> None:
|
||||
if not self.dashboard_events_enabled:
|
||||
return
|
||||
if self.dashboard_events_redis is None:
|
||||
return
|
||||
if trigger not in {'game_saved', 'stale_finalized', 'manual'}:
|
||||
return
|
||||
|
||||
message = {
|
||||
'type': 'dashboard_games_update_notice',
|
||||
'origin': self.dashboard_event_origin,
|
||||
'trigger': trigger,
|
||||
'sent_at': int(time.time()),
|
||||
}
|
||||
try:
|
||||
await self.dashboard_events_redis.publish(
|
||||
self.dashboard_events_channel,
|
||||
json.dumps(message),
|
||||
)
|
||||
except Exception as error:
|
||||
await await_log(self.logger.warning(f'Dashboard events publish failed: {error}'))
|
||||
|
||||
async def _build_dashboard_games_event(self, game_state:dict|None=None, trigger_override:str|None=None) -> dict:
|
||||
games_payload = await self._get_dashboard_games(limit=100)
|
||||
summary_payload = await self._get_dashboard_summary()
|
||||
game_id = None
|
||||
if game_state is not None:
|
||||
game_id = game_state.get('game', {}).get('id')
|
||||
trigger = trigger_override or ('game_saved' if game_id else 'snapshot')
|
||||
|
||||
return {
|
||||
'type': 'dashboard_games_update',
|
||||
'trigger': 'game_saved' if game_id else 'snapshot',
|
||||
'trigger': trigger,
|
||||
'games': games_payload,
|
||||
'summary': summary_payload,
|
||||
}
|
||||
@@ -630,11 +756,16 @@ class Server:
|
||||
request_id=request_id,
|
||||
)
|
||||
|
||||
async def _push_dashboard_games_update(self, game_state:dict|None=None) -> None:
|
||||
async def _push_dashboard_games_update(self, game_state:dict|None=None, publish_cluster:bool=True, trigger:str|None=None) -> None:
|
||||
if self.gameplay_database is None:
|
||||
return
|
||||
event_payload = await self._build_dashboard_games_event(game_state)
|
||||
event_payload = await self._build_dashboard_games_event(
|
||||
game_state,
|
||||
trigger_override=trigger,
|
||||
)
|
||||
await self._broadcast_dashboard_game_event(event_payload)
|
||||
if publish_cluster:
|
||||
await self._publish_dashboard_games_update_notice(trigger=str(event_payload.get('trigger') or ''))
|
||||
|
||||
async def _get_dashboard_summary(self) -> dict:
|
||||
if self.gameplay_database is None:
|
||||
|
||||
Reference in New Issue
Block a user