533e9d328d
Testing / test (push) Failing after 14m32s
set_alpn_protocols(["browser-cli"]) caused TLS handshake failure (no_application_protocol alert) when connecting through a reverse proxy (e.g. Traefik) that terminates TLS but doesn't know the custom ALPN. Plain TLS without ALPN negotiation works correctly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
439 lines
16 KiB
Python
439 lines
16 KiB
Python
"""
|
|
Local IPC client — sends commands to native host relay endpoint.
|
|
Used by both CLI and public Python API.
|
|
|
|
Profile selection order:
|
|
1. Explicit `profile` argument to send_command()
|
|
2. BROWSER_CLI_PROFILE environment variable
|
|
3. First entry in runtime registry
|
|
4. Otherwise, no browser can be resolved automatically
|
|
"""
|
|
import json
|
|
import os
|
|
import re
|
|
import socket
|
|
import struct
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from multiprocessing.connection import Client as PipeClient
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from browser_cli.platform import endpoint_for_alias, is_windows, registry_path
|
|
from browser_cli.version_manager import MAX_MSG_BYTES as _MAX_MSG_BYTES
|
|
from browser_cli.registry import load_registry
|
|
|
|
try:
|
|
from importlib.metadata import version as _pkg_version
|
|
_USER_AGENT = f"browser-cli/{_pkg_version('browser-cli')}"
|
|
except Exception:
|
|
_USER_AGENT = "browser-cli/0"
|
|
|
|
REGISTRY_PATH = registry_path()
|
|
REMOTE_REGISTRY_PATH = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / "browser-cli" / "remotes.json"
|
|
_DEFAULT_KEY_PATH = Path(os.environ.get("XDG_CONFIG_HOME", str(Path.home() / ".config"))) / "browser-cli" / "client.key.pem"
|
|
|
|
_DEFAULT_REMOTE_PORT = 443
|
|
|
|
|
|
def _looks_like_domain(host: str) -> bool:
|
|
"""True if host looks like a domain name rather than an IP address or localhost."""
|
|
if host in {"localhost", "127.0.0.1", "::1"}:
|
|
return False
|
|
if re.match(r'^\d{1,3}(\.\d{1,3}){3}$', host):
|
|
return False
|
|
return '.' in host and any(c.isalpha() for c in host)
|
|
|
|
|
|
def _normalize_endpoint(endpoint: str) -> str:
|
|
"""Strip :443 from domain-like endpoints so they are stored without the default port."""
|
|
if not endpoint:
|
|
return endpoint
|
|
host, sep, port = endpoint.rpartition(":")
|
|
if sep and port == "443" and _looks_like_domain(host):
|
|
return host
|
|
return endpoint
|
|
|
|
|
|
def _resolve_connect_endpoint(endpoint: str) -> str:
|
|
"""Return host:port for TCP connection; domain without port defaults to :443."""
|
|
_, sep, _ = endpoint.rpartition(":")
|
|
if not sep:
|
|
if _looks_like_domain(endpoint):
|
|
return f"{endpoint}:{_DEFAULT_REMOTE_PORT}"
|
|
raise BrowserNotConnected(
|
|
f"Invalid remote endpoint '{endpoint}': expected host:port"
|
|
)
|
|
return endpoint
|
|
|
|
|
|
class BrowserNotConnected(Exception):
|
|
"""Raised when the native host socket is not available."""
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class BrowserTarget:
|
|
profile: str
|
|
display_name: str
|
|
socket_path: str
|
|
remote: str | None = None
|
|
|
|
|
|
def _active_endpoints(reg: dict) -> dict:
|
|
"""Return only entries whose endpoint appears reachable."""
|
|
if is_windows():
|
|
return dict(reg)
|
|
return {k: v for k, v in reg.items() if Path(v).exists()}
|
|
|
|
|
|
def display_browser_name(profile_name: str, sock_path: str) -> str:
|
|
if profile_name != "default":
|
|
return profile_name
|
|
return Path(sock_path).stem or profile_name
|
|
|
|
|
|
def _load_remotes() -> dict[str, dict[str, str]]:
|
|
if not REMOTE_REGISTRY_PATH.exists():
|
|
return {}
|
|
try:
|
|
data = json.loads(REMOTE_REGISTRY_PATH.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return {}
|
|
if not isinstance(data, dict):
|
|
return {}
|
|
# normalize keys so old entries stored as "domain:443" match current lookups
|
|
return {_normalize_endpoint(str(endpoint)): cfg for endpoint, cfg in data.items() if isinstance(cfg, dict)}
|
|
|
|
|
|
|
|
def _is_valid_key_spec(s: str) -> bool:
|
|
"""Return True if s looks like a usable key spec: 'agent', 'agent:<sel>', or a file path."""
|
|
return s == "agent" or s.startswith("agent:") or (not s.startswith("<") and ("/" in s or Path(s).suffix in {".pem", ".key"}))
|
|
|
|
|
|
def save_remote_key(endpoint: str, key_spec: str) -> None:
|
|
"""Persist the key spec (e.g. 'agent' or a file path) for a remote endpoint."""
|
|
if not endpoint or not key_spec:
|
|
return
|
|
if not _is_valid_key_spec(key_spec):
|
|
return # refuse to save serialized objects or other garbage
|
|
remotes = _load_remotes()
|
|
current = remotes.get(endpoint, {})
|
|
current["key"] = key_spec
|
|
remotes[endpoint] = current
|
|
REMOTE_REGISTRY_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
fd = os.open(str(REMOTE_REGISTRY_PATH), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
|
|
with os.fdopen(fd, "w", encoding="utf-8") as f:
|
|
f.write(json.dumps(remotes, indent=2, sort_keys=True))
|
|
|
|
|
|
def key_for_remote(endpoint: str | None) -> str | None:
|
|
if not endpoint:
|
|
return None
|
|
cfg = _load_remotes().get(endpoint) or {}
|
|
key = cfg.get("key")
|
|
if not key:
|
|
return None
|
|
key_str = str(key)
|
|
# reject corrupted values (e.g. str(AgentKey(...)) saved by an older bug)
|
|
if not _is_valid_key_spec(key_str):
|
|
return None
|
|
return key_str
|
|
|
|
|
|
def _remote_display_name(endpoint: str, profile_name: str, display_name: str) -> str:
|
|
host, sep, port = endpoint.rpartition(":")
|
|
if sep and (port == "8765" or (port == "443" and _looks_like_domain(host))):
|
|
display_endpoint = host
|
|
else:
|
|
display_endpoint = endpoint # normalized domain (no port) or non-default port
|
|
return f"{display_endpoint}:{display_name or profile_name}"
|
|
|
|
|
|
def remote_browser_targets(endpoint: str, key=None) -> list[BrowserTarget]:
|
|
"""Return browser targets advertised by a single remote endpoint."""
|
|
remote_targets = send_command("browser-cli.targets", remote=endpoint, key=key)
|
|
targets: list[BrowserTarget] = []
|
|
for item in remote_targets or []:
|
|
profile = str(item.get("profile") or "default")
|
|
display = str(item.get("displayName") or profile)
|
|
targets.append(
|
|
BrowserTarget(
|
|
profile=profile,
|
|
display_name=_remote_display_name(endpoint, profile, display),
|
|
socket_path="",
|
|
remote=endpoint,
|
|
)
|
|
)
|
|
return targets
|
|
|
|
|
|
def _remote_browser_targets(key=None) -> list[BrowserTarget]:
|
|
targets: list[BrowserTarget] = []
|
|
for endpoint in _load_remotes():
|
|
try:
|
|
targets.extend(remote_browser_targets(endpoint, key=key))
|
|
except (BrowserNotConnected, RuntimeError):
|
|
continue
|
|
return targets
|
|
|
|
|
|
def remote_target_for_alias(alias: str | None) -> BrowserTarget | None:
|
|
"""Resolve a user-facing remote alias such as 'host:profile' to a target."""
|
|
if not alias:
|
|
return None
|
|
targets = _remote_browser_targets()
|
|
for target in targets:
|
|
endpoint_profile = f"{target.remote}:{target.profile}" if target.remote else None
|
|
if alias in {target.display_name, endpoint_profile}:
|
|
return target
|
|
|
|
endpoint_matches = []
|
|
for target in targets:
|
|
if not target.remote:
|
|
continue
|
|
remote_host, sep, _remote_port = target.remote.rpartition(":")
|
|
if alias == target.remote or (sep and alias == remote_host):
|
|
endpoint_matches.append(target)
|
|
if len(endpoint_matches) == 1:
|
|
return endpoint_matches[0]
|
|
return None
|
|
|
|
|
|
def active_browser_targets(*, include_remotes: bool = True, key=None) -> list[BrowserTarget]:
|
|
targets: list[BrowserTarget] = []
|
|
if REGISTRY_PATH.exists():
|
|
reg = load_registry(REGISTRY_PATH)
|
|
targets.extend(
|
|
BrowserTarget(profile=profile, display_name=display_browser_name(profile, sock_path), socket_path=sock_path)
|
|
for profile, sock_path in _active_endpoints(reg).items()
|
|
)
|
|
if include_remotes:
|
|
targets.extend(_remote_browser_targets(key=key))
|
|
return targets
|
|
|
|
|
|
def _resolve_socket(profile: str | None = None) -> str:
|
|
"""Return the socket path for the given profile (or auto-detect)."""
|
|
target = profile or os.environ.get("BROWSER_CLI_PROFILE")
|
|
|
|
if target:
|
|
if REGISTRY_PATH.exists():
|
|
reg = load_registry(REGISTRY_PATH)
|
|
if target in reg:
|
|
return reg[target]
|
|
return endpoint_for_alias(target)
|
|
|
|
# Auto-detect: error when multiple browser instances are active
|
|
try:
|
|
active = active_browser_targets(include_remotes=False)
|
|
if len(active) > 1:
|
|
aliases = [target.profile for target in active]
|
|
examples = "\n".join(f" browser-cli --browser {a} ..." for a in aliases)
|
|
raise BrowserNotConnected(
|
|
f"Multiple browser instances are active: {', '.join(aliases)}\n"
|
|
f"Use --browser <alias> to select one:\n{examples}"
|
|
)
|
|
if active:
|
|
return active[0].socket_path
|
|
except BrowserNotConnected:
|
|
raise
|
|
except Exception:
|
|
pass
|
|
|
|
raise BrowserNotConnected(
|
|
"Cannot resolve a browser socket automatically.\n"
|
|
"Make sure the browser is running with the browser-cli extension enabled,\n"
|
|
"or pass --browser <alias> / set BROWSER_CLI_PROFILE to a known alias."
|
|
)
|
|
|
|
|
|
def _load_private_key(key_path: "Path | str | None" = None):
|
|
"""Load an Ed25519 signing key.
|
|
|
|
Sources (in priority order):
|
|
1. Explicit key_path / --key flag
|
|
2. BROWSER_CLI_KEY environment variable
|
|
3. Default PEM file (~/.config/browser-cli/client.key.pem)
|
|
|
|
Pass "agent" or "agent:<selector>" to use a key from the SSH agent
|
|
(works with YubiKey via gpg-agent, TPM, or regular ssh-agent).
|
|
"""
|
|
raw = str(key_path) if key_path is not None else os.environ.get("BROWSER_CLI_KEY", str(_DEFAULT_KEY_PATH))
|
|
|
|
if raw == "agent" or raw.startswith("agent:"):
|
|
selector = raw[6:] or None # "agent:cardno:..." → "cardno:..."
|
|
from browser_cli.auth import agent_find_key
|
|
return agent_find_key(selector)
|
|
|
|
path = Path(raw)
|
|
if not path.exists():
|
|
return None
|
|
try:
|
|
from browser_cli.auth import load_private_key
|
|
return load_private_key(path)
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _send_remote(endpoint: str, msg: dict, private_key=None) -> bytes | None:
|
|
connect_ep = _resolve_connect_endpoint(endpoint)
|
|
host, _, port_str = connect_ep.rpartition(":")
|
|
port = int(port_str)
|
|
raw_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
raw_sock.settimeout(30)
|
|
try:
|
|
raw_sock.connect((host, port))
|
|
if port == 443:
|
|
import ssl
|
|
ctx = ssl.create_default_context()
|
|
sock = ctx.wrap_socket(raw_sock, server_hostname=host)
|
|
else:
|
|
sock = raw_sock
|
|
except Exception:
|
|
raw_sock.close()
|
|
raise
|
|
with sock:
|
|
|
|
# receive challenge
|
|
challenge_raw = _recv_all(sock)
|
|
if challenge_raw is None:
|
|
raise BrowserNotConnected(f"No challenge received from {endpoint}")
|
|
try:
|
|
challenge = json.loads(challenge_raw)
|
|
nonce_hex = challenge.get("nonce") if challenge.get("type") == "challenge" else None
|
|
except (json.JSONDecodeError, AttributeError):
|
|
nonce_hex = None
|
|
|
|
min_ver = challenge.get("min_client_version") if isinstance(challenge, dict) else None
|
|
if min_ver:
|
|
from browser_cli.version_manager import parse_version
|
|
try:
|
|
client_ver = _USER_AGENT.split("/", 1)[1]
|
|
if parse_version(client_ver) < parse_version(min_ver):
|
|
raise BrowserNotConnected(
|
|
f"Client version {client_ver} is too old for this server "
|
|
f"(requires >= {min_ver}). Run: pip install --upgrade browser-cli"
|
|
)
|
|
except (IndexError, ValueError):
|
|
pass
|
|
|
|
if nonce_hex and private_key is not None:
|
|
from browser_cli.auth import sign, public_key_hex
|
|
nonce = bytes.fromhex(nonce_hex)
|
|
clean_msg = {k: v for k, v in msg.items() if k not in {"token", "pubkey", "sig"}}
|
|
sig = sign(private_key, nonce, clean_msg)
|
|
msg = {**clean_msg, "pubkey": public_key_hex(private_key), "sig": sig.hex()}
|
|
|
|
payload = json.dumps(msg).encode("utf-8")
|
|
framed = struct.pack("<I", len(payload)) + payload
|
|
sock.sendall(framed)
|
|
return _recv_all(sock)
|
|
|
|
|
|
def _auto_route_remote(endpoint: str, key=None) -> str | None:
|
|
targets = remote_browser_targets(endpoint, key=key)
|
|
if len(targets) == 1:
|
|
return targets[0].profile
|
|
if len(targets) > 1:
|
|
aliases = [target.profile for target in targets]
|
|
examples = "\n".join(f" browser-cli --remote {endpoint} --browser {a} ..." for a in aliases)
|
|
raise BrowserNotConnected(
|
|
f"Multiple remote browser instances are active: {', '.join(aliases)}\n"
|
|
f"Use --browser <alias> to select one:\n{examples}"
|
|
)
|
|
return None
|
|
|
|
|
|
def send_command(command: str, args: dict | None = None, profile: str | None = None, remote: str | None = None, key: "Path | None" = None) -> Any:
|
|
"""Send a command to the browser and return the response data."""
|
|
requested_profile = profile or os.environ.get("BROWSER_CLI_PROFILE")
|
|
remote_endpoint = remote or os.environ.get("BROWSER_CLI_REMOTE")
|
|
if remote_endpoint:
|
|
remote_endpoint = _normalize_endpoint(remote_endpoint)
|
|
remote_alias_target = None
|
|
if not remote_endpoint and requested_profile:
|
|
remote_alias_target = remote_target_for_alias(requested_profile)
|
|
if remote_alias_target:
|
|
remote_endpoint = remote_alias_target.remote
|
|
requested_profile = remote_alias_target.profile
|
|
|
|
msg = {
|
|
"id": str(uuid.uuid4()),
|
|
"command": command,
|
|
"args": args or {},
|
|
}
|
|
if remote_endpoint:
|
|
msg["user_agent"] = _USER_AGENT
|
|
# key priority: explicit flag > saved per-remote config > BROWSER_CLI_KEY env > default file
|
|
key_spec = key if key is not None else key_for_remote(remote_endpoint)
|
|
private_key = _load_private_key(key_spec)
|
|
# persist explicit key spec so future calls don't need --key
|
|
if key is not None:
|
|
save_remote_key(remote_endpoint, str(key))
|
|
route_profile = requested_profile
|
|
_no_route_commands = {"browser-cli.targets", "browser-cli.auth.keys", "browser-cli.auth.trust"}
|
|
if not route_profile and command not in _no_route_commands:
|
|
route_profile = _auto_route_remote(remote_endpoint, key=key_spec)
|
|
if route_profile:
|
|
msg["_route"] = route_profile
|
|
else:
|
|
private_key = None
|
|
|
|
try:
|
|
if remote_endpoint:
|
|
response = _send_remote(remote_endpoint, msg, private_key)
|
|
elif is_windows():
|
|
payload = json.dumps(msg).encode("utf-8")
|
|
sock_path = _resolve_socket(profile)
|
|
with PipeClient(sock_path, family="AF_PIPE") as conn:
|
|
conn.send_bytes(payload)
|
|
response = conn.recv_bytes()
|
|
else:
|
|
payload = json.dumps(msg).encode("utf-8")
|
|
sock_path = _resolve_socket(profile)
|
|
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
|
|
sock.connect(sock_path)
|
|
sock.sendall(struct.pack("<I", len(payload)) + payload)
|
|
response = _recv_all(sock)
|
|
except (FileNotFoundError, ConnectionRefusedError, OSError):
|
|
if remote_endpoint:
|
|
raise BrowserNotConnected(
|
|
f"Cannot connect to remote browser at {remote_endpoint}.\n"
|
|
"Make sure browser-cli serve is running on the remote host."
|
|
)
|
|
profile_hint = f" (profile: {profile})" if profile else ""
|
|
raise BrowserNotConnected(
|
|
f"Cannot connect to browser{profile_hint}.\n"
|
|
"Make sure:\n"
|
|
" 1. The browser-cli extension is installed and enabled\n"
|
|
" 2. The native host is registered: uv run browser-cli install <browser>\n"
|
|
" 3. Your browser is running\n"
|
|
" Tip: use BROWSER_CLI_PROFILE=<name> to select a specific profile"
|
|
)
|
|
|
|
if response is None:
|
|
raise ConnectionError("Connection closed before full response received")
|
|
result = json.loads(response)
|
|
if not result.get("success", True):
|
|
raise RuntimeError(result.get("error", "unknown error from browser"))
|
|
return result.get("data")
|
|
|
|
|
|
def _recv_all(sock: socket.socket) -> bytes:
|
|
raw_len = _recv_exact(sock, 4)
|
|
msg_len = struct.unpack("<I", raw_len)[0]
|
|
if msg_len > _MAX_MSG_BYTES:
|
|
raise ConnectionError(f"Response too large ({msg_len} bytes)")
|
|
return _recv_exact(sock, msg_len)
|
|
|
|
|
|
def _recv_exact(sock: socket.socket, n: int) -> bytes:
|
|
buf = b""
|
|
while len(buf) < n:
|
|
chunk = sock.recv(n - len(buf))
|
|
if not chunk:
|
|
raise ConnectionError("Socket closed before full message received")
|
|
buf += chunk
|
|
return buf
|