Files
browser-cli/browser_cli/auth.py
T
daniel156161 a2aa031d71 feat: auth keys shows trusted keys with names; remote auth trust/keys
- authorized_keys format extended to '<hex> [optional-name]'
- auth keys repurposed: shows server's trusted keys (Name/Public Key table)
  instead of local client keys; --remote queries the remote serve instance
- auth trust gains --name flag for labelling keys; --remote pushes the key
  to the remote server's authorized_keys
- serve.py handles browser-cli.auth.keys and browser-cli.auth.trust as
  server-side commands (authenticated, never forwarded to native host)
- serve.py reloads authorized_keys from disk on every connection so
  auth trust --remote takes effect immediately without restarting serve
- auth show unchanged: still prints your own client public key

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-02 19:54:41 +02:00

215 lines
7.9 KiB
Python

"""Ed25519 keypair management and challenge-response auth helpers."""
import hashlib
import json
import os
import secrets
import socket
import struct
from dataclasses import dataclass
from pathlib import Path
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
from cryptography.hazmat.primitives.serialization import (
Encoding,
NoEncryption,
PrivateFormat,
PublicFormat,
load_pem_private_key,
)
_CONFIG_DIR = Path(os.environ.get("XDG_CONFIG_HOME", str(Path.home() / ".config"))) / "browser-cli"
DEFAULT_KEY_PATH = _CONFIG_DIR / "client.key.pem"
DEFAULT_AUTHORIZED_KEYS_PATH = _CONFIG_DIR / "authorized_keys"
# ── SSH agent protocol constants ───────────────────────────────────────────────
_SSH_AGENTC_REQUEST_IDENTITIES = 11
_SSH_AGENT_IDENTITIES_ANSWER = 12
_SSH_AGENTC_SIGN_REQUEST = 13
_SSH_AGENT_SIGN_RESPONSE = 14
def _pack_str(s: bytes) -> bytes:
return struct.pack(">I", len(s)) + s
def _unpack_str(data: bytes, off: int) -> tuple[bytes, int]:
n = struct.unpack_from(">I", data, off)[0]
return data[off + 4 : off + 4 + n], off + 4 + n
def _agent_roundtrip(msg: bytes) -> bytes:
sock_path = os.environ.get("SSH_AUTH_SOCK")
if not sock_path:
raise RuntimeError("SSH_AUTH_SOCK not set — is gpg-agent / ssh-agent running?")
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
sock.settimeout(10)
sock.connect(sock_path)
sock.sendall(struct.pack(">I", len(msg)) + msg)
raw_len = b""
while len(raw_len) < 4:
chunk = sock.recv(4 - len(raw_len))
if not chunk:
raise RuntimeError("SSH agent closed connection")
raw_len += chunk
n = struct.unpack(">I", raw_len)[0]
resp = b""
while len(resp) < n:
chunk = sock.recv(n - len(resp))
if not chunk:
raise RuntimeError("SSH agent closed connection mid-response")
resp += chunk
return resp
# ── AgentKey ───────────────────────────────────────────────────────────────────
@dataclass
class AgentKey:
"""Ed25519 key backed by an SSH agent (YubiKey, TPM, ssh-agent, gpg-agent …)."""
blob: bytes
comment: str
@property
def pubkey_bytes(self) -> bytes:
_algo, off = _unpack_str(self.blob, 0)
key_bytes, _ = _unpack_str(self.blob, off)
return key_bytes
# ── Agent helpers ──────────────────────────────────────────────────────────────
def agent_list_keys() -> list[AgentKey]:
"""Return all Ed25519 keys currently held by the SSH agent."""
resp = _agent_roundtrip(bytes([_SSH_AGENTC_REQUEST_IDENTITIES]))
if resp[0] != _SSH_AGENT_IDENTITIES_ANSWER:
raise RuntimeError(f"Unexpected agent response: {resp[0]}")
n_keys = struct.unpack_from(">I", resp, 1)[0]
keys: list[AgentKey] = []
off = 5
for _ in range(n_keys):
blob, off = _unpack_str(resp, off)
comment, off = _unpack_str(resp, off)
algo, _ = _unpack_str(blob, 0)
if algo == b"ssh-ed25519":
keys.append(AgentKey(blob=blob, comment=comment.decode("utf-8", errors="replace")))
return keys
def agent_find_key(selector: str | None = None) -> AgentKey | None:
"""Return the first agent Ed25519 key whose comment contains selector (or any if None)."""
try:
keys = agent_list_keys()
except Exception:
return None
for key in keys:
if selector is None or selector in key.comment:
return key
return None
def agent_sign_raw(key: AgentKey, data: bytes) -> bytes:
"""Ask the SSH agent to sign data and return the raw 64-byte Ed25519 signature."""
msg = (
bytes([_SSH_AGENTC_SIGN_REQUEST])
+ _pack_str(key.blob)
+ _pack_str(data)
+ struct.pack(">I", 0)
)
resp = _agent_roundtrip(msg)
if resp[0] != _SSH_AGENT_SIGN_RESPONSE:
raise RuntimeError(f"SSH agent refused to sign (response code {resp[0]})")
sig_blob, _ = _unpack_str(resp, 1)
_algo, soff = _unpack_str(sig_blob, 0)
raw_sig, _ = _unpack_str(sig_blob, soff)
if len(raw_sig) != 64:
raise RuntimeError(f"Unexpected signature length {len(raw_sig)}")
return raw_sig
# ── File-based key helpers ─────────────────────────────────────────────────────
def generate_keypair() -> tuple[bytes, str]:
"""Return (private_key_pem_bytes, public_key_hex)."""
priv = Ed25519PrivateKey.generate()
pem = priv.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption())
pub_hex = priv.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw).hex()
return pem, pub_hex
def load_private_key(path: Path) -> Ed25519PrivateKey:
return load_pem_private_key(path.read_bytes(), password=None)
def public_key_hex(key: Ed25519PrivateKey | AgentKey) -> str:
if isinstance(key, AgentKey):
return key.pubkey_bytes.hex()
return key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw).hex()
# ── Canonical payload + sign/verify ───────────────────────────────────────────
def canonical_payload(msg: dict) -> bytes:
"""Deterministic JSON encoding of msg without auth fields."""
return json.dumps(
{k: v for k, v in msg.items() if k not in {"pubkey", "sig"}},
sort_keys=True,
separators=(",", ":"),
).encode("utf-8")
def sign(key: Ed25519PrivateKey | AgentKey, nonce: bytes, msg: dict) -> bytes:
"""Sign nonce + SHA256(canonical_payload(msg)) — works for both file keys and agent keys."""
data = nonce + hashlib.sha256(canonical_payload(msg)).digest()
if isinstance(key, AgentKey):
return agent_sign_raw(key, data)
return key.sign(data)
def verify(pub_hex: str, nonce: bytes, msg: dict, sig_hex: str) -> bool:
"""Return True if sig_hex is a valid Ed25519 signature over the canonical payload."""
try:
pub_bytes = bytes.fromhex(pub_hex)
pub_key = Ed25519PublicKey.from_public_bytes(pub_bytes)
message = nonce + hashlib.sha256(canonical_payload(msg)).digest()
pub_key.verify(bytes.fromhex(sig_hex), message)
return True
except (InvalidSignature, Exception):
return False
def new_nonce() -> str:
return secrets.token_hex(32)
def load_authorized_keys_with_names(path: Path) -> list[tuple[str, str]]:
"""Return list of (pubkey_hex, name) pairs. Name is empty string if not set."""
if not path.exists():
return []
result = []
for line in path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
parts = line.split(None, 1)
pubkey = parts[0]
name = parts[1].strip() if len(parts) > 1 else ""
result.append((pubkey, name))
return result
def load_authorized_keys(path: Path) -> list[str]:
return [pk for pk, _ in load_authorized_keys_with_names(path)]
def add_authorized_key(path: Path, pub_hex: str, name: str = "") -> bool:
"""Append pub_hex to authorized_keys. Returns False if already present."""
path.parent.mkdir(parents=True, exist_ok=True)
existing = {pk for pk, _ in load_authorized_keys_with_names(path)}
if pub_hex in existing:
return False
line = (f"{pub_hex} {name}".rstrip()) + "\n"
with open(path, "a", encoding="utf-8") as f:
f.write(line)
return True