add GameplayDatabase database with dashboard
This commit is contained in:
@@ -0,0 +1,478 @@
|
||||
from datetime import datetime, timezone
|
||||
import asyncio, sqlite3, json
|
||||
from pathlib import Path
|
||||
|
||||
class GameplayDatabase:
|
||||
def __init__(self, db_path:str, busy_timeout_ms:int = 5000):
|
||||
self.db_path = db_path
|
||||
self.busy_timeout_ms = max(1000, int(busy_timeout_ms))
|
||||
self._initialize_database()
|
||||
|
||||
def _connect(self) -> sqlite3.Connection:
|
||||
connection = sqlite3.connect(
|
||||
self.db_path,
|
||||
timeout=max(1, self.busy_timeout_ms // 1000),
|
||||
isolation_level=None,
|
||||
)
|
||||
connection.row_factory = sqlite3.Row
|
||||
connection.execute("PRAGMA foreign_keys = ON")
|
||||
connection.execute("PRAGMA journal_mode = WAL")
|
||||
connection.execute("PRAGMA synchronous = NORMAL")
|
||||
connection.execute(f"PRAGMA busy_timeout = {self.busy_timeout_ms}")
|
||||
return connection
|
||||
|
||||
def _initialize_database(self) -> None:
|
||||
Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
|
||||
with self._connect() as connection:
|
||||
connection.executescript("""
|
||||
CREATE TABLE IF NOT EXISTS games (
|
||||
game_id TEXT PRIMARY KEY,
|
||||
started_at TEXT NOT NULL,
|
||||
ended_at TEXT,
|
||||
width INTEGER,
|
||||
height INTEGER,
|
||||
source TEXT,
|
||||
map_name TEXT,
|
||||
ruleset_name TEXT,
|
||||
ruleset_version TEXT,
|
||||
your_snake_id TEXT,
|
||||
your_snake_name TEXT,
|
||||
winner_names_json TEXT,
|
||||
winner_you INTEGER NOT NULL DEFAULT 0,
|
||||
final_turn INTEGER NOT NULL DEFAULT 0,
|
||||
status TEXT NOT NULL DEFAULT 'running'
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS turns (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
game_id TEXT NOT NULL,
|
||||
turn INTEGER NOT NULL,
|
||||
observed_at TEXT NOT NULL,
|
||||
my_move TEXT,
|
||||
my_thinking_json TEXT,
|
||||
board_state_json TEXT NOT NULL,
|
||||
snakes_json TEXT NOT NULL,
|
||||
you_json TEXT NOT NULL,
|
||||
food_json TEXT NOT NULL,
|
||||
hazards_json TEXT NOT NULL,
|
||||
UNIQUE (game_id, turn),
|
||||
FOREIGN KEY (game_id) REFERENCES games(game_id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS snake_turns (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
game_id TEXT NOT NULL,
|
||||
turn INTEGER NOT NULL,
|
||||
snake_id TEXT NOT NULL,
|
||||
snake_name TEXT,
|
||||
health INTEGER,
|
||||
length INTEGER,
|
||||
head_x INTEGER,
|
||||
head_y INTEGER,
|
||||
body_json TEXT NOT NULL,
|
||||
is_you INTEGER NOT NULL DEFAULT 0,
|
||||
inferred_move TEXT,
|
||||
UNIQUE (game_id, turn, snake_id),
|
||||
FOREIGN KEY (game_id) REFERENCES games(game_id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_turns_game_turn ON turns(game_id, turn);
|
||||
CREATE INDEX IF NOT EXISTS idx_games_status ON games(status);
|
||||
CREATE INDEX IF NOT EXISTS idx_snake_turns_game_turn ON snake_turns(game_id, turn);
|
||||
""")
|
||||
self._ensure_column_exists(connection, "turns", "my_thinking_json", "TEXT")
|
||||
|
||||
def _ensure_column_exists(self, connection:sqlite3.Connection, table_name:str, column_name:str, column_type:str) -> None:
|
||||
existing = connection.execute(f"PRAGMA table_info({table_name})").fetchall()
|
||||
if any(row["name"] == column_name for row in existing):
|
||||
return
|
||||
|
||||
connection.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type}")
|
||||
|
||||
def _utc_now(self) -> str:
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
|
||||
def _to_json(self, payload:dict) -> str:
|
||||
return json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
|
||||
|
||||
def _from_json(self, payload:str|None):
|
||||
if payload is None or payload == "":
|
||||
return None
|
||||
try:
|
||||
return json.loads(payload)
|
||||
except json.JSONDecodeError:
|
||||
return None
|
||||
|
||||
def _extract_snakes(self, game_state:dict) -> list[dict]:
|
||||
return list(game_state.get("board", {}).get("snakes", []))
|
||||
|
||||
def _extract_you(self, game_state:dict) -> dict:
|
||||
return dict(game_state.get("you", {}))
|
||||
|
||||
def _infer_direction(self, old_head:tuple[int, int]|None, new_head:tuple[int, int]|None) -> str|None:
|
||||
if old_head is None or new_head is None:
|
||||
return None
|
||||
|
||||
delta_x = new_head[0] - old_head[0]
|
||||
delta_y = new_head[1] - old_head[1]
|
||||
if delta_x == 1 and delta_y == 0:
|
||||
return "right"
|
||||
if delta_x == -1 and delta_y == 0:
|
||||
return "left"
|
||||
if delta_x == 0 and delta_y == 1:
|
||||
return "up"
|
||||
if delta_x == 0 and delta_y == -1:
|
||||
return "down"
|
||||
return None
|
||||
|
||||
def _record_game_start_sync(self, game_state:dict) -> None:
|
||||
game = game_state.get("game", {})
|
||||
board = game_state.get("board", {})
|
||||
you = self._extract_you(game_state)
|
||||
ruleset = game.get("ruleset", {})
|
||||
|
||||
with self._connect() as connection:
|
||||
connection.execute("""
|
||||
INSERT INTO games (
|
||||
game_id, started_at, width, height, source, map_name,
|
||||
ruleset_name, ruleset_version, your_snake_id, your_snake_name, status
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'running')
|
||||
ON CONFLICT(game_id) DO UPDATE SET
|
||||
width = excluded.width,
|
||||
height = excluded.height,
|
||||
source = excluded.source,
|
||||
map_name = excluded.map_name,
|
||||
ruleset_name = excluded.ruleset_name,
|
||||
ruleset_version = excluded.ruleset_version,
|
||||
your_snake_id = excluded.your_snake_id,
|
||||
your_snake_name = excluded.your_snake_name,
|
||||
status = 'running'
|
||||
""",
|
||||
(
|
||||
game.get("id"),
|
||||
self._utc_now(),
|
||||
board.get("width"),
|
||||
board.get("height"),
|
||||
game.get("source"),
|
||||
game.get("map"),
|
||||
ruleset.get("name"),
|
||||
ruleset.get("version"),
|
||||
you.get("id"),
|
||||
you.get("name"),
|
||||
)
|
||||
)
|
||||
|
||||
def _record_turn_sync(self, game_state:dict, my_move:str|None, my_thinking:dict|None) -> None:
|
||||
game = game_state.get("game", {})
|
||||
board = game_state.get("board", {})
|
||||
snakes = self._extract_snakes(game_state)
|
||||
you = self._extract_you(game_state)
|
||||
game_id = game.get("id")
|
||||
turn = int(game_state.get("turn", 0))
|
||||
|
||||
with self._connect() as connection:
|
||||
connection.execute("""
|
||||
INSERT INTO turns (
|
||||
game_id, turn, observed_at, my_move, my_thinking_json,
|
||||
board_state_json, snakes_json, you_json, food_json, hazards_json
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(game_id, turn) DO UPDATE SET
|
||||
observed_at = excluded.observed_at,
|
||||
my_move = excluded.my_move,
|
||||
my_thinking_json = excluded.my_thinking_json,
|
||||
board_state_json = excluded.board_state_json,
|
||||
snakes_json = excluded.snakes_json,
|
||||
you_json = excluded.you_json,
|
||||
food_json = excluded.food_json,
|
||||
hazards_json = excluded.hazards_json
|
||||
""",
|
||||
(
|
||||
game_id,
|
||||
turn,
|
||||
self._utc_now(),
|
||||
my_move,
|
||||
self._to_json(my_thinking) if my_thinking is not None else None,
|
||||
self._to_json(board),
|
||||
self._to_json(snakes),
|
||||
self._to_json(you),
|
||||
self._to_json(board.get("food", [])),
|
||||
self._to_json(board.get("hazards", [])),
|
||||
),
|
||||
)
|
||||
|
||||
previous_positions:dict[str, tuple[int, int]] = {}
|
||||
if turn > 0:
|
||||
previous_rows = connection.execute("""
|
||||
SELECT snake_id, head_x, head_y
|
||||
FROM snake_turns
|
||||
WHERE game_id = ? AND turn = ?
|
||||
""",
|
||||
(game_id, turn - 1),
|
||||
).fetchall()
|
||||
|
||||
previous_positions = {
|
||||
row["snake_id"]: (int(row["head_x"]), int(row["head_y"]))
|
||||
for row in previous_rows
|
||||
if row["head_x"] is not None and row["head_y"] is not None
|
||||
}
|
||||
|
||||
you_id = you.get("id")
|
||||
for snake in snakes:
|
||||
snake_id = snake.get("id")
|
||||
head = snake.get("head", {})
|
||||
head_x = head.get("x")
|
||||
head_y = head.get("y")
|
||||
if snake_id is None:
|
||||
continue
|
||||
|
||||
new_head = (
|
||||
(int(head_x), int(head_y))
|
||||
if head_x is not None and head_y is not None
|
||||
else None
|
||||
)
|
||||
inferred = self._infer_direction(
|
||||
previous_positions.get(snake_id), new_head
|
||||
)
|
||||
|
||||
connection.execute("""
|
||||
INSERT INTO snake_turns (
|
||||
game_id, turn, snake_id, snake_name, health, length,
|
||||
head_x, head_y, body_json, is_you, inferred_move
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(game_id, turn, snake_id) DO UPDATE SET
|
||||
snake_name = excluded.snake_name,
|
||||
health = excluded.health,
|
||||
length = excluded.length,
|
||||
head_x = excluded.head_x,
|
||||
head_y = excluded.head_y,
|
||||
body_json = excluded.body_json,
|
||||
is_you = excluded.is_you,
|
||||
inferred_move = excluded.inferred_move
|
||||
""",
|
||||
(
|
||||
game_id,
|
||||
turn,
|
||||
snake_id,
|
||||
snake.get("name"),
|
||||
snake.get("health"),
|
||||
snake.get("length"),
|
||||
head_x,
|
||||
head_y,
|
||||
self._to_json(snake.get("body", [])),
|
||||
1 if snake_id == you_id else 0,
|
||||
inferred,
|
||||
),
|
||||
)
|
||||
|
||||
connection.execute("""
|
||||
UPDATE games
|
||||
SET final_turn = CASE WHEN ? > final_turn THEN ? ELSE final_turn END
|
||||
WHERE game_id = ?
|
||||
""",
|
||||
(turn, turn, game_id),
|
||||
)
|
||||
|
||||
def _record_game_end_sync(self, game_state:dict) -> None:
|
||||
game = game_state.get("game", {})
|
||||
game_id = game.get("id")
|
||||
board = game_state.get("board", {})
|
||||
snakes = list(board.get("snakes", []))
|
||||
you = self._extract_you(game_state)
|
||||
winner_names = [snake.get("name") for snake in snakes if snake.get("name")]
|
||||
you_id = you.get("id")
|
||||
winner_you = any(snake.get("id") == you_id for snake in snakes)
|
||||
|
||||
with self._connect() as connection:
|
||||
connection.execute("""
|
||||
UPDATE games
|
||||
SET ended_at = ?,
|
||||
winner_names_json = ?,
|
||||
winner_you = ?,
|
||||
final_turn = CASE WHEN ? > final_turn THEN ? ELSE final_turn END,
|
||||
status = 'finished'
|
||||
WHERE game_id = ?
|
||||
""",
|
||||
(
|
||||
self._utc_now(),
|
||||
self._to_json(winner_names),
|
||||
1 if winner_you else 0,
|
||||
int(game_state.get("turn", 0)),
|
||||
int(game_state.get("turn", 0)),
|
||||
game_id,
|
||||
),
|
||||
)
|
||||
|
||||
def _get_summary_sync(self, recent_limit:int=15) -> dict:
|
||||
with self._connect() as connection:
|
||||
totals = connection.execute("""
|
||||
SELECT
|
||||
COUNT(*) AS total_games,
|
||||
SUM(CASE WHEN status = 'running' THEN 1 ELSE 0 END) AS running_games,
|
||||
SUM(CASE WHEN status = 'finished' THEN 1 ELSE 0 END) AS finished_games,
|
||||
SUM(CASE WHEN status = 'finished' AND winner_you = 1 THEN 1 ELSE 0 END) AS wins,
|
||||
SUM(CASE WHEN status = 'finished' AND winner_you = 0 THEN 1 ELSE 0 END) AS losses,
|
||||
AVG(CASE WHEN status = 'finished' THEN final_turn ELSE NULL END) AS avg_turns
|
||||
FROM games
|
||||
"""
|
||||
).fetchone()
|
||||
|
||||
recent = connection.execute("""
|
||||
SELECT game_id, started_at, ended_at, map_name, your_snake_name, winner_you, final_turn, status
|
||||
FROM games
|
||||
ORDER BY started_at DESC
|
||||
LIMIT ?
|
||||
""",
|
||||
(max(1, int(recent_limit)),),
|
||||
).fetchall()
|
||||
|
||||
return {
|
||||
"database": self.db_path,
|
||||
"total_games": int(totals["total_games"] or 0),
|
||||
"running_games": int(totals["running_games"] or 0),
|
||||
"finished_games": int(totals["finished_games"] or 0),
|
||||
"wins": int(totals["wins"] or 0),
|
||||
"losses": int(totals["losses"] or 0),
|
||||
"avg_turns_finished": round(float(totals["avg_turns"] or 0.0), 2),
|
||||
"recent_games": [{
|
||||
"game_id": row["game_id"],
|
||||
"started_at": row["started_at"],
|
||||
"ended_at": row["ended_at"],
|
||||
"map": row["map_name"],
|
||||
"snake": row["your_snake_name"],
|
||||
"winner_you": bool(row["winner_you"]),
|
||||
"final_turn": int(row["final_turn"] or 0),
|
||||
"status": row["status"],
|
||||
} for row in recent ],
|
||||
}
|
||||
|
||||
def _list_games_sync(self, limit:int=50) -> list[dict]:
|
||||
with self._connect() as connection:
|
||||
rows = connection.execute("""
|
||||
SELECT game_id, started_at, ended_at, map_name, source, ruleset_name,
|
||||
your_snake_name, winner_you, winner_names_json, final_turn, status
|
||||
FROM games
|
||||
ORDER BY started_at DESC
|
||||
LIMIT ?
|
||||
""",
|
||||
(max(1, int(limit)),),
|
||||
).fetchall()
|
||||
|
||||
return [{
|
||||
"game_id": row["game_id"],
|
||||
"started_at": row["started_at"],
|
||||
"ended_at": row["ended_at"],
|
||||
"map": row["map_name"],
|
||||
"source": row["source"],
|
||||
"ruleset": row["ruleset_name"],
|
||||
"snake": row["your_snake_name"],
|
||||
"winner_you": bool(row["winner_you"]),
|
||||
"winner_names": self._from_json(row["winner_names_json"]) or [],
|
||||
"final_turn": int(row["final_turn"] or 0),
|
||||
"status": row["status"],
|
||||
} for row in rows]
|
||||
|
||||
def _get_game_replay_sync(self, game_id:str) -> dict | None:
|
||||
with self._connect() as connection:
|
||||
game_row = connection.execute("""
|
||||
SELECT game_id, started_at, ended_at, width, height, source, map_name,
|
||||
ruleset_name, ruleset_version, your_snake_id, your_snake_name,
|
||||
winner_names_json, winner_you, final_turn, status
|
||||
FROM games
|
||||
WHERE game_id = ?
|
||||
""",
|
||||
(game_id,),
|
||||
).fetchone()
|
||||
|
||||
if game_row is None:
|
||||
return None
|
||||
|
||||
turn_rows = connection.execute("""
|
||||
SELECT turn, observed_at, my_move, my_thinking_json,
|
||||
board_state_json, food_json, hazards_json, you_json
|
||||
FROM turns
|
||||
WHERE game_id = ?
|
||||
ORDER BY turn ASC
|
||||
""",
|
||||
(game_id,),
|
||||
).fetchall()
|
||||
|
||||
snake_rows = connection.execute("""
|
||||
SELECT turn, snake_id, snake_name, health, length, head_x, head_y,
|
||||
body_json, is_you, inferred_move
|
||||
FROM snake_turns
|
||||
WHERE game_id = ?
|
||||
ORDER BY turn ASC, is_you DESC, snake_name ASC
|
||||
""",
|
||||
(game_id,),
|
||||
).fetchall()
|
||||
|
||||
snakes_by_turn:dict[int, list[dict]] = {}
|
||||
for row in snake_rows:
|
||||
turn = int(row["turn"])
|
||||
snakes_by_turn.setdefault(turn, []).append({
|
||||
"snake_id": row["snake_id"],
|
||||
"snake_name": row["snake_name"],
|
||||
"health": row["health"],
|
||||
"length": row["length"],
|
||||
"head": {"x": row["head_x"], "y": row["head_y"]},
|
||||
"body": self._from_json(row["body_json"]) or [],
|
||||
"is_you": bool(row["is_you"]),
|
||||
"inferred_move": row["inferred_move"],
|
||||
})
|
||||
|
||||
replay_turns = []
|
||||
for row in turn_rows:
|
||||
turn_number = int(row["turn"])
|
||||
replay_turns.append({
|
||||
"turn": turn_number,
|
||||
"observed_at": row["observed_at"],
|
||||
"my_move": row["my_move"],
|
||||
"my_thinking": self._from_json(row["my_thinking_json"]),
|
||||
"board": self._from_json(row["board_state_json"]),
|
||||
"food": self._from_json(row["food_json"]) or [],
|
||||
"hazards": self._from_json(row["hazards_json"]) or [],
|
||||
"you": self._from_json(row["you_json"]) or {},
|
||||
"snakes": snakes_by_turn.get(turn_number, []),
|
||||
})
|
||||
|
||||
return {
|
||||
"game": {
|
||||
"game_id": game_row["game_id"],
|
||||
"started_at": game_row["started_at"],
|
||||
"ended_at": game_row["ended_at"],
|
||||
"width": game_row["width"],
|
||||
"height": game_row["height"],
|
||||
"source": game_row["source"],
|
||||
"map": game_row["map_name"],
|
||||
"ruleset_name": game_row["ruleset_name"],
|
||||
"ruleset_version": game_row["ruleset_version"],
|
||||
"your_snake_id": game_row["your_snake_id"],
|
||||
"your_snake_name": game_row["your_snake_name"],
|
||||
"winner_names": self._from_json(game_row["winner_names_json"]) or [],
|
||||
"winner_you": bool(game_row["winner_you"]),
|
||||
"final_turn": int(game_row["final_turn"] or 0),
|
||||
"status": game_row["status"],
|
||||
},
|
||||
"turns": replay_turns,
|
||||
}
|
||||
|
||||
async def record_game_start(self, game_state:dict) -> None:
|
||||
await asyncio.to_thread(self._record_game_start_sync, game_state)
|
||||
|
||||
async def record_turn(self, game_state:dict, my_move:str|None, my_thinking:dict|None=None) -> None:
|
||||
await asyncio.to_thread(self._record_turn_sync, game_state, my_move, my_thinking)
|
||||
|
||||
async def record_game_end(self, game_state:dict) -> None:
|
||||
await asyncio.to_thread(self._record_game_end_sync, game_state)
|
||||
|
||||
async def get_summary(self, recent_limit:int=15) -> dict:
|
||||
return await asyncio.to_thread(self._get_summary_sync, recent_limit)
|
||||
|
||||
async def list_games(self, limit:int=50) -> list[dict]:
|
||||
return await asyncio.to_thread(self._list_games_sync, limit)
|
||||
|
||||
async def get_game_replay(self, game_id:str) -> dict|None:
|
||||
return await asyncio.to_thread(self._get_game_replay_sync, game_id)
|
||||
|
||||
async def close(self) -> None:
|
||||
return None
|
||||
Reference in New Issue
Block a user