From c7deef33425cbea1adf603303ea9f37064060759 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Thu, 9 Apr 2026 15:09:23 +0800 Subject: [PATCH 1/5] feat(query): add local block_id repartition before RowFetch in MERGE INTO During MERGE INTO with lazy columns, the RowFetch stage may repeatedly read the same physical block when rows from that block are scattered across different processors or batches. This adds a local pipeline exchange that partitions data by block_id (extracted from _row_id) before RowFetch, ensuring each processor handles a disjoint set of blocks and eliminating duplicate block reads. - Add BlockIdPartitionExchange implementing the Exchange trait - Insert exchange before MutationSplit for MixedMatched strategy - Insert exchange before RowFetch for MatchedOnly strategy - Add is_mutation flag to RowFetch to avoid affecting SELECT+LIMIT path - Add enable_merge_into_block_id_repartition setting (default on) --- .../src/physical_plans/physical_limit.rs | 1 + .../src/physical_plans/physical_mutation.rs | 1 + .../physical_mutation_into_split.rs | 20 +- .../src/physical_plans/physical_row_fetch.rs | 22 ++ src/query/settings/src/settings_default.rs | 7 + .../settings/src/settings_getter_setter.rs | 4 + .../fuse/src/operations/merge_into/mod.rs | 1 + .../processors/block_id_partition_exchange.rs | 99 +++++++++ .../operations/merge_into/processors/mod.rs | 2 + ..._0051_merge_into_block_id_repartition.test | 207 ++++++++++++++++++ 10 files changed, 361 insertions(+), 3 deletions(-) create mode 100644 src/query/storages/fuse/src/operations/merge_into/processors/block_id_partition_exchange.rs create mode 100644 tests/sqllogictests/suites/base/09_fuse_engine/09_0051_merge_into_block_id_repartition.test diff --git a/src/query/service/src/physical_plans/physical_limit.rs b/src/query/service/src/physical_plans/physical_limit.rs index 90e741c5772e1..1432abb3fdab0 100644 --- a/src/query/service/src/physical_plans/physical_limit.rs +++ 
b/src/query/service/src/physical_plans/physical_limit.rs @@ -296,6 +296,7 @@ impl PhysicalPlanBuilder { cols_to_fetch, fetched_fields, need_wrap_nullable: false, + is_mutation: false, stat_info: Some(stat_info.clone()), }); } diff --git a/src/query/service/src/physical_plans/physical_mutation.rs b/src/query/service/src/physical_plans/physical_mutation.rs index 30988393b8119..25bbaf046246e 100644 --- a/src/query/service/src/physical_plans/physical_mutation.rs +++ b/src/query/service/src/physical_plans/physical_mutation.rs @@ -809,6 +809,7 @@ fn build_mutation_row_fetch( cols_to_fetch, fetched_fields, need_wrap_nullable, + is_mutation: true, stat_info: None, meta: PhysicalPlanMeta::new("RowFetch"), }) diff --git a/src/query/service/src/physical_plans/physical_mutation_into_split.rs b/src/query/service/src/physical_plans/physical_mutation_into_split.rs index e86c70641ab64..99e01dd2a12fb 100644 --- a/src/query/service/src/physical_plans/physical_mutation_into_split.rs +++ b/src/query/service/src/physical_plans/physical_mutation_into_split.rs @@ -13,10 +13,12 @@ // limitations under the License. use std::any::Any; +use std::sync::Arc; use databend_common_exception::Result; use databend_common_pipeline::core::Pipe; use databend_common_sql::IndexType; +use databend_common_storages_fuse::operations::BlockIdPartitionExchange; use databend_common_storages_fuse::operations::MutationSplitProcessor; use crate::physical_plans::format::MutationSplitFormatter; @@ -71,9 +73,21 @@ impl IPhysicalPlan for MutationSplit { fn build_pipeline2(&self, builder: &mut PipelineBuilder) -> Result<()> { self.input.build_pipeline(builder)?; - builder - .main_pipeline - .try_resize(builder.settings.get_max_threads()? as usize)?; + let max_threads = builder.settings.get_max_threads()? as usize; + + // Add block_id repartition before split so each downstream RowFetch + // processor sees rows from a disjoint set of blocks, eliminating + // duplicate block reads. 
+ if max_threads > 1 + && builder + .settings + .get_enable_merge_into_block_id_repartition()? + { + let exchange = Arc::new(BlockIdPartitionExchange::create(self.split_index)); + builder.main_pipeline.exchange(max_threads, exchange)?; + } else { + builder.main_pipeline.try_resize(max_threads)?; + } // The MutationStrategy is FullOperation, use row_id_idx to split let mut items = Vec::with_capacity(builder.main_pipeline.output_len()); diff --git a/src/query/service/src/physical_plans/physical_row_fetch.rs b/src/query/service/src/physical_plans/physical_row_fetch.rs index 11fef083c340a..a6363bb8f58c7 100644 --- a/src/query/service/src/physical_plans/physical_row_fetch.rs +++ b/src/query/service/src/physical_plans/physical_row_fetch.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use std::sync::Arc; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::Projection; @@ -25,6 +26,7 @@ use databend_common_pipeline::core::OutputPort; use databend_common_pipeline::core::Pipe; use databend_common_pipeline::core::PipeItem; use databend_common_pipeline_transforms::create_dummy_item; +use databend_common_storages_fuse::operations::BlockIdPartitionExchange; use databend_common_storages_fuse::operations::row_fetch_processor; use itertools::Itertools; @@ -49,6 +51,9 @@ pub struct RowFetch { pub row_id_col_offset: usize, pub fetched_fields: Vec, pub need_wrap_nullable: bool, + /// True when this RowFetch is part of a MERGE INTO pipeline (not SELECT+LIMIT). 
+ #[serde(default)] + pub is_mutation: bool, /// Only used for explain pub stat_info: Option, @@ -108,6 +113,7 @@ impl IPhysicalPlan for RowFetch { row_id_col_offset: self.row_id_col_offset, fetched_fields: self.fetched_fields.clone(), need_wrap_nullable: self.need_wrap_nullable, + is_mutation: self.is_mutation, stat_info: self.stat_info.clone(), }) } @@ -124,8 +130,24 @@ impl IPhysicalPlan for RowFetch { )?; if !MutationSplit::check_physical_plan(&self.input) { + // SELECT+LIMIT path or MatchedOnly path without MutationSplit. + // For MatchedOnly with lazy columns, add block_id repartition before RowFetch. + if self.is_mutation { + let max_threads = builder.settings.get_max_threads()? as usize; + if max_threads > 1 + && builder + .settings + .get_enable_merge_into_block_id_repartition()? + { + let exchange = + Arc::new(BlockIdPartitionExchange::create(self.row_id_col_offset)); + builder.main_pipeline.exchange(max_threads, exchange)?; + } + } builder.main_pipeline.add_transform(processor)?; } else { + // MixedMatched path: MutationSplit is upstream. + // Repartition already happened in MutationSplit::build_pipeline2. 
let output_len = builder.main_pipeline.output_len(); let mut pipe_items = Vec::with_capacity(output_len); for i in 0..output_len { diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index b14f575c6206e..73d6ded39bd70 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -525,6 +525,13 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=1)), }), + ("enable_merge_into_block_id_repartition", DefaultSettingValue { + value: UserSettingValue::UInt64(1), + desc: "Enable local block_id repartition before row fetch in merge into to reduce duplicate block reads.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=1)), + }), ("max_cte_recursive_depth", DefaultSettingValue { value: UserSettingValue::UInt64(1000), desc: "Max recursive depth for recursive cte", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index dca74c1d5491d..c96de3eb57856 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -470,6 +470,10 @@ impl Settings { Ok(self.try_get_u64("enable_merge_into_row_fetch")? != 0) } + pub fn get_enable_merge_into_block_id_repartition(&self) -> Result { + Ok(self.try_get_u64("enable_merge_into_block_id_repartition")? != 0) + } + pub fn get_max_cte_recursive_depth(&self) -> Result { Ok(self.try_get_u64("max_cte_recursive_depth")? 
as usize) } diff --git a/src/query/storages/fuse/src/operations/merge_into/mod.rs b/src/query/storages/fuse/src/operations/merge_into/mod.rs index 5a6a2f19aff4d..8bfb827df54d2 100644 --- a/src/query/storages/fuse/src/operations/merge_into/mod.rs +++ b/src/query/storages/fuse/src/operations/merge_into/mod.rs @@ -16,6 +16,7 @@ mod mutator; mod processors; pub use mutator::MatchedAggregator; +pub use processors::BlockIdPartitionExchange; pub use processors::MatchedSplitProcessor; pub use processors::MergeIntoNotMatchedProcessor; pub use processors::MixRowIdKindAndLog; diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/block_id_partition_exchange.rs b/src/query/storages/fuse/src/operations/merge_into/processors/block_id_partition_exchange.rs new file mode 100644 index 0000000000000..3678e60ea14c7 --- /dev/null +++ b/src/query/storages/fuse/src/operations/merge_into/processors/block_id_partition_exchange.rs @@ -0,0 +1,99 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; + +use databend_common_catalog::plan::split_row_id; +use databend_common_exception::Result; +use databend_common_expression::DataBlock; +use databend_common_expression::types::DataType; +use databend_common_expression::types::NumberDataType; +use databend_common_pipeline::basic::Exchange; + +/// Partitions data blocks by block_id extracted from the `_row_id` column. 
+/// +/// This ensures that rows belonging to the same physical block are routed +/// to the same downstream processor, eliminating duplicate block reads +/// in the RowFetch stage of MERGE INTO. +pub struct BlockIdPartitionExchange { + row_id_col_offset: usize, + /// Round-robin counter for NULL row_ids (unmatched rows in MixedMatched). + null_counter: AtomicU64, +} + +impl BlockIdPartitionExchange { + pub fn create(row_id_col_offset: usize) -> Self { + Self { + row_id_col_offset, + null_counter: AtomicU64::new(0), + } + } + + #[inline(always)] + fn partition_index(row_id: u64, n: usize) -> u8 { + let (prefix, _) = split_row_id(row_id); + (prefix % n as u64) as u8 + } +} + +impl Exchange for BlockIdPartitionExchange { + const NAME: &'static str = "BlockIdPartition"; + const SKIP_EMPTY_DATA_BLOCK: bool = true; + + fn partition(&self, data_block: DataBlock, n: usize) -> Result> { + let num_rows = data_block.num_rows(); + let entry = &data_block.columns()[self.row_id_col_offset]; + let mut indices = Vec::with_capacity(num_rows); + + match entry.data_type() { + DataType::Number(NumberDataType::UInt64) => { + let col = entry + .to_column() + .into_number() + .unwrap() + .into_u_int64() + .unwrap(); + for row_id in col.iter() { + indices.push(Self::partition_index(*row_id, n)); + } + } + DataType::Nullable(inner) + if matches!(inner.as_ref(), DataType::Number(NumberDataType::UInt64)) => + { + let col = entry.to_column(); + let nullable = col.into_nullable().unwrap(); + let row_ids = nullable + .column + .into_number() + .unwrap() + .into_u_int64() + .unwrap(); + for (row_id, is_valid) in row_ids.iter().zip(nullable.validity.iter()) { + if is_valid { + indices.push(Self::partition_index(*row_id, n)); + } else { + let counter = self.null_counter.fetch_add(1, Ordering::Relaxed); + indices.push((counter % n as u64) as u8); + } + } + } + _ => unreachable!( + "Row id column should be UInt64 or Nullable(UInt64) for block_id partition" + ), + } + + 
DataBlock::scatter(&data_block, &indices, n) + } +} diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/mod.rs b/src/query/storages/fuse/src/operations/merge_into/processors/mod.rs index 296cbd0bda998..b853e8ee7a3b7 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/mod.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/mod.rs @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod block_id_partition_exchange; mod processor_merge_into_matched_and_split; mod processor_merge_into_not_matched; mod processor_merge_into_split; mod processor_merge_into_split_row_number_and_log; mod transform_matched_mutation_aggregator; +pub use block_id_partition_exchange::BlockIdPartitionExchange; pub use processor_merge_into_matched_and_split::MatchedSplitProcessor; pub use processor_merge_into_matched_and_split::MixRowIdKindAndLog; pub(crate) use processor_merge_into_matched_and_split::RowIdKind; diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0051_merge_into_block_id_repartition.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0051_merge_into_block_id_repartition.test new file mode 100644 index 0000000000000..bd5c8ba308a31 --- /dev/null +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0051_merge_into_block_id_repartition.test @@ -0,0 +1,207 @@ +statement ok +DROP TABLE IF EXISTS t_repartition_target; + +statement ok +DROP TABLE IF EXISTS t_repartition_source; + +statement ok +CREATE TABLE t_repartition_target (id INT, val VARCHAR, extra VARCHAR, ts TIMESTAMP); + +statement ok +INSERT INTO t_repartition_target VALUES (1, 'a', 'x1', '2024-01-01 00:00:00'); + +statement ok +INSERT INTO t_repartition_target VALUES (2, 'b', 'x2', '2024-01-01 00:00:00'); + +statement ok +INSERT INTO t_repartition_target VALUES (3, 'c', 'x3', '2024-01-01 00:00:00'); + +statement ok +INSERT INTO t_repartition_target VALUES (4, 'd', 'x4', 
'2024-01-01 00:00:00'); + +statement ok +INSERT INTO t_repartition_target VALUES (5, 'e', 'x5', '2024-01-01 00:00:00'); + +statement ok +CREATE TABLE t_repartition_source (id INT, val VARCHAR, extra VARCHAR, ts TIMESTAMP); + +statement ok +INSERT INTO t_repartition_source VALUES (1, 'a_new', 'y1', '2024-01-02 00:00:00'), (2, 'b_new', 'y2', '2024-01-02 00:00:00'), (3, 'c_old', 'y3', '2023-12-01 00:00:00'), (6, 'f_new', 'y6', '2024-01-02 00:00:00'), (7, 'g_new', 'y7', '2024-01-02 00:00:00'); + +statement ok +SET enable_merge_into_row_fetch = 1; + +statement ok +SET enable_merge_into_block_id_repartition = 1; + +query II +MERGE INTO t_repartition_target AS tar +USING t_repartition_source AS sou +ON tar.id = sou.id +WHEN MATCHED AND tar.ts < sou.ts THEN UPDATE * +WHEN NOT MATCHED THEN INSERT *; +---- +2 2 + +query ITTT +SELECT id, val, extra, ts FROM t_repartition_target ORDER BY id; +---- +1 a_new y1 2024-01-02 00:00:00.000000 +2 b_new y2 2024-01-02 00:00:00.000000 +3 c x3 2024-01-01 00:00:00.000000 +4 d x4 2024-01-01 00:00:00.000000 +5 e x5 2024-01-01 00:00:00.000000 +6 f_new y6 2024-01-02 00:00:00.000000 +7 g_new y7 2024-01-02 00:00:00.000000 + +statement ok +DROP TABLE t_repartition_target; + +statement ok +CREATE TABLE t_repartition_target (id INT, val VARCHAR, extra VARCHAR, ts TIMESTAMP); + +statement ok +INSERT INTO t_repartition_target VALUES (1, 'a', 'x1', '2024-01-01 00:00:00'); + +statement ok +INSERT INTO t_repartition_target VALUES (2, 'b', 'x2', '2024-01-01 00:00:00'); + +statement ok +INSERT INTO t_repartition_target VALUES (3, 'c', 'x3', '2024-01-01 00:00:00'); + +statement ok +INSERT INTO t_repartition_target VALUES (4, 'd', 'x4', '2024-01-01 00:00:00'); + +statement ok +INSERT INTO t_repartition_target VALUES (5, 'e', 'x5', '2024-01-01 00:00:00'); + +statement ok +SET enable_merge_into_block_id_repartition = 0; + +query II +MERGE INTO t_repartition_target AS tar +USING t_repartition_source AS sou +ON tar.id = sou.id +WHEN MATCHED AND tar.ts < sou.ts 
THEN UPDATE * +WHEN NOT MATCHED THEN INSERT *; +---- +2 2 + +query ITTT +SELECT id, val, extra, ts FROM t_repartition_target ORDER BY id; +---- +1 a_new y1 2024-01-02 00:00:00.000000 +2 b_new y2 2024-01-02 00:00:00.000000 +3 c x3 2024-01-01 00:00:00.000000 +4 d x4 2024-01-01 00:00:00.000000 +5 e x5 2024-01-01 00:00:00.000000 +6 f_new y6 2024-01-02 00:00:00.000000 +7 g_new y7 2024-01-02 00:00:00.000000 + +statement ok +DROP TABLE t_repartition_target; + +statement ok +CREATE TABLE t_repartition_target (id INT, val VARCHAR, extra VARCHAR, ts TIMESTAMP); + +statement ok +INSERT INTO t_repartition_target VALUES (1, 'a', 'x1', '2024-01-01 00:00:00'); + +statement ok +INSERT INTO t_repartition_target VALUES (2, 'b', 'x2', '2024-01-01 00:00:00'); + +statement ok +INSERT INTO t_repartition_target VALUES (3, 'c', 'x3', '2024-01-01 00:00:00'); + +statement ok +SET enable_merge_into_block_id_repartition = 1; + +query I +MERGE INTO t_repartition_target AS tar +USING t_repartition_source AS sou +ON tar.id = sou.id +WHEN MATCHED AND tar.ts < sou.ts THEN UPDATE *; +---- +2 + +query ITTT +SELECT id, val, extra, ts FROM t_repartition_target ORDER BY id; +---- +1 a_new y1 2024-01-02 00:00:00.000000 +2 b_new y2 2024-01-02 00:00:00.000000 +3 c x3 2024-01-01 00:00:00.000000 + +statement ok +DROP TABLE t_repartition_target; + +statement ok +CREATE TABLE t_repartition_target (id INT, val VARCHAR, extra VARCHAR, ts TIMESTAMP); + +statement ok +INSERT INTO t_repartition_target VALUES (1, 'a', 'x1', '2024-01-01 00:00:00'); + +statement ok +SET enable_merge_into_block_id_repartition = 1; + +query I +MERGE INTO t_repartition_target AS tar +USING t_repartition_source AS sou +ON tar.id = sou.id +WHEN NOT MATCHED THEN INSERT *; +---- +4 + +query ITTT +SELECT id, val, extra, ts FROM t_repartition_target ORDER BY id; +---- +1 a x1 2024-01-01 00:00:00.000000 +2 b_new y2 2024-01-02 00:00:00.000000 +3 c_old y3 2023-12-01 00:00:00.000000 +6 f_new y6 2024-01-02 00:00:00.000000 +7 g_new y7 2024-01-02 
00:00:00.000000 + +statement ok +DROP TABLE t_repartition_target; + +statement ok +CREATE TABLE t_repartition_target (id INT, val VARCHAR, extra VARCHAR, ts TIMESTAMP); + +statement ok +INSERT INTO t_repartition_target VALUES (1, 'a', 'x1', '2024-01-01 00:00:00'); + +statement ok +INSERT INTO t_repartition_target VALUES (2, 'b', 'x2', '2024-01-01 00:00:00'); + +statement ok +INSERT INTO t_repartition_target VALUES (3, 'c', 'x3', '2024-01-01 00:00:00'); + +statement ok +SET enable_merge_into_block_id_repartition = 1; + +query II +MERGE INTO t_repartition_target AS tar +USING t_repartition_source AS sou +ON tar.id = sou.id +WHEN MATCHED AND tar.ts < sou.ts THEN DELETE +WHEN NOT MATCHED THEN INSERT *; +---- +2 2 + +query ITTT +SELECT id, val, extra, ts FROM t_repartition_target ORDER BY id; +---- +3 c x3 2024-01-01 00:00:00.000000 +6 f_new y6 2024-01-02 00:00:00.000000 +7 g_new y7 2024-01-02 00:00:00.000000 + +statement ok +DROP TABLE IF EXISTS t_repartition_target; + +statement ok +DROP TABLE IF EXISTS t_repartition_source; + +statement ok +SET enable_merge_into_block_id_repartition = 1; + +statement ok +SET enable_merge_into_row_fetch = 1; From e311156f1dc89ce94713fbc4c8a9d5ef10d9f3c6 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Thu, 9 Apr 2026 16:13:52 +0800 Subject: [PATCH 2/5] fix: use u16 instead of u8 for partition indices in BlockIdPartitionExchange Avoids truncation when max_threads exceeds 255. 
--- .../merge_into/processors/block_id_partition_exchange.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/block_id_partition_exchange.rs b/src/query/storages/fuse/src/operations/merge_into/processors/block_id_partition_exchange.rs index 3678e60ea14c7..cf755ecaee5ea 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/block_id_partition_exchange.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/block_id_partition_exchange.rs @@ -42,9 +42,9 @@ impl BlockIdPartitionExchange { } #[inline(always)] - fn partition_index(row_id: u64, n: usize) -> u8 { + fn partition_index(row_id: u64, n: usize) -> u16 { let (prefix, _) = split_row_id(row_id); - (prefix % n as u64) as u8 + (prefix % n as u64) as u16 } } @@ -55,7 +55,7 @@ impl Exchange for BlockIdPartitionExchange { fn partition(&self, data_block: DataBlock, n: usize) -> Result> { let num_rows = data_block.num_rows(); let entry = &data_block.columns()[self.row_id_col_offset]; - let mut indices = Vec::with_capacity(num_rows); + let mut indices: Vec = Vec::with_capacity(num_rows); match entry.data_type() { DataType::Number(NumberDataType::UInt64) => { @@ -85,7 +85,7 @@ impl Exchange for BlockIdPartitionExchange { indices.push(Self::partition_index(*row_id, n)); } else { let counter = self.null_counter.fetch_add(1, Ordering::Relaxed); - indices.push((counter % n as u64) as u8); + indices.push((counter % n as u64) as u16); } } } From 4f96be6a30c45aec4f1b9c170496df650b2fa770 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Fri, 10 Apr 2026 00:20:19 +0800 Subject: [PATCH 3/5] fix: guard block_id repartition by RowFetch presence and use u16 for partition indices - Add has_row_fetch flag to MutationSplit so the exchange is only inserted when RowFetch follows (lazy columns exist). Without this, non-lazy workloads pay shuffle cost for no I/O benefit. 
- Use u16 instead of u8 for partition indices to avoid truncation when max_threads exceeds 255. --- .../src/physical_plans/physical_mutation.rs | 19 +++++++++++++------ .../physical_mutation_into_split.rs | 9 +++++++-- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/src/query/service/src/physical_plans/physical_mutation.rs b/src/query/service/src/physical_plans/physical_mutation.rs index 25bbaf046246e..a7542685ccc73 100644 --- a/src/query/service/src/physical_plans/physical_mutation.rs +++ b/src/query/service/src/physical_plans/physical_mutation.rs @@ -453,21 +453,28 @@ impl PhysicalPlanBuilder { // If the mutation type is FullOperation, we use row_id column to split a block // into matched and not matched parts. + let has_lazy_columns = self + .metadata + .read() + .get_table_lazy_columns(target_table_index) + .is_some_and(|cols| !cols.is_empty()); + if matches!(strategy, MutationStrategy::MixedMatched) { plan = PhysicalPlan::new(MutationSplit { input: plan, split_index: row_id_offset, + has_row_fetch: has_lazy_columns, meta: PhysicalPlanMeta::new("MutationSplit"), }); } // Construct row fetch plan for lazy columns. 
- if let Some(lazy_columns) = self - .metadata - .read() - .get_table_lazy_columns(target_table_index) - && !lazy_columns.is_empty() - { + if has_lazy_columns { + let lazy_columns = self + .metadata + .read() + .get_table_lazy_columns(target_table_index) + .unwrap(); plan = build_mutation_row_fetch( plan, metadata.clone(), diff --git a/src/query/service/src/physical_plans/physical_mutation_into_split.rs b/src/query/service/src/physical_plans/physical_mutation_into_split.rs index 99e01dd2a12fb..956c8922eb533 100644 --- a/src/query/service/src/physical_plans/physical_mutation_into_split.rs +++ b/src/query/service/src/physical_plans/physical_mutation_into_split.rs @@ -33,6 +33,9 @@ pub struct MutationSplit { pub meta: PhysicalPlanMeta, pub input: PhysicalPlan, pub split_index: IndexType, + /// Whether RowFetch follows this MutationSplit (lazy columns exist). + /// Block_id repartition is only beneficial when RowFetch is present. + pub has_row_fetch: bool, } #[typetag::serde] @@ -67,6 +70,7 @@ impl IPhysicalPlan for MutationSplit { meta: self.meta.clone(), input, split_index: self.split_index, + has_row_fetch: self.has_row_fetch, }) } @@ -77,8 +81,9 @@ impl IPhysicalPlan for MutationSplit { // Add block_id repartition before split so each downstream RowFetch // processor sees rows from a disjoint set of blocks, eliminating - // duplicate block reads. - if max_threads > 1 + // duplicate block reads. Only useful when RowFetch follows. + if self.has_row_fetch + && max_threads > 1 && builder .settings .get_enable_merge_into_block_id_repartition()? 
From 4cbc49b9ab8742700d61856df181ad997c9c3f02 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Fri, 10 Apr 2026 12:35:58 +0800 Subject: [PATCH 4/5] fix: add comment explaining why SELECT+LIMIT RowFetch skips repartition --- src/query/service/src/physical_plans/physical_row_fetch.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/query/service/src/physical_plans/physical_row_fetch.rs b/src/query/service/src/physical_plans/physical_row_fetch.rs index a6363bb8f58c7..4fa09565b3d2d 100644 --- a/src/query/service/src/physical_plans/physical_row_fetch.rs +++ b/src/query/service/src/physical_plans/physical_row_fetch.rs @@ -130,8 +130,11 @@ impl IPhysicalPlan for RowFetch { )?; if !MutationSplit::check_physical_plan(&self.input) { - // SELECT+LIMIT path or MatchedOnly path without MutationSplit. - // For MatchedOnly with lazy columns, add block_id repartition before RowFetch. + // For MatchedOnly MERGE INTO, add block_id repartition before RowFetch + // to reduce duplicate block reads. + // Not applicable to SELECT+LIMIT: the exchange would destroy the sort + // order produced by Sort+Limit (MergePartitionProcessor uses Random + // strategy with non-deterministic output order). if self.is_mutation { let max_threads = builder.settings.get_max_threads()? 
 as usize; if max_threads > 1 From d40dcb029a31c1dbe8ec945699bddf386dcf440c Mon Sep 17 00:00:00 2001 From: dantengsky Date: Fri, 10 Apr 2026 12:47:54 +0800 Subject: [PATCH 5/5] refactor: rename setting and flag, broaden scope, add UPDATE...WHERE test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename setting: enable_merge_into_block_id_repartition → enable_mutation_block_id_repartition - Rename RowFetch flag: is_mutation → enable_block_id_repartition to accurately reflect scope (MERGE INTO + UPDATE...WHERE subquery) - Remove unnecessary serde(default) annotations - Improve comments: explain why SELECT+LIMIT skips repartition, use "reducing" instead of "eliminating" for duplicate reads - Rewrite test with proper structure, comments, and CREATE OR REPLACE - Add Test 6: UPDATE...WHERE (subquery) with lazy columns - Avoid unwrap in lazy_columns handling --- .../src/physical_plans/physical_limit.rs | 2 +- .../src/physical_plans/physical_mutation.rs | 15 +- .../physical_mutation_into_split.rs | 11 +- .../src/physical_plans/physical_row_fetch.rs | 20 +- src/query/settings/src/settings_default.rs | 4 +- .../settings/src/settings_getter_setter.rs | 4 +- .../processors/block_id_partition_exchange.rs | 5 +- ..._0051_merge_into_block_id_repartition.test | 178 ++++++++++++------ 8 files changed, 148 insertions(+), 91 deletions(-) diff --git a/src/query/service/src/physical_plans/physical_limit.rs b/src/query/service/src/physical_plans/physical_limit.rs index 1432abb3fdab0..237e87cd3e1ea 100644 --- a/src/query/service/src/physical_plans/physical_limit.rs +++ b/src/query/service/src/physical_plans/physical_limit.rs @@ -296,7 +296,7 @@ impl PhysicalPlanBuilder { cols_to_fetch, fetched_fields, need_wrap_nullable: false, - is_mutation: false, + enable_block_id_repartition: false, stat_info: Some(stat_info.clone()), }); } diff --git a/src/query/service/src/physical_plans/physical_mutation.rs 
b/src/query/service/src/physical_plans/physical_mutation.rs index a7542685ccc73..a451dff6bd2b9 100644 --- a/src/query/service/src/physical_plans/physical_mutation.rs +++ b/src/query/service/src/physical_plans/physical_mutation.rs @@ -453,28 +453,23 @@ impl PhysicalPlanBuilder { // If the mutation type is FullOperation, we use row_id column to split a block // into matched and not matched parts. - let has_lazy_columns = self + let lazy_columns = self .metadata .read() .get_table_lazy_columns(target_table_index) - .is_some_and(|cols| !cols.is_empty()); + .filter(|cols| !cols.is_empty()); if matches!(strategy, MutationStrategy::MixedMatched) { plan = PhysicalPlan::new(MutationSplit { input: plan, split_index: row_id_offset, - has_row_fetch: has_lazy_columns, + has_row_fetch: lazy_columns.is_some(), meta: PhysicalPlanMeta::new("MutationSplit"), }); } // Construct row fetch plan for lazy columns. - if has_lazy_columns { - let lazy_columns = self - .metadata - .read() - .get_table_lazy_columns(target_table_index) - .unwrap(); + if let Some(lazy_columns) = lazy_columns { plan = build_mutation_row_fetch( plan, metadata.clone(), @@ -816,7 +811,7 @@ fn build_mutation_row_fetch( cols_to_fetch, fetched_fields, need_wrap_nullable, - is_mutation: true, + enable_block_id_repartition: true, stat_info: None, meta: PhysicalPlanMeta::new("RowFetch"), }) diff --git a/src/query/service/src/physical_plans/physical_mutation_into_split.rs b/src/query/service/src/physical_plans/physical_mutation_into_split.rs index 956c8922eb533..8d0a84e8a56e3 100644 --- a/src/query/service/src/physical_plans/physical_mutation_into_split.rs +++ b/src/query/service/src/physical_plans/physical_mutation_into_split.rs @@ -33,8 +33,8 @@ pub struct MutationSplit { pub meta: PhysicalPlanMeta, pub input: PhysicalPlan, pub split_index: IndexType, - /// Whether RowFetch follows this MutationSplit (lazy columns exist). - /// Block_id repartition is only beneficial when RowFetch is present. 
+ /// When true, a block_id repartition is inserted before the split to reduce + /// duplicate block reads in the downstream RowFetch stage. pub has_row_fetch: bool, } @@ -79,14 +79,13 @@ impl IPhysicalPlan for MutationSplit { let max_threads = builder.settings.get_max_threads()? as usize; - // Add block_id repartition before split so each downstream RowFetch - // processor sees rows from a disjoint set of blocks, eliminating - // duplicate block reads. Only useful when RowFetch follows. + // Repartition by block_id so each downstream RowFetch processor handles + // a disjoint set of blocks, reducing duplicate block reads. if self.has_row_fetch && max_threads > 1 && builder .settings - .get_enable_merge_into_block_id_repartition()? + .get_enable_mutation_block_id_repartition()? { let exchange = Arc::new(BlockIdPartitionExchange::create(self.split_index)); builder.main_pipeline.exchange(max_threads, exchange)?; diff --git a/src/query/service/src/physical_plans/physical_row_fetch.rs b/src/query/service/src/physical_plans/physical_row_fetch.rs index 4fa09565b3d2d..c9214f136967d 100644 --- a/src/query/service/src/physical_plans/physical_row_fetch.rs +++ b/src/query/service/src/physical_plans/physical_row_fetch.rs @@ -51,9 +51,11 @@ pub struct RowFetch { pub row_id_col_offset: usize, pub fetched_fields: Vec, pub need_wrap_nullable: bool, - /// True when this RowFetch is part of a MERGE INTO pipeline (not SELECT+LIMIT). - #[serde(default)] - pub is_mutation: bool, + /// When true, a block_id repartition is inserted before RowFetch to reduce + /// duplicate block reads. Applicable to join-based mutation paths (MERGE INTO, + /// UPDATE...FROM). Not applicable to SELECT+LIMIT where the exchange would + /// destroy sort order. 
+ pub enable_block_id_repartition: bool, /// Only used for explain pub stat_info: Option, @@ -113,7 +115,7 @@ impl IPhysicalPlan for RowFetch { row_id_col_offset: self.row_id_col_offset, fetched_fields: self.fetched_fields.clone(), need_wrap_nullable: self.need_wrap_nullable, - is_mutation: self.is_mutation, + enable_block_id_repartition: self.enable_block_id_repartition, stat_info: self.stat_info.clone(), }) } @@ -132,15 +134,15 @@ impl IPhysicalPlan for RowFetch { if !MutationSplit::check_physical_plan(&self.input) { // For MatchedOnly MERGE INTO, add block_id repartition before RowFetch // to reduce duplicate block reads. - // Not applicable to SELECT+LIMIT: the exchange would destroy the sort - // order produced by Sort+Limit (MergePartitionProcessor uses Random - // strategy with non-deterministic output order). - if self.is_mutation { + // Not applicable to SELECT+LIMIT: pipeline.exchange() merges partitions + // with non-deterministic output order, which would destroy the sort + // order produced by Sort+Limit. + if self.enable_block_id_repartition { let max_threads = builder.settings.get_max_threads()? as usize; if max_threads > 1 && builder .settings - .get_enable_merge_into_block_id_repartition()? + .get_enable_mutation_block_id_repartition()? 
                 {
                     let exchange =
                         Arc::new(BlockIdPartitionExchange::create(self.row_id_col_offset));
diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs
index 73d6ded39bd70..37dfadade35fe 100644
--- a/src/query/settings/src/settings_default.rs
+++ b/src/query/settings/src/settings_default.rs
@@ -525,9 +525,9 @@ impl DefaultSettings {
             scope: SettingScope::Both,
             range: Some(SettingRange::Numeric(0..=1)),
         }),
-        ("enable_merge_into_block_id_repartition", DefaultSettingValue {
+        ("enable_mutation_block_id_repartition", DefaultSettingValue {
             value: UserSettingValue::UInt64(1),
-            desc: "Enable local block_id repartition before row fetch in merge into to reduce duplicate block reads.",
+            desc: "Enable local block_id repartition before row fetch in join-based mutations (MERGE INTO, UPDATE...FROM) to reduce duplicate block reads.",
             mode: SettingMode::Both,
             scope: SettingScope::Both,
             range: Some(SettingRange::Numeric(0..=1)),
diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs
index c96de3eb57856..a37e07c7b6490 100644
--- a/src/query/settings/src/settings_getter_setter.rs
+++ b/src/query/settings/src/settings_getter_setter.rs
@@ -470,8 +470,8 @@ impl Settings {
         Ok(self.try_get_u64("enable_merge_into_row_fetch")? != 0)
     }
 
-    pub fn get_enable_merge_into_block_id_repartition(&self) -> Result<bool> {
-        Ok(self.try_get_u64("enable_merge_into_block_id_repartition")? != 0)
+    pub fn get_enable_mutation_block_id_repartition(&self) -> Result<bool> {
+        Ok(self.try_get_u64("enable_mutation_block_id_repartition")? != 0)
     }
 
     pub fn get_max_cte_recursive_depth(&self) -> Result<usize> {
diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/block_id_partition_exchange.rs b/src/query/storages/fuse/src/operations/merge_into/processors/block_id_partition_exchange.rs
index cf755ecaee5ea..cba54eaa86896 100644
--- a/src/query/storages/fuse/src/operations/merge_into/processors/block_id_partition_exchange.rs
+++ b/src/query/storages/fuse/src/operations/merge_into/processors/block_id_partition_exchange.rs
@@ -25,11 +25,12 @@ use databend_common_pipeline::basic::Exchange;
 
 /// Partitions data blocks by block_id extracted from the `_row_id` column.
 ///
 /// This ensures that rows belonging to the same physical block are routed
-/// to the same downstream processor, eliminating duplicate block reads
+/// to the same downstream processor, reducing duplicate block reads
 /// in the RowFetch stage of MERGE INTO.
 pub struct BlockIdPartitionExchange {
     row_id_col_offset: usize,
-    /// Round-robin counter for NULL row_ids (unmatched rows in MixedMatched).
+    /// Incrementing counter used by `partition()` to spread NULL row_ids
+    /// (unmatched rows in MixedMatched) evenly across partitions.
     null_counter: AtomicU64,
 }
diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0051_merge_into_block_id_repartition.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0051_merge_into_block_id_repartition.test
index bd5c8ba308a31..48464a0136949 100644
--- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0051_merge_into_block_id_repartition.test
+++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0051_merge_into_block_id_repartition.test
@@ -1,42 +1,60 @@
+## Test block_id repartition optimization for mutation RowFetch.
+## When enabled, rows are partitioned by block_id before RowFetch so each
+## processor handles a disjoint set of blocks, reducing duplicate block reads.
+## Applies to join-based mutations: MERGE INTO and UPDATE...WHERE (subquery).
+ statement ok -DROP TABLE IF EXISTS t_repartition_target; +CREATE OR REPLACE DATABASE test_block_id_repartition statement ok -DROP TABLE IF EXISTS t_repartition_source; +USE test_block_id_repartition + +## ========================================================================== +## Setup: target table with multiple blocks (one INSERT per block), +## and a source table with overlapping + new rows. +## ========================================================================== statement ok -CREATE TABLE t_repartition_target (id INT, val VARCHAR, extra VARCHAR, ts TIMESTAMP); +CREATE OR REPLACE TABLE target (id INT, val VARCHAR, extra VARCHAR, ts TIMESTAMP) +## Each INSERT creates a separate block in storage. statement ok -INSERT INTO t_repartition_target VALUES (1, 'a', 'x1', '2024-01-01 00:00:00'); +INSERT INTO target VALUES (1, 'a', 'x1', '2024-01-01 00:00:00') statement ok -INSERT INTO t_repartition_target VALUES (2, 'b', 'x2', '2024-01-01 00:00:00'); +INSERT INTO target VALUES (2, 'b', 'x2', '2024-01-01 00:00:00') statement ok -INSERT INTO t_repartition_target VALUES (3, 'c', 'x3', '2024-01-01 00:00:00'); +INSERT INTO target VALUES (3, 'c', 'x3', '2024-01-01 00:00:00') statement ok -INSERT INTO t_repartition_target VALUES (4, 'd', 'x4', '2024-01-01 00:00:00'); +INSERT INTO target VALUES (4, 'd', 'x4', '2024-01-01 00:00:00') statement ok -INSERT INTO t_repartition_target VALUES (5, 'e', 'x5', '2024-01-01 00:00:00'); +INSERT INTO target VALUES (5, 'e', 'x5', '2024-01-01 00:00:00') statement ok -CREATE TABLE t_repartition_source (id INT, val VARCHAR, extra VARCHAR, ts TIMESTAMP); +CREATE OR REPLACE TABLE source (id INT, val VARCHAR, extra VARCHAR, ts TIMESTAMP) statement ok -INSERT INTO t_repartition_source VALUES (1, 'a_new', 'y1', '2024-01-02 00:00:00'), (2, 'b_new', 'y2', '2024-01-02 00:00:00'), (3, 'c_old', 'y3', '2023-12-01 00:00:00'), (6, 'f_new', 'y6', '2024-01-02 00:00:00'), (7, 'g_new', 'y7', '2024-01-02 00:00:00'); +INSERT INTO source VALUES (1, 'a_new', 
'y1', '2024-01-02 00:00:00'), (2, 'b_new', 'y2', '2024-01-02 00:00:00'), (3, 'c_old', 'y3', '2023-12-01 00:00:00'), (6, 'f_new', 'y6', '2024-01-02 00:00:00'), (7, 'g_new', 'y7', '2024-01-02 00:00:00') + +## ========================================================================== +## Test 1: MixedMatched (WHEN MATCHED + WHEN NOT MATCHED), repartition ON. +## Source ids 1,2 match and have newer ts => updated. +## Source id 3 matches but has older ts => NOT updated (condition: tar.ts < sou.ts). +## Source ids 6,7 don't match => inserted. +## ========================================================================== statement ok -SET enable_merge_into_row_fetch = 1; +SET enable_merge_into_row_fetch = 1 statement ok -SET enable_merge_into_block_id_repartition = 1; +SET enable_mutation_block_id_repartition = 1 query II -MERGE INTO t_repartition_target AS tar -USING t_repartition_source AS sou +MERGE INTO target AS tar +USING source AS sou ON tar.id = sou.id WHEN MATCHED AND tar.ts < sou.ts THEN UPDATE * WHEN NOT MATCHED THEN INSERT *; @@ -44,7 +62,7 @@ WHEN NOT MATCHED THEN INSERT *; 2 2 query ITTT -SELECT id, val, extra, ts FROM t_repartition_target ORDER BY id; +SELECT id, val, extra, ts FROM target ORDER BY id; ---- 1 a_new y1 2024-01-02 00:00:00.000000 2 b_new y2 2024-01-02 00:00:00.000000 @@ -54,33 +72,34 @@ SELECT id, val, extra, ts FROM t_repartition_target ORDER BY id; 6 f_new y6 2024-01-02 00:00:00.000000 7 g_new y7 2024-01-02 00:00:00.000000 -statement ok -DROP TABLE t_repartition_target; +## ========================================================================== +## Test 2: Same scenario with repartition OFF. Results must be identical. 
+## ========================================================================== statement ok -CREATE TABLE t_repartition_target (id INT, val VARCHAR, extra VARCHAR, ts TIMESTAMP); +CREATE OR REPLACE TABLE target (id INT, val VARCHAR, extra VARCHAR, ts TIMESTAMP) statement ok -INSERT INTO t_repartition_target VALUES (1, 'a', 'x1', '2024-01-01 00:00:00'); +INSERT INTO target VALUES (1, 'a', 'x1', '2024-01-01 00:00:00') statement ok -INSERT INTO t_repartition_target VALUES (2, 'b', 'x2', '2024-01-01 00:00:00'); +INSERT INTO target VALUES (2, 'b', 'x2', '2024-01-01 00:00:00') statement ok -INSERT INTO t_repartition_target VALUES (3, 'c', 'x3', '2024-01-01 00:00:00'); +INSERT INTO target VALUES (3, 'c', 'x3', '2024-01-01 00:00:00') statement ok -INSERT INTO t_repartition_target VALUES (4, 'd', 'x4', '2024-01-01 00:00:00'); +INSERT INTO target VALUES (4, 'd', 'x4', '2024-01-01 00:00:00') statement ok -INSERT INTO t_repartition_target VALUES (5, 'e', 'x5', '2024-01-01 00:00:00'); +INSERT INTO target VALUES (5, 'e', 'x5', '2024-01-01 00:00:00') statement ok -SET enable_merge_into_block_id_repartition = 0; +SET enable_mutation_block_id_repartition = 0 query II -MERGE INTO t_repartition_target AS tar -USING t_repartition_source AS sou +MERGE INTO target AS tar +USING source AS sou ON tar.id = sou.id WHEN MATCHED AND tar.ts < sou.ts THEN UPDATE * WHEN NOT MATCHED THEN INSERT *; @@ -88,7 +107,7 @@ WHEN NOT MATCHED THEN INSERT *; 2 2 query ITTT -SELECT id, val, extra, ts FROM t_repartition_target ORDER BY id; +SELECT id, val, extra, ts FROM target ORDER BY id; ---- 1 a_new y1 2024-01-02 00:00:00.000000 2 b_new y2 2024-01-02 00:00:00.000000 @@ -98,61 +117,65 @@ SELECT id, val, extra, ts FROM t_repartition_target ORDER BY id; 6 f_new y6 2024-01-02 00:00:00.000000 7 g_new y7 2024-01-02 00:00:00.000000 -statement ok -DROP TABLE t_repartition_target; +## ========================================================================== +## Test 3: MatchedOnly (no NOT MATCHED clause), 
repartition ON. +## Only ids 1,2 are updated (ts condition). Id 3 not updated. +## ========================================================================== statement ok -CREATE TABLE t_repartition_target (id INT, val VARCHAR, extra VARCHAR, ts TIMESTAMP); +CREATE OR REPLACE TABLE target (id INT, val VARCHAR, extra VARCHAR, ts TIMESTAMP) statement ok -INSERT INTO t_repartition_target VALUES (1, 'a', 'x1', '2024-01-01 00:00:00'); +INSERT INTO target VALUES (1, 'a', 'x1', '2024-01-01 00:00:00') statement ok -INSERT INTO t_repartition_target VALUES (2, 'b', 'x2', '2024-01-01 00:00:00'); +INSERT INTO target VALUES (2, 'b', 'x2', '2024-01-01 00:00:00') statement ok -INSERT INTO t_repartition_target VALUES (3, 'c', 'x3', '2024-01-01 00:00:00'); +INSERT INTO target VALUES (3, 'c', 'x3', '2024-01-01 00:00:00') statement ok -SET enable_merge_into_block_id_repartition = 1; +SET enable_mutation_block_id_repartition = 1 query I -MERGE INTO t_repartition_target AS tar -USING t_repartition_source AS sou +MERGE INTO target AS tar +USING source AS sou ON tar.id = sou.id WHEN MATCHED AND tar.ts < sou.ts THEN UPDATE *; ---- 2 query ITTT -SELECT id, val, extra, ts FROM t_repartition_target ORDER BY id; +SELECT id, val, extra, ts FROM target ORDER BY id; ---- 1 a_new y1 2024-01-02 00:00:00.000000 2 b_new y2 2024-01-02 00:00:00.000000 3 c x3 2024-01-01 00:00:00.000000 -statement ok -DROP TABLE t_repartition_target; +## ========================================================================== +## Test 4: NotMatchedOnly (no MATCHED clause), repartition is a no-op +## since there is no RowFetch in this path. 
+## ========================================================================== statement ok -CREATE TABLE t_repartition_target (id INT, val VARCHAR, extra VARCHAR, ts TIMESTAMP); +CREATE OR REPLACE TABLE target (id INT, val VARCHAR, extra VARCHAR, ts TIMESTAMP) statement ok -INSERT INTO t_repartition_target VALUES (1, 'a', 'x1', '2024-01-01 00:00:00'); +INSERT INTO target VALUES (1, 'a', 'x1', '2024-01-01 00:00:00') statement ok -SET enable_merge_into_block_id_repartition = 1; +SET enable_mutation_block_id_repartition = 1 query I -MERGE INTO t_repartition_target AS tar -USING t_repartition_source AS sou +MERGE INTO target AS tar +USING source AS sou ON tar.id = sou.id WHEN NOT MATCHED THEN INSERT *; ---- 4 query ITTT -SELECT id, val, extra, ts FROM t_repartition_target ORDER BY id; +SELECT id, val, extra, ts FROM target ORDER BY id; ---- 1 a x1 2024-01-01 00:00:00.000000 2 b_new y2 2024-01-02 00:00:00.000000 @@ -160,27 +183,29 @@ SELECT id, val, extra, ts FROM t_repartition_target ORDER BY id; 6 f_new y6 2024-01-02 00:00:00.000000 7 g_new y7 2024-01-02 00:00:00.000000 -statement ok -DROP TABLE t_repartition_target; +## ========================================================================== +## Test 5: MixedMatched with DELETE instead of UPDATE, repartition ON. +## Ids 1,2 matched with newer ts => deleted. Ids 6,7 not matched => inserted. 
+## ========================================================================== statement ok -CREATE TABLE t_repartition_target (id INT, val VARCHAR, extra VARCHAR, ts TIMESTAMP); +CREATE OR REPLACE TABLE target (id INT, val VARCHAR, extra VARCHAR, ts TIMESTAMP) statement ok -INSERT INTO t_repartition_target VALUES (1, 'a', 'x1', '2024-01-01 00:00:00'); +INSERT INTO target VALUES (1, 'a', 'x1', '2024-01-01 00:00:00') statement ok -INSERT INTO t_repartition_target VALUES (2, 'b', 'x2', '2024-01-01 00:00:00'); +INSERT INTO target VALUES (2, 'b', 'x2', '2024-01-01 00:00:00') statement ok -INSERT INTO t_repartition_target VALUES (3, 'c', 'x3', '2024-01-01 00:00:00'); +INSERT INTO target VALUES (3, 'c', 'x3', '2024-01-01 00:00:00') statement ok -SET enable_merge_into_block_id_repartition = 1; +SET enable_mutation_block_id_repartition = 1 query II -MERGE INTO t_repartition_target AS tar -USING t_repartition_source AS sou +MERGE INTO target AS tar +USING source AS sou ON tar.id = sou.id WHEN MATCHED AND tar.ts < sou.ts THEN DELETE WHEN NOT MATCHED THEN INSERT *; @@ -188,20 +213,55 @@ WHEN NOT MATCHED THEN INSERT *; 2 2 query ITTT -SELECT id, val, extra, ts FROM t_repartition_target ORDER BY id; +SELECT id, val, extra, ts FROM target ORDER BY id; ---- 3 c x3 2024-01-01 00:00:00.000000 6 f_new y6 2024-01-02 00:00:00.000000 7 g_new y7 2024-01-02 00:00:00.000000 +## ========================================================================== +## Test 6: UPDATE...WHERE (subquery) with lazy columns, repartition ON. +## This path uses MatchedOnly strategy with RowFetch when the table has +## enough columns for lazy materialization. +## The subquery matches ids 1,2,3 from source; val is updated to 'updated'. +## ========================================================================== + +statement ok +CREATE OR REPLACE TABLE target (id INT, val VARCHAR, extra VARCHAR, ts TIMESTAMP) + +## Each INSERT creates a separate block. 
+statement ok +INSERT INTO target VALUES (1, 'a', 'x1', '2024-01-01 00:00:00') + statement ok -DROP TABLE IF EXISTS t_repartition_target; +INSERT INTO target VALUES (2, 'b', 'x2', '2024-01-01 00:00:00') statement ok -DROP TABLE IF EXISTS t_repartition_source; +INSERT INTO target VALUES (3, 'c', 'x3', '2024-01-01 00:00:00') statement ok -SET enable_merge_into_block_id_repartition = 1; +INSERT INTO target VALUES (4, 'd', 'x4', '2024-01-01 00:00:00') + +statement ok +SET enable_mutation_block_id_repartition = 1 + +statement ok +SET enable_merge_into_row_fetch = 1 + +statement ok +UPDATE target SET val = 'updated' WHERE id IN (SELECT id FROM source WHERE id <= 3) + +query ITTT +SELECT id, val, extra, ts FROM target ORDER BY id; +---- +1 updated x1 2024-01-01 00:00:00.000000 +2 updated x2 2024-01-01 00:00:00.000000 +3 updated x3 2024-01-01 00:00:00.000000 +4 d x4 2024-01-01 00:00:00.000000 + +## ========================================================================== +## Cleanup +## ========================================================================== statement ok -SET enable_merge_into_row_fetch = 1; +DROP DATABASE test_block_id_repartition