move EdgeDB into db sub folder
This commit is contained in:
@@ -0,0 +1,345 @@
|
||||
from my_modules.file_helper_functions import generate_short_id
|
||||
from my_modules.app.logger import logger
|
||||
|
||||
import asyncio, gel
|
||||
|
||||
class EdgeDB:
|
||||
def __init__(self, database:str=None, tls_security:str='insecure', timeout:int=1, max_retrys:int=10):
|
||||
self.database = database
|
||||
self.tls_security = tls_security
|
||||
|
||||
self.timeout = timeout
|
||||
self.max_retrys = max_retrys
|
||||
|
||||
self.client = None
|
||||
|
||||
# Connect Function
|
||||
async def __aenter__(self):
|
||||
await self.connect()
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc, tb):
|
||||
await self.close()
|
||||
|
||||
async def connect(self):
|
||||
self.client = gel.create_async_client(
|
||||
tls_security=self.tls_security,
|
||||
database=self.database,
|
||||
timeout=self.timeout
|
||||
)
|
||||
|
||||
async def close(self):
|
||||
if self.client:
|
||||
await self.client.aclose()
|
||||
|
||||
# Query Helper Function
|
||||
async def run_query_with_reconnection(self, function, *args:tuple, **kwargs):
|
||||
retry_count = 0
|
||||
while True:
|
||||
try:
|
||||
query = args[0].replace('\n ', '').replace(' ', '')
|
||||
if args[1:]:
|
||||
await logger.debug(f'{function.__name__} |{query}| {args[1:]}{kwargs}')
|
||||
else:
|
||||
await logger.debug(f'{function.__name__} |{query}| {kwargs}')
|
||||
|
||||
return await function(*args, **kwargs)
|
||||
except gel.errors.ClientConnectionFailedError:
|
||||
await self.connect()
|
||||
await logger.error(f'Retry to Query Database: {retry_count}, Function: {args}')
|
||||
if retry_count > self.max_retrys:
|
||||
break
|
||||
await asyncio.sleep(0.5)
|
||||
retry_count += 1
|
||||
|
||||
# File Quary Functions
|
||||
async def get_file(self, file_id:str):
|
||||
data = await self.run_query_with_reconnection(
|
||||
self.client.query_single,
|
||||
"""
|
||||
select files {
|
||||
file_name,
|
||||
content_type,
|
||||
expires_at,
|
||||
user_id
|
||||
}
|
||||
filter .file_id = <str>$file_id
|
||||
limit 1
|
||||
""",
|
||||
file_id=file_id
|
||||
)
|
||||
|
||||
if data:
|
||||
return {
|
||||
"file_name": data.file_name,
|
||||
"content_type": data.content_type,
|
||||
"expires_at": data.expires_at,
|
||||
"user_id": data.user_id
|
||||
}
|
||||
return None
|
||||
|
||||
async def get_files(self, current_datetime, user_id:str):
|
||||
data = await self.run_query_with_reconnection(
|
||||
self.client.query,
|
||||
"""
|
||||
select files {
|
||||
file_id,
|
||||
file_name,
|
||||
file_size,
|
||||
note,
|
||||
uploaded_at,
|
||||
expires_at
|
||||
}
|
||||
filter
|
||||
.user_id = <str>$user_id
|
||||
and (
|
||||
(.expires_at ?? <datetime>'9999-12-31T00:00:00Z') > <datetime>$now
|
||||
)
|
||||
order by .uploaded_at desc
|
||||
""",
|
||||
now=current_datetime,
|
||||
user_id=user_id,
|
||||
)
|
||||
return [{
|
||||
"file_id": i.file_id,
|
||||
"file_name": i.file_name,
|
||||
"file_size": i.file_size,
|
||||
"note": i.note,
|
||||
"uploaded_at": i.uploaded_at,
|
||||
"expires_at": i.expires_at if i.expires_at else '',
|
||||
} for i in data]
|
||||
|
||||
async def add_file(self, file_name:str, file_size:str, note:str, content_type:str, uploaded_at, expires_at, user_id:str):
|
||||
for attempt in range(10):
|
||||
try:
|
||||
return await self.run_query_with_reconnection(
|
||||
self.client.query_single,
|
||||
"""
|
||||
insert files {
|
||||
file_id := <str>$file_id,
|
||||
file_name := <str>$file_name,
|
||||
file_size := <str>$file_size,
|
||||
note := <str>$note,
|
||||
content_type := <str>$content_type,
|
||||
uploaded_at := <datetime>$uploaded_at,
|
||||
expires_at := <optional datetime>$expires_at,
|
||||
user_id := <str>$user_id
|
||||
};
|
||||
""",
|
||||
file_id=generate_short_id(),
|
||||
file_name=file_name,
|
||||
file_size=file_size,
|
||||
note=note,
|
||||
content_type=content_type,
|
||||
uploaded_at=uploaded_at,
|
||||
expires_at=expires_at,
|
||||
user_id=user_id,
|
||||
)
|
||||
except gel.errors.ConstraintViolationError as e:
|
||||
await logger.warning(f'file_id collision on attempt {attempt+1}, regenerating…')
|
||||
continue
|
||||
raise RuntimeError("Could not allocate unique file_id after multiple retries")
|
||||
|
||||
async def update_file(self, file_id:str, file_name:str, note:str, expires_at, user_id:str):
|
||||
return await self.run_query_with_reconnection(
|
||||
self.client.query,
|
||||
"""
|
||||
update files
|
||||
filter .file_id = <str>$file_id and .user_id = <str>$user_id
|
||||
set {
|
||||
file_name := <str>$file_name,
|
||||
note := <str>$note,
|
||||
expires_at := <datetime>$expires_at
|
||||
};
|
||||
""",
|
||||
file_id=file_id,
|
||||
file_name=file_name,
|
||||
note=note,
|
||||
expires_at=expires_at,
|
||||
user_id=user_id,
|
||||
)
|
||||
|
||||
async def delete_file(self, file_id:str, user_id:str):
|
||||
await self.run_query_with_reconnection(
|
||||
self.client.query,
|
||||
"""
|
||||
delete files
|
||||
filter .file_id = <str>$file_id AND .user_id = <str>$user_id
|
||||
""",
|
||||
file_id=file_id,
|
||||
user_id=user_id
|
||||
)
|
||||
|
||||
async def get_expired_files(self:str, current_datetime):
|
||||
data = await self.run_query_with_reconnection(
|
||||
self.client.query,
|
||||
"""
|
||||
select files {
|
||||
file_id,
|
||||
file_name,
|
||||
expires_at
|
||||
}
|
||||
filter .expires_at < <datetime>$now
|
||||
order by .expires_at asc;
|
||||
""",
|
||||
now=current_datetime
|
||||
)
|
||||
return [{
|
||||
"file_id": item.file_id,
|
||||
"file_name": item.file_name,
|
||||
"expires_at": item.expires_at
|
||||
} for item in data]
|
||||
|
||||
async def delete_files_by_ids(self, remove_file_ids:list[str]):
|
||||
if not remove_file_ids:
|
||||
return
|
||||
|
||||
await self.run_query_with_reconnection(
|
||||
self.client.query,
|
||||
"""
|
||||
delete files
|
||||
filter .file_id in array_unpack(<array<str>>$ids);
|
||||
""",
|
||||
ids=remove_file_ids
|
||||
)
|
||||
|
||||
async def get_file_informations(self, file_id:str):
|
||||
pass
|
||||
|
||||
# File Access Quary Functions
|
||||
async def add_file_access(self, file_id: str, ip_address: str, status: str, user_agent: str, accessed_at):
|
||||
return await self.run_query_with_reconnection(
|
||||
self.client.query,
|
||||
"""
|
||||
WITH
|
||||
used_file := (
|
||||
SELECT files
|
||||
FILTER .file_id = <str>$file_id
|
||||
LIMIT 1
|
||||
),
|
||||
ip_obj := (
|
||||
INSERT IPAddr { value := <str>$ip_address }
|
||||
UNLESS CONFLICT ON .value
|
||||
ELSE (
|
||||
SELECT IPAddr
|
||||
FILTER .value = <str>$ip_address
|
||||
)
|
||||
),
|
||||
ua_obj := (
|
||||
INSERT UserAgent { value := <str>$user_agent }
|
||||
UNLESS CONFLICT ON .value
|
||||
ELSE (
|
||||
SELECT UserAgent
|
||||
FILTER .value = <str>$user_agent
|
||||
)
|
||||
),
|
||||
new_file_access := (
|
||||
INSERT file_access {
|
||||
at := <datetime>$accessed_at,
|
||||
status := <access_status>$status,
|
||||
ip := ip_obj,
|
||||
user_agent := ua_obj
|
||||
}
|
||||
),
|
||||
_updated_file := (
|
||||
UPDATE used_file
|
||||
SET { accesses += (SELECT new_file_access) }
|
||||
)
|
||||
|
||||
SELECT new_file_access {
|
||||
at,
|
||||
status,
|
||||
ip: { value },
|
||||
user_agent: { value },
|
||||
};
|
||||
""",
|
||||
file_id=file_id,
|
||||
accessed_at=accessed_at,
|
||||
ip_address=ip_address,
|
||||
status=status,
|
||||
user_agent=str(user_agent),
|
||||
)
|
||||
|
||||
async def get_all_file_access(self):
|
||||
data = await self.run_query_with_reconnection(
|
||||
self.client.query,
|
||||
"""
|
||||
select file_access {
|
||||
status,
|
||||
ip: {
|
||||
value
|
||||
},
|
||||
user_agent: {
|
||||
value
|
||||
},
|
||||
at
|
||||
} order by .at desc
|
||||
"""
|
||||
)
|
||||
return [{
|
||||
"status": str(file.status),
|
||||
"ip": file.ip.value,
|
||||
"user_agent": file.user_agent.value,
|
||||
"accessed_at": file.at,
|
||||
} for file in data]
|
||||
|
||||
async def get_all_access_of_user(self, user_id:str):
|
||||
data = await self.run_query_with_reconnection(
|
||||
self.client.query,
|
||||
"""
|
||||
select files {
|
||||
file_id,
|
||||
file_name,
|
||||
note,
|
||||
accesses: {
|
||||
at,
|
||||
status,
|
||||
ip: {
|
||||
value
|
||||
},
|
||||
user_agent: {
|
||||
value
|
||||
}
|
||||
}
|
||||
order by .at desc
|
||||
}
|
||||
filter .user_id = <str>$user_id
|
||||
""",
|
||||
user_id=user_id
|
||||
)
|
||||
return sorted([{
|
||||
"file_id": file.file_id,
|
||||
"file_name": file.file_name,
|
||||
"file_note": file.note,
|
||||
"status": access.status,
|
||||
"ip": access.ip.value,
|
||||
"user_agent": access.user_agent.value,
|
||||
"accessed_at": access.at,
|
||||
} for file in data for access in file.accesses], key=lambda x: x["accessed_at"], reverse=True)
|
||||
|
||||
async def get_file_access(self, file_id: str):
|
||||
data = await self.run_query_with_reconnection(
|
||||
self.client.query_single,
|
||||
"""
|
||||
SELECT files {
|
||||
accesses: {
|
||||
status,
|
||||
ip: { value },
|
||||
user_agent: { value },
|
||||
at
|
||||
}
|
||||
}
|
||||
FILTER .file_id = <str>$file_id
|
||||
LIMIT 1
|
||||
""",
|
||||
file_id=file_id,
|
||||
)
|
||||
|
||||
if data:
|
||||
return [{
|
||||
"status": str(access.status),
|
||||
"ip": access.ip.value if access.ip else None,
|
||||
"user_agent": access.user_agent.value if access.user_agent else None,
|
||||
"accessed_at": access.at,
|
||||
} for access in data.accesses]
|
||||
return None
|
||||
Reference in New Issue
Block a user