Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions datafusion/expr-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ pub mod dyn_eq;
pub mod groups_accumulator;
pub mod interval_arithmetic;
pub mod operator;
pub mod placement;
pub mod signature;
pub mod sort_properties;
pub mod statistics;
pub mod type_coercion;

pub use placement::ExpressionPlacement;
59 changes: 59 additions & 0 deletions datafusion/expr-common/src/placement.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Expression placement information for optimization decisions.

/// Describes where an expression should be placed in the query plan for
/// optimal execution. This is used by optimizers to make decisions about
/// expression placement, such as whether to push expressions down through
/// projections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ExpressionPlacement {
/// A constant literal value.
Literal,
/// A simple column reference.
Column,
/// A cheap expression that can be pushed to leaf nodes in the plan.
/// Examples include `get_field` for struct field access.
PlaceAtLeaves,
/// An expensive expression that should stay at the root of the plan.
/// This is the default for most expressions.
PlaceAtRoot,
}

impl ExpressionPlacement {
/// Returns true if the expression can be pushed down to leaf nodes
/// in the query plan.
///
/// This returns true for:
/// - `Column`: Simple column references can be pushed down. They do no compute and do not increase or
/// decrease the amount of data being processed.
/// A projection that reduces the number of columns can eliminate unnecessary data early,
/// but this method only considers one expression at a time, not a projection as a whole.
/// - `PlaceAtLeaves`: Cheap expressions can be pushed down to leaves to take advantage of
/// early computation and potential optimizations at the data source level.
/// For example `struct_col['field']` is cheap to compute (just an Arc clone of the nested array for `'field'`)
/// and thus can reduce data early in the plan at very low compute cost.
/// It may even be possible to eliminate the expression entirely if the data source can project only the needed field
/// (as e.g. Parquet can).
pub fn should_push_to_leaves(&self) -> bool {
matches!(
self,
ExpressionPlacement::Column | ExpressionPlacement::PlaceAtLeaves
)
}
}
18 changes: 18 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use datafusion_common::tree_node::{
use datafusion_common::{
Column, DFSchema, HashMap, Result, ScalarValue, Spans, TableReference,
};
use datafusion_expr_common::placement::ExpressionPlacement;
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
#[cfg(feature = "sql")]
use sqlparser::ast::{
Expand Down Expand Up @@ -1536,6 +1537,23 @@ impl Expr {
}
}

/// Returns placement information for this expression.
///
/// This is used by optimizers to make decisions about expression placement,
/// such as whether to push expressions down through projections.
pub fn placement(&self) -> ExpressionPlacement {
match self {
Expr::Column(_) => ExpressionPlacement::Column,
Expr::Literal(_, _) => ExpressionPlacement::Literal,
Expr::ScalarFunction(func) => {
let arg_placements: Vec<_> =
func.args.iter().map(|arg| arg.placement()).collect();
func.func.placement(&arg_placements)
}
_ => ExpressionPlacement::PlaceAtRoot,
}
}

/// Return String representation of the variant represented by `self`
/// Useful for non-rust based bindings
pub fn variant_name(&self) -> &str {
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ pub use datafusion_expr_common::accumulator::Accumulator;
pub use datafusion_expr_common::columnar_value::ColumnarValue;
pub use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
pub use datafusion_expr_common::operator::Operator;
pub use datafusion_expr_common::placement::ExpressionPlacement;
pub use datafusion_expr_common::signature::{
ArrayFunctionArgument, ArrayFunctionSignature, Coercion, Signature,
TIMEZONE_WILDCARD, TypeSignature, TypeSignatureClass, Volatility,
Expand Down
26 changes: 26 additions & 0 deletions datafusion/expr/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::{ExprSchema, Result, ScalarValue, not_impl_err};
use datafusion_expr_common::dyn_eq::{DynEq, DynHash};
use datafusion_expr_common::interval_arithmetic::Interval;
use datafusion_expr_common::placement::ExpressionPlacement;
use std::any::Any;
use std::cmp::Ordering;
use std::fmt::Debug;
Expand Down Expand Up @@ -361,6 +362,13 @@ impl ScalarUDF {
pub fn as_async(&self) -> Option<&AsyncScalarUDF> {
self.inner().as_any().downcast_ref::<AsyncScalarUDF>()
}

/// Returns placement information for this function.
///
/// See [`ScalarUDFImpl::placement`] for more details.
pub fn placement(&self, args: &[ExpressionPlacement]) -> ExpressionPlacement {
self.inner.placement(args)
}
}

impl<F> From<F> for ScalarUDF
Expand Down Expand Up @@ -885,6 +893,20 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync {
fn documentation(&self) -> Option<&Documentation> {
None
}

/// Returns placement information for this function.
///
/// This is used by optimizers to make decisions about expression placement,
/// such as whether to push expressions down through projections.
///
/// The default implementation returns [`ExpressionPlacement::PlaceAtRoot`],
/// meaning the expression should stay at the root of the plan.
///
/// Override this method to indicate that the function can be pushed down
/// closer to the data source.
fn placement(&self, _args: &[ExpressionPlacement]) -> ExpressionPlacement {
ExpressionPlacement::PlaceAtRoot
}
}

/// ScalarUDF that adds an alias to the underlying function. It is better to
Expand Down Expand Up @@ -1012,6 +1034,10 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
fn documentation(&self) -> Option<&Documentation> {
self.inner.documentation()
}

fn placement(&self, args: &[ExpressionPlacement]) -> ExpressionPlacement {
self.inner.placement(args)
}
}

#[cfg(test)]
Expand Down
106 changes: 104 additions & 2 deletions datafusion/functions/src/core/getfield.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use datafusion_common::{
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::simplify::ExprSimplifyResult;
use datafusion_expr::{
ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF,
ScalarUDFImpl, Signature, Volatility,
ColumnarValue, Documentation, Expr, ExpressionPlacement, ReturnFieldArgs,
ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
};
use datafusion_macros::user_doc;

Expand Down Expand Up @@ -499,6 +499,32 @@ impl ScalarUDFImpl for GetFieldFunc {
fn documentation(&self) -> Option<&Documentation> {
self.doc()
}

fn placement(&self, args: &[ExpressionPlacement]) -> ExpressionPlacement {
// get_field can be pushed to leaves if:
// 1. The base (first arg) is a column or already placeable at leaves
// 2. All field keys (remaining args) are literals
if args.is_empty() {
return ExpressionPlacement::PlaceAtRoot;
}

let base_placement = args[0];
let base_is_pushable = matches!(
base_placement,
ExpressionPlacement::Column | ExpressionPlacement::PlaceAtLeaves
);

let all_keys_are_literals = args
.iter()
.skip(1)
.all(|p| matches!(p, ExpressionPlacement::Literal));

if base_is_pushable && all_keys_are_literals {
ExpressionPlacement::PlaceAtLeaves
} else {
ExpressionPlacement::PlaceAtRoot
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -542,4 +568,80 @@ mod tests {

Ok(())
}

#[test]
fn test_placement_literal_key() {
let func = GetFieldFunc::new();

// get_field(col, 'literal') -> leaf-pushable (static field access)
let args = vec![ExpressionPlacement::Column, ExpressionPlacement::Literal];
assert_eq!(func.placement(&args), ExpressionPlacement::PlaceAtLeaves);

// get_field(col, 'a', 'b') -> leaf-pushable (nested static field access)
let args = vec![
ExpressionPlacement::Column,
ExpressionPlacement::Literal,
ExpressionPlacement::Literal,
];
assert_eq!(func.placement(&args), ExpressionPlacement::PlaceAtLeaves);

// get_field(get_field(col, 'a'), 'b') represented as PlaceAtLeaves for base
let args = vec![
ExpressionPlacement::PlaceAtLeaves,
ExpressionPlacement::Literal,
];
assert_eq!(func.placement(&args), ExpressionPlacement::PlaceAtLeaves);
}

#[test]
fn test_placement_column_key() {
let func = GetFieldFunc::new();

// get_field(col, other_col) -> NOT leaf-pushable (dynamic per-row lookup)
let args = vec![ExpressionPlacement::Column, ExpressionPlacement::Column];
assert_eq!(func.placement(&args), ExpressionPlacement::PlaceAtRoot);

// get_field(col, 'a', other_col) -> NOT leaf-pushable (dynamic nested lookup)
let args = vec![
ExpressionPlacement::Column,
ExpressionPlacement::Literal,
ExpressionPlacement::Column,
];
assert_eq!(func.placement(&args), ExpressionPlacement::PlaceAtRoot);
}

#[test]
fn test_placement_root() {
let func = GetFieldFunc::new();

// get_field(root_expr, 'literal') -> NOT leaf-pushable
let args = vec![
ExpressionPlacement::PlaceAtRoot,
ExpressionPlacement::Literal,
];
assert_eq!(func.placement(&args), ExpressionPlacement::PlaceAtRoot);

// get_field(col, root_expr) -> NOT leaf-pushable
let args = vec![
ExpressionPlacement::Column,
ExpressionPlacement::PlaceAtRoot,
];
assert_eq!(func.placement(&args), ExpressionPlacement::PlaceAtRoot);
}

#[test]
fn test_placement_edge_cases() {
let func = GetFieldFunc::new();

// Empty args -> NOT leaf-pushable
assert_eq!(func.placement(&[]), ExpressionPlacement::PlaceAtRoot);

// Just base, no key -> PlaceAtLeaves (not a valid call but should handle gracefully)
let args = vec![ExpressionPlacement::Column];
assert_eq!(func.placement(&args), ExpressionPlacement::PlaceAtLeaves);

// Literal base with literal key -> NOT leaf-pushable (would be constant-folded)
let args = vec![ExpressionPlacement::Literal, ExpressionPlacement::Literal];
assert_eq!(func.placement(&args), ExpressionPlacement::PlaceAtRoot);
}
}
5 changes: 5 additions & 0 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,11 @@ impl CSEController for ExprCSEController<'_> {
#[expect(deprecated)]
let is_normal_minus_aggregates = matches!(
node,
// TODO: there's an argument for removing `Literal` from here,
// maybe using `Expr::placemement().should_push_to_leaves()` instead
// so that we extract common literals and don't broadcast them to num_batch_rows multiple times.
// However that currently breaks things like `percentile_cont()` which expect literal arguments
// (and would instead be getting `col(__common_expr_n)`).
Expr::Literal(..)
| Expr::Column(..)
| Expr::ScalarVariable(..)
Expand Down
14 changes: 4 additions & 10 deletions datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,15 +525,14 @@ fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Project
expr.iter()
.for_each(|expr| expr.add_column_ref_counts(&mut column_referral_map));

// If an expression is non-trivial and appears more than once, do not merge
// If an expression is non-trivial (PlaceAtRoot) and appears more than once, do not merge
// them as consecutive projections will benefit from a compute-once approach.
// For details, see: https://github.com/apache/datafusion/issues/8296
if column_referral_map.into_iter().any(|(col, usage)| {
usage > 1
&& !is_expr_trivial(
&prev_projection.expr
[prev_projection.schema.index_of_column(col).unwrap()],
)
&& !prev_projection.expr[prev_projection.schema.index_of_column(col).unwrap()]
.placement()
.should_push_to_leaves()
}) {
// no change
return Projection::try_new_with_schema(expr, input, schema).map(Transformed::no);
Expand Down Expand Up @@ -586,11 +585,6 @@ fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Project
}
}

// Check whether `expr` is trivial; i.e. it doesn't imply any computation.
fn is_expr_trivial(expr: &Expr) -> bool {
matches!(expr, Expr::Column(_) | Expr::Literal(_, _))
}
Comment on lines -589 to -592
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point of this PR is to formalize up these hidden assumptions about expressions and let expressions like get_field participate in the decision of how to treat the expression.


/// Rewrites a projection expression using the projection before it (i.e. its input)
/// This is a subroutine to the `merge_consecutive_projections` function.
///
Expand Down
11 changes: 11 additions & 0 deletions datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use datafusion_common::{
};
use datafusion_expr_common::columnar_value::ColumnarValue;
use datafusion_expr_common::interval_arithmetic::Interval;
use datafusion_expr_common::placement::ExpressionPlacement;
use datafusion_expr_common::sort_properties::ExprProperties;
use datafusion_expr_common::statistics::Distribution;

Expand Down Expand Up @@ -430,6 +431,16 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
fn is_volatile_node(&self) -> bool {
false
}

/// Returns placement information for this expression.
///
/// This is used by optimizers to make decisions about expression placement,
/// such as whether to push expressions down through projections.
///
/// The default implementation returns [`ExpressionPlacement::PlaceAtRoot`].
fn placement(&self) -> ExpressionPlacement {
ExpressionPlacement::PlaceAtRoot
}
}

#[deprecated(
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-expr/src/expressions/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use arrow::{
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{Result, internal_err, plan_err};
use datafusion_expr::ColumnarValue;
use datafusion_expr_common::placement::ExpressionPlacement;

/// Represents the column at a given index in a RecordBatch
///
Expand Down Expand Up @@ -146,6 +147,10 @@ impl PhysicalExpr for Column {
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.name)
}

fn placement(&self) -> ExpressionPlacement {
ExpressionPlacement::Column
}
}

impl Column {
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-expr/src/expressions/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Expr;
use datafusion_expr_common::columnar_value::ColumnarValue;
use datafusion_expr_common::interval_arithmetic::Interval;
use datafusion_expr_common::placement::ExpressionPlacement;
use datafusion_expr_common::sort_properties::{ExprProperties, SortProperties};

/// Represents a literal value
Expand Down Expand Up @@ -134,6 +135,10 @@ impl PhysicalExpr for Literal {
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(self, f)
}

fn placement(&self) -> ExpressionPlacement {
ExpressionPlacement::Literal
}
}

/// Create a literal expression
Expand Down
Loading