
fix: auto-detect Ray fanout stages #2056

Open
nightcityblade wants to merge 5 commits into NVIDIA-NeMo:main from nightcityblade:fix/issue-1613-auto-fanout

Conversation

@nightcityblade


Description

Closes #1613.

Automatically marks Ray Data stages as fanout stages when their process return annotation is a concrete list[...]. This lets stages like URLGenerationStage rely on the base ProcessingStage default instead of manually setting is_fanout_stage.

Supersedes #2025 with the same branch after refreshing the DCO sign-offs.

Usage

class MyFanoutStage(ProcessingStage[InputTask, OutputTask]):
    def process(self, task: InputTask) -> list[OutputTask]:
        ...

assert MyFanoutStage().ray_stage_spec() == {"is_fanout_stage": True}

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes.
  • The documentation is up to date with these changes.

Tests:

  • python3 -m compileall nemo_curator/stages/base.py nemo_curator/stages/video/clipping/clip_extraction_stages.py tests/stages/video/clipping/test_clip_transcoding_stage.py
  • Earlier targeted ruff checks passed on the changed stage/test files; targeted pytest collection is blocked locally on macOS by NeMo-Curator's Linux-only runtime check.

nightcityblade added 5 commits June 5, 2026 11:10
Signed-off-by: nightcityblade <nightcityblade@gmail.com>
Signed-off-by: nightcityblade <nightcityblade@gmail.com>
Signed-off-by: nightcityblade <nightcityblade@gmail.com>
Signed-off-by: nightcityblade <nightcityblade@gmail.com>
Signed-off-by: nightcityblade <nightcityblade@gmail.com>
@copy-pr-bot

copy-pr-bot Bot commented Jun 8, 2026


This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@greptile-apps

greptile-apps Bot commented Jun 8, 2026


Greptile Summary

This PR eliminates per-stage ray_stage_spec() boilerplate by teaching ProcessingStage to auto-detect fanout stages: if a concrete process method is annotated with a plain list[...] return type, the base ray_stage_spec() now returns {"is_fanout_stage": True} automatically. It also corrects ClipTranscodingStage.process to return list[VideoTask] everywhere (including the early-return no-clips path), bringing the signature in line with the fanout contract.

  • ProcessingStage._process_returns_list uses get_type_hints with a string-annotation fallback to inspect the return annotation at runtime; only a bare list[...] (not a union containing list) triggers the fanout flag, matching the documented "concrete list" requirement.
  • Five stages drop their manual overrides; each has a -> list[...] return annotation confirmed.
  • Tests cover fanout, non-fanout, union/maybe-fanout, explicit opt-in, and the string-annotation fallback path.

Confidence Score: 4/5

Safe to merge — the auto-detection logic is well-tested and all removed overrides were verified to have matching list[...] return annotations.

The implementation is sound for all current stages. The one gap is that get_type_hints can raise AttributeError on dotted-attribute annotations, which would surface as an unexpected exception from ray_stage_spec() rather than silently falling back — a narrow but real failure mode on the changed path.

nemo_curator/stages/base.py — the _process_returns_list except clause

Important Files Changed

Filename | Overview
--- | ---
nemo_curator/stages/base.py | Adds _process_returns_list / _annotation_includes_list to auto-detect list[...] return annotations; get_type_hints fallback doesn't catch AttributeError
nemo_curator/stages/video/clipping/clip_extraction_stages.py | Fixes ClipTranscodingStage.process return type to list[VideoTask], wraps early-return path in a list
nemo_curator/stages/audio/alm/alm_manifest_reader.py | Removes ray_stage_spec override; process annotated -> list[AudioBatch], auto-detection covers it
nemo_curator/stages/deduplication/semantic/pairwise_io.py | Removes ray_stage_spec override and unused import; process annotated -> list[FileGroupTask]
nemo_curator/stages/file_partitioning.py | Removes ray_stage_spec override and unused import; process annotated -> list[FileGroupTask]
nemo_curator/stages/text/download/base/url_generation.py | Removes ray_stage_spec override; process annotated -> list[FileGroupTask]
tests/stages/common/test_base.py | Adds TestProcessingStageRaySpec covering fanout, non-fanout, union, opt-in, and string-annotation fallback
tests/stages/video/clipping/test_clip_transcoding_stage.py | Updates assertions for early-return path to expect list[VideoTask]
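The ClipTranscodingStage change noted above (wrapping the early-return path in a list) amounts to keeping the return type uniform on every branch. A rough sketch of that idea, using a hypothetical VideoTaskSketch dataclass and a made-up per-clip fanout rather than the real VideoTask or transcoding logic:

```python
from dataclasses import dataclass, field


@dataclass
class VideoTaskSketch:
    """Hypothetical stand-in for VideoTask with only the fields this sketch needs."""

    name: str
    clips: list[str] = field(default_factory=list)


def process(task: VideoTaskSketch) -> list[VideoTaskSketch]:
    # Early return: nothing to transcode, but the caller still receives a list,
    # so the declared list[...] return annotation holds on every path.
    if not task.clips:
        return [task]
    # Assumed fanout for illustration only: one output task per clip.
    return [
        VideoTaskSketch(name=f"{task.name}-{index}", clips=[clip])
        for index, clip in enumerate(task.clips)
    ]
```

With both branches returning a list, a caller that treats the stage as a fanout stage never has to special-case a bare task object.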

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A["ray_stage_spec() called"] --> B["_process_returns_list(cls)"]
    B --> C{"get_type_hints(cls.process) succeeds?"}
    C -- "Yes" --> D["return_annotation = resolved type"]
    C -- "NameError / TypeError" --> E["return_annotation = raw __annotations__ string"]
    D --> F["_annotation_includes_list(annotation)"]
    E --> F
    F --> G{"isinstance(annotation, str)?"}
    G -- "Yes" --> H{"startswith list[ / List[ / typing.List[?"}
    G -- "No" --> I{"get_origin(annotation) is list?"}
    H -- "True" --> J["return is_fanout_stage: True"]
    H -- "False" --> K["return {}"]
    I -- "True" --> J
    I -- "False" --> K

Reviews (1): Last reviewed commit: "fix: return clip fanout tasks consistent..."

Comment on lines +312 to +315
try:
    return_annotation = get_type_hints(cls.process).get("return")
except (NameError, TypeError):
    return_annotation = cls.process.__annotations__.get("return")


P2 Adding AttributeError to the except tuple ensures the fallback path is reached when get_type_hints encounters a dotted-attribute annotation referencing a missing symbol (e.g. some_module.MissingType). Without it, the exception propagates out of ray_stage_spec().

Suggested change
- try:
-     return_annotation = get_type_hints(cls.process).get("return")
- except (NameError, TypeError):
-     return_annotation = cls.process.__annotations__.get("return")
+ try:
+     return_annotation = get_type_hints(cls.process).get("return")
+ except (NameError, TypeError, AttributeError):
+     return_annotation = cls.process.__annotations__.get("return")

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
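The failure mode in this comment can be reproduced in isolation. The sketch below is an illustration under stated assumptions, not code from the PR: resolve_return is a hypothetical helper, math stands in for some_module, and globalns is passed explicitly only so the example does not depend on where it is run.

```python
import math
from typing import get_type_hints


def process(task: int) -> "math.MissingType":  # math has no attribute MissingType
    ...


def resolve_return(func, caught):
    """Resolve the return annotation, falling back to the raw value on `caught`."""
    try:
        # Evaluating the dotted string annotation performs an attribute lookup on math.
        return get_type_hints(func, globalns={"math": math}).get("return")
    except caught:
        return func.__annotations__.get("return")


# With the narrower tuple, the AttributeError is not caught and escapes.
try:
    resolve_return(process, (NameError, TypeError))
    escaped = False
except AttributeError:
    escaped = True

# With AttributeError included, the raw string annotation is returned instead.
fallback = resolve_return(process, (NameError, TypeError, AttributeError))
```

Here escaped ends up True and fallback is the unresolved string, which the string branch of the list check would then classify as non-fanout.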

@svcnvidia-nemo-ci svcnvidia-nemo-ci added the waiting-on-maintainers Waiting on maintainers to respond label Jun 10, 2026
@sarahyurick sarahyurick self-requested a review June 11, 2026 20:57

Labels

community-request waiting-on-maintainers Waiting on maintainers to respond

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Automatically detect when IS_FANOUT_STAGE should be set to True

2 participants