Skip to content

Aaftabv/tagging stage adapter split#2037

Open
mohammadaaftabv wants to merge 6 commits into
NVIDIA-NeMo:mainfrom
mohammadaaftabv:aaftabv/tagging-stage-adapter-split
Open

Aaftabv/tagging stage adapter split#2037
mohammadaaftabv wants to merge 6 commits into
NVIDIA-NeMo:mainfrom
mohammadaaftabv:aaftabv/tagging-stage-adapter-split

Conversation

@mohammadaaftabv

@mohammadaaftabv mohammadaaftabv commented May 29, 2026

Copy link
Copy Markdown
Contributor

Description

Bring the SDP-V2 stage-adapter split (originally introduced in PR #1967 for the Qwen-Omni first-pass ASR slot) to all three GPU inference stages in the audio tagging pipeline:

Old monolithic stage New generic stage + Concrete adapter
PyAnnoteDiarizationStage DiarizationStage + PyAnnoteDiarizationAdapter
WhisperXVADStage VADStage + WhisperXVADAdapter
NeMoASRAlignerStage ForcedAlignmentStage + NeMoASRAlignAdapter

Each generic stage owns Curator-side glue only (task.data reads/writes, item-dict construction, adapter dispatch, on-disk dict conversion, metric logging). Each adapter encapsulates the model-specific code path (HF auth, model load, batching knobs, inference call sequence) behind a typed Protocol. Class resolution at runtime uses hydra.utils.get_class(adapter_target) — same convention as nemo_curator/config/run.py and PR #1967's ASRStage.

This unlocks model swaps as YAML-only changes (Tier-1 swap line adapter_target: + Tier-2 opaque adapter_kwargs:) without touching Curator source for any future diarization / VAD / alignment backend (Sortformer, Silero, NeMo NFA, WhisperX alignment, etc.).

Design reference

Implements the patterns prescribed in the SDP-V2 Pipeline Design doc:

  • §3 Speaker diarization → DiarizationAdapter Protocol + Stage
  • §4 Voice activity detection → VADAdapter Protocol + Stage
  • §13 Forced alignment → ForcedAlignmentAdapter Protocol + Stage

Commits in this PR

SHA Title Files +/-
5d99f27 audio/diarization: stage-adapter split per SDP-V2 design 17 +1499 / -436
3be716e audio/vad: stage-adapter split per SDP-V2 design 11 +911 / -152
b9f3c8b audio/alignment: stage-adapter split per SDP-V2 design 19 +1421 / -639
fae6b48 audio/tagging: align test overrides + Hydra schema with stage-adapter split 2 +14 / -5
9564b74 audio/tagging: add tts_pipeline_correctness.yaml for Kratos reference-diff run 1 +168 / -0

Cumulative diff vs main: 44 files changed, +4013 / -1232.

File layout introduced

nemo_curator/
├── adapters/                                  ← NEW package
│   ├── diarization/
│   │   ├── base.py            # DiarSegment + DiarizationResult + DiarizationAdapter Protocol
│   │   └── pyannote.py        # PyAnnoteDiarizationAdapter
│   ├── vad/
│   │   ├── base.py            # VADInterval + VADResult + VADAdapter Protocol
│   │   └── whisperx.py        # WhisperXVADAdapter
│   └── alignment/
│       ├── base.py            # WordAlignment + AlignmentResult + ForcedAlignmentAdapter Protocol
│       └── nemo_asr_align.py  # NeMoASRAlignAdapter
└── stages/audio/inference/
    ├── speaker_diarization/
    │   └── stage.py           # DiarizationStage
    │   # (pyannote.py DELETED — no facade, per SDP-V2 + PR1967 posture)
    ├── vad/
    │   ├── stage.py           # VADStage
    │   └── whisperx_vad.py    # WhisperXVADModel helper retained (shared with PyAnnote adapter)
    │   # (WhisperXVADStage class DELETED)
    └── alignment/             ← NEW directory
        └── stage.py           # ForcedAlignmentStage

# Deleted (no backwards-compatibility facade, per PR1967 pattern):
nemo_curator/stages/audio/inference/speaker_diarization/pyannote.py
nemo_curator/stages/audio/tagging/inference/nemo_asr_align.py
nemo_curator/stages/audio/tagging/inference/__init__.py
nemo_curator/stages/audio/tagging/inference/base_asr_processor.py
(WhisperXVADStage class only — whisperx_vad.py file stays as WhisperXVADModel helper)

Behaviour preservation

All three families carry their core logic byte-for-byte verbatim from the deleted monolithic stages. No inference behaviour changes; no metric-key changes; on-disk task field schema preserved.

Property Status
RTTM sidecar write, overlap detection, has_overlap semantics, WhisperX-VAD long-turn micro-split Preserved (in PyAnnoteDiarizationAdapter)
Skip-short-clip rule, vad_onset / vad_offset thresholds, max_length chunk-merge Preserved (in WhisperXVADAdapter)
FastConformer time_stride = 8 × window_stride, RNNT -0.08s offset, confidence rounding, one-by-one retry fallback, U+2047 strip Preserved (in NeMoASRAlignAdapter)
SplitASRAlignJoinStage external dataclass fields + public API Unchanged (only internal decompose() switches to new stage)
nemo_curator/stages/audio/tagging/__init__.py lazy-import surface Updated: drops old class names, adds new generic stages + adapters
Output JSONL schema (segments, alignment, text, metrics, etc.) Preserved

Usage

YAML swap pattern (Tier-1 = adapter_target, Tier-2 = opaque adapter_kwargs):

# Speaker diarization
- _target_: nemo_curator.stages.audio.inference.speaker_diarization.DiarizationStage
  name: PyAnnoteDiarization            # explicit override → preserves perf_summary key
  adapter_target: nemo_curator.adapters.diarization.PyAnnoteDiarizationAdapter
  model_id: pyannote/speaker-diarization-3.1
  non_speaker_max_length: ${max_segment_length}
  adapter_kwargs:                      # Tier-2 — opaque, stage never reads inside
    hf_token: ${hf_token}
    max_length: ${max_segment_length}
    segmentation_batch_size: 128
    embedding_batch_size: 128

# Forced alignment
- _target_: nemo_curator.stages.audio.inference.alignment.ForcedAlignmentStage
  name: ASRAlignment
  adapter_target: nemo_curator.adapters.alignment.NeMoASRAlignAdapter
  model_id: nvidia/parakeet-tdt_ctc-1.1b
  batch_size: 32
  adapter_kwargs:
    is_fastconformer: true
    decoder_type: rnnt

Swapping the diarization model in the future is a one-line YAML change:

# Tier-1 swap line — pull in a future Sortformer or WhisperX diarization adapter
adapter_target: nemo_curator.adapters.diarization.SortformerDiarizationAdapter

Call-site migrations

File Change
tutorials/audio/tagging/tts_pipeline.yaml Production-shape YAML migrated to new stages + adapters; + data_config: / + output_dir: Hydra-shim added in fae6b48
tutorials/audio/tagging/tts_pipeline_correctness.yaml New — bakes the in-tree-reference-producing pipeline shape (Kratos correctness validation; see "Kratos validation" below)
tutorials/audio/tagging/README.md Stage table + override examples updated to new shape
tests/stages/audio/tagging/e2e/configs/tts_pipeline.yaml Test e2e YAML migrated to new stages + adapters
tests/stages/audio/tagging/e2e/test_tts_e2e.py Stage-4 runtime overrides updated to new field names (model_id Tier-1, is_fastconformer / decoder_type / transcribe_batch_size under adapter_kwargs.*)
benchmarking/scripts/audio_tagging_benchmark.py Stage instantiations updated to new shape (preserves old single-knob semantics via adapter_kwargs={...})
nemo_curator/stages/audio/tagging/__init__.py Lazy-import map: drops old monoliths, adds new generic stages + adapters
nemo_curator/stages/audio/tagging/split.py SplitASRAlignJoinStage.decompose() now wires ForcedAlignmentStage + NeMoASRAlignAdapter; external dataclass fields preserved for API compatibility

Test coverage

~45 new CPU tests (mirrors PR #1967's ~45-test scope):

File Scope
tests/stages/audio/inference/speaker_diarization/test_diarization_stage.py 20 CPU tests against a fake adapter (construction, lifecycle, process, metric logging, no GPU / PyAnnote needed)
tests/adapters/diarization/test_pyannote_adapter.py has_overlap unit tests (carried over), adapter construction, prefetch_weights, setup/teardown (PyAnnote mocked), diarize_batch paths, _add_vad_segments micro-split
tests/stages/audio/inference/vad/test_vad_stage.py 14 CPU tests against a fake adapter
tests/adapters/vad/test_whisperx_adapter.py WhisperXVADAdapter construction, prefetch_weights, lifecycle (WhisperX mocked), detect_batch empty / missing-filepath / short-clip-skip / happy-path
tests/stages/audio/inference/alignment/test_forced_alignment_stage.py 15 CPU tests against a fake adapter (full-audio fan-out + scatter, segment-only with mocked torchaudio.load, time-offset, sentinel handling, metrics)
tests/adapters/alignment/test_nemo_asr_align_adapter.py NeMoASRAlignAdapter construction + validation, prefetch_weights, align_batch path-mode + segment-mode dispatch, transcribe-tuple unwrap, batch-failure one-by-one retry, get_alignments_text CTC vs RNNT time-stride math, U+2047 strip

All new tests run on CPU with mocked external dependencies; no GPU, no model weights, no HF token required. Existing PyAnnote / WhisperX / NeMo ASR tests are removed alongside their stage class deletions.

Kratos validation (in progress)

The branch ships a dedicated tts_pipeline_correctness.yaml (commit 9564b74) that bakes the in-tree-reference-producing pipeline shape (CTC stt_en_fastconformer_ctc_large @ batch=1, PyAnnote segmentation_batch_size=2 / embedding_batch_size=2) — so a single-node single-GPU Kratos run against the bundled fixture audio reproduces the gold reference manifest under check_output() tolerance (text exact match + segment boundaries pytest.approx(rel=1e-3) + word timings abs=0.01).

Component Path
Test input tests/fixtures/audio/tagging/sample_input.jsonl + audios/audio_{1,2}.opus (in-tree, no swift upload of audio needed)
Gold reference tests/fixtures/audio/tagging/reference/tts/test_data_reference.jsonl
Diff harness tests/stages/audio/tagging/e2e/utils.py::check_output
Kratos config tutorials/audio/tagging/tts_pipeline_correctness.yaml
Pipeline entry tutorials/audio/tagging/main.py (unchanged Hydra-driven generic runner)

Out of scope (deliberate follow-ups)

Same posture as PR #1967 — keep this PR focused on the core split. Follow-ups ship in separate PRs once the stage contract proves out:

  • SortformerDiarizationAdapter / WhisperXDiarizationAdapter (diarization §3)
  • SileroVADAdapter / PyAnnoteVADAdapter (VAD §4)
  • NeMoNFAAdapter (the SDP-V2 doc's "canonical" alignment adapter — NFA, not transcribe(timestamps=True)) / WhisperXAlignmentAdapter (forced alignment §13)
  • Stage-side pre-slicing + BatchPolicy integration on these three stages (the doc shows duration_bucketed configs for them; PR Granary v2: Qwen-Omni in-process ASR (stage-adapter split + generic bucketed inference) #1967 added the dataclass to ASRStage only)
  • In-memory waveform handoff between stages (current pipeline still relies on resampled_audio_filepath + SplitLongAudioStage for §1.2 / §6 chunking)

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes (~45 new CPU tests across stages + adapters).
  • The documentation is up to date with these changes (tutorials/audio/tagging/README.md + lazy-import map + YAML banner comments).

Bring the SDP-V2 design doc (sec 3 speaker diarization) stage-adapter
split into the audio tagging pipeline.

What moves
----------

* New package nemo_curator/adapters/ with:
  - adapters/diarization/base.py - DiarSegment + DiarizationResult
    dataclasses + DiarizationAdapter Protocol (model_id / revision /
    setup / teardown / prefetch_weights classmethod / diarize_batch).
  - adapters/diarization/pyannote.py - PyAnnoteDiarizationAdapter that
    re-houses every PyAnnote-specific code path from the deleted
    PyAnnoteDiarizationStage (HF auth, in-pipeline batching knobs,
    overlap detection, has_overlap walk, WhisperX-VAD-driven
    micro-split of long turns, RTTM sidecar write).
* New generic nemo_curator/stages/audio/inference/speaker_diarization/
  stage.py with DiarizationStage that owns Curator-side glue only:
  task.data key reads, item-dict construction, adapter dispatch,
  DiarSegment to on-disk dict conversion, add_non_speaker_segments
  gap fill, metric logging.

YAML shape (Tier-1 / Tier-2 split)
----------------------------------

  - _target_: nemo_curator.stages.audio.inference.speaker_diarization.DiarizationStage
    name: PyAnnoteDiarization               # keeps perf_summary key
    adapter_target: nemo_curator.adapters.diarization.PyAnnoteDiarizationAdapter
    model_id: pyannote/speaker-diarization-3.1
    non_speaker_max_length: ${max_segment_length}
    adapter_kwargs:
      hf_token: ${hf_token}
      max_length: ${max_segment_length}

Class resolution uses hydra.utils.get_class(adapter_target).

Behaviour preservation
----------------------

* RTTM sidecar write, overlap detection, has_overlap semantics,
  WhisperX-VAD micro-split of >max_length turns - all preserved
  byte-for-byte from the pre-split PyAnnoteDiarizationStage. Numeric
  output is identical for the same inputs and same random seed.
* On-disk segments_key + overlap_segments_key dict shape unchanged.
* add_non_speaker_segments still runs stage-side after adapter results.

What is deleted
---------------

* nemo_curator/stages/audio/inference/speaker_diarization/pyannote.py
* tests/stages/audio/inference/speaker_diarization/test_pyannote.py

Call-site migrations
--------------------

* tutorials/audio/tagging/tts_pipeline.yaml
* tests/stages/audio/tagging/e2e/configs/tts_pipeline.yaml
* benchmarking/scripts/audio_tagging_benchmark.py
* nemo_curator/stages/audio/tagging/__init__.py (lazy-import map)
* tutorials/audio/tagging/README.md (table + override doc)

New tests
---------

* tests/stages/audio/inference/speaker_diarization/test_diarization_stage.py
  - 20 CPU tests against a fake adapter (construction, lifecycle,
    process(), metric logging, no GPU / PyAnnote needed).
* tests/adapters/diarization/test_pyannote_adapter.py
  - has_overlap unit tests (carried over from the deleted file).
  - PyAnnoteDiarizationAdapter construction, prefetch_weights,
    setup/teardown lifecycle (PyAnnote mocked), diarize_batch
    empty/missing-filepath paths, _add_vad_segments micro-split.

Follow-ups (out of scope for this commit; same posture as PR1967)
-----------------------------------------------------------------

* Stage-side pre-slicing and BatchPolicy - tagging pipeline currently
  uses SplitLongAudioStage for that role; revisit when promoting to
  in-memory dataflow.
* SortformerAdapter / WhisperXDiarizationAdapter - the stage is now
  shaped for them; they ship in a separate commit.

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
Bring the SDP-V2 design doc (sec 4 voice activity detection)
stage-adapter split into the audio tagging pipeline.

What moves
----------

* nemo_curator/adapters/vad/
  - base.py - VADInterval + VADResult dataclasses + VADAdapter Protocol
    (model_id / revision / setup / teardown / prefetch_weights
    classmethod / detect_batch).
  - whisperx.py - WhisperXVADAdapter that re-houses the WhisperX VAD
    code path from the deleted WhisperXVADStage process() body. Uses
    the same WhisperXVADModel helper as before (kept at its existing
    nemo_curator/stages/audio/inference/vad/whisperx_vad.py home -
    PyAnnoteDiarizationAdapter still consumes it for sub-segment VAD,
    so the helper stays as shared infra).
* nemo_curator/stages/audio/inference/vad/stage.py with VADStage that
  owns Curator-side glue only: task.data key reads, item-dict
  construction, adapter dispatch, VADInterval to on-disk dict
  conversion, metric logging.

YAML shape (Tier-1 / Tier-2 split)
----------------------------------

  - _target_: nemo_curator.stages.audio.inference.vad.VADStage
    name: WhisperXVAD
    adapter_target: nemo_curator.adapters.vad.WhisperXVADAdapter
    model_id: whisperx/vad
    adapter_kwargs:
      vad_onset: 0.5
      vad_offset: 0.363
      min_length: 0.5
      max_length: 40.0

What is deleted
---------------

* WhisperXVADStage class (removed from
  nemo_curator/stages/audio/inference/vad/whisperx_vad.py; the file is
  now just the WhisperXVADModel helper).
* tests/stages/audio/inference/vad/test_whisperx_vad.py

Call-site migrations
--------------------

* nemo_curator/stages/audio/inference/vad/__init__.py now re-exports
  VADStage (new) and WhisperXVADModel (helper still used by both
  adapters).
* nemo_curator/stages/audio/tagging/__init__.py lazy-import map drops
  WhisperXVADStage; adds VADStage and WhisperXVADAdapter.

New tests
---------

* tests/stages/audio/inference/vad/test_vad_stage.py
  - 14 CPU tests against a fake adapter (construction, lifecycle,
    process(), metric logging, no GPU / WhisperX needed).
* tests/adapters/vad/test_whisperx_adapter.py
  - WhisperXVADAdapter construction, prefetch_weights, setup/teardown
    lifecycle (WhisperXVADModel mocked), detect_batch empty /
    missing-filepath / short-clip-skip / happy-path with mocked sf.read
    and WhisperX VAD model.

Behaviour preservation
----------------------

* Skip-short-clip rule, vad_onset/vad_offset thresholds, max_length
  chunk-merge - all preserved from pre-split WhisperXVADStage.
* PyAnnoteDiarizationAdapter (commit 1) still constructs a
  WhisperXVADModel for its long-turn sub-VAD path; behaviour
  unchanged.
* No tutorials/audio/tagging/tts_pipeline.yaml migration is needed
  because the tagging tutorial does not wire WhisperXVADStage
  standalone - it only enters via the PyAnnote diarization adapter.

Follow-ups (out of scope; same posture as PR1967)
-------------------------------------------------

* SileroVADAdapter / PyAnnoteVADAdapter - the stage is now shaped for
  them; they ship in a separate commit.

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
Bring the SDP-V2 design doc (sec 13 forced alignment) stage-adapter
split into the audio tagging pipeline.

What moves
----------

* New nemo_curator/adapters/alignment/ package with:
  - base.py - WordAlignment + AlignmentResult dataclasses +
    ForcedAlignmentAdapter Protocol (model_id / revision / setup /
    teardown / prefetch_weights classmethod / align_batch).
  - nemo_asr_align.py - NeMoASRAlignAdapter that re-houses the
    NeMo-specific code path from the deleted NeMoASRAlignerStage
    body: ASRModel.from_pretrained / restore_from, FastConformer
    config (change_attention_model /
    change_subsampling_conv_chunking_factor), decoder config (CTC vs
    RNNT, preserve_alignments, preserve_word_confidence,
    compute_timestamps), _override_cfg setup, transcribe() with
    one-by-one retry fallback, get_alignments_text time-stride math
    (FastConformer 8x window_stride vs default 4x, RNNT -0.08s
    offset), and U+2047 \"?\" glyph strip on the joined text.
* New nemo_curator/stages/audio/inference/alignment/stage.py with
  ForcedAlignmentStage that owns Curator-side glue only:
  task.data plumbing, split_filepaths fan-out + scatter,
  segment-mode in-memory audio cut (the
  _prepare_segment_batch_with_metadata helper, moved from
  BaseASRProcessorStage), per-segment time-offset adjustment, batch
  homogeneity guarantee, metric logging.

YAML shape (Tier-1 / Tier-2 split)
----------------------------------

  - _target_: nemo_curator.stages.audio.inference.alignment.ForcedAlignmentStage
    name: ASRAlignment
    adapter_target: nemo_curator.adapters.alignment.NeMoASRAlignAdapter
    model_id: nvidia/parakeet-tdt_ctc-1.1b
    batch_size: 32
    adapter_kwargs:
      is_fastconformer: true
      decoder_type: rnnt

What is deleted
---------------

* nemo_curator/stages/audio/tagging/inference/nemo_asr_align.py
  (NeMoASRAlignerStage + BaseASRProcessorStage)
* nemo_curator/stages/audio/tagging/inference/ (empty after removal)
* tests/stages/audio/tagging/inference/test_base_asr_processor.py
* tests/stages/audio/tagging/inference/test_nemo_asr_align.py
* tests/stages/audio/tagging/inference/ (empty after removal)

Call-site migrations
--------------------

* tutorials/audio/tagging/tts_pipeline.yaml
* tests/stages/audio/tagging/e2e/configs/tts_pipeline.yaml
* tests/stages/audio/tagging/e2e/test_tts_e2e.py (comment)
* benchmarking/scripts/audio_tagging_benchmark.py
* nemo_curator/stages/audio/tagging/__init__.py (lazy-import map -
  drops NeMoASRAlignerStage + BaseASRProcessorStage, adds
  ForcedAlignmentStage + NeMoASRAlignAdapter)
* nemo_curator/stages/audio/tagging/split.py
  (SplitASRAlignJoinStage.decompose() now wires ForcedAlignmentStage
  with NeMoASRAlignAdapter; public API of SplitASRAlignJoinStage is
  unchanged.)
* tutorials/audio/tagging/README.md (table row)

New tests
---------

* tests/stages/audio/inference/alignment/test_forced_alignment_stage.py
  - 15 CPU tests against a fake adapter (construction, lifecycle,
    full-audio mode scatter into split_metadata, top-level fallback
    when no split_metadata, sentinel non-list split_filepaths,
    segment-only mode with mocked torchaudio.load, time-offset
    adjustment, eligible-segment min_len filter, metrics).
* tests/adapters/alignment/test_nemo_asr_align_adapter.py
  - NeMoASRAlignAdapter construction + validation (rejects unknown
    decoder_type / timestamp_type), prefetch_weights happy + failure
    paths, align_batch path-mode and segment-mode dispatch,
    transcribe-tuple unwrap, batch-failure one-by-one retry in
    path-mode (succeed) vs segment-mode (raise), get_alignments_text
    CTC vs RNNT time-stride math, U+2047 strip, compute_timestamps
    False short-circuit.

Behaviour preservation
----------------------

* FastConformer time_stride = 8 * window_stride (default 4x) -
  byte-for-byte preserved.
* RNNT start/end = max(0, offset * stride - 0.08) - preserved.
* Confidence rounding to 4 decimals + start/end rounding to 3 -
  preserved.
* One-by-one retry fallback when batch transcribe fails in
  path-mode - preserved.
* SplitASRAlignJoinStage external dataclass fields and public API
  unchanged; only its internal decompose() changes.

Follow-ups (out of scope; same posture as PR1967)
-------------------------------------------------

* NeMoNFAAdapter (the SDP-V2 doc \"canonical\" alignment adapter -
  NFA, not transcribe(timestamps=True)) - the stage is now shaped
  for it; it ships in a separate commit.
* WhisperXAlignmentAdapter - same.

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
@mohammadaaftabv mohammadaaftabv requested a review from a team as a code owner May 29, 2026 10:07
@mohammadaaftabv mohammadaaftabv requested review from sarahyurick and removed request for a team May 29, 2026 10:07
@copy-pr-bot

copy-pr-bot Bot commented May 29, 2026

Copy link
Copy Markdown

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@greptile-apps

greptile-apps Bot commented May 29, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR applies the SDP-V2 stage-adapter split pattern (introduced in PR #1967 for the ASR slot) to all three remaining GPU inference stages in the audio tagging pipeline: speaker diarization, VAD, and forced alignment. Each monolithic stage is replaced by a generic *Stage (Curator glue only) and a concrete *Adapter (model-specific code), with class resolution via hydra.utils.get_class(adapter_target), enabling model swaps as YAML-only changes.

  • Three new generic stages (DiarizationStage, VADStage, ForcedAlignmentStage) and three new adapters (PyAnnoteDiarizationAdapter, WhisperXVADAdapter, NeMoASRAlignAdapter) are introduced under nemo_curator/adapters/; core inference logic is byte-for-byte ported from the deleted monolithic stages.
  • SplitASRAlignJoinStage.decompose() is rewired to construct ForcedAlignmentStage + NeMoASRAlignAdapter; all public dataclass fields and the JSONL output schema are preserved.
  • ~45 new CPU tests cover stage lifecycle and adapter paths with mocked backends; call-sites in benchmarking, e2e tests, and tutorial YAMLs are all migrated to the new shape.

Confidence Score: 5/5

Safe to merge; all inference logic is ported verbatim from deleted stages, output schema is unchanged, and ~45 CPU tests cover the new stage-adapter contracts.

The core split is a well-scoped structural refactor with no algorithmic changes. Call-site migrations in the benchmark, e2e test, and YAML configs are all consistent. The two flagged items (confusing prefetch error message, asymmetric result-count guard) are non-blocking quality concerns that do not affect correctness on the normal execution path.

The new prefetch_weights error message in nemo_curator/adapters/diarization/pyannote.py (line 163) and the result-count guard in nemo_curator/stages/audio/inference/alignment/stage.py (line 261) are worth a second look before future adapter authors follow these patterns.

Important Files Changed

Filename Overview
nemo_curator/stages/audio/inference/speaker_diarization/stage.py New generic DiarizationStage with pluggable adapter via adapter_target; lifecycle, item-building, metric logging are clean.
nemo_curator/stages/audio/inference/vad/stage.py New generic VADStage; skipped_short metric fallback (line 232) always emits 0 — already noted in prior review thread.
nemo_curator/stages/audio/inference/alignment/stage.py New generic ForcedAlignmentStage; outputs() missing text/words/alignment keys (prior thread); asymmetric adapter-result length guard (new comment).
nemo_curator/adapters/diarization/pyannote.py PyAnnoteDiarizationAdapter; logic faithfully ported from deleted stage; prefetch_weights error message is contradictory (new comment).
nemo_curator/adapters/alignment/nemo_asr_align.py NeMoASRAlignAdapter; CTC/RNNT path, one-by-one retry fallback, timestamp math all faithfully ported from deleted stage.
nemo_curator/adapters/vad/whisperx.py WhisperXVADAdapter; min_length skip-short logic and VAD segment wrapping faithfully ported; clean implementation.
nemo_curator/stages/audio/tagging/split.py SplitASRAlignJoinStage.decompose() wired to new ForcedAlignmentStage + NeMoASRAlignAdapter; all original kwargs forwarded; split_batch_size is a pre-existing dead attribute.
nemo_curator/stages/audio/tagging/init.py Lazy-import map updated: drops old monolithic classes, adds new generic stages and adapters.
benchmarking/scripts/audio_tagging_benchmark.py Updated to use DiarizationStage and ForcedAlignmentStage; adapter_target and adapter_kwargs correctly wired.
tests/stages/audio/tagging/e2e/test_tts_e2e.py Stage-4 overrides updated to Tier-1 model_id and Tier-2 adapter_kwargs; OmegaConf attribute access works correctly for DictConfig.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    YAML["YAML Config\n(adapter_target, model_id,\nadapter_kwargs)"] --> DS
    YAML --> VS
    YAML --> FA

    subgraph DiarizationFamily["Speaker Diarization"]
        DS["DiarizationStage\n(stage.py)"] -->|"hydra.utils.get_class\n+ diarize_batch()"| PA["PyAnnoteDiarizationAdapter\n(pyannote.py)"]
    end

    subgraph VADFamily["Voice Activity Detection"]
        VS["VADStage\n(stage.py)"] -->|"hydra.utils.get_class\n+ detect_batch()"| WX["WhisperXVADAdapter\n(whisperx.py)"]
    end

    subgraph AlignFamily["Forced Alignment"]
        FA["ForcedAlignmentStage\n(stage.py)"] -->|"hydra.utils.get_class\n+ align_batch()"| NeMo["NeMoASRAlignAdapter\n(nemo_asr_align.py)"]
    end

    PA -->|"DiarSegment list"| DS
    WX -->|"VADInterval list"| VS
    NeMo -->|"AlignmentResult list"| FA

    DS -->|"task.data[segments_key]"| TaskData["AudioTask.data"]
    VS -->|"task.data[vad_segments]"| TaskData
    FA -->|"task.data[text/alignment]"| TaskData

    SplitJoin["SplitASRAlignJoinStage\n(split.py)"] -->|"decompose()"| Split["SplitLongAudioStage"]
    Split --> FA
    FA --> Join["JoinSplitAudioMetadataStage"]
Loading

Reviews (4): Last reviewed commit: "audio/tagging: add sample_input_kratos.j..." | Re-trigger Greptile

Comment on lines +230 to +232
"skipped_short": float(result.extras.get("skipped_short", 0.0))
if "skipped_short" in result.extras
else (1.0 if not intervals and result.extras.get("duration_s", 0.0) < 0 else 0.0),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 skipped_short metric always emits 0

The fallback expression result.extras.get("duration_s", 0.0) < 0 can never be true — audio duration is never negative. As a result, when "skipped_short" is absent from result.extras (the normal case for WhisperXVADAdapter, which only stores "duration_s" in extras when it skips a short clip), this metric always logs 0.0 even though the item was actually skipped. The correct per-item skip signal is already available via self._adapter.last_metrics["skipped_short_total"] (emitted by the adapter for every batch), but the stage-level metrics["skipped_short"] key diverges from it.

cls = self._adapter_class()
kwargs = dict(self.adapter_kwargs)
# The stage owns model_id/revision (Tier-1); pass through.
kwargs.setdefault("model_id", self.model_id) if self.model_id else None

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Confusing ternary-as-statement

Using a ternary expression as a statement with else None is a non-idiomatic pattern that can mislead readers into thinking the setdefault call's return value matters. A plain if-block expresses the intent more clearly and avoids accidental confusion with the ternary form's return value.

Suggested change
kwargs.setdefault("model_id", self.model_id) if self.model_id else None
if self.model_id:
kwargs.setdefault("model_id", self.model_id)

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Comment on lines +144 to +145
def outputs(self) -> tuple[list[str], list[str]]:
return ["data"], ["duration", self.segments_key, "split_filepaths", "split_metadata"]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 outputs() does not declare the stage's actual written keys

The stage writes self.text_key, self.alignment_key, and self.words_key during processing, but these keys are absent from outputs(). Only duration, segments_key, split_filepaths, and split_metadata are declared. If the framework or any downstream validator enforces the I/O contract, the undeclared keys would silently not appear in the contract graph, which could hide missing-dependency bugs in pipeline composition.

Comment on lines +186 to +194
def test_missing_split_filepaths_passes_through(self) -> None:
s = ForcedAlignmentStage(adapter_target=_ADAPTER_TARGET)
s.setup()
# split_filepaths key absent -> meta entry mode; adapter still called with 0 items.
# Stage handles by emitting empty results -- nothing to scatter.
task = AudioTask(data={"split_metadata": []})
out = s.process_batch([task])
# No exception, no text written (no splits -> nothing to do).
assert "text" not in out[0].data

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Misleading comment — adapter is never called in this path

The comment on line 188 says "adapter still called with 0 items", but tracing the code path for a missing split_filepaths key shows the opposite: data.get("split_filepaths") returns None, the guard if not split_filepaths: fires, logs a warning, and continues — no items are ever added to all_paths, so if not all_paths: return exits before the adapter is touched. The assertion itself is correct; only the inline comment is inaccurate.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

… split

Two pre-Kratos fixes that fell out of the b9f3c8b stage-adapter split:

- tests/stages/audio/tagging/e2e/test_tts_e2e.py: update stages[4]
  overrides to the new ForcedAlignmentStage shape. Tier-1 stage field
  is model_id (was model_name); is_fastconformer / decoder_type /
  transcribe_batch_size now live under adapter_kwargs.* per the SDP-V2
  Tier-1/Tier-2 split. The pre-split shape would break OmegaConf
  attribute assignment on the new dataclass.

- tutorials/audio/tagging/tts_pipeline.yaml: declare top-level
  data_config + output_dir as empty strings so Hydra struct-mode
  accepts the CLI overrides that NvLLMOps' non-native Curator runner
  (nvllmops/stages/harvest/curator/run_curator.py lines 234-238)
  injects unconditionally for every pipeline that isn't in
  _CURATOR_NATIVE_PIPELINES. Tagging is one such pipeline. Neither
  field is consumed by any stage; they exist purely to satisfy the
  Hydra struct-mode contract during pod invocation.

Both are zero-behavior changes for the audio pipeline itself; they
only un-wedge downstream callers (pytest e2e and NvLLMOps Kratos
wrapper). Mirrors PR1967's pattern of keeping post-refactor branches
runnable end-to-end without modifying NvLLMOps-side code.

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
…-diff run

Bakes the exact post-override pipeline shape used to generate the
in-tree reference manifest at
tests/fixtures/audio/tagging/reference/tts/test_data_reference.jsonl
into a tutorial-layout Hydra YAML, so the NvLLMOps non-native Curator
runner can load it via --local-config-path and reproduce the reference
output byte-for-byte (within check_output()'s tolerance: text exact +
segment boundaries pytest.approx(rel=1e-3) + word timings abs=0.01).

The reference was produced by tests/stages/audio/tagging/e2e/
test_tts_e2e.py, which loads tests/.../e2e/configs/tts_pipeline.yaml
and then applies four runtime overrides on Stage 4 (ForcedAlignmentStage):

  model_id                        = nvidia/stt_en_fastconformer_ctc_large
  adapter_kwargs.is_fastconformer = true
  adapter_kwargs.decoder_type     = ctc
  adapter_kwargs.transcribe_batch_size = 1

This new YAML bakes those four into the YAML itself; everything else
(Stage 2 PyAnnote small batches, Stage 4 batch_size=1 num_workers=1,
stage list + ordering) mirrors the e2e test config verbatim.

Adds top-level data_config: \"\" + output_dir: \"\" shim per the
NvLLMOps non-native Hydra override surface (same reason as fae6b48).

Kept separate from tts_pipeline.yaml because that file targets
production-shape tagging (parakeet-tdt_ctc-1.1b RNN-T at batch=32,
default PyAnnote batches), which would NOT match the reference text
or timings even with identical input audio. One config per intent:
- tts_pipeline.yaml: production tagging, no in-tree reference
- tts_pipeline_correctness.yaml: Kratos validation against reference

Three configs total for the tagging pipeline now:
- tutorials/audio/tagging/tts_pipeline.yaml (production tagging)
- tutorials/audio/tagging/tts_pipeline_correctness.yaml (Kratos diff vs ref)
- tests/stages/audio/tagging/e2e/configs/tts_pipeline.yaml (pytest e2e
  in-process; runtime-overrides Stage 4 to match the same shape)

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
… Kratos pre-pipeline splitter

Sibling of tests/fixtures/audio/tagging/sample_input.jsonl. The Kratos
non-native runner path calls NvLLMOps split_manifest_by_duration_field
which requires an actual_duration key on every manifest entry before
the Curator pipeline starts (KeyError otherwise). Pytest path is
unaffected — it ingests sample_input.jsonl directly and NeMo Curator
ignores unknown fields, so this file is Kratos-only fixture data.

Durations measured with soundfile.info:
  audio_1.opus: 60.000000 s (stereo, 48kHz, 2880000 frames)
  audio_2.opus: 67.073500 s (stereo, 48kHz, 3219528 frames)

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>

# ---- Required protocol fields ----
model_id: str = "pyannote/speaker-diarization-3.1"
revision: str | None = None

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove this param if not used anywhere in the code block.

@@ -0,0 +1,168 @@
# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a new file for tts tutorial. Can we have only one latest yaml ?

@@ -0,0 +1,2 @@
{"audio_filepath": "tests/fixtures/audio/tagging/audios/audio_1.opus", "audio_item_id": "audio_1", "actual_duration": 60.0}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's remove kratos from file name. Also, can't we reuse sample_input.jsonl directly ?

)
)
else:
segments.append(DiarSegment(start=start, end=end, speaker=speaker_id))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add adaptors for sortformer, silero vad ? or do you plan to address them in a separate PR ?

last_metrics: dict[str, float]

@classmethod
def prefetch_weights(cls, model_id: str, revision: str | None = None) -> None:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks same as setup_on_node(), then should we reuse the same method name to avoid confusions ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 I think using the setup_on_node name can help avoid confusion.

"""Release GPU memory and worker-local state."""
...

def detect_batch(self, items: list[dict[str, Any]]) -> list[VADResult]:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, can we name as process_batch()?

max_length: ${max_segment_length}
segmentation_batch_size: 2
embedding_batch_size: 2
adapter_target: nemo_curator.adapters.diarization.PyAnnoteDiarizationAdapter

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of writing the path to class name, can we use autoregistry and instantiate the coressponding class using the registered names ?

@sarahyurick sarahyurick left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the general proposal makes sense, thanks. I did a quick review for now and left some minor comments.

revision: str | None = None

# ---- PyAnnote-specific knobs ----
hf_token: str = ""

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit but can it be

Suggested change
hf_token: str = ""
hf_token: str | None = None

everywhere?

min_length: float = 0.5
max_length: float = 40.0
write_rttm: bool = True
random_seed: int | None = None

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why not to choose one?

Suggested change
random_seed: int | None = None
random_seed: int = 42

last_metrics: dict[str, float]

@classmethod
def prefetch_weights(cls, model_id: str, revision: str | None = None) -> None:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 I think using the setup_on_node name can help avoid confusion.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be an empty file.

name: str = "ForcedAlignment"

# ---- Tier 1: swap surface ----
adapter_target: str = ""

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove empty default so we don't need to check for it in the post init:

Suggested change
adapter_target: str = ""
adapter_target: str

name: str = "VAD"

# ---- Tier 1: swap surface ----
adapter_target: str = ""

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
adapter_target: str = ""
adapter_target: str

Same comment as above.

import numpy as np
import pytest

pytest.importorskip("nemo.collections.asr")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would like to remove this in favor of using # modality: audio at the top of the file.

)


class TestConstruction:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid the class TestXYZ pattern?


import pytest

pytest.importorskip("pyannote.audio")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above.

import numpy as np
import pytest

pytest.importorskip("whisperx")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants