diff --git a/server/Server.py b/server/Server.py index 6081a64..3e27005 100644 --- a/server/Server.py +++ b/server/Server.py @@ -14,16 +14,17 @@ from server.metrics import ( MetricsCollector, ) -from quart import ( - Quart, - request, - jsonify, - render_template, - send_from_directory, - websocket, -) -import asyncio, signal, inspect, logging, json, os, re, time +import asyncio, signal, logging, json, os, re, time from typing import cast +from quart import Quart + +from server.blueprints import ( + create_battlesnake_blueprint, + create_metrics_blueprint, + create_dashboard_blueprint, +) +from server.services import DashboardEventsService +from server.services import DashboardWebSocketHub class Server: default_snake_config = { @@ -57,21 +58,13 @@ class Server: self.metrics_redis_url = metrics_redis_url self.stale_game_timeout_sec = self._get_stale_game_timeout_sec() - self.running_games:dict[str, GameBoard] = {} - self.game_move_counts:dict[str, int] = {} - self.game_last_seen_unix:dict[str, int] = {} - self.dashboard_game_subscribers:set[asyncio.Queue[str]] = set() - self.dashboard_game_subscribers_lock = asyncio.Lock() - self.dashboard_ws_tasks:set[asyncio.Task] = set() - self.dashboard_ws_tasks_lock = asyncio.Lock() - self.dashboard_ws_shutdown_event = asyncio.Event() - self.dashboard_ws_shutdown_message = json.dumps({'type': 'dashboard_ws_shutdown'}) - self.dashboard_event_origin = f'worker-{os.getpid()}-{int(time.time() * 1000)}' - self.dashboard_events_channel = os.getenv('DASHBOARD_EVENTS_CHANNEL', 'snake:dashboard:events') - self.dashboard_events_enabled = (self.metrics_backend_normalized == 'redis' and self._env_bool('DASHBOARD_EVENTS_ENABLED', True)) - self.dashboard_events_listener_task:asyncio.Task | None = None - self.dashboard_events_redis = None - self.dashboard_events_pubsub = None + self.running_games: dict[str, GameBoard] = {} + self.game_move_counts: dict[str, int] = {} + self.game_last_seen_unix: dict[str, int] = {} + self.dashboard_ws_hub = DashboardWebSocketHub() + dashboard_event_origin = f'worker-{os.getpid()}-{int(time.time() * 1000)}' + dashboard_events_channel = os.getenv('DASHBOARD_EVENTS_CHANNEL', 'snake:dashboard:events') + dashboard_events_enabled = (self.metrics_backend_normalized == 'redis' and self._env_bool('DASHBOARD_EVENTS_ENABLED', True)) self.metrics_collector = MetricsCollector( metrics_manager=MetricsStoreBuilder.build( @@ -93,6 +86,15 @@ class Server: self._startup_worker_metrics_cleared = False self.logger = build_logger('Battlesnake', debug_env_var='DEBUG_SERVER') + self.dashboard_events_service = DashboardEventsService( + enabled=dashboard_events_enabled, + redis_url=self.metrics_redis_url, + channel=dashboard_events_channel, + event_origin=dashboard_event_origin, + shutdown_event=self.dashboard_ws_hub.shutdown_event, + on_notice=self._on_dashboard_games_update_notice, + logger=self.logger, + ) self.snake_version = self._get_snake_version() self.gameplay_database = None if gameplay_db_enabled: @@ -104,74 +106,9 @@ class Server: self.app = Quart('Battlesnake', template_folder=os.path.join(data_path, 'server', 'templates')) - # info is called when you create your Battlesnake on play.battlesnake.com - # and controls your Battlesnake's appearance - # TIP: If you open your Battlesnake URL in a browser you should see this data - @self.app.get('/') - async def on_info(): - self.metrics_collector.record_http_request('info') - snake_config = await self._read_json_config_or_create() - - await await_log(self.logger.info(f'INFO Snake: {snake_config}')) - return snake_config - - # start is called when your Battlesnake begins a game - @self.app.post('/start') - async def on_start(): - self.metrics_collector.record_http_request('start') - await self._prune_stale_games() - game_state = await request.get_json() - await self._create_game_board(game_state) - await self._record_gameplay_start(game_state) - await await_log(self.logger.info(f'GAME START: {game_state['game']}')) - return 'ok' - - # move is called when your Battlesnake game is running game - @self.app.post('/move') - async def on_move(): - self.metrics_collector.record_http_request('move') - game_state = await request.get_json() - move_started = time.perf_counter() - game_board = cast(GameBoard, await self._get_game_board(game_state)) - next_move = game_board.snake_neat_make_a_move() - await self._persist_game_board(game_state['game']['id'], game_board) - await self._record_gameplay_turn(game_state, next_move, game_board) - elapsed_ms = (time.perf_counter() - move_started) * 1000.0 - await self.metrics_collector.record_move(next_move, elapsed_ms) - - if self.debug: - await await_log(self.logger.debug(f'TURN: {game_state['turn']:3}, MOVE: {next_move:5}')) - - return {'move': next_move} - - # end is called when your Battlesnake finishes a game - @self.app.post('/end') - async def on_end(): - self.metrics_collector.record_http_request('end') - await self._prune_stale_games() - game_state = await request.get_json() - if self.store_game_state: - game_board = cast(GameBoard, await self._get_game_board(game_state, end=True)) - if self.check_tls_security: - await game_board.save( - StorageLoader.build(self.storage_type), - file_path=os.path.join(self.data_path, 'data'), - database=os.getenv('EDGEDB_DATABASE', None), - tls_security=None, - ) - else: - await game_board.save( - StorageLoader.build(self.storage_type), - file_path=os.path.join(self.data_path, 'data'), - database=os.getenv('EDGEDB_DATABASE', None), - ) - - await self._record_gameplay_end(game_state) - await self._push_dashboard_games_update(game_state) - await await_log(self.logger.info(f'GAME ENDED: Winner is {[x['name'] for x in game_state['board']['snakes']]}')) - await self._delete_game_board(game_state) - await self.metrics_collector.record_game_end(game_state) - return 'ok' + self.app.register_blueprint(create_battlesnake_blueprint(self)) + self.app.register_blueprint(create_metrics_blueprint(self)) + self.app.register_blueprint(create_dashboard_blueprint(self)) @self.app.after_request async def identify_server(response): @@ -187,135 +124,22 @@ class Server: should_clear = await self.metrics_collector.should_clear_worker_metrics_on_startup(self.worker_metrics_startup_lock_ttl_sec) if should_clear: await self.metrics_collector.clear_worker_metrics() - await self._start_dashboard_events_listener() + await self.dashboard_events_service.start_listener() @self.app.after_serving async def shutdown_state_storage(): - await self._stop_dashboard_events_listener() + await self.dashboard_events_service.stop_listener() await self.game_state_store.close() await self.metrics_collector.close() if self.gameplay_database is not None: await self.gameplay_database.close() - @self.app.get('/cleanup') - async def cleanup(): - results = self._cleanup_database() - return jsonify(data=json.loads(results), status=200) - - @self.app.get('/metrics') - async def metrics(): - snapshot = await self.metrics_collector.build_snapshot(self.game_last_seen_unix, self.game_move_counts) - return jsonify(snapshot) - - @self.app.get('/metrics/prometheus') - async def metrics_prometheus(): - snapshot = await self.metrics_collector.build_snapshot(self.game_last_seen_unix, self.game_move_counts) - return ( - self.metrics_collector.build_prometheus_metrics(snapshot), - 200, - {'Content-Type': 'text/plain; version=0.0.4; charset=utf-8'}, - ) - - @self.app.get('/dashboard') - async def dashboard_view(): - initial_game_id = request.args.get('game_id', '') - initial_summary = await self._get_dashboard_summary() - initial_games = await self._get_dashboard_games(limit=100) - return await render_template( - 'dashboard.html', - initial_game_id=initial_game_id, - initial_summary=initial_summary, - initial_games=initial_games, - ) - - @self.app.get('/dashboard/customizations/') - async def dashboard_customizations_asset(asset_path: str): - customization_root = os.path.join(self.data_path, 'server', 'static', 'customizations') - return await send_from_directory(customization_root, asset_path) - - @self.app.websocket('/dashboard/ws/games') - async def dashboard_games_ws(): - websocket_task = asyncio.current_task() - if websocket_task is not None: - await self._register_dashboard_ws_task(websocket_task) - - subscriber_queue: asyncio.Queue[str] = asyncio.Queue(maxsize=20) - await self._register_dashboard_game_subscriber(subscriber_queue) - try: - initial_payload = await self._build_dashboard_games_event() - await asyncio.wait_for(websocket.send(json.dumps(initial_payload)), timeout=1.5) - while True: - queue_task = asyncio.create_task(subscriber_queue.get()) - receive_task = asyncio.create_task(websocket.receive()) - try: - done, _ = await asyncio.wait( - {queue_task, receive_task}, - timeout=1.0, - return_when=asyncio.FIRST_COMPLETED, - ) - - if len(done) == 0: - if self.dashboard_ws_shutdown_event.is_set(): - await asyncio.wait_for( - websocket.send(self.dashboard_ws_shutdown_message), - timeout=1.5, - ) - break - continue - - if receive_task in done: - try: - request_payload_raw = receive_task.result() - except Exception: - break - - response_event = await self._handle_dashboard_ws_request( - request_payload_raw - ) - if response_event is not None: - await asyncio.wait_for( - websocket.send(json.dumps(response_event)), - timeout=1.5, - ) - - if queue_task in done: - event_payload = queue_task.result() - if event_payload == self.dashboard_ws_shutdown_message: - await asyncio.wait_for( - websocket.send(event_payload), timeout=1.5 - ) - break - await asyncio.wait_for( - websocket.send(event_payload), timeout=1.5 - ) - except asyncio.TimeoutError: - if self.dashboard_ws_shutdown_event.is_set(): - await asyncio.wait_for( - websocket.send(self.dashboard_ws_shutdown_message), - timeout=1.5, - ) - break - finally: - for pending_task in (queue_task, receive_task): - if not pending_task.done(): - pending_task.cancel() - await asyncio.gather( - queue_task, receive_task, return_exceptions=True - ) - except asyncio.CancelledError: - pass - except Exception: - pass - finally: - await self._unregister_dashboard_game_subscriber(subscriber_queue) - if websocket_task is not None: - await self._unregister_dashboard_ws_task(websocket_task) - async def run(self, host:str='0.0.0.0', port:int=8000, debug:bool=False): logging.getLogger('werkzeug').setLevel(logging.ERROR) loop = asyncio.get_running_loop() - shutdown_event = asyncio.Event() + installed_signal_handlers:list[signal.Signals] = [] + shutdown_event = asyncio.Event() def on_shutdown_signal() -> None: self._request_dashboard_ws_shutdown() @@ -392,7 +216,7 @@ class Server: return default return value.strip().lower() in {'1', 'true', 'yes', 'on'} - def _env_int(self, name: str, default: int) -> int: + def _env_int(self, name:str, default:int) -> int: value = os.getenv(name) if value is None: return default @@ -496,7 +320,7 @@ class Server: except Exception as error: await await_log(self.logger.warning(f'Gameplay DB start record failed:{error}')) - def _extract_latest_snake_thinking(self, game_board:GameBoard) -> dict | None: + def _extract_latest_snake_thinking(self, game_board:GameBoard) -> dict|None: try: history = game_board.snake_class.get_history() except Exception: @@ -524,175 +348,29 @@ class Server: await await_log(self.logger.warning(f'Gameplay DB end record failed:{error}')) async def _register_dashboard_game_subscriber(self, subscriber_queue:asyncio.Queue[str]) -> None: - async with self.dashboard_game_subscribers_lock: - self.dashboard_game_subscribers.add(subscriber_queue) + await self.dashboard_ws_hub.register_subscriber(subscriber_queue) async def _unregister_dashboard_game_subscriber(self, subscriber_queue:asyncio.Queue[str]) -> None: - async with self.dashboard_game_subscribers_lock: - self.dashboard_game_subscribers.discard(subscriber_queue) + await self.dashboard_ws_hub.unregister_subscriber(subscriber_queue) async def _broadcast_dashboard_game_event(self, payload:dict) -> None: - encoded_payload = json.dumps(payload) - async with self.dashboard_game_subscribers_lock: - subscribers = tuple(self.dashboard_game_subscribers) - - for subscriber_queue in subscribers: - if subscriber_queue.full(): - try: - subscriber_queue.get_nowait() - except asyncio.QueueEmpty: - pass - - try: - subscriber_queue.put_nowait(encoded_payload) - except asyncio.QueueFull: - continue + await self.dashboard_ws_hub.broadcast_payload(payload) async def _register_dashboard_ws_task(self, websocket_task:asyncio.Task) -> None: - async with self.dashboard_ws_tasks_lock: - self.dashboard_ws_tasks.add(websocket_task) + await self.dashboard_ws_hub.register_task(websocket_task) async def _unregister_dashboard_ws_task(self, websocket_task:asyncio.Task) -> None: - async with self.dashboard_ws_tasks_lock: - self.dashboard_ws_tasks.discard(websocket_task) + await self.dashboard_ws_hub.unregister_task(websocket_task) def _request_dashboard_ws_shutdown(self) -> None: - if self.dashboard_ws_shutdown_event.is_set(): - return + self.dashboard_ws_hub.request_shutdown() - self.dashboard_ws_shutdown_event.set() - for subscriber_queue in tuple(self.dashboard_game_subscribers): - if subscriber_queue.full(): - try: - subscriber_queue.get_nowait() - except asyncio.QueueEmpty: - pass - try: - subscriber_queue.put_nowait(self.dashboard_ws_shutdown_message) - except asyncio.QueueFull: - continue - - async def _start_dashboard_events_listener(self) -> None: - if not self.dashboard_events_enabled: - return - if self.dashboard_events_listener_task is not None: - return - - try: - import redis.asyncio as aioredis # type: ignore[import-not-found] - - self.dashboard_events_redis = aioredis.from_url(self.metrics_redis_url) - self.dashboard_events_pubsub = self.dashboard_events_redis.pubsub() - await self.dashboard_events_pubsub.subscribe(self.dashboard_events_channel) - self.dashboard_events_listener_task = asyncio.create_task( - self._dashboard_events_listener_loop() - ) - except Exception as error: - self.dashboard_events_listener_task = None - self.dashboard_events_pubsub = None - self.dashboard_events_redis = None - await await_log(self.logger.warning(f'Dashboard events listener disabled (redis unavailable): {error}')) - - async def _stop_dashboard_events_listener(self) -> None: - listener_task = self.dashboard_events_listener_task - self.dashboard_events_listener_task = None - if listener_task is not None: - listener_task.cancel() - await asyncio.gather(listener_task, return_exceptions=True) - - pubsub = self.dashboard_events_pubsub - self.dashboard_events_pubsub = None - if pubsub is not None: - try: - await pubsub.unsubscribe(self.dashboard_events_channel) - except Exception: - pass - close_method = getattr(pubsub, 'aclose', None) - if callable(close_method): - try: - maybe_result = close_method() - if inspect.isawaitable(maybe_result): - await maybe_result - except Exception: - pass - - redis_client = self.dashboard_events_redis - self.dashboard_events_redis = None - if redis_client is not None: - close_method = getattr(redis_client, 'aclose', None) - if callable(close_method): - try: - maybe_result = close_method() - if inspect.isawaitable(maybe_result): - await maybe_result - except Exception: - pass - - async def _dashboard_events_listener_loop(self) -> None: - pubsub = self.dashboard_events_pubsub - if pubsub is None: - return - - try: - while not self.dashboard_ws_shutdown_event.is_set(): - message = await pubsub.get_message( - ignore_subscribe_messages=True, - timeout=1.0, - ) - if message is None: - continue - - raw_data = message.get('data') - if isinstance(raw_data, bytes): - payload_raw = raw_data.decode('utf-8', errors='replace') - else: - payload_raw = str(raw_data) - - try: - payload = json.loads(payload_raw) - except json.JSONDecodeError: - continue - - if not isinstance(payload, dict): - continue - if payload.get('type') != 'dashboard_games_update_notice': - continue - if payload.get('origin') == self.dashboard_event_origin: - continue - notice_trigger = str(payload.get('trigger') or 'game_saved') - - await self._push_dashboard_games_update( - game_state=None, - publish_cluster=False, - trigger=notice_trigger, - ) - except asyncio.CancelledError: - pass - except Exception as error: - await await_log( - self.logger.warning(f'Dashboard events listener stopped unexpectedly: {error}')) - - async def _publish_dashboard_games_update_notice(self, trigger: str) -> None: - if not self.dashboard_events_enabled: - return - if self.dashboard_events_redis is None: - return - if trigger not in {'game_saved', 'stale_finalized', 'manual'}: - return - - message = { - 'type': 'dashboard_games_update_notice', - 'origin': self.dashboard_event_origin, - 'trigger': trigger, - 'sent_at': int(time.time()), - } - try: - await self.dashboard_events_redis.publish( - self.dashboard_events_channel, - json.dumps(message), - ) - except Exception as error: - await await_log(self.logger.warning(f'Dashboard events publish failed: {error}')) + async def _on_dashboard_games_update_notice(self, trigger:str) -> None: + await self._push_dashboard_games_update( + game_state=None, + publish_cluster=False, + trigger=trigger, + ) async def _build_dashboard_games_event(self, game_state:dict|None=None, trigger_override:str|None=None) -> dict: games_payload = await self._get_dashboard_games(limit=100) @@ -726,7 +404,7 @@ class Server: 'replay': replay_payload, } - async def _handle_dashboard_ws_request(self, payload_raw: object) -> dict | None: + async def _handle_dashboard_ws_request(self, payload_raw:object) -> dict|None: if not isinstance(payload_raw, str): return None @@ -765,7 +443,7 @@ class Server: ) await self._broadcast_dashboard_game_event(event_payload) if publish_cluster: - await self._publish_dashboard_games_update_notice(trigger=str(event_payload.get('trigger') or '')) + await self.dashboard_events_service.publish_notice(trigger=str(event_payload.get('trigger') or '')) async def _get_dashboard_summary(self) -> dict: if self.gameplay_database is None: diff --git a/server/blueprints/__init__.py b/server/blueprints/__init__.py new file mode 100644 index 0000000..ffcbf44 --- /dev/null +++ b/server/blueprints/__init__.py @@ -0,0 +1,3 @@ +from .battlesnake import create_battlesnake_blueprint +from .metrics import create_metrics_blueprint +from .dashboard import create_dashboard_blueprint diff --git a/server/blueprints/battlesnake.py b/server/blueprints/battlesnake.py new file mode 100644 index 0000000..61016e3 --- /dev/null +++ b/server/blueprints/battlesnake.py @@ -0,0 +1,83 @@ +from typing import TYPE_CHECKING, cast +import json, time, os + +from quart import Blueprint, request, jsonify + +from quart_common.web.logger import await_log +from server.storage import StorageLoader +from server.GameBoard import GameBoard + +if TYPE_CHECKING: + from server.Server import Server + +def create_battlesnake_blueprint(server:'Server') -> Blueprint: + blueprint = Blueprint('battlesnake', __name__) + + @blueprint.get('/') + async def on_info(): + server.metrics_collector.record_http_request('info') + snake_config = await server._read_json_config_or_create() + await await_log(server.logger.info(f'INFO Snake: {snake_config}')) + return snake_config + + @blueprint.post('/start') + async def on_start(): + server.metrics_collector.record_http_request('start') + await server._prune_stale_games() + game_state = await request.get_json() + await server._create_game_board(game_state) + await server._record_gameplay_start(game_state) + await await_log(server.logger.info(f'GAME START: {game_state['game']}')) + return 'ok' + + @blueprint.post('/move') + async def on_move(): + server.metrics_collector.record_http_request('move') + game_state = await request.get_json() + move_started = time.perf_counter() + game_board = cast(GameBoard, await server._get_game_board(game_state)) + next_move = game_board.snake_neat_make_a_move() + await server._persist_game_board(game_state['game']['id'], game_board) + await server._record_gameplay_turn(game_state, next_move, game_board) + elapsed_ms = (time.perf_counter() - move_started) * 1000.0 + await server.metrics_collector.record_move(next_move, elapsed_ms) + + if server.debug: + await await_log(server.logger.debug(f'TURN: {game_state['turn']:3}, MOVE: {next_move:5}')) + + return {'move': next_move} + + @blueprint.post('/end') + async def on_end(): + server.metrics_collector.record_http_request('end') + await server._prune_stale_games() + game_state = await request.get_json() + if server.store_game_state: + game_board = cast(GameBoard, await server._get_game_board(game_state, end=True)) + if server.check_tls_security: + await game_board.save( + StorageLoader.build(server.storage_type), + file_path=os.path.join(server.data_path, 'data'), + database=os.getenv('EDGEDB_DATABASE', None), + tls_security=None, + ) + else: + await game_board.save( + StorageLoader.build(server.storage_type), + file_path=os.path.join(server.data_path, 'data'), + database=os.getenv('EDGEDB_DATABASE', None), + ) + + await server._record_gameplay_end(game_state) + await server._push_dashboard_games_update(game_state) + await await_log(server.logger.info(f'GAME ENDED: Winner is {[x['name'] for x in game_state['board']['snakes']]}')) + await server._delete_game_board(game_state) + await server.metrics_collector.record_game_end(game_state) + return 'ok' + + @blueprint.get("/cleanup") + async def cleanup(): + results = server._cleanup_database() + return jsonify(data=json.loads(results), status=200) + + return blueprint diff --git a/server/blueprints/dashboard.py b/server/blueprints/dashboard.py new file mode 100644 index 0000000..a72e3c4 --- /dev/null +++ b/server/blueprints/dashboard.py @@ -0,0 +1,119 @@ +from typing import TYPE_CHECKING +import asyncio, json, os + +from quart import ( + Blueprint, + render_template, + send_from_directory, + request, + websocket, +) + +if TYPE_CHECKING: + from server.Server import Server + +def create_dashboard_blueprint(server:'Server') -> Blueprint: + blueprint = Blueprint('dashboard', __name__) + + @blueprint.get('/dashboard') + async def dashboard_view(): + initial_game_id = request.args.get('game_id', '') + initial_summary = await server._get_dashboard_summary() + initial_games = await server._get_dashboard_games(limit=100) + return await render_template( + 'dashboard.html', + initial_game_id=initial_game_id, + initial_summary=initial_summary, + initial_games=initial_games, + ) + + @blueprint.get('/dashboard/customizations/') + async def dashboard_customizations_asset(asset_path:str): + customization_root = os.path.join( + server.data_path, + 'server', + 'static', + 'customizations', + ) + return await send_from_directory(customization_root, asset_path) + + @blueprint.websocket('/dashboard/ws/games') + async def dashboard_games_ws(): + ws_hub = server.dashboard_ws_hub + websocket_task = asyncio.current_task() + if websocket_task is not None: + await server._register_dashboard_ws_task(websocket_task) + + subscriber_queue:asyncio.Queue[str] = asyncio.Queue(maxsize=20) + await server._register_dashboard_game_subscriber(subscriber_queue) + try: + initial_payload = await server._build_dashboard_games_event() + await asyncio.wait_for( + websocket.send(json.dumps(initial_payload)), timeout=1.5 + ) + while True: + queue_task = asyncio.create_task(subscriber_queue.get()) + receive_task = asyncio.create_task(websocket.receive()) + try: + done, _ = await asyncio.wait( + {queue_task, receive_task}, + timeout=1.0, + return_when=asyncio.FIRST_COMPLETED, + ) + + if len(done) == 0: + if ws_hub.shutdown_event.is_set(): + await asyncio.wait_for( + websocket.send(ws_hub.shutdown_message), + timeout=1.5, + ) + break + continue + + if receive_task in done: + try: + request_payload_raw = receive_task.result() + except Exception: + break + + response_event = await server._handle_dashboard_ws_request(request_payload_raw) + if response_event is not None: + await asyncio.wait_for( + websocket.send(json.dumps(response_event)), + timeout=1.5, + ) + + if queue_task in done: + event_payload = queue_task.result() + if event_payload == ws_hub.shutdown_message: + await asyncio.wait_for( + websocket.send(event_payload), timeout=1.5 + ) + break + await asyncio.wait_for( + websocket.send(event_payload), timeout=1.5 + ) + except asyncio.TimeoutError: + if ws_hub.shutdown_event.is_set(): + await asyncio.wait_for( + websocket.send(ws_hub.shutdown_message), + timeout=1.5, + ) + break + finally: + for pending_task in (queue_task, receive_task): + if not pending_task.done(): + pending_task.cancel() + await asyncio.gather( + queue_task, receive_task, return_exceptions=True + ) + except asyncio.CancelledError: + pass + except Exception: + pass + finally: + await server._unregister_dashboard_game_subscriber(subscriber_queue) + if websocket_task is not None: + await server._unregister_dashboard_ws_task(websocket_task) + + return blueprint diff --git a/server/blueprints/metrics.py b/server/blueprints/metrics.py new file mode 100644 index 0000000..d421ee2 --- /dev/null +++ b/server/blueprints/metrics.py @@ -0,0 +1,30 @@ +from quart import Blueprint, jsonify +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from server.Server import Server + +def create_metrics_blueprint(server:"Server") -> Blueprint: + blueprint = Blueprint("metrics", __name__) + + @blueprint.get("/metrics") + async def metrics(): + snapshot = await server.metrics_collector.build_snapshot( + server.game_last_seen_unix, + server.game_move_counts, + ) + return jsonify(snapshot) + + @blueprint.get("/metrics/prometheus") + async def metrics_prometheus(): + snapshot = await server.metrics_collector.build_snapshot( + server.game_last_seen_unix, + server.game_move_counts, + ) + return ( + server.metrics_collector.build_prometheus_metrics(snapshot), + 200, + {"Content-Type": "text/plain; version=0.0.4; charset=utf-8"}, + ) + + return blueprint diff --git a/server/bootstrap.py b/server/bootstrap.py index 63cd3d3..df44819 100644 --- a/server/bootstrap.py +++ b/server/bootstrap.py @@ -17,15 +17,16 @@ def env_bool(name:str, default:bool=False) -> bool: def build_server_from_env(default_snake_type:str) -> Server: data_path = str(Path(__file__).resolve().parent.parent) + redis_url = os.environ.get('REDIS_URL', 'redis://localhost:6379/0') game_state_backend = os.environ.get('GAME_STATE_BACKEND', 'memory') - game_state_redis_url = os.environ.get('GAME_STATE_REDIS_URL', 'redis://localhost:6379/0') + game_state_redis_url = os.environ.get('GAME_STATE_REDIS_URL', redis_url) game_state_ttl_sec = int(os.environ.get('GAME_STATE_TTL_SEC', '900')) metrics_backend = os.environ.get('METRICS_BACKEND', None) if metrics_backend is None: metrics_backend = ('redis' if game_state_backend.strip().lower() == 'redis' else 'memory') - metrics_redis_url = os.environ.get('METRICS_REDIS_URL', game_state_redis_url) + metrics_redis_url = os.environ.get('METRICS_REDIS_URL', redis_url) metrics_ttl_sec_raw = os.environ.get('METRICS_TTL_SEC', None) if metrics_ttl_sec_raw is None: metrics_ttl_sec = (game_state_ttl_sec if metrics_backend.strip().lower() == 'redis' else None) diff --git a/server/services/__init__.py b/server/services/__init__.py new file mode 100644 index 0000000..a7f0fb2 --- /dev/null +++ b/server/services/__init__.py @@ -0,0 +1,2 @@ +from .dashboard_events import DashboardEventsService +from .dashboard_ws_hub import DashboardWebSocketHub diff --git a/server/services/dashboard_events.py b/server/services/dashboard_events.py new file mode 100644 index 0000000..5b10faf --- /dev/null +++ b/server/services/dashboard_events.py @@ -0,0 +1,127 @@ +from quart_common.web.logger import await_log + +from typing import Awaitable, Callable +import asyncio, inspect, json, time + +class DashboardEventsService: + def __init__(self, enabled:bool, redis_url:str, channel:str, event_origin:str, shutdown_event:asyncio.Event, on_notice:Callable[[str], Awaitable[None]], logger): + self.enabled = enabled + self.redis_url = redis_url + self.channel = channel + self.event_origin = event_origin + self.shutdown_event = shutdown_event + self.on_notice = on_notice + self.logger = logger + + self.listener_task:asyncio.Task|None=None + self.redis = None + self.pubsub = None + + async def start_listener(self) -> None: + if not self.enabled: + return + if self.listener_task is not None: + return + + try: + import redis.asyncio as aioredis # type: ignore[import-not-found] + + self.redis = aioredis.from_url(self.redis_url) + self.pubsub = self.redis.pubsub() + await self.pubsub.subscribe(self.channel) + self.listener_task = asyncio.create_task(self._listener_loop()) + except Exception as error: + self.listener_task = None + self.pubsub = None + self.redis = None + await await_log(self.logger.warning(f'Dashboard events listener disabled (redis unavailable): {error}')) + + async def stop_listener(self) -> None: + listener_task = self.listener_task + self.listener_task = None + if listener_task is not None: + listener_task.cancel() + await asyncio.gather(listener_task, return_exceptions=True) + + pubsub = self.pubsub + self.pubsub = None + if pubsub is not None: + try: + await pubsub.unsubscribe(self.channel) + except Exception: + pass + close_method = getattr(pubsub, 'aclose', None) + if callable(close_method): + try: + maybe_result = close_method() + if inspect.isawaitable(maybe_result): + await maybe_result + except Exception: + pass + + redis_client = self.redis + self.redis = None + if redis_client is not None: + close_method = getattr(redis_client, 'aclose', None) + if callable(close_method): + try: + maybe_result = close_method() + if inspect.isawaitable(maybe_result): + await maybe_result + except Exception: + pass + + async def publish_notice(self, trigger:str) -> None: + if not self.enabled: + return + if self.redis is None: + return + if trigger not in {'game_saved', 'stale_finalized', 'manual'}: + return + + message = { + 'type': 'dashboard_games_update_notice', + 'origin': self.event_origin, + 'trigger': trigger, + 'sent_at': int(time.time()), + } + try: + await self.redis.publish(self.channel, json.dumps(message)) + except Exception as error: + await await_log(self.logger.warning(f'Dashboard events publish failed: {error}')) + + async def _listener_loop(self) -> None: + pubsub = self.pubsub + if pubsub is None: + return + + try: + while not self.shutdown_event.is_set(): + message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0) + if message is None: + continue + + raw_data = message.get('data') + if isinstance(raw_data, bytes): + payload_raw = raw_data.decode('utf-8', errors='replace') + else: + payload_raw = str(raw_data) + + try: + payload = json.loads(payload_raw) + except json.JSONDecodeError: + continue + + if not isinstance(payload, dict): + continue + if payload.get('type') != 'dashboard_games_update_notice': + continue + if payload.get('origin') == self.event_origin: + continue + + notice_trigger = str(payload.get('trigger') or 'game_saved') + await self.on_notice(notice_trigger) + except asyncio.CancelledError: + pass + except Exception as error: + await await_log(self.logger.warning(f'Dashboard events listener stopped unexpectedly: {error}')) diff --git a/server/services/dashboard_ws_hub.py b/server/services/dashboard_ws_hub.py new file mode 100644 index 0000000..a319b5a --- /dev/null +++ b/server/services/dashboard_ws_hub.py @@ -0,0 +1,61 @@ +import asyncio, json + +class DashboardWebSocketHub: + def __init__(self): + self.subscribers:set[asyncio.Queue[str]] = set() + self.subscribers_lock = asyncio.Lock() + + self.ws_tasks:set[asyncio.Task] = set() + self.ws_tasks_lock = asyncio.Lock() + + self.shutdown_event = asyncio.Event() + self.shutdown_message = json.dumps({"type": "dashboard_ws_shutdown"}) + + async def register_subscriber(self, subscriber_queue:asyncio.Queue[str]) -> None: + async with self.subscribers_lock: + self.subscribers.add(subscriber_queue) + + async def unregister_subscriber(self, subscriber_queue:asyncio.Queue[str]) -> None: + async with self.subscribers_lock: + self.subscribers.discard(subscriber_queue) + + async def register_task(self, websocket_task:asyncio.Task) -> None: + async with self.ws_tasks_lock: + self.ws_tasks.add(websocket_task) + + async def unregister_task(self, websocket_task:asyncio.Task) -> None: + async with self.ws_tasks_lock: + self.ws_tasks.discard(websocket_task) + + async def broadcast_payload(self, payload:dict) -> None: + encoded_payload = json.dumps(payload) + async with self.subscribers_lock: + subscribers = tuple(self.subscribers) + + for subscriber_queue in subscribers: + if subscriber_queue.full(): + try: + subscriber_queue.get_nowait() + except asyncio.QueueEmpty: + pass + + try: + subscriber_queue.put_nowait(encoded_payload) + except asyncio.QueueFull: + continue + + def request_shutdown(self) -> None: + if self.shutdown_event.is_set(): + return + + self.shutdown_event.set() + for subscriber_queue in tuple(self.subscribers): + if subscriber_queue.full(): + try: + subscriber_queue.get_nowait() + except asyncio.QueueEmpty: + pass + try: + subscriber_queue.put_nowait(self.shutdown_message) + except asyncio.QueueFull: + continue