from server.metrics.backends.Template import StoreTemplate import inspect, json class RedisMetricsStore(StoreTemplate): def __init__(self, redis_url:str="redis://localhost:6379/0", key_prefix:str="snake:metrics:worker", ttl_seconds:int|None=None, **kwargs): super().__init__(backend="redis", key_prefix=key_prefix, **kwargs) self.redis_url = redis_url self.key_prefix = key_prefix self.ttl_seconds = ttl_seconds self._redis = None async def _get_redis(self): if self._redis is not None: return self._redis try: import redis.asyncio as aioredis # type: ignore[import-not-found] except ImportError as error: # pragma: no cover raise RuntimeError("Metrics backend set to redis but 'redis' package is not installed") from error self._redis = aioredis.from_url(self.redis_url) return self._redis def _key(self, worker_id:str) -> str: return f"{self.key_prefix}:{worker_id}" async def publish(self, worker_id:str, snapshot:dict) -> None: redis = await self._get_redis() await redis.set(self._key(worker_id), json.dumps(snapshot), ex=self.ttl_seconds) async def load_all(self) -> list[dict]: redis = await self._get_redis() keys = await redis.keys(f"{self.key_prefix}:*") snapshots = [] for key in keys: payload = await redis.get(key) if not payload: continue try: snapshots.append(json.loads(payload)) except Exception: continue return snapshots async def clear_all(self) -> None: redis = await self._get_redis() keys = await redis.keys(f"{self.key_prefix}:*") if keys: await redis.delete(*keys) async def _acquire_startup_cleanup_lock(self, lock_key:str, ttl_seconds:int=300) -> bool: redis = await self._get_redis() locked = await redis.set(lock_key, '1', ex=max(1, int(ttl_seconds)), nx=True) return bool(locked) async def close(self) -> None: if self._redis is None: return aclose_method = getattr(self._redis, "aclose", None) if callable(aclose_method): maybe_result = aclose_method() if inspect.isawaitable(maybe_result): await maybe_result else: close_method = getattr(self._redis, "close", None) if callable(close_method): close_result = close_method() if inspect.isawaitable(close_result): await close_result self._redis = None