Files
simple-nanoshare/my_modules/AsyncCache.py
T

106 lines
3.1 KiB
Python

import redis.asyncio as aioredis
import asyncio
import pickle
class AsyncCache:
def __init__(self, backend="memory", prefix="cache:", default_ttl=300, **kwargs):
self.prefix = prefix
self.ttl = default_ttl
if backend == "redis":
self.backend_type = "redis"
self.redis = aioredis.Redis(
host=kwargs.get("host", "localhost"),
port=int(kwargs.get("port", 6379)),
username=kwargs.get("username"),
password=kwargs.get("password"),
db=int(kwargs.get("db", 0)),
decode_responses=False,
)
else:
self.backend_type = "memory"
self._store = {}
self._expiry = {}
# --------- asynchrone Pickle-Helfer -------------
async def _pickle_dumps(self, value):
"""Pickle.dumps in Threadpool ausführen."""
return await asyncio.to_thread(pickle.dumps, value, protocol=pickle.HIGHEST_PROTOCOL)
async def _pickle_loads(self, data):
"""Pickle.loads in Threadpool ausführen."""
return await asyncio.to_thread(pickle.loads, data)
# --------- Flask-Caching-ähnliche Serialisierung -------------
async def dump_object(self, value) -> bytes:
"""
* int -> ASCII-Bytes ohne Prefix
* alles andere -> b"!" + Pickle-Bytes
"""
if isinstance(value, int):
return str(value).encode("ascii")
pickled = await self._pickle_dumps(value)
return b"!" + pickled
async def load_object(self, value: bytes):
"""
* beginnt mit b"!" -> Pickle.loads
* ansonsten -> int oder UTF-8-String
"""
if value is None:
return None
if value.startswith(b"!"):
return await self._pickle_loads(value[1:])
try:
return int(value)
except ValueError:
return value.decode("utf-8", errors="ignore")
# ----------------- Öffentliche API -----------------
async def set(self, key, value, ttl=None):
ttl = ttl or self.ttl
full_key = f"{self.prefix}{key}"
dumped = await self.dump_object(value)
if self.backend_type == "redis":
return await self.redis.set(full_key, dumped, ex=ttl)
else:
self._store[full_key] = dumped
self._expiry[full_key] = asyncio.get_event_loop().time() + ttl
return True
async def get(self, key):
full_key = f"{self.prefix}{key}"
if self.backend_type == "redis":
data = await self.redis.get(full_key)
else:
expire = self._expiry.get(full_key)
if expire and expire < asyncio.get_event_loop().time():
self._store.pop(full_key, None)
self._expiry.pop(full_key, None)
return None
data = self._store.get(full_key)
return await self.load_object(data)
async def delete(self, key):
full_key = f"{self.prefix}{key}"
if self.backend_type == "redis":
return await self.redis.delete(full_key)
else:
self._store.pop(full_key, None)
self._expiry.pop(full_key, None)
return True
async def clear(self):
if self.backend_type == "redis":
keys = await self.redis.keys(f"{self.prefix}*")
if keys:
return await self.redis.delete(*keys)
return 0
else:
self._store.clear()
self._expiry.clear()
return True