Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions benchmarks/cdk/bin/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ use async_trait::async_trait;
use aws_config::BehaviorVersion;
use aws_sdk_ec2::Client as Ec2Client;
use axum::{Json, Router, extract::Query, http::StatusCode, routing::get};
use datafusion::catalog::memory::DataSourceExec;
use datafusion::common::DataFusionError;
use datafusion::common::instant::Instant;
use datafusion::common::runtime::SpawnedTask;
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::execute_stream;
use datafusion::prelude::SessionContext;
use datafusion_distributed::test_utils::work_unit_file_scan::{
WorkUnitFileScanCodec, WorkUnitFileScanConfig, WorkUnitFileScanTaskEstimator,
};
use datafusion_distributed::{
ChannelResolver, DistributedExt, DistributedMetricsFormat, SessionStateBuilderExt, Worker,
WorkerResolver, display_plan_ascii, get_distributed_channel_resolver,
WorkerQueryContext, WorkerResolver, display_plan_ascii, get_distributed_channel_resolver,
get_distributed_worker_resolver, rewrite_distributed_plan_with_metrics,
};
use futures::{StreamExt, TryFutureExt};
Expand Down Expand Up @@ -90,17 +94,34 @@ async fn main() -> Result<(), Box<dyn Error>> {
let runtime_env = Arc::new(RuntimeEnv::default());
runtime_env.register_object_store(&s3_url, s3);

let state = SessionStateBuilder::new()
let state_builder = SessionStateBuilder::new()
.with_default_features()
.with_runtime_env(Arc::clone(&runtime_env))
.with_distributed_worker_resolver(Ec2WorkerResolver::new())
.with_distributed_planner()
.with_distributed_broadcast_joins(cmd.broadcast_joins)?
.build();
// Uncomment for enabling WorkUnitFileScans.
// .with_physical_optimizer_rule(Arc::new(WorkUnitFileScanRule))
.with_distributed_user_codec(WorkUnitFileScanCodec)
.with_distributed_task_estimator(WorkUnitFileScanTaskEstimator)
.with_distributed_work_unit_feed(|dse: &DataSourceExec| {
dse.data_source()
.as_any()
.downcast_ref::<WorkUnitFileScanConfig>()
.map(|v| &v.feed)
});
let state = state_builder.build();
let ctx = SessionContext::from(state);
let ctx_clone = ctx.clone();

let worker = Worker::default().with_runtime_env(runtime_env);
let worker = Worker::from_session_builder(|ctx: WorkerQueryContext| async move {
Ok(ctx
.builder
.with_distributed_user_codec(WorkUnitFileScanCodec)
.build())
})
.with_runtime_env(runtime_env);

let http_server = axum::serve(
listener,
Router::new()
Expand Down
42 changes: 39 additions & 3 deletions benchmarks/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,23 @@ use datafusion::common::instant::Instant;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::utils::get_available_parallelism;
use datafusion::common::{config_err, exec_err, not_impl_err};
use datafusion::datasource::source::DataSourceExec;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
use datafusion_distributed::test_utils::localhost::LocalHostWorkerResolver;
use datafusion_distributed::test_utils::work_unit_file_scan::{
WorkUnitFileScanCodec, WorkUnitFileScanConfig, WorkUnitFileScanRule,
WorkUnitFileScanTaskEstimator,
};
use datafusion_distributed::{DistributedExt, NetworkBoundaryExt, SessionStateBuilderExt, Worker};
use datafusion_distributed_benchmarks::datasets::{clickbench, register_tables, tpcds, tpch};
use std::error::Error;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use structopt::StructOpt;
use tokio::net::TcpListener;
Expand Down Expand Up @@ -115,6 +121,12 @@ pub struct RunOpt {
/// Activate debug mode to see more details
#[structopt(short, long)]
debug: bool,

/// Replace each `FileScanConfig` data source with a `WorkUnitFileScanConfig`
/// that streams its `FileGroup`s through the work-unit feed pipeline. Used
/// to measure the latency overhead introduced by that path.
#[structopt(long = "work-unit-file-scan")]
work_unit_file_scan: bool,
}

fn queries_for_dataset(dataset: &str) -> Result<Vec<(String, String)>, DataFusionError> {
Expand Down Expand Up @@ -157,9 +169,20 @@ impl RunOpt {
let listener = TcpListener::bind(format!("127.0.0.1:{port}")).await?;
println!("Listening on {}...", listener.local_addr().unwrap());
let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
// Workers need to be able to decode the WorkUnitFileScan plan
// node when the feature is on. The codec is registered via a
// session builder so it is installed on every worker session.
let worker = Worker::from_session_builder(
|ctx: datafusion_distributed::WorkerQueryContext| async move {
Ok(ctx
.builder
.with_distributed_user_codec(WorkUnitFileScanCodec)
.build())
},
);
Ok::<_, Box<dyn Error + Send + Sync>>(
Server::builder()
.add_service(Worker::default().into_worker_server())
.add_service(worker.into_worker_server())
.serve_with_incoming(incoming)
.await?,
)
Expand All @@ -171,7 +194,7 @@ impl RunOpt {
}

async fn run_local(self) -> Result<()> {
let state = SessionStateBuilder::new()
let mut builder = SessionStateBuilder::new()
.with_default_features()
.with_config(self.config()?)
.with_distributed_worker_resolver(LocalHostWorkerResolver::new(self.workers.clone()))
Expand All @@ -192,7 +215,20 @@ impl RunOpt {
.with_distributed_broadcast_joins(self.broadcast_joins)?
.with_distributed_metrics_collection(self.collect_metrics)?
.with_distributed_max_tasks_per_stage(self.max_tasks_per_stage)?
.build();
.with_distributed_user_codec(WorkUnitFileScanCodec)
.with_distributed_task_estimator(WorkUnitFileScanTaskEstimator)
.with_distributed_work_unit_feed(|dse: &DataSourceExec| {
dse.data_source()
.as_any()
.downcast_ref::<WorkUnitFileScanConfig>()
.map(|v| &v.feed)
});

if self.work_unit_file_scan {
builder = builder.with_physical_optimizer_rule(Arc::new(WorkUnitFileScanRule))
}

let state = builder.build();
let ctx = SessionContext::new_with_state(state);
register_tables(&ctx, &self.get_path()?).await?;

Expand Down
2 changes: 1 addition & 1 deletion src/distributed_planner/task_estimator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ pub(crate) fn get_distributed_task_estimator(
/// [DistributedConfig].`files_per_task` field and assigns as many tasks as needed so that
/// no task handles more than the configured files.
#[derive(Debug)]
struct FileScanConfigTaskEstimator;
pub(crate) struct FileScanConfigTaskEstimator;

impl TaskEstimator for FileScanConfigTaskEstimator {
fn task_estimation(
Expand Down
1 change: 1 addition & 0 deletions src/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ pub mod property_based;
pub mod routing;
pub mod session_context;
pub mod test_work_unit_feed;
pub mod work_unit_file_scan;
Loading
Loading