Files
browser-cli/browser_cli/native_host.py
T
daniel156161 4b2abbbfc5
Testing / test (push) Successful in 26s
Package Extension / package-extension (push) Successful in 22s
Build & Publish Package / publish (push) Successful in 27s
feat: Ed25519 challenge-response auth + YubiKey/SSH agent support (v0.9.0)
Security:
- serve.py: server now sends nonce challenge before accepting any command;
  clients sign nonce + SHA256(canonical_payload) with Ed25519 key
- New --authorized-keys FILE option for serve; token auth still works as fallback
- Connection limit: BoundedSemaphore(64) in serve.py
- Secure file creation with os.open(..., 0o600) for token/key files
- New auth.py module: keygen, file key load/save, SSH agent protocol (pure Python),
  sign/verify helpers compatible with both file keys and agent-held keys (YubiKey,
  TPM, gpg-agent)

Features:
- YubiKey support via SSH agent protocol — no new runtime deps, just $SSH_AUTH_SOCK
- New `browser-cli auth` command group: keygen, trust, show, keys
- Global --key PATH flag (or BROWSER_CLI_KEY env) selects signing key;
  pass "agent" or "agent:<selector>" to use SSH agent key
- BrowserCLI Python API gains key= parameter

Bug fixes (11 issues across two review passes):
- client.py: check response is not None before json.loads
- native_host.py: _read_exact_stream loop handles EINTR short reads; fix Windows
  Listener leak on accept error
- __init__.py: open_wait / tabs_watch_url raise RuntimeError instead of silent None
- extension/tabs.ts: dedupe skips tabs without URL; tabsSort uses pendingUrl fallback
- extension/session.ts: removeListener before addListener prevents duplicate handlers

Breaking: TCP serve protocol now sends a challenge frame first (v0.9.0)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-02 16:20:39 +02:00

336 lines
9.3 KiB
Python

#!/usr/bin/env python3
"""
Native Messaging Host for browser-cli.
Chrome launches this process when extension calls connectNative().
It relays messages between extension (stdin/stdout Native Messaging protocol)
and CLI (local IPC endpoint: Unix socket on Unix, named pipe on Windows).
"""
import json
import math
import os
import queue
import socket
import struct
import sys
import threading
import uuid
from multiprocessing.connection import Listener
from pathlib import Path
from browser_cli.platform import DEFAULT_ALIAS, endpoint_for_alias, is_windows, registry_path, runtime_dir
from browser_cli.registry import update_registry
# Placeholder for the IPC endpoint path ("set after hello handshake").
# NOTE(review): no assignment to this name is visible in this file — confirm
# whether anything still relies on it.
SOCKET_PATH: str = ""

# In-flight browser requests: message id -> queue on which the waiting CLI
# handler thread receives the extension's reply.
PENDING: dict[str, queue.Queue] = dict()

# Guards every read and write of PENDING.
PENDING_LOCK = threading.Lock()

# Held while a frame is written to the extension so that frames produced by
# concurrent CLI handler threads are never interleaved on stdout.
WRITE_LOCK = threading.Lock()

# Location of the alias registry handed to update_registry().
REGISTRY_PATH = registry_path()
PAGE_SIZE = int(os.environ.get("BROWSER_CLI_PAGE_SIZE", "100"))
PAGEABLE_COMMANDS = {
"tabs.list",
"tabs.filter",
"tabs.query",
"group.list",
"group.tabs",
"group.query",
"windows.list",
"dom.query",
"dom.text",
"dom.attr",
"extract.links",
"extract.images",
"extract.json",
"cookies.list",
"session.list",
}
# --- Native Messaging protocol (4-byte LE length prefix + UTF-8 JSON) ---
def _read_exact_stream(stream, n: int) -> bytes | None:
buf = b""
while len(buf) < n:
chunk = stream.read(n - len(buf))
if not chunk:
return None # real EOF
buf += chunk
return buf
def read_native_message(stream) -> dict | None:
    """Read one Native Messaging frame from *stream* and decode it.

    A frame is a 4-byte little-endian unsigned length followed by that many
    bytes of UTF-8 encoded JSON.

    Returns:
        The decoded JSON value, or None if the stream ends before a complete
        frame has been read.
    """
    header = _read_exact_stream(stream, 4)
    if header is None:
        return None
    (body_len,) = struct.unpack("<I", header)
    body = _read_exact_stream(stream, body_len)
    return None if body is None else json.loads(body.decode("utf-8"))
