diff --git a/datafusion/functions-nested/Cargo.toml b/datafusion/functions-nested/Cargo.toml index e6392207be894..bfbfbd56c8baf 100644 --- a/datafusion/functions-nested/Cargo.toml +++ b/datafusion/functions-nested/Cargo.toml @@ -88,3 +88,7 @@ name = "map" [[bench]] harness = false name = "array_remove" + +[[bench]] +harness = false +name = "array_repeat" diff --git a/datafusion/functions-nested/benches/array_repeat.rs b/datafusion/functions-nested/benches/array_repeat.rs new file mode 100644 index 0000000000000..69297b5fd3b19 --- /dev/null +++ b/datafusion/functions-nested/benches/array_repeat.rs @@ -0,0 +1,477 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[macro_use] +extern crate criterion; + +use arrow::array::{ArrayRef, BooleanArray, Float64Array, Int64Array, ListArray}; +use arrow::buffer::OffsetBuffer; +use arrow::datatypes::{DataType, Field}; +use criterion::{BenchmarkId, Criterion}; +use datafusion_common::ScalarValue; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; +use datafusion_functions_nested::repeat::ArrayRepeat; +use rand::Rng; +use rand::SeedableRng; +use rand::rngs::StdRng; +use std::hint::black_box; +use std::sync::Arc; + +const NUM_ROWS: &[usize] = &[100, 1000, 10000]; +const REPEAT_COUNTS: &[u64] = &[5, 50]; +const SEED: u64 = 42; +const NULL_DENSITY: f64 = 0.1; + +fn criterion_benchmark(c: &mut Criterion) { + // Test array_repeat with different element types + bench_array_repeat_int64(c); + bench_array_repeat_string(c); + bench_array_repeat_float64(c); + bench_array_repeat_boolean(c); + + // Test array_repeat with list element (nested arrays) + bench_array_repeat_nested_int64_list(c); + bench_array_repeat_nested_string_list(c); +} + +fn bench_array_repeat_int64(c: &mut Criterion) { + let mut group = c.benchmark_group("array_repeat_int64"); + + for &num_rows in NUM_ROWS { + let element_array = create_int64_array(num_rows, NULL_DENSITY); + + for &repeat_count in REPEAT_COUNTS { + let args = vec![ + ColumnarValue::Array(element_array.clone()), + ColumnarValue::Scalar(ScalarValue::from(repeat_count)), + ]; + + group.bench_with_input( + BenchmarkId::new(format!("repeat_{repeat_count}_count"), num_rows), + &num_rows, + |b, _| { + let udf = ArrayRepeat::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("element", DataType::Int64, false).into(), + Field::new("count", DataType::UInt64, false).into(), + ], + number_rows: num_rows, + return_field: Field::new( + "result", + DataType::List(Arc::new(Field::new_list_field( + DataType::Int64, + true, + ))), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + } + + group.finish(); +} + +fn bench_array_repeat_string(c: &mut Criterion) { + let mut group = c.benchmark_group("array_repeat_string"); + + for &num_rows in NUM_ROWS { + let element_array = create_string_array(num_rows, NULL_DENSITY); + + for &repeat_count in REPEAT_COUNTS { + let args = vec![ + ColumnarValue::Array(element_array.clone()), + ColumnarValue::Scalar(ScalarValue::from(repeat_count)), + ]; + + group.bench_with_input( + BenchmarkId::new(format!("repeat_{repeat_count}_count"), num_rows), + &num_rows, + |b, _| { + let udf = ArrayRepeat::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("element", DataType::Utf8, false).into(), + Field::new("count", DataType::UInt64, false).into(), + ], + number_rows: num_rows, + return_field: Field::new( + "result", + DataType::List(Arc::new(Field::new_list_field( + DataType::Utf8, + true, + ))), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + } + + group.finish(); +} + +fn bench_array_repeat_nested_int64_list(c: &mut Criterion) { + let mut group = c.benchmark_group("array_repeat_nested_int64"); + + for &num_rows in NUM_ROWS { + let list_array = create_int64_list_array(num_rows, 5, NULL_DENSITY); + + for &repeat_count in REPEAT_COUNTS { + let args = vec![ + ColumnarValue::Array(list_array.clone()), + ColumnarValue::Scalar(ScalarValue::from(repeat_count)), + ]; + + group.bench_with_input( + BenchmarkId::new(format!("repeat_{repeat_count}_count"), num_rows), + &num_rows, + |b, _| { + let udf = ArrayRepeat::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new( + "element", + list_array.data_type().clone(), + false, + ) + .into(), + Field::new("count", DataType::UInt64, false).into(), + ], + number_rows: num_rows, + return_field: Field::new( + "result", + DataType::List(Arc::new(Field::new_list_field( + list_array.data_type().clone(), + true, + ))), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + } + + group.finish(); +} + +fn bench_array_repeat_float64(c: &mut Criterion) { + let mut group = c.benchmark_group("array_repeat_float64"); + + for &num_rows in NUM_ROWS { + let element_array = create_float64_array(num_rows, NULL_DENSITY); + + for &repeat_count in REPEAT_COUNTS { + let args = vec![ + ColumnarValue::Array(element_array.clone()), + ColumnarValue::Scalar(ScalarValue::from(repeat_count)), + ]; + + group.bench_with_input( + BenchmarkId::new(format!("repeat_{repeat_count}_count"), num_rows), + &num_rows, + |b, _| { + let udf = ArrayRepeat::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("element", DataType::Float64, false) + .into(), + Field::new("count", DataType::UInt64, false).into(), + ], + number_rows: num_rows, + return_field: Field::new( + "result", + DataType::List(Arc::new(Field::new_list_field( + DataType::Float64, + true, + ))), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + } + + group.finish(); +} + +fn bench_array_repeat_boolean(c: &mut Criterion) { + let mut group = c.benchmark_group("array_repeat_boolean"); + + for &num_rows in NUM_ROWS { + let element_array = create_boolean_array(num_rows, NULL_DENSITY); + + for &repeat_count in REPEAT_COUNTS { + let args = vec![ + ColumnarValue::Array(element_array.clone()), + ColumnarValue::Scalar(ScalarValue::from(repeat_count)), + ]; + + group.bench_with_input( + BenchmarkId::new(format!("repeat_{repeat_count}_count"), num_rows), + &num_rows, + |b, _| { + let udf = ArrayRepeat::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("element", DataType::Boolean, false) + .into(), + Field::new("count", DataType::UInt64, false).into(), + ], + number_rows: num_rows, + return_field: Field::new( + "result", + DataType::List(Arc::new(Field::new_list_field( + DataType::Boolean, + true, + ))), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + } + + group.finish(); +} + +fn bench_array_repeat_nested_string_list(c: &mut Criterion) { + let mut group = c.benchmark_group("array_repeat_nested_string"); + + for &num_rows in NUM_ROWS { + let list_array = create_string_list_array(num_rows, 5, NULL_DENSITY); + + for &repeat_count in REPEAT_COUNTS { + let args = vec![ + ColumnarValue::Array(list_array.clone()), + ColumnarValue::Scalar(ScalarValue::from(repeat_count)), + ]; + + group.bench_with_input( + BenchmarkId::new(format!("repeat_{repeat_count}_count"), num_rows), + &num_rows, + |b, _| { + let udf = ArrayRepeat::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new( + "element", + list_array.data_type().clone(), + false, + ) + .into(), + Field::new("count", DataType::UInt64, false).into(), + ], + number_rows: num_rows, + return_field: Field::new( + "result", + DataType::List(Arc::new(Field::new_list_field( + list_array.data_type().clone(), + true, + ))), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + } + + group.finish(); +} + +fn create_int64_array(num_rows: usize, null_density: f64) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows) + .map(|_| { + if rng.random::() < null_density { + None + } else { + Some(rng.random_range(0..1000)) + } + }) + .collect::(); + + Arc::new(values) +} + +fn create_string_array(num_rows: usize, null_density: f64) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + use arrow::array::StringArray; + + let values = (0..num_rows) + .map(|_| { + if rng.random::() < null_density { + None + } else { + Some(format!("value_{}", rng.random_range(0..100))) + } + }) + .collect::(); + + Arc::new(values) +} + +fn create_int64_list_array( + num_rows: usize, + array_size: usize, + null_density: f64, +) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows * array_size) + .map(|_| { + if rng.random::() < null_density { + None + } else { + Some(rng.random_range(0..1000)) + } + }) + .collect::(); + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::>(); + + Arc::new( + ListArray::try_new( + Arc::new(Field::new("item", DataType::Int64, true)), + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + +fn create_float64_array(num_rows: usize, null_density: f64) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows) + .map(|_| { + if rng.random::() < null_density { + None + } else { + Some(rng.random_range(0.0..1000.0)) + } + }) + .collect::(); + + Arc::new(values) +} + +fn create_boolean_array(num_rows: usize, null_density: f64) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows) + .map(|_| { + if rng.random::() < null_density { + None + } else { + Some(rng.random()) + } + }) + .collect::(); + + Arc::new(values) +} + +fn create_string_list_array( + num_rows: usize, + array_size: usize, + null_density: f64, +) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + use arrow::array::StringArray; + + let values = (0..num_rows * array_size) + .map(|_| { + if rng.random::() < null_density { + None + } else { + Some(format!("value_{}", rng.random_range(0..100))) + } + }) + .collect::(); + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::>(); + + Arc::new( + ListArray::try_new( + Arc::new(Field::new("item", DataType::Utf8, true)), + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions-nested/src/repeat.rs b/datafusion/functions-nested/src/repeat.rs index a121b5f03162e..28ec827cc5a01 100644 --- a/datafusion/functions-nested/src/repeat.rs +++ b/datafusion/functions-nested/src/repeat.rs @@ -19,10 +19,9 @@ use crate::utils::make_scalar_function; use arrow::array::{ - Array, ArrayRef, Capacities, GenericListArray, ListArray, MutableArrayData, - OffsetSizeTrait, UInt64Array, new_null_array, + Array, ArrayRef, BooleanBufferBuilder, GenericListArray, OffsetSizeTrait, UInt64Array, }; -use arrow::buffer::OffsetBuffer; +use arrow::buffer::{NullBuffer, OffsetBuffer}; use arrow::compute; use arrow::compute::cast; use arrow::datatypes::DataType; @@ -109,10 +108,17 @@ impl ScalarUDFImpl for ArrayRepeat { } fn return_type(&self, arg_types: &[DataType]) -> Result { - Ok(List(Arc::new(Field::new_list_field( - arg_types[0].clone(), - true, - )))) + let element_type = &arg_types[0]; + match element_type { + LargeList(_) => Ok(LargeList(Arc::new(Field::new_list_field( + element_type.clone(), + true, + )))), + _ => Ok(List(Arc::new(Field::new_list_field( + element_type.clone(), + true, + )))), + } } fn invoke_with_args( @@ -189,42 +195,33 @@ fn general_repeat( array: &ArrayRef, count_array: &UInt64Array, ) -> Result { - let data_type = array.data_type(); - let mut new_values = vec![]; - - let count_vec = count_array - .values() - .to_vec() - .iter() - .map(|x| *x as usize) - .collect::>(); - - for (row_index, &count) in count_vec.iter().enumerate() { - let repeated_array = if array.is_null(row_index) { - new_null_array(data_type, count) - } else { - let original_data = array.to_data(); - let capacity = Capacities::Array(count); - let mut mutable = - MutableArrayData::with_capacities(vec![&original_data], false, capacity); - - for _ in 0..count { - mutable.extend(0, row_index, row_index + 1); - } - - let data = mutable.freeze(); - arrow::array::make_array(data) - }; - new_values.push(repeated_array); + // Build offsets and take_indices + let total_repeated_values: usize = + count_array.values().iter().map(|&c| c as usize).sum(); + let mut take_indices = Vec::with_capacity(total_repeated_values); + let mut offsets = Vec::with_capacity(count_array.len() + 1); + offsets.push(O::zero()); + let mut running_offset = 0usize; + + for (idx, &count) in count_array.values().iter().enumerate() { + let count = count as usize; + running_offset += count; + offsets.push(O::from_usize(running_offset).unwrap()); + take_indices.extend(std::iter::repeat_n(idx as u64, count)) } - let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect(); - let values = compute::concat(&new_values)?; + // Build the flattened values + let repeated_values = compute::take( + array.as_ref(), + &UInt64Array::from_iter_values(take_indices), + None, + )?; + // Construct final ListArray Ok(Arc::new(GenericListArray::::try_new( - Arc::new(Field::new_list_field(data_type.to_owned(), true)), - OffsetBuffer::from_lengths(count_vec), - values, + Arc::new(Field::new_list_field(array.data_type().to_owned(), true)), + OffsetBuffer::new(offsets.into()), + repeated_values, None, )?)) } @@ -243,56 +240,66 @@ fn general_list_repeat( list_array: &GenericListArray, count_array: &UInt64Array, ) -> Result { - let data_type = list_array.data_type(); - let value_type = list_array.value_type(); - let mut new_values = vec![]; + let counts = count_array.values(); + let list_offsets = list_array.value_offsets(); - let count_vec = count_array - .values() - .to_vec() + // calculate capacities for pre-allocation + let outer_total = counts.iter().map(|&c| c as usize).sum(); + let inner_total = counts .iter() - .map(|x| *x as usize) - .collect::>(); - - for (list_array_row, &count) in list_array.iter().zip(count_vec.iter()) { - let list_arr = match list_array_row { - Some(list_array_row) => { - let original_data = list_array_row.to_data(); - let capacity = Capacities::Array(original_data.len() * count); - let mut mutable = MutableArrayData::with_capacities( - vec![&original_data], - false, - capacity, - ); - - for _ in 0..count { - mutable.extend(0, 0, original_data.len()); - } - - let data = mutable.freeze(); - let repeated_array = arrow::array::make_array(data); - - let list_arr = GenericListArray::::try_new( - Arc::new(Field::new_list_field(value_type.clone(), true)), - OffsetBuffer::::from_lengths(vec![original_data.len(); count]), - repeated_array, - None, - )?; - Arc::new(list_arr) as ArrayRef + .enumerate() + .filter(|&(i, _)| !list_array.is_null(i)) + .map(|(i, &c)| { + let len = list_offsets[i + 1].to_usize().unwrap() + - list_offsets[i].to_usize().unwrap(); + len * (c as usize) + }) + .sum(); + + // Build inner structures + let mut inner_offsets = Vec::with_capacity(outer_total + 1); + let mut take_indices = Vec::with_capacity(inner_total); + let mut inner_nulls = BooleanBufferBuilder::new(outer_total); + let mut inner_running = 0usize; + inner_offsets.push(O::zero()); + + for (row_idx, &count) in counts.iter().enumerate() { + let is_valid = !list_array.is_null(row_idx); + let start = list_offsets[row_idx].to_usize().unwrap(); + let end = list_offsets[row_idx + 1].to_usize().unwrap(); + let row_len = end - start; + + for _ in 0..count { + inner_running += row_len; + inner_offsets.push(O::from_usize(inner_running).unwrap()); + inner_nulls.append(is_valid); + if is_valid { + take_indices.extend(start as u64..end as u64); } - None => new_null_array(data_type, count), - }; - new_values.push(list_arr); + } } - let lengths = new_values.iter().map(|a| a.len()).collect::>(); - let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect(); - let values = compute::concat(&new_values)?; - - Ok(Arc::new(ListArray::try_new( - Arc::new(Field::new_list_field(data_type.to_owned(), true)), - OffsetBuffer::::from_lengths(lengths), - values, + // Build inner ListArray + let inner_values = compute::take( + list_array.values().as_ref(), + &UInt64Array::from_iter_values(take_indices), + None, + )?; + let inner_list = GenericListArray::::try_new( + Arc::new(Field::new_list_field(list_array.value_type().clone(), true)), + OffsetBuffer::new(inner_offsets.into()), + inner_values, + Some(NullBuffer::new(inner_nulls.finish())), + )?; + + // Build outer ListArray + Ok(Arc::new(GenericListArray::::try_new( + Arc::new(Field::new_list_field( + list_array.data_type().to_owned(), + true, + )), + OffsetBuffer::::from_lengths(counts.iter().map(|&c| c as usize)), + Arc::new(inner_list), None, )?)) } diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 590f6b1a9ab9f..c27433e7efab3 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -3256,6 +3256,24 @@ drop table array_repeat_table; statement ok drop table large_array_repeat_table; + +statement ok +create table array_repeat_null_count_table +as values +(1, 2), +(2, null), +(3, 1); + +query I? +select column1, array_repeat(column1, column2) from array_repeat_null_count_table; +---- +1 [1, 1] +2 [] +3 [3] + +statement ok +drop table array_repeat_null_count_table + ## array_concat (aliases: `array_cat`, `list_concat`, `list_cat`) # test with empty array