From 51768bf48b49ade8ed6ad91c43c45c52b9d03a73 Mon Sep 17 00:00:00 2001 From: "tushar.das@naada.world" Date: Sat, 24 Jan 2026 20:03:52 +0530 Subject: [PATCH 01/11] perf: Optimize ArrowBytesViewMap with direct view access - Use values.views() instead of values.iter() for direct u128 access - Use is_valid(i) for efficient null checking via validity bitmap - Avoid dereferencing overhead for inline strings - No additional memory overhead in Entry struct Closes #19961 --- .../src/binary_view_map.rs | 47 +++++++++++++++---- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index 7969244200568..1626e3731d712 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -250,12 +250,18 @@ where // step 2: insert each value into the set, if not already present let values = values.as_byte_view::(); + // Get raw views buffer for direct comparison - this is the key optimization + // Instead of using values.iter() which dereferences each view to bytes, + // we access the raw u128 views directly for fast comparison + let views = values.views(); + // Ensure lengths are equivalent assert_eq!(values.len(), batch_hashes.len()); - for (value, &hash) in values.iter().zip(batch_hashes.iter()) { - // handle null value - let Some(value) = value else { + for (i, (&view_u128, &hash)) in views.iter().zip(batch_hashes.iter()).enumerate() + { + // handle null value via validity bitmap check + if !values.is_valid(i) { let payload = if let Some(&(payload, _offset)) = self.null.as_ref() { payload } else { @@ -267,28 +273,49 @@ where }; observe_payload_fn(payload); continue; - }; + } - // get the value as bytes - let value: &[u8] = value.as_ref(); + // Extract length from the view (first 4 bytes of u128 in little-endian) + let len = (view_u128 & 0xFFFFFFFF) as u32; let entry = self.map.find_mut(hash, 
|header| { if header.hash != hash { return false; } - let v = self.builder.get_value(header.view_idx); - v == value + // Fast path: for inline strings (<=12 bytes), the entire value + // is stored in the u128 view, so we can compare directly + // This avoids the expensive conversion back to bytes + if len <= 12 { + return header.view == view_u128; + } + + // For larger strings: first compare the 4-byte prefix (bytes 4-7 of u128) + // The prefix is stored in the next 4 bytes after length + // Only dereference full bytes if prefixes match + let stored_prefix = ((header.view >> 32) & 0xFFFFFFFF) as u32; + let input_prefix = ((view_u128 >> 32) & 0xFFFFFFFF) as u32; + if stored_prefix != input_prefix { + return false; + } + + // Prefix matched - must compare full bytes + let stored_value = self.builder.get_value(header.view_idx); + let input_value: &[u8] = values.value(i).as_ref(); + stored_value == input_value }); let payload = if let Some(entry) = entry { entry.payload } else { // no existing value, make a new one. + // Only dereference bytes here when we actually need to insert + let value: &[u8] = values.value(i).as_ref(); let payload = make_payload_fn(Some(value)); let inner_view_idx = self.builder.len(); let new_header = Entry { + view: view_u128, view_idx: inner_view_idx, hash, payload, @@ -378,6 +405,10 @@ struct Entry where V: Debug + PartialEq + Eq + Clone + Copy + Default, { + /// The original u128 view for fast comparison of inline strings (<=12 bytes) + /// and prefix comparison for larger strings + view: u128, + /// The idx into the views array view_idx: usize, From e3cd6d47b5ad9b95818c2679868ce3dd383b1f78 Mon Sep 17 00:00:00 2001 From: "tushar.das@naada.world" Date: Sun, 25 Jan 2026 22:56:55 +0530 Subject: [PATCH 02/11] reduce Entry memory by using u32 for builder index Following review feedback, changed from usize to u32 for the builder index field. 
This saves 4 bytes per entry on 64-bit systems while still supporting up to 4 billion distinct values - more than enough for practical workloads. --- .../src/binary_view_map.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index 1626e3731d712..8d6a41622aa1d 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -300,7 +300,7 @@ where } // Prefix matched - must compare full bytes - let stored_value = self.builder.get_value(header.view_idx); + let stored_value = self.builder.get_value(header.builder_idx as usize); let input_value: &[u8] = values.value(i).as_ref(); stored_value == input_value }); @@ -313,10 +313,10 @@ where let value: &[u8] = values.value(i).as_ref(); let payload = make_payload_fn(Some(value)); - let inner_view_idx = self.builder.len(); + let builder_idx = self.builder.len() as u32; let new_header = Entry { view: view_u128, - view_idx: inner_view_idx, + builder_idx, hash, payload, }; @@ -400,17 +400,23 @@ where } /// Entry in the hash table -- see [`ArrowBytesViewMap`] for more details +/// +/// Memory layout optimized: we use u32 for the builder index instead of usize, +/// saving 4 bytes per entry on 64-bit systems. This still supports up to 4 billion +/// distinct values which is sufficient for practical use cases. #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] struct Entry where V: Debug + PartialEq + Eq + Clone + Copy + Default, { /// The original u128 view for fast comparison of inline strings (<=12 bytes) - /// and prefix comparison for larger strings + /// and prefix comparison for larger strings. For inline strings, this contains + /// the complete value. For out-of-line strings, bytes 4-7 contain the prefix. view: u128, - /// The idx into the views array - view_idx: usize, + /// Index into the builder array. 
Uses u32 instead of usize to save 4 bytes + /// per entry while supporting up to 4 billion distinct values. + builder_idx: u32, hash: u64, From b9f7d68c4469a5ee66d0e52a87dac9f82695c07b Mon Sep 17 00:00:00 2001 From: "tushar.das@naada.world" Date: Mon, 26 Jan 2026 01:47:39 +0530 Subject: [PATCH 03/11] perf: store views directly, remove builder_idx from Entry --- .../src/binary_view_map.rs | 221 +++++++++++------- 1 file changed, 143 insertions(+), 78 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index 8d6a41622aa1d..c103f227ab871 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -17,16 +17,16 @@ //! [`ArrowBytesViewMap`] and [`ArrowBytesViewSet`] for storing maps/sets of values from //! `StringViewArray`/`BinaryViewArray`. -//! Much of the code is from `binary_map.rs`, but with simpler implementation because we directly use the -//! [`GenericByteViewBuilder`]. use crate::binary_map::OutputType; use ahash::RandomState; use arrow::array::cast::AsArray; -use arrow::array::{Array, ArrayBuilder, ArrayRef, GenericByteViewBuilder}; +use arrow::array::{Array, ArrayRef, BinaryViewArray, ByteView, make_view}; +use arrow::buffer::{Buffer, NullBuffer, ScalarBuffer}; use arrow::datatypes::{BinaryViewType, ByteViewType, DataType, StringViewType}; use datafusion_common::hash_utils::create_hashes; use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt}; use std::fmt::Debug; +use std::mem::{replace, size_of}; use std::sync::Arc; /// HashSet optimized for storing string or binary values that can produce that @@ -113,6 +113,9 @@ impl ArrowBytesViewSet { /// This map is used by the special `COUNT DISTINCT` aggregate function to /// store the distinct values, and by the `GROUP BY` operator to store /// group values when they are a single string array. 
+/// Max size of the in-progress buffer before flushing to completed buffers +const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024; + pub struct ArrowBytesViewMap where V: Debug + PartialEq + Eq + Clone + Copy + Default, @@ -124,8 +127,15 @@ where /// Total size of the map in bytes map_size: usize, - /// Builder for output array - builder: GenericByteViewBuilder, + /// Views for all stored values (in insertion order) + views: Vec, + /// In-progress buffer for out-of-line string data + in_progress: Vec, + /// Completed buffers containing string data + completed: Vec, + /// Tracks null values (true = null) + nulls: Vec, + /// random state used to generate hashes random_state: RandomState, /// buffer that stores hash values (reused across batches to save allocations) @@ -148,7 +158,10 @@ where output_type, map: hashbrown::hash_table::HashTable::with_capacity(INITIAL_MAP_CAPACITY), map_size: 0, - builder: GenericByteViewBuilder::new(), + views: Vec::new(), + in_progress: Vec::new(), + completed: Vec::new(), + nulls: Vec::new(), random_state: RandomState::new(), hashes_buffer: vec![], null: None, @@ -250,24 +263,25 @@ where // step 2: insert each value into the set, if not already present let values = values.as_byte_view::(); - // Get raw views buffer for direct comparison - this is the key optimization - // Instead of using values.iter() which dereferences each view to bytes, - // we access the raw u128 views directly for fast comparison - let views = values.views(); + // Get raw views buffer for direct comparison + let input_views = values.views(); // Ensure lengths are equivalent - assert_eq!(values.len(), batch_hashes.len()); + assert_eq!(values.len(), self.hashes_buffer.len()); + + for i in 0..values.len() { + let view_u128 = input_views[i]; + let hash = self.hashes_buffer[i]; - for (i, (&view_u128, &hash)) in views.iter().zip(batch_hashes.iter()).enumerate() - { // handle null value via validity bitmap check if !values.is_valid(i) { let payload = if let 
Some(&(payload, _offset)) = self.null.as_ref() { payload } else { let payload = make_payload_fn(None); - let null_index = self.builder.len(); - self.builder.append_null(); + let null_index = self.views.len(); + self.views.push(0); + self.nulls.push(true); self.null = Some((payload, null_index)); payload }; @@ -278,51 +292,60 @@ where // Extract length from the view (first 4 bytes of u128 in little-endian) let len = (view_u128 & 0xFFFFFFFF) as u32; - let entry = self.map.find_mut(hash, |header| { - if header.hash != hash { - return false; - } - - // Fast path: for inline strings (<=12 bytes), the entire value - // is stored in the u128 view, so we can compare directly - // This avoids the expensive conversion back to bytes - if len <= 12 { - return header.view == view_u128; - } - - // For larger strings: first compare the 4-byte prefix (bytes 4-7 of u128) - // The prefix is stored in the next 4 bytes after length - // Only dereference full bytes if prefixes match - let stored_prefix = ((header.view >> 32) & 0xFFFFFFFF) as u32; - let input_prefix = ((view_u128 >> 32) & 0xFFFFFFFF) as u32; - if stored_prefix != input_prefix { - return false; - } - - // Prefix matched - must compare full bytes - let stored_value = self.builder.get_value(header.builder_idx as usize); - let input_value: &[u8] = values.value(i).as_ref(); - stored_value == input_value - }); - - let payload = if let Some(entry) = entry { - entry.payload + // Check if value already exists + let maybe_payload = { + // Borrow completed and in_progress for comparison + let completed = &self.completed; + let in_progress = &self.in_progress; + + self.map.find(hash, |header| { + if header.hash != hash { + return false; + } + + // Fast path: inline strings can be compared directly + if len <= 12 { + return header.view == view_u128; + } + + // For larger strings: first compare the 4-byte prefix + let stored_prefix = ((header.view >> 32) & 0xFFFFFFFF) as u32; + let input_prefix = ((view_u128 >> 32) & 0xFFFFFFFF) as 
u32; + if stored_prefix != input_prefix { + return false; + } + + // Prefix matched - compare full bytes + let byte_view = ByteView::from(header.view); + let stored_len = byte_view.length as usize; + let buffer_index = byte_view.buffer_index as usize; + let offset = byte_view.offset as usize; + + let stored_value = if buffer_index < completed.len() { + &completed[buffer_index].as_slice()[offset..offset + stored_len] + } else { + &in_progress[offset..offset + stored_len] + }; + let input_value: &[u8] = values.value(i).as_ref(); + stored_value == input_value + }).map(|entry| entry.payload) + }; + + let payload = if let Some(payload) = maybe_payload { + payload } else { - // no existing value, make a new one. - // Only dereference bytes here when we actually need to insert + // no existing value, make a new one let value: &[u8] = values.value(i).as_ref(); let payload = make_payload_fn(Some(value)); - let builder_idx = self.builder.len() as u32; + // Create view pointing to our buffers + let new_view = self.append_value(value); let new_header = Entry { - view: view_u128, - builder_idx, + view: new_view, hash, payload, }; - self.builder.append_value(value); - self.map .insert_accounted(new_header, |h| h.hash, &mut self.map_size); payload @@ -337,29 +360,65 @@ where /// /// The values are guaranteed to be returned in the same order in which /// they were first seen. 
- pub fn into_state(self) -> ArrayRef { - let mut builder = self.builder; - match self.output_type { - OutputType::BinaryView => { - let array = builder.finish(); + pub fn into_state(mut self) -> ArrayRef { + // Flush any remaining in-progress buffer + if !self.in_progress.is_empty() { + let flushed = replace(&mut self.in_progress, Vec::new()); + self.completed.push(Buffer::from_vec(flushed)); + } - Arc::new(array) - } + // Build null buffer if we have any nulls + let null_buffer = if self.nulls.iter().any(|&is_null| is_null) { + Some(NullBuffer::from( + self.nulls.iter().map(|&is_null| !is_null).collect::>(), + )) + } else { + None + }; + + let views = ScalarBuffer::from(self.views); + let array = unsafe { + BinaryViewArray::new_unchecked(views, self.completed, null_buffer) + }; + + match self.output_type { + OutputType::BinaryView => Arc::new(array), OutputType::Utf8View => { - // SAFETY: - // we asserted the input arrays were all the correct type and - // thus since all the values that went in were valid (e.g. 
utf8) - // so are all the values that come out - let array = builder.finish(); + // SAFETY: all input was valid utf8 let array = unsafe { array.to_string_view_unchecked() }; Arc::new(array) } - _ => { - unreachable!("Utf8/Binary should use `ArrowBytesMap`") - } + _ => unreachable!("Utf8/Binary should use `ArrowBytesMap`"), } } + /// Append a value to our buffers and return the view pointing to it + fn append_value(&mut self, value: &[u8]) -> u128 { + let len = value.len(); + let view = if len <= 12 { + make_view(value, 0, 0) + } else { + // Ensure buffer is big enough + if self.in_progress.len() + len > BYTE_VIEW_MAX_BLOCK_SIZE { + let flushed = replace( + &mut self.in_progress, + Vec::with_capacity(BYTE_VIEW_MAX_BLOCK_SIZE), + ); + self.completed.push(Buffer::from_vec(flushed)); + } + + let buffer_index = self.completed.len() as u32; + let offset = self.in_progress.len() as u32; + self.in_progress.extend_from_slice(value); + + make_view(value, buffer_index, offset) + }; + + self.views.push(view); + self.nulls.push(false); + view + } + /// Total number of entries (including null, if present) pub fn len(&self) -> usize { self.non_null_len() + self.null.map(|_| 1).unwrap_or(0) @@ -378,8 +437,16 @@ where /// Return the total size, in bytes, of memory used to store the data in /// this set, not including `self` pub fn size(&self) -> usize { + let views_size = self.views.len() * size_of::(); + let in_progress_size = self.in_progress.capacity(); + let completed_size: usize = self.completed.iter().map(|b| b.len()).sum(); + let nulls_size = self.nulls.len(); + self.map_size - + self.builder.allocated_size() + + views_size + + in_progress_size + + completed_size + + nulls_size + self.hashes_buffer.allocated_size() } } @@ -392,7 +459,8 @@ where f.debug_struct("ArrowBytesMap") .field("map", &"") .field("map_size", &self.map_size) - .field("view_builder", &self.builder) + .field("views_len", &self.views.len()) + .field("completed_buffers", &self.completed.len()) 
.field("random_state", &self.random_state) .field("hashes_buffer", &self.hashes_buffer) .finish() @@ -401,23 +469,20 @@ where /// Entry in the hash table -- see [`ArrowBytesViewMap`] for more details /// -/// Memory layout optimized: we use u32 for the builder index instead of usize, -/// saving 4 bytes per entry on 64-bit systems. This still supports up to 4 billion -/// distinct values which is sufficient for practical use cases. +/// Stores the view pointing to our internal buffers, eliminating the need +/// for a separate builder index. For inline strings (<=12 bytes), the view +/// contains the entire value. For out-of-line strings, the view contains +/// buffer_index and offset pointing directly to our storage. #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] struct Entry where V: Debug + PartialEq + Eq + Clone + Copy + Default, { - /// The original u128 view for fast comparison of inline strings (<=12 bytes) - /// and prefix comparison for larger strings. For inline strings, this contains - /// the complete value. For out-of-line strings, bytes 4-7 contain the prefix. + /// The u128 view pointing to our internal buffers. For inline strings, + /// this contains the complete value. For larger strings, this contains + /// the buffer_index/offset into our completed/in_progress buffers. view: u128, - /// Index into the builder array. Uses u32 instead of usize to save 4 bytes - /// per entry while supporting up to 4 billion distinct values. - builder_idx: u32, - hash: u64, /// value stored by the entry From 52b6f78dc5596711ac6686e8d08ded2398141d36 Mon Sep 17 00:00:00 2001 From: "tushar.das@naada.world" Date: Mon, 26 Jan 2026 12:22:27 +0530 Subject: [PATCH 04/11] use BooleanBufferBuilder for null tracking Per reviewer feedback, replaced Vec with BooleanBufferBuilder for tracking null values. This uses bit-packed storage (1 bit per value) instead of byte-per-value, reducing memory usage by 8x for the null bitmap. 
Also fixed clippy warnings for mem_replace_with_default. --- .../src/binary_view_map.rs | 101 +++++++++--------- 1 file changed, 51 insertions(+), 50 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index c103f227ab871..4f8118130c344 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -20,13 +20,15 @@ use crate::binary_map::OutputType; use ahash::RandomState; use arrow::array::cast::AsArray; -use arrow::array::{Array, ArrayRef, BinaryViewArray, ByteView, make_view}; +use arrow::array::{ + Array, ArrayRef, BinaryViewArray, BooleanBufferBuilder, ByteView, make_view, +}; use arrow::buffer::{Buffer, NullBuffer, ScalarBuffer}; use arrow::datatypes::{BinaryViewType, ByteViewType, DataType, StringViewType}; use datafusion_common::hash_utils::create_hashes; use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt}; use std::fmt::Debug; -use std::mem::{replace, size_of}; +use std::mem::size_of; use std::sync::Arc; /// HashSet optimized for storing string or binary values that can produce that @@ -133,8 +135,8 @@ where in_progress: Vec, /// Completed buffers containing string data completed: Vec, - /// Tracks null values (true = null) - nulls: Vec, + /// Tracks null values using efficient bit-packed representation + nulls: BooleanBufferBuilder, /// random state used to generate hashes random_state: RandomState, @@ -161,7 +163,7 @@ where views: Vec::new(), in_progress: Vec::new(), completed: Vec::new(), - nulls: Vec::new(), + nulls: BooleanBufferBuilder::new(INITIAL_MAP_CAPACITY), random_state: RandomState::new(), hashes_buffer: vec![], null: None, @@ -281,7 +283,7 @@ where let payload = make_payload_fn(None); let null_index = self.views.len(); self.views.push(0); - self.nulls.push(true); + self.nulls.append(false); // false = null in validity buffer self.null = Some((payload, null_index)); payload 
}; @@ -298,37 +300,40 @@ where let completed = &self.completed; let in_progress = &self.in_progress; - self.map.find(hash, |header| { - if header.hash != hash { - return false; - } - - // Fast path: inline strings can be compared directly - if len <= 12 { - return header.view == view_u128; - } - - // For larger strings: first compare the 4-byte prefix - let stored_prefix = ((header.view >> 32) & 0xFFFFFFFF) as u32; - let input_prefix = ((view_u128 >> 32) & 0xFFFFFFFF) as u32; - if stored_prefix != input_prefix { - return false; - } - - // Prefix matched - compare full bytes - let byte_view = ByteView::from(header.view); - let stored_len = byte_view.length as usize; - let buffer_index = byte_view.buffer_index as usize; - let offset = byte_view.offset as usize; - - let stored_value = if buffer_index < completed.len() { - &completed[buffer_index].as_slice()[offset..offset + stored_len] - } else { - &in_progress[offset..offset + stored_len] - }; - let input_value: &[u8] = values.value(i).as_ref(); - stored_value == input_value - }).map(|entry| entry.payload) + self.map + .find(hash, |header| { + if header.hash != hash { + return false; + } + + // Fast path: inline strings can be compared directly + if len <= 12 { + return header.view == view_u128; + } + + // For larger strings: first compare the 4-byte prefix + let stored_prefix = ((header.view >> 32) & 0xFFFFFFFF) as u32; + let input_prefix = ((view_u128 >> 32) & 0xFFFFFFFF) as u32; + if stored_prefix != input_prefix { + return false; + } + + // Prefix matched - compare full bytes + let byte_view = ByteView::from(header.view); + let stored_len = byte_view.length as usize; + let buffer_index = byte_view.buffer_index as usize; + let offset = byte_view.offset as usize; + + let stored_value = if buffer_index < completed.len() { + &completed[buffer_index].as_slice() + [offset..offset + stored_len] + } else { + &in_progress[offset..offset + stored_len] + }; + let input_value: &[u8] = values.value(i).as_ref(); + stored_value 
== input_value + }) + .map(|entry| entry.payload) }; let payload = if let Some(payload) = maybe_payload { @@ -363,23 +368,20 @@ where pub fn into_state(mut self) -> ArrayRef { // Flush any remaining in-progress buffer if !self.in_progress.is_empty() { - let flushed = replace(&mut self.in_progress, Vec::new()); + let flushed = std::mem::take(&mut self.in_progress); self.completed.push(Buffer::from_vec(flushed)); } - // Build null buffer if we have any nulls - let null_buffer = if self.nulls.iter().any(|&is_null| is_null) { - Some(NullBuffer::from( - self.nulls.iter().map(|&is_null| !is_null).collect::>(), - )) + // Build null buffer from the validity bitmap + let null_buffer = if self.null.is_some() { + Some(NullBuffer::new(self.nulls.finish())) } else { None }; let views = ScalarBuffer::from(self.views); - let array = unsafe { - BinaryViewArray::new_unchecked(views, self.completed, null_buffer) - }; + let array = + unsafe { BinaryViewArray::new_unchecked(views, self.completed, null_buffer) }; match self.output_type { OutputType::BinaryView => Arc::new(array), @@ -398,9 +400,8 @@ where let view = if len <= 12 { make_view(value, 0, 0) } else { - // Ensure buffer is big enough if self.in_progress.len() + len > BYTE_VIEW_MAX_BLOCK_SIZE { - let flushed = replace( + let flushed = std::mem::replace( &mut self.in_progress, Vec::with_capacity(BYTE_VIEW_MAX_BLOCK_SIZE), ); @@ -415,7 +416,7 @@ where }; self.views.push(view); - self.nulls.push(false); + self.nulls.append(true); // true = valid (not null) view } @@ -440,7 +441,7 @@ where let views_size = self.views.len() * size_of::(); let in_progress_size = self.in_progress.capacity(); let completed_size: usize = self.completed.iter().map(|b| b.len()).sum(); - let nulls_size = self.nulls.len(); + let nulls_size = self.nulls.len() / 8 + 1; // bit-packed size self.map_size + views_size From 49de0e913c81e5e53dbef5df882d5ba00665ef3c Mon Sep 17 00:00:00 2001 From: "tushar.das@naada.world" Date: Mon, 26 Jan 2026 20:50:35 +0530 
Subject: [PATCH 05/11] Apply review suggestions: remove unnecessary & 0xFFFFFFFF masks --- datafusion/catalog-listing/src/table.rs | 27 ++--- .../src/binary_view_map.rs | 103 +++++++++--------- 2 files changed, 65 insertions(+), 65 deletions(-) diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 38456944075fc..a5aebfa0cbdea 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -45,7 +45,7 @@ use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::empty::EmptyExec; -use futures::{Stream, StreamExt, TryStreamExt, future, stream}; +use futures::{Stream, StreamExt, TryStreamExt, stream}; use object_store::ObjectStore; use std::any::Any; use std::collections::HashMap; @@ -712,20 +712,21 @@ impl ListingTable { }); }; // list files (with partitions) - let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| { - pruned_partition_list( - ctx, - store.as_ref(), - table_path, - filters, - &self.options.file_extension, - &self.options.table_partition_cols, - ) - })) - .await?; let meta_fetch_concurrency = ctx.config_options().execution.meta_fetch_concurrency; - let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency); + let file_list = stream::iter(self.table_paths.iter()) + .map(|table_path| { + pruned_partition_list( + ctx, + store.as_ref(), + table_path, + filters, + &self.options.file_extension, + &self.options.table_partition_cols, + ) + }) + .buffer_unordered(meta_fetch_concurrency) + .try_flatten_unordered(meta_fetch_concurrency); // collect the statistics and ordering if required by the config let files = file_list .map(|part_file| async { diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index 
4f8118130c344..6114fb97d9c79 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -20,15 +20,13 @@ use crate::binary_map::OutputType; use ahash::RandomState; use arrow::array::cast::AsArray; -use arrow::array::{ - Array, ArrayRef, BinaryViewArray, BooleanBufferBuilder, ByteView, make_view, -}; +use arrow::array::{Array, ArrayRef, BinaryViewArray, ByteView, make_view}; use arrow::buffer::{Buffer, NullBuffer, ScalarBuffer}; use arrow::datatypes::{BinaryViewType, ByteViewType, DataType, StringViewType}; use datafusion_common::hash_utils::create_hashes; use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt}; use std::fmt::Debug; -use std::mem::size_of; +use std::mem::{replace, size_of}; use std::sync::Arc; /// HashSet optimized for storing string or binary values that can produce that @@ -135,8 +133,8 @@ where in_progress: Vec, /// Completed buffers containing string data completed: Vec, - /// Tracks null values using efficient bit-packed representation - nulls: BooleanBufferBuilder, + /// Tracks null values (true = null) + nulls: Vec, /// random state used to generate hashes random_state: RandomState, @@ -163,7 +161,7 @@ where views: Vec::new(), in_progress: Vec::new(), completed: Vec::new(), - nulls: BooleanBufferBuilder::new(INITIAL_MAP_CAPACITY), + nulls: Vec::new(), random_state: RandomState::new(), hashes_buffer: vec![], null: None, @@ -283,7 +281,7 @@ where let payload = make_payload_fn(None); let null_index = self.views.len(); self.views.push(0); - self.nulls.append(false); // false = null in validity buffer + self.nulls.push(true); self.null = Some((payload, null_index)); payload }; @@ -292,7 +290,7 @@ where } // Extract length from the view (first 4 bytes of u128 in little-endian) - let len = (view_u128 & 0xFFFFFFFF) as u32; + let len = view_u128 as u32; // Check if value already exists let maybe_payload = { @@ -300,40 +298,37 @@ where let completed = 
&self.completed; let in_progress = &self.in_progress; - self.map - .find(hash, |header| { - if header.hash != hash { - return false; - } - - // Fast path: inline strings can be compared directly - if len <= 12 { - return header.view == view_u128; - } - - // For larger strings: first compare the 4-byte prefix - let stored_prefix = ((header.view >> 32) & 0xFFFFFFFF) as u32; - let input_prefix = ((view_u128 >> 32) & 0xFFFFFFFF) as u32; - if stored_prefix != input_prefix { - return false; - } - - // Prefix matched - compare full bytes - let byte_view = ByteView::from(header.view); - let stored_len = byte_view.length as usize; - let buffer_index = byte_view.buffer_index as usize; - let offset = byte_view.offset as usize; - - let stored_value = if buffer_index < completed.len() { - &completed[buffer_index].as_slice() - [offset..offset + stored_len] - } else { - &in_progress[offset..offset + stored_len] - }; - let input_value: &[u8] = values.value(i).as_ref(); - stored_value == input_value - }) - .map(|entry| entry.payload) + self.map.find(hash, |header| { + if header.hash != hash { + return false; + } + + // Fast path: inline strings can be compared directly + if len <= 12 { + return header.view == view_u128; + } + + // For larger strings: first compare the 4-byte prefix + let stored_prefix = (header.view >> 32) as u32; + let input_prefix = (view_u128 >> 32) as u32; + if stored_prefix != input_prefix { + return false; + } + + // Prefix matched - compare full bytes + let byte_view = ByteView::from(header.view); + let stored_len = byte_view.length as usize; + let buffer_index = byte_view.buffer_index as usize; + let offset = byte_view.offset as usize; + + let stored_value = if buffer_index < completed.len() { + &completed[buffer_index].as_slice()[offset..offset + stored_len] + } else { + &in_progress[offset..offset + stored_len] + }; + let input_value: &[u8] = values.value(i).as_ref(); + stored_value == input_value + }).map(|entry| entry.payload) }; let payload = if let 
Some(payload) = maybe_payload { @@ -368,20 +363,23 @@ where pub fn into_state(mut self) -> ArrayRef { // Flush any remaining in-progress buffer if !self.in_progress.is_empty() { - let flushed = std::mem::take(&mut self.in_progress); + let flushed = replace(&mut self.in_progress, Vec::new()); self.completed.push(Buffer::from_vec(flushed)); } - // Build null buffer from the validity bitmap - let null_buffer = if self.null.is_some() { - Some(NullBuffer::new(self.nulls.finish())) + // Build null buffer if we have any nulls + let null_buffer = if self.nulls.iter().any(|&is_null| is_null) { + Some(NullBuffer::from( + self.nulls.iter().map(|&is_null| !is_null).collect::>(), + )) } else { None }; let views = ScalarBuffer::from(self.views); - let array = - unsafe { BinaryViewArray::new_unchecked(views, self.completed, null_buffer) }; + let array = unsafe { + BinaryViewArray::new_unchecked(views, self.completed, null_buffer) + }; match self.output_type { OutputType::BinaryView => Arc::new(array), @@ -400,8 +398,9 @@ where let view = if len <= 12 { make_view(value, 0, 0) } else { + // Ensure buffer is big enough if self.in_progress.len() + len > BYTE_VIEW_MAX_BLOCK_SIZE { - let flushed = std::mem::replace( + let flushed = replace( &mut self.in_progress, Vec::with_capacity(BYTE_VIEW_MAX_BLOCK_SIZE), ); @@ -416,7 +415,7 @@ where }; self.views.push(view); - self.nulls.append(true); // true = valid (not null) + self.nulls.push(false); view } @@ -441,7 +440,7 @@ where let views_size = self.views.len() * size_of::(); let in_progress_size = self.in_progress.capacity(); let completed_size: usize = self.completed.iter().map(|b| b.len()).sum(); - let nulls_size = self.nulls.len() / 8 + 1; // bit-packed size + let nulls_size = self.nulls.len(); self.map_size + views_size From 5996423b82f11a9fef93f0538a2c55b3dbc22cca Mon Sep 17 00:00:00 2001 From: "tushar.das@naada.world" Date: Mon, 26 Jan 2026 22:40:08 +0530 Subject: [PATCH 06/11] fix: apply cargo fmt formatting --- 
.../src/binary_view_map.rs | 75 ++++++++++--------- 1 file changed, 40 insertions(+), 35 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index 6114fb97d9c79..1f7e544829cd7 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -298,37 +298,40 @@ where let completed = &self.completed; let in_progress = &self.in_progress; - self.map.find(hash, |header| { - if header.hash != hash { - return false; - } - - // Fast path: inline strings can be compared directly - if len <= 12 { - return header.view == view_u128; - } - - // For larger strings: first compare the 4-byte prefix - let stored_prefix = (header.view >> 32) as u32; - let input_prefix = (view_u128 >> 32) as u32; - if stored_prefix != input_prefix { - return false; - } - - // Prefix matched - compare full bytes - let byte_view = ByteView::from(header.view); - let stored_len = byte_view.length as usize; - let buffer_index = byte_view.buffer_index as usize; - let offset = byte_view.offset as usize; - - let stored_value = if buffer_index < completed.len() { - &completed[buffer_index].as_slice()[offset..offset + stored_len] - } else { - &in_progress[offset..offset + stored_len] - }; - let input_value: &[u8] = values.value(i).as_ref(); - stored_value == input_value - }).map(|entry| entry.payload) + self.map + .find(hash, |header| { + if header.hash != hash { + return false; + } + + // Fast path: inline strings can be compared directly + if len <= 12 { + return header.view == view_u128; + } + + // For larger strings: first compare the 4-byte prefix + let stored_prefix = (header.view >> 32) as u32; + let input_prefix = (view_u128 >> 32) as u32; + if stored_prefix != input_prefix { + return false; + } + + // Prefix matched - compare full bytes + let byte_view = ByteView::from(header.view); + let stored_len = byte_view.length as usize; + let buffer_index = 
byte_view.buffer_index as usize; + let offset = byte_view.offset as usize; + + let stored_value = if buffer_index < completed.len() { + &completed[buffer_index].as_slice() + [offset..offset + stored_len] + } else { + &in_progress[offset..offset + stored_len] + }; + let input_value: &[u8] = values.value(i).as_ref(); + stored_value == input_value + }) + .map(|entry| entry.payload) }; let payload = if let Some(payload) = maybe_payload { @@ -370,16 +373,18 @@ where // Build null buffer if we have any nulls let null_buffer = if self.nulls.iter().any(|&is_null| is_null) { Some(NullBuffer::from( - self.nulls.iter().map(|&is_null| !is_null).collect::>(), + self.nulls + .iter() + .map(|&is_null| !is_null) + .collect::>(), )) } else { None }; let views = ScalarBuffer::from(self.views); - let array = unsafe { - BinaryViewArray::new_unchecked(views, self.completed, null_buffer) - }; + let array = + unsafe { BinaryViewArray::new_unchecked(views, self.completed, null_buffer) }; match self.output_type { OutputType::BinaryView => Arc::new(array), From 9c06c58c9cf329a4af86e35d559472f08e8868dd Mon Sep 17 00:00:00 2001 From: "tushar.das@naada.world" Date: Tue, 27 Jan 2026 00:34:54 +0530 Subject: [PATCH 07/11] fix: use std::mem::take per clippy suggestion --- datafusion/physical-expr-common/src/binary_view_map.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index 1f7e544829cd7..2a06f3fbab02e 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -26,7 +26,7 @@ use arrow::datatypes::{BinaryViewType, ByteViewType, DataType, StringViewType}; use datafusion_common::hash_utils::create_hashes; use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt}; use std::fmt::Debug; -use std::mem::{replace, size_of}; +use std::mem::size_of; use std::sync::Arc; /// HashSet 
optimized for storing string or binary values that can produce that @@ -366,7 +366,7 @@ where pub fn into_state(mut self) -> ArrayRef { // Flush any remaining in-progress buffer if !self.in_progress.is_empty() { - let flushed = replace(&mut self.in_progress, Vec::new()); + let flushed = std::mem::take(&mut self.in_progress); self.completed.push(Buffer::from_vec(flushed)); } @@ -405,7 +405,7 @@ where } else { // Ensure buffer is big enough if self.in_progress.len() + len > BYTE_VIEW_MAX_BLOCK_SIZE { - let flushed = replace( + let flushed = std::mem::replace( &mut self.in_progress, Vec::with_capacity(BYTE_VIEW_MAX_BLOCK_SIZE), ); From 9302c3b1ad2942645c9186b53203e954433b17d2 Mon Sep 17 00:00:00 2001 From: "tushar.das@naada.world" Date: Tue, 27 Jan 2026 00:45:59 +0530 Subject: [PATCH 08/11] revert: remove accidental table.rs changes (belongs in separate PR) --- datafusion/catalog-listing/src/table.rs | 27 ++++++++++++------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index a5aebfa0cbdea..38456944075fc 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -45,7 +45,7 @@ use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::empty::EmptyExec; -use futures::{Stream, StreamExt, TryStreamExt, stream}; +use futures::{Stream, StreamExt, TryStreamExt, future, stream}; use object_store::ObjectStore; use std::any::Any; use std::collections::HashMap; @@ -712,21 +712,20 @@ impl ListingTable { }); }; // list files (with partitions) + let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| { + pruned_partition_list( + ctx, + store.as_ref(), + table_path, + filters, + &self.options.file_extension, + &self.options.table_partition_cols, + ) + })) + .await?; let 
meta_fetch_concurrency = ctx.config_options().execution.meta_fetch_concurrency; - let file_list = stream::iter(self.table_paths.iter()) - .map(|table_path| { - pruned_partition_list( - ctx, - store.as_ref(), - table_path, - filters, - &self.options.file_extension, - &self.options.table_partition_cols, - ) - }) - .buffer_unordered(meta_fetch_concurrency) - .try_flatten_unordered(meta_fetch_concurrency); + let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency); // collect the statistics and ordering if required by the config let files = file_list .map(|part_file| async { From 0fc43a443d86843822d87327937b187754a20d58 Mon Sep 17 00:00:00 2001 From: "tushar.das@naada.world" Date: Thu, 29 Jan 2026 03:54:44 +0530 Subject: [PATCH 09/11] perf: Optimize ArrowBytesViewMap with direct view access and single fetch This change optimizes the interning process in ArrowBytesViewMap by:
1. Performing direct u128 view comparisons for inlined values.
2. Fetching the input value only once per row to avoid redundant work during hash collisions and insertion.
3. Addressing feedback from @Dandandan regarding redundant value fetches.
--- datafusion/physical-expr-common/src/binary_view_map.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index 2a06f3fbab02e..8bd18b4b617d9 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -291,6 +291,7 @@ where // Extract length from the view (first 4 bytes of u128 in little-endian) let len = view_u128 as u32; + let value: &[u8] = values.value(i).as_ref(); // Check if value already exists let maybe_payload = { @@ -328,8 +329,7 @@ where } else { &in_progress[offset..offset + stored_len] }; - let input_value: &[u8] = values.value(i).as_ref(); - stored_value == input_value + stored_value == value }) .map(|entry| entry.payload) }; @@ -338,7 +338,6 @@ where payload } else { // no existing value, make a new one - let value: &[u8] = values.value(i).as_ref(); let payload = make_payload_fn(Some(value)); // Create view pointing to our buffers From 102855a8ffd7daeb64b51a19b8fd7c38b31f1f76 Mon Sep 17 00:00:00 2001 From: "tushar.das@naada.world" Date: Thu, 29 Jan 2026 13:19:49 +0530 Subject: [PATCH 10/11] perf: defer byte slice fetch in ArrowBytesViewMap to avoid unnecessary work for inline strings --- datafusion/physical-expr-common/src/binary_view_map.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index 8bd18b4b617d9..a1721bf1f54d5 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -291,9 +291,8 @@ where // Extract length from the view (first 4 bytes of u128 in little-endian) let len = view_u128 as u32; - let value: &[u8] = values.value(i).as_ref(); - // Check if value already exists + let mut value: Option<&[u8]> = None; let maybe_payload = 
{ // Borrow completed and in_progress for comparison let completed = &self.completed; @@ -329,7 +328,8 @@ where } else { &in_progress[offset..offset + stored_len] }; - stored_value == value + let input_value = value.get_or_insert_with(|| values.value(i).as_ref()); + stored_value == *input_value }) .map(|entry| entry.payload) }; @@ -338,6 +338,7 @@ where payload } else { // no existing value, make a new one + let value = value.unwrap_or_else(|| values.value(i).as_ref()); let payload = make_payload_fn(Some(value)); // Create view pointing to our buffers From f105b5328b93e8984b9b97d6347bfec685717cc6 Mon Sep 17 00:00:00 2001 From: "tushar.das@naada.world" Date: Thu, 29 Jan 2026 13:25:25 +0530 Subject: [PATCH 11/11] chore: apply cargo fmt to binary_view_map.rs --- datafusion/physical-expr-common/src/binary_view_map.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index a1721bf1f54d5..4125a372f7218 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -328,7 +328,8 @@ where } else { &in_progress[offset..offset + stored_len] }; - let input_value = value.get_or_insert_with(|| values.value(i).as_ref()); + let input_value = + value.get_or_insert_with(|| values.value(i).as_ref()); stored_value == *input_value }) .map(|entry| entry.payload)