Skip to content
17 changes: 11 additions & 6 deletions datafusion/sql/src/cte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};

use datafusion_common::{
Result, not_impl_err, plan_err,
Diagnostic, Result, Span, not_impl_err, plan_err,
tree_node::{TreeNode, TreeNodeRecursion},
};
use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, TableSource};
Expand All @@ -37,10 +37,16 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
for cte in with.cte_tables {
// A `WITH` block can't use the same name more than once
let cte_name = self.ident_normalizer.normalize(cte.alias.name.clone());
let cte_name_span = Span::try_from_sqlparser_span(cte.alias.name.span);
if planner_context.contains_cte(&cte_name) {
return plan_err!(
"WITH query name {cte_name:?} specified more than once"
);
let msg =
format!("WITH query name {cte_name:?} specified more than once");
let mut diagnostic = Diagnostic::new_error(&msg, cte_name_span);
if let Some(first_span) = planner_context.get_cte_span(&cte_name) {
diagnostic =
diagnostic.with_note("previously defined here", Some(first_span));
}
return plan_err!("{msg}").map_err(|e| e.with_diagnostic(diagnostic));
}

// Create a logical plan for the CTE
Expand All @@ -53,8 +59,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
// Each `WITH` block can change the column names in the last
// projection (e.g. "WITH table(t1, t2) AS SELECT 1, 2").
let final_plan = self.apply_table_alias(cte_plan, cte.alias)?;
// Export the CTE to the outer query
planner_context.insert_cte(cte_name, final_plan);
planner_context.insert_cte_with_span(cte_name, final_plan, cte_name_span);
}
Ok(())
}
Expand Down
28 changes: 23 additions & 5 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use datafusion_common::TableReference;
use datafusion_common::config::SqlParserOptions;
use datafusion_common::datatype::{DataTypeExt, FieldExt};
use datafusion_common::error::add_possible_columns_to_diag;
use datafusion_common::{DFSchema, DataFusionError, Result, not_impl_err, plan_err};
use datafusion_common::{
DFSchema, DataFusionError, Result, Span, not_impl_err, plan_err,
};
use datafusion_common::{
DFSchemaRef, Diagnostic, SchemaError, field_not_found, internal_err,
plan_datafusion_err,
Expand Down Expand Up @@ -258,9 +260,9 @@ pub struct PlannerContext {
/// Data types for numbered parameters ($1, $2, etc), if supplied
/// in `PREPARE` statement
prepare_param_data_types: Arc<Vec<FieldRef>>,
/// Map of CTE name to logical plan of the WITH clause.
/// Map of CTE name to logical plan of the WITH clause and optional span.
/// Use `Arc<LogicalPlan>` to allow cheap cloning
ctes: HashMap<String, Arc<LogicalPlan>>,
ctes: HashMap<String, (Arc<LogicalPlan>, Option<Span>)>,

/// The queries schemas of outer query relations, used to resolve the outer referenced
/// columns in subquery (recursive aware)
Expand Down Expand Up @@ -392,20 +394,36 @@ impl PlannerContext {
/// Subquery for the specified name
pub fn insert_cte(&mut self, cte_name: impl Into<String>, plan: LogicalPlan) {
let cte_name = cte_name.into();
self.ctes.insert(cte_name, Arc::new(plan));
self.ctes.insert(cte_name, (Arc::new(plan), None));
}

/// Inserts a LogicalPlan with an optional span for the CTE
pub(super) fn insert_cte_with_span(
&mut self,
cte_name: impl Into<String>,
plan: LogicalPlan,
span: Option<Span>,
) {
let cte_name = cte_name.into();
self.ctes.insert(cte_name, (Arc::new(plan), span));
}

/// Return a plan for the Common Table Expression (CTE) / Subquery for the
/// specified name
pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> {
self.ctes.get(cte_name).map(|cte| cte.as_ref())
self.ctes.get(cte_name).map(|(cte, _)| cte.as_ref())
}

/// Remove the plan of CTE / Subquery for the specified name
pub(super) fn remove_cte(&mut self, cte_name: &str) {
self.ctes.remove(cte_name);
}

/// Get the span of a previously defined CTE name
pub(super) fn get_cte_span(&self, name: &str) -> Option<Span> {
self.ctes.get(name).and_then(|(_, span)| *span)
}

/// Sets the left-most set expression schema, returning the previous value
pub(super) fn set_set_expr_left_schema(
&mut self,
Expand Down
40 changes: 34 additions & 6 deletions datafusion/sql/src/relation/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,54 @@
// under the License.

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::{Column, Result, not_impl_err, plan_datafusion_err};
use datafusion_common::{
Column, Diagnostic, Result, Span, not_impl_err, plan_datafusion_err, plan_err,
};
use datafusion_expr::{JoinType, LogicalPlan, LogicalPlanBuilder};
use sqlparser::ast::{
Join, JoinConstraint, JoinOperator, ObjectName, TableFactor, TableWithJoins,
};
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

impl<S: ContextProvider> SqlToRel<'_, S> {
pub(crate) fn plan_table_with_joins(
&self,
t: TableWithJoins,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
let mut left = if is_lateral(&t.relation) {
self.create_relation_subquery(t.relation, planner_context)?
let TableWithJoins { relation, joins } = t;

let mut alias_spans: HashMap<String, Option<Span>> = HashMap::new();

if let Some((name, span)) = self.extract_relation_name(&relation)? {
alias_spans.insert(name, span);
}

let mut left = if is_lateral(&relation) {
self.create_relation_subquery(relation, planner_context)?
} else {
self.create_relation(t.relation, planner_context)?
self.create_relation(relation, planner_context)?
};
let old_outer_from_schema = planner_context.outer_from_schema();
for join in t.joins {
for join in joins {
if let Some((ref name, current_span)) =
self.extract_relation_name(&join.relation)?
{
if let Some(prior_span) = alias_spans.get(name) {
let mut diagnostic = Diagnostic::new_error(
"duplicate table alias in FROM clause",
current_span,
);
if let Some(span) = *prior_span {
diagnostic =
diagnostic.with_note("first defined here", Some(span));
}
return plan_err!("duplicate table alias in FROM clause")
.map_err(|e| e.with_diagnostic(diagnostic));
}
alias_spans.insert(name.clone(), current_span);
}

planner_context.extend_outer_from_schema(left.schema())?;
left = self.parse_relation_join(left, join, planner_context)?;
}
Expand Down
25 changes: 25 additions & 0 deletions datafusion/sql/src/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,31 @@ impl<'a, 'b, S: ContextProvider> RelationPlannerContext
}

impl<S: ContextProvider> SqlToRel<'_, S> {
pub(crate) fn extract_relation_name(
&self,
relation: &TableFactor,
) -> Result<Option<(String, Option<Span>)>> {
match relation {
TableFactor::Table { alias: Some(a), .. }
| TableFactor::Derived { alias: Some(a), .. }
| TableFactor::Function { alias: Some(a), .. }
| TableFactor::UNNEST { alias: Some(a), .. }
| TableFactor::NestedJoin { alias: Some(a), .. } => {
let span = Span::try_from_sqlparser_span(a.name.span);
let name = self.ident_normalizer.normalize(a.name.clone());
Ok(Some((name, span)))
}
TableFactor::Table {
name, alias: None, ..
} => {
let span = Span::try_from_sqlparser_span(relation.span());
let table_ref = self.object_name_to_table_reference(name.clone())?;
Ok(Some((table_ref.to_string(), span)))
}
_ => Ok(None),
}
}

/// Create a `LogicalPlan` that scans the named relation.
///
/// First tries any registered extension planners. If no extension handles
Expand Down
60 changes: 52 additions & 8 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::ops::ControlFlow;
use std::sync::Arc;

Expand All @@ -29,7 +29,9 @@ use crate::utils::{

use datafusion_common::error::DataFusionErrorBuilder;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{Column, DFSchema, DFSchemaRef, Result, not_impl_err, plan_err};
use datafusion_common::{
Column, DFSchema, DFSchemaRef, Diagnostic, Result, Span, not_impl_err, plan_err,
};
use datafusion_common::{RecursionUnnestOption, UnnestOptions};
use datafusion_expr::expr::{PlannedReplaceSelectItem, WildcardOptions};
use datafusion_expr::expr_rewriter::{
Expand Down Expand Up @@ -711,21 +713,63 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
self.plan_table_with_joins(input, planner_context)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the duplicate alias diagnostic is only applied in the multi-entry comma FROM path. A single TableWithJoins still goes straight to plan_table_with_joins, so explicit joins skip this check.

For example, (SELECT 1 AS a) AS t JOIN (SELECT 2 AS b) AS t ON true would not be caught here. Depending on the columns, this could either fall back to a schema error or even plan successfully with duplicate t aliases.

Could we move the alias and span tracking into the relation or join planning path so both comma joins and explicit JOINs go through the same validation and produce consistent diagnostics?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've missed other paths as you've described. I think latest changes cover this. Thanks for catching it @kosiew

}
_ => {
let mut alias_spans: HashMap<String, Option<Span>> = HashMap::new();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this currently extracts only the final identifier for unaliased table factors. That means something like catalog1.schema.person and catalog2.schema.person would both be tracked as person and get flagged as duplicates.

However, DataFusion's scan qualifier uses the full normalized TableReference, so these should actually be treated as distinct.

Would it make sense to reuse object_name_to_table_reference(name.clone())?.to_string() here, or otherwise preserve the full relation identity for unaliased tables?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've missed this actually and added test case and properly use TableReferences


let mut from = from.into_iter();
let first = from.next().unwrap();

if let Some((name, span)) = self.extract_relation_name(&first.relation)? {
alias_spans.entry(name).or_insert(span);
}
for join in &first.joins {
if let Some((name, span)) =
self.extract_relation_name(&join.relation)?
{
alias_spans.entry(name).or_insert(span);
}
}

let mut left = LogicalPlanBuilder::from(
self.plan_table_with_joins(first, planner_context)?,
);

let mut left = LogicalPlanBuilder::from({
let input = from.next().unwrap();
self.plan_table_with_joins(input, planner_context)?
});
let old_outer_from_schema = {
let left_schema = Some(Arc::clone(left.schema()));
planner_context.set_outer_from_schema(left_schema)
};
for input in from {
// Join `input` with the current result (`left`).
let mut current_names = Vec::new();
if let Some(pair) = self.extract_relation_name(&input.relation)? {
current_names.push(pair);
}
for join in &input.joins {
if let Some(pair) = self.extract_relation_name(&join.relation)? {
current_names.push(pair);
}
}

for (name, current_span) in &current_names {
if let Some(prior_span) = alias_spans.get(name.as_str()) {
let mut diagnostic = Diagnostic::new_error(
"duplicate table alias in FROM clause",
*current_span,
);
if let Some(span) = *prior_span {
diagnostic = diagnostic
.with_note("first defined here", Some(span));
}
return plan_err!("duplicate table alias in FROM clause")
.map_err(|e| e.with_diagnostic(diagnostic));
}
}

for (name, span) in current_names {
alias_spans.insert(name, span);
}

let right = self.plan_table_with_joins(input, planner_context)?;

left = left.cross_join(right)?;
// Update the outer FROM schema.
let left_schema = Some(Arc::clone(left.schema()));
planner_context.set_outer_from_schema(left_schema);
}
Expand Down
Loading
Loading