291 lines
8.6 KiB
Python
291 lines
8.6 KiB
Python
import argparse, random, glob, json, math
|
|
from collections import Counter
|
|
from pathlib import Path
|
|
|
|
MOVES = ["up", "down", "left", "right"]
|
|
|
|
def resolve_input_files(inputs:list[str]) -> list[Path]:
|
|
resolved:list[Path] = []
|
|
seen:set[str] = set()
|
|
|
|
for item in inputs:
|
|
path = Path(item)
|
|
if path.is_dir():
|
|
for file_path in sorted(path.rglob("*.jsonl")):
|
|
key = str(file_path.resolve())
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
resolved.append(file_path)
|
|
continue
|
|
|
|
if any(ch in item for ch in "*?[]"):
|
|
for match in sorted(glob.glob(item)):
|
|
file_path = Path(match)
|
|
if not file_path.is_file():
|
|
continue
|
|
key = str(file_path.resolve())
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
resolved.append(file_path)
|
|
continue
|
|
|
|
if path.is_file():
|
|
key = str(path.resolve())
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
resolved.append(path)
|
|
|
|
return resolved
|
|
|
|
def _neighbors(x:int, y:int) -> list[tuple[int, int, str]]:
|
|
return [
|
|
(x, y + 1, "up"),
|
|
(x, y - 1, "down"),
|
|
(x - 1, y, "left"),
|
|
(x + 1, y, "right"),
|
|
]
|
|
|
|
def _safe_neighbor_count(point:tuple[int, int], blocked:set[tuple[int, int]], width:int, height:int) -> int:
|
|
count = 0
|
|
for nx, ny, _ in _neighbors(point[0], point[1]):
|
|
if not (0 <= nx < width and 0 <= ny < height):
|
|
continue
|
|
if (nx, ny) in blocked:
|
|
continue
|
|
count += 1
|
|
return count
|
|
|
|
def _manhattan_to_nearest_food(point: tuple[int, int], food: set[tuple[int, int]]) -> int:
|
|
if not food:
|
|
return 25
|
|
return min(abs(point[0] - fx) + abs(point[1] - fy) for fx, fy in food)
|
|
|
|
def extract_feature_values(row:dict) -> dict[str, float]:
|
|
board = row.get("game_board", {})
|
|
snakes = board.get("snakes", [])
|
|
if not snakes:
|
|
return {}
|
|
|
|
me = snakes[0]
|
|
body = me.get("body", [])
|
|
if not body:
|
|
return {}
|
|
|
|
width = int(board.get("width", 0))
|
|
height = int(board.get("height", 0))
|
|
head = body[0]
|
|
hx = int(head.get("x", 0))
|
|
hy = int(head.get("y", 0))
|
|
health = int(me.get("health", 100))
|
|
length = int(me.get("length", len(body)))
|
|
|
|
food_set = {(int(f.get("x", 0)), int(f.get("y", 0))) for f in board.get("food", [])}
|
|
hazard_set = {(int(h.get("x", 0)), int(h.get("y", 0))) for h in board.get("hazards", [])}
|
|
blocked = set()
|
|
for snake in snakes:
|
|
for seg in snake.get("body", []):
|
|
blocked.add((int(seg.get("x", 0)), int(seg.get("y", 0))))
|
|
|
|
features:dict[str, float] = {
|
|
"bias": 1.0,
|
|
"health_norm": max(0.0, min(1.0, health / 100.0)),
|
|
"length_norm": min(1.0, length / max(1.0, width * height)),
|
|
"turn_norm": min(1.0, int(row.get("turn", 0)) / 100.0),
|
|
"food_count_norm": min(1.0, len(food_set) / 10.0),
|
|
"hazard_count_norm": min(1.0, len(hazard_set) / 20.0),
|
|
"opponent_count_norm": min(1.0, max(0, len(snakes) - 1) / 7.0),
|
|
}
|
|
|
|
safe_total = 0
|
|
for nx, ny, move in _neighbors(hx, hy):
|
|
in_bounds = 1.0 if (0 <= nx < width and 0 <= ny < height) else 0.0
|
|
blocked_next = 1.0 if (nx, ny) in blocked else 0.0
|
|
food_next = 1.0 if (nx, ny) in food_set else 0.0
|
|
hazard_next = 1.0 if (nx, ny) in hazard_set else 0.0
|
|
|
|
if in_bounds and not blocked_next:
|
|
safe_total += 1
|
|
open_next = float(_safe_neighbor_count((nx, ny), blocked, width, height))
|
|
dist_food = float(_manhattan_to_nearest_food((nx, ny), food_set))
|
|
else:
|
|
open_next = 0.0
|
|
dist_food = 25.0
|
|
|
|
prefix = f"m:{move}:"
|
|
features[prefix + "in_bounds"] = in_bounds
|
|
features[prefix + "blocked"] = blocked_next
|
|
features[prefix + "food"] = food_next
|
|
features[prefix + "hazard"] = hazard_next
|
|
features[prefix + "open_next"] = min(4.0, open_next) / 4.0
|
|
features[prefix + "food_dist"] = min(25.0, dist_food) / 25.0
|
|
|
|
features["safe_total_norm"] = safe_total / 4.0
|
|
return features
|
|
|
|
class SoftmaxMoveModel:
|
|
def __init__(self):
|
|
self.weights = {move: {} for move in MOVES}
|
|
self.bias = {move: 0.0 for move in MOVES}
|
|
|
|
def _score(self, move:str, features:dict[str, float]) -> float:
|
|
weight_map = self.weights[move]
|
|
value = self.bias[move]
|
|
for name, feat in features.items():
|
|
value += weight_map.get(name, 0.0) * feat
|
|
return value
|
|
|
|
def fit(self, rows:list[dict], epochs:int=14, lr:float=0.08, l2:float=1e-6) -> None:
|
|
examples = []
|
|
for row in rows:
|
|
label = row.get("move")
|
|
if label not in MOVES:
|
|
continue
|
|
features = extract_feature_values(row)
|
|
if not features:
|
|
continue
|
|
examples.append((features, label))
|
|
|
|
if not examples:
|
|
return
|
|
|
|
for _ in range(epochs):
|
|
random.shuffle(examples)
|
|
for features, label in examples:
|
|
scores = {move: self._score(move, features) for move in MOVES}
|
|
max_score = max(scores.values())
|
|
exp_scores = {
|
|
move: math.exp(scores[move] - max_score) for move in MOVES
|
|
}
|
|
z = sum(exp_scores.values())
|
|
probs = {move: exp_scores[move] / z for move in MOVES}
|
|
|
|
for move in MOVES:
|
|
target = 1.0 if move == label else 0.0
|
|
gradient = target - probs[move]
|
|
self.bias[move] += lr * gradient
|
|
|
|
w = self.weights[move]
|
|
for name, feat in features.items():
|
|
current = w.get(name, 0.0)
|
|
update = lr * ((gradient * feat) - (l2 * current))
|
|
w[name] = current + update
|
|
|
|
def predict_scores(self, row:dict) -> dict[str, float]:
|
|
features = extract_feature_values(row)
|
|
if not features:
|
|
return {move: 0.0 for move in MOVES}
|
|
return {move: self._score(move, features) for move in MOVES}
|
|
|
|
def predict(self, row:dict) -> str:
|
|
scores = self.predict_scores(row)
|
|
return max(scores, key=lambda move: scores[move])
|
|
|
|
def evaluate(self, rows:list[dict]) -> dict:
|
|
total = 0
|
|
correct = 0
|
|
top2 = 0
|
|
confusion = {move: Counter() for move in MOVES}
|
|
|
|
for row in rows:
|
|
expected = row.get("move")
|
|
if expected not in MOVES:
|
|
continue
|
|
|
|
scores = self.predict_scores(row)
|
|
ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
|
|
predicted = ranked[0][0]
|
|
|
|
total += 1
|
|
if predicted == expected:
|
|
correct += 1
|
|
if expected in {
|
|
ranked[0][0],
|
|
ranked[1][0] if len(ranked) > 1 else ranked[0][0],
|
|
}:
|
|
top2 += 1
|
|
|
|
confusion[expected][predicted] += 1
|
|
|
|
return {
|
|
"total": total,
|
|
"correct": correct,
|
|
"accuracy": round((correct / total) if total else 0.0, 4),
|
|
"top2_accuracy": round((top2 / total) if total else 0.0, 4),
|
|
"confusion": {label: dict(confusion[label]) for label in MOVES},
|
|
}
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"model_type": "softmax_moves_v2",
|
|
"moves": MOVES,
|
|
"weights": self.weights,
|
|
"bias": self.bias,
|
|
}
|
|
|
|
def read_rows(paths:list[Path]) -> list[dict]:
|
|
rows: list[dict] = []
|
|
for path in paths:
|
|
with path.open("r", encoding="utf-8") as handle:
|
|
for line in handle:
|
|
if not line.strip():
|
|
continue
|
|
row = json.loads(line)
|
|
if row.get("move") not in MOVES:
|
|
continue
|
|
if not row.get("game_board"):
|
|
continue
|
|
rows.append(row)
|
|
return rows
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="Train Battlesnake move model")
|
|
parser.add_argument("--input", action="append", required=True)
|
|
parser.add_argument("--output", required=True)
|
|
parser.add_argument("--eval-split", type=float, default=0.2)
|
|
parser.add_argument("--seed", type=int, default=42)
|
|
parser.add_argument("--epochs", type=int, default=14)
|
|
parser.add_argument("--lr", type=float, default=0.08)
|
|
args = parser.parse_args()
|
|
|
|
paths = resolve_input_files(args.input)
|
|
if not paths:
|
|
raise SystemExit("No input files found")
|
|
|
|
rows = read_rows(paths)
|
|
if len(rows) < 50:
|
|
raise SystemExit("Need at least 50 rows for training")
|
|
|
|
random.seed(args.seed)
|
|
random.shuffle(rows)
|
|
eval_count = int(len(rows) * max(0.0, min(0.5, args.eval_split)))
|
|
eval_rows = rows[:eval_count]
|
|
train_rows = rows[eval_count:]
|
|
|
|
model = SoftmaxMoveModel()
|
|
model.fit(train_rows, epochs=max(1, args.epochs), lr=max(1e-4, args.lr))
|
|
metrics = model.evaluate(eval_rows)
|
|
|
|
output = Path(args.output)
|
|
output.parent.mkdir(parents=True, exist_ok=True)
|
|
payload = {
|
|
"input_files": [str(p) for p in paths],
|
|
"train_rows": len(train_rows),
|
|
"eval_rows": len(eval_rows),
|
|
"eval_metrics": metrics,
|
|
"model": model.to_dict(),
|
|
}
|
|
output.write_text(json.dumps(payload, indent=2), encoding="utf-8")
|
|
|
|
print(json.dumps({
|
|
"output": str(output),
|
|
"train_rows": len(train_rows),
|
|
"eval_rows": len(eval_rows),
|
|
"accuracy": metrics.get("accuracy"),
|
|
"top2_accuracy": metrics.get("top2_accuracy"),
|
|
},
|
|
indent=2,
|
|
))
|