"""Client validation and authentication for ``browser-cli serve``.""" from __future__ import annotations import json import re from typing import Literal from browser_cli.compat import adapt_auth from browser_cli.serve.logging import log_request from browser_cli.version_manager import PROTOCOL_MIN_CLIENT, parse_version _UA_PATTERN = re.compile(r"^browser-cli/\d") AuthDecodeResult = tuple[bytes | None, bool] | tuple[Literal[False], Literal[False]] class ServeAuthMixin: addr: tuple command: str client_ver: str msg_id: object nonce: str pq_private_key: object | None auth_keys: list[str] | None response_secret: bytes | None async def send_error(self, msg: str, msg_id=None) -> None: ... async def validate_client(self, msg: dict) -> bool: self.msg_id = msg.get("id") ua = msg.get("user_agent") or "" if not _UA_PATTERN.match(ua): await self.send_error("forbidden: client required") log_request(self.addr, msg.get("command", "?"), None, "DENIED", f"bad user-agent: {ua!r}") return False try: self.client_ver = ua.split("/", 1)[1] if parse_version(self.client_ver) < parse_version(PROTOCOL_MIN_CLIENT): await self.send_error(f"client version {self.client_ver} is too old; please upgrade to >= {PROTOCOL_MIN_CLIENT}") log_request(self.addr, msg.get("command", "?"), None, "DENIED", f"client {self.client_ver} < min {PROTOCOL_MIN_CLIENT}") return False except (IndexError, ValueError): pass return True async def authenticate(self, msg: dict) -> dict | None: if self.auth_keys is None: return msg pub = msg.get("pubkey") or "" sig = msg.get("sig") or "" if not pub or not sig: await self.send_error("unauthorized: pubkey auth required — run 'browser-cli auth keygen' on the client") log_request(self.addr, self.command, None, "DENIED", "missing pubkey/sig") return None if pub not in self.auth_keys: await self.send_error("unauthorized: untrusted public key") log_request(self.addr, self.command, None, "DENIED", "untrusted key") return None pq_shared_secret, transport_encrypted = await self._decode_pq_transport(msg, pub, sig) if pq_shared_secret is False: return None from browser_cli.auth import verify if not verify(pub, bytes.fromhex(self.nonce), msg, sig, pq_shared_secret): await self.send_error("unauthorized: invalid signature") log_request(self.addr, self.command, None, "DENIED", "bad signature") return None self.response_secret = pq_shared_secret if transport_encrypted else None return msg async def _decode_pq_transport(self, msg: dict, pub: str, sig: str) -> AuthDecodeResult: pq_shared_secret = None transport_encrypted = False if self.pq_private_key is None: return pq_shared_secret, transport_encrypted kex = msg.get("pq_kex") or {} pq_required = parse_version(self.client_ver) >= parse_version("0.9.5") if not isinstance(kex, dict) or kex.get("alg") != "ML-KEM-768" or not kex.get("ciphertext"): if pq_required: await self.send_error("unauthorized: post-quantum key exchange required") log_request(self.addr, self.command, None, "DENIED", "missing pq kex") return False, False return pq_shared_secret, transport_encrypted try: from browser_cli.auth import pq_decrypt, pq_kex_server_decapsulate pq_shared_secret = pq_kex_server_decapsulate(self.pq_private_key, str(kex["ciphertext"])) if "encrypted" in msg: decrypted_msg = json.loads(pq_decrypt(pq_shared_secret, "request", msg["encrypted"])) if not isinstance(decrypted_msg, dict): raise ValueError("encrypted request is not a JSON object") decrypted_msg.update({"pubkey": pub, "sig": sig, "pq_kex": kex}) msg.clear() msg.update(adapt_auth(decrypted_msg, self.client_ver)) self.msg_id = msg.get("id", self.msg_id) self.command = msg.get("command", "?") transport_encrypted = True elif pq_required: await self.send_error("unauthorized: post-quantum encrypted transport required") log_request(self.addr, self.command, None, "DENIED", "missing pq transport") return False, False except Exception: await self.send_error("unauthorized: invalid post-quantum encrypted transport") log_request(self.addr, self.command, None, "DENIED", "bad pq transport") return False, False return pq_shared_secret, transport_encrypted