# File: snake-python/server/database/GameplayDatabase.py (Python, 693 lines, 24 KiB)

import asyncio
import json
import os
import sqlite3
from collections.abc import Iterator
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
_ZSTD_EXT = Path(os.environ.get("SQLITE_ZSTD_EXT", "/usr/local/lib/libsqlite_zstd.so")).expanduser().resolve()
class GameplayDatabase:
def __init__(self, db_path:str, busy_timeout_ms:int=5000):
self.db_path = db_path
self.busy_timeout_ms = max(1000, int(busy_timeout_ms))
self._initialize_database()
def _connect(self) -> sqlite3.Connection:
connection = sqlite3.connect(
self.db_path,
timeout=max(1, self.busy_timeout_ms // 1000),
isolation_level=None,
)
if Path(_ZSTD_EXT).exists():
connection.enable_load_extension(True)
connection.load_extension(str(_ZSTD_EXT))
connection.enable_load_extension(False)
connection.row_factory = sqlite3.Row
connection.execute("PRAGMA foreign_keys = ON")
connection.execute("PRAGMA journal_mode = WAL")
connection.execute("PRAGMA synchronous = NORMAL")
connection.execute("PRAGMA temp_store = MEMORY")
connection.execute("PRAGMA journal_size_limit = 1048576")
connection.execute(f"PRAGMA busy_timeout = {self.busy_timeout_ms}")
return connection
def _ensure_auto_vacuum_full(self, connection:sqlite3.Connection) -> None:
current = connection.execute("PRAGMA auto_vacuum").fetchone()[0]
if current != 1:
connection.execute("PRAGMA auto_vacuum = FULL")
connection.execute("VACUUM")
def _initialize_database(self) -> None:
Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
with self._connect() as connection:
self._ensure_auto_vacuum_full(connection)
connection.executescript("""
CREATE TABLE IF NOT EXISTS games (
game_id TEXT PRIMARY KEY,
started_at TEXT NOT NULL,
ended_at TEXT,
width INTEGER,
height INTEGER,
source TEXT,
map_name TEXT,
ruleset_name TEXT,
ruleset_version TEXT,
your_snake_id TEXT,
your_snake_name TEXT,
your_snake_type TEXT,
your_snake_version TEXT,
winner_names_json TEXT,
winner_you INTEGER NOT NULL DEFAULT 0,
final_turn INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL DEFAULT 'running'
);
CREATE TABLE IF NOT EXISTS turns (
id INTEGER PRIMARY KEY AUTOINCREMENT,
game_id TEXT NOT NULL,
turn INTEGER NOT NULL,
observed_at TEXT NOT NULL,
my_move TEXT,
my_thinking_json TEXT,
board_state_json TEXT NOT NULL,
snakes_json TEXT NOT NULL,
you_json TEXT NOT NULL,
food_json TEXT NOT NULL,
hazards_json TEXT NOT NULL,
UNIQUE (game_id, turn),
FOREIGN KEY (game_id) REFERENCES games(game_id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS snake_turns (
id INTEGER PRIMARY KEY AUTOINCREMENT,
game_id TEXT NOT NULL,
turn INTEGER NOT NULL,
snake_id TEXT NOT NULL,
snake_name TEXT,
health INTEGER,
length INTEGER,
head_x INTEGER,
head_y INTEGER,
body_json TEXT NOT NULL,
is_you INTEGER NOT NULL DEFAULT 0,
inferred_move TEXT,
UNIQUE (game_id, turn, snake_id),
FOREIGN KEY (game_id) REFERENCES games(game_id) ON DELETE CASCADE
);
""")
self._create_indexes_if_tables(connection)
self._ensure_column_exists(connection, "turns", "my_thinking_json", "TEXT")
self._ensure_column_exists(connection, "games", "your_snake_type", "TEXT")
self._ensure_column_exists(connection, "games", "your_snake_version", "TEXT")
self._ensure_column_exists(connection, "games", "game_type", "TEXT")
self._ensure_column_exists(connection, "snake_turns", "latency", "TEXT")
self._enable_zstd_compression(connection)
connection.execute("PRAGMA optimize")
def _create_indexes_if_tables(self, connection: sqlite3.Connection) -> None:
real_tables = {
row[0] for row in connection.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
}
indexes = [
("idx_turns_game_turn", "turns", "game_id, turn"),
("idx_games_status", "games", "status"),
("idx_snake_turns_game_turn", "snake_turns", "game_id, turn"),
]
for idx_name, table, cols in indexes:
if table in real_tables:
connection.execute(f"CREATE INDEX IF NOT EXISTS {idx_name} ON {table}({cols})")
def _ensure_column_exists(self, connection:sqlite3.Connection, table_name:str, column_name:str, column_type:str) -> None:
existing = connection.execute(f"PRAGMA table_info({table_name})").fetchall()
if any(row["name"] == column_name for row in existing):
return
connection.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type}")
def _enable_zstd_compression(self, connection: sqlite3.Connection) -> None:
compressed_columns = [
("turns", "board_state_json"),
("turns", "snakes_json"),
("turns", "you_json"),
("turns", "food_json"),
("turns", "hazards_json"),
("snake_turns", "body_json"),
]
for table, column in compressed_columns:
try:
connection.execute(
"SELECT zstd_enable_transparent(?)",
[json.dumps({"table": table, "column": column, "compression_level": 6, "dict_chooser": "'a'"})],
)
except sqlite3.OperationalError:
pass # already enabled
connection.execute("SELECT zstd_incremental_maintenance(null, 1)")
def _utc_now(self) -> str:
return datetime.now(timezone.utc).isoformat()
def _parse_utc_timestamp(self, value:str|None) -> datetime|None:
if not value:
return None
normalized = value.strip()
if normalized.endswith("Z"):
normalized = normalized[:-1] + "+00:00"
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return None
if parsed.tzinfo is None:
return parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
def _to_json(self, payload:object) -> str:
return json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
def _from_json(self, payload:str|None):
if payload is None or payload == "":
return None
try:
return json.loads(payload)
except json.JSONDecodeError:
return None
def _extract_snakes(self, game_state:dict) -> list[dict]:
return list(game_state.get("board", {}).get("snakes", []))
def _extract_you(self, game_state:dict) -> dict:
return dict(game_state.get("you", {}))
def _infer_direction(self, old_head:tuple[int, int]|None, new_head:tuple[int, int]|None) -> str|None:
if old_head is None or new_head is None:
return None
delta_x = new_head[0] - old_head[0]
delta_y = new_head[1] - old_head[1]
if delta_x == 1 and delta_y == 0:
return "right"
if delta_x == -1 and delta_y == 0:
return "left"
if delta_x == 0 and delta_y == 1:
return "up"
if delta_x == 0 and delta_y == -1:
return "down"
return None
def _derive_game_type(self, board:dict, ruleset:dict) -> str:
initial_snake_count = len(board.get("snakes", []))
if initial_snake_count == 2:
return "duel"
return ruleset.get("name") or "standard"
def _record_game_start_sync(self, game_state:dict, snake_type:str|None=None, snake_version:str|None=None) -> None:
game = game_state.get("game", {})
board = game_state.get("board", {})
you = self._extract_you(game_state)
ruleset = game.get("ruleset", {})
game_type = self._derive_game_type(board, ruleset)
with self._connect() as connection:
connection.execute("""
INSERT INTO games (
game_id, started_at, width, height, source, map_name,
ruleset_name, ruleset_version, your_snake_id, your_snake_name,
your_snake_type, your_snake_version, game_type, status
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'running')
ON CONFLICT(game_id) DO UPDATE SET
width = excluded.width,
height = excluded.height,
source = excluded.source,
map_name = excluded.map_name,
ruleset_name = excluded.ruleset_name,
ruleset_version = excluded.ruleset_version,
your_snake_id = excluded.your_snake_id,
your_snake_name = excluded.your_snake_name,
your_snake_type = excluded.your_snake_type,
your_snake_version = excluded.your_snake_version,
game_type = excluded.game_type,
status = 'running'
""",
(
game.get("id"),
self._utc_now(),
board.get("width"),
board.get("height"),
game.get("source"),
game.get("map"),
ruleset.get("name"),
ruleset.get("version"),
you.get("id"),
you.get("name"),
snake_type,
snake_version,
game_type,
),
)
connection.execute("PRAGMA wal_checkpoint(PASSIVE)")
connection.execute("PRAGMA optimize")
def _record_turn_sync(self, game_state:dict, my_move:str|None, my_thinking:dict|None) -> None:
game = game_state.get("game", {})
board = game_state.get("board", {})
snakes = self._extract_snakes(game_state)
you = self._extract_you(game_state)
game_id = game.get("id")
turn = int(game_state.get("turn", 0))
with self._connect() as connection:
connection.execute("""
INSERT INTO turns (
game_id, turn, observed_at, my_move, my_thinking_json,
board_state_json, snakes_json, you_json, food_json, hazards_json
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(game_id, turn) DO UPDATE SET
observed_at = excluded.observed_at,
my_move = excluded.my_move,
my_thinking_json = excluded.my_thinking_json,
board_state_json = excluded.board_state_json,
snakes_json = excluded.snakes_json,
you_json = excluded.you_json,
food_json = excluded.food_json,
hazards_json = excluded.hazards_json
""",
(
game_id,
turn,
self._utc_now(),
my_move,
self._to_json(my_thinking) if my_thinking is not None else None,
self._to_json(board),
self._to_json(snakes),
self._to_json(you),
self._to_json(board.get("food", [])),
self._to_json(board.get("hazards", [])),
),
)
previous_positions:dict[str, tuple[int, int]] = {}
if turn > 0:
previous_rows = connection.execute("""
SELECT snake_id, head_x, head_y
FROM snake_turns
WHERE game_id = ? AND turn = ?
""",
(game_id, turn - 1),
).fetchall()
previous_positions = {
row["snake_id"]: (int(row["head_x"]), int(row["head_y"]))
for row in previous_rows
if row["head_x"] is not None and row["head_y"] is not None
}
you_id = you.get("id")
for snake in snakes:
snake_id = snake.get("id")
head = snake.get("head", {})
head_x = head.get("x")
head_y = head.get("y")
if snake_id is None:
continue
new_head = (
(int(head_x), int(head_y))
if head_x is not None and head_y is not None
else None
)
inferred = self._infer_direction(
previous_positions.get(snake_id), new_head
)
connection.execute("""
INSERT INTO snake_turns (
game_id, turn, snake_id, snake_name, health, length,
head_x, head_y, body_json, is_you, inferred_move, latency
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(game_id, turn, snake_id) DO UPDATE SET
snake_name = excluded.snake_name,
health = excluded.health,
length = excluded.length,
head_x = excluded.head_x,
head_y = excluded.head_y,
body_json = excluded.body_json,
is_you = excluded.is_you,
inferred_move = excluded.inferred_move,
latency = excluded.latency
""",
(
game_id,
turn,
snake_id,
snake.get("name"),
snake.get("health"),
snake.get("length"),
head_x,
head_y,
self._to_json(snake.get("body", [])),
1 if snake_id == you_id else 0,
inferred,
snake.get("latency"),
),
)
connection.execute("""
UPDATE games
SET final_turn = CASE WHEN ? > final_turn THEN ? ELSE final_turn END
WHERE game_id = ?
""",
(turn, turn, game_id),
)
def _record_game_end_sync(self, game_state:dict) -> None:
game = game_state.get("game", {})
game_id = game.get("id")
board = game_state.get("board", {})
snakes = list(board.get("snakes", []))
you = self._extract_you(game_state)
winner_names = [snake.get("name") for snake in snakes if snake.get("name")]
you_id = you.get("id")
winner_you = any(snake.get("id") == you_id for snake in snakes)
with self._connect() as connection:
connection.execute("""
UPDATE games
SET ended_at = ?,
winner_names_json = ?,
winner_you = ?,
final_turn = CASE WHEN ? > final_turn THEN ? ELSE final_turn END,
status = 'finished'
WHERE game_id = ?
""",
(
self._utc_now(),
self._to_json(winner_names),
1 if winner_you else 0,
int(game_state.get("turn", 0)),
int(game_state.get("turn", 0)),
game_id,
),
)
def _finalize_stale_running_games_sync(self, stale_after_seconds:int=600) -> int:
threshold = max(60, int(stale_after_seconds))
now_utc = datetime.now(timezone.utc)
finalized = 0
with self._connect() as connection:
rows = connection.execute("""
SELECT game_id, started_at, final_turn, your_snake_id
FROM games
WHERE status = 'running'
ORDER BY started_at ASC
""").fetchall()
for row in rows:
started_at = self._parse_utc_timestamp(row["started_at"])
if started_at is None:
continue
age_seconds = (now_utc - started_at).total_seconds()
if age_seconds < threshold:
continue
game_id = row["game_id"]
your_snake_id = row["your_snake_id"]
final_turn = int(row["final_turn"] or 0)
snake_rows = connection.execute("""
SELECT snake_id, snake_name
FROM snake_turns
WHERE game_id = ? AND turn = ?
ORDER BY is_you DESC, snake_name ASC
""",
(game_id, final_turn),
).fetchall()
if len(snake_rows) == 0:
latest_turn_row = connection.execute("""
SELECT MAX(turn) AS latest_turn
FROM snake_turns
WHERE game_id = ?
""",
(game_id,),
).fetchone()
latest_turn = (
latest_turn_row["latest_turn"]
if latest_turn_row is not None
else None
)
if latest_turn is not None:
final_turn = int(latest_turn)
snake_rows = connection.execute("""
SELECT snake_id, snake_name
FROM snake_turns
WHERE game_id = ? AND turn = ?
ORDER BY is_you DESC, snake_name ASC
""",
(game_id, final_turn),
).fetchall()
survivor_ids = [snake["snake_id"] for snake in snake_rows if snake["snake_id"]]
survivor_names = [snake["snake_name"] for snake in snake_rows if snake["snake_name"]]
winner_you = bool(
your_snake_id
and your_snake_id in survivor_ids
and len(survivor_ids) == 1
)
update_result = connection.execute("""
UPDATE games
SET ended_at = ?,
winner_names_json = ?,
winner_you = ?,
final_turn = CASE WHEN ? > final_turn THEN ? ELSE final_turn END,
status = 'finished'
WHERE game_id = ? AND status = 'running'
""",
(
self._utc_now(),
self._to_json(survivor_names),
1 if winner_you else 0,
final_turn,
final_turn,
game_id,
),
)
if update_result.rowcount > 0:
finalized += 1
return finalized
def _get_summary_sync(self, recent_limit:int=15) -> dict:
with self._connect() as connection:
totals = connection.execute("""
SELECT
COUNT(*) AS total_games,
SUM(CASE WHEN status = 'running' THEN 1 ELSE 0 END) AS running_games,
SUM(CASE WHEN status = 'finished' THEN 1 ELSE 0 END) AS finished_games,
SUM(CASE WHEN status = 'finished' AND winner_you = 1 THEN 1 ELSE 0 END) AS wins,
SUM(CASE WHEN status = 'finished' AND winner_you = 0 THEN 1 ELSE 0 END) AS losses,
AVG(CASE WHEN status = 'finished' THEN final_turn ELSE NULL END) AS avg_turns
FROM games
"""
).fetchone()
by_type = connection.execute("""
SELECT
COALESCE(game_type, ruleset_name, 'unknown') AS type_label,
COUNT(*) AS total,
SUM(CASE WHEN status = 'finished' AND winner_you = 1 THEN 1 ELSE 0 END) AS wins,
SUM(CASE WHEN status = 'finished' AND winner_you = 0 THEN 1 ELSE 0 END) AS losses
FROM games
WHERE status = 'finished'
GROUP BY type_label
ORDER BY total DESC
"""
).fetchall()
recent = connection.execute("""
SELECT game_id, started_at, ended_at, map_name, ruleset_name, game_type,
your_snake_name, your_snake_type, your_snake_version, winner_you, final_turn, status
FROM games
ORDER BY started_at DESC
LIMIT ?
""",
(max(1, int(recent_limit)),),
).fetchall()
return {
"total_games": int(totals["total_games"] or 0),
"running_games": int(totals["running_games"] or 0),
"finished_games": int(totals["finished_games"] or 0),
"wins": int(totals["wins"] or 0),
"losses": int(totals["losses"] or 0),
"avg_turns_finished": round(float(totals["avg_turns"] or 0.0), 2),
"by_game_type": [{
"game_type": row["type_label"],
"total": int(row["total"]),
"wins": int(row["wins"]),
"losses": int(row["losses"]),
} for row in by_type],
"recent_games": [{
"game_id": row["game_id"],
"started_at": row["started_at"],
"ended_at": row["ended_at"],
"map": row["map_name"],
"ruleset": row["ruleset_name"],
"game_type": row["game_type"],
"snake": row["your_snake_name"],
"snake_type": row["your_snake_type"],
"snake_version": row["your_snake_version"],
"winner_you": bool(row["winner_you"]),
"final_turn": int(row["final_turn"] or 0),
"status": row["status"],
} for row in recent ],
}
def _list_games_sync(self, limit:int=50) -> list[dict]:
with self._connect() as connection:
rows = connection.execute("""
SELECT game_id, started_at, ended_at, map_name, source, ruleset_name, game_type,
your_snake_name, your_snake_type, your_snake_version,
winner_you, winner_names_json, final_turn, status
FROM games
ORDER BY started_at DESC
LIMIT ?
""",
(max(1, int(limit)),),
).fetchall()
return [{
"game_id": row["game_id"],
"started_at": row["started_at"],
"ended_at": row["ended_at"],
"map": row["map_name"],
"source": row["source"],
"ruleset": row["ruleset_name"],
"game_type": row["game_type"],
"snake": row["your_snake_name"],
"snake_type": row["your_snake_type"],
"snake_version": row["your_snake_version"],
"winner_you": bool(row["winner_you"]),
"winner_names": self._from_json(row["winner_names_json"]) or [],
"final_turn": int(row["final_turn"] or 0),
"status": row["status"],
} for row in rows]
def _get_game_replay_sync(self, game_id:str) -> dict | None:
with self._connect() as connection:
game_row = connection.execute("""
SELECT game_id, started_at, ended_at, width, height, source, map_name,
ruleset_name, ruleset_version, game_type, your_snake_id, your_snake_name,
your_snake_type, your_snake_version,
winner_names_json, winner_you, final_turn, status
FROM games
WHERE game_id = ?
""",
(game_id,),
).fetchone()
if game_row is None:
return None
turn_rows = connection.execute("""
SELECT turn, observed_at, my_move, my_thinking_json,
board_state_json, food_json, hazards_json, you_json
FROM turns
WHERE game_id = ?
ORDER BY turn ASC
""",
(game_id,),
).fetchall()
snake_rows = connection.execute("""
SELECT turn, snake_id, snake_name, health, length, head_x, head_y,
body_json, is_you, inferred_move, latency
FROM snake_turns
WHERE game_id = ?
ORDER BY turn ASC, is_you DESC, snake_name ASC
""",
(game_id,),
).fetchall()
snakes_by_turn:dict[int, list[dict]] = {}
for row in snake_rows:
turn = int(row["turn"])
snakes_by_turn.setdefault(turn, []).append({
"snake_id": row["snake_id"],
"snake_name": row["snake_name"],
"health": row["health"],
"length": row["length"],
"head": {"x": row["head_x"], "y": row["head_y"]},
"body": self._from_json(row["body_json"]) or [],
"is_you": bool(row["is_you"]),
"inferred_move": row["inferred_move"],
"latency": row["latency"],
})
replay_turns = []
for row in turn_rows:
turn_number = int(row["turn"])
replay_turns.append({
"turn": turn_number,
"observed_at": row["observed_at"],
"my_move": row["my_move"],
"my_thinking": self._from_json(row["my_thinking_json"]),
"board": self._from_json(row["board_state_json"]),
"food": self._from_json(row["food_json"]) or [],
"hazards": self._from_json(row["hazards_json"]) or [],
"you": self._from_json(row["you_json"]) or {},
"snakes": snakes_by_turn.get(turn_number, []),
})
return {
"game": {
"game_id": game_row["game_id"],
"started_at": game_row["started_at"],
"ended_at": game_row["ended_at"],
"width": game_row["width"],
"height": game_row["height"],
"source": game_row["source"],
"map": game_row["map_name"],
"ruleset_name": game_row["ruleset_name"],
"ruleset_version": game_row["ruleset_version"],
"game_type": game_row["game_type"],
"your_snake_id": game_row["your_snake_id"],
"your_snake_name": game_row["your_snake_name"],
"your_snake_type": game_row["your_snake_type"],
"your_snake_version": game_row["your_snake_version"],
"winner_names": self._from_json(game_row["winner_names_json"]) or [],
"winner_you": bool(game_row["winner_you"]),
"final_turn": int(game_row["final_turn"] or 0),
"status": game_row["status"],
},
"turns": replay_turns,
}
async def record_game_start(self, game_state:dict, snake_type:str|None=None, snake_version:str|None=None) -> None:
await asyncio.to_thread(self._record_game_start_sync, game_state, snake_type, snake_version)
async def record_turn(self, game_state:dict, my_move:str|None, my_thinking:dict|None=None) -> None:
await asyncio.to_thread(self._record_turn_sync, game_state, my_move, my_thinking)
async def record_game_end(self, game_state:dict) -> None:
await asyncio.to_thread(self._record_game_end_sync, game_state)
async def get_summary(self, recent_limit:int=15) -> dict:
return await asyncio.to_thread(self._get_summary_sync, recent_limit)
async def list_games(self, limit:int=50) -> list[dict]:
return await asyncio.to_thread(self._list_games_sync, limit)
async def finalize_stale_running_games(self, stale_after_seconds:int=600) -> int:
return await asyncio.to_thread(self._finalize_stale_running_games_sync, stale_after_seconds)
async def get_game_replay(self, game_id:str) -> dict|None:
return await asyncio.to_thread(self._get_game_replay_sync, game_id)
async def close(self) -> None:
return None