diff --git a/README.md b/README.md index 8ed0c84..1895568 100644 --- a/README.md +++ b/README.md @@ -1,27 +1,77 @@ -# WSDS +## wsds — Web-Scale DataSets -wsds merges SQL querying capabilities with native support for multimodal data (speech and video) in a single data -format and a unified API. It uses shards for efficiency and to support very-scalable parallel data processing. +**wsds** is a multimodal dataset library that combines the power of SQL querying with native support for speech, audio, and video data. Built for large-scale machine learning workflows, it lets you work with massive datasets efficiently, regardless of where you store your data (SSDs, HDDs, Weka, S3). -wsds has a powerful database query engine integrated into it (built on top of Polars). This makes database-style -operations like duplicate detection, group by operations and aggregations very fast and easy to write. -This tight integration let's you run both SQL queries and efficient dataloaders directly on your data without any -conversion or importing. +```pycon +>>> from wsds import WSDataset +>>> dataset = WSDataset("librilight/v3-vad_ws") +>>> print(str(dataset)) +WSDataset('librilight/v3-vad_ws', segmented=True) + Audio duration: 52.69 k hours + Speech duration: 47.44 k hours + Number of shards: 623 + Number of samples: 22 662 659 + -## Getting Started +``` + +### Quick start ```bash -# create environment -conda create -n wsds python=3.10 -conda activate wsds +pip install git+https://github.com/HumeAI/wsds.git +``` + +- **SQL Queries on Sharded Data** — Filter and select across your entire dataset using familiar SQL syntax, powered by Polars. Only the columns and shards you need are loaded. 
+ +```pycon +>>> dataset.sql_select('`transcription_wslang_raw.txt`', 'snr', 'tend - tstart as duration') +INFO: to speed things up wsds is loading a random 24.08% subset of the shards, pass shard_subsample=1 to force it to load the whole dataset +shape: (5_271_939, 3) +┌─────────────────────────────────┬──────────┬───────────┐ +│ transcription_wslang_raw.txt ┆ snr ┆ duration │ +│ --- ┆ --- ┆ --- │ +│ str ┆ f16 ┆ f32 │ +╞═════════════════════════════════╪══════════╪═══════════╡ +│ This is a liberal box recordi… ┆ 70.0625 ┆ 1.331058 │ +│ or liberty box recordings dur… ┆ 66.25 ┆ 1.962457 │ +│ For more information or to vo… ┆ 65.6875 ┆ 3.276451 │ +│ The Elder Eddas of Semen-Sekh… ┆ 51.09375 ┆ 4.863482 │ +│ Translated by Erasmus B. Ande… ┆ 70.1875 ┆ 1.843002 │ +│ … ┆ … ┆ … │ +│ I stared about me. ┆ 66.1875 ┆ 1.433472 │ +│ and then pointing to the huge… ┆ 64.75 ┆ 3.703003 │ +│ It was there. Where it is now… ┆ 73.75 ┆ 3.651855 │ +│ He shrugged his shoulders, to… ┆ 65.0 ┆ 9.4198 │ +│ the first chance, and he made… ┆ 62.0 ┆ 11.501709 │ +└─────────────────────────────────┴──────────┴───────────┘ + +``` + +- **Random Access & Indexing** — Optional SQLite-based indexing enables fast random access by key or integer index across shards. + +```pycon +>>> x = dataset['large/1259/lettersofjaneausten_etk_librivox_64kb_mp3/lettersofjaneausten_22_austen_64kb_032'] -# install hume_wsds -pip install https://github.com/HumeAI/wsds.git ``` -## Tests +- **Lazy, On-Demand Loading** — Samples are dict-like objects that load fields only when accessed, keeping memory usage minimal even for terabyte-scale datasets. + +```pycon +>>> x['transcription_wslang_raw.txt'], x['dbu'] +(' The Sherers, I believe, are now really going to go. Joseph has had a bed here the last two nights, and I do not know whether this is not the day of moving. Mrs. Sherer called yesterday to take leave. The weather looks worse again.', -26.34375) -To run tests you currently need a copy of the `librilight` dataset. 
The tests can be run with: ``` -WSDS_DATASET_PATH=/path/to/the/librilight/folder python tests.py + +- **Native Audio & Multimodal Support** — First-class handling of speech and audio data, including segmented datasets with voice activity detection and computed columns that reference source audio. + +```pycon +>>> x['audio'] +WSAudio(audio_reader=AudioReader(src=, sample_rate=None), tstart=614.46246, tend=627.3976) + ``` + +- **Sharded Architecture** — Data is stored in `.wsds` files (PyArrow IPC format) organized by column type into subdirectories, enabling efficient columnar access patterns. + +- **Atomic Writes** — The `WSSink` context manager provides safe, batched, compressed writes with atomic commit semantics. + +- **Flexible Data Linking** — Computed columns and `.wsds-link` files let you compose datasets without duplicating data, referencing columns across dataset boundaries. diff --git a/tests.py b/tests.py index c8cf3d2..39a9acb 100644 --- a/tests.py +++ b/tests.py @@ -1,12 +1,18 @@ -import unittest import doctest -from . import ws_dataset, ws_shard, ws_sink +import unittest + +import wsds +from wsds import ws_dataset, ws_shard, ws_sink + def load_tests(loader, tests, ignore): + tests.addTests(doctest.DocTestSuite(wsds)) tests.addTests(doctest.DocTestSuite(ws_dataset)) tests.addTests(doctest.DocTestSuite(ws_shard)) tests.addTests(doctest.DocTestSuite(ws_sink)) + tests.addTests(doctest.DocFileSuite("README.md")) return tests -if __name__ == '__main__': + +if __name__ == "__main__": unittest.main() diff --git a/wsds/__init__.py b/wsds/__init__.py index fc9a1a0..b959670 100644 --- a/wsds/__init__.py +++ b/wsds/__init__.py @@ -1,11 +1,7 @@ """ -# wsds dataset library -Usage example: ->>> from wsds import WSDataset ->>> dataset = WSDataset("librilight/v3-vad_ws") ->>> for sample in dataset.random_samples(5): ->>> print(sample['__key__'], sample['txt']) +.. include:: ../README.md +.. 
include:: ../docs/dataset-structure.md """ diff --git a/wsds/audio_codec.py b/wsds/audio_codec.py new file mode 100644 index 0000000..2c64c24 --- /dev/null +++ b/wsds/audio_codec.py @@ -0,0 +1,251 @@ +"""Audio codec layer: encoding, decoding, and format utilities. + +This module contains all audio encoding/decoding logic, separated from the +data model layer in ws_audio.py. It provides: +- Decoder backends (TorchFFmpegAudioDecoder, CompatAudioDecoder) +- A factory for creating decoders with automatic backend selection +- MP3 encoding with multi-backend fallback +- HTML audio rendering utility +""" + +from __future__ import annotations + +import io +import typing + +import pyarrow as pa + + +def to_filelike(src: typing.Any) -> typing.BinaryIO: + """Coerces files, byte-strings and PyArrow binary buffers into file-like objects.""" + if hasattr(src, "read"): # an open file + return src + # if not an open file then we assume some kind of binary data in memory + if hasattr(src, "as_buffer"): # PyArrow binary data + return pa.BufferReader(src.as_buffer()) + return io.BytesIO(src) + + +class TorchFFmpegAudioDecoder: + def __init__(self, src, sample_rate): + from torchffmpeg import MediaDecoder + + if hasattr(src, "_optimal_read_size"): + buffer_size = src._optimal_read_size + else: + buffer_size = 128 * 1024 + self.src = src + self.reader = MediaDecoder(to_filelike(self.src), buffer_size=buffer_size) + self.metadata = self.reader.get_src_stream_info(self.reader.default_audio_stream) + + if sample_rate is None: + sample_rate = int(self.metadata.sample_rate) + + self.sample_rate = sample_rate + + self.reader.add_basic_audio_stream( + frames_per_chunk=int(32 * sample_rate), + sample_rate=sample_rate, + decoder_option={"threads": "4", "thread_type": "frame"}, + ) + + def get_samples_played_in_range(self, tstart=0, tend=None): + import torch + + self.reader.seek(max(0, tstart - 1), "key") + + if tend is None: + chunks = [] + more_data = True + while more_data: + if 
self.reader.fill_buffer() == 1: + more_data = False + (chunk,) = self.reader.pop_chunks() + if chunk is not None: + chunks.append(chunk) + prefix = int((tstart - chunks[0].pts) * self.sample_rate) + if prefix < 0: + prefix = 0 + return torch.cat(chunks)[prefix:].mT + + self.reader.fill_buffer() + (chunk,) = self.reader.pop_chunks() + prefix = int((tstart - chunk.pts) * self.sample_rate) + if prefix < 0: + prefix = 0 + if tend: + samples = chunk[prefix : prefix + int((tend - tstart) * self.sample_rate)].mT + else: + samples = chunk[prefix:].mT + while chunk is not None: + (chunk,) = self.reader.pop_chunks() + return samples + + +class CompatAudioDecoder: + def __init__(self, src, sample_rate): + import torchaudio + + if not hasattr(torchaudio, "io"): + raise ImportError("You need either torchaudio<2.9 or torchcodec installed") + self.src = src + if hasattr(src, "_optimal_read_size"): + buffer_size = src._optimal_read_size + else: + buffer_size = 128 * 1024 + self.reader = torchaudio.io.StreamReader(src=to_filelike(self.src), buffer_size=buffer_size) + self.metadata = self.reader.get_src_stream_info(0) + + if sample_rate is None: + sample_rate = self.metadata.sample_rate + + self.sample_rate = sample_rate + + # fetch 32 seconds because we likely need 30s at maximum but the seeking may be imprecise (and we seek 1s early) + # FIXME: check if we can get away with some better settings here (-1, maybe 10s + concatenate the chunks in a loop) + self.reader.add_basic_audio_stream( + frames_per_chunk=int(32 * sample_rate), + sample_rate=sample_rate, + decoder_option={"threads": "4", "thread_type": "frame"}, + ) + + def get_samples_played_in_range(self, tstart=0, tend=None): + # rough seek + self.reader.seek(max(0, tstart - 1), "key") + + if tend is None: + import torch + + chunks = [] + more_data = True + while more_data: + if self.reader.fill_buffer() == 1: + more_data = False + (chunk,) = self.reader.pop_chunks() + chunks.append(chunk) + prefix = int((tstart - 
chunks[0].pts) * self.sample_rate) + if prefix < 0: + prefix = 0 + return torch.cat(chunks)[prefix:].mT + + self.reader.fill_buffer() + (chunk,) = self.reader.pop_chunks() + # tight crop (seems accurate down to 1 sample in my tests) + prefix = int((tstart - chunk.pts) * self.sample_rate) + if prefix < 0: + prefix = 0 + if tend: + samples = chunk[prefix : prefix + int((tend - tstart) * self.sample_rate)].mT + else: + samples = chunk[prefix:].mT + # clear out any remaining data + while chunk is not None: + (chunk,) = self.reader.pop_chunks() + return samples + + +def create_decoder(src, sample_rate=None): + """Factory: tries torchffmpeg -> torchcodec -> torchaudio, returns a decoder instance. + + Args: + src: A file-like object or bytes-like source for audio data. + sample_rate: Optional target sample rate for resampling. + + Returns: + A decoder instance with .metadata, .sample_rate, and .get_samples_played_in_range() interface. + """ + try: + from torchffmpeg import MediaDecoder as _ # noqa: F401 + + AudioDecoder = TorchFFmpegAudioDecoder + except ImportError: + try: + from torchcodec.decoders import AudioDecoder + except ImportError: + AudioDecoder = CompatAudioDecoder + + return AudioDecoder(src, sample_rate=sample_rate) + + +def decode_segment(src, start=0, end=None, sample_rate=None): + """One-shot decode: creates decoder, reads segment, returns tensor with .sample_rate attr. + + Handles MP3 skip_samples compensation automatically. + + Args: + src: Audio source (file-like, bytes, or PyArrow buffer). + start: Start time in seconds. + end: End time in seconds (None for rest of file). + sample_rate: Optional target sample rate. + + Returns: + A torch.Tensor with a .sample_rate attribute. 
+ """ + filelike = to_filelike(src) + decoder = create_decoder(filelike, sample_rate) + + skip_samples = 0 + if decoder.metadata.codec == "mp3": + skip_samples = 1105 + + if sample_rate is None: + sample_rate = decoder.metadata.sample_rate + + seek_adjustment = skip_samples / sample_rate if start > 0 else 0 + samples = decoder.get_samples_played_in_range( + start + seek_adjustment, end + seek_adjustment if end is not None else None + ) + if hasattr(samples, "data"): + samples = samples.data + samples.sample_rate = sample_rate + return samples + + +def encode_mp3(samples) -> bytes: + """Encode a torch tensor to MP3 bytes. + + Tries torchffmpeg -> torchcodec -> torchaudio as encoder backends. + + Args: + samples: A torch.Tensor with a .sample_rate attribute. Shape: (channels, frames). + + Returns: + MP3-encoded bytes. + """ + out = io.BytesIO() + try: + from torchffmpeg import MediaEncoder + + sample_rate = int(samples.sample_rate) + # samples is (channels, frames), write_audio_chunk expects (frames, channels) + waveform = samples.mT.float().contiguous() + enc = MediaEncoder(out, "mp3") + enc.add_audio_stream(sample_rate=sample_rate, num_channels=waveform.size(1), format="flt") + with enc.open(): + enc.write_audio_chunk(0, waveform) + except ImportError: + try: + from torchcodec.encoders import AudioEncoder + + AudioEncoder(samples, sample_rate=int(samples.sample_rate)).to_file_like(out, "mp3") + except ImportError: + import torchaudio + + torchaudio.save(out, samples, int(samples.sample_rate), format="mp3") + + return out.getvalue() + + +def audio_to_html(samples) -> str: + """Encode samples to an HTML