Add task specialized plans #454
Draft
shehab-ali wants to merge 1 commit into
Motivation
Today, distributed execution uses a shared stage plan payload for all tasks. This PR adds an opt-in path to materialize per-task stage plans so we can incrementally roll out task specialization while preserving current default behavior.
What changed
Added distributed.task_specialized_stage_plans to DistributedConfig (default: false).
This flag controls whether per-task stage plans are built/serialized.
Added build_task_specialized_stage_plans in task_specialized_stage_plans.rs.
The builder creates task_count plans and rewrites PartitionIsolatorExec nodes into task-fixed PartitionIsolatorExec::for_task(...) instances.
Exported the builder from distributed_planner::mod.rs for internal planner use.
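A minimal sketch of the rewrite the builder performs, using a toy plan tree rather than the real physical plan types. The `Plan` enum, the `partition_group` helper, the `_sketch` function name, the `usize` element type, and the contiguous-chunk assignment policy are all assumptions made for illustration; the PR does not spell them out.

```rust
// Toy stand-in for a physical plan tree (hypothetical, not the real planner types).
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan { table: String },
    Filter { input: Box<Plan> },
    PartitionIsolator {
        // Assumed element type; the PR text only shows `Option<Vec>`.
        fixed_partition_group: Option<Vec<usize>>,
        input: Box<Plan>,
    },
}

// Assumed policy: each task gets a contiguous chunk of the input partitions.
pub fn partition_group(task_i: usize, task_count: usize, partitions: usize) -> Vec<usize> {
    let chunk = (partitions + task_count - 1) / task_count; // ceiling division
    let start = (task_i * chunk).min(partitions);
    let end = (start + chunk).min(partitions);
    (start..end).collect()
}

// Rebuild the tree for one task, pinning every isolator to that task's group.
fn specialize(plan: &Plan, task_i: usize, task_count: usize, partitions: usize) -> Plan {
    match plan {
        Plan::Scan { table } => Plan::Scan { table: table.clone() },
        Plan::Filter { input } => Plan::Filter {
            input: Box::new(specialize(input, task_i, task_count, partitions)),
        },
        Plan::PartitionIsolator { input, .. } => Plan::PartitionIsolator {
            fixed_partition_group: Some(partition_group(task_i, task_count, partitions)),
            input: Box::new(specialize(input, task_i, task_count, partitions)),
        },
    }
}

// Returns exactly `task_count` plans, one per task index.
pub fn build_task_specialized_stage_plans_sketch(
    plan: &Plan,
    task_count: usize,
    partitions: usize,
) -> Vec<Plan> {
    (0..task_count)
        .map(|task_i| specialize(plan, task_i, task_count, partitions))
        .collect()
}
```

Every other node is copied unchanged, so the per-task plans differ only at the isolator nodes.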
CoordinatorToWorkerTaskSpawner now stores plan_protos: Vec<Vec> (one serialized plan per task) instead of a single shared plan_proto.
CoordinatorToWorkerTaskSpawner::new now accepts ctx so it can read distributed config and choose between:
specialized per-task plans (flag on), or
cloned shared plans (flag off).
SetPlanRequest now sends plan_protos[task_i] to each task.
Updated the call site in DistributedExec::prepare_plan accordingly.
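The flag-dependent choice in the spawner can be sketched roughly as follows. `DistributedConfigSketch`, `plan_protos_for_tasks`, and the `serialize_for_task` callback are hypothetical names, and `Vec<u8>` as the serialized plan type is an assumption (the inner type is not visible in the description above).

```rust
// Hypothetical stand-in for the real config; only the new flag is modeled.
pub struct DistributedConfigSketch {
    pub task_specialized_stage_plans: bool,
}

// Produces one serialized plan per task.
// Flag on: call the per-task serializer for each task index.
// Flag off: clone the shared payload, which matches the previous behavior.
pub fn plan_protos_for_tasks<F>(
    config: &DistributedConfigSketch,
    shared_plan_proto: &[u8],
    task_count: usize,
    serialize_for_task: F,
) -> Vec<Vec<u8>>
where
    F: Fn(usize) -> Vec<u8>,
{
    if config.task_specialized_stage_plans {
        (0..task_count).map(serialize_for_task).collect()
    } else {
        vec![shared_plan_proto.to_vec(); task_count]
    }
}
```

Either branch yields a vector of length `task_count`, so indexing it with `task_i` when building each request works the same way whether or not the flag is set.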
Added fixed_partition_group: Option<Vec> to PartitionIsolatorExec.
Added constructor PartitionIsolatorExec::for_task(...) to build task-fixed isolators.
execute now uses the fixed mapping when present; otherwise it preserves the existing runtime DistributedTaskContext behavior.
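A rough sketch of the fixed-versus-runtime fallback, under stated assumptions: `IsolatorSketch` and `TaskContextSketch` are hypothetical stand-ins for `PartitionIsolatorExec` and `DistributedTaskContext`, the `usize` element type is assumed, and the contiguous-chunk runtime policy is illustrative rather than taken from the PR.

```rust
// Hypothetical stand-in for the runtime task context.
pub struct TaskContextSketch {
    pub task_index: usize,
    pub task_count: usize,
}

// Hypothetical stand-in for the isolator node.
pub struct IsolatorSketch {
    pub fixed_partition_group: Option<Vec<usize>>,
    pub input_partitions: usize,
}

impl IsolatorSketch {
    // Task-fixed isolator: the group is decided at planning time.
    pub fn for_task(group: Vec<usize>, input_partitions: usize) -> Self {
        Self { fixed_partition_group: Some(group), input_partitions }
    }

    // Unspecialized isolator: the group is derived at execution time.
    pub fn unspecialized(input_partitions: usize) -> Self {
        Self { fixed_partition_group: None, input_partitions }
    }

    // Use the fixed mapping when present; otherwise fall back to the runtime context.
    pub fn partition_group(&self, ctx: &TaskContextSketch) -> Vec<usize> {
        match &self.fixed_partition_group {
            Some(group) => group.clone(),
            None => {
                // Assumed runtime policy: contiguous chunks per task.
                let tasks = ctx.task_count.max(1);
                let chunk = (self.input_partitions + tasks - 1) / tasks;
                let start = (ctx.task_index * chunk).min(self.input_partitions);
                let end = (start + chunk).min(self.input_partitions);
                (start..end).collect()
            }
        }
    }
}
```

When the fixed group is set, the runtime context is ignored entirely, which is what lets a specialized plan carry its own partition assignment.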
Behavior and compatibility
Default behavior unchanged (task_specialized_stage_plans = false).
Specialization path is fully gated behind config and can be enabled incrementally.
Testing
Added/updated the unit test returns_one_plan_per_task to verify the per-task plan generation path and the shape of the transformed plans.
Ran formatting and targeted test:
cargo fmt --check
cargo test returns_one_plan_per_task --lib -q
Follow-ups
Extend specialization beyond PartitionIsolatorExec (scan-level/file-group pruning).
Validate and adapt WorkUnit feed declaration/indexing assumptions for heterogeneous per-task partitioning.