Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 37 additions & 14 deletions src/query/service/src/physical_plans/physical_exchange_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::any::Any;

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::DataSchemaRef;
use databend_common_pipeline::core::PlanScope;
Expand All @@ -24,6 +25,8 @@ use crate::physical_plans::physical_plan::IPhysicalPlan;
use crate::physical_plans::physical_plan::PhysicalPlan;
use crate::physical_plans::physical_plan::PhysicalPlanMeta;
use crate::pipelines::PipelineBuilder;
use crate::servers::flight::v1::exchange::DataExchange;
use crate::servers::flight::v1::exchange::via_remote_exchange_source;

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct ExchangeSource {
Expand All @@ -34,6 +37,7 @@ pub struct ExchangeSource {

// Fragment ID of source fragment
pub source_fragment_id: usize,
pub source_exchange: Option<DataExchange>,
pub query_id: String,
}

Expand Down Expand Up @@ -73,30 +77,49 @@ impl IPhysicalPlan for ExchangeSource {
meta: self.meta.clone(),
schema: self.schema.clone(),
source_fragment_id: self.source_fragment_id,
source_exchange: self.source_exchange.clone(),
query_id: self.query_id.clone(),
})
}

fn build_pipeline2(&self, builder: &mut PipelineBuilder) -> Result<()> {
let exchange_manager = builder.ctx.get_exchange_manager();
let build_res = exchange_manager.get_fragment_source(
if let Some(build_res) = exchange_manager.get_fragment_source(
&self.query_id,
self.source_fragment_id,
builder.exchange_injector.clone(),
)? {
let plan_scope = PlanScope::get_plan_scope();
let build_pipeline = build_res.main_pipeline.finalize(plan_scope);

// add sharing data
builder.join_state = build_res.builder_data.input_join_state;
builder.merge_into_probe_data_fields = build_res.builder_data.input_probe_schema;

// Merge pipeline
assert_eq!(builder.main_pipeline.output_len(), 0);
let sinks = builder.main_pipeline.merge(build_pipeline)?;
builder.main_pipeline.extend_sinks(sinks);
builder.pipelines.extend(build_res.sources_pipelines);
return Ok(());
}

let source_exchange = self.source_exchange.as_ref().ok_or_else(|| {
ErrorCode::Internal(format!(
"Source fragment {} of query {} has no exchange metadata",
self.source_fragment_id, self.query_id
))
})?;

via_remote_exchange_source(
builder.ctx.clone(),
&self.query_id,
self.source_fragment_id,
&self.schema,
source_exchange,
builder.exchange_injector.clone(),
&mut builder.main_pipeline,
)?;

let plan_scope = PlanScope::get_plan_scope();
let build_pipeline = build_res.main_pipeline.finalize(plan_scope);

// add sharing data
builder.join_state = build_res.builder_data.input_join_state;
builder.merge_into_probe_data_fields = build_res.builder_data.input_probe_schema;

// Merge pipeline
assert_eq!(builder.main_pipeline.output_len(), 0);
let sinks = builder.main_pipeline.merge(build_pipeline)?;
builder.main_pipeline.extend_sinks(sinks);
builder.pipelines.extend(build_res.sources_pipelines);
Ok(())
}
}
22 changes: 18 additions & 4 deletions src/query/service/src/pipelines/executor/executor_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -886,9 +886,7 @@ impl RunningGraph {
}

