Skip to content

Remove 2-pass planner (annotator + distribution) in favor of a 1-pass planner#416

Merged
gabotechs merged 13 commits into
mainfrom
gabrielmusat/refactor-distributed-planning
May 14, 2026
Merged

Remove 2-pass planner (annotator + distribution) in favor of a 1-pass planner#416
gabotechs merged 13 commits into
mainfrom
gabrielmusat/refactor-distributed-planning

Conversation

@gabotechs
Copy link
Copy Markdown
Collaborator

@gabotechs gabotechs commented Apr 27, 2026

This is a preparatory step towards:

This is one PR from the following stack of PRs:

The main purpose of this PR is to make distributed planning in a single pass, rather than the current two that communicate each other via an intermediate struct (AnnotatedPlan). This change cascades into several other changes that produce a nicer public API for building custom distributed plans, but also produce a big diff.

Dropping the two-step annotation + NB injection

On a dynamic task assignation context, choosing the task count for a stage based on the previous one can no longer
be done statically.

After "annotating" a stage, and before "annotating" the
one above, we need to be able to send it for execution, collect runtime
metrics, and based on that decide the task count for the stage above.

This means that the stage below should be good to be sent for execution
before the full annotation process has finished, meaning that we need to
do everything there is to be done in the "annotation" process, we can no
longer divide the distribution process in several steps that recurse the
whole plan.

Network boundaries no longer mutate their children

In order for network boundaries to know what mutations to apply to their
children, they need to now how many consumer tasks are they going to be
running, but this might not be know until execution time, so if we want to
dynamically assign tasks to stages, there's no way at planning time that
we can know how to mutate the children.

For example, we do not now how to scale up a RepartitionExec if we don't
know how many NetworkShuffleExecs are going to be consuming it.

The responsibility of preparing network boundaries inputs (e.g., scaling RepartitionExec)
is now factor out into a separate network_boundary_scale_input() function
that can be called either at planning time or at execution time.

Right now, it's still just called at planning time.

