cea8a7e994
- Add the n8n community node package with credentials, command mapping, direct serve TCP client, and browser-cli protocol crypto helpers. - Cover Ed25519 signing, canonical JSON, PQ transport encryption, request mapping, and security behavior with unit tests. - Harden serve-http with per-address rate limiting, an 8 MB request body cap, and clear warnings when binding plain HTTP beyond loopback. - Stop one-shot --key overrides from being persisted automatically; document explicit remote trust and keep key-management behind the keys policy tier. - Make HTML-to-Markdown conversion safer by bounding tree depth and dropping unsafe link/image URL schemes. - Bump package and extension release metadata to 0.16.3.
209 lines
9.5 KiB
Python
209 lines
9.5 KiB
Python
"""Unit tests for serve-side security: per-key policy loading, rate limiting, context."""
|
|
import pytest
|
|
|
|
from browser_cli.auth.keys import (
|
|
_parse_authorized_line,
|
|
format_authorized_line,
|
|
load_authorized_keys_with_names,
|
|
load_authorized_keys_with_policies,
|
|
set_authorized_key_policy,
|
|
)
|
|
from browser_cli.command_security import CommandPolicy, assert_command_allowed
|
|
from browser_cli.serve.security import (
|
|
RateLimiter,
|
|
ServeSecurity,
|
|
key_policies_from_authorized_keys,
|
|
policy_from_categories,
|
|
)
|
|
|
|
# ── policy_from_categories ───────────────────────────────────────────────────────
|
|
|
|
def test_policy_from_categories_all_is_unrestricted():
    """The single ``all`` category resolves to the unrestricted policy."""
    resolved = policy_from_categories(["all"])
    expected = CommandPolicy.unrestricted()
    assert resolved == expected
|
|
|
|
def test_policy_from_categories_subset():
    """Listing read-page and control enables those two flags but not dangerous."""
    granted = policy_from_categories(["read-page", "control"])
    expected = CommandPolicy(allow_read_page=True, allow_control=True)
    assert granted == expected
    assert granted.allow_dangerous is False
|
|
|
|
def test_policy_from_categories_safe_and_empty_are_noops():
    """Both ``["safe"]`` and an empty list produce the default policy."""
    for categories in (["safe"], []):
        assert policy_from_categories(categories) == CommandPolicy()
|
|
|
|
def test_policy_from_categories_rejects_unknown():
    """An unrecognised category name is rejected with ``ValueError``."""
    unknown_categories = ["bogus"]
    with pytest.raises(ValueError, match="unknown command category"):
        policy_from_categories(unknown_categories)
|
|
|
|
def test_policy_from_categories_keys():
    """The ``keys`` category sets only the ``allow_keys`` flag."""
    resolved = policy_from_categories(["keys"])
    expected = CommandPolicy(allow_keys=True)
    assert resolved == expected
|
|
|
|
# ── keys category gating ─────────────────────────────────────────────────────────
|
|
|
|
def test_key_commands_are_keys_category():
    """Key-management commands are categorised as ``keys``; targets stay ``safe``."""
    from browser_cli.command_security import command_category

    # Checked in insertion order, one assertion per command.
    expected_category = {
        "browser-cli.auth.keys": "keys",
        "browser-cli.auth.trust": "keys",
        "browser-cli.auth.policy": "keys",
        "browser-cli.targets": "safe",  # discovery stays open
    }
    for command, category in expected_category.items():
        assert command_category(command) == category
|
|
|
|
def test_key_commands_blocked_without_allow_keys():
    """Key commands need ``allow_keys`` (or the unrestricted policy) to pass."""
    key_commands = (
        "browser-cli.auth.keys",
        "browser-cli.auth.trust",
        "browser-cli.auth.policy",
    )
    for command in key_commands:
        # safe-only default
        with pytest.raises(PermissionError):
            assert_command_allowed(command, CommandPolicy())

        # explicit grant
        explicit_grant = CommandPolicy(allow_keys=True)
        assert_command_allowed(command, explicit_grant)

        # all includes keys
        everything = CommandPolicy.unrestricted()
        assert_command_allowed(command, everything)
|
|
|
|
def test_full_control_still_cannot_manage_keys():
    """Read-page + control + dangerous, without keys, is refused for auth.trust."""
    granted_flags = {
        "allow_read_page": True,
        "allow_control": True,
        "allow_dangerous": True,
    }
    without_keys = CommandPolicy(**granted_flags)
    with pytest.raises(PermissionError):
        assert_command_allowed("browser-cli.auth.trust", without_keys)
