init commit

This commit is contained in:
2026-04-08 21:17:59 +02:00
commit 178b7bf7a2
24 changed files with 2466 additions and 0 deletions
+168
View File
@@ -0,0 +1,168 @@
"""
browser_cli — Python API for controlling your running browser.
Usage:
from browser_cli import BrowserCLI
b = BrowserCLI()
tabs = b.tabs_list()
b.open("https://example.com")
"""
from browser_cli.client import BrowserNotConnected, send_command
__all__ = ["BrowserCLI", "BrowserNotConnected"]
class BrowserCLI:
# ── Navigation ────────────────────────────────────────────────────────
def open(self, url: str, *, background: bool = False, window: str | None = None):
return send_command("navigate.open", {"url": url, "background": background, "window": window})
def reload(self, tab_id: int | None = None):
return send_command("navigate.reload", {"tabId": tab_id})
def hard_reload(self, tab_id: int | None = None):
return send_command("navigate.hard_reload", {"tabId": tab_id})
def back(self, tab_id: int | None = None):
return send_command("navigate.back", {"tabId": tab_id})
def forward(self, tab_id: int | None = None):
return send_command("navigate.forward", {"tabId": tab_id})
def focus_url(self, pattern: str):
return send_command("navigate.focus", {"pattern": pattern})
# ── Tabs ──────────────────────────────────────────────────────────────
def tabs_list(self) -> list[dict]:
return send_command("tabs.list", {})
def tabs_close(self, tab_id: int | None = None, *, inactive: bool = False, duplicates: bool = False):
return send_command("tabs.close", {"tabId": tab_id, "inactive": inactive, "duplicates": duplicates})
def tabs_move(self, tab_id: int, *, group_id: int | None = None, window_id: int | None = None):
return send_command("tabs.move", {"tabId": tab_id, "groupId": group_id, "windowId": window_id})
def tabs_active(self, tab_id: int):
return send_command("tabs.active", {"tabId": tab_id})
def tabs_filter(self, pattern: str) -> list[dict]:
return send_command("tabs.filter", {"pattern": pattern})
def tabs_count(self, pattern: str | None = None) -> int:
return send_command("tabs.count", {"pattern": pattern})
def tabs_query(self, search: str) -> list[dict]:
return send_command("tabs.query", {"search": search})
def tabs_html(self, tab_id: int | None = None) -> str:
return send_command("tabs.html", {"tabId": tab_id})
def tabs_dedupe(self):
return send_command("tabs.dedupe", {})
def tabs_sort(self, by: str = "domain"):
return send_command("tabs.sort", {"by": by})
def tabs_merge_windows(self):
return send_command("tabs.merge_windows", {})
def tabs_close_inactive(self):
return send_command("tabs.close", {"inactive": True})
def tabs_close_duplicates(self):
return send_command("tabs.close", {"duplicates": True})
# ── Tab Groups ────────────────────────────────────────────────────────
def group_list(self) -> list[dict]:
return send_command("group.list", {})
def group_tabs(self, group_id: int) -> list[dict]:
return send_command("group.tabs", {"groupId": group_id})
def group_count(self) -> int:
return send_command("group.count", {})
def group_query(self, search: str) -> list[dict]:
return send_command("group.query", {"search": search})
def group_close(self, group_id: int):
return send_command("group.close", {"groupId": group_id})
def group_open(self, name: str) -> dict:
return send_command("group.open", {"name": name})
# ── Windows ───────────────────────────────────────────────────────────
def windows_list(self) -> list[dict]:
return send_command("windows.list", {})
def windows_rename(self, window_id: int, name: str):
return send_command("windows.rename", {"windowId": window_id, "name": name})
def windows_close(self, window_id: int):
return send_command("windows.close", {"windowId": window_id})
def windows_open(self, profile: str | None = None) -> dict:
return send_command("windows.open", {"profile": profile})
# ── DOM ───────────────────────────────────────────────────────────────
def dom_query(self, selector: str) -> list[dict]:
return send_command("dom.query", {"selector": selector})
def dom_click(self, selector: str):
return send_command("dom.click", {"selector": selector})
def dom_type(self, selector: str, text: str):
return send_command("dom.type", {"selector": selector, "text": text})
def dom_attr(self, selector: str, attr: str) -> list[str]:
return send_command("dom.attr", {"selector": selector, "attr": attr})
def dom_text(self, selector: str) -> list[str]:
return send_command("dom.text", {"selector": selector})
def dom_exists(self, selector: str) -> bool:
return send_command("dom.exists", {"selector": selector})
# ── Extract ───────────────────────────────────────────────────────────
def extract_links(self) -> list[dict]:
return send_command("extract.links", {})
def extract_images(self) -> list[dict]:
return send_command("extract.images", {})
def extract_text(self) -> str:
return send_command("extract.text", {})
def extract_json(self, selector: str):
return send_command("extract.json", {"selector": selector})
# ── Session ───────────────────────────────────────────────────────────
def session_save(self, name: str):
return send_command("session.save", {"name": name})
def session_load(self, name: str):
return send_command("session.load", {"name": name})
def session_diff(self, name_a: str, name_b: str) -> dict:
return send_command("session.diff", {"nameA": name_a, "nameB": name_b})
def session_list(self) -> list[dict]:
return send_command("session.list", {})
def session_remove(self, name: str):
return send_command("session.remove", {"name": name})
def session_auto_save(self, enabled: bool):
return send_command("session.auto_save", {"enabled": enabled})
# ── Misc ──────────────────────────────────────────────────────────────
def clients(self) -> list[dict]:
return send_command("clients.list", {})
+154
View File
@@ -0,0 +1,154 @@
#!/usr/bin/env python3
"""
browser-cli — Control your running browser from the terminal.
"""
import click
import sys
import os
import json
import stat
from pathlib import Path
from rich.console import Console
from browser_cli.commands.navigate import cmd_open, cmd_reload, cmd_hard_reload, cmd_back, cmd_forward, cmd_focus
from browser_cli.commands.tabs import tabs_group
from browser_cli.commands.groups import group_group
from browser_cli.commands.windows import windows_group
from browser_cli.commands.dom import dom_group
from browser_cli.commands.extract import extract_group
from browser_cli.commands.session import session_group
from browser_cli.client import send_command, BrowserNotConnected
console = Console()
NATIVE_HOST_NAME = "com.browsercli.host"
NATIVE_HOST_DIRS = {
"chrome": {
"linux": [Path.home() / ".config/google-chrome/NativeMessagingHosts"],
"darwin": [Path.home() / "Library/Application Support/Google/Chrome/NativeMessagingHosts"],
},
"chromium": {
"linux": [Path.home() / ".config/chromium/NativeMessagingHosts"],
"darwin": [Path.home() / "Library/Application Support/Chromium/NativeMessagingHosts"],
},
"brave": {
"linux": [Path.home() / ".config/BraveSoftware/Brave-Browser/NativeMessagingHosts"],
"darwin": [Path.home() / "Library/Application Support/BraveSoftware/Brave-Browser/NativeMessagingHosts"],
},
}
@click.group()
def main():
"""Control your running browser from the terminal via a Chrome extension."""
# ── Top-level navigation commands ─────────────────────────────────────────────
main.add_command(cmd_open, name="open")
main.add_command(cmd_reload, name="reload")
main.add_command(cmd_hard_reload, name="hard-reload")
main.add_command(cmd_back, name="back")
main.add_command(cmd_forward, name="forward")
main.add_command(cmd_focus, name="focus")
# ── Sub-command groups ─────────────────────────────────────────────────────────
main.add_command(tabs_group)
main.add_command(group_group)
main.add_command(windows_group)
main.add_command(dom_group)
main.add_command(extract_group)
main.add_command(session_group)
# ── clients ────────────────────────────────────────────────────────────────────
@main.command("clients")
def cmd_clients():
"""Show connected browser clients."""
try:
clients = send_command("clients.list")
except BrowserNotConnected as e:
console.print(f"[red]Error:[/red] {e}")
sys.exit(1)
from rich.table import Table
table = Table(show_header=True, header_style="bold cyan")
table.add_column("Browser")
table.add_column("Version")
table.add_column("Platform")
for c in (clients or []):
table.add_row(c.get("name", ""), c.get("version", ""), c.get("platform", ""))
console.print(table)
# ── install ────────────────────────────────────────────────────────────────────
@main.command("install")
@click.argument("browser", type=click.Choice(["chrome", "chromium", "brave"]), default="chrome")
def cmd_install(browser):
"""Register the native messaging host and print extension load instructions."""
# Find the native_host.py path
native_host_script = Path(__file__).parent / "native_host.py"
if not native_host_script.exists():
console.print(f"[red]Cannot find native_host.py at {native_host_script}[/red]")
sys.exit(1)
# Build a wrapper shell script so it's executable by Chrome
wrapper_path = Path(__file__).parent.parent / "browser-cli-native-host"
python_exe = sys.executable
wrapper_content = f"""#!/bin/sh
exec "{python_exe}" "{native_host_script}" "$@"
"""
wrapper_path.write_text(wrapper_content)
wrapper_path.chmod(wrapper_path.stat().st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH)
# Ask for extension ID
ext_url = "brave://extensions" if browser == "brave" else "chrome://extensions"
console.print("\n[bold]Step 1:[/bold] Load the extension in your browser")
console.print(f" 1. Open [cyan]{ext_url}[/cyan]")
console.print(" 2. Enable [bold]Developer mode[/bold] (top-right toggle)")
console.print(f" 3. Click [bold]Load unpacked[/bold] → select: [cyan]{Path(__file__).parent.parent / 'extension'}[/cyan]")
console.print(" 4. Copy the [bold]Extension ID[/bold] shown on the extension card\n")
extension_id = click.prompt("Paste your extension ID here")
extension_id = extension_id.strip()
# Build native messaging manifest
manifest = {
"name": NATIVE_HOST_NAME,
"description": "browser-cli native messaging host",
"path": str(wrapper_path),
"type": "stdio",
"allowed_origins": [f"chrome-extension://{extension_id}/"],
}
# Write to OS native messaging dirs
platform = "darwin" if sys.platform == "darwin" else "linux"
dirs = NATIVE_HOST_DIRS[browser][platform]
installed = []
for d in dirs:
try:
d.mkdir(parents=True, exist_ok=True)
manifest_path = d / f"{NATIVE_HOST_NAME}.json"
manifest_path.write_text(json.dumps(manifest, indent=2))
installed.append(manifest_path)
except Exception as e:
console.print(f"[yellow]Could not write to {d}: {e}[/yellow]")
if not installed:
console.print("[red]Failed to install native host manifest[/red]")
sys.exit(1)
for p in installed:
console.print(f"[green]✓[/green] Wrote native host manifest: {p}")
console.print("\n[bold]Step 2:[/bold] Restart Chrome completely (Cmd/Ctrl+Q, then reopen)")
console.print("\n[green bold]✓ Installation complete![/green bold]")
console.print(" After restarting Chrome, try: [cyan]browser-cli tabs list[/cyan]")
if __name__ == "__main__":
main()
+61
View File
@@ -0,0 +1,61 @@
"""
Unix socket client — sends commands to the native host relay socket.
Used by both the CLI and the public Python API.
"""
import json
import socket
import struct
import uuid
from typing import Any
SOCKET_PATH = "/tmp/browser-cli.sock"
class BrowserNotConnected(Exception):
"""Raised when the native host socket is not available."""
def send_command(command: str, args: dict | None = None) -> Any:
"""Send a command to the browser and return the response data."""
msg = {
"id": str(uuid.uuid4()),
"command": command,
"args": args or {},
}
payload = json.dumps(msg).encode("utf-8")
framed = struct.pack("<I", len(payload)) + payload
try:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
sock.connect(SOCKET_PATH)
sock.sendall(framed)
response = _recv_all(sock)
except (FileNotFoundError, ConnectionRefusedError, OSError):
raise BrowserNotConnected(
"Cannot connect to browser.\n"
"Make sure:\n"
" 1. The browser-cli extension is installed and enabled\n"
" 2. The native host is registered: uv run browser-cli install chrome\n"
" 3. Your browser is running"
)
result = json.loads(response)
if not result.get("success", True):
raise RuntimeError(result.get("error", "unknown error from browser"))
return result.get("data")
def _recv_all(sock: socket.socket) -> bytes:
raw_len = _recv_exact(sock, 4)
msg_len = struct.unpack("<I", raw_len)[0]
return _recv_exact(sock, msg_len)
def _recv_exact(sock: socket.socket, n: int) -> bytes:
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
raise ConnectionError("Socket closed before full message received")
buf += chunk
return buf
View File
+89
View File
@@ -0,0 +1,89 @@
import click
from browser_cli.client import send_command, BrowserNotConnected
from rich.console import Console
from rich.table import Table
import json
console = Console()
def _handle(command, args=None):
try:
return send_command(command, args or {})
except BrowserNotConnected as e:
console.print(f"[red]Error:[/red] {e}")
raise SystemExit(1)
except RuntimeError as e:
console.print(f"[red]Browser error:[/red] {e}")
raise SystemExit(1)
@click.group("dom")
def dom_group():
"""Query and interact with page DOM elements."""
@dom_group.command("query")
@click.argument("selector")
def dom_query(selector):
"""Return elements matching CSS SELECTOR (like mini DevTools)."""
elements = _handle("dom.query", {"selector": selector})
if not elements:
console.print("[yellow]No elements found[/yellow]")
return
table = Table(show_header=True, header_style="bold cyan")
table.add_column("Tag", width=12)
table.add_column("Text", width=40)
table.add_column("Attributes")
for el in elements:
attrs = ", ".join(f"{k}={v!r}" for k, v in (el.get("attrs") or {}).items())
table.add_row(el.get("tag", ""), (el.get("text") or "")[:60], attrs[:80])
console.print(table)
@dom_group.command("click")
@click.argument("selector")
def dom_click(selector):
"""Click the first element matching CSS SELECTOR."""
_handle("dom.click", {"selector": selector})
console.print(f"[green]Clicked:[/green] {selector}")
@dom_group.command("type")
@click.argument("selector")
@click.argument("text")
def dom_type(selector, text):
"""Type TEXT into the element matching CSS SELECTOR."""
_handle("dom.type", {"selector": selector, "text": text})
console.print(f"[green]Typed into:[/green] {selector}")
@dom_group.command("attr")
@click.argument("selector")
@click.argument("attr_name")
def dom_attr(selector, attr_name):
"""Get attribute ATTR_NAME from elements matching CSS SELECTOR."""
values = _handle("dom.attr", {"selector": selector, "attr": attr_name})
for v in (values or []):
console.print(v)
@dom_group.command("text")
@click.argument("selector")
def dom_text(selector):
"""Get text content of elements matching CSS SELECTOR."""
values = _handle("dom.text", {"selector": selector})
for v in (values or []):
console.print(v)
@dom_group.command("exists")
@click.argument("selector")
def dom_exists(selector):
"""Check if an element matching CSS SELECTOR exists on the page."""
exists = _handle("dom.exists", {"selector": selector})
if exists:
console.print(f"[green]exists[/green]: {selector}")
else:
console.print(f"[red]not found[/red]: {selector}")
raise SystemExit(1)
+68
View File
@@ -0,0 +1,68 @@
import click
import json
from browser_cli.client import send_command, BrowserNotConnected
from rich.console import Console
from rich.table import Table
console = Console()
def _handle(command, args=None):
try:
return send_command(command, args or {})
except BrowserNotConnected as e:
console.print(f"[red]Error:[/red] {e}")
raise SystemExit(1)
except RuntimeError as e:
console.print(f"[red]Browser error:[/red] {e}")
raise SystemExit(1)
@click.group("extract")
def extract_group():
"""Extract content from the active tab."""
@extract_group.command("links")
def extract_links():
"""Extract all links from the active tab."""
links = _handle("extract.links")
if not links:
console.print("[yellow]No links found[/yellow]")
return
table = Table(show_header=True, header_style="bold cyan")
table.add_column("Text", width=40)
table.add_column("URL")
for lnk in links:
table.add_row((lnk.get("text") or "")[:60], lnk.get("href") or "")
console.print(table)
@extract_group.command("images")
def extract_images():
"""Extract all images from the active tab."""
images = _handle("extract.images")
if not images:
console.print("[yellow]No images found[/yellow]")
return
table = Table(show_header=True, header_style="bold cyan")
table.add_column("Alt", width=30)
table.add_column("Src")
for img in images:
table.add_row((img.get("alt") or "")[:40], img.get("src") or "")
console.print(table)
@extract_group.command("text")
def extract_text():
"""Extract all visible text from the active tab."""
text = _handle("extract.text")
console.print(text or "")
@extract_group.command("json")
@click.argument("selector")
def extract_json(selector):
"""Parse and pretty-print JSON content inside SELECTOR."""
data = _handle("extract.json", {"selector": selector})
console.print_json(json.dumps(data))
+102
View File
@@ -0,0 +1,102 @@
import click
from browser_cli.client import send_command, BrowserNotConnected
from rich.console import Console
from rich.table import Table
console = Console()
def _handle(command, args=None):
try:
return send_command(command, args or {})
except BrowserNotConnected as e:
console.print(f"[red]Error:[/red] {e}")
raise SystemExit(1)
except RuntimeError as e:
console.print(f"[red]Browser error:[/red] {e}")
raise SystemExit(1)
def _print_groups(groups: list[dict]) -> None:
if not groups:
console.print("[yellow]No groups found[/yellow]")
return
table = Table(show_header=True, header_style="bold cyan")
table.add_column("ID", style="dim", no_wrap=True)
table.add_column("Name")
table.add_column("Color", width=10)
table.add_column("Collapsed", width=10)
table.add_column("Tabs", width=6)
for g in groups:
table.add_row(
str(g.get("id", "")),
g.get("title") or "(unnamed)",
g.get("color") or "",
"yes" if g.get("collapsed") else "no",
str(g.get("tabCount", "")),
)
console.print(table)
@click.group("group")
def group_group():
"""Manage tab groups."""
@group_group.command("list")
def group_list():
"""List all tab groups."""
groups = _handle("group.list")
_print_groups(groups or [])
@group_group.command("tabs")
@click.argument("group_id", type=int)
def group_tabs(group_id):
"""List tabs inside a group."""
from browser_cli.commands.tabs import _print_tabs
tabs = _handle("group.tabs", {"groupId": group_id})
_print_tabs(tabs or [])
@group_group.command("count")
def group_count():
"""Count all tab groups."""
count = _handle("group.count")
console.print(f"[bold]{count}[/bold] group(s)")
@group_group.command("query")
@click.argument("search")
def group_query(search):
"""Search groups by name."""
groups = _handle("group.query", {"search": search})
_print_groups(groups or [])
@group_group.command("close")
@click.argument("group_id", type=int)
def group_close(group_id):
"""Close (ungroup and optionally close) a tab group."""
_handle("group.close", {"groupId": group_id})
console.print(f"[green]Group {group_id} closed[/green]")
@group_group.command("create")
@click.argument("name")
def group_create(name):
"""Create a new tab group with NAME."""
result = _handle("group.open", {"name": name})
gid = result.get("id") if isinstance(result, dict) else result
console.print(f"[green]Created group '{name}'[/green] (id: {gid})")
@group_group.command("add-tab")
@click.argument("group")
@click.argument("url", required=False)
def group_add_tab(group, url):
"""Open a new tab (optionally at URL) inside GROUP (name or ID)."""
result = _handle("group.add_tab", {"group": group, "url": url})
tab_id = result.get("tabId") if isinstance(result, dict) else result
label = url or "new tab"
console.print(f"[green]Opened {label}[/green] in group '{group}' (tab id: {tab_id})")
+75
View File
@@ -0,0 +1,75 @@
import click
from browser_cli.client import send_command, BrowserNotConnected
from rich.console import Console
console = Console()
def _handle(command, args):
try:
return send_command(command, args)
except BrowserNotConnected as e:
console.print(f"[red]Error:[/red] {e}")
raise SystemExit(1)
except RuntimeError as e:
console.print(f"[red]Browser error:[/red] {e}")
raise SystemExit(1)
@click.command("open")
@click.argument("url")
@click.option("--bg", is_flag=True, help="Open in background (no focus)")
@click.option("--window", "window_name", default=None, help="Open in named window")
@click.option("--group", "group_name", default=None, help="Open directly into a tab group (name or ID)")
def cmd_open(url, bg, window_name, group_name):
"""Open URL in a new tab."""
result = _handle("navigate.open", {"url": url, "background": bg, "window": window_name, "group": group_name})
suffix = ""
if group_name:
suffix = f" in group '{group_name}'"
elif window_name:
suffix = f" in window '{window_name}'"
console.print(f"[green]Opened:[/green] {url}{suffix}")
@click.command("reload")
@click.argument("tab_id", type=int, required=False)
def cmd_reload(tab_id):
"""Reload the active (or specified) tab."""
_handle("navigate.reload", {"tabId": tab_id})
console.print("[green]Reloaded[/green]")
@click.command("hard-reload")
@click.argument("tab_id", type=int, required=False)
def cmd_hard_reload(tab_id):
"""Hard reload (bypass cache) the active (or specified) tab."""
_handle("navigate.hard_reload", {"tabId": tab_id})
console.print("[green]Hard reloaded[/green]")
@click.command("back")
@click.argument("tab_id", type=int, required=False)
def cmd_back(tab_id):
"""Navigate back in the active (or specified) tab."""
_handle("navigate.back", {"tabId": tab_id})
console.print("[green]Navigated back[/green]")
@click.command("forward")
@click.argument("tab_id", type=int, required=False)
def cmd_forward(tab_id):
"""Navigate forward in the active (or specified) tab."""
_handle("navigate.forward", {"tabId": tab_id})
console.print("[green]Navigated forward[/green]")
@click.command("focus")
@click.argument("pattern")
def cmd_focus(pattern):
"""Jump to the first tab whose URL matches PATTERN."""
result = _handle("navigate.focus", {"pattern": pattern})
if result:
console.print(f"[green]Focused:[/green] {result.get('url', result)}")
else:
console.print(f"[yellow]No tab found matching:[/yellow] {pattern}")
+103
View File
@@ -0,0 +1,103 @@
import click
import json
from browser_cli.client import send_command, BrowserNotConnected
from rich.console import Console
console = Console()
def _handle(command, args=None):
try:
return send_command(command, args or {})
except BrowserNotConnected as e:
console.print(f"[red]Error:[/red] {e}")
raise SystemExit(1)
except RuntimeError as e:
console.print(f"[red]Browser error:[/red] {e}")
raise SystemExit(1)
@click.group("session")
def session_group():
"""Save and restore browser sessions."""
@session_group.command("save")
@click.argument("name")
def session_save(name):
"""Save all current tabs as session NAME."""
result = _handle("session.save", {"name": name})
count = result.get("tabs", 0) if isinstance(result, dict) else 0
console.print(f"[green]Session '{name}' saved[/green] ({count} tabs)")
@session_group.command("load")
@click.argument("name")
def session_load(name):
"""Restore session NAME (opens all saved tabs)."""
result = _handle("session.load", {"name": name})
count = result.get("tabs", 0) if isinstance(result, dict) else 0
console.print(f"[green]Session '{name}' loaded[/green] ({count} tabs opened)")
@session_group.command("diff")
@click.argument("name_a")
@click.argument("name_b")
def session_diff(name_a, name_b):
"""Show tabs added/removed between two saved sessions."""
diff = _handle("session.diff", {"nameA": name_a, "nameB": name_b})
if not diff:
console.print("[yellow]No diff data returned[/yellow]")
return
added = diff.get("added") or []
removed = diff.get("removed") or []
if added:
console.print(f"[green]Added in '{name_b}':[/green]")
for url in added:
console.print(f" + {url}")
if removed:
console.print(f"[red]Removed in '{name_b}':[/red]")
for url in removed:
console.print(f" - {url}")
if not added and not removed:
console.print("[green]Sessions are identical[/green]")
@session_group.command("list")
def session_list():
"""List all saved sessions."""
from rich.table import Table
sessions = _handle("session.list")
if not sessions:
console.print("[yellow]No saved sessions[/yellow]")
return
table = Table(show_header=True, header_style="bold cyan")
table.add_column("Name")
table.add_column("Tabs", width=6)
table.add_column("Saved at")
for s in sessions:
from datetime import datetime
saved = datetime.fromtimestamp(s["savedAt"] / 1000).strftime("%Y-%m-%d %H:%M") if s.get("savedAt") else ""
table.add_row(s["name"], str(s["tabs"]), saved)
console.print(table)
@session_group.command("remove")
@click.argument("name")
def session_remove(name):
"""Delete a saved session."""
_handle("session.remove", {"name": name})
console.print(f"[green]Session '{name}' removed[/green]")
@session_group.command("auto-save")
@click.argument("state", type=click.Choice(["on", "off"]))
def session_auto_save(state):
"""Enable or disable automatic session saving."""
enabled = state == "on"
_handle("session.auto_save", {"enabled": enabled})
console.print(f"[green]Auto-save {state}[/green]")
+138
View File
@@ -0,0 +1,138 @@
import click
from browser_cli.client import send_command, BrowserNotConnected
from rich.console import Console
from rich.table import Table
console = Console()
def _handle(command, args=None):
try:
return send_command(command, args or {})
except BrowserNotConnected as e:
console.print(f"[red]Error:[/red] {e}")
raise SystemExit(1)
except RuntimeError as e:
console.print(f"[red]Browser error:[/red] {e}")
raise SystemExit(1)
def _print_tabs(tabs: list[dict]) -> None:
if not tabs:
console.print("[yellow]No tabs found[/yellow]")
return
table = Table(show_header=True, header_style="bold cyan")
table.add_column("ID", style="dim", no_wrap=True)
table.add_column("Window", no_wrap=True)
table.add_column("Active", width=7)
table.add_column("Title")
table.add_column("URL")
for t in tabs:
active = "[green]✓[/green]" if t.get("active") else ""
table.add_row(
str(t.get("id", "")),
str(t.get("windowId", "")),
active,
(t.get("title") or "")[:60],
(t.get("url") or "")[:80],
)
console.print(table)
@click.group("tabs")
def tabs_group():
"""Manage browser tabs."""
@tabs_group.command("list")
def tabs_list():
"""List all open tabs across all windows."""
tabs = _handle("tabs.list")
_print_tabs(tabs or [])
@tabs_group.command("close")
@click.argument("tab_id", type=int, required=False)
@click.option("--inactive", is_flag=True, help="Close all inactive tabs")
@click.option("--duplicates", is_flag=True, help="Close duplicate tabs (keep first)")
def tabs_close(tab_id, inactive, duplicates):
"""Close a tab, all inactive tabs, or all duplicate tabs."""
result = _handle("tabs.close", {"tabId": tab_id, "inactive": inactive, "duplicates": duplicates})
count = result.get("closed", 0) if isinstance(result, dict) else 1
console.print(f"[green]Closed {count} tab(s)[/green]")
@tabs_group.command("move")
@click.argument("tab_id", type=int)
@click.option("--group", "group_id", type=int, default=None, help="Move to tab group ID")
@click.option("--window", "window_id", type=int, default=None, help="Move to window ID")
@click.option("--index", type=int, default=None, help="Position index in target")
def tabs_move(tab_id, group_id, window_id, index):
"""Move a tab to a different window or group."""
_handle("tabs.move", {"tabId": tab_id, "groupId": group_id, "windowId": window_id, "index": index})
console.print("[green]Tab moved[/green]")
@tabs_group.command("active")
@click.argument("tab_id", type=int)
def tabs_active(tab_id):
"""Switch browser focus to a tab."""
_handle("tabs.active", {"tabId": tab_id})
console.print(f"[green]Switched to tab {tab_id}[/green]")
@tabs_group.command("filter")
@click.argument("pattern")
def tabs_filter(pattern):
"""List tabs whose URL contains PATTERN."""
tabs = _handle("tabs.filter", {"pattern": pattern})
_print_tabs(tabs or [])
@tabs_group.command("count")
@click.argument("pattern", required=False)
def tabs_count(pattern):
"""Count open tabs, optionally filtered by URL PATTERN."""
count = _handle("tabs.count", {"pattern": pattern})
label = f" matching '{pattern}'" if pattern else ""
console.print(f"[bold]{count}[/bold] tab(s){label}")
@tabs_group.command("query")
@click.argument("search")
def tabs_query(search):
"""Search tabs by URL or title."""
tabs = _handle("tabs.query", {"search": search})
_print_tabs(tabs or [])
@tabs_group.command("html")
@click.argument("tab_id", type=int, required=False)
def tabs_html(tab_id):
"""Print the full HTML of a tab."""
html = _handle("tabs.html", {"tabId": tab_id})
console.print(html or "")
@tabs_group.command("dedupe")
def tabs_dedupe():
"""Close duplicate tabs (keep the first occurrence of each URL)."""
result = _handle("tabs.dedupe")
count = result.get("closed", 0) if isinstance(result, dict) else 0
console.print(f"[green]Closed {count} duplicate tab(s)[/green]")
@tabs_group.command("sort")
@click.option("--by", type=click.Choice(["domain", "title", "time"]), default="domain", show_default=True)
def tabs_sort(by):
"""Sort tabs within each window."""
_handle("tabs.sort", {"by": by})
console.print(f"[green]Tabs sorted by {by}[/green]")
@tabs_group.command("merge-windows")
def tabs_merge_windows():
"""Move all tabs into the focused window."""
result = _handle("tabs.merge_windows")
count = result.get("moved", 0) if isinstance(result, dict) else 0
console.print(f"[green]Merged — moved {count} tab(s) into current window[/green]")
+77
View File
@@ -0,0 +1,77 @@
import click
from browser_cli.client import send_command, BrowserNotConnected
from rich.console import Console
from rich.table import Table
console = Console()
def _handle(command, args=None):
try:
return send_command(command, args or {})
except BrowserNotConnected as e:
console.print(f"[red]Error:[/red] {e}")
raise SystemExit(1)
except RuntimeError as e:
console.print(f"[red]Browser error:[/red] {e}")
raise SystemExit(1)
def _print_windows(windows: list[dict]) -> None:
if not windows:
console.print("[yellow]No windows found[/yellow]")
return
table = Table(show_header=True, header_style="bold cyan")
table.add_column("ID", style="dim", no_wrap=True)
table.add_column("Alias", width=20)
table.add_column("Focused", width=8)
table.add_column("Tabs", width=6)
table.add_column("State", width=12)
for w in windows:
focused = "[green]✓[/green]" if w.get("focused") else ""
table.add_row(
str(w.get("id", "")),
w.get("alias") or "",
focused,
str(w.get("tabCount", "")),
w.get("state") or "",
)
console.print(table)
@click.group("windows")
def windows_group():
"""Manage browser windows."""
@windows_group.command("list")
def windows_list():
"""List all browser windows."""
windows = _handle("windows.list")
_print_windows(windows or [])
@windows_group.command("rename")
@click.argument("window_id", type=int)
@click.argument("name")
def windows_rename(window_id, name):
"""Give a window a local alias NAME (stored in native host)."""
_handle("windows.rename", {"windowId": window_id, "name": name})
console.print(f"[green]Window {window_id} aliased as '{name}'[/green]")
@windows_group.command("close")
@click.argument("window_id", type=int)
def windows_close(window_id):
"""Close a browser window."""
_handle("windows.close", {"windowId": window_id})
console.print(f"[green]Window {window_id} closed[/green]")
@windows_group.command("open")
@click.option("--profile", default=None, help="Open with a specific Chrome profile name")
def windows_open(profile):
"""Open a new browser window."""
result = _handle("windows.open", {"profile": profile})
wid = result.get("id") if isinstance(result, dict) else result
console.print(f"[green]Opened new window[/green] (id: {wid})" + (f" with profile '{profile}'" if profile else ""))
+159
View File
@@ -0,0 +1,159 @@
#!/usr/bin/env python3
"""
Native Messaging Host for browser-cli.
Chrome launches this process when the extension calls connectNative().
It relays messages between the Chrome extension (via stdin/stdout using the
Native Messaging protocol) and the CLI (via a Unix domain socket).
"""
import json
import os
import queue
import socket
import struct
import sys
import threading
import uuid
from pathlib import Path
SOCKET_PATH = "/tmp/browser-cli.sock"
PENDING: dict[str, queue.Queue] = {}
PENDING_LOCK = threading.Lock()
# --- Native Messaging protocol (4-byte LE length prefix + UTF-8 JSON) ---
def read_native_message(stream) -> dict | None:
raw_len = stream.read(4)
if len(raw_len) < 4:
return None
msg_len = struct.unpack("<I", raw_len)[0]
data = stream.read(msg_len)
if len(data) < msg_len:
return None
return json.loads(data.decode("utf-8"))
def write_native_message(stream, msg: dict) -> None:
data = json.dumps(msg).encode("utf-8")
stream.write(struct.pack("<I", len(data)))
stream.write(data)
stream.flush()
# --- Thread A: read messages from extension (stdin) ---
def stdin_reader():
stdin = sys.stdin.buffer
while True:
msg = read_native_message(stdin)
if msg is None:
# Extension disconnected — clean up socket and exit
_cleanup()
os._exit(0)
msg_id = msg.get("id")
if msg_id:
with PENDING_LOCK:
q = PENDING.get(msg_id)
if q:
q.put(msg)
# --- Thread B: accept CLI socket connections ---
def socket_server():
path = Path(SOCKET_PATH)
if path.exists():
path.unlink()
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(SOCKET_PATH)
sock.listen(16)
while True:
try:
conn, _ = sock.accept()
except OSError:
break
threading.Thread(target=handle_cli_connection, args=(conn,), daemon=True).start()
def handle_cli_connection(conn: socket.socket) -> None:
try:
data = _recv_all(conn)
if not data:
return
cmd = json.loads(data)
if "id" not in cmd:
cmd["id"] = str(uuid.uuid4())
msg_id = cmd["id"]
response_queue: queue.Queue = queue.Queue()
with PENDING_LOCK:
PENDING[msg_id] = response_queue
# Forward command to extension via stdout
write_native_message(sys.stdout.buffer, cmd)
# Wait for extension's response (30 s timeout)
try:
result = response_queue.get(timeout=30)
except queue.Empty:
result = {"id": msg_id, "success": False, "error": "timeout waiting for browser response"}
with PENDING_LOCK:
PENDING.pop(msg_id, None)
_send_all(conn, json.dumps(result).encode("utf-8"))
except Exception as exc:
try:
_send_all(conn, json.dumps({"success": False, "error": str(exc)}).encode("utf-8"))
except Exception:
pass
finally:
conn.close()
# --- Socket helpers (length-prefixed framing) ---
def _send_all(conn: socket.socket, data: bytes) -> None:
framed = struct.pack("<I", len(data)) + data
conn.sendall(framed)
def _recv_all(conn: socket.socket) -> bytes | None:
raw_len = _recv_exact(conn, 4)
if raw_len is None:
return None
msg_len = struct.unpack("<I", raw_len)[0]
return _recv_exact(conn, msg_len)
def _recv_exact(conn: socket.socket, n: int) -> bytes | None:
buf = b""
while len(buf) < n:
chunk = conn.recv(n - len(buf))
if not chunk:
return None
buf += chunk
return buf
def _cleanup():
try:
Path(SOCKET_PATH).unlink(missing_ok=True)
except Exception:
pass
def main():
# Start socket server thread
t = threading.Thread(target=socket_server, daemon=True)
t.start()
# Read extension messages on main thread (blocks until extension disconnects)
stdin_reader()
if __name__ == "__main__":
main()