diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index abc3e28f82627..566bcea82559f 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -33,7 +33,7 @@ use std::sync::Arc; /// HashSet optimized for storing string or binary values that can produce that /// the final set as a `GenericBinaryViewArray` with minimal copies. #[derive(Debug)] -pub struct ArrowBytesViewSet(ArrowBytesViewMap<()>); +pub struct ArrowBytesViewSet(ArrowBytesViewMap); impl ArrowBytesViewSet { pub fn new(output_type: OutputType) -> Self { @@ -42,10 +42,8 @@ impl ArrowBytesViewSet { /// Inserts each value from `values` into the set pub fn insert(&mut self, values: &ArrayRef) { - fn make_payload_fn(_value: Option<&[u8]>) {} - fn observe_payload_fn(_payload: ()) {} self.0 - .insert_if_new(values, make_payload_fn, observe_payload_fn); + .insert_if_new(values, |_value| {}, |_view_index| {}); } /// Return the contents of this map and replace it with a new empty map with @@ -88,14 +86,6 @@ impl ArrowBytesViewSet { /// values that can produce the set of keys on /// output as `GenericBinaryViewArray` without copies. /// -/// Equivalent to `HashSet` but with better performance if you need -/// to emit the keys as an Arrow `StringViewArray` / `BinaryViewArray`. For other -/// purposes it is the same as a `HashMap` -/// -/// # Generic Arguments -/// -/// * `V`: payload type -/// /// # Description /// /// This is a specialized HashMap with the following properties: @@ -108,8 +98,9 @@ impl ArrowBytesViewSet { /// 2. Retains the insertion order of entries in the final array. The values are /// in the same order as they were inserted. /// -/// Note this structure can be used as a `HashSet` by specifying the value type -/// as `()`, as is done by [`ArrowBytesViewSet`]. +/// Each distinct value is assigned a sequential index (its position in the +/// `views` vec). Callers receive this index via the `observe_fn` callback +/// and can use it as a group index, payload, etc. /// /// This map is used by the special `COUNT DISTINCT` aggregate function to /// store the distinct values, and by the `GROUP BY` operator to store @@ -117,14 +108,14 @@ impl ArrowBytesViewSet { /// Max size of the in-progress buffer before flushing to completed buffers const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024; -pub struct ArrowBytesViewMap -where - V: Debug + PartialEq + Eq + Clone + Copy + Default, -{ +pub struct ArrowBytesViewMap { /// Should the output be StringView or BinaryView? output_type: OutputType, - /// Underlying hash set for each distinct value - map: hashbrown::hash_table::HashTable>, + /// Underlying hash set for each distinct value. + /// Stores compact entries with a u32 index into `views` instead of + /// the full u128 view, keeping the hash table small and reducing + /// the cost of rehashing when the table grows. + map: hashbrown::hash_table::HashTable, /// Total size of the map in bytes map_size: usize, @@ -141,19 +132,14 @@ where random_state: RandomState, /// buffer that stores hash values (reused across batches to save allocations) hashes_buffer: Vec, - /// `(payload, null_index)` for the 'null' value, if any - /// NOTE null_index is the logical index in the final array, not the index - /// in the buffer - null: Option<(V, usize)>, + /// The index in `views` for the null value, if any + null: Option, } /// The size, in number of entries, of the initial hash table const INITIAL_MAP_CAPACITY: usize = 512; -impl ArrowBytesViewMap -where - V: Debug + PartialEq + Eq + Clone + Copy + Default, -{ +impl ArrowBytesViewMap { pub fn new(output_type: OutputType) -> Self { Self { output_type, @@ -177,57 +163,46 @@ where new_self } - /// Inserts each value from `values` into the map, invoking `payload_fn` for - /// each value if *not* already present, deferring the allocation of the - /// payload until it is needed. + /// Inserts each value from `values` into the map, invoking `new_fn` for + /// each value that is *not* already present, and `observe_fn` for every + /// value with its assigned index. /// - /// Note that this is different than a normal map that would replace the - /// existing entry + /// Each distinct value is assigned a sequential index (its position in + /// the internal `views` vec). This index is stable across calls and is + /// passed to `observe_fn` for every row. /// /// # Arguments: /// - /// `values`: array whose values are inserted - /// - /// `make_payload_fn`: invoked for each value that is not already present - /// to create the payload, in order of the values in `values` - /// - /// `observe_payload_fn`: invoked once, for each value in `values`, that was - /// already present in the map, with corresponding payload value. + /// * `values`: array whose values are inserted /// - /// # Returns + /// * `new_fn`: invoked for each value that is not already present, + /// in order of the values in `values`. Receives the value bytes + /// (`None` for null). Useful for side effects like incrementing a + /// counter. /// - /// The payload value for the entry, either the existing value or - /// the newly inserted value - /// - /// # Safety: - /// - /// Note that `make_payload_fn` and `observe_payload_fn` are only invoked - /// with valid values from `values`, not for the `NULL` value. - pub fn insert_if_new( + /// * `observe_fn`: invoked for every value in `values` with the + /// assigned view index for that value. + pub fn insert_if_new( &mut self, values: &ArrayRef, - make_payload_fn: MP, - observe_payload_fn: OP, + new_fn: NF, + observe_fn: OF, ) where - MP: FnMut(Option<&[u8]>) -> V, - OP: FnMut(V), + NF: FnMut(Option<&[u8]>), + OF: FnMut(usize), { // Sanity check array type match self.output_type { OutputType::BinaryView => { assert!(matches!(values.data_type(), DataType::BinaryView)); - self.insert_if_new_inner::( - values, - make_payload_fn, - observe_payload_fn, + self.insert_if_new_inner::( + values, new_fn, observe_fn, ) } OutputType::Utf8View => { assert!(matches!(values.data_type(), DataType::Utf8View)); - self.insert_if_new_inner::( - values, - make_payload_fn, - observe_payload_fn, + self.insert_if_new_inner::( + values, new_fn, observe_fn, ) } _ => unreachable!("Utf8/Binary should use `ArrowBytesSet`"), @@ -238,18 +213,18 @@ where /// (both StringView and BinaryView) /// /// Note this is the only function that is generic on [`ByteViewType`], which - /// avoids having to template the entire structure, making the code - /// simpler and understand and reducing code bloat due to duplication. + /// avoids having to template the entire structure, making the code + /// simpler to understand and reducing code bloat due to duplication. /// /// See comments on `insert_if_new` for more details - fn insert_if_new_inner( + fn insert_if_new_inner( &mut self, values: &ArrayRef, - mut make_payload_fn: MP, - mut observe_payload_fn: OP, + mut new_fn: NF, + mut observe_fn: OF, ) where - MP: FnMut(Option<&[u8]>) -> V, - OP: FnMut(V), + NF: FnMut(Option<&[u8]>), + OF: FnMut(usize), B: ByteViewType, { // step 1: compute hashes @@ -270,32 +245,64 @@ where // Ensure lengths are equivalent assert_eq!(values.len(), self.hashes_buffer.len()); + // Fast-path caches to skip hash table probes for duplicate values. + // + // 1. `empty_index`: caches the view index for the empty string + // (view == 0). Catches *all* empty strings even when + // non-consecutive. Very effective when one value dominates + // (e.g. ClickBench SearchPhrase is ~90% empty strings). + // + // 2. `last_view` / `last_index`: caches the most recently seen + // value. Catches consecutive runs of any repeated value. + // For inline strings (len <= 12) the u128 view uniquely + // identifies the value. For non-inline strings within the + // same input array, matching views means identical + // buffer_index + offset + length, i.e. the same bytes. + let mut empty_index: Option = None; + let mut last_view: u128 = u128::MAX; // impossible: len would be u32::MAX + let mut last_index: usize = 0; + for i in 0..values.len() { let view_u128 = input_views[i]; let hash = self.hashes_buffer[i]; // handle null value via validity bitmap check if values.is_null(i) { - let payload = if let Some(&(payload, _offset)) = self.null.as_ref() { - payload + let view_index = if let Some(null_index) = self.null { + null_index } else { - let payload = make_payload_fn(None); + new_fn(None); let null_index = self.views.len(); self.views.push(0); self.nulls.append_null(); - self.null = Some((payload, null_index)); - payload + self.null = Some(null_index); + null_index }; - observe_payload_fn(payload); + observe_fn(view_index); continue; } // Extract length from the view (first 4 bytes of u128 in little-endian) let len = view_u128 as u32; + // Fast path 1: empty string (test-for-zero, ~1 cycle) + if view_u128 == 0 { + if let Some(idx) = empty_index { + observe_fn(idx); + continue; + } + } + + // Fast path 2: consecutive duplicate of any value + if view_u128 == last_view { + observe_fn(last_index); + continue; + } + // Check if value already exists - let maybe_payload = { - // Borrow completed and in_progress for comparison + let maybe_index = { + // Borrow fields needed by the find closure + let views = &self.views; let completed = &self.completed; let in_progress = &self.in_progress; @@ -305,20 +312,22 @@ where return false; } + let stored_view = views[header.view_index as usize]; + // Fast path: inline strings can be compared directly if len <= 12 { - return header.view == view_u128; + return stored_view == view_u128; } // For larger strings: first compare the 4-byte prefix - let stored_prefix = (header.view >> 32) as u32; + let stored_prefix = (stored_view >> 32) as u32; let input_prefix = (view_u128 >> 32) as u32; if stored_prefix != input_prefix { return false; } // Prefix matched - compare full bytes - let byte_view = ByteView::from(header.view); + let byte_view = ByteView::from(stored_view); let stored_len = byte_view.length as usize; let buffer_index = byte_view.buffer_index as usize; let offset = byte_view.offset as usize; @@ -332,29 +341,35 @@ where let input_value: &[u8] = values.value(i).as_ref(); stored_value == input_value }) - .map(|entry| entry.payload) + .map(|entry| entry.view_index as usize) }; - let payload = if let Some(payload) = maybe_payload { - payload + let view_index = if let Some(idx) = maybe_index { + idx } else { // no existing value, make a new one let value: &[u8] = values.value(i).as_ref(); - let payload = make_payload_fn(Some(value)); + new_fn(Some(value)); // Create view pointing to our buffers - let new_view = self.append_value(value); - let new_header = Entry { - view: new_view, + self.append_value(value); + let view_index = self.views.len() - 1; + let new_entry = Entry { + view_index: view_index as u32, hash, - payload, }; self.map - .insert_accounted(new_header, |h| h.hash, &mut self.map_size); - payload + .insert_accounted(new_entry, |h| h.hash, &mut self.map_size); + view_index }; - observe_payload_fn(payload); + observe_fn(view_index); + + if view_u128 == 0 { + empty_index = Some(view_index); + } + last_view = view_u128; + last_index = view_index; } } @@ -448,12 +463,9 @@ where } } -impl Debug for ArrowBytesViewMap -where - V: Debug + PartialEq + Eq + Clone + Copy + Default, -{ +impl Debug for ArrowBytesViewMap { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ArrowBytesMap") + f.debug_struct("ArrowBytesViewMap") .field("map", &"") .field("map_size", &self.map_size) .field("views_len", &self.views.len()) @@ -464,26 +476,17 @@ where } } -/// Entry in the hash table -- see [`ArrowBytesViewMap`] for more details +/// Compact entry in the hash table -- see [`ArrowBytesViewMap`] for more details. /// -/// Stores the view pointing to our internal buffers, eliminating the need -/// for a separate builder index. For inline strings (<=12 bytes), the view -/// contains the entire value. For out-of-line strings, the view contains -/// buffer_index and offset pointing directly to our storage. +/// Stores a u32 index into the external `views` vec rather than the full +/// u128 view. This keeps hash table entries small (12 bytes vs the +/// original 32), reducing memory usage and the cost of rehashing/resizing. #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] -struct Entry -where - V: Debug + PartialEq + Eq + Clone + Copy + Default, -{ - /// The u128 view pointing to our internal buffers. For inline strings, - /// this contains the complete value. For larger strings, this contains - /// the buffer_index/offset into our completed/in_progress buffers. - view: u128, +struct Entry { + /// Index into `ArrowBytesViewMap::views` + view_index: u32, hash: u64, - - /// value stored by the entry - payload: V, } #[cfg(test)] @@ -678,15 +681,9 @@ mod tests { assert_eq!(set.len(), 10); } - #[derive(Debug, PartialEq, Eq, Default, Clone, Copy)] - struct TestPayload { - // store the string value to check against input - index: usize, // store the index of the string (each new string gets the next sequential input) - } - /// Wraps an [`ArrowBytesViewMap`], validating its invariants struct TestMap { - map: ArrowBytesViewMap, + map: ArrowBytesViewMap, // stores distinct strings seen, in order strings: Vec>, // map strings to index in strings @@ -709,7 +706,6 @@ mod tests { let string_array = StringViewArray::from(strings.to_vec()); let arr: ArrayRef = Arc::new(string_array); - let mut next_index = self.indexes.len(); let mut actual_new_strings = vec![]; let mut actual_seen_indexes = vec![]; // update self with new values, keeping track of newly added values @@ -733,13 +729,10 @@ mod tests { |s| { let value = s .map(|s| String::from_utf8(s.to_vec()).expect("Non utf8 string")); - let index = next_index; - next_index += 1; seen_new_strings.push(value); - TestPayload { index } }, - |payload| { - seen_indexes.push(payload.index); + |view_index| { + seen_indexes.push(view_index); }, ); diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs index 7a56f7c52c11a..93603979399f8 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs @@ -28,7 +28,7 @@ use std::mem::size_of; /// purpose `Row`s format pub struct GroupValuesBytesView { /// Map string/binary values to group index - map: ArrowBytesViewMap, + map: ArrowBytesViewMap, /// The total number of groups so far (used to assign group_index) num_groups: usize, } @@ -58,12 +58,9 @@ impl GroupValues for GroupValuesBytesView { arr, // called for each new group |_value| { - // assign new group index on each insert - let group_idx = self.num_groups; self.num_groups += 1; - group_idx }, - // called for each group + // called for each value with its view index (= group index) |group_idx| { groups.push(group_idx); },