a2aa031d71
- authorized_keys format extended to '<hex> [optional-name]' - auth keys repurposed: shows server's trusted keys (Name/Public Key table) instead of local client keys; --remote queries the remote serve instance - auth trust gains --name flag for labelling keys; --remote pushes the key to the remote server's authorized_keys - serve.py handles browser-cli.auth.keys and browser-cli.auth.trust as server-side commands (authenticated, never forwarded to native host) - serve.py reloads authorized_keys from disk on every connection so auth trust --remote takes effect immediately without restarting serve - auth show unchanged: still prints your own client public key Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
215 lines
7.9 KiB
Python
215 lines
7.9 KiB
Python
"""Ed25519 keypair management and challenge-response auth helpers."""
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import secrets
|
|
import socket
|
|
import struct
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
from cryptography.exceptions import InvalidSignature
|
|
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
|
|
from cryptography.hazmat.primitives.serialization import (
|
|
Encoding,
|
|
NoEncryption,
|
|
PrivateFormat,
|
|
PublicFormat,
|
|
load_pem_private_key,
|
|
)
|
|
|
|
# Base directory for browser-cli configuration.  Per the XDG Base Directory
# specification an unset *or empty* XDG_CONFIG_HOME means "use ~/.config", so
# `or` is used instead of a .get() default: an empty value would otherwise
# produce a path relative to the current working directory.  This also avoids
# calling Path.home() when the variable is set.
_CONFIG_DIR = Path(os.environ.get("XDG_CONFIG_HOME") or Path.home() / ".config") / "browser-cli"
# Default location of the client's PEM private key file.
DEFAULT_KEY_PATH = _CONFIG_DIR / "client.key.pem"
# Default location of the authorized_keys file ('<hex> [optional-name]' per line).
DEFAULT_AUTHORIZED_KEYS_PATH = _CONFIG_DIR / "authorized_keys"
|
|
|
|
# ── SSH agent protocol constants ───────────────────────────────────────────────
|
|
# Message type bytes: the first byte of every message exchanged with the agent.
_SSH_AGENTC_REQUEST_IDENTITIES = 11  # request sent by agent_list_keys
_SSH_AGENT_IDENTITIES_ANSWER = 12  # reply type agent_list_keys expects
_SSH_AGENTC_SIGN_REQUEST = 13  # request sent by agent_sign_raw
_SSH_AGENT_SIGN_RESPONSE = 14  # reply type agent_sign_raw expects
|
|
|
|
|
|
def _pack_str(s: bytes) -> bytes:
    """Encode *s* as a 4-byte big-endian length followed by the bytes themselves."""
    length_prefix = struct.pack(">I", len(s))
    return b"".join((length_prefix, s))
|
|
|
|
|
|
def _unpack_str(data: bytes, off: int) -> tuple[bytes, int]:
    """Decode one length-prefixed string from *data* starting at offset *off*.

    Returns ``(payload, next_offset)`` where ``next_offset`` is computed from
    the declared length.  If *data* is shorter than the declared length the
    payload is whatever bytes are available (plain slice semantics).
    """
    (declared_len,) = struct.unpack_from(">I", data, off)
    start = off + 4
    end = start + declared_len
    return data[start:end], end
|
|
|
|
|
|
def _agent_roundtrip(msg: bytes) -> bytes:
    """Send one length-framed message to the SSH agent and return its reply body.

    The agent socket path is read from ``SSH_AUTH_SOCK``.  Both the request
    and the reply are framed with a 4-byte big-endian length; the returned
    bytes are the reply without that prefix.  Socket operations use a
    10-second timeout.

    Raises:
        RuntimeError: if ``SSH_AUTH_SOCK`` is not set, or the agent closes the
            connection before the full reply has been read.
    """
    agent_path = os.environ.get("SSH_AUTH_SOCK")
    if not agent_path:
        raise RuntimeError("SSH_AUTH_SOCK not set — is gpg-agent / ssh-agent running?")

    def read_exactly(conn: socket.socket, count: int, eof_message: str) -> bytes:
        # recv() may return fewer bytes than asked for, so loop until done.
        received = b""
        while len(received) < count:
            piece = conn.recv(count - len(received))
            if not piece:
                raise RuntimeError(eof_message)
            received += piece
        return received

    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as conn:
        conn.settimeout(10)
        conn.connect(agent_path)
        conn.sendall(struct.pack(">I", len(msg)) + msg)
        header = read_exactly(conn, 4, "SSH agent closed connection")
        (body_len,) = struct.unpack(">I", header)
        return read_exactly(conn, body_len, "SSH agent closed connection mid-response")
