from __future__ import annotations

import json
import secrets
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import urlparse

import click
from rich.console import Console
from rich.markup import escape

from browser_cli import BrowserCLI
from browser_cli.command_security import CommandPolicy, assert_command_allowed
from browser_cli.commands import command_policy_from_options, command_policy_options

console = Console()


def _is_loopback(host: str) -> bool:
    """Return True when *host* is one of the recognised loopback spellings."""
    return host in {"127.0.0.1", "localhost", "::1"}


class _Handler(BaseHTTPRequestHandler):
    """JSON request handler for the gateway.

    ``client``, ``token`` and ``policy`` are class attributes; ``cmd_serve_http``
    builds a subclass with them filled in, so every request handled by one
    server shares the same values.
    """

    client: BrowserCLI
    # None disables authentication entirely (see ``_authorized``).
    token: str | None = None
    policy: CommandPolicy = CommandPolicy()

    def _token_matches(self, candidate: str) -> bool:
        """Compare *candidate* with the configured token in constant time.

        The comparison is done on bytes because ``secrets.compare_digest``
        raises ``TypeError`` for ``str`` arguments containing non-ASCII
        characters, which would turn a bad token into a 500 instead of a 401.
        """
        try:
            # http.server decodes header bytes as ISO-8859-1; re-encoding
            # recovers the bytes the client actually sent.
            supplied = candidate.encode("latin-1")
        except UnicodeEncodeError:
            return False
        return secrets.compare_digest(supplied, self.token.encode("utf-8"))

    def _authorized(self) -> bool:
        """Return True if auth is disabled or the request carries the token.

        The token is accepted either as ``Authorization: Bearer <token>`` or
        in the ``X-Browser-CLI-Token`` header.
        """
        if self.token is None:
            return True
        bearer = self.headers.get("Authorization", "")
        if bearer.startswith("Bearer ") and self._token_matches(bearer[len("Bearer "):]):
            return True
        header = self.headers.get("X-Browser-CLI-Token")
        return header is not None and self._token_matches(header)

    def _require_auth(self) -> bool:
        """Send a 401 and return False unless the request is authorized."""
        if self._authorized():
            return True
        self._send(401, {"error": "missing or invalid token"})
        return False

    def _send(self, status: int, payload) -> None:
        """Write *payload* as a JSON response with the given HTTP status."""
        # default=str: values json cannot encode natively are stringified.
        raw = json.dumps(payload, default=str).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(raw)))
        self.end_headers()
        self.wfile.write(raw)

    def _read_body(self) -> bytes | None:
        """Read the request body as announced by ``Content-Length``.

        Returns the raw bytes, or None after sending a 400 when the header is
        not a non-negative integer. A negative length must be rejected
        because ``rfile.read(-1)`` would block until the client disconnects.
        """
        try:
            length = int(self.headers.get("Content-Length", "0"))
        except ValueError:
            length = -1
        if length < 0:
            self._send(400, {"error": "invalid Content-Length"})
            return None
        return self.rfile.read(length)

    def do_GET(self) -> None:
        """Serve ``/health`` (no auth), ``/tabs`` and ``/clients``."""
        path = urlparse(self.path).path
        try:
            # Everything except the health probe requires the token, so an
            # unauthenticated request to an unknown path gets 401, not 404.
            if path != "/health" and not self._require_auth():
                return
            if path == "/tabs":
                self._send(200, [t.__dict__ for t in self.client.tabs.list()])
            elif path == "/clients":
                self._send(200, self.client.clients())
            elif path == "/health":
                self._send(200, {"ok": True})
            else:
                self._send(404, {"error": "not found"})
        except Exception as exc:
            self._send(500, {"error": str(exc)})

    def do_POST(self) -> None:
        """Serve ``POST /command``: run one policy-checked client command.

        The body is a JSON object with ``command`` and optional ``args``.
        Responses: 200 with ``{"result": ...}``, 400 for a malformed request,
        401 for a missing/invalid token, 403 when ``assert_command_allowed``
        raises ``PermissionError``, 404 for other paths, 500 otherwise.
        """
        path = urlparse(self.path).path
        try:
            raw = self._read_body()
            if raw is None:
                return
            if path != "/command":
                self._send(404, {"error": "not found"})
                return
            # Authenticate before parsing so unauthenticated callers always
            # get a 401, never parser diagnostics.
            if not self._require_auth():
                return
            try:
                body = json.loads(raw or b"{}")
            except ValueError:
                # Covers JSONDecodeError and UnicodeDecodeError.
                self._send(400, {"error": "request body is not valid JSON"})
                return
            if not isinstance(body, dict):
                self._send(400, {"error": "request body must be a JSON object"})
                return
            command = body.get("command")
            assert_command_allowed(command, self.policy)
            result = self.client.command(command, body.get("args") or {})
            self._send(200, {"result": result})
        except PermissionError as exc:
            self._send(403, {"error": str(exc)})
        except Exception as exc:
            self._send(500, {"error": str(exc)})

    def log_message(self, fmt, *args) -> None:
        """Route http.server's access log through the rich console."""
        # The formatted message contains the client-controlled request line;
        # escape it so stray "[...]" is printed literally instead of being
        # parsed as rich markup (an unmatched closing tag would raise).
        message = escape(fmt % args)
        console.print(f"[dim]http[/dim] {escape(self.address_string())} {message}")


