4b2abbbfc5
Security: - serve.py: server now sends nonce challenge before accepting any command; clients sign nonce + SHA256(canonical_payload) with Ed25519 key - New --authorized-keys FILE option for serve; token auth still works as fallback - Connection limit: BoundedSemaphore(64) in serve.py - Secure file creation with os.open(..., 0o600) for token/key files - New auth.py module: keygen, file key load/save, SSH agent protocol (pure Python), sign/verify helpers compatible with both file keys and agent-held keys (YubiKey, TPM, gpg-agent) Features: - YubiKey support via SSH agent protocol — no new runtime deps, just $SSH_AUTH_SOCK - New `browser-cli auth` command group: keygen, trust, show, keys - Global --key PATH flag (or BROWSER_CLI_KEY env) selects signing key; pass "agent" or "agent:<selector>" to use SSH agent key - BrowserCLI Python API gains key= parameter Bug fixes (11 issues across two review passes): - client.py: check response is not None before json.loads - native_host.py: _read_exact_stream loop handles EINTR short reads; fix Windows Listener leak on accept error - __init__.py: open_wait / tabs_watch_url raise RuntimeError instead of silent None - extension/tabs.ts: dedupe skips tabs without URL; tabsSort uses pendingUrl fallback - extension/session.ts: removeListener before addListener prevents duplicate handlers Breaking: TCP serve protocol now sends a challenge frame first (v0.9.0) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
204 lines
7.4 KiB
Python
204 lines
7.4 KiB
Python
"""Ed25519 keypair management and challenge-response auth helpers."""
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import secrets
|
|
import socket
|
|
import struct
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
from cryptography.exceptions import InvalidSignature
|
|
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
|
|
from cryptography.hazmat.primitives.serialization import (
|
|
Encoding,
|
|
NoEncryption,
|
|
PrivateFormat,
|
|
PublicFormat,
|
|
load_pem_private_key,
|
|
)
|
|
|
|
# Base directory for browser-cli configuration files.
# The XDG Base Directory spec says an unset *or empty* XDG_CONFIG_HOME means
# "use ~/.config"; using `or` covers the empty case (which would otherwise
# produce a relative "browser-cli" path) and avoids calling Path.home() when
# the variable is set.
_CONFIG_DIR = Path(os.environ.get("XDG_CONFIG_HOME") or Path.home() / ".config") / "browser-cli"
# Default path of the PEM-encoded client private key.
DEFAULT_KEY_PATH = _CONFIG_DIR / "client.key.pem"
# Default path of the authorized public-key list.
DEFAULT_AUTHORIZED_KEYS_PATH = _CONFIG_DIR / "authorized_keys"
|
|
|
|
# ── SSH agent protocol constants ───────────────────────────────────────────────
|
|
# Message-type bytes of the SSH agent protocol. Each request built in this
# module starts with one of the AGENTC values, and the first byte of each
# reply is compared against the matching AGENT value.
_SSH_AGENTC_REQUEST_IDENTITIES = 11  # client -> agent: list held keys
_SSH_AGENT_IDENTITIES_ANSWER = 12  # agent -> client: reply carrying the key list
_SSH_AGENTC_SIGN_REQUEST = 13  # client -> agent: sign data with a given key
_SSH_AGENT_SIGN_RESPONSE = 14  # agent -> client: reply carrying the signature
|
|
|
|
|
|
def _pack_str(s: bytes) -> bytes:
    """Encode *s* as a length-prefixed string: big-endian uint32 length, then the bytes."""
    length_prefix = struct.pack(">I", len(s))
    return length_prefix + s
|
|
|
|
|
|
def _unpack_str(data: bytes, off: int) -> tuple[bytes, int]:
    """Decode one length-prefixed string from *data* starting at offset *off*.

    Args:
        data: Buffer holding a big-endian uint32 length followed by that many bytes.
        off: Offset of the length prefix within *data*.

    Returns:
        ``(payload, next_offset)`` where *next_offset* points just past the payload.

    Raises:
        struct.error: Fewer than 4 bytes are available at *off* for the length prefix.
        ValueError: The declared length runs past the end of *data*.
    """
    (length,) = struct.unpack_from(">I", data, off)
    start = off + 4
    end = start + length
    # Slicing would silently hand back a short payload; a truncated field
    # must be reported instead of being parsed as if it were complete.
    if end > len(data):
        raise ValueError(
            f"Truncated string field: need {length} bytes at offset {start}, "
            f"only {max(len(data) - start, 0)} available"
        )
    return data[start:end], end
