Files
snake-python/server/dataset/DatasetCurator.py
T

214 lines
6.8 KiB
Python

import argparse, hashlib, shutil, json
from pathlib import Path
from server.dataset.DatasetIO import DatasetIO
class DatasetCurator:
def __init__(self, input_files:list[str], output_file:str, min_turn:int=6, late_turn:int=20, max_safe_options:int=2, min_score:int=3, append:bool=False, archive_input:bool=False, archive_dir:str|None=None):
self.input_files = input_files
self.output_file = Path(output_file)
self.min_turn = min_turn
self.late_turn = late_turn
self.max_safe_options = max_safe_options
self.min_score = min_score
self.append = append
self.archive_input = archive_input
self.archive_dir = Path(archive_dir) if archive_dir else self.output_file.parent / "archive"
def _safe_options_count(self, row:dict):
history = row.get("history", {})
for item in history.get("data", []):
if item.get("function") == "get_possible_moves":
return len(item.get("safe_positions", {}))
return None
def _state_hash(self, row:dict):
board = row.get("game_board", {})
snakes = board.get("snakes", [])
snakes_key = []
for snake in snakes:
snakes_key.append((
snake.get("id"),
snake.get("health"),
tuple((seg.get("x"), seg.get("y")) for seg in snake.get("body", [])),
))
key = {
"width": board.get("width"),
"height": board.get("height"),
"snakes": sorted(snakes_key),
"food": sorted((f.get("x"), f.get("y")) for f in board.get("food", [])),
"hazards": sorted((h.get("x"), h.get("y")) for h in board.get("hazards", [])),
}
raw = json.dumps(key, sort_keys=True, separators=(",", ":"))
return hashlib.sha1(raw.encode("utf-8")).hexdigest()
def _score(self, row:dict):
score = 0
turn = int(row.get("turn", 0))
safe_options = self._safe_options_count(row)
snakes = row.get("game_board", {}).get("snakes", [])
opponents = max(0, len(snakes) - 1)
if turn >= self.late_turn:
score += 2
if safe_options is not None and safe_options <= self.max_safe_options:
score += 3
if opponents >= 1:
score += 1
return score, safe_options
def curate(self):
self.output_file.parent.mkdir(parents=True, exist_ok=True)
input_paths = DatasetIO.resolve_input_files(self.input_files)
total = 0
kept = 0
skipped_turn = 0
skipped_quality = 0
skipped_duplicate = 0
seen_states = set()
if self.append and self.output_file.exists():
for row in DatasetIO.iter_jsonl_rows(self.output_file):
state_key = self._state_hash(row)
seen_states.add((state_key, row.get("move")))
mode = "a" if self.append else "w"
with DatasetIO.open_text(self.output_file, mode) as dst:
for input_path in input_paths:
for row in DatasetIO.iter_jsonl_rows(input_path):
total += 1
if not row.get("is_good_move", False):
skipped_quality += 1
continue
if int(row.get("turn", 0)) < self.min_turn:
skipped_turn += 1
continue
quality_score, safe_options = self._score(row)
if quality_score < self.min_score:
skipped_quality += 1
continue
state_key = self._state_hash(row)
dedupe_key = (state_key, row.get("move"))
if dedupe_key in seen_states:
skipped_duplicate += 1
continue
seen_states.add(dedupe_key)
compact_row = {
"game_id": row.get("game_id"),
"turn": row.get("turn"),
"move": row.get("move"),
"game_type": row.get("game_type"),
"quality_score": quality_score,
"safe_options": safe_options,
"game_board": row.get("game_board"),
}
dst.write(json.dumps(compact_row, ensure_ascii=False) + "\n")
kept += 1
archived_files = []
if self.archive_input:
archived_files = self._archive_processed_files(input_paths)
return {
"input_files": [str(path) for path in input_paths],
"total_rows": total,
"kept_rows": kept,
"skipped_turn": skipped_turn,
"skipped_quality": skipped_quality,
"skipped_duplicate": skipped_duplicate,
"append_mode": self.append,
"archive_input": self.archive_input,
"archived_files": archived_files,
"output_file": str(self.output_file),
}
def _archive_processed_files(self, input_paths:list[Path]):
self.archive_dir.mkdir(parents=True, exist_ok=True)
archived = []
output_resolved = (
self.output_file.resolve()
if self.output_file.exists()
else self.output_file
)
archive_resolved = self.archive_dir.resolve()
for source_path in input_paths:
if not source_path.exists():
continue
source_resolved = source_path.resolve()
if source_resolved == output_resolved:
continue
if source_resolved.parent == archive_resolved:
continue
destination = self.archive_dir / source_path.name
if destination.exists():
stem = destination.stem
suffix = destination.suffix
index = 1
while True:
candidate = self.archive_dir / f"{stem}.{index}{suffix}"
if not candidate.exists():
destination = candidate
break
index += 1
shutil.move(str(source_path), str(destination))
archived.append(str(destination))
return archived
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Create curated best-moves dataset")
parser.add_argument(
"--input",
action="append",
required=True,
help="Input JSONL/JSONL.GZ file, directory, or glob pattern. Repeat for multiple inputs.",
)
parser.add_argument("--output", required=True, help="Output JSONL file")
parser.add_argument("--min-turn", type=int, default=6)
parser.add_argument("--late-turn", type=int, default=20)
parser.add_argument("--max-safe-options", type=int, default=2)
parser.add_argument("--min-score", type=int, default=3)
parser.add_argument(
"--append",
action="store_true",
help="Append to existing output and dedupe against existing rows",
)
parser.add_argument(
"--archive-input",
action="store_true",
help="Move processed input files to archive directory after successful curation",
)
parser.add_argument(
"--archive-dir",
default=None,
help="Archive directory for processed input files (default: <output-dir>/archive)",
)
args = parser.parse_args()
report = DatasetCurator(
input_files=args.input,
output_file=args.output,
min_turn=args.min_turn,
late_turn=args.late_turn,
max_safe_options=args.max_safe_options,
min_score=args.min_score,
append=args.append,
archive_input=args.archive_input,
archive_dir=args.archive_dir,
).curate()
print(json.dumps(report, indent=2))