Files
snake-python/server/TrainBattleSnakeAI.py
daniel156161 9e826afa5f
Build and Push Docker Container / build-and-push (push) Successful in 1m36s
add Training for AI and AI Model and allow to collect rl data from BestBattleSnake
2026-04-03 23:19:09 +02:00

291 lines
8.6 KiB
Python

import argparse, random, glob, json, math
from collections import Counter
from pathlib import Path
# Move labels used throughout this file: rows whose "move" is not listed here
# are skipped, and the model keeps one weight map and one bias per entry.
# Order matters: score dicts are built in this order, so ties between equal
# scores resolve to the earliest entry.
MOVES = ["up", "down", "left", "right"]
def resolve_input_files(inputs: list[str]) -> list[Path]:
    """Expand command-line inputs into a de-duplicated list of existing files.

    Each entry of *inputs* is handled by the first case that applies:

    * an existing directory: every ``*.jsonl`` file below it, recursively;
    * an existing file: taken literally, even when its name contains glob
      metacharacters such as ``[`` or ``]``;
    * a string containing ``*``, ``?``, ``[`` or ``]``: expanded with
      :func:`glob.glob`, where ``**`` matches any depth of directories.

    Entries that match nothing are ignored. Only regular files are returned,
    so a directory that happens to be named ``something.jsonl`` is skipped.

    Args:
        inputs: Paths, directories or glob patterns as given by the user.

    Returns:
        Paths in order of first appearance (directory and glob expansions are
        sorted), with duplicates removed by resolved absolute path.
    """
    resolved: list[Path] = []
    seen: set[str] = set()

    def _add(candidate: Path) -> None:
        # Directories can match "*.jsonl" as well; they cannot be opened later.
        if not candidate.is_file():
            return
        key = str(candidate.resolve())
        if key not in seen:
            seen.add(key)
            resolved.append(candidate)

    for item in inputs:
        path = Path(item)
        if path.is_dir():
            for file_path in sorted(path.rglob("*.jsonl")):
                _add(file_path)
        elif path.is_file():
            # Checked before the wildcard test so that a real file such as
            # "run[1].jsonl" is not misread as a character-class pattern.
            _add(path)
        elif any(ch in item for ch in "*?[]"):
            for match in sorted(glob.glob(item, recursive=True)):
                _add(Path(match))
    return resolved
def _neighbors(x: int, y: int) -> list[tuple[int, int, str]]:
    """Return the four cells adjacent to ``(x, y)`` with the move reaching each.

    The result is ordered up, down, left, right. "up" increases ``y`` and
    "right" increases ``x``. No bounds checking is done here.
    """
    steps = (
        ("up", 0, 1),
        ("down", 0, -1),
        ("left", -1, 0),
        ("right", 1, 0),
    )
    return [(x + dx, y + dy, move) for move, dx, dy in steps]
def _safe_neighbor_count(
    point: tuple[int, int],
    blocked: set[tuple[int, int]],
    width: int,
    height: int,
) -> int:
    """Count the cells adjacent to *point* that are on the board and free.

    A neighbour counts when it lies inside ``0 <= x < width`` and
    ``0 <= y < height`` and is not a member of *blocked*.
    """
    return sum(
        1
        for nx, ny, _ in _neighbors(point[0], point[1])
        if 0 <= nx < width and 0 <= ny < height and (nx, ny) not in blocked
    )
def _manhattan_to_nearest_food(point: tuple[int, int], food: set[tuple[int, int]]) -> int:
    """Return the Manhattan distance from *point* to the closest food cell.

    When *food* is empty the fixed value 25 is returned instead.
    """
    if not food:
        return 25
    px, py = point[0], point[1]
    distances = [abs(px - fx) + abs(py - fy) for fx, fy in food]
    return min(distances)
