# Source: snake-python/server/Server.py (Python, 326 lines, 12 KiB)

from server.Files import read_file
from server.game_board_stats import GameBoardStoreBuilder
from server.GameBoard import GameBoard
from snakes import SnakeBuilder
from quart_common.web.logger import await_log
from quart_common.web.logger import build_logger
from server.metrics.MetricsManager import MetricsManager
from server.metrics.MetricsCollector import MetricsCollector
from server.storage.StorageLoader import StorageLoader
from quart import Quart, request, jsonify
import logging, json, os, re, time
from typing import cast
class Server:
# Fallback snake configuration. _read_json_config_or_create() uses it when the
# config file yields nothing, and _get_snake_version() falls back to its
# 'version' entry when no version can be obtained from SnakeBuilder.
default_snake_config = {
'apiversion': '1',
'author': '',
'color': '#888888',
'head': 'default',
'tail': 'default',
'version': '1.0.0',
}
def __init__(
    self,
    data_path:str,
    snake_type:str,
    storage_type:str,
    debug:bool=False,
    check_tls_security:bool=False,
    game_state_backend:str='memory',
    game_state_redis_url:str='redis://localhost:6379/0',
    game_state_ttl_sec:int=900,
    game_state_local_cache:bool=True,
    metrics_backend:str='memory',
    metrics_redis_url:str='redis://localhost:6379/0',
    metrics_ttl_sec:int|None=None,
):
    """Build the server state and register every HTTP route on a Quart app.

    Args:
        data_path: Base directory; the snake config is read from
            ``<data_path>/data/snake-config.json`` and ``<data_path>/data`` is
            passed as ``file_path`` when a finished game is saved.
        snake_type: Identifier handed to ``SnakeBuilder`` to build the snake
            and to look up its version.
        storage_type: Identifier handed to ``StorageLoader.build``.
        debug: When true, every ``/move`` request logs the turn and the move.
        check_tls_security: When true, ``/end`` passes ``tls_security=None``
            to ``GameBoard.save``; otherwise the argument is omitted.
        game_state_backend: Backend name forwarded to
            ``GameBoardStoreBuilder.build``.
        game_state_redis_url: Redis URL forwarded to the game state store.
        game_state_ttl_sec: TTL (seconds) forwarded to the game state store.
        game_state_local_cache: Keep boards in ``self.running_games`` as well;
            only honoured when the normalised backend is not ``'memory'``.
        metrics_backend: Backend name (normalised) for ``MetricsManager``.
        metrics_redis_url: Redis URL forwarded to ``MetricsManager``.
        metrics_ttl_sec: TTL forwarded to ``MetricsManager``.

    Environment variables read here: ``METRICS_REDIS_KEY_PREFIX``,
    ``METRICS_CLEAR_WORKERS_ON_STARTUP``,
    ``METRICS_STARTUP_CLEANUP_LOCK_TTL_SEC`` and (through
    ``_get_stale_game_timeout_sec``) ``SNAKE_STUCK_GAME_TIMEOUT_SEC``.
    """
    self.debug = debug
    self.snake_type = snake_type
    self.storage_type = storage_type
    self.config_file = os.path.join(data_path, 'data', 'snake-config.json')
    self.data_path = data_path
    self.check_tls_security = check_tls_security
    # Saving finished games is opt-in; see enable_store_game_state().
    self.store_game_state = False

    normalized_backend = (game_state_backend or 'memory').strip().lower()
    # The local cache is only used in front of a non-memory backend.
    self.game_state_local_cache = (game_state_local_cache and normalized_backend != 'memory')
    # NOTE(review): the store builder receives the raw backend string while the
    # metrics manager below receives a normalised one - confirm this is intended.
    self.game_state_store = GameBoardStoreBuilder.build(
        backend=game_state_backend,
        redis_url=game_state_redis_url,
        ttl_seconds=game_state_ttl_sec,
    )

    metrics_backend_normalized = (metrics_backend or 'memory').strip().lower()
    self.stale_game_timeout_sec = self._get_stale_game_timeout_sec()
    self.running_games:dict[str, GameBoard] = {}
    self.game_move_counts:dict[str, int] = {}
    self.game_last_seen_unix:dict[str, int] = {}
    self.metrics_collector = MetricsCollector(
        metrics_manager=MetricsManager(
            backend=metrics_backend_normalized,
            redis_url=metrics_redis_url,
            ttl_seconds=metrics_ttl_sec,
            key_prefix=os.environ.get('METRICS_REDIS_KEY_PREFIX', 'snake:metrics:worker'),
        ),
        game_state_local_cache=self.game_state_local_cache,
        metrics_backend=metrics_backend_normalized,
        game_state_backend=game_state_backend,
        stale_game_timeout_sec=self.stale_game_timeout_sec,
        game_last_seen_unix=self.game_last_seen_unix,
        game_move_counts=self.game_move_counts,
    )
    self.clear_worker_metrics_on_startup = self._env_bool('METRICS_CLEAR_WORKERS_ON_STARTUP', True)
    self.worker_metrics_startup_lock_ttl_sec = self._env_int('METRICS_STARTUP_CLEANUP_LOCK_TTL_SEC', 300)
    self._startup_worker_metrics_cleared = False
    self.logger = build_logger('Battlesnake', debug_env_var='DEBUG_SERVER')
    self.snake_version = self._get_snake_version()
    self.app = Quart('Battlesnake')

    # info is called when you create your Battlesnake on play.battlesnake.com
    # and controls your Battlesnake's appearance.
    # TIP: If you open your Battlesnake URL in a browser you should see this data.
    @self.app.get('/')
    async def on_info():
        self.metrics_collector.record_http_request('info')
        snake_config = await self._read_json_config_or_create()
        await await_log(self.logger.info(f'INFO Snake: {snake_config}'))
        return snake_config

    # start is called when your Battlesnake begins a game.
    @self.app.post('/start')
    async def on_start():
        self.metrics_collector.record_http_request('start')
        await self._prune_stale_games()
        game_state = await request.get_json()
        await self._create_game_board(game_state)
        # Inner quotes differ from the outer ones so the f-string also parses
        # on interpreters older than Python 3.12.
        await await_log(self.logger.info(f'GAME START: {game_state["game"]}'))
        return 'ok'

    # move is called on every turn while your Battlesnake's game is running.
    @self.app.post('/move')
    async def on_move():
        self.metrics_collector.record_http_request('move')
        game_state = await request.get_json()
        # Timing covers board lookup, move selection and persistence.
        move_started = time.perf_counter()
        game_board = cast(GameBoard, await self._get_game_board(game_state))
        next_move = game_board.snake_neat_make_a_move()
        await self._persist_game_board(game_state['game']['id'], game_board)
        elapsed_ms = (time.perf_counter() - move_started) * 1000.0
        await self.metrics_collector.record_move(next_move, elapsed_ms)
        if self.debug:
            await await_log(self.logger.debug(f'TURN: {game_state["turn"]:3}, MOVE: {next_move:5}'))
        return {'move': next_move}

    # end is called when your Battlesnake finishes a game.
    @self.app.post('/end')
    async def on_end():
        self.metrics_collector.record_http_request('end')
        await self._prune_stale_games()
        game_state = await request.get_json()
        if self.store_game_state:
            game_board = cast(GameBoard, await self._get_game_board(game_state, end=True))
            save_kwargs = {
                'file_path': os.path.join(self.data_path, 'data'),
                'database': os.getenv('EDGEDB_DATABASE', None),
            }
            # tls_security is only passed (as None) when TLS checking is on;
            # otherwise save() is left to use its own default.
            if self.check_tls_security:
                save_kwargs['tls_security'] = None
            await game_board.save(StorageLoader.build(self.storage_type), **save_kwargs)
        await await_log(self.logger.info(f'GAME ENDED: Winner is {[x["name"] for x in game_state["board"]["snakes"]]}'))
        await self._delete_game_board(game_state)
        await self.metrics_collector.record_game_end(game_state)
        return 'ok'

    @self.app.after_request
    async def identify_server(response):
        response.headers.set('server', 'battlesnake/gitea/snake-python')
        return response

    @self.app.before_serving
    async def clear_startup_worker_metrics_once():
        # The flag guards against running the cleanup more than once per instance.
        if self._startup_worker_metrics_cleared:
            return
        self._startup_worker_metrics_cleared = True
        if self.clear_worker_metrics_on_startup:
            should_clear = await self.metrics_collector.should_clear_worker_metrics_on_startup(self.worker_metrics_startup_lock_ttl_sec)
            if should_clear:
                await self.metrics_collector.clear_worker_metrics()

    @self.app.after_serving
    async def shutdown_state_storage():
        await self.game_state_store.close()
        await self.metrics_collector.close()

    @self.app.get('/cleanup')
    async def cleanup():
        # _cleanup_database() is expected to return a JSON string, since its
        # result is fed to json.loads - TODO confirm against the storage class.
        results = self._cleanup_database()
        return jsonify(data=json.loads(results), status=200)

    @self.app.get('/metrics')
    async def metrics():
        snapshot = await self.metrics_collector.build_snapshot(self.game_last_seen_unix, self.game_move_counts)
        return jsonify(snapshot)

    @self.app.get('/metrics/prometheus')
    async def metrics_prometheus():
        snapshot = await self.metrics_collector.build_snapshot(self.game_last_seen_unix, self.game_move_counts)
        return (
            self.metrics_collector.build_prometheus_metrics(snapshot),
            200,
            {'Content-Type': 'text/plain; version=0.0.4; charset=utf-8'},
        )