|
|
|
|
|
|
# ── AgentKey ───────────────────────────────────────────────────────────────────
|
|
|
|
@dataclass
class AgentKey:
    """Ed25519 key backed by an SSH agent (YubiKey, TPM, ssh-agent, gpg-agent …)."""

    # Key blob as reported by the agent: a length-prefixed algorithm name
    # followed by the length-prefixed public key bytes.
    blob: bytes
    # Comment the agent reported alongside the key.
    comment: str

    @property
    def pubkey_bytes(self) -> bytes:
        """Return the public key bytes stored in the blob's second field."""
        # The first field is the algorithm name; only its end offset is needed.
        _, key_offset = _unpack_str(self.blob, 0)
        raw_key, _ = _unpack_str(self.blob, key_offset)
        return raw_key
|
|
|
|
|
|
# ── Agent helpers ──────────────────────────────────────────────────────────────
|
|
|
|
def agent_list_keys() -> list[AgentKey]:
    """Return all Ed25519 keys currently held by the SSH agent.

    Keys whose algorithm name is not ``ssh-ed25519`` are skipped.

    Raises:
        RuntimeError: if the agent replies with an empty message, with a
            message type other than the identities answer, or with a body too
            short to parse.  Errors from ``_agent_roundtrip`` propagate.
    """
    resp = _agent_roundtrip(bytes([_SSH_AGENTC_REQUEST_IDENTITIES]))
    # A zero-length reply has no type byte; report it instead of letting
    # resp[0] fail with IndexError.
    if not resp:
        raise RuntimeError("Unexpected agent response: empty message")
    if resp[0] != _SSH_AGENT_IDENTITIES_ANSWER:
        raise RuntimeError(f"Unexpected agent response: {resp[0]}")

    keys: list[AgentKey] = []
    try:
        (n_keys,) = struct.unpack_from(">I", resp, 1)
        off = 5  # 1 type byte + 4-byte key count
        for _ in range(n_keys):
            blob, off = _unpack_str(resp, off)
            comment, off = _unpack_str(resp, off)
            algo, _ = _unpack_str(blob, 0)
            if algo == b"ssh-ed25519":
                keys.append(AgentKey(blob=blob, comment=comment.decode("utf-8", errors="replace")))
    except struct.error as exc:
        # Truncated reply: surface it with the same exception type as the
        # other protocol errors raised here.
        raise RuntimeError("Malformed identities answer from SSH agent") from exc
    return keys
|
|
|
|
|
|
def agent_find_key(selector: str | None = None) -> AgentKey | None:
    """Return the first agent Ed25519 key matching *selector*, or ``None``.

    With ``selector=None`` the first key listed is returned; otherwise the
    first key whose comment contains *selector* as a substring.  Any failure
    while listing keys is treated as "no key available" and yields ``None``.
    """
    try:
        candidates = agent_list_keys()
    except Exception:
        # Best-effort lookup: an unreachable or misbehaving agent means no key.
        return None
    if selector is None:
        return candidates[0] if candidates else None
    return next((k for k in candidates if selector in k.comment), None)