def extract_feature_values(row: dict) -> dict[str, float]:
    """Convert one recorded game state into a flat ``name -> value`` feature map.

    The first entry of ``row["game_board"]["snakes"]`` is used as the acting
    snake (NOTE(review): assumes the recorder lists the own snake first -
    confirm against the data producer).

    The map contains, in this insertion order:

    * global features: ``bias``, ``health_norm``, ``length_norm``,
      ``turn_norm``, ``food_count_norm``, ``hazard_count_norm``,
      ``opponent_count_norm``;
    * for each move (up, down, left, right) the keys ``m:<move>:in_bounds``,
      ``blocked``, ``food``, ``hazard``, ``open_next`` and ``food_dist``
      describing the cell that move leads to;
    * ``safe_total_norm``: the share of the four moves that stay on the board
      and avoid every snake body segment.

    Returns:
        The feature map, or an empty dict when the board has no snakes or the
        acting snake has no body.
    """
    board = row.get("game_board", {})
    snakes = board.get("snakes", [])
    if not snakes:
        return {}
    me = snakes[0]
    body = me.get("body", [])
    if not body:
        return {}

    def _cell(raw: dict) -> tuple[int, int]:
        # Missing coordinates default to 0.
        return int(raw.get("x", 0)), int(raw.get("y", 0))

    width = int(board.get("width", 0))
    height = int(board.get("height", 0))
    hx, hy = _cell(body[0])
    health = int(me.get("health", 100))
    length = int(me.get("length", len(body)))
    food_set = {_cell(item) for item in board.get("food", [])}
    hazard_set = {_cell(item) for item in board.get("hazards", [])}
    # Every body segment of every snake (including our own) blocks a cell.
    blocked = {_cell(seg) for snake in snakes for seg in snake.get("body", [])}

    features: dict[str, float] = {
        "bias": 1.0,
        "health_norm": max(0.0, min(1.0, health / 100.0)),
        "length_norm": min(1.0, length / max(1.0, width * height)),
        "turn_norm": min(1.0, int(row.get("turn", 0)) / 100.0),
        "food_count_norm": min(1.0, len(food_set) / 10.0),
        "hazard_count_norm": min(1.0, len(hazard_set) / 20.0),
        "opponent_count_norm": min(1.0, max(0, len(snakes) - 1) / 7.0),
    }

    safe_moves = 0
    for nx, ny, move in _neighbors(hx, hy):
        target = (nx, ny)
        inside = 0 <= nx < width and 0 <= ny < height
        occupied = target in blocked
        if inside and not occupied:
            safe_moves += 1
            open_next = float(_safe_neighbor_count(target, blocked, width, height))
            dist_food = float(_manhattan_to_nearest_food(target, food_set))
        else:
            # Unusable cells get the worst value for both look-ahead features.
            open_next, dist_food = 0.0, 25.0
        features.update(
            {
                f"m:{move}:in_bounds": 1.0 if inside else 0.0,
                f"m:{move}:blocked": 1.0 if occupied else 0.0,
                f"m:{move}:food": 1.0 if target in food_set else 0.0,
                f"m:{move}:hazard": 1.0 if target in hazard_set else 0.0,
                f"m:{move}:open_next": min(4.0, open_next) / 4.0,
                f"m:{move}:food_dist": min(25.0, dist_food) / 25.0,
            }
        )

    features["safe_total_norm"] = safe_moves / 4.0
    return features
