from datetime import UTC, datetime from pathlib import Path from typing import Any import gzip, json, os class RLBootstrapDataset: def __init__(self): self.enabled = self._env_bool("RL_BOOTSTRAP_ENABLED", default=False) self.min_base_rows = self._env_int("RL_MIN_BASE_ROWS", default=5000) self.base_dataset_path = Path(os.getenv("RL_BASE_DATASET", "data/dataset/best_moves.jsonl")) self.output_path = Path(os.getenv("RL_BOOTSTRAP_OUTPUT", "data/dataset/rl_bootstrap.jsonl")) self.max_bytes = int(float(os.getenv("RL_BOOTSTRAP_MAX_MB", "50")) * 1024 * 1024) self.needs_more_data = False @staticmethod def _env_bool(name:str, default:bool=False) -> bool: value = os.getenv(name) if value is None: return default return value.lower() in {"1", "true", "yes", "on"} @staticmethod def _env_int(name:str, default:int) -> int: value = os.getenv(name) if value is None: return default try: return int(value) except ValueError: return default @staticmethod def count_jsonl_rows(path:Path) -> int: count = 0 candidates = [path] if path.suffix == ".gz": candidates.append(path.with_suffix("")) else: candidates.append(Path(f"{path}.gz")) seen = set() for candidate in candidates: if candidate in seen: continue seen.add(candidate) if not candidate.exists() or not candidate.is_file(): continue open_fn = gzip.open if candidate.suffix == ".gz" else open try: with open_fn(candidate, "rt", encoding="utf-8") as handle: for line in handle: if line.strip(): count += 1 except (OSError, UnicodeError): continue return count @staticmethod def rotate_and_gzip_if_size_reached(path:Path, max_bytes:int) -> bool: if max_bytes <= 0: return False if not path.exists() or not path.is_file(): return False if path.stat().st_size < max_bytes: return False timestamp = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") if path.suffix == ".jsonl": rotated_name = f"{path.stem}.{timestamp}.jsonl" else: rotated_name = f"{path.name}.{timestamp}" rotated_path = path.with_name(rotated_name) suffix = 1 while rotated_path.exists(): suffix += 1 if path.suffix == ".jsonl": rotated_name = f"{path.stem}.{timestamp}.{suffix}.jsonl" else: rotated_name = f"{path.name}.{timestamp}.{suffix}" rotated_path = path.with_name(rotated_name) path.rename(rotated_path) with rotated_path.open("rb") as src: with gzip.open(f"{rotated_path}.gz", "wb") as dst: dst.writelines(src) rotated_path.unlink() return True def refresh_state(self): if not self.enabled: self.needs_more_data = False return base_rows = self.count_jsonl_rows(self.base_dataset_path) self.needs_more_data = base_rows < self.min_base_rows def record_sample(self, game_data:Any, move:str, safe_moves:dict[str, dict[str, int]], reason:str, scores:dict[str, float]|None=None,): if not self.enabled or not self.needs_more_data: return try: self.output_path.parent.mkdir(parents=True, exist_ok=True) row = { "source": "best_battlesnake_bootstrap", "game_id": getattr(game_data, "id", None), "turn": game_data.get_turn(), "move": move, "safe_moves": list(safe_moves.keys()), "reason": reason, "game_board": game_data.get_game_board_as_dict(), } if scores: row["scores"] = {k: round(v, 5) for k, v in scores.items()} with self.output_path.open("a", encoding="utf-8") as handle: handle.write(json.dumps(row, ensure_ascii=False) + "\n") self.rotate_and_gzip_if_size_reached(self.output_path, self.max_bytes) except Exception: return