refactor: reorganize client transport and extension internals

- Split client, native, remote, serve, markdown, and SDK internals into focused packages with direct imports.
- Move local and remote transport framing/protocol helpers behind clearer module boundaries.
- Break up the extension injected DOM logic into a separate content dispatch bundle and dedicated content modules.
- Add explicit client handling for passive remote discovery without noisy PQ warnings.
- Keep behavior covered with updated unit, integration, and extension tests.
This commit is contained in:
2026-06-13 23:31:24 +02:00
parent fd5447cbb9
commit 076914e5b7
88 changed files with 7491 additions and 5228 deletions
+1
View File
@@ -0,0 +1 @@
"""Native messaging host internals."""
+211
View File
@@ -0,0 +1,211 @@
#!/usr/bin/env python3
"""
Native Messaging Host for browser-cli.
Chrome launches this process when extension calls connectNative().
It relays messages between extension (stdin/stdout Native Messaging protocol)
and CLI (local IPC endpoint: Unix socket on Unix, named pipe on Windows).
"""
import json
import math
import os
import queue
import socket
import sys
import threading
import uuid
from pathlib import Path
from browser_cli.native import local_server, protocol
from browser_cli.constants import DEFAULT_ALIAS, DEFAULT_PAGE_SIZE, PAGEABLE_COMMANDS
from browser_cli.platform import endpoint_for_alias, is_windows, registry_path, runtime_dir
from browser_cli.registry import update_registry
SOCKET_PATH: str = "" # set after hello handshake
PENDING: dict[str, queue.Queue] = {}
PENDING_LOCK = threading.Lock()
WRITE_LOCK = threading.Lock()
REGISTRY_PATH = registry_path()
PAGE_SIZE = int(os.environ.get("BROWSER_CLI_PAGE_SIZE", str(DEFAULT_PAGE_SIZE)))
# --- Registry helpers ---
def _registry_add(alias: str, sock_path: str) -> None:
try:
update_registry(alias, sock_path, REGISTRY_PATH)
except Exception:
pass
def _registry_remove(alias: str) -> None:
try:
update_registry(alias, None, REGISTRY_PATH)
except Exception:
pass
def _socket_path_for(alias: str) -> str:
return endpoint_for_alias(alias)
def _resolve_profile_alias(first_msg: dict | None) -> str:
"""Return a unique alias when the extension did not provide one."""
if first_msg and first_msg.get("type") == "hello":
alias = first_msg.get("alias")
if alias and alias != DEFAULT_ALIAS:
return alias
return str(uuid.uuid4())
# --- Thread A: read messages from extension (stdin) ---
def stdin_reader(alias: str):
stdin = sys.stdin.buffer
while True:
msg = protocol.read_native_message(stdin)
if msg is None:
# Extension disconnected — clean up and exit
_cleanup(alias)
os._exit(0)
# Profile alias handshake
if msg.get("type") == "hello":
continue # already handled during startup
if msg.get("type") == "bye":
_cleanup(alias)
os._exit(0)
msg_id = msg.get("id")
if msg_id:
with PENDING_LOCK:
q = PENDING.get(msg_id)
if q:
q.put(msg)
# --- Thread B: accept CLI socket connections ---
def _json_response(result: dict) -> bytes:
return json.dumps(result).encode("utf-8")
def _error_response(exc: Exception) -> bytes:
return _json_response({"success": False, "error": str(exc)})
def _decode_cli_command(data: bytes) -> dict:
cmd = json.loads(data)
if "id" not in cmd:
cmd["id"] = str(uuid.uuid4())
return cmd
def _handle_cli_payload(data: bytes) -> bytes:
return _json_response(_handle_browser_command(_decode_cli_command(data)))
def _handle_browser_command(cmd: dict) -> dict:
command = cmd.get("command")
if command in PAGEABLE_COMMANDS:
return _collect_paged_browser_command(cmd)
return _send_browser_command(cmd)
def _send_browser_command(cmd: dict, timeout: int = 30) -> dict:
msg_id = cmd.get("id") or str(uuid.uuid4())
cmd["id"] = msg_id
response_queue: queue.Queue = queue.Queue()
with PENDING_LOCK:
PENDING[msg_id] = response_queue
try:
with WRITE_LOCK:
protocol.write_native_message(sys.stdout.buffer, cmd)
try:
return response_queue.get(timeout=timeout)
except queue.Empty:
return {"id": msg_id, "success": False, "error": "timeout waiting for browser response"}
finally:
with PENDING_LOCK:
PENDING.pop(msg_id, None)
def _collect_paged_browser_command(cmd: dict) -> dict:
original_id = cmd.get("id") or str(uuid.uuid4())
offset = 0
items = []
total = None
max_pages = math.ceil(10_000 / PAGE_SIZE)
pages_fetched = 0
while True:
if pages_fetched >= max_pages:
return {"id": original_id, "success": False, "error": f"paging loop exceeded {max_pages} pages — extension bug?"}
pages_fetched += 1
page_cmd = dict(cmd)
page_cmd["id"] = str(uuid.uuid4())
page_args = dict(cmd.get("args") or {})
page_args["__page"] = {"offset": offset, "limit": PAGE_SIZE}
page_cmd["args"] = page_args
result = _send_browser_command(page_cmd)
result["id"] = original_id
if not result.get("success", True):
return result
data = result.get("data")
if not isinstance(data, dict) or data.get("__browserCliPage") is not True:
return result
page_items = data.get("items") or []
if not isinstance(page_items, list):
return {"id": original_id, "success": False, "error": "invalid paged response from browser"}
items.extend(page_items)
total = data.get("total", total)
next_offset = data.get("nextOffset")
if next_offset is None:
break
offset = int(next_offset)
return {"id": original_id, "success": True, "data": items, "pageSize": PAGE_SIZE, "total": total}
# --- Socket helpers (length-prefixed framing) ---
def _cleanup(alias: str):
try:
if not is_windows():
Path(_socket_path_for(alias)).unlink(missing_ok=True)
except Exception:
pass
_registry_remove(alias)
def main():
stdin = sys.stdin.buffer
# Wait for the hello handshake to learn the profile alias
first_msg = protocol.read_native_message(stdin)
if first_msg and first_msg.get("type") == "hello":
alias = _resolve_profile_alias(first_msg)
else:
# No hello — use a generated alias; first_msg is dropped (no response path).
alias = str(uuid.uuid4())
runtime_dir().mkdir(mode=0o700, exist_ok=True)
sock_path = _socket_path_for(alias)
if not is_windows():
path = Path(sock_path)
if path.exists():
path.unlink()
bound_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
bound_sock.bind(sock_path)
os.chmod(sock_path, 0o600)
bound_sock.listen(16)
else:
bound_sock = None
_registry_add(alias, sock_path)
t = threading.Thread(
target=local_server.socket_server,
args=(sock_path, _handle_cli_payload, _error_response),
kwargs={"bound_sock": bound_sock},
daemon=True,
)
t.start()
stdin_reader(alias)
if __name__ == "__main__":
main()
+111
View File
@@ -0,0 +1,111 @@
"""Local IPC server loops used by the native messaging host."""
import asyncio
import os
import socket
import threading
from collections.abc import Callable
from multiprocessing.connection import Listener
from pathlib import Path
from browser_cli import framing, local_transport
from browser_cli.platform import is_windows
PayloadHandler = Callable[[bytes], bytes]
ErrorHandler = Callable[[Exception], bytes]
async def async_socket_server(
sock_path: str,
handle_payload: PayloadHandler,
error_response: ErrorHandler,
*,
bound_sock: socket.socket | None = None,
) -> None:
sock = bound_sock
if sock is None:
path = Path(sock_path)
if path.exists():
path.unlink()
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(sock_path)
os.chmod(sock_path, 0o600)
sock.listen(16)
async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
await async_handle_cli_connection(reader, writer, handle_payload, error_response)
server = await asyncio.start_unix_server(handle, sock=sock)
async with server:
await server.serve_forever()
def socket_server(
sock_path: str,
handle_payload: PayloadHandler,
error_response: ErrorHandler,
*,
bound_sock: socket.socket | None = None,
) -> None:
if is_windows():
windows_pipe_server(sock_path, handle_payload, error_response)
return
asyncio.run(async_socket_server(sock_path, handle_payload, error_response, bound_sock=bound_sock))
def windows_pipe_server(sock_path: str, handle_payload: PayloadHandler, error_response: ErrorHandler) -> None:
while True:
listener = None
try:
listener = Listener(sock_path, family="AF_PIPE")
conn = listener.accept()
except OSError:
if listener is not None:
try:
listener.close()
except Exception:
pass
break
threading.Thread(target=handle_cli_connection, args=(conn, handle_payload, error_response, listener), daemon=True).start()
async def async_handle_cli_connection(
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
handle_payload: PayloadHandler,
error_response: ErrorHandler,
) -> None:
try:
data = await local_transport.async_recv_all(reader)
if not data:
return
response = await asyncio.to_thread(handle_payload, data)
await local_transport.async_send_all(writer, response)
except Exception as exc:
try:
await local_transport.async_send_all(writer, error_response(exc))
except Exception:
pass
finally:
writer.close()
try:
await writer.wait_closed()
except Exception:
pass
def send_cli_response(conn, response: bytes) -> None:
if is_windows():
conn.send_bytes(response)
else:
framing.send_frame(conn, response)
def handle_cli_connection(conn, handle_payload: PayloadHandler, error_response: ErrorHandler, listener=None) -> None:
try:
data = conn.recv_bytes() if is_windows() else framing.recv_frame(conn, allow_eof=True)
if not data:
return
send_cli_response(conn, handle_payload(data))
except Exception as exc:
try:
send_cli_response(conn, error_response(exc))
except Exception:
pass
finally:
conn.close()
if listener is not None:
listener.close()
+30
View File
@@ -0,0 +1,30 @@
"""Chrome Native Messaging stdio protocol helpers."""
from __future__ import annotations
import json
import struct
def read_exact_stream(stream, n: int) -> bytes | None:
buf = b""
while len(buf) < n:
chunk = stream.read(n - len(buf))
if not chunk:
return None
buf += chunk
return buf
def read_native_message(stream) -> dict | None:
raw_len = read_exact_stream(stream, 4)
if raw_len is None:
return None
msg_len = struct.unpack("<I", raw_len)[0]
data = read_exact_stream(stream, msg_len)
if data is None:
return None
return json.loads(data.decode("utf-8"))
def write_native_message(stream, msg: dict) -> None:
data = json.dumps(msg).encode("utf-8")
stream.write(struct.pack("<I", len(data)))
stream.write(data)
stream.flush()