from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from io import BytesIO
from urllib.parse import urlparse, urlunparse
import asyncio

import aiohttp
import httpx
from convex import ConvexClient, ConvexError, ConvexExecutionError
from quart import url_for

from my_modules.app.logger import logger


class ConvexDB:
    """Async facade over a synchronous ``ConvexClient``.

    Every Convex call is pushed onto a single-worker thread pool so that the
    blocking client does not stall the event loop. Failures inside a call are
    logged by ``run_query_with_reconnection`` and surface to callers as
    ``None``.
    """

    # Prefixes prepended to Convex function names.
    default_namespace = 'nanoshare'
    service_protection = 'service/protection'
    service_auth = 'service/auth'

    def __init__(self, dsn: str, service: str):
        """Store connection settings; the client itself is created by ``connect``.

        Args:
            dsn: Base URL of the Convex deployment.
            service: Service identifier sent along with the token functions.
        """
        self.locked_printer = {}
        self.client = None
        self.dsn = dsn
        self.service = service
        # One worker: calls into the client are serialized.
        self.executor = ThreadPoolExecutor(max_workers=1)
        # Kept for backward compatibility only. Queries use the loop that is
        # actually running at call time, because this one may not be the loop
        # the application ends up running on.
        self.loop = asyncio.get_event_loop()

    async def replace_url_host(self, url: str) -> str:
        """Return ``url`` with its scheme and host replaced by those of the DSN.

        Path, params, query and fragment of ``url`` are preserved.
        """
        dsn = urlparse(self.dsn.rstrip('/'))
        keep_content = urlparse(url)
        return urlunparse((
            dsn.scheme,
            dsn.netloc,
            keep_content.path,
            keep_content.params,
            keep_content.query,
            keep_content.fragment,
        ))

    # Connect Functions
    async def connect(self):
        """Create the Convex client for the configured DSN."""
        self.client = ConvexClient(self.dsn)

    async def close(self):
        """Drop the reference to the Convex client, if any."""
        if self.client:
            self.client = None

    def subscribe(self, name: str, args: dict):
        """Subscribe to ``name`` inside the default namespace.

        Returns whatever ``ConvexClient.subscribe`` returns.
        """
        return self.client.subscribe(name=f'{self.default_namespace}/{name}', args=args)

    # Query Helper Function
    async def run_query_with_reconnection(self, function, *args, **kwargs):
        """Run a blocking client call in the executor and return its result.

        Args:
            function: Bound client method (``query``, ``mutation``, ``action``).
            *args: Positional arguments for ``function``; the first one is the
                Convex function name and is also used for the debug log line.
            **kwargs: Keyword arguments for ``function``.

        Returns:
            The result of ``function``, or ``None`` when anything raised. The
            error is logged, not propagated, and no retry is attempted.
        """
        try:
            query = args[0].replace('\n ', '').replace(' ', '')
            if args[1:]:
                await logger.debug(f'{function.__name__} |{query}| {args[1:]}{kwargs}')
            else:
                await logger.debug(f'{function.__name__} |{query}| {kwargs}')

            # Use the loop this coroutine is running on; a loop captured at
            # construction time may be a different one, and awaiting a future
            # bound to another loop raises RuntimeError.
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                self.executor,
                lambda: function(*args, **kwargs),
            )
        except Exception as e:
            await logger.error(f'Query Database: {e}, Function: {args}')
            return None

    async def get_correct_storage_api_url(self, obj: dict, key: str):
        """Rewrite ``obj[key]`` to the local storage proxy URL.

        The file id is taken from the last path segment of the stored URL.
        ``obj`` is modified in place and returned. A falsy value is left as is.
        """
        update_value = obj[key]
        if update_value:
            file_id = urlparse(update_value).path.rsplit('/', 1)[-1]
            obj[key] = url_for('basic.convex_storage_proxy', file_id=file_id)
        return obj

    # Basic Blueprint Functions
    async def get_current_favicon(self):
        """Return the result of the ``cache/favicon:get`` query."""
        data = await self.run_query_with_reconnection(
            self.client.query,
            f'{self.default_namespace}/cache/favicon:get',
            args={},
        )
        return data

    async def set_new_favicon(self, favicon: str):
        """Run the ``cache/favicon:set`` mutation with ``favicon``."""
        data = await self.run_query_with_reconnection(
            self.client.mutation,
            f'{self.default_namespace}/cache/favicon:set',
            args={
                'favicon': favicon
            },
        )
        return data

    # File Query Functions
    async def get_file(self, file_id: str):
        """Return the result of ``files:getByFileId`` for ``file_id``."""
        data = await self.run_query_with_reconnection(
            self.client.query,
            f"{self.default_namespace}/files:getByFileId",
            args={
                'file_id': file_id
            },
        )
        return data

    async def get_files(self, user_id: str):
        """Return the non-expired files of ``user_id`` as plain dicts.

        ``expires_at`` is an int when present and truthy, otherwise ``''``.
        Returns an empty list when the query failed (the helper returned
        ``None``) instead of raising ``TypeError``.
        """
        data = await self.run_query_with_reconnection(
            self.client.query,
            f"{self.default_namespace}/files:getAllNotExpired",
            args={
                'user_id': user_id
            },
        )
        return [
            {
                "file_id": x['file_id'],
                "file_name": x['file_name'],
                "file_size": x['file_size'],
                "note": x['note'],
                "expires_at": int(x['expires_at']) if x.get('expires_at') else '',
                "uploaded_at": int(x['uploaded_at']),
            }
            for x in data or []
        ]

    async def add_file(self, file_name: str, file_size: str, note: str, content_type: str,
                       expires_at: datetime, storage_id: str, user_id: str):
        """Run ``files:addNewFile``; ``expires_at`` is only sent when truthy."""
        args = {
            'file_name': file_name,
            'file_size': file_size,
            'content_type': content_type,
            'note': note,
            'file_storage_id': storage_id,
            'user_id': user_id
        }
        if expires_at:
            args['expires_at'] = expires_at.isoformat()

        data = await self.run_query_with_reconnection(
            self.client.mutation,
            f"{self.default_namespace}/files:addNewFile",
            args=args,
        )
        return data

    async def update_file(self, file_id: str, file_name: str, note: str, expires_at: datetime, user_id: str):
        """Run ``files:updateFile`` with the given fields.

        NOTE(review): unlike ``add_file`` this requires ``expires_at`` to be a
        datetime; passing ``None`` raises ``AttributeError``. Confirm whether
        the Convex function accepts a missing expiry before relaxing this.
        """
        await self.run_query_with_reconnection(
            self.client.mutation,
            f"{self.default_namespace}/files:updateFile",
            args={
                'file_id': file_id,
                'file_name': file_name,
                'note': note,
                'expires_at': expires_at.isoformat(),
                'user_id': user_id
            },
        )

    async def delete_file(self, file_id: str, user_id: str):
        """Run ``files:deleteFile`` for the given file and user."""
        await self.run_query_with_reconnection(
            self.client.mutation,
            f"{self.default_namespace}/files:deleteFile",
            args={
                'file_id': file_id,
                'user_id': user_id
            },
        )

    async def get_file_informations(self, file_id: str):
        """Placeholder; not implemented, always returns ``None``."""
        pass

    # File Access Query Functions
    async def add_file_access(self, file_id: str, ip_address: str, status: str, user_agent: str):
        """Run ``access:addNewAccess``; ``user_agent`` is coerced to ``str``."""
        data = await self.run_query_with_reconnection(
            self.client.mutation,
            f"{self.default_namespace}/access:addNewAccess",
            args={
                'file_id': file_id,
                'ip_address': ip_address,
                'user_agent': str(user_agent),
                'status': status
            },
        )
        return data

    async def get_all_access(self, user_id: str):
        """Return all access records of a user's files, newest first.

        NOTE(review): the query text and the attribute-style access on the
        result look like leftovers from a different database backend and
        presumably do not work against Convex - confirm and port.
        """
        data = await self.run_query_with_reconnection(
            self.client.query,
            """ select files { file_id, file_name, note, accesses: { at, status, ip: { value }, user_agent: { value } } order by .at desc } filter .user_id = $user_id """,
            user_id=user_id
        )
        return sorted([{
            "file_id": file.file_id,
            "file_name": file.file_name,
            "file_note": file.note,
            "status": access.status,
            "ip": access.ip.value,
            "user_agent": access.user_agent.value,
            "accessed_at": access.at,
        } for file in data for access in file.accesses], key=lambda x: x["accessed_at"], reverse=True)

    async def get_file_access(self, file_id: str):
        """Return the access records of one file, or ``None`` if nothing came back.

        NOTE(review): relies on ``self.client.query_single`` and a query text
        that look like leftovers from a different database backend; the Convex
        client presumably has no such method - confirm and port.
        """
        data = await self.run_query_with_reconnection(
            self.client.query_single,
            """ SELECT files { accesses: { status, ip: { value }, user_agent: { value }, at } } FILTER .file_id = $file_id LIMIT 1 """,
            file_id=file_id,
        )
        if data:
            return [{
                "status": str(access.status),
                "ip": access.ip.value if access.ip else None,
                "user_agent": access.user_agent.value if access.user_agent else None,
                "accessed_at": access.at,
            } for access in data.accesses]
        return None

    # Stream Data from DB File Storage
    async def stream_from_storage(self, convex_upload_url: str):
        """Yield the stored file in 4096-byte chunks.

        The URL's host is replaced by the DSN host before the request.
        Raises the aiohttp error from ``raise_for_status`` on a bad status.
        """
        convex_upload_url = await self.replace_url_host(convex_upload_url)
        async with aiohttp.ClientSession() as session:
            async with session.get(convex_upload_url) as resp:
                resp.raise_for_status()
                async for chunk in resp.content.iter_chunked(4096):
                    yield chunk

    async def get_from_storage(self, convex_upload_url: str):
        """Download the stored file completely and return it as ``BytesIO``.

        The URL's host is replaced by the DSN host before the request.
        Raises the httpx error from ``raise_for_status`` on a bad status.
        """
        convex_upload_url = await self.replace_url_host(convex_upload_url)
        async with httpx.AsyncClient() as client:
            resp = await client.get(convex_upload_url)
            resp.raise_for_status()
            return BytesIO(resp.read())

    async def send_to_storage(self, data: bytes, content_type: str):
        """Upload ``data`` to Convex storage and return the new ``storageId``.

        An upload URL is requested through ``service/files:prepareUpload``,
        re-hosted onto the DSN, and the payload is POSTed to it.
        """
        convex_upload_url = await self.run_query_with_reconnection(
            self.client.mutation,
            "service/files:prepareUpload",
            args={},
        )
        convex_upload_url = await self.replace_url_host(convex_upload_url)

        async with httpx.AsyncClient() as client:
            resp = await client.post(
                convex_upload_url,
                headers={
                    "Content-Type": content_type,
                    "Content-Length": str(len(data)),
                },
                content=data,
            )
            resp.raise_for_status()
            return resp.json()["storageId"]

    # User
    async def upsert_user(self, user: dict):
        """Run ``users:upsert`` with selected claims from ``user``.

        Reads the keys ``sub``, ``email``, ``name``, ``preferred_username``
        and ``groups``; a missing key raises ``KeyError``.
        """
        data = await self.run_query_with_reconnection(
            self.client.mutation,
            f'{self.service_auth}/users:upsert',
            args={
                'sub': user['sub'],
                'email': user['email'],
                'name': user['name'],
                'username': user['preferred_username'],
                'groups': user['groups'],
            },
        )
        return data

    async def login(self, code: str):
        """
        NOT WORKING - auth is still beta in Convex, so login is currently
        done manually. Calls ``auth:signIn`` directly on the client (blocking,
        not through the executor) and prints the result.
        """
        data = self.client.action('auth:signIn', args={'provider': 'authentik', 'params': {'code': code}})
        print(data)

    # Blocked Paths Functions
    async def set_blocked_paths(self, path: str, subpath: str):
        """Run ``blockedPaths:create`` for ``path`` / ``subpath``."""
        data = await self.run_query_with_reconnection(
            self.client.mutation,
            f"{self.service_protection}/blockedPaths:create",
            args={
                'path': path,
                'subpath': subpath
            },
        )
        return data

    async def get_blocked_paths(self) -> set[str]:
        """Return the result of ``blockedPaths:getAll`` unchanged.

        NOTE(review): the value is passed through as received; confirm it
        really is a set, as the annotation claims.
        """
        data = await self.run_query_with_reconnection(
            self.client.query,
            f"{self.service_protection}/blockedPaths:getAll",
            args={},
        )
        return data

    async def is_path_blocked(self, path: str) -> bool:
        """Return the result of ``blockedPaths:isBlocked`` for ``path``."""
        data = await self.run_query_with_reconnection(
            self.client.query,
            f"{self.service_protection}/blockedPaths:isBlocked",
            args={
                'path': path
            },
        )
        return data

    # Blocked IP Address Functions
    async def set_ip_address_to_blocklist(self, ip_address: str, method: str, path: str, blocked: bool,
                                          accessed: int, last_access_at: datetime):
        """Run ``blockedIpAdresses:create``; the timestamp is sent as ISO 8601."""
        data = await self.run_query_with_reconnection(
            self.client.mutation,
            f"{self.service_protection}/blockedIpAdresses:create",
            args={
                'ip': ip_address,
                'method': method,
                'path': path,
                'blocked': blocked,
                'accessed': accessed,
                'last_access_at': last_access_at.isoformat()
            },
        )
        return data

    async def increment_blocked_ip_address_access(self, ip_address: str, method: str, path: str):
        """Run ``blockedIpAdresses:increment`` and log when the result is ``1``."""
        data = await self.run_query_with_reconnection(
            self.client.mutation,
            f"{self.service_protection}/blockedIpAdresses:increment",
            args={
                'ip': ip_address,
                'method': method,
                'path': path
            },
        )
        if data == 1:
            await logger.info(f'Add New IP Address to Block List {ip_address}, {method}, {path}')

    async def is_ip_address_whitelisted_or_blocked(self, ip_address: str) -> bool:
        """Return the result of ``blockedIpAdresses:isWhitelistedOrBlocked``."""
        data = await self.run_query_with_reconnection(
            self.client.query,
            f"{self.service_protection}/blockedIpAdresses:isWhitelistedOrBlocked",
            args={
                'ip': ip_address
            },
        )
        return data

    # Token Functions
    async def get_tokens(self, user_id: str):
        """Return the refresh tokens of ``user_id`` for this service."""
        data = await self.run_query_with_reconnection(
            self.client.query,
            f"{self.service_auth}/jwt:getRefreshTokenByUserAndService",
            args={
                'service': self.service,
                'user_id': user_id
            },
        )
        return data

    async def add_refresh_token(self, token_name: str, user_id: str):
        """Run ``jwt:createRefreshToken`` for this service."""
        data = await self.run_query_with_reconnection(
            self.client.mutation,
            f"{self.service_auth}/jwt:createRefreshToken",
            args={
                'token_name': token_name,
                'service': self.service,
                'user_id': user_id
            },
        )
        return data

    async def update_refresh_token_name(self, token_name: str, token_id: str, user_id: str):
        """Run ``jwt:updateRefreshToken`` to rename a refresh token."""
        data = await self.run_query_with_reconnection(
            self.client.mutation,
            f"{self.service_auth}/jwt:updateRefreshToken",
            args={
                'token_id': token_id,
                'token_name': token_name,
                'service': self.service,
                'user_id': user_id
            },
        )
        return data

    async def remove_refresh_token(self, token_id: str, user_id: str):
        """Run ``jwt:deleteRefreshToken`` for the given token."""
        data = await self.run_query_with_reconnection(
            self.client.mutation,
            f"{self.service_auth}/jwt:deleteRefreshToken",
            args={
                'token_id': token_id,
                'service': self.service,
                'user_id': user_id
            },
        )
        return data

    async def generate_new_access_token(self, refresh_token: str):
        """Exchange a refresh token for an access token.

        Returns:
            ``(token_dict, refresh_id)`` when the response contains
            ``new_token``; timestamps are divided by 1000. Otherwise
            ``(data, None)``, where ``data`` is the raw response, or ``None``
            when the call itself failed.
        """
        data = await self.run_query_with_reconnection(
            self.client.action,
            f"{self.service_auth}/jwt:exchangeRefreshForAccess",
            args={
                'refresh_token': refresh_token,
                'service': self.service
            },
        )
        # ``data`` is None when the helper swallowed an error; guard before .get().
        if data and data.get('new_token'):
            new_token = data['new_token']
            return {
                "access_token": new_token['access_token'],
                "token_type": new_token['token_type'],
                "expires_in": int(new_token['expires_in']),
                "created_at": int(new_token['created_at']) / 1000,
                "expires_at": int(new_token['expires_at']) / 1000,
            }, data['refresh_id']
        return data, None

    async def decode_access_token_payload(self, access_token: str):
        """Decode an access token.

        Returns the ``payload`` entry of the response when it is truthy,
        otherwise the raw response (``None`` when the call itself failed).
        """
        data = await self.run_query_with_reconnection(
            self.client.action,
            f"{self.service_auth}/jwt:decodeAccessToken",
            args={
                'access_token': access_token,
                'service': self.service
            },
        )
        # ``data`` is None when the helper swallowed an error; guard before .get().
        if data and data.get('payload'):
            return data.get('payload')
        return data

    # Error Page Access
    async def set_page_not_found_error(self, path: str, status: int, accessed: int, last_access_at: datetime):
        """Run ``badPageAccess:create``; the timestamp is sent as ISO 8601."""
        data = await self.run_query_with_reconnection(
            self.client.mutation,
            f"{self.service_protection}/badPageAccess:create",
            args={
                'path': path,
                'status': status,
                'accessed': accessed,
                'last_access_at': last_access_at.isoformat()
            },
        )
        return data

    async def increment_page_not_found_error(self, path: str, status: int):
        """Run ``badPageAccess:increment`` for ``path`` / ``status``."""
        await self.run_query_with_reconnection(
            self.client.mutation,
            f"{self.service_protection}/badPageAccess:increment",
            args={
                'path': path,
                'status': status
            },
        )