add protection that shares the data with my webside

This commit is contained in:
2025-12-23 15:20:36 +01:00
parent 0b2d635fd8
commit 729e7f5fca
13 changed files with 211 additions and 60 deletions
+14
View File
@@ -0,0 +1,14 @@
class TheIPManager:
def __init__(self):
self.always_allowed_ips = set()
# Checks
def is_client_ip_always_allowed(self, client_ip:str):
return client_ip in self.always_allowed_ips
# Add
def add_always_allowed_ip(self, client_ip:str):
self.always_allowed_ips.add(client_ip)
def update_always_allowed_ip(self, client_ips:set):
self.always_allowed_ips.update(client_ips)
+2 -3
View File
@@ -1,7 +1,7 @@
from my_modules.TheIPManager import TheIPManager
from my_modules.app.logger import logger
from dotenv import find_dotenv, load_dotenv, dotenv_values
from pathlib import Path
import os, asyncio
async def read_dot_file():
@@ -17,5 +17,4 @@ WEB_DEBUG = os.getenv("WEB_DEBUG", False)
SECRET_KEY = os.getenv("FLASK_SECRET_KEY", "USE_ENV_das_ist_ein_geheimer_schlüssel_1")
API_GROUP = os.getenv("API_GROUP", 'NanoShare')
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
THE_IP_BOT_MANAGER = TheIPManager()
+1 -1
View File
@@ -8,5 +8,5 @@ formatter = Formatter(fmt="%(levelname)s %(module)s: %(message)s")
handler = AsyncStreamHandler(stream=sys.stdout)
handler.formatter = formatter
logger = Logger(name="my_webside_and_api", level="DEBUG" if os.getenv("WEB_DEBUG", False) == "true" else "INFO")
logger = Logger(name="simple-picoshare", level="DEBUG" if os.getenv("WEB_DEBUG", False) == "true" else "INFO")
logger.handlers = [handler]
+11 -4
View File
@@ -1,8 +1,7 @@
from my_modules.functions import custom_limit_key
from my_modules.app.constens import SECRET_KEY, UPLOAD_DIR
from my_modules.functions import custom_limit_key, get_my_ip_address, get_local_ip_addresses, replace_last_ip_segment, generate_all_ips
from my_modules.app.constens import SECRET_KEY, THE_IP_BOT_MANAGER
from my_modules.AsyncCache import AsyncCache
from my_modules.app.logger import logger
from my_modules.db.ConvexDB import ConvexDB
from quart_session import Session
@@ -16,7 +15,6 @@ app = Quart(__name__, template_folder="../../templates/side", static_folder="../
app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 * 1024
app.secret_key = SECRET_KEY
app.upload_folder = UPLOAD_DIR
# Cache, Sessions and Limiter over Valkey
if os.getenv("VALKEY_HOST", None) is not None:
@@ -71,6 +69,15 @@ async def init_convex():
app.convex = ConvexDB(os.getenv("CONVEX_URL"))
await app.convex.connect()
THE_IP_BOT_MANAGER.add_always_allowed_ip('127.0.0.1')
THE_IP_BOT_MANAGER.add_always_allowed_ip(await get_my_ip_address())
local_docker_host_ip = get_local_ip_addresses()
if local_docker_host_ip:
base_ip = replace_last_ip_segment(local_docker_host_ip, 1)
all_local_ips = generate_all_ips(base_ip)
THE_IP_BOT_MANAGER.update_always_allowed_ip(all_local_ips)
@app.after_serving
async def close_convex():
if app.convex:
+29 -15
View File
@@ -1,4 +1,4 @@
from my_modules.app.constens import SECRET_KEY
from my_modules.app.constens import THE_IP_BOT_MANAGER
from my_modules.app.logger import logger
from my_modules.app.setup import LIMITER
from my_modules.functions import get_ip
@@ -6,7 +6,7 @@ from my_modules.functions import get_ip
from quart import jsonify, request, url_for, Response, current_app, session, abort
from functools import wraps
from datetime import datetime
import asyncio, msgpack, json, jwt
import asyncio, msgpack, json
def encode_object_default(obj):
if isinstance(obj, datetime):
@@ -24,6 +24,27 @@ async def get_auth_token():
return None
async def verify_token(token:str):
decoded_payload = await current_app.convex.decode_access_token_payload(access_token=token)
decoded_payload_error_state = decoded_payload.get('state', None)
if decoded_payload is None:
return {'error': "No Data from Database"}, 504
elif decoded_payload_error_state == 1:
await logger.error(decoded_payload.get('error'))
return {'error': 'Invalid access token'}, 401
elif decoded_payload_error_state == 2:
await logger.error(decoded_payload.get('error'))
return {'error': 'Wrong access token type'}, 401
elif decoded_payload_error_state == 3:
await logger.error(decoded_payload.get('error'))
return {'error': 'Refresh token not found', 'msg': 'Please login again and generate a new Token', 'url': url_for('auth_login.login')}, 403
elif decoded_payload_error_state == 4:
await logger.error(decoded_payload.get('error'))
return {'error': 'Refresh token expired'}, 401
return decoded_payload, None
# Custom decorator for token validation
def token_required(func):
@wraps(func)
@@ -33,17 +54,10 @@ def token_required(func):
await logger.error('API Token is missing')
return jsonify(error='Token is missing'), 400
try:
decoded_payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
if not await current_app.edgedb.check_if_refresh_token_exists_by_id(decoded_payload['refresh_id']):
await logger.error(f'API Refresh Token not found: {decoded_payload['refresh_id']}')
return jsonify(error='Refresh Token not found', msg='Please login again', url=url_for('login')), 403
except jwt.ExpiredSignatureError:
await logger.error('API Token has expired')
return jsonify(error='Token has expired'), 401
except jwt.InvalidTokenError:
await logger.error('API Token is invalid')
return jsonify(error='Token is invalid'), 401
decoded_payload, status_code = await verify_token(token)
decoded_payload_error = decoded_payload.get('error', None)
if decoded_payload_error:
return jsonify(decoded_payload), status_code
return await func(user=decoded_payload, *args, **kwargs)
return wrapper
@@ -116,8 +130,8 @@ def apply_limit(endpoint_name, limits:dict=None):
def make_key_func(endpoint):
def key_func():
ip = get_ip()
# if THE_IP_BOT_MANAGER.is_client_ip_always_allowed(ip):
# return None # No key, no increment, no enforcement
if THE_IP_BOT_MANAGER.is_client_ip_always_allowed(ip):
return None # No key, no increment, no enforcement
# Combine endpoint name and HTTP method (and client IP) into the rate-limit key
return f":{ip}:{endpoint}:{request.method}:"
+20 -1
View File
@@ -1,3 +1,6 @@
from routes.handeling.errorsAndBots import maybe_a_hacker
from my_modules.app.constens import THE_IP_BOT_MANAGER
from my_modules.app.logger import logger
from my_modules.functions import get_ip
from my_modules.app.setup import app
@@ -14,13 +17,29 @@ async def custom_middleware():
path = request.path
method = request.method
db_whitelisted_or_blocked = await current_app.convex.is_ip_address_whitelisted_or_blocked(ip_address=client_ip)
# Skip allowed IPs or non-critical assets
if (
"favicon" in path
db_whitelisted_or_blocked['whiteliste']
or THE_IP_BOT_MANAGER.is_client_ip_always_allowed(client_ip)
or "static" in path
or "favicon" in path
or "storage" in path
):
return
# 2. If IP is already blocked
if db_whitelisted_or_blocked['blocked']:
await logger.error(f"[BLOCKED] {method} | {client_ip} tried {method} {path}")
await current_app.convex.increment_blocked_ip_address_access(ip_address=client_ip, method=method, path=path)
return await render_template("views/basics/blocked_access.htm", remote_addr=client_ip), 403
# 3. If path contains honeypot targets
if await current_app.convex.is_path_blocked(path=path):
await logger.warning(f"[HONEYPOT] {method} | {client_ip} accessed {path}")
return await maybe_a_hacker()
await logger.info(f"{method} | {client_ip} had accessed the Side {path}")
@app.context_processor