Remove artificial broadcast task limit in the build side #422
Merged
gabotechs merged 1 commit on May 11, 2026
Conversation
This was referenced May 1, 2026
gene-bordegaray (Collaborator) approved these changes on May 11, 2026 and left a comment:

The explanation makes sense too: all that matters is that we are broadcasting a small number of output rows, not necessarily the price of computing them 👍
```diff
@@ -298,19 +298,11 @@ async fn _annotate_plan(
     // If it's a normal plan, continue with the propagation.
     PlanOrNetworkBoundary::Plan(plan) => plan,
     // Broadcast is a stage split only propagate a Maximum cap into the build stage.
```
Collaborator:

I don't think we need this comment.
Author (Collaborator):

Yep, I removed it in a PR higher up in the stack.
gabotechs added a commit that referenced this pull request on May 11, 2026:
An independent refactor factored out from #416.

This is one PR from the following stack of PRs:
- #422
- #424 <- you are here
- #416
- #425
- #426
- #427
- #432

Previously, the stage struct was a "hidden" state machine that could have two states:

1. A state where the `Stage` contains the input plan and is locally accessible and traversable.

```rust
pub struct Stage {
    query_id: ...,
    num: ...,
    plan: Some(plan),
    tasks: vec![None, None, None],
}
```

2. A state where the input plan is serialized and the worker URLs are assigned. This happens in `DistributedExec` right before execution, in `prepare_plan()`.

```rust
pub struct Stage {
    query_id: ...,
    num: ...,
    plan: None,
    tasks: vec![Some("http://1"), Some("http://2"), Some("http://3")],
}
```

This PR makes this behavior explicit, represented with an `enum`:

```rust
pub enum Stage {
    Local(LocalStage),
    Remote(RemoteStage),
}

pub struct LocalStage {
    pub query_id: Uuid,
    pub num: usize,
    pub plan: Arc<dyn ExecutionPlan>,
    pub tasks: usize,
}

pub struct RemoteStage {
    pub query_id: Uuid,
    pub num: usize,
    pub workers: Vec<Url>,
}
```
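The state transition this enum makes explicit can be sketched as follows. This is a hedged illustration only: the real fields use `Uuid`, `Arc<dyn ExecutionPlan>`, and `Url` (here `u64` and `String` stand in, and the `plan` field is omitted), and `assign_workers` is a hypothetical helper, not the crate's API.

```rust
// Simplified stand-ins for the Stage enum from the commit message above.
#[derive(Debug)]
pub enum Stage {
    Local(LocalStage),
    Remote(RemoteStage),
}

#[derive(Debug)]
pub struct LocalStage {
    pub query_id: u64, // stand-in for Uuid
    pub num: usize,
    pub tasks: usize, // plan field omitted in this sketch
}

#[derive(Debug)]
pub struct RemoteStage {
    pub query_id: u64,
    pub num: usize,
    pub workers: Vec<String>, // stand-in for Vec<Url>
}

impl Stage {
    /// Hypothetical transition: assigning one worker URL per task makes the
    /// Local -> Remote state change explicit, instead of flipping Options
    /// inside a single struct.
    pub fn assign_workers(self, urls: Vec<String>) -> Result<Stage, String> {
        match self {
            Stage::Local(l) if urls.len() == l.tasks => Ok(Stage::Remote(RemoteStage {
                query_id: l.query_id,
                num: l.num,
                workers: urls,
            })),
            Stage::Local(l) => Err(format!("expected {} worker urls", l.tasks)),
            remote @ Stage::Remote(_) => Ok(remote), // already remote: no-op
        }
    }
}

fn main() {
    let stage = Stage::Local(LocalStage { query_id: 1, num: 0, tasks: 2 });
    let stage = stage
        .assign_workers(vec!["http://1".into(), "http://2".into()])
        .unwrap();
    match &stage {
        Stage::Remote(r) => println!("workers = {:?}", r.workers),
        Stage::Local(_) => unreachable!(),
    }
}
```

With the enum, code that needs a plan or needs worker URLs can no longer silently observe the wrong state: the compiler forces a `match`.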
gabotechs added a commit that referenced this pull request on May 14, 2026:
… planner (#416)

This is a preparatory step towards:
- #377

This is one PR from the following stack of PRs:
- #422
- #424
- #416 <- you are here
- #425
- #426
- #427
- #432

The main purpose of this PR is to perform distributed planning in a single pass, rather than the current two passes that communicate with each other via an intermediate struct (`AnnotatedPlan`). This change cascades into several other changes that produce a nicer public API for building custom distributed plans, but also produce a big diff.

## Dropping the two-step annotation + NB injection

In a dynamic task assignment context, choosing the task count for a stage based on the previous one can no longer be done statically. After "annotating" a stage, and before "annotating" the one above, we need to be able to send it for execution, collect runtime metrics, and based on those decide the task count for the stage above. This means that the stage below must be ready to be sent for execution before the full annotation process has finished, so everything there is to do must happen in the "annotation" process itself; we can no longer divide the distribution process into several steps that each recurse over the whole plan.

## Network boundaries no longer mutate their children

In order for network boundaries to know what mutations to apply to their children, they need to know how many consumer tasks they are going to be running, but this might not be known until execution time. So if we want to dynamically assign tasks to stages, there is no way at planning time to know how to mutate the children. For example, we do not know how to scale up a `RepartitionExec` if we don't know how many `NetworkShuffleExec`s are going to be consuming it.

The responsibility of preparing a network boundary's inputs (e.g., scaling `RepartitionExec`) is now factored out into a separate `network_boundary_scale_input()` function that can be called either at planning time or at execution time. Right now, it is still called only at planning time.
gabotechs added a commit that referenced this pull request on May 15, 2026:
This is a preparatory step towards:
- #377

This is one PR from the following stack of PRs:
- #422
- #424
- #416
- #425 <- you are here
- #426
- #427
- #432

Removes `impl_set_plan.rs` in favor of inlining its contents into `impl_coordinator_channel.rs`. In future changes, the relationship between `impl_set_plan.rs` and `impl_coordinator_channel.rs` will get more complex, increasing the function signature `impl_set_plan.rs` exposes to `impl_coordinator_channel.rs`. This shows that the split between those two files does not make sense: they have never been able to evolve independently, so we may as well not pay the price of a complex function signature in between.
PR factored out from #416.
This is one PR from the following stack of PRs:
Previously, we were force-propagating a max task count assignment below the `NetworkBroadcast` so that the remote build side never has more tasks than the stage above.
In a dynamic task count assignment context, we can no longer do this: by the time you realize a remote build side is going to have more tasks than the stage above, the build side might have already started executing, and by then its task count is set in stone.
This is fine: the build side of a broadcast is allowed to be arbitrarily expensive. What matters there is not that the build side is cheap to execute, but that it returns a small amount of data. A build side can return very little data (just a couple of rows) and still be very expensive to execute.
This is actually the reason why there is a small speedup in benchmarks.
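The reasoning above (broadcast cost tracks output rows, not compute cost) can be sketched as a toy model. Everything here is a hypothetical illustration, not the crate's API: the struct, function, and numbers are made up to show why an expensive-to-compute build side with tiny output is still cheap to broadcast.

```rust
// Hypothetical model: a build side is computed once, but its output rows are
// shipped once per consumer task when broadcast. Only output size multiplies
// with the number of consumers; compute cost does not.
struct BuildSideStats {
    output_rows: usize,
    #[allow(dead_code)]
    compute_cost: f64, // arbitrary units; irrelevant to the broadcast decision
}

fn broadcast_bytes(stats: &BuildSideStats, row_bytes: usize, consumer_tasks: usize) -> usize {
    // The only term that scales with the consumer count is the row count.
    stats.output_rows * row_bytes * consumer_tasks
}

fn main() {
    // Very expensive to compute, but only 2 output rows: still tiny to broadcast.
    let expensive_but_small = BuildSideStats { output_rows: 2, compute_cost: 1e9 };
    println!("{}", broadcast_bytes(&expensive_but_small, 64, 8)); // 1024
}
```

This is why capping the build side's task count (i.e., bounding its compute parallelism) was never the relevant constraint.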
tpch_sf10: 1.09× faster ✔
tpcds_sf1: 1.02× faster ✔