diff --git a/.gitignore b/.gitignore index 0c4efd6..2e16b8a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ # TypeScript / Node extension/background.js +extension/content-dispatch.js extension/test-dist/ node_modules/ dist/ diff --git a/README.md b/README.md index c6563b8..1ee5a52 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,7 @@ The `install` command will: After install, **fully restart your browser** (Quit and reopen — not just close the window). The extension will connect to the native host automatically on startup. -Only the `browser-cli` command needs to be on your `PATH`. The browser launches the native host wrapper directly from its absolute path in the native messaging manifest, and that wrapper points to the internally installed `native_host.py` copy. On Windows the install command also registers the host in the current user's Registry for the selected browser. +Only the `browser-cli` command needs to be on your `PATH`. The browser launches the native host wrapper directly from its absolute path in the native messaging manifest, and that wrapper imports the installed `browser_cli.native.host` entry point. On Windows the install command also registers the host in the current user's Registry for the selected browser. --- @@ -78,9 +78,19 @@ browser-cli/ ├── browser_cli/ │ ├── __init__.py # Python SDK — BrowserCLI class and SDK entry point │ ├── cli.py # Click CLI entry point -│ ├── client.py # Local IPC client used by CLI and SDK +│ ├── client/ # Client-side command routing used by CLI and SDK +│ │ ├── core.py # send_command and remote command routing +│ │ ├── targets.py # Browser target discovery and socket resolution +│ │ ├── auth.py # Remote auth fields and key lookup +│ │ └── messages.py # Request/response helpers │ ├── models.py # Tab and Group helper models -│ ├── native_host.py # Native messaging host launched by the browser +│ ├── native/ # Native messaging host internals +│ │ ├── host.py # Browser-launched native host entry point +│ │ ├── local_server.py # Local CLI IPC server +│ │ └── protocol.py # Chrome Native Messaging framing +│ ├── remote/ # Client-side remote browser support +│ │ ├── transport.py # TCP/TLS remote transport +│ │ └── registry.py # Saved remote endpoints/keys │ └── commands/ │ ├── navigate.py # nav open/reload/back/forward/focus │ ├── search.py # search engine shortcuts @@ -94,7 +104,8 @@ browser-cli/ │ ├── manifest.json # MV3 extension manifest │ ├── content.js # Content-script helpers │ └── src/ # TypeScript source split by command area -│ └── index.ts # Builds generated extension/background.js +│ ├── index.ts # Builds generated extension/background.js +│ └── content/ # Builds generated extension/content-dispatch.js ├── examples/ │ ├── demo.py # Python SDK walkthrough │ └── demo.sh # Bash CLI walkthrough @@ -285,12 +296,12 @@ browser-cli completion zsh --script # output raw completion script ## Python SDK ```python -from browser_cli import BrowserCLI +from browser_cli import AsyncBrowserCLI, BrowserCLI b = BrowserCLI() ``` -Commands are grouped into namespaces on the client (`b.tabs`, `b.dom`, `b.session`, ...). Each call blocks until the browser responds and returns the data directly as a Python object. +Commands are grouped into namespaces on the client (`b.tabs`, `b.dom`, `b.session`, ...). Each sync call blocks until the browser responds and returns the data directly as a Python object. For asyncio programs, `AsyncBrowserCLI` exposes the same namespaces as native awaitable methods over async Unix/TCP transport. ```python # Navigation ── b.nav @@ -385,6 +396,35 @@ b.perf.status() b.perf.set_profile("gentle") b.extension.reload() +# Workflow decorators ── b.decorators +@b.decorators.new_tab("https://example.com", wait=True, close=True) +def scrape(*, tab): + return b.extract.markdown("article") + +@b.decorators.wait_for_selector("#ready", visible=True) +def run_after_page_ready(): + return b.dom.text("#ready") + +@b.decorators.performance_profile("ultra") +def restore_big_session(): + return b.session.load("work", lazy=True) + +@b.decorators.retry(times=3, delay=1) +@b.decorators.save_session_before("before-risky-step") +def risky_workflow(): + b.tabs.close_duplicates() + +# Async SDK: same namespaces, native awaitable methods +async def async_example(): + ab = AsyncBrowserCLI() + tabs = await ab.tabs.list() + + @ab.decorators.new_tab("https://example.com", wait=True, close=True) + async def scrape(*, tab): + return await ab.extract.markdown("article") + + return tabs, await scrape() + # Misc clients = b.clients() raw = b.command("tabs.count", {"pattern": "github"}) # escape hatch for raw commands @@ -436,7 +476,7 @@ bash examples/demo.sh ```sh npm ci -npm run check:extension # type-check, build extension/background.js, syntax-check bundle +npm run check:extension # type-check, build extension bundles, syntax-check bundle uv run pytest -q ``` @@ -447,7 +487,7 @@ nix-shell # automatically runs npm ci when node_modules is missing/outdated npm run check:extension ``` -The extension source lives in `extension/src/`. `extension/background.js` is generated and ignored by git. Run `npm run build:extension` before using `Load unpacked` with `extension/`. On NixOS, use `nix-shell` first if npm is not installed globally. +The extension source lives in `extension/src/`. `extension/background.js` and `extension/content-dispatch.js` are generated and ignored by git. Run `npm run build:extension` before using `Load unpacked` with `extension/`. On NixOS, use `nix-shell` first if npm is not installed globally. --- diff --git a/browser_cli/__init__.py b/browser_cli/__init__.py index d79847c..d3798e4 100644 --- a/browser_cli/__init__.py +++ b/browser_cli/__init__.py @@ -2,126 +2,166 @@ browser_cli — Python SDK for controlling your running browser. Usage: - from browser_cli import BrowserCLI - b = BrowserCLI() + from browser_cli import BrowserCLI + b = BrowserCLI() - tabs = b.tabs.list() # list[Tab] - tabs[0].close() - tabs[0].move(forward=True) + tabs = b.tabs.list() # list[Tab] + tabs[0].close() + tabs[0].move(forward=True) - groups = b.groups.list() # list[Group] - groups[0].tabs() - groups[0].add_tab("https://example.com") + groups = b.groups.list() # list[Group] + groups[0].tabs() + groups[0].add_tab("https://example.com") - b.nav.open("https://example.com") - b.dom.click("#submit") - b.session.save("work") + b.nav.open("https://example.com") + b.dom.click("#submit") + b.session.save("work") - # When multiple browser instances are active, pass the alias: - b = BrowserCLI(browser="brave") + # When multiple browser instances are active, pass the alias: + b = BrowserCLI(browser="brave") Commands are grouped into namespaces on the client: - b.nav navigation (open, reload, back, forward, focus, search) - b.tabs tabs (list, open, close, move, status, mute, sort, ...) - b.groups tab groups (list, create, add_tab, move, close) - b.windows browser windows (list, open, close, rename) - b.dom page elements (query, click, type, wait_for, eval, ...) - b.extract content extraction (links, images, text, json, markdown) - b.page page info - b.storage localStorage / sessionStorage - b.cookies cookies (list, get, set) - b.session sessions (save, load, list, diff, ...) - b.perf performance profile + background jobs - b.extension control the extension itself + b.nav navigation (open, reload, back, forward, focus, search) + b.tabs tabs (list, open, close, move, status, mute, sort, ...) + b.groups tab groups (list, create, add_tab, move, close) + b.windows browser windows (list, open, close, rename) + b.dom page elements (query, click, type, wait_for, eval, ...) + b.extract content extraction (links, images, text, json, markdown) + b.page page info + b.storage localStorage / sessionStorage + b.cookies cookies (list, get, set) + b.session sessions (save, load, list, diff, ...) + b.perf performance profile + background jobs + b.extension control the extension itself + b.decorators workflow decorators for scripts """ -from browser_cli.client import BrowserNotConnected, active_browser_targets, remote_browser_targets, send_command +from collections.abc import Callable + +from browser_cli.client import active_browser_targets, remote_browser_targets, send_command, send_command_async +from browser_cli.errors import BrowserNotConnected from browser_cli.models import BrowserCounts, Group, Tab from browser_cli.sdk import ( - CookiesNS, - DomNS, - ExtensionNS, - ExtractNS, - GroupsNS, - NavigationNS, - PageNS, - PerfNS, - SessionNS, - StorageNS, - TabsNS, - WindowsNS, + CookiesNS, + DecoratorsNS, + DomNS, + ExtensionNS, + ExtractNS, + GroupsNS, + NAMESPACE_SPECS, + NavigationNS, + PageNS, + PerfNS, + SessionNS, + StorageNS, + TabsNS, + WindowsNS, ) from browser_cli.sdk.factories import FactoryMixin from browser_cli.sdk.routing import RoutingMixin -__all__ = ["BrowserCLI", "BrowserCounts", "BrowserNotConnected", "Tab", "Group"] +from browser_cli.async_sdk import AsyncBrowserCLI + +__all__ = ["BrowserCLI", "AsyncBrowserCLI", "BrowserCounts", "BrowserNotConnected", "Tab", "Group"] class BrowserCLI(FactoryMixin, RoutingMixin): - """Client for a running browser, with commands grouped into namespaces. + """Client for a running browser, with commands grouped into namespaces. - The client itself holds the connection target (browser/remote/key) and the - shared machinery; the actual commands live on namespace accessors such as - :attr:`tabs`, :attr:`dom`, and :attr:`session`. Object construction - (``Tab``/``Group``) comes from :class:`~browser_cli.sdk.factories.FactoryMixin` - and multi-browser fan-out from :class:`~browser_cli.sdk.routing.RoutingMixin`. + The client itself holds the connection target (browser/remote/key) and the + shared machinery; the actual commands live on namespace accessors such as + :attr:`tabs`, :attr:`dom`, and :attr:`session`. Object construction + (``Tab``/``Group``) comes from :class:`~browser_cli.sdk.factories.FactoryMixin` + and multi-browser fan-out from :class:`~browser_cli.sdk.routing.RoutingMixin`. + """ + + _browser: str | None + _remote: str | None + _key: str | None + _command_sender: Callable + nav: NavigationNS + tabs: TabsNS + groups: GroupsNS + windows: WindowsNS + dom: DomNS + extract: ExtractNS + page: PageNS + storage: StorageNS + cookies: CookiesNS + session: SessionNS + perf: PerfNS + extension: ExtensionNS + decorators: DecoratorsNS + + def __init__( + self, + browser: str | None = None, + remote: str | None = None, + key: str | None = None, + *, + _command_sender=None, + ): """ + Args: + browser: Profile alias to target. Required when multiple browser + instances are active. Equivalent to ``--browser`` on the CLI. + remote: Connect to a remote browser exposed via ``browser-cli serve``. + Format: ``"host:port"`` (e.g. ``"browser-host.example:8765"``). + Can be combined with ``browser`` to route to a specific + remote profile. + key: Path to Ed25519 private key PEM for pubkey auth, or ``"agent"`` + to use a key from the SSH agent (YubiKey, gpg-agent, etc.). + Defaults to ``~/.config/browser-cli/client.key.pem`` if that file exists. + """ + self._browser = browser + self._remote = remote + self._key = key if key else None + self._command_sender = _command_sender or send_command - def __init__(self, browser: str | None = None, remote: str | None = None, key: str | None = None): - """ - Args: - browser: Profile alias to target. Required when multiple browser - instances are active. Equivalent to ``--browser`` on the CLI. - remote: Connect to a remote browser exposed via ``browser-cli serve``. - Format: ``"host:port"`` (e.g. ``"192.168.1.10:8765"``). - Can be combined with ``browser`` to route to a specific - remote profile. - key: Path to Ed25519 private key PEM for pubkey auth, or ``"agent"`` - to use a key from the SSH agent (YubiKey, gpg-agent, etc.). - Defaults to ``~/.config/browser-cli/client.key.pem`` if that file exists. - """ - self._browser = browser - self._remote = remote - self._key = key if key else None + for name, namespace_type in NAMESPACE_SPECS: + setattr(self, name, namespace_type(self)) + self.decorators = DecoratorsNS(self) - # Command namespaces. - self.nav = NavigationNS(self) - self.tabs = TabsNS(self) - self.groups = GroupsNS(self) - self.windows = WindowsNS(self) - self.dom = DomNS(self) - self.extract = ExtractNS(self) - self.page = PageNS(self) - self.storage = StorageNS(self) - self.cookies = CookiesNS(self) - self.session = SessionNS(self) - self.perf = PerfNS(self) - self.extension = ExtensionNS(self) + @property + def browser(self) -> str | None: + """Target browser/profile alias, equivalent to ``--browser``.""" + return self._browser - @property - def browser(self) -> str | None: - """Target browser/profile alias, equivalent to ``--browser``.""" - return self._browser + @property + def remote(self) -> str | None: + """Remote endpoint used by this client, if any.""" + return self._remote - @property - def remote(self) -> str | None: - """Remote endpoint used by this client, if any.""" - return self._remote + @property + def key(self) -> str | None: + """Ed25519 key spec used for remote auth, if explicitly configured.""" + return self._key - @property - def key(self) -> str | None: - """Ed25519 key spec used for remote auth, if explicitly configured.""" - return self._key + def dispatch(self, command: str, args: dict | None = None): + """Dispatch one browser command using this client's target settings.""" + return self._command_sender(command, args, profile=self._browser, remote=self._remote, key=self._key) - def _cmd(self, command: str, args: dict | None = None): - return send_command(command, args, profile=self._browser, remote=self._remote, key=self._key) + def require_tab(self, data, error: str): + """Convert a tab-like command response into a bound Tab.""" + return self.require_tab_response(data, error) - def command(self, command: str, args: dict | None = None): - """Send a raw browser-cli command and return its response. + _FIELD_MISSING = object() - This is the SDK escape hatch for commands that do not have a dedicated - namespace method yet. - """ - return self._cmd(command, args or {}) + def field(self, result, key, default=None, *, fallback=_FIELD_MISSING): + """Read a named field from command output.""" + if fallback is self._FIELD_MISSING: + return self._field(result, key, default) + return self._field(result, key, default, fallback=fallback) - def clients(self) -> list[dict]: - """Return the active browser clients known to this connection.""" - return self._cmd("clients.list", {}) + def _cmd(self, command: str, args: dict | None = None): + return self.dispatch(command, args) + + def command(self, command: str, args: dict | None = None): + """Send a raw browser-cli command and return its response. + + This is the SDK escape hatch for commands that do not have a dedicated + namespace method yet. + """ + return self._cmd(command, args or {}) + + def clients(self) -> list[dict]: + """Return the active browser clients known to this connection.""" + return self._cmd("clients.list", {}) diff --git a/browser_cli/async_sdk.py b/browser_cli/async_sdk.py new file mode 100644 index 0000000..049b77b --- /dev/null +++ b/browser_cli/async_sdk.py @@ -0,0 +1,235 @@ +"""Async browser-cli Python SDK. + +The async SDK intentionally reuses the synchronous SDK namespaces instead of +copying every command method. Each async namespace is a thin adapter that runs +the corresponding sync SDK method in a worker thread, while a private command +sender dispatches commands through ``send_command_async``. That keeps command +strings, argument shapes, result mapping, and bound model creation in one place: +``browser_cli.sdk``. +""" +from __future__ import annotations + +import asyncio +import functools +from collections.abc import Callable +from typing import TypeVar + +from browser_cli.models import Group, Tab +from browser_cli.sdk import NAMESPACE_NAMES +from browser_cli.sdk.workflow_decorators import WorkflowDecoratorsMixin, _NO_INJECT + +F = TypeVar("F", bound=Callable) + +class AsyncNamespaceAdapter: + """Async wrapper around one synchronous SDK namespace.""" + + def __init__(self, sync_namespace): + self._sync = sync_namespace + + def __getattr__(self, name: str): + value = getattr(self._sync, name) + if not callable(value): + return value + + @functools.wraps(value) + async def wrapper(*args, **kwargs): + return await asyncio.to_thread(value, *args, **kwargs) + + return wrapper + +class AsyncDecoratorsNS(WorkflowDecoratorsMixin): + """Async workflow decorators for :class:`AsyncBrowserCLI`. + + The public decorator methods are inherited from ``WorkflowDecoratorsMixin``; + only the execution strategy differs: every wrapper is async and awaits both + browser calls and async user functions. + """ + + def __init__(self, client: "AsyncBrowserCLI"): + self._c = client + + @staticmethod + async def _maybe_await(value): + if hasattr(value, "__await__"): + return await value + return value + + def _value_decorator( + self, + func: F | None, + get_value: Callable, + *, + keyword: str | None | object = "tab", + cleanup: Callable | None = None, + ): + def decorator(fn: F) -> F: + @functools.wraps(fn) + async def wrapper(*args, **kwargs): + value = await get_value() + try: + extra_args = () + if keyword is not _NO_INJECT: + extra_args, kwargs = self._inject(kwargs, keyword, value) + return await self._maybe_await(fn(*extra_args, *args, **kwargs)) + finally: + if cleanup is not None: + await self._maybe_await(cleanup(value)) + return wrapper # type: ignore[return-value] + return decorator(func) if func is not None else decorator + + def new_tab( + self, + url: str, + *, + wait: bool = False, + timeout: float = 30.0, + background: bool = False, + window: str | None = None, + group: str | None = None, + close: bool = False, + keyword: str | None = "tab", + ): + def open_tab(): + return self._c.tabs.open( + url, + wait=wait, + timeout=timeout, + background=background, + window=window, + group=group, + ) + + async def close_tab(tab): + await self._c.tabs.close(tab.id) + + return self._value_decorator(None, open_tab, keyword=keyword, cleanup=close_tab if close else None) + + def performance_profile(self, profile: str, *, restore: bool = True): + def decorator(fn: F) -> F: + @functools.wraps(fn) + async def wrapper(*args, **kwargs): + previous = (await self._c.perf.status()).get("performanceProfile") if restore else None + await self._c.perf.set_profile(profile) + try: + return await self._maybe_await(fn(*args, **kwargs)) + finally: + if previous: + await self._c.perf.set_profile(previous) + return wrapper # type: ignore[return-value] + return decorator + + def retry( + self, + *, + times: int = 3, + delay: float = 0.0, + exceptions: tuple[type[BaseException], ...] = (Exception,), + ): + attempts = max(1, times) + + def decorator(fn: F) -> F: + @functools.wraps(fn) + async def wrapper(*args, **kwargs): + last_error = None + for attempt in range(attempts): + try: + return await self._maybe_await(fn(*args, **kwargs)) + except exceptions as exc: + last_error = exc + if attempt == attempts - 1: + raise + if delay > 0: + await asyncio.sleep(delay) + raise last_error # type: ignore[misc] + return wrapper # type: ignore[return-value] + return decorator + +class AsyncBrowserCLI: + """Async client for a running browser. + + Namespace methods are awaitable mirrors of :class:`browser_cli.BrowserCLI`. + """ + + _NAMESPACES = NAMESPACE_NAMES + + def __init__(self, browser: str | None = None, remote: str | None = None, key: str | None = None): + from browser_cli import BrowserCLI + + self._browser = browser + self._remote = remote + self._key = key if key else None + self._sync = BrowserCLI(browser=browser, remote=remote, key=key, _command_sender=self._blocking_async_cmd) + + for name in self._NAMESPACES: + setattr(self, name, AsyncNamespaceAdapter(getattr(self._sync, name))) + self.decorators = AsyncDecoratorsNS(self) + + @property + def browser(self) -> str | None: + return self._browser + + @property + def remote(self) -> str | None: + return self._remote + + @property + def key(self) -> str | None: + return self._key + + def _blocking_async_cmd( + self, + command: str, + args: dict | None = None, + *, + profile: str | None = None, + remote: str | None = None, + key: str | None = None, + ): + """Run the native async transport from a worker thread. + + Async namespace methods execute sync SDK logic in ``asyncio.to_thread``. + Inside that worker thread, the sync SDK's injected command sender lands + here and uses the async transport implementation without blocking the + caller's event loop. + """ + return asyncio.run(self._cmd(command, args, profile=profile, remote=remote, key=key)) + + async def _cmd( + self, + command: str, + args: dict | None = None, + *, + profile: str | None = None, + remote: str | None = None, + key: str | None = None, + ): + from browser_cli.client import send_command_async + return await send_command_async( + command, + args, + profile=self._browser if profile is None else profile, + remote=self._remote if remote is None else remote, + key=self._key if key is None else key, + ) + + async def command(self, command: str, args: dict | None = None): + return await self._cmd(command, args or {}) + + async def clients(self) -> list[dict]: + return await self._cmd("clients.list", {}) + + def tab_from(self, data: dict, *, browser_profile: str | None = None, browser_name: str | None = None, browser_remote: str | None = None) -> Tab: + return self._sync.tab_from( + data, + browser_profile=browser_profile, + browser_name=browser_name, + browser_remote=browser_remote, + ) + + def group_from(self, data: dict, *, browser_profile: str | None = None, browser_name: str | None = None, browser_remote: str | None = None) -> Group: + return self._sync.group_from( + data, + browser_profile=browser_profile, + browser_name=browser_name, + browser_remote=browser_remote, + ) diff --git a/browser_cli/auth.py b/browser_cli/auth.py index 8cc2812..494599b 100644 --- a/browser_cli/auth.py +++ b/browser_cli/auth.py @@ -14,277 +14,250 @@ from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives.serialization import ( - Encoding, - NoEncryption, - PrivateFormat, - PublicFormat, - load_pem_private_key, + Encoding, + NoEncryption, + PrivateFormat, + PublicFormat, + load_pem_private_key, ) -_CONFIG_DIR = Path(os.environ.get("XDG_CONFIG_HOME", str(Path.home() / ".config"))) / "browser-cli" -DEFAULT_KEY_PATH = _CONFIG_DIR / "client.key.pem" -DEFAULT_AUTHORIZED_KEYS_PATH = _CONFIG_DIR / "authorized_keys" - -# ── SSH agent protocol constants ─────────────────────────────────────────────── -_SSH_AGENTC_REQUEST_IDENTITIES = 11 -_SSH_AGENT_IDENTITIES_ANSWER = 12 -_SSH_AGENTC_SIGN_REQUEST = 13 -_SSH_AGENT_SIGN_RESPONSE = 14 - +from browser_cli.constants import ( + DEFAULT_AUTHORIZED_KEYS_PATH, + DEFAULT_KEY_PATH, + PQ_KEX_ALG, + PQ_TRANSPORT_ALG, + SSH_AGENT_IDENTITIES_ANSWER, + SSH_AGENT_SIGN_RESPONSE, + SSH_AGENTC_REQUEST_IDENTITIES, + SSH_AGENTC_SIGN_REQUEST, +) def _pack_str(s: bytes) -> bytes: - return struct.pack(">I", len(s)) + s - + return struct.pack(">I", len(s)) + s def _unpack_str(data: bytes, off: int) -> tuple[bytes, int]: - n = struct.unpack_from(">I", data, off)[0] - return data[off + 4 : off + 4 + n], off + 4 + n - + n = struct.unpack_from(">I", data, off)[0] + return data[off + 4 : off + 4 + n], off + 4 + n def _agent_roundtrip(msg: bytes) -> bytes: - sock_path = os.environ.get("SSH_AUTH_SOCK") - if not sock_path: - raise RuntimeError("SSH_AUTH_SOCK not set — is gpg-agent / ssh-agent running?") - with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: - sock.settimeout(10) - sock.connect(sock_path) - sock.sendall(struct.pack(">I", len(msg)) + msg) - raw_len = b"" - while len(raw_len) < 4: - chunk = sock.recv(4 - len(raw_len)) - if not chunk: - raise RuntimeError("SSH agent closed connection") - raw_len += chunk - n = struct.unpack(">I", raw_len)[0] - resp = b"" - while len(resp) < n: - chunk = sock.recv(n - len(resp)) - if not chunk: - raise RuntimeError("SSH agent closed connection mid-response") - resp += chunk - return resp - + sock_path = os.environ.get("SSH_AUTH_SOCK") + if not sock_path: + raise RuntimeError("SSH_AUTH_SOCK not set — is gpg-agent / ssh-agent running?") + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: + sock.settimeout(10) + sock.connect(sock_path) + sock.sendall(struct.pack(">I", len(msg)) + msg) + raw_len = b"" + while len(raw_len) < 4: + chunk = sock.recv(4 - len(raw_len)) + if not chunk: + raise RuntimeError("SSH agent closed connection") + raw_len += chunk + n = struct.unpack(">I", raw_len)[0] + resp = b"" + while len(resp) < n: + chunk = sock.recv(n - len(resp)) + if not chunk: + raise RuntimeError("SSH agent closed connection mid-response") + resp += chunk + return resp # ── AgentKey ─────────────────────────────────────────────────────────────────── @dataclass class AgentKey: - """Ed25519 key backed by an SSH agent (YubiKey, TPM, ssh-agent, gpg-agent …).""" - blob: bytes - comment: str - - @property - def pubkey_bytes(self) -> bytes: - _algo, off = _unpack_str(self.blob, 0) - key_bytes, _ = _unpack_str(self.blob, off) - return key_bytes + """Ed25519 key backed by an SSH agent (YubiKey, TPM, ssh-agent, gpg-agent …).""" + blob: bytes + comment: str + @property + def pubkey_bytes(self) -> bytes: + _algo, off = _unpack_str(self.blob, 0) + key_bytes, _ = _unpack_str(self.blob, off) + return key_bytes # ── Agent helpers ────────────────────────────────────────────────────────────── def agent_list_keys() -> list[AgentKey]: - """Return all Ed25519 keys currently held by the SSH agent.""" - resp = _agent_roundtrip(bytes([_SSH_AGENTC_REQUEST_IDENTITIES])) - if resp[0] != _SSH_AGENT_IDENTITIES_ANSWER: - raise RuntimeError(f"Unexpected agent response: {resp[0]}") - n_keys = struct.unpack_from(">I", resp, 1)[0] - keys: list[AgentKey] = [] - off = 5 - for _ in range(n_keys): - blob, off = _unpack_str(resp, off) - comment, off = _unpack_str(resp, off) - algo, _ = _unpack_str(blob, 0) - if algo == b"ssh-ed25519": - keys.append(AgentKey(blob=blob, comment=comment.decode("utf-8", errors="replace"))) - return keys - + """Return all Ed25519 keys currently held by the SSH agent.""" + resp = _agent_roundtrip(bytes([SSH_AGENTC_REQUEST_IDENTITIES])) + if resp[0] != SSH_AGENT_IDENTITIES_ANSWER: + raise RuntimeError(f"Unexpected agent response: {resp[0]}") + n_keys = struct.unpack_from(">I", resp, 1)[0] + keys: list[AgentKey] = [] + off = 5 + for _ in range(n_keys): + blob, off = _unpack_str(resp, off) + comment, off = _unpack_str(resp, off) + algo, _ = _unpack_str(blob, 0) + if algo == b"ssh-ed25519": + keys.append(AgentKey(blob=blob, comment=comment.decode("utf-8", errors="replace"))) + return keys def agent_find_key(selector: str | None = None) -> AgentKey | None: - """Return the first agent Ed25519 key whose comment contains selector (or any if None).""" - try: - keys = agent_list_keys() - except Exception: - return None - for key in keys: - if key.comment == "(none)": - continue - if selector is None or selector in key.comment: - return key + """Return the first agent Ed25519 key whose comment contains selector (or any if None).""" + try: + keys = agent_list_keys() + except Exception: return None - + for key in keys: + if key.comment == "(none)": + continue + if selector is None or selector in key.comment: + return key + return None def agent_sign_raw(key: AgentKey, data: bytes) -> bytes: - """Ask the SSH agent to sign data and return the raw 64-byte Ed25519 signature.""" - msg = ( - bytes([_SSH_AGENTC_SIGN_REQUEST]) - + _pack_str(key.blob) - + _pack_str(data) - + struct.pack(">I", 0) - ) - resp = _agent_roundtrip(msg) - if resp[0] != _SSH_AGENT_SIGN_RESPONSE: - raise RuntimeError(f"SSH agent refused to sign (response code {resp[0]})") - sig_blob, _ = _unpack_str(resp, 1) - _algo, soff = _unpack_str(sig_blob, 0) - raw_sig, _ = _unpack_str(sig_blob, soff) - if len(raw_sig) != 64: - raise RuntimeError(f"Unexpected signature length {len(raw_sig)}") - return raw_sig - + """Ask the SSH agent to sign data and return the raw 64-byte Ed25519 signature.""" + msg = ( + bytes([SSH_AGENTC_SIGN_REQUEST]) + + _pack_str(key.blob) + + _pack_str(data) + + struct.pack(">I", 0) + ) + resp = _agent_roundtrip(msg) + if resp[0] != SSH_AGENT_SIGN_RESPONSE: + raise RuntimeError(f"SSH agent refused to sign (response code {resp[0]})") + sig_blob, _ = _unpack_str(resp, 1) + _algo, soff = _unpack_str(sig_blob, 0) + raw_sig, _ = _unpack_str(sig_blob, soff) + if len(raw_sig) != 64: + raise RuntimeError(f"Unexpected signature length {len(raw_sig)}") + return raw_sig # ── File-based key helpers ───────────────────────────────────────────────────── def generate_keypair() -> tuple[bytes, str]: - """Return (private_key_pem_bytes, public_key_hex).""" - priv = Ed25519PrivateKey.generate() - pem = priv.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()) - pub_hex = priv.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw).hex() - return pem, pub_hex - + """Return (private_key_pem_bytes, public_key_hex).""" + priv = Ed25519PrivateKey.generate() + pem = priv.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()) + pub_hex = priv.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw).hex() + return pem, pub_hex def load_private_key(path: Path) -> Ed25519PrivateKey: - return load_pem_private_key(path.read_bytes(), password=None) - + return load_pem_private_key(path.read_bytes(), password=None) def public_key_hex(key: Ed25519PrivateKey | AgentKey) -> str: - if isinstance(key, AgentKey): - return key.pubkey_bytes.hex() - return key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw).hex() - + if isinstance(key, AgentKey): + return key.pubkey_bytes.hex() + return key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw).hex() # ── Canonical payload + sign/verify ─────────────────────────────────────────── def canonical_payload(msg: dict) -> bytes: - """Deterministic JSON encoding of msg without auth protocol fields.""" - return json.dumps( - {k: v for k, v in msg.items() if k not in {"pubkey", "sig", "pq_kex"}}, - sort_keys=True, - separators=(",", ":"), - ).encode("utf-8") - + """Deterministic JSON encoding of msg without auth protocol fields.""" + return json.dumps( + {k: v for k, v in msg.items() if k not in {"pubkey", "sig", "pq_kex"}}, + sort_keys=True, + separators=(",", ":"), + ).encode("utf-8") def _auth_message(nonce: bytes, msg: dict, pq_shared_secret: bytes | None = None) -> bytes: - """Bytes signed for auth; optionally binds a post-quantum KEX secret.""" - data = nonce + hashlib.sha256(canonical_payload(msg)).digest() - if pq_shared_secret is not None: - data += hashlib.sha256(b"browser-cli ml-kem-768 v1" + pq_shared_secret).digest() - return data - + """Bytes signed for auth; optionally binds a post-quantum KEX secret.""" + data = nonce + hashlib.sha256(canonical_payload(msg)).digest() + if pq_shared_secret is not None: + data += hashlib.sha256(b"browser-cli ml-kem-768 v1" + pq_shared_secret).digest() + return data def sign(key: Ed25519PrivateKey | AgentKey, nonce: bytes, msg: dict, pq_shared_secret: bytes | None = None) -> bytes: - """Sign nonce + payload hash, optionally bound to an ML-KEM shared secret.""" - data = _auth_message(nonce, msg, pq_shared_secret) - if isinstance(key, AgentKey): - return agent_sign_raw(key, data) - return key.sign(data) - + """Sign nonce + payload hash, optionally bound to an ML-KEM shared secret.""" + data = _auth_message(nonce, msg, pq_shared_secret) + if isinstance(key, AgentKey): + return agent_sign_raw(key, data) + return key.sign(data) def verify(pub_hex: str, nonce: bytes, msg: dict, sig_hex: str, pq_shared_secret: bytes | None = None) -> bool: - """Return True if sig_hex is a valid signature over the canonical payload/auth secret.""" - try: - pub_bytes = bytes.fromhex(pub_hex) - pub_key = Ed25519PublicKey.from_public_bytes(pub_bytes) - pub_key.verify(bytes.fromhex(sig_hex), _auth_message(nonce, msg, pq_shared_secret)) - return True - except (InvalidSignature, ValueError): - return False - + """Return True if sig_hex is a valid signature over the canonical payload/auth secret.""" + try: + pub_bytes = bytes.fromhex(pub_hex) + pub_key = Ed25519PublicKey.from_public_bytes(pub_bytes) + pub_key.verify(bytes.fromhex(sig_hex), _auth_message(nonce, msg, pq_shared_secret)) + return True + except (InvalidSignature, ValueError): + return False # ── Post-quantum key exchange (ML-KEM / Kyber) ──────────────────────────────── -PQ_KEX_ALG = "ML-KEM-768" -PQ_TRANSPORT_ALG = "ML-KEM-768+ChaCha20Poly1305" - - def pq_kex_server_keypair(): - """Return an ephemeral ML-KEM-768 private key and raw public key bytes. - - Returns ``None`` when the installed cryptography/OpenSSL backend does not - support ML-KEM yet. The serve/client protocol treats this as graceful - downgrade instead of breaking local installs on older OpenSSL builds. - """ - try: - from cryptography.hazmat.primitives.asymmetric import mlkem - priv = mlkem.MLKEM768PrivateKey.generate() - pub = priv.public_key().public_bytes_raw() - return priv, pub - except Exception: - return None + """Return an ephemeral ML-KEM-768 private key and raw public key bytes. + Returns ``None`` when the installed cryptography/OpenSSL backend does not + support ML-KEM yet. The serve/client protocol treats this as graceful + downgrade instead of breaking local installs on older OpenSSL builds. + """ + try: + from cryptography.hazmat.primitives.asymmetric import mlkem + priv = mlkem.MLKEM768PrivateKey.generate() + pub = priv.public_key().public_bytes_raw() + return priv, pub + except Exception: + return None def pq_kex_client_encapsulate(public_key_hex: str) -> tuple[str, bytes]: - """Encapsulate to a server ML-KEM public key. Returns (ciphertext_hex, secret).""" - from cryptography.hazmat.primitives.asymmetric import mlkem - pub = mlkem.MLKEM768PublicKey.from_public_bytes(bytes.fromhex(public_key_hex)) - shared_secret, ciphertext = pub.encapsulate() - return ciphertext.hex(), shared_secret - + """Encapsulate to a server ML-KEM public key. Returns (ciphertext_hex, secret).""" + from cryptography.hazmat.primitives.asymmetric import mlkem + pub = mlkem.MLKEM768PublicKey.from_public_bytes(bytes.fromhex(public_key_hex)) + shared_secret, ciphertext = pub.encapsulate() + return ciphertext.hex(), shared_secret def pq_kex_server_decapsulate(private_key, ciphertext_hex: str) -> bytes: - """Decapsulate a client ML-KEM ciphertext and return the shared secret.""" - return private_key.decapsulate(bytes.fromhex(ciphertext_hex)) - + """Decapsulate a client ML-KEM ciphertext and return the shared secret.""" + return private_key.decapsulate(bytes.fromhex(ciphertext_hex)) def _pq_transport_key(shared_secret: bytes, direction: str) -> bytes: - return HKDF( - algorithm=hashes.SHA256(), - length=32, - salt=None, - info=f"browser-cli pq transport v1 {direction}".encode("ascii"), - ).derive(shared_secret) - + return HKDF( + algorithm=hashes.SHA256(), + length=32, + salt=None, + info=f"browser-cli pq transport v1 {direction}".encode("ascii"), + ).derive(shared_secret) def pq_encrypt(shared_secret: bytes, direction: str, plaintext: bytes) -> dict: - """Encrypt an app-layer frame with a key derived from the ML-KEM secret.""" - nonce = secrets.token_bytes(12) - key = _pq_transport_key(shared_secret, direction) - ciphertext = ChaCha20Poly1305(key).encrypt(nonce, plaintext, None) - return {"alg": PQ_TRANSPORT_ALG, "nonce": nonce.hex(), "ciphertext": ciphertext.hex()} - + """Encrypt an app-layer frame with a key derived from the ML-KEM secret.""" + nonce = secrets.token_bytes(12) + key = _pq_transport_key(shared_secret, direction) + ciphertext = ChaCha20Poly1305(key).encrypt(nonce, plaintext, None) + return {"alg": PQ_TRANSPORT_ALG, "nonce": nonce.hex(), "ciphertext": ciphertext.hex()} def pq_decrypt(shared_secret: bytes, direction: str, envelope: dict) -> bytes: - """Decrypt an app-layer frame produced by pq_encrypt().""" - if not isinstance(envelope, dict) or envelope.get("alg") != PQ_TRANSPORT_ALG: - raise ValueError("unsupported encrypted transport envelope") - key = _pq_transport_key(shared_secret, direction) - return ChaCha20Poly1305(key).decrypt( - bytes.fromhex(str(envelope["nonce"])), - bytes.fromhex(str(envelope["ciphertext"])), - None, - ) - + """Decrypt an app-layer frame produced by pq_encrypt().""" + if not isinstance(envelope, dict) or envelope.get("alg") != PQ_TRANSPORT_ALG: + raise ValueError("unsupported encrypted transport envelope") + key = _pq_transport_key(shared_secret, direction) + return ChaCha20Poly1305(key).decrypt( + bytes.fromhex(str(envelope["nonce"])), + bytes.fromhex(str(envelope["ciphertext"])), + None, + ) def new_nonce() -> str: - return secrets.token_hex(32) - + return secrets.token_hex(32) def load_authorized_keys_with_names(path: Path) -> list[tuple[str, str]]: - """Return list of (pubkey_hex, name) pairs. Name is empty string if not set.""" - if not path.exists(): - return [] - result = [] - for line in path.read_text(encoding="utf-8").splitlines(): - line = line.strip() - if not line or line.startswith("#"): - continue - parts = line.split(None, 1) - pubkey = parts[0] - name = parts[1].strip() if len(parts) > 1 else "" - result.append((pubkey, name)) - return result - + """Return list of (pubkey_hex, name) pairs. Name is empty string if not set.""" + if not path.exists(): + return [] + result = [] + for line in path.read_text(encoding="utf-8").splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + parts = line.split(None, 1) + pubkey = parts[0] + name = parts[1].strip() if len(parts) > 1 else "" + result.append((pubkey, name)) + return result def load_authorized_keys(path: Path) -> list[str]: - return [pk for pk, _ in load_authorized_keys_with_names(path)] - + return [pk for pk, _ in load_authorized_keys_with_names(path)] def add_authorized_key(path: Path, pub_hex: str, name: str = "") -> bool: - """Append pub_hex to authorized_keys. Returns False if already present.""" - path.parent.mkdir(parents=True, exist_ok=True) - existing = {pk for pk, _ in load_authorized_keys_with_names(path)} - if pub_hex in existing: - return False - line = (f"{pub_hex} {name}".rstrip()) + "\n" - with open(path, "a", encoding="utf-8") as f: - f.write(line) - return True + """Append pub_hex to authorized_keys. Returns False if already present.""" + path.parent.mkdir(parents=True, exist_ok=True) + existing = {pk for pk, _ in load_authorized_keys_with_names(path)} + if pub_hex in existing: + return False + line = (f"{pub_hex} {name}".rstrip()) + "\n" + with open(path, "a", encoding="utf-8") as f: + f.write(line) + return True diff --git a/browser_cli/cli.py b/browser_cli/cli.py index 4848d49..734589a 100755 --- a/browser_cli/cli.py +++ b/browser_cli/cli.py @@ -3,9 +3,7 @@ browser-cli — Control your running browser from the terminal. """ import click -import sys import os -import json import shutil import re from importlib.metadata import PackageNotFoundError, version as package_version @@ -26,332 +24,90 @@ from browser_cli.commands.cookies import cookies_group from browser_cli.commands.perf import perf_group from browser_cli.commands.extension import extension_group from browser_cli.commands.serve import cmd_serve -from browser_cli.client import ( - send_command, - BrowserNotConnected, - REGISTRY_PATH, - active_browser_targets, - display_browser_name, - remote_target_for_alias, - remote_browser_targets, -) -from browser_cli.platform import install_base_dir, is_windows -from browser_cli.registry import load_registry +from browser_cli.commands.link_serve import cmd_link_serve +from browser_cli.commands.auth import auth_group +from browser_cli.commands.clients import clients_group +from browser_cli.commands.completion import cmd_completion +from browser_cli.commands.install import cmd_install console = Console() # Click's Group.shell_complete hardcodes no limit for get_short_help_str (defaults to 45 chars); # patch to use a wider limit so zsh completion descriptions aren't truncated. def _patched_group_shell_complete(self, ctx, incomplete): - from click.shell_completion import CompletionItem - results = [ - CompletionItem(name, help=command.get_short_help_str(limit=shutil.get_terminal_size().columns)) - for name, command in self.commands.items() - if not command.hidden and name.startswith(incomplete) - ] - results.extend(click.Command.shell_complete(self, ctx, incomplete)) - return results + from click.shell_completion import CompletionItem + results = [ + CompletionItem(name, help=command.get_short_help_str(limit=shutil.get_terminal_size().columns)) + for name, command in self.commands.items() + if not command.hidden and name.startswith(incomplete) + ] + results.extend(click.Command.shell_complete(self, ctx, incomplete)) + return results click.Group.shell_complete = _patched_group_shell_complete -NATIVE_HOST_NAME = "com.browsercli.host" -EXTENSION_ID = "bfpmkhngkjnfhabmfckgeohlilokodkg" - -NATIVE_HOST_DIRS = { - "chrome": { - "linux": [Path.home() / ".config/google-chrome/NativeMessagingHosts"], - "darwin": [Path.home() / "Library/Application Support/Google/Chrome/NativeMessagingHosts"], - }, - "chromium": { - "linux": [Path.home() / ".config/chromium/NativeMessagingHosts"], - "darwin": [Path.home() / "Library/Application Support/Chromium/NativeMessagingHosts"], - }, - "brave": { - "linux": [Path.home() / ".config/BraveSoftware/Brave-Browser/NativeMessagingHosts"], - "darwin": [Path.home() / "Library/Application Support/BraveSoftware/Brave-Browser/NativeMessagingHosts"], - }, - "edge": { - "linux": [Path.home() / ".config/microsoft-edge/NativeMessagingHosts"], - "darwin": [Path.home() / "Library/Application Support/Microsoft Edge/NativeMessagingHosts"], - }, - "vivaldi": { - "linux": [Path.home() / ".config/vivaldi/NativeMessagingHosts"], - "darwin": [Path.home() / "Library/Application Support/Vivaldi/NativeMessagingHosts"], - }, -} - -WINDOWS_NATIVE_HOST_REGISTRY_KEYS = { - "chrome": [r"Software\Google\Chrome\NativeMessagingHosts"], - "chromium": [r"Software\Chromium\NativeMessagingHosts"], - "brave": [r"Software\BraveSoftware\Brave-Browser\NativeMessagingHosts"], - "edge": [r"Software\Microsoft\Edge\NativeMessagingHosts"], - "vivaldi": [r"Software\Vivaldi\NativeMessagingHosts"], -} - -def _rename_target_profile(target_browser: str | None) -> str | None: - if target_browser: - return target_browser - - active = active_browser_targets() - if len(active) == 1: - return active[0].profile - return None - -def _ensure_unique_browser_alias(alias: str, target_browser: str | None) -> None: - target_profile = _rename_target_profile(target_browser) - - profiles: dict[str, str] = load_registry(REGISTRY_PATH) - - if alias in profiles and alias != target_profile: - raise click.ClickException(f"Browser alias '{alias}' already exists") - -def _native_host_exe() -> Path: - base = install_base_dir() - if is_windows(): - return base / "libexec" / "browser-cli-native-host.cmd" - return base / "libexec" / "browser-cli-native-host" - -def _write_native_host_exe(path: Path) -> None: - path.parent.mkdir(parents=True, exist_ok=True) - if is_windows(): - path.write_text( - f'@echo off\r\n"{sys.executable}" -c "from browser_cli.native_host import main; main()" %*\r\n', - encoding="utf-8", - ) - else: - path.write_text( - f'#!{sys.executable}\nfrom browser_cli.native_host import main\nmain()\n' - ) - path.chmod(path.stat().st_mode | 0o111) - -def _windows_registry_views(): - import winreg - - return [0, getattr(winreg, "KEY_WOW64_32KEY", 0), getattr(winreg, "KEY_WOW64_64KEY", 0)] - -def _register_windows_native_host(browser: str, manifest_path: Path) -> list[str]: - import winreg - - installed = [] - for key_path in WINDOWS_NATIVE_HOST_REGISTRY_KEYS[browser]: - full_key = f"{key_path}\\{NATIVE_HOST_NAME}" - for view in _windows_registry_views(): - try: - access = winreg.KEY_WRITE | view - key = winreg.CreateKeyEx(winreg.HKEY_CURRENT_USER, full_key, 0, access) - with key: - winreg.SetValueEx(key, "", 0, winreg.REG_SZ, str(manifest_path)) - installed.append(f"HKCU\\{full_key}") - except OSError as e: - console.print(f"[yellow]Could not write registry key {full_key}: {e}[/yellow]") - return installed - def _project_version() -> str: - pyproject_path = Path(__file__).resolve().parent.parent / "pyproject.toml" - try: - content = pyproject_path.read_text(encoding="utf-8") - match = re.search(r'^version\s*=\s*"([^"]+)"', content, re.MULTILINE) - if match: - return match.group(1) - except OSError: - pass + pyproject_path = Path(__file__).resolve().parent.parent / "pyproject.toml" + try: + content = pyproject_path.read_text(encoding="utf-8") + match = re.search(r'^version\s*=\s*"([^"]+)"', content, re.MULTILINE) + if match: + return match.group(1) + except OSError: + pass - try: - return package_version("browser-cli") - except PackageNotFoundError: - return "unknown" + try: + return package_version("browser-cli") + except PackageNotFoundError: + return "unknown" def _print_version(ctx, param, value): - if not value or ctx.resilient_parsing: - return - click.echo(_project_version()) - ctx.exit() + if not value or ctx.resilient_parsing: + return + click.echo(_project_version()) + ctx.exit() @click.group() @click.option( - "-V", "--version", - is_flag=True, - is_eager=True, - expose_value=False, - callback=_print_version, - help="Show the browser-cli version and exit.", + "-V", "--version", + is_flag=True, + is_eager=True, + expose_value=False, + callback=_print_version, + help="Show the browser-cli version and exit.", ) @click.option( - "--browser", default=None, metavar="ALIAS", - help="Browser profile alias to target (required when multiple browsers are active).", + "--browser", default=None, metavar="ALIAS", + help="Browser profile alias to target (required when multiple browsers are active).", ) @click.option( - "--remote", default=None, metavar="HOST[:PORT]", - help="Connect to a remote browser exposed via 'browser-cli serve'. Domains default to port 443.", + "--remote", default=None, metavar="HOST[:PORT]", + help="Connect to a remote browser exposed via 'browser-cli serve'. Domains default to port 443.", ) @click.option( - "--key", default=None, metavar="PATH", - help="Ed25519 private key PEM for pubkey auth with a remote serve instance.", + "--key", default=None, metavar="PATH", + help="Ed25519 private key PEM for pubkey auth with a remote serve instance.", ) @click.pass_context def main(ctx, browser, remote, key): - """Control your running browser from the terminal via a Chrome extension.""" - ctx.ensure_object(dict) - ctx.obj["browser"] = browser - ctx.obj["browser_explicit"] = browser is not None - if browser: - os.environ["BROWSER_CLI_PROFILE"] = browser - ctx.call_on_close(lambda: os.environ.pop("BROWSER_CLI_PROFILE", None)) - ctx.obj["remote"] = remote - ctx.obj["key"] = key - if remote: - os.environ["BROWSER_CLI_REMOTE"] = remote - ctx.call_on_close(lambda: os.environ.pop("BROWSER_CLI_REMOTE", None)) - if key: - os.environ["BROWSER_CLI_KEY"] = key - ctx.call_on_close(lambda: os.environ.pop("BROWSER_CLI_KEY", None)) - -# ── auth ────────────────────────────────────────────────────────────────────── - -@click.group("auth") -def auth_group(): - """Manage Ed25519 keys for public-key authentication with browser-cli serve.""" - -@auth_group.command("keygen") -@click.option("--output", "-o", default=None, metavar="PATH", help="Output path for the private key PEM.") -@click.option("--force", is_flag=True, help="Overwrite existing key.") -def cmd_auth_keygen(output, force): - """Generate an Ed25519 keypair for pubkey auth.""" - from browser_cli.auth import DEFAULT_KEY_PATH, generate_keypair - - key_path = Path(output) if output else DEFAULT_KEY_PATH - if key_path.exists() and not force: - console.print(f"[red]Key already exists:[/red] {key_path} (use --force to overwrite)") - sys.exit(1) - pem, pub_hex = generate_keypair() - key_path.parent.mkdir(parents=True, exist_ok=True) - fd = os.open(str(key_path), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) - with os.fdopen(fd, "wb") as f: - f.write(pem) - console.print(f"[green]✓[/green] Private key: {key_path}") - console.print(f"\nPublic key:\n [bold cyan]{pub_hex}[/bold cyan]") - console.print(f"\nOn the serve host, trust this key:") - console.print(f" [dim]browser-cli auth trust {pub_hex}[/dim]") - -@auth_group.command("trust") -@click.argument("pubkey") -@click.option("--name", default="", metavar="NAME", help="Human-friendly label for this key.") -@click.option("--file", "keys_file", default=None, metavar="PATH", help="Authorized keys file (default: ~/.config/browser-cli/authorized_keys).") -@click.pass_context -def cmd_auth_trust(ctx, pubkey, name, keys_file): - """Add a public key to the authorized keys file (locally or on a remote serve host).""" - from browser_cli.auth import DEFAULT_AUTHORIZED_KEYS_PATH, add_authorized_key - - if len(pubkey) != 64: - console.print("[red]Invalid public key:[/red] expected 64 hex characters (Ed25519 raw public key)") - sys.exit(1) - try: - bytes.fromhex(pubkey) - except ValueError: - console.print("[red]Invalid public key:[/red] not valid hex") - sys.exit(1) - - remote = (ctx.obj or {}).get("remote") - if remote: - from browser_cli.client import send_command - result = send_command( - "browser-cli.auth.trust", - args={"pubkey": pubkey, "name": name}, - remote=remote, - key=(ctx.obj or {}).get("key"), - ) - added = (result or {}).get("added", False) - label = f" ({name})" if name else "" - if added: - console.print(f"[green]✓[/green] Trusted on {remote}{label}: [cyan]{pubkey}[/cyan]") - else: - console.print(f"[yellow]Already trusted on {remote}:[/yellow] {pubkey}") - return - - path = Path(keys_file) if keys_file else DEFAULT_AUTHORIZED_KEYS_PATH - added = add_authorized_key(path, pubkey, name) - label = f" ({name})" if name else "" - if added: - console.print(f"[green]✓[/green] Trusted{label}: [cyan]{pubkey}[/cyan]") - console.print(f" File: {path}") - console.print(f"\nStart the server with:") - console.print(f" [dim]browser-cli serve --authorized-keys {path}[/dim]") - else: - console.print(f"[yellow]Already trusted:[/yellow] {pubkey}") - -@auth_group.command("show") -@click.option("--key", "key_src", default=None, metavar="PATH|agent[:]", - help="Key source: path to PEM file, 'agent', or 'agent:'.") -def cmd_auth_show(key_src): - """Print the Ed25519 public key that browser-cli will use for auth.""" - from browser_cli.auth import DEFAULT_KEY_PATH, agent_find_key, load_private_key, public_key_hex - - src = key_src or os.environ.get("BROWSER_CLI_KEY", str(DEFAULT_KEY_PATH)) - - if src == "agent" or src.startswith("agent:"): - selector = src[6:] or None - key = agent_find_key(selector) - if key is None: - console.print("[red]No Ed25519 key found in SSH agent.[/red]") - console.print(" Make sure gpg-agent / ssh-agent is running and the key is loaded.") - sys.exit(1) - console.print(f"[dim]source:[/dim] agent ({key.comment})") - console.print(public_key_hex(key)) - return - - path = Path(src) - if not path.exists(): - console.print(f"[red]No key found at {path}[/red]") - console.print(" Run: [dim]browser-cli auth keygen[/dim]") - console.print(" Or use: [dim]browser-cli auth show --key agent[/dim]") - sys.exit(1) - try: - priv = load_private_key(path) - console.print(f"[dim]source:[/dim] {path}") - console.print(public_key_hex(priv)) - except Exception as e: - console.print(f"[red]Failed to load key:[/red] {e}") - sys.exit(1) - -@auth_group.command("keys") -@click.option("--file", "keys_file", default=None, metavar="PATH", help="Authorized keys file (default: ~/.config/browser-cli/authorized_keys).") -@click.pass_context -def cmd_auth_keys(ctx, keys_file): - """List trusted public keys (server's authorized_keys). With --remote, queries the remote server.""" - from rich.table import Table - - remote = (ctx.obj or {}).get("remote") - if remote: - from browser_cli.client import send_command - result = send_command( - "browser-cli.auth.keys", - remote=remote, - key=(ctx.obj or {}).get("key"), - ) - entries = result or [] - source_label = remote - else: - from browser_cli.auth import DEFAULT_AUTHORIZED_KEYS_PATH, load_authorized_keys_with_names - path = Path(keys_file) if keys_file else DEFAULT_AUTHORIZED_KEYS_PATH - entries = [{"pubkey": pk, "name": name} for pk, name in load_authorized_keys_with_names(path)] - source_label = str(path) - - if not entries: - console.print(f"[yellow]No trusted keys[/yellow] in {source_label}") - console.print(" Add one: [dim]browser-cli auth trust --name