class SoftmaxMoveModel:
    """Linear softmax classifier that picks one of the moves in ``MOVES``.

    Each move has its own bias and its own sparse ``feature name -> weight``
    map. Features come from ``extract_feature_values``; training is plain
    per-example stochastic gradient ascent on the log-likelihood.
    """

    def __init__(self):
        # One independent weight dict per move; biases start at zero.
        self.weights = {move: {} for move in MOVES}
        self.bias = dict.fromkeys(MOVES, 0.0)

    def _score(self, move: str, features: dict[str, float]) -> float:
        """Return the linear score (bias plus weighted features) of *move*.

        Features without a stored weight contribute zero.
        """
        per_feature = self.weights[move]
        total = self.bias[move]
        # Explicit accumulation keeps the floating-point summation order fixed.
        for name, feat in features.items():
            total += per_feature.get(name, 0.0) * feat
        return total

    def _sgd_step(self, features: dict[str, float], label: str, lr: float, l2: float) -> None:
        """Apply one gradient update for a single ``(features, label)`` example."""
        # All scores are computed before any parameter is touched.
        raw = [self._score(move, features) for move in MOVES]
        peak = max(raw)
        # Subtracting the maximum keeps exp() from overflowing.
        exps = [math.exp(value - peak) for value in raw]
        z = sum(exps)
        for move, exp_value in zip(MOVES, exps):
            gradient = (1.0 if move == label else 0.0) - exp_value / z
            self.bias[move] += lr * gradient
            w = self.weights[move]
            for name, feat in features.items():
                current = w.get(name, 0.0)
                # L2 decay is applied to weights only, never to the bias.
                w[name] = current + lr * ((gradient * feat) - (l2 * current))

    def fit(self, rows: list[dict], epochs: int = 14, lr: float = 0.08, l2: float = 1e-6) -> None:
        """Train on *rows* in place.

        Rows whose ``"move"`` is not in ``MOVES`` or that yield no features
        are ignored; if nothing usable remains the model is left unchanged.

        Args:
            rows: Recorded game states with a ``"move"`` label.
            epochs: Number of passes over the examples; they are reshuffled
                with the global ``random`` generator before every pass.
            lr: Learning rate.
            l2: L2 regularisation strength.
        """
        examples = []
        for row in rows:
            label = row.get("move")
            if label in MOVES:
                features = extract_feature_values(row)
                if features:
                    examples.append((features, label))
        if not examples:
            return

        for _ in range(epochs):
            random.shuffle(examples)
            for features, label in examples:
                self._sgd_step(features, label, lr, l2)

    def predict_scores(self, row: dict) -> dict[str, float]:
        """Return the raw score of every move for *row*.

        All scores are ``0.0`` when no features can be extracted.
        """
        features = extract_feature_values(row)
        if features:
            return {move: self._score(move, features) for move in MOVES}
        return dict.fromkeys(MOVES, 0.0)

    def predict(self, row: dict) -> str:
        """Return the highest-scoring move; ties go to the earliest in ``MOVES``."""
        scores = self.predict_scores(row)
        return max(scores, key=scores.get)

    def evaluate(self, rows: list[dict]) -> dict:
        """Score the model against labelled *rows*.

        Rows whose ``"move"`` is not in ``MOVES`` are skipped.

        Returns:
            A dict with ``total``, ``correct``, ``accuracy`` and
            ``top2_accuracy`` (both rounded to 4 decimals, ``0.0`` when no
            row was counted) and ``confusion``, mapping each expected move to
            a ``{predicted move: count}`` dict.
        """
        confusion = {move: Counter() for move in MOVES}
        total = 0
        correct = 0
        top2 = 0
        for row in rows:
            expected = row.get("move")
            if expected not in MOVES:
                continue
            scores = self.predict_scores(row)
            # Stable sort: equal scores keep their MOVES order.
            ranked = sorted(scores, key=scores.get, reverse=True)
            predicted = ranked[0]
            total += 1
            if predicted == expected:
                correct += 1
            if expected in ranked[:2]:
                top2 += 1
            confusion[expected][predicted] += 1

        accuracy = (correct / total) if total else 0.0
        top2_accuracy = (top2 / total) if total else 0.0
        return {
            "total": total,
            "correct": correct,
            "accuracy": round(accuracy, 4),
            "top2_accuracy": round(top2_accuracy, 4),
            "confusion": {label: dict(confusion[label]) for label in MOVES},
        }

    def to_dict(self) -> dict:
        """Return a JSON-serialisable description of the model.

        The ``weights`` and ``bias`` values are the live internal dicts, not
        copies.
        """
        return {
            "model_type": "softmax_moves_v2",
            "moves": MOVES,
            "weights": self.weights,
            "bias": self.bias,
        }
