from server.dataset.Dataset import Dataset from server.GameBoard import GameBoard from server.Files import save_file import aiofiles import aiofiles.os import gzip, json, os class LocalStorage: def __init__(self, file_path:str, **kwargs): self.save_folder_dict = { "standard": "01_Standard", "duel": "02_Duels", "constrictor": "04_Constrictor", "solo": "05_Solo", } self.file_path = file_path self.dataset_only = os.getenv("STORE_DATASET_ONLY", "false").strip().lower() in ("1", "true", "yes", "on") self.dataset_jsonl_path = os.getenv("DATASET_JSONL_PATH", os.path.join(self.file_path, "dataset", "good_moves.jsonl")) self.dataset_rotate_daily = os.getenv("DATASET_ROTATE_DAILY", "true").strip().lower() in ("1", "true", "yes", "on") self.dataset_compress_rotated = os.getenv("DATASET_COMPRESS_ROTATED", "true").strip().lower() in ("1", "true", "yes", "on") self.dataset_max_bytes = int(float(os.getenv("DATASET_JSONL_MAX_MB", "50")) * 1024 * 1024) def _get_active_dataset_path(self, game_board:GameBoard): if not self.dataset_rotate_daily: return self.dataset_jsonl_path base, ext = os.path.splitext(self.dataset_jsonl_path) if ext == "": ext = ".jsonl" return f"{base}-{game_board.now_date.strftime('%Y-%m-%d')}{ext}" def _gzip_file(self, file_path:str): gz_path = f"{file_path}.gz" with open(file_path, "rb") as src: with gzip.open(gz_path, "wb") as dst: dst.writelines(src) os.remove(file_path) async def _compress_old_daily_files(self, active_path:str): if not self.dataset_compress_rotated: return folder = os.path.dirname(active_path) base_name = os.path.basename(self.dataset_jsonl_path) base_stem, _ = os.path.splitext(base_name) prefix = f"{base_stem}-" active_name = os.path.basename(active_path) if folder == "" or not await aiofiles.os.path.exists(folder): return for name in os.listdir(folder): if name == active_name: continue if not name.startswith(prefix): continue if not name.endswith(".jsonl"): continue self._gzip_file(os.path.join(folder, name)) async def _rotate_if_needed(self, active_path:str, game_board:GameBoard): if self.dataset_max_bytes <= 0: return if not await aiofiles.os.path.exists(active_path): return file_size = (await aiofiles.os.stat(active_path)).st_size if file_size < self.dataset_max_bytes: return timestamp = game_board.now_date.strftime("%Y%m%d-%H%M%S") rotated_path = f"{active_path}.{timestamp}.jsonl" suffix = 1 while await aiofiles.os.path.exists(rotated_path): suffix += 1 rotated_path = f"{active_path}.{timestamp}.{suffix}.jsonl" await aiofiles.os.rename(active_path, rotated_path) if self.dataset_compress_rotated: self._gzip_file(rotated_path) def _build_dataset_rows(self, dataset_payload:dict, game_board:GameBoard): game_info = dataset_payload.get("game", {}) snake_info = dataset_payload.get("snake", {}) rows = [] for sample in dataset_payload.get("samples", []): rows.append({ "game_id": game_info.get("id", game_board.id), "game_map": game_info.get("map", game_board.map), "game_type": game_info.get("type", game_board.get_type_of_game()), "snake_type": snake_info.get( "type", game_board.snake_class.__class__.__name__ ), "turn": sample.get("turn"), "move": sample.get("move"), "is_good_move": sample.get("is_good_move", False), "game_board": sample.get("game_board"), "history": sample.get("history"), }) return rows async def _append_dataset_jsonl(self, dataset_payload:dict, game_board:GameBoard): rows = self._build_dataset_rows(dataset_payload, game_board) if len(rows) == 0: return active_path = self._get_active_dataset_path(game_board) await aiofiles.os.makedirs(os.path.dirname(active_path), exist_ok=True) await self._compress_old_daily_files(active_path) await self._rotate_if_needed(active_path, game_board) async with aiofiles.open(active_path, "a") as f: for row in rows: await f.write(json.dumps(row, ensure_ascii=False) + "\n") def _get_correct_folder_for_save_file(self, game_board:GameBoard, file_name:str, game_type:str, leader_board:bool, winner:bool): storage_folder = self.file_path if leader_board: storage_folder = os.path.join(storage_folder, "00_Leaderboards") storage_folder = os.path.join(storage_folder, self.save_folder_dict[game_type]) storage_folder = os.path.join( storage_folder, game_board.now_date.strftime("%Y"), game_board.now_date.strftime("%m_%B"), game_board.now_date.strftime("%d"), ) if winner: storage_folder = os.path.join(storage_folder, "Winner") else: storage_folder = os.path.join(storage_folder, "Lost") return os.path.join(storage_folder, file_name) async def save(self, game_board:GameBoard): game_type = game_board.get_type_of_game() dataset = Dataset(game_board).build(only_good_moves=True) await self._append_dataset_jsonl(dataset, game_board) if self.dataset_only: return save_file_path = self._get_correct_folder_for_save_file( game_board, f"{game_board.snake_class.__class__.__name__}_{game_board.now_date.strftime('%H-%M-%S')}_{game_board.id}.json", game_type["name"], game_type["is_ladder"], True if game_board.winner_snake_names and "me" in game_board.winner_snake_names else False ) payload = { "winner": game_board.winner_snake_names, "game": { "url": game_board.url, "id": game_board.id, "final_turns": game_board.turn, "map": game_board.map, "type": game_type, "ruleset": game_board.ruleset, }, "moves": game_board.turns, "snake": { "type": game_board.snake_class.__class__.__name__, "calculations": game_board.snake_class.get_history(), }, "dataset": dataset, } await save_file(save_file_path, json.dumps(payload, indent=2, ensure_ascii=False)) def cleanup(self): pass