Files
browser-cli/tests/test_remote_protocol_matrix.py
daniel156161 076914e5b7 refactor: reorganize client transport and extension internals
- Split client, native, remote, serve, markdown, and SDK internals into focused packages with direct imports.
- Move local and remote transport framing/protocol helpers behind clearer module boundaries.
- Break up the extension injected DOM logic into a separate content dispatch bundle and dedicated content modules.
- Add explicit client handling for passive remote discovery without noisy PQ warnings.
- Keep behavior covered with updated unit, integration, and extension tests.
2026-06-13 23:31:24 +02:00

214 lines
7.3 KiB
Python

"""Remote protocol compatibility matrix for post-quantum transport.
These tests exercise the wire-level combinations that matter for mixed
browser-cli versions without requiring a real browser. The native-host lookup is
mocked so successful auth/transport reaches the proxy layer and then returns the
expected "browser not connected" error.
"""
import contextlib
import io
import json
import os
import socket
import struct
import threading
import pytest
from browser_cli.auth import (
generate_keypair,
load_private_key,
pq_decrypt,
pq_encrypt,
pq_kex_client_encapsulate,
pq_kex_server_decapsulate,
pq_kex_server_keypair,
sign,
)
from browser_cli.client import BrowserNotConnected, send_command
from browser_cli.commands.serve import _handle_client
def _send_framed(sock: socket.socket, msg: dict) -> None:
payload = json.dumps(msg).encode("utf-8")
sock.sendall(struct.pack("<I", len(payload)) + payload)
def _recv_framed(sock: socket.socket) -> dict:
raw_len = b""
while len(raw_len) < 4:
chunk = sock.recv(4 - len(raw_len))
if not chunk:
raise ConnectionError("socket closed before response header")
raw_len += chunk
msg_len = struct.unpack("<I", raw_len)[0]
data = b""
while len(data) < msg_len:
chunk = sock.recv(msg_len - len(data))
if not chunk:
raise ConnectionError("socket closed mid-response")
data += chunk
return json.loads(data)
@pytest.fixture()
def auth_material(tmp_path):
pem, pub = generate_keypair()
key_path = tmp_path / "client.key.pem"
key_path.write_bytes(pem)
auth_path = tmp_path / "authorized_keys"
auth_path.write_text(pub + "\n", encoding="utf-8")
return key_path, auth_path, load_private_key(key_path), pub
@pytest.fixture(autouse=True)
def no_browser(monkeypatch):
def _raise_no_browser(*_args, **_kwargs):
raise BrowserNotConnected("no browser")
monkeypatch.setattr("browser_cli.client.targets.resolve_socket", _raise_no_browser)
def _connect(auth_keys_path):
client, server = socket.socketpair()
thread = threading.Thread(
target=_handle_client,
args=(server, ("127.0.0.1", 9999), None, auth_keys_path),
daemon=True,
)
thread.start()
challenge = _recv_framed(client)
return client, thread, challenge
def _pq_auth_message(priv, pub: str, nonce_hex: str, command_msg: dict, challenge: dict, *, encrypted: bool) -> tuple[dict, bytes]:
if "pq_kex" not in challenge:
pytest.skip("ML-KEM backend not available")
ciphertext_hex, shared_secret = pq_kex_client_encapsulate(challenge["pq_kex"]["public_key"])
clean_msg = {
**command_msg,
"pq_kex": {"alg": "ML-KEM-768", "ciphertext": ciphertext_hex},
}
sig = sign(priv, bytes.fromhex(nonce_hex), clean_msg, shared_secret).hex()
if not encrypted:
return {**clean_msg, "pubkey": pub, "sig": sig}, shared_secret
envelope = {
"id": clean_msg["id"],
"user_agent": clean_msg["user_agent"],
"pubkey": pub,
"sig": sig,
"pq_kex": clean_msg["pq_kex"],
"encrypted": pq_encrypt(shared_secret, "request", json.dumps(clean_msg).encode("utf-8")),
}
return envelope, shared_secret
def _assert_browser_not_connected(resp: dict) -> None:
assert resp.get("success") is False
assert "browser" in resp.get("error", "").lower() or "connected" in resp.get("error", "").lower()
def test_real_mlkem_primitive_roundtrip():
keypair = pq_kex_server_keypair()
if keypair is None:
pytest.skip("ML-KEM backend not available")
private_key, public_key = keypair
ciphertext_hex, client_secret = pq_kex_client_encapsulate(public_key.hex())
server_secret = pq_kex_server_decapsulate(private_key, ciphertext_hex)
assert server_secret == client_secret
@pytest.mark.parametrize(
("client_version", "encrypted", "expect_encrypted_response"),
[
("0.9.3", False, False), # legacy client stays compatible
("0.9.5", True, True), # current client must use encrypted transport
],
)
def test_remote_protocol_version_matrix(auth_material, client_version, encrypted, expect_encrypted_response):
selected_version = os.environ.get("BROWSER_CLI_COMPAT_CLIENT_VERSION")
if selected_version and selected_version != client_version:
pytest.skip(f"compat matrix selected {selected_version}")
_key_path, auth_path, priv, pub = auth_material
client, thread, challenge = _connect(auth_path)
msg = {
"id": f"tabs-{client_version}",
"command": "tabs.list",
"args": {},
"user_agent": f"browser-cli/{client_version}",
}
wire_msg, shared_secret = _pq_auth_message(priv, pub, challenge["nonce"], msg, challenge, encrypted=encrypted)
_send_framed(client, wire_msg)
resp = _recv_framed(client)
if expect_encrypted_response:
assert set(resp) == {"encrypted"}
resp = json.loads(pq_decrypt(shared_secret, "response", resp["encrypted"]))
else:
assert "encrypted" not in resp
_assert_browser_not_connected(resp)
client.close()
thread.join(timeout=2)
def test_current_client_plaintext_transport_is_rejected(auth_material):
_key_path, auth_path, priv, pub = auth_material
client, thread, challenge = _connect(auth_path)
msg = {
"id": "new-plain",
"command": "tabs.list",
"args": {},
"user_agent": "browser-cli/0.9.5",
}
wire_msg, _shared_secret = _pq_auth_message(priv, pub, challenge["nonce"], msg, challenge, encrypted=False)
_send_framed(client, wire_msg)
resp = _recv_framed(client)
assert resp.get("success") is False
assert "encrypted transport" in resp.get("error", "").lower()
client.close()
thread.join(timeout=2)
def test_send_command_uses_encrypted_remote_transport(auth_material):
key_path, auth_path, _priv, _pub = auth_material
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
host, port = server.getsockname()
def _accept_once():
conn, addr = server.accept()
_handle_client(conn, addr, None, auth_path)
server.close()
thread = threading.Thread(target=_accept_once, daemon=True)
thread.start()
with pytest.raises(RuntimeError, match="browser|connected"):
send_command("tabs.list", remote=f"{host}:{port}", profile="default", key=key_path)
thread.join(timeout=2)
def test_no_mlkem_backend_falls_back_and_client_warns(auth_material, monkeypatch):
key_path, auth_path, _priv, _pub = auth_material
monkeypatch.setattr("browser_cli.auth.pq_kex_server_keypair", lambda: None)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
host, port = server.getsockname()
def _accept_once():
conn, addr = server.accept()
_handle_client(conn, addr, None, auth_path)
server.close()
thread = threading.Thread(target=_accept_once, daemon=True)
thread.start()
stderr = io.StringIO()
with contextlib.redirect_stderr(stderr):
with pytest.raises(RuntimeError, match="browser|connected"):
send_command("tabs.list", remote=f"{host}:{port}", profile="default", key=key_path)
assert "not using a post-quantum key exchange" in stderr.getvalue()
thread.join(timeout=2)