
feat(query): local block_id repartition before RowFetch in MERGE INTO #19689

Open
dantengsky wants to merge 5 commits into databendlabs:main from dantengsky:feat/merge-into-block-id-repartition

Conversation

@dantengsky
Member

@dantengsky dantengsky commented Apr 9, 2026

I hereby agree to the terms of the CLA available at: https://docs.databend.com/dev/policies/cla/

Summary

Problem

During join-based mutations (MERGE INTO, UPDATE...WHERE with subqueries) with lazy columns, the TransformRowsFetcher reads target table blocks by _row_id. Since Hash Join output order follows the probe side, rows from the same target block scatter across different RowFetch processors and batches. Each batch fetches its blocks independently, so the same physical block is read from storage multiple times — a significant I/O overhead when the target table has many columns and the matched set is large.

In distributed mode, a cross-node shuffle by block_id already exists (build_block_id_shuffle_exchange), but within a single node no such grouping is performed.

Solution

Add a local pipeline.exchange() by block_id (extracted from _row_id) before RowFetch, reusing the existing PartitionProcessor / MergePartitionProcessor infrastructure.

After repartition, each RowFetch processor handles a disjoint set of blocks, which eliminates cross-processor duplicate block reads. Within a single processor, same-block rows tend to arrive consecutively, greatly reducing cross-batch duplicates as well — though a residual duplicate read can still occur if a block's rows are split across a BlockThreshold flush boundary.

The existing per-batch deduplication inside ParquetRowsFetcher::fetch() (which groups row_ids by block_id within a single flush) remains effective. This PR complements it by ensuring the rows reaching each processor are block-local in the first place.
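To make the duplicate-read problem concrete, here is a minimal sketch of grouping row ids by block before fetching, the same idea as the existing per-batch deduplication. It assumes a _row_id layout where the high 32 bits carry the block id and the low 32 bits the row offset; Databend's actual bit split and helper names may differ.

```rust
use std::collections::BTreeMap;

// Assumed layout: high 32 bits = block id, low 32 bits = row offset.
// Databend's real _row_id encoding may use a different bit split.
fn block_id(row_id: u64) -> u64 {
    row_id >> 32
}

/// Group row ids by block so each physical block is fetched once per
/// batch (the idea behind the dedup in ParquetRowsFetcher::fetch()).
fn group_by_block(row_ids: &[u64]) -> BTreeMap<u64, Vec<u64>> {
    let mut groups: BTreeMap<u64, Vec<u64>> = BTreeMap::new();
    for &rid in row_ids {
        groups.entry(block_id(rid)).or_default().push(rid & 0xFFFF_FFFF);
    }
    groups
}

fn main() {
    // Probe-side order interleaves rows from blocks 1 and 7; grouping
    // shows only two distinct blocks actually need to be read.
    let row_ids = [(1u64 << 32) | 0, (7u64 << 32) | 5, (1u64 << 32) | 9];
    let groups = group_by_block(&row_ids);
    assert_eq!(groups.len(), 2);
    assert_eq!(groups[&1], vec![0, 9]);
}
```

Note that this grouping only helps within one batch on one processor; if two processors each receive one row of block 1, each still reads block 1 from storage, which is exactly why this PR partitions by block_id first.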

Pipeline changes

MixedMatched (WHEN MATCHED + WHEN NOT MATCHED):

                    Before                                      After
                    ──────                                      ─────
              HashJoin output                             HashJoin output
                     │                                           │
             try_resize(N)                          Exchange(block_id, N)  ← NEW
                     │                                           │
               MutationSplit                               MutationSplit
              ╱             ╲                              ╱             ╲
     [even ports]     [odd ports]                [even ports]     [odd ports]
      matched          unmatched                  matched          unmatched
         │                 │                         │                 │
      RowFetch          dummy                     RowFetch          dummy
         │                 │                         │                 │
         └────────┬────────┘                         └────────┬────────┘
        MutationManipulate                          MutationManipulate

Exchange replaces try_resize(N) — same output port count, but data is partitioned by block_id.

MatchedOnly (MERGE INTO with only WHEN MATCHED, or UPDATE...WHERE with subquery):

                    Before                                      After
                    ──────                                      ─────
              HashJoin output                             HashJoin output
                     │                                           │
                  RowFetch                          Exchange(block_id, N)  ← NEW
                     │                                           │
            MutationManipulate                              RowFetch
                                                                 │
                                                        MutationManipulate

NotMatchedOnly: no RowFetch, no change.

New setting: enable_mutation_block_id_repartition

  • Default: 1 (on)
  • Scope: Session / Global
  • Purpose: controls whether to insert a local block_id repartition before RowFetch in join-based mutation pipelines

When to disable: If matched rows are heavily concentrated in a few large blocks, the repartition may cause load skew (one processor handles most of the work). In this case, SET enable_mutation_block_id_repartition = 0 falls back to the original try_resize round-robin distribution, which provides better load balancing at the cost of more duplicate block reads.
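The skew scenario can be illustrated numerically. This sketch assumes the simplest possible routing rule (block id modulo processor count); the PR's actual hash may differ.

```rust
// Count how many rows each of the n processors receives when rows are
// routed by block id. The modulo rule is an assumption for illustration.
fn partition_loads(blocks: &[u64], n: usize) -> Vec<usize> {
    let mut loads = vec![0usize; n];
    for &b in blocks {
        loads[(b % n as u64) as usize] += 1;
    }
    loads
}

fn main() {
    // 80 matched rows from block 0, 10 from block 1, 10 from block 2.
    let mut blocks = vec![0u64; 80];
    blocks.extend(vec![1u64; 10]);
    blocks.extend(vec![2u64; 10]);

    let loads = partition_loads(&blocks, 4);
    // One processor ends up with 80% of the work, while the try_resize
    // round-robin fallback would give each of the 4 processors 25 rows.
    assert_eq!(loads, vec![80, 10, 10, 0]);
}
```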

Affected paths:

  • MERGE INTO with lazy columns (MixedMatched and MatchedOnly strategies)
  • UPDATE...WHERE with subqueries and lazy columns (MatchedOnly strategy)

Not affected:

  • Plain UPDATE/DELETE without subqueries (MutationStrategy::Direct, returns early before RowFetch)
  • DELETE with subqueries (lazy materialization is always skipped for deletes — only _row_id is needed)
  • SELECT+LIMIT RowFetch (exchange would destroy sort order; enable_block_id_repartition flag is false)
  • Single-threaded execution (max_threads = 1, guarded)

Key changes

  • BlockIdPartitionExchange: a new Exchange implementation that partitions rows by the block_id prefix of _row_id. For nullable row_ids (unmatched rows in MixedMatched), an incrementing counter spreads them evenly across partitions.
  • For MixedMatched, the exchange is inserted in MutationSplit::build_pipeline2 before the matched/unmatched split. Guarded by has_row_fetch flag so non-lazy workloads are not affected.
  • For MatchedOnly, the exchange is inserted in RowFetch::build_pipeline2. An enable_block_id_repartition flag on RowFetch controls this.
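A hedged sketch of the partitioning rule the bullets above describe. The struct name, the high-32-bit block prefix, and the modulo mapping are illustrative assumptions, not the PR's actual code.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative stand-in for BlockIdPartitionExchange's routing rule.
struct BlockIdPartitioner {
    n: usize,
    // Incrementing counter that spreads NULL row_ids (unmatched rows
    // in MixedMatched) evenly across partitions.
    null_counter: AtomicUsize,
}

impl BlockIdPartitioner {
    fn new(n: usize) -> Self {
        Self { n, null_counter: AtomicUsize::new(0) }
    }

    fn partition(&self, row_id: Option<u64>) -> usize {
        match row_id {
            // All rows of a block map to the same partition, so each
            // RowFetch processor sees a disjoint set of blocks.
            Some(rid) => ((rid >> 32) % self.n as u64) as usize,
            // NULL row_ids round-robin across partitions.
            None => self.null_counter.fetch_add(1, Ordering::Relaxed) % self.n,
        }
    }
}

fn main() {
    let p = BlockIdPartitioner::new(4);
    let rid = |block: u64, row: u64| (block << 32) | row;
    // Same block, different rows: same partition.
    assert_eq!(p.partition(Some(rid(6, 0))), p.partition(Some(rid(6, 99))));
    // NULL row_ids cycle through partitions 0, 1, 2, ...
    assert_eq!(p.partition(None), 0);
    assert_eq!(p.partition(None), 1);
}
```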

Performance impact

Reduces storage I/O in the RowFetch stage by avoiding redundant block reads across processors. The trade-off is that load balancing now depends on how matched rows are distributed across blocks (previously try_resize distributed rows evenly via round-robin). The overhead is one local scatter + merge per pipeline, operating on thin data (only join output columns, before lazy columns are materialized). Guarded by max_threads > 1 so single-threaded execution has zero overhead.
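The guard logic can be sketched against a stubbed pipeline; the Pipeline type and method names here are illustrative stand-ins, not Databend's pipeline API.

```rust
// Stubbed pipeline that records which operator gets appended.
struct Pipeline {
    ops: Vec<String>,
}

impl Pipeline {
    fn exchange_by_block_id(&mut self, n: usize) {
        self.ops.push(format!("Exchange(block_id, {n})"));
    }
    fn try_resize(&mut self, n: usize) {
        self.ops.push(format!("try_resize({n})"));
    }
}

/// Insert the block_id repartition only when the setting is on and
/// more than one thread is available; otherwise keep the original
/// round-robin resize, so single-threaded runs pay zero overhead.
fn prepare_row_fetch_input(p: &mut Pipeline, repartition: bool, max_threads: usize) {
    if repartition && max_threads > 1 {
        p.exchange_by_block_id(max_threads);
    } else {
        p.try_resize(max_threads);
    }
}

fn main() {
    let mut p = Pipeline { ops: vec![] };
    prepare_row_fetch_input(&mut p, true, 1); // single-threaded: guard applies
    prepare_row_fetch_input(&mut p, true, 8);
    assert_eq!(p.ops, vec!["try_resize(1)", "Exchange(block_id, 8)"]);
}
```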

Tests

  • Unit Test
  • Logic Test
  • Benchmark Test
  • No Test - Explain why

New sqllogictest 09_0051_merge_into_block_id_repartition.test covers:

  1. MixedMatched UPDATE (repartition ON)
  2. MixedMatched UPDATE (repartition OFF, identical results)
  3. MatchedOnly UPDATE
  4. NotMatchedOnly INSERT (no-op)
  5. MixedMatched DELETE
  6. UPDATE...WHERE (subquery) with lazy columns

All existing merge_into tests pass without regression.

Type of change

  • Bug Fix (non-breaking change which fixes an issue)
  • New Feature (non-breaking change which adds functionality)
  • Breaking Change (fix or feature that could cause existing functionality not to work as expected)
  • Documentation Update
  • Refactoring
  • Performance Improvement
  • Other (please describe):


…INTO

During MERGE INTO with lazy columns, the RowFetch stage may repeatedly
read the same physical block when rows from that block are scattered
across different processors or batches. This adds a local pipeline
exchange that partitions data by block_id (extracted from _row_id)
before RowFetch, ensuring each processor handles a disjoint set of
blocks and eliminating duplicate block reads.

- Add BlockIdPartitionExchange implementing the Exchange trait
- Insert exchange before MutationSplit for MixedMatched strategy
- Insert exchange before RowFetch for MatchedOnly strategy
- Add is_mutation flag to RowFetch to avoid affecting SELECT+LIMIT path
- Add enable_merge_into_block_id_repartition setting (default on)
@github-actions github-actions bot added the pr-feature this PR introduces a new feature to the codebase label Apr 9, 2026

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: c7deef3342

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

…xchange

Avoids truncation when max_threads exceeds 255.
@dantengsky
Copy link
Copy Markdown
Member Author

@codex review


@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: e311156f1d


…partition indices

- Add has_row_fetch flag to MutationSplit so the exchange is only
  inserted when RowFetch follows (lazy columns exist). Without this,
  non-lazy workloads pay shuffle cost for no I/O benefit.
- Use u16 instead of u8 for partition indices to avoid truncation
  when max_threads exceeds 255.
@dantengsky
Member Author

@codex review

@chatgpt-codex-connector

Codex Review: Didn't find any major issues. Keep it up!


@dantengsky dantengsky force-pushed the feat/merge-into-block-id-repartition branch 8 times, most recently from eb1ed7b to fddfa19 Compare April 10, 2026 06:05
@dantengsky
Member Author

@codex review


@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: fddfa19d9c


Member Author

@dantengsky dantengsky left a comment


👍


@dantengsky dantengsky force-pushed the feat/merge-into-block-id-repartition branch 2 times, most recently from 6b79bc3 to 0057c23 Compare April 10, 2026 08:03
…test

- Rename setting: enable_merge_into_block_id_repartition →
  enable_mutation_block_id_repartition
- Rename RowFetch flag: is_merge_into → enable_block_id_repartition
  to accurately reflect scope (MERGE INTO + UPDATE...WHERE subquery)
- Remove unnecessary serde(default) annotations
- Improve comments: explain why SELECT+LIMIT skips repartition,
  use "reducing" instead of "eliminating" for duplicate reads
- Rewrite test with proper structure, comments, and CREATE OR REPLACE
- Add Test 6: UPDATE...WHERE (subquery) with lazy columns
- Avoid unwrap in lazy_columns handling
@dantengsky dantengsky force-pushed the feat/merge-into-block-id-repartition branch from 0057c23 to d40dcb0 Compare April 10, 2026 08:32
@dantengsky
Member Author

@codex review

@chatgpt-codex-connector

Codex Review: Didn't find any major issues. Can't wait for the next one!


