import base64
import binascii

import click
from browser_cli.client import (
    BrowserNotConnected,
    active_browser_targets,
    remote_browser_targets,
    send_command,
)
from rich.console import Console
from rich.markup import escape
from rich.table import Table

console = Console()


def _handle(command, args=None, profile=None):
    """Send *command* to the browser and return its result.

    On ``BrowserNotConnected`` or ``RuntimeError`` an error line is printed
    and the process exits with status 1.
    """
    try:
        return send_command(command, args or {}, profile=profile)
    except BrowserNotConnected as e:
        # Escape the message: it may contain "[...]" that Rich would
        # otherwise try to parse as console markup.
        console.print(f"[red]Error:[/red] {escape(str(e))}")
        raise SystemExit(1) from e
    except RuntimeError as e:
        console.print(f"[red]Browser error:[/red] {escape(str(e))}")
        raise SystemExit(1) from e


def _handle_multi(command, args=None, profile=None, remote=None, token=None):
    """Send *command* to one of several browsers, best effort.

    Returns the command result, or ``None`` when the call raised
    ``BrowserNotConnected`` or ``RuntimeError`` so the caller can skip
    that browser and carry on with the others.
    """
    try:
        if remote:
            return send_command(command, args or {}, profile=profile, remote=remote, token=token)
        return send_command(command, args or {}, profile=profile)
    except (BrowserNotConnected, RuntimeError):
        return None


def _multi_browser_targets():
    """Return the browser targets to fan a command out to.

    Returns an empty list when the root context has ``browser_explicit``
    set, or when there is at most one target and none of them is remote;
    callers then fall back to the single-browser code path.
    """
    root = click.get_current_context().find_root()
    # The root context may have been created without an ``obj``.
    settings = root.obj or {}
    if settings.get("browser_explicit"):
        return []

    remote = settings.get("remote")
    if remote:
        targets = remote_browser_targets(remote, settings.get("token"))
    else:
        targets = active_browser_targets()

    if len(targets) <= 1 and not any(target.remote for target in targets):
        return []
    return targets


def _print_raw(text) -> None:
    """Print *text* verbatim: no markup, highlighting, emoji codes or wrapping."""
    console.print(text, markup=False, highlight=False, emoji=False, soft_wrap=True)


def _print_tabs(tabs: list[dict], *, show_browser: bool = False) -> None:
    """Render *tabs* as a table, or a notice when the list is empty.

    Args:
        tabs: Tab dicts; the keys read are ``id``, ``windowId``, ``active``,
            ``muted``, ``title``, ``url`` and (optionally) ``browser``.
        show_browser: Add a leading "Browser" column.
    """
    if not tabs:
        console.print("[yellow]No tabs found[/yellow]")
        return

    table = Table(show_header=True, header_style="bold cyan")
    if show_browser:
        table.add_column("Browser", no_wrap=True)
    table.add_column("ID", style="dim", no_wrap=True)
    table.add_column("Window", no_wrap=True)
    table.add_column("Active", width=7)
    table.add_column("Muted", width=7)
    table.add_column("Title")
    table.add_column("URL")

    for t in tabs:
        row = []
        if show_browser:
            # Always emit the cell (even for a missing/None name) so the
            # remaining columns stay aligned with their headers.
            row.append(escape(str(t.get("browser") or "")))
        row.extend(
            [
                str(t.get("id", "")),
                str(t.get("windowId", "")),
                "[green]✓[/green]" if t.get("active") else "",
                "[yellow]✓[/yellow]" if t.get("muted") else "",
                # Titles and URLs are page-controlled text: truncate first,
                # then escape so "[...]" is shown literally.
                escape(str(t.get("title") or "")[:60]),
                escape(str(t.get("url") or "")[:80]),
            ]
        )
        table.add_row(*row)

    console.print(table)


@click.group("tabs")
def tabs_group():
    """Manage browser tabs."""


@tabs_group.command("list")
def tabs_list():
    """List all open tabs across all windows."""
    targets = _multi_browser_targets()
    if targets:
        tabs = []
        for target in targets:
            result = _handle_multi(
                "tabs.list",
                profile=target.profile,
                remote=target.remote,
                token=target.token,
            )
            if result is None:
                # This browser could not be reached; keep going with the rest.
                continue
            tabs.extend({**tab, "browser": target.display_name} for tab in result)
        if not tabs:
            console.print("[red]Error:[/red] Cannot resolve a browser socket automatically.")
            raise SystemExit(1)
        _print_tabs(tabs, show_browser=True)
        return

    tabs = _handle("tabs.list")
    _print_tabs(tabs or [])


