Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/release-notes/parallel-h5-read.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<!--
Rename this file to <PR_NUMBER>.feat.md when the PR is opened (towncrier
issue_format expects a numeric id). For example: 2463.feat.md
-->
Multi-threaded HDF5 reads in ``read_h5ad``. Chunked datasets
compressed with **gzip / deflate**, **zstd**, or **blosc** (any inner
codec — lz4, zstd, blosclz, snappy) are now read into memory in
parallel: chunk locations are enumerated via
``Dataset.id.chunk_iter``, adjacent chunks are coalesced into on-disk
extents, and worker threads each ``pread()`` a sub-extent (bypassing
h5py's process-global lock) and decompress their chunks into disjoint
regions of the output array using GIL-releasing decoders from
:mod:`numcodecs`.

Bit-identical to the serial path; silently falls back to serial for
ineligible datasets (unsupported or multi-filter pipeline, unchunked,
partially written, smaller than ``parallel_h5_read_min_mb``, non-disk
file driver, missing ``chunk_iter`` on h5py < 3.8) or on any
unexpected error.

Controlled by ``settings.parallel_h5_read`` (default ``True``),
``settings.parallel_h5_read_min_mb`` (default ``64``), and
``settings.parallel_h5_read_workers`` (default ``min(cpu_count, 16)``).
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ dependencies = [
"array-api-compat>=1.7.1",
"legacy-api-wrap",
"zarr >=3.1",
"numcodecs>=0.14",
"typing-extensions; python_version<'3.13'",
"scverse-misc[settings]>=0.0.7",
]
Expand Down Expand Up @@ -105,6 +106,7 @@ test-min = [
"awkward>=2.6.3",
"pyarrow",
"pooch",
"hdf5plugin", # registers HDF5 zstd/blosc filters for parallel-read tests
"anndata[dask]",
]
test-array-api = [
Expand Down
22 changes: 20 additions & 2 deletions src/anndata/_core/sparse_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,24 @@ def is_sparse_indexing_overridden(
)


def _read_1d_full(arr: h5py.Dataset | zarr.Array) -> np.ndarray:
"""Materialise a 1-D backed array into memory.

For ``h5py.Dataset`` we attempt the parallel chunk-decompression path
(see :mod:`anndata._io.h5_parallel`); it silently returns ``None`` when
the dataset is ineligible (unsupported codec, too small, unchunked,
multi-dimensional, ...), in which case — and for zarr arrays — we fall
through to the existing serial read. Bit-identical to ``arr[...]``.
"""
if isinstance(arr, h5py.Dataset):
from .._io.h5_parallel import parallel_read_full

out = parallel_read_full(arr)
if out is not None:
return out
return arr[...]


class BaseCompressedSparseDataset[GroupT: _GroupStorageType, ArrayT: _ArrayStorageType](
abc._AbstractCSDataset, ABC
):
Expand Down Expand Up @@ -580,8 +598,8 @@ def to_memory(self) -> SparseMatrixType:
shape=self.shape,
)
mtx = backed_class.memory_format(self.shape, dtype=self.dtype)
mtx.data = self._data[...]
mtx.indices = self._indices[...]
mtx.data = _read_1d_full(self._data)
mtx.indices = _read_1d_full(self._indices)
mtx.indptr = self._indptr
return mtx

Expand Down
54 changes: 54 additions & 0 deletions src/anndata/_io/_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Generic, process-global thread-pool management for IO-side parallelism."""

from __future__ import annotations

import threading
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from collections.abc import Callable, Sequence
from concurrent.futures import Future
from typing import TypeVar

R = TypeVar("R")


class PoolManager:
def __init__(self, *, thread_name_prefix: str) -> None:
self._lock = threading.Lock()
self._pool: ThreadPoolExecutor | None = None
self._n_workers: int = 0
self._thread_name_prefix = thread_name_prefix

def distribute_tasks_to_threads(
self,
n_workers: int,
fn: Callable[..., R],
tasks: Sequence[tuple],
) -> list[Future[R]]:
"""Submit one ``fn(*t)`` call per ``t`` of ``tasks`` to a
pool of ``n_workers`` threads, returning the resulting Futures in
input order.
"""
with self._lock:
self._ensure_pool_locked(n_workers)
assert self._pool is not None # for type checkers
return [self._pool.submit(fn, *task) for task in tasks]

def reset(self) -> None:
with self._lock:
if self._pool is not None:
self._pool.shutdown(wait=True)
self._pool = None
self._n_workers = 0

def _ensure_pool_locked(self, n_workers: int) -> None:
if self._pool is not None and self._n_workers == n_workers:
return
if self._pool is not None:
self._pool.shutdown(wait=False, cancel_futures=False)
self._pool = ThreadPoolExecutor(
max_workers=n_workers, thread_name_prefix=self._thread_name_prefix
)
self._n_workers = n_workers
Loading
Loading