Files
simple-nanoshare/routes/side/upload.py
T
daniel156161 42c7047a92
Build and Push Docker Container / build-and-push (push) Successful in 1m28s
show how many ids got deleted and when none got deleted
2025-10-25 15:58:55 +02:00

217 lines
6.6 KiB
Python

from my_modules.decoratory.header import login_required
from quart import Blueprint, request, jsonify, current_app
from datetime import datetime, timedelta, timezone
from pathlib import Path
import aiofiles, asyncio, re
upload_bp = Blueprint("upload_bp", __name__)
# --- Helpers -----------------------------------------------------
PRESET_H = re.compile(r"^(\d+)h$")
PRESET_D = re.compile(r"^(\d+)d$")
def iso_stamp_filename(prefix: str, ext: str) -> str:
"""Generate timestamped filename, e.g. pasted-2025-10-23T121212Z.png"""
ts = datetime.now(timezone.utc).isoformat()
ts = ts.replace(":", "").split(".")[0]
if ts.endswith("+00:00"):
ts = ts.replace("+00:00", "Z")
return f"{prefix}-{ts}.{ext}"
def safe_name(name: str) -> str:
"""Restrict filename to safe ASCII subset."""
return re.sub(r"[^A-Za-z0-9._-]", "_", name)
def parse_expires(value: str | None) -> datetime | None:
"""Parse expiration presets or ISO datetime."""
if not value:
return None
value = value.strip()
if m := PRESET_H.match(value):
return datetime.now(timezone.utc) + timedelta(hours=int(m.group(1)))
if m := PRESET_D.match(value):
return datetime.now(timezone.utc) + timedelta(days=int(m.group(1)))
try:
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)
except Exception:
return None
def format_size(num_bytes: int) -> str:
"""Return a human-readable file size (e.g., '2.3 MB', '10 Bytes')."""
if num_bytes < 1024:
return f"{num_bytes} Byte{'s' if num_bytes != 1 else ''}"
units = ["KB", "MB", "GB", "TB", "PB", "EB"]
size = float(num_bytes)
for unit in units:
size /= 1024.0
if size < 1024.0 or unit == units[-1]:
# 1 decimal place; drop trailing .0 (optional)
val = f"{size:.1f}"
if val.endswith(".0"):
val = val[:-2]
return f"{val} {unit}"
return f"{num_bytes} Bytes" # fallback
async def read_all(uploaded) -> bytes:
"""Read all bytes from an uploaded file, handling sync or async .read()."""
reader = getattr(uploaded, "read", None)
if reader is None:
return b""
if asyncio.iscoroutinefunction(reader):
return await reader()
data = reader()
if asyncio.iscoroutine(data):
return await data
return data
def ensure_utc(dt):
"""Ensure a timezone-aware UTC datetime or None."""
if dt is None:
return None
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
# --- Routes ------------------------------------------------------
@upload_bp.post("/api/upload")
@login_required
async def api_upload(user):
"""
POST /upload/api/upload
Accepts:
- multipart form with 'file' or 'text'
- 'expires' can be '1h', '7d', or ISO timestamp
- 'note' optional
"""
form = await request.form
files = await request.files
note = form.get("note", "")
expires_raw = form.get("expires", "")
text = form.get("text", "")
uploaded = files.get("file")
expires_at_dt = ensure_utc(parse_expires(expires_raw))
if not uploaded and not text.strip():
return jsonify({"ok": False, "error": "No content provided"}), 400
content_type = None
# --- binary upload path ---
if uploaded:
fname = uploaded.filename or ""
ctype = uploaded.mimetype or "application/octet-stream"
content_type = ctype
# generate filename if missing/placeholder
if not fname or fname.lower() in {"blob", "file"}:
ext = {
"image/png": "png",
"image/jpeg": "jpg",
"image/gif": "gif",
"image/webp": "webp",
"application/pdf": "pdf",
"text/plain": "txt",
}.get(ctype, "bin")
fname = iso_stamp_filename("pasted", ext)
fname = safe_name(fname)
path = current_app.upload_folder / fname
data = await read_all(uploaded)
# write to disk
async with aiofiles.open(path, "wb") as f:
await f.write(data)
size_bytes = len(data)
file_size_pretty = format_size(size_bytes)
await current_app.edgedb.add_file(
file_name=fname,
file_size=file_size_pretty,
note=note,
content_type=content_type,
uploaded_at=datetime.now(timezone.utc),
expires_at=expires_at_dt,
user_id=user['sub'],
)
# --- text upload path ---
elif text.strip():
data = text.encode("utf-8")
fname = iso_stamp_filename("pasted", "txt")
path = current_app.upload_folder / fname
async with aiofiles.open(path, "wb") as f:
await f.write(data)
size_bytes = len(data)
file_size_pretty = format_size(size_bytes)
await current_app.edgedb.add_file(
file_name=fname,
file_size=file_size_pretty,
note=note,
content_type="text/plain",
uploaded_at=datetime.now(timezone.utc),
expires_at=expires_at_dt,
user_id=user['sub'],
)
return jsonify({"ok": True})
# --- Background cleanup ------------------------------------------------------
async def cleanup_task():
"""Hourly cleanup of expired files based on EdgeDB."""
await asyncio.sleep(3) # allow app startup
while True:
try:
now = datetime.now(timezone.utc)
expired = await current_app.edgedb.get_expired_files(now)
if not expired:
await asyncio.sleep(3600)
continue
upload_dir: Path = current_app.upload_folder # ensure Path
removed_ids: list[str] = []
for rec in expired:
try:
# Defensive: only touch files under your upload dir
fpath = (upload_dir / rec['file_name']).resolve()
if upload_dir.resolve() in fpath.parents or fpath == upload_dir.resolve():
fpath.unlink(missing_ok=True)
removed_ids.append(rec['file_id'])
else:
current_app.logger.warning("Refusing to delete outside upload dir: %s", fpath)
except Exception as e:
current_app.logger.exception("Failed to delete file %s (%s)", rec['file_name'], rec['file_id'])
# Remove DB rows for files we actually deleted from disk
if removed_ids:
try:
await current_app.edgedb.delete_files_by_ids(removed_ids)
current_app.logger.info("Deleted %d expired files from disk and database: %s", len(removed_ids), ", ".join(removed_ids))
except Exception:
current_app.logger.exception("Failed to delete DB rows for expired files")
else:
current_app.logger.info("No files where expired or deleted at %s", now.isoformat())
except Exception:
current_app.logger.exception("Cleanup task iteration failed")
await asyncio.sleep(3600) # every hour
@upload_bp.before_app_serving
async def start_cleanup():
asyncio.create_task(cleanup_task())