diff --git a/my_modules/EdgeDB.py b/my_modules/EdgeDB.py new file mode 100644 index 0000000..1954294 --- /dev/null +++ b/my_modules/EdgeDB.py @@ -0,0 +1,202 @@ +from my_modules.app.logger import logger + +from datetime import datetime, timedelta, timezone +from zoneinfo import ZoneInfo +from collections import Counter +import asyncio, gel, json + +class EdgeDB: + def __init__(self, database:str=None, tls_security:str='insecure', timeout:int=1, max_retrys:int=10): + self.database = database + self.tls_security = tls_security + + self.timeout = timeout + self.max_retrys = max_retrys + + self.client = None + + # Connect Function + async def __aenter__(self): + await self.connect() + return self + + async def __aexit__(self, exc_type, exc, tb): + await self.close() + + async def connect(self): + self.client = gel.create_async_client( + tls_security=self.tls_security, + database=self.database, + timeout=self.timeout + ) + + async def close(self): + if self.client: + await self.client.aclose() + + # Query Helper Function + async def run_query_with_reconnection(self, function, *args:tuple, **kwargs): + retry_count = 0 + while True: + try: + query = args[0].replace('\n ', '').replace(' ', '') + if args[1:]: + await logger.debug(f'{function.__name__} |{query}| {args[1:]}{kwargs}') + else: + await logger.debug(f'{function.__name__} |{query}| {kwargs}') + + return await function(*args, **kwargs) + except gel.errors.ClientConnectionFailedError: + await self.connect() + await logger.error(f'Retry to Query Database: {retry_count}, Function: {args}') + if retry_count > self.max_retrys: + break + await asyncio.sleep(0.5) + retry_count += 1 + + # App Quary Functions + async def get_file(self, file_id:str): + data = await self.run_query_with_reconnection( + self.client.query_single, + """ + select files { + file_name, + content_type, + expires_at + } + filter .file_id = $file_id + limit 1 + """, + file_id=file_id + ) + + if data: + return { + "file_name": data.file_name, + "content_type": data.content_type, + "expires_at": data.expires_at + } + return None + + async def get_files(self, current_datetime, user_id: str): + data = await self.run_query_with_reconnection( + self.client.query, + """ + select files { + file_id, + file_name, + file_size, + note, + uploaded_at, + expires_at + } + filter + .user_id = $user_id + and ( + (.expires_at ?? '9999-12-31T00:00:00Z') > $now + ) + order by .uploaded_at desc + """, + now=current_datetime, + user_id=user_id, + ) + return [ + { + "file_id": i.file_id, + "file_name": i.file_name, + "file_size": i.file_size, + "note": i.note, + "uploaded_at": i.uploaded_at, + "expires_at": i.expires_at, + } for i in data + ] + + async def add_file(self, file_id, file_name, file_size, note, content_type, uploaded_at, expires_at, user_id:str): + return await self.run_query_with_reconnection( + self.client.query, + """ + insert files { + file_id := $file_id, + file_name := $file_name, + file_size := $file_size, + note := $note, + content_type := $content_type, + uploaded_at := $uploaded_at, + expires_at := $expires_at, + user_id := $user_id + }; + """, + file_id=file_id, + file_name=file_name, + file_size=file_size, + note=note, + content_type=content_type, + uploaded_at=uploaded_at, + expires_at=expires_at, + user_id=user_id, + ) + + async def update_file(self, file_id:str, file_name:str, note:str, expires_at, user_id:str): + return await self.run_query_with_reconnection( + self.client.query, + """ + update files + filter .file_id = $file_id and .user_id = $user_id + set { + file_name := $file_name, + note := $note, + expires_at := $expires_at + }; + """, + file_id=file_id, + file_name=file_name, + note=note, + expires_at=expires_at, + user_id=user_id, + ) + + async def delete_file(self, file_id:str, user_id:str): + await self.run_query_with_reconnection( + self.client.query, + """ + delete files + filter .file_id = $file_id AND .user_id = $user_id + """, + file_id=file_id, + user_id=user_id + ) + + async def get_expired_files(self:str, current_datetime): + data = await self.run_query_with_reconnection( + self.client.query, + """ + select files { + file_id, + file_name, + expires_at + } + filter .expires_at < $now + order by .expires_at asc; + """, + now=current_datetime + ) + return [ + { + "file_id": item.file_id, + "file_name": item.file_name, + "expires_at": item.expires_at + } for item in data + ] + + async def delete_files_by_ids(self, remove_file_ids:list[str]): + if not remove_file_ids: + return + + await self.run_query_with_reconnection( + self.client.query, + """ + delete files + filter .file_id in array_unpack(>$ids); + """, + ids=remove_file_ids + )