188 lines
6.4 KiB
Python
188 lines
6.4 KiB
Python
import json
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from browser_cli.auth import (
|
|
add_authorized_key,
|
|
canonical_payload,
|
|
generate_keypair,
|
|
load_authorized_keys,
|
|
load_authorized_keys_with_names,
|
|
load_private_key,
|
|
new_nonce,
|
|
pq_kex_client_encapsulate,
|
|
pq_kex_server_decapsulate,
|
|
pq_kex_server_keypair,
|
|
sign,
|
|
verify,
|
|
)
|
|
from browser_cli.client import _is_valid_key_spec
|
|
|
|
|
|
class TestGenerateKeypair:
    """Checks on the pair of values returned by ``generate_keypair``."""

    def test_returns_pem_and_hex(self):
        """The first value starts with a PEM private-key header; the second is 64 long."""
        private_pem, public_hex = generate_keypair()
        has_pem_header = private_pem.startswith(b"-----BEGIN PRIVATE KEY-----")
        assert has_pem_header
        assert len(public_hex) == 64

    def test_each_call_unique(self):
        """Two consecutive calls must not hand back the same public key."""
        _first_pem, first_public = generate_keypair()
        _second_pem, second_public = generate_keypair()
        assert first_public != second_public
|
|
|
|
|
|
class TestCanonicalPayload:
    """Checks on the serialized bytes produced by ``canonical_payload``."""

    def test_strips_auth_protocol_fields(self):
        """``pubkey``, ``sig`` and ``pq_kex`` must be absent from the decoded payload."""
        message = {
            "command": "tabs.list",
            "id": "x",
            "pubkey": "abc",
            "sig": "def",
            "pq_kex": {"alg": "ML-KEM-768"},
        }
        decoded = json.loads(canonical_payload(message))
        for stripped_field in ("pubkey", "sig", "pq_kex"):
            assert stripped_field not in decoded

    def test_keys_sorted(self):
        """Keys appear in the output in alphabetical order, whatever the input order."""
        text = canonical_payload({"z": 1, "a": 2, "m": 3}).decode()
        pos_a = text.index('"a"')
        pos_m = text.index('"m"')
        pos_z = text.index('"z"')
        assert pos_a < pos_m < pos_z

    def test_deterministic(self):
        """Serializing the same message twice yields equal results."""
        message = {"b": 2, "a": 1}
        first = canonical_payload(message)
        second = canonical_payload(message)
        assert first == second
|
|
|
|
|
|
@pytest.fixture()
def keypair(tmp_path):
    """Provide a ``(private_key, public_hex)`` pair for signing tests.

    A fresh keypair is generated, its PEM bytes are written to
    ``client.key.pem`` under ``tmp_path``, and the private key is read back
    through ``load_private_key`` so the tests use the on-disk loading path.
    """
    private_pem, public_hex = generate_keypair()
    pem_file = tmp_path / "client.key.pem"
    pem_file.write_bytes(private_pem)
    return load_private_key(pem_file), public_hex
|
|
|
|
|
|
class TestSignVerify:
    """Round-trip and rejection tests for ``sign`` / ``verify``."""

    def test_valid_signature_verifies(self, keypair):
        """A signature checked with the matching key, nonce and message is accepted."""
        priv, pub_hex = keypair
        nonce = bytes.fromhex(new_nonce())
        msg = {"command": "tabs.list", "id": "uuid-1", "args": {}}
        sig = sign(priv, nonce, msg).hex()
        assert verify(pub_hex, nonce, msg, sig) is True

    def test_tampered_sig_fails(self, keypair):
        """A genuine signature with one bit flipped, and an all-zero one, are rejected."""
        priv, pub_hex = keypair
        nonce = bytes.fromhex(new_nonce())
        msg = {"command": "tabs.list", "id": "x"}

        # Start from a real signature and flip a single bit, so the test
        # exercises actual tampering rather than only a constant dummy value.
        tampered = bytearray(sign(priv, nonce, msg))
        tampered[0] ^= 0x01
        assert verify(pub_hex, nonce, msg, bytes(tampered).hex()) is False

        # An all-zero signature must be rejected as well.
        assert verify(pub_hex, nonce, msg, "00" * 64) is False

    def test_wrong_pubkey_fails(self, keypair):
        """A signature does not verify against an unrelated public key."""
        priv, _ = keypair
        _, other_pub = generate_keypair()
        nonce = bytes.fromhex(new_nonce())
        msg = {"command": "tabs.list"}
        sig = sign(priv, nonce, msg).hex()
        assert verify(other_pub, nonce, msg, sig) is False

    def test_wrong_nonce_fails(self, keypair):
        """A signature made over one nonce does not verify with a different nonce."""
        priv, pub_hex = keypair
        nonce = bytes.fromhex(new_nonce())
        msg = {"command": "tabs.list"}
        sig = sign(priv, nonce, msg).hex()
        other_nonce = bytes.fromhex(new_nonce())
        assert verify(pub_hex, other_nonce, msg, sig) is False

    def test_post_quantum_shared_secret_is_bound_to_signature(self, keypair):
        """Verification needs the same shared secret that was passed to ``sign``."""
        priv, pub_hex = keypair
        nonce = bytes.fromhex(new_nonce())
        msg = {"command": "tabs.list", "pq_kex": {"alg": "ML-KEM-768", "ciphertext": "abcd"}}
        sig = sign(priv, nonce, msg, b"shared-secret").hex()
        assert verify(pub_hex, nonce, msg, sig, b"shared-secret") is True
        # A different secret, or omitting it, must both fail.
        assert verify(pub_hex, nonce, msg, sig, b"other-secret") is False
        assert verify(pub_hex, nonce, msg, sig) is False

    def test_garbage_pub_hex_returns_false_not_exception(self):
        """A non-hex public key makes ``verify`` return False instead of raising."""
        assert verify("not-hex!!!!", b"nonce", {}, "00" * 64) is False

    def test_truncated_sig_hex_returns_false_not_exception(self, keypair):
        """A too-short signature makes ``verify`` return False instead of raising."""
        _, pub_hex = keypair
        assert verify(pub_hex, b"nonce", {}, "aabb") is False

    def test_wrong_length_pubkey_returns_false_not_exception(self):
        """A valid-hex public key of the wrong length is rejected without raising."""
        assert verify("aabbcc", b"nonce", {}, "00" * 64) is False
