Files
browser-cli/tests/test_new_feature_commands.py
daniel156161 7fe0e27fec
Testing / remote-protocol-compat (0.9.3) (push) Successful in 46s
Testing / remote-protocol-compat (0.9.5) (push) Successful in 47s
Testing / test (push) Successful in 36s
feat(auth): add interactive key policy editing
- Add auth policy to update existing authorized_keys allow policies locally or over remote serve.
- Support key lookup by public key or exact name, with safe, all, server-default, and category-based modes.
- Add questionary-powered interactive key selection and checkbox policy editing with current policy preselected.
- Show policy descriptions in auth keys output so each capability is easier to understand.
- Gate the new remote control command behind the existing keys policy category and include protocol routing/compat updates.
- Bump real-browser-cli to 0.16.2 and lock the new questionary dependency.
- Cover local, remote, validation, and policy-category behavior in tests.
2026-06-18 15:02:18 +02:00

406 lines
19 KiB
Python

import json
from pathlib import Path
from unittest.mock import patch
from click.testing import CliRunner
import pytest
from browser_cli import BrowserCLI
from browser_cli.client import BrowserTarget
from browser_cli.cli import main
from browser_cli.command_security import CommandPolicy, assert_command_allowed, command_category
from browser_cli.commands import command_policy_from_options
def test_extension_info_cli_renders_capabilities():
with patch("browser_cli.send_command", return_value={"version": "1.2.3", "capabilities": ["extension.info"]}):
result = CliRunner().invoke(main, ["extension", "info"])
assert result.exit_code == 0
assert "1.2.3" in result.output
assert "extension.info" in result.output
def test_script_runs_raw_commands(tmp_path: Path):
script = tmp_path / "workflow.json"
script.write_text(json.dumps([{"tabs.count": {"pattern": "example.com"}}]), encoding="utf-8")
with patch("browser_cli.send_command", return_value={"count": 2}) as send_command:
result = CliRunner().invoke(main, ["script", str(script), "--json"])
assert result.exit_code == 0
assert "tabs.count" in result.output
send_command.assert_called_once_with("tabs.count", {"pattern": "example.com"}, profile=None, remote=None, key=None)
def test_session_export_cli_prints_json():
with patch("browser_cli.send_command", return_value={"name": "work", "session": {"tabs": ["https://example.com"]}}):
result = CliRunner().invoke(main, ["session", "export", "work"])
assert result.exit_code == 0
assert '"name": "work"' in result.output
def test_nav_open_reuse_navigates_existing_tab_instead_of_opening_new():
calls = []
def sender(command, args=None, **kwargs):
calls.append((command, args))
if command == "tabs.list":
return [{"id": 7, "windowId": 1, "active": False, "muted": False, "title": "Example", "url": "https://example.com"}]
return {}
BrowserCLI(browser="testing", _command_sender=sender).nav.open("https://example.com", reuse=True)
assert calls == [
("tabs.list", {}),
("navigate.to", {"tabId": 7, "url": "https://example.com"}),
]
def _tree_sender(tabs, groups):
def sender(command, args=None, **kwargs):
if command == "tabs.list":
return tabs
if command == "group.list":
return groups
return []
return sender
def test_tabs_tree_command_available():
with patch("browser_cli.send_command", side_effect=_tree_sender([], [])):
result = CliRunner().invoke(main, ["tabs", "tree"])
assert result.exit_code == 0
assert "Tabs" in result.output
def test_tabs_tree_handles_tabs_without_index_from_older_extension():
tabs = [{
"id": 7,
"windowId": 1,
"active": True,
"muted": False,
"title": "Example",
"url": "https://example.com",
"groupId": None,
}]
with patch("browser_cli.send_command", side_effect=_tree_sender(tabs, [])):
result = CliRunner().invoke(main, ["tabs", "tree"])
assert result.exit_code == 0
assert "Example" in result.output
def test_tabs_tree_preserves_window_tab_order_and_truncates_long_lines():
tabs = [
{"id": 1, "windowId": 1, "index": 0, "active": False, "title": "Before", "url": "https://example.com/before", "groupId": None},
{"id": 2, "windowId": 1, "index": 1, "active": False, "title": "[Gold] Grouped", "url": "https://example.com/grouped", "groupId": 20},
{"id": 3, "windowId": 1, "index": 2, "active": False, "title": "After", "url": "https://example.com/" + "x" * 200, "groupId": None},
]
groups = [{"id": 20, "title": "Group Name", "color": "blue", "collapsed": False, "tabCount": 1, "windowId": 1}]
with patch("browser_cli.send_command", side_effect=_tree_sender(tabs, groups)):
result = CliRunner().invoke(main, ["tabs", "tree"])
assert result.exit_code == 0
output = result.output
assert output.index("Before") < output.index("Group Name") < output.index("[Gold] Grouped") < output.index("After")
assert "https://example.com/before" not in output
assert "https://example.com/grouped" not in output
assert "https://example.com/" + "x" * 200 not in output
def test_tabs_tree_adds_each_browser_node_only_once():
tabs = [
{"id": 1, "windowId": 1, "index": 0, "active": False, "title": "One", "url": "https://example.com/one", "groupId": None, "browser": "work"},
{"id": 2, "windowId": 1, "index": 1, "active": False, "title": "Two", "url": "https://example.com/two", "groupId": None, "browser": "work"},
]
targets = [
BrowserTarget("work", "work", "/tmp/work.sock"),
BrowserTarget("personal", "personal", "/tmp/personal.sock"),
]
with patch("browser_cli.active_browser_targets", return_value=targets), \
patch("browser_cli.send_command", side_effect=_tree_sender(tabs, [])):
result = CliRunner().invoke(main, ["tabs", "tree"])
assert result.exit_code == 0
assert result.output.count("work") == 1
assert result.output.count("personal") == 1
assert "One" in result.output
assert "Two" in result.output
def test_tabs_tree_shows_tabs_inside_collapsed_browser_groups():
tabs = [
{"id": 1, "windowId": 1, "index": 0, "active": False, "title": "Before", "url": "https://example.com/before", "groupId": None},
{"id": 2, "windowId": 1, "index": 1, "active": False, "title": "Hidden", "url": "https://example.com/hidden", "groupId": 20},
{"id": 3, "windowId": 1, "index": 2, "active": False, "title": "After", "url": "https://example.com/after", "groupId": None},
]
groups = [{"id": 20, "title": "Collapsed Group", "color": "orange", "collapsed": True, "tabCount": 1, "windowId": 1}]
with patch("browser_cli.send_command", side_effect=_tree_sender(tabs, groups)):
result = CliRunner().invoke(main, ["tabs", "tree"])
assert result.exit_code == 0
assert "Collapsed Group" in result.output
assert "1 tab" in result.output
assert "collapsed" in result.output
assert "Hidden" in result.output
def test_tabs_tree_can_show_shortened_urls_on_request():
tabs = [{"id": 1, "windowId": 1, "index": 0, "active": False, "title": "Long URL", "url": "https://example.com/" + "x" * 200, "groupId": None}]
with patch("browser_cli.send_command", side_effect=_tree_sender(tabs, [])):
result = CliRunner().invoke(main, ["tabs", "tree", "--urls"])
assert result.exit_code == 0
assert "https://example.com/" in result.output
assert "https://example.com/" + "x" * 200 not in result.output
assert "" in result.output
def test_doctor_command_reports_connection_failure_cleanly():
with patch("browser_cli.commands.doctor.active_browser_targets", return_value=[]), \
patch("browser_cli.send_command", side_effect=RuntimeError("no browser")):
result = CliRunner().invoke(main, ["doctor"])
assert result.exit_code == 1
assert "Connection" in result.output
def test_serve_http_no_auth_rejected_on_public_host():
result = CliRunner().invoke(main, ["serve-http", "--host", "0.0.0.0", "--no-auth"])
assert result.exit_code != 0
assert "--no-auth is only allowed on loopback" in result.output
def test_serve_tcp_no_auth_rejected_on_public_host():
result = CliRunner().invoke(main, ["serve", "--host", "0.0.0.0", "--no-auth"])
assert result.exit_code != 0
assert "--no-auth is only allowed on loopback" in result.output
def test_serve_tcp_no_auth_allowed_on_loopback():
# Should pass the loopback guard and only fail later when trying to bind/serve.
# We stop it before serve_forever by mocking _serve_async to a no-op.
with patch("browser_cli.commands.serve._serve_async", return_value=None) as serve_async:
result = CliRunner().invoke(main, ["serve", "--host", "127.0.0.1", "--no-auth"])
assert result.exit_code == 0
assert serve_async.called
def _serve_security_for(args):
"""Invoke `serve` with the given args and return the ServeSecurity handed to _serve_async."""
with patch("browser_cli.commands.serve._serve_async", return_value=None) as serve_async:
result = CliRunner().invoke(main, ["serve", "--host", "127.0.0.1", "--no-auth", *args])
assert result.exit_code == 0, result.output
# _serve_async(host, port, profile, auth_keys_path, compress, security)
return serve_async.call_args.args[5]
def _serve_policy_for(args):
"""Convenience: the server-default CommandPolicy from a `serve` invocation."""
return _serve_security_for(args).policy
def test_serve_tcp_defaults_to_safe_only_policy():
policy = _serve_policy_for([])
assert policy == CommandPolicy() # safe-only, nothing opened
assert_command_allowed("tabs.list", policy)
with pytest.raises(PermissionError):
assert_command_allowed("dom.eval", policy)
with pytest.raises(PermissionError):
assert_command_allowed("navigate.open", policy)
def test_serve_tcp_allow_all_yields_unrestricted_policy():
policy = _serve_policy_for(["--allow-all"])
assert policy == CommandPolicy.unrestricted()
assert_command_allowed("dom.eval", policy)
assert_command_allowed("storage.get", policy)
def test_serve_tcp_allow_control_opens_only_control():
policy = _serve_policy_for(["--allow-control"])
assert_command_allowed("navigate.open", policy)
with pytest.raises(PermissionError):
assert_command_allowed("dom.eval", policy) # dangerous still blocked
def test_serve_tcp_default_rate_limit_active():
security = _serve_security_for([])
assert security.rate_limiter is not None
assert security.rate_limiter.rate == 100.0 # default
def test_serve_tcp_rate_limit_zero_disables():
security = _serve_security_for(["--rate-limit", "0"])
assert security.rate_limiter is None
def test_serve_tcp_per_key_policies_loaded_from_authorized_keys(tmp_path):
keys = tmp_path / "authorized_keys"
keys.write_text("abc123 reader allow:read-page\ndef456 admin allow:all\nghi789 plain\n")
security = _serve_security_for(["--authorized-keys", str(keys)])
assert security.key_policies["abc123"] == CommandPolicy(allow_read_page=True)
assert security.key_policies["def456"] == CommandPolicy.unrestricted()
assert "ghi789" not in security.key_policies # falls back to server default
assert security.key_names["def456"] == "admin"
def test_auth_trust_writes_inline_policy_token(tmp_path):
keys = tmp_path / "authorized_keys"
pub = "a" * 64
result = CliRunner().invoke(main, [
"auth", "trust", pub, "--name", "ci bot", "--file", str(keys),
"--allow-read-page", "--allow-control",
])
assert result.exit_code == 0
line = keys.read_text().strip()
assert line == f"{pub} ci bot allow:read-page,control"
def test_auth_trust_without_allow_flags_writes_no_token(tmp_path):
keys = tmp_path / "authorized_keys"
pub = "b" * 64
result = CliRunner().invoke(main, ["auth", "trust", pub, "--name", "plain", "--file", str(keys)])
assert result.exit_code == 0
assert keys.read_text().strip() == f"{pub} plain"
def test_auth_keys_local_shows_policy_column(tmp_path):
keys = tmp_path / "authorized_keys"
keys.write_text(f"{'a' * 64} reader allow:read-page\n{'b' * 64} admin allow:all\n{'c' * 64} plain\n")
result = CliRunner().invoke(main, ["auth", "keys", "--file", str(keys)])
assert result.exit_code == 0
assert "Policy" in result.output
assert "Description" in result.output
assert "read-page" in result.output
assert "all" in result.output
assert "server default" in result.output
assert "read page content" in result.output
assert "Full access" in result.output
def test_auth_policy_updates_existing_key_policy(tmp_path):
keys = tmp_path / "authorized_keys"
pub = "a" * 64
keys.write_text(f"{pub} YubiKey 5C NFC FIPS\n")
result = CliRunner().invoke(main, [
"auth", "policy", pub, "--file", str(keys), "--allow-read-page", "--allow-control",
])
assert result.exit_code == 0
assert keys.read_text().strip() == f"{pub} YubiKey 5C NFC FIPS allow:read-page,control"
assert "Updated policy" in result.output
def test_auth_policy_can_set_safe_and_server_default_by_name(tmp_path):
keys = tmp_path / "authorized_keys"
pub = "b" * 64
keys.write_text(f"{pub} laptop allow:all\n")
safe_result = CliRunner().invoke(main, ["auth", "policy", "laptop", "--file", str(keys), "--safe"])
assert safe_result.exit_code == 0
assert keys.read_text().strip() == f"{pub} laptop allow:"
default_result = CliRunner().invoke(main, ["auth", "policy", "laptop", "--file", str(keys), "--server-default"])
assert default_result.exit_code == 0
assert keys.read_text().strip() == f"{pub} laptop"
def test_auth_policy_requires_policy_mode_when_not_interactive(tmp_path):
keys = tmp_path / "authorized_keys"
keys.write_text(f"{'a' * 64} laptop\n")
result = CliRunner().invoke(main, ["auth", "policy", "laptop", "--file", str(keys)])
assert result.exit_code == 1
assert "Choose a policy mode" in result.output
def test_auth_policy_rejects_conflicting_policy_modes(tmp_path):
keys = tmp_path / "authorized_keys"
keys.write_text(f"{'a' * 64} laptop\n")
result = CliRunner().invoke(main, ["auth", "policy", "laptop", "--file", str(keys), "--safe", "--allow-all"])
assert result.exit_code == 1
assert "Choose exactly one policy mode" in result.output
def test_parse_interactive_policy_selection():
from browser_cli.commands.auth import _parse_checkbox_policy_selection, _parse_policy_selection
assert _parse_policy_selection("1,2") == ["read-page", "control"]
assert _parse_policy_selection("read-page control") == ["read-page", "control"]
assert _parse_policy_selection("all") == ["all"]
assert _parse_policy_selection("safe") == []
assert _parse_policy_selection("default") is None
assert _parse_checkbox_policy_selection(["read-page", "control"]) == ["read-page", "control"]
assert _parse_checkbox_policy_selection(["__all__"]) == ["all"]
assert _parse_checkbox_policy_selection(["__safe__"]) == []
assert _parse_checkbox_policy_selection(["__server_default__"]) is None
def test_auth_policy_without_identifier_requires_interactive_picker():
result = CliRunner().invoke(main, ["auth", "policy", "--allow-all"])
assert result.exit_code == 1
assert "Missing key identifier" in result.output
def test_auth_policy_remote_sends_policy_command():
pub = "c" * 64
with patch("browser_cli.client.send_command", return_value={"pubkey": pub, "name": "remote key", "allow": ["all"]}) as send:
result = CliRunner().invoke(main, ["--remote", "browser-host.example:8765", "auth", "policy", pub, "--allow-all"])
assert result.exit_code == 0
send.assert_called_once()
assert send.call_args.kwargs["args"] == {"identifier": pub, "allow": ["all"]}
assert "Updated policy" in result.output
def test_auth_keys_remote_unreachable_clean_error():
"""`auth keys --remote` on an unreachable host shows a clean error, not a traceback."""
from browser_cli.client import BrowserNotConnected
with patch("browser_cli.client.send_command", side_effect=BrowserNotConnected("Cannot connect to remote browser at x.")):
result = CliRunner().invoke(main, ["--remote", "x.example:8765", "auth", "keys"])
assert result.exit_code == 1
assert isinstance(result.exception, SystemExit) # handled, not a raw exception
assert "Error:" in result.output
assert "Cannot connect" in result.output
def test_auth_trust_remote_unreachable_clean_error():
from browser_cli.client import BrowserNotConnected
with patch("browser_cli.client.send_command", side_effect=BrowserNotConnected("Cannot connect to remote browser at x.")):
result = CliRunner().invoke(main, ["--remote", "x.example:8765", "auth", "trust", "a" * 64])
assert result.exit_code == 1
assert isinstance(result.exception, SystemExit)
assert "Error:" in result.output
def test_serve_http_token_check_is_constant_time():
"""The bearer-token comparison uses secrets.compare_digest, not ==."""
from browser_cli.commands.serve_http import _Handler
handler = _Handler.__new__(_Handler)
handler.token = "s3cret-token"
handler.headers = {"Authorization": "Bearer s3cret-token"}
assert handler._authorized() is True
handler.headers = {"Authorization": "Bearer wrong"}
assert handler._authorized() is False
handler.headers = {"X-Browser-CLI-Token": "s3cret-token"}
assert handler._authorized() is True
handler.headers = {"X-Browser-CLI-Token": "nope"}
assert handler._authorized() is False
handler.headers = {}
assert handler._authorized() is False
# No token configured → open.
handler.token = None
assert handler._authorized() is True
def test_serve_http_uses_compare_digest():
import inspect
from browser_cli.commands import serve_http
src = inspect.getsource(serve_http._Handler._authorized)
assert "compare_digest" in src
assert "== f\"Bearer" not in src
def test_command_policy_allow_all_grants_everything():
policy = command_policy_from_options(
allow_read_page=False, allow_control=False, allow_dangerous=False, allow_all=True
)
assert policy == CommandPolicy.unrestricted()
assert_command_allowed("dom.eval", policy)
assert_command_allowed("storage.get", policy)
def test_raw_command_blocks_dangerous_by_default():
result = CliRunner().invoke(main, ["command", "dom.eval", '{"code":"document.title"}'])
assert result.exit_code != 0
assert "blocked by default" in result.output
def test_raw_command_allows_dangerous_with_explicit_flag():
with patch("browser_cli.send_command", return_value="Example") as send_command:
result = CliRunner().invoke(main, ["command", "--allow-dangerous", "dom.eval", '{"code":"document.title"}'])
assert result.exit_code == 0
send_command.assert_called_once_with("dom.eval", {"code": "document.title"}, profile=None, remote=None, key=None)
def test_script_blocks_control_without_explicit_flag(tmp_path: Path):
script = tmp_path / "workflow.json"
script.write_text(json.dumps([{"navigate.open": {"url": "https://example.com"}}]), encoding="utf-8")
result = CliRunner().invoke(main, ["script", str(script), "--json"])
assert result.exit_code != 0
assert "blocked by default" in result.output
def test_script_allows_control_with_explicit_flag(tmp_path: Path):
script = tmp_path / "workflow.json"
script.write_text(json.dumps([{"navigate.open": {"url": "https://example.com"}}]), encoding="utf-8")
with patch("browser_cli.send_command", return_value={}) as send_command:
result = CliRunner().invoke(main, ["script", str(script), "--json", "--allow-control"])
assert result.exit_code == 0
send_command.assert_called_once_with("navigate.open", {"url": "https://example.com"}, profile=None, remote=None, key=None)
def test_command_policy_categories_and_flags():
assert command_category("tabs.list") == "safe"
assert command_category("extract.text") == "read-page"
assert command_category("dom.click") == "control"
assert command_category("storage.get") == "dangerous"
assert_command_allowed("tabs.list", CommandPolicy())
with pytest.raises(PermissionError):
assert_command_allowed("extract.text", CommandPolicy())
assert_command_allowed("extract.text", CommandPolicy(allow_read_page=True))
with pytest.raises(PermissionError):
assert_command_allowed("storage.get", CommandPolicy(allow_read_page=True, allow_control=True))
assert_command_allowed("storage.get", CommandPolicy(allow_dangerous=True))