"""Decorators for API endpoints: JWT auth, body parsing, response formatting,
rate limiting and session login checks."""

import asyncio
import json
from datetime import datetime
from functools import wraps

import jwt
import msgpack
from quart import jsonify, request, url_for, Response, current_app, session, abort

from my_modules.app.constens import SECRET_KEY
from my_modules.app.logger import logger
from my_modules.app.setup import LIMITER
from my_modules.functions import get_ip


def encode_object_default(obj):
    """Fallback encoder passed as ``default=`` to ``json.dumps`` / ``msgpack.packb``.

    Only ``datetime`` is supported; it is rendered as
    ``'%a, %d %b %Y %H:%M:%S %Z'`` (``%Z`` expands to an empty string for
    naive datetimes). Any other type raises ``TypeError``.
    """
    if isinstance(obj, datetime):
        return obj.strftime('%a, %d %b %Y %H:%M:%S %Z')
    raise TypeError(f"Type {type(obj)} not serializable")


# Helper function to extract the token
async def get_auth_token():
    """Return the second space-separated part of the ``Authorization`` header.

    Returns ``None`` when the header is absent or has no second part.
    """
    auth_header = request.headers.get('Authorization')
    if auth_header:
        try:
            return auth_header.split(" ")[1]
        except IndexError:
            pass
    return None


# Custom decorator for token validation
def token_required(func):
    """Require a valid HS256 JWT and pass its payload to ``func`` as ``user``.

    Responses produced without calling ``func``:
      * 400 - no token in the ``Authorization`` header
      * 401 - token expired, invalid, or missing the ``refresh_id`` claim
      * 403 - ``check_if_refresh_token_exists_by_id`` returned a falsy value
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        token = await get_auth_token()
        if not token:
            await logger.error('API Token is missing')
            return jsonify(error='Token is missing'), 400

        try:
            decoded_payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
        except jwt.ExpiredSignatureError:
            await logger.error('API Token has expired')
            return jsonify(error='Token has expired'), 401
        except jwt.InvalidTokenError:
            await logger.error('API Token is invalid')
            return jsonify(error='Token is invalid'), 401

        # A correctly signed token without the claim is still unusable here;
        # answer 401 instead of letting a KeyError become a 500.
        refresh_id = decoded_payload.get('refresh_id')
        if refresh_id is None:
            await logger.error('API Token is invalid')
            return jsonify(error='Token is invalid'), 401

        if not await current_app.edgedb.check_if_refresh_token_exists_by_id(refresh_id):
            # Value bound to a local first: reusing the outer quote character
            # inside an f-string replacement field only parses on Python 3.12+.
            await logger.error(f'API Refresh Token not found: {refresh_id}')
            return jsonify(
                error='Refresh Token not found',
                msg='Please login again',
                url=url_for('login'),
            ), 403

        return await func(*args, user=decoded_payload, **kwargs)
    return wrapper


# Custom decorator for reading the request body according to its content type
def parse_request_data(func):
    """Decode the request body and pass it to ``func`` as ``data``.

    MessagePack (``application/msgpack`` / ``application/x-msgpack``) and JSON
    bodies are decoded. An empty body yields ``data=None``. A non-empty body
    with another content type is rejected with 415 for POST/PUT/PATCH/DELETE
    and otherwise ignored (``data=None``). Undecodable bodies yield 400.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        content_type = request.headers.get('Content-Type', '').lower()
        data = None
        body = await request.body

        if body:
            if 'application/msgpack' in content_type or 'application/x-msgpack' in content_type:
                try:
                    # Unpacking runs in a worker thread, off the event loop.
                    data = await asyncio.to_thread(msgpack.unpackb, body, raw=False)
                except Exception:
                    return jsonify({'error': 'Invalid MessagePack'}), 400
            elif 'application/json' in content_type:
                data = await request.get_json(silent=True)
                if data is None:
                    return jsonify({'error': 'Invalid JSON'}), 400
            else:
                if request.method in ['POST', 'PUT', 'PATCH', 'DELETE']:
                    return jsonify({'error': 'Unsupported Content-Type'}), 415
        # else:
        #     if request.method in ['POST', 'PUT', 'PATCH']:
        #         return jsonify({'error': 'Empty request body'}), 400

        return await func(*args, data=data, **kwargs)
    return wrapper


