298 lines
11 KiB
Python
298 lines
11 KiB
Python
"""
|
|
Local IPC client — sends commands to native host relay endpoint.
|
|
Used by both CLI and public Python API.
|
|
|
|
Profile selection order:
|
|
1. Explicit `profile` argument to send_command()
|
|
2. BROWSER_CLI_PROFILE environment variable
|
|
3. First entry in runtime registry
|
|
4. Otherwise, no browser can be resolved automatically
|
|
"""
|
|
import json
|
|
import os
|
|
import socket
|
|
import struct
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from multiprocessing.connection import Client as PipeClient
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from browser_cli.platform import endpoint_for_alias, is_windows, registry_path
|
|
from browser_cli.registry import load_registry
|
|
|
|
REGISTRY_PATH = registry_path()
|
|
REMOTE_REGISTRY_PATH = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / "browser-cli" / "remotes.json"
|
|
|
|
|
|
class BrowserNotConnected(Exception):
|
|
"""Raised when the native host socket is not available."""
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class BrowserTarget:
|
|
profile: str
|
|
display_name: str
|
|
socket_path: str
|
|
remote: str | None = None
|
|
token: str | None = None
|
|
|
|
|
|
def _active_endpoints(reg: dict) -> dict:
|
|
"""Return only entries whose endpoint appears reachable."""
|
|
if is_windows():
|
|
return dict(reg)
|
|
return {k: v for k, v in reg.items() if Path(v).exists()}
|
|
|
|
|
|
def display_browser_name(profile_name: str, sock_path: str) -> str:
|
|
if profile_name != "default":
|
|
return profile_name
|
|
return Path(sock_path).stem or profile_name
|
|
|
|
|
|
def _load_remotes() -> dict[str, dict[str, str]]:
|
|
if not REMOTE_REGISTRY_PATH.exists():
|
|
return {}
|
|
try:
|
|
data = json.loads(REMOTE_REGISTRY_PATH.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return {}
|
|
if not isinstance(data, dict):
|
|
return {}
|
|
return {str(endpoint): cfg for endpoint, cfg in data.items() if isinstance(cfg, dict)}
|
|
|
|
|
|
def save_remote_token(endpoint: str, token: str | None) -> None:
|
|
"""Persist the auth token for a remote endpoint used by this client."""
|
|
if not endpoint or not token:
|
|
return
|
|
remotes = _load_remotes()
|
|
current = remotes.get(endpoint, {})
|
|
current["token"] = token
|
|
remotes[endpoint] = current
|
|
REMOTE_REGISTRY_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
REMOTE_REGISTRY_PATH.write_text(json.dumps(remotes, indent=2, sort_keys=True), encoding="utf-8")
|
|
try:
|
|
REMOTE_REGISTRY_PATH.chmod(0o600)
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
def token_for_remote(endpoint: str | None) -> str | None:
|
|
if not endpoint:
|
|
return None
|
|
cfg = _load_remotes().get(endpoint) or {}
|
|
token = cfg.get("token")
|
|
return str(token) if token else None
|
|
|
|
|
|
def _remote_display_name(endpoint: str, profile_name: str, display_name: str) -> str:
|
|
host, sep, port = endpoint.rpartition(":")
|
|
remote_name = host if sep and port == "8765" else endpoint
|
|
return f"{remote_name}:{display_name or profile_name}"
|
|
|
|
|
|
def remote_browser_targets(endpoint: str, token: str | None = None) -> list[BrowserTarget]:
|
|
"""Return browser targets advertised by a single remote endpoint."""
|
|
remote_targets = send_command("browser-cli.targets", remote=endpoint, token=token)
|
|
targets: list[BrowserTarget] = []
|
|
for item in remote_targets or []:
|
|
profile = str(item.get("profile") or "default")
|
|
display = str(item.get("displayName") or profile)
|
|
targets.append(
|
|
BrowserTarget(
|
|
profile=profile,
|
|
display_name=_remote_display_name(endpoint, profile, display),
|
|
socket_path="",
|
|
remote=endpoint,
|
|
token=token,
|
|
)
|
|
)
|
|
return targets
|
|
|
|
|
|
def _remote_browser_targets() -> list[BrowserTarget]:
|
|
targets: list[BrowserTarget] = []
|
|
for endpoint, cfg in _load_remotes().items():
|
|
token = str(cfg.get("token") or "") or None
|
|
try:
|
|
targets.extend(remote_browser_targets(endpoint, token))
|
|
except (BrowserNotConnected, RuntimeError):
|
|
continue
|
|
return targets
|
|
|
|
|
|
def remote_target_for_alias(alias: str | None) -> BrowserTarget | None:
|
|
"""Resolve a user-facing remote alias such as 'host:profile' to a target."""
|
|
if not alias:
|
|
return None
|
|
targets = _remote_browser_targets()
|
|
for target in targets:
|
|
endpoint_profile = f"{target.remote}:{target.profile}" if target.remote else None
|
|
if alias in {target.display_name, endpoint_profile}:
|
|
return target
|
|
|
|
endpoint_matches = []
|
|
for target in targets:
|
|
if not target.remote:
|
|
continue
|
|
remote_host, sep, _remote_port = target.remote.rpartition(":")
|
|
if alias == target.remote or (sep and alias == remote_host):
|
|
endpoint_matches.append(target)
|
|
if len(endpoint_matches) == 1:
|
|
return endpoint_matches[0]
|
|
return None
|
|
|
|
|
|
def active_browser_targets(*, include_remotes: bool = True) -> list[BrowserTarget]:
|
|
targets: list[BrowserTarget] = []
|
|
if REGISTRY_PATH.exists():
|
|
reg = load_registry(REGISTRY_PATH)
|
|
targets.extend(
|
|
BrowserTarget(profile=profile, display_name=display_browser_name(profile, sock_path), socket_path=sock_path)
|
|
for profile, sock_path in _active_endpoints(reg).items()
|
|
)
|
|
if include_remotes:
|
|
targets.extend(_remote_browser_targets())
|
|
return targets
|
|
|
|
|
|
def _resolve_socket(profile: str | None = None) -> str:
|
|
"""Return the socket path for the given profile (or auto-detect)."""
|
|
target = profile or os.environ.get("BROWSER_CLI_PROFILE")
|
|
|
|
if target:
|
|
if REGISTRY_PATH.exists():
|
|
reg = load_registry(REGISTRY_PATH)
|
|
if target in reg:
|
|
return reg[target]
|
|
return endpoint_for_alias(target)
|
|
|
|
# Auto-detect: error when multiple browser instances are active
|
|
try:
|
|
active = active_browser_targets(include_remotes=False)
|
|
if len(active) > 1:
|
|
aliases = [target.profile for target in active]
|
|
examples = "\n".join(f" browser-cli --browser {a} ..." for a in aliases)
|
|
raise BrowserNotConnected(
|
|
f"Multiple browser instances are active: {', '.join(aliases)}\n"
|
|
f"Use --browser <alias> to select one:\n{examples}"
|
|
)
|
|
if active:
|
|
return active[0].socket_path
|
|
except BrowserNotConnected:
|
|
raise
|
|
except Exception:
|
|
pass
|
|
|
|
raise BrowserNotConnected(
|
|
"Cannot resolve a browser socket automatically.\n"
|
|
"Make sure the browser is running with the browser-cli extension enabled,\n"
|
|
"or pass --browser <alias> / set BROWSER_CLI_PROFILE to a known alias."
|
|
)
|
|
|
|
|
|
def _send_remote(endpoint: str, framed: bytes) -> bytes:
|
|
host, _, port_str = endpoint.rpartition(":")
|
|
if not host or not port_str:
|
|
raise BrowserNotConnected(f"Invalid remote endpoint '{endpoint}': expected host:port")
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
|
sock.connect((host, int(port_str)))
|
|
sock.sendall(framed)
|
|
return _recv_all(sock)
|
|
|
|
|
|
def _auto_route_remote(endpoint: str, token: str | None) -> str | None:
|
|
targets = remote_browser_targets(endpoint, token)
|
|
if len(targets) == 1:
|
|
return targets[0].profile
|
|
if len(targets) > 1:
|
|
aliases = [target.profile for target in targets]
|
|
examples = "\n".join(f" browser-cli --remote {endpoint} --browser {a} ..." for a in aliases)
|
|
raise BrowserNotConnected(
|
|
f"Multiple remote browser instances are active: {', '.join(aliases)}\n"
|
|
f"Use --browser <alias> to select one:\n{examples}"
|
|
)
|
|
return None
|
|
|
|
|
|
def send_command(command: str, args: dict | None = None, profile: str | None = None, remote: str | None = None, token: str | None = None) -> Any:
|
|
"""Send a command to the browser and return the response data."""
|
|
requested_profile = profile or os.environ.get("BROWSER_CLI_PROFILE")
|
|
remote_endpoint = remote or os.environ.get("BROWSER_CLI_REMOTE")
|
|
remote_alias_target = None
|
|
if not remote_endpoint and requested_profile:
|
|
remote_alias_target = remote_target_for_alias(requested_profile)
|
|
if remote_alias_target:
|
|
remote_endpoint = remote_alias_target.remote
|
|
requested_profile = remote_alias_target.profile
|
|
|
|
resolved_token = token or os.environ.get("BROWSER_CLI_TOKEN") or (remote_alias_target.token if remote_alias_target else None) or token_for_remote(remote_endpoint)
|
|
msg = {
|
|
"id": str(uuid.uuid4()),
|
|
"command": command,
|
|
"args": args or {},
|
|
}
|
|
if remote_endpoint:
|
|
if resolved_token:
|
|
msg["token"] = resolved_token
|
|
route_profile = requested_profile
|
|
if not route_profile and command != "browser-cli.targets":
|
|
route_profile = _auto_route_remote(remote_endpoint, resolved_token)
|
|
if route_profile:
|
|
msg["_route"] = route_profile
|
|
payload = json.dumps(msg).encode("utf-8")
|
|
framed = struct.pack("<I", len(payload)) + payload
|
|
|
|
try:
|
|
if remote_endpoint:
|
|
response = _send_remote(remote_endpoint, framed)
|
|
elif is_windows():
|
|
sock_path = _resolve_socket(profile)
|
|
with PipeClient(sock_path, family="AF_PIPE") as conn:
|
|
conn.send_bytes(payload)
|
|
response = conn.recv_bytes()
|
|
else:
|
|
sock_path = _resolve_socket(profile)
|
|
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
|
|
sock.connect(sock_path)
|
|
sock.sendall(framed)
|
|
response = _recv_all(sock)
|
|
except (FileNotFoundError, ConnectionRefusedError, OSError):
|
|
if remote_endpoint:
|
|
raise BrowserNotConnected(
|
|
f"Cannot connect to remote browser at {remote_endpoint}.\n"
|
|
"Make sure browser-cli serve is running on the remote host."
|
|
)
|
|
profile_hint = f" (profile: {profile})" if profile else ""
|
|
raise BrowserNotConnected(
|
|
f"Cannot connect to browser{profile_hint}.\n"
|
|
"Make sure:\n"
|
|
" 1. The browser-cli extension is installed and enabled\n"
|
|
" 2. The native host is registered: uv run browser-cli install <browser>\n"
|
|
" 3. Your browser is running\n"
|
|
" Tip: use BROWSER_CLI_PROFILE=<name> to select a specific profile"
|
|
)
|
|
|
|
result = json.loads(response)
|
|
if not result.get("success", True):
|
|
raise RuntimeError(result.get("error", "unknown error from browser"))
|
|
return result.get("data")
|
|
|
|
|
|
def _recv_all(sock: socket.socket) -> bytes:
|
|
raw_len = _recv_exact(sock, 4)
|
|
msg_len = struct.unpack("<I", raw_len)[0]
|
|
return _recv_exact(sock, msg_len)
|
|
|
|
|
|
def _recv_exact(sock: socket.socket, n: int) -> bytes:
|
|
buf = b""
|
|
while len(buf) < n:
|
|
chunk = sock.recv(n - len(buf))
|
|
if not chunk:
|
|
raise ConnectionError("Socket closed before full message received")
|
|
buf += chunk
|
|
return buf
|