compute: temporal-bucketing batcher #36427

Draft

antiguru wants to merge 7 commits into MaterializeInc:main from antiguru:temporal-bucketing-batcher

Conversation

antiguru commented May 6, 2026

Motivation

Temporal bucketing currently runs as a separate Timely operator (extensions::temporal_bucket) sitting upstream of every mz_arrange call.
The operator buffers future-stamped updates in a BucketChain<MergeBatcherWrapper> and re-emits them once the input frontier crosses bucket boundaries, which means data passes through two separate batcher hierarchies before reaching the arrangement spine.
Pushing the bucketing into the arrangement's own Batcher removes the operator hop, lets the spine see fewer bucket-aligned batches, and avoids the timestamp rewrites that the operator performed to align releases.

Description

mz_timely_util::merge_batcher vendors MergeBatcher, Merger, InternalMerge, InternalMerger, and VecMerger from differential-dataflow.
Vendoring is required because the bucket-aware logic operates on a merger's chunks, frontier, and stash --- machinery that is not exposed through the public Batcher trait alone.
The vendored bodies are byte-equivalent to upstream; the only added surface is a new temporal module containing MergeBucket and TemporalBucketingMergeBatcher.

TemporalBucketingMergeBatcher<I, C, M> is a hybrid: it has both a flat chains: Vec<Vec<M::Chunk>> field and a bucket_chain: BucketChain<MergeBucket<M>>.
Every input chunk lands in the flat chains via insert_chain with the same geometric merging that plain MergeBatcher performs, so the input path does not touch the bucket chain at all.
At seal time the flat chains are merged into a single sorted chain, Merger::extract is run against the seal upper, and the result is split into flat_ready (t < upper) and kept (t >= upper).
The kept chunks --- and only those chunks --- are routed into the bucket chain via per-boundary Merger::extract calls keyed off BucketChain::starts().
The bucket chain is then peeled at the same upper; mature buckets release their data, the resulting chunks are merged into a single sorted peeled_chain, and peeled_chain is merged with flat_ready to produce the final readied chunks the builder consumes.
Peel's contract guarantees the peeled buckets are entirely below upper, so no second extract is needed against upper after merging.
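
The seal flow can be illustrated with a much-simplified sketch. The types below are stand-ins (chunks modeled as a single sorted Vec of (data, time, diff) tuples, the bucket chain as a Vec of (start, contents) pairs); the real code works through Merger::extract, BucketChain::starts, and BucketChain::peel, whose signatures differ from anything shown here.

```rust
// Illustrative only: simplified stand-ins for the real Merger/BucketChain machinery.
fn seal_sketch<D: Ord, T: Ord + Clone, R>(
    flat_chains: &mut Vec<Vec<(D, T, R)>>,
    bucket_chain: &mut Vec<(T, Vec<(D, T, R)>)>,
    upper: &T,
) -> Vec<(D, T, R)> {
    // 1. Merge the flat chains into one sorted chain (stand-in for the
    //    geometric merge the batcher maintains incrementally).
    let mut merged: Vec<(D, T, R)> = flat_chains.drain(..).flatten().collect();
    merged.sort_by(|a, b| (&a.0, &a.1).cmp(&(&b.0, &b.1)));

    // 2. Extract against the seal upper: flat_ready holds t < upper, kept holds t >= upper.
    let (flat_ready, kept): (Vec<_>, Vec<_>) =
        merged.into_iter().partition(|(_, t, _)| t < upper);

    // 3. Only the kept (future-stamped) updates touch the bucket chain,
    //    routed to the bucket whose start covers their timestamp.
    for update in kept {
        match bucket_chain.iter_mut().rev().find(|(start, _)| *start <= update.1) {
            Some((_, contents)) => contents.push(update),
            // Sketch simplification: seed a single bucket if the chain is empty.
            None => bucket_chain.push((update.1.clone(), vec![update])),
        }
    }

    // 4. Peel buckets whose contents lie entirely below upper and merge them
    //    with flat_ready; no second extract against upper is needed.
    let mut readied = flat_ready;
    bucket_chain.retain_mut(|(_, contents)| {
        if contents.iter().all(|(_, t, _)| t < upper) {
            readied.append(contents);
            false
        } else {
            true
        }
    });
    readied.sort_by(|a, b| (&a.0, &a.1).cmp(&(&b.0, &b.1)));
    readied
}
```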

This layout keeps the bucket chain empty for workloads whose data is at or below the current frontier (the common case): kept is empty at every seal, the bucket chain is never seeded, and per-push and per-seal cost reduces to that of plain MergeBatcher, with no added overhead.
The bucket chain only pays its splitting and merging overhead for the genuine future tail of the input.
Bucket-chain shape is restored with a bounded fuel budget at the end of seal; push_container does no bucket-chain maintenance.
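
In that common case the push path is intentionally trivial; roughly (names are placeholders, not the real methods on TemporalBucketingMergeBatcher):

```rust
// Illustrative only: the input path never inspects timestamps and never touches
// the bucket chain; all bucket maintenance, including the fuel-bounded restore,
// happens in seal.
fn push_container_sketch<C>(flat_chains: &mut Vec<Vec<C>>, chunk: C) {
    // The real batcher also merges equal-length chains geometrically here.
    flat_chains.push(vec![chunk]);
}
```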

MergeBucket<M> holds Vec<Vec<M::Chunk>> (a geometric chain of sorted chunks, mirroring MergeBatcher's flat chain invariant) and implements temporal::Bucket so the chain can split it on demand.
MergeBucket::split(timestamp, fuel) merges its chains into a single sorted chain and runs Merger::extract(merged, [timestamp], ...) to partition the data into a lower bucket (t < timestamp) and an upper bucket (t >= timestamp).
Fuel is decremented by the record count touched so the chain's amortised splitting accounting still works.
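
A minimal sketch of that split, again with simplified types (a bucket's chains as Vec<Vec<(data, time, diff)>> rather than real Merger chunks):

```rust
// Illustrative only: partition a bucket's contents at `timestamp` and charge fuel.
fn split_sketch<D: Ord, T: Ord, R>(
    chains: Vec<Vec<(D, T, R)>>,
    timestamp: &T,
    fuel: &mut isize,
) -> (Vec<(D, T, R)>, Vec<(D, T, R)>) {
    // Merge the bucket's chains into one sorted chain.
    let mut merged: Vec<(D, T, R)> = chains.into_iter().flatten().collect();
    merged.sort_by(|a, b| (&a.0, &a.1).cmp(&(&b.0, &b.1)));

    // Decrement fuel by the records touched so amortised splitting accounting holds.
    *fuel -= merged.len() as isize;

    // Lower bucket gets t < timestamp, upper bucket gets t >= timestamp.
    merged.into_iter().partition(|(_, t, _)| t < timestamp)
}
```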

Batcher::frontier() reports the actual minimum held timestamp rather than a bucket boundary.
A bucket's start is only a range-level lower bound, so reporting it would hold back downstream capabilities more conservatively than necessary.
After seal, all held data lives in bucket_chain (the flat chains have been emptied by the seal extract), so frontier recomputation walks only that side.
The lowest non-empty bucket's chains are merged into one sorted chain and run through Merger::extract against the bucket's own start; every entry is classified as keep (since all bucket times are >= start), which is the side that updates the frontier antichain.
For the totally ordered timestamps that impl BucketTimestamp the antichain collapses to the single minimum.
The merged chain is reinserted as the bucket's only chain so subsequent seals pay only the linear extract cost.
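
Under the same simplified modelling (buckets as (start, chains) pairs, the per-start extract reduced to a minimum scan because every held time is >= start), the recomputation looks roughly like:

```rust
// Illustrative only: find the lowest non-empty bucket, merge its chains into one,
// reinstall the merged chain, and report the minimum held timestamp.
fn recompute_frontier_sketch<D: Ord, T: Ord + Clone, R>(
    bucket_chain: &mut Vec<(T, Vec<Vec<(D, T, R)>>)>,
) -> Option<T> {
    let (_, chains) = bucket_chain
        .iter_mut()
        .find(|(_, chains)| chains.iter().any(|c| !c.is_empty()))?;

    let mut merged: Vec<(D, T, R)> = chains.drain(..).flatten().collect();
    merged.sort_by(|a, b| (&a.0, &a.1).cmp(&(&b.0, &b.1)));
    // For totally ordered timestamps the frontier collapses to this single minimum.
    let frontier = merged.iter().map(|(_, t, _)| t.clone()).min();
    // Reinstall as the bucket's only chain so later seals pay only a linear pass.
    chains.push(merged);
    frontier
}
```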

mz_timely_util::columnar exposes Col2ValTemporalBatcher<K, V, T, R> --- the temporal counterpart of Col2ValBatcher --- by wrapping TemporalBucketingMergeBatcher around the same Chunker and ColInternalMerger used elsewhere.
CollectionBundle::ensure_collections reads ENABLE_TEMPORAL_BUCKETING from the dataflow's ConfigSet and threads the flag down to the row-row arrangement build via the new MaybeTemporalRowRowArrange trait in src/compute/src/extensions/arrange.rs.
The trait has one impl per concrete RenderTimestamp: mz_repr::Timestamp picks Col2ValTemporalBatcher when the dyncfg is on and Col2ValBatcher otherwise; Product<mz_repr::Timestamp, PointStamp<u64>> (iterative scopes) always picks the plain batcher because the timestamp is partially ordered and not bucketable.
The trait is intentionally a sibling of RenderTimestamp rather than a supertrait, so the bucketing precondition (T: BucketTimestamp) does not leak into the generic timestamp interface; the bound is added explicitly on the impl block that hosts arrange_collection.
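
The rough shape of that dispatch, with stand-in types and a string return in place of the arrangement the real method builds (the actual trait and signature live in src/compute/src/extensions/arrange.rs and differ from this):

```rust
// Illustrative only: one impl per concrete render timestamp, choosing a batcher.
trait MaybeTemporalRowRowArrange {
    fn chosen_batcher(use_temporal_bucketing: bool) -> &'static str;
}

struct MzTimestampStandIn; // stand-in for mz_repr::Timestamp
struct IterativeStandIn;   // stand-in for Product<mz_repr::Timestamp, PointStamp<u64>>

impl MaybeTemporalRowRowArrange for MzTimestampStandIn {
    fn chosen_batcher(use_temporal_bucketing: bool) -> &'static str {
        // Totally ordered timestamp: honor the ENABLE_TEMPORAL_BUCKETING dyncfg.
        if use_temporal_bucketing { "Col2ValTemporalBatcher" } else { "Col2ValBatcher" }
    }
}

impl MaybeTemporalRowRowArrange for IterativeStandIn {
    fn chosen_batcher(_use_temporal_bucketing: bool) -> &'static str {
        // Partially ordered timestamp: never bucketable, always the plain batcher.
        "Col2ValBatcher"
    }
}
```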

top_k.rs, reduce.rs, threshold.rs, typedefs.rs, columnation.rs, and upsert_continual_feedback_v2.rs flip their imports from differential_dataflow::trace::implementations::merge_batcher to the vendored mz_timely_util::merge_batcher so that the codebase has a single source for these types.

The source-side extensions::temporal_bucket operator and TEMPORAL_BUCKETING_SUMMARY are retained behind the existing dyncfgs as a fallback while the batcher path is benchmarked; removal is tracked as a follow-up.

antiguru and others added 7 commits May 6, 2026 12:29
Replaces the source-side temporal-bucketing operator on the arrangement
path with a bucket-aware merge batcher that integrates directly into
mz_arrange call sites. Vendors `MergeBatcher`, `Merger`, `InternalMerge`,
`InternalMerger`, and `VecMerger` from differential-dataflow into
`mz_timely_util::merge_batcher` so the batcher's flat
`chains: Vec<Vec<M::Chunk>>` can be replaced by a `BucketChain` of
`MergeBucket`s that hold updates by timestamp range.

`Batcher::seal(upper)` continues to emit only updates strictly less than
`upper`; bucketing is layered below the seal contract. `Batcher::frontier`
reports the actual minimum held timestamp by merging the lowest non-empty
bucket once per seal and consulting `Merger::extract`. Subsequent seals
see a single chain in that bucket, so the work is amortised.

`CollectionBundle::ensure_collections` reads `ENABLE_TEMPORAL_BUCKETING`
and dispatches the row-row arrangement build through a new per-
`RenderTimestamp` trait `MaybeTemporalRowRowArrange`. The trait is a
sibling to `RenderTimestamp`, not a supertrait, so the bucketing
precondition does not leak into the generic timestamp interface.
Iterative `Product<...>` always uses the non-temporal batcher.

The source-side `extensions::temporal_bucket` operator and
`TEMPORAL_BUCKETING_SUMMARY` are intentionally retained for now; whether
to keep them is a benchmark decision tracked as a follow-up.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Restructure `TemporalBucketingMergeBatcher` so the bucket chain holds only
future-stamped data. Incoming chunks now flow into a flat
`chains: Vec<Vec<M::Chunk>>` identical to plain `MergeBatcher`. The
`BucketChain` is touched only at seal time, and only for the `kept`
(future-stamped) side of the seal extract.

Avoids the SkewedJoin / FinishOrderByLimit regression observed with the
prior bucket-only layout: for those workloads the input has no future
component, so `kept` is empty at every seal and the bucket chain stays
empty. Per-push and per-seal cost reduces to the plain merge-batcher
path. The bucket chain only pays its splitting/merging overhead for the
future tail of the input.

The seal contract is preserved: only updates with time strictly less
than `upper` are emitted, and `frontier()` reports the actual lower
bound of held data (computed from the lowest non-empty bucket of
`bucket_chain`).

Cherry-picked from MaterializeInc#35935. Two scenarios exercise the bucketing path:

* TemporalFilterIndexed: index hydration over a temporal-filtered view of
  a 10M-row materialized view.
* TemporalFilterSustainedInsert: per-insert latency through a
  pre-warmed indexed temporal view.

The arrange-side `TemporalBucketingMergeBatcher` (per-`ArrangeBy`)
fully covers the source-side delay this operator used to provide.
Keeping both pays a double bucketing cost on every dataflow that
touches the source. Remove the operator and its dyncfg
`TEMPORAL_BUCKETING_SUMMARY`; the bucket chain advances naturally
with the seal upper.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

The per-seal cost of `TemporalBucketingMergeBatcher` matched plain
`MergeBatcher` only when the bucket chain was active. Workloads with
no future-stamped data still paid for a full peel/merge/restore/
recompute_frontier cycle on every seal, regressing benchmarks like
`Insert` (~28%).

Track held records on the batcher and short-circuit `seal` when the
bucket chain is empty and the current extract produced no `kept`
side. Make `BucketChain::restore` a no-op when content is empty or
already well-formed, and rewrite `recompute_frontier` to walk the
bucket chain in place via a new `BucketChain::iter_mut`, dropping
the per-call `Vec` allocation and per-bucket `find_mut` cost.

The held-records counter overcounts after consolidation in
`Merger::merge`, so the fast path is conservative: it never
incorrectly skips real held data, but may stay on the slow path
once consolidation has removed all records. That is no worse than
the previous behavior.
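
A hedged sketch of the short-circuit this describes (names are illustrative, not the batcher's actual fields):

```rust
// Illustrative only: bucket maintenance is skipped when nothing is held in the
// bucket chain and the current extract produced no future-stamped side. The
// held-records counter may overcount after consolidation, so the check is
// conservative: it can keep the slow path, but never skips real held data.
fn bucket_maintenance_needed(held_records: usize, kept_records: usize) -> bool {
    held_records > 0 || kept_records > 0
}
```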

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
antiguru force-pushed the temporal-bucketing-batcher branch from fedf72d to fe142dc on May 7, 2026 at 06:48