diff --git a/my_helpers b/my_helpers index 5882fdd..7d086ad 160000 --- a/my_helpers +++ b/my_helpers @@ -1 +1 @@ -Subproject commit 5882fddcbc30b53436d3ab44c69608e1ebf9e543 +Subproject commit 7d086add75b14d8f8fa54cc673c9cb6b5c8efc3e diff --git a/my_modules/OrphanStorageIdRegistry.py b/my_modules/OrphanStorageIdRegistry.py new file mode 100644 index 0000000..a6168a8 --- /dev/null +++ b/my_modules/OrphanStorageIdRegistry.py @@ -0,0 +1,67 @@ +from redis.asyncio import Redis as aioredis +from collections import defaultdict +import asyncio, time + +class OrphanStorageIdRegistry: + def __init__(self, retention_seconds:int=600, redis_client:aioredis=None): + self.retention_seconds = max(60, int(retention_seconds)) + self.redis = redis_client + self._lock = asyncio.Lock() + self._store: dict[tuple[str, str], list[tuple[str, float]]] = defaultdict(list) + self._prefix = "upload:orphan:" + + def _key(self, user_id:str, fingerprint:str) -> str: + return f"{self._prefix}{user_id}:{fingerprint}" + + def _prune_locked(self, now:float): + threshold = now - self.retention_seconds + for key in list(self._store.keys()): + entries = [entry for entry in self._store[key] if entry[1] >= threshold] + if entries: + self._store[key] = entries + else: + self._store.pop(key, None) + + async def remember(self, user_id:str, fingerprint:str|None, storage_id:str): + if not fingerprint or not storage_id: + return + + if self.redis is not None: + key = self._key(user_id, fingerprint) + pipe = self.redis.pipeline() + pipe.lpush(key, storage_id) + pipe.expire(key, self.retention_seconds) + await pipe.execute() + return + + async with self._lock: + now = time.time() + self._prune_locked(now) + self._store[(user_id, fingerprint)].append((storage_id, now)) + + async def pop_recent(self, user_id:str, fingerprint:str|None) -> str|None: + if not fingerprint: + return None + + if self.redis is not None: + key = self._key(user_id, fingerprint) + value = await self.redis.rpop(key) + if value is None: + return None + if isinstance(value, bytes): + return value.decode("utf-8", errors="ignore") + return str(value) + + async with self._lock: + self._prune_locked(time.time()) + entries = self._store.get((user_id, fingerprint)) + if not entries: + return None + storage_id, _ = entries.pop() + if not entries: + self._store.pop((user_id, fingerprint), None) + return storage_id + + async def close(self): + if self.redis is not None: + await self.redis.aclose() diff --git a/my_modules/app/setup.py b/my_modules/app/setup.py index 8192f40..6bbae32 100644 --- a/my_modules/app/setup.py +++ b/my_modules/app/setup.py @@ -1,5 +1,12 @@ -from my_modules.functions import custom_limit_key, get_my_ip_address, get_local_ip_addresses, replace_last_ip_segment, generate_all_ips +from my_modules.functions import ( + custom_limit_key, + get_my_ip_address, + get_local_ip_addresses, + replace_last_ip_segment, + generate_all_ips, +) from my_modules.app.constens import SECRET_KEY, THE_IP_BOT_MANAGER +from my_modules.OrphanStorageIdRegistry import OrphanStorageIdRegistry from my_modules.AsyncCache import AsyncCache from my_modules.app.logger import logger @@ -14,7 +21,10 @@ import redis.asyncio as aioredis from quart import Quart import os -app = Quart(__name__, template_folder="../../templates/side", static_folder="../../templates/static") +app = Quart(__name__, + template_folder="../../templates/side", + static_folder="../../templates/static", +) app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 * 1024 app.secret_key = SECRET_KEY @@ -28,7 +38,7 @@ if os.getenv("VALKEY_HOST", None) is not None: password=os.getenv('VALKEY_CACHE_PASSWORD', ''), host=os.getenv('VALKEY_HOST'), port=os.getenv('VALKEY_PORT', 6379), - db=os.getenv('VALKEY_DB', 0) + db=os.getenv('VALKEY_DB', 0), ) else: cache = AsyncCache( @@ -37,17 +47,17 @@ else: if os.getenv("VALKEY_HOST", None) is not None: app.config.from_mapping( - SESSION_TYPE="redis", + SESSION_TYPE='redis', SESSION_PERMANENT=True, SESSION_USE_SIGNER=True, - SESSION_REDIS = aioredis.Redis( + SESSION_REDIS=aioredis.Redis( username=os.getenv('VALKEY_SESSION_USER', None), password=os.getenv('VALKEY_SESSION_PASSWORD', None), - host=os.getenv("VALKEY_HOST"), - port=os.getenv("VALKEY_PORT", 6379), - db=os.getenv("VALKEY_DB", 0), - decode_responses=True - ) + host=os.getenv('VALKEY_HOST'), + port=os.getenv('VALKEY_PORT', 6379), + db=os.getenv('VALKEY_DB', 0), + decode_responses=True, + ), ) else: app.config.from_mapping( @@ -60,16 +70,36 @@ LIMITER = Limiter( custom_limit_key, app=app, storage_uri=( - f"redis://{os.getenv('VALKEY_LIMITER_USER', '')}:{os.getenv('VALKEY_LIMITER_PASSWORD', '')}" - f"@{os.getenv("VALKEY_HOST")}:{os.getenv('VALKEY_PORT', 6379)}/{os.getenv('VALKEY_DB', 0)}" - ) if os.getenv("VALKEY_HOST") else None, + f'redis://{os.getenv('VALKEY_LIMITER_USER', '')}:{os.getenv('VALKEY_LIMITER_PASSWORD', '')}' + f'@{os.getenv('VALKEY_HOST')}:{os.getenv('VALKEY_PORT', 6379)}/{os.getenv('VALKEY_DB', 0)}' + ) + if os.getenv('VALKEY_HOST') + else None, default_limits=[], - strategy='moving-window' + strategy='moving-window', ) -convex_runtime = ConvexWorkerPool(os.getenv("CONVEX_URL")) +convex_runtime = ConvexWorkerPool(os.getenv('CONVEX_URL')) app.convex_runtime = convex_runtime +orphan_retention_seconds = max(60, int(os.getenv('UPLOAD_ORPHAN_ID_RETENTION_SECONDS', '600'))) +if os.getenv('VALKEY_HOST', None) is not None: + orphan_redis = aioredis.Redis( + username=os.getenv('VALKEY_CACHE_USER', None), + password=os.getenv('VALKEY_CACHE_PASSWORD', None), + host=str(os.getenv('VALKEY_HOST')), + port=int(os.getenv('VALKEY_PORT', 6379)), + db=int(os.getenv('VALKEY_DB', 0)), + decode_responses=False, + ) +else: + orphan_redis = None + +app.orphan_storage_registry = OrphanStorageIdRegistry( + retention_seconds=orphan_retention_seconds, + redis_client=orphan_redis, +) + @app.before_serving async def init_convex(): await convex_runtime.start() @@ -88,4 +118,7 @@ async def init_convex(): async def close_convex(): if app.convex: await convex_runtime.stop() + orphan_registry = getattr(app, 'orphan_storage_registry', None) + if orphan_registry: + await orphan_registry.close() await logger.shutdown() diff --git a/quart_common b/quart_common index c6fb94f..be555d8 160000 --- a/quart_common +++ b/quart_common @@ -1 +1 @@ -Subproject commit c6fb94f2c3a6a1e5fd1b5766a60eeb872e775534 +Subproject commit be555d897e9aa2d9f4f5ac6912ecc1eb93769be9 diff --git a/routes/side/upload.py b/routes/side/upload.py index 21e5271..037e23d 100644 --- a/routes/side/upload.py +++ b/routes/side/upload.py @@ -3,17 +3,17 @@ from my_modules.expiry import parse_expires, ensure_utc from my_modules.file_meta import iso_stamp_filename, format_size from quart import Blueprint, request, jsonify, current_app -import asyncio +import asyncio, hashlib -upload_bp = Blueprint("upload_bp", __name__) +upload_bp = Blueprint('upload_bp', __name__) # --- Helpers ----------------------------------------------------- async def read_all(uploaded) -> bytes: """Read all bytes from an uploaded file, handling sync or async .read().""" - reader = getattr(uploaded, "read", None) + reader = getattr(uploaded, 'read', None) if reader is None: - return b"" + return b'' if asyncio.iscoroutinefunction(reader): return await reader() @@ -22,9 +22,41 @@ async def read_all(uploaded) -> bytes: return await data return data + +async def fingerprint_stream(stream, chunk_size:int=1024 * 1024) -> tuple[str|None, int|None]: + if not hasattr(stream, 'seek') or not hasattr(stream, 'tell'): + return None, None + + try: + stream.seek(0) + except Exception: + return None, None + + digest = hashlib.sha256() + size_bytes = 0 + + while True: + chunk = await asyncio.to_thread(stream.read, chunk_size) + if not chunk: + break + size_bytes += len(chunk) + digest.update(chunk) + + try: + stream.seek(0) + except Exception: + return None, None + + return digest.hexdigest(), size_bytes + + +def fingerprint_bytes(data: bytes) -> str: + return hashlib.sha256(data).hexdigest() + + # --- Routes ------------------------------------------------------ -@upload_bp.post("/api/upload") +@upload_bp.post('/api/upload') @login_required async def api_upload(user): """ @@ -36,71 +68,119 @@ async def api_upload(user): """ form = await request.form files = await request.files - note = form.get("note", "") - expires_raw = form.get("expires", "") - text = form.get("text", "") + note = form.get('note', '') + expires_raw = form.get('expires', '') + text = form.get('text', '') + orphan_registry = getattr(current_app, 'orphan_storage_registry', None) - uploaded = files.get("file") + uploaded = files.get('file') expires_at_dt = ensure_utc(parse_expires(expires_raw)) if not uploaded and not text.strip(): - return jsonify({"ok": False, "error": "No content provided"}), 400 + return jsonify({'ok': False, 'error': 'No content provided'}), 400 content_type = None # --- binary upload path --- if uploaded: - fname = uploaded.filename or "" - ctype = uploaded.mimetype or "application/octet-stream" + fname = uploaded.filename or '' + ctype = uploaded.mimetype or 'application/octet-stream' content_type = ctype + storage_id = None + size_bytes = 0 + fingerprint = None + reused_orphan_storage_id = False # generate filename if missing/placeholder - if not fname or fname.lower() in {"blob", "file"}: + if not fname or fname.lower() in {'blob', 'file'}: ext = { - "image/png": "png", - "image/jpeg": "jpg", - "image/gif": "gif", - "image/webp": "webp", - "application/pdf": "pdf", - "text/plain": "txt", - }.get(ctype, "bin") - fname = iso_stamp_filename("pasted", ext) + 'image/png': 'png', + 'image/jpeg': 'jpg', + 'image/gif': 'gif', + 'image/webp': 'webp', + 'application/pdf': 'pdf', + 'text/plain': 'txt', + }.get(ctype, 'bin') + fname = iso_stamp_filename('pasted', ext) - data = await read_all(uploaded) + stream = getattr(uploaded, 'stream', None) - storage_id = await current_app.convex.send_to_storage(data=data, content_type=content_type) + if stream is not None: + fingerprint, detected_size = await fingerprint_stream(stream) + size_bytes = detected_size or 0 + storage_id = ( + await orphan_registry.pop_recent(user['sub'], fingerprint) + if orphan_registry + else None + ) + if storage_id: + reused_orphan_storage_id = True + else: + storage_id, sent_size = await current_app.convex.send_stream_to_storage(stream=stream,content_type=content_type) + size_bytes = sent_size + else: + data = await read_all(uploaded) + fingerprint = fingerprint_bytes(data) + storage_id = ( + await orphan_registry.pop_recent(user['sub'], fingerprint) + if orphan_registry + else None + ) + if storage_id: + reused_orphan_storage_id = True + else: + storage_id = await current_app.convex.send_to_storage(data=data, content_type=content_type) + size_bytes = len(data) - size_bytes = len(data) file_size_pretty = format_size(size_bytes) - await current_app.convex.add_file( - file_name=fname, - file_size=file_size_pretty, - note=note, - content_type=content_type, - expires_at=expires_at_dt, - storage_id=storage_id, - user_id=user['sub'], - ) + try: + await current_app.convex.add_file( + file_name=fname, + file_size=file_size_pretty, + note=note, + content_type=content_type, + expires_at=expires_at_dt, + storage_id=storage_id, + user_id=user['sub'], + ) + except Exception: + if storage_id and not reused_orphan_storage_id and orphan_registry: + await orphan_registry.remember(user['sub'], fingerprint, storage_id) + raise # --- text upload path --- elif text.strip(): - data = text.encode("utf-8") - fname = iso_stamp_filename("pasted", "txt") - - storage_id = await current_app.convex.send_to_storage(data=data, content_type="text/plain") + data = text.encode('utf-8') + fname = iso_stamp_filename('pasted', 'txt') + fingerprint = fingerprint_bytes(data) + storage_id = ( + await orphan_registry.pop_recent(user['sub'], fingerprint) + if orphan_registry + else None + ) + reused_orphan_storage_id = bool(storage_id) + if not storage_id: + storage_id = await current_app.convex.send_to_storage( + data=data, content_type='text/plain' + ) size_bytes = len(data) file_size_pretty = format_size(size_bytes) - await current_app.convex.add_file( - file_name=fname, - file_size=file_size_pretty, - note=note, - content_type="text/plain", - expires_at=expires_at_dt, - storage_id=storage_id, - user_id=user['sub'], - ) + try: + await current_app.convex.add_file( + file_name=fname, + file_size=file_size_pretty, + note=note, + content_type='text/plain', + expires_at=expires_at_dt, + storage_id=storage_id, + user_id=user['sub'], + ) + except Exception: + if not reused_orphan_storage_id and orphan_registry: + await orphan_registry.remember(user['sub'], fingerprint, storage_id) + raise - return jsonify({"ok": True}) + return jsonify({'ok': True})