import asyncio
from typing import Any, Optional

import gel

from my_modules.app.logger import logger


class EdgeDB:
    """Async helper around a Gel (EdgeDB) client for ``files`` / ``file_access`` data.

    Usable as an async context manager: entering calls :meth:`connect`,
    leaving calls :meth:`close`.

    NOTE(review): apart from ``delete_files_by_ids``, the query parameters
    below carry no explicit type cast (e.g. ``<str>$file_id``). EdgeQL
    normally requires one on every parameter -- confirm against the schema.
    """

    def __init__(
        self,
        database: Optional[str] = None,
        tls_security: str = 'insecure',
        timeout: int = 1,
        max_retrys: int = 10,
    ):
        """Store connection settings; no client is created until :meth:`connect`.

        Args:
            database: Database name forwarded to ``gel.create_async_client``.
            tls_security: TLS mode forwarded to ``gel.create_async_client``.
            timeout: Timeout forwarded to ``gel.create_async_client``.
            max_retrys: Number of retries after the first failed attempt in
                :meth:`run_query_with_reconnection`.
        """
        self.database = database
        self.tls_security = tls_security
        self.timeout = timeout
        self.max_retrys = max_retrys
        self.client = None

    # Connect Functions
    async def __aenter__(self):
        """Connect and return this instance."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the client when leaving the ``async with`` block."""
        await self.close()

    async def connect(self):
        """Create a new async client from the stored settings.

        Any client previously stored on ``self.client`` is replaced without
        being closed.
        """
        self.client = gel.create_async_client(
            tls_security=self.tls_security,
            database=self.database,
            timeout=self.timeout,
        )

    async def close(self):
        """Close the current client, if one was created."""
        if self.client:
            await self.client.aclose()

    # Query Helper Function
    async def run_query_with_reconnection(self, function, *args: Any, **kwargs: Any):
        """Run a client query method, reconnecting on connection failure.

        Args:
            function: Bound query method of the client (e.g.
                ``self.client.query``).
            *args: Positional arguments for ``function``; the first one is the
                query text.
            **kwargs: Named query parameters for ``function``.

        Returns:
            Whatever ``function`` returns.

        Raises:
            gel.errors.ClientConnectionFailedError: If the query still fails
                to connect after ``max_retrys`` retries.
        """
        # Collapse all whitespace so a multi-line query fits on one log line.
        query = ' '.join(args[0].split()) if args else ''
        positional = args[1:]
        retry_count = 0

        while True:
            try:
                if positional:
                    await logger.debug(f'{function.__name__} |{query}| {positional}{kwargs}')
                else:
                    await logger.debug(f'{function.__name__} |{query}| {kwargs}')
                return await function(*args, **kwargs)
            except gel.errors.ClientConnectionFailedError:
                await logger.error(f'Retry to Query Database: {retry_count}, Function: {args}')
                if retry_count >= self.max_retrys:
                    # Out of retries: surface the failure instead of silently
                    # returning None to the caller.
                    raise

                owner = getattr(function, '__self__', None)
                await self.connect()
                # ``function`` is still bound to the client that just failed;
                # re-bind it so the retry actually uses the new client.
                if owner is not None and isinstance(owner, type(self.client)):
                    function = getattr(self.client, function.__name__)

                await asyncio.sleep(0.5)
                retry_count += 1

    # File Query Functions
    async def get_file(self, file_id: str):
        """Return name, content type and expiry of one file, or ``None``."""
        data = await self.run_query_with_reconnection(
            self.client.query_single,
            """
            select files {
                file_name,
                content_type,
                expires_at
            }
            filter .file_id = $file_id
            limit 1
            """,
            file_id=file_id,
        )
        if data:
            return {
                "file_name": data.file_name,
                "content_type": data.content_type,
                "expires_at": data.expires_at,
            }
        return None

    async def get_files(self, current_datetime, user_id: str):
        """List a user's files that have not expired at ``current_datetime``.

        Results are ordered newest upload first; a missing ``expires_at`` is
        returned as an empty string.
        """
        data = await self.run_query_with_reconnection(
            self.client.query,
            """
            select files {
                file_id,
                file_name,
                file_size,
                note,
                uploaded_at,
                expires_at
            }
            filter .user_id = $user_id
            and (
                (.expires_at ?? '9999-12-31T00:00:00Z') > $now
            )
            order by .uploaded_at desc
            """,
            now=current_datetime,
            user_id=user_id,
        )
        return [
            {
                "file_id": i.file_id,
                "file_name": i.file_name,
                "file_size": i.file_size,
                "note": i.note,
                "uploaded_at": i.uploaded_at,
                "expires_at": i.expires_at if i.expires_at else '',
            }
            for i in data
        ]

    async def add_file(self, file_id, file_name, file_size, note, content_type,
                       uploaded_at, expires_at, user_id: str):
        """Insert a new ``files`` object and return the query result."""
        return await self.run_query_with_reconnection(
            self.client.query,
            """
            insert files {
                file_id := $file_id,
                file_name := $file_name,
                file_size := $file_size,
                note := $note,
                content_type := $content_type,
                uploaded_at := $uploaded_at,
                expires_at := $expires_at,
                user_id := $user_id
            };
            """,
            file_id=file_id,
            file_name=file_name,
            file_size=file_size,
            note=note,
            content_type=content_type,
            uploaded_at=uploaded_at,
            expires_at=expires_at,
            user_id=user_id,
        )

    async def update_file(self, file_id: str, file_name: str, note: str,
                          expires_at, user_id: str):
        """Update name, note and expiry of a file owned by ``user_id``."""
        return await self.run_query_with_reconnection(
            self.client.query,
            """
            update files
            filter .file_id = $file_id and .user_id = $user_id
            set {
                file_name := $file_name,
                note := $note,
                expires_at := $expires_at
            };
            """,
            file_id=file_id,
            file_name=file_name,
            note=note,
            expires_at=expires_at,
            user_id=user_id,
        )

    async def delete_file(self, file_id: str, user_id: str):
        """Delete the file matching both ``file_id`` and ``user_id``."""
        await self.run_query_with_reconnection(
            self.client.query,
            """
            delete files
            filter .file_id = $file_id AND .user_id = $user_id
            """,
            file_id=file_id,
            user_id=user_id,
        )

    async def get_expired_files(self, current_datetime):
        """List files whose ``expires_at`` is before ``current_datetime``, oldest first."""
        data = await self.run_query_with_reconnection(
            self.client.query,
            """
            select files {
                file_id,
                file_name,
                expires_at
            }
            filter .expires_at < $now
            order by .expires_at asc;
            """,
            now=current_datetime,
        )
        return [
            {
                "file_id": item.file_id,
                "file_name": item.file_name,
                "expires_at": item.expires_at,
            }
            for item in data
        ]

    async def delete_files_by_ids(self, remove_file_ids: list[str]):
        """Delete every file whose ``file_id`` is in ``remove_file_ids``.

        Does nothing when the list is empty.
        """
        if not remove_file_ids:
            return
        await self.run_query_with_reconnection(
            self.client.query,
            """
            delete files
            filter .file_id in array_unpack(<array<str>>$ids);
            """,
            ids=remove_file_ids,
        )

    async def get_file_informations(self, file_id: str):
        """Placeholder: not implemented yet, always returns ``None``."""
        pass

    # File Access Query Functions
    async def add_file_access(self, file_id: str, ip_address: str, status: str,
                              user_agent: str, accessed_at):
        """Record one access to a file and link it to that file.

        The query inserts (or reuses, on conflict) the ``IPAddr`` and
        ``UserAgent`` objects, inserts a ``file_access`` object, appends it to
        the file's ``accesses`` and returns the selected new access.
        """
        return await self.run_query_with_reconnection(
            self.client.query,
            """
            WITH
                used_file := (
                    SELECT files
                    FILTER .file_id = $file_id
                    LIMIT 1
                ),
                ip_obj := (
                    INSERT IPAddr { value := $ip_address }
                    UNLESS CONFLICT ON .value
                    ELSE (
                        SELECT IPAddr FILTER .value = $ip_address
                    )
                ),
                ua_obj := (
                    INSERT UserAgent { value := $user_agent }
                    UNLESS CONFLICT ON .value
                    ELSE (
                        SELECT UserAgent FILTER .value = $user_agent
                    )
                ),
                new_file_access := (
                    INSERT file_access {
                        at := $accessed_at,
                        status := $status,
                        ip := ip_obj,
                        user_agent := ua_obj
                    }
                ),
                _updated_file := (
                    UPDATE used_file
                    SET {
                        accesses += (SELECT new_file_access)
                    }
                )
            SELECT new_file_access {
                at,
                status,
                ip: { value },
                user_agent: { value },
            };
            """,
            file_id=file_id,
            accessed_at=accessed_at,
            ip_address=ip_address,
            status=status,
            user_agent=str(user_agent),
        )

    async def get_all_file_access(self):
        """Return every ``file_access`` record as a list of plain dicts."""
        data = await self.run_query_with_reconnection(
            self.client.query,
            """
            select file_access {
                status,
                ip: { value },
                user_agent: { value },
                at
            }
            """,
        )
        # Guard missing links the same way get_file_access does.
        return [
            {
                "status": str(file.status),
                "ip": file.ip.value if file.ip else None,
                "user_agent": file.user_agent.value if file.user_agent else None,
                "accessed_at": file.at,
            }
            for file in data
        ]

    async def get_file_access(self, file_id: str):
        """Return the access records of one file, or ``None`` if no file matches."""
        data = await self.run_query_with_reconnection(
            self.client.query_single,
            """
            SELECT files {
                accesses: {
                    status,
                    ip: { value },
                    user_agent: { value },
                    at
                }
            }
            FILTER .file_id = $file_id
            LIMIT 1
            """,
            file_id=file_id,
        )
        if data:
            return [
                {
                    "status": str(access.status),
                    "ip": access.ip.value if access.ip else None,
                    "user_agent": access.user_agent.value if access.user_agent else None,
                    "accessed_at": access.at,
                }
                for access in data.accesses
            ]
        return None