Files
simple-nanoshare/my_modules/OrphanStorageIdRegistry.py
T

68 lines
2.1 KiB
Python

from redis.asyncio import Redis as aioredis
from collections import defaultdict
import asyncio, time
class OrphanStorageIdRegistry:
def __init__(self, retention_seconds:int=600, redis_client:aioredis=None):
self.retention_seconds = max(60, int(retention_seconds))
self.redis = redis_client
self._lock = asyncio.Lock()
self._store: dict[tuple[str, str], list[tuple[str, float]]] = defaultdict(list)
self._prefix = "upload:orphan:"
def _key(self, user_id:str, fingerprint:str) -> str:
return f"{self._prefix}{user_id}:{fingerprint}"
def _prune_locked(self, now:float):
threshold = now - self.retention_seconds
for key in list(self._store.keys()):
entries = [entry for entry in self._store[key] if entry[1] >= threshold]
if entries:
self._store[key] = entries
else:
self._store.pop(key, None)
async def remember(self, user_id:str, fingerprint:str|None, storage_id:str):
if not fingerprint or not storage_id:
return
if self.redis is not None:
key = self._key(user_id, fingerprint)
pipe = self.redis.pipeline()
pipe.lpush(key, storage_id)
pipe.expire(key, self.retention_seconds)
await pipe.execute()
return
async with self._lock:
now = time.time()
self._prune_locked(now)
self._store[(user_id, fingerprint)].append((storage_id, now))
async def pop_recent(self, user_id:str, fingerprint:str|None) -> str|None:
if not fingerprint:
return None
if self.redis is not None:
key = self._key(user_id, fingerprint)
value = await self.redis.rpop(key)
if value is None:
return None
if isinstance(value, bytes):
return value.decode("utf-8", errors="ignore")
return str(value)
async with self._lock:
self._prune_locked(time.time())
entries = self._store.get((user_id, fingerprint))
if not entries:
return None
storage_id, _ = entries.pop()
if not entries:
self._store.pop((user_id, fingerprint), None)
return storage_id
async def close(self):
if self.redis is not None:
await self.redis.aclose()