from my_modules.decoratory.header import login_required from quart import Blueprint, request, jsonify, current_app from datetime import datetime, timedelta, timezone from pathlib import Path import aiofiles, asyncio, re upload_bp = Blueprint("upload_bp", __name__) # --- Helpers ----------------------------------------------------- PRESET_H = re.compile(r"^(\d+)h$") PRESET_D = re.compile(r"^(\d+)d$") def iso_stamp_filename(prefix: str, ext: str) -> str: """Generate timestamped filename, e.g. pasted-2025-10-23T121212Z.png""" ts = datetime.now(timezone.utc).isoformat() ts = ts.replace(":", "").split(".")[0] if ts.endswith("+00:00"): ts = ts.replace("+00:00", "Z") return f"{prefix}-{ts}.{ext}" def safe_name(name: str) -> str: """Restrict filename to safe ASCII subset.""" return re.sub(r"[^A-Za-z0-9._-]", "_", name) def parse_expires(value: str | None) -> datetime | None: """Parse expiration presets or ISO datetime.""" if not value: return None value = value.strip() if m := PRESET_H.match(value): return datetime.now(timezone.utc) + timedelta(hours=int(m.group(1))) if m := PRESET_D.match(value): return datetime.now(timezone.utc) + timedelta(days=int(m.group(1))) try: return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc) except Exception: return None def format_size(num_bytes: int) -> str: """Return a human-readable file size (e.g., '2.3 MB', '10 Bytes').""" if num_bytes < 1024: return f"{num_bytes} Byte{'s' if num_bytes != 1 else ''}" units = ["KB", "MB", "GB", "TB", "PB", "EB"] size = float(num_bytes) for unit in units: size /= 1024.0 if size < 1024.0 or unit == units[-1]: # 1 decimal place; drop trailing .0 (optional) val = f"{size:.1f}" if val.endswith(".0"): val = val[:-2] return f"{val} {unit}" return f"{num_bytes} Bytes" # fallback async def read_all(uploaded) -> bytes: """Read all bytes from an uploaded file, handling sync or async .read().""" reader = getattr(uploaded, "read", None) if reader is None: return b"" if asyncio.iscoroutinefunction(reader): return await reader() data = reader() if asyncio.iscoroutine(data): return await data return data def ensure_utc(dt): """Ensure a timezone-aware UTC datetime or None.""" if dt is None: return None if dt.tzinfo is None: return dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) # --- Routes ------------------------------------------------------ @upload_bp.post("/api/upload") @login_required async def api_upload(user): """ POST /upload/api/upload Accepts: - multipart form with 'file' or 'text' - 'expires' can be '1h', '7d', or ISO timestamp - 'note' optional """ form = await request.form files = await request.files note = form.get("note", "") expires_raw = form.get("expires", "") text = form.get("text", "") uploaded = files.get("file") expires_at_dt = ensure_utc(parse_expires(expires_raw)) if not uploaded and not text.strip(): return jsonify({"ok": False, "error": "No content provided"}), 400 content_type = None # --- binary upload path --- if uploaded: fname = uploaded.filename or "" ctype = uploaded.mimetype or "application/octet-stream" content_type = ctype # generate filename if missing/placeholder if not fname or fname.lower() in {"blob", "file"}: ext = { "image/png": "png", "image/jpeg": "jpg", "image/gif": "gif", "image/webp": "webp", "application/pdf": "pdf", "text/plain": "txt", }.get(ctype, "bin") fname = iso_stamp_filename("pasted", ext) fname = safe_name(fname) path = current_app.upload_folder / fname data = await read_all(uploaded) # write to disk async with aiofiles.open(path, "wb") as f: await f.write(data) size_bytes = len(data) file_size_pretty = format_size(size_bytes) await current_app.edgedb.add_file( file_name=fname, file_size=file_size_pretty, note=note, content_type=content_type, uploaded_at=datetime.now(timezone.utc), expires_at=expires_at_dt, user_id=user['sub'], ) # --- text upload path --- elif text.strip(): data = text.encode("utf-8") fname = iso_stamp_filename("pasted", "txt") path = current_app.upload_folder / fname async with aiofiles.open(path, "wb") as f: await f.write(data) size_bytes = len(data) file_size_pretty = format_size(size_bytes) await current_app.edgedb.add_file( file_name=fname, file_size=file_size_pretty, note=note, content_type="text/plain", uploaded_at=datetime.now(timezone.utc), expires_at=expires_at_dt, user_id=user['sub'], ) return jsonify({"ok": True}) # --- Background cleanup ------------------------------------------------------ async def cleanup_task(): """Hourly cleanup of expired files based on EdgeDB.""" await asyncio.sleep(3) # allow app startup while True: try: now = datetime.now(timezone.utc) expired = await current_app.edgedb.get_expired_files(now) if not expired: await asyncio.sleep(3600) continue upload_dir: Path = current_app.upload_folder # ensure Path removed_ids: list[str] = [] for rec in expired: try: # Defensive: only touch files under your upload dir fpath = (upload_dir / rec['file_name']).resolve() if upload_dir.resolve() in fpath.parents or fpath == upload_dir.resolve(): fpath.unlink(missing_ok=True) removed_ids.append(rec['file_id']) else: current_app.logger.warning("Refusing to delete outside upload dir: %s", fpath) except Exception as e: current_app.logger.exception("Failed to delete file %s (%s)", rec['file_name'], rec['file_id']) # Remove DB rows for files we actually deleted from disk if removed_ids: try: await current_app.edgedb.delete_files_by_ids(removed_ids) except Exception: current_app.logger.exception("Failed to delete DB rows for expired files") except Exception: current_app.logger.exception("Cleanup task iteration failed") await asyncio.sleep(3600) # every hour @upload_bp.before_app_serving async def start_cleanup(): asyncio.create_task(cleanup_task())