From 9db8715b124d9118d7b5dc8fb2dadd9d57808a65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 14 Apr 2026 09:33:38 +0200 Subject: [PATCH 1/3] Skip hash table probe for consecutive duplicate values in ArrowBytesViewMap Add a last-value cache in insert_if_new_inner that skips the hash table probe when the current view matches the previous one. For inline strings (<=12 bytes), matching views guarantees equal values. For non-inline strings within the same input array, matching views means identical buffer_index + offset, so they point to the same bytes. This is effective for workloads with repeated values such as ClickBench query 5 (COUNT(DISTINCT SearchPhrase)) where ~90% of rows are empty strings. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/binary_view_map.rs | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index abc3e28f82627..2cfafe412b2be 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -270,6 +270,16 @@ where // Ensure lengths are equivalent assert_eq!(values.len(), self.hashes_buffer.len()); + // Track the last seen value to skip hash table probes for + // consecutive duplicates. For inline strings (len <= 12), the u128 + // view uniquely identifies the value. For non-inline strings, + // matching views within the same input array means identical + // buffer_index + offset + length, so the bytes are the same. + // This is highly effective for workloads with repeated values + // (e.g. ClickBench SearchPhrase is ~90% empty strings). + let mut last_view: u128 = u128::MAX; // impossible: len would be u32::MAX + let mut last_payload: V = V::default(); + for i in 0..values.len() { let view_u128 = input_views[i]; let hash = self.hashes_buffer[i]; @@ -293,6 +303,13 @@ where // Extract length from the view (first 4 bytes of u128 in little-endian) let len = view_u128 as u32; + // Fast path: if the view matches the last seen value we know + // it is the same string and can skip the hash table probe. + if view_u128 == last_view { + observe_payload_fn(last_payload); + continue; + } + // Check if value already exists let maybe_payload = { // Borrow completed and in_progress for comparison @@ -355,6 +372,9 @@ where payload }; observe_payload_fn(payload); + + last_view = view_u128; + last_payload = payload; } } From 0e94b9a5dbe67628b315c4f7e13cd885c82e9a4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 14 Apr 2026 09:48:45 +0200 Subject: [PATCH 2/3] Skip hash table probe for consecutive duplicate values in ArrowBytesViewMap Add two fast-path caches in insert_if_new_inner that skip hash table probes for duplicate values: 1. Empty-string cache: catches *all* empty strings (view == 0) even when non-consecutive. Comparing against zero is essentially free. 2. Last-value cache: catches consecutive runs of any repeated value. For inline strings (<=12 bytes) matching views guarantees equal values. For non-inline strings within the same input array, matching views means identical buffer_index + offset, i.e. the same bytes. This is effective for workloads with repeated values such as ClickBench query 5 (COUNT(DISTINCT SearchPhrase)) where ~90% of rows are empty strings. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/binary_view_map.rs | 35 ++++++++++++++----- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index 2cfafe412b2be..441575adc8711 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -270,13 +270,20 @@ where // Ensure lengths are equivalent assert_eq!(values.len(), self.hashes_buffer.len()); - // Track the last seen value to skip hash table probes for - // consecutive duplicates. For inline strings (len <= 12), the u128 - // view uniquely identifies the value. For non-inline strings, - // matching views within the same input array means identical - // buffer_index + offset + length, so the bytes are the same. - // This is highly effective for workloads with repeated values - // (e.g. ClickBench SearchPhrase is ~90% empty strings). + // Fast-path caches to skip hash table probes for duplicate values. + // + // 1. `empty_payload`: caches the payload for the empty string + // (view == 0). Catches *all* empty strings even when + // non-consecutive. Very effective when one value dominates + // (e.g. ClickBench SearchPhrase is ~90% empty strings). + // + // 2. `last_view` / `last_payload`: caches the most recently seen + // value. Catches consecutive runs of any repeated value. + // For inline strings (len <= 12) the u128 view uniquely + // identifies the value. For non-inline strings within the + // same input array, matching views means identical + // buffer_index + offset + length, i.e. the same bytes. + let mut empty_payload: Option = None; let mut last_view: u128 = u128::MAX; // impossible: len would be u32::MAX let mut last_payload: V = V::default(); @@ -303,8 +310,15 @@ where // Extract length from the view (first 4 bytes of u128 in little-endian) let len = view_u128 as u32; - // Fast path: if the view matches the last seen value we know - // it is the same string and can skip the hash table probe. + // Fast path 1: empty string (test-for-zero, ~1 cycle) + if view_u128 == 0 { + if let Some(payload) = empty_payload { + observe_payload_fn(payload); + continue; + } + } + + // Fast path 2: consecutive duplicate of any value if view_u128 == last_view { observe_payload_fn(last_payload); continue; @@ -373,6 +387,9 @@ where }; observe_payload_fn(payload); + if view_u128 == 0 { + empty_payload = Some(payload); + } last_view = view_u128; last_payload = payload; } From ec94be3d29f1fe8413ea076e811d92144beddd44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 14 Apr 2026 10:23:16 +0200 Subject: [PATCH 3/3] Shrink hash table entries in ArrowBytesViewMap by removing payload and view Replace the full u128 view + payload in each hash table entry with a compact u32 view_index. The view is already stored in the `views` vec, and the payload (group index) always equals the view index for all callers. This shrinks each entry from 32 bytes to 12 bytes, reducing memory usage and halving the cost of rehashing when the hash table grows. The ArrowBytesViewMap is no longer generic over a payload type V. Instead, the observe callback receives the view index (usize) directly, which callers use as the group index. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/binary_view_map.rs | 222 +++++++----------- .../single_group_by/bytes_view.rs | 7 +- 2 files changed, 91 insertions(+), 138 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index 441575adc8711..566bcea82559f 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -33,7 +33,7 @@ use std::sync::Arc; /// HashSet optimized for storing string or binary values that can produce that /// the final set as a `GenericBinaryViewArray` with minimal copies. #[derive(Debug)] -pub struct ArrowBytesViewSet(ArrowBytesViewMap<()>); +pub struct ArrowBytesViewSet(ArrowBytesViewMap); impl ArrowBytesViewSet { pub fn new(output_type: OutputType) -> Self { @@ -42,10 +42,8 @@ impl ArrowBytesViewSet { /// Inserts each value from `values` into the set pub fn insert(&mut self, values: &ArrayRef) { - fn make_payload_fn(_value: Option<&[u8]>) {} - fn observe_payload_fn(_payload: ()) {} self.0 - .insert_if_new(values, make_payload_fn, observe_payload_fn); + .insert_if_new(values, |_value| {}, |_view_index| {}); } /// Return the contents of this map and replace it with a new empty map with @@ -88,14 +86,6 @@ impl ArrowBytesViewSet { /// values that can produce the set of keys on /// output as `GenericBinaryViewArray` without copies. /// -/// Equivalent to `HashSet` but with better performance if you need -/// to emit the keys as an Arrow `StringViewArray` / `BinaryViewArray`. For other -/// purposes it is the same as a `HashMap` -/// -/// # Generic Arguments -/// -/// * `V`: payload type -/// /// # Description /// /// This is a specialized HashMap with the following properties: @@ -108,8 +98,9 @@ impl ArrowBytesViewSet { /// 2. Retains the insertion order of entries in the final array. The values are /// in the same order as they were inserted. /// -/// Note this structure can be used as a `HashSet` by specifying the value type -/// as `()`, as is done by [`ArrowBytesViewSet`]. +/// Each distinct value is assigned a sequential index (its position in the +/// `views` vec). Callers receive this index via the `observe_fn` callback +/// and can use it as a group index, payload, etc. /// /// This map is used by the special `COUNT DISTINCT` aggregate function to /// store the distinct values, and by the `GROUP BY` operator to store @@ -117,14 +108,14 @@ impl ArrowBytesViewSet { /// Max size of the in-progress buffer before flushing to completed buffers const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024; -pub struct ArrowBytesViewMap -where - V: Debug + PartialEq + Eq + Clone + Copy + Default, -{ +pub struct ArrowBytesViewMap { /// Should the output be StringView or BinaryView? output_type: OutputType, - /// Underlying hash set for each distinct value - map: hashbrown::hash_table::HashTable>, + /// Underlying hash set for each distinct value. + /// Stores compact entries with a u32 index into `views` instead of + /// the full u128 view, keeping the hash table small and reducing + /// the cost of rehashing when the table grows. + map: hashbrown::hash_table::HashTable, /// Total size of the map in bytes map_size: usize, @@ -141,19 +132,14 @@ where random_state: RandomState, /// buffer that stores hash values (reused across batches to save allocations) hashes_buffer: Vec, - /// `(payload, null_index)` for the 'null' value, if any - /// NOTE null_index is the logical index in the final array, not the index - /// in the buffer - null: Option<(V, usize)>, + /// The index in `views` for the null value, if any + null: Option, } /// The size, in number of entries, of the initial hash table const INITIAL_MAP_CAPACITY: usize = 512; -impl ArrowBytesViewMap -where - V: Debug + PartialEq + Eq + Clone + Copy + Default, -{ +impl ArrowBytesViewMap { pub fn new(output_type: OutputType) -> Self { Self { output_type, @@ -177,57 +163,46 @@ where new_self } - /// Inserts each value from `values` into the map, invoking `payload_fn` for - /// each value if *not* already present, deferring the allocation of the - /// payload until it is needed. + /// Inserts each value from `values` into the map, invoking `new_fn` for + /// each value that is *not* already present, and `observe_fn` for every + /// value with its assigned index. /// - /// Note that this is different than a normal map that would replace the - /// existing entry + /// Each distinct value is assigned a sequential index (its position in + /// the internal `views` vec). This index is stable across calls and is + /// passed to `observe_fn` for every row. /// /// # Arguments: /// - /// `values`: array whose values are inserted - /// - /// `make_payload_fn`: invoked for each value that is not already present - /// to create the payload, in order of the values in `values` - /// - /// `observe_payload_fn`: invoked once, for each value in `values`, that was - /// already present in the map, with corresponding payload value. - /// - /// # Returns + /// * `values`: array whose values are inserted /// - /// The payload value for the entry, either the existing value or - /// the newly inserted value + /// * `new_fn`: invoked for each value that is not already present, + /// in order of the values in `values`. Receives the value bytes + /// (`None` for null). Useful for side effects like incrementing a + /// counter. /// - /// # Safety: - /// - /// Note that `make_payload_fn` and `observe_payload_fn` are only invoked - /// with valid values from `values`, not for the `NULL` value. - pub fn insert_if_new( + /// * `observe_fn`: invoked for every value in `values` with the + /// assigned view index for that value. + pub fn insert_if_new( &mut self, values: &ArrayRef, - make_payload_fn: MP, - observe_payload_fn: OP, + new_fn: NF, + observe_fn: OF, ) where - MP: FnMut(Option<&[u8]>) -> V, - OP: FnMut(V), + NF: FnMut(Option<&[u8]>), + OF: FnMut(usize), { // Sanity check array type match self.output_type { OutputType::BinaryView => { assert!(matches!(values.data_type(), DataType::BinaryView)); - self.insert_if_new_inner::( - values, - make_payload_fn, - observe_payload_fn, + self.insert_if_new_inner::( + values, new_fn, observe_fn, ) } OutputType::Utf8View => { assert!(matches!(values.data_type(), DataType::Utf8View)); - self.insert_if_new_inner::( - values, - make_payload_fn, - observe_payload_fn, + self.insert_if_new_inner::( + values, new_fn, observe_fn, ) } _ => unreachable!("Utf8/Binary should use `ArrowBytesSet`"), @@ -238,18 +213,18 @@ where /// (both StringView and BinaryView) /// /// Note this is the only function that is generic on [`ByteViewType`], which - /// avoids having to template the entire structure, making the code - /// simpler and understand and reducing code bloat due to duplication. + /// avoids having to template the entire structure, making the code + /// simpler to understand and reducing code bloat due to duplication. /// /// See comments on `insert_if_new` for more details - fn insert_if_new_inner( + fn insert_if_new_inner( &mut self, values: &ArrayRef, - mut make_payload_fn: MP, - mut observe_payload_fn: OP, + mut new_fn: NF, + mut observe_fn: OF, ) where - MP: FnMut(Option<&[u8]>) -> V, - OP: FnMut(V), + NF: FnMut(Option<&[u8]>), + OF: FnMut(usize), B: ByteViewType, { // step 1: compute hashes @@ -272,20 +247,20 @@ where // Fast-path caches to skip hash table probes for duplicate values. // - // 1. `empty_payload`: caches the payload for the empty string + // 1. `empty_index`: caches the view index for the empty string // (view == 0). Catches *all* empty strings even when // non-consecutive. Very effective when one value dominates // (e.g. ClickBench SearchPhrase is ~90% empty strings). // - // 2. `last_view` / `last_payload`: caches the most recently seen + // 2. `last_view` / `last_index`: caches the most recently seen // value. Catches consecutive runs of any repeated value. // For inline strings (len <= 12) the u128 view uniquely // identifies the value. For non-inline strings within the // same input array, matching views means identical // buffer_index + offset + length, i.e. the same bytes. - let mut empty_payload: Option = None; + let mut empty_index: Option = None; let mut last_view: u128 = u128::MAX; // impossible: len would be u32::MAX - let mut last_payload: V = V::default(); + let mut last_index: usize = 0; for i in 0..values.len() { let view_u128 = input_views[i]; @@ -293,17 +268,17 @@ where // handle null value via validity bitmap check if values.is_null(i) { - let payload = if let Some(&(payload, _offset)) = self.null.as_ref() { - payload + let view_index = if let Some(null_index) = self.null { + null_index } else { - let payload = make_payload_fn(None); + new_fn(None); let null_index = self.views.len(); self.views.push(0); self.nulls.append_null(); - self.null = Some((payload, null_index)); - payload + self.null = Some(null_index); + null_index }; - observe_payload_fn(payload); + observe_fn(view_index); continue; } @@ -312,21 +287,22 @@ where // Fast path 1: empty string (test-for-zero, ~1 cycle) if view_u128 == 0 { - if let Some(payload) = empty_payload { - observe_payload_fn(payload); + if let Some(idx) = empty_index { + observe_fn(idx); continue; } } // Fast path 2: consecutive duplicate of any value if view_u128 == last_view { - observe_payload_fn(last_payload); + observe_fn(last_index); continue; } // Check if value already exists - let maybe_payload = { - // Borrow completed and in_progress for comparison + let maybe_index = { + // Borrow fields needed by the find closure + let views = &self.views; let completed = &self.completed; let in_progress = &self.in_progress; @@ -336,20 +312,22 @@ where return false; } + let stored_view = views[header.view_index as usize]; + // Fast path: inline strings can be compared directly if len <= 12 { - return header.view == view_u128; + return stored_view == view_u128; } // For larger strings: first compare the 4-byte prefix - let stored_prefix = (header.view >> 32) as u32; + let stored_prefix = (stored_view >> 32) as u32; let input_prefix = (view_u128 >> 32) as u32; if stored_prefix != input_prefix { return false; } // Prefix matched - compare full bytes - let byte_view = ByteView::from(header.view); + let byte_view = ByteView::from(stored_view); let stored_len = byte_view.length as usize; let buffer_index = byte_view.buffer_index as usize; let offset = byte_view.offset as usize; @@ -363,35 +341,35 @@ where let input_value: &[u8] = values.value(i).as_ref(); stored_value == input_value }) - .map(|entry| entry.payload) + .map(|entry| entry.view_index as usize) }; - let payload = if let Some(payload) = maybe_payload { - payload + let view_index = if let Some(idx) = maybe_index { + idx } else { // no existing value, make a new one let value: &[u8] = values.value(i).as_ref(); - let payload = make_payload_fn(Some(value)); + new_fn(Some(value)); // Create view pointing to our buffers - let new_view = self.append_value(value); - let new_header = Entry { - view: new_view, + self.append_value(value); + let view_index = self.views.len() - 1; + let new_entry = Entry { + view_index: view_index as u32, hash, - payload, }; self.map - .insert_accounted(new_header, |h| h.hash, &mut self.map_size); - payload + .insert_accounted(new_entry, |h| h.hash, &mut self.map_size); + view_index }; - observe_payload_fn(payload); + observe_fn(view_index); if view_u128 == 0 { - empty_payload = Some(payload); + empty_index = Some(view_index); } last_view = view_u128; - last_payload = payload; + last_index = view_index; } } @@ -485,12 +463,9 @@ where } } -impl Debug for ArrowBytesViewMap -where - V: Debug + PartialEq + Eq + Clone + Copy + Default, -{ +impl Debug for ArrowBytesViewMap { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ArrowBytesMap") + f.debug_struct("ArrowBytesViewMap") .field("map", &"") .field("map_size", &self.map_size) .field("views_len", &self.views.len()) @@ -501,26 +476,17 @@ where } } -/// Entry in the hash table -- see [`ArrowBytesViewMap`] for more details +/// Compact entry in the hash table -- see [`ArrowBytesViewMap`] for more details. /// -/// Stores the view pointing to our internal buffers, eliminating the need -/// for a separate builder index. For inline strings (<=12 bytes), the view -/// contains the entire value. For out-of-line strings, the view contains -/// buffer_index and offset pointing directly to our storage. +/// Stores a u32 index into the external `views` vec rather than the full +/// u128 view. This keeps hash table entries small (12 bytes vs the +/// original 32), reducing memory usage and the cost of rehashing/resizing. #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] -struct Entry -where - V: Debug + PartialEq + Eq + Clone + Copy + Default, -{ - /// The u128 view pointing to our internal buffers. For inline strings, - /// this contains the complete value. For larger strings, this contains - /// the buffer_index/offset into our completed/in_progress buffers. - view: u128, +struct Entry { + /// Index into `ArrowBytesViewMap::views` + view_index: u32, hash: u64, - - /// value stored by the entry - payload: V, } #[cfg(test)] @@ -715,15 +681,9 @@ mod tests { assert_eq!(set.len(), 10); } - #[derive(Debug, PartialEq, Eq, Default, Clone, Copy)] - struct TestPayload { - // store the string value to check against input - index: usize, // store the index of the string (each new string gets the next sequential input) - } - /// Wraps an [`ArrowBytesViewMap`], validating its invariants struct TestMap { - map: ArrowBytesViewMap, + map: ArrowBytesViewMap, // stores distinct strings seen, in order strings: Vec>, // map strings to index in strings @@ -746,7 +706,6 @@ mod tests { let string_array = StringViewArray::from(strings.to_vec()); let arr: ArrayRef = Arc::new(string_array); - let mut next_index = self.indexes.len(); let mut actual_new_strings = vec![]; let mut actual_seen_indexes = vec![]; // update self with new values, keeping track of newly added values @@ -770,13 +729,10 @@ mod tests { |s| { let value = s .map(|s| String::from_utf8(s.to_vec()).expect("Non utf8 string")); - let index = next_index; - next_index += 1; seen_new_strings.push(value); - TestPayload { index } }, - |payload| { - seen_indexes.push(payload.index); + |view_index| { + seen_indexes.push(view_index); }, ); diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs index 7a56f7c52c11a..93603979399f8 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs @@ -28,7 +28,7 @@ use std::mem::size_of; /// purpose `Row`s format pub struct GroupValuesBytesView { /// Map string/binary values to group index - map: ArrowBytesViewMap, + map: ArrowBytesViewMap, /// The total number of groups so far (used to assign group_index) num_groups: usize, } @@ -58,12 +58,9 @@ impl GroupValues for GroupValuesBytesView { arr, // called for each new group |_value| { - // assign new group index on each insert - let group_idx = self.num_groups; self.num_groups += 1; - group_idx }, - // called for each group + // called for each value with its view index (= group index) |group_idx| { groups.push(group_idx); },