move all Databases into the database folder to not have storeage and Database when they are all Databases

This commit is contained in:
2026-04-06 16:24:28 +02:00
parent c4238d19e8
commit 01343472df
6 changed files with 14 additions and 14 deletions
+156
View File
@@ -0,0 +1,156 @@
from server.GameBoard import GameBoard
from server.dataset.Dataset import Dataset
from datetime import datetime
import json, time
try:
import gel as _gel # type: ignore[import-not-found]
except ImportError: # pragma: no cover
_gel = None
class EdgeDB:
def __init__(self, database:str=None, tls_security:str='insecure', **kwargs):
self.database = database
self.tls_security = tls_security
self._connect()
def _connect(self):
if _gel is None:
raise ImportError("The 'gel' package is required to use EdgeDB storage")
self.client = _gel.create_client(
tls_security=self.tls_security, database=self.database
)
def run_query_with_reconnection(self, function, *args, **kwargs):
while True:
try:
return function(*args, **kwargs)
except Exception as error:
if error.__class__.__name__ != "ClientConnectionFailedError":
raise
self._connect()
time.sleep(0.5)
def insert_game_type(self, name:str, is_ladder:bool):
return self.run_query_with_reconnection(
self.client.query_required_single,
"""
insert GameType {
name := <str>$name,
is_ladder := <bool>$is_ladder
}""",
name=name,
is_ladder=is_ladder
)
def create_moves_with_calculations(self, game_board:GameBoard):
data = []
moves = game_board.turns
snake_calulations = [[calc for calc in ele["data"]] for ele in game_board.snake_class.get_history() ]
labels_by_turn = Dataset(game_board).labels_by_turn()
for i in range(len(moves)):
calculations = snake_calulations[i] if i < len(snake_calulations) else []
calculations.append({
"dataset": {
"is_good_move": labels_by_turn.get(moves[i]["turn"], False)
}
})
data.append({
"turn": moves[i]["turn"],
"move": moves[i]["move"],
"game_board": moves[i]["game_board"],
"calculations": calculations,
})
return data
async def insert(self, game_board:GameBoard):
game_type = game_board.get_type_of_game()
self.run_query_with_reconnection(
self.client.query,
"""
insert GameBoard {
id := <uuid>$id,
created_at := <datetime>$created_at,
turns := <int32>$turns,
map := <str>$map,
winner := <str>$winner,
moves := (
with input_data := <array <tuple <turn: int32, `move`: str, game_board: json, calculations: array<json> >>>$moves
for data in array_unpack(input_data)
insert Moves {
turn := data.turn,
snake_move := data.`move`,
game_board := data.game_board,
calculations := data.calculations
}
),
type := (
insert GameType {
name := <str>$game_type,
is_ladder := <bool>$is_ladder
} unless conflict on (.name, .is_ladder) else GameType
),
ruleset := (
insert Ruleset {
name := <str>$ruleset,
version := <str>$version,
settings := to_json(<str>$settings)
} unless conflict on (.name, .version, .settings) else Ruleset
),
snake := (
insert Snake {
type := <str>$snake_type
} unless conflict on .type else Snake
)
}""",
id=game_board.id,
created_at=datetime.fromtimestamp(game_board.now_date.timestamp(), game_board.now_date.astimezone().tzinfo),
turns=game_board.turn,
map=game_board.map if game_board.map else "standard",
winner=', '.join(game_board.winner_snake_names)
if game_board.winner_snake_names
else "",
moves=[
tuple(
[
x["turn"],
x["move"],
json.dumps(x["game_board"]),
[json.dumps(ele) for ele in x["calculations"]],
]
)
for x in self.create_moves_with_calculations(game_board)
],
game_type=game_type["name"],
is_ladder=game_type["is_ladder"],
ruleset=game_board.ruleset["name"],
version=game_board.ruleset["version"],
settings=json.dumps(game_board.ruleset["settings"]),
snake_type=game_board.snake_class.__class__.__name__,
)
async def save(self, game_board:GameBoard):
await self.insert(game_board)
def __del__(self):
self.client.close()
def cleanup(self):
return self.run_query_with_reconnection(
self.client.query_json,
"""
delete Moves { };
with gameboard := (delete GameBoard filter .turns < <std::int32>"200" or .is_winner_me = <std::bool>"false")
select gameboard {id, url, winner, turns, type: { is_ladder, name } } order by .turns desc;
"""
)
+180
View File
@@ -0,0 +1,180 @@
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from server.GameBoard import GameBoard
from server.dataset.Dataset import Dataset
from server.Files import save_file
import aiofiles
import aiofiles.os
import gzip, json, os
class LocalStorage:
def __init__(self, file_path:str, **kwargs):
self.save_folder_dict = {
"standard": "01_Standard",
"duel": "02_Duels",
"constrictor": "04_Constrictor",
"solo": "05_Solo",
}
self.file_path = file_path
self.dataset_only = os.getenv("STORE_DATASET_ONLY", "false").strip().lower() in ("1", "true", "yes", "on")
self.dataset_jsonl_path = os.getenv("DATASET_JSONL_PATH", os.path.join(self.file_path, "dataset", "good_moves.jsonl"))
self.dataset_rotate_daily = os.getenv("DATASET_ROTATE_DAILY", "true").strip().lower() in ("1", "true", "yes", "on")
self.dataset_compress_rotated = os.getenv("DATASET_COMPRESS_ROTATED", "true").strip().lower() in ("1", "true", "yes", "on")
self.dataset_max_bytes = int(float(os.getenv("DATASET_JSONL_MAX_MB", "50")) * 1024 * 1024)
def _get_active_dataset_path(self, game_board:'GameBoard'):
if not self.dataset_rotate_daily:
return self.dataset_jsonl_path
base, ext = os.path.splitext(self.dataset_jsonl_path)
if ext == "":
ext = ".jsonl"
return f"{base}-{game_board.now_date.strftime('%Y-%m-%d')}{ext}"
def _gzip_file(self, file_path:str):
gz_path = f"{file_path}.gz"
with open(file_path, "rb") as src:
with gzip.open(gz_path, "wb") as dst:
dst.writelines(src)
os.remove(file_path)
async def _compress_old_daily_files(self, active_path:str):
if not self.dataset_compress_rotated:
return
folder = os.path.dirname(active_path)
base_name = os.path.basename(self.dataset_jsonl_path)
base_stem, _ = os.path.splitext(base_name)
prefix = f"{base_stem}-"
active_name = os.path.basename(active_path)
if folder == "" or not await aiofiles.os.path.exists(folder):
return
for name in os.listdir(folder):
if name == active_name:
continue
if not name.startswith(prefix):
continue
if not name.endswith(".jsonl"):
continue
self._gzip_file(os.path.join(folder, name))
async def _rotate_if_needed(self, active_path:str, game_board:'GameBoard'):
if self.dataset_max_bytes <= 0:
return
if not await aiofiles.os.path.exists(active_path):
return
file_size = (await aiofiles.os.stat(active_path)).st_size
if file_size < self.dataset_max_bytes:
return
timestamp = game_board.now_date.strftime("%Y%m%d-%H%M%S")
rotated_path = f"{active_path}.{timestamp}.jsonl"
suffix = 1
while await aiofiles.os.path.exists(rotated_path):
suffix += 1
rotated_path = f"{active_path}.{timestamp}.{suffix}.jsonl"
await aiofiles.os.rename(active_path, rotated_path)
if self.dataset_compress_rotated:
self._gzip_file(rotated_path)
def _build_dataset_rows(self, dataset_payload:dict, game_board:'GameBoard'):
game_info = dataset_payload.get("game", {})
snake_info = dataset_payload.get("snake", {})
rows = []
for sample in dataset_payload.get("samples", []):
rows.append({
"game_id": game_info.get("id", game_board.id),
"game_map": game_info.get("map", game_board.map),
"game_type": game_info.get("type", game_board.get_type_of_game()),
"snake_type": snake_info.get(
"type", game_board.snake_class.__class__.__name__
),
"turn": sample.get("turn"),
"move": sample.get("move"),
"is_good_move": sample.get("is_good_move", False),
"game_board": sample.get("game_board"),
"history": sample.get("history"),
})
return rows
async def _append_dataset_jsonl(self, dataset_payload:dict, game_board:'GameBoard'):
rows = self._build_dataset_rows(dataset_payload, game_board)
if len(rows) == 0:
return
active_path = self._get_active_dataset_path(game_board)
await aiofiles.os.makedirs(os.path.dirname(active_path), exist_ok=True)
await self._compress_old_daily_files(active_path)
await self._rotate_if_needed(active_path, game_board)
async with aiofiles.open(active_path, "a") as f:
for row in rows:
await f.write(json.dumps(row, ensure_ascii=False) + "\n")
def _get_correct_folder_for_save_file(self, game_board:'GameBoard', file_name:str, game_type:str, leader_board:bool, winner:bool):
storage_folder = self.file_path
if leader_board:
storage_folder = os.path.join(storage_folder, "00_Leaderboards")
storage_folder = os.path.join(storage_folder, self.save_folder_dict[game_type])
storage_folder = os.path.join(
storage_folder,
game_board.now_date.strftime("%Y"),
game_board.now_date.strftime("%m_%B"),
game_board.now_date.strftime("%d"),
)
if winner:
storage_folder = os.path.join(storage_folder, "Winner")
else:
storage_folder = os.path.join(storage_folder, "Lost")
return os.path.join(storage_folder, file_name)
async def save(self, game_board:'GameBoard'):
game_type = game_board.get_type_of_game()
dataset = Dataset(game_board).build(only_good_moves=True)
await self._append_dataset_jsonl(dataset, game_board)
if self.dataset_only:
return
save_file_path = self._get_correct_folder_for_save_file(
game_board,
f"{game_board.snake_class.__class__.__name__}_{game_board.now_date.strftime('%H-%M-%S')}_{game_board.id}.json",
game_type["name"],
game_type["is_ladder"],
True if game_board.winner_snake_names and "me" in game_board.winner_snake_names else False
)
payload = {
"winner": game_board.winner_snake_names,
"game": {
"url": game_board.url,
"id": game_board.id,
"final_turns": game_board.turn,
"map": game_board.map,
"type": game_type,
"ruleset": game_board.ruleset,
},
"moves": game_board.turns,
"snake": {
"type": game_board.snake_class.__class__.__name__,
"calculations": game_board.snake_class.get_history(),
},
"dataset": dataset,
}
await save_file(save_file_path, json.dumps(payload, indent=2, ensure_ascii=False))
def cleanup(self):
pass
+9
View File
@@ -1 +1,10 @@
from .GameplayDatabase import GameplayDatabase
from .LocalStorage import LocalStorage
from .EdgeDB import EdgeDB
class StorageLoader:
@classmethod
def build(self, selected_storage:str) -> LocalStorage|EdgeDB:
storage_module = __import__(f"server.storage.{selected_storage}", fromlist=[selected_storage])
storage_class = getattr(storage_module, selected_storage)
return storage_class