Files
simple-nanoshare/my_modules/app/setup.py
T

81 lines
2.2 KiB
Python

from my_modules.functions import custom_limit_key
from my_modules.app.constens import SECRET_KEY, UPLOAD_DIR
from my_modules.AsyncCache import AsyncCache
from my_modules.app.logger import logger
from my_modules.EdgeDB import EdgeDB
from quart_session import Session
from flask_limiter import Limiter
import redis.asyncio as aioredis
from quart import Quart
import os
# Application object. Template and static folders are given as relative
# paths pointing into the project's ``templates`` directory.
app = Quart(
    __name__,
    template_folder="../../templates/side",
    static_folder="../../templates/files",
)
# Upper bound for request bodies: 1024 * 1024 * 1024 bytes.
app.config['MAX_CONTENT_LENGTH'] = 1024 ** 3
app.secret_key = SECRET_KEY
# Custom attribute carrying the upload directory from the constants module.
app.upload_folder = UPLOAD_DIR
# Cache, Sessions and Limiter over Valkey
# The cache uses the 'redis' backend when VALKEY_HOST is set. An empty
# VALKEY_HOST counts as unset, which matches the truthiness test applied to
# VALKEY_HOST when the rate limiter's storage URI is built.
if os.getenv("VALKEY_HOST"):
    cache = AsyncCache(
        backend='redis',
        default_ttl=300,
        username=os.getenv('VALKEY_CACHE_USER', ''),
        password=os.getenv('VALKEY_CACHE_PASSWORD', ''),
        host=os.getenv('VALKEY_HOST'),
        # os.getenv returns a str when the variable is set; normalise so that
        # port and db are always ints (an empty value falls back to the
        # default instead of being passed through as '').
        port=int(os.getenv('VALKEY_PORT') or 6379),
        db=int(os.getenv('VALKEY_DB') or 0),
    )
else:
    # No Valkey host configured: use the 'memory' backend.
    cache = AsyncCache(
        backend='memory',
    )
# Session storage: the 'redis' session type with an asyncio Redis client when
# VALKEY_HOST is set, otherwise the 'memcached' session type. An empty
# VALKEY_HOST counts as unset, which matches the truthiness test applied to
# VALKEY_HOST when the rate limiter's storage URI is built.
if os.getenv("VALKEY_HOST"):
    app.config.from_mapping(
        SESSION_TYPE="redis",
        SESSION_PERMANENT=True,
        SESSION_USE_SIGNER=True,
        SESSION_REDIS=aioredis.Redis(
            username=os.getenv('VALKEY_SESSION_USER', None),
            password=os.getenv('VALKEY_SESSION_PASSWORD', None),
            host=os.getenv("VALKEY_HOST"),
            # os.getenv returns a str when the variable is set; normalise so
            # that port and db are always ints (an empty value falls back to
            # the default).
            port=int(os.getenv("VALKEY_PORT") or 6379),
            db=int(os.getenv("VALKEY_DB") or 0),
            decode_responses=True,
        ),
    )
else:
    app.config.from_mapping(
        SESSION_TYPE='memcached',
    )
# Register the session extension on the app using the config set above.
Session(app)
# Rate limiter keyed by custom_limit_key. When VALKEY_HOST is set (non-empty)
# a redis:// storage URI is built from the VALKEY_* environment variables;
# otherwise storage_uri is None.
# The expressions inside the f-strings use single quotes only: reusing the
# enclosing quote character inside an f-string is a SyntaxError before
# Python 3.12 (PEP 701).
# NOTE(review): user and password are inserted into the URI verbatim, so
# characters such as '@', ':' or '/' in them would presumably need to be
# percent-encoded in the environment - confirm against the storage backend.
LIMITER = Limiter(
    custom_limit_key,
    app=app,
    storage_uri=(
        f"redis://{os.getenv('VALKEY_LIMITER_USER', '')}:{os.getenv('VALKEY_LIMITER_PASSWORD', '')}"
        f"@{os.getenv('VALKEY_HOST')}:{os.getenv('VALKEY_PORT', 6379)}/{os.getenv('VALKEY_DB', 0)}"
    ) if os.getenv("VALKEY_HOST") else None,
    default_limits=[],
    strategy='moving-window'
)
@app.before_serving
async def init_edgedb():
    """Create the EdgeDB client, store it on ``app.edgedb`` and connect it.

    The database name is read from the EDGEDB_DATABASE environment variable.
    """
    # NOTE(review): 'insecure' is passed when the app is NOT in debug mode and
    # None when it is - confirm this is intended and not inverted.
    if app.debug:
        tls_mode = None
    else:
        tls_mode = 'insecure'

    client = EdgeDB(
        database=os.getenv("EDGEDB_DATABASE"),
        tls_security=tls_mode,
    )
    # Attach before connecting so the attribute exists even if connect() fails.
    app.edgedb = client
    await client.connect()
@app.after_serving
async def close_edgedb():
    """Close the EdgeDB client, if one exists, then shut down the logger."""
    # As far as this file shows, app.edgedb is only assigned by the
    # before_serving hook; getattr avoids an AttributeError here if that
    # assignment never happened.
    edgedb_client = getattr(app, "edgedb", None)
    try:
        if edgedb_client:
            await edgedb_client.close()
    finally:
        # Shut the logger down even when closing the database client raises.
        await logger.shutdown()