import time from server.metrics import MetricsCollector from server.GameBoard import GameBoard from snakes import SnakeBuilder class GameRuntimeService: def __init__(self, snake_type:str, stale_game_timeout_sec:int): self.snake_type = snake_type self.stale_game_timeout_sec = stale_game_timeout_sec self.metrics_collector = None self.running_games: dict[str, GameBoard] = {} self.game_move_counts: dict[str, int] = {} self.game_last_seen_unix: dict[str, int] = {} def attach_metrics_collector(self, metrics_collector:MetricsCollector) -> None: self.metrics_collector = metrics_collector async def create_game_board(self, game_state:dict) -> GameBoard: game_id = game_state['game']['id'] new_game_board = GameBoard( game_id=game_id, width=game_state['board']['width'], height=game_state['board']['height'], ruleset=game_state['game']['ruleset'], source=game_state['game']['source'], map=game_state['game']['map'], snake_class=SnakeBuilder.build(self.snake_type), ) await new_game_board.start_game(game_state) self.running_games[game_id] = new_game_board self.game_move_counts[game_id] = 0 self.game_last_seen_unix[game_id] = int(time.time()) if self.metrics_collector is not None: await self.metrics_collector.record_game_started(len(self.game_last_seen_unix)) return new_game_board async def delete_game_board(self, game_state:dict) -> None: game_id = game_state['game']['id'] self.running_games.pop(game_id, None) self.game_move_counts.pop(game_id, None) self.game_last_seen_unix.pop(game_id, None) async def get_game_board(self, game_state:dict, end:bool=False) -> GameBoard: game_id = game_state['game']['id'] if game_id in self.running_games: game_board = self.running_games[game_id] else: game_board = await self.create_game_board(game_state) if self.metrics_collector is not None: await self.metrics_collector.record_game_autocreated() if not end: self.game_move_counts[game_id] = self.game_move_counts.get(game_id, 0) + 1 self.game_last_seen_unix[game_id] = int(time.time()) game_board.read_game_data(game_state) if end: game_board.end_game(game_state) return game_board async def prune_stale_games(self) -> None: if not self.game_last_seen_unix: return now = int(time.time()) stale_ids = [ game_id for game_id, last_seen in self.game_last_seen_unix.items() if now - last_seen >= self.stale_game_timeout_sec ] for game_id in stale_ids: self.running_games.pop(game_id, None) self.game_move_counts.pop(game_id, None) self.game_last_seen_unix.pop(game_id, None) if self.metrics_collector is not None: await self.metrics_collector.record_stuck_removed()