From 58970d84d5133405291d9b8d602214a388cc2403 Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Sat, 2 May 2026 18:24:02 +0200 Subject: [PATCH 1/4] Refactor distributed.rs into coordinator module and explicitly enable task metrics collection --- src/coordinator/distributed.rs | 193 +++++++++ src/coordinator/metrics_store.rs | 45 ++ src/coordinator/mod.rs | 7 + src/coordinator/prepare_static_plan.rs | 112 +++++ .../task_spawner.rs} | 407 +++--------------- src/execution_plans/mod.rs | 2 - src/lib.rs | 4 +- src/metrics/task_metrics_collector.rs | 18 +- src/metrics/task_metrics_rewriter.rs | 9 +- src/stage.rs | 3 +- src/test_utils/metrics.rs | 2 +- src/test_utils/plans.rs | 2 +- 12 files changed, 449 insertions(+), 355 deletions(-) create mode 100644 src/coordinator/distributed.rs create mode 100644 src/coordinator/metrics_store.rs create mode 100644 src/coordinator/mod.rs create mode 100644 src/coordinator/prepare_static_plan.rs rename src/{execution_plans/distributed.rs => coordinator/task_spawner.rs} (51%) diff --git a/src/coordinator/distributed.rs b/src/coordinator/distributed.rs new file mode 100644 index 00000000..916479aa --- /dev/null +++ b/src/coordinator/distributed.rs @@ -0,0 +1,193 @@ +use crate::common::{require_one_child, serialize_uuid}; +use crate::coordinator::metrics_store::MetricsStore; +use crate::coordinator::prepare_static_plan::prepare_static_plan; +use crate::distributed_planner::NetworkBoundaryExt; +use crate::worker::generated::worker::TaskKey; +use datafusion::common::internal_datafusion_err; +use datafusion::common::runtime::JoinSet; +use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion}; +use datafusion::common::{Result, exec_err}; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::physical_expr_common::metrics::MetricsSet; +use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder; +use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; +use futures::StreamExt; +use std::any::Any; +use std::fmt::Formatter; +use std::sync::Arc; +use std::sync::Mutex; + +/// [ExecutionPlan] that executes the inner plan in distributed mode. +/// Before executing it, two modifications are lazily performed on the plan: +/// 1. Assigns worker URLs to all the stages. A random set of URLs are sampled from the +/// channel resolver and assigned to each task in each stage. +/// 2. Encodes all the plans in protobuf format so that network boundary nodes can send them +/// over the wire. +#[derive(Debug)] +pub struct DistributedExec { + plan: Arc, + prepared_plan: Arc>>>, + metrics: ExecutionPlanMetricsSet, + pub(crate) task_metrics: Option>, +} + +pub(super) struct PreparedPlan { + pub(super) head_stage: Arc, + pub(super) join_set: JoinSet>, +} + +impl DistributedExec { + pub fn new(plan: Arc) -> Self { + Self { + plan, + prepared_plan: Arc::new(Mutex::new(None)), + metrics: ExecutionPlanMetricsSet::new(), + task_metrics: None, + } + } + + /// Enables task metrics collection from remote workers. + pub fn with_metrics_collection(mut self, enabled: bool) -> Self { + self.task_metrics = match enabled { + true => Some(Arc::new(MetricsStore::new())), + false => None, + }; + self + } + + /// Waits until all worker tasks have reported their metrics back via the coordinator channel. + /// + /// Metrics are delivered asynchronously after query execution completes, so callers that need + /// complete metrics (e.g. for observability or display) should await this before inspecting + /// [`Self::task_metrics`] or calling [`rewrite_distributed_plan_with_metrics`]. + /// + /// [`rewrite_distributed_plan_with_metrics`]: crate::rewrite_distributed_plan_with_metrics + pub async fn wait_for_metrics(&self) { + let mut expected_keys: Vec = Vec::new(); + let Some(task_metrics) = &self.task_metrics else { + return; + }; + let _ = self.plan.apply(|plan| { + if let Some(boundary) = plan.as_network_boundary() { + let stage = boundary.input_stage(); + for i in 0..stage.task_count() { + expected_keys.push(TaskKey { + query_id: serialize_uuid(&stage.query_id()), + stage_id: stage.num() as u64, + task_number: i as u64, + }); + } + } + Ok(TreeNodeRecursion::Continue) + }); + if expected_keys.is_empty() { + return; + } + let mut rx = task_metrics.rx.clone(); + let _ = rx + .wait_for(|map| expected_keys.iter().all(|key| map.contains_key(key))) + .await; + } + + /// Returns the plan which is lazily prepared on `execute()` and actually gets executed. + /// It is updated on every call to `execute()`. Returns an error if `.execute()` has not been + /// called. + pub(crate) fn prepared_plan(&self) -> Result> { + self.prepared_plan + .lock() + .map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {}", e))? + .clone() + .ok_or_else(|| { + internal_datafusion_err!("No prepared plan found. Was execute() called?") + }) + } +} + +impl DisplayAs for DistributedExec { + fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + write!(f, "DistributedExec") + } +} + +impl ExecutionPlan for DistributedExec { + fn name(&self) -> &str { + "DistributedExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &Arc { + self.plan.properties() + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.plan] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(DistributedExec { + plan: require_one_child(&children)?, + prepared_plan: self.prepared_plan.clone(), + metrics: self.metrics.clone(), + task_metrics: self.task_metrics.clone(), + })) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + if partition > 0 { + // The DistributedExec node calls try_assign_urls() lazily upon calling .execute(). This means + // that .execute() must only be called once, as we cannot afford to perform several + // random URL assignation while calling multiple partitions, as they will differ, + // producing an invalid plan + return exec_err!( + "DistributedExec must only have 1 partition, but it was called with partition index {partition}" + ); + } + + let PreparedPlan { + head_stage, + join_set, + } = prepare_static_plan(&self.plan, &self.metrics, &self.task_metrics, &context)?; + { + let mut guard = self + .prepared_plan + .lock() + .map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {e}"))?; + *guard = Some(head_stage.clone()); + } + let mut builder = RecordBatchReceiverStreamBuilder::new(self.schema(), 1); + let tx = builder.tx(); + // Spawn the task that pulls data from child... + builder.spawn(async move { + let mut stream = head_stage.execute(partition, context)?; + while let Some(msg) = stream.next().await { + if tx.send(msg).await.is_err() { + break; // channel closed + } + } + Ok(()) + }); + // ...in parallel to the one that feeds the plan to workers. + builder.spawn(async move { + for res in join_set.join_all().await { + res?; + } + Ok(()) + }); + Ok(builder.build()) + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } +} diff --git a/src/coordinator/metrics_store.rs b/src/coordinator/metrics_store.rs new file mode 100644 index 00000000..4a9eee93 --- /dev/null +++ b/src/coordinator/metrics_store.rs @@ -0,0 +1,45 @@ +use crate::TaskKey; +use datafusion::common::HashMap; +use tokio::sync::watch; + +type MetricsMap = HashMap>; + +/// Stores the metrics collected from all worker tasks, and notifies waiters when new entries arrive. +#[derive(Debug, Clone)] +pub struct MetricsStore { + tx: watch::Sender, + pub(crate) rx: watch::Receiver, +} + +impl MetricsStore { + pub(crate) fn new() -> Self { + let (tx, rx) = watch::channel(HashMap::new()); + Self { tx, rx } + } + + pub(crate) fn insert( + &self, + key: TaskKey, + metrics: Vec, + ) { + self.tx.send_modify(|map| { + map.insert(key, metrics); + }); + } + + pub(crate) fn get( + &self, + key: &TaskKey, + ) -> Option> { + self.rx.borrow().get(key).cloned() + } + + #[cfg(test)] + pub(crate) fn from_entries( + entries: impl IntoIterator)>, + ) -> Self { + let map: HashMap<_, _> = entries.into_iter().collect(); + let (tx, rx) = watch::channel(map); + Self { tx, rx } + } +} diff --git a/src/coordinator/mod.rs b/src/coordinator/mod.rs new file mode 100644 index 00000000..2aea8442 --- /dev/null +++ b/src/coordinator/mod.rs @@ -0,0 +1,7 @@ +mod distributed; +mod metrics_store; +mod prepare_static_plan; +mod task_spawner; + +pub use distributed::DistributedExec; +pub(crate) use metrics_store::MetricsStore; diff --git a/src/coordinator/prepare_static_plan.rs b/src/coordinator/prepare_static_plan.rs new file mode 100644 index 00000000..22aba4b3 --- /dev/null +++ b/src/coordinator/prepare_static_plan.rs @@ -0,0 +1,112 @@ +use crate::coordinator::MetricsStore; +use crate::coordinator::distributed::PreparedPlan; +use crate::coordinator::task_spawner::{ + CoordinatorToWorkerMetrics, CoordinatorToWorkerTaskSpawner, +}; +use crate::distributed_planner::get_distributed_task_estimator; +use crate::stage::RemoteStage; +use crate::{ + DistributedCodec, NetworkBoundaryExt, Stage, TaskRoutingContext, + get_distributed_worker_resolver, +}; +use datafusion::common::runtime::JoinSet; +use datafusion::common::tree_node::{Transformed, TreeNode}; +use datafusion::common::{Result, exec_err}; +use datafusion::execution::TaskContext; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; +use rand::Rng; +use std::sync::Arc; + +/// Prepares the distributed plan for execution, which implies: +/// 1. Perform some worker assignation, choosing randomly from the given URLs and assigning one +/// URL per task. +/// 2. Sending the sliced subplans to the assigned URLs. For each URL assigned to a task, a +/// network call feeding the subplan is necessary. +/// 3. In each network boundary, set the input plan to `None`. That way, network boundaries +/// become nodes without children and traversing them will not go further down in. +/// 4. Spawn a background task per worker that waits for the worker to finish and collects +/// its metrics into [DistributedExec::task_metrics] via the coordinator channel. +pub(super) fn prepare_static_plan( + base_plan: &Arc, + metrics: &ExecutionPlanMetricsSet, + task_metrics: &Option>, + ctx: &Arc, +) -> Result { + let worker_resolver = get_distributed_worker_resolver(ctx.session_config())?; + let codec = DistributedCodec::new_combined_with_user(ctx.session_config()); + + let available_urls = worker_resolver.get_urls()?; + + let metrics = CoordinatorToWorkerMetrics::new(metrics); + + let mut join_set = JoinSet::new(); + let prepared = Arc::clone(base_plan).transform_up(|plan| { + // The following logic is just applied on network boundaries. + let Some(plan) = plan.as_network_boundary() else { + return Ok(Transformed::no(plan)); + }; + + let Stage::Local(stage) = plan.input_stage() else { + return exec_err!("Input stage from network boundary was not in Local state"); + }; + + let task_estimator = get_distributed_task_estimator(ctx.session_config())?; + + let mut spawner = CoordinatorToWorkerTaskSpawner::new( + stage, + &metrics, + task_metrics, + &codec, + &mut join_set, + )?; + + let routed_urls = match task_estimator.route_tasks(&TaskRoutingContext { + task_ctx: Arc::clone(ctx), + plan: &stage.plan, + task_count: stage.tasks, + available_urls: &available_urls, + }) { + Ok(Some(routed_urls)) => routed_urls, + // If the user has not defined custom routing with a `route_tasks` implementation, we + // default to round-robin task assignation from a randomized starting point. + Ok(None) => { + let start_idx = rand::rng().random_range(0..available_urls.len()); + (0..stage.tasks) + .map(|i| available_urls[(start_idx + i) % available_urls.len()].clone()) + .collect() + } + Err(e) => return exec_err!("error routing tasks to workers: {e}"), + }; + + if routed_urls.len() != stage.tasks { + return exec_err!( + "number of tasks ({}) was not equal to number of urls ({}) at execution time", + stage.tasks, + routed_urls.len() + ); + } + + let mut workers = Vec::with_capacity(stage.tasks); + for (i, routed_url) in routed_urls.into_iter().enumerate() { + workers.push(routed_url.clone()); + // Spawn a task that sends the subplan to the chosen URL. + // There will be as many spawned tasks as workers. + let (tx, worker_rx) = spawner.send_plan_task(Arc::clone(ctx), i, routed_url)?; + spawner.metrics_collection_task(i, worker_rx); + spawner.work_unit_feed_task(Arc::clone(ctx), i, tx)?; + } + + Ok(Transformed::yes(plan.with_input_stage(Stage::Remote( + RemoteStage { + query_id: stage.query_id, + num: stage.num, + workers, + }, + ))?)) + })?; + Ok(PreparedPlan { + head_stage: prepared.data, + join_set, + }) +} diff --git a/src/execution_plans/distributed.rs b/src/coordinator/task_spawner.rs similarity index 51% rename from src/execution_plans/distributed.rs rename to src/coordinator/task_spawner.rs index 52fa1e2b..202f0001 100644 --- a/src/execution_plans/distributed.rs +++ b/src/coordinator/task_spawner.rs @@ -1,356 +1,68 @@ -use crate::common::{require_one_child, serialize_uuid, task_ctx_with_extension}; +use crate::common::{serialize_uuid, task_ctx_with_extension}; use crate::config_extension_ext::get_config_extension_propagation_headers; -use crate::distributed_planner::{NetworkBoundaryExt, get_distributed_task_estimator}; +use crate::coordinator::MetricsStore; use crate::execution_plans::ChildrenIsolatorUnionExec; -use crate::networking::get_distributed_worker_resolver; use crate::passthrough_headers::get_passthrough_headers; -use crate::protobuf::{DistributedCodec, tonic_status_to_datafusion_error}; -use crate::stage::{LocalStage, RemoteStage, Stage}; +use crate::protobuf::tonic_status_to_datafusion_error; +use crate::stage::LocalStage; use crate::worker::generated::worker as pb; +use crate::worker::generated::worker::coordinator_to_worker_msg::Inner; use crate::worker::generated::worker::set_plan_request::WorkUnitFeedDeclaration; -use crate::worker::generated::worker::{ - CoordinatorToWorkerMsg, SetPlanRequest, TaskKey, WorkUnit, WorkerToCoordinatorMsg, - coordinator_to_worker_msg::Inner, worker_to_coordinator_msg, -}; use crate::{ DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, DistributedConfig, DistributedTaskContext, - DistributedWorkUnitFeedContext, TaskRoutingContext, WorkerResolver, - get_distributed_channel_resolver, + DistributedWorkUnitFeedContext, TaskKey, get_distributed_channel_resolver, }; -use datafusion::common::HashMap; +use datafusion::common::Result; use datafusion::common::instant::Instant; use datafusion::common::runtime::JoinSet; -use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; -use datafusion::common::{Result, exec_err}; -use datafusion::common::{exec_datafusion_err, internal_datafusion_err}; -use datafusion::error::DataFusionError; -use datafusion::execution::{SendableRecordBatchStream, TaskContext}; -use datafusion::physical_expr_common::metrics::MetricsSet; -use datafusion::physical_plan::metrics::{ +use datafusion::common::{DataFusionError, exec_datafusion_err}; +use datafusion::execution::TaskContext; +use datafusion::physical_expr_common::metrics::{ Count, ExecutionPlanMetricsSet, Label, MetricBuilder, MetricValue, Time, }; -use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder; -use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; +use datafusion::physical_plan::ExecutionPlan; use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec}; use datafusion_proto::protobuf::PhysicalPlanNode; use futures::StreamExt; use futures::future::BoxFuture; use http::Extensions; use prost::Message; -use rand::Rng; -use std::any::Any; -use std::fmt::{Display, Formatter}; +use std::fmt::Display; use std::sync::Arc; -use std::sync::Mutex; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; -use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::watch; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::Request; use tonic::metadata::MetadataMap; use url::Url; use uuid::Uuid; -/// Stores the metrics collected from all worker tasks, and notifies waiters when new entries arrive. -#[derive(Debug, Clone)] -pub struct MetricsStore { - tx: watch::Sender>>, - rx: watch::Receiver>>, -} - -impl MetricsStore { - fn new() -> Self { - let (tx, rx) = watch::channel(HashMap::new()); - Self { tx, rx } - } - - pub fn insert(&self, key: TaskKey, metrics: Vec) { - self.tx.send_modify(|map| { - map.insert(key, metrics); - }); - } - - pub fn get(&self, key: &TaskKey) -> Option> { - self.rx.borrow().get(key).cloned() - } - - #[cfg(test)] - pub(crate) fn from_entries( - entries: impl IntoIterator)>, - ) -> Self { - let map: HashMap<_, _> = entries.into_iter().collect(); - let (tx, rx) = watch::channel(map); - Self { tx, rx } - } -} - -/// [ExecutionPlan] that executes the inner plan in distributed mode. -/// Before executing it, two modifications are lazily performed on the plan: -/// 1. Assigns worker URLs to all the stages. A random set of URLs are sampled from the -/// channel resolver and assigned to each task in each stage. -/// 2. Encodes all the plans in protobuf format so that network boundary nodes can send them -/// over the wire. -#[derive(Debug)] -pub struct DistributedExec { - pub plan: Arc, - pub prepared_plan: Arc>>>, - metrics: ExecutionPlanMetricsSet, - pub task_metrics: Arc, -} - -struct PreparedPlan { - plan: Arc, - join_set: JoinSet>, +/// Metrics that measure network details about communications between [DistributedExec] and a +/// worker. +#[derive(Clone)] +pub(super) struct CoordinatorToWorkerMetrics { + pub(super) plan_bytes_sent: Count, + pub(super) plan_send_latency: Arc, } -impl DistributedExec { - pub fn new(plan: Arc) -> Self { +impl CoordinatorToWorkerMetrics { + pub(super) fn new(metrics: &ExecutionPlanMetricsSet) -> Self { Self { - plan, - prepared_plan: Arc::new(Mutex::new(None)), - metrics: ExecutionPlanMetricsSet::new(), - task_metrics: Arc::new(MetricsStore::new()), - } - } - - /// Waits until all worker tasks have reported their metrics back via the coordinator channel. - /// - /// Metrics are delivered asynchronously after query execution completes, so callers that need - /// complete metrics (e.g. for observability or display) should await this before inspecting - /// [`Self::task_metrics`] or calling [`rewrite_distributed_plan_with_metrics`]. - /// - /// [`rewrite_distributed_plan_with_metrics`]: crate::rewrite_distributed_plan_with_metrics - pub async fn wait_for_metrics(&self) { - let mut expected_keys: Vec = Vec::new(); - let _ = self.plan.apply(|plan| { - if let Some(boundary) = plan.as_network_boundary() { - let stage = boundary.input_stage(); - for i in 0..stage.task_count() { - expected_keys.push(TaskKey { - query_id: serialize_uuid(&stage.query_id()), - stage_id: stage.num() as u64, - task_number: i as u64, - }); - } - } - Ok(TreeNodeRecursion::Continue) - }); - if expected_keys.is_empty() { - return; - } - let mut rx = self.task_metrics.rx.clone(); - let _ = rx - .wait_for(|map| expected_keys.iter().all(|key| map.contains_key(key))) - .await; - } - - /// Returns the plan which is lazily prepared on execute() and actually gets executed. - /// It is updated on every call to execute(). Returns an error if .execute() has not been called. - pub(crate) fn prepared_plan(&self) -> Result, DataFusionError> { - self.prepared_plan - .lock() - .map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {}", e))? - .clone() - .ok_or_else(|| { - internal_datafusion_err!("No prepared plan found. Was execute() called?") - }) - } - - /// Prepares the distributed plan for execution. - /// In particular, this means we must take the following steps at each network boundary node: - /// 1. Assign tasks to URLs. Follow the user-specified routing defined in the TaskEstimator - /// implementation, or default to random round-robin assignment. - /// 2. Send the sliced subplans to the assigned URLs. For each task assigned to a URL, it is here - /// that we now must actually send that subplan to the URL over the wire. - /// 3. Set network boundary input plans to `None`. This way, network boundaries become nodes - /// without children, so we stop further traversal from happening in the future. - /// 4. Spawn a background task per worker that waits for the worker to finish and collects - /// its metrics into [DistributedExec::task_metrics] via the coordinator channel. - fn prepare_plan(&self, ctx: &Arc) -> Result { - let worker_resolver = get_distributed_worker_resolver(ctx.session_config())?; - let codec = DistributedCodec::new_combined_with_user(ctx.session_config()); - - let available_urls = worker_resolver.get_urls()?; - - let metrics = CoordinatorToWorkerMetrics { // Metric that measures to total sum of bytes worth of subplans sent. - plan_bytes_sent: MetricBuilder::new(&self.metrics) + plan_bytes_sent: MetricBuilder::new(metrics) .with_label(Label::new(DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, "0")) .global_counter("plan_bytes_sent"), // Latency statistics about the network calls issued to the workers for feeding subplans. plan_send_latency: Arc::new(LatencyMetric::new( "plan_send_latency", |b| b.with_label(Label::new(DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, "0")), - &self.metrics, + metrics, )), - }; - - let mut join_set = JoinSet::new(); - let prepared = Arc::clone(&self.plan).transform_up(|plan| { - // The following logic is only relevant to network boundaries. - let Some(plan) = plan.as_network_boundary() else { - return Ok(Transformed::no(plan)); - }; - - let Stage::Local(stage) = plan.input_stage() else { - return exec_err!("Input stage from network boundary was not in Local state"); - }; - - let task_estimator = get_distributed_task_estimator(ctx.session_config())?; - - let mut spawner = CoordinatorToWorkerTaskSpawner::new( - stage, - &metrics, - &self.task_metrics, - &codec, - &mut join_set, - )?; - - let routed_urls = match task_estimator.route_tasks(&TaskRoutingContext { - task_ctx: Arc::clone(ctx), - plan: &stage.plan, - task_count: stage.tasks, - available_urls: &available_urls, - }) { - Ok(Some(routed_urls)) => routed_urls, - // If the user has not defined custom routing with a `route_tasks` implementation, we - // default to round-robin task assignation from a randomized starting point. - Ok(None) => { - let start_idx = rand::rng().random_range(0..available_urls.len()); - (0..stage.tasks) - .map(|i| available_urls[(start_idx + i) % available_urls.len()].clone()) - .collect() - } - Err(e) => return Err(exec_datafusion_err!("error routing tasks to workers: {e}")), - }; - - if routed_urls.len() != stage.tasks { - return Err(exec_datafusion_err!( - "number of tasks ({}) was not equal to number of urls ({}) at execution time", - stage.tasks, - routed_urls.len() - )); - } - - let mut workers = Vec::with_capacity(stage.tasks); - for (i, routed_url) in routed_urls.into_iter().enumerate() { - workers.push(routed_url.clone()); - // Spawn a task that sends the subplan to the chosen URL. - // There will be as many spawned tasks as workers. - let (tx, worker_rx) = spawner.send_plan_task(Arc::clone(ctx), i, routed_url)?; - spawner.metrics_collection_task(i, worker_rx); - spawner.work_unit_feed_task(Arc::clone(ctx), i, tx)?; - } - - Ok(Transformed::yes(plan.with_input_stage(Stage::Remote( - RemoteStage { - query_id: stage.query_id, - num: stage.num, - workers, - }, - ))?)) - })?; - Ok(PreparedPlan { - plan: prepared.data, - join_set, - }) - } -} - -impl DisplayAs for DistributedExec { - fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { - write!(f, "DistributedExec") - } -} - -impl ExecutionPlan for DistributedExec { - fn name(&self) -> &str { - "DistributedExec" - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn properties(&self) -> &Arc { - self.plan.properties() - } - - fn children(&self) -> Vec<&Arc> { - vec![&self.plan] - } - - fn with_new_children( - self: Arc, - children: Vec>, - ) -> Result> { - Ok(Arc::new(DistributedExec { - plan: require_one_child(&children)?, - prepared_plan: self.prepared_plan.clone(), - metrics: self.metrics.clone(), - task_metrics: Arc::clone(&self.task_metrics), - })) - } - - fn execute( - &self, - partition: usize, - context: Arc, - ) -> Result { - if partition > 0 { - // The DistributedExec node calls try_assign_urls() lazily upon calling .execute(). This means - // that .execute() must only be called once, as we cannot afford to perform several - // random URL assignation while calling multiple partitions, as they will differ, - // producing an invalid plan - return exec_err!( - "DistributedExec must only have 1 partition, but it was called with partition index {partition}" - ); } - - let PreparedPlan { plan, join_set } = self.prepare_plan(&context)?; - { - let mut guard = self - .prepared_plan - .lock() - .map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {e}"))?; - *guard = Some(plan.clone()); - } - let mut builder = RecordBatchReceiverStreamBuilder::new(self.schema(), 1); - let tx = builder.tx(); - // Spawn the task that pulls data from child... - builder.spawn(async move { - let mut stream = plan.execute(partition, context)?; - while let Some(msg) = stream.next().await { - if tx.send(msg).await.is_err() { - break; // channel closed - } - } - Ok(()) - }); - // ...in parallel to the one that feeds the plan to workers. - builder.spawn(async move { - for res in join_set.join_all().await { - res?; - } - Ok(()) - }); - Ok(builder.build()) - } - - fn metrics(&self) -> Option { - Some(self.metrics.clone_inner()) } } -/// Metrics that measure network details about communications between [DistributedExec] and a -/// worker. -#[derive(Clone)] -struct CoordinatorToWorkerMetrics { - plan_bytes_sent: Count, - plan_send_latency: Arc, -} - /// Builder for the different kind of tasks that handle the communications between the /// [DistributedExec] node to the workers. This struct is responsible for instantiating the tasks /// as boxed futures so that [DistributedExec] can tokio-spawn them at will. @@ -359,27 +71,24 @@ struct CoordinatorToWorkerMetrics { /// - Building tasks that communicate a serialized plan to multiple workers for further execution. /// - Building tasks that stream partition feeds from local [WorkUnitFeedExec] nodes to their /// remote counterparts. -type WorkerResponseRx = - tokio::sync::mpsc::UnboundedReceiver>; - -struct CoordinatorToWorkerTaskSpawner<'a> { +pub(super) struct CoordinatorToWorkerTaskSpawner<'a> { plan: &'a Arc, plan_proto: Vec, query_id: Uuid, stage_id: usize, task_count: usize, metrics: &'a CoordinatorToWorkerMetrics, - task_metrics: &'a Arc, + task_metrics: &'a Option>, join_set: &'a mut JoinSet>, } impl<'a> CoordinatorToWorkerTaskSpawner<'a> { /// Builds a new [CoordinatorToWorkerTaskSpawner] based on the [Stage] that needs to be /// fanned out to multiple workers. - fn new( + pub(super) fn new( stage: &'a LocalStage, metrics: &'a CoordinatorToWorkerMetrics, - task_metrics: &'a Arc, + task_metrics: &'a Option>, codec: &'a dyn PhysicalExtensionCodec, join_set: &'a mut JoinSet>, ) -> Result { @@ -401,12 +110,15 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> { /// Sends a serialized plan to a specific worker and sets up the bidirectional gRPC stream. /// Returns the sender for outbound coordinator-to-worker messages and the receiver for /// inbound worker-to-coordinator messages. - fn send_plan_task( + pub(super) fn send_plan_task( &mut self, ctx: Arc, task_i: usize, url: Url, - ) -> Result<(UnboundedSender, WorkerResponseRx)> { + ) -> Result<( + UnboundedSender, + UnboundedReceiver, + )> { let d_cfg = DistributedConfig::from_config_options(ctx.session_config().options())?; /// Searches recursively for nodes exposing [crate::WorkUnitFeed]s, and executes their /// feeds, keeping into account that some of them might be executed within a @@ -463,8 +175,8 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> { stage_id: self.stage_id as u64, task_number: task_i as u64, }; - let msg = CoordinatorToWorkerMsg { - inner: Some(Inner::SetPlanRequest(SetPlanRequest { + let msg = pb::CoordinatorToWorkerMsg { + inner: Some(Inner::SetPlanRequest(pb::SetPlanRequest { plan_proto: self.plan_proto.clone(), task_count: self.task_count as u64, task_key: Some(task_key.clone()), @@ -503,8 +215,16 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> { })?; metrics.plan_send_latency.record(&start); metrics.plan_bytes_sent.add(plan_size); - let mut stream = response.into_inner(); - while let Some(msg) = stream.next().await { + let mut worker_to_coordinator_stream = response.into_inner(); + while let Some(msg_or_err) = worker_to_coordinator_stream.next().await { + let msg = match msg_or_err { + Ok(msg) => msg, + Err(err) => { + return Err(tonic_status_to_datafusion_error(err).unwrap_or_else(|| { + exec_datafusion_err!("Unknown error on worker to coordinator stream") + })); + } + }; if worker_to_coordinator_tx.send(msg).is_err() { break; // receiver dropped } @@ -515,28 +235,29 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> { Ok((coordinator_to_worker_tx, worker_to_coordinator_rx)) } - /// Receives worker-to-coordinator messages and inserts any collected metrics into the store. - /// Runs in a detached spawn so it is not cancelled when the output stream is dropped early. - fn metrics_collection_task( + pub(super) fn metrics_collection_task( &mut self, task_i: usize, - mut worker_to_coordinator_rx: WorkerResponseRx, + mut worker_to_coordinator_rx: UnboundedReceiver, ) { let task_key = TaskKey { query_id: serialize_uuid(&self.query_id), stage_id: self.stage_id as u64, task_number: task_i as u64, }; - let task_metrics_collection = Arc::clone(self.task_metrics); + let task_metrics = self.task_metrics.clone(); #[allow(clippy::disallowed_methods)] tokio::spawn(async move { - while let Some(Ok(msg)) = worker_to_coordinator_rx.recv().await { - let Some(worker_to_coordinator_msg::Inner::TaskMetrics(pre_order_metrics)) = - msg.inner - else { - continue; - }; - task_metrics_collection.insert(task_key.clone(), pre_order_metrics.metrics); + while let Some(msg) = worker_to_coordinator_rx.recv().await { + let Some(inner) = msg.inner else { continue }; + + match inner { + pb::worker_to_coordinator_msg::Inner::TaskMetrics(pre_order_metrics) => { + if let Some(task_metrics) = &task_metrics { + task_metrics.insert(task_key.clone(), pre_order_metrics.metrics); + } + } + } } }); } @@ -546,11 +267,11 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> { /// The returned task is just a future that does nothing unless polled. /// /// Once this function is called, all the [WorkUnitFeedExec]s feeds will be consumed. - fn work_unit_feed_task( + pub(super) fn work_unit_feed_task( &mut self, ctx: Arc, task_i: usize, - tx: UnboundedSender, + tx: UnboundedSender, ) -> Result<()> { let d_cfg = DistributedConfig::from_config_options(ctx.session_config().options())?; /// Recurses into the plan looking for [WorkUnitFeedExec] nodes that should be handled by @@ -565,7 +286,7 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> { dt_ctx: DistributedTaskContext, t_ctx: &Arc, d_cfg: &DistributedConfig, - tx: &UnboundedSender, + tx: &UnboundedSender, out: &mut Vec>>, ) -> Result<()> { let wuf = if let Some(wuf) = d_cfg @@ -611,8 +332,8 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> { // so they must be encoded in order to send them over the wire. while let Some(data_or_err) = work_unit_feed.next().await { if tx - .send(CoordinatorToWorkerMsg { - inner: Some(Inner::WorkUnit(WorkUnit { + .send(pb::CoordinatorToWorkerMsg { + inner: Some(Inner::WorkUnit(pb::WorkUnit { id: serialize_uuid(&id), partition: partition as u64, body: data_or_err?.encode_to_bytes(), @@ -651,7 +372,7 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> { /// DataFusion metrics system is pretty limited from an API standpoint. This intermediate struct /// bridges the gaps that are not satisfied by upstream API for measuring latency. -struct LatencyMetric { +pub(super) struct LatencyMetric { max: Time, avg: Time, max_latency_micros: AtomicU64, @@ -672,7 +393,7 @@ impl Drop for LatencyMetric { } impl LatencyMetric { - fn new( + pub(super) fn new( name: impl Display, builder: impl Fn(MetricBuilder) -> MetricBuilder, metrics: &ExecutionPlanMetricsSet, diff --git a/src/execution_plans/mod.rs b/src/execution_plans/mod.rs index 854e487a..aa09d8be 100644 --- a/src/execution_plans/mod.rs +++ b/src/execution_plans/mod.rs @@ -1,7 +1,6 @@ mod broadcast; mod children_isolator_union; mod common; -mod distributed; mod metrics; mod network_broadcast; mod network_coalesce; @@ -13,7 +12,6 @@ pub mod benchmarks; pub use broadcast::BroadcastExec; pub use children_isolator_union::ChildrenIsolatorUnionExec; -pub use distributed::{DistributedExec, MetricsStore}; pub(crate) use metrics::MetricsWrapperExec; pub use network_broadcast::NetworkBroadcastExec; pub use network_coalesce::NetworkCoalesceExec; diff --git a/src/lib.rs b/src/lib.rs index 6de07cc4..e93bb98d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,18 +14,20 @@ mod networking; mod observability; mod protobuf; pub use protobuf::DistributedCodec; +mod coordinator; #[cfg(any(feature = "integration", test))] pub mod test_utils; mod work_unit_feed; pub use arrow_ipc::CompressionType; +pub use coordinator::DistributedExec; pub use distributed_ext::DistributedExt; pub use distributed_planner::{ DistributedConfig, NetworkBoundary, NetworkBoundaryExt, SessionStateBuilderExt, TaskCountAnnotation, TaskEstimation, TaskEstimator, TaskRoutingContext, }; pub use execution_plans::{ - BroadcastExec, DistributedExec, NetworkBroadcastExec, NetworkCoalesceExec, NetworkShuffleExec, + BroadcastExec, NetworkBroadcastExec, NetworkCoalesceExec, NetworkShuffleExec, PartitionIsolatorExec, }; pub use metrics::{ diff --git a/src/metrics/task_metrics_collector.rs b/src/metrics/task_metrics_collector.rs index 611de51d..1cf6d90a 100644 --- a/src/metrics/task_metrics_collector.rs +++ b/src/metrics/task_metrics_collector.rs @@ -28,7 +28,7 @@ mod tests { use datafusion::arrow::record_batch::RecordBatch; use futures::StreamExt; - use crate::execution_plans::DistributedExec; + use crate::coordinator::DistributedExec; use crate::test_utils::in_memory_channel_resolver::{ InMemoryChannelResolver, InMemoryWorkerResolver, }; @@ -175,7 +175,12 @@ mod tests { // Ensure that there's metrics for each node for each task for each stage. for expected_task_key in expected_task_keys { - let actual_metrics = dist_exec.task_metrics.get(&expected_task_key).unwrap(); + let actual_metrics = dist_exec + .task_metrics + .as_ref() + .unwrap() + .get(&expected_task_key) + .unwrap(); // Verify that metrics were collected for all nodes. Some nodes may legitimately have // empty metrics (e.g., custom execution plans without metrics), which is fine - we @@ -291,6 +296,8 @@ mod tests { for expected_task_key in &expected_task_keys { let actual_metrics = dist_exec .task_metrics + .as_ref() + .unwrap() .get(expected_task_key) .unwrap_or_else(|| { panic!( @@ -342,7 +349,12 @@ mod tests { // Verify all nodes (including PartitionIsolatorExec) are preserved in metrics collection for expected_task_key in expected_task_keys { - let actual_metrics = dist_exec.task_metrics.get(&expected_task_key).unwrap(); + let actual_metrics = dist_exec + .task_metrics + .as_ref() + .unwrap() + .get(&expected_task_key) + .unwrap(); let stage = stages.get(&(expected_task_key.stage_id as usize)).unwrap(); let stage_plan = stage.local_plan().unwrap(); diff --git a/src/metrics/task_metrics_rewriter.rs b/src/metrics/task_metrics_rewriter.rs index 8bba62f9..4b442f0a 100644 --- a/src/metrics/task_metrics_rewriter.rs +++ b/src/metrics/task_metrics_rewriter.rs @@ -1,6 +1,7 @@ use crate::common::serialize_uuid; +use crate::coordinator::{DistributedExec, MetricsStore}; use crate::distributed_planner::NetworkBoundaryExt; -use crate::execution_plans::{DistributedExec, MetricsStore, MetricsWrapperExec}; +use crate::execution_plans::MetricsWrapperExec; use crate::metrics::DISTRIBUTED_DATAFUSION_TASK_ID_LABEL; use crate::metrics::collect_plan_metrics; use crate::metrics::proto::metrics_set_proto_to_df; @@ -54,7 +55,9 @@ pub async fn rewrite_distributed_plan_with_metrics( distributed_exec.wait_for_metrics().await; - let metrics_collection = Arc::clone(&distributed_exec.task_metrics); + let Some(metrics_collection) = distributed_exec.task_metrics.clone() else { + return Ok(plan); + }; let task_metrics = collect_plan_metrics(&prepared)?; @@ -270,7 +273,7 @@ pub fn stage_metrics_rewriter( #[cfg(test)] mod tests { - use crate::execution_plans::MetricsStore; + use crate::coordinator::MetricsStore; use crate::metrics::DISTRIBUTED_DATAFUSION_TASK_ID_LABEL; use crate::metrics::proto::{df_metrics_set_to_proto, metrics_set_proto_to_df}; use crate::metrics::task_metrics_rewriter::{ diff --git a/src/stage.rs b/src/stage.rs index 76b4e612..5c584d7d 100644 --- a/src/stage.rs +++ b/src/stage.rs @@ -1,4 +1,5 @@ -use crate::execution_plans::{DistributedExec, NetworkCoalesceExec}; +use crate::coordinator::DistributedExec; +use crate::execution_plans::NetworkCoalesceExec; use crate::metrics::DISTRIBUTED_DATAFUSION_TASK_ID_LABEL; use crate::{NetworkShuffleExec, PartitionIsolatorExec}; use datafusion::common::{HashMap, config_err}; diff --git a/src/test_utils/metrics.rs b/src/test_utils/metrics.rs index 0e1ac9b8..e8138917 100644 --- a/src/test_utils/metrics.rs +++ b/src/test_utils/metrics.rs @@ -1,4 +1,4 @@ -use crate::execution_plans::DistributedExec; +use crate::coordinator::DistributedExec; use crate::worker::generated::worker as pb; use datafusion::physical_plan::ExecutionPlan; use std::sync::Arc; diff --git a/src/test_utils/plans.rs b/src/test_utils/plans.rs index 35ecf23e..ba840774 100644 --- a/src/test_utils/plans.rs +++ b/src/test_utils/plans.rs @@ -1,8 +1,8 @@ use super::parquet::register_parquet_tables; use crate::NetworkBoundaryExt; use crate::common::serialize_uuid; +use crate::coordinator::DistributedExec; use crate::distributed_ext::DistributedExt; -use crate::execution_plans::DistributedExec; use crate::stage::Stage; use crate::test_utils::in_memory_channel_resolver::InMemoryWorkerResolver; use crate::worker::generated::worker::TaskKey; From 3872a3f6c8bc3eca79b344d5d12431dfeca844a6 Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Thu, 14 May 2026 19:44:36 -0400 Subject: [PATCH 2/4] Add comment --- src/coordinator/distributed.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/coordinator/distributed.rs b/src/coordinator/distributed.rs index 916479aa..1c472b9a 100644 --- a/src/coordinator/distributed.rs +++ b/src/coordinator/distributed.rs @@ -20,7 +20,8 @@ use std::sync::Mutex; /// [ExecutionPlan] that executes the inner plan in distributed mode. /// Before executing it, two modifications are lazily performed on the plan: -/// 1. Assigns worker URLs to all the stages. A random set of URLs are sampled from the +/// 1. Assigns worker URLs to all the stages. Unless explicitly set in +/// [crate::TaskEstimator::route_tasks], a random set of URLs are sampled from the /// channel resolver and assigned to each task in each stage. /// 2. Encodes all the plans in protobuf format so that network boundary nodes can send them /// over the wire. From 716a985945704022a9f76e5004c3f91cce688d9a Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Fri, 15 May 2026 13:40:56 +0200 Subject: [PATCH 3/4] Fix conflicts --- src/distributed_planner/distributed_query_planner.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/distributed_planner/distributed_query_planner.rs b/src/distributed_planner/distributed_query_planner.rs index 5524a3ce..9593cd6a 100644 --- a/src/distributed_planner/distributed_query_planner.rs +++ b/src/distributed_planner/distributed_query_planner.rs @@ -2,7 +2,7 @@ use crate::distributed_planner::inject_network_boundaries::inject_network_bounda use crate::distributed_planner::insert_broadcast::insert_broadcast_execs; use crate::distributed_planner::partial_reduce_below_network_shuffles::partial_reduce_below_network_shuffles; use crate::distributed_planner::prepare_network_boundaries::prepare_network_boundaries; -use crate::{DistributedExec, NetworkBoundaryExt}; +use crate::{DistributedConfig, DistributedExec, NetworkBoundaryExt}; use async_trait::async_trait; use datafusion::common::tree_node::TreeNode; use datafusion::execution::SessionState; @@ -66,6 +66,8 @@ impl QueryPlanner for DistributedQueryPlanner { return Ok(original_plan); } + let d_cfg = DistributedConfig::from_config_options(session_state.config_options())?; + // The plan already contains network boundaries set by the user. Just ensure they have nice // unique identifiers for each stage, and move forward with it. if original_plan.exists(|plan| Ok(plan.is_network_boundary()))? { @@ -74,7 +76,9 @@ impl QueryPlanner for DistributedQueryPlanner { if !plan.exists(|plan| Ok(plan.is_network_boundary()))? { return Ok(plan); } - return Ok(Arc::new(DistributedExec::new(plan))); + return Ok(Arc::new( + DistributedExec::new(plan).with_metrics_collection(d_cfg.collect_metrics), + )); } let mut plan = Arc::clone(&original_plan); @@ -96,7 +100,9 @@ impl QueryPlanner for DistributedQueryPlanner { let plan = partial_reduce_below_network_shuffles(plan, cfg)?; - Ok(Arc::new(DistributedExec::new(plan))) + Ok(Arc::new( + DistributedExec::new(plan).with_metrics_collection(d_cfg.collect_metrics), + )) } } From f3e3e506619abad90ef9aadfefdfbddd193579e5 Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Fri, 15 May 2026 13:48:23 +0200 Subject: [PATCH 4/4] Improve some comments --- src/coordinator/prepare_static_plan.rs | 5 +++-- src/coordinator/task_spawner.rs | 5 ++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/coordinator/prepare_static_plan.rs b/src/coordinator/prepare_static_plan.rs index 22aba4b3..ff072694 100644 --- a/src/coordinator/prepare_static_plan.rs +++ b/src/coordinator/prepare_static_plan.rs @@ -19,8 +19,9 @@ use rand::Rng; use std::sync::Arc; /// Prepares the distributed plan for execution, which implies: -/// 1. Perform some worker assignation, choosing randomly from the given URLs and assigning one -/// URL per task. +/// 1. Perform some worker URL assignation, choosing either: +/// - The URLs set by the user with [crate::TaskEstimator::route_tasks]. +/// - Randomly otherwise /// 2. Sending the sliced subplans to the assigned URLs. For each URL assigned to a task, a /// network call feeding the subplan is necessary. /// 3. In each network boundary, set the input plan to `None`. That way, network boundaries diff --git a/src/coordinator/task_spawner.rs b/src/coordinator/task_spawner.rs index 202f0001..f960da45 100644 --- a/src/coordinator/task_spawner.rs +++ b/src/coordinator/task_spawner.rs @@ -262,9 +262,8 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> { }); } - /// Instantiates and returns the task that based on the different local [WorkUnitFeedExec] - /// nodes, sends their inner [WorkUnitFeeds] over the network to their remote counterparts. - /// The returned task is just a future that does nothing unless polled. + /// Launches the task that based on the different local [WorkUnitFeedExec] nodes, sends their + /// inner [WorkUnitFeeds] over the network to their remote counterparts. /// /// Once this function is called, all the [WorkUnitFeedExec]s feeds will be consumed. pub(super) fn work_unit_feed_task(