diff --git a/main.py b/main.py index 168d156..7d8add7 100755 --- a/main.py +++ b/main.py @@ -12,6 +12,8 @@ # To get you started we've included code to prevent your Battlesnake from moving backwards. # For more info see docs.battlesnake.com +from dotenv import load_dotenv + from server.CreateEnvironmentFile import CreateEnvironmentFile from server.bootstrap import build_run_config, build_server_from_env @@ -20,12 +22,15 @@ import os # Start server when `python main.py` is run if __name__ == "__main__": + if os.environ.get("CREATE_ENV_FILE", None): CreateEnvironmentFile.load_dotenv({ "STORE_GAME_HISTORY": True, "DEBUG": True, "SNAKE": "TemplateSnake", }) + else: + load_dotenv() server = build_server_from_env(default_snake_type="TemplateSnake") asyncio.run(server.run(**build_run_config())) diff --git a/server/Server.py b/server/Server.py index 354dd6b..5c70afd 100644 --- a/server/Server.py +++ b/server/Server.py @@ -5,6 +5,7 @@ from snakes import SnakeBuilder from server.database import ( GameplayDatabase, + GameplayBackendBuilder, StorageLoader, ) from server.metrics import ( @@ -29,7 +30,7 @@ from server.services import ( ) class Server: - def __init__(self, data_path:str, snake_type:str, storage_type:str, debug:bool=False, check_tls_security:bool=False, metrics_backend:str='memory', metrics_redis_url:str='redis://localhost:6379/0', metrics_ttl_sec:int|None=None, gameplay_db_enabled:bool=True, gameplay_db_path:str|None=None, gameplay_db_busy_timeout_ms:int=5000): + def __init__(self, data_path:str, snake_type:str, storage_type:str, debug:bool=False, check_tls_security:bool=False, metrics_backend:str='memory', metrics_redis_url:str='redis://localhost:6379/0', metrics_ttl_sec:int|None=None, gameplay_db_enabled:bool=True, gameplay_db_backend:str='sqlite', gameplay_db_path:str|None=None, gameplay_db_busy_timeout_ms:int=5000, gameplay_db_pg_dsn:str|None=None): self.debug = debug self.data_path = data_path @@ -74,8 +75,12 @@ class Server: if gameplay_db_enabled: db_path = gameplay_db_path or os.path.join(data_path, 'data', 'database', 'gameplay.sqlite3') self.gameplay_database = GameplayDatabase( - db_path=db_path, - busy_timeout_ms=gameplay_db_busy_timeout_ms, + backend=GameplayBackendBuilder.build( + backend=gameplay_db_backend, + db_path=db_path, + busy_timeout_ms=gameplay_db_busy_timeout_ms, + pg_dsn=gameplay_db_pg_dsn, + ) ) self.gameplay_tracking = GameplayTrackingService( @@ -115,12 +120,16 @@ class Server: if self._startup_worker_metrics_cleared: return self._startup_worker_metrics_cleared = True + if env_bool('METRICS_CLEAR_WORKERS_ON_STARTUP', True): should_clear = await self.metrics_collector.should_clear_worker_metrics_on_startup(env_int('METRICS_STARTUP_CLEANUP_LOCK_TTL_SEC', 300)) if should_clear: await self.metrics_collector.clear_worker_metrics() await self.dashboard_events_service.start_listener() + if self.gameplay_database is not None: + await self.gameplay_database.initialize() + @self.app.after_serving async def shutdown_state_storage(): await self.dashboard_events_service.stop_listener() diff --git a/server/bootstrap.py b/server/bootstrap.py index e7de5b1..59eddcf 100644 --- a/server/bootstrap.py +++ b/server/bootstrap.py @@ -23,11 +23,13 @@ def build_server_from_env(default_snake_type:str) -> Server: metrics_ttl_sec = env_int('METRICS_TTL_SEC', 900) if metrics_ttl_sec_raw is not None else None gameplay_db_enabled = env_bool('GAMEPLAY_DB_ENABLED', True) + gameplay_db_backend = os.environ.get('GAMEPLAY_DB_BACKEND', 'sqlite') gameplay_db_path = os.environ.get( 'GAMEPLAY_DB_PATH', os.path.join(data_path, 'data', 'database', 'gameplay.sqlite3'), ) gameplay_db_busy_timeout_ms = env_int('GAMEPLAY_DB_BUSY_TIMEOUT_MS', 5000) + gameplay_db_pg_dsn = os.environ.get('GAMEPLAY_DB_PG_DSN', None) server = Server( data_path=data_path, @@ -39,8 +41,10 @@ def build_server_from_env(default_snake_type:str) -> Server: metrics_redis_url=metrics_redis_url, metrics_ttl_sec=metrics_ttl_sec, gameplay_db_enabled=gameplay_db_enabled, + gameplay_db_backend=gameplay_db_backend, gameplay_db_path=gameplay_db_path, gameplay_db_busy_timeout_ms=gameplay_db_busy_timeout_ms, + gameplay_db_pg_dsn=gameplay_db_pg_dsn, ) if env_bool('STORE_GAME_HISTORY'): diff --git a/server/database/GameplayDatabase.py b/server/database/GameplayDatabase.py index 7ea5807..c23cc20 100644 --- a/server/database/GameplayDatabase.py +++ b/server/database/GameplayDatabase.py @@ -1,724 +1,37 @@ -from quart_common.web.env import env_bool - -import asyncio, sqlite3, json, os, logging, sys -from datetime import datetime, timezone -from pathlib import Path - -logger = logging.getLogger(__name__) -if not logger.handlers: - _handler = logging.StreamHandler(stream=sys.stdout) - _handler.setFormatter(logging.Formatter(fmt="%(levelname)s %(module)s: %(message)s")) - logger.addHandler(_handler) - logger.propagate = False - -_ZSTD_EXT = Path(os.environ.get("SQLITE_ZSTD_EXT", "/usr/local/lib/libsqlite_zstd.so")).expanduser().resolve() +from .backend.Template import GameplayBackendTemplate class GameplayDatabase: - def __init__(self, db_path:str, busy_timeout_ms:int=5000): - self.db_path = db_path - self.busy_timeout_ms = max(1000, int(busy_timeout_ms)) - self._zstd_available = False - self._initialize_database() + """Thin facade that delegates all operations to a GameplayBackendTemplate. - def _connect(self) -> sqlite3.Connection: - connection = sqlite3.connect( - self.db_path, - timeout=max(1, self.busy_timeout_ms // 1000), - isolation_level=None, - ) - connection.row_factory = sqlite3.Row + Construct via GameplayBackendBuilder.build() or pass a backend directly. + """ - if _ZSTD_EXT.exists() and not env_bool('DISABLE_GAMEPLAY_DB_COMPRESSION', True): - try: - connection.enable_load_extension(True) - connection.load_extension(str(_ZSTD_EXT)) - self._zstd_available = True - except sqlite3.OperationalError as e: - logger.warning(f"sqlite-zstd extension skipped: {e}") - finally: - connection.enable_load_extension(False) + def __init__(self, backend:GameplayBackendTemplate): + self._backend = backend - connection.execute("PRAGMA foreign_keys = ON") - connection.execute("PRAGMA journal_mode = WAL") - connection.execute("PRAGMA synchronous = NORMAL") - connection.execute("PRAGMA temp_store = MEMORY") - connection.execute("PRAGMA journal_size_limit = 1048576") - connection.execute(f"PRAGMA busy_timeout = {self.busy_timeout_ms}") - return connection - - def _ensure_auto_vacuum_full(self, connection:sqlite3.Connection) -> None: - current = connection.execute("PRAGMA auto_vacuum").fetchone()[0] - if current != 1: - connection.execute("PRAGMA auto_vacuum = FULL") - connection.execute("VACUUM") - - def _initialize_database(self) -> None: - Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) - with self._connect() as connection: - self._ensure_auto_vacuum_full(connection) - connection.executescript(""" - CREATE TABLE IF NOT EXISTS games ( - game_id TEXT PRIMARY KEY, - started_at TEXT NOT NULL, - ended_at TEXT, - width INTEGER, - height INTEGER, - source TEXT, - map_name TEXT, - ruleset_name TEXT, - ruleset_version TEXT, - your_snake_id TEXT, - your_snake_name TEXT, - your_snake_type TEXT, - your_snake_version TEXT, - winner_names_json TEXT, - winner_you INTEGER NOT NULL DEFAULT 0, - final_turn INTEGER NOT NULL DEFAULT 0, - status TEXT NOT NULL DEFAULT 'running' - ); - - CREATE TABLE IF NOT EXISTS turns ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - game_id TEXT NOT NULL, - turn INTEGER NOT NULL, - observed_at TEXT NOT NULL, - my_move TEXT, - my_thinking_json TEXT, - board_state_json TEXT NOT NULL, - snakes_json TEXT NOT NULL, - you_json TEXT NOT NULL, - food_json TEXT NOT NULL, - hazards_json TEXT NOT NULL, - UNIQUE (game_id, turn), - FOREIGN KEY (game_id) REFERENCES games(game_id) ON DELETE CASCADE - ); - - CREATE TABLE IF NOT EXISTS snake_turns ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - game_id TEXT NOT NULL, - turn INTEGER NOT NULL, - snake_id TEXT NOT NULL, - snake_name TEXT, - health INTEGER, - length INTEGER, - head_x INTEGER, - head_y INTEGER, - body_json TEXT NOT NULL, - is_you INTEGER NOT NULL DEFAULT 0, - inferred_move TEXT, - UNIQUE (game_id, turn, snake_id), - FOREIGN KEY (game_id) REFERENCES games(game_id) ON DELETE CASCADE - ); - - """) - self._create_indexes_if_tables(connection) - self._ensure_column_exists(connection, "turns", "my_thinking_json", "TEXT") - self._ensure_column_exists(connection, "games", "your_snake_type", "TEXT") - self._ensure_column_exists(connection, "games", "your_snake_version", "TEXT") - self._ensure_column_exists(connection, "games", "game_type", "TEXT") - self._ensure_column_exists(connection, "snake_turns", "latency", "TEXT") - if self._zstd_available: - self._enable_zstd_compression(connection) - connection.execute("PRAGMA optimize") - - def _create_indexes_if_tables(self, connection:sqlite3.Connection) -> None: - real_tables = { - row[0] for row in connection.execute( - "SELECT name FROM sqlite_master WHERE type='table'" - ).fetchall() - } - indexes = [ - ("idx_turns_game_turn", "turns", "game_id, turn"), - ("idx_games_status", "games", "status"), - ("idx_snake_turns_game_turn", "snake_turns", "game_id, turn"), - ] - for idx_name, table, cols in indexes: - if table in real_tables: - connection.execute(f"CREATE INDEX IF NOT EXISTS {idx_name} ON {table}({cols})") - - def _ensure_column_exists(self, connection:sqlite3.Connection, table_name:str, column_name:str, column_type:str) -> None: - obj = connection.execute( - "SELECT type FROM sqlite_master WHERE name = ?", (table_name,) - ).fetchone() - - if obj and obj["type"] == "view": - # zstd replaced this table with a view — operate on the underlying compressed table - underlying = f"_{table_name}_zstd" - exists = connection.execute( - "SELECT 1 FROM sqlite_master WHERE name = ? AND type = 'table'", (underlying,) - ).fetchone() - if not exists: - return # nothing we can do without the extension - actual_table = underlying - else: - actual_table = table_name - - existing = connection.execute(f"PRAGMA table_info({actual_table})").fetchall() - if any(row["name"] == column_name for row in existing): - return - - connection.execute(f"ALTER TABLE {actual_table} ADD COLUMN {column_name} {column_type}") - - def _enable_zstd_compression(self, connection:sqlite3.Connection) -> None: - compressed_columns = [ - ("turns", "board_state_json"), - ("turns", "snakes_json"), - ("turns", "you_json"), - ("turns", "food_json"), - ("turns", "hazards_json"), - ("snake_turns", "body_json"), - ] - for table, column in compressed_columns: - try: - connection.execute( - "SELECT zstd_enable_transparent(?)", - [json.dumps({"table": table, "column": column, "compression_level": 6, "dict_chooser": "'a'"})], - ) - except sqlite3.OperationalError: - pass # already enabled - - connection.execute("SELECT zstd_incremental_maintenance(null, 1)") - - def _utc_now(self) -> str: - return datetime.now(timezone.utc).isoformat() - - def _parse_utc_timestamp(self, value:str|None) -> datetime|None: - if not value: - return None - normalized = value.strip() - if normalized.endswith("Z"): - normalized = normalized[:-1] + "+00:00" - try: - parsed = datetime.fromisoformat(normalized) - except ValueError: - return None - if parsed.tzinfo is None: - return parsed.replace(tzinfo=timezone.utc) - return parsed.astimezone(timezone.utc) - - def _to_json(self, payload:object) -> str: - return json.dumps(payload, ensure_ascii=False, separators=(",", ":")) - - def _from_json(self, payload:str|None): - if payload is None or payload == "": - return None - try: - return json.loads(payload) - except json.JSONDecodeError: - return None - - def _extract_snakes(self, game_state:dict) -> list[dict]: - return list(game_state.get("board", {}).get("snakes", [])) - - def _extract_you(self, game_state:dict) -> dict: - return dict(game_state.get("you", {})) - - def _infer_direction(self, old_head:tuple[int, int]|None, new_head:tuple[int, int]|None) -> str|None: - if old_head is None or new_head is None: - return None - - delta_x = new_head[0] - old_head[0] - delta_y = new_head[1] - old_head[1] - if delta_x == 1 and delta_y == 0: - return "right" - if delta_x == -1 and delta_y == 0: - return "left" - if delta_x == 0 and delta_y == 1: - return "up" - if delta_x == 0 and delta_y == -1: - return "down" - return None - - def _derive_game_type(self, board:dict, ruleset:dict) -> str: - initial_snake_count = len(board.get("snakes", [])) - if initial_snake_count == 2: - return "duel" - return ruleset.get("name") or "standard" - - def _record_game_start_sync(self, game_state:dict, snake_type:str|None=None, snake_version:str|None=None) -> None: - game = game_state.get("game", {}) - board = game_state.get("board", {}) - you = self._extract_you(game_state) - ruleset = game.get("ruleset", {}) - game_type = self._derive_game_type(board, ruleset) - - with self._connect() as connection: - connection.execute(""" - INSERT INTO games ( - game_id, started_at, width, height, source, map_name, - ruleset_name, ruleset_version, your_snake_id, your_snake_name, - your_snake_type, your_snake_version, game_type, status - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'running') - ON CONFLICT(game_id) DO UPDATE SET - width = excluded.width, - height = excluded.height, - source = excluded.source, - map_name = excluded.map_name, - ruleset_name = excluded.ruleset_name, - ruleset_version = excluded.ruleset_version, - your_snake_id = excluded.your_snake_id, - your_snake_name = excluded.your_snake_name, - your_snake_type = excluded.your_snake_type, - your_snake_version = excluded.your_snake_version, - game_type = excluded.game_type, - status = 'running' - """, - ( - game.get("id"), - self._utc_now(), - board.get("width"), - board.get("height"), - game.get("source"), - game.get("map"), - ruleset.get("name"), - ruleset.get("version"), - you.get("id"), - you.get("name"), - snake_type, - snake_version, - game_type, - ), - ) - connection.execute("PRAGMA wal_checkpoint(PASSIVE)") - connection.execute("PRAGMA optimize") - - def _record_turn_sync(self, game_state:dict, my_move:str|None, my_thinking:dict|None) -> None: - game = game_state.get("game", {}) - board = game_state.get("board", {}) - snakes = self._extract_snakes(game_state) - you = self._extract_you(game_state) - game_id = game.get("id") - turn = int(game_state.get("turn", 0)) - - with self._connect() as connection: - connection.execute(""" - INSERT INTO turns ( - game_id, turn, observed_at, my_move, my_thinking_json, - board_state_json, snakes_json, you_json, food_json, hazards_json - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(game_id, turn) DO UPDATE SET - observed_at = excluded.observed_at, - my_move = excluded.my_move, - my_thinking_json = excluded.my_thinking_json, - board_state_json = excluded.board_state_json, - snakes_json = excluded.snakes_json, - you_json = excluded.you_json, - food_json = excluded.food_json, - hazards_json = excluded.hazards_json - """, - ( - game_id, - turn, - self._utc_now(), - my_move, - self._to_json(my_thinking) if my_thinking is not None else None, - self._to_json(board), - self._to_json(snakes), - self._to_json(you), - self._to_json(board.get("food", [])), - self._to_json(board.get("hazards", [])), - ), - ) - - previous_positions:dict[str, tuple[int, int]] = {} - if turn > 0: - previous_rows = connection.execute(""" - SELECT snake_id, head_x, head_y - FROM snake_turns - WHERE game_id = ? AND turn = ? - """, - (game_id, turn - 1), - ).fetchall() - - previous_positions = { - row["snake_id"]: (int(row["head_x"]), int(row["head_y"])) - for row in previous_rows - if row["head_x"] is not None and row["head_y"] is not None - } - - you_id = you.get("id") - for snake in snakes: - snake_id = snake.get("id") - head = snake.get("head", {}) - head_x = head.get("x") - head_y = head.get("y") - if snake_id is None: - continue - - new_head = ( - (int(head_x), int(head_y)) - if head_x is not None and head_y is not None - else None - ) - inferred = self._infer_direction( - previous_positions.get(snake_id), new_head - ) - - connection.execute(""" - INSERT INTO snake_turns ( - game_id, turn, snake_id, snake_name, health, length, - head_x, head_y, body_json, is_you, inferred_move, latency - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(game_id, turn, snake_id) DO UPDATE SET - snake_name = excluded.snake_name, - health = excluded.health, - length = excluded.length, - head_x = excluded.head_x, - head_y = excluded.head_y, - body_json = excluded.body_json, - is_you = excluded.is_you, - inferred_move = excluded.inferred_move, - latency = excluded.latency - """, - ( - game_id, - turn, - snake_id, - snake.get("name"), - snake.get("health"), - snake.get("length"), - head_x, - head_y, - self._to_json(snake.get("body", [])), - 1 if snake_id == you_id else 0, - inferred, - snake.get("latency"), - ), - ) - - connection.execute(""" - UPDATE games - SET final_turn = CASE WHEN ? > final_turn THEN ? ELSE final_turn END - WHERE game_id = ? - """, - (turn, turn, game_id), - ) - - def _record_game_end_sync(self, game_state:dict) -> None: - game = game_state.get("game", {}) - game_id = game.get("id") - board = game_state.get("board", {}) - snakes = list(board.get("snakes", [])) - you = self._extract_you(game_state) - winner_names = [snake.get("name") for snake in snakes if snake.get("name")] - you_id = you.get("id") - winner_you = any(snake.get("id") == you_id for snake in snakes) - - with self._connect() as connection: - connection.execute(""" - UPDATE games - SET ended_at = ?, - winner_names_json = ?, - winner_you = ?, - final_turn = CASE WHEN ? > final_turn THEN ? ELSE final_turn END, - status = 'finished' - WHERE game_id = ? - """, - ( - self._utc_now(), - self._to_json(winner_names), - 1 if winner_you else 0, - int(game_state.get("turn", 0)), - int(game_state.get("turn", 0)), - game_id, - ), - ) - - def _finalize_stale_running_games_sync(self, stale_after_seconds:int=600) -> int: - threshold = max(60, int(stale_after_seconds)) - now_utc = datetime.now(timezone.utc) - finalized = 0 - - with self._connect() as connection: - rows = connection.execute(""" - SELECT game_id, started_at, final_turn, your_snake_id - FROM games - WHERE status = 'running' - ORDER BY started_at ASC - """).fetchall() - - for row in rows: - started_at = self._parse_utc_timestamp(row["started_at"]) - if started_at is None: - continue - age_seconds = (now_utc - started_at).total_seconds() - if age_seconds < threshold: - continue - - game_id = row["game_id"] - your_snake_id = row["your_snake_id"] - final_turn = int(row["final_turn"] or 0) - - snake_rows = connection.execute(""" - SELECT snake_id, snake_name - FROM snake_turns - WHERE game_id = ? AND turn = ? - ORDER BY is_you DESC, snake_name ASC - """, - (game_id, final_turn), - ).fetchall() - - if len(snake_rows) == 0: - latest_turn_row = connection.execute(""" - SELECT MAX(turn) AS latest_turn - FROM snake_turns - WHERE game_id = ? - """, - (game_id,), - ).fetchone() - latest_turn = ( - latest_turn_row["latest_turn"] - if latest_turn_row is not None - else None - ) - if latest_turn is not None: - final_turn = int(latest_turn) - snake_rows = connection.execute(""" - SELECT snake_id, snake_name - FROM snake_turns - WHERE game_id = ? AND turn = ? - ORDER BY is_you DESC, snake_name ASC - """, - (game_id, final_turn), - ).fetchall() - - survivor_ids = [snake["snake_id"] for snake in snake_rows if snake["snake_id"]] - survivor_names = [snake["snake_name"] for snake in snake_rows if snake["snake_name"]] - winner_you = bool( - your_snake_id - and your_snake_id in survivor_ids - and len(survivor_ids) == 1 - ) - - update_result = connection.execute(""" - UPDATE games - SET ended_at = ?, - winner_names_json = ?, - winner_you = ?, - final_turn = CASE WHEN ? > final_turn THEN ? ELSE final_turn END, - status = 'finished' - WHERE game_id = ? AND status = 'running' - """, - ( - self._utc_now(), - self._to_json(survivor_names), - 1 if winner_you else 0, - final_turn, - final_turn, - game_id, - ), - ) - if update_result.rowcount > 0: - finalized += 1 - - return finalized - - def _get_summary_sync(self, recent_limit:int=15) -> dict: - with self._connect() as connection: - totals = connection.execute(""" - SELECT - COUNT(*) AS total_games, - SUM(CASE WHEN status = 'running' THEN 1 ELSE 0 END) AS running_games, - SUM(CASE WHEN status = 'finished' THEN 1 ELSE 0 END) AS finished_games, - SUM(CASE WHEN status = 'finished' AND winner_you = 1 THEN 1 ELSE 0 END) AS wins, - SUM(CASE WHEN status = 'finished' AND winner_you = 0 THEN 1 ELSE 0 END) AS losses, - AVG(CASE WHEN status = 'finished' THEN final_turn ELSE NULL END) AS avg_turns - FROM games - """ - ).fetchone() - - by_type = connection.execute(""" - SELECT - COALESCE(game_type, ruleset_name, 'unknown') AS type_label, - COUNT(*) AS total, - SUM(CASE WHEN status = 'finished' AND winner_you = 1 THEN 1 ELSE 0 END) AS wins, - SUM(CASE WHEN status = 'finished' AND winner_you = 0 THEN 1 ELSE 0 END) AS losses - FROM games - WHERE status = 'finished' - GROUP BY type_label - ORDER BY total DESC - """ - ).fetchall() - - recent = connection.execute(""" - SELECT game_id, started_at, ended_at, map_name, ruleset_name, game_type, - your_snake_name, your_snake_type, your_snake_version, winner_you, final_turn, status - FROM games - ORDER BY started_at DESC - LIMIT ? - """, - (max(1, int(recent_limit)),), - ).fetchall() - - return { - "total_games": int(totals["total_games"] or 0), - "running_games": int(totals["running_games"] or 0), - "finished_games": int(totals["finished_games"] or 0), - "wins": int(totals["wins"] or 0), - "losses": int(totals["losses"] or 0), - "avg_turns_finished": round(float(totals["avg_turns"] or 0.0), 2), - "by_game_type": [{ - "game_type": row["type_label"], - "total": int(row["total"]), - "wins": int(row["wins"]), - "losses": int(row["losses"]), - } for row in by_type], - "recent_games": [{ - "game_id": row["game_id"], - "started_at": row["started_at"], - "ended_at": row["ended_at"], - "map": row["map_name"], - "ruleset": row["ruleset_name"], - "game_type": row["game_type"], - "snake": row["your_snake_name"], - "snake_type": row["your_snake_type"], - "snake_version": row["your_snake_version"], - "winner_you": bool(row["winner_you"]), - "final_turn": int(row["final_turn"] or 0), - "status": row["status"], - } for row in recent ], - } - - def _list_games_sync(self, limit:int=50) -> list[dict]: - with self._connect() as connection: - rows = connection.execute(""" - SELECT game_id, started_at, ended_at, map_name, source, ruleset_name, game_type, - your_snake_name, your_snake_type, your_snake_version, - winner_you, winner_names_json, final_turn, status - FROM games - ORDER BY started_at DESC - LIMIT ? - """, - (max(1, int(limit)),), - ).fetchall() - - return [{ - "game_id": row["game_id"], - "started_at": row["started_at"], - "ended_at": row["ended_at"], - "map": row["map_name"], - "source": row["source"], - "ruleset": row["ruleset_name"], - "game_type": row["game_type"], - "snake": row["your_snake_name"], - "snake_type": row["your_snake_type"], - "snake_version": row["your_snake_version"], - "winner_you": bool(row["winner_you"]), - "winner_names": self._from_json(row["winner_names_json"]) or [], - "final_turn": int(row["final_turn"] or 0), - "status": row["status"], - } for row in rows] - - def _get_game_replay_sync(self, game_id:str) -> dict | None: - with self._connect() as connection: - game_row = connection.execute(""" - SELECT game_id, started_at, ended_at, width, height, source, map_name, - ruleset_name, ruleset_version, game_type, your_snake_id, your_snake_name, - your_snake_type, your_snake_version, - winner_names_json, winner_you, final_turn, status - FROM games - WHERE game_id = ? - """, - (game_id,), - ).fetchone() - - if game_row is None: - return None - - turn_rows = connection.execute(""" - SELECT turn, observed_at, my_move, my_thinking_json, - board_state_json, food_json, hazards_json, you_json - FROM turns - WHERE game_id = ? - ORDER BY turn ASC - """, - (game_id,), - ).fetchall() - - snake_rows = connection.execute(""" - SELECT turn, snake_id, snake_name, health, length, head_x, head_y, - body_json, is_you, inferred_move, latency - FROM snake_turns - WHERE game_id = ? - ORDER BY turn ASC, is_you DESC, snake_name ASC - """, - (game_id,), - ).fetchall() - - snakes_by_turn:dict[int, list[dict]] = {} - for row in snake_rows: - turn = int(row["turn"]) - snakes_by_turn.setdefault(turn, []).append({ - "snake_id": row["snake_id"], - "snake_name": row["snake_name"], - "health": row["health"], - "length": row["length"], - "head": {"x": row["head_x"], "y": row["head_y"]}, - "body": self._from_json(row["body_json"]) or [], - "is_you": bool(row["is_you"]), - "inferred_move": row["inferred_move"], - "latency": row["latency"], - }) - - replay_turns = [] - for row in turn_rows: - turn_number = int(row["turn"]) - replay_turns.append({ - "turn": turn_number, - "observed_at": row["observed_at"], - "my_move": row["my_move"], - "my_thinking": self._from_json(row["my_thinking_json"]), - "board": self._from_json(row["board_state_json"]), - "food": self._from_json(row["food_json"]) or [], - "hazards": self._from_json(row["hazards_json"]) or [], - "you": self._from_json(row["you_json"]) or {}, - "snakes": snakes_by_turn.get(turn_number, []), - }) - - return { - "game": { - "game_id": game_row["game_id"], - "started_at": game_row["started_at"], - "ended_at": game_row["ended_at"], - "width": game_row["width"], - "height": game_row["height"], - "source": game_row["source"], - "map": game_row["map_name"], - "ruleset_name": game_row["ruleset_name"], - "ruleset_version": game_row["ruleset_version"], - "game_type": game_row["game_type"], - "your_snake_id": game_row["your_snake_id"], - "your_snake_name": game_row["your_snake_name"], - "your_snake_type": game_row["your_snake_type"], - "your_snake_version": game_row["your_snake_version"], - "winner_names": self._from_json(game_row["winner_names_json"]) or [], - "winner_you": bool(game_row["winner_you"]), - "final_turn": int(game_row["final_turn"] or 0), - "status": game_row["status"], - }, - "turns": replay_turns, - } + async def initialize(self) -> None: + await self._backend.initialize() async def record_game_start(self, game_state:dict, snake_type:str|None=None, snake_version:str|None=None) -> None: - await asyncio.to_thread(self._record_game_start_sync, game_state, snake_type, snake_version) + await self._backend.record_game_start(game_state, snake_type, snake_version) async def record_turn(self, game_state:dict, my_move:str|None, my_thinking:dict|None=None) -> None: - await asyncio.to_thread(self._record_turn_sync, game_state, my_move, my_thinking) + await self._backend.record_turn(game_state, my_move, my_thinking) async def record_game_end(self, game_state:dict) -> None: - await asyncio.to_thread(self._record_game_end_sync, game_state) + await self._backend.record_game_end(game_state) async def get_summary(self, recent_limit:int=15) -> dict: - return await asyncio.to_thread(self._get_summary_sync, recent_limit) + return await self._backend.get_summary(recent_limit) async def list_games(self, limit:int=50) -> list[dict]: - return await asyncio.to_thread(self._list_games_sync, limit) + return await self._backend.list_games(limit) async def finalize_stale_running_games(self, stale_after_seconds:int=600) -> int: - return await asyncio.to_thread(self._finalize_stale_running_games_sync, stale_after_seconds) + return await self._backend.finalize_stale_running_games(stale_after_seconds) async def get_game_replay(self, game_id:str) -> dict|None: - return await asyncio.to_thread(self._get_game_replay_sync, game_id) + return await self._backend.get_game_replay(game_id) async def close(self) -> None: - return None + await self._backend.close() diff --git a/server/database/__init__.py b/server/database/__init__.py index 892085c..f495916 100644 --- a/server/database/__init__.py +++ b/server/database/__init__.py @@ -1,4 +1,6 @@ from .GameplayDatabase import GameplayDatabase +from .backend import GameplayBackendBuilder + from .LocalStorage import LocalStorage from .EdgeDB import EdgeDB diff --git a/server/database/backend/PostgresqlGameplayBackend.py b/server/database/backend/PostgresqlGameplayBackend.py new file mode 100644 index 0000000..8a2af76 --- /dev/null +++ b/server/database/backend/PostgresqlGameplayBackend.py @@ -0,0 +1,836 @@ +"""PostgreSQL gameplay backend using asyncpg. + +JSON columns use the JSONB type so PostgreSQL stores them in a binary, +decomposed format and automatically compresses large values via TOAST +(Oversized-Attribute Storage Technique). No application-level +serialisation/deserialisation round-trip is needed for reads — asyncpg +decodes JSONB rows directly into Python dicts/lists. + +Connection: pass a DSN via the `dsn` constructor argument, e.g. + postgresql://user:password@host:5432/dbname + +or set GAMEPLAY_DB_PG_DSN in the environment. +""" + +import asyncio, json, logging, sqlite3, sys +from datetime import datetime, timezone +from pathlib import Path +from urllib.parse import urlparse, urlunparse + +from .Template import GameplayBackendTemplate + +logger = logging.getLogger(__name__) +if not logger.handlers: + _handler = logging.StreamHandler(stream=sys.stdout) + _handler.setFormatter(logging.Formatter(fmt="%(levelname)s %(module)s: %(message)s")) + logger.addHandler(_handler) + logger.propagate = False + +# DDL --------------------------------------------------------------------- # + +_DDL = """ +CREATE TABLE IF NOT EXISTS games ( + game_id TEXT PRIMARY KEY, + started_at TIMESTAMPTZ NOT NULL, + ended_at TIMESTAMPTZ, + width INTEGER, + height INTEGER, + source TEXT, + map_name TEXT, + ruleset_name TEXT, + ruleset_version TEXT, + your_snake_id TEXT, + your_snake_name TEXT, + your_snake_type TEXT, + your_snake_version TEXT, + game_type TEXT, + winner_name TEXT, + winner_you BOOLEAN NOT NULL DEFAULT FALSE, + final_turn INTEGER NOT NULL DEFAULT 0, + status TEXT NOT NULL DEFAULT 'running' +); + +CREATE TABLE IF NOT EXISTS turns ( + id BIGSERIAL PRIMARY KEY, + game_id TEXT NOT NULL REFERENCES games(game_id) ON DELETE CASCADE, + turn INTEGER NOT NULL, + observed_at TIMESTAMPTZ NOT NULL, + my_move TEXT, + my_thinking JSONB, + board_state JSONB NOT NULL, + snakes JSONB NOT NULL, + you JSONB NOT NULL, + food JSONB NOT NULL, + hazards JSONB NOT NULL, + UNIQUE (game_id, turn) +); + +CREATE TABLE IF NOT EXISTS snake_turns ( + id BIGSERIAL PRIMARY KEY, + game_id TEXT NOT NULL REFERENCES games(game_id) ON DELETE CASCADE, + turn INTEGER NOT NULL, + snake_id TEXT NOT NULL, + snake_name TEXT, + health INTEGER, + length INTEGER, + head_x INTEGER, + head_y INTEGER, + body JSONB NOT NULL, + is_you BOOLEAN NOT NULL DEFAULT FALSE, + inferred_move TEXT, + latency TEXT, + UNIQUE (game_id, turn, snake_id) +); + +CREATE INDEX IF NOT EXISTS idx_turns_game_turn ON turns(game_id, turn); +CREATE INDEX IF NOT EXISTS idx_games_status ON games(status); +CREATE INDEX IF NOT EXISTS idx_snake_turns_game_turn ON snake_turns(game_id, turn); +""" + +# Schema evolution: add new columns to existing tables (idempotent). +_ALTER_DDL = """ +ALTER TABLE games ADD COLUMN IF NOT EXISTS game_type TEXT; +ALTER TABLE games ADD COLUMN IF NOT EXISTS your_snake_type TEXT; +ALTER TABLE games ADD COLUMN IF NOT EXISTS your_snake_version TEXT; +ALTER TABLE games ADD COLUMN IF NOT EXISTS winner_name TEXT; +ALTER TABLE turns ADD COLUMN IF NOT EXISTS my_thinking JSONB; +ALTER TABLE snake_turns ADD COLUMN IF NOT EXISTS latency TEXT; +""" + +# Force TOAST compression on the large JSONB columns so that even +# moderately-sized payloads get compressed on-disk. +_TOAST_DDL = """ +ALTER TABLE turns ALTER COLUMN board_state SET STORAGE EXTENDED; +ALTER TABLE turns ALTER COLUMN snakes SET STORAGE EXTENDED; +ALTER TABLE turns ALTER COLUMN you SET STORAGE EXTENDED; +ALTER TABLE turns ALTER COLUMN food SET STORAGE EXTENDED; +ALTER TABLE turns ALTER COLUMN hazards SET STORAGE EXTENDED; +ALTER TABLE snake_turns ALTER COLUMN body SET STORAGE EXTENDED; +""" + +class PostgresqlGameplayBackend(GameplayBackendTemplate): + """Async PostgreSQL backend. A connection pool is created lazily on the + first method call and reused for the lifetime of the object. + + Requires: pip install asyncpg + """ + + def __init__(self, dsn:str, min_size:int=1, max_size:int=5, sqlite_migration_path:str|None=None): + self._dsn = dsn + self._min_size = min_size + self._max_size = max_size + self._sqlite_migration_path = sqlite_migration_path + self._pool = None # asyncpg.Pool, typed at runtime + + # ── DSN normalisation ────────────────────────────────────────────────────── + + _DEFAULT_DB_NAME = "battlesnake" + + @classmethod + def _ensure_db_name(cls, dsn:str) -> str: + """Return *dsn* with a database name appended when none is present. + + A DSN has no database name when its path component is empty or ``/``. + In that case ``battlesnake`` is appended so asyncpg gets a complete + connection string without the caller having to remember to add one. + """ + parsed = urlparse(dsn) + db = parsed.path.lstrip("/") + if db: + return dsn + + new_path = f"/{cls._DEFAULT_DB_NAME}" + return urlunparse(parsed._replace(path=new_path)) + + # ── pool / schema ────────────────────────────────────────────────────────── + + async def initialize(self) -> None: + """Eagerly create the connection pool on startup so schema init and + SQLite migration run immediately rather than on the first game request.""" + await self._get_pool() + + async def _get_pool(self): + if self._pool is None: + try: + import asyncpg # noqa: PLC0415 + except ImportError as exc: + raise ImportError( + "asyncpg is required for the PostgreSQL gameplay backend. " + "Install it with: pip install asyncpg" + ) from exc + + target_dsn = self._ensure_db_name(self._dsn) + await self._ensure_database_exists(asyncpg, target_dsn) + + async def _init_conn(conn) -> None: + await conn.set_type_codec('jsonb', encoder=json.dumps, decoder=json.loads, schema='pg_catalog') + await conn.set_type_codec('json', encoder=json.dumps, decoder=json.loads, schema='pg_catalog') + + self._pool = await asyncpg.create_pool( + dsn=target_dsn, + min_size=self._min_size, + max_size=self._max_size, + init=_init_conn, + ) + await self._initialize_schema() + await self._maybe_migrate_from_sqlite() + return self._pool + + async def _ensure_database_exists(self, asyncpg, target_dsn:str) -> None: + """Connect to the postgres maintenance DB and CREATE the target database + if it does not already exist. Uses a plain connection (not a pool) so + the CREATE DATABASE statement can run outside any transaction.""" + parsed = urlparse(target_dsn) + db_name = parsed.path.lstrip("/") + maintenance_dsn = urlunparse(parsed._replace(path="/postgres")) + try: + conn = await asyncpg.connect(dsn=maintenance_dsn) + except Exception: + # Fall back to connecting without specifying a database — some setups + # (e.g. Cloud SQL, managed PG) disallow direct access to 'postgres'. + maintenance_dsn = urlunparse(parsed._replace(path="")) + conn = await asyncpg.connect(dsn=maintenance_dsn) + try: + exists = await conn.fetchval( + "SELECT 1 FROM pg_database WHERE datname = $1", db_name + ) + if not exists: + await conn.execute(f'CREATE DATABASE "{db_name}"') + logger.info(f"PostgreSQL: created database '{db_name}'") + finally: + await conn.close() + + async def _initialize_schema(self) -> None: + assert self._pool is not None + async with self._pool.acquire() as conn: + await conn.execute(_DDL) + await conn.execute(_ALTER_DDL) + # TOAST storage hints are idempotent; ignore errors on repeated runs. + try: + await conn.execute(_TOAST_DDL) + except Exception as exc: + logger.debug(f"TOAST DDL skipped (likely already set): {exc}") + + # ── sqlite migration ─────────────────────────────────────────────────────── + + async def _maybe_migrate_from_sqlite(self) -> None: + if not self._sqlite_migration_path: + return + + src = Path(self._sqlite_migration_path) + if not src.exists(): + return + + logger.info(f"SQLite migration: found {src}, starting migration to PostgreSQL …") + try: + games, turns, snake_turns = await asyncio.to_thread(self._read_sqlite_data_sync, str(src)) + await self._insert_migrated_data(games, turns, snake_turns) + + done_path = src.with_suffix(".migrated") + src.rename(done_path) + + logger.info( + f"SQLite migration complete: {len(games)} games, {len(turns)} turns, " + f"{len(snake_turns)} snake_turns migrated. " + f"Source file renamed to {done_path.name}" + ) + except Exception: + logger.exception("SQLite migration failed — PostgreSQL data is untouched, original SQLite file kept") + + def _read_sqlite_data_sync(self, db_path:str) -> tuple[list[sqlite3.Row], list[sqlite3.Row], list[sqlite3.Row]]: + conn = sqlite3.connect(db_path, timeout=30, isolation_level=None) + conn.row_factory = sqlite3.Row + + try: + games = conn.execute(""" + SELECT game_id, started_at, ended_at, width, height, source, map_name, + ruleset_name, ruleset_version, your_snake_id, your_snake_name, + your_snake_type, your_snake_version, game_type, + winner_names_json, winner_you, final_turn, status + FROM games + ORDER BY started_at ASC + """).fetchall() + turns = conn.execute(""" + SELECT game_id, turn, observed_at, my_move, my_thinking_json, + board_state_json, snakes_json, you_json, food_json, hazards_json + FROM turns + ORDER BY game_id ASC, turn ASC + """).fetchall() + snake_turns = conn.execute(""" + SELECT game_id, turn, snake_id, snake_name, health, length, + head_x, head_y, body_json, is_you, inferred_move, latency + FROM snake_turns + ORDER BY game_id ASC, turn ASC, snake_id ASC + """).fetchall() + finally: + conn.close() + + return games, turns, snake_turns + + def _parse_ts(self, value:str|None) -> datetime|None: + """Parse an ISO-8601 TEXT timestamp from SQLite into a timezone-aware datetime.""" + ts = self._parse_utc_timestamp(value) + return ts # already UTC-aware from base class helper + + def _parse_json(self, value: str|None) -> object: + if not value: + return None + try: + return json.loads(value) + except (json.JSONDecodeError, TypeError): + return None + + async def _insert_migrated_data(self, games:list, turns:list, snake_turns:list) -> None: + assert self._pool is not None + async with self._pool.acquire() as conn: + async with conn.transaction(): + # games ───────────────────────────────────────────────────────────── + # winner_name is TEXT — no cast needed. + await conn.executemany(""" + INSERT INTO games ( + game_id, started_at, ended_at, width, height, source, map_name, + ruleset_name, ruleset_version, your_snake_id, your_snake_name, + your_snake_type, your_snake_version, game_type, + winner_name, winner_you, final_turn, status + ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18) + ON CONFLICT (game_id) DO NOTHING + """, + [ + ( + row["game_id"], + self._parse_ts(row["started_at"]), + self._parse_ts(row["ended_at"]), + row["width"], + row["height"], + row["source"], + row["map_name"], + row["ruleset_name"], + row["ruleset_version"], + row["your_snake_id"], + row["your_snake_name"], + row["your_snake_type"], + row["your_snake_version"], + row["game_type"], + (self._parse_json(row["winner_names_json"]) or [None])[0], + bool(row["winner_you"]), + row["final_turn"], + row["status"], + ) + for row in games + ], + ) + + await conn.executemany(""" + INSERT INTO turns ( + game_id, turn, observed_at, my_move, my_thinking, + board_state, snakes, you, food, hazards + ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10) + ON CONFLICT (game_id, turn) DO NOTHING + """, + [ + ( + row["game_id"], + row["turn"], + self._parse_ts(row["observed_at"]), + row["my_move"], + self._parse_json(row["my_thinking_json"]), + self._parse_json(row["board_state_json"]), + self._parse_json(row["snakes_json"]), + self._parse_json(row["you_json"]), + self._parse_json(row["food_json"]), + self._parse_json(row["hazards_json"]), + ) + for row in turns + ], + ) + + # snake_turns + await conn.executemany(""" + INSERT INTO snake_turns ( + game_id, turn, snake_id, snake_name, health, length, + head_x, head_y, body, is_you, inferred_move, latency + ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12) + ON CONFLICT (game_id, turn, snake_id) DO NOTHING + """, + [ + ( + row["game_id"], + row["turn"], + row["snake_id"], + row["snake_name"], + row["health"], + row["length"], + row["head_x"], + row["head_y"], + self._parse_json(row["body_json"]), + bool(row["is_you"]), + row["inferred_move"], + row["latency"], + ) + for row in snake_turns + ], + ) + + # ── helpers ──────────────────────────────────────────────────────────────── + + def _utc_now_ts(self) -> datetime: + return datetime.now(timezone.utc) + + # The pool init callback registers JSON/JSONB codecs so asyncpg automatically + # encodes Python dicts/lists on write and decodes them on read. + + # ── write methods ────────────────────────────────────────────────────────── + + async def record_game_start(self, game_state:dict, snake_type:str|None=None, snake_version:str|None=None) -> None: + game = game_state.get("game", {}) + board = game_state.get("board", {}) + you = self._extract_you(game_state) + ruleset = game.get("ruleset", {}) + game_type = self._derive_game_type(board, ruleset) + + pool = await self._get_pool() + async with pool.acquire() as conn: + await conn.execute(""" + INSERT INTO games ( + game_id, started_at, width, height, source, map_name, + ruleset_name, ruleset_version, your_snake_id, your_snake_name, + your_snake_type, your_snake_version, game_type, status + ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,'running') + ON CONFLICT (game_id) DO UPDATE SET + width = EXCLUDED.width, + height = EXCLUDED.height, + source = EXCLUDED.source, + map_name = EXCLUDED.map_name, + ruleset_name = EXCLUDED.ruleset_name, + ruleset_version = EXCLUDED.ruleset_version, + your_snake_id = EXCLUDED.your_snake_id, + your_snake_name = EXCLUDED.your_snake_name, + your_snake_type = EXCLUDED.your_snake_type, + your_snake_version = EXCLUDED.your_snake_version, + game_type = EXCLUDED.game_type, + status = 'running' + """, + game.get("id"), + self._utc_now_ts(), + board.get("width"), + board.get("height"), + game.get("source"), + game.get("map"), + ruleset.get("name"), + ruleset.get("version"), + you.get("id"), + you.get("name"), + snake_type, + snake_version, + game_type, + ) + + async def record_turn(self, game_state:dict, my_move:str|None, my_thinking:dict|None=None) -> None: + game = game_state.get("game", {}) + board = game_state.get("board", {}) + snakes = self._extract_snakes(game_state) + you = self._extract_you(game_state) + game_id = game.get("id") + turn = int(game_state.get("turn", 0)) + + pool = await self._get_pool() + async with pool.acquire() as conn: + async with conn.transaction(): + await conn.execute(""" + INSERT INTO turns ( + game_id, turn, observed_at, my_move, my_thinking, + board_state, snakes, you, food, hazards + ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10) + ON CONFLICT (game_id, turn) DO UPDATE SET + observed_at = EXCLUDED.observed_at, + my_move = EXCLUDED.my_move, + my_thinking = EXCLUDED.my_thinking, + board_state = EXCLUDED.board_state, + snakes = EXCLUDED.snakes, + you = EXCLUDED.you, + food = EXCLUDED.food, + hazards = EXCLUDED.hazards + """, + game_id, + turn, + self._utc_now_ts(), + my_move, + my_thinking, + board, + snakes, + you, + board.get("food", []), + board.get("hazards", []), + ) + + previous_positions:dict[str, tuple[int, int]] = {} + if turn > 0: + prev_rows = await conn.fetch(""" + SELECT snake_id, head_x, head_y + FROM snake_turns + WHERE game_id = $1 AND turn = $2 + """, + game_id, turn - 1, + ) + previous_positions = { + row["snake_id"]: (int(row["head_x"]), int(row["head_y"])) + for row in prev_rows + if row["head_x"] is not None and row["head_y"] is not None + } + + you_id = you.get("id") + for snake in snakes: + snake_id = snake.get("id") + head = snake.get("head", {}) + head_x = head.get("x") + head_y = head.get("y") + if snake_id is None: + continue + + new_head = ( + (int(head_x), int(head_y)) + if head_x is not None and head_y is not None + else None + ) + inferred = self._infer_direction(previous_positions.get(snake_id), new_head) + await conn.execute(""" + INSERT INTO snake_turns ( + game_id, turn, snake_id, snake_name, health, length, + head_x, head_y, body, is_you, inferred_move, latency + ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12) + ON CONFLICT (game_id, turn, snake_id) DO UPDATE SET + snake_name = EXCLUDED.snake_name, + health = EXCLUDED.health, + length = EXCLUDED.length, + head_x = EXCLUDED.head_x, + head_y = EXCLUDED.head_y, + body = EXCLUDED.body, + is_you = EXCLUDED.is_you, + inferred_move = EXCLUDED.inferred_move, + latency = EXCLUDED.latency + """, + game_id, + turn, + snake_id, + snake.get("name"), + snake.get("health"), + snake.get("length"), + head_x, + head_y, + snake.get("body", []), + snake_id == you_id, + inferred, + snake.get("latency"), + ) + + await conn.execute(""" + UPDATE games + SET final_turn = GREATEST(final_turn, $1) + WHERE game_id = $2 + """, + turn, game_id, + ) + + async def record_game_end(self, game_state:dict) -> None: + game = game_state.get("game", {}) + game_id = game.get("id") + board = game_state.get("board", {}) + snakes = list(board.get("snakes", [])) + you = self._extract_you(game_state) + winner_name = next((s.get("name") for s in snakes if s.get("name")), None) + you_id = you.get("id") + winner_you = any(s.get("id") == you_id for s in snakes) + + pool = await self._get_pool() + async with pool.acquire() as conn: + await conn.execute(""" + UPDATE games + SET ended_at = $1, + winner_name = $2, + winner_you = $3, + final_turn = GREATEST(final_turn, $4), + status = 'finished' + WHERE game_id = $5 + """, + self._utc_now_ts(), + winner_name, + winner_you, + int(game_state.get("turn", 0)), + game_id, + ) + + # ── stale game finalization ──────────────────────────────────────────────── + + async def finalize_stale_running_games(self, stale_after_seconds:int=600) -> int: + threshold = max(60, int(stale_after_seconds)) + now_utc = datetime.now(timezone.utc) + finalized = 0 + + pool = await self._get_pool() + async with pool.acquire() as conn: + rows = await conn.fetch(""" + SELECT game_id, started_at, final_turn, your_snake_id + FROM games + WHERE status = 'running' + ORDER BY started_at ASC + """) + + for row in rows: + started_at = row["started_at"] + if started_at is None: + continue + if started_at.tzinfo is None: + started_at = started_at.replace(tzinfo=timezone.utc) + if (now_utc - started_at).total_seconds() < threshold: + continue + + game_id = row["game_id"] + your_snake_id = row["your_snake_id"] + final_turn = int(row["final_turn"] or 0) + + snake_rows = await conn.fetch(""" + SELECT snake_id, snake_name + FROM snake_turns + WHERE game_id = $1 AND turn = $2 + ORDER BY is_you DESC, snake_name ASC + """, + game_id, final_turn, + ) + + if len(snake_rows) == 0: + latest_row = await conn.fetchrow( + "SELECT MAX(turn) AS latest_turn FROM snake_turns WHERE game_id = $1", + game_id, + ) + + if latest_row is not None and latest_row["latest_turn"] is not None: + final_turn = int(latest_row["latest_turn"]) + snake_rows = await conn.fetch(""" + SELECT snake_id, snake_name + FROM snake_turns + WHERE game_id = $1 AND turn = $2 + ORDER BY is_you DESC, snake_name ASC + """, + game_id, final_turn, + ) + + survivor_ids = [s["snake_id"] for s in snake_rows if s["snake_id"]] + winner_you = bool( + your_snake_id + and your_snake_id in survivor_ids + and len(survivor_ids) == 1 + ) + survivor_name = next((s["snake_name"] for s in snake_rows if s["snake_name"]), None) + + tag = await conn.execute(""" + UPDATE games + SET ended_at = $1, + winner_name = $2, + winner_you = $3, + final_turn = GREATEST(final_turn, $4), + status = 'finished' + WHERE game_id = $5 AND status = 'running' + """, + self._utc_now_ts(), + survivor_name, + winner_you, + final_turn, + game_id, + ) + if tag and tag.endswith("1"): + finalized += 1 + + return finalized + + # ── read methods ─────────────────────────────────────────────────────────── + + async def get_summary(self, recent_limit:int=15) -> dict: + pool = await self._get_pool() + async with pool.acquire() as conn: + totals = await conn.fetchrow(""" + SELECT + COUNT(*) AS total_games, + COUNT(*) FILTER (WHERE status = 'running') AS running_games, + COUNT(*) FILTER (WHERE status = 'finished') AS finished_games, + COUNT(*) FILTER (WHERE status = 'finished' AND winner_you) AS wins, + COUNT(*) FILTER (WHERE status = 'finished' AND NOT winner_you) AS losses, + AVG(final_turn) FILTER (WHERE status = 'finished') AS avg_turns + FROM games + """) + + by_type = await conn.fetch(""" + SELECT + COALESCE(game_type, ruleset_name, 'unknown') AS type_label, + COUNT(*) AS total, + COUNT(*) FILTER (WHERE winner_you) AS wins, + COUNT(*) FILTER (WHERE NOT winner_you) AS losses + FROM games + WHERE status = 'finished' + GROUP BY type_label + ORDER BY total DESC + """) + + recent = await conn.fetch(""" + SELECT game_id, started_at, ended_at, map_name, ruleset_name, game_type, + your_snake_name, your_snake_type, your_snake_version, winner_you, final_turn, status + FROM games + ORDER BY started_at DESC + LIMIT $1 + """, + max(1, int(recent_limit)), + ) + + return { + "total_games": int(totals["total_games"] or 0), + "running_games": int(totals["running_games"] or 0), + "finished_games": int(totals["finished_games"] or 0), + "wins": int(totals["wins"] or 0), + "losses": int(totals["losses"] or 0), + "avg_turns_finished": round(float(totals["avg_turns"] or 0.0), 2), + "by_game_type": [{ + "game_type": row["type_label"], + "total": int(row["total"]), + "wins": int(row["wins"]), + "losses": int(row["losses"]), + } for row in by_type], + "recent_games": [{ + "game_id": row["game_id"], + "started_at": row["started_at"].isoformat() if row["started_at"] else None, + "ended_at": row["ended_at"].isoformat() if row["ended_at"] else None, + "map": row["map_name"], + "ruleset": row["ruleset_name"], + "game_type": row["game_type"], + "snake": row["your_snake_name"], + "snake_type": row["your_snake_type"], + "snake_version": row["your_snake_version"], + "winner_you": bool(row["winner_you"]), + "final_turn": int(row["final_turn"] or 0), + "status": row["status"], + } for row in recent], + } + + async def list_games(self, limit:int=50) -> list[dict]: + pool = await self._get_pool() + async with pool.acquire() as conn: + rows = await conn.fetch(""" + SELECT game_id, started_at, ended_at, map_name, source, ruleset_name, game_type, + your_snake_name, your_snake_type, your_snake_version, + winner_you, winner_name, final_turn, status + FROM games + ORDER BY started_at DESC + LIMIT $1 + """, + max(1, int(limit)), + ) + + return [{ + "game_id": row["game_id"], + "started_at": row["started_at"].isoformat() if row["started_at"] else None, + "ended_at": row["ended_at"].isoformat() if row["ended_at"] else None, + "map": row["map_name"], + "source": row["source"], + "ruleset": row["ruleset_name"], + "game_type": row["game_type"], + "snake": row["your_snake_name"], + "snake_type": row["your_snake_type"], + "snake_version": row["your_snake_version"], + "winner_you": bool(row["winner_you"]), + "winner_name": row["winner_name"], + "final_turn": int(row["final_turn"] or 0), + "status": row["status"], + } for row in rows] + + async def get_game_replay(self, game_id:str) -> dict|None: + pool = await self._get_pool() + async with pool.acquire() as conn: + game_row = await conn.fetchrow(""" + SELECT game_id, started_at, ended_at, width, height, source, map_name, + ruleset_name, ruleset_version, game_type, your_snake_id, your_snake_name, + your_snake_type, your_snake_version, + winner_name, winner_you, final_turn, status + FROM games + WHERE game_id = $1 + """, + game_id, + ) + + if game_row is None: + return None + + turn_rows = await conn.fetch(""" + SELECT turn, observed_at, my_move, my_thinking, + board_state, food, hazards, you + FROM turns + WHERE game_id = $1 + ORDER BY turn ASC + """, + game_id, + ) + + snake_rows = await conn.fetch(""" + SELECT turn, snake_id, snake_name, health, length, head_x, head_y, + body, is_you, inferred_move, latency + FROM snake_turns + WHERE game_id = $1 + ORDER BY turn ASC, is_you DESC, snake_name ASC + """, + game_id, + ) + + snakes_by_turn:dict[int, list[dict]] = {} + for row in snake_rows: + snakes_by_turn.setdefault(int(row["turn"]), []).append({ + "snake_id": row["snake_id"], + "snake_name": row["snake_name"], + "health": row["health"], + "length": row["length"], + "head": {"x": row["head_x"], "y": row["head_y"]}, + "body": row["body"] or [], + "is_you": bool(row["is_you"]), + "inferred_move": row["inferred_move"], + "latency": row["latency"], + }) + + return { + "game": { + "game_id": game_row["game_id"], + "started_at": game_row["started_at"].isoformat() if game_row["started_at"] else None, + "ended_at": game_row["ended_at"].isoformat() if game_row["ended_at"] else None, + "width": game_row["width"], + "height": game_row["height"], + "source": game_row["source"], + "map": game_row["map_name"], + "ruleset_name": game_row["ruleset_name"], + "ruleset_version": game_row["ruleset_version"], + "game_type": game_row["game_type"], + "your_snake_id": game_row["your_snake_id"], + "your_snake_name": game_row["your_snake_name"], + "your_snake_type": game_row["your_snake_type"], + "your_snake_version": game_row["your_snake_version"], + "winner_name": game_row["winner_name"], + "winner_you": bool(game_row["winner_you"]), + "final_turn": int(game_row["final_turn"] or 0), + "status": game_row["status"], + }, + "turns": [ + { + "turn": int(row["turn"]), + "observed_at": row["observed_at"].isoformat() if row["observed_at"] else None, + "my_move": row["my_move"], + "my_thinking": row["my_thinking"], + "board": row["board_state"], + "food": row["food"] or [], + "hazards": row["hazards"] or [], + "you": row["you"] or {}, + "snakes": snakes_by_turn.get(int(row["turn"]), []), + } + for row in turn_rows + ], + } + + # ── lifecycle ────────────────────────────────────────────────────────────── + + async def close(self) -> None: + if self._pool is not None: + await self._pool.close() + self._pool = None diff --git a/server/database/backend/SqliteGameplayBackend.py b/server/database/backend/SqliteGameplayBackend.py new file mode 100644 index 0000000..ef0f20a --- /dev/null +++ b/server/database/backend/SqliteGameplayBackend.py @@ -0,0 +1,658 @@ +from quart_common.web.env import env_bool + +import asyncio, sqlite3, json, os, logging, sys +from datetime import datetime, timezone +from pathlib import Path + +from server.database.backend.Template import GameplayBackendTemplate + +logger = logging.getLogger(__name__) +if not logger.handlers: + _handler = logging.StreamHandler(stream=sys.stdout) + _handler.setFormatter(logging.Formatter(fmt="%(levelname)s %(module)s: %(message)s")) + logger.addHandler(_handler) + logger.propagate = False + +_ZSTD_EXT = Path(os.environ.get("SQLITE_ZSTD_EXT", "/usr/local/lib/libsqlite_zstd.so")).expanduser().resolve() + +class SqliteGameplayBackend(GameplayBackendTemplate): + def __init__(self, db_path:str, busy_timeout_ms:int=5000): + self.db_path = db_path + self.busy_timeout_ms = max(1000, int(busy_timeout_ms)) + self._zstd_available = False + self._initialize_database() + + # ── connection ───────────────────────────────────────────────────────────── + + def _connect(self) -> sqlite3.Connection: + connection = sqlite3.connect( + self.db_path, + timeout=max(1, self.busy_timeout_ms // 1000), + isolation_level=None, + ) + connection.row_factory = sqlite3.Row + + if _ZSTD_EXT.exists() and not env_bool('DISABLE_GAMEPLAY_DB_COMPRESSION', True): + try: + connection.enable_load_extension(True) + connection.load_extension(str(_ZSTD_EXT)) + self._zstd_available = True + except sqlite3.OperationalError as e: + logger.warning(f"sqlite-zstd extension skipped: {e}") + finally: + connection.enable_load_extension(False) + + connection.execute("PRAGMA foreign_keys = ON") + connection.execute("PRAGMA journal_mode = WAL") + connection.execute("PRAGMA synchronous = NORMAL") + connection.execute("PRAGMA temp_store = MEMORY") + connection.execute("PRAGMA journal_size_limit = 1048576") + connection.execute(f"PRAGMA busy_timeout = {self.busy_timeout_ms}") + return connection + + def _ensure_auto_vacuum_full(self, connection:sqlite3.Connection) -> None: + current = connection.execute("PRAGMA auto_vacuum").fetchone()[0] + if current != 1: + connection.execute("PRAGMA auto_vacuum = FULL") + connection.execute("VACUUM") + + # ── schema setup ─────────────────────────────────────────────────────────── + + def _initialize_database(self) -> None: + Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) + with self._connect() as connection: + self._ensure_auto_vacuum_full(connection) + connection.executescript(""" + CREATE TABLE IF NOT EXISTS games ( + game_id TEXT PRIMARY KEY, + started_at TEXT NOT NULL, + ended_at TEXT, + width INTEGER, + height INTEGER, + source TEXT, + map_name TEXT, + ruleset_name TEXT, + ruleset_version TEXT, + your_snake_id TEXT, + your_snake_name TEXT, + your_snake_type TEXT, + your_snake_version TEXT, + winner_name TEXT, + winner_you INTEGER NOT NULL DEFAULT 0, + final_turn INTEGER NOT NULL DEFAULT 0, + status TEXT NOT NULL DEFAULT 'running' + ); + + CREATE TABLE IF NOT EXISTS turns ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + game_id TEXT NOT NULL, + turn INTEGER NOT NULL, + observed_at TEXT NOT NULL, + my_move TEXT, + my_thinking_json TEXT, + board_state_json TEXT NOT NULL, + snakes_json TEXT NOT NULL, + you_json TEXT NOT NULL, + food_json TEXT NOT NULL, + hazards_json TEXT NOT NULL, + UNIQUE (game_id, turn), + FOREIGN KEY (game_id) REFERENCES games(game_id) ON DELETE CASCADE + ); + + CREATE TABLE IF NOT EXISTS snake_turns ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + game_id TEXT NOT NULL, + turn INTEGER NOT NULL, + snake_id TEXT NOT NULL, + snake_name TEXT, + health INTEGER, + length INTEGER, + head_x INTEGER, + head_y INTEGER, + body_json TEXT NOT NULL, + is_you INTEGER NOT NULL DEFAULT 0, + inferred_move TEXT, + UNIQUE (game_id, turn, snake_id), + FOREIGN KEY (game_id) REFERENCES games(game_id) ON DELETE CASCADE + ); + """) + self._create_indexes_if_tables(connection) + self._ensure_column_exists(connection, "turns", "my_thinking_json", "TEXT") + self._ensure_column_exists(connection, "games", "your_snake_type", "TEXT") + self._ensure_column_exists(connection, "games", "your_snake_version", "TEXT") + self._ensure_column_exists(connection, "games", "game_type", "TEXT") + self._ensure_column_exists(connection, "snake_turns", "latency", "TEXT") + self._ensure_column_exists(connection, "games", "winner_name", "TEXT") + if self._zstd_available: + self._enable_zstd_compression(connection) + connection.execute("PRAGMA optimize") + + def _create_indexes_if_tables(self, connection:sqlite3.Connection) -> None: + real_tables = { + row[0] for row in connection.execute( + "SELECT name FROM sqlite_master WHERE type='table'" + ).fetchall() + } + indexes = [ + ("idx_turns_game_turn", "turns", "game_id, turn"), + ("idx_games_status", "games", "status"), + ("idx_snake_turns_game_turn", "snake_turns", "game_id, turn"), + ] + for idx_name, table, cols in indexes: + if table in real_tables: + connection.execute(f"CREATE INDEX IF NOT EXISTS {idx_name} ON {table}({cols})") + + def _ensure_column_exists(self, connection:sqlite3.Connection, table_name:str, column_name:str, column_type:str) -> None: + obj = connection.execute( + "SELECT type FROM sqlite_master WHERE name = ?", (table_name,) + ).fetchone() + + if obj and obj["type"] == "view": + underlying = f"_{table_name}_zstd" + exists = connection.execute( + "SELECT 1 FROM sqlite_master WHERE name = ? AND type = 'table'", (underlying,) + ).fetchone() + if not exists: + return + actual_table = underlying + else: + actual_table = table_name + + existing = connection.execute(f"PRAGMA table_info({actual_table})").fetchall() + if any(row["name"] == column_name for row in existing): + return + connection.execute(f"ALTER TABLE {actual_table} ADD COLUMN {column_name} {column_type}") + + def _enable_zstd_compression(self, connection:sqlite3.Connection) -> None: + compressed_columns = [ + ("turns", "board_state_json"), + ("turns", "snakes_json"), + ("turns", "you_json"), + ("turns", "food_json"), + ("turns", "hazards_json"), + ("snake_turns", "body_json"), + ] + for table, column in compressed_columns: + try: + connection.execute( + "SELECT zstd_enable_transparent(?)", + [json.dumps({"table": table, "column": column, "compression_level": 6, "dict_chooser": "'a'"})], + ) + except sqlite3.OperationalError: + pass + connection.execute("SELECT zstd_incremental_maintenance(null, 1)") + + # ── sync write methods ───────────────────────────────────────────────────── + + def _record_game_start_sync(self, game_state:dict, snake_type:str|None=None, snake_version:str|None=None) -> None: + game = game_state.get("game", {}) + board = game_state.get("board", {}) + you = self._extract_you(game_state) + ruleset = game.get("ruleset", {}) + game_type = self._derive_game_type(board, ruleset) + + with self._connect() as connection: + connection.execute(""" + INSERT INTO games ( + game_id, started_at, width, height, source, map_name, + ruleset_name, ruleset_version, your_snake_id, your_snake_name, + your_snake_type, your_snake_version, game_type, status + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'running') + ON CONFLICT(game_id) DO UPDATE SET + width = excluded.width, + height = excluded.height, + source = excluded.source, + map_name = excluded.map_name, + ruleset_name = excluded.ruleset_name, + ruleset_version = excluded.ruleset_version, + your_snake_id = excluded.your_snake_id, + your_snake_name = excluded.your_snake_name, + your_snake_type = excluded.your_snake_type, + your_snake_version = excluded.your_snake_version, + game_type = excluded.game_type, + status = 'running' + """, + ( + game.get("id"), + self._utc_now(), + board.get("width"), + board.get("height"), + game.get("source"), + game.get("map"), + ruleset.get("name"), + ruleset.get("version"), + you.get("id"), + you.get("name"), + snake_type, + snake_version, + game_type, + ), + ) + connection.execute("PRAGMA wal_checkpoint(PASSIVE)") + connection.execute("PRAGMA optimize") + + def _record_turn_sync(self, game_state:dict, my_move:str|None, my_thinking:dict|None) -> None: + game = game_state.get("game", {}) + board = game_state.get("board", {}) + snakes = self._extract_snakes(game_state) + you = self._extract_you(game_state) + game_id = game.get("id") + turn = int(game_state.get("turn", 0)) + + with self._connect() as connection: + connection.execute(""" + INSERT INTO turns ( + game_id, turn, observed_at, my_move, my_thinking_json, + board_state_json, snakes_json, you_json, food_json, hazards_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(game_id, turn) DO UPDATE SET + observed_at = excluded.observed_at, + my_move = excluded.my_move, + my_thinking_json = excluded.my_thinking_json, + board_state_json = excluded.board_state_json, + snakes_json = excluded.snakes_json, + you_json = excluded.you_json, + food_json = excluded.food_json, + hazards_json = excluded.hazards_json + """, + ( + game_id, + turn, + self._utc_now(), + my_move, + self._to_json(my_thinking) if my_thinking is not None else None, + self._to_json(board), + self._to_json(snakes), + self._to_json(you), + self._to_json(board.get("food", [])), + self._to_json(board.get("hazards", [])), + ), + ) + + previous_positions: dict[str, tuple[int, int]] = {} + if turn > 0: + previous_rows = connection.execute(""" + SELECT snake_id, head_x, head_y + FROM snake_turns + WHERE game_id = ? AND turn = ? + """, + (game_id, turn - 1), + ).fetchall() + previous_positions = { + row["snake_id"]: (int(row["head_x"]), int(row["head_y"])) + for row in previous_rows + if row["head_x"] is not None and row["head_y"] is not None + } + + you_id = you.get("id") + for snake in snakes: + snake_id = snake.get("id") + head = snake.get("head", {}) + head_x = head.get("x") + head_y = head.get("y") + if snake_id is None: + continue + new_head = ( + (int(head_x), int(head_y)) + if head_x is not None and head_y is not None + else None + ) + inferred = self._infer_direction(previous_positions.get(snake_id), new_head) + connection.execute(""" + INSERT INTO snake_turns ( + game_id, turn, snake_id, snake_name, health, length, + head_x, head_y, body_json, is_you, inferred_move, latency + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(game_id, turn, snake_id) DO UPDATE SET + snake_name = excluded.snake_name, + health = excluded.health, + length = excluded.length, + head_x = excluded.head_x, + head_y = excluded.head_y, + body_json = excluded.body_json, + is_you = excluded.is_you, + inferred_move = excluded.inferred_move, + latency = excluded.latency + """, + ( + game_id, + turn, + snake_id, + snake.get("name"), + snake.get("health"), + snake.get("length"), + head_x, + head_y, + self._to_json(snake.get("body", [])), + 1 if snake_id == you_id else 0, + inferred, + snake.get("latency"), + ), + ) + + connection.execute(""" + UPDATE games + SET final_turn = CASE WHEN ? > final_turn THEN ? ELSE final_turn END + WHERE game_id = ? + """, + (turn, turn, game_id), + ) + + def _record_game_end_sync(self, game_state:dict) -> None: + game = game_state.get("game", {}) + game_id = game.get("id") + board = game_state.get("board", {}) + snakes = list(board.get("snakes", [])) + you = self._extract_you(game_state) + winner_name = next((snake.get("name") for snake in snakes if snake.get("name")), None) + you_id = you.get("id") + winner_you = any(snake.get("id") == you_id for snake in snakes) + + with self._connect() as connection: + connection.execute(""" + UPDATE games + SET ended_at = ?, + winner_name = ?, + winner_you = ?, + final_turn = CASE WHEN ? > final_turn THEN ? ELSE final_turn END, + status = 'finished' + WHERE game_id = ? + """, + ( + self._utc_now(), + winner_name, + 1 if winner_you else 0, + int(game_state.get("turn", 0)), + int(game_state.get("turn", 0)), + game_id, + ), + ) + + def _finalize_stale_running_games_sync(self, stale_after_seconds:int=600) -> int: + threshold = max(60, int(stale_after_seconds)) + now_utc = datetime.now(timezone.utc) + finalized = 0 + + with self._connect() as connection: + rows = connection.execute(""" + SELECT game_id, started_at, final_turn, your_snake_id + FROM games + WHERE status = 'running' + ORDER BY started_at ASC + """).fetchall() + + for row in rows: + started_at = self._parse_utc_timestamp(row["started_at"]) + if started_at is None: + continue + if (now_utc - started_at).total_seconds() < threshold: + continue + + game_id = row["game_id"] + your_snake_id = row["your_snake_id"] + final_turn = int(row["final_turn"] or 0) + + snake_rows = connection.execute(""" + SELECT snake_id, snake_name + FROM snake_turns + WHERE game_id = ? AND turn = ? + ORDER BY is_you DESC, snake_name ASC + """, + (game_id, final_turn), + ).fetchall() + + if len(snake_rows) == 0: + latest_row = connection.execute( + "SELECT MAX(turn) AS latest_turn FROM snake_turns WHERE game_id = ?", + (game_id,), + ).fetchone() + if latest_row is not None and latest_row["latest_turn"] is not None: + final_turn = int(latest_row["latest_turn"]) + snake_rows = connection.execute(""" + SELECT snake_id, snake_name + FROM snake_turns + WHERE game_id = ? AND turn = ? + ORDER BY is_you DESC, snake_name ASC + """, + (game_id, final_turn), + ).fetchall() + + survivor_ids = [s["snake_id"] for s in snake_rows if s["snake_id"]] + winner_you = bool( + your_snake_id + and your_snake_id in survivor_ids + and len(survivor_ids) == 1 + ) + survivor_name = next((s["snake_name"] for s in snake_rows if s["snake_name"]), None) + + result = connection.execute(""" + UPDATE games + SET ended_at = ?, + winner_name = ?, + winner_you = ?, + final_turn = CASE WHEN ? > final_turn THEN ? ELSE final_turn END, + status = 'finished' + WHERE game_id = ? AND status = 'running' + """, + ( + self._utc_now(), + survivor_name, + 1 if winner_you else 0, + final_turn, + final_turn, + game_id, + ), + ) + if result.rowcount > 0: + finalized += 1 + + return finalized + + # ── sync read methods ────────────────────────────────────────────────────── + + def _get_summary_sync(self, recent_limit:int=15) -> dict: + with self._connect() as connection: + totals = connection.execute(""" + SELECT + COUNT(*) AS total_games, + SUM(CASE WHEN status = 'running' THEN 1 ELSE 0 END) AS running_games, + SUM(CASE WHEN status = 'finished' THEN 1 ELSE 0 END) AS finished_games, + SUM(CASE WHEN status = 'finished' AND winner_you = 1 THEN 1 ELSE 0 END) AS wins, + SUM(CASE WHEN status = 'finished' AND winner_you = 0 THEN 1 ELSE 0 END) AS losses, + AVG(CASE WHEN status = 'finished' THEN final_turn ELSE NULL END) AS avg_turns + FROM games + """).fetchone() + + by_type = connection.execute(""" + SELECT + COALESCE(game_type, ruleset_name, 'unknown') AS type_label, + COUNT(*) AS total, + SUM(CASE WHEN status = 'finished' AND winner_you = 1 THEN 1 ELSE 0 END) AS wins, + SUM(CASE WHEN status = 'finished' AND winner_you = 0 THEN 1 ELSE 0 END) AS losses + FROM games + WHERE status = 'finished' + GROUP BY type_label + ORDER BY total DESC + """).fetchall() + + recent = connection.execute(""" + SELECT game_id, started_at, ended_at, map_name, ruleset_name, game_type, + your_snake_name, your_snake_type, your_snake_version, winner_you, final_turn, status + FROM games + ORDER BY started_at DESC + LIMIT ? + """, + (max(1, int(recent_limit)),), + ).fetchall() + + return { + "total_games": int(totals["total_games"] or 0), + "running_games": int(totals["running_games"] or 0), + "finished_games": int(totals["finished_games"] or 0), + "wins": int(totals["wins"] or 0), + "losses": int(totals["losses"] or 0), + "avg_turns_finished": round(float(totals["avg_turns"] or 0.0), 2), + "by_game_type": [{ + "game_type": row["type_label"], + "total": int(row["total"]), + "wins": int(row["wins"]), + "losses": int(row["losses"]), + } for row in by_type], + "recent_games": [{ + "game_id": row["game_id"], + "started_at": row["started_at"], + "ended_at": row["ended_at"], + "map": row["map_name"], + "ruleset": row["ruleset_name"], + "game_type": row["game_type"], + "snake": row["your_snake_name"], + "snake_type": row["your_snake_type"], + "snake_version": row["your_snake_version"], + "winner_you": bool(row["winner_you"]), + "final_turn": int(row["final_turn"] or 0), + "status": row["status"], + } for row in recent], + } + + def _list_games_sync(self, limit:int=50) -> list[dict]: + with self._connect() as connection: + rows = connection.execute(""" + SELECT game_id, started_at, ended_at, map_name, source, ruleset_name, game_type, + your_snake_name, your_snake_type, your_snake_version, + winner_you, winner_name, final_turn, status + FROM games + ORDER BY started_at DESC + LIMIT ? + """, + (max(1, int(limit)),), + ).fetchall() + + return [{ + "game_id": row["game_id"], + "started_at": row["started_at"], + "ended_at": row["ended_at"], + "map": row["map_name"], + "source": row["source"], + "ruleset": row["ruleset_name"], + "game_type": row["game_type"], + "snake": row["your_snake_name"], + "snake_type": row["your_snake_type"], + "snake_version": row["your_snake_version"], + "winner_you": bool(row["winner_you"]), + "winner_name": row["winner_name"], + "final_turn": int(row["final_turn"] or 0), + "status": row["status"], + } for row in rows] + + def _get_game_replay_sync(self, game_id:str) -> dict|None: + with self._connect() as connection: + game_row = connection.execute(""" + SELECT game_id, started_at, ended_at, width, height, source, map_name, + ruleset_name, ruleset_version, game_type, your_snake_id, your_snake_name, + your_snake_type, your_snake_version, + winner_name, winner_you, final_turn, status + FROM games + WHERE game_id = ? + """, + (game_id,), + ).fetchone() + + if game_row is None: + return None + + turn_rows = connection.execute(""" + SELECT turn, observed_at, my_move, my_thinking_json, + board_state_json, food_json, hazards_json, you_json + FROM turns + WHERE game_id = ? + ORDER BY turn ASC + """, + (game_id,), + ).fetchall() + + snake_rows = connection.execute(""" + SELECT turn, snake_id, snake_name, health, length, head_x, head_y, + body_json, is_you, inferred_move, latency + FROM snake_turns + WHERE game_id = ? + ORDER BY turn ASC, is_you DESC, snake_name ASC + """, + (game_id,), + ).fetchall() + + snakes_by_turn: dict[int, list[dict]] = {} + for row in snake_rows: + snakes_by_turn.setdefault(int(row["turn"]), []).append({ + "snake_id": row["snake_id"], + "snake_name": row["snake_name"], + "health": row["health"], + "length": row["length"], + "head": {"x": row["head_x"], "y": row["head_y"]}, + "body": self._from_json(row["body_json"]) or [], + "is_you": bool(row["is_you"]), + "inferred_move": row["inferred_move"], + "latency": row["latency"], + }) + + return { + "game": { + "game_id": game_row["game_id"], + "started_at": game_row["started_at"], + "ended_at": game_row["ended_at"], + "width": game_row["width"], + "height": game_row["height"], + "source": game_row["source"], + "map": game_row["map_name"], + "ruleset_name": game_row["ruleset_name"], + "ruleset_version": game_row["ruleset_version"], + "game_type": game_row["game_type"], + "your_snake_id": game_row["your_snake_id"], + "your_snake_name": game_row["your_snake_name"], + "your_snake_type": game_row["your_snake_type"], + "your_snake_version": game_row["your_snake_version"], + "winner_name": game_row["winner_name"], + "winner_you": bool(game_row["winner_you"]), + "final_turn": int(game_row["final_turn"] or 0), + "status": game_row["status"], + }, + "turns": [ + { + "turn": int(row["turn"]), + "observed_at": row["observed_at"], + "my_move": row["my_move"], + "my_thinking": self._from_json(row["my_thinking_json"]), + "board": self._from_json(row["board_state_json"]), + "food": self._from_json(row["food_json"]) or [], + "hazards": self._from_json(row["hazards_json"]) or [], + "you": self._from_json(row["you_json"]) or {}, + "snakes": snakes_by_turn.get(int(row["turn"]), []), + } + for row in turn_rows + ], + } + + # ── public async interface ───────────────────────────────────────────────── + + async def record_game_start(self, game_state:dict, snake_type:str|None=None, snake_version:str|None=None) -> None: + await asyncio.to_thread(self._record_game_start_sync, game_state, snake_type, snake_version) + + async def record_turn(self, game_state:dict, my_move:str|None, my_thinking:dict|None=None) -> None: + await asyncio.to_thread(self._record_turn_sync, game_state, my_move, my_thinking) + + async def record_game_end(self, game_state:dict) -> None: + await asyncio.to_thread(self._record_game_end_sync, game_state) + + async def get_summary(self, recent_limit:int=15) -> dict: + return await asyncio.to_thread(self._get_summary_sync, recent_limit) + + async def list_games(self, limit:int=50) -> list[dict]: + return await asyncio.to_thread(self._list_games_sync, limit) + + async def finalize_stale_running_games(self, stale_after_seconds:int=600) -> int: + return await asyncio.to_thread(self._finalize_stale_running_games_sync, stale_after_seconds) + + async def get_game_replay(self, game_id:str) -> dict|None: + return await asyncio.to_thread(self._get_game_replay_sync, game_id) + + async def close(self) -> None: + return None diff --git a/server/database/backend/Template.py b/server/database/backend/Template.py new file mode 100644 index 0000000..2dee660 --- /dev/null +++ b/server/database/backend/Template.py @@ -0,0 +1,100 @@ +import json +from datetime import datetime, timezone +from typing import Any + +class GameplayBackendTemplate: + """Abstract base for gameplay database backends. + + Subclasses must override every method that raises NotImplementedError. + Shared pure-Python helpers (_utc_now, _to_json, etc.) live here so they + are available to both SQLite and PostgreSQL implementations. + """ + + # ── public async interface ───────────────────────────────────────────────── + + async def initialize(self) -> None: + """Called once on server startup. Backends that need eager connection + (pool creation, schema init, migration) should override this.""" + return None + + async def record_game_start(self, game_state:dict, snake_type:str|None=None, snake_version:str|None=None) -> None: + raise NotImplementedError + + async def record_turn(self, game_state:dict, my_move:str|None, my_thinking:dict|None=None) -> None: + raise NotImplementedError + + async def record_game_end(self, game_state:dict) -> None: + raise NotImplementedError + + async def get_summary(self, recent_limit:int=15) -> dict: + raise NotImplementedError + + async def list_games(self, limit:int=50) -> list[dict]: + raise NotImplementedError + + async def finalize_stale_running_games(self, stale_after_seconds:int=600) -> int: + raise NotImplementedError + + async def get_game_replay(self, game_id:str) -> dict|None: + raise NotImplementedError + + async def close(self) -> None: + return None + + # ── shared pure-python helpers ───────────────────────────────────────────── + + def _utc_now(self) -> str: + return datetime.now(timezone.utc).isoformat() + + def _parse_utc_timestamp(self, value:str|None) -> datetime|None: + if not value: + return None + normalized = value.strip() + if normalized.endswith("Z"): + normalized = normalized[:-1] + "+00:00" + try: + parsed = datetime.fromisoformat(normalized) + except ValueError: + return None + if parsed.tzinfo is None: + return parsed.replace(tzinfo=timezone.utc) + return parsed.astimezone(timezone.utc) + + def _to_json(self, payload:object) -> str: + return json.dumps(payload, ensure_ascii=False, separators=(",", ":")) + + def _from_json(self, payload:str|None) -> Any: + if payload is None or payload == "": + return None + try: + return json.loads(payload) + except (json.JSONDecodeError, TypeError): + return None + + def _extract_snakes(self, game_state:dict) -> list[dict]: + return list(game_state.get("board", {}).get("snakes", [])) + + def _extract_you(self, game_state:dict) -> dict: + return dict(game_state.get("you", {})) + + def _infer_direction(self, old_head:tuple[int, int]|None, new_head:tuple[int, int]|None) -> str|None: + if old_head is None or new_head is None: + return None + + dx = new_head[0] - old_head[0] + dy = new_head[1] - old_head[1] + if dx == 1 and dy == 0: + return "right" + if dx == -1 and dy == 0: + return "left" + if dx == 0 and dy == 1: + return "up" + if dx == 0 and dy == -1: + return "down" + return None + + def _derive_game_type(self, board:dict, ruleset:dict) -> str: + if len(board.get("snakes", [])) == 2: + return "duel" + + return ruleset.get("name") or "standard" diff --git a/server/database/backend/__init__.py b/server/database/backend/__init__.py new file mode 100644 index 0000000..6753570 --- /dev/null +++ b/server/database/backend/__init__.py @@ -0,0 +1,25 @@ +from .Template import GameplayBackendTemplate + +class GameplayBackendBuilder: + @staticmethod + def build(backend:str="sqlite", db_path:str|None=None, busy_timeout_ms:int=5000, pg_dsn:str|None=None, pg_min_size:int=1, pg_max_size:int=5) -> GameplayBackendTemplate: + normalized = (backend or "sqlite").strip().lower() + + if normalized == "postgresql" or normalized == "postgres": + from .PostgresqlGameplayBackend import PostgresqlGameplayBackend + if not pg_dsn: + raise ValueError("pg_dsn is required for the postgresql backend") + return PostgresqlGameplayBackend( + dsn=pg_dsn, + min_size=pg_min_size, + max_size=pg_max_size, + sqlite_migration_path=db_path, + ) + + if normalized == "sqlite": + from .SqliteGameplayBackend import SqliteGameplayBackend + if not db_path: + raise ValueError("db_path is required for the sqlite backend") + return SqliteGameplayBackend(db_path=db_path, busy_timeout_ms=busy_timeout_ms) + + raise ValueError(f"Unknown gameplay backend: {backend!r}. Choose 'sqlite' or 'postgresql'.") diff --git a/tests/test_GameplayDatabase.py b/tests/test_GameplayDatabase.py index 1314c03..e162523 100644 --- a/tests/test_GameplayDatabase.py +++ b/tests/test_GameplayDatabase.py @@ -3,7 +3,7 @@ import unittest from pathlib import Path import tempfile, sqlite3 -from server.database import GameplayDatabase +from server.database import GameplayDatabase, GameplayBackendBuilder class TestGameplayDatabase(unittest.IsolatedAsyncioTestCase): def _build_state(self, turn:int, me_head:tuple[int, int], enemy_head:tuple[int, int], include_enemy:bool=True) -> dict: @@ -57,7 +57,7 @@ class TestGameplayDatabase(unittest.IsolatedAsyncioTestCase): async def test_records_gameplay_with_wal_and_inferred_moves(self): with tempfile.TemporaryDirectory() as temp_dir: db_path = Path(temp_dir) / "gameplay.sqlite3" - database = GameplayDatabase(str(db_path), busy_timeout_ms=4000) + database = GameplayDatabase(GameplayBackendBuilder.build(db_path=str(db_path), busy_timeout_ms=4000)) await database.record_game_start(self._build_state(turn=0, me_head=(1, 1), enemy_head=(5, 5))) await database.record_turn(