add code to convert datetime into localtime into file list and add access quarys

This commit is contained in:
2025-10-24 10:50:19 +02:00
parent d04cd5f831
commit 2eda108577
6 changed files with 204 additions and 67 deletions
+24 -6
View File
@@ -1,6 +1,18 @@
module default {
scalar type access_status extending enum<ok, denied, expired, error>;
type IPAddr {
required value: str {
constraint exclusive;
}
}
type UserAgent {
required value: str {
constraint exclusive;
}
}
type files {
required file_id: str;
required file_name: str;
@@ -15,7 +27,9 @@ module default {
readonly := true;
}
multi accesses -> file_access;
multi accesses -> file_access {
on source delete delete target if orphan;
};
required property user_id: str {
readonly := true;
};
@@ -24,14 +38,18 @@ module default {
}
type file_access {
required ip: str;
required link ip -> IPAddr {
readonly := true;
on source delete delete target if orphan;
}
required link user_agent -> UserAgent {
readonly := true;
on source delete delete target if orphan;
}
required status: access_status {
default := access_status.ok;
}
user_agent: str {
readonly := true;
};
required at: datetime {
readonly := true;
default := datetime_of_statement();
+106 -19
View File
@@ -1,9 +1,6 @@
from my_modules.app.logger import logger
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo
from collections import Counter
import asyncio, gel, json
import asyncio, gel
class EdgeDB:
def __init__(self, database:str=None, tls_security:str='insecure', timeout:int=1, max_retrys:int=10):
@@ -78,7 +75,7 @@ class EdgeDB:
}
return None
async def get_files(self, current_datetime, user_id: str):
async def get_files(self, current_datetime, user_id:str):
data = await self.run_query_with_reconnection(
self.client.query,
"""
@@ -100,16 +97,14 @@ class EdgeDB:
now=current_datetime,
user_id=user_id,
)
return [
{
return [{
"file_id": i.file_id,
"file_name": i.file_name,
"file_size": i.file_size,
"note": i.note,
"uploaded_at": i.uploaded_at,
"expires_at": i.expires_at,
} for i in data
]
"expires_at": i.expires_at if i.expires_at else '',
} for i in data]
async def add_file(self, file_id, file_name, file_size, note, content_type, uploaded_at, expires_at, user_id:str):
return await self.run_query_with_reconnection(
@@ -180,13 +175,11 @@ class EdgeDB:
""",
now=current_datetime
)
return [
{
return [{
"file_id": item.file_id,
"file_name": item.file_name,
"expires_at": item.expires_at
} for item in data
]
} for item in data]
async def delete_files_by_ids(self, remove_file_ids:list[str]):
if not remove_file_ids:
@@ -205,11 +198,105 @@ class EdgeDB:
pass
# File Access Quary Functions
async def add_file_access(self):
pass
async def add_file_access(self, file_id: str, ip_address: str, status: str, user_agent: str, accessed_at):
return await self.run_query_with_reconnection(
self.client.query,
"""
WITH
used_file := (
SELECT files
FILTER .file_id = <str>$file_id
LIMIT 1
),
ip_obj := (
INSERT IPAddr { value := <str>$ip_address }
UNLESS CONFLICT ON .value
ELSE (
SELECT IPAddr
FILTER .value = <str>$ip_address
)
),
ua_obj := (
INSERT UserAgent { value := <str>$user_agent }
UNLESS CONFLICT ON .value
ELSE (
SELECT UserAgent
FILTER .value = <str>$user_agent
)
),
new_file_access := (
INSERT file_access {
at := <datetime>$accessed_at,
status := <access_status>$status,
ip := ip_obj,
user_agent := ua_obj
}
),
_updated_file := (
UPDATE used_file
SET { accesses += (SELECT new_file_access) }
)
SELECT new_file_access {
at,
status,
ip: { value },
user_agent: { value },
};
""",
file_id=file_id,
accessed_at=accessed_at,
ip_address=ip_address,
status=status,
user_agent=str(user_agent),
)
async def get_all_file_access(self):
pass
data = await self.run_query_with_reconnection(
self.client.query,
"""
select file_access {
status,
ip: {
value
},
user_agent: {
value
},
at
}
"""
)
return [{
"status": str(file.status),
"ip": file.ip.value,
"user_agent": file.user_agent.value,
"accessed_at": file.at,
} for file in data]
async def get_file_access(self, file_id:str):
pass
async def get_file_access(self, file_id: str):
data = await self.run_query_with_reconnection(
self.client.query_single,
"""
SELECT files {
accesses: {
status,
ip: { value },
user_agent: { value },
at
}
}
FILTER .file_id = <str>$file_id
LIMIT 1
""",
file_id=file_id,
)
if data:
return [{
"status": str(access.status),
"ip": access.ip.value if access.ip else None,
"user_agent": access.user_agent.value if access.user_agent else None,
"accessed_at": access.at,
} for access in data.accesses]
return None
+10 -1
View File
@@ -1,7 +1,7 @@
from my_modules.app.constens import SECRET_KEY
import hmac, hashlib, base64, secrets, time
from urllib.parse import quote, unquote
from datetime import datetime, timezone
def base64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).decode().rstrip("=")
@@ -27,6 +27,15 @@ def verify_signed_url(file_id: str, token: str, file_expiration: int) -> bool:
not_expired = file_expiration >= time.time()
return valid_sig and not_expired
def is_expired(expires_at):
if not expires_at:
return False
if expires_at.tzinfo is None:
expires_at = expires_at.replace(tzinfo=timezone.utc)
else:
expires_at = expires_at.astimezone(timezone.utc)
return expires_at <= datetime.now(timezone.utc)
if __name__ == "__main__":
file_id = generate_short_id()
url = generate_signed_url(file_id)
+7 -12
View File
@@ -1,5 +1,6 @@
from my_modules.file_helper_functions import verify_signed_url
from my_modules.file_helper_functions import is_expired, verify_signed_url
from my_modules.decoratory.header import login_required
from my_modules.functions import get_ip
from my_modules.app.setup import LIMITER
from my_modules.app.logger import logger
@@ -22,29 +23,20 @@ async def files(user):
files_data = await current_app.edgedb.get_files(current_datetime=datetime.now(timezone.utc), user_id=user['sub'])
return await render_template("views/webpage/files_list.htm", files=files_data)
@side_main_bp.route('/files/<file_id>/info')
@side_main_bp.route('/files/<path:file_id>/info')
@LIMITER.limit("10 per minute")
@login_required
async def file_info(file_id, user):
files_data = await current_app.edgedb.get_files(user_id=user['sub'])
return await render_template("views/webpage/.htm", files=files_data)
@side_main_bp.route('/files/<file_id>/edit')
@side_main_bp.route('/files/<path:file_id>/edit')
@LIMITER.limit("10 per minute")
@login_required
async def file_edit(file_id, user):
files_data = await current_app.edgedb.get_files(user_id=user['sub'])
return await render_template("views/webpage/.htm", files=files_data)
def is_expired(expires_at):
if not expires_at:
return False
if expires_at.tzinfo is None:
expires_at = expires_at.replace(tzinfo=timezone.utc)
else:
expires_at = expires_at.astimezone(timezone.utc)
return expires_at <= datetime.now(timezone.utc)
@side_main_bp.route("/-<file_id>")
@LIMITER.limit("10 per minute")
async def serve_file(file_id: str):
@@ -53,6 +45,7 @@ async def serve_file(file_id: str):
abort(404)
if is_expired(file_data.get("expires_at")):
await current_app.edgedb.add_file_access(file_id=file_id, ip_address=get_ip(), user_agent=request.user_agent, status="expired", accessed_at=datetime.now(timezone.utc))
return Response("This file has expired.", status=410, headers={
"Cache-Control": "no-store",
"X-Content-Type-Options": "nosniff",
@@ -65,8 +58,10 @@ async def serve_file(file_id: str):
path = current_app.upload_folder / file_name
if not path.exists() or not path.is_file():
await current_app.edgedb.add_file_access(file_id=file_id, ip_address=get_ip(), user_agent=request.user_agent, status="error", accessed_at=datetime.now(timezone.utc))
abort(404)
await current_app.edgedb.add_file_access(file_id=file_id, ip_address=get_ip(), user_agent=request.user_agent, status="ok", accessed_at=datetime.now(timezone.utc))
return await send_from_directory(
directory=current_app.upload_folder,
file_name=file_name,
Executable
+23
View File
@@ -0,0 +1,23 @@
#!/usr/bin/env python3
import quart_flask_patch
import asyncio
asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
from my_modules.app.constens import WEB_DEBUG
import my_modules.middleware
from my_modules.app.setup import app
from routes import (
basic_bp, auth_login_bp,
side_main_bp,
upload_bp
)
# Views for Requests adding the uris
app.register_blueprint(basic_bp)
app.register_blueprint(auth_login_bp, url_prefix='/auth')
app.register_blueprint(side_main_bp)
app.register_blueprint(upload_bp)
if __name__ == '__main__':
app.run(debug=WEB_DEBUG, port=5500)
+25 -20
View File
@@ -227,13 +227,17 @@
</td>
<td>{{ file.note }}</td>
<td><span class="badge">{{ file.file_size }}</span></td>
<td><time datetime="{{ file.uploaded_at }}">{{ file.uploaded_at }}</time></td>
<td><time datetime="{{ file.expires_at }}">{{ file.expires_at }}</time></td>
<td><time datetime="{{ file.uploaded_at }}" class="local-time"></time></td>
<td><time datetime="{{ file.expires_at }}" class="local-time"></time></td>
<td class="cell--right">
<div class="actions">
<button class="icon-btn" title="Info" data-action="info" aria-label="Info for {{ file.file_name }}"><span class="sr-only">Info</span></button>
<button class="icon-btn" data-variant="primary" title="Edit" data-action="edit" aria-label="Edit {{ file.file_name }}"> <span class="sr-only">Edit</span></button>
<button class="icon-btn" data-variant="ghost" title="Copy link" data-action="copy" aria-label="Copy link to {{ file.file_name }}">📋 <span class="sr-only">Copy</span></button>
<button class="icon-btn" title="Info">
<a href="{{ url_for('side_main.file_info', file_id=file.file_id) }}"> <span class="sr-only">Info</span></a>
</button>
<button class="icon-btn" title="Edit">
<a href="{{ url_for('side_main.file_edit', file_id=file.file_id) }}">✏️ <span class="sr-only">Edit</span></a>
</button>
<button class="icon-btn" title="Copy link" data-action="copy">📋 <span class="sr-only">Copy</span></button>
</div>
</td>
</tr>
@@ -245,7 +249,6 @@
</main>
<script>
// Minimal progressive enhancement for "Copy" action
document.addEventListener('click', (e) => {
const btn = e.target.closest('button[data-action="copy"]');
if(!btn) return;
@@ -261,20 +264,22 @@
});
});
// Optional: placeholder handlers for info/edit hooks
document.addEventListener('click', (e) => {
const info = e.target.closest('button[data-action="info"]');
const edit = e.target.closest('button[data-action="edit"]');
if(info){
const name = info.closest('tr')?.querySelector('.cell--name a')?.textContent?.trim();
// Hook into your modal/toast here
console.log('Info for:', name);
}
if(edit){
const id = edit.closest('tr')?.querySelector('.cell--name a')?.getAttribute('href');
// Navigate or open editor route
if(id) window.location.href = id + '/edit';
}
document.querySelectorAll("time.local-time").forEach(timeEl => {
const datetime = timeEl.getAttribute("datetime");
if (!datetime) return;
const date = new Date(datetime);
timeEl.title = date.toISOString();
timeEl.textContent = date.toLocaleString(undefined, {
year: "numeric",
month: "short",
day: "numeric",
hour: "2-digit",
minute: "2-digit",
second: undefined,
hour12: false,
});
});
</script>
{% endblock %}