cleanup all worker stats when starting up servers
This commit is contained in:
+32
-1
@@ -25,7 +25,7 @@ class Server:
|
|||||||
'version': '1.0.0',
|
'version': '1.0.0',
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, data_path:str, snake_type:str, storage_type:str, debug:bool=False, check_tls_security:bool=False, game_state_backend:str='memory', game_state_redis_url:str='redis://localhost:6379/0', game_state_ttl_sec:int=900, game_state_local_cache:bool=True, metrics_backend:str='memory', metrics_redis_url:str='redis://localhost:6379/0', metrics_ttl_sec:int=None):
|
def __init__(self, data_path:str, snake_type:str, storage_type:str, debug:bool=False, check_tls_security:bool=False, game_state_backend:str='memory', game_state_redis_url:str='redis://localhost:6379/0', game_state_ttl_sec:int=900, game_state_local_cache:bool=True, metrics_backend:str='memory', metrics_redis_url:str='redis://localhost:6379/0', metrics_ttl_sec:int|None=None):
|
||||||
self.debug = debug
|
self.debug = debug
|
||||||
self.snake_type = snake_type
|
self.snake_type = snake_type
|
||||||
self.storage_type = storage_type
|
self.storage_type = storage_type
|
||||||
@@ -44,9 +44,11 @@ class Server:
|
|||||||
)
|
)
|
||||||
metrics_backend_normalized = (metrics_backend or 'memory').strip().lower()
|
metrics_backend_normalized = (metrics_backend or 'memory').strip().lower()
|
||||||
self.stale_game_timeout_sec = self._get_stale_game_timeout_sec()
|
self.stale_game_timeout_sec = self._get_stale_game_timeout_sec()
|
||||||
|
|
||||||
self.running_games:dict[str, GameBoard] = {}
|
self.running_games:dict[str, GameBoard] = {}
|
||||||
self.game_move_counts:dict[str, int] = {}
|
self.game_move_counts:dict[str, int] = {}
|
||||||
self.game_last_seen_unix:dict[str, int] = {}
|
self.game_last_seen_unix:dict[str, int] = {}
|
||||||
|
|
||||||
self.metrics_collector = ServerMetricsCollector(
|
self.metrics_collector = ServerMetricsCollector(
|
||||||
metrics_manager=MetricsManager(
|
metrics_manager=MetricsManager(
|
||||||
backend=metrics_backend_normalized,
|
backend=metrics_backend_normalized,
|
||||||
@@ -61,6 +63,10 @@ class Server:
|
|||||||
game_last_seen_unix=self.game_last_seen_unix,
|
game_last_seen_unix=self.game_last_seen_unix,
|
||||||
game_move_counts=self.game_move_counts,
|
game_move_counts=self.game_move_counts,
|
||||||
)
|
)
|
||||||
|
self.clear_worker_metrics_on_startup = self._env_bool('METRICS_CLEAR_WORKERS_ON_STARTUP', True)
|
||||||
|
self.worker_metrics_startup_lock_ttl_sec = self._env_int('METRICS_STARTUP_CLEANUP_LOCK_TTL_SEC', 300)
|
||||||
|
self._startup_worker_metrics_cleared = False
|
||||||
|
|
||||||
self.logger = build_logger('Battlesnake', debug_env_var='DEBUG_SERVER')
|
self.logger = build_logger('Battlesnake', debug_env_var='DEBUG_SERVER')
|
||||||
self.snake_version = self._get_snake_version()
|
self.snake_version = self._get_snake_version()
|
||||||
|
|
||||||
@@ -136,6 +142,16 @@ class Server:
|
|||||||
response.headers.set('server', 'battlesnake/gitea/snake-python')
|
response.headers.set('server', 'battlesnake/gitea/snake-python')
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
@self.app.before_serving
|
||||||
|
async def clear_startup_worker_metrics_once():
|
||||||
|
if self._startup_worker_metrics_cleared:
|
||||||
|
return
|
||||||
|
self._startup_worker_metrics_cleared = True
|
||||||
|
if self.clear_worker_metrics_on_startup:
|
||||||
|
should_clear = await self.metrics_collector.should_clear_worker_metrics_on_startup(self.worker_metrics_startup_lock_ttl_sec)
|
||||||
|
if should_clear:
|
||||||
|
await self.metrics_collector.clear_worker_metrics()
|
||||||
|
|
||||||
@self.app.after_serving
|
@self.app.after_serving
|
||||||
async def shutdown_state_storage():
|
async def shutdown_state_storage():
|
||||||
await self.game_state_store.close()
|
await self.game_state_store.close()
|
||||||
@@ -210,6 +226,21 @@ class Server:
|
|||||||
except ValueError:
|
except ValueError:
|
||||||
return 180
|
return 180
|
||||||
|
|
||||||
|
def _env_bool(self, name:str, default:bool=False) -> bool:
|
||||||
|
value = os.getenv(name)
|
||||||
|
if value is None:
|
||||||
|
return default
|
||||||
|
return value.strip().lower() in {'1', 'true', 'yes', 'on'}
|
||||||
|
|
||||||
|
def _env_int(self, name: str, default: int) -> int:
|
||||||
|
value = os.getenv(name)
|
||||||
|
if value is None:
|
||||||
|
return default
|
||||||
|
try:
|
||||||
|
return int(value)
|
||||||
|
except ValueError:
|
||||||
|
return default
|
||||||
|
|
||||||
async def _create_game_board(self, game_state:dict) -> GameBoard:
|
async def _create_game_board(self, game_state:dict) -> GameBoard:
|
||||||
game_id = game_state['game']['id']
|
game_id = game_state['game']['id']
|
||||||
new_game_board = GameBoard(
|
new_game_board = GameBoard(
|
||||||
|
|||||||
@@ -8,5 +8,11 @@ class MemoryMetricsStore:
|
|||||||
async def load_all(self) -> list[dict]:
|
async def load_all(self) -> list[dict]:
|
||||||
return [dict(value) for value in self._snapshots.values()]
|
return [dict(value) for value in self._snapshots.values()]
|
||||||
|
|
||||||
|
async def clear_all(self) -> None:
|
||||||
|
self._snapshots.clear()
|
||||||
|
|
||||||
|
async def acquire_startup_cleanup_lock(self, lock_key:str, ttl_seconds:int=300) -> bool:
|
||||||
|
return True
|
||||||
|
|
||||||
async def close(self) -> None:
|
async def close(self) -> None:
|
||||||
return None
|
return None
|
||||||
|
|||||||
@@ -1,10 +1,12 @@
|
|||||||
from server.metrics import MetricsStoreBuilder
|
from server.metrics import MetricsStoreBuilder
|
||||||
|
|
||||||
import time, os
|
from typing import Any, Awaitable, cast
|
||||||
|
import time, os, inspect
|
||||||
|
|
||||||
class MetricsManager:
|
class MetricsManager:
|
||||||
def __init__(self, backend:str="memory", redis_url:str="redis://localhost:6379/0", ttl_seconds:int=90, key_prefix:str="snake:metrics:worker", worker_id:str|None=None):
|
def __init__(self, backend:str="memory", redis_url:str="redis://localhost:6379/0", ttl_seconds:int|None=90, key_prefix:str="snake:metrics:worker", worker_id:str|None=None):
|
||||||
self.backend = (backend or "memory").strip().lower()
|
self.backend = (backend or "memory").strip().lower()
|
||||||
|
self.key_prefix = key_prefix
|
||||||
self.worker_id = worker_id or f"{os.getpid()}-{int(time.time() * 1000)}"
|
self.worker_id = worker_id or f"{os.getpid()}-{int(time.time() * 1000)}"
|
||||||
self.store = MetricsStoreBuilder.build(
|
self.store = MetricsStoreBuilder.build(
|
||||||
backend=self.backend,
|
backend=self.backend,
|
||||||
@@ -30,6 +32,27 @@ class MetricsManager:
|
|||||||
async def close(self) -> None:
|
async def close(self) -> None:
|
||||||
await self.store.close()
|
await self.store.close()
|
||||||
|
|
||||||
|
async def clear_all_workers(self) -> None:
|
||||||
|
clear_all = getattr(self.store, "clear_all", None)
|
||||||
|
if callable(clear_all):
|
||||||
|
maybe_result = clear_all()
|
||||||
|
if inspect.isawaitable(maybe_result):
|
||||||
|
await cast(Awaitable[Any], maybe_result)
|
||||||
|
|
||||||
|
async def acquire_startup_cleanup_lock(self, ttl_seconds:int=300) -> bool:
|
||||||
|
if self.backend != "redis":
|
||||||
|
return True
|
||||||
|
|
||||||
|
acquire_lock = getattr(self.store, "acquire_startup_cleanup_lock", None)
|
||||||
|
if not callable(acquire_lock):
|
||||||
|
return True
|
||||||
|
|
||||||
|
lock_key = f"{self.key_prefix}:startup_cleanup_lock"
|
||||||
|
maybe_result = acquire_lock(lock_key, ttl_seconds)
|
||||||
|
if inspect.isawaitable(maybe_result):
|
||||||
|
return bool(await cast(Awaitable[Any], maybe_result))
|
||||||
|
return bool(maybe_result)
|
||||||
|
|
||||||
def _merge_snapshots(self, snapshots:list[dict]) -> dict:
|
def _merge_snapshots(self, snapshots:list[dict]) -> dict:
|
||||||
merged = {
|
merged = {
|
||||||
"games_started": 0,
|
"games_started": 0,
|
||||||
@@ -86,11 +109,11 @@ class MetricsManager:
|
|||||||
merged["max_turn"] = max(merged["max_turn"], int(worker.get("max_turn", 0)))
|
merged["max_turn"] = max(merged["max_turn"], int(worker.get("max_turn", 0)))
|
||||||
merged["active_games_peak"] = max(merged["active_games_peak"], int(worker.get("active_games_peak", 0)))
|
merged["active_games_peak"] = max(merged["active_games_peak"], int(worker.get("active_games_peak", 0)))
|
||||||
merged["move_response_time_ms_max"] = max(merged["move_response_time_ms_max"], float(worker.get("move_response_time_ms_max", 0.0)))
|
merged["move_response_time_ms_max"] = max(merged["move_response_time_ms_max"], float(worker.get("move_response_time_ms_max", 0.0)))
|
||||||
merged["last_game_start_unix"] = max(merged["last_game_start_unix"], int(worker.get("last_game_start_unix", 0)),)
|
merged["last_game_start_unix"] = max(merged["last_game_start_unix"], int(worker.get("last_game_start_unix", 0)))
|
||||||
merged["last_game_end_unix"] = max(merged["last_game_end_unix"], int(worker.get("last_game_end_unix", 0)))
|
merged["last_game_end_unix"] = max(merged["last_game_end_unix"], int(worker.get("last_game_end_unix", 0)))
|
||||||
merged["last_move_unix"] = max(merged["last_move_unix"], int(worker.get("last_move_unix", 0)))
|
merged["last_move_unix"] = max(merged["last_move_unix"], int(worker.get("last_move_unix", 0)))
|
||||||
merged["oldest_active_game_age_sec"] = max(merged["oldest_active_game_age_sec"], int(worker.get("oldest_active_game_age_sec", 0)),)
|
merged["oldest_active_game_age_sec"] = max(merged["oldest_active_game_age_sec"], int(worker.get("oldest_active_game_age_sec", 0)))
|
||||||
merged["stale_game_timeout_sec"] = max(merged["stale_game_timeout_sec"], int(worker.get("stale_game_timeout_sec", 0)),)
|
merged["stale_game_timeout_sec"] = max(merged["stale_game_timeout_sec"], int(worker.get("stale_game_timeout_sec", 0)))
|
||||||
merged["game_state_local_cache_enabled"] = merged["game_state_local_cache_enabled"] or bool(worker.get("game_state_local_cache_enabled", False))
|
merged["game_state_local_cache_enabled"] = merged["game_state_local_cache_enabled"] or bool(worker.get("game_state_local_cache_enabled", False))
|
||||||
|
|
||||||
for endpoint in merged["http_requests_by_endpoint"]:
|
for endpoint in merged["http_requests_by_endpoint"]:
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ import inspect
|
|||||||
import json
|
import json
|
||||||
|
|
||||||
class RedisMetricsStore:
|
class RedisMetricsStore:
|
||||||
def __init__(self, redis_url:str="redis://localhost:6379/0", key_prefix:str="snake:metrics:worker", ttl_seconds:int=None, **kwargs):
|
def __init__(self, redis_url:str="redis://localhost:6379/0", key_prefix:str="snake:metrics:worker", ttl_seconds:int|None=None, **kwargs):
|
||||||
self.redis_url = redis_url
|
self.redis_url = redis_url
|
||||||
self.key_prefix = key_prefix
|
self.key_prefix = key_prefix
|
||||||
self.ttl_seconds = ttl_seconds
|
self.ttl_seconds = ttl_seconds
|
||||||
@@ -41,6 +41,17 @@ class RedisMetricsStore:
|
|||||||
continue
|
continue
|
||||||
return snapshots
|
return snapshots
|
||||||
|
|
||||||
|
async def clear_all(self) -> None:
|
||||||
|
redis = await self._get_redis()
|
||||||
|
keys = await redis.keys(f"{self.key_prefix}:*")
|
||||||
|
if keys:
|
||||||
|
await redis.delete(*keys)
|
||||||
|
|
||||||
|
async def acquire_startup_cleanup_lock(self, lock_key:str, ttl_seconds:int=300) -> bool:
|
||||||
|
redis = await self._get_redis()
|
||||||
|
locked = await redis.set(lock_key, '1', ex=max(1, int(ttl_seconds)), nx=True)
|
||||||
|
return bool(locked)
|
||||||
|
|
||||||
async def close(self) -> None:
|
async def close(self) -> None:
|
||||||
if self._redis is None:
|
if self._redis is None:
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
import time
|
|
||||||
|
|
||||||
from server.metrics.MetricsManager import MetricsManager
|
from server.metrics.MetricsManager import MetricsManager
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
class ServerMetricsCollector:
|
class ServerMetricsCollector:
|
||||||
def __init__(self, metrics_manager:MetricsManager, game_state_local_cache:bool, metrics_backend:str, game_state_backend:str, stale_game_timeout_sec:int, game_last_seen_unix:dict, game_move_counts:dict,):
|
def __init__(self, metrics_manager:MetricsManager, game_state_local_cache:bool, metrics_backend:str, game_state_backend:str, stale_game_timeout_sec:int, game_last_seen_unix:dict, game_move_counts:dict):
|
||||||
self._manager = metrics_manager
|
self._manager = metrics_manager
|
||||||
self._stale_game_timeout_sec = stale_game_timeout_sec
|
self._stale_game_timeout_sec = stale_game_timeout_sec
|
||||||
self._game_last_seen_unix = game_last_seen_unix
|
self._game_last_seen_unix = game_last_seen_unix
|
||||||
@@ -167,6 +167,12 @@ class ServerMetricsCollector:
|
|||||||
local_snapshot = self.build_local_snapshot(game_last_seen_unix, game_move_counts)
|
local_snapshot = self.build_local_snapshot(game_last_seen_unix, game_move_counts)
|
||||||
return await self._manager.snapshot(local_snapshot)
|
return await self._manager.snapshot(local_snapshot)
|
||||||
|
|
||||||
|
async def clear_worker_metrics(self) -> None:
|
||||||
|
await self._manager.clear_all_workers()
|
||||||
|
|
||||||
|
async def should_clear_worker_metrics_on_startup(self, lock_ttl_seconds:int=300) -> bool:
|
||||||
|
return await self._manager.acquire_startup_cleanup_lock(lock_ttl_seconds)
|
||||||
|
|
||||||
def build_prometheus_metrics(self, snapshot:dict) -> str:
|
def build_prometheus_metrics(self, snapshot:dict) -> str:
|
||||||
lines = [
|
lines = [
|
||||||
'# HELP snake_games_started_total Total games started by snake server.',
|
'# HELP snake_games_started_total Total games started by snake server.',
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import unittest
|
import unittest
|
||||||
|
from typing import Any, cast
|
||||||
|
|
||||||
from server.metrics.MetricsManager import MetricsManager
|
from server.metrics.MetricsManager import MetricsManager
|
||||||
|
|
||||||
@@ -139,5 +140,26 @@ class TestMetricsManager(unittest.IsolatedAsyncioTestCase):
|
|||||||
self.assertEqual(merged["metrics_backend"], "redis")
|
self.assertEqual(merged["metrics_backend"], "redis")
|
||||||
await manager.close()
|
await manager.close()
|
||||||
|
|
||||||
|
async def test_acquire_startup_cleanup_lock_uses_store_for_redis_backend(self):
|
||||||
|
class FakeStore:
|
||||||
|
def __init__(self):
|
||||||
|
self.calls = []
|
||||||
|
|
||||||
|
async def acquire_startup_cleanup_lock(self, lock_key:str, ttl_seconds:int=300):
|
||||||
|
self.calls.append((lock_key, ttl_seconds))
|
||||||
|
return True
|
||||||
|
|
||||||
|
async def close(self):
|
||||||
|
return None
|
||||||
|
|
||||||
|
manager = MetricsManager(backend="redis", key_prefix="snake:metrics:worker")
|
||||||
|
fake_store = FakeStore()
|
||||||
|
manager.store = cast(Any, fake_store)
|
||||||
|
|
||||||
|
allowed = await manager.acquire_startup_cleanup_lock(180)
|
||||||
|
self.assertTrue(allowed)
|
||||||
|
self.assertEqual(fake_store.calls, [("snake:metrics:worker:startup_cleanup_lock", 180)])
|
||||||
|
await manager.close()
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
Reference in New Issue
Block a user