Files
browser-cli/browser_cli/native_host.py
T
2026-04-13 11:02:54 +02:00

246 lines
6.7 KiB
Python

#!/usr/bin/env python3
"""
Native Messaging Host for browser-cli.
Chrome launches this process when the extension calls connectNative().
It relays messages between the extension (stdin/stdout Native Messaging protocol)
and the CLI (local IPC endpoint: Unix socket on Unix, named pipe on Windows).
"""
import json
import os
import queue
import socket
import struct
import sys
import threading
import uuid
from multiprocessing.connection import Listener
from pathlib import Path
from browser_cli.platform import DEFAULT_ALIAS, endpoint_for_alias, is_windows, registry_path, runtime_dir
# Placeholder for the IPC endpoint path.
# NOTE(review): nothing in the visible code assigns this after the handshake;
# main() keeps the endpoint in a local variable instead — confirm whether this
# global is still needed.
SOCKET_PATH: str = ""  # set after hello handshake
# Maps a command id to the queue on which the CLI handler that sent that
# command waits for the extension's response.
PENDING: dict[str, queue.Queue] = {}
# Guards every read and write of PENDING (it is touched from several threads).
PENDING_LOCK = threading.Lock()
# Path of the JSON registry file that maps profile alias -> IPC endpoint.
REGISTRY_PATH = registry_path()
# --- Native Messaging protocol (4-byte LE length prefix + UTF-8 JSON) ---
def read_native_message(stream) -> dict | None:
raw_len = stream.read(4)
if len(raw_len) < 4:
return None
msg_len = struct.unpack("<I", raw_len)[0]
data = stream.read(msg_len)
if len(data) < msg_len:
return None
return json.loads(data.decode("utf-8"))
# Serializes frame writes: CLI handler threads and the startup path all emit
# frames on the same stdout stream, and an interleaved header/payload would
# corrupt the length-prefixed protocol.
_NATIVE_WRITE_LOCK = threading.Lock()


def write_native_message(stream, msg: dict) -> None:
    """Encode *msg* as one Native Messaging frame and write it to *stream*.

    The frame is a 4-byte little-endian unsigned length followed by the
    UTF-8 encoded JSON payload. Header and payload are written with a single
    ``write`` call while holding a module-level lock, so concurrent callers
    cannot interleave their frames. The stream is flushed before returning.
    """
    payload = json.dumps(msg).encode("utf-8")
    frame = struct.pack("<I", len(payload)) + payload
    with _NATIVE_WRITE_LOCK:
        stream.write(frame)
        stream.flush()
# --- Registry helpers ---
def _registry_add(alias: str, sock_path: str) -> None:
    """Record ``alias -> sock_path`` in the JSON registry file (best effort).

    A missing, unreadable-as-JSON, or non-object registry is treated as empty
    so that a damaged file cannot permanently block registration. Any other
    failure (e.g. the file cannot be written) is deliberately ignored: the
    registry is a convenience and must not prevent the host from starting.
    """
    try:
        try:
            reg = json.loads(REGISTRY_PATH.read_text())
        except (FileNotFoundError, ValueError):
            # ValueError covers both invalid JSON and undecodable bytes.
            reg = {}
        if not isinstance(reg, dict):
            reg = {}
        reg[alias] = sock_path
        REGISTRY_PATH.write_text(json.dumps(reg))
    except Exception:
        pass
def _registry_remove(alias: str) -> None:
    """Drop *alias* from the JSON registry file, if the file exists.

    The file is rewritten even when *alias* was not present. Every failure
    (unreadable file, invalid JSON, write error) is silently ignored.
    """
    try:
        if REGISTRY_PATH.exists():
            entries = json.loads(REGISTRY_PATH.read_text())
            entries.pop(alias, None)
            serialized = json.dumps(entries)
            REGISTRY_PATH.write_text(serialized)
    except Exception:
        pass
def _socket_path_for(alias: str) -> str:
    """Return the IPC endpoint for *alias*, as computed by endpoint_for_alias."""
    endpoint = endpoint_for_alias(alias)
    return endpoint