|
|
|
|
|
|
# Upper bound on a single agent reply. 256 KiB is the limit OpenSSH's own
# agent client applies, so a conforming agent never exceeds it.
_MAX_AGENT_REPLY_LEN = 256 * 1024


def _recv_exact(sock: socket.socket, count: int, eof_message: str) -> bytes:
    """Read exactly *count* bytes from *sock*; raise RuntimeError(eof_message) on early EOF."""
    buf = bytearray()
    while len(buf) < count:
        chunk = sock.recv(count - len(buf))
        if not chunk:
            raise RuntimeError(eof_message)
        buf += chunk
    return bytes(buf)


def _agent_roundtrip(msg: bytes) -> bytes:
    """Send one framed request to the SSH agent and return the reply body.

    The agent socket path is read from ``$SSH_AUTH_SOCK``. Both directions use
    a big-endian uint32 length followed by that many bytes.

    Args:
        msg: Request body (message-type byte plus payload), without the length prefix.

    Returns:
        The reply body without its length prefix; always at least one byte.

    Raises:
        RuntimeError: ``SSH_AUTH_SOCK`` is unset, the agent closed the
            connection early, or the reply is empty or larger than
            ``_MAX_AGENT_REPLY_LEN``.
        OSError: Connecting or socket I/O failed (including the 10 s timeout).
    """
    sock_path = os.environ.get("SSH_AUTH_SOCK")
    if not sock_path:
        raise RuntimeError("SSH_AUTH_SOCK not set — is gpg-agent / ssh-agent running?")
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.settimeout(10)
        sock.connect(sock_path)
        sock.sendall(struct.pack(">I", len(msg)) + msg)
        header = _recv_exact(sock, 4, "SSH agent closed connection")
        (reply_len,) = struct.unpack(">I", header)
        # Callers index the first byte of the reply, so an empty body is malformed.
        if reply_len == 0:
            raise RuntimeError("SSH agent returned an empty response")
        # Do not trust an arbitrary length from the peer: it would drive an
        # unbounded read.
        if reply_len > _MAX_AGENT_REPLY_LEN:
            raise RuntimeError(f"SSH agent response too large ({reply_len} bytes)")
        return _recv_exact(sock, reply_len, "SSH agent closed connection mid-response")
|
|
|
|
|
|
# ── AgentKey ───────────────────────────────────────────────────────────────────
|
|
|
|
@dataclass
class AgentKey:
    """Reference to an Ed25519 key held by an SSH agent (YubiKey, TPM, ssh-agent, gpg-agent, etc.).

    Only the public key blob and the agent-supplied comment are stored here.
    """

    # Public key blob as reported by the agent: a length-prefixed algorithm
    # name followed by a length-prefixed key field.
    blob: bytes
    # Comment the agent reported alongside the key.
    comment: str

    @property
    def pubkey_bytes(self) -> bytes:
        """Return the key field of ``blob``, i.e. its second length-prefixed string."""
        # Skip over the algorithm name; only the offset after it is needed.
        _, key_offset = _unpack_str(self.blob, 0)
        return _unpack_str(self.blob, key_offset)[0]
|
|
|
|
|
|
# ── Agent helpers ──────────────────────────────────────────────────────────────
|
|
|
|
def agent_list_keys() -> list[AgentKey]:
    """Return all Ed25519 keys currently held by the SSH agent.

    Sends an identities request and parses the reply: a uint32 key count
    followed by (key blob, comment) string pairs. Keys whose algorithm name is
    not ``ssh-ed25519`` are skipped.

    Raises:
        RuntimeError: The agent could not be queried, or its reply is empty or
            not an identities answer.
        struct.error: The reply is too short to parse.
    """
    resp = _agent_roundtrip(bytes([_SSH_AGENTC_REQUEST_IDENTITIES]))
    # An empty reply has no type byte; report it as a protocol error rather
    # than letting resp[0] raise IndexError.
    reply_type = resp[0] if resp else None
    if reply_type != _SSH_AGENT_IDENTITIES_ANSWER:
        raise RuntimeError(f"Unexpected agent response: {reply_type}")
    (n_keys,) = struct.unpack_from(">I", resp, 1)
    keys: list[AgentKey] = []
    off = 5  # 1 type byte + 4-byte key count
    for _ in range(n_keys):
        blob, off = _unpack_str(resp, off)
        comment, off = _unpack_str(resp, off)
        algo, _after_algo = _unpack_str(blob, 0)
        if algo == b"ssh-ed25519":
            keys.append(AgentKey(blob=blob, comment=comment.decode("utf-8", errors="replace")))
    return keys
