"""Challenge/response auth helpers for remote TCP transport.""" from __future__ import annotations import asyncio import json import sys from collections.abc import Callable from typing import TypeVar from browser_cli.errors import BrowserNotConnected from browser_cli.version_manager import USER_AGENT T = TypeVar("T") AUTH_FIELDS = {"token", "pubkey", "sig", "pq_kex", "encrypted", "_suppress_pq_warning"} PQ_WARNING = ( "** WARNING: connection is not using a post-quantum key exchange algorithm.\n" "** This session may be vulnerable to store now, decrypt later attacks.\n" ) def parse_challenge(raw: bytes) -> tuple[dict | None, str | None]: try: challenge = json.loads(raw) nonce_hex = challenge.get("nonce") if challenge.get("type") == "challenge" else None return challenge, nonce_hex except (json.JSONDecodeError, AttributeError): return None, None def check_min_client_version(challenge: dict | None) -> None: min_ver = challenge.get("min_client_version") if isinstance(challenge, dict) else None if not min_ver: return from browser_cli.version_manager import parse_version try: client_ver = USER_AGENT.split("/", 1)[1] if parse_version(client_ver) < parse_version(min_ver): raise BrowserNotConnected( f"Client version {client_ver} is too old for this server " f"(requires >= {min_ver}). Run: pip install --upgrade browser-cli" ) except (IndexError, ValueError): pass def clean_message(msg: dict) -> dict: return {key: value for key, value in msg.items() if key not in AUTH_FIELDS} def get_pq_public_key(challenge: dict | None) -> str | None: if not isinstance(challenge, dict): return None from browser_cli.auth import PQ_KEX_ALG kex = challenge.get("pq_kex") if isinstance(kex, dict) and kex.get("alg") == PQ_KEX_ALG and kex.get("public_key"): return str(kex["public_key"]) return None def signed_payload(clean_msg: dict, private_key, nonce_hex: str, pq_shared_secret: bytes | None) -> dict: from browser_cli.auth import pq_encrypt, public_key_hex, sign nonce = bytes.fromhex(nonce_hex) sig = sign(private_key, nonce, clean_msg, pq_shared_secret) pubkey = public_key_hex(private_key) if pq_shared_secret is None: return {**clean_msg, "pubkey": pubkey, "sig": sig.hex()} encrypted = pq_encrypt(pq_shared_secret, "request", json.dumps(clean_msg).encode("utf-8")) return { "id": clean_msg.get("id"), "user_agent": clean_msg.get("user_agent"), "pubkey": pubkey, "sig": sig.hex(), "pq_kex": clean_msg["pq_kex"], "encrypted": encrypted, } def emit_no_pq_warning(enabled: bool) -> None: if enabled: sys.stderr.write(PQ_WARNING) def build_auth_message( msg: dict, challenge: dict | None, nonce_hex: str | None, private_key, encapsulate: Callable[[str], tuple[str, bytes]], *, warn_no_pq: bool = True, ) -> tuple[dict, bytes | None]: if not nonce_hex or private_key is None: emit_no_pq_warning(warn_no_pq) return msg, None clean_msg = clean_message(msg) pq_shared_secret = None pq_public_key = get_pq_public_key(challenge) if pq_public_key: from browser_cli.auth import PQ_KEX_ALG ciphertext_hex, pq_shared_secret = encapsulate(pq_public_key) clean_msg["pq_kex"] = {"alg": PQ_KEX_ALG, "ciphertext": ciphertext_hex} else: emit_no_pq_warning(warn_no_pq) return signed_payload(clean_msg, private_key, nonce_hex, pq_shared_secret), pq_shared_secret async def build_auth_message_async( msg: dict, challenge: dict | None, nonce_hex: str | None, private_key, *, warn_no_pq: bool = True, ) -> tuple[dict, bytes | None]: def encapsulate(public_key: str) -> tuple[str, bytes]: from browser_cli.auth import pq_kex_client_encapsulate return pq_kex_client_encapsulate(public_key) return await asyncio.to_thread( build_auth_message, msg, challenge, nonce_hex, private_key, encapsulate, warn_no_pq=warn_no_pq, ) def decode_pq_response(response: bytes | None, pq_shared_secret: bytes | None) -> bytes | None: if response is None or pq_shared_secret is None: return response try: from browser_cli.auth import pq_decrypt envelope = json.loads(response) if isinstance(envelope, dict) and "encrypted" in envelope: return pq_decrypt(pq_shared_secret, "response", envelope["encrypted"]) except Exception as exc: raise BrowserNotConnected(f"Cannot decrypt post-quantum remote response: {exc}") from exc return response def with_challenge(challenge_raw: bytes, msg: dict, private_key, build_auth: Callable[[dict, dict | None, str | None, object], T]) -> T: if challenge_raw is None: raise BrowserNotConnected("No challenge received from remote endpoint") challenge, nonce_hex = parse_challenge(challenge_raw) check_min_client_version(challenge) return build_auth(msg, challenge, nonce_hex, private_key) def should_warn_no_pq(msg: dict) -> bool: return not bool(msg.pop("_suppress_pq_warning", False))