Files
snake-python/server/Server.py
T
daniel156161 5997a1f6c1
Build and Push Docker Container / build-and-push (push) Successful in 57s
only update stuck game metric when game state backend is not redis
2026-04-04 15:33:07 +02:00

295 lines
11 KiB
Python

from server.Files import read_file
from server.game_board_stats import GameBoardStoreBuilder
from server.GameBoard import GameBoard
from snakes import SnakeBuilder
from quart_common.web.logger import await_log
from quart_common.web.logger import build_logger
from server.metrics.MetricsManager import MetricsManager
from server.metrics.ServerMetricsCollector import ServerMetricsCollector
from server.storage.StorageLoader import StorageLoader
from quart import Quart, request, jsonify
import logging, json, os, re, time
from typing import cast
class Server:
default_snake_config = {
'apiversion': '1',
'author': '',
'color': '#888888',
'head': 'default',
'tail': 'default',
'version': '1.0.0',
}
def __init__(self, data_path:str, snake_type:str, storage_type:str, debug:bool=False, check_tls_security:bool=False, game_state_backend:str='memory', game_state_redis_url:str='redis://localhost:6379/0', game_state_ttl_sec:int=900, game_state_local_cache:bool=True, metrics_backend:str='memory', metrics_redis_url:str='redis://localhost:6379/0', metrics_ttl_sec:int=None):
self.debug = debug
self.snake_type = snake_type
self.storage_type = storage_type
self.config_file = os.path.join(data_path, 'data', 'snake-config.json')
self.data_path = data_path
self.check_tls_security = check_tls_security
self.store_game_state = False
normalized_backend = (game_state_backend or 'memory').strip().lower()
self.game_state_local_cache = (game_state_local_cache and normalized_backend != 'memory')
self.game_state_store = GameBoardStoreBuilder.build(
backend=game_state_backend,
redis_url=game_state_redis_url,
ttl_seconds=game_state_ttl_sec,
)
metrics_backend_normalized = (metrics_backend or 'memory').strip().lower()
self.stale_game_timeout_sec = self._get_stale_game_timeout_sec()
self.running_games:dict[str, GameBoard] = {}
self.game_move_counts:dict[str, int] = {}
self.game_last_seen_unix:dict[str, int] = {}
self.metrics_collector = ServerMetricsCollector(
metrics_manager=MetricsManager(
backend=metrics_backend_normalized,
redis_url=metrics_redis_url,
ttl_seconds=metrics_ttl_sec,
key_prefix=os.environ.get('METRICS_REDIS_KEY_PREFIX', 'snake:metrics:worker'),
),
game_state_local_cache=self.game_state_local_cache,
metrics_backend=metrics_backend_normalized,
game_state_backend=game_state_backend,
stale_game_timeout_sec=self.stale_game_timeout_sec,
game_last_seen_unix=self.game_last_seen_unix,
game_move_counts=self.game_move_counts,
)
self.logger = build_logger('Battlesnake', debug_env_var='DEBUG_SERVER')
self.snake_version = self._get_snake_version()
self.app = Quart('Battlesnake')
# info is called when you create your Battlesnake on play.battlesnake.com
# and controls your Battlesnake's appearance
# TIP: If you open your Battlesnake URL in a browser you should see this data
@self.app.get('/')
async def on_info():
self.metrics_collector.record_http_request('info')
snake_config = await self._read_json_config_or_create()
await await_log(self.logger.info(f'INFO Snake: {snake_config}'))
return snake_config
# start is called when your Battlesnake begins a game
@self.app.post('/start')
async def on_start():
self.metrics_collector.record_http_request('start')
await self._prune_stale_games()
game_state = await request.get_json()
await self._create_game_board(game_state)
await await_log(self.logger.info(f'GAME START: {game_state['game']}'))
return 'ok'
# move is called when your Battlesnake game is running game
@self.app.post('/move')
async def on_move():
self.metrics_collector.record_http_request('move')
game_state = await request.get_json()
move_started = time.perf_counter()
game_board = cast(GameBoard, await self._get_game_board(game_state))
next_move = game_board.snake_neat_make_a_move()
await self._persist_game_board(game_state['game']['id'], game_board)
elapsed_ms = (time.perf_counter() - move_started) * 1000.0
await self.metrics_collector.record_move(next_move, elapsed_ms)
if self.debug:
await await_log(self.logger.debug(f'TURN: {game_state['turn']:3}, MOVE: {next_move:5}'))
return {'move': next_move}
# end is called when your Battlesnake finishes a game
@self.app.post('/end')
async def on_end():
self.metrics_collector.record_http_request('end')
await self._prune_stale_games()
game_state = await request.get_json()
if self.store_game_state:
game_board = cast(GameBoard, await self._get_game_board(game_state, end=True))
if self.check_tls_security:
await game_board.save(
StorageLoader.build(self.storage_type),
file_path=os.path.join(self.data_path, 'data'),
database=os.getenv('EDGEDB_DATABASE', None),
tls_security=None,
)
else:
await game_board.save(
StorageLoader.build(self.storage_type),
file_path=os.path.join(self.data_path, 'data'),
database=os.getenv('EDGEDB_DATABASE', None),
)
await await_log(self.logger.info(f'GAME ENDED: Winner is {[x['name'] for x in game_state['board']['snakes']]}'))
await self._delete_game_board(game_state)
await self.metrics_collector.record_game_end(game_state)
return 'ok'
@self.app.after_request
async def identify_server(response):
response.headers.set('server', 'battlesnake/gitea/snake-python')
return response
@self.app.after_serving
async def shutdown_state_storage():
await self.game_state_store.close()
await self.metrics_collector.close()
@self.app.get('/cleanup')
async def cleanup():
results = self._cleanup_database()
return jsonify(data=json.loads(results), status=200)
@self.app.get('/metrics')
async def metrics():
snapshot = await self.metrics_collector.build_snapshot(self.game_last_seen_unix, self.game_move_counts)
return jsonify(snapshot)
@self.app.get('/metrics/prometheus')
async def metrics_prometheus():
snapshot = await self.metrics_collector.build_snapshot(self.game_last_seen_unix, self.game_move_counts)
return (
self.metrics_collector.build_prometheus_metrics(snapshot),
200,
{'Content-Type': 'text/plain; version=0.0.4; charset=utf-8'},
)
async def run(self, host:str='0.0.0.0', port:int=8000, debug:bool=False):
logging.getLogger('werkzeug').setLevel(logging.ERROR)
await await_log(self.logger.info(f'Running Battlesnake at http://{host}:{port} with the {" ".join(re.findall("[A-Z][^A-Z]*", self.snake_type))}'))
await self.app.run_task(host=host, port=port, debug=debug)
async def _read_json_config_or_create(self) -> dict[str, str]:
snake_config = cast(dict[str, str]|None, await read_file(self.config_file, json.load))
if not snake_config:
return await self._override_snake_config_with_environment_variables(self.default_snake_config)
return await self._override_snake_config_with_environment_variables(snake_config)
async def _override_snake_config_with_environment_variables(self, config:dict[str, str]) -> dict[str, str]:
config['version'] = self.snake_version
for key in ('author', 'color', 'head', 'tail'):
value = os.environ.get(f'SNAKE_{key.upper()}')
if value is not None:
config[key] = value
version_override = os.environ.get('SNAKE_VERSION')
if version_override is not None:
config['version'] = version_override
return config
def _get_snake_version(self) -> str:
configured_version = SnakeBuilder.get_version(self.snake_type)
if configured_version:
return configured_version
try:
snake = SnakeBuilder.build(self.snake_type)
except Exception:
return self.default_snake_config['version']
version = getattr(snake, 'version', None)
if version is None:
version = getattr(snake, 'VERSION', None)
if not version:
return self.default_snake_config['version']
return str(version)
def _get_stale_game_timeout_sec(self) -> int:
value = os.getenv('SNAKE_STUCK_GAME_TIMEOUT_SEC', '180')
try:
return max(30, int(value))
except ValueError:
return 180
async def _create_game_board(self, game_state:dict) -> GameBoard:
game_id = game_state['game']['id']
new_game_board = GameBoard(
game_id=game_id,
width=game_state['board']['width'],
height=game_state['board']['height'],
ruleset=game_state['game']['ruleset'],
source=game_state['game']['source'],
map=game_state['game']['map'],
snake_class=SnakeBuilder.build(self.snake_type),
)
await new_game_board.start_game(game_state)
if self.game_state_local_cache:
self.running_games[game_id] = new_game_board
await self.game_state_store.save(game_id, new_game_board)
self.game_move_counts[game_id] = 0
self.game_last_seen_unix[game_id] = int(time.time())
await self.metrics_collector.record_game_started(len(self.game_last_seen_unix))
return new_game_board
async def _persist_game_board(self, game_id:str, game_board:GameBoard):
if self.game_state_local_cache:
self.running_games[game_id] = game_board
await self.game_state_store.save(game_id, game_board)
async def _delete_game_board(self, game_state:dict):
game_id = game_state['game']['id']
self.running_games.pop(game_id, None)
self.game_move_counts.pop(game_id, None)
self.game_last_seen_unix.pop(game_id, None)
await self.game_state_store.delete(game_id)
async def _get_game_board(self, game_state:dict, end:bool=False) -> GameBoard:
game_id = game_state['game']['id']
game_board: GameBoard
if self.game_state_local_cache and game_id in self.running_games:
game_board = self.running_games[game_id]
else:
persisted_board = await self.game_state_store.load(game_id)
if persisted_board is not None:
game_board = cast(GameBoard, persisted_board)
if self.game_state_local_cache:
self.running_games[game_id] = game_board
else:
game_board = await self._create_game_board(game_state)
await self.metrics_collector.record_game_autocreated()
if not end:
self.game_move_counts[game_id] = self.game_move_counts.get(game_id, 0) + 1
self.game_last_seen_unix[game_id] = int(time.time())
game_board.read_game_data(game_state)
if end:
game_board.end_game(game_state)
await self._persist_game_board(game_id, game_board)
return game_board
def enable_store_game_state(self):
self.store_game_state = True
def _cleanup_database(self):
storage = StorageLoader.build(self.storage_type)()
return storage.cleanup()
async def _prune_stale_games(self):
if not self.game_last_seen_unix:
return
now = int(time.time())
stale_ids = [
game_id
for game_id, last_seen in self.game_last_seen_unix.items()
if now - last_seen >= self.stale_game_timeout_sec
]
for game_id in stale_ids:
self.running_games.pop(game_id, None)
self.game_move_counts.pop(game_id, None)
self.game_last_seen_unix.pop(game_id, None)
await self.metrics_collector.record_stuck_removed()