from quart_common.web.logger import build_logger, await_log
from server.Files import read_file
from server.game_state_store import GameStateStoreBuilder
from server.GameBoard import GameBoard
from snakes import SnakeBuilder
from server.storage import StorageLoader
from server.database import GameplayDatabase
from server.metrics import (
    MetricsStoreBuilder,
    MetricsCollector,
)
from quart import (
    Quart,
    request,
    jsonify,
    render_template,
    send_from_directory,
    websocket,
)
import asyncio
import signal
import logging
import json
import os
import re
import time
from typing import cast


class Server:
    """Quart application wrapper exposing the Battlesnake API, metrics and dashboard.

    All HTTP and websocket routes are registered on ``self.app`` inside
    ``__init__``; ``run`` starts serving them.
    """

    # Fallback snake description used when the config file yields nothing.
    # Treated as read-only: callers must copy it before applying overrides.
    default_snake_config = {
        'apiversion': '1',
        'author': '',
        'color': '#888888',
        'head': 'default',
        'tail': 'default',
        'version': '1.0.0',
    }

    def __init__(
        self,
        data_path: str,
        snake_type: str,
        storage_type: str,
        debug: bool = False,
        check_tls_security: bool = False,
        game_state_backend: str = 'memory',
        game_state_redis_url: str = 'redis://localhost:6379/0',
        game_state_ttl_sec: int = 900,
        game_state_local_cache: bool = True,
        metrics_backend: str = 'memory',
        metrics_redis_url: str = 'redis://localhost:6379/0',
        metrics_ttl_sec: int | None = None,
        gameplay_db_enabled: bool = True,
        gameplay_db_path: str | None = None,
        gameplay_db_busy_timeout_ms: int = 5000,
    ):
        """Build stores, metrics, the optional gameplay database and all routes.

        Args:
            data_path: Base directory; config, database and templates are
                resolved relative to it.
            snake_type: Name handed to ``SnakeBuilder`` to build the snake.
            storage_type: Name handed to ``StorageLoader.build``.
            debug: When true, ``/move`` logs every turn at debug level.
            check_tls_security: When true, ``/end`` passes ``tls_security=None``
                to ``GameBoard.save``.
            game_state_backend: Backend name for ``GameStateStoreBuilder``.
            game_state_redis_url: Redis URL forwarded to the game state store.
            game_state_ttl_sec: TTL forwarded to the game state store.
            game_state_local_cache: Keep boards in ``running_games`` too; only
                honoured when the backend is not ``'memory'``.
            metrics_backend: Backend name for ``MetricsStoreBuilder``.
            metrics_redis_url: Redis URL forwarded to the metrics store.
            metrics_ttl_sec: TTL forwarded to the metrics store.
            gameplay_db_enabled: Create a ``GameplayDatabase`` when true.
            gameplay_db_path: Database file; defaults to
                ``<data_path>/data/database/gameplay.sqlite3``.
            gameplay_db_busy_timeout_ms: Forwarded to ``GameplayDatabase``.
        """
        self.debug = debug
        self.snake_type = snake_type
        self.storage_type = storage_type
        self.config_file = os.path.join(data_path, 'data', 'snake-config.json')
        self.data_path = data_path
        self.check_tls_security = check_tls_security
        self.store_game_state = False

        normalized_backend = (game_state_backend or 'memory').strip().lower()
        # The local cache is only used in front of a non-memory backend.
        self.game_state_local_cache = (
            game_state_local_cache and normalized_backend != 'memory'
        )
        self.game_state_store = GameStateStoreBuilder.build(
            backend=game_state_backend,
            redis_url=game_state_redis_url,
            ttl_seconds=game_state_ttl_sec,
        )
        metrics_backend_normalized = (metrics_backend or 'memory').strip().lower()
        self.stale_game_timeout_sec = self._get_stale_game_timeout_sec()

        self.running_games: dict[str, GameBoard] = {}
        self.game_move_counts: dict[str, int] = {}
        self.game_last_seen_unix: dict[str, int] = {}

        self.dashboard_game_subscribers: set[asyncio.Queue[str]] = set()
        self.dashboard_game_subscribers_lock = asyncio.Lock()
        self.dashboard_ws_tasks: set[asyncio.Task] = set()
        self.dashboard_ws_tasks_lock = asyncio.Lock()
        self.dashboard_ws_shutdown_event = asyncio.Event()
        self.dashboard_ws_shutdown_message = json.dumps({'type': 'dashboard_ws_shutdown'})

        self.metrics_collector = MetricsCollector(
            metrics_manager=MetricsStoreBuilder.build(
                backend=metrics_backend_normalized,
                redis_url=metrics_redis_url,
                ttl_seconds=metrics_ttl_sec,
                key_prefix=os.environ.get('METRICS_REDIS_KEY_PREFIX', 'snake:metrics:worker'),
            ),
            game_state_local_cache=self.game_state_local_cache,
            metrics_backend=metrics_backend_normalized,
            game_state_backend=game_state_backend,
            stale_game_timeout_sec=self.stale_game_timeout_sec,
            game_last_seen_unix=self.game_last_seen_unix,
            game_move_counts=self.game_move_counts,
        )
        self.clear_worker_metrics_on_startup = self._env_bool('METRICS_CLEAR_WORKERS_ON_STARTUP', True)
        self.worker_metrics_startup_lock_ttl_sec = self._env_int('METRICS_STARTUP_CLEANUP_LOCK_TTL_SEC', 300)
        self.dashboard_running_game_stale_sec = 600
        self._startup_worker_metrics_cleared = False

        self.logger = build_logger('Battlesnake', debug_env_var='DEBUG_SERVER')
        self.snake_version = self._get_snake_version()

        self.gameplay_database = None
        if gameplay_db_enabled:
            db_path = gameplay_db_path or os.path.join(data_path, 'data', 'database', 'gameplay.sqlite3')
            self.gameplay_database = GameplayDatabase(
                db_path=db_path,
                busy_timeout_ms=gameplay_db_busy_timeout_ms,
            )

        self.app = Quart('Battlesnake', template_folder=os.path.join(data_path, 'server', 'templates'))

        # info is called when you create your Battlesnake on play.battlesnake.com
        # and controls your Battlesnake's appearance
        # TIP: If you open your Battlesnake URL in a browser you should see this data
        @self.app.get('/')
        async def on_info():
            self.metrics_collector.record_http_request('info')
            snake_config = await self._read_json_config_or_create()
            await await_log(self.logger.info(f'INFO Snake: {snake_config}'))
            return snake_config

        # start is called when your Battlesnake begins a game
        @self.app.post('/start')
        async def on_start():
            self.metrics_collector.record_http_request('start')
            await self._prune_stale_games()
            game_state = await request.get_json()
            await self._create_game_board(game_state)
            await self._record_gameplay_start(game_state)
            await await_log(self.logger.info(f"GAME START: {game_state['game']}"))
            return 'ok'

        # move is called while your Battlesnake game is running
        @self.app.post('/move')
        async def on_move():
            self.metrics_collector.record_http_request('move')
            game_state = await request.get_json()
            move_started = time.perf_counter()
            game_board = cast(GameBoard, await self._get_game_board(game_state))
            next_move = game_board.snake_neat_make_a_move()
            await self._persist_game_board(game_state['game']['id'], game_board)
            await self._record_gameplay_turn(game_state, next_move, game_board)
            elapsed_ms = (time.perf_counter() - move_started) * 1000.0
            await self.metrics_collector.record_move(next_move, elapsed_ms)
            if self.debug:
                await await_log(self.logger.debug(f"TURN: {game_state['turn']:3}, MOVE: {next_move:5}"))
            return {'move': next_move}

        # end is called when your Battlesnake finishes a game
        @self.app.post('/end')
        async def on_end():
            self.metrics_collector.record_http_request('end')
            await self._prune_stale_games()
            game_state = await request.get_json()
            if self.store_game_state:
                game_board = cast(GameBoard, await self._get_game_board(game_state, end=True))
                if self.check_tls_security:
                    # NOTE(review): tls_security=None is passed explicitly only
                    # when the flag is set; its meaning lives in the storage layer.
                    await game_board.save(
                        StorageLoader.build(self.storage_type),
                        file_path=os.path.join(self.data_path, 'data'),
                        database=os.getenv('EDGEDB_DATABASE', None),
                        tls_security=None,
                    )
                else:
                    await game_board.save(
                        StorageLoader.build(self.storage_type),
                        file_path=os.path.join(self.data_path, 'data'),
                        database=os.getenv('EDGEDB_DATABASE', None),
                    )
            await self._record_gameplay_end(game_state)
            await self._push_dashboard_games_update(game_state)
            snake_names = [x['name'] for x in game_state['board']['snakes']]
            await await_log(self.logger.info(f'GAME ENDED: Winner is {snake_names}'))
            await self._delete_game_board(game_state)
            await self.metrics_collector.record_game_end(game_state)
            return 'ok'

        @self.app.after_request
        async def identify_server(response):
            response.headers.set('server', 'battlesnake/gitea/snake-python')
            return response

        @self.app.before_serving
        async def clear_startup_worker_metrics_once():
            # Guard so the cleanup decision is taken at most once per instance.
            if self._startup_worker_metrics_cleared:
                return
            self._startup_worker_metrics_cleared = True
            if self.clear_worker_metrics_on_startup:
                should_clear = await self.metrics_collector.should_clear_worker_metrics_on_startup(
                    self.worker_metrics_startup_lock_ttl_sec
                )
                if should_clear:
                    await self.metrics_collector.clear_worker_metrics()

        @self.app.after_serving
        async def shutdown_state_storage():
            await self.game_state_store.close()
            await self.metrics_collector.close()
            if self.gameplay_database is not None:
                await self.gameplay_database.close()

        @self.app.get('/cleanup')
        async def cleanup():
            results = self._cleanup_database()
            return jsonify(data=json.loads(results), status=200)

        @self.app.get('/metrics')
        async def metrics():
            snapshot = await self.metrics_collector.build_snapshot(
                self.game_last_seen_unix, self.game_move_counts
            )
            return jsonify(snapshot)

        @self.app.get('/metrics/prometheus')
        async def metrics_prometheus():
            snapshot = await self.metrics_collector.build_snapshot(
                self.game_last_seen_unix, self.game_move_counts
            )
            return (
                self.metrics_collector.build_prometheus_metrics(snapshot),
                200,
                {'Content-Type': 'text/plain; version=0.0.4; charset=utf-8'},
            )

        @self.app.get('/dashboard')
        async def dashboard_view():
            initial_game_id = request.args.get('game_id', '')
            initial_summary = await self._get_dashboard_summary()
            initial_games = await self._get_dashboard_games(limit=100)
            return await render_template(
                'dashboard.html',
                initial_game_id=initial_game_id,
                initial_summary=initial_summary,
                initial_games=initial_games,
            )

        # The URL variable is required: the handler takes ``game_id`` as an
        # argument and the router only supplies it when the rule declares it.
        @self.app.get('/dashboard/game/<game_id>')
        async def dashboard_game_replay(game_id: str):
            replay = await self._get_dashboard_game_replay(game_id)
            if replay is None:
                return jsonify({'error': 'game_not_found', 'game_id': game_id}), 404
            return jsonify(replay)

        # ``path`` converter so assets in sub-directories (containing '/') match.
        @self.app.get('/dashboard/customizations/<path:asset_path>')
        async def dashboard_customizations_asset(asset_path: str):
            customization_root = os.path.join(self.data_path, 'server', 'static', 'customizations')
            return await send_from_directory(customization_root, asset_path)

        @self.app.websocket('/dashboard/ws/games')
        async def dashboard_games_ws():
            async def send_with_timeout(message: str) -> None:
                # Every outbound frame is bounded so one stalled send cannot
                # hold the handler forever.
                await asyncio.wait_for(websocket.send(message), timeout=1.5)

            websocket_task = asyncio.current_task()
            if websocket_task is not None:
                await self._register_dashboard_ws_task(websocket_task)
            subscriber_queue: asyncio.Queue[str] = asyncio.Queue(maxsize=20)
            await self._register_dashboard_game_subscriber(subscriber_queue)
            try:
                initial_payload = await self._build_dashboard_games_event()
                await send_with_timeout(json.dumps(initial_payload))
                while True:
                    # Race a broadcast event against an inbound client message.
                    queue_task = asyncio.create_task(subscriber_queue.get())
                    receive_task = asyncio.create_task(websocket.receive())
                    try:
                        done, _ = await asyncio.wait(
                            {queue_task, receive_task},
                            timeout=1.0,
                            return_when=asyncio.FIRST_COMPLETED,
                        )
                        if len(done) == 0:
                            # Idle tick: only used to notice a shutdown request.
                            if self.dashboard_ws_shutdown_event.is_set():
                                await send_with_timeout(self.dashboard_ws_shutdown_message)
                                break
                            continue
                        if receive_task in done:
                            try:
                                request_payload_raw = receive_task.result()
                            except Exception:
                                # A failed receive ends the session.
                                break
                            response_event = await self._handle_dashboard_ws_request(
                                request_payload_raw
                            )
                            if response_event is not None:
                                await send_with_timeout(json.dumps(response_event))
                        if queue_task in done:
                            event_payload = queue_task.result()
                            if event_payload == self.dashboard_ws_shutdown_message:
                                await send_with_timeout(event_payload)
                                break
                            await send_with_timeout(event_payload)
                    except asyncio.TimeoutError:
                        if self.dashboard_ws_shutdown_event.is_set():
                            await send_with_timeout(self.dashboard_ws_shutdown_message)
                        # NOTE(review): the loop ends after any send timeout;
                        # confirm this matches the intended policy for slow clients.
                        break
                    finally:
                        # Never leak the per-iteration tasks.
                        for pending_task in (queue_task, receive_task):
                            if not pending_task.done():
                                pending_task.cancel()
                        await asyncio.gather(
                            queue_task, receive_task, return_exceptions=True
                        )
            except asyncio.CancelledError:
                # Deliberate: cancellation just ends the session quietly.
                pass
            except Exception:
                # Deliberate best-effort: dashboard errors must not propagate.
                pass
            finally:
                await self._unregister_dashboard_game_subscriber(subscriber_queue)
                if websocket_task is not None:
                    await self._unregister_dashboard_ws_task(websocket_task)

    async def run(self, host: str = '0.0.0.0', port: int = 8000, debug: bool = False):
        """Serve the app until SIGINT/SIGTERM is received or serving stops.

        Signal handlers are installed where the event loop supports them and
        are removed again when serving ends.
        """
        logging.getLogger('werkzeug').setLevel(logging.ERROR)
        loop = asyncio.get_running_loop()
        shutdown_event = asyncio.Event()
        installed_signal_handlers: list[signal.Signals] = []

        def on_shutdown_signal() -> None:
            self._request_dashboard_ws_shutdown()
            shutdown_event.set()

        async def shutdown_trigger() -> None:
            await shutdown_event.wait()

        for shutdown_signal in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(shutdown_signal, on_shutdown_signal)
                installed_signal_handlers.append(shutdown_signal)
            except (NotImplementedError, RuntimeError):
                # Skip signals the running loop cannot hook.
                continue

        # Split the CamelCase snake type into space-separated words for the log.
        snake_label = ' '.join(re.findall('[A-Z][^A-Z]*', self.snake_type))
        await await_log(self.logger.info(f'Running Battlesnake at http://{host}:{port} with the {snake_label}'))
        try:
            await self.app.run_task(host=host, port=port, debug=debug, shutdown_trigger=shutdown_trigger)
        finally:
            self._request_dashboard_ws_shutdown()
            for shutdown_signal in installed_signal_handlers:
                try:
                    loop.remove_signal_handler(shutdown_signal)
                except Exception:
                    continue

    async def _read_json_config_or_create(self) -> dict[str, str]:
        """Return the snake config from file (or the defaults) with env overrides."""
        snake_config = cast(dict[str, str] | None, await read_file(self.config_file, json.load))
        if not snake_config:
            # Copy: the override step mutates its argument, and the class-level
            # defaults must stay pristine across requests and instances.
            snake_config = dict(self.default_snake_config)
        return await self._override_snake_config_with_environment_variables(snake_config)

    async def _override_snake_config_with_environment_variables(self, config: dict[str, str]) -> dict[str, str]:
        """Apply the snake version and ``SNAKE_*`` env overrides to ``config`` in place.

        ``SNAKE_AUTHOR``, ``SNAKE_COLOR``, ``SNAKE_HEAD``, ``SNAKE_TAIL`` and
        ``SNAKE_VERSION`` replace the matching keys when set. Returns the same
        dict that was passed in.
        """
        config['version'] = self.snake_version
        for key in ('author', 'color', 'head', 'tail'):
            value = os.environ.get(f'SNAKE_{key.upper()}')
            if value is not None:
                config[key] = value
        version_override = os.environ.get('SNAKE_VERSION')
        if version_override is not None:
            config['version'] = version_override
        return config

    def _get_snake_version(self) -> str:
        """Resolve the snake version, falling back to the default config version.

        Tries ``SnakeBuilder.get_version`` first, then the ``version`` and
        ``VERSION`` attributes of a freshly built snake.
        """
        configured_version = SnakeBuilder.get_version(self.snake_type)
        if configured_version:
            return configured_version
        try:
            snake = SnakeBuilder.build(self.snake_type)
        except Exception:
            return self.default_snake_config['version']
        version = getattr(snake, 'version', None)
        if version is None:
            version = getattr(snake, 'VERSION', None)
        if not version:
            return self.default_snake_config['version']
        return str(version)

    def _get_stale_game_timeout_sec(self) -> int:
        """Read ``SNAKE_STUCK_GAME_TIMEOUT_SEC`` (default 180, minimum 30)."""
        value = os.getenv('SNAKE_STUCK_GAME_TIMEOUT_SEC', '180')
        try:
            return max(30, int(value))
        except ValueError:
            return 180

    def _env_bool(self, name: str, default: bool = False) -> bool:
        """Return ``default`` when ``name`` is unset, else whether it reads as true.

        ``1``, ``true``, ``yes`` and ``on`` (case-insensitive, surrounding
        whitespace ignored) count as true; any other value is false.
        """
        value = os.getenv(name)
        if value is None:
            return default
        return value.strip().lower() in {'1', 'true', 'yes', 'on'}

    def _env_int(self, name: str, default: int) -> int:
        """Return env var ``name`` as an int, or ``default`` if unset or invalid."""
        value = os.getenv(name)
        if value is None:
            return default
        try:
            return int(value)
        except ValueError:
            return default

    async def _create_game_board(self, game_state: dict) -> GameBoard:
        """Create, start, persist and start tracking a board for ``game_state``."""
        game_id = game_state['game']['id']
        new_game_board = GameBoard(
            game_id=game_id,
            width=game_state['board']['width'],
            height=game_state['board']['height'],
            ruleset=game_state['game']['ruleset'],
            source=game_state['game']['source'],
            map=game_state['game']['map'],
            snake_class=SnakeBuilder.build(self.snake_type),
        )
        await new_game_board.start_game(game_state)
        if self.game_state_local_cache:
            self.running_games[game_id] = new_game_board
        await self.game_state_store.save(game_id, new_game_board)
        self.game_move_counts[game_id] = 0
        self.game_last_seen_unix[game_id] = int(time.time())
        await self.metrics_collector.record_game_started(len(self.game_last_seen_unix))
        return new_game_board

    async def _persist_game_board(self, game_id: str, game_board: GameBoard):
        """Save the board to the store and, when enabled, the local cache."""
        if self.game_state_local_cache:
            self.running_games[game_id] = game_board
        await self.game_state_store.save(game_id, game_board)

    async def _delete_game_board(self, game_state: dict):
        """Forget the game locally and delete it from the game state store."""
        game_id = game_state['game']['id']
        self.running_games.pop(game_id, None)
        self.game_move_counts.pop(game_id, None)
        self.game_last_seen_unix.pop(game_id, None)
        await self.game_state_store.delete(game_id)

    async def _get_game_board(self, game_state: dict, end: bool = False) -> GameBoard:
        """Fetch (or auto-create) the board for ``game_state`` and feed it the state.

        Lookup order: local cache, then the game state store, then a newly
        created board. With ``end=True`` the board is also told the game ended
        and the move counter is left untouched.
        """
        game_id = game_state['game']['id']
        game_board: GameBoard
        if self.game_state_local_cache and game_id in self.running_games:
            game_board = self.running_games[game_id]
        else:
            persisted_board = await self.game_state_store.load(game_id)
            if persisted_board is not None:
                game_board = cast(GameBoard, persisted_board)
                if self.game_state_local_cache:
                    self.running_games[game_id] = game_board
            else:
                # No known board for this game: build one from this request.
                game_board = await self._create_game_board(game_state)
                await self.metrics_collector.record_game_autocreated()
        if not end:
            self.game_move_counts[game_id] = self.game_move_counts.get(game_id, 0) + 1
            self.game_last_seen_unix[game_id] = int(time.time())
        game_board.read_game_data(game_state)
        if end:
            game_board.end_game(game_state)
        await self._persist_game_board(game_id, game_board)
        return game_board

    def enable_store_game_state(self):
        """Turn on saving of the final board in the ``/end`` handler."""
        self.store_game_state = True

    def _cleanup_database(self):
        """Instantiate the configured storage and return its ``cleanup()`` result."""
        storage = StorageLoader.build(self.storage_type)()
        return storage.cleanup()

    async def _prune_stale_games(self):
        """Drop local tracking for games not seen within the stale timeout."""
        if not self.game_last_seen_unix:
            return
        now = int(time.time())
        stale_ids = [
            game_id
            for game_id, last_seen in self.game_last_seen_unix.items()
            if now - last_seen >= self.stale_game_timeout_sec
        ]
        for game_id in stale_ids:
            self.running_games.pop(game_id, None)
            self.game_move_counts.pop(game_id, None)
            self.game_last_seen_unix.pop(game_id, None)
            await self.metrics_collector.record_stuck_removed()

    async def _record_gameplay_start(self, game_state: dict) -> None:
        """Record a game start in the gameplay DB; failures are logged, not raised."""
        if self.gameplay_database is None:
            return
        try:
            await self.gameplay_database.record_game_start(
                game_state,
                snake_type=self.snake_type,
                snake_version=self.snake_version,
            )
        except Exception as error:
            await await_log(self.logger.warning(f'Gameplay DB start record failed:{error}'))

    def _extract_latest_snake_thinking(self, game_board: GameBoard) -> dict | None:
        """Return the last entry of the snake's history if it is a dict, else None."""
        try:
            history = game_board.snake_class.get_history()
        except Exception:
            return None
        if not isinstance(history, list) or len(history) == 0:
            return None
        latest = history[-1]
        return latest if isinstance(latest, dict) else None

    async def _record_gameplay_turn(self, game_state: dict, my_move: str, game_board: GameBoard) -> None:
        """Record one turn in the gameplay DB; failures are logged, not raised."""
        if self.gameplay_database is None:
            return
        try:
            thinking = self._extract_latest_snake_thinking(game_board)
            await self.gameplay_database.record_turn(game_state, my_move, thinking)
        except Exception as error:
            await await_log(self.logger.warning(f'Gameplay DB turn record failed:{error}'))

    async def _record_gameplay_end(self, game_state: dict) -> None:
        """Record a game end in the gameplay DB; failures are logged, not raised."""
        if self.gameplay_database is None:
            return
        try:
            await self.gameplay_database.record_game_end(game_state)
        except Exception as error:
            await await_log(self.logger.warning(f'Gameplay DB end record failed:{error}'))

    async def _register_dashboard_game_subscriber(self, subscriber_queue: asyncio.Queue[str]) -> None:
        """Add a websocket subscriber queue to the broadcast set."""
        async with self.dashboard_game_subscribers_lock:
            self.dashboard_game_subscribers.add(subscriber_queue)

    async def _unregister_dashboard_game_subscriber(self, subscriber_queue: asyncio.Queue[str]) -> None:
        """Remove a websocket subscriber queue from the broadcast set."""
        async with self.dashboard_game_subscribers_lock:
            self.dashboard_game_subscribers.discard(subscriber_queue)

    @staticmethod
    def _enqueue_dropping_oldest(subscriber_queue: asyncio.Queue[str], message: str) -> None:
        """Put ``message`` on the queue, discarding the oldest item if it is full."""
        if subscriber_queue.full():
            try:
                subscriber_queue.get_nowait()
            except asyncio.QueueEmpty:
                pass
        try:
            subscriber_queue.put_nowait(message)
        except asyncio.QueueFull:
            # Still full after making room: give up on this subscriber.
            pass

    async def _broadcast_dashboard_game_event(self, payload: dict) -> None:
        """JSON-encode ``payload`` once and offer it to every subscriber queue."""
        encoded_payload = json.dumps(payload)
        # Snapshot under the lock, deliver outside it.
        async with self.dashboard_game_subscribers_lock:
            subscribers = tuple(self.dashboard_game_subscribers)
        for subscriber_queue in subscribers:
            self._enqueue_dropping_oldest(subscriber_queue, encoded_payload)

    async def _register_dashboard_ws_task(self, websocket_task: asyncio.Task) -> None:
        """Track the task running a dashboard websocket handler."""
        async with self.dashboard_ws_tasks_lock:
            self.dashboard_ws_tasks.add(websocket_task)

    async def _unregister_dashboard_ws_task(self, websocket_task: asyncio.Task) -> None:
        """Stop tracking a dashboard websocket handler task."""
        async with self.dashboard_ws_tasks_lock:
            self.dashboard_ws_tasks.discard(websocket_task)

    def _request_dashboard_ws_shutdown(self) -> None:
        """Set the shutdown event and queue the shutdown message once for everyone.

        Synchronous on purpose: it is also invoked from the signal handler
        installed in ``run``, so it does not take the async subscriber lock.
        """
        if self.dashboard_ws_shutdown_event.is_set():
            return
        self.dashboard_ws_shutdown_event.set()
        for subscriber_queue in tuple(self.dashboard_game_subscribers):
            self._enqueue_dropping_oldest(subscriber_queue, self.dashboard_ws_shutdown_message)

    async def _build_dashboard_games_event(self, game_state: dict | None = None) -> dict:
        """Build a ``dashboard_games_update`` event with the game list and summary.

        ``trigger`` is ``'game_saved'`` when ``game_state`` carries a game id,
        otherwise ``'snapshot'``.
        """
        games_payload = await self._get_dashboard_games(limit=100)
        summary_payload = await self._get_dashboard_summary()
        game_id = None
        if game_state is not None:
            game_id = game_state.get('game', {}).get('id')
        return {
            'type': 'dashboard_games_update',
            'trigger': 'game_saved' if game_id else 'snapshot',
            'games': games_payload,
            'summary': summary_payload,
        }

    async def _build_dashboard_game_replay_event(self, game_id: str, request_id: str | None = None) -> dict:
        """Build a ``dashboard_game_replay`` event, or its ``game_not_found`` variant."""
        replay_payload = await self._get_dashboard_game_replay(game_id)
        if replay_payload is None:
            return {
                'type': 'dashboard_game_replay',
                'request_id': request_id,
                'game_id': game_id,
                'error': 'game_not_found',
            }
        return {
            'type': 'dashboard_game_replay',
            'request_id': request_id,
            'game_id': game_id,
            'replay': replay_payload,
        }

    async def _handle_dashboard_ws_request(self, payload_raw: object) -> dict | None:
        """Answer a ``dashboard_game_replay_request`` websocket message.

        Returns None for anything that is not a JSON object of that type, a
        ``missing_game_id`` error event when no game id is given, and the
        replay event otherwise.
        """
        if not isinstance(payload_raw, str):
            return None
        try:
            payload = json.loads(payload_raw)
        except json.JSONDecodeError:
            return None
        if not isinstance(payload, dict):
            return None
        if payload.get('type') != 'dashboard_game_replay_request':
            return None
        game_id = str(payload.get('game_id') or '').strip()
        request_id_raw = payload.get('request_id')
        request_id = None if request_id_raw is None else str(request_id_raw)
        if game_id == '':
            return {
                'type': 'dashboard_game_replay',
                'request_id': request_id,
                'error': 'missing_game_id',
            }
        return await self._build_dashboard_game_replay_event(
            game_id=game_id,
            request_id=request_id,
        )

    async def _push_dashboard_games_update(self, game_state: dict | None = None) -> None:
        """Broadcast a fresh games event to subscribers (no-op without a gameplay DB)."""
        if self.gameplay_database is None:
            return
        event_payload = await self._build_dashboard_games_event(game_state)
        await self._broadcast_dashboard_game_event(event_payload)

    async def _get_dashboard_summary(self) -> dict:
        """Return the gameplay DB summary, or a disabled/error marker dict."""
        if self.gameplay_database is None:
            return {'enabled': False}
        try:
            await self._finalize_stale_dashboard_games()
            summary = await self.gameplay_database.get_summary()
            summary['enabled'] = True
            return summary
        except Exception as error:
            await await_log(self.logger.warning(f'Gameplay DB summary failed:{error}'))
            # Error code kept in the same shape as the sibling
            # 'games_unavailable' / 'replay_unavailable' codes.
            return {'enabled': True, 'error': 'summary_unavailable'}

    async def _get_dashboard_games(self, limit: int = 50) -> dict:
        """Return up to ``limit`` games from the gameplay DB, or a marker dict."""
        if self.gameplay_database is None:
            return {'enabled': False, 'games': []}
        try:
            await self._finalize_stale_dashboard_games()
            games = await self.gameplay_database.list_games(limit=limit)
            return {'enabled': True, 'games': games}
        except Exception as error:
            await await_log(self.logger.warning(f'Gameplay DB game list failed:{error}'))
            return {'enabled': True, 'error': 'games_unavailable', 'games': []}

    async def _finalize_stale_dashboard_games(self) -> None:
        """Ask the gameplay DB to finalize stale running games; failures are logged."""
        if self.gameplay_database is None:
            return
        try:
            await self.gameplay_database.finalize_stale_running_games(
                stale_after_seconds=self.dashboard_running_game_stale_sec
            )
        except Exception as error:
            await await_log(self.logger.warning(f'Gameplay DB stale running game finalize failed:{error}'))

    async def _get_dashboard_game_replay(self, game_id: str) -> dict | None:
        """Return the replay for ``game_id``.

        Returns None when the database has no such game, and an error dict
        when the database is disabled or the lookup raises.
        """
        if self.gameplay_database is None:
            return {'enabled': False, 'error': 'database_disabled', 'game_id': game_id}
        try:
            replay = await self.gameplay_database.get_game_replay(game_id)
            if replay is None:
                return None
            replay['enabled'] = True
            return replay
        except Exception as error:
            await await_log(self.logger.warning(f'Gameplay DB replay failed:{error}'))
            return {'enabled': True, 'error': 'replay_unavailable', 'game_id': game_id}