Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions datafusion/functions-nested/src/extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,14 +602,12 @@ fn combine_input_nulls(
to_array: &Int64Array,
stride: Option<&Int64Array>,
) -> Option<NullBuffer> {
[
NullBuffer::union_many([
array.nulls(),
from_array.nulls(),
to_array.nulls(),
stride.and_then(|s| s.nulls()),
]
.into_iter()
.fold(None, |acc, nulls| NullBuffer::union(acc.as_ref(), nulls))
])
}

fn general_array_slice<O: OffsetSizeTrait>(
Expand Down
7 changes: 4 additions & 3 deletions datafusion/functions/src/datetime/make_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,11 @@ impl ScalarUDFImpl for MakeTimeFunc {
let minutes = minutes.as_primitive::<Int32Type>();
let seconds = seconds.as_primitive::<Int32Type>();

let nulls = NullBuffer::union(
NullBuffer::union(hours.nulls(), minutes.nulls()).as_ref(),
let nulls = NullBuffer::union_many([
hours.nulls(),
minutes.nulls(),
seconds.nulls(),
);
]);

let mut values = Vec::with_capacity(len);
for i in 0..len {
Expand Down
14 changes: 8 additions & 6 deletions datafusion/functions/src/string/replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,11 @@ fn replace_view(args: &[ArrayRef]) -> Result<ArrayRef> {
let len = string_array.len();
let mut builder = GenericStringArrayBuilder::<i32>::with_capacity(len, 0);
let mut buffer = String::new();
let nulls = NullBuffer::union(
NullBuffer::union(string_array.nulls(), from_array.nulls()).as_ref(),
let nulls = NullBuffer::union_many([
string_array.nulls(),
from_array.nulls(),
to_array.nulls(),
);
]);

// Hoist the nulls.is_some() check out of the loop. LLVM does not always
// unswitch this loop on its own (the Utf8View body is large enough to
Expand Down Expand Up @@ -212,10 +213,11 @@ fn replace<T: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
let len = string_array.len();
let mut builder = GenericStringArrayBuilder::<T>::with_capacity(len, 0);
let mut buffer = String::new();
let nulls = NullBuffer::union(
NullBuffer::union(string_array.nulls(), from_array.nulls()).as_ref(),
let nulls = NullBuffer::union_many([
string_array.nulls(),
from_array.nulls(),
to_array.nulls(),
);
]);

// Hoist the nulls.is_some() check out of the loop. LLVM unswitches this
// automatically today, but kept explicit so the no-nulls fast path is not
Expand Down
21 changes: 12 additions & 9 deletions datafusion/functions/src/unicode/substr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,11 @@ fn string_view_substr(
enable_ascii_fast_path(&string_view_array, start_array, count_array_opt);

// Combine null bitmaps from all inputs in bulk.
let nulls = NullBuffer::union(
NullBuffer::union(string_view_array.nulls(), start_array.nulls()).as_ref(),
let nulls = NullBuffer::union_many([
string_view_array.nulls(),
start_array.nulls(),
count_array_opt.and_then(|a| a.nulls()),
);
]);

let mut views_buf = Vec::with_capacity(string_view_array.len());

Expand Down Expand Up @@ -363,10 +364,11 @@ fn generic_string_substr<T: OffsetSizeTrait>(
let mut has_out_of_line = false;

// Combine null bitmaps from all inputs in bulk.
let nulls = NullBuffer::union(
NullBuffer::union(string_array.nulls(), start_array.nulls()).as_ref(),
let nulls = NullBuffer::union_many([
string_array.nulls(),
start_array.nulls(),
count_array_opt.and_then(|a| a.nulls()),
);
]);

for i in 0..string_array.len() {
if nulls.as_ref().is_some_and(|n| n.is_null(i)) {
Expand Down Expand Up @@ -418,10 +420,11 @@ fn generic_string_substr_copy<T: OffsetSizeTrait>(
let is_ascii = enable_ascii_fast_path(&string_array, start_array, count_array_opt);

// Combine null bitmaps from all inputs in bulk.
let nulls = NullBuffer::union(
NullBuffer::union(string_array.nulls(), start_array.nulls()).as_ref(),
let nulls = NullBuffer::union_many([
string_array.nulls(),
start_array.nulls(),
count_array_opt.and_then(|a| a.nulls()),
);
]);

let mut result_builder = StringViewBuilder::new();

Expand Down
14 changes: 8 additions & 6 deletions datafusion/functions/src/unicode/substrindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,11 @@ where
{
let num_rows = string_array.len();
// Output is null if and only if any input is null.
let nulls = NullBuffer::union(
NullBuffer::union(string_array.nulls(), delimiter_array.nulls()).as_ref(),
let nulls = NullBuffer::union_many([
string_array.nulls(),
delimiter_array.nulls(),
count_array.nulls(),
);
]);

for i in 0..num_rows {
if nulls.as_ref().is_some_and(|n| n.is_null(i)) {
Expand All @@ -299,10 +300,11 @@ fn substr_index_view(
delimiter_array: &StringViewArray,
count_array: &PrimitiveArray<Int64Type>,
) -> Result<ArrayRef> {
let nulls = NullBuffer::union(
NullBuffer::union(string_array.nulls(), delimiter_array.nulls()).as_ref(),
let nulls = NullBuffer::union_many([
string_array.nulls(),
delimiter_array.nulls(),
count_array.nulls(),
);
]);
let views = string_array.views();
let mut views_buf = Vec::with_capacity(string_array.len());
let mut has_out_of_line = false;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/spark/src/function/array/repeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ fn spark_array_repeat(args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let return_type = return_field.data_type().clone();

// A NULL element should be repeated into the array, not cause a NULL result.
let null_mask = compute_null_mask(&arg_values[1..], number_rows)?;
let null_mask = compute_null_mask(&arg_values[1..]);

// If count is null then return NULL immediately
if matches!(null_mask, NullMaskResolution::ReturnNull) {
Expand Down
15 changes: 7 additions & 8 deletions datafusion/spark/src/function/map/str_to_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,13 @@ fn str_to_map_impl<'a, V: StringArrayType<'a> + Copy>(
let num_rows = text_array.len();

// Precompute combined null buffer from all input arrays.
// NullBuffer::union performs a bitmap-level AND, which is more efficient
// than checking per-row nullability inline.
let text_nulls = text_array.nulls().cloned();
let pair_nulls = pair_delim_array.and_then(|a| a.nulls().cloned());
let kv_nulls = kv_delim_array.and_then(|a| a.nulls().cloned());
let combined_nulls = [text_nulls.as_ref(), pair_nulls.as_ref(), kv_nulls.as_ref()]
.into_iter()
.fold(None, |acc, nulls| NullBuffer::union(acc.as_ref(), nulls));
// NullBuffer::union_many performs a bitmap-level AND, which is more
// efficient than checking per-row nullability inline.
let combined_nulls = NullBuffer::union_many([
text_array.nulls(),
pair_delim_array.as_ref().and_then(|a| a.nulls()),
kv_delim_array.as_ref().and_then(|a| a.nulls()),
]);

// Use field names matching map_type_from_key_value_types: "key" and "value"
let field_names = MapFieldNames {
Expand Down
80 changes: 33 additions & 47 deletions datafusion/spark/src/function/null_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,65 +23,51 @@ use datafusion_expr::ColumnarValue;
use std::sync::Arc;

pub(crate) enum NullMaskResolution {
/// Return NULL as the result (e.g., scalar inputs with at least one NULL)
/// All inputs are scalars and at least one is NULL -> return NULL
ReturnNull,
/// No null mask needed (e.g., all scalar inputs are non-NULL)
/// All inputs are non-NULL -> no null mask needed
NoMask,
/// Null mask to apply for arrays
Apply(NullBuffer),
}

/// Compute NULL mask for the arguments using NullBuffer::union_many
pub(crate) fn compute_null_mask(
args: &[ColumnarValue],
number_rows: usize,
) -> Result<NullMaskResolution> {
// Check if all arguments are scalars
let all_scalars = args
.iter()
.all(|arg| matches!(arg, ColumnarValue::Scalar(_)));
pub(crate) fn compute_null_mask(args: &[ColumnarValue]) -> NullMaskResolution {
let mut array_len = None;
let mut has_null_scalar = false;

if all_scalars {
// For scalars, check if any is NULL
for arg in args {
if let ColumnarValue::Scalar(scalar) = arg
&& scalar.is_null()
{
return Ok(NullMaskResolution::ReturnNull);
for arg in args {
match arg {
ColumnarValue::Array(array) => {
array_len.get_or_insert_with(|| array.len());
}
ColumnarValue::Scalar(scalar) => {
has_null_scalar |= scalar.is_null();
}
}
// No NULLs in scalars
Ok(NullMaskResolution::NoMask)
} else {
// For arrays, compute NULL mask for each row using NullBuffer::union
let array_len = args
.iter()
.find_map(|arg| match arg {
ColumnarValue::Array(array) => Some(array.len()),
_ => None,
})
.unwrap_or(number_rows);
}

// Convert all scalars to arrays for uniform processing
let arrays: Result<Vec<_>> = args
.iter()
.map(|arg| match arg {
ColumnarValue::Array(array) => Ok(Arc::clone(array)),
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(array_len),
})
.collect();
let arrays = arrays?;
let Some(array_len) = array_len else {
// All arguments are scalars
return if has_null_scalar {
NullMaskResolution::ReturnNull
} else {
NullMaskResolution::NoMask
};
};

// Use NullBuffer::union to combine all null buffers
let combined_nulls = arrays
.iter()
.map(|arr| arr.nulls())
.fold(None, |acc, nulls| NullBuffer::union(acc.as_ref(), nulls));
if has_null_scalar {
return NullMaskResolution::Apply(NullBuffer::new_null(array_len));
}

match combined_nulls {
Some(nulls) => Ok(NullMaskResolution::Apply(nulls)),
None => Ok(NullMaskResolution::NoMask),
}
let combined_nulls =
NullBuffer::union_many(args.iter().filter_map(|arg| match arg {
ColumnarValue::Array(array) => Some(array.nulls()),
ColumnarValue::Scalar(_) => None,
}));

match combined_nulls {
Some(nulls) => NullMaskResolution::Apply(nulls),
None => NullMaskResolution::NoMask,
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/spark/src/function/string/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ fn spark_concat(args: ScalarFunctionArgs) -> Result<ColumnarValue> {
}

// Step 1: Check for NULL mask in incoming args
let null_mask = compute_null_mask(&arg_values, number_rows)?;
let null_mask = compute_null_mask(&arg_values);

// If all scalars and any is NULL, return NULL immediately
if matches!(null_mask, NullMaskResolution::ReturnNull) {
Expand Down
Loading