214 lines
7.3 KiB
Python
214 lines
7.3 KiB
Python
"""Remote protocol compatibility matrix for post-quantum transport.
|
|
|
|
These tests exercise the wire-level combinations that matter for mixed
|
|
browser-cli versions without requiring a real browser. The native-host lookup is
|
|
mocked so successful auth/transport reaches the proxy layer and then returns the
|
|
expected "browser not connected" error.
|
|
"""
|
|
import contextlib
|
|
import io
|
|
import json
|
|
import os
|
|
import socket
|
|
import struct
|
|
import threading
|
|
|
|
import pytest
|
|
|
|
from browser_cli.auth import (
|
|
generate_keypair,
|
|
load_private_key,
|
|
pq_decrypt,
|
|
pq_encrypt,
|
|
pq_kex_client_encapsulate,
|
|
pq_kex_server_decapsulate,
|
|
pq_kex_server_keypair,
|
|
sign,
|
|
)
|
|
from browser_cli.client import BrowserNotConnected, send_command
|
|
from browser_cli.commands.serve import _handle_client
|
|
|
|
def _send_framed(sock: socket.socket, msg: dict) -> None:
|
|
payload = json.dumps(msg).encode("utf-8")
|
|
sock.sendall(struct.pack("<I", len(payload)) + payload)
|
|
|
|
def _recv_framed(sock: socket.socket) -> dict:
|
|
raw_len = b""
|
|
while len(raw_len) < 4:
|
|
chunk = sock.recv(4 - len(raw_len))
|
|
if not chunk:
|
|
raise ConnectionError("socket closed before response header")
|
|
raw_len += chunk
|
|
msg_len = struct.unpack("<I", raw_len)[0]
|
|
data = b""
|
|
while len(data) < msg_len:
|
|
chunk = sock.recv(msg_len - len(data))
|
|
if not chunk:
|
|
raise ConnectionError("socket closed mid-response")
|
|
data += chunk
|
|
return json.loads(data)
|
|
|
|
@pytest.fixture()
|
|
def auth_material(tmp_path):
|
|
pem, pub = generate_keypair()
|
|
key_path = tmp_path / "client.key.pem"
|
|
key_path.write_bytes(pem)
|
|
auth_path = tmp_path / "authorized_keys"
|
|
auth_path.write_text(pub + "\n", encoding="utf-8")
|
|
return key_path, auth_path, load_private_key(key_path), pub
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def no_browser(monkeypatch):
|
|
def _raise_no_browser(*_args, **_kwargs):
|
|
raise BrowserNotConnected("no browser")
|
|
|
|
monkeypatch.setattr("browser_cli.client._resolve_socket", _raise_no_browser)
|
|
|
|
def _connect(auth_keys_path):
|
|
client, server = socket.socketpair()
|
|
thread = threading.Thread(
|
|
target=_handle_client,
|
|
args=(server, ("127.0.0.1", 9999), None, auth_keys_path),
|
|
daemon=True,
|
|
)
|
|
thread.start()
|
|
challenge = _recv_framed(client)
|
|
return client, thread, challenge
|
|
|
|
def _pq_auth_message(priv, pub: str, nonce_hex: str, command_msg: dict, challenge: dict, *, encrypted: bool) -> tuple[dict, bytes]:
|
|
if "pq_kex" not in challenge:
|
|
pytest.skip("ML-KEM backend not available")
|
|
|
|
ciphertext_hex, shared_secret = pq_kex_client_encapsulate(challenge["pq_kex"]["public_key"])
|
|
clean_msg = {
|
|
**command_msg,
|
|
"pq_kex": {"alg": "ML-KEM-768", "ciphertext": ciphertext_hex},
|
|
}
|
|
sig = sign(priv, bytes.fromhex(nonce_hex), clean_msg, shared_secret).hex()
|
|
if not encrypted:
|
|
return {**clean_msg, "pubkey": pub, "sig": sig}, shared_secret
|
|
|
|
envelope = {
|
|
"id": clean_msg["id"],
|
|
"user_agent": clean_msg["user_agent"],
|
|
"pubkey": pub,
|
|
"sig": sig,
|
|
"pq_kex": clean_msg["pq_kex"],
|
|
"encrypted": pq_encrypt(shared_secret, "request", json.dumps(clean_msg).encode("utf-8")),
|
|
}
|
|
return envelope, shared_secret
|
|
|
|
def _assert_browser_not_connected(resp: dict) -> None:
|
|
assert resp.get("success") is False
|
|
assert "browser" in resp.get("error", "").lower() or "connected" in resp.get("error", "").lower()
|
|
|
|
def test_real_mlkem_primitive_roundtrip():
|
|
keypair = pq_kex_server_keypair()
|
|
if keypair is None:
|
|
pytest.skip("ML-KEM backend not available")
|
|
private_key, public_key = keypair
|
|
|
|
ciphertext_hex, client_secret = pq_kex_client_encapsulate(public_key.hex())
|
|
server_secret = pq_kex_server_decapsulate(private_key, ciphertext_hex)
|
|
|
|
assert server_secret == client_secret
|
|
|
|
@pytest.mark.parametrize(
|
|
("client_version", "encrypted", "expect_encrypted_response"),
|
|
[
|
|
("0.9.3", False, False), # legacy client stays compatible
|
|
("0.9.5", True, True), # current client must use encrypted transport
|
|
],
|
|
)
|
|
def test_remote_protocol_version_matrix(auth_material, client_version, encrypted, expect_encrypted_response):
|
|
selected_version = os.environ.get("BROWSER_CLI_COMPAT_CLIENT_VERSION")
|
|
if selected_version and selected_version != client_version:
|
|
pytest.skip(f"compat matrix selected {selected_version}")
|
|
|
|
_key_path, auth_path, priv, pub = auth_material
|
|
client, thread, challenge = _connect(auth_path)
|
|
|
|
msg = {
|
|
"id": f"tabs-{client_version}",
|
|
"command": "tabs.list",
|
|
"args": {},
|
|
"user_agent": f"browser-cli/{client_version}",
|
|
}
|
|
wire_msg, shared_secret = _pq_auth_message(priv, pub, challenge["nonce"], msg, challenge, encrypted=encrypted)
|
|
_send_framed(client, wire_msg)
|
|
resp = _recv_framed(client)
|
|
|
|
if expect_encrypted_response:
|
|
assert set(resp) == {"encrypted"}
|
|
resp = json.loads(pq_decrypt(shared_secret, "response", resp["encrypted"]))
|
|
else:
|
|
assert "encrypted" not in resp
|
|
|
|
_assert_browser_not_connected(resp)
|
|
client.close()
|
|
thread.join(timeout=2)
|
|
|
|
def test_current_client_plaintext_transport_is_rejected(auth_material):
|
|
_key_path, auth_path, priv, pub = auth_material
|
|
client, thread, challenge = _connect(auth_path)
|
|
|
|
msg = {
|
|
"id": "new-plain",
|
|
"command": "tabs.list",
|
|
"args": {},
|
|
"user_agent": "browser-cli/0.9.5",
|
|
}
|
|
wire_msg, _shared_secret = _pq_auth_message(priv, pub, challenge["nonce"], msg, challenge, encrypted=False)
|
|
_send_framed(client, wire_msg)
|
|
resp = _recv_framed(client)
|
|
|
|
assert resp.get("success") is False
|
|
assert "encrypted transport" in resp.get("error", "").lower()
|
|
client.close()
|
|
thread.join(timeout=2)
|
|
|
|
def test_send_command_uses_encrypted_remote_transport(auth_material):
|
|
key_path, auth_path, _priv, _pub = auth_material
|
|
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
server.bind(("127.0.0.1", 0))
|
|
server.listen(1)
|
|
host, port = server.getsockname()
|
|
|
|
def _accept_once():
|
|
conn, addr = server.accept()
|
|
_handle_client(conn, addr, None, auth_path)
|
|
server.close()
|
|
|
|
thread = threading.Thread(target=_accept_once, daemon=True)
|
|
thread.start()
|
|
|
|
with pytest.raises(RuntimeError, match="browser|connected"):
|
|
send_command("tabs.list", remote=f"{host}:{port}", profile="default", key=key_path)
|
|
|
|
thread.join(timeout=2)
|
|
|
|
def test_no_mlkem_backend_falls_back_and_client_warns(auth_material, monkeypatch):
|
|
key_path, auth_path, _priv, _pub = auth_material
|
|
monkeypatch.setattr("browser_cli.auth.pq_kex_server_keypair", lambda: None)
|
|
|
|
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
server.bind(("127.0.0.1", 0))
|
|
server.listen(1)
|
|
host, port = server.getsockname()
|
|
|
|
def _accept_once():
|
|
conn, addr = server.accept()
|
|
_handle_client(conn, addr, None, auth_path)
|
|
server.close()
|
|
|
|
thread = threading.Thread(target=_accept_once, daemon=True)
|
|
thread.start()
|
|
|
|
stderr = io.StringIO()
|
|
with contextlib.redirect_stderr(stderr):
|
|
with pytest.raises(RuntimeError, match="browser|connected"):
|
|
send_command("tabs.list", remote=f"{host}:{port}", profile="default", key=key_path)
|
|
|
|
assert "not using a post-quantum key exchange" in stderr.getvalue()
|
|
thread.join(timeout=2)
|