Files
browser-cli/browser_cli/commands/serve_http.py
T
daniel156161 5cec57e06d
Testing / remote-protocol-compat (0.9.3) (push) Successful in 40s
Testing / remote-protocol-compat (0.9.5) (push) Successful in 38s
Testing / test (push) Failing after 1m3s
Package Extension / package-extension (push) Successful in 29s
Build & Publish Package / publish (push) Successful in 33s
feat!: harden raw browser control and packaging
- Add safe-by-default policy gates for raw command surfaces: command, script, and serve-http /command.

- Require explicit opt-ins for page reads, browser control, and high-risk commands such as dom.eval, storage.*, and screenshots.

- Remove all cookies support from CLI, SDK, extension commands, permissions, constants, docs, and tests.

- Add diagnostic, events, watch, workspace, remote, raw command, script, HTTP gateway, tree-view, session import/export, and extension info/capability commands.

- Add Chrome Web Store packaging that strips manifest.key while keeping local packages with a stable native-messaging extension ID.

- Bump browser-cli and extension version to 0.14.1 and cover the new behavior with pytest and extension packaging tests.

BREAKING CHANGE: cookies commands and the b.cookies SDK namespace have been removed; generic raw command execution now blocks non-safe commands unless explicitly allowed.
2026-06-14 14:33:15 +02:00

116 lines
4.5 KiB
Python

from __future__ import annotations
import json
import secrets
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import urlparse
import click
from rich.console import Console
from browser_cli import BrowserCLI
from browser_cli.command_security import CommandPolicy, assert_command_allowed
# Shared Rich console used by this module for the startup banner and the
# per-request log lines written by the HTTP handler.
console = Console()
def _is_loopback(host: str) -> bool:
return host in {"127.0.0.1", "localhost", "::1"}
class _Handler(BaseHTTPRequestHandler):
    """Request handler for the JSON gateway.

    Routes:
        GET  /health   -> ``{"ok": true}`` (never requires a token)
        GET  /tabs     -> list of tabs from ``client.tabs.list()``
        GET  /clients  -> result of ``client.clients()``
        POST /command  -> runs ``client.command(command, args)`` after the
                          command has passed ``assert_command_allowed``

    The ``client``, ``token`` and ``policy`` attributes are class-level; the
    serve command supplies them by creating a subclass with ``type(...)``.
    """

    # Browser client every request is executed against.
    client: BrowserCLI
    # Shared secret required on requests; ``None`` disables authentication.
    token: str | None = None
    # Policy checked before running a command; the default is built with no
    # arguments (presumably the most restrictive policy -- confirm against
    # browser_cli.command_security).
    policy: CommandPolicy = CommandPolicy()

    @staticmethod
    def _secret_matches(presented: str | None, expected: str) -> bool:
        """Compare two secrets in constant time; a missing value never matches."""
        if presented is None:
            return False
        # Compare as bytes: compare_digest rejects non-ASCII ``str`` inputs,
        # and header values may contain arbitrary latin-1 characters.
        return secrets.compare_digest(
            presented.encode("utf-8", "surrogatepass"),
            expected.encode("utf-8", "surrogatepass"),
        )

    def _authorized(self) -> bool:
        """Return True when auth is disabled or the request carries the token.

        The token is accepted either as ``Authorization: Bearer <token>`` or
        as ``X-Browser-CLI-Token: <token>``.
        """
        if self.token is None:
            return True
        if self._secret_matches(self.headers.get("Authorization", ""), f"Bearer {self.token}"):
            return True
        return self._secret_matches(self.headers.get("X-Browser-CLI-Token"), self.token)

    def _require_auth(self) -> bool:
        """Return True if authorized; otherwise send a 401 and return False."""
        if self._authorized():
            return True
        self._send(401, {"error": "missing or invalid token"})
        return False

    def _send(self, status: int, payload) -> None:
        """Serialize *payload* as JSON and write it as a complete response.

        Values that ``json`` cannot encode natively are stringified via
        ``default=str``.
        """
        raw = json.dumps(payload, default=str).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(raw)))
        self.end_headers()
        self.wfile.write(raw)

    def do_GET(self):
        """Serve the read-only endpoints; any failure becomes a 500 JSON error."""
        path = urlparse(self.path).path
        try:
            # /health is the only endpoint reachable without a token.
            if path != "/health" and not self._require_auth():
                return
            if path == "/tabs":
                self._send(200, [t.__dict__ for t in self.client.tabs.list()])
            elif path == "/clients":
                self._send(200, self.client.clients())
            elif path == "/health":
                self._send(200, {"ok": True})
            else:
                self._send(404, {"error": "not found"})
        except Exception as exc:
            self._send(500, {"error": str(exc)})

    def do_POST(self):
        """Handle ``POST /command``.

        Responses:
            200  ``{"result": ...}`` on success
            400  malformed Content-Length, invalid JSON, non-object body, or
                 a missing/non-string ``command``
            401  missing or invalid token
            403  command rejected by the policy (``PermissionError``)
            404  any other path
            500  any other failure while executing the command
        """
        path = urlparse(self.path).path
        try:
            try:
                length = int(self.headers.get("Content-Length", "0"))
            except ValueError:
                length = -1
            if length < 0:
                # A negative length would make rfile.read() block until EOF.
                self._send(400, {"error": "invalid Content-Length"})
                return
            # Always drain the declared body so the socket is left clean,
            # but do not interpret it until routing and auth have passed.
            raw = self.rfile.read(length)
            if path != "/command":
                self._send(404, {"error": "not found"})
                return
            if not self._require_auth():
                return
            try:
                body = json.loads(raw or b"{}")
            except ValueError as exc:
                # JSONDecodeError and UnicodeDecodeError both derive from ValueError.
                self._send(400, {"error": f"invalid JSON body: {exc}"})
                return
            if not isinstance(body, dict):
                self._send(400, {"error": "request body must be a JSON object"})
                return
            command = body.get("command")
            if not isinstance(command, str) or not command:
                self._send(400, {"error": "'command' must be a non-empty string"})
                return
            assert_command_allowed(command, self.policy)
            self._send(200, {"result": self.client.command(command, body.get("args") or {})})
        except PermissionError as exc:
            self._send(403, {"error": str(exc)})
        except Exception as exc:
            self._send(500, {"error": str(exc)})

    def log_message(self, fmt, *args):
        """Write the access-log line to the shared Rich console."""
        # Function-scope import: rich is already a dependency of this module.
        from rich.markup import escape

        # The request line is client-controlled; escape it so square brackets
        # in a URL are printed literally instead of being parsed as markup.
        line = escape(f"{self.address_string()} {fmt % args}")
        console.print(f"[dim]http[/dim] {line}")
