From d365b1401f8fc780f6192cda4a4901afd4d1df7c Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 14 May 2026 11:23:06 +0800 Subject: [PATCH 1/3] feat: optimize unused Unnest pruning in projections - Pruned unused Unnest under aggregate/group-by when safe. - Handled direct Unnest and Projection -> Unnest scenarios. - Ensured Unnest is retained when empty/null lists may drop rows. - Added unit tests to cover these changes. - Updated safe GROUP BY explain in sqllogictest: no Unnest/UnnestExec. - Added regression tests for empty/null grouped scenarios to ensure Unnest retention. --- .../optimizer/src/optimize_projections/mod.rs | 212 +++++++++++++++++- datafusion/sqllogictest/test_files/unnest.slt | 25 ++- 2 files changed, 228 insertions(+), 9 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index af944abc6f0b4..dbfbe7188cdbe 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -21,16 +21,18 @@ mod required_indices; use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; +use arrow::array::Array; +use std::collections::HashSet; use std::sync::Arc; use datafusion_common::{ - Column, DFSchema, HashMap, JoinType, Result, assert_eq_or_internal_err, + Column, DFSchema, HashMap, JoinType, Result, ScalarValue, assert_eq_or_internal_err, get_required_group_by_exprs_indices, internal_datafusion_err, internal_err, }; use datafusion_expr::expr::Alias; use datafusion_expr::{ Aggregate, Distinct, EmptyRelation, Expr, Projection, TableScan, Unnest, Window, - logical_plan::LogicalPlan, + logical_plan::LogicalPlan, utils::expr_to_columns, }; use crate::optimize_projections::required_indices::RequiredIndices; @@ -146,7 +148,8 @@ fn optimize_projections( let n_group_exprs = aggregate.group_expr_len()?; // Offset aggregate indices so that they point to valid indices at // `aggregate.aggr_expr`: - let (group_by_reqs, aggregate_reqs) = indices.split_off(n_group_exprs); + let (group_by_reqs, aggregate_reqs) = + indices.clone().split_off(n_group_exprs); // Get absolutely necessary GROUP BY fields. // @@ -197,6 +200,22 @@ fn optimize_projections( ))); } + if new_aggr_expr.is_empty() + && let Some(input) = + remove_unused_unnest_from_duplicate_insensitive_input( + aggregate.input.as_ref(), + &new_group_bys, + )? + { + let aggregate = + Aggregate::try_new(Arc::new(input), new_group_bys, new_aggr_expr)?; + return optimize_projections( + LogicalPlan::Aggregate(aggregate), + config, + indices, + ); + } + let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter()); let schema = aggregate.input.schema(); let necessary_indices = @@ -492,6 +511,109 @@ fn optimize_projections( } } +fn remove_unused_unnest_from_duplicate_insensitive_input( + input: &LogicalPlan, + required_exprs: &[Expr], +) -> Result> { + match input { + LogicalPlan::Unnest(unnest) + if can_remove_unused_unnest_for_exprs(unnest, required_exprs)? => + { + Ok(Some(Arc::unwrap_or_clone(Arc::clone(&unnest.input)))) + } + LogicalPlan::Projection(projection) => { + let LogicalPlan::Unnest(unnest) = projection.input.as_ref() else { + return Ok(None); + }; + let required_projection_exprs = RequiredIndices::new() + .with_exprs(&projection.schema, required_exprs.iter()) + .get_at_indices(&projection.expr); + + if can_remove_unused_unnest_for_exprs(unnest, &required_projection_exprs)? { + Projection::try_new(required_projection_exprs, Arc::clone(&unnest.input)) + .map(LogicalPlan::Projection) + .map(Some) + } else { + Ok(None) + } + } + _ => Ok(None), + } +} + +fn can_remove_unused_unnest_for_exprs(unnest: &Unnest, exprs: &[Expr]) -> Result { + if !unnest_preserves_at_least_one_row_per_input(unnest) { + return Ok(false); + } + + let unnested_input_indices = unnest + .list_type_columns + .iter() + .map(|(idx, _)| *idx) + .chain(unnest.struct_type_columns.iter().copied()) + .collect::>(); + + let mut columns = HashSet::new(); + for expr in exprs { + expr_to_columns(expr, &mut columns)?; + } + + columns.into_iter().try_fold(true, |can_remove, column| { + if !can_remove { + return Ok(false); + } + let output_index = unnest.schema.index_of_column(&column)?; + Ok(!unnested_input_indices + .iter() + .any(|idx| unnest.dependency_indices[output_index] == *idx)) + }) +} + +fn unnest_preserves_at_least_one_row_per_input(unnest: &Unnest) -> bool { + unnest.list_type_columns.is_empty() + || unnest.list_type_columns.iter().all(|(input_index, _)| { + unnest_input_expr(unnest, *input_index) + .and_then(literal_non_empty_list) + .unwrap_or(false) + }) +} + +fn unnest_input_expr(unnest: &Unnest, input_index: usize) -> Option<&Expr> { + match unnest.input.as_ref() { + LogicalPlan::Projection(projection) => projection.expr.get(input_index), + _ => None, + } +} + +fn literal_non_empty_list(expr: &Expr) -> Option { + let expr = match expr { + Expr::Alias(Alias { expr, .. }) => expr.as_ref(), + _ => expr, + }; + let Expr::Literal(value, _) = expr else { + return None; + }; + + match value { + ScalarValue::List(array) => { + Some(!array.is_empty() && array.is_valid(0) && array.value_length(0) > 0) + } + ScalarValue::LargeList(array) => { + Some(!array.is_empty() && array.is_valid(0) && array.value_length(0) > 0) + } + ScalarValue::FixedSizeList(array) => { + Some(!array.is_empty() && array.is_valid(0) && array.value_length() > 0) + } + ScalarValue::ListView(array) => { + Some(!array.is_empty() && array.is_valid(0) && array.value_sizes()[0] > 0) + } + ScalarValue::LargeListView(array) => { + Some(!array.is_empty() && array.is_valid(0) && array.value_sizes()[0] > 0) + } + _ => None, + } +} + /// Optimizes uncorrelated subquery plans embedded in expressions of the given /// plan node (e.g., `Expr::ScalarSubquery`). `map_children` only visits direct /// plan inputs, so subqueries must be handled separately. @@ -947,9 +1069,10 @@ mod tests { test_table_scan_with_name, }; use crate::{OptimizerContext, OptimizerRule}; - use arrow::datatypes::{DataType, Field, Schema}; + use arrow::array::ListArray; + use arrow::datatypes::{DataType, Field, Int64Type, Schema}; use datafusion_common::{ - Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference, + Column, DFSchema, DFSchemaRef, JoinType, Result, ScalarValue, TableReference, }; use datafusion_expr::ExprFunctionExt; use datafusion_expr::{ @@ -1312,6 +1435,85 @@ mod tests { ) } + #[test] + fn remove_unused_non_empty_literal_unnest_under_group_by() -> Result<()> { + let schema = Schema::new(vec![Field::new("id", DataType::UInt32, false)]); + let list = ListArray::from_iter_primitive::(vec![Some(vec![ + Some(1), + Some(2), + ])]); + let plan = table_scan(Some("test"), &schema, None)? + .project(vec![ + col("id"), + Expr::Literal(ScalarValue::List(Arc::new(list)), None).alias("elem"), + ])? + .unnest_column(Column::from_name("elem"))? + .aggregate(vec![col("id")], Vec::::new())? + .build()?; + + assert_optimized_plan_equal!( + plan, + @r" + Aggregate: groupBy=[[test.id]], aggr=[[]] + TableScan: test projection=[id] + " + ) + } + + #[test] + fn remove_unused_unnest_below_projection_under_group_by() -> Result<()> { + let schema = Schema::new(vec![Field::new("id", DataType::UInt32, false)]); + let list = ListArray::from_iter_primitive::(vec![Some(vec![ + Some(1), + Some(2), + ])]); + let plan = table_scan(Some("test"), &schema, None)? + .project(vec![ + col("id"), + Expr::Literal(ScalarValue::List(Arc::new(list)), None).alias("elem"), + ])? + .unnest_column(Column::from_name("elem"))? + .project(vec![col("id")])? + .aggregate(vec![col("id")], Vec::::new())? + .build()?; + + assert_optimized_plan_equal!( + plan, + @r" + Aggregate: groupBy=[[test.id]], aggr=[[]] + TableScan: test projection=[id] + " + ) + } + + #[test] + fn keep_unused_empty_literal_unnest_under_group_by() -> Result<()> { + let schema = Schema::new(vec![Field::new("id", DataType::UInt32, false)]); + let list = ListArray::from_iter_primitive::(vec![Some(Vec::< + Option, + >::new( + ))]); + let plan = table_scan(Some("test"), &schema, None)? + .project(vec![ + col("id"), + Expr::Literal(ScalarValue::List(Arc::new(list)), None).alias("elem"), + ])? + .unnest_column(Column::from_name("elem"))? + .aggregate(vec![col("id")], Vec::::new())? + .build()?; + + assert_optimized_plan_equal!( + plan, + @r" + Aggregate: groupBy=[[test.id]], aggr=[[]] + Projection: test.id + Unnest: lists[elem|depth=1] structs[] + Projection: test.id, List([]) AS elem + TableScan: test projection=[id] + " + ) + } + #[test] fn test_neg_push_down() -> Result<()> { let table_scan = test_table_scan()?; diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index faeb5d59578e5..5aaa90682e162 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -1348,8 +1348,8 @@ SELECT * FROM ( (3, arrow_cast(NULL, 'List(Int64)')) ) AS t(id, arr); -# Reproducer for the optimization gap: the unused `elem` output is duplicate-insensitive -# below this GROUP BY, but the current plan still keeps Unnest/UnnestExec. +# The unused `elem` output only duplicates rows below this GROUP BY, so it can +# be pruned without changing the grouped ids. query I SELECT id FROM ( @@ -1372,9 +1372,13 @@ FROM ( GROUP BY id; ---- logical_plan -Unnest: +01)Aggregate: groupBy=[[unused_unnest_pruning.id]], aggr=[[]] +02)--TableScan: unused_unnest_pruning projection=[id] physical_plan -UnnestExec +01)AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[] +02)--RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=1 +03)----AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] +04)------DataSourceExec: partitions=1, partition_sizes=[1] # Counterexample: removing UNNEST here would change cardinality. query I rowsort @@ -1417,5 +1421,18 @@ FROM ( ---- 2 +# Empty and NULL input lists can remove rows before grouping, so this UNNEST +# must not be pruned even though `elem` is not projected above the GROUP BY. +query I +SELECT id +FROM ( + SELECT id, UNNEST(arr) AS elem + FROM unused_unnest_pruning +) +GROUP BY id +ORDER BY id; +---- +1 + statement ok DROP TABLE unused_unnest_pruning; From b32b73fe00f16ea718a66f5146d510146bfa340e Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 14 May 2026 11:35:00 +0800 Subject: [PATCH 2/3] feat(tests): add optimizer unit test and SLT DISTINCT regression - Added a unit test for the optimizer to verify that GROUP BY elements keep UNNEST in the case of `keep_referenced_unnest_under_group_by`. - Implemented SLT DISTINCT regression test for `SELECT DISTINCT id ... UNNEST(make_array(...))`, ensuring the correctness of results and that UNNEST is pruned via the distinct-to-aggregate path. --- .../optimizer/src/optimize_projections/mod.rs | 27 +++++++++++++++++ datafusion/sqllogictest/test_files/unnest.slt | 30 +++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index dbfbe7188cdbe..1123fc6189bd4 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -1486,6 +1486,33 @@ mod tests { ) } + #[test] + fn keep_referenced_unnest_under_group_by() -> Result<()> { + let schema = Schema::new(vec![Field::new("id", DataType::UInt32, false)]); + let list = ListArray::from_iter_primitive::(vec![Some(vec![ + Some(1), + Some(2), + ])]); + let plan = table_scan(Some("test"), &schema, None)? + .project(vec![ + col("id"), + Expr::Literal(ScalarValue::List(Arc::new(list)), None).alias("elem"), + ])? + .unnest_column(Column::from_name("elem"))? + .aggregate(vec![col("elem")], Vec::::new())? + .build()?; + + assert_optimized_plan_equal!( + plan, + @r" + Aggregate: groupBy=[[elem]], aggr=[[]] + Unnest: lists[elem|depth=1] structs[] + Projection: List([1, 2]) AS elem + TableScan: test projection=[] + " + ) + } + #[test] fn keep_unused_empty_literal_unnest_under_group_by() -> Result<()> { let schema = Schema::new(vec![Field::new("id", DataType::UInt32, false)]); diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index 5aaa90682e162..1cb610274bdfc 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -1380,6 +1380,36 @@ physical_plan 03)----AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] 04)------DataSourceExec: partitions=1, partition_sizes=[1] +# DISTINCT is implemented as a duplicate-insensitive aggregate before projection +# pruning, so the same unused non-empty literal UNNEST can be removed. +query I +SELECT DISTINCT id +FROM ( + SELECT id, UNNEST(make_array(1, 2, 3)) AS elem + FROM unused_unnest_pruning +) +ORDER BY id; +---- +1 +2 +3 + +query TT +EXPLAIN SELECT DISTINCT id +FROM ( + SELECT id, UNNEST(make_array(1, 2, 3)) AS elem + FROM unused_unnest_pruning +); +---- +logical_plan +01)Aggregate: groupBy=[[unused_unnest_pruning.id]], aggr=[[]] +02)--TableScan: unused_unnest_pruning projection=[id] +physical_plan +01)AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[] +02)--RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=1 +03)----AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] +04)------DataSourceExec: partitions=1, partition_sizes=[1] + # Counterexample: removing UNNEST here would change cardinality. query I rowsort SELECT id From ce3e4193bea65398790ab5295c6fdf9967cecec3 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 14 May 2026 11:43:49 +0800 Subject: [PATCH 3/3] feat: optimize input index handling and improve test setup - Removed Vec allocation for unnested input indices to enhance performance. - Replaced try_fold boolean flow with a more explicit for loop for better readability. - Introduced is_unnested_input_index function to clarify input index processing. - Simplified logic for handling empty lists in .all() calls. - Added has_valid_first_value helper for validation purposes. - Introduced new test helpers: id_schema, list_literal_expr, and id_elem_unnest_plan to facilitate testing. - Streamlined repeated optimizer test setup for efficiency. - Improved construction of empty-list tests for clarity. --- .../optimizer/src/optimize_projections/mod.rs | 119 ++++++++---------- 1 file changed, 50 insertions(+), 69 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 1123fc6189bd4..899cbcb00263f 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -546,36 +546,36 @@ fn can_remove_unused_unnest_for_exprs(unnest: &Unnest, exprs: &[Expr]) -> Result return Ok(false); } - let unnested_input_indices = unnest - .list_type_columns - .iter() - .map(|(idx, _)| *idx) - .chain(unnest.struct_type_columns.iter().copied()) - .collect::>(); - let mut columns = HashSet::new(); for expr in exprs { expr_to_columns(expr, &mut columns)?; } - columns.into_iter().try_fold(true, |can_remove, column| { - if !can_remove { + for column in columns { + let output_index = unnest.schema.index_of_column(&column)?; + if is_unnested_input_index(unnest, unnest.dependency_indices[output_index]) { return Ok(false); } - let output_index = unnest.schema.index_of_column(&column)?; - Ok(!unnested_input_indices - .iter() - .any(|idx| unnest.dependency_indices[output_index] == *idx)) - }) + } + + Ok(true) +} + +fn is_unnested_input_index(unnest: &Unnest, input_index: usize) -> bool { + unnest + .list_type_columns + .iter() + .map(|(idx, _)| *idx) + .chain(unnest.struct_type_columns.iter().copied()) + .any(|idx| idx == input_index) } fn unnest_preserves_at_least_one_row_per_input(unnest: &Unnest) -> bool { - unnest.list_type_columns.is_empty() - || unnest.list_type_columns.iter().all(|(input_index, _)| { - unnest_input_expr(unnest, *input_index) - .and_then(literal_non_empty_list) - .unwrap_or(false) - }) + unnest.list_type_columns.iter().all(|(input_index, _)| { + unnest_input_expr(unnest, *input_index) + .and_then(literal_non_empty_list) + .unwrap_or(false) + }) } fn unnest_input_expr(unnest: &Unnest, input_index: usize) -> Option<&Expr> { @@ -596,24 +596,28 @@ fn literal_non_empty_list(expr: &Expr) -> Option { match value { ScalarValue::List(array) => { - Some(!array.is_empty() && array.is_valid(0) && array.value_length(0) > 0) + Some(has_valid_first_value(array.as_ref()) && array.value_length(0) > 0) } ScalarValue::LargeList(array) => { - Some(!array.is_empty() && array.is_valid(0) && array.value_length(0) > 0) + Some(has_valid_first_value(array.as_ref()) && array.value_length(0) > 0) } ScalarValue::FixedSizeList(array) => { - Some(!array.is_empty() && array.is_valid(0) && array.value_length() > 0) + Some(has_valid_first_value(array.as_ref()) && array.value_length() > 0) } ScalarValue::ListView(array) => { - Some(!array.is_empty() && array.is_valid(0) && array.value_sizes()[0] > 0) + Some(has_valid_first_value(array.as_ref()) && array.value_sizes()[0] > 0) } ScalarValue::LargeListView(array) => { - Some(!array.is_empty() && array.is_valid(0) && array.value_sizes()[0] > 0) + Some(has_valid_first_value(array.as_ref()) && array.value_sizes()[0] > 0) } _ => None, } } +fn has_valid_first_value(array: &impl Array) -> bool { + !array.is_empty() && array.is_valid(0) +} + /// Optimizes uncorrelated subquery plans embedded in expressions of the given /// plan node (e.g., `Expr::ScalarSubquery`). `map_children` only visits direct /// plan inputs, so subqueries must be handled separately. @@ -1109,6 +1113,22 @@ mod tests { }}; } + fn id_schema() -> Schema { + Schema::new(vec![Field::new("id", DataType::UInt32, false)]) + } + + fn list_literal_expr(values: Vec>) -> Expr { + let list = ListArray::from_iter_primitive::(vec![Some(values)]); + Expr::Literal(ScalarValue::List(Arc::new(list)), None) + } + + fn id_elem_unnest_plan(values: Vec>) -> Result { + let schema = id_schema(); + table_scan(Some("test"), &schema, None)? + .project(vec![col("id"), list_literal_expr(values).alias("elem")])? + .unnest_column(Column::from_name("elem")) + } + #[derive(Debug, Hash, PartialEq, Eq)] struct NoOpUserDefined { exprs: Vec, @@ -1437,17 +1457,7 @@ mod tests { #[test] fn remove_unused_non_empty_literal_unnest_under_group_by() -> Result<()> { - let schema = Schema::new(vec![Field::new("id", DataType::UInt32, false)]); - let list = ListArray::from_iter_primitive::(vec![Some(vec![ - Some(1), - Some(2), - ])]); - let plan = table_scan(Some("test"), &schema, None)? - .project(vec![ - col("id"), - Expr::Literal(ScalarValue::List(Arc::new(list)), None).alias("elem"), - ])? - .unnest_column(Column::from_name("elem"))? + let plan = id_elem_unnest_plan(vec![Some(1), Some(2)])? .aggregate(vec![col("id")], Vec::::new())? .build()?; @@ -1462,17 +1472,7 @@ mod tests { #[test] fn remove_unused_unnest_below_projection_under_group_by() -> Result<()> { - let schema = Schema::new(vec![Field::new("id", DataType::UInt32, false)]); - let list = ListArray::from_iter_primitive::(vec![Some(vec![ - Some(1), - Some(2), - ])]); - let plan = table_scan(Some("test"), &schema, None)? - .project(vec![ - col("id"), - Expr::Literal(ScalarValue::List(Arc::new(list)), None).alias("elem"), - ])? - .unnest_column(Column::from_name("elem"))? + let plan = id_elem_unnest_plan(vec![Some(1), Some(2)])? .project(vec![col("id")])? .aggregate(vec![col("id")], Vec::::new())? .build()?; @@ -1488,17 +1488,7 @@ mod tests { #[test] fn keep_referenced_unnest_under_group_by() -> Result<()> { - let schema = Schema::new(vec![Field::new("id", DataType::UInt32, false)]); - let list = ListArray::from_iter_primitive::(vec![Some(vec![ - Some(1), - Some(2), - ])]); - let plan = table_scan(Some("test"), &schema, None)? - .project(vec![ - col("id"), - Expr::Literal(ScalarValue::List(Arc::new(list)), None).alias("elem"), - ])? - .unnest_column(Column::from_name("elem"))? + let plan = id_elem_unnest_plan(vec![Some(1), Some(2)])? .aggregate(vec![col("elem")], Vec::::new())? .build()?; @@ -1515,17 +1505,8 @@ mod tests { #[test] fn keep_unused_empty_literal_unnest_under_group_by() -> Result<()> { - let schema = Schema::new(vec![Field::new("id", DataType::UInt32, false)]); - let list = ListArray::from_iter_primitive::(vec![Some(Vec::< - Option, - >::new( - ))]); - let plan = table_scan(Some("test"), &schema, None)? - .project(vec![ - col("id"), - Expr::Literal(ScalarValue::List(Arc::new(list)), None).alias("elem"), - ])? - .unnest_column(Column::from_name("elem"))? + let empty_list: Vec> = vec![]; + let plan = id_elem_unnest_plan(empty_list)? .aggregate(vec![col("id")], Vec::::new())? .build()?;