Files
browser-cli/browser_cli/client.py
T
daniel156161 c1a5ef9dd7
Testing / test (push) Successful in 41s
Package Extension / package-extension (push) Successful in 35s
Build & Publish Package / publish (push) Successful in 46s
feat: token-auth removal, security hardening, Stripe-style compat layer (v0.9.2)
- Remove token auth entirely; only Ed25519 pubkey auth or --no-auth
- Add 32 MB message-size cap in serve and client (DoS protection)
- Set Unix socket to 0o600 after bind in native_host (multi-user hardening)
- Enforce browser-cli/VERSION user-agent on all TCP connections
- Add PROTOCOL_MIN_CLIENT check (>= 0.9.0) server- and client-side
- Include server_version + min_client_version in challenge frame
- Add browser_cli/version_manager.py: parse_version, get_installed_version
- Add browser_cli/compat.py: Stripe-style versioning layer with adapt_request
  / adapt_response hooks; baseline 0.9.2, no shims needed yet
- Fix BrowserCLI key handling: no Path() wrap for agent specs
- Fix _multi_browser_targets() to forward key to remote_browser_targets()

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-02 21:59:46 +02:00

390 lines
15 KiB
Python

"""
Local IPC client — sends commands to native host relay endpoint.
Used by both CLI and public Python API.
Profile selection order:
1. Explicit `profile` argument to send_command()
2. BROWSER_CLI_PROFILE environment variable
3. First entry in runtime registry
4. Otherwise, no browser can be resolved automatically
"""
import json
import os
import socket
import struct
import uuid
from dataclasses import dataclass
from multiprocessing.connection import Client as PipeClient
from pathlib import Path
from typing import Any
from browser_cli.platform import endpoint_for_alias, is_windows, registry_path
from browser_cli.registry import load_registry
# User-agent string attached to messages sent to remote endpoints. When the
# installed package metadata cannot be read, version "0" is advertised.
try:
    from importlib.metadata import version as _pkg_version

    _installed_version = _pkg_version("browser-cli")
except Exception:
    _installed_version = "0"
_USER_AGENT = f"browser-cli/{_installed_version}"
# Location of the local runtime registry, as decided by
# browser_cli.platform.registry_path().
REGISTRY_PATH = registry_path()

# Per-user configuration directory. An unset *or empty* XDG_CONFIG_HOME falls
# back to ~/.config, so an empty value never yields paths relative to the
# current working directory.
_CONFIG_DIR = Path(os.environ.get("XDG_CONFIG_HOME") or Path.home() / ".config") / "browser-cli"

# JSON file holding per-remote settings (currently the saved key spec).
REMOTE_REGISTRY_PATH = _CONFIG_DIR / "remotes.json"

# Key file used when neither an explicit key nor BROWSER_CLI_KEY is given.
_DEFAULT_KEY_PATH = _CONFIG_DIR / "client.key.pem"
class BrowserNotConnected(Exception):
    """Signal that no usable browser endpoint (local or remote) could be reached or selected."""
@dataclass(frozen=True)
class BrowserTarget:
profile: str
display_name: str
socket_path: str
remote: str | None = None
def _active_endpoints(reg: dict) -> dict:
    """Filter a registry mapping down to the entries that look reachable.

    On Windows every entry is kept unchanged; elsewhere an entry is kept only
    when its endpoint path exists on disk. A new dict is always returned.
    """
    keep_all = is_windows()
    reachable = {}
    for alias, endpoint in reg.items():
        if keep_all or Path(endpoint).exists():
            reachable[alias] = endpoint
    return reachable
def display_browser_name(profile_name: str, sock_path: str) -> str:
    """Pick a user-facing name for a local browser.

    Any profile other than "default" is shown under its own name. The
    "default" profile is shown as the stem of its socket file name, falling
    back to the profile name when that stem is empty.
    """
    if profile_name == "default":
        stem = Path(sock_path).stem
        return stem if stem else profile_name
    return profile_name
def _load_remotes() -> dict[str, dict[str, str]]:
    """Read the remote-endpoint registry from REMOTE_REGISTRY_PATH.

    Returns an empty dict when the file is missing, cannot be read or parsed,
    or does not contain a JSON object at the top level. Entries whose value is
    not a dict are dropped; endpoint keys are coerced to ``str``.
    """
    registry_file = REMOTE_REGISTRY_PATH
    if not registry_file.exists():
        return {}
    try:
        parsed = json.loads(registry_file.read_text(encoding="utf-8"))
    except Exception:
        return {}
    if not isinstance(parsed, dict):
        return {}
    remotes = {}
    for endpoint, cfg in parsed.items():
        if isinstance(cfg, dict):
            remotes[str(endpoint)] = cfg
    return remotes
