68 lines
2.1 KiB
Python
68 lines
2.1 KiB
Python
from redis.asyncio import Redis as aioredis
|
|
from collections import defaultdict
|
|
import asyncio, time
|
|
|
|
class OrphanStorageIdRegistry:
|
|
def __init__(self, retention_seconds:int=600, redis_client:aioredis=None):
|
|
self.retention_seconds = max(60, int(retention_seconds))
|
|
self.redis = redis_client
|
|
self._lock = asyncio.Lock()
|
|
self._store: dict[tuple[str, str], list[tuple[str, float]]] = defaultdict(list)
|
|
self._prefix = "upload:orphan:"
|
|
|
|
def _key(self, user_id:str, fingerprint:str) -> str:
|
|
return f"{self._prefix}{user_id}:{fingerprint}"
|
|
|
|
def _prune_locked(self, now:float):
|
|
threshold = now - self.retention_seconds
|
|
for key in list(self._store.keys()):
|
|
entries = [entry for entry in self._store[key] if entry[1] >= threshold]
|
|
if entries:
|
|
self._store[key] = entries
|
|
else:
|
|
self._store.pop(key, None)
|
|
|
|
async def remember(self, user_id:str, fingerprint:str|None, storage_id:str):
|
|
if not fingerprint or not storage_id:
|
|
return
|
|
|
|
if self.redis is not None:
|
|
key = self._key(user_id, fingerprint)
|
|
pipe = self.redis.pipeline()
|
|
pipe.lpush(key, storage_id)
|
|
pipe.expire(key, self.retention_seconds)
|
|
await pipe.execute()
|
|
return
|
|
|
|
async with self._lock:
|
|
now = time.time()
|
|
self._prune_locked(now)
|
|
self._store[(user_id, fingerprint)].append((storage_id, now))
|
|
|
|
async def pop_recent(self, user_id:str, fingerprint:str|None) -> str|None:
|
|
if not fingerprint:
|
|
return None
|
|
|
|
if self.redis is not None:
|
|
key = self._key(user_id, fingerprint)
|
|
value = await self.redis.rpop(key)
|
|
if value is None:
|
|
return None
|
|
if isinstance(value, bytes):
|
|
return value.decode("utf-8", errors="ignore")
|
|
return str(value)
|
|
|
|
async with self._lock:
|
|
self._prune_locked(time.time())
|
|
entries = self._store.get((user_id, fingerprint))
|
|
if not entries:
|
|
return None
|
|
storage_id, _ = entries.pop()
|
|
if not entries:
|
|
self._store.pop((user_id, fingerprint), None)
|
|
return storage_id
|
|
|
|
async def close(self):
|
|
if self.redis is not None:
|
|
await self.redis.aclose()
|