Files
browser-cli/browser_cli/auth.py
T
daniel156161 4b2abbbfc5
Testing / test (push) Successful in 26s
Package Extension / package-extension (push) Successful in 22s
Build & Publish Package / publish (push) Successful in 27s
feat: Ed25519 challenge-response auth + YubiKey/SSH agent support (v0.9.0)
Security:
- serve.py: server now sends nonce challenge before accepting any command;
  clients sign nonce + SHA256(canonical_payload) with Ed25519 key
- New --authorized-keys FILE option for serve; token auth still works as fallback
- Connection limit: BoundedSemaphore(64) in serve.py
- Secure file creation with os.open(..., 0o600) for token/key files
- New auth.py module: keygen, file key load/save, SSH agent protocol (pure Python),
  sign/verify helpers compatible with both file keys and agent-held keys (YubiKey,
  TPM, gpg-agent)

Features:
- YubiKey support via SSH agent protocol — no new runtime deps, just $SSH_AUTH_SOCK
- New `browser-cli auth` command group: keygen, trust, show, keys
- Global --key PATH flag (or BROWSER_CLI_KEY env) selects signing key;
  pass "agent" or "agent:<selector>" to use SSH agent key
- BrowserCLI Python API gains key= parameter

Bug fixes (11 issues across two review passes):
- client.py: check response is not None before json.loads
- native_host.py: _read_exact_stream loop handles EINTR short reads; fix Windows
  Listener leak on accept error
- __init__.py: open_wait / tabs_watch_url raise RuntimeError instead of silent None
- extension/tabs.ts: dedupe skips tabs without URL; tabsSort uses pendingUrl fallback
- extension/session.ts: removeListener before addListener prevents duplicate handlers

Breaking: TCP serve protocol now sends a challenge frame first (v0.9.0)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-02 16:20:39 +02:00

204 lines
7.4 KiB
Python