def _resolve_profile_alias(first_msg: dict | None) -> str:
"""Return a unique alias when the extension did not provide one."""
if first_msg and first_msg.get("type") == "hello":
alias = first_msg.get("alias")
if alias and alias != DEFAULT_ALIAS:
return alias
return str(uuid.uuid4())
# --- Thread A: read messages from extension (stdin) ---
def stdin_reader(alias: str):
    """Dispatch extension messages from stdin to the waiting CLI handlers.

    Loops forever on the process's binary stdin. When the stream ends or the
    extension sends a ``bye`` message, the endpoint and registry entry for
    *alias* are cleaned up and the process terminates via ``os._exit(0)``;
    this function therefore never returns normally.

    Messages that cannot be decoded, are not JSON objects, or carry an id
    that cannot be looked up are skipped, so a single bad message cannot
    kill the reader without cleanup.
    """
    stdin = sys.stdin.buffer
    while True:
        try:
            msg = read_native_message(stdin)
        except ValueError:
            # Bad UTF-8 or JSON. The frame was already consumed in full, so
            # the stream is still aligned on the next length header.
            continue
        if msg is None:
            # Extension disconnected — clean up and exit
            _cleanup(alias)
            os._exit(0)
        if not isinstance(msg, dict):
            # Valid JSON but not an object: nothing to route.
            continue
        msg_type = msg.get("type")
        # Profile alias handshake
        if msg_type == "hello":
            continue  # already handled during startup
        if msg_type == "bye":
            _cleanup(alias)
            os._exit(0)
        msg_id = msg.get("id")
        if not msg_id:
            continue
        try:
            with PENDING_LOCK:
                waiter = PENDING.get(msg_id)
        except TypeError:
            # Unhashable id (e.g. a list) cannot match any pending command.
            waiter = None
        if waiter is not None:
            waiter.put(msg)
# --- Thread B: accept CLI socket connections ---
def socket_server(sock_path: str):
    """Accept CLI connections on *sock_path* and hand each to a worker thread.

    On Windows the endpoint is a named pipe; elsewhere it is a Unix domain
    socket. Returns when accepting a connection fails with OSError.
    """
    if is_windows():
        _serve_named_pipe(sock_path)
    else:
        _serve_unix_socket(sock_path)


def _serve_named_pipe(pipe_address: str) -> None:
    """Windows accept loop: one fresh Listener per incoming connection."""
    while True:
        try:
            listener = Listener(pipe_address, family="AF_PIPE")
            conn = listener.accept()
        except OSError:
            return
        # The handler receives the listener too and closes it when done.
        worker = threading.Thread(
            target=handle_cli_connection, args=(conn, listener), daemon=True
        )
        worker.start()


def _serve_unix_socket(socket_file: str) -> None:
    """Unix accept loop on a freshly bound AF_UNIX stream socket."""
    stale = Path(socket_file)
    if stale.exists():
        # Remove a leftover socket file so bind() below can succeed.
        stale.unlink()
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(socket_file)
    server.listen(16)
    while True:
        try:
            conn, _ = server.accept()
        except OSError:
            return
        worker = threading.Thread(
            target=handle_cli_connection, args=(conn, None), daemon=True
        )
        worker.start()
