Add Omni-Fuse Tutorial #2069
Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
…modal_hybrid_real.yaml Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
Greptile Summary
This PR adds a complete end-to-end tutorial for the Omni-Fuse multimodal data curation pipeline, including five numbered pipeline scripts, a NeMo Curator integration layer, and all supporting modules for SNS, EEE, projection training, and datablend ranking. Several issues identified in earlier review rounds (missing imports, duplicate spacy dependency, broken
Confidence Score: 5/5
Safe to merge: all previously blocking issues have been addressed in follow-up commits; the remaining findings are non-functional spec compliance and robustness nits.
All five pipeline scripts, the NeMo Curator integration layer, and every supporting module are present and internally consistent. The data flow from manifest loading through SNS, EEE, projection, and datablend is coherent. Prior review rounds resolved missing imports, broken constructors, and README instructions. The two remaining items (numpy header alignment and config key validation) do not affect any code path that runs during normal tutorial execution.
omnifuse_tutorial/data/io.py (write_npy header alignment) and omnifuse_tutorial/config/models.py (from_dict key validation) are worth a second look before any non-tutorial reuse.
Important Files Changed
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[["0_validate_inputs.py\n(validate config & data pools)"]]
B[["1_sns.py\n(Semantic Noise Suppression)"]]
C[["2_embed.py\n(Expert Embedding Engine)"]]
D[["3_project.py\n(Projection Training)"]]
E[["4_datablend.py\n(Datablend Ranking)"]]
subgraph PipelineStages["NeMo Curator Pipeline Stages"]
S1["PairManifestReaderStage\n(load raw/annotation pairs)"]
S2["SNSStage\n(suppress noisy samples)"]
S3["EEEEmbeddingStage\n(text-based / fusion / e2e experts)"]
S4["ProjectionTrainingStage\n(linear or torch MLP)"]
S5["DatablendRankingStage\n(cosine rank vs query)"]
end
A --> B --> C --> D --> E
S1 -->|records + pair_ids| S2
S2 -->|sns_annotation, sns_raw_text| S3
S3 -->|EmbeddingBundle in metadata| S4
S4 -->|ProjectionResult in metadata| S5
S5 -->|ranked.jsonl + topk.jsonl| OUT[("Run output directory")]
subgraph Backends["EEE Backends"]
API["NvidiaApiEEEBackend\n(text-based expert)"]
LOC["LocalEEEBackend\n(LanguageBind / OmniEmbed)"]
HYB["HybridEEEBackend\n(API text + local fusion/e2e)"]
end
S3 --> Backends
B --> Backends
Reviews (5): Last reviewed commit: "Revert "remove task_id from DocumentBatc..."
# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Small compatibility layer around NeMo Curator APIs."""

from __future__ import annotations

import copy
from typing import Any

import pandas as pd
from nemo_curator.pipeline import Pipeline as CuratorPipeline
from nemo_curator.tasks import DocumentBatch, EmptyTask


def records_from_task(task: Any) -> list[dict[str, Any]]:
    """Return task data as records from a Curator task."""

    data = task.data
    if hasattr(data, "to_dict"):
        try:
            return [dict(row) for row in data.to_dict(orient="records")]
        except TypeError:
            return [dict(row) for row in data.to_dict("records")]
    if isinstance(data, list):
        return [dict(row) for row in data]
    raise TypeError(f"Unsupported task data type: {type(data)!r}")


def make_document_batch(
    task_id: str,
    dataset_name: str,
    records: list[dict[str, Any]],
    metadata: dict[str, Any] | None = None,
    stage_perf: list[Any] | None = None,
) -> DocumentBatch:
    """Construct a NeMo Curator DocumentBatch."""

    return DocumentBatch(
        task_id=task_id,
        dataset_name=dataset_name,
        data=pd.DataFrame.from_records(records),
        _metadata=metadata or {},
        _stage_perf=stage_perf or [],
    )


def make_empty_task() -> EmptyTask:
    if callable(EmptyTask):
        try:
            return EmptyTask(task_id="empty", dataset_name="omnifuse", data=None)
        except TypeError:
            return EmptyTask()
    return copy.deepcopy(EmptyTask)


def make_curator_pipeline(name: str, stages: list[Any], description: str | None = None) -> CuratorPipeline:
    return CuratorPipeline(name=name, description=description, stages=stages)
Missing ProcessingStage, Resources, and re-export of EmptyTask
Every stage file (stages/eee.py, stages/sns.py, stages/projection.py, stages/datablend.py, stages/reader.py) imports ProcessingStage and Resources from this module. Neither is defined here or re-exported from it. stages/reader.py also imports EmptyTask directly. All five stage modules will fail immediately with ImportError when loaded, making the entire pipeline non-runnable.
These classes need to be either imported from nemo_curator and re-exported, or implemented as shim classes within this compat module.
from omnifuse_tutorial.compat.curator import make_curator_pipeline, make_empty_task
from omnifuse_tutorial.config.models import ExperimentConfig
from omnifuse_tutorial.data.io import write_json
Missing omnifuse_tutorial/data/ package
omnifuse_tutorial/data/io.py and omnifuse_tutorial/data/loader.py are imported by at least eight files — pipeline.py, utils.py, stages/eee.py, stages/projection.py, stages/datablend.py, stages/reader.py, sns/backends.py, and projection/trainer.py — but the entire omnifuse_tutorial/data/ directory does not exist in the repository. Every entry point to the tutorial will fail at import time with ModuleNotFoundError: No module named 'omnifuse_tutorial.data'.
Added in recent commit. Was .gitignored previously.
def _audio_format(path: Path) -> str:
    audio_format = path.suffix.lower().lstrip(".")
    if audio_format == "mpeg":
        audio_format = "mp3"
    if audio_format not in {"wav", "mp3"}:
        raise ValueError(f"NVIDIA audio descriptions support wav/mp3 only, got {path.suffix} for {path}")
    return audio_format
_audio_format raises for extensions declared as supported
AUDIO_EXTENSIONS and MIME_TYPES both include .flac, .m4a, .aac, and .ogg, advertising them as valid inputs. But _audio_format raises ValueError for any extension other than .wav and .mp3. The API description path in NvidiaApiEEEBackend._describe_raw → _describe_file → describe_file_with_nvidia_api calls this function unconditionally for audio records, so a user who provides a .flac or .m4a file will receive a confusing ValueError rather than a clear unsupported-format message.
Reconcile by either removing the unsupported extensions from AUDIO_EXTENSIONS/MIME_TYPES, converting to a supported format before sending, or raising a clear error with guidance early in the audio code path.
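The last option (a clear, early error with guidance) could look roughly like the sketch below. `API_AUDIO_FORMATS` and `audio_format_for_api` are hypothetical names introduced for illustration; the accepted set mirrors the wav/mp3 check in `_audio_format`, and the error wording is only a suggestion.

```python
from pathlib import Path

# Assumption: the description API accepts only these formats,
# mirroring the check in _audio_format.
API_AUDIO_FORMATS = {"wav", "mp3"}


def audio_format_for_api(path: Path) -> str:
    """Return the API audio format for path, or raise with conversion guidance."""
    audio_format = path.suffix.lower().lstrip(".")
    if audio_format == "mpeg":
        audio_format = "mp3"
    if audio_format not in API_AUDIO_FORMATS:
        supported = ", ".join(sorted(API_AUDIO_FORMATS))
        raise ValueError(
            f"{path}: '{path.suffix}' audio cannot be sent to the description API "
            f"(supported: {supported}). Convert the file to wav or mp3 first."
        )
    return audio_format
```

Calling a check like this when records are loaded, rather than deep inside the description path, would surface the limitation before any API request is made.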
"pandas>=2.0",
"pyyaml>=6.0",
"requests>=2.31",
"spacy>=3.8.14",
Duplicate spacy constraint: the same package appears twice with conflicting lower bounds (>=3.8.14 on line 17 and >=3.8,<4 on line 35). Keep a single, unambiguous entry.
Suggested change:
- "spacy>=3.8.14",
+ "spacy>=3.8.14,<4",
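A duplicate like this can be caught mechanically. The sketch below is a simple regex-based check over a list of requirement strings; `duplicate_requirements` is a hypothetical helper, not part of the tutorial, and it does not attempt full PEP 508 parsing.

```python
import re
from collections import defaultdict


def duplicate_requirements(requirements: list[str]) -> dict[str, list[str]]:
    """Map each package name that appears more than once to its requirement strings."""
    seen: dict[str, list[str]] = defaultdict(list)
    for requirement in requirements:
        # The name is everything before the first specifier, extra, marker, or space.
        name = re.split(r"[<>=!~\[;\s]", requirement.strip(), maxsplit=1)[0]
        seen[name.lower().replace("_", "-")].append(requirement)
    return {name: specs for name, specs in seen.items() if len(specs) > 1}


# The entries quoted in this review thread.
deps = ["pandas>=2.0", "spacy>=3.8.14", "soundfile>=0.12", "spacy>=3.8,<4"]
print(duplicate_requirements(deps))
```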
"qwen-vl-utils>=0.0.8",
"sentence-transformers>=2.2,<4",
"soundfile>=0.12",
"spacy>=3.8,<4",
    path = Path(value)
except OSError:
    return None
return path if path.exists() else path
_path_or_none returns a Path even when the path does not exist: return path if path.exists() else path is a tautology, because both branches return the same value. None is only ever returned from the OSError handler, so the | None return type annotation is misleading. Either return None when the path does not exist, or simplify to return path.
Suggested change:
- return path if path.exists() else path
+ return path
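If the first alternative is preferred (returning None for a missing path so that the annotation is accurate), a sketch could look like the following. `path_or_none` is a hypothetical standalone version; the real function's opening lines are not visible in this hunk.

```python
from __future__ import annotations

import os
from pathlib import Path


def path_or_none(value: str | os.PathLike[str]) -> Path | None:
    """Return value as a Path if it exists on disk, otherwise None."""
    try:
        path = Path(value)
        exists = path.exists()
    except OSError:
        # Some invalid paths raise instead of reporting False.
        return None
    return path if exists else None
```

Callers would then need to handle None explicitly, which is a behavior change compared with the suggested `return path` simplification.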
```bash
uv sync --extra dev
```
Incorrect setup directory in README
The instruction says to run uv sync --extra dev "from the Curator repository root", but uv sync reads pyproject.toml from the current directory. No uv workspace configuration ties this tutorial to the repository root, so running uv sync from the repo root installs the main Curator package rather than this tutorial's dependencies. Users should cd tutorials/multimodal/omni-fuse-data-curation/ first.
task = run_curator_step(
    name=f"{config.experiment_id}-2-embed",
    stages=[EEEEmbeddingStage(config=config)],
    initial_task=load_sns_task(config),
)
records_path = config.run_dir / "embeddings" / "records.jsonl"
write_jsonl(records_path, records_from_task(task))
metadata = dict(getattr(task, "_metadata", {}) or {})
metadata["embedding_records_path"] = str(records_path)
task._metadata = metadata
task_id is not a parameter of make_document_batch
load_sns_task, load_embedding_task, and load_projection_task all call make_document_batch(task_id=..., dataset_name=..., records=..., metadata=...), but the current make_document_batch signature in compat/curator.py is (dataset_name, records, metadata=None, stage_perf=None) — it has no task_id parameter. Every one of these three loaders will raise TypeError: make_document_batch() got an unexpected keyword argument 'task_id' as soon as it is called, breaking all three resume paths (2_embed.py, 3_project.py, 4_datablend.py). Remove the task_id= keyword from all three call sites.
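A mismatch of this kind between a signature and its call sites can be confirmed without running the pipeline. The sketch below uses `inspect.signature`; `missing_keywords` is a hypothetical helper, and the `make_document_batch` defined here is only a stand-in carrying the signature quoted in this comment, not the tutorial's implementation.

```python
import inspect
from typing import Any, Callable


def missing_keywords(func: Callable[..., Any], *keywords: str) -> list[str]:
    """Return the keywords that func would reject when passed by name."""
    params = inspect.signature(func).parameters
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return []  # **kwargs accepts any keyword
    by_name = (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
    accepted = {name for name, p in params.items() if p.kind in by_name}
    return [keyword for keyword in keywords if keyword not in accepted]


# Stand-in with the signature quoted in the comment above (no task_id).
def make_document_batch(dataset_name, records, metadata=None, stage_perf=None):
    return {"dataset_name": dataset_name, "records": records}


rejected = missing_keywords(
    make_document_batch, "task_id", "dataset_name", "records", "metadata"
)
print(rejected)
```

Run against the signature that includes `task_id`, the same check would return an empty list.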
DocumentBatch requires task_id
This reverts commit 21b8091.
Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
Description
This PR adds a tutorial for Omni-Fuse. It follows a similar structure to existing tutorials and builds a curator pipeline to perform retrieval-based data curation for multimodal datasets.
Usage
Instructions to run the tutorial can be found in the README.md file. Open to feedback/adding additional instructions.
Checklist