"""Ed25519 keypair management and challenge-response auth helpers."""
import hashlib
import json
import os
import secrets
import socket
import struct
from dataclasses import dataclass
from pathlib import Path
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
from cryptography.hazmat.primitives.serialization import (
Encoding,
NoEncryption,
PrivateFormat,
PublicFormat,
load_pem_private_key,
)
_CONFIG_DIR = Path(os.environ.get("XDG_CONFIG_HOME", str(Path.home() / ".config"))) / "browser-cli"
DEFAULT_KEY_PATH = _CONFIG_DIR / "client.key.pem"
DEFAULT_AUTHORIZED_KEYS_PATH = _CONFIG_DIR / "authorized_keys"
# ── SSH agent protocol constants ───────────────────────────────────────────────
_SSH_AGENTC_REQUEST_IDENTITIES = 11
_SSH_AGENT_IDENTITIES_ANSWER = 12
_SSH_AGENTC_SIGN_REQUEST = 13
_SSH_AGENT_SIGN_RESPONSE = 14
def _pack_str(s: bytes) -> bytes:
return struct.pack(">I", len(s)) + s
def _unpack_str(data: bytes, off: int) -> tuple[bytes, int]:
n = struct.unpack_from(">I", data, off)[0]
return data[off + 4 : off + 4 + n], off + 4 + n
def _agent_roundtrip(msg: bytes) -> bytes:
sock_path = os.environ.get("SSH_AUTH_SOCK")
if not sock_path:
raise RuntimeError("SSH_AUTH_SOCK not set — is gpg-agent / ssh-agent running?")
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
sock.settimeout(10)
sock.connect(sock_path)
sock.sendall(struct.pack(">I", len(msg)) + msg)
raw_len = b""
while len(raw_len) < 4:
chunk = sock.recv(4 - len(raw_len))
if not chunk:
raise RuntimeError("SSH agent closed connection")
raw_len += chunk
n = struct.unpack(">I", raw_len)[0]
resp = b""
while len(resp) < n:
chunk = sock.recv(n - len(resp))
if not chunk:
raise RuntimeError("SSH agent closed connection mid-response")
resp += chunk
return resp
# ── AgentKey ───────────────────────────────────────────────────────────────────
@dataclass
class AgentKey:
"""Ed25519 key backed by an SSH agent (YubiKey, TPM, ssh-agent, gpg-agent …)."""
blob: bytes
comment: str
@property
def pubkey_bytes(self) -> bytes:
_algo, off = _unpack_str(self.blob, 0)
key_bytes, _ = _unpack_str(self.blob, off)
return key_bytes
# ── Agent helpers ──────────────────────────────────────────────────────────────
def agent_list_keys() -> list[AgentKey]:
"""Return all Ed25519 keys currently held by the SSH agent."""
resp = _agent_roundtrip(bytes([_SSH_AGENTC_REQUEST_IDENTITIES]))
if resp[0] != _SSH_AGENT_IDENTITIES_ANSWER:
raise RuntimeError(f"Unexpected agent response: {resp[0]}")
n_keys = struct.unpack_from(">I", resp, 1)[0]
keys: list[AgentKey] = []
off = 5
for _ in range(n_keys):
blob, off = _unpack_str(resp, off)
comment, off = _unpack_str(resp, off)
algo, _ = _unpack_str(blob, 0)
if algo == b"ssh-ed25519":
keys.append(AgentKey(blob=blob, comment=comment.decode("utf-8", errors="replace")))
return keys
def agent_find_key(selector: str | None = None) -> AgentKey | None:
"""Return the first agent Ed25519 key whose comment contains selector (or any if None)."""
try:
keys = agent_list_keys()
except Exception:
return None
for key in keys:
if selector is None or selector in key.comment:
return key
return None
def agent_sign_raw(key: AgentKey, data: bytes) -> bytes:
"""Ask the SSH agent to sign data and return the raw 64-byte Ed25519 signature."""
msg = (
bytes([_SSH_AGENTC_SIGN_REQUEST])
+ _pack_str(key.blob)
+ _pack_str(data)
+ struct.pack(">I", 0)
)
resp = _agent_roundtrip(msg)
if resp[0] != _SSH_AGENT_SIGN_RESPONSE:
raise RuntimeError(f"SSH agent refused to sign (response code {resp[0]})")
sig_blob, _ = _unpack_str(resp, 1)
_algo, soff = _unpack_str(sig_blob, 0)
raw_sig, _ = _unpack_str(sig_blob, soff)
if len(raw_sig) != 64:
raise RuntimeError(f"Unexpected signature length {len(raw_sig)}")
return raw_sig
# ── File-based key helpers ─────────────────────────────────────────────────────
def generate_keypair() -> tuple[bytes, str]:
"""Return (private_key_pem_bytes, public_key_hex)."""
priv = Ed25519PrivateKey.generate()
pem = priv.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption())
pub_hex = priv.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw).hex()
return pem, pub_hex
def load_private_key(path: Path) -> Ed25519PrivateKey:
return load_pem_private_key(path.read_bytes(), password=None)
def public_key_hex(key: Ed25519PrivateKey | AgentKey) -> str:
if isinstance(key, AgentKey):
return key.pubkey_bytes.hex()
return key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw).hex()
# ── Canonical payload + sign/verify ───────────────────────────────────────────
def canonical_payload(msg: dict) -> bytes:
"""Deterministic JSON encoding of msg without auth fields."""
return json.dumps(
{k: v for k, v in msg.items() if k not in {"pubkey", "sig"}},
sort_keys=True,
separators=(",", ":"),
).encode("utf-8")
def sign(key: Ed25519PrivateKey | AgentKey, nonce: bytes, msg: dict) -> bytes:
"""Sign nonce + SHA256(canonical_payload(msg)) — works for both file keys and agent keys."""
data = nonce + hashlib.sha256(canonical_payload(msg)).digest()
if isinstance(key, AgentKey):
return agent_sign_raw(key, data)
return key.sign(data)
def verify(pub_hex: str, nonce: bytes, msg: dict, sig_hex: str) -> bool:
"""Return True if sig_hex is a valid Ed25519 signature over the canonical payload."""
try:
pub_bytes = bytes.fromhex(pub_hex)
pub_key = Ed25519PublicKey.from_public_bytes(pub_bytes)
message = nonce + hashlib.sha256(canonical_payload(msg)).digest()
pub_key.verify(bytes.fromhex(sig_hex), message)
return True
except (InvalidSignature, Exception):
return False
def new_nonce() -> str:
return secrets.token_hex(32)
def load_authorized_keys(path: Path) -> list[str]:
if not path.exists():
return []
return [
line.strip()
for line in path.read_text(encoding="utf-8").splitlines()
if line.strip() and not line.startswith("#")
]
def add_authorized_key(path: Path, pub_hex: str) -> bool:
"""Append pub_hex to authorized_keys. Returns False if already present."""
path.parent.mkdir(parents=True, exist_ok=True)
existing = set(load_authorized_keys(path))
if pub_hex in existing:
return False
with open(path, "a", encoding="utf-8") as f:
f.write(pub_hex + "\n")
return True