diff --git a/server/Server.py b/server/Server.py index 4607fb2..9670a44 100644 --- a/server/Server.py +++ b/server/Server.py @@ -3,7 +3,6 @@ from quart_common.web.env import env_bool, env_int from server.Files import read_file from server.game_state_store import GameStateStoreBuilder -from server.GameBoard import GameBoard from snakes import SnakeBuilder @@ -24,8 +23,14 @@ from server.blueprints import ( create_metrics_blueprint, create_dashboard_blueprint, ) -from server.services import DashboardEventsService -from server.services import DashboardWebSocketHub +from server.services import ( + DashboardEventsService, + DashboardWebSocketHub, + GameRuntimeService, + GameplayTrackingService, + DashboardQueryService, +) + class Server: default_snake_config = { @@ -59,9 +64,12 @@ class Server: self.metrics_redis_url = metrics_redis_url self.stale_game_timeout_sec = self._get_stale_game_timeout_sec() - self.running_games: dict[str, GameBoard] = {} - self.game_move_counts: dict[str, int] = {} - self.game_last_seen_unix: dict[str, int] = {} + self.game_runtime = GameRuntimeService( + game_state_store=self.game_state_store, + snake_type=self.snake_type, + game_state_local_cache=self.game_state_local_cache, + stale_game_timeout_sec=self.stale_game_timeout_sec, + ) self.dashboard_ws_hub = DashboardWebSocketHub() dashboard_event_origin = f'worker-{os.getpid()}-{int(time.time() * 1000)}' dashboard_events_channel = os.getenv('DASHBOARD_EVENTS_CHANNEL', 'snake:dashboard:events') @@ -78,15 +86,39 @@ class Server: metrics_backend=metrics_backend_normalized, game_state_backend=game_state_backend, stale_game_timeout_sec=self.stale_game_timeout_sec, - game_last_seen_unix=self.game_last_seen_unix, - game_move_counts=self.game_move_counts, + game_last_seen_unix=self.game_runtime.game_last_seen_unix, + game_move_counts=self.game_runtime.game_move_counts, ) + + self.game_runtime.attach_metrics_collector(self.metrics_collector) self.clear_worker_metrics_on_startup = env_bool('METRICS_CLEAR_WORKERS_ON_STARTUP', True) self.worker_metrics_startup_lock_ttl_sec = env_int('METRICS_STARTUP_CLEANUP_LOCK_TTL_SEC', 300) self.dashboard_running_game_stale_sec = 600 self._startup_worker_metrics_cleared = False self.logger = build_logger('Battlesnake', debug_env_var='DEBUG_SERVER') + self.snake_builder = SnakeBuilder + self.snake_version = self._get_snake_version() + self.gameplay_database = None + if gameplay_db_enabled: + db_path = gameplay_db_path or os.path.join(data_path, 'data', 'database', 'gameplay.sqlite3') + self.gameplay_database = GameplayDatabase( + db_path=db_path, + busy_timeout_ms=gameplay_db_busy_timeout_ms, + ) + + self.gameplay_tracking = GameplayTrackingService( + gameplay_database=self.gameplay_database, + snake_type=self.snake_type, + snake_version=self.snake_version, + logger=self.logger, + ) + self.dashboard_query = DashboardQueryService( + gameplay_database=self.gameplay_database, + ws_hub=self.dashboard_ws_hub, + logger=self.logger, + dashboard_running_game_stale_sec=self.dashboard_running_game_stale_sec, + ) self.dashboard_events_service = DashboardEventsService( enabled=dashboard_events_enabled, redis_url=self.metrics_redis_url, @@ -96,14 +128,7 @@ class Server: on_notice=self._on_dashboard_games_update_notice, logger=self.logger, ) - self.snake_version = self._get_snake_version() - self.gameplay_database = None - if gameplay_db_enabled: - db_path = gameplay_db_path or os.path.join(data_path, 'data', 'database', 'gameplay.sqlite3') - self.gameplay_database = GameplayDatabase( - db_path=db_path, - busy_timeout_ms=gameplay_db_busy_timeout_ms, - ) + self.dashboard_query.set_publish_notice(self.dashboard_events_service.publish_notice) self.app = Quart('Battlesnake', template_folder=os.path.join(data_path, 'server', 'templates')) @@ -143,7 +168,7 @@ class Server: shutdown_event = asyncio.Event() def on_shutdown_signal() -> None: - self._request_dashboard_ws_shutdown() + self.dashboard_ws_hub.request_shutdown() shutdown_event.set() async def shutdown_trigger() -> None: @@ -160,7 +185,7 @@ class Server: try: await self.app.run_task(host=host, port=port, debug=debug, shutdown_trigger=shutdown_trigger) finally: - self._request_dashboard_ws_shutdown() + self.dashboard_ws_hub.request_shutdown() for shutdown_signal in installed_signal_handlers: try: loop.remove_signal_handler(shutdown_signal) @@ -207,66 +232,6 @@ class Server: def _get_stale_game_timeout_sec(self) -> int: return max(30, env_int('SNAKE_STUCK_GAME_TIMEOUT_SEC', 180)) - async def _create_game_board(self, game_state:dict) -> GameBoard: - game_id = game_state['game']['id'] - new_game_board = GameBoard( - game_id=game_id, - width=game_state['board']['width'], - height=game_state['board']['height'], - ruleset=game_state['game']['ruleset'], - source=game_state['game']['source'], - map=game_state['game']['map'], - snake_class=SnakeBuilder.build(self.snake_type), - ) - await new_game_board.start_game(game_state) - - if self.game_state_local_cache: - self.running_games[game_id] = new_game_board - await self.game_state_store.save(game_id, new_game_board) - self.game_move_counts[game_id] = 0 - self.game_last_seen_unix[game_id] = int(time.time()) - await self.metrics_collector.record_game_started(len(self.game_last_seen_unix)) - return new_game_board - - async def _persist_game_board(self, game_id:str, game_board:GameBoard): - if self.game_state_local_cache: - self.running_games[game_id] = game_board - await self.game_state_store.save(game_id, game_board) - - async def _delete_game_board(self, game_state:dict): - game_id = game_state['game']['id'] - self.running_games.pop(game_id, None) - self.game_move_counts.pop(game_id, None) - self.game_last_seen_unix.pop(game_id, None) - await self.game_state_store.delete(game_id) - - async def _get_game_board(self, game_state:dict, end:bool=False) -> GameBoard: - game_id = game_state['game']['id'] - game_board:GameBoard - if self.game_state_local_cache and game_id in self.running_games: - game_board = self.running_games[game_id] - else: - persisted_board = await self.game_state_store.load(game_id) - if persisted_board is not None: - game_board = cast(GameBoard, persisted_board) - if self.game_state_local_cache: - self.running_games[game_id] = game_board - else: - game_board = await self._create_game_board(game_state) - await self.metrics_collector.record_game_autocreated() - - if not end: - self.game_move_counts[game_id] = self.game_move_counts.get(game_id, 0) + 1 - - self.game_last_seen_unix[game_id] = int(time.time()) - - game_board.read_game_data(game_state) - if end: - game_board.end_game(game_state) - await self._persist_game_board(game_id, game_board) - - return game_board - def enable_store_game_state(self): self.store_game_state = True @@ -274,199 +239,5 @@ class Server: storage = StorageLoader.build(self.storage_type)() return storage.cleanup() - async def _prune_stale_games(self): - if not self.game_last_seen_unix: - return - - now = int(time.time()) - stale_ids = [ - game_id - for game_id, last_seen in self.game_last_seen_unix.items() - if now - last_seen >= self.stale_game_timeout_sec - ] - for game_id in stale_ids: - self.running_games.pop(game_id, None) - self.game_move_counts.pop(game_id, None) - self.game_last_seen_unix.pop(game_id, None) - await self.metrics_collector.record_stuck_removed() - - async def _record_gameplay_start(self, game_state:dict) -> None: - if self.gameplay_database is None: - return - try: - await self.gameplay_database.record_game_start( - game_state, - snake_type=self.snake_type, - snake_version=self.snake_version, - ) - except Exception as error: - await await_log(self.logger.warning(f'Gameplay DB start record failed:{error}')) - - def _extract_latest_snake_thinking(self, game_board:GameBoard) -> dict|None: - try: - history = game_board.snake_class.get_history() - except Exception: - return None - if not isinstance(history, list) or len(history) == 0: - return None - latest = history[-1] - return latest if isinstance(latest, dict) else None - - async def _record_gameplay_turn(self, game_state:dict, my_move:str, game_board:GameBoard) -> None: - if self.gameplay_database is None: - return - try: - thinking = self._extract_latest_snake_thinking(game_board) - await self.gameplay_database.record_turn(game_state, my_move, thinking) - except Exception as error: - await await_log(self.logger.warning(f'Gameplay DB turn record failed:{error}')) - - async def _record_gameplay_end(self, game_state:dict) -> None: - if self.gameplay_database is None: - return - try: - await self.gameplay_database.record_game_end(game_state) - except Exception as error: - await await_log(self.logger.warning(f'Gameplay DB end record failed:{error}')) - - async def _register_dashboard_game_subscriber(self, subscriber_queue:asyncio.Queue[str]) -> None: - await self.dashboard_ws_hub.register_subscriber(subscriber_queue) - - async def _unregister_dashboard_game_subscriber(self, subscriber_queue:asyncio.Queue[str]) -> None: - await self.dashboard_ws_hub.unregister_subscriber(subscriber_queue) - - async def _broadcast_dashboard_game_event(self, payload:dict) -> None: - await self.dashboard_ws_hub.broadcast_payload(payload) - - async def _register_dashboard_ws_task(self, websocket_task:asyncio.Task) -> None: - await self.dashboard_ws_hub.register_task(websocket_task) - - async def _unregister_dashboard_ws_task(self, websocket_task:asyncio.Task) -> None: - await self.dashboard_ws_hub.unregister_task(websocket_task) - - def _request_dashboard_ws_shutdown(self) -> None: - self.dashboard_ws_hub.request_shutdown() - async def _on_dashboard_games_update_notice(self, trigger:str) -> None: - await self._push_dashboard_games_update( - game_state=None, - publish_cluster=False, - trigger=trigger, - ) - - async def _build_dashboard_games_event(self, game_state:dict|None=None, trigger_override:str|None=None) -> dict: - games_payload = await self._get_dashboard_games(limit=100) - summary_payload = await self._get_dashboard_summary() - game_id = None - if game_state is not None: - game_id = game_state.get('game', {}).get('id') - trigger = trigger_override or ('game_saved' if game_id else 'snapshot') - - return { - 'type': 'dashboard_games_update', - 'trigger': trigger, - 'games': games_payload, - 'summary': summary_payload, - } - - async def _build_dashboard_game_replay_event(self, game_id:str, request_id:str|None=None) -> dict: - replay_payload = await self._get_dashboard_game_replay(game_id) - if replay_payload is None: - return { - 'type': 'dashboard_game_replay', - 'request_id': request_id, - 'game_id': game_id, - 'error': 'game_not_found', - } - - return { - 'type': 'dashboard_game_replay', - 'request_id': request_id, - 'game_id': game_id, - 'replay': replay_payload, - } - - async def _handle_dashboard_ws_request(self, payload_raw:object) -> dict|None: - if not isinstance(payload_raw, str): - return None - - try: - payload = json.loads(payload_raw) - except json.JSONDecodeError: - return None - - if not isinstance(payload, dict): - return None - - if payload.get('type') != 'dashboard_game_replay_request': - return None - - game_id = str(payload.get('game_id') or '').strip() - request_id_raw = payload.get('request_id') - request_id = None if request_id_raw is None else str(request_id_raw) - if game_id == '': - return { - 'type': 'dashboard_game_replay', - 'request_id': request_id, - 'error': 'missing_game_id', - } - - return await self._build_dashboard_game_replay_event( - game_id=game_id, - request_id=request_id, - ) - - async def _push_dashboard_games_update(self, game_state:dict|None=None, publish_cluster:bool=True, trigger:str|None=None) -> None: - if self.gameplay_database is None: - return - event_payload = await self._build_dashboard_games_event( - game_state, - trigger_override=trigger, - ) - await self._broadcast_dashboard_game_event(event_payload) - if publish_cluster: - await self.dashboard_events_service.publish_notice(trigger=str(event_payload.get('trigger') or '')) - - async def _get_dashboard_summary(self) -> dict: - if self.gameplay_database is None: - return {'enabled': False} - try: - await self._finalize_stale_dashboard_games() - summary = await self.gameplay_database.get_summary() - summary['enabled'] = True - return summary - except Exception as error: - await await_log(self.logger.warning(f'Gameplay DB summary failed:{error}')) - return {'enabled': True, 'error': ' summary_unavailable'} - - async def _get_dashboard_games(self, limit:int=50) -> dict: - if self.gameplay_database is None: - return {'enabled': False, 'games': []} - try: - await self._finalize_stale_dashboard_games() - games = await self.gameplay_database.list_games(limit=limit) - return {'enabled': True, 'games': games} - except Exception as error: - await await_log(self.logger.warning(f'Gameplay DB game list failed:{error}')) - return {'enabled': True, 'error': 'games_unavailable', 'games': []} - - async def _finalize_stale_dashboard_games(self) -> None: - if self.gameplay_database is None: - return - try: - await self.gameplay_database.finalize_stale_running_games(stale_after_seconds=self.dashboard_running_game_stale_sec) - except Exception as error: - await await_log(self.logger.warning(f'Gameplay DB stale running game finalize failed:{error}')) - - async def _get_dashboard_game_replay(self, game_id:str) -> dict|None: - if self.gameplay_database is None: - return {'enabled': False, 'error': 'database_disabled', 'game_id': game_id} - try: - replay = await self.gameplay_database.get_game_replay(game_id) - if replay is None: - return None - replay['enabled'] = True - return replay - except Exception as error: - await await_log(self.logger.warning(f'Gameplay DB replay failed:{error}')) - return {'enabled': True, 'error': 'replay_unavailable', 'game_id': game_id} + await self.dashboard_query.on_dashboard_games_update_notice(trigger) diff --git a/server/blueprints/battlesnake.py b/server/blueprints/battlesnake.py index 61016e3..f86fab3 100644 --- a/server/blueprints/battlesnake.py +++ b/server/blueprints/battlesnake.py @@ -23,10 +23,10 @@ def create_battlesnake_blueprint(server:'Server') -> Blueprint: @blueprint.post('/start') async def on_start(): server.metrics_collector.record_http_request('start') - await server._prune_stale_games() + await server.game_runtime.prune_stale_games() game_state = await request.get_json() - await server._create_game_board(game_state) - await server._record_gameplay_start(game_state) + await server.game_runtime.create_game_board(game_state, snake_builder=server.snake_builder) + await server.gameplay_tracking.record_gameplay_start(game_state) await await_log(server.logger.info(f'GAME START: {game_state['game']}')) return 'ok' @@ -35,10 +35,10 @@ def create_battlesnake_blueprint(server:'Server') -> Blueprint: server.metrics_collector.record_http_request('move') game_state = await request.get_json() move_started = time.perf_counter() - game_board = cast(GameBoard, await server._get_game_board(game_state)) + game_board = cast(GameBoard, await server.game_runtime.get_game_board(game_state, snake_builder=server.snake_builder)) next_move = game_board.snake_neat_make_a_move() - await server._persist_game_board(game_state['game']['id'], game_board) - await server._record_gameplay_turn(game_state, next_move, game_board) + await server.game_runtime.persist_game_board(game_state['game']['id'], game_board) + await server.gameplay_tracking.record_gameplay_turn(game_state, next_move, game_board) elapsed_ms = (time.perf_counter() - move_started) * 1000.0 await server.metrics_collector.record_move(next_move, elapsed_ms) @@ -50,10 +50,10 @@ def create_battlesnake_blueprint(server:'Server') -> Blueprint: @blueprint.post('/end') async def on_end(): server.metrics_collector.record_http_request('end') - await server._prune_stale_games() + await server.game_runtime.prune_stale_games() game_state = await request.get_json() if server.store_game_state: - game_board = cast(GameBoard, await server._get_game_board(game_state, end=True)) + game_board = cast(GameBoard, await server.game_runtime.get_game_board(game_state, snake_builder=server.snake_builder, end=True)) if server.check_tls_security: await game_board.save( StorageLoader.build(server.storage_type), @@ -68,14 +68,14 @@ def create_battlesnake_blueprint(server:'Server') -> Blueprint: database=os.getenv('EDGEDB_DATABASE', None), ) - await server._record_gameplay_end(game_state) - await server._push_dashboard_games_update(game_state) + await server.gameplay_tracking.record_gameplay_end(game_state) + await server.dashboard_query.push_dashboard_games_update(game_state) await await_log(server.logger.info(f'GAME ENDED: Winner is {[x['name'] for x in game_state['board']['snakes']]}')) - await server._delete_game_board(game_state) + await server.game_runtime.delete_game_board(game_state) await server.metrics_collector.record_game_end(game_state) return 'ok' - @blueprint.get("/cleanup") + @blueprint.get('/cleanup') async def cleanup(): results = server._cleanup_database() return jsonify(data=json.loads(results), status=200) diff --git a/server/blueprints/dashboard.py b/server/blueprints/dashboard.py index a72e3c4..b8a970d 100644 --- a/server/blueprints/dashboard.py +++ b/server/blueprints/dashboard.py @@ -18,8 +18,8 @@ def create_dashboard_blueprint(server:'Server') -> Blueprint: @blueprint.get('/dashboard') async def dashboard_view(): initial_game_id = request.args.get('game_id', '') - initial_summary = await server._get_dashboard_summary() - initial_games = await server._get_dashboard_games(limit=100) + initial_summary = await server.dashboard_query.get_dashboard_summary() + initial_games = await server.dashboard_query.get_dashboard_games(limit=100) return await render_template( 'dashboard.html', initial_game_id=initial_game_id, @@ -42,12 +42,12 @@ def create_dashboard_blueprint(server:'Server') -> Blueprint: ws_hub = server.dashboard_ws_hub websocket_task = asyncio.current_task() if websocket_task is not None: - await server._register_dashboard_ws_task(websocket_task) + await ws_hub.register_task(websocket_task) subscriber_queue:asyncio.Queue[str] = asyncio.Queue(maxsize=20) - await server._register_dashboard_game_subscriber(subscriber_queue) + await ws_hub.register_subscriber(subscriber_queue) try: - initial_payload = await server._build_dashboard_games_event() + initial_payload = await server.dashboard_query.build_dashboard_games_event() await asyncio.wait_for( websocket.send(json.dumps(initial_payload)), timeout=1.5 ) @@ -76,7 +76,7 @@ def create_dashboard_blueprint(server:'Server') -> Blueprint: except Exception: break - response_event = await server._handle_dashboard_ws_request(request_payload_raw) + response_event = await server.dashboard_query.handle_dashboard_ws_request(request_payload_raw) if response_event is not None: await asyncio.wait_for( websocket.send(json.dumps(response_event)), @@ -112,8 +112,8 @@ def create_dashboard_blueprint(server:'Server') -> Blueprint: except Exception: pass finally: - await server._unregister_dashboard_game_subscriber(subscriber_queue) + await ws_hub.unregister_subscriber(subscriber_queue) if websocket_task is not None: - await server._unregister_dashboard_ws_task(websocket_task) + await ws_hub.unregister_task(websocket_task) return blueprint diff --git a/server/blueprints/metrics.py b/server/blueprints/metrics.py index d421ee2..07ce75a 100644 --- a/server/blueprints/metrics.py +++ b/server/blueprints/metrics.py @@ -4,27 +4,27 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: from server.Server import Server -def create_metrics_blueprint(server:"Server") -> Blueprint: - blueprint = Blueprint("metrics", __name__) +def create_metrics_blueprint(server:'Server') -> Blueprint: + blueprint = Blueprint('metrics', __name__) - @blueprint.get("/metrics") + @blueprint.get('/metrics') async def metrics(): snapshot = await server.metrics_collector.build_snapshot( - server.game_last_seen_unix, - server.game_move_counts, + server.game_runtime.game_last_seen_unix, + server.game_runtime.game_move_counts, ) return jsonify(snapshot) - @blueprint.get("/metrics/prometheus") + @blueprint.get('/metrics/prometheus') async def metrics_prometheus(): snapshot = await server.metrics_collector.build_snapshot( - server.game_last_seen_unix, - server.game_move_counts, + server.game_runtime.game_last_seen_unix, + server.game_runtime.game_move_counts, ) return ( server.metrics_collector.build_prometheus_metrics(snapshot), 200, - {"Content-Type": "text/plain; version=0.0.4; charset=utf-8"}, + {'Content-Type': 'text/plain; version=0.0.4; charset=utf-8'}, ) return blueprint diff --git a/server/services/__init__.py b/server/services/__init__.py index a7f0fb2..2a4c9b5 100644 --- a/server/services/__init__.py +++ b/server/services/__init__.py @@ -1,2 +1,5 @@ from .dashboard_events import DashboardEventsService from .dashboard_ws_hub import DashboardWebSocketHub +from .game_runtime import GameRuntimeService +from .gameplay_tracking import GameplayTrackingService +from .dashboard_query import DashboardQueryService diff --git a/server/services/dashboard_query.py b/server/services/dashboard_query.py new file mode 100644 index 0000000..557b2bb --- /dev/null +++ b/server/services/dashboard_query.py @@ -0,0 +1,144 @@ +from quart_common.web.logger import await_log, logging + +from typing import Awaitable, Callable +import json + +from .dashboard_ws_hub import DashboardWebSocketHub +from server.database import GameplayDatabase + +class DashboardQueryService: + def __init__(self, gameplay_database:GameplayDatabase, ws_hub:DashboardWebSocketHub, logger:logging, dashboard_running_game_stale_sec:int): + self.gameplay_database = gameplay_database + self.ws_hub = ws_hub + self.logger = logger + self.dashboard_running_game_stale_sec = dashboard_running_game_stale_sec + self.publish_notice:Callable[[str], Awaitable[None]] | None = None + + def set_publish_notice(self, publish_notice:Callable[[str], Awaitable[None]]) -> None: + self.publish_notice = publish_notice + + async def on_dashboard_games_update_notice(self, trigger:str) -> None: + await self.push_dashboard_games_update( + game_state=None, + publish_cluster=False, + trigger=trigger, + ) + + async def build_dashboard_games_event(self, game_state:dict|None=None, trigger_override:str|None=None) -> dict: + games_payload = await self.get_dashboard_games(limit=100) + summary_payload = await self.get_dashboard_summary() + game_id = None + if game_state is not None: + game_id = game_state.get('game', {}).get('id') + trigger = trigger_override or ('game_saved' if game_id else 'snapshot') + + return { + 'type': 'dashboard_games_update', + 'trigger': trigger, + 'games': games_payload, + 'summary': summary_payload, + } + + async def build_dashboard_game_replay_event(self, game_id:str, request_id:str|None=None) -> dict: + replay_payload = await self.get_dashboard_game_replay(game_id) + if replay_payload is None: + return { + 'type': 'dashboard_game_replay', + 'request_id': request_id, + 'game_id': game_id, + 'error': 'game_not_found', + } + + return { + 'type': 'dashboard_game_replay', + 'request_id': request_id, + 'game_id': game_id, + 'replay': replay_payload, + } + + async def handle_dashboard_ws_request(self, payload_raw:object) -> dict|None: + if not isinstance(payload_raw, str): + return None + + try: + payload = json.loads(payload_raw) + except json.JSONDecodeError: + return None + + if not isinstance(payload, dict): + return None + + if payload.get('type') != 'dashboard_game_replay_request': + return None + + game_id = str(payload.get('game_id') or '').strip() + request_id_raw = payload.get('request_id') + request_id = None if request_id_raw is None else str(request_id_raw) + if game_id == '': + return { + 'type': 'dashboard_game_replay', + 'request_id': request_id, + 'error': 'missing_game_id', + } + + return await self.build_dashboard_game_replay_event( + game_id=game_id, + request_id=request_id, + ) + + async def push_dashboard_games_update(self, game_state:dict|None=None, publish_cluster:bool=True, trigger:str|None=None) -> None: + if self.gameplay_database is None: + return + event_payload = await self.build_dashboard_games_event( + game_state, + trigger_override=trigger, + ) + await self.ws_hub.broadcast_payload(event_payload) + if publish_cluster and self.publish_notice is not None: + await self.publish_notice(str(event_payload.get('trigger') or '')) + + async def get_dashboard_summary(self) -> dict: + if self.gameplay_database is None: + return {'enabled': False} + try: + await self._finalize_stale_dashboard_games() + summary = await self.gameplay_database.get_summary() + summary['enabled'] = True + return summary + except Exception as error: + await await_log(self.logger.warning(f'Gameplay DB summary failed:{error}')) + return {'enabled': True, 'error': ' summary_unavailable'} + + async def get_dashboard_games(self, limit:int=50) -> dict: + if self.gameplay_database is None: + return {'enabled': False, 'games': []} + try: + await self._finalize_stale_dashboard_games() + games = await self.gameplay_database.list_games(limit=limit) + return {'enabled': True, 'games': games} + except Exception as error: + await await_log( + self.logger.warning(f'Gameplay DB game list failed:{error}') + ) + return {'enabled': True, 'error': 'games_unavailable', 'games': []} + + async def get_dashboard_game_replay(self, game_id:str) -> dict|None: + if self.gameplay_database is None: + return {'enabled': False, 'error': 'database_disabled', 'game_id': game_id} + try: + replay = await self.gameplay_database.get_game_replay(game_id) + if replay is None: + return None + replay['enabled'] = True + return replay + except Exception as error: + await await_log(self.logger.warning(f'Gameplay DB replay failed:{error}')) + return {'enabled': True, 'error': 'replay_unavailable', 'game_id': game_id} + + async def _finalize_stale_dashboard_games(self) -> None: + if self.gameplay_database is None: + return + try: + await self.gameplay_database.finalize_stale_running_games(stale_after_seconds=self.dashboard_running_game_stale_sec) + except Exception as error: + await await_log(self.logger.warning(f'Gameplay DB stale running game finalize failed:{error}')) diff --git a/server/services/game_runtime.py b/server/services/game_runtime.py new file mode 100644 index 0000000..9b00e5e --- /dev/null +++ b/server/services/game_runtime.py @@ -0,0 +1,103 @@ +from typing import cast +import time + +from server.metrics import MetricsCollector +from server.GameBoard import GameBoard + +from storage import StorageLoader +from snakes import SnakeBuilder + +class GameRuntimeService: + def __init__(self, game_state_store:StorageLoader, snake_type:str, game_state_local_cache:bool, stale_game_timeout_sec:int): + self.game_state_store = game_state_store + self.snake_type = snake_type + self.game_state_local_cache = game_state_local_cache + self.stale_game_timeout_sec = stale_game_timeout_sec + self.metrics_collector = None + + self.running_games: dict[str, GameBoard] = {} + self.game_move_counts: dict[str, int] = {} + self.game_last_seen_unix: dict[str, int] = {} + + def attach_metrics_collector(self, metrics_collector:MetricsCollector) -> None: + self.metrics_collector = metrics_collector + + async def create_game_board(self, game_state:dict, snake_builder:SnakeBuilder) -> GameBoard: + game_id = game_state['game']['id'] + new_game_board = GameBoard( + game_id=game_id, + width=game_state['board']['width'], + height=game_state['board']['height'], + ruleset=game_state['game']['ruleset'], + source=game_state['game']['source'], + map=game_state['game']['map'], + snake_class=snake_builder.build(self.snake_type), + ) + await new_game_board.start_game(game_state) + + if self.game_state_local_cache: + self.running_games[game_id] = new_game_board + + await self.game_state_store.save(game_id, new_game_board) + self.game_move_counts[game_id] = 0 + self.game_last_seen_unix[game_id] = int(time.time()) + if self.metrics_collector is not None: + await self.metrics_collector.record_game_started(len(self.game_last_seen_unix)) + return new_game_board + + async def persist_game_board(self, game_id:str, game_board:GameBoard) -> None: + if self.game_state_local_cache: + self.running_games[game_id] = game_board + await self.game_state_store.save(game_id, game_board) + + async def delete_game_board(self, game_state:dict) -> None: + game_id = game_state['game']['id'] + self.running_games.pop(game_id, None) + self.game_move_counts.pop(game_id, None) + self.game_last_seen_unix.pop(game_id, None) + await self.game_state_store.delete(game_id) + + async def get_game_board(self, game_state:dict, snake_builder:SnakeBuilder, end:bool=False) -> GameBoard: + game_id = game_state['game']['id'] + game_board: GameBoard + if self.game_state_local_cache and game_id in self.running_games: + game_board = self.running_games[game_id] + else: + persisted_board = await self.game_state_store.load(game_id) + if persisted_board is not None: + game_board = cast(GameBoard, persisted_board) + if self.game_state_local_cache: + self.running_games[game_id] = game_board + else: + game_board = await self.create_game_board(game_state, snake_builder) + if self.metrics_collector is not None: + await self.metrics_collector.record_game_autocreated() + + if not end: + self.game_move_counts[game_id] = self.game_move_counts.get(game_id, 0) + 1 + + self.game_last_seen_unix[game_id] = int(time.time()) + + game_board.read_game_data(game_state) + if end: + game_board.end_game(game_state) + await self.persist_game_board(game_id, game_board) + + return game_board + + async def prune_stale_games(self) -> None: + if not self.game_last_seen_unix: + return + + now = int(time.time()) + stale_ids = [ + game_id + for game_id, last_seen in self.game_last_seen_unix.items() + if now - last_seen >= self.stale_game_timeout_sec + ] + for game_id in stale_ids: + self.running_games.pop(game_id, None) + self.game_move_counts.pop(game_id, None) + self.game_last_seen_unix.pop(game_id, None) + if self.metrics_collector is not None: + await self.metrics_collector.record_stuck_removed() diff --git a/server/services/gameplay_tracking.py b/server/services/gameplay_tracking.py new file mode 100644 index 0000000..b012f64 --- /dev/null +++ b/server/services/gameplay_tracking.py @@ -0,0 +1,50 @@ +from quart_common.web.logger import await_log, logging + +from server.database import GameplayDatabase +from server.GameBoard import GameBoard + +class GameplayTrackingService: + def __init__(self, gameplay_database:GameplayDatabase, snake_type:str, snake_version:str, logger:logging): + self.gameplay_database = gameplay_database + self.snake_type = snake_type + self.snake_version = snake_version + self.logger = logger + + async def record_gameplay_start(self, game_state:dict) -> None: + if self.gameplay_database is None: + return + try: + await self.gameplay_database.record_game_start( + game_state, + snake_type=self.snake_type, + snake_version=self.snake_version, + ) + except Exception as error: + await await_log(self.logger.warning(f"Gameplay DB start record failed:{error}")) + + async def record_gameplay_turn(self, game_state:dict, my_move:str, game_board:GameBoard) -> None: + if self.gameplay_database is None: + return + try: + thinking = self._extract_latest_snake_thinking(game_board) + await self.gameplay_database.record_turn(game_state, my_move, thinking) + except Exception as error: + await await_log(self.logger.warning(f"Gameplay DB turn record failed:{error}")) + + async def record_gameplay_end(self, game_state:dict) -> None: + if self.gameplay_database is None: + return + try: + await self.gameplay_database.record_game_end(game_state) + except Exception as error: + await await_log(self.logger.warning(f"Gameplay DB end record failed:{error}")) + + def _extract_latest_snake_thinking(self, game_board:GameBoard) -> dict|None: + try: + history = game_board.snake_class.get_history() + except Exception: + return None + if not isinstance(history, list) or len(history) == 0: + return None + latest = history[-1] + return latest if isinstance(latest, dict) else None