from quart_common.web.env import env_bool import asyncio, sqlite3, json, os, logging, sys from datetime import datetime, timezone from pathlib import Path logger = logging.getLogger(__name__) if not logger.handlers: _handler = logging.StreamHandler(stream=sys.stdout) _handler.setFormatter(logging.Formatter(fmt="%(levelname)s %(module)s: %(message)s")) logger.addHandler(_handler) logger.propagate = False _ZSTD_EXT = Path(os.environ.get("SQLITE_ZSTD_EXT", "/usr/local/lib/libsqlite_zstd.so")).expanduser().resolve() class GameplayDatabase: def __init__(self, db_path:str, busy_timeout_ms:int=5000): self.db_path = db_path self.busy_timeout_ms = max(1000, int(busy_timeout_ms)) self._zstd_available = False self._initialize_database() def _connect(self) -> sqlite3.Connection: connection = sqlite3.connect( self.db_path, timeout=max(1, self.busy_timeout_ms // 1000), isolation_level=None, ) connection.row_factory = sqlite3.Row if _ZSTD_EXT.exists() and not env_bool('DISABLE_GAMEPLAY_DB_COMPRESSION', True): try: connection.enable_load_extension(True) connection.load_extension(str(_ZSTD_EXT)) self._zstd_available = True except sqlite3.OperationalError as e: logger.warning(f"sqlite-zstd extension skipped: {e}") finally: connection.enable_load_extension(False) connection.execute("PRAGMA foreign_keys = ON") connection.execute("PRAGMA journal_mode = WAL") connection.execute("PRAGMA synchronous = NORMAL") connection.execute("PRAGMA temp_store = MEMORY") connection.execute("PRAGMA journal_size_limit = 1048576") connection.execute(f"PRAGMA busy_timeout = {self.busy_timeout_ms}") return connection def _ensure_auto_vacuum_full(self, connection:sqlite3.Connection) -> None: current = connection.execute("PRAGMA auto_vacuum").fetchone()[0] if current != 1: connection.execute("PRAGMA auto_vacuum = FULL") connection.execute("VACUUM") def _initialize_database(self) -> None: Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) with self._connect() as connection: self._ensure_auto_vacuum_full(connection) connection.executescript(""" CREATE TABLE IF NOT EXISTS games ( game_id TEXT PRIMARY KEY, started_at TEXT NOT NULL, ended_at TEXT, width INTEGER, height INTEGER, source TEXT, map_name TEXT, ruleset_name TEXT, ruleset_version TEXT, your_snake_id TEXT, your_snake_name TEXT, your_snake_type TEXT, your_snake_version TEXT, winner_names_json TEXT, winner_you INTEGER NOT NULL DEFAULT 0, final_turn INTEGER NOT NULL DEFAULT 0, status TEXT NOT NULL DEFAULT 'running' ); CREATE TABLE IF NOT EXISTS turns ( id INTEGER PRIMARY KEY AUTOINCREMENT, game_id TEXT NOT NULL, turn INTEGER NOT NULL, observed_at TEXT NOT NULL, my_move TEXT, my_thinking_json TEXT, board_state_json TEXT NOT NULL, snakes_json TEXT NOT NULL, you_json TEXT NOT NULL, food_json TEXT NOT NULL, hazards_json TEXT NOT NULL, UNIQUE (game_id, turn), FOREIGN KEY (game_id) REFERENCES games(game_id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS snake_turns ( id INTEGER PRIMARY KEY AUTOINCREMENT, game_id TEXT NOT NULL, turn INTEGER NOT NULL, snake_id TEXT NOT NULL, snake_name TEXT, health INTEGER, length INTEGER, head_x INTEGER, head_y INTEGER, body_json TEXT NOT NULL, is_you INTEGER NOT NULL DEFAULT 0, inferred_move TEXT, UNIQUE (game_id, turn, snake_id), FOREIGN KEY (game_id) REFERENCES games(game_id) ON DELETE CASCADE ); """) self._create_indexes_if_tables(connection) self._ensure_column_exists(connection, "turns", "my_thinking_json", "TEXT") self._ensure_column_exists(connection, "games", "your_snake_type", "TEXT") self._ensure_column_exists(connection, "games", "your_snake_version", "TEXT") self._ensure_column_exists(connection, "games", "game_type", "TEXT") self._ensure_column_exists(connection, "snake_turns", "latency", "TEXT") if self._zstd_available: self._enable_zstd_compression(connection) connection.execute("PRAGMA optimize") def _create_indexes_if_tables(self, connection: sqlite3.Connection) -> None: real_tables = { row[0] for row in connection.execute( "SELECT name FROM sqlite_master WHERE type='table'" ).fetchall() } indexes = [ ("idx_turns_game_turn", "turns", "game_id, turn"), ("idx_games_status", "games", "status"), ("idx_snake_turns_game_turn", "snake_turns", "game_id, turn"), ] for idx_name, table, cols in indexes: if table in real_tables: connection.execute(f"CREATE INDEX IF NOT EXISTS {idx_name} ON {table}({cols})") def _ensure_column_exists(self, connection:sqlite3.Connection, table_name:str, column_name:str, column_type:str) -> None: obj = connection.execute( "SELECT type FROM sqlite_master WHERE name = ?", (table_name,) ).fetchone() if obj and obj["type"] == "view": # zstd replaced this table with a view — operate on the underlying compressed table underlying = f"_{table_name}_zstd" exists = connection.execute( "SELECT 1 FROM sqlite_master WHERE name = ? AND type = 'table'", (underlying,) ).fetchone() if not exists: return # nothing we can do without the extension actual_table = underlying else: actual_table = table_name existing = connection.execute(f"PRAGMA table_info({actual_table})").fetchall() if any(row["name"] == column_name for row in existing): return connection.execute(f"ALTER TABLE {actual_table} ADD COLUMN {column_name} {column_type}") def _enable_zstd_compression(self, connection: sqlite3.Connection) -> None: compressed_columns = [ ("turns", "board_state_json"), ("turns", "snakes_json"), ("turns", "you_json"), ("turns", "food_json"), ("turns", "hazards_json"), ("snake_turns", "body_json"), ] for table, column in compressed_columns: try: connection.execute( "SELECT zstd_enable_transparent(?)", [json.dumps({"table": table, "column": column, "compression_level": 6, "dict_chooser": "'a'"})], ) except sqlite3.OperationalError: pass # already enabled connection.execute("SELECT zstd_incremental_maintenance(null, 1)") def _utc_now(self) -> str: return datetime.now(timezone.utc).isoformat() def _parse_utc_timestamp(self, value:str|None) -> datetime|None: if not value: return None normalized = value.strip() if normalized.endswith("Z"): normalized = normalized[:-1] + "+00:00" try: parsed = datetime.fromisoformat(normalized) except ValueError: return None if parsed.tzinfo is None: return parsed.replace(tzinfo=timezone.utc) return parsed.astimezone(timezone.utc) def _to_json(self, payload:object) -> str: return json.dumps(payload, ensure_ascii=False, separators=(",", ":")) def _from_json(self, payload:str|None): if payload is None or payload == "": return None try: return json.loads(payload) except json.JSONDecodeError: return None def _extract_snakes(self, game_state:dict) -> list[dict]: return list(game_state.get("board", {}).get("snakes", [])) def _extract_you(self, game_state:dict) -> dict: return dict(game_state.get("you", {})) def _infer_direction(self, old_head:tuple[int, int]|None, new_head:tuple[int, int]|None) -> str|None: if old_head is None or new_head is None: return None delta_x = new_head[0] - old_head[0] delta_y = new_head[1] - old_head[1] if delta_x == 1 and delta_y == 0: return "right" if delta_x == -1 and delta_y == 0: return "left" if delta_x == 0 and delta_y == 1: return "up" if delta_x == 0 and delta_y == -1: return "down" return None def _derive_game_type(self, board:dict, ruleset:dict) -> str: initial_snake_count = len(board.get("snakes", [])) if initial_snake_count == 2: return "duel" return ruleset.get("name") or "standard" def _record_game_start_sync(self, game_state:dict, snake_type:str|None=None, snake_version:str|None=None) -> None: game = game_state.get("game", {}) board = game_state.get("board", {}) you = self._extract_you(game_state) ruleset = game.get("ruleset", {}) game_type = self._derive_game_type(board, ruleset) with self._connect() as connection: connection.execute(""" INSERT INTO games ( game_id, started_at, width, height, source, map_name, ruleset_name, ruleset_version, your_snake_id, your_snake_name, your_snake_type, your_snake_version, game_type, status ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'running') ON CONFLICT(game_id) DO UPDATE SET width = excluded.width, height = excluded.height, source = excluded.source, map_name = excluded.map_name, ruleset_name = excluded.ruleset_name, ruleset_version = excluded.ruleset_version, your_snake_id = excluded.your_snake_id, your_snake_name = excluded.your_snake_name, your_snake_type = excluded.your_snake_type, your_snake_version = excluded.your_snake_version, game_type = excluded.game_type, status = 'running' """, ( game.get("id"), self._utc_now(), board.get("width"), board.get("height"), game.get("source"), game.get("map"), ruleset.get("name"), ruleset.get("version"), you.get("id"), you.get("name"), snake_type, snake_version, game_type, ), ) connection.execute("PRAGMA wal_checkpoint(PASSIVE)") connection.execute("PRAGMA optimize") def _record_turn_sync(self, game_state:dict, my_move:str|None, my_thinking:dict|None) -> None: game = game_state.get("game", {}) board = game_state.get("board", {}) snakes = self._extract_snakes(game_state) you = self._extract_you(game_state) game_id = game.get("id") turn = int(game_state.get("turn", 0)) with self._connect() as connection: connection.execute(""" INSERT INTO turns ( game_id, turn, observed_at, my_move, my_thinking_json, board_state_json, snakes_json, you_json, food_json, hazards_json ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(game_id, turn) DO UPDATE SET observed_at = excluded.observed_at, my_move = excluded.my_move, my_thinking_json = excluded.my_thinking_json, board_state_json = excluded.board_state_json, snakes_json = excluded.snakes_json, you_json = excluded.you_json, food_json = excluded.food_json, hazards_json = excluded.hazards_json """, ( game_id, turn, self._utc_now(), my_move, self._to_json(my_thinking) if my_thinking is not None else None, self._to_json(board), self._to_json(snakes), self._to_json(you), self._to_json(board.get("food", [])), self._to_json(board.get("hazards", [])), ), ) previous_positions:dict[str, tuple[int, int]] = {} if turn > 0: previous_rows = connection.execute(""" SELECT snake_id, head_x, head_y FROM snake_turns WHERE game_id = ? AND turn = ? """, (game_id, turn - 1), ).fetchall() previous_positions = { row["snake_id"]: (int(row["head_x"]), int(row["head_y"])) for row in previous_rows if row["head_x"] is not None and row["head_y"] is not None } you_id = you.get("id") for snake in snakes: snake_id = snake.get("id") head = snake.get("head", {}) head_x = head.get("x") head_y = head.get("y") if snake_id is None: continue new_head = ( (int(head_x), int(head_y)) if head_x is not None and head_y is not None else None ) inferred = self._infer_direction( previous_positions.get(snake_id), new_head ) connection.execute(""" INSERT INTO snake_turns ( game_id, turn, snake_id, snake_name, health, length, head_x, head_y, body_json, is_you, inferred_move, latency ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(game_id, turn, snake_id) DO UPDATE SET snake_name = excluded.snake_name, health = excluded.health, length = excluded.length, head_x = excluded.head_x, head_y = excluded.head_y, body_json = excluded.body_json, is_you = excluded.is_you, inferred_move = excluded.inferred_move, latency = excluded.latency """, ( game_id, turn, snake_id, snake.get("name"), snake.get("health"), snake.get("length"), head_x, head_y, self._to_json(snake.get("body", [])), 1 if snake_id == you_id else 0, inferred, snake.get("latency"), ), ) connection.execute(""" UPDATE games SET final_turn = CASE WHEN ? > final_turn THEN ? ELSE final_turn END WHERE game_id = ? """, (turn, turn, game_id), ) def _record_game_end_sync(self, game_state:dict) -> None: game = game_state.get("game", {}) game_id = game.get("id") board = game_state.get("board", {}) snakes = list(board.get("snakes", [])) you = self._extract_you(game_state) winner_names = [snake.get("name") for snake in snakes if snake.get("name")] you_id = you.get("id") winner_you = any(snake.get("id") == you_id for snake in snakes) with self._connect() as connection: connection.execute(""" UPDATE games SET ended_at = ?, winner_names_json = ?, winner_you = ?, final_turn = CASE WHEN ? > final_turn THEN ? ELSE final_turn END, status = 'finished' WHERE game_id = ? """, ( self._utc_now(), self._to_json(winner_names), 1 if winner_you else 0, int(game_state.get("turn", 0)), int(game_state.get("turn", 0)), game_id, ), ) def _finalize_stale_running_games_sync(self, stale_after_seconds:int=600) -> int: threshold = max(60, int(stale_after_seconds)) now_utc = datetime.now(timezone.utc) finalized = 0 with self._connect() as connection: rows = connection.execute(""" SELECT game_id, started_at, final_turn, your_snake_id FROM games WHERE status = 'running' ORDER BY started_at ASC """).fetchall() for row in rows: started_at = self._parse_utc_timestamp(row["started_at"]) if started_at is None: continue age_seconds = (now_utc - started_at).total_seconds() if age_seconds < threshold: continue game_id = row["game_id"] your_snake_id = row["your_snake_id"] final_turn = int(row["final_turn"] or 0) snake_rows = connection.execute(""" SELECT snake_id, snake_name FROM snake_turns WHERE game_id = ? AND turn = ? ORDER BY is_you DESC, snake_name ASC """, (game_id, final_turn), ).fetchall() if len(snake_rows) == 0: latest_turn_row = connection.execute(""" SELECT MAX(turn) AS latest_turn FROM snake_turns WHERE game_id = ? """, (game_id,), ).fetchone() latest_turn = ( latest_turn_row["latest_turn"] if latest_turn_row is not None else None ) if latest_turn is not None: final_turn = int(latest_turn) snake_rows = connection.execute(""" SELECT snake_id, snake_name FROM snake_turns WHERE game_id = ? AND turn = ? ORDER BY is_you DESC, snake_name ASC """, (game_id, final_turn), ).fetchall() survivor_ids = [snake["snake_id"] for snake in snake_rows if snake["snake_id"]] survivor_names = [snake["snake_name"] for snake in snake_rows if snake["snake_name"]] winner_you = bool( your_snake_id and your_snake_id in survivor_ids and len(survivor_ids) == 1 ) update_result = connection.execute(""" UPDATE games SET ended_at = ?, winner_names_json = ?, winner_you = ?, final_turn = CASE WHEN ? > final_turn THEN ? ELSE final_turn END, status = 'finished' WHERE game_id = ? AND status = 'running' """, ( self._utc_now(), self._to_json(survivor_names), 1 if winner_you else 0, final_turn, final_turn, game_id, ), ) if update_result.rowcount > 0: finalized += 1 return finalized def _get_summary_sync(self, recent_limit:int=15) -> dict: with self._connect() as connection: totals = connection.execute(""" SELECT COUNT(*) AS total_games, SUM(CASE WHEN status = 'running' THEN 1 ELSE 0 END) AS running_games, SUM(CASE WHEN status = 'finished' THEN 1 ELSE 0 END) AS finished_games, SUM(CASE WHEN status = 'finished' AND winner_you = 1 THEN 1 ELSE 0 END) AS wins, SUM(CASE WHEN status = 'finished' AND winner_you = 0 THEN 1 ELSE 0 END) AS losses, AVG(CASE WHEN status = 'finished' THEN final_turn ELSE NULL END) AS avg_turns FROM games """ ).fetchone() by_type = connection.execute(""" SELECT COALESCE(game_type, ruleset_name, 'unknown') AS type_label, COUNT(*) AS total, SUM(CASE WHEN status = 'finished' AND winner_you = 1 THEN 1 ELSE 0 END) AS wins, SUM(CASE WHEN status = 'finished' AND winner_you = 0 THEN 1 ELSE 0 END) AS losses FROM games WHERE status = 'finished' GROUP BY type_label ORDER BY total DESC """ ).fetchall() recent = connection.execute(""" SELECT game_id, started_at, ended_at, map_name, ruleset_name, game_type, your_snake_name, your_snake_type, your_snake_version, winner_you, final_turn, status FROM games ORDER BY started_at DESC LIMIT ? """, (max(1, int(recent_limit)),), ).fetchall() return { "total_games": int(totals["total_games"] or 0), "running_games": int(totals["running_games"] or 0), "finished_games": int(totals["finished_games"] or 0), "wins": int(totals["wins"] or 0), "losses": int(totals["losses"] or 0), "avg_turns_finished": round(float(totals["avg_turns"] or 0.0), 2), "by_game_type": [{ "game_type": row["type_label"], "total": int(row["total"]), "wins": int(row["wins"]), "losses": int(row["losses"]), } for row in by_type], "recent_games": [{ "game_id": row["game_id"], "started_at": row["started_at"], "ended_at": row["ended_at"], "map": row["map_name"], "ruleset": row["ruleset_name"], "game_type": row["game_type"], "snake": row["your_snake_name"], "snake_type": row["your_snake_type"], "snake_version": row["your_snake_version"], "winner_you": bool(row["winner_you"]), "final_turn": int(row["final_turn"] or 0), "status": row["status"], } for row in recent ], } def _list_games_sync(self, limit:int=50) -> list[dict]: with self._connect() as connection: rows = connection.execute(""" SELECT game_id, started_at, ended_at, map_name, source, ruleset_name, game_type, your_snake_name, your_snake_type, your_snake_version, winner_you, winner_names_json, final_turn, status FROM games ORDER BY started_at DESC LIMIT ? """, (max(1, int(limit)),), ).fetchall() return [{ "game_id": row["game_id"], "started_at": row["started_at"], "ended_at": row["ended_at"], "map": row["map_name"], "source": row["source"], "ruleset": row["ruleset_name"], "game_type": row["game_type"], "snake": row["your_snake_name"], "snake_type": row["your_snake_type"], "snake_version": row["your_snake_version"], "winner_you": bool(row["winner_you"]), "winner_names": self._from_json(row["winner_names_json"]) or [], "final_turn": int(row["final_turn"] or 0), "status": row["status"], } for row in rows] def _get_game_replay_sync(self, game_id:str) -> dict | None: with self._connect() as connection: game_row = connection.execute(""" SELECT game_id, started_at, ended_at, width, height, source, map_name, ruleset_name, ruleset_version, game_type, your_snake_id, your_snake_name, your_snake_type, your_snake_version, winner_names_json, winner_you, final_turn, status FROM games WHERE game_id = ? """, (game_id,), ).fetchone() if game_row is None: return None turn_rows = connection.execute(""" SELECT turn, observed_at, my_move, my_thinking_json, board_state_json, food_json, hazards_json, you_json FROM turns WHERE game_id = ? ORDER BY turn ASC """, (game_id,), ).fetchall() snake_rows = connection.execute(""" SELECT turn, snake_id, snake_name, health, length, head_x, head_y, body_json, is_you, inferred_move, latency FROM snake_turns WHERE game_id = ? ORDER BY turn ASC, is_you DESC, snake_name ASC """, (game_id,), ).fetchall() snakes_by_turn:dict[int, list[dict]] = {} for row in snake_rows: turn = int(row["turn"]) snakes_by_turn.setdefault(turn, []).append({ "snake_id": row["snake_id"], "snake_name": row["snake_name"], "health": row["health"], "length": row["length"], "head": {"x": row["head_x"], "y": row["head_y"]}, "body": self._from_json(row["body_json"]) or [], "is_you": bool(row["is_you"]), "inferred_move": row["inferred_move"], "latency": row["latency"], }) replay_turns = [] for row in turn_rows: turn_number = int(row["turn"]) replay_turns.append({ "turn": turn_number, "observed_at": row["observed_at"], "my_move": row["my_move"], "my_thinking": self._from_json(row["my_thinking_json"]), "board": self._from_json(row["board_state_json"]), "food": self._from_json(row["food_json"]) or [], "hazards": self._from_json(row["hazards_json"]) or [], "you": self._from_json(row["you_json"]) or {}, "snakes": snakes_by_turn.get(turn_number, []), }) return { "game": { "game_id": game_row["game_id"], "started_at": game_row["started_at"], "ended_at": game_row["ended_at"], "width": game_row["width"], "height": game_row["height"], "source": game_row["source"], "map": game_row["map_name"], "ruleset_name": game_row["ruleset_name"], "ruleset_version": game_row["ruleset_version"], "game_type": game_row["game_type"], "your_snake_id": game_row["your_snake_id"], "your_snake_name": game_row["your_snake_name"], "your_snake_type": game_row["your_snake_type"], "your_snake_version": game_row["your_snake_version"], "winner_names": self._from_json(game_row["winner_names_json"]) or [], "winner_you": bool(game_row["winner_you"]), "final_turn": int(game_row["final_turn"] or 0), "status": game_row["status"], }, "turns": replay_turns, } async def record_game_start(self, game_state:dict, snake_type:str|None=None, snake_version:str|None=None) -> None: await asyncio.to_thread(self._record_game_start_sync, game_state, snake_type, snake_version) async def record_turn(self, game_state:dict, my_move:str|None, my_thinking:dict|None=None) -> None: await asyncio.to_thread(self._record_turn_sync, game_state, my_move, my_thinking) async def record_game_end(self, game_state:dict) -> None: await asyncio.to_thread(self._record_game_end_sync, game_state) async def get_summary(self, recent_limit:int=15) -> dict: return await asyncio.to_thread(self._get_summary_sync, recent_limit) async def list_games(self, limit:int=50) -> list[dict]: return await asyncio.to_thread(self._list_games_sync, limit) async def finalize_stale_running_games(self, stale_after_seconds:int=600) -> int: return await asyncio.to_thread(self._finalize_stale_running_games_sync, stale_after_seconds) async def get_game_replay(self, game_id:str) -> dict|None: return await asyncio.to_thread(self._get_game_replay_sync, game_id) async def close(self) -> None: return None