Skip to content

Add Omni-Fuse Tutorial#2069

Open
hk1510 wants to merge 8 commits into
NVIDIA-NeMo:mainfrom
hk1510:main
Open

Add Omni-Fuse Tutorial#2069
hk1510 wants to merge 8 commits into
NVIDIA-NeMo:mainfrom
hk1510:main

Conversation

@hk1510

@hk1510 hk1510 commented Jun 11, 2026

Copy link
Copy Markdown

Description

This PR adds a tutorial for Omni-Fuse. It follows a similar structure to existing tutorials and builds a curator pipeline to perform retrieval-based data curation for multimodal datasets.

Usage

Instructions to run the tutorial can be found in the README.md file. Open to feedback/adding additional instructions.

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes.
  • The documentation is up to date with these changes.

hk1510 added 2 commits June 11, 2026 19:02
Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
…modal_hybrid_real.yaml

Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
@hk1510 hk1510 requested review from a team as code owners June 11, 2026 19:11
@hk1510 hk1510 requested review from abhinavg4 and removed request for a team June 11, 2026 19:11
@copy-pr-bot

copy-pr-bot Bot commented Jun 11, 2026

Copy link
Copy Markdown

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@arhamm1 arhamm1 requested review from suiyoubi and removed request for abhinavg4 June 11, 2026 19:13
@greptile-apps

greptile-apps Bot commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR adds a complete end-to-end tutorial for the Omni-Fuse multimodal data curation pipeline, including five numbered pipeline scripts, a NeMo Curator integration layer, and all supporting modules for SNS, EEE, projection training, and datablend ranking. Several issues identified in earlier review rounds (missing imports, duplicate spacy dependency, broken task_id constructor arg, README directory error) have been addressed in follow-up commits.

  • Pipeline scripts (1_sns.py4_datablend.py) are thin wrappers around utils.py helpers that build and run individual NeMo Curator stages; each step can be run independently, with outputs serialised to disk for the next step.
  • omnifuse_tutorial/data/io.py ships a custom write_npy writer that uses 16-byte header alignment instead of the numpy-spec-required 64 bytes — functionally fine for default np.load() calls but worth correcting for long-term spec compliance.
  • Config from_dict methods (EEEConfig, SNSConfig, ProjectionConfig) pass the entire YAML dict directly to the dataclass constructor, so any unexpected user-added key produces a cryptic TypeError at load time rather than a descriptive config error.

Confidence Score: 5/5

Safe to merge — all previously-blocking issues have been addressed in follow-up commits; the remaining findings are non-functional spec compliance and robustness nits.

All five pipeline scripts, the NeMo Curator integration layer, and every supporting module are present and internally consistent. The data flow from manifest loading through SNS, EEE, projection, and datablend is coherent. Prior review rounds resolved missing imports, broken constructors, and README instructions. The two remaining items (numpy header alignment and config key validation) do not affect any code path that runs during normal tutorial execution.

omnifuse_tutorial/data/io.py (write_npy header alignment) and omnifuse_tutorial/config/models.py (from_dict key validation) are worth a second look before any non-tutorial reuse.

Important Files Changed

