diff --git a/native/Cargo.lock b/native/Cargo.lock index 5b3f7e885e..f43b41dd9a 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -96,12 +96,56 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" +[[package]] +name = "anstream" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + [[package]] name = "anstyle" version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys 0.60.2", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.60.2", +] + [[package]] name = "anyhow" version = "1.0.102" @@ -1331,6 +1375,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2797f34da339ce31042b27d23607e051786132987f595b02ba4f6a6dffb7030a" dependencies = [ "clap_builder", + "clap_derive", ] [[package]] @@ -1339,8 +1384,22 @@ version = "4.5.60" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24a241312cea5059b13574bb9b3861cabf758b879c15190b37b6d6fd63ab6876" dependencies = [ + "anstream", "anstyle", "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.117", ] [[package]] @@ -1358,6 +1417,12 @@ dependencies = [ "cc", ] +[[package]] +name = "colorchoice" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" + [[package]] name = "combine" version = "4.6.7" @@ -1834,6 +1899,7 @@ dependencies = [ "aws-config", "aws-credential-types", "bytes", + "clap", "crc32fast", "criterion", "datafusion", @@ -3609,6 +3675,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + [[package]] name = "itertools" version = "0.13.0" @@ -4289,6 +4361,12 @@ version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + [[package]] name = "oorandom" version = "11.1.5" @@ -6339,6 +6417,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "uuid" version = "1.22.0" diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 3f305a631d..3df9e55719 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -72,6 +72,7 @@ url = { workspace = true } aws-config = { workspace = true } aws-credential-types = { workspace = true } parking_lot = "0.12.5" +clap = { version = "4", features = ["derive"] } datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, default-features = false, features = ["hdfs"] } reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2"] } object_store_opendal = {version = "0.55.0", optional = true} @@ -113,6 +114,10 @@ name = "comet" # "rlib" is for benchmarking with criterion. crate-type = ["cdylib", "rlib"] +[[bin]] +name = "shuffle_bench" +path = "src/bin/shuffle_bench.rs" + [[bench]] name = "parquet_read" harness = false diff --git a/native/core/src/bin/shuffle_bench.rs b/native/core/src/bin/shuffle_bench.rs new file mode 100644 index 0000000000..17b1a9a6ff --- /dev/null +++ b/native/core/src/bin/shuffle_bench.rs @@ -0,0 +1,742 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Standalone shuffle benchmark tool for profiling Comet shuffle write and read +//! outside of Spark. +//! +//! # Usage +//! +//! Read from Parquet files (e.g. TPC-H lineitem): +//! ```sh +//! cargo run --release --bin shuffle_bench -- \ +//! --input /data/tpch-sf100/lineitem/ \ +//! --partitions 200 \ +//! --codec zstd --zstd-level 1 \ +//! --hash-columns 0,3 \ +//! --read-back +//! ``` +//! +//! Generate synthetic data: +//! ```sh +//! cargo run --release --bin shuffle_bench -- \ +//! --generate --gen-rows 10000000 --gen-string-cols 4 --gen-int-cols 4 \ +//! --gen-decimal-cols 2 --gen-avg-string-len 32 \ +//! --partitions 200 --codec lz4 --read-back +//! ``` +//! +//! Profile with flamegraph: +//! ```sh +//! cargo flamegraph --release --bin shuffle_bench -- \ +//! --input /data/tpch-sf100/lineitem/ \ +//! --partitions 200 --codec zstd --zstd-level 1 +//! ``` + +use arrow::array::builder::{Date32Builder, Decimal128Builder, Int64Builder, StringBuilder}; +use arrow::array::RecordBatch; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use clap::Parser; +use comet::execution::shuffle::{ + read_ipc_compressed, CometPartitioning, CompressionCodec, ShuffleWriterExec, +}; +use datafusion::datasource::memory::MemorySourceConfig; +use datafusion::datasource::source::DataSourceExec; +use datafusion::execution::config::SessionConfig; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; +use datafusion::physical_expr::expressions::Column; +use datafusion::physical_plan::common::collect; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::SessionContext; +use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; +use rand::RngExt; +use std::fs; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Instant; + +#[derive(Parser, Debug)] +#[command( + name = "shuffle_bench", + about = "Standalone benchmark for Comet shuffle write and read performance" +)] +struct Args { + /// Path to input Parquet file or directory of Parquet files + #[arg(long)] + input: Option, + + /// Generate synthetic data instead of reading from Parquet + #[arg(long, default_value_t = false)] + generate: bool, + + /// Number of rows to generate (requires --generate) + #[arg(long, default_value_t = 1_000_000)] + gen_rows: usize, + + /// Number of Int64 columns to generate + #[arg(long, default_value_t = 4)] + gen_int_cols: usize, + + /// Number of Utf8 string columns to generate + #[arg(long, default_value_t = 2)] + gen_string_cols: usize, + + /// Number of Decimal128 columns to generate + #[arg(long, default_value_t = 2)] + gen_decimal_cols: usize, + + /// Number of Date32 columns to generate + #[arg(long, default_value_t = 1)] + gen_date_cols: usize, + + /// Average string length for generated string columns + #[arg(long, default_value_t = 24)] + gen_avg_string_len: usize, + + /// Batch size for reading Parquet or generating data + #[arg(long, default_value_t = 8192)] + batch_size: usize, + + /// Number of output shuffle partitions + #[arg(long, default_value_t = 200)] + partitions: usize, + + /// Partitioning scheme: hash, single, round-robin + #[arg(long, default_value = "hash")] + partitioning: String, + + /// Column indices to hash on (comma-separated, e.g. "0,3") + #[arg(long, default_value = "0")] + hash_columns: String, + + /// Compression codec: none, lz4, zstd, snappy + #[arg(long, default_value = "zstd")] + codec: String, + + /// Zstd compression level (1-22) + #[arg(long, default_value_t = 1)] + zstd_level: i32, + + /// Memory limit in bytes (triggers spilling when exceeded) + #[arg(long)] + memory_limit: Option, + + /// Also benchmark reading back the shuffle output + #[arg(long, default_value_t = false)] + read_back: bool, + + /// Number of iterations to run + #[arg(long, default_value_t = 1)] + iterations: usize, + + /// Number of warmup iterations before timing + #[arg(long, default_value_t = 0)] + warmup: usize, + + /// Output directory for shuffle data/index files + #[arg(long, default_value = "/tmp/comet_shuffle_bench")] + output_dir: PathBuf, + + /// Write buffer size in bytes + #[arg(long, default_value_t = 1048576)] + write_buffer_size: usize, + + /// Maximum number of rows to use (default: 1,000,000) + #[arg(long, default_value_t = 1_000_000)] + limit: usize, +} + +fn main() { + let args = Args::parse(); + + // Validate args + if args.input.is_none() && !args.generate { + eprintln!("Error: must specify either --input or --generate"); + std::process::exit(1); + } + + // Create output directory + fs::create_dir_all(&args.output_dir).expect("Failed to create output directory"); + + let data_file = args.output_dir.join("data.out"); + let index_file = args.output_dir.join("index.out"); + + // Load data + let load_start = Instant::now(); + let batches = if let Some(ref input_path) = args.input { + load_parquet(input_path, args.batch_size, args.limit) + } else { + generate_data(&args) + }; + let load_elapsed = load_start.elapsed(); + + let schema = batches[0].schema(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + let total_bytes: usize = batches.iter().map(|b| b.get_array_memory_size()).sum(); + + println!("=== Shuffle Benchmark ==="); + println!( + "Data source: {}", + if args.input.is_some() { + "parquet" + } else { + "generated" + } + ); + println!( + "Schema: {} columns ({} fields)", + schema.fields().len(), + describe_schema(&schema) + ); + println!("Total rows: {}", format_number(total_rows)); + println!("Total size: {}", format_bytes(total_bytes)); + println!("Batches: {}", batches.len()); + println!( + "Rows/batch: ~{}", + if batches.is_empty() { + 0 + } else { + total_rows / batches.len() + } + ); + println!("Load time: {:.3}s", load_elapsed.as_secs_f64()); + println!(); + + let codec = parse_codec(&args.codec, args.zstd_level); + let hash_col_indices = parse_hash_columns(&args.hash_columns); + + println!("Partitioning: {}", args.partitioning); + println!("Partitions: {}", args.partitions); + println!("Codec: {:?}", codec); + println!("Hash columns: {:?}", hash_col_indices); + if let Some(mem_limit) = args.memory_limit { + println!("Memory limit: {}", format_bytes(mem_limit)); + } + println!( + "Iterations: {} (warmup: {})", + args.iterations, args.warmup + ); + println!(); + + // Run warmup + timed iterations + let total_iters = args.warmup + args.iterations; + let mut write_times = Vec::with_capacity(args.iterations); + let mut read_times = Vec::with_capacity(args.iterations); + let mut data_file_sizes = Vec::with_capacity(args.iterations); + + for i in 0..total_iters { + let is_warmup = i < args.warmup; + let label = if is_warmup { + format!("warmup {}/{}", i + 1, args.warmup) + } else { + format!("iter {}/{}", i - args.warmup + 1, args.iterations) + }; + + // Write phase + let write_elapsed = run_shuffle_write( + &batches, + &schema, + &codec, + &hash_col_indices, + &args, + data_file.to_str().unwrap(), + index_file.to_str().unwrap(), + ); + let data_size = fs::metadata(&data_file).map(|m| m.len()).unwrap_or(0); + + if !is_warmup { + write_times.push(write_elapsed); + data_file_sizes.push(data_size); + } + + print!(" [{label}] write: {:.3}s", write_elapsed); + print!(" output: {}", format_bytes(data_size as usize)); + + // Read phase + if args.read_back { + let read_elapsed = run_shuffle_read( + data_file.to_str().unwrap(), + index_file.to_str().unwrap(), + args.partitions, + ); + if !is_warmup { + read_times.push(read_elapsed); + } + print!(" read: {:.3}s", read_elapsed); + } + println!(); + } + + // Print summary + if args.iterations > 0 { + println!(); + println!("=== Results ==="); + + let avg_write = write_times.iter().sum::() / write_times.len() as f64; + let avg_data_size = data_file_sizes.iter().sum::() / data_file_sizes.len() as u64; + let write_throughput_rows = total_rows as f64 / avg_write; + let write_throughput_bytes = total_bytes as f64 / avg_write; + let compression_ratio = if avg_data_size > 0 { + total_bytes as f64 / avg_data_size as f64 + } else { + 0.0 + }; + + println!("Write:"); + println!(" avg time: {:.3}s", avg_write); + if write_times.len() > 1 { + let min = write_times.iter().cloned().fold(f64::INFINITY, f64::min); + let max = write_times + .iter() + .cloned() + .fold(f64::NEG_INFINITY, f64::max); + println!(" min/max: {:.3}s / {:.3}s", min, max); + } + println!( + " throughput: {}/s ({} rows/s)", + format_bytes(write_throughput_bytes as usize), + format_number(write_throughput_rows as usize) + ); + println!( + " output size: {}", + format_bytes(avg_data_size as usize) + ); + println!(" compression: {:.2}x", compression_ratio); + + if !read_times.is_empty() { + let avg_read = read_times.iter().sum::() / read_times.len() as f64; + let read_throughput_bytes = avg_data_size as f64 / avg_read; + + println!("Read:"); + println!(" avg time: {:.3}s", avg_read); + if read_times.len() > 1 { + let min = read_times.iter().cloned().fold(f64::INFINITY, f64::min); + let max = read_times.iter().cloned().fold(f64::NEG_INFINITY, f64::max); + println!(" min/max: {:.3}s / {:.3}s", min, max); + } + println!( + " throughput: {}/s (from compressed)", + format_bytes(read_throughput_bytes as usize) + ); + } + } + + // Cleanup + let _ = fs::remove_file(&data_file); + let _ = fs::remove_file(&index_file); +} + +fn load_parquet(path: &PathBuf, batch_size: usize, limit: usize) -> Vec { + let mut batches = Vec::new(); + let mut total_rows = 0usize; + + let paths = if path.is_dir() { + let mut files: Vec = fs::read_dir(path) + .expect("Failed to read input directory") + .filter_map(|entry| { + let entry = entry.ok()?; + let p = entry.path(); + if p.extension().and_then(|e| e.to_str()) == Some("parquet") { + Some(p) + } else { + None + } + }) + .collect(); + files.sort(); + if files.is_empty() { + panic!("No .parquet files found in {}", path.display()); + } + files + } else { + vec![path.clone()] + }; + + 'outer: for file_path in &paths { + let file = fs::File::open(file_path) + .unwrap_or_else(|e| panic!("Failed to open {}: {}", file_path.display(), e)); + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap_or_else(|e| { + panic!( + "Failed to read Parquet metadata from {}: {}", + file_path.display(), + e + ) + }); + let reader = builder + .with_batch_size(batch_size) + .build() + .unwrap_or_else(|e| { + panic!( + "Failed to build Parquet reader for {}: {}", + file_path.display(), + e + ) + }); + for batch_result in reader { + let batch = batch_result.unwrap_or_else(|e| { + panic!("Failed to read batch from {}: {}", file_path.display(), e) + }); + if batch.num_rows() == 0 { + continue; + } + let remaining = limit - total_rows; + if batch.num_rows() <= remaining { + total_rows += batch.num_rows(); + batches.push(batch); + } else { + batches.push(batch.slice(0, remaining)); + total_rows += remaining; + } + if total_rows >= limit { + break 'outer; + } + } + } + + if batches.is_empty() { + panic!("No data read from input"); + } + + println!( + "Loaded {} batches ({} rows) from {} file(s)", + batches.len(), + format_number(total_rows), + paths.len() + ); + batches +} + +fn generate_data(args: &Args) -> Vec { + let mut fields = Vec::new(); + let mut col_idx = 0; + + // Int64 columns + for _ in 0..args.gen_int_cols { + fields.push(Field::new( + format!("int_col_{col_idx}"), + DataType::Int64, + true, + )); + col_idx += 1; + } + // String columns + for _ in 0..args.gen_string_cols { + fields.push(Field::new( + format!("str_col_{col_idx}"), + DataType::Utf8, + true, + )); + col_idx += 1; + } + // Decimal columns + for _ in 0..args.gen_decimal_cols { + fields.push(Field::new( + format!("dec_col_{col_idx}"), + DataType::Decimal128(18, 2), + true, + )); + col_idx += 1; + } + // Date columns + for _ in 0..args.gen_date_cols { + fields.push(Field::new( + format!("date_col_{col_idx}"), + DataType::Date32, + true, + )); + col_idx += 1; + } + + let schema = Arc::new(Schema::new(fields)); + let mut batches = Vec::new(); + let mut rng = rand::rng(); + let mut remaining = args.gen_rows; + + while remaining > 0 { + let batch_rows = remaining.min(args.batch_size); + remaining -= batch_rows; + + let mut columns: Vec> = Vec::new(); + + // Int64 columns + for _ in 0..args.gen_int_cols { + let mut builder = Int64Builder::with_capacity(batch_rows); + for _ in 0..batch_rows { + if rng.random_range(0..100) < 5 { + builder.append_null(); + } else { + builder.append_value(rng.random_range(0..1_000_000i64)); + } + } + columns.push(Arc::new(builder.finish())); + } + // String columns + for _ in 0..args.gen_string_cols { + let mut builder = + StringBuilder::with_capacity(batch_rows, batch_rows * args.gen_avg_string_len); + for _ in 0..batch_rows { + if rng.random_range(0..100) < 5 { + builder.append_null(); + } else { + let len = rng.random_range(1..args.gen_avg_string_len * 2); + let s: String = (0..len) + .map(|_| rng.random_range(b'a'..=b'z') as char) + .collect(); + builder.append_value(&s); + } + } + columns.push(Arc::new(builder.finish())); + } + // Decimal columns + for _ in 0..args.gen_decimal_cols { + let mut builder = Decimal128Builder::with_capacity(batch_rows) + .with_precision_and_scale(18, 2) + .unwrap(); + for _ in 0..batch_rows { + if rng.random_range(0..100) < 5 { + builder.append_null(); + } else { + builder.append_value(rng.random_range(0..100_000_000i128)); + } + } + columns.push(Arc::new(builder.finish())); + } + // Date columns + for _ in 0..args.gen_date_cols { + let mut builder = Date32Builder::with_capacity(batch_rows); + for _ in 0..batch_rows { + if rng.random_range(0..100) < 5 { + builder.append_null(); + } else { + builder.append_value(rng.random_range(0..20000i32)); + } + } + columns.push(Arc::new(builder.finish())); + } + + let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap(); + batches.push(batch); + } + + println!( + "Generated {} batches ({} rows)", + batches.len(), + args.gen_rows + ); + batches +} + +fn run_shuffle_write( + batches: &[RecordBatch], + schema: &SchemaRef, + codec: &CompressionCodec, + hash_col_indices: &[usize], + args: &Args, + data_file: &str, + index_file: &str, +) -> f64 { + let partitioning = build_partitioning( + &args.partitioning, + args.partitions, + hash_col_indices, + schema, + ); + + let partitions = &[batches.to_vec()]; + let exec = ShuffleWriterExec::try_new( + Arc::new(DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(partitions, Arc::clone(schema), None).unwrap(), + ))), + partitioning, + codec.clone(), + data_file.to_string(), + index_file.to_string(), + false, + args.write_buffer_size, + ) + .expect("Failed to create ShuffleWriterExec"); + + let config = SessionConfig::new().with_batch_size(args.batch_size); + let mut runtime_builder = RuntimeEnvBuilder::new(); + if let Some(mem_limit) = args.memory_limit { + runtime_builder = runtime_builder.with_memory_limit(mem_limit, 1.0); + } + let runtime_env = Arc::new(runtime_builder.build().unwrap()); + let ctx = SessionContext::new_with_config_rt(config, runtime_env); + let task_ctx = ctx.task_ctx(); + + let start = Instant::now(); + let stream = exec.execute(0, task_ctx).unwrap(); + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(collect(stream)).unwrap(); + start.elapsed().as_secs_f64() +} + +fn run_shuffle_read(data_file: &str, index_file: &str, num_partitions: usize) -> f64 { + let start = Instant::now(); + + // Read index file to get partition offsets + let index_bytes = fs::read(index_file).expect("Failed to read index file"); + let num_offsets = index_bytes.len() / 8; + let offsets: Vec = (0..num_offsets) + .map(|i| { + let bytes: [u8; 8] = index_bytes[i * 8..(i + 1) * 8].try_into().unwrap(); + i64::from_le_bytes(bytes) + }) + .collect(); + + // Read data file + let data_bytes = fs::read(data_file).expect("Failed to read data file"); + + let mut total_rows = 0usize; + let mut total_batches = 0usize; + + // Decode each partition's data + for p in 0..num_partitions.min(offsets.len().saturating_sub(1)) { + let start_offset = offsets[p] as usize; + let end_offset = offsets[p + 1] as usize; + + if start_offset >= end_offset { + continue; // Empty partition + } + + // Read all IPC blocks within this partition + let mut offset = start_offset; + while offset < end_offset { + // First 8 bytes: IPC length + let ipc_length = + u64::from_le_bytes(data_bytes[offset..offset + 8].try_into().unwrap()) as usize; + + // Skip 8-byte length prefix, then 8 bytes of field_count + codec header + let block_data = &data_bytes[offset + 16..offset + 8 + ipc_length]; + let batch = read_ipc_compressed(block_data).expect("Failed to decode shuffle block"); + total_rows += batch.num_rows(); + total_batches += 1; + + offset += 8 + ipc_length; + } + } + + let elapsed = start.elapsed().as_secs_f64(); + eprintln!( + " read back {} rows in {} batches from {} partitions", + format_number(total_rows), + total_batches, + num_partitions + ); + elapsed +} + +fn build_partitioning( + scheme: &str, + num_partitions: usize, + hash_col_indices: &[usize], + schema: &SchemaRef, +) -> CometPartitioning { + match scheme { + "single" => CometPartitioning::SinglePartition, + "round-robin" => CometPartitioning::RoundRobin(num_partitions, 0), + "hash" => { + let exprs: Vec> = hash_col_indices + .iter() + .map(|&idx| { + let field = schema.field(idx); + Arc::new(Column::new(field.name(), idx)) + as Arc + }) + .collect(); + CometPartitioning::Hash(exprs, num_partitions) + } + other => { + eprintln!("Unknown partitioning scheme: {other}. Using hash."); + build_partitioning("hash", num_partitions, hash_col_indices, schema) + } + } +} + +fn parse_codec(codec: &str, zstd_level: i32) -> CompressionCodec { + match codec.to_lowercase().as_str() { + "none" => CompressionCodec::None, + "lz4" => CompressionCodec::Lz4Frame, + "zstd" => CompressionCodec::Zstd(zstd_level), + "snappy" => CompressionCodec::Snappy, + other => { + eprintln!("Unknown codec: {other}. Using zstd."); + CompressionCodec::Zstd(zstd_level) + } + } +} + +fn parse_hash_columns(s: &str) -> Vec { + s.split(',') + .filter(|s| !s.is_empty()) + .map(|s| s.trim().parse::().expect("Invalid column index")) + .collect() +} + +fn describe_schema(schema: &Schema) -> String { + let mut counts = std::collections::HashMap::new(); + for field in schema.fields() { + let type_name = match field.data_type() { + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 => "int", + DataType::Float16 | DataType::Float32 | DataType::Float64 => "float", + DataType::Utf8 | DataType::LargeUtf8 => "string", + DataType::Boolean => "bool", + DataType::Date32 | DataType::Date64 => "date", + DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => "decimal", + DataType::Timestamp(_, _) => "timestamp", + DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => "binary", + _ => "other", + }; + *counts.entry(type_name).or_insert(0) += 1; + } + let mut parts: Vec = counts + .into_iter() + .map(|(k, v)| format!("{}x{}", v, k)) + .collect(); + parts.sort(); + parts.join(", ") +} + +fn format_number(n: usize) -> String { + let s = n.to_string(); + let mut result = String::new(); + for (i, c) in s.chars().rev().enumerate() { + if i > 0 && i % 3 == 0 { + result.push(','); + } + result.push(c); + } + result.chars().rev().collect() +} + +fn format_bytes(bytes: usize) -> String { + if bytes >= 1024 * 1024 * 1024 { + format!("{:.2} GiB", bytes as f64 / (1024.0 * 1024.0 * 1024.0)) + } else if bytes >= 1024 * 1024 { + format!("{:.2} MiB", bytes as f64 / (1024.0 * 1024.0)) + } else if bytes >= 1024 { + format!("{:.2} KiB", bytes as f64 / 1024.0) + } else { + format!("{bytes} B") + } +} diff --git a/native/core/src/execution/shuffle/metrics.rs b/native/core/src/execution/shuffle/metrics.rs index 33b51c3cd8..8b9d721ad7 100644 --- a/native/core/src/execution/shuffle/metrics.rs +++ b/native/core/src/execution/shuffle/metrics.rs @@ -26,6 +26,9 @@ pub(super) struct ShufflePartitionerMetrics { /// Time to perform repartitioning pub(super) repart_time: Time, + /// Time scattering values to per-partition buffers + pub(super) scatter_time: Time, + /// Time encoding batches to IPC format pub(super) encode_time: Time, @@ -50,6 +53,7 @@ impl ShufflePartitionerMetrics { Self { baseline: BaselineMetrics::new(metrics, partition), repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition), + scatter_time: MetricBuilder::new(metrics).subset_time("scatter_time", partition), encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition), write_time: MetricBuilder::new(metrics).subset_time("write_time", partition), input_batches: MetricBuilder::new(metrics).counter("input_batches", partition), diff --git a/native/core/src/execution/shuffle/partitioners/mod.rs b/native/core/src/execution/shuffle/partitioners/mod.rs index b9058f66f4..9590c3c4d3 100644 --- a/native/core/src/execution/shuffle/partitioners/mod.rs +++ b/native/core/src/execution/shuffle/partitioners/mod.rs @@ -16,14 +16,13 @@ // under the License. mod multi_partition; -mod partitioned_batch_iterator; +pub(super) mod partition_buffer; mod single_partition; use arrow::record_batch::RecordBatch; use datafusion::common::Result; pub(super) use multi_partition::MultiPartitionShuffleRepartitioner; -pub(super) use partitioned_batch_iterator::PartitionedBatchIterator; pub(super) use single_partition::SinglePartitionShufflePartitioner; #[async_trait::async_trait] diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs b/native/core/src/execution/shuffle/partitioners/multi_partition.rs index 9c366ad462..46449d2f96 100644 --- a/native/core/src/execution/shuffle/partitioners/multi_partition.rs +++ b/native/core/src/execution/shuffle/partitioners/multi_partition.rs @@ -16,24 +16,19 @@ // under the License. use crate::execution::shuffle::metrics::ShufflePartitionerMetrics; -use crate::execution::shuffle::partitioners::partitioned_batch_iterator::{ - PartitionedBatchIterator, PartitionedBatchesProducer, -}; +use crate::execution::shuffle::partitioners::partition_buffer::{ColumnBuffer, PartitionBuffer}; use crate::execution::shuffle::partitioners::ShufflePartitioner; use crate::execution::shuffle::writers::{BufBatchWriter, PartitionWriter}; use crate::execution::shuffle::{ comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter, }; use crate::execution::tracing::{with_trace, with_trace_async}; -use arrow::array::{ArrayRef, RecordBatch}; +use arrow::array::{Array, ArrayRef, BooleanArray, RecordBatch}; use arrow::datatypes::SchemaRef; -use datafusion::common::utils::proxy::VecAllocExt; use datafusion::common::DataFusionError; use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion::execution::runtime_env::RuntimeEnv; -use datafusion::physical_plan::metrics::Time; use datafusion_comet_spark_expr::murmur3::create_murmur3_hashes; -use itertools::Itertools; use std::fmt; use std::fmt::{Debug, Formatter}; use std::fs::{File, OpenOptions}; @@ -109,8 +104,7 @@ impl ScratchSpace { pub(crate) struct MultiPartitionShuffleRepartitioner { output_data_file: String, output_index_file: String, - buffered_batches: Vec, - partition_indices: Vec>, + partition_buffers: Vec, partition_writers: Vec, shuffle_block_writer: ShuffleBlockWriter, /// Partitioning scheme to use @@ -172,6 +166,11 @@ impl MultiPartitionShuffleRepartitioner { .map(|_| PartitionWriter::try_new(shuffle_block_writer.clone())) .collect::>>()?; + let estimated_rows_per_partition = batch_size / num_output_partitions.max(1); + let partition_buffers = (0..num_output_partitions) + .map(|_| PartitionBuffer::new(Arc::clone(&schema), estimated_rows_per_partition)) + .collect(); + let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{partition}]")) .with_can_spill(true) .register(&runtime.memory_pool); @@ -179,8 +178,7 @@ impl MultiPartitionShuffleRepartitioner { Ok(Self { output_data_file, output_index_file, - buffered_batches: vec![], - partition_indices: vec![vec![]; num_output_partitions], + partition_buffers, partition_writers, shuffle_block_writer, partitioning, @@ -221,7 +219,7 @@ impl MultiPartitionShuffleRepartitioner { match &self.partitioning { CometPartitioning::Hash(exprs, num_output_partitions) => { let mut scratch = std::mem::take(&mut self.scratch); - let (partition_starts, partition_row_indices): (&Vec, &Vec) = { + let num_rows = { let mut timer = self.metrics.repart_time.timer(); // Evaluate partition expressions to get rows to apply partitioning scheme. @@ -249,24 +247,19 @@ impl MultiPartitionShuffleRepartitioner { }); } - // We now have partition ids for every input row, map that to partition starts - // and partition indices to eventually right these rows to partition buffers. + // We now have partition ids for every input row, map that to partition starts. scratch .map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows); timer.stop(); - Ok::<(&Vec, &Vec), DataFusionError>(( - &scratch.partition_starts, - &scratch.partition_row_indices, - )) - }?; - - self.buffer_partitioned_batch_may_spill( - input, - partition_row_indices, - partition_starts, - ) - .await?; + num_rows + }; + + self.scatter_batch( + &input, + &scratch.partition_row_indices[..num_rows], + &scratch.partition_starts, + )?; self.scratch = scratch; } CometPartitioning::RangePartitioning( @@ -276,7 +269,7 @@ impl MultiPartitionShuffleRepartitioner { bounds, ) => { let mut scratch = std::mem::take(&mut self.scratch); - let (partition_starts, partition_row_indices): (&Vec, &Vec) = { + let num_rows = { let mut timer = self.metrics.repart_time.timer(); // Evaluate partition expressions for values to apply partitioning scheme on. @@ -302,24 +295,19 @@ impl MultiPartitionShuffleRepartitioner { }); } - // We now have partition ids for every input row, map that to partition starts - // and partition indices to eventually right these rows to partition buffers. + // We now have partition ids for every input row, map that to partition starts. scratch .map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows); timer.stop(); - Ok::<(&Vec, &Vec), DataFusionError>(( - &scratch.partition_starts, - &scratch.partition_row_indices, - )) - }?; - - self.buffer_partitioned_batch_may_spill( - input, - partition_row_indices, - partition_starts, - ) - .await?; + num_rows + }; + + self.scatter_batch( + &input, + &scratch.partition_row_indices[..num_rows], + &scratch.partition_starts, + )?; self.scratch = scratch; } CometPartitioning::RoundRobin(num_output_partitions, max_hash_columns) => { @@ -331,7 +319,7 @@ impl MultiPartitionShuffleRepartitioner { // which sorts by UnsafeRow binary representation before assigning partitions. // However, both approaches provide even distribution and determinism. let mut scratch = std::mem::take(&mut self.scratch); - let (partition_starts, partition_row_indices): (&Vec, &Vec) = { + let num_rows = { let mut timer = self.metrics.repart_time.timer(); let num_rows = input.num_rows(); @@ -362,24 +350,19 @@ impl MultiPartitionShuffleRepartitioner { comet_partitioning::pmod(*hash, *num_output_partitions) as u32; }); - // We now have partition ids for every input row, map that to partition starts - // and partition indices to eventually write these rows to partition buffers. + // We now have partition ids for every input row, map that to partition starts. scratch .map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows); timer.stop(); - Ok::<(&Vec, &Vec), DataFusionError>(( - &scratch.partition_starts, - &scratch.partition_row_indices, - )) - }?; - - self.buffer_partitioned_batch_may_spill( - input, - partition_row_indices, - partition_starts, - ) - .await?; + num_rows + }; + + self.scatter_batch( + &input, + &scratch.partition_row_indices[..num_rows], + &scratch.partition_starts, + )?; self.scratch = scratch; } other => { @@ -393,68 +376,173 @@ impl MultiPartitionShuffleRepartitioner { Ok(()) } - async fn buffer_partitioned_batch_may_spill( + fn scatter_batch( &mut self, - input: RecordBatch, + input: &RecordBatch, partition_row_indices: &[u32], partition_starts: &[u32], ) -> datafusion::common::Result<()> { - let mut mem_growth: usize = input.get_array_memory_size(); - let buffered_partition_idx = self.buffered_batches.len() as u32; - self.buffered_batches.push(input); - - // partition_starts conceptually slices partition_row_indices into smaller slices, - // each slice contains the indices of rows in input that will go into the corresponding - // partition. The following loop iterates over the slices and put the row indices into - // the indices array of the corresponding partition. - for (partition_id, (&start, &end)) in partition_starts - .iter() - .tuple_windows() - .enumerate() - .filter(|(_, (start, end))| start < end) - { - let row_indices = &partition_row_indices[start as usize..end as usize]; - - // Put row indices for the current partition into the indices array of that partition. - // This indices array will be used for calling interleave_record_batch to produce - // shuffled batches. - let indices = &mut self.partition_indices[partition_id]; - let before_size = indices.allocated_size(); - indices.reserve(row_indices.len()); - for row_idx in row_indices { - indices.push((buffered_partition_idx, *row_idx)); + let num_partitions = self.partition_buffers.len(); + + // Track memory before scatter + let mem_before: usize = self.partition_buffers.iter().map(|b| b.memory_size()).sum(); + + let scatter_start = Instant::now(); + + // Column-oriented scatter: for each column, iterate by partition then by + // rows within that partition. This keeps writes to the same partition buffer + // sequential for better cache locality. + for (col_idx, column) in input.columns().iter().enumerate() { + let nulls = column.nulls(); + + // Single match to determine scatter path from first partition's column type + match &self.partition_buffers[0].columns[col_idx] { + ColumnBuffer::Fixed { byte_width, .. } => { + let byte_width = *byte_width; + let data = column.to_data(); + let values = data.buffers()[0].as_slice(); + for p in 0..num_partitions { + let start = partition_starts[p] as usize; + let end = partition_starts[p + 1] as usize; + if start == end { + continue; + } + let row_indices = &partition_row_indices[start..end]; + for &row_idx in row_indices { + let row = row_idx as usize; + let src_offset = row * byte_width; + let is_valid = nulls.is_none_or(|n| n.is_valid(row)); + self.partition_buffers[p].append_fixed( + col_idx, + &values[src_offset..src_offset + byte_width], + is_valid, + ); + } + } + } + ColumnBuffer::Variable { .. } => { + let data = column.to_data(); + let offsets_slice = data.buffers()[0].typed_data::(); + let values_slice = data.buffers()[1].as_slice(); + for p in 0..num_partitions { + let start = partition_starts[p] as usize; + let end = partition_starts[p + 1] as usize; + if start == end { + continue; + } + let row_indices = &partition_row_indices[start..end]; + for &row_idx in row_indices { + let row = row_idx as usize; + let val_start = offsets_slice[row] as usize; + let val_end = offsets_slice[row + 1] as usize; + let is_valid = nulls.is_none_or(|n| n.is_valid(row)); + self.partition_buffers[p].append_variable( + col_idx, + &values_slice[val_start..val_end], + is_valid, + ); + } + } + } + ColumnBuffer::LargeVariable { .. } => { + let data = column.to_data(); + let offsets_slice = data.buffers()[0].typed_data::(); + let values_slice = data.buffers()[1].as_slice(); + for p in 0..num_partitions { + let start = partition_starts[p] as usize; + let end = partition_starts[p + 1] as usize; + if start == end { + continue; + } + let row_indices = &partition_row_indices[start..end]; + for &row_idx in row_indices { + let row = row_idx as usize; + let val_start = offsets_slice[row] as usize; + let val_end = offsets_slice[row + 1] as usize; + let is_valid = nulls.is_none_or(|n| n.is_valid(row)); + self.partition_buffers[p].append_large_variable( + col_idx, + &values_slice[val_start..val_end], + is_valid, + ); + } + } + } + ColumnBuffer::Boolean { .. } => { + let bool_array = column.as_any().downcast_ref::().unwrap(); + for p in 0..num_partitions { + let start = partition_starts[p] as usize; + let end = partition_starts[p + 1] as usize; + if start == end { + continue; + } + let row_indices = &partition_row_indices[start..end]; + for &row_idx in row_indices { + let row = row_idx as usize; + let is_valid = nulls.is_none_or(|n| n.is_valid(row)); + self.partition_buffers[p].append_bool( + col_idx, + bool_array.value(row), + is_valid, + ); + } + } + } + ColumnBuffer::Fallback { .. } => { + for p in 0..num_partitions { + let start = partition_starts[p] as usize; + let end = partition_starts[p + 1] as usize; + if start == end { + continue; + } + let row_indices = &partition_row_indices[start..end]; + for &row_idx in row_indices { + self.partition_buffers[p].append_fallback_index(col_idx, row_idx); + } + } + } } - let after_size = indices.allocated_size(); - mem_growth += after_size.saturating_sub(before_size); } - if self.reservation.try_grow(mem_growth).is_err() { - self.spill()?; + self.metrics + .scatter_time + .add_duration(scatter_start.elapsed()); + + // O(num_partitions) rather than O(num_rows) + for p in 0..num_partitions { + let count = (partition_starts[p + 1] - partition_starts[p]) as usize; + self.partition_buffers[p].row_count += count; } - Ok(()) - } + // Flush partitions. When fallback columns exist, flush ALL non-empty + // partitions since fallback indices reference the current input batch. + // Otherwise, only flush partitions that reached batch_size. + let flush_all = self.partition_buffers[0].has_fallback_columns(); + for p in 0..num_partitions { + let should_flush = if flush_all { + self.partition_buffers[p].row_count > 0 + } else { + self.partition_buffers[p].row_count >= self.batch_size + }; + if should_flush { + let batch = self.partition_buffers[p].flush(Some(input))?; + self.partition_writers[p].spill( + &[batch], + &self.runtime, + &self.metrics, + self.write_buffer_size, + self.batch_size, + )?; + } + } - fn shuffle_write_partition( - partition_iter: &mut PartitionedBatchIterator, - shuffle_block_writer: &mut ShuffleBlockWriter, - output_data: &mut BufWriter, - encode_time: &Time, - write_time: &Time, - write_buffer_size: usize, - batch_size: usize, - ) -> datafusion::common::Result<()> { - let mut buf_batch_writer = BufBatchWriter::new( - shuffle_block_writer, - output_data, - write_buffer_size, - batch_size, - ); - for batch in partition_iter { - let batch = batch?; - buf_batch_writer.write(&batch, encode_time, write_time)?; + // Precise memory tracking + let mem_after: usize = self.partition_buffers.iter().map(|b| b.memory_size()).sum(); + let mem_growth = mem_after.saturating_sub(mem_before); + if self.reservation.try_grow(mem_growth).is_err() { + self.spill()?; } - buf_batch_writer.flush(encode_time, write_time)?; + Ok(()) } @@ -474,49 +562,30 @@ impl MultiPartitionShuffleRepartitioner { self.metrics.data_size.value() } - /// This function transfers the ownership of the buffered batches and partition indices from the - /// ShuffleRepartitioner to a new PartitionedBatches struct. The returned PartitionedBatches struct - /// can be used to produce shuffled batches. - fn partitioned_batches(&mut self) -> PartitionedBatchesProducer { - let num_output_partitions = self.partition_indices.len(); - let buffered_batches = std::mem::take(&mut self.buffered_batches); - // let indices = std::mem::take(&mut self.partition_indices); - let indices = std::mem::replace( - &mut self.partition_indices, - vec![vec![]; num_output_partitions], - ); - PartitionedBatchesProducer::new(buffered_batches, indices, self.batch_size) - } - pub(crate) fn spill(&mut self) -> datafusion::common::Result<()> { + let has_data = self.partition_buffers.iter().any(|b| b.row_count() > 0); + if !has_data { + return Ok(()); + } log::info!( "ShuffleRepartitioner spilling shuffle data of {} to disk while inserting ({} time(s) so far)", self.used(), self.spill_count() ); - - // we could always get a chance to free some memory as long as we are holding some - if self.buffered_batches.is_empty() { - return Ok(()); - } - with_trace("shuffle_spill", self.tracing_enabled, || { - let num_output_partitions = self.partition_writers.len(); - let mut partitioned_batches = self.partitioned_batches(); let mut spilled_bytes = 0; - - for partition_id in 0..num_output_partitions { - let partition_writer = &mut self.partition_writers[partition_id]; - let mut iter = partitioned_batches.produce(partition_id); - spilled_bytes += partition_writer.spill( - &mut iter, - &self.runtime, - &self.metrics, - self.write_buffer_size, - self.batch_size, - )?; + for p in 0..self.partition_buffers.len() { + if self.partition_buffers[p].row_count() > 0 { + let batch = self.partition_buffers[p].flush(None)?; + spilled_bytes += self.partition_writers[p].spill( + &[batch], + &self.runtime, + &self.metrics, + self.write_buffer_size, + self.batch_size, + )?; + } } - self.reservation.free(); self.metrics.spill_count.add(1); self.metrics.spilled_bytes.add(spilled_bytes); @@ -559,11 +628,8 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { fn shuffle_write(&mut self) -> datafusion::common::Result<()> { with_trace("shuffle_write", self.tracing_enabled, || { let start_time = Instant::now(); - - let mut partitioned_batches = self.partitioned_batches(); - let num_output_partitions = self.partition_indices.len(); + let num_output_partitions = self.partition_buffers.len(); let mut offsets = vec![0; num_output_partitions + 1]; - let data_file = self.output_data_file.clone(); let index_file = self.output_index_file.clone(); @@ -573,43 +639,42 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { .truncate(true) .open(data_file) .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; - let mut output_data = BufWriter::new(output_data); #[allow(clippy::needless_range_loop)] for i in 0..num_output_partitions { offsets[i] = output_data.stream_position()?; - // if we wrote a spill file for this partition then copy the - // contents into the shuffle file if let Some(spill_path) = self.partition_writers[i].path() { let mut spill_file = BufReader::new(File::open(spill_path)?); - let mut write_timer = self.metrics.write_time.timer(); + let mut wt = self.metrics.write_time.timer(); std::io::copy(&mut spill_file, &mut output_data)?; - write_timer.stop(); + wt.stop(); } - // Write in memory batches to output data file - let mut partition_iter = partitioned_batches.produce(i); - Self::shuffle_write_partition( - &mut partition_iter, - &mut self.shuffle_block_writer, - &mut output_data, - &self.metrics.encode_time, - &self.metrics.write_time, - self.write_buffer_size, - self.batch_size, - )?; + if self.partition_buffers[i].row_count() > 0 { + let batch = self.partition_buffers[i].flush(None)?; + let mut buf_batch_writer = BufBatchWriter::new( + &mut self.shuffle_block_writer, + &mut output_data, + self.write_buffer_size, + self.batch_size, + ); + buf_batch_writer.write( + &batch, + &self.metrics.encode_time, + &self.metrics.write_time, + )?; + buf_batch_writer.flush(&self.metrics.encode_time, &self.metrics.write_time)?; + } } - let mut write_timer = self.metrics.write_time.timer(); + let mut wt = self.metrics.write_time.timer(); output_data.flush()?; - write_timer.stop(); - - // add one extra offset at last to ease partition length computation + wt.stop(); offsets[num_output_partitions] = output_data.stream_position()?; - let mut write_timer = self.metrics.write_time.timer(); + let mut wt = self.metrics.write_time.timer(); let mut output_index = BufWriter::new(File::create(index_file).map_err(|e| { DataFusionError::Execution(format!("shuffle write error: {e:?}")) @@ -618,13 +683,12 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { output_index.write_all(&(offset as i64).to_le_bytes()[..])?; } output_index.flush()?; - write_timer.stop(); + wt.stop(); self.metrics .baseline .elapsed_compute() .add_duration(start_time.elapsed()); - Ok(()) }) } diff --git a/native/core/src/execution/shuffle/partitioners/partition_buffer.rs b/native/core/src/execution/shuffle/partitioners/partition_buffer.rs new file mode 100644 index 0000000000..f91c4fd141 --- /dev/null +++ b/native/core/src/execution/shuffle/partitioners/partition_buffer.rs @@ -0,0 +1,407 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ + make_array, ArrayData, ArrayRef, BinaryArray, BooleanArray, BooleanBufferBuilder, + LargeBinaryArray, LargeStringArray, RecordBatch, StringArray, UInt32Array, +}; +use arrow::buffer::{Buffer, MutableBuffer, NullBuffer, OffsetBuffer, ScalarBuffer}; +use arrow::compute::take; +use arrow::datatypes::{DataType, SchemaRef}; +use datafusion::common::Result; +use std::sync::Arc; + +/// Per-partition typed column buffer for the scatter kernel. +pub(crate) enum ColumnBuffer { + Boolean { + values: BooleanBufferBuilder, + nulls: BooleanBufferBuilder, + }, + Fixed { + values: MutableBuffer, + byte_width: usize, + nulls: BooleanBufferBuilder, + }, + Variable { + offsets: Vec, + data: Vec, + nulls: BooleanBufferBuilder, + }, + LargeVariable { + offsets: Vec, + data: Vec, + nulls: BooleanBufferBuilder, + }, + Fallback { + indices: Vec, + }, +} + +impl ColumnBuffer { + pub(crate) fn append_fixed(&mut self, bytes: &[u8]) { + match self { + ColumnBuffer::Fixed { values, .. } => { + values.extend_from_slice(bytes); + } + _ => unreachable!("append_fixed called on non-Fixed variant"), + } + } + + pub(crate) fn append_variable(&mut self, bytes: &[u8]) { + match self { + ColumnBuffer::Variable { offsets, data, .. } => { + data.extend_from_slice(bytes); + offsets.push(data.len() as i32); + } + _ => unreachable!("append_variable called on non-Variable variant"), + } + } + + pub(crate) fn append_large_variable(&mut self, bytes: &[u8]) { + match self { + ColumnBuffer::LargeVariable { offsets, data, .. } => { + data.extend_from_slice(bytes); + offsets.push(data.len() as i64); + } + _ => unreachable!("append_large_variable called on non-LargeVariable variant"), + } + } + + pub(crate) fn append_bool(&mut self, value: bool) { + match self { + ColumnBuffer::Boolean { values, .. } => { + values.append(value); + } + _ => unreachable!("append_bool called on non-Boolean variant"), + } + } + + pub(crate) fn append_fallback_index(&mut self, idx: u32) { + match self { + ColumnBuffer::Fallback { indices } => { + indices.push(idx); + } + _ => unreachable!("append_fallback_index called on non-Fallback variant"), + } + } + + pub(crate) fn append_null_bit(&mut self, is_valid: bool) { + match self { + ColumnBuffer::Boolean { nulls, .. } + | ColumnBuffer::Fixed { nulls, .. } + | ColumnBuffer::Variable { nulls, .. } + | ColumnBuffer::LargeVariable { nulls, .. } => { + nulls.append(is_valid); + } + ColumnBuffer::Fallback { .. } => { + unreachable!("append_null_bit called on Fallback variant") + } + } + } + + pub(crate) fn memory_size(&self) -> usize { + match self { + ColumnBuffer::Boolean { values, nulls } => values.capacity() + nulls.capacity(), + ColumnBuffer::Fixed { values, nulls, .. } => values.capacity() + nulls.capacity(), + ColumnBuffer::Variable { + offsets, + data, + nulls, + } => { + offsets.capacity() * std::mem::size_of::() + data.capacity() + nulls.capacity() + } + ColumnBuffer::LargeVariable { + offsets, + data, + nulls, + } => { + offsets.capacity() * std::mem::size_of::() + data.capacity() + nulls.capacity() + } + ColumnBuffer::Fallback { indices } => indices.capacity() * std::mem::size_of::(), + } + } +} + +/// Per-partition buffer that accumulates rows by scattering values into typed +/// column buffers. +pub(crate) struct PartitionBuffer { + pub(crate) columns: Vec, + pub(crate) row_count: usize, + schema: SchemaRef, +} + +impl PartitionBuffer { + pub(crate) fn new(schema: SchemaRef, estimated_rows: usize) -> Self { + let columns = schema + .fields() + .iter() + .map(|field| { + let dt = field.data_type(); + if let DataType::Boolean = dt { + ColumnBuffer::Boolean { + values: BooleanBufferBuilder::new(estimated_rows), + nulls: BooleanBufferBuilder::new(estimated_rows), + } + } else if let Some(byte_width) = dt.primitive_width() { + ColumnBuffer::Fixed { + values: MutableBuffer::new(estimated_rows * byte_width), + byte_width, + nulls: BooleanBufferBuilder::new(estimated_rows), + } + } else { + match dt { + DataType::Utf8 | DataType::Binary => ColumnBuffer::Variable { + offsets: vec![0i32], + data: vec![], + nulls: BooleanBufferBuilder::new(estimated_rows), + }, + DataType::LargeUtf8 | DataType::LargeBinary => { + ColumnBuffer::LargeVariable { + offsets: vec![0i64], + data: vec![], + nulls: BooleanBufferBuilder::new(estimated_rows), + } + } + _ => ColumnBuffer::Fallback { indices: vec![] }, + } + } + }) + .collect(); + + Self { + columns, + row_count: 0, + schema, + } + } + + pub(crate) fn append_fixed(&mut self, col_idx: usize, bytes: &[u8], is_valid: bool) { + self.columns[col_idx].append_fixed(bytes); + self.columns[col_idx].append_null_bit(is_valid); + } + + pub(crate) fn append_variable(&mut self, col_idx: usize, bytes: &[u8], is_valid: bool) { + self.columns[col_idx].append_variable(bytes); + self.columns[col_idx].append_null_bit(is_valid); + } + + pub(crate) fn append_large_variable(&mut self, col_idx: usize, bytes: &[u8], is_valid: bool) { + self.columns[col_idx].append_large_variable(bytes); + self.columns[col_idx].append_null_bit(is_valid); + } + + pub(crate) fn append_bool(&mut self, col_idx: usize, value: bool, is_valid: bool) { + self.columns[col_idx].append_bool(value); + self.columns[col_idx].append_null_bit(is_valid); + } + + pub(crate) fn append_fallback_index(&mut self, col_idx: usize, idx: u32) { + self.columns[col_idx].append_fallback_index(idx); + } + + pub(crate) fn row_count(&self) -> usize { + self.row_count + } + + pub(crate) fn memory_size(&self) -> usize { + self.columns.iter().map(|c| c.memory_size()).sum() + } + + pub(crate) fn has_fallback_columns(&self) -> bool { + self.columns + .iter() + .any(|c| matches!(c, ColumnBuffer::Fallback { .. })) + } + + pub(crate) fn flush(&mut self, fallback_batch: Option<&RecordBatch>) -> Result { + let row_count = self.row_count; + let mut arrays: Vec = Vec::with_capacity(self.columns.len()); + + for (col_idx, col) in self.columns.iter_mut().enumerate() { + let data_type = self.schema.field(col_idx).data_type().clone(); + let array: ArrayRef = match col { + ColumnBuffer::Fixed { values, nulls, .. } => { + let buffer = Buffer::from(std::mem::replace(values, MutableBuffer::new(0))); + let mut builder = ArrayData::builder(data_type) + .len(row_count) + .add_buffer(buffer); + if !nulls.is_empty() { + builder = builder.null_bit_buffer(Some(nulls.finish().into_inner())); + } + let data = builder.build()?; + make_array(data) + } + ColumnBuffer::Variable { + offsets, + data, + nulls, + } => { + let offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(std::mem::replace( + offsets, + vec![0i32], + ))); + let values_buffer = Buffer::from(std::mem::take(data)); + let null_buffer = if !nulls.is_empty() { + Some(NullBuffer::new(nulls.finish())) + } else { + None + }; + match &data_type { + DataType::Utf8 => { + Arc::new(StringArray::new(offsets_buffer, values_buffer, null_buffer)) + as ArrayRef + } + DataType::Binary => { + Arc::new(BinaryArray::new(offsets_buffer, values_buffer, null_buffer)) + as ArrayRef + } + _ => unreachable!("Variable buffer with unexpected data type"), + } + } + ColumnBuffer::LargeVariable { + offsets, + data, + nulls, + } => { + let offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(std::mem::replace( + offsets, + vec![0i64], + ))); + let values_buffer = Buffer::from(std::mem::take(data)); + let null_buffer = if !nulls.is_empty() { + Some(NullBuffer::new(nulls.finish())) + } else { + None + }; + match &data_type { + DataType::LargeUtf8 => Arc::new(LargeStringArray::new( + offsets_buffer, + values_buffer, + null_buffer, + )) as ArrayRef, + DataType::LargeBinary => Arc::new(LargeBinaryArray::new( + offsets_buffer, + values_buffer, + null_buffer, + )) as ArrayRef, + _ => unreachable!("LargeVariable buffer with unexpected data type"), + } + } + ColumnBuffer::Boolean { values, nulls } => { + let values_buf = values.finish(); + let null_buffer = if !nulls.is_empty() { + Some(NullBuffer::new(nulls.finish())) + } else { + None + }; + Arc::new(BooleanArray::new(values_buf, null_buffer)) as ArrayRef + } + ColumnBuffer::Fallback { indices } => { + let fallback = + fallback_batch.expect("fallback_batch required for Fallback columns"); + let idx_array = UInt32Array::from(std::mem::take(indices)); + take(fallback.column(col_idx), &idx_array, None)? + } + }; + arrays.push(array); + } + + self.row_count = 0; + Ok(RecordBatch::try_new(Arc::clone(&self.schema), arrays)?) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{Array, Int32Array}; + use arrow::datatypes::{Field, Schema}; + + #[test] + fn test_partition_buffer_basic() { + let schema = Arc::new(Schema::new(vec![ + Field::new("i", DataType::Int32, true), + Field::new("s", DataType::Utf8, true), + Field::new("b", DataType::Boolean, true), + ])); + let mut buf = PartitionBuffer::new(Arc::clone(&schema), 100); + + // Append 3 rows manually + // Row 0: i=1, s="hello", b=true, all valid + buf.columns[0].append_fixed(&1i32.to_le_bytes()); + buf.columns[0].append_null_bit(true); + buf.columns[1].append_variable(b"hello"); + buf.columns[1].append_null_bit(true); + buf.columns[2].append_bool(true); + buf.columns[2].append_null_bit(true); + buf.row_count += 1; + + // Row 1: i=NULL, s="world", b=false + buf.columns[0].append_fixed(&0i32.to_le_bytes()); + buf.columns[0].append_null_bit(false); // null + buf.columns[1].append_variable(b"world"); + buf.columns[1].append_null_bit(true); + buf.columns[2].append_bool(false); + buf.columns[2].append_null_bit(true); + buf.row_count += 1; + + // Row 2: i=42, s=NULL, b=true + buf.columns[0].append_fixed(&42i32.to_le_bytes()); + buf.columns[0].append_null_bit(true); + buf.columns[1].append_variable(b""); + buf.columns[1].append_null_bit(false); // null + buf.columns[2].append_bool(true); + buf.columns[2].append_null_bit(true); + buf.row_count += 1; + + let batch = buf.flush(None).unwrap(); + assert_eq!(batch.num_rows(), 3); + + // Check Int32 column + let col0 = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col0.value(0), 1); + assert!(col0.is_null(1)); + assert_eq!(col0.value(2), 42); + + // Check Utf8 column + let col1 = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col1.value(0), "hello"); + assert_eq!(col1.value(1), "world"); + assert!(col1.is_null(2)); + + // Check Boolean column + let col2 = batch + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + assert!(col2.value(0)); + assert!(!col2.value(1)); + assert!(col2.value(2)); + + // After flush, row_count should be 0 + assert_eq!(buf.row_count(), 0); + } +} diff --git a/native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs b/native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs deleted file mode 100644 index 77010938cd..0000000000 --- a/native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs +++ /dev/null @@ -1,110 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow::array::RecordBatch; -use arrow::compute::interleave_record_batch; -use datafusion::common::DataFusionError; - -/// A helper struct to produce shuffled batches. -/// This struct takes ownership of the buffered batches and partition indices from the -/// ShuffleRepartitioner, and provides an iterator over the batches in the specified partitions. -pub(super) struct PartitionedBatchesProducer { - buffered_batches: Vec, - partition_indices: Vec>, - batch_size: usize, -} - -impl PartitionedBatchesProducer { - pub(super) fn new( - buffered_batches: Vec, - indices: Vec>, - batch_size: usize, - ) -> Self { - Self { - partition_indices: indices, - buffered_batches, - batch_size, - } - } - - pub(super) fn produce(&mut self, partition_id: usize) -> PartitionedBatchIterator<'_> { - PartitionedBatchIterator::new( - &self.partition_indices[partition_id], - &self.buffered_batches, - self.batch_size, - ) - } -} - -pub(crate) struct PartitionedBatchIterator<'a> { - record_batches: Vec<&'a RecordBatch>, - batch_size: usize, - indices: Vec<(usize, usize)>, - pos: usize, -} - -impl<'a> PartitionedBatchIterator<'a> { - fn new( - indices: &'a [(u32, u32)], - buffered_batches: &'a [RecordBatch], - batch_size: usize, - ) -> Self { - if indices.is_empty() { - // Avoid unnecessary allocations when the partition is empty - return Self { - record_batches: vec![], - batch_size, - indices: vec![], - pos: 0, - }; - } - let record_batches = buffered_batches.iter().collect::>(); - let current_indices = indices - .iter() - .map(|(i_batch, i_row)| (*i_batch as usize, *i_row as usize)) - .collect::>(); - Self { - record_batches, - batch_size, - indices: current_indices, - pos: 0, - } - } -} - -impl Iterator for PartitionedBatchIterator<'_> { - type Item = datafusion::common::Result; - - fn next(&mut self) -> Option { - if self.pos >= self.indices.len() { - return None; - } - - let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len()); - let indices = &self.indices[self.pos..indices_end]; - match interleave_record_batch(&self.record_batches, indices) { - Ok(batch) => { - self.pos = indices_end; - Some(Ok(batch)) - } - Err(e) => Some(Err(DataFusionError::ArrowError( - Box::from(e), - Some(DataFusionError::get_back_trace()), - ))), - } - } -} diff --git a/native/core/src/execution/shuffle/writers/partition_writer.rs b/native/core/src/execution/shuffle/writers/partition_writer.rs index 7c2dbe0444..6b2f6b0b7e 100644 --- a/native/core/src/execution/shuffle/writers/partition_writer.rs +++ b/native/core/src/execution/shuffle/writers/partition_writer.rs @@ -16,9 +16,9 @@ // under the License. use crate::execution::shuffle::metrics::ShufflePartitionerMetrics; -use crate::execution::shuffle::partitioners::PartitionedBatchIterator; use crate::execution::shuffle::writers::buf_batch_writer::BufBatchWriter; use crate::execution::shuffle::ShuffleBlockWriter; +use arrow::array::RecordBatch; use datafusion::common::DataFusionError; use datafusion::execution::disk_manager::RefCountedTempFile; use datafusion::execution::runtime_env::RuntimeEnv; @@ -75,40 +75,32 @@ impl PartitionWriter { pub(crate) fn spill( &mut self, - iter: &mut PartitionedBatchIterator, + batches: &[RecordBatch], runtime: &RuntimeEnv, metrics: &ShufflePartitionerMetrics, write_buffer_size: usize, batch_size: usize, ) -> datafusion::common::Result { - if let Some(batch) = iter.next() { - self.ensure_spill_file_created(runtime)?; - - let total_bytes_written = { - let mut buf_batch_writer = BufBatchWriter::new( - &mut self.shuffle_block_writer, - &mut self.spill_file.as_mut().unwrap().file, - write_buffer_size, - batch_size, - ); - let mut bytes_written = - buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?; - for batch in iter { - let batch = batch?; - bytes_written += buf_batch_writer.write( - &batch, - &metrics.encode_time, - &metrics.write_time, - )?; - } - buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?; - bytes_written - }; - - Ok(total_bytes_written) - } else { - Ok(0) + if batches.is_empty() { + return Ok(0); } + self.ensure_spill_file_created(runtime)?; + let total_bytes_written = { + let mut buf_batch_writer = BufBatchWriter::new( + &mut self.shuffle_block_writer, + &mut self.spill_file.as_mut().unwrap().file, + write_buffer_size, + batch_size, + ); + let mut bytes_written = 0; + for batch in batches { + bytes_written += + buf_batch_writer.write(batch, &metrics.encode_time, &metrics.write_time)?; + } + buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?; + bytes_written + }; + Ok(total_bytes_written) } pub(crate) fn path(&self) -> Option<&std::path::Path> { diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index 8c75df1d45..e0f4eee477 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -248,6 +248,9 @@ object CometMetricNode { Map( "elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle writer time"), "repart_time" -> SQLMetrics.createNanoTimingMetric(sc, "repartition time"), + "scatter_time" -> SQLMetrics.createNanoTimingMetric( + sc, + "scatter to partition buffers time"), "encode_time" -> SQLMetrics.createNanoTimingMetric(sc, "encoding and compression time"), "decode_time" -> SQLMetrics.createNanoTimingMetric(sc, "decoding and decompression time"), "spill_count" -> SQLMetrics.createMetric(sc, "number of spills"), diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala index 3fc222bd19..b9d48200d3 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala @@ -77,8 +77,9 @@ class CometNativeShuffleWriter[K, V]( val detailedMetrics = Seq( "elapsed_compute", - "encode_time", "repart_time", + "scatter_time", + "encode_time", "input_batches", "spill_count", "spilled_bytes")