|
|
|
|
|
|
def agent_sign_raw(key: AgentKey, data: bytes) -> bytes:
    """Ask the SSH agent to sign data and return the raw 64-byte Ed25519 signature.

    Args:
        key: agent key whose blob identifies the signing key.
        data: bytes to sign.

    Raises:
        RuntimeError: if the agent replies with an empty message, with
            anything other than a sign response, with a body too short to
            parse, or with a signature that is not 64 bytes long.  Errors
            from ``_agent_roundtrip`` propagate.
    """
    msg = (
        bytes([_SSH_AGENTC_SIGN_REQUEST])
        + _pack_str(key.blob)
        + _pack_str(data)
        + struct.pack(">I", 0)  # flags field: none set
    )
    resp = _agent_roundtrip(msg)
    # A zero-length reply has no type byte; report it instead of letting
    # resp[0] fail with IndexError.
    if not resp:
        raise RuntimeError("SSH agent refused to sign (empty response)")
    if resp[0] != _SSH_AGENT_SIGN_RESPONSE:
        raise RuntimeError(f"SSH agent refused to sign (response code {resp[0]})")
    try:
        # The reply body is one string holding the signature blob, which is
        # itself an algorithm name followed by the raw signature.
        sig_blob, _ = _unpack_str(resp, 1)
        _algo, soff = _unpack_str(sig_blob, 0)
        raw_sig, _ = _unpack_str(sig_blob, soff)
    except struct.error as exc:
        raise RuntimeError("Malformed signature response from SSH agent") from exc
    if len(raw_sig) != 64:
        raise RuntimeError(f"Unexpected signature length {len(raw_sig)}")
    return raw_sig
|
|
|
|
|
|
# ── File-based key helpers ─────────────────────────────────────────────────────
|
|
|
|
def generate_keypair() -> tuple[bytes, str]:
    """Create a new Ed25519 keypair.

    Returns:
        ``(private_key_pem_bytes, public_key_hex)``: the private key as
        unencrypted PKCS8 PEM, and the raw public key as a hex string.
    """
    private_key = Ed25519PrivateKey.generate()
    private_pem = private_key.private_bytes(
        encoding=Encoding.PEM,
        format=PrivateFormat.PKCS8,
        encryption_algorithm=NoEncryption(),
    )
    public_raw = private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
    return private_pem, public_raw.hex()
|
|
|
|
|
|
def load_private_key(path: Path) -> Ed25519PrivateKey:
    """Load an unencrypted PEM private key from *path* and require it to be Ed25519.

    Raises:
        ValueError: if the PEM holds a key of another type (for example RSA).
            ``load_pem_private_key`` accepts any supported key type, so the
            type is checked here rather than failing later when the key is
            used.  Errors from reading or parsing the file propagate.
    """
    key = load_pem_private_key(path.read_bytes(), password=None)
    if not isinstance(key, Ed25519PrivateKey):
        raise ValueError(
            f"{path} does not contain an Ed25519 private key (found {type(key).__name__})"
        )
    return key
|
|
|
|
|
|
def public_key_hex(key: Ed25519PrivateKey | AgentKey) -> str:
    """Return the raw public key of *key* as a hex string.

    Agent-backed keys take the bytes from their blob; file keys derive the
    public key from the private key.
    """
    if not isinstance(key, AgentKey):
        raw = key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
    else:
        raw = key.pubkey_bytes
    return raw.hex()
|
|
|
|
|
|
# ── Canonical payload + sign/verify ───────────────────────────────────────────
|
|
|
|
def canonical_payload(msg: dict) -> bytes:
    """Deterministic JSON encoding of msg without auth fields.

    The top-level ``pubkey`` and ``sig`` entries are dropped; the rest is
    serialised with sorted keys and no whitespace, then UTF-8 encoded.
    *msg* itself is not modified.
    """
    payload = dict(msg)  # copy so the caller's dict is left untouched
    for auth_field in ("pubkey", "sig"):
        payload.pop(auth_field, None)
    text = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return text.encode("utf-8")