@click.command("serve-http")
@click.option("--host", default="127.0.0.1", show_default=True)
@click.option("--port", type=int, default=8766, show_default=True)
@click.option("--browser", default=None, help="Browser alias to target")
@click.option("--remote", default=None, help="Remote endpoint to target")
@click.option("--key", default=None, help="Remote auth key spec")
@click.option("--token", default=None, help="Bearer token required for HTTP access (generated by default)")
@click.option("--no-auth", is_flag=True, help="Disable HTTP auth (only allowed on loopback hosts)")
@click.option("--allow-read-page", is_flag=True, help="Allow /command to run page-content read commands")
@click.option("--allow-control", is_flag=True, help="Allow /command to run browser-control commands")
@click.option("--allow-dangerous", is_flag=True, help="Allow /command to run high-risk commands")
def cmd_serve_http(host, port, browser, remote, key, token, no_auth, allow_read_page, allow_control, allow_dangerous):
    """Expose a tiny local HTTP JSON gateway (/tabs, /clients, /command).
    Auth is enabled by default. Pass the printed token as either
    ``Authorization: Bearer <token>`` or ``X-Browser-CLI-Token: <token>``.
    """
    # The docstring above is click's --help text, so it is kept verbatim;
    # implementation notes live in comments instead.
    #
    # Flow: validate --no-auth, pick or generate the token, build the command
    # policy from the --allow-* flags, bind the server with a handler subclass
    # carrying (client, token, policy), then serve until Ctrl-C.
    #
    # Raises click.ClickException when --no-auth is combined with a
    # non-loopback --host.
    #
    # NOTE(review): ThreadingHTTPServer binds with AF_INET by default, so an
    # IPv6 literal such as "::1" likely fails to bind even though the
    # loopback check accepts it -- confirm and add AF_INET6 support if needed.

    # Function-scope import: rich is already a dependency of this module.
    from rich.markup import escape

    if no_auth and not _is_loopback(host):
        raise click.ClickException("--no-auth is only allowed on loopback hosts")

    # An empty --token falls back to a freshly generated one.
    auth_token = None if no_auth else (token or secrets.token_urlsafe(32))
    policy = CommandPolicy(
        allow_read_page=allow_read_page,
        allow_control=allow_control,
        allow_dangerous=allow_dangerous,
    )
    # http.server instantiates the handler class per request, so per-server
    # state is attached as class attributes on a dedicated subclass.
    handler = type(
        "BrowserCLIHTTPHandler",
        (_Handler,),
        {"client": BrowserCLI(browser=browser, remote=remote, key=key), "token": auth_token, "policy": policy},
    )
    server = ThreadingHTTPServer((host, port), handler)
    try:
        console.print(f"[green]HTTP gateway listening on http://{host}:{port}[/green]")
        if auth_token:
            # Escape so a user-supplied token containing "[...]" is shown
            # literally instead of being interpreted as Rich markup.
            console.print(f"[yellow]Token:[/yellow] {escape(auth_token)}")
        server.serve_forever()
    except KeyboardInterrupt:
        console.print("\n[yellow]Stopping HTTP gateway[/yellow]")
    finally:
        # Release the listening socket on every exit path.
        server.server_close()