|
|
|
|
|
|
def agent_find_key(selector: str | None = None) -> AgentKey | None:
    """Pick an Ed25519 key from the SSH agent.

    Args:
        selector: Substring that must occur in the key's comment; ``None``
            accepts any key.

    Returns:
        The first matching key in the order the agent lists them, or ``None``
        when nothing matches or the agent could not be queried.
    """
    try:
        candidates = agent_list_keys()
    except Exception:
        # Best-effort lookup: any failure to list keys is reported as "no key".
        return None
    matching = (
        candidate
        for candidate in candidates
        if selector is None or selector in candidate.comment
    )
    return next(matching, None)
|
|
|
|
|
|
def agent_sign_raw(key: AgentKey, data: bytes) -> bytes:
    """Ask the SSH agent to sign *data* and return the raw 64-byte Ed25519 signature.

    Args:
        key: Agent-held key; its blob tells the agent which key to use.
        data: Bytes to sign.

    Returns:
        The 64-byte signature taken from the agent's signature blob.

    Raises:
        RuntimeError: The agent could not be reached, its reply is empty or
            not a sign response, or the signature is not 64 bytes long.
        struct.error: The reply is too short to parse.
    """
    msg = (
        bytes([_SSH_AGENTC_SIGN_REQUEST])
        + _pack_str(key.blob)
        + _pack_str(data)
        + struct.pack(">I", 0)  # flags field: none set
    )
    resp = _agent_roundtrip(msg)
    # An empty reply has no type byte; report it as a refusal rather than
    # letting resp[0] raise IndexError.
    reply_type = resp[0] if resp else None
    if reply_type != _SSH_AGENT_SIGN_RESPONSE:
        raise RuntimeError(f"SSH agent refused to sign (response code {reply_type})")
    # The reply wraps a signature blob: an algorithm name string, then the signature string.
    sig_blob, _ = _unpack_str(resp, 1)
    _algo, sig_off = _unpack_str(sig_blob, 0)
    raw_sig, _ = _unpack_str(sig_blob, sig_off)
    if len(raw_sig) != 64:
        raise RuntimeError(f"Unexpected signature length {len(raw_sig)}")
    return raw_sig
|
|
|
|
|
|
# ── File-based key helpers ─────────────────────────────────────────────────────
|
|
|
|
def generate_keypair() -> tuple[bytes, str]:
    """Create a fresh Ed25519 keypair.

    Returns:
        ``(private_key_pem_bytes, public_key_hex)``: the private key as
        unencrypted PKCS8 PEM bytes, and the raw public key as a hex string.
    """
    private_key = Ed25519PrivateKey.generate()
    private_pem = private_key.private_bytes(
        encoding=Encoding.PEM,
        format=PrivateFormat.PKCS8,
        encryption_algorithm=NoEncryption(),
    )
    public_raw = private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
    return private_pem, public_raw.hex()
|
|
|
|
|
|
def load_private_key(path: Path) -> Ed25519PrivateKey:
    """Load an unencrypted PEM private key from *path* and require it to be Ed25519.

    Args:
        path: File containing the PEM-encoded private key.

    Returns:
        The loaded Ed25519 private key.

    Raises:
        OSError: The file cannot be read.
        ValueError: The PEM data cannot be parsed, or it holds a key of
            another algorithm.
    """
    key = load_pem_private_key(path.read_bytes(), password=None)
    # The PEM loader returns whatever key type the file holds; reject anything
    # else here so the failure does not surface later at signing time.
    if not isinstance(key, Ed25519PrivateKey):
        raise ValueError(
            f"{path} does not contain an Ed25519 private key (found {type(key).__name__})"
        )
    return key
|
|
|
|
|
|
def public_key_hex(key: Ed25519PrivateKey | AgentKey) -> str:
    """Return the raw public key of *key* as a hex string.

    Accepts either a file-backed private key or an agent-held key.
    """
    if not isinstance(key, AgentKey):
        raw_public = key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
    else:
        raw_public = key.pubkey_bytes
    return raw_public.hex()
|
|
|
|
|
|
# ── Canonical payload + sign/verify ───────────────────────────────────────────
|
|
|
|
def canonical_payload(msg: dict) -> bytes:
    """Serialize *msg* to deterministic JSON bytes, leaving out the auth fields.

    The ``pubkey`` and ``sig`` entries are dropped. The remaining keys are
    sorted and emitted with compact separators, then encoded as UTF-8, so the
    same message always produces the same bytes.
    """
    auth_fields = {"pubkey", "sig"}
    signed_fields = {}
    for field, value in msg.items():
        if field in auth_fields:
            continue
        signed_fields[field] = value
    encoded = json.dumps(signed_fields, sort_keys=True, separators=(",", ":"))
    return encoded.encode("utf-8")
|
|
|
|
|
|
def sign(key: Ed25519PrivateKey | AgentKey, nonce: bytes, msg: dict) -> bytes:
    """Sign ``nonce + SHA256(canonical_payload(msg))`` with *key*.

    Agent-held keys are signed through the SSH agent; file keys sign locally.
    The signed bytes are the same in both cases.
    """
    payload_digest = hashlib.sha256(canonical_payload(msg)).digest()
    to_sign = nonce + payload_digest
    if not isinstance(key, AgentKey):
        return key.sign(to_sign)
    return agent_sign_raw(key, to_sign)
|
|
|
|
|
|
def verify(pub_hex: str, nonce: bytes, msg: dict, sig_hex: str) -> bool:
    """Check an Ed25519 signature over ``nonce + SHA256(canonical_payload(msg))``.

    Args:
        pub_hex: Hex-encoded raw public key.
        nonce: Challenge bytes that were prepended before signing.
        msg: Message whose canonical payload was signed.
        sig_hex: Hex-encoded signature.

    Returns:
        ``True`` only when the signature verifies. Any failure, including
        malformed hex, an invalid key or an unserializable message, yields
        ``False``.
    """
    try:
        public_key = Ed25519PublicKey.from_public_bytes(bytes.fromhex(pub_hex))
        signed_data = nonce + hashlib.sha256(canonical_payload(msg)).digest()
        public_key.verify(bytes.fromhex(sig_hex), signed_data)
    except InvalidSignature:
        # The expected failure: a well-formed signature that does not match.
        return False
    except Exception:
        # Fail closed on anything else so bad input never raises out of here.
        return False
    return True
|
|
|
|
|
|
def new_nonce() -> str:
    """Return a fresh random nonce: 32 bytes from ``secrets``, as 64 hex characters."""
    random_bytes = secrets.token_bytes(32)
    return random_bytes.hex()
|
|
|
|
|
|
def load_authorized_keys(path: Path) -> list[str]:
    """Read the authorized-keys file at *path*.

    Each non-blank line that is not a comment is one entry; surrounding
    whitespace is removed. A line is a comment when its first non-blank
    character is ``#``.

    Args:
        path: Location of the authorized-keys file.

    Returns:
        The entries in file order, or an empty list when the file does not exist.

    Raises:
        OSError: The file exists but cannot be read.
        UnicodeDecodeError: The file is not valid UTF-8.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        # A missing file (or missing parent) simply means no keys are trusted yet.
        return []
    # Strip first, then test for "#", so indented comments are not returned as keys.
    entries = (line.strip() for line in text.splitlines())
    return [entry for entry in entries if entry and not entry.startswith("#")]
|
|
|
|
|
|
def add_authorized_key(path: Path, pub_hex: str) -> bool:
    """Append *pub_hex* to the authorized-keys file, creating it if needed.

    Args:
        path: Location of the authorized-keys file; missing parent directories
            are created.
        pub_hex: Public key to trust. Surrounding whitespace is ignored.

    Returns:
        ``True`` if the key was appended, ``False`` if it was already present.

    Raises:
        ValueError: *pub_hex* is empty, contains internal whitespace or line
            breaks, or starts with ``#``. Such a value would corrupt the
            one-key-per-line format or be read back as a comment.
        OSError: The file or its directory cannot be created or written.
    """
    key = pub_hex.strip()
    if len(key.split()) != 1 or key.startswith("#"):
        raise ValueError("public key must be a single non-empty token")

    path.parent.mkdir(parents=True, exist_ok=True)
    if key in load_authorized_keys(path):
        return False

    # If the file was edited by hand and lacks a final newline, a plain append
    # would fuse the new key onto the last existing line.
    try:
        needs_newline = path.read_bytes()[-1:] not in (b"", b"\n", b"\r")
    except FileNotFoundError:
        needs_newline = False

    with open(path, "a", encoding="utf-8") as f:
        if needs_newline:
            f.write("\n")
        f.write(key + "\n")
    return True
|