async def run(self, host:str='0.0.0.0', port:int=8000, debug:bool=False):
    """Log a startup banner and serve the Quart app until it stops.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.
        debug: Forwarded to ``Quart.run_task``.
    """
    werkzeug_logger = logging.getLogger('werkzeug')
    werkzeug_logger.setLevel(logging.ERROR)
    # Split the snake type at capital letters so it reads as separate words.
    snake_words = re.findall("[A-Z][^A-Z]*", self.snake_type)
    snake_label = " ".join(snake_words)
    banner = f'Running Battlesnake at http://{host}:{port} with the {snake_label}'
    await await_log(self.logger.info(banner))
    await self.app.run_task(host=host, port=port, debug=debug)
async def _read_json_config_or_create(self) -> dict[str, str]:
    """Return the snake config with environment overrides applied.

    The config is loaded from ``self.config_file`` via ``read_file``; when
    that yields nothing (``None`` or an empty mapping) the
    ``default_snake_config`` object itself is used as the base. Nothing is
    written to disk by this method.
    """
    loaded = cast(dict[str, str]|None, await read_file(self.config_file, json.load))
    base_config = loaded if loaded else self.default_snake_config
    return await self._override_snake_config_with_environment_variables(base_config)
async def _override_snake_config_with_environment_variables(self, config:dict[str, str]) -> dict[str, str]:
config['version'] = self.snake_version
for key in ('author', 'color', 'head', 'tail'):
value = os.environ.get(f'SNAKE_{key.upper()}')
if value is not None:
config[key] = value
version_override = os.environ.get('SNAKE_VERSION')
if version_override is not None:
config['version'] = version_override
return config
def _get_snake_version(self) -> str:
    """Work out the version string reported for the configured snake.

    Lookup order: ``SnakeBuilder.get_version``; then a ``version`` or
    ``VERSION`` attribute on whatever ``SnakeBuilder.build`` returns; finally
    the ``version`` entry of ``default_snake_config``. Any exception raised
    while building the snake also results in the default.
    """
    declared = SnakeBuilder.get_version(self.snake_type)
    if declared:
        return declared

    try:
        built = SnakeBuilder.build(self.snake_type)
    except Exception:
        # Best effort: a snake that cannot be built just reports the default.
        return self.default_snake_config['version']

    detected = getattr(built, 'version', None)
    if detected is None:
        detected = getattr(built, 'VERSION', None)
    return str(detected) if detected else self.default_snake_config['version']
