106 lines
3.1 KiB
Python
106 lines
3.1 KiB
Python
import redis.asyncio as aioredis
|
|
import asyncio
|
|
import pickle
|
|
|
|
class AsyncCache:
    """Asynchronous key/value cache backed by process memory or Redis.

    Values use a Flask-Caching-like wire format: plain ``int`` values are
    stored as ASCII digits, everything else as ``b"!"`` followed by a pickle
    payload. Every key is stored under ``prefix + key``.
    """

    # Number of keys removed per DEL command while clearing a Redis backend.
    _CLEAR_BATCH_SIZE = 500

    def __init__(self, backend="memory", prefix="cache:", default_ttl=300, **kwargs):
        """Create the cache.

        Args:
            backend: ``"redis"`` selects the Redis backend; any other value
                selects the in-memory backend.
            prefix: String prepended to every key.
            default_ttl: Time-to-live in seconds used when ``set`` is called
                without a ttl. ``None`` means entries do not expire.
            **kwargs: Redis connection options, read only for the Redis
                backend: ``host`` (default ``"localhost"``), ``port``
                (default ``6379``), ``username``, ``password`` and ``db``
                (default ``0``).
        """
        self.prefix = prefix
        self.ttl = default_ttl

        if backend == "redis":
            self.backend_type = "redis"
            self.redis = aioredis.Redis(
                host=kwargs.get("host", "localhost"),
                port=int(kwargs.get("port", 6379)),
                username=kwargs.get("username"),
                password=kwargs.get("password"),
                db=int(kwargs.get("db", 0)),
                # Payloads are raw bytes (pickle / ASCII digits), so the
                # client must not decode responses to str.
                decode_responses=False,
            )
        else:
            self.backend_type = "memory"
            self._store = {}   # full key -> serialised bytes
            self._expiry = {}  # full key -> event-loop timestamp of expiry

    def _full_key(self, key):
        """Return ``key`` with the cache prefix prepended."""
        return f"{self.prefix}{key}"

    # --------- asynchronous pickle helpers -------------
    async def _pickle_dumps(self, value):
        """Run ``pickle.dumps`` in a worker thread."""
        return await asyncio.to_thread(
            pickle.dumps, value, protocol=pickle.HIGHEST_PROTOCOL
        )

    async def _pickle_loads(self, data):
        """Run ``pickle.loads`` in a worker thread."""
        return await asyncio.to_thread(pickle.loads, data)

    # --------- Flask-Caching-like serialisation -------------
    async def dump_object(self, value) -> bytes:
        """Serialise ``value`` to bytes.

        * exact ``int`` -> ASCII digits without a prefix
        * anything else -> ``b"!"`` + pickle bytes
        """
        # Exact type check on purpose: bool is a subclass of int, and
        # str(True) would be stored as b"True" and read back as a string.
        if type(value) is int:
            return str(value).encode("ascii")
        return b"!" + await self._pickle_dumps(value)

    async def load_object(self, value: bytes):
        """Deserialise bytes produced by ``dump_object``.

        * ``None`` -> ``None``
        * starts with ``b"!"`` -> unpickled object
        * otherwise -> ``int`` if it parses as one, else a UTF-8 string
          (undecodable bytes are dropped)
        """
        if value is None:
            return None
        if value.startswith(b"!"):
            # SECURITY: pickle.loads can execute arbitrary code. Only use
            # this cache with a backend whose contents are fully trusted.
            return await self._pickle_loads(value[1:])
        try:
            return int(value)
        except ValueError:
            return value.decode("utf-8", errors="ignore")

    # ----------------- public API -----------------
    async def set(self, key, value, ttl=None):
        """Store ``value`` under ``key``.

        Args:
            key: Cache key (the prefix is added automatically).
            value: Object to store.
            ttl: Time-to-live in seconds. A falsy value (``None`` or ``0``)
                falls back to the instance default.

        Returns:
            The result of the Redis ``SET`` call for the Redis backend,
            ``True`` for the in-memory backend.
        """
        ttl = ttl or self.ttl
        full_key = self._full_key(key)
        dumped = await self.dump_object(value)

        if self.backend_type == "redis":
            return await self.redis.set(full_key, dumped, ex=ttl)

        self._store[full_key] = dumped
        if ttl is None:
            # No ttl and no default: keep the entry without a deadline and
            # drop any deadline left over from a previous write.
            self._expiry.pop(full_key, None)
        else:
            self._expiry[full_key] = asyncio.get_running_loop().time() + ttl
        return True

    async def get(self, key):
        """Return the value stored under ``key``, or ``None`` if the key is
        missing or (in-memory backend) has expired."""
        full_key = self._full_key(key)

        if self.backend_type == "redis":
            data = await self.redis.get(full_key)
        else:
            expires_at = self._expiry.get(full_key)
            if (
                expires_at is not None
                and expires_at < asyncio.get_running_loop().time()
            ):
                # Expired entries are evicted lazily, on first access.
                self._store.pop(full_key, None)
                self._expiry.pop(full_key, None)
                return None
            data = self._store.get(full_key)

        return await self.load_object(data)

    async def delete(self, key):
        """Remove ``key`` from the cache.

        Returns:
            The result of the Redis ``DEL`` call for the Redis backend,
            ``True`` for the in-memory backend (even if the key was absent).
        """
        full_key = self._full_key(key)
        if self.backend_type == "redis":
            return await self.redis.delete(full_key)

        self._store.pop(full_key, None)
        self._expiry.pop(full_key, None)
        return True

    async def clear(self):
        """Remove every entry belonging to this cache.

        Returns:
            The number of keys deleted for the Redis backend (``0`` if none
            matched the prefix), ``True`` for the in-memory backend.
        """
        if self.backend_type != "redis":
            self._store.clear()
            self._expiry.clear()
            return True

        # Iterate with SCAN instead of KEYS so a large keyspace is walked
        # incrementally, and delete in bounded batches.
        removed = 0
        batch = []
        async for redis_key in self.redis.scan_iter(match=f"{self.prefix}*"):
            batch.append(redis_key)
            if len(batch) >= self._CLEAR_BATCH_SIZE:
                removed += await self.redis.delete(*batch)
                batch.clear()
        if batch:
            removed += await self.redis.delete(*batch)
        return removed
|