def write_native_message(stream, msg: dict) -> None:
data = json.dumps(msg).encode("utf-8")
stream.write(struct.pack("<I", len(data)))
stream.write(data)
stream.flush()
# --- Registry helpers ---
def _registry_add(alias: str, sock_path: str) -> None:
    """Record *alias* -> *sock_path* in the registry, best effort.

    Any exception raised by ``update_registry`` is swallowed on purpose: a
    registry failure must not stop the host from starting.
    """
    try:
        update_registry(alias, sock_path, REGISTRY_PATH)
    except Exception:
        return
def _registry_remove(alias: str) -> None:
    """Drop *alias* from the registry, best effort.

    Calls ``update_registry`` with None as the endpoint (presumably that
    clears the entry — confirm in browser_cli.registry). Any exception is
    swallowed so that shutdown can always proceed.
    """
    try:
        update_registry(alias, None, REGISTRY_PATH)
    except Exception:
        return
def _socket_path_for(alias: str) -> str:
    """Return the local IPC endpoint that ``endpoint_for_alias`` assigns to *alias*."""
    endpoint = endpoint_for_alias(alias)
    return endpoint
def _resolve_profile_alias(first_msg: dict | None) -> str:
"""Return a unique alias when the extension did not provide one."""
if first_msg and first_msg.get("type") == "hello":
alias = first_msg.get("alias")
if alias and alias != DEFAULT_ALIAS:
return alias
return str(uuid.uuid4())
# --- Thread A: read messages from extension (stdin) ---
def stdin_reader(alias: str):
    """Pump messages from the extension (stdin) to waiting CLI handlers.

    Runs forever. When stdin reaches EOF or the extension sends a ``bye``
    message, the socket/registry entries for *alias* are cleaned up and the
    whole process is terminated with ``os._exit(0)``. ``hello`` messages are
    skipped because the handshake is handled during startup. Any other
    message carrying an ``id`` is handed to the queue registered for that id
    in PENDING; messages nobody is waiting for are dropped.
    """
    source = sys.stdin.buffer
    while True:
        incoming = read_native_message(source)

        # Extension disconnected, or said goodbye — clean up and exit.
        if incoming is None or incoming.get("type") == "bye":
            _cleanup(alias)
            os._exit(0)

        # Profile alias handshake: already handled during startup.
        if incoming.get("type") == "hello":
            continue

        reply_id = incoming.get("id")
        if not reply_id:
            continue
        with PENDING_LOCK:
            waiter = PENDING.get(reply_id)
        if waiter:
            waiter.put(incoming)
# --- Thread B: accept CLI socket connections ---
def socket_server(sock_path: str, bound_sock: "socket.socket | None" = None):
    """Accept CLI connections and serve each one on its own daemon thread.

    Args:
        sock_path: Named-pipe address on Windows, Unix socket path elsewhere.
        bound_sock: Optional Unix socket that is already bound and listening.
            When omitted (non-Windows only) the socket is created here, after
            removing any stale file at *sock_path*. Ignored on Windows.

    The function returns when the listener cannot be created or when
    ``accept`` fails with OSError.
    """
    if is_windows():
        # One Listener serves every client. The previous code built a new
        # Listener per connection while the earlier one (and the connection
        # it had accepted) was still open; multiprocessing creates the first
        # pipe instance exclusively, so that second construction fails with
        # OSError and the accept loop ended after the first client.
        try:
            listener = Listener(sock_path, family="AF_PIPE")
        except OSError:
            return
        try:
            while True:
                try:
                    conn = listener.accept()
                except OSError:
                    break
                # The listener is owned by this loop, so the handler gets
                # None and must not close it.
                threading.Thread(target=handle_cli_connection, args=(conn, None), daemon=True).start()
        finally:
            try:
                listener.close()
            except Exception:
                pass
        return

    sock = bound_sock
    if sock is None:
        # missing_ok avoids the exists()/unlink() race and also removes a
        # dangling symlink that exists() would not report.
        Path(sock_path).unlink(missing_ok=True)
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.bind(sock_path)
        sock.listen(16)
    while True:
        try:
            conn, _ = sock.accept()
        except OSError:
            break
        threading.Thread(target=handle_cli_connection, args=(conn, None), daemon=True).start()
