Draft: ASR Open Source Datasets Processing Pipeline #2067
sushmitha-deva-09 wants to merge 9 commits into
Signed-off-by: Sushmitha Deva <sdeva@nvidia.com>
Greptile Summary
This PR introduces a new ASR open-source dataset processing pipeline for NeMo Curator, adding ingestion support for the IndicVoices HuggingFace dataset, transcript normalization, transcript quality statistics, and split-aware manifest writing. Language resources (alphabet, pretokenization rules, and punctuation characters) cover 11 languages: 10 Indic languages (bn, gu, hi, kn, ml, mr, pa, ta, te, ur) plus English.
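As a rough illustration of the normalization step the summary describes (Unicode NFKC folding, then regex pretokenization rules, then flagging characters outside the language alphabet), here is a minimal sketch. The names `ALPHABET`, `PRETOK_RULES`, `PUNCTUATION`, and `normalize_transcript` are hypothetical stand-ins, not the PR's actual API, and the lowercasing step is an assumption.

```python
import re
import unicodedata

# Hypothetical in-memory stand-ins for the per-language resource files.
ALPHABET = set("abcdefghijklmnopqrstuvwxyz' ")
PUNCTUATION = set('.,?!"')
PRETOK_RULES = [
    (re.compile(r"[\u201c\u201d]"), '"'),  # curly double quotes -> straight
    (re.compile(r"\s+"), " "),             # collapse whitespace runs
]


def normalize_transcript(text: str) -> tuple[str, list[str]]:
    """Return the normalized text and the sorted list of unknown characters."""
    text = unicodedata.normalize("NFKC", text)
    for pattern, replacement in PRETOK_RULES:
        text = pattern.sub(replacement, text)
    text = text.strip().lower()  # assumption: the alphabet is lowercase
    unknown = sorted({ch for ch in text if ch not in ALPHABET | PUNCTUATION})
    return text, unknown
```

A non-empty `unknown` list is the kind of signal a downstream stats stage could use to flag or drop an utterance.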
Confidence Score: 3/5
The core extraction and manifest writing logic is functionally correct, but TranscriptStatsStage re-reads alphabet files from disk on every utterance processed, which will be prohibitively slow for large production runs.
TranscriptStatsStage.process() triggers two full summary() calls per task. summary() invokes _bucket_summary() for every language/source pair, and each call opens alphabet.txt from disk via _load_combined_alphabet(). For a realistic multi-language run of hundreds of thousands of utterances this will degrade pipeline throughput significantly. A separate file-handle leak exists in _write_summary() after teardown, and the indicvoices stats dict uses hardcoded key construction that will raise a KeyError on new skip_reason values.
Files needing attention:
nemo_curator/stages/audio/asr/normalization/stats.py: per-task disk reads for every language alphabet
nemo_curator/stages/audio/asr/datasets/indicvoices.py: fragile stats key construction
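One possible way to address the two issues flagged above is sketched below: memoize the alphabet load so each language file is read once per process, and count skip reasons with a `Counter` so unseen reasons never raise `KeyError`. The function names and the `<resource_dir>/<lang>/alphabet.txt` layout are assumptions for illustration, not the PR's actual code.

```python
from collections import Counter
from functools import lru_cache
from pathlib import Path


@lru_cache(maxsize=None)
def load_alphabet(resource_dir: str, lang: str) -> frozenset[str]:
    """Read <resource_dir>/<lang>/alphabet.txt once; later calls hit the cache."""
    path = Path(resource_dir) / lang / "alphabet.txt"
    with path.open(encoding="utf-8") as fh:
        return frozenset(ch for line in fh for ch in line.rstrip("\n"))


# A Counter returns 0 for missing keys, so a new skip_reason needs no
# pre-registered entry.
skip_counts = Counter()


def record_skip(reason: str) -> None:
    skip_counts[reason] += 1
```

Returning a `frozenset` keeps the cached value immutable, so callers cannot accidentally mutate the shared alphabet.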
Sequence Diagram
sequenceDiagram
participant Driver
participant IndicVoicesHandler
participant joblib as joblib (threading)
participant HFDataset as HF Arrow Dataset
participant libsoundfile as soundfile
participant ManifestWriter as SplitAwareManifestWriter
participant NormStage as TranscriptNormalizationStage
participant StatsStage as TranscriptStatsStage
Driver->>IndicVoicesHandler: process(_EmptyTask)
loop per lang x native_split
IndicVoicesHandler->>HFDataset: load_from_disk + cast_column(Audio)
IndicVoicesHandler->>joblib: "Parallel(n_jobs=extraction_workers)"
loop per row (parallel)
joblib->>HFDataset: dataset[index]
joblib->>IndicVoicesHandler: coerce_audio()
joblib->>libsoundfile: write WAV (16kHz/mono/PCM16)
joblib-->>IndicVoicesHandler: _RowResult(ASRMetadata)
end
IndicVoicesHandler->>IndicVoicesHandler: assign_split (MD5 hash dev/test)
IndicVoicesHandler-->>Driver: list[AudioTask]
end
Driver->>NormStage: process(AudioTask)
NormStage->>NormStage: NFKC + pretok regex rules
NormStage-->>Driver: AudioTask (text, unknown_chars, transcript_error)
Driver->>StatsStage: process(AudioTask)
StatsStage->>StatsStage: aggregate stats buckets
StatsStage-->>Driver: AudioTask (or None if drop_invalid)
Driver->>ManifestWriter: process(AudioTask)
ManifestWriter->>ManifestWriter: route to output_dir/lang/split.jsonl
ManifestWriter-->>Driver: AudioTask
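The diagram's `assign_split (MD5 hash dev/test)` step suggests a deterministic, hash-based split. A minimal sketch of that idea follows; the choice of hashing an utterance ID, the 32-bit bucket, and the `dev_fraction` parameter are assumptions, not details confirmed by the PR.

```python
import hashlib


def assign_split(utterance_id: str, dev_fraction: float = 0.5) -> str:
    """Map an ID to 'dev' or 'test' deterministically via its MD5 digest."""
    digest = hashlib.md5(utterance_id.encode("utf-8")).hexdigest()
    # Interpret the first 32 bits of the digest as a fraction in [0, 1).
    bucket = int(digest[:8], 16) / 0x100000000
    return "dev" if bucket < dev_fraction else "test"
```

Because the assignment depends only on the ID, re-running the pipeline or changing the worker count leaves every utterance in the same split.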
    def teardown(self) -> None:
        for (lang, split), handle in self._handles.items():
            handle.close()
            filename = self.output_filename_pattern.format(lang=lang, split=split, split_type=split)
            logger.info(f"[{self.name}] {lang}/{filename}: {self._counts.get((lang, split), 0)} entries")
        self._handles = {}
teardown() clears _handles but leaves _counts populated. If the stage is reused, counts from the previous run carry over into the next one, so the logged entry counts are wrong. Reset both dicts together.
Suggested change:
    def teardown(self) -> None:
        for (lang, split), handle in self._handles.items():
            handle.close()
            filename = self.output_filename_pattern.format(lang=lang, split=split, split_type=split)
            logger.info(f"[{self.name}] {lang}/{filename}: {self._counts.get((lang, split), 0)} entries")
        self._handles = {}
        self._counts = {}
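To make the reuse problem concrete, here is a small self-contained sketch of a split-aware writer whose teardown resets both dicts. `ManifestWriterSketch` is a hypothetical stand-in, not the PR's SplitAwareManifestWriter; the `<output_dir>/<lang>/<split>.jsonl` layout follows the sequence diagram, and opening files in append mode is an assumption.

```python
import json
from pathlib import Path


class ManifestWriterSketch:
    """Hypothetical writer routing entries to <output_dir>/<lang>/<split>.jsonl."""

    def __init__(self, output_dir: str) -> None:
        self.output_dir = Path(output_dir)
        self._handles = {}
        self._counts = {}

    def write(self, lang: str, split: str, entry: dict) -> None:
        key = (lang, split)
        if key not in self._handles:
            path = self.output_dir / lang / f"{split}.jsonl"
            path.parent.mkdir(parents=True, exist_ok=True)
            # Assumption: append mode, so a reused stage does not truncate.
            self._handles[key] = path.open("a", encoding="utf-8")
        self._handles[key].write(json.dumps(entry, ensure_ascii=False) + "\n")
        self._counts[key] = self._counts.get(key, 0) + 1

    def teardown(self) -> dict:
        """Close all handles, return this run's counts, and reset state."""
        summary = dict(self._counts)
        for handle in self._handles.values():
            handle.close()
        self._handles = {}
        self._counts = {}  # without this line, the next run's counts inflate
        return summary
```

With the reset in place, a second run that writes one entry reports a count of 1 rather than the running total from both runs.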
Description
Usage
# Add snippet demonstrating usage
Checklist