Refactor distributed.rs into coordinator module #426
coordinator/distributed.rs (new file)
@@ -0,0 +1,194 @@
use crate::common::{require_one_child, serialize_uuid};
use crate::coordinator::metrics_store::MetricsStore;
use crate::coordinator::prepare_static_plan::prepare_static_plan;
use crate::distributed_planner::NetworkBoundaryExt;
use crate::worker::generated::worker::TaskKey;
use datafusion::common::internal_datafusion_err;
use datafusion::common::runtime::JoinSet;
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion::common::{Result, exec_err};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr_common::metrics::MetricsSet;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use futures::StreamExt;
use std::any::Any;
use std::fmt::Formatter;
use std::sync::Arc;
use std::sync::Mutex;

/// [ExecutionPlan] that executes the inner plan in distributed mode.
/// Before executing it, two modifications are lazily performed on the plan:
/// 1. Assigns worker URLs to all the stages. Unless explicitly set in
///    [crate::TaskEstimator::route_tasks], a random set of URLs is sampled from the
///    channel resolver and assigned to each task in each stage.
/// 2. Encodes all the plans in protobuf format so that network boundary nodes can send them
///    over the wire.
#[derive(Debug)]
pub struct DistributedExec {
    plan: Arc<dyn ExecutionPlan>,
    prepared_plan: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
    metrics: ExecutionPlanMetricsSet,
    pub(crate) task_metrics: Option<Arc<MetricsStore>>,
}

pub(super) struct PreparedPlan {
    pub(super) head_stage: Arc<dyn ExecutionPlan>,
    pub(super) join_set: JoinSet<Result<()>>,
}

impl DistributedExec {
    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
        Self {
            plan,
            prepared_plan: Arc::new(Mutex::new(None)),
            metrics: ExecutionPlanMetricsSet::new(),
            task_metrics: None,
        }
    }

    /// Enables task metrics collection from remote workers.
    pub fn with_metrics_collection(mut self, enabled: bool) -> Self {
        self.task_metrics = match enabled {
            true => Some(Arc::new(MetricsStore::new())),
            false => None,
        };
        self
    }

    /// Waits until all worker tasks have reported their metrics back via the coordinator channel.
    ///
    /// Metrics are delivered asynchronously after query execution completes, so callers that need
    /// complete metrics (e.g. for observability or display) should await this before inspecting
    /// [`Self::task_metrics`] or calling [`rewrite_distributed_plan_with_metrics`].
    ///
    /// [`rewrite_distributed_plan_with_metrics`]: crate::rewrite_distributed_plan_with_metrics
    pub async fn wait_for_metrics(&self) {
        let mut expected_keys: Vec<TaskKey> = Vec::new();
        let Some(task_metrics) = &self.task_metrics else {
            return;
        };
        let _ = self.plan.apply(|plan| {
            if let Some(boundary) = plan.as_network_boundary() {
                let stage = boundary.input_stage();
                for i in 0..stage.task_count() {
                    expected_keys.push(TaskKey {
                        query_id: serialize_uuid(&stage.query_id()),
                        stage_id: stage.num() as u64,
                        task_number: i as u64,
                    });
                }
            }
            Ok(TreeNodeRecursion::Continue)
        });
        if expected_keys.is_empty() {
            return;
        }
        let mut rx = task_metrics.rx.clone();
        let _ = rx
            .wait_for(|map| expected_keys.iter().all(|key| map.contains_key(key)))
Collaborator: Could have 100s of tasks in theory. Maybe just check the length? It's a hashmap, so no duplicates.

Collaborator: On every update we would iterate over the length of the map. So for 100 tasks it's something like 100 * 99 iterations in total.

Author: Same as above: this is not new, just copy-pasted from the previous file. For the sake of keeping this a clean copy-paste refactor, I'd like to avoid introducing unrelated changes.
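For reference, the length-based predicate suggested in the first comment would look roughly like the following sketch. It is not part of this PR, and `Key` is a hypothetical stand-in for the generated `TaskKey`:

```rust
// Sketch of the reviewer's length-check idea (not adopted in this PR).
// Because the watch channel holds a HashMap, there are no duplicate keys, so
// waiting on the map length is O(1) per update instead of re-scanning every
// expected key each time the map changes.
use std::collections::HashMap;
use tokio::sync::watch;

type Key = (u64, u64, u64); // hypothetical stand-in for the generated TaskKey

async fn wait_for_n_tasks(mut rx: watch::Receiver<HashMap<Key, Vec<u32>>>, expected: usize) {
    // wait_for re-runs the predicate on every send_modify notification.
    let _ = rx.wait_for(|map| map.len() >= expected).await;
}
```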
            .await;
    }

    /// Returns the plan which is lazily prepared on `execute()` and actually gets executed.
    /// It is updated on every call to `execute()`. Returns an error if `.execute()` has not been
    /// called.
    pub(crate) fn prepared_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
        self.prepared_plan
            .lock()
            .map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {}", e))?
            .clone()
            .ok_or_else(|| {
                internal_datafusion_err!("No prepared plan found. Was execute() called?")
            })
    }
}

impl DisplayAs for DistributedExec {
    fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        write!(f, "DistributedExec")
    }
}

impl ExecutionPlan for DistributedExec {
    fn name(&self) -> &str {
        "DistributedExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        self.plan.properties()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.plan]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(DistributedExec {
            plan: require_one_child(&children)?,
            prepared_plan: self.prepared_plan.clone(),
            metrics: self.metrics.clone(),
            task_metrics: self.task_metrics.clone(),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        if partition > 0 {
            // The DistributedExec node assigns worker URLs lazily upon calling .execute().
            // This means .execute() must only be called once: repeating the random URL
            // assignment across multiple partition calls would yield differing assignments,
            // producing an invalid plan.
            return exec_err!(
                "DistributedExec must only have 1 partition, but it was called with partition index {partition}"
            );
        }

        let PreparedPlan {
            head_stage,
            join_set,
        } = prepare_static_plan(&self.plan, &self.metrics, &self.task_metrics, &context)?;
        {
            let mut guard = self
                .prepared_plan
                .lock()
                .map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {e}"))?;
            *guard = Some(head_stage.clone());
        }
        let mut builder = RecordBatchReceiverStreamBuilder::new(self.schema(), 1);
        let tx = builder.tx();
        // Spawn the task that pulls data from child...
        builder.spawn(async move {
            let mut stream = head_stage.execute(partition, context)?;
            while let Some(msg) = stream.next().await {
                if tx.send(msg).await.is_err() {
                    break; // channel closed
                }
            }
            Ok(())
        });
        // ...in parallel to the one that feeds the plan to workers.
        builder.spawn(async move {
            for res in join_set.join_all().await {
                res?;
            }
            Ok(())
        });
        Ok(builder.build())
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
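To see how these pieces fit together, here is a hypothetical usage sketch based only on the API above. The `inner` plan and a `TaskContext` configured with a worker resolver are assumed to come from elsewhere; nothing here is code from this PR:

```rust
// Hypothetical usage sketch of DistributedExec. The import path follows the
// coordinator/mod.rs re-export shown later in this diff.
use crate::coordinator::DistributedExec;
use datafusion::common::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use futures::StreamExt;
use std::sync::Arc;

async fn run(inner: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>) -> Result<()> {
    let exec = DistributedExec::new(inner).with_metrics_collection(true);

    // DistributedExec is single-partition: only partition 0 is valid.
    let mut stream = exec.execute(0, ctx)?;
    while let Some(batch) = stream.next().await {
        println!("received {} rows", batch?.num_rows());
    }

    // Worker metrics arrive asynchronously; wait before inspecting them.
    exec.wait_for_metrics().await;
    Ok(())
}
```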
coordinator/metrics_store.rs (new file)
@@ -0,0 +1,45 @@
use crate::TaskKey;
use datafusion::common::HashMap;
use tokio::sync::watch;

type MetricsMap = HashMap<TaskKey, Vec<crate::worker::generated::worker::MetricsSet>>;

/// Stores the metrics collected from all worker tasks, and notifies waiters when new entries arrive.
#[derive(Debug, Clone)]
pub struct MetricsStore {
    tx: watch::Sender<MetricsMap>,
    pub(crate) rx: watch::Receiver<MetricsMap>,
}

impl MetricsStore {
    pub(crate) fn new() -> Self {
        let (tx, rx) = watch::channel(HashMap::new());
        Self { tx, rx }
    }

    pub(crate) fn insert(
        &self,
        key: TaskKey,
        metrics: Vec<crate::worker::generated::worker::MetricsSet>,
    ) {
        self.tx.send_modify(|map| {
            map.insert(key, metrics);
        });
    }

    pub(crate) fn get(
        &self,
        key: &TaskKey,
    ) -> Option<Vec<crate::worker::generated::worker::MetricsSet>> {
        self.rx.borrow().get(key).cloned()
    }

    #[cfg(test)]
    pub(crate) fn from_entries(
        entries: impl IntoIterator<Item = (TaskKey, Vec<crate::worker::generated::worker::MetricsSet>)>,
    ) -> Self {
        let map: HashMap<_, _> = entries.into_iter().collect();
        let (tx, rx) = watch::channel(map);
        Self { tx, rx }
    }
}
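The store is a thin wrapper over a tokio watch channel. The following self-contained sketch shows the same insert / wait / read pattern in isolation, with simplified stand-in key and metric types (none of this is code from the PR):

```rust
// Sketch of the watch-channel pattern used by MetricsStore: send_modify
// mutates the map in place and wakes waiters; borrow() reads a snapshot.
use std::collections::HashMap;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(HashMap::<&'static str, u64>::new());

    // A waiter, analogous to DistributedExec::wait_for_metrics.
    let mut waiter = rx.clone();
    let handle = tokio::spawn(async move {
        let map = waiter
            .wait_for(|m| m.contains_key("stage-0/task-1"))
            .await
            .unwrap();
        map["stage-0/task-1"]
    });

    // A writer, analogous to MetricsStore::insert.
    tx.send_modify(|m| {
        m.insert("stage-0/task-1", 7);
    });

    assert_eq!(handle.await.unwrap(), 7);
    // A point read, analogous to MetricsStore::get.
    assert_eq!(rx.borrow().get("stage-0/task-1").copied(), Some(7));
}
```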
coordinator/mod.rs (new file)
@@ -0,0 +1,7 @@
mod distributed;
mod metrics_store;
mod prepare_static_plan;
mod task_spawner;

pub use distributed::DistributedExec;
pub(crate) use metrics_store::MetricsStore;
coordinator/prepare_static_plan.rs (new file)
@@ -0,0 +1,113 @@
use crate::coordinator::MetricsStore;
use crate::coordinator::distributed::PreparedPlan;
use crate::coordinator::task_spawner::{
    CoordinatorToWorkerMetrics, CoordinatorToWorkerTaskSpawner,
};
use crate::distributed_planner::get_distributed_task_estimator;
use crate::stage::RemoteStage;
use crate::{
    DistributedCodec, NetworkBoundaryExt, Stage, TaskRoutingContext,
    get_distributed_worker_resolver,
};
use datafusion::common::runtime::JoinSet;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::{Result, exec_err};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use rand::Rng;
use std::sync::Arc;

/// Prepares the distributed plan for execution, which implies:
/// 1. Performing worker URL assignment, choosing either:
///    - the URLs set by the user with [crate::TaskEstimator::route_tasks], or
///    - random ones otherwise.
/// 2. Sending the sliced subplans to the assigned URLs. For each URL assigned to a task, a
///    network call feeding it the subplan is necessary.
/// 3. Setting the input plan to `None` in each network boundary. That way, network boundaries
///    become nodes without children, and traversing them will not descend any further.
/// 4. Spawning a background task per worker that waits for the worker to finish and collects
///    its metrics into [DistributedExec::task_metrics] via the coordinator channel.
pub(super) fn prepare_static_plan(
    base_plan: &Arc<dyn ExecutionPlan>,
    metrics: &ExecutionPlanMetricsSet,
    task_metrics: &Option<Arc<MetricsStore>>,
    ctx: &Arc<TaskContext>,
) -> Result<PreparedPlan> {
    let worker_resolver = get_distributed_worker_resolver(ctx.session_config())?;
    let codec = DistributedCodec::new_combined_with_user(ctx.session_config());

    let available_urls = worker_resolver.get_urls()?;

    let metrics = CoordinatorToWorkerMetrics::new(metrics);

    let mut join_set = JoinSet::new();
    let prepared = Arc::clone(base_plan).transform_up(|plan| {
        // The following logic only applies to network boundaries.
        let Some(plan) = plan.as_network_boundary() else {
            return Ok(Transformed::no(plan));
        };

        let Stage::Local(stage) = plan.input_stage() else {
            return exec_err!("Input stage from network boundary was not in Local state");
        };

        let task_estimator = get_distributed_task_estimator(ctx.session_config())?;

        let mut spawner = CoordinatorToWorkerTaskSpawner::new(
            stage,
            &metrics,
            task_metrics,
            &codec,
            &mut join_set,
        )?;

        let routed_urls = match task_estimator.route_tasks(&TaskRoutingContext {
            task_ctx: Arc::clone(ctx),
            plan: &stage.plan,
            task_count: stage.tasks,
            available_urls: &available_urls,
        }) {
            Ok(Some(routed_urls)) => routed_urls,
            // If the user has not defined custom routing with a `route_tasks` implementation, we
            // default to round-robin task assignment from a randomized starting point.
            Ok(None) => {
                let start_idx = rand::rng().random_range(0..available_urls.len());
                (0..stage.tasks)
                    .map(|i| available_urls[(start_idx + i) % available_urls.len()].clone())
                    .collect()
            }
            Err(e) => return exec_err!("error routing tasks to workers: {e}"),
        };

        if routed_urls.len() != stage.tasks {
            return exec_err!(
                "number of tasks ({}) was not equal to number of urls ({}) at execution time",
                stage.tasks,
                routed_urls.len()
            );
        }

        let mut workers = Vec::with_capacity(stage.tasks);
        for (i, routed_url) in routed_urls.into_iter().enumerate() {
            workers.push(routed_url.clone());
            // Spawn a task that sends the subplan to the chosen URL.
            // There will be as many spawned tasks as workers.
            let (tx, worker_rx) = spawner.send_plan_task(Arc::clone(ctx), i, routed_url)?;
            spawner.metrics_collection_task(i, worker_rx);
            spawner.work_unit_feed_task(Arc::clone(ctx), i, tx)?;
Collaborator: [comment body not captured]

Author: Tweaked the comment. The traversal is different depending on the […]. This is anyway not new code; it's just a dumb copy-paste from the old monolithic distributed.rs.
        }

        Ok(Transformed::yes(plan.with_input_stage(Stage::Remote(
            RemoteStage {
                query_id: stage.query_id,
                num: stage.num,
                workers,
            },
        ))?))
    })?;
    Ok(PreparedPlan {
        head_stage: prepared.data,
        join_set,
    })
}
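Stripped of the plan-rewriting machinery, the default routing above reduces to the following standalone sketch. It mirrors the rand 0.9 calls (`rand::rng`, `random_range`) used in `prepare_static_plan`; the `main` function and URLs are purely illustrative:

```rust
// Standalone sketch of the default routing: round-robin assignment starting
// at a random index. Assumes `urls` is non-empty.
use rand::Rng;

fn route_round_robin(urls: &[String], tasks: usize) -> Vec<String> {
    let start_idx = rand::rng().random_range(0..urls.len());
    (0..tasks)
        .map(|i| urls[(start_idx + i) % urls.len()].clone())
        .collect()
}

fn main() {
    let urls: Vec<String> = ["http://w1:8080", "http://w2:8080", "http://w3:8080"]
        .iter()
        .map(|s| s.to_string())
        .collect();
    // With 4 tasks over 3 workers, exactly one worker receives two tasks.
    println!("{:?}", route_round_robin(&urls, 4));
}
```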
Collaborator: Nit: allocate expected keys after checking if we need to collect metrics.

Author: Not something new; this is just copy-pasted from the old file. I'd like to avoid making code changes in a PR that is supposed to be just a mechanical copy-paste.
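For completeness, the declined change would look roughly like this compilable sketch with simplified stand-in types (not code from this PR):

```rust
// Sketch of the declined nit: perform the `task_metrics` check before
// allocating `expected_keys`, so the Vec is never built when metrics
// collection is disabled.
use std::sync::Arc;

struct Exec {
    task_metrics: Option<Arc<()>>, // stand-in for Option<Arc<MetricsStore>>
}

impl Exec {
    async fn wait_for_metrics(&self) {
        // Early return first...
        let Some(_task_metrics) = &self.task_metrics else {
            return;
        };
        // ...and only then allocate. u64 stands in for the generated TaskKey.
        let mut expected_keys: Vec<u64> = Vec::new();
        expected_keys.push(0);
        // The rest would proceed exactly as in wait_for_metrics above.
    }
}
```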