Merged
82 changes: 66 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,27 +1,77 @@
## wsds — Web-Scale DataSets

**wsds** is a multimodal dataset library that combines the power of SQL querying with native support for speech, audio, and video data. Built for large-scale machine learning workflows, it lets you work with massive datasets efficiently, regardless of where you store your data (SSDs, HDDs, Weka, S3).

wsds has a powerful database query engine built in (on top of Polars), which makes database-style operations like duplicate detection, group-bys, and aggregations fast and easy to write. This tight integration lets you run both SQL queries and efficient dataloaders directly on your data without any conversion or importing.
```pycon
>>> from wsds import WSDataset
>>> dataset = WSDataset("librilight/v3-vad_ws")
>>> print(str(dataset))
WSDataset('librilight/v3-vad_ws', segmented=True)
Audio duration: 52.69 k hours
Speech duration: 47.44 k hours
Number of shards: 623
Number of samples: 22 662 659
<BLANKLINE>

```

## Getting Started

### Quick start

```bash
# create environment
conda create -n wsds python=3.10
conda activate wsds
pip install git+https://github.com/HumeAI/wsds.git
```

- **SQL Queries on Sharded Data** — Filter and select across your entire dataset using familiar SQL syntax, powered by Polars. Only the columns and shards you need are loaded.

```pycon
>>> dataset.sql_select('`transcription_wslang_raw.txt`', 'snr', 'tend - tstart as duration')
INFO: to speed things up wsds is loading a random 24.08% subset of the shards, pass shard_subsample=1 to force it to load the whole dataset
shape: (5_271_939, 3)
┌─────────────────────────────────┬──────────┬───────────┐
│ transcription_wslang_raw.txt ┆ snr ┆ duration │
│ --- ┆ --- ┆ --- │
│ str ┆ f16 ┆ f32 │
╞═════════════════════════════════╪══════════╪═══════════╡
│ This is a liberal box recordi… ┆ 70.0625 ┆ 1.331058 │
│ or liberty box recordings dur… ┆ 66.25 ┆ 1.962457 │
│ For more information or to vo… ┆ 65.6875 ┆ 3.276451 │
│ The Elder Eddas of Semen-Sekh… ┆ 51.09375 ┆ 4.863482 │
│ Translated by Erasmus B. Ande… ┆ 70.1875 ┆ 1.843002 │
│ … ┆ … ┆ … │
│ I stared about me. ┆ 66.1875 ┆ 1.433472 │
│ and then pointing to the huge… ┆ 64.75 ┆ 3.703003 │
│ It was there. Where it is now… ┆ 73.75 ┆ 3.651855 │
│ He shrugged his shoulders, to… ┆ 65.0 ┆ 9.4198 │
│ the first chance, and he made… ┆ 62.0 ┆ 11.501709 │
└─────────────────────────────────┴──────────┴───────────┘

```
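The duplicate detection and aggregation the Polars-backed engine makes cheap have the same shape as ordinary SQL. A minimal standalone sketch using only stdlib `sqlite3` (illustrative table and column names, not the wsds engine or schema):

```python
import sqlite3

# Toy stand-in for a sharded dataset: keys plus transcriptions.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE samples (key TEXT, transcription TEXT)")
con.executemany(
    "INSERT INTO samples VALUES (?, ?)",
    [("a/1", "hello world"), ("a/2", "goodbye"), ("b/7", "hello world")],
)

# Duplicate detection: group identical transcriptions, keep repeated ones.
dupes = con.execute(
    """
    SELECT transcription, COUNT(*) AS n
    FROM samples
    GROUP BY transcription
    HAVING n > 1
    """
).fetchall()
print(dupes)  # [('hello world', 2)]
```

In wsds the same kind of query runs directly over the shards, loading only the columns it touches.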

- **Random Access & Indexing** — Optional SQLite-based indexing enables fast random access by key or integer index across shards.

```pycon
>>> x = dataset['large/1259/lettersofjaneausten_etk_librivox_64kb_mp3/lettersofjaneausten_22_austen_64kb_032']

```

- **Lazy, On-Demand Loading** — Samples are dict-like objects that load fields only when accessed, keeping memory usage minimal even for terabyte-scale datasets.

```pycon
>>> x['transcription_wslang_raw.txt'], x['dbu']
(' The Sherers, I believe, are now really going to go. Joseph has had a bed here the last two nights, and I do not know whether this is not the day of moving. Mrs. Sherer called yesterday to take leave. The weather looks worse again.', -26.34375)

```
- **Native Audio & Multimodal Support** — First-class handling of speech and audio data, including segmented datasets with voice activity detection and computed columns that reference source audio.

```pycon
>>> x['audio']
WSAudio(audio_reader=AudioReader(src=<class '_io.BytesIO'>, sample_rate=None), tstart=614.46246, tend=627.3976)

```

- **Sharded Architecture** — Data is stored in `.wsds` files (PyArrow IPC format) organized by column type into subdirectories, enabling efficient columnar access patterns.

- **Atomic Writes** — The `WSSink` context manager provides safe, batched, compressed writes with atomic commit semantics.

- **Flexible Data Linking** — Computed columns and `.wsds-link` files let you compose datasets without duplicating data, referencing columns across dataset boundaries.
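The atomic commit semantics mentioned above correspond to the classic write-to-temp-then-rename pattern. A minimal stdlib sketch of that pattern (the `atomic_write` helper is hypothetical, not the `WSSink` API):

```python
import os
import tempfile


def atomic_write(path: str, data: bytes) -> None:
    """Write data to path so readers never observe a partially written file."""
    # Stage the bytes in a temporary file in the destination directory
    # (same filesystem), then swap it into place with a single rename.
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())  # ensure the bytes hit disk before the rename
        os.replace(tmp, path)  # atomic on both POSIX and Windows
    except BaseException:
        os.unlink(tmp)
        raise


target = os.path.join(tempfile.mkdtemp(), "shard-000000.wsds")
atomic_write(target, b"payload")
```

Because the rename is atomic, a crash mid-write leaves at most a stray temp file behind, never a truncated shard.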
12 changes: 9 additions & 3 deletions tests.py
@@ -1,12 +1,18 @@
import doctest
import unittest

import wsds
from wsds import ws_dataset, ws_shard, ws_sink


def load_tests(loader, tests, ignore):
    tests.addTests(doctest.DocTestSuite(wsds))
    tests.addTests(doctest.DocTestSuite(ws_dataset))
    tests.addTests(doctest.DocTestSuite(ws_shard))
    tests.addTests(doctest.DocTestSuite(ws_sink))
    tests.addTests(doctest.DocFileSuite("README.md"))
    return tests


if __name__ == "__main__":
    unittest.main()
8 changes: 2 additions & 6 deletions wsds/__init__.py
@@ -1,11 +1,7 @@
"""
# wsds dataset library

Usage example:
>>> from wsds import WSDataset
>>> dataset = WSDataset("librilight/v3-vad_ws")
>>> for sample in dataset.random_samples(5):
>>> print(sample['__key__'], sample['txt'])
.. include:: ../README.md
.. include:: ../docs/dataset-structure.md

"""

251 changes: 251 additions & 0 deletions wsds/audio_codec.py
@@ -0,0 +1,251 @@
"""Audio codec layer: encoding, decoding, and format utilities.

This module contains all audio encoding/decoding logic, separated from the
data model layer in ws_audio.py. It provides:
- Decoder backends (TorchFFmpegAudioDecoder, CompatAudioDecoder)
- A factory for creating decoders with automatic backend selection
- MP3 encoding with multi-backend fallback
- HTML audio rendering utility
"""

from __future__ import annotations

import io
import typing

import pyarrow as pa


def to_filelike(src: typing.Any) -> typing.BinaryIO:
    """Coerces files, byte-strings and PyArrow binary buffers into file-like objects."""
    if hasattr(src, "read"):  # an open file
        return src
    # if not an open file then we assume some kind of binary data in memory
    if hasattr(src, "as_buffer"):  # PyArrow binary data
        return pa.BufferReader(src.as_buffer())
    return io.BytesIO(src)


class TorchFFmpegAudioDecoder:
    def __init__(self, src, sample_rate):
        from torchffmpeg import MediaDecoder

        if hasattr(src, "_optimal_read_size"):
            buffer_size = src._optimal_read_size
        else:
            buffer_size = 128 * 1024
        self.src = src
        self.reader = MediaDecoder(to_filelike(self.src), buffer_size=buffer_size)
        self.metadata = self.reader.get_src_stream_info(self.reader.default_audio_stream)

        if sample_rate is None:
            sample_rate = int(self.metadata.sample_rate)

        self.sample_rate = sample_rate

        self.reader.add_basic_audio_stream(
            frames_per_chunk=int(32 * sample_rate),
            sample_rate=sample_rate,
            decoder_option={"threads": "4", "thread_type": "frame"},
        )

    def get_samples_played_in_range(self, tstart=0, tend=None):
        import torch

        self.reader.seek(max(0, tstart - 1), "key")

        if tend is None:
            chunks = []
            more_data = True
            while more_data:
                if self.reader.fill_buffer() == 1:
                    more_data = False
                (chunk,) = self.reader.pop_chunks()
                if chunk is not None:
                    chunks.append(chunk)
            prefix = int((tstart - chunks[0].pts) * self.sample_rate)
            if prefix < 0:
                prefix = 0
            return torch.cat(chunks)[prefix:].mT

        self.reader.fill_buffer()
        (chunk,) = self.reader.pop_chunks()
        prefix = int((tstart - chunk.pts) * self.sample_rate)
        if prefix < 0:
            prefix = 0
        if tend:
            samples = chunk[prefix : prefix + int((tend - tstart) * self.sample_rate)].mT
        else:
            samples = chunk[prefix:].mT
        while chunk is not None:
            (chunk,) = self.reader.pop_chunks()
        return samples


class CompatAudioDecoder:
    def __init__(self, src, sample_rate):
        import torchaudio

        if not hasattr(torchaudio, "io"):
            raise ImportError("You need either torchaudio<2.9 or torchcodec installed")
        self.src = src
        if hasattr(src, "_optimal_read_size"):
            buffer_size = src._optimal_read_size
        else:
            buffer_size = 128 * 1024
        self.reader = torchaudio.io.StreamReader(src=to_filelike(self.src), buffer_size=buffer_size)
        self.metadata = self.reader.get_src_stream_info(0)

        if sample_rate is None:
            sample_rate = self.metadata.sample_rate

        self.sample_rate = sample_rate

        # fetch 32 seconds because we likely need 30s at maximum but the seeking may be imprecise (and we seek 1s early)
        # FIXME: check if we can get away with some better settings here (-1, maybe 10s + concatenate the chunks in a loop)
        self.reader.add_basic_audio_stream(
            frames_per_chunk=int(32 * sample_rate),
            sample_rate=sample_rate,
            decoder_option={"threads": "4", "thread_type": "frame"},
        )

    def get_samples_played_in_range(self, tstart=0, tend=None):
        # rough seek to the nearest keyframe, one second early
        self.reader.seek(max(0, tstart - 1), "key")

        if tend is None:
            import torch

            chunks = []
            more_data = True
            while more_data:
                if self.reader.fill_buffer() == 1:
                    more_data = False
                (chunk,) = self.reader.pop_chunks()
                if chunk is not None:  # the final pop may yield no chunk
                    chunks.append(chunk)
            prefix = int((tstart - chunks[0].pts) * self.sample_rate)
            if prefix < 0:
                prefix = 0
            return torch.cat(chunks)[prefix:].mT

        self.reader.fill_buffer()
        (chunk,) = self.reader.pop_chunks()
        # tight crop (seems accurate down to 1 sample in my tests)
        prefix = int((tstart - chunk.pts) * self.sample_rate)
        if prefix < 0:
            prefix = 0
        if tend:
            samples = chunk[prefix : prefix + int((tend - tstart) * self.sample_rate)].mT
        else:
            samples = chunk[prefix:].mT
        # clear out any remaining data
        while chunk is not None:
            (chunk,) = self.reader.pop_chunks()
        return samples


def create_decoder(src, sample_rate=None):
    """Factory: tries torchffmpeg -> torchcodec -> torchaudio, returns a decoder instance.

    Args:
        src: A file-like object or bytes-like source for audio data.
        sample_rate: Optional target sample rate for resampling.

    Returns:
        A decoder instance with .metadata, .sample_rate, and .get_samples_played_in_range() interface.
    """
    try:
        from torchffmpeg import MediaDecoder as _  # noqa: F401

        AudioDecoder = TorchFFmpegAudioDecoder
    except ImportError:
        try:
            from torchcodec.decoders import AudioDecoder
        except ImportError:
            AudioDecoder = CompatAudioDecoder

    return AudioDecoder(src, sample_rate=sample_rate)


def decode_segment(src, start=0, end=None, sample_rate=None):
    """One-shot decode: creates decoder, reads segment, returns tensor with .sample_rate attr.

    Handles MP3 skip_samples compensation automatically.

    Args:
        src: Audio source (file-like, bytes, or PyArrow buffer).
        start: Start time in seconds.
        end: End time in seconds (None for rest of file).
        sample_rate: Optional target sample rate.

    Returns:
        A torch.Tensor with a .sample_rate attribute.
    """
    filelike = to_filelike(src)
    decoder = create_decoder(filelike, sample_rate)

    skip_samples = 0
    if decoder.metadata.codec == "mp3":
        skip_samples = 1105

    if sample_rate is None:
        sample_rate = decoder.metadata.sample_rate

    seek_adjustment = skip_samples / sample_rate if start > 0 else 0
    samples = decoder.get_samples_played_in_range(
        start + seek_adjustment, end + seek_adjustment if end is not None else None
    )
    if hasattr(samples, "data"):
        samples = samples.data
    samples.sample_rate = sample_rate
    return samples


def encode_mp3(samples) -> bytes:
    """Encode a torch tensor to MP3 bytes.

    Tries torchffmpeg -> torchcodec -> torchaudio as encoder backends.

    Args:
        samples: A torch.Tensor with a .sample_rate attribute. Shape: (channels, frames).

    Returns:
        MP3-encoded bytes.
    """
    out = io.BytesIO()
    try:
        from torchffmpeg import MediaEncoder

        sample_rate = int(samples.sample_rate)
        # samples is (channels, frames), write_audio_chunk expects (frames, channels)
        waveform = samples.mT.float().contiguous()
        enc = MediaEncoder(out, "mp3")
        enc.add_audio_stream(sample_rate=sample_rate, num_channels=waveform.size(1), format="flt")
        with enc.open():
            enc.write_audio_chunk(0, waveform)
    except ImportError:
        try:
            from torchcodec.encoders import AudioEncoder

            AudioEncoder(samples, sample_rate=int(samples.sample_rate)).to_file_like(out, "mp3")
        except ImportError:
            import torchaudio

            torchaudio.save(out, samples, int(samples.sample_rate), format="mp3")

    return out.getvalue()


def audio_to_html(samples) -> str:
    """Encode samples to an HTML <audio> tag with base64 MP3 data.

    Args:
        samples: A torch.Tensor with a .sample_rate attribute.

    Returns:
        An HTML string with an embedded audio player.
    """
    import base64

    mp3_data = base64.b64encode(encode_mp3(samples)).decode("ascii")
    return f'<audio controls src="data:audio/mp3;base64,{mp3_data}"></audio>'