compute: temporal-bucketing batcher#36427
Draft
antiguru wants to merge 7 commits into MaterializeInc:main from
Conversation
Replaces the source-side temporal-bucketing operator on the arrangement path with a bucket-aware merge batcher that integrates directly into `mz_arrange` call sites.

Vendors `MergeBatcher`, `Merger`, `InternalMerge`, `InternalMerger`, and `VecMerger` from differential-dataflow into `mz_timely_util::merge_batcher` so the batcher's flat `chains: Vec<Vec<M::Chunk>>` can be replaced by a `BucketChain` of `MergeBucket`s that hold updates by timestamp range. `Batcher::seal(upper)` continues to emit only updates strictly less than `upper`; bucketing is layered below the seal contract. `Batcher::frontier` reports the actual minimum held timestamp by merging the lowest non-empty bucket once per seal and consulting `Merger::extract`. Subsequent seals see a single chain in that bucket, so the work is amortised.

`CollectionBundle::ensure_collections` reads `ENABLE_TEMPORAL_BUCKETING` and dispatches the row-row arrangement build through a new per-`RenderTimestamp` trait `MaybeTemporalRowRowArrange`. The trait is a sibling to `RenderTimestamp`, not a supertrait, so the bucketing precondition does not leak into the generic timestamp interface. Iterative `Product<...>` always uses the non-temporal batcher.

The source-side `extensions::temporal_bucket` operator and `TEMPORAL_BUCKETING_SUMMARY` are intentionally retained for now; whether to keep them is a benchmark decision tracked as a follow-up.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Restructure `TemporalBucketingMergeBatcher` so the bucket chain holds only future-stamped data. Incoming chunks now flow into a flat `chains: Vec<Vec<M::Chunk>>` identical to plain `MergeBatcher`. The `BucketChain` is touched only at seal time, and only for the `kept` (future-stamped) side of the seal extract.

Avoids the SkewedJoin / FinishOrderByLimit regression observed with the prior bucket-only layout: for those workloads the input has no future component, so `kept` is empty at every seal and the bucket chain stays empty. Per-push and per-seal cost reduces to the plain merge-batcher path. The bucket chain only pays its splitting/merging overhead for the future tail of the input.

The seal contract is preserved: only updates with time strictly less than `upper` are emitted, and `frontier()` reports the actual lower bound of held data (computed from the lowest non-empty bucket of `bucket_chain`).
Cherry-picked from MaterializeInc#35935. Two scenarios exercise the bucketing path:

* TemporalFilterIndexed: index hydration over a temporal-filtered view of a 10M-row materialized view.
* TemporalFilterSustainedInsert: per-insert latency through a pre-warmed indexed temporal view.
The arrange-side `TemporalBucketingMergeBatcher` (per-`ArrangeBy`) fully covers the source-side delay this operator used to provide. Keeping both pays a double bucketing cost on every dataflow that touches the source. Remove the operator and its dyncfg `TEMPORAL_BUCKETING_SUMMARY`; the bucket chain advances naturally with the seal upper. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The per-seal cost of `TemporalBucketingMergeBatcher` matched plain `MergeBatcher` only when the bucket chain was active. Workloads with no future-stamped data still paid for a full peel/merge/restore/recompute_frontier cycle on every seal, regressing benchmarks like `Insert` (~28%).

Track held records on the batcher and short-circuit `seal` when the bucket chain is empty and the current extract produced no `kept` side. Make `BucketChain::restore` a no-op when content is empty or already well-formed, and rewrite `recompute_frontier` to walk the bucket chain in place via a new `BucketChain::iter_mut`, dropping the per-call `Vec` allocation and per-bucket `find_mut` cost.

The held-records counter overcounts after consolidation in `Merger::merge`, so the fast path is conservative: it never incorrectly skips real held data, but may stay on the slow path once consolidation has removed all records. That is no worse than the previous behavior.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
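The fast-path condition can be sketched as follows. This is a simplified model, not the actual Materialize code: timestamps are plain `u64`, a "chunk" is a `Vec` of `(data, time)` pairs, the bucket chain is a `Vec` of `(start, updates)` pairs, and `TemporalBatcher` and its `seal` signature are invented stand-ins for `TemporalBucketingMergeBatcher`.

```rust
type Update = (String, u64);

struct TemporalBatcher {
    /// Flat chains receiving every pushed chunk (plain merge-batcher path).
    chains: Vec<Vec<Update>>,
    /// Future-stamped data, keyed by a bucket start time.
    bucket_chain: Vec<(u64, Vec<Update>)>,
    /// Conservative count of records held in `bucket_chain`; may overcount,
    /// so the fast path never skips real held data.
    held_records: usize,
}

impl TemporalBatcher {
    /// Emit all updates with time strictly less than `upper`.
    fn seal(&mut self, upper: u64) -> Vec<Update> {
        // Merge the flat chains and split against `upper`.
        let mut merged: Vec<Update> = self.chains.drain(..).flatten().collect();
        merged.sort_by_key(|u| u.1);
        let (ready, kept): (Vec<Update>, Vec<Update>) =
            merged.into_iter().partition(|u| u.1 < upper);

        // Fast path: nothing future-stamped now and nothing held from
        // before, so skip all bucket-chain maintenance.
        if kept.is_empty() && self.held_records == 0 {
            return ready;
        }

        // Slow path: route `kept` into the bucket chain, then peel
        // everything below `upper` back out and merge it with `ready`.
        self.held_records += kept.len();
        self.bucket_chain.push((upper, kept));
        let mut peeled: Vec<Update> = Vec::new();
        self.bucket_chain.retain_mut(|(_, updates)| {
            let (below, above): (Vec<Update>, Vec<Update>) =
                updates.drain(..).partition(|u| u.1 < upper);
            peeled.extend(below);
            *updates = above;
            !updates.is_empty()
        });
        self.held_records = self.held_records.saturating_sub(peeled.len());
        let mut out = ready;
        out.extend(peeled);
        out.sort_by_key(|u| u.1);
        out
    }
}
```

The point of the sketch is the guard: a workload whose input never carries future timestamps leaves `kept` empty and `held_records` at zero, so every seal returns through the fast path.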
Motivation
Temporal bucketing currently runs as a separate Timely operator (`extensions::temporal_bucket`) sitting upstream of every `mz_arrange` call. The operator buffers future-stamped updates in a `BucketChain<MergeBatcherWrapper>` and re-emits them once the input frontier crosses bucket boundaries, which means data passes through two separate batcher hierarchies before reaching the arrangement spine. Pushing the bucketing into the arrangement's own `Batcher` removes the operator hop, lets the spine see fewer bucket-aligned batches, and avoids the timestamp rewrites that the operator performed to align releases.

Description
`mz_timely_util::merge_batcher` vendors `MergeBatcher`, `Merger`, `InternalMerge`, `InternalMerger`, and `VecMerger` from differential-dataflow. Vendoring is required because the bucket-aware logic operates on a merger's chunks, frontier, and stash --- machinery that is not exposed through the public `Batcher` trait alone. The vendored bodies are byte-equivalent to upstream; the only added surface is a new `temporal` module containing `MergeBucket` and `TemporalBucketingMergeBatcher`.

`TemporalBucketingMergeBatcher<I, C, M>` is a hybrid: it has both a flat `chains: Vec<Vec<M::Chunk>>` field and a `bucket_chain: BucketChain<MergeBucket<M>>`. Every input chunk lands in the flat chains via `insert_chain` with the same geometric merging that plain `MergeBatcher` performs, so the input path does not touch the bucket chain at all. At seal time the flat chains are merged into a single sorted chain, `Merger::extract` is run against the seal `upper`, and the result is split into `flat_ready` (t < upper) and `kept` (t >= upper). The `kept` chunks --- and only those chunks --- are routed into the bucket chain via per-boundary `Merger::extract` calls keyed off `BucketChain::starts()`. The bucket chain is then peeled at the same `upper`; mature buckets release their data, the resulting chunks are merged into a single sorted `peeled_chain`, and `peeled_chain` is merged with `flat_ready` to produce the final readied chunks the builder consumes. Peel's contract guarantees the peeled buckets are entirely below `upper`, so no second extract against `upper` is needed after merging.

This layout keeps the bucket chain empty for workloads whose data is at or below the current frontier (the common case): `kept` is empty at every seal, the bucket chain is never seeded, and per-push and per-seal cost reduces to plain `MergeBatcher` with no overhead. The bucket chain only pays its splitting and merging overhead for the genuine future tail of the input.
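The routing of the `kept` side across bucket boundaries can be illustrated with a toy partitioning function. This is a hedged sketch, not the real code: the actual implementation runs one `Merger::extract` per boundary on columnar chunks, while here `route_by_starts` is an invented name operating on `Vec`s of `(data, time)` pairs with `u64` times.

```rust
/// Split a chain of updates across bucket boundaries: bucket i holds
/// times in [starts[i], starts[i+1]). `starts` must be sorted ascending,
/// and every update time is assumed to be >= starts[0] (earlier times
/// would already have been released by the seal extract).
fn route_by_starts(
    kept: Vec<(&'static str, u64)>,
    starts: &[u64],
) -> Vec<Vec<(&'static str, u64)>> {
    let mut buckets: Vec<Vec<(&'static str, u64)>> =
        starts.iter().map(|_| Vec::new()).collect();
    for (data, time) in kept {
        // The last start <= time identifies the owning bucket;
        // `partition_point` plays the role of the per-boundary extract.
        let idx = starts.partition_point(|&s| s <= time).saturating_sub(1);
        buckets[idx].push((data, time));
    }
    buckets
}
```

Because each update lands in exactly one half-open range, no update can be released twice, matching the peel contract described above.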
Bucket-chain shape is restored with a bounded fuel budget at the end of seal; `push_container` does no bucket-chain maintenance.

`MergeBucket<M>` holds `Vec<Vec<M::Chunk>>` (a geometric chain of sorted chunks, mirroring `MergeBatcher`'s flat chain invariant) and implements `temporal::Bucket` so the chain can split it on demand. `MergeBucket::split(timestamp, fuel)` merges its chains into a single sorted chain and runs `Merger::extract(merged, [timestamp], ...)` to partition the data into a lower bucket (t < timestamp) and an upper bucket (t >= timestamp). Fuel is decremented by the record count touched so the chain's amortised splitting accounting still works.
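A minimal sketch of the split-with-fuel shape, assuming `u64` timestamps and `Vec<u64>` chains in place of the real `M::Chunk` machinery (the merge step stands in for `Merger::merge` plus `Merger::extract`):

```rust
struct MergeBucket {
    /// Geometric chain of sorted runs, mirroring the flat chain invariant.
    chains: Vec<Vec<u64>>,
}

impl MergeBucket {
    /// Split this bucket at `timestamp` into a lower (t < timestamp) and
    /// an upper (t >= timestamp) bucket, charging `fuel` per record touched.
    fn split(self, timestamp: u64, fuel: &mut isize) -> (MergeBucket, MergeBucket) {
        // Merge all chains into one sorted run.
        let mut merged: Vec<u64> = self.chains.into_iter().flatten().collect();
        merged.sort_unstable();
        // Amortised accounting: every touched record consumes fuel.
        *fuel -= merged.len() as isize;
        // Partition at the boundary; stands in for Merger::extract.
        let cut = merged.partition_point(|&t| t < timestamp);
        let upper = merged.split_off(cut);
        (
            MergeBucket { chains: vec![merged] },
            MergeBucket { chains: vec![upper] },
        )
    }
}
```

Consuming fuel by records touched (rather than per call) is what lets the chain's amortised analysis bound total splitting work against total data volume.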
`Batcher::frontier()` reports the actual minimum held timestamp rather than a bucket boundary. A bucket's start is only a range-level lower bound, so reporting it would over-conservatively hold downstream capabilities. After seal, all held data lives in `bucket_chain` (the flat chains have been emptied by the seal extract), so frontier recomputation walks only that side. The lowest non-empty bucket's chains are merged into one sorted chain and run through `Merger::extract` against the bucket's own start; every entry is classified as `keep` (since all bucket times are >= start), which is the side that updates the frontier antichain. For the totally ordered timestamps that impl `BucketTimestamp` the antichain collapses to the single minimum. The merged chain is reinserted as the bucket's only chain so subsequent seals pay only the linear extract cost.
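The amortisation argument above can be sketched as follows. This is a simplified model under assumed types: `Bucket` and `recompute_frontier` are invented stand-ins, times are `u64` (totally ordered, so the antichain is a single minimum), and an in-place slice walk plays the role of the new `BucketChain::iter_mut`.

```rust
struct Bucket {
    chains: Vec<Vec<u64>>,
}

/// Returns the minimum held timestamp, or None if every bucket is empty.
fn recompute_frontier(bucket_chain: &mut [Bucket]) -> Option<u64> {
    // In-place walk: no per-call Vec allocation, no per-bucket find_mut.
    for bucket in bucket_chain.iter_mut() {
        if bucket.chains.iter().all(|c| c.is_empty()) {
            continue;
        }
        // Merge the lowest non-empty bucket down to a single sorted chain...
        let mut merged: Vec<u64> = bucket.chains.drain(..).flatten().collect();
        merged.sort_unstable();
        let min = merged[0];
        // ...and reinsert it, so subsequent seals see one chain and only
        // pay a linear scan instead of a merge.
        bucket.chains.push(merged);
        return Some(min);
    }
    None
}
```

The first call after a seal pays the merge; every later call on the same bucket finds a single sorted chain and reads the minimum linearly.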
`mz_timely_util::columnar` exposes `Col2ValTemporalBatcher<K, V, T, R>` --- the temporal counterpart of `Col2ValBatcher` --- by wrapping `TemporalBucketingMergeBatcher` around the same `Chunker` and `ColInternalMerger` used elsewhere.

`CollectionBundle::ensure_collections` reads `ENABLE_TEMPORAL_BUCKETING` from the dataflow's `ConfigSet` and threads the flag down to the row-row arrangement build via the new `MaybeTemporalRowRowArrange` trait in `src/compute/src/extensions/arrange.rs`. The trait has one impl per concrete `RenderTimestamp`: `mz_repr::Timestamp` picks `Col2ValTemporalBatcher` when the dyncfg is on and `Col2ValBatcher` otherwise; `Product<mz_repr::Timestamp, PointStamp<u64>>` (iterative scopes) always picks the plain batcher because the timestamp is partially ordered and not bucketable. The trait is intentionally a sibling of `RenderTimestamp` rather than a supertrait, so the bucketing precondition (`T: BucketTimestamp`) does not leak into the generic timestamp interface; the bound is added explicitly on the impl block that hosts `arrange_collection`.

`top_k.rs`, `reduce.rs`, `threshold.rs`, `typedefs.rs`, `columnation.rs`, and `upsert_continual_feedback_v2.rs` flip their imports from `differential_dataflow::trace::implementations::merge_batcher` to the vendored `mz_timely_util::merge_batcher` so that the codebase has a single source for these types.

The source-side `extensions::temporal_bucket` operator and `TEMPORAL_BUCKETING_SUMMARY` are retained behind the existing dyncfgs as a fallback while the batcher path is benchmarked; removal is tracked as a follow-up.
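The per-timestamp dispatch shape can be sketched in miniature. All names here are stand-ins for the real Materialize types: `ConfigSet` is reduced to one boolean, `OuterTimestamp` and `IterativeTimestamp` model `mz_repr::Timestamp` and the iterative `Product<_, PointStamp<u64>>`, and `batcher_choice` merely names the batcher the real `arrange_collection` path would instantiate.

```rust
/// Stand-in for the dyncfg read from the dataflow's ConfigSet.
struct ConfigSet {
    enable_temporal_bucketing: bool,
}

/// Sibling trait: one impl per concrete timestamp type, so the
/// bucketing precondition never appears on the timestamp trait itself.
trait MaybeTemporalRowRowArrange {
    fn batcher_choice(config: &ConfigSet) -> &'static str;
}

/// Totally ordered outer timestamp: bucketable, so the flag decides.
struct OuterTimestamp;
impl MaybeTemporalRowRowArrange for OuterTimestamp {
    fn batcher_choice(config: &ConfigSet) -> &'static str {
        if config.enable_temporal_bucketing {
            "Col2ValTemporalBatcher"
        } else {
            "Col2ValBatcher"
        }
    }
}

/// Partially ordered iterative timestamp: never bucketable,
/// regardless of the flag.
struct IterativeTimestamp;
impl MaybeTemporalRowRowArrange for IterativeTimestamp {
    fn batcher_choice(_config: &ConfigSet) -> &'static str {
        "Col2ValBatcher"
    }
}
```

Keeping the trait a sibling means only the impl that actually chooses the temporal batcher needs the `BucketTimestamp`-style bound; generic code written against the timestamp trait is untouched.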