# NOTE: the docstring below is what click shows for ``--help``, so it stays
# user-facing; parameter details live in the option declarations.
@click.command("serve-http")
@click.option("--host", default="127.0.0.1", show_default=True)
@click.option("--port", type=int, default=8766, show_default=True)
@click.option("--browser", default=None, help="Browser alias to target")
@click.option("--remote", default=None, help="Remote endpoint to target")
@click.option("--key", default=None, help="Remote auth key spec")
@click.option("--token", default=None, help="Bearer token required for HTTP access (generated by default)")
@click.option("--no-auth", is_flag=True, help="Disable HTTP auth (only allowed on loopback hosts)")
@command_policy_options
def cmd_serve_http(
    host,
    port,
    browser,
    remote,
    key,
    token,
    no_auth,
    allow_read_page,
    allow_control,
    allow_dangerous,
    allow_keys,
    allow_all,
):
    """Expose a tiny local HTTP JSON gateway (/tabs, /clients, /command).

    Auth is enabled by default. Pass the printed token as either
    ``Authorization: Bearer <token>`` or ``X-Browser-CLI-Token: <token>``.
    """
    if no_auth and not _is_loopback(host):
        raise click.ClickException("--no-auth is only allowed on loopback hosts")

    # An empty --token falls through to a freshly generated one.
    auth_token = None if no_auth else (token or secrets.token_urlsafe(32))
    policy = command_policy_from_options(
        allow_read_page=allow_read_page,
        allow_control=allow_control,
        allow_dangerous=allow_dangerous,
        allow_keys=allow_keys,
        allow_all=allow_all,
    )

    # http.server instantiates the handler class per request, so the shared
    # state is attached to a dynamically created subclass.
    handler = type(
        "BrowserCLIHTTPHandler",
        (_Handler,),
        {
            "client": BrowserCLI(browser=browser, remote=remote, key=key),
            "token": auth_token,
            "policy": policy,
        },
    )

    # The context manager closes the listening socket on every exit path.
    with ThreadingHTTPServer((host, port), handler) as server:
        console.print(f"[green]HTTP gateway listening on http://{host}:{port}[/green]")
        if auth_token:
            # Escape so a user-supplied token containing "[" prints verbatim.
            console.print(f"[yellow]Token:[/yellow] {escape(auth_token)}")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            console.print("\n[yellow]Stopping HTTP gateway[/yellow]")