From 5ebb03a5981de04899672fdf2a2115e4135d557f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 19 Mar 2026 14:07:27 -0600 Subject: [PATCH 01/11] feat: add PartitionBuffer with typed ColumnBuffer for scatter kernel --- .../src/execution/shuffle/partitioners/mod.rs | 1 + .../shuffle/partitioners/partition_buffer.rs | 453 ++++++++++++++++++ 2 files changed, 454 insertions(+) create mode 100644 native/core/src/execution/shuffle/partitioners/partition_buffer.rs diff --git a/native/core/src/execution/shuffle/partitioners/mod.rs b/native/core/src/execution/shuffle/partitioners/mod.rs index b9058f66f4..8cd9ee0c42 100644 --- a/native/core/src/execution/shuffle/partitioners/mod.rs +++ b/native/core/src/execution/shuffle/partitioners/mod.rs @@ -16,6 +16,7 @@ // under the License. mod multi_partition; +pub(super) mod partition_buffer; mod partitioned_batch_iterator; mod single_partition; diff --git a/native/core/src/execution/shuffle/partitioners/partition_buffer.rs b/native/core/src/execution/shuffle/partitioners/partition_buffer.rs new file mode 100644 index 0000000000..e082174b55 --- /dev/null +++ b/native/core/src/execution/shuffle/partitioners/partition_buffer.rs @@ -0,0 +1,453 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ + make_array, ArrayData, ArrayRef, BinaryArray, BooleanArray, BooleanBufferBuilder, + LargeBinaryArray, LargeStringArray, RecordBatch, StringArray, UInt32Array, +}; +use arrow::buffer::{Buffer, MutableBuffer, NullBuffer, OffsetBuffer, ScalarBuffer}; +use arrow::compute::take; +use arrow::datatypes::{DataType, SchemaRef}; +use datafusion::common::Result; +use std::sync::Arc; + +/// Per-partition typed column buffer for the scatter kernel. +pub(crate) enum ColumnBuffer { + Boolean { + values: BooleanBufferBuilder, + nulls: BooleanBufferBuilder, + }, + Fixed { + values: MutableBuffer, + byte_width: usize, + nulls: BooleanBufferBuilder, + }, + Variable { + offsets: Vec, + data: Vec, + nulls: BooleanBufferBuilder, + }, + LargeVariable { + offsets: Vec, + data: Vec, + nulls: BooleanBufferBuilder, + }, + Fallback { + indices: Vec, + }, +} + +impl ColumnBuffer { + pub(crate) fn append_fixed(&mut self, bytes: &[u8]) { + match self { + ColumnBuffer::Fixed { values, .. } => { + values.extend_from_slice(bytes); + } + _ => unreachable!("append_fixed called on non-Fixed variant"), + } + } + + pub(crate) fn append_variable(&mut self, bytes: &[u8]) { + match self { + ColumnBuffer::Variable { offsets, data, .. } => { + data.extend_from_slice(bytes); + offsets.push(data.len() as i32); + } + _ => unreachable!("append_variable called on non-Variable variant"), + } + } + + pub(crate) fn append_large_variable(&mut self, bytes: &[u8]) { + match self { + ColumnBuffer::LargeVariable { offsets, data, .. } => { + data.extend_from_slice(bytes); + offsets.push(data.len() as i64); + } + _ => unreachable!("append_large_variable called on non-LargeVariable variant"), + } + } + + pub(crate) fn append_bool(&mut self, value: bool) { + match self { + ColumnBuffer::Boolean { values, .. } => { + values.append(value); + } + _ => unreachable!("append_bool called on non-Boolean variant"), + } + } + + pub(crate) fn append_fallback_index(&mut self, idx: u32) { + match self { + ColumnBuffer::Fallback { indices } => { + indices.push(idx); + } + _ => unreachable!("append_fallback_index called on non-Fallback variant"), + } + } + + pub(crate) fn append_null_bit(&mut self, is_valid: bool) { + match self { + ColumnBuffer::Boolean { nulls, .. } + | ColumnBuffer::Fixed { nulls, .. } + | ColumnBuffer::Variable { nulls, .. } + | ColumnBuffer::LargeVariable { nulls, .. } => { + nulls.append(is_valid); + } + ColumnBuffer::Fallback { .. } => { + unreachable!("append_null_bit called on Fallback variant") + } + } + } + + pub(crate) fn memory_size(&self) -> usize { + match self { + ColumnBuffer::Boolean { values, nulls } => values.capacity() + nulls.capacity(), + ColumnBuffer::Fixed { values, nulls, .. } => values.capacity() + nulls.capacity(), + ColumnBuffer::Variable { + offsets, + data, + nulls, + } => { + offsets.capacity() * std::mem::size_of::() + + data.capacity() + + nulls.capacity() + } + ColumnBuffer::LargeVariable { + offsets, + data, + nulls, + } => { + offsets.capacity() * std::mem::size_of::() + + data.capacity() + + nulls.capacity() + } + ColumnBuffer::Fallback { indices } => { + indices.capacity() * std::mem::size_of::() + } + } + } +} + +/// Per-partition buffer that accumulates rows by scattering values into typed +/// column buffers. +pub(crate) struct PartitionBuffer { + pub(crate) columns: Vec, + pub(crate) row_count: usize, + schema: SchemaRef, +} + +impl PartitionBuffer { + pub(crate) fn new(schema: SchemaRef, estimated_rows: usize) -> Self { + let columns = schema + .fields() + .iter() + .map(|field| match field.data_type() { + DataType::Boolean => ColumnBuffer::Boolean { + values: BooleanBufferBuilder::new(estimated_rows), + nulls: BooleanBufferBuilder::new(estimated_rows), + }, + DataType::Int8 | DataType::UInt8 => ColumnBuffer::Fixed { + values: MutableBuffer::new(estimated_rows), + byte_width: 1, + nulls: BooleanBufferBuilder::new(estimated_rows), + }, + DataType::Int16 | DataType::UInt16 | DataType::Float16 => ColumnBuffer::Fixed { + values: MutableBuffer::new(estimated_rows * 2), + byte_width: 2, + nulls: BooleanBufferBuilder::new(estimated_rows), + }, + DataType::Int32 | DataType::UInt32 | DataType::Float32 | DataType::Date32 => { + ColumnBuffer::Fixed { + values: MutableBuffer::new(estimated_rows * 4), + byte_width: 4, + nulls: BooleanBufferBuilder::new(estimated_rows), + } + } + DataType::Int64 + | DataType::UInt64 + | DataType::Float64 + | DataType::Date64 + | DataType::Timestamp(_, _) + | DataType::Duration(_) => ColumnBuffer::Fixed { + values: MutableBuffer::new(estimated_rows * 8), + byte_width: 8, + nulls: BooleanBufferBuilder::new(estimated_rows), + }, + DataType::Decimal128(_, _) => ColumnBuffer::Fixed { + values: MutableBuffer::new(estimated_rows * 16), + byte_width: 16, + nulls: BooleanBufferBuilder::new(estimated_rows), + }, + DataType::Utf8 | DataType::Binary => ColumnBuffer::Variable { + offsets: vec![0i32], + data: vec![], + nulls: BooleanBufferBuilder::new(estimated_rows), + }, + DataType::LargeUtf8 | DataType::LargeBinary => ColumnBuffer::LargeVariable { + offsets: vec![0i64], + data: vec![], + nulls: BooleanBufferBuilder::new(estimated_rows), + }, + _ => ColumnBuffer::Fallback { indices: vec![] }, + }) + .collect(); + + Self { + columns, + row_count: 0, + schema, + } + } + + pub(crate) fn row_count(&self) -> usize { + self.row_count + } + + pub(crate) fn memory_size(&self) -> usize { + self.columns.iter().map(|c| c.memory_size()).sum() + } + + pub(crate) fn has_fallback_columns(&self) -> bool { + self.columns + .iter() + .any(|c| matches!(c, ColumnBuffer::Fallback { .. })) + } + + pub(crate) fn clear(&mut self) { + self.row_count = 0; + for col in &mut self.columns { + match col { + ColumnBuffer::Boolean { values, nulls } => { + *values = BooleanBufferBuilder::new(0); + *nulls = BooleanBufferBuilder::new(0); + } + ColumnBuffer::Fixed { values, nulls, .. } => { + *values = MutableBuffer::new(0); + *nulls = BooleanBufferBuilder::new(0); + } + ColumnBuffer::Variable { + offsets, + data, + nulls, + } => { + *offsets = vec![0i32]; + data.clear(); + *nulls = BooleanBufferBuilder::new(0); + } + ColumnBuffer::LargeVariable { + offsets, + data, + nulls, + } => { + *offsets = vec![0i64]; + data.clear(); + *nulls = BooleanBufferBuilder::new(0); + } + ColumnBuffer::Fallback { indices } => { + indices.clear(); + } + } + } + } + + pub(crate) fn flush(&mut self, fallback_batch: Option<&RecordBatch>) -> Result { + let row_count = self.row_count; + let mut arrays: Vec = Vec::with_capacity(self.columns.len()); + + for (col_idx, col) in self.columns.iter_mut().enumerate() { + let data_type = self.schema.field(col_idx).data_type().clone(); + let array: ArrayRef = match col { + ColumnBuffer::Fixed { + values, + nulls, + .. + } => { + let buffer = + Buffer::from(std::mem::replace(values, MutableBuffer::new(0))); + let mut builder = + ArrayData::builder(data_type).len(row_count).add_buffer(buffer); + if nulls.len() > 0 { + builder = builder + .null_bit_buffer(Some(nulls.finish().into_inner())); + } + let data = builder.build()?; + make_array(data) + } + ColumnBuffer::Variable { + offsets, + data, + nulls, + } => { + let offsets_buffer = + OffsetBuffer::new(ScalarBuffer::from(std::mem::replace( + offsets, + vec![0i32], + ))); + let values_buffer = Buffer::from(std::mem::take(data)); + let null_buffer = if nulls.len() > 0 { + Some(NullBuffer::new(nulls.finish())) + } else { + None + }; + match &data_type { + DataType::Utf8 => Arc::new(StringArray::new( + offsets_buffer, + values_buffer, + null_buffer, + )) as ArrayRef, + DataType::Binary => Arc::new(BinaryArray::new( + offsets_buffer, + values_buffer, + null_buffer, + )) as ArrayRef, + _ => unreachable!("Variable buffer with unexpected data type"), + } + } + ColumnBuffer::LargeVariable { + offsets, + data, + nulls, + } => { + let offsets_buffer = + OffsetBuffer::new(ScalarBuffer::from(std::mem::replace( + offsets, + vec![0i64], + ))); + let values_buffer = Buffer::from(std::mem::take(data)); + let null_buffer = if nulls.len() > 0 { + Some(NullBuffer::new(nulls.finish())) + } else { + None + }; + match &data_type { + DataType::LargeUtf8 => Arc::new(LargeStringArray::new( + offsets_buffer, + values_buffer, + null_buffer, + )) as ArrayRef, + DataType::LargeBinary => Arc::new(LargeBinaryArray::new( + offsets_buffer, + values_buffer, + null_buffer, + )) as ArrayRef, + _ => unreachable!("LargeVariable buffer with unexpected data type"), + } + } + ColumnBuffer::Boolean { values, nulls } => { + let values_buf = values.finish(); + let null_buffer = if nulls.len() > 0 { + Some(NullBuffer::new(nulls.finish())) + } else { + None + }; + Arc::new(BooleanArray::new(values_buf, null_buffer)) as ArrayRef + } + ColumnBuffer::Fallback { indices } => { + let fallback = fallback_batch + .expect("fallback_batch required for Fallback columns"); + let idx_array = UInt32Array::from(std::mem::take(indices)); + take(fallback.column(col_idx), &idx_array, None)? + } + }; + arrays.push(array); + } + + self.row_count = 0; + Ok(RecordBatch::try_new(self.schema.clone(), arrays)?) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{Array, Int32Array}; + use arrow::datatypes::{Field, Schema}; + + #[test] + fn test_partition_buffer_basic() { + let schema = Arc::new(Schema::new(vec![ + Field::new("i", DataType::Int32, true), + Field::new("s", DataType::Utf8, true), + Field::new("b", DataType::Boolean, true), + ])); + let mut buf = PartitionBuffer::new(schema.clone(), 100); + + // Append 3 rows manually + // Row 0: i=1, s="hello", b=true, all valid + buf.columns[0].append_fixed(&1i32.to_le_bytes()); + buf.columns[0].append_null_bit(true); + buf.columns[1].append_variable(b"hello"); + buf.columns[1].append_null_bit(true); + buf.columns[2].append_bool(true); + buf.columns[2].append_null_bit(true); + buf.row_count += 1; + + // Row 1: i=NULL, s="world", b=false + buf.columns[0].append_fixed(&0i32.to_le_bytes()); + buf.columns[0].append_null_bit(false); // null + buf.columns[1].append_variable(b"world"); + buf.columns[1].append_null_bit(true); + buf.columns[2].append_bool(false); + buf.columns[2].append_null_bit(true); + buf.row_count += 1; + + // Row 2: i=42, s=NULL, b=true + buf.columns[0].append_fixed(&42i32.to_le_bytes()); + buf.columns[0].append_null_bit(true); + buf.columns[1].append_variable(b""); + buf.columns[1].append_null_bit(false); // null + buf.columns[2].append_bool(true); + buf.columns[2].append_null_bit(true); + buf.row_count += 1; + + let batch = buf.flush(None).unwrap(); + assert_eq!(batch.num_rows(), 3); + + // Check Int32 column + let col0 = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col0.value(0), 1); + assert!(col0.is_null(1)); + assert_eq!(col0.value(2), 42); + + // Check Utf8 column + let col1 = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col1.value(0), "hello"); + assert_eq!(col1.value(1), "world"); + assert!(col1.is_null(2)); + + // Check Boolean column + let col2 = batch + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + assert!(col2.value(0)); + assert!(!col2.value(1)); + assert!(col2.value(2)); + + // After flush, row_count should be 0 + assert_eq!(buf.row_count(), 0); + } +} From d1b5af33094c015f05b25cd78855940796a6a5ce Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 19 Mar 2026 14:18:58 -0600 Subject: [PATCH 02/11] feat: integrate scatter kernel into shuffle partitioning Replace buffered_batches + partition_indices + interleave_record_batch with per-partition PartitionBuffers that accumulate values directly during insert via scatter_batch. Eliminates the gather step (55% of shuffle write time) and BatchCoalescer. Buffers auto-flush at batch_size. Remove PartitionedBatchIterator entirely. Change PartitionWriter::spill to accept &[RecordBatch]. --- .../src/execution/shuffle/partitioners/mod.rs | 2 - .../shuffle/partitioners/multi_partition.rs | 414 ++++++++++-------- .../partitioned_batch_iterator.rs | 110 ----- .../shuffle/writers/partition_writer.rs | 56 ++- 4 files changed, 267 insertions(+), 315 deletions(-) delete mode 100644 native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs diff --git a/native/core/src/execution/shuffle/partitioners/mod.rs b/native/core/src/execution/shuffle/partitioners/mod.rs index 8cd9ee0c42..9590c3c4d3 100644 --- a/native/core/src/execution/shuffle/partitioners/mod.rs +++ b/native/core/src/execution/shuffle/partitioners/mod.rs @@ -17,14 +17,12 @@ mod multi_partition; pub(super) mod partition_buffer; -mod partitioned_batch_iterator; mod single_partition; use arrow::record_batch::RecordBatch; use datafusion::common::Result; pub(super) use multi_partition::MultiPartitionShuffleRepartitioner; -pub(super) use partitioned_batch_iterator::PartitionedBatchIterator; pub(super) use single_partition::SinglePartitionShufflePartitioner; #[async_trait::async_trait] diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs b/native/core/src/execution/shuffle/partitioners/multi_partition.rs index 9c366ad462..027f48f865 100644 --- a/native/core/src/execution/shuffle/partitioners/multi_partition.rs +++ b/native/core/src/execution/shuffle/partitioners/multi_partition.rs @@ -16,24 +16,19 @@ // under the License. use crate::execution::shuffle::metrics::ShufflePartitionerMetrics; -use crate::execution::shuffle::partitioners::partitioned_batch_iterator::{ - PartitionedBatchIterator, PartitionedBatchesProducer, -}; +use crate::execution::shuffle::partitioners::partition_buffer::{self, PartitionBuffer}; use crate::execution::shuffle::partitioners::ShufflePartitioner; use crate::execution::shuffle::writers::{BufBatchWriter, PartitionWriter}; use crate::execution::shuffle::{ comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter, }; use crate::execution::tracing::{with_trace, with_trace_async}; -use arrow::array::{ArrayRef, RecordBatch}; -use arrow::datatypes::SchemaRef; -use datafusion::common::utils::proxy::VecAllocExt; +use arrow::array::{Array, ArrayRef, BooleanArray, RecordBatch}; +use arrow::datatypes::{DataType, SchemaRef}; use datafusion::common::DataFusionError; use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion::execution::runtime_env::RuntimeEnv; -use datafusion::physical_plan::metrics::Time; use datafusion_comet_spark_expr::murmur3::create_murmur3_hashes; -use itertools::Itertools; use std::fmt; use std::fmt::{Debug, Formatter}; use std::fs::{File, OpenOptions}; @@ -109,8 +104,8 @@ impl ScratchSpace { pub(crate) struct MultiPartitionShuffleRepartitioner { output_data_file: String, output_index_file: String, - buffered_batches: Vec, - partition_indices: Vec>, + partition_buffers: Vec, + has_fallback_columns: bool, partition_writers: Vec, shuffle_block_writer: ShuffleBlockWriter, /// Partitioning scheme to use @@ -172,6 +167,37 @@ impl MultiPartitionShuffleRepartitioner { .map(|_| PartitionWriter::try_new(shuffle_block_writer.clone())) .collect::>>()?; + let has_fallback_columns = schema.fields().iter().any(|f| { + !matches!( + f.data_type(), + DataType::Boolean + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Float16 + | DataType::Float32 + | DataType::Float64 + | DataType::Date32 + | DataType::Date64 + | DataType::Timestamp(_, _) + | DataType::Duration(_) + | DataType::Decimal128(_, _) + | DataType::Utf8 + | DataType::Binary + | DataType::LargeUtf8 + | DataType::LargeBinary + ) + }); + let estimated_rows_per_partition = batch_size / num_output_partitions.max(1); + let partition_buffers = (0..num_output_partitions) + .map(|_| PartitionBuffer::new(schema.clone(), estimated_rows_per_partition)) + .collect(); + let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{partition}]")) .with_can_spill(true) .register(&runtime.memory_pool); @@ -179,8 +205,8 @@ impl MultiPartitionShuffleRepartitioner { Ok(Self { output_data_file, output_index_file, - buffered_batches: vec![], - partition_indices: vec![vec![]; num_output_partitions], + partition_buffers, + has_fallback_columns, partition_writers, shuffle_block_writer, partitioning, @@ -221,7 +247,7 @@ impl MultiPartitionShuffleRepartitioner { match &self.partitioning { CometPartitioning::Hash(exprs, num_output_partitions) => { let mut scratch = std::mem::take(&mut self.scratch); - let (partition_starts, partition_row_indices): (&Vec, &Vec) = { + let num_rows = { let mut timer = self.metrics.repart_time.timer(); // Evaluate partition expressions to get rows to apply partitioning scheme. @@ -249,24 +275,19 @@ impl MultiPartitionShuffleRepartitioner { }); } - // We now have partition ids for every input row, map that to partition starts - // and partition indices to eventually right these rows to partition buffers. + // We now have partition ids for every input row, map that to partition starts. scratch .map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows); timer.stop(); - Ok::<(&Vec, &Vec), DataFusionError>(( - &scratch.partition_starts, - &scratch.partition_row_indices, - )) - }?; - - self.buffer_partitioned_batch_may_spill( - input, - partition_row_indices, - partition_starts, - ) - .await?; + num_rows + }; + + self.scatter_batch( + &input, + &scratch.partition_ids[..num_rows], + &scratch.partition_starts, + )?; self.scratch = scratch; } CometPartitioning::RangePartitioning( @@ -276,7 +297,7 @@ impl MultiPartitionShuffleRepartitioner { bounds, ) => { let mut scratch = std::mem::take(&mut self.scratch); - let (partition_starts, partition_row_indices): (&Vec, &Vec) = { + let num_rows = { let mut timer = self.metrics.repart_time.timer(); // Evaluate partition expressions for values to apply partitioning scheme on. @@ -302,24 +323,19 @@ impl MultiPartitionShuffleRepartitioner { }); } - // We now have partition ids for every input row, map that to partition starts - // and partition indices to eventually right these rows to partition buffers. + // We now have partition ids for every input row, map that to partition starts. scratch .map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows); timer.stop(); - Ok::<(&Vec, &Vec), DataFusionError>(( - &scratch.partition_starts, - &scratch.partition_row_indices, - )) - }?; - - self.buffer_partitioned_batch_may_spill( - input, - partition_row_indices, - partition_starts, - ) - .await?; + num_rows + }; + + self.scatter_batch( + &input, + &scratch.partition_ids[..num_rows], + &scratch.partition_starts, + )?; self.scratch = scratch; } CometPartitioning::RoundRobin(num_output_partitions, max_hash_columns) => { @@ -331,7 +347,7 @@ impl MultiPartitionShuffleRepartitioner { // which sorts by UnsafeRow binary representation before assigning partitions. // However, both approaches provide even distribution and determinism. let mut scratch = std::mem::take(&mut self.scratch); - let (partition_starts, partition_row_indices): (&Vec, &Vec) = { + let num_rows = { let mut timer = self.metrics.repart_time.timer(); let num_rows = input.num_rows(); @@ -362,24 +378,19 @@ impl MultiPartitionShuffleRepartitioner { comet_partitioning::pmod(*hash, *num_output_partitions) as u32; }); - // We now have partition ids for every input row, map that to partition starts - // and partition indices to eventually write these rows to partition buffers. + // We now have partition ids for every input row, map that to partition starts. scratch .map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows); timer.stop(); - Ok::<(&Vec, &Vec), DataFusionError>(( - &scratch.partition_starts, - &scratch.partition_row_indices, - )) - }?; - - self.buffer_partitioned_batch_may_spill( - input, - partition_row_indices, - partition_starts, - ) - .await?; + num_rows + }; + + self.scatter_batch( + &input, + &scratch.partition_ids[..num_rows], + &scratch.partition_starts, + )?; self.scratch = scratch; } other => { @@ -393,68 +404,145 @@ impl MultiPartitionShuffleRepartitioner { Ok(()) } - async fn buffer_partitioned_batch_may_spill( + fn scatter_batch( &mut self, - input: RecordBatch, - partition_row_indices: &[u32], + input: &RecordBatch, + partition_ids: &[u32], partition_starts: &[u32], ) -> datafusion::common::Result<()> { - let mut mem_growth: usize = input.get_array_memory_size(); - let buffered_partition_idx = self.buffered_batches.len() as u32; - self.buffered_batches.push(input); - - // partition_starts conceptually slices partition_row_indices into smaller slices, - // each slice contains the indices of rows in input that will go into the corresponding - // partition. The following loop iterates over the slices and put the row indices into - // the indices array of the corresponding partition. - for (partition_id, (&start, &end)) in partition_starts - .iter() - .tuple_windows() - .enumerate() - .filter(|(_, (start, end))| start < end) - { - let row_indices = &partition_row_indices[start as usize..end as usize]; - - // Put row indices for the current partition into the indices array of that partition. - // This indices array will be used for calling interleave_record_batch to produce - // shuffled batches. - let indices = &mut self.partition_indices[partition_id]; - let before_size = indices.allocated_size(); - indices.reserve(row_indices.len()); - for row_idx in row_indices { - indices.push((buffered_partition_idx, *row_idx)); + let num_rows = input.num_rows(); + let num_partitions = self.partition_buffers.len(); + + // Track memory before scatter + let mem_before: usize = self.partition_buffers.iter().map(|b| b.memory_size()).sum(); + + // Column-oriented scatter: process one column at a time across all rows + for (col_idx, column) in input.columns().iter().enumerate() { + // Determine scatter path from first partition's column type + // (all partitions have the same column types) + let is_fixed = matches!( + self.partition_buffers[0].columns[col_idx], + partition_buffer::ColumnBuffer::Fixed { .. } + ); + let is_variable = matches!( + self.partition_buffers[0].columns[col_idx], + partition_buffer::ColumnBuffer::Variable { .. } + ); + let is_large_variable = matches!( + self.partition_buffers[0].columns[col_idx], + partition_buffer::ColumnBuffer::LargeVariable { .. } + ); + let is_boolean = matches!( + self.partition_buffers[0].columns[col_idx], + partition_buffer::ColumnBuffer::Boolean { .. } + ); + + let nulls = column.nulls(); + + if is_fixed { + let byte_width = match &self.partition_buffers[0].columns[col_idx] { + partition_buffer::ColumnBuffer::Fixed { byte_width, .. } => *byte_width, + _ => unreachable!(), + }; + let data = column.to_data(); + let values = data.buffers()[0].as_slice(); + for row in 0..num_rows { + let p = partition_ids[row] as usize; + let src_offset = row * byte_width; + self.partition_buffers[p].columns[col_idx] + .append_fixed(&values[src_offset..src_offset + byte_width]); + let is_valid = nulls.map_or(true, |n| n.is_valid(row)); + self.partition_buffers[p].columns[col_idx].append_null_bit(is_valid); + } + } else if is_variable { + let data = column.to_data(); + let offsets_slice = data.buffers()[0].typed_data::(); + let values_slice = data.buffers()[1].as_slice(); + for row in 0..num_rows { + let p = partition_ids[row] as usize; + let start = offsets_slice[row] as usize; + let end = offsets_slice[row + 1] as usize; + self.partition_buffers[p].columns[col_idx] + .append_variable(&values_slice[start..end]); + let is_valid = nulls.map_or(true, |n| n.is_valid(row)); + self.partition_buffers[p].columns[col_idx].append_null_bit(is_valid); + } + } else if is_large_variable { + let data = column.to_data(); + let offsets_slice = data.buffers()[0].typed_data::(); + let values_slice = data.buffers()[1].as_slice(); + for row in 0..num_rows { + let p = partition_ids[row] as usize; + let start = offsets_slice[row] as usize; + let end = offsets_slice[row + 1] as usize; + self.partition_buffers[p].columns[col_idx] + .append_large_variable(&values_slice[start..end]); + let is_valid = nulls.map_or(true, |n| n.is_valid(row)); + self.partition_buffers[p].columns[col_idx].append_null_bit(is_valid); + } + } else if is_boolean { + let bool_array = column.as_any().downcast_ref::().unwrap(); + for row in 0..num_rows { + let p = partition_ids[row] as usize; + self.partition_buffers[p].columns[col_idx] + .append_bool(bool_array.value(row)); + let is_valid = nulls.map_or(true, |n| n.is_valid(row)); + self.partition_buffers[p].columns[col_idx].append_null_bit(is_valid); + } + } else { + // Fallback + for row in 0..num_rows { + let p = partition_ids[row] as usize; + self.partition_buffers[p].columns[col_idx] + .append_fallback_index(row as u32); + } } - let after_size = indices.allocated_size(); - mem_growth += after_size.saturating_sub(before_size); } - if self.reservation.try_grow(mem_growth).is_err() { - self.spill()?; + // Update row counts from partition_starts (O(num_partitions), not O(num_rows)) + for p in 0..num_partitions { + let count = (partition_starts[p + 1] - partition_starts[p]) as usize; + self.partition_buffers[p].row_count += count; } - Ok(()) - } + // Auto-flush partitions that reached batch_size + for p in 0..num_partitions { + if self.partition_buffers[p].row_count >= self.batch_size { + let batch = self.partition_buffers[p].flush(Some(input))?; + self.partition_writers[p].spill( + &[batch], + &self.runtime, + &self.metrics, + self.write_buffer_size, + self.batch_size, + )?; + } + } - fn shuffle_write_partition( - partition_iter: &mut PartitionedBatchIterator, - shuffle_block_writer: &mut ShuffleBlockWriter, - output_data: &mut BufWriter, - encode_time: &Time, - write_time: &Time, - write_buffer_size: usize, - batch_size: usize, - ) -> datafusion::common::Result<()> { - let mut buf_batch_writer = BufBatchWriter::new( - shuffle_block_writer, - output_data, - write_buffer_size, - batch_size, - ); - for batch in partition_iter { - let batch = batch?; - buf_batch_writer.write(&batch, encode_time, write_time)?; + // If schema has fallback columns, flush ALL non-empty partitions + // since fallback indices reference the current input batch + if self.has_fallback_columns { + for p in 0..num_partitions { + if self.partition_buffers[p].row_count > 0 { + let batch = self.partition_buffers[p].flush(Some(input))?; + self.partition_writers[p].spill( + &[batch], + &self.runtime, + &self.metrics, + self.write_buffer_size, + self.batch_size, + )?; + } + } } - buf_batch_writer.flush(encode_time, write_time)?; + + // Precise memory tracking + let mem_after: usize = self.partition_buffers.iter().map(|b| b.memory_size()).sum(); + let mem_growth = mem_after.saturating_sub(mem_before); + if self.reservation.try_grow(mem_growth).is_err() { + self.spill()?; + } + Ok(()) } @@ -474,49 +562,30 @@ impl MultiPartitionShuffleRepartitioner { self.metrics.data_size.value() } - /// This function transfers the ownership of the buffered batches and partition indices from the - /// ShuffleRepartitioner to a new PartitionedBatches struct. The returned PartitionedBatches struct - /// can be used to produce shuffled batches. - fn partitioned_batches(&mut self) -> PartitionedBatchesProducer { - let num_output_partitions = self.partition_indices.len(); - let buffered_batches = std::mem::take(&mut self.buffered_batches); - // let indices = std::mem::take(&mut self.partition_indices); - let indices = std::mem::replace( - &mut self.partition_indices, - vec![vec![]; num_output_partitions], - ); - PartitionedBatchesProducer::new(buffered_batches, indices, self.batch_size) - } - pub(crate) fn spill(&mut self) -> datafusion::common::Result<()> { + let has_data = self.partition_buffers.iter().any(|b| b.row_count() > 0); + if !has_data { + return Ok(()); + } log::info!( "ShuffleRepartitioner spilling shuffle data of {} to disk while inserting ({} time(s) so far)", self.used(), self.spill_count() ); - - // we could always get a chance to free some memory as long as we are holding some - if self.buffered_batches.is_empty() { - return Ok(()); - } - with_trace("shuffle_spill", self.tracing_enabled, || { - let num_output_partitions = self.partition_writers.len(); - let mut partitioned_batches = self.partitioned_batches(); let mut spilled_bytes = 0; - - for partition_id in 0..num_output_partitions { - let partition_writer = &mut self.partition_writers[partition_id]; - let mut iter = partitioned_batches.produce(partition_id); - spilled_bytes += partition_writer.spill( - &mut iter, - &self.runtime, - &self.metrics, - self.write_buffer_size, - self.batch_size, - )?; + for p in 0..self.partition_buffers.len() { + if self.partition_buffers[p].row_count() > 0 { + let batch = self.partition_buffers[p].flush(None)?; + spilled_bytes += self.partition_writers[p].spill( + &[batch], + &self.runtime, + &self.metrics, + self.write_buffer_size, + self.batch_size, + )?; + } } - self.reservation.free(); self.metrics.spill_count.add(1); self.metrics.spilled_bytes.add(spilled_bytes); @@ -559,11 +628,8 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { fn shuffle_write(&mut self) -> datafusion::common::Result<()> { with_trace("shuffle_write", self.tracing_enabled, || { let start_time = Instant::now(); - - let mut partitioned_batches = self.partitioned_batches(); - let num_output_partitions = self.partition_indices.len(); + let num_output_partitions = self.partition_buffers.len(); let mut offsets = vec![0; num_output_partitions + 1]; - let data_file = self.output_data_file.clone(); let index_file = self.output_index_file.clone(); @@ -573,58 +639,58 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { .truncate(true) .open(data_file) .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; - let mut output_data = BufWriter::new(output_data); #[allow(clippy::needless_range_loop)] for i in 0..num_output_partitions { offsets[i] = output_data.stream_position()?; - // if we wrote a spill file for this partition then copy the - // contents into the shuffle file if let Some(spill_path) = self.partition_writers[i].path() { let mut spill_file = BufReader::new(File::open(spill_path)?); - let mut write_timer = self.metrics.write_time.timer(); + let mut wt = self.metrics.write_time.timer(); std::io::copy(&mut spill_file, &mut output_data)?; - write_timer.stop(); + wt.stop(); } - // Write in memory batches to output data file - let mut partition_iter = partitioned_batches.produce(i); - Self::shuffle_write_partition( - &mut partition_iter, - &mut self.shuffle_block_writer, - &mut output_data, - &self.metrics.encode_time, - &self.metrics.write_time, - self.write_buffer_size, - self.batch_size, - )?; + if self.partition_buffers[i].row_count() > 0 { + let batch = self.partition_buffers[i].flush(None)?; + let mut buf_batch_writer = BufBatchWriter::new( + &mut self.shuffle_block_writer, + &mut output_data, + self.write_buffer_size, + self.batch_size, + ); + buf_batch_writer.write( + &batch, + &self.metrics.encode_time, + &self.metrics.write_time, + )?; + buf_batch_writer.flush( + &self.metrics.encode_time, + &self.metrics.write_time, + )?; + } } - let mut write_timer = self.metrics.write_time.timer(); + let mut wt = self.metrics.write_time.timer(); output_data.flush()?; - write_timer.stop(); - - // add one extra offset at last to ease partition length computation + wt.stop(); offsets[num_output_partitions] = output_data.stream_position()?; - let mut write_timer = self.metrics.write_time.timer(); - let mut output_index = - BufWriter::new(File::create(index_file).map_err(|e| { - DataFusionError::Execution(format!("shuffle write error: {e:?}")) - })?); + let mut wt = self.metrics.write_time.timer(); + let mut output_index = BufWriter::new(File::create(index_file).map_err(|e| { + DataFusionError::Execution(format!("shuffle write error: {e:?}")) + })?); for offset in offsets { output_index.write_all(&(offset as i64).to_le_bytes()[..])?; } output_index.flush()?; - write_timer.stop(); + wt.stop(); self.metrics .baseline .elapsed_compute() .add_duration(start_time.elapsed()); - Ok(()) }) } diff --git a/native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs b/native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs deleted file mode 100644 index 77010938cd..0000000000 --- a/native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs +++ /dev/null @@ -1,110 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow::array::RecordBatch; -use arrow::compute::interleave_record_batch; -use datafusion::common::DataFusionError; - -/// A helper struct to produce shuffled batches. -/// This struct takes ownership of the buffered batches and partition indices from the -/// ShuffleRepartitioner, and provides an iterator over the batches in the specified partitions. -pub(super) struct PartitionedBatchesProducer { - buffered_batches: Vec, - partition_indices: Vec>, - batch_size: usize, -} - -impl PartitionedBatchesProducer { - pub(super) fn new( - buffered_batches: Vec, - indices: Vec>, - batch_size: usize, - ) -> Self { - Self { - partition_indices: indices, - buffered_batches, - batch_size, - } - } - - pub(super) fn produce(&mut self, partition_id: usize) -> PartitionedBatchIterator<'_> { - PartitionedBatchIterator::new( - &self.partition_indices[partition_id], - &self.buffered_batches, - self.batch_size, - ) - } -} - -pub(crate) struct PartitionedBatchIterator<'a> { - record_batches: Vec<&'a RecordBatch>, - batch_size: usize, - indices: Vec<(usize, usize)>, - pos: usize, -} - -impl<'a> PartitionedBatchIterator<'a> { - fn new( - indices: &'a [(u32, u32)], - buffered_batches: &'a [RecordBatch], - batch_size: usize, - ) -> Self { - if indices.is_empty() { - // Avoid unnecessary allocations when the partition is empty - return Self { - record_batches: vec![], - batch_size, - indices: vec![], - pos: 0, - }; - } - let record_batches = buffered_batches.iter().collect::>(); - let current_indices = indices - .iter() - .map(|(i_batch, i_row)| (*i_batch as usize, *i_row as usize)) - .collect::>(); - Self { - record_batches, - batch_size, - indices: current_indices, - pos: 0, - } - } -} - -impl Iterator for PartitionedBatchIterator<'_> { - type Item = datafusion::common::Result; - - fn next(&mut self) -> Option { - if self.pos >= self.indices.len() { - return None; - } - - let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len()); - let indices = &self.indices[self.pos..indices_end]; - match interleave_record_batch(&self.record_batches, indices) { - Ok(batch) => { - self.pos = indices_end; - Some(Ok(batch)) - } - Err(e) => Some(Err(DataFusionError::ArrowError( - Box::from(e), - Some(DataFusionError::get_back_trace()), - ))), - } - } -} diff --git a/native/core/src/execution/shuffle/writers/partition_writer.rs b/native/core/src/execution/shuffle/writers/partition_writer.rs index 7c2dbe0444..40762ff1ce 100644 --- a/native/core/src/execution/shuffle/writers/partition_writer.rs +++ b/native/core/src/execution/shuffle/writers/partition_writer.rs @@ -16,9 +16,9 @@ // under the License. use crate::execution::shuffle::metrics::ShufflePartitionerMetrics; -use crate::execution::shuffle::partitioners::PartitionedBatchIterator; use crate::execution::shuffle::writers::buf_batch_writer::BufBatchWriter; use crate::execution::shuffle::ShuffleBlockWriter; +use arrow::array::RecordBatch; use datafusion::common::DataFusionError; use datafusion::execution::disk_manager::RefCountedTempFile; use datafusion::execution::runtime_env::RuntimeEnv; @@ -75,40 +75,38 @@ impl PartitionWriter { pub(crate) fn spill( &mut self, - iter: &mut PartitionedBatchIterator, + batches: &[RecordBatch], runtime: &RuntimeEnv, metrics: &ShufflePartitionerMetrics, write_buffer_size: usize, batch_size: usize, ) -> datafusion::common::Result { - if let Some(batch) = iter.next() { - self.ensure_spill_file_created(runtime)?; - - let total_bytes_written = { - let mut buf_batch_writer = BufBatchWriter::new( - &mut self.shuffle_block_writer, - &mut self.spill_file.as_mut().unwrap().file, - write_buffer_size, - batch_size, - ); - let mut bytes_written = - buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?; - for batch in iter { - let batch = batch?; - bytes_written += buf_batch_writer.write( - &batch, - &metrics.encode_time, - &metrics.write_time, - )?; - } - buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?; - bytes_written - }; - - Ok(total_bytes_written) - } else { - Ok(0) + if batches.is_empty() { + return Ok(0); } + self.ensure_spill_file_created(runtime)?; + let total_bytes_written = { + let mut buf_batch_writer = BufBatchWriter::new( + &mut self.shuffle_block_writer, + &mut self.spill_file.as_mut().unwrap().file, + write_buffer_size, + batch_size, + ); + let mut bytes_written = 0; + for batch in batches { + bytes_written += buf_batch_writer.write( + batch, + &metrics.encode_time, + &metrics.write_time, + )?; + } + buf_batch_writer.flush( + &metrics.encode_time, + &metrics.write_time, + )?; + bytes_written + }; + Ok(total_bytes_written) } pub(crate) fn path(&self) -> Option<&std::path::Path> { From c2d02f7588b7438369ac2c21b4cbd560a08ecb19 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 19 Mar 2026 14:53:13 -0600 Subject: [PATCH 03/11] fix: address clippy warnings in scatter kernel --- .../shuffle/partitioners/multi_partition.rs | 30 +++++++++---------- .../shuffle/partitioners/partition_buffer.rs | 14 +++++---- 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs b/native/core/src/execution/shuffle/partitioners/multi_partition.rs index 027f48f865..6289a7e794 100644 --- a/native/core/src/execution/shuffle/partitioners/multi_partition.rs +++ b/native/core/src/execution/shuffle/partitioners/multi_partition.rs @@ -195,7 +195,7 @@ impl MultiPartitionShuffleRepartitioner { }); let estimated_rows_per_partition = batch_size / num_output_partitions.max(1); let partition_buffers = (0..num_output_partitions) - .map(|_| PartitionBuffer::new(schema.clone(), estimated_rows_per_partition)) + .map(|_| PartitionBuffer::new(Arc::clone(&schema), estimated_rows_per_partition)) .collect(); let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{partition}]")) @@ -446,53 +446,53 @@ impl MultiPartitionShuffleRepartitioner { }; let data = column.to_data(); let values = data.buffers()[0].as_slice(); - for row in 0..num_rows { - let p = partition_ids[row] as usize; + for (row, &p_id) in partition_ids.iter().enumerate().take(num_rows) { + let p = p_id as usize; let src_offset = row * byte_width; self.partition_buffers[p].columns[col_idx] .append_fixed(&values[src_offset..src_offset + byte_width]); - let is_valid = nulls.map_or(true, |n| n.is_valid(row)); + let is_valid = nulls.is_none_or(|n| n.is_valid(row)); self.partition_buffers[p].columns[col_idx].append_null_bit(is_valid); } } else if is_variable { let data = column.to_data(); let offsets_slice = data.buffers()[0].typed_data::(); let values_slice = data.buffers()[1].as_slice(); - for row in 0..num_rows { - let p = partition_ids[row] as usize; + for (row, &p_id) in partition_ids.iter().enumerate().take(num_rows) { + let p = p_id as usize; let start = offsets_slice[row] as usize; let end = offsets_slice[row + 1] as usize; self.partition_buffers[p].columns[col_idx] .append_variable(&values_slice[start..end]); - let is_valid = nulls.map_or(true, |n| n.is_valid(row)); + let is_valid = nulls.is_none_or(|n| n.is_valid(row)); self.partition_buffers[p].columns[col_idx].append_null_bit(is_valid); } } else if is_large_variable { let data = column.to_data(); let offsets_slice = data.buffers()[0].typed_data::(); let values_slice = data.buffers()[1].as_slice(); - for row in 0..num_rows { - let p = partition_ids[row] as usize; + for (row, &p_id) in partition_ids.iter().enumerate().take(num_rows) { + let p = p_id as usize; let start = offsets_slice[row] as usize; let end = offsets_slice[row + 1] as usize; self.partition_buffers[p].columns[col_idx] .append_large_variable(&values_slice[start..end]); - let is_valid = nulls.map_or(true, |n| n.is_valid(row)); + let is_valid = nulls.is_none_or(|n| n.is_valid(row)); self.partition_buffers[p].columns[col_idx].append_null_bit(is_valid); } } else if is_boolean { let bool_array = column.as_any().downcast_ref::().unwrap(); - for row in 0..num_rows { - let p = partition_ids[row] as usize; + for (row, &p_id) in partition_ids.iter().enumerate().take(num_rows) { + let p = p_id as usize; self.partition_buffers[p].columns[col_idx] .append_bool(bool_array.value(row)); - let is_valid = nulls.map_or(true, |n| n.is_valid(row)); + let is_valid = nulls.is_none_or(|n| n.is_valid(row)); self.partition_buffers[p].columns[col_idx].append_null_bit(is_valid); } } else { // Fallback - for row in 0..num_rows { - let p = partition_ids[row] as usize; + for (row, &p_id) in partition_ids.iter().enumerate().take(num_rows) { + let p = p_id as usize; self.partition_buffers[p].columns[col_idx] .append_fallback_index(row as u32); } diff --git a/native/core/src/execution/shuffle/partitioners/partition_buffer.rs b/native/core/src/execution/shuffle/partitioners/partition_buffer.rs index e082174b55..0b37d6b282 100644 --- a/native/core/src/execution/shuffle/partitioners/partition_buffer.rs +++ b/native/core/src/execution/shuffle/partitioners/partition_buffer.rs @@ -221,12 +221,14 @@ impl PartitionBuffer { self.columns.iter().map(|c| c.memory_size()).sum() } + #[allow(dead_code)] pub(crate) fn has_fallback_columns(&self) -> bool { self.columns .iter() .any(|c| matches!(c, ColumnBuffer::Fallback { .. })) } + #[allow(dead_code)] pub(crate) fn clear(&mut self) { self.row_count = 0; for col in &mut self.columns { @@ -280,7 +282,7 @@ impl PartitionBuffer { Buffer::from(std::mem::replace(values, MutableBuffer::new(0))); let mut builder = ArrayData::builder(data_type).len(row_count).add_buffer(buffer); - if nulls.len() > 0 { + if !nulls.is_empty() { builder = builder .null_bit_buffer(Some(nulls.finish().into_inner())); } @@ -298,7 +300,7 @@ impl PartitionBuffer { vec![0i32], ))); let values_buffer = Buffer::from(std::mem::take(data)); - let null_buffer = if nulls.len() > 0 { + let null_buffer = if !nulls.is_empty() { Some(NullBuffer::new(nulls.finish())) } else { None @@ -328,7 +330,7 @@ impl PartitionBuffer { vec![0i64], ))); let values_buffer = Buffer::from(std::mem::take(data)); - let null_buffer = if nulls.len() > 0 { + let null_buffer = if !nulls.is_empty() { Some(NullBuffer::new(nulls.finish())) } else { None @@ -349,7 +351,7 @@ impl PartitionBuffer { } ColumnBuffer::Boolean { values, nulls } => { let values_buf = values.finish(); - let null_buffer = if nulls.len() > 0 { + let null_buffer = if !nulls.is_empty() { Some(NullBuffer::new(nulls.finish())) } else { None @@ -367,7 +369,7 @@ impl PartitionBuffer { } self.row_count = 0; - Ok(RecordBatch::try_new(self.schema.clone(), arrays)?) + Ok(RecordBatch::try_new(Arc::clone(&self.schema), arrays)?) } } @@ -384,7 +386,7 @@ mod tests { Field::new("s", DataType::Utf8, true), Field::new("b", DataType::Boolean, true), ])); - let mut buf = PartitionBuffer::new(schema.clone(), 100); + let mut buf = PartitionBuffer::new(Arc::clone(&schema), 100); // Append 3 rows manually // Row 0: i=1, s="hello", b=true, all valid From d7a11ba625a1a67d3b3108e37c2d0ddba0001db7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 19 Mar 2026 15:08:25 -0600 Subject: [PATCH 04/11] refactor: iterate by partition in scatter kernel for better cache locality Restructure scatter_batch to iterate rows grouped by partition using partition_row_indices and partition_starts from ScratchSpace, instead of iterating in original row order. This keeps writes to the same partition buffer sequential, improving cache locality. Also add convenience methods to PartitionBuffer for appending data with null bits, and remove dead gather_time metric. --- .../shuffle/partitioners/multi_partition.rs | 114 ++++++++++++------ .../shuffle/partitioners/partition_buffer.rs | 24 ++++ .../shuffle/CometNativeShuffleWriter.scala | 2 +- 3 files changed, 100 insertions(+), 40 deletions(-) diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs b/native/core/src/execution/shuffle/partitioners/multi_partition.rs index 6289a7e794..25d692dad9 100644 --- a/native/core/src/execution/shuffle/partitioners/multi_partition.rs +++ b/native/core/src/execution/shuffle/partitioners/multi_partition.rs @@ -285,7 +285,7 @@ impl MultiPartitionShuffleRepartitioner { self.scatter_batch( &input, - &scratch.partition_ids[..num_rows], + &scratch.partition_row_indices[..num_rows], &scratch.partition_starts, )?; self.scratch = scratch; @@ -333,7 +333,7 @@ impl MultiPartitionShuffleRepartitioner { self.scatter_batch( &input, - &scratch.partition_ids[..num_rows], + &scratch.partition_row_indices[..num_rows], &scratch.partition_starts, )?; self.scratch = scratch; @@ -388,7 +388,7 @@ impl MultiPartitionShuffleRepartitioner { self.scatter_batch( &input, - &scratch.partition_ids[..num_rows], + &scratch.partition_row_indices[..num_rows], &scratch.partition_starts, )?; self.scratch = scratch; @@ -407,16 +407,17 @@ impl MultiPartitionShuffleRepartitioner { fn scatter_batch( &mut self, input: &RecordBatch, - partition_ids: &[u32], + partition_row_indices: &[u32], partition_starts: &[u32], ) -> datafusion::common::Result<()> { - let num_rows = input.num_rows(); let num_partitions = self.partition_buffers.len(); // Track memory before scatter let mem_before: usize = self.partition_buffers.iter().map(|b| b.memory_size()).sum(); - // Column-oriented scatter: process one column at a time across all rows + // Column-oriented scatter: for each column, iterate by partition then by + // rows within that partition. This keeps writes to the same partition buffer + // sequential for better cache locality. for (col_idx, column) in input.columns().iter().enumerate() { // Determine scatter path from first partition's column type // (all partitions have the same column types) @@ -446,55 +447,90 @@ impl MultiPartitionShuffleRepartitioner { }; let data = column.to_data(); let values = data.buffers()[0].as_slice(); - for (row, &p_id) in partition_ids.iter().enumerate().take(num_rows) { - let p = p_id as usize; - let src_offset = row * byte_width; - self.partition_buffers[p].columns[col_idx] - .append_fixed(&values[src_offset..src_offset + byte_width]); - let is_valid = nulls.is_none_or(|n| n.is_valid(row)); - self.partition_buffers[p].columns[col_idx].append_null_bit(is_valid); + for p in 0..num_partitions { + let start = partition_starts[p] as usize; + let end = partition_starts[p + 1] as usize; + if start == end { + continue; + } + let row_indices = &partition_row_indices[start..end]; + for &row_idx in row_indices { + let row = row_idx as usize; + let src_offset = row * byte_width; + let is_valid = nulls.is_none_or(|n| n.is_valid(row)); + self.partition_buffers[p] + .append_fixed(col_idx, &values[src_offset..src_offset + byte_width], is_valid); + } } } else if is_variable { let data = column.to_data(); let offsets_slice = data.buffers()[0].typed_data::(); let values_slice = data.buffers()[1].as_slice(); - for (row, &p_id) in partition_ids.iter().enumerate().take(num_rows) { - let p = p_id as usize; - let start = offsets_slice[row] as usize; - let end = offsets_slice[row + 1] as usize; - self.partition_buffers[p].columns[col_idx] - .append_variable(&values_slice[start..end]); - let is_valid = nulls.is_none_or(|n| n.is_valid(row)); - self.partition_buffers[p].columns[col_idx].append_null_bit(is_valid); + for p in 0..num_partitions { + let start = partition_starts[p] as usize; + let end = partition_starts[p + 1] as usize; + if start == end { + continue; + } + let row_indices = &partition_row_indices[start..end]; + for &row_idx in row_indices { + let row = row_idx as usize; + let val_start = offsets_slice[row] as usize; + let val_end = offsets_slice[row + 1] as usize; + let is_valid = nulls.is_none_or(|n| n.is_valid(row)); + self.partition_buffers[p] + .append_variable(col_idx, &values_slice[val_start..val_end], is_valid); + } } } else if is_large_variable { let data = column.to_data(); let offsets_slice = data.buffers()[0].typed_data::(); let values_slice = data.buffers()[1].as_slice(); - for (row, &p_id) in partition_ids.iter().enumerate().take(num_rows) { - let p = p_id as usize; - let start = offsets_slice[row] as usize; - let end = offsets_slice[row + 1] as usize; - self.partition_buffers[p].columns[col_idx] - .append_large_variable(&values_slice[start..end]); - let is_valid = nulls.is_none_or(|n| n.is_valid(row)); - self.partition_buffers[p].columns[col_idx].append_null_bit(is_valid); + for p in 0..num_partitions { + let start = partition_starts[p] as usize; + let end = partition_starts[p + 1] as usize; + if start == end { + continue; + } + let row_indices = &partition_row_indices[start..end]; + for &row_idx in row_indices { + let row = row_idx as usize; + let val_start = offsets_slice[row] as usize; + let val_end = offsets_slice[row + 1] as usize; + let is_valid = nulls.is_none_or(|n| n.is_valid(row)); + self.partition_buffers[p] + .append_large_variable(col_idx, &values_slice[val_start..val_end], is_valid); + } } } else if is_boolean { let bool_array = column.as_any().downcast_ref::().unwrap(); - for (row, &p_id) in partition_ids.iter().enumerate().take(num_rows) { - let p = p_id as usize; - self.partition_buffers[p].columns[col_idx] - .append_bool(bool_array.value(row)); - let is_valid = nulls.is_none_or(|n| n.is_valid(row)); - self.partition_buffers[p].columns[col_idx].append_null_bit(is_valid); + for p in 0..num_partitions { + let start = partition_starts[p] as usize; + let end = partition_starts[p + 1] as usize; + if start == end { + continue; + } + let row_indices = &partition_row_indices[start..end]; + for &row_idx in row_indices { + let row = row_idx as usize; + let is_valid = nulls.is_none_or(|n| n.is_valid(row)); + self.partition_buffers[p] + .append_bool(col_idx, bool_array.value(row), is_valid); + } } } else { // Fallback - for (row, &p_id) in partition_ids.iter().enumerate().take(num_rows) { - let p = p_id as usize; - self.partition_buffers[p].columns[col_idx] - .append_fallback_index(row as u32); + for p in 0..num_partitions { + let start = partition_starts[p] as usize; + let end = partition_starts[p + 1] as usize; + if start == end { + continue; + } + let row_indices = &partition_row_indices[start..end]; + for &row_idx in row_indices { + self.partition_buffers[p] + .append_fallback_index(col_idx, row_idx); + } } } } diff --git a/native/core/src/execution/shuffle/partitioners/partition_buffer.rs b/native/core/src/execution/shuffle/partitioners/partition_buffer.rs index 0b37d6b282..079de2cf89 100644 --- a/native/core/src/execution/shuffle/partitioners/partition_buffer.rs +++ b/native/core/src/execution/shuffle/partitioners/partition_buffer.rs @@ -213,6 +213,30 @@ impl PartitionBuffer { } } + pub(crate) fn append_fixed(&mut self, col_idx: usize, bytes: &[u8], is_valid: bool) { + self.columns[col_idx].append_fixed(bytes); + self.columns[col_idx].append_null_bit(is_valid); + } + + pub(crate) fn append_variable(&mut self, col_idx: usize, bytes: &[u8], is_valid: bool) { + self.columns[col_idx].append_variable(bytes); + self.columns[col_idx].append_null_bit(is_valid); + } + + pub(crate) fn append_large_variable(&mut self, col_idx: usize, bytes: &[u8], is_valid: bool) { + self.columns[col_idx].append_large_variable(bytes); + self.columns[col_idx].append_null_bit(is_valid); + } + + pub(crate) fn append_bool(&mut self, col_idx: usize, value: bool, is_valid: bool) { + self.columns[col_idx].append_bool(value); + self.columns[col_idx].append_null_bit(is_valid); + } + + pub(crate) fn append_fallback_index(&mut self, col_idx: usize, idx: u32) { + self.columns[col_idx].append_fallback_index(idx); + } + pub(crate) fn row_count(&self) -> usize { self.row_count } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala index 3fc222bd19..8927247143 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala @@ -77,8 +77,8 @@ class CometNativeShuffleWriter[K, V]( val detailedMetrics = Seq( "elapsed_compute", - "encode_time", "repart_time", + "encode_time", "input_batches", "spill_count", "spilled_bytes") From 233d436191f15fa66d237fa21eeccef7a712afd3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 19 Mar 2026 14:58:13 -0600 Subject: [PATCH 05/11] style: format code --- .../shuffle/partitioners/multi_partition.rs | 7 +- .../shuffle/partitioners/partition_buffer.rs | 69 ++++++++----------- 2 files changed, 31 insertions(+), 45 deletions(-) diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs b/native/core/src/execution/shuffle/partitioners/multi_partition.rs index 25d692dad9..0efcadc603 100644 --- a/native/core/src/execution/shuffle/partitioners/multi_partition.rs +++ b/native/core/src/execution/shuffle/partitioners/multi_partition.rs @@ -714,9 +714,10 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { offsets[num_output_partitions] = output_data.stream_position()?; let mut wt = self.metrics.write_time.timer(); - let mut output_index = BufWriter::new(File::create(index_file).map_err(|e| { - DataFusionError::Execution(format!("shuffle write error: {e:?}")) - })?); + let mut output_index = + BufWriter::new(File::create(index_file).map_err(|e| { + DataFusionError::Execution(format!("shuffle write error: {e:?}")) + })?); for offset in offsets { output_index.write_all(&(offset as i64).to_le_bytes()[..])?; } diff --git a/native/core/src/execution/shuffle/partitioners/partition_buffer.rs b/native/core/src/execution/shuffle/partitioners/partition_buffer.rs index 079de2cf89..ea9eaaa200 100644 --- a/native/core/src/execution/shuffle/partitioners/partition_buffer.rs +++ b/native/core/src/execution/shuffle/partitioners/partition_buffer.rs @@ -122,22 +122,16 @@ impl ColumnBuffer { data, nulls, } => { - offsets.capacity() * std::mem::size_of::() - + data.capacity() - + nulls.capacity() + offsets.capacity() * std::mem::size_of::() + data.capacity() + nulls.capacity() } ColumnBuffer::LargeVariable { offsets, data, nulls, } => { - offsets.capacity() * std::mem::size_of::() - + data.capacity() - + nulls.capacity() - } - ColumnBuffer::Fallback { indices } => { - indices.capacity() * std::mem::size_of::() + offsets.capacity() * std::mem::size_of::() + data.capacity() + nulls.capacity() } + ColumnBuffer::Fallback { indices } => indices.capacity() * std::mem::size_of::(), } } } @@ -297,18 +291,13 @@ impl PartitionBuffer { for (col_idx, col) in self.columns.iter_mut().enumerate() { let data_type = self.schema.field(col_idx).data_type().clone(); let array: ArrayRef = match col { - ColumnBuffer::Fixed { - values, - nulls, - .. - } => { - let buffer = - Buffer::from(std::mem::replace(values, MutableBuffer::new(0))); - let mut builder = - ArrayData::builder(data_type).len(row_count).add_buffer(buffer); + ColumnBuffer::Fixed { values, nulls, .. } => { + let buffer = Buffer::from(std::mem::replace(values, MutableBuffer::new(0))); + let mut builder = ArrayData::builder(data_type) + .len(row_count) + .add_buffer(buffer); if !nulls.is_empty() { - builder = builder - .null_bit_buffer(Some(nulls.finish().into_inner())); + builder = builder.null_bit_buffer(Some(nulls.finish().into_inner())); } let data = builder.build()?; make_array(data) @@ -318,11 +307,10 @@ impl PartitionBuffer { data, nulls, } => { - let offsets_buffer = - OffsetBuffer::new(ScalarBuffer::from(std::mem::replace( - offsets, - vec![0i32], - ))); + let offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(std::mem::replace( + offsets, + vec![0i32], + ))); let values_buffer = Buffer::from(std::mem::take(data)); let null_buffer = if !nulls.is_empty() { Some(NullBuffer::new(nulls.finish())) @@ -330,16 +318,14 @@ impl PartitionBuffer { None }; match &data_type { - DataType::Utf8 => Arc::new(StringArray::new( - offsets_buffer, - values_buffer, - null_buffer, - )) as ArrayRef, - DataType::Binary => Arc::new(BinaryArray::new( - offsets_buffer, - values_buffer, - null_buffer, - )) as ArrayRef, + DataType::Utf8 => { + Arc::new(StringArray::new(offsets_buffer, values_buffer, null_buffer)) + as ArrayRef + } + DataType::Binary => { + Arc::new(BinaryArray::new(offsets_buffer, values_buffer, null_buffer)) + as ArrayRef + } _ => unreachable!("Variable buffer with unexpected data type"), } } @@ -348,11 +334,10 @@ impl PartitionBuffer { data, nulls, } => { - let offsets_buffer = - OffsetBuffer::new(ScalarBuffer::from(std::mem::replace( - offsets, - vec![0i64], - ))); + let offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(std::mem::replace( + offsets, + vec![0i64], + ))); let values_buffer = Buffer::from(std::mem::take(data)); let null_buffer = if !nulls.is_empty() { Some(NullBuffer::new(nulls.finish())) @@ -383,8 +368,8 @@ impl PartitionBuffer { Arc::new(BooleanArray::new(values_buf, null_buffer)) as ArrayRef } ColumnBuffer::Fallback { indices } => { - let fallback = fallback_batch - .expect("fallback_batch required for Fallback columns"); + let fallback = + fallback_batch.expect("fallback_batch required for Fallback columns"); let idx_array = UInt32Array::from(std::mem::take(indices)); take(fallback.column(col_idx), &idx_array, None)? } From 1afa8ea54ddef36af991348c023cf9b31b9f1881 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 19 Mar 2026 15:16:50 -0600 Subject: [PATCH 06/11] feat: add scatter_time metric to shuffle write profiling --- native/core/src/execution/shuffle/metrics.rs | 4 ++++ .../src/execution/shuffle/partitioners/multi_partition.rs | 4 ++++ .../scala/org/apache/spark/sql/comet/CometMetricNode.scala | 3 +++ .../comet/execution/shuffle/CometNativeShuffleWriter.scala | 1 + 4 files changed, 12 insertions(+) diff --git a/native/core/src/execution/shuffle/metrics.rs b/native/core/src/execution/shuffle/metrics.rs index 33b51c3cd8..8b9d721ad7 100644 --- a/native/core/src/execution/shuffle/metrics.rs +++ b/native/core/src/execution/shuffle/metrics.rs @@ -26,6 +26,9 @@ pub(super) struct ShufflePartitionerMetrics { /// Time to perform repartitioning pub(super) repart_time: Time, + /// Time scattering values to per-partition buffers + pub(super) scatter_time: Time, + /// Time encoding batches to IPC format pub(super) encode_time: Time, @@ -50,6 +53,7 @@ impl ShufflePartitionerMetrics { Self { baseline: BaselineMetrics::new(metrics, partition), repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition), + scatter_time: MetricBuilder::new(metrics).subset_time("scatter_time", partition), encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition), write_time: MetricBuilder::new(metrics).subset_time("write_time", partition), input_batches: MetricBuilder::new(metrics).counter("input_batches", partition), diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs b/native/core/src/execution/shuffle/partitioners/multi_partition.rs index 0efcadc603..f160896031 100644 --- a/native/core/src/execution/shuffle/partitioners/multi_partition.rs +++ b/native/core/src/execution/shuffle/partitioners/multi_partition.rs @@ -415,6 +415,8 @@ impl MultiPartitionShuffleRepartitioner { // Track memory before scatter let mem_before: usize = self.partition_buffers.iter().map(|b| b.memory_size()).sum(); + let scatter_start = Instant::now(); + // Column-oriented scatter: for each column, iterate by partition then by // rows within that partition. This keeps writes to the same partition buffer // sequential for better cache locality. @@ -535,6 +537,8 @@ impl MultiPartitionShuffleRepartitioner { } } + self.metrics.scatter_time.add_duration(scatter_start.elapsed()); + // Update row counts from partition_starts (O(num_partitions), not O(num_rows)) for p in 0..num_partitions { let count = (partition_starts[p + 1] - partition_starts[p]) as usize; diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index 8c75df1d45..e0f4eee477 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -248,6 +248,9 @@ object CometMetricNode { Map( "elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle writer time"), "repart_time" -> SQLMetrics.createNanoTimingMetric(sc, "repartition time"), + "scatter_time" -> SQLMetrics.createNanoTimingMetric( + sc, + "scatter to partition buffers time"), "encode_time" -> SQLMetrics.createNanoTimingMetric(sc, "encoding and compression time"), "decode_time" -> SQLMetrics.createNanoTimingMetric(sc, "decoding and decompression time"), "spill_count" -> SQLMetrics.createMetric(sc, "number of spills"), diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala index 8927247143..b9d48200d3 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala @@ -78,6 +78,7 @@ class CometNativeShuffleWriter[K, V]( val detailedMetrics = Seq( "elapsed_compute", "repart_time", + "scatter_time", "encode_time", "input_batches", "spill_count", From 3873b95ac18fbd2b5bef5de106d65eb664222e5f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 21 Mar 2026 07:43:26 -0600 Subject: [PATCH 07/11] feat: add standalone shuffle benchmark binary for profiling Add a `shuffle_bench` binary that benchmarks shuffle write and read performance independently from Spark, making it easy to profile with tools like `cargo flamegraph`, `perf`, or `instruments`. Supports reading Parquet files (e.g. TPC-H/TPC-DS) or generating synthetic data with configurable schema. Covers different scenarios including compression codecs, partition counts, partitioning schemes, and memory-constrained spilling. --- native/Cargo.lock | 88 +++- native/core/Cargo.toml | 5 + native/core/src/bin/shuffle_bench.rs | 725 +++++++++++++++++++++++++++ 3 files changed, 816 insertions(+), 2 deletions(-) create mode 100644 native/core/src/bin/shuffle_bench.rs diff --git a/native/Cargo.lock b/native/Cargo.lock index 5f99c614b3..f43b41dd9a 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -96,12 +96,56 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" +[[package]] +name = "anstream" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + [[package]] name = "anstyle" version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys 0.60.2", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.60.2", +] + [[package]] name = "anyhow" version = "1.0.102" @@ -1331,6 +1375,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2797f34da339ce31042b27d23607e051786132987f595b02ba4f6a6dffb7030a" dependencies = [ "clap_builder", + "clap_derive", ] [[package]] @@ -1339,8 +1384,22 @@ version = "4.5.60" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24a241312cea5059b13574bb9b3861cabf758b879c15190b37b6d6fd63ab6876" dependencies = [ + "anstream", "anstyle", "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.117", ] [[package]] @@ -1358,6 +1417,12 @@ dependencies = [ "cc", ] +[[package]] +name = "colorchoice" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" + [[package]] name = "combine" version = "4.6.7" @@ -1834,6 +1899,7 @@ dependencies = [ "aws-config", "aws-credential-types", "bytes", + "clap", "crc32fast", "criterion", "datafusion", @@ -1885,7 +1951,7 @@ dependencies = [ [[package]] name = "datafusion-comet-common" -version = "0.14.0" +version = "0.15.0" dependencies = [ "arrow", "datafusion", @@ -1911,7 +1977,7 @@ dependencies = [ [[package]] name = "datafusion-comet-jni-bridge" -version = "0.14.0" +version = "0.15.0" dependencies = [ "arrow", "assertables", @@ -3609,6 +3675,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + [[package]] name = "itertools" version = "0.13.0" @@ -4289,6 +4361,12 @@ version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + [[package]] name = "oorandom" version = "11.1.5" @@ -6339,6 +6417,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "uuid" version = "1.22.0" diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 3f305a631d..3df9e55719 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -72,6 +72,7 @@ url = { workspace = true } aws-config = { workspace = true } aws-credential-types = { workspace = true } parking_lot = "0.12.5" +clap = { version = "4", features = ["derive"] } datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, default-features = false, features = ["hdfs"] } reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2"] } object_store_opendal = {version = "0.55.0", optional = true} @@ -113,6 +114,10 @@ name = "comet" # "rlib" is for benchmarking with criterion. crate-type = ["cdylib", "rlib"] +[[bin]] +name = "shuffle_bench" +path = "src/bin/shuffle_bench.rs" + [[bench]] name = "parquet_read" harness = false diff --git a/native/core/src/bin/shuffle_bench.rs b/native/core/src/bin/shuffle_bench.rs new file mode 100644 index 0000000000..c1498161f7 --- /dev/null +++ b/native/core/src/bin/shuffle_bench.rs @@ -0,0 +1,725 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Standalone shuffle benchmark tool for profiling Comet shuffle write and read +//! outside of Spark. +//! +//! # Usage +//! +//! Read from Parquet files (e.g. TPC-H lineitem): +//! ```sh +//! cargo run --release --bin shuffle_bench -- \ +//! --input /data/tpch-sf100/lineitem/ \ +//! --partitions 200 \ +//! --codec zstd --zstd-level 1 \ +//! --hash-columns 0,3 \ +//! --read-back +//! ``` +//! +//! Generate synthetic data: +//! ```sh +//! cargo run --release --bin shuffle_bench -- \ +//! --generate --gen-rows 10000000 --gen-string-cols 4 --gen-int-cols 4 \ +//! --gen-decimal-cols 2 --gen-avg-string-len 32 \ +//! --partitions 200 --codec lz4 --read-back +//! ``` +//! +//! Profile with flamegraph: +//! ```sh +//! cargo flamegraph --release --bin shuffle_bench -- \ +//! --input /data/tpch-sf100/lineitem/ \ +//! --partitions 200 --codec zstd --zstd-level 1 +//! ``` + +use arrow::array::builder::{Date32Builder, Decimal128Builder, Int64Builder, StringBuilder}; +use arrow::array::RecordBatch; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use clap::Parser; +use comet::execution::shuffle::{ + read_ipc_compressed, CometPartitioning, CompressionCodec, ShuffleWriterExec, +}; +use datafusion::datasource::memory::MemorySourceConfig; +use datafusion::datasource::source::DataSourceExec; +use datafusion::execution::config::SessionConfig; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; +use datafusion::physical_expr::expressions::Column; +use datafusion::physical_plan::common::collect; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::SessionContext; +use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; +use rand::RngExt; +use std::fs; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Instant; + +#[derive(Parser, Debug)] +#[command( + name = "shuffle_bench", + about = "Standalone benchmark for Comet shuffle write and read performance" +)] +struct Args { + /// Path to input Parquet file or directory of Parquet files + #[arg(long)] + input: Option, + + /// Generate synthetic data instead of reading from Parquet + #[arg(long, default_value_t = false)] + generate: bool, + + /// Number of rows to generate (requires --generate) + #[arg(long, default_value_t = 1_000_000)] + gen_rows: usize, + + /// Number of Int64 columns to generate + #[arg(long, default_value_t = 4)] + gen_int_cols: usize, + + /// Number of Utf8 string columns to generate + #[arg(long, default_value_t = 2)] + gen_string_cols: usize, + + /// Number of Decimal128 columns to generate + #[arg(long, default_value_t = 2)] + gen_decimal_cols: usize, + + /// Number of Date32 columns to generate + #[arg(long, default_value_t = 1)] + gen_date_cols: usize, + + /// Average string length for generated string columns + #[arg(long, default_value_t = 24)] + gen_avg_string_len: usize, + + /// Batch size for reading Parquet or generating data + #[arg(long, default_value_t = 8192)] + batch_size: usize, + + /// Number of output shuffle partitions + #[arg(long, default_value_t = 200)] + partitions: usize, + + /// Partitioning scheme: hash, single, round-robin + #[arg(long, default_value = "hash")] + partitioning: String, + + /// Column indices to hash on (comma-separated, e.g. "0,3") + #[arg(long, default_value = "0")] + hash_columns: String, + + /// Compression codec: none, lz4, zstd, snappy + #[arg(long, default_value = "zstd")] + codec: String, + + /// Zstd compression level (1-22) + #[arg(long, default_value_t = 1)] + zstd_level: i32, + + /// Memory limit in bytes (triggers spilling when exceeded) + #[arg(long)] + memory_limit: Option, + + /// Also benchmark reading back the shuffle output + #[arg(long, default_value_t = false)] + read_back: bool, + + /// Number of iterations to run + #[arg(long, default_value_t = 1)] + iterations: usize, + + /// Number of warmup iterations before timing + #[arg(long, default_value_t = 0)] + warmup: usize, + + /// Output directory for shuffle data/index files + #[arg(long, default_value = "/tmp/comet_shuffle_bench")] + output_dir: PathBuf, + + /// Write buffer size in bytes + #[arg(long, default_value_t = 1048576)] + write_buffer_size: usize, +} + +fn main() { + let args = Args::parse(); + + // Validate args + if args.input.is_none() && !args.generate { + eprintln!("Error: must specify either --input or --generate"); + std::process::exit(1); + } + + // Create output directory + fs::create_dir_all(&args.output_dir).expect("Failed to create output directory"); + + let data_file = args.output_dir.join("data.out"); + let index_file = args.output_dir.join("index.out"); + + // Load data + let load_start = Instant::now(); + let batches = if let Some(ref input_path) = args.input { + load_parquet(input_path, args.batch_size) + } else { + generate_data(&args) + }; + let load_elapsed = load_start.elapsed(); + + let schema = batches[0].schema(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + let total_bytes: usize = batches.iter().map(|b| b.get_array_memory_size()).sum(); + + println!("=== Shuffle Benchmark ==="); + println!( + "Data source: {}", + if args.input.is_some() { + "parquet" + } else { + "generated" + } + ); + println!( + "Schema: {} columns ({} fields)", + schema.fields().len(), + describe_schema(&schema) + ); + println!("Total rows: {}", format_number(total_rows)); + println!("Total size: {}", format_bytes(total_bytes)); + println!("Batches: {}", batches.len()); + println!( + "Rows/batch: ~{}", + if batches.is_empty() { + 0 + } else { + total_rows / batches.len() + } + ); + println!("Load time: {:.3}s", load_elapsed.as_secs_f64()); + println!(); + + let codec = parse_codec(&args.codec, args.zstd_level); + let hash_col_indices = parse_hash_columns(&args.hash_columns); + + println!("Partitioning: {}", args.partitioning); + println!("Partitions: {}", args.partitions); + println!("Codec: {:?}", codec); + println!("Hash columns: {:?}", hash_col_indices); + if let Some(mem_limit) = args.memory_limit { + println!("Memory limit: {}", format_bytes(mem_limit)); + } + println!( + "Iterations: {} (warmup: {})", + args.iterations, args.warmup + ); + println!(); + + // Run warmup + timed iterations + let total_iters = args.warmup + args.iterations; + let mut write_times = Vec::with_capacity(args.iterations); + let mut read_times = Vec::with_capacity(args.iterations); + let mut data_file_sizes = Vec::with_capacity(args.iterations); + + for i in 0..total_iters { + let is_warmup = i < args.warmup; + let label = if is_warmup { + format!("warmup {}/{}", i + 1, args.warmup) + } else { + format!("iter {}/{}", i - args.warmup + 1, args.iterations) + }; + + // Write phase + let write_elapsed = run_shuffle_write( + &batches, + &schema, + &codec, + &hash_col_indices, + &args, + data_file.to_str().unwrap(), + index_file.to_str().unwrap(), + ); + let data_size = fs::metadata(&data_file).map(|m| m.len()).unwrap_or(0); + + if !is_warmup { + write_times.push(write_elapsed); + data_file_sizes.push(data_size); + } + + print!(" [{label}] write: {:.3}s", write_elapsed); + print!(" output: {}", format_bytes(data_size as usize)); + + // Read phase + if args.read_back { + let read_elapsed = run_shuffle_read( + data_file.to_str().unwrap(), + index_file.to_str().unwrap(), + args.partitions, + ); + if !is_warmup { + read_times.push(read_elapsed); + } + print!(" read: {:.3}s", read_elapsed); + } + println!(); + } + + // Print summary + if args.iterations > 0 { + println!(); + println!("=== Results ==="); + + let avg_write = write_times.iter().sum::() / write_times.len() as f64; + let avg_data_size = data_file_sizes.iter().sum::() / data_file_sizes.len() as u64; + let write_throughput_rows = total_rows as f64 / avg_write; + let write_throughput_bytes = total_bytes as f64 / avg_write; + let compression_ratio = if avg_data_size > 0 { + total_bytes as f64 / avg_data_size as f64 + } else { + 0.0 + }; + + println!("Write:"); + println!(" avg time: {:.3}s", avg_write); + if write_times.len() > 1 { + let min = write_times.iter().cloned().fold(f64::INFINITY, f64::min); + let max = write_times + .iter() + .cloned() + .fold(f64::NEG_INFINITY, f64::max); + println!(" min/max: {:.3}s / {:.3}s", min, max); + } + println!( + " throughput: {}/s ({} rows/s)", + format_bytes(write_throughput_bytes as usize), + format_number(write_throughput_rows as usize) + ); + println!( + " output size: {}", + format_bytes(avg_data_size as usize) + ); + println!(" compression: {:.2}x", compression_ratio); + + if !read_times.is_empty() { + let avg_read = read_times.iter().sum::() / read_times.len() as f64; + let read_throughput_bytes = avg_data_size as f64 / avg_read; + + println!("Read:"); + println!(" avg time: {:.3}s", avg_read); + if read_times.len() > 1 { + let min = read_times.iter().cloned().fold(f64::INFINITY, f64::min); + let max = read_times.iter().cloned().fold(f64::NEG_INFINITY, f64::max); + println!(" min/max: {:.3}s / {:.3}s", min, max); + } + println!( + " throughput: {}/s (from compressed)", + format_bytes(read_throughput_bytes as usize) + ); + } + } + + // Cleanup + let _ = fs::remove_file(&data_file); + let _ = fs::remove_file(&index_file); +} + +fn load_parquet(path: &PathBuf, batch_size: usize) -> Vec { + let mut batches = Vec::new(); + + let paths = if path.is_dir() { + let mut files: Vec = fs::read_dir(path) + .expect("Failed to read input directory") + .filter_map(|entry| { + let entry = entry.ok()?; + let p = entry.path(); + if p.extension().and_then(|e| e.to_str()) == Some("parquet") { + Some(p) + } else { + None + } + }) + .collect(); + files.sort(); + if files.is_empty() { + panic!("No .parquet files found in {}", path.display()); + } + files + } else { + vec![path.clone()] + }; + + for file_path in &paths { + let file = fs::File::open(file_path) + .unwrap_or_else(|e| panic!("Failed to open {}: {}", file_path.display(), e)); + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap_or_else(|e| { + panic!( + "Failed to read Parquet metadata from {}: {}", + file_path.display(), + e + ) + }); + let reader = builder + .with_batch_size(batch_size) + .build() + .unwrap_or_else(|e| { + panic!( + "Failed to build Parquet reader for {}: {}", + file_path.display(), + e + ) + }); + for batch_result in reader { + let batch = batch_result.unwrap_or_else(|e| { + panic!("Failed to read batch from {}: {}", file_path.display(), e) + }); + if batch.num_rows() > 0 { + batches.push(batch); + } + } + } + + if batches.is_empty() { + panic!("No data read from input"); + } + + println!( + "Loaded {} batches from {} file(s)", + batches.len(), + paths.len() + ); + batches +} + +fn generate_data(args: &Args) -> Vec { + let mut fields = Vec::new(); + let mut col_idx = 0; + + // Int64 columns + for _ in 0..args.gen_int_cols { + fields.push(Field::new( + format!("int_col_{col_idx}"), + DataType::Int64, + true, + )); + col_idx += 1; + } + // String columns + for _ in 0..args.gen_string_cols { + fields.push(Field::new( + format!("str_col_{col_idx}"), + DataType::Utf8, + true, + )); + col_idx += 1; + } + // Decimal columns + for _ in 0..args.gen_decimal_cols { + fields.push(Field::new( + format!("dec_col_{col_idx}"), + DataType::Decimal128(18, 2), + true, + )); + col_idx += 1; + } + // Date columns + for _ in 0..args.gen_date_cols { + fields.push(Field::new( + format!("date_col_{col_idx}"), + DataType::Date32, + true, + )); + col_idx += 1; + } + + let schema = Arc::new(Schema::new(fields)); + let mut batches = Vec::new(); + let mut rng = rand::rng(); + let mut remaining = args.gen_rows; + + while remaining > 0 { + let batch_rows = remaining.min(args.batch_size); + remaining -= batch_rows; + + let mut columns: Vec> = Vec::new(); + + // Int64 columns + for _ in 0..args.gen_int_cols { + let mut builder = Int64Builder::with_capacity(batch_rows); + for _ in 0..batch_rows { + if rng.random_range(0..100) < 5 { + builder.append_null(); + } else { + builder.append_value(rng.random_range(0..1_000_000i64)); + } + } + columns.push(Arc::new(builder.finish())); + } + // String columns + for _ in 0..args.gen_string_cols { + let mut builder = + StringBuilder::with_capacity(batch_rows, batch_rows * args.gen_avg_string_len); + for _ in 0..batch_rows { + if rng.random_range(0..100) < 5 { + builder.append_null(); + } else { + let len = rng.random_range(1..args.gen_avg_string_len * 2); + let s: String = (0..len) + .map(|_| rng.random_range(b'a'..=b'z') as char) + .collect(); + builder.append_value(&s); + } + } + columns.push(Arc::new(builder.finish())); + } + // Decimal columns + for _ in 0..args.gen_decimal_cols { + let mut builder = Decimal128Builder::with_capacity(batch_rows) + .with_precision_and_scale(18, 2) + .unwrap(); + for _ in 0..batch_rows { + if rng.random_range(0..100) < 5 { + builder.append_null(); + } else { + builder.append_value(rng.random_range(0..100_000_000i128)); + } + } + columns.push(Arc::new(builder.finish())); + } + // Date columns + for _ in 0..args.gen_date_cols { + let mut builder = Date32Builder::with_capacity(batch_rows); + for _ in 0..batch_rows { + if rng.random_range(0..100) < 5 { + builder.append_null(); + } else { + builder.append_value(rng.random_range(0..20000i32)); + } + } + columns.push(Arc::new(builder.finish())); + } + + let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap(); + batches.push(batch); + } + + println!( + "Generated {} batches ({} rows)", + batches.len(), + args.gen_rows + ); + batches +} + +fn run_shuffle_write( + batches: &[RecordBatch], + schema: &SchemaRef, + codec: &CompressionCodec, + hash_col_indices: &[usize], + args: &Args, + data_file: &str, + index_file: &str, +) -> f64 { + let partitioning = build_partitioning( + &args.partitioning, + args.partitions, + hash_col_indices, + schema, + ); + + let partitions = &[batches.to_vec()]; + let exec = ShuffleWriterExec::try_new( + Arc::new(DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(partitions, Arc::clone(schema), None).unwrap(), + ))), + partitioning, + codec.clone(), + data_file.to_string(), + index_file.to_string(), + false, + args.write_buffer_size, + ) + .expect("Failed to create ShuffleWriterExec"); + + let config = SessionConfig::new().with_batch_size(args.batch_size); + let mut runtime_builder = RuntimeEnvBuilder::new(); + if let Some(mem_limit) = args.memory_limit { + runtime_builder = runtime_builder.with_memory_limit(mem_limit, 1.0); + } + let runtime_env = Arc::new(runtime_builder.build().unwrap()); + let ctx = SessionContext::new_with_config_rt(config, runtime_env); + let task_ctx = ctx.task_ctx(); + + let start = Instant::now(); + let stream = exec.execute(0, task_ctx).unwrap(); + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(collect(stream)).unwrap(); + start.elapsed().as_secs_f64() +} + +fn run_shuffle_read(data_file: &str, index_file: &str, num_partitions: usize) -> f64 { + let start = Instant::now(); + + // Read index file to get partition offsets + let index_bytes = fs::read(index_file).expect("Failed to read index file"); + let num_offsets = index_bytes.len() / 8; + let offsets: Vec = (0..num_offsets) + .map(|i| { + let bytes: [u8; 8] = index_bytes[i * 8..(i + 1) * 8].try_into().unwrap(); + i64::from_le_bytes(bytes) + }) + .collect(); + + // Read data file + let data_bytes = fs::read(data_file).expect("Failed to read data file"); + + let mut total_rows = 0usize; + let mut total_batches = 0usize; + + // Decode each partition's data + for p in 0..num_partitions.min(offsets.len().saturating_sub(1)) { + let start_offset = offsets[p] as usize; + let end_offset = offsets[p + 1] as usize; + + if start_offset >= end_offset { + continue; // Empty partition + } + + // Read all IPC blocks within this partition + let mut offset = start_offset; + while offset < end_offset { + // First 8 bytes: IPC length + let ipc_length = + u64::from_le_bytes(data_bytes[offset..offset + 8].try_into().unwrap()) as usize; + + // Skip 8-byte length prefix, then 8 bytes of field_count + codec header + let block_data = &data_bytes[offset + 16..offset + 8 + ipc_length]; + let batch = read_ipc_compressed(block_data).expect("Failed to decode shuffle block"); + total_rows += batch.num_rows(); + total_batches += 1; + + offset += 8 + ipc_length; + } + } + + let elapsed = start.elapsed().as_secs_f64(); + eprintln!( + " read back {} rows in {} batches from {} partitions", + format_number(total_rows), + total_batches, + num_partitions + ); + elapsed +} + +fn build_partitioning( + scheme: &str, + num_partitions: usize, + hash_col_indices: &[usize], + schema: &SchemaRef, +) -> CometPartitioning { + match scheme { + "single" => CometPartitioning::SinglePartition, + "round-robin" => CometPartitioning::RoundRobin(num_partitions, 0), + "hash" => { + let exprs: Vec> = hash_col_indices + .iter() + .map(|&idx| { + let field = schema.field(idx); + Arc::new(Column::new(field.name(), idx)) + as Arc + }) + .collect(); + CometPartitioning::Hash(exprs, num_partitions) + } + other => { + eprintln!("Unknown partitioning scheme: {other}. Using hash."); + build_partitioning("hash", num_partitions, hash_col_indices, schema) + } + } +} + +fn parse_codec(codec: &str, zstd_level: i32) -> CompressionCodec { + match codec.to_lowercase().as_str() { + "none" => CompressionCodec::None, + "lz4" => CompressionCodec::Lz4Frame, + "zstd" => CompressionCodec::Zstd(zstd_level), + "snappy" => CompressionCodec::Snappy, + other => { + eprintln!("Unknown codec: {other}. Using zstd."); + CompressionCodec::Zstd(zstd_level) + } + } +} + +fn parse_hash_columns(s: &str) -> Vec { + s.split(',') + .filter(|s| !s.is_empty()) + .map(|s| s.trim().parse::().expect("Invalid column index")) + .collect() +} + +fn describe_schema(schema: &Schema) -> String { + let mut counts = std::collections::HashMap::new(); + for field in schema.fields() { + let type_name = match field.data_type() { + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 => "int", + DataType::Float16 | DataType::Float32 | DataType::Float64 => "float", + DataType::Utf8 | DataType::LargeUtf8 => "string", + DataType::Boolean => "bool", + DataType::Date32 | DataType::Date64 => "date", + DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => "decimal", + DataType::Timestamp(_, _) => "timestamp", + DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => "binary", + _ => "other", + }; + *counts.entry(type_name).or_insert(0) += 1; + } + let mut parts: Vec = counts + .into_iter() + .map(|(k, v)| format!("{}x{}", v, k)) + .collect(); + parts.sort(); + parts.join(", ") +} + +fn format_number(n: usize) -> String { + let s = n.to_string(); + let mut result = String::new(); + for (i, c) in s.chars().rev().enumerate() { + if i > 0 && i % 3 == 0 { + result.push(','); + } + result.push(c); + } + result.chars().rev().collect() +} + +fn format_bytes(bytes: usize) -> String { + if bytes >= 1024 * 1024 * 1024 { + format!("{:.2} GiB", bytes as f64 / (1024.0 * 1024.0 * 1024.0)) + } else if bytes >= 1024 * 1024 { + format!("{:.2} MiB", bytes as f64 / (1024.0 * 1024.0)) + } else if bytes >= 1024 { + format!("{:.2} KiB", bytes as f64 / 1024.0) + } else { + format!("{bytes} B") + } +} From 60c55850eb20dc7b5625a88949ff4212765d6bba Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 21 Mar 2026 09:05:16 -0600 Subject: [PATCH 08/11] feat: add --limit option to shuffle benchmark (default 1M rows) --- native/core/src/bin/shuffle_bench.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/native/core/src/bin/shuffle_bench.rs b/native/core/src/bin/shuffle_bench.rs index c1498161f7..9b963c5803 100644 --- a/native/core/src/bin/shuffle_bench.rs +++ b/native/core/src/bin/shuffle_bench.rs @@ -152,6 +152,10 @@ struct Args { /// Write buffer size in bytes #[arg(long, default_value_t = 1048576)] write_buffer_size: usize, + + /// Maximum number of rows to use (default: 1,000,000) + #[arg(long, default_value_t = 1_000_000)] + limit: usize, } fn main() { @@ -178,6 +182,26 @@ fn main() { }; let load_elapsed = load_start.elapsed(); + // Apply row limit + let batches = { + let mut limited = Vec::new(); + let mut rows_so_far = 0usize; + for batch in batches { + if rows_so_far >= args.limit { + break; + } + let remaining = args.limit - rows_so_far; + if batch.num_rows() <= remaining { + rows_so_far += batch.num_rows(); + limited.push(batch); + } else { + limited.push(batch.slice(0, remaining)); + rows_so_far += remaining; + } + } + limited + }; + let schema = batches[0].schema(); let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); let total_bytes: usize = batches.iter().map(|b| b.get_array_memory_size()).sum(); From c0193cbbff396e8debea417667b23299867955dd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 21 Mar 2026 09:19:01 -0600 Subject: [PATCH 09/11] perf: apply limit during parquet read to avoid scanning all files --- native/core/src/bin/shuffle_bench.rs | 43 ++++++++++++---------------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/native/core/src/bin/shuffle_bench.rs b/native/core/src/bin/shuffle_bench.rs index 9b963c5803..17b1a9a6ff 100644 --- a/native/core/src/bin/shuffle_bench.rs +++ b/native/core/src/bin/shuffle_bench.rs @@ -176,32 +176,12 @@ fn main() { // Load data let load_start = Instant::now(); let batches = if let Some(ref input_path) = args.input { - load_parquet(input_path, args.batch_size) + load_parquet(input_path, args.batch_size, args.limit) } else { generate_data(&args) }; let load_elapsed = load_start.elapsed(); - // Apply row limit - let batches = { - let mut limited = Vec::new(); - let mut rows_so_far = 0usize; - for batch in batches { - if rows_so_far >= args.limit { - break; - } - let remaining = args.limit - rows_so_far; - if batch.num_rows() <= remaining { - rows_so_far += batch.num_rows(); - limited.push(batch); - } else { - limited.push(batch.slice(0, remaining)); - rows_so_far += remaining; - } - } - limited - }; - let schema = batches[0].schema(); let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); let total_bytes: usize = batches.iter().map(|b| b.get_array_memory_size()).sum(); @@ -358,8 +338,9 @@ fn main() { let _ = fs::remove_file(&index_file); } -fn load_parquet(path: &PathBuf, batch_size: usize) -> Vec { +fn load_parquet(path: &PathBuf, batch_size: usize, limit: usize) -> Vec { let mut batches = Vec::new(); + let mut total_rows = 0usize; let paths = if path.is_dir() { let mut files: Vec = fs::read_dir(path) @@ -383,7 +364,7 @@ fn load_parquet(path: &PathBuf, batch_size: usize) -> Vec { vec![path.clone()] }; - for file_path in &paths { + 'outer: for file_path in &paths { let file = fs::File::open(file_path) .unwrap_or_else(|e| panic!("Failed to open {}: {}", file_path.display(), e)); let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap_or_else(|e| { @@ -407,8 +388,19 @@ fn load_parquet(path: &PathBuf, batch_size: usize) -> Vec { let batch = batch_result.unwrap_or_else(|e| { panic!("Failed to read batch from {}: {}", file_path.display(), e) }); - if batch.num_rows() > 0 { + if batch.num_rows() == 0 { + continue; + } + let remaining = limit - total_rows; + if batch.num_rows() <= remaining { + total_rows += batch.num_rows(); batches.push(batch); + } else { + batches.push(batch.slice(0, remaining)); + total_rows += remaining; + } + if total_rows >= limit { + break 'outer; } } } @@ -418,8 +410,9 @@ fn load_parquet(path: &PathBuf, batch_size: usize) -> Vec { } println!( - "Loaded {} batches from {} file(s)", + "Loaded {} batches ({} rows) from {} file(s)", batches.len(), + format_number(total_rows), paths.len() ); batches From 7bae4d116b05f63d3301952487404e741a758415 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 21 Mar 2026 10:39:35 -0600 Subject: [PATCH 10/11] style: apply cargo fmt --- .../shuffle/partitioners/multi_partition.rs | 40 ++++++++++++------- .../shuffle/writers/partition_writer.rs | 12 ++---- 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs b/native/core/src/execution/shuffle/partitioners/multi_partition.rs index f160896031..697710149e 100644 --- a/native/core/src/execution/shuffle/partitioners/multi_partition.rs +++ b/native/core/src/execution/shuffle/partitioners/multi_partition.rs @@ -460,8 +460,11 @@ impl MultiPartitionShuffleRepartitioner { let row = row_idx as usize; let src_offset = row * byte_width; let is_valid = nulls.is_none_or(|n| n.is_valid(row)); - self.partition_buffers[p] - .append_fixed(col_idx, &values[src_offset..src_offset + byte_width], is_valid); + self.partition_buffers[p].append_fixed( + col_idx, + &values[src_offset..src_offset + byte_width], + is_valid, + ); } } } else if is_variable { @@ -480,8 +483,11 @@ impl MultiPartitionShuffleRepartitioner { let val_start = offsets_slice[row] as usize; let val_end = offsets_slice[row + 1] as usize; let is_valid = nulls.is_none_or(|n| n.is_valid(row)); - self.partition_buffers[p] - .append_variable(col_idx, &values_slice[val_start..val_end], is_valid); + self.partition_buffers[p].append_variable( + col_idx, + &values_slice[val_start..val_end], + is_valid, + ); } } } else if is_large_variable { @@ -500,8 +506,11 @@ impl MultiPartitionShuffleRepartitioner { let val_start = offsets_slice[row] as usize; let val_end = offsets_slice[row + 1] as usize; let is_valid = nulls.is_none_or(|n| n.is_valid(row)); - self.partition_buffers[p] - .append_large_variable(col_idx, &values_slice[val_start..val_end], is_valid); + self.partition_buffers[p].append_large_variable( + col_idx, + &values_slice[val_start..val_end], + is_valid, + ); } } } else if is_boolean { @@ -516,8 +525,11 @@ impl MultiPartitionShuffleRepartitioner { for &row_idx in row_indices { let row = row_idx as usize; let is_valid = nulls.is_none_or(|n| n.is_valid(row)); - self.partition_buffers[p] - .append_bool(col_idx, bool_array.value(row), is_valid); + self.partition_buffers[p].append_bool( + col_idx, + bool_array.value(row), + is_valid, + ); } } } else { @@ -530,14 +542,15 @@ impl MultiPartitionShuffleRepartitioner { } let row_indices = &partition_row_indices[start..end]; for &row_idx in row_indices { - self.partition_buffers[p] - .append_fallback_index(col_idx, row_idx); + self.partition_buffers[p].append_fallback_index(col_idx, row_idx); } } } } - self.metrics.scatter_time.add_duration(scatter_start.elapsed()); + self.metrics + .scatter_time + .add_duration(scatter_start.elapsed()); // Update row counts from partition_starts (O(num_partitions), not O(num_rows)) for p in 0..num_partitions { @@ -705,10 +718,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { &self.metrics.encode_time, &self.metrics.write_time, )?; - buf_batch_writer.flush( - &self.metrics.encode_time, - &self.metrics.write_time, - )?; + buf_batch_writer.flush(&self.metrics.encode_time, &self.metrics.write_time)?; } } diff --git a/native/core/src/execution/shuffle/writers/partition_writer.rs b/native/core/src/execution/shuffle/writers/partition_writer.rs index 40762ff1ce..6b2f6b0b7e 100644 --- a/native/core/src/execution/shuffle/writers/partition_writer.rs +++ b/native/core/src/execution/shuffle/writers/partition_writer.rs @@ -94,16 +94,10 @@ impl PartitionWriter { ); let mut bytes_written = 0; for batch in batches { - bytes_written += buf_batch_writer.write( - batch, - &metrics.encode_time, - &metrics.write_time, - )?; + bytes_written += + buf_batch_writer.write(batch, &metrics.encode_time, &metrics.write_time)?; } - buf_batch_writer.flush( - &metrics.encode_time, - &metrics.write_time, - )?; + buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?; bytes_written }; Ok(total_bytes_written) From a9b2114c014dae95a859fe5b62ee850cbd72325e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 21 Mar 2026 12:10:12 -0600 Subject: [PATCH 11/11] refactor: simplify scatter kernel with single match dispatch and deduplicated type lists - Use DataType::primitive_width() instead of manual byte_width mapping - Derive has_fallback_columns from PartitionBuffer instead of duplicating type list - Replace 4 separate matches! checks with single match on ColumnBuffer variant - Make auto-flush vs fallback-flush mutually exclusive - Remove dead clear() method --- .../shuffle/partitioners/multi_partition.rs | 271 +++++++----------- .../shuffle/partitioners/partition_buffer.rs | 109 ++----- 2 files changed, 135 insertions(+), 245 deletions(-) diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs b/native/core/src/execution/shuffle/partitioners/multi_partition.rs index 697710149e..46449d2f96 100644 --- a/native/core/src/execution/shuffle/partitioners/multi_partition.rs +++ b/native/core/src/execution/shuffle/partitioners/multi_partition.rs @@ -16,7 +16,7 @@ // under the License. use crate::execution::shuffle::metrics::ShufflePartitionerMetrics; -use crate::execution::shuffle::partitioners::partition_buffer::{self, PartitionBuffer}; +use crate::execution::shuffle::partitioners::partition_buffer::{ColumnBuffer, PartitionBuffer}; use crate::execution::shuffle::partitioners::ShufflePartitioner; use crate::execution::shuffle::writers::{BufBatchWriter, PartitionWriter}; use crate::execution::shuffle::{ @@ -24,7 +24,7 @@ use crate::execution::shuffle::{ }; use crate::execution::tracing::{with_trace, with_trace_async}; use arrow::array::{Array, ArrayRef, BooleanArray, RecordBatch}; -use arrow::datatypes::{DataType, SchemaRef}; +use arrow::datatypes::SchemaRef; use datafusion::common::DataFusionError; use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion::execution::runtime_env::RuntimeEnv; @@ -105,7 +105,6 @@ pub(crate) struct MultiPartitionShuffleRepartitioner { output_data_file: String, output_index_file: String, partition_buffers: Vec, - has_fallback_columns: bool, partition_writers: Vec, shuffle_block_writer: ShuffleBlockWriter, /// Partitioning scheme to use @@ -167,32 +166,6 @@ impl MultiPartitionShuffleRepartitioner { .map(|_| PartitionWriter::try_new(shuffle_block_writer.clone())) .collect::>>()?; - let has_fallback_columns = schema.fields().iter().any(|f| { - !matches!( - f.data_type(), - DataType::Boolean - | DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 - | DataType::Float16 - | DataType::Float32 - | DataType::Float64 - | DataType::Date32 - | DataType::Date64 - | DataType::Timestamp(_, _) - | DataType::Duration(_) - | DataType::Decimal128(_, _) - | DataType::Utf8 - | DataType::Binary - | DataType::LargeUtf8 - | DataType::LargeBinary - ) - }); let estimated_rows_per_partition = batch_size / num_output_partitions.max(1); let partition_buffers = (0..num_output_partitions) .map(|_| PartitionBuffer::new(Arc::clone(&schema), estimated_rows_per_partition)) @@ -206,7 +179,6 @@ impl MultiPartitionShuffleRepartitioner { output_data_file, output_index_file, partition_buffers, - has_fallback_columns, partition_writers, shuffle_block_writer, partitioning, @@ -421,128 +393,112 @@ impl MultiPartitionShuffleRepartitioner { // rows within that partition. This keeps writes to the same partition buffer // sequential for better cache locality. for (col_idx, column) in input.columns().iter().enumerate() { - // Determine scatter path from first partition's column type - // (all partitions have the same column types) - let is_fixed = matches!( - self.partition_buffers[0].columns[col_idx], - partition_buffer::ColumnBuffer::Fixed { .. } - ); - let is_variable = matches!( - self.partition_buffers[0].columns[col_idx], - partition_buffer::ColumnBuffer::Variable { .. } - ); - let is_large_variable = matches!( - self.partition_buffers[0].columns[col_idx], - partition_buffer::ColumnBuffer::LargeVariable { .. } - ); - let is_boolean = matches!( - self.partition_buffers[0].columns[col_idx], - partition_buffer::ColumnBuffer::Boolean { .. } - ); - let nulls = column.nulls(); - if is_fixed { - let byte_width = match &self.partition_buffers[0].columns[col_idx] { - partition_buffer::ColumnBuffer::Fixed { byte_width, .. } => *byte_width, - _ => unreachable!(), - }; - let data = column.to_data(); - let values = data.buffers()[0].as_slice(); - for p in 0..num_partitions { - let start = partition_starts[p] as usize; - let end = partition_starts[p + 1] as usize; - if start == end { - continue; - } - let row_indices = &partition_row_indices[start..end]; - for &row_idx in row_indices { - let row = row_idx as usize; - let src_offset = row * byte_width; - let is_valid = nulls.is_none_or(|n| n.is_valid(row)); - self.partition_buffers[p].append_fixed( - col_idx, - &values[src_offset..src_offset + byte_width], - is_valid, - ); + // Single match to determine scatter path from first partition's column type + match &self.partition_buffers[0].columns[col_idx] { + ColumnBuffer::Fixed { byte_width, .. } => { + let byte_width = *byte_width; + let data = column.to_data(); + let values = data.buffers()[0].as_slice(); + for p in 0..num_partitions { + let start = partition_starts[p] as usize; + let end = partition_starts[p + 1] as usize; + if start == end { + continue; + } + let row_indices = &partition_row_indices[start..end]; + for &row_idx in row_indices { + let row = row_idx as usize; + let src_offset = row * byte_width; + let is_valid = nulls.is_none_or(|n| n.is_valid(row)); + self.partition_buffers[p].append_fixed( + col_idx, + &values[src_offset..src_offset + byte_width], + is_valid, + ); + } } } - } else if is_variable { - let data = column.to_data(); - let offsets_slice = data.buffers()[0].typed_data::(); - let values_slice = data.buffers()[1].as_slice(); - for p in 0..num_partitions { - let start = partition_starts[p] as usize; - let end = partition_starts[p + 1] as usize; - if start == end { - continue; - } - let row_indices = &partition_row_indices[start..end]; - for &row_idx in row_indices { - let row = row_idx as usize; - let val_start = offsets_slice[row] as usize; - let val_end = offsets_slice[row + 1] as usize; - let is_valid = nulls.is_none_or(|n| n.is_valid(row)); - self.partition_buffers[p].append_variable( - col_idx, - &values_slice[val_start..val_end], - is_valid, - ); + ColumnBuffer::Variable { .. } => { + let data = column.to_data(); + let offsets_slice = data.buffers()[0].typed_data::(); + let values_slice = data.buffers()[1].as_slice(); + for p in 0..num_partitions { + let start = partition_starts[p] as usize; + let end = partition_starts[p + 1] as usize; + if start == end { + continue; + } + let row_indices = &partition_row_indices[start..end]; + for &row_idx in row_indices { + let row = row_idx as usize; + let val_start = offsets_slice[row] as usize; + let val_end = offsets_slice[row + 1] as usize; + let is_valid = nulls.is_none_or(|n| n.is_valid(row)); + self.partition_buffers[p].append_variable( + col_idx, + &values_slice[val_start..val_end], + is_valid, + ); + } } } - } else if is_large_variable { - let data = column.to_data(); - let offsets_slice = data.buffers()[0].typed_data::(); - let values_slice = data.buffers()[1].as_slice(); - for p in 0..num_partitions { - let start = partition_starts[p] as usize; - let end = partition_starts[p + 1] as usize; - if start == end { - continue; - } - let row_indices = &partition_row_indices[start..end]; - for &row_idx in row_indices { - let row = row_idx as usize; - let val_start = offsets_slice[row] as usize; - let val_end = offsets_slice[row + 1] as usize; - let is_valid = nulls.is_none_or(|n| n.is_valid(row)); - self.partition_buffers[p].append_large_variable( - col_idx, - &values_slice[val_start..val_end], - is_valid, - ); + ColumnBuffer::LargeVariable { .. } => { + let data = column.to_data(); + let offsets_slice = data.buffers()[0].typed_data::(); + let values_slice = data.buffers()[1].as_slice(); + for p in 0..num_partitions { + let start = partition_starts[p] as usize; + let end = partition_starts[p + 1] as usize; + if start == end { + continue; + } + let row_indices = &partition_row_indices[start..end]; + for &row_idx in row_indices { + let row = row_idx as usize; + let val_start = offsets_slice[row] as usize; + let val_end = offsets_slice[row + 1] as usize; + let is_valid = nulls.is_none_or(|n| n.is_valid(row)); + self.partition_buffers[p].append_large_variable( + col_idx, + &values_slice[val_start..val_end], + is_valid, + ); + } } } - } else if is_boolean { - let bool_array = column.as_any().downcast_ref::().unwrap(); - for p in 0..num_partitions { - let start = partition_starts[p] as usize; - let end = partition_starts[p + 1] as usize; - if start == end { - continue; - } - let row_indices = &partition_row_indices[start..end]; - for &row_idx in row_indices { - let row = row_idx as usize; - let is_valid = nulls.is_none_or(|n| n.is_valid(row)); - self.partition_buffers[p].append_bool( - col_idx, - bool_array.value(row), - is_valid, - ); + ColumnBuffer::Boolean { .. } => { + let bool_array = column.as_any().downcast_ref::().unwrap(); + for p in 0..num_partitions { + let start = partition_starts[p] as usize; + let end = partition_starts[p + 1] as usize; + if start == end { + continue; + } + let row_indices = &partition_row_indices[start..end]; + for &row_idx in row_indices { + let row = row_idx as usize; + let is_valid = nulls.is_none_or(|n| n.is_valid(row)); + self.partition_buffers[p].append_bool( + col_idx, + bool_array.value(row), + is_valid, + ); + } } } - } else { - // Fallback - for p in 0..num_partitions { - let start = partition_starts[p] as usize; - let end = partition_starts[p + 1] as usize; - if start == end { - continue; - } - let row_indices = &partition_row_indices[start..end]; - for &row_idx in row_indices { - self.partition_buffers[p].append_fallback_index(col_idx, row_idx); + ColumnBuffer::Fallback { .. } => { + for p in 0..num_partitions { + let start = partition_starts[p] as usize; + let end = partition_starts[p + 1] as usize; + if start == end { + continue; + } + let row_indices = &partition_row_indices[start..end]; + for &row_idx in row_indices { + self.partition_buffers[p].append_fallback_index(col_idx, row_idx); + } } } } @@ -552,15 +508,23 @@ impl MultiPartitionShuffleRepartitioner { .scatter_time .add_duration(scatter_start.elapsed()); - // Update row counts from partition_starts (O(num_partitions), not O(num_rows)) + // O(num_partitions) rather than O(num_rows) for p in 0..num_partitions { let count = (partition_starts[p + 1] - partition_starts[p]) as usize; self.partition_buffers[p].row_count += count; } - // Auto-flush partitions that reached batch_size + // Flush partitions. When fallback columns exist, flush ALL non-empty + // partitions since fallback indices reference the current input batch. + // Otherwise, only flush partitions that reached batch_size. + let flush_all = self.partition_buffers[0].has_fallback_columns(); for p in 0..num_partitions { - if self.partition_buffers[p].row_count >= self.batch_size { + let should_flush = if flush_all { + self.partition_buffers[p].row_count > 0 + } else { + self.partition_buffers[p].row_count >= self.batch_size + }; + if should_flush { let batch = self.partition_buffers[p].flush(Some(input))?; self.partition_writers[p].spill( &[batch], @@ -572,23 +536,6 @@ impl MultiPartitionShuffleRepartitioner { } } - // If schema has fallback columns, flush ALL non-empty partitions - // since fallback indices reference the current input batch - if self.has_fallback_columns { - for p in 0..num_partitions { - if self.partition_buffers[p].row_count > 0 { - let batch = self.partition_buffers[p].flush(Some(input))?; - self.partition_writers[p].spill( - &[batch], - &self.runtime, - &self.metrics, - self.write_buffer_size, - self.batch_size, - )?; - } - } - } - // Precise memory tracking let mem_after: usize = self.partition_buffers.iter().map(|b| b.memory_size()).sum(); let mem_growth = mem_after.saturating_sub(mem_before); diff --git a/native/core/src/execution/shuffle/partitioners/partition_buffer.rs b/native/core/src/execution/shuffle/partitioners/partition_buffer.rs index ea9eaaa200..f91c4fd141 100644 --- a/native/core/src/execution/shuffle/partitioners/partition_buffer.rs +++ b/native/core/src/execution/shuffle/partitioners/partition_buffer.rs @@ -149,54 +149,36 @@ impl PartitionBuffer { let columns = schema .fields() .iter() - .map(|field| match field.data_type() { - DataType::Boolean => ColumnBuffer::Boolean { - values: BooleanBufferBuilder::new(estimated_rows), - nulls: BooleanBufferBuilder::new(estimated_rows), - }, - DataType::Int8 | DataType::UInt8 => ColumnBuffer::Fixed { - values: MutableBuffer::new(estimated_rows), - byte_width: 1, - nulls: BooleanBufferBuilder::new(estimated_rows), - }, - DataType::Int16 | DataType::UInt16 | DataType::Float16 => ColumnBuffer::Fixed { - values: MutableBuffer::new(estimated_rows * 2), - byte_width: 2, - nulls: BooleanBufferBuilder::new(estimated_rows), - }, - DataType::Int32 | DataType::UInt32 | DataType::Float32 | DataType::Date32 => { + .map(|field| { + let dt = field.data_type(); + if let DataType::Boolean = dt { + ColumnBuffer::Boolean { + values: BooleanBufferBuilder::new(estimated_rows), + nulls: BooleanBufferBuilder::new(estimated_rows), + } + } else if let Some(byte_width) = dt.primitive_width() { ColumnBuffer::Fixed { - values: MutableBuffer::new(estimated_rows * 4), - byte_width: 4, + values: MutableBuffer::new(estimated_rows * byte_width), + byte_width, nulls: BooleanBufferBuilder::new(estimated_rows), } + } else { + match dt { + DataType::Utf8 | DataType::Binary => ColumnBuffer::Variable { + offsets: vec![0i32], + data: vec![], + nulls: BooleanBufferBuilder::new(estimated_rows), + }, + DataType::LargeUtf8 | DataType::LargeBinary => { + ColumnBuffer::LargeVariable { + offsets: vec![0i64], + data: vec![], + nulls: BooleanBufferBuilder::new(estimated_rows), + } + } + _ => ColumnBuffer::Fallback { indices: vec![] }, + } } - DataType::Int64 - | DataType::UInt64 - | DataType::Float64 - | DataType::Date64 - | DataType::Timestamp(_, _) - | DataType::Duration(_) => ColumnBuffer::Fixed { - values: MutableBuffer::new(estimated_rows * 8), - byte_width: 8, - nulls: BooleanBufferBuilder::new(estimated_rows), - }, - DataType::Decimal128(_, _) => ColumnBuffer::Fixed { - values: MutableBuffer::new(estimated_rows * 16), - byte_width: 16, - nulls: BooleanBufferBuilder::new(estimated_rows), - }, - DataType::Utf8 | DataType::Binary => ColumnBuffer::Variable { - offsets: vec![0i32], - data: vec![], - nulls: BooleanBufferBuilder::new(estimated_rows), - }, - DataType::LargeUtf8 | DataType::LargeBinary => ColumnBuffer::LargeVariable { - offsets: vec![0i64], - data: vec![], - nulls: BooleanBufferBuilder::new(estimated_rows), - }, - _ => ColumnBuffer::Fallback { indices: vec![] }, }) .collect(); @@ -239,51 +221,12 @@ impl PartitionBuffer { self.columns.iter().map(|c| c.memory_size()).sum() } - #[allow(dead_code)] pub(crate) fn has_fallback_columns(&self) -> bool { self.columns .iter() .any(|c| matches!(c, ColumnBuffer::Fallback { .. })) } - #[allow(dead_code)] - pub(crate) fn clear(&mut self) { - self.row_count = 0; - for col in &mut self.columns { - match col { - ColumnBuffer::Boolean { values, nulls } => { - *values = BooleanBufferBuilder::new(0); - *nulls = BooleanBufferBuilder::new(0); - } - ColumnBuffer::Fixed { values, nulls, .. } => { - *values = MutableBuffer::new(0); - *nulls = BooleanBufferBuilder::new(0); - } - ColumnBuffer::Variable { - offsets, - data, - nulls, - } => { - *offsets = vec![0i32]; - data.clear(); - *nulls = BooleanBufferBuilder::new(0); - } - ColumnBuffer::LargeVariable { - offsets, - data, - nulls, - } => { - *offsets = vec![0i64]; - data.clear(); - *nulls = BooleanBufferBuilder::new(0); - } - ColumnBuffer::Fallback { indices } => { - indices.clear(); - } - } - } - } - pub(crate) fn flush(&mut self, fallback_batch: Option<&RecordBatch>) -> Result { let row_count = self.row_count; let mut arrays: Vec = Vec::with_capacity(self.columns.len());