diff --git a/misc/python/materialize/feature_benchmark/scenarios/temporal.py b/misc/python/materialize/feature_benchmark/scenarios/temporal.py new file mode 100644 index 0000000000000..d595d0fe18ca2 --- /dev/null +++ b/misc/python/materialize/feature_benchmark/scenarios/temporal.py @@ -0,0 +1,122 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +from datetime import date, timedelta +from textwrap import dedent + +from materialize.feature_benchmark.action import Action, TdAction +from materialize.feature_benchmark.measurement_source import MeasurementSource, Td +from materialize.feature_benchmark.scenario import Scenario + + +class TemporalFilter(Scenario): + """Feature benchmarks related to temporal filters using mz_now().""" + + +class TemporalFilterIndexed(TemporalFilter): + """Measure the time to create an index on a view with a temporal filter over 10M rows. + + The view applies a temporal filter (mz_now() range) on top of a materialized + view joined with a table, ensuring the collection has future updates that + require temporal bucketing before arrangement. 
+ """ + + FIXED_SCALE = True + + def init(self) -> list[Action]: + return [ + TdAction(dedent(""" + > DROP TABLE IF EXISTS anchor CASCADE; + > CREATE TABLE anchor (a int); + > INSERT INTO anchor VALUES (1); + """)), + ] + + def benchmark(self) -> MeasurementSource: + today = date.today() + next_week = today + timedelta(days=7) + return Td(dedent(f""" + > DROP VIEW IF EXISTS v_temporal CASCADE; + > DROP MATERIALIZED VIEW IF EXISTS mv_temporal CASCADE; + + > CREATE MATERIALIZED VIEW mv_temporal AS + SELECT x, a + FROM generate_series(1, 10000000) AS x + CROSS JOIN anchor; + + > SELECT COUNT(*) FROM mv_temporal; + 10000000 + + > SELECT 1 + /* A */ + 1 + + > CREATE VIEW v_temporal AS + SELECT x, a FROM mv_temporal + WHERE mz_now() >= extract(epoch from timestamp '{today}')::uint8 * 1000 + AND mz_now() < extract(epoch from timestamp '{next_week}')::uint8 * 1000; + + > CREATE INDEX idx_temporal ON v_temporal (x); + + > SELECT COUNT(*) FROM v_temporal + /* B */ + 10000000 + """)) + + +class TemporalFilterSustainedInsert(TemporalFilter): + """Measure ingestion latency through a temporal-filtered indexed view. + + Pre-loads 10M rows into a table that's filtered by a temporal predicate, + with the filtered view indexed. Then drives several small inserts and + times how long it takes for the index to reflect them. + + Each input row produces an insert at today + a retract at next-week + through the temporal MFP. Without bucketing both updates land in the + index's merge batcher, growing the arrangement spine; with bucketing, + only today's update lands in the spine while future retractions + accumulate in the BucketChain. The smaller spine keeps per-insert + incremental merge work cheaper and that work is on the critical path + of the trailing SELECT. 
+ """ + + FIXED_SCALE = True + + def benchmark(self) -> MeasurementSource: + today = date.today() + next_week = today + timedelta(days=7) + return Td(dedent(f""" + > DROP TABLE IF EXISTS t_temporal CASCADE; + > CREATE TABLE t_temporal (x int); + > INSERT INTO t_temporal SELECT generate_series(1, 10000000); + + > CREATE VIEW v_temporal AS + SELECT x FROM t_temporal + WHERE mz_now() >= extract(epoch from timestamp '{today}')::uint8 * 1000 + AND mz_now() < extract(epoch from timestamp '{next_week}')::uint8 * 1000; + + > CREATE INDEX idx_temporal ON v_temporal (x); + + > SELECT COUNT(*) FROM v_temporal; + 10000000 + + > SELECT 1 + /* A */ + 1 + + > INSERT INTO t_temporal SELECT generate_series(10000001, 10100000); + > INSERT INTO t_temporal SELECT generate_series(10100001, 10200000); + > INSERT INTO t_temporal SELECT generate_series(10200001, 10300000); + > INSERT INTO t_temporal SELECT generate_series(10300001, 10400000); + > INSERT INTO t_temporal SELECT generate_series(10400001, 10500000); + + > SELECT COUNT(*) FROM v_temporal + /* B */ + 10500000 + """)) diff --git a/src/compute-types/src/dyncfgs.rs b/src/compute-types/src/dyncfgs.rs index de710523f1f18..4815652cc999e 100644 --- a/src/compute-types/src/dyncfgs.rs +++ b/src/compute-types/src/dyncfgs.rs @@ -66,13 +66,6 @@ pub const ENABLE_TEMPORAL_BUCKETING: Config = Config::new( "Whether to enable temporal bucketing in compute.", ); -/// The summary to apply to the frontier in temporal bucketing in compute. -pub const TEMPORAL_BUCKETING_SUMMARY: Config = Config::new( - "compute_temporal_bucketing_summary", - Duration::from_secs(2), - "The summary to apply to frontiers in temporal bucketing in compute.", -); - /// The yielding behavior with which linear joins should be rendered. 
pub const LINEAR_JOIN_YIELDING: Config<&str> = Config::new( "linear_join_yielding", @@ -386,7 +379,6 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { .add(&CORRECTION_V2_CHAIN_PROPORTIONALITY) .add(&CORRECTION_V2_CHUNK_SIZE) .add(&ENABLE_TEMPORAL_BUCKETING) - .add(&TEMPORAL_BUCKETING_SUMMARY) .add(&LINEAR_JOIN_YIELDING) .add(&ENABLE_LGALLOC) .add(&LGALLOC_BACKGROUND_INTERVAL) diff --git a/src/compute/src/extensions.rs b/src/compute/src/extensions.rs index 3cc0e64657f0e..89dd28b94bfcb 100644 --- a/src/compute/src/extensions.rs +++ b/src/compute/src/extensions.rs @@ -11,4 +11,3 @@ pub(crate) mod arrange; pub(crate) mod reduce; -pub(crate) mod temporal_bucket; diff --git a/src/compute/src/extensions/arrange.rs b/src/compute/src/extensions/arrange.rs index 6575402e65d53..88f3011fc27a8 100644 --- a/src/compute/src/extensions/arrange.rs +++ b/src/compute/src/extensions/arrange.rs @@ -11,24 +11,31 @@ use std::collections::BTreeMap; use std::rc::Rc; use differential_dataflow::difference::Semigroup; +use differential_dataflow::dynamic::pointstamp::PointStamp; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::arrangement::arrange_core; use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; use differential_dataflow::trace::implementations::spine_fueled::Spine; use differential_dataflow::trace::{Batch, Batcher, Builder, Trace, TraceReader}; use differential_dataflow::{Collection, Data, ExchangeData, Hashable, VecCollection}; +use mz_repr::{Diff, Row}; +use mz_timely_util::columnar::builder::ColumnBuilder; +use mz_timely_util::columnar::{Col2ValBatcher, Col2ValTemporalBatcher, Column, columnar_exchange}; use timely::Container; use timely::dataflow::Stream; -use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline}; +use timely::dataflow::channels::pact::{Exchange, ExchangeCore, ParallelizationContract, Pipeline}; use timely::dataflow::operators::Operator; +use timely::order::Product; 
use timely::progress::Timestamp; use crate::logging::compute::{ ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize, ArrangementHeapSizeOperator, ComputeEvent, ComputeEventBuilder, }; +use crate::row_spine::RowRowBuilder; use crate::typedefs::{ - KeyAgent, KeyValAgent, MzArrangeData, MzData, MzTimestamp, RowAgent, RowRowAgent, RowValAgent, + KeyAgent, KeyValAgent, MzArrangeData, MzData, MzTimestamp, RowAgent, RowRowAgent, RowRowSpine, + RowValAgent, }; /// Extension trait to arrange data. @@ -426,3 +433,73 @@ where }) } } + +/// Per-`RenderTimestamp` dispatch trait for the row-row arrangement built by +/// [`crate::render::context::CollectionBundle::ensure_collections`]. +/// +/// Two impls exist, one per concrete `RenderTimestamp`. For +/// [`mz_repr::Timestamp`] the dispatch picks +/// [`Col2ValTemporalBatcher`] when temporal bucketing is enabled, else +/// falls through to the plain [`Col2ValBatcher`]. For +/// `Product<mz_repr::Timestamp, PointStamp<u64>>` (iterative scopes) it +/// always uses the plain batcher because the timestamp is partially +/// ordered and not bucketable. +/// +/// This trait is intentionally **not** a supertrait of +/// `crate::render::RenderTimestamp` so that the bucketing precondition does +/// not leak into the generic timestamp interface. +pub trait MaybeTemporalRowRowArrange: MzTimestamp { + /// Arrange a row-row stream into a [`RowRowAgent`], possibly via the + /// temporal-bucketing batcher. 
+ fn arrange_row_row<'scope>( + stream: Stream<'scope, Self, Column<((Row, Row), Self, Diff)>>, + name: &str, + bucketing_enabled: bool, + ) -> Arranged<'scope, RowRowAgent>; +} + +impl MaybeTemporalRowRowArrange for mz_repr::Timestamp { + fn arrange_row_row<'scope>( + stream: Stream<'scope, Self, Column<((Row, Row), Self, Diff)>>, + name: &str, + bucketing_enabled: bool, + ) -> Arranged<'scope, RowRowAgent> { + let pact = ExchangeCore::, _>::new_core( + columnar_exchange::, + ); + if bucketing_enabled { + stream.mz_arrange_core::< + _, + Col2ValTemporalBatcher, + RowRowBuilder, + RowRowSpine, + >(pact, name) + } else { + stream.mz_arrange_core::< + _, + Col2ValBatcher, + RowRowBuilder, + RowRowSpine, + >(pact, name) + } + } +} + +impl MaybeTemporalRowRowArrange for Product> { + fn arrange_row_row<'scope>( + stream: Stream<'scope, Self, Column<((Row, Row), Self, Diff)>>, + name: &str, + _bucketing_enabled: bool, + ) -> Arranged<'scope, RowRowAgent> { + // Iterative scope: timestamp is partially ordered, no bucketing. + let pact = ExchangeCore::, _>::new_core( + columnar_exchange::, + ); + stream.mz_arrange_core::< + _, + Col2ValBatcher, + RowRowBuilder, + RowRowSpine, + >(pact, name) + } +} diff --git a/src/compute/src/extensions/temporal_bucket.rs b/src/compute/src/extensions/temporal_bucket.rs deleted file mode 100644 index 569f306191d2c..0000000000000 --- a/src/compute/src/extensions/temporal_bucket.rs +++ /dev/null @@ -1,257 +0,0 @@ -// Copyright Materialize, Inc. and contributors. All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. - -//! Utilities and stream extensions for temporal bucketing. 
- -use std::marker::PhantomData; - -use differential_dataflow::Hashable; -use differential_dataflow::difference::Semigroup; -use differential_dataflow::lattice::Lattice; -use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; -use differential_dataflow::trace::{Batcher, Builder, Description}; -use mz_timely_util::columnation::{ColInternalMerger, ColumnationChunker, ColumnationStack}; -use mz_timely_util::temporal::{Bucket, BucketChain, BucketTimestamp}; -use timely::container::PushInto; -use timely::dataflow::channels::pact::Exchange; -use timely::dataflow::operators::Operator; -use timely::dataflow::{Stream, StreamVec}; -use timely::order::TotalOrder; -use timely::progress::{Antichain, PathSummary, Timestamp}; -use timely::{ContainerBuilder, ExchangeData, PartialOrder}; - -use crate::typedefs::MzData; - -/// Sort outstanding updates into a [`BucketChain`], and reveal data not in advance of the input -/// frontier. Retains a capability at the last input frontier to retain the right to produce data -/// at times between the last input frontier and the current input frontier. -pub trait TemporalBucketing<'scope, T: Timestamp, O> { - /// Construct a new stream that stores updates into a [`BucketChain`] and reveals data - /// not in advance of the frontier. Data that is within `threshold` distance of the input - /// frontier or the `as_of` is passed through without being stored in the chain. - fn bucket( - self, - as_of: Antichain, - threshold: T::Summary, - ) -> Stream<'scope, T, CB::Container> - where - CB: ContainerBuilder + PushInto; -} - -/// Implementation for streams in scopes where timestamps define a total order. 
-impl<'scope, T, D> TemporalBucketing<'scope, T, (D, T, mz_repr::Diff)> - for StreamVec<'scope, T, (D, T, mz_repr::Diff)> -where - T: Timestamp + ExchangeData + MzData + BucketTimestamp + TotalOrder + Lattice, - D: ExchangeData + MzData + Ord + Clone + std::fmt::Debug + Hashable, -{ - fn bucket( - self, - as_of: Antichain, - threshold: T::Summary, - ) -> Stream<'scope, T, CB::Container> - where - CB: ContainerBuilder + PushInto<(D, T, mz_repr::Diff)>, - { - let scope = self.scope(); - let logger = scope - .worker() - .logger_for("differential/arrange") - .map(Into::into); - - let pact = Exchange::new(|(d, _, _): &(D, T, mz_repr::Diff)| d.hashed().into()); - self.unary_frontier::(pact, "Temporal delay", |cap, info| { - let mut chain = BucketChain::new(MergeBatcherWrapper::new(logger, info.global_id)); - let activator = scope.activator_for(info.address); - - // Cap tracking the lower bound of potentially outstanding data. - let mut cap = Some(cap); - - // Buffer for data to be inserted into the chain. - let mut buffer = Vec::new(); - - move |(input, frontier), output| { - // The upper frontier is the join of the input frontier and the `as_of` frontier, - // with the `threshold` summary applied to it. - let mut upper = Antichain::new(); - for time1 in &frontier.frontier() { - for time2 in as_of.elements() { - // TODO: Use `join_assign` if we ever use a timestamp with allocations. - if let Some(time) = threshold.results_in(&time1.join(time2)) { - upper.insert(time); - } - } - } - - input.for_each_time(|time, data| { - let mut session = output.session_with_builder(&time); - for data in data { - // Skip data that is about to be revealed. - let pass_through = data.extract_if(.., |(_, t, _)| !upper.less_equal(t)); - session.give_iterator(pass_through); - - // Sort data by time, then drain it into a buffer that contains data for a - // single bucket. We scan the data for ranges of time that fall into the same - // bucket so we can push batches of data at once. 
- data.sort_unstable_by(|(_, t, _), (_, t2, _)| t.cmp(t2)); - - let mut drain = data.drain(..); - if let Some((datum, time, diff)) = drain.next() { - let mut range = chain.range_of(&time).expect("Must exist"); - buffer.push((datum, time, diff)); - for (datum, time, diff) in drain { - // If we have a range, check if the time is not within it. - if !range.contains(&time) { - // If the time is outside the range, push the current buffer - // to the chain and reset the range. - if !buffer.is_empty() { - let bucket = - chain.find_mut(&range.start).expect("Must exist"); - bucket.inner.push_container(&mut buffer); - buffer.clear(); - } - range = chain.range_of(&time).expect("Must exist"); - } - buffer.push((datum, time, diff)); - } - - // Handle leftover data in the buffer. - if !buffer.is_empty() { - let bucket = chain.find_mut(&range.start).expect("Must exist"); - bucket.inner.push_container(&mut buffer); - buffer.clear(); - } - } - } - }); - - // Check for data that is ready to be revealed. - let peeled = chain.peel(upper.borrow()); - if let Some(cap) = cap.as_ref() { - let mut session = output.session_with_builder(cap); - for stack in peeled.into_iter().flat_map(|x| x.done()) { - // TODO: If we have a columnar merge batcher, cloning won't be necessary. - session.give_iterator(stack.iter().cloned()); - } - } else { - // If we don't have a cap, we should not have any data to reveal. - assert_eq!( - peeled.into_iter().flat_map(|x| x.done()).next(), - None, - "Unexpected data revealed without a cap." - ); - } - - // Downgrade the cap to the current input frontier. - if frontier.is_empty() || upper.is_empty() { - cap = None; - } else if let Some(cap) = cap.as_mut() { - // TODO: This assumes that the time is total ordered. - cap.downgrade(&upper[0]); - } - - // Maintain the bucket chain by restoring it with fuel. - let mut fuel = 1_000_000; - chain.restore(&mut fuel); - if fuel <= 0 { - // If we run out of fuel, we activate the operator to continue processing. 
- activator.activate(); - } - } - }) - } -} - -/// A wrapper around `MergeBatcher` that implements the `Storage` trait for bucketing. -struct MergeBatcherWrapper -where - D: MzData + Ord + Clone, - T: MzData + Ord + PartialOrder + Clone, - R: MzData + Semigroup + Default, -{ - logger: Option, - operator_id: usize, - inner: MergeBatcher, ColumnationChunker<(D, T, R)>, ColInternalMerger>, -} - -impl MergeBatcherWrapper -where - D: MzData + Ord + Clone, - T: MzData + Ord + PartialOrder + Clone + Timestamp, - R: MzData + Semigroup + Default, -{ - /// Construct a new `MergeBatcherWrapper` with the given logger and operator ID. - fn new(logger: Option, operator_id: usize) -> Self { - Self { - logger: logger.clone(), - operator_id, - inner: MergeBatcher::new(logger, operator_id), - } - } - - /// Reveal the contents of the `MergeBatcher`, returning a vector of `ColumnationStack`s. - fn done(mut self) -> Vec> { - self.inner.seal::>(Antichain::new()) - } -} - -impl Bucket for MergeBatcherWrapper -where - D: MzData + Ord + Clone + 'static, - T: MzData + Ord + PartialOrder + Clone + 'static + BucketTimestamp, - R: MzData + Semigroup + Default, -{ - type Timestamp = T; - - fn split(mut self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self) { - // The implementation isn't tuned for performance. We should not bounce in and out of - // different containers when not needed. The merge batcher we use can only accept - // vectors as inputs, but not any other container type. - // TODO: Allow the merge batcher to accept more generic containers. - let upper = Antichain::from_elem(timestamp.clone()); - let mut lower = Self::new(self.logger.clone(), self.operator_id); - let mut buffer = Vec::new(); - for chunk in self.inner.seal::>(upper) { - *fuel = fuel.saturating_sub(chunk.len().try_into().expect("must fit")); - // TODO: Avoid this cloning. 
- buffer.extend(chunk.into_iter().cloned()); - lower.inner.push_container(&mut buffer); - buffer.clear(); - } - (lower, self) - } -} - -struct CapturingBuilder(D, PhantomData); - -impl Builder for CapturingBuilder { - type Input = D; - type Time = T; - type Output = Vec; - - fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { - // Not needed for this implementation. - unimplemented!() - } - - fn push(&mut self, _chunk: &mut Self::Input) { - // Not needed for this implementation. - unimplemented!() - } - - fn done(self, _description: Description) -> Self::Output { - // Not needed for this implementation. - unimplemented!() - } - - #[inline] - fn seal(chain: &mut Vec, _description: Description) -> Self::Output { - std::mem::take(chain) - } -} diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 08e3182c3621b..deca03ef3af72 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -122,7 +122,7 @@ use mz_compute_types::dataflows::{DataflowDescription, IndexDesc}; use mz_compute_types::dyncfgs::{ COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK, COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE, - ENABLE_TEMPORAL_BUCKETING, SUBSCRIBE_SNAPSHOT_OPTIMIZATION, TEMPORAL_BUCKETING_SUMMARY, + SUBSCRIBE_SNAPSHOT_OPTIMIZATION, }; use mz_compute_types::plan::LirId; use mz_compute_types::plan::render_plan::{ @@ -138,7 +138,6 @@ use mz_timely_util::operator::{CollectionExt, StreamExt}; use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify}; use mz_timely_util::scope_label::ScopeExt; use timely::PartialOrder; -use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::vec::ToStream; use timely::dataflow::operators::vec::{BranchWhen, Filter}; @@ -154,7 +153,6 @@ use crate::arrangement::manager::TraceBundle; use crate::compute_state::ComputeState; use 
crate::extensions::arrange::{KeyCollection, MzArrange}; use crate::extensions::reduce::MzReduce; -use crate::extensions::temporal_bucket::TemporalBucketing; use crate::logging::compute::{ ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration, }; @@ -453,18 +451,6 @@ pub fn build_compute_dataflow( ); for (id, (oks, errs)) in imported_sources.into_iter() { - let oks = if ENABLE_TEMPORAL_BUCKETING.get(&compute_state.worker_config) { - let as_of = context.as_of_frontier.clone(); - let summary = TEMPORAL_BUCKETING_SUMMARY - .get(&compute_state.worker_config) - .try_into() - .expect("must fit"); - oks.inner - .bucket::>(as_of, summary) - .as_collection() - } else { - oks - }; let bundle = crate::render::CollectionBundle::from_collections( oks.enter_region(region), errs.enter_region(region), @@ -988,7 +974,10 @@ impl<'scope> Context<'scope, Product>> { } } -impl<'scope, T: RenderTimestamp> Context<'scope, T> { +impl<'scope, T> Context<'scope, T> +where + T: RenderTimestamp + crate::extensions::arrange::MaybeTemporalRowRowArrange, +{ /// Renders a non-recursive plan to a differential dataflow, producing the collection of /// results. 
/// diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index e6e6d5c8d235c..d9509a499d1c8 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -19,7 +19,9 @@ use differential_dataflow::trace::implementations::BatchContainer; use differential_dataflow::trace::{BatchReader, Cursor, TraceReader}; use differential_dataflow::{AsCollection, Data, VecCollection}; use mz_compute_types::dataflows::DataflowDescription; -use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION; +use mz_compute_types::dyncfgs::{ + ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION, ENABLE_TEMPORAL_BUCKETING, +}; use mz_compute_types::plan::AvailableCollections; use mz_dyncfg::ConfigSet; use mz_expr::{Id, MapFilterProject, MirScalarExpr}; @@ -28,10 +30,9 @@ use mz_repr::fixed_length::ToDatumIter; use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow}; use mz_storage_types::controller::CollectionMetadata; use mz_timely_util::columnar::builder::ColumnBuilder; -use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange}; use mz_timely_util::operator::CollectionExt; use timely::container::CapacityContainerBuilder; -use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; +use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Capability; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession}; @@ -40,12 +41,12 @@ use timely::progress::operate::FrontierInterest; use timely::progress::{Antichain, Timestamp}; use crate::compute_state::ComputeState; -use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore}; +use crate::extensions::arrange::{KeyCollection, MaybeTemporalRowRowArrange, MzArrange}; use crate::render::errors::{DataflowErrorSer, ErrorLogger}; use crate::render::{LinearJoinSpec, RenderTimestamp}; -use 
crate::row_spine::{DatumSeq, RowRowBuilder}; +use crate::row_spine::DatumSeq; use crate::typedefs::{ - ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine, + ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, }; /// Dataflow-local collections and arrangements. @@ -738,7 +739,10 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { input_mfp: MapFilterProject, until: Antichain, config_set: &ConfigSet, - ) -> Self { + ) -> Self + where + T: MaybeTemporalRowRowArrange, + { if collections == Default::default() { return self; } @@ -757,6 +761,8 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { ); } + let bucketing_enabled = ENABLE_TEMPORAL_BUCKETING.get(config_set); + // We need the collection if either (1) it is explicitly demanded, or (2) we are going to render any arrangement let form_raw_collection = collections.raw || collections @@ -780,8 +786,13 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { .collection .take() .expect("Collection constructed above"); - let (oks, errs_keyed, passthrough) = - Self::arrange_collection(&name, oks, key.clone(), thinning.clone()); + let (oks, errs_keyed, passthrough) = Self::arrange_collection( + &name, + oks, + key.clone(), + thinning.clone(), + bucketing_enabled, + ); let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into(); self.collection = Some((passthrough, errs)); let errs = @@ -810,11 +821,15 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { oks: VecCollection<'scope, T, Row, Diff>, key: Vec, thinning: Vec, + bucketing_enabled: bool, ) -> ( Arranged<'scope, RowRowAgent>, VecCollection<'scope, T, DataflowErrorSer, Diff>, VecCollection<'scope, T, Row, Diff>, - ) { + ) + where + T: MaybeTemporalRowRowArrange, + { // This operator implements a `map_fallible`, but produces columnar updates for the ok // stream. 
The `map_fallible` cannot be used here because the closure cannot return // references, which is what we need to push into columnar streams. Instead, we use a @@ -861,18 +876,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { } }); - let oks = ok_stream - .mz_arrange_core::< - _, - Col2ValBatcher<_, _, _, _>, - RowRowBuilder<_, _>, - RowRowSpine<_, _>, - >( - ExchangeCore::, _>::new_core( - columnar_exchange::, - ), - name - ); + let oks = T::arrange_row_row(ok_stream, name, bucketing_enabled); ( oks, err_stream.as_collection(), diff --git a/src/compute/src/render/reduce.rs b/src/compute/src/render/reduce.rs index d4f34d2951846..205d0c2932ba1 100644 --- a/src/compute/src/render/reduce.rs +++ b/src/compute/src/render/reduce.rs @@ -23,7 +23,6 @@ use differential_dataflow::difference::{IsZero, Multiply, Semigroup}; use differential_dataflow::hashable::Hashable; use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; use differential_dataflow::trace::implementations::BatchContainer; -use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge; use differential_dataflow::trace::{Builder, Trace}; use differential_dataflow::{Data, VecCollection}; use itertools::Itertools; @@ -37,6 +36,7 @@ use mz_expr::{ use mz_repr::adt::numeric::{self, Numeric, NumericAgg}; use mz_repr::fixed_length::ToDatumIter; use mz_repr::{Datum, DatumVec, Diff, Row, RowArena, SharedRow}; +use mz_timely_util::merge_batcher::container::InternalMerge; use mz_timely_util::operator::CollectionExt; use serde::{Deserialize, Serialize}; use timely::Container; diff --git a/src/compute/src/render/threshold.rs b/src/compute/src/render/threshold.rs index 3c47f5267cf35..0570cd1f494db 100644 --- a/src/compute/src/render/threshold.rs +++ b/src/compute/src/render/threshold.rs @@ -14,11 +14,11 @@ use differential_dataflow::Data; use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; use 
differential_dataflow::trace::implementations::BatchContainer; -use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge; use differential_dataflow::trace::{Builder, Trace, TraceReader}; use mz_compute_types::plan::threshold::{BasicThresholdPlan, ThresholdPlan}; use mz_expr::MirScalarExpr; use mz_repr::Diff; +use mz_timely_util::merge_batcher::container::InternalMerge; use timely::Container; use timely::container::PushInto; diff --git a/src/compute/src/render/top_k.rs b/src/compute/src/render/top_k.rs index 28df27f0d3369..99115ac1a7a0f 100644 --- a/src/compute/src/render/top_k.rs +++ b/src/compute/src/render/top_k.rs @@ -21,7 +21,6 @@ use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; use differential_dataflow::operators::iterate::Variable as SemigroupVariable; use differential_dataflow::trace::implementations::BatchContainer; -use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge; use differential_dataflow::trace::{Builder, Trace}; use differential_dataflow::{Data, VecCollection}; use mz_compute_types::plan::top_k::{ @@ -32,6 +31,7 @@ use mz_expr::{BinaryFunc, EvalError, MirScalarExpr, UnaryFunc, func}; use mz_ore::cast::CastFrom; use mz_ore::soft_assert_or_log; use mz_repr::{Datum, DatumVec, Diff, ReprScalarType, Row, SharedRow}; +use mz_timely_util::merge_batcher::container::InternalMerge; use mz_timely_util::operator::CollectionExt; use timely::Container; use timely::container::{CapacityContainerBuilder, PushInto}; diff --git a/src/compute/src/typedefs.rs b/src/compute/src/typedefs.rs index 31ab29e9a03da..79be68015ca2b 100644 --- a/src/compute/src/typedefs.rs +++ b/src/compute/src/typedefs.rs @@ -14,11 +14,11 @@ use columnar::{Container, Ref}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::operators::arrange::TraceAgent; -use 
differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; use differential_dataflow::trace::wrappers::enter::TraceEnter; use differential_dataflow::trace::wrappers::frontier::TraceFrontier; use mz_repr::Diff; use mz_timely_util::columnation::{ColInternalMerger, ColumnationChunker}; +use mz_timely_util::merge_batcher::MergeBatcher; use crate::render::errors::DataflowErrorSer; use crate::row_spine::RowValBuilder; diff --git a/src/storage/src/upsert_continual_feedback_v2.rs b/src/storage/src/upsert_continual_feedback_v2.rs index 32de9e3770086..aa3a8b3ac6a36 100644 --- a/src/storage/src/upsert_continual_feedback_v2.rs +++ b/src/storage/src/upsert_continual_feedback_v2.rs @@ -71,9 +71,6 @@ use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::agent::TraceAgent; use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::trace::implementations::chunker::ContainerChunker; -use differential_dataflow::trace::implementations::merge_batcher::{ - MergeBatcher, container::VecMerger, -}; use differential_dataflow::trace::{Batcher, Builder, Cursor, Description, TraceReader}; use differential_dataflow::{AsCollection, VecCollection}; use mz_repr::{Diff, GlobalId, Row}; @@ -81,6 +78,7 @@ use mz_storage_types::errors::{DataflowError, EnvelopeError}; use mz_timely_util::builder_async::{ Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, }; +use mz_timely_util::merge_batcher::{MergeBatcher, container::VecMerger}; use std::convert::Infallible; use timely::container::CapacityContainerBuilder; use timely::dataflow::StreamVec; diff --git a/src/timely-util/src/columnar.rs b/src/timely-util/src/columnar.rs index 5db3a34689e7e..a1a7149e9fe2c 100644 --- a/src/timely-util/src/columnar.rs +++ b/src/timely-util/src/columnar.rs @@ -23,19 +23,20 @@ pub mod consolidate; use std::hash::Hash; +use crate::merge_batcher::MergeBatcher; use columnar::Borrow; use columnar::bytes::indexed; use 
columnar::common::IterOwn; use columnar::{Columnar, Ref}; use columnar::{FromBytes, Index, Len}; use differential_dataflow::Hashable; -use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; use timely::Accountable; use timely::bytes::arc::Bytes; use timely::container::{DrainContainer, PushInto}; use timely::dataflow::channels::ContainerBytes; use crate::columnation::{ColInternalMerger, ColumnationStack}; +use crate::merge_batcher::TemporalBucketingMergeBatcher; /// A batcher for columnar storage. pub type Col2ValBatcher = MergeBatcher< @@ -46,6 +47,17 @@ pub type Col2ValBatcher = MergeBatcher< /// A batcher for columnar storage with unit values. pub type Col2KeyBatcher = Col2ValBatcher; +/// A temporal-bucketing batcher for columnar storage. +/// +/// Same chunker and merger as [`Col2ValBatcher`], but routes chunks through +/// a [`TemporalBucketingMergeBatcher`] so that the arrangement spine sees +/// fewer, bucket-aligned batches. +pub type Col2ValTemporalBatcher = TemporalBucketingMergeBatcher< + Column<((K, V), T, R)>, + batcher::Chunker>, + ColInternalMerger<(K, V), T, R>, +>; + /// A container based on a columnar store, encoded in aligned bytes. /// /// The type can represent typed data, bytes from Timely, or an aligned allocation. 
The name diff --git a/src/timely-util/src/columnation.rs b/src/timely-util/src/columnation.rs index 64eb1fd035790..bfa6786bfa0dc 100644 --- a/src/timely-util/src/columnation.rs +++ b/src/timely-util/src/columnation.rs @@ -22,11 +22,11 @@ use std::collections::VecDeque; use std::iter::FromIterator; +use crate::merge_batcher::container::InternalMerge; use columnation::{Columnation, Region}; use differential_dataflow::consolidation::consolidate_updates; use differential_dataflow::difference::Semigroup; use differential_dataflow::lattice::Lattice; -use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge; use differential_dataflow::trace::implementations::{BatchContainer, BuilderInput}; use timely::container::{ContainerBuilder, DrainContainer, PushInto, SizableContainer}; use timely::progress::Timestamp; @@ -653,6 +653,16 @@ where *position += 1; } } + + fn time_range(&self) -> Option<(Antichain, Antichain)> { + let slice = &self[..]; + if slice.is_empty() { + return None; + } + Some(crate::merge_batcher::build_time_bounds( + slice.iter().map(|(_, t, _)| t.clone()), + )) + } } // --------------------------------------------------------------------------- @@ -661,6 +671,4 @@ where /// A `Merger` using internal iteration for `ColumnationStack` containers. 
pub type ColInternalMerger = - differential_dataflow::trace::implementations::merge_batcher::InternalMerger< - ColumnationStack<(D, T, R)>, - >; + crate::merge_batcher::InternalMerger>; diff --git a/src/timely-util/src/lib.rs b/src/timely-util/src/lib.rs index 57474b58cea34..a655b17a495e5 100644 --- a/src/timely-util/src/lib.rs +++ b/src/timely-util/src/lib.rs @@ -22,6 +22,7 @@ pub mod capture; pub mod columnar; pub mod columnation; pub mod containers; +pub mod merge_batcher; pub mod operator; pub mod order; pub mod pact; diff --git a/src/timely-util/src/merge_batcher.rs b/src/timely-util/src/merge_batcher.rs new file mode 100644 index 0000000000000..e1bedda79a9ee --- /dev/null +++ b/src/timely-util/src/merge_batcher.rs @@ -0,0 +1,1705 @@ +// Copyright 2015–2024 The Differential Dataflow contributors +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. +// +// Portions of this file are derived from differential-dataflow +// (https://github.com/TimelyDataflow/differential-dataflow), licensed +// under the MIT License. + +//! A `Batcher` implementation based on merge sort. +//! +//! Vendored from `differential_dataflow::trace::implementations::merge_batcher` +//! so that we can layer temporal bucketing on top of the merge batcher's +//! internal chain representation. +//! +//! The `MergeBatcher` requires support from two types, a "chunker" and a "merger". +//! The chunker receives input batches and consolidates them, producing sorted output +//! "chunks" that are fully consolidated (no adjacent updates can be accumulated). +//! The merger implements the [`Merger`] trait, and provides hooks for manipulating +//! 
sorted "chains" of chunks as needed by the merge batcher: merging chunks and also +//! splitting them apart based on time. + +use std::marker::PhantomData; + +use differential_dataflow::logging::{BatcherEvent, Logger}; +use differential_dataflow::trace::{Batcher, Builder, Description}; +use timely::container::{ContainerBuilder, PushInto}; +use timely::progress::frontier::AntichainRef; +use timely::progress::{Timestamp, frontier::Antichain}; + +/// Creates batches from containers of unordered tuples. +/// +/// To implement `Batcher`, the container builder `C` must accept `&mut Input` as inputs, +/// and must produce outputs of type `M::Chunk`. +pub struct MergeBatcher { + /// Transforms input streams to chunks of sorted, consolidated data. + chunker: C, + /// A sequence of power-of-two length lists of sorted, consolidated containers. + /// + /// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]). + chains: Vec>, + /// Stash of empty chunks, recycled through the merging process. + stash: Vec, + /// Merges consolidated chunks, and extracts the subset of an update chain that lies in an interval of time. + merger: M, + /// Current lower frontier, we sealed up to here. + lower: Antichain, + /// The lower-bound frontier of the data, after the last call to seal. + frontier: Antichain, + /// Logger for size accounting. + logger: Option, + /// Timely operator ID. + operator_id: usize, + /// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present. 
+ _marker: PhantomData, +} + +impl Batcher for MergeBatcher +where + C: ContainerBuilder + for<'a> PushInto<&'a mut Input>, + M: Merger, +{ + type Input = Input; + type Time = M::Time; + type Output = M::Chunk; + + fn new(logger: Option, operator_id: usize) -> Self { + Self { + logger, + operator_id, + chunker: C::default(), + merger: M::default(), + chains: Vec::new(), + stash: Vec::new(), + frontier: Antichain::new(), + lower: Antichain::from_elem(M::Time::minimum()), + _marker: PhantomData, + } + } + + /// Push a container of data into this merge batcher. Updates the internal chain structure if + /// needed. + fn push_container(&mut self, container: &mut Input) { + self.chunker.push_into(container); + while let Some(chunk) = self.chunker.extract() { + let chunk = std::mem::take(chunk); + self.insert_chain(vec![chunk]); + } + } + + // Sealing a batch means finding those updates with times not greater or equal to any time + // in `upper`. All updates must have time greater or equal to the previously used `upper`, + // which we call `lower`, by assumption that after sealing a batcher we receive no more + // updates with times not greater or equal to `upper`. + fn seal>( + &mut self, + upper: Antichain, + ) -> B::Output { + // Finish + while let Some(chunk) = self.chunker.finish() { + let chunk = std::mem::take(chunk); + self.insert_chain(vec![chunk]); + } + + // Merge all remaining chains into a single chain. + while self.chains.len() > 1 { + let list1 = self.chain_pop().unwrap(); + let list2 = self.chain_pop().unwrap(); + let merged = self.merge_by(list1, list2); + self.chain_push(merged); + } + let merged = self.chain_pop().unwrap_or_default(); + + // Extract readied data. 
+ let mut kept = Vec::new(); + let mut readied = Vec::new(); + self.frontier.clear(); + + self.merger.extract( + merged, + upper.borrow(), + &mut self.frontier, + &mut readied, + &mut kept, + &mut self.stash, + ); + + if !kept.is_empty() { + self.chain_push(kept); + } + + self.stash.clear(); + + let description = Description::new( + self.lower.clone(), + upper.clone(), + Antichain::from_elem(M::Time::minimum()), + ); + let seal = B::seal(&mut readied, description); + self.lower = upper; + seal + } + + /// The frontier of elements remaining after the most recent call to `self.seal`. + #[inline] + fn frontier(&mut self) -> AntichainRef<'_, M::Time> { + self.frontier.borrow() + } +} + +impl MergeBatcher { + /// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered + /// by decreasing length. + fn insert_chain(&mut self, chain: Vec) { + if !chain.is_empty() { + self.chain_push(chain); + while self.chains.len() > 1 + && (self.chains[self.chains.len() - 1].len() + >= self.chains[self.chains.len() - 2].len() / 2) + { + let list1 = self.chain_pop().unwrap(); + let list2 = self.chain_pop().unwrap(); + let merged = self.merge_by(list1, list2); + self.chain_push(merged); + } + } + } + + // merges two sorted input lists into one sorted output list. + fn merge_by(&mut self, list1: Vec, list2: Vec) -> Vec { + // TODO: `list1` and `list2` get dropped; would be better to reuse? + let mut output = Vec::with_capacity(list1.len() + list2.len()); + self.merger + .merge(list1, list2, &mut output, &mut self.stash); + + output + } + + /// Pop a chain and account size changes. + #[inline] + fn chain_pop(&mut self) -> Option> { + let chain = self.chains.pop(); + self.account(chain.iter().flatten().map(M::account), -1); + chain + } + + /// Push a chain and account size changes. + #[inline] + fn chain_push(&mut self, chain: Vec) { + self.account(chain.iter().map(M::account), 1); + self.chains.push(chain); + } + + /// Account size changes. 
Only performs work if a logger exists. + /// + /// Calculate the size based on the iterator passed along, with each attribute + /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff. + #[inline] + fn account>(&self, items: I, diff: isize) { + if let Some(logger) = &self.logger { + let (mut records, mut size, mut capacity, mut allocations) = + (0isize, 0isize, 0isize, 0isize); + for (records_, size_, capacity_, allocations_) in items { + records = records.saturating_add_unsigned(records_); + size = size.saturating_add_unsigned(size_); + capacity = capacity.saturating_add_unsigned(capacity_); + allocations = allocations.saturating_add_unsigned(allocations_); + } + logger.log(BatcherEvent { + operator: self.operator_id, + records_diff: records * diff, + size_diff: size * diff, + capacity_diff: capacity * diff, + allocations_diff: allocations * diff, + }) + } + } +} + +impl Drop for MergeBatcher { + fn drop(&mut self) { + // Cleanup chain to retract accounting information. + while self.chain_pop().is_some() {} + } +} + +/// A trait to describe interesting moments in a merge batcher. +pub trait Merger: Default { + /// The internal representation of chunks of data. + type Chunk: Default; + /// The type of time in frontiers to extract updates. + type Time; + /// Merge chains into an output chain. + fn merge( + &mut self, + list1: Vec, + list2: Vec, + output: &mut Vec, + stash: &mut Vec, + ); + /// Extract ready updates based on the `upper` frontier. + fn extract( + &mut self, + merged: Vec, + upper: AntichainRef, + frontier: &mut Antichain, + readied: &mut Vec, + kept: &mut Vec, + stash: &mut Vec, + ); + + /// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations). + fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize); + + /// Return `(lower, upper)` antichains bracketing the times in `chunk`, + /// or `None` if the chunk is empty or the implementation cannot + /// produce them. 
+ /// + /// `lower` is an antichain of *minimal* elements (every chunk time + /// satisfies `lower.less_equal(t)`); `upper` is an antichain of + /// *maximal* elements (every chunk time satisfies `t.less_equal(u)` + /// for some `u` in `upper`). For totally-ordered times both antichains + /// reduce to the single min / max element. + /// + /// Used by [`temporal::MergeBucket`] to short-circuit + /// `Bucket::split` when the bucket's data lies entirely on one side + /// of the split timestamp. The default returns `None`, which forces + /// the bucket onto the merge+extract fallback. + fn time_range(_chunk: &Self::Chunk) -> Option<(Antichain, Antichain)> { + None + } +} + +pub use container::InternalMerger; + +/// Build `(lower, upper)` antichains from an iterator of timestamps. +/// +/// `lower` is an antichain of *minimal* elements (every emitted time is +/// in-advance-of `lower`); `upper` is an antichain of *maximal* +/// elements (every emitted time is `less_equal` to some element of +/// `upper`). For totally-ordered times both reduce to single-element +/// antichains. +pub(crate) fn build_time_bounds(elements: I) -> (Antichain, Antichain) +where + T: timely::PartialOrder + Clone, + I: IntoIterator, +{ + let mut lower: Antichain = Antichain::new(); + let mut max_set: Vec = Vec::new(); + for t in elements { + lower.insert(t.clone()); + if max_set.iter().any(|m| t.less_equal(m)) { + continue; + } + max_set.retain(|m| !m.less_than(&t)); + max_set.push(t); + } + (lower, Antichain::from(max_set)) +} + +/// Reduce a `Vec` to its maximal elements (mutually incomparable +/// elements such that no other vec element dominates them) and wrap +/// in an [`Antichain`]. 
+pub(crate) fn reduce_to_maximal(elements: Vec) -> Antichain { + let mut max_set: Vec = Vec::with_capacity(elements.len()); + for t in elements { + if max_set.iter().any(|m| t.less_equal(m)) { + continue; + } + max_set.retain(|m| !m.less_than(&t)); + max_set.push(t); + } + Antichain::from(max_set) +} + +pub use temporal::{MergeBucket, TemporalBucketingMergeBatcher}; + +mod temporal { + //! Temporal bucketing variant of [`super::MergeBatcher`]. + //! + //! Hybrid layout: incoming data flows into a flat `chains: + //! Vec>` exactly like plain [`super::MergeBatcher`]. On + //! `seal`, the flat chains are merged and extracted against the seal + //! upper; the `kept` (future-stamped) side is the only data that enters a + //! [`BucketChain`] of [`MergeBucket`]s. Subsequent seals peel mature + //! buckets out of the bucket chain and merge their contents with the + //! current flat extract. + //! + //! For workloads where most data is current (`kept` is small at each + //! seal) the bucket chain stays nearly empty, so per-push and per-seal + //! cost is dominated by the plain merge-batcher path. The bucket chain + //! only pays its splitting/merging overhead for the future tail of the + //! input. The contract of `Batcher::seal` is preserved: only updates + //! with time strictly less than `upper` are emitted. + + use std::marker::PhantomData; + + use differential_dataflow::logging::Logger; + use differential_dataflow::trace::{Batcher, Builder, Description}; + use timely::container::{ContainerBuilder, PushInto}; + use timely::progress::frontier::AntichainRef; + use timely::progress::{Antichain, Timestamp}; + + use crate::merge_batcher::Merger; + use crate::temporal::{Bucket, BucketChain, BucketTimestamp}; + + /// Fuel budget handed to [`BucketChain::restore`] on each `seal` that + /// takes the bucket-chain path. `push_container` only feeds the flat + /// chains and never calls `restore`. + /// + /// Matches the fuel used by the source-side bucketing operator until + /// benchmarks suggest a different value. 
+ const RESTORE_FUEL: i64 = 1_000_000; + + /// A bucket of merge-batcher chunks for a single power-of-two time range. + /// + /// Maintains the same geometric chain invariant as [`super::MergeBatcher`]: + /// adjacent chains decrease in size, merging older chains as new ones are + /// pushed. + /// + /// Tracks `(lower, upper)` antichains bracketing the held times in + /// [`Self::bounds`] (when [`Merger::time_range`] returns `Some`) so + /// [`Bucket::split`] can short-circuit when the data lies entirely on + /// one side of the split timestamp. + pub struct MergeBucket { + /// Geometrically sized chains of sorted, consolidated chunks. + chains: Vec>, + /// `(lower, upper)` antichains over the times in [`Self::chains`]. + /// + /// `lower` contains *minimal* elements (every held time is + /// in-advance-of `lower`); `upper` contains *maximal* elements + /// (every held time is `less_equal` to some element of `upper`). + /// + /// `None` means either the bucket is empty or the merger does not + /// implement [`Merger::time_range`]; the latter forces + /// [`Bucket::split`] onto its merge+extract fallback path. + bounds: Option<(Antichain, Antichain)>, + } + + impl Default for MergeBucket { + fn default() -> Self { + Self { + chains: Vec::new(), + bounds: None, + } + } + } + + impl MergeBucket + where + M::Time: timely::PartialOrder + Clone, + { + fn is_empty(&self) -> bool { + self.chains.iter().all(|c| c.is_empty()) + } + + /// Insert `chain` and re-establish geometric chain sizes by merging + /// adjacent chains while their sizes are within a factor of two. + fn insert_chain( + &mut self, + chain: Vec, + merger: &mut M, + stash: &mut Vec, + ) { + if chain.is_empty() { + return; + } + // Union the chain's `(lower, upper)` antichains into + // `self.bounds`. If any chunk reports `None` we drop bounds + // to `None` so subsequent splits fall back to merge+extract. 
+ self.absorb_chain_bounds(&chain); + self.chains.push(chain); + while self.chains.len() > 1 + && self.chains[self.chains.len() - 1].len() + >= self.chains[self.chains.len() - 2].len() / 2 + { + let l1 = self.chains.pop().unwrap(); + let l2 = self.chains.pop().unwrap(); + let mut out = Vec::with_capacity(l1.len() + l2.len()); + merger.merge(l1, l2, &mut out, stash); + self.chains.push(out); + } + } + + /// Update `self.bounds` to reflect the union of its current value + /// and the times in `chain`. Must run before `chain` is pushed onto + /// `self.chains`, because it inspects the data already held. + /// + /// `bounds == None` is overloaded: on an empty bucket it means "no + /// data yet", on a non-empty bucket it means "unknown". The unknown + /// state is sticky. If any chunk, in this chain or an earlier one, + /// reported `None` from [`Merger::time_range`], `bounds` stays + /// `None`; re-seeding it from a later chain would describe only the + /// newer data and let [`Bucket::split`] short-circuit on bounds that + /// do not cover everything the bucket holds. + fn absorb_chain_bounds(&mut self, chain: &[M::Chunk]) { + // Held data without bounds: unknown, and it must stay unknown. + if self.bounds.is_none() && !self.is_empty() { + return; + } + for chunk in chain { + let Some(chunk_bounds) = M::time_range(chunk) else { + self.bounds = None; + return; + }; + self.bounds = Some(match self.bounds.take() { + Some(existing) => merge_bounds(existing, chunk_bounds), + None => chunk_bounds, + }); + } + } + + /// Pairwise-merge all chains into a single sorted chain. + /// + /// Consumes the bucket, so `self.bounds` is dropped along with it; + /// only the merged chain is returned. Returns an empty chain when + /// the bucket holds no chains. + fn merge_into_one(mut self, merger: &mut M, stash: &mut Vec) -> Vec { + while self.chains.len() > 1 { + let l1 = self.chains.pop().unwrap(); + let l2 = self.chains.pop().unwrap(); + let mut out = Vec::with_capacity(l1.len() + l2.len()); + merger.merge(l1, l2, &mut out, stash); + self.chains.push(out); + } + self.chains.pop().unwrap_or_default() + } + } + + impl Bucket for MergeBucket + where + M: Merger, + { + type Timestamp = M::Time; + + fn split(self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self) { + use timely::PartialOrder; + + // Short-circuit when tracked bounds tell us the data lies + // entirely on one side of `timestamp`. 
This avoids the + // O(records) merge+extract walk that the fallback path would + // perform — the dominant cost when `BucketChain::peel` / + // `BucketChain::restore` cascades splits over a bucket that + // holds the whole residue (e.g. all records at a single time + // exactly equal to a seal upper). + // + // The checks use timely's [`PartialOrder`] so they're correct + // for partially-ordered timestamps too: when the antichain + // can't be totally compared against `timestamp` neither + // predicate fires and we fall through to the merge+extract + // path. + if let Some((lower, upper)) = &self.bounds { + // All held times satisfy `t <= u` for some `u` in + // `upper`. If every `u` in `upper` is `< timestamp` then + // every held time is `< timestamp` → all goes to lower + // side, upper side is empty. + if !upper.is_empty() && upper.elements().iter().all(|u| u.less_than(timestamp)) { + return (self, Self::default()); + } + // All held times satisfy `lower.less_equal(t)`. If every + // `l` in `lower` satisfies `timestamp.less_equal(l)` + // (i.e. `timestamp <= l`) then every held time is + // `>= timestamp` → all goes to upper side, lower side is + // empty. + if !lower.is_empty() && lower.elements().iter().all(|l| timestamp.less_equal(l)) { + return (Self::default(), self); + } + } + + let had_bounds = self.bounds.is_some(); + let mut merger = M::default(); + let mut stash = Vec::new(); + let merged = self.merge_into_one(&mut merger, &mut stash); + + // Account fuel by total record count of the chunks we are about to + // partition. `M::account(...).0` is the record count. 
+ let records: usize = merged.iter().map(|c| M::account(c).0).sum(); + *fuel = fuel.saturating_sub(i64::try_from(records).unwrap_or(i64::MAX)); + + let upper_at = Antichain::from_elem(timestamp.clone()); + let mut frontier = Antichain::new(); + let mut ship = Vec::new(); + let mut keep = Vec::new(); + merger.extract( + merged, + upper_at.borrow(), + &mut frontier, + &mut ship, + &mut keep, + &mut stash, + ); + // Recompute child bounds from the freshly-extracted chunks. + // If the merger doesn't implement `time_range`, both sides + // fall back to `None` (and future splits stay on the + // fallback path). + let lower_bounds = if had_bounds { + bounds_from_chain::(&ship) + } else { + None + }; + let upper_bounds = if had_bounds { + bounds_from_chain::(&keep) + } else { + None + }; + let lower = MergeBucket { + chains: if ship.is_empty() { + Vec::new() + } else { + vec![ship] + }, + bounds: lower_bounds, + }; + let upper = MergeBucket { + chains: if keep.is_empty() { + Vec::new() + } else { + vec![keep] + }, + bounds: upper_bounds, + }; + (lower, upper) + } + } + + /// Compute `(lower, upper)` antichains over a chain by walking each + /// chunk's [`Merger::time_range`]. Returns `None` if the chain is + /// empty or any chunk reports `None`. + fn bounds_from_chain>( + chain: &[M::Chunk], + ) -> Option<(Antichain, Antichain)> { + let mut bounds: Option<(Antichain, Antichain)> = None; + for chunk in chain { + let chunk_bounds = M::time_range(chunk)?; + bounds = Some(match bounds.take() { + Some(existing) => merge_bounds(existing, chunk_bounds), + None => chunk_bounds, + }); + } + bounds + } + + /// Union two `(lower, upper)` antichain pairs. + /// + /// `lower` is an antichain of minimal elements; the union retains + /// only minimal elements from the combined set, exactly what + /// [`Antichain::insert`] does. `upper` is an antichain of maximal + /// elements; the union retains only maximal elements, computed via + /// [`reduce_to_maximal`]. 
+ fn merge_bounds( + a: (Antichain, Antichain), + b: (Antichain, Antichain), + ) -> (Antichain, Antichain) { + let (a_lower, a_upper) = a; + let (b_lower, b_upper) = b; + let mut lower = a_lower; + for elt in b_lower.into_iter() { + lower.insert(elt); + } + let upper_elements: Vec = a_upper.into_iter().chain(b_upper).collect(); + let upper = super::reduce_to_maximal(upper_elements); + (lower, upper) + } + + /// A merge batcher that holds back future-stamped updates using a + /// [`BucketChain`], while routing all incoming data through plain + /// merge-batcher chains. + /// + /// Outer shape mirrors [`super::MergeBatcher`]: a chunker `C` produces + /// [`Merger::Chunk`]s, a merger `M` merges and extracts them. Internally, + /// the flat `chains: Vec>` field — identical to plain + /// `MergeBatcher` — receives every input chunk. A separate + /// [`BucketChain`] holds only the `kept` (future-stamped) tail produced + /// by seal-time extracts, so the bucketing machinery does no work for + /// data that is already at or below the current frontier. + pub struct TemporalBucketingMergeBatcher + where + M: Merger, + { + chunker: C, + /// Flat geometric chains, mirroring plain [`super::MergeBatcher`]. + /// Every input chunk lands here. + chains: Vec>, + /// Bucket chain holding only future-stamped data deposited by prior + /// seals' `kept` extracts. + bucket_chain: BucketChain>, + /// Total record count currently held in [`Self::bucket_chain`]. + /// + /// Maintained alongside insertions and peel-time drains so `seal` + /// can take a fast path that skips all bucket-chain work whenever + /// no future-stamped data has accumulated. For non-temporal + /// workloads this counter stays at `0` and the per-seal cost + /// matches plain [`super::MergeBatcher`]. 
+ held_records: usize, + /// Scratch chunks lent to the merger during `merge`/`extract`; + /// cleared at the end of every `seal`. + stash: Vec, + /// Merges chains and extracts the part of a chain below a frontier. + merger: M, + /// Upper frontier of the previous `seal`; becomes the lower bound + /// of the next batch description. + lower: Antichain, + /// Lower bound on the held data, as reported by `Batcher::frontier`; + /// rewritten by every `seal`. + frontier: Antichain, + /// Stored by `new`, but no method of this type reads it: unlike + /// plain [`super::MergeBatcher`], no `BatcherEvent` size accounting + /// is logged here. + #[allow(dead_code)] + logger: Option, + /// Timely operator ID; only read when `seal` emits its debug trace. + #[allow(dead_code)] + operator_id: usize, + /// Names the `Input` container type, which is not otherwise stored. + _marker: PhantomData, + } + + impl TemporalBucketingMergeBatcher + where + C: ContainerBuilder, + M: Merger, + { + /// Insert a chain into the flat path, merging the two newest chains + /// while the newest is at least half as long as its predecessor. + /// + /// Same merge policy as plain [`super::MergeBatcher::insert_chain`], + /// but chains are pushed and popped directly, without the logger + /// accounting that type performs in `chain_push`/`chain_pop`. + fn insert_chain(&mut self, chain: Vec) { + if chain.is_empty() { + return; + } + self.chains.push(chain); + while self.chains.len() > 1 + && self.chains[self.chains.len() - 1].len() + >= self.chains[self.chains.len() - 2].len() / 2 + { + let l1 = self.chains.pop().unwrap(); + let l2 = self.chains.pop().unwrap(); + let mut out = Vec::with_capacity(l1.len() + l2.len()); + self.merger.merge(l1, l2, &mut out, &mut self.stash); + self.chains.push(out); + } + } + + /// Route an entire sorted, consolidated chain into + /// [`Self::bucket_chain`]. + /// + /// The whole chain is fed as a unit — it is already a + /// geometric-merge-ready level, so the destination bucket can + /// absorb it without per-chunk pairwise merging. Routing per + /// chunk would force the bucket to rebuild the chain via + /// O(records × log chunks) merges. + /// + /// Fast path: when the chain's combined `(lower, upper)` time + /// bounds all map to a single bucket, push the chain into that + /// bucket directly. No `M::extract`. + /// + /// Slow path (straddle or unknown bounds): drive a frontier-led + /// loop that jumps to the bucket containing the next remaining + /// time, extracts that bucket's portion, and ships it. The + /// extract output frontier (the lower bound of `keep`) selects + /// the next bucket, so empty intermediate buckets are skipped + /// instead of paying an `O(records)` extract per boundary. 
+ /// [`BucketChain::peel`] correctness depends on each bucket + /// only holding data in its declared range — peeled buckets + /// are emitted as ready without further extract, so a bucket + /// must not contain data with `t >= bucket_end`. + /// + /// Used at seal time for `kept` chains; never on the input path. + fn route_chain_into_bucket_chain(&mut self, chain: Vec) { + if chain.is_empty() { + return; + } + let total_records: usize = chain.iter().map(|c| M::account(c).0).sum(); + self.held_records = self.held_records.saturating_add(total_records); + + if self.bucket_chain.is_empty() { + // Chain was fully drained by a prior peel. Reseed with a + // single full-domain bucket and slot the chain there. + self.bucket_chain = BucketChain::new(MergeBucket::default()); + let bucket = self + .bucket_chain + .find_mut(&M::Time::minimum()) + .expect("freshly seeded chain has a bucket"); + bucket.insert_chain(chain, &mut self.merger, &mut self.stash); + return; + } + + // Fast path: union the chain's chunk bounds and check whether + // they all live in a single bucket. + let bounds = bounds_from_chain::(&chain); + let target_start = bounds.as_ref().and_then(|(lower, upper)| { + let min_t = lower.elements().first()?; + let target = self.bucket_chain.range_of(min_t)?.start; + let all_fit = upper.elements().iter().all(|u| { + self.bucket_chain + .range_of(u) + .is_some_and(|r| r.start == target) + }); + all_fit.then_some(target) + }); + if let Some(start) = target_start { + let bucket = self + .bucket_chain + .find_mut(&start) + .expect("bucket exists for known time"); + bucket.insert_chain(chain, &mut self.merger, &mut self.stash); + return; + } + + // Slow path: walk bucket starts with a monotonically + // advancing cursor, driven by the frontier returned from + // each `extract`. 
Seed with the chain's combined lower + // bound when known so we land on the first populated + // bucket immediately; otherwise start at the chain head + // and let the first extract supply the real frontier. + // + // Snapshot starts; we can't hold an immutable iterator + // across the `find_mut` borrow that follows each extract. + let starts: Vec = self.bucket_chain.starts().cloned().collect(); + let mut bucket_idx: usize = 0; + let mut scratch_frontier = match bounds { + Some((lower, _)) => lower, + None => Antichain::from_elem(M::Time::minimum()), + }; + let mut current = chain; + while !scratch_frontier.is_empty() && !current.is_empty() { + let next_t = scratch_frontier + .elements() + .first() + .expect("non-empty antichain") + .clone(); + // Advance the cursor until `starts[bucket_idx] <= + // next_t < starts[bucket_idx + 1]`. The total advance + // across all iterations is bounded by `starts.len()`, + // so empty intermediate buckets are skipped without + // paying an `extract` per boundary. + while bucket_idx + 1 < starts.len() && starts[bucket_idx + 1] <= next_t { + bucket_idx += 1; + } + let bucket_start = starts[bucket_idx].clone(); + // Exclusive upper is the next bucket's start; the + // unbounded last bucket has none, signaled to + // `extract` by an empty antichain. 
+ let upper = match starts.get(bucket_idx + 1) { + Some(end) => Antichain::from_elem(end.clone()), + None => Antichain::new(), + }; + scratch_frontier.clear(); + let mut ship = Vec::new(); + let mut keep = Vec::new(); + self.merger.extract( + std::mem::take(&mut current), + upper.borrow(), + &mut scratch_frontier, + &mut ship, + &mut keep, + &mut self.stash, + ); + if !ship.is_empty() { + let bucket = self + .bucket_chain + .find_mut(&bucket_start) + .expect("bucket exists for known start"); + bucket.insert_chain(ship, &mut self.merger, &mut self.stash); + } + current = keep; + } + } + + /// Set `self.frontier` to the lower bound of the lowest non-empty + /// bucket, or empty when no held data exists. The bucket start is a + /// valid lower bound on the times in that bucket (every held time + /// satisfies `start.less_equal(t)`), so reporting it preserves the + /// `Batcher::frontier` contract — at the cost of up to one bucket + /// span of lag relative to the actual minimum held time. The + /// bucketing already trades input-side temporal precision for batched + /// releases; aligning the reported frontier with the same bucket + /// boundaries is internally consistent and removes the per-seal + /// merge+extract over the lowest non-empty bucket. 
+ fn recompute_frontier(&mut self) { + self.frontier.clear(); + if let Some(start) = self + .bucket_chain + .iter() + .find(|(_, b)| !b.is_empty()) + .map(|(t, _)| t.clone()) + { + self.frontier = Antichain::from_elem(start); + } + } + } + + impl Batcher for TemporalBucketingMergeBatcher + where + C: ContainerBuilder + for<'a> PushInto<&'a mut Input>, + M: Merger, + { + type Input = Input; + type Time = M::Time; + type Output = M::Chunk; + + fn new(logger: Option, operator_id: usize) -> Self { + Self { + logger, + operator_id, + chunker: C::default(), + merger: M::default(), + chains: Vec::new(), + bucket_chain: BucketChain::new(MergeBucket::default()), + held_records: 0, + stash: Vec::new(), + frontier: Antichain::new(), + lower: Antichain::from_elem(M::Time::minimum()), + _marker: PhantomData, + } + } + + fn push_container(&mut self, container: &mut Input) { + self.chunker.push_into(container); + while let Some(chunk) = self.chunker.extract() { + let chunk = std::mem::take(chunk); + self.insert_chain(vec![chunk]); + } + } + + fn seal>( + &mut self, + upper: Antichain, + ) -> B::Output { + // Drain the chunker into the flat path. + while let Some(chunk) = self.chunker.finish() { + let chunk = std::mem::take(chunk); + self.insert_chain(vec![chunk]); + } + + // Merge all flat chains into one sorted chain. + while self.chains.len() > 1 { + let l1 = self.chains.pop().unwrap(); + let l2 = self.chains.pop().unwrap(); + let mut out = Vec::with_capacity(l1.len() + l2.len()); + self.merger.merge(l1, l2, &mut out, &mut self.stash); + self.chains.push(out); + } + let merged_flat = self.chains.pop().unwrap_or_default(); + + // Extract from the flat side. `flat_ready` has `t < upper`, + // `kept` has `t >= upper` and feeds the bucket chain. We use a + // scratch frontier because `self.frontier` is recomputed below. 
+ let mut flat_ready: Vec = Vec::new(); + let mut kept = Vec::new(); + let mut scratch_frontier = Antichain::new(); + self.merger.extract( + merged_flat, + upper.borrow(), + &mut scratch_frontier, + &mut flat_ready, + &mut kept, + &mut self.stash, + ); + + // Diagnostic counters; cheap to collect, gated to DEBUG below. + let trace_enabled = tracing::enabled!(tracing::Level::DEBUG); + let flat_ready_records: usize = if trace_enabled { + flat_ready.iter().map(|c| M::account(c).0).sum() + } else { + 0 + }; + let kept_records: usize = if trace_enabled { + kept.iter().map(|c| M::account(c).0).sum() + } else { + 0 + }; + let held_before = self.held_records; + + // Fast path: no held data and nothing new to hold. Skip the + // bucket-chain bookkeeping (peel, merge, restore, frontier + // recomputation) so the per-seal cost matches the plain + // [`super::MergeBatcher`]. + let mut peel_records: usize = 0; + let mut peeled_buckets: usize = 0; + let fast_path = self.held_records == 0 && kept.is_empty(); + let mut readied = if fast_path { + self.frontier.clear(); + flat_ready + } else { + // Future-stamped data enters the bucket chain. Route + // the whole chain at once so the destination bucket can + // adopt it as a single geometric level rather than + // pairwise-merging it back into existence. + self.route_chain_into_bucket_chain(kept); + + // Peel buckets fully below `upper`. The peel may split the + // boundary-crossing bucket; the upper half is reinserted. + let peeled = self.bucket_chain.peel(upper.borrow()); + + // Merge each peeled bucket into a single sorted chain, then + // merge all of those into one running chain. Account peeled + // records as leaving the bucket chain. 
+ let mut peeled_chain: Vec = Vec::new(); + for bucket in peeled { + let bucket_one = bucket.merge_into_one(&mut self.merger, &mut self.stash); + if bucket_one.is_empty() { + continue; + } + let drained: usize = bucket_one.iter().map(|c| M::account(c).0).sum(); + self.held_records = self.held_records.saturating_sub(drained); + peel_records = peel_records.saturating_add(drained); + peeled_buckets = peeled_buckets.saturating_add(1); + if peeled_chain.is_empty() { + peeled_chain = bucket_one; + } else { + let mut out = Vec::with_capacity(peeled_chain.len() + bucket_one.len()); + self.merger + .merge(peeled_chain, bucket_one, &mut out, &mut self.stash); + peeled_chain = out; + } + } + + // Combine flat-side ready chunks with peeled-side chunks into a + // single sorted chain. Peeled buckets are entirely below `upper` + // by `peel`'s contract, so no further extract is needed. + let combined = if flat_ready.is_empty() { + peeled_chain + } else if peeled_chain.is_empty() { + flat_ready + } else { + let mut out = Vec::with_capacity(flat_ready.len() + peeled_chain.len()); + self.merger + .merge(flat_ready, peeled_chain, &mut out, &mut self.stash); + out + }; + + // Maintain bucket-chain shape with bounded fuel. + let mut fuel = RESTORE_FUEL; + self.bucket_chain.restore(&mut fuel); + + // Recompute the held-data frontier from the lowest non-empty + // bucket. After the seal, all data is in `bucket_chain` (the + // flat chains have been drained). For totally-ordered timestamps + // — the only kind that impl `BucketTimestamp` — this produces a + // single-element antichain at the smallest held time. 
+ self.recompute_frontier(); + + combined + }; + + self.stash.clear(); + + tracing::debug!( + target: "mz_timely_util::merge_batcher::temporal", + operator_id = self.operator_id, + upper = ?upper.borrow(), + lower = ?self.lower.borrow(), + frontier_after = ?self.frontier.borrow(), + flat_ready_records, + kept_records, + held_before, + held_after = self.held_records, + peeled_buckets, + peel_records, + fast_path, + bucket_count = self.bucket_chain.len(), + "seal", + ); + + let description = Description::new( + self.lower.clone(), + upper.clone(), + Antichain::from_elem(M::Time::minimum()), + ); + let seal = B::seal(&mut readied, description); + self.lower = upper; + seal + } + + fn frontier(&mut self) -> AntichainRef<'_, M::Time> { + self.frontier.borrow() + } + } + + impl TemporalBucketingMergeBatcher + where + M: Merger, + { + /// Returns true if both the flat chains and every bucket are empty. + #[cfg(test)] + pub(super) fn is_chain_empty(&self) -> bool { + if !self.chains.is_empty() { + return false; + } + let starts: Vec<_> = self.bucket_chain.starts().cloned().collect(); + starts.iter().all(|s| { + self.bucket_chain + .find(s) + .map(|b| b.is_empty()) + .unwrap_or(true) + }) + } + } + + #[cfg(test)] + mod tests { + use std::marker::PhantomData; + + use differential_dataflow::trace::implementations::chunker::ContainerChunker; + use differential_dataflow::trace::{Batcher, Builder, Description}; + use timely::progress::{Antichain, Timestamp}; + + use crate::merge_batcher::TemporalBucketingMergeBatcher; + use crate::merge_batcher::container::VecMerger; + + type TestTime = u64; + type TestUpdate = (u64, TestTime, i64); + type TestBatcher = TemporalBucketingMergeBatcher< + Vec, + ContainerChunker>, + VecMerger, + >; + + /// A `Builder` that collects sealed chunks into a flat `Vec`. 
+ struct VecCapturingBuilder(PhantomData<(D, T)>); + + impl Builder for VecCapturingBuilder { + type Input = Vec; + type Time = T; + type Output = Vec; + + fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { + unimplemented!() + } + fn push(&mut self, _chunk: &mut Self::Input) { + unimplemented!() + } + fn done(self, _description: Description) -> Self::Output { + unimplemented!() + } + + fn seal( + chain: &mut Vec, + _description: Description, + ) -> Self::Output { + let mut out = Vec::new(); + for mut chunk in std::mem::take(chain) { + out.append(&mut chunk); + } + out + } + } + + fn push(batcher: &mut TestBatcher, mut updates: Vec) { + batcher.push_container(&mut updates); + } + + fn seal(batcher: &mut TestBatcher, upper: Option) -> Vec { + let antichain = match upper { + Some(u) => Antichain::from_elem(u), + None => Antichain::new(), + }; + batcher.seal::>(antichain) + } + + #[mz_ore::test] + fn distinct_timestamps_stay_distinct() { + let mut batcher = TestBatcher::new(None, 0); + let input: Vec = (0_u64..32).map(|i| (i, i, 1_i64)).collect(); + push(&mut batcher, input.clone()); + + let drained = seal(&mut batcher, None); + let mut sorted_drained = drained.clone(); + sorted_drained.sort(); + let mut sorted_input = input; + sorted_input.sort(); + assert_eq!(sorted_drained, sorted_input); + } + + #[mz_ore::test] + fn seal_returns_strictly_less_than_upper() { + let mut batcher = TestBatcher::new(None, 0); + let input: Vec = (0_u64..16).map(|i| (i, i, 1_i64)).collect(); + push(&mut batcher, input); + + let early = seal(&mut batcher, Some(8)); + for (_, t, _) in &early { + assert!(*t < 8, "seal returned t={t} not strictly less than 8"); + } + // Updates with t in [0, 8) must all be present. 
+ let mut early_times: Vec = early.iter().map(|(_, t, _)| *t).collect(); + early_times.sort(); + assert_eq!(early_times, (0_u64..8).collect::>()); + + let rest = seal(&mut batcher, None); + let mut rest_times: Vec = rest.iter().map(|(_, t, _)| *t).collect(); + rest_times.sort(); + assert_eq!(rest_times, (8_u64..16).collect::>()); + + assert!(batcher.is_chain_empty()); + } + + #[mz_ore::test] + fn frontier_reports_held_lower_bound() { + let mut batcher = TestBatcher::new(None, 0); + let input: Vec = vec![(1, 3, 1), (2, 5, 1), (3, 7, 1), (4, 11, 1)]; + push(&mut batcher, input); + + // Seal up to 4. Held data is {5, 7, 11}; the smallest is 5. + // The reported frontier is the start of the bucket holding 5, + // which is a valid lower bound on the held times (`<= 5`). + let _ = seal(&mut batcher, Some(4)); + let f = batcher.frontier(); + let bound = *f.first().expect("frontier non-empty while data is held"); + assert!( + bound <= 5, + "reported frontier {bound} must be a lower bound of held min 5" + ); + + // Drain everything. + let _ = seal(&mut batcher, None); + assert!(batcher.frontier().is_empty()); + } + + #[mz_ore::test] + fn future_stamped_updates_are_held_back() { + let mut batcher = TestBatcher::new(None, 0); + // All updates at t >= 100, but seal upper is 50 — nothing should be released. + let input: Vec = (100_u64..120).map(|i| (i, i, 1_i64)).collect(); + push(&mut batcher, input.clone()); + + let early = seal(&mut batcher, Some(50)); + assert!(early.is_empty(), "no data with t < 50 should be released"); + + // After draining, all updates show up. + let drained = seal(&mut batcher, None); + let mut sorted = drained.clone(); + sorted.sort(); + let mut input_sorted = input; + input_sorted.sort(); + assert_eq!(sorted, input_sorted); + } + } +} + +pub mod container { + //! Merger implementations for the merge batcher. + //! + //! The `InternalMerge` trait allows containers to merge sorted, consolidated + //! data using internal iteration. 
The `InternalMerger` type implements the + //! `Merger` trait using `InternalMerge`, and is the standard merger for all + //! container types. + + use std::marker::PhantomData; + use timely::container::SizableContainer; + use timely::progress::frontier::{Antichain, AntichainRef}; + use timely::{Accountable, PartialOrder}; + + use crate::merge_batcher::Merger; + + /// A container that supports the operations needed by the merge batcher: + /// merging sorted chains and extracting updates by time. + pub trait InternalMerge: Accountable + SizableContainer + Default { + /// The owned time type, for maintaining antichains. + type TimeOwned; + + /// The number of items in this container. + fn len(&self) -> usize; + + /// Clear the container for reuse. + fn clear(&mut self); + + /// Account the allocations behind the chunk. + fn account(&self) -> (usize, usize, usize, usize) { + let (size, capacity, allocations) = (0, 0, 0); + ( + usize::try_from(self.record_count()).unwrap(), + size, + capacity, + allocations, + ) + } + + /// Merge items from sorted inputs into `self`, advancing positions. + /// Merges until `self` is at capacity or all inputs are exhausted. + /// + /// Dispatches based on the number of inputs: + /// - **0**: no-op + /// - **1**: bulk copy (may swap the input into `self`) + /// - **2**: merge two sorted streams + fn merge_from(&mut self, others: &mut [Self], positions: &mut [usize]); + + /// Extract updates from `self` into `ship` (times not beyond `upper`) + /// and `keep` (times beyond `upper`), updating `frontier` with kept times. + /// + /// Iteration starts at `*position` and advances `*position` as updates + /// are consumed. The implementation must yield (return early) when + /// either `keep.at_capacity()` or `ship.at_capacity()` becomes true, + /// so the caller can swap out a full output buffer and resume by + /// calling `extract` again. The caller invokes `extract` repeatedly + /// until `*position >= self.len()`. 
+ /// + /// This shape exists because `at_capacity()` for `Vec` is + /// `len() == capacity()`, which silently becomes false again the + /// moment a push past capacity grows the backing allocation. + /// Without per-element yielding, a single `extract` call can + /// quietly produce oversized output chunks. + fn extract( + &mut self, + position: &mut usize, + upper: AntichainRef, + frontier: &mut Antichain, + keep: &mut Self, + ship: &mut Self, + ); + + /// Return `(lower, upper)` antichains bracketing the times in + /// `self`, or `None` if the container is empty / the impl can't + /// answer. See [`Merger::time_range`] for the contract. + fn time_range(&self) -> Option<(Antichain, Antichain)> { + None + } + } + + /// A `Merger` for `Vec` containers, which contain owned data and need special treatment. + pub type VecInternalMerger = VecMerger; + /// A `Merger` implementation for `Vec<(D, T, R)>` that drains owned inputs. + pub struct VecMerger { + _marker: PhantomData<(D, T, R)>, + } + + impl Default for VecMerger { + fn default() -> Self { + Self { + _marker: PhantomData, + } + } + } + + impl VecMerger { + /// The target capacity for output buffers, as a power of two. + /// + /// This amount is used to size vectors, where vectors not exactly this capacity are dropped. + /// If this is mis-set, there is the potential for more memory churn than anticipated. + fn target_capacity() -> usize { + timely::container::buffer::default_capacity::<(D, T, R)>().next_power_of_two() + } + /// Acquire a buffer with the target capacity. + fn empty(&self, stash: &mut Vec>) -> Vec<(D, T, R)> { + let target = Self::target_capacity(); + let mut container = stash.pop().unwrap_or_default(); + container.clear(); + // Reuse if at target; otherwise allocate fresh. + if container.capacity() != target { + container = Vec::with_capacity(target); + } + container + } + /// Refill `queue` from `iter` if empty. Recycles drained queues into `stash`. 
+ fn refill( + queue: &mut std::collections::VecDeque<(D, T, R)>, + iter: &mut impl Iterator>, + stash: &mut Vec>, + ) { + if queue.is_empty() { + let target = Self::target_capacity(); + if stash.len() < 2 { + let mut recycled = Vec::from(std::mem::take(queue)); + recycled.clear(); + if recycled.capacity() == target { + stash.push(recycled); + } + } + if let Some(chunk) = iter.next() { + *queue = std::collections::VecDeque::from(chunk); + } + } + } + } + + impl Merger for VecMerger + where + D: Ord + Clone + 'static, + T: Ord + Clone + PartialOrder + 'static, + R: differential_dataflow::difference::Semigroup + Clone + 'static, + { + type Chunk = Vec<(D, T, R)>; + type Time = T; + + fn merge( + &mut self, + list1: Vec>, + list2: Vec>, + output: &mut Vec>, + stash: &mut Vec>, + ) { + use std::cmp::Ordering; + use std::collections::VecDeque; + + let mut iter1 = list1.into_iter(); + let mut iter2 = list2.into_iter(); + let mut q1 = VecDeque::<(D, T, R)>::from(iter1.next().unwrap_or_default()); + let mut q2 = VecDeque::<(D, T, R)>::from(iter2.next().unwrap_or_default()); + + let mut result = self.empty(stash); + + // Merge while both queues are non-empty. + while let (Some((d1, t1, _)), Some((d2, t2, _))) = (q1.front(), q2.front()) { + match (d1, t1).cmp(&(d2, t2)) { + Ordering::Less => { + result.push(q1.pop_front().unwrap()); + } + Ordering::Greater => { + result.push(q2.pop_front().unwrap()); + } + Ordering::Equal => { + let (d, t, mut r1) = q1.pop_front().unwrap(); + let (_, _, r2) = q2.pop_front().unwrap(); + r1.plus_equals(&r2); + if !r1.is_zero() { + result.push((d, t, r1)); + } + } + } + + if result.at_capacity() { + output.push(std::mem::take(&mut result)); + result = self.empty(stash); + } + + // Refill emptied queues from their chains. + if q1.is_empty() { + Self::refill(&mut q1, &mut iter1, stash); + } + if q2.is_empty() { + Self::refill(&mut q2, &mut iter2, stash); + } + } + + // Push partial result and remaining data from both sides. 
+ if !result.is_empty() { + output.push(result); + } + for q in [q1, q2] { + if !q.is_empty() { + output.push(Vec::from(q)); + } + } + output.extend(iter1); + output.extend(iter2); + } + + fn extract( + &mut self, + merged: Vec>, + upper: AntichainRef, + frontier: &mut Antichain, + ship: &mut Vec>, + kept: &mut Vec>, + stash: &mut Vec>, + ) { + let mut keep = self.empty(stash); + let mut ready = self.empty(stash); + + for mut chunk in merged { + // Go update-by-update to swap out full containers. + for (data, time, diff) in chunk.drain(..) { + if upper.less_equal(&time) { + frontier.insert_with(&time, |time| time.clone()); + keep.push((data, time, diff)); + } else { + ready.push((data, time, diff)); + } + if keep.at_capacity() { + kept.push(std::mem::take(&mut keep)); + keep = self.empty(stash); + } + if ready.at_capacity() { + ship.push(std::mem::take(&mut ready)); + ready = self.empty(stash); + } + } + // Recycle the now-empty chunk if it has the right capacity. + if chunk.capacity() == Self::target_capacity() { + stash.push(chunk); + } + } + if !keep.is_empty() { + kept.push(keep); + } + if !ready.is_empty() { + ship.push(ready); + } + } + + fn account(chunk: &Vec<(D, T, R)>) -> (usize, usize, usize, usize) { + (chunk.len(), 0, 0, 0) + } + + fn time_range(chunk: &Vec<(D, T, R)>) -> Option<(Antichain, Antichain)> { + if chunk.is_empty() { + return None; + } + Some(super::build_time_bounds( + chunk.iter().map(|(_, t, _)| t.clone()), + )) + } + } + + /// A merger that uses internal iteration via [`InternalMerge`]. 
+ pub struct InternalMerger { + _marker: PhantomData, + } + + impl Default for InternalMerger { + fn default() -> Self { + Self { + _marker: PhantomData, + } + } + } + + impl InternalMerger + where + MC: InternalMerge, + { + #[inline] + fn empty(&self, stash: &mut Vec) -> MC { + stash.pop().unwrap_or_else(|| { + let mut container = MC::default(); + container.ensure_capacity(&mut None); + container + }) + } + #[inline] + fn recycle(&self, mut chunk: MC, stash: &mut Vec) { + chunk.clear(); + stash.push(chunk); + } + /// Drain remaining items from one side into `result`/`output`. + /// + /// Copies the partially-consumed head into `result`, then appends + /// remaining full chunks directly to `output` without copying. + fn drain_side( + &self, + head: &mut MC, + pos: &mut usize, + list: &mut std::vec::IntoIter, + result: &mut MC, + output: &mut Vec, + stash: &mut Vec, + ) { + // Copy the partially-consumed head into result. + if *pos < head.len() { + result.merge_from(std::slice::from_mut(head), std::slice::from_mut(pos)); + } + // Flush result before appending full chunks. + if !result.is_empty() { + output.push(std::mem::take(result)); + *result = self.empty(stash); + } + // Remaining full chunks go directly to output. + output.extend(list); + } + } + + impl Merger for InternalMerger + where + MC: InternalMerge + 'static, + { + type Time = MC::TimeOwned; + type Chunk = MC; + + fn merge( + &mut self, + list1: Vec, + list2: Vec, + output: &mut Vec, + stash: &mut Vec, + ) { + let mut list1 = list1.into_iter(); + let mut list2 = list2.into_iter(); + + let mut heads = [ + list1.next().unwrap_or_default(), + list2.next().unwrap_or_default(), + ]; + let mut positions = [0usize, 0usize]; + + let mut result = self.empty(stash); + + // Main merge loop: both sides have data. 
+ while positions[0] < heads[0].len() && positions[1] < heads[1].len() { + result.merge_from(&mut heads, &mut positions); + + if positions[0] >= heads[0].len() { + let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default()); + self.recycle(old, stash); + positions[0] = 0; + } + if positions[1] >= heads[1].len() { + let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default()); + self.recycle(old, stash); + positions[1] = 0; + } + if result.at_capacity() { + output.push(std::mem::take(&mut result)); + result = self.empty(stash); + } + } + + // Drain remaining from each side: copy partial head, then append full chunks. + self.drain_side( + &mut heads[0], + &mut positions[0], + &mut list1, + &mut result, + output, + stash, + ); + self.drain_side( + &mut heads[1], + &mut positions[1], + &mut list2, + &mut result, + output, + stash, + ); + if !result.is_empty() { + output.push(result); + } + } + + fn extract( + &mut self, + merged: Vec, + upper: AntichainRef, + frontier: &mut Antichain, + ship: &mut Vec, + kept: &mut Vec, + stash: &mut Vec, + ) { + let mut keep = self.empty(stash); + let mut ready = self.empty(stash); + + for mut buffer in merged { + let mut position = 0; + let len = buffer.len(); + while position < len { + buffer.extract(&mut position, upper, frontier, &mut keep, &mut ready); + if keep.at_capacity() { + kept.push(std::mem::take(&mut keep)); + keep = self.empty(stash); + } + if ready.at_capacity() { + ship.push(std::mem::take(&mut ready)); + ready = self.empty(stash); + } + } + self.recycle(buffer, stash); + } + if !keep.is_empty() { + kept.push(keep); + } + if !ready.is_empty() { + ship.push(ready); + } + } + + fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { + chunk.account() + } + + fn time_range( + chunk: &Self::Chunk, + ) -> Option<(Antichain, Antichain)> { + chunk.time_range() + } + } + + /// Implementation of `InternalMerge` for `Vec<(D, T, R)>`. 
+ /// + /// Note: The `VecMerger` type implements `Merger` directly and avoids + /// cloning by draining inputs. This `InternalMerge` impl is retained + /// because `reduce` requires `Builder::Input: InternalMerge`. + pub mod vec_internal { + use super::InternalMerge; + use differential_dataflow::difference::Semigroup; + use std::cmp::Ordering; + use timely::PartialOrder; + use timely::container::SizableContainer; + use timely::progress::frontier::{Antichain, AntichainRef}; + + impl< + D: Ord + Clone + 'static, + T: Ord + Clone + PartialOrder + 'static, + R: Semigroup + Clone + 'static, + > InternalMerge for Vec<(D, T, R)> + { + type TimeOwned = T; + + fn len(&self) -> usize { + Vec::len(self) + } + fn clear(&mut self) { + Vec::clear(self) + } + + fn merge_from(&mut self, others: &mut [Self], positions: &mut [usize]) { + match others.len() { + 0 => {} + 1 => { + let other = &mut others[0]; + let pos = &mut positions[0]; + if self.is_empty() && *pos == 0 { + std::mem::swap(self, other); + return; + } + self.extend_from_slice(&other[*pos..]); + *pos = other.len(); + } + 2 => { + let (left, right) = others.split_at_mut(1); + let other1 = &mut left[0]; + let other2 = &mut right[0]; + + while positions[0] < other1.len() + && positions[1] < other2.len() + && !self.at_capacity() + { + let (d1, t1, _) = &other1[positions[0]]; + let (d2, t2, _) = &other2[positions[1]]; + // NOTE: The .clone() calls here are not great, but this dead code to be removed in the next release. 
+ match (d1, t1).cmp(&(d2, t2)) { + Ordering::Less => { + self.push(other1[positions[0]].clone()); + positions[0] += 1; + } + Ordering::Greater => { + self.push(other2[positions[1]].clone()); + positions[1] += 1; + } + Ordering::Equal => { + let (d, t, mut r1) = other1[positions[0]].clone(); + let (_, _, ref r2) = other2[positions[1]]; + r1.plus_equals(r2); + if !r1.is_zero() { + self.push((d, t, r1)); + } + positions[0] += 1; + positions[1] += 1; + } + } + } + } + n => unimplemented!("{n}-way merge not yet supported"), + } + } + + fn extract( + &mut self, + position: &mut usize, + upper: AntichainRef, + frontier: &mut Antichain, + keep: &mut Self, + ship: &mut Self, + ) { + let len = self.len(); + while *position < len && !keep.at_capacity() && !ship.at_capacity() { + let (data, time, diff) = self[*position].clone(); + if upper.less_equal(&time) { + frontier.insert_with(&time, |time| time.clone()); + keep.push((data, time, diff)); + } else { + ship.push((data, time, diff)); + } + *position += 1; + } + } + + fn time_range(&self) -> Option<(Antichain, Antichain)> { + if self.is_empty() { + return None; + } + Some(crate::merge_batcher::build_time_bounds( + self.iter().map(|(_, t, _)| t.clone()), + )) + } + } + } +} diff --git a/src/timely-util/src/temporal.rs b/src/timely-util/src/temporal.rs index c7f155213bde8..2eec1ee2e0909 100644 --- a/src/timely-util/src/temporal.rs +++ b/src/timely-util/src/temporal.rs @@ -163,6 +163,20 @@ impl BucketChain { /// bucket of -2 bits just before the smallest bucket. #[inline] pub fn restore(&mut self, fuel: &mut i64) { + if self.content.is_empty() { + return; + } + // Fast path: if every bucket already satisfies the chain property, no work needed. 
+ let mut last_bits = -2_isize; + let well_formed = self.content.values().all(|(bits, _)| { + let cur = isize::cast_from(*bits); + let ok = cur <= last_bits + 2; + last_bits = cur; + ok + }); + if well_formed { + return; + } // We could write this in terms of a cursor API, but it's not stable yet. Instead, we // allocate a new map and move elements over. let mut new = BTreeMap::default(); @@ -184,12 +198,24 @@ impl BucketChain { self.content = new; } + /// Iterate `(start, &bucket)` pairs in ascending start order. + #[inline] + pub fn iter(&self) -> impl Iterator { + self.content.iter().map(|(t, (_, s))| (t, s)) + } + /// Returns `true` if the chain is empty. This means there are no outstanding times left. #[inline(always)] pub fn is_empty(&self) -> bool { self.content.is_empty() } + /// Iterate over bucket lower bounds in ascending order. + #[inline] + pub fn starts(&self) -> impl Iterator { + self.content.keys() + } + /// The number of buckets in the chain. #[inline(always)] pub fn len(&self) -> usize { @@ -210,21 +236,25 @@ impl BucketChain { } } +/// Test-only [`BucketTimestamp`] impls, hoisted to crate scope so other +/// `#[cfg(test)]` modules in this crate can reuse them. #[cfg(test)] -mod tests { - use super::*; - - impl BucketTimestamp for u8 { - fn advance_by_power_of_two(&self, bits: u32) -> Option { - self.checked_add(1_u8.checked_shl(bits)?) - } +impl BucketTimestamp for u8 { + fn advance_by_power_of_two(&self, bits: u32) -> Option { + self.checked_add(1_u8.checked_shl(bits)?) } +} - impl BucketTimestamp for u64 { - fn advance_by_power_of_two(&self, bits: u32) -> Option { - self.checked_add(1_u64.checked_shl(bits)?) - } +#[cfg(test)] +impl BucketTimestamp for u64 { + fn advance_by_power_of_two(&self, bits: u32) -> Option { + self.checked_add(1_u64.checked_shl(bits)?) 
} +} + +#[cfg(test)] +mod tests { + use super::*; struct TestStorage { inner: Vec, diff --git a/test/feature-benchmark/mzcompose.py b/test/feature-benchmark/mzcompose.py index f169bc84c36fb..3ad49b02704fb 100644 --- a/test/feature-benchmark/mzcompose.py +++ b/test/feature-benchmark/mzcompose.py @@ -81,6 +81,7 @@ from materialize.feature_benchmark.scenarios.scale import * # noqa: F401 F403 from materialize.feature_benchmark.scenarios.skew import * # noqa: F401 F403 from materialize.feature_benchmark.scenarios.subscribe import * # noqa: F401 F403 +from materialize.feature_benchmark.scenarios.temporal import * # noqa: F401 F403 from materialize.feature_benchmark.termination import ( NormalDistributionOverlap, ProbForMin,