From e73b65a33dba7a8f73c96194255f5e4c1a90acb8 Mon Sep 17 00:00:00 2001 From: sky <75521613+SkyFan2002@users.noreply.github.com> Date: Mon, 6 Apr 2026 15:41:45 +0800 Subject: [PATCH 1/8] update --- .../src/schedulers/fragments/fragmenter.rs | 26 ++++- .../src/servers/flight/v1/exchange/mod.rs | 4 + .../cascades/tasks/optimize_expr.rs | 33 ------ .../suites/mode/cluster/subquery.test | 105 ++++++++++++++++++ 4 files changed, 134 insertions(+), 34 deletions(-) diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs index 0cbce7f5e7a18..b47498e510245 100644 --- a/src/query/service/src/schedulers/fragments/fragmenter.rs +++ b/src/query/service/src/schedulers/fragments/fragmenter.rs @@ -112,6 +112,15 @@ impl Fragmenter { }); let edges = Self::collect_fragments_edge(fragments.values()); + let source_lookup = fragments + .iter() + .map(|(fragment_id, fragment)| { + let mut fragment = fragment.clone(); + fragment.source_fragments.clear(); + (*fragment_id, fragment) + }) + .collect::>(); + let mut target_sources = BTreeMap::>::new(); for (source, target) in edges { let Some(fragment) = fragments.get_mut(&source) else { @@ -121,6 +130,21 @@ impl Fragmenter { if let Some(exchange_sink) = ExchangeSink::from_mut_physical_plan(&mut fragment.plan) { exchange_sink.destination_fragment_id = target; } + + target_sources.entry(target).or_default().push(source); + } + + for (target, sources) in target_sources { + let Some(fragment) = fragments.get_mut(&target) else { + continue; + }; + + let mut source_fragments = sources + .into_iter() + .filter_map(|source| source_lookup.get(&source).cloned()) + .collect::>(); + source_fragments.sort_by_key(|fragment| fragment.fragment_id); + fragment.source_fragments = source_fragments; } Ok(fragments.into_values().collect::>()) @@ -312,7 +336,7 @@ impl DeriveHandle for FragmentDeriveHandle { let source_fragment = PlanFragment { plan, - exchange, + exchange: exchange.clone(), fragment_type, source_fragments: vec![], fragment_id: source_fragment_id, diff --git a/src/query/service/src/servers/flight/v1/exchange/mod.rs b/src/query/service/src/servers/flight/v1/exchange/mod.rs index c4fb32fc0e34e..b406fa664187a 100644 --- a/src/query/service/src/servers/flight/v1/exchange/mod.rs +++ b/src/query/service/src/servers/flight/v1/exchange/mod.rs @@ -45,9 +45,13 @@ pub use data_exchange::NodeToNodeExchange; pub use exchange_injector::DefaultExchangeInjector; pub use exchange_injector::ExchangeInjector; pub use exchange_manager::DataExchangeManager; +pub use exchange_params::BroadcastExchangeParams; +pub use exchange_params::ExchangeParams; +pub use exchange_params::GlobalExchangeParams; pub use exchange_params::MergeExchangeParams; pub use exchange_params::ShuffleExchangeParams; pub use exchange_sorting::ExchangeSorting; +pub use exchange_transform::ExchangeTransform; pub use exchange_transform_scatter::ScatterTransform; pub use exchange_transform_shuffle::ExchangeShuffleMeta; pub use exchange_transform_shuffle::ExchangeShuffleTransform; diff --git a/src/query/sql/src/planner/optimizer/optimizers/cascades/tasks/optimize_expr.rs b/src/query/sql/src/planner/optimizer/optimizers/cascades/tasks/optimize_expr.rs index f76c25248b4df..8546c65f00d5b 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/cascades/tasks/optimize_expr.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/cascades/tasks/optimize_expr.rs @@ -354,39 +354,6 @@ impl OptimizeExprTask { let rel_expr = RelExpr::with_opt_context(m_expr, &optimizer.memo, &children_best_props); let physical_prop = rel_expr.derive_physical_prop()?; - let should_enforce = { - let mut should_enforce = true; - - if optimizer.enforce_distribution() - && physical_prop.distribution == Distribution::Serial - && !matches!( - self.required_prop.distribution, - Distribution::Serial | Distribution::Any - ) - { - should_enforce = false; - } - - if optimizer.enforce_distribution() - && children_best_props - .iter() - .any(|prop| prop.distribution == Distribution::Serial) - && !children_best_props - .iter() - .all(|prop| prop.distribution == Distribution::Serial) - { - should_enforce = false; - } - - should_enforce - }; - - // Sometimes we cannot enforce the required property and we cannot - // optimize the expression in this situation. - if !should_enforce { - return Ok(OptimizeExprEvent::OptimizedSelf); - } - let mut enforcers: Vec> = Vec::new(); // Enforcers of distribution. diff --git a/tests/sqllogictests/suites/mode/cluster/subquery.test b/tests/sqllogictests/suites/mode/cluster/subquery.test index 1b8ef4f3a81d6..702eec07456a0 100644 --- a/tests/sqllogictests/suites/mode/cluster/subquery.test +++ b/tests/sqllogictests/suites/mode/cluster/subquery.test @@ -174,6 +174,111 @@ Exchange ├── push downs: [filters: [], limit: NONE] └── estimated rows: 3.00 +statement ok +drop table if exists t_limit_in_subquery all; + +statement ok +create table t_limit_in_subquery(a int not null); + +statement ok +insert into t_limit_in_subquery select number from numbers(30); + +query T nosort +explain +select t.a +from t_limit_in_subquery t +where a in ( + select a + from t_limit_in_subquery + limit 10 +) +order by a; +---- +Sort(Final) +├── output columns: [t.a (#0)] +├── sort keys: [a ASC NULLS LAST] +├── estimated rows: 30.00 +└── Exchange + ├── output columns: [t.a (#0)] + ├── exchange type: Merge + └── Sort(Partial) + ├── output columns: [t.a (#0)] + ├── sort keys: [a ASC NULLS LAST] + ├── estimated rows: 30.00 + └── HashJoin + ├── output columns: [t.a (#0)] + ├── join type: LEFT SEMI + ├── build keys: [subquery_1 (#1)] + ├── probe keys: [t.a (#0)] + ├── keys is null equal: [false] + ├── filters: [] + ├── build join filters: + │ └── filter id:0, build key:subquery_1 (#1), probe targets:[t.a (#0)@scan0], filter type:bloom,inlist,min_max + ├── estimated rows: 30.00 + ├── Exchange(Build) + │ ├── output columns: [t_limit_in_subquery.a (#1)] + │ ├── exchange type: Broadcast + │ └── Limit + │ ├── output columns: [t_limit_in_subquery.a (#1)] + │ ├── limit: 10 + │ ├── offset: 0 + │ ├── estimated rows: 10.00 + │ └── Exchange + │ ├── output columns: [t_limit_in_subquery.a (#1)] + │ ├── exchange type: Merge + │ └── Limit + │ ├── output columns: [t_limit_in_subquery.a (#1)] + │ ├── limit: 10 + │ ├── offset: 0 + │ ├── estimated rows: 10.00 + │ └── TableScan + │ ├── table: default.d_subquery.t_limit_in_subquery + │ ├── scan id: 1 + │ ├── output columns: [a (#1)] + │ ├── read rows: 30 + │ ├── read size: < 1 KiB + │ ├── partitions total: 1 + │ ├── partitions scanned: 1 + │ ├── pruning stats: [segments: , blocks: ] + │ ├── push downs: [filters: [], limit: 10] + │ └── estimated rows: 30.00 + └── TableScan(Probe) + ├── table: default.d_subquery.t_limit_in_subquery + ├── scan id: 0 + ├── output columns: [a (#0)] + ├── read rows: 30 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── pruning stats: [segments: , blocks: ] + ├── push downs: [filters: [], limit: NONE] + ├── apply join filters: [#0] + └── estimated rows: 30.00 + +query I +select t.a +from t_limit_in_subquery t +where a in ( + select a + from t_limit_in_subquery + limit 10 +) +order by a; +---- +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 + + +statement ok +drop table if exists t_limit_in_subquery all; statement ok drop database d_subquery From 26b010168e63c2bedf09dc99bb2afd5915447fdc Mon Sep 17 00:00:00 2001 From: sky <75521613+SkyFan2002@users.noreply.github.com> Date: Sun, 12 Apr 2026 14:33:31 +0800 Subject: [PATCH 2/8] fix(exchange): allow remote-only exchange sources Add execution-layer fallback for ExchangeSource when the source fragment is not present in the local QueryCoordinator. - carry source DataExchange metadata through Fragmenter - return Option from fragment subscription lookups - build merge/broadcast/shuffle/global-shuffle sources directly from remote receivers when local fragment expansion is unavailable Validation: - cargo check -p databend-query --lib - cargo build -p databend-query --lib --- .../physical_exchange_source.rs | 51 +++-- .../src/schedulers/fragments/fragmenter.rs | 1 + .../flight/v1/exchange/exchange_manager.rs | 20 +- .../flight/v1/exchange/exchange_source.rs | 209 +++++++++++++++--- .../src/servers/flight/v1/exchange/mod.rs | 1 + 5 files changed, 230 insertions(+), 52 deletions(-) diff --git a/src/query/service/src/physical_plans/physical_exchange_source.rs b/src/query/service/src/physical_plans/physical_exchange_source.rs index 79076f92fb2b4..fa9d36b4ab4b4 100644 --- a/src/query/service/src/physical_plans/physical_exchange_source.rs +++ b/src/query/service/src/physical_plans/physical_exchange_source.rs @@ -14,6 +14,7 @@ use std::any::Any; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::DataSchemaRef; use databend_common_pipeline::core::PlanScope; @@ -24,6 +25,8 @@ use crate::physical_plans::physical_plan::IPhysicalPlan; use crate::physical_plans::physical_plan::PhysicalPlan; use crate::physical_plans::physical_plan::PhysicalPlanMeta; use crate::pipelines::PipelineBuilder; +use crate::servers::flight::v1::exchange::DataExchange; +use crate::servers::flight::v1::exchange::via_remote_exchange_source; #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct ExchangeSource { @@ -34,6 +37,7 @@ pub struct ExchangeSource { // Fragment ID of source fragment pub source_fragment_id: usize, + pub source_exchange: Option, pub query_id: String, } @@ -73,30 +77,49 @@ impl IPhysicalPlan for ExchangeSource { meta: self.meta.clone(), schema: self.schema.clone(), source_fragment_id: self.source_fragment_id, + source_exchange: self.source_exchange.clone(), query_id: self.query_id.clone(), }) } fn build_pipeline2(&self, builder: &mut PipelineBuilder) -> Result<()> { let exchange_manager = builder.ctx.get_exchange_manager(); - let build_res = exchange_manager.get_fragment_source( + if let Some(build_res) = exchange_manager.get_fragment_source( &self.query_id, self.source_fragment_id, builder.exchange_injector.clone(), + )? { + let plan_scope = PlanScope::get_plan_scope(); + let build_pipeline = build_res.main_pipeline.finalize(plan_scope); + + // add sharing data + builder.join_state = build_res.builder_data.input_join_state; + builder.merge_into_probe_data_fields = build_res.builder_data.input_probe_schema; + + // Merge pipeline + assert_eq!(builder.main_pipeline.output_len(), 0); + let sinks = builder.main_pipeline.merge(build_pipeline)?; + builder.main_pipeline.extend_sinks(sinks); + builder.pipelines.extend(build_res.sources_pipelines); + return Ok(()); + } + + let source_exchange = self.source_exchange.as_ref().ok_or_else(|| { + ErrorCode::Internal(format!( + "Source fragment {} of query {} has no exchange metadata", + self.source_fragment_id, self.query_id + )) + })?; + + via_remote_exchange_source( + builder.ctx.clone(), + &self.query_id, + self.source_fragment_id, + &self.schema, + source_exchange, + builder.exchange_injector.clone(), + &mut builder.main_pipeline, )?; - - let plan_scope = PlanScope::get_plan_scope(); - let build_pipeline = build_res.main_pipeline.finalize(plan_scope); - - // add sharing data - builder.join_state = build_res.builder_data.input_join_state; - builder.merge_into_probe_data_fields = build_res.builder_data.input_probe_schema; - - // Merge pipeline - assert_eq!(builder.main_pipeline.output_len(), 0); - let sinks = builder.main_pipeline.merge(build_pipeline)?; - builder.main_pipeline.extend_sinks(sinks); - builder.pipelines.extend(build_res.sources_pipelines); Ok(()) } } diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs index b47498e510245..51b1d1c415253 100644 --- a/src/query/service/src/schedulers/fragments/fragmenter.rs +++ b/src/query/service/src/schedulers/fragments/fragmenter.rs @@ -350,6 +350,7 @@ impl DeriveHandle for FragmentDeriveHandle { query_id: self.query_id.clone(), source_fragment_id, + source_exchange: exchange, meta: PhysicalPlanMeta::with_plan_id("ExchangeSource", plan_id), })); } diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index 3948eac2d4a1d..001da8a79c6fb 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -884,7 +884,7 @@ impl DataExchangeManager { query_id: &str, fragment_id: usize, injector: Arc, - ) -> Result { + ) -> Result> { let queries_coordinator_guard = self.queries_coordinator.lock(); let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() }; @@ -898,7 +898,7 @@ impl DataExchangeManager { .query_ctx .clone(); - query_coordinator.subscribe_fragment(&query_ctx, fragment_id, injector) + query_coordinator.try_subscribe_fragment(&query_ctx, fragment_id, injector) } } } @@ -1084,6 +1084,16 @@ impl QueryCoordinator { fragment_id: usize, injector: Arc, ) -> Result { + self.try_subscribe_fragment(ctx, fragment_id, injector)? + .ok_or_else(|| ErrorCode::Unimplemented("ExchangeSource is unimplemented")) + } + + pub fn try_subscribe_fragment( + &mut self, + ctx: &Arc, + fragment_id: usize, + injector: Arc, + ) -> Result> { // Merge pipelines if exist locally pipeline if let Some(mut fragment_coordinator) = self.fragments_coordinator.remove(&fragment_id) { let info = self.info.as_ref().expect("QueryInfo is none"); @@ -1098,7 +1108,7 @@ impl QueryCoordinator { if fragment_coordinator.data_exchange.is_none() { // When the root fragment and the data has been send to the coordination node, // we do not need to wait for the data of other nodes. - return Ok(fragment_coordinator.pipeline_build_res.unwrap()); + return Ok(Some(fragment_coordinator.pipeline_build_res.unwrap())); } let exchange_params = fragment_coordinator @@ -1124,9 +1134,9 @@ impl QueryCoordinator { injector, )?; - return Ok(build_res); + return Ok(Some(build_res)); } - Err(ErrorCode::Unimplemented("ExchangeSource is unimplemented")) + Ok(None) } pub fn shutdown_query(&mut self, cause: Option) { diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_source.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_source.rs index 031c704fea1f7..e772094671f2a 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_source.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_source.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::DataSchemaRef; use databend_common_pipeline::core::InputPort; use databend_common_pipeline::core::OutputPort; use databend_common_pipeline::core::Pipe; @@ -24,16 +25,43 @@ use databend_common_pipeline::core::PipeItem; use databend_common_pipeline::core::Pipeline; use databend_common_pipeline_transforms::processors::TransformDummy; +use super::exchange_params::BroadcastExchangeParams; use super::exchange_params::ExchangeParams; use super::exchange_params::GlobalExchangeParams; use super::exchange_params::MergeExchangeParams; +use super::exchange_params::ShuffleExchangeParams; use super::exchange_source_reader::ExchangeSourceReader; +use super::exchange_source_reader::create_reader_item; use super::hash_send_source::HashSendSource; use crate::clusters::ClusterHelper; +use crate::servers::flight::v1::exchange::BroadcastExchange; +use crate::servers::flight::v1::exchange::DataExchange; use crate::servers::flight::v1::exchange::DataExchangeManager; +use crate::servers::flight::v1::exchange::DefaultExchangeInjector; use crate::servers::flight::v1::exchange::ExchangeInjector; +use crate::servers::flight::v1::exchange::NodeToNodeExchange; +use crate::servers::flight::v1::scatter::BroadcastFlightScatter; use crate::sessions::QueryContext; +fn add_source_pipe(pipeline: &mut Pipeline, source_items: Vec) { + let last_output_len = pipeline.output_len(); + let mut items = Vec::with_capacity(last_output_len + source_items.len()); + + for _index in 0..last_output_len { + let input = InputPort::create(); + let output = OutputPort::create(); + + items.push(PipeItem::create( + TransformDummy::create(input.clone(), output.clone()), + vec![input], + vec![output], + )); + } + + items.extend(source_items); + pipeline.add_pipe(Pipe::create(last_output_len, items.len(), items)); +} + /// Add Exchange Source to the pipeline. pub fn via_exchange_source( ctx: Arc, @@ -60,43 +88,27 @@ pub fn via_exchange_source( let exchange_params = ExchangeParams::MergeExchange(params.clone()); let exchange_manager = ctx.get_exchange_manager(); let flight_receivers = exchange_manager.get_flight_receiver(&exchange_params)?; - let last_output_len = pipeline.output_len(); - let mut items = Vec::with_capacity(last_output_len + flight_receivers.len()); - - for _index in 0..last_output_len { - let input = InputPort::create(); - let output = OutputPort::create(); - - items.push(PipeItem::create( - TransformDummy::create(input.clone(), output.clone()), - vec![input], - vec![output], - )); - } + let mut source_items = Vec::with_capacity(flight_receivers.len()); for flight_exchange in flight_receivers { let output = OutputPort::create(); - items.push(PipeItem::create( + source_items.push(PipeItem::create( ExchangeSourceReader::create(output.clone(), flight_exchange), vec![], vec![output], )); } - pipeline.add_pipe(Pipe::create(last_output_len, items.len(), items)); + add_source_pipe(pipeline, source_items); - if params.allow_adjust_parallelism { + if params.allow_adjust_parallelism && last_output_len > 0 { pipeline.try_resize(last_output_len)?; } injector.apply_merge_deserializer(params, pipeline) } -/// Add HashSendSource receivers to the pipeline for hash exchange source-only path. -/// Existing pipeline outputs pass through as DummyTransforms; remote InboundChannels -/// are added as HashSendSource processors. -#[allow(dead_code)] pub fn via_hash_exchange_source( _ctx: &Arc, params: &GlobalExchangeParams, @@ -109,29 +121,160 @@ pub fn via_hash_exchange_source( let channel_set = exchange_manager.get_exchange_channel_set(query_id, exchange_id)?; let waker = pipeline.get_waker(); - let last_output_len = pipeline.output_len(); let num_receivers = channel_set.channels.len(); - let mut items = Vec::with_capacity(last_output_len + num_receivers); + let mut source_items = Vec::with_capacity(num_receivers); - for _index in 0..last_output_len { - let input = InputPort::create(); - let output = OutputPort::create(); - - items.push(PipeItem::create( - TransformDummy::create(input.clone(), output.clone()), - vec![input], - vec![output], + for idx in 0..num_receivers { + source_items.push(HashSendSource::create_item( + idx, + channel_set.create_receiver(idx, ¶ms.schema), + waker.clone(), )); } - for idx in 0..num_receivers { - items.push(HashSendSource::create_item( + add_source_pipe(pipeline, source_items); + Ok(()) +} + +pub fn via_broadcast_exchange_source( + params: &BroadcastExchangeParams, + pipeline: &mut Pipeline, +) -> Result<()> { + let exchange_manager = DataExchangeManager::instance(); + let channel_set = + exchange_manager.get_exchange_channel_set(¶ms.query_id, ¶ms.exchange_id)?; + let waker = pipeline.get_waker(); + let mut source_items = Vec::with_capacity(channel_set.channels.len()); + + for idx in 0..channel_set.channels.len() { + source_items.push(HashSendSource::create_item( idx, channel_set.create_receiver(idx, ¶ms.schema), waker.clone(), )); } - pipeline.add_pipe(Pipe::create(last_output_len, items.len(), items)); + add_source_pipe(pipeline, source_items); Ok(()) } + +pub fn via_shuffle_exchange_source( + ctx: &Arc, + params: &ShuffleExchangeParams, + injector: Arc, + pipeline: &mut Pipeline, +) -> Result<()> { + let local_channels = params + .destination_channels + .iter() + .find_map(|(destination, channels)| { + (destination == ¶ms.executor_id).then_some(channels.len()) + }) + .unwrap_or_default(); + + if local_channels == 0 { + return Err(ErrorCode::Internal(format!( + "Executor {} is not a destination of fragment {} exchange", + params.executor_id, params.fragment_id + ))); + } + + let exchange_manager = ctx.get_exchange_manager(); + let exchange_params = ExchangeParams::NodeShuffleExchange(params.clone()); + let flight_receivers = exchange_manager.get_flight_receiver(&exchange_params)?; + let mut source_items = Vec::with_capacity(flight_receivers.len()); + + for receiver in flight_receivers { + source_items.push(create_reader_item(receiver)); + } + + add_source_pipe(pipeline, source_items); + injector.apply_shuffle_deserializer(params, pipeline) +} + +fn create_broadcast_source_params( + ctx: &Arc, + query_id: &str, + schema: &DataSchemaRef, + exchange: &BroadcastExchange, +) -> BroadcastExchangeParams { + BroadcastExchangeParams { + query_id: query_id.to_string(), + executor_id: ctx.get_cluster().local_id(), + schema: schema.clone(), + exchange_id: exchange.id.clone(), + destination_channels: exchange.destination_channels.clone(), + } +} + +fn create_shuffle_source_params( + ctx: &Arc, + query_id: &str, + fragment_id: usize, + schema: &DataSchemaRef, + exchange: &NodeToNodeExchange, +) -> ShuffleExchangeParams { + ShuffleExchangeParams { + query_id: query_id.to_string(), + executor_id: ctx.get_cluster().local_id(), + fragment_id, + schema: schema.clone(), + destination_ids: exchange.destination_ids.clone(), + destination_channels: exchange.destination_channels.clone(), + shuffle_scatter: Arc::new(Box::new( + BroadcastFlightScatter::try_create(1).expect("broadcast scatter must create"), + )), + exchange_injector: DefaultExchangeInjector::create(), + allow_adjust_parallelism: exchange.allow_adjust_parallelism, + } +} + +pub fn via_remote_exchange_source( + ctx: Arc, + query_id: &str, + fragment_id: usize, + schema: &DataSchemaRef, + exchange: &DataExchange, + injector: Arc, + pipeline: &mut Pipeline, +) -> Result<()> { + match exchange { + DataExchange::Merge(exchange) => via_exchange_source( + ctx, + &MergeExchangeParams { + query_id: query_id.to_string(), + fragment_id, + destination_id: exchange.destination_id.clone(), + channel_id: exchange.channel_id.clone(), + schema: schema.clone(), + ignore_exchange: exchange.ignore_exchange, + allow_adjust_parallelism: exchange.allow_adjust_parallelism, + exchange_injector: DefaultExchangeInjector::create(), + }, + injector, + pipeline, + ), + DataExchange::Broadcast(exchange) => via_broadcast_exchange_source( + &create_broadcast_source_params(&ctx, query_id, schema, exchange), + pipeline, + ), + DataExchange::NodeToNodeExchange(exchange) => via_shuffle_exchange_source( + &ctx, + &create_shuffle_source_params(&ctx, query_id, fragment_id, schema, exchange), + injector, + pipeline, + ), + DataExchange::GlobalShuffleExchange(exchange) => via_hash_exchange_source( + &ctx, + &GlobalExchangeParams { + query_id: query_id.to_string(), + executor_id: ctx.get_cluster().local_id(), + schema: schema.clone(), + exchange_id: exchange.id.clone(), + shuffle_keys: exchange.shuffle_keys.clone(), + destination_channels: exchange.destination_channels.clone(), + }, + pipeline, + ), + } +} diff --git a/src/query/service/src/servers/flight/v1/exchange/mod.rs b/src/query/service/src/servers/flight/v1/exchange/mod.rs index b406fa664187a..3425f2d4391ca 100644 --- a/src/query/service/src/servers/flight/v1/exchange/mod.rs +++ b/src/query/service/src/servers/flight/v1/exchange/mod.rs @@ -51,6 +51,7 @@ pub use exchange_params::GlobalExchangeParams; pub use exchange_params::MergeExchangeParams; pub use exchange_params::ShuffleExchangeParams; pub use exchange_sorting::ExchangeSorting; +pub use exchange_source::via_remote_exchange_source; pub use exchange_transform::ExchangeTransform; pub use exchange_transform_scatter::ScatterTransform; pub use exchange_transform_shuffle::ExchangeShuffleMeta; From 8eb25f88518518b68bcd2162c1a4524c95c056a2 Mon Sep 17 00:00:00 2001 From: sky <75521613+SkyFan2002@users.noreply.github.com> Date: Sun, 12 Apr 2026 14:47:44 +0800 Subject: [PATCH 3/8] fix(exchange): precreate inbound channel sets Register and lazily create inbound channel sets for do_exchange receivers so remote-only exchange sources can build pipelines before network senders connect. - pre-register incoming exchange fragments during query env init - add get-or-create helpers on DataExchangeManager and QueryCoordinator - switch broadcast/global-shuffle receive paths to use get-or-create Validation: - cargo check -p databend-query --lib - db-slt --cluster --run-dir cluster --run-file subquery.test --- .../flight/v1/exchange/exchange_manager.rs | 79 ++++++++++++++++++- .../flight/v1/exchange/exchange_sink.rs | 6 +- .../flight/v1/exchange/exchange_source.rs | 53 ++++++++----- .../flight/v1/exchange/exchange_transform.rs | 12 ++- 4 files changed, 125 insertions(+), 25 deletions(-) diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index 001da8a79c6fb..69eeb3ea05950 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -249,6 +249,7 @@ impl DataExchangeManager { let mut request_exchanges = HashMap::new(); let mut targets_exchanges = HashMap::>::new(); + let mut inbound_channel_sizes = HashMap::::new(); for index in env.dataflow_diagram.node_indices() { if env.dataflow_diagram[index].id == config.query.node_id { @@ -297,9 +298,25 @@ impl DataExchangeManager { }) })); } - Edge::ExchangeFragment { .. } => { - // Skip: remote sender will call do_exchange on us, - // handled by handle_do_exchange → NetworkInboundSender + Edge::ExchangeFragment { + exchange_id, + channels, + } => { + // Pre-register inbound channel sets so source-only exchange + // receivers can build their pipelines before the first + // do_exchange connection arrives. + match inbound_channel_sizes.entry(exchange_id) { + Entry::Occupied(v) => { + if *v.get() != channels.len() { + return Err(ErrorCode::Internal( + "Mismatched inbound exchange parallelism", + )); + } + } + Entry::Vacant(v) => { + v.insert(channels.len()); + } + } } } } @@ -419,6 +436,7 @@ impl DataExchangeManager { query_coordinator.info = query_info; query_coordinator.is_request_server = GlobalConfig::instance().query.node_id == env.request_server_id; + query_coordinator.register_inbound_channel_sets(&inbound_channel_sizes)?; query_coordinator.register_flight_channel_receiver(targets_exchanges)?; query_coordinator.register_ping_pong_exchanges(ping_pong_exchanges); query_coordinator.add_statistics_exchanges(request_exchanges)?; @@ -428,6 +446,7 @@ impl DataExchangeManager { query_coordinator.info = query_info; query_coordinator.is_request_server = GlobalConfig::instance().query.node_id == env.request_server_id; + query_coordinator.register_inbound_channel_sets(&inbound_channel_sizes)?; query_coordinator.register_flight_channel_receiver(targets_exchanges)?; query_coordinator.register_ping_pong_exchanges(ping_pong_exchanges); query_coordinator.add_statistics_exchanges(request_exchanges)?; @@ -672,6 +691,25 @@ impl DataExchangeManager { } } + pub fn get_or_create_exchange_channel_set( + &self, + query_id: &str, + channel_id: &str, + num_threads: usize, + ) -> Result> { + let queries_coordinator_guard = self.queries_coordinator.lock(); + let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() }; + + match queries_coordinator.entry(query_id.to_string()) { + Entry::Occupied(mut v) => v + .get_mut() + .get_or_create_inbound_channel_set(channel_id, num_threads), + Entry::Vacant(v) => v + .insert(QueryCoordinator::create()) + .get_or_create_inbound_channel_set(channel_id, num_threads), + } + } + /// Take the PingPongExchanges for a given query and channel. /// /// Returns the exchanges that were created during init_query_env. @@ -1034,6 +1072,41 @@ impl QueryCoordinator { } } + pub fn register_inbound_channel_sets( + &mut self, + channel_sizes: &HashMap, + ) -> Result<()> { + for (channel_id, num_threads) in channel_sizes { + self.get_or_create_inbound_channel_set(channel_id, *num_threads)?; + } + + Ok(()) + } + + pub fn get_or_create_inbound_channel_set( + &mut self, + channel_id: &str, + num_threads: usize, + ) -> Result> { + match self.inbound_channel_sets.entry(channel_id.to_string()) { + Entry::Occupied(v) => { + if v.get().channels.len() != num_threads { + return Err(ErrorCode::Internal(format!( + "Mismatched inbound channel set parallelism, channel_id: {}, expected: {}, actual: {}", + channel_id, + num_threads, + v.get().channels.len() + ))); + } + + Ok(v.get().clone()) + } + Entry::Vacant(v) => Ok(v + .insert(Arc::new(NetworkInboundChannelSet::new(num_threads))) + .clone()), + } + } + /// Create a NetworkInboundSender for a new do_exchange connection. /// /// The `num_threads` value is provided by the coordinator via DoExchangeParams. diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_sink.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_sink.rs index a0d7119ec4b6b..86a2c27db1482 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_sink.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_sink.rs @@ -156,7 +156,11 @@ impl ExchangeSink { let exchange_id = ¶ms.exchange_id; let exchange_manager = DataExchangeManager::instance(); - let channel_set = exchange_manager.get_exchange_channel_set(query_id, exchange_id)?; + let channel_set = exchange_manager.get_or_create_exchange_channel_set( + query_id, + exchange_id, + local_threads, + )?; assert_eq!(channel_set.channels.len(), local_threads); let local_outbound = create_local_channels(&channel_set); diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_source.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_source.rs index e772094671f2a..c9b4c0a5635d0 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_source.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_source.rs @@ -62,6 +62,22 @@ fn add_source_pipe(pipeline: &mut Pipeline, source_items: Vec) { pipeline.add_pipe(Pipe::create(last_output_len, items.len(), items)); } +fn local_exchange_threads( + executor_id: &str, + destination_channels: &[(String, Vec)], +) -> Result { + destination_channels + .iter() + .find_map(|(destination, channels)| (destination == executor_id).then_some(channels.len())) + .filter(|threads| *threads > 0) + .ok_or_else(|| { + ErrorCode::Internal(format!( + "Executor {} is not a destination of exchange", + executor_id + )) + }) +} + /// Add Exchange Source to the pipeline. pub fn via_exchange_source( ctx: Arc, @@ -117,8 +133,13 @@ pub fn via_hash_exchange_source( let query_id = ¶ms.query_id; let exchange_id = ¶ms.exchange_id; let exchange_manager = DataExchangeManager::instance(); + let local_threads = local_exchange_threads(¶ms.executor_id, ¶ms.destination_channels)?; - let channel_set = exchange_manager.get_exchange_channel_set(query_id, exchange_id)?; + let channel_set = exchange_manager.get_or_create_exchange_channel_set( + query_id, + exchange_id, + local_threads, + )?; let waker = pipeline.get_waker(); let num_receivers = channel_set.channels.len(); @@ -137,12 +158,17 @@ pub fn via_hash_exchange_source( } pub fn via_broadcast_exchange_source( + _ctx: &Arc, params: &BroadcastExchangeParams, pipeline: &mut Pipeline, ) -> Result<()> { let exchange_manager = DataExchangeManager::instance(); - let channel_set = - exchange_manager.get_exchange_channel_set(¶ms.query_id, ¶ms.exchange_id)?; + let local_threads = local_exchange_threads(¶ms.executor_id, ¶ms.destination_channels)?; + let channel_set = exchange_manager.get_or_create_exchange_channel_set( + ¶ms.query_id, + ¶ms.exchange_id, + local_threads, + )?; let waker = pipeline.get_waker(); let mut source_items = Vec::with_capacity(channel_set.channels.len()); @@ -159,27 +185,15 @@ pub fn via_broadcast_exchange_source( } pub fn via_shuffle_exchange_source( - ctx: &Arc, + _ctx: &Arc, params: &ShuffleExchangeParams, injector: Arc, pipeline: &mut Pipeline, ) -> Result<()> { - let local_channels = params - .destination_channels - .iter() - .find_map(|(destination, channels)| { - (destination == ¶ms.executor_id).then_some(channels.len()) - }) - .unwrap_or_default(); - - if local_channels == 0 { - return Err(ErrorCode::Internal(format!( - "Executor {} is not a destination of fragment {} exchange", - params.executor_id, params.fragment_id - ))); - } + let _local_channels = + local_exchange_threads(¶ms.executor_id, ¶ms.destination_channels)?; - let exchange_manager = ctx.get_exchange_manager(); + let exchange_manager = DataExchangeManager::instance(); let exchange_params = ExchangeParams::NodeShuffleExchange(params.clone()); let flight_receivers = exchange_manager.get_flight_receiver(&exchange_params)?; let mut source_items = Vec::with_capacity(flight_receivers.len()); @@ -255,6 +269,7 @@ pub fn via_remote_exchange_source( pipeline, ), DataExchange::Broadcast(exchange) => via_broadcast_exchange_source( + &ctx, &create_broadcast_source_params(&ctx, query_id, schema, exchange), pipeline, ), diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_transform.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_transform.rs index 6b412ec32d291..2c095b5bbe312 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_transform.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_transform.rs @@ -144,7 +144,11 @@ impl ExchangeTransform { let exchange_id = ¶ms.exchange_id; let exchange_manager = DataExchangeManager::instance(); - let channel_set = exchange_manager.get_exchange_channel_set(query_id, exchange_id)?; + let channel_set = exchange_manager.get_or_create_exchange_channel_set( + query_id, + exchange_id, + local_threads, + )?; assert_eq!(channel_set.channels.len(), local_threads); @@ -205,7 +209,11 @@ impl ExchangeTransform { let exchange_id = ¶ms.exchange_id; let exchange_manager = DataExchangeManager::instance(); - let channel_set = exchange_manager.get_exchange_channel_set(query_id, exchange_id)?; + let channel_set = exchange_manager.get_or_create_exchange_channel_set( + query_id, + exchange_id, + local_threads, + )?; assert_eq!(channel_set.channels.len(), local_threads); let local_outbound = create_local_channels(&channel_set); From 1b99d656230ad1265c3a3c1e312f5d5f145c8a70 Mon Sep 17 00:00:00 2001 From: sky <75521613+SkyFan2002@users.noreply.github.com> Date: Sun, 12 Apr 2026 15:04:46 +0800 Subject: [PATCH 4/8] fix(exchange): validate inbound sender parallelism Route inbound sender creation through the same channel-set guard used by pre-registration so mismatched exchange parallelism fails explicitly. Also add QueryCoordinator unit tests covering inbound channel set registration and mismatch detection. --- .../flight/v1/exchange/exchange_manager.rs | 78 +++++++++++++++++-- 1 file changed, 73 insertions(+), 5 deletions(-) diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index 69eeb3ea05950..f0f2044ee4c9f 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -1115,11 +1115,7 @@ impl QueryCoordinator { channel_id: &str, num_threads: usize, ) -> Result { - let channel_set = self - .inbound_channel_sets - .entry(channel_id.to_string()) - .or_insert_with(|| Arc::new(NetworkInboundChannelSet::new(num_threads))) - .clone(); + let channel_set = self.get_or_create_inbound_channel_set(channel_id, num_threads)?; // TODO: get max_bytes_per_connection from query settings let max_bytes_per_connection = 20 * 1024 * 1024; // 20MB @@ -1458,3 +1454,75 @@ impl FragmentCoordinator { Ok(()) } } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use databend_common_exception::Result; + + use super::QueryCoordinator; + + #[test] + fn test_query_coordinator_register_inbound_channel_sets() -> Result<()> { + let mut coordinator = QueryCoordinator::create(); + let channel_sizes = HashMap::from([ + ("exchange-a".to_string(), 2_usize), + ("exchange-b".to_string(), 4_usize), + ]); + + coordinator.register_inbound_channel_sets(&channel_sizes)?; + + assert_eq!( + coordinator + .get_or_create_inbound_channel_set("exchange-a", 2)? + .channels + .len(), + 2 + ); + assert_eq!( + coordinator + .get_or_create_inbound_channel_set("exchange-b", 4)? + .channels + .len(), + 4 + ); + + Ok(()) + } + + #[test] + fn test_query_coordinator_detects_mismatched_inbound_parallelism() -> Result<()> { + let mut coordinator = QueryCoordinator::create(); + coordinator.get_or_create_inbound_channel_set("exchange-a", 2)?; + + let err = coordinator + .get_or_create_inbound_channel_set("exchange-a", 1) + .err() + .expect("mismatched inbound channel set should fail"); + assert!( + err.message() + .contains("Mismatched inbound channel set parallelism") + ); + + Ok(()) + } + + #[test] + fn test_query_coordinator_detects_mismatched_inbound_sender_parallelism() -> Result<()> { + let mut coordinator = QueryCoordinator::create(); + coordinator + .register_inbound_channel_sets(&HashMap::from([("exchange-a".to_string(), 2)]))?; + + let err = coordinator + .create_inbound_sender("exchange-a", 1) + .err() + .expect("mismatched inbound sender should fail"); + assert!( + err.message() + .contains("Mismatched inbound channel set parallelism") + ); + + Ok(()) + } +} From 3637b964036eca411c80937f1bbf460c8a94bfac Mon Sep 17 00:00:00 2001 From: sky <75521613+SkyFan2002@users.noreply.github.com> Date: Sun, 12 Apr 2026 16:55:15 +0800 Subject: [PATCH 5/8] fix(exchange): handle remote-only leftovers - add a send-only broadcast sink for remote leftover fragments - allow shuffle exchanges to consume only existing remote receivers - validate with cluster cte, window_ntile, and generate_series tests --- .../flight/v1/exchange/broadcast_send_sink.rs | 74 +++++++++++++++++++ .../flight/v1/exchange/exchange_manager.rs | 9 --- .../flight/v1/exchange/exchange_params.rs | 13 ++-- .../flight/v1/exchange/exchange_sink.rs | 24 +++++- .../src/servers/flight/v1/exchange/mod.rs | 2 + 5 files changed, 103 insertions(+), 19 deletions(-) create mode 100644 src/query/service/src/servers/flight/v1/exchange/broadcast_send_sink.rs diff --git a/src/query/service/src/servers/flight/v1/exchange/broadcast_send_sink.rs b/src/query/service/src/servers/flight/v1/exchange/broadcast_send_sink.rs new file mode 100644 index 0000000000000..b77cc0269fffa --- /dev/null +++ b/src/query/service/src/servers/flight/v1/exchange/broadcast_send_sink.rs @@ -0,0 +1,74 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_exception::Result; +use databend_common_expression::DataBlock; +use databend_common_pipeline::core::InputPort; +use databend_common_pipeline::core::PipeItem; +use databend_common_pipeline::core::Processor; +use databend_common_pipeline::core::ProcessorPtr; +use databend_common_pipeline::sinks::AsyncSink; +use databend_common_pipeline::sinks::AsyncSinker; + +use crate::servers::flight::v1::network::OutboundChannel; + +pub struct BroadcastExchangeSink { + channels: Vec>, +} + +impl BroadcastExchangeSink { + pub fn create( + input: Arc, + channels: Vec>, + ) -> Box { + AsyncSinker::create(input, BroadcastExchangeSink { channels }) + } +} + +#[async_trait::async_trait] +impl AsyncSink for BroadcastExchangeSink { + const NAME: &'static str = "BroadcastExchangeSink"; + + #[async_backtrace::framed] + async fn on_finish(&mut self) -> Result<()> { + for channel in &self.channels { + channel.close(); + } + Ok(()) + } + + #[async_backtrace::framed] + async fn consume(&mut self, data_block: DataBlock) -> Result { + let mut futures = Vec::with_capacity(self.channels.len()); + for channel in &self.channels { + let channel = channel.clone(); + let data_block = data_block.clone(); + futures.push(async move { channel.add_block(data_block).await }); + } + + futures::future::try_join_all(futures).await?; + Ok(false) + } +} + +pub fn create_broadcast_sink_item(channels: Vec>) -> PipeItem { + let input = InputPort::create(); + PipeItem::create( + ProcessorPtr::create(BroadcastExchangeSink::create(input.clone(), channels)), + vec![input], + vec![], + ) +} diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index f0f2044ee4c9f..76a1a7b3d0b83 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -340,11 +340,6 @@ impl DataExchangeManager { let address = target.flight_address.clone(); let keep_alive_params = keep_alive; let num_threads = channels.len(); - warn!( - "do_exchange: node={} -> target={}, exchange_id={}, num_threads={}", - config.query.node_id, target_id, exchange_id, num_threads - ); - flight_exchanges.push(Box::pin(async move { let (send_tx, response_stream) = { let mut flight_client = @@ -649,10 +644,6 @@ impl DataExchangeManager { channel_id: &str, num_threads: usize, ) -> Result { - warn!( - "handle_do_exchange: query_id={}, channel_id={}, num_threads={}", - query_id, channel_id, num_threads - ); let queries_coordinator_guard = self.queries_coordinator.lock(); let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() }; diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_params.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_params.rs index f9bb7e15ac774..7a41fce7e4548 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_params.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_params.rs @@ -251,13 +251,12 @@ impl ShuffleExchangeParams { for (destination, channels) in &self.destination_channels { if destination == &self.executor_id { for channel in channels { - let Some(receivers) = receivers.remove(channel) else { - return Err(ErrorCode::UnknownFragmentExchange(format!( - "Unknown fragment flight receiver, {}, {}", - self.executor_id, self.fragment_id - ))); - }; - exchanges.extend(receivers.into_iter()); + // Local data stays in-process through the dummy branch in + // `ExchangeTransform::node_shuffle`, so a receiver only exists + // when some remote executor actually sends to this destination. + if let Some(receivers) = receivers.remove(channel) { + exchanges.extend(receivers.into_iter()); + } } } } diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_sink.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_sink.rs index 86a2c27db1482..c281da89fc43f 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_sink.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_sink.rs @@ -24,6 +24,7 @@ use databend_common_pipeline::core::PipeItem; use databend_common_pipeline::core::Pipeline; use databend_common_pipeline::core::ProcessorPtr; +use super::broadcast_send_sink::create_broadcast_sink_item; use super::exchange_params::BroadcastExchangeParams; use super::exchange_params::ExchangeParams; use super::exchange_params::GlobalExchangeParams; @@ -101,9 +102,9 @@ impl ExchangeSink { pipeline.add_pipe(Pipe::create(output, 0, items)); Ok(()) } - ExchangeParams::BroadcastExchange(_) => Err(ErrorCode::Internal( - "BroadcastExchange should not appear on the sink side", - )), + ExchangeParams::BroadcastExchange(params) => { + Self::broadcast_exchange_sink(ctx, pipeline, params) + } ExchangeParams::NodeShuffleExchange(params) => { exchange_shuffle(ctx, params, pipeline)?; @@ -188,6 +189,23 @@ impl ExchangeSink { pipeline.add_pipe(Pipe::create(local_threads, 0, items)); Ok(()) } + + fn broadcast_exchange_sink( + ctx: &Arc, + pipeline: &mut Pipeline, + params: &BroadcastExchangeParams, + ) -> Result<()> { + let compression = ctx.get_settings().get_query_flight_compression()?; + let channels = build_broadcast_outbound_channels(params, Vec::new(), compression)?; + let output_len = pipeline.output_len(); + + let items = (0..output_len) + .map(|_| create_broadcast_sink_item(channels.clone())) + .collect::>(); + + pipeline.add_pipe(Pipe::create(output_len, 0, items)); + Ok(()) + } } struct SinkExchangeSorting; diff --git a/src/query/service/src/servers/flight/v1/exchange/mod.rs b/src/query/service/src/servers/flight/v1/exchange/mod.rs index 3425f2d4391ca..e43d20533c83c 100644 --- a/src/query/service/src/servers/flight/v1/exchange/mod.rs +++ b/src/query/service/src/servers/flight/v1/exchange/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. mod broadcast_recv_transform; +mod broadcast_send_sink; mod broadcast_send_transform; mod data_exchange; mod exchange_injector; @@ -37,6 +38,7 @@ pub mod serde; pub use broadcast_recv_transform::ExchangeRecvTransform; pub type BroadcastRecvTransform = ExchangeRecvTransform; pub type HashRecvTransform = ExchangeRecvTransform; +pub use broadcast_send_sink::BroadcastExchangeSink; pub use broadcast_send_transform::BroadcastSendTransform; pub use data_exchange::BroadcastExchange; pub use data_exchange::DataExchange; From 2ce148451111b3bf07049d6e6bff0c9f5d43b073 Mon Sep 17 00:00:00 2001 From: sky <75521613+SkyFan2002@users.noreply.github.com> Date: Sun, 12 Apr 2026 22:56:33 +0800 Subject: [PATCH 6/8] fix(distributed): finish remote-only cluster queries Schedule empty and singleton source fragments on the coordinator once real distributed work is absent so sink-side broadcast exchanges are not materialized for remote-only receive flows. Let pulling executors surface EOF after the execution graph has finished, and bound statistics receiver shutdown so zero-row cluster queries can fully tear down. Add cluster regressions for coordinator-only distributed paths. --- .../src/pipelines/executor/executor_graph.rs | 22 +- .../pipelines/executor/pipeline_executor.rs | 11 + .../executor/pipeline_pulling_executor.rs | 27 ++ .../executor/query_pipeline_executor.rs | 4 + .../src/schedulers/fragments/plan_fragment.rs | 251 ++++++++++++++++- .../fragments/query_fragment_actions.rs | 261 +++++++++++++++++- .../flight/v1/exchange/exchange_manager.rs | 53 ++++ .../flight/v1/exchange/statistics_receiver.rs | 29 +- .../src/stream/processor_executor_stream.rs | 6 + .../service/tests/it/distributed/cluster.rs | 31 ++- 10 files changed, 674 insertions(+), 21 deletions(-) diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs index 7e272dbdb7b29..fe06765a1d8c5 100644 --- a/src/query/service/src/pipelines/executor/executor_graph.rs +++ b/src/query/service/src/pipelines/executor/executor_graph.rs @@ -886,9 +886,7 @@ impl RunningGraph { } pub fn assert_finished_graph(&self) -> Result<()> { - let finished_nodes = self.0.finished_nodes.load(Ordering::SeqCst); - - match finished_nodes >= self.0.graph.node_count() { + match self.is_all_nodes_finished() { true => Ok(()), false => Err(ErrorCode::Internal(format!( "Pipeline graph is not finished, details: {}", @@ -899,7 +897,23 @@ impl RunningGraph { /// Checks if all nodes in the graph are finished. pub fn is_all_nodes_finished(&self) -> bool { - self.0.finished_nodes.load(Ordering::SeqCst) >= self.0.graph.node_count() + let node_count = self.0.graph.node_count(); + if self.0.finished_nodes.load(Ordering::SeqCst) >= node_count { + return true; + } + + let all_finished = self.0.graph.node_indices().all(|node_index| { + matches!( + *self.0.graph[node_index].state.lock().unwrap(), + State::Finished + ) + }); + + if all_finished { + self.0.finished_nodes.store(node_count, Ordering::SeqCst); + } + + all_finished } /// Flag the graph should finish and no more tasks should be scheduled. diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index 7b31339232c46..78e10530e077f 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -267,6 +267,17 @@ impl PipelineExecutor { } } + pub fn is_all_nodes_finished(&self) -> bool { + match self { + PipelineExecutor::QueryPipelineExecutor(executor) => { + executor.graph.is_all_nodes_finished() + } + PipelineExecutor::QueriesPipelineExecutor(query_wrapper) => { + query_wrapper.graph.is_all_nodes_finished() + } + } + } + pub fn format_graph_nodes(&self) -> String { match self { PipelineExecutor::QueryPipelineExecutor(executor) => executor.format_graph_nodes(), diff --git a/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs b/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs index b975aea68895d..05d5b1ec701a0 100644 --- a/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs @@ -83,6 +83,16 @@ impl State { } } + pub fn wait_finish_timeout(&self, timeout: Duration) -> bool { + let mut mutex = self.finish_mutex.lock(); + + if !*mutex { + self.finish_condvar.wait_for(&mut mutex, timeout); + } + + *mutex + } + pub fn is_finished(&self) -> bool { self.is_finished.load(Ordering::Relaxed) } @@ -228,6 +238,23 @@ impl PipelinePullingExecutor { ))); } + if self.executor.is_all_nodes_finished() { + if !self.executor.is_finished() { + self.executor.finish::<()>(None); + } + + // Once the graph itself is finished, the pulling side has already + // consumed the full result. The detached executor thread may still be + // draining finish hooks or joining workers; do a short best-effort wait + // for background errors, but do not block EOF on it. + let _ = self.state.wait_finish_timeout(Duration::from_millis(100)); + + return match self.state.try_get_catch_error() { + None => Ok(None), + Some(error) => Err(error), + }; + } + continue; } Err(RecvTimeoutError::Disconnected) => { diff --git a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs index 37e1d07614683..f90f977c0d3fd 100644 --- a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs @@ -478,6 +478,10 @@ impl QueryPipelineExecutor { } } } + + if graph.is_all_nodes_finished() { + self.finish::<()>(None); + } } Err(error_type) => { let cause = error_type.get_error_code(); diff --git a/src/query/service/src/schedulers/fragments/plan_fragment.rs b/src/query/service/src/schedulers/fragments/plan_fragment.rs index e77cbcb30cd8a..0a3a5327aeef8 100644 --- a/src/query/service/src/schedulers/fragments/plan_fragment.rs +++ b/src/query/service/src/schedulers/fragments/plan_fragment.rs @@ -16,6 +16,7 @@ use std::any::Any; use std::collections::HashMap; use std::sync::Arc; +use databend_common_catalog::plan::DataSourceInfo; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::ReclusterTask; @@ -82,6 +83,49 @@ pub struct PlanFragment { } impl PlanFragment { + fn schedule_scope(&self) -> Result { + match &self.fragment_type { + FragmentType::Root => Ok(FragmentScheduleScope::LocalOnly), + FragmentType::Intermediate => { + let has_merge_input = self + .source_fragments + .iter() + .any(|fragment| matches!(&fragment.exchange, Some(DataExchange::Merge(_)))); + let all_sources_local_only = !self.source_fragments.is_empty() + && self + .source_fragments + .iter() + .map(PlanFragment::schedule_scope) + .collect::>>()? + .into_iter() + .all(|scope| scope == FragmentScheduleScope::LocalOnly); + + if should_schedule_intermediate_fragment_locally( + has_merge_input, + !self.source_fragments.is_empty(), + all_sources_local_only, + ) { + Ok(FragmentScheduleScope::LocalOnly) + } else { + Ok(FragmentScheduleScope::Distributed) + } + } + FragmentType::Source => { + let data_sources = self.collect_data_sources()?; + let action_state = collect_source_action_state(data_sources.values()); + if matches!(action_state, SourceActionState::NonEmpty) { + Ok(FragmentScheduleScope::Distributed) + } else { + Ok(FragmentScheduleScope::LocalOnly) + } + } + FragmentType::MutationSource + | FragmentType::ReplaceInto + | FragmentType::Compact + | FragmentType::Recluster => Ok(FragmentScheduleScope::Distributed), + } + } + pub fn get_actions( &self, ctx: Arc, @@ -102,13 +146,10 @@ impl PlanFragment { fragment_actions.add_action(action); } FragmentType::Intermediate => { - if self - .source_fragments - .iter() - .any(|fragment| matches!(&fragment.exchange, Some(DataExchange::Merge(_)))) - { - // If this is a intermediate fragment with merge input, - // we will only send it to coordinator node. + if self.schedule_scope()? == FragmentScheduleScope::LocalOnly { + // If this intermediate fragment has merge input, + // or all its upstream work is already coordinator-only, + // we only send it to coordinator node. let action = QueryFragmentAction::create( Fragmenter::get_local_executor(ctx), self.plan.clone(), @@ -161,7 +202,7 @@ impl PlanFragment { let data_sources = self.collect_data_sources()?; - let executors = Fragmenter::get_executors_nodes(ctx); + let executors = Fragmenter::get_executors_nodes(ctx.clone()); let mut executor_partitions: HashMap> = HashMap::new(); @@ -213,7 +254,31 @@ impl PlanFragment { } } + let has_non_empty_action = executor_partitions.values().any(|sources| { + matches!( + collect_source_action_state(sources.values()), + SourceActionState::NonEmpty + ) + }); + let has_singleton_action = executor_partitions.values().any(|sources| { + matches!( + collect_source_action_state(sources.values()), + SourceActionState::Singleton + ) + }); + let local_executor = Fragmenter::get_local_executor(ctx); + for (executor, sources) in executor_partitions { + let action_state = collect_source_action_state(sources.values()); + if !should_schedule_source_action( + has_non_empty_action, + has_singleton_action, + action_state, + executor == local_executor, + ) { + continue; + } + // Replace `ReadDataSourcePlan` with rewritten one and generate new fragment for it. let mut handle = ReadSourceDeriveHandle::create(sources); let plan = self.plan.derive_with(&mut handle); @@ -564,6 +629,93 @@ enum DataSource { ConstTable(ConstTableColumn), } +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum FragmentScheduleScope { + LocalOnly, + Distributed, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum SourceActionState { + Empty, + Singleton, + NonEmpty, +} + +impl DataSource { + fn scheduling_state(&self) -> SourceActionState { + match self { + DataSource::Table(plan) => { + if is_singleton_system_one_source(plan) { + SourceActionState::Singleton + } else if !plan.parts.is_empty() { + SourceActionState::NonEmpty + } else { + SourceActionState::Empty + } + } + DataSource::ConstTable(columns) => { + if columns.num_rows > 0 { + SourceActionState::NonEmpty + } else { + SourceActionState::Empty + } + } + } + } +} + +fn is_singleton_system_one_source(plan: &DataSourcePlan) -> bool { + matches!( + &plan.source_info, + DataSourceInfo::TableSource(table_info) + if table_info.meta.engine == "SystemOne" && table_info.name == "one" + ) +} + +fn collect_source_action_state<'a>( + sources: impl IntoIterator, +) -> SourceActionState { + let mut has_singleton = false; + + for source in sources { + match source.scheduling_state() { + SourceActionState::NonEmpty => return SourceActionState::NonEmpty, + SourceActionState::Singleton => has_singleton = true, + SourceActionState::Empty => {} + } + } + + if has_singleton { + SourceActionState::Singleton + } else { + SourceActionState::Empty + } +} + +fn should_schedule_source_action( + has_non_empty_action: bool, + has_singleton_action: bool, + action_state: SourceActionState, + is_local_executor: bool, +) -> bool { + match action_state { + SourceActionState::NonEmpty => true, + SourceActionState::Singleton => !has_non_empty_action && is_local_executor, + SourceActionState::Empty => { + !has_non_empty_action && !has_singleton_action && is_local_executor + } + } +} + +fn should_schedule_intermediate_fragment_locally( + has_merge_input: bool, + has_sources: bool, + all_sources_local_only: bool, +) -> bool { + has_merge_input || (has_sources && all_sources_local_only) +} + impl TryFrom for DataSourcePlan { type Error = ErrorCode; @@ -649,6 +801,89 @@ impl DeriveHandle for ReadSourceDeriveHandle { } } +#[cfg(test)] +mod tests { + use super::SourceActionState; + use super::should_schedule_intermediate_fragment_locally; + use super::should_schedule_source_action; + + #[test] + fn test_skip_empty_source_actions_once_real_work_exists() { + assert!(should_schedule_source_action( + true, + false, + SourceActionState::NonEmpty, + false + )); + assert!(!should_schedule_source_action( + true, + false, + SourceActionState::Empty, + true + )); + assert!(!should_schedule_source_action( + true, + false, + SourceActionState::Empty, + false + )); + } + + #[test] + fn test_keep_single_local_fallback_for_all_empty_sources() { + assert!(should_schedule_source_action( + false, + false, + SourceActionState::Empty, + true + )); + assert!(!should_schedule_source_action( + false, + false, + SourceActionState::Empty, + false + )); + } + + #[test] + fn test_keep_singleton_sources_local_only() { + assert!(should_schedule_source_action( + false, + true, + SourceActionState::Singleton, + true + )); + assert!(!should_schedule_source_action( + false, + true, + SourceActionState::Singleton, + false + )); + assert!(!should_schedule_source_action( + false, + true, + SourceActionState::Empty, + true + )); + } + + #[test] + fn test_localize_intermediate_fragment_when_all_inputs_are_local_only() { + assert!(should_schedule_intermediate_fragment_locally( + false, true, true + )); + assert!(should_schedule_intermediate_fragment_locally( + true, true, false + )); + assert!(!should_schedule_intermediate_fragment_locally( + false, true, false + )); + assert!(!should_schedule_intermediate_fragment_locally( + false, false, true + )); + } +} + struct ReclusterDeriveHandle { tasks: Vec, } diff --git a/src/query/service/src/schedulers/fragments/query_fragment_actions.rs b/src/query/service/src/schedulers/fragments/query_fragment_actions.rs index f1019fdc11207..d22d21128e6b0 100644 --- a/src/query/service/src/schedulers/fragments/query_fragment_actions.rs +++ b/src/query/service/src/schedulers/fragments/query_fragment_actions.rs @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; use std::collections::HashMap; use std::collections::hash_map::Entry; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; +use databend_common_catalog::plan::DataSourceInfo; use databend_common_config::GlobalConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -25,9 +27,12 @@ use databend_common_expression::DataSchemaRef; use databend_meta_client::types::NodeInfo; use crate::clusters::ClusterHelper; +use crate::physical_plans::ConstantTableScan; use crate::physical_plans::ExchangeSink; use crate::physical_plans::PhysicalPlan; use crate::physical_plans::PhysicalPlanCast; +use crate::physical_plans::PhysicalPlanVisitor; +use crate::physical_plans::TableScan; use crate::servers::flight::v1::exchange::DataExchange; use crate::servers::flight::v1::packets::DataflowDiagramBuilder; use crate::servers::flight::v1::packets::QueryEnv; @@ -102,6 +107,46 @@ impl QueryFragmentActions { Ok(actions_schema[0].clone()) } + + fn prune_empty_source_actions(&mut self, local_executor: &str) -> Result<()> { + let states = self + .fragment_actions + .iter() + .map(|action| detect_source_action_state(&action.physical_plan)) + .collect::>>()?; + + if !states + .iter() + .any(|state| !matches!(state, SourceActionState::NoSource)) + { + return Ok(()); + } + + let has_non_empty_action = states + .iter() + .any(|state| matches!(state, SourceActionState::NonEmpty)); + let has_singleton_action = states + .iter() + .any(|state| matches!(state, SourceActionState::Singleton)); + + self.fragment_actions = self + .fragment_actions + .drain(..) + .zip(states) + .filter_map(|(action, state)| { + let keep = should_keep_source_action( + has_non_empty_action, + has_singleton_action, + state, + action.executor == local_executor, + ); + + keep.then_some(action) + }) + .collect(); + + Ok(()) + } } pub struct QueryFragmentsActions { @@ -152,12 +197,15 @@ impl QueryFragmentsActions { } pub fn add_fragment_actions(&mut self, actions: QueryFragmentActions) -> Result<()> { + let mut actions = actions; + actions.prune_empty_source_actions(&self.get_local_executor())?; self.fragments_actions.push(actions); Ok(()) } pub fn add_fragments_actions(&mut self, actions: QueryFragmentsActions) -> Result<()> { - for fragment_actions in actions.fragments_actions.into_iter() { + for mut fragment_actions in actions.fragments_actions.into_iter() { + fragment_actions.prune_empty_source_actions(&self.get_local_executor())?; self.fragments_actions.push(fragment_actions); } @@ -176,8 +224,11 @@ impl QueryFragmentsActions { } pub fn get_query_fragments(&self) -> Result> { + let mut executors_fragments = self.get_executors_fragments()?; + prune_query_fragments(&mut executors_fragments, &self.get_local_executor())?; + let mut query_fragments = HashMap::new(); - for (executor, fragments) in self.get_executors_fragments() { + for (executor, fragments) in executors_fragments { query_fragments.insert(executor, QueryFragments { query_id: self.ctx.get_id(), fragments, @@ -276,7 +327,7 @@ impl QueryFragmentsActions { nodes_info } - fn get_executors_fragments(&self) -> HashMap> { + fn get_executors_fragments(&self) -> Result>> { let mut fragments_packets = HashMap::new(); for fragment_actions in &self.fragments_actions { for fragment_action in &fragment_actions.fragment_actions { @@ -297,7 +348,7 @@ impl QueryFragmentsActions { } } - fragments_packets + Ok(fragments_packets) } } @@ -308,3 +359,205 @@ impl Debug for QueryFragmentsActions { .finish() } } + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum SourceActionState { + NoSource, + Empty, + Singleton, + NonEmpty, +} + +fn should_keep_source_action( + has_non_empty_action: bool, + has_singleton_action: bool, + state: SourceActionState, + is_local_executor: bool, +) -> bool { + match state { + SourceActionState::NoSource | SourceActionState::NonEmpty => true, + SourceActionState::Singleton => !has_non_empty_action && is_local_executor, + SourceActionState::Empty => { + !has_non_empty_action && !has_singleton_action && is_local_executor + } + } +} + +fn is_singleton_system_one_source(scan: &TableScan) -> bool { + matches!( + &scan.source.source_info, + DataSourceInfo::TableSource(table_info) + if table_info.meta.engine == "SystemOne" && table_info.name == "one" + ) +} + +fn detect_source_action_state(plan: &PhysicalPlan) -> Result { + struct SourceActionVisitor { + has_source: bool, + has_singleton_source: bool, + has_non_empty_source: bool, + } + + impl SourceActionVisitor { + fn create() -> Box { + Box::new(SourceActionVisitor { + has_source: false, + has_singleton_source: false, + has_non_empty_source: false, + }) + } + } + + impl PhysicalPlanVisitor for SourceActionVisitor { + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn visit(&mut self, plan: &PhysicalPlan) -> Result<()> { + if let Some(scan) = TableScan::from_physical_plan(plan) { + self.has_source = true; + if is_singleton_system_one_source(scan) { + self.has_singleton_source = true; + } else if !scan.source.parts.is_empty() { + self.has_non_empty_source = true; + } + } else if let Some(scan) = ConstantTableScan::from_physical_plan(plan) { + self.has_source = true; + if scan.num_rows > 0 { + self.has_non_empty_source = true; + } + } + + Ok(()) + } + } + + let mut visitor = SourceActionVisitor::create(); + plan.visit(&mut visitor)?; + + let visitor = visitor + .as_any() + .downcast_mut::() + .ok_or_else(|| ErrorCode::Internal("Failed to downcast SourceActionVisitor"))?; + + Ok( + match ( + visitor.has_source, + visitor.has_non_empty_source, + visitor.has_singleton_source, + ) { + (false, _, _) => SourceActionState::NoSource, + (true, true, _) => SourceActionState::NonEmpty, + (true, false, true) => SourceActionState::Singleton, + (true, false, false) => SourceActionState::Empty, + }, + ) +} + +fn prune_query_fragments( + executors_fragments: &mut HashMap>, + local_executor: &str, +) -> Result<()> { + let mut fragment_states = HashMap::>::new(); + + for (executor, fragments) in executors_fragments.iter() { + for fragment in fragments { + fragment_states + .entry(fragment.fragment_id) + .or_default() + .push(( + executor.clone(), + detect_source_action_state(&fragment.physical_plan)?, + )); + } + } + + let mut keep_lookup = HashMap::<(usize, String), bool>::new(); + for (fragment_id, states) in fragment_states { + if !states + .iter() + .any(|(_, state)| !matches!(state, SourceActionState::NoSource)) + { + continue; + } + + let has_non_empty_action = states + .iter() + .any(|(_, state)| matches!(state, SourceActionState::NonEmpty)); + let has_singleton_action = states + .iter() + .any(|(_, state)| matches!(state, SourceActionState::Singleton)); + + for (executor, state) in states { + let keep = should_keep_source_action( + has_non_empty_action, + has_singleton_action, + state, + executor == local_executor, + ); + keep_lookup.insert((fragment_id, executor), keep); + } + } + + executors_fragments.retain(|executor, fragments| { + fragments.retain(|fragment| { + keep_lookup + .get(&(fragment.fragment_id, executor.clone())) + .copied() + .unwrap_or(true) + }); + !fragments.is_empty() + }); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::SourceActionState; + use super::should_keep_source_action; + + #[test] + fn test_prune_singleton_sources_to_local_executor() { + assert!(should_keep_source_action( + false, + true, + SourceActionState::Singleton, + true + )); + assert!(!should_keep_source_action( + false, + true, + SourceActionState::Singleton, + false + )); + assert!(!should_keep_source_action( + false, + true, + SourceActionState::Empty, + true + )); + } + + #[test] + fn test_keep_non_empty_source_actions_when_real_work_exists() { + assert!(should_keep_source_action( + true, + false, + SourceActionState::NonEmpty, + false + )); + assert!(!should_keep_source_action( + true, + true, + SourceActionState::Singleton, + true + )); + assert!(!should_keep_source_action( + true, + true, + SourceActionState::Empty, + true + )); + } +} diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index 76a1a7b3d0b83..cb8fcf7eafc6e 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -58,7 +58,9 @@ use super::statistics_receiver::StatisticsReceiver; use super::statistics_sender::StatisticsSender; use crate::clusters::ClusterHelper; use crate::clusters::FlightParams; +use crate::physical_plans::ExchangeSink as PhysicalExchangeSink; use crate::physical_plans::PhysicalPlan; +use crate::physical_plans::PhysicalPlanCast; use crate::pipelines::PipelineBuildResult; use crate::pipelines::PipelineBuilder; use crate::pipelines::attach_runtime_filter_logger; @@ -1171,6 +1173,12 @@ impl QueryCoordinator { return Ok(Some(fragment_coordinator.pipeline_build_res.unwrap())); } + if should_inline_locally_subscribed_fragment(&fragment_coordinator.physical_plan) { + // Locally subscribed source fragments are merged back into the parent pipeline + // through `ExchangeSource`, so their outgoing ExchangeSink should stay bypassed. + return Ok(Some(fragment_coordinator.pipeline_build_res.unwrap())); + } + let exchange_params = fragment_coordinator .create_exchange_params( info, @@ -1340,6 +1348,10 @@ impl QueryCoordinator { } } +fn should_inline_locally_subscribed_fragment(plan: &PhysicalPlan) -> bool { + PhysicalExchangeSink::from_physical_plan(plan).is_some() +} + struct FragmentCoordinator { initialized: bool, fragment_id: usize, @@ -1451,8 +1463,15 @@ mod tests { use std::collections::HashMap; use databend_common_exception::Result; + use databend_common_expression::DataSchemaRef; + use databend_common_sql::executor::physical_plans::FragmentKind; use super::QueryCoordinator; + use super::should_inline_locally_subscribed_fragment; + use crate::physical_plans::ConstantTableScan; + use crate::physical_plans::ExchangeSink as PhysicalExchangeSink; + use crate::physical_plans::PhysicalPlan; + use crate::physical_plans::PhysicalPlanMeta; #[test] fn test_query_coordinator_register_inbound_channel_sets() -> Result<()> { @@ -1516,4 +1535,38 @@ mod tests { Ok(()) } + + #[test] + fn test_inline_locally_subscribed_exchange_sink_fragments() { + let plan = PhysicalPlan::new(PhysicalExchangeSink { + meta: PhysicalPlanMeta::new("ExchangeSink"), + input: PhysicalPlan::new(ConstantTableScan { + meta: PhysicalPlanMeta::new("ConstantTableScan"), + values: vec![], + num_rows: 0, + output_schema: DataSchemaRef::default(), + }), + schema: DataSchemaRef::default(), + kind: FragmentKind::Merge, + keys: vec![], + destination_fragment_id: 0, + query_id: "test-query".to_string(), + ignore_exchange: false, + allow_adjust_parallelism: true, + }); + + assert!(should_inline_locally_subscribed_fragment(&plan)); + } + + #[test] + fn test_keep_non_sink_fragments_on_exchange_path() { + let plan = PhysicalPlan::new(ConstantTableScan { + meta: PhysicalPlanMeta::new("ConstantTableScan"), + values: vec![], + num_rows: 0, + output_schema: DataSchemaRef::default(), + }); + + assert!(!should_inline_locally_subscribed_fragment(&plan)); + } } diff --git a/src/query/service/src/servers/flight/v1/exchange/statistics_receiver.rs b/src/query/service/src/servers/flight/v1/exchange/statistics_receiver.rs index 07a04905672af..aa888e6d2813a 100644 --- a/src/query/service/src/servers/flight/v1/exchange/statistics_receiver.rs +++ b/src/query/service/src/servers/flight/v1/exchange/statistics_receiver.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::Ordering; +use std::time::Duration; use databend_common_base::JoinHandle; use databend_common_base::runtime::Runtime; @@ -32,7 +33,7 @@ use crate::sessions::MemoryUpdater; use crate::sessions::QueryContext; pub struct StatisticsReceiver { - _runtime: Runtime, + runtime: Runtime, shutdown_tx: Option>, exchange_handler: Vec>>, } @@ -126,7 +127,7 @@ impl StatisticsReceiver { Ok(StatisticsReceiver { exchange_handler, - _runtime: runtime, + runtime, shutdown_tx: Some(shutdown_tx), }) } @@ -205,9 +206,29 @@ impl StatisticsReceiver { pub fn wait_shutdown(&mut self) -> Result<()> { let mut exchanges_handler = std::mem::take(&mut self.exchange_handler); - futures::executor::block_on(async move { + self.runtime.block_on(async move { + let deadline = tokio::time::Instant::now() + Duration::from_secs(1); while let Some(exchange_handler) = exchanges_handler.pop() { - exchange_handler.await??; + let mut exchange_handler = exchange_handler; + match tokio::time::timeout_at(deadline, &mut exchange_handler).await { + Ok(join_res) => join_res??, + Err(_) => { + let timeout_count = exchanges_handler.len() + 1; + log::warn!( + "Timed out waiting statistics receivers to shutdown, aborting {} task(s)", + timeout_count + ); + + exchange_handler.abort(); + let _ = exchange_handler.await; + + for exchange_handler in exchanges_handler { + exchange_handler.abort(); + } + + return Ok(()); + } + } } Ok(()) diff --git a/src/query/service/src/stream/processor_executor_stream.rs b/src/query/service/src/stream/processor_executor_stream.rs index 4511ec082ca6c..496ee1254671b 100644 --- a/src/query/service/src/stream/processor_executor_stream.rs +++ b/src/query/service/src/stream/processor_executor_stream.rs @@ -45,6 +45,12 @@ impl PullingExecutorStream { Poll::Ready(Some(Err(cause))) } Ok(Some(data)) => { + if data.num_rows() == 0 && executor.get_inner().is_all_nodes_finished() { + self.end_of_stream = true; + drop(executor); + return Poll::Ready(None); + } + self.executor = Some(executor); Poll::Ready(Some(Ok(data))) } diff --git a/src/query/service/tests/it/distributed/cluster.rs b/src/query/service/tests/it/distributed/cluster.rs index 568c136f402c3..cf02c59c37f39 100644 --- a/src/query/service/tests/it/distributed/cluster.rs +++ b/src/query/service/tests/it/distributed/cluster.rs @@ -21,6 +21,8 @@ use databend_query::servers::flight::FlightService; use databend_query::test_kits::*; use futures_util::TryStreamExt; use tokio::runtime::Builder as TokioRuntimeBuilder; +use tokio::time::Duration; +use tokio::time::timeout; #[test] fn test_simple_cluster() -> anyhow::Result<()> { @@ -89,6 +91,28 @@ fn test_simple_cluster() -> anyhow::Result<()> { blocks.as_slice(), ); } + + // Regression for distributed 0-row scalar-subquery HAVING queries: + // the pulling executor must surface EOF even when the graph finishes + // before the detached executor thread completes its shutdown path. + { + let res = execute_query( + ctx, + "SELECT SUM(42) HAVING (SELECT SUM(42)) > SUM(80)", + ) + .await?; + let blocks = timeout( + Duration::from_secs(5), + res.try_collect::>(), + ) + .await + .map_err(|_| { + ErrorCode::Internal( + "cluster scalar-subquery HAVING query timed out", + ) + })??; + assert!(blocks.is_empty(), "expected empty result, got {blocks:?}"); + } } Ok::<(), ErrorCode>(()) @@ -134,5 +158,10 @@ fn setup_cluster(configs: &[InnerConfig]) -> ClusterDescriptor { &conf.query.common.flight_api_address, ); } - cluster_desc + + let local_id = configs + .last() + .map(|conf| conf.query.common.cluster_id.clone()) + .unwrap_or_default(); + cluster_desc.with_local_id(local_id) } From b3cbcb5e645db34c706a4af110abaae02ad3276d Mon Sep 17 00:00:00 2001 From: sky <75521613+SkyFan2002@users.noreply.github.com> Date: Sun, 12 Apr 2026 23:46:27 +0800 Subject: [PATCH 7/8] fix(scheduler): satisfy clippy test order --- .../src/schedulers/fragments/plan_fragment.rs | 166 +++++++++--------- 1 file changed, 83 insertions(+), 83 deletions(-) diff --git a/src/query/service/src/schedulers/fragments/plan_fragment.rs b/src/query/service/src/schedulers/fragments/plan_fragment.rs index 0a3a5327aeef8..c8c713048e3ae 100644 --- a/src/query/service/src/schedulers/fragments/plan_fragment.rs +++ b/src/query/service/src/schedulers/fragments/plan_fragment.rs @@ -801,89 +801,6 @@ impl DeriveHandle for ReadSourceDeriveHandle { } } -#[cfg(test)] -mod tests { - use super::SourceActionState; - use super::should_schedule_intermediate_fragment_locally; - use super::should_schedule_source_action; - - #[test] - fn test_skip_empty_source_actions_once_real_work_exists() { - assert!(should_schedule_source_action( - true, - false, - SourceActionState::NonEmpty, - false - )); - assert!(!should_schedule_source_action( - true, - false, - SourceActionState::Empty, - true - )); - assert!(!should_schedule_source_action( - true, - false, - SourceActionState::Empty, - false - )); - } - - #[test] - fn test_keep_single_local_fallback_for_all_empty_sources() { - assert!(should_schedule_source_action( - false, - false, - SourceActionState::Empty, - true - )); - assert!(!should_schedule_source_action( - false, - false, - SourceActionState::Empty, - false - )); - } - - #[test] - fn test_keep_singleton_sources_local_only() { - assert!(should_schedule_source_action( - false, - true, - SourceActionState::Singleton, - true - )); - assert!(!should_schedule_source_action( - false, - true, - SourceActionState::Singleton, - false - )); - assert!(!should_schedule_source_action( - false, - true, - SourceActionState::Empty, - true - )); - } - - #[test] - fn test_localize_intermediate_fragment_when_all_inputs_are_local_only() { - assert!(should_schedule_intermediate_fragment_locally( - false, true, true - )); - assert!(should_schedule_intermediate_fragment_locally( - true, true, false - )); - assert!(!should_schedule_intermediate_fragment_locally( - false, true, false - )); - assert!(!should_schedule_intermediate_fragment_locally( - false, false, true - )); - } -} - struct ReclusterDeriveHandle { tasks: Vec, } @@ -1030,3 +947,86 @@ impl DeriveHandle for ReplaceDeriveHandle { Err(children) } } + +#[cfg(test)] +mod tests { + use super::SourceActionState; + use super::should_schedule_intermediate_fragment_locally; + use super::should_schedule_source_action; + + #[test] + fn test_skip_empty_source_actions_once_real_work_exists() { + assert!(should_schedule_source_action( + true, + false, + SourceActionState::NonEmpty, + false + )); + assert!(!should_schedule_source_action( + true, + false, + SourceActionState::Empty, + true + )); + assert!(!should_schedule_source_action( + true, + false, + SourceActionState::Empty, + false + )); + } + + #[test] + fn test_keep_single_local_fallback_for_all_empty_sources() { + assert!(should_schedule_source_action( + false, + false, + SourceActionState::Empty, + true + )); + assert!(!should_schedule_source_action( + false, + false, + SourceActionState::Empty, + false + )); + } + + #[test] + fn test_keep_singleton_sources_local_only() { + assert!(should_schedule_source_action( + false, + true, + SourceActionState::Singleton, + true + )); + assert!(!should_schedule_source_action( + false, + true, + SourceActionState::Singleton, + false + )); + assert!(!should_schedule_source_action( + false, + true, + SourceActionState::Empty, + true + )); + } + + #[test] + fn test_localize_intermediate_fragment_when_all_inputs_are_local_only() { + assert!(should_schedule_intermediate_fragment_locally( + false, true, true + )); + assert!(should_schedule_intermediate_fragment_locally( + true, true, false + )); + assert!(!should_schedule_intermediate_fragment_locally( + false, true, false + )); + assert!(!should_schedule_intermediate_fragment_locally( + false, false, true + )); + } +} From 377dbbe421148e25879741da47047637e0ea8895 Mon Sep 17 00:00:00 2001 From: sky <75521613+SkyFan2002@users.noreply.github.com> Date: Mon, 13 Apr 2026 01:29:14 +0800 Subject: [PATCH 8/8] fix(exchange): drop unused flight channels --- .../flight/v1/exchange/exchange_manager.rs | 61 ++++++++++++------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index cb8fcf7eafc6e..3d3189eb35406 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -812,26 +812,6 @@ impl DataExchangeManager { match queries_coordinator.get_mut(&query_id) { None => Err(ErrorCode::Internal("Query not exists.")), Some(query_coordinator) => { - if !query_coordinator.flight_data_senders.is_empty() { - unreachable!( - "query_coordinator.fragment_senders is not empty: {:?}", - query_coordinator - .flight_data_senders - .keys() - .collect::>() - ); - } - - if !query_coordinator.flight_data_receivers.is_empty() { - unreachable!( - "query_coordinator.fragment_receivers is not empty: {:?}", - query_coordinator - .flight_data_receivers - .keys() - .collect::>() - ); - } - let injector = DefaultExchangeInjector::create(); let mut build_res = query_coordinator.subscribe_fragment( &ctx, @@ -853,6 +833,8 @@ impl DataExchangeManager { .extend(sub_build_res.sources_pipelines); } + query_coordinator.drop_unused_flight_data_channels("building root pipeline"); + let exchanges = std::mem::take(&mut query_coordinator.statistics_exchanges); let statistics_receiver = StatisticsReceiver::spawn_receiver(&ctx, exchanges)?; @@ -1065,6 +1047,24 @@ impl QueryCoordinator { } } + fn drop_unused_flight_data_channels(&mut self, stage: &str) { + let sender_keys = self.flight_data_senders.keys().cloned().collect::>(); + let receiver_keys = self + .flight_data_receivers + .keys() + .cloned() + .collect::>(); + + if !sender_keys.is_empty() || !receiver_keys.is_empty() { + warn!( + "Dropping unused legacy flight channels before {}: senders={:?}, receivers={:?}", + stage, sender_keys, receiver_keys + ); + self.flight_data_senders.clear(); + self.flight_data_receivers.clear(); + } + } + pub fn register_inbound_channel_sets( &mut self, channel_sizes: &HashMap, @@ -1304,7 +1304,7 @@ impl QueryCoordinator { let settings = ExecutorSettings::try_create(info.query_ctx.clone())?; let executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?; - assert!(self.flight_data_senders.is_empty() && self.flight_data_receivers.is_empty()); + self.drop_unused_flight_data_channels("starting partial query executor"); let info_mut = self.info.as_mut().expect("Query info is None"); info_mut.query_executor = Some(executor.clone()); @@ -1472,6 +1472,7 @@ mod tests { use crate::physical_plans::ExchangeSink as PhysicalExchangeSink; use crate::physical_plans::PhysicalPlan; use crate::physical_plans::PhysicalPlanMeta; + use crate::servers::flight::FlightReceiver; #[test] fn test_query_coordinator_register_inbound_channel_sets() -> Result<()> { @@ -1536,6 +1537,24 @@ mod tests { Ok(()) } + #[test] + fn test_query_coordinator_drops_unused_flight_data_channels() -> Result<()> { + let mut coordinator = QueryCoordinator::create(); + let _ = coordinator.register_flight_channel_sender("sender-a".to_string())?; + coordinator + .flight_data_receivers + .insert("receiver-a".to_string(), vec![FlightReceiver::create( + async_channel::bounded(1).1, + )]); + + coordinator.drop_unused_flight_data_channels("unit-test"); + + assert!(coordinator.flight_data_senders.is_empty()); + assert!(coordinator.flight_data_receivers.is_empty()); + + Ok(()) + } + #[test] fn test_inline_locally_subscribed_exchange_sink_fragments() { let plan = PhysicalPlan::new(PhysicalExchangeSink {