Compare commits

..

10 Commits

Author SHA1 Message Date
daniel156161 6785b9f70c feat(serve): add remote browser control over TCP with token auth
Build & Publish Package / publish (push) Successful in 50s
Testing / test (push) Successful in 31s
Package Extension / package-extension (push) Successful in 27s
Exposes a local browser over a TCP socket so remote machines can
  control it using the same CLI and Python API. Token auth (auto-generated
  via secrets.token_urlsafe) is on by default; --no-auth disables it.
  Profile routing via _route message field lets clients target specific
  browser instances on the remote host. BROWSER_CLI_PROFILE is forwarded
  automatically so --browser flag works transparently over remote.
  - browser-cli serve [--host] [--port] [--token] [--no-auth]
  - browser-cli --remote HOST:PORT --token TOKEN <command>
  - BrowserCLI(remote="host:port", token="...").tabs_list()
2026-04-25 18:33:59 +02:00
daniel156161 1bf44c0eef update uv lock file
Testing / test (push) Successful in 28s
Package Extension / package-extension (push) Successful in 14s
Build & Publish Package / publish (push) Successful in 26s
2026-04-17 21:09:27 +02:00
daniel156161 cf0c9555d0 update version to 0.7.1
Testing / test (push) Successful in 28s
Package Extension / package-extension (push) Successful in 13s
Build & Publish Package / publish (push) Successful in 31s
2026-04-17 21:08:18 +02:00
daniel156161 a7da6cfab0 hardcode extension id and not prompt user
Testing / test (push) Has been cancelled
2026-04-17 21:07:30 +02:00
daniel156161 88b4f5ed11 add key generation script 2026-04-17 21:06:52 +02:00
daniel156161 36abde501c update version to 0.7.0
Testing / test (push) Successful in 23s
Package Extension / package-extension (push) Successful in 21s
Build & Publish Package / publish (push) Successful in 24s
2026-04-17 20:52:47 +02:00
daniel156161 1aff084429 add deterministic extension key
Testing / test (push) Waiting to run
Extension ID: bfpmkhngkjnfhabmfckgeohlilokodkg
2026-04-17 20:45:36 +02:00
daniel156161 1c5fd0ffee feat: add browser automation commands (v0.6.0)
Testing / test (push) Successful in 24s
Package Extension / package-extension (push) Successful in 9s
Build & Publish Package / publish (push) Successful in 21s
Navigation: open-wait (open + block until loaded)
DOM: key, hover, check/uncheck, clear, focus, submit, poll, scroll, select, eval, wait-for
Tabs: pin/unpin, screenshot, watch-url (block until URL matches regex)
New command groups: page info, storage get/set, cookies list/get/set
Extension: add cookies permission
2026-04-16 14:21:19 +02:00
daniel156161 fc4ce8f74d rename Chrome to Browser
Testing / test (push) Successful in 20s
2026-04-16 11:55:09 +02:00
daniel156161 cd2ebc2982 set defaults to tab empyt url and title with muted false
Testing / test (push) Successful in 31s
2026-04-14 13:21:21 +02:00
18 changed files with 1255 additions and 70 deletions
+7 -7
View File
@@ -8,7 +8,7 @@ Control your real, running browser from the terminal or a Python script — no h
You have 40 tabs open. You want to close all the duplicates, group the GitHub ones, save your session before a meeting, and open a few URLs into a specific group — all from a script. That is what browser-cli is for.
It works by pairing a small Chrome/Brave extension with a Python CLI tool. The extension has full access to your browser's tabs, windows, groups, and page DOM. The CLI talks to it in real time over a local IPC channel.
It works by pairing a small browser extension with a Python CLI tool. The extension has full access to your browser's tabs, windows, groups, and page DOM. The CLI talks to it in real time over a local IPC channel.
---
@@ -23,9 +23,9 @@ terminal / python script
│ Native Messaging Protocol (stdin/stdout, 4-byte length prefix + JSON)
Chrome Extension (background service worker)
Browser Extension (background worker/page)
chrome.* APIs
extension APIs
Your running browser
```
@@ -53,7 +53,7 @@ Every response:
## Installation
**Requirements:** Python 3.10+, [uv](https://github.com/astral-sh/uv), Chrome, Chromium, Brave, Edge, or Vivaldi
**Requirements:** Python 3.10+, [uv](https://github.com/astral-sh/uv), Chrome, Chromium, Brave, Edge, Vivaldi
```sh
git clone <repo>
@@ -63,8 +63,8 @@ uv run browser-cli install brave # or: chrome, chromium, edge, vivaldi
```
The `install` command will:
1. Ask you to load the `extension/` folder as an unpacked extension in your browser (`brave://extensions` → Developer mode → Load unpacked)
2. Ask you to paste the extension ID shown on the extension card
1. Ask you to load the browser-specific extension package
2. For Chromium-family browsers, ask you to paste the extension ID shown on the extension card
3. Write the native messaging manifest to your OS so the browser can find the host
4. Copy the native host into an internal `libexec` directory and create a small wrapper outside your `PATH`
@@ -404,7 +404,7 @@ bash examples/demo.sh
## Limitations
- **Chrome internal pages** (`chrome://`, `brave://`, `about:`) cannot be scripted. DOM and extract commands only work on regular `http://` and `https://` pages.
- **Browser internal pages** (`chrome://`, `brave://`, `edge://`, `about:`) cannot be scripted. DOM and extract commands only work on regular `http://` and `https://` pages.
- **Multiple browser instances can be auto-distinguished, but generated aliases are temporary**. Unaliased browsers get UUID aliases from the native host, which avoids collisions but is less ergonomic than setting a stable alias with `browser-cli clients rename --browser <current-alias> <new-alias>`.
- **Supported install targets are explicit, not “all Chromium browsers”**. The installer currently supports Chrome, Chromium, Brave, Edge, and Vivaldi. Other Chromium-based browsers may use different or shared native messaging manifest locations, so they need browser-specific verification before being added safely.
- **Linux and macOS only** — Windows native messaging paths are not yet handled.
+240 -2
View File
@@ -33,16 +33,22 @@ class BrowserCounts:
class BrowserCLI:
def __init__(self, browser: str | None = None):
def __init__(self, browser: str | None = None, remote: str | None = None, token: str | None = None):
"""
Args:
browser: Profile alias to target. Required when multiple browser
instances are active. Equivalent to ``--browser`` on the CLI.
remote: Connect to a remote browser exposed via ``browser-cli serve``.
Format: ``"host:port"`` (e.g. ``"192.168.1.10:8765"``).
When set, ``browser`` is ignored.
token: Auth token for the remote serve instance.
"""
self._browser = browser
self._remote = remote
self._token = token
def _cmd(self, command: str, args: dict | None = None):
return send_command(command, args, profile=self._browser)
return send_command(command, args, profile=self._browser, remote=self._remote, token=self._token)
def _multi_browser_targets(self):
if self._browser is not None:
@@ -122,6 +128,45 @@ class BrowserCLI:
"""Navigate a specific tab to *url*."""
self._cmd("navigate.to", {"tabId": tab_id, "url": url})
def open_wait(
self,
url: str,
*,
timeout: float = 30.0,
background: bool = False,
window: str | None = None,
group: str | None = None,
) -> "Tab":
"""Open URL in a new tab and block until fully loaded. Returns the Tab."""
data = self._cmd("navigate.open_wait", {
"url": url, "timeout": int(timeout * 1000),
"background": background, "window": window, "group": group,
})
return self._make_tab(data) if isinstance(data, dict) and "id" in data else data
def wait_for_load(
self,
tab_id: int | None = None,
*,
timeout: float = 30.0,
ready_state: str = "complete",
) -> Tab:
"""Block until the tab finishes loading. Returns the Tab when ready.
Args:
tab_id: Tab to watch. Defaults to the active tab.
timeout: Max seconds to wait before raising ``RuntimeError``.
ready_state: ``"complete"`` (default) or ``"interactive"``.
"""
data = self._cmd("navigate.wait", {
"tabId": tab_id,
"timeout": int(timeout * 1000),
"readyState": ready_state,
})
if not isinstance(data, dict) or "id" not in data:
raise RuntimeError("navigate.wait returned unexpected data")
return self._make_tab(data)
# ── Search ────────────────────────────────────────────────────────────
def search(
@@ -190,6 +235,44 @@ class BrowserCLI:
result = self._cmd("tabs.unmute", {"tabId": tab_id})
return result.get("tabId", tab_id) if isinstance(result, dict) else int(tab_id or 0)
def tabs_pin(self, tab_id: int | None = None) -> int:
"""Pin the active tab or a specific tab. Returns the target tab ID."""
result = self._cmd("tabs.pin", {"tabId": tab_id})
return result.get("tabId", tab_id) if isinstance(result, dict) else int(tab_id or 0)
def tabs_unpin(self, tab_id: int | None = None) -> int:
"""Unpin the active tab or a specific tab. Returns the target tab ID."""
result = self._cmd("tabs.unpin", {"tabId": tab_id})
return result.get("tabId", tab_id) if isinstance(result, dict) else int(tab_id or 0)
def tabs_watch_url(
self,
pattern: str,
*,
tab_id: int | None = None,
timeout: float = 30.0,
) -> "Tab":
"""Block until the tab URL matches regex pattern. Returns the Tab."""
data = self._cmd("tabs.watch_url", {"pattern": pattern, "tabId": tab_id, "timeout": int(timeout * 1000)})
return self._make_tab(data) if isinstance(data, dict) and "id" in data else data
def tabs_screenshot(
self,
tab_id: int | None = None,
*,
format: str = "png",
quality: int | None = None,
) -> str:
"""Capture the visible area of a tab. Returns a base64 data URL.
Args:
tab_id: Tab to capture. Defaults to the active tab.
format: ``"png"`` (default) or ``"jpeg"``.
quality: JPEG quality 0-100 (ignored for PNG).
"""
result = self._cmd("tabs.screenshot", {"tabId": tab_id, "format": format, "quality": quality})
return result.get("dataUrl", "") if isinstance(result, dict) else str(result)
def window_active_tab(self, window_id: int) -> Tab:
"""Return active tab for a specific browser window."""
data = self._cmd("tabs.active_in_window", {"windowId": window_id})
@@ -346,6 +429,161 @@ class BrowserCLI:
def dom_exists(self, selector: str) -> bool:
return self._cmd("dom.exists", {"selector": selector})
def dom_scroll(self, selector: str | None = None, *, x: int | None = None, y: int | None = None) -> None:
"""Scroll to a CSS selector or to pixel coordinates."""
self._cmd("dom.scroll", {"selector": selector, "x": x, "y": y})
def dom_select(self, selector: str, value: str) -> None:
"""Set the value of a <select> element."""
self._cmd("dom.select", {"selector": selector, "value": value})
def dom_eval(self, code: str, tab_id: int | None = None):
"""Evaluate JavaScript in the page's main world and return the result."""
return self._cmd("dom.eval", {"code": code, "tabId": tab_id})
def dom_key(self, key: str, selector: str | None = None) -> None:
"""Dispatch a keyboard event. key examples: 'Enter', 'Tab', 'Escape', 'ArrowDown'."""
self._cmd("dom.key", {"key": key, "selector": selector})
def dom_hover(self, selector: str) -> None:
"""Dispatch mouseover/mouseenter on an element."""
self._cmd("dom.hover", {"selector": selector})
def dom_check(self, selector: str) -> None:
"""Check a checkbox."""
self._cmd("dom.check", {"selector": selector})
def dom_uncheck(self, selector: str) -> None:
"""Uncheck a checkbox."""
self._cmd("dom.uncheck", {"selector": selector})
def dom_clear(self, selector: str) -> None:
"""Clear the value of an input element."""
self._cmd("dom.clear", {"selector": selector})
def dom_focus(self, selector: str) -> None:
"""Focus an element."""
self._cmd("dom.focus", {"selector": selector})
def dom_submit(self, selector: str) -> None:
"""Submit the form containing the matched element."""
self._cmd("dom.submit", {"selector": selector})
def dom_poll(
self,
selector: str,
pattern: str,
*,
attr: str | None = None,
timeout: float = 30.0,
interval: float = 0.5,
tab_id: int | None = None,
) -> dict:
"""Poll selector's text/value until it matches regex pattern.
Returns ``{"selector": ..., "value": ..., "pattern": ...}`` when matched.
"""
return self._cmd("dom.poll", {
"selector": selector,
"pattern": pattern,
"attr": attr,
"timeout": int(timeout * 1000),
"interval": int(interval * 1000),
"tabId": tab_id,
})
def dom_wait_for(
self,
selector: str,
*,
timeout: float = 10.0,
visible: bool = False,
hidden: bool = False,
tab_id: int | None = None,
) -> dict:
"""Wait until a CSS selector appears (or disappears) in the DOM.
Args:
selector: CSS selector to watch.
timeout: Max seconds to wait before raising ``RuntimeError``.
visible: Wait until the element has non-zero dimensions.
hidden: Wait until the element is absent or has ``offsetParent == null``.
tab_id: Tab to watch. Defaults to the active tab.
"""
return self._cmd("dom.wait_for", {
"selector": selector,
"timeout": int(timeout * 1000),
"visible": visible,
"hidden": hidden,
"tabId": tab_id,
})
# ── Page ─────────────────────────────────────────────────────────────
def page_info(self) -> dict:
"""Return title, URL, readyState, lang, and meta tags of the active tab."""
return self._cmd("page.info", {}) or {}
# ── Storage ───────────────────────────────────────────────────────────
def storage_get(
self,
key: str | None = None,
*,
type: str = "local",
tab_id: int | None = None,
) -> str | dict | None:
"""Get a localStorage/sessionStorage entry (or all entries if key omitted)."""
return self._cmd("storage.get", {"key": key, "type": type, "tabId": tab_id})
def storage_set(
self,
key: str,
value: str,
*,
type: str = "local",
tab_id: int | None = None,
) -> None:
"""Set a localStorage/sessionStorage entry."""
self._cmd("storage.set", {"key": key, "value": value, "type": type, "tabId": tab_id})
# ── Cookies ───────────────────────────────────────────────────────────
def cookies_list(
self,
*,
url: str | None = None,
domain: str | None = None,
name: str | None = None,
) -> list[dict]:
"""List cookies, optionally filtered by url, domain, or name."""
return self._cmd("cookies.list", {"url": url, "domain": domain, "name": name}) or []
def cookies_get(self, url: str, name: str) -> dict | None:
"""Get a single cookie by url and name."""
return self._cmd("cookies.get", {"url": url, "name": name})
def cookies_set(
self,
url: str,
name: str,
value: str,
*,
domain: str | None = None,
path: str | None = None,
secure: bool | None = None,
http_only: bool | None = None,
expiration_date: float | None = None,
same_site: str | None = None,
) -> dict:
"""Set a cookie. Returns the created cookie dict."""
return self._cmd("cookies.set", {
"url": url, "name": name, "value": value,
"domain": domain, "path": path,
"secure": secure, "httpOnly": http_only,
"expirationDate": expiration_date, "sameSite": same_site,
})
# ── Extract ───────────────────────────────────────────────────────────
def extract_links(self) -> list[dict]:
+25 -5
View File
@@ -21,6 +21,10 @@ from browser_cli.commands.dom import dom_group
from browser_cli.commands.extract import extract_group
from browser_cli.commands.session import session_group
from browser_cli.commands.search import search_group
from browser_cli.commands.page import page_group
from browser_cli.commands.storage import storage_group
from browser_cli.commands.cookies import cookies_group
from browser_cli.commands.serve import cmd_serve
from browser_cli.client import (
send_command,
BrowserNotConnected,
@@ -33,6 +37,7 @@ from browser_cli.platform import install_base_dir, is_windows
console = Console()
NATIVE_HOST_NAME = "com.browsercli.host"
EXTENSION_ID = "bfpmkhngkjnfhabmfckgeohlilokodkg"
NATIVE_HOST_DIRS = {
"chrome": {
@@ -160,14 +165,26 @@ def _print_version(ctx, param, value):
"--browser", default=None, metavar="ALIAS",
help="Browser profile alias to target (required when multiple browsers are active).",
)
@click.option(
"--remote", default=None, metavar="HOST:PORT",
help="Connect to a remote browser exposed via 'browser-cli serve'.",
)
@click.option(
"--token", default=None, metavar="TOKEN",
help="Auth token for the remote browser-cli serve instance.",
)
@click.pass_context
def main(ctx, browser):
def main(ctx, browser, remote, token):
"""Control your running browser from the terminal via a Chrome extension."""
ctx.ensure_object(dict)
ctx.obj["browser"] = browser
ctx.obj["browser_explicit"] = browser is not None
if browser:
os.environ["BROWSER_CLI_PROFILE"] = browser
if remote:
os.environ["BROWSER_CLI_REMOTE"] = remote
if token:
os.environ["BROWSER_CLI_TOKEN"] = token
# ── Sub-command groups ─────────────────────────────────────────────────────────
@@ -179,6 +196,10 @@ main.add_command(dom_group)
main.add_command(extract_group)
main.add_command(session_group)
main.add_command(search_group)
main.add_command(page_group)
main.add_command(storage_group)
main.add_command(cookies_group)
main.add_command(cmd_serve)
# ── clients ────────────────────────────────────────────────────────────────────
@@ -278,7 +299,7 @@ def cmd_install(browser):
wrapper_content = f'@echo off\r\n"{sys.executable}" "{native_host_script_path}" %*\r\n'
wrapper_path.write_text(wrapper_content, encoding="utf-8")
# Ask for extension ID
# Load extension
ext_urls = {
"chrome": "chrome://extensions",
"chromium": "chrome://extensions",
@@ -291,10 +312,9 @@ def cmd_install(browser):
console.print(f" 1. Open [cyan]{ext_url}[/cyan]")
console.print(" 2. Enable [bold]Developer mode[/bold] (top-right toggle)")
console.print(f" 3. Click [bold]Load unpacked[/bold] → select: [cyan]{Path(__file__).parent.parent / 'extension'}[/cyan]")
console.print(" 4. Copy the [bold]Extension ID[/bold] shown on the extension card\n")
console.print(f" 4. Extension ID will be [cyan]{EXTENSION_ID}[/cyan] (fixed by built-in key)\n")
extension_id = click.prompt("Paste your extension ID here")
extension_id = extension_id.strip()
extension_id = EXTENSION_ID
# Build native messaging manifest
manifest = {
+26 -4
View File
@@ -98,34 +98,56 @@ def _resolve_socket(profile: str | None = None) -> str:
)
def send_command(command: str, args: dict | None = None, profile: str | None = None) -> Any:
def send_command(command: str, args: dict | None = None, profile: str | None = None, remote: str | None = None, token: str | None = None) -> Any:
"""Send a command to the browser and return the response data."""
sock_path = _resolve_socket(profile)
remote_endpoint = remote or os.environ.get("BROWSER_CLI_REMOTE")
resolved_token = token or os.environ.get("BROWSER_CLI_TOKEN")
msg = {
"id": str(uuid.uuid4()),
"command": command,
"args": args or {},
}
if remote_endpoint:
if resolved_token:
msg["token"] = resolved_token
route_profile = profile or os.environ.get("BROWSER_CLI_PROFILE")
if route_profile:
msg["_route"] = route_profile
payload = json.dumps(msg).encode("utf-8")
framed = struct.pack("<I", len(payload)) + payload
try:
if is_windows():
if remote_endpoint:
host, _, port_str = remote_endpoint.rpartition(":")
if not host or not port_str:
raise BrowserNotConnected(f"Invalid remote endpoint '{remote_endpoint}': expected host:port")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect((host, int(port_str)))
sock.sendall(framed)
response = _recv_all(sock)
elif is_windows():
sock_path = _resolve_socket(profile)
with PipeClient(sock_path, family="AF_PIPE") as conn:
conn.send_bytes(payload)
response = conn.recv_bytes()
else:
sock_path = _resolve_socket(profile)
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
sock.connect(sock_path)
sock.sendall(framed)
response = _recv_all(sock)
except (FileNotFoundError, ConnectionRefusedError, OSError):
if remote_endpoint:
raise BrowserNotConnected(
f"Cannot connect to remote browser at {remote_endpoint}.\n"
"Make sure browser-cli serve is running on the remote host."
)
profile_hint = f" (profile: {profile})" if profile else ""
raise BrowserNotConnected(
f"Cannot connect to browser{profile_hint}.\n"
"Make sure:\n"
" 1. The browser-cli extension is installed and enabled\n"
" 2. The native host is registered: uv run browser-cli install chrome\n"
" 2. The native host is registered: uv run browser-cli install <browser>\n"
" 3. Your browser is running\n"
" Tip: use BROWSER_CLI_PROFILE=<name> to select a specific profile"
)
+86
View File
@@ -0,0 +1,86 @@
import click
from browser_cli.client import send_command, BrowserNotConnected
from rich.console import Console
from rich.table import Table
console = Console()
def _handle(command, args=None):
try:
return send_command(command, args or {})
except BrowserNotConnected as e:
console.print(f"[red]Error:[/red] {e}")
raise SystemExit(1)
except RuntimeError as e:
console.print(f"[red]Browser error:[/red] {e}")
raise SystemExit(1)
@click.group("cookies")
def cookies_group():
"""Manage browser cookies."""
@cookies_group.command("list")
@click.option("--url", default=None, help="Filter by URL")
@click.option("--domain", default=None, help="Filter by domain")
@click.option("--name", default=None, help="Filter by cookie name")
def cookies_list(url, domain, name):
"""List cookies, optionally filtered by URL, domain, or name."""
cookies = _handle("cookies.list", {"url": url, "domain": domain, "name": name}) or []
if not cookies:
console.print("[yellow]No cookies found[/yellow]")
return
table = Table(show_header=True, header_style="bold cyan")
table.add_column("Name")
table.add_column("Value")
table.add_column("Domain")
table.add_column("Path")
table.add_column("Secure", width=7)
table.add_column("HttpOnly", width=9)
for c in cookies:
table.add_row(
c.get("name", ""),
(c.get("value") or "")[:60],
c.get("domain", ""),
c.get("path", ""),
"[green]✓[/green]" if c.get("secure") else "",
"[green]✓[/green]" if c.get("httpOnly") else "",
)
console.print(table)
@cookies_group.command("get")
@click.argument("url")
@click.argument("name")
def cookies_get(url, name):
"""Get the value of a single cookie by URL and NAME."""
cookie = _handle("cookies.get", {"url": url, "name": name})
if cookie is None:
console.print(f"[yellow]Cookie '{name}' not found for {url}[/yellow]")
raise SystemExit(1)
console.print(cookie.get("value", ""))
@cookies_group.command("set")
@click.argument("url")
@click.argument("name")
@click.argument("value")
@click.option("--domain", default=None)
@click.option("--path", default=None)
@click.option("--secure", is_flag=True)
@click.option("--http-only", "http_only", is_flag=True)
@click.option("--expires", "expiration_date", type=float, default=None, help="Unix timestamp")
@click.option("--same-site", type=click.Choice(["no_restriction", "lax", "strict"]), default=None)
def cookies_set(url, name, value, domain, path, secure, http_only, expiration_date, same_site):
"""Set a cookie on URL."""
_handle("cookies.set", {
"url": url, "name": name, "value": value,
"domain": domain, "path": path,
"secure": secure or None,
"httpOnly": http_only or None,
"expirationDate": expiration_date,
"sameSite": same_site,
})
console.print(f"[green]Set cookie:[/green] {name}={value!r} on {url}")
+130
View File
@@ -87,3 +87,133 @@ def dom_exists(selector):
else:
console.print(f"[red]not found[/red]: {selector}")
raise SystemExit(1)
@dom_group.command("scroll")
@click.argument("selector", required=False)
@click.option("--x", type=int, default=None, help="Horizontal scroll position (px)")
@click.option("--y", type=int, default=None, help="Vertical scroll position (px)")
def dom_scroll(selector, x, y):
"""Scroll to a CSS SELECTOR or to an X/Y coordinate."""
_handle("dom.scroll", {"selector": selector, "x": x, "y": y})
target = selector or f"({x or 0}, {y or 0})"
console.print(f"[green]Scrolled to:[/green] {target}")
@dom_group.command("select")
@click.argument("selector")
@click.argument("value")
def dom_select(selector, value):
"""Set the VALUE of a <select> dropdown matching CSS SELECTOR."""
_handle("dom.select", {"selector": selector, "value": value})
console.print(f"[green]Selected '{value}' in:[/green] {selector}")
@dom_group.command("eval")
@click.argument("code")
@click.option("--tab", "tab_id", type=int, default=None, help="Tab ID (default: active tab)")
def dom_eval(code, tab_id):
"""Evaluate JavaScript CODE in the page and print the result."""
result = _handle("dom.eval", {"code": code, "tabId": tab_id})
if result is None:
console.print("[dim]null[/dim]")
else:
console.print(json.dumps(result, indent=2) if isinstance(result, (dict, list)) else str(result))
@dom_group.command("wait-for")
@click.argument("selector")
@click.option("--timeout", type=float, default=10.0, show_default=True, help="Max seconds to wait")
@click.option("--visible", is_flag=True, help="Wait until element is visible (non-zero size)")
@click.option("--hidden", is_flag=True, help="Wait until element is absent or hidden")
@click.option("--tab", "tab_id", type=int, default=None, help="Tab ID (default: active tab)")
def dom_wait_for(selector, timeout, visible, hidden, tab_id):
"""Wait until CSS SELECTOR appears (or disappears) in the DOM."""
_handle("dom.wait_for", {
"selector": selector,
"timeout": int(timeout * 1000),
"visible": visible,
"hidden": hidden,
"tabId": tab_id,
})
state = "hidden" if hidden else ("visible" if visible else "present")
console.print(f"[green]Ready ({state}):[/green] {selector}")
@dom_group.command("key")
@click.argument("key")
@click.option("--selector", default=None, help="CSS selector to target (default: focused element)")
def dom_key(key, selector):
"""Dispatch a keyboard KEY event (e.g. Enter, Tab, Escape, ArrowDown)."""
_handle("dom.key", {"key": key, "selector": selector})
target = selector or "active element"
console.print(f"[green]Key '{key}' sent to:[/green] {target}")
@dom_group.command("hover")
@click.argument("selector")
def dom_hover(selector):
"""Dispatch mouseover/mouseenter on the element matching CSS SELECTOR."""
_handle("dom.hover", {"selector": selector})
console.print(f"[green]Hovered:[/green] {selector}")
@dom_group.command("check")
@click.argument("selector")
def dom_check(selector):
"""Check a checkbox matching CSS SELECTOR."""
_handle("dom.check", {"selector": selector})
console.print(f"[green]Checked:[/green] {selector}")
@dom_group.command("uncheck")
@click.argument("selector")
def dom_uncheck(selector):
"""Uncheck a checkbox matching CSS SELECTOR."""
_handle("dom.uncheck", {"selector": selector})
console.print(f"[green]Unchecked:[/green] {selector}")
@dom_group.command("clear")
@click.argument("selector")
def dom_clear(selector):
"""Clear the value of an input matching CSS SELECTOR."""
_handle("dom.clear", {"selector": selector})
console.print(f"[green]Cleared:[/green] {selector}")
@dom_group.command("focus")
@click.argument("selector")
def dom_focus(selector):
"""Focus the element matching CSS SELECTOR."""
_handle("dom.focus", {"selector": selector})
console.print(f"[green]Focused:[/green] {selector}")
@dom_group.command("submit")
@click.argument("selector")
def dom_submit(selector):
"""Submit the form that contains the element matching CSS SELECTOR."""
_handle("dom.submit", {"selector": selector})
console.print(f"[green]Submitted form for:[/green] {selector}")
@dom_group.command("poll")
@click.argument("selector")
@click.argument("pattern")
@click.option("--attr", default=None, help="Attribute or property to read (default: textContent/value)")
@click.option("--timeout", type=float, default=30.0, show_default=True, help="Max seconds to wait")
@click.option("--interval", type=float, default=0.5, show_default=True, help="Poll interval in seconds")
@click.option("--tab", "tab_id", type=int, default=None, help="Tab ID (default: active tab)")
def dom_poll(selector, pattern, attr, timeout, interval, tab_id):
"""Poll SELECTOR until its text/value matches regex PATTERN."""
result = _handle("dom.poll", {
"selector": selector,
"pattern": pattern,
"attr": attr,
"timeout": int(timeout * 1000),
"interval": int(interval * 1000),
"tabId": tab_id,
})
value = result.get("value", "") if isinstance(result, dict) else ""
console.print(f"[green]Matched:[/green] {selector!r} = {value!r}")
+26
View File
@@ -78,3 +78,29 @@ def cmd_focus(pattern):
console.print(f"[green]Focused:[/green] {result.get('url', result)}")
else:
console.print(f"[yellow]No tab found matching:[/yellow] {pattern}")
@nav_group.command("open-wait")
@click.argument("url")
@click.option("--timeout", type=float, default=30.0, show_default=True, help="Max seconds to wait for load")
@click.option("--bg", is_flag=True, help="Open in background (no focus)")
@click.option("--window", "window_name", default=None, help="Open in named window")
@click.option("--group", "group_name", default=None, help="Open in tab group")
def cmd_open_wait(url, timeout, bg, window_name, group_name):
"""Open URL in a new tab and wait until fully loaded."""
result = _handle("navigate.open_wait", {
"url": url, "timeout": int(timeout * 1000),
"background": bg, "window": window_name, "group": group_name,
})
title = result.get("title", "") if isinstance(result, dict) else ""
console.print(f"[green]Loaded:[/green] {url}" + (f"{title}" if title else ""))
@nav_group.command("wait")
@click.option("--tab", "tab_id", type=int, default=None, help="Tab ID (default: active tab)")
@click.option("--timeout", type=float, default=30.0, show_default=True, help="Max seconds to wait")
@click.option("--ready-state", type=click.Choice(["complete", "interactive"]), default="complete", show_default=True, help="Target ready state")
def cmd_wait(tab_id, timeout, ready_state):
"""Wait until tab finishes loading."""
result = _handle("navigate.wait", {"tabId": tab_id, "timeout": int(timeout * 1000), "readyState": ready_state})
console.print(f"[green]Ready:[/green] {result.get('url', '')}{result.get('title', '')}")
+38
View File
@@ -0,0 +1,38 @@
import click
from browser_cli.client import send_command, BrowserNotConnected
from rich.console import Console
from rich.table import Table
console = Console()
def _handle(command, args=None):
try:
return send_command(command, args or {})
except BrowserNotConnected as e:
console.print(f"[red]Error:[/red] {e}")
raise SystemExit(1)
except RuntimeError as e:
console.print(f"[red]Browser error:[/red] {e}")
raise SystemExit(1)
@click.group("page")
def page_group():
"""Inspect current page metadata."""
@page_group.command("info")
def page_info():
"""Show title, URL, readyState, language, and meta tags of the active tab."""
info = _handle("page.info") or {}
table = Table(show_header=False)
table.add_column("Field", style="bold cyan", no_wrap=True)
table.add_column("Value")
table.add_row("Title", info.get("title") or "")
table.add_row("URL", info.get("url") or "")
table.add_row("Ready", info.get("readyState") or "")
table.add_row("Lang", info.get("lang") or "")
for key, val in (info.get("meta") or {}).items():
table.add_row(f"meta:{key}", val)
console.print(table)
+153
View File
@@ -0,0 +1,153 @@
import threading, secrets, socket, struct, click, json, sys
from rich.console import Console
from datetime import datetime
console = Console()
def _recv_exact(sock:socket.socket, n:int) -> bytes:
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
raise ConnectionError("Connection closed")
buf += chunk
return buf
def _log(addr:tuple, command:str, profile:str|None, status:str, error:str|None=None) -> None:
ts = datetime.now().strftime("%H:%M:%S")
addr_str = f"{addr[0]}:{addr[1]}"
profile_str = f"[dim]{profile}[/dim] " if profile else ""
if error:
console.print(f"[dim]{ts}[/dim] {addr_str} {profile_str}[cyan]{command}[/cyan] [red]{status}[/red] {error}")
else:
console.print(f"[dim]{ts}[/dim] {addr_str} {profile_str}[cyan]{command}[/cyan] [green]{status}[/green]")
def _proxy_request(client_sock:socket.socket, addr:tuple, profile:str|None, server_token:str|None) -> None:
from browser_cli.client import _resolve_socket, BrowserNotConnected
from browser_cli.platform import is_windows
try:
header = _recv_exact(client_sock, 4)
msg_len = struct.unpack("<I", header)[0]
payload = _recv_exact(client_sock, msg_len)
except (ConnectionError, OSError):
return
def _send_error(msg_id, msg:str) -> None:
err = json.dumps({"id": msg_id, "success": False, "error": msg}).encode()
try:
client_sock.sendall(struct.pack("<I", len(err)) + err)
except OSError:
pass
try:
msg = json.loads(payload)
except (json.JSONDecodeError, ValueError):
_send_error(None, "invalid JSON")
_log(addr, "?", None, "ERROR", "invalid JSON")
return
msg_id = msg.get("id")
command = msg.get("command", "?")
if server_token is not None:
if msg.get("token") != server_token:
_send_error(msg_id, "unauthorized: invalid or missing token")
_log(addr, command, None, "DENIED", "bad token")
return
resolved_profile = msg.get("_route") or profile
strip = {"token", "_route"}
if strip & msg.keys():
clean_payload = json.dumps({k: v for k, v in msg.items() if k not in strip}).encode()
clean_header = struct.pack("<I", len(clean_payload))
else:
clean_payload = payload
clean_header = header
try:
sock_path = _resolve_socket(resolved_profile)
except BrowserNotConnected as e:
_send_error(msg_id, str(e))
_log(addr, command, resolved_profile, "ERROR", "browser not connected")
return
try:
if is_windows():
from multiprocessing.connection import Client as PipeClient
with PipeClient(sock_path, family="AF_PIPE") as pipe:
pipe.send_bytes(clean_payload)
resp = pipe.recv_bytes()
client_sock.sendall(struct.pack("<I", len(resp)) + resp)
else:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as local:
local.connect(sock_path)
local.sendall(clean_header + clean_payload)
resp_header = _recv_exact(local, 4)
resp_len = struct.unpack("<I", resp_header)[0]
resp_payload = _recv_exact(local, resp_len)
client_sock.sendall(resp_header + resp_payload)
resp_data = json.loads(resp_payload if not is_windows() else resp)
if resp_data.get("success", True):
_log(addr, command, resolved_profile, "OK")
else:
_log(addr, command, resolved_profile, "ERROR", resp_data.get("error", ""))
except OSError as e:
_send_error(msg_id, str(e))
_log(addr, command, resolved_profile, "ERROR", str(e))
def _handle_client(client_sock:socket.socket, addr:tuple, profile:str|None, server_token:str|None) -> None:
with client_sock:
_proxy_request(client_sock, addr, profile, server_token)
@click.command("serve")
@click.option("--host", default="127.0.0.1", show_default=True, help="Address to bind.")
@click.option("--port", default=8765, show_default=True, type=int, help="TCP port to listen on.")
@click.option("--token", default=None, metavar="TOKEN", help="Auth token (auto-generated if omitted).")
@click.option("--no-auth", is_flag=True, default=False, help="Disable token authentication.")
@click.pass_context
def cmd_serve(ctx, host, port, token, no_auth):
"""Expose this browser over TCP so remote hosts can control it."""
profile = ctx.obj.get("browser") if ctx.obj else None
if host in ("0.0.0.0", "::"):
console.print("[yellow]Warning:[/yellow] Binding to all interfaces — anyone who can reach this port controls your browser.")
if no_auth:
server_token = None
else:
server_token = token or secrets.token_urlsafe(32)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
server.bind((host, port))
except OSError as e:
console.print(f"[red]Cannot bind to {host}:{port}:[/red] {e}")
sys.exit(1)
server.listen(16)
browser_hint = f" (browser: {profile})" if profile else ""
console.print(f"[green]Serving browser{browser_hint} →[/green] [cyan]{host}:{port}[/cyan]")
if server_token:
console.print(f" Token: [bold yellow]{server_token}[/bold yellow]")
console.print(f" CLI: [dim]browser-cli --remote {host}:{port} --token {server_token} tabs list[/dim]")
console.print(f" Python: [dim]BrowserCLI(remote=\"{host}:{port}\", token=\"{server_token}\").tabs_list()[/dim]")
else:
console.print(f" CLI: [dim]browser-cli --remote {host}:{port} tabs list[/dim]")
console.print(f" Python: [dim]BrowserCLI(remote=\"{host}:{port}\").tabs_list()[/dim]")
console.print("[yellow] Auth disabled (--no-auth)[/yellow]")
console.print("Ctrl-C to stop.\n")
try:
while True:
conn, addr = server.accept()
threading.Thread(target=_handle_client, args=(conn, addr, profile, server_token), daemon=True).start()
except KeyboardInterrupt:
console.print("[yellow]Stopped.[/yellow]")
finally:
server.close()
+48
View File
@@ -0,0 +1,48 @@
import json
import click
from browser_cli.client import send_command, BrowserNotConnected
from rich.console import Console
console = Console()
def _handle(command, args=None):
try:
return send_command(command, args or {})
except BrowserNotConnected as e:
console.print(f"[red]Error:[/red] {e}")
raise SystemExit(1)
except RuntimeError as e:
console.print(f"[red]Browser error:[/red] {e}")
raise SystemExit(1)
@click.group("storage")
def storage_group():
"""Read and write the page's localStorage / sessionStorage."""
@storage_group.command("get")
@click.argument("key", required=False)
@click.option("--type", "store_type", type=click.Choice(["local", "session"]), default="local", show_default=True)
@click.option("--tab", "tab_id", type=int, default=None, help="Tab ID (default: active tab)")
def storage_get(key, store_type, tab_id):
"""Get a localStorage KEY (or dump all keys if omitted)."""
result = _handle("storage.get", {"key": key, "type": store_type, "tabId": tab_id})
if result is None:
console.print("[dim]null[/dim]")
elif isinstance(result, dict):
console.print(json.dumps(result, indent=2))
else:
console.print(str(result))
@storage_group.command("set")
@click.argument("key")
@click.argument("value")
@click.option("--type", "store_type", type=click.Choice(["local", "session"]), default="local", show_default=True)
@click.option("--tab", "tab_id", type=int, default=None, help="Tab ID (default: active tab)")
def storage_set(key, value, store_type, tab_id):
"""Set localStorage KEY to VALUE."""
_handle("storage.set", {"key": key, "value": value, "type": store_type, "tabId": tab_id})
console.print(f"[green]Set[/green] {store_type}[{key!r}] = {value!r}")
+52
View File
@@ -1,3 +1,4 @@
import base64
import click
from browser_cli.client import BrowserNotConnected, active_browser_targets, send_command
from rich.console import Console
@@ -236,3 +237,54 @@ def tabs_unmute(tab_id):
result = _handle("tabs.unmute", {"tabId": tab_id})
target = result.get("tabId", tab_id) if isinstance(result, dict) else tab_id
console.print(f"[green]Unmuted tab {target}[/green]")
@tabs_group.command("pin")
@click.argument("tab_id", type=int, required=False)
def tabs_pin(tab_id):
"""Pin the active tab or a specific tab."""
result = _handle("tabs.pin", {"tabId": tab_id})
target = result.get("tabId", tab_id) if isinstance(result, dict) else tab_id
console.print(f"[green]Pinned tab {target}[/green]")
@tabs_group.command("unpin")
@click.argument("tab_id", type=int, required=False)
def tabs_unpin(tab_id):
"""Unpin the active tab or a specific tab."""
result = _handle("tabs.unpin", {"tabId": tab_id})
target = result.get("tabId", tab_id) if isinstance(result, dict) else tab_id
console.print(f"[green]Unpinned tab {target}[/green]")
@tabs_group.command("watch-url")
@click.argument("pattern")
@click.option("--tab", "tab_id", type=int, default=None, help="Tab ID (default: active tab)")
@click.option("--timeout", type=float, default=30.0, show_default=True, help="Max seconds to wait")
def tabs_watch_url(pattern, tab_id, timeout):
"""Wait until the active (or specified) tab URL matches regex PATTERN."""
result = _handle("tabs.watch_url", {"pattern": pattern, "tabId": tab_id, "timeout": int(timeout * 1000)})
url = result.get("url", "") if isinstance(result, dict) else ""
console.print(f"[green]URL matched:[/green] {url}")
@tabs_group.command("screenshot")
@click.argument("output", required=False, metavar="FILE")
@click.option("--tab", "tab_id", type=int, default=None, help="Tab ID (default: active tab)")
@click.option("--format", "fmt", type=click.Choice(["png", "jpeg"]), default="png", show_default=True)
@click.option("--quality", type=int, default=None, help="JPEG quality 0-100")
def tabs_screenshot(output, tab_id, fmt, quality):
"""Capture a screenshot of the active (or specified) tab.
Saves to FILE if given, otherwise prints the base64 data URL.
"""
result = _handle("tabs.screenshot", {"tabId": tab_id, "format": fmt, "quality": quality})
data_url = result.get("dataUrl", "") if isinstance(result, dict) else ""
if output:
header = f"data:image/{fmt};base64,"
raw = base64.b64decode(data_url[len(header):])
with open(output, "wb") as f:
f.write(raw)
console.print(f"[green]Screenshot saved:[/green] {output}")
else:
console.print(data_url)
+3 -3
View File
@@ -29,9 +29,9 @@ class Tab:
id: int
window_id: int
active: bool
muted: bool
title: str
url: str
muted: bool = False
title: str = ""
url: str = ""
group_id: int | None = None
browser: str | None = None
_browser: BrowserCLI | None = field(default=None, repr=False, compare=False, init=False)
+294
View File
@@ -127,6 +127,8 @@ async function dispatch(command, args) {
case "navigate.back": return navBack(args);
case "navigate.forward": return navForward(args);
case "navigate.focus": return navFocus(args);
case "navigate.wait": return navWait(args);
case "navigate.open_wait": return navOpenWait(args);
// ── Tabs ──────────────────────────────────────────────────────────────
case "tabs.list": return tabsList();
@@ -144,6 +146,10 @@ async function dispatch(command, args) {
case "tabs.merge_windows": return tabsMergeWindows();
case "tabs.mute": return tabsMute(args);
case "tabs.unmute": return tabsUnmute(args);
case "tabs.pin": return tabsPin(args);
case "tabs.unpin": return tabsUnpin(args);
case "tabs.screenshot": return tabsScreenshot(args);
case "tabs.watch_url": return tabsWatchUrl(args);
// ── Groups ────────────────────────────────────────────────────────────
case "group.list": return groupList();
@@ -168,6 +174,30 @@ async function dispatch(command, args) {
case "dom.attr": return domOp("domAttr", args);
case "dom.text": return domOp("domText", args);
case "dom.exists": return domOp("domExists", args);
case "dom.scroll": return domOp("domScroll", args);
case "dom.select": return domOp("domSelect", args);
case "dom.key": return domOp("domKey", args);
case "dom.hover": return domOp("domHover", args);
case "dom.check": return domOp("domCheck", { ...args, checked: true });
case "dom.uncheck": return domOp("domCheck", { ...args, checked: false });
case "dom.clear": return domOp("domClear", args);
case "dom.focus": return domOp("domFocus", args);
case "dom.submit": return domOp("domSubmit", args);
case "dom.eval": return domEval(args);
case "dom.wait_for": return domWaitFor(args);
case "dom.poll": return domPoll(args);
// ── Page ──────────────────────────────────────────────────────────────
case "page.info": return domOp("pageInfo", {});
// ── Storage ───────────────────────────────────────────────────────────
case "storage.get": return storageGet(args);
case "storage.set": return storageSet(args);
// ── Cookies ───────────────────────────────────────────────────────────
case "cookies.list": return cookiesList(args);
case "cookies.get": return cookiesGet(args);
case "cookies.set": return cookiesSet(args);
// ── Extract ───────────────────────────────────────────────────────────
case "extract.links": return domOp("extractLinks", args);
@@ -267,6 +297,25 @@ async function navFocus({ pattern }) {
return { id: match.id, url: match.url || match.pendingUrl, title: match.title };
}
async function navWait({ tabId, timeout = 30000, readyState = "complete" } = {}) {
const tab = tabId ? { id: tabId } : await getActiveTab();
const deadline = Date.now() + timeout;
const interval = 200;
while (Date.now() < deadline) {
const t = await chrome.tabs.get(tab.id);
if (readyState === "complete" ? t.status === "complete" : t.status !== "loading") {
return tabInfo(t);
}
await new Promise(r => setTimeout(r, interval));
}
throw new Error(`Tab ${tab.id} did not reach status '${readyState}' within ${timeout}ms`);
}
async function navOpenWait({ url, timeout = 30000, background, window: windowName, group } = {}) {
const opened = await navOpen({ url, background, window: windowName, group });
return await navWait({ tabId: opened.id, timeout });
}
// ── Tabs ──────────────────────────────────────────────────────────────────────
async function tabsList() {
@@ -442,6 +491,47 @@ async function tabsMergeWindows() {
return { moved };
}
async function tabsPin({ tabId }) {
const tab = tabId ? await chrome.tabs.get(tabId) : await getActiveTab();
await chrome.tabs.update(tab.id, { pinned: true });
return { tabId: tab.id, pinned: true };
}
async function tabsUnpin({ tabId }) {
const tab = tabId ? await chrome.tabs.get(tabId) : await getActiveTab();
await chrome.tabs.update(tab.id, { pinned: false });
return { tabId: tab.id, pinned: false };
}
async function tabsScreenshot({ tabId, format = "png", quality } = {}) {
let windowId;
if (tabId) {
const tab = await chrome.tabs.get(tabId);
await chrome.tabs.update(tabId, { active: true });
windowId = tab.windowId;
} else {
const tab = await getActiveTab();
windowId = tab.windowId;
}
const opts = { format };
if (format === "jpeg" && quality != null) opts.quality = quality;
const dataUrl = await chrome.tabs.captureVisibleTab(windowId, opts);
return { dataUrl, format };
}
async function tabsWatchUrl({ pattern, timeout = 30000, tabId } = {}) {
const tab = tabId ? await chrome.tabs.get(tabId) : await getActiveTab();
const deadline = Date.now() + timeout;
const regex = new RegExp(pattern);
while (Date.now() < deadline) {
const t = await chrome.tabs.get(tab.id);
const url = t.url || t.pendingUrl || "";
if (regex.test(url)) return tabInfo(t);
await new Promise(r => setTimeout(r, 200));
}
throw new Error(`Tab ${tab.id} URL did not match '${pattern}' within ${timeout}ms`);
}
async function tabsMute({ tabId }) {
const tab = await resolveTabForDirectAction(tabId, "mute");
await chrome.tabs.update(tab.id, { muted: true });
@@ -616,6 +706,131 @@ async function domOp(funcName, args) {
return results[0]?.result;
}
async function domEval({ code, tabId } = {}) {
const tab = tabId ? await chrome.tabs.get(tabId) : await getActiveTab();
if (!isScriptableUrl(tab.url)) {
throw new Error(`Cannot run DOM commands on ${tab.url} — navigate to a regular web page first`);
}
const results = await executeScript({
target: { tabId: tab.id },
world: "MAIN",
func: (c) => (0, eval)(c),
args: [code],
});
return results[0]?.result ?? null;
}
async function domWaitFor({ selector, timeout = 10000, visible = false, hidden = false, tabId } = {}) {
const tab = tabId ? await chrome.tabs.get(tabId) : await getActiveTab();
if (!isScriptableUrl(tab.url)) {
throw new Error(`Cannot run DOM commands on ${tab.url} — navigate to a regular web page first`);
}
const deadline = Date.now() + timeout;
while (Date.now() < deadline) {
const results = await executeScript({
target: { tabId: tab.id },
func: (sel, vis, hid) => {
const el = document.querySelector(sel);
if (hid) return !el || el.offsetParent === null;
if (!el) return false;
if (vis) {
const r = el.getBoundingClientRect();
return r.width > 0 && r.height > 0;
}
return true;
},
args: [selector, visible, hidden],
});
if (results[0]?.result) return { selector, found: !hidden };
await new Promise(r => setTimeout(r, 200));
}
throw new Error(`Selector '${selector}' condition not met within ${timeout}ms`);
}
async function domPoll({ selector, pattern, attr, timeout = 30000, interval = 500, tabId } = {}) {
const tab = tabId ? await chrome.tabs.get(tabId) : await getActiveTab();
if (!isScriptableUrl(tab.url)) {
throw new Error(`Cannot run DOM commands on ${tab.url} — navigate to a regular web page first`);
}
const deadline = Date.now() + timeout;
const regex = new RegExp(pattern);
while (Date.now() < deadline) {
const results = await executeScript({
target: { tabId: tab.id },
func: (sel, a) => {
const el = document.querySelector(sel);
if (!el) return null;
if (a) return el.getAttribute(a) ?? el[a] ?? null;
return el.value !== undefined ? el.value : el.textContent.trim();
},
args: [selector, attr || null],
});
const value = results[0]?.result;
if (value != null && regex.test(String(value))) return { selector, value, pattern };
await new Promise(r => setTimeout(r, interval));
}
throw new Error(`Selector '${selector}' did not match '${pattern}' within ${timeout}ms`);
}
async function storageGet({ key, type = "local", tabId } = {}) {
const tab = tabId ? await chrome.tabs.get(tabId) : await getActiveTab();
if (!isScriptableUrl(tab.url)) {
throw new Error(`Cannot access storage on ${tab.url} — navigate to a regular web page first`);
}
const results = await executeScript({
target: { tabId: tab.id },
world: "MAIN",
func: (k, t) => {
const store = t === "session" ? sessionStorage : localStorage;
if (k) return store.getItem(k);
return Object.fromEntries(Object.keys(store).map(key => [key, store.getItem(key)]));
},
args: [key || null, type],
});
return results[0]?.result ?? null;
}
async function storageSet({ key, value, type = "local", tabId } = {}) {
const tab = tabId ? await chrome.tabs.get(tabId) : await getActiveTab();
if (!isScriptableUrl(tab.url)) {
throw new Error(`Cannot access storage on ${tab.url} — navigate to a regular web page first`);
}
const results = await executeScript({
target: { tabId: tab.id },
world: "MAIN",
func: (k, v, t) => {
const store = t === "session" ? sessionStorage : localStorage;
store.setItem(k, typeof v === "string" ? v : JSON.stringify(v));
return true;
},
args: [key, value, type],
});
return results[0]?.result ?? false;
}
async function cookiesList({ url, domain, name } = {}) {
const details = {};
if (url) details.url = url;
if (domain) details.domain = domain;
if (name) details.name = name;
return await chrome.cookies.getAll(details);
}
async function cookiesGet({ url, name }) {
return await chrome.cookies.get({ url, name });
}
async function cookiesSet({ url, name, value, domain, path, secure, httpOnly, expirationDate, sameSite } = {}) {
const details = { url, name, value };
if (domain != null) details.domain = domain;
if (path != null) details.path = path;
if (secure != null) details.secure = secure;
if (httpOnly != null) details.httpOnly = httpOnly;
if (expirationDate != null) details.expirationDate = expirationDate;
if (sameSite != null) details.sameSite = sameSite;
return await chrome.cookies.set(details);
}
// This function is serialized and injected into the page by chrome.scripting
function contentDispatch(funcName, args) {
function domQuery({ selector }) {
@@ -653,6 +868,83 @@ function contentDispatch(funcName, args) {
function domExists({ selector }) {
return document.querySelector(selector) !== null;
}
function domKey({ selector, key }) {
const el = selector ? document.querySelector(selector) : document.activeElement;
if (selector && !el) throw new Error(`No element: ${selector}`);
const target = el || document.body;
["keydown", "keypress", "keyup"].forEach(type => {
target.dispatchEvent(new KeyboardEvent(type, { key, bubbles: true, cancelable: true }));
});
return true;
}
function domHover({ selector }) {
const el = document.querySelector(selector);
if (!el) throw new Error(`No element: ${selector}`);
el.dispatchEvent(new MouseEvent("mouseover", { bubbles: true }));
el.dispatchEvent(new MouseEvent("mouseenter", { bubbles: true }));
return true;
}
function domCheck({ selector, checked }) {
const el = document.querySelector(selector);
if (!el) throw new Error(`No element: ${selector}`);
el.checked = checked;
el.dispatchEvent(new Event("change", { bubbles: true }));
return true;
}
function domClear({ selector }) {
const el = document.querySelector(selector);
if (!el) throw new Error(`No element: ${selector}`);
el.value = "";
el.dispatchEvent(new Event("input", { bubbles: true }));
el.dispatchEvent(new Event("change", { bubbles: true }));
return true;
}
function domFocus({ selector }) {
const el = document.querySelector(selector);
if (!el) throw new Error(`No element: ${selector}`);
el.focus();
return true;
}
function domSubmit({ selector }) {
const el = document.querySelector(selector);
if (!el) throw new Error(`No element: ${selector}`);
const form = el.tagName === "FORM" ? el : el.closest("form");
if (!form) throw new Error(`No form found for: ${selector}`);
form.submit();
return true;
}
function pageInfo() {
const metas = {};
document.querySelectorAll("meta[name], meta[property]").forEach(m => {
const k = m.getAttribute("name") || m.getAttribute("property");
if (k) metas[k] = m.getAttribute("content") || "";
});
return {
title: document.title,
url: location.href,
readyState: document.readyState,
lang: document.documentElement.lang || null,
meta: metas,
};
}
function domScroll({ selector, x, y }) {
if (selector) {
const el = document.querySelector(selector);
if (!el) throw new Error(`No element: ${selector}`);
el.scrollIntoView({ behavior: "smooth", block: "center" });
return true;
}
window.scrollTo({ top: y || 0, left: x || 0, behavior: "smooth" });
return true;
}
function domSelect({ selector, value }) {
const el = document.querySelector(selector);
if (!el) throw new Error(`No element: ${selector}`);
el.value = value;
el.dispatchEvent(new Event("change", { bubbles: true }));
el.dispatchEvent(new Event("input", { bubbles: true }));
return true;
}
function extractLinks() {
const seen = new Set();
return Array.from(document.querySelectorAll("a[href]")).reduce((links, a) => {
@@ -1090,6 +1382,8 @@ function contentDispatch(funcName, args) {
}
const fns = { domQuery, domClick, domType, domAttr, domText, domExists,
domScroll, domSelect, domKey, domHover, domCheck, domClear, domFocus, domSubmit,
pageInfo,
extractLinks, extractImages, extractText, extractJson, extractMarkdown };
const fn = fns[funcName];
if (!fn) throw new Error(`Unknown content function: ${funcName}`);
+8 -4
View File
@@ -1,7 +1,7 @@
{
"manifest_version": 3,
"name": "browser-cli",
"version": "0.5.12",
"version": "0.7.1",
"description": "Control your browser from the terminal via browser-cli",
"permissions": [
"tabs",
@@ -10,9 +10,12 @@
"windows",
"storage",
"alarms",
"nativeMessaging"
"nativeMessaging",
"cookies"
],
"host_permissions": [
"<all_urls>"
],
"host_permissions": ["<all_urls>"],
"background": {
"service_worker": "background.js"
},
@@ -28,5 +31,6 @@
"16": "icons/icon-16.png",
"32": "icons/icon-32.png"
}
}
},
"key": "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAlfCvygCocGbU2Bm2Rg6cnvHN0Lt25gJGJ/XX7VuAccrp4dH+Whj3Fw2vYSjgx90wuWuMl5fsWSsSX9H1k1vp7ImGzszCDnScn+o+KRWrVCQVRD1NEaKavuHoaHyc3Hs+njrM8c7c6u2ygdItZkggwPU0U1dKkixP/DWR9oG13Gr4u39p/xHxITiBh0DROYdoKBzw/J+vT7zWITKyG7QBgLMuoaYc15oqRIm7raBW1GIn1A5V2WPpBM9rMAli4vCyc9rbqsUqO1Yu4SrNIoG+wfz3MED3ajylDH6Jh1bsf1l5EZNDR/EpqBsSQcEV0VXX7nkqchqgzh3bgT9psiUQAQIDAQAB"
}
+2 -2
View File
@@ -1,7 +1,7 @@
[project]
name = "browser-cli"
version = "0.5.12"
description = "Control your real running browser from the terminal via a Chrome extension"
version = "0.8.0"
description = "Control your real running browser from the terminal via a browser extension"
requires-python = ">=3.10"
dependencies = [
"click>=8",
+74
View File
@@ -0,0 +1,74 @@
#!/usr/bin/env -S uv run
"""Generate or derive Chrome extension key and ID.
Usage:
python scripts/gen_extension_key.py # generate new key pair
python scripts/gen_extension_key.py --from-manifest # derive ID from extension/manifest.json
python scripts/gen_extension_key.py --key <base64> # derive ID from given public key
"""
import argparse, hashlib
import base64, json, sys
from pathlib import Path
def public_key_to_extension_id(pub_key_der:bytes) -> str:
digest = hashlib.sha256(pub_key_der).hexdigest()
return "".join(chr(ord("a") + int(c, 16)) for c in digest[:32])
def derive_from_key_b64(key_b64:str) -> tuple[str, str]:
der = base64.b64decode(key_b64)
ext_id = public_key_to_extension_id(der)
return key_b64, ext_id
def generate_new_key() -> tuple[str, str, str]:
try:
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import (
Encoding,
PublicFormat,
PrivateFormat,
NoEncryption,
)
except ImportError:
print("Install 'cryptography' to generate new keys: pip install cryptography", file=sys.stderr)
sys.exit(1)
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
pub_der = private_key.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo)
priv_pem = private_key.private_bytes(Encoding.PEM, PrivateFormat.TraditionalOpenSSL, NoEncryption()).decode()
key_b64 = base64.b64encode(pub_der).decode()
ext_id = public_key_to_extension_id(pub_der)
return key_b64, ext_id, priv_pem
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Chrome extension key/ID tool")
group = parser.add_mutually_exclusive_group()
group.add_argument("--from-manifest", action="store_true", help="Derive ID from extension/manifest.json")
group.add_argument("--key", metavar="BASE64", help="Derive ID from given base64 public key")
args = parser.parse_args()
if args.from_manifest:
manifest_path = Path(__file__).parent.parent / "extension" / "manifest.json"
manifest = json.loads(manifest_path.read_text())
key_b64 = manifest.get("key")
if not key_b64:
print("No 'key' field in manifest.json", file=sys.stderr)
sys.exit(1)
key_b64, ext_id = derive_from_key_b64(key_b64)
print(f"Extension ID: {ext_id}")
print(f"Key (b64): {key_b64}")
elif args.key:
key_b64, ext_id = derive_from_key_b64(args.key)
print(f"Extension ID: {ext_id}")
else:
key_b64, ext_id, priv_pem = generate_new_key()
print(f"Extension ID: {ext_id}")
print(f"Key (b64): {key_b64}")
print()
print("Add this to extension/manifest.json:")
print(f' "key": "{key_b64}"')
print()
print("Private key (keep secret, needed to re-derive same ID):")
print(priv_pem)
+35 -35
View File
@@ -122,7 +122,7 @@ class TestNavigation:
mock_send.assert_called_once_with(
"navigate.open",
{"url": "https://example.com", "background": False, "window": None, "group": None},
profile=None,
profile=None, remote=None, token=None,
)
def test_open_background(self, b, mock_send):
@@ -136,33 +136,33 @@ class TestNavigation:
def test_reload(self, b, mock_send):
b.reload(tab_id=5)
mock_send.assert_called_once_with("navigate.reload", {"tabId": 5}, profile=None)
mock_send.assert_called_once_with("navigate.reload", {"tabId": 5}, profile=None, remote=None, token=None)
def test_hard_reload(self, b, mock_send):
b.hard_reload(tab_id=7)
mock_send.assert_called_once_with("navigate.hard_reload", {"tabId": 7}, profile=None)
mock_send.assert_called_once_with("navigate.hard_reload", {"tabId": 7}, profile=None, remote=None, token=None)
def test_back(self, b, mock_send):
b.back(tab_id=3)
mock_send.assert_called_once_with("navigate.back", {"tabId": 3}, profile=None)
mock_send.assert_called_once_with("navigate.back", {"tabId": 3}, profile=None, remote=None, token=None)
def test_forward(self, b, mock_send):
b.forward(tab_id=3)
mock_send.assert_called_once_with("navigate.forward", {"tabId": 3}, profile=None)
mock_send.assert_called_once_with("navigate.forward", {"tabId": 3}, profile=None, remote=None, token=None)
def test_focus_url(self, b, mock_send):
b.focus_url("github.com")
mock_send.assert_called_once_with("navigate.focus", {"pattern": "github.com"}, profile=None)
mock_send.assert_called_once_with("navigate.focus", {"pattern": "github.com"}, profile=None, remote=None, token=None)
def test_navigate_tab(self, b, mock_send):
b.navigate_tab(5, "https://example.com")
mock_send.assert_called_once_with(
"navigate.to", {"tabId": 5, "url": "https://example.com"}, profile=None
"navigate.to", {"tabId": 5, "url": "https://example.com"}, profile=None, remote=None, token=None
)
def test_profile_forwarded(self, b_profile, mock_send):
b_profile.reload()
mock_send.assert_called_once_with("navigate.reload", {"tabId": None}, profile="brave")
mock_send.assert_called_once_with("navigate.reload", {"tabId": None}, profile="brave", remote=None, token=None)
# ── Search ────────────────────────────────────────────────────────────────────
@@ -195,12 +195,12 @@ class TestExtract:
result = b.extract_markdown()
assert result == "# Title"
mock_send.assert_called_once_with("extract.markdown", {"selector": None}, profile=None)
mock_send.assert_called_once_with("extract.markdown", {"selector": None}, profile=None, remote=None, token=None)
def test_extract_markdown_selector(self, b, mock_send):
b.extract_markdown("article")
mock_send.assert_called_once_with("extract.markdown", {"selector": "article"}, profile=None)
mock_send.assert_called_once_with("extract.markdown", {"selector": "article"}, profile=None, remote=None, token=None)
# ── Tabs ──────────────────────────────────────────────────────────────────────
@@ -235,7 +235,7 @@ class TestTabs:
mock_send.assert_called_once_with(
"tabs.close",
{"tabId": 10, "inactive": False, "duplicates": False},
profile=None,
profile=None, remote=None, token=None,
)
def test_tabs_move(self, b, mock_send):
@@ -243,19 +243,19 @@ class TestTabs:
mock_send.assert_called_once_with(
"tabs.move",
{"tabId": 10, "forward": True, "backward": False, "groupId": None, "windowId": None, "index": None},
profile=None,
profile=None, remote=None, token=None,
)
def test_tabs_active(self, b, mock_send):
b.tabs_active(10)
mock_send.assert_called_once_with("tabs.active", {"tabId": 10}, profile=None)
mock_send.assert_called_once_with("tabs.active", {"tabId": 10}, profile=None, remote=None, token=None)
def test_window_active_tab(self, b, mock_send):
mock_send.return_value = TAB_DATA
tab = b.window_active_tab(1)
assert isinstance(tab, Tab)
assert tab.id == 10
mock_send.assert_called_once_with("tabs.active_in_window", {"windowId": 1}, profile=None)
mock_send.assert_called_once_with("tabs.active_in_window", {"windowId": 1}, profile=None, remote=None, token=None)
def test_window_active_tab_missing_raises(self, b, mock_send):
mock_send.return_value = None
@@ -308,7 +308,7 @@ class TestTabs:
assert mock_send.call_args_list == [
call("tabs.list", {}, profile="default"),
call("tabs.list", {}, profile="work"),
call("tabs.close", {"tabId": 11}, profile="work"),
call("tabs.close", {"tabId": 11}, profile="work", remote=None, token=None),
]
def test_tabs_count_multi_browser_returns_browser_counts(self, b, mock_send):
@@ -351,7 +351,7 @@ class TestTabs:
def test_tabs_sort(self, b, mock_send):
b.tabs_sort(by="title")
mock_send.assert_called_once_with("tabs.sort", {"by": "title"}, profile=None)
mock_send.assert_called_once_with("tabs.sort", {"by": "title"}, profile=None, remote=None, token=None)
def test_tabs_merge_windows(self, b, mock_send):
mock_send.return_value = {"moved": 4}
@@ -384,7 +384,7 @@ class TestGroups:
mock_send.return_value = [TAB_DATA]
tabs = b.group_tabs(42)
assert isinstance(tabs[0], Tab)
mock_send.assert_called_once_with("group.tabs", {"groupId": 42}, profile=None)
mock_send.assert_called_once_with("group.tabs", {"groupId": 42}, profile=None, remote=None, token=None)
def test_group_count(self, b, mock_send):
mock_send.return_value = 7
@@ -412,7 +412,7 @@ class TestGroups:
assert mock_send.call_args_list == [
call("group.list", {}, profile="default"),
call("group.list", {}, profile="work"),
call("group.close", {"groupId": 99}, profile="work"),
call("group.close", {"groupId": 99}, profile="work", remote=None, token=None),
]
def test_group_count_multi_browser_returns_browser_counts(self, b, mock_send):
@@ -435,7 +435,7 @@ class TestGroups:
def test_group_close(self, b, mock_send):
b.group_close(42)
mock_send.assert_called_once_with("group.close", {"groupId": 42}, profile=None)
mock_send.assert_called_once_with("group.close", {"groupId": 42}, profile=None, remote=None, token=None)
def test_group_create_dict_response(self, b, mock_send):
mock_send.return_value = GROUP_DATA
@@ -455,7 +455,7 @@ class TestGroups:
tab_id = b.group_add_tab(42, "https://example.com")
assert tab_id == 55
mock_send.assert_called_once_with(
"group.add_tab", {"group": "42", "url": "https://example.com"}, profile=None
"group.add_tab", {"group": "42", "url": "https://example.com"}, profile=None, remote=None, token=None
)
def test_group_add_tab_non_dict_response(self, b, mock_send):
@@ -465,7 +465,7 @@ class TestGroups:
def test_group_move_forward(self, b, mock_send):
b.group_move(42, forward=True)
mock_send.assert_called_once_with(
"group.move", {"group": "42", "forward": True, "backward": False}, profile=None
"group.move", {"group": "42", "forward": True, "backward": False}, profile=None, remote=None, token=None
)
@@ -495,7 +495,7 @@ class TestWindows:
result = b.windows_open()
assert result == {"id": 5}
mock_send.assert_called_once_with("windows.open", {"url": None}, profile=None)
mock_send.assert_called_once_with("windows.open", {"url": None}, profile=None, remote=None, token=None)
def test_windows_open_with_url(self, b, mock_send):
mock_send.return_value = {"id": 9}
@@ -503,7 +503,7 @@ class TestWindows:
result = b.windows_open("https://example.com")
assert result == {"id": 9}
mock_send.assert_called_once_with("windows.open", {"url": "https://example.com"}, profile=None)
mock_send.assert_called_once_with("windows.open", {"url": "https://example.com"}, profile=None, remote=None, token=None)
class TestSession:
@@ -513,7 +513,7 @@ class TestSession:
result = b.session_list()
assert result == [{"name": "saved", "tabs": 3, "savedAt": 1712707200000}]
mock_send.assert_called_once_with("session.list", {}, profile=None)
mock_send.assert_called_once_with("session.list", {}, profile=None, remote=None, token=None)
def test_session_list_multi_browser_adds_browser(self, b, mock_send):
with patch(
@@ -548,26 +548,26 @@ class TestTabModel:
def test_close(self, tab, mock_send):
tab.close()
mock_send.assert_called_once_with("tabs.close", {"tabId": 10}, profile=None)
mock_send.assert_called_once_with("tabs.close", {"tabId": 10}, profile=None, remote=None, token=None)
def test_activate(self, tab, mock_send):
tab.activate()
mock_send.assert_called_once_with("tabs.active", {"tabId": 10}, profile=None)
mock_send.assert_called_once_with("tabs.active", {"tabId": 10}, profile=None, remote=None, token=None)
def test_reload(self, tab, mock_send):
tab.reload()
mock_send.assert_called_once_with("navigate.reload", {"tabId": 10}, profile=None)
mock_send.assert_called_once_with("navigate.reload", {"tabId": 10}, profile=None, remote=None, token=None)
def test_hard_reload(self, tab, mock_send):
tab.hard_reload()
mock_send.assert_called_once_with("navigate.hard_reload", {"tabId": 10}, profile=None)
mock_send.assert_called_once_with("navigate.hard_reload", {"tabId": 10}, profile=None, remote=None, token=None)
def test_move_forward(self, tab, mock_send):
tab.move(forward=True)
mock_send.assert_called_once_with(
"tabs.move",
{"tabId": 10, "forward": True, "backward": False, "groupId": None, "windowId": None, "index": None},
profile=None,
profile=None, remote=None, token=None,
)
def test_move_to_group(self, tab, mock_send):
@@ -577,12 +577,12 @@ class TestTabModel:
def test_html(self, tab, mock_send):
mock_send.return_value = "<html/>"
assert tab.html() == "<html/>"
mock_send.assert_called_once_with("tabs.html", {"tabId": 10}, profile=None)
mock_send.assert_called_once_with("tabs.html", {"tabId": 10}, profile=None, remote=None, token=None)
def test_open(self, tab, mock_send):
tab.open("https://new.example.com")
mock_send.assert_called_once_with(
"navigate.to", {"tabId": 10, "url": "https://new.example.com"}, profile=None
"navigate.to", {"tabId": 10, "url": "https://new.example.com"}, profile=None, remote=None, token=None
)
def test_open_background_changes_same_tab(self, tab, mock_send):
@@ -590,7 +590,7 @@ class TestTabModel:
mock_send.assert_called_once_with(
"navigate.to",
{"tabId": 10, "url": "https://new.example.com"},
profile=None,
profile=None, remote=None, token=None,
)
def test_unbound_raises(self):
@@ -608,18 +608,18 @@ class TestGroupModel:
def test_close(self, group, mock_send):
group.close()
mock_send.assert_called_once_with("group.close", {"groupId": 42}, profile=None)
mock_send.assert_called_once_with("group.close", {"groupId": 42}, profile=None, remote=None, token=None)
def test_tabs(self, group, mock_send):
mock_send.return_value = [TAB_DATA]
tabs = group.tabs()
assert isinstance(tabs[0], Tab)
mock_send.assert_called_once_with("group.tabs", {"groupId": 42}, profile=None)
mock_send.assert_called_once_with("group.tabs", {"groupId": 42}, profile=None, remote=None, token=None)
def test_move_forward(self, group, mock_send):
group.move(forward=True)
mock_send.assert_called_once_with(
"group.move", {"group": "42", "forward": True, "backward": False}, profile=None
"group.move", {"group": "42", "forward": True, "backward": False}, profile=None, remote=None, token=None
)
def test_move_backward(self, group, mock_send):
Generated
+1 -1
View File
@@ -4,7 +4,7 @@ requires-python = ">=3.10"
[[package]]
name = "browser-cli"
version = "0.5.7"
version = "0.8.0"
source = { editable = "." }
dependencies = [
{ name = "click" },