def read_rows(paths: list[Path]) -> list[dict]:
    """Load usable training rows from JSON Lines files.

    Every non-blank line of every file is parsed as JSON. A row is kept only
    when it is a JSON object, its ``"move"`` is one of ``MOVES`` and its
    ``"game_board"`` is a non-empty object; everything else is skipped.

    Args:
        paths: Files to read, each containing one JSON document per line.

    Returns:
        The accepted rows in file order, then line order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON. The
            message is prefixed with the file path and line number.
        OSError: If a file cannot be opened.
    """
    rows: list[dict] = []
    for path in paths:
        with path.open("r", encoding="utf-8") as handle:
            for line_no, line in enumerate(handle, start=1):
                if not line.strip():
                    continue
                try:
                    row = json.loads(line)
                except json.JSONDecodeError as err:
                    # Same exception type, but say where the bad line is.
                    raise json.JSONDecodeError(
                        f"{path}:{line_no}: {err.msg}", err.doc, err.pos
                    ) from err
                # Valid JSON that is not an object (null, list, number, ...)
                # has no .get() and cannot be a training row.
                if not isinstance(row, dict):
                    continue
                if row.get("move") not in MOVES:
                    continue
                board = row.get("game_board")
                # A non-dict board would crash feature extraction later on.
                if not board or not isinstance(board, dict):
                    continue
                rows.append(row)
    return rows
if __name__ == "__main__":
    # Command-line entry point: collect rows, hold out an evaluation split,
    # train the model, then write model + metrics as JSON and print a summary.
    cli = argparse.ArgumentParser(description="Train Battlesnake move model")
    cli.add_argument("--input", action="append", required=True)
    cli.add_argument("--output", required=True)
    cli.add_argument("--eval-split", type=float, default=0.2)
    cli.add_argument("--seed", type=int, default=42)
    cli.add_argument("--epochs", type=int, default=14)
    cli.add_argument("--lr", type=float, default=0.08)
    args = cli.parse_args()

    input_paths = resolve_input_files(args.input)
    if not input_paths:
        raise SystemExit("No input files found")

    all_rows = read_rows(input_paths)
    if len(all_rows) < 50:
        raise SystemExit("Need at least 50 rows for training")

    # Seeding makes both the split and the per-epoch shuffles reproducible.
    random.seed(args.seed)
    random.shuffle(all_rows)

    # The evaluation share is clamped to the range [0, 0.5].
    eval_fraction = max(0.0, min(0.5, args.eval_split))
    eval_count = int(len(all_rows) * eval_fraction)
    eval_rows, train_rows = all_rows[:eval_count], all_rows[eval_count:]

    # Epochs and learning rate are floored at 1 and 1e-4 respectively.
    model = SoftmaxMoveModel()
    model.fit(train_rows, epochs=max(1, args.epochs), lr=max(1e-4, args.lr))
    metrics = model.evaluate(eval_rows)

    output = Path(args.output)
    output.parent.mkdir(parents=True, exist_ok=True)
    payload = {
        "input_files": [str(p) for p in input_paths],
        "train_rows": len(train_rows),
        "eval_rows": len(eval_rows),
        "eval_metrics": metrics,
        "model": model.to_dict(),
    }
    output.write_text(json.dumps(payload, indent=2), encoding="utf-8")

    summary = {
        "output": str(output),
        "train_rows": len(train_rows),
        "eval_rows": len(eval_rows),
        "accuracy": metrics.get("accuracy"),
        "top2_accuracy": metrics.get("top2_accuracy"),
    }
    print(json.dumps(summary, indent=2))