add heuristic initial replica#2066
Conversation
Signed-off-by: Weijia Chen <weijiac@nvidia.com>
Greptile SummaryThis PR adds a throughput-balanced initial replica allocation for Ray Data video pipelines. Instead of every actor stage starting at 1 replica and relying entirely on Ray Data autoscaling, the executor now pre-computes starting replica counts from hardcoded Xenna-profiled speed tables so all stages enter the pipeline at a balanced throughput.
Confidence Score: 3/5The pipeline will fail at runtime when Two separate code paths query cluster resources using different Ray APIs — The resource-fetch logic in Important Files Changed
Sequence DiagramsequenceDiagram
participant E as RayDataExecutor.execute()
participant R as ray.cluster_resources()
participant U as compute_joint_initial_allocation()
participant A as RayDataStageAdapter.process_dataset()
participant C as calculate_concurrency_for_actors_for_stage()
participant AR as ray.available_resources()
E->>R: get total CPUs/GPUs
R-->>E: _avail_cpus, _avail_gpus (TOTAL)
E->>U: actor_stages, _avail_cpus, _avail_gpus, stage_speeds
U-->>E: "joint_allocation {stage_name: initial_replicas}"
loop each stage
E->>A: process_dataset(dataset, ignore_head_node, initial_replicas)
A->>C: calculate_concurrency(stage, ignore_head_node, initial_replicas)
C->>AR: get available CPUs/GPUs (AVAILABLE, minus head if ignore_head_node)
AR-->>C: avail_cpus (less than or equal to TOTAL)
Note over C: max_actors = avail_cpus divided by stage.cpus, min_actors = initial_replicas, warning min_actors may exceed max_actors
C-->>A: (min_actors, max_actors)
A-->>E: "dataset.map_batches(concurrency=(min, max))"
end
Reviews (1): Last reviewed commit: "add heuristic initial replica" | Re-trigger Greptile |
| _cluster = ray.cluster_resources() | ||
| _avail_cpus = _cluster.get("CPU", 0) | ||
| _avail_gpus = _cluster.get("GPU", 0) |
There was a problem hiding this comment.
Resource source mismatch:
ray.cluster_resources() returns total cluster resources (including head node and in-use allocations), while calculate_concurrency_for_actors_for_stage calls get_available_cpu_gpu_resources() which returns currently available resources. When ignore_head_node=True, the available count is further reduced by subtracting head-node CPUs/GPUs, making it almost certain that initial_replicas > max_actors. Ray Data raises a ValueError when given concurrency=(min, max) with min > max.
| _cluster = ray.cluster_resources() | |
| _avail_cpus = _cluster.get("CPU", 0) | |
| _avail_gpus = _cluster.get("GPU", 0) | |
| from nemo_curator.backends.utils import get_available_cpu_gpu_resources | |
| _avail_cpus, _avail_gpus = get_available_cpu_gpu_resources( | |
| init_and_shutdown=False, ignore_head_node=self.ignore_head_node | |
| ) |
| max_actors = int(min(max_cpu_actors, max_gpu_actors)) | ||
| min_actors = initial_replicas if initial_replicas is not None else 1 | ||
| return (min_actors, max_actors) |
There was a problem hiding this comment.
Even after fixing the resource source,
initial_replicas could still exceed max_actors in rare timing windows (e.g., another actor briefly holds resources when calculate_concurrency_for_actors_for_stage queries available resources). Clamping min_actors to max_actors ensures the tuple is always valid without suppressing the intent of the heuristic.
| max_actors = int(min(max_cpu_actors, max_gpu_actors)) | |
| min_actors = initial_replicas if initial_replicas is not None else 1 | |
| return (min_actors, max_actors) | |
| max_actors = int(min(max_cpu_actors, max_gpu_actors)) | |
| min_actors = initial_replicas if initial_replicas is not None else 1 | |
| return (max(1, min(min_actors, max_actors)), max_actors) |
| def _get_pipeline_speeds(actor_stages: list) -> "dict[str, float] | None": | ||
| """Return the Xenna speed table that matches the given actor-stage set, or None.""" | ||
| names = {s.__class__.__name__ for s in actor_stages} | ||
| if "CaptionEnhancementStage" in names: | ||
| return _SPEEDS_CAPTIONING | ||
| if "TransNetV2ClipExtractionStage" in names: | ||
| return _SPEEDS_TRANSNETV2 | ||
| if "CosmosEmbed1EmbeddingStage" in names: | ||
| return _SPEEDS_EMBEDDING | ||
| if "ClipTranscodingStage" in names: | ||
| return _SPEEDS_TRANSCODING | ||
| return None |
There was a problem hiding this comment.
Fragile pipeline identity heuristic: The cascade of single-feature checks (
CaptionEnhancementStage present → captioning, etc.) silently picks a wrong speed table if a future pipeline combines these stages in a novel way. For example, a pipeline containing both CaptionEnhancementStage and TransNetV2ClipExtractionStage would match captioning even though transnetv2 speeds are also in play. Since a mismatched table causes a silent fallback to N=1 (via the all(n in stage_speeds) guard), correctness is preserved, but it may be worth adding an explicit warning when the matched table doesn't contain every stage in the pipeline.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Description
Adds throughput-balanced initial replica allocation for Ray Data video pipelines. Instead of starting every actor stage at 1 replica and waiting for autoscaling, this pre-computes initial actor counts from Xenna profiling speeds so all stages start at a balanced throughput.
Usage
# Add snippet demonstrating usageChecklist