Granary v2: Qwen-Omni in-process ASR (stage-adapter split + generic bucketed inference)#1967

Open
mohammadaaftabv wants to merge 41 commits into
NVIDIA-NeMo:mainfrom
mohammadaaftabv:aaftabv/granary-v2-qwen-omni-first-stage

Conversation

@mohammadaaftabv

@mohammadaaftabv mohammadaaftabv commented May 11, 2026

Contributor

Summary

First-pass Qwen3-Omni in-process ASR tutorial and stage stack for NeMo Curator:

  • ASRStage + ASRAdapter split (adapter_target one-line model swap)
  • BucketedInferenceStage / BatchPolicy — generic cost-bucketed GPU inference (canonical stages/inference/)
  • QwenOmniASRAdapter on shared VLLMBase; two-turn inference via _pack_vllm_inputs / _infer_turn (strict=True scatter)
  • NeMo tarred reader + sharded manifest writer + perf_summary.json
  • Perf identity: backend-specific stamping in backends/perf_identity.py (build_xenna_perf_identity / build_ray_perf_identity on WorkerMetadata at setup); per-GPU scheduling breakdown (per_gpu, gpu_ids, pipeline_throughput)
  • Security: tutorial stage_with.resources allowlists Resources only (no open-ended hydra.instantiate); discovery YAML structure validation

Test plan

  • Unit tests: batch policy, bucketed stage, ASR adapter/stage, perf summary identity/per-GPU breakdown, backend perf identity (no cross-backend fallbacks), YAML/resources guards
  • Multi-GPU end-to-end run: work-done parity, throughput-neutral perf on shared intersection fields, output equivalence on manifest_*.jsonl (keyed on audio_filepath)

@mohammadaaftabv mohammadaaftabv requested a review from a team as a code owner May 11, 2026 12:49
@mohammadaaftabv mohammadaaftabv requested review from suiyoubi and removed request for a team May 11, 2026 12:49
@copy-pr-bot

copy-pr-bot Bot commented May 11, 2026


This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@greptile-apps

greptile-apps Bot commented May 11, 2026

Contributor

Greptile Summary

This PR introduces Qwen3-Omni in-process ASR for the Granary v2 pipeline, adding an ASRStage/ASRAdapter split, a generic BucketedInferenceStage/BatchPolicy bucketed-inference layer, a QwenOmniASRAdapter on VLLMBase, NeMo tarred reader, sharded manifest writer, and per-backend perf-identity stamping. The vast majority of the issues flagged in the prior review round have been addressed in the current head.

  • ASRStage + adapter split: Stage owns Curator-side I/O, language resolution, chunking, and stitching; adapter owns model-side inference. The two-turn flow uses _infer_turn with strict=True scatter to surface engine-count mismatches loudly.
  • BucketedInferenceStage / BatchPolicy: Generic cost-bucketed GPU dispatch with a BucketQueueScheduler for persistent bucketing; bucketize_with_costs plans correctly sort heavier sub-batches first and reassemble results back to input order.
  • Checkpoint resume: Empty-shard .done markers are written by the reader, and the writer's teardown() is guarded to skip writing a zero-count perf_summary.json on a clean resume where no new tasks were processed.

Confidence Score: 5/5

Safe to merge; all blocking issues from prior review rounds have been addressed in the current head.

The current head resolves every previously-flagged correctness issue: optional imports are now guarded, revision is forwarded to both the vLLM engine and the processor, partial setup() failures trigger teardown() cleanly, strict=True is applied in _infer_turn, empty-shard .done markers are written by the reader, the writer teardown skips zero-count perf summary writes on resume, and YAML discovery is fully guarded against malformed structures. The only new findings are style-level inconsistencies (strict=False in two prepare-batch helpers versus the strict=True used everywhere else) and a maintainability note about build_items being shadowed by the process_batch override.

No files require blocking attention. nemo_curator/models/asr/qwen_omni.py has two minor strict=False zip calls worth tightening for consistency.

Important Files Changed

| Filename | Overview |
| --- | --- |
| nemo_curator/models/asr/qwen_omni.py | New Qwen3-Omni vLLM adapter with two-turn inference; optional imports are correctly guarded, revision is forwarded to both LLM and processor, and setup() wraps both _init_engine and processor load in a single try/except with teardown on failure. Minor: _prepare_batch/_prepare_turn2_batch use strict=False in zip, inconsistent with the strict=True philosophy used in _infer_turn. |
| nemo_curator/stages/audio/inference/asr/stage.py | ASRStage with pluggable adapter, prebucket chunk planning, and full stitch/assemble logic. process_batch override correctly handles prebucketed and plain paths. build_items is defined to satisfy the abstract method contract but is never called because process_batch is fully overridden, which creates a maintenance trap if contributors modify it expecting it to be exercised. |
| nemo_curator/stages/audio/inference/batch_policy.py | BucketQueueScheduler and BatchPolicy are well-implemented; flush semantics (overflow-first then enqueue, then ready-check) are correct; single over-cost items correctly get their own sub-batch; bucketize_with_costs sorts heavier batches first and run_bucketed correctly reassembles results by original index. |
| nemo_curator/stages/audio/io/nemo_tarred_reader.py | Addresses all previously flagged issues: extractfile None guard, KeyError on missing filepath_key (now uses .get()), YAML structure validation via _iter_discovery_groups, bad manifest path caught per-entry with warning+continue, and empty-shard .done marker written directly by the reader. |
| nemo_curator/stages/audio/io/sharded_manifest_writer.py | teardown() now guards perf_summary.json write behind (items_processed > 0 or total_utterances > 0) to skip zero-count overwrites on resume; setup_on_node preserves the final manifest when completed shard markers are already present; waveform/tensor keys are correctly dropped before JSON serialization. |
| nemo_curator/backends/perf_identity.py | New backend-specific perf identity stamping; GPU index resolution via ray.get_gpu_ids() with CUDA_VISIBLE_DEVICES fallback is sound. _ray_node_label still implicitly returns None when get_node_id() is falsy without raising (the except branch returns empty string but the non-exceptional falsy path falls through), which was flagged in a prior review thread. |
| nemo_curator/backends/ray_data/executor.py | _process_centralized_stage_dataset still calls dataset.take_all() twice (before and after processing), materializing the full audio dataset with waveform arrays into driver RAM, a pre-existing concern flagged in a prior review thread. The non-centralized path remains streaming via Ray Data transforms. |
| nemo_curator/stages/audio/inference/bucketed_stage.py | Clean abstract base that wires build_items -> run_bucketed(run_inference) -> assemble; the generic dispatch is correct and the type parameters are appropriately unbounded for re-use by non-ASR stages. |
| tutorials/audio/qwen_omni_inprocess/main.py | Addresses credential exposure via _safe_config_yaml/_redact_secret_values; Resources instantiation is allowlisted; _instantiate_configured_stages correctly pops stage_id/enabled/stage_with before passing raw config to hydra.utils.instantiate. |

Sequence Diagram

sequenceDiagram
    participant E as Executor (Ray/Xenna)
    participant Disc as NemoTarShardDiscoveryStage
    participant Reader as NemoTarShardReaderStage
    participant ASR as ASRStage
    participant Adapter as QwenOmniASRAdapter
    participant Writer as ShardedManifestWriterStage

    E->>Disc: process(_EmptyTask)
    Disc-->>E: [FileGroupTask(manifest, tar)] x N shards

    loop per shard (fan-out)
        E->>Reader: process(FileGroupTask)
        Reader-->>E: [AudioTask(waveform, sample_rate)] x M utterances
    end

    E->>E: build_scheduled_task_batch_plan(ASRStage, tasks)
    note over E: BatchPolicy.bucketize_with_costs() groups chunks by duration bucket

    loop per bucket batch
        E->>ASR: process_batch([AudioTask])
        ASR->>ASR: _build_chunk_specs / _chunk_waveform
        ASR->>Adapter: transcribe_batch(items)
        Adapter->>Adapter: _prepare_batch (ThreadPool)
        Adapter->>Adapter: "_infer_turn (strict=True scatter)"
        alt followup_prompt set
            Adapter->>Adapter: _prepare_turn2_batch
            Adapter->>Adapter: _infer_turn Turn 2
        end
        Adapter-->>ASR: [ASRResult]
        ASR->>ASR: _stitch / assemble
        ASR-->>E: [AudioTask(pred_text)]
    end

    E->>Writer: process_batch([AudioTask])
    Writer->>Writer: _write_shard_group (append .jsonl)
    Writer->>Writer: write .jsonl.done when shard complete
    Writer->>Writer: _write_perf_summary() on each completion

Reviews (30): Last reviewed commit: "Cap ASR model dispatch for real audio ch..."

Comment on lines +106 to +107
with open(out_path, "a", encoding="utf-8") as f:
    f.write(json.dumps(task.data, ensure_ascii=False) + "\n")

Contributor

P1 json.dumps will crash when keep_waveform=True

task.data can contain a numpy ndarray (keyed by waveform_key) when the upstream InferenceQwenOmniStage is configured with keep_waveform: true. json.dumps has no numpy serializer and raises TypeError: Object of type ndarray is not JSON serializable, crashing the entire shard write for all tasks in that batch. Either strip the waveform here before serialising, or document that keep_waveform must be false when this writer is used (and add a validation guard in setup or __post_init__).

Contributor Author

Thanks for flagging. This is already addressed on the current head (936fa17f):

  • _manifest_data first drops keys named in drop_manifest_keys (defaults to ("waveform",)) so the configured waveform_key never reaches serialisation, regardless of keep_waveform.
  • Anything else with .shape and .dtype (numpy ndarrays, torch tensors, etc.) is dropped via a duck-typing guard before json.dumps is called.
  • The remaining json.dumps call is wrapped in try/except TypeError, so a previously-unseen non-serialisable value raises a focused TypeError with the offending key instead of crashing the shard.

Citation: nemo_curator/stages/audio/io/sharded_manifest_writer.py:96-111. Resolving as already-fixed.
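The three guards described in this reply can be sketched roughly as follows. This is an illustration under stated assumptions, not the code at the cited lines: `manifest_data_sketch`, `to_manifest_line`, and `FakeArray` are hypothetical names, and only the `("waveform",)` default and the `.shape`/`.dtype` duck-typing rule are taken from the reply.

```python
import json
from typing import Any


def manifest_data_sketch(data: dict[str, Any], drop_keys: tuple[str, ...] = ("waveform",)) -> dict[str, Any]:
    """Return a copy of `data` without configured keys or array-like values."""
    cleaned: dict[str, Any] = {}
    for key, value in data.items():
        if key in drop_keys:
            continue
        # Duck-typing guard: anything exposing .shape and .dtype is treated as an array/tensor.
        if hasattr(value, "shape") and hasattr(value, "dtype"):
            continue
        cleaned[key] = value
    return cleaned


def to_manifest_line(data: dict[str, Any], drop_keys: tuple[str, ...] = ("waveform",)) -> str:
    """Serialise one manifest row, naming the offending key if serialisation fails."""
    cleaned = manifest_data_sketch(data, drop_keys)
    try:
        return json.dumps(cleaned, ensure_ascii=False)
    except TypeError as exc:
        # Re-check each value so the error message points at the offending key.
        for key, value in cleaned.items():
            try:
                json.dumps(value)
            except TypeError:
                msg = f"manifest key {key!r} is not JSON serialisable: {type(value).__name__}"
                raise TypeError(msg) from exc
        raise


class FakeArray:
    """Stand-in for an ndarray/tensor so the sketch has no third-party imports."""

    shape = (16000,)
    dtype = "float32"


line = to_manifest_line(
    {"audio_filepath": "a.wav", "waveform": [0.1, 0.2], "features": FakeArray(), "pred_text": "héllo"}
)
```

Dropping by attribute rather than by `isinstance` keeps the writer free of numpy and torch imports, at the cost of also dropping any unrelated object that happens to expose both attributes.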

Comment on lines +83 to +88
cfg = value if OmegaConf.is_config(value) else OmegaConf.create(value)
if "_target_" in cfg:
    return hydra.utils.instantiate(cfg)
raw = OmegaConf.to_container(cfg, resolve=True)
return Resources(**raw)
msg = f"Invalid resources override: {value!r}"

Contributor

P1 security Credentials exposed in startup log

logger.info(f"Hydra config:\n{OmegaConf.to_yaml(cfg)}") prints the full resolved config, including any hf_token passed as a Hydra override. The credential ends up in every log sink (stdout, files, observability stacks) in plaintext. Consider redacting the hf_token field before logging — for example, by building a sanitised copy of the config dict — or logging only a subset of non-sensitive keys.

Contributor Author

Thanks for flagging. This was addressed in commit 936fa17f:

  • The startup log call no longer uses raw OmegaConf.to_yaml(cfg). It now uses _safe_config_yaml(cfg) (tutorial main.py:346), which builds a redacted copy of the config before rendering to YAML.
  • _redact_secret_values walks the config recursively (main.py:170-179) and replaces values of any key matching _SECRET_KEY_NAMES (which explicitly includes hf_token, password, secret_key, token, credentials, …) or _SECRET_KEY_PARTS substrings with <redacted>.
  • Trailing-suffix matching also catches any custom secret named *_token, *_secret, or *_password.

Note: the regression test that covered this behaviour (tests/stages/audio/inference/test_qwen_omni_tutorial.py::test_safe_config_yaml_redacts_hf_token_but_keeps_token_counts) was removed in this revision per @sarahyurick's "we shouldn't need pytests for tutorials" comment. The helper code itself is unchanged.

Citations: tutorials/audio/qwen_omni_inprocess/main.py:60-72, :160-184, :346. Resolving as already-fixed.
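A recursive redaction pass of the kind described in this reply might look like the sketch below. It operates on plain dicts and lists (for example, the result of converting the config to a container) and uses hypothetical names and key lists; the actual contents of `_SECRET_KEY_NAMES` and `_SECRET_KEY_PARTS` in the tutorial are not reproduced here.

```python
from typing import Any

# Assumed key lists for illustration; the tutorial's real lists may differ.
SECRET_KEY_NAMES = frozenset({"hf_token", "password", "secret_key", "token", "credentials"})
SECRET_KEY_SUFFIXES = ("_token", "_secret", "_password")
REDACTED = "<redacted>"


def is_secret_key(key: str) -> bool:
    """Match exact secret names plus trailing suffixes such as *_token."""
    lowered = key.lower()
    return lowered in SECRET_KEY_NAMES or lowered.endswith(SECRET_KEY_SUFFIXES)


def redact_secret_values(node: Any) -> Any:
    """Return a redacted copy of a nested dict/list structure; the input is not mutated."""
    if isinstance(node, dict):
        return {
            key: REDACTED if isinstance(key, str) and is_secret_key(key) else redact_secret_values(value)
            for key, value in node.items()
        }
    if isinstance(node, list):
        return [redact_secret_values(item) for item in node]
    return node


config = {
    "hf_token": "hf_abc123",
    "model": {"max_tokens": 256, "api_token": "xyz"},
    "stages": [{"name": "asr", "password": "hunter2"}],
}
safe_config = redact_secret_values(config)
```

Suffix matching is deliberately anchored at the end of the key, so a counter such as `max_tokens` is left readable while `api_token` is hidden.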

completed.add(shard_key)
return completed

@staticmethod

Contributor

P2 Unchecked None return from extractfile

tarfile.TarFile.extractfile returns None for members that are not regular files (hard links, directory entries embedded in some tar formats). The preceding tar_info.isfile() guard does not cover all cases where extractfile may return None — calling .read() on a None result would raise AttributeError. Add a None check before .read().
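A minimal sketch of the suggested guard, using only the standard-library `tarfile` module. The helper names and the in-memory archive are made up for the demonstration, and a directory entry is used as the non-regular member because `extractfile` is known to return None for it.

```python
import io
import tarfile


def read_regular_members(tar: tarfile.TarFile) -> dict[str, bytes]:
    """Read every member that yields a file object, skipping those that do not."""
    payloads: dict[str, bytes] = {}
    for member in tar:
        handle = tar.extractfile(member)
        if handle is None:
            # Not readable as a regular file (for example a directory entry): skip it.
            continue
        with handle:
            payloads[member.name] = handle.read()
    return payloads


def build_demo_tar() -> bytes:
    """Build a tiny in-memory archive with one directory and one regular file."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w") as tar:
        directory = tarfile.TarInfo("clips")
        directory.type = tarfile.DIRTYPE
        tar.addfile(directory)

        audio = b"RIFF-fake-audio"
        info = tarfile.TarInfo("clips/a.wav")
        info.size = len(audio)
        tar.addfile(info, io.BytesIO(audio))
    return buffer.getvalue()


with tarfile.open(fileobj=io.BytesIO(build_demo_tar())) as demo_tar:
    demo_payloads = read_regular_members(demo_tar)
```

Checking the return value directly is cheaper than enumerating member types up front, and it stays correct if a tar writer emits an entry type the reader did not anticipate.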

Comment thread nemo_curator/models/qwen_omni.py Outdated

def _get_prompt_text(self, language: str | None) -> str:
    """Return the EN-specific prompt for English, otherwise the default prompt."""
    if language and language == "English" and self.en_prompt_text:

Contributor

P2 The leading `language and` is redundant: if `language == "English"` evaluates to True, the string is already truthy, so the extra truthiness check is dead code and can mislead readers into thinking an empty-string case needs guarding here.

Suggested change
- if language and language == "English" and self.en_prompt_text:
+ if language == "English" and self.en_prompt_text:

Comment thread nemo_curator/adapters/asr/qwen_omni.py Outdated
@mohammadaaftabv mohammadaaftabv requested a review from a team as a code owner May 11, 2026 16:56

@sarahyurick sarahyurick left a comment

Contributor

I did an initial pass to start familiarizing myself with the PR for now. Left some minor comments.

Comment thread nemo_curator/models/qwen_omni.py Outdated
model_id: str = _QWEN3_OMNI_MODEL_ID,
prompt_text: str = "Transcribe the audio.",
en_prompt_text: str | None = None,
followup_prompt: str = "Now listen to the audio again and add any false starts, filler words and preserve colloquial words (like lemme, gonna, wanna, etc) as is spoken in the audio.",

Contributor

Instead of having the long string here, let's create a script variable called _FOLLOWUP_PROMPT or similar.

Comment thread nemo_curator/models/qwen_omni.py Outdated
prefix_caching_hash_algo="xxhash",
)

from transformers import Qwen3OmniMoeProcessor

Contributor

Top-level import?

Comment thread nemo_curator/models/qwen_omni.py Outdated
self._sampling_params = None
gc.collect()
try:
    import torch

Contributor

Top-level import?

Comment thread nemo_curator/models/qwen_omni.py Outdated
def _prepare_turn2_single(
    self, waveform_16k: np.ndarray, pred_text: str, language: str | None = None,
) -> dict[str, Any] | None:
    from qwen_omni_utils import process_mm_info

Contributor

Top-level import?

Comment thread nemo_curator/stages/audio/inference/__init__.py Outdated
"""

name: str = "sharded_manifest_writer"
output_dir: str = ""

Contributor

Suggested change
- output_dir: str = ""
+ output_dir: str

instead of checking for it in the post init.

from nemo_curator.stages.audio.inference.qwen_omni import InferenceQwenOmniStage
from nemo_curator.stages.audio.io.nemo_tarred_reader import NemoTarredAudioReader, NemoTarShardReaderStage
from nemo_curator.stages.audio.io.sharded_manifest_writer import ShardedManifestWriterStage
from tutorials.audio.qwen_omni_inprocess.main import (

Contributor

We shouldn't need pytests for tutorials.

and prefetches common HuggingFace model attributes without hardcoding a
full Granary v2 post-processing graph in this entry point.
"""
from huggingface_hub import hf_hub_download, snapshot_download

Contributor

Top-level import?

Comment thread tutorials/audio/qwen_omni_inprocess/qwen_omni_inprocess.yaml
Comment on lines +35 to +39
If you do not have `uv`, use pip:

```bash
pip install -e ".[audio_cuda12]"
```

Contributor

We should only encourage uv and not pip.

Comment thread nemo_curator/adapters/asr/qwen_omni.py Outdated
Comment on lines +24 to +25
from qwen_omni_utils import process_mm_info
from transformers import Qwen3OmniMoeProcessor

Contributor

P1 Unconditional top-level imports break non-audio_cuda12 installs

qwen_omni_utils is only shipped with the audio_cuda12 optional extra, but from qwen_omni_utils import process_mm_info and from transformers import Qwen3OmniMoeProcessor are both imported at module level, outside any guard. On a standard Curator installation (including the Mac/ARM case the PR description explicitly claims will work), import nemo_curator.models.qwen_omni fails immediately with ImportError: No module named 'qwen_omni_utils'. The vllm import immediately below correctly uses try/except ImportError + VLLM_AVAILABLE; these two imports need the same treatment — either fold them into that same try block, or defer them into setup() where VLLM_AVAILABLE is already checked.
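The guard pattern being asked for can be sketched as below. The module name `hypothetical_missing_audio_extra`, the `EXTRA_AVAILABLE` flag, and `AdapterSketch` are placeholders chosen so the example runs anywhere; they are not names from the PR, which uses `VLLM_AVAILABLE` for the vllm import.

```python
try:
    # Stand-in for an import that only succeeds when an optional extra is installed.
    from hypothetical_missing_audio_extra import process_mm_info

    EXTRA_AVAILABLE = True
except ImportError:
    process_mm_info = None
    EXTRA_AVAILABLE = False


class AdapterSketch:
    """Importing this module always succeeds; the missing extra is reported at setup()."""

    def setup(self) -> None:
        if not EXTRA_AVAILABLE:
            msg = "Optional audio dependencies are not installed; install the audio extra to use this adapter."
            raise RuntimeError(msg)


adapter = AdapterSketch()  # constructing the adapter does not need the extra
try:
    adapter.setup()
    setup_error = None
except RuntimeError as exc:
    setup_error = str(exc)
```

Deferring the failure to `setup()` keeps `import` of the package working on installs without the extra, while still giving a clear message at the point where the dependency is actually needed.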

"""

name: str = "nemo_tar_shard_discovery"
yaml_path: str = ""

Contributor

Suggested change
- yaml_path: str = ""
+ yaml_path: str

The default can be omitted, making this a required field, instead of checking for an empty string in the post init.

"""

name: str = "nemo_tarred_audio_reader"
yaml_path: str = ""

Contributor

Suggested change
yaml_path: str = ""
yaml_path: str

Same comment as above.
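The effect of these two suggestions can be seen in a small standalone dataclass. `DiscoveryStageSketch` is a hypothetical stand-in, not the PR's stage class; note that the required field has to be declared before any field with a default.

```python
from dataclasses import dataclass


@dataclass
class DiscoveryStageSketch:
    # Required: no default, so the generated __init__ rejects a missing argument.
    yaml_path: str
    # Defaulted fields must come after required ones in a dataclass.
    name: str = "nemo_tar_shard_discovery"


try:
    DiscoveryStageSketch()  # yaml_path omitted
    error_type = None
except TypeError as exc:
    error_type = type(exc).__name__

stage = DiscoveryStageSketch(yaml_path="shards.yaml")
```

One trade-off worth keeping in mind: a required field catches an omitted argument, but an explicit `yaml_path=""` still constructs successfully, so the empty-string case is no longer rejected unless it is validated elsewhere.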

Comment thread pyproject.toml
Comment thread pyproject.toml
Comment thread pyproject.toml Outdated
Comment thread pyproject.toml Outdated
Comment thread pyproject.toml Outdated
Comment thread pyproject.toml Outdated
Comment thread pyproject.toml Outdated
mohammadaaftabv added a commit to mohammadaaftabv/Curator that referenced this pull request May 26, 2026
NemoTarShardDiscoveryStage and NemoTarredAudioReader previously declared
yaml_path as `yaml_path: str = ""` and validated it via a manual empty-
string check in __post_init__. Per reviewer feedback, this is cleaner as
a required dataclass field: __init__ enforces the requirement directly,
removing the sentinel and the manual check.

Reordered yaml_path before defaulted fields in both dataclasses (Python
requires non-default fields first). NemoTarShardDiscoveryStage's
__post_init__ existed only for the empty-string check and is removed;
NemoTarredAudioReader's __post_init__ keeps super().__init__() and
self._stages wiring, only the check is removed.

All three construction sites (the test, hydra _target_ resolution, and
NemoTarredAudioReader's own __post_init__) already use keyword arguments,
so the required-field reorder is transparent. The observable difference
for a caller that forgets yaml_path is now TypeError from __init__
(before any object exists) instead of ValueError from __post_init__.

Addresses:
- NVIDIA-NeMo#1967 (comment)
- NVIDIA-NeMo#1967 (comment)

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
mohammadaaftabv added a commit to mohammadaaftabv/Curator that referenced this pull request May 26, 2026
…1.0 pins

The audio_cuda12 PR previously added three [tool.uv] override-dependencies
entries (huggingface-hub==0.36.0, transformers==4.57.6, accelerate==1.12.0)
described as overriding qwen-asr's incompatible declared pins. Audit of the
resolved graph showed that rationale was inaccurate:

- qwen-asr 0.0.6 declares transformers==4.57.6 / accelerate==1.12.0 natively
  in its requires-dist, so no override is needed for those.
- The remaining transformers/accelerate constraints across the graph
  (nemo-toolkit ~=4.57.0, vllm >=4.56.0,<5, pyannote-audio >=4.48.3,
  whisperx >=4.48.0) all naturally permit 4.57.6 / 1.12.0.
- The hf-hub conflict (data-designer-engine >=1.0.1 vs whisperx <1.0.0)
  was already resolved on main by hf-hub>=0.34,<1.0; this PR's tightening
  to ==0.36.0 was redundant.

All three entries removed; the pre-existing hf-hub override restored to
its original range. Verified via `uv lock` that the resolved versions of
transformers, accelerate, huggingface-hub, qwen-asr, nagisa, soynlp, vllm,
torch, torchaudio, torchvision, torchcodec, nixl-cu12, xgrammar, fasttext,
data-designer-engine, pyannote-audio, whisperx, nemo-toolkit, qwen-omni-utils,
fsspec, numpy, protobuf, setuptools are byte-identical before and after
(591 packages resolved). This PR now adds zero new override-dependencies
entries, eliminating the cross-modality blast radius that was flagged.

Also softened the remaining hard `==` pins inside audio_common /
audio_cuda12:

- nagisa ==0.2.11 -> >=0.2.11,<0.3
  (8-year-old stable lib; qwen-asr still transitively pins 0.2.11)
- qwen-asr ==0.0.6 -> >=0.0.6,<0.1
  (only 0.0.6 exists today, but a future security-patch 0.0.7 auto-flows
  in if/when published; <0.1 caps speculative jumps)
- fasttext ==0.9.3 -> >=0.9.3,<0.10
  (10-year-old maintenance-mode lib; releases every 2-4 years)

The 7-line preamble comment above qwen-asr was trimmed to one line in
the same edit (reviewer asked for less verbose).

uv.lock is regenerated; all resolved package versions remain identical
to the pre-edit state.

Addresses:
- NVIDIA-NeMo#1967 (comment)
- NVIDIA-NeMo#1967 (comment)
- NVIDIA-NeMo#1967 (comment)
- NVIDIA-NeMo#1967 (comment)
- NVIDIA-NeMo#1967 (comment)

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
Comment thread tutorials/audio/qwen_omni_inprocess/qwen_omni_inprocess.yaml Outdated
mohammadaaftabv added a commit to mohammadaaftabv/Curator that referenced this pull request May 27, 2026
NemoTarShardDiscoveryStage and NemoTarredAudioReader previously declared
yaml_path as `yaml_path: str = ""` and validated it via a manual empty-
string check in __post_init__. Per reviewer feedback, this is cleaner as
a required dataclass field: __init__ enforces the requirement directly,
removing the sentinel and the manual check.

Reordered yaml_path before defaulted fields in both dataclasses (Python
requires non-default fields first). NemoTarShardDiscoveryStage's
__post_init__ existed only for the empty-string check and is removed;
NemoTarredAudioReader's __post_init__ keeps super().__init__() and
self._stages wiring, only the check is removed.

All three construction sites (the test, hydra _target_ resolution, and
NemoTarredAudioReader's own __post_init__) already use keyword arguments,
so the required-field reorder is transparent. The observable difference
for a caller that forgets yaml_path is now TypeError from __init__
(before any object exists) instead of ValueError from __post_init__.

Addresses:
- NVIDIA-NeMo#1967 (comment)
- NVIDIA-NeMo#1967 (comment)

Signed-off-by: Aaftab V <aaftabv@nvidia.com>
mohammadaaftabv added a commit to mohammadaaftabv/Curator that referenced this pull request May 27, 2026
…scaling

Per Praateek's review feedback on tutorials/audio/qwen_omni_inprocess/
qwen_omni_inprocess.yaml (NVIDIA-NeMo#1967), controlled-throughput benchmarking is
not a tutorial concern. Remove the explicit worker-count overrides
(reader_num_workers, reader_num_workers_per_node, omni_num_workers)
from the tutorial config and the NemoTarredAudioReader /
NemoTarShardReaderStage / InferenceQwenOmniStage classes. The
ProcessingStage base class defaults (num_workers() -> None,
xenna_stage_spec() -> {}) let Xenna's autoscaler size each stage from
the cluster resources, which is the documented happy-path behaviour
of the audio pipelines.

Changes:
- tutorials/audio/qwen_omni_inprocess/qwen_omni_inprocess.yaml:
  remove the MULTI-NODE SCALING block (reader_num_workers,
  reader_num_workers_per_node, omni_num_workers) and the three
  Hydra interpolations that fed them into the reader and qwen-omni
  stages.
- nemo_curator/stages/audio/io/nemo_tarred_reader.py: drop
  reader_num_workers / reader_num_workers_per_node fields on the
  composite NemoTarredAudioReader; drop num_workers_override /
  num_workers_per_node fields, their __post_init__ validation, the
  num_workers() override, and the xenna_stage_spec() override on
  NemoTarShardReaderStage.
- nemo_curator/stages/audio/inference/qwen_omni.py: drop the
  num_workers_override field plus the num_workers() and
  xenna_stage_spec() overrides on InferenceQwenOmniStage. Remove
  the now-unused `Any` import.
- tutorials/audio/qwen_omni_inprocess/README.md: drop the
  reader_num_workers / reader_num_workers_per_node / omni_num_workers
  rows from the configuration-knob table.
- tests/stages/audio/inference/test_qwen_omni.py: remove
  test_worker_override_specs (no longer applicable).

Addresses:
- NVIDIA-NeMo#1967 (comment)

Signed-off-by: Aaftab V <aaftabv@nvidia.com>
@mohammadaaftabv mohammadaaftabv force-pushed the aaftabv/granary-v2-qwen-omni-first-stage branch from 05264b0 to 2c42888 Compare May 27, 2026 07:03
Signed-off-by: Aaftab V <aaftabv@nvidia.com>
Signed-off-by: Aaftab V <aaftabv@nvidia.com>
Signed-off-by: Aaftab V <aaftabv@nvidia.com>
Signed-off-by: Aaftab V <aaftabv@nvidia.com>
Signed-off-by: Aaftab V <aaftabv@nvidia.com>
- hoist followup_prompt default into _FOLLOWUP_PROMPT module constant
- move 6 lazy imports to module scope (torch, transformers.Qwen3OmniMoeProcessor,
  qwen_omni_utils.process_mm_info, huggingface_hub.{snapshot_download,
  hf_hub_download}, yaml); keep existing vllm try/except guard
- drop redundant `language and` short-circuit in _get_prompt_text
- guard tar.extractfile(...) against None before .read() in
  NemoTarShardReaderStage; add hard-link regression test
- make ShardedManifestWriterStage.output_dir a required field; drop empty-
  string post_init check
- add Apache/NVIDIA copyright headers to inference/__init__.py and
  qwen_omni_inprocess.yaml
- drop pip-fallback install block from tutorial README (uv-only)
- remove tests/stages/audio/inference/test_qwen_omni_tutorial.py per
  "no pytests for tutorials"
- retarget @patch decorators in test_qwen_omni.py to the use-site
  (nemo_curator.stages.audio.inference.qwen_omni.snapshot_download) so
  the patches still bind after the import hoist

Signed-off-by: Aaftab V <aaftabv@nvidia.com>
Add qwen-asr and its lazy-imported runtime companions to Curator's audio
extras so that the harvest.curator Docker image gets the full qwen-asr stack
via Curator's uv sync rather than via post-uv pip installs in NvLLMOps. This
honors the Algorithmic vs Data-Mover Dep Ownership Rule: algorithmic libraries
belong in Curator, NvLLMOps owns only data-mover clients.

audio_common gains the qwen-asr forced-aligner text-norm and audio-feature
companions that qwen-asr 0.0.6's qwen3_forced_aligner.py imports lazily:
nagisa==0.2.11, soynlp==0.0.493, pyarabic, opencc-python-reimplemented, and
nnAudio.

audio_cuda12 gains qwen-asr==0.0.6 itself for the Granary v2 Qwen-ASR
recovery stage, and fasttext==0.9.3 for the Granary v2 LID stage. fasttext
already lives in text_cpu but audio_cuda12 does not pull text_cpu, so the
declaration is duplicated here.

[tool.uv] override-dependencies replaces the broad huggingface-hub>=0.34,<1.0
override with three exact pins proven against qwen-asr by NvLLMOps commit
68f18e9b: transformers==4.57.6, accelerate==1.12.0, and
huggingface-hub==0.36.0. These force-override qwen-asr's declared
(incompatible) version pins so the resolver picks the proven-compatible
versions for the entire graph.

This change extends PR1967's scope from "first-stage Qwen-Omni inference
only" to also cover Granary v2 algorithmic-dep self-containment, so that
later Granary v2 PRs (Qwen-ASR recovery, text filtering, PnC, ITN, SED) can
rely on Curator's audio_cuda12 extra without further pip-after-uv overrides
in NvLLMOps.

Lock churn: +18 packages including qwen-asr 0.0.6, nagisa, soynlp, pyarabic,
opencc-python-reimplemented, nnaudio, and qwen-asr's gradio/flask transitive
demo deps. transformers/huggingface-hub/accelerate stayed at the
override-pinned versions, so no version drift for the qwen-omni stack.

Signed-off-by: Aaftab V <aaftabv@nvidia.com>
NemoTarShardDiscoveryStage and NemoTarredAudioReader previously declared
yaml_path as `yaml_path: str = ""` and validated it via a manual empty-
string check in __post_init__. Per reviewer feedback, this is cleaner as
a required dataclass field: __init__ enforces the requirement directly,
removing the sentinel and the manual check.

Reordered yaml_path before defaulted fields in both dataclasses (Python
requires non-default fields first). NemoTarShardDiscoveryStage's
__post_init__ existed only for the empty-string check and is removed;
NemoTarredAudioReader's __post_init__ keeps super().__init__() and
self._stages wiring, only the check is removed.

All three construction sites (the test, hydra _target_ resolution, and
NemoTarredAudioReader's own __post_init__) already use keyword arguments,
so the required-field reorder is transparent. The observable difference
for a caller that forgets yaml_path is now TypeError from __init__
(before any object exists) instead of ValueError from __post_init__.
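
The behavioural change described above can be illustrated with a minimal sketch. The class name `DiscoveryStageSketch` and the `shard_glob` field are hypothetical stand-ins, not the real NemoTarShardDiscoveryStage definition; only the `yaml_path` field name comes from this commit message.

```python
from dataclasses import dataclass


@dataclass
class DiscoveryStageSketch:
    # Required field: it has no default, so it must precede defaulted fields.
    yaml_path: str
    # Hypothetical defaulted field, present only to show the ordering rule.
    shard_glob: str = "*.tar"


def construct_without_yaml_path() -> str:
    """Return the name of the exception raised when yaml_path is omitted."""
    try:
        DiscoveryStageSketch()  # type: ignore[call-arg]
    except TypeError as exc:
        # The generated __init__ rejects the call before any object exists.
        return type(exc).__name__
    return "no error"


stage = DiscoveryStageSketch(yaml_path="shards.yaml")
print(stage.yaml_path, construct_without_yaml_path())
```

Keyword-argument callers are unaffected by the reorder; only a caller that omits `yaml_path` sees a different exception type.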

Addresses:
- NVIDIA-NeMo#1967 (comment)
- NVIDIA-NeMo#1967 (comment)

Signed-off-by: Aaftab V <aaftabv@nvidia.com>
…1.0 pins

The audio_cuda12 PR previously added three [tool.uv] override-dependencies
entries (huggingface-hub==0.36.0, transformers==4.57.6, accelerate==1.12.0)
described as overriding qwen-asr's incompatible declared pins. Audit of the
resolved graph showed that rationale was inaccurate:

- qwen-asr 0.0.6 declares transformers==4.57.6 / accelerate==1.12.0 natively
  in its requires-dist, so no override is needed for those.
- The remaining transformers/accelerate constraints across the graph
  (nemo-toolkit ~=4.57.0, vllm >=4.56.0,<5, pyannote-audio >=4.48.3,
  whisperx >=4.48.0) all naturally permit 4.57.6 / 1.12.0.
- The hf-hub conflict (data-designer-engine >=1.0.1 vs whisperx <1.0.0)
  was already resolved on main by hf-hub>=0.34,<1.0; this PR's tightening
  to ==0.36.0 was redundant.

All three entries removed; the pre-existing hf-hub override restored to
its original range. Verified via `uv lock` that the resolved versions of
transformers, accelerate, huggingface-hub, qwen-asr, nagisa, soynlp, vllm,
torch, torchaudio, torchvision, torchcodec, nixl-cu12, xgrammar, fasttext,
data-designer-engine, pyannote-audio, whisperx, nemo-toolkit, qwen-omni-utils,
fsspec, numpy, protobuf, setuptools are byte-identical before and after
(591 packages resolved). This PR now adds zero new override-dependencies
entries, eliminating the cross-modality blast radius that was flagged.

Also softened the remaining hard `==` pins inside audio_common /
audio_cuda12:

- nagisa ==0.2.11 -> >=0.2.11,<0.3
  (8-year-old stable lib; qwen-asr still transitively pins 0.2.11)
- qwen-asr ==0.0.6 -> >=0.0.6,<0.1
  (only 0.0.6 exists today, but a future security-patch 0.0.7 auto-flows
  in if/when published; <0.1 caps speculative jumps)
- fasttext ==0.9.3 -> >=0.9.3,<0.10
  (10-year-old maintenance-mode lib; releases every 2-4 years)

The 7-line preamble comment above qwen-asr was trimmed to one line in
the same edit (the reviewer asked for less verbosity).

uv.lock is regenerated; all resolved package versions remain identical
to the pre-edit state.

Addresses:
- NVIDIA-NeMo#1967 (comment)
- NVIDIA-NeMo#1967 (comment)
- NVIDIA-NeMo#1967 (comment)
- NVIDIA-NeMo#1967 (comment)
- NVIDIA-NeMo#1967 (comment)

Signed-off-by: Aaftab V <aaftabv@nvidia.com>
…scaling

Per Praateek's review feedback on tutorials/audio/qwen_omni_inprocess/
qwen_omni_inprocess.yaml (NVIDIA-NeMo#1967), controlled-throughput benchmarking is
not a tutorial concern. Remove the explicit worker-count overrides
(reader_num_workers, reader_num_workers_per_node, omni_num_workers)
from the tutorial config and the NemoTarredAudioReader /
NemoTarShardReaderStage / InferenceQwenOmniStage classes. The
ProcessingStage base class defaults (num_workers() -> None,
xenna_stage_spec() -> {}) let Xenna's autoscaler size each stage from
the cluster resources, which is the documented happy-path behaviour
of the audio pipelines.

Changes:
- tutorials/audio/qwen_omni_inprocess/qwen_omni_inprocess.yaml:
  remove the MULTI-NODE SCALING block (reader_num_workers,
  reader_num_workers_per_node, omni_num_workers) and the three
  Hydra interpolations that fed them into the reader and qwen-omni
  stages.
- nemo_curator/stages/audio/io/nemo_tarred_reader.py: drop
  reader_num_workers / reader_num_workers_per_node fields on the
  composite NemoTarredAudioReader; drop num_workers_override /
  num_workers_per_node fields, their __post_init__ validation, the
  num_workers() override, and the xenna_stage_spec() override on
  NemoTarShardReaderStage.
- nemo_curator/stages/audio/inference/qwen_omni.py: drop the
  num_workers_override field plus the num_workers() and
  xenna_stage_spec() overrides on InferenceQwenOmniStage. Remove
  the now-unused `Any` import.
- tutorials/audio/qwen_omni_inprocess/README.md: drop the
  reader_num_workers / reader_num_workers_per_node / omni_num_workers
  rows from the configuration-knob table.
- tests/stages/audio/inference/test_qwen_omni.py: remove
  test_worker_override_specs (no longer applicable).

Addresses:
- NVIDIA-NeMo#1967 (comment)

Signed-off-by: Aaftab V <aaftabv@nvidia.com>
@mohammadaaftabv mohammadaaftabv force-pushed the aaftabv/granary-v2-qwen-omni-first-stage branch from 2c42888 to 1bc03a7 Compare May 27, 2026 07:08
Explain that BaseStageAdapter meters every CPU and GPU stage onto
task._stage_perf and that ShardedManifestWriterStage (num_workers=1) serializes
{shard}_perf.jsonl and perf_summary.json. Also document the write_perf_stats
toggle, the _log_metrics author guide, and how the published JSON differs
between CPU and GPU stages.

Signed-off-by: Aaftab V <aaftabv@nvidia.com>
Comment on lines +210 to +258
    def setup(self) -> None:
        if self._llm is not None:
            return
        _require_audio_cuda12_stack(context="setup()")

        tp_size = self.tensor_parallel_size or get_gpu_count()
        logger.info(
            f"Loading QwenOmni model={self.model_id} tp={tp_size} "
            f"max_model_len={self.max_model_len} max_num_seqs={self.max_num_seqs}"
            + (f" revision={self.revision}" if self.revision is not None else "")
        )

        model_kwargs: dict[str, Any] = {
            "model": self.model_id,
            "trust_remote_code": True,
            "gpu_memory_utilization": self.gpu_memory_utilization,
            "tensor_parallel_size": tp_size,
            "limit_mm_per_prompt": {"image": 1, "video": 1, "audio": int(self.limit_mm_per_prompt_audio)},
            "max_num_seqs": self.max_num_seqs,
            "max_model_len": self.max_model_len,
            "seed": int(self.seed),
            "enable_prefix_caching": bool(self.enable_prefix_caching),
            "prefix_caching_hash_algo": str(self.prefix_caching_hash_algo),
        }
        if self.revision is not None:
            model_kwargs["revision"] = self.revision

        sampling_kwargs: dict[str, Any] = {
            "temperature": self.temperature,
            "top_k": self.top_k,
            "max_tokens": self.max_output_tokens,
        }

        self._init_engine(model_kwargs, sampling_kwargs)

        proc_kwargs: dict[str, Any] = {}
        if self.revision is not None:
            proc_kwargs["revision"] = self.revision
        self._processor = Qwen3OmniMoeProcessor.from_pretrained(self.model_id, **proc_kwargs)

        self._prep_pool = ThreadPoolExecutor(max_workers=self.prep_workers)

    def teardown(self) -> None:
        if self._prep_pool is not None:
            self._prep_pool.shutdown(wait=False)
            self._prep_pool = None
        self._processor = None
        self._cleanup_gpu()


P1 Partial initialisation leaves adapter silently broken on processor load failure

If _init_engine (line 243) succeeds but Qwen3OmniMoeProcessor.from_pretrained (line 248) raises (e.g., network error, wrong revision, disk full), self._llm is set to a live engine but self._processor stays None. Any later call to transcribe_batch → _prepare_single → _pack_vllm_inputs reaches self._processor.apply_chat_template(...), which raises AttributeError. Because _prepare_single wraps the entire body in except Exception, the AttributeError is silently caught and every waveform returns None. The batch is returned entirely as skipped with "empty_audio"; no error is surfaced, and because self._llm is not None on the next setup() call, the guard at line 211 short-circuits retry, making the broken state permanent until teardown() is called.

Resetting self._llm on processor-load failure (e.g., via self._cleanup_gpu() in a try/except around the from_pretrained call) would allow setup() to retry and surface the real error.
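
A minimal sketch of that retry-safe pattern, assuming stand-in loaders rather than the real vLLM engine and Qwen3OmniMoeProcessor. `AdapterSketch`, `load_engine`, `load_processor`, and `make_flaky_loader` are hypothetical names introduced only for illustration:

```python
from typing import Any, Callable, Optional


class AdapterSketch:
    """Hypothetical stand-in for the adapter; not the real QwenOmniASRAdapter."""

    def __init__(self, load_engine: Callable[[], Any], load_processor: Callable[[], Any]) -> None:
        self._load_engine = load_engine
        self._load_processor = load_processor
        self._llm: Optional[Any] = None
        self._processor: Optional[Any] = None

    def _cleanup_gpu(self) -> None:
        # Dropping the engine reference lets the next setup() call start over.
        self._llm = None

    def setup(self) -> None:
        # Treat the adapter as ready only when both halves are loaded.
        if self._llm is not None and self._processor is not None:
            return
        self._llm = self._load_engine()
        try:
            self._processor = self._load_processor()
        except Exception:
            # Undo the partial initialisation, then surface the real error.
            self._processor = None
            self._cleanup_gpu()
            raise


def make_flaky_loader() -> Callable[[], str]:
    """Return a loader that fails on its first call and succeeds afterwards."""
    calls = {"n": 0}

    def load() -> str:
        calls["n"] += 1
        if calls["n"] == 1:
            raise OSError("simulated processor download failure")
        return "processor"

    return load


adapter = AdapterSketch(load_engine=lambda: "engine", load_processor=make_flaky_loader())
try:
    adapter.setup()
except OSError as exc:
    print(f"first setup() surfaced: {exc}")
adapter.setup()  # the retry is no longer short-circuited by a half-built state
print(adapter._llm, adapter._processor)
```

The first call raises the underlying error instead of leaving a live engine with no processor, and the second call completes normally.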

Replace internal milestone labels with public-facing perf terminology and
remove Kratos/NvLLMOps/harvest-specific validation wording from READMEs,
comments, and tests touched by PR NVIDIA-NeMo#1967.

Signed-off-by: Aaftab V <aaftabv@nvidia.com>
Move actor/node/gpu label resolution into backends/perf_identity.py with
separate Xenna (allocation-first) and Ray (get_gpu_ids only) builders.
Each adapter stamps WorkerMetadata at setup; BaseStageAdapter copies
those fields verbatim. Removes CUDA_VISIBLE_DEVICES and cross-backend
fallback chain from base.py.

Signed-off-by: Aaftab V <aaftabv@nvidia.com>
Split Ray label resolution into small helpers to avoid C901 complexity.

Signed-off-by: Aaftab V <aaftabv@nvidia.com>
Comment on lines +203 to +215
    def teardown(self) -> None:
        total = self._perf_summary.total_utterances
        done = sum(
            1 for k in self._perf_summary.shard_keys
            if os.path.exists(os.path.join(self.output_dir, f"{k}.jsonl.done"))
        )
        logger.info(
            f"ShardedManifestWriter: {total} utterances across "
            f"{len(self._perf_summary.shard_keys)} shards, {done} completed with .done"
        )

        if self.write_perf_stats:
            self._write_perf_summary()


P1 teardown() overwrites perf_summary.json with zeros on checkpoint resume

On a checkpoint-resume run where NemoTarShardDiscoveryStage skips all completed shards, the writer actor processes 0 tasks (_writer_process_calls == 0). Its teardown() still calls _write_perf_summary(), which opens perf_summary.json in "w" mode and dumps an empty-accumulator summary (all zeros). This silently destroys the valid summary written by the previous run.

Add a guard to skip the write when this instance processed nothing:
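
One possible shape for such a guard, sketched with a hypothetical `WriterSketch` class. Only the `_writer_process_calls` counter name and the perf_summary.json file name come from the comment above; everything else is an assumption made for illustration:

```python
import json
import os
import tempfile


class WriterSketch:
    """Hypothetical stand-in for the writer stage; only the guard matters here."""

    def __init__(self, output_dir: str) -> None:
        self.output_dir = output_dir
        self._writer_process_calls = 0  # incremented once per process() call
        self._total_utterances = 0

    def process(self, n_utterances: int) -> None:
        self._writer_process_calls += 1
        self._total_utterances += n_utterances

    def teardown(self) -> None:
        if self._writer_process_calls == 0:
            # Nothing was processed (e.g. every shard was already complete),
            # so leave any perf_summary.json from a previous run untouched.
            return
        path = os.path.join(self.output_dir, "perf_summary.json")
        with open(path, "w", encoding="utf-8") as f:
            json.dump({"total_utterances": self._total_utterances}, f)


with tempfile.TemporaryDirectory() as tmp:
    first_run = WriterSketch(tmp)
    first_run.process(5)
    first_run.teardown()

    resumed_run = WriterSketch(tmp)  # resume: all shards skipped upstream
    resumed_run.teardown()

    with open(os.path.join(tmp, "perf_summary.json"), encoding="utf-8") as f:
        surviving_summary = json.load(f)

print(surviving_summary)
```

With the early return in place, the summary from the first run survives a resume that processes no tasks.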

Document Xenna vs Ray resolution paths via backends/perf_identity.py
without cross-backend fallback.

Signed-off-by: Aaftab V <aaftabv@nvidia.com>
Comment on lines +79 to +92
    def setup_on_node(
        self,
        _node_info: NodeInfo | None = None,
        _worker_metadata: WorkerMetadata | None = None,
    ) -> None:
        os.makedirs(self.output_dir, exist_ok=True)
        if self.final_manifest_path:
            final_parent = os.path.dirname(self.final_manifest_path)
            if final_parent:
                os.makedirs(final_parent, exist_ok=True)
            if os.path.exists(self.final_manifest_path):
                os.remove(self.final_manifest_path)
        self._perf_summary.reset_wall_timer()
        logger.info(f"ShardedManifestWriterStage: output_dir={self.output_dir}")


P1 final_manifest_path silently deleted on every actor start — incomplete manifest on checkpoint resume

setup_on_node() unconditionally removes final_manifest_path before any work begins (lines 89-90). On a checkpoint-resume run, NemoTarShardDiscoveryStage skips already-completed shards, so the writer actor only receives tasks for the remaining shards. Because the file was cleared at startup, the resumed manifest contains only the newly processed shards — the records from the previous run are gone with no warning. A user inspecting final_manifest_path after the second run sees a silently incomplete manifest.

A simple fix is to skip the delete when the file already exists and completed-shard markers are found in output_dir (i.e., the caller is resuming). Alternatively, a constructor-level flag such as truncate_on_start: bool = True could let the operator opt out of clearing on resume.
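
Both options can be combined in one small decision helper. The sketch below is an assumption-laden illustration, not the writer's actual code: `should_truncate_final_manifest` is a hypothetical name, and the marker pattern `*.jsonl.done` is taken from the teardown() snippet quoted earlier in this review.

```python
import glob
import os
import tempfile


def should_truncate_final_manifest(output_dir: str, truncate_on_start: bool = True) -> bool:
    """Decide whether a writer should clear final_manifest_path at startup.

    Assumption: completed shards leave ``<shard>.jsonl.done`` markers in
    ``output_dir``; their presence is treated as evidence of a resume.
    """
    if not truncate_on_start:
        # The operator explicitly opted out of clearing.
        return False
    has_done_markers = bool(glob.glob(os.path.join(output_dir, "*.jsonl.done")))
    # Existing markers imply a resume, so keep the previous run's records.
    return not has_done_markers


with tempfile.TemporaryDirectory() as tmp:
    fresh_run = should_truncate_final_manifest(tmp)
    # Simulate a shard completed by an earlier run.
    open(os.path.join(tmp, "shard_0.jsonl.done"), "w").close()
    resumed_run = should_truncate_final_manifest(tmp)
    opted_out = should_truncate_final_manifest(tmp, truncate_on_start=False)

print(fresh_run, resumed_run, opted_out)
```

setup_on_node() would then call os.remove only when the helper returns True.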

Keep gpu_id as the within-run per_gpu join key while stamping
physical_address, pod_ip, hostname, gpu_indices, and gpu_uuids at
worker setup for cluster-debuggable perf summaries.

Signed-off-by: Aaftab V <aaftabv@nvidia.com>
Comment on lines +447 to +480
    @staticmethod
    def _first_output_text(output: Any) -> str:  # noqa: ANN401
        sequences = getattr(output, "outputs", None) or []
        if not sequences:
            return ""
        return (getattr(sequences[0], "text", "") or "").strip()

    def _infer_turn(
        self,
        inputs: list[dict[str, Any]],
        indices: list[int],
        n: int,
    ) -> tuple[list[str], float, float]:
        """Run one vLLM turn and scatter its texts back to input order.

        Turn-1 and Turn-2 share this generate -> count-tokens -> scatter
        sequence; they differ only in the prompts already baked into
        ``inputs`` and the output list each fills (Turn-1 -> ``pred_texts``,
        Turn-2 -> ``disfluency_texts``). ``indices[k]`` is the position in
        the length-``n`` batch that ``inputs[k]`` came from.

        Returns ``(texts_of_len_n, generation_time_s, output_token_count)``.
        """
        t0 = time.perf_counter()
        outputs = self._generate(inputs)
        generation_time_s = time.perf_counter() - t0
        output_tokens = self._count_output_tokens(outputs)
        texts: list[str] = [""] * n
        # strict=True: vLLM must return exactly one output per input, in order.
        # A count mismatch means a broken engine contract - fail loud here
        # instead of silently emitting empty transcriptions with skipped=False.
        for idx, out in zip(indices, outputs, strict=True):
            texts[idx] = self._first_output_text(out)
        return texts, generation_time_s, output_tokens


P1 vLLM empty-output samples not marked as skipped

_first_output_text returns "" when out.outputs is empty (scheduler rejection, context length exceeded, etc.), placing "" into texts[idx]. However, skipped_indices is only populated with preprocessing failures — vLLM-side empty outputs are never added to it. The final ASRResult for such a sample has text="" and skipped=False, so the writer records it as a valid empty transcription rather than a failed inference. Downstream training pipelines cannot distinguish this from a genuinely silent utterance.

A check like if not sequences or not texts[idx]: skipped_from_inference.add(idx) after the loop, then unioning that set into skipped_indices before returning, would close the gap.

Clarify WorkerPerfIdentity scheduling vs cluster-location metadata
and the new per_gpu fields (physical_address, pod_ip, gpu_indices).

Signed-off-by: Aaftab V <aaftabv@nvidia.com>
Comment thread nemo_curator/adapters/asr/base.py Outdated
extras: dict[str, Any] = field(default_factory=dict)


@runtime_checkable


I am also not sure why this is needed.

If it's really needed, I think the location is a bit confusing. It seems like this whole adapters folder is for model inference. Can we move it under the models folder?

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


Similarly, this file and the asr folder shouldn't be here IMO.


# Get runtime context for worker metadata
node_info, worker_metadata = get_worker_metadata_and_node_id()
requires_gpu = bool(getattr(getattr(stage, "resources", None), "requires_gpu", False))


What is the reason for this change?

Also, if backend code is updated, we need to make sure all the benchmarks are tested as well. GitHub will run the unit tests, but unfortunately that is not enough.

We need a benchmark run and a comparison with the existing results.

"""
# Convert Xenna's types to our generic types (simplified)
generic_node_info = NodeInfo(node_id=node_info.node_id)
requires_gpu = bool(getattr(getattr(self.processing_stage, "resources", None), "requires_gpu", False))


Similarly here.

If backend code is updated, we need to make sure all the benchmarks are tested as well. GitHub will run the unit tests, but unfortunately that is not enough.

We need a benchmark run and a comparison with the existing results.

super().__init__(stage)
self.setup_done = False
node_info, worker_metadata = get_worker_metadata_and_node_id()
requires_gpu = bool(getattr(getattr(stage, "resources", None), "requires_gpu", False))


Similarly here.

If backend code is updated, we need to make sure all the benchmarks are tested as well. GitHub will run the unit tests, but unfortunately that is not enough.

We need a benchmark run and a comparison with the existing results.



class VLLMModel(ModelInterface):
class VLLMBase:


Do we really need this VLLMBase class?

If you are adding this because some of the vllm models need specific functions based on the domain/modality, then it makes sense. But in this case, why don't we use the VLLMModel as base?

Contributor Author


Yes — this is exactly the "modality-specific functions" case you mention, so the split is intentional rather than incidental.

VLLMBase (vllm_model.py L49) is interface-free engine plumbing only: _init_engine, _generate, _cleanup_gpu. Two different consumers sit on top:

  • VLLMModel(VLLMBase, ModelInterface) (L112) — the generic text path; implements the ModelInterface contract (model_id_names, text-in/text-out generate).
  • QwenOmniASRAdapter(VLLMBase) — the audio path. It feeds vLLM multimodal prompt dicts and reads back raw RequestOutput objects for token-level metadata, neither of which fits ModelInterface.generate's text-only signature. It conforms to the ASRAdapter protocol, not ModelInterface.

If the adapter subclassed VLLMModel, it would inherit a ModelInterface surface it can't honor and would have to stub/override. Keeping VLLMBase as the shared engine layer lets both the text model and the audio adapter reuse identical engine/generation/cleanup code without a contract mismatch. The VLLMBase docstring notes this; happy to expand it if helpful.
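
The layering can be sketched roughly as follows. All class names ending in `Sketch` are hypothetical simplifications, the method bodies are placeholders rather than real vLLM calls, and typing.Protocol is used here only for brevity (the real ModelInterface may be defined differently):

```python
from typing import Any, Protocol, runtime_checkable


class VLLMBaseSketch:
    """Engine plumbing only (hypothetical simplification of VLLMBase)."""

    def _generate(self, inputs: list[Any]) -> list[Any]:
        # Stand-in for a real engine call: echo the inputs back as "outputs".
        return list(inputs)


class ModelInterfaceSketch(Protocol):
    """Text-in/text-out contract, assumed for illustration."""

    def generate(self, prompts: list[str]) -> list[str]: ...


@runtime_checkable
class ASRAdapterSketch(Protocol):
    """Audio contract, assumed for illustration."""

    def transcribe_batch(self, waveforms: list[list[float]]) -> list[str]: ...


class TextModelSketch(VLLMBaseSketch):
    """Text path: satisfies the text contract on top of the shared engine layer."""

    def generate(self, prompts: list[str]) -> list[str]:
        return [str(o) for o in self._generate(prompts)]


class AudioAdapterSketch(VLLMBaseSketch):
    """Audio path: builds multimodal prompt dicts and never exposes generate()."""

    def transcribe_batch(self, waveforms: list[list[float]]) -> list[str]:
        prompts = [{"multi_modal_data": {"audio": w}} for w in waveforms]
        outputs = self._generate(prompts)
        return [f"{len(o['multi_modal_data']['audio'])} samples" for o in outputs]


audio = AudioAdapterSketch()
print(isinstance(audio, ASRAdapterSketch), hasattr(audio, "generate"))
print(TextModelSketch().generate(["hi"]), audio.transcribe_batch([[0.0, 0.1]]))
```

Both consumers reuse `_generate`, while the audio adapter carries no text-only `generate` surface that it would have to stub out.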



@dataclass
class BatchPolicy:


These inference classes seem more specific to audio. Can you move them under the audio stage folder?

Comment thread nemo_curator/models/vllm_model.py Outdated
del self._llm
self._llm = None
self._sampling_params = None
if dist.is_initialized():


Can you please check this?

This destroys the default global process group unconditionally. If any other component in the same process uses torch.distributed (e.g., another stage, Ray's distributed primitives), this will corrupt their state.

…tches

Ray Data delivers task batches as numpy ndarrays; a truthiness check on an
ndarray with more than one element raises ValueError in BucketedInferenceStage.
Coerce to list at the Ray adapter boundary and use len(tasks) == 0 in the
shared bucketed base.

Signed-off-by: Aaftab V <aaftabv@nvidia.com>
… fixes

- ASRStage: Tier-1 xenna_num_workers / xenna_num_workers_per_node pin
  (mutually exclusive) to skip the autoscale cold-start ramp on the
  expensive GPU stage; surfaced via num_workers() (Ray Data) and
  xenna_stage_spec() (Xenna).
- NemoTarShardReaderStage: pin to 1 worker (num_workers + IS_ACTOR_STAGE
  + per-node Xenna spec) for bounded memory under keep_waveform.
- BatchPolicy: default max_audio_sec_per_batch 480 -> 2400 to match the
  Qwen-Omni docstring buckets.
- qwen_omni tutorial: drop driver-side HF_TOKEN/prefetch (does not reach
  remote workers); rename omni_num_workers* -> asr_num_workers* (Tier-1,
  adapter-agnostic) with asr_num_workers_per_node: 2.
- docs: "pin the GPU stage, autoscale the cheap stages" guidance plus
  adapter-swap GPU math.

Signed-off-by: Aaftab V <aaftabv@nvidia.com>

# ------------------------------------------------------------------
# Adapter lifecycle
# ------------------------------------------------------------------


Can we remove the Claude comments everywhere? I think they are making a very long PR even longer.


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


Another example.


I don't think we should have tests for tutorials. If we really want to test that the yaml logic works then I think it should be moved under https://github.com/NVIDIA-NeMo/Curator/tree/main/nemo_curator/config and have the tutorial use it.

…ng, perf per-actor/per-GPU, reader/writer hardening

Adapters + inference co-location (review NVIDIA-NeMo#3/NVIDIA-NeMo#25):
- Move nemo_curator/adapters/asr/* -> stages/audio/inference/asr/adapters/*
- Move stages/inference/{batch_policy,bucketed_stage}.py -> stages/audio/inference/
- Update all imports, YAML adapter_target / batch_policy._target_, READMEs, tests
  (tests mirror the new package paths).

vLLM teardown (review NVIDIA-NeMo#15):
- _cleanup_gpu() no longer calls torch.distributed.destroy_process_group(); vLLM
  owns its TP group, so tearing down the global PG could corrupt other components.

Tutorial backend (review NVIDIA-NeMo#30):
- Default backend -> ray_data (YAML + main.py); trim the "Choosing a Backend" section.
- Drop tutorial test tests/tutorials/.../test_main_resources.py (review NVIDIA-NeMo#28).

Performance reporting:
- Unify metrics into a single per_actor block keyed by actor_id; GPU actors embed
  physical_address (host_ip:indices, now the canonical per-GPU key), gpu_indices/
  gpu_uuids, and per-GPU utilization percentiles (nested gpus map).
- Add first-class StagePerfStats.invocation_id for exact dedup; __add__ preserves
  identity only within the same worker.
- Add inference_compute_fraction (inference time / total process time).
- New GpuUtilSampler (NVML, bounded deque + lock); harden _collect_gpu_uuids to map
  physical indices -> visible ordinals under CUDA_VISIBLE_DEVICES.
- Remove per-shard <output_dir>/<shard_key>_perf.jsonl output.

Reader (nemo_tarred_reader):
- Collision-safe _ManifestIndex (exact path, then longest path-suffix; ambiguous
  basenames resolve to no-match).
- Enforce max_duration_s against decoded audio duration; narrow decode except to
  audio errors; write .done for empty/fully-filtered shards.

Writer (sharded_manifest_writer):
- Batch manifest writes per shard instead of per-utterance open/close.

Misc:
- Remove redundant super().setup_on_node() in RayActorPoolStageAdapter.__init__
  (executor already runs once-per-node setup).
- Fix stale docstring (disfluency_text_key); drop bucketize redundancy; trim
  adapter duplicate volume math; reduce comment verbosity across PR files.

Signed-off-by: Aaftab V <aaftabv@nvidia.com>
Signed-off-by: Aaftab V <aaftabv@nvidia.com>
Comment on lines +113 to 130
    def _process_centralized_stage_dataset(self, stage: "ProcessingStage", dataset: Dataset) -> Dataset:
        """Run a centralized stage as a Ray Data scheduler-ready stream."""
        parent_tasks = self._dataset_to_tasks(dataset)
        plan = build_scheduled_task_batch_plan(stage, parent_tasks)
        if plan is None:
            return RayDataStageAdapter(stage).process_dataset(dataset, self.ignore_head_node)

        scheduler_dataset = self._scheduler_ready_batches_to_dataset(plan.ready_batches)
        processed_dataset = RayDataStageAdapter(stage).process_scheduler_ready_dataset(
            scheduler_dataset,
            self.ignore_head_node,
        )
        processed_tasks = self._dataset_to_tasks(processed_dataset)
        return self._tasks_to_dataset(assemble_scheduled_task_batch_results(stage, plan, processed_tasks))

    def _tasks_to_dataset(self, tasks: list[Task]) -> Dataset:
        """Convert list of tasks to Ray Data dataset.


P1 Full dataset materialisation into driver RAM on centralized-batching path

_process_centralized_stage_dataset calls self._dataset_to_tasks(dataset) (which calls dataset.take_all()) before building the bucketing plan. For the ASRStage + NemoTarShardReaderStage combination, the upstream dataset contains AudioTask objects each carrying a float32 numpy waveform array. A modest 100 K-utterance run at 10 s / utterance (16 kHz × float32 ≈ 640 KB each) pulls ~64 GB into the Ray driver process before any GPU inference begins. This silently OOMs the driver whenever backend: ray_data is used with batch_policy.enabled: true (the default ASR configuration), since stage_uses_centralized_batching returns True for ASRStage in that case.

A second _dataset_to_tasks call materialises the post-inference results dataset before assembly. The Xenna and Ray Actor Pool executors process data in bounded batches (typically 32 tasks) and are not affected; only the Ray Data path has this issue.
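
The memory estimate above can be reproduced with a few lines of arithmetic. This counts only the raw waveform payload in decimal units and ignores per-object and serialization overhead, so the real footprint would be somewhat higher:

```python
SAMPLE_RATE_HZ = 16_000     # 16 kHz audio
BYTES_PER_SAMPLE = 4        # float32
UTTERANCE_SECONDS = 10
NUM_UTTERANCES = 100_000

# Payload of one decoded waveform, then of the whole materialised dataset.
bytes_per_utterance = SAMPLE_RATE_HZ * BYTES_PER_SAMPLE * UTTERANCE_SECONDS
total_bytes = bytes_per_utterance * NUM_UTTERANCES

print(f"{bytes_per_utterance / 1e3:.0f} KB per utterance")
print(f"{total_bytes / 1e9:.0f} GB held in the driver")
```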

Add a raw-manifest Qwen-Omni tutorial that reuses the generic Hydra runner
with a raw-audio stage graph:

ManifestReader -> ResampleAudioStage -> MonoConversionStage -> ASRStage ->
ShardedManifestWriterStage.

Stamp shard metadata in ManifestReaderStage so raw JSONL input shards can
complete through ShardedManifestWriterStage and rebuild final manifests.
Add tests covering raw reader shard keys and totals.

Signed-off-by: Aaftab V <aaftabv@nvidia.com>
Emit 1D numpy waveforms from MonoConversionStage when explicitly requested,
then opt the raw Qwen pipeline into that format so it matches the working
NeMo tar shard reader contract without changing Qwen Omni or ASR inference code.

Signed-off-by: Aaftab V <aaftabv@nvidia.com>
Comment on lines +212 to +218
def _ray_node_label(ctx: object) -> str:
    try:
        node_hex = getattr(ctx, "get_node_id", lambda: "")()
        if node_hex:
            return f"node-{str(node_hex)[:8]}"
    except Exception:  # noqa: BLE001
        return ""


P1 _ray_node_label implicitly returns None when get_node_id() is falsy

When get_node_id() returns a falsy value (empty string, None, etc.) without raising, execution exits the try block without hitting any return statement, so Python returns None implicitly. The caller then passes that None as node_label into WorkerPerfIdentity(node_id=node_label, ...), which stamps None into the node_id: str = "" field. Downstream, stamp_worker_metadata → apply_worker_perf_identity copies it to StagePerfStats.node_id, and the final perf_summary.json ends up with "node_id": null instead of "node_id": "", breaking the expected schema for any analytics consumer.

Fix: add return "" after the try block.
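
With that one-line change applied, the helper would look roughly like this. The function is renamed `ray_node_label` here, and `_FakeCtx` is a hypothetical stand-in for the Ray runtime context, used only to exercise the three paths:

```python
def ray_node_label(ctx: object) -> str:
    """Return a short node label, or "" when no node id is available."""
    try:
        node_hex = getattr(ctx, "get_node_id", lambda: "")()
        if node_hex:
            return f"node-{str(node_hex)[:8]}"
    except Exception:  # noqa: BLE001
        return ""
    # Falsy node id without an exception: return "" explicitly instead of None.
    return ""


class _FakeCtx:
    """Hypothetical context object exposing only get_node_id()."""

    def __init__(self, node_id: object) -> None:
        self._node_id = node_id

    def get_node_id(self) -> object:
        return self._node_id


labels = [
    ray_node_label(_FakeCtx("abcdef0123456789")),  # normal case
    ray_node_label(_FakeCtx(None)),                # falsy id, previously None
    ray_node_label(object()),                      # no get_node_id attribute
]
print(labels)
```

Every path now returns a str, so the node_id field keeps its declared type.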

Signed-off-by: Aaftab V <aaftabv@nvidia.com>
@mohammadaaftabv mohammadaaftabv force-pushed the aaftabv/granary-v2-qwen-omni-first-stage branch from 8447373 to bf56765 Compare June 13, 2026 19:32
Signed-off-by: Aaftab V <aaftabv@nvidia.com>
Signed-off-by: Aaftab V <aaftabv@nvidia.com>

@sarahyurick sarahyurick left a comment


Hi @mohammadaaftabv I am worried about this PR being too large now. Since there are a lot of logical changes going on, would it make sense to split it up? For example, backend changes vs ASR vs batch policy vs new reader/writer vs metrics/utils vs tutorials? Or some other variation of this?

Basically whichever changes could exist as their own PR and still be functional, we should consider doing that. WDYT?



6 participants