feat: Ed25519 challenge-response auth + YubiKey/SSH agent support (v0.9.0)
Testing / test (push) Successful in 26s
Package Extension / package-extension (push) Successful in 22s
Build & Publish Package / publish (push) Successful in 27s

Security:
- serve.py: server now sends nonce challenge before accepting any command;
  clients sign nonce + SHA256(canonical_payload) with Ed25519 key
- New --authorized-keys FILE option for serve; token auth still works as fallback
- Connection limit: BoundedSemaphore(64) in serve.py
- Secure file creation with os.open(..., 0o600) for token/key files
- New auth.py module: keygen, file key load/save, SSH agent protocol (pure Python),
  sign/verify helpers compatible with both file keys and agent-held keys (YubiKey,
  TPM, gpg-agent)

Features:
- YubiKey support via SSH agent protocol — no new runtime deps, just $SSH_AUTH_SOCK
- New `browser-cli auth` command group: keygen, trust, show, keys
- Global --key PATH flag (or BROWSER_CLI_KEY env) selects signing key;
  pass "agent" or "agent:<selector>" to use SSH agent key
- BrowserCLI Python API gains key= parameter

Bug fixes (11 issues across two review passes):
- client.py: check response is not None before json.loads
- native_host.py: _read_exact_stream loop handles EINTR short reads; fix Windows
  Listener leak on accept error
- __init__.py: open_wait / tabs_watch_url raise RuntimeError instead of silent None
- extension/tabs.ts: dedupe skips tabs without URL; tabsSort uses pendingUrl fallback
- extension/session.ts: removeListener before addListener prevents duplicate handlers

Breaking: TCP serve protocol now sends a challenge frame first (v0.9.0)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-02 16:20:39 +02:00
parent 9f03e29807
commit 4b2abbbfc5
14 changed files with 735 additions and 121 deletions
+63 -9
View File
@@ -23,6 +23,7 @@ from browser_cli.registry import load_registry
REGISTRY_PATH = registry_path()
REMOTE_REGISTRY_PATH = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / "browser-cli" / "remotes.json"
_DEFAULT_KEY_PATH = Path(os.environ.get("XDG_CONFIG_HOME", str(Path.home() / ".config"))) / "browser-cli" / "client.key.pem"
class BrowserNotConnected(Exception):
@@ -72,11 +73,9 @@ def save_remote_token(endpoint: str, token: str | None) -> None:
current["token"] = token
remotes[endpoint] = current
REMOTE_REGISTRY_PATH.parent.mkdir(parents=True, exist_ok=True)
REMOTE_REGISTRY_PATH.write_text(json.dumps(remotes, indent=2, sort_keys=True), encoding="utf-8")
try:
REMOTE_REGISTRY_PATH.chmod(0o600)
except OSError:
pass
fd = os.open(str(REMOTE_REGISTRY_PATH), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
with os.fdopen(fd, "w", encoding="utf-8") as f:
f.write(json.dumps(remotes, indent=2, sort_keys=True))
def token_for_remote(endpoint: str | None) -> str | None:
@@ -193,12 +192,61 @@ def _resolve_socket(profile: str | None = None) -> str:
)
def _send_remote(endpoint: str, framed: bytes) -> bytes:
def _load_private_key(key_path: "Path | str | None" = None):
"""Load an Ed25519 signing key.
Sources (in priority order):
1. Explicit key_path / --key flag
2. BROWSER_CLI_KEY environment variable
3. Default PEM file (~/.config/browser-cli/client.key.pem)
Pass "agent" or "agent:<selector>" to use a key from the SSH agent
(works with YubiKey via gpg-agent, TPM, or regular ssh-agent).
"""
raw = str(key_path) if key_path is not None else os.environ.get("BROWSER_CLI_KEY", str(_DEFAULT_KEY_PATH))
if raw == "agent" or raw.startswith("agent:"):
selector = raw[6:] or None # "agent:cardno:..." → "cardno:..."
from browser_cli.auth import agent_find_key
return agent_find_key(selector)
path = Path(raw)
if not path.exists():
return None
try:
from browser_cli.auth import load_private_key
return load_private_key(path)
except Exception:
return None
def _send_remote(endpoint: str, msg: dict, private_key=None) -> bytes | None:
host, _, port_str = endpoint.rpartition(":")
if not host or not port_str:
raise BrowserNotConnected(f"Invalid remote endpoint '{endpoint}': expected host:port")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(30)
sock.connect((host, int(port_str)))
# receive challenge
challenge_raw = _recv_all(sock)
if challenge_raw is None:
raise BrowserNotConnected(f"No challenge received from {endpoint}")
try:
challenge = json.loads(challenge_raw)
nonce_hex = challenge.get("nonce") if challenge.get("type") == "challenge" else None
except (json.JSONDecodeError, AttributeError):
nonce_hex = None
if nonce_hex and private_key is not None:
from browser_cli.auth import sign, public_key_hex
nonce = bytes.fromhex(nonce_hex)
clean_msg = {k: v for k, v in msg.items() if k not in {"token", "pubkey", "sig"}}
sig = sign(private_key, nonce, clean_msg)
msg = {**clean_msg, "pubkey": public_key_hex(private_key), "sig": sig.hex()}
payload = json.dumps(msg).encode("utf-8")
framed = struct.pack("<I", len(payload)) + payload
sock.sendall(framed)
return _recv_all(sock)
@@ -217,7 +265,7 @@ def _auto_route_remote(endpoint: str, token: str | None) -> str | None:
return None
def send_command(command: str, args: dict | None = None, profile: str | None = None, remote: str | None = None, token: str | None = None) -> Any:
def send_command(command: str, args: dict | None = None, profile: str | None = None, remote: str | None = None, token: str | None = None, key: "Path | None" = None) -> Any:
"""Send a command to the browser and return the response data."""
requested_profile = profile or os.environ.get("BROWSER_CLI_PROFILE")
remote_endpoint = remote or os.environ.get("BROWSER_CLI_REMOTE")
@@ -235,19 +283,23 @@ def send_command(command: str, args: dict | None = None, profile: str | None = N
"args": args or {},
}
if remote_endpoint:
if resolved_token:
private_key = _load_private_key(key)
# use token auth only when no Ed25519 key is available
if private_key is None and resolved_token:
msg["token"] = resolved_token
route_profile = requested_profile
if not route_profile and command != "browser-cli.targets":
route_profile = _auto_route_remote(remote_endpoint, resolved_token)
if route_profile:
msg["_route"] = route_profile
else:
private_key = None
payload = json.dumps(msg).encode("utf-8")
framed = struct.pack("<I", len(payload)) + payload
try:
if remote_endpoint:
response = _send_remote(remote_endpoint, framed)
response = _send_remote(remote_endpoint, msg, private_key)
elif is_windows():
sock_path = _resolve_socket(profile)
with PipeClient(sock_path, family="AF_PIPE") as conn:
@@ -275,6 +327,8 @@ def send_command(command: str, args: dict | None = None, profile: str | None = N
" Tip: use BROWSER_CLI_PROFILE=<name> to select a specific profile"
)
if response is None:
raise ConnectionError("Connection closed before full response received")
result = json.loads(response)
if not result.get("success", True):
raise RuntimeError(result.get("error", "unknown error from browser"))