# Custom decorator for converting a handler's return value into a response
def format_response(func):
    """Serialize the handler result as MessagePack or JSON based on ``Accept``.

    Accepted result shapes: ``data``, ``(data, status)``, ``(data, headers)``
    and ``(data, status, headers)``. If ``data`` is already a ``Response``
    (e.g. an error produced via ``jsonify`` by another decorator in this
    module) the result is returned unchanged, since a ``Response`` cannot be
    serialized.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        result = await func(*args, **kwargs)

        # Unpack result: (data), (data, status), (data, headers), (data, status, headers)
        data = None
        status = 200
        headers = {}

        if isinstance(result, tuple):
            data = result[0]
            if len(result) == 2:
                if isinstance(result[1], dict):
                    headers = result[1]
                else:
                    status = result[1]
            elif len(result) == 3:
                status = result[1]
                headers = result[2]
        else:
            data = result

        # Already a finished response: hand it back as-is instead of failing
        # with TypeError inside the serializer.
        if isinstance(data, Response):
            return result

        accept = request.headers.get('Accept', '').lower()

        if 'application/msgpack' in accept or 'application/x-msgpack' in accept:
            packed = await asyncio.to_thread(
                msgpack.packb, data, default=encode_object_default, use_bin_type=True
            )
            return Response(packed, content_type='application/msgpack', status=status, headers=headers)

        json_str = await asyncio.to_thread(
            json.dumps, data, ensure_ascii=False, default=encode_object_default
        )
        response = Response(json_str, status=status, content_type='application/json')
        if headers:  # tolerate ``None`` supplied in a 3-tuple
            response.headers.update(headers)
        return response
    return wrapper


# Custom decorator for adding limits for specific methods by endpoint
def apply_limit(endpoint_name, limits: dict = None):
    """Attach a ``LIMITER`` rule to a handler, keyed by IP, endpoint and method.

    ``limits`` maps endpoint names either to a limit string or to a dict of
    ``{HTTP_METHOD: limit string}``. When ``limits`` is ``None``, the endpoint
    is not listed, or the method has no entry, ``"10000 per second"`` is used.
    """
    def make_key_func(endpoint):
        def key_func():
            ip = get_ip()
            # if THE_IP_BOT_MANAGER.is_client_ip_always_allowed(ip):
            #     return None  # No key, no increment, no enforcement

            # Combine endpoint name and HTTP method (and client IP) into the rate-limit key
            return f":{ip}:{endpoint}:{request.method}:"
        return key_func

    def decorator(func):
        @wraps(func)
        async def wrapped(*args, **kwargs):
            return await func(*args, **kwargs)

        # ``limits`` defaults to None; treat that as "no rules configured"
        # instead of raising AttributeError at decoration time.
        rules = (limits or {}).get(endpoint_name)

        def dynamic_limit():
            if isinstance(rules, dict):
                return rules.get(request.method.upper(), "10000 per second")
            return rules or "10000 per second"

        key_fn = make_key_func(endpoint_name)
        return LIMITER.limit(dynamic_limit, key_func=key_fn)(wrapped)
    return decorator


# Check if the user is logged in
def login_required(func):
    """Abort with 401 unless ``session['user']`` is set; pass it on as ``user``."""
    @wraps(func)
    async def decorated_function(*args, **kwargs):
        user_session = session.get('user')
        if user_session is None:
            abort(401)
        return await func(*args, user=user_session, **kwargs)
    return decorated_function