@tabs_group.command("close")
@click.argument("tab_id", type=int, required=False)
@click.option("--inactive", is_flag=True, help="Close all inactive tabs")
@click.option("--duplicates", is_flag=True, help="Close duplicate tabs (keep first)")
def tabs_close(tab_id, inactive, duplicates):
    """Close a tab, all inactive tabs, or all duplicate tabs."""
    result = _handle("tabs.close", {"tabId": tab_id, "inactive": inactive, "duplicates": duplicates})
    # A non-dict reply is reported as a single closed tab.
    count = result.get("closed", 0) if isinstance(result, dict) else 1
    console.print(f"[green]Closed {count} tab(s)[/green]")


@tabs_group.command("move")
@click.argument("tab_id", type=int)
@click.option("-f", "--forward", "forward", is_flag=True, help="Move one position to the right")
@click.option("-b", "--backward", "backward", is_flag=True, help="Move one position to the left")
@click.option("-r", "--right", "forward", is_flag=True, help="Move one position to the right")
@click.option("-l", "--left", "backward", is_flag=True, help="Move one position to the left")
@click.option("--group", "group_id", type=int, default=None, help="Move to tab group ID")
@click.option("--window", "window_id", type=int, default=None, help="Move to window ID")
@click.option("--index", type=int, default=None, help="Absolute position index in target")
def tabs_move(tab_id, forward, backward, group_id, window_id, index):
    """Move a tab. Use --forward/--backward or --right/--left for relative movement."""
    _handle(
        "tabs.move",
        {
            "tabId": tab_id,
            "forward": forward,
            "backward": backward,
            "groupId": group_id,
            "windowId": window_id,
            "index": index,
        },
    )
    console.print("[green]Tab moved[/green]")


@tabs_group.command("active")
@click.argument("tab_id", type=int)
def tabs_active(tab_id):
    """Switch browser focus to a tab."""
    _handle("tabs.active", {"tabId": tab_id})
    console.print(f"[green]Switched to tab {tab_id}[/green]")


@tabs_group.command("status")
@click.argument("tab_id", type=int, required=False)
def tabs_status(tab_id):
    """Show status for the active tab or a specific tab."""
    tab = _handle("tabs.status", {"tabId": tab_id}) or {}

    table = Table(show_header=False)
    table.add_column("Field", style="bold cyan")
    table.add_column("Value")
    table.add_row("ID", str(tab.get("id", "")))
    table.add_row("Window", str(tab.get("windowId", "")))
    table.add_row("Active", "yes" if tab.get("active") else "no")
    table.add_row("Muted", "yes" if tab.get("muted") else "no")
    # Page-controlled text is escaped so it is displayed literally.
    table.add_row("Title", escape(str(tab.get("title") or "")))
    table.add_row("URL", escape(str(tab.get("url") or "")))
    console.print(table)


@tabs_group.command("filter")
@click.argument("pattern")
def tabs_filter(pattern):
    """List tabs whose URL contains PATTERN."""
    tabs = _handle("tabs.filter", {"pattern": pattern})
    _print_tabs(tabs or [])


@tabs_group.command("count")
@click.argument("pattern", required=False)
def tabs_count(pattern):
    """Count open tabs, optionally filtered by URL PATTERN."""
    targets = _multi_browser_targets()
    if targets:
        table = Table(show_header=True, header_style="bold cyan")
        table.add_column("Browser")
        table.add_column("Tabs", justify="right")

        total = 0
        rows = 0
        for target in targets:
            count = _handle_multi(
                "tabs.count",
                {"pattern": pattern},
                profile=target.profile,
                remote=target.remote,
                token=target.token,
            )
            if count is None:
                # This browser could not be reached; leave it out of the table.
                continue
            count = int(count or 0)
            total += count
            rows += 1
            table.add_row(escape(str(target.display_name)), str(count))

        if rows == 0:
            console.print("[red]Error:[/red] Cannot resolve a browser socket automatically.")
            raise SystemExit(1)

        table.add_row("Total", str(total))
        console.print(table)
        return

    # Coerce like the multi-browser branch so an empty reply prints 0.
    count = int(_handle("tabs.count", {"pattern": pattern}) or 0)
    # The pattern is user input and may contain "[...]"; show it literally.
    label = f" matching '{escape(pattern)}'" if pattern else ""
    console.print(f"[bold]{count}[/bold] tab(s){label}")


@tabs_group.command("query")
@click.argument("search")
def tabs_query(search):
    """Search tabs by URL or title."""
    tabs = _handle("tabs.query", {"search": search})
    _print_tabs(tabs or [])


@tabs_group.command("html")
@click.argument("tab_id", type=int, required=False)
def tabs_html(tab_id):
    """Print the full HTML of a tab."""
    html = _handle("tabs.html", {"tabId": tab_id})
    # HTML must reach stdout untouched: no markup parsing, no line wrapping.
    _print_raw(html or "")


