Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions misc/python/materialize/feature_benchmark/scenarios/temporal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from datetime import date, timedelta
from textwrap import dedent

from materialize.feature_benchmark.action import Action, TdAction
from materialize.feature_benchmark.measurement_source import MeasurementSource, Td
from materialize.feature_benchmark.scenario import Scenario


class TemporalFilter(Scenario):
    """Feature benchmarks related to temporal filters using mz_now().

    Base class only; concrete subclasses supply the workload via
    ``init``/``benchmark``.
    """


class TemporalFilterIndexed(TemporalFilter):
    """Time the creation of an index on a temporally filtered view over 10M rows.

    A materialized view (10M rows joined with a one-row anchor table) is
    filtered through an mz_now() range covering [today, today + 7 days), then
    indexed. The temporal predicate forces the collection to carry future
    retractions, so the arrangement must bucket updates by time before the
    index can be built.
    """

    # The scenario is not meaningful at other scales; pin it.
    FIXED_SCALE = True

    def init(self) -> list[Action]:
        # Recreate the single-row anchor table the benchmark joins against.
        setup = dedent("""
            > DROP TABLE IF EXISTS anchor CASCADE;
            > CREATE TABLE anchor (a int);
            > INSERT INTO anchor VALUES (1);
        """)
        return [TdAction(setup)]

    def benchmark(self) -> MeasurementSource:
        # Temporal window: [start of today, start of today + 7 days), in ms.
        window_start = date.today()
        window_end = window_start + timedelta(days=7)
        spec = dedent(f"""
            > DROP VIEW IF EXISTS v_temporal CASCADE;
            > DROP MATERIALIZED VIEW IF EXISTS mv_temporal CASCADE;

            > CREATE MATERIALIZED VIEW mv_temporal AS
              SELECT x, a
              FROM generate_series(1, 10000000) AS x
              CROSS JOIN anchor;

            > SELECT COUNT(*) FROM mv_temporal;
            10000000

            > SELECT 1
              /* A */
            1

            > CREATE VIEW v_temporal AS
              SELECT x, a FROM mv_temporal
              WHERE mz_now() >= extract(epoch from timestamp '{window_start}')::uint8 * 1000
              AND mz_now() < extract(epoch from timestamp '{window_end}')::uint8 * 1000;

            > CREATE INDEX idx_temporal ON v_temporal (x);

            > SELECT COUNT(*) FROM v_temporal
              /* B */
            10000000
        """)
        return Td(spec)


class TemporalFilterSustainedInsert(TemporalFilter):
    """Time small inserts propagating through a temporal-filtered indexed view.

    Loads 10M rows into a table, defines a view over it with an mz_now()
    range predicate covering [today, today + 7 days), and indexes the view.
    Five 100k-row inserts are then issued, and the measurement is how long
    the trailing SELECT takes to observe them through the index.

    Each inserted row yields two updates through the temporal MFP: an
    insert at today plus a retraction at next week. Without temporal
    bucketing both land in the index's merge batcher and grow the
    arrangement spine; with bucketing only today's update enters the spine
    while the future retractions sit in the BucketChain. A smaller spine
    means cheaper incremental merge work, which is on the critical path of
    the trailing SELECT.
    """

    # The scenario is not meaningful at other scales; pin it.
    FIXED_SCALE = True

    def benchmark(self) -> MeasurementSource:
        # Temporal window: [start of today, start of today + 7 days), in ms.
        window_start = date.today()
        window_end = window_start + timedelta(days=7)
        spec = dedent(f"""
            > DROP TABLE IF EXISTS t_temporal CASCADE;
            > CREATE TABLE t_temporal (x int);
            > INSERT INTO t_temporal SELECT generate_series(1, 10000000);

            > CREATE VIEW v_temporal AS
              SELECT x FROM t_temporal
              WHERE mz_now() >= extract(epoch from timestamp '{window_start}')::uint8 * 1000
              AND mz_now() < extract(epoch from timestamp '{window_end}')::uint8 * 1000;

            > CREATE INDEX idx_temporal ON v_temporal (x);

            > SELECT COUNT(*) FROM v_temporal;
            10000000

            > SELECT 1
              /* A */
            1

            > INSERT INTO t_temporal SELECT generate_series(10000001, 10100000);
            > INSERT INTO t_temporal SELECT generate_series(10100001, 10200000);
            > INSERT INTO t_temporal SELECT generate_series(10200001, 10300000);
            > INSERT INTO t_temporal SELECT generate_series(10300001, 10400000);
            > INSERT INTO t_temporal SELECT generate_series(10400001, 10500000);

            > SELECT COUNT(*) FROM v_temporal
              /* B */
            10500000
        """)
        return Td(spec)
8 changes: 0 additions & 8 deletions src/compute-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,6 @@ pub const ENABLE_TEMPORAL_BUCKETING: Config<bool> = Config::new(
"Whether to enable temporal bucketing in compute.",
);

/// The summary to apply to the frontier in temporal bucketing in compute.
///
/// Defaults to 2 seconds. Only takes effect when temporal bucketing itself
/// is enabled (see `ENABLE_TEMPORAL_BUCKETING`).
pub const TEMPORAL_BUCKETING_SUMMARY: Config<Duration> = Config::new(
    "compute_temporal_bucketing_summary",
    Duration::from_secs(2),
    "The summary to apply to frontiers in temporal bucketing in compute.",
);

/// The yielding behavior with which linear joins should be rendered.
pub const LINEAR_JOIN_YIELDING: Config<&str> = Config::new(
"linear_join_yielding",
Expand Down Expand Up @@ -386,7 +379,6 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&CORRECTION_V2_CHAIN_PROPORTIONALITY)
.add(&CORRECTION_V2_CHUNK_SIZE)
.add(&ENABLE_TEMPORAL_BUCKETING)
.add(&TEMPORAL_BUCKETING_SUMMARY)
.add(&LINEAR_JOIN_YIELDING)
.add(&ENABLE_LGALLOC)
.add(&LGALLOC_BACKGROUND_INTERVAL)
Expand Down
1 change: 0 additions & 1 deletion src/compute/src/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,3 @@

pub(crate) mod arrange;
pub(crate) mod reduce;
pub(crate) mod temporal_bucket;
81 changes: 79 additions & 2 deletions src/compute/src/extensions/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,31 @@ use std::collections::BTreeMap;
use std::rc::Rc;

use differential_dataflow::difference::Semigroup;
use differential_dataflow::dynamic::pointstamp::PointStamp;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::arrangement::arrange_core;
use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
use differential_dataflow::trace::implementations::spine_fueled::Spine;
use differential_dataflow::trace::{Batch, Batcher, Builder, Trace, TraceReader};
use differential_dataflow::{Collection, Data, ExchangeData, Hashable, VecCollection};
use mz_repr::{Diff, Row};
use mz_timely_util::columnar::builder::ColumnBuilder;
use mz_timely_util::columnar::{Col2ValBatcher, Col2ValTemporalBatcher, Column, columnar_exchange};
use timely::Container;
use timely::dataflow::Stream;
use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
use timely::dataflow::channels::pact::{Exchange, ExchangeCore, ParallelizationContract, Pipeline};
use timely::dataflow::operators::Operator;
use timely::order::Product;
use timely::progress::Timestamp;

use crate::logging::compute::{
ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
ArrangementHeapSizeOperator, ComputeEvent, ComputeEventBuilder,
};
use crate::row_spine::RowRowBuilder;
use crate::typedefs::{
KeyAgent, KeyValAgent, MzArrangeData, MzData, MzTimestamp, RowAgent, RowRowAgent, RowValAgent,
KeyAgent, KeyValAgent, MzArrangeData, MzData, MzTimestamp, RowAgent, RowRowAgent, RowRowSpine,
RowValAgent,
};

/// Extension trait to arrange data.
Expand Down Expand Up @@ -426,3 +433,73 @@ where
})
}
}

/// Per-`RenderTimestamp` dispatch trait for the row-row arrangement built by
/// [`crate::render::context::CollectionBundle::ensure_collections`].
///
/// Two impls exist, one per concrete `RenderTimestamp`. For
/// [`mz_repr::Timestamp`] the dispatch picks
/// [`Col2ValTemporalBatcher`] when temporal bucketing is enabled, else
/// falls through to the plain [`Col2ValBatcher`]. For
/// `Product<mz_repr::Timestamp, PointStamp<u64>>` (iterative scopes) it
/// always uses the plain batcher because the timestamp is partially
/// ordered and not bucketable.
///
/// This trait is intentionally **not** a supertrait of
/// `crate::render::RenderTimestamp` so that the bucketing precondition does
/// not leak into the generic timestamp interface.
pub trait MaybeTemporalRowRowArrange: MzTimestamp {
    /// Arrange a row-row stream into a [`RowRowAgent`], possibly via the
    /// temporal-bucketing batcher.
    ///
    /// `name` names the arrangement operator; `bucketing_enabled` requests
    /// the temporal-bucketing batcher for timestamps that support it
    /// (impls for partially ordered timestamps ignore the flag).
    fn arrange_row_row<'scope>(
        stream: Stream<'scope, Self, Column<((Row, Row), Self, Diff)>>,
        name: &str,
        bucketing_enabled: bool,
    ) -> Arranged<'scope, RowRowAgent<Self, Diff>>;
}

impl MaybeTemporalRowRowArrange for mz_repr::Timestamp {
    /// Totally ordered timestamp: honor `bucketing_enabled` by selecting the
    /// temporal-bucketing batcher, otherwise the plain columnar batcher.
    fn arrange_row_row<'scope>(
        stream: Stream<'scope, Self, Column<((Row, Row), Self, Diff)>>,
        name: &str,
        bucketing_enabled: bool,
    ) -> Arranged<'scope, RowRowAgent<Self, Diff>> {
        // Exchange rows by key so each key's updates meet on one worker.
        let pact = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
            columnar_exchange::<Row, Row, Self, Diff>,
        );
        if !bucketing_enabled {
            // Plain path: no temporal bucketing requested.
            return stream.mz_arrange_core::<
                _,
                Col2ValBatcher<Row, Row, Self, Diff>,
                RowRowBuilder<Self, Diff>,
                RowRowSpine<Self, Diff>,
            >(pact, name);
        }
        // Bucketing path: batcher buckets updates by time before arrangement.
        stream.mz_arrange_core::<
            _,
            Col2ValTemporalBatcher<Row, Row, Self, Diff>,
            RowRowBuilder<Self, Diff>,
            RowRowSpine<Self, Diff>,
        >(pact, name)
    }
}

impl MaybeTemporalRowRowArrange for Product<mz_repr::Timestamp, PointStamp<u64>> {
    /// Iterative-scope timestamp: partially ordered, so temporal bucketing
    /// does not apply and the flag is ignored.
    fn arrange_row_row<'scope>(
        stream: Stream<'scope, Self, Column<((Row, Row), Self, Diff)>>,
        name: &str,
        _bucketing_enabled: bool,
    ) -> Arranged<'scope, RowRowAgent<Self, Diff>> {
        // Always the plain batcher; exchange rows by key as usual.
        stream.mz_arrange_core::<
            _,
            Col2ValBatcher<Row, Row, Self, Diff>,
            RowRowBuilder<Self, Diff>,
            RowRowSpine<Self, Diff>,
        >(
            ExchangeCore::<ColumnBuilder<_>, _>::new_core(
                columnar_exchange::<Row, Row, Self, Diff>,
            ),
            name,
        )
    }
}
Loading
Loading