Compare commits

...

4 Commits

11 changed files with 732 additions and 62 deletions
+2
View File
@@ -8,5 +8,7 @@ dependencies = [
"aiologger>=0.7.0", "aiologger>=0.7.0",
"dotenv>=0.9.9", "dotenv>=0.9.9",
"gel>=3.1.0", "gel>=3.1.0",
"redis>=5.2.1",
"quart>=0.20.0", "quart>=0.20.0",
"aioredis>=2.0.1",
] ]
+1
View File
@@ -15,5 +15,6 @@ markupsafe==3.0.3
priority==2.0.0 priority==2.0.0
python-dotenv==1.2.1 python-dotenv==1.2.1
quart==0.20.0 quart==0.20.0
redis==5.2.1
werkzeug==3.1.4 werkzeug==3.1.4
wsproto==1.3.2 wsproto==1.3.2
+48 -22
View File
@@ -1,4 +1,5 @@
from server.Files import read_file from server.Files import read_file
from server.game_board_stats import GameBoardStoreBuilder
from server.GameBoard import GameBoard from server.GameBoard import GameBoard
from snakes import SnakeBuilder from snakes import SnakeBuilder
from quart_common.web.logger import await_log from quart_common.web.logger import await_log
@@ -20,7 +21,7 @@ class Server:
'version': '1.0.0', 'version': '1.0.0',
} }
def __init__(self, data_path:str, snake_type:str, storage_type:str, debug:bool=False, check_tls_security:bool=False): def __init__(self, data_path:str, snake_type:str, storage_type:str, debug:bool=False, check_tls_security:bool=False, game_state_backend:str='memory', game_state_redis_url:str='redis://localhost:6379/0', game_state_ttl_sec:int=900, game_state_local_cache:bool=True):
self.debug = debug self.debug = debug
self.snake_type = snake_type self.snake_type = snake_type
self.storage_type = storage_type self.storage_type = storage_type
@@ -30,6 +31,13 @@ class Server:
self.check_tls_security = check_tls_security self.check_tls_security = check_tls_security
self.store_game_state = False self.store_game_state = False
normalized_backend = (game_state_backend or 'memory').strip().lower()
self.game_state_local_cache = (game_state_local_cache and normalized_backend != 'memory')
self.game_state_store = GameBoardStoreBuilder.build(
backend=game_state_backend,
redis_url=game_state_redis_url,
ttl_seconds=game_state_ttl_sec,
)
self.running_games:dict[str, GameBoard] = {} self.running_games:dict[str, GameBoard] = {}
self.game_move_counts:dict[str, int] = {} self.game_move_counts:dict[str, int] = {}
@@ -65,6 +73,7 @@ class Server:
'last_game_end_unix': 0, 'last_game_end_unix': 0,
'last_move_unix': 0, 'last_move_unix': 0,
'games_stuck_removed': 0, 'games_stuck_removed': 0,
'game_state_local_cache_enabled': bool(self.game_state_local_cache),
} }
self.logger = build_logger('Battlesnake', debug_env_var='DEBUG_SERVER') self.logger = build_logger('Battlesnake', debug_env_var='DEBUG_SERVER')
self.snake_version = self._get_snake_version() self.snake_version = self._get_snake_version()
@@ -98,8 +107,9 @@ class Server:
self._record_http_request('move') self._record_http_request('move')
game_state = await request.get_json() game_state = await request.get_json()
move_started = time.perf_counter() move_started = time.perf_counter()
game_board = await self._get_game_board(game_state) game_board = cast(GameBoard, await self._get_game_board(game_state))
next_move = game_board.snake_neat_make_a_move() next_move = game_board.snake_neat_make_a_move()
await self._persist_game_board(game_state['game']['id'], game_board)
elapsed_ms = (time.perf_counter() - move_started) * 1000.0 elapsed_ms = (time.perf_counter() - move_started) * 1000.0
self.metrics['move_response_time_ms_total'] += elapsed_ms self.metrics['move_response_time_ms_total'] += elapsed_ms
self.metrics['move_response_time_ms_max'] = max( self.metrics['move_response_time_ms_max'] = max(
@@ -126,7 +136,7 @@ class Server:
self._prune_stale_games() self._prune_stale_games()
game_state = await request.get_json() game_state = await request.get_json()
if self.store_game_state: if self.store_game_state:
game_board = await self._get_game_board(game_state, end=True) game_board = cast(GameBoard, await self._get_game_board(game_state, end=True))
if self.check_tls_security: if self.check_tls_security:
await game_board.save( await game_board.save(
StorageLoader.build(self.storage_type), StorageLoader.build(self.storage_type),
@@ -142,7 +152,7 @@ class Server:
) )
await await_log(self.logger.info(f'GAME ENDED: Winner is {[x['name'] for x in game_state['board']['snakes']]}')) await await_log(self.logger.info(f'GAME ENDED: Winner is {[x['name'] for x in game_state['board']['snakes']]}'))
self._delete_game_board(game_state) await self._delete_game_board(game_state)
return 'ok' return 'ok'
@self.app.after_request @self.app.after_request
@@ -150,6 +160,10 @@ class Server:
response.headers.set('server', 'battlesnake/gitea/snake-python') response.headers.set('server', 'battlesnake/gitea/snake-python')
return response return response
@self.app.after_serving
async def shutdown_state_storage():
await self.game_state_store.close()
@self.app.get('/cleanup') @self.app.get('/cleanup')
async def cleanup(): async def cleanup():
results = self._cleanup_database() results = self._cleanup_database()
@@ -217,7 +231,7 @@ class Server:
except ValueError: except ValueError:
return 180 return 180
async def _create_game_board(self, game_state: dict): async def _create_game_board(self, game_state:dict) -> GameBoard:
game_id = game_state['game']['id'] game_id = game_state['game']['id']
new_game_board = GameBoard( new_game_board = GameBoard(
game_id=game_id, game_id=game_id,
@@ -230,30 +244,45 @@ class Server:
) )
await new_game_board.start_game(game_state) await new_game_board.start_game(game_state)
self.running_games[game_id] = new_game_board if self.game_state_local_cache:
self.running_games[game_id] = new_game_board
await self.game_state_store.save(game_id, new_game_board)
self.game_move_counts[game_id] = 0 self.game_move_counts[game_id] = 0
self.game_last_seen_unix[game_id] = int(time.time()) self.game_last_seen_unix[game_id] = int(time.time())
self.metrics['games_started'] += 1 self.metrics['games_started'] += 1
self.metrics['active_games_peak'] = max( self.metrics['active_games_peak'] = max(
self.metrics['active_games_peak'], self.metrics['active_games_peak'],
len(self.running_games), len(self.game_last_seen_unix),
) )
self.metrics['last_game_start_unix'] = int(time.time()) self.metrics['last_game_start_unix'] = int(time.time())
return new_game_board return new_game_board
def _delete_game_board(self, game_state:dict): async def _persist_game_board(self, game_id:str, game_board:GameBoard):
if self.game_state_local_cache:
self.running_games[game_id] = game_board
await self.game_state_store.save(game_id, game_board)
async def _delete_game_board(self, game_state:dict):
game_id = game_state['game']['id'] game_id = game_state['game']['id']
self.running_games.pop(game_id, None) self.running_games.pop(game_id, None)
self.game_move_counts.pop(game_id, None) self.game_move_counts.pop(game_id, None)
self.game_last_seen_unix.pop(game_id, None) self.game_last_seen_unix.pop(game_id, None)
await self.game_state_store.delete(game_id)
async def _get_game_board(self, game_state:dict, end:bool=False): async def _get_game_board(self, game_state:dict, end:bool=False) -> GameBoard:
game_id = game_state['game']['id'] game_id = game_state['game']['id']
try: game_board: GameBoard
if self.game_state_local_cache and game_id in self.running_games:
game_board = self.running_games[game_id] game_board = self.running_games[game_id]
except KeyError: else:
game_board = await self._create_game_board(game_state) persisted_board = await self.game_state_store.load(game_id)
self.metrics['games_autocreated'] += 1 if persisted_board is not None:
game_board = cast(GameBoard, persisted_board)
if self.game_state_local_cache:
self.running_games[game_id] = game_board
else:
game_board = await self._create_game_board(game_state)
self.metrics['games_autocreated'] += 1
if not end: if not end:
self.metrics['total_moves'] += 1 self.metrics['total_moves'] += 1
@@ -265,6 +294,7 @@ class Server:
if end: if end:
self._record_game_end(game_state) self._record_game_end(game_state)
game_board.end_game(game_state) game_board.end_game(game_state)
await self._persist_game_board(game_id, game_board)
return game_board return game_board
@@ -276,7 +306,7 @@ class Server:
return storage.cleanup() return storage.cleanup()
def _prune_stale_games(self): def _prune_stale_games(self):
if not self.running_games: if not self.game_last_seen_unix:
return return
now = int(time.time()) now = int(time.time())
@@ -291,7 +321,7 @@ class Server:
self.game_last_seen_unix.pop(game_id, None) self.game_last_seen_unix.pop(game_id, None)
self.metrics['games_stuck_removed'] += 1 self.metrics['games_stuck_removed'] += 1
def _record_game_end(self, game_state: dict): def _record_game_end(self, game_state:dict):
self.metrics['games_ended'] += 1 self.metrics['games_ended'] += 1
self.metrics['last_game_end_unix'] = int(time.time()) self.metrics['last_game_end_unix'] = int(time.time())
@@ -327,7 +357,7 @@ class Server:
return { return {
**self.metrics, **self.metrics,
'active_games': len(self.running_games), 'active_games': len(self.game_last_seen_unix),
'tracked_games': len(self.game_move_counts), 'tracked_games': len(self.game_move_counts),
'avg_turns_per_game': round(avg_turns, 2), 'avg_turns_per_game': round(avg_turns, 2),
'win_rate': round(win_rate, 4), 'win_rate': round(win_rate, 4),
@@ -426,17 +456,13 @@ class Server:
'# TYPE snake_http_requests_by_endpoint_total counter', '# TYPE snake_http_requests_by_endpoint_total counter',
]) ])
for endpoint, count in snapshot['http_requests_by_endpoint'].items(): for endpoint, count in snapshot['http_requests_by_endpoint'].items():
lines.append( lines.append(f'snake_http_requests_by_endpoint_total{{endpoint="{endpoint}"}} {count}')
f'snake_http_requests_by_endpoint_total{{endpoint="{endpoint}"}} {count}'
)
lines.extend([ lines.extend([
'# HELP snake_moves_by_direction_total Move responses grouped by direction.', '# HELP snake_moves_by_direction_total Move responses grouped by direction.',
'# TYPE snake_moves_by_direction_total counter', '# TYPE snake_moves_by_direction_total counter',
]) ])
for direction, count in snapshot['move_direction_counts'].items(): for direction, count in snapshot['move_direction_counts'].items():
lines.append( lines.append(f'snake_moves_by_direction_total{{direction="{direction}"}} {count}')
f'snake_moves_by_direction_total{{direction="{direction}"}} {count}'
)
return '\n'.join(lines) + '\n' return '\n'.join(lines) + '\n'
+4 -1
View File
@@ -23,6 +23,10 @@ def build_server_from_env(default_snake_type:str) -> Server:
storage_type=os.environ.get('STORAGE', 'LocalStorage'), storage_type=os.environ.get('STORAGE', 'LocalStorage'),
debug=env_bool('DEBUG_SERVER'), debug=env_bool('DEBUG_SERVER'),
check_tls_security=False, check_tls_security=False,
game_state_backend=os.environ.get('GAME_STATE_BACKEND', 'memory'),
game_state_redis_url=os.environ.get('GAME_STATE_REDIS_URL', 'redis://localhost:6379/0'),
game_state_ttl_sec=int(os.environ.get('GAME_STATE_TTL_SEC', '900')),
game_state_local_cache=env_bool('GAME_STATE_LOCAL_CACHE', default=True),
) )
if env_bool('STORE_GAME_HISTORY'): if env_bool('STORE_GAME_HISTORY'):
@@ -30,7 +34,6 @@ def build_server_from_env(default_snake_type:str) -> Server:
return server return server
def build_run_config() -> RunConfig: def build_run_config() -> RunConfig:
return { return {
'host': os.environ.get('HOST', '0.0.0.0'), 'host': os.environ.get('HOST', '0.0.0.0'),
@@ -0,0 +1,17 @@
from server.GameBoard import GameBoard
class MemoryGameBoardStore:
def __init__(self, **kwargs):
self._state:dict[str, object] = {}
async def save(self, game_id:str, game_board:GameBoard) -> None:
self._state[game_id] = game_board
async def load(self, game_id:str):
return self._state.get(game_id)
async def delete(self, game_id:str) -> None:
self._state.pop(game_id, None)
async def close(self) -> None:
return None
@@ -0,0 +1,53 @@
from server.GameBoard import GameBoard
import pickle
class RedisGameBoardStore:
def __init__(self, redis_url:str="redis://localhost:6379/0", key_prefix:str="snake:gameboard", ttl_seconds:int=900, **kwargs):
self.redis_url = redis_url
self.key_prefix = key_prefix
self.ttl_seconds = max(60, int(ttl_seconds))
self._redis = None
async def _get_redis(self):
if self._redis is not None:
return self._redis
try:
import aioredis # type: ignore[import-not-found]
except ImportError as error: # pragma: no cover
raise RuntimeError("Redis backend selected but 'aioredis' package is not installed") from error
self._redis = aioredis.from_url(self.redis_url)
return self._redis
def _key(self, game_id:str) -> str:
return f"{self.key_prefix}:{game_id}"
async def save(self, game_id:str, game_board:GameBoard) -> None:
redis = await self._get_redis()
payload = pickle.dumps(game_board, protocol=pickle.HIGHEST_PROTOCOL)
await redis.set(self._key(game_id), payload, ex=self.ttl_seconds)
async def load(self, game_id:str):
redis = await self._get_redis()
payload = await redis.get(self._key(game_id))
if payload is None:
return None
return pickle.loads(payload)
async def delete(self, game_id:str) -> None:
redis = await self._get_redis()
await redis.delete(self._key(game_id))
async def close(self) -> None:
if self._redis is None:
return
if hasattr(self._redis, "aclose"):
await self._redis.aclose()
elif hasattr(self._redis, "close"):
close_result = self._redis.close()
if close_result is not None:
await close_result
self._redis = None
+10
View File
@@ -0,0 +1,10 @@
from server.game_board_stats.MemoryGameBoardStore import MemoryGameBoardStore
from server.game_board_stats.RedisGameBoardStore import RedisGameBoardStore
class GameBoardStoreBuilder:
@classmethod
def build(self, backend:str="memory", **kwargs) -> MemoryGameBoardStore|RedisGameBoardStore:
selected = (backend or "memory").strip().lower()
if selected == "redis":
return RedisGameBoardStore(**kwargs)
return MemoryGameBoardStore(**kwargs)
+251 -39
View File
@@ -45,6 +45,9 @@ class BestBattleSnake(TemplateSnake):
self.rl_base_dataset_path = Path(os.getenv("RL_BASE_DATASET", "data/dataset/best_moves.jsonl")) self.rl_base_dataset_path = Path(os.getenv("RL_BASE_DATASET", "data/dataset/best_moves.jsonl"))
self.rl_bootstrap_path = Path(os.getenv("RL_BOOTSTRAP_OUTPUT", "data/dataset/rl_bootstrap.jsonl")) self.rl_bootstrap_path = Path(os.getenv("RL_BOOTSTRAP_OUTPUT", "data/dataset/rl_bootstrap.jsonl"))
self.rl_needs_more_data = False self.rl_needs_more_data = False
self.future_planning_depth = max(1, min(4, self._env_int("BATTLE_FUTURE_PLANNING_DEPTH", default=2)))
self.future_planning_branch = max(1, min(3, self._env_int("BATTLE_FUTURE_PLANNING_BRANCH", default=2)))
self.future_planning_min_time_ms = max(25, self._env_int("BATTLE_FUTURE_PLANNING_MIN_MS", default=70))
def _get_duel_style(self) -> str: def _get_duel_style(self) -> str:
"""Resolve duel tuning style from `BATTLE_SNAKE_DUEL_STYLE` or `DUEL_STYLE`.""" """Resolve duel tuning style from `BATTLE_SNAKE_DUEL_STYLE` or `DUEL_STYLE`."""
@@ -89,7 +92,7 @@ class BestBattleSnake(TemplateSnake):
value = os.getenv(name) value = os.getenv(name)
if value is None: if value is None:
return default return default
return value.lower() in {'1', 'true', 'yes', 'on'} return value.lower() in {"1", "true", "yes", "on"}
def _env_int(self, name:str, default:int) -> int: def _env_int(self, name:str, default:int) -> int:
value = os.getenv(name) value = os.getenv(name)
@@ -442,11 +445,20 @@ class BestBattleSnake(TemplateSnake):
else: else:
considered_moves = list(scores.keys()) considered_moves = list(scores.keys())
best_score = max(scores[move] for move in considered_moves) best_move = self._pick_best_with_future_planning(
top_moves = [ considered_moves=considered_moves,
move for move in considered_moves if best_score - scores[move] <= 1.5 scores=scores,
] safe_moves=safe_moves,
best_move = random.choice(top_moves) my_body=my_body,
other_snakes=other_snakes,
food_set=food_set,
is_constrictor=is_constrictor,
width=width,
height=height,
enemy_can_grow_cache=enemy_can_grow_cache,
deadline=deadline,
tie_window=1.5,
)
self.recent_heads.append(current_head_point) self.recent_heads.append(current_head_point)
self.last_move = best_move self.last_move = best_move
self.add_to_history({"turn": turn, "move": best_move, "scores": scores}) self.add_to_history({"turn": turn, "move": best_move, "scores": scores})
@@ -643,11 +655,21 @@ class BestBattleSnake(TemplateSnake):
if not scores: if not scores:
return random.choice(list(safe_moves.keys())), {} return random.choice(list(safe_moves.keys())), {}
best_score = max(scores[move] for move in considered_moves) best_move = self._pick_best_with_future_planning(
top_moves = [ considered_moves=considered_moves,
move for move in considered_moves if best_score - scores[move] <= 1.5 scores=scores,
] safe_moves=safe_moves,
return random.choice(top_moves), scores my_body=my_body,
other_snakes=other_snakes,
food_set=food_set,
is_constrictor=False,
width=width,
height=height,
enemy_can_grow_cache=enemy_can_grow_cache,
deadline=deadline,
tie_window=1.5,
)
return best_move, scores
def _choose_constrictor_move(self, safe_moves:MoveMap, my_body:list[Coord], my_len:int, other_snakes:list[SnakeState], food_set:set[Point], enemy_attack_map:AttackMap, enemy_heads:list[Point], enemy_can_grow_cache:dict[Any, bool], width:int, height:int, deadline:float|None=None) -> tuple[str, dict[str, float]]: def _choose_constrictor_move(self, safe_moves:MoveMap, my_body:list[Coord], my_len:int, other_snakes:list[SnakeState], food_set:set[Point], enemy_attack_map:AttackMap, enemy_heads:list[Point], enemy_can_grow_cache:dict[Any, bool], width:int, height:int, deadline:float|None=None) -> tuple[str, dict[str, float]]:
"""Score and select a move for constrictor games.""" """Score and select a move for constrictor games."""
@@ -773,11 +795,21 @@ class BestBattleSnake(TemplateSnake):
if not scores: if not scores:
return random.choice(list(safe_moves.keys())), {} return random.choice(list(safe_moves.keys())), {}
best_score = max(scores[move] for move in considered_moves) best_move = self._pick_best_with_future_planning(
top_moves = [ considered_moves=considered_moves,
move for move in considered_moves if best_score - scores[move] <= 2.0 scores=scores,
] safe_moves=safe_moves,
return random.choice(top_moves), scores my_body=my_body,
other_snakes=other_snakes,
food_set=food_set,
is_constrictor=True,
width=width,
height=height,
enemy_can_grow_cache=enemy_can_grow_cache,
deadline=deadline,
tie_window=2.0,
)
return best_move, scores
def _legal_moves(self, my_head:Coord, my_body:list[Coord], other_snakes:list[SnakeState], food_set:set[Point], is_constrictor:bool, width:int, height:int) -> MoveMap: def _legal_moves(self, my_head:Coord, my_body:list[Coord], other_snakes:list[SnakeState], food_set:set[Point], is_constrictor:bool, width:int, height:int) -> MoveMap:
"""Return legal immediate moves after body, wall, and tail checks.""" """Return legal immediate moves after body, wall, and tail checks."""
@@ -829,15 +861,6 @@ class BestBattleSnake(TemplateSnake):
if is_constrictor: if is_constrictor:
continue continue
snake_id = snake.get("id")
enemy_can_grow = (
enemy_can_grow_cache.get(snake_id)
if enemy_can_grow_cache and snake_id is not None
else self._enemy_can_grow_this_turn(snake, food_set)
)
if enemy_can_grow:
continue
if self._is_tail_stacked(snake["body"]): if self._is_tail_stacked(snake["body"]):
continue continue
@@ -850,19 +873,14 @@ class BestBattleSnake(TemplateSnake):
"""Map cells enemies can contest next turn to their effective length.""" """Map cells enemies can contest next turn to their effective length."""
occupied = self._occupied_cells(my_snake["body"], other_snakes) occupied = self._occupied_cells(my_snake["body"], other_snakes)
my_body_points = {(segment["x"], segment["y"]) for segment in my_snake["body"]} my_body_points = {(segment["x"], segment["y"]) for segment in my_snake["body"]}
my_tail = (my_snake["body"][-1]["x"], my_snake["body"][-1]["y"])
my_tail_stacked = self._is_tail_stacked(my_snake["body"])
attack_map = {} attack_map = {}
for enemy in other_snakes: for enemy in other_snakes:
enemy_len = enemy.get("length", len(enemy["body"])) enemy_len = enemy.get("length", len(enemy["body"]))
enemy_tail = (enemy["body"][-1]["x"], enemy["body"][-1]["y"]) enemy_tail = (enemy["body"][-1]["x"], enemy["body"][-1]["y"])
enemy_tail_stacked = self._is_tail_stacked(enemy["body"]) enemy_tail_stacked = self._is_tail_stacked(enemy["body"])
snake_id = enemy.get("id")
enemy_can_grow = (
enemy_can_grow_cache.get(snake_id)
if enemy_can_grow_cache and snake_id is not None
else self._enemy_can_grow_this_turn(enemy, food_set)
)
enemy_head = enemy["head"] enemy_head = enemy["head"]
for dx, dy in self.DIRECTIONS.values(): for dx, dy in self.DIRECTIONS.values():
point = (enemy_head["x"] + dx, enemy_head["y"] + dy) point = (enemy_head["x"] + dx, enemy_head["y"] + dy)
@@ -873,15 +891,17 @@ class BestBattleSnake(TemplateSnake):
not is_constrictor not is_constrictor
and point == enemy_tail and point == enemy_tail
and not enemy_tail_stacked and not enemy_tail_stacked
and not enemy_can_grow
) )
can_contest_my_tail = (not is_constrictor and point == my_tail and not my_tail_stacked)
if point in occupied and not can_step_on_enemy_tail: if point in occupied and not can_step_on_enemy_tail and not can_contest_my_tail:
continue continue
# Do not consider impossible overlap directly into my own occupied body except head swap possibilities. # Ignore impossible overlap into our occupied body, but keep our vacatable tail
# so we can detect dangerous head-to-head contests when tail-chasing.
if point in my_body_points: if point in my_body_points:
continue if is_constrictor or my_tail_stacked or point != my_tail:
continue
previous = attack_map.get(point) previous = attack_map.get(point)
if previous is None or enemy_len > previous: if previous is None or enemy_len > previous:
@@ -954,6 +974,199 @@ class BestBattleSnake(TemplateSnake):
return False return False
return perf_counter() >= deadline return perf_counter() >= deadline
def _remaining_ms(self, deadline:float|None) -> float:
if deadline is None:
return 10_000.0
return max(0.0, (deadline - perf_counter()) * 1000.0)
def _pick_best_with_future_planning(self, considered_moves:list[str], scores:dict[str, float], safe_moves:MoveMap, my_body:list[Coord], other_snakes:list[SnakeState], food_set:set[Point], is_constrictor:bool, width:int, height:int, enemy_can_grow_cache:dict[Any, bool]|None, deadline:float|None, tie_window:float) -> str:
best_score = max(scores[move] for move in considered_moves)
top_moves = [move for move in considered_moves if best_score - scores[move] <= tie_window]
if len(top_moves) <= 1:
return top_moves[0]
if self._time_exceeded(deadline) or self._remaining_ms(deadline) < self.future_planning_min_time_ms:
return random.choice(top_moves)
candidate_moves = sorted(top_moves, key=lambda move: scores[move], reverse=True)[:3]
lookahead_bonus:dict[str, float] = {}
for move in candidate_moves:
if self._time_exceeded(deadline):
break
bonus = self._future_rollout_bonus_for_move(
move=move,
safe_moves=safe_moves,
my_body=my_body,
other_snakes=other_snakes,
food_set=food_set,
is_constrictor=is_constrictor,
width=width,
height=height,
enemy_can_grow_cache=enemy_can_grow_cache,
depth=self.future_planning_depth,
branch_limit=self.future_planning_branch,
deadline=deadline,
)
lookahead_bonus[move] = bonus
if not lookahead_bonus:
return random.choice(top_moves)
for move, bonus in lookahead_bonus.items():
scores[move] += bonus
refined_best = max(scores[move] for move in top_moves)
refined_top = [
move
for move in top_moves
if refined_best - scores[move] <= max(0.5, tie_window / 2)
]
return random.choice(refined_top)
def _future_rollout_bonus_for_move(self, move:str, safe_moves:MoveMap, my_body:list[Coord], other_snakes:list[SnakeState], food_set:set[Point], is_constrictor:bool, width:int, height:int, enemy_can_grow_cache:dict[Any, bool]|None, depth:int, branch_limit:int, deadline:float|None) -> float:
pos = safe_moves.get(move)
if pos is None:
return -250.0
point = (pos["x"], pos["y"])
ate_food = point in food_set
future_body = self._future_body(
current_body=my_body,
next_head=pos,
ate_food=ate_food,
is_constrictor=is_constrictor,
)
raw_score = self._future_survival_tree_score(
my_body=future_body,
other_snakes=other_snakes,
food_set=food_set,
is_constrictor=is_constrictor,
width=width,
height=height,
enemy_can_grow_cache=enemy_can_grow_cache,
depth=max(1, depth),
branch_limit=max(1, branch_limit),
deadline=deadline,
)
return raw_score * 0.06
def _future_survival_tree_score(self, my_body:list[Coord], other_snakes:list[SnakeState], food_set:set[Point], is_constrictor:bool, width:int, height:int, enemy_can_grow_cache:dict[Any, bool]|None, depth:int, branch_limit:int, deadline:float|None,
) -> float:
if depth <= 0 or self._time_exceeded(deadline):
return 0.0
my_head = my_body[0]
safe_moves = self._legal_moves(
my_head=my_head,
my_body=my_body,
other_snakes=other_snakes,
food_set=food_set,
is_constrictor=is_constrictor,
width=width,
height=height,
)
if not safe_moves:
return -2400.0
scored_children:list[tuple[float, list[dict[str, int]]]] = []
for move, pos in safe_moves.items():
if self._time_exceeded(deadline):
break
point = (pos["x"], pos["y"])
ate_food = point in food_set
future_body = self._future_body(
current_body=my_body,
next_head=pos,
ate_food=ate_food,
is_constrictor=is_constrictor,
)
immediate_score = self._future_position_score(
my_body=future_body,
other_snakes=other_snakes,
food_set=food_set,
is_constrictor=is_constrictor,
width=width,
height=height,
enemy_can_grow_cache=enemy_can_grow_cache,
deadline=deadline,
)
scored_children.append((immediate_score, future_body))
if not scored_children:
return -2200.0
scored_children.sort(key=lambda item: item[0], reverse=True)
if depth == 1:
return scored_children[0][0]
best_total = scored_children[0][0]
for immediate_score, future_body in scored_children[:branch_limit]:
if self._time_exceeded(deadline):
break
continuation = self._future_survival_tree_score(
my_body=future_body,
other_snakes=other_snakes,
food_set=food_set,
is_constrictor=is_constrictor,
width=width,
height=height,
enemy_can_grow_cache=enemy_can_grow_cache,
depth=depth - 1,
branch_limit=branch_limit,
deadline=deadline,
)
total = immediate_score + continuation * 0.72
if total > best_total:
best_total = total
return best_total
def _future_position_score(self, my_body:list[Coord], other_snakes:list[SnakeState], food_set:set[Point], is_constrictor:bool, width:int, height:int, enemy_can_grow_cache:dict[Any, bool]|None, deadline:float|None) -> float:
if self._time_exceeded(deadline):
return 0.0
head_point = (my_body[0]["x"], my_body[0]["y"])
blocked = self._simulation_blocked(
future_body=my_body,
other_snakes=other_snakes,
food_set=food_set,
is_constrictor=is_constrictor,
enemy_can_grow_cache=enemy_can_grow_cache,
)
blocked.discard(head_point)
reachable_space = self._flood_fill_count(head_point, blocked, width, height)
liberties = self._open_neighbor_count(head_point, blocked, width, height)
next_options = self._next_turn_option_count(my_body, blocked, width, height)
enemy_safe_options = self._safe_next_turn_option_count(
future_body=my_body,
other_snakes=other_snakes,
food_set=food_set,
is_constrictor=is_constrictor,
enemy_can_grow_cache=enemy_can_grow_cache,
width=width,
height=height,
)
required_space = len(my_body) + (1 if is_constrictor else 0)
score = 0.0
score += reachable_space * 1.9
score += liberties * 14.0
score += next_options * 11.0
score += enemy_safe_options * 26.0
if reachable_space < required_space:
score -= 1500.0
if liberties == 0:
score -= 1000.0
if next_options == 0:
score -= 1200.0
if enemy_safe_options == 0:
score -= 1900.0
elif enemy_safe_options == 1:
score -= 420.0
return score
def _nearest_food_distance(self, start:Point, food_set:set[Point], blocked:set[Point], width:int, height:int) -> int|None: def _nearest_food_distance(self, start:Point, food_set:set[Point], blocked:set[Point], width:int, height:int) -> int|None:
"""Compute shortest reachable distance to any food using BFS.""" """Compute shortest reachable distance to any food using BFS."""
if not food_set: if not food_set:
@@ -978,7 +1191,7 @@ class BestBattleSnake(TemplateSnake):
return None return None
def _path_distance( self, start:Point, goal:Point, blocked:set[Point], width:int, height:int) -> int|None: def _path_distance(self, start:Point, goal:Point, blocked:set[Point], width:int, height:int) -> int|None:
"""Compute shortest path distance between two cells.""" """Compute shortest path distance between two cells."""
queue = deque([(start, 0)]) queue = deque([(start, 0)])
seen = {start} seen = {start}
@@ -1046,7 +1259,6 @@ class BestBattleSnake(TemplateSnake):
count += 1 count += 1
return count return count
def _safe_next_turn_option_count(self, future_body:list[Coord], other_snakes:list[SnakeState], food_set:set[Point], is_constrictor:bool, enemy_can_grow_cache:dict[Any, bool]|None, width:int, height:int) -> int: def _safe_next_turn_option_count(self, future_body:list[Coord], other_snakes:list[SnakeState], food_set:set[Point], is_constrictor:bool, enemy_can_grow_cache:dict[Any, bool]|None, width:int, height:int) -> int:
"""Count next-turn moves that stay safe from enemy head contests.""" """Count next-turn moves that stay safe from enemy head contests."""
if not future_body: if not future_body:
@@ -1180,7 +1392,7 @@ class BestBattleSnake(TemplateSnake):
) )
return enemy_space, enemy_options return enemy_space, enemy_options
def _enemy_constrictor_projection(self, other_snakes:list[SnakeState], blocked:set[Point], width:int, height: int) -> tuple[int, int]: def _enemy_constrictor_projection(self, other_snakes:list[SnakeState], blocked:set[Point], width:int, height:int) -> tuple[int, int]:
"""Estimate enemy best-space and total options after our candidate move.""" """Estimate enemy best-space and total options after our candidate move."""
best_enemy_space = 0 best_enemy_space = 0
total_enemy_options = 0 total_enemy_options = 0
+192
View File
@@ -829,5 +829,197 @@ class TestBestBattleSnake(unittest.TestCase):
move = make_board(game_state).snake_neat_make_a_move() move = make_board(game_state).snake_neat_make_a_move()
self.assertEqual(move, "right") self.assertEqual(move, "right")
def test_enemy_attack_map_marks_own_tail_contest(self):
snake = BestBattleSnake()
my_snake = {
"id": "me",
"name": "me",
"health": 90,
"length": 4,
"head": {"x": 3, "y": 3},
"body": [
{"x": 3, "y": 3},
{"x": 3, "y": 2},
{"x": 2, "y": 2},
{"x": 2, "y": 3},
],
}
enemy = {
"id": "enemy",
"name": "enemy",
"health": 90,
"length": 6,
"head": {"x": 1, "y": 3},
"body": [
{"x": 1, "y": 3},
{"x": 1, "y": 2},
{"x": 1, "y": 1},
{"x": 0, "y": 1},
{"x": 0, "y": 2},
{"x": 0, "y": 3},
],
}
attack_map = snake._build_enemy_attack_map(
my_snake=my_snake,
other_snakes=[enemy],
food_set=set(),
is_constrictor=False,
width=7,
height=7,
enemy_can_grow_cache={"enemy": False},
)
self.assertEqual(attack_map.get((2, 3)), 6)
def test_simulation_frees_enemy_tail_even_if_enemy_can_grow(self):
snake = BestBattleSnake()
future_body = [
{"x": 4, "y": 4},
{"x": 4, "y": 3},
{"x": 3, "y": 3},
{"x": 3, "y": 4},
]
enemy = {
"id": "enemy",
"name": "enemy",
"health": 90,
"length": 4,
"head": {"x": 1, "y": 1},
"body": [
{"x": 1, "y": 1},
{"x": 1, "y": 0},
{"x": 0, "y": 0},
{"x": 0, "y": 1},
],
}
blocked = snake._simulation_blocked(
future_body=future_body,
other_snakes=[enemy],
food_set={(2, 1)},
is_constrictor=False,
enemy_can_grow_cache={"enemy": True},
)
self.assertNotIn((0, 1), blocked)
def test_enemy_attack_map_allows_enemy_tail_move_when_enemy_can_grow(self):
snake = BestBattleSnake()
my_snake = {
"id": "me",
"name": "me",
"health": 90,
"length": 4,
"head": {"x": 6, "y": 6},
"body": [
{"x": 6, "y": 6},
{"x": 6, "y": 5},
{"x": 5, "y": 5},
{"x": 5, "y": 6},
],
}
enemy = {
"id": "enemy",
"name": "enemy",
"health": 90,
"length": 4,
"head": {"x": 3, "y": 3},
"body": [
{"x": 3, "y": 3},
{"x": 3, "y": 2},
{"x": 2, "y": 2},
{"x": 2, "y": 3},
],
}
attack_map = snake._build_enemy_attack_map(
my_snake=my_snake,
other_snakes=[enemy],
food_set={(4, 3)},
is_constrictor=False,
width=11,
height=11,
enemy_can_grow_cache={"enemy": True},
)
self.assertEqual(attack_map.get((2, 3)), 4)
def test_future_planning_prefers_non_trap_path(self):
snake = BestBattleSnake()
my_body = [
{"x": 3, "y": 3},
{"x": 3, "y": 2},
{"x": 2, "y": 2},
{"x": 2, "y": 3},
]
enemy = {
"id": "enemy",
"name": "enemy",
"health": 90,
"length": 8,
"head": {"x": 0, "y": 0},
"body": [
{"x": 0, "y": 0},
{"x": 4, "y": 4},
{"x": 4, "y": 2},
{"x": 5, "y": 4},
{"x": 5, "y": 2},
{"x": 6, "y": 4},
{"x": 6, "y": 3},
{"x": 6, "y": 2},
],
}
safe_moves = snake._legal_moves(
my_head=my_body[0],
my_body=my_body,
other_snakes=[enemy],
food_set=set(),
is_constrictor=False,
width=7,
height=7,
)
self.assertIn("left", safe_moves)
self.assertIn("right", safe_moves)
right_bonus = snake._future_rollout_bonus_for_move(
move="right",
safe_moves=safe_moves,
my_body=my_body,
other_snakes=[enemy],
food_set=set(),
is_constrictor=False,
width=7,
height=7,
enemy_can_grow_cache={"enemy": False},
depth=3,
branch_limit=2,
deadline=None,
)
left_bonus = snake._future_rollout_bonus_for_move(
move="left",
safe_moves=safe_moves,
my_body=my_body,
other_snakes=[enemy],
food_set=set(),
is_constrictor=False,
width=7,
height=7,
enemy_can_grow_cache={"enemy": False},
depth=3,
branch_limit=2,
deadline=None,
)
self.assertGreater(left_bonus, right_bonus)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
+110
View File
@@ -0,0 +1,110 @@
import unittest
from typing import Any, cast
from server.GameBoard import GameBoard
from server.game_board_stats import GameBoardStoreBuilder
from server.game_board_stats.MemoryGameBoardStore import MemoryGameBoardStore
from server.game_board_stats.RedisGameBoardStore import RedisGameBoardStore
from snakes.TemplateSnake import TemplateSnake
class _FakeRedis:
def __init__(self):
self.data = {}
async def set(self, key, value, ex=None):
self.data[key] = value
async def get(self, key):
return self.data.get(key)
async def delete(self, key):
self.data.pop(key, None)
async def aclose(self):
return None
class TestGameStateStore(unittest.IsolatedAsyncioTestCase):
def _build_board(self) -> GameBoard:
board = GameBoard(
game_id="game-1",
width=11,
height=11,
ruleset={"name": "standard", "version": "v1.0.0"},
source="custom",
map="standard",
snake_class=TemplateSnake(),
)
board.read_game_data(
{
"turn": 3,
"board": {
"food": [{"x": 1, "y": 1}],
"hazards": [],
"snakes": [
{
"id": "me",
"name": "me",
"health": 99,
"length": 3,
"head": {"x": 2, "y": 2},
"body": [
{"x": 2, "y": 2},
{"x": 2, "y": 1},
{"x": 2, "y": 0},
],
}
],
},
"you": {
"id": "me",
"name": "me",
"health": 99,
"length": 3,
"head": {"x": 2, "y": 2},
"body": [
{"x": 2, "y": 2},
{"x": 2, "y": 1},
{"x": 2, "y": 0},
],
},
"game": {"timeout": 500},
}
)
return board
def test_builder_selects_store_backend(self):
memory_store = GameBoardStoreBuilder.build(backend="memory")
redis_store = GameBoardStoreBuilder.build(backend="redis")
default_store = GameBoardStoreBuilder.build(backend="unknown")
self.assertIsInstance(memory_store, MemoryGameBoardStore)
self.assertIsInstance(redis_store, RedisGameBoardStore)
self.assertIsInstance(default_store, MemoryGameBoardStore)
async def test_memory_backend_roundtrip(self):
store = MemoryGameBoardStore()
board = self._build_board()
await store.save("game-1", board)
loaded = cast(GameBoard, await store.load("game-1"))
self.assertIsNotNone(loaded)
self.assertEqual(loaded.id, "game-1")
await store.delete("game-1")
self.assertIsNone(await store.load("game-1"))
async def test_redis_backend_roundtrip(self):
store = RedisGameBoardStore()
store._redis = cast(Any, _FakeRedis())
board = self._build_board()
await store.save("game-1", board)
loaded = cast(GameBoard, await store.load("game-1"))
self.assertIsNotNone(loaded)
self.assertEqual(loaded.id, "game-1")
self.assertEqual(loaded.get_turn(), 3)
await store.delete("game-1")
self.assertIsNone(await store.load("game-1"))
if __name__ == "__main__":
unittest.main()
Generated
+44
View File
@@ -17,6 +17,28 @@ version = "0.7.0"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/76/a8/ca4c00b319b877d29aa792cdd4ae3fb2a9f57268d94708a637abe9ae58c5/aiologger-0.7.0.tar.gz", hash = "sha256:7a4d5c91b836b61e842a791071786a3d80d6b6fa46fb8fd8e73391253ecb72ac", size = 20485, upload-time = "2022-10-05T01:03:22.199Z" } sdist = { url = "https://files.pythonhosted.org/packages/76/a8/ca4c00b319b877d29aa792cdd4ae3fb2a9f57268d94708a637abe9ae58c5/aiologger-0.7.0.tar.gz", hash = "sha256:7a4d5c91b836b61e842a791071786a3d80d6b6fa46fb8fd8e73391253ecb72ac", size = 20485, upload-time = "2022-10-05T01:03:22.199Z" }
[[package]]
name = "aioredis"
version = "2.0.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "async-timeout" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/2e/cf/9eb144a0b05809ffc5d29045c4b51039000ea275bc1268d0351c9e7dfc06/aioredis-2.0.1.tar.gz", hash = "sha256:eaa51aaf993f2d71f54b70527c440437ba65340588afeb786cd87c55c89cd98e", size = 111047, upload-time = "2021-12-27T20:28:17.557Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/9b/a9/0da089c3ae7a31cbcd2dcf0214f6f571e1295d292b6139e2bac68ec081d0/aioredis-2.0.1-py3-none-any.whl", hash = "sha256:9ac0d0b3b485d293b8ca1987e6de8658d7dafcca1cddfcd1d506cae8cdebfdd6", size = 71243, upload-time = "2021-12-27T20:28:16.36Z" },
]
[[package]]
name = "async-timeout"
version = "5.0.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/a5/ae/136395dfbfe00dfc94da3f3e136d0b13f394cba8f4841120e34226265780/async_timeout-5.0.1.tar.gz", hash = "sha256:d9321a7a3d5a6a5e187e824d2fa0793ce379a202935782d555d6e9d2735677d3", size = 9274, upload-time = "2024-11-06T16:41:39.6Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/fe/ba/e2081de779ca30d473f21f5b30e0e737c438205440784c7dfc81efc2b029/async_timeout-5.0.1-py3-none-any.whl", hash = "sha256:39e3809566ff85354557ec2398b55e096c8364bacac9405a7a1fa429e77fe76c", size = 6233, upload-time = "2024-11-06T16:41:37.9Z" },
]
[[package]] [[package]]
name = "blinker" name = "blinker"
version = "1.9.0" version = "1.9.0"
@@ -266,23 +288,45 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/7e/e9/cc28f21f52913adf333f653b9e0a3bf9cb223f5083a26422968ba73edd8d/quart-0.20.0-py3-none-any.whl", hash = "sha256:003c08f551746710acb757de49d9b768986fd431517d0eb127380b656b98b8f1", size = 77960, upload-time = "2024-12-23T13:53:02.842Z" }, { url = "https://files.pythonhosted.org/packages/7e/e9/cc28f21f52913adf333f653b9e0a3bf9cb223f5083a26422968ba73edd8d/quart-0.20.0-py3-none-any.whl", hash = "sha256:003c08f551746710acb757de49d9b768986fd431517d0eb127380b656b98b8f1", size = 77960, upload-time = "2024-12-23T13:53:02.842Z" },
] ]
[[package]]
name = "redis"
version = "7.4.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/7b/7f/3759b1d0d72b7c92f0d70ffd9dc962b7b7b5ee74e135f9d7d8ab06b8a318/redis-7.4.0.tar.gz", hash = "sha256:64a6ea7bf567ad43c964d2c30d82853f8df927c5c9017766c55a1d1ed95d18ad", size = 4943913, upload-time = "2026-03-24T09:14:37.53Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/74/3a/95deec7db1eb53979973ebd156f3369a72732208d1391cd2e5d127062a32/redis-7.4.0-py3-none-any.whl", hash = "sha256:a9c74a5c893a5ef8455a5adb793a31bb70feb821c86eccb62eebef5a19c429ec", size = 409772, upload-time = "2026-03-24T09:14:35.968Z" },
]
[[package]] [[package]]
name = "snake-python" name = "snake-python"
version = "0.1.0" version = "0.1.0"
source = { virtual = "." } source = { virtual = "." }
dependencies = [ dependencies = [
{ name = "aiologger" }, { name = "aiologger" },
{ name = "aioredis" },
{ name = "dotenv" }, { name = "dotenv" },
{ name = "gel" }, { name = "gel" },
{ name = "quart" }, { name = "quart" },
{ name = "redis" },
] ]
[package.metadata] [package.metadata]
requires-dist = [ requires-dist = [
{ name = "aiologger", specifier = ">=0.7.0" }, { name = "aiologger", specifier = ">=0.7.0" },
{ name = "aioredis", specifier = ">=2.0.1" },
{ name = "dotenv", specifier = ">=0.9.9" }, { name = "dotenv", specifier = ">=0.9.9" },
{ name = "gel", specifier = ">=3.1.0" }, { name = "gel", specifier = ">=3.1.0" },
{ name = "quart", specifier = ">=0.20.0" }, { name = "quart", specifier = ">=0.20.0" },
{ name = "redis", specifier = ">=5.2.1" },
]
[[package]]
name = "typing-extensions"
version = "4.15.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/72/94/1a15dd82efb362ac84269196e94cf00f187f7ed21c242792a923cdb1c61f/typing_extensions-4.15.0.tar.gz", hash = "sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466", size = 109391, upload-time = "2025-08-25T13:49:26.313Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614, upload-time = "2025-08-25T13:49:24.86Z" },
] ]
[[package]] [[package]]