diff --git a/server/Server.py b/server/Server.py index 2e30c04..85df3f3 100644 --- a/server/Server.py +++ b/server/Server.py @@ -25,7 +25,7 @@ class Server: 'version': '1.0.0', } - def __init__(self, data_path:str, snake_type:str, storage_type:str, debug:bool=False, check_tls_security:bool=False, game_state_backend:str='memory', game_state_redis_url:str='redis://localhost:6379/0', game_state_ttl_sec:int=900, game_state_local_cache:bool=True, metrics_backend:str='memory', metrics_redis_url:str='redis://localhost:6379/0', metrics_ttl_sec:int=None): + def __init__(self, data_path:str, snake_type:str, storage_type:str, debug:bool=False, check_tls_security:bool=False, game_state_backend:str='memory', game_state_redis_url:str='redis://localhost:6379/0', game_state_ttl_sec:int=900, game_state_local_cache:bool=True, metrics_backend:str='memory', metrics_redis_url:str='redis://localhost:6379/0', metrics_ttl_sec:int|None=None): self.debug = debug self.snake_type = snake_type self.storage_type = storage_type @@ -44,9 +44,11 @@ class Server: ) metrics_backend_normalized = (metrics_backend or 'memory').strip().lower() self.stale_game_timeout_sec = self._get_stale_game_timeout_sec() + self.running_games:dict[str, GameBoard] = {} self.game_move_counts:dict[str, int] = {} self.game_last_seen_unix:dict[str, int] = {} + self.metrics_collector = ServerMetricsCollector( metrics_manager=MetricsManager( backend=metrics_backend_normalized, @@ -61,6 +63,10 @@ class Server: game_last_seen_unix=self.game_last_seen_unix, game_move_counts=self.game_move_counts, ) + self.clear_worker_metrics_on_startup = self._env_bool('METRICS_CLEAR_WORKERS_ON_STARTUP', True) + self.worker_metrics_startup_lock_ttl_sec = self._env_int('METRICS_STARTUP_CLEANUP_LOCK_TTL_SEC', 300) + self._startup_worker_metrics_cleared = False + self.logger = build_logger('Battlesnake', debug_env_var='DEBUG_SERVER') self.snake_version = self._get_snake_version() @@ -136,6 +142,16 @@ class Server: response.headers.set('server', 'battlesnake/gitea/snake-python') return response + @self.app.before_serving + async def clear_startup_worker_metrics_once(): + if self._startup_worker_metrics_cleared: + return + self._startup_worker_metrics_cleared = True + if self.clear_worker_metrics_on_startup: + should_clear = await self.metrics_collector.should_clear_worker_metrics_on_startup(self.worker_metrics_startup_lock_ttl_sec) + if should_clear: + await self.metrics_collector.clear_worker_metrics() + @self.app.after_serving async def shutdown_state_storage(): await self.game_state_store.close() @@ -210,6 +226,21 @@ class Server: except ValueError: return 180 + def _env_bool(self, name:str, default:bool=False) -> bool: + value = os.getenv(name) + if value is None: + return default + return value.strip().lower() in {'1', 'true', 'yes', 'on'} + + def _env_int(self, name: str, default: int) -> int: + value = os.getenv(name) + if value is None: + return default + try: + return int(value) + except ValueError: + return default + async def _create_game_board(self, game_state:dict) -> GameBoard: game_id = game_state['game']['id'] new_game_board = GameBoard( diff --git a/server/metrics/MemoryMetricsStore.py b/server/metrics/MemoryMetricsStore.py index 3bdd044..93cc626 100644 --- a/server/metrics/MemoryMetricsStore.py +++ b/server/metrics/MemoryMetricsStore.py @@ -8,5 +8,11 @@ class MemoryMetricsStore: async def load_all(self) -> list[dict]: return [dict(value) for value in self._snapshots.values()] + async def clear_all(self) -> None: + self._snapshots.clear() + + async def acquire_startup_cleanup_lock(self, lock_key:str, ttl_seconds:int=300) -> bool: + return True + async def close(self) -> None: return None diff --git a/server/metrics/MetricsManager.py b/server/metrics/MetricsManager.py index cce07bc..1dbcf7e 100644 --- a/server/metrics/MetricsManager.py +++ b/server/metrics/MetricsManager.py @@ -1,10 +1,12 @@ from server.metrics import MetricsStoreBuilder -import time, os +from typing import Any, Awaitable, cast +import time, os, inspect class MetricsManager: - def __init__(self, backend:str="memory", redis_url:str="redis://localhost:6379/0", ttl_seconds:int=90, key_prefix:str="snake:metrics:worker", worker_id:str|None=None): + def __init__(self, backend:str="memory", redis_url:str="redis://localhost:6379/0", ttl_seconds:int|None=90, key_prefix:str="snake:metrics:worker", worker_id:str|None=None): self.backend = (backend or "memory").strip().lower() + self.key_prefix = key_prefix self.worker_id = worker_id or f"{os.getpid()}-{int(time.time() * 1000)}" self.store = MetricsStoreBuilder.build( backend=self.backend, @@ -30,6 +32,27 @@ class MetricsManager: async def close(self) -> None: await self.store.close() + async def clear_all_workers(self) -> None: + clear_all = getattr(self.store, "clear_all", None) + if callable(clear_all): + maybe_result = clear_all() + if inspect.isawaitable(maybe_result): + await cast(Awaitable[Any], maybe_result) + + async def acquire_startup_cleanup_lock(self, ttl_seconds:int=300) -> bool: + if self.backend != "redis": + return True + + acquire_lock = getattr(self.store, "acquire_startup_cleanup_lock", None) + if not callable(acquire_lock): + return True + + lock_key = f"{self.key_prefix}:startup_cleanup_lock" + maybe_result = acquire_lock(lock_key, ttl_seconds) + if inspect.isawaitable(maybe_result): + return bool(await cast(Awaitable[Any], maybe_result)) + return bool(maybe_result) + def _merge_snapshots(self, snapshots:list[dict]) -> dict: merged = { "games_started": 0, @@ -86,11 +109,11 @@ class MetricsManager: merged["max_turn"] = max(merged["max_turn"], int(worker.get("max_turn", 0))) merged["active_games_peak"] = max(merged["active_games_peak"], int(worker.get("active_games_peak", 0))) merged["move_response_time_ms_max"] = max(merged["move_response_time_ms_max"], float(worker.get("move_response_time_ms_max", 0.0))) - merged["last_game_start_unix"] = max(merged["last_game_start_unix"], int(worker.get("last_game_start_unix", 0)),) + merged["last_game_start_unix"] = max(merged["last_game_start_unix"], int(worker.get("last_game_start_unix", 0))) merged["last_game_end_unix"] = max(merged["last_game_end_unix"], int(worker.get("last_game_end_unix", 0))) merged["last_move_unix"] = max(merged["last_move_unix"], int(worker.get("last_move_unix", 0))) - merged["oldest_active_game_age_sec"] = max(merged["oldest_active_game_age_sec"], int(worker.get("oldest_active_game_age_sec", 0)),) - merged["stale_game_timeout_sec"] = max(merged["stale_game_timeout_sec"], int(worker.get("stale_game_timeout_sec", 0)),) + merged["oldest_active_game_age_sec"] = max(merged["oldest_active_game_age_sec"], int(worker.get("oldest_active_game_age_sec", 0))) + merged["stale_game_timeout_sec"] = max(merged["stale_game_timeout_sec"], int(worker.get("stale_game_timeout_sec", 0))) merged["game_state_local_cache_enabled"] = merged["game_state_local_cache_enabled"] or bool(worker.get("game_state_local_cache_enabled", False)) for endpoint in merged["http_requests_by_endpoint"]: diff --git a/server/metrics/RedisMetricsStore.py b/server/metrics/RedisMetricsStore.py index 05b3ef6..e4f041b 100644 --- a/server/metrics/RedisMetricsStore.py +++ b/server/metrics/RedisMetricsStore.py @@ -2,7 +2,7 @@ import inspect import json class RedisMetricsStore: - def __init__(self, redis_url:str="redis://localhost:6379/0", key_prefix:str="snake:metrics:worker", ttl_seconds:int=None, **kwargs): + def __init__(self, redis_url:str="redis://localhost:6379/0", key_prefix:str="snake:metrics:worker", ttl_seconds:int|None=None, **kwargs): self.redis_url = redis_url self.key_prefix = key_prefix self.ttl_seconds = ttl_seconds @@ -41,6 +41,17 @@ class RedisMetricsStore: continue return snapshots + async def clear_all(self) -> None: + redis = await self._get_redis() + keys = await redis.keys(f"{self.key_prefix}:*") + if keys: + await redis.delete(*keys) + + async def acquire_startup_cleanup_lock(self, lock_key:str, ttl_seconds:int=300) -> bool: + redis = await self._get_redis() + locked = await redis.set(lock_key, '1', ex=max(1, int(ttl_seconds)), nx=True) + return bool(locked) + async def close(self) -> None: if self._redis is None: return diff --git a/server/metrics/ServerMetricsCollector.py b/server/metrics/ServerMetricsCollector.py index 695ec27..5506f48 100644 --- a/server/metrics/ServerMetricsCollector.py +++ b/server/metrics/ServerMetricsCollector.py @@ -1,9 +1,9 @@ -import time - from server.metrics.MetricsManager import MetricsManager +import time + class ServerMetricsCollector: - def __init__(self, metrics_manager:MetricsManager, game_state_local_cache:bool, metrics_backend:str, game_state_backend:str, stale_game_timeout_sec:int, game_last_seen_unix:dict, game_move_counts:dict,): + def __init__(self, metrics_manager:MetricsManager, game_state_local_cache:bool, metrics_backend:str, game_state_backend:str, stale_game_timeout_sec:int, game_last_seen_unix:dict, game_move_counts:dict): self._manager = metrics_manager self._stale_game_timeout_sec = stale_game_timeout_sec self._game_last_seen_unix = game_last_seen_unix @@ -167,6 +167,12 @@ class ServerMetricsCollector: local_snapshot = self.build_local_snapshot(game_last_seen_unix, game_move_counts) return await self._manager.snapshot(local_snapshot) + async def clear_worker_metrics(self) -> None: + await self._manager.clear_all_workers() + + async def should_clear_worker_metrics_on_startup(self, lock_ttl_seconds:int=300) -> bool: + return await self._manager.acquire_startup_cleanup_lock(lock_ttl_seconds) + def build_prometheus_metrics(self, snapshot:dict) -> str: lines = [ '# HELP snake_games_started_total Total games started by snake server.', diff --git a/tests/test_MetricsManager.py b/tests/test_MetricsManager.py index 0c2ee4c..859d196 100644 --- a/tests/test_MetricsManager.py +++ b/tests/test_MetricsManager.py @@ -1,4 +1,5 @@ import unittest +from typing import Any, cast from server.metrics.MetricsManager import MetricsManager @@ -139,5 +140,26 @@ class TestMetricsManager(unittest.IsolatedAsyncioTestCase): self.assertEqual(merged["metrics_backend"], "redis") await manager.close() + async def test_acquire_startup_cleanup_lock_uses_store_for_redis_backend(self): + class FakeStore: + def __init__(self): + self.calls = [] + + async def acquire_startup_cleanup_lock(self, lock_key:str, ttl_seconds:int=300): + self.calls.append((lock_key, ttl_seconds)) + return True + + async def close(self): + return None + + manager = MetricsManager(backend="redis", key_prefix="snake:metrics:worker") + fake_store = FakeStore() + manager.store = cast(Any, fake_store) + + allowed = await manager.acquire_startup_cleanup_lock(180) + self.assertTrue(allowed) + self.assertEqual(fake_store.calls, [("snake:metrics:worker:startup_cleanup_lock", 180)]) + await manager.close() + if __name__ == "__main__": unittest.main()