add routes and upload functions

This commit is contained in:
2025-10-24 08:25:57 +02:00
parent 175f382ec0
commit 8c2259d1de
7 changed files with 583 additions and 0 deletions
+6
View File
@@ -0,0 +1,6 @@
from .handeling.basics import basic_bp
from .auth.login import auth_login_bp
from .side.main import side_main_bp
from .side.upload import upload_bp
+139
View File
@@ -0,0 +1,139 @@
from my_modules.app.setup import cache, LIMITER
from my_modules.app.constens import API_GROUP
from my_modules.app.logger import logger
from quart import Blueprint, redirect, url_for, session, request, render_template, abort, make_response, current_app
from authlib.jose import JsonWebKey, JoseError, jwt as authlib_jwt
from authlib.integrations.httpx_client import AsyncOAuth2Client
from jwt import InvalidTokenError
import httpx, uuid, os
auth_login_bp = Blueprint('auth_login', __name__)
# OAuth
OIDC_CLIENT_ID = os.getenv('OIDC_CLIENT_ID', '')
OIDC_CLIENT_SECRET = os.getenv('OIDC_CLIENT_SECRET', '')
OIDC_METADATA_URL = os.getenv('OIDC_METADATA_URL', '')
REDIRECT_URI_SCHEME = os.getenv('REDIRECT_URI_SCHEME', 'http')
async def get_oidc_metadata():
async with httpx.AsyncClient() as client:
response = await client.get(OIDC_METADATA_URL)
response.raise_for_status()
return response.json()
@auth_login_bp.route('/', methods=['GET'])
@LIMITER.limit("5 per minute; 30 per hour")
async def login():
metadata = await get_oidc_metadata()
auth_id = request.cookies.get('auth_id') or str(uuid.uuid4())
nonce = str(uuid.uuid4())
await cache.set(f"oauth:nonce:{auth_id}", nonce, ttl=3603)
client = AsyncOAuth2Client(
client_id=OIDC_CLIENT_ID,
client_secret=OIDC_CLIENT_SECRET,
redirect_uri=url_for('auth_login.auth_callback', _external=True, _scheme=REDIRECT_URI_SCHEME),
scope='openid profile email',
)
uri, state = client.create_authorization_url(
metadata['authorization_endpoint'],
nonce=nonce,
state=auth_id,
)
response = await make_response(redirect(uri))
response.set_cookie('auth_id', auth_id, max_age=3600, httponly=True, secure=True, samesite='Lax')
return response
@auth_login_bp.route('/logout', methods=['GET'])
@LIMITER.limit("10 per minute")
async def logout():
user = session.get('user')
if user:
await logger.info(f'logging out user: {user}')
session.pop('user', None)
session.clear()
return redirect(url_for('side_main.index'))
@auth_login_bp.route('/callback', methods=['GET'])
@LIMITER.limit("5 per minute")
async def auth_callback():
try:
auth_id = request.args.get('state')
code = request.args.get('code')
nonce = await cache.get(f"oauth:nonce:{auth_id}")
if not nonce:
await logger.error('Nonce not found')
return await render_template('views/api/token.htm', error='Nonce not found'), 400
await cache.delete(f"oauth:nonce:{auth_id}")
metadata = await get_oidc_metadata()
client = AsyncOAuth2Client(
client_id=OIDC_CLIENT_ID,
client_secret=OIDC_CLIENT_SECRET,
redirect_uri=url_for('auth_login.auth_callback', _external=True, _scheme=REDIRECT_URI_SCHEME),
scope='openid profile email',
)
# Exchange code for token
token = await client.fetch_token(
metadata['token_endpoint'],
code=code,
grant_type='authorization_code'
)
await logger.debug(f'Auth Callback | token: {token}')
# Decode ID token
id_token = token.get('id_token')
if not id_token:
await logger.error('ID token missing in OAuth response')
return await render_template('views/api/token.htm', error='ID token missing'), 400
# Fetch the JWKs to verify signature
async with httpx.AsyncClient() as http_client:
jwks_resp = await http_client.get(metadata['jwks_uri'])
jwks = jwks_resp.json()
keys = JsonWebKey.import_key_set(jwks)
claims = authlib_jwt.decode(id_token, key=keys)
try:
claims.validate_aud()
claims.validate()
except JoseError as e:
await logger.error(f"JWT validation error: {e}")
return await render_template('views/api/token.htm', error=str(e)), 400
if claims.get('nonce') != nonce:
await logger.error('Nonce mismatch in ID token')
return await render_template('views/api/token.htm', error='Invalid nonce'), 400
await logger.info(f'Auth Callback | user_info: {claims}')
if API_GROUP not in claims.get('groups', []):
await logger.error("You don't have Permissions to Access this API")
return await render_template('views/api/token.htm', error="You don't have Permissions to Access this API"), 403
session['user'] = claims
response = await make_response(redirect(url_for('side_main.index')))
response.set_cookie('auth_id', '', max_age=0, httponly=True, secure=True, samesite='Lax')
return response
except httpx.HTTPError as e:
await logger.error(f"HTTP error during token exchange: {e}")
return await render_template('views/api/token.htm', error="Token exchange failed"), 500
except InvalidTokenError as e:
await logger.error(f"Invalid ID Token: {e}")
return await render_template('views/api/token.htm', error="Invalid ID Token"), 400
except Exception as e:
await logger.exception("Unknown error during OAuth callback")
return await render_template('views/api/token.htm', error=str(e)), 500
+40
View File
@@ -0,0 +1,40 @@
from my_modules.app.setup import LIMITER, cache
from quart import Blueprint, send_from_directory, render_template, current_app
from datetime import datetime
basic_bp = Blueprint('basic', __name__)
@basic_bp.route('/favicon', methods=['GET'])
@basic_bp.route('/favicon.ico', methods=['GET'])
@LIMITER.exempt
async def favicon(cache_key:str='favicon'):
cache_favicon_name = await cache.get(cache_key)
if cache_favicon_name:
file_name = cache_favicon_name
else:
current_year = datetime.now().year
autumn_start = datetime(current_year, 9, 23)
autumn_end = datetime(current_year, 12, 21)
winter_start = datetime(current_year, 12, 21)
winter_end = datetime(current_year, 3, 20)
# Get the current date
current_date = datetime.now()
if autumn_start <= current_date <= autumn_end:
file_name = '1. autumn.gif'
elif current_date >= winter_start or current_date <= winter_end:
file_name = '2. winter.png'
else:
file_name = '0. default.svg'
await cache.set(cache_key, file_name, ttl=21600)
return await send_from_directory(current_app.static_folder, f'images/favicons/{file_name}')
@basic_bp.route('/robots.txt', methods=['GET'])
@LIMITER.limit('3 per day')
async def robots():
return await send_from_directory(current_app.static_folder, f'robots.txt')
+70
View File
@@ -0,0 +1,70 @@
from my_modules.app.setup import app, LIMITER
from my_modules.app.logger import logger
from quart import request, render_template, jsonify, current_app, make_response, redirect, url_for
from my_modules.functions import get_ip, enforce_custom_limit
@app.errorhandler(401)
async def handle_unauthorized(e):
await logger.error(e)
return redirect(url_for('auth_login.login'))
@app.errorhandler(404)
async def not_found(e):
try:
enforce_custom_limit(LIMITER, "404")
except LookupError as e:
return await to_many_requests(e)
if request.path.startswith("/api"):
return jsonify({"error": "Page Not Found", "message": "Oops! The page you are looking for does not exist."}), 404
await logger.error(f"[404] Page Not Found: {request.path}")
await current_app.edgedb.upsert_page_not_found(request.path)
return await render_template('views/basics/error.htm',
title='Page Not Found',
header={'title': '404 - Page Not Found', 'message': "Oops! The page you are looking for does not exist."},
file={'name': '404.webp', 'alt': "Matrix - Neo stoping the Bullets by holding his hand up"},
), 404
@app.errorhandler(429)
async def to_many_requests(e):
message = "We love your enthusiasm, but our server thought it was being DDoSed… by you. The keyboard needs a new set of keys and we need a nap. Try again soon!"
if request.path.startswith("/api") or request.path.endswith('/auth/userinfo') or request.path.endswith('/auth/refresh'):
return jsonify({"error": "Too Many Requests - YOU SHALL NOT PASS (for now)", "message": message}), 429
return await render_template('views/basics/error.htm',
title='Too Many Requests',
header={'title': '429 - YOU SHALL NOT PASS (for now)', 'message': message},
file={'name': '429_JimCarrey.gif', 'alt': "Jim Carrey Tips very fast on a computer keyboard"},
), 429
@app.errorhandler(500)
async def internal_server_error(e):
try:
enforce_custom_limit(LIMITER, "500")
except LookupError as e:
return await to_many_requests(e)
await logger.error(e)
return await render_template('views/basics/error.htm',
title='Internal Server Error',
header={'title': '500 - Internal Server Error', 'message': "It looks like you broke something... but don't worry, we're fixing it! In the meantime, we may or may not have logged your IP address (just kidding... or are we?). Either way, thanks for helping us find new ways to crash our system. Stay curious, hacker-friend!"},
file={'name': '500.webp', 'alt': "Astronaut jumping and clicking on random Buttons as a red alert gone off - They is a Text on the Image saying: Why don't shit Work!?!"},
), 500
@app.errorhandler(504)
async def database_server_error(e):
try:
enforce_custom_limit(LIMITER, "504")
except LookupError as e:
return await to_many_requests(e)
await logger.error(e)
return await render_template('views/basics/error.htm',
title='Database Error',
header={'title': '504 - Database Error', 'message': "It looks like something is broke on our end... but don't worry, we're fixing it! Either way, thanks for helping us find new ways to crash our system. Stay curious, hacker-friend!"},
file={'name': '504.gif', 'alt': "Hex Code running over a screen and ends with Error"},
), 504
+79
View File
@@ -0,0 +1,79 @@
from my_modules.file_helper_functions import verify_signed_url
from my_modules.decoratory.header import login_required
from my_modules.app.setup import LIMITER
from my_modules.app.logger import logger
from quart import Blueprint, request, session, Response, send_from_directory, render_template, abort, current_app
from datetime import datetime, timezone
side_main_bp = Blueprint('side_main', __name__)
@side_main_bp.route('/')
@LIMITER.limit("10 per minute")
async def index():
if session.get("user") is not None:
return await render_template("views/webpage/upload.htm")
return await render_template("views/webpage/index.htm")
@side_main_bp.route('/files')
@LIMITER.limit("10 per minute")
@login_required
async def files(user):
files_data = await current_app.edgedb.get_files(current_datetime=datetime.now(timezone.utc), user_id=user['sub'])
return await render_template("views/webpage/files_list.htm", files=files_data)
@side_main_bp.route('/files/<file_id>/info')
@LIMITER.limit("10 per minute")
@login_required
async def file_info(file_id, user):
files_data = await current_app.edgedb.get_files(user_id=user['sub'])
return await render_template("views/webpage/.htm", files=files_data)
@side_main_bp.route('/files/<file_id>/edit')
@LIMITER.limit("10 per minute")
@login_required
async def file_edit(file_id, user):
files_data = await current_app.edgedb.get_files(user_id=user['sub'])
return await render_template("views/webpage/.htm", files=files_data)
def is_expired(expires_at):
if not expires_at:
return False
if expires_at.tzinfo is None:
expires_at = expires_at.replace(tzinfo=timezone.utc)
else:
expires_at = expires_at.astimezone(timezone.utc)
return expires_at <= datetime.now(timezone.utc)
@side_main_bp.route("/-<file_id>")
@LIMITER.limit("10 per minute")
async def serve_file(file_id: str):
file_data = await current_app.edgedb.get_file(file_id=file_id)
if not file_data:
abort(404)
if is_expired(file_data.get("expires_at")):
return Response("This file has expired.", status=410, headers={
"Cache-Control": "no-store",
"X-Content-Type-Options": "nosniff",
})
file_name = file_data.get("file_name")
content_type = file_data.get("content_type") or "application/octet-stream"
force_download = request.args.get("download") in {"1", "true", "yes"}
path = current_app.upload_folder / file_name
if not path.exists() or not path.is_file():
abort(404)
return await send_from_directory(
directory=current_app.upload_folder,
file_name=file_name,
mimetype=content_type,
as_attachment=force_download,
attachment_filename=file_name,
conditional=True,
cache_timeout=60,
last_modified=path.stat().st_mtime
)
+216
View File
@@ -0,0 +1,216 @@
from my_modules.file_helper_functions import generate_short_id
from my_modules.decoratory.header import login_required
from quart import Blueprint, request, jsonify, current_app
from datetime import datetime, timedelta, timezone
from pathlib import Path
import aiofiles, asyncio, re
upload_bp = Blueprint("upload_bp", __name__)
# --- Helpers -----------------------------------------------------
PRESET_H = re.compile(r"^(\d+)h$")
PRESET_D = re.compile(r"^(\d+)d$")
def iso_stamp_filename(prefix: str, ext: str) -> str:
"""Generate timestamped filename, e.g. pasted-2025-10-23T121212Z.png"""
ts = datetime.now(timezone.utc).isoformat()
ts = ts.replace(":", "").split(".")[0]
if ts.endswith("+00:00"):
ts = ts.replace("+00:00", "Z")
return f"{prefix}-{ts}.{ext}"
def safe_name(name: str) -> str:
"""Restrict filename to safe ASCII subset."""
return re.sub(r"[^A-Za-z0-9._-]", "_", name)
def parse_expires(value: str | None) -> datetime | None:
"""Parse expiration presets or ISO datetime."""
if not value:
return None
value = value.strip()
if m := PRESET_H.match(value):
return datetime.now(timezone.utc) + timedelta(hours=int(m.group(1)))
if m := PRESET_D.match(value):
return datetime.now(timezone.utc) + timedelta(days=int(m.group(1)))
try:
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)
except Exception:
return None
def format_size(num_bytes: int) -> str:
"""Return a human-readable file size (e.g., '2.3 MB', '10 Bytes')."""
if num_bytes < 1024:
return f"{num_bytes} Byte{'s' if num_bytes != 1 else ''}"
units = ["KB", "MB", "GB", "TB", "PB", "EB"]
size = float(num_bytes)
for unit in units:
size /= 1024.0
if size < 1024.0 or unit == units[-1]:
# 1 decimal place; drop trailing .0 (optional)
val = f"{size:.1f}"
if val.endswith(".0"):
val = val[:-2]
return f"{val} {unit}"
return f"{num_bytes} Bytes" # fallback
async def read_all(uploaded) -> bytes:
"""Read all bytes from an uploaded file, handling sync or async .read()."""
reader = getattr(uploaded, "read", None)
if reader is None:
return b""
if asyncio.iscoroutinefunction(reader):
return await reader()
data = reader()
if asyncio.iscoroutine(data):
return await data
return data
def ensure_utc(dt):
"""Ensure a timezone-aware UTC datetime or None."""
if dt is None:
return None
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
# --- Routes ------------------------------------------------------
@upload_bp.post("/api/upload")
@login_required
async def api_upload(user):
"""
POST /upload/api/upload
Accepts:
- multipart form with 'file' or 'text'
- 'expires' can be '1h', '7d', or ISO timestamp
- 'note' optional
"""
form = await request.form
files = await request.files
note = form.get("note", "")
expires_raw = form.get("expires", "")
text = form.get("text", "")
uploaded = files.get("file")
expires_at_dt = ensure_utc(parse_expires(expires_raw))
if not uploaded and not text.strip():
return jsonify({"ok": False, "error": "No content provided"}), 400
content_type = None
# --- binary upload path ---
if uploaded:
fname = uploaded.filename or ""
ctype = uploaded.mimetype or "application/octet-stream"
content_type = ctype
# generate filename if missing/placeholder
if not fname or fname.lower() in {"blob", "file"}:
ext = {
"image/png": "png",
"image/jpeg": "jpg",
"image/gif": "gif",
"image/webp": "webp",
"application/pdf": "pdf",
"text/plain": "txt",
}.get(ctype, "bin")
fname = iso_stamp_filename("pasted", ext)
fname = safe_name(fname)
path = current_app.upload_folder / fname
data = await read_all(uploaded)
# write to disk
async with aiofiles.open(path, "wb") as f:
await f.write(data)
size_bytes = len(data)
file_size_pretty = format_size(size_bytes)
await current_app.edgedb.add_file(
file_id=generate_short_id(),
file_name=fname,
file_size=file_size_pretty,
note=note,
content_type=content_type,
uploaded_at=datetime.now(timezone.utc),
expires_at=expires_at_dt,
user_id=user['sub'],
)
# --- text upload path ---
elif text.strip():
data = text.encode("utf-8")
fname = iso_stamp_filename("pasted", "txt")
path = current_app.upload_folder / fname
async with aiofiles.open(path, "wb") as f:
await f.write(data)
size_bytes = len(data)
file_size_pretty = format_size(size_bytes)
await current_app.edgedb.add_file(
file_id=generate_short_id(),
file_name=fname,
file_size=file_size_pretty,
note=note,
content_type="text/plain",
uploaded_at=datetime.now(timezone.utc),
expires_at=expires_at_dt,
user_id=user['sub'],
)
return jsonify({"ok": True})
# --- Background cleanup ------------------------------------------------------
async def cleanup_task():
"""Hourly cleanup of expired files based on EdgeDB."""
await asyncio.sleep(3) # allow app startup
while True:
try:
now = datetime.now(timezone.utc)
expired = await current_app.edgedb.get_expired_files(now)
if not expired:
await asyncio.sleep(3600)
continue
upload_dir: Path = current_app.upload_folder # ensure Path
removed_ids: list[str] = []
for rec in expired:
try:
# Defensive: only touch files under your upload dir
fpath = (upload_dir / rec['file_name']).resolve()
if upload_dir.resolve() in fpath.parents or fpath == upload_dir.resolve():
fpath.unlink(missing_ok=True)
removed_ids.append(rec['file_id'])
else:
current_app.logger.warning("Refusing to delete outside upload dir: %s", fpath)
except Exception as e:
current_app.logger.exception("Failed to delete file %s (%s)", rec['file_name'], rec['file_id'])
# Remove DB rows for files we actually deleted from disk
if removed_ids:
try:
await current_app.edgedb.delete_files_by_ids(removed_ids)
except Exception:
current_app.logger.exception("Failed to delete DB rows for expired files")
except Exception:
current_app.logger.exception("Cleanup task iteration failed")
await asyncio.sleep(3600) # every hour
@upload_bp.before_app_serving
async def start_cleanup():
asyncio.create_task(cleanup_task())