Files
browser-cli/browser_cli/native_host.py
T

216 lines
5.7 KiB
Python

#!/usr/bin/env python3
"""
Native Messaging Host for browser-cli.
Chrome launches this process when the extension calls connectNative().
It relays messages between the Chrome extension (via stdin/stdout using the
Native Messaging protocol) and the CLI (via a Unix domain socket).
Multi-browser support: the extension sends a "hello" message on startup
with a profile alias. The host uses that alias to create a unique socket
path and registers it in a shared registry file.
"""
import json
import os
import queue
import socket
import struct
import sys
import threading
import uuid
from pathlib import Path
# Shared JSON registry file mapping each profile alias to its socket path;
# read and rewritten by the registry helpers in this file.
REGISTRY_PATH = Path("/tmp/browser-cli-registry.json")
# Alias used when the extension's hello message is missing or carries no alias.
DEFAULT_ALIAS = "default"
# NOTE(review): nothing visible in this file assigns or reads SOCKET_PATH after
# this declaration; confirm whether it is still needed.
SOCKET_PATH: str = "" # set after hello handshake
# In-flight CLI commands: message id -> queue on which the matching reply from
# the extension is delivered.
PENDING: dict[str, queue.Queue] = {}
# Held around every access to PENDING, which is touched by both the stdin
# reader and the per-connection CLI handler threads.
PENDING_LOCK = threading.Lock()
# --- Native Messaging protocol (4-byte LE length prefix + UTF-8 JSON) ---
def read_native_message(stream) -> dict | None:
raw_len = stream.read(4)
if len(raw_len) < 4:
return None
msg_len = struct.unpack("<I", raw_len)[0]
data = stream.read(msg_len)
if len(data) < msg_len:
return None
return json.loads(data.decode("utf-8"))
# Serializes writers: CLI handler threads call write_native_message
# concurrently on the same stdout, and a frame must never be interleaved with
# another thread's frame or the length-prefixed stream becomes unparseable.
_NATIVE_WRITE_LOCK = threading.Lock()


def write_native_message(stream, msg: dict) -> None:
    """Encode *msg* as one Native Messaging frame and write it to *stream*.

    The frame is a 4-byte little-endian unsigned length followed by the UTF-8
    encoded JSON payload.  Header and payload are emitted with a single
    ``write`` call while holding a module-level lock, so concurrent callers
    cannot interleave partial frames.  The stream is flushed before returning.

    Args:
        stream: Binary writable object providing ``write`` and ``flush``.
        msg: JSON-serializable message to send.

    Raises:
        TypeError / ValueError: if *msg* cannot be serialized by ``json.dumps``.
        OSError: if writing to or flushing *stream* fails.
    """
    payload = json.dumps(msg).encode("utf-8")
    frame = struct.pack("<I", len(payload)) + payload
    with _NATIVE_WRITE_LOCK:
        stream.write(frame)
        stream.flush()
# --- Registry helpers ---
def _registry_add(alias: str, sock_path: str) -> None:
    """Record ``alias -> sock_path`` in the shared registry file (best effort).

    A registry that is missing, unreadable, or does not contain a JSON object
    is treated as empty, so one corrupt file cannot permanently block
    registration.  The new content is written to a temporary sibling file and
    moved into place with ``os.replace`` so readers never observe a
    half-written registry.

    Failures are deliberately not raised: the host keeps running even when
    the registry cannot be updated.
    """
    try:
        reg = json.loads(REGISTRY_PATH.read_text())
    except (OSError, ValueError):
        # Missing file, unreadable file, bad encoding, or invalid JSON.
        reg = {}
    if not isinstance(reg, dict):
        reg = {}

    # Per-process temp name so concurrently starting hosts do not share it.
    tmp_path = REGISTRY_PATH.with_name(f"{REGISTRY_PATH.name}.{os.getpid()}.tmp")
    try:
        reg[alias] = sock_path
        tmp_path.write_text(json.dumps(reg))
        os.replace(tmp_path, REGISTRY_PATH)
    except (OSError, TypeError, ValueError):
        # Best effort only; make sure no temp file is left behind.
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass
def _registry_remove(alias: str) -> None:
    """Drop *alias* from the shared registry file (best effort).

    Does nothing when the registry is missing, unreadable, not a JSON object,
    or does not contain *alias*.  Otherwise the updated content is written to
    a temporary sibling file and moved into place with ``os.replace`` so
    readers never observe a half-written registry.

    Failures are deliberately not raised: this runs during shutdown.
    """
    try:
        reg = json.loads(REGISTRY_PATH.read_text())
    except (OSError, ValueError):
        # Nothing usable to update.
        return

    # Per-process temp name so concurrently exiting hosts do not share it.
    tmp_path = REGISTRY_PATH.with_name(f"{REGISTRY_PATH.name}.{os.getpid()}.tmp")
    try:
        if not isinstance(reg, dict) or alias not in reg:
            return
        del reg[alias]
        tmp_path.write_text(json.dumps(reg))
        os.replace(tmp_path, REGISTRY_PATH)
    except (OSError, TypeError, ValueError):
        # Best effort only; make sure no temp file is left behind.
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass
def _socket_path_for(alias: str) -> str:
    """Return the Unix socket path used for the profile *alias*.

    Spaces and forward slashes in the alias are each replaced by an
    underscore; every other character is kept as-is.
    """
    substitutions = str.maketrans({" ": "_", "/": "_"})
    sanitized = alias.translate(substitutions)
    return "/tmp/browser-cli-" + sanitized + ".sock"
