update Dashboard with new keyboard bindings fix errors with hazards, set win or loss on stale games (older) and push new games real time over the websocket connection when they are finished
Build and Push Docker Container / build-and-push (push) Failing after 12m39s

This commit is contained in:
2026-04-06 00:36:06 +02:00
parent e7d0227cf9
commit 4626a491f5
3 changed files with 417 additions and 44 deletions
+108 -2
View File
@@ -102,7 +102,21 @@ class GameplayDatabase:
def _utc_now(self) -> str:
return datetime.now(timezone.utc).isoformat()
def _to_json(self, payload:dict) -> str:
def _parse_utc_timestamp(self, value:str|None) -> datetime|None:
if not value:
return None
normalized = value.strip()
if normalized.endswith("Z"):
normalized = normalized[:-1] + "+00:00"
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return None
if parsed.tzinfo is None:
return parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
def _to_json(self, payload:object) -> str:
return json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
def _from_json(self, payload:str|None):
@@ -331,6 +345,95 @@ class GameplayDatabase:
),
)
def _finalize_stale_running_games_sync(self, stale_after_seconds:int=600) -> int:
threshold = max(60, int(stale_after_seconds))
now_utc = datetime.now(timezone.utc)
finalized = 0
with self._connect() as connection:
rows = connection.execute("""
SELECT game_id, started_at, final_turn, your_snake_id
FROM games
WHERE status = 'running'
ORDER BY started_at ASC
""").fetchall()
for row in rows:
started_at = self._parse_utc_timestamp(row["started_at"])
if started_at is None:
continue
age_seconds = (now_utc - started_at).total_seconds()
if age_seconds < threshold:
continue
game_id = row["game_id"]
your_snake_id = row["your_snake_id"]
final_turn = int(row["final_turn"] or 0)
snake_rows = connection.execute("""
SELECT snake_id, snake_name
FROM snake_turns
WHERE game_id = ? AND turn = ?
ORDER BY is_you DESC, snake_name ASC
""",
(game_id, final_turn),
).fetchall()
if len(snake_rows) == 0:
latest_turn_row = connection.execute("""
SELECT MAX(turn) AS latest_turn
FROM snake_turns
WHERE game_id = ?
""",
(game_id,),
).fetchone()
latest_turn = (
latest_turn_row["latest_turn"]
if latest_turn_row is not None
else None
)
if latest_turn is not None:
final_turn = int(latest_turn)
snake_rows = connection.execute("""
SELECT snake_id, snake_name
FROM snake_turns
WHERE game_id = ? AND turn = ?
ORDER BY is_you DESC, snake_name ASC
""",
(game_id, final_turn),
).fetchall()
survivor_ids = [snake["snake_id"] for snake in snake_rows if snake["snake_id"]]
survivor_names = [snake["snake_name"] for snake in snake_rows if snake["snake_name"]]
winner_you = bool(
your_snake_id
and your_snake_id in survivor_ids
and len(survivor_ids) == 1
)
update_result = connection.execute("""
UPDATE games
SET ended_at = ?,
winner_names_json = ?,
winner_you = ?,
final_turn = CASE WHEN ? > final_turn THEN ? ELSE final_turn END,
status = 'finished'
WHERE game_id = ? AND status = 'running'
""",
(
self._utc_now(),
self._to_json(survivor_names),
1 if winner_you else 0,
final_turn,
final_turn,
game_id,
),
)
if update_result.rowcount > 0:
finalized += 1
return finalized
def _get_summary_sync(self, recent_limit:int=15) -> dict:
with self._connect() as connection:
totals = connection.execute("""
@@ -518,7 +621,7 @@ class GameplayDatabase:
"turns": replay_turns,
}
async def record_game_start(self, game_state: dict, snake_type:str|None=None, snake_version:str|None=None) -> None:
async def record_game_start(self, game_state:dict, snake_type:str|None=None, snake_version:str|None=None) -> None:
await asyncio.to_thread(self._record_game_start_sync, game_state, snake_type, snake_version)
async def record_turn(self, game_state:dict, my_move:str|None, my_thinking:dict|None=None) -> None:
@@ -533,6 +636,9 @@ class GameplayDatabase:
async def list_games(self, limit:int=50) -> list[dict]:
return await asyncio.to_thread(self._list_games_sync, limit)
async def finalize_stale_running_games(self, stale_after_seconds:int=600) -> int:
return await asyncio.to_thread(self._finalize_stale_running_games_sync, stale_after_seconds)
async def get_game_replay(self, game_id:str) -> dict|None:
return await asyncio.to_thread(self._get_game_replay_sync, game_id)