from redis.asyncio import Redis as aioredis from collections import defaultdict import asyncio, time class OrphanStorageIdRegistry: def __init__(self, retention_seconds:int=600, redis_client:aioredis=None): self.retention_seconds = max(60, int(retention_seconds)) self.redis = redis_client self._lock = asyncio.Lock() self._store: dict[tuple[str, str], list[tuple[str, float]]] = defaultdict(list) self._prefix = "upload:orphan:" def _key(self, user_id:str, fingerprint:str) -> str: return f"{self._prefix}{user_id}:{fingerprint}" def _prune_locked(self, now:float): threshold = now - self.retention_seconds for key in list(self._store.keys()): entries = [entry for entry in self._store[key] if entry[1] >= threshold] if entries: self._store[key] = entries else: self._store.pop(key, None) async def remember(self, user_id:str, fingerprint:str|None, storage_id:str): if not fingerprint or not storage_id: return if self.redis is not None: key = self._key(user_id, fingerprint) pipe = self.redis.pipeline() pipe.lpush(key, storage_id) pipe.expire(key, self.retention_seconds) await pipe.execute() return async with self._lock: now = time.time() self._prune_locked(now) self._store[(user_id, fingerprint)].append((storage_id, now)) async def pop_recent(self, user_id:str, fingerprint:str|None) -> str|None: if not fingerprint: return None if self.redis is not None: key = self._key(user_id, fingerprint) value = await self.redis.rpop(key) if value is None: return None if isinstance(value, bytes): return value.decode("utf-8", errors="ignore") return str(value) async with self._lock: self._prune_locked(time.time()) entries = self._store.get((user_id, fingerprint)) if not entries: return None storage_id, _ = entries.pop() if not entries: self._store.pop((user_id, fingerprint), None) return storage_id async def close(self): if self.redis is not None: await self.redis.aclose()