""" Local IPC client — sends commands to native host relay endpoint. Used by both CLI and public Python API. Profile selection order: 1. Explicit `profile` argument to send_command() 2. BROWSER_CLI_PROFILE environment variable 3. First entry in runtime registry 4. Otherwise, no browser can be resolved automatically """ import json import os import re import socket import struct import sys import uuid from dataclasses import dataclass from multiprocessing.connection import Client as PipeClient from pathlib import Path from typing import Any from browser_cli.platform import endpoint_for_alias, is_windows, registry_path from browser_cli.version_manager import MAX_MSG_BYTES as _MAX_MSG_BYTES from browser_cli.registry import load_registry try: from importlib.metadata import version as _pkg_version _USER_AGENT = f"browser-cli/{_pkg_version('browser-cli')}" except Exception: _USER_AGENT = "browser-cli/0" REGISTRY_PATH = registry_path() REMOTE_REGISTRY_PATH = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / "browser-cli" / "remotes.json" _DEFAULT_KEY_PATH = Path(os.environ.get("XDG_CONFIG_HOME", str(Path.home() / ".config"))) / "browser-cli" / "client.key.pem" _DEFAULT_REMOTE_PORT = 443 def _looks_like_domain(host: str) -> bool: """True if host looks like a domain name rather than an IP address or localhost.""" if host in {"localhost", "127.0.0.1", "::1"}: return False if re.match(r'^\d{1,3}(\.\d{1,3}){3}$', host): return False return '.' in host and any(c.isalpha() for c in host) def _normalize_endpoint(endpoint: str) -> str: """Strip :443 from domain-like endpoints so they are stored without the default port.""" if not endpoint: return endpoint host, sep, port = endpoint.rpartition(":") if sep and port == "443" and _looks_like_domain(host): return host return endpoint def _resolve_connect_endpoint(endpoint: str) -> str: """Return host:port for TCP connection; domain without port defaults to :443.""" _, sep, _ = endpoint.rpartition(":") if not sep: if _looks_like_domain(endpoint): return f"{endpoint}:{_DEFAULT_REMOTE_PORT}" raise BrowserNotConnected( f"Invalid remote endpoint '{endpoint}': expected host:port" ) return endpoint class BrowserNotConnected(Exception): """Raised when the native host socket is not available.""" @dataclass(frozen=True) class BrowserTarget: profile: str display_name: str socket_path: str remote: str | None = None def _is_reachable_unix_endpoint(endpoint: str) -> bool: """Return True when a Unix socket path exists and accepts connections.""" path = Path(endpoint) if not path.exists(): return False try: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: sock.settimeout(0.2) sock.connect(endpoint) return True except OSError: return False def _active_endpoints(reg: dict) -> dict: """Return only entries whose endpoint appears reachable.""" if is_windows(): return dict(reg) return {k: v for k, v in reg.items() if _is_reachable_unix_endpoint(v)} def display_browser_name(profile_name: str, sock_path: str) -> str: if profile_name != "default": return profile_name return Path(sock_path).stem or profile_name def _load_remotes() -> dict[str, dict[str, str]]: if not REMOTE_REGISTRY_PATH.exists(): return {} try: data = json.loads(REMOTE_REGISTRY_PATH.read_text(encoding="utf-8")) except Exception: return {} if not isinstance(data, dict): return {} # normalize keys so old entries stored as "domain:443" match current lookups return {_normalize_endpoint(str(endpoint)): cfg for endpoint, cfg in data.items() if isinstance(cfg, dict)} def _is_valid_key_spec(s: str) -> bool: """Return True if s looks like a usable key spec: 'agent', 'agent:', or a file path.""" return s == "agent" or s.startswith("agent:") or (not s.startswith("<") and ("/" in s or Path(s).suffix in {".pem", ".key"})) def save_remote_key(endpoint: str, key_spec: str) -> None: """Persist the key spec (e.g. 'agent' or a file path) for a remote endpoint.""" if not endpoint or not key_spec: return if not _is_valid_key_spec(key_spec): return # refuse to save serialized objects or other garbage remotes = _load_remotes() current = remotes.get(endpoint, {}) current["key"] = key_spec remotes[endpoint] = current REMOTE_REGISTRY_PATH.parent.mkdir(parents=True, exist_ok=True) fd = os.open(str(REMOTE_REGISTRY_PATH), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) with os.fdopen(fd, "w", encoding="utf-8") as f: f.write(json.dumps(remotes, indent=2, sort_keys=True)) def key_for_remote(endpoint: str | None) -> str | None: if not endpoint: return None cfg = _load_remotes().get(endpoint) or {} key = cfg.get("key") if not key: return None key_str = str(key) # reject corrupted values (e.g. str(AgentKey(...)) saved by an older bug) if not _is_valid_key_spec(key_str): return None return key_str def _remote_display_name(endpoint: str, profile_name: str, display_name: str) -> str: host, sep, port = endpoint.rpartition(":") if sep and (port == "8765" or (port == "443" and _looks_like_domain(host))): display_endpoint = host else: display_endpoint = endpoint # normalized domain (no port) or non-default port return f"{display_endpoint}:{display_name or profile_name}" def remote_browser_targets(endpoint: str, key=None) -> list[BrowserTarget]: """Return browser targets advertised by a single remote endpoint.""" remote_targets = send_command("browser-cli.targets", remote=endpoint, key=key) targets: list[BrowserTarget] = [] for item in remote_targets or []: profile = str(item.get("profile") or "default") display = str(item.get("displayName") or profile) targets.append( BrowserTarget( profile=profile, display_name=_remote_display_name(endpoint, profile, display), socket_path="", remote=endpoint, ) ) return targets def _remote_browser_targets(key=None) -> list[BrowserTarget]: targets: list[BrowserTarget] = [] for endpoint in _load_remotes(): try: targets.extend(remote_browser_targets(endpoint, key=key)) except (BrowserNotConnected, RuntimeError): continue return targets def remote_target_for_alias(alias: str | None) -> BrowserTarget | None: """Resolve a user-facing remote alias such as 'host:profile' to a target.""" if not alias: return None targets = _remote_browser_targets() for target in targets: endpoint_profile = f"{target.remote}:{target.profile}" if target.remote else None if alias in {target.display_name, endpoint_profile}: return target endpoint_matches = [] for target in targets: if not target.remote: continue remote_host, sep, _remote_port = target.remote.rpartition(":") if alias == target.remote or (sep and alias == remote_host): endpoint_matches.append(target) if len(endpoint_matches) == 1: return endpoint_matches[0] if len(endpoint_matches) > 1: aliases = [target.profile for target in endpoint_matches] endpoint = endpoint_matches[0].remote or alias examples = "\n".join( f" browser-cli --remote {endpoint} --browser {a} ..." for a in aliases ) display_aliases = [target.display_name for target in endpoint_matches] shorthand_examples = "\n".join( f" browser-cli --browser {a} ..." for a in display_aliases ) raise BrowserNotConnected( f"Multiple remote browser instances are active on {alias}: {', '.join(aliases)}\n" f"Use --browser with --remote to select one:\n{examples}\n" f"Or use the full remote browser alias:\n{shorthand_examples}" ) return None def active_browser_targets(*, include_remotes: bool = True, key=None) -> list[BrowserTarget]: targets: list[BrowserTarget] = [] if REGISTRY_PATH.exists(): reg = load_registry(REGISTRY_PATH) targets.extend( BrowserTarget(profile=profile, display_name=display_browser_name(profile, sock_path), socket_path=sock_path) for profile, sock_path in _active_endpoints(reg).items() ) if include_remotes: targets.extend(_remote_browser_targets(key=key)) return targets def _is_active_local_profile(profile: str | None) -> bool: """Return True when profile names a reachable local browser endpoint.""" if not profile: return False if REGISTRY_PATH.exists(): reg = load_registry(REGISTRY_PATH) if profile in _active_endpoints(reg): return True if not is_windows(): try: return Path(endpoint_for_alias(profile)).exists() except Exception: return False return False def _resolve_socket(profile: str | None = None) -> str: """Return the socket path for the given profile (or auto-detect).""" target = profile or os.environ.get("BROWSER_CLI_PROFILE") if target: if REGISTRY_PATH.exists(): reg = load_registry(REGISTRY_PATH) if target in reg: return reg[target] return endpoint_for_alias(target) # Auto-detect: error when multiple browser instances are active try: active = active_browser_targets(include_remotes=False) if len(active) > 1: aliases = [target.profile for target in active] examples = "\n".join(f" browser-cli --browser {a} ..." for a in aliases) raise BrowserNotConnected( f"Multiple browser instances are active: {', '.join(aliases)}\n" f"Use --browser to select one:\n{examples}" ) if active: return active[0].socket_path except BrowserNotConnected: raise except Exception: pass raise BrowserNotConnected( "Cannot resolve a browser socket automatically.\n" "Make sure the browser is running with the browser-cli extension enabled,\n" "or pass --browser / set BROWSER_CLI_PROFILE to a known alias." ) def _load_private_key(key_path: "Path | str | None" = None): """Load an Ed25519 signing key. Sources (in priority order): 1. Explicit key_path / --key flag 2. BROWSER_CLI_KEY environment variable 3. Default PEM file (~/.config/browser-cli/client.key.pem) Pass "agent" or "agent:" to use a key from the SSH agent (works with YubiKey via gpg-agent, TPM, or regular ssh-agent). """ raw = str(key_path) if key_path is not None else os.environ.get("BROWSER_CLI_KEY", str(_DEFAULT_KEY_PATH)) if raw == "agent" or raw.startswith("agent:"): selector = raw[6:] or None # "agent:cardno:..." → "cardno:..." from browser_cli.auth import agent_find_key return agent_find_key(selector) path = Path(raw) if not path.exists(): return None try: from browser_cli.auth import load_private_key return load_private_key(path) except Exception: return None def _send_remote(endpoint: str, msg: dict, private_key=None) -> bytes | None: connect_ep = _resolve_connect_endpoint(endpoint) host, _, port_str = connect_ep.rpartition(":") port = int(port_str) raw_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) raw_sock.settimeout(30) try: raw_sock.connect((host, port)) if port == 443: import ssl ctx = ssl.create_default_context() sock = ctx.wrap_socket(raw_sock, server_hostname=host) else: sock = raw_sock except Exception: raw_sock.close() raise with sock: # receive challenge challenge_raw = _recv_all(sock) if challenge_raw is None: raise BrowserNotConnected(f"No challenge received from {endpoint}") try: challenge = json.loads(challenge_raw) nonce_hex = challenge.get("nonce") if challenge.get("type") == "challenge" else None except (json.JSONDecodeError, AttributeError): nonce_hex = None min_ver = challenge.get("min_client_version") if isinstance(challenge, dict) else None if min_ver: from browser_cli.version_manager import parse_version try: client_ver = _USER_AGENT.split("/", 1)[1] if parse_version(client_ver) < parse_version(min_ver): raise BrowserNotConnected( f"Client version {client_ver} is too old for this server " f"(requires >= {min_ver}). Run: pip install --upgrade browser-cli" ) except (IndexError, ValueError): pass pq_shared_secret = None if nonce_hex and private_key is not None: from browser_cli.auth import PQ_KEX_ALG, pq_encrypt, pq_kex_client_encapsulate, sign, public_key_hex nonce = bytes.fromhex(nonce_hex) clean_msg = {k: v for k, v in msg.items() if k not in {"token", "pubkey", "sig", "pq_kex", "encrypted"}} kex = challenge.get("pq_kex") if isinstance(challenge, dict) else None if isinstance(kex, dict) and kex.get("alg") == PQ_KEX_ALG and kex.get("public_key"): ciphertext_hex, pq_shared_secret = pq_kex_client_encapsulate(str(kex["public_key"])) clean_msg["pq_kex"] = {"alg": PQ_KEX_ALG, "ciphertext": ciphertext_hex} else: sys.stderr.write( "** WARNING: connection is not using a post-quantum key exchange algorithm.\n" "** This session may be vulnerable to store now, decrypt later attacks.\n" ) sig = sign(private_key, nonce, clean_msg, pq_shared_secret) msg = {**clean_msg, "pubkey": public_key_hex(private_key), "sig": sig.hex()} if pq_shared_secret is not None: encrypted = pq_encrypt(pq_shared_secret, "request", json.dumps(clean_msg).encode("utf-8")) msg = { "id": clean_msg.get("id"), "user_agent": clean_msg.get("user_agent"), "pubkey": public_key_hex(private_key), "sig": sig.hex(), "pq_kex": clean_msg["pq_kex"], "encrypted": encrypted, } else: sys.stderr.write( "** WARNING: connection is not using a post-quantum key exchange algorithm.\n" "** This session may be vulnerable to store now, decrypt later attacks.\n" ) payload = json.dumps(msg).encode("utf-8") framed = struct.pack(" str | None: targets = remote_browser_targets(endpoint, key=key) if len(targets) == 1: return targets[0].profile if len(targets) > 1: aliases = [target.profile for target in targets] examples = "\n".join(f" browser-cli --remote {endpoint} --browser {a} ..." for a in aliases) raise BrowserNotConnected( f"Multiple remote browser instances are active: {', '.join(aliases)}\n" f"Use --browser to select one:\n{examples}" ) return None def send_command(command: str, args: dict | None = None, profile: str | None = None, remote: str | None = None, key: "Path | None" = None) -> Any: """Send a command to the browser and return the response data.""" requested_profile = profile or os.environ.get("BROWSER_CLI_PROFILE") remote_endpoint = remote or os.environ.get("BROWSER_CLI_REMOTE") if remote_endpoint: remote_endpoint = _normalize_endpoint(remote_endpoint) remote_alias_target = None if not remote_endpoint and requested_profile and not _is_active_local_profile(requested_profile): remote_alias_target = remote_target_for_alias(requested_profile) if remote_alias_target: remote_endpoint = remote_alias_target.remote requested_profile = remote_alias_target.profile msg = { "id": str(uuid.uuid4()), "command": command, "args": args or {}, } if remote_endpoint: msg["user_agent"] = _USER_AGENT # key priority: explicit flag > saved per-remote config > BROWSER_CLI_KEY env > default file key_spec = key if key is not None else key_for_remote(remote_endpoint) private_key = _load_private_key(key_spec) # persist explicit key spec so future calls don't need --key if key is not None: save_remote_key(remote_endpoint, str(key)) route_profile = requested_profile _no_route_commands = {"browser-cli.targets", "browser-cli.auth.keys", "browser-cli.auth.trust"} if not route_profile and command not in _no_route_commands: route_profile = _auto_route_remote(remote_endpoint, key=key_spec) if route_profile: msg["_route"] = route_profile else: private_key = None try: if remote_endpoint: response = _send_remote(remote_endpoint, msg, private_key) elif is_windows(): payload = json.dumps(msg).encode("utf-8") sock_path = _resolve_socket(profile) with PipeClient(sock_path, family="AF_PIPE") as conn: conn.send_bytes(payload) response = conn.recv_bytes() else: payload = json.dumps(msg).encode("utf-8") sock_path = _resolve_socket(profile) with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: sock.connect(sock_path) sock.sendall(struct.pack("\n" " 3. Your browser is running\n" " Tip: use BROWSER_CLI_PROFILE= to select a specific profile" ) if response is None: raise ConnectionError("Connection closed before full response received") result = json.loads(response) if not result.get("success", True): raise RuntimeError(result.get("error", "unknown error from browser")) return result.get("data") def _recv_all(sock: socket.socket) -> bytes: raw_len = _recv_exact(sock, 4) msg_len = struct.unpack(" _MAX_MSG_BYTES: raise ConnectionError(f"Response too large ({msg_len} bytes)") return _recv_exact(sock, msg_len) def _recv_exact(sock: socket.socket, n: int) -> bytes: buf = b"" while len(buf) < n: chunk = sock.recv(n - len(buf)) if not chunk: raise ConnectionError("Socket closed before full message received") buf += chunk return buf