from typing import Any, TextIO, cast from datetime import UTC, datetime from pathlib import Path import gzip, glob, json class DatasetIO: @staticmethod def resolve_input_files(input_files: list[str], directory_pattern: str | tuple[str, ...] = ("*.jsonl", "*.jsonl.gz")) -> list[Path]: patterns = ( (directory_pattern,) if isinstance(directory_pattern, str) else directory_pattern ) resolved = [] seen = set() for item in input_files: path = Path(item) if path.is_dir(): for pattern in patterns: for file_path in sorted(path.rglob(pattern)): key = str(file_path.resolve()) if key in seen: continue seen.add(key) resolved.append(file_path) continue if any(ch in item for ch in "*?[]"): for match in sorted(glob.glob(item)): file_path = Path(match) if not file_path.is_file(): continue key = str(file_path.resolve()) if key in seen: continue seen.add(key) resolved.append(file_path) continue if path.is_file(): key = str(path.resolve()) if key in seen: continue seen.add(key) resolved.append(path) return resolved @staticmethod def list_directory_files(input_dir:Path, directory_pattern:str) -> list[Path]: if not input_dir.exists(): return [] return sorted(input_dir.rglob(directory_pattern)) @staticmethod def open_text(path:Path, mode:str="r"): text_mode = mode if "t" in mode else f"{mode}t" open_fn = gzip.open if path.suffix == ".gz" else open return cast(TextIO, open_fn(path, text_mode, encoding="utf-8")) @staticmethod def iter_jsonl_rows(path:Path): with DatasetIO.open_text(path, "r") as handle: for line in handle: if line.strip(): yield json.loads(line) @staticmethod def append_jsonl_row(path:Path, row:dict[str, Any]): with DatasetIO.open_text(path, "a") as raw_handle: handle = cast(TextIO, raw_handle) handle.write(json.dumps(row, ensure_ascii=False) + "\n") @staticmethod def count_jsonl_rows(path:Path) -> int: count = 0 candidates = [path] if path.suffix == ".gz": candidates.append(path.with_suffix("")) else: candidates.append(Path(f"{path}.gz")) seen = set() for candidate in candidates: if candidate in seen: continue seen.add(candidate) if not candidate.exists() or not candidate.is_file(): continue try: with DatasetIO.open_text(candidate, "r") as handle: for line in handle: if line.strip(): count += 1 except (OSError, UnicodeError): continue return count @staticmethod def rotate_and_gzip_if_size_reached(path:Path, max_bytes:int) -> bool: if max_bytes <= 0: return False if not path.exists() or not path.is_file(): return False if path.stat().st_size < max_bytes: return False timestamp = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") if path.suffix == ".jsonl": rotated_name = f"{path.stem}.{timestamp}.jsonl" else: rotated_name = f"{path.name}.{timestamp}" rotated_path = path.with_name(rotated_name) suffix = 1 while rotated_path.exists(): suffix += 1 if path.suffix == ".jsonl": rotated_name = f"{path.stem}.{timestamp}.{suffix}.jsonl" else: rotated_name = f"{path.name}.{timestamp}.{suffix}" rotated_path = path.with_name(rotated_name) path.rename(rotated_path) with rotated_path.open("rb") as src: with gzip.open(f"{rotated_path}.gz", "wb") as dst: dst.writelines(src) rotated_path.unlink() return True