From e7be9bbdaea7f3a4483bd759eddd31e30ddb8bfb Mon Sep 17 00:00:00 2001
From: Sergei Grebnov
Date: Wed, 1 Apr 2026 14:31:37 +0300
Subject: [PATCH] fix(unparser): Fix column alias rewriting for Filter nodes preserved by Inexact filter pushdown

---
Reviewer notes (text between the `---` marker and the diffstat is not part of
the commit message):

* plan.rs: `unparse_table_scan_pushdown` gains a `LogicalPlan::Filter` arm. It
  recurses into the filter input, rewrites the predicate with
  `TableAliasRewriter` when an alias is supplied, and re-applies the filter on
  top of the returned plan. Predicates containing `Expr::Exists`,
  `Expr::InSubquery` or `Expr::ScalarSubquery` make the arm return `Ok(None)`.
* utils.rs: `try_transform_to_simple_table_scan_with_filters` rewrites the
  already collected `filters` with the alias rewriter before the `TableScan`
  filters are rewritten. The container type inside the `collect` turbofish is
  left as `_` so it is inferred from the existing `filters` binding.
* plan_to_sql.rs: imports `not_exists` and adds four tests.
* NOTE(review): leading whitespace of context and added lines was reconstructed
  from rustfmt nesting; confirm against the target tree, or apply with
  `git apply --ignore-whitespace`.

 datafusion/sql/src/unparser/plan.rs       |  37 +++
 datafusion/sql/src/unparser/utils.rs      |  10 +
 datafusion/sql/tests/cases/plan_to_sql.rs | 320 +++++++++++++++++++++-
 3 files changed, 366 insertions(+), 1 deletion(-)

diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs
index ca8dfa431b4f5..6c862a90f3d83 100644
--- a/datafusion/sql/src/unparser/plan.rs
+++ b/datafusion/sql/src/unparser/plan.rs
@@ -1176,6 +1176,43 @@ impl Unparser<'_> {
                 }
                 Ok(ret)
             }
+            // Handle Filter between SubqueryAlias and TableScan (e.g. Inexact filter pushdown) or
+            // manually created plan. Rewrite predicate column references to use the alias.
+            // Skip predicates with subquery expressions — TableAliasRewriter
+            // cannot rewrite OuterReferenceColumn inside subquery LogicalPlans.
+            // Returning None lets the caller wrap the plan as a derived table,
+            // preserving the original table name for outer references and generating correct SQL.
+            LogicalPlan::Filter(filter) => {
+                if filter.predicate.exists(|e| {
+                    Ok(matches!(
+                        e,
+                        Expr::Exists(_) | Expr::InSubquery(_) | Expr::ScalarSubquery(_)
+                    ))
+                })? {
+                    return Ok(None);
+                }
+
+                if let Some(plan) = self.unparse_table_scan_pushdown(
+                    &filter.input,
+                    alias.clone(),
+                    already_projected,
+                )? {
+                    let predicate = if let Some(ref alias_name) = alias {
+                        let mut rewriter = TableAliasRewriter {
+                            table_schema: plan.schema().as_arrow(),
+                            alias_name: alias_name.clone(),
+                        };
+                        filter.predicate.clone().rewrite(&mut rewriter).data()?
+                    } else {
+                        filter.predicate.clone()
+                    };
+                    Ok(Some(
+                        LogicalPlanBuilder::from(plan).filter(predicate)?.build()?,
+                    ))
+                } else {
+                    Ok(None)
+                }
+            }
             // SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns].
             // The inner table scan could be a scan with pushdown operations.
             LogicalPlan::Projection(projection) => {
diff --git a/datafusion/sql/src/unparser/utils.rs b/datafusion/sql/src/unparser/utils.rs
index f539c0ddc1e87..17c1ebbee7190 100644
--- a/datafusion/sql/src/unparser/utils.rs
+++ b/datafusion/sql/src/unparser/utils.rs
@@ -365,6 +365,16 @@ pub(crate) fn try_transform_to_simple_table_scan_with_filters(
                         alias_name: alias_name.clone(),
                     });
 