@tabs_group.command("dedupe")
def tabs_dedupe():
    """Close duplicate tabs (keep the first occurrence of each URL)."""
    result = _handle("tabs.dedupe")
    count = result.get("closed", 0) if isinstance(result, dict) else 0
    console.print(f"[green]Closed {count} duplicate tab(s)[/green]")


@tabs_group.command("sort")
@click.option("--by", type=click.Choice(["domain", "title", "time"]), default="domain", show_default=True)
def tabs_sort(by):
    """Sort tabs within each window."""
    _handle("tabs.sort", {"by": by})
    console.print(f"[green]Tabs sorted by {by}[/green]")


@tabs_group.command("merge-windows")
def tabs_merge_windows():
    """Move all tabs into the focused window."""
    result = _handle("tabs.merge_windows")
    count = result.get("moved", 0) if isinstance(result, dict) else 0
    console.print(f"[green]Merged — moved {count} tab(s) into current window[/green]")


@tabs_group.command("mute")
@click.argument("tab_id", type=int, required=False)
def tabs_mute(tab_id):
    """Mute the active tab or a specific tab."""
    result = _handle("tabs.mute", {"tabId": tab_id})
    # Prefer the tab id reported back by the browser over the one requested.
    target = result.get("tabId", tab_id) if isinstance(result, dict) else tab_id
    console.print(f"[green]Muted tab {target}[/green]")


@tabs_group.command("unmute")
@click.argument("tab_id", type=int, required=False)
def tabs_unmute(tab_id):
    """Unmute the active tab or a specific tab."""
    result = _handle("tabs.unmute", {"tabId": tab_id})
    target = result.get("tabId", tab_id) if isinstance(result, dict) else tab_id
    console.print(f"[green]Unmuted tab {target}[/green]")


@tabs_group.command("pin")
@click.argument("tab_id", type=int, required=False)
def tabs_pin(tab_id):
    """Pin the active tab or a specific tab."""
    result = _handle("tabs.pin", {"tabId": tab_id})
    target = result.get("tabId", tab_id) if isinstance(result, dict) else tab_id
    console.print(f"[green]Pinned tab {target}[/green]")


@tabs_group.command("unpin")
@click.argument("tab_id", type=int, required=False)
def tabs_unpin(tab_id):
    """Unpin the active tab or a specific tab."""
    result = _handle("tabs.unpin", {"tabId": tab_id})
    target = result.get("tabId", tab_id) if isinstance(result, dict) else tab_id
    console.print(f"[green]Unpinned tab {target}[/green]")


@tabs_group.command("watch-url")
@click.argument("pattern")
@click.option("--tab", "tab_id", type=int, default=None, help="Tab ID (default: active tab)")
@click.option("--timeout", type=float, default=30.0, show_default=True, help="Max seconds to wait")
def tabs_watch_url(pattern, tab_id, timeout):
    """Wait until the active (or specified) tab URL matches regex PATTERN."""
    result = _handle(
        "tabs.watch_url",
        # The CLI takes seconds; the command payload carries milliseconds.
        {"pattern": pattern, "tabId": tab_id, "timeout": int(timeout * 1000)},
    )
    url = result.get("url", "") if isinstance(result, dict) else ""
    console.print(f"[green]URL matched:[/green] {escape(str(url))}")


@tabs_group.command("screenshot")
@click.argument("output", required=False, metavar="FILE")
@click.option("--tab", "tab_id", type=int, default=None, help="Tab ID (default: active tab)")
@click.option("--format", "fmt", type=click.Choice(["png", "jpeg"]), default="png", show_default=True)
@click.option("--quality", type=int, default=None, help="JPEG quality 0-100")
def tabs_screenshot(output, tab_id, fmt, quality):
    """Capture a screenshot of the active (or specified) tab.

    Saves to FILE if given, otherwise prints the base64 data URL.
    """
    result = _handle("tabs.screenshot", {"tabId": tab_id, "format": fmt, "quality": quality})
    data_url = result.get("dataUrl", "") if isinstance(result, dict) else ""

    # An empty reply is a failure in both output modes; do not exit 0 with
    # nothing printed.
    if not data_url:
        raise click.ClickException("Empty or unexpected screenshot response (incognito/protected tab?)")

    if not output:
        # Emit the data URL verbatim so it can be piped or copied intact.
        _print_raw(data_url)
        return

    header = f"data:image/{fmt};base64,"
    if not data_url.startswith(header):
        raise click.ClickException("Empty or unexpected screenshot response (incognito/protected tab?)")

    try:
        raw = base64.b64decode(data_url[len(header):])
    except binascii.Error as e:
        raise click.ClickException(f"Failed to decode screenshot data: {e}") from e

    try:
        with open(output, "wb") as f:
            f.write(raw)
    except OSError as e:
        raise click.ClickException(f"Failed to write screenshot to {output}: {e}") from e

    console.print(f"[green]Screenshot saved:[/green] {escape(output)}")