|
|
|
|
|
|
def sign(key: Ed25519PrivateKey | AgentKey, nonce: bytes, msg: dict) -> bytes:
    """Sign nonce + SHA256(canonical_payload(msg)) — works for both file keys and agent keys.

    Agent-backed keys are signed through the SSH agent; file keys sign
    directly.
    """
    digest = hashlib.sha256(canonical_payload(msg)).digest()
    to_sign = nonce + digest
    if not isinstance(key, AgentKey):
        return key.sign(to_sign)
    return agent_sign_raw(key, to_sign)
|
|
|
|
|
|
def verify(pub_hex: str, nonce: bytes, msg: dict, sig_hex: str) -> bool:
    """Return True if sig_hex is a valid Ed25519 signature over the canonical payload.

    The signed message is ``nonce + SHA256(canonical_payload(msg))``.  Never
    raises: any failure, including malformed hex or an unusable key, returns
    False.
    """
    try:
        verifier = Ed25519PublicKey.from_public_bytes(bytes.fromhex(pub_hex))
        digest = hashlib.sha256(canonical_payload(msg)).digest()
        verifier.verify(bytes.fromhex(sig_hex), nonce + digest)
    except InvalidSignature:
        # Well-formed input whose signature does not match.
        return False
    except Exception:
        # Malformed input of any kind counts as a failed verification.
        return False
    return True
|
|
|
|
|
|
def new_nonce() -> str:
    """Return a fresh nonce: 32 random bytes as 64 hex characters."""
    random_bytes = secrets.token_bytes(32)
    return random_bytes.hex()
|
|
|
|
|
|
def load_authorized_keys_with_names(path: Path) -> list[tuple[str, str]]:
    """Return list of (pubkey_hex, name) pairs. Name is empty string if not set.

    Each non-blank line that does not start with ``#`` is read as
    ``<pubkey> [optional name]``: the first whitespace-separated token is the
    key and the rest of the line, stripped, is the name.  A missing file
    yields an empty list.
    """
    # Read directly instead of checking exists() first, so a file removed
    # between the check and the read cannot raise.  NotADirectoryError is
    # included because exists() also reported False in that case.
    try:
        text = path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        return []

    entries: list[tuple[str, str]] = []
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        pubkey, *rest = line.split(None, 1)
        entries.append((pubkey, rest[0].strip() if rest else ""))
    return entries
|
|
|
|
|
|
def load_authorized_keys(path: Path) -> list[str]:
    """Return only the public key strings from the authorized_keys file at *path*."""
    entries = load_authorized_keys_with_names(path)
    return [pubkey for pubkey, _name in entries]
|
|
|
|
|
|
def add_authorized_key(path: Path, pub_hex: str, name: str = "") -> bool:
    """Append pub_hex to authorized_keys. Returns False if already present.

    Args:
        path: authorized_keys file; parent directories are created if missing.
        pub_hex: public key as a hex string; surrounding whitespace is ignored.
        name: optional label stored after the key on the same line.

    Returns:
        True if a new line was written, False if the key was already listed.

    Raises:
        ValueError: if *pub_hex* is empty or contains non-hex characters.
    """
    # The file is line-oriented and may be fed by remote callers, so neither
    # field may introduce extra lines or tokens.
    pub_hex = pub_hex.strip()
    if not pub_hex or any(ch not in "0123456789abcdefABCDEF" for ch in pub_hex):
        raise ValueError("public key must be a non-empty hex string")
    # str.splitlines() covers the same line-boundary characters the loader
    # splits on, so the label cannot smuggle in another entry.
    label = " ".join(name.splitlines()).strip()

    path.parent.mkdir(parents=True, exist_ok=True)
    existing = {pk for pk, _ in load_authorized_keys_with_names(path)}
    if pub_hex in existing:
        return False

    # If the file does not end with a newline (e.g. after hand editing), add
    # one first so the new entry is not glued onto the last line.
    try:
        current = path.read_bytes()
    except FileNotFoundError:
        current = b""
    separator = "\n" if current and not current.endswith(b"\n") else ""

    entry = f"{pub_hex} {label}".rstrip()
    with open(path, "a", encoding="utf-8") as f:
        f.write(f"{separator}{entry}\n")
    return True
|