Files

100 lines
3.3 KiB
Python

"""Runtime registry helpers for active browser-cli native host endpoints."""
import contextlib
import json
import os
import tempfile
from pathlib import Path
from typing import Iterator
from browser_cli.platform import registry_path
REGISTRY_PATH = registry_path()
@contextlib.contextmanager
def _file_lock(path: Path) -> Iterator[None]:
"""Best-effort cross-process lock for registry read/modify/write updates."""
path.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
lock_path = path.with_suffix(path.suffix + ".lock")
with lock_path.open("a+") as lock_file:
if os.name == "nt":
try:
import msvcrt
msvcrt.locking(lock_file.fileno(), msvcrt.LK_LOCK, 1)
yield
finally:
try:
msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1)
except OSError:
pass
else:
try:
import fcntl
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
yield
finally:
try:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
except OSError:
pass
def _coerce_registry(data) -> dict[str, str]:
if not isinstance(data, dict):
return {}
return {str(alias): str(endpoint) for alias, endpoint in data.items() if alias and endpoint}
def load_registry(path: Path | None = None) -> dict[str, str]:
"""Load the active browser registry.
Older native hosts wrote this file non-atomically, so tolerate trailing
garbage from interrupted/concurrent writes and keep the first valid JSON
object when possible.
"""
registry = path or REGISTRY_PATH
if not registry.exists():
return {}
try:
text = registry.read_text(encoding="utf-8")
except OSError:
return {}
if not text.strip():
return {}
try:
return _coerce_registry(json.loads(text))
except json.JSONDecodeError:
try:
data, _ = json.JSONDecoder().raw_decode(text)
return _coerce_registry(data)
except json.JSONDecodeError:
return {}
def save_registry(data: dict[str, str], path: Path | None = None) -> None:
"""Atomically write the active browser registry."""
registry = path or REGISTRY_PATH
registry.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
payload = json.dumps(_coerce_registry(data), sort_keys=True)
fd, tmp_name = tempfile.mkstemp(prefix=registry.name + ".", suffix=".tmp", dir=registry.parent)
try:
with os.fdopen(fd, "w", encoding="utf-8") as tmp:
tmp.write(payload)
tmp.flush()
os.fsync(tmp.fileno())
os.replace(tmp_name, registry)
finally:
try:
os.unlink(tmp_name)
except FileNotFoundError:
pass
def update_registry(alias: str, endpoint: str | None, path: Path | None = None) -> None:
"""Add/update an alias, or remove it when endpoint is None."""
registry = path or REGISTRY_PATH
with _file_lock(registry):
data = load_registry(registry)
if endpoint is None:
data.pop(alias, None)
else:
data[alias] = endpoint
save_registry(data, registry)