61 lines
1.8 KiB
Python
61 lines
1.8 KiB
Python
import inspect
|
|
import json
|
|
|
|
class RedisMetricsStore:
    """Store per-worker metric snapshots as JSON strings in Redis.

    Each worker's snapshot lives under the key ``"<key_prefix>:<worker_id>"``.
    The Redis client is created lazily on first use, so constructing the
    store does not require the ``redis`` package to be installed.
    """

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379/0",
        key_prefix: str = "snake:metrics:worker",
        ttl_seconds: "int | None" = None,
        **kwargs,
    ):
        """Record the connection settings; no connection is opened here.

        Args:
            redis_url: URL handed to ``redis.asyncio.from_url``.
            key_prefix: Prefix prepended (with ``":"``) to every worker id.
            ttl_seconds: Passed as ``ex=`` when writing a snapshot; ``None``
                means the key is written without an expiry.
            **kwargs: Accepted and ignored.
        """
        self.redis_url = redis_url
        self.key_prefix = key_prefix
        self.ttl_seconds = ttl_seconds
        self._redis = None  # client handle, created on demand by _get_redis()

    async def _get_redis(self):
        """Return the cached Redis client, creating it on first call.

        Raises:
            RuntimeError: If the ``redis`` package cannot be imported.
        """
        if self._redis is not None:
            return self._redis

        # Imported lazily so the dependency is only needed when this backend
        # is actually used.
        try:
            import redis.asyncio as aioredis  # type: ignore[import-not-found]
        except ImportError as error:  # pragma: no cover
            raise RuntimeError("Metrics backend set to redis but 'redis' package is not installed") from error

        self._redis = aioredis.from_url(self.redis_url)
        return self._redis

    def _key(self, worker_id: str) -> str:
        """Return the Redis key under which *worker_id*'s snapshot is stored."""
        return f"{self.key_prefix}:{worker_id}"

    async def publish(self, worker_id: str, snapshot: dict) -> None:
        """Serialize *snapshot* to JSON and write it under the worker's key.

        The key is written with ``ex=self.ttl_seconds``.
        """
        redis = await self._get_redis()
        await redis.set(self._key(worker_id), json.dumps(snapshot), ex=self.ttl_seconds)

    async def load_all(self) -> list[dict]:
        """Return every decodable snapshot stored under ``key_prefix``.

        Keys whose value is missing/empty or is not valid JSON are skipped,
        so one corrupt entry does not prevent the others from loading.
        """
        redis = await self._get_redis()
        pattern = f"{self.key_prefix}:*"

        # SCAN walks the keyspace incrementally instead of blocking the server
        # the way KEYS does. It may report a key more than once, so de-duplicate
        # while keeping first-seen order.
        scanned = [key async for key in redis.scan_iter(match=pattern)]
        keys = list(dict.fromkeys(scanned))
        if not keys:
            # MGET requires at least one key.
            return []

        snapshots = []
        # One MGET round-trip instead of one GET per key.
        for payload in await redis.mget(keys):
            if not payload:
                # Key expired between SCAN and MGET, or holds an empty value.
                continue
            try:
                snapshots.append(json.loads(payload))
            except (ValueError, TypeError, RecursionError):
                # Best-effort: ignore entries that are not decodable JSON.
                continue
        return snapshots

    async def close(self) -> None:
        """Close the Redis client if one was created, then forget it.

        Prefers the client's ``aclose`` method and falls back to ``close``;
        whichever is called is awaited only if it returns an awaitable. The
        cached handle is cleared even when closing raises, so a later call
        builds a fresh client instead of reusing a half-closed one.
        """
        client = self._redis
        if client is None:
            return

        try:
            closer = getattr(client, "aclose", None)
            if not callable(closer):
                closer = getattr(client, "close", None)
            if callable(closer):
                result = closer()
                if inspect.isawaitable(result):
                    await result
        finally:
            self._redis = None