diff --git a/server/Server.py b/server/Server.py index 1b75ed9..6282811 100644 --- a/server/Server.py +++ b/server/Server.py @@ -1,15 +1,18 @@ from server.Files import read_file + from server.game_board_stats import GameBoardStoreBuilder from server.GameBoard import GameBoard + from snakes import SnakeBuilder from quart_common.web.logger import await_log from quart_common.web.logger import build_logger -from typing import cast +from server.metrics.MetricsManager import MetricsManager from server.storage.StorageLoader import StorageLoader from quart import Quart, request, jsonify import logging, json, os, re, time +from typing import cast class Server: default_snake_config = { @@ -21,7 +24,7 @@ class Server: 'version': '1.0.0', } - def __init__(self, data_path:str, snake_type:str, storage_type:str, debug:bool=False, check_tls_security:bool=False, game_state_backend:str='memory', game_state_redis_url:str='redis://localhost:6379/0', game_state_ttl_sec:int=900, game_state_local_cache:bool=True): + def __init__(self, data_path:str, snake_type:str, storage_type:str, debug:bool=False, check_tls_security:bool=False, game_state_backend:str='memory', game_state_redis_url:str='redis://localhost:6379/0', game_state_ttl_sec:int=900, game_state_local_cache:bool=True, metrics_backend:str='memory', metrics_redis_url:str='redis://localhost:6379/0', metrics_ttl_sec:int=None): self.debug = debug self.snake_type = snake_type self.storage_type = storage_type @@ -38,6 +41,13 @@ class Server: redis_url=game_state_redis_url, ttl_seconds=game_state_ttl_sec, ) + self.metrics_backend = (metrics_backend or 'memory').strip().lower() + self.metrics_manager = MetricsManager( + backend=self.metrics_backend, + redis_url=metrics_redis_url, + ttl_seconds=metrics_ttl_sec, + key_prefix=os.environ.get('METRICS_REDIS_KEY_PREFIX', 'snake:metrics:worker'), + ) self.running_games:dict[str, GameBoard] = {} self.game_move_counts:dict[str, int] = {} @@ -74,6 +84,7 @@ class Server: 'last_move_unix': 0, 
'games_stuck_removed': 0, 'game_state_local_cache_enabled': bool(self.game_state_local_cache), + 'metrics_backend': self.metrics_backend, } self.logger = build_logger('Battlesnake', debug_env_var='DEBUG_SERVER') self.snake_version = self._get_snake_version() @@ -163,6 +174,7 @@ class Server: @self.app.after_serving async def shutdown_state_storage(): await self.game_state_store.close() + await self._close_metrics_store() @self.app.get('/cleanup') async def cleanup(): @@ -171,12 +183,13 @@ class Server: @self.app.get('/metrics') async def metrics(): - return jsonify(self._build_metrics()) + return jsonify(await self._build_metrics()) @self.app.get('/metrics/prometheus') async def metrics_prometheus(): + snapshot = await self._build_metrics() return ( - self._build_prometheus_metrics(), + self._build_prometheus_metrics(snapshot), 200, {'Content-Type': 'text/plain; version=0.0.4; charset=utf-8'}, ) @@ -338,12 +351,12 @@ class Server: else: self.metrics['losses'] += 1 - def _build_metrics(self) -> dict: + def _build_local_metrics(self) -> dict: games_ended = self.metrics['games_ended'] total_moves = self.metrics['total_moves'] avg_turns = self.metrics['total_turns'] / games_ended if games_ended else 0.0 win_rate = self.metrics['wins'] / games_ended if games_ended else 0.0 - avg_move_ms = self.metrics['move_response_time_ms_total'] / total_moves if total_moves else 0.0 + avg_move_ms = (self.metrics['move_response_time_ms_total'] / total_moves if total_moves else 0.0) now = int(time.time()) oldest_active_age = 0 @@ -369,13 +382,16 @@ class Server: 'active_games_stale': stale_candidates, } - def _record_http_request(self, endpoint:str): + def _record_http_request(self, endpoint: str): self.metrics['http_requests_total'] += 1 endpoint_counts = self.metrics['http_requests_by_endpoint'] endpoint_counts[endpoint] = endpoint_counts.get(endpoint, 0) + 1 - def _build_prometheus_metrics(self) -> str: - snapshot = self._build_metrics() + async def _build_metrics(self) -> dict: + 
local_snapshot = self._build_local_metrics() + return await self.metrics_manager.snapshot(local_snapshot) + + def _build_prometheus_metrics(self, snapshot: dict) -> str: lines = [ '# HELP snake_games_started_total Total games started by snake server.', '# TYPE snake_games_started_total counter', @@ -466,3 +482,6 @@ class Server: lines.append(f'snake_moves_by_direction_total{{direction="{direction}"}} {count}') return '\n'.join(lines) + '\n' + + async def _close_metrics_store(self) -> None: + await self.metrics_manager.close() diff --git a/server/bootstrap.py b/server/bootstrap.py index a1afce8..5e6e6b2 100644 --- a/server/bootstrap.py +++ b/server/bootstrap.py @@ -17,16 +17,34 @@ def env_bool(name:str, default:bool=False) -> bool: def build_server_from_env(default_snake_type:str) -> Server: data_path = str(Path(__file__).resolve().parent.parent) + game_state_backend = os.environ.get('GAME_STATE_BACKEND', 'memory') + game_state_redis_url = os.environ.get('GAME_STATE_REDIS_URL', 'redis://localhost:6379/0') + game_state_ttl_sec = int(os.environ.get('GAME_STATE_TTL_SEC', '900')) + + metrics_backend = os.environ.get('METRICS_BACKEND', None) + if metrics_backend is None: + metrics_backend = ('redis' if game_state_backend.strip().lower() == 'redis' else 'memory') + + metrics_redis_url = os.environ.get('METRICS_REDIS_URL', game_state_redis_url) + metrics_ttl_sec_raw = os.environ.get('METRICS_TTL_SEC', None) + if metrics_ttl_sec_raw is None: + metrics_ttl_sec = (game_state_ttl_sec if metrics_backend.strip().lower() == 'redis' else None) + else: + metrics_ttl_sec = int(metrics_ttl_sec_raw) + server = Server( data_path=data_path, snake_type=os.environ.get('SNAKE', default_snake_type), storage_type=os.environ.get('STORAGE', 'LocalStorage'), debug=env_bool('DEBUG_SERVER'), check_tls_security=False, - game_state_backend=os.environ.get('GAME_STATE_BACKEND', 'memory'), - game_state_redis_url=os.environ.get('GAME_STATE_REDIS_URL', 'redis://localhost:6379/0'), - 
game_state_ttl_sec=int(os.environ.get('GAME_STATE_TTL_SEC', '900')), + game_state_backend=game_state_backend, + game_state_redis_url=game_state_redis_url, + game_state_ttl_sec=game_state_ttl_sec, game_state_local_cache=env_bool('GAME_STATE_LOCAL_CACHE', default=True), + metrics_backend=metrics_backend, + metrics_redis_url=metrics_redis_url, + metrics_ttl_sec=metrics_ttl_sec, ) if env_bool('STORE_GAME_HISTORY'): diff --git a/server/metrics/MemoryMetricsStore.py b/server/metrics/MemoryMetricsStore.py new file mode 100644 index 0000000..3bdd044 --- /dev/null +++ b/server/metrics/MemoryMetricsStore.py @@ -0,0 +1,12 @@ +class MemoryMetricsStore: + def __init__(self, **kwargs): + self._snapshots:dict[str, dict] = {} + + async def publish(self, worker_id:str, snapshot:dict) -> None: + self._snapshots[worker_id] = dict(snapshot) + + async def load_all(self) -> list[dict]: + return [dict(value) for value in self._snapshots.values()] + + async def close(self) -> None: + return None diff --git a/server/metrics/MetricsManager.py b/server/metrics/MetricsManager.py new file mode 100644 index 0000000..31807ca --- /dev/null +++ b/server/metrics/MetricsManager.py @@ -0,0 +1,103 @@ +from server.metrics import MetricsStoreBuilder + +import time, os + +class MetricsManager: + def __init__(self, backend:str="memory", redis_url:str="redis://localhost:6379/0", ttl_seconds:int=90, key_prefix:str="snake:metrics:worker", worker_id:str|None=None): + self.backend = (backend or "memory").strip().lower() + self.worker_id = worker_id or f"{os.getpid()}-{int(time.time() * 1000)}" + self.store = MetricsStoreBuilder.build( + backend=self.backend, + redis_url=redis_url, + ttl_seconds=ttl_seconds, + key_prefix=key_prefix, + ) + + async def snapshot(self, local_snapshot:dict) -> dict: + await self.store.publish(self.worker_id, local_snapshot) + + if self.backend != "redis": + return local_snapshot + + snapshots = await self.store.load_all() + if not snapshots: + return local_snapshot + return 
self._merge_snapshots(snapshots) + + async def close(self) -> None: + await self.store.close() + + def _merge_snapshots(self, snapshots:list[dict]) -> dict: + merged = { + "games_started": 0, + "games_ended": 0, + "wins": 0, + "losses": 0, + "total_moves": 0, + "total_turns": 0, + "max_turn": 0, + "active_games_peak": 0, + "games_autocreated": 0, + "http_requests_total": 0, + "move_response_time_ms_total": 0.0, + "move_response_time_ms_max": 0.0, + "last_game_start_unix": 0, + "last_game_end_unix": 0, + "last_move_unix": 0, + "games_stuck_removed": 0, + "game_state_local_cache_enabled": False, + "metrics_backend": "redis", + "active_games": 0, + "tracked_games": 0, + "oldest_active_game_age_sec": 0, + "stale_game_timeout_sec": 0, + "active_games_stale": 0, + "http_requests_by_endpoint": {"info": 0, "start": 0, "move": 0, "end": 0}, + "move_direction_counts": { + "up": 0, + "down": 0, + "left": 0, + "right": 0, + "unknown": 0, + }, + } + + for worker in snapshots: + for metric_name in ( + "games_started", + "games_ended", + "wins", + "losses", + "total_moves", + "total_turns", + "games_autocreated", + "http_requests_total", + "games_stuck_removed", + "active_games", + "tracked_games", + "active_games_stale", + ): + merged[metric_name] += int(worker.get(metric_name, 0)) + + merged["move_response_time_ms_total"] += float(worker.get("move_response_time_ms_total", 0.0)) + merged["max_turn"] = max(merged["max_turn"], int(worker.get("max_turn", 0))) + merged["active_games_peak"] = max(merged["active_games_peak"], int(worker.get("active_games_peak", 0))) + merged["move_response_time_ms_max"] = max(merged["move_response_time_ms_max"], float(worker.get("move_response_time_ms_max", 0.0))) + merged["last_game_start_unix"] = max(merged["last_game_start_unix"], int(worker.get("last_game_start_unix", 0)),) + merged["last_game_end_unix"] = max(merged["last_game_end_unix"], int(worker.get("last_game_end_unix", 0))) + merged["last_move_unix"] = max(merged["last_move_unix"], 
int(worker.get("last_move_unix", 0))) + merged["oldest_active_game_age_sec"] = max(merged["oldest_active_game_age_sec"], int(worker.get("oldest_active_game_age_sec", 0)),) + merged["stale_game_timeout_sec"] = max(merged["stale_game_timeout_sec"], int(worker.get("stale_game_timeout_sec", 0)),) + merged["game_state_local_cache_enabled"] = merged["game_state_local_cache_enabled"] or bool(worker.get("game_state_local_cache_enabled", False)) + + for endpoint in merged["http_requests_by_endpoint"]: + merged["http_requests_by_endpoint"][endpoint] += int(worker.get("http_requests_by_endpoint", {}).get(endpoint, 0)) + for direction in merged["move_direction_counts"]: + merged["move_direction_counts"][direction] += int(worker.get("move_direction_counts", {}).get(direction, 0)) + + games_ended = merged["games_ended"] + total_moves = merged["total_moves"] + merged["avg_turns_per_game"] = round((merged["total_turns"] / games_ended) if games_ended else 0.0, 2) + merged["win_rate"] = round((merged["wins"] / games_ended) if games_ended else 0.0, 4) + merged["avg_move_response_ms"] = round((merged["move_response_time_ms_total"] / total_moves) if total_moves else 0.0, 2) + return merged diff --git a/server/metrics/RedisMetricsStore.py b/server/metrics/RedisMetricsStore.py new file mode 100644 index 0000000..05b3ef6 --- /dev/null +++ b/server/metrics/RedisMetricsStore.py @@ -0,0 +1,60 @@ +import inspect +import json + +class RedisMetricsStore: + def __init__(self, redis_url:str="redis://localhost:6379/0", key_prefix:str="snake:metrics:worker", ttl_seconds:int=None, **kwargs): + self.redis_url = redis_url + self.key_prefix = key_prefix + self.ttl_seconds = ttl_seconds + self._redis = None + + async def _get_redis(self): + if self._redis is not None: + return self._redis + + try: + import redis.asyncio as aioredis # type: ignore[import-not-found] + except ImportError as error: # pragma: no cover + raise RuntimeError("Metrics backend set to redis but 'redis' package is not installed") 
from error + + self._redis = aioredis.from_url(self.redis_url) + return self._redis + + def _key(self, worker_id:str) -> str: + return f"{self.key_prefix}:{worker_id}" + + async def publish(self, worker_id:str, snapshot:dict) -> None: + redis = await self._get_redis() + await redis.set(self._key(worker_id), json.dumps(snapshot), ex=self.ttl_seconds) + + async def load_all(self) -> list[dict]: + redis = await self._get_redis() + keys = await redis.keys(f"{self.key_prefix}:*") + snapshots = [] + for key in keys: + payload = await redis.get(key) + if not payload: + continue + try: + snapshots.append(json.loads(payload)) + except Exception: + continue + return snapshots + + async def close(self) -> None: + if self._redis is None: + return + + aclose_method = getattr(self._redis, "aclose", None) + if callable(aclose_method): + maybe_result = aclose_method() + if inspect.isawaitable(maybe_result): + await maybe_result + else: + close_method = getattr(self._redis, "close", None) + if callable(close_method): + close_result = close_method() + if inspect.isawaitable(close_result): + await close_result + + self._redis = None diff --git a/server/metrics/__init__.py b/server/metrics/__init__.py new file mode 100644 index 0000000..a91a93c --- /dev/null +++ b/server/metrics/__init__.py @@ -0,0 +1,10 @@ +from server.metrics.MemoryMetricsStore import MemoryMetricsStore +from server.metrics.RedisMetricsStore import RedisMetricsStore + +class MetricsStoreBuilder: + @classmethod + def build(self, backend:str="memory", **kwargs) -> MemoryMetricsStore|RedisMetricsStore: + selected = (backend or "memory").strip().lower() + if selected == "redis": + return RedisMetricsStore(**kwargs) + return MemoryMetricsStore(**kwargs) diff --git a/tests/test_MetricsManager.py b/tests/test_MetricsManager.py new file mode 100644 index 0000000..0c2ee4c --- /dev/null +++ b/tests/test_MetricsManager.py @@ -0,0 +1,143 @@ +import unittest + +from server.metrics.MetricsManager import MetricsManager + +class 
class TestMetricsManager(unittest.IsolatedAsyncioTestCase):
    """Tests for MetricsManager's memory pass-through and snapshot merging."""

    def _make_manager(self):
        """Build a memory-backed manager that is closed even if the test fails."""
        manager = MetricsManager(backend="memory")
        # Registered as a cleanup so close() still runs after a failed assertion.
        self.addAsyncCleanup(manager.close)
        return manager

    async def test_memory_backend_returns_local_snapshot(self):
        manager = self._make_manager()
        local = {
            "games_started": 2,
            "games_ended": 1,
            "wins": 1,
            "losses": 0,
            "total_moves": 10,
            "total_turns": 42,
            "max_turn": 42,
            "active_games_peak": 2,
            "games_autocreated": 0,
            "http_requests_total": 13,
            "move_response_time_ms_total": 30.0,
            "move_response_time_ms_max": 6.0,
            "last_game_start_unix": 1,
            "last_game_end_unix": 2,
            "last_move_unix": 3,
            "games_stuck_removed": 0,
            "game_state_local_cache_enabled": False,
            "metrics_backend": "memory",
            "active_games": 1,
            "tracked_games": 1,
            "avg_turns_per_game": 42.0,
            "win_rate": 1.0,
            "avg_move_response_ms": 3.0,
            "oldest_active_game_age_sec": 0,
            "stale_game_timeout_sec": 180,
            "active_games_stale": 0,
            "http_requests_by_endpoint": {"info": 1, "start": 1, "move": 10, "end": 1},
            "move_direction_counts": {
                "up": 4,
                "down": 2,
                "left": 2,
                "right": 2,
                "unknown": 0,
            },
        }

        snapshot = await manager.snapshot(local)

        self.assertEqual(snapshot["games_started"], 2)
        self.assertEqual(snapshot["metrics_backend"], "memory")
        # The memory backend must hand back the local snapshot unmodified.
        self.assertEqual(snapshot, local)

    async def test_merge_snapshots_aggregates_totals(self):
        manager = self._make_manager()
        first_worker = {
            "games_started": 2,
            "games_ended": 1,
            "wins": 1,
            "losses": 0,
            "total_moves": 10,
            "total_turns": 40,
            "max_turn": 40,
            "active_games_peak": 2,
            "games_autocreated": 1,
            "http_requests_total": 20,
            "move_response_time_ms_total": 50.0,
            "move_response_time_ms_max": 8.0,
            "last_game_start_unix": 10,
            "last_game_end_unix": 15,
            "last_move_unix": 16,
            "games_stuck_removed": 0,
            "active_games": 1,
            "tracked_games": 1,
            "oldest_active_game_age_sec": 5,
            "stale_game_timeout_sec": 180,
            "active_games_stale": 0,
            "game_state_local_cache_enabled": True,
            "http_requests_by_endpoint": {
                "info": 1,
                "start": 1,
                "move": 17,
                "end": 1,
            },
            "move_direction_counts": {
                "up": 5,
                "down": 2,
                "left": 1,
                "right": 2,
                "unknown": 0,
            },
        }
        second_worker = {
            "games_started": 1,
            "games_ended": 1,
            "wins": 0,
            "losses": 1,
            "total_moves": 6,
            "total_turns": 20,
            "max_turn": 20,
            "active_games_peak": 1,
            "games_autocreated": 0,
            "http_requests_total": 12,
            "move_response_time_ms_total": 20.0,
            "move_response_time_ms_max": 7.0,
            "last_game_start_unix": 12,
            "last_game_end_unix": 18,
            "last_move_unix": 19,
            "games_stuck_removed": 1,
            "active_games": 2,
            "tracked_games": 2,
            "oldest_active_game_age_sec": 7,
            "stale_game_timeout_sec": 180,
            "active_games_stale": 1,
            "game_state_local_cache_enabled": False,
            "http_requests_by_endpoint": {
                "info": 1,
                "start": 1,
                "move": 9,
                "end": 1,
            },
            "move_direction_counts": {
                "up": 1,
                "down": 1,
                "left": 2,
                "right": 2,
                "unknown": 0,
            },
        }

        merged = manager._merge_snapshots([first_worker, second_worker])

        # Summed counters.
        self.assertEqual(merged["games_started"], 3)
        self.assertEqual(merged["games_ended"], 2)
        self.assertEqual(merged["wins"], 1)
        self.assertEqual(merged["losses"], 1)
        self.assertEqual(merged["total_moves"], 16)
        self.assertEqual(merged["total_turns"], 60)
        self.assertEqual(merged["http_requests_total"], 32)
        self.assertEqual(merged["games_stuck_removed"], 1)
        self.assertEqual(merged["active_games"], 3)
        self.assertEqual(merged["tracked_games"], 3)
        self.assertEqual(merged["active_games_stale"], 1)
        self.assertEqual(merged["move_response_time_ms_total"], 70.0)
        self.assertEqual(merged["http_requests_by_endpoint"]["move"], 26)
        self.assertEqual(merged["move_direction_counts"]["left"], 3)

        # Values merged by taking the maximum across workers.
        self.assertEqual(merged["max_turn"], 40)
        self.assertEqual(merged["active_games_peak"], 2)
        self.assertEqual(merged["move_response_time_ms_max"], 8.0)
        self.assertEqual(merged["last_game_start_unix"], 12)
        self.assertEqual(merged["last_game_end_unix"], 18)
        self.assertEqual(merged["last_move_unix"], 19)
        self.assertEqual(merged["oldest_active_game_age_sec"], 7)
        self.assertEqual(merged["stale_game_timeout_sec"], 180)

        # Flag is true when any worker reports it.
        self.assertTrue(merged["game_state_local_cache_enabled"])

        # Averages are recomputed from merged totals, not averaged per worker.
        self.assertEqual(merged["avg_turns_per_game"], 30.0)
        self.assertEqual(merged["win_rate"], 0.5)
        self.assertAlmostEqual(merged["avg_move_response_ms"], 70.0 / 16, delta=0.01)

        self.assertEqual(merged["metrics_backend"], "redis")