156 lines
4.5 KiB
Python
156 lines
4.5 KiB
Python
from my_modules.decoratory.header import login_required
|
|
|
|
from quart import Blueprint, request, jsonify, current_app
|
|
from datetime import datetime, timedelta, timezone
|
|
import aiofiles, asyncio, re
|
|
|
|
upload_bp = Blueprint("upload_bp", __name__)
|
|
|
|
# --- Helpers -----------------------------------------------------
|
|
|
|
PRESET_H = re.compile(r"^(\d+)h$")
|
|
PRESET_D = re.compile(r"^(\d+)d$")
|
|
|
|
def iso_stamp_filename(prefix: str, ext: str) -> str:
|
|
"""Generate timestamped filename, e.g. pasted-2025-10-23T121212Z.png"""
|
|
ts = datetime.now(timezone.utc).isoformat()
|
|
ts = ts.replace(":", "").split(".")[0]
|
|
if ts.endswith("+00:00"):
|
|
ts = ts.replace("+00:00", "Z")
|
|
return f"{prefix}-{ts}.{ext}"
|
|
|
|
def parse_expires(value: str | None) -> datetime | None:
|
|
"""Parse expiration presets or ISO datetime."""
|
|
if not value:
|
|
return None
|
|
value = value.strip()
|
|
if m := PRESET_H.match(value):
|
|
return datetime.now(timezone.utc) + timedelta(hours=int(m.group(1)))
|
|
if m := PRESET_D.match(value):
|
|
return datetime.now(timezone.utc) + timedelta(days=int(m.group(1)))
|
|
try:
|
|
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)
|
|
except Exception:
|
|
return None
|
|
|
|
def format_size(num_bytes: int) -> str:
|
|
"""Return a human-readable file size (e.g., '2.3 MB', '10 Bytes')."""
|
|
if num_bytes < 1024:
|
|
return f"{num_bytes} Byte{'s' if num_bytes != 1 else ''}"
|
|
|
|
units = ["KB", "MB", "GB", "TB", "PB", "EB"]
|
|
size = float(num_bytes)
|
|
for unit in units:
|
|
size /= 1024.0
|
|
if size < 1024.0 or unit == units[-1]:
|
|
# 1 decimal place; drop trailing .0 (optional)
|
|
val = f"{size:.1f}"
|
|
if val.endswith(".0"):
|
|
val = val[:-2]
|
|
return f"{val} {unit}"
|
|
return f"{num_bytes} Bytes" # fallback
|
|
|
|
async def read_all(uploaded) -> bytes:
|
|
"""Read all bytes from an uploaded file, handling sync or async .read()."""
|
|
reader = getattr(uploaded, "read", None)
|
|
if reader is None:
|
|
return b""
|
|
if asyncio.iscoroutinefunction(reader):
|
|
return await reader()
|
|
|
|
data = reader()
|
|
if asyncio.iscoroutine(data):
|
|
return await data
|
|
return data
|
|
|
|
def ensure_utc(dt:datetime):
|
|
"""Ensure a timezone-aware UTC datetime or None."""
|
|
if dt is None:
|
|
return None
|
|
if dt.tzinfo is None:
|
|
return dt.replace(tzinfo=timezone.utc)
|
|
return dt.astimezone(timezone.utc)
|
|
|
|
# --- Routes ------------------------------------------------------
|
|
|
|
@upload_bp.post("/api/upload")
|
|
@login_required
|
|
async def api_upload(user):
|
|
"""
|
|
POST /upload/api/upload
|
|
Accepts:
|
|
- multipart form with 'file' or 'text'
|
|
- 'expires' can be '1h', '7d', or ISO timestamp
|
|
- 'note' optional
|
|
"""
|
|
form = await request.form
|
|
files = await request.files
|
|
note = form.get("note", "")
|
|
expires_raw = form.get("expires", "")
|
|
text = form.get("text", "")
|
|
|
|
uploaded = files.get("file")
|
|
expires_at_dt = ensure_utc(parse_expires(expires_raw))
|
|
|
|
if not uploaded and not text.strip():
|
|
return jsonify({"ok": False, "error": "No content provided"}), 400
|
|
|
|
content_type = None
|
|
|
|
# --- binary upload path ---
|
|
if uploaded:
|
|
fname = uploaded.filename or ""
|
|
ctype = uploaded.mimetype or "application/octet-stream"
|
|
content_type = ctype
|
|
|
|
# generate filename if missing/placeholder
|
|
if not fname or fname.lower() in {"blob", "file"}:
|
|
ext = {
|
|
"image/png": "png",
|
|
"image/jpeg": "jpg",
|
|
"image/gif": "gif",
|
|
"image/webp": "webp",
|
|
"application/pdf": "pdf",
|
|
"text/plain": "txt",
|
|
}.get(ctype, "bin")
|
|
fname = iso_stamp_filename("pasted", ext)
|
|
|
|
data = await read_all(uploaded)
|
|
|
|
storage_id = await current_app.convex.send_to_storage(data=data, content_type=content_type)
|
|
|
|
size_bytes = len(data)
|
|
file_size_pretty = format_size(size_bytes)
|
|
|
|
await current_app.convex.add_file(
|
|
file_name=fname,
|
|
file_size=file_size_pretty,
|
|
note=note,
|
|
content_type=content_type,
|
|
expires_at=expires_at_dt,
|
|
storage_id=storage_id,
|
|
user_id=user['sub'],
|
|
)
|
|
|
|
# --- text upload path ---
|
|
elif text.strip():
|
|
data = text.encode("utf-8")
|
|
fname = iso_stamp_filename("pasted", "txt")
|
|
|
|
storage_id = await current_app.convex.send_to_storage(data=data, content_type="text/plain")
|
|
|
|
size_bytes = len(data)
|
|
file_size_pretty = format_size(size_bytes)
|
|
|
|
await current_app.convex.add_file(
|
|
file_name=fname,
|
|
file_size=file_size_pretty,
|
|
note=note,
|
|
content_type="text/plain",
|
|
expires_at=expires_at_dt,
|
|
storage_id=storage_id,
|
|
user_id=user['sub'],
|
|
)
|
|
|
|
return jsonify({"ok": True})
|