+                // Rewrite already-collected Filter node predicates to use the
+                // table alias so they can be properly deduplicated against the
+                // rewritten TableScan filters below.
+                if let Some(ref mut rewriter) = filter_alias_rewriter {
+                    filters = filters
+                        .into_iter()
+                        .map(|expr| expr.rewrite(rewriter).data())
+                        .collect::<Result<_, _>>()?;
+                }
+
                 // rewrite filters to use table alias if present
                 let table_scan_filters = table_scan
                     .filters
diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs
index 0dad48b168976..6aad938df9415 100644
--- a/datafusion/sql/tests/cases/plan_to_sql.rs
+++ b/datafusion/sql/tests/cases/plan_to_sql.rs
@@ -28,7 +28,7 @@ use datafusion_expr::test::function_stub::{
 use datafusion_expr::{
     EmptyRelation, Expr, Extension, LogicalPlan, LogicalPlanBuilder, Union,
     UserDefinedLogicalNode, UserDefinedLogicalNodeCore, WindowFrame,
-    WindowFunctionDefinition, cast, col, lit, table_scan, wildcard,
+    WindowFunctionDefinition, cast, col, lit, not_exists, table_scan, wildcard,
 };
 use datafusion_functions::unicode;
 use datafusion_functions_aggregate::grouping::grouping_udaf;
@@ -2904,3 +2904,321 @@ fn test_json_access_3() {
         @r#"SELECT (j1.j1_string : 'field.inner1[''inner2'']') FROM j1"#
     );
 }
