From 545b43a29428e56673fa6512a881f27c97167443 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 7 May 2026 14:35:07 +0200 Subject: [PATCH 1/5] timely-util: vendor MergeBatcher from differential-dataflow Copy `MergeBatcher`, `Merger`, `InternalMerge`, `InternalMerger`, and `VecMerger` from `differential_dataflow::trace::implementations::merge_batcher` into `mz_timely_util::merge_batcher`. The next commit layers temporal bucketing on top of this vendored skeleton; vendoring it gives us control over the chain representation without forking differential. Verbatim port apart from minor module wiring; no behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/timely-util/src/lib.rs | 1 + src/timely-util/src/merge_batcher.rs | 777 +++++++++++++++++++++++++++ 2 files changed, 778 insertions(+) create mode 100644 src/timely-util/src/merge_batcher.rs diff --git a/src/timely-util/src/lib.rs b/src/timely-util/src/lib.rs index 57474b58cea34..a655b17a495e5 100644 --- a/src/timely-util/src/lib.rs +++ b/src/timely-util/src/lib.rs @@ -22,6 +22,7 @@ pub mod capture; pub mod columnar; pub mod columnation; pub mod containers; +pub mod merge_batcher; pub mod operator; pub mod order; pub mod pact; diff --git a/src/timely-util/src/merge_batcher.rs b/src/timely-util/src/merge_batcher.rs new file mode 100644 index 0000000000000..f95fbbb693879 --- /dev/null +++ b/src/timely-util/src/merge_batcher.rs @@ -0,0 +1,777 @@ +// Copyright 2015–2024 The Differential Dataflow contributors +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+// +// Portions of this file are derived from differential-dataflow +// (https://github.com/TimelyDataflow/differential-dataflow), licensed +// under the MIT License. + +//! A `Batcher` implementation based on merge sort. +//! +//! Vendored from `differential_dataflow::trace::implementations::merge_batcher` +//! so that we can layer temporal bucketing on top of the merge batcher's +//! internal chain representation. +//! +//! The `MergeBatcher` requires support from two types, a "chunker" and a "merger". +//! The chunker receives input batches and consolidates them, producing sorted output +//! "chunks" that are fully consolidated (no adjacent updates can be accumulated). +//! The merger implements the [`Merger`] trait, and provides hooks for manipulating +//! sorted "chains" of chunks as needed by the merge batcher: merging chunks and also +//! splitting them apart based on time. + +use std::marker::PhantomData; + +use differential_dataflow::logging::{BatcherEvent, Logger}; +use differential_dataflow::trace::{Batcher, Builder, Description}; +use timely::container::{ContainerBuilder, PushInto}; +use timely::progress::frontier::AntichainRef; +use timely::progress::{Timestamp, frontier::Antichain}; + +/// Creates batches from containers of unordered tuples. +/// +/// To implement `Batcher`, the container builder `C` must accept `&mut Input` as inputs, +/// and must produce outputs of type `M::Chunk`. +pub struct MergeBatcher { + /// Transforms input streams to chunks of sorted, consolidated data. + chunker: C, + /// A sequence of power-of-two length lists of sorted, consolidated containers. + /// + /// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]). + chains: Vec>, + /// Stash of empty chunks, recycled through the merging process. + stash: Vec, + /// Merges consolidated chunks, and extracts the subset of an update chain that lies in an interval of time. 
+ merger: M, + /// Current lower frontier, we sealed up to here. + lower: Antichain, + /// The lower-bound frontier of the data, after the last call to seal. + frontier: Antichain, + /// Logger for size accounting. + logger: Option, + /// Timely operator ID. + operator_id: usize, + /// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present. + _marker: PhantomData, +} + +impl Batcher for MergeBatcher +where + C: ContainerBuilder + for<'a> PushInto<&'a mut Input>, + M: Merger, +{ + type Input = Input; + type Time = M::Time; + type Output = M::Chunk; + + fn new(logger: Option, operator_id: usize) -> Self { + Self { + logger, + operator_id, + chunker: C::default(), + merger: M::default(), + chains: Vec::new(), + stash: Vec::new(), + frontier: Antichain::new(), + lower: Antichain::from_elem(M::Time::minimum()), + _marker: PhantomData, + } + } + + /// Push a container of data into this merge batcher. Updates the internal chain structure if + /// needed. + fn push_container(&mut self, container: &mut Input) { + self.chunker.push_into(container); + while let Some(chunk) = self.chunker.extract() { + let chunk = std::mem::take(chunk); + self.insert_chain(vec![chunk]); + } + } + + // Sealing a batch means finding those updates with times not greater or equal to any time + // in `upper`. All updates must have time greater or equal to the previously used `upper`, + // which we call `lower`, by assumption that after sealing a batcher we receive no more + // updates with times not greater or equal to `upper`. + fn seal>( + &mut self, + upper: Antichain, + ) -> B::Output { + // Finish + while let Some(chunk) = self.chunker.finish() { + let chunk = std::mem::take(chunk); + self.insert_chain(vec![chunk]); + } + + // Merge all remaining chains into a single chain. 
+ while self.chains.len() > 1 { + let list1 = self.chain_pop().unwrap(); + let list2 = self.chain_pop().unwrap(); + let merged = self.merge_by(list1, list2); + self.chain_push(merged); + } + let merged = self.chain_pop().unwrap_or_default(); + + // Extract readied data. + let mut kept = Vec::new(); + let mut readied = Vec::new(); + self.frontier.clear(); + + self.merger.extract( + merged, + upper.borrow(), + &mut self.frontier, + &mut readied, + &mut kept, + &mut self.stash, + ); + + if !kept.is_empty() { + self.chain_push(kept); + } + + self.stash.clear(); + + let description = Description::new( + self.lower.clone(), + upper.clone(), + Antichain::from_elem(M::Time::minimum()), + ); + let seal = B::seal(&mut readied, description); + self.lower = upper; + seal + } + + /// The frontier of elements remaining after the most recent call to `self.seal`. + #[inline] + fn frontier(&mut self) -> AntichainRef<'_, M::Time> { + self.frontier.borrow() + } +} + +impl MergeBatcher { + /// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered + /// by decreasing length. + fn insert_chain(&mut self, chain: Vec) { + if !chain.is_empty() { + self.chain_push(chain); + while self.chains.len() > 1 + && (self.chains[self.chains.len() - 1].len() + >= self.chains[self.chains.len() - 2].len() / 2) + { + let list1 = self.chain_pop().unwrap(); + let list2 = self.chain_pop().unwrap(); + let merged = self.merge_by(list1, list2); + self.chain_push(merged); + } + } + } + + // merges two sorted input lists into one sorted output list. + fn merge_by(&mut self, list1: Vec, list2: Vec) -> Vec { + // TODO: `list1` and `list2` get dropped; would be better to reuse? + let mut output = Vec::with_capacity(list1.len() + list2.len()); + self.merger + .merge(list1, list2, &mut output, &mut self.stash); + + output + } + + /// Pop a chain and account size changes. 
+ #[inline] + fn chain_pop(&mut self) -> Option> { + let chain = self.chains.pop(); + self.account(chain.iter().flatten().map(M::account), -1); + chain + } + + /// Push a chain and account size changes. + #[inline] + fn chain_push(&mut self, chain: Vec) { + self.account(chain.iter().map(M::account), 1); + self.chains.push(chain); + } + + /// Account size changes. Only performs work if a logger exists. + /// + /// Calculate the size based on the iterator passed along, with each attribute + /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff. + #[inline] + fn account>(&self, items: I, diff: isize) { + if let Some(logger) = &self.logger { + let (mut records, mut size, mut capacity, mut allocations) = + (0isize, 0isize, 0isize, 0isize); + for (records_, size_, capacity_, allocations_) in items { + records = records.saturating_add_unsigned(records_); + size = size.saturating_add_unsigned(size_); + capacity = capacity.saturating_add_unsigned(capacity_); + allocations = allocations.saturating_add_unsigned(allocations_); + } + logger.log(BatcherEvent { + operator: self.operator_id, + records_diff: records * diff, + size_diff: size * diff, + capacity_diff: capacity * diff, + allocations_diff: allocations * diff, + }) + } + } +} + +impl Drop for MergeBatcher { + fn drop(&mut self) { + // Cleanup chain to retract accounting information. + while self.chain_pop().is_some() {} + } +} + +/// A trait to describe interesting moments in a merge batcher. +pub trait Merger: Default { + /// The internal representation of chunks of data. + type Chunk: Default; + /// The type of time in frontiers to extract updates. + type Time; + /// Merge chains into an output chain. + fn merge( + &mut self, + list1: Vec, + list2: Vec, + output: &mut Vec, + stash: &mut Vec, + ); + /// Extract ready updates based on the `upper` frontier. 
+ fn extract( + &mut self, + merged: Vec, + upper: AntichainRef, + frontier: &mut Antichain, + readied: &mut Vec, + kept: &mut Vec, + stash: &mut Vec, + ); + + /// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations). + fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize); +} + +pub use container::InternalMerger; + +pub mod container { + //! Merger implementations for the merge batcher. + //! + //! The `InternalMerge` trait allows containers to merge sorted, consolidated + //! data using internal iteration. The `InternalMerger` type implements the + //! `Merger` trait using `InternalMerge`, and is the standard merger for all + //! container types. + + use std::marker::PhantomData; + use timely::container::SizableContainer; + use timely::progress::frontier::{Antichain, AntichainRef}; + use timely::{Accountable, PartialOrder}; + + use crate::merge_batcher::Merger; + + /// A container that supports the operations needed by the merge batcher: + /// merging sorted chains and extracting updates by time. + pub trait InternalMerge: Accountable + SizableContainer + Default { + /// The owned time type, for maintaining antichains. + type TimeOwned; + + /// The number of items in this container. + fn len(&self) -> usize; + + /// Clear the container for reuse. + fn clear(&mut self); + + /// Account the allocations behind the chunk. + fn account(&self) -> (usize, usize, usize, usize) { + let (size, capacity, allocations) = (0, 0, 0); + ( + usize::try_from(self.record_count()).unwrap(), + size, + capacity, + allocations, + ) + } + + /// Merge items from sorted inputs into `self`, advancing positions. + /// Merges until `self` is at capacity or all inputs are exhausted. 
+ /// + /// Dispatches based on the number of inputs: + /// - **0**: no-op + /// - **1**: bulk copy (may swap the input into `self`) + /// - **2**: merge two sorted streams + fn merge_from(&mut self, others: &mut [Self], positions: &mut [usize]); + + /// Extract updates from `self` into `ship` (times not beyond `upper`) + /// and `keep` (times beyond `upper`), updating `frontier` with kept times. + /// + /// Iteration starts at `*position` and advances `*position` as updates + /// are consumed. The implementation must yield (return early) when + /// either `keep.at_capacity()` or `ship.at_capacity()` becomes true, + /// so the caller can swap out a full output buffer and resume by + /// calling `extract` again. The caller invokes `extract` repeatedly + /// until `*position >= self.len()`. + /// + /// This shape exists because `at_capacity()` for `Vec` is + /// `len() == capacity()`, which silently becomes false again the + /// moment a push past capacity grows the backing allocation. + /// Without per-element yielding, a single `extract` call can + /// quietly produce oversized output chunks. + fn extract( + &mut self, + position: &mut usize, + upper: AntichainRef, + frontier: &mut Antichain, + keep: &mut Self, + ship: &mut Self, + ); + } + + /// A `Merger` for `Vec` containers, which contain owned data and need special treatment. + pub type VecInternalMerger = VecMerger; + /// A `Merger` implementation for `Vec<(D, T, R)>` that drains owned inputs. + pub struct VecMerger { + _marker: PhantomData<(D, T, R)>, + } + + impl Default for VecMerger { + fn default() -> Self { + Self { + _marker: PhantomData, + } + } + } + + impl VecMerger { + /// The target capacity for output buffers, as a power of two. + /// + /// This amount is used to size vectors, where vectors not exactly this capacity are dropped. + /// If this is mis-set, there is the potential for more memory churn than anticipated. 
+ fn target_capacity() -> usize { + timely::container::buffer::default_capacity::<(D, T, R)>().next_power_of_two() + } + /// Acquire a buffer with the target capacity. + fn empty(&self, stash: &mut Vec>) -> Vec<(D, T, R)> { + let target = Self::target_capacity(); + let mut container = stash.pop().unwrap_or_default(); + container.clear(); + // Reuse if at target; otherwise allocate fresh. + if container.capacity() != target { + container = Vec::with_capacity(target); + } + container + } + /// Refill `queue` from `iter` if empty. Recycles drained queues into `stash`. + fn refill( + queue: &mut std::collections::VecDeque<(D, T, R)>, + iter: &mut impl Iterator>, + stash: &mut Vec>, + ) { + if queue.is_empty() { + let target = Self::target_capacity(); + if stash.len() < 2 { + let mut recycled = Vec::from(std::mem::take(queue)); + recycled.clear(); + if recycled.capacity() == target { + stash.push(recycled); + } + } + if let Some(chunk) = iter.next() { + *queue = std::collections::VecDeque::from(chunk); + } + } + } + } + + impl Merger for VecMerger + where + D: Ord + Clone + 'static, + T: Ord + Clone + PartialOrder + 'static, + R: differential_dataflow::difference::Semigroup + Clone + 'static, + { + type Chunk = Vec<(D, T, R)>; + type Time = T; + + fn merge( + &mut self, + list1: Vec>, + list2: Vec>, + output: &mut Vec>, + stash: &mut Vec>, + ) { + use std::cmp::Ordering; + use std::collections::VecDeque; + + let mut iter1 = list1.into_iter(); + let mut iter2 = list2.into_iter(); + let mut q1 = VecDeque::<(D, T, R)>::from(iter1.next().unwrap_or_default()); + let mut q2 = VecDeque::<(D, T, R)>::from(iter2.next().unwrap_or_default()); + + let mut result = self.empty(stash); + + // Merge while both queues are non-empty. 
+ while let (Some((d1, t1, _)), Some((d2, t2, _))) = (q1.front(), q2.front()) { + match (d1, t1).cmp(&(d2, t2)) { + Ordering::Less => { + result.push(q1.pop_front().unwrap()); + } + Ordering::Greater => { + result.push(q2.pop_front().unwrap()); + } + Ordering::Equal => { + let (d, t, mut r1) = q1.pop_front().unwrap(); + let (_, _, r2) = q2.pop_front().unwrap(); + r1.plus_equals(&r2); + if !r1.is_zero() { + result.push((d, t, r1)); + } + } + } + + if result.at_capacity() { + output.push(std::mem::take(&mut result)); + result = self.empty(stash); + } + + // Refill emptied queues from their chains. + if q1.is_empty() { + Self::refill(&mut q1, &mut iter1, stash); + } + if q2.is_empty() { + Self::refill(&mut q2, &mut iter2, stash); + } + } + + // Push partial result and remaining data from both sides. + if !result.is_empty() { + output.push(result); + } + for q in [q1, q2] { + if !q.is_empty() { + output.push(Vec::from(q)); + } + } + output.extend(iter1); + output.extend(iter2); + } + + fn extract( + &mut self, + merged: Vec>, + upper: AntichainRef, + frontier: &mut Antichain, + ship: &mut Vec>, + kept: &mut Vec>, + stash: &mut Vec>, + ) { + let mut keep = self.empty(stash); + let mut ready = self.empty(stash); + + for mut chunk in merged { + // Go update-by-update to swap out full containers. + for (data, time, diff) in chunk.drain(..) { + if upper.less_equal(&time) { + frontier.insert_with(&time, |time| time.clone()); + keep.push((data, time, diff)); + } else { + ready.push((data, time, diff)); + } + if keep.at_capacity() { + kept.push(std::mem::take(&mut keep)); + keep = self.empty(stash); + } + if ready.at_capacity() { + ship.push(std::mem::take(&mut ready)); + ready = self.empty(stash); + } + } + // Recycle the now-empty chunk if it has the right capacity. 
+ if chunk.capacity() == Self::target_capacity() { + stash.push(chunk); + } + } + if !keep.is_empty() { + kept.push(keep); + } + if !ready.is_empty() { + ship.push(ready); + } + } + + fn account(chunk: &Vec<(D, T, R)>) -> (usize, usize, usize, usize) { + (chunk.len(), 0, 0, 0) + } + } + + /// A merger that uses internal iteration via [`InternalMerge`]. + pub struct InternalMerger { + _marker: PhantomData, + } + + impl Default for InternalMerger { + fn default() -> Self { + Self { + _marker: PhantomData, + } + } + } + + impl InternalMerger + where + MC: InternalMerge, + { + #[inline] + fn empty(&self, stash: &mut Vec) -> MC { + stash.pop().unwrap_or_else(|| { + let mut container = MC::default(); + container.ensure_capacity(&mut None); + container + }) + } + #[inline] + fn recycle(&self, mut chunk: MC, stash: &mut Vec) { + chunk.clear(); + stash.push(chunk); + } + /// Drain remaining items from one side into `result`/`output`. + /// + /// Copies the partially-consumed head into `result`, then appends + /// remaining full chunks directly to `output` without copying. + fn drain_side( + &self, + head: &mut MC, + pos: &mut usize, + list: &mut std::vec::IntoIter, + result: &mut MC, + output: &mut Vec, + stash: &mut Vec, + ) { + // Copy the partially-consumed head into result. + if *pos < head.len() { + result.merge_from(std::slice::from_mut(head), std::slice::from_mut(pos)); + } + // Flush result before appending full chunks. + if !result.is_empty() { + output.push(std::mem::take(result)); + *result = self.empty(stash); + } + // Remaining full chunks go directly to output. 
+ output.extend(list); + } + } + + impl Merger for InternalMerger + where + MC: InternalMerge + 'static, + { + type Time = MC::TimeOwned; + type Chunk = MC; + + fn merge( + &mut self, + list1: Vec, + list2: Vec, + output: &mut Vec, + stash: &mut Vec, + ) { + let mut list1 = list1.into_iter(); + let mut list2 = list2.into_iter(); + + let mut heads = [ + list1.next().unwrap_or_default(), + list2.next().unwrap_or_default(), + ]; + let mut positions = [0usize, 0usize]; + + let mut result = self.empty(stash); + + // Main merge loop: both sides have data. + while positions[0] < heads[0].len() && positions[1] < heads[1].len() { + result.merge_from(&mut heads, &mut positions); + + if positions[0] >= heads[0].len() { + let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default()); + self.recycle(old, stash); + positions[0] = 0; + } + if positions[1] >= heads[1].len() { + let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default()); + self.recycle(old, stash); + positions[1] = 0; + } + if result.at_capacity() { + output.push(std::mem::take(&mut result)); + result = self.empty(stash); + } + } + + // Drain remaining from each side: copy partial head, then append full chunks. 
+ self.drain_side( + &mut heads[0], + &mut positions[0], + &mut list1, + &mut result, + output, + stash, + ); + self.drain_side( + &mut heads[1], + &mut positions[1], + &mut list2, + &mut result, + output, + stash, + ); + if !result.is_empty() { + output.push(result); + } + } + + fn extract( + &mut self, + merged: Vec, + upper: AntichainRef, + frontier: &mut Antichain, + ship: &mut Vec, + kept: &mut Vec, + stash: &mut Vec, + ) { + let mut keep = self.empty(stash); + let mut ready = self.empty(stash); + + for mut buffer in merged { + let mut position = 0; + let len = buffer.len(); + while position < len { + buffer.extract(&mut position, upper, frontier, &mut keep, &mut ready); + if keep.at_capacity() { + kept.push(std::mem::take(&mut keep)); + keep = self.empty(stash); + } + if ready.at_capacity() { + ship.push(std::mem::take(&mut ready)); + ready = self.empty(stash); + } + } + self.recycle(buffer, stash); + } + if !keep.is_empty() { + kept.push(keep); + } + if !ready.is_empty() { + ship.push(ready); + } + } + + fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { + chunk.account() + } + } + + /// Implementation of `InternalMerge` for `Vec<(D, T, R)>`. + /// + /// Note: The `VecMerger` type implements `Merger` directly and avoids + /// cloning by draining inputs. This `InternalMerge` impl is retained + /// because `reduce` requires `Builder::Input: InternalMerge`. 
+ pub mod vec_internal { + use super::InternalMerge; + use differential_dataflow::difference::Semigroup; + use std::cmp::Ordering; + use timely::PartialOrder; + use timely::container::SizableContainer; + use timely::progress::frontier::{Antichain, AntichainRef}; + + impl< + D: Ord + Clone + 'static, + T: Ord + Clone + PartialOrder + 'static, + R: Semigroup + Clone + 'static, + > InternalMerge for Vec<(D, T, R)> + { + type TimeOwned = T; + + fn len(&self) -> usize { + Vec::len(self) + } + fn clear(&mut self) { + Vec::clear(self) + } + + fn merge_from(&mut self, others: &mut [Self], positions: &mut [usize]) { + match others.len() { + 0 => {} + 1 => { + let other = &mut others[0]; + let pos = &mut positions[0]; + if self.is_empty() && *pos == 0 { + std::mem::swap(self, other); + return; + } + self.extend_from_slice(&other[*pos..]); + *pos = other.len(); + } + 2 => { + let (left, right) = others.split_at_mut(1); + let other1 = &mut left[0]; + let other2 = &mut right[0]; + + while positions[0] < other1.len() + && positions[1] < other2.len() + && !self.at_capacity() + { + let (d1, t1, _) = &other1[positions[0]]; + let (d2, t2, _) = &other2[positions[1]]; + // NOTE: The .clone() calls here are not great, but this dead code to be removed in the next release. 
+ match (d1, t1).cmp(&(d2, t2)) { + Ordering::Less => { + self.push(other1[positions[0]].clone()); + positions[0] += 1; + } + Ordering::Greater => { + self.push(other2[positions[1]].clone()); + positions[1] += 1; + } + Ordering::Equal => { + let (d, t, mut r1) = other1[positions[0]].clone(); + let (_, _, ref r2) = other2[positions[1]]; + r1.plus_equals(r2); + if !r1.is_zero() { + self.push((d, t, r1)); + } + positions[0] += 1; + positions[1] += 1; + } + } + } + } + n => unimplemented!("{n}-way merge not yet supported"), + } + } + + fn extract( + &mut self, + position: &mut usize, + upper: AntichainRef, + frontier: &mut Antichain, + keep: &mut Self, + ship: &mut Self, + ) { + let len = self.len(); + while *position < len && !keep.at_capacity() && !ship.at_capacity() { + let (data, time, diff) = self[*position].clone(); + if upper.less_equal(&time) { + frontier.insert_with(&time, |time| time.clone()); + keep.push((data, time, diff)); + } else { + ship.push((data, time, diff)); + } + *position += 1; + } + } + } + } +} From 218f60eae090f24b88b3ccbd85d8ea68138622c4 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 7 May 2026 14:36:37 +0200 Subject: [PATCH 2/5] compute: temporal-bucketing batcher Replaces the source-side temporal-bucketing operator on the arrangement path with a bucket-aware merge batcher that integrates directly into mz_arrange call sites. Layered on top of the vendored `mz_timely_util::merge_batcher`, the new `TemporalBucketingMergeBatcher` keeps current data on a flat `chains: Vec>` exactly like plain `MergeBatcher` and routes only the future-stamped `kept` side of each seal extract into a `BucketChain` of `MergeBucket`s. Subsequent seals peel mature buckets out of the bucket chain and merge their contents with the current flat extract. The seal contract is preserved: only updates strictly less than `upper` are emitted. 
`Batcher::frontier` reports the actual minimum held timestamp by merging the lowest non-empty bucket once per seal and consulting `Merger::extract`. Subsequent seals see a single chain in that bucket, so the work is amortised.

