diff --git a/benchmarks/cdk/bin/worker.rs b/benchmarks/cdk/bin/worker.rs index ea7cbaab..1768e299 100644 --- a/benchmarks/cdk/bin/worker.rs +++ b/benchmarks/cdk/bin/worker.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use aws_config::BehaviorVersion; use aws_sdk_ec2::Client as Ec2Client; use axum::{Json, Router, extract::Query, http::StatusCode, routing::get}; +use datafusion::catalog::memory::DataSourceExec; use datafusion::common::DataFusionError; use datafusion::common::instant::Instant; use datafusion::common::runtime::SpawnedTask; @@ -9,9 +10,12 @@ use datafusion::execution::SessionStateBuilder; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::physical_plan::execute_stream; use datafusion::prelude::SessionContext; +use datafusion_distributed::test_utils::work_unit_file_scan::{ + WorkUnitFileScanCodec, WorkUnitFileScanConfig, WorkUnitFileScanTaskEstimator, +}; use datafusion_distributed::{ ChannelResolver, DistributedExt, DistributedMetricsFormat, SessionStateBuilderExt, Worker, - WorkerResolver, display_plan_ascii, get_distributed_channel_resolver, + WorkerQueryContext, WorkerResolver, display_plan_ascii, get_distributed_channel_resolver, get_distributed_worker_resolver, rewrite_distributed_plan_with_metrics, }; use futures::{StreamExt, TryFutureExt}; @@ -90,17 +94,34 @@ async fn main() -> Result<(), Box> { let runtime_env = Arc::new(RuntimeEnv::default()); runtime_env.register_object_store(&s3_url, s3); - let state = SessionStateBuilder::new() + let state_builder = SessionStateBuilder::new() .with_default_features() .with_runtime_env(Arc::clone(&runtime_env)) .with_distributed_worker_resolver(Ec2WorkerResolver::new()) .with_distributed_planner() .with_distributed_broadcast_joins(cmd.broadcast_joins)? - .build(); + // Uncomment for enabling WorkUnitFileScans. 
+ // .with_physical_optimizer_rule(Arc::new(WorkUnitFileScanRule)) + .with_distributed_user_codec(WorkUnitFileScanCodec) + .with_distributed_task_estimator(WorkUnitFileScanTaskEstimator) + .with_distributed_work_unit_feed(|dse: &DataSourceExec| { + dse.data_source() + .as_any() + .downcast_ref::<WorkUnitFileScanConfig>() + .map(|v| &v.feed) + }); + let state = state_builder.build(); let ctx = SessionContext::from(state); let ctx_clone = ctx.clone(); - let worker = Worker::default().with_runtime_env(runtime_env); + let worker = Worker::from_session_builder(|ctx: WorkerQueryContext| async move { + Ok(ctx + .builder + .with_distributed_user_codec(WorkUnitFileScanCodec) + .build()) + }) + .with_runtime_env(runtime_env); + let http_server = axum::serve( listener, Router::new() diff --git a/benchmarks/src/run.rs b/benchmarks/src/run.rs index 45836f42..7dd34353 100644 --- a/benchmarks/src/run.rs +++ b/benchmarks/src/run.rs @@ -22,17 +22,23 @@ use datafusion::common::instant::Instant; use datafusion::common::tree_node::{Transformed, TreeNode}; use datafusion::common::utils::get_available_parallelism; use datafusion::common::{config_err, exec_err, not_impl_err}; +use datafusion::datasource::source::DataSourceExec; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::SessionStateBuilder; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::{collect, displayable}; use datafusion::prelude::*; use datafusion_distributed::test_utils::localhost::LocalHostWorkerResolver; +use datafusion_distributed::test_utils::work_unit_file_scan::{ + WorkUnitFileScanCodec, WorkUnitFileScanConfig, WorkUnitFileScanRule, + WorkUnitFileScanTaskEstimator, +}; use datafusion_distributed::{DistributedExt, NetworkBoundaryExt, SessionStateBuilderExt, Worker}; use datafusion_distributed_benchmarks::datasets::{clickbench, register_tables, tpcds, tpch}; use std::error::Error; use std::fs; use std::path::PathBuf; +use std::sync::Arc; use std::time::Duration; use 
structopt::StructOpt; use tokio::net::TcpListener; @@ -115,6 +121,12 @@ pub struct RunOpt { /// Activate debug mode to see more details #[structopt(short, long)] debug: bool, + + /// Replace each `FileScanConfig` data source with a `WorkUnitFileScanConfig` + /// that streams its `FileGroup`s through the work-unit feed pipeline. Used + /// to measure the latency overhead introduced by that path. + #[structopt(long = "work-unit-file-scan")] + work_unit_file_scan: bool, } fn queries_for_dataset(dataset: &str) -> Result<Vec<usize>, DataFusionError> { @@ -157,9 +169,20 @@ impl RunOpt { let listener = TcpListener::bind(format!("127.0.0.1:{port}")).await?; println!("Listening on {}...", listener.local_addr().unwrap()); let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener); + // Workers need to be able to decode the WorkUnitFileScan plan + // node when the feature is on. The codec is registered via a + // session builder so it is installed on every worker session. + let worker = Worker::from_session_builder( + |ctx: datafusion_distributed::WorkerQueryContext| async move { + Ok(ctx + .builder + .with_distributed_user_codec(WorkUnitFileScanCodec) + .build()) + }, + ); Ok::<_, Box<dyn Error>>( Server::builder() - .add_service(Worker::default().into_worker_server()) + .add_service(worker.into_worker_server()) .serve_with_incoming(incoming) .await?, ) @@ -171,7 +194,7 @@ impl RunOpt { } async fn run_local(self) -> Result<()> { - let state = SessionStateBuilder::new() + let mut builder = SessionStateBuilder::new() .with_default_features() .with_config(self.config()?) .with_distributed_worker_resolver(LocalHostWorkerResolver::new(self.workers.clone())) @@ -192,7 +215,20 @@ impl RunOpt { .with_distributed_broadcast_joins(self.broadcast_joins)? .with_distributed_metrics_collection(self.collect_metrics)? .with_distributed_max_tasks_per_stage(self.max_tasks_per_stage)? 
- .build(); + .with_distributed_user_codec(WorkUnitFileScanCodec) + .with_distributed_task_estimator(WorkUnitFileScanTaskEstimator) + .with_distributed_work_unit_feed(|dse: &DataSourceExec| { + dse.data_source() + .as_any() + .downcast_ref::<WorkUnitFileScanConfig>() + .map(|v| &v.feed) + }); + + if self.work_unit_file_scan { + builder = builder.with_physical_optimizer_rule(Arc::new(WorkUnitFileScanRule)) + } + + let state = builder.build(); let ctx = SessionContext::new_with_state(state); register_tables(&ctx, &self.get_path()?).await?; diff --git a/src/distributed_planner/task_estimator.rs b/src/distributed_planner/task_estimator.rs index 60534e3a..1f894f27 100644 --- a/src/distributed_planner/task_estimator.rs +++ b/src/distributed_planner/task_estimator.rs @@ -235,7 +235,7 @@ pub(crate) fn get_distributed_task_estimator( /// [DistributedConfig].`files_per_task` field and assigns as many tasks as needed so that /// no task handles more than the configured files. #[derive(Debug)] -struct FileScanConfigTaskEstimator; +pub(crate) struct FileScanConfigTaskEstimator; impl TaskEstimator for FileScanConfigTaskEstimator { fn task_estimation( diff --git a/src/test_utils/mod.rs b/src/test_utils/mod.rs index b71a3dda..dc66f977 100644 --- a/src/test_utils/mod.rs +++ b/src/test_utils/mod.rs @@ -9,3 +9,4 @@ pub mod property_based; pub mod routing; pub mod session_context; pub mod test_work_unit_feed; +pub mod work_unit_file_scan; diff --git a/src/test_utils/work_unit_file_scan.rs b/src/test_utils/work_unit_file_scan.rs new file mode 100644 index 00000000..52df7476 --- /dev/null +++ b/src/test_utils/work_unit_file_scan.rs @@ -0,0 +1,433 @@ +//! WorkUnit-based [`FileScanConfig`] alternative for benchmarking purposes. +//! +//! Streams the per-partition `PartitionedFile`s through a [`WorkUnitFeed`] +//! instead of embedding them directly in the serialized plan. Used to measure +//! the latency impact of routing file scan inputs through the work unit +//! 
pipeline as compared to the regular [`FileScanConfig`] path. + +use crate::{DistributedConfig, TaskCountAnnotation, TaskEstimation, TaskEstimator}; +use crate::{WorkUnitFeed, WorkUnitFeedProto, WorkUnitFeedProvider}; +use datafusion::catalog::memory::DataSourceExec; +use datafusion::common::tree_node::{Transformed, TreeNode}; +use datafusion::common::{Result, internal_datafusion_err}; +use datafusion::common::{Statistics, internal_err}; +use datafusion::config::ConfigOptions; +use datafusion::datasource::physical_plan::{FileGroup, FileScanConfig, FileScanConfigBuilder}; +use datafusion::datasource::source::DataSource; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::physical_expr::{EquivalenceProperties, LexOrdering}; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::execution_plan::SchedulingType; +use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning}; +use datafusion_proto::physical_plan::{ + AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec, +}; +use datafusion_proto::protobuf as df_proto; +use datafusion_proto::protobuf::proto_error; +use futures::stream::BoxStream; +use futures::{StreamExt, TryStreamExt}; +use prost::Message; +use std::any::Any; +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +/// Per-partition work unit emitted by [`FileScanWorkUnitProvider`]: the (encoded) +/// `PartitionedFile` that the receiving worker partition should scan. `None` +/// means "no file for this slot" — used as padding when the total file count +/// isn't divisible by `task_count` so the global feed indexing layout +/// (`task_index * partitions_per_task + p`) stays consistent across tasks. 
+#[derive(Clone, PartialEq, ::prost::Message)] +pub struct FileScanWorkUnit { + #[prost(message, tag = "1")] + pub file: Option<df_proto::PartitionedFile>, +} + +/// Local provider that holds the per-partition `PartitionedFile` assignment +/// for a [`WorkUnitFileScanConfig`]. It is only invoked on the coordinator +/// (the worker side gets a `RemoteFeedProvider` automatically when the plan is +/// decoded). +#[derive(Debug, Clone)] +pub struct FileScanWorkUnitProvider { + /// One entry per global feed partition (`partitions_per_task * task_count`). + /// `None` slots are sent as empty work units so the worker emits an empty + /// stream for that partition. + file_groups: Vec<FileGroup>, + metrics: ExecutionPlanMetricsSet, +} + +impl FileScanWorkUnitProvider { + pub fn new(file_groups: Vec<FileGroup>) -> Self { + Self { + file_groups, + metrics: ExecutionPlanMetricsSet::new(), + } + } +} + +impl WorkUnitFeedProvider for FileScanWorkUnitProvider { + type WorkUnit = FileScanWorkUnit; + + fn feed( + &self, + partition: usize, + _ctx: Arc<TaskContext>, + ) -> Result<BoxStream<'static, Result<Self::WorkUnit>>> { + let Some(file_group) = self.file_groups.get(partition) else { + return Ok(futures::stream::empty().boxed()); + }; + let stream = futures::stream::iter(file_group.files().to_vec()).map(|file| { + let file_proto: df_proto::PartitionedFile = (&file) + .try_into() + .map_err(|e| internal_datafusion_err!("{e}"))?; + Ok(FileScanWorkUnit { + file: Some(file_proto), + }) + }); + Ok(stream.boxed()) + } +} + +/// [`DataSource`] that defers obtaining its [`PartitionedFile`](datafusion_datasource::PartitionedFile) +/// until execution time, pulling it off a [`WorkUnitFeed`] before delegating +/// the actual file scan to the wrapped [`FileScanConfig`]. The wrapped config +/// carries no file groups while it is being serialized — the per-partition +/// file travels through the feed instead. +#[derive(Debug, Clone)] +pub struct WorkUnitFileScanConfig { + pub feed: WorkUnitFeed, + /// Underlying [`FileScanConfig`] used as a template. 
`file_groups` is left + /// empty here; the per-partition assignment arrives via `feed`. + pub fsc: FileScanConfig, + /// Number of output partitions exposed to the rest of the plan. On the + /// coordinator this is per-task partitions before scaling; on a worker this + /// is the per-task partition count. + pub partitions: usize, +} + +impl WorkUnitFileScanConfig { + pub fn new(mut fsc: FileScanConfig) -> Self { + let file_groups = std::mem::take(&mut fsc.file_groups); + + Self { + partitions: file_groups.len(), + feed: WorkUnitFeed::new(FileScanWorkUnitProvider::new(file_groups)), + fsc, + } + } +} + +impl DataSource for WorkUnitFileScanConfig { + fn open( + &self, + partition: usize, + context: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + let inner = self.fsc.clone(); + let schema = inner.projected_schema()?; + + let stream = self + .feed + .feed(partition, Arc::clone(&context))? + .map(move |work_unit| { + let file = work_unit?.file.expect("missing file"); + + let single_file_group = df_proto::FileGroup { files: vec![file] }; + let single_file_group = FileGroup::try_from(&single_file_group)?; + + let new_config = FileScanConfigBuilder::from(inner.clone()) + .with_file_groups(vec![single_file_group]) + .build(); + new_config.open(0, Arc::clone(&context)) + }) + .try_flatten(); + Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "WorkUnitFileScan: ",)?; + self.fsc.fmt_as(t, f) + } + DisplayFormatType::TreeRender => { + writeln!(f, "WorkUnitFileScan")?; + Ok(()) + } + } + } + + fn repartitioned( + &self, + _target_partitions: usize, + _repartition_file_min_size: usize, + _output_ordering: Option<LexOrdering>, + ) -> Result<Option<Arc<dyn DataSource>>> { + // Repartitioning is handled by the WorkUnitFileScanTaskEstimator, not + // by DataFusion's repartition pass. 
+ Ok(None) + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(self.partitions) + } + + fn eq_properties(&self) -> EquivalenceProperties { + self.fsc.eq_properties() + } + + fn scheduling_type(&self) -> SchedulingType { + SchedulingType::Cooperative + } + + fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> { + // Statistics for a specific partition are not known at planning time + // because we don't know which file will land on it; fall back to the + // aggregate template statistics. + self.fsc.partition_statistics(None) + } + + fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> { + let new_template = FileScanConfigBuilder::from(self.fsc.clone()) + .with_limit(limit) + .build(); + Some(Arc::new(WorkUnitFileScanConfig { + feed: self.feed.clone(), + fsc: new_template, + partitions: self.partitions, + })) + } + + fn fetch(&self) -> Option<usize> { + self.fsc.limit + } + + fn metrics(&self) -> ExecutionPlanMetricsSet { + self.feed + .inner() + .map(|p| p.metrics.clone()) + .unwrap_or_default() + } + + fn try_swapping_with_projection( + &self, + _projection: &datafusion::physical_expr::projection::ProjectionExprs, + ) -> Result<Option<Arc<dyn DataSource>>> { + Ok(None) + } +} + +/// Encodes/decodes a [`DataSourceExec`] wrapping a [`WorkUnitFileScanConfig`]. +/// The template [`FileScanConfig`] (with empty `file_groups`) is serialized via +/// DataFusion's default codec — we rely on it being round-trippable for the +/// underlying `FileSource` (Parquet, CSV, etc.). +#[derive(Debug)] +pub struct WorkUnitFileScanCodec; + +#[derive(Clone, PartialEq, ::prost::Message)] +struct WorkUnitFileScanProto { + /// Encoded [`df_proto::PhysicalPlanNode`] representing the template + /// `DataSourceExec(FileScanConfig)` (with empty `file_groups`). We keep it + /// as raw bytes so we can use DataFusion's default protobuf converter to + /// roundtrip it without re-implementing every `FileSource` codec here. 
+ #[prost(bytes, tag = "1")] + inner: Vec<u8>, + #[prost(message, optional, tag = "2")] + feed: Option<WorkUnitFeedProto>, + #[prost(uint64, tag = "3")] + partitions: u64, +} + +impl PhysicalExtensionCodec for WorkUnitFileScanCodec { + fn try_decode( + &self, + buf: &[u8], + inputs: &[Arc<dyn ExecutionPlan>], + ctx: &TaskContext, + ) -> Result<Arc<dyn ExecutionPlan>> { + if !inputs.is_empty() { + return internal_err!( + "WorkUnitFileScanConfig should have no children, got {}", + inputs.len() + ); + } + let proto = WorkUnitFileScanProto::decode(buf) + .map_err(|e| proto_error(format!("Failed to decode WorkUnitFileScanProto: {e}")))?; + + let plan_node = df_proto::PhysicalPlanNode::decode(&proto.inner[..]) + .map_err(|e| proto_error(format!("Failed to decode template plan: {e}")))?; + let template_plan = + plan_node.try_into_physical_plan(ctx, &DefaultPhysicalExtensionCodec {})?; + let Some(dse) = template_plan.as_any().downcast_ref::<DataSourceExec>() else { + return Err(proto_error( + "Expected the WorkUnitFileScan template plan to be a DataSourceExec", + )); + }; + let Some(inner) = dse.data_source().as_any().downcast_ref::<FileScanConfig>() else { + return Err(proto_error( + "Expected the WorkUnitFileScan template DataSource to be a FileScanConfig", + )); + }; + let Some(feed_proto) = proto.feed else { + return Err(proto_error("WorkUnitFileScanProto missing feed")); + }; + Ok(DataSourceExec::from_data_source(WorkUnitFileScanConfig { + feed: WorkUnitFeed::from_proto(feed_proto)?, + fsc: inner.clone(), + partitions: proto.partitions as usize, + })) + } + + fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> { + let Some(dse) = node.as_any().downcast_ref::<DataSourceExec>() else { + return internal_err!( + "Expected DataSourceExec wrapping a WorkUnitFileScanConfig, got {}", + node.name() + ); + }; + let Some(wfs) = dse + .data_source() + .as_any() + .downcast_ref::<WorkUnitFileScanConfig>() + else { + return internal_err!("Expected the inner DataSource to be a WorkUnitFileScanConfig"); + }; + + // Encode the template DataSourceExec(FileScanConfig) as a regular + // PhysicalPlanNode using 
DataFusion's default codec. + let plan_node = df_proto::PhysicalPlanNode::try_from_physical_plan( + DataSourceExec::from_data_source(wfs.fsc.clone()), + &DefaultPhysicalExtensionCodec {}, + )?; + let mut inner_bytes = Vec::new(); + plan_node.encode(&mut inner_bytes).map_err(|e| { + proto_error(format!( + "Failed to encode WorkUnitFileScan template plan: {e}" + )) + })?; + + let proto = WorkUnitFileScanProto { + inner: inner_bytes, + feed: Some(wfs.feed.to_proto()), + partitions: wfs.partitions as u64, + }; + proto + .encode(buf) + .map_err(|e| proto_error(format!("Failed to encode WorkUnitFileScanProto: {e}"))) + } +} + +/// [`PhysicalOptimizerRule`] that rewrites every leaf `DataSourceExec` +/// containing a [`FileScanConfig`] to one that wraps a +/// [`WorkUnitFileScanConfig`]. Every individual file from every original +/// `FileGroup` is moved into a separate slot of the [`WorkUnitFeed`] (one +/// `PartitionedFile` per output partition), and the template config is left +/// with empty file groups. 
+#[derive(Debug, Default)] +pub struct WorkUnitFileScanRule; + +impl PhysicalOptimizerRule for WorkUnitFileScanRule { + fn optimize( + &self, + plan: Arc<dyn ExecutionPlan>, + _config: &ConfigOptions, + ) -> Result<Arc<dyn ExecutionPlan>> { + plan.transform_down(|node| { + let Some(dse) = node.as_any().downcast_ref::<DataSourceExec>() else { + return Ok(Transformed::no(node)); + }; + let Some(fsc) = dse.data_source().as_any().downcast_ref::<FileScanConfig>() else { + return Ok(Transformed::no(node)); + }; + + let new_ds = WorkUnitFileScanConfig::new(fsc.clone()); + Ok(Transformed::yes(DataSourceExec::from_data_source(new_ds))) + }) + .map(|t| t.data) + } + + fn name(&self) -> &str { + "WorkUnitFileScanRule" + } + + fn schema_check(&self) -> bool { + true + } +} + +/// [`TaskEstimator`] for [`WorkUnitFileScanConfig`] leaves that delegates to +/// the built-in [`FileScanConfigTaskEstimator`]: we synthesize a regular +/// `DataSourceExec(FileScanConfig)` carrying the same files currently stored +/// in the work-unit feed, hand it to the underlying estimator, and then +/// re-wrap the result back into our work-unit-flavored data source. +/// +/// `FileScanConfigTaskEstimator::scale_up_leaf_node` returns a +/// `PartitionIsolatorExec(DataSourceExec(FileScanConfig))`. We unwrap that +/// here: the `PartitionIsolatorExec` itself must not appear in the final plan +/// because the per-task feed routing already handles per-task isolation. We +/// only keep the inner `FileScanConfig`'s file groups, flatten them back into +/// one `PartitionedFile` per feed slot, and feed them into a freshly built +/// `WorkUnitFileScanConfig`. +#[derive(Debug, Default)] +pub struct WorkUnitFileScanTaskEstimator; + +impl TaskEstimator for WorkUnitFileScanTaskEstimator { + fn task_estimation( + &self, + plan: &Arc<dyn ExecutionPlan>, + cfg: &ConfigOptions, + ) -> Option<TaskEstimation> { + let dse = plan.as_any().downcast_ref::<DataSourceExec>()?; + let wfs = dse + .data_source() + .as_any() + .downcast_ref::<WorkUnitFileScanConfig>()?; + + // Same as FileScanConfigTaskEstimator.task_estimation. 
+ let d_cfg = cfg.extensions.get::<DistributedConfig>()?; + + let mut partitioned_files = 0; + for file_group in &wfs.feed.inner()?.file_groups { + partitioned_files += file_group.len(); + } + + let task_count = partitioned_files.div_ceil(d_cfg.files_per_task); + + Some(TaskEstimation { + task_count: TaskCountAnnotation::Desired(task_count), + }) + } + + fn scale_up_leaf_node( + &self, + plan: &Arc<dyn ExecutionPlan>, + task_count: usize, + _cfg: &ConfigOptions, + ) -> Option<Arc<dyn ExecutionPlan>> { + let dse = plan.as_any().downcast_ref::<DataSourceExec>()?; + let wfs = dse + .data_source() + .as_any() + .downcast_ref::<WorkUnitFileScanConfig>()?; + + let wuf_provider = wfs.feed.inner()?; + + // Same as FileScanConfigTaskEstimator.scale_up_leaf_node + let mut new_file_groups = vec![]; + for file_group in wuf_provider.file_groups.clone() { + new_file_groups.extend(file_group.split_files(task_count)); + } + + let new_provider = FileScanWorkUnitProvider::new(new_file_groups); + Some(DataSourceExec::from_data_source(WorkUnitFileScanConfig { + feed: WorkUnitFeed::new(new_provider), + fsc: wfs.fsc.clone(), + partitions: wfs.partitions, + }) as Arc<dyn ExecutionPlan>) + } +}