Files
browser-cli/browser_cli/native_host.py
T
2026-04-08 21:17:59 +02:00

160 lines
4.0 KiB
Python

#!/usr/bin/env python3
"""
Native Messaging Host for browser-cli.
Chrome launches this process when the extension calls connectNative().
It relays messages between the Chrome extension (via stdin/stdout using the
Native Messaging protocol) and the CLI (via a Unix domain socket).
"""
import json
import os
import queue
import socket
import struct
import sys
import threading
import uuid
from pathlib import Path
SOCKET_PATH = "/tmp/browser-cli.sock"
PENDING: dict[str, queue.Queue] = {}
PENDING_LOCK = threading.Lock()
# --- Native Messaging protocol (4-byte LE length prefix + UTF-8 JSON) ---
def read_native_message(stream) -> dict | None:
raw_len = stream.read(4)
if len(raw_len) < 4:
return None
msg_len = struct.unpack("<I", raw_len)[0]
data = stream.read(msg_len)
if len(data) < msg_len:
return None
return json.loads(data.decode("utf-8"))
def write_native_message(stream, msg: dict) -> None:
data = json.dumps(msg).encode("utf-8")
stream.write(struct.pack("<I", len(data)))
stream.write(data)
stream.flush()
# --- Thread A: read messages from extension (stdin) ---
def stdin_reader():
stdin = sys.stdin.buffer
while True:
msg = read_native_message(stdin)
if msg is None:
# Extension disconnected — clean up socket and exit
_cleanup()
os._exit(0)
msg_id = msg.get("id")
if msg_id:
with PENDING_LOCK:
q = PENDING.get(msg_id)
if q:
q.put(msg)
# --- Thread B: accept CLI socket connections ---
def socket_server():
path = Path(SOCKET_PATH)
if path.exists():
path.unlink()
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(SOCKET_PATH)
sock.listen(16)
while True:
try:
conn, _ = sock.accept()
except OSError:
break
threading.Thread(target=handle_cli_connection, args=(conn,), daemon=True).start()
def handle_cli_connection(conn: socket.socket) -> None:
try:
data = _recv_all(conn)
if not data:
return
cmd = json.loads(data)
if "id" not in cmd:
cmd["id"] = str(uuid.uuid4())
msg_id = cmd["id"]
response_queue: queue.Queue = queue.Queue()
with PENDING_LOCK:
PENDING[msg_id] = response_queue
# Forward command to extension via stdout
write_native_message(sys.stdout.buffer, cmd)
# Wait for extension's response (30 s timeout)
try:
result = response_queue.get(timeout=30)
except queue.Empty:
result = {"id": msg_id, "success": False, "error": "timeout waiting for browser response"}
with PENDING_LOCK:
PENDING.pop(msg_id, None)
_send_all(conn, json.dumps(result).encode("utf-8"))
except Exception as exc:
try:
_send_all(conn, json.dumps({"success": False, "error": str(exc)}).encode("utf-8"))
except Exception:
pass
finally:
conn.close()
# --- Socket helpers (length-prefixed framing) ---
def _send_all(conn: socket.socket, data: bytes) -> None:
framed = struct.pack("<I", len(data)) + data
conn.sendall(framed)
def _recv_all(conn: socket.socket) -> bytes | None:
raw_len = _recv_exact(conn, 4)
if raw_len is None:
return None
msg_len = struct.unpack("<I", raw_len)[0]
return _recv_exact(conn, msg_len)
def _recv_exact(conn: socket.socket, n: int) -> bytes | None:
buf = b""
while len(buf) < n:
chunk = conn.recv(n - len(buf))
if not chunk:
return None
buf += chunk
return buf
def _cleanup():
try:
Path(SOCKET_PATH).unlink(missing_ok=True)
except Exception:
pass
def main():
# Start socket server thread
t = threading.Thread(target=socket_server, daemon=True)
t.start()
# Read extension messages on main thread (blocks until extension disconnects)
stdin_reader()
if __name__ == "__main__":
main()