move route code out of server into own blueprints and cleanup the codebase

This commit is contained in:
2026-04-06 03:10:49 +02:00
parent 0ebb04f0a2
commit d89986dba9
9 changed files with 477 additions and 373 deletions
+49 -371
View File
@@ -14,16 +14,17 @@ from server.metrics import (
MetricsCollector, MetricsCollector,
) )
from quart import ( import asyncio, signal, logging, json, os, re, time
Quart,
request,
jsonify,
render_template,
send_from_directory,
websocket,
)
import asyncio, signal, inspect, logging, json, os, re, time
from typing import cast from typing import cast
from quart import Quart
from server.blueprints import (
create_battlesnake_blueprint,
create_metrics_blueprint,
create_dashboard_blueprint,
)
from server.services import DashboardEventsService
from server.services import DashboardWebSocketHub
class Server: class Server:
default_snake_config = { default_snake_config = {
@@ -57,21 +58,13 @@ class Server:
self.metrics_redis_url = metrics_redis_url self.metrics_redis_url = metrics_redis_url
self.stale_game_timeout_sec = self._get_stale_game_timeout_sec() self.stale_game_timeout_sec = self._get_stale_game_timeout_sec()
self.running_games:dict[str, GameBoard] = {} self.running_games: dict[str, GameBoard] = {}
self.game_move_counts:dict[str, int] = {} self.game_move_counts: dict[str, int] = {}
self.game_last_seen_unix:dict[str, int] = {} self.game_last_seen_unix: dict[str, int] = {}
self.dashboard_game_subscribers:set[asyncio.Queue[str]] = set() self.dashboard_ws_hub = DashboardWebSocketHub()
self.dashboard_game_subscribers_lock = asyncio.Lock() dashboard_event_origin = f'worker-{os.getpid()}-{int(time.time() * 1000)}'
self.dashboard_ws_tasks:set[asyncio.Task] = set() dashboard_events_channel = os.getenv('DASHBOARD_EVENTS_CHANNEL', 'snake:dashboard:events')
self.dashboard_ws_tasks_lock = asyncio.Lock() dashboard_events_enabled = (self.metrics_backend_normalized == 'redis' and self._env_bool('DASHBOARD_EVENTS_ENABLED', True))
self.dashboard_ws_shutdown_event = asyncio.Event()
self.dashboard_ws_shutdown_message = json.dumps({'type': 'dashboard_ws_shutdown'})
self.dashboard_event_origin = f'worker-{os.getpid()}-{int(time.time() * 1000)}'
self.dashboard_events_channel = os.getenv('DASHBOARD_EVENTS_CHANNEL', 'snake:dashboard:events')
self.dashboard_events_enabled = (self.metrics_backend_normalized == 'redis' and self._env_bool('DASHBOARD_EVENTS_ENABLED', True))
self.dashboard_events_listener_task:asyncio.Task | None = None
self.dashboard_events_redis = None
self.dashboard_events_pubsub = None
self.metrics_collector = MetricsCollector( self.metrics_collector = MetricsCollector(
metrics_manager=MetricsStoreBuilder.build( metrics_manager=MetricsStoreBuilder.build(
@@ -93,6 +86,15 @@ class Server:
self._startup_worker_metrics_cleared = False self._startup_worker_metrics_cleared = False
self.logger = build_logger('Battlesnake', debug_env_var='DEBUG_SERVER') self.logger = build_logger('Battlesnake', debug_env_var='DEBUG_SERVER')
self.dashboard_events_service = DashboardEventsService(
enabled=dashboard_events_enabled,
redis_url=self.metrics_redis_url,
channel=dashboard_events_channel,
event_origin=dashboard_event_origin,
shutdown_event=self.dashboard_ws_hub.shutdown_event,
on_notice=self._on_dashboard_games_update_notice,
logger=self.logger,
)
self.snake_version = self._get_snake_version() self.snake_version = self._get_snake_version()
self.gameplay_database = None self.gameplay_database = None
if gameplay_db_enabled: if gameplay_db_enabled:
@@ -104,74 +106,9 @@ class Server:
self.app = Quart('Battlesnake', template_folder=os.path.join(data_path, 'server', 'templates')) self.app = Quart('Battlesnake', template_folder=os.path.join(data_path, 'server', 'templates'))
# info is called when you create your Battlesnake on play.battlesnake.com self.app.register_blueprint(create_battlesnake_blueprint(self))
# and controls your Battlesnake's appearance self.app.register_blueprint(create_metrics_blueprint(self))
# TIP: If you open your Battlesnake URL in a browser you should see this data self.app.register_blueprint(create_dashboard_blueprint(self))
@self.app.get('/')
async def on_info():
self.metrics_collector.record_http_request('info')
snake_config = await self._read_json_config_or_create()
await await_log(self.logger.info(f'INFO Snake: {snake_config}'))
return snake_config
# start is called when your Battlesnake begins a game
@self.app.post('/start')
async def on_start():
self.metrics_collector.record_http_request('start')
await self._prune_stale_games()
game_state = await request.get_json()
await self._create_game_board(game_state)
await self._record_gameplay_start(game_state)
await await_log(self.logger.info(f'GAME START: {game_state['game']}'))
return 'ok'
# move is called when your Battlesnake game is running game
@self.app.post('/move')
async def on_move():
self.metrics_collector.record_http_request('move')
game_state = await request.get_json()
move_started = time.perf_counter()
game_board = cast(GameBoard, await self._get_game_board(game_state))
next_move = game_board.snake_neat_make_a_move()
await self._persist_game_board(game_state['game']['id'], game_board)
await self._record_gameplay_turn(game_state, next_move, game_board)
elapsed_ms = (time.perf_counter() - move_started) * 1000.0
await self.metrics_collector.record_move(next_move, elapsed_ms)
if self.debug:
await await_log(self.logger.debug(f'TURN: {game_state['turn']:3}, MOVE: {next_move:5}'))
return {'move': next_move}
# end is called when your Battlesnake finishes a game
@self.app.post('/end')
async def on_end():
self.metrics_collector.record_http_request('end')
await self._prune_stale_games()
game_state = await request.get_json()
if self.store_game_state:
game_board = cast(GameBoard, await self._get_game_board(game_state, end=True))
if self.check_tls_security:
await game_board.save(
StorageLoader.build(self.storage_type),
file_path=os.path.join(self.data_path, 'data'),
database=os.getenv('EDGEDB_DATABASE', None),
tls_security=None,
)
else:
await game_board.save(
StorageLoader.build(self.storage_type),
file_path=os.path.join(self.data_path, 'data'),
database=os.getenv('EDGEDB_DATABASE', None),
)
await self._record_gameplay_end(game_state)
await self._push_dashboard_games_update(game_state)
await await_log(self.logger.info(f'GAME ENDED: Winner is {[x['name'] for x in game_state['board']['snakes']]}'))
await self._delete_game_board(game_state)
await self.metrics_collector.record_game_end(game_state)
return 'ok'
@self.app.after_request @self.app.after_request
async def identify_server(response): async def identify_server(response):
@@ -187,135 +124,22 @@ class Server:
should_clear = await self.metrics_collector.should_clear_worker_metrics_on_startup(self.worker_metrics_startup_lock_ttl_sec) should_clear = await self.metrics_collector.should_clear_worker_metrics_on_startup(self.worker_metrics_startup_lock_ttl_sec)
if should_clear: if should_clear:
await self.metrics_collector.clear_worker_metrics() await self.metrics_collector.clear_worker_metrics()
await self._start_dashboard_events_listener() await self.dashboard_events_service.start_listener()
@self.app.after_serving @self.app.after_serving
async def shutdown_state_storage(): async def shutdown_state_storage():
await self._stop_dashboard_events_listener() await self.dashboard_events_service.stop_listener()
await self.game_state_store.close() await self.game_state_store.close()
await self.metrics_collector.close() await self.metrics_collector.close()
if self.gameplay_database is not None: if self.gameplay_database is not None:
await self.gameplay_database.close() await self.gameplay_database.close()
@self.app.get('/cleanup')
async def cleanup():
results = self._cleanup_database()
return jsonify(data=json.loads(results), status=200)
@self.app.get('/metrics')
async def metrics():
snapshot = await self.metrics_collector.build_snapshot(self.game_last_seen_unix, self.game_move_counts)
return jsonify(snapshot)
@self.app.get('/metrics/prometheus')
async def metrics_prometheus():
snapshot = await self.metrics_collector.build_snapshot(self.game_last_seen_unix, self.game_move_counts)
return (
self.metrics_collector.build_prometheus_metrics(snapshot),
200,
{'Content-Type': 'text/plain; version=0.0.4; charset=utf-8'},
)
@self.app.get('/dashboard')
async def dashboard_view():
initial_game_id = request.args.get('game_id', '')
initial_summary = await self._get_dashboard_summary()
initial_games = await self._get_dashboard_games(limit=100)
return await render_template(
'dashboard.html',
initial_game_id=initial_game_id,
initial_summary=initial_summary,
initial_games=initial_games,
)
@self.app.get('/dashboard/customizations/<path:asset_path>')
async def dashboard_customizations_asset(asset_path: str):
customization_root = os.path.join(self.data_path, 'server', 'static', 'customizations')
return await send_from_directory(customization_root, asset_path)
@self.app.websocket('/dashboard/ws/games')
async def dashboard_games_ws():
websocket_task = asyncio.current_task()
if websocket_task is not None:
await self._register_dashboard_ws_task(websocket_task)
subscriber_queue: asyncio.Queue[str] = asyncio.Queue(maxsize=20)
await self._register_dashboard_game_subscriber(subscriber_queue)
try:
initial_payload = await self._build_dashboard_games_event()
await asyncio.wait_for(websocket.send(json.dumps(initial_payload)), timeout=1.5)
while True:
queue_task = asyncio.create_task(subscriber_queue.get())
receive_task = asyncio.create_task(websocket.receive())
try:
done, _ = await asyncio.wait(
{queue_task, receive_task},
timeout=1.0,
return_when=asyncio.FIRST_COMPLETED,
)
if len(done) == 0:
if self.dashboard_ws_shutdown_event.is_set():
await asyncio.wait_for(
websocket.send(self.dashboard_ws_shutdown_message),
timeout=1.5,
)
break
continue
if receive_task in done:
try:
request_payload_raw = receive_task.result()
except Exception:
break
response_event = await self._handle_dashboard_ws_request(
request_payload_raw
)
if response_event is not None:
await asyncio.wait_for(
websocket.send(json.dumps(response_event)),
timeout=1.5,
)
if queue_task in done:
event_payload = queue_task.result()
if event_payload == self.dashboard_ws_shutdown_message:
await asyncio.wait_for(
websocket.send(event_payload), timeout=1.5
)
break
await asyncio.wait_for(
websocket.send(event_payload), timeout=1.5
)
except asyncio.TimeoutError:
if self.dashboard_ws_shutdown_event.is_set():
await asyncio.wait_for(
websocket.send(self.dashboard_ws_shutdown_message),
timeout=1.5,
)
break
finally:
for pending_task in (queue_task, receive_task):
if not pending_task.done():
pending_task.cancel()
await asyncio.gather(
queue_task, receive_task, return_exceptions=True
)
except asyncio.CancelledError:
pass
except Exception:
pass
finally:
await self._unregister_dashboard_game_subscriber(subscriber_queue)
if websocket_task is not None:
await self._unregister_dashboard_ws_task(websocket_task)
async def run(self, host:str='0.0.0.0', port:int=8000, debug:bool=False): async def run(self, host:str='0.0.0.0', port:int=8000, debug:bool=False):
logging.getLogger('werkzeug').setLevel(logging.ERROR) logging.getLogger('werkzeug').setLevel(logging.ERROR)
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
shutdown_event = asyncio.Event()
installed_signal_handlers:list[signal.Signals] = [] installed_signal_handlers:list[signal.Signals] = []
shutdown_event = asyncio.Event()
def on_shutdown_signal() -> None: def on_shutdown_signal() -> None:
self._request_dashboard_ws_shutdown() self._request_dashboard_ws_shutdown()
@@ -392,7 +216,7 @@ class Server:
return default return default
return value.strip().lower() in {'1', 'true', 'yes', 'on'} return value.strip().lower() in {'1', 'true', 'yes', 'on'}
def _env_int(self, name: str, default: int) -> int: def _env_int(self, name:str, default:int) -> int:
value = os.getenv(name) value = os.getenv(name)
if value is None: if value is None:
return default return default
@@ -496,7 +320,7 @@ class Server:
except Exception as error: except Exception as error:
await await_log(self.logger.warning(f'Gameplay DB start record failed:{error}')) await await_log(self.logger.warning(f'Gameplay DB start record failed:{error}'))
def _extract_latest_snake_thinking(self, game_board:GameBoard) -> dict | None: def _extract_latest_snake_thinking(self, game_board:GameBoard) -> dict|None:
try: try:
history = game_board.snake_class.get_history() history = game_board.snake_class.get_history()
except Exception: except Exception:
@@ -524,175 +348,29 @@ class Server:
await await_log(self.logger.warning(f'Gameplay DB end record failed:{error}')) await await_log(self.logger.warning(f'Gameplay DB end record failed:{error}'))
async def _register_dashboard_game_subscriber(self, subscriber_queue:asyncio.Queue[str]) -> None: async def _register_dashboard_game_subscriber(self, subscriber_queue:asyncio.Queue[str]) -> None:
async with self.dashboard_game_subscribers_lock: await self.dashboard_ws_hub.register_subscriber(subscriber_queue)
self.dashboard_game_subscribers.add(subscriber_queue)
async def _unregister_dashboard_game_subscriber(self, subscriber_queue:asyncio.Queue[str]) -> None: async def _unregister_dashboard_game_subscriber(self, subscriber_queue:asyncio.Queue[str]) -> None:
async with self.dashboard_game_subscribers_lock: await self.dashboard_ws_hub.unregister_subscriber(subscriber_queue)
self.dashboard_game_subscribers.discard(subscriber_queue)
async def _broadcast_dashboard_game_event(self, payload:dict) -> None: async def _broadcast_dashboard_game_event(self, payload:dict) -> None:
encoded_payload = json.dumps(payload) await self.dashboard_ws_hub.broadcast_payload(payload)
async with self.dashboard_game_subscribers_lock:
subscribers = tuple(self.dashboard_game_subscribers)
for subscriber_queue in subscribers:
if subscriber_queue.full():
try:
subscriber_queue.get_nowait()
except asyncio.QueueEmpty:
pass
try:
subscriber_queue.put_nowait(encoded_payload)
except asyncio.QueueFull:
continue
async def _register_dashboard_ws_task(self, websocket_task:asyncio.Task) -> None: async def _register_dashboard_ws_task(self, websocket_task:asyncio.Task) -> None:
async with self.dashboard_ws_tasks_lock: await self.dashboard_ws_hub.register_task(websocket_task)
self.dashboard_ws_tasks.add(websocket_task)
async def _unregister_dashboard_ws_task(self, websocket_task:asyncio.Task) -> None: async def _unregister_dashboard_ws_task(self, websocket_task:asyncio.Task) -> None:
async with self.dashboard_ws_tasks_lock: await self.dashboard_ws_hub.unregister_task(websocket_task)
self.dashboard_ws_tasks.discard(websocket_task)
def _request_dashboard_ws_shutdown(self) -> None: def _request_dashboard_ws_shutdown(self) -> None:
if self.dashboard_ws_shutdown_event.is_set(): self.dashboard_ws_hub.request_shutdown()
return
self.dashboard_ws_shutdown_event.set() async def _on_dashboard_games_update_notice(self, trigger:str) -> None:
for subscriber_queue in tuple(self.dashboard_game_subscribers): await self._push_dashboard_games_update(
if subscriber_queue.full(): game_state=None,
try: publish_cluster=False,
subscriber_queue.get_nowait() trigger=trigger,
except asyncio.QueueEmpty: )
pass
try:
subscriber_queue.put_nowait(self.dashboard_ws_shutdown_message)
except asyncio.QueueFull:
continue
async def _start_dashboard_events_listener(self) -> None:
if not self.dashboard_events_enabled:
return
if self.dashboard_events_listener_task is not None:
return
try:
import redis.asyncio as aioredis # type: ignore[import-not-found]
self.dashboard_events_redis = aioredis.from_url(self.metrics_redis_url)
self.dashboard_events_pubsub = self.dashboard_events_redis.pubsub()
await self.dashboard_events_pubsub.subscribe(self.dashboard_events_channel)
self.dashboard_events_listener_task = asyncio.create_task(
self._dashboard_events_listener_loop()
)
except Exception as error:
self.dashboard_events_listener_task = None
self.dashboard_events_pubsub = None
self.dashboard_events_redis = None
await await_log(self.logger.warning(f'Dashboard events listener disabled (redis unavailable): {error}'))
async def _stop_dashboard_events_listener(self) -> None:
listener_task = self.dashboard_events_listener_task
self.dashboard_events_listener_task = None
if listener_task is not None:
listener_task.cancel()
await asyncio.gather(listener_task, return_exceptions=True)
pubsub = self.dashboard_events_pubsub
self.dashboard_events_pubsub = None
if pubsub is not None:
try:
await pubsub.unsubscribe(self.dashboard_events_channel)
except Exception:
pass
close_method = getattr(pubsub, 'aclose', None)
if callable(close_method):
try:
maybe_result = close_method()
if inspect.isawaitable(maybe_result):
await maybe_result
except Exception:
pass
redis_client = self.dashboard_events_redis
self.dashboard_events_redis = None
if redis_client is not None:
close_method = getattr(redis_client, 'aclose', None)
if callable(close_method):
try:
maybe_result = close_method()
if inspect.isawaitable(maybe_result):
await maybe_result
except Exception:
pass
async def _dashboard_events_listener_loop(self) -> None:
pubsub = self.dashboard_events_pubsub
if pubsub is None:
return
try:
while not self.dashboard_ws_shutdown_event.is_set():
message = await pubsub.get_message(
ignore_subscribe_messages=True,
timeout=1.0,
)
if message is None:
continue
raw_data = message.get('data')
if isinstance(raw_data, bytes):
payload_raw = raw_data.decode('utf-8', errors='replace')
else:
payload_raw = str(raw_data)
try:
payload = json.loads(payload_raw)
except json.JSONDecodeError:
continue
if not isinstance(payload, dict):
continue
if payload.get('type') != 'dashboard_games_update_notice':
continue
if payload.get('origin') == self.dashboard_event_origin:
continue
notice_trigger = str(payload.get('trigger') or 'game_saved')
await self._push_dashboard_games_update(
game_state=None,
publish_cluster=False,
trigger=notice_trigger,
)
except asyncio.CancelledError:
pass
except Exception as error:
await await_log(
self.logger.warning(f'Dashboard events listener stopped unexpectedly: {error}'))
async def _publish_dashboard_games_update_notice(self, trigger: str) -> None:
if not self.dashboard_events_enabled:
return
if self.dashboard_events_redis is None:
return
if trigger not in {'game_saved', 'stale_finalized', 'manual'}:
return
message = {
'type': 'dashboard_games_update_notice',
'origin': self.dashboard_event_origin,
'trigger': trigger,
'sent_at': int(time.time()),
}
try:
await self.dashboard_events_redis.publish(
self.dashboard_events_channel,
json.dumps(message),
)
except Exception as error:
await await_log(self.logger.warning(f'Dashboard events publish failed: {error}'))
async def _build_dashboard_games_event(self, game_state:dict|None=None, trigger_override:str|None=None) -> dict: async def _build_dashboard_games_event(self, game_state:dict|None=None, trigger_override:str|None=None) -> dict:
games_payload = await self._get_dashboard_games(limit=100) games_payload = await self._get_dashboard_games(limit=100)
@@ -726,7 +404,7 @@ class Server:
'replay': replay_payload, 'replay': replay_payload,
} }
async def _handle_dashboard_ws_request(self, payload_raw: object) -> dict | None: async def _handle_dashboard_ws_request(self, payload_raw:object) -> dict|None:
if not isinstance(payload_raw, str): if not isinstance(payload_raw, str):
return None return None
@@ -765,7 +443,7 @@ class Server:
) )
await self._broadcast_dashboard_game_event(event_payload) await self._broadcast_dashboard_game_event(event_payload)
if publish_cluster: if publish_cluster:
await self._publish_dashboard_games_update_notice(trigger=str(event_payload.get('trigger') or '')) await self.dashboard_events_service.publish_notice(trigger=str(event_payload.get('trigger') or ''))
async def _get_dashboard_summary(self) -> dict: async def _get_dashboard_summary(self) -> dict:
if self.gameplay_database is None: if self.gameplay_database is None:
+3
View File
@@ -0,0 +1,3 @@
from .battlesnake import create_battlesnake_blueprint
from .metrics import create_metrics_blueprint
from .dashboard import create_dashboard_blueprint
+83
View File
@@ -0,0 +1,83 @@
from typing import TYPE_CHECKING, cast
import json, time, os
from quart import Blueprint, request, jsonify
from quart_common.web.logger import await_log
from server.storage import StorageLoader
from server.GameBoard import GameBoard
if TYPE_CHECKING:
from server.Server import Server
def create_battlesnake_blueprint(server:'Server') -> Blueprint:
blueprint = Blueprint('battlesnake', __name__)
@blueprint.get('/')
async def on_info():
server.metrics_collector.record_http_request('info')
snake_config = await server._read_json_config_or_create()
await await_log(server.logger.info(f'INFO Snake: {snake_config}'))
return snake_config
@blueprint.post('/start')
async def on_start():
server.metrics_collector.record_http_request('start')
await server._prune_stale_games()
game_state = await request.get_json()
await server._create_game_board(game_state)
await server._record_gameplay_start(game_state)
await await_log(server.logger.info(f'GAME START: {game_state['game']}'))
return 'ok'
@blueprint.post('/move')
async def on_move():
server.metrics_collector.record_http_request('move')
game_state = await request.get_json()
move_started = time.perf_counter()
game_board = cast(GameBoard, await server._get_game_board(game_state))
next_move = game_board.snake_neat_make_a_move()
await server._persist_game_board(game_state['game']['id'], game_board)
await server._record_gameplay_turn(game_state, next_move, game_board)
elapsed_ms = (time.perf_counter() - move_started) * 1000.0
await server.metrics_collector.record_move(next_move, elapsed_ms)
if server.debug:
await await_log(server.logger.debug(f'TURN: {game_state['turn']:3}, MOVE: {next_move:5}'))
return {'move': next_move}
@blueprint.post('/end')
async def on_end():
server.metrics_collector.record_http_request('end')
await server._prune_stale_games()
game_state = await request.get_json()
if server.store_game_state:
game_board = cast(GameBoard, await server._get_game_board(game_state, end=True))
if server.check_tls_security:
await game_board.save(
StorageLoader.build(server.storage_type),
file_path=os.path.join(server.data_path, 'data'),
database=os.getenv('EDGEDB_DATABASE', None),
tls_security=None,
)
else:
await game_board.save(
StorageLoader.build(server.storage_type),
file_path=os.path.join(server.data_path, 'data'),
database=os.getenv('EDGEDB_DATABASE', None),
)
await server._record_gameplay_end(game_state)
await server._push_dashboard_games_update(game_state)
await await_log(server.logger.info(f'GAME ENDED: Winner is {[x['name'] for x in game_state['board']['snakes']]}'))
await server._delete_game_board(game_state)
await server.metrics_collector.record_game_end(game_state)
return 'ok'
@blueprint.get("/cleanup")
async def cleanup():
results = server._cleanup_database()
return jsonify(data=json.loads(results), status=200)
return blueprint
+119
View File
@@ -0,0 +1,119 @@
from typing import TYPE_CHECKING
import asyncio, json, os
from quart import (
Blueprint,
render_template,
send_from_directory,
request,
websocket,
)
if TYPE_CHECKING:
from server.Server import Server
def create_dashboard_blueprint(server:'Server') -> Blueprint:
blueprint = Blueprint('dashboard', __name__)
@blueprint.get('/dashboard')
async def dashboard_view():
initial_game_id = request.args.get('game_id', '')
initial_summary = await server._get_dashboard_summary()
initial_games = await server._get_dashboard_games(limit=100)
return await render_template(
'dashboard.html',
initial_game_id=initial_game_id,
initial_summary=initial_summary,
initial_games=initial_games,
)
@blueprint.get('/dashboard/customizations/<path:asset_path>')
async def dashboard_customizations_asset(asset_path:str):
customization_root = os.path.join(
server.data_path,
'server',
'static',
'customizations',
)
return await send_from_directory(customization_root, asset_path)
@blueprint.websocket('/dashboard/ws/games')
async def dashboard_games_ws():
ws_hub = server.dashboard_ws_hub
websocket_task = asyncio.current_task()
if websocket_task is not None:
await server._register_dashboard_ws_task(websocket_task)
subscriber_queue:asyncio.Queue[str] = asyncio.Queue(maxsize=20)
await server._register_dashboard_game_subscriber(subscriber_queue)
try:
initial_payload = await server._build_dashboard_games_event()
await asyncio.wait_for(
websocket.send(json.dumps(initial_payload)), timeout=1.5
)
while True:
queue_task = asyncio.create_task(subscriber_queue.get())
receive_task = asyncio.create_task(websocket.receive())
try:
done, _ = await asyncio.wait(
{queue_task, receive_task},
timeout=1.0,
return_when=asyncio.FIRST_COMPLETED,
)
if len(done) == 0:
if ws_hub.shutdown_event.is_set():
await asyncio.wait_for(
websocket.send(ws_hub.shutdown_message),
timeout=1.5,
)
break
continue
if receive_task in done:
try:
request_payload_raw = receive_task.result()
except Exception:
break
response_event = await server._handle_dashboard_ws_request(request_payload_raw)
if response_event is not None:
await asyncio.wait_for(
websocket.send(json.dumps(response_event)),
timeout=1.5,
)
if queue_task in done:
event_payload = queue_task.result()
if event_payload == ws_hub.shutdown_message:
await asyncio.wait_for(
websocket.send(event_payload), timeout=1.5
)
break
await asyncio.wait_for(
websocket.send(event_payload), timeout=1.5
)
except asyncio.TimeoutError:
if ws_hub.shutdown_event.is_set():
await asyncio.wait_for(
websocket.send(ws_hub.shutdown_message),
timeout=1.5,
)
break
finally:
for pending_task in (queue_task, receive_task):
if not pending_task.done():
pending_task.cancel()
await asyncio.gather(
queue_task, receive_task, return_exceptions=True
)
except asyncio.CancelledError:
pass
except Exception:
pass
finally:
await server._unregister_dashboard_game_subscriber(subscriber_queue)
if websocket_task is not None:
await server._unregister_dashboard_ws_task(websocket_task)
return blueprint
+30
View File
@@ -0,0 +1,30 @@
from quart import Blueprint, jsonify
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from server.Server import Server
def create_metrics_blueprint(server:"Server") -> Blueprint:
blueprint = Blueprint("metrics", __name__)
@blueprint.get("/metrics")
async def metrics():
snapshot = await server.metrics_collector.build_snapshot(
server.game_last_seen_unix,
server.game_move_counts,
)
return jsonify(snapshot)
@blueprint.get("/metrics/prometheus")
async def metrics_prometheus():
snapshot = await server.metrics_collector.build_snapshot(
server.game_last_seen_unix,
server.game_move_counts,
)
return (
server.metrics_collector.build_prometheus_metrics(snapshot),
200,
{"Content-Type": "text/plain; version=0.0.4; charset=utf-8"},
)
return blueprint
+3 -2
View File
@@ -17,15 +17,16 @@ def env_bool(name:str, default:bool=False) -> bool:
def build_server_from_env(default_snake_type:str) -> Server: def build_server_from_env(default_snake_type:str) -> Server:
data_path = str(Path(__file__).resolve().parent.parent) data_path = str(Path(__file__).resolve().parent.parent)
redis_url = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
game_state_backend = os.environ.get('GAME_STATE_BACKEND', 'memory') game_state_backend = os.environ.get('GAME_STATE_BACKEND', 'memory')
game_state_redis_url = os.environ.get('GAME_STATE_REDIS_URL', 'redis://localhost:6379/0') game_state_redis_url = os.environ.get('GAME_STATE_REDIS_URL', redis_url)
game_state_ttl_sec = int(os.environ.get('GAME_STATE_TTL_SEC', '900')) game_state_ttl_sec = int(os.environ.get('GAME_STATE_TTL_SEC', '900'))
metrics_backend = os.environ.get('METRICS_BACKEND', None) metrics_backend = os.environ.get('METRICS_BACKEND', None)
if metrics_backend is None: if metrics_backend is None:
metrics_backend = ('redis' if game_state_backend.strip().lower() == 'redis' else 'memory') metrics_backend = ('redis' if game_state_backend.strip().lower() == 'redis' else 'memory')
metrics_redis_url = os.environ.get('METRICS_REDIS_URL', game_state_redis_url) metrics_redis_url = os.environ.get('METRICS_REDIS_URL', redis_url)
metrics_ttl_sec_raw = os.environ.get('METRICS_TTL_SEC', None) metrics_ttl_sec_raw = os.environ.get('METRICS_TTL_SEC', None)
if metrics_ttl_sec_raw is None: if metrics_ttl_sec_raw is None:
metrics_ttl_sec = (game_state_ttl_sec if metrics_backend.strip().lower() == 'redis' else None) metrics_ttl_sec = (game_state_ttl_sec if metrics_backend.strip().lower() == 'redis' else None)
+2
View File
@@ -0,0 +1,2 @@
from .dashboard_events import DashboardEventsService
from .dashboard_ws_hub import DashboardWebSocketHub
+127
View File
@@ -0,0 +1,127 @@
from quart_common.web.logger import await_log
from typing import Awaitable, Callable
import asyncio, inspect, json, time
class DashboardEventsService:
def __init__(self, enabled:bool, redis_url:str, channel:str, event_origin:str, shutdown_event:asyncio.Event, on_notice:Callable[[str], Awaitable[None]], logger):
self.enabled = enabled
self.redis_url = redis_url
self.channel = channel
self.event_origin = event_origin
self.shutdown_event = shutdown_event
self.on_notice = on_notice
self.logger = logger
self.listener_task:asyncio.Task|None=None
self.redis = None
self.pubsub = None
async def start_listener(self) -> None:
if not self.enabled:
return
if self.listener_task is not None:
return
try:
import redis.asyncio as aioredis # type: ignore[import-not-found]
self.redis = aioredis.from_url(self.redis_url)
self.pubsub = self.redis.pubsub()
await self.pubsub.subscribe(self.channel)
self.listener_task = asyncio.create_task(self._listener_loop())
except Exception as error:
self.listener_task = None
self.pubsub = None
self.redis = None
await await_log(self.logger.warning(f'Dashboard events listener disabled (redis unavailable): {error}'))
async def stop_listener(self) -> None:
listener_task = self.listener_task
self.listener_task = None
if listener_task is not None:
listener_task.cancel()
await asyncio.gather(listener_task, return_exceptions=True)
pubsub = self.pubsub
self.pubsub = None
if pubsub is not None:
try:
await pubsub.unsubscribe(self.channel)
except Exception:
pass
close_method = getattr(pubsub, 'aclose', None)
if callable(close_method):
try:
maybe_result = close_method()
if inspect.isawaitable(maybe_result):
await maybe_result
except Exception:
pass
redis_client = self.redis
self.redis = None
if redis_client is not None:
close_method = getattr(redis_client, 'aclose', None)
if callable(close_method):
try:
maybe_result = close_method()
if inspect.isawaitable(maybe_result):
await maybe_result
except Exception:
pass
async def publish_notice(self, trigger:str) -> None:
if not self.enabled:
return
if self.redis is None:
return
if trigger not in {'game_saved', 'stale_finalized', 'manual'}:
return
message = {
'type': 'dashboard_games_update_notice',
'origin': self.event_origin,
'trigger': trigger,
'sent_at': int(time.time()),
}
try:
await self.redis.publish(self.channel, json.dumps(message))
except Exception as error:
await await_log(self.logger.warning(f'Dashboard events publish failed: {error}'))
async def _listener_loop(self) -> None:
pubsub = self.pubsub
if pubsub is None:
return
try:
while not self.shutdown_event.is_set():
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
if message is None:
continue
raw_data = message.get('data')
if isinstance(raw_data, bytes):
payload_raw = raw_data.decode('utf-8', errors='replace')
else:
payload_raw = str(raw_data)
try:
payload = json.loads(payload_raw)
except json.JSONDecodeError:
continue
if not isinstance(payload, dict):
continue
if payload.get('type') != 'dashboard_games_update_notice':
continue
if payload.get('origin') == self.event_origin:
continue
notice_trigger = str(payload.get('trigger') or 'game_saved')
await self.on_notice(notice_trigger)
except asyncio.CancelledError:
pass
except Exception as error:
await await_log(self.logger.warning(f'Dashboard events listener stopped unexpectedly: {error}'))
+61
View File
@@ -0,0 +1,61 @@
import asyncio, json
class DashboardWebSocketHub:
def __init__(self):
self.subscribers:set[asyncio.Queue[str]] = set()
self.subscribers_lock = asyncio.Lock()
self.ws_tasks:set[asyncio.Task] = set()
self.ws_tasks_lock = asyncio.Lock()
self.shutdown_event = asyncio.Event()
self.shutdown_message = json.dumps({"type": "dashboard_ws_shutdown"})
async def register_subscriber(self, subscriber_queue:asyncio.Queue[str]) -> None:
async with self.subscribers_lock:
self.subscribers.add(subscriber_queue)
async def unregister_subscriber(self, subscriber_queue:asyncio.Queue[str]) -> None:
async with self.subscribers_lock:
self.subscribers.discard(subscriber_queue)
async def register_task(self, websocket_task:asyncio.Task) -> None:
async with self.ws_tasks_lock:
self.ws_tasks.add(websocket_task)
async def unregister_task(self, websocket_task:asyncio.Task) -> None:
async with self.ws_tasks_lock:
self.ws_tasks.discard(websocket_task)
async def broadcast_payload(self, payload:dict) -> None:
encoded_payload = json.dumps(payload)
async with self.subscribers_lock:
subscribers = tuple(self.subscribers)
for subscriber_queue in subscribers:
if subscriber_queue.full():
try:
subscriber_queue.get_nowait()
except asyncio.QueueEmpty:
pass
try:
subscriber_queue.put_nowait(encoded_payload)
except asyncio.QueueFull:
continue
def request_shutdown(self) -> None:
if self.shutdown_event.is_set():
return
self.shutdown_event.set()
for subscriber_queue in tuple(self.subscribers):
if subscriber_queue.full():
try:
subscriber_queue.get_nowait()
except asyncio.QueueEmpty:
pass
try:
subscriber_queue.put_nowait(self.shutdown_message)
except asyncio.QueueFull:
continue