add Training for AI and AI Model and allow to collect rl data from BestBattleSnake
Build and Push Docker Container / build-and-push (push) Successful in 1m36s
Build and Push Docker Container / build-and-push (push) Successful in 1m36s
This commit is contained in:
@@ -0,0 +1,290 @@
|
||||
import argparse, random, glob, json, math
|
||||
from collections import Counter
|
||||
from pathlib import Path
|
||||
|
||||
MOVES = ["up", "down", "left", "right"]
|
||||
|
||||
def resolve_input_files(inputs:list[str]) -> list[Path]:
|
||||
resolved:list[Path] = []
|
||||
seen:set[str] = set()
|
||||
|
||||
for item in inputs:
|
||||
path = Path(item)
|
||||
if path.is_dir():
|
||||
for file_path in sorted(path.rglob("*.jsonl")):
|
||||
key = str(file_path.resolve())
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
resolved.append(file_path)
|
||||
continue
|
||||
|
||||
if any(ch in item for ch in "*?[]"):
|
||||
for match in sorted(glob.glob(item)):
|
||||
file_path = Path(match)
|
||||
if not file_path.is_file():
|
||||
continue
|
||||
key = str(file_path.resolve())
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
resolved.append(file_path)
|
||||
continue
|
||||
|
||||
if path.is_file():
|
||||
key = str(path.resolve())
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
resolved.append(path)
|
||||
|
||||
return resolved
|
||||
|
||||
def _neighbors(x:int, y:int) -> list[tuple[int, int, str]]:
|
||||
return [
|
||||
(x, y + 1, "up"),
|
||||
(x, y - 1, "down"),
|
||||
(x - 1, y, "left"),
|
||||
(x + 1, y, "right"),
|
||||
]
|
||||
|
||||
def _safe_neighbor_count(point:tuple[int, int], blocked:set[tuple[int, int]], width:int, height:int) -> int:
|
||||
count = 0
|
||||
for nx, ny, _ in _neighbors(point[0], point[1]):
|
||||
if not (0 <= nx < width and 0 <= ny < height):
|
||||
continue
|
||||
if (nx, ny) in blocked:
|
||||
continue
|
||||
count += 1
|
||||
return count
|
||||
|
||||
def _manhattan_to_nearest_food(point: tuple[int, int], food: set[tuple[int, int]]) -> int:
|
||||
if not food:
|
||||
return 25
|
||||
return min(abs(point[0] - fx) + abs(point[1] - fy) for fx, fy in food)
|
||||
|
||||
def extract_feature_values(row:dict) -> dict[str, float]:
|
||||
board = row.get("game_board", {})
|
||||
snakes = board.get("snakes", [])
|
||||
if not snakes:
|
||||
return {}
|
||||
|
||||
me = snakes[0]
|
||||
body = me.get("body", [])
|
||||
if not body:
|
||||
return {}
|
||||
|
||||
width = int(board.get("width", 0))
|
||||
height = int(board.get("height", 0))
|
||||
head = body[0]
|
||||
hx = int(head.get("x", 0))
|
||||
hy = int(head.get("y", 0))
|
||||
health = int(me.get("health", 100))
|
||||
length = int(me.get("length", len(body)))
|
||||
|
||||
food_set = {(int(f.get("x", 0)), int(f.get("y", 0))) for f in board.get("food", [])}
|
||||
hazard_set = {(int(h.get("x", 0)), int(h.get("y", 0))) for h in board.get("hazards", [])}
|
||||
blocked = set()
|
||||
for snake in snakes:
|
||||
for seg in snake.get("body", []):
|
||||
blocked.add((int(seg.get("x", 0)), int(seg.get("y", 0))))
|
||||
|
||||
features:dict[str, float] = {
|
||||
"bias": 1.0,
|
||||
"health_norm": max(0.0, min(1.0, health / 100.0)),
|
||||
"length_norm": min(1.0, length / max(1.0, width * height)),
|
||||
"turn_norm": min(1.0, int(row.get("turn", 0)) / 100.0),
|
||||
"food_count_norm": min(1.0, len(food_set) / 10.0),
|
||||
"hazard_count_norm": min(1.0, len(hazard_set) / 20.0),
|
||||
"opponent_count_norm": min(1.0, max(0, len(snakes) - 1) / 7.0),
|
||||
}
|
||||
|
||||
safe_total = 0
|
||||
for nx, ny, move in _neighbors(hx, hy):
|
||||
in_bounds = 1.0 if (0 <= nx < width and 0 <= ny < height) else 0.0
|
||||
blocked_next = 1.0 if (nx, ny) in blocked else 0.0
|
||||
food_next = 1.0 if (nx, ny) in food_set else 0.0
|
||||
hazard_next = 1.0 if (nx, ny) in hazard_set else 0.0
|
||||
|
||||
if in_bounds and not blocked_next:
|
||||
safe_total += 1
|
||||
open_next = float(_safe_neighbor_count((nx, ny), blocked, width, height))
|
||||
dist_food = float(_manhattan_to_nearest_food((nx, ny), food_set))
|
||||
else:
|
||||
open_next = 0.0
|
||||
dist_food = 25.0
|
||||
|
||||
prefix = f"m:{move}:"
|
||||
features[prefix + "in_bounds"] = in_bounds
|
||||
features[prefix + "blocked"] = blocked_next
|
||||
features[prefix + "food"] = food_next
|
||||
features[prefix + "hazard"] = hazard_next
|
||||
features[prefix + "open_next"] = min(4.0, open_next) / 4.0
|
||||
features[prefix + "food_dist"] = min(25.0, dist_food) / 25.0
|
||||
|
||||
features["safe_total_norm"] = safe_total / 4.0
|
||||
return features
|
||||
|
||||
class SoftmaxMoveModel:
|
||||
def __init__(self):
|
||||
self.weights = {move: {} for move in MOVES}
|
||||
self.bias = {move: 0.0 for move in MOVES}
|
||||
|
||||
def _score(self, move:str, features:dict[str, float]) -> float:
|
||||
weight_map = self.weights[move]
|
||||
value = self.bias[move]
|
||||
for name, feat in features.items():
|
||||
value += weight_map.get(name, 0.0) * feat
|
||||
return value
|
||||
|
||||
def fit(self, rows:list[dict], epochs:int=14, lr:float=0.08, l2:float=1e-6) -> None:
|
||||
examples = []
|
||||
for row in rows:
|
||||
label = row.get("move")
|
||||
if label not in MOVES:
|
||||
continue
|
||||
features = extract_feature_values(row)
|
||||
if not features:
|
||||
continue
|
||||
examples.append((features, label))
|
||||
|
||||
if not examples:
|
||||
return
|
||||
|
||||
for _ in range(epochs):
|
||||
random.shuffle(examples)
|
||||
for features, label in examples:
|
||||
scores = {move: self._score(move, features) for move in MOVES}
|
||||
max_score = max(scores.values())
|
||||
exp_scores = {
|
||||
move: math.exp(scores[move] - max_score) for move in MOVES
|
||||
}
|
||||
z = sum(exp_scores.values())
|
||||
probs = {move: exp_scores[move] / z for move in MOVES}
|
||||
|
||||
for move in MOVES:
|
||||
target = 1.0 if move == label else 0.0
|
||||
gradient = target - probs[move]
|
||||
self.bias[move] += lr * gradient
|
||||
|
||||
w = self.weights[move]
|
||||
for name, feat in features.items():
|
||||
current = w.get(name, 0.0)
|
||||
update = lr * ((gradient * feat) - (l2 * current))
|
||||
w[name] = current + update
|
||||
|
||||
def predict_scores(self, row:dict) -> dict[str, float]:
|
||||
features = extract_feature_values(row)
|
||||
if not features:
|
||||
return {move: 0.0 for move in MOVES}
|
||||
return {move: self._score(move, features) for move in MOVES}
|
||||
|
||||
def predict(self, row:dict) -> str:
|
||||
scores = self.predict_scores(row)
|
||||
return max(scores, key=lambda move: scores[move])
|
||||
|
||||
def evaluate(self, rows:list[dict]) -> dict:
|
||||
total = 0
|
||||
correct = 0
|
||||
top2 = 0
|
||||
confusion = {move: Counter() for move in MOVES}
|
||||
|
||||
for row in rows:
|
||||
expected = row.get("move")
|
||||
if expected not in MOVES:
|
||||
continue
|
||||
|
||||
scores = self.predict_scores(row)
|
||||
ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
|
||||
predicted = ranked[0][0]
|
||||
|
||||
total += 1
|
||||
if predicted == expected:
|
||||
correct += 1
|
||||
if expected in {
|
||||
ranked[0][0],
|
||||
ranked[1][0] if len(ranked) > 1 else ranked[0][0],
|
||||
}:
|
||||
top2 += 1
|
||||
|
||||
confusion[expected][predicted] += 1
|
||||
|
||||
return {
|
||||
"total": total,
|
||||
"correct": correct,
|
||||
"accuracy": round((correct / total) if total else 0.0, 4),
|
||||
"top2_accuracy": round((top2 / total) if total else 0.0, 4),
|
||||
"confusion": {label: dict(confusion[label]) for label in MOVES},
|
||||
}
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"model_type": "softmax_moves_v2",
|
||||
"moves": MOVES,
|
||||
"weights": self.weights,
|
||||
"bias": self.bias,
|
||||
}
|
||||
|
||||
def read_rows(paths:list[Path]) -> list[dict]:
|
||||
rows: list[dict] = []
|
||||
for path in paths:
|
||||
with path.open("r", encoding="utf-8") as handle:
|
||||
for line in handle:
|
||||
if not line.strip():
|
||||
continue
|
||||
row = json.loads(line)
|
||||
if row.get("move") not in MOVES:
|
||||
continue
|
||||
if not row.get("game_board"):
|
||||
continue
|
||||
rows.append(row)
|
||||
return rows
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Train Battlesnake move model")
|
||||
parser.add_argument("--input", action="append", required=True)
|
||||
parser.add_argument("--output", required=True)
|
||||
parser.add_argument("--eval-split", type=float, default=0.2)
|
||||
parser.add_argument("--seed", type=int, default=42)
|
||||
parser.add_argument("--epochs", type=int, default=14)
|
||||
parser.add_argument("--lr", type=float, default=0.08)
|
||||
args = parser.parse_args()
|
||||
|
||||
paths = resolve_input_files(args.input)
|
||||
if not paths:
|
||||
raise SystemExit("No input files found")
|
||||
|
||||
rows = read_rows(paths)
|
||||
if len(rows) < 50:
|
||||
raise SystemExit("Need at least 50 rows for training")
|
||||
|
||||
random.seed(args.seed)
|
||||
random.shuffle(rows)
|
||||
eval_count = int(len(rows) * max(0.0, min(0.5, args.eval_split)))
|
||||
eval_rows = rows[:eval_count]
|
||||
train_rows = rows[eval_count:]
|
||||
|
||||
model = SoftmaxMoveModel()
|
||||
model.fit(train_rows, epochs=max(1, args.epochs), lr=max(1e-4, args.lr))
|
||||
metrics = model.evaluate(eval_rows)
|
||||
|
||||
output = Path(args.output)
|
||||
output.parent.mkdir(parents=True, exist_ok=True)
|
||||
payload = {
|
||||
"input_files": [str(p) for p in paths],
|
||||
"train_rows": len(train_rows),
|
||||
"eval_rows": len(eval_rows),
|
||||
"eval_metrics": metrics,
|
||||
"model": model.to_dict(),
|
||||
}
|
||||
output.write_text(json.dumps(payload, indent=2), encoding="utf-8")
|
||||
|
||||
print(json.dumps({
|
||||
"output": str(output),
|
||||
"train_rows": len(train_rows),
|
||||
"eval_rows": len(eval_rows),
|
||||
"accuracy": metrics.get("accuracy"),
|
||||
"top2_accuracy": metrics.get("top2_accuracy"),
|
||||
},
|
||||
indent=2,
|
||||
))
|
||||
Reference in New Issue
Block a user