use login from quard_common

This commit is contained in:
2026-04-15 18:49:17 +02:00
parent 3d8d74785c
commit 0b81d3d803
4 changed files with 101 additions and 166 deletions
+1 -1
View File
@@ -1,5 +1,5 @@
from .handeling.basics import basic_bp
from .auth.login import auth_login_bp
from quart_common.routes.login import auth_login_bp
from .side.main import side_main_bp
-151
View File
@@ -1,151 +0,0 @@
from my_modules.app.setup import cache, LIMITER
from my_modules.app.constens import API_GROUP
from my_modules.app.logger import logger
from quart import Blueprint, redirect, url_for, session, request, render_template, abort, make_response, current_app
from authlib.jose import JsonWebKey, JoseError, jwt as authlib_jwt
from authlib.integrations.httpx_client import AsyncOAuth2Client
from jwt import InvalidTokenError
import httpx, uuid, os
auth_login_bp = Blueprint('auth_login', __name__)
# OAuth
OIDC_CLIENT_ID = os.getenv('OIDC_CLIENT_ID', '')
OIDC_CLIENT_SECRET = os.getenv('OIDC_CLIENT_SECRET', '')
OIDC_METADATA_URL = os.getenv('OIDC_METADATA_URL', '')
REDIRECT_URI_SCHEME = os.getenv('REDIRECT_URI_SCHEME', 'http')
async def get_oidc_metadata():
async with httpx.AsyncClient() as client:
try:
response = await client.get(OIDC_METADATA_URL)
response.raise_for_status()
return response.json()
except httpx.ReadTimeout:
return await get_oidc_metadata()
@auth_login_bp.route('/login', methods=['GET'])
@auth_login_bp.route('/auth', methods=['GET'])
@LIMITER.limit("5 per minute; 30 per hour")
async def login():
metadata = await get_oidc_metadata()
auth_id = request.cookies.get('auth_id') or str(uuid.uuid4())
nonce = str(uuid.uuid4())
await cache.set(f"oauth:nonce:{auth_id}", nonce, ttl=3603)
client = AsyncOAuth2Client(
client_id=OIDC_CLIENT_ID,
client_secret=OIDC_CLIENT_SECRET,
redirect_uri=url_for('auth_login.auth_callback', _external=True, _scheme=REDIRECT_URI_SCHEME),
scope='openid profile email',
)
uri, state = client.create_authorization_url(
metadata['authorization_endpoint'],
nonce=nonce,
state=auth_id,
)
response = await make_response(redirect(uri))
response.set_cookie('auth_id', auth_id, max_age=3600, httponly=True, secure=True, samesite='Lax')
return response
@auth_login_bp.route('/logout', methods=['GET'])
@auth_login_bp.route('/auth/logout', methods=['GET'])
@LIMITER.limit("10 per minute")
async def logout():
user = session.get('user')
if user:
await logger.info(f'logging out user: {user}')
session.pop('user', None)
session.clear()
return redirect(url_for('side_main.index'))
@auth_login_bp.route('/auth/callback', methods=['GET'])
@LIMITER.limit("5 per minute")
async def auth_callback():
try:
auth_id = request.args.get('state')
code = request.args.get('code')
nonce = await cache.get(f"oauth:nonce:{auth_id}")
if not nonce:
await logger.error('Nonce not found')
return await render_template('views/api/token.htm', error='Nonce not found'), 400
await cache.delete(f"oauth:nonce:{auth_id}")
metadata = await get_oidc_metadata()
client = AsyncOAuth2Client(
client_id=OIDC_CLIENT_ID,
client_secret=OIDC_CLIENT_SECRET,
redirect_uri=url_for('auth_login.auth_callback', _external=True, _scheme=REDIRECT_URI_SCHEME),
scope='openid profile email',
)
# Exchange code for token
token_fetched = False
while not token_fetched:
try:
token = await client.fetch_token(
metadata['token_endpoint'],
code=code,
grant_type='authorization_code'
)
await logger.debug(f'Auth Callback | token: {token}')
token_fetched = True
except httpx.ReadTimeout:
pass
# Decode ID token
id_token = token.get('id_token')
if not id_token:
await logger.error('ID token missing in OAuth response')
return await render_template('views/api/token.htm', error='ID token missing'), 400
# Fetch the JWKs to verify signature
async with httpx.AsyncClient() as http_client:
jwks_resp = await http_client.get(metadata['jwks_uri'])
jwks = jwks_resp.json()
keys = JsonWebKey.import_key_set(jwks)
claims = authlib_jwt.decode(id_token, key=keys)
try:
claims.validate_aud()
claims.validate()
except JoseError as e:
await logger.error(f"JWT validation error: {e}")
return await render_template('views/api/token.htm', error=str(e)), 400
if claims.get('nonce') != nonce:
await logger.error('Nonce mismatch in ID token')
return await render_template('views/api/token.htm', error='Invalid nonce'), 400
await logger.info(f'Auth Callback | user_info: {claims}')
if API_GROUP not in claims.get('groups', []):
await logger.error("You don't have Permissions to Access this API")
return await render_template('views/api/token.htm', error="You don't have Permissions to Access this API"), 403
session['user'] = claims
response = await make_response(redirect(url_for('side_main.index')))
response.set_cookie('auth_id', '', max_age=0, httponly=True, secure=True, samesite='Lax')
return response
except httpx.HTTPError as e:
await logger.error(f"HTTP error during token exchange: {e}")
return await render_template('views/api/token.htm', error="Token exchange failed"), 500
except InvalidTokenError as e:
await logger.error(f"Invalid ID Token: {e}")
return await render_template('views/api/token.htm', error="Invalid ID Token"), 400
except Exception as e:
await logger.exception("Unknown error during OAuth callback")
return await render_template('views/api/token.htm', error=str(e)), 500
+99 -13
View File
@@ -1,16 +1,87 @@
from my_modules.app.constens import BLOCKED_IPS_ACCESSING_TIMES, BLOCKED_IPS_STORED_TIMEFRAME
from my_modules.app.constens import (
BLOCKED_IPS_ACCESSING_TIMES,
BLOCKED_IPS_STORED_TIMEFRAME,
)
from my_modules.app.setup import app, LIMITER
from my_modules.app.logger import logger
from quart import request, render_template, jsonify, current_app, make_response
from my_modules.functions import get_ip, enforce_custom_limit
from my_modules.functions import (
get_ip,
enforce_custom_limit,
get_request_context,
)
from quart_common.web.env import is_development_environment
IGNORED_404_PATHS = [
"/.well-known/",
]
IGNORE_CONTAIN_404_PATHS = [
"/.htaccess",
]
FEATURE_FLAG_DISABLED_PREFIX = "feature_flag_disabled:"
AUTH_STATUS_TITLES = {
400: '400 - OAuth Time Paradox',
401: '401 - Not Authenticated',
403: '403 - Access Forbidden',
500: '500 - Auth Reactor Meltdown',
504: '504 - Auth Gateway Timeout',
}
AUTH_STATUS_MESSAGES = {
400: (
'The fox courier dropped your login form in a puddle before it reached the gatekeeper. '
'Please send it again with all required fields so the checkpoint can read it.'
),
401: (
'The fox guards checked your badge and it did not pass the sniff test this round. '
'Please sign in again so they can issue a fresh one.'
),
403: (
'The fox guards found your badge valid, but this den is still off-limits for your current role. '
'If you should have access, ask an admin to update your permissions.'
),
500: (
'The auth engine coughed up a spark and the fox mechanics are tightening bolts right now. '
'Please try again in a moment while they get the reactor stable.'
),
504: (
'The fox guards are still waiting for the auth mothership to answer the walkie-talkie. '
'Please try again in a moment before they start howling at the server rack.'
),
}
async def auth_error(message:str, status_code:int=400):
context = get_request_context()
if status_code in AUTH_STATUS_MESSAGES:
funny_message = AUTH_STATUS_MESSAGES[status_code]
else:
funny_message = (
'The fox guards tripped over a cable while checking your badge. '
'Authentication failed. Please try again or contact an administrator.'
)
logger.error(f"[AUTH:{status_code}] {message}")
if context and context.path.startswith("/api"):
return jsonify({"error": "Authentication Error", "message": funny_message}), status_code
return await render_template('views/basics/error.htm',
title='Authentication Error',
header={'title': AUTH_STATUS_TITLES.get(status_code, f'{status_code} - Authentication Error'), 'message': funny_message},
file={'name': 'auth_error.webp', 'alt': 'A monitor flashes unauthorized access in blinking red warning text'},
), status_code
@app.errorhandler(401)
async def handle_unauthorized(e):
if request.path.startswith("/api"):
context = get_request_context()
if context.path.startswith("/api"):
return jsonify({"error": "Unauthorized Access", "message": "Gandalf has spoken: You shall not pass… until you log in."}), 401
await logger.error(e)
logger.error(e)
return await render_template('views/basics/error.htm',
title='Unauthorized Access',
header={'title': '401 - Unauthorized', 'message': "Gandalf has spoken: You shall not pass… until you log in."},
@@ -24,11 +95,24 @@ async def not_found(e):
except LookupError as e:
return await to_many_requests(e)
if request.path.startswith("/api"):
return jsonify({"error": "Page Not Found", "message": "Oops! The page you are looking for does not exist."}), 404
context = get_request_context()
error_description = str(getattr(e, "description", ""))
is_feature_flag_disabled_404 = error_description.startswith(FEATURE_FLAG_DISABLED_PREFIX)
await logger.error(f"[404] Page Not Found: {request.path}")
await current_app.convex.increment_page_not_found_error(path=request.path, status=404)
if (
not is_development_environment()
and not is_feature_flag_disabled_404
and context.path not in IGNORED_404_PATHS
and not any(p in context.path for p in IGNORE_CONTAIN_404_PATHS)
):
await current_app.convex.increment_page_not_found_error(
path=context.path, status=404
)
logger.error(f"[404] Page Not Found: {context.path}")
if context.path.startswith("/api"):
return jsonify({"error": "Page Not Found", "message": "Oops! The page you are looking for does not exist."}), 404
return await render_template('views/basics/error.htm',
title='Page Not Found',
@@ -47,7 +131,7 @@ async def maybe_a_hacker(e=None):
method=request.method,
path=request.path,
)
await logger.warning(f"[HONEYPOT] Blocked {client_ip} after accessing {request.path}")
logger.warning(f"[HONEYPOT] Blocked {client_ip} after accessing {request.path}")
return await to_many_requests(e)
rendered = await render_template('views/basics/error.htm',
@@ -66,7 +150,8 @@ async def maybe_a_hacker(e=None):
async def to_many_requests(e):
message = "We love your enthusiasm, but our server thought it was being DDoSed… by you. The keyboard needs a new set of keys and we need a nap. Try again soon!"
if request.path.startswith("/api") or request.path.endswith('/auth/userinfo') or request.path.endswith('/auth/refresh'):
context = get_request_context()
if context.path.startswith("/api") or context.path.endswith('/auth/userinfo') or context.path.endswith('/auth/refresh'):
return jsonify({"error": "Too Many Requests - YOU SHALL NOT PASS (for now)", "message": message}), 429
return await render_template('views/basics/error.htm',
@@ -82,10 +167,11 @@ async def internal_server_error(e):
except LookupError as e:
return await to_many_requests(e)
if request.path.startswith("/api"):
context = get_request_context()
if context.path.startswith("/api"):
return jsonify({"error": "Internal Server Error", "message": "It looks like you broke something... but don't worry, we're fixing it! In the meantime, we may or may not have logged your IP address (just kidding... or are we?). Either way, thanks for helping us find new ways to crash our system. Stay curious, hacker-friend!"}), 500
await logger.error(e)
logger.error(e)
return await render_template('views/basics/error.htm',
title='Internal Server Error',
header={'title': '500 - Internal Server Error', 'message': "It looks like you broke something... but don't worry, we're fixing it! In the meantime, we may or may not have logged your IP address (just kidding... or are we?). Either way, thanks for helping us find new ways to crash our system. Stay curious, hacker-friend!"},
@@ -99,7 +185,7 @@ async def database_server_error(e):
except LookupError as e:
return await to_many_requests(e)
await logger.error(e)
logger.error(e)
return await render_template('views/basics/error.htm',
title='Database Error',
header={'title': '504 - Database Error', 'message': "It looks like something is broke on our end... but don't worry, we're fixing it! Either way, thanks for helping us find new ways to crash our system. Stay curious, hacker-friend!"},