+
+/// Verifies that `SubqueryAlias` wrapping a `Filter` over a `TableScan` with
+/// pushdown filters correctly rewrites column references to the alias name.
+///
+/// Three scenarios are covered:
+///
+/// 1. **Inexact pushdown** — the same predicate appears in both the `TableScan`
+///    filters and the `Filter` node (the optimizer keeps the `Filter` for
+///    re-checking):
+///    - SubqueryAlias: n1 / Filter: nation.n_name = 'FRANCE' / TableScan: nation filters=[nation.n_name = 'FRANCE']
+///
+/// 2. **Mixed pushdown** — the `Filter` node contains additional predicates
+///    beyond what was pushed into the `TableScan` (e.g. some filters returned
+///    `Unsupported` from `supports_filters_pushdown`):
+///    - SubqueryAlias: n1 / Filter: nation.n_name = 'FRANCE' AND nation.n_nationkey > 10 / TableScan: nation filters=[nation.n_name = 'FRANCE']
+///
+/// 3. **Disjoint predicates** — the `Filter` and `TableScan` have completely
+///    different predicates (no duplicates):
+///    - SubqueryAlias: n1 / Filter: nation.n_nationkey > 10 / TableScan: nation filters=[nation.n_name = 'FRANCE']
+#[test]
+fn test_subquery_alias_with_filter_over_table_scan_pushdown() -> Result<()> {
+    let schema = Schema::new(vec![
+        Field::new("n_nationkey", DataType::Int32, false),
+        Field::new("n_name", DataType::Utf8, false),
+    ]);
+
+    // Scenario 1: Inexact pushdown — same predicate in both TableScan and Filter
+    let scan = table_scan_with_filters(
+        Some("nation"),
+        &schema,
+        Some(vec![0, 1]),
+        vec![col("n_name").eq(lit("FRANCE"))],
+    )?
+    .build()?;
+
+    let filtered = LogicalPlanBuilder::from(scan)
+        .filter(col("nation.n_name").eq(lit("FRANCE")))?
+        .build()?;
+
+    let aliased = LogicalPlanBuilder::from(filtered).alias("n1")?.build()?;
+
+    let sql = plan_to_sql(&aliased)?;
+    // Duplicate filter is expected: both the Filter node and TableScan carry
+    // the same predicate for Inexact pushdown. Deduplicating filters that may
+    // be duplicated by Inexact/partial pushdown does not affect query
+    // correctness and can be optimized separately.
+    assert_snapshot!(
+        sql,
+        @"SELECT n1.n_nationkey, n1.n_name FROM nation AS n1 WHERE (n1.n_name = 'FRANCE') AND (n1.n_name = 'FRANCE')"
+    );
+
+    // Scenario 2: Mixed pushdown — Filter has an additional predicate not in TableScan
+    let scan = table_scan_with_filters(
+        Some("nation"),
+        &schema,
+        Some(vec![0, 1]),
+        vec![col("n_name").eq(lit("FRANCE"))],
+    )?
+    .build()?;
+
+    let filtered = LogicalPlanBuilder::from(scan)
+        .filter(
+            col("nation.n_name")
+                .eq(lit("FRANCE"))
+                .and(col("nation.n_nationkey").gt(lit(10))),
+        )?
+        .build()?;
+
+    let aliased = LogicalPlanBuilder::from(filtered).alias("n1")?.build()?;
+
+    let sql = plan_to_sql(&aliased)?;
+    // The `n_name = 'FRANCE'` predicate appears twice: once from the Filter
+    // node (which also carries the extra `n_nationkey > 10`) and once from
+    // the TableScan pushdown filters. This is correct but redundant — dedup
+    // for Inexact/partial pushdown duplicates can be optimized separately.
+    assert_snapshot!(
+        sql,
+        @"SELECT n1.n_nationkey, n1.n_name FROM nation AS n1 WHERE ((n1.n_name = 'FRANCE') AND (n1.n_nationkey > 10)) AND (n1.n_name = 'FRANCE')"
+    );
+
+    // Scenario 3: Disjoint predicates — Filter and TableScan have different predicates
+    let scan = table_scan_with_filters(
+        Some("nation"),
+        &schema,
+        Some(vec![0, 1]),
+        vec![col("n_name").eq(lit("FRANCE"))],
+    )?
+    .build()?;
+
+    let filtered = LogicalPlanBuilder::from(scan)
+        .filter(col("nation.n_nationkey").gt(lit(10)))?
+        .build()?;
+
+    let aliased = LogicalPlanBuilder::from(filtered).alias("n1")?.build()?;
+
+    let sql = plan_to_sql(&aliased)?;
+    // No duplicate: Filter predicate differs from TableScan pushdown filter.
+    assert_snapshot!(
+        sql,
+        @"SELECT n1.n_nationkey, n1.n_name FROM nation AS n1 WHERE (n1.n_nationkey > 10) AND (n1.n_name = 'FRANCE')"
+    );
+
+    Ok(())
+}
+
+/// Verifies that a `Filter` node above a `TableScan` with pushdown filters
+/// (without a `SubqueryAlias`) is handled correctly by the SQL conversion path.
+#[test]
+fn test_filter_over_table_scan_pushdown_no_alias() -> Result<()> {
+    let schema = Schema::new(vec![
+        Field::new("n_nationkey", DataType::Int32, false),
+        Field::new("n_name", DataType::Utf8, false),
+    ]);
+
+    // Same predicate in both TableScan and Filter (Inexact pushdown, no alias)
+    let scan = table_scan_with_filters(
+        Some("nation"),
+        &schema,
+        Some(vec![0, 1]),
+        vec![col("n_name").eq(lit("FRANCE"))],
+    )?
+    .build()?;
+
+    let filtered = LogicalPlanBuilder::from(scan)
+        .filter(col("nation.n_name").eq(lit("FRANCE")))?
+        .build()?;
+
+    let sql = plan_to_sql(&filtered)?;
+    // Duplicate filter: same predicate in Filter node and TableScan pushdown.
+    // Dedup for Inexact/partial pushdown duplicates does not affect correctness
+    // and can be optimized separately.
+    assert_snapshot!(
+        sql,
+        @"SELECT nation.n_nationkey, nation.n_name FROM nation WHERE (nation.n_name = 'FRANCE') AND (nation.n_name = 'FRANCE')"
+    );
+
+    // Filter has additional predicate beyond what was pushed down
+    let scan = table_scan_with_filters(
+        Some("nation"),
+        &schema,
+        Some(vec![0, 1]),
+        vec![col("n_name").eq(lit("FRANCE"))],
+    )?
+    .build()?;
+
+    let filtered = LogicalPlanBuilder::from(scan)
+        .filter(
+            col("nation.n_name")
+                .eq(lit("FRANCE"))
+                .and(col("nation.n_nationkey").gt(lit(10))),
+        )?
+        .build()?;
+
+    let sql = plan_to_sql(&filtered)?;
+    // `n_name = 'FRANCE'` appears in both Filter and TableScan — redundant
+    // but correct. Dedup can be optimized separately.
+    assert_snapshot!(
+        sql,
+        @"SELECT nation.n_nationkey, nation.n_name FROM nation WHERE ((nation.n_name = 'FRANCE') AND (nation.n_nationkey > 10)) AND (nation.n_name = 'FRANCE')"
+    );
+
+    // Disjoint predicates — Filter and TableScan have different predicates
+    let scan = table_scan_with_filters(
+        Some("nation"),
+        &schema,
+        Some(vec![0, 1]),
+        vec![col("n_name").eq(lit("FRANCE"))],
+    )?
+    .build()?;
+
+    let filtered = LogicalPlanBuilder::from(scan)
+        .filter(col("nation.n_nationkey").gt(lit(10)))?
+        .build()?;
+
+    let sql = plan_to_sql(&filtered)?;
+    // No duplicate: Filter predicate differs from TableScan pushdown filter.
+    assert_snapshot!(
+        sql,
+        @"SELECT nation.n_nationkey, nation.n_name FROM nation WHERE (nation.n_nationkey > 10) AND (nation.n_name = 'FRANCE')"
+    );
+
+    Ok(())
+}
+
+/// Verifies that when `SubqueryAlias(Filter(TableScan))` is used inside a JOIN,
+/// `try_transform_to_simple_table_scan_with_filters` rewrites the Filter
+/// predicate with the alias so `nation.n_name` becomes `n1.n_name` and the
+/// duplicate filter is properly deduplicated.
+#[test]
+fn test_join_with_filter_over_aliased_table_scan_pushdown() -> Result<()> {
+    let nation_schema = Schema::new(vec![
+        Field::new("n_nationkey", DataType::Int32, false),
+        Field::new("n_name", DataType::Utf8, false),
+    ]);
+
+    let supplier_schema = Schema::new(vec![
+        Field::new("s_suppkey", DataType::Int32, false),
+        Field::new("s_nationkey", DataType::Int32, false),
+    ]);
+
+    let supplier_scan =
+        table_scan(Some("supplier"), &supplier_schema, Some(vec![0, 1]))?.build()?;
+
+    // Build: SubqueryAlias(n1, Filter(nation.n_name IN ('FRANCE','GERMANY'),
+    //          TableScan(nation, partial_filters=[same])))
+    let nation_scan = table_scan_with_filters(
+        Some("nation"),
+        &nation_schema,
+        Some(vec![0, 1]),
+        vec![
+            col("n_name")
+                .eq(lit("FRANCE"))
+                .or(col("n_name").eq(lit("GERMANY"))),
+        ],
+    )?
+    .build()?;
+
+    let nation_filtered = LogicalPlanBuilder::from(nation_scan)
+        .filter(
+            col("nation.n_name")
+                .eq(lit("FRANCE"))
+                .or(col("nation.n_name").eq(lit("GERMANY"))),
+        )?
+        .build()?;
+
+    let nation_aliased = LogicalPlanBuilder::from(nation_filtered)
+        .alias("n1")?
+        .build()?;
+
+    let join_plan = LogicalPlanBuilder::from(supplier_scan)
+        .join(
+            nation_aliased,
+            datafusion_expr::JoinType::Inner,
+            (vec!["supplier.s_nationkey"], vec!["n1.n_nationkey"]),
+            None,
+        )?
+        .build()?;
+
+    let sql = plan_to_sql(&join_plan)?;
+    // The filter predicate should use alias n1 (not original table name "nation").
+    // With Inexact pushdown, the Filter and TableScan carry the same predicate;
+    // after alias rewriting both become identical and are deduplicated.
+    assert_snapshot!(
+        sql,
+        @r#"SELECT supplier.s_suppkey, supplier.s_nationkey, n1.n_nationkey, n1.n_name FROM supplier INNER JOIN nation AS n1 ON supplier.s_nationkey = n1.n_nationkey AND ((n1.n_name = 'FRANCE') OR (n1.n_name = 'GERMANY'))"#
+    );
+
+    Ok(())
+}
+
+/// Verifies that when a `Filter` predicate contains subquery expressions
+/// (e.g. `NOT EXISTS`), `unparse_table_scan_pushdown` returns `None` so the
+/// caller's `SubqueryAlias` handler falls back to wrapping the inner plan as a
+/// derived table: `(SELECT ... FROM table WHERE ...) AS alias`.
+///
+/// This preserves the original table name inside the derived table, which is
+/// required for `OuterReferenceColumn` expressions (e.g. `outer_ref(customer.c_custkey)`)
+/// that refer to the original table name rather than the alias.
+#[test]
+fn test_filter_with_subquery_over_aliased_table_scan_pushdown() -> Result<()> {
+    let customer_schema = Schema::new(vec![
+        Field::new("c_custkey", DataType::Int32, false),
+        Field::new("c_phone", DataType::Utf8, false),
+        Field::new("c_acctbal", DataType::Float64, false),
+    ]);
+
+    let orders_schema = Schema::new(vec![
+        Field::new("o_orderkey", DataType::Int32, false),
+        Field::new("o_custkey", DataType::Int32, false),
+    ]);
+
+    // Build a NOT EXISTS subquery:
+    //   NOT EXISTS (SELECT o_orderkey, o_custkey FROM orders
+    //               WHERE orders.o_custkey = outer_ref(customer.c_custkey))
+    let orders_scan =
+        table_scan(Some("orders"), &orders_schema, Some(vec![0, 1]))?.build()?;
+
+    let subquery_filter = LogicalPlanBuilder::from(orders_scan)
+        .filter(col("orders.o_custkey").eq(Expr::OuterReferenceColumn(
+            Arc::new(Field::new("c_custkey", DataType::Int32, false)),
+            Column::new(Some("customer"), "c_custkey"),
+        )))?
+        .build()?;
+
+    let not_exists_expr = not_exists(Arc::new(subquery_filter));
+
+    // Build the plan mirroring Q22 structure:
+    //   SubqueryAlias(custsale,
+    //     Projection(c_custkey, c_acctbal),
+    //       Filter(c_acctbal > 0 AND NOT EXISTS(...),
+    //         TableScan(customer, partial_filters=[c_acctbal > 0])))
+    let customer_scan = table_scan_with_filters(
+        Some("customer"),
+        &customer_schema,
+        Some(vec![0, 1, 2]),
+        vec![col("c_acctbal").gt(lit(0.0))],
+    )?
+    .build()?;
+
+    let filtered = LogicalPlanBuilder::from(customer_scan)
+        .filter(col("customer.c_acctbal").gt(lit(0.0)).and(not_exists_expr))?
+        .project(vec![col("customer.c_custkey"), col("customer.c_acctbal")])?
+        .build()?;
+
+    let aliased = LogicalPlanBuilder::from(filtered)
+        .alias("custsale")?
+        .build()?;
+
+    let sql = plan_to_sql(&aliased)?;
+    // The subquery produces a derived table that preserves the original "customer" table name,
+    // so outer_ref(customer.c_custkey) remains valid inside the NOT EXISTS subquery.
+    assert_snapshot!(
+        sql,
+        @"SELECT * FROM (SELECT customer.c_custkey, customer.c_acctbal FROM customer WHERE ((customer.c_acctbal > 0.0) AND NOT EXISTS (SELECT orders.o_orderkey, orders.o_custkey FROM orders WHERE (orders.o_custkey = customer.c_custkey))) AND (customer.c_acctbal > 0.0)) AS custsale"
+    );
+
+    Ok(())
+}