"""SSH-agent backed Ed25519 key helpers.""" from __future__ import annotations import os import socket import struct from dataclasses import dataclass from browser_cli.constants import ( SSH_AGENT_IDENTITIES_ANSWER, SSH_AGENT_SIGN_RESPONSE, SSH_AGENTC_REQUEST_IDENTITIES, SSH_AGENTC_SIGN_REQUEST, ) def pack_ssh_string(value: bytes) -> bytes: return struct.pack(">I", len(value)) + value def unpack_ssh_string(data: bytes, offset: int) -> tuple[bytes, int]: length = struct.unpack_from(">I", data, offset)[0] return data[offset + 4 : offset + 4 + length], offset + 4 + length def agent_roundtrip(msg: bytes) -> bytes: sock_path = os.environ.get("SSH_AUTH_SOCK") if not sock_path: raise RuntimeError("SSH_AUTH_SOCK not set — is gpg-agent / ssh-agent running?") with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: sock.settimeout(10) sock.connect(sock_path) sock.sendall(struct.pack(">I", len(msg)) + msg) raw_len = b"" while len(raw_len) < 4: chunk = sock.recv(4 - len(raw_len)) if not chunk: raise RuntimeError("SSH agent closed connection") raw_len += chunk length = struct.unpack(">I", raw_len)[0] response = b"" while len(response) < length: chunk = sock.recv(length - len(response)) if not chunk: raise RuntimeError("SSH agent closed connection mid-response") response += chunk return response @dataclass class AgentKey: """Ed25519 key backed by an SSH agent (YubiKey, TPM, ssh-agent, gpg-agent …).""" blob: bytes comment: str @property def pubkey_bytes(self) -> bytes: _algo, offset = unpack_ssh_string(self.blob, 0) key_bytes, _ = unpack_ssh_string(self.blob, offset) return key_bytes def agent_list_keys() -> list[AgentKey]: """Return all Ed25519 keys currently held by the SSH agent.""" response = agent_roundtrip(bytes([SSH_AGENTC_REQUEST_IDENTITIES])) if response[0] != SSH_AGENT_IDENTITIES_ANSWER: raise RuntimeError(f"Unexpected agent response: {response[0]}") key_count = struct.unpack_from(">I", response, 1)[0] keys: list[AgentKey] = [] offset = 5 for _ in range(key_count): blob, offset = unpack_ssh_string(response, offset) comment, offset = unpack_ssh_string(response, offset) algo, _ = unpack_ssh_string(blob, 0) if algo == b"ssh-ed25519": keys.append(AgentKey(blob=blob, comment=comment.decode("utf-8", errors="replace"))) return keys def agent_find_key(selector: str | None = None) -> AgentKey | None: """Return the first agent Ed25519 key whose comment contains selector (or any if None).""" try: keys = agent_list_keys() except Exception: return None for key in keys: if key.comment == "(none)": continue if selector is None or selector in key.comment: return key return None def agent_sign_raw(key: AgentKey, data: bytes) -> bytes: """Ask the SSH agent to sign data and return the raw 64-byte Ed25519 signature.""" msg = ( bytes([SSH_AGENTC_SIGN_REQUEST]) + pack_ssh_string(key.blob) + pack_ssh_string(data) + struct.pack(">I", 0) ) response = agent_roundtrip(msg) if response[0] != SSH_AGENT_SIGN_RESPONSE: raise RuntimeError(f"SSH agent refused to sign (response code {response[0]})") sig_blob, _ = unpack_ssh_string(response, 1) _algo, sig_offset = unpack_ssh_string(sig_blob, 0) raw_sig, _ = unpack_ssh_string(sig_blob, sig_offset) if len(raw_sig) != 64: raise RuntimeError(f"Unexpected signature length {len(raw_sig)}") return raw_sig