allow to strem upload to convex and reuse file id when upload error happend
This commit is contained in:
@@ -0,0 +1,67 @@
|
||||
from redis.asyncio import Redis as aioredis
|
||||
from collections import defaultdict
|
||||
import asyncio, time
|
||||
|
||||
class OrphanStorageIdRegistry:
|
||||
def __init__(self, retention_seconds:int=600, redis_client:aioredis=None):
|
||||
self.retention_seconds = max(60, int(retention_seconds))
|
||||
self.redis = redis_client
|
||||
self._lock = asyncio.Lock()
|
||||
self._store: dict[tuple[str, str], list[tuple[str, float]]] = defaultdict(list)
|
||||
self._prefix = "upload:orphan:"
|
||||
|
||||
def _key(self, user_id:str, fingerprint:str) -> str:
|
||||
return f"{self._prefix}{user_id}:{fingerprint}"
|
||||
|
||||
def _prune_locked(self, now:float):
|
||||
threshold = now - self.retention_seconds
|
||||
for key in list(self._store.keys()):
|
||||
entries = [entry for entry in self._store[key] if entry[1] >= threshold]
|
||||
if entries:
|
||||
self._store[key] = entries
|
||||
else:
|
||||
self._store.pop(key, None)
|
||||
|
||||
async def remember(self, user_id:str, fingerprint:str|None, storage_id:str):
|
||||
if not fingerprint or not storage_id:
|
||||
return
|
||||
|
||||
if self.redis is not None:
|
||||
key = self._key(user_id, fingerprint)
|
||||
pipe = self.redis.pipeline()
|
||||
pipe.lpush(key, storage_id)
|
||||
pipe.expire(key, self.retention_seconds)
|
||||
await pipe.execute()
|
||||
return
|
||||
|
||||
async with self._lock:
|
||||
now = time.time()
|
||||
self._prune_locked(now)
|
||||
self._store[(user_id, fingerprint)].append((storage_id, now))
|
||||
|
||||
async def pop_recent(self, user_id:str, fingerprint:str|None) -> str|None:
|
||||
if not fingerprint:
|
||||
return None
|
||||
|
||||
if self.redis is not None:
|
||||
key = self._key(user_id, fingerprint)
|
||||
value = await self.redis.rpop(key)
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, bytes):
|
||||
return value.decode("utf-8", errors="ignore")
|
||||
return str(value)
|
||||
|
||||
async with self._lock:
|
||||
self._prune_locked(time.time())
|
||||
entries = self._store.get((user_id, fingerprint))
|
||||
if not entries:
|
||||
return None
|
||||
storage_id, _ = entries.pop()
|
||||
if not entries:
|
||||
self._store.pop((user_id, fingerprint), None)
|
||||
return storage_id
|
||||
|
||||
async def close(self):
|
||||
if self.redis is not None:
|
||||
await self.redis.aclose()
|
||||
+48
-15
@@ -1,5 +1,12 @@
|
||||
from my_modules.functions import custom_limit_key, get_my_ip_address, get_local_ip_addresses, replace_last_ip_segment, generate_all_ips
|
||||
from my_modules.functions import (
|
||||
custom_limit_key,
|
||||
get_my_ip_address,
|
||||
get_local_ip_addresses,
|
||||
replace_last_ip_segment,
|
||||
generate_all_ips,
|
||||
)
|
||||
from my_modules.app.constens import SECRET_KEY, THE_IP_BOT_MANAGER
|
||||
from my_modules.OrphanStorageIdRegistry import OrphanStorageIdRegistry
|
||||
from my_modules.AsyncCache import AsyncCache
|
||||
from my_modules.app.logger import logger
|
||||
|
||||
@@ -14,7 +21,10 @@ import redis.asyncio as aioredis
|
||||
from quart import Quart
|
||||
import os
|
||||
|
||||
app = Quart(__name__, template_folder="../../templates/side", static_folder="../../templates/static")
|
||||
app = Quart(__name__,
|
||||
template_folder="../../templates/side",
|
||||
static_folder="../../templates/static",
|
||||
)
|
||||
app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 * 1024
|
||||
|
||||
app.secret_key = SECRET_KEY
|
||||
@@ -28,7 +38,7 @@ if os.getenv("VALKEY_HOST", None) is not None:
|
||||
password=os.getenv('VALKEY_CACHE_PASSWORD', ''),
|
||||
host=os.getenv('VALKEY_HOST'),
|
||||
port=os.getenv('VALKEY_PORT', 6379),
|
||||
db=os.getenv('VALKEY_DB', 0)
|
||||
db=os.getenv('VALKEY_DB', 0),
|
||||
)
|
||||
else:
|
||||
cache = AsyncCache(
|
||||
@@ -37,17 +47,17 @@ else:
|
||||
|
||||
if os.getenv("VALKEY_HOST", None) is not None:
|
||||
app.config.from_mapping(
|
||||
SESSION_TYPE="redis",
|
||||
SESSION_TYPE='redis',
|
||||
SESSION_PERMANENT=True,
|
||||
SESSION_USE_SIGNER=True,
|
||||
SESSION_REDIS = aioredis.Redis(
|
||||
SESSION_REDIS=aioredis.Redis(
|
||||
username=os.getenv('VALKEY_SESSION_USER', None),
|
||||
password=os.getenv('VALKEY_SESSION_PASSWORD', None),
|
||||
host=os.getenv("VALKEY_HOST"),
|
||||
port=os.getenv("VALKEY_PORT", 6379),
|
||||
db=os.getenv("VALKEY_DB", 0),
|
||||
decode_responses=True
|
||||
)
|
||||
host=os.getenv('VALKEY_HOST'),
|
||||
port=os.getenv('VALKEY_PORT', 6379),
|
||||
db=os.getenv('VALKEY_DB', 0),
|
||||
decode_responses=True,
|
||||
),
|
||||
)
|
||||
else:
|
||||
app.config.from_mapping(
|
||||
@@ -60,16 +70,36 @@ LIMITER = Limiter(
|
||||
custom_limit_key,
|
||||
app=app,
|
||||
storage_uri=(
|
||||
f"redis://{os.getenv('VALKEY_LIMITER_USER', '')}:{os.getenv('VALKEY_LIMITER_PASSWORD', '')}"
|
||||
f"@{os.getenv("VALKEY_HOST")}:{os.getenv('VALKEY_PORT', 6379)}/{os.getenv('VALKEY_DB', 0)}"
|
||||
) if os.getenv("VALKEY_HOST") else None,
|
||||
f'redis://{os.getenv('VALKEY_LIMITER_USER', '')}:{os.getenv('VALKEY_LIMITER_PASSWORD', '')}'
|
||||
f'@{os.getenv('VALKEY_HOST')}:{os.getenv('VALKEY_PORT', 6379)}/{os.getenv('VALKEY_DB', 0)}'
|
||||
)
|
||||
if os.getenv('VALKEY_HOST')
|
||||
else None,
|
||||
default_limits=[],
|
||||
strategy='moving-window'
|
||||
strategy='moving-window',
|
||||
)
|
||||
|
||||
convex_runtime = ConvexWorkerPool(os.getenv("CONVEX_URL"))
|
||||
convex_runtime = ConvexWorkerPool(os.getenv('CONVEX_URL'))
|
||||
app.convex_runtime = convex_runtime
|
||||
|
||||
orphan_retention_seconds = max(60, int(os.getenv('UPLOAD_ORPHAN_ID_RETENTION_SECONDS', '600')))
|
||||
if os.getenv('VALKEY_HOST', None) is not None:
|
||||
orphan_redis = aioredis.Redis(
|
||||
username=os.getenv('VALKEY_CACHE_USER', None),
|
||||
password=os.getenv('VALKEY_CACHE_PASSWORD', None),
|
||||
host=str(os.getenv('VALKEY_HOST')),
|
||||
port=int(os.getenv('VALKEY_PORT', 6379)),
|
||||
db=int(os.getenv('VALKEY_DB', 0)),
|
||||
decode_responses=False,
|
||||
)
|
||||
else:
|
||||
orphan_redis = None
|
||||
|
||||
app.orphan_storage_registry = OrphanStorageIdRegistry(
|
||||
retention_seconds=orphan_retention_seconds,
|
||||
redis_client=orphan_redis,
|
||||
)
|
||||
|
||||
@app.before_serving
|
||||
async def init_convex():
|
||||
await convex_runtime.start()
|
||||
@@ -88,4 +118,7 @@ async def init_convex():
|
||||
async def close_convex():
|
||||
if app.convex:
|
||||
await convex_runtime.stop()
|
||||
orphan_registry = getattr(app, 'orphan_storage_registry', None)
|
||||
if orphan_registry:
|
||||
await orphan_registry.close()
|
||||
await logger.shutdown()
|
||||
|
||||
Reference in New Issue
Block a user