#!/usr/bin/env python3 """ Native Messaging Host for browser-cli. Chrome launches this process when the extension calls connectNative(). It relays messages between the Chrome extension (via stdin/stdout using the Native Messaging protocol) and the CLI (via a Unix domain socket). Multi-browser support: the extension sends a "hello" message on startup with a profile alias. The host uses that alias to create a unique socket path and registers it in a shared registry file. """ import json import os import queue import socket import struct import sys import threading import uuid from pathlib import Path SOCKET_DIR = Path("/tmp/.browser_cli") REGISTRY_PATH = SOCKET_DIR / "registry.json" DEFAULT_ALIAS = "default" SOCKET_PATH: str = "" # set after hello handshake PENDING: dict[str, queue.Queue] = {} PENDING_LOCK = threading.Lock() # --- Native Messaging protocol (4-byte LE length prefix + UTF-8 JSON) --- def read_native_message(stream) -> dict | None: raw_len = stream.read(4) if len(raw_len) < 4: return None msg_len = struct.unpack(" None: data = json.dumps(msg).encode("utf-8") stream.write(struct.pack(" None: try: reg = json.loads(REGISTRY_PATH.read_text()) if REGISTRY_PATH.exists() else {} reg[alias] = sock_path REGISTRY_PATH.write_text(json.dumps(reg)) except Exception: pass def _registry_remove(alias: str) -> None: try: if not REGISTRY_PATH.exists(): return reg = json.loads(REGISTRY_PATH.read_text()) reg.pop(alias, None) REGISTRY_PATH.write_text(json.dumps(reg)) except Exception: pass def _socket_path_for(alias: str) -> str: safe = alias.replace(" ", "_").replace("/", "_") return str(SOCKET_DIR / f"{safe}.sock") # --- Thread A: read messages from extension (stdin) --- def stdin_reader(alias: str): stdin = sys.stdin.buffer while True: msg = read_native_message(stdin) if msg is None: # Extension disconnected — clean up and exit _cleanup(alias) os._exit(0) # Profile alias handshake if msg.get("type") == "hello": continue # already handled during startup msg_id = msg.get("id") if msg_id: with PENDING_LOCK: q = PENDING.get(msg_id) if q: q.put(msg) # --- Thread B: accept CLI socket connections --- def socket_server(sock_path: str): path = Path(sock_path) if path.exists(): path.unlink() sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.bind(sock_path) sock.listen(16) while True: try: conn, _ = sock.accept() except OSError: break threading.Thread(target=handle_cli_connection, args=(conn,), daemon=True).start() def handle_cli_connection(conn: socket.socket) -> None: try: data = _recv_all(conn) if not data: return cmd = json.loads(data) if "id" not in cmd: cmd["id"] = str(uuid.uuid4()) msg_id = cmd["id"] response_queue: queue.Queue = queue.Queue() with PENDING_LOCK: PENDING[msg_id] = response_queue write_native_message(sys.stdout.buffer, cmd) try: result = response_queue.get(timeout=30) except queue.Empty: result = {"id": msg_id, "success": False, "error": "timeout waiting for browser response"} with PENDING_LOCK: PENDING.pop(msg_id, None) _send_all(conn, json.dumps(result).encode("utf-8")) except Exception as exc: try: _send_all(conn, json.dumps({"success": False, "error": str(exc)}).encode("utf-8")) except Exception: pass finally: conn.close() # --- Socket helpers (length-prefixed framing) --- def _send_all(conn: socket.socket, data: bytes) -> None: framed = struct.pack(" bytes | None: raw_len = _recv_exact(conn, 4) if raw_len is None: return None msg_len = struct.unpack(" bytes | None: buf = b"" while len(buf) < n: chunk = conn.recv(n - len(buf)) if not chunk: return None buf += chunk return buf def _cleanup(alias: str): try: Path(_socket_path_for(alias)).unlink(missing_ok=True) except Exception: pass _registry_remove(alias) def main(): stdin = sys.stdin.buffer # Wait for the hello handshake to learn the profile alias first_msg = read_native_message(stdin) if first_msg and first_msg.get("type") == "hello": alias = first_msg.get("alias") or DEFAULT_ALIAS else: # No hello — fall back to default, re-queue message if it was a command alias = DEFAULT_ALIAS if first_msg: msg_id = first_msg.get("id") if msg_id: q: queue.Queue = queue.Queue() with PENDING_LOCK: PENDING[msg_id] = q write_native_message(sys.stdout.buffer, first_msg) SOCKET_DIR.mkdir(mode=0o700, exist_ok=True) sock_path = _socket_path_for(alias) _registry_add(alias, sock_path) t = threading.Thread(target=socket_server, args=(sock_path,), daemon=True) t.start() stdin_reader(alias) if __name__ == "__main__": main()