Skip to content

Draft: ASR Open Source Datasets Processing Pipeline#2067

Open
sushmitha-deva-09 wants to merge 9 commits into
NVIDIA-NeMo:mainfrom
sushmitha-deva-09:asr_dp
Open

Draft: ASR Open Source Datasets Processing Pipeline#2067
sushmitha-deva-09 wants to merge 9 commits into
NVIDIA-NeMo:mainfrom
sushmitha-deva-09:asr_dp

Conversation

@sushmitha-deva-09

Copy link
Copy Markdown
Contributor

Description

Usage

# Add snippet demonstrating usage

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes.
  • The documentation is up to date with these changes.

Signed-off-by: Sushmitha Deva <sdeva@nvidia.com>
Signed-off-by: Sushmitha Deva <sdeva@nvidia.com>
Signed-off-by: Sushmitha Deva <sdeva@nvidia.com>
Signed-off-by: Sushmitha Deva <sdeva@nvidia.com>
Signed-off-by: Sushmitha Deva <sdeva@nvidia.com>
Signed-off-by: Sushmitha Deva <sdeva@nvidia.com>
Signed-off-by: Sushmitha Deva <sdeva@nvidia.com>
Signed-off-by: Sushmitha Deva <sdeva@nvidia.com>
Signed-off-by: Sushmitha Deva <sdeva@nvidia.com>
@sushmitha-deva-09 sushmitha-deva-09 requested a review from a team as a code owner June 11, 2026 11:17
@sushmitha-deva-09 sushmitha-deva-09 requested review from meatybobby and removed request for a team June 11, 2026 11:17
@copy-pr-bot

copy-pr-bot Bot commented Jun 11, 2026

Copy link
Copy Markdown

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@greptile-apps

greptile-apps Bot commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR introduces a new ASR open-source dataset processing pipeline for NeMo Curator, adding ingestion support for the IndicVoices HuggingFace dataset, transcript normalization, transcript quality statistics, and split-aware manifest writing. Language resources (alphabet, pretokenization rules, and punctuation chars) cover 11 Indic languages (bn, gu, hi, kn, ml, mr, pa, ta, te, ur) plus English.

  • BaseASRDatasetHandlerStage / IndicVoicesHandler extract arrow datasets into WAV/16 kHz/mono/PCM16 audio tasks, with deterministic dev/test splitting via MD5 hash of utterance IDs.
  • TranscriptNormalizationStage and TranscriptStatsStage provide NFKC+regex-based normalization and streaming stats collection; SplitAwareManifestWriter routes tasks to per-language split JSONL manifests.

Confidence Score: 3/5

The core extraction and manifest writing logic is functionally correct, but TranscriptStatsStage re-reads alphabet files from disk on every utterance processed, which will be prohibitively slow for large production runs.

TranscriptStatsStage.process() triggers two full summary() calls per task. summary() invokes _bucket_summary() for every language/source pair, and each call opens alphabet.txt from disk via _load_combined_alphabet(). For a realistic multi-language run of hundreds of thousands of utterances this will degrade pipeline throughput significantly. A separate file-handle leak exists in _write_summary() after teardown, and the indicvoices stats dict uses hardcoded key construction that will KeyError on new skip_reason values.

nemo_curator/stages/audio/asr/normalization/stats.py — per-task disk reads for every language alphabet; nemo_curator/stages/audio/asr/datasets/indicvoices.py — fragile stats key construction.

Important Files Changed

Filename Overview
nemo_curator/stages/audio/asr/normalization/stats.py TranscriptStatsStage.process() calls summary() twice per task; summary() reads alphabet files from disk for every language on each invocation, which will be very slow for large streaming datasets.
nemo_curator/stages/audio/asr/datasets/indicvoices.py IndicVoicesHandler correctly extracts arrow datasets with parallel joblib workers; stats dict uses fragile hardcoded key construction that will KeyError if new skip_reason values are introduced.
nemo_curator/stages/audio/asr/datasets/base.py BaseASRDatasetHandlerStage provides solid audio conversion, manifest writing, and setup/teardown lifecycle; no critical issues found.
nemo_curator/stages/audio/asr/normalization/transcript.py ResourceTranscriptNormalizer correctly loads language resources once per normalizer instance and applies NFKC + regex rules; resolve_lang raises ValueError for unknown languages.
nemo_curator/stages/audio/asr/io/split_manifest_writer.py SplitAwareManifestWriter correctly routes AudioTasks to per-language/split JSONL files, pre-creates declared manifests on setup, and flushes on every write.
nemo_curator/stages/audio/asr/metadata.py ASRMetadata dataclass is clean; to_dict() correctly prioritizes core fields over extra dict keys, and from_dict() round-trips cleanly.
nemo_curator/stages/audio/common.py Small fix to coerce Hydra/OmegaConf list types to plain Python paths for ManifestReader; correct and minimal.
tutorials/audio/indicvoices/pipeline.py Clean tutorial demonstrating end-to-end IndicVoices extraction + manifest writing; no issues found.

