diff --git a/my_modules/app/setup.py b/my_modules/app/setup.py index 787d137..f69a62e 100644 --- a/my_modules/app/setup.py +++ b/my_modules/app/setup.py @@ -2,7 +2,8 @@ from my_modules.functions import custom_limit_key from my_modules.app.constens import SECRET_KEY, UPLOAD_DIR from my_modules.AsyncCache import AsyncCache from my_modules.app.logger import logger -from my_modules.db.EdgeDB import EdgeDB + +from my_modules.db.ConvexDB import ConvexDB from quart_session import Session from flask_limiter import Limiter @@ -66,15 +67,12 @@ LIMITER = Limiter( ) @app.before_serving -async def init_edgedb(): - app.edgedb = EdgeDB( - database=os.getenv("EDGEDB_DATABASE"), - tls_security=None if app.debug else 'insecure' - ) - await app.edgedb.connect() +async def init_convex(): + app.convex = ConvexDB(os.getenv("CONVEX_URL"), service='nanoshare') + await app.convex.connect() @app.after_serving -async def close_edgedb(): - if app.edgedb: - await app.edgedb.close() +async def close_convex(): + if app.convex: + await app.convex.close() await logger.shutdown() diff --git a/my_modules/db/ConvexDB.py b/my_modules/db/ConvexDB.py new file mode 100644 index 0000000..089b543 --- /dev/null +++ b/my_modules/db/ConvexDB.py @@ -0,0 +1,403 @@ +from concurrent.futures import ThreadPoolExecutor +from my_modules.app.logger import logger + +from convex import ConvexClient, ConvexError, ConvexExecutionError +from urllib.parse import urlparse +from datetime import datetime +from quart import url_for +from urllib.parse import urlparse, urlunparse +from io import BytesIO +import aiohttp, httpx, asyncio + +class ConvexDB: + default_namespace = 'nanoshare' + service_protection = 'service/protection' + service_auth = 'service/auth' + + def __init__(self, dsn:str, service:str): + self.locked_printer = {} + self.client = None + self.dsn = dsn + self.service = service + + self.executor = ThreadPoolExecutor(max_workers=1) + self.loop = asyncio.get_event_loop() + + async def replace_url_host(self, url:str) -> str: + dsn = urlparse(self.dsn.rstrip('/')) + keep_content = urlparse(url) + + return urlunparse(( + dsn.scheme, + dsn.netloc, + keep_content.path, + keep_content.params, + keep_content.query, + keep_content.fragment, + )) + + # Connect Function + async def connect(self): + self.client = ConvexClient(self.dsn) + + async def close(self): + if self.client: + self.client = None + + def subscribe(self, name:str, args:dict): + return self.client.subscribe(name=f'{self.default_namespace}/{name}', args=args) + + # Query Helper Function + async def run_query_with_reconnection(self, function, *args:tuple, **kwargs): + while True: + try: + query = args[0].replace('\n ', '').replace(' ', '') + if args[1:]: + await logger.debug(f'{function.__name__} |{query}| {args[1:]}{kwargs}') + else: + await logger.debug(f'{function.__name__} |{query}| {kwargs}') + + return await self.loop.run_in_executor( + self.executor, lambda: function(*args, **kwargs) + ) + except Exception as e: + await logger.error(f'Query Database: {e}, Function: {args}') + break + + async def get_correct_storage_api_url(self, obj:dict, key:str): + update_value = obj[key] + if update_value: + obj[key] = url_for('basic.convex_storage_proxy', file_id=urlparse(update_value).path.rsplit('/', 1)[-1]) + return obj + + # Basic Blueprint Function + async def get_current_favicon(self): + data = await self.run_query_with_reconnection( + self.client.query, + f'{self.default_namespace}/cache/favicon:get', + args={} + ) + return data + + async def set_new_favicon(self, favicon:str): + data = await self.run_query_with_reconnection( + self.client.mutation, + f'{self.default_namespace}/cache/favicon:set', + args={ 'favicon': favicon } + ) + return data + + # File Quary Functions + async def get_file(self, file_id:str): + data = await self.run_query_with_reconnection( + self.client.query, + f"{self.default_namespace}/files:getByFileId", + args={ 'file_id': file_id } + ) + return data + + async def get_files(self, user_id:str): + data = await self.run_query_with_reconnection( + self.client.query, + f"{self.default_namespace}/files:getAllNotExpired", + args={ 'user_id': user_id } + ) + return [ { + "file_id": x['file_id'], + "file_name": x['file_name'], + "file_size": x['file_size'], + "note": x['note'], + "expires_at": int(x['expires_at']) if x.get('expires_at', None) else '', + "uploaded_at": int(x['uploaded_at']), + } for x in data ] + + async def add_file(self, file_name:str, file_size:str, note:str, content_type:str, expires_at:datetime, storage_id:str, user_id:str): + args = { + 'file_name': file_name, 'file_size': file_size, 'content_type': content_type, + 'note': note, + 'file_storage_id': storage_id, 'user_id': user_id + } + if expires_at: + args['expires_at'] = expires_at.isoformat() + + data = await self.run_query_with_reconnection( + self.client.mutation, + f"{self.default_namespace}/files:addNewFile", + args=args, + ) + return data + + async def update_file(self, file_id:str, file_name:str, note:str, expires_at:datetime, user_id:str): + await self.run_query_with_reconnection( + self.client.mutation, + f"{self.default_namespace}/files:updateFile", + args={ 'file_id': file_id, 'file_name': file_name, 'note': note, 'expires_at': expires_at.isoformat(), 'user_id': user_id } + ) + + async def delete_file(self, file_id:str, user_id:str): + await self.run_query_with_reconnection( + self.client.mutation, + f"{self.default_namespace}/files:deleteFile", + args={ 'file_id': file_id, 'user_id': user_id } + ) + + async def get_file_informations(self, file_id:str): + pass + + # File Access Quary Functions + async def add_file_access(self, file_id: str, ip_address:str, status:str, user_agent:str): + data = await self.run_query_with_reconnection( + self.client.mutation, + f"{self.default_namespace}/access:addNewAccess", + args={ 'file_id': file_id, 'ip_address': ip_address, 'user_agent': str(user_agent), 'status': status } + ) + return data + + async def get_all_access(self, user_id:str): + data = await self.run_query_with_reconnection( + self.client.query, + """ + select files { + file_id, + file_name, + note, + accesses: { + at, + status, + ip: { + value + }, + user_agent: { + value + } + } + order by .at desc + } + filter .user_id = $user_id + """, + user_id=user_id + ) + return sorted([{ + "file_id": file.file_id, + "file_name": file.file_name, + "file_note": file.note, + "status": access.status, + "ip": access.ip.value, + "user_agent": access.user_agent.value, + "accessed_at": access.at, + } for file in data for access in file.accesses], key=lambda x: x["accessed_at"], reverse=True) + + async def get_file_access(self, file_id: str): + data = await self.run_query_with_reconnection( + self.client.query_single, + """ + SELECT files { + accesses: { + status, + ip: { value }, + user_agent: { value }, + at + } + } + FILTER .file_id = $file_id + LIMIT 1 + """, + file_id=file_id, + ) + + if data: + return [{ + "status": str(access.status), + "ip": access.ip.value if access.ip else None, + "user_agent": access.user_agent.value if access.user_agent else None, + "accessed_at": access.at, + } for access in data.accesses] + return None + + # Stream Data from DB File Storage + async def stream_from_storage(self, convex_upload_url:str): + convex_upload_url = await self.replace_url_host(convex_upload_url) + + async with aiohttp.ClientSession() as session: + async with session.get(convex_upload_url) as resp: + resp.raise_for_status() + async for chunk in resp.content.iter_chunked(4096): + yield chunk + + async def get_from_storage(self, convex_upload_url:str): + convex_upload_url = await self.replace_url_host(convex_upload_url) + + async with httpx.AsyncClient() as client: + resp = await client.get(convex_upload_url) + resp.raise_for_status() + return BytesIO(resp.read()) + + async def send_to_storage(self, data:bytes, content_type:str): + convex_upload_url = await self.run_query_with_reconnection( + self.client.mutation, + f"service/files:prepareUpload", + args={} + ) + + convex_upload_url = await self.replace_url_host(convex_upload_url) + + async with httpx.AsyncClient() as client: + resp = await client.post(convex_upload_url, + headers={ + "Content-Type": content_type, + "Content-Length": str(len(data)), + }, + content=data, + ) + resp.raise_for_status() + return resp.json()["storageId"] + + # User + async def upsert_user(self, user:dict): + data = await self.run_query_with_reconnection( + self.client.mutation, + f'{self.service_auth}/users:upsert', + args={ + 'sub': user['sub'], + 'email': user['email'], + 'name': user['name'], + 'username': user['preferred_username'], + 'groups': user['groups'], + } + ) + return data + + async def login(self, code:str): + """ + NOT WORKING - AUTH Still Beta in Convex -> Currently do Manual + """ + data = self.client.action('auth:signIn', args={'provider': 'authentik', 'params': {'code': code}}) + print(data) + + # Blocked Paths Functions + async def set_blocked_paths(self, path:str, subpath:str): + data = await self.run_query_with_reconnection( + self.client.mutation, + f"{self.service_protection}/blockedPaths:create", + args={ 'path': path, 'subpath': subpath } + ) + return data + + async def get_blocked_paths(self) -> set[str]: + data = await self.run_query_with_reconnection( + self.client.query, + f"{self.service_protection}/blockedPaths:getAll", + args={} + ) + return data + + async def is_path_blocked(self, path:str) -> bool: + data = await self.run_query_with_reconnection( + self.client.query, + f"{self.service_protection}/blockedPaths:isBlocked", + args={ 'path': path } + ) + return data + + # Blocked IP Address Functions + async def set_ip_address_to_blocklist(self, ip_address:str, method:str, path:str, blocked:bool, accessed:int, last_access_at:datetime): + data = await self.run_query_with_reconnection( + self.client.mutation, + f"{self.service_protection}/blockedIpAdresses:create", + args={ 'ip': ip_address, 'method': method, 'path': path, 'blocked': blocked, 'accessed': accessed, 'last_access_at': last_access_at.isoformat() } + ) + return data + + async def increment_blocked_ip_address_access(self, ip_address:str, method:str, path:str): + data = await self.run_query_with_reconnection( + self.client.mutation, + f"{self.service_protection}/blockedIpAdresses:increment", + args={ 'ip': ip_address, 'method': method, 'path': path } + ) + if data == 1: + await logger.info(f'Add New IP Address to Block List {ip_address}, {method}, {path}') + + async def is_ip_address_whitelisted_or_blocked(self, ip_address:str) -> bool: + data = await self.run_query_with_reconnection( + self.client.query, + f"{self.service_protection}/blockedIpAdresses:isWhitelistedOrBlocked", + args={ 'ip': ip_address } + ) + return data + + # Token Functions + async def get_tokens(self, user_id:str): + data = await self.run_query_with_reconnection( + self.client.query, + f"{self.service_auth}/jwt:getRefreshTokenByUserAndService", + args={ 'service': self.service, 'user_id': user_id } + ) + return data + + async def add_refresh_token(self, token_name:str, user_id:str): + data = await self.run_query_with_reconnection( + self.client.mutation, + f"{self.service_auth}/jwt:createRefreshToken", + args={ 'token_name': token_name, 'service': self.service, 'user_id': user_id } + ) + return data + + async def update_refresh_token_name(self, token_name:str, token_id:str, user_id:str): + data = await self.run_query_with_reconnection( + self.client.mutation, + f"{self.service_auth}/jwt:updateRefreshToken", + args={ 'token_id': token_id, 'token_name': token_name, 'service': self.service, 'user_id': user_id } + ) + return data + + async def remove_refresh_token(self, token_id:str, user_id:str): + data = await self.run_query_with_reconnection( + self.client.mutation, + f"{self.service_auth}/jwt:deleteRefreshToken", + args={ 'token_id': token_id, 'service': self.service, 'user_id': user_id } + ) + return data + + async def generate_new_access_token(self, refresh_token:str): + data = await self.run_query_with_reconnection( + self.client.action, + f"{self.service_auth}/jwt:exchangeRefreshForAccess", + args={ 'refresh_token': refresh_token, 'service': self.service } + ) + + if data.get('new_token', None): + return { + "access_token": data['new_token']['access_token'], + "token_type": data['new_token']['token_type'], + "expires_in": int(data['new_token']['expires_in']), + "created_at": int(data['new_token']['created_at']) / 1000, + "expires_at": int(data['new_token']['expires_at']) / 1000, + }, data['refresh_id'] + return data, None + + async def decode_access_token_payload(self, access_token:str): + data = await self.run_query_with_reconnection( + self.client.action, + f"{self.service_auth}/jwt:decodeAccessToken", + args={ 'access_token': access_token, 'service': self.service } + ) + + if data.get('payload', None): + return data.get('payload') + return data + + # Error Page Access + async def set_page_not_found_error(self, path:str, status:int, accessed:int, last_access_at:datetime): + data = await self.run_query_with_reconnection( + self.client.mutation, + f"{self.service_protection}/badPageAccess:create", + args={ 'path': path, 'status': status, 'accessed': accessed, 'last_access_at': last_access_at.isoformat() } + ) + return data + + async def increment_page_not_found_error(self, path:str, status:int): + await self.run_query_with_reconnection( + self.client.mutation, + f"{self.service_protection}/badPageAccess:increment", + args={ 'path': path, 'status': status } + ) diff --git a/my_modules/db/EdgeDB.py b/my_modules/db/EdgeDB.py deleted file mode 100644 index a6b6a66..0000000 --- a/my_modules/db/EdgeDB.py +++ /dev/null @@ -1,345 +0,0 @@ -from my_modules.file_helper_functions import generate_short_id -from my_modules.app.logger import logger - -import asyncio, gel - -class EdgeDB: - def __init__(self, database:str=None, tls_security:str='insecure', timeout:int=1, max_retrys:int=10): - self.database = database - self.tls_security = tls_security - - self.timeout = timeout - self.max_retrys = max_retrys - - self.client = None - - # Connect Function - async def __aenter__(self): - await self.connect() - return self - - async def __aexit__(self, exc_type, exc, tb): - await self.close() - - async def connect(self): - self.client = gel.create_async_client( - tls_security=self.tls_security, - database=self.database, - timeout=self.timeout - ) - - async def close(self): - if self.client: - await self.client.aclose() - - # Query Helper Function - async def run_query_with_reconnection(self, function, *args:tuple, **kwargs): - retry_count = 0 - while True: - try: - query = args[0].replace('\n ', '').replace(' ', '') - if args[1:]: - await logger.debug(f'{function.__name__} |{query}| {args[1:]}{kwargs}') - else: - await logger.debug(f'{function.__name__} |{query}| {kwargs}') - - return await function(*args, **kwargs) - except gel.errors.ClientConnectionFailedError: - await self.connect() - await logger.error(f'Retry to Query Database: {retry_count}, Function: {args}') - if retry_count > self.max_retrys: - break - await asyncio.sleep(0.5) - retry_count += 1 - - # File Quary Functions - async def get_file(self, file_id:str): - data = await self.run_query_with_reconnection( - self.client.query_single, - """ - select files { - file_name, - content_type, - expires_at, - user_id - } - filter .file_id = $file_id - limit 1 - """, - file_id=file_id - ) - - if data: - return { - "file_name": data.file_name, - "content_type": data.content_type, - "expires_at": data.expires_at, - "user_id": data.user_id - } - return None - - async def get_files(self, current_datetime, user_id:str): - data = await self.run_query_with_reconnection( - self.client.query, - """ - select files { - file_id, - file_name, - file_size, - note, - uploaded_at, - expires_at - } - filter - .user_id = $user_id - and ( - (.expires_at ?? '9999-12-31T00:00:00Z') > $now - ) - order by .uploaded_at desc - """, - now=current_datetime, - user_id=user_id, - ) - return [{ - "file_id": i.file_id, - "file_name": i.file_name, - "file_size": i.file_size, - "note": i.note, - "uploaded_at": i.uploaded_at, - "expires_at": i.expires_at if i.expires_at else '', - } for i in data] - - async def add_file(self, file_name:str, file_size:str, note:str, content_type:str, uploaded_at, expires_at, user_id:str): - for attempt in range(10): - try: - return await self.run_query_with_reconnection( - self.client.query_single, - """ - insert files { - file_id := $file_id, - file_name := $file_name, - file_size := $file_size, - note := $note, - content_type := $content_type, - uploaded_at := $uploaded_at, - expires_at := $expires_at, - user_id := $user_id - }; - """, - file_id=generate_short_id(), - file_name=file_name, - file_size=file_size, - note=note, - content_type=content_type, - uploaded_at=uploaded_at, - expires_at=expires_at, - user_id=user_id, - ) - except gel.errors.ConstraintViolationError as e: - await logger.warning(f'file_id collision on attempt {attempt+1}, regenerating…') - continue - raise RuntimeError("Could not allocate unique file_id after multiple retries") - - async def update_file(self, file_id:str, file_name:str, note:str, expires_at, user_id:str): - return await self.run_query_with_reconnection( - self.client.query, - """ - update files - filter .file_id = $file_id and .user_id = $user_id - set { - file_name := $file_name, - note := $note, - expires_at := $expires_at - }; - """, - file_id=file_id, - file_name=file_name, - note=note, - expires_at=expires_at, - user_id=user_id, - ) - - async def delete_file(self, file_id:str, user_id:str): - await self.run_query_with_reconnection( - self.client.query, - """ - delete files - filter .file_id = $file_id AND .user_id = $user_id - """, - file_id=file_id, - user_id=user_id - ) - - async def get_expired_files(self:str, current_datetime): - data = await self.run_query_with_reconnection( - self.client.query, - """ - select files { - file_id, - file_name, - expires_at - } - filter .expires_at < $now - order by .expires_at asc; - """, - now=current_datetime - ) - return [{ - "file_id": item.file_id, - "file_name": item.file_name, - "expires_at": item.expires_at - } for item in data] - - async def delete_files_by_ids(self, remove_file_ids:list[str]): - if not remove_file_ids: - return - - await self.run_query_with_reconnection( - self.client.query, - """ - delete files - filter .file_id in array_unpack(>$ids); - """, - ids=remove_file_ids - ) - - async def get_file_informations(self, file_id:str): - pass - - # File Access Quary Functions - async def add_file_access(self, file_id: str, ip_address: str, status: str, user_agent: str, accessed_at): - return await self.run_query_with_reconnection( - self.client.query, - """ - WITH - used_file := ( - SELECT files - FILTER .file_id = $file_id - LIMIT 1 - ), - ip_obj := ( - INSERT IPAddr { value := $ip_address } - UNLESS CONFLICT ON .value - ELSE ( - SELECT IPAddr - FILTER .value = $ip_address - ) - ), - ua_obj := ( - INSERT UserAgent { value := $user_agent } - UNLESS CONFLICT ON .value - ELSE ( - SELECT UserAgent - FILTER .value = $user_agent - ) - ), - new_file_access := ( - INSERT file_access { - at := $accessed_at, - status := $status, - ip := ip_obj, - user_agent := ua_obj - } - ), - _updated_file := ( - UPDATE used_file - SET { accesses += (SELECT new_file_access) } - ) - - SELECT new_file_access { - at, - status, - ip: { value }, - user_agent: { value }, - }; - """, - file_id=file_id, - accessed_at=accessed_at, - ip_address=ip_address, - status=status, - user_agent=str(user_agent), - ) - - async def get_all_file_access(self): - data = await self.run_query_with_reconnection( - self.client.query, - """ - select file_access { - status, - ip: { - value - }, - user_agent: { - value - }, - at - } order by .at desc - """ - ) - return [{ - "status": str(file.status), - "ip": file.ip.value, - "user_agent": file.user_agent.value, - "accessed_at": file.at, - } for file in data] - - async def get_all_access_of_user(self, user_id:str): - data = await self.run_query_with_reconnection( - self.client.query, - """ - select files { - file_id, - file_name, - note, - accesses: { - at, - status, - ip: { - value - }, - user_agent: { - value - } - } - order by .at desc - } - filter .user_id = $user_id - """, - user_id=user_id - ) - return sorted([{ - "file_id": file.file_id, - "file_name": file.file_name, - "file_note": file.note, - "status": access.status, - "ip": access.ip.value, - "user_agent": access.user_agent.value, - "accessed_at": access.at, - } for file in data for access in file.accesses], key=lambda x: x["accessed_at"], reverse=True) - - async def get_file_access(self, file_id: str): - data = await self.run_query_with_reconnection( - self.client.query_single, - """ - SELECT files { - accesses: { - status, - ip: { value }, - user_agent: { value }, - at - } - } - FILTER .file_id = $file_id - LIMIT 1 - """, - file_id=file_id, - ) - - if data: - return [{ - "status": str(access.status), - "ip": access.ip.value if access.ip else None, - "user_agent": access.user_agent.value if access.user_agent else None, - "accessed_at": access.at, - } for access in data.accesses] - return None diff --git a/my_modules/file_helper_functions.py b/my_modules/file_helper_functions.py deleted file mode 100644 index e10e909..0000000 --- a/my_modules/file_helper_functions.py +++ /dev/null @@ -1,45 +0,0 @@ -from my_modules.app.constens import SECRET_KEY - -import hmac, hashlib, base64, secrets, string, time -from datetime import datetime, timezone - -def base64url_encode(data: bytes) -> str: - return base64.urlsafe_b64encode(data).decode().rstrip("=") - -def base64url_decode(data: str) -> bytes: - padding = '=' * (-len(data) % 4) - return base64.urlsafe_b64decode(data + padding) - -def generate_short_id(length=11): - alphabet = string.ascii_letters + string.digits + '_-' - while True: - token = ''.join(secrets.choice(alphabet) for _ in range(length)) - if not token.startswith('-'): - return token - -def generate_signed_url(file_id: str) -> str: - # signature based only on the file_id - sig = hmac.new(SECRET_KEY, file_id.encode(), hashlib.sha256).digest() - token = base64url_encode(sig) - return f"-{file_id}?sig={token}" - -def verify_signed_url(file_id: str, token: str, file_expiration: int) -> bool: - # check both the signature and the file's stored expiration time - expected_sig = hmac.new(SECRET_KEY, file_id.encode(), hashlib.sha256).digest() - valid_sig = hmac.compare_digest(base64url_encode(expected_sig), token) - not_expired = file_expiration >= time.time() - return valid_sig and not_expired - -def is_expired(expires_at): - if not expires_at: - return False - if expires_at.tzinfo is None: - expires_at = expires_at.replace(tzinfo=timezone.utc) - else: - expires_at = expires_at.astimezone(timezone.utc) - return expires_at <= datetime.now(timezone.utc) - -if __name__ == "__main__": - file_id = generate_short_id() - url = generate_signed_url(file_id) - print(url) diff --git a/routes/side/main.py b/routes/side/main.py index 5dd66d2..f39680d 100644 --- a/routes/side/main.py +++ b/routes/side/main.py @@ -1,11 +1,9 @@ -from my_modules.file_helper_functions import is_expired, verify_signed_url from my_modules.decoratory.header import login_required from my_modules.functions import get_ip from my_modules.app.setup import LIMITER from my_modules.app.logger import logger -from quart import Blueprint, request, session, Response, send_from_directory, render_template, abort, current_app -from datetime import datetime, timezone +from quart import Blueprint, request, session, Response, send_file, render_template, abort, current_app side_main_bp = Blueprint('side_main', __name__) @@ -19,31 +17,31 @@ async def index(): @side_main_bp.route('/access') @login_required async def access_list(user): - access_data = await current_app.edgedb.get_all_access_of_user(user_id=user['sub']) + access_data = await current_app.convex.get_all_access(user_id=user['sub']) return await render_template("views/webpage/access/list.htm", access_logs=access_data) @side_main_bp.route('/files') @login_required async def files_list(user): - files_data = await current_app.edgedb.get_files(current_datetime=datetime.now(timezone.utc), user_id=user['sub']) + files_data = await current_app.convex.get_files(user_id=user['sub']) return await render_template("views/webpage/files/list.htm", files=files_data) @side_main_bp.route('/files//info') @login_required async def file_info(file_id, user): - files_data = await current_app.edgedb.get_files(user_id=user['sub']) + files_data = await current_app.convex.get_files(user_id=user['sub']) return await render_template("views/webpage/files/info.htm", files=files_data) @side_main_bp.route('/files//edit') @login_required async def file_edit(file_id, user): - files_data = await current_app.edgedb.get_files(user_id=user['sub']) + files_data = await current_app.convex.get_files(user_id=user['sub']) return await render_template("views/webpage/files/edit.htm", files=files_data) @side_main_bp.route("/-") @LIMITER.limit("10 per minute;500 per hour;") async def serve_file(file_id: str): - file_data = await current_app.edgedb.get_file(file_id=file_id) + file_data = await current_app.convex.get_file(file_id=file_id) disable_logging = False if not file_data: @@ -53,9 +51,9 @@ async def serve_file(file_id: str): if user and user['sub'] == file_data['user_id']: disable_logging = True - if is_expired(file_data.get("expires_at")): + if file_data.get("expired", None): if not disable_logging: - await current_app.edgedb.add_file_access(file_id=file_id, ip_address=get_ip(), user_agent=request.user_agent, status="expired", accessed_at=datetime.now(timezone.utc)) + await current_app.convex.add_file_access(file_id=file_id, ip_address=get_ip(), user_agent=request.user_agent, status="expired") return Response("This file has expired.", status=410, headers={ "Cache-Control": "no-store", "X-Content-Type-Options": "nosniff", @@ -66,21 +64,20 @@ async def serve_file(file_id: str): force_download = request.args.get("download") in {"1", "true", "yes"} - path = current_app.upload_folder / file_name - if not path.exists() or not path.is_file(): + if not file_data.get('db_image_url', None): if not disable_logging: - await current_app.edgedb.add_file_access(file_id=file_id, ip_address=get_ip(), user_agent=request.user_agent, status="error", accessed_at=datetime.now(timezone.utc)) + await current_app.convex.add_file_access(file_id=file_id, ip_address=get_ip(), user_agent=request.user_agent, status="error") abort(404) if not disable_logging: - await current_app.edgedb.add_file_access(file_id=file_id, ip_address=get_ip(), user_agent=request.user_agent, status="ok", accessed_at=datetime.now(timezone.utc)) - return await send_from_directory( - directory=current_app.upload_folder, - file_name=file_name, + await current_app.convex.add_file_access(file_id=file_id, ip_address=get_ip(), user_agent=request.user_agent, status="ok") + + return await send_file( + filename_or_io=await current_app.convex.get_from_storage(file_data.get('db_image_url')), mimetype=content_type, as_attachment=force_download, attachment_filename=file_name, conditional=True, cache_timeout=60, - last_modified=path.stat().st_mtime + last_modified=int(file_data['uploaded_at']) / 1000 ) diff --git a/routes/side/upload.py b/routes/side/upload.py index fab2e36..814607f 100644 --- a/routes/side/upload.py +++ b/routes/side/upload.py @@ -68,12 +68,12 @@ async def read_all(uploaded) -> bytes: return await data return data -def ensure_utc(dt): +def ensure_utc(dt:datetime): """Ensure a timezone-aware UTC datetime or None.""" if dt is None: - return None + return None if dt.tzinfo is None: - return dt.replace(tzinfo=timezone.utc) + return dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) # --- Routes ------------------------------------------------------ @@ -121,24 +121,20 @@ async def api_upload(user): fname = iso_stamp_filename("pasted", ext) fname = safe_name(fname) - path = current_app.upload_folder / fname - data = await read_all(uploaded) - # write to disk - async with aiofiles.open(path, "wb") as f: - await f.write(data) + storage_id = await current_app.convex.send_to_storage(data=data, content_type=content_type) size_bytes = len(data) file_size_pretty = format_size(size_bytes) - await current_app.edgedb.add_file( + await current_app.convex.add_file( file_name=fname, file_size=file_size_pretty, note=note, content_type=content_type, - uploaded_at=datetime.now(timezone.utc), expires_at=expires_at_dt, + storage_id=storage_id, user_id=user['sub'], ) @@ -151,66 +147,19 @@ async def api_upload(user): async with aiofiles.open(path, "wb") as f: await f.write(data) + storage_id = await current_app.convex.send_to_storage(data=data, content_type="text/plain") + size_bytes = len(data) file_size_pretty = format_size(size_bytes) - await current_app.edgedb.add_file( + await current_app.convex.add_file( file_name=fname, file_size=file_size_pretty, note=note, content_type="text/plain", - uploaded_at=datetime.now(timezone.utc), expires_at=expires_at_dt, + storage_id=storage_id, user_id=user['sub'], ) return jsonify({"ok": True}) - -# --- Background cleanup ------------------------------------------------------ - -async def cleanup_task(): - """Hourly cleanup of expired files based on EdgeDB.""" - await asyncio.sleep(3) # allow app startup - while True: - try: - now = datetime.now(timezone.utc) - - expired = await current_app.edgedb.get_expired_files(now) - if not expired: - await asyncio.sleep(3600) - continue - - upload_dir: Path = current_app.upload_folder # ensure Path - removed_ids: list[str] = [] - - for rec in expired: - try: - # Defensive: only touch files under your upload dir - fpath = (upload_dir / rec['file_name']).resolve() - if upload_dir.resolve() in fpath.parents or fpath == upload_dir.resolve(): - fpath.unlink(missing_ok=True) - removed_ids.append(rec['file_id']) - else: - current_app.logger.warning("Refusing to delete outside upload dir: %s", fpath) - except Exception as e: - current_app.logger.exception("Failed to delete file %s (%s)", rec['file_name'], rec['file_id']) - - # Remove DB rows for files we actually deleted from disk - if removed_ids: - try: - await current_app.edgedb.delete_files_by_ids(removed_ids) - current_app.logger.info("Deleted %d expired files from disk and database: %s", len(removed_ids), ", ".join(removed_ids)) - except Exception: - current_app.logger.exception("Failed to delete DB rows for expired files") - else: - current_app.logger.info("No files where expired or deleted at %s", now.isoformat()) - - except Exception: - current_app.logger.exception("Cleanup task iteration failed") - - await asyncio.sleep(3600) # every hour - - -@upload_bp.before_app_serving -async def start_cleanup(): - asyncio.create_task(cleanup_task()) diff --git a/run.py b/run.py index c480fc4..f401afd 100755 --- a/run.py +++ b/run.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3 +#!/usr/bin/env -S uv run --script import quart_flask_patch import asyncio asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy()) @@ -21,4 +21,4 @@ app.register_blueprint(side_main_bp) app.register_blueprint(upload_bp) if __name__ == '__main__': - app.run(debug=WEB_DEBUG, port=5500) + app.run(debug=WEB_DEBUG, port=5502) diff --git a/templates/side/views/webpage/files/list.htm b/templates/side/views/webpage/files/list.htm index 87e2593..171893a 100644 --- a/templates/side/views/webpage/files/list.htm +++ b/templates/side/views/webpage/files/list.htm @@ -75,7 +75,7 @@ const datetime = timeEl.getAttribute("datetime"); if (!datetime) return; - const date = new Date(datetime); + const date = new Date(Number.parseInt(datetime)); timeEl.title = date.toISOString(); timeEl.textContent = date.toLocaleString(undefined, {