Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1236,6 +1236,43 @@ impl Unparser<'_> {
}
Ok(ret)
}
// Handle Filter between SubqueryAlias and TableScan (e.g. Inexact filter pushdown) or
// manually created plan. Rewrite predicate column references to use the alias.
// Skip predicates with subquery expressions — TableAliasRewriter
// cannot rewrite OuterReferenceColumn inside subquery LogicalPlans.
// Returning None lets the caller wrap the plan as a derived table,
// preserving the original table name for outer references and generate correct SQL.
LogicalPlan::Filter(filter) => {
if filter.predicate.exists(|e| {
Ok(matches!(
e,
Expr::Exists(_) | Expr::InSubquery(_) | Expr::ScalarSubquery(_)
))
})? {
return Ok(None);
}

if let Some(plan) = self.unparse_table_scan_pushdown(
&filter.input,
alias.clone(),
already_projected,
)? {
let predicate = if let Some(ref alias_name) = alias {
let mut rewriter = TableAliasRewriter {
table_schema: plan.schema().as_arrow(),
alias_name: alias_name.clone(),
};
filter.predicate.clone().rewrite(&mut rewriter).data()?
} else {
filter.predicate.clone()
};
Ok(Some(
LogicalPlanBuilder::from(plan).filter(predicate)?.build()?,
))
} else {
Ok(None)
}
}
// SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns].
// The inner table scan could be a scan with pushdown operations.
LogicalPlan::Projection(projection) => {
Expand Down
10 changes: 10 additions & 0 deletions datafusion/sql/src/unparser/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,16 @@ pub(crate) fn try_transform_to_simple_table_scan_with_filters(
alias_name: alias_name.clone(),
});

// Rewrite already-collected Filter node predicates to use the
// table alias so they can be properly deduplicated against the
// rewritten TableScan filters below.
if let Some(ref mut rewriter) = filter_alias_rewriter {
filters = filters
.into_iter()
.map(|expr| expr.rewrite(rewriter).data())
.collect::<Result<IndexSet<_>, _>>()?;
}

