Files
browser-cli/browser_cli/serve/auth.py
T
daniel156161 076914e5b7 refactor: reorganize client transport and extension internals
- Split client, native, remote, serve, markdown, and SDK internals into focused packages with direct imports.
- Move local and remote transport framing/protocol helpers behind clearer module boundaries.
- Break up the extension injected DOM logic into a separate content dispatch bundle and dedicated content modules.
- Add explicit client handling for passive remote discovery without noisy PQ warnings.
- Keep behavior covered with updated unit, integration, and extension tests.
2026-06-13 23:31:24 +02:00

108 lines
4.4 KiB
Python

"""Client validation and authentication for ``browser-cli serve``."""
from __future__ import annotations
import json
import re
from typing import Literal
from browser_cli.compat import adapt_auth
from browser_cli.serve.logging import log_request
from browser_cli.version_manager import PROTOCOL_MIN_CLIENT, parse_version
_UA_PATTERN = re.compile(r"^browser-cli/\d")
AuthDecodeResult = tuple[bytes | None, bool] | tuple[Literal[False], Literal[False]]
class ServeAuthMixin:
addr: tuple
command: str
client_ver: str
msg_id: object
nonce: str
pq_private_key: object | None
auth_keys: list[str] | None
response_secret: bytes | None
async def send_error(self, msg: str, msg_id=None) -> None: ...
async def validate_client(self, msg: dict) -> bool:
self.msg_id = msg.get("id")
ua = msg.get("user_agent") or ""
if not _UA_PATTERN.match(ua):
await self.send_error("forbidden: client required")
log_request(self.addr, msg.get("command", "?"), None, "DENIED", f"bad user-agent: {ua!r}")
return False
try:
self.client_ver = ua.split("/", 1)[1]
if parse_version(self.client_ver) < parse_version(PROTOCOL_MIN_CLIENT):
await self.send_error(f"client version {self.client_ver} is too old; please upgrade to >= {PROTOCOL_MIN_CLIENT}")
log_request(self.addr, msg.get("command", "?"), None, "DENIED", f"client {self.client_ver} < min {PROTOCOL_MIN_CLIENT}")
return False
except (IndexError, ValueError):
pass
return True
async def authenticate(self, msg: dict) -> dict | None:
if self.auth_keys is None:
return msg
pub = msg.get("pubkey") or ""
sig = msg.get("sig") or ""
if not pub or not sig:
await self.send_error("unauthorized: pubkey auth required — run 'browser-cli auth keygen' on the client")
log_request(self.addr, self.command, None, "DENIED", "missing pubkey/sig")
return None
if pub not in self.auth_keys:
await self.send_error("unauthorized: untrusted public key")
log_request(self.addr, self.command, None, "DENIED", "untrusted key")
return None
pq_shared_secret, transport_encrypted = await self._decode_pq_transport(msg, pub, sig)
if pq_shared_secret is False:
return None
from browser_cli.auth import verify
if not verify(pub, bytes.fromhex(self.nonce), msg, sig, pq_shared_secret):
await self.send_error("unauthorized: invalid signature")
log_request(self.addr, self.command, None, "DENIED", "bad signature")
return None
self.response_secret = pq_shared_secret if transport_encrypted else None
return msg
async def _decode_pq_transport(self, msg: dict, pub: str, sig: str) -> AuthDecodeResult:
pq_shared_secret = None
transport_encrypted = False
if self.pq_private_key is None:
return pq_shared_secret, transport_encrypted
kex = msg.get("pq_kex") or {}
pq_required = parse_version(self.client_ver) >= parse_version("0.9.5")
if not isinstance(kex, dict) or kex.get("alg") != "ML-KEM-768" or not kex.get("ciphertext"):
if pq_required:
await self.send_error("unauthorized: post-quantum key exchange required")
log_request(self.addr, self.command, None, "DENIED", "missing pq kex")
return False, False
return pq_shared_secret, transport_encrypted
try:
from browser_cli.auth import pq_decrypt, pq_kex_server_decapsulate
pq_shared_secret = pq_kex_server_decapsulate(self.pq_private_key, str(kex["ciphertext"]))
if "encrypted" in msg:
decrypted_msg = json.loads(pq_decrypt(pq_shared_secret, "request", msg["encrypted"]))
if not isinstance(decrypted_msg, dict):
raise ValueError("encrypted request is not a JSON object")
decrypted_msg.update({"pubkey": pub, "sig": sig, "pq_kex": kex})
msg.clear()
msg.update(adapt_auth(decrypted_msg, self.client_ver))
self.msg_id = msg.get("id", self.msg_id)
self.command = msg.get("command", "?")
transport_encrypted = True
elif pq_required:
await self.send_error("unauthorized: post-quantum encrypted transport required")
log_request(self.addr, self.command, None, "DENIED", "missing pq transport")
return False, False
except Exception:
await self.send_error("unauthorized: invalid post-quantum encrypted transport")
log_request(self.addr, self.command, None, "DENIED", "bad pq transport")
return False, False
return pq_shared_secret, transport_encrypted