|
|
|
|
# ── set_authorized_key_policy ────────────────────────────────────────────────────
|
|
|
|
def test_set_policy_updates_by_pubkey(tmp_path):
    """Selecting an entry by pubkey returns (pubkey, name) and stores the policy."""
    keys_file = tmp_path / "authorized_keys"
    pub = "a" * 64
    keys_file.write_text(f"{pub} laptop\n")

    matched = set_authorized_key_policy(keys_file, pub, ["control"])
    assert matched == (pub, "laptop")

    stored = load_authorized_keys_with_policies(keys_file)
    assert stored == [(pub, "laptop", ["control"])]
|
|
|
|
def test_set_policy_by_name_and_remove_with_none(tmp_path):
    """Selecting by name with ``None`` categories removes the allow token."""
    keys_file = tmp_path / "authorized_keys"
    pub = "b" * 64
    keys_file.write_text(f"{pub} ci-bot allow:all\n")

    # remove token
    matched = set_authorized_key_policy(keys_file, "ci-bot", None)
    assert matched == (pub, "ci-bot")

    stored = load_authorized_keys_with_policies(keys_file)
    assert stored == [(pub, "ci-bot", None)]
|
|
|
|
def test_set_policy_safe_only_writes_empty_token(tmp_path):
    """An empty category list is written out as a bare ``allow:`` token."""
    keys_file = tmp_path / "authorized_keys"
    pub = "c" * 64
    keys_file.write_text(f"{pub} reader\n")

    set_authorized_key_policy(keys_file, pub, [])

    on_disk = keys_file.read_text().strip()
    assert on_disk == f"{pub} reader allow:"
|
|
|
|
def test_set_policy_not_found_returns_none(tmp_path):
    """A selector matching no entry makes the call return ``None``."""
    keys_file = tmp_path / "authorized_keys"
    keys_file.write_text(f"{'a' * 64} laptop\n")

    outcome = set_authorized_key_policy(keys_file, "nonexistent", ["control"])
    assert outcome is None
|
|
|
|
def test_set_policy_ambiguous_name_raises(tmp_path):
    """A name shared by two entries is rejected as ambiguous."""
    keys_file = tmp_path / "authorized_keys"
    keys_file.write_text(f"{'a' * 64} dup\n{'b' * 64} dup\n")

    selector = "dup"
    with pytest.raises(ValueError, match="ambiguous"):
        set_authorized_key_policy(keys_file, selector, ["control"])
|
|
|
|
def test_set_policy_preserves_other_lines(tmp_path):
    """Updating one entry leaves the other entry's policy as it was."""
    keys_file = tmp_path / "authorized_keys"
    a = "a" * 64
    b = "b" * 64
    keys_file.write_text(f"{a} first\n{b} second allow:read-page\n")

    set_authorized_key_policy(keys_file, a, ["control"])

    stored = load_authorized_keys_with_policies(keys_file)
    updated_entry = (a, "first", ["control"])
    untouched_entry = (b, "second", ["read-page"])  # untouched
    assert stored == [updated_entry, untouched_entry]
|
|
|
|
# ── authorized_keys line parsing ─────────────────────────────────────────────────
|
|
|
|
def test_parse_line_pubkey_only():
    """A line holding only a pubkey parses to an empty name and no policy."""
    parsed = _parse_authorized_line("abc123")
    assert parsed == ("abc123", "", None)
|
|
|
|
def test_parse_line_name_with_spaces_no_policy():
    """A multi-word name is kept whole when no allow token is present."""
    # A multi-word name (e.g. "YubiKey 5C NFC FIPS") must stay intact, policy None.
    parsed = _parse_authorized_line("abc YubiKey 5C NFC FIPS")
    assert parsed == ("abc", "YubiKey 5C NFC FIPS", None)
|
|
|
|
def test_parse_line_name_with_spaces_and_policy():
    """A trailing allow token is split off a multi-word name into categories."""
    raw_line = "abc YubiKey 5C NFC FIPS allow:read-page,control"
    pubkey, label, categories = _parse_authorized_line(raw_line)

    assert pubkey == "abc"
    # allow: token stripped out of the name
    assert label == "YubiKey 5C NFC FIPS"
    assert categories == ["read-page", "control"]
|
|
|
|
def test_parse_line_empty_allow_is_safe():
    """A bare ``allow:`` token parses to an empty category list."""
    parsed = _parse_authorized_line("abc name allow:")
    assert parsed == ("abc", "name", [])
|
|
|
|
def test_parse_line_skips_comments_and_blanks():
    """Comment lines and whitespace-only lines parse to ``None``."""
    for ignored_line in ("# comment", " "):
        assert _parse_authorized_line(ignored_line) is None
|
|
|
|
def test_format_authorized_line_roundtrips():
    """A formatted line parses back to its inputs; no categories means no token."""
    rendered = format_authorized_line("abc", "my laptop", ["read-page", "control"])
    assert rendered == "abc my laptop allow:read-page,control"

    reparsed = _parse_authorized_line(rendered)
    assert reparsed == ("abc", "my laptop", ["read-page", "control"])

    # No categories → no allow token.
    without_policy = format_authorized_line("abc", "laptop")
    assert without_policy == "abc laptop"