# --- Thread A: read messages from extension (stdin) ---
def stdin_reader(alias: str):
    """Pump messages from the extension (stdin) to the waiting CLI handlers.

    Runs until stdin reaches end of stream, then removes this host's socket
    and registry entry via ``_cleanup(alias)`` and terminates the process
    with ``os._exit(0)``.

    Each message carrying an ``id`` that matches an entry in ``PENDING`` is
    put on that entry's queue.  ``hello`` messages and messages with no
    matching waiter are ignored.

    A malformed frame (invalid UTF-8/JSON, or a payload that is not a JSON
    object) is skipped rather than allowed to raise: an exception here would
    end the host without running ``_cleanup`` and leave a stale socket and
    registry entry behind.
    """
    stdin = sys.stdin.buffer
    while True:
        try:
            msg = read_native_message(stdin)
        except ValueError:
            # read_native_message consumes the whole frame before decoding,
            # so the stream is still aligned on the next frame.
            continue

        if msg is None:
            # Extension disconnected — clean up and exit
            _cleanup(alias)
            os._exit(0)

        if not isinstance(msg, dict):
            continue

        # Profile alias handshake
        if msg.get("type") == "hello":
            continue  # already handled during startup

        msg_id = msg.get("id")
        if not msg_id:
            continue
        with PENDING_LOCK:
            try:
                waiter = PENDING.get(msg_id)
            except TypeError:
                # Unhashable id (e.g. a JSON array/object): cannot match anything.
                waiter = None
        if waiter is not None:
            waiter.put(msg)
# --- Thread B: accept CLI socket connections ---
def socket_server(sock_path: str):
    """Listen on the Unix socket at *sock_path* and serve CLI connections.

    Any stale filesystem entry at *sock_path* is removed first.  The socket
    file is restricted to its owner (mode ``0o600``) before the socket starts
    listening, so no connection can be accepted with looser, umask-dependent
    permissions.  Every accepted connection is handled by
    ``handle_cli_connection`` on its own daemon thread.

    The loop ends when ``accept`` raises ``OSError``; the listening socket is
    closed on the way out.

    Raises:
        OSError: if the stale path cannot be removed, or binding, chmod, or
            listen fails.
    """
    # unlink(missing_ok=True) avoids the exists()/unlink() race and also
    # removes a dangling symlink, which exists() would report as absent.
    Path(sock_path).unlink(missing_ok=True)

    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.bind(sock_path)
        # Clients cannot connect before listen(), so tightening permissions
        # here leaves no window with the default mode.
        os.chmod(sock_path, 0o600)
        sock.listen(16)
        while True:
            try:
                conn, _ = sock.accept()
            except OSError:
                break
            threading.Thread(target=handle_cli_connection, args=(conn,), daemon=True).start()
