076914e5b7
- Split client, native, remote, serve, markdown, and SDK internals into focused packages with direct imports. - Move local and remote transport framing/protocol helpers behind clearer module boundaries. - Break up the extension injected DOM logic into a separate content dispatch bundle and dedicated content modules. - Add explicit client handling for passive remote discovery without noisy PQ warnings. - Keep behavior covered with updated unit, integration, and extension tests.
108 lines
4.4 KiB
Python
108 lines
4.4 KiB
Python
"""Client validation and authentication for ``browser-cli serve``."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
from typing import Literal
|
|
|
|
from browser_cli.compat import adapt_auth
|
|
from browser_cli.serve.logging import log_request
|
|
from browser_cli.version_manager import PROTOCOL_MIN_CLIENT, parse_version
|
|
|
|
# Accepts only user-agent strings that start with ``browser-cli/`` followed by
# a digit; whatever comes after that first digit is not checked by the pattern.
_UA_PATTERN = re.compile(r"^browser-cli/\d")
|
|
# Result shape of ``ServeAuthMixin._decode_pq_transport``: either
# ``(shared_secret_or_None, transport_encrypted)`` when processing may continue,
# or the sentinel ``(False, False)`` once the request has been denied.
AuthDecodeResult = tuple[bytes | None, bool] | tuple[Literal[False], Literal[False]]
|
|
|
|
class ServeAuthMixin:
    """Client validation and authentication steps for a serve request handler.

    The methods read and write the attributes declared below and report
    failures through ``send_error``; the class that mixes this in is expected
    to provide them (the ``send_error`` defined here is only a placeholder).
    """

    # Peer address; only forwarded to ``log_request``.
    addr: tuple
    # Command name of the current request, used in denial log entries.
    command: str
    # Version part of the client's user agent, set by ``validate_client``.
    client_ver: str
    # ``id`` field of the current request message.
    msg_id: object
    # Hex-encoded nonce the client's signature is verified against.
    nonce: str
    # Private key handed to ``pq_kex_server_decapsulate``; ``None`` skips all
    # post-quantum handling.
    pq_private_key: object | None
    # Trusted client public keys; ``None`` disables authentication entirely.
    auth_keys: list[str] | None
    # Set by ``authenticate``: the PQ shared secret when the request arrived
    # encrypted, otherwise ``None``.
    response_secret: bytes | None

    async def send_error(self, msg: str, msg_id=None) -> None:
        """Report ``msg`` to the client.

        Placeholder body; presumably overridden by the concrete handler.
        """
        ...

    async def validate_client(self, msg: dict) -> bool:
        """Check the request's user agent and minimum client version.

        Stores ``msg["id"]`` in ``self.msg_id`` and, for an accepted user
        agent, the version part in ``self.client_ver``.

        Returns:
            ``True`` when the request may proceed; ``False`` after an error
            has been sent and the denial has been logged.
        """
        self.msg_id = msg.get("id")
        ua = msg.get("user_agent") or ""
        # ``msg`` comes from the client: a truthy non-string user agent must be
        # denied here instead of raising ``TypeError`` inside ``Pattern.match``.
        if not isinstance(ua, str) or not _UA_PATTERN.match(ua):
            await self.send_error("forbidden: client required")
            log_request(
                self.addr,
                msg.get("command", "?"),
                None,
                "DENIED",
                f"bad user-agent: {ua!r}",
            )
            return False

        # The pattern guarantees a "/" is present, so the split always has two parts.
        self.client_ver = ua.split("/", 1)[1]
        try:
            too_old = parse_version(self.client_ver) < parse_version(PROTOCOL_MIN_CLIENT)
        except (IndexError, ValueError):
            # Existing behaviour, kept on purpose: a version string that cannot
            # be parsed is let through rather than rejected.
            # NOTE(review): this also means such clients skip the minimum
            # version check - confirm that is intended.
            too_old = False

        if too_old:
            await self.send_error(
                f"client version {self.client_ver} is too old; please upgrade to >= {PROTOCOL_MIN_CLIENT}"
            )
            log_request(
                self.addr,
                msg.get("command", "?"),
                None,
                "DENIED",
                f"client {self.client_ver} < min {PROTOCOL_MIN_CLIENT}",
            )
            return False
        return True

    async def authenticate(self, msg: dict) -> dict | None:
        """Authenticate a request by trusted public key and signature.

        With ``auth_keys`` unset the request is accepted unchanged. Otherwise
        the public key must be listed in ``auth_keys`` and the signature must
        verify against the connection nonce. If the request was transport
        encrypted, ``msg`` is replaced *in place* with the decrypted payload
        before the signature is checked.

        Returns:
            ``msg`` on success, or ``None`` after an error has been sent and
            the denial has been logged.
        """
        if self.auth_keys is None:
            return msg

        pub = msg.get("pubkey") or ""
        sig = msg.get("sig") or ""
        if not pub or not sig:
            await self.send_error("unauthorized: pubkey auth required — run 'browser-cli auth keygen' on the client")
            log_request(self.addr, self.command, None, "DENIED", "missing pubkey/sig")
            return None
        if pub not in self.auth_keys:
            await self.send_error("unauthorized: untrusted public key")
            log_request(self.addr, self.command, None, "DENIED", "untrusted key")
            return None

        pq_shared_secret, transport_encrypted = await self._decode_pq_transport(msg, pub, sig)
        if pq_shared_secret is False:
            # The helper has already sent the error and logged the denial.
            return None

        # Imported lazily, as in the original code.
        from browser_cli.auth import verify

        if not verify(pub, bytes.fromhex(self.nonce), msg, sig, pq_shared_secret):
            await self.send_error("unauthorized: invalid signature")
            log_request(self.addr, self.command, None, "DENIED", "bad signature")
            return None

        self.response_secret = pq_shared_secret if transport_encrypted else None
        return msg

    async def _decode_pq_transport(self, msg: dict, pub: str, sig: str) -> AuthDecodeResult:
        """Run the post-quantum key exchange and decrypt the request if needed.

        When ``msg`` carries an ``encrypted`` payload it is decrypted, passed
        through ``adapt_auth`` and written back into ``msg`` in place (with
        ``pubkey``, ``sig`` and ``pq_kex`` re-attached); ``self.msg_id`` and
        ``self.command`` are refreshed from the decrypted content.

        Returns:
            ``(shared_secret, transport_encrypted)`` when processing may
            continue (``shared_secret`` is ``None`` if no key exchange took
            place), or ``(False, False)`` after the request has been denied.
        """
        if self.pq_private_key is None:
            return None, False

        kex = msg.get("pq_kex") or {}
        try:
            pq_required = parse_version(self.client_ver) >= parse_version("0.9.5")
        except (IndexError, ValueError):
            # ``validate_client`` lets unparseable versions through, so the same
            # parse can fail here. Fail closed: demand post-quantum transport
            # instead of letting the exception escape.
            pq_required = True

        kex_usable = (
            isinstance(kex, dict)
            and kex.get("alg") == "ML-KEM-768"
            and bool(kex.get("ciphertext"))
        )
        if not kex_usable:
            if pq_required:
                await self.send_error("unauthorized: post-quantum key exchange required")
                log_request(self.addr, self.command, None, "DENIED", "missing pq kex")
                return False, False
            return None, False

        pq_shared_secret = None
        transport_encrypted = False
        try:
            # The import stays inside the ``try`` so a failing import is
            # reported as a transport error, as before.
            from browser_cli.auth import pq_decrypt, pq_kex_server_decapsulate

            pq_shared_secret = pq_kex_server_decapsulate(self.pq_private_key, str(kex["ciphertext"]))
            if "encrypted" in msg:
                decrypted_msg = json.loads(pq_decrypt(pq_shared_secret, "request", msg["encrypted"]))
                if not isinstance(decrypted_msg, dict):
                    raise ValueError("encrypted request is not a JSON object")
                decrypted_msg.update({"pubkey": pub, "sig": sig, "pq_kex": kex})
                # Adapt first so a failing adapter cannot leave ``msg`` emptied.
                adapted = adapt_auth(decrypted_msg, self.client_ver)
                msg.clear()
                msg.update(adapted)
                self.msg_id = msg.get("id", self.msg_id)
                self.command = msg.get("command", "?")
                transport_encrypted = True
        except Exception:
            # Any failure while decapsulating/decrypting client-supplied data
            # is answered with one generic denial.
            await self.send_error("unauthorized: invalid post-quantum encrypted transport")
            log_request(self.addr, self.command, None, "DENIED", "bad pq transport")
            return False, False

        # Kept outside the ``try`` so an error while sending this denial is not
        # re-reported as an "invalid transport" error.
        if pq_required and not transport_encrypted:
            await self.send_error("unauthorized: post-quantum encrypted transport required")
            log_request(self.addr, self.command, None, "DENIED", "missing pq transport")
            return False, False

        return pq_shared_secret, transport_encrypted
|