Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 167 additions & 34 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,25 @@ struct AggregatePlanResult {
qualify_expr: Option<Expr>,
/// ORDER BY expressions rewritten to reference aggregate output columns
order_by_exprs: Vec<SortExpr>,
/// DISTINCT ON expressions rewritten to reference aggregate output columns
on_exprs: Vec<Expr>,
}

/// If `expr` is a bare unqualified `Column` whose name matches a SELECT
/// alias, swap it for the alias's underlying expression. Nested occurrences
/// are left alone — PostgreSQL only resolves a top-level identifier as an
/// output alias in clauses like ORDER BY and DISTINCT ON.
fn substitute_top_level_alias(
expr: Expr,
aliases: &datafusion_common::HashMap<String, Expr>,
) -> Expr {
if let Expr::Column(c) = &expr
&& c.relation.is_none()
&& let Some(underlying) = aliases.get(&c.name)
{
return underlying.clone();
}
expr
}

impl<S: ContextProvider> SqlToRel<'_, S> {
Expand Down Expand Up @@ -145,6 +164,36 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
// This alias map is resolved and looked up in both having exprs and group by exprs
let alias_map = extract_aliases(&select_exprs);

// DISTINCT ON expressions are parsed alongside HAVING / QUALIFY so
// they participate in aggregate / window discovery and get rebased
// through the same pipeline. The SQL nodes are taken out of
// `select.distinct` so the later match on `Distinct::On` still fires
// but does not move the original Vec.
//
// Resolution precedence matches PostgreSQL and ORDER BY: SELECT
// aliases win over input columns. For example,
// SELECT DISTINCT ON (b) a AS b ... GROUP BY a
// resolves `b` to the alias for `a`, not to a same-named input
// column.
let on_exprs_sql: Vec<SQLExpr> = match &mut select.distinct {
Some(Distinct::On(exprs)) => std::mem::take(exprs),
_ => Vec::new(),
};
let mut on_expr_schema = projected_plan.schema().as_ref().clone();
on_expr_schema.merge(base_plan.schema());
let on_exprs_pre_aggr: Vec<Expr> = on_exprs_sql
.into_iter()
.map(|e| {
let expr =
self.sql_expr_to_logical_expr(e, &on_expr_schema, planner_context)?;
// PostgreSQL only substitutes an output alias when the whole
// ON expression is a bare identifier. `b` resolves to the
// alias; `b + 0` keeps `b` as the input column.
let expr = substitute_top_level_alias(expr, &alias_map);
normalize_col(expr, &projected_plan)
})
.collect::<Result<Vec<Expr>>>()?;

// Optionally the HAVING expression.
let having_expr_opt = select
.having
Expand Down Expand Up @@ -251,12 +300,15 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
// Find aggregates in ORDER BY
let order_by_aggrs = find_aggregate_exprs(order_by_rex.iter().map(|s| &s.expr));

// Combine: all aggregates from SELECT/HAVING/QUALIFY, plus ORDER BY aggregates
// that aren't already in SELECT/HAVING/QUALIFY
// Find aggregates in DISTINCT ON
let on_aggrs = find_aggregate_exprs(on_exprs_pre_aggr.iter());

// Combine: all aggregates from SELECT/HAVING/QUALIFY, plus ORDER BY
// and DISTINCT ON aggregates that aren't already covered.
let mut aggr_exprs = select_having_qualify_aggrs;
for order_by_aggr in order_by_aggrs {
if !aggr_exprs.iter().any(|e| e == &order_by_aggr) {
aggr_exprs.push(order_by_aggr);
for extra_aggr in order_by_aggrs.into_iter().chain(on_aggrs) {
if !aggr_exprs.iter().any(|e| e == &extra_aggr) {
aggr_exprs.push(extra_aggr);
}
}

Expand All @@ -267,13 +319,15 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
having_expr: having_expr_post_aggr,
qualify_expr: qualify_expr_post_aggr,
order_by_exprs: mut order_by_rex,
on_exprs: mut on_exprs_post_aggr,
} = if !group_by_exprs.is_empty() || !aggr_exprs.is_empty() {
self.aggregate(
&base_plan,
&select_exprs,
having_expr_opt.as_ref(),
qualify_expr_opt.as_ref(),
&order_by_rex,
&on_exprs_pre_aggr,
&group_by_exprs,
&aggr_exprs,
)?
Expand All @@ -290,6 +344,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
having_expr: having_expr_opt,
qualify_expr: qualify_expr_opt,
order_by_exprs: order_by_rex,
on_exprs: on_exprs_pre_aggr,
},
}
};
Expand All @@ -304,12 +359,13 @@ impl<S: ContextProvider> SqlToRel<'_, S> {

// All of the window expressions (deduplicated and rewritten to reference aggregates as
// columns from input). Window functions may be sourced from the SELECT list, QUALIFY
// expression, or ORDER BY.
// expression, ORDER BY, or DISTINCT ON.
let window_func_exprs = find_window_exprs(
select_exprs_post_aggr
.iter()
.chain(qualify_expr_post_aggr.iter())
.chain(order_by_rex.iter().map(|s| &s.expr)),
.chain(order_by_rex.iter().map(|s| &s.expr))
.chain(on_exprs_post_aggr.iter()),
);

// Process window functions after aggregation as they can reference
Expand All @@ -336,6 +392,11 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
})
.collect::<Result<Vec<_>>>()?;

on_exprs_post_aggr = on_exprs_post_aggr
.iter()
.map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
.collect::<Result<Vec<Expr>>>()?;

plan
};

Expand Down Expand Up @@ -377,39 +438,94 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
plan
};

// Try processing unnest expression or do the final projection
let plan = self.try_process_unnest(plan, select_exprs_post_aggr)?;

// Process distinct clause
// Process distinct clause. For `DISTINCT ON` combined with
// aggregation, GROUP BY, or window functions we apply DistinctOn
// *before* the final projection so grouping columns and ORDER BY
// tie-breakers that aren't in the user SELECT stay in scope.
// DistinctOn provides the projection in that case (its select_expr
// list is wrapped in FIRST_VALUE during lowering).
let plan = match select.distinct {
None => Ok(plan),
Some(Distinct::All) => Ok(plan),
None | Some(Distinct::All) => {
self.try_process_unnest(plan, select_exprs_post_aggr)?
}
Some(Distinct::Distinct) => {
LogicalPlanBuilder::from(plan).distinct()?.build()
let plan = self.try_process_unnest(plan, select_exprs_post_aggr)?;
LogicalPlanBuilder::from(plan).distinct()?.build()?
}
Some(Distinct::On(on_expr)) => {
if !aggr_exprs.is_empty()
|| !group_by_exprs.is_empty()
|| !window_func_exprs.is_empty()
Some(Distinct::On(_)) => {
if aggr_exprs.is_empty()
&& group_by_exprs.is_empty()
&& window_func_exprs.is_empty()
{
return not_impl_err!(
"DISTINCT ON expressions with GROUP BY, aggregation or window functions are not supported "
);
}

let on_expr = on_expr
.into_iter()
.map(|e| {
self.sql_expr_to_logical_expr(e, plan.schema(), planner_context)
})
.collect::<Result<Vec<_>>>()?;
// Fast path: no aggregation context. Fuse projection
// and deduplication into a single DistinctOn over
// `base_plan`. The sort attached to DistinctOn via
// `with_sort_expr` later normalizes against base_plan,
// so a bare ORDER BY alias (e.g. `ORDER BY x` where
// SELECT has `a AS x`) must be swapped back to the
// underlying input expression first.
if !alias_map.is_empty() {
order_by_rex = order_by_rex
.into_iter()
.map(|s| {
let expr = substitute_top_level_alias(
s.expr.clone(),
&alias_map,
);
s.with_expr(expr)
})
.collect();
}
LogicalPlanBuilder::from(base_plan)
.distinct_on(on_exprs_post_aggr, select_exprs, None)?
.build()?
} else {
// General path: DistinctOn layered over the post-
// aggregate / post-window plan (no extra Projection
// node — DistinctOn's lowering wraps each select_expr
// in FIRST_VALUE, which acts as the projection).
//
// The DistinctOn input has the post-aggregate raw
// column names (e.g. `max(t.c4)`), not the user-facing
// SELECT aliases (`agg2`). ORDER BY may reference
// those aliases — substitute them back to the
// underlying post-aggregate expression so they
// resolve against the DistinctOn input.
let select_alias_map: datafusion_common::HashMap<String, Expr> =
select_exprs_post_aggr
.iter()
.filter_map(|e| match e {
Expr::Alias(a) => {
Some((a.name.clone(), (*a.expr).clone()))
}
_ => None,
})
.collect();

if !select_alias_map.is_empty() {
// Only swap an alias for its underlying expression
// when the ORDER BY item is a bare alias name —
// matches PostgreSQL's behavior. `b` resolves to
// the alias; `b + 0` keeps `b` as the input
// column.
order_by_rex = order_by_rex
.into_iter()
.map(|s| {
let expr = substitute_top_level_alias(
s.expr.clone(),
&select_alias_map,
);
s.with_expr(expr)
})
.collect();
}

// Build the final plan
LogicalPlanBuilder::from(base_plan)
.distinct_on(on_expr, select_exprs, None)?
.build()
LogicalPlanBuilder::from(plan)
.distinct_on(on_exprs_post_aggr, select_exprs_post_aggr, None)?
.build()?
}
}
}?;
};

// DISTRIBUTE BY
let plan = if !select.distribute_by.is_empty() {
Expand Down Expand Up @@ -1011,6 +1127,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
having_expr_opt: Option<&Expr>,
qualify_expr_opt: Option<&Expr>,
order_by_exprs: &[SortExpr],
on_exprs: &[Expr],
group_by_exprs: &[Expr],
aggr_exprs: &[Expr],
) -> Result<AggregatePlanResult> {
Expand Down Expand Up @@ -1177,12 +1294,28 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
),
)?;

// Rewrite the DISTINCT ON expressions to use the columns produced by
// the aggregation. Same shape as ORDER BY rewriting so a hidden
// grouping column or a raw aggregate expression in ON is resolved.
let on_exprs_post_aggr = on_exprs
.iter()
.map(|expr| rebase_expr(expr, &aggr_projection_exprs, input))
.collect::<Result<Vec<Expr>>>()?;
check_columns_satisfy_exprs(
&all_valid_exprs,
&on_exprs_post_aggr,
CheckColumnsSatisfyExprsPurpose::Aggregate(
CheckColumnsMustReferenceAggregatePurpose::DistinctOn,
),
)?;

Ok(AggregatePlanResult {
plan,
select_exprs: select_exprs_post_aggr,
having_expr: having_expr_post_aggr,
qualify_expr: qualify_expr_post_aggr,
order_by_exprs: order_by_post_aggr,
on_exprs: on_exprs_post_aggr,
})
}

Expand Down
4 changes: 4 additions & 0 deletions datafusion/sql/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ pub(crate) enum CheckColumnsMustReferenceAggregatePurpose {
Having,
Qualify,
OrderBy,
DistinctOn,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand All @@ -120,6 +121,9 @@ impl CheckColumnsSatisfyExprsPurpose {
Self::Aggregate(CheckColumnsMustReferenceAggregatePurpose::OrderBy) => {
"Column in ORDER BY must be in GROUP BY or an aggregate function"
}
Self::Aggregate(CheckColumnsMustReferenceAggregatePurpose::DistinctOn) => {
"Column in DISTINCT ON must be in GROUP BY or an aggregate function"
}
}
}

Expand Down
Loading
Loading