Filename Overview
tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/compat/curator.py Compatibility shim around NeMo Curator APIs; previously-flagged task_id and ProcessingStage/Resources issues have been addressed in recent commits.
tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/data/io.py Custom stdlib I/O helpers including a hand-rolled write_npy that uses 16-byte alignment instead of the numpy-spec-required 64-byte alignment; files load correctly with np.load() in default (non-mmap) mode but are not fully spec-compliant.
tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/config/models.py Config dataclasses with from_dict class methods; several of these pass the full YAML dict directly to the dataclass constructor, meaning any unexpected YAML key triggers a cryptic TypeError instead of a descriptive config error.
tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/eee/backends.py EEE backend implementations (local, NVIDIA API, hybrid); previously-flagged audio format and _path_or_none bugs are addressed; retry/polling logic is reasonable.
tutorials/multimodal/omni-fuse-data-curation/utils.py Shared pipeline helpers; previously-flagged task_id signature mismatch in make_document_batch calls is resolved by the updated compat/curator.py signature.
tutorials/multimodal/omni-fuse-data-curation/pyproject.toml Dependency manifest; previously-flagged duplicate spacy entries fixed; en-core-web-sm is pinned to 3.8.0 via a uv URL source but spacy allows up to <4, leaving a latent compatibility window if spacy 3.9+ is ever resolved.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[["0_validate_inputs.py\n(validate config & data pools)"]]
    B[["1_sns.py\n(Semantic Noise Suppression)"]]
    C[["2_embed.py\n(Expert Embedding Engine)"]]
    D[["3_project.py\n(Projection Training)"]]
    E[["4_datablend.py\n(Datablend Ranking)"]]

    subgraph PipelineStages["NeMo Curator Pipeline Stages"]
        S1["PairManifestReaderStage\n(load raw/annotation pairs)"]
        S2["SNSStage\n(suppress noisy samples)"]
        S3["EEEEmbeddingStage\n(text-based / fusion / e2e experts)"]
        S4["ProjectionTrainingStage\n(linear or torch MLP)"]
        S5["DatablendRankingStage\n(cosine rank vs query)"]
    end

    A --> B --> C --> D --> E
    S1 -->|records + pair_ids| S2
    S2 -->|sns_annotation, sns_raw_text| S3
    S3 -->|EmbeddingBundle in metadata| S4
    S4 -->|ProjectionResult in metadata| S5
    S5 -->|ranked.jsonl + topk.jsonl| OUT[("Run output directory")]

    subgraph Backends["EEE Backends"]
        API["NvidiaApiEEEBackend\n(text-based expert)"]
        LOC["LocalEEEBackend\n(LanguageBind / OmniEmbed)"]
        HYB["HybridEEEBackend\n(API text + local fusion/e2e)"]
    end

    S3 --> Backends
    B --> Backends
Loading

Reviews (5): Last reviewed commit: "Revert "remove task_id from DocumentBatc..." | Re-trigger Greptile

Comment on lines +1 to +69
# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Small compatibility layer around NeMo Curator APIs."""

from __future__ import annotations

import copy
from typing import Any

import pandas as pd
from nemo_curator.pipeline import Pipeline as CuratorPipeline
from nemo_curator.tasks import DocumentBatch, EmptyTask


def records_from_task(task: Any) -> list[dict[str, Any]]:
"""Return task data as records from a Curator task."""

data = task.data
if hasattr(data, "to_dict"):
try:
return [dict(row) for row in data.to_dict(orient="records")]
except TypeError:
return [dict(row) for row in data.to_dict("records")]
if isinstance(data, list):
return [dict(row) for row in data]
raise TypeError(f"Unsupported task data type: {type(data)!r}")


def make_document_batch(
task_id: str,
dataset_name: str,
records: list[dict[str, Any]],
metadata: dict[str, Any] | None = None,
stage_perf: list[Any] | None = None,
) -> DocumentBatch:
"""Construct a NeMo Curator DocumentBatch."""

return DocumentBatch(
task_id=task_id,
dataset_name=dataset_name,
data=pd.DataFrame.from_records(records),
_metadata=metadata or {},
_stage_perf=stage_perf or [],
)


def make_empty_task() -> EmptyTask:
if callable(EmptyTask):
try:
return EmptyTask(task_id="empty", dataset_name="omnifuse", data=None)
except TypeError:
return EmptyTask()
return copy.deepcopy(EmptyTask)


def make_curator_pipeline(name: str, stages: list[Any], description: str | None = None) -> CuratorPipeline:
return CuratorPipeline(name=name, description=description, stages=stages)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P0 Missing ProcessingStage, Resources, and re-export of EmptyTask

Every stage file (stages/eee.py, stages/sns.py, stages/projection.py, stages/datablend.py, stages/reader.py) imports ProcessingStage and Resources from this module. Neither is defined here or re-exported from it. stages/reader.py also imports EmptyTask directly. All five stage modules will fail immediately with ImportError when loaded, making the entire pipeline non-runnable.

These classes need to be either imported from nemo_curator and re-exported, or implemented as shim classes within this compat module.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed imports in b8fc71b


from omnifuse_tutorial.compat.curator import make_curator_pipeline, make_empty_task
from omnifuse_tutorial.config.models import ExperimentConfig
from omnifuse_tutorial.data.io import write_json

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P0 Missing omnifuse_tutorial/data/ package

omnifuse_tutorial/data/io.py and omnifuse_tutorial/data/loader.py are imported by at least eight files — pipeline.py, utils.py, stages/eee.py, stages/projection.py, stages/datablend.py, stages/reader.py, sns/backends.py, and projection/trainer.py — but the entire omnifuse_tutorial/data/ directory does not exist in the repository. Every entry point to the tutorial will fail at import time with ModuleNotFoundError: No module named 'omnifuse_tutorial.data'.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in recent commit. Was .gitignored previously.

Comment on lines +492 to +498
def _audio_format(path: Path) -> str:
audio_format = path.suffix.lower().lstrip(".")
if audio_format == "mpeg":
audio_format = "mp3"
if audio_format not in {"wav", "mp3"}:
raise ValueError(f"NVIDIA audio descriptions support wav/mp3 only, got {path.suffix} for {path}")
return audio_format

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 _audio_format raises for extensions declared as supported

AUDIO_EXTENSIONS and MIME_TYPES both include .flac, .m4a, .aac, and .ogg, advertising them as valid inputs. But _audio_format raises ValueError for any extension other than .wav and .mp3. The API description path in NvidiaApiEEEBackend._describe_raw_describe_filedescribe_file_with_nvidia_api calls this function unconditionally for audio records, so a user who provides a .flac or .m4a file will receive a confusing ValueError rather than a clear unsupported-format message.

Reconcile by either removing the unsupported extensions from AUDIO_EXTENSIONS/MIME_TYPES, converting to a supported format before sending, or raising a clear error with guidance early in the audio code path.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in b8fc71b

"pandas>=2.0",
"pyyaml>=6.0",
"requests>=2.31",
"spacy>=3.8.14",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Duplicate spacy constraint — the same package appears twice with conflicting lower bounds (>=3.8.14 on line 17 and >=3.8,<4 on line 35). Keep a single, unambiguous entry.

Suggested change
"spacy>=3.8.14",
"spacy>=3.8.14,<4",

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 367d7e9

"qwen-vl-utils>=0.0.8",
"sentence-transformers>=2.2,<4",
"soundfile>=0.12",
"spacy>=3.8,<4",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Remove the second duplicate spacy entry.

Suggested change
"spacy>=3.8,<4",
# spacy consolidated above as spacy>=3.8.14,<4

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 367d7e9

path = Path(value)
except OSError:
return None
return path if path.exists() else path

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 _path_or_none always returns a Path, never Nonereturn path if path.exists() else path is a tautology; both branches return the same value. The | None return type annotation is misleading. Either return None when the path does not exist, or simplify to return path.

Suggested change
return path if path.exists() else path
return path

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 367d7e9

Comment on lines +20 to +22

```bash
uv sync --extra dev

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Incorrect setup directory in README

The instruction says to run uv sync --extra dev "from the Curator repository root", but uv sync reads pyproject.toml from the current directory. No uv workspace configuration ties this tutorial to the repository root, so running uv sync from the repo root installs the main Curator package rather than this tutorial's dependencies. Users should cd tutorials/multimodal/omni-fuse-data-curation/ first.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 367d7e9

hk1510 and others added 4 commits June 11, 2026 20:34
Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
Comment on lines +115 to +124
task = run_curator_step(
name=f"{config.experiment_id}-2-embed",
stages=[EEEEmbeddingStage(config=config)],
initial_task=load_sns_task(config),
)
records_path = config.run_dir / "embeddings" / "records.jsonl"
write_jsonl(records_path, records_from_task(task))
metadata = dict(getattr(task, "_metadata", {}) or {})
metadata["embedding_records_path"] = str(records_path)
task._metadata = metadata

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 task_id is not a parameter of make_document_batch

load_sns_task, load_embedding_task, and load_projection_task all call make_document_batch(task_id=..., dataset_name=..., records=..., metadata=...), but the current make_document_batch signature in compat/curator.py is (dataset_name, records, metadata=None, stage_perf=None) — it has no task_id parameter. Every one of these three loaders will raise TypeError: make_document_batch() got an unexpected keyword argument 'task_id' as soon as it is called, breaking all three resume paths (2_embed.py, 3_project.py, 4_datablend.py). Remove the task_id= keyword from all three call sites.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted 21b8091 in 59225f5 so this should no longer be an issue.

DocumentBatch requires task_id

This reverts commit 21b8091.

Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant