From 1ab146ad6cc119c7656ae1def75fd40697e5f94a Mon Sep 17 00:00:00 2001 From: Gene Bordegaray Date: Fri, 15 May 2026 23:11:09 -0400 Subject: [PATCH] Call take arrays once per repartitioned input batch (#22159) ## Which issue does this PR close? - Does not close an issue. - This is targeted toward high fanout repartitions we see in shuffles in distributed-datafusion (https://github.com/datafusion-contrib/datafusion-distributed/issues/385, https://github.com/datafusion-contrib/datafusion-distributed/issues/353). - Related ongoing effort to improve metrics and repartition performance: #21148, #22155. ## Rationale for this change Hash repartition currently builds one output batch per non-empty target partition by calling `take_arrays` separately for each partition. At high fanout this means an input batch can issue many take kernels, which shows in repartition-heavy queries. This changes hash repartition to concatenate the per-partition row indices, call `take_arrays` once for the input batch, and then slice the reordered batch back into per-partition output batches. This is complementary to #22010: that PR reduces channel/gate traffic from many small batches, while this PR reduces the Arrow take-kernel work required to create the repartitioned batches. ## What changes are included in this PR? - Replaces per-partition hash repartition `take_arrays` calls with one grouped `take_arrays` call per input batch. - Tracks partition ranges into the grouped reordered batch and returns zero-copy `RecordBatch::slice` outputs for each non-empty partition. How the grouped take works: ```text input rows: 0 1 2 3 4 5 6 partition 0: [2, 5] partition 1: [] partition 2: [0, 3, 4] partition 3: [1, 6] grouped indices: [2, 5, 0, 3, 4, 1, 6] partition ranges: [(0, start=0, len=2), (2, start=2, len=3), (3, start=5, len=2)] take once: rows [2, 5, 0, 3, 4, 1, 6] slice outputs: partition 0 = slice(0, 2) partition 2 = slice(2, 3) partition 3 = slice(5, 2) ``` ## Are these changes tested? - `cargo test -p datafusion-physical-plan repartition --lib` **Benchmarks:** Default TPCH SF10 summary, with no `--batch-size` override: | Partitions | main total ms | grouped total ms | change | wins | losses | biggest win | biggest loss |---:|---:|---:|---:|---:|---:|---:|---:| | 8 | 6234.28 | 6149.98 | -1.35% | 10 | 12 | Q3 -22.12% | Q21 5.47% | | 16 | 5602.63 | 5427.40 | -3.13% | 18 | 4 | Q21 -10.67% | Q10 4.47% | | 32 | 6097.10 | 5738.12 | -5.89% | 20 | 2 | Q8 -10.45% | Q6 0.47% | | 64 | 7194.70 | 6693.30 | -6.97% | 15 | 7 | Q21 -15.92% | Q1 5.02% | | 300 | 26276.60 | 23701.32 | -9.80% | 19 | 3 | Q21 -24.24% | Q1 9.45% |
TPCH SF10 default batch size, 8 partitions, all queries | Query | main ms | grouped ms | change | speedup | |---:|---:|---:|---:|---:| | Q1 | 411.40 | 356.46 | -13.35% | 1.154x | | Q2 | 117.97 | 95.68 | -18.89% | 1.233x | | Q3 | 278.88 | 217.20 | -22.12% | 1.284x | | Q4 | 104.92 | 98.47 | -6.15% | 1.065x | | Q5 | 311.36 | 302.12 | -2.97% | 1.031x | | Q6 | 133.82 | 135.13 | 0.98% | 0.990x | | Q7 | 366.32 | 360.50 | -1.59% | 1.016x | | Q8 | 408.86 | 391.14 | -4.33% | 1.045x | | Q9 | 509.62 | 515.74 | 1.20% | 0.988x | | Q10 | 278.97 | 273.80 | -1.85% | 1.019x | | Q11 | 76.55 | 75.34 | -1.58% | 1.016x | | Q12 | 175.56 | 180.87 | 3.03% | 0.971x | | Q13 | 211.03 | 221.03 | 4.74% | 0.955x | | Q14 | 183.05 | 189.95 | 3.77% | 0.964x | | Q15 | 318.46 | 328.85 | 3.26% | 0.968x | | Q16 | 57.66 | 57.70 | 0.08% | 0.999x | | Q17 | 505.05 | 503.77 | -0.25% | 1.003x | | Q18 | 596.49 | 602.77 | 1.05% | 0.990x | | Q19 | 273.60 | 283.41 | 3.58% | 0.965x | | Q20 | 255.73 | 267.30 | 4.52% | 0.957x | | Q21 | 592.44 | 624.83 | 5.47% | 0.948x | | Q22 | 66.54 | 67.91 | 2.05% | 0.980x |
TPCH SF10 default batch size, 16 partitions, all queries | Query | main ms | grouped ms | change | speedup | |---:|---:|---:|---:|---:| | Q1 | 278.95 | 276.85 | -0.76% | 1.008x | | Q2 | 95.84 | 94.86 | -1.02% | 1.010x | | Q3 | 205.24 | 195.05 | -4.97% | 1.052x | | Q4 | 93.60 | 90.58 | -3.23% | 1.033x | | Q5 | 292.34 | 289.93 | -0.82% | 1.008x | | Q6 | 103.59 | 106.02 | 2.35% | 0.977x | | Q7 | 370.86 | 365.40 | -1.47% | 1.015x | | Q8 | 392.01 | 364.63 | -6.98% | 1.075x | | Q9 | 512.66 | 483.45 | -5.70% | 1.060x | | Q10 | 246.01 | 257.01 | 4.47% | 0.957x | | Q11 | 73.33 | 70.21 | -4.26% | 1.045x | | Q12 | 143.58 | 143.53 | -0.04% | 1.000x | | Q13 | 193.30 | 193.57 | 0.14% | 0.999x | | Q14 | 157.01 | 146.14 | -6.92% | 1.074x | | Q15 | 255.58 | 258.73 | 1.23% | 0.988x | | Q16 | 58.90 | 57.14 | -2.97% | 1.031x | | Q17 | 515.22 | 491.16 | -4.67% | 1.049x | | Q18 | 518.50 | 516.90 | -0.31% | 1.003x | | Q19 | 212.28 | 211.63 | -0.30% | 1.003x | | Q20 | 229.34 | 224.54 | -2.09% | 1.021x | | Q21 | 594.29 | 530.87 | -10.67% | 1.119x | | Q22 | 60.20 | 59.18 | -1.70% | 1.017x |
TPCH SF10 default batch size, 32 partitions, all queries | Query | main ms | grouped ms | change | speedup | |---:|---:|---:|---:|---:| | Q1 | 294.06 | 286.80 | -2.47% | 1.025x | | Q2 | 110.59 | 103.55 | -6.37% | 1.068x | | Q3 | 227.86 | 215.08 | -5.61% | 1.059x | | Q4 | 110.99 | 102.77 | -7.41% | 1.080x | | Q5 | 347.34 | 319.49 | -8.02% | 1.087x | | Q6 | 109.39 | 109.91 | 0.47% | 0.995x | | Q7 | 423.98 | 385.42 | -9.09% | 1.100x | | Q8 | 428.83 | 384.00 | -10.45% | 1.117x | | Q9 | 559.81 | 510.72 | -8.77% | 1.096x | | Q10 | 269.17 | 265.02 | -1.54% | 1.016x | | Q11 | 85.99 | 80.21 | -6.72% | 1.072x | | Q12 | 153.67 | 150.36 | -2.15% | 1.022x | | Q13 | 197.90 | 188.72 | -4.64% | 1.049x | | Q14 | 161.28 | 153.93 | -4.55% | 1.048x | | Q15 | 259.92 | 261.01 | 0.42% | 0.996x | | Q16 | 64.90 | 63.70 | -1.85% | 1.019x | | Q17 | 574.18 | 531.76 | -7.39% | 1.080x | | Q18 | 572.23 | 538.68 | -5.86% | 1.062x | | Q19 | 227.95 | 220.19 | -3.41% | 1.035x | | Q20 | 232.25 | 225.80 | -2.78% | 1.029x | | Q21 | 622.54 | 579.94 | -6.84% | 1.073x | | Q22 | 62.29 | 61.06 | -1.98% | 1.020x |
TPCH SF10 default batch size, 64 partitions, all queries | Query | main ms | grouped ms | change | speedup | |---:|---:|---:|---:|---:| | Q1 | 285.10 | 299.41 | 5.02% | 0.952x | | Q2 | 161.42 | 153.51 | -4.90% | 1.052x | | Q3 | 297.85 | 272.09 | -8.65% | 1.095x | | Q4 | 147.28 | 140.69 | -4.47% | 1.047x | | Q5 | 428.29 | 381.28 | -10.98% | 1.123x | | Q6 | 106.27 | 108.41 | 2.02% | 0.980x | | Q7 | 494.50 | 443.89 | -10.23% | 1.114x | | Q8 | 507.01 | 446.69 | -11.90% | 1.135x | | Q9 | 667.11 | 624.78 | -6.34% | 1.068x | | Q10 | 294.91 | 299.05 | 1.40% | 0.986x | | Q11 | 112.17 | 104.79 | -6.57% | 1.070x | | Q12 | 168.23 | 166.87 | -0.81% | 1.008x | | Q13 | 198.74 | 196.30 | -1.23% | 1.012x | | Q14 | 175.31 | 177.74 | 1.39% | 0.986x | | Q15 | 265.68 | 265.71 | 0.01% | 1.000x | | Q16 | 85.69 | 82.93 | -3.22% | 1.033x | | Q17 | 691.32 | 629.09 | -9.00% | 1.099x | | Q18 | 697.85 | 617.88 | -11.46% | 1.129x | | Q19 | 237.00 | 243.78 | 2.86% | 0.972x | | Q20 | 272.55 | 278.57 | 2.21% | 0.978x | | Q21 | 813.29 | 683.82 | -15.92% | 1.189x | | Q22 | 87.14 | 76.04 | -12.73% | 1.146x |
TPCH SF10 default batch size, 300 partitions, all queries | Query | main ms | grouped ms | change | speedup | |---:|---:|---:|---:|---:| | Q1 | 277.84 | 304.08 | 9.45% | 0.914x | | Q2 | 1303.10 | 1268.65 | -2.64% | 1.027x | | Q3 | 1496.14 | 1393.15 | -6.88% | 1.074x | | Q4 | 681.20 | 652.85 | -4.16% | 1.043x | | Q5 | 1680.43 | 1469.91 | -12.53% | 1.143x | | Q6 | 100.65 | 105.75 | 5.07% | 0.952x | | Q7 | 1880.26 | 1652.83 | -12.10% | 1.138x | | Q8 | 1956.81 | 1760.72 | -10.02% | 1.111x | | Q9 | 1787.75 | 1454.84 | -18.62% | 1.229x | | Q10 | 1334.62 | 1296.02 | -2.89% | 1.030x | | Q11 | 1018.99 | 994.13 | -2.44% | 1.025x | | Q12 | 768.97 | 780.88 | 1.55% | 0.985x | | Q13 | 671.88 | 638.51 | -4.97% | 1.052x | | Q14 | 603.10 | 586.19 | -2.80% | 1.029x | | Q15 | 302.19 | 295.28 | -2.29% | 1.023x | | Q16 | 597.64 | 585.39 | -2.05% | 1.021x | | Q17 | 1963.57 | 1712.46 | -12.79% | 1.147x | | Q18 | 1942.46 | 1634.96 | -15.83% | 1.188x | | Q19 | 818.85 | 808.19 | -1.30% | 1.013x | | Q20 | 1499.39 | 1468.55 | -2.06% | 1.021x | | Q21 | 3006.35 | 2277.49 | -24.24% | 1.320x | | Q22 | 584.39 | 560.47 | -4.09% | 1.043x |
**Stress cases:** - These runs use `--batch-size 1024` to stress the repartition path. They are included to show the mechanism under smaller input batches and higher output fanout, not as the primary end-to-end performance claim.
TPCH SF10, 8 partitions, all queries | Query | main ms | grouped ms | change | speedup | |---:|---:|---:|---:|---:| | Q1 | 640.78 | 451.82 | -29.49% | 1.420x | | Q2 | 315.81 | 150.07 | -52.48% | 2.100x | | Q3 | 899.21 | 375.88 | -58.20% | 2.390x | | Q4 | 469.31 | 217.07 | -53.75% | 2.160x | | Q5 | 1131.37 | 446.36 | -60.55% | 2.530x | | Q6 | 376.40 | 163.66 | -56.52% | 2.300x | | Q7 | 1388.40 | 484.36 | -65.11% | 2.870x | | Q8 | 1369.67 | 571.62 | -58.27% | 2.400x | | Q9 | 1834.81 | 739.88 | -59.68% | 2.480x | | Q10 | 813.73 | 361.94 | -55.52% | 2.250x | | Q11 | 267.06 | 114.84 | -57.00% | 2.330x | | Q12 | 526.41 | 250.39 | -52.43% | 2.100x | | Q13 | 760.54 | 324.78 | -57.30% | 2.340x | | Q14 | 446.91 | 221.04 | -50.54% | 2.020x | | Q15 | 764.64 | 375.67 | -50.87% | 2.040x | | Q16 | 167.74 | 80.36 | -52.09% | 2.090x | | Q17 | 1801.72 | 763.58 | -57.62% | 2.360x | | Q18 | 3303.89 | 1649.87 | -50.06% | 2.000x | | Q19 | 694.16 | 354.97 | -48.86% | 1.960x | | Q20 | 693.91 | 323.17 | -53.43% | 2.150x | | Q21 | 3112.83 | 1065.36 | -65.78% | 2.920x | | Q22 | 205.88 | 95.97 | -53.38% | 2.150x |
TPCH SF10, 16 partitions, all queries | Query | main ms | grouped ms | change | speedup | |---:|---:|---:|---:|---:| | Q1 | 518.84 | 328.66 | -36.66% | 1.580x | | Q2 | 350.47 | 148.64 | -57.59% | 2.360x | | Q3 | 1003.55 | 371.04 | -63.03% | 2.700x | | Q4 | 589.70 | 258.05 | -56.24% | 2.290x | | Q5 | 1343.64 | 506.01 | -62.34% | 2.660x | | Q6 | 322.21 | 130.93 | -59.37% | 2.460x | | Q7 | 1527.85 | 550.88 | -63.94% | 2.770x | | Q8 | 1476.46 | 578.77 | -60.80% | 2.550x | | Q9 | 2091.16 | 785.54 | -62.44% | 2.660x | | Q10 | 817.98 | 331.02 | -59.53% | 2.470x | | Q11 | 341.46 | 123.31 | -63.89% | 2.770x | | Q12 | 493.51 | 221.61 | -55.10% | 2.230x | | Q13 | 690.52 | 290.54 | -57.92% | 2.380x | | Q14 | 410.54 | 171.45 | -58.24% | 2.390x | | Q15 | 733.96 | 290.56 | -60.41% | 2.530x | | Q16 | 197.35 | 86.09 | -56.37% | 2.290x | | Q17 | 2089.12 | 828.96 | -60.32% | 2.520x | | Q18 | 2712.00 | 1097.77 | -59.52% | 2.470x | | Q19 | 602.77 | 260.74 | -56.74% | 2.310x | | Q20 | 661.20 | 288.58 | -56.35% | 2.290x | | Q21 | 5490.50 | 1151.50 | -79.03% | 4.770x | | Q22 | 198.38 | 103.13 | -48.01% | 1.920x |
TPCH SF10, 32 partitions, all queries | Query | main ms | grouped ms | change | speedup | |---:|---:|---:|---:|---:| | Q1 | 533.86 | 338.54 | -36.59% | 1.580x | | Q2 | 439.59 | 199.50 | -54.62% | 2.200x | | Q3 | 1242.19 | 510.11 | -58.93% | 2.440x | | Q4 | 743.92 | 363.33 | -51.16% | 2.050x | | Q5 | 1711.97 | 666.50 | -61.07% | 2.570x | | Q6 | 325.39 | 134.07 | -58.80% | 2.430x | | Q7 | 1947.59 | 722.22 | -62.92% | 2.700x | | Q8 | 1914.31 | 775.62 | -59.48% | 2.470x | | Q9 | 2662.07 | 976.47 | -63.32% | 2.730x | | Q10 | 902.80 | 362.71 | -59.82% | 2.490x | | Q11 | 400.93 | 170.81 | -57.40% | 2.350x | | Q12 | 572.19 | 265.06 | -53.68% | 2.160x | | Q13 | 736.31 | 296.82 | -59.69% | 2.480x | | Q14 | 430.11 | 180.93 | -57.93% | 2.380x | | Q15 | 732.36 | 327.12 | -55.33% | 2.240x | | Q16 | 245.97 | 116.24 | -52.74% | 2.120x | | Q17 | 2711.18 | 1100.17 | -59.42% | 2.460x | | Q18 | 2946.70 | 1176.02 | -60.09% | 2.510x | | Q19 | 600.47 | 258.58 | -56.94% | 2.320x | | Q20 | 765.20 | 337.01 | -55.96% | 2.270x | | Q21 | 10062.70 | 1534.95 | -84.75% | 6.560x | | Q22 | 250.50 | 128.27 | -48.79% | 1.950x |
TPCH SF10, 64 partitions, all queries | Query | main ms | grouped ms | change | speedup | |---:|---:|---:|---:|---:| | Q1 | 595.70 | 324.74 | -45.49% | 1.830x | | Q2 | 663.08 | 305.30 | -53.96% | 2.170x | | Q3 | 1744.90 | 727.81 | -58.29% | 2.400x | | Q4 | 1070.72 | 566.20 | -47.12% | 1.890x | | Q5 | 2447.07 | 938.91 | -61.63% | 2.610x | | Q6 | 315.47 | 132.73 | -57.93% | 2.380x | | Q7 | 2807.33 | 1004.83 | -64.21% | 2.790x | | Q8 | 2674.51 | 1069.64 | -60.01% | 2.500x | | Q9 | 3777.94 | 1424.08 | -62.31% | 2.650x | | Q10 | 1086.91 | 469.38 | -56.82% | 2.320x | | Q11 | 575.59 | 264.02 | -54.13% | 2.180x | | Q12 | 841.83 | 387.25 | -54.00% | 2.170x | | Q13 | 867.57 | 379.90 | -56.21% | 2.280x | | Q14 | 470.87 | 214.58 | -54.43% | 2.190x | | Q15 | 762.07 | 340.55 | -55.31% | 2.240x | | Q16 | 337.20 | 179.25 | -46.84% | 1.880x | | Q17 | 3953.82 | 1701.46 | -56.97% | 2.320x | | Q18 | 3763.51 | 1606.90 | -57.30% | 2.340x | | Q19 | 644.43 | 314.27 | -51.23% | 2.050x | | Q20 | 973.56 | 453.24 | -53.45% | 2.150x | | Q21 | 19356.91 | 2396.96 | -87.62% | 8.080x | | Q22 | 366.20 | 195.40 | -46.64% | 1.870x |
TPCH SF10, 300 partitions, targeted high-fanout queries Yes this is a real use case for fanout in distributed-datafusion | Query | main ms | grouped ms | change | speedup | |---:|---:|---:|---:|---:| | Q3 | 2543.94 | 2250.91 | -11.52% | 1.130x | | Q9 | 6495.22 | 4755.78 | -26.78% | 1.370x | | Q10 | 1869.05 | 1709.18 | -8.55% | 1.090x | | Q13 | 1238.63 | 1157.47 | -6.55% | 1.070x | | Q15 | 461.51 | 446.25 | -3.31% | 1.030x | | Q21 | 37810.29 | 5594.01 | -85.21% | 6.760x | | Q22 | 1084.95 | 1058.74 | -2.42% | 1.020x |
TPCH SF10, 300 partitions, peak RSS stress Measured with `/usr/bin/time -l`, one iteration, `--batch-size 1024`, `--partitions 300`, and no DataFusion memory limit. RSS is process peak resident set size from the OS. | Query | main ms | grouped ms | time change | main peak RSS | grouped peak RSS | RSS change | |---:|---:|---:|---:|---:|---:|---:| | Q7 | 5171.45 | 4151.15 | -19.73% | 3.69 GiB | 3.75 GiB | 1.61% | | Q9 | 6055.57 | 4758.10 | -21.43% | 4.01 GiB | 4.01 GiB | 0.04% | | Q21 | 36300.80 | 5810.14 | -83.99% | 2.96 GiB | 2.05 GiB | -30.79% |
#### Memory concern and follow-up work This PR changes output batches from materializing per-partition batches to slices of one reordered batch. This means sibling slices can share the same buffers. Potential concern: ```text one reordered batch allocation -> slice for partition 0 -> slice for partition 1 -> slice for partition 2 ``` A slow output partition can keep the shared reordered batch buffers alive until its slice is dropped. Also, `RecordBatch::get_array_memory_size()` may count shared slice buffers repeatedly when repartition reserves memory per output batch. The peak RSS stress above did not show a process-memory regression in the measured queries. Follow-up work should add buffer-aware accounting. ## Are there any user-facing changes? No. --- .../physical-plan/src/repartition/mod.rs | 105 +++++++++++------- 1 file changed, 63 insertions(+), 42 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 873ba8a5ee487..f2842e5f773e1 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -52,8 +52,7 @@ use datafusion_common::stats::Precision; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::utils::transpose; use datafusion_common::{ - ColumnStatistics, DataFusionError, HashMap, assert_or_internal_err, - internal_datafusion_err, internal_err, + ColumnStatistics, DataFusionError, HashMap, assert_or_internal_err, internal_err, }; use datafusion_common::{Result, not_impl_err}; use datafusion_common_runtime::SpawnedTask; @@ -681,46 +680,8 @@ impl BatchPartitioner { // Finished building index-arrays for output partitions timer.done(); - // Borrowing partitioner timer to prevent moving `self` to closure - let partitioner_timer = &self.timer; - - let mut partitioned_batches = vec![]; - for (partition, p_indices) in indices.iter_mut().enumerate() { - if !p_indices.is_empty() { - let taken_indices = std::mem::take(p_indices); - let indices_array: PrimitiveArray = - taken_indices.into(); - - // Tracking time required for repartitioned batches construction - let _timer = partitioner_timer.timer(); - - // Produce batches based on indices - let columns = - take_arrays(batch.columns(), &indices_array, None)?; - - let mut options = RecordBatchOptions::new(); - options = options.with_row_count(Some(indices_array.len())); - let batch = RecordBatch::try_new_with_options( - batch.schema(), - columns, - &options, - ) - .unwrap(); - - partitioned_batches.push(Ok((partition, batch))); - - // Return the taken vec - let (_, buffer, _) = indices_array.into_parts(); - let mut vec = - buffer.into_inner().into_vec::().map_err(|e| { - internal_datafusion_err!( - "Could not convert buffer to vec: {e:?}" - ) - })?; - vec.clear(); - *p_indices = vec; - } - } + let partitioned_batches = + Self::partition_grouped_take(&batch, indices, &self.timer)?; Box::new(partitioned_batches.into_iter()) } @@ -736,6 +697,66 @@ impl BatchPartitioner { BatchPartitionerState::Hash { indices, .. } => indices.len(), } } + + /// Build repartitioned hash output batches using one `take` per input batch. + /// + /// The hash router first fills one index vector per output partition. This method + /// concatenates those index vectors, performs one grouped `take_arrays`, and + /// then returns each output partition as a slice of the reordered batch. + /// + /// For example, given partition indices: + /// + /// ```text + /// partition 0: [2, 5] + /// partition 1: [] + /// partition 2: [0, 3, 4] + /// ``` + /// + /// this method takes rows in `[2, 5, 0, 3, 4]` order once, then returns + /// `partition 0 = slice(0, 2)` and `partition 2 = slice(2, 3)`. + fn partition_grouped_take( + batch: &RecordBatch, + indices: &mut [Vec], + timer: &metrics::Time, + ) -> Result>> { + let mut partition_ranges = Vec::with_capacity(indices.len()); + let mut reordered_indices = Vec::with_capacity(batch.num_rows()); + + for (partition, p_indices) in indices.iter_mut().enumerate() { + if p_indices.is_empty() { + continue; + } + + let start = reordered_indices.len(); + reordered_indices.extend_from_slice(p_indices); + partition_ranges.push((partition, start, p_indices.len())); + p_indices.clear(); + } + + if reordered_indices.is_empty() { + return Ok(vec![]); + } + + let batches = { + let _timer = timer.timer(); + let indices_array: PrimitiveArray = reordered_indices.into(); + let columns = take_arrays(batch.columns(), &indices_array, None)?; + + let mut options = RecordBatchOptions::new(); + options = options.with_row_count(Some(indices_array.len())); + let reordered_batch = + RecordBatch::try_new_with_options(batch.schema(), columns, &options)?; + + partition_ranges + .into_iter() + .map(|(partition, start, len)| { + Ok((partition, reordered_batch.slice(start, len))) + }) + .collect() + }; + + Ok(batches) + } } /// Maps `N` input partitions to `M` output partitions based on a