use new quart_common functions

This commit is contained in:
2026-04-01 16:17:30 +02:00
parent 51f02ff5c8
commit ea4738ad06
4 changed files with 35 additions and 225 deletions
+11 -56
View File
@@ -1,38 +1,15 @@
from quart import has_request_context, request, has_websocket_context, websocket
from quart_common.web.request import (
get_ip,
get_my_ip_address,
get_local_ip_addresses,
generate_all_ips,
replace_last_ip_segment,
get_request_context,
is_valid_uuid,
)
from quart_common.web.env import is_development_environment, is_testing_environment
from flask_limiter import Limiter
from uuid import UUID
import subprocess, aiohttp
# Get IPs
def get_ip():
context = get_request_context()
if context:
xff = context.headers.get("X-Forwarded-For", "")
return xff.split(",")[0].strip() if xff else context.remote_addr
return None # No active request or websocket context
async def get_my_ip_address():
async with aiohttp.ClientSession() as session:
async with session.get("https://ipinfo.io/ip") as response:
if response.status == 200:
return await response.text()
raise aiohttp.ClientError(f'Could not get IP: {response.status} {await response.text()}')
def get_local_ip_addresses():
try:
result = subprocess.run(['hostname', '-I'], capture_output=True, text=True)
first_ip = result.stdout.strip().split()[0]
return first_ip
except subprocess.CalledProcessError as e:
return None
except IndexError:
return None
def generate_all_ips(base_ip:str) -> set:
ips = set()
for i in range(1, 255): # 1 to 254 inclusive
ips.add(replace_last_ip_segment(base_ip, i))
return ips
# Limiter Key Gen
def custom_limit_key():
@@ -52,25 +29,3 @@ def enforce_custom_limit(limiter:Limiter, key:str, limit_count: int = 3, window_
current = limiter.storage.incr(key, expiry=window_sec)
if current > limit_count:
raise LookupError("To Many 404 Requests")
## Helper
def replace_last_ip_segment(ip:str, new_value:str="1") -> str:
parts = ip.strip().split('.')
if len(parts) == 4:
parts[-1] = str(new_value)
return '.'.join(parts)
raise ValueError("Invalid IP address format")
def get_request_context():
if has_request_context():
return request
elif has_websocket_context():
return websocket
return None
def is_valid_uuid(value: str) -> bool:
try:
UUID(value)
return True
except ValueError:
return False