import argparse, random, glob, json, math from collections import Counter from pathlib import Path MOVES = ["up", "down", "left", "right"] def resolve_input_files(inputs:list[str]) -> list[Path]: resolved:list[Path] = [] seen:set[str] = set() for item in inputs: path = Path(item) if path.is_dir(): for file_path in sorted(path.rglob("*.jsonl")): key = str(file_path.resolve()) if key in seen: continue seen.add(key) resolved.append(file_path) continue if any(ch in item for ch in "*?[]"): for match in sorted(glob.glob(item)): file_path = Path(match) if not file_path.is_file(): continue key = str(file_path.resolve()) if key in seen: continue seen.add(key) resolved.append(file_path) continue if path.is_file(): key = str(path.resolve()) if key in seen: continue seen.add(key) resolved.append(path) return resolved def _neighbors(x:int, y:int) -> list[tuple[int, int, str]]: return [ (x, y + 1, "up"), (x, y - 1, "down"), (x - 1, y, "left"), (x + 1, y, "right"), ] def _safe_neighbor_count(point:tuple[int, int], blocked:set[tuple[int, int]], width:int, height:int) -> int: count = 0 for nx, ny, _ in _neighbors(point[0], point[1]): if not (0 <= nx < width and 0 <= ny < height): continue if (nx, ny) in blocked: continue count += 1 return count def _manhattan_to_nearest_food(point: tuple[int, int], food: set[tuple[int, int]]) -> int: if not food: return 25 return min(abs(point[0] - fx) + abs(point[1] - fy) for fx, fy in food) def extract_feature_values(row:dict) -> dict[str, float]: board = row.get("game_board", {}) snakes = board.get("snakes", []) if not snakes: return {} me = snakes[0] body = me.get("body", []) if not body: return {} width = int(board.get("width", 0)) height = int(board.get("height", 0)) head = body[0] hx = int(head.get("x", 0)) hy = int(head.get("y", 0)) health = int(me.get("health", 100)) length = int(me.get("length", len(body))) food_set = {(int(f.get("x", 0)), int(f.get("y", 0))) for f in board.get("food", [])} hazard_set = {(int(h.get("x", 0)), int(h.get("y", 0))) for h in board.get("hazards", [])} blocked = set() for snake in snakes: for seg in snake.get("body", []): blocked.add((int(seg.get("x", 0)), int(seg.get("y", 0)))) features:dict[str, float] = { "bias": 1.0, "health_norm": max(0.0, min(1.0, health / 100.0)), "length_norm": min(1.0, length / max(1.0, width * height)), "turn_norm": min(1.0, int(row.get("turn", 0)) / 100.0), "food_count_norm": min(1.0, len(food_set) / 10.0), "hazard_count_norm": min(1.0, len(hazard_set) / 20.0), "opponent_count_norm": min(1.0, max(0, len(snakes) - 1) / 7.0), } safe_total = 0 for nx, ny, move in _neighbors(hx, hy): in_bounds = 1.0 if (0 <= nx < width and 0 <= ny < height) else 0.0 blocked_next = 1.0 if (nx, ny) in blocked else 0.0 food_next = 1.0 if (nx, ny) in food_set else 0.0 hazard_next = 1.0 if (nx, ny) in hazard_set else 0.0 if in_bounds and not blocked_next: safe_total += 1 open_next = float(_safe_neighbor_count((nx, ny), blocked, width, height)) dist_food = float(_manhattan_to_nearest_food((nx, ny), food_set)) else: open_next = 0.0 dist_food = 25.0 prefix = f"m:{move}:" features[prefix + "in_bounds"] = in_bounds features[prefix + "blocked"] = blocked_next features[prefix + "food"] = food_next features[prefix + "hazard"] = hazard_next features[prefix + "open_next"] = min(4.0, open_next) / 4.0 features[prefix + "food_dist"] = min(25.0, dist_food) / 25.0 features["safe_total_norm"] = safe_total / 4.0 return features class SoftmaxMoveModel: def __init__(self): self.weights = {move: {} for move in MOVES} self.bias = {move: 0.0 for move in MOVES} def _score(self, move:str, features:dict[str, float]) -> float: weight_map = self.weights[move] value = self.bias[move] for name, feat in features.items(): value += weight_map.get(name, 0.0) * feat return value def fit(self, rows:list[dict], epochs:int=14, lr:float=0.08, l2:float=1e-6) -> None: examples = [] for row in rows: label = row.get("move") if label not in MOVES: continue features = extract_feature_values(row) if not features: continue examples.append((features, label)) if not examples: return for _ in range(epochs): random.shuffle(examples) for features, label in examples: scores = {move: self._score(move, features) for move in MOVES} max_score = max(scores.values()) exp_scores = { move: math.exp(scores[move] - max_score) for move in MOVES } z = sum(exp_scores.values()) probs = {move: exp_scores[move] / z for move in MOVES} for move in MOVES: target = 1.0 if move == label else 0.0 gradient = target - probs[move] self.bias[move] += lr * gradient w = self.weights[move] for name, feat in features.items(): current = w.get(name, 0.0) update = lr * ((gradient * feat) - (l2 * current)) w[name] = current + update def predict_scores(self, row:dict) -> dict[str, float]: features = extract_feature_values(row) if not features: return {move: 0.0 for move in MOVES} return {move: self._score(move, features) for move in MOVES} def predict(self, row:dict) -> str: scores = self.predict_scores(row) return max(scores, key=lambda move: scores[move]) def evaluate(self, rows:list[dict]) -> dict: total = 0 correct = 0 top2 = 0 confusion = {move: Counter() for move in MOVES} for row in rows: expected = row.get("move") if expected not in MOVES: continue scores = self.predict_scores(row) ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True) predicted = ranked[0][0] total += 1 if predicted == expected: correct += 1 if expected in { ranked[0][0], ranked[1][0] if len(ranked) > 1 else ranked[0][0], }: top2 += 1 confusion[expected][predicted] += 1 return { "total": total, "correct": correct, "accuracy": round((correct / total) if total else 0.0, 4), "top2_accuracy": round((top2 / total) if total else 0.0, 4), "confusion": {label: dict(confusion[label]) for label in MOVES}, } def to_dict(self) -> dict: return { "model_type": "softmax_moves_v2", "moves": MOVES, "weights": self.weights, "bias": self.bias, } def read_rows(paths:list[Path]) -> list[dict]: rows: list[dict] = [] for path in paths: with path.open("r", encoding="utf-8") as handle: for line in handle: if not line.strip(): continue row = json.loads(line) if row.get("move") not in MOVES: continue if not row.get("game_board"): continue rows.append(row) return rows if __name__ == "__main__": parser = argparse.ArgumentParser(description="Train Battlesnake move model") parser.add_argument("--input", action="append", required=True) parser.add_argument("--output", required=True) parser.add_argument("--eval-split", type=float, default=0.2) parser.add_argument("--seed", type=int, default=42) parser.add_argument("--epochs", type=int, default=14) parser.add_argument("--lr", type=float, default=0.08) args = parser.parse_args() paths = resolve_input_files(args.input) if not paths: raise SystemExit("No input files found") rows = read_rows(paths) if len(rows) < 50: raise SystemExit("Need at least 50 rows for training") random.seed(args.seed) random.shuffle(rows) eval_count = int(len(rows) * max(0.0, min(0.5, args.eval_split))) eval_rows = rows[:eval_count] train_rows = rows[eval_count:] model = SoftmaxMoveModel() model.fit(train_rows, epochs=max(1, args.epochs), lr=max(1e-4, args.lr)) metrics = model.evaluate(eval_rows) output = Path(args.output) output.parent.mkdir(parents=True, exist_ok=True) payload = { "input_files": [str(p) for p in paths], "train_rows": len(train_rows), "eval_rows": len(eval_rows), "eval_metrics": metrics, "model": model.to_dict(), } output.write_text(json.dumps(payload, indent=2), encoding="utf-8") print(json.dumps({ "output": str(output), "train_rows": len(train_rows), "eval_rows": len(eval_rows), "accuracy": metrics.get("accuracy"), "top2_accuracy": metrics.get("top2_accuracy"), }, indent=2, ))