Remove 2-pass planner (annotator + distribution) in favor of a 1-pass planner#416
Conversation
314bac8 to
8c98bc8
Compare
f3cd3dc to
7626c71
Compare
2ff4808 to
d12d12b
Compare
| #[cfg(test)] | ||
| mod tests { |
There was a problem hiding this comment.
Unfortunately, Github is not smart enough to show a proper diff. These tests have barely changed.
6b2e85b to
c6d91a8
Compare
7f9c28f to
e623085
Compare
c6d91a8 to
8285ee3
Compare
asolimando
left a comment
There was a problem hiding this comment.
LGTM, I focused more on the conceptual part than on the details of the code itself, and I think this looks very good!
One pass planning and the clear split for stages over network boundaries is the perfect place to tune the "dimensionality" of the plan (partitions and scaling).
This is a very sensible trade-off between purely static planning which can suffer from bad estimations for the number of partitions, without paying the price (for now?) for full replanning: you fix the structure early on based on the plan structure single-node DF computes, which anyway something that is way harder to adjust dynamically, but you tune the level of parallelism which in a good class of query shapes is the dominating cost, and this tuning stays tractable.
The clear separation in stages you are defining here also makes per-stage observability easier, and keeps the door open for having fault-tolerance/retries at the stage level tomorrow, if needed.
PR factored out from #416. This is one PR from the following stack of PRs: - #422 <- you are here - #424 - #416 - #425 - #426 - #427 - #432 Previously, we where force-propagating a max task count assignation below the NetworkBroadcast so that the remote build side has never more tasks than the stage above. In a dynamic task count assignation context, we can no longer do this, as by the time you realize a remote build side is going to have more tasks than the stage above, the build side might have already started executing, and by that time its task count is set in stone. This is fine, build side in broadcast should have an arbitrarily more expensive build side. What matters there is not that the build side is cheap to execute, but that it returns little amount of data. A build side can return very little data (just a couple of rows) and still be very expensive to execute This is actually the reason why there is a small speedup in benchmarks. --- <details><summary>tpch_sf10 1.09 faster ✔</summary> ```text === Comparing tpch_sf10 results from engine 'datafusion-distributed-main' [prev] with 'datafusion-distributed-dynamic-task-allocation' [new] === q1: prev=1269 ms, new=1286 ms, diff=1.01 slower ✖ q2: prev= 390 ms, new= 395 ms, diff=1.01 slower ✖ q3: prev= 784 ms, new= 826 ms, diff=1.05 slower ✖ q4: prev= 413 ms, new= 392 ms, diff=1.05 faster ✔ q5: prev=1306 ms, new=1242 ms, diff=1.05 faster ✔ q6: prev= 534 ms, new= 528 ms, diff=1.01 faster ✔ q7: prev=1483 ms, new=1420 ms, diff=1.04 faster ✔ q8: prev=3001 ms, new=1585 ms, diff=1.89 faster ✅ q9: prev=2054 ms, new=2009 ms, diff=1.02 faster ✔ q10: prev= 951 ms, new= 921 ms, diff=1.03 faster ✔ q11: prev= 322 ms, new= 304 ms, diff=1.06 faster ✔ q12: prev= 670 ms, new= 676 ms, diff=1.01 slower ✖ q13: prev= 624 ms, new= 613 ms, diff=1.02 faster ✔ q14: prev= 594 ms, new= 546 ms, diff=1.09 faster ✔ q15: prev= 778 ms, new= 756 ms, diff=1.03 faster ✔ q16: prev= 223 ms, new= 219 ms, diff=1.02 faster ✔ q17: prev=1644 ms, new=1733 ms, diff=1.05 slower ✖ q18: prev=1884 ms, new=1966 ms, diff=1.04 slower ✖ q19: prev= 802 ms, new= 727 ms, diff=1.10 faster ✔ q20: prev= 784 ms, new= 706 ms, diff=1.11 faster ✔ q21: prev=2112 ms, new=1925 ms, diff=1.10 faster ✔ q22: prev= 251 ms, new= 261 ms, diff=1.04 slower ✖ TOTAL: prev=68651.703894 ms, new=63144.566305999986 ms, diff=1.09 faster ✔ ``` </details> <details><summary>tpcds_sf1 1.02 faster ✔</summary> ```text === Comparing tpcds_sf1 results from engine 'datafusion-distributed-dynamic-task-allocation' [prev] with 'datafusion-distributed-dynamic-task-allocation' [new] === q1: prev= 260 ms, new= 336 ms, diff=1.29 slower ❌ q2: prev= 290 ms, new= 321 ms, diff=1.11 slower ✖ q3: prev= 181 ms, new= 215 ms, diff=1.19 slower ✖ q4: prev=2039 ms, new=2184 ms, diff=1.07 slower ✖ q5: prev= 333 ms, new= 325 ms, diff=1.02 faster ✔ q6: prev= 622 ms, new= 676 ms, diff=1.09 slower ✖ q7: prev= 225 ms, new= 225 ms, diff=1.00 slower ✖ q8: prev= 312 ms, new= 200 ms, diff=1.56 faster ✅ q9: prev= 242 ms, new= 189 ms, diff=1.28 faster ✅ q10: prev= 480 ms, new= 494 ms, diff=1.03 slower ✖ q11: prev=1511 ms, new=1382 ms, diff=1.09 faster ✔ q12: prev= 262 ms, new= 292 ms, diff=1.11 slower ✖ q13: prev= 477 ms, new= 487 ms, diff=1.02 slower ✖ q14: prev= 637 ms, new= 782 ms, diff=1.23 slower ❌ q15: prev= 170 ms, new= 144 ms, diff=1.18 faster ✔ q16: prev= 350 ms, new= 379 ms, diff=1.08 slower ✖ q17: prev= 229 ms, new= 250 ms, diff=1.09 slower ✖ q18: prev= 295 ms, new= 281 ms, diff=1.05 faster ✔ q19: prev= 286 ms, new= 254 ms, diff=1.13 faster ✔ q20: prev= 206 ms, new= 143 ms, diff=1.44 faster ✅ q21: prev= 305 ms, new= 282 ms, diff=1.08 faster ✔ q22: prev= 390 ms, new= 401 ms, diff=1.03 slower ✖ q23: prev= 672 ms, new= 640 ms, diff=1.05 faster ✔ q24: prev= 368 ms, new= 376 ms, diff=1.02 slower ✖ q25: prev= 203 ms, new= 279 ms, diff=1.37 slower ❌ q26: prev= 147 ms, new= 198 ms, diff=1.35 slower ❌ q27: prev= 406 ms, new= 358 ms, diff=1.13 faster ✔ q28: prev= 195 ms, new= 161 ms, diff=1.21 faster ✅ q29: prev= 237 ms, new= 219 ms, diff=1.08 faster ✔ q31: prev= 343 ms, new= 327 ms, diff=1.05 faster ✔ q32: prev= 142 ms, new= 152 ms, diff=1.07 slower ✖ q33: prev= 277 ms, new= 211 ms, diff=1.31 faster ✅ q34: prev= 199 ms, new= 188 ms, diff=1.06 faster ✔ q35: prev= 514 ms, new= 498 ms, diff=1.03 faster ✔ q36: prev= 341 ms, new= 311 ms, diff=1.10 faster ✔ q37: prev= 256 ms, new= 302 ms, diff=1.18 slower ✖ q38: prev= 228 ms, new= 245 ms, diff=1.07 slower ✖ q39: prev= 259 ms, new= 266 ms, diff=1.03 slower ✖ q40: prev= 281 ms, new= 325 ms, diff=1.16 slower ✖ q41: prev= 87 ms, new= 90 ms, diff=1.03 slower ✖ q42: prev= 116 ms, new= 124 ms, diff=1.07 slower ✖ q43: prev= 190 ms, new= 132 ms, diff=1.44 faster ✅ q44: prev= 214 ms, new= 144 ms, diff=1.49 faster ✅ q45: prev= 244 ms, new= 186 ms, diff=1.31 faster ✅ q46: prev= 355 ms, new= 288 ms, diff=1.23 faster ✅ q47: prev= 374 ms, new= 387 ms, diff=1.03 slower ✖ q48: prev= 384 ms, new= 360 ms, diff=1.07 faster ✔ q49: prev= 285 ms, new= 229 ms, diff=1.24 faster ✅ q50: prev= 352 ms, new= 343 ms, diff=1.03 faster ✔ q51: prev= 305 ms, new= 224 ms, diff=1.36 faster ✅ q52: prev= 138 ms, new= 127 ms, diff=1.09 faster ✔ q53: prev= 143 ms, new= 158 ms, diff=1.10 slower ✖ q54: prev= 331 ms, new= 271 ms, diff=1.22 faster ✅ q55: prev= 132 ms, new= 145 ms, diff=1.10 slower ✖ q56: prev= 298 ms, new= 233 ms, diff=1.28 faster ✅ q57: prev= 335 ms, new= 354 ms, diff=1.06 slower ✖ q58: prev= 280 ms, new= 284 ms, diff=1.01 slower ✖ q59: prev= 293 ms, new= 270 ms, diff=1.09 faster ✔ q60: prev= 361 ms, new= 311 ms, diff=1.16 faster ✔ q61: prev= 856 ms, new= 849 ms, diff=1.01 faster ✔ q62: prev= 639 ms, new= 665 ms, diff=1.04 slower ✖ q63: prev= 224 ms, new= 148 ms, diff=1.51 faster ✅ q64: prev=1159 ms, new=1193 ms, diff=1.03 slower ✖ q65: prev= 229 ms, new= 228 ms, diff=1.00 faster ✔ q66: prev= 730 ms, new= 714 ms, diff=1.02 faster ✔ q67: prev= 406 ms, new= 420 ms, diff=1.03 slower ✖ q68: prev= 289 ms, new= 320 ms, diff=1.11 slower ✖ q69: prev= 513 ms, new= 570 ms, diff=1.11 slower ✖ q70: prev= 394 ms, new= 386 ms, diff=1.02 faster ✔ q71: prev= 250 ms, new= 329 ms, diff=1.32 slower ❌ q72: prev=6644 ms, new=6609 ms, diff=1.01 faster ✔ q73: prev= 201 ms, new= 210 ms, diff=1.04 slower ✖ q74: prev= 797 ms, new= 743 ms, diff=1.07 faster ✔ q75: prev= 375 ms, new= 452 ms, diff=1.21 slower ❌ q76: prev= 165 ms, new= 230 ms, diff=1.39 slower ❌ q77: prev= 232 ms, new= 271 ms, diff=1.17 slower ✖ q78: prev= 341 ms, new= 353 ms, diff=1.04 slower ✖ q79: prev= 226 ms, new= 228 ms, diff=1.01 slower ✖ q80: prev= 332 ms, new= 336 ms, diff=1.01 slower ✖ q81: prev= 216 ms, new= 191 ms, diff=1.13 faster ✔ q82: prev= 258 ms, new= 262 ms, diff=1.02 slower ✖ q83: prev= 240 ms, new= 287 ms, diff=1.20 slower ✖ q84: prev= 240 ms, new= 228 ms, diff=1.05 faster ✔ q85: prev= 455 ms, new= 364 ms, diff=1.25 faster ✅ q86: prev= 124 ms, new= 138 ms, diff=1.11 slower ✖ q87: prev= 203 ms, new= 208 ms, diff=1.02 slower ✖ q88: prev= 404 ms, new= 350 ms, diff=1.15 faster ✔ q89: prev= 237 ms, new= 167 ms, diff=1.42 faster ✅ q90: prev= 189 ms, new= 187 ms, diff=1.01 faster ✔ q91: prev= 377 ms, new= 328 ms, diff=1.15 faster ✔ q92: prev= 284 ms, new= 131 ms, diff=2.17 faster ✅ q93: prev= 154 ms, new= 142 ms, diff=1.08 faster ✔ q94: prev= 302 ms, new= 308 ms, diff=1.02 slower ✖ q95: prev= 365 ms, new= 290 ms, diff=1.26 faster ✅ q96: prev= 177 ms, new= 157 ms, diff=1.13 faster ✔ q97: prev= 235 ms, new= 170 ms, diff=1.38 faster ✅ q98: prev= 165 ms, new= 159 ms, diff=1.04 faster ✔ q99: prev= 951 ms, new= 995 ms, diff=1.05 slower ✖ TOTAL: prev=123029.07797800002 ms, new=120962.55825200005 ms, diff=1.02 faster ✔ ``` </details>
An independent refactor factored out from #416 This is one PR from the following stack of PRs: - #422 - #424 <- you are here - #416 - #425 - #426 - #427 - #432 Previously the stage struct was a "hidden" state machine that could have two states: 1. A state where the Stage contains the input plan and is locally accessible and traversible. ```rust pub struct Stage { query_id: ... num: ... plan: Some(plan), tasks: vec![None, None, None] } ``` 2. A state where the input plan is serialized, and the worker URLs are assigned. This happens in `DistributedExec` right before execution on `prepare_plan()` ```rust pub struct Stage { query_id: ... num: ... plan: None, tasks: vec![Some("http://1"), Some("http://2"), Some("http://3")] } ``` This PR makes this behavior explicit, and represented with an `enum`: ```rust pub enum Stage { Local(LocalStage), Remote(RemoteStage), } pub struct LocalStage { pub query_id: Uuid, pub num: usize, pub plan: Arc<dyn ExecutionPlan>, pub tasks: usize, } pub struct RemoteStage { pub query_id: Uuid, pub num: usize, pub workers: Vec<Url>, } ```
64ce7e6 to
11a240b
Compare
11a240b to
dfe9199
Compare
Address feedbac
dfe9199 to
6b66a95
Compare
gene-bordegaray
left a comment
There was a problem hiding this comment.
mostly extending existing conversations
|
Looks good from my end. |
|
🙏 Thanks for the reviews guys! I know this is pretty tricky code, and getting you to also be aware of it is great. |
This is a preparatory step towards: - #377 This is one PR from the following stack of PRs: - #422 - #424 - #416 - #425 <- you are here - #426 - #427 - #432 Removes `impl_set_plan.rs` in favor of just inlining its contents to `impl_coordinator_channel.rs`. In future changes, the relationship between `impl_set_plan.rs` and `impl_coordinator_channel.rs` will get more complex, increasing the function signature `impl_set_plan.rs` exposes to `impl_coordinator_channel.rs`. This proves that the split between those two files does not make sense, as they have never been able to evolve independently, so we may as well just not pay the price of a complex function signature in between.
This is a preparatory step towards:
This is one PR from the following stack of PRs:
The main purpose of this PR is to make distributed planning in a single pass, rather than the current two that communicate each other via an intermediate struct (
AnnotatedPlan). This change cascades into several other changes that produce a nicer public API for building custom distributed plans, but also produce a big diff.Dropping the two-step annotation + NB injection
On a dynamic task assignation context, choosing the task count for a stage based on the previous one can no longer
be done statically.
After "annotating" a stage, and before "annotating" the
one above, we need to be able to send it for execution, collect runtime
metrics, and based on that decide the task count for the stage above.
This means that the stage below should be good to be sent for execution
before the full annotation process has finished, meaning that we need to
do everything there is to be done in the "annotation" process, we can no
longer divide the distribution process in several steps that recurse the
whole plan.
Network boundaries no longer mutate their children
In order for network boundaries to know what mutations to apply to their
children, they need to now how many consumer tasks are they going to be
running, but this might not be know until execution time, so if we want to
dynamically assign tasks to stages, there's no way at planning time that
we can know how to mutate the children.
For example, we do not now how to scale up a
RepartitionExecif we don'tknow how many
NetworkShuffleExecs are going to be consuming it.The responsibility of preparing network boundaries inputs (e.g., scaling RepartitionExec)
is now factor out into a separate
network_boundary_scale_input()functionthat can be called either at planning time or at execution time.
Right now, it's still just called at planning time.