feat(query): local block_id repartition before RowFetch in MERGE INTO #19689
dantengsky wants to merge 5 commits into databendlabs:main from
Conversation
…INTO

During MERGE INTO with lazy columns, the RowFetch stage may repeatedly read the same physical block when rows from that block are scattered across different processors or batches. This adds a local pipeline exchange that partitions data by block_id (extracted from _row_id) before RowFetch, ensuring each processor handles a disjoint set of blocks and eliminating duplicate block reads.

- Add BlockIdPartitionExchange implementing the Exchange trait
- Insert exchange before MutationSplit for MixedMatched strategy
- Insert exchange before RowFetch for MatchedOnly strategy
- Add is_mutation flag to RowFetch to avoid affecting SELECT+LIMIT path
- Add enable_merge_into_block_id_repartition setting (default on)
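As a rough illustration of the block_id extraction this relies on (the exact bit split between block prefix and row offset is an assumption of this sketch, not the fuse engine's actual layout):

```rust
// Hypothetical _row_id layout, for illustration only: the high bits carry
// a block id prefix and the low ROW_OFFSET_BITS bits carry the row offset
// within that block. The real layout lives in Databend's fuse engine.
const ROW_OFFSET_BITS: u32 = 31;

fn block_id_of(row_id: u64) -> u64 {
    row_id >> ROW_OFFSET_BITS
}

fn main() {
    // Two rows of block 5 share a prefix; a row of block 9 differs.
    let a = (5u64 << ROW_OFFSET_BITS) | 3;
    let b = (5u64 << ROW_OFFSET_BITS) | 7;
    let c = (9u64 << ROW_OFFSET_BITS) | 1;
    assert_eq!(block_id_of(a), block_id_of(b)); // same block, same partition key
    assert_ne!(block_id_of(a), block_id_of(c));
    println!("block ids: {} {} {}", block_id_of(a), block_id_of(b), block_id_of(c));
}
```

Rows carrying the same prefix always receive the same partition key, which is the property the exchange needs.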
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: c7deef3342
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
src/query/storages/fuse/src/operations/merge_into/processors/block_id_partition_exchange.rs
…xchange

Avoids truncation when max_threads exceeds 255.
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: e311156f1d
src/query/service/src/physical_plans/physical_mutation_into_split.rs
…partition indices

- Add has_row_fetch flag to MutationSplit so the exchange is only inserted when RowFetch follows (lazy columns exist). Without this, non-lazy workloads pay shuffle cost for no I/O benefit.
- Use u16 instead of u8 for partition indices to avoid truncation when max_threads exceeds 255.
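The u8 → u16 change can be seen directly: casting a partition index above 255 to u8 wraps modulo 256, silently routing rows to the wrong partition. A minimal demonstration:

```rust
fn main() {
    // With more than 255 threads, a partition index no longer fits in u8:
    let partition: usize = 300;
    assert_eq!(partition as u8, 44);   // wraps to 300 % 256 = 44: wrong bucket
    assert_eq!(partition as u16, 300); // u16 holds indices up to 65535
    println!("ok");
}
```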
@codex review
Codex Review: Didn't find any major issues. Keep it up!
eb1ed7b to fddfa19

@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: fddfa19d9c
6b79bc3 to 0057c23
…test

- Rename setting: enable_merge_into_block_id_repartition → enable_mutation_block_id_repartition
- Rename RowFetch flag: is_merge_into → enable_block_id_repartition to accurately reflect scope (MERGE INTO + UPDATE...WHERE subquery)
- Remove unnecessary serde(default) annotations
- Improve comments: explain why SELECT+LIMIT skips repartition; use "reducing" instead of "eliminating" for duplicate reads
- Rewrite test with proper structure, comments, and CREATE OR REPLACE
- Add Test 6: UPDATE...WHERE (subquery) with lazy columns
- Avoid unwrap in lazy_columns handling
0057c23 to d40dcb0
@codex review
Codex Review: Didn't find any major issues. Can't wait for the next one!
I hereby agree to the terms of the CLA available at: https://docs.databend.com/dev/policies/cla/
Summary
Problem
During join-based mutations (MERGE INTO, UPDATE...WHERE with subqueries) with lazy columns, the TransformRowsFetcher reads target table blocks by _row_id. Since Hash Join output order follows the probe side, rows from the same target block scatter across different RowFetch processors or batches. Each batch flushes its block data, causing the same physical block to be read from storage multiple times: a significant I/O overhead when the target table has many columns and the matched set is large.

In distributed mode, a cross-node shuffle by block_id already exists (build_block_id_shuffle_exchange), but within a single node no such grouping is performed.

Solution
Add a local pipeline.exchange() by block_id (extracted from _row_id) before RowFetch, reusing the existing PartitionProcessor / MergePartitionProcessor infrastructure.

After repartition, each RowFetch processor handles a disjoint set of blocks, which eliminates cross-processor duplicate block reads. Within a single processor, same-block rows tend to arrive consecutively, greatly reducing cross-batch duplicates as well, though a residual duplicate read can still occur if a block's rows are split across a BlockThreshold flush boundary.

The existing per-batch deduplication inside ParquetRowsFetcher::fetch() (which groups row_ids by block_id within a single flush) remains effective. This PR complements it by ensuring the rows reaching each processor are block-local in the first place.

Pipeline changes
- MixedMatched (WHEN MATCHED + WHEN NOT MATCHED): the exchange replaces try_resize(N). Same output port count, but data is partitioned by block_id.
- MatchedOnly (MERGE INTO with only WHEN MATCHED, or UPDATE...WHERE with subquery): the exchange is inserted directly before RowFetch.
- NotMatchedOnly: no RowFetch, no change.
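A hedged sketch of the scatter rule behind the exchange (function name and the 31-bit offset split are assumptions of this sketch; the PR's actual implementation reuses PartitionProcessor): each row routes to output port block_id % N, so every block's rows land on exactly one processor.

```rust
/// Illustrative scatter: group row ids into n partitions keyed by the
/// block id prefix of each _row_id. All rows of a given block map to the
/// same partition, so downstream RowFetch processors see disjoint block
/// sets. The 31-bit offset split is assumed for this sketch.
fn scatter_by_block_id(row_ids: &[u64], n: usize) -> Vec<Vec<u64>> {
    const ROW_OFFSET_BITS: u32 = 31;
    let mut parts = vec![Vec::new(); n];
    for &id in row_ids {
        let block_id = id >> ROW_OFFSET_BITS;
        parts[(block_id % n as u64) as usize].push(id);
    }
    parts
}

fn main() {
    let ids = [
        (1u64 << 31),     // block 1
        (2u64 << 31) | 5, // block 2
        (1u64 << 31) | 9, // block 1 again
        (4u64 << 31) | 2, // block 4
    ];
    let parts = scatter_by_block_id(&ids, 2);
    // Blocks 2 and 4 land on partition 0; both rows of block 1 land on partition 1.
    assert_eq!(parts[1], vec![(1u64 << 31), (1u64 << 31) | 9]);
    assert_eq!(parts[0].len(), 2);
    println!("partitions: {:?}", parts);
}
```

Because routing is a pure function of block_id, a block's rows can never straddle two processors, which is exactly the property that removes cross-processor duplicate reads.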
New setting: enable_mutation_block_id_repartition, default 1 (on).

When to disable: if matched rows are heavily concentrated in a few large blocks, the repartition may cause load skew (one processor handles most of the work). In this case, SET enable_mutation_block_id_repartition = 0 falls back to the original try_resize round-robin distribution, which provides better load balancing at the cost of more duplicate block reads.

Affected paths: MERGE INTO and UPDATE...WHERE with subqueries, when lazy columns are present.

Not affected:
- Direct mutations (MutationStrategy::Direct, returns early before RowFetch)
- Workloads without lazy columns (no _row_id is needed)
- SELECT + LIMIT row fetch (the enable_block_id_repartition flag is false)
- Single-threaded execution (max_threads = 1, guarded)

Key changes
- BlockIdPartitionExchange: a new Exchange implementation that partitions rows by the block_id prefix of _row_id. For nullable row_ids (unmatched rows in MixedMatched), an incrementing counter spreads them evenly across partitions.
- MutationSplit::build_pipeline2: the exchange is inserted before the matched/unmatched split. Guarded by the has_row_fetch flag so non-lazy workloads are not affected.
- RowFetch::build_pipeline2: the exchange is inserted before RowFetch. An enable_block_id_repartition flag on RowFetch controls this.

Performance impact
Reduces storage I/O in the RowFetch stage by avoiding redundant block reads across processors. The trade-off is that load balancing now depends on how matched rows are distributed across blocks (previously try_resize distributed rows evenly via round-robin). The overhead is one local scatter + merge per pipeline, operating on thin data (only join output columns, before lazy columns are materialized). Guarded by max_threads > 1 so single-threaded execution has zero overhead.

Tests
New sqllogictest 09_0051_merge_into_block_id_repartition.test covers the scenarios above, including UPDATE...WHERE with a subquery and lazy columns. All existing merge_into tests pass without regression.
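The nullable-row_id handling described under Key changes can be pictured as follows (a sketch; the struct and field names are hypothetical, not the PR's code): matched rows hash by block id, while NULL row_ids round-robin via a counter so no single partition absorbs all unmatched rows.

```rust
/// Illustrative routing decision for one row. Some(row_id) rows group by
/// block id; None rows (unmatched side in MixedMatched, nothing to fetch)
/// are spread evenly by an incrementing counter. The 31-bit offset split
/// is assumed for this sketch.
struct BlockIdRouter {
    n: usize,            // number of output partitions
    null_counter: usize, // round-robin state for NULL row_ids
}

impl BlockIdRouter {
    fn route(&mut self, row_id: Option<u64>) -> usize {
        const ROW_OFFSET_BITS: u32 = 31;
        match row_id {
            Some(id) => ((id >> ROW_OFFSET_BITS) % self.n as u64) as usize,
            None => {
                let p = self.null_counter % self.n;
                self.null_counter += 1;
                p
            }
        }
    }
}

fn main() {
    let mut r = BlockIdRouter { n: 4, null_counter: 0 };
    assert_eq!(r.route(Some(10u64 << 31)), 2); // block 10 % 4 == 2
    assert_eq!(r.route(None), 0);              // counter spreads NULLs
    assert_eq!(r.route(None), 1);
    println!("ok");
}
```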