add code to start the server
This commit is contained in:
@@ -0,0 +1,63 @@
|
||||
from quart import has_request_context, request, has_websocket_context, websocket
|
||||
from flask_limiter import Limiter
|
||||
import subprocess, aiohttp
|
||||
|
||||
# Get IPs
|
||||
def get_ip():
|
||||
if has_request_context():
|
||||
xff = request.headers.get("X-Forwarded-For", "")
|
||||
return xff.split(",")[0].strip() if xff else request.remote_addr
|
||||
elif has_websocket_context():
|
||||
xff = websocket.headers.get("X-Forwarded-For", "")
|
||||
return xff.split(",")[0].strip() if xff else websocket.remote_addr
|
||||
return None # No active request or websocket context
|
||||
|
||||
async def get_my_ip_address():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get("https://ipinfo.io/ip") as response:
|
||||
if response.status == 200:
|
||||
return await response.text()
|
||||
raise aiohttp.ClientError(f'Could not get IP: {response.status} {await response.text()}')
|
||||
|
||||
def get_local_ip_addresses():
|
||||
try:
|
||||
result = subprocess.run(['hostname', '-I'], capture_output=True, text=True)
|
||||
first_ip = result.stdout.strip().split()[0]
|
||||
return first_ip
|
||||
except subprocess.CalledProcessError as e:
|
||||
return None
|
||||
except IndexError:
|
||||
return None
|
||||
|
||||
def generate_all_ips(base_ip:str) -> set:
|
||||
ips = set()
|
||||
for i in range(1, 255): # 1 to 254 inclusive
|
||||
ips.add(replace_last_ip_segment(base_ip, i))
|
||||
return ips
|
||||
|
||||
# Limiter Key Gen
|
||||
def custom_limit_key():
|
||||
ip = get_ip()
|
||||
# if THE_IP_BOT_MANAGER.is_client_ip_always_allowed(ip):
|
||||
# return None # No key, no increment, no enforcement
|
||||
parts = [part for part in ip.split(':') if part] # remove empty parts caused by ::
|
||||
return f":{'.'.join(parts)}:"
|
||||
|
||||
def enforce_custom_limit(limiter:Limiter, key:str, limit_count: int = 3, window_sec: int = 60):
|
||||
custom_key = custom_limit_key()
|
||||
if custom_key is None:
|
||||
return None
|
||||
|
||||
key = f"{key}{custom_key.rstrip(':')}"
|
||||
|
||||
current = limiter.storage.incr(key, expiry=window_sec)
|
||||
if current > limit_count:
|
||||
raise LookupError("To Many 404 Requests")
|
||||
|
||||
## Helper
|
||||
def replace_last_ip_segment(ip:str, new_value:str="1") -> str:
|
||||
parts = ip.strip().split('.')
|
||||
if len(parts) == 4:
|
||||
parts[-1] = str(new_value)
|
||||
return '.'.join(parts)
|
||||
raise ValueError("Invalid IP address format")
|
||||
Reference in New Issue
Block a user