def _is_valid_key_spec(s: str) -> bool:
    """Return True if *s* looks like a usable key spec.

    Accepted forms are ``"agent"``, ``"agent:<selector>"``, or something that
    resembles a file path: it contains a ``/`` or ends in ``.pem`` / ``.key``.

    Strings starting with ``<`` are never accepted as paths. The previous
    one-line expression let the suffix check bypass that guard because ``and``
    binds tighter than ``or``, so a value such as ``"<AgentKey>.key"`` passed.
    """
    if s == "agent" or s.startswith("agent:"):
        return True
    if s.startswith("<"):
        return False
    return "/" in s or Path(s).suffix in {".pem", ".key"}
def save_remote_key(endpoint: str, key_spec: str) -> None:
    """Remember which key spec to use for a remote endpoint.

    Does nothing when either argument is empty or when *key_spec* fails
    ``_is_valid_key_spec``. Otherwise the ``"key"`` field of the endpoint's
    entry is set and the whole registry is rewritten to REMOTE_REGISTRY_PATH;
    the file is opened with creation mode 0o600.
    """
    if not (endpoint and key_spec):
        return
    if not _is_valid_key_spec(key_spec):
        # Never persist serialized objects or other garbage.
        return
    remotes = _load_remotes()
    remotes.setdefault(endpoint, {})["key"] = key_spec
    REMOTE_REGISTRY_PATH.parent.mkdir(parents=True, exist_ok=True)
    open_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    fd = os.open(str(REMOTE_REGISTRY_PATH), open_flags, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        handle.write(json.dumps(remotes, indent=2, sort_keys=True))
def key_for_remote(endpoint: str | None) -> str | None:
if not endpoint:
return None
cfg = _load_remotes().get(endpoint) or {}
key = cfg.get("key")
if not key:
return None
key_str = str(key)
# reject corrupted values (e.g. str(AgentKey(...)) saved by an older bug)
if not _is_valid_key_spec(key_str):
return None
return key_str
def _remote_display_name(endpoint: str, profile_name: str, display_name: str) -> str:
    """Build the ``<remote>:<browser>`` label for a remote browser.

    The remote part is the bare host when the endpoint's port is "8765",
    otherwise the full endpoint string. The browser part is *display_name*,
    or *profile_name* when the display name is empty.
    """
    label = display_name if display_name else profile_name
    host, colon, port = endpoint.rpartition(":")
    if colon and port == "8765":
        return f"{host}:{label}"
    return f"{endpoint}:{label}"
def remote_browser_targets(endpoint: str, key=None) -> list[BrowserTarget]:
    """List the browsers advertised by one remote endpoint.

    Sends the ``browser-cli.targets`` command to *endpoint* (forwarding *key*)
    and converts every returned item into a ``BrowserTarget`` with an empty
    ``socket_path`` and ``remote`` set to the endpoint. An empty or missing
    reply yields an empty list.
    """
    advertised = send_command("browser-cli.targets", remote=endpoint, key=key) or []

    def _to_target(entry) -> BrowserTarget:
        # Missing or empty fields fall back to "default" / the profile name.
        profile = str(entry.get("profile") or "default")
        label = str(entry.get("displayName") or profile)
        return BrowserTarget(
            profile=profile,
            display_name=_remote_display_name(endpoint, profile, label),
            socket_path="",
            remote=endpoint,
        )

    return [_to_target(entry) for entry in advertised]
def _remote_browser_targets(key=None) -> list[BrowserTarget]:
    """Collect browser targets from every endpoint in the remote registry.

    Endpoints that raise ``BrowserNotConnected`` or ``RuntimeError`` while
    being queried are skipped.
    """
    collected: list[BrowserTarget] = []
    for endpoint in _load_remotes():
        try:
            found = remote_browser_targets(endpoint, key=key)
        except (BrowserNotConnected, RuntimeError):
            continue
        collected += found
    return collected
def remote_target_for_alias(alias: str | None) -> BrowserTarget | None:
"""Resolve a user-facing remote alias such as 'host:profile' to a target."""
if not alias:
return None
targets = _remote_browser_targets()
for target in targets:
endpoint_profile = f"{target.remote}:{target.profile}" if target.remote else None
if alias in {target.display_name, endpoint_profile}:
return target
endpoint_matches = []
for target in targets:
if not target.remote:
continue
remote_host, sep, _remote_port = target.remote.rpartition(":")
if alias == target.remote or (sep and alias == remote_host):
endpoint_matches.append(target)
if len(endpoint_matches) == 1:
return endpoint_matches[0]
return None
def active_browser_targets(*, include_remotes: bool = True, key=None) -> list[BrowserTarget]:
    """Return every browser target that currently looks usable.

    Local targets come from the runtime registry (when its file exists),
    filtered through ``_active_endpoints``. When *include_remotes* is true,
    targets advertised by the configured remotes are appended, with *key*
    forwarded to the remote queries.
    """
    found: list[BrowserTarget] = []
    if REGISTRY_PATH.exists():
        live = _active_endpoints(load_registry(REGISTRY_PATH))
        for profile, sock_path in live.items():
            found.append(
                BrowserTarget(
                    profile=profile,
                    display_name=display_browser_name(profile, sock_path),
                    socket_path=sock_path,
                )
            )
    if include_remotes:
        found.extend(_remote_browser_targets(key=key))
    return found
def _resolve_socket(profile: str | None = None) -> str:
    """Work out which local endpoint to connect to.

    With an explicit *profile* (or BROWSER_CLI_PROFILE set), the registry
    entry for that alias is returned if present, otherwise whatever
    ``endpoint_for_alias`` derives from it. Without one, the single active
    local browser is auto-selected.

    Raises:
        BrowserNotConnected: when several local browsers are active, or when
            none can be found.
    """
    wanted = profile or os.environ.get("BROWSER_CLI_PROFILE")
    if wanted:
        if REGISTRY_PATH.exists():
            registry = load_registry(REGISTRY_PATH)
            if wanted in registry:
                return registry[wanted]
        return endpoint_for_alias(wanted)

    # Auto-detect: refuse to guess when more than one browser is active.
    try:
        active = active_browser_targets(include_remotes=False)
        if len(active) == 1:
            return active[0].socket_path
        if active:
            aliases = [entry.profile for entry in active]
            joined = ", ".join(aliases)
            examples = "\n".join(f"  browser-cli --browser {a} ..." for a in aliases)
            raise BrowserNotConnected(
                f"Multiple browser instances are active: {joined}\n"
                f"Use --browser <alias> to select one:\n{examples}"
            )
    except BrowserNotConnected:
        raise
    except Exception:
        # Best effort: any other discovery failure falls through to the
        # generic "cannot resolve" error below.
        pass

    raise BrowserNotConnected(
        "Cannot resolve a browser socket automatically.\n"
        "Make sure the browser is running with the browser-cli extension enabled,\n"
        "or pass --browser <alias> / set BROWSER_CLI_PROFILE to a known alias."
    )
def _load_private_key(key_path: "Path | str | None" = None):
    """Obtain the signing key used to authenticate against remote endpoints.

    The key spec is taken from, in order:
      1. *key_path* (the --key flag),
      2. the BROWSER_CLI_KEY environment variable,
      3. the default key file.

    A spec of "agent" or "agent:<selector>" is resolved through
    ``browser_cli.auth.agent_find_key``. Any other spec is treated as a file
    path; None is returned when the file does not exist or cannot be loaded.
    """
    if key_path is not None:
        raw = str(key_path)
    else:
        raw = os.environ.get("BROWSER_CLI_KEY", str(_DEFAULT_KEY_PATH))

    if raw == "agent" or raw.startswith("agent:"):
        from browser_cli.auth import agent_find_key

        # Everything after the first colon is the selector ("agent" alone -> None).
        selector = raw.partition(":")[2] or None
        return agent_find_key(selector)

    pem_file = Path(raw)
    if not pem_file.exists():
        return None
    try:
        from browser_cli.auth import load_private_key

        return load_private_key(pem_file)
    except Exception:
        return None
def _send_remote(endpoint: str, msg: dict, private_key=None) -> bytes | None:
    """Send *msg* to a remote ``host:port`` endpoint and return the raw reply.

    Steps: connect over TCP, read one length-prefixed challenge frame,
    enforce the server's ``min_client_version`` if it sent one, sign the
    message when the challenge carries a nonce and a key is available, then
    send the length-prefixed JSON message and read one reply frame.

    Args:
        endpoint: Remote address in ``host:port`` form.
        msg: Command message. ``token``/``pubkey``/``sig`` keys are stripped
            before signing.
        private_key: Signing key, or None to send the message unsigned.

    Returns:
        The reply payload as returned by ``_recv_all``.

    Raises:
        BrowserNotConnected: malformed endpoint, missing challenge, or a
            client version older than the server requires.
        OSError: socket-level failures, left for the caller to translate.
    """
    host, _, port_str = endpoint.rpartition(":")
    if not host or not port_str:
        raise BrowserNotConnected(f"Invalid remote endpoint '{endpoint}': expected host:port")
    try:
        port = int(port_str)
    except ValueError:
        # A non-numeric port is an invalid endpoint, not an internal error.
        raise BrowserNotConnected(f"Invalid remote endpoint '{endpoint}': expected host:port") from None
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(30)
        sock.connect((host, port))
        # receive challenge
        challenge_raw = _recv_all(sock)
        if challenge_raw is None:
            raise BrowserNotConnected(f"No challenge received from {endpoint}")
        # Bind both names up front so a malformed challenge frame degrades to
        # "no nonce, no version requirement" instead of an unbound variable.
        challenge = None
        nonce_hex = None
        min_ver = None
        try:
            challenge = json.loads(challenge_raw)
        except ValueError:
            # Covers JSONDecodeError and UnicodeDecodeError alike.
            pass
        if isinstance(challenge, dict):
            if challenge.get("type") == "challenge":
                nonce_hex = challenge.get("nonce")
            min_ver = challenge.get("min_client_version")
        if min_ver:
            from browser_cli.version_manager import parse_version
            try:
                client_ver = _USER_AGENT.split("/", 1)[1]
                if parse_version(client_ver) < parse_version(min_ver):
                    raise BrowserNotConnected(
                        f"Client version {client_ver} is too old for this server "
                        f"(requires >= {min_ver}). Run: pip install --upgrade browser-cli"
                    )
            except (IndexError, ValueError):
                # Unparseable version strings: skip the check.
                pass
        if nonce_hex and private_key is not None:
            from browser_cli.auth import sign, public_key_hex
            nonce = bytes.fromhex(nonce_hex)
            clean_msg = {k: v for k, v in msg.items() if k not in {"token", "pubkey", "sig"}}
            sig = sign(private_key, nonce, clean_msg)
            msg = {**clean_msg, "pubkey": public_key_hex(private_key), "sig": sig.hex()}
        payload = json.dumps(msg).encode("utf-8")
        # Frame: 4-byte little-endian length followed by the JSON payload.
        framed = struct.pack("<I", len(payload)) + payload
        sock.sendall(framed)
        return _recv_all(sock)
def _auto_route_remote(endpoint: str, key=None) -> str | None:
    """Choose the profile to route to when the caller named none.

    Returns the profile of the only browser advertised by *endpoint*, or None
    when it advertises none.

    Raises:
        BrowserNotConnected: when the endpoint advertises several browsers
            and the choice would be ambiguous.
    """
    targets = remote_browser_targets(endpoint, key=key)
    if not targets:
        return None
    if len(targets) == 1:
        return targets[0].profile
    aliases = [entry.profile for entry in targets]
    joined = ", ".join(aliases)
    examples = "\n".join(f"  browser-cli --remote {endpoint} --browser {a} ..." for a in aliases)
    raise BrowserNotConnected(
        f"Multiple remote browser instances are active: {joined}\n"
        f"Use --browser <alias> to select one:\n{examples}"
    )
def send_command(command: str, args: dict | None = None, profile: str | None = None, remote: str | None = None, key: "Path | str | None" = None) -> Any:
    """Send a command to a browser and return the ``data`` field of its reply.

    Args:
        command: Command name to execute.
        args: Command arguments; an empty dict is sent when omitted.
        profile: Browser alias. Falls back to BROWSER_CLI_PROFILE. When no
            remote is given, the alias is also tried as a remote alias.
        remote: Remote endpoint (``host:port``). Falls back to
            BROWSER_CLI_REMOTE.
        key: Key spec (file path, "agent" or "agent:<selector>") for remote
            connections. An explicit key is also saved for that endpoint.

    Returns:
        The ``data`` entry of the decoded reply (None if absent).

    Raises:
        BrowserNotConnected: the endpoint cannot be resolved or reached.
        ConnectionError: the connection produced no response.
        RuntimeError: the reply reports ``success`` as false.
    """
    requested_profile = profile or os.environ.get("BROWSER_CLI_PROFILE")
    remote_endpoint = remote or os.environ.get("BROWSER_CLI_REMOTE")
    remote_alias_target = None
    if not remote_endpoint and requested_profile:
        remote_alias_target = remote_target_for_alias(requested_profile)
        if remote_alias_target:
            remote_endpoint = remote_alias_target.remote
            requested_profile = remote_alias_target.profile
    msg = {
        "id": str(uuid.uuid4()),
        "command": command,
        "args": args or {},
    }
    if remote_endpoint:
        msg["user_agent"] = _USER_AGENT
        # key priority: explicit flag > saved per-remote config > BROWSER_CLI_KEY env > default file
        key_spec = key if key is not None else key_for_remote(remote_endpoint)
        private_key = _load_private_key(key_spec)
        # persist explicit key spec so future calls don't need --key
        if key is not None:
            save_remote_key(remote_endpoint, str(key))
        route_profile = requested_profile
        # Commands excluded from auto-routing to a specific browser.
        _no_route_commands = {"browser-cli.targets", "browser-cli.auth.keys", "browser-cli.auth.trust"}
        if not route_profile and command not in _no_route_commands:
            route_profile = _auto_route_remote(remote_endpoint, key=key_spec)
        if route_profile:
            msg["_route"] = route_profile
    else:
        private_key = None
    payload = json.dumps(msg).encode("utf-8")
    # Frame used on the Unix-socket path: 4-byte little-endian length + payload.
    framed = struct.pack("<I", len(payload)) + payload
    try:
        if remote_endpoint:
            response = _send_remote(remote_endpoint, msg, private_key)
        elif is_windows():
            sock_path = _resolve_socket(profile)
            with PipeClient(sock_path, family="AF_PIPE") as conn:
                # The pipe connection is message-based, so the bare payload is sent.
                conn.send_bytes(payload)
                response = conn.recv_bytes()
        else:
            sock_path = _resolve_socket(profile)
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
                sock.connect(sock_path)
                sock.sendall(framed)
                response = _recv_all(sock)
    except OSError as exc:
        # FileNotFoundError, ConnectionRefusedError and ConnectionError are all
        # OSError subclasses; the original cause is chained for debugging.
        if remote_endpoint:
            raise BrowserNotConnected(
                f"Cannot connect to remote browser at {remote_endpoint}.\n"
                "Make sure browser-cli serve is running on the remote host."
            ) from exc
        profile_hint = f" (profile: {profile})" if profile else ""
        raise BrowserNotConnected(
            f"Cannot connect to browser{profile_hint}.\n"
            "Make sure:\n"
            "  1. The browser-cli extension is installed and enabled\n"
            "  2. The native host is registered: uv run browser-cli install <browser>\n"
            "  3. Your browser is running\n"
            "  Tip: use BROWSER_CLI_PROFILE=<name> to select a specific profile"
        ) from exc
    if response is None:
        raise ConnectionError("Connection closed before full response received")
    result = json.loads(response)
    if not result.get("success", True):
        raise RuntimeError(result.get("error", "unknown error from browser"))
    return result.get("data")
# Upper bound (32 MiB) on the declared length of a single incoming frame;
# _recv_all rejects anything larger before reading the payload.
_MAX_MSG_BYTES = 32 * 1024 * 1024
def _recv_all(sock: socket.socket) -> bytes:
    """Read one length-prefixed frame from *sock* and return its payload.

    The frame starts with a 4-byte little-endian unsigned length, followed by
    that many payload bytes.

    Raises:
        ConnectionError: when the declared length exceeds ``_MAX_MSG_BYTES``
            (or, via ``_recv_exact``, when the peer closes early).
    """
    header = _recv_exact(sock, 4)
    (size,) = struct.unpack("<I", header)
    if size > _MAX_MSG_BYTES:
        raise ConnectionError(f"Response too large ({size} bytes)")
    return _recv_exact(sock, size)
def _recv_exact(sock: socket.socket, n: int) -> bytes:
    """Read exactly *n* bytes from *sock*.

    Received chunks are collected in a list and joined once at the end, so
    large messages are not re-copied on every ``recv``.

    Args:
        sock: Connected socket to read from.
        n: Number of bytes to read; 0 returns ``b""`` without reading.

    Raises:
        ConnectionError: if the peer closes before *n* bytes have arrived.
    """
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("Socket closed before full message received")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)