Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 34 additions & 23 deletions datafusion/expr/src/window_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,11 @@ impl WindowFrameStateRange {
length: usize,
) -> Result<usize> {
let current_row_values = get_row_at_idx(range_columns, idx)?;
let search_start = if SIDE {
last_range.start
} else {
last_range.end
};
let end_range = if let Some(delta) = delta {
let is_descending: bool = self
.sort_options
Expand All @@ -407,34 +412,40 @@ impl WindowFrameStateRange {
})?
.descending;

current_row_values
.iter()
.map(|value| {
if value.is_null() {
return Ok(value.clone());
// On overflow the boundary exceeds the type's range and is
// effectively unbounded within the partition. Collapse to the
// partition edge rather than feeding `search_in_slice` a
// wrapped-around target: PRECEDING searches reach `search_start`,
// FOLLOWING searches reach `length`.
let unbounded_edge = if SEARCH_SIDE { search_start } else { length };
let mut targets = Vec::with_capacity(current_row_values.len());
for value in &current_row_values {
if value.is_null() {
targets.push(value.clone());
continue;
}
let target = if SEARCH_SIDE == is_descending {
match value.add_checked(delta) {
Ok(v) => v,
Err(_) => return Ok(unbounded_edge),
}
if SEARCH_SIDE == is_descending {
// TODO: Handle positive overflows.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 task completed

value.add(delta)
} else if value.is_unsigned() && value < delta {
// NOTE: This gets a polymorphic zero without having long coercion code for ScalarValue.
// If we decide to implement a "default" construction mechanism for ScalarValue,
// change the following statement to use that.
value.sub(value)
} else {
// TODO: Handle negative overflows.
value.sub(delta)
} else if value.is_unsigned() && value < delta {
// NOTE: This gets a polymorphic zero without having long coercion code for ScalarValue.
// If we decide to implement a "default" construction mechanism for ScalarValue,
// change the following statement to use that.
value.sub(value)?
} else {
match value.sub_checked(delta) {
Ok(v) => v,
Err(_) => return Ok(unbounded_edge),
}
})
.collect::<Result<Vec<ScalarValue>>>()?
};
targets.push(target);
}
targets
} else {
current_row_values
};
let search_start = if SIDE {
last_range.start
} else {
last_range.end
};
let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
let cmp = compare_rows(current, target, &self.sort_options)?;
Ok(if SIDE { cmp.is_lt() } else { cmp.is_le() })
Expand Down
220 changes: 220 additions & 0 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6236,6 +6236,226 @@ INNER JOIN issue_20194_t2 t2
----
6774502793 10040029 1

# Regression tests for RANGE window frames whose value-offset boundary
# computation overflows the type's representable range. Previously these
# queries panicked in functions-window/src/nth_value.rs with
# "attempt to subtract with overflow" because the wrapped-around target
# produced a frame range where `end < start`. Both positive overflows
# (target above type MAX) and negative overflows (target below type MIN)
# must be treated as unbounded within the partition.

############################################################################
# Positive overflow: value + delta exceeds type MAX
############################################################################

# ASC + FOLLOWING: end bound wraps past i64::MAX.
query II
SELECT a, last_value(a) OVER (ORDER BY a RANGE BETWEEN CURRENT ROW AND 4 FOLLOWING)
FROM (
SELECT 9223372036854775804 AS a
UNION ALL SELECT 9223372036854775805
UNION ALL SELECT 9223372036854775806
);
----
9223372036854775804 9223372036854775806
9223372036854775805 9223372036854775806
9223372036854775806 9223372036854775806

query II
SELECT a, first_value(a) OVER (ORDER BY a RANGE BETWEEN CURRENT ROW AND 4 FOLLOWING)
FROM (
SELECT 9223372036854775804 AS a
UNION ALL SELECT 9223372036854775805
UNION ALL SELECT 9223372036854775806
);
----
9223372036854775804 9223372036854775804
9223372036854775805 9223372036854775805
9223372036854775806 9223372036854775806

# Symmetric PRECEDING/FOLLOWING where the FOLLOWING side overflows past MAX.
query II
SELECT a, last_value(a) OVER (ORDER BY a RANGE BETWEEN 4 PRECEDING AND 4 FOLLOWING)
FROM (
SELECT 9223372036854775804 AS a
UNION ALL SELECT 9223372036854775805
UNION ALL SELECT 9223372036854775806
);
----
9223372036854775804 9223372036854775806
9223372036854775805 9223372036854775806
9223372036854775806 9223372036854775806

# DESC + PRECEDING: "PRECEDING" walks toward larger values in DESC order,
# so offsetting past i64::MAX exercises the ADD-overflow path.
query II
SELECT a, first_value(a) OVER (ORDER BY a DESC RANGE BETWEEN 4 PRECEDING AND CURRENT ROW)
FROM (
SELECT 9223372036854775804 AS a
UNION ALL SELECT 9223372036854775805
UNION ALL SELECT 9223372036854775806
);
----
9223372036854775806 9223372036854775806
9223372036854775805 9223372036854775806
9223372036854775804 9223372036854775806

query II
SELECT a, last_value(a) OVER (ORDER BY a DESC RANGE BETWEEN 4 PRECEDING AND CURRENT ROW)
FROM (
SELECT 9223372036854775804 AS a
UNION ALL SELECT 9223372036854775805
UNION ALL SELECT 9223372036854775806
);
----
9223372036854775806 9223372036854775806
9223372036854775805 9223372036854775805
9223372036854775804 9223372036854775804

# Unsigned ordering column: add past u64::MAX must not wrap.
query II
SELECT a, last_value(a) OVER (ORDER BY a RANGE BETWEEN CURRENT ROW AND 4 FOLLOWING)
FROM (
SELECT arrow_cast(18446744073709551612, 'UInt64') AS a
UNION ALL SELECT arrow_cast(18446744073709551613, 'UInt64')
UNION ALL SELECT arrow_cast(18446744073709551614, 'UInt64')
);
----
18446744073709551612 18446744073709551614
18446744073709551613 18446744073709551614
18446744073709551614 18446744073709551614

query II
SELECT a, first_value(a) OVER (ORDER BY a RANGE BETWEEN CURRENT ROW AND 4 FOLLOWING)
FROM (
SELECT arrow_cast(18446744073709551612, 'UInt64') AS a
UNION ALL SELECT arrow_cast(18446744073709551613, 'UInt64')
UNION ALL SELECT arrow_cast(18446744073709551614, 'UInt64')
);
----
18446744073709551612 18446744073709551612
18446744073709551613 18446744073709551613
18446744073709551614 18446744073709551614

############################################################################
# Negative overflow: value - delta falls below type MIN
############################################################################

# ASC + PRECEDING: start bound wraps below i64::MIN.
query II
SELECT a, first_value(a) OVER (ORDER BY a RANGE BETWEEN 4 PRECEDING AND CURRENT ROW)
FROM (
SELECT -9223372036854775807 AS a
UNION ALL SELECT -9223372036854775806
UNION ALL SELECT -9223372036854775805
);
----
-9223372036854775807 -9223372036854775807
-9223372036854775806 -9223372036854775807
-9223372036854775805 -9223372036854775807

query II
SELECT a, last_value(a) OVER (ORDER BY a RANGE BETWEEN 4 PRECEDING AND CURRENT ROW)
FROM (
SELECT -9223372036854775807 AS a
UNION ALL SELECT -9223372036854775806
UNION ALL SELECT -9223372036854775805
);
----
-9223372036854775807 -9223372036854775807
-9223372036854775806 -9223372036854775806
-9223372036854775805 -9223372036854775805

# Symmetric PRECEDING/FOLLOWING where the PRECEDING side underflows past MIN.
query II
SELECT a, first_value(a) OVER (ORDER BY a RANGE BETWEEN 4 PRECEDING AND 4 FOLLOWING)
FROM (
SELECT -9223372036854775807 AS a
UNION ALL SELECT -9223372036854775806
UNION ALL SELECT -9223372036854775805
);
----
-9223372036854775807 -9223372036854775807
-9223372036854775806 -9223372036854775807
-9223372036854775805 -9223372036854775807

# DESC + FOLLOWING: "FOLLOWING" walks toward smaller values in DESC order,
# so offsetting past i64::MIN exercises the SUB-underflow path.
query II
SELECT a, last_value(a) OVER (ORDER BY a DESC RANGE BETWEEN CURRENT ROW AND 4 FOLLOWING)
FROM (
SELECT -9223372036854775805 AS a
UNION ALL SELECT -9223372036854775806
UNION ALL SELECT -9223372036854775807
);
----
-9223372036854775805 -9223372036854775807
-9223372036854775806 -9223372036854775807
-9223372036854775807 -9223372036854775807

query II
SELECT a, first_value(a) OVER (ORDER BY a DESC RANGE BETWEEN CURRENT ROW AND 4 FOLLOWING)
FROM (
SELECT -9223372036854775805 AS a
UNION ALL SELECT -9223372036854775806
UNION ALL SELECT -9223372036854775807
);
----
-9223372036854775805 -9223372036854775805
-9223372036854775806 -9223372036854775806
-9223372036854775807 -9223372036854775807

# Unsigned ordering column: subtracting an offset that would go below 0
# must saturate to 0, not wrap to u64::MAX.
query II
SELECT a, first_value(a) OVER (ORDER BY a RANGE BETWEEN 4 PRECEDING AND CURRENT ROW)
FROM (
SELECT arrow_cast(1, 'UInt64') AS a
UNION ALL SELECT arrow_cast(2, 'UInt64')
UNION ALL SELECT arrow_cast(3, 'UInt64')
);
----
1 1
2 1
3 1

query II
SELECT a, last_value(a) OVER (ORDER BY a RANGE BETWEEN 4 PRECEDING AND CURRENT ROW)
FROM (
SELECT arrow_cast(1, 'UInt64') AS a
UNION ALL SELECT arrow_cast(2, 'UInt64')
UNION ALL SELECT arrow_cast(3, 'UInt64')
);
----
1 1
2 2
3 3

############################################################################
# ROWS frame regression guard: huge offsets already saturate via
# saturating_sub / min(length), verify we keep that behavior.
############################################################################

query II
SELECT a, last_value(a) OVER (ORDER BY a ROWS BETWEEN CURRENT ROW AND 9223372036854775807 FOLLOWING)
FROM (
SELECT 1 AS a UNION ALL SELECT 2 UNION ALL SELECT 3
);
----
1 3
2 3
3 3

query II
SELECT a, first_value(a) OVER (ORDER BY a ROWS BETWEEN 9223372036854775807 PRECEDING AND CURRENT ROW)
FROM (
SELECT 1 AS a UNION ALL SELECT 2 UNION ALL SELECT 3
);
----
1 1
2 1
3 1

# Config reset
statement ok
reset datafusion.execution.batch_size;
Expand Down
Loading