add dataset updates with doc updates
Build and Push Docker Container / build-and-push (push) Failing after 12m18s

This commit is contained in:
2026-04-03 11:40:47 +02:00
parent 2e1f91355b
commit 51de53d01c
5 changed files with 235 additions and 28 deletions
+47
View File
@@ -54,4 +54,51 @@ battlesnake play -W 11 -H 11 --name 'Python Starter Project' --url http://localh
Continue with the [Battlesnake Quickstart Guide](https://docs.battlesnake.com/quickstart) to customize and improve your Battlesnake's behavior. Continue with the [Battlesnake Quickstart Guide](https://docs.battlesnake.com/quickstart) to customize and improve your Battlesnake's behavior.
## Included Competitive Snake
This repo now includes `snakes/BestBattleSnake.py`, a stronger default snake that combines:
- collision and head-to-head risk checks
- flood-fill space evaluation to avoid traps
- food routing that gets more aggressive as health drops
- tail access checks for better long-term survival
Run it explicitly with:
```sh
SNAKE=BestBattleSnake python main.py
```
Optional duel tuning (when only 2 snakes are alive):
```sh
BATTLE_SNAKE_DUEL_STYLE=balanced python main.py
```
Allowed values: `safe`, `balanced`, `aggressive`.
## Export Training Dataset
Game saves now include a `dataset` section with labeled move samples.
Export all stored samples to JSONL:
```sh
python -m server.DatasetExporter --input data --output data/dataset/good_moves.jsonl
```
Or with `just`:
```sh
just export-dataset
```
To store compact dataset-only records (JSONL) and skip full per-game JSON files:
```sh
STORE_DATASET_ONLY=true DATASET_JSONL_PATH=data/dataset/good_moves.jsonl python main.py
```
Optional compact storage tuning:
- `DATASET_ROTATE_DAILY=true` creates one JSONL file per day (default: `true`)
- `DATASET_JSONL_MAX_MB=50` rotates when file reaches max size in MB (default: `50`)
- `DATASET_COMPRESS_ROTATED=true` gzip-compresses rotated/old JSONL files (default: `true`)
**Note:** To play games on [play.battlesnake.com](https://play.battlesnake.com) you'll need to deploy your Battlesnake to a live web server OR use a port forwarding tool like [ngrok](https://ngrok.com/) to access your server locally. **Note:** To play games on [play.battlesnake.com](https://play.battlesnake.com) you'll need to deploy your Battlesnake to a live web server OR use a port forwarding tool like [ngrok](https://ngrok.com/) to access your server locally.
+5
View File
@@ -24,6 +24,7 @@ default:
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Snake Script helpers # Snake Script helpers
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
run: run:
"{{justfile_directory()}}/main.py" "{{justfile_directory()}}/main.py"
@@ -45,5 +46,9 @@ test-seed:
BATTLESNAKE_CLI=battlesnake_cli_1.2.3_Linux_x86_64/battlesnake BATTLESNAKE_CLI=battlesnake_cli_1.2.3_Linux_x86_64/battlesnake
$BATTLESNAKE_CLI play -W 11 -H 11 --name 'Python Starter Project' --url http://localhost:8000 -g solo --browser --seed 1713099635738952360 $BATTLESNAKE_CLI play -W 11 -H 11 --name 'Python Starter Project' --url http://localhost:8000 -g solo --browser --seed 1713099635738952360
# ------------------------------------------------------------------------------
# Fataset helpers
# ------------------------------------------------------------------------------
export-dataset input="data" output="data/dataset/good_moves.jsonl": export-dataset input="data" output="data/dataset/good_moves.jsonl":
python -m server.DatasetExporter --input "{{input}}" --output "{{output}}" python -m server.DatasetExporter --input "{{input}}" --output "{{output}}"
+42 -9
View File
@@ -1,7 +1,13 @@
from server.GameBoard import GameBoard from server.GameBoard import GameBoard
from server.Dataset import Dataset
from datetime import datetime from datetime import datetime
import gel, json, time import json, time
try:
import gel as _gel # type: ignore[import-not-found]
except ImportError: # pragma: no cover
_gel = None
class EdgeDB: class EdgeDB:
def __init__(self, database:str=None, tls_security:str='insecure', **kwargs): def __init__(self, database:str=None, tls_security:str='insecure', **kwargs):
@@ -10,16 +16,20 @@ class EdgeDB:
self._connect() self._connect()
def _connect(self): def _connect(self):
self.client = gel.create_client( if _gel is None:
tls_security=self.tls_security, raise ImportError("The 'gel' package is required to use EdgeDB storage")
database=self.database self.client = _gel.create_client(
tls_security=self.tls_security, database=self.database
) )
def run_query_with_reconnection(self, function, *args, **kwargs): def run_query_with_reconnection(self, function, *args, **kwargs):
while True: while True:
try: try:
return function(*args, **kwargs) return function(*args, **kwargs)
except gel.errors.ClientConnectionFailedError: except Exception as error:
if error.__class__.__name__ != "ClientConnectionFailedError":
raise
self._connect() self._connect()
time.sleep(0.5) time.sleep(0.5)
@@ -39,9 +49,21 @@ class EdgeDB:
data = [] data = []
moves = game_board.turns moves = game_board.turns
snake_calulations = [[calc for calc in ele["data"]] for ele in game_board.snake_class.get_history() ] snake_calulations = [[calc for calc in ele["data"]] for ele in game_board.snake_class.get_history() ]
labels_by_turn = Dataset(game_board).labels_by_turn()
for i in range(len(moves)): for i in range(len(moves)):
data.append({"turn": moves[i]["turn"], "move": moves[i]["move"], "game_board": moves[i]["game_board"], "calculations": snake_calulations[i]}) calculations = snake_calulations[i] if i < len(snake_calulations) else []
calculations.append({
"dataset": {
"is_good_move": labels_by_turn.get(moves[i]["turn"], False)
}
})
data.append({
"turn": moves[i]["turn"],
"move": moves[i]["move"],
"game_board": moves[i]["game_board"],
"calculations": calculations,
})
return data return data
@@ -91,9 +113,20 @@ class EdgeDB:
created_at=datetime.fromtimestamp(game_board.now_date.timestamp(), game_board.now_date.astimezone().tzinfo), created_at=datetime.fromtimestamp(game_board.now_date.timestamp(), game_board.now_date.astimezone().tzinfo),
turns=game_board.turn, turns=game_board.turn,
map=game_board.map if game_board.map else "standard", map=game_board.map if game_board.map else "standard",
winner=', '.join(game_board.winner_snake_names) if game_board.winner_snake_names else "", winner=', '.join(game_board.winner_snake_names)
moves=[ tuple([x["turn"], x["move"], json.dumps(x["game_board"]), [ json.dumps(ele) for ele in x["calculations"] ] ]) for x in self.create_moves_with_calculations(game_board) ], if game_board.winner_snake_names
else "",
moves=[
tuple(
[
x["turn"],
x["move"],
json.dumps(x["game_board"]),
[json.dumps(ele) for ele in x["calculations"]],
]
)
for x in self.create_moves_with_calculations(game_board)
],
game_type=game_type["name"], game_type=game_type["name"],
is_ladder=game_type["is_ladder"], is_ladder=game_type["is_ladder"],
+135 -15
View File
@@ -1,6 +1,10 @@
from server.GameBoard import GameBoard from server.GameBoard import GameBoard
from server.Dataset import Dataset
from server.Files import save_file from server.Files import save_file
import aiofiles
import aiofiles.os
import gzip
import json, os import json, os
class LocalStorage: class LocalStorage:
@@ -12,6 +16,106 @@ class LocalStorage:
"solo": "05_Solo", "solo": "05_Solo",
} }
self.file_path = file_path self.file_path = file_path
self.dataset_only = os.getenv("STORE_DATASET_ONLY", "false").strip().lower() in ("1", "true", "yes", "on")
self.dataset_jsonl_path = os.getenv("DATASET_JSONL_PATH", os.path.join(self.file_path, "dataset", "good_moves.jsonl"))
self.dataset_rotate_daily = os.getenv("DATASET_ROTATE_DAILY", "true").strip().lower() in ("1", "true", "yes", "on")
self.dataset_compress_rotated = os.getenv("DATASET_COMPRESS_ROTATED", "true").strip().lower() in ("1", "true", "yes", "on")
self.dataset_max_bytes = int(float(os.getenv("DATASET_JSONL_MAX_MB", "50")) * 1024 * 1024)
def _get_active_dataset_path(self, game_board:GameBoard):
if not self.dataset_rotate_daily:
return self.dataset_jsonl_path
base, ext = os.path.splitext(self.dataset_jsonl_path)
if ext == "":
ext = ".jsonl"
return f"{base}-{game_board.now_date.strftime('%Y-%m-%d')}{ext}"
def _gzip_file(self, file_path:str):
gz_path = f"{file_path}.gz"
with open(file_path, "rb") as src:
with gzip.open(gz_path, "wb") as dst:
dst.writelines(src)
os.remove(file_path)
async def _compress_old_daily_files(self, active_path:str):
if not self.dataset_compress_rotated:
return
folder = os.path.dirname(active_path)
base_name = os.path.basename(self.dataset_jsonl_path)
base_stem, _ = os.path.splitext(base_name)
prefix = f"{base_stem}-"
active_name = os.path.basename(active_path)
if folder == "" or not await aiofiles.os.path.exists(folder):
return
for name in os.listdir(folder):
if name == active_name:
continue
if not name.startswith(prefix):
continue
if not name.endswith(".jsonl"):
continue
self._gzip_file(os.path.join(folder, name))
async def _rotate_if_needed(self, active_path:str, game_board:GameBoard):
if self.dataset_max_bytes <= 0:
return
if not await aiofiles.os.path.exists(active_path):
return
file_size = (await aiofiles.os.stat(active_path)).st_size
if file_size < self.dataset_max_bytes:
return
timestamp = game_board.now_date.strftime("%Y%m%d-%H%M%S")
rotated_path = f"{active_path}.{timestamp}.jsonl"
suffix = 1
while await aiofiles.os.path.exists(rotated_path):
suffix += 1
rotated_path = f"{active_path}.{timestamp}.{suffix}.jsonl"
await aiofiles.os.rename(active_path, rotated_path)
if self.dataset_compress_rotated:
self._gzip_file(rotated_path)
def _build_dataset_rows(self, dataset_payload:dict, game_board:GameBoard):
game_info = dataset_payload.get("game", {})
snake_info = dataset_payload.get("snake", {})
rows = []
for sample in dataset_payload.get("samples", []):
rows.append({
"game_id": game_info.get("id", game_board.id),
"game_map": game_info.get("map", game_board.map),
"game_type": game_info.get("type", game_board.get_type_of_game()),
"snake_type": snake_info.get(
"type", game_board.snake_class.__class__.__name__
),
"turn": sample.get("turn"),
"move": sample.get("move"),
"is_good_move": sample.get("is_good_move", False),
"game_board": sample.get("game_board"),
"history": sample.get("history"),
})
return rows
async def _append_dataset_jsonl(self, dataset_payload:dict, game_board:GameBoard):
rows = self._build_dataset_rows(dataset_payload, game_board)
if len(rows) == 0:
return
active_path = self._get_active_dataset_path(game_board)
await aiofiles.os.makedirs(os.path.dirname(active_path), exist_ok=True)
await self._compress_old_daily_files(active_path)
await self._rotate_if_needed(active_path, game_board)
async with aiofiles.open(active_path, "a") as f:
for row in rows:
await f.write(json.dumps(row, ensure_ascii=False) + "\n")
def _get_correct_folder_for_save_file(self, game_board:GameBoard, file_name:str, game_type:str, leader_board:bool, winner:bool): def _get_correct_folder_for_save_file(self, game_board:GameBoard, file_name:str, game_type:str, leader_board:bool, winner:bool):
storage_folder = self.file_path storage_folder = self.file_path
@@ -19,7 +123,12 @@ class LocalStorage:
storage_folder = os.path.join(storage_folder, "00_Leaderboards") storage_folder = os.path.join(storage_folder, "00_Leaderboards")
storage_folder = os.path.join(storage_folder, self.save_folder_dict[game_type]) storage_folder = os.path.join(storage_folder, self.save_folder_dict[game_type])
storage_folder = os.path.join(storage_folder, game_board.now_date.strftime('%Y'), game_board.now_date.strftime('%m_%B'), game_board.now_date.strftime('%d')) storage_folder = os.path.join(
storage_folder,
game_board.now_date.strftime("%Y"),
game_board.now_date.strftime("%m_%B"),
game_board.now_date.strftime("%d"),
)
if winner: if winner:
storage_folder = os.path.join(storage_folder, "Winner") storage_folder = os.path.join(storage_folder, "Winner")
@@ -30,6 +139,12 @@ class LocalStorage:
async def save(self, game_board:GameBoard): async def save(self, game_board:GameBoard):
game_type = game_board.get_type_of_game() game_type = game_board.get_type_of_game()
dataset = Dataset(game_board).build(only_good_moves=True)
await self._append_dataset_jsonl(dataset, game_board)
if self.dataset_only:
return
save_file_path = self._get_correct_folder_for_save_file( save_file_path = self._get_correct_folder_for_save_file(
game_board, game_board,
f"{game_board.snake_class.__class__.__name__}_{game_board.now_date.strftime('%H-%M-%S')}_{game_board.id}.json", f"{game_board.snake_class.__class__.__name__}_{game_board.now_date.strftime('%H-%M-%S')}_{game_board.id}.json",
@@ -39,21 +154,26 @@ class LocalStorage:
) )
await save_file(save_file_path, { await save_file(save_file_path, {
"winner": game_board.winner_snake_names, "winner": game_board.winner_snake_names,
"game": { "game": {
"url": game_board.url, "url": game_board.url,
"id": game_board.id, "id": game_board.id,
"final_turns": game_board.turn, "final_turns": game_board.turn,
"map": game_board.map, "map": game_board.map,
"type": game_type, "type": game_type,
"ruleset": game_board.ruleset, "ruleset": game_board.ruleset,
},
"moves": game_board.turns,
"snake": {
"type": game_board.snake_class.__class__.__name__,
"calculations": game_board.snake_class.get_history(),
},
"dataset": dataset,
}, },
"moves": game_board.turns, callback=json.dump,
"snake": { indent=2,
"type": game_board.snake_class.__class__.__name__, ensure_ascii=False,
"calculations": game_board.snake_class.get_history(), )
},
}, callback=json.dump, indent=2, ensure_ascii=False)
def cleanup(self): def cleanup(self):
pass pass
+6 -4
View File
@@ -1,6 +1,8 @@
import unittest import unittest
from typing import cast
from server.Dataset import Dataset from server.Dataset import Dataset
from server.GameBoard import GameBoard
class DummySnake: class DummySnake:
def get_history(self): def get_history(self):
@@ -25,7 +27,7 @@ class DummyGameBoard:
class TestDataset(unittest.TestCase): class TestDataset(unittest.TestCase):
def test_build_only_good_moves_for_wins(self): def test_build_only_good_moves_for_wins(self):
dataset = Dataset(DummyGameBoard(["me"])) dataset = Dataset(cast(GameBoard, DummyGameBoard(["me"])))
payload = dataset.build(only_good_moves=True) payload = dataset.build(only_good_moves=True)
self.assertTrue(payload["did_win"]) self.assertTrue(payload["did_win"])
@@ -33,15 +35,15 @@ class TestDataset(unittest.TestCase):
self.assertTrue(all(sample["is_good_move"] for sample in payload["samples"])) self.assertTrue(all(sample["is_good_move"] for sample in payload["samples"]))
def test_build_returns_no_samples_for_losses_when_only_good(self): def test_build_returns_no_samples_for_losses_when_only_good(self):
dataset = Dataset(DummyGameBoard(["enemy"])) dataset = Dataset(cast(GameBoard, DummyGameBoard(["enemy"])))
payload = dataset.build(only_good_moves=True) payload = dataset.build(only_good_moves=True)
self.assertFalse(payload["did_win"]) self.assertFalse(payload["did_win"])
self.assertEqual(payload["total_samples"], 0) self.assertEqual(payload["total_samples"], 0)
def test_labels_by_turn(self): def test_labels_by_turn(self):
winner_labels = Dataset(DummyGameBoard(["me"])).labels_by_turn() winner_labels = Dataset(cast(GameBoard, DummyGameBoard(["me"]))).labels_by_turn()
loser_labels = Dataset(DummyGameBoard(["enemy"])).labels_by_turn() loser_labels = Dataset(cast(GameBoard, DummyGameBoard(["enemy"]))).labels_by_turn()
self.assertEqual(winner_labels, {1: True, 2: True}) self.assertEqual(winner_labels, {1: True, 2: True})
self.assertEqual(loser_labels, {1: False, 2: False}) self.assertEqual(loser_labels, {1: False, 2: False})