def _get_stale_game_timeout_sec(self) -> int:
value = os.getenv('SNAKE_STUCK_GAME_TIMEOUT_SEC', '180')
try:
return max(30, int(value))
except ValueError:
return 180
def _env_bool(self, name:str, default:bool=False) -> bool:
value = os.getenv(name)
if value is None:
return default
return value.strip().lower() in {'1', 'true', 'yes', 'on'}
def _env_int(self, name: str, default: int) -> int:
value = os.getenv(name)
if value is None:
return default
try:
return int(value)
except ValueError:
return default
async def _create_game_board(self, game_state:dict) -> GameBoard:
    """Create, start and register a new ``GameBoard`` for ``game_state``.

    The board is built from the ``game`` and ``board`` sections of the
    request payload, started, stored in the local cache (when enabled) and in
    the game state store. The per-game move counter is reset to 0, the
    last-seen timestamp is set to now, and the metrics collector is told a
    game started.

    Returns:
        The newly created board.
    """
    game_info = game_state['game']
    game_id = game_info['id']
    board_info = game_state['board']

    board = GameBoard(
        game_id=game_id,
        width=board_info['width'],
        height=board_info['height'],
        ruleset=game_info['ruleset'],
        source=game_info['source'],
        map=game_info['map'],
        snake_class=SnakeBuilder.build(self.snake_type),
    )
    await board.start_game(game_state)

    if self.game_state_local_cache:
        self.running_games[game_id] = board
    await self.game_state_store.save(game_id, board)

    self.game_move_counts[game_id] = 0
    self.game_last_seen_unix[game_id] = int(time.time())
    # The collector receives the number of games currently being tracked.
    tracked_games = len(self.game_last_seen_unix)
    await self.metrics_collector.record_game_started(tracked_games)
    return board
