diff --git a/README.md b/README.md index 6c234cb..c6563b8 100644 --- a/README.md +++ b/README.md @@ -290,88 +290,100 @@ from browser_cli import BrowserCLI b = BrowserCLI() ``` -Every CLI command has a corresponding SDK method. The call blocks until the browser responds and returns the data directly as a Python object. +Commands are grouped into namespaces on the client (`b.tabs`, `b.dom`, `b.session`, ...). Each call blocks until the browser responds and returns the data directly as a Python object. ```python -# Navigation -b.open("https://example.com") -tab = b.open_tab("https://example.com") # returns a bound Tab object -tab = b.open_tab("https://example.com", wait=True, timeout=10) -b.open("https://example.com", background=True) -b.open("https://example.com", window="work") -b.reload() -b.hard_reload() -b.back() -b.forward(tab_id=1234) -b.focus_url("github") +# Navigation ── b.nav +b.nav.open("https://example.com") +b.nav.open("https://example.com", background=True) +b.nav.open("https://example.com", window="work") +b.nav.reload() +b.nav.hard_reload() +b.nav.back() +b.nav.forward(tab_id=1234) +b.nav.focus("github") +b.nav.to(1234, "https://example.com") # navigate a specific tab in place +b.nav.search("google", "python asyncio") -# Tabs -tabs = b.tabs_list() # list[Tab]; in multi-browser mode each tab.browser is set -tabs = b.tabs() # short alias for tabs_list() -active = b.active_tab() # active Tab object -tab = b.tab(1234) # tab by ID -tab = b.find_tab("github") # first matching tab or None -tabs = b.find_tabs("github") # alias for tabs_query() -b.tabs_active(1234) -b.tabs_close(1234) -b.close_tab(tab) # accepts Tab or tab ID -b.tabs_close_inactive() -b.tabs_close_duplicates() -b.tabs_filter("youtube") # list of matching tabs -b.tabs_query("pull request") -counts = b.tabs_count("github") # int, or BrowserCounts(total=..., by_browser=...) in multi-browser mode -html = b.tabs_html() # full HTML string of active tab -b.tabs_sort(by="domain") -b.tabs_merge_windows() -b.tabs_dedupe() +# Tabs ── b.tabs +tabs = b.tabs.list() # list[Tab]; in multi-browser mode each tab.browser is set +tab = b.tabs.open("https://example.com") # returns a bound Tab object +tab = b.tabs.open("https://example.com", wait=True, timeout=10) +active = b.tabs.active() # active Tab object +tab = b.tabs.get(1234) # tab by ID +tab = b.tabs.first("github") # first matching tab or None +b.tabs.activate(1234) +b.tabs.close(1234) +b.tabs.close(tab_ids=[1, 2, 3]) # close many in one round-trip (IDs or Tab objects) +b.tabs.close_inactive() +b.tabs.close_duplicates() +b.tabs.filter("youtube") # list of matching tabs +b.tabs.query("pull request") +counts = b.tabs.count("github") # int, or BrowserCounts(total=..., by_browser=...) in multi-browser mode +html = b.tabs.html() # full HTML string of active tab +b.tabs.sort(by="domain") +b.tabs.merge_windows() +b.tabs.dedupe() # Bound Tab helpers -tab = b.active_tab() +tab = b.tabs.active() tab.pin() tab.screenshot() tab.refresh() tab.wait_for_load(timeout=10) tab.watch_url(r"/done$") -# Tab groups -groups = b.group_list() # list[Group]; in multi-browser mode each group.browser is set -groups = b.groups() # short alias for group_list() -b.groups_create("research") # plural alias for group_create() -b.group_create("research") # creates group, returns Group -b.group_close(42) -b.group_tabs(42) # tabs inside a group -b.group_count() # int, or BrowserCounts(...) in multi-browser mode +# Tab groups ── b.groups +groups = b.groups.list() # list[Group]; in multi-browser mode each group.browser is set +b.groups.create("research") # creates group, returns Group +b.groups.close(42) +b.groups.tabs(42) # tabs inside a group +b.groups.add_tab(42, "https://example.com") +b.groups.count() # int, or BrowserCounts(...) in multi-browser mode -# Windows -windows = b.windows_list() # in multi-browser mode each dict has a "browser" key -b.windows_rename(1, "work") -b.windows_open() -b.windows_open("https://example.com") -b.windows_close(1) +# Windows ── b.windows +windows = b.windows.list() # in multi-browser mode each dict has a "browser" key +b.windows.rename(1, "work") +b.windows.open() +b.windows.open("https://example.com") +b.windows.close(1) -# DOM (active tab must be http/https) -elements = b.dom_query("h2") # list of { tag, text, attrs } -texts = b.dom_text(".article p") # list of strings -attrs = b.dom_attr("a", "href") # list of strings -exists = b.dom_exists(".cookie-banner")# bool -b.dom_click(".accept-button") -b.dom_type("#search", "hello world") -b.wait_for_selector("#results", visible=True, timeout=10) +# DOM ── b.dom (active tab must be http/https) +elements = b.dom.query("h2") # list of { tag, text, attrs } +texts = b.dom.text(".article p") # list of strings +attrs = b.dom.attr("a", "href") # list of strings +exists = b.dom.exists(".cookie-banner")# bool +b.dom.click(".accept-button") +b.dom.type("#search", "hello world") +b.dom.wait_for("#results", visible=True, timeout=10) +b.dom.eval("document.title") -# Extract -links = b.extract_links() # list of { text, href } -images = b.extract_images() # list of { alt, src } -text = b.extract_text() # string -data = b.extract_json("#app-data") # parsed Python object +# Extract ── b.extract +links = b.extract.links() # list of { text, href } +images = b.extract.images() # list of { alt, src } +text = b.extract.text() # string +data = b.extract.json("#app-data") # parsed Python object +md = b.extract.markdown("article") -# Sessions -b.session_save("before-meeting") -b.session_load("before-meeting") -sessions = b.session_list() # [{ name, tabs, savedAt }, ...] -b.session_remove("before-meeting") -diff = b.session_diff("session-a", "session-b") +# Page / storage / cookies +info = b.page.info() +b.storage.set("token", "abc") +val = b.storage.get("token") +cookies = b.cookies.list(domain="example.com") + +# Sessions ── b.session +b.session.save("before-meeting") +b.session.load("before-meeting") +sessions = b.session.list() # [{ name, tabs, savedAt }, ...] +b.session.remove("before-meeting") +diff = b.session.diff("session-a", "session-b") # diff = { "added": [...urls], "removed": [...urls] } -b.session_auto_save(True) +b.session.auto_save(True) + +# Performance + extension +b.perf.status() +b.perf.set_profile("gentle") +b.extension.reload() # Misc clients = b.clients() @@ -385,7 +397,7 @@ from browser_cli import BrowserCLI, BrowserNotConnected b = BrowserCLI() try: - tabs = b.tabs_list() + tabs = b.tabs.list() except BrowserNotConnected: print("Browser is not running or extension is not loaded") except RuntimeError as e: @@ -397,11 +409,11 @@ from browser_cli import BrowserCLI, BrowserCounts b = BrowserCLI() -tabs = b.tabs_list() +tabs = b.tabs.list() for tab in tabs: print(tab.browser, tab.title) -counts = b.tabs_count() +counts = b.tabs.count() if isinstance(counts, BrowserCounts): print(counts.total) print(counts.by_browser) diff --git a/browser_cli/__init__.py b/browser_cli/__init__.py index 230587a..d79847c 100644 --- a/browser_cli/__init__.py +++ b/browser_cli/__init__.py @@ -5,34 +5,66 @@ Usage: from browser_cli import BrowserCLI b = BrowserCLI() - tabs = b.tabs_list() # list[Tab] + tabs = b.tabs.list() # list[Tab] tabs[0].close() tabs[0].move(forward=True) - groups = b.group_list() # list[Group] + groups = b.groups.list() # list[Group] groups[0].tabs() groups[0].add_tab("https://example.com") + b.nav.open("https://example.com") + b.dom.click("#submit") + b.session.save("work") + # When multiple browser instances are active, pass the alias: b = BrowserCLI(browser="brave") -""" -from collections.abc import Callable, Iterable -from dataclasses import dataclass +Commands are grouped into namespaces on the client: + b.nav navigation (open, reload, back, forward, focus, search) + b.tabs tabs (list, open, close, move, status, mute, sort, ...) + b.groups tab groups (list, create, add_tab, move, close) + b.windows browser windows (list, open, close, rename) + b.dom page elements (query, click, type, wait_for, eval, ...) + b.extract content extraction (links, images, text, json, markdown) + b.page page info + b.storage localStorage / sessionStorage + b.cookies cookies (list, get, set) + b.session sessions (save, load, list, diff, ...) + b.perf performance profile + background jobs + b.extension control the extension itself +""" from browser_cli.client import BrowserNotConnected, active_browser_targets, remote_browser_targets, send_command -from browser_cli.models import Group, Tab +from browser_cli.models import BrowserCounts, Group, Tab +from browser_cli.sdk import ( + CookiesNS, + DomNS, + ExtensionNS, + ExtractNS, + GroupsNS, + NavigationNS, + PageNS, + PerfNS, + SessionNS, + StorageNS, + TabsNS, + WindowsNS, +) +from browser_cli.sdk.factories import FactoryMixin +from browser_cli.sdk.routing import RoutingMixin __all__ = ["BrowserCLI", "BrowserCounts", "BrowserNotConnected", "Tab", "Group"] +class BrowserCLI(FactoryMixin, RoutingMixin): + """Client for a running browser, with commands grouped into namespaces. -@dataclass(frozen=True) -class BrowserCounts: - """Aggregated per-browser counts returned in implicit multi-browser mode.""" - total: int - by_browser: dict[str, int] + The client itself holds the connection target (browser/remote/key) and the + shared machinery; the actual commands live on namespace accessors such as + :attr:`tabs`, :attr:`dom`, and :attr:`session`. Object construction + (``Tab``/``Group``) comes from :class:`~browser_cli.sdk.factories.FactoryMixin` + and multi-browser fan-out from :class:`~browser_cli.sdk.routing.RoutingMixin`. + """ - -class BrowserCLI: def __init__(self, browser: str | None = None, remote: str | None = None, key: str | None = None): """ Args: @@ -50,6 +82,20 @@ class BrowserCLI: self._remote = remote self._key = key if key else None + # Command namespaces. + self.nav = NavigationNS(self) + self.tabs = TabsNS(self) + self.groups = GroupsNS(self) + self.windows = WindowsNS(self) + self.dom = DomNS(self) + self.extract = ExtractNS(self) + self.page = PageNS(self) + self.storage = StorageNS(self) + self.cookies = CookiesNS(self) + self.session = SessionNS(self) + self.perf = PerfNS(self) + self.extension = ExtensionNS(self) + @property def browser(self) -> str | None: """Target browser/profile alias, equivalent to ``--browser``.""" @@ -72,816 +118,10 @@ class BrowserCLI: """Send a raw browser-cli command and return its response. This is the SDK escape hatch for commands that do not have a dedicated - convenience method yet. + namespace method yet. """ return self._cmd(command, args or {}) - def _multi_browser_targets(self): - if self._browser is not None: - return [] - if self._remote: - targets = remote_browser_targets(self._remote, key=self._key) - else: - targets = active_browser_targets() - if len(targets) <= 1 and not any(target.remote for target in targets): - return [] - return targets - - def _collect_multi_browser(self, command: str, args: dict | None = None): - results = [] - targets = self._multi_browser_targets() - for target in targets: - try: - if target.remote: - data = send_command(command, args, profile=target.profile, remote=target.remote, key=self._key) - else: - data = send_command(command, args, profile=target.profile) - except (BrowserNotConnected, RuntimeError): - continue - results.append((target, data)) - if results: - return results - if targets: - raise BrowserNotConnected( - "Cannot resolve a browser socket automatically.\n" - "Make sure the browser is running with the browser-cli extension enabled,\n" - "or pass --browser / set BROWSER_CLI_PROFILE to a known alias." - ) - return [] - - # ── Internal factories ──────────────────────────────────────────────── - - def _make_tab( - self, - data: dict, - *, - browser_profile: str | None = None, - browser_name: str | None = None, - browser_remote: str | None = None, - ) -> Tab: - tab = Tab( - id=data["id"], - window_id=data.get("windowId", 0), - active=data.get("active", False), - muted=data.get("muted", False), - title=data.get("title") or "", - url=data.get("url") or "", - group_id=data.get("groupId") or None, - browser=browser_name, - ) - tab._browser = self if browser_profile is None else BrowserCLI( - browser=browser_profile, - remote=browser_remote, - key=self._key, - ) - return tab - - def _make_group( - self, - data: dict, - *, - browser_profile: str | None = None, - browser_name: str | None = None, - browser_remote: str | None = None, - ) -> Group: - group = Group( - id=data["id"], - title=data.get("title") or "", - color=data.get("color") or "", - collapsed=data.get("collapsed", False), - tab_count=data.get("tabCount", 0), - browser=browser_name, - ) - group._browser = self if browser_profile is None else BrowserCLI( - browser=browser_profile, - remote=browser_remote, - key=self._key, - ) - return group - - # ── Navigation ──────────────────────────────────────────────────────── - - def open(self, url: str, *, background: bool = False, window: str | None = None, group: str | None = None) -> None: - self._cmd("navigate.open", {"url": url, "background": background, "window": window, "group": group}) - - def open_tab( - self, - url: str, - *, - wait: bool = False, - timeout: float = 30.0, - background: bool = False, - window: str | None = None, - group: str | None = None, - ) -> "Tab": - """Open URL in a new tab and return a bound :class:`Tab` object. - - Set ``wait=True`` to block until the page reaches ``readyState=complete``. - """ - if wait: - return self.open_wait(url, timeout=timeout, background=background, window=window, group=group) - data = self._cmd("navigate.open", {"url": url, "background": background, "window": window, "group": group}) - if not isinstance(data, dict) or "id" not in data: - raise RuntimeError("navigate.open returned unexpected data") - return self._make_tab(data) - - def reload(self, tab_id: int | None = None) -> None: - self._cmd("navigate.reload", {"tabId": tab_id}) - - def hard_reload(self, tab_id: int | None = None) -> None: - self._cmd("navigate.hard_reload", {"tabId": tab_id}) - - def back(self, tab_id: int | None = None) -> None: - self._cmd("navigate.back", {"tabId": tab_id}) - - def forward(self, tab_id: int | None = None) -> None: - self._cmd("navigate.forward", {"tabId": tab_id}) - - def focus_url(self, pattern: str) -> None: - self._cmd("navigate.focus", {"pattern": pattern}) - - def navigate_tab(self, tab_id: int, url: str) -> None: - """Navigate a specific tab to *url*.""" - self._cmd("navigate.to", {"tabId": tab_id, "url": url}) - - def open_wait( - self, - url: str, - *, - timeout: float = 30.0, - background: bool = False, - window: str | None = None, - group: str | None = None, - ) -> "Tab": - """Open URL in a new tab and block until fully loaded. Returns the Tab.""" - data = self._cmd("navigate.open_wait", { - "url": url, "timeout": int(timeout * 1000), - "background": background, "window": window, "group": group, - }) - if not isinstance(data, dict) or "id" not in data: - raise RuntimeError("navigate.open_wait returned unexpected data") - return self._make_tab(data) - - def wait_for_load( - self, - tab_id: int | None = None, - *, - timeout: float = 30.0, - ready_state: str = "complete", - ) -> Tab: - """Block until the tab finishes loading. Returns the Tab when ready. - - Args: - tab_id: Tab to watch. Defaults to the active tab. - timeout: Max seconds to wait before raising ``RuntimeError``. - ready_state: ``"complete"`` (default) or ``"interactive"``. - """ - data = self._cmd("navigate.wait", { - "tabId": tab_id, - "timeout": int(timeout * 1000), - "readyState": ready_state, - }) - if not isinstance(data, dict) or "id" not in data: - raise RuntimeError("navigate.wait returned unexpected data") - return self._make_tab(data) - - # ── Search ──────────────────────────────────────────────────────────── - - def search( - self, engine: str, query: str, *, - background: bool = False, window: str | None = None, group: str | None = None, - ) -> None: - """Open a search query in the given engine (e.g. 'google', 'youtube', 'ddg').""" - from urllib.parse import quote_plus - from browser_cli.commands.search import ENGINES - template = ENGINES.get(engine) - if template is None: - raise ValueError(f"Unknown search engine '{engine}'. Available: {', '.join(ENGINES)}") - url = template.format(query=quote_plus(query)) - self._cmd("navigate.open", {"url": url, "background": background, "window": window, "group": group}) - - # ── Tabs ────────────────────────────────────────────────────────────── - - def tabs(self) -> list[Tab]: - """Alias for :meth:`tabs_list`.""" - return self.tabs_list() - - def tab(self, tab_id: int) -> Tab: - """Return a specific tab by ID.""" - return self.tabs_status(tab_id) - - def active_tab(self) -> Tab: - """Return the active tab.""" - return self.tabs_status() - - def find_tabs(self, search: str) -> list[Tab]: - """Alias for :meth:`tabs_query`.""" - return self.tabs_query(search) - - def find_tab(self, search: str) -> Tab | None: - """Return the first tab matching *search*, or ``None``.""" - matches = self.tabs_query(search) - return matches[0] if matches else None - - def close_tab(self, tab: int | Tab) -> int: - """Close a tab by ID or :class:`Tab` object. Returns count closed.""" - tab_id = tab.id if isinstance(tab, Tab) else tab - return self.tabs_close(tab_id) - - def tabs_list(self) -> list[Tab]: - """Return all open tabs across all windows. - - When multiple browsers are active and no browser was specified, each Tab - includes ``tab.browser`` naming its source browser. - """ - multi_results = self._collect_multi_browser("tabs.list", {}) - if multi_results: - return [ - self._make_tab( - tab, - browser_profile=target.profile, - browser_name=target.display_name, - browser_remote=target.remote, - ) - for target, tabs in multi_results - for tab in (tabs or []) - ] - return [self._make_tab(t) for t in (self._cmd("tabs.list", {}) or [])] - - def tabs_close( - self, - tab_id: int | None = None, - *, - tab_ids: Iterable[int | Tab] | None = None, - inactive: bool = False, - duplicates: bool = False, - ) -> int: - """Close tab(s). Returns the number of tabs closed. - - Pass ``tab_ids`` to close many tabs in a single round-trip instead of - calling :meth:`close_tab` per tab. Accepts tab IDs or :class:`Tab` - objects. The extension throttles large batches automatically. - """ - ids = None - if tab_ids is not None: - ids = [t.id if isinstance(t, Tab) else t for t in tab_ids] - result = self._cmd("tabs.close", { - "tabId": tab_id, - "tabIds": ids, - "inactive": inactive, - "duplicates": duplicates, - }) - return result.get("closed", 1) if isinstance(result, dict) else 1 - - def tabs_move( - self, tab_id: int, *, - forward: bool = False, backward: bool = False, - group_id: int | None = None, window_id: int | None = None, index: int | None = None, - ) -> None: - self._cmd("tabs.move", { - "tabId": tab_id, "forward": forward, "backward": backward, - "groupId": group_id, "windowId": window_id, "index": index, - }) - - def tabs_active(self, tab_id: int) -> None: - """Switch browser focus to a tab by ID.""" - self._cmd("tabs.active", {"tabId": tab_id}) - - def tabs_status(self, tab_id: int | None = None) -> Tab: - """Return status for the active tab or a specific tab.""" - data = self._cmd("tabs.status", {"tabId": tab_id}) - if not isinstance(data, dict) or "id" not in data: - raise RuntimeError("No tab status returned") - return self._make_tab(data) - - def tabs_mute(self, tab_id: int | None = None) -> int: - """Mute the active tab or a specific tab. Returns the target tab ID.""" - result = self._cmd("tabs.mute", {"tabId": tab_id}) - return result.get("tabId", tab_id) if isinstance(result, dict) else int(tab_id or 0) - - def tabs_unmute(self, tab_id: int | None = None) -> int: - """Unmute the active tab or a specific tab. Returns the target tab ID.""" - result = self._cmd("tabs.unmute", {"tabId": tab_id}) - return result.get("tabId", tab_id) if isinstance(result, dict) else int(tab_id or 0) - - def tabs_pin(self, tab_id: int | None = None) -> int: - """Pin the active tab or a specific tab. Returns the target tab ID.""" - result = self._cmd("tabs.pin", {"tabId": tab_id}) - return result.get("tabId", tab_id) if isinstance(result, dict) else int(tab_id or 0) - - def tabs_unpin(self, tab_id: int | None = None) -> int: - """Unpin the active tab or a specific tab. Returns the target tab ID.""" - result = self._cmd("tabs.unpin", {"tabId": tab_id}) - return result.get("tabId", tab_id) if isinstance(result, dict) else int(tab_id or 0) - - def tabs_watch_url( - self, - pattern: str, - *, - tab_id: int | None = None, - timeout: float = 30.0, - ) -> "Tab": - """Block until the tab URL matches regex pattern. Returns the Tab.""" - data = self._cmd("tabs.watch_url", {"pattern": pattern, "tabId": tab_id, "timeout": int(timeout * 1000)}) - if not isinstance(data, dict) or "id" not in data: - raise RuntimeError("tabs.watch_url returned unexpected data") - return self._make_tab(data) - - def tabs_screenshot( - self, - tab_id: int | None = None, - *, - format: str = "png", - quality: int | None = None, - ) -> str: - """Capture the visible area of a tab. Returns a base64 data URL. - - Args: - tab_id: Tab to capture. Defaults to the active tab. - format: ``"png"`` (default) or ``"jpeg"``. - quality: JPEG quality 0-100 (ignored for PNG). - """ - result = self._cmd("tabs.screenshot", {"tabId": tab_id, "format": format, "quality": quality}) - return result.get("dataUrl", "") if isinstance(result, dict) else str(result) - - def window_active_tab(self, window_id: int) -> Tab: - """Return active tab for a specific browser window.""" - data = self._cmd("tabs.active_in_window", {"windowId": window_id}) - if not isinstance(data, dict) or "id" not in data: - raise RuntimeError(f"No active tab found for window {window_id}") - return self._make_tab(data) - - def tabs_filter(self, pattern_or_filter: str | Callable[[Tab], bool] | Callable[[list[Tab]], Iterable[Tab]]) -> list[Tab]: - """Return tabs filtered by pattern or a Python callable.""" - if isinstance(pattern_or_filter, str): - return [self._make_tab(t) for t in (self._cmd("tabs.filter", {"pattern": pattern_or_filter}) or [])] - return self._apply_tab_filter(pattern_or_filter) - - def tabs_count(self, pattern: str | None = None) -> int | BrowserCounts: - """Count open tabs, optionally filtered by URL pattern. - - Returns ``BrowserCounts`` in implicit multi-browser mode. - """ - multi_results = self._collect_multi_browser("tabs.count", {"pattern": pattern}) - if multi_results: - by_browser = {target.display_name: int(count or 0) for target, count in multi_results} - return BrowserCounts(total=sum(by_browser.values()), by_browser=by_browser) - return self._cmd("tabs.count", {"pattern": pattern}) - - def tabs_query(self, search: str) -> list[Tab]: - """Search tabs by URL or title.""" - return [self._make_tab(t) for t in (self._cmd("tabs.query", {"search": search}) or [])] - - def tabs_html(self, tab_id: int | None = None) -> str: - """Return the full HTML source of the active (or specified) tab.""" - return self._cmd("tabs.html", {"tabId": tab_id}) or "" - - def tabs_dedupe(self) -> int: - """Close duplicate tabs (keep the first occurrence of each URL). Returns count closed.""" - result = self._cmd("tabs.dedupe", {}) - return result.get("closed", 0) if isinstance(result, dict) else 0 - - def tabs_sort(self, by: str = "domain") -> None: - """Sort tabs within each window. *by* is one of 'domain', 'title', 'time'.""" - self._cmd("tabs.sort", {"by": by}) - - def tabs_merge_windows(self) -> int: - """Move all tabs into the focused window. Returns count moved.""" - result = self._cmd("tabs.merge_windows", {}) - return result.get("moved", 0) if isinstance(result, dict) else 0 - - def tabs_close_inactive(self) -> int: - """Close all inactive tabs. Returns count closed.""" - result = self._cmd("tabs.close", {"inactive": True}) - return result.get("closed", 0) if isinstance(result, dict) else 0 - - def tabs_close_duplicates(self) -> int: - """Close duplicate tabs. Returns count closed.""" - result = self._cmd("tabs.close", {"duplicates": True}) - return result.get("closed", 0) if isinstance(result, dict) else 0 - - # ── Tab Groups ──────────────────────────────────────────────────────── - - def groups(self) -> list[Group]: - """Alias for :meth:`group_list`.""" - return self.group_list() - - def groups_list(self) -> list[Group]: - """Alias for :meth:`group_list`.""" - return self.group_list() - - def group_list(self) -> list[Group]: - """Return all tab groups. - - When multiple browsers are active and no browser was specified, each Group - includes ``group.browser`` naming its source browser. - """ - multi_results = self._collect_multi_browser("group.list", {}) - if multi_results: - return [ - self._make_group( - group, - browser_profile=target.profile, - browser_name=target.display_name, - browser_remote=target.remote, - ) - for target, groups in multi_results - for group in (groups or []) - ] - return [self._make_group(g) for g in (self._cmd("group.list", {}) or [])] - - def group_tabs(self, group_id: int) -> list[Tab]: - """Return all tabs inside a group.""" - return [self._make_tab(t) for t in (self._cmd("group.tabs", {"groupId": group_id}) or [])] - - def groups_count(self) -> int | BrowserCounts: - """Alias for :meth:`group_count`.""" - return self.group_count() - - def group_count(self) -> int | BrowserCounts: - """Return the number of tab groups. - - Returns ``BrowserCounts`` in implicit multi-browser mode. - """ - multi_results = self._collect_multi_browser("group.count", {}) - if multi_results: - by_browser = {target.display_name: int(count or 0) for target, count in multi_results} - return BrowserCounts(total=sum(by_browser.values()), by_browser=by_browser) - return self._cmd("group.count", {}) - - def groups_query(self, search: str) -> list[Group]: - """Alias for :meth:`group_query`.""" - return self.group_query(search) - - def group_query(self, search: str) -> list[Group]: - """Search groups by name.""" - return [self._make_group(g) for g in (self._cmd("group.query", {"search": search}) or [])] - - def group_close(self, group_id: int) -> None: - """Ungroup (and close) a tab group by ID.""" - self._cmd("group.close", {"groupId": group_id}) - - def groups_create(self, name: str) -> Group: - """Alias for :meth:`group_create`.""" - return self.group_create(name) - - def group_open(self, name: str) -> Group: - """Alias for :meth:`group_create`.""" - return self.group_create(name) - - def group_create(self, name: str) -> Group: - """Create a new tab group with *name*. Returns the created Group.""" - data = self._cmd("group.open", {"name": name}) - return self._make_group(data) if isinstance(data, dict) else Group(id=data, title=name, color="", collapsed=False, tab_count=0) - - def group_add_tab(self, group: str | int, url: str | None = None) -> int | None: - """Open a new tab (optionally at URL) inside a group. Returns the new tab ID.""" - result = self._cmd("group.add_tab", {"group": str(group), "url": url}) - return result.get("tabId") if isinstance(result, dict) else result - - def group_move(self, group: str | int, *, forward: bool = False, backward: bool = False) -> None: - """Move a tab group forward or backward.""" - self._cmd("group.move", {"group": str(group), "forward": forward, "backward": backward}) - - # ── Windows ─────────────────────────────────────────────────────────── - - def windows_list(self) -> list[dict]: - """Return browser windows. - - In implicit multi-browser mode each window dict includes a ``browser`` key. - """ - multi_results = self._collect_multi_browser("windows.list", {}) - if multi_results: - return [ - {**window, "browser": target.display_name} - for target, windows in multi_results - for window in (windows or []) - ] - return self._cmd("windows.list", {}) or [] - - def windows_rename(self, window_id: int, name: str) -> None: - self._cmd("windows.rename", {"windowId": window_id, "name": name}) - - def windows_close(self, window_id: int) -> None: - self._cmd("windows.close", {"windowId": window_id}) - - def windows_open(self, url: str | None = None) -> dict: - """Open a new browser window, optionally on a URL.""" - return self._cmd("windows.open", {"url": url}) or {} - - # ── DOM ─────────────────────────────────────────────────────────────── - - def dom_query(self, selector: str) -> list[dict]: - return self._cmd("dom.query", {"selector": selector}) or [] - - def dom_click(self, selector: str) -> None: - self._cmd("dom.click", {"selector": selector}) - - def dom_type(self, selector: str, text: str) -> None: - self._cmd("dom.type", {"selector": selector, "text": text}) - - def dom_attr(self, selector: str, attr: str) -> list[str]: - return self._cmd("dom.attr", {"selector": selector, "attr": attr}) or [] - - def dom_text(self, selector: str) -> list[str]: - return self._cmd("dom.text", {"selector": selector}) or [] - - def dom_exists(self, selector: str) -> bool: - return self._cmd("dom.exists", {"selector": selector}) or False - - def dom_scroll(self, selector: str | None = None, *, x: int | None = None, y: int | None = None) -> None: - """Scroll to a CSS selector or to pixel coordinates.""" - self._cmd("dom.scroll", {"selector": selector, "x": x, "y": y}) - - def dom_select(self, selector: str, value: str) -> None: - """Set the value of a dropdown matching CSS SELECTOR.""" - _handle("dom.select", {"selector": selector, "value": value}) + client_from_ctx().dom.select(selector, value) console.print(f"[green]Selected '{value}' in:[/green] {selector}") - @dom_group.command("eval") @click.argument("code") -@click.option("--tab", "tab_id", type=int, default=None, help="Tab ID (default: active tab)") +@tab_option +@handle_errors def dom_eval(code, tab_id): """Evaluate JavaScript CODE in the page and print the result.""" - result = _handle("dom.eval", {"code": code, "tabId": tab_id}) + result = client_from_ctx().dom.eval(code, tab_id) if result is None: console.print("[dim]null[/dim]") else: console.print(json.dumps(result, indent=2) if isinstance(result, (dict, list)) else str(result)) - @dom_group.command("wait-for") @click.argument("selector") @click.option("--timeout", type=float, default=10.0, show_default=True, help="Max seconds to wait") @click.option("--visible", is_flag=True, help="Wait until element is visible (non-zero size)") @click.option("--hidden", is_flag=True, help="Wait until element is absent or hidden") -@click.option("--tab", "tab_id", type=int, default=None, help="Tab ID (default: active tab)") +@tab_option +@handle_errors def dom_wait_for(selector, timeout, visible, hidden, tab_id): """Wait until CSS SELECTOR appears (or disappears) in the DOM.""" - _handle("dom.wait_for", { - "selector": selector, - "timeout": int(timeout * 1000), - "visible": visible, - "hidden": hidden, - "tabId": tab_id, - }) + client_from_ctx().dom.wait_for(selector, timeout=timeout, visible=visible, hidden=hidden, tab_id=tab_id) state = "hidden" if hidden else ("visible" if visible else "present") console.print(f"[green]Ready ({state}):[/green] {selector}") - @dom_group.command("key") @click.argument("key") @click.option("--selector", default=None, help="CSS selector to target (default: focused element)") +@handle_errors def dom_key(key, selector): """Dispatch a keyboard KEY event (e.g. Enter, Tab, Escape, ArrowDown).""" - _handle("dom.key", {"key": key, "selector": selector}) + client_from_ctx().dom.key(key, selector) target = selector or "active element" console.print(f"[green]Key '{key}' sent to:[/green] {target}") - @dom_group.command("hover") @click.argument("selector") +@handle_errors def dom_hover(selector): """Dispatch mouseover/mouseenter on the element matching CSS SELECTOR.""" - _handle("dom.hover", {"selector": selector}) + client_from_ctx().dom.hover(selector) console.print(f"[green]Hovered:[/green] {selector}") - @dom_group.command("check") @click.argument("selector") +@handle_errors def dom_check(selector): """Check a checkbox matching CSS SELECTOR.""" - _handle("dom.check", {"selector": selector}) + client_from_ctx().dom.check(selector) console.print(f"[green]Checked:[/green] {selector}") - @dom_group.command("uncheck") @click.argument("selector") +@handle_errors def dom_uncheck(selector): """Uncheck a checkbox matching CSS SELECTOR.""" - _handle("dom.uncheck", {"selector": selector}) + client_from_ctx().dom.uncheck(selector) console.print(f"[green]Unchecked:[/green] {selector}") - @dom_group.command("clear") @click.argument("selector") +@handle_errors def dom_clear(selector): """Clear the value of an input matching CSS SELECTOR.""" - _handle("dom.clear", {"selector": selector}) + client_from_ctx().dom.clear(selector) console.print(f"[green]Cleared:[/green] {selector}") - @dom_group.command("focus") @click.argument("selector") +@handle_errors def dom_focus(selector): """Focus the element matching CSS SELECTOR.""" - _handle("dom.focus", {"selector": selector}) + client_from_ctx().dom.focus(selector) console.print(f"[green]Focused:[/green] {selector}") - @dom_group.command("submit") @click.argument("selector") +@handle_errors def dom_submit(selector): """Submit the form that contains the element matching CSS SELECTOR.""" - _handle("dom.submit", {"selector": selector}) + client_from_ctx().dom.submit(selector) console.print(f"[green]Submitted form for:[/green] {selector}") - @dom_group.command("poll") @click.argument("selector") @click.argument("pattern") @click.option("--attr", default=None, help="Attribute or property to read (default: textContent/value)") @click.option("--timeout", type=float, default=30.0, show_default=True, help="Max seconds to wait") @click.option("--interval", type=float, default=0.5, show_default=True, help="Poll interval in seconds") -@click.option("--tab", "tab_id", type=int, default=None, help="Tab ID (default: active tab)") +@tab_option +@handle_errors def dom_poll(selector, pattern, attr, timeout, interval, tab_id): """Poll SELECTOR until its text/value matches regex PATTERN.""" - result = _handle("dom.poll", { - "selector": selector, - "pattern": pattern, - "attr": attr, - "timeout": int(timeout * 1000), - "interval": int(interval * 1000), - "tabId": tab_id, - }) + result = client_from_ctx().dom.poll(selector, pattern, attr=attr, timeout=timeout, interval=interval, tab_id=tab_id) value = result.get("value", "") if isinstance(result, dict) else "" console.print(f"[green]Matched:[/green] {selector!r} = {value!r}") diff --git a/browser_cli/commands/extension.py b/browser_cli/commands/extension.py index 5b06585..6a87291 100644 --- a/browser_cli/commands/extension.py +++ b/browser_cli/commands/extension.py @@ -1,7 +1,6 @@ -import time import click from rich.console import Console -from browser_cli.commands import _handle +from browser_cli.commands import client_from_ctx, handle_errors console = Console() @@ -10,6 +9,7 @@ def extension_group(): """Manage the browser-cli browser extension.""" @extension_group.command("reload") +@handle_errors def extension_reload(): """Reload the browser-cli extension service worker. @@ -17,5 +17,5 @@ def extension_reload(): The command returns immediately; the extension restarts ~200 ms later. Re-connects automatically via the keepalive alarm within ~25 seconds. """ - _handle("extension.reload") + client_from_ctx().extension.reload() console.print("[green]Extension reloading…[/green] reconnects automatically") diff --git a/browser_cli/commands/extract.py b/browser_cli/commands/extract.py index 3107c0f..2e67da8 100644 --- a/browser_cli/commands/extract.py +++ b/browser_cli/commands/extract.py @@ -1,437 +1,24 @@ import json -import re -from html.parser import HTMLParser import click -from browser_cli.commands import _handle +from browser_cli.commands import client_from_ctx, handle_errors +# Re-exported for backward compatibility: the HTML→Markdown engine now lives in +# browser_cli.markdown and is applied by the SDK (ExtractNS.markdown). +from browser_cli.markdown import _clean_markdown_output, _convert_html_to_markdown # noqa: F401 from rich.console import Console from rich.table import Table console = Console() -_FENCE_RE = re.compile(r"```(?:[^\n`]*)\n.*?\n```", re.DOTALL) -_ESCAPED_MARKDOWN_RE = re.compile(r"\\([_-])") -_TABLE_SEPARATOR_RE = re.compile(r"^\|(?:\s*:?-{3,}:?\s*\|)+\s*$") - - -class _HtmlNode: - def __init__(self, tag=None, attrs=None, text=None): - self.tag = tag - self.attrs = attrs or {} - self.text = text - self.children = [] - - -class _HtmlTreeBuilder(HTMLParser): - _VOID_TAGS = {"br", "hr", "img"} - - def __init__(self): - super().__init__(convert_charrefs=True) - self.root = _HtmlNode(tag="document") - self._stack = [self.root] - - def handle_starttag(self, tag, attrs): - node = _HtmlNode(tag=tag.lower(), attrs=dict(attrs)) - self._stack[-1].children.append(node) - if node.tag not in self._VOID_TAGS: - self._stack.append(node) - - def handle_startendtag(self, tag, attrs): - node = _HtmlNode(tag=tag.lower(), attrs=dict(attrs)) - self._stack[-1].children.append(node) - - def handle_endtag(self, tag): - lowered = tag.lower() - for index in range(len(self._stack) - 1, 0, -1): - if self._stack[index].tag == lowered: - del self._stack[index:] - break - - def handle_data(self, data): - if data: - self._stack[-1].children.append(_HtmlNode(text=data)) - - -def _normalize_text(value): - return re.sub(r"\s+", " ", value or "").strip() - - -def _normalize_inline(value): - value = value.replace("\xa0", " ") - value = re.sub(r"[ \t\r\f\v]+", " ", value) - value = re.sub(r" *\n *", "\n", value) - return value.strip() - - -def _collapse_blank_lines(value): - value = re.sub(r"[ \t]+\n", "\n", value) - value = re.sub(r"\n{3,}", "\n\n", value) - return value.strip() - - -def _escape_markdown(text): - return re.sub(r"([\\`[\]])", r"\\\1", text) - - -def _escape_table_cell(text): - return text.replace("|", r"\|").replace("\n", " ").strip() - - -def _iter_descendants(node): - for child in getattr(node, "children", []): - yield child - yield from _iter_descendants(child) - - -def _has_class(node, class_name): - classes = (node.attrs.get("class") or "").split() - return class_name in classes - - -def _is_code_block_node(node): - if not node or not node.tag: - return False - if node.attrs.get("data-is-code-block-view") == "true": - return True - return node.tag == "pre" - - -def _inline_text(node): - if node.text is not None: - return _escape_markdown(node.text) - if not node.tag: - return "" - - tag = node.tag - if tag == "br": - return "\n" - if tag == "img": - src = node.attrs.get("src") or "" - alt = _normalize_text(node.attrs.get("alt") or "") - if not src: - return "" - return f"![{_escape_markdown(alt)}]({src})" if alt else f"![]({src})" - if tag == "a": - text = _normalize_inline("".join(_inline_text(child) for child in node.children)) - href = node.attrs.get("href") or "" - return f"[{text or href}]({href})" if href else text - if tag == "code": - text = _normalize_inline("".join(_inline_text(child) for child in node.children)) - return f"`{text.replace('`', r'\\`')}`" if text else "" - if tag in {"strong", "b"}: - text = _normalize_inline("".join(_inline_text(child) for child in node.children)) - return f"**{text}**" if text else "" - if tag in {"em", "i"}: - text = _normalize_inline("".join(_inline_text(child) for child in node.children)) - return f"*{text}*" if text else "" - - chunks = [] - for child in node.children: - rendered = _inline_text(child) - if rendered: - chunks.append(rendered) - if child.tag in {"p", "div", "table", "ul", "ol", "pre"}: - chunks.append("\n") - return "".join(chunks) - - -def _text_block(node): - return _collapse_blank_lines(_normalize_inline("".join(_inline_text(child) for child in node.children))) - - -def _inner_text_preserve(node): - if node.text is not None: - return node.text - if not node.tag: - return "" - if node.tag == "br": - return "" - return "".join(_inner_text_preserve(child) for child in node.children) - - -def _table_to_markdown(node): - rows = [] - for descendant in _iter_descendants(node): - if descendant.tag != "tr": - continue - row = [] - for cell in descendant.children: - if cell.tag in {"td", "th"}: - row.append(_escape_table_cell(_text_block(cell))) - if row: - rows.append(row) - if not rows: - return "" - - widths = max(len(row) for row in rows) - normalized_rows = [row + [""] * (widths - len(row)) for row in rows] - - headers = normalized_rows[0] - body_rows = normalized_rows[1:] - first_row_blank = all(not cell.strip() for cell in headers) - if first_row_blank and len(normalized_rows) > 1: - headers = normalized_rows[1] - body_rows = normalized_rows[2:] - - has_thead = any(child.tag == "thead" for child in node.children) - first_row = next((child for child in _iter_descendants(node) if child.tag == "tr"), None) - first_row_has_th = bool(first_row and any(child.tag == "th" for child in first_row.children)) - if not (has_thead or first_row_has_th or first_row_blank): - headers = [""] * widths - body_rows = normalized_rows - - separator = ["---"] * widths - lines = [ - f"| {' | '.join(headers)} |", - f"| {' | '.join(separator)} |", - ] - lines.extend(f"| {' | '.join(row)} |" for row in body_rows) - return "\n".join(lines) - - -def _list_to_markdown(node, depth=0): - ordered = node.tag == "ol" - items = [] - index = 1 - for child in node.children: - if child.tag != "li": - continue - marker = f"{index}. " if ordered else "- " - index += 1 - content = [] - nested = [] - for item_child in child.children: - if item_child.tag in {"ul", "ol"}: - nested.append(_list_to_markdown(item_child, depth + 1)) - else: - content.append(_inline_text(item_child)) - line = _collapse_blank_lines(_normalize_inline("".join(content))) - indent = " " * depth - if line: - line_parts = line.splitlines() - items.append(f"{indent}{marker}{line_parts[0]}") - continuation_indent = f"{indent}{' ' * len(marker)}" - items.extend(f"{continuation_indent}{part}" for part in line_parts[1:]) - items.extend(block for block in nested if block) - return "\n".join(items) - - -def _code_block_to_markdown(node): - if node.tag == "pre": - text = _inner_text_preserve(node).rstrip("\n") - return f"```\n{text}\n```" if text else "" - - lines = [] - for descendant in _iter_descendants(node): - if descendant.tag and _has_class(descendant, "cm-line"): - lines.append(_inner_text_preserve(descendant)) - code = "\n".join(lines).rstrip("\n") - return f"```\n{code}\n```" if code else "" - - -def _block_to_markdown(node): - if node.text is not None: - return _normalize_text(node.text) - if not node.tag: - return "" - if _is_code_block_node(node): - return _code_block_to_markdown(node) - if node.tag == "table": - return _table_to_markdown(node) - if node.tag in {"ul", "ol"}: - return _list_to_markdown(node) - if re.fullmatch(r"h[1-6]", node.tag): - text = _text_block(node) - return f"{'#' * int(node.tag[1])} {text}" if text else "" - if node.tag in {"p", "figcaption"}: - return _text_block(node) - if node.tag == "blockquote": - content = _collapse_blank_lines("\n\n".join(filter(None, (_block_to_markdown(child) for child in node.children)))) - return "\n".join(f"> {line}" if line else ">" for line in content.splitlines()) if content else "" - if node.tag == "hr": - return "---" - if node.tag == "img": - return _inline_text(node) - - child_blocks = [block for block in (_block_to_markdown(child) for child in node.children) if block] - if child_blocks: - return _collapse_blank_lines("\n\n".join(child_blocks)) - return _text_block(node) - - -def _parse_table_row(line): - stripped = line.strip() - if not stripped.startswith("|") or not stripped.endswith("|"): - return None - return [cell.strip() for cell in stripped.strip("|").split("|")] - - -def _repair_table_headers(lines): - repaired = [] - index = 0 - while index < len(lines): - if ( - index + 2 < len(lines) - and _parse_table_row(lines[index]) is not None - and _TABLE_SEPARATOR_RE.match(lines[index + 1].strip()) - and _parse_table_row(lines[index + 2]) is not None - ): - first = _parse_table_row(lines[index]) - third = _parse_table_row(lines[index + 2]) - if first and all(not cell for cell in first) and any(cell for cell in third): - repaired.append(lines[index + 2].strip()) - repaired.append(lines[index + 1].strip()) - index += 3 - continue - repaired.append(lines[index].strip()) - index += 1 - return repaired - - -def _repair_list_continuations(lines): - repaired = [] - previous_was_list_item = False - previous_continuation_indent = "" - - for line in lines: - stripped = line.strip() - list_match = re.match(r"^(\s*)([-*+]|\d+\.)\s+.+$", stripped) - is_markdown_block_start = ( - not stripped - or stripped.startswith(("```", "#", ">", "|")) - or _TABLE_SEPARATOR_RE.match(stripped) - or re.match(r"^(\s*)([-*+]|\d+\.)\s+", stripped) - ) - - if previous_was_list_item and stripped and not is_markdown_block_start: - repaired.append(f"{previous_continuation_indent}{stripped}") - previous_was_list_item = False - continue - - repaired.append(stripped) - if list_match: - marker = list_match.group(2) - base_indent = list_match.group(1) - previous_continuation_indent = f"{base_indent}{' ' * (len(marker) + 1)}" - previous_was_list_item = True - else: - previous_was_list_item = False - - return repaired - - -def _repair_flattened_diagram(text): - if "\n" in text: - return text - if sum(text.count(char) for char in "│▼├└") < 2: - return text - - text = re.sub(r"\s{2,}([│▼])", r"\n \1", text) - text = re.sub(r"([│▼])\s{2,}", r"\1\n", text) - text = re.sub(r"([│▼])(?=[^\s\n│▼├└])", r"\1\n", text) - text = re.sub(r"(?<=[^\s\n])([├└])", r"\n\1", text) - text = re.sub(r"([^\s\n])(\()", r"\1\n\2", text) - return "\n".join(line.rstrip() for line in text.splitlines() if line.strip()) - - -def _convert_dash_lists_to_branches(lines): - converted = [] - index = 0 - while index < len(lines): - match = re.match(r"^(\s*)-\s+(.*)$", lines[index]) - if not match: - converted.append(lines[index]) - index += 1 - continue - - indent = match.group(1) - items = [] - while index < len(lines): - next_match = re.match(rf"^{re.escape(indent)}-\s+(.*)$", lines[index]) - if not next_match: - break - items.append(next_match.group(1)) - index += 1 - - for item_index, item in enumerate(items): - branch = "└" if item_index == len(items) - 1 else "├" - converted.append(f"{indent}{branch} {item}") - return converted - - -def _clean_code_block(code): - lines = [line.rstrip() for line in code.splitlines()] - while lines and not lines[0].strip(): - lines.pop(0) - while lines and not lines[-1].strip(): - lines.pop() - - flattened = _repair_flattened_diagram("\n".join(lines)) - lines = flattened.splitlines() if flattened else [] - lines = [ - f" {line.strip()}" - if line.strip() in {"│", "▼"} and not re.match(r"^\s+[│▼]\s*$", line) - else line - for line in lines - ] - lines = _convert_dash_lists_to_branches(lines) - return "\n".join(lines) - - -def _clean_markdown_output(markdown): - if not markdown: - return "" - - pieces = [] - last_index = 0 - for match in _FENCE_RE.finditer(markdown): - prose = markdown[last_index:match.start()] - if prose: - cleaned = _ESCAPED_MARKDOWN_RE.sub(r"\1", prose) - lines = [line.strip() for line in cleaned.splitlines()] - lines = _repair_table_headers(lines) - lines = _repair_list_continuations(lines) - cleaned = "\n".join(lines) - cleaned = _collapse_blank_lines(cleaned) - if cleaned: - pieces.append(cleaned) - - fence = match.group(0) - header, _, tail = fence.partition("\n") - body, _, _ = tail.rpartition("\n") - cleaned_body = _clean_code_block(body) - pieces.append(f"{header}\n{cleaned_body}\n```" if cleaned_body else f"{header}\n```") - last_index = match.end() - - trailing = markdown[last_index:] - if trailing: - cleaned = _ESCAPED_MARKDOWN_RE.sub(r"\1", trailing) - lines = [line.strip() for line in cleaned.splitlines()] - lines = _repair_table_headers(lines) - lines = _repair_list_continuations(lines) - cleaned = "\n".join(lines) - cleaned = _collapse_blank_lines(cleaned) - if cleaned: - pieces.append(cleaned) - - return "\n\n".join(piece for piece in pieces if piece) - - -def _convert_html_to_markdown(html): - parser = _HtmlTreeBuilder() - parser.feed(html or "") - markdown = _block_to_markdown(parser.root) - return _clean_markdown_output(markdown) - @click.group("extract") def extract_group(): """Extract content from the active tab.""" - @extract_group.command("links") +@handle_errors def extract_links(): """Extract all links from the active tab.""" - links = _handle("extract.links") + links = client_from_ctx().extract.links() if not links: console.print("[yellow]No links found[/yellow]") return @@ -442,11 +29,11 @@ def extract_links(): table.add_row((lnk.get("text") or "")[:60], lnk.get("href") or "") console.print(table) - @extract_group.command("images") +@handle_errors def extract_images(): """Extract all images from the active tab.""" - images = _handle("extract.images") + images = client_from_ctx().extract.images() if not images: console.print("[yellow]No images found[/yellow]") return @@ -457,36 +44,30 @@ def extract_images(): table.add_row((img.get("alt") or "")[:40], img.get("src") or "") console.print(table) - @extract_group.command("text") +@handle_errors def extract_text(): """Extract all visible text from the active tab.""" - text = _handle("extract.text") - console.print(text or "") - + console.print(client_from_ctx().extract.text()) @extract_group.command("json") @click.argument("selector") +@handle_errors def extract_json(selector): """Parse and pretty-print JSON content inside SELECTOR.""" - data = _handle("extract.json", {"selector": selector}) + data = client_from_ctx().extract.json(selector) console.print_json(json.dumps(data)) - @extract_group.command("html") +@handle_errors def extract_html(): """Print the full HTML of the active tab to stdout.""" - html = _handle("extract.html") - click.echo(html or "") - + click.echo(client_from_ctx().extract.html()) @extract_group.command("markdown") @click.option("--selector", help="Extract only the DOM subtree matching this CSS selector.") +@handle_errors def extract_markdown(selector): """Extract the page's main content as Markdown.""" - markdown = _handle("extract.markdown", {"selector": selector}) - if (markdown or "").lstrip().startswith("<"): - markdown = _convert_html_to_markdown(markdown) - else: - markdown = _clean_markdown_output(markdown or "") + markdown = client_from_ctx().extract.markdown(selector) click.echo(markdown or "", nl=not (markdown or "").endswith("\n")) diff --git a/browser_cli/commands/groups.py b/browser_cli/commands/groups.py index b3a758f..0918b42 100644 --- a/browser_cli/commands/groups.py +++ b/browser_cli/commands/groups.py @@ -1,12 +1,11 @@ import click -from browser_cli.commands import _handle, _handle_multi, _multi_browser_targets +from browser_cli.commands import client_from_ctx, gentle_mode_option, handle_errors, print_counts from rich.console import Console from rich.table import Table console = Console() - -def _print_groups(groups: list[dict], *, show_browser: bool = False) -> None: +def _print_groups(groups, *, show_browser: bool = False) -> None: if not groups: console.print("[yellow]No groups found[/yellow]") return @@ -20,128 +19,88 @@ def _print_groups(groups: list[dict], *, show_browser: bool = False) -> None: table.add_column("Tabs", width=6) for g in groups: row = [ - g.get("browser", "") if show_browser else None, - str(g.get("id", "")), - g.get("title") or "", - g.get("color") or "", - "yes" if g.get("collapsed") else "no", - str(g.get("tabCount", "")), + (g.browser or "") if show_browser else None, + str(g.id), + g.title or "", + g.color or "", + "yes" if g.collapsed else "no", + str(g.tab_count), ] table.add_row(*[value for value in row if value is not None]) console.print(table) - @click.group("groups") def group_group(): """Manage tab groups.""" - @group_group.command("list") +@handle_errors def group_list(): """List all tab groups.""" - targets = _multi_browser_targets() - if targets: - groups = [] - for target in targets: - result = _handle_multi("group.list", profile=target.profile, remote=target.remote) - if result is None: - continue - groups.extend({**group, "browser": target.display_name} for group in result) - if not groups: - console.print("[red]Error:[/red] Cannot resolve a browser socket automatically.") - raise SystemExit(1) - _print_groups(groups, show_browser=True) - return - groups = _handle("group.list") - _print_groups(groups or []) - + groups = client_from_ctx().groups.list() + _print_groups(groups, show_browser=any(g.browser for g in groups)) @group_group.command("tabs") @click.argument("group_id", type=int) +@handle_errors def group_tabs(group_id): """List tabs inside a group.""" from browser_cli.commands.tabs import _print_tabs - tabs = _handle("group.tabs", {"groupId": group_id}) - _print_tabs(tabs or []) - + _print_tabs(client_from_ctx().groups.tabs(group_id)) @group_group.command("count") +@handle_errors def group_count(): """Count all tab groups.""" - targets = _multi_browser_targets() - if targets: - table = Table(show_header=True, header_style="bold cyan") - table.add_column("Browser") - table.add_column("Groups", justify="right") - total = 0 - rows = 0 - for target in targets: - count = _handle_multi("group.count", profile=target.profile, remote=target.remote) - if count is None: - continue - count = int(count or 0) - total += count - rows += 1 - table.add_row(target.display_name, str(count)) - if rows == 0: - console.print("[red]Error:[/red] Cannot resolve a browser socket automatically.") - raise SystemExit(1) - table.add_row("Total", str(total)) - console.print(table) - return - count = _handle("group.count") - console.print(f"[bold]{count}[/bold] group(s)") - + print_counts(client_from_ctx().groups.count(), "group") @group_group.command("query") @click.argument("search") +@handle_errors def group_query(search): """Search groups by name.""" - groups = _handle("group.query", {"search": search}) - _print_groups(groups or []) - + _print_groups(client_from_ctx().groups.query(search)) @group_group.command("close") @click.argument("group_id", type=int) -@click.option("--gentle-mode", type=click.Choice(["auto", "normal", "gentle", "ultra"]), default="auto", show_default=True, help="Throttle mode for large group operations.") +@gentle_mode_option("Throttle mode for large group operations.") +@handle_errors def group_close(group_id, gentle_mode): """Close (ungroup and optionally close) a tab group.""" - _handle("group.close", {"groupId": group_id, "gentleMode": gentle_mode}) + client_from_ctx().groups.close(group_id, gentle_mode=gentle_mode) console.print(f"[green]Group {group_id} closed[/green]") - @group_group.command("create") @click.argument("name") +@handle_errors def group_create(name): """Create a new tab group with NAME.""" - result = _handle("group.open", {"name": name}) - gid = result.get("id") if isinstance(result, dict) else result - console.print(f"[green]Created group '{name}'[/green] (id: {gid})") - + group = client_from_ctx().groups.create(name) + console.print(f"[green]Created group '{name}'[/green] (id: {group.id})") @group_group.command("add-tab") @click.argument("group") @click.argument("url", required=False) +@handle_errors def group_add_tab(group, url): """Open a new tab (optionally at URL) inside GROUP (name or ID).""" - result = _handle("group.add_tab", {"group": group, "url": url}) - tab_id = result.get("tabId") if isinstance(result, dict) else result + tab_id = client_from_ctx().groups.add_tab(group, url) label = url or "new tab" console.print(f"[green]Opened {label}[/green] in group '{group}' (tab id: {tab_id})") - @group_group.command("move") @click.argument("group") @click.option("-f", "--forward", "forward", is_flag=True, help="Move group one position to the right") @click.option("-b", "--backward", "backward", is_flag=True, help="Move group one position to the left") @click.option("-r", "--right", "forward", is_flag=True, help="Move group one position to the right") @click.option("-l", "--left", "backward", is_flag=True, help="Move group one position to the left") +@handle_errors def group_move(group, forward, backward): """Move a tab group forward/backward or right/left (name or ID).""" if not forward and not backward: console.print("[red]Specify --forward/--right or --backward/--left[/red]") raise SystemExit(1) - result = _handle("group.move", {"group": group, "forward": forward, "backward": backward}) + result = client_from_ctx().groups.move(group, forward=forward, backward=backward) if isinstance(result, dict) and not result.get("moved"): console.print(f"[yellow]Group '{group}' is already at the {'end' if forward else 'start'}[/yellow]") else: diff --git a/browser_cli/commands/navigate.py b/browser_cli/commands/navigate.py index ef31b1f..1611daa 100644 --- a/browser_cli/commands/navigate.py +++ b/browser_cli/commands/navigate.py @@ -1,23 +1,22 @@ import click -from browser_cli.commands import _handle +from browser_cli.commands import client_from_ctx, handle_errors, tab_option from rich.console import Console console = Console() - @click.group("nav") def nav_group(): """Navigate — open URLs, reload, go back/forward, focus tabs.""" - @nav_group.command("open") @click.argument("url") @click.option("--bg", is_flag=True, help="Open in background (no focus)") @click.option("--window", "window_name", default=None, help="Open in named window") @click.option("--group", "group_name", default=None, help="Open directly into a tab group (name or ID)") +@handle_errors def cmd_open(url, bg, window_name, group_name): """Open URL in a new tab.""" - _handle("navigate.open", {"url": url, "background": bg, "window": window_name, "group": group_name}) + client_from_ctx().nav.open(url, background=bg, window=window_name, group=group_name) suffix = "" if group_name: suffix = f" in group '{group_name}'" @@ -25,71 +24,67 @@ def cmd_open(url, bg, window_name, group_name): suffix = f" in window '{window_name}'" console.print(f"[green]Opened:[/green] {url}{suffix}") - @nav_group.command("reload") @click.argument("tab_id", type=int, required=False) +@handle_errors def cmd_reload(tab_id): """Reload the active (or specified) tab.""" - _handle("navigate.reload", {"tabId": tab_id}) + client_from_ctx().nav.reload(tab_id) console.print("[green]Reloaded[/green]") - @nav_group.command("hard-reload") @click.argument("tab_id", type=int, required=False) +@handle_errors def cmd_hard_reload(tab_id): """Hard reload (bypass cache) the active (or specified) tab.""" - _handle("navigate.hard_reload", {"tabId": tab_id}) + client_from_ctx().nav.hard_reload(tab_id) console.print("[green]Hard reloaded[/green]") - @nav_group.command("back") @click.argument("tab_id", type=int, required=False) +@handle_errors def cmd_back(tab_id): """Navigate back in the active (or specified) tab.""" - _handle("navigate.back", {"tabId": tab_id}) + client_from_ctx().nav.back(tab_id) console.print("[green]Navigated back[/green]") - @nav_group.command("forward") @click.argument("tab_id", type=int, required=False) +@handle_errors def cmd_forward(tab_id): """Navigate forward in the active (or specified) tab.""" - _handle("navigate.forward", {"tabId": tab_id}) + client_from_ctx().nav.forward(tab_id) console.print("[green]Navigated forward[/green]") - @nav_group.command("focus") @click.argument("pattern") +@handle_errors def cmd_focus(pattern): """Jump to a tab by URL pattern or tab ID.""" - result = _handle("navigate.focus", {"pattern": pattern}) + result = client_from_ctx().nav.focus(pattern) if result: console.print(f"[green]Focused:[/green] {result.get('url', result)}") else: console.print(f"[yellow]No tab found matching:[/yellow] {pattern}") - @nav_group.command("open-wait") @click.argument("url") @click.option("--timeout", type=float, default=30.0, show_default=True, help="Max seconds to wait for load") @click.option("--bg", is_flag=True, help="Open in background (no focus)") @click.option("--window", "window_name", default=None, help="Open in named window") @click.option("--group", "group_name", default=None, help="Open in tab group") +@handle_errors def cmd_open_wait(url, timeout, bg, window_name, group_name): """Open URL in a new tab and wait until fully loaded.""" - result = _handle("navigate.open_wait", { - "url": url, "timeout": int(timeout * 1000), - "background": bg, "window": window_name, "group": group_name, - }) - title = result.get("title", "") if isinstance(result, dict) else "" - console.print(f"[green]Loaded:[/green] {url}" + (f" — {title}" if title else "")) - + tab = client_from_ctx().nav.open_wait(url, timeout=timeout, background=bg, window=window_name, group=group_name) + console.print(f"[green]Loaded:[/green] {url}" + (f" — {tab.title}" if tab.title else "")) @nav_group.command("wait") -@click.option("--tab", "tab_id", type=int, default=None, help="Tab ID (default: active tab)") +@tab_option @click.option("--timeout", type=float, default=30.0, show_default=True, help="Max seconds to wait") @click.option("--ready-state", type=click.Choice(["complete", "interactive"]), default="complete", show_default=True, help="Target ready state") +@handle_errors def cmd_wait(tab_id, timeout, ready_state): """Wait until tab finishes loading.""" - result = _handle("navigate.wait", {"tabId": tab_id, "timeout": int(timeout * 1000), "readyState": ready_state}) - console.print(f"[green]Ready:[/green] {result.get('url', '')} — {result.get('title', '')}") + tab = client_from_ctx().tabs.wait_for_load(tab_id, timeout=timeout, ready_state=ready_state) + console.print(f"[green]Ready:[/green] {tab.url} — {tab.title}") diff --git a/browser_cli/commands/page.py b/browser_cli/commands/page.py index 99cd4de..1d727ff 100644 --- a/browser_cli/commands/page.py +++ b/browser_cli/commands/page.py @@ -1,20 +1,19 @@ import click -from browser_cli.commands import _handle +from browser_cli.commands import client_from_ctx, handle_errors from rich.console import Console from rich.table import Table console = Console() - @click.group("page") def page_group(): """Inspect current page metadata.""" - @page_group.command("info") +@handle_errors def page_info(): """Show title, URL, readyState, language, and meta tags of the active tab.""" - info = _handle("page.info") or {} + info = client_from_ctx().page.info() table = Table(show_header=False) table.add_column("Field", style="bold cyan", no_wrap=True) table.add_column("Value") diff --git a/browser_cli/commands/perf.py b/browser_cli/commands/perf.py index d9a2dc7..1c688e3 100644 --- a/browser_cli/commands/perf.py +++ b/browser_cli/commands/perf.py @@ -1,7 +1,7 @@ import click from rich.console import Console from rich.table import Table -from browser_cli.commands import _handle +from browser_cli.commands import client_from_ctx, handle_errors console = Console() @@ -10,9 +10,10 @@ def perf_group(): """Inspect and tune browser-cli performance behavior.""" @perf_group.command("status") +@handle_errors def perf_status(): """Show performance profile, throttle and running jobs.""" - result = _handle("perf.status") or {} + result = client_from_ctx().perf.status() console.print(f"Profile: [bold]{result.get('performanceProfile', 'auto')}[/bold]") console.print(f"Audible tabs: {'yes' if result.get('audible') else 'no'}") throttle = result.get("throttle") or {} @@ -39,7 +40,8 @@ def perf_status(): @perf_group.command("profile") @click.argument("profile", type=click.Choice(["auto", "normal", "gentle", "ultra"])) +@handle_errors def perf_profile(profile): """Set global performance profile.""" - result = _handle("perf.set_profile", {"profile": profile}) or {} + result = client_from_ctx().perf.set_profile(profile) console.print(f"[green]Performance profile set to {result.get('performanceProfile', profile)}[/green]") diff --git a/browser_cli/commands/search.py b/browser_cli/commands/search.py index f0aaeaf..11c7402 100644 --- a/browser_cli/commands/search.py +++ b/browser_cli/commands/search.py @@ -1,6 +1,5 @@ import click -from urllib.parse import quote_plus -from browser_cli.commands import _handle +from browser_cli.commands import client_from_ctx, handle_errors from rich.console import Console console = Console() @@ -61,23 +60,21 @@ _SUBCOMMANDS = [ def search_group(): """Search the web — open a query in a search engine.""" - def _build_command(engine_key: str, help_text: str) -> click.Command: @click.command(engine_key, help=help_text) @click.argument("query", nargs=-1, required=True) @click.option("--bg", is_flag=True, help="Open in background (no focus)") @click.option("--window", "window", default=None, help="Open in named window") @click.option("--group", "group", default=None, help="Open in tab group (name or ID)") + @handle_errors def _cmd(query, bg, window, group): terms = " ".join(query) - url = ENGINES[engine_key].format(query=quote_plus(terms)) - _handle("navigate.open", {"url": url, "background": bg, "window": window, "group": group}) + client_from_ctx().nav.search(engine_key, terms, background=bg, window=window, group=group) suffix = f" in group '{group}'" if group else (f" in window '{window}'" if window else "") display = _DISPLAY_NAMES.get(engine_key, engine_key.capitalize()) console.print(f"[green]Searching[/green] [cyan]{display}[/cyan]: {terms}{suffix}") return _cmd - for _name, _help in _SUBCOMMANDS: search_group.add_command(_build_command(_name, _help)) diff --git a/browser_cli/commands/serve.py b/browser_cli/commands/serve.py index 5c88577..885621c 100644 --- a/browser_cli/commands/serve.py +++ b/browser_cli/commands/serve.py @@ -4,6 +4,7 @@ from pathlib import Path from rich.console import Console +from browser_cli import transport from browser_cli.client import _recv_exact, _recv_all from browser_cli.compat import adapt_auth, adapt_request, adapt_response from browser_cli.version_manager import PROTOCOL_MIN_CLIENT, MAX_MSG_BYTES, parse_version, get_installed_version @@ -12,7 +13,6 @@ _UA_PATTERN = re.compile(r"^browser-cli/\d") _CONN_LIMIT = threading.BoundedSemaphore(64) console = Console() - def _framed_send(sock: socket.socket, data: bytes) -> None: sock.sendall(struct.pack(" None: +def _proxy_request(client_sock:socket.socket, addr:tuple, profile:str|None, auth_keys:list[str]|None, auth_keys_path:"Path|None", nonce:str, pq_private_key=None, compress:bool=True) -> None: from browser_cli.client import _resolve_socket, BrowserNotConnected from browser_cli.platform import is_windows response_secret = None + accept_encoding = None # set once the (decrypted) request is parsed; None → plain JSON def _send_payload(data: bytes) -> None: if response_secret is not None: @@ -38,16 +39,17 @@ def _proxy_request(client_sock:socket.socket, addr:tuple, profile:str|None, auth _framed_send(client_sock, data) def _send_error(msg_id, msg:str) -> None: + # errors stay plain JSON: tiny, and safe for any client err = json.dumps({"id": msg_id, "success": False, "error": msg}).encode() try: _send_payload(err) except OSError: pass - def _send_ok(msg_id, payload) -> None: - out = json.dumps({"id": msg_id, "success": True, "data": payload}).encode() + def _send_ok(msg_id, payload, command=None) -> None: + obj = {"id": msg_id, "success": True, "data": payload} try: - _send_payload(out) + _send_payload(transport.encode_response(obj, accept_encoding if compress else None, command)) except OSError: pass @@ -141,13 +143,16 @@ def _proxy_request(client_sock:socket.socket, addr:tuple, profile:str|None, auth return response_secret = pq_shared_secret if transport_encrypted else None + # client advertises what response encodings it can decode (signed, then stripped) + accept_encoding = msg.get("accept_encoding") + if command == "browser-cli.targets": from browser_cli.client import active_browser_targets targets = [ {"profile": target.profile, "displayName": target.display_name} for target in active_browser_targets(include_remotes=False) ] - _send_ok(msg_id, targets) + _send_ok(msg_id, targets, command) _log(addr, command, None, "OK") return @@ -158,7 +163,7 @@ def _proxy_request(client_sock:socket.socket, addr:tuple, profile:str|None, auth return from browser_cli.auth import load_authorized_keys_with_names entries = [{"pubkey": pk, "name": name} for pk, name in load_authorized_keys_with_names(auth_keys_path)] - _send_ok(msg_id, entries) + _send_ok(msg_id, entries, command) _log(addr, command, None, "OK") return @@ -176,14 +181,14 @@ def _proxy_request(client_sock:socket.socket, addr:tuple, profile:str|None, auth _log(addr, command, None, "ERROR", "invalid pubkey") return added = add_authorized_key(auth_keys_path, pubkey, name) - _send_ok(msg_id, {"added": added}) + _send_ok(msg_id, {"added": added}, command) _log(addr, command, None, "OK" if added else "ALREADY_TRUSTED") return resolved_profile = msg.get("_route") or profile # ── strip protocol fields, apply request compat shim, forward ───────────── - strip = {"token", "_route", "pubkey", "sig", "user_agent", "pq_kex", "encrypted"} + strip = {"token", "_route", "pubkey", "sig", "user_agent", "pq_kex", "encrypted", "accept_encoding"} clean_msg = {k: v for k, v in msg.items() if k not in strip} clean_msg = adapt_request(clean_msg, client_ver) clean_payload = json.dumps(clean_msg).encode() @@ -203,16 +208,19 @@ def _proxy_request(client_sock:socket.socket, addr:tuple, profile:str|None, auth pipe.send_bytes(clean_payload) resp_payload = pipe.recv_bytes() resp_payload = adapt_response(resp_payload, command, client_ver) - _send_payload(resp_payload) else: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as local: local.connect(sock_path) local.sendall(clean_header + clean_payload) resp_payload = _recv_all(local) resp_payload = adapt_response(resp_payload, command, client_ver) - _send_payload(resp_payload) + # parse once: drives both the access log and (re-)encoding for the client resp_data = json.loads(resp_payload) + if compress: + _send_payload(transport.encode_response(resp_data, accept_encoding, command)) + else: + _send_payload(resp_payload) if resp_data.get("success", True): _log(addr, command, resolved_profile, "OK") else: @@ -221,7 +229,7 @@ def _proxy_request(client_sock:socket.socket, addr:tuple, profile:str|None, auth _send_error(msg_id, str(e)) _log(addr, command, resolved_profile, "ERROR", str(e)) -def _handle_client(client_sock:socket.socket, addr:tuple, profile:str|None, auth_keys_path:"Path|None") -> None: +def _handle_client(client_sock:socket.socket, addr:tuple, profile:str|None, auth_keys_path:"Path|None", compress:bool=True) -> None: if not _CONN_LIMIT.acquire(blocking=False): client_sock.close() return @@ -253,7 +261,7 @@ def _handle_client(client_sock:socket.socket, addr:tuple, profile:str|None, auth _framed_send(client_sock, challenge) except OSError: return - _proxy_request(client_sock, addr, profile, auth_keys, auth_keys_path, nonce, pq_private_key) + _proxy_request(client_sock, addr, profile, auth_keys, auth_keys_path, nonce, pq_private_key, compress) finally: _CONN_LIMIT.release() @@ -264,10 +272,13 @@ def _handle_client(client_sock:socket.socket, addr:tuple, profile:str|None, auth @click.option("--no-auth", is_flag=True, default=False, help="Disable authentication (dangerous).") @click.option("--authorized-keys", "auth_keys_file", default=None, metavar="FILE", help="File of trusted Ed25519 public keys (one hex per line). Required unless --no-auth.") +@click.option("--no-compress", "no_compress", is_flag=True, default=False, + help="Disable response compression / msgpack even for clients that support it.") @click.pass_context -def cmd_serve(ctx, host, port, no_auth, auth_keys_file): +def cmd_serve(ctx, host, port, no_auth, auth_keys_file, no_compress): """Expose this browser over TCP so remote hosts can control it.""" profile = ctx.obj.get("browser") if ctx.obj else None + compress = not no_compress if host in ("0.0.0.0", "::"): console.print("[yellow]Warning:[/yellow] Binding to all interfaces — anyone who can reach this port controls your browser.") @@ -302,18 +313,25 @@ def cmd_serve(ctx, host, port, no_auth, auth_keys_file): n = len(load_authorized_keys(auth_keys_path)) console.print(f" Auth: [bold green]Ed25519 pubkey[/bold green] ({n} trusted key{'s' if n != 1 else ''})") console.print(f" CLI: [dim]browser-cli --remote {host}:{port} tabs list[/dim]") - console.print(f" Python: [dim]BrowserCLI(remote=\"{host}:{port}\").tabs_list()[/dim]") + console.print(f" Python: [dim]BrowserCLI(remote=\"{host}:{port}\").tabs.list()[/dim]") else: console.print(f" CLI: [dim]browser-cli --remote {host}:{port} tabs list[/dim]") - console.print(f" Python: [dim]BrowserCLI(remote=\"{host}:{port}\").tabs_list()[/dim]") + console.print(f" Python: [dim]BrowserCLI(remote=\"{host}:{port}\").tabs.list()[/dim]") console.print("[yellow] Auth disabled (--no-auth)[/yellow]") + if compress: + codecs = "+".join(transport.supported_compression()) + sers = "+".join(transport.supported_serialization()) + console.print(f" Encode: [green]on[/green] [dim](compression: {codecs}; serialization: {sers}; per-client negotiated)[/dim]") + else: + console.print(" Encode: [yellow]off (--no-compress)[/yellow]") + console.print("Ctrl-C to stop.\n") try: while True: conn, addr = server.accept() - threading.Thread(target=_handle_client, args=(conn, addr, profile, auth_keys_path), daemon=True).start() + threading.Thread(target=_handle_client, args=(conn, addr, profile, auth_keys_path, compress), daemon=True).start() except KeyboardInterrupt: console.print("[yellow]Stopped.[/yellow]") finally: diff --git a/browser_cli/commands/session.py b/browser_cli/commands/session.py index 26aaa3e..44e5b89 100644 --- a/browser_cli/commands/session.py +++ b/browser_cli/commands/session.py @@ -1,5 +1,5 @@ import click -from browser_cli.commands import _handle, _handle_multi, _multi_browser_targets +from browser_cli.commands import client_from_ctx, gentle_mode_option, handle_errors from rich.console import Console console = Console() @@ -10,41 +10,47 @@ def session_group(): @session_group.command("save") @click.argument("name") +@handle_errors def session_save(name): """Save all current tabs as session NAME.""" - result = _handle("session.save", {"name": name}) + result = client_from_ctx().session.save(name) count = result.get("tabs", 0) if isinstance(result, dict) else 0 console.print(f"[green]Session '{name}' saved[/green] ({count} tabs)") @session_group.command("load") @click.argument("name") -@click.option("--gentle-mode", type=click.Choice(["auto", "normal", "gentle", "ultra"]), default="auto", show_default=True, help="Throttle mode for large restores.") +@gentle_mode_option("Throttle mode for large restores.") @click.option("--discard-background-tabs", is_flag=True, help="Discard restored background tabs after opening to reduce load.") @click.option("--lazy", is_flag=True, help="Create lightweight placeholder tabs after --eager-tabs; placeholders load when selected.") @click.option("--eager-tabs", type=int, default=10, show_default=True, help="Number of real tabs to open before lazy placeholders.") @click.option("--background", "background_job", is_flag=True, help="Start restore as a background job and return immediately.") +@handle_errors def session_load(name, gentle_mode, discard_background_tabs, lazy, eager_tabs, background_job): """Restore session NAME (opens all saved tabs).""" - result = _handle("session.load", { - "name": name, - "gentleMode": gentle_mode, - "discardBackgroundTabs": discard_background_tabs, - "lazy": lazy, - "eagerTabs": eager_tabs, - "__background": background_job, - }) - if background_job and isinstance(result, dict) and result.get("jobId"): - console.print(f"[green]Session restore started[/green] job={result['jobId']}") - return + b = client_from_ctx() + if background_job: + result = b.session.load_background( + name, gentle_mode=gentle_mode, discard_background_tabs=discard_background_tabs, + lazy=lazy, eager_tabs=eager_tabs, + ) + if isinstance(result, dict) and result.get("jobId"): + console.print(f"[green]Session restore started[/green] job={result['jobId']}") + return + else: + result = b.session.load( + name, gentle_mode=gentle_mode, discard_background_tabs=discard_background_tabs, + lazy=lazy, eager_tabs=eager_tabs, + ) count = result.get("tabs", 0) if isinstance(result, dict) else 0 console.print(f"[green]Session '{name}' loaded[/green] ({count} tabs opened)") @session_group.command("diff") @click.argument("name_a") @click.argument("name_b") +@handle_errors def session_diff(name_a, name_b): """Show tabs added/removed between two saved sessions.""" - diff = _handle("session.diff", {"nameA": name_a, "nameB": name_b}) + diff = client_from_ctx().session.diff(name_a, name_b) if not diff: console.print("[yellow]No diff data returned[/yellow]") return @@ -66,26 +72,16 @@ def session_diff(name_a, name_b): console.print("[green]Sessions are identical[/green]") @session_group.command("list") +@handle_errors def session_list(): """List all saved sessions.""" + from datetime import datetime from rich.table import Table - targets = _multi_browser_targets() - show_browser = bool(targets) - if targets: - sessions = [] - for target in targets: - result = _handle_multi("session.list", profile=target.profile, remote=target.remote) - if result is None: - continue - sessions.extend({**session, "browser": target.display_name} for session in result) - if not sessions: - console.print("[red]Error:[/red] Cannot resolve a browser socket automatically.") - raise SystemExit(1) - else: - sessions = _handle("session.list") + sessions = client_from_ctx().session.list() if not sessions: console.print("[yellow]No saved sessions[/yellow]") return + show_browser = any("browser" in s for s in sessions) table = Table(show_header=True, header_style="bold cyan") if show_browser: table.add_column("Browser") @@ -93,7 +89,6 @@ def session_list(): table.add_column("Tabs", width=6) table.add_column("Saved at") for s in sessions: - from datetime import datetime saved = datetime.fromtimestamp(s["savedAt"] / 1000).strftime("%Y-%m-%d %H:%M") if s.get("savedAt") else "" row = [s.get("browser", "")] if show_browser else [] row.extend([s["name"], str(s["tabs"]), saved]) @@ -102,16 +97,18 @@ def session_list(): @session_group.command("remove") @click.argument("name") +@handle_errors def session_remove(name): """Delete a saved session.""" - _handle("session.remove", {"name": name}) + client_from_ctx().session.remove(name) console.print(f"[green]Session '{name}' removed[/green]") @session_group.command("job-status") @click.argument("job_id") +@handle_errors def session_job_status(job_id): """Show status for a background session job.""" - result = _handle("jobs.status", {"jobId": job_id}) or {} + result = client_from_ctx().perf.job_status(job_id) status = result.get("status", "unknown") console.print(f"[bold]{job_id}[/bold]: {status}") if result.get("error"): @@ -121,15 +118,17 @@ def session_job_status(job_id): @session_group.command("job-cancel") @click.argument("job_id") +@handle_errors def session_job_cancel(job_id): """Cancel a running background job.""" - _handle("jobs.cancel", {"jobId": job_id}) + client_from_ctx().perf.job_cancel(job_id) console.print(f"[green]Cancel requested for {job_id}[/green]") @session_group.command("auto-save") @click.argument("state", type=click.Choice(["on", "off"])) +@handle_errors def session_auto_save(state): """Enable or disable automatic session saving.""" enabled = state == "on" - _handle("session.auto_save", {"enabled": enabled}) + client_from_ctx().session.auto_save(enabled) console.print(f"[green]Auto-save {state}[/green]") diff --git a/browser_cli/commands/storage.py b/browser_cli/commands/storage.py index 7404684..8d1a9a5 100644 --- a/browser_cli/commands/storage.py +++ b/browser_cli/commands/storage.py @@ -1,23 +1,22 @@ import json import click -from browser_cli.commands import _handle +from browser_cli.commands import client_from_ctx, handle_errors, tab_option from rich.console import Console console = Console() - @click.group("storage") def storage_group(): """Read and write the page's localStorage / sessionStorage.""" - @storage_group.command("get") @click.argument("key", required=False) @click.option("--type", "store_type", type=click.Choice(["local", "session"]), default="local", show_default=True) -@click.option("--tab", "tab_id", type=int, default=None, help="Tab ID (default: active tab)") +@tab_option +@handle_errors def storage_get(key, store_type, tab_id): """Get a localStorage KEY (or dump all keys if omitted).""" - result = _handle("storage.get", {"key": key, "type": store_type, "tabId": tab_id}) + result = client_from_ctx().storage.get(key, type=store_type, tab_id=tab_id) if result is None: console.print("[dim]null[/dim]") elif isinstance(result, dict): @@ -25,13 +24,13 @@ def storage_get(key, store_type, tab_id): else: console.print(str(result)) - @storage_group.command("set") @click.argument("key") @click.argument("value") @click.option("--type", "store_type", type=click.Choice(["local", "session"]), default="local", show_default=True) -@click.option("--tab", "tab_id", type=int, default=None, help="Tab ID (default: active tab)") +@tab_option +@handle_errors def storage_set(key, value, store_type, tab_id): """Set localStorage KEY to VALUE.""" - _handle("storage.set", {"key": key, "value": value, "type": store_type, "tabId": tab_id}) + client_from_ctx().storage.set(key, value, type=store_type, tab_id=tab_id) console.print(f"[green]Set[/green] {store_type}[{key!r}] = {value!r}") diff --git a/browser_cli/commands/tabs.py b/browser_cli/commands/tabs.py index 4e76839..00277e2 100644 --- a/browser_cli/commands/tabs.py +++ b/browser_cli/commands/tabs.py @@ -1,14 +1,13 @@ import base64 import binascii import click -from browser_cli.commands import _handle, _handle_multi, _multi_browser_targets +from browser_cli.commands import client_from_ctx, gentle_mode_option, handle_errors, print_counts, tab_option from rich.console import Console from rich.table import Table console = Console() - -def _print_tabs(tabs: list[dict], *, show_browser: bool = False) -> None: +def _print_tabs(tabs, *, show_browser: bool = False) -> None: if not tabs: console.print("[yellow]No tabs found[/yellow]") return @@ -22,58 +21,42 @@ def _print_tabs(tabs: list[dict], *, show_browser: bool = False) -> None: table.add_column("Title") table.add_column("URL") for t in tabs: - active = "[green]✓[/green]" if t.get("active") else "" - muted = "[yellow]✓[/yellow]" if t.get("muted") else "" + active = "[green]✓[/green]" if t.active else "" + muted = "[yellow]✓[/yellow]" if t.muted else "" row = [ - t.get("browser", "") if show_browser else None, - str(t.get("id", "")), - str(t.get("windowId", "")), + (t.browser or "") if show_browser else None, + str(t.id), + str(t.window_id), active, muted, - (t.get("title") or "")[:60], - (t.get("url") or "")[:80], + (t.title or "")[:60], + (t.url or "")[:80], ] table.add_row(*[value for value in row if value is not None]) console.print(table) - @click.group("tabs") def tabs_group(): """Manage browser tabs.""" - @tabs_group.command("list") +@handle_errors def tabs_list(): """List all open tabs across all windows.""" - targets = _multi_browser_targets() - if targets: - tabs = [] - for target in targets: - result = _handle_multi("tabs.list", profile=target.profile, remote=target.remote) - if result is None: - continue - tabs.extend({**tab, "browser": target.display_name} for tab in result) - if not tabs: - console.print("[red]Error:[/red] Cannot resolve a browser socket automatically.") - raise SystemExit(1) - _print_tabs(tabs, show_browser=True) - return - tabs = _handle("tabs.list") - _print_tabs(tabs or []) - + tabs = client_from_ctx().tabs.list() + _print_tabs(tabs, show_browser=any(t.browser for t in tabs)) @tabs_group.command("close") @click.argument("tab_id", type=int, required=False) @click.option("--inactive", is_flag=True, help="Close all inactive tabs") @click.option("--duplicates", is_flag=True, help="Close duplicate tabs (keep first)") -@click.option("--gentle-mode", type=click.Choice(["auto", "normal", "gentle", "ultra"]), default="auto", show_default=True, help="Throttle mode for large close operations.") +@gentle_mode_option("Throttle mode for large close operations.") +@handle_errors def tabs_close(tab_id, inactive, duplicates, gentle_mode): """Close a tab, all inactive tabs, or all duplicate tabs.""" - result = _handle("tabs.close", {"tabId": tab_id, "inactive": inactive, "duplicates": duplicates, "gentleMode": gentle_mode}) - count = result.get("closed", 0) if isinstance(result, dict) else 1 + count = client_from_ctx().tabs.close(tab_id, inactive=inactive, duplicates=duplicates, gentle_mode=gentle_mode) console.print(f"[green]Closed {count} tab(s)[/green]") - @tabs_group.command("move") @click.argument("tab_id", type=int) @click.option("-f", "--forward", "forward", is_flag=True, help="Move one position to the right") @@ -83,180 +66,148 @@ def tabs_close(tab_id, inactive, duplicates, gentle_mode): @click.option("--group", "group_id", type=int, default=None, help="Move to tab group ID") @click.option("--window", "window_id", type=int, default=None, help="Move to window ID") @click.option("--index", type=int, default=None, help="Absolute position index in target") +@handle_errors def tabs_move(tab_id, forward, backward, group_id, window_id, index): """Move a tab. Use --forward/--backward or --right/--left for relative movement.""" - _handle("tabs.move", { - "tabId": tab_id, "forward": forward, "backward": backward, - "groupId": group_id, "windowId": window_id, "index": index, - }) + client_from_ctx().tabs.move( + tab_id, forward=forward, backward=backward, + group_id=group_id, window_id=window_id, index=index, + ) console.print("[green]Tab moved[/green]") - @tabs_group.command("active") @click.argument("tab_id", type=int) +@handle_errors def tabs_active(tab_id): """Switch browser focus to a tab.""" - _handle("tabs.active", {"tabId": tab_id}) + client_from_ctx().tabs.activate(tab_id) console.print(f"[green]Switched to tab {tab_id}[/green]") - @tabs_group.command("status") @click.argument("tab_id", type=int, required=False) +@handle_errors def tabs_status(tab_id): """Show status for the active tab or a specific tab.""" - tab = _handle("tabs.status", {"tabId": tab_id}) or {} + tab = client_from_ctx().tabs.status(tab_id) table = Table(show_header=False) table.add_column("Field", style="bold cyan") table.add_column("Value") - table.add_row("ID", str(tab.get("id", ""))) - table.add_row("Window", str(tab.get("windowId", ""))) - table.add_row("Active", "yes" if tab.get("active") else "no") - table.add_row("Muted", "yes" if tab.get("muted") else "no") - table.add_row("Title", tab.get("title") or "") - table.add_row("URL", tab.get("url") or "") + table.add_row("ID", str(tab.id)) + table.add_row("Window", str(tab.window_id)) + table.add_row("Active", "yes" if tab.active else "no") + table.add_row("Muted", "yes" if tab.muted else "no") + table.add_row("Title", tab.title or "") + table.add_row("URL", tab.url or "") console.print(table) - @tabs_group.command("filter") @click.argument("pattern") +@handle_errors def tabs_filter(pattern): """List tabs whose URL contains PATTERN.""" - tabs = _handle("tabs.filter", {"pattern": pattern}) - _print_tabs(tabs or []) - + _print_tabs(client_from_ctx().tabs.filter(pattern)) @tabs_group.command("count") @click.argument("pattern", required=False) +@handle_errors def tabs_count(pattern): """Count open tabs, optionally filtered by URL PATTERN.""" - targets = _multi_browser_targets() - if targets: - table = Table(show_header=True, header_style="bold cyan") - table.add_column("Browser") - table.add_column("Tabs", justify="right") - total = 0 - rows = 0 - for target in targets: - count = _handle_multi("tabs.count", {"pattern": pattern}, profile=target.profile, remote=target.remote) - if count is None: - continue - count = int(count or 0) - total += count - rows += 1 - table.add_row(target.display_name, str(count)) - if rows == 0: - console.print("[red]Error:[/red] Cannot resolve a browser socket automatically.") - raise SystemExit(1) - table.add_row("Total", str(total)) - console.print(table) - return - count = _handle("tabs.count", {"pattern": pattern}) label = f" matching '{pattern}'" if pattern else "" - console.print(f"[bold]{count}[/bold] tab(s){label}") - + print_counts(client_from_ctx().tabs.count(pattern), "tab", single_suffix=label) @tabs_group.command("query") @click.argument("search") +@handle_errors def tabs_query(search): """Search tabs by URL or title.""" - tabs = _handle("tabs.query", {"search": search}) - _print_tabs(tabs or []) - + _print_tabs(client_from_ctx().tabs.query(search)) @tabs_group.command("html") @click.argument("tab_id", type=int, required=False) +@handle_errors def tabs_html(tab_id): """Print the full HTML of a tab.""" - html = _handle("tabs.html", {"tabId": tab_id}) - console.print(html or "") - + console.print(client_from_ctx().tabs.html(tab_id)) @tabs_group.command("dedupe") -@click.option("--gentle-mode", type=click.Choice(["auto", "normal", "gentle", "ultra"]), default="auto", show_default=True, help="Throttle mode for large dedupe operations.") +@gentle_mode_option("Throttle mode for large dedupe operations.") +@handle_errors def tabs_dedupe(gentle_mode): """Close duplicate tabs (keep the first occurrence of each URL).""" - result = _handle("tabs.dedupe", {"gentleMode": gentle_mode}) - count = result.get("closed", 0) if isinstance(result, dict) else 0 + count = client_from_ctx().tabs.dedupe(gentle_mode=gentle_mode) console.print(f"[green]Closed {count} duplicate tab(s)[/green]") - @tabs_group.command("sort") @click.option("--by", type=click.Choice(["domain", "title", "time"]), default="domain", show_default=True) -@click.option("--gentle-mode", type=click.Choice(["auto", "normal", "gentle", "ultra"]), default="auto", show_default=True, help="Throttle mode for large sort operations.") +@gentle_mode_option("Throttle mode for large sort operations.") +@handle_errors def tabs_sort(by, gentle_mode): """Sort tabs within each window.""" - _handle("tabs.sort", {"by": by, "gentleMode": gentle_mode}) + client_from_ctx().tabs.sort(by=by, gentle_mode=gentle_mode) console.print(f"[green]Tabs sorted by {by}[/green]") - @tabs_group.command("merge-windows") -@click.option("--gentle-mode", type=click.Choice(["auto", "normal", "gentle", "ultra"]), default="auto", show_default=True, help="Throttle mode for large merge operations.") +@gentle_mode_option("Throttle mode for large merge operations.") +@handle_errors def tabs_merge_windows(gentle_mode): """Move all tabs into the focused window.""" - result = _handle("tabs.merge_windows", {"gentleMode": gentle_mode}) - count = result.get("moved", 0) if isinstance(result, dict) else 0 + count = client_from_ctx().tabs.merge_windows(gentle_mode=gentle_mode) console.print(f"[green]Merged — moved {count} tab(s) into current window[/green]") - @tabs_group.command("mute") @click.argument("tab_id", type=int, required=False) +@handle_errors def tabs_mute(tab_id): """Mute the active tab or a specific tab.""" - result = _handle("tabs.mute", {"tabId": tab_id}) - target = result.get("tabId", tab_id) if isinstance(result, dict) else tab_id + target = client_from_ctx().tabs.mute(tab_id) console.print(f"[green]Muted tab {target}[/green]") - @tabs_group.command("unmute") @click.argument("tab_id", type=int, required=False) +@handle_errors def tabs_unmute(tab_id): """Unmute the active tab or a specific tab.""" - result = _handle("tabs.unmute", {"tabId": tab_id}) - target = result.get("tabId", tab_id) if isinstance(result, dict) else tab_id + target = client_from_ctx().tabs.unmute(tab_id) console.print(f"[green]Unmuted tab {target}[/green]") - @tabs_group.command("pin") @click.argument("tab_id", type=int, required=False) +@handle_errors def tabs_pin(tab_id): """Pin the active tab or a specific tab.""" - result = _handle("tabs.pin", {"tabId": tab_id}) - target = result.get("tabId", tab_id) if isinstance(result, dict) else tab_id + target = client_from_ctx().tabs.pin(tab_id) console.print(f"[green]Pinned tab {target}[/green]") - @tabs_group.command("unpin") @click.argument("tab_id", type=int, required=False) +@handle_errors def tabs_unpin(tab_id): """Unpin the active tab or a specific tab.""" - result = _handle("tabs.unpin", {"tabId": tab_id}) - target = result.get("tabId", tab_id) if isinstance(result, dict) else tab_id + target = client_from_ctx().tabs.unpin(tab_id) console.print(f"[green]Unpinned tab {target}[/green]") - @tabs_group.command("watch-url") @click.argument("pattern") -@click.option("--tab", "tab_id", type=int, default=None, help="Tab ID (default: active tab)") +@tab_option @click.option("--timeout", type=float, default=30.0, show_default=True, help="Max seconds to wait") +@handle_errors def tabs_watch_url(pattern, tab_id, timeout): """Wait until the active (or specified) tab URL matches regex PATTERN.""" - result = _handle("tabs.watch_url", {"pattern": pattern, "tabId": tab_id, "timeout": int(timeout * 1000)}) - url = result.get("url", "") if isinstance(result, dict) else "" - console.print(f"[green]URL matched:[/green] {url}") - + tab = client_from_ctx().tabs.watch_url(pattern, tab_id=tab_id, timeout=timeout) + console.print(f"[green]URL matched:[/green] {tab.url}") @tabs_group.command("screenshot") @click.argument("output", required=False, metavar="FILE") -@click.option("--tab", "tab_id", type=int, default=None, help="Tab ID (default: active tab)") +@tab_option @click.option("--format", "fmt", type=click.Choice(["png", "jpeg"]), default="png", show_default=True) @click.option("--quality", type=int, default=None, help="JPEG quality 0-100") +@handle_errors def tabs_screenshot(output, tab_id, fmt, quality): """Capture a screenshot of the active (or specified) tab. Saves to FILE if given, otherwise prints the base64 data URL. """ - result = _handle("tabs.screenshot", {"tabId": tab_id, "format": fmt, "quality": quality}) - data_url = result.get("dataUrl", "") if isinstance(result, dict) else "" + data_url = client_from_ctx().tabs.screenshot(tab_id, format=fmt, quality=quality) if output: header = f"data:image/{fmt};base64," if not data_url.startswith(header): diff --git a/browser_cli/commands/windows.py b/browser_cli/commands/windows.py index 3d814e3..1ad33c4 100644 --- a/browser_cli/commands/windows.py +++ b/browser_cli/commands/windows.py @@ -1,11 +1,10 @@ import click -from browser_cli.commands import _handle, _handle_multi, _multi_browser_targets +from browser_cli.commands import client_from_ctx, handle_errors from rich.console import Console from rich.table import Table console = Console() - def _print_windows(windows: list[dict], *, show_browser: bool = False) -> None: if not windows: console.print("[yellow]No windows found[/yellow]") @@ -28,53 +27,39 @@ def _print_windows(windows: list[dict], *, show_browser: bool = False) -> None: table.add_row(*[value for value in row if value is not None]) console.print(table) - @click.group("windows") def windows_group(): """Manage browser windows.""" - @windows_group.command("list") +@handle_errors def windows_list(): """List all browser windows.""" - targets = _multi_browser_targets() - if targets: - windows = [] - for target in targets: - result = _handle_multi("windows.list", profile=target.profile, remote=target.remote) - if result is None: - continue - windows.extend({**window, "browser": target.display_name} for window in result) - if not windows: - console.print("[red]Error:[/red] Cannot resolve a browser socket automatically.") - raise SystemExit(1) - _print_windows(windows, show_browser=True) - return - windows = _handle("windows.list") - _print_windows(windows or []) - + windows = client_from_ctx().windows.list() + _print_windows(windows, show_browser=any("browser" in w for w in windows)) @windows_group.command("rename") @click.argument("window_id", type=int) @click.argument("name") +@handle_errors def windows_rename(window_id, name): """Give a window a local alias NAME (stored in native host).""" - _handle("windows.rename", {"windowId": window_id, "name": name}) + client_from_ctx().windows.rename(window_id, name) console.print(f"[green]Window {window_id} aliased as '{name}'[/green]") - @windows_group.command("close") @click.argument("window_id", type=int) +@handle_errors def windows_close(window_id): """Close a browser window.""" - _handle("windows.close", {"windowId": window_id}) + client_from_ctx().windows.close(window_id) console.print(f"[green]Window {window_id} closed[/green]") - @windows_group.command("open") @click.argument("url", required=False) +@handle_errors def windows_open(url): """Open a new browser window.""" - result = _handle("windows.open", {"url": url}) + result = client_from_ctx().windows.open(url) wid = result.get("id") if isinstance(result, dict) else result console.print(f"[green]Opened new window[/green] (id: {wid})" + (f" with {url}" if url else "")) diff --git a/browser_cli/endpoints.py b/browser_cli/endpoints.py new file mode 100644 index 0000000..649c6e2 --- /dev/null +++ b/browser_cli/endpoints.py @@ -0,0 +1,57 @@ +"""Remote endpoint string handling — parsing, normalization, display. + +Pure helpers (no sockets, no I/O) for turning user-facing ``host[:port]`` +strings into the canonical forms the rest of the client uses, and back into the +short forms shown to humans. Re-exported from :mod:`browser_cli.client` for +backward compatibility. +""" +from __future__ import annotations + +import re + +from browser_cli.errors import BrowserNotConnected + +_DEFAULT_REMOTE_PORT = 443 + +def _looks_like_domain(host: str) -> bool: + """True if host looks like a domain name rather than an IP address or localhost.""" + if host in {"localhost", "127.0.0.1", "::1"}: + return False + if re.match(r'^\d{1,3}(\.\d{1,3}){3}$', host): + return False + return '.' in host and any(c.isalpha() for c in host) + +def _normalize_endpoint(endpoint: str) -> str: + """Strip :443 from domain-like endpoints so they are stored without the default port.""" + if not endpoint: + return endpoint + host, sep, port = endpoint.rpartition(":") + if sep and port == "443" and _looks_like_domain(host): + return host + return endpoint + +def _resolve_connect_endpoint(endpoint: str) -> str: + """Return host:port for TCP connection; domain without port defaults to :443.""" + _, sep, _ = endpoint.rpartition(":") + if not sep: + if _looks_like_domain(endpoint): + return f"{endpoint}:{_DEFAULT_REMOTE_PORT}" + raise BrowserNotConnected( + f"Invalid remote endpoint '{endpoint}': expected host:port" + ) + return endpoint + +def display_browser_name(profile_name: str, sock_path: str) -> str: + from pathlib import Path + + if profile_name != "default": + return profile_name + return Path(sock_path).stem or profile_name + +def _remote_display_name(endpoint: str, profile_name: str, display_name: str) -> str: + host, sep, port = endpoint.rpartition(":") + if sep and (port == "8765" or (port == "443" and _looks_like_domain(host))): + display_endpoint = host + else: + display_endpoint = endpoint # normalized domain (no port) or non-default port + return f"{display_endpoint}:{display_name or profile_name}" diff --git a/browser_cli/errors.py b/browser_cli/errors.py new file mode 100644 index 0000000..bc490ba --- /dev/null +++ b/browser_cli/errors.py @@ -0,0 +1,10 @@ +"""Shared exception types for the browser-cli client stack. + +Kept dependency-free so the transport/endpoint modules and ``client`` itself can +import it without creating an import cycle. ``BrowserNotConnected`` is re-exported +from :mod:`browser_cli.client` for backward compatibility. +""" +from __future__ import annotations + +class BrowserNotConnected(Exception): + """Raised when the native host socket is not available.""" diff --git a/browser_cli/markdown.py b/browser_cli/markdown.py new file mode 100644 index 0000000..01e99e3 --- /dev/null +++ b/browser_cli/markdown.py @@ -0,0 +1,413 @@ +"""HTML → Markdown conversion and Markdown clean-up. + +Pure, presentation-agnostic text transforms shared by the SDK +(:meth:`browser_cli.sdk.dom.ExtractNS.markdown`) and the ``extract markdown`` +CLI command. No Click/Rich/IPC dependencies — just an HTML tree walker plus a +set of repair passes for the markdown the page (or a markdown editor like +Obsidian/CodeMirror) hands back. +""" +from __future__ import annotations + +import re +from html.parser import HTMLParser + +_FENCE_RE = re.compile(r"```(?:[^\n`]*)\n.*?\n```", re.DOTALL) +_ESCAPED_MARKDOWN_RE = re.compile(r"\\([_-])") +_TABLE_SEPARATOR_RE = re.compile(r"^\|(?:\s*:?-{3,}:?\s*\|)+\s*$") + +class _HtmlNode: + def __init__(self, tag=None, attrs=None, text=None): + self.tag = tag + self.attrs = attrs or {} + self.text = text + self.children = [] + +class _HtmlTreeBuilder(HTMLParser): + _VOID_TAGS = {"br", "hr", "img"} + + def __init__(self): + super().__init__(convert_charrefs=True) + self.root = _HtmlNode(tag="document") + self._stack = [self.root] + + def handle_starttag(self, tag, attrs): + node = _HtmlNode(tag=tag.lower(), attrs=dict(attrs)) + self._stack[-1].children.append(node) + if node.tag not in self._VOID_TAGS: + self._stack.append(node) + + def handle_startendtag(self, tag, attrs): + node = _HtmlNode(tag=tag.lower(), attrs=dict(attrs)) + self._stack[-1].children.append(node) + + def handle_endtag(self, tag): + lowered = tag.lower() + for index in range(len(self._stack) - 1, 0, -1): + if self._stack[index].tag == lowered: + del self._stack[index:] + break + + def handle_data(self, data): + if data: + self._stack[-1].children.append(_HtmlNode(text=data)) + +def _normalize_text(value): + return re.sub(r"\s+", " ", value or "").strip() + +def _normalize_inline(value): + value = value.replace("\xa0", " ") + value = re.sub(r"[ \t\r\f\v]+", " ", value) + value = re.sub(r" *\n *", "\n", value) + return value.strip() + +def _collapse_blank_lines(value): + value = re.sub(r"[ \t]+\n", "\n", value) + value = re.sub(r"\n{3,}", "\n\n", value) + return value.strip() + +def _escape_markdown(text): + return re.sub(r"([\\`[\]])", r"\\\1", text) + +def _escape_table_cell(text): + return text.replace("|", r"\|").replace("\n", " ").strip() + +def _iter_descendants(node): + for child in getattr(node, "children", []): + yield child + yield from _iter_descendants(child) + +def _has_class(node, class_name): + classes = (node.attrs.get("class") or "").split() + return class_name in classes + +def _is_code_block_node(node): + if not node or not node.tag: + return False + if node.attrs.get("data-is-code-block-view") == "true": + return True + return node.tag == "pre" + +def _inline_text(node): + if node.text is not None: + return _escape_markdown(node.text) + if not node.tag: + return "" + + tag = node.tag + if tag == "br": + return "\n" + if tag == "img": + src = node.attrs.get("src") or "" + alt = _normalize_text(node.attrs.get("alt") or "") + if not src: + return "" + return f"![{_escape_markdown(alt)}]({src})" if alt else f"![]({src})" + if tag == "a": + text = _normalize_inline("".join(_inline_text(child) for child in node.children)) + href = node.attrs.get("href") or "" + return f"[{text or href}]({href})" if href else text + if tag == "code": + text = _normalize_inline("".join(_inline_text(child) for child in node.children)) + return f"`{text.replace('`', r'\\`')}`" if text else "" + if tag in {"strong", "b"}: + text = _normalize_inline("".join(_inline_text(child) for child in node.children)) + return f"**{text}**" if text else "" + if tag in {"em", "i"}: + text = _normalize_inline("".join(_inline_text(child) for child in node.children)) + return f"*{text}*" if text else "" + + chunks = [] + for child in node.children: + rendered = _inline_text(child) + if rendered: + chunks.append(rendered) + if child.tag in {"p", "div", "table", "ul", "ol", "pre"}: + chunks.append("\n") + return "".join(chunks) + +def _text_block(node): + return _collapse_blank_lines(_normalize_inline("".join(_inline_text(child) for child in node.children))) + +def _inner_text_preserve(node): + if node.text is not None: + return node.text + if not node.tag: + return "" + if node.tag == "br": + return "" + return "".join(_inner_text_preserve(child) for child in node.children) + +def _table_to_markdown(node): + rows = [] + for descendant in _iter_descendants(node): + if descendant.tag != "tr": + continue + row = [] + for cell in descendant.children: + if cell.tag in {"td", "th"}: + row.append(_escape_table_cell(_text_block(cell))) + if row: + rows.append(row) + if not rows: + return "" + + widths = max(len(row) for row in rows) + normalized_rows = [row + [""] * (widths - len(row)) for row in rows] + + headers = normalized_rows[0] + body_rows = normalized_rows[1:] + first_row_blank = all(not cell.strip() for cell in headers) + if first_row_blank and len(normalized_rows) > 1: + headers = normalized_rows[1] + body_rows = normalized_rows[2:] + + has_thead = any(child.tag == "thead" for child in node.children) + first_row = next((child for child in _iter_descendants(node) if child.tag == "tr"), None) + first_row_has_th = bool(first_row and any(child.tag == "th" for child in first_row.children)) + if not (has_thead or first_row_has_th or first_row_blank): + headers = [""] * widths + body_rows = normalized_rows + + separator = ["---"] * widths + lines = [ + f"| {' | '.join(headers)} |", + f"| {' | '.join(separator)} |", + ] + lines.extend(f"| {' | '.join(row)} |" for row in body_rows) + return "\n".join(lines) + +def _list_to_markdown(node, depth=0): + ordered = node.tag == "ol" + items = [] + index = 1 + for child in node.children: + if child.tag != "li": + continue + marker = f"{index}. " if ordered else "- " + index += 1 + content = [] + nested = [] + for item_child in child.children: + if item_child.tag in {"ul", "ol"}: + nested.append(_list_to_markdown(item_child, depth + 1)) + else: + content.append(_inline_text(item_child)) + line = _collapse_blank_lines(_normalize_inline("".join(content))) + indent = " " * depth + if line: + line_parts = line.splitlines() + items.append(f"{indent}{marker}{line_parts[0]}") + continuation_indent = f"{indent}{' ' * len(marker)}" + items.extend(f"{continuation_indent}{part}" for part in line_parts[1:]) + items.extend(block for block in nested if block) + return "\n".join(items) + +def _code_block_to_markdown(node): + if node.tag == "pre": + text = _inner_text_preserve(node).rstrip("\n") + return f"```\n{text}\n```" if text else "" + + lines = [] + for descendant in _iter_descendants(node): + if descendant.tag and _has_class(descendant, "cm-line"): + lines.append(_inner_text_preserve(descendant)) + code = "\n".join(lines).rstrip("\n") + return f"```\n{code}\n```" if code else "" + +def _block_to_markdown(node): + if node.text is not None: + return _normalize_text(node.text) + if not node.tag: + return "" + if _is_code_block_node(node): + return _code_block_to_markdown(node) + if node.tag == "table": + return _table_to_markdown(node) + if node.tag in {"ul", "ol"}: + return _list_to_markdown(node) + if re.fullmatch(r"h[1-6]", node.tag): + text = _text_block(node) + return f"{'#' * int(node.tag[1])} {text}" if text else "" + if node.tag in {"p", "figcaption"}: + return _text_block(node) + if node.tag == "blockquote": + content = _collapse_blank_lines("\n\n".join(filter(None, (_block_to_markdown(child) for child in node.children)))) + return "\n".join(f"> {line}" if line else ">" for line in content.splitlines()) if content else "" + if node.tag == "hr": + return "---" + if node.tag == "img": + return _inline_text(node) + + child_blocks = [block for block in (_block_to_markdown(child) for child in node.children) if block] + if child_blocks: + return _collapse_blank_lines("\n\n".join(child_blocks)) + return _text_block(node) + +def _parse_table_row(line): + stripped = line.strip() + if not stripped.startswith("|") or not stripped.endswith("|"): + return None + return [cell.strip() for cell in stripped.strip("|").split("|")] + +def _repair_table_headers(lines): + repaired = [] + index = 0 + while index < len(lines): + if ( + index + 2 < len(lines) + and _parse_table_row(lines[index]) is not None + and _TABLE_SEPARATOR_RE.match(lines[index + 1].strip()) + and _parse_table_row(lines[index + 2]) is not None + ): + first = _parse_table_row(lines[index]) + third = _parse_table_row(lines[index + 2]) + if first and all(not cell for cell in first) and any(cell for cell in third): + repaired.append(lines[index + 2].strip()) + repaired.append(lines[index + 1].strip()) + index += 3 + continue + repaired.append(lines[index].strip()) + index += 1 + return repaired + +def _repair_list_continuations(lines): + repaired = [] + previous_was_list_item = False + previous_continuation_indent = "" + + for line in lines: + stripped = line.strip() + list_match = re.match(r"^(\s*)([-*+]|\d+\.)\s+.+$", stripped) + is_markdown_block_start = ( + not stripped + or stripped.startswith(("```", "#", ">", "|")) + or _TABLE_SEPARATOR_RE.match(stripped) + or re.match(r"^(\s*)([-*+]|\d+\.)\s+", stripped) + ) + + if previous_was_list_item and stripped and not is_markdown_block_start: + repaired.append(f"{previous_continuation_indent}{stripped}") + previous_was_list_item = False + continue + + repaired.append(stripped) + if list_match: + marker = list_match.group(2) + base_indent = list_match.group(1) + previous_continuation_indent = f"{base_indent}{' ' * (len(marker) + 1)}" + previous_was_list_item = True + else: + previous_was_list_item = False + + return repaired + +def _repair_flattened_diagram(text): + if "\n" in text: + return text + if sum(text.count(char) for char in "│▼├└") < 2: + return text + + text = re.sub(r"\s{2,}([│▼])", r"\n \1", text) + text = re.sub(r"([│▼])\s{2,}", r"\1\n", text) + text = re.sub(r"([│▼])(?=[^\s\n│▼├└])", r"\1\n", text) + text = re.sub(r"(?<=[^\s\n])([├└])", r"\n\1", text) + text = re.sub(r"([^\s\n])(\()", r"\1\n\2", text) + return "\n".join(line.rstrip() for line in text.splitlines() if line.strip()) + +def _convert_dash_lists_to_branches(lines): + converted = [] + index = 0 + while index < len(lines): + match = re.match(r"^(\s*)-\s+(.*)$", lines[index]) + if not match: + converted.append(lines[index]) + index += 1 + continue + + indent = match.group(1) + items = [] + while index < len(lines): + next_match = re.match(rf"^{re.escape(indent)}-\s+(.*)$", lines[index]) + if not next_match: + break + items.append(next_match.group(1)) + index += 1 + + for item_index, item in enumerate(items): + branch = "└" if item_index == len(items) - 1 else "├" + converted.append(f"{indent}{branch} {item}") + return converted + +def _clean_code_block(code): + lines = [line.rstrip() for line in code.splitlines()] + while lines and not lines[0].strip(): + lines.pop(0) + while lines and not lines[-1].strip(): + lines.pop() + + flattened = _repair_flattened_diagram("\n".join(lines)) + lines = flattened.splitlines() if flattened else [] + lines = [ + f" {line.strip()}" + if line.strip() in {"│", "▼"} and not re.match(r"^\s+[│▼]\s*$", line) + else line + for line in lines + ] + lines = _convert_dash_lists_to_branches(lines) + return "\n".join(lines) + +def _clean_markdown_output(markdown): + if not markdown: + return "" + + pieces = [] + last_index = 0 + for match in _FENCE_RE.finditer(markdown): + prose = markdown[last_index:match.start()] + if prose: + cleaned = _ESCAPED_MARKDOWN_RE.sub(r"\1", prose) + lines = [line.strip() for line in cleaned.splitlines()] + lines = _repair_table_headers(lines) + lines = _repair_list_continuations(lines) + cleaned = "\n".join(lines) + cleaned = _collapse_blank_lines(cleaned) + if cleaned: + pieces.append(cleaned) + + fence = match.group(0) + header, _, tail = fence.partition("\n") + body, _, _ = tail.rpartition("\n") + cleaned_body = _clean_code_block(body) + pieces.append(f"{header}\n{cleaned_body}\n```" if cleaned_body else f"{header}\n```") + last_index = match.end() + + trailing = markdown[last_index:] + if trailing: + cleaned = _ESCAPED_MARKDOWN_RE.sub(r"\1", trailing) + lines = [line.strip() for line in cleaned.splitlines()] + lines = _repair_table_headers(lines) + lines = _repair_list_continuations(lines) + cleaned = "\n".join(lines) + cleaned = _collapse_blank_lines(cleaned) + if cleaned: + pieces.append(cleaned) + + return "\n\n".join(piece for piece in pieces if piece) + +def _convert_html_to_markdown(html): + parser = _HtmlTreeBuilder() + parser.feed(html or "") + markdown = _block_to_markdown(parser.root) + return _clean_markdown_output(markdown) + +def render_markdown(raw: str | None) -> str: + """Normalize *raw* extractor output into clean Markdown. + + If the payload looks like HTML (first non-space char is ``<``) it is run + through the HTML→Markdown converter; otherwise it is treated as Markdown and + only the clean-up/repair passes are applied. + """ + raw = raw or "" + if raw.lstrip().startswith("<"): + return _convert_html_to_markdown(raw) + return _clean_markdown_output(raw) diff --git a/browser_cli/models.py b/browser_cli/models.py index c6d465d..c30d732 100644 --- a/browser_cli/models.py +++ b/browser_cli/models.py @@ -4,11 +4,11 @@ Typed dataclasses returned by the BrowserCLI Python API. Each object is bound to a BrowserCLI instance so you can call actions directly on it: - tabs = b.tabs_list() + tabs = b.tabs.list() tabs[0].close() tabs[0].move(forward=True) - groups = b.group_list() + groups = b.groups.list() groups[0].tabs() groups[0].add_tab("https://example.com") """ @@ -21,6 +21,14 @@ if TYPE_CHECKING: from browser_cli import BrowserCLI +# ── BrowserCounts ─────────────────────────────────────────────────────────── + +@dataclass(frozen=True) +class BrowserCounts: + """Aggregated per-browser counts returned in implicit multi-browser mode.""" + total: int + by_browser: dict[str, int] + # ── Tab ─────────────────────────────────────────────────────────────────────── @dataclass @@ -97,7 +105,7 @@ class Tab: def screenshot(self, *, format: str = "png", quality: int | None = None) -> str: """Capture this tab's visible area. Returns a base64 data URL.""" - return self._b().tabs_screenshot(self.id, format=format, quality=quality) + return self._b().tabs.screenshot(self.id, format=format, quality=quality) def pin(self) -> None: """Pin this tab.""" @@ -109,19 +117,19 @@ class Tab: def refresh(self) -> Tab: """Return a fresh snapshot of this tab.""" - return self._b().tabs_status(self.id) + return self._b().tabs.status(self.id) def wait_for_load(self, *, timeout: float = 30.0, ready_state: str = "complete") -> Tab: """Wait until this tab reaches the requested readyState.""" - return self._b().wait_for_load(self.id, timeout=timeout, ready_state=ready_state) + return self._b().tabs.wait_for_load(self.id, timeout=timeout, ready_state=ready_state) def watch_url(self, pattern: str, *, timeout: float = 30.0) -> Tab: """Wait until this tab's URL matches regex *pattern*.""" - return self._b().tabs_watch_url(pattern, tab_id=self.id, timeout=timeout) + return self._b().tabs.watch_url(pattern, tab_id=self.id, timeout=timeout) def open(self, url: str, *, background: bool = False) -> None: """Navigate this tab to *url* in place.""" - self._b().navigate_tab(self.id, url) + self._b().nav.to(self.id, url) # ── Group ───────────────────────────────────────────────────────────────────── @@ -148,7 +156,7 @@ class Group: def tabs(self) -> list[Tab]: """Return all tabs inside this group.""" - return self._b().group_tabs(self.id) + return self._b().groups.tabs(self.id) def move(self, *, forward: bool = False, backward: bool = False) -> None: """Move this group forward or backward among groups.""" @@ -160,4 +168,4 @@ class Group: def add_tab(self, url: str | None = None) -> int | None: """Open a new tab inside this group. Returns the new tab ID.""" - return self._b().group_add_tab(self.id, url) + return self._b().groups.add_tab(self.id, url) diff --git a/browser_cli/remote_transport.py b/browser_cli/remote_transport.py new file mode 100644 index 0000000..295783b --- /dev/null +++ b/browser_cli/remote_transport.py @@ -0,0 +1,123 @@ +"""TCP/TLS transport for talking to a remote ``browser-cli serve``. + +Owns the wire mechanics of the remote leg: open a socket (TLS on :443), +complete the signed challenge/response handshake with an optional post-quantum +key exchange, frame the request, and read the framed (possibly encrypted) +response. The higher-level "which endpoint / which profile / which key" +decisions stay in :mod:`browser_cli.client`, which re-exports these for +backward compatibility. +""" +from __future__ import annotations + +import json +import socket +import struct +import sys + +from browser_cli.errors import BrowserNotConnected +from browser_cli.endpoints import _resolve_connect_endpoint +from browser_cli.version_manager import MAX_MSG_BYTES as _MAX_MSG_BYTES +from browser_cli.version_manager import USER_AGENT as _USER_AGENT + +_PQ_WARNING = ( + "** WARNING: connection is not using a post-quantum key exchange algorithm.\n" + "** This session may be vulnerable to store now, decrypt later attacks.\n" +) + +def _recv_exact(sock: socket.socket, n: int) -> bytes: + buf = b"" + while len(buf) < n: + chunk = sock.recv(n - len(buf)) + if not chunk: + raise ConnectionError("Socket closed before full message received") + buf += chunk + return buf + +def _recv_all(sock: socket.socket) -> bytes: + raw_len = _recv_exact(sock, 4) + msg_len = struct.unpack(" _MAX_MSG_BYTES: + raise ConnectionError(f"Response too large ({msg_len} bytes)") + return _recv_exact(sock, msg_len) + +def _send_remote(endpoint: str, msg: dict, private_key=None) -> bytes | None: + connect_ep = _resolve_connect_endpoint(endpoint) + host, _, port_str = connect_ep.rpartition(":") + port = int(port_str) + raw_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + raw_sock.settimeout(30) + try: + raw_sock.connect((host, port)) + if port == 443: + import ssl + ctx = ssl.create_default_context() + sock = ctx.wrap_socket(raw_sock, server_hostname=host) + else: + sock = raw_sock + except Exception: + raw_sock.close() + raise + with sock: + + # receive challenge + challenge_raw = _recv_all(sock) + if challenge_raw is None: + raise BrowserNotConnected(f"No challenge received from {endpoint}") + try: + challenge = json.loads(challenge_raw) + nonce_hex = challenge.get("nonce") if challenge.get("type") == "challenge" else None + except (json.JSONDecodeError, AttributeError): + nonce_hex = None + + min_ver = challenge.get("min_client_version") if isinstance(challenge, dict) else None + if min_ver: + from browser_cli.version_manager import parse_version + try: + client_ver = _USER_AGENT.split("/", 1)[1] + if parse_version(client_ver) < parse_version(min_ver): + raise BrowserNotConnected( + f"Client version {client_ver} is too old for this server " + f"(requires >= {min_ver}). Run: pip install --upgrade browser-cli" + ) + except (IndexError, ValueError): + pass + + pq_shared_secret = None + if nonce_hex and private_key is not None: + from browser_cli.auth import PQ_KEX_ALG, pq_encrypt, pq_kex_client_encapsulate, sign, public_key_hex + nonce = bytes.fromhex(nonce_hex) + clean_msg = {k: v for k, v in msg.items() if k not in {"token", "pubkey", "sig", "pq_kex", "encrypted"}} + kex = challenge.get("pq_kex") if isinstance(challenge, dict) else None + if isinstance(kex, dict) and kex.get("alg") == PQ_KEX_ALG and kex.get("public_key"): + ciphertext_hex, pq_shared_secret = pq_kex_client_encapsulate(str(kex["public_key"])) + clean_msg["pq_kex"] = {"alg": PQ_KEX_ALG, "ciphertext": ciphertext_hex} + else: + sys.stderr.write(_PQ_WARNING) + sig = sign(private_key, nonce, clean_msg, pq_shared_secret) + msg = {**clean_msg, "pubkey": public_key_hex(private_key), "sig": sig.hex()} + if pq_shared_secret is not None: + encrypted = pq_encrypt(pq_shared_secret, "request", json.dumps(clean_msg).encode("utf-8")) + msg = { + "id": clean_msg.get("id"), + "user_agent": clean_msg.get("user_agent"), + "pubkey": public_key_hex(private_key), + "sig": sig.hex(), + "pq_kex": clean_msg["pq_kex"], + "encrypted": encrypted, + } + else: + sys.stderr.write(_PQ_WARNING) + + payload = json.dumps(msg).encode("utf-8") + framed = struct.pack(" str | dict | None: + """Get a localStorage/sessionStorage entry (or all entries if key omitted).""" + return self._c._cmd("storage.get", {"key": key, "type": type, "tabId": tab_id}) + + def set( + self, + key: str, + value: str, + *, + type: str = "local", + tab_id: int | None = None, + ) -> None: + """Set a localStorage/sessionStorage entry.""" + self._c._cmd("storage.set", {"key": key, "value": value, "type": type, "tabId": tab_id}) + +class CookiesNS(Namespace): + """List, get, and set cookies.""" + + def list( + self, + *, + url: str | None = None, + domain: str | None = None, + name: str | None = None, + ) -> list[dict]: + """List cookies, optionally filtered by url, domain, or name.""" + return self._c._cmd("cookies.list", {"url": url, "domain": domain, "name": name}) or [] + + def get(self, url: str, name: str) -> dict | None: + """Get a single cookie by url and name.""" + return self._c._cmd("cookies.get", {"url": url, "name": name}) + + def set( + self, + url: str, + name: str, + value: str, + *, + domain: str | None = None, + path: str | None = None, + secure: bool | None = None, + http_only: bool | None = None, + expiration_date: float | None = None, + same_site: str | None = None, + ) -> dict: + """Set a cookie. Returns the created cookie dict.""" + return self._c._cmd("cookies.set", { + "url": url, "name": name, "value": value, + "domain": domain, "path": path, + "secure": secure, "httpOnly": http_only, + "expirationDate": expiration_date, "sameSite": same_site, + }) diff --git a/browser_cli/sdk/dom.py b/browser_cli/sdk/dom.py new file mode 100644 index 0000000..8fa41e8 --- /dev/null +++ b/browser_cli/sdk/dom.py @@ -0,0 +1,150 @@ +"""DOM, content-extraction, and page-info namespaces: ``b.dom.*``, ``b.extract.*``, ``b.page.*``.""" +from __future__ import annotations + +from browser_cli.sdk.base import Namespace + +class DomNS(Namespace): + """Query and drive page elements in the active (or specified) tab.""" + + def query(self, selector: str) -> list[dict]: + return self._c._cmd("dom.query", {"selector": selector}) or [] + + def click(self, selector: str) -> None: + self._c._cmd("dom.click", {"selector": selector}) + + def type(self, selector: str, text: str) -> None: + self._c._cmd("dom.type", {"selector": selector, "text": text}) + + def attr(self, selector: str, attr: str) -> list[str]: + return self._c._cmd("dom.attr", {"selector": selector, "attr": attr}) or [] + + def text(self, selector: str) -> list[str]: + return self._c._cmd("dom.text", {"selector": selector}) or [] + + def exists(self, selector: str) -> bool: + return self._c._cmd("dom.exists", {"selector": selector}) or False + + def scroll(self, selector: str | None = None, *, x: int | None = None, y: int | None = None) -> None: + """Scroll to a CSS selector or to pixel coordinates.""" + self._c._cmd("dom.scroll", {"selector": selector, "x": x, "y": y}) + + def select(self, selector: str, value: str) -> None: + """Set the value of a