"""Unit tests for the TCP serve layer (challenge-response auth, framing, rejection paths).""" import json import socket import struct import threading import pytest from browser_cli.auth import generate_keypair, load_private_key, new_nonce, pq_decrypt, pq_encrypt, sign from browser_cli.client import BrowserNotConnected from browser_cli.commands.serve import _handle_client FAKE_UA = "browser-cli/0.9.3" # ── helpers ──────────────────────────────────────────────────────────────────── def _send_framed(sock: socket.socket, data: bytes) -> None: sock.sendall(struct.pack(" dict: raw = b"" while len(raw) < 4: chunk = sock.recv(4 - len(raw)) if not chunk: raise ConnectionError("socket closed before response header") raw += chunk n = struct.unpack(" threading.Thread: t = threading.Thread( target=_handle_client, args=(server_sock, ("127.0.0.1", 9999), None, auth_keys_path), daemon=True, ) t.start() return t def _pair(): return socket.socketpair() def _mock_no_browser(*_args, **_kwargs): raise BrowserNotConnected("no browser") # ── challenge frame ──────────────────────────────────────────────────────────── class TestChallenge: def test_challenge_sent_on_connect(self): client, server = _pair() t = _spawn(server, None) challenge = _recv_framed(client) assert challenge["type"] == "challenge" assert "nonce" in challenge client.close() t.join(timeout=2) def test_challenge_includes_version_fields(self): client, server = _pair() t = _spawn(server, None) challenge = _recv_framed(client) assert "server_version" in challenge assert "min_client_version" in challenge client.close() t.join(timeout=2) def test_challenge_advertises_post_quantum_kex_when_available(self, tmp_path, monkeypatch): monkeypatch.setattr("browser_cli.auth.pq_kex_server_keypair", lambda: ("fake-private", b"fake-public")) path = tmp_path / "authorized_keys" _, pub = generate_keypair() path.write_text(pub + "\n") client, server = _pair() t = _spawn(server, path) challenge = _recv_framed(client) assert challenge["pq_kex"] == {"alg": "ML-KEM-768", "public_key": b"fake-public".hex()} client.close() t.join(timeout=2) # ── rejection paths ──────────────────────────────────────────────────────────── class TestRejection: def _connect(self, auth_keys_path): client, server = _pair() t = _spawn(server, auth_keys_path) challenge = _recv_framed(client) return client, t, challenge def test_bad_user_agent_rejected(self): client, t, _ = self._connect(None) msg = {"id": "x", "command": "tabs.list", "args": {}, "user_agent": "curl/7.88"} _send_framed(client, json.dumps(msg).encode()) resp = _recv_framed(client) assert resp["success"] is False assert "forbidden" in resp["error"].lower() or "client" in resp["error"].lower() client.close() t.join(timeout=2) def test_missing_pubkey_sig_rejected(self, tmp_path): path = tmp_path / "authorized_keys" _, pub = generate_keypair() path.write_text(pub + "\n") client, t, _ = self._connect(path) msg = {"id": "x", "command": "tabs.list", "args": {}, "user_agent": FAKE_UA} _send_framed(client, json.dumps(msg).encode()) resp = _recv_framed(client) assert resp["success"] is False assert "unauthorized" in resp["error"].lower() client.close() t.join(timeout=2) def test_untrusted_pubkey_rejected(self, tmp_path): path = tmp_path / "authorized_keys" _, trusted_pub = generate_keypair() path.write_text(trusted_pub + "\n") pem, untrusted_pub = generate_keypair() key_path = tmp_path / "other.pem" key_path.write_bytes(pem) priv = load_private_key(key_path) client, t, challenge = self._connect(path) nonce = bytes.fromhex(challenge["nonce"]) msg = {"id": "x", "command": "tabs.list", "args": {}, "user_agent": FAKE_UA, "pubkey": untrusted_pub} msg["sig"] = sign(priv, nonce, msg).hex() _send_framed(client, json.dumps(msg).encode()) resp = _recv_framed(client) assert resp["success"] is False assert "untrusted" in resp["error"].lower() client.close() t.join(timeout=2) def test_bad_signature_rejected(self, tmp_path): path = tmp_path / "authorized_keys" _, pub = generate_keypair() path.write_text(pub + "\n") client, t, _ = self._connect(path) msg = {"id": "x", "command": "tabs.list", "args": {}, "user_agent": FAKE_UA, "pubkey": pub, "sig": "00" * 64} _send_framed(client, json.dumps(msg).encode()) resp = _recv_framed(client) assert resp["success"] is False assert "signature" in resp["error"].lower() or "invalid" in resp["error"].lower() client.close() t.join(timeout=2) def test_missing_post_quantum_kex_rejected_when_required(self, tmp_path, monkeypatch): monkeypatch.setattr("browser_cli.auth.pq_kex_server_keypair", lambda: ("fake-private", b"fake-public")) path = tmp_path / "authorized_keys" pem, pub = generate_keypair() path.write_text(pub + "\n") priv_path = tmp_path / "client.pem" priv_path.write_bytes(pem) priv = load_private_key(priv_path) client, t, challenge = self._connect(path) nonce = bytes.fromhex(challenge["nonce"]) msg = {"id": "x", "command": "tabs.list", "args": {}, "user_agent": "browser-cli/0.9.5", "pubkey": pub} msg["sig"] = sign(priv, nonce, msg).hex() _send_framed(client, json.dumps(msg).encode()) resp = _recv_framed(client) assert resp["success"] is False assert "post-quantum" in resp["error"].lower() client.close() t.join(timeout=2) def test_oversized_message_rejected(self): client, server = _pair() t = _spawn(server, None) _recv_framed(client) # consume challenge client.sendall(struct.pack("