from my_modules.app.constens import THE_IP_BOT_MANAGER
from my_modules.app.logger import logger
from my_modules.app.setup import LIMITER
from my_modules.functions import get_ip
from quart import jsonify, request, url_for, Response, current_app, session, abort
from functools import wraps
from datetime import datetime
import asyncio
import msgpack
import json

# Content types (request ``Content-Type`` / ``Accept``) treated as MessagePack.
_MSGPACK_TYPES = ('application/msgpack', 'application/x-msgpack')

# Limit string used when no explicit rule matches an endpoint/method.
_DEFAULT_LIMIT = "10000 per second"


def encode_object_default(obj):
    """Serialization fallback passed as ``default=`` to the JSON/MessagePack encoders.

    ``datetime`` values are rendered with the format
    ``'%a, %d %b %Y %H:%M:%S %Z'``.

    Raises:
        TypeError: for any object that is not a ``datetime``.
    """
    if isinstance(obj, datetime):
        return obj.strftime('%a, %d %b %Y %H:%M:%S %Z')
    raise TypeError(f"Type {type(obj)} not serializable")


# Helper function to extract the token
async def get_auth_token():
    """Return the token part of the ``Authorization`` request header.

    The header is split on single spaces and the second piece is returned
    (e.g. ``"Bearer abc"`` -> ``"abc"``). Returns ``None`` when the header is
    missing, empty, or contains no space.
    """
    auth_header = request.headers.get('Authorization')
    pieces = auth_header.split(" ") if auth_header else []
    return pieces[1] if len(pieces) > 1 else None


async def verify_token(token: str):
    """Decode an access token and translate failure states into error responses.

    Args:
        token: the raw access token taken from the request.

    Returns:
        A ``(payload, status)`` tuple. On success ``payload`` is the decoded
        payload and ``status`` is ``None``. On failure ``payload`` is a dict
        with an ``'error'`` key and ``status`` is the HTTP status code to send.
    """
    decoded_payload = await current_app.convex.decode_access_token_payload(access_token=token)

    # The None check must come first: calling .get() on a missing payload
    # would raise AttributeError and the 504 answer could never be produced.
    if decoded_payload is None:
        return {'error': "No Data from Database"}, 504

    state = decoded_payload.get('state', None)
    if state not in (1, 2, 3, 4):
        # Any other state (including no state at all) is treated as success.
        return decoded_payload, None

    # Every known failure state logs the decoder's own error message first.
    await logger.error(decoded_payload.get('error'))

    if state == 1:
        return {'error': 'Invalid access token'}, 401
    if state == 2:
        return {'error': 'Wrong access token type'}, 401
    if state == 3:
        return {
            'error': 'Refresh token not found',
            'msg': 'Please login again and generate a new Token',
            'url': url_for('auth_login.login'),
        }, 403
    # state == 4
    return {'error': 'Refresh token expired'}, 401


# Custom decorator for token validation
def token_required(func):
    """Require a valid access token and pass its payload to the handler as ``user``.

    Responds with 400 when no token is present, or with the error body and
    status produced by ``verify_token`` when validation fails.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        token = await get_auth_token()
        if not token:
            await logger.error('API Token is missing')
            return jsonify(error='Token is missing'), 400

        decoded_payload, status_code = await verify_token(token)
        # NOTE(review): a payload with a truthy 'error' but an unrecognized
        # state reaches this branch with status_code None - confirm intended.
        if decoded_payload.get('error', None):
            return jsonify(decoded_payload), status_code

        return await func(*args, user=decoded_payload, **kwargs)
    return wrapper


# Custom decorator for reading the request body according to its content type
def parse_request_data(func):
    """Parse the request body and pass it to the handler as ``data``.

    MessagePack and JSON bodies are decoded; invalid payloads answer with 400.
    A non-empty body with any other content type answers with 415 for
    POST/PUT/PATCH/DELETE and is ignored (``data=None``) for other methods.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        content_type = request.headers.get('Content-Type', '').lower()
        data = None
        body = await request.body

        # An empty body is passed through as data=None for every method.
        if body:
            if any(kind in content_type for kind in _MSGPACK_TYPES):
                try:
                    # Unpack off the event loop thread.
                    data = await asyncio.to_thread(msgpack.unpackb, body, raw=False)
                except Exception:
                    return jsonify({'error': 'Invalid MessagePack'}), 400
            elif 'application/json' in content_type:
                data = await request.get_json(silent=True)
                if data is None:
                    return jsonify({'error': 'Invalid JSON'}), 400
            elif request.method in ['POST', 'PUT', 'PATCH', 'DELETE']:
                return jsonify({'error': 'Unsupported Content-Type'}), 415

        return await func(*args, data=data, **kwargs)
    return wrapper