|
|
|
|
|
|
class TestPostQuantumKex:
    """Round-trip test for the post-quantum key-exchange helpers."""

    def test_mlkem_roundtrip_when_backend_supports_it(self):
        """Client and server derive the same 32-byte secret; skipped without a backend."""
        server_keys = pq_kex_server_keypair()
        if server_keys is None:
            pytest.skip("ML-KEM backend not available")
        server_priv, server_pub = server_keys

        encapsulated = pq_kex_client_encapsulate(server_pub.hex())
        ciphertext_hex, client_secret = encapsulated
        server_secret = pq_kex_server_decapsulate(server_priv, ciphertext_hex)

        assert server_secret == client_secret
        assert len(server_secret) == 32
|
|
|
|
|
|
class TestAuthorizedKeys:
    """Tests for writing to and reading from an authorized-keys file."""

    def test_add_and_load(self, tmp_path):
        """A newly added key reports True and is then returned by the loader."""
        keys_file = tmp_path / "authorized_keys"
        _, public_hex = generate_keypair()
        was_added = add_authorized_key(keys_file, public_hex, "alice")
        assert was_added is True
        assert public_hex in load_authorized_keys(keys_file)

    def test_add_duplicate_returns_false(self, tmp_path):
        """Adding the same key a second time reports False."""
        keys_file = tmp_path / "authorized_keys"
        _, public_hex = generate_keypair()
        add_authorized_key(keys_file, public_hex)
        second_attempt = add_authorized_key(keys_file, public_hex)
        assert second_attempt is False

    def test_load_with_names(self, tmp_path):
        """Entries load as ``(key, name)`` pairs; a key added without a name gets ``""``."""
        keys_file = tmp_path / "authorized_keys"
        _, named_key = generate_keypair()
        _, unnamed_key = generate_keypair()
        add_authorized_key(keys_file, named_key, "alice")
        add_authorized_key(keys_file, unnamed_key)
        loaded = load_authorized_keys_with_names(keys_file)
        assert (named_key, "alice") in loaded
        assert (unnamed_key, "") in loaded

    def test_ignores_comment_lines(self, tmp_path):
        """A file containing only a ``#`` comment line loads as an empty list."""
        keys_file = tmp_path / "authorized_keys"
        keys_file.write_text("# this is a comment\n")
        loaded = load_authorized_keys(keys_file)
        assert loaded == []

    def test_returns_empty_for_missing_file(self, tmp_path):
        """Loading from a path that does not exist yields an empty list."""
        missing = tmp_path / "nofile"
        assert load_authorized_keys(missing) == []
|
|
|
|
|
|
class TestIsValidKeySpec:
    """Accept/reject cases for ``_is_valid_key_spec``."""

    def test_agent_bare(self):
        """The bare string ``agent`` is accepted."""
        spec = "agent"
        assert _is_valid_key_spec(spec) is True

    def test_agent_with_selector(self):
        """``agent:`` followed by a selector is accepted."""
        spec = "agent:cardno:000012345678"
        assert _is_valid_key_spec(spec) is True

    def test_absolute_pem_path(self):
        """An absolute path ending in ``.pem`` is accepted."""
        spec = "/home/user/.config/browser-cli/client.key.pem"
        assert _is_valid_key_spec(spec) is True

    def test_dot_key_extension(self):
        """An absolute path ending in ``.key`` is accepted."""
        spec = "/tmp/mykey.key"
        assert _is_valid_key_spec(spec) is True

    def test_angled_bracket_pem_rejected(self):
        """An angle-bracketed name with a ``.pem`` suffix is rejected."""
        # regression: operator precedence bug allowed "<garbage>.pem" to pass
        spec = "<garbage>.pem"
        assert _is_valid_key_spec(spec) is False

    def test_angled_bracket_key_rejected(self):
        """An angle-bracketed name with a ``.key`` suffix is rejected."""
        spec = "<garbage>.key"
        assert _is_valid_key_spec(spec) is False

    def test_serialized_object_rejected(self):
        """An object-repr-looking string with ``.pem`` appended is rejected."""
        spec = "<AgentKey(blob=b'...', comment='test')>.pem"
        assert _is_valid_key_spec(spec) is False

    def test_empty_string_rejected(self):
        """The empty string is rejected."""
        spec = ""
        assert _is_valid_key_spec(spec) is False

    def test_bare_filename_no_slash_no_ext_rejected(self):
        """A bare word with no slash and no extension is rejected."""
        spec = "mykey"
        assert _is_valid_key_spec(spec) is False
|