add heuristic initial replica#2066

Draft
weijiac0619 wants to merge 1 commit into main from weijia/initial-replicas

Conversation

@weijiac0619

Contributor

Description

Adds throughput-balanced initial replica allocation for Ray Data video pipelines. Instead of starting every actor stage at 1 replica and waiting for autoscaling, this pre-computes initial actor counts from Xenna profiling speeds so all stages start at a balanced throughput.

Usage

# Add snippet demonstrating usage

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes.
  • The documentation is up to date with these changes.

Signed-off-by: Weijia Chen <weijiac@nvidia.com>
@copy-pr-bot

copy-pr-bot Bot commented Jun 11, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@greptile-apps

greptile-apps Bot commented Jun 11, 2026

Contributor

Greptile Summary

This PR adds a throughput-balanced initial replica allocation for Ray Data video pipelines. Instead of every actor stage starting at 1 replica and relying entirely on Ray Data autoscaling, the executor now pre-computes starting replica counts from hardcoded Xenna-profiled speed tables so all stages enter the pipeline at a balanced throughput.

  • compute_joint_initial_allocation in utils.py computes per-stage replica counts by finding the largest scale factor K such that replica counts proportional to 1/speed_i fit within the CPU/GPU budget.
  • executor.py adds hardcoded speed tables for four known pipelines (transcoding, embedding, captioning, TransNetV2), detects which table to use from stage names, and passes the computed initial_replicas down through process_dataset to calculate_concurrency_for_actors_for_stage.
  • adapter.py forwards the new initial_replicas parameter unchanged.
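The allocation rule in the first bullet can be sketched as follows. This is a minimal illustration, not the PR's implementation: `StageSpec`, `balanced_initial_replicas`, and the rounding rule `max(1, floor(K / speed_i))` are assumptions made for the example, and the speeds and budgets are made-up numbers.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class StageSpec:
    """Hypothetical stage description; not the type used in the PR."""
    name: str
    speed: float       # profiled throughput of one replica (items/sec), must be > 0
    cpus: float = 0.0  # CPUs requested per replica
    gpus: float = 0.0  # GPUs requested per replica


def balanced_initial_replicas(stages, cpu_budget, gpu_budget):
    """Return {stage name: replicas} for the largest scale factor K that fits.

    Assumed rounding rule: replicas_i = max(1, floor(K / speed_i)).
    """

    def counts(k):
        # Small epsilon guards against floor(n * s / s) landing on n - 1.
        return {s.name: max(1, math.floor(k / s.speed + 1e-9)) for s in stages}

    def fits(alloc):
        cpu = sum(alloc[s.name] * s.cpus for s in stages)
        gpu = sum(alloc[s.name] * s.gpus for s in stages)
        return cpu <= cpu_budget and gpu <= gpu_budget

    best = {s.name: 1 for s in stages}
    if not fits(best):
        return best  # budget cannot hold one replica per stage; fall back to 1

    # Replica counts only change at K = n * speed_i, so only those K are tested.
    candidates = set()
    for s in stages:
        limits = []
        if s.cpus > 0:
            limits.append(cpu_budget / s.cpus)
        if s.gpus > 0:
            limits.append(gpu_budget / s.gpus)
        if not limits:
            raise ValueError(f"stage {s.name!r} requests no CPUs or GPUs")
        n_max = int(min(limits))
        candidates.update(n * s.speed for n in range(1, n_max + 1))

    for k in sorted(candidates):
        alloc = counts(k)
        if not fits(alloc):
            break  # usage is non-decreasing in K, so larger K cannot fit either
        best = alloc
    return best


stages = [
    StageSpec("decode", speed=8.0, cpus=1),
    StageSpec("embed", speed=2.0, cpus=1, gpus=1),
]
allocation = balanced_initial_replicas(stages, cpu_budget=10, gpu_budget=4)
```

With these illustrative numbers the slow `embed` stage gets 4 replicas and the fast `decode` stage gets 1, so both run at roughly 8 items/sec; the GPU budget is what stops K from growing further.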

Confidence Score: 3/5

The pipeline will fail at runtime when ignore_head_node=True because the joint allocation is computed from total cluster CPUs while the per-stage max is computed from available CPUs minus the head node, making initial replicas exceed the max concurrency ceiling.

Two separate code paths query cluster resources using different Ray APIs — ray.cluster_resources() for the joint allocation and ray.available_resources() (minus head node when ignore_head_node=True) for per-stage max actors. The resulting concurrency=(min, max) tuple passed to Ray Data's map_batches will have min > max any time ignore_head_node is enabled or the cluster is under any pre-existing load, causing Ray Data to raise an error. There is also no defensive clamp in calculate_concurrency_for_actors_for_stage to catch this before the tuple reaches Ray.

The resource-fetch logic in executor.py (lines 132–134) and the tuple construction in utils.py (lines 136–138) need the most attention before this can safely run in production.

Important Files Changed

| Filename | Overview |
| --- | --- |
| nemo_curator/backends/ray_data/executor.py | Adds joint initial replica allocation using Xenna-profiled speed tables; uses ray.cluster_resources() (total) instead of available resources, causing initial_replicas > max_actors when ignore_head_node=True or cluster is under load. |
| nemo_curator/backends/ray_data/utils.py | Adds compute_joint_initial_allocation and _throughput_balanced_allocation; no guard ensures min_actors <= max_actors before returning the concurrency tuple to Ray Data. |
| nemo_curator/backends/ray_data/adapter.py | Threads initial_replicas through process_dataset to calculate_concurrency_for_actors_for_stage; change is a clean parameter pass-through with no logic issues in this file. |

Sequence Diagram

sequenceDiagram
    participant E as RayDataExecutor.execute()
    participant R as ray.cluster_resources()
    participant U as compute_joint_initial_allocation()
    participant A as RayDataStageAdapter.process_dataset()
    participant C as calculate_concurrency_for_actors_for_stage()
    participant AR as ray.available_resources()

    E->>R: get total CPUs/GPUs
    R-->>E: _avail_cpus, _avail_gpus (TOTAL)
    E->>U: actor_stages, _avail_cpus, _avail_gpus, stage_speeds
    U-->>E: "joint_allocation {stage_name: initial_replicas}"
    loop each stage
        E->>A: process_dataset(dataset, ignore_head_node, initial_replicas)
        A->>C: calculate_concurrency(stage, ignore_head_node, initial_replicas)
        C->>AR: get available CPUs/GPUs (AVAILABLE, minus head if ignore_head_node)
        AR-->>C: avail_cpus (less than or equal to TOTAL)
        Note over C: max_actors = avail_cpus divided by stage.cpus, min_actors = initial_replicas, warning min_actors may exceed max_actors
        C-->>A: (min_actors, max_actors)
        A-->>E: "dataset.map_batches(concurrency=(min, max))"
    end

Reviews (1): Last reviewed commit: "add heuristic initial replica" | Re-trigger Greptile

Comment on lines +132 to +134
_cluster = ray.cluster_resources()
_avail_cpus = _cluster.get("CPU", 0)
_avail_gpus = _cluster.get("GPU", 0)

Contributor

P1 Resource source mismatch: ray.cluster_resources() returns total cluster resources (including head node and in-use allocations), while calculate_concurrency_for_actors_for_stage calls get_available_cpu_gpu_resources() which returns currently available resources. When ignore_head_node=True, the available count is further reduced by subtracting head-node CPUs/GPUs, making it almost certain that initial_replicas > max_actors. Ray Data raises a ValueError when given concurrency=(min, max) with min > max.

Suggested change
- _cluster = ray.cluster_resources()
- _avail_cpus = _cluster.get("CPU", 0)
- _avail_gpus = _cluster.get("GPU", 0)
+ from nemo_curator.backends.utils import get_available_cpu_gpu_resources
+ _avail_cpus, _avail_gpus = get_available_cpu_gpu_resources(
+     init_and_shutdown=False, ignore_head_node=self.ignore_head_node
+ )
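To make the mismatch concrete, here is a small arithmetic sketch. It makes no Ray calls; the cluster size, head-node size, in-use CPUs, and per-actor request are hypothetical numbers, and a single-stage pipeline is assumed so that the joint allocation can spend the whole budget on one stage.

```python
def max_actors_from(cpus: float, cpus_per_actor: float) -> int:
    """How many actors fit into a CPU budget."""
    return int(cpus // cpus_per_actor)


# Hypothetical cluster, for illustration only.
TOTAL_CPUS = 64      # what a total-resources query would report
HEAD_NODE_CPUS = 16  # subtracted when ignore_head_node=True
IN_USE_CPUS = 8      # held by other work on worker nodes
CPUS_PER_ACTOR = 4

# Joint allocation path: budget taken from TOTAL cluster CPUs.
initial_replicas = max_actors_from(TOTAL_CPUS, CPUS_PER_ACTOR)

# Per-stage path: budget taken from AVAILABLE CPUs minus the head node.
available_cpus = TOTAL_CPUS - IN_USE_CPUS - HEAD_NODE_CPUS
max_actors = max_actors_from(available_cpus, CPUS_PER_ACTOR)

concurrency = (initial_replicas, max_actors)  # min > max, which is invalid
```

Here the tuple comes out as (16, 10): the lower bound was sized against 64 CPUs while the upper bound was sized against 40.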

Comment on lines +136 to +138
max_actors = int(min(max_cpu_actors, max_gpu_actors))
min_actors = initial_replicas if initial_replicas is not None else 1
return (min_actors, max_actors)

Contributor

P1 Even after fixing the resource source, initial_replicas could still exceed max_actors in rare timing windows (e.g., another actor briefly holds resources when calculate_concurrency_for_actors_for_stage queries available resources). Clamping min_actors to max_actors ensures the tuple is always valid without suppressing the intent of the heuristic.

Suggested change
- max_actors = int(min(max_cpu_actors, max_gpu_actors))
- min_actors = initial_replicas if initial_replicas is not None else 1
- return (min_actors, max_actors)
+ max_actors = int(min(max_cpu_actors, max_gpu_actors))
+ min_actors = initial_replicas if initial_replicas is not None else 1
+ return (max(1, min(min_actors, max_actors)), max_actors)
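The clamp can also be read as a standalone helper, sketched below. `clamp_concurrency` is a hypothetical name, not a function in the PR. One extra assumption goes beyond the suggestion: when `max_actors` is below 1 the helper raises, because clamping alone would still produce (1, 0).

```python
from typing import Optional, Tuple


def clamp_concurrency(initial_replicas: Optional[int], max_actors: int) -> Tuple[int, int]:
    """Return a (min, max) tuple that always satisfies 1 <= min <= max."""
    if max_actors < 1:
        # Assumption of this sketch: no capacity is surfaced as an error.
        raise ValueError("no capacity for even one actor")
    min_actors = 1 if initial_replicas is None else initial_replicas
    return (max(1, min(min_actors, max_actors)), max_actors)
```

A heuristic value that fits is passed through unchanged, so the clamp only intervenes when the two resource queries disagree.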

Comment on lines +63 to +74
def _get_pipeline_speeds(actor_stages: list) -> "dict[str, float] | None":
    """Return the Xenna speed table that matches the given actor-stage set, or None."""
    names = {s.__class__.__name__ for s in actor_stages}
    if "CaptionEnhancementStage" in names:
        return _SPEEDS_CAPTIONING
    if "TransNetV2ClipExtractionStage" in names:
        return _SPEEDS_TRANSNETV2
    if "CosmosEmbed1EmbeddingStage" in names:
        return _SPEEDS_EMBEDDING
    if "ClipTranscodingStage" in names:
        return _SPEEDS_TRANSCODING
    return None

Contributor

P2 Fragile pipeline identity heuristic: The cascade of single-feature checks (CaptionEnhancementStage present → captioning, etc.) silently picks a wrong speed table if a future pipeline combines these stages in a novel way. For example, a pipeline containing both CaptionEnhancementStage and TransNetV2ClipExtractionStage would match captioning even though transnetv2 speeds are also in play. Since a mismatched table causes a silent fallback to N=1 (via the all(n in stage_speeds) guard), correctness is preserved, but it may be worth adding an explicit warning when the matched table doesn't contain every stage in the pipeline.
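One way such a warning could look is sketched below. `missing_speed_entries` is a hypothetical helper, and the use of the standard `logging` module is an assumption; the project may use a different logger.

```python
import logging

logger = logging.getLogger(__name__)


def missing_speed_entries(stage_names, speed_table):
    """Return the sorted stage names that have no entry in speed_table.

    Logs a warning when any are missing, so the fallback to one replica
    per stage is no longer silent.
    """
    missing = sorted(set(stage_names) - set(speed_table))
    if missing:
        logger.warning(
            "Matched speed table has no entry for %s; "
            "falling back to 1 initial replica per stage",
            ", ".join(missing),
        )
    return missing
```

Returning the missing names rather than a boolean lets the caller include them in its own log message or decide whether to skip the joint allocation.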

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

@weijiac0619 weijiac0619 marked this pull request as draft June 11, 2026 21:05