def handle_cli_connection(conn, listener=None) -> None:
    """Serve a single CLI request on *conn* and then close it.

    Reads one JSON command, assigns it an ``id`` if it has none, forwards it
    to the browser and sends the JSON result back. Any exception is reported
    to the client as ``{"success": False, "error": ...}``; if even that
    cannot be sent, the failure is ignored. An empty request gets no reply.

    Args:
        conn: The accepted connection — a multiprocessing pipe connection on
            Windows, a socket elsewhere.
        listener: Optional listener that is closed together with *conn*.
    """

    def _reply(payload: dict) -> None:
        # Encode and send using the transport that matches the platform.
        encoded = json.dumps(payload).encode("utf-8")
        if is_windows():
            conn.send_bytes(encoded)
        else:
            _send_all(conn, encoded)

    try:
        raw = conn.recv_bytes() if is_windows() else _recv_all(conn)
        if not raw:
            return
        cmd = json.loads(raw)
        if "id" not in cmd:
            cmd["id"] = str(uuid.uuid4())
        _reply(_handle_browser_command(cmd))
    except Exception as exc:
        try:
            _reply({"success": False, "error": str(exc)})
        except Exception:
            # The client is probably gone; nothing more can be done.
            pass
    finally:
        conn.close()
        if listener is not None:
            listener.close()
def _handle_browser_command(cmd: dict) -> dict:
    """Run *cmd* against the browser and return its result.

    Commands listed in PAGEABLE_COMMANDS are fetched page by page; everything
    else is sent as a single request.
    """
    is_pageable = cmd.get("command") in PAGEABLE_COMMANDS
    dispatch = _collect_paged_browser_command if is_pageable else _send_browser_command
    return dispatch(cmd)
def _send_browser_command(cmd: dict, timeout: int = 30) -> dict:
    """Send *cmd* to the extension and wait for the matching reply.

    The command's ``id`` (generated when missing or empty, and written back
    into *cmd*) is registered in PENDING so the stdin reader can route the
    reply to this caller.

    Args:
        cmd: Command to forward; its ``id`` key is set in place.
        timeout: Seconds to wait for the reply.

    Returns:
        The extension's reply, or a ``success: False`` timeout result when no
        reply arrives within *timeout* seconds.
    """
    request_id = cmd.get("id") or str(uuid.uuid4())
    cmd["id"] = request_id

    inbox: queue.Queue = queue.Queue()
    with PENDING_LOCK:
        PENDING[request_id] = inbox

    try:
        # Serialise writes so concurrent handlers cannot interleave frames.
        with WRITE_LOCK:
            write_native_message(sys.stdout.buffer, cmd)
        try:
            reply = inbox.get(timeout=timeout)
        except queue.Empty:
            reply = {"id": request_id, "success": False, "error": "timeout waiting for browser response"}
        return reply
    finally:
        # Always unregister, whether the request succeeded, timed out or failed.
        with PENDING_LOCK:
            PENDING.pop(request_id, None)
def _collect_paged_browser_command(cmd: dict) -> dict:
    """Fetch every page of a pageable command and merge the items.

    Each request is a copy of *cmd* with a fresh ``id`` and an added
    ``args["__page"] = {"offset", "limit"}``. Paging continues while the
    reply's ``data`` carries a ``nextOffset``.

    Returns:
        * the merged result (``data`` is the concatenated item list, plus
          ``pageSize`` and the last reported ``total``) on success;
        * the browser's own reply, re-tagged with the original id, when it
          reports failure or its ``data`` is not marked ``__browserCliPage``;
        * a ``success: False`` result when ``items`` is not a list or when
          more than ``ceil(10_000 / PAGE_SIZE)`` pages would be needed.
    """
    original_id = cmd.get("id") or str(uuid.uuid4())
    page_budget = math.ceil(10_000 / PAGE_SIZE)
    collected = []
    total = None
    offset = 0

    for _ in range(page_budget):
        paged_args = dict(cmd.get("args") or {})
        paged_args["__page"] = {"offset": offset, "limit": PAGE_SIZE}
        request = dict(cmd)
        request.update(id=str(uuid.uuid4()), args=paged_args)

        reply = _send_browser_command(request)
        reply["id"] = original_id
        if not reply.get("success", True):
            return reply

        data = reply.get("data")
        if not (isinstance(data, dict) and data.get("__browserCliPage") is True):
            # Not a paged envelope: pass the reply through unchanged.
            return reply

        batch = data.get("items") or []
        if not isinstance(batch, list):
            return {"id": original_id, "success": False, "error": "invalid paged response from browser"}
        collected.extend(batch)
        total = data.get("total", total)

        cursor = data.get("nextOffset")
        if cursor is None:
            break
        offset = int(cursor)
    else:
        # The budget ran out while the extension still reported more pages.
        return {"id": original_id, "success": False, "error": f"paging loop exceeded {page_budget} pages — extension bug?"}

    return {"id": original_id, "success": True, "data": collected, "pageSize": PAGE_SIZE, "total": total}
# --- Socket helpers (length-prefixed framing) ---
def _send_all(conn: socket.socket, data: bytes) -> None:
    """Send *data* on *conn* as one frame: 4-byte little-endian length, then the bytes."""
    header = struct.pack("<I", len(data))
    conn.sendall(b"".join((header, data)))
def _recv_all(conn: socket.socket) -> bytes | None:
    """Receive one length-prefixed frame from *conn*.

    Returns:
        The frame body, or None if the peer closes the connection before the
        4-byte header or the full body has arrived.
    """
    header = _recv_exact(conn, 4)
    if header is None:
        return None
    (body_len,) = struct.unpack("<I", header)
    return _recv_exact(conn, body_len)
def _recv_exact(conn: socket.socket, n: int) -> bytes | None:
buf = b""
while len(buf) < n:
chunk = conn.recv(n - len(buf))
if not chunk:
return None
buf += chunk
return buf
def _cleanup(alias: str):
    """Remove the IPC artefacts belonging to *alias*, best effort.

    On non-Windows platforms the socket file for the alias is deleted (a
    missing file is fine, and any other error is ignored). The registry
    entry is then removed on every platform.
    """
    try:
        on_posix = not is_windows()
        if on_posix:
            stale = Path(_socket_path_for(alias))
            stale.unlink(missing_ok=True)
    except Exception:
        pass
    _registry_remove(alias)
def main():
    """Entry point: perform the handshake, start the CLI server, relay stdin.

    Steps:
        1. Read the first message from the extension to learn the alias.
        2. Ensure the runtime directory exists (created with mode 0o700).
        3. On non-Windows platforms, bind and listen on the Unix socket here,
           replacing any stale socket file.
        4. Register the alias and start the accept loop on a daemon thread.
        5. Relay extension messages on the calling thread.
    """
    # Wait for the hello handshake to learn the profile alias.
    first_msg = read_native_message(sys.stdin.buffer)
    got_hello = bool(first_msg) and first_msg.get("type") == "hello"
    # No hello — use a generated alias; first_msg is dropped (no response path).
    alias = _resolve_profile_alias(first_msg) if got_hello else str(uuid.uuid4())

    runtime_dir().mkdir(mode=0o700, exist_ok=True)
    sock_path = _socket_path_for(alias)

    if is_windows():
        # The named pipe is created inside socket_server.
        bound_sock = None
    else:
        stale = Path(sock_path)
        if stale.exists():
            stale.unlink()
        bound_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        bound_sock.bind(sock_path)
        bound_sock.listen(16)

    _registry_add(alias, sock_path)

    server = threading.Thread(
        target=socket_server,
        args=(sock_path,),
        kwargs={"bound_sock": bound_sock},
        daemon=True,
    )
    server.start()

    stdin_reader(alias)


if __name__ == "__main__":
    main()