Add task specialized plans #454

Draft
shehab-ali wants to merge 1 commit into datafusion-contrib:main from shehab-ali:sa/specialized-task-plan

@shehab-ali shehab-ali commented May 13, 2026

Motivation

Today, distributed execution uses a shared stage plan payload for all tasks. This PR adds an opt-in path to materialize per-task stage plans so we can incrementally roll out task specialization while preserving current default behavior.

What changed

  1. Add config flag for gated rollout
    Added distributed.task_specialized_stage_plans to DistributedConfig (default: false).

This flag controls whether per-task stage plans are built/serialized.

  2. Add per-task stage plan builder
    Added build_task_specialized_stage_plans in task_specialized_stage_plans.rs.

The builder creates task_count plans and rewrites PartitionIsolatorExec nodes into task-fixed PartitionIsolatorExec::for_task(...) instances.

Exported the builder from distributed_planner::mod.rs for internal planner use.
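
To illustrate the rewrite described above, here is a minimal sketch on a toy plan tree. It does not use the real `ExecutionPlan` trait or the actual `build_task_specialized_stage_plans` signature; the `Plan` enum, the helper names, and the contiguous-chunk partition assignment are all assumptions made for illustration.

```rust
/// Toy stand-in for a physical plan node (hypothetical, not DataFusion's `ExecutionPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { partitions: usize },
    Filter { input: Box<Plan> },
    /// `fixed_group` is `None` for the shared plan and `Some(..)` once task-fixed.
    Isolator { fixed_group: Option<Vec<usize>>, input: Box<Plan> },
}

/// Number of partitions produced by the leaf below this node.
fn input_partitions(plan: &Plan) -> usize {
    match plan {
        Plan::Scan { partitions } => *partitions,
        Plan::Filter { input } | Plan::Isolator { input, .. } => input_partitions(input),
    }
}

/// Assumed assignment: contiguous chunks of ceil(n / task_count) partitions per task.
fn partition_group(n: usize, task_count: usize, task_i: usize) -> Vec<usize> {
    let per_task = n.div_ceil(task_count.max(1));
    let start = (task_i * per_task).min(n);
    let end = (start + per_task).min(n);
    (start..end).collect()
}

/// Rebuild the tree for one task, pinning every isolator to that task's group.
fn specialize(plan: &Plan, task_count: usize, task_i: usize) -> Plan {
    match plan {
        Plan::Scan { partitions } => Plan::Scan { partitions: *partitions },
        Plan::Filter { input } => Plan::Filter {
            input: Box::new(specialize(input, task_count, task_i)),
        },
        Plan::Isolator { input, .. } => Plan::Isolator {
            fixed_group: Some(partition_group(input_partitions(input), task_count, task_i)),
            input: Box::new(specialize(input, task_count, task_i)),
        },
    }
}

/// One specialized plan per task, mirroring the "creates task_count plans" step.
fn build_task_plans(plan: &Plan, task_count: usize) -> Vec<Plan> {
    (0..task_count)
        .map(|task_i| specialize(plan, task_count, task_i))
        .collect()
}
```

The shared plan is left untouched and each task gets its own copy, which is what allows the flag-off path to keep using the original plan.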

  3. Wire distributed fan-out to per-task serialized payloads
    CoordinatorToWorkerTaskSpawner now stores plan_protos: Vec<Vec> (one serialized plan per task) instead of a single shared plan_proto.

CoordinatorToWorkerTaskSpawner::new now accepts ctx so it can read distributed config and choose between:

specialized per-task plans (flag on), or

cloned shared plans (flag off).

SetPlanRequest now sends plan_protos[task_i] to each task.

Updated callsite accordingly in DistributedExec::prepare_plan.
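
The flag-driven choice between specialized and cloned payloads might look roughly like the sketch below. The element type of `plan_protos` is truncated in the description above, so plain byte vectors are used here purely as a placeholder; `build_plan_protos` and the `specialize` callback are hypothetical names, not the spawner's real API.

```rust
/// Build one serialized payload per task (hypothetical helper).
///
/// * `shared` is the already-serialized shared stage plan.
/// * `specialized` stands in for the `task_specialized_stage_plans` flag.
/// * `specialize` is an assumed callback producing the payload for one task.
fn build_plan_protos(
    shared: &[u8],
    task_count: usize,
    specialized: bool,
    specialize: impl Fn(&[u8], usize) -> Vec<u8>,
) -> Vec<Vec<u8>> {
    (0..task_count)
        .map(|task_i| {
            if specialized {
                // Flag on: each task gets its own payload.
                specialize(shared, task_i)
            } else {
                // Flag off: every task gets a clone of the shared payload.
                shared.to_vec()
            }
        })
        .collect()
}
```

With this shape, the sender only ever indexes `plan_protos[task_i]`, regardless of which path produced the payloads.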

  4. Extend PartitionIsolatorExec for task-fixed mapping
    Added fixed_partition_group: Option<Vec>.

Added constructor PartitionIsolatorExec::for_task(...) to build task-fixed isolators.

execute now uses fixed mapping when present, otherwise preserves existing runtime DistributedTaskContext behavior.
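
As a rough sketch of the lookup in `execute`, the fixed mapping takes precedence and the runtime-derived group is the fallback. The `Isolator` struct, the `usize` element type, and `resolve_input_partition` are assumptions for illustration; the real `PartitionIsolatorExec` derives the runtime group from `DistributedTaskContext`, which is modelled here as a plain slice.

```rust
/// Hypothetical, trimmed-down stand-in for `PartitionIsolatorExec`.
struct Isolator {
    /// Set by a task-fixed constructor; `None` keeps the runtime behavior.
    fixed_partition_group: Option<Vec<usize>>,
}

/// Map an output partition index to the input partition it should read.
///
/// `runtime_group` stands in for the group computed from the task context
/// at execution time. Returns `None` when this task has no such partition.
fn resolve_input_partition(
    iso: &Isolator,
    partition: usize,
    runtime_group: &[usize],
) -> Option<usize> {
    // Prefer the mapping baked into the plan, otherwise use the runtime one.
    let group = iso
        .fixed_partition_group
        .as_deref()
        .unwrap_or(runtime_group);
    group.get(partition).copied()
}
```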

Behavior and compatibility

Default behavior unchanged (task_specialized_stage_plans = false).

Specialization path is fully gated behind config and can be enabled incrementally.

Testing

Added/updated the unit test returns_one_plan_per_task to verify the per-task plan generation path and the shape of the transformed plans.

Ran formatting and targeted test:

cargo fmt --check
cargo test returns_one_plan_per_task --lib -q

Follow-ups

Extend specialization beyond PartitionIsolatorExec (scan-level/file-group pruning).

Validate and adapt WorkUnit feed declaration/indexing assumptions for heterogeneous per-task partitioning.
