"""Multi-browser routing mixin for :class:`~browser_cli.BrowserCLI`.

When no specific browser is selected and more than one browser (local or
remote) is active, list/count commands fan out to every target and aggregate
the results. This mixin holds that fan-out machinery plus a few small response
helpers; single-browser mode falls straight through to ``_cmd``.
"""

from __future__ import annotations

from collections.abc import Callable, Iterable

import browser_cli as _pkg
from browser_cli.client import BrowserNotConnected
from browser_cli.models import BrowserCounts, Tab

# send_command / active_browser_targets / remote_browser_targets are resolved
# through the ``browser_cli`` package namespace (``_pkg``) at call time, not bound
# here at import, so tests patching ``browser_cli.send_command`` still take effect.

# Sentinel meaning "no fallback supplied"; lets ``None`` be a real fallback value.
_UNSET = object()


class RoutingMixin:
    """Fan-out + aggregation across active browsers, mixed into ``BrowserCLI``.

    Relies on the client exposing ``_browser``/``_remote``/``_key``, ``_cmd``,
    and a ``tabs`` namespace.
    """

    def _multi_browser_targets(self):
        """Return the targets to fan out to, or ``[]`` for single-browser mode.

        An explicitly selected browser (``self._browser`` is not ``None``)
        always disables fan-out. Otherwise targets come from the remote lookup
        when ``self._remote`` is set, else from the locally active browsers.
        Zero or one target is treated as single-browser mode unless one of
        them is remote, in which case it is still returned.
        """
        if self._browser is not None:
            return []
        if self._remote:
            targets = _pkg.remote_browser_targets(self._remote, key=self._key)
        else:
            targets = _pkg.active_browser_targets()
        has_remote = any(target.remote for target in targets)
        if len(targets) <= 1 and not has_remote:
            return []
        return targets

    def _collect_multi_browser(self, command: str, args: dict | None = None):
        """Send *command* to every multi-browser target and gather the replies.

        Returns a list of ``(target, data)`` pairs, one per target that
        answered. Targets that raise ``BrowserNotConnected`` or
        ``RuntimeError`` are skipped (best effort). Returns ``[]`` when there
        are no targets, i.e. in single-browser mode.

        Raises:
            BrowserNotConnected: there were targets but none of them answered.
                The last per-target failure is attached as ``__cause__`` so
                the underlying reason is not lost.
        """
        targets = self._multi_browser_targets()
        results = []
        last_error = None
        for target in targets:
            try:
                if target.remote:
                    data = _pkg.send_command(
                        command,
                        args,
                        profile=target.profile,
                        remote=target.remote,
                        key=self._key,
                    )
                else:
                    data = _pkg.send_command(command, args, profile=target.profile)
            except (BrowserNotConnected, RuntimeError) as exc:
                # Best effort: one unreachable browser must not fail the fan-out.
                last_error = exc
                continue
            results.append((target, data))

        if results or not targets:
            return results

        # Every target failed; surface the most recent failure as the cause.
        raise BrowserNotConnected(
            "Cannot resolve a browser socket automatically.\n"
            "Make sure the browser is running with the browser-cli extension enabled,\n"
            "or pass --browser / set BROWSER_CLI_PROFILE to a known alias."
        ) from last_error

    @staticmethod
    def _field(result, key, default=None, *, fallback=_UNSET):
        """Pull *key* out of a dict response, with a non-dict fallback.

        Returns ``result[key]`` (or *default*) when *result* is a dict. When it
        is not a dict, returns *fallback* if given, else *default*.
        """
        if isinstance(result, dict):
            return result.get(key, default)
        if fallback is _UNSET:
            return default
        return fallback

    def _toggle_tab(self, command: str, tab_id: int | None) -> int:
        """Run a tab toggle command (mute/pin/...) and return the target tab ID.

        The ID comes from the response's ``"tabId"`` field when the response
        is a dict (defaulting to *tab_id*); for a non-dict response it is
        ``int(tab_id or 0)``.
        """
        result = self._cmd(command, {"tabId": tab_id})
        return self._field(result, "tabId", tab_id, fallback=int(tab_id or 0))

    def _multi_count(self, command: str, args: dict | None = None) -> "int | BrowserCounts":
        """Count command that aggregates into :class:`BrowserCounts` in multi-browser mode.

        In single-browser mode the result of ``_cmd`` is returned unchanged.
        """
        payload = args or {}
        multi_results = self._collect_multi_browser(command, payload)
        if not multi_results:
            return self._cmd(command, payload)
        # NOTE(review): keyed by display_name, so two targets sharing a name
        # would overwrite each other -- confirm display names are unique.
        by_browser = {
            target.display_name: int(count or 0) for target, count in multi_results
        }
        return BrowserCounts(total=sum(by_browser.values()), by_browser=by_browser)

    def _multi_list(self, command: str, args: dict | None, mapper: Callable) -> list:
        """List command, flattening per-browser results in multi-browser mode.

        *mapper* is ``(item, target) -> mapped`` where ``target`` is the source
        :class:`BrowserTarget` in multi mode, or ``None`` in single-browser mode.
        A falsy per-browser or single-browser result is treated as empty.
        """
        payload = args or {}
        multi_results = self._collect_multi_browser(command, payload)
        if multi_results:
            return [
                mapper(item, target)
                for target, items in multi_results
                for item in (items or [])
            ]
        single_items = self._cmd(command, payload) or []
        return [mapper(item, None) for item in single_items]

    def _apply_tab_filter(
        self,
        filter_fn: Callable[[Tab], bool] | Callable[[list[Tab]], Iterable[Tab]],
    ) -> list[Tab]:
        """Apply *filter_fn* to ``self.tabs.list()`` and return the kept tabs.

        *filter_fn* may be either a whole-list transformer or a per-tab
        predicate. It is first called with the full list; if that raises
        ``AttributeError``/``TypeError``, returns a ``bool``, or returns
        something that cannot be turned into a list, it is re-applied as a
        predicate to each tab instead.
        """
        tabs = self.tabs.list()

        def keep_matching() -> list[Tab]:
            # Predicate interpretation: keep tabs for which filter_fn is truthy.
            return [tab for tab in tabs if filter_fn(tab)]

        try:
            transformed = filter_fn(tabs)
        except (AttributeError, TypeError):
            # Most likely a per-tab predicate that choked on a list argument.
            return keep_matching()

        if isinstance(transformed, list):
            return transformed
        if isinstance(transformed, (tuple, set)):
            return list(transformed)
        if transformed is tabs:
            return tabs
        if isinstance(transformed, bool):
            # A bool answer for the whole list means filter_fn is a predicate.
            return keep_matching()
        try:
            return list(transformed)
        except TypeError:
            # Not iterable, so it cannot have been a list transformer.
            return keep_matching()