move Server code into more services and use them into blueprints or server

This commit is contained in:
2026-04-06 04:20:07 +02:00
parent 30eb17bb83
commit 98be2fe6fe
8 changed files with 373 additions and 302 deletions
+44 -273
View File
@@ -3,7 +3,6 @@ from quart_common.web.env import env_bool, env_int
from server.Files import read_file from server.Files import read_file
from server.game_state_store import GameStateStoreBuilder from server.game_state_store import GameStateStoreBuilder
from server.GameBoard import GameBoard
from snakes import SnakeBuilder from snakes import SnakeBuilder
@@ -24,8 +23,14 @@ from server.blueprints import (
create_metrics_blueprint, create_metrics_blueprint,
create_dashboard_blueprint, create_dashboard_blueprint,
) )
from server.services import DashboardEventsService from server.services import (
from server.services import DashboardWebSocketHub DashboardEventsService,
DashboardWebSocketHub,
GameRuntimeService,
GameplayTrackingService,
DashboardQueryService,
)
class Server: class Server:
default_snake_config = { default_snake_config = {
@@ -59,9 +64,12 @@ class Server:
self.metrics_redis_url = metrics_redis_url self.metrics_redis_url = metrics_redis_url
self.stale_game_timeout_sec = self._get_stale_game_timeout_sec() self.stale_game_timeout_sec = self._get_stale_game_timeout_sec()
self.running_games: dict[str, GameBoard] = {} self.game_runtime = GameRuntimeService(
self.game_move_counts: dict[str, int] = {} game_state_store=self.game_state_store,
self.game_last_seen_unix: dict[str, int] = {} snake_type=self.snake_type,
game_state_local_cache=self.game_state_local_cache,
stale_game_timeout_sec=self.stale_game_timeout_sec,
)
self.dashboard_ws_hub = DashboardWebSocketHub() self.dashboard_ws_hub = DashboardWebSocketHub()
dashboard_event_origin = f'worker-{os.getpid()}-{int(time.time() * 1000)}' dashboard_event_origin = f'worker-{os.getpid()}-{int(time.time() * 1000)}'
dashboard_events_channel = os.getenv('DASHBOARD_EVENTS_CHANNEL', 'snake:dashboard:events') dashboard_events_channel = os.getenv('DASHBOARD_EVENTS_CHANNEL', 'snake:dashboard:events')
@@ -78,15 +86,39 @@ class Server:
metrics_backend=metrics_backend_normalized, metrics_backend=metrics_backend_normalized,
game_state_backend=game_state_backend, game_state_backend=game_state_backend,
stale_game_timeout_sec=self.stale_game_timeout_sec, stale_game_timeout_sec=self.stale_game_timeout_sec,
game_last_seen_unix=self.game_last_seen_unix, game_last_seen_unix=self.game_runtime.game_last_seen_unix,
game_move_counts=self.game_move_counts, game_move_counts=self.game_runtime.game_move_counts,
) )
self.game_runtime.attach_metrics_collector(self.metrics_collector)
self.clear_worker_metrics_on_startup = env_bool('METRICS_CLEAR_WORKERS_ON_STARTUP', True) self.clear_worker_metrics_on_startup = env_bool('METRICS_CLEAR_WORKERS_ON_STARTUP', True)
self.worker_metrics_startup_lock_ttl_sec = env_int('METRICS_STARTUP_CLEANUP_LOCK_TTL_SEC', 300) self.worker_metrics_startup_lock_ttl_sec = env_int('METRICS_STARTUP_CLEANUP_LOCK_TTL_SEC', 300)
self.dashboard_running_game_stale_sec = 600 self.dashboard_running_game_stale_sec = 600
self._startup_worker_metrics_cleared = False self._startup_worker_metrics_cleared = False
self.logger = build_logger('Battlesnake', debug_env_var='DEBUG_SERVER') self.logger = build_logger('Battlesnake', debug_env_var='DEBUG_SERVER')
self.snake_builder = SnakeBuilder
self.snake_version = self._get_snake_version()
self.gameplay_database = None
if gameplay_db_enabled:
db_path = gameplay_db_path or os.path.join(data_path, 'data', 'database', 'gameplay.sqlite3')
self.gameplay_database = GameplayDatabase(
db_path=db_path,
busy_timeout_ms=gameplay_db_busy_timeout_ms,
)
self.gameplay_tracking = GameplayTrackingService(
gameplay_database=self.gameplay_database,
snake_type=self.snake_type,
snake_version=self.snake_version,
logger=self.logger,
)
self.dashboard_query = DashboardQueryService(
gameplay_database=self.gameplay_database,
ws_hub=self.dashboard_ws_hub,
logger=self.logger,
dashboard_running_game_stale_sec=self.dashboard_running_game_stale_sec,
)
self.dashboard_events_service = DashboardEventsService( self.dashboard_events_service = DashboardEventsService(
enabled=dashboard_events_enabled, enabled=dashboard_events_enabled,
redis_url=self.metrics_redis_url, redis_url=self.metrics_redis_url,
@@ -96,14 +128,7 @@ class Server:
on_notice=self._on_dashboard_games_update_notice, on_notice=self._on_dashboard_games_update_notice,
logger=self.logger, logger=self.logger,
) )
self.snake_version = self._get_snake_version() self.dashboard_query.set_publish_notice(self.dashboard_events_service.publish_notice)
self.gameplay_database = None
if gameplay_db_enabled:
db_path = gameplay_db_path or os.path.join(data_path, 'data', 'database', 'gameplay.sqlite3')
self.gameplay_database = GameplayDatabase(
db_path=db_path,
busy_timeout_ms=gameplay_db_busy_timeout_ms,
)
self.app = Quart('Battlesnake', template_folder=os.path.join(data_path, 'server', 'templates')) self.app = Quart('Battlesnake', template_folder=os.path.join(data_path, 'server', 'templates'))
@@ -143,7 +168,7 @@ class Server:
shutdown_event = asyncio.Event() shutdown_event = asyncio.Event()
def on_shutdown_signal() -> None: def on_shutdown_signal() -> None:
self._request_dashboard_ws_shutdown() self.dashboard_ws_hub.request_shutdown()
shutdown_event.set() shutdown_event.set()
async def shutdown_trigger() -> None: async def shutdown_trigger() -> None:
@@ -160,7 +185,7 @@ class Server:
try: try:
await self.app.run_task(host=host, port=port, debug=debug, shutdown_trigger=shutdown_trigger) await self.app.run_task(host=host, port=port, debug=debug, shutdown_trigger=shutdown_trigger)
finally: finally:
self._request_dashboard_ws_shutdown() self.dashboard_ws_hub.request_shutdown()
for shutdown_signal in installed_signal_handlers: for shutdown_signal in installed_signal_handlers:
try: try:
loop.remove_signal_handler(shutdown_signal) loop.remove_signal_handler(shutdown_signal)
@@ -207,66 +232,6 @@ class Server:
def _get_stale_game_timeout_sec(self) -> int: def _get_stale_game_timeout_sec(self) -> int:
return max(30, env_int('SNAKE_STUCK_GAME_TIMEOUT_SEC', 180)) return max(30, env_int('SNAKE_STUCK_GAME_TIMEOUT_SEC', 180))
async def _create_game_board(self, game_state:dict) -> GameBoard:
game_id = game_state['game']['id']
new_game_board = GameBoard(
game_id=game_id,
width=game_state['board']['width'],
height=game_state['board']['height'],
ruleset=game_state['game']['ruleset'],
source=game_state['game']['source'],
map=game_state['game']['map'],
snake_class=SnakeBuilder.build(self.snake_type),
)
await new_game_board.start_game(game_state)
if self.game_state_local_cache:
self.running_games[game_id] = new_game_board
await self.game_state_store.save(game_id, new_game_board)
self.game_move_counts[game_id] = 0
self.game_last_seen_unix[game_id] = int(time.time())
await self.metrics_collector.record_game_started(len(self.game_last_seen_unix))
return new_game_board
async def _persist_game_board(self, game_id:str, game_board:GameBoard):
if self.game_state_local_cache:
self.running_games[game_id] = game_board
await self.game_state_store.save(game_id, game_board)
async def _delete_game_board(self, game_state:dict):
game_id = game_state['game']['id']
self.running_games.pop(game_id, None)
self.game_move_counts.pop(game_id, None)
self.game_last_seen_unix.pop(game_id, None)
await self.game_state_store.delete(game_id)
async def _get_game_board(self, game_state:dict, end:bool=False) -> GameBoard:
game_id = game_state['game']['id']
game_board:GameBoard
if self.game_state_local_cache and game_id in self.running_games:
game_board = self.running_games[game_id]
else:
persisted_board = await self.game_state_store.load(game_id)
if persisted_board is not None:
game_board = cast(GameBoard, persisted_board)
if self.game_state_local_cache:
self.running_games[game_id] = game_board
else:
game_board = await self._create_game_board(game_state)
await self.metrics_collector.record_game_autocreated()
if not end:
self.game_move_counts[game_id] = self.game_move_counts.get(game_id, 0) + 1
self.game_last_seen_unix[game_id] = int(time.time())
game_board.read_game_data(game_state)
if end:
game_board.end_game(game_state)
await self._persist_game_board(game_id, game_board)
return game_board
def enable_store_game_state(self): def enable_store_game_state(self):
self.store_game_state = True self.store_game_state = True
@@ -274,199 +239,5 @@ class Server:
storage = StorageLoader.build(self.storage_type)() storage = StorageLoader.build(self.storage_type)()
return storage.cleanup() return storage.cleanup()
async def _prune_stale_games(self):
if not self.game_last_seen_unix:
return
now = int(time.time())
stale_ids = [
game_id
for game_id, last_seen in self.game_last_seen_unix.items()
if now - last_seen >= self.stale_game_timeout_sec
]
for game_id in stale_ids:
self.running_games.pop(game_id, None)
self.game_move_counts.pop(game_id, None)
self.game_last_seen_unix.pop(game_id, None)
await self.metrics_collector.record_stuck_removed()
async def _record_gameplay_start(self, game_state:dict) -> None:
if self.gameplay_database is None:
return
try:
await self.gameplay_database.record_game_start(
game_state,
snake_type=self.snake_type,
snake_version=self.snake_version,
)
except Exception as error:
await await_log(self.logger.warning(f'Gameplay DB start record failed:{error}'))
def _extract_latest_snake_thinking(self, game_board:GameBoard) -> dict|None:
try:
history = game_board.snake_class.get_history()
except Exception:
return None
if not isinstance(history, list) or len(history) == 0:
return None
latest = history[-1]
return latest if isinstance(latest, dict) else None
async def _record_gameplay_turn(self, game_state:dict, my_move:str, game_board:GameBoard) -> None:
if self.gameplay_database is None:
return
try:
thinking = self._extract_latest_snake_thinking(game_board)
await self.gameplay_database.record_turn(game_state, my_move, thinking)
except Exception as error:
await await_log(self.logger.warning(f'Gameplay DB turn record failed:{error}'))
async def _record_gameplay_end(self, game_state:dict) -> None:
if self.gameplay_database is None:
return
try:
await self.gameplay_database.record_game_end(game_state)
except Exception as error:
await await_log(self.logger.warning(f'Gameplay DB end record failed:{error}'))
async def _register_dashboard_game_subscriber(self, subscriber_queue:asyncio.Queue[str]) -> None:
await self.dashboard_ws_hub.register_subscriber(subscriber_queue)
async def _unregister_dashboard_game_subscriber(self, subscriber_queue:asyncio.Queue[str]) -> None:
await self.dashboard_ws_hub.unregister_subscriber(subscriber_queue)
async def _broadcast_dashboard_game_event(self, payload:dict) -> None:
await self.dashboard_ws_hub.broadcast_payload(payload)
async def _register_dashboard_ws_task(self, websocket_task:asyncio.Task) -> None:
await self.dashboard_ws_hub.register_task(websocket_task)
async def _unregister_dashboard_ws_task(self, websocket_task:asyncio.Task) -> None:
await self.dashboard_ws_hub.unregister_task(websocket_task)
def _request_dashboard_ws_shutdown(self) -> None:
self.dashboard_ws_hub.request_shutdown()
async def _on_dashboard_games_update_notice(self, trigger:str) -> None: async def _on_dashboard_games_update_notice(self, trigger:str) -> None:
await self._push_dashboard_games_update( await self.dashboard_query.on_dashboard_games_update_notice(trigger)
game_state=None,
publish_cluster=False,
trigger=trigger,
)
async def _build_dashboard_games_event(self, game_state:dict|None=None, trigger_override:str|None=None) -> dict:
games_payload = await self._get_dashboard_games(limit=100)
summary_payload = await self._get_dashboard_summary()
game_id = None
if game_state is not None:
game_id = game_state.get('game', {}).get('id')
trigger = trigger_override or ('game_saved' if game_id else 'snapshot')
return {
'type': 'dashboard_games_update',
'trigger': trigger,
'games': games_payload,
'summary': summary_payload,
}
async def _build_dashboard_game_replay_event(self, game_id:str, request_id:str|None=None) -> dict:
replay_payload = await self._get_dashboard_game_replay(game_id)
if replay_payload is None:
return {
'type': 'dashboard_game_replay',
'request_id': request_id,
'game_id': game_id,
'error': 'game_not_found',
}
return {
'type': 'dashboard_game_replay',
'request_id': request_id,
'game_id': game_id,
'replay': replay_payload,
}
async def _handle_dashboard_ws_request(self, payload_raw:object) -> dict|None:
if not isinstance(payload_raw, str):
return None
try:
payload = json.loads(payload_raw)
except json.JSONDecodeError:
return None
if not isinstance(payload, dict):
return None
if payload.get('type') != 'dashboard_game_replay_request':
return None
game_id = str(payload.get('game_id') or '').strip()
request_id_raw = payload.get('request_id')
request_id = None if request_id_raw is None else str(request_id_raw)
if game_id == '':
return {
'type': 'dashboard_game_replay',
'request_id': request_id,
'error': 'missing_game_id',
}
return await self._build_dashboard_game_replay_event(
game_id=game_id,
request_id=request_id,
)
async def _push_dashboard_games_update(self, game_state:dict|None=None, publish_cluster:bool=True, trigger:str|None=None) -> None:
if self.gameplay_database is None:
return
event_payload = await self._build_dashboard_games_event(
game_state,
trigger_override=trigger,
)
await self._broadcast_dashboard_game_event(event_payload)
if publish_cluster:
await self.dashboard_events_service.publish_notice(trigger=str(event_payload.get('trigger') or ''))
async def _get_dashboard_summary(self) -> dict:
if self.gameplay_database is None:
return {'enabled': False}
try:
await self._finalize_stale_dashboard_games()
summary = await self.gameplay_database.get_summary()
summary['enabled'] = True
return summary
except Exception as error:
await await_log(self.logger.warning(f'Gameplay DB summary failed:{error}'))
return {'enabled': True, 'error': ' summary_unavailable'}
async def _get_dashboard_games(self, limit:int=50) -> dict:
if self.gameplay_database is None:
return {'enabled': False, 'games': []}
try:
await self._finalize_stale_dashboard_games()
games = await self.gameplay_database.list_games(limit=limit)
return {'enabled': True, 'games': games}
except Exception as error:
await await_log(self.logger.warning(f'Gameplay DB game list failed:{error}'))
return {'enabled': True, 'error': 'games_unavailable', 'games': []}
async def _finalize_stale_dashboard_games(self) -> None:
if self.gameplay_database is None:
return
try:
await self.gameplay_database.finalize_stale_running_games(stale_after_seconds=self.dashboard_running_game_stale_sec)
except Exception as error:
await await_log(self.logger.warning(f'Gameplay DB stale running game finalize failed:{error}'))
async def _get_dashboard_game_replay(self, game_id:str) -> dict|None:
if self.gameplay_database is None:
return {'enabled': False, 'error': 'database_disabled', 'game_id': game_id}
try:
replay = await self.gameplay_database.get_game_replay(game_id)
if replay is None:
return None
replay['enabled'] = True
return replay
except Exception as error:
await await_log(self.logger.warning(f'Gameplay DB replay failed:{error}'))
return {'enabled': True, 'error': 'replay_unavailable', 'game_id': game_id}
+12 -12
View File
@@ -23,10 +23,10 @@ def create_battlesnake_blueprint(server:'Server') -> Blueprint:
@blueprint.post('/start') @blueprint.post('/start')
async def on_start(): async def on_start():
server.metrics_collector.record_http_request('start') server.metrics_collector.record_http_request('start')
await server._prune_stale_games() await server.game_runtime.prune_stale_games()
game_state = await request.get_json() game_state = await request.get_json()
await server._create_game_board(game_state) await server.game_runtime.create_game_board(game_state, snake_builder=server.snake_builder)
await server._record_gameplay_start(game_state) await server.gameplay_tracking.record_gameplay_start(game_state)
await await_log(server.logger.info(f'GAME START: {game_state['game']}')) await await_log(server.logger.info(f'GAME START: {game_state['game']}'))
return 'ok' return 'ok'
@@ -35,10 +35,10 @@ def create_battlesnake_blueprint(server:'Server') -> Blueprint:
server.metrics_collector.record_http_request('move') server.metrics_collector.record_http_request('move')
game_state = await request.get_json() game_state = await request.get_json()
move_started = time.perf_counter() move_started = time.perf_counter()
game_board = cast(GameBoard, await server._get_game_board(game_state)) game_board = cast(GameBoard, await server.game_runtime.get_game_board(game_state, snake_builder=server.snake_builder))
next_move = game_board.snake_neat_make_a_move() next_move = game_board.snake_neat_make_a_move()
await server._persist_game_board(game_state['game']['id'], game_board) await server.game_runtime.persist_game_board(game_state['game']['id'], game_board)
await server._record_gameplay_turn(game_state, next_move, game_board) await server.gameplay_tracking.record_gameplay_turn(game_state, next_move, game_board)
elapsed_ms = (time.perf_counter() - move_started) * 1000.0 elapsed_ms = (time.perf_counter() - move_started) * 1000.0
await server.metrics_collector.record_move(next_move, elapsed_ms) await server.metrics_collector.record_move(next_move, elapsed_ms)
@@ -50,10 +50,10 @@ def create_battlesnake_blueprint(server:'Server') -> Blueprint:
@blueprint.post('/end') @blueprint.post('/end')
async def on_end(): async def on_end():
server.metrics_collector.record_http_request('end') server.metrics_collector.record_http_request('end')
await server._prune_stale_games() await server.game_runtime.prune_stale_games()
game_state = await request.get_json() game_state = await request.get_json()
if server.store_game_state: if server.store_game_state:
game_board = cast(GameBoard, await server._get_game_board(game_state, end=True)) game_board = cast(GameBoard, await server.game_runtime.get_game_board(game_state, snake_builder=server.snake_builder, end=True))
if server.check_tls_security: if server.check_tls_security:
await game_board.save( await game_board.save(
StorageLoader.build(server.storage_type), StorageLoader.build(server.storage_type),
@@ -68,14 +68,14 @@ def create_battlesnake_blueprint(server:'Server') -> Blueprint:
database=os.getenv('EDGEDB_DATABASE', None), database=os.getenv('EDGEDB_DATABASE', None),
) )
await server._record_gameplay_end(game_state) await server.gameplay_tracking.record_gameplay_end(game_state)
await server._push_dashboard_games_update(game_state) await server.dashboard_query.push_dashboard_games_update(game_state)
await await_log(server.logger.info(f'GAME ENDED: Winner is {[x['name'] for x in game_state['board']['snakes']]}')) await await_log(server.logger.info(f'GAME ENDED: Winner is {[x['name'] for x in game_state['board']['snakes']]}'))
await server._delete_game_board(game_state) await server.game_runtime.delete_game_board(game_state)
await server.metrics_collector.record_game_end(game_state) await server.metrics_collector.record_game_end(game_state)
return 'ok' return 'ok'
@blueprint.get("/cleanup") @blueprint.get('/cleanup')
async def cleanup(): async def cleanup():
results = server._cleanup_database() results = server._cleanup_database()
return jsonify(data=json.loads(results), status=200) return jsonify(data=json.loads(results), status=200)
+8 -8
View File
@@ -18,8 +18,8 @@ def create_dashboard_blueprint(server:'Server') -> Blueprint:
@blueprint.get('/dashboard') @blueprint.get('/dashboard')
async def dashboard_view(): async def dashboard_view():
initial_game_id = request.args.get('game_id', '') initial_game_id = request.args.get('game_id', '')
initial_summary = await server._get_dashboard_summary() initial_summary = await server.dashboard_query.get_dashboard_summary()
initial_games = await server._get_dashboard_games(limit=100) initial_games = await server.dashboard_query.get_dashboard_games(limit=100)
return await render_template( return await render_template(
'dashboard.html', 'dashboard.html',
initial_game_id=initial_game_id, initial_game_id=initial_game_id,
@@ -42,12 +42,12 @@ def create_dashboard_blueprint(server:'Server') -> Blueprint:
ws_hub = server.dashboard_ws_hub ws_hub = server.dashboard_ws_hub
websocket_task = asyncio.current_task() websocket_task = asyncio.current_task()
if websocket_task is not None: if websocket_task is not None:
await server._register_dashboard_ws_task(websocket_task) await ws_hub.register_task(websocket_task)
subscriber_queue:asyncio.Queue[str] = asyncio.Queue(maxsize=20) subscriber_queue:asyncio.Queue[str] = asyncio.Queue(maxsize=20)
await server._register_dashboard_game_subscriber(subscriber_queue) await ws_hub.register_subscriber(subscriber_queue)
try: try:
initial_payload = await server._build_dashboard_games_event() initial_payload = await server.dashboard_query.build_dashboard_games_event()
await asyncio.wait_for( await asyncio.wait_for(
websocket.send(json.dumps(initial_payload)), timeout=1.5 websocket.send(json.dumps(initial_payload)), timeout=1.5
) )
@@ -76,7 +76,7 @@ def create_dashboard_blueprint(server:'Server') -> Blueprint:
except Exception: except Exception:
break break
response_event = await server._handle_dashboard_ws_request(request_payload_raw) response_event = await server.dashboard_query.handle_dashboard_ws_request(request_payload_raw)
if response_event is not None: if response_event is not None:
await asyncio.wait_for( await asyncio.wait_for(
websocket.send(json.dumps(response_event)), websocket.send(json.dumps(response_event)),
@@ -112,8 +112,8 @@ def create_dashboard_blueprint(server:'Server') -> Blueprint:
except Exception: except Exception:
pass pass
finally: finally:
await server._unregister_dashboard_game_subscriber(subscriber_queue) await ws_hub.unregister_subscriber(subscriber_queue)
if websocket_task is not None: if websocket_task is not None:
await server._unregister_dashboard_ws_task(websocket_task) await ws_hub.unregister_task(websocket_task)
return blueprint return blueprint
+9 -9
View File
@@ -4,27 +4,27 @@ from typing import TYPE_CHECKING
if TYPE_CHECKING: if TYPE_CHECKING:
from server.Server import Server from server.Server import Server
def create_metrics_blueprint(server:"Server") -> Blueprint: def create_metrics_blueprint(server:'Server') -> Blueprint:
blueprint = Blueprint("metrics", __name__) blueprint = Blueprint('metrics', __name__)
@blueprint.get("/metrics") @blueprint.get('/metrics')
async def metrics(): async def metrics():
snapshot = await server.metrics_collector.build_snapshot( snapshot = await server.metrics_collector.build_snapshot(
server.game_last_seen_unix, server.game_runtime.game_last_seen_unix,
server.game_move_counts, server.game_runtime.game_move_counts,
) )
return jsonify(snapshot) return jsonify(snapshot)
@blueprint.get("/metrics/prometheus") @blueprint.get('/metrics/prometheus')
async def metrics_prometheus(): async def metrics_prometheus():
snapshot = await server.metrics_collector.build_snapshot( snapshot = await server.metrics_collector.build_snapshot(
server.game_last_seen_unix, server.game_runtime.game_last_seen_unix,
server.game_move_counts, server.game_runtime.game_move_counts,
) )
return ( return (
server.metrics_collector.build_prometheus_metrics(snapshot), server.metrics_collector.build_prometheus_metrics(snapshot),
200, 200,
{"Content-Type": "text/plain; version=0.0.4; charset=utf-8"}, {'Content-Type': 'text/plain; version=0.0.4; charset=utf-8'},
) )
return blueprint return blueprint
+3
View File
@@ -1,2 +1,5 @@
from .dashboard_events import DashboardEventsService from .dashboard_events import DashboardEventsService
from .dashboard_ws_hub import DashboardWebSocketHub from .dashboard_ws_hub import DashboardWebSocketHub
from .game_runtime import GameRuntimeService
from .gameplay_tracking import GameplayTrackingService
from .dashboard_query import DashboardQueryService
+144
View File
@@ -0,0 +1,144 @@
from quart_common.web.logger import await_log, logging
from typing import Awaitable, Callable
import json
from .dashboard_ws_hub import DashboardWebSocketHub
from server.database import GameplayDatabase
class DashboardQueryService:
def __init__(self, gameplay_database:GameplayDatabase, ws_hub:DashboardWebSocketHub, logger:logging, dashboard_running_game_stale_sec:int):
self.gameplay_database = gameplay_database
self.ws_hub = ws_hub
self.logger = logger
self.dashboard_running_game_stale_sec = dashboard_running_game_stale_sec
self.publish_notice:Callable[[str], Awaitable[None]] | None = None
def set_publish_notice(self, publish_notice:Callable[[str], Awaitable[None]]) -> None:
self.publish_notice = publish_notice
async def on_dashboard_games_update_notice(self, trigger:str) -> None:
await self.push_dashboard_games_update(
game_state=None,
publish_cluster=False,
trigger=trigger,
)
async def build_dashboard_games_event(self, game_state:dict|None=None, trigger_override:str|None=None) -> dict:
games_payload = await self.get_dashboard_games(limit=100)
summary_payload = await self.get_dashboard_summary()
game_id = None
if game_state is not None:
game_id = game_state.get('game', {}).get('id')
trigger = trigger_override or ('game_saved' if game_id else 'snapshot')
return {
'type': 'dashboard_games_update',
'trigger': trigger,
'games': games_payload,
'summary': summary_payload,
}
async def build_dashboard_game_replay_event(self, game_id:str, request_id:str|None=None) -> dict:
replay_payload = await self.get_dashboard_game_replay(game_id)
if replay_payload is None:
return {
'type': 'dashboard_game_replay',
'request_id': request_id,
'game_id': game_id,
'error': 'game_not_found',
}
return {
'type': 'dashboard_game_replay',
'request_id': request_id,
'game_id': game_id,
'replay': replay_payload,
}
async def handle_dashboard_ws_request(self, payload_raw:object) -> dict|None:
if not isinstance(payload_raw, str):
return None
try:
payload = json.loads(payload_raw)
except json.JSONDecodeError:
return None
if not isinstance(payload, dict):
return None
if payload.get('type') != 'dashboard_game_replay_request':
return None
game_id = str(payload.get('game_id') or '').strip()
request_id_raw = payload.get('request_id')
request_id = None if request_id_raw is None else str(request_id_raw)
if game_id == '':
return {
'type': 'dashboard_game_replay',
'request_id': request_id,
'error': 'missing_game_id',
}
return await self.build_dashboard_game_replay_event(
game_id=game_id,
request_id=request_id,
)
async def push_dashboard_games_update(self, game_state:dict|None=None, publish_cluster:bool=True, trigger:str|None=None) -> None:
if self.gameplay_database is None:
return
event_payload = await self.build_dashboard_games_event(
game_state,
trigger_override=trigger,
)
await self.ws_hub.broadcast_payload(event_payload)
if publish_cluster and self.publish_notice is not None:
await self.publish_notice(str(event_payload.get('trigger') or ''))
async def get_dashboard_summary(self) -> dict:
if self.gameplay_database is None:
return {'enabled': False}
try:
await self._finalize_stale_dashboard_games()
summary = await self.gameplay_database.get_summary()
summary['enabled'] = True
return summary
except Exception as error:
await await_log(self.logger.warning(f'Gameplay DB summary failed:{error}'))
return {'enabled': True, 'error': ' summary_unavailable'}
async def get_dashboard_games(self, limit:int=50) -> dict:
if self.gameplay_database is None:
return {'enabled': False, 'games': []}
try:
await self._finalize_stale_dashboard_games()
games = await self.gameplay_database.list_games(limit=limit)
return {'enabled': True, 'games': games}
except Exception as error:
await await_log(
self.logger.warning(f'Gameplay DB game list failed:{error}')
)
return {'enabled': True, 'error': 'games_unavailable', 'games': []}
async def get_dashboard_game_replay(self, game_id:str) -> dict|None:
if self.gameplay_database is None:
return {'enabled': False, 'error': 'database_disabled', 'game_id': game_id}
try:
replay = await self.gameplay_database.get_game_replay(game_id)
if replay is None:
return None
replay['enabled'] = True
return replay
except Exception as error:
await await_log(self.logger.warning(f'Gameplay DB replay failed:{error}'))
return {'enabled': True, 'error': 'replay_unavailable', 'game_id': game_id}
async def _finalize_stale_dashboard_games(self) -> None:
if self.gameplay_database is None:
return
try:
await self.gameplay_database.finalize_stale_running_games(stale_after_seconds=self.dashboard_running_game_stale_sec)
except Exception as error:
await await_log(self.logger.warning(f'Gameplay DB stale running game finalize failed:{error}'))
+103
View File
@@ -0,0 +1,103 @@
from typing import cast
import time
from server.metrics import MetricsCollector
from server.GameBoard import GameBoard
from storage import StorageLoader
from snakes import SnakeBuilder
class GameRuntimeService:
def __init__(self, game_state_store:StorageLoader, snake_type:str, game_state_local_cache:bool, stale_game_timeout_sec:int):
self.game_state_store = game_state_store
self.snake_type = snake_type
self.game_state_local_cache = game_state_local_cache
self.stale_game_timeout_sec = stale_game_timeout_sec
self.metrics_collector = None
self.running_games: dict[str, GameBoard] = {}
self.game_move_counts: dict[str, int] = {}
self.game_last_seen_unix: dict[str, int] = {}
def attach_metrics_collector(self, metrics_collector:MetricsCollector) -> None:
self.metrics_collector = metrics_collector
async def create_game_board(self, game_state:dict, snake_builder:SnakeBuilder) -> GameBoard:
game_id = game_state['game']['id']
new_game_board = GameBoard(
game_id=game_id,
width=game_state['board']['width'],
height=game_state['board']['height'],
ruleset=game_state['game']['ruleset'],
source=game_state['game']['source'],
map=game_state['game']['map'],
snake_class=snake_builder.build(self.snake_type),
)
await new_game_board.start_game(game_state)
if self.game_state_local_cache:
self.running_games[game_id] = new_game_board
await self.game_state_store.save(game_id, new_game_board)
self.game_move_counts[game_id] = 0
self.game_last_seen_unix[game_id] = int(time.time())
if self.metrics_collector is not None:
await self.metrics_collector.record_game_started(len(self.game_last_seen_unix))
return new_game_board
async def persist_game_board(self, game_id:str, game_board:GameBoard) -> None:
if self.game_state_local_cache:
self.running_games[game_id] = game_board
await self.game_state_store.save(game_id, game_board)
async def delete_game_board(self, game_state:dict) -> None:
game_id = game_state['game']['id']
self.running_games.pop(game_id, None)
self.game_move_counts.pop(game_id, None)
self.game_last_seen_unix.pop(game_id, None)
await self.game_state_store.delete(game_id)
async def get_game_board(self, game_state:dict, snake_builder:SnakeBuilder, end:bool=False) -> GameBoard:
game_id = game_state['game']['id']
game_board: GameBoard
if self.game_state_local_cache and game_id in self.running_games:
game_board = self.running_games[game_id]
else:
persisted_board = await self.game_state_store.load(game_id)
if persisted_board is not None:
game_board = cast(GameBoard, persisted_board)
if self.game_state_local_cache:
self.running_games[game_id] = game_board
else:
game_board = await self.create_game_board(game_state, snake_builder)
if self.metrics_collector is not None:
await self.metrics_collector.record_game_autocreated()
if not end:
self.game_move_counts[game_id] = self.game_move_counts.get(game_id, 0) + 1
self.game_last_seen_unix[game_id] = int(time.time())
game_board.read_game_data(game_state)
if end:
game_board.end_game(game_state)
await self.persist_game_board(game_id, game_board)
return game_board
async def prune_stale_games(self) -> None:
if not self.game_last_seen_unix:
return
now = int(time.time())
stale_ids = [
game_id
for game_id, last_seen in self.game_last_seen_unix.items()
if now - last_seen >= self.stale_game_timeout_sec
]
for game_id in stale_ids:
self.running_games.pop(game_id, None)
self.game_move_counts.pop(game_id, None)
self.game_last_seen_unix.pop(game_id, None)
if self.metrics_collector is not None:
await self.metrics_collector.record_stuck_removed()
+50
View File
@@ -0,0 +1,50 @@
from quart_common.web.logger import await_log, logging
from server.database import GameplayDatabase
from server.GameBoard import GameBoard
class GameplayTrackingService:
def __init__(self, gameplay_database:GameplayDatabase, snake_type:str, snake_version:str, logger:logging):
self.gameplay_database = gameplay_database
self.snake_type = snake_type
self.snake_version = snake_version
self.logger = logger
async def record_gameplay_start(self, game_state:dict) -> None:
if self.gameplay_database is None:
return
try:
await self.gameplay_database.record_game_start(
game_state,
snake_type=self.snake_type,
snake_version=self.snake_version,
)
except Exception as error:
await await_log(self.logger.warning(f"Gameplay DB start record failed:{error}"))
async def record_gameplay_turn(self, game_state:dict, my_move:str, game_board:GameBoard) -> None:
if self.gameplay_database is None:
return
try:
thinking = self._extract_latest_snake_thinking(game_board)
await self.gameplay_database.record_turn(game_state, my_move, thinking)
except Exception as error:
await await_log(self.logger.warning(f"Gameplay DB turn record failed:{error}"))
async def record_gameplay_end(self, game_state:dict) -> None:
if self.gameplay_database is None:
return
try:
await self.gameplay_database.record_game_end(game_state)
except Exception as error:
await await_log(self.logger.warning(f"Gameplay DB end record failed:{error}"))
def _extract_latest_snake_thinking(self, game_board:GameBoard) -> dict|None:
try:
history = game_board.snake_class.get_history()
except Exception:
return None
if not isinstance(history, list) or len(history) == 0:
return None
latest = history[-1]
return latest if isinstance(latest, dict) else None