From fefda61c0b7f4803a090b95d8ce757a3a7e16a5e Mon Sep 17 00:00:00 2001 From: Daniel Dolezal Date: Mon, 22 Dec 2025 11:43:05 +0100 Subject: [PATCH] use new convex db base class and remove code that is already into the base class --- my_helpers | 2 +- my_modules/app/setup.py | 2 +- my_modules/db/ConvexDB.py | 289 ++----------------------------------- routes/handeling/basics.py | 40 ++--- 4 files changed, 26 insertions(+), 307 deletions(-) diff --git a/my_helpers b/my_helpers index 687d99d..c47b259 160000 --- a/my_helpers +++ b/my_helpers @@ -1 +1 @@ -Subproject commit 687d99d21fd7d6c6ee759bc248179f1f5cec0d32 +Subproject commit c47b25903f5bc9c6c131e19aef780fbfbb6b1d6d diff --git a/my_modules/app/setup.py b/my_modules/app/setup.py index f69a62e..615894b 100644 --- a/my_modules/app/setup.py +++ b/my_modules/app/setup.py @@ -68,7 +68,7 @@ LIMITER = Limiter( @app.before_serving async def init_convex(): - app.convex = ConvexDB(os.getenv("CONVEX_URL"), service='nanoshare') + app.convex = ConvexDB(os.getenv("CONVEX_URL")) await app.convex.connect() @app.after_serving diff --git a/my_modules/db/ConvexDB.py b/my_modules/db/ConvexDB.py index 089b543..2972141 100644 --- a/my_modules/db/ConvexDB.py +++ b/my_modules/db/ConvexDB.py @@ -1,97 +1,20 @@ -from concurrent.futures import ThreadPoolExecutor +from my_helpers.ConvexDbBase import ConvexDbBase from my_modules.app.logger import logger -from convex import ConvexClient, ConvexError, ConvexExecutionError -from urllib.parse import urlparse +from convex import ConvexError, ConvexExecutionError from datetime import datetime -from quart import url_for -from urllib.parse import urlparse, urlunparse -from io import BytesIO -import aiohttp, httpx, asyncio -class ConvexDB: - default_namespace = 'nanoshare' - service_protection = 'service/protection' - service_auth = 'service/auth' +class ConvexDB(ConvexDbBase): + service_namespace = 'nanoshare' - def __init__(self, dsn:str, service:str): - self.locked_printer = {} - self.client = None - self.dsn = dsn - self.service = service - - self.executor = ThreadPoolExecutor(max_workers=1) - self.loop = asyncio.get_event_loop() - - async def replace_url_host(self, url:str) -> str: - dsn = urlparse(self.dsn.rstrip('/')) - keep_content = urlparse(url) - - return urlunparse(( - dsn.scheme, - dsn.netloc, - keep_content.path, - keep_content.params, - keep_content.query, - keep_content.fragment, - )) - - # Connect Function - async def connect(self): - self.client = ConvexClient(self.dsn) - - async def close(self): - if self.client: - self.client = None - - def subscribe(self, name:str, args:dict): - return self.client.subscribe(name=f'{self.default_namespace}/{name}', args=args) - - # Query Helper Function - async def run_query_with_reconnection(self, function, *args:tuple, **kwargs): - while True: - try: - query = args[0].replace('\n ', '').replace(' ', '') - if args[1:]: - await logger.debug(f'{function.__name__} |{query}| {args[1:]}{kwargs}') - else: - await logger.debug(f'{function.__name__} |{query}| {kwargs}') - - return await self.loop.run_in_executor( - self.executor, lambda: function(*args, **kwargs) - ) - except Exception as e: - await logger.error(f'Query Database: {e}, Function: {args}') - break - - async def get_correct_storage_api_url(self, obj:dict, key:str): - update_value = obj[key] - if update_value: - obj[key] = url_for('basic.convex_storage_proxy', file_id=urlparse(update_value).path.rsplit('/', 1)[-1]) - return obj - - # Basic Blueprint Function - async def get_current_favicon(self): - data = await self.run_query_with_reconnection( - self.client.query, - f'{self.default_namespace}/cache/favicon:get', - args={} - ) - return data - - async def set_new_favicon(self, favicon:str): - data = await self.run_query_with_reconnection( - self.client.mutation, - f'{self.default_namespace}/cache/favicon:set', - args={ 'favicon': favicon } - ) - return data + def __init__(self, dsn:str): + super().__init__(dsn=dsn, service=ConvexDB.service_namespace) # File Quary Functions async def get_file(self, file_id:str): data = await self.run_query_with_reconnection( self.client.query, - f"{self.default_namespace}/files:getByFileId", + f"{self.service_namespace}/files:getByFileId", args={ 'file_id': file_id } ) return data @@ -99,7 +22,7 @@ class ConvexDB: async def get_files(self, user_id:str): data = await self.run_query_with_reconnection( self.client.query, - f"{self.default_namespace}/files:getAllNotExpired", + f"{self.service_namespace}/files:getAllNotExpired", args={ 'user_id': user_id } ) return [ { @@ -122,7 +45,7 @@ class ConvexDB: data = await self.run_query_with_reconnection( self.client.mutation, - f"{self.default_namespace}/files:addNewFile", + f"{self.service_namespace}/files:addNewFile", args=args, ) return data @@ -130,14 +53,14 @@ class ConvexDB: async def update_file(self, file_id:str, file_name:str, note:str, expires_at:datetime, user_id:str): await self.run_query_with_reconnection( self.client.mutation, - f"{self.default_namespace}/files:updateFile", + f"{self.service_namespace}/files:updateFile", args={ 'file_id': file_id, 'file_name': file_name, 'note': note, 'expires_at': expires_at.isoformat(), 'user_id': user_id } ) async def delete_file(self, file_id:str, user_id:str): await self.run_query_with_reconnection( self.client.mutation, - f"{self.default_namespace}/files:deleteFile", + f"{self.service_namespace}/files:deleteFile", args={ 'file_id': file_id, 'user_id': user_id } ) @@ -148,7 +71,7 @@ class ConvexDB: async def add_file_access(self, file_id: str, ip_address:str, status:str, user_agent:str): data = await self.run_query_with_reconnection( self.client.mutation, - f"{self.default_namespace}/access:addNewAccess", + f"{self.service_namespace}/access:addNewAccess", args={ 'file_id': file_id, 'ip_address': ip_address, 'user_agent': str(user_agent), 'status': status } ) return data @@ -213,191 +136,3 @@ class ConvexDB: "accessed_at": access.at, } for access in data.accesses] return None - - # Stream Data from DB File Storage - async def stream_from_storage(self, convex_upload_url:str): - convex_upload_url = await self.replace_url_host(convex_upload_url) - - async with aiohttp.ClientSession() as session: - async with session.get(convex_upload_url) as resp: - resp.raise_for_status() - async for chunk in resp.content.iter_chunked(4096): - yield chunk - - async def get_from_storage(self, convex_upload_url:str): - convex_upload_url = await self.replace_url_host(convex_upload_url) - - async with httpx.AsyncClient() as client: - resp = await client.get(convex_upload_url) - resp.raise_for_status() - return BytesIO(resp.read()) - - async def send_to_storage(self, data:bytes, content_type:str): - convex_upload_url = await self.run_query_with_reconnection( - self.client.mutation, - f"service/files:prepareUpload", - args={} - ) - - convex_upload_url = await self.replace_url_host(convex_upload_url) - - async with httpx.AsyncClient() as client: - resp = await client.post(convex_upload_url, - headers={ - "Content-Type": content_type, - "Content-Length": str(len(data)), - }, - content=data, - ) - resp.raise_for_status() - return resp.json()["storageId"] - - # User - async def upsert_user(self, user:dict): - data = await self.run_query_with_reconnection( - self.client.mutation, - f'{self.service_auth}/users:upsert', - args={ - 'sub': user['sub'], - 'email': user['email'], - 'name': user['name'], - 'username': user['preferred_username'], - 'groups': user['groups'], - } - ) - return data - - async def login(self, code:str): - """ - NOT WORKING - AUTH Still Beta in Convex -> Currently do Manual - """ - data = self.client.action('auth:signIn', args={'provider': 'authentik', 'params': {'code': code}}) - print(data) - - # Blocked Paths Functions - async def set_blocked_paths(self, path:str, subpath:str): - data = await self.run_query_with_reconnection( - self.client.mutation, - f"{self.service_protection}/blockedPaths:create", - args={ 'path': path, 'subpath': subpath } - ) - return data - - async def get_blocked_paths(self) -> set[str]: - data = await self.run_query_with_reconnection( - self.client.query, - f"{self.service_protection}/blockedPaths:getAll", - args={} - ) - return data - - async def is_path_blocked(self, path:str) -> bool: - data = await self.run_query_with_reconnection( - self.client.query, - f"{self.service_protection}/blockedPaths:isBlocked", - args={ 'path': path } - ) - return data - - # Blocked IP Address Functions - async def set_ip_address_to_blocklist(self, ip_address:str, method:str, path:str, blocked:bool, accessed:int, last_access_at:datetime): - data = await self.run_query_with_reconnection( - self.client.mutation, - f"{self.service_protection}/blockedIpAdresses:create", - args={ 'ip': ip_address, 'method': method, 'path': path, 'blocked': blocked, 'accessed': accessed, 'last_access_at': last_access_at.isoformat() } - ) - return data - - async def increment_blocked_ip_address_access(self, ip_address:str, method:str, path:str): - data = await self.run_query_with_reconnection( - self.client.mutation, - f"{self.service_protection}/blockedIpAdresses:increment", - args={ 'ip': ip_address, 'method': method, 'path': path } - ) - if data == 1: - await logger.info(f'Add New IP Address to Block List {ip_address}, {method}, {path}') - - async def is_ip_address_whitelisted_or_blocked(self, ip_address:str) -> bool: - data = await self.run_query_with_reconnection( - self.client.query, - f"{self.service_protection}/blockedIpAdresses:isWhitelistedOrBlocked", - args={ 'ip': ip_address } - ) - return data - - # Token Functions - async def get_tokens(self, user_id:str): - data = await self.run_query_with_reconnection( - self.client.query, - f"{self.service_auth}/jwt:getRefreshTokenByUserAndService", - args={ 'service': self.service, 'user_id': user_id } - ) - return data - - async def add_refresh_token(self, token_name:str, user_id:str): - data = await self.run_query_with_reconnection( - self.client.mutation, - f"{self.service_auth}/jwt:createRefreshToken", - args={ 'token_name': token_name, 'service': self.service, 'user_id': user_id } - ) - return data - - async def update_refresh_token_name(self, token_name:str, token_id:str, user_id:str): - data = await self.run_query_with_reconnection( - self.client.mutation, - f"{self.service_auth}/jwt:updateRefreshToken", - args={ 'token_id': token_id, 'token_name': token_name, 'service': self.service, 'user_id': user_id } - ) - return data - - async def remove_refresh_token(self, token_id:str, user_id:str): - data = await self.run_query_with_reconnection( - self.client.mutation, - f"{self.service_auth}/jwt:deleteRefreshToken", - args={ 'token_id': token_id, 'service': self.service, 'user_id': user_id } - ) - return data - - async def generate_new_access_token(self, refresh_token:str): - data = await self.run_query_with_reconnection( - self.client.action, - f"{self.service_auth}/jwt:exchangeRefreshForAccess", - args={ 'refresh_token': refresh_token, 'service': self.service } - ) - - if data.get('new_token', None): - return { - "access_token": data['new_token']['access_token'], - "token_type": data['new_token']['token_type'], - "expires_in": int(data['new_token']['expires_in']), - "created_at": int(data['new_token']['created_at']) / 1000, - "expires_at": int(data['new_token']['expires_at']) / 1000, - }, data['refresh_id'] - return data, None - - async def decode_access_token_payload(self, access_token:str): - data = await self.run_query_with_reconnection( - self.client.action, - f"{self.service_auth}/jwt:decodeAccessToken", - args={ 'access_token': access_token, 'service': self.service } - ) - - if data.get('payload', None): - return data.get('payload') - return data - - # Error Page Access - async def set_page_not_found_error(self, path:str, status:int, accessed:int, last_access_at:datetime): - data = await self.run_query_with_reconnection( - self.client.mutation, - f"{self.service_protection}/badPageAccess:create", - args={ 'path': path, 'status': status, 'accessed': accessed, 'last_access_at': last_access_at.isoformat() } - ) - return data - - async def increment_page_not_found_error(self, path:str, status:int): - await self.run_query_with_reconnection( - self.client.mutation, - f"{self.service_protection}/badPageAccess:increment", - args={ 'path': path, 'status': status } - ) diff --git a/routes/handeling/basics.py b/routes/handeling/basics.py index 421b0df..021fd6b 100644 --- a/routes/handeling/basics.py +++ b/routes/handeling/basics.py @@ -1,40 +1,24 @@ -from my_modules.app.setup import LIMITER, cache +from my_modules.app.setup import LIMITER -from quart import Blueprint, send_from_directory, render_template, current_app -from datetime import datetime +from quart import Blueprint, send_from_directory, render_template, current_app, Response, redirect basic_bp = Blueprint('basic', __name__) @basic_bp.route('/favicon', methods=['GET']) @basic_bp.route('/favicon.ico', methods=['GET']) @LIMITER.exempt -async def favicon(cache_key:str='favicon'): - cache_favicon_name = await cache.get(cache_key) - if cache_favicon_name: - file_name = cache_favicon_name - else: - current_year = datetime.now().year - - autumn_start = datetime(current_year, 9, 23) - autumn_end = datetime(current_year, 12, 21) - winter_start = datetime(current_year, 12, 21) - winter_end = datetime(current_year, 3, 20) - - # Get the current date - current_date = datetime.now() - - if autumn_start <= current_date <= autumn_end: - file_name = '1. autumn.gif' - elif current_date >= winter_start or current_date <= winter_end: - file_name = '2. winter.png' - else: - file_name = '0. default.svg' - - await cache.set(cache_key, file_name, ttl=21600) - - return await send_from_directory(current_app.static_folder, f'images/favicons/{file_name}') +async def favicon(): + file_data = await current_app.convex.get_current_favicon() + return redirect(file_data['file_id']) @basic_bp.route('/robots.txt', methods=['GET']) @LIMITER.limit('3 per day') async def robots(): return await send_from_directory(current_app.static_folder, f'robots.txt') + +@basic_bp.route("/storage/") +async def convex_storage_proxy(file_id:str): + return Response( + current_app.convex.stream_from_storage(file_id, add_api_path=True), + mimetype="application/octet-stream" + )