Encrypt remote transport with post-quantum session keys
Testing / test (push) Successful in 21s
Package Extension / package-extension (push) Successful in 18s
Build & Publish Package / publish (push) Successful in 29s

This commit is contained in:
2026-05-05 10:49:38 +02:00
parent 9096efd36a
commit 94c87e244b
8 changed files with 174 additions and 18 deletions
+33
View File
@@ -10,6 +10,9 @@ from pathlib import Path
from cryptography.exceptions import InvalidSignature from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.serialization import ( from cryptography.hazmat.primitives.serialization import (
Encoding, Encoding,
NoEncryption, NoEncryption,
@@ -190,6 +193,7 @@ def verify(pub_hex: str, nonce: bytes, msg: dict, sig_hex: str, pq_shared_secret
# ── Post-quantum key exchange (ML-KEM / Kyber) ──────────────────────────────── # ── Post-quantum key exchange (ML-KEM / Kyber) ────────────────────────────────
PQ_KEX_ALG = "ML-KEM-768" PQ_KEX_ALG = "ML-KEM-768"
PQ_TRANSPORT_ALG = "ML-KEM-768+ChaCha20Poly1305"
def pq_kex_server_keypair(): def pq_kex_server_keypair():
@@ -221,6 +225,35 @@ def pq_kex_server_decapsulate(private_key, ciphertext_hex: str) -> bytes:
return private_key.decapsulate(bytes.fromhex(ciphertext_hex)) return private_key.decapsulate(bytes.fromhex(ciphertext_hex))
def _pq_transport_key(shared_secret: bytes, direction: str) -> bytes:
return HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=None,
info=f"browser-cli pq transport v1 {direction}".encode("ascii"),
).derive(shared_secret)
def pq_encrypt(shared_secret: bytes, direction: str, plaintext: bytes) -> dict:
"""Encrypt an app-layer frame with a key derived from the ML-KEM secret."""
nonce = secrets.token_bytes(12)
key = _pq_transport_key(shared_secret, direction)
ciphertext = ChaCha20Poly1305(key).encrypt(nonce, plaintext, None)
return {"alg": PQ_TRANSPORT_ALG, "nonce": nonce.hex(), "ciphertext": ciphertext.hex()}
def pq_decrypt(shared_secret: bytes, direction: str, envelope: dict) -> bytes:
"""Decrypt an app-layer frame produced by pq_encrypt()."""
if not isinstance(envelope, dict) or envelope.get("alg") != PQ_TRANSPORT_ALG:
raise ValueError("unsupported encrypted transport envelope")
key = _pq_transport_key(shared_secret, direction)
return ChaCha20Poly1305(key).decrypt(
bytes.fromhex(str(envelope["nonce"])),
bytes.fromhex(str(envelope["ciphertext"])),
None,
)
def new_nonce() -> str: def new_nonce() -> str:
return secrets.token_hex(32) return secrets.token_hex(32)
+34 -4
View File
@@ -13,6 +13,7 @@ import os
import re import re
import socket import socket
import struct import struct
import sys
import uuid import uuid
from dataclasses import dataclass from dataclasses import dataclass
from multiprocessing.connection import Client as PipeClient from multiprocessing.connection import Client as PipeClient
@@ -318,22 +319,51 @@ def _send_remote(endpoint: str, msg: dict, private_key=None) -> bytes | None:
except (IndexError, ValueError): except (IndexError, ValueError):
pass pass
pq_shared_secret = None
if nonce_hex and private_key is not None: if nonce_hex and private_key is not None:
from browser_cli.auth import PQ_KEX_ALG, pq_kex_client_encapsulate, sign, public_key_hex from browser_cli.auth import PQ_KEX_ALG, pq_encrypt, pq_kex_client_encapsulate, sign, public_key_hex
nonce = bytes.fromhex(nonce_hex) nonce = bytes.fromhex(nonce_hex)
clean_msg = {k: v for k, v in msg.items() if k not in {"token", "pubkey", "sig", "pq_kex"}} clean_msg = {k: v for k, v in msg.items() if k not in {"token", "pubkey", "sig", "pq_kex", "encrypted"}}
pq_shared_secret = None
kex = challenge.get("pq_kex") if isinstance(challenge, dict) else None kex = challenge.get("pq_kex") if isinstance(challenge, dict) else None
if isinstance(kex, dict) and kex.get("alg") == PQ_KEX_ALG and kex.get("public_key"): if isinstance(kex, dict) and kex.get("alg") == PQ_KEX_ALG and kex.get("public_key"):
ciphertext_hex, pq_shared_secret = pq_kex_client_encapsulate(str(kex["public_key"])) ciphertext_hex, pq_shared_secret = pq_kex_client_encapsulate(str(kex["public_key"]))
clean_msg["pq_kex"] = {"alg": PQ_KEX_ALG, "ciphertext": ciphertext_hex} clean_msg["pq_kex"] = {"alg": PQ_KEX_ALG, "ciphertext": ciphertext_hex}
else:
sys.stderr.write(
"** WARNING: connection is not using a post-quantum key exchange algorithm.\n"
"** This session may be vulnerable to store now, decrypt later attacks.\n"
)
sig = sign(private_key, nonce, clean_msg, pq_shared_secret) sig = sign(private_key, nonce, clean_msg, pq_shared_secret)
msg = {**clean_msg, "pubkey": public_key_hex(private_key), "sig": sig.hex()} msg = {**clean_msg, "pubkey": public_key_hex(private_key), "sig": sig.hex()}
if pq_shared_secret is not None:
encrypted = pq_encrypt(pq_shared_secret, "request", json.dumps(clean_msg).encode("utf-8"))
msg = {
"id": clean_msg.get("id"),
"user_agent": clean_msg.get("user_agent"),
"pubkey": public_key_hex(private_key),
"sig": sig.hex(),
"pq_kex": clean_msg["pq_kex"],
"encrypted": encrypted,
}
else:
sys.stderr.write(
"** WARNING: connection is not using a post-quantum key exchange algorithm.\n"
"** This session may be vulnerable to store now, decrypt later attacks.\n"
)
payload = json.dumps(msg).encode("utf-8") payload = json.dumps(msg).encode("utf-8")
framed = struct.pack("<I", len(payload)) + payload framed = struct.pack("<I", len(payload)) + payload
sock.sendall(framed) sock.sendall(framed)
return _recv_all(sock) response = _recv_all(sock)
if response is not None and pq_shared_secret is not None:
try:
from browser_cli.auth import pq_decrypt
envelope = json.loads(response)
if isinstance(envelope, dict) and "encrypted" in envelope:
return pq_decrypt(pq_shared_secret, "response", envelope["encrypted"])
except Exception as e:
raise BrowserNotConnected(f"Cannot decrypt post-quantum remote response: {e}") from e
return response
def _auto_route_remote(endpoint: str, key=None) -> str | None: def _auto_route_remote(endpoint: str, key=None) -> str | None:
+34 -9
View File
@@ -29,17 +29,25 @@ def _proxy_request(client_sock:socket.socket, addr:tuple, profile:str|None, auth
from browser_cli.client import _resolve_socket, BrowserNotConnected from browser_cli.client import _resolve_socket, BrowserNotConnected
from browser_cli.platform import is_windows from browser_cli.platform import is_windows
response_secret = None
def _send_payload(data: bytes) -> None:
if response_secret is not None:
from browser_cli.auth import pq_encrypt
data = json.dumps({"encrypted": pq_encrypt(response_secret, "response", data)}).encode()
_framed_send(client_sock, data)
def _send_error(msg_id, msg:str) -> None: def _send_error(msg_id, msg:str) -> None:
err = json.dumps({"id": msg_id, "success": False, "error": msg}).encode() err = json.dumps({"id": msg_id, "success": False, "error": msg}).encode()
try: try:
_framed_send(client_sock, err) _send_payload(err)
except OSError: except OSError:
pass pass
def _send_ok(msg_id, payload) -> None: def _send_ok(msg_id, payload) -> None:
out = json.dumps({"id": msg_id, "success": True, "data": payload}).encode() out = json.dumps({"id": msg_id, "success": True, "data": payload}).encode()
try: try:
_framed_send(client_sock, out) _send_payload(out)
except OSError: except OSError:
pass pass
@@ -93,9 +101,10 @@ def _proxy_request(client_sock:socket.socket, addr:tuple, profile:str|None, auth
_log(addr, command, None, "DENIED", "untrusted key") _log(addr, command, None, "DENIED", "untrusted key")
return return
pq_shared_secret = None pq_shared_secret = None
transport_encrypted = False
if pq_private_key is not None: if pq_private_key is not None:
kex = msg.get("pq_kex") or {} kex = msg.get("pq_kex") or {}
pq_required = parse_version(client_ver) >= parse_version("0.9.4") pq_required = parse_version(client_ver) >= parse_version("0.9.5")
if not isinstance(kex, dict) or kex.get("alg") != "ML-KEM-768" or not kex.get("ciphertext"): if not isinstance(kex, dict) or kex.get("alg") != "ML-KEM-768" or not kex.get("ciphertext"):
if pq_required: if pq_required:
_send_error(msg_id, "unauthorized: post-quantum key exchange required") _send_error(msg_id, "unauthorized: post-quantum key exchange required")
@@ -103,11 +112,26 @@ def _proxy_request(client_sock:socket.socket, addr:tuple, profile:str|None, auth
return return
else: else:
try: try:
from browser_cli.auth import pq_kex_server_decapsulate from browser_cli.auth import pq_decrypt, pq_kex_server_decapsulate
pq_shared_secret = pq_kex_server_decapsulate(pq_private_key, str(kex["ciphertext"])) pq_shared_secret = pq_kex_server_decapsulate(pq_private_key, str(kex["ciphertext"]))
if "encrypted" in msg:
decrypted_msg = json.loads(pq_decrypt(pq_shared_secret, "request", msg["encrypted"]))
if not isinstance(decrypted_msg, dict):
raise ValueError("encrypted request is not a JSON object")
decrypted_msg["pubkey"] = pub
decrypted_msg["sig"] = sig
decrypted_msg["pq_kex"] = kex
msg = adapt_auth(decrypted_msg, client_ver)
msg_id = msg.get("id", msg_id)
command = msg.get("command", "?")
transport_encrypted = True
elif pq_required:
_send_error(msg_id, "unauthorized: post-quantum encrypted transport required")
_log(addr, command, None, "DENIED", "missing pq transport")
return
except Exception: except Exception:
_send_error(msg_id, "unauthorized: invalid post-quantum key exchange") _send_error(msg_id, "unauthorized: invalid post-quantum encrypted transport")
_log(addr, command, None, "DENIED", "bad pq kex") _log(addr, command, None, "DENIED", "bad pq transport")
return return
from browser_cli.auth import verify from browser_cli.auth import verify
@@ -115,6 +139,7 @@ def _proxy_request(client_sock:socket.socket, addr:tuple, profile:str|None, auth
_send_error(msg_id, "unauthorized: invalid signature") _send_error(msg_id, "unauthorized: invalid signature")
_log(addr, command, None, "DENIED", "bad signature") _log(addr, command, None, "DENIED", "bad signature")
return return
response_secret = pq_shared_secret if transport_encrypted else None
if command == "browser-cli.targets": if command == "browser-cli.targets":
from browser_cli.client import active_browser_targets from browser_cli.client import active_browser_targets
@@ -158,7 +183,7 @@ def _proxy_request(client_sock:socket.socket, addr:tuple, profile:str|None, auth
resolved_profile = msg.get("_route") or profile resolved_profile = msg.get("_route") or profile
# ── strip protocol fields, apply request compat shim, forward ───────────── # ── strip protocol fields, apply request compat shim, forward ─────────────
strip = {"token", "_route", "pubkey", "sig", "user_agent", "pq_kex"} strip = {"token", "_route", "pubkey", "sig", "user_agent", "pq_kex", "encrypted"}
clean_msg = {k: v for k, v in msg.items() if k not in strip} clean_msg = {k: v for k, v in msg.items() if k not in strip}
clean_msg = adapt_request(clean_msg, client_ver) clean_msg = adapt_request(clean_msg, client_ver)
clean_payload = json.dumps(clean_msg).encode() clean_payload = json.dumps(clean_msg).encode()
@@ -178,14 +203,14 @@ def _proxy_request(client_sock:socket.socket, addr:tuple, profile:str|None, auth
pipe.send_bytes(clean_payload) pipe.send_bytes(clean_payload)
resp_payload = pipe.recv_bytes() resp_payload = pipe.recv_bytes()
resp_payload = adapt_response(resp_payload, command, client_ver) resp_payload = adapt_response(resp_payload, command, client_ver)
_framed_send(client_sock, resp_payload) _send_payload(resp_payload)
else: else:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as local: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as local:
local.connect(sock_path) local.connect(sock_path)
local.sendall(clean_header + clean_payload) local.sendall(clean_header + clean_payload)
resp_payload = _recv_all(local) resp_payload = _recv_all(local)
resp_payload = adapt_response(resp_payload, command, client_ver) resp_payload = adapt_response(resp_payload, command, client_ver)
_framed_send(client_sock, resp_payload) _send_payload(resp_payload)
resp_data = json.loads(resp_payload) resp_data = json.loads(resp_payload)
if resp_data.get("success", True): if resp_data.get("success", True):
+1 -1
View File
@@ -1,7 +1,7 @@
{ {
"manifest_version": 3, "manifest_version": 3,
"name": "browser-cli", "name": "browser-cli",
"version": "0.9.4", "version": "0.9.5",
"description": "Control your browser from the terminal via browser-cli", "description": "Control your browser from the terminal via browser-cli",
"permissions": [ "permissions": [
"tabs", "tabs",
+1 -1
View File
@@ -1,6 +1,6 @@
[project] [project]
name = "browser-cli" name = "browser-cli"
version = "0.9.4" version = "0.9.5"
description = "Control your real running browser from the terminal via a browser extension" description = "Control your real running browser from the terminal via a browser extension"
requires-python = ">=3.10" requires-python = ">=3.10"
dependencies = [ dependencies = [
+19
View File
@@ -12,6 +12,8 @@ from browser_cli.auth import (
load_authorized_keys_with_names, load_authorized_keys_with_names,
load_private_key, load_private_key,
new_nonce, new_nonce,
pq_decrypt,
pq_encrypt,
pq_kex_client_encapsulate, pq_kex_client_encapsulate,
pq_kex_server_decapsulate, pq_kex_server_decapsulate,
pq_kex_server_keypair, pq_kex_server_keypair,
@@ -124,6 +126,23 @@ class TestPostQuantumKex:
assert server_secret == client_secret assert server_secret == client_secret
assert len(server_secret) == 32 assert len(server_secret) == 32
def test_pq_transport_encrypt_decrypt_roundtrip(self):
secret = b"s" * 32
plaintext = b'{"command":"tabs.list"}'
envelope = pq_encrypt(secret, "request", plaintext)
assert envelope["alg"] == "ML-KEM-768+ChaCha20Poly1305"
assert plaintext.hex() not in envelope["ciphertext"]
assert pq_decrypt(secret, "request", envelope) == plaintext
def test_pq_transport_direction_is_bound(self):
secret = b"s" * 32
envelope = pq_encrypt(secret, "request", b"payload")
with pytest.raises(Exception):
pq_decrypt(secret, "response", envelope)
class TestAuthorizedKeys: class TestAuthorizedKeys:
def test_add_and_load(self, tmp_path): def test_add_and_load(self, tmp_path):
+51 -2
View File
@@ -6,7 +6,7 @@ import threading
import pytest import pytest
from browser_cli.auth import generate_keypair, load_private_key, new_nonce, sign from browser_cli.auth import generate_keypair, load_private_key, new_nonce, pq_decrypt, pq_encrypt, sign
from browser_cli.client import BrowserNotConnected from browser_cli.client import BrowserNotConnected
from browser_cli.commands.serve import _handle_client from browser_cli.commands.serve import _handle_client
@@ -167,7 +167,7 @@ class TestRejection:
client, t, challenge = self._connect(path) client, t, challenge = self._connect(path)
nonce = bytes.fromhex(challenge["nonce"]) nonce = bytes.fromhex(challenge["nonce"])
msg = {"id": "x", "command": "tabs.list", "args": {}, "user_agent": "browser-cli/0.9.4", "pubkey": pub} msg = {"id": "x", "command": "tabs.list", "args": {}, "user_agent": "browser-cli/0.9.5", "pubkey": pub}
msg["sig"] = sign(priv, nonce, msg).hex() msg["sig"] = sign(priv, nonce, msg).hex()
_send_framed(client, json.dumps(msg).encode()) _send_framed(client, json.dumps(msg).encode())
resp = _recv_framed(client) resp = _recv_framed(client)
@@ -302,6 +302,55 @@ class TestAuthSuccess:
client.close() client.close()
t.join(timeout=2) t.join(timeout=2)
def test_post_quantum_encrypted_transport_reaches_proxy(self, tmp_path, monkeypatch):
"""New clients encrypt the command payload and receive encrypted responses."""
monkeypatch.setattr("browser_cli.client._resolve_socket", _mock_no_browser)
monkeypatch.setattr("browser_cli.auth.pq_kex_server_keypair", lambda: ("fake-private", b"fake-public"))
monkeypatch.setattr("browser_cli.auth.pq_kex_server_decapsulate", lambda priv, ct: b"pq-secret")
path = tmp_path / "authorized_keys"
pem, pub = generate_keypair()
path.write_text(pub + "\n")
key_path = tmp_path / "client.key.pem"
key_path.write_bytes(pem)
priv = load_private_key(key_path)
client, server = _pair()
t = threading.Thread(
target=_handle_client,
args=(server, ("127.0.0.1", 9999), None, path),
daemon=True,
)
t.start()
challenge = _recv_framed(client)
nonce = bytes.fromhex(challenge["nonce"])
clean_msg = {
"id": "x",
"command": "tabs.list",
"args": {},
"user_agent": "browser-cli/0.9.5",
"pq_kex": {"alg": "ML-KEM-768", "ciphertext": "cafe"},
}
sig = sign(priv, nonce, clean_msg, b"pq-secret").hex()
envelope = {
"id": "x",
"user_agent": "browser-cli/0.9.5",
"pubkey": pub,
"sig": sig,
"pq_kex": clean_msg["pq_kex"],
"encrypted": pq_encrypt(b"pq-secret", "request", json.dumps(clean_msg).encode()),
}
_send_framed(client, json.dumps(envelope).encode())
encrypted_resp = _recv_framed(client)
assert "encrypted" in encrypted_resp
resp = json.loads(pq_decrypt(b"pq-secret", "response", encrypted_resp["encrypted"]))
assert "unauthorized" not in resp.get("error", "").lower()
assert "browser" in resp.get("error", "").lower() or "connected" in resp.get("error", "").lower()
client.close()
t.join(timeout=2)
def test_no_auth_mode_reaches_proxy(self, monkeypatch): def test_no_auth_mode_reaches_proxy(self, monkeypatch):
"""auth_keys_path=None (--no-auth): no pubkey required, reaches proxy layer.""" """auth_keys_path=None (--no-auth): no pubkey required, reaches proxy layer."""
monkeypatch.setattr("browser_cli.client._resolve_socket", _mock_no_browser) monkeypatch.setattr("browser_cli.client._resolve_socket", _mock_no_browser)
Generated
+1 -1
View File
@@ -4,7 +4,7 @@ requires-python = ">=3.10"
[[package]] [[package]]
name = "browser-cli" name = "browser-cli"
version = "0.9.4" version = "0.9.5"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "click" }, { name = "click" },