Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/query/service/src/physical_plans/physical_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ impl PhysicalPlanBuilder {
cols_to_fetch,
fetched_fields,
need_wrap_nullable: false,
enable_block_id_repartition: false,
stat_info: Some(stat_info.clone()),
});
}
Expand Down
15 changes: 9 additions & 6 deletions src/query/service/src/physical_plans/physical_mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,21 +453,23 @@ impl PhysicalPlanBuilder {

// If the mutation type is FullOperation, we use row_id column to split a block
// into matched and not matched parts.
let lazy_columns = self
.metadata
.read()
.get_table_lazy_columns(target_table_index)
.filter(|cols| !cols.is_empty());

if matches!(strategy, MutationStrategy::MixedMatched) {
plan = PhysicalPlan::new(MutationSplit {
input: plan,
split_index: row_id_offset,
has_row_fetch: lazy_columns.is_some(),
meta: PhysicalPlanMeta::new("MutationSplit"),
});
}

// Construct row fetch plan for lazy columns.
if let Some(lazy_columns) = self
.metadata
.read()
.get_table_lazy_columns(target_table_index)
&& !lazy_columns.is_empty()
{
if let Some(lazy_columns) = lazy_columns {
plan = build_mutation_row_fetch(
plan,
metadata.clone(),
Expand Down Expand Up @@ -809,6 +811,7 @@ fn build_mutation_row_fetch(
cols_to_fetch,
fetched_fields,
need_wrap_nullable,
enable_block_id_repartition: true,
stat_info: None,
meta: PhysicalPlanMeta::new("RowFetch"),
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
// limitations under the License.

use std::any::Any;
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_pipeline::core::Pipe;
use databend_common_sql::IndexType;
use databend_common_storages_fuse::operations::BlockIdPartitionExchange;
use databend_common_storages_fuse::operations::MutationSplitProcessor;

use crate::physical_plans::format::MutationSplitFormatter;
Expand All @@ -31,6 +33,9 @@ pub struct MutationSplit {
pub meta: PhysicalPlanMeta,
pub input: PhysicalPlan,
pub split_index: IndexType,
/// When true, a block_id repartition is inserted before the split to reduce
/// duplicate block reads in the downstream RowFetch stage.
pub has_row_fetch: bool,
}

#[typetag::serde]
Expand Down Expand Up @@ -65,15 +70,28 @@ impl IPhysicalPlan for MutationSplit {
meta: self.meta.clone(),
input,
split_index: self.split_index,
has_row_fetch: self.has_row_fetch,
})
}

fn build_pipeline2(&self, builder: &mut PipelineBuilder) -> Result<()> {
self.input.build_pipeline(builder)?;

builder
.main_pipeline
.try_resize(builder.settings.get_max_threads()? as usize)?;
let max_threads = builder.settings.get_max_threads()? as usize;

// Repartition by block_id so each downstream RowFetch processor handles
// a disjoint set of blocks, reducing duplicate block reads.
if self.has_row_fetch
&& max_threads > 1
&& builder
.settings
.get_enable_mutation_block_id_repartition()?
{
let exchange = Arc::new(BlockIdPartitionExchange::create(self.split_index));
builder.main_pipeline.exchange(max_threads, exchange)?;
} else {
builder.main_pipeline.try_resize(max_threads)?;
}

// The MutationStrategy is FullOperation, use row_id_idx to split
let mut items = Vec::with_capacity(builder.main_pipeline.output_len());
Expand Down
27 changes: 27 additions & 0 deletions src/query/service/src/physical_plans/physical_row_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::any::Any;
use std::sync::Arc;

use databend_common_catalog::plan::DataSourcePlan;
use databend_common_catalog::plan::Projection;
Expand All @@ -25,6 +26,7 @@ use databend_common_pipeline::core::OutputPort;
use databend_common_pipeline::core::Pipe;
use databend_common_pipeline::core::PipeItem;
use databend_common_pipeline_transforms::create_dummy_item;
use databend_common_storages_fuse::operations::BlockIdPartitionExchange;
use databend_common_storages_fuse::operations::row_fetch_processor;
use itertools::Itertools;

Expand All @@ -49,6 +51,11 @@ pub struct RowFetch {
pub row_id_col_offset: usize,
pub fetched_fields: Vec<DataField>,
pub need_wrap_nullable: bool,
/// When true, a block_id repartition is inserted before RowFetch to reduce
/// duplicate block reads. Applicable to join-based mutation paths (MERGE INTO,
/// UPDATE...FROM). Not applicable to SELECT+LIMIT where the exchange would
/// destroy sort order.
pub enable_block_id_repartition: bool,

/// Only used for explain
pub stat_info: Option<PlanStatsInfo>,
Expand Down Expand Up @@ -108,6 +115,7 @@ impl IPhysicalPlan for RowFetch {
row_id_col_offset: self.row_id_col_offset,
fetched_fields: self.fetched_fields.clone(),
need_wrap_nullable: self.need_wrap_nullable,
enable_block_id_repartition: self.enable_block_id_repartition,
stat_info: self.stat_info.clone(),
})
}
Expand All @@ -124,8 +132,27 @@ impl IPhysicalPlan for RowFetch {
)?;

if !MutationSplit::check_physical_plan(&self.input) {
// For MatchedOnly MERGE INTO, add block_id repartition before RowFetch
// to reduce duplicate block reads.
// Not applicable to SELECT+LIMIT: pipeline.exchange() merges partitions
// with non-deterministic output order, which would destroy the sort
// order produced by Sort+Limit.
if self.enable_block_id_repartition {
let max_threads = builder.settings.get_max_threads()? as usize;
if max_threads > 1
&& builder
.settings
.get_enable_mutation_block_id_repartition()?
{
let exchange =
Arc::new(BlockIdPartitionExchange::create(self.row_id_col_offset));
builder.main_pipeline.exchange(max_threads, exchange)?;
}
}
builder.main_pipeline.add_transform(processor)?;
} else {
// MixedMatched path: MutationSplit is upstream.
// Repartition already happened in MutationSplit::build_pipeline2.
let output_len = builder.main_pipeline.output_len();
let mut pipe_items = Vec::with_capacity(output_len);
for i in 0..output_len {
Expand Down
7 changes: 7 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,13 @@ impl DefaultSettings {
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("enable_mutation_block_id_repartition", DefaultSettingValue {
value: UserSettingValue::UInt64(1),
desc: "Enable local block_id repartition before row fetch in join-based mutations (MERGE INTO, UPDATE...FROM) to reduce duplicate block reads.",
mode: SettingMode::Both,
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("max_cte_recursive_depth", DefaultSettingValue {
value: UserSettingValue::UInt64(1000),
desc: "Max recursive depth for recursive cte",
Expand Down
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,10 @@ impl Settings {
Ok(self.try_get_u64("enable_merge_into_row_fetch")? != 0)
}

/// Whether local block_id repartition before row fetch is enabled for
/// join-based mutations (MERGE INTO, UPDATE ... FROM). Backed by the
/// `enable_mutation_block_id_repartition` setting; any non-zero value
/// means enabled.
pub fn get_enable_mutation_block_id_repartition(&self) -> Result<bool> {
    self.try_get_u64("enable_mutation_block_id_repartition")
        .map(|v| v != 0)
}

/// Maximum recursion depth allowed when evaluating a recursive CTE,
/// read from the `max_cte_recursive_depth` setting.
pub fn get_max_cte_recursive_depth(&self) -> Result<usize> {
    self.try_get_u64("max_cte_recursive_depth")
        .map(|depth| depth as usize)
}
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/operations/merge_into/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod mutator;
mod processors;

pub use mutator::MatchedAggregator;
pub use processors::BlockIdPartitionExchange;
pub use processors::MatchedSplitProcessor;
pub use processors::MergeIntoNotMatchedProcessor;
pub use processors::MixRowIdKindAndLog;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;

use databend_common_catalog::plan::split_row_id;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_pipeline::basic::Exchange;

/// Partitions data blocks by block_id extracted from the `_row_id` column.
///
/// This ensures that rows belonging to the same physical block are routed
/// to the same downstream processor, reducing duplicate block reads
/// in the RowFetch stage of MERGE INTO.
pub struct BlockIdPartitionExchange {
    // Offset of the row-id column inside incoming blocks; `partition()`
    // reads this column to decide each row's destination partition.
    row_id_col_offset: usize,
    /// Incrementing counter used by `partition()` to spread NULL row_ids
    /// (unmatched rows in MixedMatched) evenly across partitions.
    null_counter: AtomicU64,
}

impl BlockIdPartitionExchange {
    /// Builds an exchange that routes rows by the block-id prefix of the
    /// row-id column found at `row_id_col_offset`.
    pub fn create(row_id_col_offset: usize) -> Self {
        Self {
            row_id_col_offset,
            null_counter: AtomicU64::default(),
        }
    }

    /// Maps a row id to a partition index in `0..n`: rows sharing a block-id
    /// prefix always land on the same partition.
    #[inline(always)]
    fn partition_index(row_id: u64, n: usize) -> u16 {
        let (block_id, _row_in_block) = split_row_id(row_id);
        (block_id % n as u64) as u16
    }
}

impl Exchange for BlockIdPartitionExchange {
    const NAME: &'static str = "BlockIdPartition";
    const SKIP_EMPTY_DATA_BLOCK: bool = true;

    /// Scatters `data_block` into `n` output blocks, routing each row by the
    /// block-id prefix of its row id so that all rows of one physical block
    /// end up on the same downstream processor.
    fn partition(&self, data_block: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
        let row_count = data_block.num_rows();
        let entry = &data_block.columns()[self.row_id_col_offset];
        let mut partition_ids: Vec<u16> = Vec::with_capacity(row_count);

        match entry.data_type() {
            // Plain UInt64: every row carries a valid row id.
            DataType::Number(NumberDataType::UInt64) => {
                let ids = entry
                    .to_column()
                    .into_number()
                    .unwrap()
                    .into_u_int64()
                    .unwrap();
                partition_ids.extend(ids.iter().map(|id| Self::partition_index(*id, n)));
            }
            // Nullable(UInt64): NULL row ids (unmatched rows in MixedMatched)
            // are spread round-robin via the shared counter so no single
            // partition absorbs all of them.
            DataType::Nullable(inner)
                if matches!(inner.as_ref(), DataType::Number(NumberDataType::UInt64)) =>
            {
                let nullable = entry.to_column().into_nullable().unwrap();
                let ids = nullable
                    .column
                    .into_number()
                    .unwrap()
                    .into_u_int64()
                    .unwrap();
                for (id, valid) in ids.iter().zip(nullable.validity.iter()) {
                    let part = if valid {
                        Self::partition_index(*id, n)
                    } else {
                        let seq = self.null_counter.fetch_add(1, Ordering::Relaxed);
                        (seq % n as u64) as u16
                    };
                    partition_ids.push(part);
                }
            }
            _ => unreachable!(
                "Row id column should be UInt64 or Nullable(UInt64) for block_id partition"
            ),
        }

        DataBlock::scatter(&data_block, &partition_ids, n)
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod block_id_partition_exchange;
mod processor_merge_into_matched_and_split;
mod processor_merge_into_not_matched;
mod processor_merge_into_split;
mod processor_merge_into_split_row_number_and_log;
mod transform_matched_mutation_aggregator;

pub use block_id_partition_exchange::BlockIdPartitionExchange;
pub use processor_merge_into_matched_and_split::MatchedSplitProcessor;
pub use processor_merge_into_matched_and_split::MixRowIdKindAndLog;
pub(crate) use processor_merge_into_matched_and_split::RowIdKind;
Expand Down
Loading
Loading