feat!: harden raw browser control and packaging
Testing / remote-protocol-compat (0.9.3) (push) Successful in 40s
Testing / remote-protocol-compat (0.9.5) (push) Successful in 38s
Testing / test (push) Failing after 1m3s
Package Extension / package-extension (push) Successful in 29s
Build & Publish Package / publish (push) Successful in 33s

- Add safe-by-default policy gates for raw command surfaces: command, script, and serve-http /command.

- Require explicit opt-ins for page reads, browser control, and high-risk commands such as dom.eval, storage.*, and screenshots.

- Remove all cookies support from CLI, SDK, extension commands, permissions, constants, docs, and tests.

- Add diagnostic, events, watch, workspace, remote, raw command, script, HTTP gateway, tree-view, session import/export, and extension info/capability commands.

- Add Chrome Web Store packaging that strips manifest.key while keeping local packages with a stable native-messaging extension ID.

- Bump browser-cli and extension version to 0.14.1 and cover the new behavior with pytest and extension packaging tests.

BREAKING CHANGE: cookies commands and the b.cookies SDK namespace have been removed; generic raw command execution now blocks non-safe commands unless explicitly allowed.
This commit is contained in:
2026-06-14 14:33:15 +02:00
parent 3e3b8d529c
commit 5cec57e06d
43 changed files with 1184 additions and 375 deletions
-3
View File
@@ -29,7 +29,6 @@ Commands are grouped into namespaces on the client:
b.extract content extraction (links, images, text, json, markdown)
b.page page info
b.storage localStorage / sessionStorage
b.cookies cookies (list, get, set)
b.session sessions (save, load, list, diff, ...)
b.perf performance profile + background jobs
b.extension control the extension itself
@@ -41,7 +40,6 @@ from browser_cli.client import active_browser_targets, remote_browser_targets, s
from browser_cli.errors import BrowserNotConnected
from browser_cli.models import BrowserCounts, Group, Tab
from browser_cli.sdk import (
CookiesNS,
DecoratorsNS,
DomNS,
ExtensionNS,
@@ -85,7 +83,6 @@ class BrowserCLI(FactoryMixin, RoutingMixin):
extract: ExtractNS
page: PageNS
storage: StorageNS
cookies: CookiesNS
session: SessionNS
perf: PerfNS
extension: ExtensionNS
+16 -2
View File
@@ -20,7 +20,6 @@ from browser_cli.commands.session import session_group
from browser_cli.commands.search import search_group
from browser_cli.commands.page import page_group
from browser_cli.commands.storage import storage_group
from browser_cli.commands.cookies import cookies_group
from browser_cli.commands.perf import perf_group
from browser_cli.commands.extension import extension_group
from browser_cli.commands.serve import cmd_serve
@@ -29,6 +28,14 @@ from browser_cli.commands.auth import auth_group
from browser_cli.commands.clients import clients_group
from browser_cli.commands.completion import cmd_completion
from browser_cli.commands.install import cmd_install
from browser_cli.commands.doctor import cmd_doctor
from browser_cli.commands.events import cmd_events
from browser_cli.commands.remote import remote_group
from browser_cli.commands.script import cmd_script
from browser_cli.commands.serve_http import cmd_serve_http
from browser_cli.commands.watch import watch_group
from browser_cli.commands.workspace import workspace_group
from browser_cli.commands.raw import cmd_command
console = Console()
@@ -118,7 +125,6 @@ main.add_command(session_group)
main.add_command(search_group)
main.add_command(page_group)
main.add_command(storage_group)
main.add_command(cookies_group)
main.add_command(perf_group)
main.add_command(extension_group)
main.add_command(cmd_serve)
@@ -126,6 +132,14 @@ main.add_command(cmd_link_serve)
main.add_command(clients_group)
main.add_command(cmd_completion)
main.add_command(cmd_install)
main.add_command(cmd_doctor)
main.add_command(cmd_events)
main.add_command(remote_group)
main.add_command(cmd_script)
main.add_command(cmd_serve_http)
main.add_command(watch_group)
main.add_command(workspace_group)
main.add_command(cmd_command)
# ── native-host (hidden, called by Chrome via native messaging) ────────────────
+119
View File
@@ -0,0 +1,119 @@
"""Safety policy for generic command execution surfaces.
Dedicated first-party CLI/SDK methods keep their normal behavior. This module
only gates raw surfaces where a single string can trigger arbitrary browser
capabilities: ``browser-cli command``, ``browser-cli script``, and the HTTP
``/command`` endpoint.
"""
from __future__ import annotations
from dataclasses import dataclass
SAFE_COMMANDS = {
"clients.list",
"extension.capabilities",
"extension.info",
"group.list",
"group.query",
"group.tabs",
"page.info",
"perf.status",
"tabs.active_in_window",
"tabs.count",
"tabs.filter",
"tabs.list",
"tabs.query",
"tabs.status",
"windows.list",
}
READ_PAGE_COMMANDS = {
"dom.attr",
"dom.exists",
"dom.query",
"dom.text",
"extract.html",
"extract.images",
"extract.json",
"extract.links",
"extract.markdown",
"extract.text",
"tabs.html",
}
CONTROL_PREFIXES = (
"navigate.",
"nav.",
"group.",
"session.",
"tabs.",
"windows.",
)
CONTROL_COMMANDS = {
"dom.check",
"dom.clear",
"dom.click",
"dom.focus",
"dom.hover",
"dom.key",
"dom.poll",
"dom.scroll",
"dom.select",
"dom.submit",
"dom.type",
"dom.uncheck",
"dom.wait_for",
"extension.reload",
}
DANGEROUS_COMMANDS = {
"dom.eval",
"tabs.screenshot",
}
DANGEROUS_PREFIXES = (
"storage.",
)
@dataclass(frozen=True)
class CommandPolicy:
allow_read_page: bool = False
allow_control: bool = False
allow_dangerous: bool = False
@classmethod
def unrestricted(cls) -> "CommandPolicy":
return cls(allow_read_page=True, allow_control=True, allow_dangerous=True)
def _is_control(command: str) -> bool:
if command in CONTROL_COMMANDS:
return True
if any(command.startswith(prefix) for prefix in CONTROL_PREFIXES):
return command not in SAFE_COMMANDS and command not in READ_PAGE_COMMANDS and command not in DANGEROUS_COMMANDS
return False
def command_category(command: str) -> str:
name = str(command or "")
if name in DANGEROUS_COMMANDS or any(name.startswith(prefix) for prefix in DANGEROUS_PREFIXES):
return "dangerous"
if name in READ_PAGE_COMMANDS:
return "read-page"
if name in SAFE_COMMANDS:
return "safe"
if _is_control(name):
return "control"
return "unknown"
def assert_command_allowed(command: str, policy: CommandPolicy) -> None:
category = command_category(command)
if category == "safe":
return
if category == "read-page" and policy.allow_read_page:
return
if category == "control" and policy.allow_control:
return
if category == "dangerous" and policy.allow_dangerous:
return
raise PermissionError(
f"Raw command '{command}' is {category} and blocked by default; "
"use --allow-read-page, --allow-control, or --allow-dangerous explicitly"
)
+3
View File
@@ -71,6 +71,9 @@ def handle_errors(fn):
except BrowserNotConnected as e:
_console.print(f"[red]Error:[/red] {e}")
raise SystemExit(1)
except PermissionError as e:
_console.print(f"[red]Blocked:[/red] {e}")
raise SystemExit(1)
except RuntimeError as e:
_console.print(f"[red]Browser error:[/red] {e}")
raise SystemExit(1)
-72
View File
@@ -1,72 +0,0 @@
import click
from browser_cli.commands import client_from_ctx, handle_errors
from rich.console import Console
from rich.table import Table
console = Console()
@click.group("cookies")
def cookies_group():
"""Manage browser cookies."""
@cookies_group.command("list")
@click.option("--url", default=None, help="Filter by URL")
@click.option("--domain", default=None, help="Filter by domain")
@click.option("--name", default=None, help="Filter by cookie name")
@handle_errors
def cookies_list(url, domain, name):
"""List cookies, optionally filtered by URL, domain, or name."""
cookies = client_from_ctx().cookies.list(url=url, domain=domain, name=name)
if not cookies:
console.print("[yellow]No cookies found[/yellow]")
return
table = Table(show_header=True, header_style="bold cyan")
table.add_column("Name")
table.add_column("Value")
table.add_column("Domain")
table.add_column("Path")
table.add_column("Secure", width=7)
table.add_column("HttpOnly", width=9)
for c in cookies:
table.add_row(
c.get("name", ""),
(c.get("value") or "")[:60],
c.get("domain", ""),
c.get("path", ""),
"[green]✓[/green]" if c.get("secure") else "",
"[green]✓[/green]" if c.get("httpOnly") else "",
)
console.print(table)
@cookies_group.command("get")
@click.argument("url")
@click.argument("name")
@handle_errors
def cookies_get(url, name):
"""Get the value of a single cookie by URL and NAME."""
cookie = client_from_ctx().cookies.get(url, name)
if cookie is None:
console.print(f"[yellow]Cookie '{name}' not found for {url}[/yellow]")
raise SystemExit(1)
console.print(cookie.get("value", ""))
@cookies_group.command("set")
@click.argument("url")
@click.argument("name")
@click.argument("value")
@click.option("--domain", default=None)
@click.option("--path", default=None)
@click.option("--secure", is_flag=True)
@click.option("--http-only", "http_only", is_flag=True)
@click.option("--expires", "expiration_date", type=float, default=None, help="Unix timestamp")
@click.option("--same-site", type=click.Choice(["no_restriction", "lax", "strict"]), default=None)
@handle_errors
def cookies_set(url, name, value, domain, path, secure, http_only, expiration_date, same_site):
"""Set a cookie on URL."""
client_from_ctx().cookies.set(
url, name, value,
domain=domain, path=path,
secure=secure or None, http_only=http_only or None,
expiration_date=expiration_date, same_site=same_site,
)
console.print(f"[green]Set cookie:[/green] {name}={value!r} on {url}")
+90
View File
@@ -0,0 +1,90 @@
from __future__ import annotations
import re
import shutil
from importlib.metadata import PackageNotFoundError, version as package_version
from pathlib import Path
import click
from rich.console import Console
from rich.table import Table
from browser_cli.commands import handle_errors, client_from_ctx
from browser_cli.client import active_browser_targets
from browser_cli.constants import NATIVE_HOST_DIRS, NATIVE_HOST_NAME
from browser_cli.platform import is_windows
console = Console()
def _project_version() -> str:
pyproject_path = Path(__file__).resolve().parents[2] / "pyproject.toml"
try:
content = pyproject_path.read_text(encoding="utf-8")
match = re.search(r'^version\s*=\s*"([^"]+)"', content, re.MULTILINE)
if match:
return match.group(1)
except OSError:
pass
try:
return package_version("browser-cli")
except PackageNotFoundError:
return "unknown"
def _status(ok: bool) -> str:
return "[green]OK[/green]" if ok else "[red]FAIL[/red]"
@click.command("doctor")
@click.option("--remote", "check_remote", is_flag=True, help="Also probe the configured remote endpoint")
@handle_errors
def cmd_doctor(check_remote):
"""Diagnose browser-cli installation, extension, and connection health."""
rows: list[tuple[str, bool, str]] = []
version = _project_version()
rows.append(("Python package", version != "unknown", version))
rows.append(("browser-cli executable", shutil.which("browser-cli") is not None, shutil.which("browser-cli") or "not on PATH"))
manifest_notes = []
if not is_windows():
import sys
platform = "darwin" if sys.platform == "darwin" else "linux"
for browser, by_platform in NATIVE_HOST_DIRS.items():
for directory in by_platform.get(platform, []):
path = directory / f"{NATIVE_HOST_NAME}.json"
if path.exists():
manifest_notes.append(f"{browser}: {path}")
rows.append(("Native host manifest", bool(manifest_notes), "; ".join(manifest_notes) or "not found for common browsers"))
try:
targets = active_browser_targets(include_remotes=check_remote)
rows.append(("Browser registry", bool(targets), f"{len(targets)} active target(s)"))
except Exception as exc:
rows.append(("Browser registry", False, str(exc)))
client = client_from_ctx()
try:
clients = client.clients()
rows.append(("Connection", True, f"{len(clients)} client(s) responded"))
ext_versions = sorted({str(c.get("extensionVersion", "unknown")) for c in clients if isinstance(c, dict)})
if ext_versions:
rows.append(("Extension version", version in ext_versions, ", ".join(ext_versions)))
except Exception as exc:
rows.append(("Connection", False, str(exc)))
try:
info = client.extension.info()
caps = info.get("capabilities") or []
rows.append(("Extension info", True, f"v{info.get('version', 'unknown')} · {len(caps)} capabilities"))
except Exception as exc:
rows.append(("Extension info", False, f"not available ({exc})"))
table = Table(show_header=True, header_style="bold cyan")
table.add_column("Check")
table.add_column("Status")
table.add_column("Details")
for name, ok, detail in rows:
table.add_row(name, _status(ok), detail)
console.print(table)
failed = [name for name, ok, _ in rows if not ok and name in {"Connection", "Browser registry"}]
if failed:
raise SystemExit(1)
+52
View File
@@ -0,0 +1,52 @@
from __future__ import annotations
import json
import time
from dataclasses import asdict, is_dataclass
import click
from rich.console import Console
from browser_cli.commands import client_from_ctx, handle_errors
console = Console()
def _snapshot(client):
tabs = client.tabs.list()
return {str(t.id): asdict(t) if is_dataclass(t) else dict(t) for t in tabs}
def _emit(event, json_output: bool):
if json_output:
click.echo(json.dumps(event, default=str), flush=True)
else:
kind = event.get("type")
tab = event.get("tab") or {}
console.print(f"[cyan]{kind}[/cyan] {tab.get('id', '')} {tab.get('title') or ''} [dim]{tab.get('url') or ''}[/dim]")
@click.command("events")
@click.option("--interval", type=float, default=1.0, show_default=True, help="Polling interval in seconds")
@click.option("--once", is_flag=True, help="Emit initial snapshot and exit")
@click.option("--json", "json_output", is_flag=True, default=True, help="Emit JSON Lines (default)")
@click.option("--pretty", is_flag=True, help="Render human-readable events instead of JSON")
@handle_errors
def cmd_events(interval: float, once: bool, json_output: bool, pretty: bool):
"""Stream tab events as JSON Lines using a lightweight polling watcher."""
json_output = json_output and not pretty
client = client_from_ctx()
previous = _snapshot(client)
for tab in previous.values():
_emit({"type": "tabs.snapshot", "tab": tab}, json_output)
if once:
return
while True:
time.sleep(interval)
current = _snapshot(client)
for tab_id, tab in current.items():
if tab_id not in previous:
_emit({"type": "tabs.created", "tab": tab}, json_output)
elif tab != previous[tab_id]:
_emit({"type": "tabs.updated", "tab": tab, "previous": previous[tab_id]}, json_output)
for tab_id, tab in previous.items():
if tab_id not in current:
_emit({"type": "tabs.closed", "tab": tab}, json_output)
previous = current
+21
View File
@@ -8,6 +8,27 @@ console = Console()
def extension_group():
"""Manage the browser-cli browser extension."""
@extension_group.command("info")
@handle_errors
def extension_info():
"""Show extension version and advertised capabilities."""
info = client_from_ctx().extension.info()
for key in ("name", "version", "id", "platform"):
if key in info:
console.print(f"[bold]{key}:[/bold] {info[key]}")
caps = info.get("capabilities") or []
if caps:
console.print("[bold]capabilities:[/bold]")
for cap in caps:
console.print(f" - {cap}")
@extension_group.command("capabilities")
@handle_errors
def extension_capabilities():
"""Print extension feature capability strings."""
for cap in client_from_ctx().extension.capabilities():
console.print(cap)
@extension_group.command("reload")
@handle_errors
def extension_reload():
+3 -3
View File
@@ -133,19 +133,19 @@ _LOOPBACK_HOSTS = {"127.0.0.1", "::1", "localhost"}
@click.option("--token", default=None, metavar="SECRET",
help="Shared bearer token required from callers (sent as 'Authorization: Bearer ...').")
@click.option("--insecure", is_flag=True, default=False,
help="Run with NO token. Grants full browser control (cookies, pages) to anyone who can reach the port.")
help="Run with NO token. Grants full browser control to anyone who can reach the port.")
@click.pass_context
def cmd_link_serve(ctx, host, port, token, insecure):
"""Serve this browser to the ServiceLink mesh over HTTP /rpc.
Exposes the running browser (open/scrape pages, read cookies and storage), so
Exposes the running browser (open/scrape pages, read storage), so
a token is required by default. Bind to loopback and keep the port off the
public network.
"""
if not token and not insecure:
raise click.ClickException(
"Refusing to start without --token (this endpoint can control your browser "
"and read its cookies). Pass --insecure to override on a trusted host."
"and read page/storage data). Pass --insecure to override on a trusted host."
)
if host not in _LOOPBACK_HOSTS:
click.echo(
+10 -4
View File
@@ -13,10 +13,13 @@ def nav_group():
@click.option("--focus", is_flag=True, help="Bring the opened tab/window to the front")
@click.option("--window", "window_name", default=None, help="Open in named window")
@click.option("--group", "group_name", default=None, help="Open directly into a tab group (name or ID)")
@click.option("--reuse", is_flag=True, help="Reuse an existing tab with exactly this URL")
@click.option("--reuse-domain", is_flag=True, help="Reuse an existing tab with the same domain")
@click.option("--reuse-title", default=None, metavar="TEXT", help="Reuse an existing tab whose title contains TEXT")
@handle_errors
def cmd_open(url, focus, window_name, group_name):
def cmd_open(url, focus, window_name, group_name, reuse, reuse_domain, reuse_title):
"""Open URL in a new tab without stealing focus by default."""
client_from_ctx().nav.open(url, focus=focus, window=window_name, group=group_name)
client_from_ctx().nav.open(url, focus=focus, window=window_name, group=group_name, reuse=reuse, reuse_domain=reuse_domain, reuse_title=reuse_title)
suffix = ""
if group_name:
suffix = f" in group '{group_name}'"
@@ -73,10 +76,13 @@ def cmd_focus(pattern):
@click.option("--focus", is_flag=True, help="Bring the opened tab/window to the front")
@click.option("--window", "window_name", default=None, help="Open in named window")
@click.option("--group", "group_name", default=None, help="Open in tab group")
@click.option("--reuse", is_flag=True, help="Reuse an existing tab with exactly this URL")
@click.option("--reuse-domain", is_flag=True, help="Reuse an existing tab with the same domain")
@click.option("--reuse-title", default=None, metavar="TEXT", help="Reuse an existing tab whose title contains TEXT")
@handle_errors
def cmd_open_wait(url, timeout, focus, window_name, group_name):
def cmd_open_wait(url, timeout, focus, window_name, group_name, reuse, reuse_domain, reuse_title):
"""Open URL in a new tab and wait until fully loaded."""
tab = client_from_ctx().nav.open_wait(url, timeout=timeout, focus=focus, window=window_name, group=group_name)
tab = client_from_ctx().nav.open_wait(url, timeout=timeout, focus=focus, window=window_name, group=group_name, reuse=reuse, reuse_domain=reuse_domain, reuse_title=reuse_title)
console.print(f"[green]Loaded:[/green] {url}" + (f"{tab.title}" if tab.title else ""))
@nav_group.command("wait")
+23
View File
@@ -0,0 +1,23 @@
from __future__ import annotations
import json
import click
from browser_cli.command_security import CommandPolicy, assert_command_allowed
from browser_cli.commands import client_from_ctx, handle_errors
@click.command("command")
@click.argument("name")
@click.argument("args_json", required=False, default="{}")
@click.option("--allow-read-page", is_flag=True, help="Allow page-content read commands such as extract.* and dom.text")
@click.option("--allow-control", is_flag=True, help="Allow browser-control commands such as nav.*, tabs.close, dom.click")
@click.option("--allow-dangerous", is_flag=True, help="Allow high-risk commands such as dom.eval, storage.*, screenshots")
@handle_errors
def cmd_command(name, args_json, allow_read_page, allow_control, allow_dangerous):
"""Send a raw browser-cli wire command and print JSON."""
policy = CommandPolicy(allow_read_page=allow_read_page, allow_control=allow_control, allow_dangerous=allow_dangerous)
assert_command_allowed(name, policy)
args = json.loads(args_json) if args_json else {}
result = client_from_ctx().command(name, args)
click.echo(json.dumps(result, indent=2, default=str))
+68
View File
@@ -0,0 +1,68 @@
from __future__ import annotations
import json
import click
from rich.console import Console
from rich.table import Table
from browser_cli import BrowserCLI
from browser_cli.commands import handle_errors
from browser_cli.remote.registry import REMOTE_REGISTRY_PATH, load_remotes, save_remote_key
console = Console()
@click.group("remote")
def remote_group():
"""Manage remembered browser-cli remote endpoints."""
@remote_group.command("status")
@click.argument("endpoint")
@click.option("--key", default=None, help="Key spec/path to use for this probe")
@handle_errors
def remote_status(endpoint, key):
"""Probe a remote endpoint and show server/client status."""
client = BrowserCLI(remote=endpoint, key=key)
clients = client.clients()
table = Table(show_header=True, header_style="bold cyan")
table.add_column("Profile")
table.add_column("Browser")
table.add_column("Extension")
for item in clients:
table.add_row(str(item.get("profile", "")), str(item.get("name", "")), str(item.get("extensionVersion", "")))
console.print(table)
@remote_group.command("trust")
@click.argument("endpoint")
@click.argument("key_spec")
def remote_trust(endpoint, key_spec):
"""Remember which key spec to use for ENDPOINT."""
save_remote_key(endpoint, key_spec)
console.print(f"[green]Trusted remote {endpoint} with key {key_spec}[/green]")
@remote_group.command("keys")
def remote_keys():
"""List remembered remote key specs."""
remotes = load_remotes()
if not remotes:
console.print("[yellow]No remembered remotes[/yellow]")
return
table = Table(show_header=True, header_style="bold cyan")
table.add_column("Endpoint")
table.add_column("Key")
for endpoint, cfg in sorted(remotes.items()):
table.add_row(endpoint, str(cfg.get("key", "")))
console.print(table)
@remote_group.command("revoke")
@click.argument("endpoint")
def remote_revoke(endpoint):
"""Remove remembered key/config for ENDPOINT."""
remotes = load_remotes()
if endpoint not in remotes:
console.print(f"[yellow]Remote {endpoint} not remembered[/yellow]")
return
del remotes[endpoint]
REMOTE_REGISTRY_PATH.parent.mkdir(parents=True, exist_ok=True)
REMOTE_REGISTRY_PATH.write_text(json.dumps(remotes, indent=2, sort_keys=True) + "\n", encoding="utf-8")
console.print(f"[green]Revoked {endpoint}[/green]")
+68
View File
@@ -0,0 +1,68 @@
from __future__ import annotations
import json
from pathlib import Path
import click
from rich.console import Console
from browser_cli.command_security import CommandPolicy, assert_command_allowed
from browser_cli.commands import client_from_ctx, handle_errors
console = Console()
def _load_steps(path: Path):
text = path.read_text(encoding="utf-8")
if path.suffix.lower() in {".yaml", ".yml"}:
try:
import yaml # type: ignore
except Exception as exc:
raise click.ClickException("YAML scripts require PyYAML; use JSON or install PyYAML") from exc
return yaml.safe_load(text)
return json.loads(text)
def _parse_step(step):
if isinstance(step, str):
return step, {}
if isinstance(step, dict):
if "command" in step:
return step["command"], step.get("args") or {}
if len(step) == 1:
command, args = next(iter(step.items()))
return command, args or {}
raise click.ClickException(f"Invalid script step: {step!r}")
@click.command("script")
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option("--json", "json_output", is_flag=True, help="Print all step results as JSON")
@click.option("--continue-on-error", is_flag=True, help="Continue after failed steps")
@click.option("--allow-read-page", is_flag=True, help="Allow page-content read commands such as extract.* and dom.text")
@click.option("--allow-control", is_flag=True, help="Allow browser-control commands such as nav.*, tabs.close, dom.click")
@click.option("--allow-dangerous", is_flag=True, help="Allow high-risk commands such as dom.eval, storage.*, screenshots")
@handle_errors
def cmd_script(file: Path, json_output: bool, continue_on_error: bool, allow_read_page: bool, allow_control: bool, allow_dangerous: bool):
"""Run a JSON/YAML batch script of browser-cli wire commands."""
steps = _load_steps(file)
if not isinstance(steps, list):
raise click.ClickException("Script root must be a list")
client = client_from_ctx()
policy = CommandPolicy(allow_read_page=allow_read_page, allow_control=allow_control, allow_dangerous=allow_dangerous)
results = []
for index, step in enumerate(steps, start=1):
command, args = _parse_step(step)
try:
assert_command_allowed(command, policy)
result = client.command(command, args)
results.append({"index": index, "command": command, "ok": True, "result": result})
if not json_output:
console.print(f"[green]✓[/green] {index}: {command}")
except Exception as exc:
results.append({"index": index, "command": command, "ok": False, "error": str(exc)})
if not continue_on_error:
if json_output:
click.echo(json.dumps(results, indent=2, default=str))
raise
if not json_output:
console.print(f"[red]✗[/red] {index}: {command}: {exc}")
if json_output:
click.echo(json.dumps(results, indent=2, default=str))
+1 -1
View File
@@ -87,7 +87,7 @@ def cmd_serve(ctx, host, port, no_auth, auth_keys_file, no_compress, rpc, rpc_po
if rpc and not rpc_token and not rpc_insecure:
console.print(
"[red]Error:[/red] --rpc requires --rpc-token (this endpoint can control your "
"browser and read its cookies). Use --rpc-insecure to override on a trusted host."
"browser and read page/storage data). Use --rpc-insecure to override on a trusted host."
)
sys.exit(1)
+115
View File
@@ -0,0 +1,115 @@
from __future__ import annotations
import json
import secrets
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import urlparse
import click
from rich.console import Console
from browser_cli import BrowserCLI
from browser_cli.command_security import CommandPolicy, assert_command_allowed
console = Console()
def _is_loopback(host: str) -> bool:
return host in {"127.0.0.1", "localhost", "::1"}
class _Handler(BaseHTTPRequestHandler):
client: BrowserCLI
token: str | None = None
policy: CommandPolicy = CommandPolicy()
def _authorized(self) -> bool:
if self.token is None:
return True
if self.headers.get("Authorization", "") == f"Bearer {self.token}":
return True
return self.headers.get("X-Browser-CLI-Token") == self.token
def _require_auth(self) -> bool:
if self._authorized():
return True
self._send(401, {"error": "missing or invalid token"})
return False
def _send(self, status: int, payload):
raw = json.dumps(payload, default=str).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(raw)))
self.end_headers()
self.wfile.write(raw)
def do_GET(self):
path = urlparse(self.path).path
try:
if path != "/health" and not self._require_auth():
return
if path == "/tabs":
self._send(200, [t.__dict__ for t in self.client.tabs.list()])
elif path == "/clients":
self._send(200, self.client.clients())
elif path == "/health":
self._send(200, {"ok": True})
else:
self._send(404, {"error": "not found"})
except Exception as exc:
self._send(500, {"error": str(exc)})
def do_POST(self):
path = urlparse(self.path).path
try:
length = int(self.headers.get("Content-Length", "0"))
body = json.loads(self.rfile.read(length) or b"{}")
if path == "/command":
if not self._require_auth():
return
command = body.get("command")
assert_command_allowed(command, self.policy)
self._send(200, {"result": self.client.command(command, body.get("args") or {})})
else:
self._send(404, {"error": "not found"})
except PermissionError as exc:
self._send(403, {"error": str(exc)})
except Exception as exc:
self._send(500, {"error": str(exc)})
def log_message(self, fmt, *args):
console.print(f"[dim]http[/dim] {self.address_string()} {fmt % args}")
@click.command("serve-http")
@click.option("--host", default="127.0.0.1", show_default=True)
@click.option("--port", type=int, default=8766, show_default=True)
@click.option("--browser", default=None, help="Browser alias to target")
@click.option("--remote", default=None, help="Remote endpoint to target")
@click.option("--key", default=None, help="Remote auth key spec")
@click.option("--token", default=None, help="Bearer token required for HTTP access (generated by default)")
@click.option("--no-auth", is_flag=True, help="Disable HTTP auth (only allowed on loopback hosts)")
@click.option("--allow-read-page", is_flag=True, help="Allow /command to run page-content read commands")
@click.option("--allow-control", is_flag=True, help="Allow /command to run browser-control commands")
@click.option("--allow-dangerous", is_flag=True, help="Allow /command to run high-risk commands")
def cmd_serve_http(host, port, browser, remote, key, token, no_auth, allow_read_page, allow_control, allow_dangerous):
"""Expose a tiny local HTTP JSON gateway (/tabs, /clients, /command).
Auth is enabled by default. Pass the printed token as either
``Authorization: Bearer <token>`` or ``X-Browser-CLI-Token: <token>``.
"""
if no_auth and not _is_loopback(host):
raise click.ClickException("--no-auth is only allowed on loopback hosts")
auth_token = None if no_auth else (token or secrets.token_urlsafe(32))
policy = CommandPolicy(allow_read_page=allow_read_page, allow_control=allow_control, allow_dangerous=allow_dangerous)
handler = type(
"BrowserCLIHTTPHandler",
(_Handler,),
{"client": BrowserCLI(browser=browser, remote=remote, key=key), "token": auth_token, "policy": policy},
)
server = ThreadingHTTPServer((host, port), handler)
console.print(f"[green]HTTP gateway listening on http://{host}:{port}[/green]")
if auth_token:
console.print(f"[yellow]Token:[/yellow] {auth_token}")
try:
server.serve_forever()
except KeyboardInterrupt:
console.print("\n[yellow]Stopping HTTP gateway[/yellow]")
+29
View File
@@ -1,3 +1,5 @@
import json
from pathlib import Path
import click
from browser_cli.commands import client_from_ctx, gentle_mode_option, handle_errors
from rich.console import Console
@@ -44,6 +46,33 @@ def session_load(name, gentle_mode, discard_background_tabs, lazy, eager_tabs, b
count = result.get("tabs", 0) if isinstance(result, dict) else 0
console.print(f"[green]Session '{name}' loaded[/green] ({count} tabs opened)")
@session_group.command("export")
@click.argument("name", required=False)
@click.option("-o", "output", type=click.Path(dir_okay=False, path_type=Path), default=None, help="Write JSON to file instead of stdout")
@handle_errors
def session_export(name, output):
"""Export one saved session, or all sessions as JSON."""
data = client_from_ctx().session.export(name)
text = json.dumps(data, indent=2, sort_keys=True)
if output:
output.write_text(text + "\n", encoding="utf-8")
console.print(f"[green]Exported session data to {output}[/green]")
else:
click.echo(text)
@session_group.command("import")
@click.argument("name")
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option("--overwrite", is_flag=True, help="Replace an existing saved session")
@handle_errors
def session_import(name, file, overwrite):
"""Import a saved session JSON file."""
payload = json.loads(file.read_text(encoding="utf-8"))
session = payload.get("session", payload) if isinstance(payload, dict) else payload
result = client_from_ctx().session.import_(name, session, overwrite=overwrite)
count = result.get("tabs", 0) if isinstance(result, dict) else 0
console.print(f"[green]Imported session '{name}'[/green] ({count} tabs)")
@session_group.command("diff")
@click.argument("name_a")
@click.argument("name_b")
+29
View File
@@ -4,6 +4,7 @@ import click
from browser_cli.commands import client_from_ctx, gentle_mode_option, handle_errors, print_counts, tab_option
from rich.console import Console
from rich.table import Table
from rich.tree import Tree
console = Console()
@@ -46,6 +47,34 @@ def tabs_list():
tabs = client_from_ctx().tabs.list()
_print_tabs(tabs, show_browser=any(t.browser for t in tabs))
@tabs_group.command("tree")
@handle_errors
def tabs_tree():
"""Show tabs grouped as a window/group tree."""
tabs = sorted(client_from_ctx().tabs.list(), key=lambda t: ((t.browser or ""), t.window_id, t.group_id if t.group_id is not None else -1, t.index))
root = Tree("[bold]Tabs[/bold]")
browsers = {}
windows = {}
groups = {}
show_browser = any(t.browser for t in tabs)
for tab in tabs:
browser_key = tab.browser or "local"
browser_node = browsers.setdefault(browser_key, root.add(f"[bold cyan]{browser_key}[/bold cyan]") if show_browser else root)
win_key = (browser_key, tab.window_id)
win_node = windows.get(win_key)
if win_node is None:
win_node = browser_node.add(f"Window {tab.window_id}")
windows[win_key] = win_node
group_label = f"Group {tab.group_id}" if tab.group_id is not None else "Ungrouped"
group_key = (browser_key, tab.window_id, group_label)
group_node = groups.get(group_key)
if group_node is None:
group_node = win_node.add(group_label)
groups[group_key] = group_node
active = " [green]*[/green]" if tab.active else ""
group_node.add(f"[{tab.id}] {tab.title or '(untitled)'}{active} — [dim]{tab.url or ''}[/dim]")
console.print(root)
@tabs_group.command("close")
@click.argument("tab_id", type=int, required=False)
@click.option("--inactive", is_flag=True, help="Close all inactive tabs")
+60
View File
@@ -0,0 +1,60 @@
from __future__ import annotations
import json
import time
import click
from browser_cli.commands import client_from_ctx, handle_errors
@click.group("watch")
def watch_group():
"""Watch browser state and print changes."""
@watch_group.command("tabs")
@click.option("--interval", type=float, default=1.0, show_default=True)
@click.option("--once", is_flag=True)
@handle_errors
def watch_tabs(interval, once):
"""Watch the tab list as JSON snapshots."""
client = client_from_ctx()
previous = None
while True:
current = [t.__dict__ for t in client.tabs.list()]
if current != previous:
click.echo(json.dumps({"type": "tabs", "tabs": current}, default=str), flush=True)
previous = current
if once:
return
time.sleep(interval)
@watch_group.command("page")
@click.option("--field", default=None, help="Only print a single page.info field")
@click.option("--interval", type=float, default=1.0, show_default=True)
@handle_errors
def watch_page(field, interval):
"""Watch page.info for the active tab."""
client = client_from_ctx()
previous = object()
while True:
info = client.page.info()
current = info.get(field) if field else info
if current != previous:
click.echo(json.dumps({"type": "page", "field": field, "value": current}, default=str), flush=True)
previous = current
time.sleep(interval)
@watch_group.command("dom")
@click.argument("selector")
@click.option("--interval", type=float, default=1.0, show_default=True)
@handle_errors
def watch_dom(selector, interval):
"""Watch textContent for a selector."""
client = client_from_ctx()
previous = object()
while True:
current = client.dom.text(selector)
if current != previous:
click.echo(json.dumps({"type": "dom", "selector": selector, "text": current}, default=str), flush=True)
previous = current
time.sleep(interval)
+22
View File
@@ -2,6 +2,7 @@ import click
from browser_cli.commands import client_from_ctx, handle_errors
from rich.console import Console
from rich.table import Table
from rich.tree import Tree
console = Console()
@@ -38,6 +39,27 @@ def windows_list():
windows = client_from_ctx().windows.list()
_print_windows(windows, show_browser=any("browser" in w for w in windows))
@windows_group.command("tree")
@handle_errors
def windows_tree():
"""Show windows and their tabs as a tree."""
client = client_from_ctx()
windows = client.windows.list()
tabs = client.tabs.list()
root = Tree("[bold]Windows[/bold]")
for w in sorted(windows, key=lambda item: (item.get("browser", ""), item.get("id", 0))):
wid = w.get("id")
label = f"Window {wid}"
if w.get("alias"):
label += f" ({w['alias']})"
if w.get("browser"):
label = f"{w['browser']}: " + label
node = root.add(label)
for tab in sorted([t for t in tabs if t.window_id == wid and (not w.get("browser") or t.browser == w.get("browser"))], key=lambda t: t.index):
active = " [green]*[/green]" if tab.active else ""
node.add(f"[{tab.id}] {tab.title or '(untitled)'}{active} — [dim]{tab.url or ''}[/dim]")
console.print(root)
@windows_group.command("rename")
@click.argument("window_id", type=int)
@click.argument("name")
+91
View File
@@ -0,0 +1,91 @@
from __future__ import annotations
import json
from pathlib import Path
import click
from rich.console import Console
from rich.table import Table
from browser_cli.commands import client_from_ctx, handle_errors
from browser_cli.constants import CONFIG_DIR
console = Console()
WORKSPACES_PATH = CONFIG_DIR / "workspaces.json"
def _load() -> dict:
try:
return json.loads(WORKSPACES_PATH.read_text(encoding="utf-8"))
except FileNotFoundError:
return {}
def _save(data: dict) -> None:
WORKSPACES_PATH.parent.mkdir(parents=True, exist_ok=True)
WORKSPACES_PATH.write_text(json.dumps(data, indent=2, sort_keys=True) + "\n", encoding="utf-8")
@click.group("workspace")
def workspace_group():
"""Named browser workspaces built on top of sessions."""
@workspace_group.command("save")
@click.argument("name")
@click.option("--session", "session_name", default=None, help="Session name to save/use (default: workspace name)")
@click.option("--profile", default=None, help="Performance profile to remember")
@handle_errors
def workspace_save(name, session_name, profile):
session_name = session_name or name
result = client_from_ctx().session.save(session_name)
data = _load()
data[name] = {"session": session_name, "profile": profile}
_save(data)
console.print(f"[green]Workspace '{name}' saved[/green] ({result.get('tabs', 0) if isinstance(result, dict) else 0} tabs)")
@workspace_group.command("load")
@click.argument("name")
@click.option("--lazy", is_flag=True, help="Lazy-restore tabs")
@click.option("--eager-tabs", type=int, default=10, show_default=True)
@handle_errors
def workspace_load(name, lazy, eager_tabs):
data = _load()
ws = data.get(name)
if not ws:
raise click.ClickException(f"Workspace '{name}' not found")
client = client_from_ctx()
if ws.get("profile"):
client.perf.set_profile(ws["profile"])
result = client.session.load(ws["session"], lazy=lazy, eager_tabs=eager_tabs)
console.print(f"[green]Workspace '{name}' loaded[/green] ({result.get('tabs', 0) if isinstance(result, dict) else 0} tabs)")
@workspace_group.command("switch")
@click.argument("name")
@click.option("--lazy", is_flag=True)
@handle_errors
def workspace_switch(name, lazy):
"""Load a workspace. Alias for workspace load."""
ctx = click.get_current_context()
ctx.invoke(workspace_load, name=name, lazy=lazy, eager_tabs=10)
@workspace_group.command("list")
def workspace_list():
"""List configured workspaces."""
data = _load()
if not data:
console.print("[yellow]No workspaces[/yellow]")
return
table = Table(show_header=True, header_style="bold cyan")
table.add_column("Name")
table.add_column("Session")
table.add_column("Profile")
for name, ws in sorted(data.items()):
table.add_row(name, ws.get("session", ""), ws.get("profile") or "")
console.print(table)
@workspace_group.command("remove")
@click.argument("name")
def workspace_remove(name):
data = _load()
if name not in data:
raise click.ClickException(f"Workspace '{name}' not found")
del data[name]
_save(data)
console.print(f"[green]Workspace '{name}' removed[/green]")
-1
View File
@@ -39,7 +39,6 @@ PAGEABLE_COMMANDS = {
"extract.links",
"extract.images",
"extract.json",
"cookies.list",
"session.list",
}
+1 -3
View File
@@ -4,7 +4,7 @@ Each namespace groups related browser commands under a short accessor on the
client (``b.tabs``, ``b.dom``, ``b.session``, ...), mirroring the command groups
in the browser extension.
"""
from browser_cli.sdk.browser_data import CookiesNS, StorageNS
from browser_cli.sdk.browser_data import StorageNS
from browser_cli.sdk.decorators import DecoratorsNS
from browser_cli.sdk.dom import DomNS, ExtractNS, PageNS
from browser_cli.sdk.extension import ExtensionNS
@@ -24,7 +24,6 @@ NAMESPACE_SPECS = (
("extract", ExtractNS),
("page", PageNS),
("storage", StorageNS),
("cookies", CookiesNS),
("session", SessionNS),
("perf", PerfNS),
("extension", ExtensionNS),
@@ -40,7 +39,6 @@ __all__ = [
"ExtractNS",
"PageNS",
"StorageNS",
"CookiesNS",
"SessionNS",
"PerfNS",
"ExtensionNS",
+1 -49
View File
@@ -1,4 +1,4 @@
"""Storage and cookies namespaces: ``b.storage.*``, ``b.cookies.*``."""
"""Storage namespace: ``b.storage.*``."""
from __future__ import annotations
from browser_cli.sdk.base import Namespace, sdk_command
@@ -35,51 +35,3 @@ class StorageNS(Namespace):
tab_id: int | None = None,
) -> None:
"""Set a localStorage/sessionStorage entry."""
class CookiesNS(Namespace):
"""List, get, and set cookies."""
@sdk_command("cookies.list", lambda self, *, url=None, domain=None, name=None: {
"url": url,
"domain": domain,
"name": name,
}, default=[])
def list(
self,
*,
url: str | None = None,
domain: str | None = None,
name: str | None = None,
) -> list[dict]:
"""List cookies, optionally filtered by url, domain, or name."""
@sdk_command("cookies.get", lambda self, url, name: {"url": url, "name": name})
def get(self, url: str, name: str) -> dict | None:
"""Get a single cookie by url and name."""
@sdk_command("cookies.set", lambda self, url, name, value, *, domain=None, path=None, secure=None,
http_only=None, expiration_date=None, same_site=None: {
"url": url,
"name": name,
"value": value,
"domain": domain,
"path": path,
"secure": secure,
"httpOnly": http_only,
"expirationDate": expiration_date,
"sameSite": same_site,
})
def set(
self,
url: str,
name: str,
value: str,
*,
domain: str | None = None,
path: str | None = None,
secure: bool | None = None,
http_only: bool | None = None,
expiration_date: float | None = None,
same_site: str | None = None,
) -> dict:
"""Set a cookie. Returns the created cookie dict."""
+8
View File
@@ -6,6 +6,14 @@ from browser_cli.sdk.base import Namespace, sdk_command
class ExtensionNS(Namespace):
"""Control the browser-cli extension itself."""
@sdk_command("extension.info", default={})
def info(self) -> dict:
"""Return extension version, runtime metadata, and capabilities."""
@sdk_command("extension.capabilities", default=[])
def capabilities(self) -> list[str]:
"""Return feature capability strings advertised by the extension."""
@sdk_command("extension.reload")
def reload(self) -> None:
"""Reload the browser-cli extension service worker.
+43 -3
View File
@@ -4,7 +4,7 @@ from __future__ import annotations
from browser_cli.models import Tab
from browser_cli.sdk.base import Namespace, sdk_command
def _open_args(self, url, *, background=False, focus=False, window=None, group=None):
def _open_args(self, url, *, background=False, focus=False, window=None, group=None, **_ignored):
return {"url": url, "background": background or not focus, "focus": focus, "window": window, "group": group}
def _tab_args(self, tab_id=None):
@@ -13,7 +13,6 @@ def _tab_args(self, tab_id=None):
class NavigationNS(Namespace):
"""Open URLs, navigate history, and focus tabs."""
@sdk_command("navigate.open", _open_args)
def open(
self,
url: str,
@@ -22,8 +21,23 @@ class NavigationNS(Namespace):
focus: bool = False,
window: str | None = None,
group: str | None = None,
reuse: bool = False,
reuse_domain: bool = False,
reuse_title: str | None = None,
) -> None:
"""Open *url* in a new tab without stealing OS focus by default."""
"""Open *url* in a new tab without stealing OS focus by default.
``reuse``/``reuse_domain``/``reuse_title`` navigate an existing matching tab
instead of creating a new one.
"""
tab = self._reuse_target(url, reuse=reuse, reuse_domain=reuse_domain, reuse_title=reuse_title)
if tab is not None:
self.to(tab.id, url)
if focus:
self._c.tabs.activate(tab.id)
return None
self.command("navigate.open", _open_args(self, url, background=background, focus=focus, window=window, group=group))
return None
def open_wait(
self,
@@ -34,8 +48,17 @@ class NavigationNS(Namespace):
focus: bool = False,
window: str | None = None,
group: str | None = None,
reuse: bool = False,
reuse_domain: bool = False,
reuse_title: str | None = None,
) -> Tab:
"""Open *url* in a new tab and block until fully loaded. Returns the Tab."""
tab = self._reuse_target(url, reuse=reuse, reuse_domain=reuse_domain, reuse_title=reuse_title)
if tab is not None:
self.to(tab.id, url)
if focus:
self._c.tabs.activate(tab.id)
return self._c.tabs.wait_for_load(tab.id, timeout=timeout)
return self.require_tab(
self.command("navigate.open_wait", {
"url": url, "timeout": int(timeout * 1000),
@@ -68,6 +91,23 @@ class NavigationNS(Namespace):
def to(self, tab_id: int, url: str) -> None:
"""Navigate a specific tab to *url* in place."""
def _reuse_target(self, url: str, *, reuse: bool, reuse_domain: bool, reuse_title: str | None):
if not (reuse or reuse_domain or reuse_title):
return None
from urllib.parse import urlparse
wanted = urlparse(url)
wanted_host = wanted.netloc.lower()
for tab in self._c.tabs.list():
tab_url = tab.url or ""
parsed = urlparse(tab_url)
if reuse and tab_url == url:
return tab
if reuse_domain and wanted_host and parsed.netloc.lower() == wanted_host:
return tab
if reuse_title and reuse_title.lower() in (tab.title or "").lower():
return tab
return None
def search(
self, engine: str, query: str, *,
background: bool = False, focus: bool = False, window: str | None = None, group: str | None = None,
+8
View File
@@ -48,6 +48,14 @@ class SessionNS(Namespace):
def diff(self, name_a: str, name_b: str) -> dict:
"""Diff two saved sessions."""
@sdk_command("session.export", lambda self, name=None: {"name": name}, default={})
def export(self, name: str | None = None) -> dict:
"""Export one saved session, or all sessions when *name* is omitted."""
@sdk_command("session.import", lambda self, name, session, overwrite=False: {"name": name, "session": session, "overwrite": overwrite}, default={})
def import_(self, name: str, session: dict, *, overwrite: bool = False) -> dict:
"""Import a saved session payload under *name*."""
def list(self) -> list[dict]:
"""Return saved sessions.