From 67e2b37a1c80844ea8ce84769f628f5edc1700ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Piotr=20C=C5=82apa?= Date: Fri, 6 Feb 2026 13:22:09 +0000 Subject: [PATCH 1/2] WSSample: always validate that keys match across subdirs --- wsds/ws_sample.py | 42 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/wsds/ws_sample.py b/wsds/ws_sample.py index 5d76f8b..70eb2cf 100644 --- a/wsds/ws_sample.py +++ b/wsds/ws_sample.py @@ -7,12 +7,15 @@ from .ws_dataset import WSDataset -@dataclass(frozen=True, slots=True) +@dataclass(frozen=True) class WSSample: dataset: "WSDataset" shard_name: str offset: int overrides: dict = field(default_factory=dict) + # Key verification state (mutable containers to work with frozen dataclass) + _verified_subdirs: set = field(default_factory=set, repr=False, compare=False) + _reference_key: list = field(default_factory=list, repr=False, compare=False) def get_audio(self, audio_columns=None): candidates = audio_columns or self.dataset._audio_file_keys @@ -33,9 +36,46 @@ def items(self): def values(self): yield from (v for _, v in self.items()) + def _verify_key_for_field(self, field: str): + """Verify __key__ in this field's subdir matches the reference key.""" + value = self.dataset.fields.get(field) + if value is None: + return + (subdir, _column) = value[0] + + if subdir in self._verified_subdirs: + return + + # Skip computed columns (they don't have their own __key__) + if subdir in self.dataset.computed_columns: + self._verified_subdirs.add(subdir) + return + + # Get __key__ from this subdir + try: + key = self.dataset.get_shard(subdir, self.shard_name).get_sample("__key__", self.offset) + except (WSShardMissingError, KeyError): + # Can't verify if shard or key is missing + self._verified_subdirs.add(subdir) + return + + if not self._reference_key: + # First subdir accessed - store as reference + self._reference_key.append((subdir, key)) + else: + ref_subdir, ref_key = self._reference_key[0] + if key != ref_key: + raise ValueError( + f"Key mismatch at offset {self.offset} in shard {self.shard_name}: " + f"{ref_subdir} has '{ref_key}' but {subdir} has '{key}'" + ) + + self._verified_subdirs.add(subdir) + def __getitem__(self, field): if field in self.overrides: return self.overrides[field] + self._verify_key_for_field(field) return self.dataset.get_sample(self.shard_name, field, self.offset) def __setitem__(self, field, value): From da238e8eab297332935e162f6df38ad055eb1e8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Piotr=20C=C5=82apa?= Date: Fri, 13 Mar 2026 18:50:35 +0100 Subject: [PATCH 2/2] WSSample: print fields with missing shards last (#37) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * WSSample: print fields with missing shards last * WSAudio: disable slots because it breaks code auto-reload (#38) * WSAudio: disable slots because it breaks code auto-reload * Added WSModalShard (#41) * Extract audio codec layer from ws_audio.py into audio_codec.py Separates codec concerns (decoder backends, encoder, format utils) from the data model layer (AudioReader, WSAudio) for better reusability and testability. Co-Authored-By: Claude Opus 4.6 * Add ModalFileReader for Modal Volume range requests Co-Authored-By: Claude Opus 4.6 * Centralize binary column decoding into ws_decode module Extract duplicated npy/pyd/txt/audio decode logic from WSShard and WSS3Shard into a shared decode_sample() function. Dispatch is now based on column type (binary) rather than column-name heuristics. Co-Authored-By: Claude Opus 4.6 * Added WSModalShard * Big renaming and cleanups (#45) * Move index SQL queries from WSDataset into WSIndex - Add old/new index format detection (partition vs dataset_path columns) - Add _partition_col property for unified SQL partition expression - Add lookup_by_index() and lookup_by_key() methods to WSIndex - Add shard_n_samples() and shard_global_offset() via _query_shard() helper - Simplify WSDataset.__getitem__ to delegate to WSIndex lookups - Replace all raw index.query() calls in WSDataset with WSIndex methods - Update ws_tools.py to use new format detection * Fix env var name in README: WSDS_DATASET_PATH → WSDS_DATASET_SEARCH_PATH Co-Authored-By: Claude Opus 4.6 * Move is_notebook() to utils, guard _ipython_display_ for terminal use - Extract is_notebook() from convplayer.py into utils.py (simplified) - Remove redundant _ipython_display_ from AudioReader and WSAudio (IPython already calls _repr_html_ automatically) - Add is_notebook() guard to WSDataset and WSSample _ipython_display_ so they fall back to print() in terminal IPython sessions Co-Authored-By: Claude Opus 4.6 * Improve naming consistency across codebase - subdir → column_dir (utils, ws_sample, ws_modal_shard) - shard_name → shard_ref on shard interfaces and WSSample - dataset_path → partition in index and shard code - dataset_dir → dataset_root in ws_indexer Co-Authored-By: Claude Opus 4.6 * Add get_audio() helper in ws_decode, use in WSSample Centralizes audio column lookup logic so it can be reused outside of WSSample (e.g. from plain dicts or other sample types). Co-Authored-By: Claude Opus 4.6 * Add rng parameter to WSDataset for reproducible sampling Allows passing rng=42 (or a Random instance) to get deterministic sample ordering in random_sample() and sql_select(). Also removes unused needs_key variable. Co-Authored-By: Claude Opus 4.6 * Fix hume_wsds module path remapping and sql_filter pl.first() usage - Remap hume_wsds.* loader paths to wsds.* for backward compatibility with old index files that reference the former package name - Use pl.first() instead of exprs[0] in sql_filter for correctness Co-Authored-By: Claude Opus 4.6 * Update module docstring with rich examples and fix doctests - Add comprehensive docstring to __init__.py with working doctests showcasing SQL queries, random access, lazy loading, and audio - Fix ws_dataset.py doctests (AudioReader src type, add shard_subsample=1) - Fix ws_sink.py doctest (remove invalid batch_size param) - Update tests.py to run wsds module doctests and fix imports Co-Authored-By: Claude Opus 4.6 * Apply ruff formatting Co-Authored-By: Claude Opus 4.6 * Moved the library showcase to README.md * Use shard ref --------- Co-authored-by: Claude Opus 4.6 Co-authored-by: Shahbaz Mogal --------- Co-authored-by: Claude Opus 4.6 Co-authored-by: Shahbaz Mogal --------- Co-authored-by: Claude Opus 4.6 Co-authored-by: Shahbaz Mogal --------- Co-authored-by: Claude Opus 4.6 Co-authored-by: Shahbaz Mogal --- README.md | 82 +++++++++-- tests.py | 12 +- wsds/__init__.py | 8 +- wsds/audio_codec.py | 251 +++++++++++++++++++++++++++++++ wsds/convplayer.py | 121 ++++++++------- wsds/pupyarrow/file_reader.py | 116 +++++++++++++++ wsds/utils.py | 56 ++++--- wsds/ws_audio.py | 171 +++++---------------- wsds/ws_dataset.py | 270 +++++++++++++++++----------------- wsds/ws_decode.py | 56 +++++++ wsds/ws_feather_index.py | 39 +++-- wsds/ws_index.py | 104 +++++++++++-- wsds/ws_indexer.py | 176 +++++++++++----------- wsds/ws_modal_shard.py | 116 +++++++++++++++ wsds/ws_s3_shard.py | 38 ++--- wsds/ws_sample.py | 130 ++++++++-------- wsds/ws_shard.py | 53 +++---- wsds/ws_sink.py | 4 +- wsds/ws_tools.py | 50 ++++--- 19 files changed, 1215 insertions(+), 638 deletions(-) create mode 100644 wsds/audio_codec.py create mode 100644 wsds/ws_decode.py create mode 100644 wsds/ws_modal_shard.py diff --git a/README.md b/README.md index 8ed0c84..1895568 100644 --- a/README.md +++ b/README.md @@ -1,27 +1,77 @@ -# WSDS +## wsds — Web-Scale DataSets -wsds merges SQL querying capabilities with native support for multimodal data (speech and video) in a single data -format and a unified API. It uses shards for efficiency and to support very-scalable parallel data processing. +**wsds** is a multimodal dataset library that combines the power of SQL querying with native support for speech, audio, and video data. Built for large-scale machine learning workflows, it lets you work with massive datasets efficiently, regardless of where you store your data (SSDs, HDDs, Weka, S3). -wsds has a powerful database query engine integrated into it (built on top of Polars). This makes database-style -operations like duplicate detection, group by operations and aggregations very fast and easy to write. -This tight integration let's you run both SQL queries and efficient dataloaders directly on your data without any -conversion or importing. +```pycon +>>> from wsds import WSDataset +>>> dataset = WSDataset("librilight/v3-vad_ws") +>>> print(str(dataset)) +WSDataset('librilight/v3-vad_ws', segmented=True) + Audio duration: 52.69 k hours + Speech duration: 47.44 k hours + Number of shards: 623 + Number of samples: 22 662 659 + -## Getting Started +``` + +### Quick start ```bash -# create environment -conda create -n wsds python=3.10 -conda activate wsds +pip install git+https://github.com/HumeAI/wsds.git +``` + +- **SQL Queries on Sharded Data** — Filter and select across your entire dataset using familiar SQL syntax, powered by Polars. Only the columns and shards you need are loaded. + +```pycon +>>> dataset.sql_select('`transcription_wslang_raw.txt`', 'snr', 'tend - tstart as duration') +INFO: to speed things up wsds is loading a random 24.08% subset of the shards, pass shard_subsample=1 to force it to load the whole dataset +shape: (5_271_939, 3) +┌─────────────────────────────────┬──────────┬───────────┐ +│ transcription_wslang_raw.txt ┆ snr ┆ duration │ +│ --- ┆ --- ┆ --- │ +│ str ┆ f16 ┆ f32 │ +╞═════════════════════════════════╪══════════╪═══════════╡ +│ This is a liberal box recordi… ┆ 70.0625 ┆ 1.331058 │ +│ or liberty box recordings dur… ┆ 66.25 ┆ 1.962457 │ +│ For more information or to vo… ┆ 65.6875 ┆ 3.276451 │ +│ The Elder Eddas of Semen-Sekh… ┆ 51.09375 ┆ 4.863482 │ +│ Translated by Erasmus B. Ande… ┆ 70.1875 ┆ 1.843002 │ +│ … ┆ … ┆ … │ +│ I stared about me. ┆ 66.1875 ┆ 1.433472 │ +│ and then pointing to the huge… ┆ 64.75 ┆ 3.703003 │ +│ It was there. Where it is now… ┆ 73.75 ┆ 3.651855 │ +│ He shrugged his shoulders, to… ┆ 65.0 ┆ 9.4198 │ +│ the first chance, and he made… ┆ 62.0 ┆ 11.501709 │ +└─────────────────────────────────┴──────────┴───────────┘ + +``` + +- **Random Access & Indexing** — Optional SQLite-based indexing enables fast random access by key or integer index across shards. + +```pycon +>>> x = dataset['large/1259/lettersofjaneausten_etk_librivox_64kb_mp3/lettersofjaneausten_22_austen_64kb_032'] -# install hume_wsds -pip install https://github.com/HumeAI/wsds.git ``` -## Tests +- **Lazy, On-Demand Loading** — Samples are dict-like objects that load fields only when accessed, keeping memory usage minimal even for terabyte-scale datasets. + +```pycon +>>> x['transcription_wslang_raw.txt'], x['dbu'] +(' The Sherers, I believe, are now really going to go. Joseph has had a bed here the last two nights, and I do not know whether this is not the day of moving. Mrs. Sherer called yesterday to take leave. The weather looks worse again.', -26.34375) -To run tests you currently need a copy of the `librilight` dataset. The tests can be run with: ``` -WSDS_DATASET_PATH=/path/to/the/librilight/folder python tests.py + +- **Native Audio & Multimodal Support** — First-class handling of speech and audio data, including segmented datasets with voice activity detection and computed columns that reference source audio. + +```pycon +>>> x['audio'] +WSAudio(audio_reader=AudioReader(src=, sample_rate=None), tstart=614.46246, tend=627.3976) + ``` + +- **Sharded Architecture** — Data is stored in `.wsds` files (PyArrow IPC format) organized by column type into subdirectories, enabling efficient columnar access patterns. + +- **Atomic Writes** — The `WSSink` context manager provides safe, batched, compressed writes with atomic commit semantics. + +- **Flexible Data Linking** — Computed columns and `.wsds-link` files let you compose datasets without duplicating data, referencing columns across dataset boundaries. diff --git a/tests.py b/tests.py index c8cf3d2..39a9acb 100644 --- a/tests.py +++ b/tests.py @@ -1,12 +1,18 @@ -import unittest import doctest -from . import ws_dataset, ws_shard, ws_sink +import unittest + +import wsds +from wsds import ws_dataset, ws_shard, ws_sink + def load_tests(loader, tests, ignore): + tests.addTests(doctest.DocTestSuite(wsds)) tests.addTests(doctest.DocTestSuite(ws_dataset)) tests.addTests(doctest.DocTestSuite(ws_shard)) tests.addTests(doctest.DocTestSuite(ws_sink)) + tests.addTests(doctest.DocFileSuite("README.md")) return tests -if __name__ == '__main__': + +if __name__ == "__main__": unittest.main() diff --git a/wsds/__init__.py b/wsds/__init__.py index fc9a1a0..b959670 100644 --- a/wsds/__init__.py +++ b/wsds/__init__.py @@ -1,11 +1,7 @@ """ -# wsds dataset library -Usage example: ->>> from wsds import WSDataset ->>> dataset = WSDataset("librilight/v3-vad_ws") ->>> for sample in dataset.random_samples(5): ->>> print(sample['__key__'], sample['txt']) +.. include:: ../README.md +.. include:: ../docs/dataset-structure.md """ diff --git a/wsds/audio_codec.py b/wsds/audio_codec.py new file mode 100644 index 0000000..2c64c24 --- /dev/null +++ b/wsds/audio_codec.py @@ -0,0 +1,251 @@ +"""Audio codec layer: encoding, decoding, and format utilities. + +This module contains all audio encoding/decoding logic, separated from the +data model layer in ws_audio.py. It provides: +- Decoder backends (TorchFFmpegAudioDecoder, CompatAudioDecoder) +- A factory for creating decoders with automatic backend selection +- MP3 encoding with multi-backend fallback +- HTML audio rendering utility +""" + +from __future__ import annotations + +import io +import typing + +import pyarrow as pa + + +def to_filelike(src: typing.Any) -> typing.BinaryIO: + """Coerces files, byte-strings and PyArrow binary buffers into file-like objects.""" + if hasattr(src, "read"): # an open file + return src + # if not an open file then we assume some kind of binary data in memory + if hasattr(src, "as_buffer"): # PyArrow binary data + return pa.BufferReader(src.as_buffer()) + return io.BytesIO(src) + + +class TorchFFmpegAudioDecoder: + def __init__(self, src, sample_rate): + from torchffmpeg import MediaDecoder + + if hasattr(src, "_optimal_read_size"): + buffer_size = src._optimal_read_size + else: + buffer_size = 128 * 1024 + self.src = src + self.reader = MediaDecoder(to_filelike(self.src), buffer_size=buffer_size) + self.metadata = self.reader.get_src_stream_info(self.reader.default_audio_stream) + + if sample_rate is None: + sample_rate = int(self.metadata.sample_rate) + + self.sample_rate = sample_rate + + self.reader.add_basic_audio_stream( + frames_per_chunk=int(32 * sample_rate), + sample_rate=sample_rate, + decoder_option={"threads": "4", "thread_type": "frame"}, + ) + + def get_samples_played_in_range(self, tstart=0, tend=None): + import torch + + self.reader.seek(max(0, tstart - 1), "key") + + if tend is None: + chunks = [] + more_data = True + while more_data: + if self.reader.fill_buffer() == 1: + more_data = False + (chunk,) = self.reader.pop_chunks() + if chunk is not None: + chunks.append(chunk) + prefix = int((tstart - chunks[0].pts) * self.sample_rate) + if prefix < 0: + prefix = 0 + return torch.cat(chunks)[prefix:].mT + + self.reader.fill_buffer() + (chunk,) = self.reader.pop_chunks() + prefix = int((tstart - chunk.pts) * self.sample_rate) + if prefix < 0: + prefix = 0 + if tend: + samples = chunk[prefix : prefix + int((tend - tstart) * self.sample_rate)].mT + else: + samples = chunk[prefix:].mT + while chunk is not None: + (chunk,) = self.reader.pop_chunks() + return samples + + +class CompatAudioDecoder: + def __init__(self, src, sample_rate): + import torchaudio + + if not hasattr(torchaudio, "io"): + raise ImportError("You need either torchaudio<2.9 or torchcodec installed") + self.src = src + if hasattr(src, "_optimal_read_size"): + buffer_size = src._optimal_read_size + else: + buffer_size = 128 * 1024 + self.reader = torchaudio.io.StreamReader(src=to_filelike(self.src), buffer_size=buffer_size) + self.metadata = self.reader.get_src_stream_info(0) + + if sample_rate is None: + sample_rate = self.metadata.sample_rate + + self.sample_rate = sample_rate + + # fetch 32 seconds because we likely need 30s at maximum but the seeking may be imprecise (and we seek 1s early) + # FIXME: check if we can get away with some better settings here (-1, maybe 10s + concatenate the chunks in a loop) + self.reader.add_basic_audio_stream( + frames_per_chunk=int(32 * sample_rate), + sample_rate=sample_rate, + decoder_option={"threads": "4", "thread_type": "frame"}, + ) + + def get_samples_played_in_range(self, tstart=0, tend=None): + # rought seek + self.reader.seek(max(0, tstart - 1), "key") + + if tend is None: + import torch + + chunks = [] + more_data = True + while more_data: + if self.reader.fill_buffer() == 1: + more_data = False + (chunk,) = self.reader.pop_chunks() + chunks.append(chunk) + prefix = int((tstart - chunks[0].pts) * self.sample_rate) + if prefix < 0: + prefix = 0 + return torch.cat(chunks)[prefix:].mT + + self.reader.fill_buffer() + (chunk,) = self.reader.pop_chunks() + # tight crop (seems accurate down to 1 sample in my tests) + prefix = int((tstart - chunk.pts) * self.sample_rate) + if prefix < 0: + prefix = 0 + if tend: + samples = chunk[prefix : prefix + int((tend - tstart) * self.sample_rate)].mT + else: + samples = chunk[prefix:].mT + # clear out any remaining data + while chunk is not None: + (chunk,) = self.reader.pop_chunks() + return samples + + +def create_decoder(src, sample_rate=None): + """Factory: tries torchffmpeg -> torchcodec -> torchaudio, returns a decoder instance. + + Args: + src: A file-like object or bytes-like source for audio data. + sample_rate: Optional target sample rate for resampling. + + Returns: + A decoder instance with .metadata, .sample_rate, and .get_samples_played_in_range() interface. + """ + try: + from torchffmpeg import MediaDecoder as _ # noqa: F401 + + AudioDecoder = TorchFFmpegAudioDecoder + except ImportError: + try: + from torchcodec.decoders import AudioDecoder + except ImportError: + AudioDecoder = CompatAudioDecoder + + return AudioDecoder(src, sample_rate=sample_rate) + + +def decode_segment(src, start=0, end=None, sample_rate=None): + """One-shot decode: creates decoder, reads segment, returns tensor with .sample_rate attr. + + Handles MP3 skip_samples compensation automatically. + + Args: + src: Audio source (file-like, bytes, or PyArrow buffer). + start: Start time in seconds. + end: End time in seconds (None for rest of file). + sample_rate: Optional target sample rate. + + Returns: + A torch.Tensor with a .sample_rate attribute. + """ + filelike = to_filelike(src) + decoder = create_decoder(filelike, sample_rate) + + skip_samples = 0 + if decoder.metadata.codec == "mp3": + skip_samples = 1105 + + if sample_rate is None: + sample_rate = decoder.metadata.sample_rate + + seek_adjustment = skip_samples / sample_rate if start > 0 else 0 + samples = decoder.get_samples_played_in_range( + start + seek_adjustment, end + seek_adjustment if end is not None else None + ) + if hasattr(samples, "data"): + samples = samples.data + samples.sample_rate = sample_rate + return samples + + +def encode_mp3(samples) -> bytes: + """Encode a torch tensor to MP3 bytes. + + Tries torchffmpeg -> torchcodec -> torchaudio as encoder backends. + + Args: + samples: A torch.Tensor with a .sample_rate attribute. Shape: (channels, frames). + + Returns: + MP3-encoded bytes. + """ + out = io.BytesIO() + try: + from torchffmpeg import MediaEncoder + + sample_rate = int(samples.sample_rate) + # samples is (channels, frames), write_audio_chunk expects (frames, channels) + waveform = samples.mT.float().contiguous() + enc = MediaEncoder(out, "mp3") + enc.add_audio_stream(sample_rate=sample_rate, num_channels=waveform.size(1), format="flt") + with enc.open(): + enc.write_audio_chunk(0, waveform) + except ImportError: + try: + from torchcodec.encoders import AudioEncoder + + AudioEncoder(samples, sample_rate=int(samples.sample_rate)).to_file_like(out, "mp3") + except ImportError: + import torchaudio + + torchaudio.save(out, samples, int(samples.sample_rate), format="mp3") + + return out.getvalue() + + +def audio_to_html(samples) -> str: + """Encode samples to an HTML