"""Proxying from TCP clients to the local browser native-host socket.""" from __future__ import annotations import asyncio import json from browser_cli import transport from browser_cli.compat import adapt_request, adapt_response from browser_cli.framing import async_recv_frame, async_send_frame from browser_cli.serve.logging import log_request _STRIP_PROTOCOL_FIELDS = {"token", "_route", "pubkey", "sig", "user_agent", "pq_kex", "encrypted", "accept_encoding"} class ServeProxyMixin: addr: tuple profile: str | None client_ver: str command: str compress: bool accept_encoding: dict | None async def send_error(self, msg: str, msg_id=None) -> None: ... async def send_payload(self, data: bytes) -> None: ... async def forward_to_browser(self, msg: dict) -> None: from browser_cli.client import BrowserNotConnected from browser_cli.client.targets import resolve_socket from browser_cli.platform import is_windows resolved_profile = msg.get("_route") or self.profile clean_msg = {k: v for k, v in msg.items() if k not in _STRIP_PROTOCOL_FIELDS} clean_payload = json.dumps(adapt_request(clean_msg, self.client_ver)).encode() try: sock_path = resolve_socket(resolved_profile) except BrowserNotConnected as e: await self.send_error(str(e)) log_request(self.addr, self.command, resolved_profile, "ERROR", "browser not connected") return try: if is_windows(): resp_payload = await self._windows_roundtrip(sock_path, clean_payload) else: resp_payload = await self._unix_roundtrip(sock_path, clean_payload) await self.send_browser_response(adapt_response(resp_payload, self.command, self.client_ver), resolved_profile) except (OSError, json.JSONDecodeError, ConnectionError) as e: await self.send_error(str(e)) log_request(self.addr, self.command, resolved_profile, "ERROR", str(e)) async def _windows_roundtrip(self, sock_path: str, payload: bytes) -> bytes: from multiprocessing.connection import Client as PipeClient def _pipe_roundtrip(): with PipeClient(sock_path, family="AF_PIPE") as pipe: pipe.send_bytes(payload) return pipe.recv_bytes() return await asyncio.to_thread(_pipe_roundtrip) async def _unix_roundtrip(self, sock_path: str, payload: bytes) -> bytes: local_reader, local_writer = await asyncio.open_unix_connection(sock_path) try: await async_send_frame(local_writer, payload) return await async_recv_frame(local_reader) or b"" finally: local_writer.close() await local_writer.wait_closed() async def send_browser_response(self, resp_payload: bytes, resolved_profile: str | None) -> None: resp_data = json.loads(resp_payload) if self.compress: await self.send_payload(transport.encode_response(resp_data, self.accept_encoding, self.command)) else: await self.send_payload(resp_payload) if resp_data.get("success", True): log_request(self.addr, self.command, resolved_profile, "OK") else: log_request(self.addr, self.command, resolved_profile, "ERROR", resp_data.get("error", ""))