"""Runtime registry helpers for active browser-cli native host endpoints."""

import contextlib
import json
import os
import tempfile
from pathlib import Path
from typing import Iterator

from browser_cli.platform import registry_path

REGISTRY_PATH = registry_path()


@contextlib.contextmanager
def _file_lock(path: Path) -> Iterator[None]:
    """Best-effort cross-process lock for registry read/modify/write updates.

    The lock is taken on a sibling file named ``<path>.lock`` (the suffix is
    appended to the existing one), never on the registry file itself, so the
    registry can still be swapped with ``os.replace`` while the lock is held.

    Args:
        path: Registry file the caller is about to read/modify/write.

    Raises:
        OSError: If the lock file cannot be opened or the lock cannot be
            acquired. Errors while *releasing* the lock are ignored.
    """
    path.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
    lock_path = path.with_suffix(path.suffix + ".lock")
    with lock_path.open("a+") as lock_file:
        fd = lock_file.fileno()
        if os.name == "nt":
            import msvcrt

            def acquire() -> None:
                # msvcrt.locking() locks a byte range starting at the current
                # file position; pin it to offset 0 so every process contends
                # on the same byte.
                lock_file.seek(0)
                msvcrt.locking(fd, msvcrt.LK_LOCK, 1)

            def release() -> None:
                lock_file.seek(0)
                msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)

        else:
            import fcntl

            def acquire() -> None:
                fcntl.flock(fd, fcntl.LOCK_EX)

            def release() -> None:
                fcntl.flock(fd, fcntl.LOCK_UN)

        # Acquire outside the try block: if acquisition fails there is no
        # lock to release, and the original error propagates untouched.
        acquire()
        try:
            yield
        finally:
            # Closing the file drops the lock anyway; an explicit unlock
            # failure is not worth surfacing.
            with contextlib.suppress(OSError):
                release()


def _coerce_registry(data: object) -> dict[str, str]:
    """Normalise decoded JSON into an ``alias -> endpoint`` string mapping.

    Anything that is not a dict yields an empty mapping. Entries whose alias
    or endpoint is falsy (empty string, ``None``, ``0``, ...) are dropped;
    the remaining keys and values are converted with ``str()``.
    """
    if not isinstance(data, dict):
        return {}
    return {
        str(alias): str(endpoint)
        for alias, endpoint in data.items()
        if alias and endpoint
    }


def load_registry(path: Path | None = None) -> dict[str, str]:
    """Load the active browser registry.

    Older native hosts wrote this file non-atomically, so tolerate trailing
    garbage from interrupted/concurrent writes and keep the first valid JSON
    object when possible.

    Args:
        path: Registry file to read; defaults to ``REGISTRY_PATH``.

    Returns:
        The ``alias -> endpoint`` mapping, or an empty dict when the file is
        missing, unreadable, not valid UTF-8, empty, or not parseable.
    """
    registry = path or REGISTRY_PATH
    try:
        text = registry.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        # A missing file raises FileNotFoundError (an OSError); a torn write
        # can leave bytes that are not valid UTF-8. Both mean "no registry".
        return {}

    # Strip up front: raw_decode() below does not skip leading whitespace.
    text = text.strip()
    if not text:
        return {}

    try:
        return _coerce_registry(json.loads(text))
    except json.JSONDecodeError:
        pass

    # Fall back to the first JSON value and ignore whatever follows it.
    try:
        data, _ = json.JSONDecoder().raw_decode(text)
    except json.JSONDecodeError:
        return {}
    return _coerce_registry(data)


def save_registry(data: dict[str, str], path: Path | None = None) -> None:
    """Atomically write the active browser registry.

    The payload is written and fsynced to a temporary file in the registry's
    directory and then moved over the target with ``os.replace``, so readers
    see either the old or the new content.

    Args:
        data: Mapping to persist; it is passed through ``_coerce_registry``
            before serialisation.
        path: Registry file to write; defaults to ``REGISTRY_PATH``.

    Raises:
        OSError: If the directory, temporary file, or final replace fails.
    """
    registry = path or REGISTRY_PATH
    registry.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
    payload = json.dumps(_coerce_registry(data), sort_keys=True)
    fd, tmp_name = tempfile.mkstemp(
        prefix=registry.name + ".", suffix=".tmp", dir=registry.parent
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp:
            tmp.write(payload)
            tmp.flush()
            os.fsync(tmp.fileno())
        os.replace(tmp_name, registry)
    except BaseException:
        # Only a failed write/replace leaves the temp file behind. Clean it
        # up without letting a cleanup error mask the real exception.
        with contextlib.suppress(OSError):
            os.unlink(tmp_name)
        raise


def update_registry(alias: str, endpoint: str | None, path: Path | None = None) -> None:
    """Add/update an alias, or remove it when endpoint is None.

    The load/modify/save cycle runs under ``_file_lock`` so concurrent
    updaters using this helper do not overwrite each other's changes.

    Args:
        alias: Registry key to set or remove.
        endpoint: New endpoint for ``alias``, or ``None`` to delete the entry.
        path: Registry file to update; defaults to ``REGISTRY_PATH``.
    """
    registry = path or REGISTRY_PATH
    with _file_lock(registry):
        data = load_registry(registry)
        if endpoint is None:
            data.pop(alias, None)
        else:
            data[alias] = endpoint
        save_registry(data, registry)