Compare commits

..

5 Commits

Author SHA1 Message Date
daniel156161 51de53d01c add dataset updates with doc updates
Build and Push Docker Container / build-and-push (push) Failing after 12m18s
2026-04-03 11:40:47 +02:00
daniel156161 2e1f91355b add Dataset Class and Tests 2026-04-03 10:35:21 +02:00
daniel156161 6b69d133b6 fix line spacing and make it look better 2026-04-03 10:30:35 +02:00
daniel156161 7d52d7dca8 add justfile for testing 2026-04-03 10:30:07 +02:00
daniel156161 a885b624f9 add new BestBattleSnake 2026-04-03 10:29:48 +02:00
12 changed files with 1802 additions and 48 deletions
+47
View File
@@ -54,4 +54,51 @@ battlesnake play -W 11 -H 11 --name 'Python Starter Project' --url http://localh
Continue with the [Battlesnake Quickstart Guide](https://docs.battlesnake.com/quickstart) to customize and improve your Battlesnake's behavior.
## Included Competitive Snake
This repo now includes `snakes/BestBattleSnake.py`, a stronger default snake that combines:
- collision and head-to-head risk checks
- flood-fill space evaluation to avoid traps
- food routing that gets more aggressive as health drops
- tail access checks for better long-term survival
Run it explicitly with:
```sh
SNAKE=BestBattleSnake python main.py
```
Optional duel tuning (when only 2 snakes are alive):
```sh
BATTLE_SNAKE_DUEL_STYLE=balanced python main.py
```
Allowed values: `safe`, `balanced`, `aggressive`.
## Export Training Dataset
Game saves now include a `dataset` section with labeled move samples.
Export all stored samples to JSONL:
```sh
python -m server.DatasetExporter --input data --output data/dataset/good_moves.jsonl
```
Or with `just`:
```sh
just export-dataset
```
To store compact dataset-only records (JSONL) and skip full per-game JSON files:
```sh
STORE_DATASET_ONLY=true DATASET_JSONL_PATH=data/dataset/good_moves.jsonl python main.py
```
Optional compact storage tuning:
- `DATASET_ROTATE_DAILY=true` creates one JSONL file per day (default: `true`)
- `DATASET_JSONL_MAX_MB=50` rotates when file reaches max size in MB (default: `50`)
- `DATASET_COMPRESS_ROTATED=true` gzip-compresses rotated/old JSONL files (default: `true`)
**Note:** To play games on [play.battlesnake.com](https://play.battlesnake.com) you'll need to deploy your Battlesnake to a live web server OR use a port forwarding tool like [ngrok](https://ngrok.com/) to access your server locally.
+54
View File
@@ -0,0 +1,54 @@
# Justfile for Migrate Database Changes Workflow
# Docs: https://just.systems/man/en/
# ------------------------------------------------------------------------------
# Global settings
# ------------------------------------------------------------------------------
# Load Env
set dotenv-load
set dotenv-required := true
# Use zsh
set shell := ["bash", "-cu"]
# ------------------------------------------------------------------------------
# Default
# ------------------------------------------------------------------------------
# List all Available recipes
[private]
default:
@just --list --unsorted
# ------------------------------------------------------------------------------
# Snake Script helpers
# ------------------------------------------------------------------------------
run:
"{{justfile_directory()}}/main.py"
# ------------------------------------------------------------------------------
# Testing helpers
# ------------------------------------------------------------------------------
test-constrictor:
#!/usr/bin/env bash
set -euo pipefail
BATTLESNAKE_CLI=battlesnake_cli_1.2.3_Linux_x86_64/battlesnake
$BATTLESNAKE_CLI play -W 11 -H 11 --name 'Python Starter Project' --url http://localhost:8000 -g constrictor --browser --minimumFood 0
test-seed:
#!/usr/bin/env bash
set -euo pipefail
BATTLESNAKE_CLI=battlesnake_cli_1.2.3_Linux_x86_64/battlesnake
$BATTLESNAKE_CLI play -W 11 -H 11 --name 'Python Starter Project' --url http://localhost:8000 -g solo --browser --seed 1713099635738952360
# ------------------------------------------------------------------------------
# Fataset helpers
# ------------------------------------------------------------------------------
export-dataset input="data" output="data/dataset/good_moves.jsonl":
python -m server.DatasetExporter --input "{{input}}" --output "{{output}}"
+22 -17
View File
@@ -19,23 +19,28 @@ import os
# Start server when `python main.py` is run
if __name__ == "__main__":
if os.environ.get("CREATE_ENV_FILE", None):
CreateEnvironmentFile.load_dotenv({"STORE_GAME_HISTORY": True, "DEBUG": True, "SNAKE": "TemplateSnake", "STORE_IF_WIN_AND_MOVES_ARE_BIGGER_AS": 10})
if os.environ.get("CREATE_ENV_FILE", None):
CreateEnvironmentFile.load_dotenv({
"STORE_GAME_HISTORY": True,
"DEBUG": True,
"SNAKE": "TemplateSnake",
"STORE_IF_WIN_AND_MOVES_ARE_BIGGER_AS": 10,
})
server = Server(
data_path=os.path.dirname(__file__),
snake_type=os.environ.get("SNAKE", "TemplateSnake"),
storage_type=os.environ.get("STORAGE", "LocalStorage"),
store_game_when_win_and_moves_are_bigger_as=int(os.environ.get("STORE_IF_WIN_AND_MOVES_ARE_BIGGER_AS", 10)),
debug=os.environ.get("DEBUG_SERVER", False),
check_tls_security=False
)
server = Server(
data_path=os.path.dirname(__file__),
snake_type=os.environ.get("SNAKE", "TemplateSnake"),
storage_type=os.environ.get("STORAGE", "LocalStorage"),
store_game_when_win_and_moves_are_bigger_as=int(os.environ.get("STORE_IF_WIN_AND_MOVES_ARE_BIGGER_AS", 10)),
debug=os.environ.get("DEBUG_SERVER", False),
check_tls_security=False,
)
if os.environ.get("STORE_GAME_HISTORY", None):
server.enable_store_game_state()
if os.environ.get("STORE_GAME_HISTORY", None):
server.enable_store_game_state()
server.run(
host=os.environ.get("HOST", "0.0.0.0"),
port=int(os.environ.get("PORT", "8000")),
debug=bool(os.environ.get("DEBUG", False))
)
server.run(
host=os.environ.get("HOST", "0.0.0.0"),
port=int(os.environ.get("PORT", "8000")),
debug=bool(os.environ.get("DEBUG", False)),
)
+56
View File
@@ -0,0 +1,56 @@
from server.GameBoard import GameBoard
class Dataset:
VALID_MOVES = {"up", "down", "left", "right"}
def __init__(self, game_board: GameBoard):
self.game_board = game_board
def _did_we_win(self):
winners = self.game_board.winner_snake_names or []
return "me" in winners
def _is_good_move(self, move: str):
return move in self.VALID_MOVES
def build(self, only_good_moves: bool = True):
game_type = self.game_board.get_type_of_game()
did_win = self._did_we_win()
samples = []
history = self.game_board.snake_class.get_history()
for index, turn in enumerate(self.game_board.turns):
move = turn.get("move")
is_good_move = did_win and self._is_good_move(move)
if only_good_moves and not is_good_move:
continue
samples.append({
"turn": turn.get("turn"),
"move": move,
"game_board": turn.get("game_board"),
"is_good_move": is_good_move,
"history": history[index] if index < len(history) else {},
})
return {
"game": {
"id": self.game_board.id,
"map": self.game_board.map,
"type": game_type,
},
"snake": {
"type": self.game_board.snake_class.__class__.__name__,
},
"did_win": did_win,
"total_samples": len(samples),
"samples": samples,
}
def labels_by_turn(self):
did_win = self._did_we_win()
labels = {}
for turn in self.game_board.turns:
move = turn.get("move")
labels[turn.get("turn")] = did_win and self._is_good_move(move)
return labels
+64
View File
@@ -0,0 +1,64 @@
import argparse
import json
from pathlib import Path
class DatasetExporter:
def __init__(self, input_dir:str, output_file:str):
self.input_dir = Path(input_dir)
self.output_file = Path(output_file)
def _iter_game_files(self):
if not self.input_dir.exists():
return []
return sorted(self.input_dir.rglob("*.json"))
def _extract_samples(self, payload:dict, source_file:Path):
dataset = payload.get("dataset", {})
game_info = dataset.get("game", payload.get("game", {}))
snake_info = dataset.get("snake", payload.get("snake", {}))
samples = []
for sample in dataset.get("samples", []):
samples.append({
"game_id": game_info.get("id"),
"game_map": game_info.get("map"),
"game_type": game_info.get("type"),
"snake_type": snake_info.get("type"),
"turn": sample.get("turn"),
"move": sample.get("move"),
"is_good_move": sample.get("is_good_move", False),
"game_board": sample.get("game_board"),
"history": sample.get("history"),
"source_file": str(source_file),
})
return samples
def export_jsonl(self):
game_files = self._iter_game_files()
self.output_file.parent.mkdir(parents=True, exist_ok=True)
sample_count = 0
with self.output_file.open("w", encoding="utf-8") as output:
for game_file in game_files:
with game_file.open("r", encoding="utf-8") as source:
payload = json.load(source)
for sample in self._extract_samples(payload, game_file):
output.write(json.dumps(sample, ensure_ascii=False) + "\n")
sample_count += 1
return {
"games_scanned": len(game_files),
"samples_exported": sample_count,
"output_file": str(self.output_file),
}
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Export Battlesnake dataset to JSONL")
parser.add_argument("--input", default="data", help="Input directory with stored game JSON files")
parser.add_argument("--output", default="data/dataset/good_moves.jsonl", help="Output JSONL file")
args = parser.parse_args()
report = DatasetExporter(args.input, args.output).export_jsonl()
print(json.dumps(report, indent=2))
+42 -9
View File
@@ -1,7 +1,13 @@
from server.GameBoard import GameBoard
from server.Dataset import Dataset
from datetime import datetime
import gel, json, time
import json, time
try:
import gel as _gel # type: ignore[import-not-found]
except ImportError: # pragma: no cover
_gel = None
class EdgeDB:
def __init__(self, database:str=None, tls_security:str='insecure', **kwargs):
@@ -10,16 +16,20 @@ class EdgeDB:
self._connect()
def _connect(self):
self.client = gel.create_client(
tls_security=self.tls_security,
database=self.database
if _gel is None:
raise ImportError("The 'gel' package is required to use EdgeDB storage")
self.client = _gel.create_client(
tls_security=self.tls_security, database=self.database
)
def run_query_with_reconnection(self, function, *args, **kwargs):
while True:
try:
return function(*args, **kwargs)
except gel.errors.ClientConnectionFailedError:
except Exception as error:
if error.__class__.__name__ != "ClientConnectionFailedError":
raise
self._connect()
time.sleep(0.5)
@@ -39,9 +49,21 @@ class EdgeDB:
data = []
moves = game_board.turns
snake_calulations = [[calc for calc in ele["data"]] for ele in game_board.snake_class.get_history() ]
labels_by_turn = Dataset(game_board).labels_by_turn()
for i in range(len(moves)):
data.append({"turn": moves[i]["turn"], "move": moves[i]["move"], "game_board": moves[i]["game_board"], "calculations": snake_calulations[i]})
calculations = snake_calulations[i] if i < len(snake_calulations) else []
calculations.append({
"dataset": {
"is_good_move": labels_by_turn.get(moves[i]["turn"], False)
}
})
data.append({
"turn": moves[i]["turn"],
"move": moves[i]["move"],
"game_board": moves[i]["game_board"],
"calculations": calculations,
})
return data
@@ -91,9 +113,20 @@ class EdgeDB:
created_at=datetime.fromtimestamp(game_board.now_date.timestamp(), game_board.now_date.astimezone().tzinfo),
turns=game_board.turn,
map=game_board.map if game_board.map else "standard",
winner=', '.join(game_board.winner_snake_names) if game_board.winner_snake_names else "",
moves=[ tuple([x["turn"], x["move"], json.dumps(x["game_board"]), [ json.dumps(ele) for ele in x["calculations"] ] ]) for x in self.create_moves_with_calculations(game_board) ],
winner=', '.join(game_board.winner_snake_names)
if game_board.winner_snake_names
else "",
moves=[
tuple(
[
x["turn"],
x["move"],
json.dumps(x["game_board"]),
[json.dumps(ele) for ele in x["calculations"]],
]
)
for x in self.create_moves_with_calculations(game_board)
],
game_type=game_type["name"],
is_ladder=game_type["is_ladder"],
+135 -15
View File
@@ -1,6 +1,10 @@
from server.GameBoard import GameBoard
from server.Dataset import Dataset
from server.Files import save_file
import aiofiles
import aiofiles.os
import gzip
import json, os
class LocalStorage:
@@ -12,6 +16,106 @@ class LocalStorage:
"solo": "05_Solo",
}
self.file_path = file_path
self.dataset_only = os.getenv("STORE_DATASET_ONLY", "false").strip().lower() in ("1", "true", "yes", "on")
self.dataset_jsonl_path = os.getenv("DATASET_JSONL_PATH", os.path.join(self.file_path, "dataset", "good_moves.jsonl"))
self.dataset_rotate_daily = os.getenv("DATASET_ROTATE_DAILY", "true").strip().lower() in ("1", "true", "yes", "on")
self.dataset_compress_rotated = os.getenv("DATASET_COMPRESS_ROTATED", "true").strip().lower() in ("1", "true", "yes", "on")
self.dataset_max_bytes = int(float(os.getenv("DATASET_JSONL_MAX_MB", "50")) * 1024 * 1024)
def _get_active_dataset_path(self, game_board:GameBoard):
if not self.dataset_rotate_daily:
return self.dataset_jsonl_path
base, ext = os.path.splitext(self.dataset_jsonl_path)
if ext == "":
ext = ".jsonl"
return f"{base}-{game_board.now_date.strftime('%Y-%m-%d')}{ext}"
def _gzip_file(self, file_path:str):
gz_path = f"{file_path}.gz"
with open(file_path, "rb") as src:
with gzip.open(gz_path, "wb") as dst:
dst.writelines(src)
os.remove(file_path)
async def _compress_old_daily_files(self, active_path:str):
if not self.dataset_compress_rotated:
return
folder = os.path.dirname(active_path)
base_name = os.path.basename(self.dataset_jsonl_path)
base_stem, _ = os.path.splitext(base_name)
prefix = f"{base_stem}-"
active_name = os.path.basename(active_path)
if folder == "" or not await aiofiles.os.path.exists(folder):
return
for name in os.listdir(folder):
if name == active_name:
continue
if not name.startswith(prefix):
continue
if not name.endswith(".jsonl"):
continue
self._gzip_file(os.path.join(folder, name))
async def _rotate_if_needed(self, active_path:str, game_board:GameBoard):
if self.dataset_max_bytes <= 0:
return
if not await aiofiles.os.path.exists(active_path):
return
file_size = (await aiofiles.os.stat(active_path)).st_size
if file_size < self.dataset_max_bytes:
return
timestamp = game_board.now_date.strftime("%Y%m%d-%H%M%S")
rotated_path = f"{active_path}.{timestamp}.jsonl"
suffix = 1
while await aiofiles.os.path.exists(rotated_path):
suffix += 1
rotated_path = f"{active_path}.{timestamp}.{suffix}.jsonl"
await aiofiles.os.rename(active_path, rotated_path)
if self.dataset_compress_rotated:
self._gzip_file(rotated_path)
def _build_dataset_rows(self, dataset_payload:dict, game_board:GameBoard):
game_info = dataset_payload.get("game", {})
snake_info = dataset_payload.get("snake", {})
rows = []
for sample in dataset_payload.get("samples", []):
rows.append({
"game_id": game_info.get("id", game_board.id),
"game_map": game_info.get("map", game_board.map),
"game_type": game_info.get("type", game_board.get_type_of_game()),
"snake_type": snake_info.get(
"type", game_board.snake_class.__class__.__name__
),
"turn": sample.get("turn"),
"move": sample.get("move"),
"is_good_move": sample.get("is_good_move", False),
"game_board": sample.get("game_board"),
"history": sample.get("history"),
})
return rows
async def _append_dataset_jsonl(self, dataset_payload:dict, game_board:GameBoard):
rows = self._build_dataset_rows(dataset_payload, game_board)
if len(rows) == 0:
return
active_path = self._get_active_dataset_path(game_board)
await aiofiles.os.makedirs(os.path.dirname(active_path), exist_ok=True)
await self._compress_old_daily_files(active_path)
await self._rotate_if_needed(active_path, game_board)
async with aiofiles.open(active_path, "a") as f:
for row in rows:
await f.write(json.dumps(row, ensure_ascii=False) + "\n")
def _get_correct_folder_for_save_file(self, game_board:GameBoard, file_name:str, game_type:str, leader_board:bool, winner:bool):
storage_folder = self.file_path
@@ -19,7 +123,12 @@ class LocalStorage:
storage_folder = os.path.join(storage_folder, "00_Leaderboards")
storage_folder = os.path.join(storage_folder, self.save_folder_dict[game_type])
storage_folder = os.path.join(storage_folder, game_board.now_date.strftime('%Y'), game_board.now_date.strftime('%m_%B'), game_board.now_date.strftime('%d'))
storage_folder = os.path.join(
storage_folder,
game_board.now_date.strftime("%Y"),
game_board.now_date.strftime("%m_%B"),
game_board.now_date.strftime("%d"),
)
if winner:
storage_folder = os.path.join(storage_folder, "Winner")
@@ -30,6 +139,12 @@ class LocalStorage:
async def save(self, game_board:GameBoard):
game_type = game_board.get_type_of_game()
dataset = Dataset(game_board).build(only_good_moves=True)
await self._append_dataset_jsonl(dataset, game_board)
if self.dataset_only:
return
save_file_path = self._get_correct_folder_for_save_file(
game_board,
f"{game_board.snake_class.__class__.__name__}_{game_board.now_date.strftime('%H-%M-%S')}_{game_board.id}.json",
@@ -39,21 +154,26 @@ class LocalStorage:
)
await save_file(save_file_path, {
"winner": game_board.winner_snake_names,
"game": {
"url": game_board.url,
"id": game_board.id,
"final_turns": game_board.turn,
"map": game_board.map,
"type": game_type,
"ruleset": game_board.ruleset,
"winner": game_board.winner_snake_names,
"game": {
"url": game_board.url,
"id": game_board.id,
"final_turns": game_board.turn,
"map": game_board.map,
"type": game_type,
"ruleset": game_board.ruleset,
},
"moves": game_board.turns,
"snake": {
"type": game_board.snake_class.__class__.__name__,
"calculations": game_board.snake_class.get_history(),
},
"dataset": dataset,
},
"moves": game_board.turns,
"snake": {
"type": game_board.snake_class.__class__.__name__,
"calculations": game_board.snake_class.get_history(),
},
}, callback=json.dump, indent=2, ensure_ascii=False)
callback=json.dump,
indent=2,
ensure_ascii=False,
)
def cleanup(self):
pass
+885
View File
@@ -0,0 +1,885 @@
from collections import deque
from typing import Any, cast
import random
import os
from snakes.TemplateSnake import TemplateSnake
class BestBattleSnake(TemplateSnake):
DIRECTIONS = {
"up": (0, 1),
"down": (0, -1),
"left": (-1, 0),
"right": (1, 0),
}
OPPOSITE = {
"up": "down",
"down": "up",
"left": "right",
"right": "left",
}
def __init__(self):
super().__init__()
self.name = "BestBattleSnake"
self.recent_heads = deque(maxlen=14)
self.last_move = None
self.last_game_id = None
self.duel_style = self._get_duel_style()
def _get_duel_style(self):
value = os.getenv("BATTLE_SNAKE_DUEL_STYLE")
if value is None:
value = os.getenv("DUEL_STYLE", "balanced")
style = value.strip().lower()
if style not in {"safe", "balanced", "aggressive"}:
return "balanced"
return style
def _duel_weights(self, style):
if style == "safe":
return {
"head_pressure": 0.65,
"distance_safety": 1.30,
"food_bias": 1.00,
}
if style == "aggressive":
return {
"head_pressure": 1.35,
"distance_safety": 0.75,
"food_bias": 0.85,
}
return {
"head_pressure": 1.00,
"distance_safety": 1.00,
"food_bias": 1.00,
}
def choose_move(self, game_data):
self.game_board = game_data
self.calculations = []
self.duel_style = self._get_duel_style()
game_id = getattr(game_data, "id", None)
turn = game_data.get_turn()
if game_id != self.last_game_id or turn <= 1:
self.recent_heads.clear()
self.last_move = None
self.last_game_id = game_id
my_snake = cast(dict[str, Any], game_data.get_my_snake())
my_head = my_snake["head"]
my_body = my_snake["body"]
my_len = my_snake.get("length", len(my_body))
my_health = my_snake.get("health", 100)
width = game_data.get_width()
height = game_data.get_height()
board_area = max(1, width * height)
occupancy_ratio = my_len / board_area
preserve_space_mode = occupancy_ratio >= 0.34 and my_health > 35
foods = game_data.get_food()
hazards = game_data.get_hazard()
other_snakes = game_data.get_other_snakes()
is_constrictor = game_data.get_type() == "constrictor"
food_set = {(food["x"], food["y"]) for food in foods}
hazard_set = {(hazard["x"], hazard["y"]) for hazard in hazards}
current_head_point = (my_head["x"], my_head["y"])
safe_moves = self._legal_moves(
my_head=my_head,
my_body=my_body,
other_snakes=other_snakes,
food_set=food_set,
is_constrictor=is_constrictor,
width=width,
height=height,
)
if not safe_moves:
fallback = self._fallback_move(my_head, width, height)
self.recent_heads.append(current_head_point)
self.last_move = fallback
self.add_to_history(
{
"turn": turn,
"move": fallback,
"reason": "no_safe_moves",
}
)
return fallback
enemy_attack_map = self._build_enemy_attack_map(
my_snake=my_snake,
other_snakes=other_snakes,
food_set=food_set,
is_constrictor=is_constrictor,
width=width,
height=height,
)
if is_constrictor:
best_move, scores = self._choose_constrictor_move(
safe_moves=safe_moves,
my_body=my_body,
my_len=my_len,
other_snakes=other_snakes,
food_set=food_set,
enemy_attack_map=enemy_attack_map,
width=width,
height=height,
)
self.recent_heads.append(current_head_point)
self.last_move = best_move
self.add_to_history({"turn": turn, "move": best_move, "scores": scores})
return best_move
if len(other_snakes) == 1:
best_move, scores = self._choose_duel_move(
safe_moves=safe_moves,
my_body=my_body,
my_len=my_len,
my_health=my_health,
foods=foods,
food_set=food_set,
hazards=hazards,
hazard_set=hazard_set,
other_snakes=other_snakes,
enemy_attack_map=enemy_attack_map,
width=width,
height=height,
)
self.recent_heads.append(current_head_point)
self.last_move = best_move
self.add_to_history({"turn": turn, "move": best_move, "scores": scores})
return best_move
scores: dict[str, float] = {}
move_safety: dict[str, dict[str, Any]] = {}
for move, pos in safe_moves.items():
point = (pos["x"], pos["y"])
ate_food = point in food_set
future_body = self._future_body(my_body, pos, ate_food, is_constrictor)
blocked = self._simulation_blocked(
future_body=future_body,
other_snakes=other_snakes,
food_set=food_set,
is_constrictor=is_constrictor,
)
blocked.discard(point)
reachable_space = self._flood_fill_count(point, blocked, width, height)
required_space = len(future_body)
liberties = self._open_neighbor_count(point, blocked, width, height)
next_options = self._next_turn_option_count(
future_body, blocked, width, height
)
territory = self._territory_control_score(
my_start=point,
enemy_starts=[
(snake["head"]["x"], snake["head"]["y"]) for snake in other_snakes
],
blocked=blocked,
width=width,
height=height,
)
nearest_food_dist = self._nearest_food_distance(
point, foods, blocked, width, height
)
future_tail = future_body[-1]
tail_point = (future_tail["x"], future_tail["y"])
tail_dist = self._path_distance(
point, tail_point, blocked - {tail_point}, width, height
)
has_tail_escape = tail_dist is not None
likely_dead_end = (
(reachable_space < required_space and not has_tail_escape)
or (liberties == 0 and not has_tail_escape)
or (next_options == 0 and not has_tail_escape)
)
score = 0.0
score += reachable_space * 2.6
score += liberties * 18.0
score += next_options * 10.0
score += territory * 0.35
if reachable_space < required_space:
score -= 1200.0
if liberties == 0:
score -= 900.0
enemy_len = enemy_attack_map.get(point)
if enemy_len is not None:
if enemy_len >= my_len:
score -= 1200.0
else:
score += 70.0
hunger_urgency = max(0.0, (60.0 - my_health) / 60.0)
if nearest_food_dist is not None:
score += (28.0 + 70.0 * hunger_urgency) / (nearest_food_dist + 1)
elif my_health < 30:
score -= 150.0
if ate_food:
if likely_dead_end:
score -= 1800.0
else:
score += 260.0 + 220.0 * hunger_urgency
if preserve_space_mode and ate_food and my_health > 45:
score -= 280.0
if tail_dist is not None:
score += 12.0 / (tail_dist + 1)
else:
score -= 40.0
if point in hazard_set:
score -= 70.0 if my_health > 35 else 250.0
score -= self._revisit_penalty(point)
if self.last_move == move:
score += 6.0
elif (
self.last_move
and self.OPPOSITE[self.last_move] == move
and len(safe_moves) > 1
):
score -= 20.0
health_after_move = 100 if ate_food else my_health - 1
if point in hazard_set:
health_after_move -= 15
if health_after_move <= 0:
score -= 10000.0
is_losing_head_to_head = enemy_len is not None and enemy_len >= my_len
is_dead_end = likely_dead_end
move_safety[move] = {
"is_survivable": (not is_dead_end)
and (not is_losing_head_to_head)
and health_after_move > 0,
"reachable_space": reachable_space,
"next_options": next_options,
"tail_escape": has_tail_escape,
}
scores[move] = round(score, 5)
survivable_moves = [
move for move, data in move_safety.items() if data["is_survivable"]
]
if survivable_moves:
best_space = max(
move_safety[move]["reachable_space"] for move in survivable_moves
)
roomy_moves = [
move
for move in survivable_moves
if move_safety[move]["reachable_space"]
>= max(1, int(best_space * 0.60))
]
tail_escape_moves = [
move for move in survivable_moves if move_safety[move]["tail_escape"]
]
if tail_escape_moves:
considered_moves = tail_escape_moves
else:
considered_moves = roomy_moves if roomy_moves else survivable_moves
else:
considered_moves = list(scores.keys())
best_score = max(scores[move] for move in considered_moves)
top_moves = [
move for move in considered_moves if best_score - scores[move] <= 1.5
]
best_move = random.choice(top_moves)
self.recent_heads.append(current_head_point)
self.last_move = best_move
self.add_to_history({"turn": turn, "move": best_move, "scores": scores})
return best_move
def _choose_duel_move(
self,
safe_moves,
my_body,
my_len,
my_health,
foods,
food_set,
hazards,
hazard_set,
other_snakes,
enemy_attack_map,
width,
height,
):
duel_weights = self._duel_weights(self.duel_style)
enemy = other_snakes[0]
enemy_head = (enemy["head"]["x"], enemy["head"]["y"])
enemy_len = enemy.get("length", len(enemy["body"]))
scores: dict[str, float] = {}
move_safety: dict[str, dict[str, Any]] = {}
for move, pos in safe_moves.items():
point = (pos["x"], pos["y"])
ate_food = point in food_set
future_body = self._future_body(
my_body, pos, ate_food, is_constrictor=False
)
blocked = self._simulation_blocked(
future_body=future_body,
other_snakes=other_snakes,
food_set=food_set,
is_constrictor=False,
)
blocked.discard(point)
reachable_space = self._flood_fill_count(point, blocked, width, height)
required_space = len(future_body)
liberties = self._open_neighbor_count(point, blocked, width, height)
next_options = self._next_turn_option_count(
future_body, blocked, width, height
)
nearest_food_dist = self._nearest_food_distance(
point, foods, blocked, width, height
)
future_tail = future_body[-1]
tail_point = (future_tail["x"], future_tail["y"])
tail_dist = self._path_distance(
point, tail_point, blocked - {tail_point}, width, height
)
territory = self._territory_control_score(
my_start=point,
enemy_starts=[enemy_head],
blocked=blocked,
width=width,
height=height,
)
has_tail_escape = tail_dist is not None
likely_dead_end = (
(reachable_space < required_space and not has_tail_escape)
or (liberties == 0 and not has_tail_escape)
or (next_options == 0 and not has_tail_escape)
)
enemy_attack_len = enemy_attack_map.get(point)
losing_head_to_head = (
enemy_attack_len is not None and enemy_attack_len >= my_len
)
direct_head_distance = self._manhattan(point, enemy_head)
score = 0.0
score += reachable_space * 2.8
score += liberties * 18.0
score += next_options * 10.0
score += territory * 0.50
if likely_dead_end:
score -= 1400.0
if losing_head_to_head:
score -= 1500.0
if my_len > enemy_len:
if direct_head_distance == 1:
score += 220.0 * duel_weights["head_pressure"]
elif direct_head_distance == 2:
score += 80.0 * duel_weights["head_pressure"]
else:
if direct_head_distance <= 2:
score -= 120.0 * duel_weights["distance_safety"]
hunger_urgency = max(0.0, (65.0 - my_health) / 65.0)
if nearest_food_dist is not None:
score += (
(25.0 + 90.0 * hunger_urgency) * duel_weights["food_bias"]
) / (nearest_food_dist + 1)
if ate_food:
if likely_dead_end:
score -= 1700.0
else:
score += 260.0 + 250.0 * hunger_urgency
if tail_dist is not None:
score += 14.0 / (tail_dist + 1)
else:
score -= 50.0
if point in hazard_set:
score -= 70.0 if my_health > 35 else 250.0
score -= self._revisit_penalty(point)
if self.last_move == move:
score += 6.0
elif (
self.last_move
and self.OPPOSITE[self.last_move] == move
and len(safe_moves) > 1
):
score -= 20.0
health_after_move = 100 if ate_food else my_health - 1
if point in hazard_set:
health_after_move -= 15
if health_after_move <= 0:
score -= 10000.0
move_safety[move] = {
"is_survivable": (not likely_dead_end)
and (not losing_head_to_head)
and health_after_move > 0,
"reachable_space": reachable_space,
"tail_escape": has_tail_escape,
}
scores[move] = round(score, 5)
survivable_moves = [
move for move, data in move_safety.items() if data["is_survivable"]
]
if survivable_moves:
tail_escape_moves = [
move for move in survivable_moves if move_safety[move]["tail_escape"]
]
if tail_escape_moves:
considered_moves = tail_escape_moves
else:
best_space = max(
move_safety[move]["reachable_space"] for move in survivable_moves
)
considered_moves = [
move
for move in survivable_moves
if move_safety[move]["reachable_space"]
>= max(1, int(best_space * 0.60))
]
if not considered_moves:
considered_moves = survivable_moves
else:
considered_moves = list(scores.keys())
best_score = max(scores[move] for move in considered_moves)
top_moves = [
move for move in considered_moves if best_score - scores[move] <= 1.5
]
return random.choice(top_moves), scores
def _choose_constrictor_move(
self,
safe_moves,
my_body,
my_len,
other_snakes,
food_set,
enemy_attack_map,
width,
height,
):
scores: dict[str, float] = {}
move_safety: dict[str, dict[str, Any]] = {}
for move, pos in safe_moves.items():
point = (pos["x"], pos["y"])
future_body = self._future_body(
my_body, pos, ate_food=False, is_constrictor=True
)
blocked = self._simulation_blocked(
future_body=future_body,
other_snakes=other_snakes,
food_set=food_set,
is_constrictor=True,
)
blocked.discard(point)
reachable_space = self._flood_fill_count(point, blocked, width, height)
required_space = len(future_body) + 1
liberties = self._open_neighbor_count(point, blocked, width, height)
next_options = self._next_turn_option_count(
future_body, blocked, width, height
)
territory = self._territory_control_score(
my_start=point,
enemy_starts=[
(snake["head"]["x"], snake["head"]["y"]) for snake in other_snakes
],
blocked=blocked,
width=width,
height=height,
)
enemy_len = enemy_attack_map.get(point)
is_losing_head_to_head = enemy_len is not None and enemy_len >= my_len
is_dead_end = (
reachable_space < required_space or liberties == 0 or next_options == 0
)
score = 0.0
score += reachable_space * 3.8
score += liberties * 24.0
score += next_options * 16.0
score += territory * 0.65
if is_dead_end:
score -= 2600.0
if is_losing_head_to_head:
score -= 2400.0
elif enemy_len is not None:
score += 90.0
score -= self._revisit_penalty(point)
if self.last_move == move:
score += 10.0
elif (
self.last_move
and self.OPPOSITE[self.last_move] == move
and len(safe_moves) > 1
):
score -= 35.0
move_safety[move] = {
"is_survivable": (not is_dead_end) and (not is_losing_head_to_head),
"reachable_space": reachable_space,
}
scores[move] = round(score, 5)
survivable_moves = [
move for move, data in move_safety.items() if data["is_survivable"]
]
if survivable_moves:
best_space = max(
move_safety[move]["reachable_space"] for move in survivable_moves
)
considered_moves = [
move
for move in survivable_moves
if move_safety[move]["reachable_space"]
>= max(1, int(best_space * 0.70))
]
if not considered_moves:
considered_moves = survivable_moves
else:
considered_moves = list(scores.keys())
best_score = max(scores[move] for move in considered_moves)
top_moves = [
move for move in considered_moves if best_score - scores[move] <= 2.0
]
return random.choice(top_moves), scores
def _legal_moves(
self, my_head, my_body, other_snakes, food_set, is_constrictor, width, height
):
occupied = self._occupied_cells(my_body, other_snakes)
own_tail = (my_body[-1]["x"], my_body[-1]["y"])
own_tail_stacked = self._is_tail_stacked(my_body)
safe_moves = {}
for move, pos in self.get_possible_moves(my_head).items():
point = (pos["x"], pos["y"])
if not self._in_bounds(point, width, height):
continue
ate_food = point in food_set
can_step_on_tail = self._can_step_on_own_tail(
point=point,
own_tail=own_tail,
own_tail_is_stacked=own_tail_stacked,
ate_food=ate_food,
is_constrictor=is_constrictor,
)
if point in occupied and not can_step_on_tail:
continue
safe_moves[move] = pos
return safe_moves
def _occupied_cells(self, my_body, other_snakes):
occupied = {(segment["x"], segment["y"]) for segment in my_body}
for snake in other_snakes:
occupied |= {(segment["x"], segment["y"]) for segment in snake["body"]}
return occupied
def _simulation_blocked(self, future_body, other_snakes, food_set, is_constrictor):
blocked = {(segment["x"], segment["y"]) for segment in future_body}
if not is_constrictor and not self._is_tail_stacked(future_body):
my_tail = future_body[-1]
blocked.discard((my_tail["x"], my_tail["y"]))
for snake in other_snakes:
for segment in snake["body"]:
blocked.add((segment["x"], segment["y"]))
if is_constrictor:
continue
if self._enemy_can_grow_this_turn(snake, food_set):
continue
if self._is_tail_stacked(snake["body"]):
continue
enemy_tail = snake["body"][-1]
blocked.discard((enemy_tail["x"], enemy_tail["y"]))
return blocked
def _build_enemy_attack_map(
self, my_snake, other_snakes, food_set, is_constrictor, width, height
):
occupied = self._occupied_cells(my_snake["body"], other_snakes)
my_id = my_snake["id"]
attack_map = {}
for enemy in other_snakes:
enemy_len = enemy.get("length", len(enemy["body"]))
enemy_tail = (enemy["body"][-1]["x"], enemy["body"][-1]["y"])
enemy_tail_stacked = self._is_tail_stacked(enemy["body"])
for pos in self.get_possible_moves(enemy["head"]).values():
point = (pos["x"], pos["y"])
if not self._in_bounds(point, width, height):
continue
can_step_on_enemy_tail = (
not is_constrictor
and point == enemy_tail
and not enemy_tail_stacked
and not self._enemy_can_grow_this_turn(enemy, food_set)
)
if point in occupied and not can_step_on_enemy_tail:
continue
# Do not consider impossible overlap directly into my own occupied body except head swap possibilities.
if point in {
(segment["x"], segment["y"])
for segment in my_snake["body"]
if my_snake["id"] == my_id
}:
continue
previous = attack_map.get(point)
if previous is None or enemy_len > previous:
attack_map[point] = enemy_len
return attack_map
def _future_body(self, current_body, next_head, ate_food, is_constrictor):
next_body = [next_head]
next_body.extend(current_body)
if is_constrictor or ate_food:
return next_body
next_body.pop()
return next_body
def _can_step_on_own_tail(
self, point, own_tail, own_tail_is_stacked, ate_food, is_constrictor
):
if is_constrictor:
return False
if ate_food:
return False
if own_tail_is_stacked:
return False
return point == own_tail
def _is_tail_stacked(self, body):
if len(body) < 2:
return False
return body[-1]["x"] == body[-2]["x"] and body[-1]["y"] == body[-2]["y"]
def _enemy_can_grow_this_turn(self, snake, food_set):
head = snake["head"]
for dx, dy in self.DIRECTIONS.values():
if (head["x"] + dx, head["y"] + dy) in food_set:
return True
return False
def _nearest_food_distance(self, start, foods, blocked, width, height):
if not foods:
return None
targets = {(food["x"], food["y"]) for food in foods}
queue = deque([(start, 0)])
seen = {start}
while queue:
point, distance = queue.popleft()
if point in targets:
return distance
for neighbor in self._neighbors(point):
if neighbor in seen:
continue
if not self._in_bounds(neighbor, width, height):
continue
if neighbor in blocked and neighbor not in targets:
continue
seen.add(neighbor)
queue.append((neighbor, distance + 1))
return None
def _path_distance(self, start, goal, blocked, width, height):
queue = deque([(start, 0)])
seen = {start}
while queue:
point, distance = queue.popleft()
if point == goal:
return distance
for neighbor in self._neighbors(point):
if neighbor in seen:
continue
if not self._in_bounds(neighbor, width, height):
continue
if neighbor in blocked and neighbor != goal:
continue
seen.add(neighbor)
queue.append((neighbor, distance + 1))
return None
def _flood_fill_count(self, start, blocked, width, height):
queue = deque([start])
seen = {start}
while queue:
point = queue.popleft()
for neighbor in self._neighbors(point):
if neighbor in seen:
continue
if not self._in_bounds(neighbor, width, height):
continue
if neighbor in blocked:
continue
seen.add(neighbor)
queue.append(neighbor)
return len(seen)
def _open_neighbor_count(self, start, blocked, width, height):
count = 0
for neighbor in self._neighbors(start):
if not self._in_bounds(neighbor, width, height):
continue
if neighbor in blocked:
continue
count += 1
return count
def _next_turn_option_count(self, future_body, blocked, width, height):
if not future_body:
return 0
next_head = future_body[0]
count = 0
for pos in self.get_possible_moves(next_head).values():
point = (pos["x"], pos["y"])
if not self._in_bounds(point, width, height):
continue
if point in blocked:
continue
count += 1
return count
def _revisit_penalty(self, point):
penalty = 0.0
for index, old_point in enumerate(reversed(self.recent_heads), start=1):
if old_point != point:
continue
penalty += max(0.0, 18.0 - index * 2.0)
return penalty
def _territory_control_score(self, my_start, enemy_starts, blocked, width, height):
if not enemy_starts:
return 0
my_distances = self._distance_map(my_start, blocked, width, height)
enemy_maps = [
self._distance_map(start, blocked, width, height) for start in enemy_starts
]
score = 0
for x in range(width):
for y in range(height):
point = (x, y)
if point in blocked:
continue
my_distance = my_distances.get(point)
if my_distance is None:
continue
enemy_best = None
for enemy_map in enemy_maps:
enemy_distance = enemy_map.get(point)
if enemy_distance is None:
continue
if enemy_best is None or enemy_distance < enemy_best:
enemy_best = enemy_distance
if enemy_best is None or my_distance < enemy_best:
score += 1
elif enemy_best < my_distance:
score -= 1
return score
def _distance_map(self, start, blocked, width, height):
queue = deque([(start, 0)])
distances = {start: 0}
while queue:
point, distance = queue.popleft()
for neighbor in self._neighbors(point):
if neighbor in distances:
continue
if not self._in_bounds(neighbor, width, height):
continue
if neighbor in blocked:
continue
distances[neighbor] = distance + 1
queue.append((neighbor, distance + 1))
return distances
def _neighbors(self, point):
for dx, dy in self.DIRECTIONS.values():
yield (point[0] + dx, point[1] + dy)
def _manhattan(self, a, b):
return abs(a[0] - b[0]) + abs(a[1] - b[1])
def _in_bounds(self, point, width, height):
return 0 <= point[0] < width and 0 <= point[1] < height
def _fallback_move(self, head, width, height):
for move, pos in self.get_possible_moves(head).items():
point = (pos["x"], pos["y"])
if self._in_bounds(point, width, height):
return move
return "up"
-7
View File
@@ -1,7 +0,0 @@
BATTLESNAKE_CLI=battlesnake_cli_1.2.3_Linux_x86_64/battlesnake
if [ -z $1 ]; then
$BATTLESNAKE_CLI play -W 11 -H 11 --name 'Python Starter Project' --url http://localhost:8000 -g solo --browser --seed 1713099635738952360
else
$BATTLESNAKE_CLI play -W 11 -H 11 --name 'Python Starter Project' --url http://localhost:8000 -g constrictor --browser --minimumFood 0
fi
+398
View File
@@ -0,0 +1,398 @@
import unittest
from snakes.BestBattleSnake import BestBattleSnake
from server.GameBoard import GameBoard
def make_board(game_state):
board = GameBoard(
game_id=game_state["game"]["id"],
width=game_state["board"]["width"],
height=game_state["board"]["height"],
ruleset=game_state["game"]["ruleset"],
source=game_state["game"]["source"],
map=game_state["game"]["map"],
snake_class=BestBattleSnake(),
)
board.read_game_data(game_state)
return board
class TestBestBattleSnake(unittest.TestCase):
def test_avoids_walls_and_body(self):
game_state = {
"game": {
"id": "test-wall-body",
"ruleset": {"name": "standard", "version": "v1.0.0"},
"source": "custom",
"map": "standard",
},
"turn": 20,
"board": {
"height": 7,
"width": 7,
"food": [{"x": 5, "y": 5}],
"hazards": [],
"snakes": [
{
"id": "me",
"name": "me",
"health": 90,
"length": 4,
"head": {"x": 0, "y": 0},
"body": [
{"x": 0, "y": 0},
{"x": 0, "y": 1},
{"x": 1, "y": 1},
{"x": 1, "y": 0},
],
}
],
},
"you": {
"id": "me",
"name": "me",
"health": 90,
"length": 4,
"head": {"x": 0, "y": 0},
"body": [
{"x": 0, "y": 0},
{"x": 0, "y": 1},
{"x": 1, "y": 1},
{"x": 1, "y": 0},
],
},
}
move = make_board(game_state).snake_neat_make_a_move()
self.assertEqual(move, "right")
def test_prioritizes_food_when_low_health(self):
game_state = {
"game": {
"id": "test-food-low-health",
"ruleset": {"name": "standard", "version": "v1.0.0"},
"source": "custom",
"map": "standard",
},
"turn": 32,
"board": {
"height": 11,
"width": 11,
"food": [{"x": 6, "y": 5}],
"hazards": [],
"snakes": [
{
"id": "me",
"name": "me",
"health": 10,
"length": 3,
"head": {"x": 5, "y": 5},
"body": [
{"x": 5, "y": 5},
{"x": 5, "y": 4},
{"x": 5, "y": 3},
],
}
],
},
"you": {
"id": "me",
"name": "me",
"health": 10,
"length": 3,
"head": {"x": 5, "y": 5},
"body": [
{"x": 5, "y": 5},
{"x": 5, "y": 4},
{"x": 5, "y": 3},
],
},
}
move = make_board(game_state).snake_neat_make_a_move()
self.assertEqual(move, "right")
def test_prioritizes_food_when_safe(self):
game_state = {
"game": {
"id": "test-food-safe",
"ruleset": {"name": "standard", "version": "v1.0.0"},
"source": "custom",
"map": "standard",
},
"turn": 8,
"board": {
"height": 11,
"width": 11,
"food": [{"x": 6, "y": 5}],
"hazards": [],
"snakes": [
{
"id": "me",
"name": "me",
"health": 95,
"length": 3,
"head": {"x": 5, "y": 5},
"body": [
{"x": 5, "y": 5},
{"x": 5, "y": 4},
{"x": 5, "y": 3},
],
}
],
},
"you": {
"id": "me",
"name": "me",
"health": 95,
"length": 3,
"head": {"x": 5, "y": 5},
"body": [
{"x": 5, "y": 5},
{"x": 5, "y": 4},
{"x": 5, "y": 3},
],
},
}
move = make_board(game_state).snake_neat_make_a_move()
self.assertEqual(move, "right")
def test_avoids_losing_head_to_head(self):
game_state = {
"game": {
"id": "test-head-to-head",
"ruleset": {"name": "standard", "version": "v1.0.0"},
"source": "custom",
"map": "standard",
},
"turn": 44,
"board": {
"height": 11,
"width": 11,
"food": [{"x": 1, "y": 1}],
"hazards": [],
"snakes": [
{
"id": "me",
"name": "me",
"health": 90,
"length": 3,
"head": {"x": 5, "y": 5},
"body": [
{"x": 5, "y": 5},
{"x": 5, "y": 4},
{"x": 5, "y": 3},
],
},
{
"id": "enemy",
"name": "enemy",
"health": 90,
"length": 6,
"head": {"x": 7, "y": 5},
"body": [
{"x": 7, "y": 5},
{"x": 7, "y": 4},
{"x": 7, "y": 3},
{"x": 7, "y": 2},
{"x": 7, "y": 1},
{"x": 6, "y": 1},
],
},
],
},
"you": {
"id": "me",
"name": "me",
"health": 90,
"length": 3,
"head": {"x": 5, "y": 5},
"body": [
{"x": 5, "y": 5},
{"x": 5, "y": 4},
{"x": 5, "y": 3},
],
},
}
move = make_board(game_state).snake_neat_make_a_move()
self.assertNotEqual(move, "right")
def test_does_not_step_into_stacked_tail(self):
game_state = {
"game": {
"id": "test-stacked-tail",
"ruleset": {"name": "standard", "version": "v1.0.0"},
"source": "custom",
"map": "standard",
},
"turn": 15,
"board": {
"height": 11,
"width": 11,
"food": [{"x": 10, "y": 10}],
"hazards": [],
"snakes": [
{
"id": "me",
"name": "me",
"health": 90,
"length": 5,
"head": {"x": 5, "y": 5},
"body": [
{"x": 5, "y": 5},
{"x": 5, "y": 4},
{"x": 4, "y": 4},
{"x": 4, "y": 5},
{"x": 4, "y": 5},
],
}
],
},
"you": {
"id": "me",
"name": "me",
"health": 90,
"length": 5,
"head": {"x": 5, "y": 5},
"body": [
{"x": 5, "y": 5},
{"x": 5, "y": 4},
{"x": 4, "y": 4},
{"x": 4, "y": 5},
{"x": 4, "y": 5},
],
},
}
move = make_board(game_state).snake_neat_make_a_move()
self.assertNotEqual(move, "left")
def test_avoids_food_if_it_is_a_dead_end(self):
game_state = {
"game": {
"id": "test-food-dead-end",
"ruleset": {"name": "standard", "version": "v1.0.0"},
"source": "custom",
"map": "standard",
},
"turn": 30,
"board": {
"height": 7,
"width": 7,
"food": [{"x": 3, "y": 4}],
"hazards": [],
"snakes": [
{
"id": "me",
"name": "me",
"health": 70,
"length": 3,
"head": {"x": 3, "y": 3},
"body": [
{"x": 3, "y": 3},
{"x": 3, "y": 2},
{"x": 3, "y": 1},
],
},
{
"id": "enemy",
"name": "enemy",
"health": 90,
"length": 5,
"head": {"x": 2, "y": 4},
"body": [
{"x": 2, "y": 4},
{"x": 2, "y": 5},
{"x": 3, "y": 5},
{"x": 4, "y": 5},
{"x": 4, "y": 4},
],
},
],
},
"you": {
"id": "me",
"name": "me",
"health": 70,
"length": 3,
"head": {"x": 3, "y": 3},
"body": [
{"x": 3, "y": 3},
{"x": 3, "y": 2},
{"x": 3, "y": 1},
],
},
}
move = make_board(game_state).snake_neat_make_a_move()
self.assertNotEqual(move, "up")
def test_constrictor_avoids_growth_dead_end(self):
game_state = {
"game": {
"id": "test-constrictor-dead-end",
"ruleset": {"name": "constrictor", "version": "v1.0.0"},
"source": "custom",
"map": "standard",
},
"turn": 12,
"board": {
"height": 7,
"width": 7,
"food": [],
"hazards": [],
"snakes": [
{
"id": "me",
"name": "me",
"health": 100,
"length": 4,
"head": {"x": 1, "y": 1},
"body": [
{"x": 1, "y": 1},
{"x": 1, "y": 0},
{"x": 0, "y": 0},
{"x": 0, "y": 1},
],
},
{
"id": "enemy",
"name": "enemy",
"health": 100,
"length": 8,
"head": {"x": 4, "y": 4},
"body": [
{"x": 4, "y": 4},
{"x": 3, "y": 4},
{"x": 3, "y": 3},
{"x": 2, "y": 3},
{"x": 2, "y": 2},
{"x": 2, "y": 0},
{"x": 2, "y": 2},
{"x": 3, "y": 1},
],
},
],
},
"you": {
"id": "me",
"name": "me",
"health": 100,
"length": 4,
"head": {"x": 1, "y": 1},
"body": [
{"x": 1, "y": 1},
{"x": 1, "y": 0},
{"x": 0, "y": 0},
{"x": 0, "y": 1},
],
},
}
move = make_board(game_state).snake_neat_make_a_move()
self.assertEqual(move, "up")
if __name__ == "__main__":
unittest.main()
+52
View File
@@ -0,0 +1,52 @@
import unittest
from typing import cast
from server.Dataset import Dataset
from server.GameBoard import GameBoard
class DummySnake:
def get_history(self):
return [
{"turn": 1, "data": [{"score": 1}]},
{"turn": 2, "data": [{"score": 2}]},
]
class DummyGameBoard:
def __init__(self, winners):
self.id = "game-1"
self.map = "standard"
self.winner_snake_names = winners
self.snake_class = DummySnake()
self.turns = [
{"turn": 1, "move": "up", "game_board": {"width": 11, "height": 11}},
{"turn": 2, "move": "left", "game_board": {"width": 11, "height": 11}},
]
def get_type_of_game(self):
return {"name": "standard", "is_ladder": False}
class TestDataset(unittest.TestCase):
def test_build_only_good_moves_for_wins(self):
dataset = Dataset(cast(GameBoard, DummyGameBoard(["me"])))
payload = dataset.build(only_good_moves=True)
self.assertTrue(payload["did_win"])
self.assertEqual(payload["total_samples"], 2)
self.assertTrue(all(sample["is_good_move"] for sample in payload["samples"]))
def test_build_returns_no_samples_for_losses_when_only_good(self):
dataset = Dataset(cast(GameBoard, DummyGameBoard(["enemy"])))
payload = dataset.build(only_good_moves=True)
self.assertFalse(payload["did_win"])
self.assertEqual(payload["total_samples"], 0)
def test_labels_by_turn(self):
winner_labels = Dataset(cast(GameBoard, DummyGameBoard(["me"]))).labels_by_turn()
loser_labels = Dataset(cast(GameBoard, DummyGameBoard(["enemy"]))).labels_by_turn()
self.assertEqual(winner_labels, {1: True, 2: True})
self.assertEqual(loser_labels, {1: False, 2: False})
if __name__ == "__main__":
unittest.main()
+47
View File
@@ -0,0 +1,47 @@
import json
import tempfile
import unittest
from pathlib import Path
from server.DatasetExporter import DatasetExporter
class TestDatasetExporter(unittest.TestCase):
def test_export_jsonl(self):
with tempfile.TemporaryDirectory() as tmp:
input_dir = Path(tmp) / "data"
output_file = Path(tmp) / "out" / "dataset.jsonl"
game_file = input_dir / "game-1.json"
game_file.parent.mkdir(parents=True, exist_ok=True)
game_payload = {
"dataset": {
"game": {"id": "g-1", "map": "standard", "type": {"name": "duel"}},
"snake": {"type": "BestBattleSnake"},
"samples": [
{
"turn": 1,
"move": "up",
"is_good_move": True,
"game_board": {"width": 11, "height": 11},
"history": {"data": []},
}
],
}
}
game_file.write_text(json.dumps(game_payload), encoding="utf-8")
report = DatasetExporter(str(input_dir), str(output_file)).export_jsonl()
self.assertEqual(report["games_scanned"], 1)
self.assertEqual(report["samples_exported"], 1)
self.assertTrue(output_file.exists())
lines = output_file.read_text(encoding="utf-8").strip().splitlines()
self.assertEqual(len(lines), 1)
first = json.loads(lines[0])
self.assertEqual(first["game_id"], "g-1")
self.assertEqual(first["move"], "up")
self.assertTrue(first["is_good_move"])
if __name__ == "__main__":
unittest.main()