Sequence Diagram

sequenceDiagram
    participant Driver
    participant IndicVoicesHandler
    participant joblib as joblib (threading)
    participant HFDataset as HF Arrow Dataset
    participant libsoundfile as soundfile
    participant ManifestWriter as SplitAwareManifestWriter
    participant NormStage as TranscriptNormalizationStage
    participant StatsStage as TranscriptStatsStage

    Driver->>IndicVoicesHandler: process(_EmptyTask)
    loop per lang x native_split
        IndicVoicesHandler->>HFDataset: load_from_disk + cast_column(Audio)
        IndicVoicesHandler->>joblib: "Parallel(n_jobs=extraction_workers)"
        loop per row (parallel)
            joblib->>HFDataset: dataset[index]
            joblib->>IndicVoicesHandler: coerce_audio()
            joblib->>libsoundfile: write WAV (16kHz/mono/PCM16)
            joblib-->>IndicVoicesHandler: _RowResult(ASRMetadata)
        end
        IndicVoicesHandler->>IndicVoicesHandler: assign_split (MD5 hash dev/test)
        IndicVoicesHandler-->>Driver: list[AudioTask]
    end
    Driver->>NormStage: process(AudioTask)
    NormStage->>NormStage: NFKC + pretok regex rules
    NormStage-->>Driver: AudioTask (text, unknown_chars, transcript_error)
    Driver->>StatsStage: process(AudioTask)
    StatsStage->>StatsStage: aggregate stats buckets
    StatsStage-->>Driver: AudioTask (or None if drop_invalid)
    Driver->>ManifestWriter: process(AudioTask)
    ManifestWriter->>ManifestWriter: route to output_dir/lang/split.jsonl
    ManifestWriter-->>Driver: AudioTask
Loading

Comments Outside Diff (3)

  1. nemo_curator/stages/audio/asr/normalization/stats.py, line 970-974 (link)

    P1 Per-task summary() call reads alphabet files from disk on every processed utterance

    _metrics_snapshot() calls self.summary(), which in turn calls _bucket_summary() for every language/source combination, and each _bucket_summary() call invokes _load_combined_alphabet()_load_alphabet() which opens and reads alphabet.txt from disk. For a 100 K-utterance multi-language dataset with 3 languages and 2 sources, this translates to ~600 K file reads. The same summary() is called a second time inside _write_summary() when output_summary_path is set. Cache the combined alphabet per (lang, code_switch_langs) key and read it only once.

    Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

  2. nemo_curator/stages/audio/asr/normalization/stats.py, line 1033-1040 (link)

    P2 File handle leaked if _write_summary() is called after teardown()

    teardown() closes and nulls _summary_handle. If _write_summary() is subsequently called, the if self._summary_handle is None: self.setup_on_node() branch reopens the file in "w" mode, truncating the previously written summary and returning a handle that is never closed. Consider raising an error or returning early instead of re-invoking setup_on_node().

  3. nemo_curator/stages/audio/asr/datasets/indicvoices.py, line 494-497 (link)

    P2 KeyError if a new skip_reason is introduced without updating the stats dict

    stats[f"skipped_{result.skip_reason}"] += 1 constructs the dict key from the skip_reason string. The pre-seeded dict only has "skipped_missing_text", "skipped_missing_audio", and "skipped_audio_load". Any new _RowResult(skip_reason="...") value will raise KeyError at runtime. Use stats.get(...) with a default of 0 or switch to a Counter.

Reviews (1): Last reviewed commit: "Add alphabet to more indic languages" | Re-trigger Greptile

Comment on lines +120 to +125
def teardown(self) -> None:
for (lang, split), handle in self._handles.items():
handle.close()
filename = self.output_filename_pattern.format(lang=lang, split=split, split_type=split)
logger.info(f"[{self.name}] {lang}/{filename}: {self._counts.get((lang, split), 0)} entries")
self._handles = {}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 teardown() clears _handles but leaves _counts populated. If the stage is reused, _counts from the previous run accumulates into the next, making the logged entry counts wrong. Reset both dicts together.

Suggested change
def teardown(self) -> None:
for (lang, split), handle in self._handles.items():
handle.close()
filename = self.output_filename_pattern.format(lang=lang, split=split, split_type=split)
logger.info(f"[{self.name}] {lang}/{filename}: {self._counts.get((lang, split), 0)} entries")
self._handles = {}
def teardown(self) -> None:
for (lang, split), handle in self._handles.items():
handle.close()
filename = self.output_filename_pattern.format(lang=lang, split=split, split_type=split)
logger.info(f"[{self.name}] {lang}/{filename}: {self._counts.get((lang, split), 0)} entries")
self._handles = {}
self._counts = {}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant