7cb2a8b618
Testing / remote-protocol-compat (0.9.5) (push) Successful in 1m4s
Testing / test (push) Successful in 1m22s
Testing / remote-protocol-compat (0.9.3) (push) Successful in 1m7s
Package Extension / package-extension (push) Successful in 1m1s
Build & Publish Package / publish (push) Successful in 1m5s
- Split auth into focused package modules for agent keys, file keys, signing, and post-quantum transport helpers while keeping the public browser_cli.auth import surface intact. - Move transport encoding internals into a package with separate codec and binary-hoisting helpers, preserving browser_cli.transport compatibility. - Extract remote TCP auth/socket helpers and serve challenge setup out of the runtime paths to make connection handling easier to reason about. - Move the extension markdown extractor into a dedicated content/markdown folder with separate root selection, code normalization, renderer, and utils. - Centralize CLI Rich rendering helpers for tab/window tree and table output, and add rendering tests for the shared builders. - Remove local typing ignores in SDK/decorator/script plumbing and bump the package and extension version to 0.15.3.
104 lines
3.5 KiB
Python
104 lines
3.5 KiB
Python
"""SSH-agent backed Ed25519 key helpers."""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import socket
|
|
import struct
|
|
from dataclasses import dataclass
|
|
|
|
from browser_cli.constants import (
|
|
SSH_AGENT_IDENTITIES_ANSWER,
|
|
SSH_AGENT_SIGN_RESPONSE,
|
|
SSH_AGENTC_REQUEST_IDENTITIES,
|
|
SSH_AGENTC_SIGN_REQUEST,
|
|
)
|
|
|
|
def pack_ssh_string(value: bytes) -> bytes:
|
|
return struct.pack(">I", len(value)) + value
|
|
|
|
def unpack_ssh_string(data: bytes, offset: int) -> tuple[bytes, int]:
|
|
length = struct.unpack_from(">I", data, offset)[0]
|
|
return data[offset + 4 : offset + 4 + length], offset + 4 + length
|
|
|
|
def agent_roundtrip(msg: bytes) -> bytes:
|
|
sock_path = os.environ.get("SSH_AUTH_SOCK")
|
|
if not sock_path:
|
|
raise RuntimeError("SSH_AUTH_SOCK not set — is gpg-agent / ssh-agent running?")
|
|
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
|
|
sock.settimeout(10)
|
|
sock.connect(sock_path)
|
|
sock.sendall(struct.pack(">I", len(msg)) + msg)
|
|
raw_len = b""
|
|
while len(raw_len) < 4:
|
|
chunk = sock.recv(4 - len(raw_len))
|
|
if not chunk:
|
|
raise RuntimeError("SSH agent closed connection")
|
|
raw_len += chunk
|
|
length = struct.unpack(">I", raw_len)[0]
|
|
response = b""
|
|
while len(response) < length:
|
|
chunk = sock.recv(length - len(response))
|
|
if not chunk:
|
|
raise RuntimeError("SSH agent closed connection mid-response")
|
|
response += chunk
|
|
return response
|
|
|
|
@dataclass
|
|
class AgentKey:
|
|
"""Ed25519 key backed by an SSH agent (YubiKey, TPM, ssh-agent, gpg-agent …)."""
|
|
blob: bytes
|
|
comment: str
|
|
|
|
@property
|
|
def pubkey_bytes(self) -> bytes:
|
|
_algo, offset = unpack_ssh_string(self.blob, 0)
|
|
key_bytes, _ = unpack_ssh_string(self.blob, offset)
|
|
return key_bytes
|
|
|
|
def agent_list_keys() -> list[AgentKey]:
|
|
"""Return all Ed25519 keys currently held by the SSH agent."""
|
|
response = agent_roundtrip(bytes([SSH_AGENTC_REQUEST_IDENTITIES]))
|
|
if response[0] != SSH_AGENT_IDENTITIES_ANSWER:
|
|
raise RuntimeError(f"Unexpected agent response: {response[0]}")
|
|
key_count = struct.unpack_from(">I", response, 1)[0]
|
|
keys: list[AgentKey] = []
|
|
offset = 5
|
|
for _ in range(key_count):
|
|
blob, offset = unpack_ssh_string(response, offset)
|
|
comment, offset = unpack_ssh_string(response, offset)
|
|
algo, _ = unpack_ssh_string(blob, 0)
|
|
if algo == b"ssh-ed25519":
|
|
keys.append(AgentKey(blob=blob, comment=comment.decode("utf-8", errors="replace")))
|
|
return keys
|
|
|
|
def agent_find_key(selector: str | None = None) -> AgentKey | None:
|
|
"""Return the first agent Ed25519 key whose comment contains selector (or any if None)."""
|
|
try:
|
|
keys = agent_list_keys()
|
|
except Exception:
|
|
return None
|
|
for key in keys:
|
|
if key.comment == "(none)":
|
|
continue
|
|
if selector is None or selector in key.comment:
|
|
return key
|
|
return None
|
|
|
|
def agent_sign_raw(key: AgentKey, data: bytes) -> bytes:
|
|
"""Ask the SSH agent to sign data and return the raw 64-byte Ed25519 signature."""
|
|
msg = (
|
|
bytes([SSH_AGENTC_SIGN_REQUEST])
|
|
+ pack_ssh_string(key.blob)
|
|
+ pack_ssh_string(data)
|
|
+ struct.pack(">I", 0)
|
|
)
|
|
response = agent_roundtrip(msg)
|
|
if response[0] != SSH_AGENT_SIGN_RESPONSE:
|
|
raise RuntimeError(f"SSH agent refused to sign (response code {response[0]})")
|
|
sig_blob, _ = unpack_ssh_string(response, 1)
|
|
_algo, sig_offset = unpack_ssh_string(sig_blob, 0)
|
|
raw_sig, _ = unpack_ssh_string(sig_blob, sig_offset)
|
|
if len(raw_sig) != 64:
|
|
raise RuntimeError(f"Unexpected signature length {len(raw_sig)}")
|
|
return raw_sig
|