`CollectionBundle::ensure_collections` reads `ENABLE_TEMPORAL_BUCKETING` and dispatches the row-row arrangement build through a new per-`RenderTimestamp` trait `MaybeTemporalRowRowArrange`. The trait is a sibling to `RenderTimestamp`, not a supertrait of it, so the bucketing precondition does not leak into the generic timestamp interface. Iterative `Product<...>` always uses the non-temporal batcher.

Several seal-time fast paths keep per-push and per-seal cost close to plain `MergeBatcher` when the bucket chain is empty:

* A held-records counter short-circuits `seal` when there is no future data and the current extract produced no `kept` side, skipping the full peel/merge/restore/recompute_frontier cycle.
* `Merger::time_range` reports per-chunk `(lower, upper)` antichains so `MergeBucket::split` can short-circuit when data lies entirely on one side of the split timestamp.
* `kept` is routed into the bucket chain as a single chain rather than per chunk, so the destination bucket adopts the consolidated chain instead of pairwise-merging it back into existence.
* The slow-path route walks the chain via the lower frontier returned by `extract`, jumping directly to the next non-empty bucket instead of iterating every boundary.
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/compute/src/extensions/arrange.rs | 81 +- src/compute/src/extensions/temporal_bucket.rs | 2 +- src/compute/src/render.rs | 5 +- src/compute/src/render/context.rs | 48 +- src/compute/src/render/reduce.rs | 2 +- src/compute/src/render/threshold.rs | 2 +- src/compute/src/render/top_k.rs | 2 +- src/compute/src/typedefs.rs | 2 +- .../src/upsert_continual_feedback_v2.rs | 4 +- src/timely-util/src/columnar.rs | 14 +- src/timely-util/src/columnation.rs | 16 +- src/timely-util/src/merge_batcher.rs | 968 ++++++++++++++++++ src/timely-util/src/temporal.rs | 55 +- 13 files changed, 1152 insertions(+), 49 deletions(-) diff --git a/src/compute/src/extensions/arrange.rs b/src/compute/src/extensions/arrange.rs index 6575402e65d53..88f3011fc27a8 100644 --- a/src/compute/src/extensions/arrange.rs +++ b/src/compute/src/extensions/arrange.rs @@ -11,24 +11,31 @@ use std::collections::BTreeMap; use std::rc::Rc; use differential_dataflow::difference::Semigroup; +use differential_dataflow::dynamic::pointstamp::PointStamp; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::arrangement::arrange_core; use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; use differential_dataflow::trace::implementations::spine_fueled::Spine; use differential_dataflow::trace::{Batch, Batcher, Builder, Trace, TraceReader}; use differential_dataflow::{Collection, Data, ExchangeData, Hashable, VecCollection}; +use mz_repr::{Diff, Row}; +use mz_timely_util::columnar::builder::ColumnBuilder; +use mz_timely_util::columnar::{Col2ValBatcher, Col2ValTemporalBatcher, Column, columnar_exchange}; use timely::Container; use timely::dataflow::Stream; -use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline}; +use timely::dataflow::channels::pact::{Exchange, ExchangeCore, ParallelizationContract, Pipeline}; use timely::dataflow::operators::Operator; +use timely::order::Product; use 
timely::progress::Timestamp; use crate::logging::compute::{ ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize, ArrangementHeapSizeOperator, ComputeEvent, ComputeEventBuilder, }; +use crate::row_spine::RowRowBuilder; use crate::typedefs::{ - KeyAgent, KeyValAgent, MzArrangeData, MzData, MzTimestamp, RowAgent, RowRowAgent, RowValAgent, + KeyAgent, KeyValAgent, MzArrangeData, MzData, MzTimestamp, RowAgent, RowRowAgent, RowRowSpine, + RowValAgent, }; /// Extension trait to arrange data. @@ -426,3 +433,73 @@ where }) } } + +/// Per-`RenderTimestamp` dispatch trait for the row-row arrangement built by +/// [`crate::render::context::CollectionBundle::ensure_collections`]. +/// +/// Two impls exist, one per concrete `RenderTimestamp`. For +/// [`mz_repr::Timestamp`] the dispatch picks +/// [`Col2ValTemporalBatcher`] when temporal bucketing is enabled, else +/// falls through to the plain [`Col2ValBatcher`]. For +/// `Product>` (iterative scopes) it +/// always uses the plain batcher because the timestamp is partially +/// ordered and not bucketable. +/// +/// This trait is intentionally **not** a supertrait of +/// `crate::render::RenderTimestamp` so that the bucketing precondition does +/// not leak into the generic timestamp interface. +pub trait MaybeTemporalRowRowArrange: MzTimestamp { + /// Arrange a row-row stream into a [`RowRowAgent`], possibly via the + /// temporal-bucketing batcher. 
+ fn arrange_row_row<'scope>( + stream: Stream<'scope, Self, Column<((Row, Row), Self, Diff)>>, + name: &str, + bucketing_enabled: bool, + ) -> Arranged<'scope, RowRowAgent>; +} + +impl MaybeTemporalRowRowArrange for mz_repr::Timestamp { + fn arrange_row_row<'scope>( + stream: Stream<'scope, Self, Column<((Row, Row), Self, Diff)>>, + name: &str, + bucketing_enabled: bool, + ) -> Arranged<'scope, RowRowAgent> { + let pact = ExchangeCore::, _>::new_core( + columnar_exchange::, + ); + if bucketing_enabled { + stream.mz_arrange_core::< + _, + Col2ValTemporalBatcher, + RowRowBuilder, + RowRowSpine, + >(pact, name) + } else { + stream.mz_arrange_core::< + _, + Col2ValBatcher, + RowRowBuilder, + RowRowSpine, + >(pact, name) + } + } +} + +impl MaybeTemporalRowRowArrange for Product> { + fn arrange_row_row<'scope>( + stream: Stream<'scope, Self, Column<((Row, Row), Self, Diff)>>, + name: &str, + _bucketing_enabled: bool, + ) -> Arranged<'scope, RowRowAgent> { + // Iterative scope: timestamp is partially ordered, no bucketing. 
+ let pact = ExchangeCore::, _>::new_core( + columnar_exchange::, + ); + stream.mz_arrange_core::< + _, + Col2ValBatcher, + RowRowBuilder, + RowRowSpine, + >(pact, name) + } +} diff --git a/src/compute/src/extensions/temporal_bucket.rs b/src/compute/src/extensions/temporal_bucket.rs index 569f306191d2c..2ef91817a9220 100644 --- a/src/compute/src/extensions/temporal_bucket.rs +++ b/src/compute/src/extensions/temporal_bucket.rs @@ -14,9 +14,9 @@ use std::marker::PhantomData; use differential_dataflow::Hashable; use differential_dataflow::difference::Semigroup; use differential_dataflow::lattice::Lattice; -use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; use differential_dataflow::trace::{Batcher, Builder, Description}; use mz_timely_util::columnation::{ColInternalMerger, ColumnationChunker, ColumnationStack}; +use mz_timely_util::merge_batcher::MergeBatcher; use mz_timely_util::temporal::{Bucket, BucketChain, BucketTimestamp}; use timely::container::PushInto; use timely::dataflow::channels::pact::Exchange; diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 08e3182c3621b..e5f8f32ef1e16 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -988,7 +988,10 @@ impl<'scope> Context<'scope, Product>> { } } -impl<'scope, T: RenderTimestamp> Context<'scope, T> { +impl<'scope, T> Context<'scope, T> +where + T: RenderTimestamp + crate::extensions::arrange::MaybeTemporalRowRowArrange, +{ /// Renders a non-recursive plan to a differential dataflow, producing the collection of /// results. 
/// diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index e6e6d5c8d235c..d9509a499d1c8 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -19,7 +19,9 @@ use differential_dataflow::trace::implementations::BatchContainer; use differential_dataflow::trace::{BatchReader, Cursor, TraceReader}; use differential_dataflow::{AsCollection, Data, VecCollection}; use mz_compute_types::dataflows::DataflowDescription; -use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION; +use mz_compute_types::dyncfgs::{ + ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION, ENABLE_TEMPORAL_BUCKETING, +}; use mz_compute_types::plan::AvailableCollections; use mz_dyncfg::ConfigSet; use mz_expr::{Id, MapFilterProject, MirScalarExpr}; @@ -28,10 +30,9 @@ use mz_repr::fixed_length::ToDatumIter; use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow}; use mz_storage_types::controller::CollectionMetadata; use mz_timely_util::columnar::builder::ColumnBuilder; -use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange}; use mz_timely_util::operator::CollectionExt; use timely::container::CapacityContainerBuilder; -use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; +use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Capability; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession}; @@ -40,12 +41,12 @@ use timely::progress::operate::FrontierInterest; use timely::progress::{Antichain, Timestamp}; use crate::compute_state::ComputeState; -use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore}; +use crate::extensions::arrange::{KeyCollection, MaybeTemporalRowRowArrange, MzArrange}; use crate::render::errors::{DataflowErrorSer, ErrorLogger}; use crate::render::{LinearJoinSpec, RenderTimestamp}; -use 
crate::row_spine::{DatumSeq, RowRowBuilder}; +use crate::row_spine::DatumSeq; use crate::typedefs::{ - ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine, + ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, }; /// Dataflow-local collections and arrangements. @@ -738,7 +739,10 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { input_mfp: MapFilterProject, until: Antichain, config_set: &ConfigSet, - ) -> Self { + ) -> Self + where + T: MaybeTemporalRowRowArrange, + { if collections == Default::default() { return self; } @@ -757,6 +761,8 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { ); } + let bucketing_enabled = ENABLE_TEMPORAL_BUCKETING.get(config_set); + // We need the collection if either (1) it is explicitly demanded, or (2) we are going to render any arrangement let form_raw_collection = collections.raw || collections @@ -780,8 +786,13 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { .collection .take() .expect("Collection constructed above"); - let (oks, errs_keyed, passthrough) = - Self::arrange_collection(&name, oks, key.clone(), thinning.clone()); + let (oks, errs_keyed, passthrough) = Self::arrange_collection( + &name, + oks, + key.clone(), + thinning.clone(), + bucketing_enabled, + ); let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into(); self.collection = Some((passthrough, errs)); let errs = @@ -810,11 +821,15 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { oks: VecCollection<'scope, T, Row, Diff>, key: Vec, thinning: Vec, + bucketing_enabled: bool, ) -> ( Arranged<'scope, RowRowAgent>, VecCollection<'scope, T, DataflowErrorSer, Diff>, VecCollection<'scope, T, Row, Diff>, - ) { + ) + where + T: MaybeTemporalRowRowArrange, + { // This operator implements a `map_fallible`, but produces columnar updates for the ok // stream. 
The `map_fallible` cannot be used here because the closure cannot return // references, which is what we need to push into columnar streams. Instead, we use a @@ -861,18 +876,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { } }); - let oks = ok_stream - .mz_arrange_core::< - _, - Col2ValBatcher<_, _, _, _>, - RowRowBuilder<_, _>, - RowRowSpine<_, _>, - >( - ExchangeCore::, _>::new_core( - columnar_exchange::, - ), - name - ); + let oks = T::arrange_row_row(ok_stream, name, bucketing_enabled); ( oks, err_stream.as_collection(), diff --git a/src/compute/src/render/reduce.rs b/src/compute/src/render/reduce.rs index d4f34d2951846..205d0c2932ba1 100644 --- a/src/compute/src/render/reduce.rs +++ b/src/compute/src/render/reduce.rs @@ -23,7 +23,6 @@ use differential_dataflow::difference::{IsZero, Multiply, Semigroup}; use differential_dataflow::hashable::Hashable; use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; use differential_dataflow::trace::implementations::BatchContainer; -use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge; use differential_dataflow::trace::{Builder, Trace}; use differential_dataflow::{Data, VecCollection}; use itertools::Itertools; @@ -37,6 +36,7 @@ use mz_expr::{ use mz_repr::adt::numeric::{self, Numeric, NumericAgg}; use mz_repr::fixed_length::ToDatumIter; use mz_repr::{Datum, DatumVec, Diff, Row, RowArena, SharedRow}; +use mz_timely_util::merge_batcher::container::InternalMerge; use mz_timely_util::operator::CollectionExt; use serde::{Deserialize, Serialize}; use timely::Container; diff --git a/src/compute/src/render/threshold.rs b/src/compute/src/render/threshold.rs index 3c47f5267cf35..0570cd1f494db 100644 --- a/src/compute/src/render/threshold.rs +++ b/src/compute/src/render/threshold.rs @@ -14,11 +14,11 @@ use differential_dataflow::Data; use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; use 
differential_dataflow::trace::implementations::BatchContainer; -use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge; use differential_dataflow::trace::{Builder, Trace, TraceReader}; use mz_compute_types::plan::threshold::{BasicThresholdPlan, ThresholdPlan}; use mz_expr::MirScalarExpr; use mz_repr::Diff; +use mz_timely_util::merge_batcher::container::InternalMerge; use timely::Container; use timely::container::PushInto; diff --git a/src/compute/src/render/top_k.rs b/src/compute/src/render/top_k.rs index 28df27f0d3369..99115ac1a7a0f 100644 --- a/src/compute/src/render/top_k.rs +++ b/src/compute/src/render/top_k.rs @@ -21,7 +21,6 @@ use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; use differential_dataflow::operators::iterate::Variable as SemigroupVariable; use differential_dataflow::trace::implementations::BatchContainer; -use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge; use differential_dataflow::trace::{Builder, Trace}; use differential_dataflow::{Data, VecCollection}; use mz_compute_types::plan::top_k::{ @@ -32,6 +31,7 @@ use mz_expr::{BinaryFunc, EvalError, MirScalarExpr, UnaryFunc, func}; use mz_ore::cast::CastFrom; use mz_ore::soft_assert_or_log; use mz_repr::{Datum, DatumVec, Diff, ReprScalarType, Row, SharedRow}; +use mz_timely_util::merge_batcher::container::InternalMerge; use mz_timely_util::operator::CollectionExt; use timely::Container; use timely::container::{CapacityContainerBuilder, PushInto}; diff --git a/src/compute/src/typedefs.rs b/src/compute/src/typedefs.rs index 31ab29e9a03da..79be68015ca2b 100644 --- a/src/compute/src/typedefs.rs +++ b/src/compute/src/typedefs.rs @@ -14,11 +14,11 @@ use columnar::{Container, Ref}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::operators::arrange::TraceAgent; -use 
differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; use differential_dataflow::trace::wrappers::enter::TraceEnter; use differential_dataflow::trace::wrappers::frontier::TraceFrontier; use mz_repr::Diff; use mz_timely_util::columnation::{ColInternalMerger, ColumnationChunker}; +use mz_timely_util::merge_batcher::MergeBatcher; use crate::render::errors::DataflowErrorSer; use crate::row_spine::RowValBuilder; diff --git a/src/storage/src/upsert_continual_feedback_v2.rs b/src/storage/src/upsert_continual_feedback_v2.rs index 32de9e3770086..aa3a8b3ac6a36 100644 --- a/src/storage/src/upsert_continual_feedback_v2.rs +++ b/src/storage/src/upsert_continual_feedback_v2.rs @@ -71,9 +71,6 @@ use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::agent::TraceAgent; use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::trace::implementations::chunker::ContainerChunker; -use differential_dataflow::trace::implementations::merge_batcher::{ - MergeBatcher, container::VecMerger, -}; use differential_dataflow::trace::{Batcher, Builder, Cursor, Description, TraceReader}; use differential_dataflow::{AsCollection, VecCollection}; use mz_repr::{Diff, GlobalId, Row}; @@ -81,6 +78,7 @@ use mz_storage_types::errors::{DataflowError, EnvelopeError}; use mz_timely_util::builder_async::{ Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, }; +use mz_timely_util::merge_batcher::{MergeBatcher, container::VecMerger}; use std::convert::Infallible; use timely::container::CapacityContainerBuilder; use timely::dataflow::StreamVec; diff --git a/src/timely-util/src/columnar.rs b/src/timely-util/src/columnar.rs index 5db3a34689e7e..a1a7149e9fe2c 100644 --- a/src/timely-util/src/columnar.rs +++ b/src/timely-util/src/columnar.rs @@ -23,19 +23,20 @@ pub mod consolidate; use std::hash::Hash; +use crate::merge_batcher::MergeBatcher; use columnar::Borrow; use columnar::bytes::indexed; use 
columnar::common::IterOwn; use columnar::{Columnar, Ref}; use columnar::{FromBytes, Index, Len}; use differential_dataflow::Hashable; -use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; use timely::Accountable; use timely::bytes::arc::Bytes; use timely::container::{DrainContainer, PushInto}; use timely::dataflow::channels::ContainerBytes; use crate::columnation::{ColInternalMerger, ColumnationStack}; +use crate::merge_batcher::TemporalBucketingMergeBatcher; /// A batcher for columnar storage. pub type Col2ValBatcher = MergeBatcher< @@ -46,6 +47,17 @@ pub type Col2ValBatcher = MergeBatcher< /// A batcher for columnar storage with unit values. pub type Col2KeyBatcher = Col2ValBatcher; +/// A temporal-bucketing batcher for columnar storage. +/// +/// Same chunker and merger as [`Col2ValBatcher`], but routes chunks through +/// a [`TemporalBucketingMergeBatcher`] so that the arrangement spine sees +/// fewer, bucket-aligned batches. +pub type Col2ValTemporalBatcher = TemporalBucketingMergeBatcher< + Column<((K, V), T, R)>, + batcher::Chunker>, + ColInternalMerger<(K, V), T, R>, +>; + /// A container based on a columnar store, encoded in aligned bytes. /// /// The type can represent typed data, bytes from Timely, or an aligned allocation. 
The name diff --git a/src/timely-util/src/columnation.rs b/src/timely-util/src/columnation.rs index 64eb1fd035790..bfa6786bfa0dc 100644 --- a/src/timely-util/src/columnation.rs +++ b/src/timely-util/src/columnation.rs @@ -22,11 +22,11 @@ use std::collections::VecDeque; use std::iter::FromIterator; +use crate::merge_batcher::container::InternalMerge; use columnation::{Columnation, Region}; use differential_dataflow::consolidation::consolidate_updates; use differential_dataflow::difference::Semigroup; use differential_dataflow::lattice::Lattice; -use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge; use differential_dataflow::trace::implementations::{BatchContainer, BuilderInput}; use timely::container::{ContainerBuilder, DrainContainer, PushInto, SizableContainer}; use timely::progress::Timestamp; @@ -653,6 +653,16 @@ where *position += 1; } } + + fn time_range(&self) -> Option<(Antichain, Antichain)> { + let slice = &self[..]; + if slice.is_empty() { + return None; + } + Some(crate::merge_batcher::build_time_bounds( + slice.iter().map(|(_, t, _)| t.clone()), + )) + } } // --------------------------------------------------------------------------- @@ -661,6 +671,4 @@ where /// A `Merger` using internal iteration for `ColumnationStack` containers. pub type ColInternalMerger = - differential_dataflow::trace::implementations::merge_batcher::InternalMerger< - ColumnationStack<(D, T, R)>, - >; + crate::merge_batcher::InternalMerger>; diff --git a/src/timely-util/src/merge_batcher.rs b/src/timely-util/src/merge_batcher.rs index f95fbbb693879..68dc0ea816a76 100644 --- a/src/timely-util/src/merge_batcher.rs +++ b/src/timely-util/src/merge_batcher.rs @@ -256,10 +256,947 @@ pub trait Merger: Default { /// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations). 
fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize); + + /// Return `(lower, upper)` antichains bracketing the times in `chunk`, + /// or `None` if the chunk is empty or the implementation cannot + /// produce them. + /// + /// `lower` is an antichain of *minimal* elements (every chunk time + /// satisfies `lower.less_equal(t)`); `upper` is an antichain of + /// *maximal* elements (every chunk time satisfies `t.less_equal(u)` + /// for some `u` in `upper`). For totally-ordered times both antichains + /// reduce to the single min / max element. + /// + /// Used by [`temporal::MergeBucket`] to short-circuit + /// `Bucket::split` when the bucket's data lies entirely on one side + /// of the split timestamp. The default returns `None`, which forces + /// the bucket onto the merge+extract fallback. + fn time_range(_chunk: &Self::Chunk) -> Option<(Antichain, Antichain)> { + None + } } pub use container::InternalMerger; +/// Build `(lower, upper)` antichains from an iterator of timestamps. +/// +/// `lower` is an antichain of *minimal* elements (every emitted time is +/// in-advance-of `lower`); `upper` is an antichain of *maximal* +/// elements (every emitted time is `less_equal` to some element of +/// `upper`). For totally-ordered times both reduce to single-element +/// antichains. +pub(crate) fn build_time_bounds(elements: I) -> (Antichain, Antichain) +where + T: timely::PartialOrder + Clone, + I: IntoIterator, +{ + let mut lower: Antichain = Antichain::new(); + let mut max_set: Vec = Vec::new(); + for t in elements { + lower.insert(t.clone()); + if max_set.iter().any(|m| t.less_equal(m)) { + continue; + } + max_set.retain(|m| !m.less_than(&t)); + max_set.push(t); + } + (lower, Antichain::from(max_set)) +} + +/// Reduce a `Vec` to its maximal elements (mutually incomparable +/// elements such that no other vec element dominates them) and wrap +/// in an [`Antichain`]. 
+pub(crate) fn reduce_to_maximal(elements: Vec) -> Antichain { + let mut max_set: Vec = Vec::with_capacity(elements.len()); + for t in elements { + if max_set.iter().any(|m| t.less_equal(m)) { + continue; + } + max_set.retain(|m| !m.less_than(&t)); + max_set.push(t); + } + Antichain::from(max_set) +} + +pub use temporal::{MergeBucket, TemporalBucketingMergeBatcher}; + +mod temporal { + //! Temporal bucketing variant of [`super::MergeBatcher`]. + //! + //! Hybrid layout: incoming data flows into a flat `chains: + //! Vec>` exactly like plain [`super::MergeBatcher`]. On + //! `seal`, the flat chains are merged and extracted against the seal + //! upper; the `kept` (future-stamped) side is the only data that enters a + //! [`BucketChain`] of [`MergeBucket`]s. Subsequent seals peel mature + //! buckets out of the bucket chain and merge their contents with the + //! current flat extract. + //! + //! For workloads where most data is current (`kept` is small at each + //! seal) the bucket chain stays nearly empty, so per-push and per-seal + //! cost is dominated by the plain merge-batcher path. The bucket chain + //! only pays its splitting/merging overhead for the future tail of the + //! input. The contract of `Batcher::seal` is preserved: only updates + //! with time strictly less than `upper` are emitted. + + use std::marker::PhantomData; + + use differential_dataflow::logging::Logger; + use differential_dataflow::trace::{Batcher, Builder, Description}; + use timely::container::{ContainerBuilder, PushInto}; + use timely::progress::frontier::AntichainRef; + use timely::progress::{Antichain, Timestamp}; + + use crate::merge_batcher::Merger; + use crate::temporal::{Bucket, BucketChain, BucketTimestamp}; + + /// Fuel budget used to interleave [`BucketChain::restore`] with + /// `push_container` and `seal`. + /// + /// Matches the fuel used by the source-side bucketing operator until + /// benchmarks suggest a different value. 
+ const RESTORE_FUEL: i64 = 1_000_000; + + /// A bucket of merge-batcher chunks for a single power-of-two time range. + /// + /// Maintains the same geometric chain invariant as [`super::MergeBatcher`]: + /// adjacent chains decrease in size, merging older chains as new ones are + /// pushed. + /// + /// Tracks `(lower, upper)` antichains bracketing the held times in + /// [`Self::bounds`] (when [`Merger::time_range`] returns `Some`) so + /// [`Bucket::split`] can short-circuit when the data lies entirely on + /// one side of the split timestamp. + pub struct MergeBucket { + /// Geometrically sized chains of sorted, consolidated chunks. + chains: Vec>, + /// `(lower, upper)` antichains over the times in [`Self::chains`]. + /// + /// `lower` contains *minimal* elements (every held time is + /// in-advance-of `lower`); `upper` contains *maximal* elements + /// (every held time is `less_equal` to some element of `upper`). + /// + /// `None` means either the bucket is empty or the merger does not + /// implement [`Merger::time_range`]; the latter forces + /// [`Bucket::split`] onto its merge+extract fallback path. + bounds: Option<(Antichain, Antichain)>, + } + + impl Default for MergeBucket { + fn default() -> Self { + Self { + chains: Vec::new(), + bounds: None, + } + } + } + + impl MergeBucket + where + M::Time: timely::PartialOrder + Clone, + { + fn is_empty(&self) -> bool { + self.chains.iter().all(|c| c.is_empty()) + } + + /// Insert `chain` and re-establish geometric chain sizes by merging + /// adjacent chains while their sizes are within a factor of two. + fn insert_chain( + &mut self, + chain: Vec, + merger: &mut M, + stash: &mut Vec, + ) { + if chain.is_empty() { + return; + } + // Union the chain's `(lower, upper)` antichains into + // `self.bounds`. If any chunk reports `None` we drop bounds + // to `None` so subsequent splits fall back to merge+extract. 
+ self.absorb_chain_bounds(&chain); + self.chains.push(chain); + while self.chains.len() > 1 + && self.chains[self.chains.len() - 1].len() + >= self.chains[self.chains.len() - 2].len() / 2 + { + let l1 = self.chains.pop().unwrap(); + let l2 = self.chains.pop().unwrap(); + let mut out = Vec::with_capacity(l1.len() + l2.len()); + merger.merge(l1, l2, &mut out, stash); + self.chains.push(out); + } + } + + /// Update `self.bounds` to reflect the union of its current value + /// and the times in `chain`. Sets `bounds` to `None` if any + /// chunk reports `None` (no info available). + fn absorb_chain_bounds(&mut self, chain: &[M::Chunk]) { + for chunk in chain { + let Some(chunk_bounds) = M::time_range(chunk) else { + self.bounds = None; + return; + }; + self.bounds = Some(match self.bounds.take() { + Some(existing) => merge_bounds(existing, chunk_bounds), + None => chunk_bounds, + }); + } + } + + /// Pairwise-merge all chains into a single sorted chain. + /// + /// Drops `self.bounds` since the merge consumes the chains; the + /// caller is expected to discard the bucket or rebuild bounds if + /// further use is needed. + fn merge_into_one(mut self, merger: &mut M, stash: &mut Vec) -> Vec { + while self.chains.len() > 1 { + let l1 = self.chains.pop().unwrap(); + let l2 = self.chains.pop().unwrap(); + let mut out = Vec::with_capacity(l1.len() + l2.len()); + merger.merge(l1, l2, &mut out, stash); + self.chains.push(out); + } + self.chains.pop().unwrap_or_default() + } + } + + impl Bucket for MergeBucket + where + M: Merger, + { + type Timestamp = M::Time; + + fn split(self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self) { + use timely::PartialOrder; + + // Short-circuit when tracked bounds tell us the data lies + // entirely on one side of `timestamp`. 
This avoids the + // O(records) merge+extract walk that the fallback path would + // perform — the dominant cost when `BucketChain::peel` / + // `BucketChain::restore` cascades splits over a bucket that + // holds the whole residue (e.g. all records at a single time + // exactly equal to a seal upper). + // + // The checks use timely's [`PartialOrder`] so they're correct + // for partially-ordered timestamps too: when the antichain + // can't be totally compared against `timestamp` neither + // predicate fires and we fall through to the merge+extract + // path. + if let Some((lower, upper)) = &self.bounds { + // All held times satisfy `t <= u` for some `u` in + // `upper`. If every `u` in `upper` is `< timestamp` then + // every held time is `< timestamp` → all goes to lower + // side, upper side is empty. + if !upper.is_empty() && upper.elements().iter().all(|u| u.less_than(timestamp)) { + return (self, Self::default()); + } + // All held times satisfy `lower.less_equal(t)`. If every + // `l` in `lower` satisfies `timestamp.less_equal(l)` + // (i.e. `timestamp <= l`) then every held time is + // `>= timestamp` → all goes to upper side, lower side is + // empty. + if !lower.is_empty() && lower.elements().iter().all(|l| timestamp.less_equal(l)) { + return (Self::default(), self); + } + } + + let had_bounds = self.bounds.is_some(); + let mut merger = M::default(); + let mut stash = Vec::new(); + let merged = self.merge_into_one(&mut merger, &mut stash); + + // Account fuel by total record count of the chunks we are about to + // partition. `M::account(...).0` is the record count. 
+ let records: usize = merged.iter().map(|c| M::account(c).0).sum(); + *fuel = fuel.saturating_sub(i64::try_from(records).unwrap_or(i64::MAX)); + + let upper_at = Antichain::from_elem(timestamp.clone()); + let mut frontier = Antichain::new(); + let mut ship = Vec::new(); + let mut keep = Vec::new(); + merger.extract( + merged, + upper_at.borrow(), + &mut frontier, + &mut ship, + &mut keep, + &mut stash, + ); + // Recompute child bounds from the freshly-extracted chunks. + // If the merger doesn't implement `time_range`, both sides + // fall back to `None` (and future splits stay on the + // fallback path). + let lower_bounds = if had_bounds { + bounds_from_chain::(&ship) + } else { + None + }; + let upper_bounds = if had_bounds { + bounds_from_chain::(&keep) + } else { + None + }; + let lower = MergeBucket { + chains: if ship.is_empty() { + Vec::new() + } else { + vec![ship] + }, + bounds: lower_bounds, + }; + let upper = MergeBucket { + chains: if keep.is_empty() { + Vec::new() + } else { + vec![keep] + }, + bounds: upper_bounds, + }; + (lower, upper) + } + } + + /// Compute `(lower, upper)` antichains over a chain by walking each + /// chunk's [`Merger::time_range`]. Returns `None` if the chain is + /// empty or any chunk reports `None`. + fn bounds_from_chain>( + chain: &[M::Chunk], + ) -> Option<(Antichain, Antichain)> { + let mut bounds: Option<(Antichain, Antichain)> = None; + for chunk in chain { + let chunk_bounds = M::time_range(chunk)?; + bounds = Some(match bounds.take() { + Some(existing) => merge_bounds(existing, chunk_bounds), + None => chunk_bounds, + }); + } + bounds + } + + /// Union two `(lower, upper)` antichain pairs. + /// + /// `lower` is an antichain of minimal elements; the union retains + /// only minimal elements from the combined set, exactly what + /// [`Antichain::insert`] does. `upper` is an antichain of maximal + /// elements; the union retains only maximal elements, computed via + /// [`reduce_to_maximal`]. 
+ fn merge_bounds( + a: (Antichain, Antichain), + b: (Antichain, Antichain), + ) -> (Antichain, Antichain) { + let (a_lower, a_upper) = a; + let (b_lower, b_upper) = b; + let mut lower = a_lower; + for elt in b_lower.into_iter() { + lower.insert(elt); + } + let upper_elements: Vec = a_upper.into_iter().chain(b_upper).collect(); + let upper = super::reduce_to_maximal(upper_elements); + (lower, upper) + } + + /// A merge batcher that holds back future-stamped updates using a + /// [`BucketChain`], while routing all incoming data through plain + /// merge-batcher chains. + /// + /// Outer shape mirrors [`super::MergeBatcher`]: a chunker `C` produces + /// [`Merger::Chunk`]s, a merger `M` merges and extracts them. Internally, + /// the flat `chains: Vec>` field — identical to plain + /// `MergeBatcher` — receives every input chunk. A separate + /// [`BucketChain`] holds only the `kept` (future-stamped) tail produced + /// by seal-time extracts, so the bucketing machinery does no work for + /// data that is already at or below the current frontier. + pub struct TemporalBucketingMergeBatcher + where + M: Merger, + { + chunker: C, + /// Flat geometric chains, mirroring plain [`super::MergeBatcher`]. + /// Every input chunk lands here. + chains: Vec>, + /// Bucket chain holding only future-stamped data deposited by prior + /// seals' `kept` extracts. + bucket_chain: BucketChain>, + /// Total record count currently held in [`Self::bucket_chain`]. + /// + /// Maintained alongside insertions and peel-time drains so `seal` + /// can take a fast path that skips all bucket-chain work whenever + /// no future-stamped data has accumulated. For non-temporal + /// workloads this counter stays at `0` and the per-seal cost + /// matches plain [`super::MergeBatcher`]. 
+ held_records: usize, + stash: Vec, + merger: M, + lower: Antichain, + frontier: Antichain, + #[allow(dead_code)] + logger: Option, + #[allow(dead_code)] + operator_id: usize, + _marker: PhantomData, + } + + impl TemporalBucketingMergeBatcher + where + C: ContainerBuilder, + M: Merger, + { + /// Insert a chain into the flat path with geometric merging, + /// matching plain [`super::MergeBatcher::insert_chain`]. + fn insert_chain(&mut self, chain: Vec) { + if chain.is_empty() { + return; + } + self.chains.push(chain); + while self.chains.len() > 1 + && self.chains[self.chains.len() - 1].len() + >= self.chains[self.chains.len() - 2].len() / 2 + { + let l1 = self.chains.pop().unwrap(); + let l2 = self.chains.pop().unwrap(); + let mut out = Vec::with_capacity(l1.len() + l2.len()); + self.merger.merge(l1, l2, &mut out, &mut self.stash); + self.chains.push(out); + } + } + + /// Route an entire sorted, consolidated chain into + /// [`Self::bucket_chain`]. + /// + /// The whole chain is fed as a unit — it is already a + /// geometric-merge-ready level, so the destination bucket can + /// absorb it without per-chunk pairwise merging. Routing per + /// chunk would force the bucket to rebuild the chain via + /// O(records × log chunks) merges. + /// + /// Fast path: when the chain's combined `(lower, upper)` time + /// bounds all map to a single bucket, push the chain into that + /// bucket directly. No `M::extract`. + /// + /// Slow path (straddle or unknown bounds): drive a frontier-led + /// loop that jumps to the bucket containing the next remaining + /// time, extracts that bucket's portion, and ships it. The + /// extract output frontier (the lower bound of `keep`) selects + /// the next bucket, so empty intermediate buckets are skipped + /// instead of paying an `O(records)` extract per boundary. 
+ /// [`BucketChain::peel`] correctness depends on each bucket + /// only holding data in its declared range — peeled buckets + /// are emitted as ready without further extract, so a bucket + /// must not contain data with `t >= bucket_end`. + /// + /// Used at seal time for `kept` chains; never on the input path. + fn route_chain_into_bucket_chain(&mut self, chain: Vec) { + if chain.is_empty() { + return; + } + let total_records: usize = chain.iter().map(|c| M::account(c).0).sum(); + self.held_records = self.held_records.saturating_add(total_records); + + if self.bucket_chain.is_empty() { + // Chain was fully drained by a prior peel. Reseed with a + // single full-domain bucket and slot the chain there. + self.bucket_chain = BucketChain::new(MergeBucket::default()); + let bucket = self + .bucket_chain + .find_mut(&M::Time::minimum()) + .expect("freshly seeded chain has a bucket"); + bucket.insert_chain(chain, &mut self.merger, &mut self.stash); + return; + } + + // Fast path: union the chain's chunk bounds and check whether + // they all live in a single bucket. + let bounds = bounds_from_chain::(&chain); + let target_start = bounds.as_ref().and_then(|(lower, upper)| { + let min_t = lower.elements().first()?; + let target = self.bucket_chain.range_of(min_t)?.start; + let all_fit = upper.elements().iter().all(|u| { + self.bucket_chain + .range_of(u) + .is_some_and(|r| r.start == target) + }); + all_fit.then_some(target) + }); + if let Some(start) = target_start { + let bucket = self + .bucket_chain + .find_mut(&start) + .expect("bucket exists for known time"); + bucket.insert_chain(chain, &mut self.merger, &mut self.stash); + return; + } + + // Slow path: walk bucket starts with a monotonically + // advancing cursor, driven by the frontier returned from + // each `extract`. 
Seed with the chain's combined lower + // bound when known so we land on the first populated + // bucket immediately; otherwise start at the chain head + // and let the first extract supply the real frontier. + // + // Snapshot starts; we can't hold an immutable iterator + // across the `find_mut` borrow that follows each extract. + let starts: Vec = self.bucket_chain.starts().cloned().collect(); + let mut bucket_idx: usize = 0; + let mut scratch_frontier = match bounds { + Some((lower, _)) => lower, + None => Antichain::from_elem(M::Time::minimum()), + }; + let mut current = chain; + while !scratch_frontier.is_empty() && !current.is_empty() { + let next_t = scratch_frontier + .elements() + .first() + .expect("non-empty antichain") + .clone(); + // Advance the cursor until `starts[bucket_idx] <= + // next_t < starts[bucket_idx + 1]`. The total advance + // across all iterations is bounded by `starts.len()`, + // so empty intermediate buckets are skipped without + // paying an `extract` per boundary. + while bucket_idx + 1 < starts.len() && starts[bucket_idx + 1] <= next_t { + bucket_idx += 1; + } + let bucket_start = starts[bucket_idx].clone(); + // Exclusive upper is the next bucket's start; the + // unbounded last bucket has none, signaled to + // `extract` by an empty antichain. 
+ let upper = match starts.get(bucket_idx + 1) { + Some(end) => Antichain::from_elem(end.clone()), + None => Antichain::new(), + }; + scratch_frontier.clear(); + let mut ship = Vec::new(); + let mut keep = Vec::new(); + self.merger.extract( + std::mem::take(&mut current), + upper.borrow(), + &mut scratch_frontier, + &mut ship, + &mut keep, + &mut self.stash, + ); + if !ship.is_empty() { + let bucket = self + .bucket_chain + .find_mut(&bucket_start) + .expect("bucket exists for known start"); + bucket.insert_chain(ship, &mut self.merger, &mut self.stash); + } + current = keep; + } + } + + /// Recompute `self.frontier` as the held-data lower bound, walking + /// only [`Self::bucket_chain`] (the only place data is held after a + /// seal — flat chains are emptied during the seal extract). + /// + /// A bucket's start is only a range lower bound; to produce the + /// actual minimum we merge the chains in the lowest non-empty bucket + /// and use `Merger::extract` against the bucket start so every entry + /// is classified as `keep`, which is the side that updates the + /// frontier antichain. The merged chain replaces the bucket's chains + /// so subsequent seals pay only the linear extract cost. + fn recompute_frontier(&mut self) { + self.frontier.clear(); + // Walk buckets in order, taking chains out of the first non-empty one. + // Avoids the per-seal `Vec` allocation and per-bucket `find_mut` cost + // that an outer loop driven by cloned starts would incur. 
+ let mut found: Option<(M::Time, Vec>)> = None; + for (start, bucket) in self.bucket_chain.iter_mut() { + if !bucket.is_empty() { + found = Some((start.clone(), std::mem::take(&mut bucket.chains))); + break; + } + } + let Some((start, chains)) = found else { return }; + + let merged = MergeBucket:: { + chains, + bounds: None, + } + .merge_into_one(&mut self.merger, &mut self.stash); + let upper = Antichain::from_elem(start.clone()); + let mut ship = Vec::new(); + let mut keep = Vec::new(); + self.merger.extract( + merged, + upper.borrow(), + &mut self.frontier, + &mut ship, + &mut keep, + &mut self.stash, + ); + debug_assert!( + ship.is_empty(), + "all times in a bucket must be >= the bucket's start" + ); + let bucket = self + .bucket_chain + .find_mut(&start) + .expect("bucket still exists"); + if !keep.is_empty() { + bucket.chains.push(keep); + } + } + } + + impl Batcher for TemporalBucketingMergeBatcher + where + C: ContainerBuilder + for<'a> PushInto<&'a mut Input>, + M: Merger, + { + type Input = Input; + type Time = M::Time; + type Output = M::Chunk; + + fn new(logger: Option, operator_id: usize) -> Self { + Self { + logger, + operator_id, + chunker: C::default(), + merger: M::default(), + chains: Vec::new(), + bucket_chain: BucketChain::new(MergeBucket::default()), + held_records: 0, + stash: Vec::new(), + frontier: Antichain::new(), + lower: Antichain::from_elem(M::Time::minimum()), + _marker: PhantomData, + } + } + + fn push_container(&mut self, container: &mut Input) { + self.chunker.push_into(container); + while let Some(chunk) = self.chunker.extract() { + let chunk = std::mem::take(chunk); + self.insert_chain(vec![chunk]); + } + } + + fn seal>( + &mut self, + upper: Antichain, + ) -> B::Output { + // Drain the chunker into the flat path. + while let Some(chunk) = self.chunker.finish() { + let chunk = std::mem::take(chunk); + self.insert_chain(vec![chunk]); + } + + // Merge all flat chains into one sorted chain. 
+ while self.chains.len() > 1 { + let l1 = self.chains.pop().unwrap(); + let l2 = self.chains.pop().unwrap(); + let mut out = Vec::with_capacity(l1.len() + l2.len()); + self.merger.merge(l1, l2, &mut out, &mut self.stash); + self.chains.push(out); + } + let merged_flat = self.chains.pop().unwrap_or_default(); + + // Extract from the flat side. `flat_ready` has `t < upper`, + // `kept` has `t >= upper` and feeds the bucket chain. We use a + // scratch frontier because `self.frontier` is recomputed below. + let mut flat_ready: Vec = Vec::new(); + let mut kept = Vec::new(); + let mut scratch_frontier = Antichain::new(); + self.merger.extract( + merged_flat, + upper.borrow(), + &mut scratch_frontier, + &mut flat_ready, + &mut kept, + &mut self.stash, + ); + + // Diagnostic counters; cheap to collect, gated to DEBUG below. + let trace_enabled = tracing::enabled!(tracing::Level::DEBUG); + let flat_ready_records: usize = if trace_enabled { + flat_ready.iter().map(|c| M::account(c).0).sum() + } else { + 0 + }; + let kept_records: usize = if trace_enabled { + kept.iter().map(|c| M::account(c).0).sum() + } else { + 0 + }; + let held_before = self.held_records; + + // Fast path: no held data and nothing new to hold. Skip the + // bucket-chain bookkeeping (peel, merge, restore, frontier + // recomputation) so the per-seal cost matches the plain + // [`super::MergeBatcher`]. + let mut peel_records: usize = 0; + let mut peeled_buckets: usize = 0; + let fast_path = self.held_records == 0 && kept.is_empty(); + let mut readied = if fast_path { + self.frontier.clear(); + flat_ready + } else { + // Future-stamped data enters the bucket chain. Route + // the whole chain at once so the destination bucket can + // adopt it as a single geometric level rather than + // pairwise-merging it back into existence. + self.route_chain_into_bucket_chain(kept); + + // Peel buckets fully below `upper`. The peel may split the + // boundary-crossing bucket; the upper half is reinserted. 
+ let peeled = self.bucket_chain.peel(upper.borrow()); + + // Merge each peeled bucket into a single sorted chain, then + // merge all of those into one running chain. Account peeled + // records as leaving the bucket chain. + let mut peeled_chain: Vec = Vec::new(); + for bucket in peeled { + let bucket_one = bucket.merge_into_one(&mut self.merger, &mut self.stash); + if bucket_one.is_empty() { + continue; + } + let drained: usize = bucket_one.iter().map(|c| M::account(c).0).sum(); + self.held_records = self.held_records.saturating_sub(drained); + peel_records = peel_records.saturating_add(drained); + peeled_buckets = peeled_buckets.saturating_add(1); + if peeled_chain.is_empty() { + peeled_chain = bucket_one; + } else { + let mut out = Vec::with_capacity(peeled_chain.len() + bucket_one.len()); + self.merger + .merge(peeled_chain, bucket_one, &mut out, &mut self.stash); + peeled_chain = out; + } + } + + // Combine flat-side ready chunks with peeled-side chunks into a + // single sorted chain. Peeled buckets are entirely below `upper` + // by `peel`'s contract, so no further extract is needed. + let combined = if flat_ready.is_empty() { + peeled_chain + } else if peeled_chain.is_empty() { + flat_ready + } else { + let mut out = Vec::with_capacity(flat_ready.len() + peeled_chain.len()); + self.merger + .merge(flat_ready, peeled_chain, &mut out, &mut self.stash); + out + }; + + // Maintain bucket-chain shape with bounded fuel. + let mut fuel = RESTORE_FUEL; + self.bucket_chain.restore(&mut fuel); + + // Recompute the held-data frontier from the lowest non-empty + // bucket. After the seal, all data is in `bucket_chain` (the + // flat chains have been drained). For totally-ordered timestamps + // — the only kind that impl `BucketTimestamp` — this produces a + // single-element antichain at the smallest held time. 
+ self.recompute_frontier(); + + combined + }; + + self.stash.clear(); + + tracing::debug!( + target: "mz_timely_util::merge_batcher::temporal", + operator_id = self.operator_id, + upper = ?upper.borrow(), + lower = ?self.lower.borrow(), + frontier_after = ?self.frontier.borrow(), + flat_ready_records, + kept_records, + held_before, + held_after = self.held_records, + peeled_buckets, + peel_records, + fast_path, + bucket_count = self.bucket_chain.len(), + "seal", + ); + + let description = Description::new( + self.lower.clone(), + upper.clone(), + Antichain::from_elem(M::Time::minimum()), + ); + let seal = B::seal(&mut readied, description); + self.lower = upper; + seal + } + + fn frontier(&mut self) -> AntichainRef<'_, M::Time> { + self.frontier.borrow() + } + } + + impl TemporalBucketingMergeBatcher + where + M: Merger, + { + /// Returns true if both the flat chains and every bucket are empty. + #[cfg(test)] + pub(super) fn is_chain_empty(&self) -> bool { + if !self.chains.is_empty() { + return false; + } + let starts: Vec<_> = self.bucket_chain.starts().cloned().collect(); + starts.iter().all(|s| { + self.bucket_chain + .find(s) + .map(|b| b.is_empty()) + .unwrap_or(true) + }) + } + } + + #[cfg(test)] + mod tests { + use std::marker::PhantomData; + + use differential_dataflow::trace::implementations::chunker::ContainerChunker; + use differential_dataflow::trace::{Batcher, Builder, Description}; + use timely::progress::{Antichain, Timestamp}; + + use crate::merge_batcher::TemporalBucketingMergeBatcher; + use crate::merge_batcher::container::VecMerger; + + type TestTime = u64; + type TestUpdate = (u64, TestTime, i64); + type TestBatcher = TemporalBucketingMergeBatcher< + Vec, + ContainerChunker>, + VecMerger, + >; + + /// A `Builder` that collects sealed chunks into a flat `Vec`. 
+ struct VecCapturingBuilder(PhantomData<(D, T)>); + + impl Builder for VecCapturingBuilder { + type Input = Vec; + type Time = T; + type Output = Vec; + + fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { + unimplemented!() + } + fn push(&mut self, _chunk: &mut Self::Input) { + unimplemented!() + } + fn done(self, _description: Description) -> Self::Output { + unimplemented!() + } + + fn seal( + chain: &mut Vec, + _description: Description, + ) -> Self::Output { + let mut out = Vec::new(); + for mut chunk in std::mem::take(chain) { + out.append(&mut chunk); + } + out + } + } + + fn push(batcher: &mut TestBatcher, mut updates: Vec) { + batcher.push_container(&mut updates); + } + + fn seal(batcher: &mut TestBatcher, upper: Option) -> Vec { + let antichain = match upper { + Some(u) => Antichain::from_elem(u), + None => Antichain::new(), + }; + batcher.seal::>(antichain) + } + + #[mz_ore::test] + fn distinct_timestamps_stay_distinct() { + let mut batcher = TestBatcher::new(None, 0); + let input: Vec = (0_u64..32).map(|i| (i, i, 1_i64)).collect(); + push(&mut batcher, input.clone()); + + let drained = seal(&mut batcher, None); + let mut sorted_drained = drained.clone(); + sorted_drained.sort(); + let mut sorted_input = input; + sorted_input.sort(); + assert_eq!(sorted_drained, sorted_input); + } + + #[mz_ore::test] + fn seal_returns_strictly_less_than_upper() { + let mut batcher = TestBatcher::new(None, 0); + let input: Vec = (0_u64..16).map(|i| (i, i, 1_i64)).collect(); + push(&mut batcher, input); + + let early = seal(&mut batcher, Some(8)); + for (_, t, _) in &early { + assert!(*t < 8, "seal returned t={t} not strictly less than 8"); + } + // Updates with t in [0, 8) must all be present. 
+ let mut early_times: Vec = early.iter().map(|(_, t, _)| *t).collect(); + early_times.sort(); + assert_eq!(early_times, (0_u64..8).collect::>()); + + let rest = seal(&mut batcher, None); + let mut rest_times: Vec = rest.iter().map(|(_, t, _)| *t).collect(); + rest_times.sort(); + assert_eq!(rest_times, (8_u64..16).collect::>()); + + assert!(batcher.is_chain_empty()); + } + + #[mz_ore::test] + fn frontier_reports_held_lower_bound() { + let mut batcher = TestBatcher::new(None, 0); + let input: Vec = vec![(1, 3, 1), (2, 5, 1), (3, 7, 1), (4, 11, 1)]; + push(&mut batcher, input); + + // Seal up to 4. Held data is {5, 7, 11}; the smallest is 5. + // The frontier reflects the actual minimum held time, not the + // surrounding bucket's start. + let _ = seal(&mut batcher, Some(4)); + assert_eq!(&*batcher.frontier(), &[5_u64]); + + // Drain everything. + let _ = seal(&mut batcher, None); + assert!(batcher.frontier().is_empty()); + } + + #[mz_ore::test] + fn frontier_is_actual_min_not_bucket_start() { + // The smallest held time is 1_000_000, well above any low bucket + // boundary. The frontier should reflect that exact value, not the + // power-of-two bucket boundary that contains it. + let mut batcher = TestBatcher::new(None, 0); + let input: Vec = + (1_000_000_u64..1_000_010).map(|i| (i, i, 1_i64)).collect(); + push(&mut batcher, input); + + let _ = seal(&mut batcher, Some(500)); + assert_eq!(&*batcher.frontier(), &[1_000_000_u64]); + } + + #[mz_ore::test] + fn future_stamped_updates_are_held_back() { + let mut batcher = TestBatcher::new(None, 0); + // All updates at t >= 100, but seal upper is 50 — nothing should be released. + let input: Vec = (100_u64..120).map(|i| (i, i, 1_i64)).collect(); + push(&mut batcher, input.clone()); + + let early = seal(&mut batcher, Some(50)); + assert!(early.is_empty(), "no data with t < 50 should be released"); + + // After draining, all updates show up. 
+ let drained = seal(&mut batcher, None); + let mut sorted = drained.clone(); + sorted.sort(); + let mut input_sorted = input; + input_sorted.sort(); + assert_eq!(sorted, input_sorted); + } + } +} + pub mod container { //! Merger implementations for the merge batcher. //! @@ -330,6 +1267,13 @@ pub mod container { keep: &mut Self, ship: &mut Self, ); + + /// Return `(lower, upper)` antichains bracketing the times in + /// `self`, or `None` if the container is empty / the impl can't + /// answer. See [`Merger::time_range`] for the contract. + fn time_range(&self) -> Option<(Antichain, Antichain)> { + None + } } /// A `Merger` for `Vec` containers, which contain owned data and need special treatment. @@ -506,6 +1450,15 @@ pub mod container { fn account(chunk: &Vec<(D, T, R)>) -> (usize, usize, usize, usize) { (chunk.len(), 0, 0, 0) } + + fn time_range(chunk: &Vec<(D, T, R)>) -> Option<(Antichain, Antichain)> { + if chunk.is_empty() { + return None; + } + Some(super::build_time_bounds( + chunk.iter().map(|(_, t, _)| t.clone()), + )) + } } /// A merger that uses internal iteration via [`InternalMerge`]. @@ -671,6 +1624,12 @@ pub mod container { fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { chunk.account() } + + fn time_range( + chunk: &Self::Chunk, + ) -> Option<(Antichain, Antichain)> { + chunk.time_range() + } } /// Implementation of `InternalMerge` for `Vec<(D, T, R)>`. @@ -772,6 +1731,15 @@ pub mod container { *position += 1; } } + + fn time_range(&self) -> Option<(Antichain, Antichain)> { + if self.is_empty() { + return None; + } + Some(crate::merge_batcher::build_time_bounds( + self.iter().map(|(_, t, _)| t.clone()), + )) + } } } } diff --git a/src/timely-util/src/temporal.rs b/src/timely-util/src/temporal.rs index c7f155213bde8..67cd00b95da42 100644 --- a/src/timely-util/src/temporal.rs +++ b/src/timely-util/src/temporal.rs @@ -163,6 +163,20 @@ impl BucketChain { /// bucket of -2 bits just before the smallest bucket. 
#[inline] pub fn restore(&mut self, fuel: &mut i64) { + if self.content.is_empty() { + return; + } + // Fast path: if every bucket already satisfies the chain property, no work needed. + let mut last_bits = -2_isize; + let well_formed = self.content.values().all(|(bits, _)| { + let cur = isize::cast_from(*bits); + let ok = cur <= last_bits + 2; + last_bits = cur; + ok + }); + if well_formed { + return; + } // We could write this in terms of a cursor API, but it's not stable yet. Instead, we // allocate a new map and move elements over. let mut new = BTreeMap::default(); @@ -184,12 +198,27 @@ impl BucketChain { self.content = new; } + /// Iterate `(start, &mut bucket)` pairs in ascending start order. + /// + /// Used by callers that need to mutate buckets while walking the chain + /// without paying the per-bucket `find_mut` cost. + #[inline] + pub fn iter_mut(&mut self) -> impl Iterator { + self.content.iter_mut().map(|(t, (_, s))| (t, s)) + } + /// Returns `true` if the chain is empty. This means there are no outstanding times left. #[inline(always)] pub fn is_empty(&self) -> bool { self.content.is_empty() } + /// Iterate over bucket lower bounds in ascending order. + #[inline] + pub fn starts(&self) -> impl Iterator { + self.content.keys() + } + /// The number of buckets in the chain. #[inline(always)] pub fn len(&self) -> usize { @@ -210,21 +239,25 @@ impl BucketChain { } } +/// Test-only [`BucketTimestamp`] impls, hoisted to crate scope so other +/// `#[cfg(test)]` modules in this crate can reuse them. #[cfg(test)] -mod tests { - use super::*; - - impl BucketTimestamp for u8 { - fn advance_by_power_of_two(&self, bits: u32) -> Option { - self.checked_add(1_u8.checked_shl(bits)?) - } +impl BucketTimestamp for u8 { + fn advance_by_power_of_two(&self, bits: u32) -> Option { + self.checked_add(1_u8.checked_shl(bits)?) } +} - impl BucketTimestamp for u64 { - fn advance_by_power_of_two(&self, bits: u32) -> Option { - self.checked_add(1_u64.checked_shl(bits)?) 
- } +#[cfg(test)] +impl BucketTimestamp for u64 { + fn advance_by_power_of_two(&self, bits: u32) -> Option { + self.checked_add(1_u64.checked_shl(bits)?) } +} + +#[cfg(test)] +mod tests { + use super::*; struct TestStorage { inner: Vec, From 93d1eef4a13988c2f9550a86ec557074279ed1ec Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 6 May 2026 20:52:20 +0200 Subject: [PATCH 3/5] compute: drop source-side temporal_bucket operator The arrange-side `TemporalBucketingMergeBatcher` (per-`ArrangeBy`) fully covers the source-side delay this operator used to provide. Keeping both pays a double bucketing cost on every dataflow that touches the source. Remove the operator and its dyncfg `TEMPORAL_BUCKETING_SUMMARY`; the bucket chain advances naturally with the seal upper. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/compute-types/src/dyncfgs.rs | 8 - src/compute/src/extensions.rs | 1 - src/compute/src/extensions/temporal_bucket.rs | 257 ------------------ src/compute/src/render.rs | 16 +- 4 files changed, 1 insertion(+), 281 deletions(-) delete mode 100644 src/compute/src/extensions/temporal_bucket.rs diff --git a/src/compute-types/src/dyncfgs.rs b/src/compute-types/src/dyncfgs.rs index de710523f1f18..4815652cc999e 100644 --- a/src/compute-types/src/dyncfgs.rs +++ b/src/compute-types/src/dyncfgs.rs @@ -66,13 +66,6 @@ pub const ENABLE_TEMPORAL_BUCKETING: Config = Config::new( "Whether to enable temporal bucketing in compute.", ); -/// The summary to apply to the frontier in temporal bucketing in compute. -pub const TEMPORAL_BUCKETING_SUMMARY: Config = Config::new( - "compute_temporal_bucketing_summary", - Duration::from_secs(2), - "The summary to apply to frontiers in temporal bucketing in compute.", -); - /// The yielding behavior with which linear joins should be rendered. 
pub const LINEAR_JOIN_YIELDING: Config<&str> = Config::new( "linear_join_yielding", @@ -386,7 +379,6 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { .add(&CORRECTION_V2_CHAIN_PROPORTIONALITY) .add(&CORRECTION_V2_CHUNK_SIZE) .add(&ENABLE_TEMPORAL_BUCKETING) - .add(&TEMPORAL_BUCKETING_SUMMARY) .add(&LINEAR_JOIN_YIELDING) .add(&ENABLE_LGALLOC) .add(&LGALLOC_BACKGROUND_INTERVAL) diff --git a/src/compute/src/extensions.rs b/src/compute/src/extensions.rs index 3cc0e64657f0e..89dd28b94bfcb 100644 --- a/src/compute/src/extensions.rs +++ b/src/compute/src/extensions.rs @@ -11,4 +11,3 @@ pub(crate) mod arrange; pub(crate) mod reduce; -pub(crate) mod temporal_bucket; diff --git a/src/compute/src/extensions/temporal_bucket.rs b/src/compute/src/extensions/temporal_bucket.rs deleted file mode 100644 index 2ef91817a9220..0000000000000 --- a/src/compute/src/extensions/temporal_bucket.rs +++ /dev/null @@ -1,257 +0,0 @@ -// Copyright Materialize, Inc. and contributors. All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. - -//! Utilities and stream extensions for temporal bucketing. 
- -use std::marker::PhantomData; - -use differential_dataflow::Hashable; -use differential_dataflow::difference::Semigroup; -use differential_dataflow::lattice::Lattice; -use differential_dataflow::trace::{Batcher, Builder, Description}; -use mz_timely_util::columnation::{ColInternalMerger, ColumnationChunker, ColumnationStack}; -use mz_timely_util::merge_batcher::MergeBatcher; -use mz_timely_util::temporal::{Bucket, BucketChain, BucketTimestamp}; -use timely::container::PushInto; -use timely::dataflow::channels::pact::Exchange; -use timely::dataflow::operators::Operator; -use timely::dataflow::{Stream, StreamVec}; -use timely::order::TotalOrder; -use timely::progress::{Antichain, PathSummary, Timestamp}; -use timely::{ContainerBuilder, ExchangeData, PartialOrder}; - -use crate::typedefs::MzData; - -/// Sort outstanding updates into a [`BucketChain`], and reveal data not in advance of the input -/// frontier. Retains a capability at the last input frontier to retain the right to produce data -/// at times between the last input frontier and the current input frontier. -pub trait TemporalBucketing<'scope, T: Timestamp, O> { - /// Construct a new stream that stores updates into a [`BucketChain`] and reveals data - /// not in advance of the frontier. Data that is within `threshold` distance of the input - /// frontier or the `as_of` is passed through without being stored in the chain. - fn bucket( - self, - as_of: Antichain, - threshold: T::Summary, - ) -> Stream<'scope, T, CB::Container> - where - CB: ContainerBuilder + PushInto; -} - -/// Implementation for streams in scopes where timestamps define a total order. 
-impl<'scope, T, D> TemporalBucketing<'scope, T, (D, T, mz_repr::Diff)> - for StreamVec<'scope, T, (D, T, mz_repr::Diff)> -where - T: Timestamp + ExchangeData + MzData + BucketTimestamp + TotalOrder + Lattice, - D: ExchangeData + MzData + Ord + Clone + std::fmt::Debug + Hashable, -{ - fn bucket( - self, - as_of: Antichain, - threshold: T::Summary, - ) -> Stream<'scope, T, CB::Container> - where - CB: ContainerBuilder + PushInto<(D, T, mz_repr::Diff)>, - { - let scope = self.scope(); - let logger = scope - .worker() - .logger_for("differential/arrange") - .map(Into::into); - - let pact = Exchange::new(|(d, _, _): &(D, T, mz_repr::Diff)| d.hashed().into()); - self.unary_frontier::(pact, "Temporal delay", |cap, info| { - let mut chain = BucketChain::new(MergeBatcherWrapper::new(logger, info.global_id)); - let activator = scope.activator_for(info.address); - - // Cap tracking the lower bound of potentially outstanding data. - let mut cap = Some(cap); - - // Buffer for data to be inserted into the chain. - let mut buffer = Vec::new(); - - move |(input, frontier), output| { - // The upper frontier is the join of the input frontier and the `as_of` frontier, - // with the `threshold` summary applied to it. - let mut upper = Antichain::new(); - for time1 in &frontier.frontier() { - for time2 in as_of.elements() { - // TODO: Use `join_assign` if we ever use a timestamp with allocations. - if let Some(time) = threshold.results_in(&time1.join(time2)) { - upper.insert(time); - } - } - } - - input.for_each_time(|time, data| { - let mut session = output.session_with_builder(&time); - for data in data { - // Skip data that is about to be revealed. - let pass_through = data.extract_if(.., |(_, t, _)| !upper.less_equal(t)); - session.give_iterator(pass_through); - - // Sort data by time, then drain it into a buffer that contains data for a - // single bucket. We scan the data for ranges of time that fall into the same - // bucket so we can push batches of data at once. 
- data.sort_unstable_by(|(_, t, _), (_, t2, _)| t.cmp(t2)); - - let mut drain = data.drain(..); - if let Some((datum, time, diff)) = drain.next() { - let mut range = chain.range_of(&time).expect("Must exist"); - buffer.push((datum, time, diff)); - for (datum, time, diff) in drain { - // If we have a range, check if the time is not within it. - if !range.contains(&time) { - // If the time is outside the range, push the current buffer - // to the chain and reset the range. - if !buffer.is_empty() { - let bucket = - chain.find_mut(&range.start).expect("Must exist"); - bucket.inner.push_container(&mut buffer); - buffer.clear(); - } - range = chain.range_of(&time).expect("Must exist"); - } - buffer.push((datum, time, diff)); - } - - // Handle leftover data in the buffer. - if !buffer.is_empty() { - let bucket = chain.find_mut(&range.start).expect("Must exist"); - bucket.inner.push_container(&mut buffer); - buffer.clear(); - } - } - } - }); - - // Check for data that is ready to be revealed. - let peeled = chain.peel(upper.borrow()); - if let Some(cap) = cap.as_ref() { - let mut session = output.session_with_builder(cap); - for stack in peeled.into_iter().flat_map(|x| x.done()) { - // TODO: If we have a columnar merge batcher, cloning won't be necessary. - session.give_iterator(stack.iter().cloned()); - } - } else { - // If we don't have a cap, we should not have any data to reveal. - assert_eq!( - peeled.into_iter().flat_map(|x| x.done()).next(), - None, - "Unexpected data revealed without a cap." - ); - } - - // Downgrade the cap to the current input frontier. - if frontier.is_empty() || upper.is_empty() { - cap = None; - } else if let Some(cap) = cap.as_mut() { - // TODO: This assumes that the time is total ordered. - cap.downgrade(&upper[0]); - } - - // Maintain the bucket chain by restoring it with fuel. - let mut fuel = 1_000_000; - chain.restore(&mut fuel); - if fuel <= 0 { - // If we run out of fuel, we activate the operator to continue processing. 
- activator.activate(); - } - } - }) - } -} - -/// A wrapper around `MergeBatcher` that implements the `Storage` trait for bucketing. -struct MergeBatcherWrapper -where - D: MzData + Ord + Clone, - T: MzData + Ord + PartialOrder + Clone, - R: MzData + Semigroup + Default, -{ - logger: Option, - operator_id: usize, - inner: MergeBatcher, ColumnationChunker<(D, T, R)>, ColInternalMerger>, -} - -impl MergeBatcherWrapper -where - D: MzData + Ord + Clone, - T: MzData + Ord + PartialOrder + Clone + Timestamp, - R: MzData + Semigroup + Default, -{ - /// Construct a new `MergeBatcherWrapper` with the given logger and operator ID. - fn new(logger: Option, operator_id: usize) -> Self { - Self { - logger: logger.clone(), - operator_id, - inner: MergeBatcher::new(logger, operator_id), - } - } - - /// Reveal the contents of the `MergeBatcher`, returning a vector of `ColumnationStack`s. - fn done(mut self) -> Vec> { - self.inner.seal::>(Antichain::new()) - } -} - -impl Bucket for MergeBatcherWrapper -where - D: MzData + Ord + Clone + 'static, - T: MzData + Ord + PartialOrder + Clone + 'static + BucketTimestamp, - R: MzData + Semigroup + Default, -{ - type Timestamp = T; - - fn split(mut self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self) { - // The implementation isn't tuned for performance. We should not bounce in and out of - // different containers when not needed. The merge batcher we use can only accept - // vectors as inputs, but not any other container type. - // TODO: Allow the merge batcher to accept more generic containers. - let upper = Antichain::from_elem(timestamp.clone()); - let mut lower = Self::new(self.logger.clone(), self.operator_id); - let mut buffer = Vec::new(); - for chunk in self.inner.seal::>(upper) { - *fuel = fuel.saturating_sub(chunk.len().try_into().expect("must fit")); - // TODO: Avoid this cloning. 
- buffer.extend(chunk.into_iter().cloned()); - lower.inner.push_container(&mut buffer); - buffer.clear(); - } - (lower, self) - } -} - -struct CapturingBuilder(D, PhantomData); - -impl Builder for CapturingBuilder { - type Input = D; - type Time = T; - type Output = Vec; - - fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { - // Not needed for this implementation. - unimplemented!() - } - - fn push(&mut self, _chunk: &mut Self::Input) { - // Not needed for this implementation. - unimplemented!() - } - - fn done(self, _description: Description) -> Self::Output { - // Not needed for this implementation. - unimplemented!() - } - - #[inline] - fn seal(chain: &mut Vec, _description: Description) -> Self::Output { - std::mem::take(chain) - } -} diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index e5f8f32ef1e16..deca03ef3af72 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -122,7 +122,7 @@ use mz_compute_types::dataflows::{DataflowDescription, IndexDesc}; use mz_compute_types::dyncfgs::{ COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK, COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE, - ENABLE_TEMPORAL_BUCKETING, SUBSCRIBE_SNAPSHOT_OPTIMIZATION, TEMPORAL_BUCKETING_SUMMARY, + SUBSCRIBE_SNAPSHOT_OPTIMIZATION, }; use mz_compute_types::plan::LirId; use mz_compute_types::plan::render_plan::{ @@ -138,7 +138,6 @@ use mz_timely_util::operator::{CollectionExt, StreamExt}; use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify}; use mz_timely_util::scope_label::ScopeExt; use timely::PartialOrder; -use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::vec::ToStream; use timely::dataflow::operators::vec::{BranchWhen, Filter}; @@ -154,7 +153,6 @@ use crate::arrangement::manager::TraceBundle; use crate::compute_state::ComputeState; use 
crate::extensions::arrange::{KeyCollection, MzArrange}; use crate::extensions::reduce::MzReduce; -use crate::extensions::temporal_bucket::TemporalBucketing; use crate::logging::compute::{ ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration, }; @@ -453,18 +451,6 @@ pub fn build_compute_dataflow( ); for (id, (oks, errs)) in imported_sources.into_iter() { - let oks = if ENABLE_TEMPORAL_BUCKETING.get(&compute_state.worker_config) { - let as_of = context.as_of_frontier.clone(); - let summary = TEMPORAL_BUCKETING_SUMMARY - .get(&compute_state.worker_config) - .try_into() - .expect("must fit"); - oks.inner - .bucket::>(as_of, summary) - .as_collection() - } else { - oks - }; let bundle = crate::render::CollectionBundle::from_collections( oks.enter_region(region), errs.enter_region(region), From 34905768c7693651aef602681b5bf3e2eeb5f280 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 6 May 2026 18:55:16 +0200 Subject: [PATCH 4/5] test/feature-benchmark: add temporal scenarios Cherry-picked from #35935. Two scenarios exercise the bucketing path: * TemporalFilterIndexed: index hydration over a temporal-filtered view of a 10M-row materialized view. * TemporalFilterSustainedInsert: per-insert latency through a pre-warmed indexed temporal view. --- .../feature_benchmark/scenarios/temporal.py | 122 ++++++++++++++++++ test/feature-benchmark/mzcompose.py | 1 + 2 files changed, 123 insertions(+) create mode 100644 misc/python/materialize/feature_benchmark/scenarios/temporal.py diff --git a/misc/python/materialize/feature_benchmark/scenarios/temporal.py b/misc/python/materialize/feature_benchmark/scenarios/temporal.py new file mode 100644 index 0000000000000..d595d0fe18ca2 --- /dev/null +++ b/misc/python/materialize/feature_benchmark/scenarios/temporal.py @@ -0,0 +1,122 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. 
+# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +from datetime import date, timedelta +from textwrap import dedent + +from materialize.feature_benchmark.action import Action, TdAction +from materialize.feature_benchmark.measurement_source import MeasurementSource, Td +from materialize.feature_benchmark.scenario import Scenario + + +class TemporalFilter(Scenario): + """Feature benchmarks related to temporal filters using mz_now().""" + + +class TemporalFilterIndexed(TemporalFilter): + """Measure the time to create an index on a view with a temporal filter over 10M rows. + + The view applies a temporal filter (mz_now() range) on top of a materialized + view joined with a table, ensuring the collection has future updates that + require temporal bucketing before arrangement. 
+ """ + + FIXED_SCALE = True + + def init(self) -> list[Action]: + return [ + TdAction(dedent(""" + > DROP TABLE IF EXISTS anchor CASCADE; + > CREATE TABLE anchor (a int); + > INSERT INTO anchor VALUES (1); + """)), + ] + + def benchmark(self) -> MeasurementSource: + today = date.today() + next_week = today + timedelta(days=7) + return Td(dedent(f""" + > DROP VIEW IF EXISTS v_temporal CASCADE; + > DROP MATERIALIZED VIEW IF EXISTS mv_temporal CASCADE; + + > CREATE MATERIALIZED VIEW mv_temporal AS + SELECT x, a + FROM generate_series(1, 10000000) AS x + CROSS JOIN anchor; + + > SELECT COUNT(*) FROM mv_temporal; + 10000000 + + > SELECT 1 + /* A */ + 1 + + > CREATE VIEW v_temporal AS + SELECT x, a FROM mv_temporal + WHERE mz_now() >= extract(epoch from timestamp '{today}')::uint8 * 1000 + AND mz_now() < extract(epoch from timestamp '{next_week}')::uint8 * 1000; + + > CREATE INDEX idx_temporal ON v_temporal (x); + + > SELECT COUNT(*) FROM v_temporal + /* B */ + 10000000 + """)) + + +class TemporalFilterSustainedInsert(TemporalFilter): + """Measure ingestion latency through a temporal-filtered indexed view. + + Pre-loads 10M rows into a table that's filtered by a temporal predicate, + with the filtered view indexed. Then drives several small inserts and + times how long it takes for the index to reflect them. + + Each input row produces an insert at today + a retract at next-week + through the temporal MFP. Without bucketing both updates land in the + index's merge batcher, growing the arrangement spine; with bucketing, + only today's update lands in the spine while future retractions + accumulate in the BucketChain. The smaller spine keeps per-insert + incremental merge work cheaper and that work is on the critical path + of the trailing SELECT. 
+ """ + + FIXED_SCALE = True + + def benchmark(self) -> MeasurementSource: + today = date.today() + next_week = today + timedelta(days=7) + return Td(dedent(f""" + > DROP TABLE IF EXISTS t_temporal CASCADE; + > CREATE TABLE t_temporal (x int); + > INSERT INTO t_temporal SELECT generate_series(1, 10000000); + + > CREATE VIEW v_temporal AS + SELECT x FROM t_temporal + WHERE mz_now() >= extract(epoch from timestamp '{today}')::uint8 * 1000 + AND mz_now() < extract(epoch from timestamp '{next_week}')::uint8 * 1000; + + > CREATE INDEX idx_temporal ON v_temporal (x); + + > SELECT COUNT(*) FROM v_temporal; + 10000000 + + > SELECT 1 + /* A */ + 1 + + > INSERT INTO t_temporal SELECT generate_series(10000001, 10100000); + > INSERT INTO t_temporal SELECT generate_series(10100001, 10200000); + > INSERT INTO t_temporal SELECT generate_series(10200001, 10300000); + > INSERT INTO t_temporal SELECT generate_series(10300001, 10400000); + > INSERT INTO t_temporal SELECT generate_series(10400001, 10500000); + + > SELECT COUNT(*) FROM v_temporal + /* B */ + 10500000 + """)) diff --git a/test/feature-benchmark/mzcompose.py b/test/feature-benchmark/mzcompose.py index f169bc84c36fb..3ad49b02704fb 100644 --- a/test/feature-benchmark/mzcompose.py +++ b/test/feature-benchmark/mzcompose.py @@ -81,6 +81,7 @@ from materialize.feature_benchmark.scenarios.scale import * # noqa: F401 F403 from materialize.feature_benchmark.scenarios.skew import * # noqa: F401 F403 from materialize.feature_benchmark.scenarios.subscribe import * # noqa: F401 F403 +from materialize.feature_benchmark.scenarios.temporal import * # noqa: F401 F403 from materialize.feature_benchmark.termination import ( NormalDistributionOverlap, ProbForMin, From a637dd9e4860d3b3ee64eb9c0acb44de95b8a57a Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 7 May 2026 16:48:05 +0200 Subject: [PATCH 5/5] timely-util: coarsen TemporalBucketingMergeBatcher frontier to bucket start Replace the per-seal merge+extract over the lowest 
non-empty bucket with a walk that returns that bucket's start. The bucket start is a valid lower bound on every held time in the bucket, so the contract `arrange_core` relies on (`Batcher::frontier()` returning a lower bound on held data) is preserved. The reported frontier may lag the true minimum by up to one bucket span, which delays capability release on the arrangement output but does not affect correctness. `BucketChain::iter_mut` had a single caller (the old `recompute_frontier`) and is replaced by a non-mut `iter`, since the frontier walk now reads but does not modify buckets. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/timely-util/src/merge_batcher.rs | 88 ++++++++-------------------- src/timely-util/src/temporal.rs | 9 +-- 2 files changed, 27 insertions(+), 70 deletions(-) diff --git a/src/timely-util/src/merge_batcher.rs b/src/timely-util/src/merge_batcher.rs index 68dc0ea816a76..e1bedda79a9ee 100644 --- a/src/timely-util/src/merge_batcher.rs +++ b/src/timely-util/src/merge_batcher.rs @@ -785,56 +785,25 @@ mod temporal { } } - /// Recompute `self.frontier` as the held-data lower bound, walking - /// only [`Self::bucket_chain`] (the only place data is held after a - /// seal — flat chains are emptied during the seal extract). - /// - /// A bucket's start is only a range lower bound; to produce the - /// actual minimum we merge the chains in the lowest non-empty bucket - /// and use `Merger::extract` against the bucket start so every entry - /// is classified as `keep`, which is the side that updates the - /// frontier antichain. The merged chain replaces the bucket's chains - /// so subsequent seals pay only the linear extract cost. + /// Set `self.frontier` to the lower bound of the lowest non-empty + /// bucket, or empty when no held data exists. 
The bucket start is a + /// valid lower bound on the times in that bucket (every held time + /// satisfies `start.less_equal(t)`), so reporting it preserves the + /// `Batcher::frontier` contract — at the cost of up to one bucket + /// span of lag relative to the actual minimum held time. The + /// bucketing already trades input-side temporal precision for batched + /// releases; aligning the reported frontier with the same bucket + /// boundaries is internally consistent and removes the per-seal + /// merge+extract over the lowest non-empty bucket. fn recompute_frontier(&mut self) { self.frontier.clear(); - // Walk buckets in order, taking chains out of the first non-empty one. - // Avoids the per-seal `Vec` allocation and per-bucket `find_mut` cost - // that an outer loop driven by cloned starts would incur. - let mut found: Option<(M::Time, Vec>)> = None; - for (start, bucket) in self.bucket_chain.iter_mut() { - if !bucket.is_empty() { - found = Some((start.clone(), std::mem::take(&mut bucket.chains))); - break; - } - } - let Some((start, chains)) = found else { return }; - - let merged = MergeBucket:: { - chains, - bounds: None, - } - .merge_into_one(&mut self.merger, &mut self.stash); - let upper = Antichain::from_elem(start.clone()); - let mut ship = Vec::new(); - let mut keep = Vec::new(); - self.merger.extract( - merged, - upper.borrow(), - &mut self.frontier, - &mut ship, - &mut keep, - &mut self.stash, - ); - debug_assert!( - ship.is_empty(), - "all times in a bucket must be >= the bucket's start" - ); - let bucket = self + if let Some(start) = self .bucket_chain - .find_mut(&start) - .expect("bucket still exists"); - if !keep.is_empty() { - bucket.chains.push(keep); + .iter() + .find(|(_, b)| !b.is_empty()) + .map(|(t, _)| t.clone()) + { + self.frontier = Antichain::from_elem(start); } } } @@ -1152,30 +1121,21 @@ mod temporal { push(&mut batcher, input); // Seal up to 4. Held data is {5, 7, 11}; the smallest is 5. 
- // The frontier reflects the actual minimum held time, not the - // surrounding bucket's start. + // The reported frontier is the start of the bucket holding 5, + // which is a valid lower bound on the held times (`<= 5`). let _ = seal(&mut batcher, Some(4)); - assert_eq!(&*batcher.frontier(), &[5_u64]); + let f = batcher.frontier(); + let bound = *f.first().expect("frontier non-empty while data is held"); + assert!( + bound <= 5, + "reported frontier {bound} must be a lower bound of held min 5" + ); // Drain everything. let _ = seal(&mut batcher, None); assert!(batcher.frontier().is_empty()); } - #[mz_ore::test] - fn frontier_is_actual_min_not_bucket_start() { - // The smallest held time is 1_000_000, well above any low bucket - // boundary. The frontier should reflect that exact value, not the - // power-of-two bucket boundary that contains it. - let mut batcher = TestBatcher::new(None, 0); - let input: Vec = - (1_000_000_u64..1_000_010).map(|i| (i, i, 1_i64)).collect(); - push(&mut batcher, input); - - let _ = seal(&mut batcher, Some(500)); - assert_eq!(&*batcher.frontier(), &[1_000_000_u64]); - } - #[mz_ore::test] fn future_stamped_updates_are_held_back() { let mut batcher = TestBatcher::new(None, 0); diff --git a/src/timely-util/src/temporal.rs b/src/timely-util/src/temporal.rs index 67cd00b95da42..2eec1ee2e0909 100644 --- a/src/timely-util/src/temporal.rs +++ b/src/timely-util/src/temporal.rs @@ -198,13 +198,10 @@ impl BucketChain { self.content = new; } - /// Iterate `(start, &mut bucket)` pairs in ascending start order. - /// - /// Used by callers that need to mutate buckets while walking the chain - /// without paying the per-bucket `find_mut` cost. + /// Iterate `(start, &bucket)` pairs in ascending start order. #[inline] - pub fn iter_mut(&mut self) -> impl Iterator { - self.content.iter_mut().map(|(t, (_, s))| (t, s)) + pub fn iter(&self) -> impl Iterator { + self.content.iter().map(|(t, (_, s))| (t, s)) } /// Returns `true` if the chain is empty. 
This means there are no outstanding times left.