100 lines
3.3 KiB
Python
100 lines
3.3 KiB
Python
"""Runtime registry helpers for active browser-cli native host endpoints."""
|
|
import contextlib
|
|
import json
|
|
import os
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Iterator
|
|
|
|
from browser_cli.platform import registry_path
|
|
|
|
REGISTRY_PATH = registry_path()
|
|
|
|
@contextlib.contextmanager
|
|
def _file_lock(path: Path) -> Iterator[None]:
|
|
"""Best-effort cross-process lock for registry read/modify/write updates."""
|
|
path.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
|
|
lock_path = path.with_suffix(path.suffix + ".lock")
|
|
with lock_path.open("a+") as lock_file:
|
|
if os.name == "nt":
|
|
try:
|
|
import msvcrt
|
|
|
|
msvcrt.locking(lock_file.fileno(), msvcrt.LK_LOCK, 1)
|
|
yield
|
|
finally:
|
|
try:
|
|
msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1)
|
|
except OSError:
|
|
pass
|
|
else:
|
|
try:
|
|
import fcntl
|
|
|
|
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
|
|
yield
|
|
finally:
|
|
try:
|
|
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
|
|
except OSError:
|
|
pass
|
|
|
|
def _coerce_registry(data) -> dict[str, str]:
|
|
if not isinstance(data, dict):
|
|
return {}
|
|
return {str(alias): str(endpoint) for alias, endpoint in data.items() if alias and endpoint}
|
|
|
|
def load_registry(path: Path | None = None) -> dict[str, str]:
|
|
"""Load the active browser registry.
|
|
|
|
Older native hosts wrote this file non-atomically, so tolerate trailing
|
|
garbage from interrupted/concurrent writes and keep the first valid JSON
|
|
object when possible.
|
|
"""
|
|
registry = path or REGISTRY_PATH
|
|
if not registry.exists():
|
|
return {}
|
|
try:
|
|
text = registry.read_text(encoding="utf-8")
|
|
except OSError:
|
|
return {}
|
|
if not text.strip():
|
|
return {}
|
|
try:
|
|
return _coerce_registry(json.loads(text))
|
|
except json.JSONDecodeError:
|
|
try:
|
|
data, _ = json.JSONDecoder().raw_decode(text)
|
|
return _coerce_registry(data)
|
|
except json.JSONDecodeError:
|
|
return {}
|
|
|
|
def save_registry(data: dict[str, str], path: Path | None = None) -> None:
|
|
"""Atomically write the active browser registry."""
|
|
registry = path or REGISTRY_PATH
|
|
registry.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
|
|
payload = json.dumps(_coerce_registry(data), sort_keys=True)
|
|
fd, tmp_name = tempfile.mkstemp(prefix=registry.name + ".", suffix=".tmp", dir=registry.parent)
|
|
try:
|
|
with os.fdopen(fd, "w", encoding="utf-8") as tmp:
|
|
tmp.write(payload)
|
|
tmp.flush()
|
|
os.fsync(tmp.fileno())
|
|
os.replace(tmp_name, registry)
|
|
finally:
|
|
try:
|
|
os.unlink(tmp_name)
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
def update_registry(alias: str, endpoint: str | None, path: Path | None = None) -> None:
|
|
"""Add/update an alias, or remove it when endpoint is None."""
|
|
registry = path or REGISTRY_PATH
|
|
with _file_lock(registry):
|
|
data = load_registry(registry)
|
|
if endpoint is None:
|
|
data.pop(alias, None)
|
|
else:
|
|
data[alias] = endpoint
|
|
save_registry(data, registry)
|