diff --git a/datafusion/expr/src/window_state.rs b/datafusion/expr/src/window_state.rs index d7da7a778b011..f8d4609d3690c 100644 --- a/datafusion/expr/src/window_state.rs +++ b/datafusion/expr/src/window_state.rs @@ -396,6 +396,11 @@ impl WindowFrameStateRange { length: usize, ) -> Result { let current_row_values = get_row_at_idx(range_columns, idx)?; + let search_start = if SIDE { + last_range.start + } else { + last_range.end + }; let end_range = if let Some(delta) = delta { let is_descending: bool = self .sort_options @@ -407,34 +412,40 @@ impl WindowFrameStateRange { })? .descending; - current_row_values - .iter() - .map(|value| { - if value.is_null() { - return Ok(value.clone()); + // On overflow the boundary exceeds the type's range and is + // effectively unbounded within the partition. Collapse to the + // partition edge rather than feeding `search_in_slice` a + // wrapped-around target: PRECEDING searches reach `search_start`, + // FOLLOWING searches reach `length`. + let unbounded_edge = if SEARCH_SIDE { search_start } else { length }; + let mut targets = Vec::with_capacity(current_row_values.len()); + for value in ¤t_row_values { + if value.is_null() { + targets.push(value.clone()); + continue; + } + let target = if SEARCH_SIDE == is_descending { + match value.add_checked(delta) { + Ok(v) => v, + Err(_) => return Ok(unbounded_edge), } - if SEARCH_SIDE == is_descending { - // TODO: Handle positive overflows. - value.add(delta) - } else if value.is_unsigned() && value < delta { - // NOTE: This gets a polymorphic zero without having long coercion code for ScalarValue. - // If we decide to implement a "default" construction mechanism for ScalarValue, - // change the following statement to use that. - value.sub(value) - } else { - // TODO: Handle negative overflows. - value.sub(delta) + } else if value.is_unsigned() && value < delta { + // NOTE: This gets a polymorphic zero without having long coercion code for ScalarValue. + // If we decide to implement a "default" construction mechanism for ScalarValue, + // change the following statement to use that. + value.sub(value)? + } else { + match value.sub_checked(delta) { + Ok(v) => v, + Err(_) => return Ok(unbounded_edge), } - }) - .collect::>>()? + }; + targets.push(target); + } + targets } else { current_row_values }; - let search_start = if SIDE { - last_range.start - } else { - last_range.end - }; let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| { let cmp = compare_rows(current, target, &self.sort_options)?; Ok(if SIDE { cmp.is_lt() } else { cmp.is_le() }) diff --git a/datafusion/functions-aggregate/src/average.rs b/datafusion/functions-aggregate/src/average.rs index bcccea381324e..24f2777797b93 100644 --- a/datafusion/functions-aggregate/src/average.rs +++ b/datafusion/functions-aggregate/src/average.rs @@ -519,9 +519,16 @@ impl Accumulator for AvgAccumulator { } fn evaluate(&mut self) -> Result { - Ok(ScalarValue::Float64( - self.sum.map(|f| f / self.count as f64), - )) + // In sliding-window mode `retract_batch` can bring `count` back to 0 + // while `sum` remains `Some(..)` (possibly zero or a floating-point + // residual). Guard against that so the frame with no non-NULL values + // yields NULL rather than NaN / ±Inf. + let avg = if self.count == 0 { + None + } else { + self.sum.map(|f| f / self.count as f64) + }; + Ok(ScalarValue::Float64(avg)) } fn size(&self) -> usize { @@ -584,17 +591,23 @@ impl Accumulator for DecimalAvgAccumu } fn evaluate(&mut self) -> Result { - let v = self - .sum - .map(|v| { - DecimalAverager::::try_new( - self.sum_scale, - self.target_precision, - self.target_scale, - )? - .avg(v, T::Native::from_usize(self.count as usize).unwrap()) - }) - .transpose()?; + // `count == 0` can occur in sliding-window mode after `retract_batch` + // removes every contributing value. Return NULL rather than dividing + // by zero (which would panic for integer decimal types). + let v = if self.count == 0 { + None + } else { + self.sum + .map(|v| { + DecimalAverager::::try_new( + self.sum_scale, + self.target_precision, + self.target_scale, + )? + .avg(v, T::Native::from_usize(self.count as usize).unwrap()) + }) + .transpose()? + }; ScalarValue::new_primitive::( v, @@ -670,7 +683,14 @@ impl Accumulator for DurationAvgAccumulator { } fn evaluate(&mut self) -> Result { - let avg = self.sum.map(|sum| sum / self.count as i64); + // Guard against `count == 0` which can happen in sliding-window mode + // after every contributing value has been retracted. Without this + // check we would integer-divide by zero. + let avg = if self.count == 0 { + None + } else { + self.sum.map(|sum| sum / self.count as i64) + }; match self.result_unit { TimeUnit::Second => Ok(ScalarValue::DurationSecond(avg)), diff --git a/datafusion/functions-window/src/nth_value.rs b/datafusion/functions-window/src/nth_value.rs index 6c6139405cbe8..82e1081f75318 100644 --- a/datafusion/functions-window/src/nth_value.rs +++ b/datafusion/functions-window/src/nth_value.rs @@ -308,14 +308,22 @@ impl WindowUDFImpl for NthValue { } fn field(&self, field_args: WindowUDFFieldArgs) -> Result { - let return_type = field_args - .input_fields() - .first() - .map(|f| f.data_type()) - .cloned() - .unwrap_or(DataType::Null); - - Ok(Field::new(field_args.name(), return_type, true).into()) + let input_field = + field_args + .input_fields() + .first() + .cloned() + .unwrap_or_else(|| { + Arc::new(Field::new(field_args.name(), DataType::Null, true)) + }); + + // Clone the input field to preserve metadata, update name and nullability + Ok(input_field + .as_ref() + .clone() + .with_name(field_args.name()) + .with_nullable(true) + .into()) } fn reverse_expr(&self) -> ReversedUDWF { diff --git a/datafusion/sqllogictest/test_files/metadata.slt b/datafusion/sqllogictest/test_files/metadata.slt index 3fea8df260f05..3e2a503e6b3fc 100644 --- a/datafusion/sqllogictest/test_files/metadata.slt +++ b/datafusion/sqllogictest/test_files/metadata.slt @@ -472,5 +472,51 @@ select arrow_metadata(with_metadata(id, 'unit', ''), 'unit') from table_with_met ---- (empty) +# Regression test: window functions should preserve field metadata +# Test FIRST_VALUE window function preserves metadata +query IT +select + first_value(id) over (order by id asc nulls last) as fv, + arrow_metadata(first_value(id) over (order by id asc nulls last), 'metadata_key') as meta +from table_with_metadata limit 1; +---- +1 the id field + +# Test LAST_VALUE window function preserves metadata +query IT +select + last_value(id) over (order by id asc nulls last rows between unbounded preceding and unbounded following) as lv, + arrow_metadata(last_value(id) over (order by id asc nulls last rows between unbounded preceding and unbounded following), 'metadata_key') as meta +from table_with_metadata limit 1; +---- +NULL the id field + +# Test NTH_VALUE window function preserves metadata +query IT +select + nth_value(id, 2) over (order by id asc nulls last) as nv, + arrow_metadata(nth_value(id, 2) over (order by id asc nulls last), 'metadata_key') as meta +from table_with_metadata limit 1; +---- +NULL the id field + +# Test LEAD window function preserves metadata +query IT +select + lead(id) over (order by id asc nulls last) as ld, + arrow_metadata(lead(id) over (order by id asc nulls last), 'metadata_key') as meta +from table_with_metadata limit 1; +---- +3 the id field + +# Test LAG window function preserves metadata +query IT +select + lag(id) over (order by id asc nulls last) as lg, + arrow_metadata(lag(id) over (order by id asc nulls last), 'metadata_key') as meta +from table_with_metadata limit 1; +---- +NULL the id field + statement ok drop table table_with_metadata; diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 74c2e38baaad5..96b811093ecb2 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -6236,6 +6236,274 @@ INNER JOIN issue_20194_t2 t2 ---- 6774502793 10040029 1 +# Regression tests for RANGE window frames whose value-offset boundary +# computation overflows the type's representable range. Previously these +# queries panicked in functions-window/src/nth_value.rs with +# "attempt to subtract with overflow" because the wrapped-around target +# produced a frame range where `end < start`. Both positive overflows +# (target above type MAX) and negative overflows (target below type MIN) +# must be treated as unbounded within the partition. + +############################################################################ +# Positive overflow: value + delta exceeds type MAX +############################################################################ + +# ASC + FOLLOWING: end bound wraps past i64::MAX. +query II +SELECT a, last_value(a) OVER (ORDER BY a RANGE BETWEEN CURRENT ROW AND 4 FOLLOWING) +FROM ( + SELECT 9223372036854775804 AS a + UNION ALL SELECT 9223372036854775805 + UNION ALL SELECT 9223372036854775806 +); +---- +9223372036854775804 9223372036854775806 +9223372036854775805 9223372036854775806 +9223372036854775806 9223372036854775806 + +query II +SELECT a, first_value(a) OVER (ORDER BY a RANGE BETWEEN CURRENT ROW AND 4 FOLLOWING) +FROM ( + SELECT 9223372036854775804 AS a + UNION ALL SELECT 9223372036854775805 + UNION ALL SELECT 9223372036854775806 +); +---- +9223372036854775804 9223372036854775804 +9223372036854775805 9223372036854775805 +9223372036854775806 9223372036854775806 + +# Symmetric PRECEDING/FOLLOWING where the FOLLOWING side overflows past MAX. +query II +SELECT a, last_value(a) OVER (ORDER BY a RANGE BETWEEN 4 PRECEDING AND 4 FOLLOWING) +FROM ( + SELECT 9223372036854775804 AS a + UNION ALL SELECT 9223372036854775805 + UNION ALL SELECT 9223372036854775806 +); +---- +9223372036854775804 9223372036854775806 +9223372036854775805 9223372036854775806 +9223372036854775806 9223372036854775806 + +# DESC + PRECEDING: "PRECEDING" walks toward larger values in DESC order, +# so offsetting past i64::MAX exercises the ADD-overflow path. +query II +SELECT a, first_value(a) OVER (ORDER BY a DESC RANGE BETWEEN 4 PRECEDING AND CURRENT ROW) +FROM ( + SELECT 9223372036854775804 AS a + UNION ALL SELECT 9223372036854775805 + UNION ALL SELECT 9223372036854775806 +); +---- +9223372036854775806 9223372036854775806 +9223372036854775805 9223372036854775806 +9223372036854775804 9223372036854775806 + +query II +SELECT a, last_value(a) OVER (ORDER BY a DESC RANGE BETWEEN 4 PRECEDING AND CURRENT ROW) +FROM ( + SELECT 9223372036854775804 AS a + UNION ALL SELECT 9223372036854775805 + UNION ALL SELECT 9223372036854775806 +); +---- +9223372036854775806 9223372036854775806 +9223372036854775805 9223372036854775805 +9223372036854775804 9223372036854775804 + +# Unsigned ordering column: add past u64::MAX must not wrap. +query II +SELECT a, last_value(a) OVER (ORDER BY a RANGE BETWEEN CURRENT ROW AND 4 FOLLOWING) +FROM ( + SELECT arrow_cast(18446744073709551612, 'UInt64') AS a + UNION ALL SELECT arrow_cast(18446744073709551613, 'UInt64') + UNION ALL SELECT arrow_cast(18446744073709551614, 'UInt64') +); +---- +18446744073709551612 18446744073709551614 +18446744073709551613 18446744073709551614 +18446744073709551614 18446744073709551614 + +query II +SELECT a, first_value(a) OVER (ORDER BY a RANGE BETWEEN CURRENT ROW AND 4 FOLLOWING) +FROM ( + SELECT arrow_cast(18446744073709551612, 'UInt64') AS a + UNION ALL SELECT arrow_cast(18446744073709551613, 'UInt64') + UNION ALL SELECT arrow_cast(18446744073709551614, 'UInt64') +); +---- +18446744073709551612 18446744073709551612 +18446744073709551613 18446744073709551613 +18446744073709551614 18446744073709551614 + +############################################################################ +# Negative overflow: value - delta falls below type MIN +############################################################################ + +# ASC + PRECEDING: start bound wraps below i64::MIN. +query II +SELECT a, first_value(a) OVER (ORDER BY a RANGE BETWEEN 4 PRECEDING AND CURRENT ROW) +FROM ( + SELECT -9223372036854775807 AS a + UNION ALL SELECT -9223372036854775806 + UNION ALL SELECT -9223372036854775805 +); +---- +-9223372036854775807 -9223372036854775807 +-9223372036854775806 -9223372036854775807 +-9223372036854775805 -9223372036854775807 + +query II +SELECT a, last_value(a) OVER (ORDER BY a RANGE BETWEEN 4 PRECEDING AND CURRENT ROW) +FROM ( + SELECT -9223372036854775807 AS a + UNION ALL SELECT -9223372036854775806 + UNION ALL SELECT -9223372036854775805 +); +---- +-9223372036854775807 -9223372036854775807 +-9223372036854775806 -9223372036854775806 +-9223372036854775805 -9223372036854775805 + +# Symmetric PRECEDING/FOLLOWING where the PRECEDING side underflows past MIN. +query II +SELECT a, first_value(a) OVER (ORDER BY a RANGE BETWEEN 4 PRECEDING AND 4 FOLLOWING) +FROM ( + SELECT -9223372036854775807 AS a + UNION ALL SELECT -9223372036854775806 + UNION ALL SELECT -9223372036854775805 +); +---- +-9223372036854775807 -9223372036854775807 +-9223372036854775806 -9223372036854775807 +-9223372036854775805 -9223372036854775807 + +# DESC + FOLLOWING: "FOLLOWING" walks toward smaller values in DESC order, +# so offsetting past i64::MIN exercises the SUB-underflow path. +query II +SELECT a, last_value(a) OVER (ORDER BY a DESC RANGE BETWEEN CURRENT ROW AND 4 FOLLOWING) +FROM ( + SELECT -9223372036854775805 AS a + UNION ALL SELECT -9223372036854775806 + UNION ALL SELECT -9223372036854775807 +); +---- +-9223372036854775805 -9223372036854775807 +-9223372036854775806 -9223372036854775807 +-9223372036854775807 -9223372036854775807 + +query II +SELECT a, first_value(a) OVER (ORDER BY a DESC RANGE BETWEEN CURRENT ROW AND 4 FOLLOWING) +FROM ( + SELECT -9223372036854775805 AS a + UNION ALL SELECT -9223372036854775806 + UNION ALL SELECT -9223372036854775807 +); +---- +-9223372036854775805 -9223372036854775805 +-9223372036854775806 -9223372036854775806 +-9223372036854775807 -9223372036854775807 + +# Unsigned ordering column: subtracting an offset that would go below 0 +# must saturate to 0, not wrap to u64::MAX. +query II +SELECT a, first_value(a) OVER (ORDER BY a RANGE BETWEEN 4 PRECEDING AND CURRENT ROW) +FROM ( + SELECT arrow_cast(1, 'UInt64') AS a + UNION ALL SELECT arrow_cast(2, 'UInt64') + UNION ALL SELECT arrow_cast(3, 'UInt64') +); +---- +1 1 +2 1 +3 1 + +query II +SELECT a, last_value(a) OVER (ORDER BY a RANGE BETWEEN 4 PRECEDING AND CURRENT ROW) +FROM ( + SELECT arrow_cast(1, 'UInt64') AS a + UNION ALL SELECT arrow_cast(2, 'UInt64') + UNION ALL SELECT arrow_cast(3, 'UInt64') +); +---- +1 1 +2 2 +3 3 + +############################################################################ +# ROWS frame regression guard: huge offsets already saturate via +# saturating_sub / min(length), verify we keep that behavior. +############################################################################ + +query II +SELECT a, last_value(a) OVER (ORDER BY a ROWS BETWEEN CURRENT ROW AND 9223372036854775807 FOLLOWING) +FROM ( + SELECT 1 AS a UNION ALL SELECT 2 UNION ALL SELECT 3 +); +---- +1 3 +2 3 +3 3 + +query II +SELECT a, first_value(a) OVER (ORDER BY a ROWS BETWEEN 9223372036854775807 PRECEDING AND CURRENT ROW) +FROM ( + SELECT 1 AS a UNION ALL SELECT 2 UNION ALL SELECT 3 +); +---- +1 1 +2 1 +3 1 + +# AVG over a sliding window must yield NULL when the frame has no non-NULL +# values — including frames that became empty via `retract_batch`. Covers +# Float64, Decimal, and the narrow-frame retract-to-empty case. +query IR +SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) +FROM (VALUES(1,1),(2,2),(3,CAST(NULL AS INT)),(4,CAST(NULL AS INT))) t(i,v) +ORDER BY i; +---- +1 1.5 +2 2 +3 NULL +4 NULL + +# All-NULL input — every frame is empty. +query IR +SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) +FROM (VALUES(1,CAST(NULL AS INT)),(2,CAST(NULL AS INT))) t(i,v) +ORDER BY i; +---- +1 NULL +2 NULL + +# Narrow sliding frame that drains to empty each row. +query IR +SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) +FROM (VALUES(1,CAST(NULL AS INT)),(2,1),(3,CAST(NULL AS INT)),(4,CAST(NULL AS INT))) t(i,v) +ORDER BY i; +---- +1 NULL +2 NULL +3 1 +4 NULL + +# Decimal variant — the integer-division path would otherwise panic on an +# empty frame. +query IR +SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) +FROM (VALUES(1,CAST(1.5 AS DECIMAL(10,2))), + (2,CAST(2.5 AS DECIMAL(10,2))), + (3,CAST(NULL AS DECIMAL(10,2))), + (4,CAST(NULL AS DECIMAL(10,2)))) t(i,v) +ORDER BY i; +---- +1 2 +2 2.5 +3 NULL +4 NULL + # Config reset statement ok reset datafusion.execution.batch_size;