From ada1809ecc6e9e7b0ceb220b4f6bcb3243ff4346 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Wed, 28 Jan 2026 23:08:48 +0800 Subject: [PATCH 1/6] perf array_repeat --- datafusion/functions-nested/Cargo.toml | 4 + .../functions-nested/benches/array_repeat.rs | 474 ++++++++++++++++++ datafusion/functions-nested/src/repeat.rs | 186 +++---- 3 files changed, 577 insertions(+), 87 deletions(-) create mode 100644 datafusion/functions-nested/benches/array_repeat.rs diff --git a/datafusion/functions-nested/Cargo.toml b/datafusion/functions-nested/Cargo.toml index e6392207be894..bfbfbd56c8baf 100644 --- a/datafusion/functions-nested/Cargo.toml +++ b/datafusion/functions-nested/Cargo.toml @@ -88,3 +88,7 @@ name = "map" [[bench]] harness = false name = "array_remove" + +[[bench]] +harness = false +name = "array_repeat" diff --git a/datafusion/functions-nested/benches/array_repeat.rs b/datafusion/functions-nested/benches/array_repeat.rs new file mode 100644 index 0000000000000..d3fa4cc356c44 --- /dev/null +++ b/datafusion/functions-nested/benches/array_repeat.rs @@ -0,0 +1,474 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[macro_use] +extern crate criterion; + +use arrow::array::{ArrayRef, BooleanArray, Float64Array, Int64Array, ListArray}; +use arrow::buffer::OffsetBuffer; +use arrow::datatypes::{DataType, Field}; +use criterion::{BenchmarkId, Criterion}; +use datafusion_common::ScalarValue; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; +use datafusion_functions_nested::repeat::ArrayRepeat; +use rand::Rng; +use rand::SeedableRng; +use rand::rngs::StdRng; +use std::hint::black_box; +use std::sync::Arc; + +const NUM_ROWS: &[usize] = &[100, 1000, 10000]; +const REPEAT_COUNTS: &[u64] = &[5, 50]; +const SEED: u64 = 42; +const NULL_DENSITY: f64 = 0.1; + +fn criterion_benchmark(c: &mut Criterion) { + // Test array_repeat with different element types + bench_array_repeat_int64(c); + bench_array_repeat_string(c); + bench_array_repeat_float64(c); + bench_array_repeat_boolean(c); + + // Test array_repeat with list element (nested arrays) + bench_array_repeat_nested_int64_list(c); + bench_array_repeat_nested_string_list(c); +} + +fn bench_array_repeat_int64(c: &mut Criterion) { + let mut group = c.benchmark_group("array_repeat_int64"); + + for &num_rows in NUM_ROWS { + let element_array = create_int64_array(num_rows, NULL_DENSITY); + + for &repeat_count in REPEAT_COUNTS { + let args = vec![ + ColumnarValue::Array(element_array.clone()), + ColumnarValue::Scalar(ScalarValue::from(repeat_count)), + ]; + + group.bench_with_input( + BenchmarkId::new(format!("repeat_{repeat_count}_count"), num_rows), + &num_rows, + |b, _| { + let udf = ArrayRepeat::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("element", DataType::Int64, false).into(), + Field::new("count", DataType::UInt64, false).into(), + ], + number_rows: num_rows, + return_field: Field::new( + "result", + DataType::List(Arc::new(Field::new_list_field( + DataType::Int64, + true, + ))), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + } + + group.finish(); +} + +fn bench_array_repeat_string(c: &mut Criterion) { + let mut group = c.benchmark_group("array_repeat_string"); + + for &num_rows in NUM_ROWS { + let element_array = create_string_array(num_rows, NULL_DENSITY); + + for &repeat_count in REPEAT_COUNTS { + let args = vec![ + ColumnarValue::Array(element_array.clone()), + ColumnarValue::Scalar(ScalarValue::from(repeat_count)), + ]; + + group.bench_with_input( + BenchmarkId::new(format!("repeat_{repeat_count}_count"), num_rows), + &num_rows, + |b, _| { + let udf = ArrayRepeat::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("element", DataType::Utf8, false).into(), + Field::new("count", DataType::UInt64, false).into(), + ], + number_rows: num_rows, + return_field: Field::new( + "result", + DataType::List(Arc::new(Field::new_list_field( + DataType::Utf8, + true, + ))), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + } + + group.finish(); +} + +fn bench_array_repeat_nested_int64_list(c: &mut Criterion) { + let mut group = c.benchmark_group("array_repeat_nested_int64"); + + for &num_rows in NUM_ROWS { + let list_array = create_int64_list_array(num_rows, 5, NULL_DENSITY); + + for &repeat_count in REPEAT_COUNTS { + let args = vec![ + ColumnarValue::Array(list_array.clone()), + ColumnarValue::Scalar(ScalarValue::from(repeat_count)), + ]; + + group.bench_with_input( + BenchmarkId::new(format!("repeat_{repeat_count}_count"), num_rows), + &num_rows, + |b, _| { + let udf = ArrayRepeat::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new( + "element", + list_array.data_type().clone(), + false, + ) + .into(), + Field::new("count", DataType::UInt64, false).into(), + ], + number_rows: num_rows, + return_field: Field::new( + "result", + DataType::List(Arc::new(Field::new_list_field( + list_array.data_type().clone(), + true, + ))), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + } + + group.finish(); +} + +fn bench_array_repeat_float64(c: &mut Criterion) { + let mut group = c.benchmark_group("array_repeat_float64"); + + for &num_rows in NUM_ROWS { + let element_array = create_float64_array(num_rows, NULL_DENSITY); + + for &repeat_count in REPEAT_COUNTS { + let args = vec![ + ColumnarValue::Array(element_array.clone()), + ColumnarValue::Scalar(ScalarValue::from(repeat_count)), + ]; + + group.bench_with_input( + BenchmarkId::new(format!("repeat_{repeat_count}_count"), num_rows), + &num_rows, + |b, _| { + let udf = ArrayRepeat::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("element", DataType::Float64, false) + .into(), + Field::new("count", DataType::UInt64, false).into(), + ], + number_rows: num_rows, + return_field: Field::new( + "result", + DataType::List(Arc::new(Field::new_list_field( + DataType::Float64, + true, + ))), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + } + + group.finish(); +} + +fn bench_array_repeat_boolean(c: &mut Criterion) { + let mut group = c.benchmark_group("array_repeat_boolean"); + + for &num_rows in NUM_ROWS { + let element_array = create_boolean_array(num_rows, NULL_DENSITY); + + for &repeat_count in REPEAT_COUNTS { + let args = vec![ + ColumnarValue::Array(element_array.clone()), + ColumnarValue::Scalar(ScalarValue::from(repeat_count)), + ]; + + group.bench_with_input( + BenchmarkId::new(format!("repeat_{repeat_count}_count"), num_rows), + &num_rows, + |b, _| { + let udf = ArrayRepeat::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("element", DataType::Boolean, false) + .into(), + Field::new("count", DataType::UInt64, false).into(), + ], + number_rows: num_rows, + return_field: Field::new( + "result", + DataType::List(Arc::new(Field::new_list_field( + DataType::Boolean, + true, + ))), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + } + + group.finish(); +} + +fn bench_array_repeat_nested_string_list(c: &mut Criterion) { + let mut group = c.benchmark_group("array_repeat_nested_string"); + + for &num_rows in NUM_ROWS { + let list_array = create_string_list_array(num_rows, 5, NULL_DENSITY); + + for &repeat_count in REPEAT_COUNTS { + let args = vec![ + ColumnarValue::Array(list_array.clone()), + ColumnarValue::Scalar(ScalarValue::from(repeat_count)), + ]; + + group.bench_with_input( + BenchmarkId::new(format!("repeat_{repeat_count}_count"), num_rows), + &num_rows, + |b, _| { + let udf = ArrayRepeat::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: vec![ + Field::new("element", list_array.data_type().clone(), false) + .into(), + Field::new("count", DataType::UInt64, false).into(), + ], + number_rows: num_rows, + return_field: Field::new( + "result", + DataType::List(Arc::new(Field::new_list_field( + list_array.data_type().clone(), + true, + ))), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + + } + + group.finish(); +} + +fn create_int64_array(num_rows: usize, null_density: f64) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows) + .map(|_| { + if rng.random::() < null_density { + None + } else { + Some(rng.random_range(0..1000)) + } + }) + .collect::(); + + Arc::new(values) +} + +fn create_string_array(num_rows: usize, null_density: f64) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + use arrow::array::StringArray; + + let values = (0..num_rows) + .map(|_| { + if rng.random::() < null_density { + None + } else { + Some(format!("value_{}", rng.random_range(0..100))) + } + }) + .collect::(); + + Arc::new(values) +} + +fn create_int64_list_array( + num_rows: usize, + array_size: usize, + null_density: f64, +) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows * array_size) + .map(|_| { + if rng.random::() < null_density { + None + } else { + Some(rng.random_range(0..1000)) + } + }) + .collect::(); + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::>(); + + Arc::new( + ListArray::try_new( + Arc::new(Field::new("item", DataType::Int64, true)), + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + +fn create_float64_array(num_rows: usize, null_density: f64) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows) + .map(|_| { + if rng.random::() < null_density { + None + } else { + Some(rng.random_range(0.0..1000.0)) + } + }) + .collect::(); + + Arc::new(values) +} + +fn create_boolean_array(num_rows: usize, null_density: f64) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let values = (0..num_rows) + .map(|_| { + if rng.random::() < null_density { + None + } else { + Some(rng.random()) + } + }) + .collect::(); + + Arc::new(values) +} + +fn create_string_list_array( + num_rows: usize, + array_size: usize, + null_density: f64, +) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + use arrow::array::StringArray; + + let values = (0..num_rows * array_size) + .map(|_| { + if rng.random::() < null_density { + None + } else { + Some(format!("value_{}", rng.random_range(0..100))) + } + }) + .collect::(); + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::>(); + + Arc::new( + ListArray::try_new( + Arc::new(Field::new("item", DataType::Utf8, true)), + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions-nested/src/repeat.rs b/datafusion/functions-nested/src/repeat.rs index a121b5f03162e..cd6af2da28a0c 100644 --- a/datafusion/functions-nested/src/repeat.rs +++ b/datafusion/functions-nested/src/repeat.rs @@ -19,10 +19,10 @@ use crate::utils::make_scalar_function; use arrow::array::{ - Array, ArrayRef, Capacities, GenericListArray, ListArray, MutableArrayData, - OffsetSizeTrait, UInt64Array, new_null_array, + Array, ArrayRef, BooleanBufferBuilder, GenericListArray + , OffsetSizeTrait, UInt64Array, }; -use arrow::buffer::OffsetBuffer; +use arrow::buffer::{NullBuffer, OffsetBuffer}; use arrow::compute; use arrow::compute::cast; use arrow::datatypes::DataType; @@ -31,7 +31,7 @@ use arrow::datatypes::{ Field, }; use datafusion_common::cast::{as_large_list_array, as_list_array, as_uint64_array}; -use datafusion_common::{Result, exec_err, utils::take_function_args}; +use datafusion_common::{exec_err, utils::take_function_args, Result}; use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, }; @@ -109,10 +109,17 @@ impl ScalarUDFImpl for ArrayRepeat { } fn return_type(&self, arg_types: &[DataType]) -> Result { - Ok(List(Arc::new(Field::new_list_field( - arg_types[0].clone(), - true, - )))) + let element_type = &arg_types[0]; + match element_type { + LargeList(_) => Ok(LargeList(Arc::new(Field::new_list_field( + element_type.clone(), + true, + )))), + _ => Ok(List(Arc::new(Field::new_list_field( + element_type.clone(), + true, + )))), + } } fn invoke_with_args( @@ -189,42 +196,29 @@ fn general_repeat( array: &ArrayRef, count_array: &UInt64Array, ) -> Result { - let data_type = array.data_type(); - let mut new_values = vec![]; - - let count_vec = count_array - .values() - .to_vec() - .iter() - .map(|x| *x as usize) - .collect::>(); - - for (row_index, &count) in count_vec.iter().enumerate() { - let repeated_array = if array.is_null(row_index) { - new_null_array(data_type, count) - } else { - let original_data = array.to_data(); - let capacity = Capacities::Array(count); - let mut mutable = - MutableArrayData::with_capacities(vec![&original_data], false, capacity); - - for _ in 0..count { - mutable.extend(0, row_index, row_index + 1); - } - - let data = mutable.freeze(); - arrow::array::make_array(data) - }; - new_values.push(repeated_array); + let total_repeated_values: usize = + count_array.values().iter().map(|&c| c as usize).sum(); + + // Build offsets and take_indices + let mut take_indices = Vec::with_capacity(total_repeated_values); + let mut offsets = Vec::with_capacity(count_array.len() + 1); + offsets.push(O::zero()); + let mut running_offset = 0usize; + + for (idx, &count) in count_array.values().iter().enumerate() { + let count = count as usize; + running_offset += count; + offsets.push(O::from_usize(running_offset).unwrap()); + take_indices.extend(std::iter::repeat_n(idx as u64, count)) } - let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect(); - let values = compute::concat(&new_values)?; + let repeated_values = + compute::take(array.as_ref(), &UInt64Array::from_iter_values(take_indices), None)?; Ok(Arc::new(GenericListArray::::try_new( - Arc::new(Field::new_list_field(data_type.to_owned(), true)), - OffsetBuffer::from_lengths(count_vec), - values, + Arc::new(Field::new_list_field(array.data_type().to_owned(), true)), + OffsetBuffer::new(offsets.into()), + repeated_values, None, )?)) } @@ -243,56 +237,74 @@ fn general_list_repeat( list_array: &GenericListArray, count_array: &UInt64Array, ) -> Result { - let data_type = list_array.data_type(); - let value_type = list_array.value_type(); - let mut new_values = vec![]; - - let count_vec = count_array - .values() - .to_vec() - .iter() - .map(|x| *x as usize) - .collect::>(); - - for (list_array_row, &count) in list_array.iter().zip(count_vec.iter()) { - let list_arr = match list_array_row { - Some(list_array_row) => { - let original_data = list_array_row.to_data(); - let capacity = Capacities::Array(original_data.len() * count); - let mut mutable = MutableArrayData::with_capacities( - vec![&original_data], - false, - capacity, - ); - - for _ in 0..count { - mutable.extend(0, 0, original_data.len()); - } - - let data = mutable.freeze(); - let repeated_array = arrow::array::make_array(data); - - let list_arr = GenericListArray::::try_new( - Arc::new(Field::new_list_field(value_type.clone(), true)), - OffsetBuffer::::from_lengths(vec![original_data.len(); count]), - repeated_array, - None, - )?; - Arc::new(list_arr) as ArrayRef - } - None => new_null_array(data_type, count), - }; - new_values.push(list_arr); + let counts = count_array.values(); + let list_offsets = list_array.value_offsets(); + + // calculate capacities for pre-allocation + let outer_total = counts.iter().map(|&c| c as usize).sum(); + let inner_total = counts.iter() + .enumerate() + .filter(|&(i, _)| !list_array.is_null(i)) + .map(|(i, &c)| { + let len = list_offsets[i + 1].to_usize().unwrap() + - list_offsets[i].to_usize().unwrap(); + len * (c as usize) + }) + .sum(); + + // Build outer offsets + let mut outer_offsets = Vec::with_capacity(counts.len() + 1); + outer_offsets.push(O::zero()); + let mut running_offset = 0usize; + for &count in counts.iter() { + running_offset += count as usize; + outer_offsets.push(O::from_usize(running_offset).unwrap()); } - let lengths = new_values.iter().map(|a| a.len()).collect::>(); - let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect(); - let values = compute::concat(&new_values)?; + // Build inner structures + let mut inner_offsets = Vec::with_capacity(outer_total + 1); + let mut take_indices = Vec::with_capacity(inner_total); + let mut inner_nulls = BooleanBufferBuilder::new(outer_total); + let mut inner_running = 0usize; + inner_offsets.push(O::zero()); + + for (row_idx, &count) in counts.iter().enumerate() { + let is_valid = !list_array.is_null(row_idx); + let start = list_offsets[row_idx].to_usize().unwrap(); + let end = list_offsets[row_idx + 1].to_usize().unwrap(); + let row_len = end - start; + + for _ in 0..count { + inner_running += row_len; + inner_offsets.push(O::from_usize(inner_running).unwrap()); + inner_nulls.append(is_valid); + if is_valid { + take_indices.extend(start as u64..end as u64); + } + } + } - Ok(Arc::new(ListArray::try_new( - Arc::new(Field::new_list_field(data_type.to_owned(), true)), - OffsetBuffer::::from_lengths(lengths), - values, + // Build inner ListArray + let inner_values = compute::take( + list_array.values().as_ref(), + &UInt64Array::from_iter_values(take_indices), + None, + )?; + let inner_list = GenericListArray::::try_new( + Arc::new(Field::new_list_field(list_array.value_type().clone(), true)), + OffsetBuffer::new(inner_offsets.into()), + inner_values, + Some(NullBuffer::new(inner_nulls.finish())), + )?; + + // Build outer ListArray + Ok(Arc::new(GenericListArray::::try_new( + Arc::new(Field::new_list_field( + list_array.data_type().to_owned(), + true, + )), + OffsetBuffer::new(outer_offsets.into()), + Arc::new(inner_list), None, )?)) } From 03dee1a6dfd69186919b545acfd39200169e5dd0 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Wed, 28 Jan 2026 23:11:14 +0800 Subject: [PATCH 2/6] cargo fmt --- .../functions-nested/benches/array_repeat.rs | 13 ++++++++----- datafusion/functions-nested/src/repeat.rs | 15 +++++++++------ 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/datafusion/functions-nested/benches/array_repeat.rs b/datafusion/functions-nested/benches/array_repeat.rs index d3fa4cc356c44..69297b5fd3b19 100644 --- a/datafusion/functions-nested/benches/array_repeat.rs +++ b/datafusion/functions-nested/benches/array_repeat.rs @@ -318,8 +318,12 @@ fn bench_array_repeat_nested_string_list(c: &mut Criterion) { udf.invoke_with_args(ScalarFunctionArgs { args: args.clone(), arg_fields: vec![ - Field::new("element", list_array.data_type().clone(), false) - .into(), + Field::new( + "element", + list_array.data_type().clone(), + false, + ) + .into(), Field::new("count", DataType::UInt64, false).into(), ], number_rows: num_rows, @@ -331,16 +335,15 @@ fn bench_array_repeat_nested_string_list(c: &mut Criterion) { ))), false, ) - .into(), + .into(), config_options: Arc::new(ConfigOptions::default()), }) - .unwrap(), + .unwrap(), ) }) }, ); } - } group.finish(); diff --git a/datafusion/functions-nested/src/repeat.rs b/datafusion/functions-nested/src/repeat.rs index cd6af2da28a0c..0102ef99a8729 100644 --- a/datafusion/functions-nested/src/repeat.rs +++ b/datafusion/functions-nested/src/repeat.rs @@ -19,8 +19,7 @@ use crate::utils::make_scalar_function; use arrow::array::{ - Array, ArrayRef, BooleanBufferBuilder, GenericListArray - , OffsetSizeTrait, UInt64Array, + Array, ArrayRef, BooleanBufferBuilder, GenericListArray, OffsetSizeTrait, UInt64Array, }; use arrow::buffer::{NullBuffer, OffsetBuffer}; use arrow::compute; @@ -31,7 +30,7 @@ use arrow::datatypes::{ Field, }; use datafusion_common::cast::{as_large_list_array, as_list_array, as_uint64_array}; -use datafusion_common::{exec_err, utils::take_function_args, Result}; +use datafusion_common::{Result, exec_err, utils::take_function_args}; use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, }; @@ -212,8 +211,11 @@ fn general_repeat( take_indices.extend(std::iter::repeat_n(idx as u64, count)) } - let repeated_values = - compute::take(array.as_ref(), &UInt64Array::from_iter_values(take_indices), None)?; + let repeated_values = compute::take( + array.as_ref(), + &UInt64Array::from_iter_values(take_indices), + None, + )?; Ok(Arc::new(GenericListArray::::try_new( Arc::new(Field::new_list_field(array.data_type().to_owned(), true)), @@ -242,7 +244,8 @@ fn general_list_repeat( // calculate capacities for pre-allocation let outer_total = counts.iter().map(|&c| c as usize).sum(); - let inner_total = counts.iter() + let inner_total = counts + .iter() .enumerate() .filter(|&(i, _)| !list_array.is_null(i)) .map(|(i, &c)| { From edefff47dd6f56400c366b988738976a46fe148f Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Wed, 28 Jan 2026 23:30:21 +0800 Subject: [PATCH 3/6] nit --- datafusion/functions-nested/src/repeat.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-nested/src/repeat.rs b/datafusion/functions-nested/src/repeat.rs index 0102ef99a8729..4b1973f0f99ea 100644 --- a/datafusion/functions-nested/src/repeat.rs +++ b/datafusion/functions-nested/src/repeat.rs @@ -30,7 +30,7 @@ use arrow::datatypes::{ Field, }; use datafusion_common::cast::{as_large_list_array, as_list_array, as_uint64_array}; -use datafusion_common::{Result, exec_err, utils::take_function_args}; +use datafusion_common::{exec_err, utils::take_function_args, Result}; use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, }; @@ -195,10 +195,9 @@ fn general_repeat( array: &ArrayRef, count_array: &UInt64Array, ) -> Result { + // Build offsets and take_indices let total_repeated_values: usize = count_array.values().iter().map(|&c| c as usize).sum(); - - // Build offsets and take_indices let mut take_indices = Vec::with_capacity(total_repeated_values); let mut offsets = Vec::with_capacity(count_array.len() + 1); offsets.push(O::zero()); @@ -211,12 +210,14 @@ fn general_repeat( take_indices.extend(std::iter::repeat_n(idx as u64, count)) } + // Build the flattened values let repeated_values = compute::take( array.as_ref(), &UInt64Array::from_iter_values(take_indices), None, )?; + // Construct final ListArray Ok(Arc::new(GenericListArray::::try_new( Arc::new(Field::new_list_field(array.data_type().to_owned(), true)), OffsetBuffer::new(offsets.into()), From 2f097e2eec047c12c7bc44c3d839932543e7b99b Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Wed, 28 Jan 2026 23:54:28 +0800 Subject: [PATCH 4/6] fmt --- datafusion/functions-nested/src/repeat.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/functions-nested/src/repeat.rs b/datafusion/functions-nested/src/repeat.rs index 4b1973f0f99ea..a82857cb43e4b 100644 --- a/datafusion/functions-nested/src/repeat.rs +++ b/datafusion/functions-nested/src/repeat.rs @@ -30,7 +30,7 @@ use arrow::datatypes::{ Field, }; use datafusion_common::cast::{as_large_list_array, as_list_array, as_uint64_array}; -use datafusion_common::{exec_err, utils::take_function_args, Result}; +use datafusion_common::{Result, exec_err, utils::take_function_args}; use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, }; From 84a312ab71d8c6ca2492bd616390200a593a5c20 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Thu, 29 Jan 2026 23:04:35 +0800 Subject: [PATCH 5/6] count null slt --- datafusion/sqllogictest/test_files/array.slt | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 590f6b1a9ab9f..c27433e7efab3 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -3256,6 +3256,24 @@ drop table array_repeat_table; statement ok drop table large_array_repeat_table; + +statement ok +create table array_repeat_null_count_table +as values +(1, 2), +(2, null), +(3, 1); + +query I? +select column1, array_repeat(column1, column2) from array_repeat_null_count_table; +---- +1 [1, 1] +2 [] +3 [3] + +statement ok +drop table array_repeat_null_count_table + ## array_concat (aliases: `array_cat`, `list_concat`, `list_cat`) # test with empty array From a8eafe6e6f9a5dfe205c78f5ef32487152e895b3 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Fri, 30 Jan 2026 21:39:01 +0800 Subject: [PATCH 6/6] use from_length --- datafusion/functions-nested/src/repeat.rs | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/datafusion/functions-nested/src/repeat.rs b/datafusion/functions-nested/src/repeat.rs index a82857cb43e4b..28ec827cc5a01 100644 --- a/datafusion/functions-nested/src/repeat.rs +++ b/datafusion/functions-nested/src/repeat.rs @@ -256,15 +256,6 @@ fn general_list_repeat( }) .sum(); - // Build outer offsets - let mut outer_offsets = Vec::with_capacity(counts.len() + 1); - outer_offsets.push(O::zero()); - let mut running_offset = 0usize; - for &count in counts.iter() { - running_offset += count as usize; - outer_offsets.push(O::from_usize(running_offset).unwrap()); - } - // Build inner structures let mut inner_offsets = Vec::with_capacity(outer_total + 1); let mut take_indices = Vec::with_capacity(inner_total); @@ -307,7 +298,7 @@ fn general_list_repeat( list_array.data_type().to_owned(), true, )), - OffsetBuffer::new(outer_offsets.into()), + OffsetBuffer::::from_lengths(counts.iter().map(|&c| c as usize)), Arc::new(inner_list), None, )?))