add better dashboard with full snake game board
This commit is contained in:
@@ -0,0 +1,164 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
from typing import Iterable
|
||||
from urllib.parse import urlencode
|
||||
from urllib.request import urlopen
|
||||
import shutil
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
BASE_URL = "https://media.battlesnake.com/"
|
||||
S3_NS = {"s3": "http://doc.s3.amazonaws.com/2006-03-01"}
|
||||
DEFAULT_PREFIXES = ("snakes/heads/", "snakes/tails/")
|
||||
ALLOWED_EXTENSIONS = {".svg", ".png", ".webp"}
|
||||
|
||||
def build_list_url(prefix:str, marker:str|None) -> str:
|
||||
query = {"prefix": prefix}
|
||||
if marker:
|
||||
query["marker"] = marker
|
||||
return f"{BASE_URL}?{urlencode(query)}"
|
||||
|
||||
def list_keys_for_prefix(prefix:str) -> list[str]:
|
||||
keys: list[str] = []
|
||||
marker: str | None = None
|
||||
|
||||
while True:
|
||||
url = build_list_url(prefix=prefix, marker=marker)
|
||||
with urlopen(url) as response:
|
||||
xml_bytes = response.read()
|
||||
|
||||
root = ET.fromstring(xml_bytes)
|
||||
for key_node in root.findall("s3:Contents/s3:Key", S3_NS):
|
||||
key = (key_node.text or "").strip()
|
||||
if key and not key.endswith("/"):
|
||||
keys.append(key)
|
||||
|
||||
truncated_text = (
|
||||
root.findtext("s3:IsTruncated", default="false", namespaces=S3_NS)
|
||||
or "false"
|
||||
).lower()
|
||||
is_truncated = truncated_text == "true"
|
||||
if not is_truncated:
|
||||
break
|
||||
|
||||
next_marker = (
|
||||
root.findtext("s3:NextMarker", default="", namespaces=S3_NS) or ""
|
||||
).strip()
|
||||
if next_marker:
|
||||
marker = next_marker
|
||||
continue
|
||||
|
||||
last_key = keys[-1] if keys else None
|
||||
if not last_key:
|
||||
break
|
||||
marker = last_key
|
||||
|
||||
return keys
|
||||
|
||||
def keep_customization_key(key:str) -> bool:
|
||||
if not key.startswith(DEFAULT_PREFIXES):
|
||||
return False
|
||||
suffix = Path(key).suffix.lower()
|
||||
return suffix in ALLOWED_EXTENSIONS
|
||||
|
||||
def to_output_path(output_root:Path, key:str) -> Path:
|
||||
if key.startswith("snakes/heads/"):
|
||||
relative = key.removeprefix("snakes/")
|
||||
elif key.startswith("snakes/tails/"):
|
||||
relative = key.removeprefix("snakes/")
|
||||
else:
|
||||
relative = key
|
||||
return output_root / relative
|
||||
|
||||
def download_file(url:str, output_file:Path) -> None:
|
||||
output_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
with urlopen(url) as response, output_file.open("wb") as target:
|
||||
shutil.copyfileobj(response, target)
|
||||
|
||||
def prune_output(output_root:Path, wanted_files:set[Path]) -> int:
|
||||
removed = 0
|
||||
if not output_root.exists():
|
||||
return 0
|
||||
|
||||
for file_path in output_root.rglob("*"):
|
||||
if not file_path.is_file():
|
||||
continue
|
||||
if file_path not in wanted_files:
|
||||
file_path.unlink()
|
||||
removed += 1
|
||||
|
||||
for directory in sorted(
|
||||
(p for p in output_root.rglob("*") if p.is_dir()), reverse=True
|
||||
):
|
||||
if any(directory.iterdir()):
|
||||
continue
|
||||
directory.rmdir()
|
||||
|
||||
return removed
|
||||
|
||||
def collect_customization_keys(prefixes:Iterable[str]) -> list[str]:
|
||||
all_keys: list[str] = []
|
||||
for prefix in prefixes:
|
||||
all_keys.extend(list_keys_for_prefix(prefix))
|
||||
return [key for key in sorted(set(all_keys)) if keep_customization_key(key)]
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Download Battlesnake snake customization assets (heads/tails) from media.battlesnake.com",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output",
|
||||
default="data/battlesnake-customizations",
|
||||
help="Output directory (default: data/battlesnake-customizations)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--overwrite",
|
||||
action="store_true",
|
||||
help="Overwrite files that already exist",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--prune",
|
||||
action="store_true",
|
||||
help="Delete files in output directory that are not snake customizations",
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
def main() -> None:
|
||||
args = parse_args()
|
||||
output_root = Path(args.output).resolve()
|
||||
|
||||
keys = collect_customization_keys(DEFAULT_PREFIXES)
|
||||
if not keys:
|
||||
print("No customization files found.")
|
||||
return
|
||||
|
||||
downloaded = 0
|
||||
skipped = 0
|
||||
wanted_files: set[Path] = set()
|
||||
|
||||
for key in keys:
|
||||
file_url = f"{BASE_URL}{key}"
|
||||
output_file = to_output_path(output_root, key)
|
||||
wanted_files.add(output_file)
|
||||
|
||||
if output_file.exists() and not args.overwrite:
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
download_file(file_url, output_file)
|
||||
downloaded += 1
|
||||
|
||||
removed = prune_output(output_root, wanted_files) if args.prune else 0
|
||||
|
||||
print(f"Output directory : {output_root}")
|
||||
print(f"Files discovered : {len(keys)}")
|
||||
print(f"Downloaded : {downloaded}")
|
||||
print(f"Skipped existing : {skipped}")
|
||||
if args.prune:
|
||||
print(f"Removed non-customization files: {removed}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
+24
-25
@@ -14,7 +14,7 @@ from server.metrics import (
|
||||
MetricsCollector,
|
||||
)
|
||||
|
||||
from quart import Quart, request, jsonify, render_template
|
||||
from quart import Quart, request, jsonify, render_template, send_from_directory
|
||||
import logging, json, os, re, time
|
||||
from typing import cast
|
||||
|
||||
@@ -144,6 +144,7 @@ class Server:
|
||||
database=os.getenv('EDGEDB_DATABASE', None),
|
||||
)
|
||||
|
||||
await self._record_gameplay_end(game_state)
|
||||
await await_log(self.logger.info(f'GAME ENDED: Winner is {[x['name'] for x in game_state['board']['snakes']]}'))
|
||||
await self._delete_game_board(game_state)
|
||||
await self.metrics_collector.record_game_end(game_state)
|
||||
@@ -190,36 +191,30 @@ class Server:
|
||||
{'Content-Type': 'text/plain; version=0.0.4; charset=utf-8'},
|
||||
)
|
||||
|
||||
@self.app.get('/dashboard/summary')
|
||||
async def dashboard_summary():
|
||||
summary = await self._get_dashboard_summary()
|
||||
return jsonify(summary)
|
||||
|
||||
@self.app.get('/dashboard')
|
||||
async def dashboard_view():
|
||||
initial_game_id = request.args.get('game_id', '')
|
||||
initial_summary = await self._get_dashboard_summary()
|
||||
initial_games = await self._get_dashboard_games(limit=100)
|
||||
return await render_template(
|
||||
'dashboard.html',
|
||||
initial_game_id=initial_game_id,
|
||||
initial_summary=initial_summary,
|
||||
initial_games=initial_games,
|
||||
)
|
||||
|
||||
@self.app.get('/dashboard/games')
|
||||
async def dashboard_games():
|
||||
raw_limit = request.args.get('limit', '50')
|
||||
try:
|
||||
limit = max(1, min(200, int(raw_limit)))
|
||||
except ValueError:
|
||||
limit = 50
|
||||
games = await self._get_dashboard_games(limit)
|
||||
return jsonify(games)
|
||||
|
||||
@self.app.get('/dashboard/game/<string:game_id>')
|
||||
async def dashboard_game_replay(game_id:str):
|
||||
replay = await self._get_dashboard_game_replay(game_id)
|
||||
if replay is None:
|
||||
return jsonify({'error':'game_not_found', 'game_id':game_id}), 404
|
||||
return jsonify({'error': 'game_not_found', 'game_id': game_id}), 404
|
||||
return jsonify(replay)
|
||||
|
||||
@self.app.get('/dashboard/customizations/<path:asset_path>')
|
||||
async def dashboard_customizations_asset(asset_path:str):
|
||||
customization_root = os.path.join(self.data_path, 'server', 'static', 'customizations')
|
||||
return await send_from_directory(customization_root, asset_path)
|
||||
|
||||
async def run(self, host:str='0.0.0.0', port:int=8000, debug:bool=False):
|
||||
logging.getLogger('werkzeug').setLevel(logging.ERROR)
|
||||
|
||||
@@ -372,7 +367,11 @@ class Server:
|
||||
if self.gameplay_database is None:
|
||||
return
|
||||
try:
|
||||
await self.gameplay_database.record_game_start(game_state)
|
||||
await self.gameplay_database.record_game_start(
|
||||
game_state,
|
||||
snake_type=self.snake_type,
|
||||
snake_version=self.snake_version,
|
||||
)
|
||||
except Exception as error:
|
||||
await await_log(self.logger.warning(f'Gameplay DB start record failed:{error}'))
|
||||
|
||||
@@ -405,28 +404,28 @@ class Server:
|
||||
|
||||
async def _get_dashboard_summary(self) -> dict:
|
||||
if self.gameplay_database is None:
|
||||
return {'enabled':False}
|
||||
return {'enabled': False}
|
||||
try:
|
||||
summary = await self.gameplay_database.get_summary()
|
||||
summary['enabled'] = True
|
||||
return summary
|
||||
except Exception as error:
|
||||
await await_log(self.logger.warning(f'Gameplay DB summary failed:{error}'))
|
||||
return {'enabled':True, 'error':'summary_unavailable'}
|
||||
return {'enabled': True, 'error':' summary_unavailable'}
|
||||
|
||||
async def _get_dashboard_games(self, limit:int=50) -> dict:
|
||||
if self.gameplay_database is None:
|
||||
return {'enabled':False, 'games':[]}
|
||||
return {'enabled': False, 'games': []}
|
||||
try:
|
||||
games = await self.gameplay_database.list_games(limit=limit)
|
||||
return {'enabled':True, 'games':games}
|
||||
return {'enabled': True, 'games': games}
|
||||
except Exception as error:
|
||||
await await_log(self.logger.warning(f'Gameplay DB game list failed:{error}'))
|
||||
return {'enabled':True, 'error':'games_unavailable', 'games':[]}
|
||||
return {'enabled': True, 'error': 'games_unavailable', 'games': []}
|
||||
|
||||
async def _get_dashboard_game_replay(self, game_id:str) -> dict | None:
|
||||
if self.gameplay_database is None:
|
||||
return {'enabled':False, 'error':'database_disabled', 'game_id':game_id}
|
||||
return {'enabled': False, 'error': 'database_disabled', 'game_id': game_id}
|
||||
try:
|
||||
replay = await self.gameplay_database.get_game_replay(game_id)
|
||||
if replay is None:
|
||||
@@ -435,4 +434,4 @@ class Server:
|
||||
return replay
|
||||
except Exception as error:
|
||||
await await_log(self.logger.warning(f'Gameplay DB replay failed:{error}'))
|
||||
return {'enabled':True, 'error':'replay_unavailable', 'game_id':game_id}
|
||||
return {'enabled': True, 'error': 'replay_unavailable', 'game_id': game_id}
|
||||
|
||||
@@ -3,7 +3,7 @@ import asyncio, sqlite3, json
|
||||
from pathlib import Path
|
||||
|
||||
class GameplayDatabase:
|
||||
def __init__(self, db_path:str, busy_timeout_ms:int = 5000):
|
||||
def __init__(self, db_path:str, busy_timeout_ms:int=5000):
|
||||
self.db_path = db_path
|
||||
self.busy_timeout_ms = max(1000, int(busy_timeout_ms))
|
||||
self._initialize_database()
|
||||
@@ -18,12 +18,15 @@ class GameplayDatabase:
|
||||
connection.execute("PRAGMA foreign_keys = ON")
|
||||
connection.execute("PRAGMA journal_mode = WAL")
|
||||
connection.execute("PRAGMA synchronous = NORMAL")
|
||||
connection.execute("PRAGMA temp_store = MEMORY")
|
||||
connection.execute("PRAGMA journal_size_limit = 1048576")
|
||||
connection.execute(f"PRAGMA busy_timeout = {self.busy_timeout_ms}")
|
||||
return connection
|
||||
|
||||
def _initialize_database(self) -> None:
|
||||
Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
|
||||
with self._connect() as connection:
|
||||
connection.execute("PRAGMA auto_vacuum = INCREMENTAL")
|
||||
connection.executescript("""
|
||||
CREATE TABLE IF NOT EXISTS games (
|
||||
game_id TEXT PRIMARY KEY,
|
||||
@@ -37,6 +40,8 @@ class GameplayDatabase:
|
||||
ruleset_version TEXT,
|
||||
your_snake_id TEXT,
|
||||
your_snake_name TEXT,
|
||||
your_snake_type TEXT,
|
||||
your_snake_version TEXT,
|
||||
winner_names_json TEXT,
|
||||
winner_you INTEGER NOT NULL DEFAULT 0,
|
||||
final_turn INTEGER NOT NULL DEFAULT 0,
|
||||
@@ -79,8 +84,11 @@ class GameplayDatabase:
|
||||
CREATE INDEX IF NOT EXISTS idx_turns_game_turn ON turns(game_id, turn);
|
||||
CREATE INDEX IF NOT EXISTS idx_games_status ON games(status);
|
||||
CREATE INDEX IF NOT EXISTS idx_snake_turns_game_turn ON snake_turns(game_id, turn);
|
||||
""")
|
||||
""")
|
||||
self._ensure_column_exists(connection, "turns", "my_thinking_json", "TEXT")
|
||||
self._ensure_column_exists(connection, "games", "your_snake_type", "TEXT")
|
||||
self._ensure_column_exists(connection, "games", "your_snake_version", "TEXT")
|
||||
connection.execute("PRAGMA optimize")
|
||||
|
||||
def _ensure_column_exists(self, connection:sqlite3.Connection, table_name:str, column_name:str, column_type:str) -> None:
|
||||
existing = connection.execute(f"PRAGMA table_info({table_name})").fetchall()
|
||||
@@ -125,7 +133,7 @@ class GameplayDatabase:
|
||||
return "down"
|
||||
return None
|
||||
|
||||
def _record_game_start_sync(self, game_state:dict) -> None:
|
||||
def _record_game_start_sync(self, game_state:dict, snake_type:str|None=None, snake_version:str|None=None) -> None:
|
||||
game = game_state.get("game", {})
|
||||
board = game_state.get("board", {})
|
||||
you = self._extract_you(game_state)
|
||||
@@ -135,8 +143,9 @@ class GameplayDatabase:
|
||||
connection.execute("""
|
||||
INSERT INTO games (
|
||||
game_id, started_at, width, height, source, map_name,
|
||||
ruleset_name, ruleset_version, your_snake_id, your_snake_name, status
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'running')
|
||||
ruleset_name, ruleset_version, your_snake_id, your_snake_name,
|
||||
your_snake_type, your_snake_version, status
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'running')
|
||||
ON CONFLICT(game_id) DO UPDATE SET
|
||||
width = excluded.width,
|
||||
height = excluded.height,
|
||||
@@ -146,6 +155,8 @@ class GameplayDatabase:
|
||||
ruleset_version = excluded.ruleset_version,
|
||||
your_snake_id = excluded.your_snake_id,
|
||||
your_snake_name = excluded.your_snake_name,
|
||||
your_snake_type = excluded.your_snake_type,
|
||||
your_snake_version = excluded.your_snake_version,
|
||||
status = 'running'
|
||||
""",
|
||||
(
|
||||
@@ -159,8 +170,13 @@ class GameplayDatabase:
|
||||
ruleset.get("version"),
|
||||
you.get("id"),
|
||||
you.get("name"),
|
||||
)
|
||||
snake_type,
|
||||
snake_version,
|
||||
),
|
||||
)
|
||||
connection.execute("PRAGMA wal_checkpoint(PASSIVE)")
|
||||
connection.execute("PRAGMA incremental_vacuum(200)")
|
||||
connection.execute("PRAGMA optimize")
|
||||
|
||||
def _record_turn_sync(self, game_state:dict, my_move:str|None, my_thinking:dict|None) -> None:
|
||||
game = game_state.get("game", {})
|
||||
@@ -317,7 +333,7 @@ class GameplayDatabase:
|
||||
).fetchone()
|
||||
|
||||
recent = connection.execute("""
|
||||
SELECT game_id, started_at, ended_at, map_name, your_snake_name, winner_you, final_turn, status
|
||||
SELECT game_id, started_at, ended_at, map_name, your_snake_name, your_snake_type, your_snake_version, winner_you, final_turn, status
|
||||
FROM games
|
||||
ORDER BY started_at DESC
|
||||
LIMIT ?
|
||||
@@ -339,6 +355,8 @@ class GameplayDatabase:
|
||||
"ended_at": row["ended_at"],
|
||||
"map": row["map_name"],
|
||||
"snake": row["your_snake_name"],
|
||||
"snake_type": row["your_snake_type"],
|
||||
"snake_version": row["your_snake_version"],
|
||||
"winner_you": bool(row["winner_you"]),
|
||||
"final_turn": int(row["final_turn"] or 0),
|
||||
"status": row["status"],
|
||||
@@ -349,7 +367,8 @@ class GameplayDatabase:
|
||||
with self._connect() as connection:
|
||||
rows = connection.execute("""
|
||||
SELECT game_id, started_at, ended_at, map_name, source, ruleset_name,
|
||||
your_snake_name, winner_you, winner_names_json, final_turn, status
|
||||
your_snake_name, your_snake_type, your_snake_version,
|
||||
winner_you, winner_names_json, final_turn, status
|
||||
FROM games
|
||||
ORDER BY started_at DESC
|
||||
LIMIT ?
|
||||
@@ -365,6 +384,8 @@ class GameplayDatabase:
|
||||
"source": row["source"],
|
||||
"ruleset": row["ruleset_name"],
|
||||
"snake": row["your_snake_name"],
|
||||
"snake_type": row["your_snake_type"],
|
||||
"snake_version": row["your_snake_version"],
|
||||
"winner_you": bool(row["winner_you"]),
|
||||
"winner_names": self._from_json(row["winner_names_json"]) or [],
|
||||
"final_turn": int(row["final_turn"] or 0),
|
||||
@@ -376,6 +397,7 @@ class GameplayDatabase:
|
||||
game_row = connection.execute("""
|
||||
SELECT game_id, started_at, ended_at, width, height, source, map_name,
|
||||
ruleset_name, ruleset_version, your_snake_id, your_snake_name,
|
||||
your_snake_type, your_snake_version,
|
||||
winner_names_json, winner_you, final_turn, status
|
||||
FROM games
|
||||
WHERE game_id = ?
|
||||
@@ -448,6 +470,8 @@ class GameplayDatabase:
|
||||
"ruleset_version": game_row["ruleset_version"],
|
||||
"your_snake_id": game_row["your_snake_id"],
|
||||
"your_snake_name": game_row["your_snake_name"],
|
||||
"your_snake_type": game_row["your_snake_type"],
|
||||
"your_snake_version": game_row["your_snake_version"],
|
||||
"winner_names": self._from_json(game_row["winner_names_json"]) or [],
|
||||
"winner_you": bool(game_row["winner_you"]),
|
||||
"final_turn": int(game_row["final_turn"] or 0),
|
||||
@@ -456,8 +480,8 @@ class GameplayDatabase:
|
||||
"turns": replay_turns,
|
||||
}
|
||||
|
||||
async def record_game_start(self, game_state:dict) -> None:
|
||||
await asyncio.to_thread(self._record_game_start_sync, game_state)
|
||||
async def record_game_start(self, game_state: dict, snake_type:str|None=None, snake_version:str|None=None) -> None:
|
||||
await asyncio.to_thread(self._record_game_start_sync, game_state, snake_type, snake_version)
|
||||
|
||||
async def record_turn(self, game_state:dict, my_move:str|None, my_thinking:dict|None=None) -> None:
|
||||
await asyncio.to_thread(self._record_turn_sync, game_state, my_move, my_thinking)
|
||||
|
||||
+747
-75
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user