fix: harden IPC, screenshot, paging, and tab filter error handling

- tabs.py: validate screenshot data URL prefix and catch binascii.Error
  instead of silently writing a zero-byte file or crashing with a raw traceback
- serve.py: add 30 s recv timeout on client connections to prevent unbounded
  thread accumulation; use hmac.compare_digest for constant-time token check
- native_host.py: bind Unix socket before _registry_add to eliminate the
  window where the registry points to an unbound path; cap paging loop at
  ceil(10000/PAGE_SIZE) iterations to guard against a misbehaving extension;
  remove dead no-hello fast-path queue that was registered but never consumed
- __init__.py: narrow _apply_tab_filter except to (AttributeError, TypeError)
  so broken filter functions raise instead of silently returning wrong results

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-02 15:03:01 +02:00
parent 753e4c4449
commit a8421e97f5
4 changed files with 40 additions and 22 deletions
+28 -17
View File
@@ -7,6 +7,7 @@ It relays messages between extension (stdin/stdout Native Messaging protocol)
and CLI (local IPC endpoint: Unix socket on Unix, named pipe on Windows).
"""
import json
import math
import os
import queue
import socket
@@ -121,7 +122,7 @@ def stdin_reader(alias: str):
# --- Thread B: accept CLI socket connections ---
def socket_server(sock_path: str):
def socket_server(sock_path: str, bound_sock: "socket.socket | None" = None):
if is_windows():
while True:
try:
@@ -132,13 +133,14 @@ def socket_server(sock_path: str):
threading.Thread(target=handle_cli_connection, args=(conn, listener), daemon=True).start()
return
path = Path(sock_path)
if path.exists():
path.unlink()
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(sock_path)
sock.listen(16)
sock = bound_sock
if sock is None:
path = Path(sock_path)
if path.exists():
path.unlink()
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(sock_path)
sock.listen(16)
while True:
try:
@@ -212,8 +214,13 @@ def _collect_paged_browser_command(cmd: dict) -> dict:
offset = 0
items = []
total = None
max_pages = math.ceil(10_000 / PAGE_SIZE)
pages_fetched = 0
while True:
if pages_fetched >= max_pages:
return {"id": original_id, "success": False, "error": f"paging loop exceeded {max_pages} pages — extension bug?"}
pages_fetched += 1
page_cmd = dict(cmd)
page_cmd["id"] = str(uuid.uuid4())
page_args = dict(cmd.get("args") or {})
@@ -284,21 +291,25 @@ def main():
if first_msg and first_msg.get("type") == "hello":
alias = _resolve_profile_alias(first_msg)
else:
# No hello — use a generated alias and re-queue the first command if needed.
# No hello — use a generated alias; first_msg is dropped (no response path).
alias = str(uuid.uuid4())
if first_msg:
msg_id = first_msg.get("id")
if msg_id:
q: queue.Queue = queue.Queue()
with PENDING_LOCK:
PENDING[msg_id] = q
write_native_message(sys.stdout.buffer, first_msg)
runtime_dir().mkdir(mode=0o700, exist_ok=True)
sock_path = _socket_path_for(alias)
if not is_windows():
path = Path(sock_path)
if path.exists():
path.unlink()
bound_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
bound_sock.bind(sock_path)
bound_sock.listen(16)
else:
bound_sock = None
_registry_add(alias, sock_path)
t = threading.Thread(target=socket_server, args=(sock_path,), daemon=True)
t = threading.Thread(target=socket_server, args=(sock_path,), kwargs={"bound_sock": bound_sock}, daemon=True)
t.start()
stdin_reader(alias)