async def _persist_game_board(self, game_id:str, game_board:GameBoard):
    """Store ``game_board`` under ``game_id``.

    The board always goes to the game state store; it is additionally kept in
    ``self.running_games`` when the local cache is enabled.
    """
    cache_locally = self.game_state_local_cache
    if cache_locally:
        self.running_games[game_id] = game_board
    store = self.game_state_store
    await store.save(game_id, game_board)
async def _delete_game_board(self, game_state:dict):
game_id = game_state['game']['id']
self.running_games.pop(game_id, None)
self.game_move_counts.pop(game_id, None)
self.game_last_seen_unix.pop(game_id, None)
await self.game_state_store.delete(game_id)
async def _get_game_board(self, game_state:dict, end:bool=False) -> GameBoard:
    """Resolve the board for ``game_state`` and feed it the latest payload.

    Lookup order: local cache (when enabled), then the game state store; if
    neither has the game, a new board is created and the auto-creation is
    recorded in the metrics. The board then reads the payload, is ended when
    ``end`` is true, and is persisted before being returned.

    Args:
        game_state: Request payload; must contain ``game.id``.
        end: True for the final request of a game. The move counter is only
            incremented when this is false.

    Returns:
        The up-to-date board for the game.
    """
    game_id = game_state['game']['id']
    board: GameBoard

    if self.game_state_local_cache and game_id in self.running_games:
        board = self.running_games[game_id]
    else:
        stored = await self.game_state_store.load(game_id)
        if stored is None:
            # Unknown game (e.g. /start was never seen): create it on the fly.
            board = await self._create_game_board(game_state)
            await self.metrics_collector.record_game_autocreated()
        else:
            board = cast(GameBoard, stored)
            if self.game_state_local_cache:
                self.running_games[game_id] = board

    if not end:
        moves_so_far = self.game_move_counts.get(game_id, 0)
        self.game_move_counts[game_id] = moves_so_far + 1
    self.game_last_seen_unix[game_id] = int(time.time())

    board.read_game_data(game_state)
    if end:
        board.end_game(game_state)
    await self._persist_game_board(game_id, board)
    return board
def enable_store_game_state(self) -> None:
    """Turn on saving of finished games (checked by the ``/end`` handler)."""
    self.store_game_state = True
def _cleanup_database(self):
    """Instantiate the configured storage and return the result of its ``cleanup()``."""
    # StorageLoader.build returns something callable; calling it yields the storage object.
    storage_factory = StorageLoader.build(self.storage_type)
    return storage_factory().cleanup()
async def _prune_stale_games(self):
if not self.game_last_seen_unix:
return
now = int(time.time())
stale_ids = [
game_id
for game_id, last_seen in self.game_last_seen_unix.items()
if now - last_seen >= self.stale_game_timeout_sec
]
for game_id in stale_ids:
self.running_games.pop(game_id, None)
self.game_move_counts.pop(game_id, None)
self.game_last_seen_unix.pop(game_id, None)
await self.metrics_collector.record_stuck_removed()