From 097a7f295ac2426681f7fadbfe071d469801776b Mon Sep 17 00:00:00 2001 From: Daniel Dolezal Date: Mon, 6 Apr 2026 01:49:26 +0200 Subject: [PATCH] update dashboards over websocket and remove database path from GameplayDatabase output --- server/Server.py | 182 ++++++++++++++++++++++++++-- server/database/GameplayDatabase.py | 1 - server/templates/dashboard.html | 162 +++++++++++++++++++++++-- 3 files changed, 323 insertions(+), 22 deletions(-) diff --git a/server/Server.py b/server/Server.py index d74d8e6..f2260fc 100644 --- a/server/Server.py +++ b/server/Server.py @@ -22,7 +22,7 @@ from quart import ( send_from_directory, websocket, ) -import asyncio, logging, json, os, re, time +import asyncio, signal, logging, json, os, re, time from typing import cast class Server: @@ -59,7 +59,11 @@ class Server: self.game_move_counts:dict[str, int] = {} self.game_last_seen_unix:dict[str, int] = {} self.dashboard_game_subscribers:set[asyncio.Queue[str]] = set() - self.dashboard_game_subscribers_lock=asyncio.Lock() + self.dashboard_game_subscribers_lock = asyncio.Lock() + self.dashboard_ws_tasks:set[asyncio.Task] = set() + self.dashboard_ws_tasks_lock = asyncio.Lock() + self.dashboard_ws_shutdown_event = asyncio.Event() + self.dashboard_ws_shutdown_message = json.dumps({'type': 'dashboard_ws_shutdown'}) self.metrics_collector = MetricsCollector( metrics_manager=MetricsStoreBuilder.build( @@ -222,28 +226,118 @@ class Server: return jsonify(replay) @self.app.get('/dashboard/customizations/') - async def dashboard_customizations_asset(asset_path:str): + async def dashboard_customizations_asset(asset_path: str): customization_root = os.path.join(self.data_path, 'server', 'static', 'customizations') return await send_from_directory(customization_root, asset_path) @self.app.websocket('/dashboard/ws/games') async def dashboard_games_ws(): + websocket_task = asyncio.current_task() + if websocket_task is not None: + await self._register_dashboard_ws_task(websocket_task) + subscriber_queue: asyncio.Queue[str] = asyncio.Queue(maxsize=20) await self._register_dashboard_game_subscriber(subscriber_queue) try: initial_payload = await self._build_dashboard_games_event() - await websocket.send(json.dumps(initial_payload)) + await asyncio.wait_for(websocket.send(json.dumps(initial_payload)), timeout=1.5) while True: - event_payload = await subscriber_queue.get() - await websocket.send(event_payload) + queue_task = asyncio.create_task(subscriber_queue.get()) + receive_task = asyncio.create_task(websocket.receive()) + try: + done, _ = await asyncio.wait( + {queue_task, receive_task}, + timeout=1.0, + return_when=asyncio.FIRST_COMPLETED, + ) + + if len(done) == 0: + if self.dashboard_ws_shutdown_event.is_set(): + await asyncio.wait_for( + websocket.send(self.dashboard_ws_shutdown_message), + timeout=1.5, + ) + break + continue + + if receive_task in done: + try: + request_payload_raw = receive_task.result() + except Exception: + break + + response_event = await self._handle_dashboard_ws_request( + request_payload_raw + ) + if response_event is not None: + await asyncio.wait_for( + websocket.send(json.dumps(response_event)), + timeout=1.5, + ) + + if queue_task in done: + event_payload = queue_task.result() + if event_payload == self.dashboard_ws_shutdown_message: + await asyncio.wait_for( + websocket.send(event_payload), timeout=1.5 + ) + break + await asyncio.wait_for( + websocket.send(event_payload), timeout=1.5 + ) + except asyncio.TimeoutError: + if self.dashboard_ws_shutdown_event.is_set(): + await asyncio.wait_for( + websocket.send(self.dashboard_ws_shutdown_message), + timeout=1.5, + ) + break + finally: + for pending_task in (queue_task, receive_task): + if not pending_task.done(): + pending_task.cancel() + await asyncio.gather( + queue_task, receive_task, return_exceptions=True + ) + except asyncio.CancelledError: + pass + except Exception: + pass finally: await self._unregister_dashboard_game_subscriber(subscriber_queue) + if websocket_task is not None: + await self._unregister_dashboard_ws_task(websocket_task) async def run(self, host:str='0.0.0.0', port:int=8000, debug:bool=False): logging.getLogger('werkzeug').setLevel(logging.ERROR) + loop = asyncio.get_running_loop() + shutdown_event = asyncio.Event() + installed_signal_handlers:list[signal.Signals] = [] - await await_log(self.logger.info(f'Running Battlesnake at http://{host}:{port} with the {" ".join(re.findall("[A-Z][^A-Z]*", self.snake_type))}')) - await self.app.run_task(host=host, port=port, debug=debug) + def on_shutdown_signal() -> None: + self._request_dashboard_ws_shutdown() + shutdown_event.set() + + async def shutdown_trigger() -> None: + await shutdown_event.wait() + + for shutdown_signal in (signal.SIGINT, signal.SIGTERM): + try: + loop.add_signal_handler(shutdown_signal, on_shutdown_signal) + installed_signal_handlers.append(shutdown_signal) + except (NotImplementedError, RuntimeError): + continue + + await await_log(self.logger.info(f'Running Battlesnake at http://{host}:{port} with the {' '.join(re.findall('[A-Z][^A-Z]*', self.snake_type))}')) + try: + await self.app.run_task(host=host, port=port, debug=debug, shutdown_trigger=shutdown_trigger) + finally: + self._request_dashboard_ws_shutdown() + for shutdown_signal in installed_signal_handlers: + try: + loop.remove_signal_handler(shutdown_signal) + except Exception: + continue async def _read_json_config_or_create(self) -> dict[str, str]: snake_config = cast(dict[str, str]|None, await read_file(self.config_file, json.load)) @@ -451,6 +545,30 @@ class Server: except asyncio.QueueFull: continue + async def _register_dashboard_ws_task(self, websocket_task:asyncio.Task) -> None: + async with self.dashboard_ws_tasks_lock: + self.dashboard_ws_tasks.add(websocket_task) + + async def _unregister_dashboard_ws_task(self, websocket_task:asyncio.Task) -> None: + async with self.dashboard_ws_tasks_lock: + self.dashboard_ws_tasks.discard(websocket_task) + + def _request_dashboard_ws_shutdown(self) -> None: + if self.dashboard_ws_shutdown_event.is_set(): + return + + self.dashboard_ws_shutdown_event.set() + for subscriber_queue in tuple(self.dashboard_game_subscribers): + if subscriber_queue.full(): + try: + subscriber_queue.get_nowait() + except asyncio.QueueEmpty: + pass + try: + subscriber_queue.put_nowait(self.dashboard_ws_shutdown_message) + except asyncio.QueueFull: + continue + async def _build_dashboard_games_event(self, game_state:dict|None=None) -> dict: games_payload = await self._get_dashboard_games(limit=100) summary_payload = await self._get_dashboard_summary() @@ -461,11 +579,57 @@ class Server: return { 'type': 'dashboard_games_update', 'trigger': 'game_saved' if game_id else 'snapshot', - 'game_id': game_id, 'games': games_payload, 'summary': summary_payload, } + async def _build_dashboard_game_replay_event(self, game_id:str, request_id:str|None=None) -> dict: + replay_payload = await self._get_dashboard_game_replay(game_id) + if replay_payload is None: + return { + 'type': 'dashboard_game_replay', + 'request_id': request_id, + 'game_id': game_id, + 'error': 'game_not_found', + } + + return { + 'type': 'dashboard_game_replay', + 'request_id': request_id, + 'game_id': game_id, + 'replay': replay_payload, + } + + async def _handle_dashboard_ws_request(self, payload_raw: object) -> dict | None: + if not isinstance(payload_raw, str): + return None + + try: + payload = json.loads(payload_raw) + except json.JSONDecodeError: + return None + + if not isinstance(payload, dict): + return None + + if payload.get('type') != 'dashboard_game_replay_request': + return None + + game_id = str(payload.get('game_id') or '').strip() + request_id_raw = payload.get('request_id') + request_id = None if request_id_raw is None else str(request_id_raw) + if game_id == '': + return { + 'type': 'dashboard_game_replay', + 'request_id': request_id, + 'error': 'missing_game_id', + } + + return await self._build_dashboard_game_replay_event( + game_id=game_id, + request_id=request_id, + ) + async def _push_dashboard_games_update(self, game_state:dict|None=None) -> None: if self.gameplay_database is None: return diff --git a/server/database/GameplayDatabase.py b/server/database/GameplayDatabase.py index 46fe657..63ee9d8 100644 --- a/server/database/GameplayDatabase.py +++ b/server/database/GameplayDatabase.py @@ -472,7 +472,6 @@ class GameplayDatabase: ).fetchall() return { - "database": self.db_path, "total_games": int(totals["total_games"] or 0), "running_games": int(totals["running_games"] or 0), "finished_games": int(totals["finished_games"] or 0), diff --git a/server/templates/dashboard.html b/server/templates/dashboard.html index 44aac2a..ba7fac7 100644 --- a/server/templates/dashboard.html +++ b/server/templates/dashboard.html @@ -750,6 +750,10 @@ let activeGameId = String(initialGameId || ""); let gamesWebSocket = null; let gamesWebSocketReconnectTimer = null; + let gamesWebSocketServerShuttingDown = false; + let hasLoadedReplayOnce = false; + let replayRequestSeq = 0; + const pendingReplayRequests = new Map(); let selectedSnakeId = null; const svgCache = new Map(); @@ -1373,6 +1377,7 @@ } function scheduleDashboardGamesWebSocketReconnect() { + if (gamesWebSocketServerShuttingDown) return; if (gamesWebSocketReconnectTimer) return; gamesWebSocketReconnectTimer = window.setTimeout(() => { gamesWebSocketReconnectTimer = null; @@ -1380,6 +1385,14 @@ }, 1500); } + function rejectPendingReplayRequests(message) { + for (const pending of pendingReplayRequests.values()) { + window.clearTimeout(pending.timeoutId); + pending.reject(new Error(message)); + } + pendingReplayRequests.clear(); + } + function applyDashboardGamesUpdate(payload) { const nextSummary = payload && payload.summary && typeof payload.summary === "object" ? payload.summary @@ -1407,9 +1420,8 @@ clearActiveGame(); } - const payloadGameId = payload && payload.game_id ? String(payload.game_id) : ""; - if (payloadGameId && payloadGameId === activeGameId) { - loadReplay(payloadGameId); + if (activeGameId && gameIds.has(activeGameId) && payload && payload.trigger === "game_saved") { + loadReplay(activeGameId); return; } @@ -1421,6 +1433,10 @@ } function connectDashboardGamesWebSocket() { + if (gamesWebSocketServerShuttingDown) return; + if (gamesWebSocket && (gamesWebSocket.readyState === WebSocket.OPEN || gamesWebSocket.readyState === WebSocket.CONNECTING)) { + return; + } const wsUrl = dashboardGamesWebSocketUrl(); try { gamesWebSocket = new WebSocket(wsUrl); @@ -1436,13 +1452,47 @@ } catch { return; } - if (!payload || payload.type !== "dashboard_games_update") return; - applyDashboardGamesUpdate(payload); + if (!payload || !payload.type) return; + + if (payload.type === "dashboard_ws_shutdown") { + gamesWebSocketServerShuttingDown = true; + if (gamesWebSocketReconnectTimer) { + clearTimeout(gamesWebSocketReconnectTimer); + gamesWebSocketReconnectTimer = null; + } + rejectPendingReplayRequests("Server shutting down"); + if (gamesWebSocket) { + gamesWebSocket.close(); + } + return; + } + + if (payload.type === "dashboard_game_replay") { + const requestId = String(payload.request_id || ""); + if (!requestId) return; + const pending = pendingReplayRequests.get(requestId); + if (!pending) return; + pendingReplayRequests.delete(requestId); + window.clearTimeout(pending.timeoutId); + if (payload.error) { + pending.reject(new Error(String(payload.error))); + return; + } + pending.resolve(payload.replay || null); + return; + } + + if (payload.type === "dashboard_games_update") { + applyDashboardGamesUpdate(payload); + } }); gamesWebSocket.addEventListener("close", () => { gamesWebSocket = null; - scheduleDashboardGamesWebSocketReconnect(); + rejectPendingReplayRequests("Dashboard websocket disconnected"); + if (!gamesWebSocketServerShuttingDown) { + scheduleDashboardGamesWebSocketReconnect(); + } }); gamesWebSocket.addEventListener("error", () => { @@ -1452,6 +1502,79 @@ }); } + function waitForDashboardGamesWebSocketOpen(timeoutMs = 4000) { + if (gamesWebSocketServerShuttingDown) { + return Promise.resolve(false); + } + + if (!gamesWebSocket || gamesWebSocket.readyState === WebSocket.CLOSED) { + connectDashboardGamesWebSocket(); + } + + if (gamesWebSocket && gamesWebSocket.readyState === WebSocket.OPEN) { + return Promise.resolve(true); + } + + return new Promise((resolve) => { + if (!gamesWebSocket) { + resolve(false); + return; + } + + const socketRef = gamesWebSocket; + let settled = false; + const cleanup = () => { + if (!socketRef) return; + socketRef.removeEventListener("open", onOpen); + socketRef.removeEventListener("close", onClose); + socketRef.removeEventListener("error", onError); + window.clearTimeout(timeoutId); + }; + const finish = (value) => { + if (settled) return; + settled = true; + cleanup(); + resolve(value); + }; + const onOpen = () => finish(true); + const onClose = () => finish(false); + const onError = () => finish(false); + const timeoutId = window.setTimeout(() => finish(false), timeoutMs); + + socketRef.addEventListener("open", onOpen); + socketRef.addEventListener("close", onClose); + socketRef.addEventListener("error", onError); + }); + } + + async function requestReplayOverWebSocket(gameId) { + const isOpen = await waitForDashboardGamesWebSocketOpen(); + if (!isOpen || !gamesWebSocket || gamesWebSocket.readyState !== WebSocket.OPEN) { + throw new Error("Dashboard websocket unavailable"); + } + + const requestId = `replay-${Date.now()}-${replayRequestSeq++}`; + return await new Promise((resolve, reject) => { + const timeoutId = window.setTimeout(() => { + pendingReplayRequests.delete(requestId); + reject(new Error(`Replay websocket timeout for ${gameId}`)); + }, 4000); + + pendingReplayRequests.set(requestId, { resolve, reject, timeoutId }); + try { + gamesWebSocket.send(JSON.stringify({ + type: "dashboard_game_replay_request", + request_id: requestId, + game_id: gameId, + })); + } catch (error) { + window.clearTimeout(timeoutId); + pendingReplayRequests.delete(requestId); + reject(error); + } + }); + } + function clearActiveGame() { for (const row of gamesBodyEl.querySelectorAll("tr")) { row.classList.remove("active"); @@ -1609,12 +1732,24 @@ } async function loadReplay(gameId) { - const response = await fetch(`/dashboard/game/${gameId}`); - if (!response.ok) { - renderThinking({ my_move: "-", my_thinking: { error: `Replay load failed for ${gameId}` } }); - return; + let nextReplay = null; + try { + nextReplay = await requestReplayOverWebSocket(gameId); + } catch { + if (!hasLoadedReplayOnce) { + renderThinking({ my_move: "-", my_thinking: { error: `Replay websocket unavailable for ${gameId}` } }); + return; + } + const response = await fetch(`/dashboard/game/${gameId}`); + if (!response.ok) { + renderThinking({ my_move: "-", my_thinking: { error: `Replay load failed for ${gameId}` } }); + return; + } + nextReplay = await response.json(); } - replay = await response.json(); + + replay = nextReplay; + hasLoadedReplayOnce = true; activeGameId = String(gameId || ""); await preloadReplaySvgs(); turnIndex = 0; @@ -1749,8 +1884,9 @@ async function boot() { renderThinking(null); loadSummary(); - await loadGames(); connectDashboardGamesWebSocket(); + await waitForDashboardGamesWebSocketOpen(2500); + await loadGames(); syncMonoOffset(); } @@ -1780,10 +1916,12 @@ }); window.addEventListener("beforeunload", () => { + gamesWebSocketServerShuttingDown = true; if (gamesWebSocketReconnectTimer) { clearTimeout(gamesWebSocketReconnectTimer); gamesWebSocketReconnectTimer = null; } + rejectPendingReplayRequests("Dashboard unloading"); if (gamesWebSocket) { gamesWebSocket.close(); gamesWebSocket = null;