from quart import has_request_context, request, has_websocket_context, websocket from flask_limiter import Limiter import subprocess, aiohttp # Get IPs def get_ip(): if has_request_context(): xff = request.headers.get("X-Forwarded-For", "") return xff.split(",")[0].strip() if xff else request.remote_addr elif has_websocket_context(): xff = websocket.headers.get("X-Forwarded-For", "") return xff.split(",")[0].strip() if xff else websocket.remote_addr return None # No active request or websocket context async def get_my_ip_address(): async with aiohttp.ClientSession() as session: async with session.get("https://ipinfo.io/ip") as response: if response.status == 200: return await response.text() raise aiohttp.ClientError(f'Could not get IP: {response.status} {await response.text()}') def get_local_ip_addresses(): try: result = subprocess.run(['hostname', '-I'], capture_output=True, text=True) first_ip = result.stdout.strip().split()[0] return first_ip except subprocess.CalledProcessError as e: return None except IndexError: return None def generate_all_ips(base_ip:str) -> set: ips = set() for i in range(1, 255): # 1 to 254 inclusive ips.add(replace_last_ip_segment(base_ip, i)) return ips # Limiter Key Gen def custom_limit_key(): ip = get_ip() # if THE_IP_BOT_MANAGER.is_client_ip_always_allowed(ip): # return None # No key, no increment, no enforcement parts = [part for part in ip.split(':') if part] # remove empty parts caused by :: return f":{'.'.join(parts)}:" def enforce_custom_limit(limiter:Limiter, key:str, limit_count: int = 3, window_sec: int = 60): custom_key = custom_limit_key() if custom_key is None: return None key = f"{key}{custom_key.rstrip(':')}" current = limiter.storage.incr(key, expiry=window_sec) if current > limit_count: raise LookupError("To Many 404 Requests") ## Helper def replace_last_ip_segment(ip:str, new_value:str="1") -> str: parts = ip.strip().split('.') if len(parts) == 4: parts[-1] = str(new_value) return '.'.join(parts) raise ValueError("Invalid IP address format")