# Custom decorator for converting a handler's return value into a response
def format_response(func):
    """Serialize the handler result as MessagePack or JSON based on ``Accept``.

    The handler may return ``data``, ``(data, status)``, ``(data, headers)``
    or ``(data, status, headers)``. Status defaults to 200 and headers to an
    empty dict.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        result = await func(*args, **kwargs)

        # Unpack result: (data), (data, status), (data, headers), (data, status, headers)
        data = None
        status = 200
        headers = {}

        if isinstance(result, tuple):
            data = result[0]
            if len(result) == 2:
                # A dict in second position is headers, anything else a status.
                if isinstance(result[1], dict):
                    headers = result[1]
                else:
                    status = result[1]
            elif len(result) == 3:
                status = result[1]
                headers = result[2]
        else:
            data = result

        accept = request.headers.get('Accept', '').lower()
        if any(kind in accept for kind in _MSGPACK_TYPES):
            packed = await asyncio.to_thread(
                msgpack.packb, data, default=encode_object_default, use_bin_type=True
            )
            return Response(packed, content_type='application/msgpack', status=status, headers=headers)

        json_str = await asyncio.to_thread(
            json.dumps, data, ensure_ascii=False, default=encode_object_default
        )
        response = Response(json_str, status=status, content_type='application/json')
        response.headers.update(headers)
        return response
    return wrapper


# Custom decorator for adding limits for specific methods by endpoint
def apply_limit(endpoint_name, limits: dict = None):
    """Attach a rate limit to a handler via ``LIMITER``.

    Args:
        endpoint_name: key used to look up the rule in ``limits`` and to build
            the rate-limit key.
        limits: mapping of endpoint name to either a limit string or a dict of
            HTTP method -> limit string. ``None`` is treated as an empty
            mapping, so the default limit applies.
    """
    def make_key_func(endpoint):
        def key_func():
            ip = get_ip()
            if THE_IP_BOT_MANAGER.is_client_ip_always_allowed(ip):
                return None  # No key, no increment, no enforcement
            # Combine endpoint name and HTTP method (and client IP) into the rate-limit key
            return f":{ip}:{endpoint}:{request.method}:"
        return key_func

    def decorator(func):
        @wraps(func)
        async def wrapped(*args, **kwargs):
            return await func(*args, **kwargs)

        # `limits` defaults to None; guard it so the default does not crash.
        rules = (limits or {}).get(endpoint_name)

        def dynamic_limit():
            # Per-method rules win; otherwise use the single rule or the default.
            if isinstance(rules, dict):
                return rules.get(request.method.upper(), _DEFAULT_LIMIT)
            return rules or _DEFAULT_LIMIT

        key_fn = make_key_func(endpoint_name)
        return LIMITER.limit(dynamic_limit, key_func=key_fn)(wrapped)
    return decorator


# Check if the user is logged in
def login_required(func):
    """Abort with 401 unless the session holds a ``'user'`` entry.

    The session's user value is passed to the handler as ``user``.
    """
    @wraps(func)
    async def decorated_function(*args, **kwargs):
        user_session = session.get('user')
        if user_session is None:
            abort(401)
        return await func(*args, user=user_session, **kwargs)
    return decorated_function