def handle_cli_connection(conn, listener=None) -> None:
    """Serve one CLI request: forward it to the extension and return the reply.

    Reads a single JSON command from *conn*, assigns it an ``id`` if it has
    none, registers a response queue under that id, writes the command to
    stdout as a Native Messaging frame, and waits up to 30 seconds for the
    matching response. The response (or a timeout/error object) is sent back
    on *conn*.

    Args:
        conn: the accepted connection — a ``multiprocessing.connection``
            object on Windows, a socket elsewhere.
        listener: on Windows, the Listener that produced *conn*; it is closed
            together with the connection. None on other platforms.

    All errors are reported to the client when possible and never raised,
    except failures while closing the connection itself.
    """

    def reply(payload: dict) -> None:
        # Use the transport that matches how the request was received.
        encoded = json.dumps(payload).encode("utf-8")
        if is_windows():
            conn.send_bytes(encoded)
        else:
            _send_all(conn, encoded)

    try:
        data = conn.recv_bytes() if is_windows() else _recv_all(conn)
        if not data:
            return
        cmd = json.loads(data)
        if "id" not in cmd:
            cmd["id"] = str(uuid.uuid4())
        msg_id = cmd["id"]
        response_queue: queue.Queue = queue.Queue()
        with PENDING_LOCK:
            PENDING[msg_id] = response_queue
        try:
            write_native_message(sys.stdout.buffer, cmd)
            try:
                result = response_queue.get(timeout=30)
            except queue.Empty:
                result = {"id": msg_id, "success": False, "error": "timeout waiting for browser response"}
        finally:
            # Always unregister, even if writing to stdout failed. Only remove
            # our own queue: another handler may have reused the same id.
            with PENDING_LOCK:
                if PENDING.get(msg_id) is response_queue:
                    del PENDING[msg_id]
        reply(result)
    except Exception as exc:
        try:
            reply({"success": False, "error": str(exc)})
        except Exception:
            # The client is gone or the transport is broken; nothing to do.
            pass
    finally:
        try:
            conn.close()
        finally:
            # Close the per-connection listener even if conn.close() raised.
            if listener is not None:
                listener.close()
# --- Socket helpers (length-prefixed framing) ---
def _send_all(conn: socket.socket, data: bytes) -> None:
framed = struct.pack("<I", len(data)) + data
conn.sendall(framed)
def _recv_all(conn: socket.socket) -> bytes | None:
    """Receive one length-prefixed frame from *conn* and return its payload.

    Returns None when the peer closes the connection before a complete
    4-byte header or a complete payload has arrived.
    """
    header = _recv_exact(conn, 4)
    if header is not None:
        (payload_size,) = struct.unpack("<I", header)
        return _recv_exact(conn, payload_size)
    return None
def _recv_exact(conn: socket.socket, n: int) -> bytes | None:
buf = b""
while len(buf) < n:
chunk = conn.recv(n - len(buf))
if not chunk:
return None
buf += chunk
return buf
def _cleanup(alias: str):
    """Remove the endpoint file (non-Windows only) and the registry entry for *alias*.

    Failure to remove the socket file is ignored; the registry entry is
    removed regardless.
    """
    try:
        has_socket_file = not is_windows()
        if has_socket_file:
            endpoint = Path(_socket_path_for(alias))
            endpoint.unlink(missing_ok=True)
    except Exception:
        pass
    _registry_remove(alias)
def main():
    """Run the native host: handshake, publish the endpoint, then relay.

    Reads the first message from stdin to determine the profile alias,
    creates the runtime directory, records the endpoint in the registry,
    starts the CLI accept loop in a daemon thread, and then processes
    extension messages on the calling thread via stdin_reader().
    """
    stdin = sys.stdin.buffer
    # Wait for the hello handshake to learn the profile alias
    first_msg = read_native_message(stdin)
    if first_msg and first_msg.get("type") == "hello":
        alias = _resolve_profile_alias(first_msg)
    else:
        # No hello — use a generated alias and re-queue the first command if needed.
        alias = str(uuid.uuid4())
        if first_msg:
            msg_id = first_msg.get("id")
            if msg_id:
                q: queue.Queue = queue.Queue()
                with PENDING_LOCK:
                    PENDING[msg_id] = q
                # NOTE(review): this writes the message that was just read from
                # the extension back to stdout (i.e. back to the extension), and
                # nothing visible here ever reads from or removes `q` — confirm
                # this is intended.
                write_native_message(sys.stdout.buffer, first_msg)
    # Directory is created with owner-only permissions (mode 0o700).
    runtime_dir().mkdir(mode=0o700, exist_ok=True)
    sock_path = _socket_path_for(alias)
    # The registry entry is written before the server thread starts listening.
    _registry_add(alias, sock_path)
    # Daemon thread: it does not keep the process alive on its own.
    t = threading.Thread(target=socket_server, args=(sock_path,), daemon=True)
    t.start()
    # Blocks on the calling thread; stdin_reader ends the process itself.
    stdin_reader(alias)
# Run the host only when executed as a script, not when imported.
if __name__ == "__main__":
    main()