Files
snake-python/server/metrics/backends/Redis.py
T

74 lines
2.4 KiB
Python

from server.metrics.backends.Template import StoreTemplate
import inspect, json
class RedisMetricsStore(StoreTemplate):
    """Metrics store that keeps one JSON snapshot per worker in Redis.

    Each snapshot is stored as a string value under ``<key_prefix>:<worker_id>``.
    The Redis client is created lazily on first use, so constructing the store
    neither imports ``redis`` nor opens a connection.
    """

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379/0",
        key_prefix: str = "snake:metrics:worker",
        ttl_seconds: int | None = None,
        **kwargs,
    ):
        """Record connection settings; no connection is opened here.

        Args:
            redis_url: URL handed to ``redis.asyncio.from_url`` on first use.
            key_prefix: Prefix shared by every key this store writes.
            ttl_seconds: Expiry passed as ``ex`` when publishing a snapshot;
                ``None`` means the key is written without an expiry.
            **kwargs: Forwarded unchanged to ``StoreTemplate.__init__``.
        """
        super().__init__(backend="redis", key_prefix=key_prefix, **kwargs)
        self.redis_url = redis_url
        self.key_prefix = key_prefix
        self.ttl_seconds = ttl_seconds
        self._redis = None

    async def _get_redis(self):
        """Return the shared async Redis client, creating it on first call.

        Raises:
            RuntimeError: If the ``redis`` package cannot be imported.
        """
        if self._redis is not None:
            return self._redis
        try:
            import redis.asyncio as aioredis  # type: ignore[import-not-found]
        except ImportError as error:  # pragma: no cover
            raise RuntimeError("Metrics backend set to redis but 'redis' package is not installed") from error
        self._redis = aioredis.from_url(self.redis_url)
        return self._redis

    def _key(self, worker_id: str) -> str:
        """Build the Redis key that holds ``worker_id``'s snapshot."""
        return f"{self.key_prefix}:{worker_id}"

    async def _scan_keys(self) -> list:
        """Return every key matching ``<key_prefix>:*``, without duplicates.

        Uses incremental SCAN rather than KEYS so the server is not blocked
        while it walks the keyspace. SCAN may report the same key more than
        once, hence the order-preserving de-duplication.
        """
        redis = await self._get_redis()
        pattern = f"{self.key_prefix}:*"
        unique_keys = dict.fromkeys([key async for key in redis.scan_iter(match=pattern)])
        return list(unique_keys)

    async def publish(self, worker_id: str, snapshot: dict) -> None:
        """Serialize ``snapshot`` to JSON and store it under the worker's key.

        The key expires after ``ttl_seconds`` when that setting is not ``None``.
        """
        redis = await self._get_redis()
        await redis.set(self._key(worker_id), json.dumps(snapshot), ex=self.ttl_seconds)

    async def load_all(self) -> list[dict]:
        """Return the decoded snapshot of every key under the prefix.

        Missing, empty, undecodable and non-dict payloads are skipped rather
        than raised, so a single bad entry cannot hide the other workers.
        """
        keys = await self._scan_keys()
        if not keys:
            return []
        redis = await self._get_redis()
        # One MGET instead of one GET per key; a key that expired between the
        # scan and the fetch simply comes back as None.
        payloads = await redis.mget(keys)
        snapshots = []
        for payload in payloads:
            if not payload:
                continue
            try:
                snapshot = json.loads(payload)
            except (TypeError, ValueError):
                # ValueError covers JSONDecodeError and UnicodeDecodeError.
                continue
            # A value that is valid JSON but not an object (for example a
            # plain "1" marker that happens to share the prefix) is not a
            # snapshot and would break callers expecting dicts.
            if isinstance(snapshot, dict):
                snapshots.append(snapshot)
        return snapshots

    async def clear_all(self) -> None:
        """Delete every key under the prefix; a no-op when none exist."""
        keys = await self._scan_keys()
        if keys:
            redis = await self._get_redis()
            await redis.delete(*keys)

    async def _acquire_startup_cleanup_lock(self, lock_key: str, ttl_seconds: int = 300) -> bool:
        """Try to create ``lock_key`` with ``SET NX`` and an expiry.

        Args:
            lock_key: Full Redis key used as the lock.
            ttl_seconds: Lock lifetime; values below 1 are raised to 1.

        Returns:
            ``True`` if this call created the key, ``False`` if it already
            existed.
        """
        redis = await self._get_redis()
        locked = await redis.set(lock_key, '1', ex=max(1, int(ttl_seconds)), nx=True)
        return bool(locked)

    async def close(self) -> None:
        """Close the Redis client if one was created.

        Prefers the client's ``aclose`` method and falls back to ``close``;
        the result is awaited only when it is awaitable. The cached client is
        dropped before closing so a failing close cannot leave a half-closed
        client behind for later calls to reuse.
        """
        if self._redis is None:
            return
        client, self._redis = self._redis, None
        closer = getattr(client, "aclose", None)
        if not callable(closer):
            closer = getattr(client, "close", None)
        if callable(closer):
            outcome = closer()
            if inspect.isawaitable(outcome):
                await outcome