feat(logging): add NanoShare wide event instrumentation
Build and Push Docker Container / build-and-push (push) Failing after 51s

- Register quart_common wide-event logging during app setup so every HTTP request emits one canonical structured event.

- Replace the inline security middleware with reusable quart_common security middleware wiring and move skip path configuration into app constants.

- Add NanoShare-specific wide-event context for health checks, auth/error handlers, file list/edit/delete/serve flows and upload outcomes.

- Rename runtime logging/project metadata from simple-picoshare to nanoshare where it is emitted in service context.

- Update my_helpers and quart_common submodules for Convex/wide-event integration and reusable security middleware support.

- Add NanoShare middleware tests covering safe user context, client IP enrichment, missing Convex handling and Convex security lookup failures.
This commit is contained in:
2026-05-13 20:22:43 +02:00
parent 26536a3cde
commit 9c731d6e67
12 changed files with 208 additions and 40 deletions
+3
View File
@@ -19,6 +19,9 @@ API_GROUP = os.getenv("API_GROUP", 'NanoShare')
THE_IP_BOT_MANAGER = TheIPManager() THE_IP_BOT_MANAGER = TheIPManager()
SKIP_PATH_PREFIXES = ("/static", "/storage")
SKIP_PATHS = ("/favicon.ico",)
# Blocke IPs (Bots, Hackers) # Blocke IPs (Bots, Hackers)
BLOCKED_IPS_ACCESSING_TIMES = int(os.getenv("BLOCKE_IPS_AFTER_ACCESSING_HOWMANY_TIME", 5)) BLOCKED_IPS_ACCESSING_TIMES = int(os.getenv("BLOCKE_IPS_AFTER_ACCESSING_HOWMANY_TIME", 5))
BLOCKED_IPS_STORED_TIMEFRAME = int(os.getenv("BLOCKE_IPS_STORE_KEYS_TIMEFRAME", 3600)) BLOCKED_IPS_STORED_TIMEFRAME = int(os.getenv("BLOCKE_IPS_STORE_KEYS_TIMEFRAME", 3600))
+1 -1
View File
@@ -1,3 +1,3 @@
from quart_common.web.logger import build_logger from quart_common.web.logger import build_logger
logger = build_logger(name="simple-picoshare") logger = build_logger(name="nanoshare")
+2
View File
@@ -9,6 +9,7 @@ from my_modules.app.constens import SECRET_KEY, THE_IP_BOT_MANAGER
from my_modules.OrphanStorageIdRegistry import OrphanStorageIdRegistry from my_modules.OrphanStorageIdRegistry import OrphanStorageIdRegistry
from my_modules.AsyncCache import AsyncCache from my_modules.AsyncCache import AsyncCache
from my_modules.app.logger import logger from my_modules.app.logger import logger
from quart_common.web.wide_event import register_wide_event_logging
from my_helpers.db.convex.ConvexRuntime import ConvexRuntime from my_helpers.db.convex.ConvexRuntime import ConvexRuntime
from my_helpers.db.convex.ConvexWorkerPool import ConvexWorkerPool from my_helpers.db.convex.ConvexWorkerPool import ConvexWorkerPool
@@ -26,6 +27,7 @@ app = Quart(__name__,
static_folder="../../templates/static", static_folder="../../templates/static",
) )
app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 * 1024 app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 * 1024
register_wide_event_logging(app, logger)
app.secret_key = SECRET_KEY app.secret_key = SECRET_KEY
+12 -36
View File
@@ -1,47 +1,23 @@
from routes.handeling.errorsAndBots import maybe_a_hacker from routes.handeling.errorsAndBots import maybe_a_hacker
from my_modules.app.constens import THE_IP_BOT_MANAGER from my_modules.app.constens import THE_IP_BOT_MANAGER, SKIP_PATH_PREFIXES, SKIP_PATHS
from my_modules.app.logger import logger from my_modules.app.logger import logger
from my_modules.functions import get_ip from my_modules.functions import get_ip
from my_modules.app.setup import app from my_modules.app.setup import app
from quart_common.web.security_middleware import register_security_middleware
from quart import request, render_template, current_app, session from quart import session
from datetime import datetime from datetime import datetime
@app.before_request custom_middleware = register_security_middleware(
async def custom_middleware(): app,
if session.get('user'): # only if session already has data, update redis expire time logger=logger,
session.permanent = True ip_bot_manager=THE_IP_BOT_MANAGER,
get_ip=get_ip,
client_ip = get_ip() maybe_hacker_fn=maybe_a_hacker,
path = request.path skip_paths=SKIP_PATHS,
method = request.method skip_path_prefixes=SKIP_PATH_PREFIXES,
)
db_whitelisted_or_blocked = await current_app.convex.is_ip_address_whitelisted_or_blocked(ip_address=client_ip)
# Skip allowed IPs or non-critical assets
if (
db_whitelisted_or_blocked['whiteliste']
or THE_IP_BOT_MANAGER.is_client_ip_always_allowed(client_ip)
or "static" in path
or "favicon" in path
or "storage" in path
):
return
# 2. If IP is already blocked
if db_whitelisted_or_blocked['blocked']:
await logger.error(f"[BLOCKED] {method} | {client_ip} tried {method} {path}")
await current_app.convex.increment_blocked_ip_address_access(ip_address=client_ip, method=method, path=path)
return await render_template("views/basics/blocked_access.htm", remote_addr=client_ip), 403
# 3. If path contains honeypot targets
if await current_app.convex.is_path_blocked(path=path):
await logger.warning(f"[HONEYPOT] {method} | {client_ip} accessed {path}")
await current_app.convex.increment_blocked_path_access(path=path)
return await maybe_a_hacker()
await logger.info(f"{method} | {client_ip} had accessed the Side {path}")
@app.context_processor @app.context_processor
async def inject_context_data(): async def inject_context_data():
+7 -1
View File
@@ -1,5 +1,5 @@
[project] [project]
name = "simple-picoshare" name = "nanoshare"
version = "0.1.0" version = "0.1.0"
description = "Add your description here" description = "Add your description here"
readme = "README.md" readme = "README.md"
@@ -62,6 +62,12 @@ dependencies = [
"yarl==1.23.0", "yarl==1.23.0",
] ]
[tool.pytest.ini_options]
testpaths = [
"tests",
"quart_common/tests",
]
[tool.uv.workspace] [tool.uv.workspace]
members = [ members = [
"quart-session", "quart-session",
+4
View File
@@ -1,6 +1,7 @@
from my_modules.decoratory.header import format_response, feature_flag_required from my_modules.decoratory.header import format_response, feature_flag_required
from my_modules.app.setup import LIMITER from my_modules.app.setup import LIMITER
from my_modules.app.logger import logger from my_modules.app.logger import logger
from quart_common.web.wide_event import add_wide_event_context
from quart import Blueprint, current_app from quart import Blueprint, current_app
@@ -11,8 +12,10 @@ health_bp = Blueprint("health", __name__)
@feature_flag_required("convex_health", fallback=False, status_code=404) @feature_flag_required("convex_health", fallback=False, status_code=404)
@format_response @format_response
async def get_convex_health_for_api(): async def get_convex_health_for_api():
add_wide_event_context(health={"check": "convex"})
runtime = getattr(current_app, "convex_runtime", None) runtime = getattr(current_app, "convex_runtime", None)
if runtime is None: if runtime is None:
add_wide_event_context(health={"status": "unavailable", "reason": "runtime_missing"})
return { return {
"status": "unavailable", "status": "unavailable",
"description": "Convex runtime is not attached to app", "description": "Convex runtime is not attached to app",
@@ -20,6 +23,7 @@ async def get_convex_health_for_api():
is_alive = await runtime.is_alive() is_alive = await runtime.is_alive()
metrics = runtime.get_metrics() metrics = runtime.get_metrics()
add_wide_event_context(health={"status": "ok" if is_alive else "unhealthy"})
return { return {
"alive": is_alive, "alive": is_alive,
+9
View File
@@ -12,6 +12,7 @@ from my_modules.functions import (
get_request_context, get_request_context,
) )
from quart_common.web.env import is_development_environment from quart_common.web.env import is_development_environment
from quart_common.web.wide_event import add_wide_event_context
IGNORED_404_PATHS = [ IGNORED_404_PATHS = [
"/.well-known/", "/.well-known/",
@@ -64,6 +65,7 @@ async def auth_error(message:str, status_code:int=400):
'Authentication failed. Please try again or contact an administrator.' 'Authentication failed. Please try again or contact an administrator.'
) )
add_wide_event_context(auth={"operation_status": "error"}, error={"type": "AuthenticationError", "message": message})
logger.error(f"[AUTH:{status_code}] {message}") logger.error(f"[AUTH:{status_code}] {message}")
if context and context.path.startswith("/api"): if context and context.path.startswith("/api"):
@@ -78,6 +80,7 @@ async def auth_error(message:str, status_code:int=400):
@app.errorhandler(401) @app.errorhandler(401)
async def handle_unauthorized(e): async def handle_unauthorized(e):
context = get_request_context() context = get_request_context()
add_wide_event_context(auth={"operation_status": "unauthorized"}, error={"type": type(e).__name__, "message": str(e)})
if context.path.startswith("/api"): if context.path.startswith("/api"):
return jsonify({"error": "Unauthorized Access", "message": "Gandalf has spoken: You shall not pass… until you log in."}), 401 return jsonify({"error": "Unauthorized Access", "message": "Gandalf has spoken: You shall not pass… until you log in."}), 401
@@ -109,6 +112,7 @@ async def not_found(e):
path=context.path, status=404 path=context.path, status=404
) )
add_wide_event_context(error={"type": "NotFound", "message": str(e)})
logger.error(f"[404] Page Not Found: {context.path}") logger.error(f"[404] Page Not Found: {context.path}")
if context.path.startswith("/api"): if context.path.startswith("/api"):
@@ -131,6 +135,7 @@ async def maybe_a_hacker(e=None):
method=request.method, method=request.method,
path=request.path, path=request.path,
) )
add_wide_event_context(security={"blocked": True, "block_reason": "honeypot_rate_limit"})
logger.warning(f"[HONEYPOT] Blocked {client_ip} after accessing {request.path}") logger.warning(f"[HONEYPOT] Blocked {client_ip} after accessing {request.path}")
return await to_many_requests(e) return await to_many_requests(e)
@@ -140,6 +145,7 @@ async def maybe_a_hacker(e=None):
file={'name': 'hacker_crap.webp', 'alt': "Someone got Hacked and he says I hate this Hacker crap - Jurassic Park Movie"}, file={'name': 'hacker_crap.webp', 'alt': "Someone got Hacked and he says I hate this Hacker crap - Jurassic Park Movie"},
) )
add_wide_event_context(security={"blocked": True, "block_reason": "honeypot"})
response = await make_response((rendered, 418)) response = await make_response((rendered, 418))
response.headers['X-Honeypot-Triggered'] = 'true' response.headers['X-Honeypot-Triggered'] = 'true'
response.headers['X-Reason'] = 'Unauthorized access attempt' response.headers['X-Reason'] = 'Unauthorized access attempt'
@@ -151,6 +157,7 @@ async def to_many_requests(e):
message = "We love your enthusiasm, but our server thought it was being DDoSed… by you. The keyboard needs a new set of keys and we need a nap. Try again soon!" message = "We love your enthusiasm, but our server thought it was being DDoSed… by you. The keyboard needs a new set of keys and we need a nap. Try again soon!"
context = get_request_context() context = get_request_context()
add_wide_event_context(rate_limit={"limited": True}, error={"type": type(e).__name__, "message": str(e)})
if context.path.startswith("/api") or context.path.endswith('/auth/userinfo') or context.path.endswith('/auth/refresh'): if context.path.startswith("/api") or context.path.endswith('/auth/userinfo') or context.path.endswith('/auth/refresh'):
return jsonify({"error": "Too Many Requests - YOU SHALL NOT PASS (for now)", "message": message}), 429 return jsonify({"error": "Too Many Requests - YOU SHALL NOT PASS (for now)", "message": message}), 429
@@ -168,6 +175,7 @@ async def internal_server_error(e):
return await to_many_requests(e) return await to_many_requests(e)
context = get_request_context() context = get_request_context()
add_wide_event_context(error={"type": type(e).__name__, "message": str(e)})
if context.path.startswith("/api"): if context.path.startswith("/api"):
return jsonify({"error": "Internal Server Error", "message": "It looks like you broke something... but don't worry, we're fixing it! In the meantime, we may or may not have logged your IP address (just kidding... or are we?). Either way, thanks for helping us find new ways to crash our system. Stay curious, hacker-friend!"}), 500 return jsonify({"error": "Internal Server Error", "message": "It looks like you broke something... but don't worry, we're fixing it! In the meantime, we may or may not have logged your IP address (just kidding... or are we?). Either way, thanks for helping us find new ways to crash our system. Stay curious, hacker-friend!"}), 500
@@ -185,6 +193,7 @@ async def database_server_error(e):
except LookupError as e: except LookupError as e:
return await to_many_requests(e) return await to_many_requests(e)
add_wide_event_context(error={"type": type(e).__name__, "message": str(e)})
logger.error(e) logger.error(e)
return await render_template('views/basics/error.htm', return await render_template('views/basics/error.htm',
title='Database Error', title='Database Error',
+12
View File
@@ -3,6 +3,7 @@ from my_modules.functions import get_ip
from my_modules.app.setup import LIMITER from my_modules.app.setup import LIMITER
from my_modules.app.logger import logger from my_modules.app.logger import logger
from my_modules.expiry import parse_expires from my_modules.expiry import parse_expires
from quart_common.web.wide_event import add_wide_event_context
from quart import ( from quart import (
Blueprint, Blueprint,
@@ -40,12 +41,14 @@ async def index():
@side_main_bp.route('/access') @side_main_bp.route('/access')
@login_required @login_required
async def access_list(user): async def access_list(user):
add_wide_event_context(nanoshare={"operation": "access_list"})
access_data = await current_app.convex.get_all_access(user_id=user['sub']) access_data = await current_app.convex.get_all_access(user_id=user['sub'])
return await render_template("views/webpage/access/list.htm", access_logs=access_data) return await render_template("views/webpage/access/list.htm", access_logs=access_data)
@side_main_bp.route('/files') @side_main_bp.route('/files')
@login_required @login_required
async def files_list(user): async def files_list(user):
add_wide_event_context(nanoshare={"operation": "files_list"})
files_data = await current_app.convex.get_files(user_id=user['sub']) files_data = await current_app.convex.get_files(user_id=user['sub'])
return await render_template("views/webpage/files/list.htm", return await render_template("views/webpage/files/list.htm",
files=files_data files=files_data
@@ -54,6 +57,7 @@ async def files_list(user):
@side_main_bp.route('/files/<path:file_id>/info') @side_main_bp.route('/files/<path:file_id>/info')
@login_required @login_required
async def file_info(file_id, user): async def file_info(file_id, user):
add_wide_event_context(nanoshare={"operation": "file_info", "file_id": file_id})
files_data = await current_app.convex.get_files(user_id=user["sub"]) files_data = await current_app.convex.get_files(user_id=user["sub"])
file_data = find_file(files_data, file_id) file_data = find_file(files_data, file_id)
if not file_data: if not file_data:
@@ -71,6 +75,7 @@ async def file_info(file_id, user):
@side_main_bp.route("/files/<path:file_id>/edit") @side_main_bp.route("/files/<path:file_id>/edit")
@login_required @login_required
async def file_edit(file_id, user): async def file_edit(file_id, user):
add_wide_event_context(nanoshare={"operation": "file_edit", "file_id": file_id})
file_data = await current_app.convex.get_file_informations(file_id=file_id, user_id=user["sub"]) file_data = await current_app.convex.get_file_informations(file_id=file_id, user_id=user["sub"])
if not file_data: if not file_data:
abort(404) abort(404)
@@ -83,6 +88,7 @@ async def file_edit(file_id, user):
@side_main_bp.put("/api/file/<path:file_id>") @side_main_bp.put("/api/file/<path:file_id>")
@login_required @login_required
async def file_edit_api(file_id, user): async def file_edit_api(file_id, user):
add_wide_event_context(nanoshare={"operation": "file_update", "file_id": file_id})
files_data = await current_app.convex.get_files(user_id=user["sub"]) files_data = await current_app.convex.get_files(user_id=user["sub"])
if not find_file(files_data, file_id): if not find_file(files_data, file_id):
return jsonify({"ok": False, "error": "File not found"}), 404 return jsonify({"ok": False, "error": "File not found"}), 404
@@ -115,6 +121,7 @@ async def file_edit_api(file_id, user):
@side_main_bp.delete("/api/file/<path:file_id>") @side_main_bp.delete("/api/file/<path:file_id>")
@login_required @login_required
async def file_delete_api(file_id, user): async def file_delete_api(file_id, user):
add_wide_event_context(nanoshare={"operation": "file_delete", "file_id": file_id})
files_data = await current_app.convex.get_files(user_id=user["sub"]) files_data = await current_app.convex.get_files(user_id=user["sub"])
if not find_file(files_data, file_id): if not find_file(files_data, file_id):
return jsonify({"ok": False, "error": "File not found"}), 404 return jsonify({"ok": False, "error": "File not found"}), 404
@@ -125,10 +132,12 @@ async def file_delete_api(file_id, user):
@side_main_bp.route("/-<file_id>") @side_main_bp.route("/-<file_id>")
@LIMITER.limit("10 per minute;500 per hour;") @LIMITER.limit("10 per minute;500 per hour;")
async def serve_file(file_id: str): async def serve_file(file_id: str):
add_wide_event_context(nanoshare={"operation": "serve_file", "file_id": file_id})
file_data = await current_app.convex.get_file(file_id=file_id) file_data = await current_app.convex.get_file(file_id=file_id)
disable_logging = False disable_logging = False
if not file_data: if not file_data:
add_wide_event_context(nanoshare={"operation_status": "not_found"})
abort(404) abort(404)
user = session.get('user') user = session.get('user')
@@ -136,6 +145,7 @@ async def serve_file(file_id: str):
disable_logging = True disable_logging = True
if file_data.get("expired", None): if file_data.get("expired", None):
add_wide_event_context(nanoshare={"operation_status": "expired", "owner_request": disable_logging})
if not disable_logging: if not disable_logging:
await current_app.convex.add_file_access(file_id=file_id, ip_address=get_ip(), user_agent=request.user_agent, status="expired") await current_app.convex.add_file_access(file_id=file_id, ip_address=get_ip(), user_agent=request.user_agent, status="expired")
return Response("This file has expired.", status=410, headers={ return Response("This file has expired.", status=410, headers={
@@ -149,10 +159,12 @@ async def serve_file(file_id: str):
force_download = request.args.get("download") in {"1", "true", "yes"} force_download = request.args.get("download") in {"1", "true", "yes"}
if not file_data.get('db_image_url', None): if not file_data.get('db_image_url', None):
add_wide_event_context(nanoshare={"operation_status": "missing_storage", "owner_request": disable_logging})
if not disable_logging: if not disable_logging:
await current_app.convex.add_file_access(file_id=file_id, ip_address=get_ip(), user_agent=request.user_agent, status="error") await current_app.convex.add_file_access(file_id=file_id, ip_address=get_ip(), user_agent=request.user_agent, status="error")
abort(404) abort(404)
add_wide_event_context(nanoshare={"operation_status": "served", "owner_request": disable_logging, "content_type": content_type, "download": force_download})
if not disable_logging: if not disable_logging:
await current_app.convex.add_file_access(file_id=file_id, ip_address=get_ip(), user_agent=request.user_agent, status="ok") await current_app.convex.add_file_access(file_id=file_id, ip_address=get_ip(), user_agent=request.user_agent, status="ok")
+11
View File
@@ -1,6 +1,7 @@
from my_modules.decoratory.header import login_required from my_modules.decoratory.header import login_required
from my_modules.expiry import parse_expires, ensure_utc from my_modules.expiry import parse_expires, ensure_utc
from my_modules.file_meta import iso_stamp_filename, format_size from my_modules.file_meta import iso_stamp_filename, format_size
from quart_common.web.wide_event import add_wide_event_context
from quart import Blueprint, request, jsonify, current_app from quart import Blueprint, request, jsonify, current_app
import asyncio, hashlib import asyncio, hashlib
@@ -73,10 +74,13 @@ async def api_upload(user):
text = form.get('text', '') text = form.get('text', '')
orphan_registry = getattr(current_app, 'orphan_storage_registry', None) orphan_registry = getattr(current_app, 'orphan_storage_registry', None)
add_wide_event_context(nanoshare={"operation": "upload", "has_file": bool(files.get('file')), "has_text": bool(text.strip())})
uploaded = files.get('file') uploaded = files.get('file')
expires_at_dt = ensure_utc(parse_expires(expires_raw)) expires_at_dt = ensure_utc(parse_expires(expires_raw))
if not uploaded and not text.strip(): if not uploaded and not text.strip():
add_wide_event_context(nanoshare={"operation_status": "missing_content"})
return jsonify({'ok': False, 'error': 'No content provided'}), 400 return jsonify({'ok': False, 'error': 'No content provided'}), 400
content_type = None content_type = None
@@ -84,6 +88,7 @@ async def api_upload(user):
# --- binary upload path --- # --- binary upload path ---
if uploaded: if uploaded:
fname = uploaded.filename or '' fname = uploaded.filename or ''
add_wide_event_context(nanoshare={"upload_type": "file", "filename_present": bool(fname)})
ctype = uploaded.mimetype or 'application/octet-stream' ctype = uploaded.mimetype or 'application/octet-stream'
content_type = ctype content_type = ctype
storage_id = None storage_id = None
@@ -115,6 +120,7 @@ async def api_upload(user):
) )
if storage_id: if storage_id:
reused_orphan_storage_id = True reused_orphan_storage_id = True
add_wide_event_context(nanoshare={"reused_orphan_storage_id": True})
else: else:
storage_id, sent_size = await current_app.convex.send_stream_to_storage(stream=stream,content_type=content_type) storage_id, sent_size = await current_app.convex.send_stream_to_storage(stream=stream,content_type=content_type)
size_bytes = sent_size size_bytes = sent_size
@@ -128,6 +134,7 @@ async def api_upload(user):
) )
if storage_id: if storage_id:
reused_orphan_storage_id = True reused_orphan_storage_id = True
add_wide_event_context(nanoshare={"reused_orphan_storage_id": True})
else: else:
storage_id = await current_app.convex.send_to_storage(data=data, content_type=content_type) storage_id = await current_app.convex.send_to_storage(data=data, content_type=content_type)
size_bytes = len(data) size_bytes = len(data)
@@ -151,6 +158,7 @@ async def api_upload(user):
# --- text upload path --- # --- text upload path ---
elif text.strip(): elif text.strip():
add_wide_event_context(nanoshare={"upload_type": "text"})
data = text.encode('utf-8') data = text.encode('utf-8')
fname = iso_stamp_filename('pasted', 'txt') fname = iso_stamp_filename('pasted', 'txt')
fingerprint = fingerprint_bytes(data) fingerprint = fingerprint_bytes(data)
@@ -160,6 +168,8 @@ async def api_upload(user):
else None else None
) )
reused_orphan_storage_id = bool(storage_id) reused_orphan_storage_id = bool(storage_id)
if reused_orphan_storage_id:
add_wide_event_context(nanoshare={"reused_orphan_storage_id": True})
if not storage_id: if not storage_id:
storage_id = await current_app.convex.send_to_storage( storage_id = await current_app.convex.send_to_storage(
data=data, content_type='text/plain' data=data, content_type='text/plain'
@@ -183,4 +193,5 @@ async def api_upload(user):
await orphan_registry.remember(user['sub'], fingerprint, storage_id) await orphan_registry.remember(user['sub'], fingerprint, storage_id)
raise raise
add_wide_event_context(nanoshare={"operation_status": "uploaded", "content_type": content_type})
return jsonify({'ok': True}) return jsonify({'ok': True})
+145
View File
@@ -0,0 +1,145 @@
import asyncio
import importlib
import sys
import types
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from quart import Quart, g, session
class FakeLogger:
def __init__(self):
self.records = []
async def info(self, message):
self.records.append(("info", message))
async def warning(self, message):
self.records.append(("warning", message))
async def error(self, message):
self.records.append(("error", message))
class FakeIPBotManager:
def is_client_ip_always_allowed(self, ip):
return False
class FakeSetupApp:
def before_request(self, func):
return func
def context_processor(self, func):
return func
class FakeConvex:
async def is_ip_address_whitelisted_or_blocked(self, ip_address):
return {"whiteliste": False, "blocked": False}
async def is_path_blocked(self, path):
return False
class FailingConvex:
async def is_ip_address_whitelisted_or_blocked(self, ip_address):
raise RuntimeError("convex down")
def load_middleware(monkeypatch, client_ip="203.0.113.10", logger=None):
fake_errors = types.ModuleType("routes.handeling.errorsAndBots")
async def maybe_a_hacker():
return "blocked", 418
fake_errors.maybe_a_hacker = maybe_a_hacker
fake_constens = types.ModuleType("my_modules.app.constens")
fake_constens.THE_IP_BOT_MANAGER = FakeIPBotManager()
fake_constens.SKIP_PATH_PREFIXES = ("/static", "/storage")
fake_constens.SKIP_PATHS = ("/favicon.ico",)
fake_logger_module = types.ModuleType("my_modules.app.logger")
fake_logger_module.logger = logger or FakeLogger()
fake_functions = types.ModuleType("my_modules.functions")
fake_functions.get_ip = lambda: client_ip
fake_setup = types.ModuleType("my_modules.app.setup")
fake_setup.app = FakeSetupApp()
monkeypatch.setitem(sys.modules, "routes.handeling.errorsAndBots", fake_errors)
monkeypatch.setitem(sys.modules, "my_modules.app.constens", fake_constens)
monkeypatch.setitem(sys.modules, "my_modules.app.logger", fake_logger_module)
monkeypatch.setitem(sys.modules, "my_modules.functions", fake_functions)
monkeypatch.setitem(sys.modules, "my_modules.app.setup", fake_setup)
sys.modules.pop("my_modules.middleware", None)
return importlib.import_module("my_modules.middleware")
def test_middleware_adds_user_and_client_context(monkeypatch):
async def run_test():
middleware = load_middleware(monkeypatch)
monkeypatch.setattr(middleware, "get_ip", lambda: "203.0.113.10")
monkeypatch.setattr(middleware, "logger", FakeLogger())
app = Quart(__name__)
app.secret_key = "test-secret"
app.convex = FakeConvex()
async with app.test_request_context("/files"):
g.wide_event = {}
session["user"] = {"sub": "user_123", "preferred_username": "demo"}
session["login_at"] = 1
result = await middleware.custom_middleware()
assert result is None
assert g.wide_event["user"]["id"] == "user_123"
assert g.wide_event["user"]["name"] == "demo"
assert g.wide_event["user"]["authenticated"] is True
assert g.wide_event["user"]["session_age_seconds"] >= 0
assert g.wide_event["client"]["ip"] == "203.0.113.10"
asyncio.run(run_test())
def test_middleware_marks_missing_convex_as_skipped(monkeypatch):
async def run_test():
middleware = load_middleware(monkeypatch, client_ip="203.0.113.11")
monkeypatch.setattr(middleware, "logger", FakeLogger())
app = Quart(__name__)
app.secret_key = "test-secret"
async with app.test_request_context("/files"):
g.wide_event = {}
result = await middleware.custom_middleware()
assert result is None
assert g.wide_event["client"]["ip"] == "203.0.113.11"
assert g.wide_event["security"] == {
"convex_missing": True,
"middleware_skipped": True,
}
asyncio.run(run_test())
def test_middleware_records_convex_security_failures(monkeypatch):
async def run_test():
fake_logger = FakeLogger()
middleware = load_middleware(monkeypatch, client_ip="203.0.113.12", logger=fake_logger)
app = Quart(__name__)
app.secret_key = "test-secret"
app.convex = FailingConvex()
async with app.test_request_context("/files"):
g.wide_event = {}
result = await middleware.custom_middleware()
assert result is None
assert g.wide_event["client"]["ip"] == "203.0.113.12"
assert g.wide_event["security"] == {"ip_lookup_failed": True}
assert g.wide_event["error"]["type"] == "RuntimeError"
assert fake_logger.records == [("error", "[MIDDLEWARE] Convex ip_lookup failed: convex down")]
asyncio.run(run_test())