import redis.asyncio as aioredis import asyncio import pickle class AsyncCache: def __init__(self, backend="memory", prefix="cache:", default_ttl=300, **kwargs): self.prefix = prefix self.ttl = default_ttl if backend == "redis": self.backend_type = "redis" self.redis = aioredis.Redis( host=kwargs.get("host", "localhost"), port=int(kwargs.get("port", 6379)), username=kwargs.get("username"), password=kwargs.get("password"), db=int(kwargs.get("db", 0)), decode_responses=False, ) else: self.backend_type = "memory" self._store = {} self._expiry = {} # --------- asynchrone Pickle-Helfer ------------- async def _pickle_dumps(self, value): """Pickle.dumps in Threadpool ausführen.""" return await asyncio.to_thread(pickle.dumps, value, protocol=pickle.HIGHEST_PROTOCOL) async def _pickle_loads(self, data): """Pickle.loads in Threadpool ausführen.""" return await asyncio.to_thread(pickle.loads, data) # --------- Flask-Caching-ähnliche Serialisierung ------------- async def dump_object(self, value) -> bytes: """ * int -> ASCII-Bytes ohne Prefix * alles andere -> b"!" + Pickle-Bytes """ if isinstance(value, int): return str(value).encode("ascii") pickled = await self._pickle_dumps(value) return b"!" + pickled async def load_object(self, value: bytes): """ * beginnt mit b"!" -> Pickle.loads * ansonsten -> int oder UTF-8-String """ if value is None: return None if value.startswith(b"!"): return await self._pickle_loads(value[1:]) try: return int(value) except ValueError: return value.decode("utf-8", errors="ignore") # ----------------- Öffentliche API ----------------- async def set(self, key, value, ttl=None): ttl = ttl or self.ttl full_key = f"{self.prefix}{key}" dumped = await self.dump_object(value) if self.backend_type == "redis": return await self.redis.set(full_key, dumped, ex=ttl) else: self._store[full_key] = dumped self._expiry[full_key] = asyncio.get_event_loop().time() + ttl return True async def get(self, key): full_key = f"{self.prefix}{key}" if self.backend_type == "redis": data = await self.redis.get(full_key) else: expire = self._expiry.get(full_key) if expire and expire < asyncio.get_event_loop().time(): self._store.pop(full_key, None) self._expiry.pop(full_key, None) return None data = self._store.get(full_key) return await self.load_object(data) async def delete(self, key): full_key = f"{self.prefix}{key}" if self.backend_type == "redis": return await self.redis.delete(full_key) else: self._store.pop(full_key, None) self._expiry.pop(full_key, None) return True async def clear(self): if self.backend_type == "redis": keys = await self.redis.keys(f"{self.prefix}*") if keys: return await self.redis.delete(*keys) return 0 else: self._store.clear() self._expiry.clear() return True