def handle_cli_connection(conn: socket.socket) -> None:
    """Serve one CLI request on *conn*: forward it to the extension and reply.

    Reads one length-prefixed JSON command, assigns it a UUID ``id`` when it
    has none, registers a reply queue under that id in ``PENDING``, forwards
    the command to the extension over stdout, and waits up to 30 seconds for
    the reply.  The reply (or a timeout error object) is sent back on *conn*.

    The ``PENDING`` entry is removed in a ``finally`` block so it cannot leak
    when forwarding raises, and only when it is still this connection's
    queue, so a concurrent request that reused the same id keeps its waiter.

    Any other failure is reported to the client as
    ``{"success": False, "error": ...}`` on a best-effort basis.  The
    connection is always closed before returning.
    """
    try:
        data = _recv_all(conn)
        if not data:
            return
        cmd = json.loads(data)
        if "id" not in cmd:
            cmd["id"] = str(uuid.uuid4())
        msg_id = cmd["id"]

        response_queue: queue.Queue = queue.Queue()
        with PENDING_LOCK:
            PENDING[msg_id] = response_queue
        try:
            write_native_message(sys.stdout.buffer, cmd)
            try:
                result = response_queue.get(timeout=30)
            except queue.Empty:
                result = {"id": msg_id, "success": False, "error": "timeout waiting for browser response"}
        finally:
            with PENDING_LOCK:
                if PENDING.get(msg_id) is response_queue:
                    del PENDING[msg_id]

        _send_all(conn, json.dumps(result).encode("utf-8"))
    except Exception as exc:
        # Top-level boundary of the handler thread: report and move on.
        try:
            _send_all(conn, json.dumps({"success": False, "error": str(exc)}).encode("utf-8"))
        except Exception:
            # The client is gone or the socket is broken; nothing left to do.
            pass
    finally:
        conn.close()
# --- Socket helpers (length-prefixed framing) ---
def _send_all(conn: socket.socket, data: bytes) -> None:
    """Send *data* on *conn* as one length-prefixed frame.

    The frame is a 4-byte little-endian unsigned length followed by *data*,
    transmitted with a single ``sendall`` call.
    """
    header = struct.pack("<I", len(data))
    conn.sendall(header + data)
def _recv_all(conn: socket.socket) -> bytes | None:
    """Receive one length-prefixed frame from *conn* and return its payload.

    Reads a 4-byte little-endian unsigned length, then that many payload
    bytes.  Returns ``None`` when the peer closes the connection before the
    header or the payload is complete.
    """
    header = _recv_exact(conn, 4)
    if header is None:
        return None
    (payload_size,) = struct.unpack("<I", header)
    return _recv_exact(conn, payload_size)
def _recv_exact(conn: socket.socket, n: int) -> bytes | None:
buf = b""
while len(buf) < n:
chunk = conn.recv(n - len(buf))
if not chunk:
return None
buf += chunk
return buf
def _cleanup(alias: str):
    """Remove the socket file and the registry entry belonging to *alias*.

    Errors while deleting the socket file are ignored; the registry entry is
    removed afterwards regardless.
    """
    try:
        socket_file = Path(_socket_path_for(alias))
        socket_file.unlink(missing_ok=True)
    except Exception:
        pass

    _registry_remove(alias)
def main():
    """Entry point: perform the hello handshake, then relay until stdin closes.

    The first message from the extension is expected to be
    ``{"type": "hello", "alias": ...}``.  Its alias selects the socket path
    and registry entry for this host; ``DEFAULT_ALIAS`` is used when the
    hello is missing, malformed, or carries no non-empty string alias.

    If stdin is already closed before any message arrives, the function
    returns without creating a socket or touching the registry, so a host
    that never connected cannot remove another host's default entry.

    A non-hello first message is dropped.  It came from the extension, no CLI
    command can be pending before the socket exists, so there is no waiter to
    deliver it to and nothing to send back.
    """
    stdin = sys.stdin.buffer

    # Wait for the hello handshake to learn the profile alias
    try:
        first_msg = read_native_message(stdin)
    except ValueError:
        # Undecodable first frame: carry on as if no hello had been sent.
        first_msg = {}
    if first_msg is None:
        # Extension disconnected before saying anything; nothing to clean up.
        return

    alias = DEFAULT_ALIAS
    if isinstance(first_msg, dict) and first_msg.get("type") == "hello":
        requested = first_msg.get("alias")
        # Only a non-empty string can be turned into a socket path.
        if isinstance(requested, str) and requested:
            alias = requested

    sock_path = _socket_path_for(alias)
    _registry_add(alias, sock_path)
    t = threading.Thread(target=socket_server, args=(sock_path,), daemon=True)
    t.start()
    # Blocks until the extension disconnects, then exits the process.
    stdin_reader(alias)
if __name__ == "__main__":
main()