Files

82 lines
2.8 KiB
Python

import time
from server.metrics import MetricsCollector
from server.GameBoard import GameBoard
from snakes import SnakeBuilder
class GameRuntimeService:
    """Registry of the games currently being served by this process.

    Three dictionaries, all keyed by game id, are kept in step:

    * ``running_games`` -- the ``GameBoard`` instance for the game,
    * ``game_move_counts`` -- how many non-final lookups were served for it,
    * ``game_last_seen_unix`` -- wall-clock second of its latest activity.

    A metrics collector may be attached after construction; while none is
    attached, every metrics notification is silently skipped.
    """

    def __init__(self, snake_type: str, stale_game_timeout_sec: int):
        """Create an empty registry.

        Args:
            snake_type: Value forwarded to ``SnakeBuilder.build`` each time a
                new board is created.
            stale_game_timeout_sec: Idle time in seconds at (or beyond) which
                ``prune_stale_games`` evicts a game.
        """
        self.snake_type = snake_type
        self.stale_game_timeout_sec = stale_game_timeout_sec
        # Remains None until attach_metrics_collector() is called.
        self.metrics_collector: MetricsCollector | None = None
        self.running_games: dict[str, GameBoard] = {}
        self.game_move_counts: dict[str, int] = {}
        self.game_last_seen_unix: dict[str, int] = {}

    def attach_metrics_collector(self, metrics_collector: MetricsCollector) -> None:
        """Install the collector that receives lifecycle notifications."""
        self.metrics_collector = metrics_collector

    def _forget_game(self, game_id: str) -> None:
        """Remove every record held for ``game_id``; unknown ids are ignored."""
        for registry in (
            self.running_games,
            self.game_move_counts,
            self.game_last_seen_unix,
        ):
            registry.pop(game_id, None)

    async def create_game_board(self, game_state: dict) -> GameBoard:
        """Build, start and register a board for the game in ``game_state``.

        Reads ``game_state['game']`` (``id``, ``ruleset``, ``source``,
        ``map``) and ``game_state['board']`` (``width``, ``height``). An
        existing entry with the same id is replaced and its move counter is
        reset to zero.

        Returns:
            The newly created board.

        Raises:
            KeyError: If one of the keys listed above is missing.
        """
        game_info = game_state['game']
        board_info = game_state['board']
        game_id = game_info['id']

        new_game_board = GameBoard(
            game_id=game_id,
            width=board_info['width'],
            height=board_info['height'],
            ruleset=game_info['ruleset'],
            source=game_info['source'],
            map=game_info['map'],
            snake_class=SnakeBuilder.build(self.snake_type),
        )
        # The board is registered only once start_game() has completed.
        await new_game_board.start_game(game_state)

        self.running_games[game_id] = new_game_board
        self.game_move_counts[game_id] = 0
        self.game_last_seen_unix[game_id] = int(time.time())

        if self.metrics_collector is not None:
            # Reports the number of tracked games, including this one.
            await self.metrics_collector.record_game_started(len(self.game_last_seen_unix))
        return new_game_board

    async def delete_game_board(self, game_state: dict) -> None:
        """Forget the game named by ``game_state['game']['id']``.

        Deleting an id that is not tracked is a no-op.
        """
        self._forget_game(game_state['game']['id'])

    async def get_game_board(self, game_state: dict, end: bool = False) -> GameBoard:
        """Return the board for ``game_state``, creating it when unknown.

        A board created here (rather than through an explicit
        ``create_game_board`` call) is additionally reported to the metrics
        collector as auto-created. The board is always fed ``game_state`` via
        ``read_game_data``.

        Args:
            game_state: Payload containing at least ``['game']['id']``; the
                keys required by ``create_game_board`` are needed as well
                when the game is not tracked yet.
            end: When false, the move counter is incremented and the
                last-seen timestamp refreshed. When true, neither is touched
                and ``end_game`` is invoked on the board after it has read
                the state.

        Returns:
            The (possibly freshly created) board.
        """
        game_id = game_state['game']['id']

        game_board = self.running_games.get(game_id)
        if game_board is None:
            game_board = await self.create_game_board(game_state)
            if self.metrics_collector is not None:
                await self.metrics_collector.record_game_autocreated()

        if not end:
            self.game_move_counts[game_id] = self.game_move_counts.get(game_id, 0) + 1
            self.game_last_seen_unix[game_id] = int(time.time())

        game_board.read_game_data(game_state)
        if end:
            game_board.end_game(game_state)
        return game_board

    async def prune_stale_games(self) -> None:
        """Evict every game idle for at least ``stale_game_timeout_sec``.

        One stuck-removed notification is sent to the metrics collector (if
        attached) per evicted game.
        """
        if not self.game_last_seen_unix:
            return

        now = int(time.time())
        # Snapshot the ids first: the registries are mutated while looping.
        stale_ids = [
            game_id
            for game_id, last_seen in self.game_last_seen_unix.items()
            if now - last_seen >= self.stale_game_timeout_sec
        ]
        for game_id in stale_ids:
            self._forget_game(game_id)
            if self.metrics_collector is not None:
                await self.metrics_collector.record_stuck_removed()