From a0a727c4dd1de6e02ff9c42b989060bab5c1517f Mon Sep 17 00:00:00 2001 From: Gene Bordegaray Date: Wed, 13 May 2026 07:08:46 -0400 Subject: [PATCH 1/2] Call take arrays once per repartitioned input batch --- .../physical-plan/src/repartition/mod.rs | 102 ++++++++++-------- 1 file changed, 60 insertions(+), 42 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index b4af6e2c09a5c..6512a58ad4f9b 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -52,8 +52,7 @@ use datafusion_common::stats::Precision; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::utils::transpose; use datafusion_common::{ - ColumnStatistics, DataFusionError, HashMap, assert_or_internal_err, - internal_datafusion_err, internal_err, + ColumnStatistics, DataFusionError, HashMap, assert_or_internal_err, internal_err, }; use datafusion_common::{Result, not_impl_err}; use datafusion_common_runtime::SpawnedTask; @@ -681,46 +680,8 @@ impl BatchPartitioner { // Finished building index-arrays for output partitions timer.done(); - // Borrowing partitioner timer to prevent moving `self` to closure - let partitioner_timer = &self.timer; - - let mut partitioned_batches = vec![]; - for (partition, p_indices) in indices.iter_mut().enumerate() { - if !p_indices.is_empty() { - let taken_indices = std::mem::take(p_indices); - let indices_array: PrimitiveArray = - taken_indices.into(); - - // Tracking time required for repartitioned batches construction - let _timer = partitioner_timer.timer(); - - // Produce batches based on indices - let columns = - take_arrays(batch.columns(), &indices_array, None)?; - - let mut options = RecordBatchOptions::new(); - options = options.with_row_count(Some(indices_array.len())); - let batch = RecordBatch::try_new_with_options( - batch.schema(), - columns, - &options, - ) - .unwrap(); - - partitioned_batches.push(Ok((partition, batch))); - - // Return the taken vec - let (_, buffer, _) = indices_array.into_parts(); - let mut vec = - buffer.into_inner().into_vec::().map_err(|e| { - internal_datafusion_err!( - "Could not convert buffer to vec: {e:?}" - ) - })?; - vec.clear(); - *p_indices = vec; - } - } + let partitioned_batches = + Self::partition_grouped_take(&batch, indices, &self.timer)?; Box::new(partitioned_batches.into_iter()) } @@ -736,6 +697,63 @@ impl BatchPartitioner { BatchPartitionerState::Hash { indices, .. } => indices.len(), } } + + /// Build repartitioned hash output batches using one `take` per input batch. + /// + /// The hash router first fills one index vector per output partition. This method + /// concatenates those index vectors, performs one grouped `take_arrays`, and + /// then returns each output partition as a slice of the reordered batch. + /// + /// For example, given partition indices: + /// + /// ```text + /// partition 0: [2, 5] + /// partition 1: [] + /// partition 2: [0, 3, 4] + /// ``` + /// + /// this method takes rows in `[2, 5, 0, 3, 4]` order once, then returns + /// `partition 0 = slice(0, 2)` and `partition 2 = slice(2, 3)`. + fn partition_grouped_take( + batch: &RecordBatch, + indices: &mut [Vec], + timer: &metrics::Time, + ) -> Result>> { + let mut partition_ranges = Vec::with_capacity(indices.len()); + let mut reordered_indices = Vec::with_capacity(batch.num_rows()); + + for (partition, p_indices) in indices.iter_mut().enumerate() { + if p_indices.is_empty() { + continue; + } + + let start = reordered_indices.len(); + reordered_indices.extend_from_slice(p_indices); + partition_ranges.push((partition, start, p_indices.len())); + p_indices.clear(); + } + + if reordered_indices.is_empty() { + return Ok(vec![]); + } + + let indices_array: PrimitiveArray = reordered_indices.into(); + let reordered_batch = { + let _timer = timer.timer(); + let columns = take_arrays(batch.columns(), &indices_array, None)?; + + let mut options = RecordBatchOptions::new(); + options = options.with_row_count(Some(indices_array.len())); + RecordBatch::try_new_with_options(batch.schema(), columns, &options).unwrap() + }; + + Ok(partition_ranges + .into_iter() + .map(|(partition, start, len)| { + Ok((partition, reordered_batch.slice(start, len))) + }) + .collect()) + } } /// Maps `N` input partitions to `M` output partitions based on a From 96b9172178ad62cac52c497649e89de5ae97be16 Mon Sep 17 00:00:00 2001 From: Gene Bordegaray Date: Thu, 14 May 2026 07:49:25 -0400 Subject: [PATCH 2/2] address comments --- .../physical-plan/src/repartition/mod.rs | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 6512a58ad4f9b..f0dfe34544982 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -737,22 +737,25 @@ impl BatchPartitioner { return Ok(vec![]); } - let indices_array: PrimitiveArray = reordered_indices.into(); - let reordered_batch = { + let batches = { let _timer = timer.timer(); + let indices_array: PrimitiveArray = reordered_indices.into(); let columns = take_arrays(batch.columns(), &indices_array, None)?; let mut options = RecordBatchOptions::new(); options = options.with_row_count(Some(indices_array.len())); - RecordBatch::try_new_with_options(batch.schema(), columns, &options).unwrap() + let reordered_batch = + RecordBatch::try_new_with_options(batch.schema(), columns, &options)?; + + partition_ranges + .into_iter() + .map(|(partition, start, len)| { + Ok((partition, reordered_batch.slice(start, len))) + }) + .collect() }; - Ok(partition_ranges - .into_iter() - .map(|(partition, start, len)| { - Ok((partition, reordered_batch.slice(start, len))) - }) - .collect()) + Ok(batches) } }