Comment thread tests/tpcds_plans_test.rs Outdated
Comment thread tests/tpcds_plans_test.rs Outdated
@gabotechs gabotechs force-pushed the gabrielmusat/refactor-distributed-planning branch 2 times, most recently from 314bac8 to 8c98bc8 Compare April 27, 2026 10:22
@gabotechs gabotechs force-pushed the gabrielmusat/refactor-distributed-planning branch from f3cd3dc to 7626c71 Compare April 27, 2026 13:20
@gabotechs gabotechs marked this pull request as ready for review April 27, 2026 13:25
@gabotechs gabotechs force-pushed the gabrielmusat/refactor-distributed-planning branch from 2ff4808 to d12d12b Compare April 30, 2026 09:05
@gabotechs gabotechs changed the base branch from main to gabrielmusat/remote-artificial-broadcast-task-limit May 1, 2026 13:25
@gabotechs gabotechs changed the base branch from gabrielmusat/remote-artificial-broadcast-task-limit to gabrielmusat/refactor-stage-struct May 1, 2026 15:44
Comment on lines +523 to +524
#[cfg(test)]
mod tests {
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, Github is not smart enough to show a proper diff. These tests have barely changed.

Copy link
Copy Markdown

@asolimando asolimando left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, I focused more on the conceptual part than on the details of the code itself, and I think this looks very good!

One pass planning and the clear split for stages over network boundaries is the perfect place to tune the "dimensionality" of the plan (partitions and scaling).

This is a very sensible trade-off between purely static planning which can suffer from bad estimations for the number of partitions, without paying the price (for now?) for full replanning: you fix the structure early on based on the plan structure single-node DF computes, which anyway something that is way harder to adjust dynamically, but you tune the level of parallelism which in a good class of query shapes is the dominating cost, and this tuning stays tractable.

The clear separation in stages you are defining here also makes per-stage observability easier, and keeps the door open for having fault-tolerance/retries at the stage level tomorrow, if needed.

Comment thread src/distributed_planner/distribute_plan.rs Outdated
Comment thread src/distributed_planner/inject_network_boundaries.rs
Comment thread src/distributed_planner/inject_network_boundaries.rs
Comment thread src/distributed_planner/distribute_plan.rs Outdated
gabotechs added a commit that referenced this pull request May 11, 2026
PR factored out from
#416.

This is one PR from the following stack of PRs:
- #422
<- you are here
- #424
- #416
- #425
- #426
- #427
- #432

Previously, we where force-propagating a max task count assignation
below
the NetworkBroadcast so that the remote build side has never more tasks
than the stage above.

In a dynamic task count assignation context, we can no longer do this,
as by the time
you realize a remote build side is going to have more tasks than the
stage above, the build side might have
already started executing, and by that time its task count is set in
stone.

This is fine, build side in broadcast should have an arbitrarily more
expensive
build side. What matters there is not that the build side is cheap to
execute, but that it returns little amount of data. A build side can
return
very little data (just a couple of rows) and still be very expensive to
execute

This is actually the reason why there is a small speedup in benchmarks.

---

<details><summary>tpch_sf10 1.09 faster ✔</summary>

```text
=== Comparing tpch_sf10 results from engine 'datafusion-distributed-main' [prev] with 'datafusion-distributed-dynamic-task-allocation' [new] ===
      q1: prev=1269 ms, new=1286 ms, diff=1.01 slower ✖
      q2: prev= 390 ms, new= 395 ms, diff=1.01 slower ✖
      q3: prev= 784 ms, new= 826 ms, diff=1.05 slower ✖
      q4: prev= 413 ms, new= 392 ms, diff=1.05 faster ✔
      q5: prev=1306 ms, new=1242 ms, diff=1.05 faster ✔
      q6: prev= 534 ms, new= 528 ms, diff=1.01 faster ✔
      q7: prev=1483 ms, new=1420 ms, diff=1.04 faster ✔
      q8: prev=3001 ms, new=1585 ms, diff=1.89 faster ✅
      q9: prev=2054 ms, new=2009 ms, diff=1.02 faster ✔
     q10: prev= 951 ms, new= 921 ms, diff=1.03 faster ✔
     q11: prev= 322 ms, new= 304 ms, diff=1.06 faster ✔
     q12: prev= 670 ms, new= 676 ms, diff=1.01 slower ✖
     q13: prev= 624 ms, new= 613 ms, diff=1.02 faster ✔
     q14: prev= 594 ms, new= 546 ms, diff=1.09 faster ✔
     q15: prev= 778 ms, new= 756 ms, diff=1.03 faster ✔
     q16: prev= 223 ms, new= 219 ms, diff=1.02 faster ✔
     q17: prev=1644 ms, new=1733 ms, diff=1.05 slower ✖
     q18: prev=1884 ms, new=1966 ms, diff=1.04 slower ✖
     q19: prev= 802 ms, new= 727 ms, diff=1.10 faster ✔
     q20: prev= 784 ms, new= 706 ms, diff=1.11 faster ✔
     q21: prev=2112 ms, new=1925 ms, diff=1.10 faster ✔
     q22: prev= 251 ms, new= 261 ms, diff=1.04 slower ✖
   TOTAL: prev=68651.703894 ms, new=63144.566305999986 ms, diff=1.09 faster ✔
```

</details>

<details><summary>tpcds_sf1 1.02 faster ✔</summary>

```text
=== Comparing tpcds_sf1 results from engine 'datafusion-distributed-dynamic-task-allocation' [prev] with 'datafusion-distributed-dynamic-task-allocation' [new] ===
      q1: prev= 260 ms, new= 336 ms, diff=1.29 slower ❌
      q2: prev= 290 ms, new= 321 ms, diff=1.11 slower ✖
      q3: prev= 181 ms, new= 215 ms, diff=1.19 slower ✖
      q4: prev=2039 ms, new=2184 ms, diff=1.07 slower ✖
      q5: prev= 333 ms, new= 325 ms, diff=1.02 faster ✔
      q6: prev= 622 ms, new= 676 ms, diff=1.09 slower ✖
      q7: prev= 225 ms, new= 225 ms, diff=1.00 slower ✖
      q8: prev= 312 ms, new= 200 ms, diff=1.56 faster ✅
      q9: prev= 242 ms, new= 189 ms, diff=1.28 faster ✅
     q10: prev= 480 ms, new= 494 ms, diff=1.03 slower ✖
     q11: prev=1511 ms, new=1382 ms, diff=1.09 faster ✔
     q12: prev= 262 ms, new= 292 ms, diff=1.11 slower ✖
     q13: prev= 477 ms, new= 487 ms, diff=1.02 slower ✖
     q14: prev= 637 ms, new= 782 ms, diff=1.23 slower ❌
     q15: prev= 170 ms, new= 144 ms, diff=1.18 faster ✔
     q16: prev= 350 ms, new= 379 ms, diff=1.08 slower ✖
     q17: prev= 229 ms, new= 250 ms, diff=1.09 slower ✖
     q18: prev= 295 ms, new= 281 ms, diff=1.05 faster ✔
     q19: prev= 286 ms, new= 254 ms, diff=1.13 faster ✔
     q20: prev= 206 ms, new= 143 ms, diff=1.44 faster ✅
     q21: prev= 305 ms, new= 282 ms, diff=1.08 faster ✔
     q22: prev= 390 ms, new= 401 ms, diff=1.03 slower ✖
     q23: prev= 672 ms, new= 640 ms, diff=1.05 faster ✔
     q24: prev= 368 ms, new= 376 ms, diff=1.02 slower ✖
     q25: prev= 203 ms, new= 279 ms, diff=1.37 slower ❌
     q26: prev= 147 ms, new= 198 ms, diff=1.35 slower ❌
     q27: prev= 406 ms, new= 358 ms, diff=1.13 faster ✔
     q28: prev= 195 ms, new= 161 ms, diff=1.21 faster ✅
     q29: prev= 237 ms, new= 219 ms, diff=1.08 faster ✔
     q31: prev= 343 ms, new= 327 ms, diff=1.05 faster ✔
     q32: prev= 142 ms, new= 152 ms, diff=1.07 slower ✖
     q33: prev= 277 ms, new= 211 ms, diff=1.31 faster ✅
     q34: prev= 199 ms, new= 188 ms, diff=1.06 faster ✔
     q35: prev= 514 ms, new= 498 ms, diff=1.03 faster ✔
     q36: prev= 341 ms, new= 311 ms, diff=1.10 faster ✔
     q37: prev= 256 ms, new= 302 ms, diff=1.18 slower ✖
     q38: prev= 228 ms, new= 245 ms, diff=1.07 slower ✖
     q39: prev= 259 ms, new= 266 ms, diff=1.03 slower ✖
     q40: prev= 281 ms, new= 325 ms, diff=1.16 slower ✖
     q41: prev=  87 ms, new=  90 ms, diff=1.03 slower ✖
     q42: prev= 116 ms, new= 124 ms, diff=1.07 slower ✖
     q43: prev= 190 ms, new= 132 ms, diff=1.44 faster ✅
     q44: prev= 214 ms, new= 144 ms, diff=1.49 faster ✅
     q45: prev= 244 ms, new= 186 ms, diff=1.31 faster ✅
     q46: prev= 355 ms, new= 288 ms, diff=1.23 faster ✅
     q47: prev= 374 ms, new= 387 ms, diff=1.03 slower ✖
     q48: prev= 384 ms, new= 360 ms, diff=1.07 faster ✔
     q49: prev= 285 ms, new= 229 ms, diff=1.24 faster ✅
     q50: prev= 352 ms, new= 343 ms, diff=1.03 faster ✔
     q51: prev= 305 ms, new= 224 ms, diff=1.36 faster ✅
     q52: prev= 138 ms, new= 127 ms, diff=1.09 faster ✔
     q53: prev= 143 ms, new= 158 ms, diff=1.10 slower ✖
     q54: prev= 331 ms, new= 271 ms, diff=1.22 faster ✅
     q55: prev= 132 ms, new= 145 ms, diff=1.10 slower ✖
     q56: prev= 298 ms, new= 233 ms, diff=1.28 faster ✅
     q57: prev= 335 ms, new= 354 ms, diff=1.06 slower ✖
     q58: prev= 280 ms, new= 284 ms, diff=1.01 slower ✖
     q59: prev= 293 ms, new= 270 ms, diff=1.09 faster ✔
     q60: prev= 361 ms, new= 311 ms, diff=1.16 faster ✔
     q61: prev= 856 ms, new= 849 ms, diff=1.01 faster ✔
     q62: prev= 639 ms, new= 665 ms, diff=1.04 slower ✖
     q63: prev= 224 ms, new= 148 ms, diff=1.51 faster ✅
     q64: prev=1159 ms, new=1193 ms, diff=1.03 slower ✖
     q65: prev= 229 ms, new= 228 ms, diff=1.00 faster ✔
     q66: prev= 730 ms, new= 714 ms, diff=1.02 faster ✔
     q67: prev= 406 ms, new= 420 ms, diff=1.03 slower ✖
     q68: prev= 289 ms, new= 320 ms, diff=1.11 slower ✖
     q69: prev= 513 ms, new= 570 ms, diff=1.11 slower ✖
     q70: prev= 394 ms, new= 386 ms, diff=1.02 faster ✔
     q71: prev= 250 ms, new= 329 ms, diff=1.32 slower ❌
     q72: prev=6644 ms, new=6609 ms, diff=1.01 faster ✔
     q73: prev= 201 ms, new= 210 ms, diff=1.04 slower ✖
     q74: prev= 797 ms, new= 743 ms, diff=1.07 faster ✔
     q75: prev= 375 ms, new= 452 ms, diff=1.21 slower ❌
     q76: prev= 165 ms, new= 230 ms, diff=1.39 slower ❌
     q77: prev= 232 ms, new= 271 ms, diff=1.17 slower ✖
     q78: prev= 341 ms, new= 353 ms, diff=1.04 slower ✖
     q79: prev= 226 ms, new= 228 ms, diff=1.01 slower ✖
     q80: prev= 332 ms, new= 336 ms, diff=1.01 slower ✖
     q81: prev= 216 ms, new= 191 ms, diff=1.13 faster ✔
     q82: prev= 258 ms, new= 262 ms, diff=1.02 slower ✖
     q83: prev= 240 ms, new= 287 ms, diff=1.20 slower ✖
     q84: prev= 240 ms, new= 228 ms, diff=1.05 faster ✔
     q85: prev= 455 ms, new= 364 ms, diff=1.25 faster ✅
     q86: prev= 124 ms, new= 138 ms, diff=1.11 slower ✖
     q87: prev= 203 ms, new= 208 ms, diff=1.02 slower ✖
     q88: prev= 404 ms, new= 350 ms, diff=1.15 faster ✔
     q89: prev= 237 ms, new= 167 ms, diff=1.42 faster ✅
     q90: prev= 189 ms, new= 187 ms, diff=1.01 faster ✔
     q91: prev= 377 ms, new= 328 ms, diff=1.15 faster ✔
     q92: prev= 284 ms, new= 131 ms, diff=2.17 faster ✅
     q93: prev= 154 ms, new= 142 ms, diff=1.08 faster ✔
     q94: prev= 302 ms, new= 308 ms, diff=1.02 slower ✖
     q95: prev= 365 ms, new= 290 ms, diff=1.26 faster ✅
     q96: prev= 177 ms, new= 157 ms, diff=1.13 faster ✔
     q97: prev= 235 ms, new= 170 ms, diff=1.38 faster ✅
     q98: prev= 165 ms, new= 159 ms, diff=1.04 faster ✔
     q99: prev= 951 ms, new= 995 ms, diff=1.05 slower ✖
   TOTAL: prev=123029.07797800002 ms, new=120962.55825200005 ms, diff=1.02 faster ✔
```

</details>
gabotechs added a commit that referenced this pull request May 11, 2026
An independent refactor factored out from
#416

This is one PR from the following stack of PRs:
- #422
- #424
<- you are here
- #416
- #425
- #426
- #427
- #432


Previously the stage struct was a "hidden" state machine that could have
two states:

1. A state where the Stage contains the input plan and is locally
accessible and traversible.

```rust
pub struct Stage {
    query_id: ...
    num: ...
    plan: Some(plan),
    tasks: vec![None, None, None]
}
```

2. A state where the input plan is serialized, and the worker URLs are
assigned. This happens in `DistributedExec` right before execution on
`prepare_plan()`

```rust
pub struct Stage {
    query_id: ...
    num: ...
    plan: None,
    tasks: vec![Some("http://1"), Some("http://2"), Some("http://3")]
}
```

This PR makes this behavior explicit, and represented with an `enum`:

```rust
pub enum Stage {
    Local(LocalStage),
    Remote(RemoteStage),
}

pub struct LocalStage {
    pub query_id: Uuid,
    pub num: usize,
    pub plan: Arc<dyn ExecutionPlan>,
    pub tasks: usize,
}

pub struct RemoteStage {
    pub query_id: Uuid,
    pub num: usize,
    pub workers: Vec<Url>,
}
```
Base automatically changed from gabrielmusat/refactor-stage-struct to main May 11, 2026 17:25
@gabotechs gabotechs force-pushed the gabrielmusat/refactor-distributed-planning branch 2 times, most recently from 64ce7e6 to 11a240b Compare May 11, 2026 17:42
Copy link
Copy Markdown
Collaborator

@gene-bordegaray gene-bordegaray left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a good separation of passes 👍 . The only thing I am a bit wary of in the "pending" or hald-built network stages. I think this could be more explicit to be clearer.

Comment thread src/distributed_planner/distributed_query_planner.rs
Comment thread src/distributed_planner/distribute_plan.rs Outdated
Comment thread src/distributed_planner/distribute_plan.rs Outdated
Comment thread src/distributed_planner/prepare_network_boundaries.rs Outdated
Comment thread src/distributed_planner/inject_network_boundaries.rs Outdated
Comment thread src/distributed_planner/inject_network_boundaries.rs Outdated
Comment thread src/distributed_planner/inject_network_boundaries.rs Outdated
Comment thread src/distributed_planner/network_boundary.rs
Comment thread src/execution_plans/network_shuffle.rs
Comment thread src/execution_plans/network_coalesce.rs Outdated
@gabotechs gabotechs force-pushed the gabrielmusat/refactor-distributed-planning branch from 11a240b to dfe9199 Compare May 12, 2026 14:07
Address feedbac
@gabotechs gabotechs force-pushed the gabrielmusat/refactor-distributed-planning branch from dfe9199 to 6b66a95 Compare May 12, 2026 14:08
Comment thread src/distributed_planner/distribute_plan.rs Outdated
Comment thread src/distributed_planner/distributed_query_planner.rs
Comment thread src/distributed_planner/distribute_plan.rs Outdated
Comment thread src/distributed_planner/inject_network_boundaries.rs
Comment thread src/distributed_planner/inject_network_boundaries.rs Outdated
Comment thread src/distributed_planner/inject_network_boundaries.rs
Comment thread src/distributed_planner/inject_network_boundaries.rs Outdated
Comment thread src/distributed_planner/inject_network_boundaries.rs Outdated
Comment thread src/distributed_planner/prepare_network_boundaries.rs
Comment thread src/distributed_planner/distributed_query_planner.rs
Comment thread src/distributed_planner/inject_network_boundaries.rs
Copy link
Copy Markdown
Collaborator

@gene-bordegaray gene-bordegaray left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mostly extending existing conversations

Comment thread src/distributed_planner/distribute_plan.rs Outdated
Comment thread src/distributed_planner/inject_network_boundaries.rs Outdated
Comment thread src/execution_plans/network_coalesce.rs Outdated
Comment thread src/distributed_planner/distribute_plan.rs Outdated
@jayshrivastava
Copy link
Copy Markdown
Collaborator

Looks good from my end.

Copy link
Copy Markdown
Collaborator

@gene-bordegaray gene-bordegaray left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

woooop 💯

@gabotechs
Copy link
Copy Markdown
Collaborator Author

🙏 Thanks for the reviews guys! I know this is pretty tricky code, and getting you to also be aware of it is great.

@gabotechs gabotechs merged commit c67c9fb into main May 14, 2026
17 checks passed
@gabotechs gabotechs deleted the gabrielmusat/refactor-distributed-planning branch May 14, 2026 23:52
gabotechs added a commit that referenced this pull request May 15, 2026
This is a preparatory step towards:
-
#377

This is one PR from the following stack of PRs:
- #422
- #424
- #416 
- #425
<- you are here
- #426
- #427
- #432


Removes `impl_set_plan.rs` in favor of just inlining its contents to
`impl_coordinator_channel.rs`.

In future changes, the relationship between `impl_set_plan.rs` and
`impl_coordinator_channel.rs` will get more complex, increasing the
function signature `impl_set_plan.rs` exposes to
`impl_coordinator_channel.rs`. This proves that the split between those
two files does not make sense, as they have never been able to evolve
independently, so we may as well just not pay the price of a complex
function signature in between.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants