From fe64eba054faf5f6853a983a98204453d03c4276 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Thu, 14 May 2026 10:06:59 +0530 Subject: [PATCH] Support DISTINCT ON with aggregation and windows --- datafusion/sql/src/select.rs | 201 +++++++++++++++--- datafusion/sql/src/utils.rs | 4 + .../sqllogictest/test_files/distinct_on.slt | 179 ++++++++++++++++ 3 files changed, 350 insertions(+), 34 deletions(-) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 09d8566c4a19e..5292c3ee23516 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -69,6 +69,25 @@ struct AggregatePlanResult { qualify_expr: Option, /// ORDER BY expressions rewritten to reference aggregate output columns order_by_exprs: Vec, + /// DISTINCT ON expressions rewritten to reference aggregate output columns + on_exprs: Vec, +} + +/// If `expr` is a bare unqualified `Column` whose name matches a SELECT +/// alias, swap it for the alias's underlying expression. Nested occurrences +/// are left alone — PostgreSQL only resolves a top-level identifier as an +/// output alias in clauses like ORDER BY and DISTINCT ON. +fn substitute_top_level_alias( + expr: Expr, + aliases: &datafusion_common::HashMap, +) -> Expr { + if let Expr::Column(c) = &expr + && c.relation.is_none() + && let Some(underlying) = aliases.get(&c.name) + { + return underlying.clone(); + } + expr } impl SqlToRel<'_, S> { @@ -145,6 +164,36 @@ impl SqlToRel<'_, S> { // This alias map is resolved and looked up in both having exprs and group by exprs let alias_map = extract_aliases(&select_exprs); + // DISTINCT ON expressions are parsed alongside HAVING / QUALIFY so + // they participate in aggregate / window discovery and get rebased + // through the same pipeline. The SQL nodes are taken out of + // `select.distinct` so the later match on `Distinct::On` still fires + // but does not move the original Vec. + // + // Resolution precedence matches PostgreSQL and ORDER BY: SELECT + // aliases win over input columns. For example, + // SELECT DISTINCT ON (b) a AS b ... GROUP BY a + // resolves `b` to the alias for `a`, not to a same-named input + // column. + let on_exprs_sql: Vec = match &mut select.distinct { + Some(Distinct::On(exprs)) => std::mem::take(exprs), + _ => Vec::new(), + }; + let mut on_expr_schema = projected_plan.schema().as_ref().clone(); + on_expr_schema.merge(base_plan.schema()); + let on_exprs_pre_aggr: Vec = on_exprs_sql + .into_iter() + .map(|e| { + let expr = + self.sql_expr_to_logical_expr(e, &on_expr_schema, planner_context)?; + // PostgreSQL only substitutes an output alias when the whole + // ON expression is a bare identifier. `b` resolves to the + // alias; `b + 0` keeps `b` as the input column. + let expr = substitute_top_level_alias(expr, &alias_map); + normalize_col(expr, &projected_plan) + }) + .collect::>>()?; + // Optionally the HAVING expression. let having_expr_opt = select .having @@ -251,12 +300,15 @@ impl SqlToRel<'_, S> { // Find aggregates in ORDER BY let order_by_aggrs = find_aggregate_exprs(order_by_rex.iter().map(|s| &s.expr)); - // Combine: all aggregates from SELECT/HAVING/QUALIFY, plus ORDER BY aggregates - // that aren't already in SELECT/HAVING/QUALIFY + // Find aggregates in DISTINCT ON + let on_aggrs = find_aggregate_exprs(on_exprs_pre_aggr.iter()); + + // Combine: all aggregates from SELECT/HAVING/QUALIFY, plus ORDER BY + // and DISTINCT ON aggregates that aren't already covered. let mut aggr_exprs = select_having_qualify_aggrs; - for order_by_aggr in order_by_aggrs { - if !aggr_exprs.iter().any(|e| e == &order_by_aggr) { - aggr_exprs.push(order_by_aggr); + for extra_aggr in order_by_aggrs.into_iter().chain(on_aggrs) { + if !aggr_exprs.iter().any(|e| e == &extra_aggr) { + aggr_exprs.push(extra_aggr); } } @@ -267,6 +319,7 @@ impl SqlToRel<'_, S> { having_expr: having_expr_post_aggr, qualify_expr: qualify_expr_post_aggr, order_by_exprs: mut order_by_rex, + on_exprs: mut on_exprs_post_aggr, } = if !group_by_exprs.is_empty() || !aggr_exprs.is_empty() { self.aggregate( &base_plan, @@ -274,6 +327,7 @@ impl SqlToRel<'_, S> { having_expr_opt.as_ref(), qualify_expr_opt.as_ref(), &order_by_rex, + &on_exprs_pre_aggr, &group_by_exprs, &aggr_exprs, )? @@ -290,6 +344,7 @@ impl SqlToRel<'_, S> { having_expr: having_expr_opt, qualify_expr: qualify_expr_opt, order_by_exprs: order_by_rex, + on_exprs: on_exprs_pre_aggr, }, } }; @@ -304,12 +359,13 @@ impl SqlToRel<'_, S> { // All of the window expressions (deduplicated and rewritten to reference aggregates as // columns from input). Window functions may be sourced from the SELECT list, QUALIFY - // expression, or ORDER BY. + // expression, ORDER BY, or DISTINCT ON. let window_func_exprs = find_window_exprs( select_exprs_post_aggr .iter() .chain(qualify_expr_post_aggr.iter()) - .chain(order_by_rex.iter().map(|s| &s.expr)), + .chain(order_by_rex.iter().map(|s| &s.expr)) + .chain(on_exprs_post_aggr.iter()), ); // Process window functions after aggregation as they can reference @@ -336,6 +392,11 @@ impl SqlToRel<'_, S> { }) .collect::>>()?; + on_exprs_post_aggr = on_exprs_post_aggr + .iter() + .map(|expr| rebase_expr(expr, &window_func_exprs, &plan)) + .collect::>>()?; + plan }; @@ -377,39 +438,94 @@ impl SqlToRel<'_, S> { plan }; - // Try processing unnest expression or do the final projection - let plan = self.try_process_unnest(plan, select_exprs_post_aggr)?; - - // Process distinct clause + // Process distinct clause. For `DISTINCT ON` combined with + // aggregation, GROUP BY, or window functions we apply DistinctOn + // *before* the final projection so grouping columns and ORDER BY + // tie-breakers that aren't in the user SELECT stay in scope. + // DistinctOn provides the projection in that case (its select_expr + // list is wrapped in FIRST_VALUE during lowering). let plan = match select.distinct { - None => Ok(plan), - Some(Distinct::All) => Ok(plan), + None | Some(Distinct::All) => { + self.try_process_unnest(plan, select_exprs_post_aggr)? + } Some(Distinct::Distinct) => { - LogicalPlanBuilder::from(plan).distinct()?.build() + let plan = self.try_process_unnest(plan, select_exprs_post_aggr)?; + LogicalPlanBuilder::from(plan).distinct()?.build()? } - Some(Distinct::On(on_expr)) => { - if !aggr_exprs.is_empty() - || !group_by_exprs.is_empty() - || !window_func_exprs.is_empty() + Some(Distinct::On(_)) => { + if aggr_exprs.is_empty() + && group_by_exprs.is_empty() + && window_func_exprs.is_empty() { - return not_impl_err!( - "DISTINCT ON expressions with GROUP BY, aggregation or window functions are not supported " - ); - } - - let on_expr = on_expr - .into_iter() - .map(|e| { - self.sql_expr_to_logical_expr(e, plan.schema(), planner_context) - }) - .collect::>>()?; + // Fast path: no aggregation context. Fuse projection + // and deduplication into a single DistinctOn over + // `base_plan`. The sort attached to DistinctOn via + // `with_sort_expr` later normalizes against base_plan, + // so a bare ORDER BY alias (e.g. `ORDER BY x` where + // SELECT has `a AS x`) must be swapped back to the + // underlying input expression first. + if !alias_map.is_empty() { + order_by_rex = order_by_rex + .into_iter() + .map(|s| { + let expr = substitute_top_level_alias( + s.expr.clone(), + &alias_map, + ); + s.with_expr(expr) + }) + .collect(); + } + LogicalPlanBuilder::from(base_plan) + .distinct_on(on_exprs_post_aggr, select_exprs, None)? + .build()? + } else { + // General path: DistinctOn layered over the post- + // aggregate / post-window plan (no extra Projection + // node — DistinctOn's lowering wraps each select_expr + // in FIRST_VALUE, which acts as the projection). + // + // The DistinctOn input has the post-aggregate raw + // column names (e.g. `max(t.c4)`), not the user-facing + // SELECT aliases (`agg2`). ORDER BY may reference + // those aliases — substitute them back to the + // underlying post-aggregate expression so they + // resolve against the DistinctOn input. + let select_alias_map: datafusion_common::HashMap = + select_exprs_post_aggr + .iter() + .filter_map(|e| match e { + Expr::Alias(a) => { + Some((a.name.clone(), (*a.expr).clone())) + } + _ => None, + }) + .collect(); + + if !select_alias_map.is_empty() { + // Only swap an alias for its underlying expression + // when the ORDER BY item is a bare alias name — + // matches PostgreSQL's behavior. `b` resolves to + // the alias; `b + 0` keeps `b` as the input + // column. + order_by_rex = order_by_rex + .into_iter() + .map(|s| { + let expr = substitute_top_level_alias( + s.expr.clone(), + &select_alias_map, + ); + s.with_expr(expr) + }) + .collect(); + } - // Build the final plan - LogicalPlanBuilder::from(base_plan) - .distinct_on(on_expr, select_exprs, None)? - .build() + LogicalPlanBuilder::from(plan) + .distinct_on(on_exprs_post_aggr, select_exprs_post_aggr, None)? + .build()? + } } - }?; + }; // DISTRIBUTE BY let plan = if !select.distribute_by.is_empty() { @@ -1011,6 +1127,7 @@ impl SqlToRel<'_, S> { having_expr_opt: Option<&Expr>, qualify_expr_opt: Option<&Expr>, order_by_exprs: &[SortExpr], + on_exprs: &[Expr], group_by_exprs: &[Expr], aggr_exprs: &[Expr], ) -> Result { @@ -1177,12 +1294,28 @@ impl SqlToRel<'_, S> { ), )?; + // Rewrite the DISTINCT ON expressions to use the columns produced by + // the aggregation. Same shape as ORDER BY rewriting so a hidden + // grouping column or a raw aggregate expression in ON is resolved. + let on_exprs_post_aggr = on_exprs + .iter() + .map(|expr| rebase_expr(expr, &aggr_projection_exprs, input)) + .collect::>>()?; + check_columns_satisfy_exprs( + &all_valid_exprs, + &on_exprs_post_aggr, + CheckColumnsSatisfyExprsPurpose::Aggregate( + CheckColumnsMustReferenceAggregatePurpose::DistinctOn, + ), + )?; + Ok(AggregatePlanResult { plan, select_exprs: select_exprs_post_aggr, having_expr: having_expr_post_aggr, qualify_expr: qualify_expr_post_aggr, order_by_exprs: order_by_post_aggr, + on_exprs: on_exprs_post_aggr, }) } diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index 1a76dd69f46c5..03ea467ada128 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -98,6 +98,7 @@ pub(crate) enum CheckColumnsMustReferenceAggregatePurpose { Having, Qualify, OrderBy, + DistinctOn, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -120,6 +121,9 @@ impl CheckColumnsSatisfyExprsPurpose { Self::Aggregate(CheckColumnsMustReferenceAggregatePurpose::OrderBy) => { "Column in ORDER BY must be in GROUP BY or an aggregate function" } + Self::Aggregate(CheckColumnsMustReferenceAggregatePurpose::DistinctOn) => { + "Column in DISTINCT ON must be in GROUP BY or an aggregate function" + } } } diff --git a/datafusion/sqllogictest/test_files/distinct_on.slt b/datafusion/sqllogictest/test_files/distinct_on.slt index 5b18915080f8f..c44b7a852a46b 100644 --- a/datafusion/sqllogictest/test_files/distinct_on.slt +++ b/datafusion/sqllogictest/test_files/distinct_on.slt @@ -195,3 +195,182 @@ RESET datafusion.explain.logical_plan_only; statement ok drop table t; + +# DISTINCT ON combined with GROUP BY + aggregation (issue #17256). +# ON references a grouping column; ORDER BY uses an aggregate alias. +query TII +SELECT DISTINCT ON (c1) c1, c3, max(c4) AS agg2 +FROM aggregate_test_100 GROUP BY c1, c3 ORDER BY c1, agg2; +---- +a 65 -28462 +b -60 -21739 +c 3 -30508 +d 102 -24558 +e -56 -31500 + +# DISTINCT ON referencing a SELECT alias for an aggregate. +query TI +SELECT DISTINCT ON (agg2) c1, max(c4) AS agg2 +FROM aggregate_test_100 GROUP BY c1 ORDER BY agg2; +---- +b 25286 +c 29106 +d 31106 +a 32064 +e 32514 + +# DISTINCT ON with a scalar function over a grouping column. +query TI +SELECT DISTINCT ON (upper(c1)) c1, sum(c3) FROM aggregate_test_100 +GROUP BY c1 ORDER BY upper(c1); +---- +a -385 +b -111 +c -28 +d 458 +e 847 + +# Hidden ORDER BY tie-breaker: c3 is in GROUP BY and ORDER BY but +# NOT in the SELECT list. PostgreSQL accepts this. +query TI +SELECT DISTINCT ON (c1) c1, max(c4) AS agg2 +FROM aggregate_test_100 GROUP BY c1, c3 ORDER BY c1, c3; +---- +a 11640 +b 19316 +c -30187 +d 5613 +e 13611 + +# Hidden DISTINCT ON key: ON references a grouping column that is +# NOT in the SELECT list. ORDER BY adds a deterministic tie-breaker. +query II +SELECT DISTINCT ON (c1) c2 % 2, count(*) AS n +FROM aggregate_test_100 GROUP BY c1, c2 % 2 ORDER BY c1, c2 % 2; +---- +0 7 +0 9 +0 11 +0 6 +0 12 + +# Raw aggregate expression in DISTINCT ON (not via an alias). +query TI +SELECT DISTINCT ON (sum(c3)) c1, sum(c3) AS total +FROM aggregate_test_100 GROUP BY c1 ORDER BY sum(c3); +---- +a -385 +b -111 +c -28 +d 458 +e 847 + +# DISTINCT ON with HAVING. +query TI +SELECT DISTINCT ON (c1) c1, count(*) AS cnt FROM aggregate_test_100 +GROUP BY c1 HAVING count(*) > 10 ORDER BY c1, cnt DESC; +---- +a 21 +b 19 +c 21 +d 18 +e 21 + +# DISTINCT ON combined with a window function over a unique ordering +# key, so the test is fully deterministic. +query II +WITH t(id, v) AS (VALUES (1, 10), (2, 20), (3, 10), (4, 30), (5, 20)) +SELECT DISTINCT ON (v) v, row_number() OVER (ORDER BY id) AS rn +FROM t ORDER BY v, rn; +---- +10 1 +20 2 +30 4 + +# Raw window expression in DISTINCT ON (not via an alias). Uses a +# unique ordering key so row_number is deterministic. +query II +WITH t(id, v) AS (VALUES (1, 10), (2, 20), (3, 30), (4, 40)) +SELECT DISTINCT ON (row_number() OVER (ORDER BY id)) id, v +FROM t +ORDER BY row_number() OVER (ORDER BY id); +---- +1 10 +2 20 +3 30 +4 40 + +# Qualified join columns with potential alias conflict. +query TII +WITH t1(k, v) AS (VALUES ('x', 1), ('x', 2), ('y', 3)), + t2(k, w) AS (VALUES ('x', 10), ('y', 20)) +SELECT DISTINCT ON (t1.k) t1.k, sum(t1.v) AS s, max(t2.w) AS mw +FROM t1 JOIN t2 ON t1.k = t2.k +GROUP BY t1.k ORDER BY t1.k; +---- +x 3 10 +y 3 20 + +# DISTINCT ON name conflicts with an input column of the same name. +# PostgreSQL resolves `b` to the SELECT alias `a AS b`, not the input +# column `t.b`. Groups should be keyed by `a`, not `t.b`. +query TI +WITH t(a, b) AS (VALUES ('x', 1), ('x', 2), ('y', 1)) +SELECT DISTINCT ON (b) a AS b, count(*) AS n +FROM t GROUP BY a ORDER BY b; +---- +x 2 +y 1 + +# A bare alias resolves to the SELECT expression, but inside a larger +# expression the same identifier refers to the input column. Postgres: +# ORDER BY a, b + 0 DESC +# uses `a` (post-aggregate) and `t.b + 0`, so for a=100 the row with +# t.b=2 wins (sum=2). DataFusion must not recursively swap `b` inside +# `b + 0` for the alias. +query II +WITH t(a, b) AS (VALUES (100, 1), (100, 2), (200, 1)) +SELECT DISTINCT ON (a) a AS b, sum(b) AS s +FROM t GROUP BY a, b ORDER BY a, b + 0 DESC; +---- +100 2 +200 1 + +# Fast path (no aggregation): a bare ORDER BY alias must resolve to +# its underlying SELECT expression so that the sort attached to +# DistinctOn normalizes against the base plan. +query T +WITH t(a, b) AS (VALUES ('x', 1), ('x', 2), ('y', 3)) +SELECT DISTINCT ON (x) a AS x FROM t ORDER BY x; +---- +x +y + +# EXPLAIN for the post-aggregation case. +statement ok +set datafusion.explain.logical_plan_only = true; + +query TT +explain SELECT DISTINCT ON (c1) c1, max(c4) AS agg2 +FROM aggregate_test_100 GROUP BY c1 ORDER BY c1, agg2; +---- +logical_plan +01)Projection: first_value(aggregate_test_100.c1) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, max(aggregate_test_100.c4) ASC NULLS LAST] AS c1, first_value(agg2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, max(aggregate_test_100.c4) ASC NULLS LAST] AS agg2 +02)--Sort: aggregate_test_100.c1 ASC NULLS LAST +03)----Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[first_value(aggregate_test_100.c1) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, max(aggregate_test_100.c4) ASC NULLS LAST], first_value(max(aggregate_test_100.c4) AS agg2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, max(aggregate_test_100.c4) ASC NULLS LAST]]] +04)------Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[max(aggregate_test_100.c4)]] +05)--------TableScan: aggregate_test_100 projection=[c1, c4] + +statement ok +RESET datafusion.explain.logical_plan_only; + +# Synthetic repro for issue #17256. +query TIR +WITH t(a, b, c) AS ( + VALUES ('x', 1, 10.0), ('x', 1, 20.0), ('y', 2, 30.0) +) +SELECT DISTINCT ON (a) a, b, sum(c) AS total +FROM t GROUP BY a, b ORDER BY a, total DESC; +---- +x 1 30 +y 2 30