"""TCP/TLS transport for talking to a remote ``browser-cli serve``. Owns the wire mechanics of the remote leg: open a socket (TLS on :443), complete the signed challenge/response handshake with an optional post-quantum key exchange, frame the request, and read the framed (possibly encrypted) response. The higher-level "which endpoint / which profile / which key" decisions stay in :mod:`browser_cli.client.core`. """ from __future__ import annotations import asyncio import json import socket import sys from collections.abc import Callable from contextlib import contextmanager from typing import TypeVar from browser_cli.errors import BrowserNotConnected from browser_cli.endpoints import _resolve_connect_endpoint from browser_cli.framing import async_recv_exact, async_recv_frame, async_send_frame, frame, recv_exact, recv_frame from browser_cli.version_manager import USER_AGENT as _USER_AGENT T = TypeVar("T") _AUTH_FIELDS = {"token", "pubkey", "sig", "pq_kex", "encrypted", "_suppress_pq_warning"} _PQ_WARNING = ( "** WARNING: connection is not using a post-quantum key exchange algorithm.\n" "** This session may be vulnerable to store now, decrypt later attacks.\n" ) def _recv_exact(sock: socket.socket, n: int) -> bytes: return recv_exact(sock, n) or b"" def _recv_all(sock: socket.socket) -> bytes: return recv_frame(sock, label="Response") or b"" async def _async_recv_exact(reader: asyncio.StreamReader, n: int) -> bytes: return await async_recv_exact(reader, n) or b"" async def _async_recv_all(reader: asyncio.StreamReader) -> bytes: return await async_recv_frame(reader, label="Response") or b"" def _split_endpoint(endpoint: str) -> tuple[str, int]: connect_ep = _resolve_connect_endpoint(endpoint) host, _, port_str = connect_ep.rpartition(":") return host, int(port_str) @contextmanager def _open_socket(endpoint: str): host, port = _split_endpoint(endpoint) raw_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) raw_sock.settimeout(30) try: raw_sock.connect((host, port)) if port == 443: import ssl sock = ssl.create_default_context().wrap_socket(raw_sock, server_hostname=host) else: sock = raw_sock except Exception: raw_sock.close() raise with sock: yield sock async def _open_async_connection(endpoint: str) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]: host, port = _split_endpoint(endpoint) ssl_ctx = None if port == 443: import ssl ssl_ctx = ssl.create_default_context() return await asyncio.open_connection(host, port, ssl=ssl_ctx, server_hostname=host if ssl_ctx else None) def _parse_challenge(raw: bytes) -> tuple[dict | None, str | None]: try: challenge = json.loads(raw) nonce_hex = challenge.get("nonce") if challenge.get("type") == "challenge" else None return challenge, nonce_hex except (json.JSONDecodeError, AttributeError): return None, None def _check_min_client_version(challenge: dict | None) -> None: min_ver = challenge.get("min_client_version") if isinstance(challenge, dict) else None if not min_ver: return from browser_cli.version_manager import parse_version try: client_ver = _USER_AGENT.split("/", 1)[1] if parse_version(client_ver) < parse_version(min_ver): raise BrowserNotConnected( f"Client version {client_ver} is too old for this server " f"(requires >= {min_ver}). Run: pip install --upgrade browser-cli" ) except (IndexError, ValueError): pass def _clean_message(msg: dict) -> dict: return {k: v for k, v in msg.items() if k not in _AUTH_FIELDS} def _get_pq_public_key(challenge: dict | None) -> str | None: if not isinstance(challenge, dict): return None from browser_cli.auth import PQ_KEX_ALG kex = challenge.get("pq_kex") if isinstance(kex, dict) and kex.get("alg") == PQ_KEX_ALG and kex.get("public_key"): return str(kex["public_key"]) return None def _signed_payload(clean_msg: dict, private_key, nonce_hex: str, pq_shared_secret: bytes | None) -> dict: from browser_cli.auth import PQ_KEX_ALG, pq_encrypt, public_key_hex, sign nonce = bytes.fromhex(nonce_hex) sig = sign(private_key, nonce, clean_msg, pq_shared_secret) pubkey = public_key_hex(private_key) if pq_shared_secret is None: return {**clean_msg, "pubkey": pubkey, "sig": sig.hex()} encrypted = pq_encrypt(pq_shared_secret, "request", json.dumps(clean_msg).encode("utf-8")) return { "id": clean_msg.get("id"), "user_agent": clean_msg.get("user_agent"), "pubkey": pubkey, "sig": sig.hex(), "pq_kex": clean_msg["pq_kex"], "encrypted": encrypted, } def _warn_no_pq(enabled: bool) -> None: if enabled: sys.stderr.write(_PQ_WARNING) def _build_auth_message( msg: dict, challenge: dict | None, nonce_hex: str | None, private_key, encapsulate: Callable[[str], tuple[str, bytes]], *, warn_no_pq: bool = True, ) -> tuple[dict, bytes | None]: if not nonce_hex or private_key is None: _warn_no_pq(warn_no_pq) return msg, None clean_msg = _clean_message(msg) pq_shared_secret = None pq_public_key = _get_pq_public_key(challenge) if pq_public_key: from browser_cli.auth import PQ_KEX_ALG ciphertext_hex, pq_shared_secret = encapsulate(pq_public_key) clean_msg["pq_kex"] = {"alg": PQ_KEX_ALG, "ciphertext": ciphertext_hex} else: _warn_no_pq(warn_no_pq) return _signed_payload(clean_msg, private_key, nonce_hex, pq_shared_secret), pq_shared_secret async def _build_auth_message_async( msg: dict, challenge: dict | None, nonce_hex: str | None, private_key, *, warn_no_pq: bool = True, ) -> tuple[dict, bytes | None]: def encapsulate(public_key: str) -> tuple[str, bytes]: from browser_cli.auth import pq_kex_client_encapsulate return pq_kex_client_encapsulate(public_key) return await asyncio.to_thread( _build_auth_message, msg, challenge, nonce_hex, private_key, encapsulate, warn_no_pq=warn_no_pq, ) def _decode_pq_response(response: bytes | None, pq_shared_secret: bytes | None) -> bytes | None: if response is None or pq_shared_secret is None: return response try: from browser_cli.auth import pq_decrypt envelope = json.loads(response) if isinstance(envelope, dict) and "encrypted" in envelope: return pq_decrypt(pq_shared_secret, "response", envelope["encrypted"]) except Exception as e: raise BrowserNotConnected(f"Cannot decrypt post-quantum remote response: {e}") from e return response def _with_challenge(challenge_raw: bytes, msg: dict, private_key, build_auth: Callable[[dict, dict | None, str | None, object], T]) -> T: if challenge_raw is None: raise BrowserNotConnected("No challenge received from remote endpoint") challenge, nonce_hex = _parse_challenge(challenge_raw) _check_min_client_version(challenge) return build_auth(msg, challenge, nonce_hex, private_key) def _should_warn_no_pq(msg: dict) -> bool: return not bool(msg.pop("_suppress_pq_warning", False)) async def _send_remote_async(endpoint: str, msg: dict, private_key=None, *, warn_no_pq: bool | None = None) -> bytes | None: reader, writer = await _open_async_connection(endpoint) try: challenge_raw = await _async_recv_all(reader) warn = _should_warn_no_pq(msg) if warn_no_pq is None else warn_no_pq async def build_auth(sync_msg: dict, challenge: dict | None, nonce_hex: str | None, key): return await _build_auth_message_async(sync_msg, challenge, nonce_hex, key, warn_no_pq=warn) payload_msg, pq_shared_secret = await _with_challenge(challenge_raw, msg, private_key, build_auth) await async_send_frame(writer, json.dumps(payload_msg).encode("utf-8")) return _decode_pq_response(await _async_recv_all(reader), pq_shared_secret) finally: writer.close() try: await writer.wait_closed() except Exception: pass def _send_remote(endpoint: str, msg: dict, private_key=None, *, warn_no_pq: bool | None = None) -> bytes | None: warn = _should_warn_no_pq(msg) if warn_no_pq is None else warn_no_pq def build_auth(sync_msg: dict, challenge: dict | None, nonce_hex: str | None, key): from browser_cli.auth import pq_kex_client_encapsulate return _build_auth_message(sync_msg, challenge, nonce_hex, key, pq_kex_client_encapsulate, warn_no_pq=warn) with _open_socket(endpoint) as sock: payload_msg, pq_shared_secret = _with_challenge(_recv_all(sock), msg, private_key, build_auth) sock.sendall(frame(json.dumps(payload_msg).encode("utf-8"))) return _decode_pq_response(_recv_all(sock), pq_shared_secret)