From 404bf263bf7ae34a16db0228f7e3150fdfe79eb7 Mon Sep 17 00:00:00 2001 From: Daniel Dolezal Date: Fri, 24 Oct 2025 08:21:16 +0200 Subject: [PATCH] add code to start the server --- my_modules/AsyncCache.py | 105 ++++++++++++++++++++++ my_modules/app/constens.py | 21 +++++ my_modules/app/logger.py | 12 +++ my_modules/app/setup.py | 80 +++++++++++++++++ my_modules/decoratory/header.py | 150 ++++++++++++++++++++++++++++++++ my_modules/functions.py | 63 ++++++++++++++ my_modules/middleware.py | 35 ++++++++ 7 files changed, 466 insertions(+) create mode 100644 my_modules/AsyncCache.py create mode 100644 my_modules/app/constens.py create mode 100644 my_modules/app/logger.py create mode 100644 my_modules/app/setup.py create mode 100644 my_modules/decoratory/header.py create mode 100644 my_modules/functions.py create mode 100644 my_modules/middleware.py diff --git a/my_modules/AsyncCache.py b/my_modules/AsyncCache.py new file mode 100644 index 0000000..cbc6076 --- /dev/null +++ b/my_modules/AsyncCache.py @@ -0,0 +1,105 @@ +import redis.asyncio as aioredis +import asyncio +import pickle + +class AsyncCache: + def __init__(self, backend="memory", prefix="cache:", default_ttl=300, **kwargs): + self.prefix = prefix + self.ttl = default_ttl + + if backend == "redis": + self.backend_type = "redis" + self.redis = aioredis.Redis( + host=kwargs.get("host", "localhost"), + port=int(kwargs.get("port", 6379)), + username=kwargs.get("username"), + password=kwargs.get("password"), + db=int(kwargs.get("db", 0)), + decode_responses=False, + ) + else: + self.backend_type = "memory" + self._store = {} + self._expiry = {} + + # --------- asynchrone Pickle-Helfer ------------- + async def _pickle_dumps(self, value): + """Pickle.dumps in Threadpool ausführen.""" + return await asyncio.to_thread(pickle.dumps, value, protocol=pickle.HIGHEST_PROTOCOL) + + async def _pickle_loads(self, data): + """Pickle.loads in Threadpool ausführen.""" + return await asyncio.to_thread(pickle.loads, data) + + # --------- Flask-Caching-ähnliche Serialisierung ------------- + async def dump_object(self, value) -> bytes: + """ + * int -> ASCII-Bytes ohne Prefix + * alles andere -> b"!" + Pickle-Bytes + """ + if isinstance(value, int): + return str(value).encode("ascii") + pickled = await self._pickle_dumps(value) + return b"!" + pickled + + async def load_object(self, value: bytes): + """ + * beginnt mit b"!" -> Pickle.loads + * ansonsten -> int oder UTF-8-String + """ + if value is None: + return None + if value.startswith(b"!"): + return await self._pickle_loads(value[1:]) + try: + return int(value) + except ValueError: + return value.decode("utf-8", errors="ignore") + + # ----------------- Öffentliche API ----------------- + async def set(self, key, value, ttl=None): + ttl = ttl or self.ttl + full_key = f"{self.prefix}{key}" + dumped = await self.dump_object(value) + + if self.backend_type == "redis": + return await self.redis.set(full_key, dumped, ex=ttl) + else: + self._store[full_key] = dumped + self._expiry[full_key] = asyncio.get_event_loop().time() + ttl + return True + + async def get(self, key): + full_key = f"{self.prefix}{key}" + + if self.backend_type == "redis": + data = await self.redis.get(full_key) + else: + expire = self._expiry.get(full_key) + if expire and expire < asyncio.get_event_loop().time(): + self._store.pop(full_key, None) + self._expiry.pop(full_key, None) + return None + data = self._store.get(full_key) + + return await self.load_object(data) + + async def delete(self, key): + full_key = f"{self.prefix}{key}" + if self.backend_type == "redis": + return await self.redis.delete(full_key) + else: + self._store.pop(full_key, None) + self._expiry.pop(full_key, None) + return True + + async def clear(self): + if self.backend_type == "redis": + keys = await self.redis.keys(f"{self.prefix}*") + if keys: + return await self.redis.delete(*keys) + return 0 + else: + self._store.clear() + self._expiry.clear() + return True diff --git a/my_modules/app/constens.py b/my_modules/app/constens.py new file mode 100644 index 0000000..b9f137c --- /dev/null +++ b/my_modules/app/constens.py @@ -0,0 +1,21 @@ +from my_modules.app.logger import logger + +from dotenv import find_dotenv, load_dotenv, dotenv_values +from pathlib import Path +import os, asyncio + +async def read_dot_file(): + if load_dotenv(find_dotenv()): + dot_env_file = find_dotenv() + await logger.info(f'Found dotenv File: {dot_env_file}') + await logger.info(f'Loaded Content: {dict(dotenv_values(dot_env_file))}') + +asyncio.run(read_dot_file()) + +WEB_DEBUG = os.getenv("WEB_DEBUG", False) + +SECRET_KEY = os.getenv("FLASK_SECRET_KEY", "USE_ENV_das_ist_ein_geheimer_schlüssel_1") +API_GROUP = os.getenv("API_GROUP", 'PICOSHARE') + +UPLOAD_DIR = Path("uploads") +UPLOAD_DIR.mkdir(parents=True, exist_ok=True) diff --git a/my_modules/app/logger.py b/my_modules/app/logger.py new file mode 100644 index 0000000..0786b2e --- /dev/null +++ b/my_modules/app/logger.py @@ -0,0 +1,12 @@ +from aiologger.formatters.base import Formatter +from aiologger.handlers.streams import AsyncStreamHandler +from aiologger import Logger +import os +import sys + +formatter = Formatter(fmt="%(levelname)s %(module)s: %(message)s") +handler = AsyncStreamHandler(stream=sys.stdout) +handler.formatter = formatter + +logger = Logger(name="my_webside_and_api", level="DEBUG" if os.getenv("WEB_DEBUG", False) == "true" else "INFO") +logger.handlers = [handler] diff --git a/my_modules/app/setup.py b/my_modules/app/setup.py new file mode 100644 index 0000000..5010450 --- /dev/null +++ b/my_modules/app/setup.py @@ -0,0 +1,80 @@ +from my_modules.functions import custom_limit_key +from my_modules.app.constens import SECRET_KEY, UPLOAD_DIR +from my_modules.AsyncCache import AsyncCache +from my_modules.app.logger import logger +from my_modules.EdgeDB import EdgeDB + +from quart_session import Session +from flask_limiter import Limiter + +import redis.asyncio as aioredis +from quart import Quart +import os + +app = Quart(__name__, template_folder="../../templates/side", static_folder="../../templates/files") +app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 * 1024 + +app.secret_key = SECRET_KEY +app.upload_folder = UPLOAD_DIR + +# Cache, Sessions and Limiter over Valkey +if os.getenv("VALKEY_HOST", None) is not None: + cache = AsyncCache( + backend='redis', + default_ttl=300, + username=os.getenv('VALKEY_CACHE_USER', ''), + password=os.getenv('VALKEY_CACHE_PASSWORD', ''), + host=os.getenv('VALKEY_HOST'), + port=os.getenv('VALKEY_PORT', 6379), + db=os.getenv('VALKEY_DB', 0) + ) +else: + cache = AsyncCache( + backend='memory', + ) + +if os.getenv("VALKEY_HOST", None) is not None: + app.config.from_mapping( + SESSION_TYPE="redis", + SESSION_PERMANENT=True, + SESSION_USE_SIGNER=True, + SESSION_REDIS = aioredis.Redis( + username=os.getenv('VALKEY_SESSION_USER', None), + password=os.getenv('VALKEY_SESSION_PASSWORD', None), + host=os.getenv("VALKEY_HOST"), + port=os.getenv("VALKEY_PORT", 6379), + db=os.getenv("VALKEY_DB", 0), + decode_responses=True + ) + ) +else: + app.config.from_mapping( + SESSION_TYPE='memcached', + ) + +Session(app) + +LIMITER = Limiter( + custom_limit_key, + app=app, + storage_uri=( + f"redis://{os.getenv('VALKEY_LIMITER_USER', '')}:{os.getenv('VALKEY_LIMITER_PASSWORD', '')}" + f"@{os.getenv("VALKEY_HOST")}:{os.getenv('VALKEY_PORT', 6379)}/{os.getenv('VALKEY_DB', 0)}" + ) if os.getenv("VALKEY_HOST") else None, + default_limits=[], + strategy='moving-window' +) + +@app.before_serving +async def init_edgedb(): + app.edgedb = EdgeDB( + database=os.getenv("EDGEDB_DATABASE"), + tls_security=None if app.debug else 'insecure' + ) + await app.edgedb.connect() + +@app.after_serving +async def close_edgedb(): + if app.edgedb: + await app.edgedb.close() + await logger.shutdown() diff --git a/my_modules/decoratory/header.py b/my_modules/decoratory/header.py new file mode 100644 index 0000000..ea766f9 --- /dev/null +++ b/my_modules/decoratory/header.py @@ -0,0 +1,150 @@ +from my_modules.app.constens import SECRET_KEY +from my_modules.app.logger import logger +from my_modules.app.setup import LIMITER +from my_modules.functions import get_ip + +from quart import jsonify, request, url_for, Response, current_app, session, abort +from functools import wraps +from datetime import datetime +import asyncio, msgpack, json, jwt + +def encode_object_default(obj): + if isinstance(obj, datetime): + return obj.strftime('%a, %d %b %Y %H:%M:%S %Z') + raise TypeError(f"Type {type(obj)} not serializable") + +# Helper function to extract the token +async def get_auth_token(): + auth_header = request.headers.get('Authorization') + if auth_header: + try: + return auth_header.split(" ")[1] + except IndexError: + pass + + return None + +# Custom decorator for token validation +def token_required(func): + @wraps(func) + async def wrapper(*args, **kwargs): + token = await get_auth_token() + if not token: + await logger.error('API Token is missing') + return jsonify(error='Token is missing'), 400 + + try: + decoded_payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256']) + if not await current_app.edgedb.check_if_refresh_token_exists_by_id(decoded_payload['refresh_id']): + await logger.error(f'API Refresh Token not found: {decoded_payload['refresh_id']}') + return jsonify(error='Refresh Token not found', msg='Please login again', url=url_for('login')), 403 + except jwt.ExpiredSignatureError: + await logger.error('API Token has expired') + return jsonify(error='Token has expired'), 401 + except jwt.InvalidTokenError: + await logger.error('API Token is invalid') + return jsonify(error='Token is invalid'), 401 + + return await func(user=decoded_payload, *args, **kwargs) + return wrapper + +# Custom decorator for content type reading, convertig dict to response +def parse_request_data(func): + @wraps(func) + async def wrapper(*args, **kwargs): + content_type = request.headers.get('Content-Type', '').lower() + data = None + body = await request.body + + if body: + if 'application/msgpack' in content_type or 'application/x-msgpack' in content_type: + try: + data = await asyncio.to_thread(msgpack.unpackb, body, raw=False) + except Exception: + return jsonify({'error': 'Invalid MessagePack'}), 400 + elif 'application/json' in content_type: + data = await request.get_json(silent=True) + if data is None: + return jsonify({'error': 'Invalid JSON'}), 400 + else: + if request.method in ['POST', 'PUT', 'PATCH', 'DELETE']: + return jsonify({'error': 'Unsupported Content-Type'}), 415 + # else: + # if request.method in ['POST', 'PUT', 'PATCH']: + # return jsonify({'error': 'Empty request body'}), 400 + + return await func(data=data, *args, **kwargs) + return wrapper + +def format_response(func): + @wraps(func) + async def wrapper(*args, **kwargs): + result = await func(*args, **kwargs) + + # Unpack result: (data), (data, status), (data, headers), (data, status, headers) + data = None + status = 200 + headers = {} + + if isinstance(result, tuple): + data = result[0] + if len(result) == 2: + if isinstance(result[1], dict): + headers = result[1] + else: + status = result[1] + elif len(result) == 3: + status = result[1] + headers = result[2] + else: + data = result + + accept = request.headers.get('Accept', '').lower() + if 'application/msgpack' in accept or 'application/x-msgpack' in accept: + packed = await asyncio.to_thread(msgpack.packb, data, default=encode_object_default, use_bin_type=True) + return Response(packed, content_type='application/msgpack', status=status, headers=headers) + else: + json_str = await asyncio.to_thread(json.dumps, data, ensure_ascii=False, default=encode_object_default) + response = Response(json_str, status=status, content_type='application/json') + response.headers.update(headers) + return response + + return wrapper + +# Custom decorator for adding limits for spezific methodes by endpoint +def apply_limit(endpoint_name, limits:dict=None): + def make_key_func(endpoint): + def key_func(): + ip = get_ip() + # if THE_IP_BOT_MANAGER.is_client_ip_always_allowed(ip): + # return None # No key, no increment, no enforcement + + # Combine endpoint name and HTTP method (and client IP) into the rate-limit key + return f":{ip}:{endpoint}:{request.method}:" + return key_func + + def decorator(func): + @wraps(func) + async def wrapped(*args, **kwargs): + return await func(*args, **kwargs) + + rules = limits.get(endpoint_name) + def dynamic_limit(): + if isinstance(rules, dict): + return rules.get(request.method.upper(), "10000 per second") + return rules or "10000 per second" + + key_fn = make_key_func(endpoint_name) + return LIMITER.limit(dynamic_limit, key_func=key_fn)(wrapped) + return decorator + +# Check if User is loggedin +def login_required(func): + @wraps(func) + async def decorated_function(*args, **kwargs): + user_session = session.get('user') + if user_session is None: + abort(401) + + return await func(user=user_session, *args, **kwargs) + return decorated_function diff --git a/my_modules/functions.py b/my_modules/functions.py new file mode 100644 index 0000000..bd49d9f --- /dev/null +++ b/my_modules/functions.py @@ -0,0 +1,63 @@ +from quart import has_request_context, request, has_websocket_context, websocket +from flask_limiter import Limiter +import subprocess, aiohttp + +# Get IPs +def get_ip(): + if has_request_context(): + xff = request.headers.get("X-Forwarded-For", "") + return xff.split(",")[0].strip() if xff else request.remote_addr + elif has_websocket_context(): + xff = websocket.headers.get("X-Forwarded-For", "") + return xff.split(",")[0].strip() if xff else websocket.remote_addr + return None # No active request or websocket context + +async def get_my_ip_address(): + async with aiohttp.ClientSession() as session: + async with session.get("https://ipinfo.io/ip") as response: + if response.status == 200: + return await response.text() + raise aiohttp.ClientError(f'Could not get IP: {response.status} {await response.text()}') + +def get_local_ip_addresses(): + try: + result = subprocess.run(['hostname', '-I'], capture_output=True, text=True) + first_ip = result.stdout.strip().split()[0] + return first_ip + except subprocess.CalledProcessError as e: + return None + except IndexError: + return None + +def generate_all_ips(base_ip:str) -> set: + ips = set() + for i in range(1, 255): # 1 to 254 inclusive + ips.add(replace_last_ip_segment(base_ip, i)) + return ips + +# Limiter Key Gen +def custom_limit_key(): + ip = get_ip() + # if THE_IP_BOT_MANAGER.is_client_ip_always_allowed(ip): + # return None # No key, no increment, no enforcement + parts = [part for part in ip.split(':') if part] # remove empty parts caused by :: + return f":{'.'.join(parts)}:" + +def enforce_custom_limit(limiter:Limiter, key:str, limit_count: int = 3, window_sec: int = 60): + custom_key = custom_limit_key() + if custom_key is None: + return None + + key = f"{key}{custom_key.rstrip(':')}" + + current = limiter.storage.incr(key, expiry=window_sec) + if current > limit_count: + raise LookupError("To Many 404 Requests") + +## Helper +def replace_last_ip_segment(ip:str, new_value:str="1") -> str: + parts = ip.strip().split('.') + if len(parts) == 4: + parts[-1] = str(new_value) + return '.'.join(parts) + raise ValueError("Invalid IP address format") diff --git a/my_modules/middleware.py b/my_modules/middleware.py new file mode 100644 index 0000000..1a5465d --- /dev/null +++ b/my_modules/middleware.py @@ -0,0 +1,35 @@ +from my_modules.app.logger import logger +from my_modules.functions import get_ip +from my_modules.app.setup import app + +from quart import request, render_template, current_app, session +from datetime import datetime + +@app.before_request +async def custom_middleware(): + if session.get('user'): # only if session already has data, update redis expire time + session.permanent = True + + client_ip = get_ip() + path = request.path + method = request.method + + # Skip allowed IPs or non-critical assets + if ( + "favicon" in path + or "files" in path + ): + return + + await logger.info(f"{method} | {client_ip} had accessed the Side {path}") + +@app.context_processor +async def inject_context_data(): + user = session.get("user") + current_year = datetime.now().year + + await logger.debug(f"Inject Context Data | User: {user}, Year: {current_year}") + return { + "user": user, + "year": current_year, + }