pub fn assert_finished_graph(&self) -> Result<()> {
let finished_nodes = self.0.finished_nodes.load(Ordering::SeqCst);

match finished_nodes >= self.0.graph.node_count() {
match self.is_all_nodes_finished() {
true => Ok(()),
false => Err(ErrorCode::Internal(format!(
"Pipeline graph is not finished, details: {}",
Expand All @@ -899,7 +897,23 @@ impl RunningGraph {

/// Checks if all nodes in the graph are finished.
pub fn is_all_nodes_finished(&self) -> bool {
self.0.finished_nodes.load(Ordering::SeqCst) >= self.0.graph.node_count()
let node_count = self.0.graph.node_count();
if self.0.finished_nodes.load(Ordering::SeqCst) >= node_count {
return true;
}

let all_finished = self.0.graph.node_indices().all(|node_index| {
matches!(
*self.0.graph[node_index].state.lock().unwrap(),
State::Finished
)
});

if all_finished {
self.0.finished_nodes.store(node_count, Ordering::SeqCst);
}

all_finished
}

/// Flag the graph should finish and no more tasks should be scheduled.
Expand Down
11 changes: 11 additions & 0 deletions src/query/service/src/pipelines/executor/pipeline_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,17 @@ impl PipelineExecutor {
}
}

pub fn is_all_nodes_finished(&self) -> bool {
match self {
PipelineExecutor::QueryPipelineExecutor(executor) => {
executor.graph.is_all_nodes_finished()
}
PipelineExecutor::QueriesPipelineExecutor(query_wrapper) => {
query_wrapper.graph.is_all_nodes_finished()
}
}
}

pub fn format_graph_nodes(&self) -> String {
match self {
PipelineExecutor::QueryPipelineExecutor(executor) => executor.format_graph_nodes(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ impl State {
}
}

/// Waits at most `timeout` for the finish flag guarded by `finish_mutex`.
///
/// Returns immediately with `true` when the flag is already set. Otherwise
/// performs a single timed wait on `finish_condvar` and returns whatever the
/// flag holds afterwards, so `false` means the flag was still unset when the
/// wait ended.
pub fn wait_finish_timeout(&self, timeout: Duration) -> bool {
    let mut finished = self.finish_mutex.lock();

    if *finished {
        return true;
    }

    // The wait outcome itself is intentionally ignored; the flag is re-read
    // under the lock to decide the return value.
    self.finish_condvar.wait_for(&mut finished, timeout);
    *finished
}

/// Returns the current value of the `is_finished` atomic flag.
///
/// The flag is read with `Ordering::Relaxed`, so this call on its own
/// imposes no ordering on surrounding memory accesses.
pub fn is_finished(&self) -> bool {
self.is_finished.load(Ordering::Relaxed)
}
Expand Down Expand Up @@ -228,6 +238,23 @@ impl PipelinePullingExecutor {
)));
}

if self.executor.is_all_nodes_finished() {
if !self.executor.is_finished() {
self.executor.finish::<()>(None);
}

// Once the graph itself is finished, the pulling side has already
// consumed the full result. The detached executor thread may still be
// draining finish hooks or joining workers; do a short best-effort wait
// for background errors, but do not block EOF on it.
let _ = self.state.wait_finish_timeout(Duration::from_millis(100));

return match self.state.try_get_catch_error() {
None => Ok(None),
Some(error) => Err(error),
};
}

continue;
}
Err(RecvTimeoutError::Disconnected) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,10 @@ impl QueryPipelineExecutor {
}
}
}

if graph.is_all_nodes_finished() {
self.finish::<()>(None);
}
}
Err(error_type) => {
let cause = error_type.get_error_code();
Expand Down
27 changes: 26 additions & 1 deletion src/query/service/src/schedulers/fragments/fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ impl Fragmenter {
});

let edges = Self::collect_fragments_edge(fragments.values());
let source_lookup = fragments
.iter()
.map(|(fragment_id, fragment)| {
let mut fragment = fragment.clone();
fragment.source_fragments.clear();
(*fragment_id, fragment)
})
.collect::<BTreeMap<_, _>>();
let mut target_sources = BTreeMap::<usize, Vec<usize>>::new();

for (source, target) in edges {
let Some(fragment) = fragments.get_mut(&source) else {
Expand All @@ -121,6 +130,21 @@ impl Fragmenter {
if let Some(exchange_sink) = ExchangeSink::from_mut_physical_plan(&mut fragment.plan) {
exchange_sink.destination_fragment_id = target;
}

target_sources.entry(target).or_default().push(source);
}

for (target, sources) in target_sources {
let Some(fragment) = fragments.get_mut(&target) else {
continue;
};

let mut source_fragments = sources
.into_iter()
.filter_map(|source| source_lookup.get(&source).cloned())
.collect::<Vec<_>>();
source_fragments.sort_by_key(|fragment| fragment.fragment_id);
fragment.source_fragments = source_fragments;
}

Ok(fragments.into_values().collect::<Vec<_>>())
Expand Down Expand Up @@ -312,7 +336,7 @@ impl DeriveHandle for FragmentDeriveHandle {

let source_fragment = PlanFragment {
plan,
exchange,
exchange: exchange.clone(),
fragment_type,
source_fragments: vec![],
fragment_id: source_fragment_id,
Expand All @@ -326,6 +350,7 @@ impl DeriveHandle for FragmentDeriveHandle {
query_id: self.query_id.clone(),

source_fragment_id,
source_exchange: exchange,
meta: PhysicalPlanMeta::with_plan_id("ExchangeSource", plan_id),
}));
}
Expand Down
Loading
Loading