|
|
|
|
# ── key_policies_from_authorized_keys ────────────────────────────────────────────
|
|
|
|
def test_key_policies_from_authorized_keys(tmp_path):
    """Entries with an allow token get a policy keyed by lowercase pubkey."""
    keys_file = tmp_path / "authorized_keys"
    file_lines = (
        "AABBCC laptop allow:all\n",
        "ddee01 ci-bot allow:read-page,control\n",
        "112233 readonly\n",  # no allow token → no override entry
    )
    keys_file.write_text("".join(file_lines))

    overrides = key_policies_from_authorized_keys(keys_file)

    # normalised to lowercase
    assert overrides["aabbcc"] == CommandPolicy.unrestricted()
    assert overrides["ddee01"] == CommandPolicy(allow_read_page=True, allow_control=True)
    # falls back to server default
    assert "112233" not in overrides
|
|
|
|
def test_key_policies_none_returns_empty():
    """Passing ``None`` as the path yields an empty mapping."""
    overrides = key_policies_from_authorized_keys(None)
    assert overrides == {}
|
|
|
|
def test_key_policies_rejects_unknown_category(tmp_path):
    """An unknown category in an allow token raises ``ValueError`` on load."""
    keys_file = tmp_path / "authorized_keys"
    keys_file.write_text("abc name allow:bogus\n")

    with pytest.raises(ValueError, match="unknown command category"):
        key_policies_from_authorized_keys(keys_file)
|
|
|
|
def test_load_with_names_ignores_allow_token(tmp_path):
    """The names loader drops the allow token; the policies loader returns it."""
    keys_file = tmp_path / "authorized_keys"
    keys_file.write_text("abc my laptop allow:control\n")

    names_only = load_authorized_keys_with_names(keys_file)
    assert names_only == [("abc", "my laptop")]

    with_policies = load_authorized_keys_with_policies(keys_file)
    assert with_policies == [("abc", "my laptop", ["control"])]
|
|
|
|
# ── RateLimiter ──────────────────────────────────────────────────────────────────
|
|
|
|
def test_rate_limiter_zero_rate_never_limits():
    """With ``rate=0`` a thousand consecutive calls for one key are all allowed."""
    unlimited = RateLimiter(rate=0)
    for _ in range(1000):
        assert unlimited.allow("k")
|
|
|
|
def test_rate_limiter_burst_then_block():
    """Three calls fit in a burst of 3; the fourth is refused."""
    bucket = RateLimiter(rate=0.0001, burst=3)

    for _ in range(3):
        assert bucket.allow("k") is True

    # bucket drained, refill negligible
    assert bucket.allow("k") is False
|
|
|
|
def test_rate_limiter_is_per_key():
    """Each key draws from its own bucket: one allowed call per key, then refusal."""
    buckets = RateLimiter(rate=0.0001, burst=1)

    # Call order matters: "b" is first used after "a" has spent its only token.
    call_sequence = (
        ("a", True),
        ("b", True),  # different key has its own bucket
        ("a", False),
        ("b", False),
    )
    for key, expected in call_sequence:
        assert buckets.allow(key) is expected
|
|
|
|
# ── ServeSecurity ────────────────────────────────────────────────────────────────
|
|
|
|
def test_effective_policy_prefers_per_key_override():
    """A per-key policy wins over the server policy; other keys get the server one."""
    server_policy = CommandPolicy.unrestricted()
    per_key = {"abc": CommandPolicy()}
    security = ServeSecurity(policy=server_policy, key_policies=per_key)

    # override
    assert security.effective_policy("abc") == CommandPolicy()
    # default
    assert security.effective_policy("other") == CommandPolicy.unrestricted()
    assert security.effective_policy(None) == CommandPolicy.unrestricted()

    # And the override actually gates a dangerous command:
    with pytest.raises(PermissionError):
        assert_command_allowed("dom.eval", security.effective_policy("abc"))
|
|
|
|
def test_label_for_renders_name_and_short_pubkey():
    """Labels are name plus shortened pubkey, or just the short pubkey if unnamed."""
    security = ServeSecurity(key_names={"ab12cd34ef": "laptop"})

    named_label = security.label_for("ab12cd34ef")
    assert named_label == "laptop ab12cd34…"

    # unknown key → short pubkey only
    unnamed_label = security.label_for("ffeeddccbb")
    assert unnamed_label == "ffeeddcc…"

    assert security.label_for(None) is None
|
|
|
|
def test_serve_security_defaults_are_safe():
    """A default ``ServeSecurity`` has no key policies, no names and no limiter."""
    defaults = ServeSecurity()

    for mapping in (defaults.key_policies, defaults.key_names):
        assert mapping == {}
    assert defaults.rate_limiter is None
|