// rewrite filters to use table alias if present
let table_scan_filters = table_scan
.filters
Expand Down
320 changes: 319 additions & 1 deletion datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use datafusion_expr::test::function_stub::{
use datafusion_expr::{
EmptyRelation, Expr, Extension, LogicalPlan, LogicalPlanBuilder, Union,
UserDefinedLogicalNode, UserDefinedLogicalNodeCore, WindowFrame,
WindowFunctionDefinition, cast, col, lit, table_scan, wildcard,
WindowFunctionDefinition, cast, col, lit, not_exists, table_scan, wildcard,
};
use datafusion_functions::unicode;
use datafusion_functions_aggregate::grouping::grouping_udaf;
Expand Down Expand Up @@ -3006,3 +3006,321 @@ fn test_unparse_manual_join_with_subquery_aggregate() -> Result<()> {

Ok(())
}

/// Verifies that `SubqueryAlias` wrapping a `Filter` over a `TableScan` with
/// pushdown filters correctly rewrites column references to the alias name.
///
/// Three scenarios are covered:
///
/// 1. **Inexact pushdown** — the same predicate appears in both the `TableScan`
/// filters and the `Filter` node (the optimizer keeps the `Filter` for
/// re-checking):
/// - SubqueryAlias: n1 / Filter: nation.n_name = 'FRANCE' / TableScan: nation filters=[nation.n_name = 'FRANCE']
///
/// 2. **Mixed pushdown** — the `Filter` node contains additional predicates
/// beyond what was pushed into the `TableScan` (e.g. some filters returned
/// `Unsupported` from `supports_filters_pushdown`):
/// - SubqueryAlias: n1 / Filter: nation.n_name = 'FRANCE' AND nation.n_nationkey > 10 / TableScan: nation filters=[nation.n_name = 'FRANCE']
///
/// 3. **Disjoint predicates** — the `Filter` and `TableScan` have completely
/// different predicates (no duplicates):
/// - SubqueryAlias: n1 / Filter: nation.n_nationkey > 10 / TableScan: nation filters=[nation.n_name = 'FRANCE']
#[test]
fn test_subquery_alias_with_filter_over_table_scan_pushdown() -> Result<()> {
let schema = Schema::new(vec![
Field::new("n_nationkey", DataType::Int32, false),
Field::new("n_name", DataType::Utf8, false),
]);

// Scenario 1: Inexact pushdown — same predicate in both TableScan and Filter
let scan = table_scan_with_filters(
Some("nation"),
&schema,
Some(vec![0, 1]),
vec![col("n_name").eq(lit("FRANCE"))],
)?
.build()?;

let filtered = LogicalPlanBuilder::from(scan)
.filter(col("nation.n_name").eq(lit("FRANCE")))?
.build()?;

let aliased = LogicalPlanBuilder::from(filtered).alias("n1")?.build()?;

let sql = plan_to_sql(&aliased)?;
// Duplicate filter is expected: both the Filter node and TableScan carry
// the same predicate for Inexact pushdown. Deduplicating filters that may
// be duplicated by Inexact/partial pushdown does not affect query
// correctness and can be optimized separately.
assert_snapshot!(
sql,
@"SELECT n1.n_nationkey, n1.n_name FROM nation AS n1 WHERE (n1.n_name = 'FRANCE') AND (n1.n_name = 'FRANCE')"
);

// Scenario 2: Mixed pushdown — Filter has an additional predicate not in TableScan
let scan = table_scan_with_filters(
Some("nation"),
&schema,
Some(vec![0, 1]),
vec![col("n_name").eq(lit("FRANCE"))],
)?
.build()?;

let filtered = LogicalPlanBuilder::from(scan)
.filter(
col("nation.n_name")
.eq(lit("FRANCE"))
.and(col("nation.n_nationkey").gt(lit(10))),
)?
.build()?;

let aliased = LogicalPlanBuilder::from(filtered).alias("n1")?.build()?;

let sql = plan_to_sql(&aliased)?;
// The `n_name = 'FRANCE'` predicate appears twice: once from the Filter
// node (which also carries the extra `n_nationkey > 10`) and once from
// the TableScan pushdown filters. This is correct but redundant — dedup
// for Inexact/partial pushdown duplicates can be optimized separately.
assert_snapshot!(
sql,
@"SELECT n1.n_nationkey, n1.n_name FROM nation AS n1 WHERE ((n1.n_name = 'FRANCE') AND (n1.n_nationkey > 10)) AND (n1.n_name = 'FRANCE')"
);

// Scenario 3: Disjoint predicates — Filter and TableScan have different predicates
let scan = table_scan_with_filters(
Some("nation"),
&schema,
Some(vec![0, 1]),
vec![col("n_name").eq(lit("FRANCE"))],
)?
.build()?;

let filtered = LogicalPlanBuilder::from(scan)
.filter(col("nation.n_nationkey").gt(lit(10)))?
.build()?;

let aliased = LogicalPlanBuilder::from(filtered).alias("n1")?.build()?;

let sql = plan_to_sql(&aliased)?;
// No duplicate: Filter predicate differs from TableScan pushdown filter.
assert_snapshot!(
sql,
@"SELECT n1.n_nationkey, n1.n_name FROM nation AS n1 WHERE (n1.n_nationkey > 10) AND (n1.n_name = 'FRANCE')"
);

Ok(())
}

/// Verifies that a `Filter` node above a `TableScan` with pushdown filters
/// (without a `SubqueryAlias`) is handled correctly by the SQL conversion path.
#[test]
fn test_filter_over_table_scan_pushdown_no_alias() -> Result<()> {
let schema = Schema::new(vec![
Field::new("n_nationkey", DataType::Int32, false),
Field::new("n_name", DataType::Utf8, false),
]);

// Same predicate in both TableScan and Filter (Inexact pushdown, no alias)
let scan = table_scan_with_filters(
Some("nation"),
&schema,
Some(vec![0, 1]),
vec![col("n_name").eq(lit("FRANCE"))],
)?
.build()?;

let filtered = LogicalPlanBuilder::from(scan)
.filter(col("nation.n_name").eq(lit("FRANCE")))?
.build()?;

let sql = plan_to_sql(&filtered)?;
// Duplicate filter: same predicate in Filter node and TableScan pushdown.
// Dedup for Inexact/partial pushdown duplicates does not affect correctness
// and can be optimized separately.
assert_snapshot!(
sql,
@"SELECT nation.n_nationkey, nation.n_name FROM nation WHERE (nation.n_name = 'FRANCE') AND (nation.n_name = 'FRANCE')"
);

// Filter has additional predicate beyond what was pushed down
let scan = table_scan_with_filters(
Some("nation"),
&schema,
Some(vec![0, 1]),
vec![col("n_name").eq(lit("FRANCE"))],
)?
.build()?;

let filtered = LogicalPlanBuilder::from(scan)
.filter(
col("nation.n_name")
.eq(lit("FRANCE"))
.and(col("nation.n_nationkey").gt(lit(10))),
)?
.build()?;

let sql = plan_to_sql(&filtered)?;
// `n_name = 'FRANCE'` appears in both Filter and TableScan — redundant
// but correct. Dedup can be optimized separately.
assert_snapshot!(
sql,
@"SELECT nation.n_nationkey, nation.n_name FROM nation WHERE ((nation.n_name = 'FRANCE') AND (nation.n_nationkey > 10)) AND (nation.n_name = 'FRANCE')"
);

// Disjoint predicates — Filter and TableScan have different predicates
let scan = table_scan_with_filters(
Some("nation"),
&schema,
Some(vec![0, 1]),
vec![col("n_name").eq(lit("FRANCE"))],
)?
.build()?;

let filtered = LogicalPlanBuilder::from(scan)
.filter(col("nation.n_nationkey").gt(lit(10)))?
.build()?;

let sql = plan_to_sql(&filtered)?;
// No duplicate: Filter predicate differs from TableScan pushdown filter.
assert_snapshot!(
sql,
@"SELECT nation.n_nationkey, nation.n_name FROM nation WHERE (nation.n_nationkey > 10) AND (nation.n_name = 'FRANCE')"
);

Ok(())
}

/// Verifies that when `SubqueryAlias(Filter(TableScan))` is used inside a JOIN,
/// `try_transform_to_simple_table_scan_with_filters` rewrites the Filter
/// predicate with the alias so `nation.n_name` becomes `n1.n_name` and the
/// duplicate filter is properly deduplicated.
#[test]
fn test_join_with_filter_over_aliased_table_scan_pushdown() -> Result<()> {
let nation_schema = Schema::new(vec![
Field::new("n_nationkey", DataType::Int32, false),
Field::new("n_name", DataType::Utf8, false),
]);

let supplier_schema = Schema::new(vec![
Field::new("s_suppkey", DataType::Int32, false),
Field::new("s_nationkey", DataType::Int32, false),
]);

let supplier_scan =
table_scan(Some("supplier"), &supplier_schema, Some(vec![0, 1]))?.build()?;

// Build: SubqueryAlias(n1, Filter(nation.n_name IN ('FRANCE','GERMANY'),
// TableScan(nation, partial_filters=[same])))
let nation_scan = table_scan_with_filters(
Some("nation"),
&nation_schema,
Some(vec![0, 1]),
vec![
col("n_name")
.eq(lit("FRANCE"))
.or(col("n_name").eq(lit("GERMANY"))),
],
)?
.build()?;

let nation_filtered = LogicalPlanBuilder::from(nation_scan)
.filter(
col("nation.n_name")
.eq(lit("FRANCE"))
.or(col("nation.n_name").eq(lit("GERMANY"))),
)?
.build()?;

let nation_aliased = LogicalPlanBuilder::from(nation_filtered)
.alias("n1")?
.build()?;

let join_plan = LogicalPlanBuilder::from(supplier_scan)
.join(
nation_aliased,
datafusion_expr::JoinType::Inner,
(vec!["supplier.s_nationkey"], vec!["n1.n_nationkey"]),
None,
)?
.build()?;

let sql = plan_to_sql(&join_plan)?;
// The filter predicate should use alias n1 (not original table name "nation").
// With Inexact pushdown, the Filter and TableScan carry the same predicate;
// after alias rewriting both become identical and are deduplicated.
assert_snapshot!(
sql,
@r#"SELECT supplier.s_suppkey, supplier.s_nationkey, n1.n_nationkey, n1.n_name FROM supplier INNER JOIN nation AS n1 ON supplier.s_nationkey = n1.n_nationkey AND ((n1.n_name = 'FRANCE') OR (n1.n_name = 'GERMANY'))"#
);

Ok(())
}

/// Verifies that when a `Filter` predicate contains subquery expressions
/// (e.g. `NOT EXISTS`), `unparse_table_scan_pushdown` returns `None` so the
/// caller's `SubqueryAlias` handler falls back to wrapping the inner plan as a
/// derived table: `(SELECT ... FROM table WHERE ...) AS alias`.
///
/// This preserves the original table name inside the derived table, which is
/// required for `OuterReferenceColumn` expressions (e.g. `outer_ref(customer.c_custkey)`)
/// that refer to the original table name rather than the alias.
#[test]
fn test_filter_with_subquery_over_aliased_table_scan_pushdown() -> Result<()> {
let customer_schema = Schema::new(vec![
Field::new("c_custkey", DataType::Int32, false),
Field::new("c_phone", DataType::Utf8, false),
Field::new("c_acctbal", DataType::Float64, false),
]);

let orders_schema = Schema::new(vec![
Field::new("o_orderkey", DataType::Int32, false),
Field::new("o_custkey", DataType::Int32, false),
]);

// Build a NOT EXISTS subquery:
// NOT EXISTS (SELECT o_orderkey, o_custkey FROM orders
// WHERE orders.o_custkey = outer_ref(customer.c_custkey))
let orders_scan =
table_scan(Some("orders"), &orders_schema, Some(vec![0, 1]))?.build()?;

let subquery_filter = LogicalPlanBuilder::from(orders_scan)
.filter(col("orders.o_custkey").eq(Expr::OuterReferenceColumn(
Arc::new(Field::new("c_custkey", DataType::Int32, false)),
Column::new(Some("customer"), "c_custkey"),
)))?
.build()?;

let not_exists_expr = not_exists(Arc::new(subquery_filter));

// Build the plan mirroring Q22 structure:
// SubqueryAlias(custsale,
// Projection(c_custkey, c_acctbal),
// Filter(c_acctbal > 0 AND NOT EXISTS(...),
// TableScan(customer, partial_filters=[c_acctbal > 0])))
let customer_scan = table_scan_with_filters(
Some("customer"),
&customer_schema,
Some(vec![0, 1, 2]),
vec![col("c_acctbal").gt(lit(0.0))],
)?
.build()?;

let filtered = LogicalPlanBuilder::from(customer_scan)
.filter(col("customer.c_acctbal").gt(lit(0.0)).and(not_exists_expr))?
.project(vec![col("customer.c_custkey"), col("customer.c_acctbal")])?
.build()?;

let aliased = LogicalPlanBuilder::from(filtered)
.alias("custsale")?
.build()?;

let sql = plan_to_sql(&aliased)?;
// The subquery produces a derived table that preserves the original "customer" table name,
// so outer_ref(customer.c_custkey) remains valid inside the NOT EXISTS subquery.
assert_snapshot!(
sql,
@"SELECT * FROM (SELECT customer.c_custkey, customer.c_acctbal FROM customer WHERE ((customer.c_acctbal > 0.0) AND NOT EXISTS (SELECT orders.o_orderkey, orders.o_custkey FROM orders WHERE (orders.o_custkey = customer.c_custkey))) AND (customer.c_acctbal > 0.0)) AS custsale"
);

Ok(())
}
Loading