test: expand browser command coverage
- Add mocked Click CLI tests across DOM, cookies, page, storage, perf, navigation, tabs, groups, sessions, and windows commands. - Cover integration paths for cookies, page info, performance profiles, and web storage commands. - Extend DOM integration coverage for eval, scrolling, waiting, focus, hover, typing, selection, keyboard, and checkbox interactions. - Add native host unit coverage for message framing, socket helpers, paging guards, timeouts, and profile alias resolution.
This commit is contained in:
+221
-9
@@ -6,11 +6,9 @@ import pytest
|
||||
|
||||
import browser_cli.native_host as native_host
|
||||
|
||||
|
||||
def _raise_system_exit(code: int):
|
||||
raise SystemExit(code)
|
||||
|
||||
|
||||
def test_cleanup_removes_socket_and_registry_entry(monkeypatch, tmp_path):
|
||||
alias = "work"
|
||||
socket_path = tmp_path / "work.sock"
|
||||
@@ -27,7 +25,6 @@ def test_cleanup_removes_socket_and_registry_entry(monkeypatch, tmp_path):
|
||||
assert not socket_path.exists()
|
||||
assert json.loads(registry_path.read_text()) == {"other": str(tmp_path / "other.sock")}
|
||||
|
||||
|
||||
def test_stdin_reader_cleans_up_on_eof(monkeypatch):
|
||||
cleaned = []
|
||||
|
||||
@@ -41,7 +38,6 @@ def test_stdin_reader_cleans_up_on_eof(monkeypatch):
|
||||
|
||||
assert cleaned == ["work"]
|
||||
|
||||
|
||||
def test_cleanup_windows_skips_socket_unlink(monkeypatch, tmp_path):
|
||||
registry_path = tmp_path / "registry.json"
|
||||
registry_path.write_text(json.dumps({"work": r"\\.\pipe\browser-cli-work"}))
|
||||
@@ -53,7 +49,6 @@ def test_cleanup_windows_skips_socket_unlink(monkeypatch, tmp_path):
|
||||
|
||||
assert json.loads(registry_path.read_text()) == {}
|
||||
|
||||
|
||||
def test_stdin_reader_cleans_up_on_bye(monkeypatch):
|
||||
cleaned = []
|
||||
messages = iter([{"type": "bye"}])
|
||||
@@ -68,7 +63,6 @@ def test_stdin_reader_cleans_up_on_bye(monkeypatch):
|
||||
|
||||
assert cleaned == ["work"]
|
||||
|
||||
|
||||
def test_stdin_reader_routes_response_messages(monkeypatch):
|
||||
response_queue = native_host.queue.Queue()
|
||||
native_host.PENDING["msg-1"] = response_queue
|
||||
@@ -85,7 +79,6 @@ def test_stdin_reader_routes_response_messages(monkeypatch):
|
||||
assert response_queue.get_nowait() == {"id": "msg-1", "success": True}
|
||||
native_host.PENDING.clear()
|
||||
|
||||
|
||||
def test_collect_paged_browser_command_accumulates_pages(monkeypatch):
|
||||
calls = []
|
||||
pages = iter([
|
||||
@@ -110,7 +103,6 @@ def test_collect_paged_browser_command_accumulates_pages(monkeypatch):
|
||||
assert all(call["args"]["foo"] == "bar" for call in calls)
|
||||
assert all(call["id"] != "orig" for call in calls)
|
||||
|
||||
|
||||
def test_collect_paged_browser_command_passes_through_non_paged_response(monkeypatch):
|
||||
monkeypatch.setattr(native_host, "_send_browser_command", lambda cmd: {"id": cmd["id"], "success": True, "data": {"value": 1}})
|
||||
|
||||
@@ -118,7 +110,6 @@ def test_collect_paged_browser_command_passes_through_non_paged_response(monkeyp
|
||||
|
||||
assert result == {"id": "orig", "success": True, "data": {"value": 1}}
|
||||
|
||||
|
||||
def test_handle_browser_command_pages_known_list_commands(monkeypatch):
|
||||
seen = []
|
||||
|
||||
@@ -128,3 +119,224 @@ def test_handle_browser_command_pages_known_list_commands(monkeypatch):
|
||||
|
||||
assert result == {"success": True, "data": []}
|
||||
assert seen[0]["command"] == "tabs.list"
|
||||
|
||||
def test_handle_browser_command_sends_non_pageable_directly(monkeypatch):
|
||||
seen = []
|
||||
monkeypatch.setattr(native_host, "_send_browser_command", lambda cmd: seen.append(cmd) or {"success": True, "data": "ok"})
|
||||
|
||||
result = native_host._handle_browser_command({"id": "x", "command": "navigate.open", "args": {}})
|
||||
|
||||
assert result == {"success": True, "data": "ok"}
|
||||
assert seen[0]["command"] == "navigate.open"
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _read_exact_stream
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_read_exact_stream_full_read():
|
||||
"""Returns the exact bytes when stream delivers them in one shot."""
|
||||
import io
|
||||
stream = io.BytesIO(b"hello")
|
||||
assert native_host._read_exact_stream(stream, 5) == b"hello"
|
||||
|
||||
def test_read_exact_stream_partial_chunks():
|
||||
"""Accumulates multiple short chunks until n bytes are read."""
|
||||
import io
|
||||
|
||||
class _ChunkyStream:
|
||||
def __init__(self, data, chunk_size):
|
||||
self._data = data
|
||||
self._pos = 0
|
||||
self._chunk_size = chunk_size
|
||||
|
||||
def read(self, n):
|
||||
end = min(self._pos + self._chunk_size, len(self._data))
|
||||
chunk = self._data[self._pos:end]
|
||||
self._pos = end
|
||||
return chunk
|
||||
|
||||
stream = _ChunkyStream(b"abcdefgh", 3)
|
||||
assert native_host._read_exact_stream(stream, 8) == b"abcdefgh"
|
||||
|
||||
def test_read_exact_stream_eof_returns_none():
|
||||
"""Returns None if stream is exhausted before n bytes are delivered."""
|
||||
import io
|
||||
stream = io.BytesIO(b"ab") # only 2 bytes, asking for 4
|
||||
assert native_host._read_exact_stream(stream, 4) is None
|
||||
|
||||
def test_read_exact_stream_immediate_eof():
|
||||
"""Returns None on an empty stream."""
|
||||
import io
|
||||
stream = io.BytesIO(b"")
|
||||
assert native_host._read_exact_stream(stream, 1) is None
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# write_native_message / read_native_message round-trip
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_write_and_read_native_message_roundtrip():
|
||||
"""write_native_message followed by read_native_message recovers the original dict."""
|
||||
import io
|
||||
buf = io.BytesIO()
|
||||
msg = {"id": "abc", "command": "tabs.list", "args": {}}
|
||||
native_host.write_native_message(buf, msg)
|
||||
buf.seek(0)
|
||||
result = native_host.read_native_message(buf)
|
||||
assert result == msg
|
||||
|
||||
def test_read_native_message_eof_at_length_prefix():
|
||||
"""Returns None when the stream is empty (no length prefix)."""
|
||||
import io
|
||||
stream = io.BytesIO(b"")
|
||||
assert native_host.read_native_message(stream) is None
|
||||
|
||||
def test_read_native_message_eof_at_body():
|
||||
"""Returns None when the body is truncated after reading the length prefix."""
|
||||
import io
|
||||
import struct
|
||||
# Write a 10-byte length prefix but only 5 bytes of body
|
||||
buf = struct.pack("<I", 10) + b"hello"
|
||||
stream = io.BytesIO(buf)
|
||||
assert native_host.read_native_message(stream) is None
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _recv_exact / _recv_all / _send_all
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_recv_exact_accumulates_data():
|
||||
"""_recv_exact receives exactly n bytes from a socket-like object."""
|
||||
class _FakeSock:
|
||||
def __init__(self, data):
|
||||
self._data = data
|
||||
self._pos = 0
|
||||
def recv(self, n):
|
||||
chunk = self._data[self._pos:self._pos + n]
|
||||
self._pos += len(chunk)
|
||||
return chunk
|
||||
|
||||
sock = _FakeSock(b"0123456789")
|
||||
assert native_host._recv_exact(sock, 5) == b"01234"
|
||||
assert native_host._recv_exact(sock, 5) == b"56789"
|
||||
|
||||
def test_recv_exact_eof_returns_none():
|
||||
class _EmptySock:
|
||||
def recv(self, n):
|
||||
return b""
|
||||
assert native_host._recv_exact(_EmptySock(), 4) is None
|
||||
|
||||
def test_send_all_and_recv_all():
|
||||
"""_send_all frames data with length prefix; _recv_all strips it."""
|
||||
import socket
|
||||
a, b = socket.socketpair()
|
||||
try:
|
||||
payload = b'{"command": "tabs.list"}'
|
||||
native_host._send_all(a, payload)
|
||||
received = native_host._recv_all(b)
|
||||
assert received == payload
|
||||
finally:
|
||||
a.close()
|
||||
b.close()
|
||||
|
||||
def test_recv_all_truncated_body():
|
||||
"""_recv_all returns None when the body is shorter than the prefix promises."""
|
||||
import socket
|
||||
import struct
|
||||
a, b = socket.socketpair()
|
||||
try:
|
||||
# Send a length of 100 but only 4 bytes of body
|
||||
a.sendall(struct.pack("<I", 100) + b"tiny")
|
||||
a.close()
|
||||
result = native_host._recv_all(b)
|
||||
assert result is None
|
||||
finally:
|
||||
b.close()
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _send_browser_command — timeout path
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_send_browser_command_timeout(monkeypatch):
|
||||
"""_send_browser_command returns an error dict when the response queue times out."""
|
||||
import io
|
||||
|
||||
buf = io.BytesIO()
|
||||
|
||||
monkeypatch.setattr(native_host.sys, "stdout", SimpleNamespace(buffer=buf))
|
||||
# Do not put anything into the response queue → timeout after 0 s
|
||||
result = native_host._send_browser_command({"id": "t1", "command": "test", "args": {}}, timeout=0)
|
||||
|
||||
assert result["success"] is False
|
||||
assert "timeout" in result["error"]
|
||||
# Clean up PENDING
|
||||
native_host.PENDING.clear()
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _collect_paged_browser_command — error and loop-guard paths
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_collect_paged_browser_command_propagates_error(monkeypatch):
|
||||
"""If _send_browser_command returns success=False the error is propagated."""
|
||||
monkeypatch.setattr(
|
||||
native_host, "_send_browser_command",
|
||||
lambda cmd: {"id": cmd["id"], "success": False, "error": "extension crash"},
|
||||
)
|
||||
result = native_host._collect_paged_browser_command({"id": "e1", "command": "tabs.list", "args": {}})
|
||||
assert result["success"] is False
|
||||
assert "extension crash" in result["error"]
|
||||
|
||||
def test_collect_paged_browser_command_max_pages_guard(monkeypatch):
|
||||
"""If paging never ends, the loop guard kicks in and returns an error."""
|
||||
monkeypatch.setattr(native_host, "PAGE_SIZE", 1)
|
||||
|
||||
call_count = [0]
|
||||
|
||||
def _infinite_pages(cmd):
|
||||
call_count[0] += 1
|
||||
return {
|
||||
"id": cmd["id"],
|
||||
"success": True,
|
||||
"data": {"__browserCliPage": True, "items": [call_count[0]], "total": 9999, "nextOffset": call_count[0]},
|
||||
}
|
||||
|
||||
monkeypatch.setattr(native_host, "_send_browser_command", _infinite_pages)
|
||||
result = native_host._collect_paged_browser_command({"id": "loop", "command": "tabs.list", "args": {}})
|
||||
assert result["success"] is False
|
||||
assert "paging loop exceeded" in result["error"]
|
||||
|
||||
def test_collect_paged_browser_command_invalid_items(monkeypatch):
|
||||
"""If items is not a list the command returns an error dict."""
|
||||
monkeypatch.setattr(
|
||||
native_host, "_send_browser_command",
|
||||
lambda cmd: {
|
||||
"id": cmd["id"],
|
||||
"success": True,
|
||||
"data": {"__browserCliPage": True, "items": "not-a-list", "total": 1, "nextOffset": None},
|
||||
},
|
||||
)
|
||||
result = native_host._collect_paged_browser_command({"id": "bad", "command": "tabs.list", "args": {}})
|
||||
assert result["success"] is False
|
||||
assert "invalid paged response" in result["error"]
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _resolve_profile_alias
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_resolve_profile_alias_uses_hello_alias():
|
||||
alias = native_host._resolve_profile_alias({"type": "hello", "alias": "brave-work"})
|
||||
assert alias == "brave-work"
|
||||
|
||||
def test_resolve_profile_alias_no_hello_returns_uuid():
|
||||
alias = native_host._resolve_profile_alias(None)
|
||||
import uuid
|
||||
uuid.UUID(alias) # raises ValueError if not a valid UUID
|
||||
|
||||
def test_resolve_profile_alias_default_alias_returns_uuid():
|
||||
from browser_cli.platform import DEFAULT_ALIAS
|
||||
alias = native_host._resolve_profile_alias({"type": "hello", "alias": DEFAULT_ALIAS})
|
||||
import uuid
|
||||
uuid.UUID(alias)
|
||||
|
||||
def test_resolve_profile_alias_non_hello_type_returns_uuid():
|
||||
alias = native_host._resolve_profile_alias({"type": "bye", "alias": "some"})
|
||||
import uuid
|
||||
uuid.UUID(alias)
|
||||
|
||||
Reference in New Issue
Block a user