#!/usr/bin/env python3 """ Native Messaging Host for browser-cli. Chrome launches this process when extension calls connectNative(). It relays messages between extension (stdin/stdout Native Messaging protocol) and CLI (local IPC endpoint: Unix socket on Unix, named pipe on Windows). """ import json import os import queue import socket import sys import threading import uuid from pathlib import Path from browser_cli.native import local_server, protocol from browser_cli.constants import DEFAULT_ALIAS, DEFAULT_PAGE_SIZE, MAX_PAGED_ITEMS, PAGEABLE_COMMANDS from browser_cli.platform import endpoint_for_alias, is_windows, registry_path, runtime_dir from browser_cli.registry import update_registry SOCKET_PATH: str = "" # set after hello handshake PENDING: dict[str, queue.Queue] = {} PENDING_LOCK = threading.Lock() WRITE_LOCK = threading.Lock() REGISTRY_PATH = registry_path() PAGE_SIZE = int(os.environ.get("BROWSER_CLI_PAGE_SIZE", str(DEFAULT_PAGE_SIZE))) # --- Registry helpers --- def _registry_add(alias: str, sock_path: str) -> None: try: update_registry(alias, sock_path, REGISTRY_PATH) except Exception: pass def _registry_remove(alias: str) -> None: try: update_registry(alias, None, REGISTRY_PATH) except Exception: pass def _socket_path_for(alias: str) -> str: return endpoint_for_alias(alias) def _resolve_profile_alias(first_msg: dict | None) -> str: """Return a unique alias when the extension did not provide one.""" if first_msg and first_msg.get("type") == "hello": alias = first_msg.get("alias") if alias and alias != DEFAULT_ALIAS: return alias return str(uuid.uuid4()) # --- Thread A: read messages from extension (stdin) --- def stdin_reader(alias: str): stdin = sys.stdin.buffer while True: msg = protocol.read_native_message(stdin) if msg is None: # Extension disconnected — clean up and exit _cleanup(alias) os._exit(0) # Profile alias handshake if msg.get("type") == "hello": continue # already handled during startup if msg.get("type") == "bye": _cleanup(alias) os._exit(0) msg_id = msg.get("id") if msg_id: with PENDING_LOCK: q = PENDING.get(msg_id) if q: q.put(msg) # --- Thread B: accept CLI socket connections --- def _json_response(result: dict) -> bytes: return json.dumps(result).encode("utf-8") def _error_response(exc: Exception) -> bytes: return _json_response({"success": False, "error": str(exc)}) def _decode_cli_command(data: bytes) -> dict: cmd = json.loads(data) if "id" not in cmd: cmd["id"] = str(uuid.uuid4()) return cmd def _handle_cli_payload(data: bytes) -> bytes: return _json_response(_handle_browser_command(_decode_cli_command(data))) def _handle_browser_command(cmd: dict) -> dict: command = cmd.get("command") if command in PAGEABLE_COMMANDS: return _collect_paged_browser_command(cmd) return _send_browser_command(cmd) def _send_browser_command(cmd: dict, timeout: int = 30) -> dict: msg_id = cmd.get("id") or str(uuid.uuid4()) cmd["id"] = msg_id response_queue: queue.Queue = queue.Queue() with PENDING_LOCK: PENDING[msg_id] = response_queue try: with WRITE_LOCK: protocol.write_native_message(sys.stdout.buffer, cmd) try: return response_queue.get(timeout=timeout) except queue.Empty: return {"id": msg_id, "success": False, "error": "timeout waiting for browser response"} finally: with PENDING_LOCK: PENDING.pop(msg_id, None) def _collect_paged_browser_command(cmd: dict) -> dict: original_id = cmd.get("id") or str(uuid.uuid4()) offset = 0 items = [] total = None # Independent of PAGE_SIZE: the extension may return fewer items per page than # requested (byte budget), so a page-count guard derived from PAGE_SIZE would # falsely trip. Bound the page count by the absolute item cap instead. max_pages = MAX_PAGED_ITEMS pages_fetched = 0 while True: if pages_fetched >= max_pages: return {"id": original_id, "success": False, "error": f"paging loop exceeded {max_pages} pages — extension bug?"} pages_fetched += 1 page_cmd = dict(cmd) page_cmd["id"] = str(uuid.uuid4()) page_args = dict(cmd.get("args") or {}) page_args["__page"] = {"offset": offset, "limit": PAGE_SIZE} page_cmd["args"] = page_args result = _send_browser_command(page_cmd) result["id"] = original_id if not result.get("success", True): return result data = result.get("data") if not isinstance(data, dict) or data.get("__browserCliPage") is not True: return result page_items = data.get("items") or [] if not isinstance(page_items, list): return {"id": original_id, "success": False, "error": "invalid paged response from browser"} items.extend(page_items) total = data.get("total", total) next_offset = data.get("nextOffset") if next_offset is None or len(items) >= MAX_PAGED_ITEMS: break offset = int(next_offset) return {"id": original_id, "success": True, "data": items, "pageSize": PAGE_SIZE, "total": total} # --- Socket helpers (length-prefixed framing) --- def _cleanup(alias: str): try: if not is_windows(): Path(_socket_path_for(alias)).unlink(missing_ok=True) except Exception: pass _registry_remove(alias) def main(): stdin = sys.stdin.buffer # Wait for the hello handshake to learn the profile alias first_msg = protocol.read_native_message(stdin) if first_msg and first_msg.get("type") == "hello": alias = _resolve_profile_alias(first_msg) else: # No hello — use a generated alias; first_msg is dropped (no response path). alias = str(uuid.uuid4()) runtime_dir().mkdir(mode=0o700, exist_ok=True) sock_path = _socket_path_for(alias) if not is_windows(): path = Path(sock_path) if path.exists(): path.unlink() bound_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) bound_sock.bind(sock_path) os.chmod(sock_path, 0o600) bound_sock.listen(16) else: bound_sock = None _registry_add(alias, sock_path) t = threading.Thread( target=local_server.socket_server, args=(sock_path, _handle_cli_payload, _error_response), kwargs={"bound_sock": bound_sock}, daemon=True, ) t.start() stdin_reader(alias) if __name__ == "__main__": main()