"""TCP/TLS transport for talking to a remote ``browser-cli serve``. This module keeps the public/private compatibility surface used by older tests and callers, while delegating socket mechanics and auth-handshake details to focused helper modules. """ from __future__ import annotations import json from browser_cli.framing import async_send_frame, frame from browser_cli.remote.auth import ( build_auth_message as _build_auth_message, build_auth_message_async as _build_auth_message_async, decode_pq_response as _decode_pq_response, parse_challenge as _parse_challenge, should_warn_no_pq as _should_warn_no_pq, with_challenge as _with_challenge, ) from browser_cli.remote.socket import ( async_recv_all as _async_recv_all, async_recv_exact_bytes as _async_recv_exact, connect_socket as _connect_socket, open_async_connection as _open_async_connection, open_socket as _open_socket, recv_all as _recv_all, recv_exact_bytes as _recv_exact, split_endpoint as _split_endpoint, ) from browser_cli.remote import pool as _pool def _send_remote(endpoint: str, msg: dict, private_key=None, *, warn_no_pq: bool | None = None) -> bytes | None: # Reuse an already-authenticated connection when one is idle for this endpoint. conn = _pool.checkout(endpoint) if conn is not None: try: response = _pool.send_over(conn, msg) _pool.checkin(endpoint, conn) return response except (OSError, ConnectionError, ValueError, EOFError): _pool.discard(conn) # stale/closed — fall through to a fresh handshake return _send_remote_handshake(endpoint, msg, private_key, warn_no_pq=warn_no_pq) def _send_remote_handshake(endpoint: str, msg: dict, private_key=None, *, warn_no_pq: bool | None = None) -> bytes | None: warn = _should_warn_no_pq(msg) if warn_no_pq is None else warn_no_pq def build_auth(sync_msg: dict, challenge: dict | None, nonce_hex: str | None, key): from browser_cli.auth import pq_kex_client_encapsulate return _build_auth_message(sync_msg, challenge, nonce_hex, key, pq_kex_client_encapsulate, warn_no_pq=warn) sock = _connect_socket(endpoint) try: payload_msg, pq_shared_secret = _with_challenge(_recv_all(sock), msg, private_key, build_auth) sock.sendall(frame(json.dumps(payload_msg).encode("utf-8"))) response = _decode_pq_response(_recv_all(sock), pq_shared_secret) except BaseException: _pool._close(sock) raise # Only encrypted sessions are reusable — the server keeps those open, and a # fresh AEAD nonce per frame keeps reuse of the shared secret safe. if pq_shared_secret is not None: _pool.checkin(endpoint, _pool.PooledConnection(sock, pq_shared_secret)) else: _pool._close(sock) return response async def _send_remote_async(endpoint: str, msg: dict, private_key=None, *, warn_no_pq: bool | None = None) -> bytes | None: reader, writer = await _open_async_connection(endpoint) try: challenge_raw = await _async_recv_all(reader) warn = _should_warn_no_pq(msg) if warn_no_pq is None else warn_no_pq async def build_auth(sync_msg: dict, challenge: dict | None, nonce_hex: str | None, key): return await _build_auth_message_async(sync_msg, challenge, nonce_hex, key, warn_no_pq=warn) payload_msg, pq_shared_secret = await _with_challenge(challenge_raw, msg, private_key, build_auth) await async_send_frame(writer, json.dumps(payload_msg).encode("utf-8")) return _decode_pq_response(await _async_recv_all(reader), pq_shared_secret) finally: writer.close() try: await writer.wait_closed() except Exception: pass