"""Click command for exposing a browser over TCP.""" from __future__ import annotations import asyncio import sys from pathlib import Path import click from browser_cli import transport from browser_cli.command_security import CommandPolicy from browser_cli.commands import command_policy_from_options, command_policy_options from browser_cli.serve.runtime import ( _async_framed_send, _async_handle_client, _async_recv_all, _handle_client, _serve_async, console, ) from browser_cli.serve.security import RateLimiter, ServeSecurity, key_policies_from_authorized_keys from browser_cli.version_manager import get_installed_version __all__ = [ "_async_framed_send", "_async_handle_client", "_async_recv_all", "_handle_client", "_serve_async", "cmd_serve", ] def _is_loopback(host: str) -> bool: return host in {"127.0.0.1", "localhost", "::1"} @click.command("serve") @click.option("--host", default="127.0.0.1", show_default=True, help="Address to bind.") @click.option("--port", default=8765, show_default=True, type=int, help="TCP port to listen on.") @click.option("--no-auth", is_flag=True, default=False, help="Disable authentication (dangerous).") @click.option( "--authorized-keys", "auth_keys_file", default=None, metavar="FILE", help="File of trusted Ed25519 public keys (one hex per line). Required unless --no-auth.", ) @click.option( "--no-compress", "no_compress", is_flag=True, default=False, help="Disable response compression / msgpack even for clients that support it.", ) @click.option( "--rate-limit", default=100.0, show_default=True, type=float, help="Max commands/sec per client key (0 disables).", ) @command_policy_options @click.pass_context def cmd_serve(ctx, host, port, no_auth, auth_keys_file, no_compress, rate_limit, allow_read_page, allow_control, allow_dangerous, allow_keys, allow_all): """Expose this browser over TCP so remote hosts can control it. Commands are gated by a safe-only policy by default; remote clients can only run read-only status/listing commands. Open more with --allow-read-page, --allow-control, --allow-dangerous, or --allow-all (full control). Per-key overrides come from an ``allow:`` token in authorized_keys (set via ``auth trust --allow-*``), and --rate-limit throttles each client key. """ profile = ctx.obj.get("browser") if ctx.obj else None compress = not no_compress if no_auth and not _is_loopback(host): console.print( "[red]Error:[/red] --no-auth is only allowed on loopback hosts " "(127.0.0.1, localhost, ::1). Use --authorized-keys to expose this browser to the network." ) sys.exit(1) if host in ("0.0.0.0", "::"): console.print( "[yellow]Warning:[/yellow] Binding to all interfaces — " "anyone who can reach this port controls your browser." ) policy = command_policy_from_options( allow_read_page=allow_read_page, allow_control=allow_control, allow_dangerous=allow_dangerous, allow_keys=allow_keys, allow_all=allow_all, ) auth_keys_path = _resolve_auth_keys_path(auth_keys_file, no_auth) if auth_keys_path is False: sys.exit(1) security = _build_security(policy, auth_keys_path, rate_limit) _print_startup(host, port, profile, auth_keys_path, compress, security) try: asyncio.run(_serve_async(host, port, profile, auth_keys_path, compress, security)) except OSError as e: console.print(f"[red]Cannot bind to {host}:{port}:[/red] {e}") sys.exit(1) except KeyboardInterrupt: console.print("[yellow]Stopped.[/yellow]") def _build_security(policy, auth_keys_path, rate_limit) -> ServeSecurity: """Assemble the serve-time security context from the authorized_keys file.""" key_policies: dict = {} key_names: dict = {} if auth_keys_path is not None: from browser_cli.auth import load_authorized_keys_with_names key_names = {pk.strip().lower(): name for pk, name in load_authorized_keys_with_names(auth_keys_path)} key_policies = key_policies_from_authorized_keys(auth_keys_path) rate_limiter = RateLimiter(rate_limit) if rate_limit and rate_limit > 0 else None return ServeSecurity(policy=policy, key_policies=key_policies, key_names=key_names, rate_limiter=rate_limiter) def _resolve_auth_keys_path(auth_keys_file: str | None, no_auth: bool) -> Path | None | bool: if auth_keys_file: from browser_cli.auth import load_authorized_keys auth_keys_path = Path(auth_keys_file) if not load_authorized_keys(auth_keys_path): console.print(f"[yellow]Warning:[/yellow] No authorized keys found in {auth_keys_path}") return auth_keys_path if no_auth: return None console.print( "[red]Error:[/red] --authorized-keys FILE is required. " "Use --no-auth to explicitly disable auth (dangerous)." ) return False def _print_startup(host: str, port: int, profile: str | None, auth_keys_path: Path | None, compress: bool, security: ServeSecurity | None = None) -> None: current_ver = get_installed_version() security = security if security is not None else ServeSecurity() browser_hint = f" (browser: {profile})" if profile else "" console.print(f"[green]Serving browser{browser_hint} →[/green] [cyan]{host}:{port}[/cyan] [dim]v{current_ver}[/dim]") if auth_keys_path is not None: from browser_cli.auth import load_authorized_keys n = len(load_authorized_keys(auth_keys_path)) console.print(f" Auth: [bold green]Ed25519 pubkey[/bold green] ({n} trusted key{'s' if n != 1 else ''})") else: console.print("[yellow] Auth disabled (--no-auth)[/yellow]") _print_policy_status(security.policy) if security.key_policies: console.print(f" Per-key: [green]{len(security.key_policies)} override(s)[/green] [dim](allow: in authorized_keys)[/dim]") if security.rate_limiter is not None: console.print(f" Rate: [green]{security.rate_limiter.rate:g}/s per key[/green] [dim](burst {security.rate_limiter.capacity:g})[/dim]") else: console.print(" Rate: [yellow]unlimited[/yellow] [dim](--rate-limit 0)[/dim]") console.print(f" CLI: [dim]browser-cli --remote {host}:{port} tabs list[/dim]") console.print(f" Python: [dim]BrowserCLI(remote=\"{host}:{port}\").tabs.list()[/dim]") _print_encoding_status(compress) console.print("Ctrl-C to stop.\n") def _print_policy_status(policy: CommandPolicy | None) -> None: if policy is None or policy == CommandPolicy.unrestricted(): console.print(" Policy: [yellow]unrestricted (--allow-all)[/yellow] [dim](every command allowed, incl. dom.eval/storage)[/dim]") return allowed = ["safe"] if policy.allow_read_page: allowed.append("read-page") if policy.allow_control: allowed.append("control") if policy.allow_dangerous: allowed.append("dangerous") if policy.allow_keys: allowed.append("keys") console.print(f" Policy: [green]restricted[/green] [dim](allowed: {', '.join(allowed)})[/dim]") def _print_encoding_status(compress: bool) -> None: if not compress: console.print(" Encode: [yellow]off (--no-compress)[/yellow]") return codecs = "+".join(transport.supported_compression()) sers = "+".join(transport.supported_serialization()) console.print( " Encode: [green]on[/green] " f"[dim](compression: {codecs}; serialization: {sers}; per-client negotiated)[/dim]" )