From fdaa32608bf620ea7dd9dfabd4d1fd329fafa3e3 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Wed, 13 May 2026 22:19:52 +0800 Subject: [PATCH 1/3] feat: impl Any for MemoryPool (#21803) ## Which issue does this PR close? - Closes #21802 ## Rationale for this change - see #21802 ## What changes are included in this PR? implement Any for MemoryPool ## Are these changes tested? Yes, one test case was added. ## Are there any user-facing changes? --------- Co-authored-by: Dmitrii Blaginin --- datafusion/execution/src/memory_pool/mod.rs | 27 +++++++++- .../library-user-guide/upgrading/54.0.0.md | 54 +++++++++++++++++++ 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/datafusion/execution/src/memory_pool/mod.rs b/datafusion/execution/src/memory_pool/mod.rs index 829e313d2381e..2b36ee7f40add 100644 --- a/datafusion/execution/src/memory_pool/mod.rs +++ b/datafusion/execution/src/memory_pool/mod.rs @@ -19,6 +19,7 @@ //! help with allocation accounting. use datafusion_common::{Result, internal_datafusion_err}; +use std::any::Any; use std::fmt::Display; use std::hash::{Hash, Hasher}; use std::{cmp::Ordering, sync::Arc, sync::atomic}; @@ -182,7 +183,7 @@ pub use pool::*; /// /// * [`TrackConsumersPool`]: Wraps another [`MemoryPool`] and tracks consumers, /// providing better error messages on the largest memory users. -pub trait MemoryPool: Send + Sync + std::fmt::Debug + Display { +pub trait MemoryPool: Any + Send + Sync + std::fmt::Debug + Display { /// Return pool name fn name(&self) -> &str; @@ -224,6 +225,18 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug + Display { } } +impl dyn MemoryPool { + /// Returns `true` if this pool is of type `T`. + pub fn is(&self) -> bool { + (self as &dyn Any).is::() + } + + /// Attempts to downcast this pool to a concrete type `T`. 
+ pub fn downcast_ref(&self) -> Option<&T> { + (self as &dyn Any).downcast_ref() + } +} + /// Memory limit of `MemoryPool` pub enum MemoryLimit { Infinite, @@ -603,6 +616,18 @@ mod tests { assert_eq!(pool.reserved(), 28); } + #[test] + fn test_downcast() { + let pool: Arc = Arc::new(GreedyMemoryPool::new(50)); + + assert!(pool.is::()); + assert!(!pool.is::()); + + let greedy: &GreedyMemoryPool = pool.downcast_ref().unwrap(); + assert_eq!(greedy.reserved(), 0); + assert!(pool.downcast_ref::().is_none()); + } + #[test] fn test_try_shrink() { let pool = Arc::new(GreedyMemoryPool::new(100)) as _; diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md index 0ba3e4eb3eaa1..46b768e8340eb 100644 --- a/docs/source/library-user-guide/upgrading/54.0.0.md +++ b/docs/source/library-user-guide/upgrading/54.0.0.md @@ -595,3 +595,57 @@ impl Default for MyTreeNode { } } ``` + +### `MemoryPool` now requires `'static` (adds `Any` as a supertrait) + +To enable downcasting of `dyn MemoryPool` to concrete pool types (via +`is::<T>()` / `downcast_ref::<T>()`), the `MemoryPool` trait now has `Any` +as a supertrait: + +```rust,ignore +// Before +pub trait MemoryPool: Send + Sync + std::fmt::Debug + Display { ... } + +// After +pub trait MemoryPool: Any + Send + Sync + std::fmt::Debug + Display { ... } +``` + +Because `Any` is only implemented for `'static` types, this implicitly adds a +`'static` bound to every `MemoryPool` implementor. + +**Who is affected:** + +- Users who implement a custom `MemoryPool` whose type carries a lifetime + parameter or borrows state (e.g. `struct MyPool<'a> { inner: &'a State }`). + Existing implementations that are already `'static` (the common case) need + no changes. + +**Migration guide:** + +Replace borrowed references with owned handles so the pool type becomes +`'static`. 
The typical fix is to swap `&'a T` for `Arc<T>` (or `Rc<T>`, or an +owned value): + +```rust,ignore +// Before — not 'static, no longer compiles +struct MyPool<'a> { + inner: &'a SomeState, +} + +impl<'a> MemoryPool for MyPool<'a> { ... } + +// After — owned handle makes MyPool: 'static +struct MyPool { + inner: Arc<SomeState>, +} + +impl MemoryPool for MyPool { ... } +``` + +If the borrowed state truly cannot be made `'static`, you can wrap the +borrowed pool in a `'static` adapter that the pool consumer owns — for +example, store the underlying state in an `Arc` owned by the adapter, or +move the borrow behind an interior-mutability primitive such as `Arc<Mutex<T>>` +or `Arc<RwLock<T>>`. + +See [PR #21803](https://github.com/apache/datafusion/pull/21803) for details. From 12bd5b076584b1eeac0e5357afce8ae6d3cd72be Mon Sep 17 00:00:00 2001 From: Emily Matheys <55631053+EmilyMatt@users.noreply.github.com> Date: Wed, 13 May 2026 17:23:32 +0300 Subject: [PATCH 2/3] mem: Cleanup resources of done streams immediately (#22064) ## Which issue does this PR close? - Closes #22063. ## Rationale for this change Reduces memory pressure, cleans up resources eagerly, and makes pools aware that operators are done by dropping their MemoryReservation and MemoryConsumers. ## What changes are included in this PR? Whenever a stream is polled and returns `None` (i.e. is depleted), that stream is dropped or replaced with an `EmptyRecordBatchStream`. ## Are these changes tested? This should have no effect on logic, as the streams are already depleted. ## Are there any user-facing changes? No. Users implementing their own memory pool can expect to see the consumer count drop whenever a stream is released, but that is well within expected behavior and should not be considered a user-facing change. 
--- .../src/aggregates/no_grouping.rs | 4 ++ .../physical-plan/src/aggregates/row_hash.rs | 10 ++++ .../src/aggregates/topk_stream.rs | 4 ++ datafusion/physical-plan/src/analyze.rs | 1 + datafusion/physical-plan/src/async_func.rs | 6 ++- .../physical-plan/src/coalesce_batches.rs | 6 +++ datafusion/physical-plan/src/filter.rs | 8 +++ .../physical-plan/src/joins/cross_join.rs | 8 ++- .../src/joins/hash_join/stream.rs | 6 +++ .../src/joins/nested_loop_join.rs | 6 +++ .../piecewise_merge_join/classic_join.rs | 4 ++ .../joins/sort_merge_join/bitwise_stream.rs | 16 ++++-- .../sort_merge_join/materializing_stream.rs | 14 +++++ .../src/joins/symmetric_hash_join.rs | 18 +++++++ .../physical-plan/src/repartition/mod.rs | 8 ++- .../physical-plan/src/sorts/partial_sort.rs | 7 +++ .../src/sorts/partitioned_topk.rs | 2 + datafusion/physical-plan/src/sorts/sort.rs | 2 + .../src/spill/replayable_spill_input.rs | 3 ++ datafusion/physical-plan/src/stream.rs | 53 +++++++++++++++++-- datafusion/physical-plan/src/unnest.rs | 10 ++++ .../src/windows/bounded_window_agg_exec.rs | 5 ++ .../src/windows/window_agg_exec.rs | 5 ++ 23 files changed, 194 insertions(+), 12 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index a7dd7c9a66cb1..ac7727b459300 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -23,6 +23,7 @@ use crate::aggregates::{ finalize_aggregation, }; use crate::metrics::{BaselineMetrics, RecordOutput}; +use crate::stream::EmptyRecordBatchStream; use crate::{RecordBatchStream, SendableRecordBatchStream}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -370,6 +371,9 @@ impl AggregateStream { Some(Err(e)) => Err(e), None => { this.finished = true; + // Release the input pipeline's resources before finalization. 
+ let input_schema = this.input.schema(); + this.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); let timer = this.baseline_metrics.elapsed_compute().timer(); let result = finalize_aggregation(&mut this.accumulators, &this.mode) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index a65aaf9134fe8..a55cf09c79b0a 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -33,6 +33,7 @@ use crate::aggregates::{ use crate::metrics::{BaselineMetrics, MetricBuilder, MetricCategory, RecordOutput}; use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; use crate::spill::spill_manager::{GetSlicedSize, SpillManager}; +use crate::stream::EmptyRecordBatchStream; use crate::{PhysicalExpr, aggregates, metrics}; use crate::{RecordBatchStream, SendableRecordBatchStream}; @@ -837,6 +838,10 @@ impl Stream for GroupedHashAggregateStream { self.group_values.len() ))); } + // Release the input pipeline's resources. + let input_schema = self.input.schema(); + self.input = + Box::pin(EmptyRecordBatchStream::new(input_schema)); self.exec_state = ExecutionState::Done; } } @@ -1320,6 +1325,11 @@ impl GroupedHashAggregateStream { fn set_input_done_and_produce_output(&mut self) -> Result<()> { self.input_done = true; self.group_ordering.input_done(); + // Release the original input pipeline's resources now that we're done + // reading from it. In the spill branch below, `self.input` is replaced + // again with a stream that merges spill files. 
+ let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); let timer = elapsed_compute.timer(); self.exec_state = if self.spill_state.spills.is_empty() { diff --git a/datafusion/physical-plan/src/aggregates/topk_stream.rs b/datafusion/physical-plan/src/aggregates/topk_stream.rs index 4aa566ccfcd0a..9128844f1d1ef 100644 --- a/datafusion/physical-plan/src/aggregates/topk_stream.rs +++ b/datafusion/physical-plan/src/aggregates/topk_stream.rs @@ -26,6 +26,7 @@ use crate::aggregates::{ evaluate_many, }; use crate::metrics::BaselineMetrics; +use crate::stream::EmptyRecordBatchStream; use crate::{RecordBatchStream, SendableRecordBatchStream}; use arrow::array::{Array, ArrayRef, RecordBatch}; use arrow::datatypes::SchemaRef; @@ -205,6 +206,9 @@ impl Stream for GroupedTopKAggregateStream { } // inner is done, emit all rows and switch to producing output None => { + // Release the input pipeline's resources before emitting. + let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); if self.priority_map.is_empty() { trace!("partition {} emit None", self.partition); return Poll::Ready(None); diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 491a0872a2f97..ea3abf439e4c1 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -214,6 +214,7 @@ impl ExecutionPlan for AnalyzeExec { while let Some(batch) = input_stream.next().await.transpose()? 
{ total_rows += batch.num_rows(); } + drop(input_stream); let duration = Instant::now() - start; create_output_batch( diff --git a/datafusion/physical-plan/src/async_func.rs b/datafusion/physical-plan/src/async_func.rs index 76a68bf5708db..8ad4ecb096962 100644 --- a/datafusion/physical-plan/src/async_func.rs +++ b/datafusion/physical-plan/src/async_func.rs @@ -17,7 +17,7 @@ use crate::coalesce::LimitedBatchCoalescer; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; -use crate::stream::RecordBatchStreamAdapter; +use crate::stream::{EmptyRecordBatchStream, RecordBatchStreamAdapter}; use crate::{ DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, check_if_same_properties, @@ -290,6 +290,10 @@ impl Stream for CoalesceInputStream { } None => { completed = true; + // Release the input pipeline's resources. + let input_schema = self.input_stream.schema(); + self.input_stream = + Box::pin(EmptyRecordBatchStream::new(input_schema)); if let Err(err) = self.batch_coalescer.finish() { return Poll::Ready(Some(Err(err))); } diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 2bf046f03b6cf..34cd770260915 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -24,6 +24,7 @@ use std::task::{Context, Poll}; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics}; use crate::projection::ProjectionExec; +use crate::stream::EmptyRecordBatchStream; use crate::{ DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, check_if_same_properties, @@ -343,6 +344,8 @@ impl CoalesceBatchesStream { None => { // Input stream is exhausted, finalize any remaining batches self.completed = true; + self.input = + Box::pin(EmptyRecordBatchStream::new(self.coalescer.schema())); self.coalescer.finish()?; } 
Some(Ok(batch)) => { @@ -353,6 +356,9 @@ impl CoalesceBatchesStream { PushBatchStatus::LimitReached => { // limit was reached, so stop early self.completed = true; + self.input = Box::pin(EmptyRecordBatchStream::new( + self.coalescer.schema(), + )); self.coalescer.finish()?; } } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 1119d1b240788..c485e181f3826 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -42,6 +42,7 @@ use crate::projection::{ EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child, try_embed_projection, update_expr, }; +use crate::stream::EmptyRecordBatchStream; use crate::{ DisplayFormatType, ExecutionPlan, metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics}, @@ -1030,6 +1031,9 @@ impl Stream for FilterExecStream { match ready!(self.input.poll_next_unpin(cx)) { None => { self.batch_coalescer.finish()?; + // Release the input pipeline's resources. + let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); // continue draining the coalescer } Some(Ok(batch)) => { @@ -1070,6 +1074,10 @@ impl Stream for FilterExecStream { PushBatchStatus::LimitReached => { // limit was reached, so stop early self.batch_coalescer.finish()?; + // Release the input pipeline's resources. 
+ let input_schema = self.input.schema(); + self.input = + Box::pin(EmptyRecordBatchStream::new(input_schema)); // continue draining the coalescer } } diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 3027fb130f087..ab66955dc6034 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -31,6 +31,7 @@ use crate::projection::{ ProjectionExec, join_allows_pushdown, join_table_borders, new_join_children, physical_to_column_exprs, }; +use crate::stream::EmptyRecordBatchStream; use crate::{ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream, @@ -645,7 +646,12 @@ impl CrossJoinStream { let right_data = match ready!(self.right.poll_next_unpin(cx)) { Some(Ok(right_data)) => right_data, Some(Err(e)) => return Poll::Ready(Err(e)), - None => return Poll::Ready(Ok(StatefulStreamResult::Ready(None))), + None => { + // Release the right (probe) input pipeline's resources. 
+ let right_schema = self.right.schema(); + self.right = Box::pin(EmptyRecordBatchStream::new(right_schema)); + return Poll::Ready(Ok(StatefulStreamResult::Ready(None))); + } }; self.join_metrics.input_batches.add(1); self.join_metrics.input_rows.add(right_data.num_rows()); diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 9885fb5c5c70a..040470c9be12b 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -35,6 +35,7 @@ use crate::joins::hash_join::shared_bounds::{ use crate::joins::utils::{ OnceFut, equal_rows_arr, get_final_indices_from_shared_bitmap, }; +use crate::stream::EmptyRecordBatchStream; use crate::{ RecordBatchStream, SendableRecordBatchStream, handle_state, hash_utils::create_hashes, @@ -587,6 +588,11 @@ impl HashJoinStream { ) -> Poll>>> { match ready!(self.right.poll_next_unpin(cx)) { None => { + // Release the probe-side input pipeline's resources. The schema + // is preserved so callers that still query `self.right.schema()` + // (e.g. for unmatched-build emission) keep working. + let right_schema = self.right.schema(); + self.right = Box::pin(EmptyRecordBatchStream::new(right_schema)); self.state = HashJoinStreamState::ExhaustedProbeSide; } Some(Ok(batch)) => { diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index db8c75b4a578b..feaf344200ac1 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -1611,6 +1611,12 @@ impl NestedLoopJoinStream { } } + // If the left stream is fully exhausted, release its resources so the + // upstream pipeline can be torn down before we move on to probing. 
+ if self.left_exhausted { + active.left_stream = None; + } + if active.pending_batches.is_empty() { // No data at all — go directly to Done self.left_exhausted = true; diff --git a/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs index da0d21f046daa..36a043cc7d16b 100644 --- a/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs +++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs @@ -39,6 +39,7 @@ use crate::joins::piecewise_merge_join::exec::{BufferedSide, BufferedSideReadySt use crate::joins::piecewise_merge_join::utils::need_produce_result_in_final; use crate::joins::utils::{BuildProbeJoinMetrics, StatefulStreamResult}; use crate::joins::utils::{JoinKeyComparator, get_final_indices_from_shared_bitmap}; +use crate::stream::EmptyRecordBatchStream; pub(super) enum PiecewiseMergeJoinStreamState { WaitBufferedSide, @@ -212,6 +213,9 @@ impl ClassicPWMJStream { ) -> Poll>>> { match ready!(self.streamed.poll_next_unpin(cx)) { None => { + // Release the streamed input pipeline's resources. + let streamed_schema = self.streamed.schema(); + self.streamed = Box::pin(EmptyRecordBatchStream::new(streamed_schema)); if self .buffered_side .try_as_ready_mut()? 
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs index 3b409c98b2cf4..ad7312426bd18 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs @@ -125,12 +125,12 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::RecordBatchStream; use crate::joins::utils::{JoinFilter, JoinKeyComparator, compare_join_arrays}; use crate::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, }; use crate::spill::spill_manager::SpillManager; +use crate::{EmptyRecordBatchStream, RecordBatchStream}; use arrow::array::{Array, ArrayRef, BooleanArray, BooleanBufferBuilder, RecordBatch}; use arrow::compute::{BatchCoalescer, SortOptions, filter_record_batch, not}; use arrow::datatypes::SchemaRef; @@ -475,7 +475,12 @@ impl BitwiseSortMergeJoinStream { fn poll_next_outer_batch(&mut self, cx: &mut Context<'_>) -> Poll> { loop { match ready!(self.outer.poll_next_unpin(cx)) { - None => return Poll::Ready(Ok(false)), + None => { + // Release the outer input pipeline's resources. + let outer_schema = self.outer.schema(); + self.outer = Box::pin(EmptyRecordBatchStream::new(outer_schema)); + return Poll::Ready(Ok(false)); + } Some(Err(e)) => return Poll::Ready(Err(e)), Some(Ok(batch)) => { let batch_num_rows = batch.num_rows(); @@ -503,7 +508,12 @@ impl BitwiseSortMergeJoinStream { fn poll_next_inner_batch(&mut self, cx: &mut Context<'_>) -> Poll> { loop { match ready!(self.inner.poll_next_unpin(cx)) { - None => return Poll::Ready(Ok(false)), + None => { + // Release the inner input pipeline's resources. 
+ let inner_schema = self.inner.schema(); + self.inner = Box::pin(EmptyRecordBatchStream::new(inner_schema)); + return Poll::Ready(Ok(false)); + } Some(Err(e)) => return Poll::Ready(Err(e)), Some(Ok(batch)) => { let batch_num_rows = batch.num_rows(); diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs index 069e94d0a9fd6..5d23046ec7726 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs @@ -41,6 +41,7 @@ use crate::joins::sort_merge_join::metrics::SortMergeJoinMetrics; use crate::joins::utils::{JoinFilter, JoinKeyComparator}; use crate::metrics::RecordOutput; use crate::spill::spill_manager::SpillManager; +use crate::stream::EmptyRecordBatchStream; use crate::{PhysicalExpr, RecordBatchStream, SendableRecordBatchStream}; use arrow::array::{types::UInt64Type, *}; @@ -935,6 +936,10 @@ impl MaterializingSortMergeJoinStream { return Poll::Pending; } Poll::Ready(None) => { + // Release the streamed input pipeline's resources. + let streamed_schema = self.streamed.schema(); + self.streamed = + Box::pin(EmptyRecordBatchStream::new(streamed_schema)); self.streamed_state = StreamedState::Exhausted; } Poll::Ready(Some(batch)) => { @@ -1063,6 +1068,10 @@ impl MaterializingSortMergeJoinStream { return Poll::Pending; } Poll::Ready(None) => { + // Release the buffered input pipeline's resources. + let buffered_schema = self.buffered.schema(); + self.buffered = + Box::pin(EmptyRecordBatchStream::new(buffered_schema)); self.buffered_state = BufferedState::Exhausted; return Poll::Ready(None); } @@ -1106,6 +1115,11 @@ impl MaterializingSortMergeJoinStream { return Poll::Pending; } Poll::Ready(None) => { + // Release the buffered input pipeline's resources. 
+ let buffered_schema = self.buffered.schema(); + self.buffered = Box::pin(EmptyRecordBatchStream::new( + buffered_schema, + )); self.buffered_state = BufferedState::Ready; } Poll::Ready(Some(batch)) => { diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 11e036434ee97..34af88ea4027b 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -50,6 +50,7 @@ use crate::projection::{ ProjectionExec, join_allows_pushdown, join_table_borders, new_join_children, physical_to_column_exprs, update_join_filter, update_join_on, }; +use crate::stream::EmptyRecordBatchStream; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, @@ -1406,6 +1407,19 @@ impl SymmetricHashJoinStream { } } } + + /// Release the right input pipeline's resources. + fn cleanup_depleted_right_stream(&mut self) { + let right_schema = self.right_stream.schema(); + self.right_stream = Box::pin(EmptyRecordBatchStream::new(right_schema)); + } + + /// Release the left input pipeline's resources. + fn cleanup_depleted_left_stream(&mut self) { + let left_schema = self.left_stream.schema(); + self.left_stream = Box::pin(EmptyRecordBatchStream::new(left_schema)); + } + /// Asynchronously pulls the next batch from the right stream. /// /// This default implementation checks for the next value in the right stream. 
@@ -1429,6 +1443,7 @@ impl SymmetricHashJoinStream { } Some(Err(e)) => Poll::Ready(Err(e)), None => { + self.cleanup_depleted_right_stream(); self.set_state(SHJStreamState::RightExhausted); Poll::Ready(Ok(StatefulStreamResult::Continue)) } @@ -1458,6 +1473,7 @@ impl SymmetricHashJoinStream { } Some(Err(e)) => Poll::Ready(Err(e)), None => { + self.cleanup_depleted_left_stream(); self.set_state(SHJStreamState::LeftExhausted); Poll::Ready(Ok(StatefulStreamResult::Continue)) } @@ -1487,6 +1503,7 @@ impl SymmetricHashJoinStream { } Some(Err(e)) => Poll::Ready(Err(e)), None => { + self.cleanup_depleted_left_stream(); self.set_state(SHJStreamState::BothExhausted { final_result: false, }); @@ -1518,6 +1535,7 @@ impl SymmetricHashJoinStream { } Some(Err(e)) => Poll::Ready(Err(e)), None => { + self.cleanup_depleted_right_stream(); self.set_state(SHJStreamState::BothExhausted { final_result: false, }); diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index b4af6e2c09a5c..873ba8a5ee487 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -38,7 +38,7 @@ use crate::projection::{ProjectionExec, all_columns, make_with_child, update_exp use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::spill::spill_manager::SpillManager; use crate::spill::spill_pool::{self, SpillPoolWriter}; -use crate::stream::RecordBatchStreamAdapter; +use crate::stream::{EmptyRecordBatchStream, RecordBatchStreamAdapter}; use crate::{ DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics, check_if_same_properties, @@ -1757,7 +1757,11 @@ impl PerPartitionStream { return Poll::Ready(Some(Err(e))); } Poll::Ready(None) => { - // Spill stream ended, keep draining the memory channel + // Spill stream ended — release its resources before + // we go back to draining the memory channel. 
+ let spill_schema = self.spill_stream.schema(); + self.spill_stream = + Box::pin(EmptyRecordBatchStream::new(spill_schema)); self.state = StreamState::ReadingMemory; } Poll::Pending => { diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 28b8745235918..abd9ebb142a66 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -58,6 +58,7 @@ use std::task::{Context, Poll}; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::sorts::sort::sort_batch; +use crate::stream::EmptyRecordBatchStream; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, @@ -403,6 +404,9 @@ impl PartialSortStream { // Check if we've already reached the fetch limit if self.fetch == Some(0) { self.is_closed = true; + // Release the input pipeline's resources. + let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); return Poll::Ready(None); } @@ -436,6 +440,9 @@ impl PartialSortStream { Some(Err(e)) => return Poll::Ready(Some(Err(e))), None => { self.is_closed = true; + // Release the input pipeline's resources before sorting. 
+ let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); // Once input is consumed, sort the rest of the inserted batches let remaining_batch = self.sort_in_mem_batch()?; return if remaining_batch.num_rows() > 0 { diff --git a/datafusion/physical-plan/src/sorts/partitioned_topk.rs b/datafusion/physical-plan/src/sorts/partitioned_topk.rs index f5d47f503bf9b..f4c2585ea790d 100644 --- a/datafusion/physical-plan/src/sorts/partitioned_topk.rs +++ b/datafusion/physical-plan/src/sorts/partitioned_topk.rs @@ -495,6 +495,8 @@ async fn do_partitioned_topk( topk.insert_batch(sub_batch)?; } } + // Release the input pipeline now that accumulation is complete. + drop(input); // ---------- Emit phase ---------- // Sort partition keys so output is ordered by (partition_keys, order_keys). diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 6c02af8dec6d3..044580a86f7f3 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1240,6 +1240,7 @@ impl ExecutionPlan for SortExec { break; } } + drop(input); topk.emit() }) .try_flatten(), @@ -1264,6 +1265,7 @@ impl ExecutionPlan for SortExec { let batch = batch?; sorter.insert_batch(batch).await?; } + drop(input); sorter.sort().await }) .try_flatten(), diff --git a/datafusion/physical-plan/src/spill/replayable_spill_input.rs b/datafusion/physical-plan/src/spill/replayable_spill_input.rs index ddef15a639183..fea998d268c59 100644 --- a/datafusion/physical-plan/src/spill/replayable_spill_input.rs +++ b/datafusion/physical-plan/src/spill/replayable_spill_input.rs @@ -282,6 +282,9 @@ impl Stream for ReplayableSpillStream { } // The stream is exhausted, give the inner state ownership back to `ReplayableStreamSource` Poll::Ready(None) => { + // Release the input pipeline's resources. 
+ let inner_schema = this.inner.schema(); + this.inner = Box::pin(EmptyRecordBatchStream::new(inner_schema)); if let Some(spill_file) = this.spill_file.as_mut() { match spill_file.finish() { Ok(file) => { diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs index 9139a6dd04799..9d0b964886afd 100644 --- a/datafusion/physical-plan/src/stream.rs +++ b/datafusion/physical-plan/src/stream.rs @@ -411,8 +411,11 @@ pin_project! { pub struct RecordBatchStreamAdapter { schema: SchemaRef, + // Wrapped in Option so we can drop the inner stream as soon as it + // returns `None`, releasing any upstream pipeline resources before the + // adapter itself is dropped. #[pin] - stream: S, + stream: Option, } } @@ -441,7 +444,10 @@ impl RecordBatchStreamAdapter { /// // ... /// ``` pub fn new(schema: SchemaRef, stream: S) -> Self { - Self { schema, stream } + Self { + schema, + stream: Some(stream), + } } } @@ -460,11 +466,29 @@ where type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().stream.poll_next(cx) + let mut this = self.project(); + let Some(inner) = this.stream.as_mut().as_pin_mut() else { + return Poll::Ready(None); + }; + let item = ready!(inner.poll_next(cx)); + if item.is_none() { + // Drop the inner stream in place to release its resources. + // SAFETY: the inner stream is dropped without moving it out of + // its pinned location; assigning `None` only runs the inner + // value's destructor in place, which is permitted for pinned + // values. 
+ unsafe { + *this.stream.as_mut().get_unchecked_mut() = None; + } + } + Poll::Ready(item) } fn size_hint(&self) -> (usize, Option) { - self.stream.size_hint() + match self.stream.as_ref() { + Some(stream) => stream.size_hint(), + None => (0, Some(0)), + } } } @@ -538,6 +562,7 @@ impl ObservedStream { let Some(fetch) = self.fetch else { return poll }; if self.produced >= fetch { + self.release_inner(); return Poll::Ready(None); } @@ -545,12 +570,22 @@ impl ObservedStream { if self.produced + batch.num_rows() > fetch { let batch = batch.slice(0, fetch.saturating_sub(self.produced)); self.produced += batch.num_rows(); + if self.produced >= fetch { + self.release_inner(); + } return Poll::Ready(Some(Ok(batch))); }; self.produced += batch.num_rows() } poll } + + /// Replace the inner stream with an [`EmptyRecordBatchStream`], dropping + /// the original stream so its upstream pipeline can be torn down. + fn release_inner(&mut self) { + let schema = self.inner.schema(); + self.inner = Box::pin(EmptyRecordBatchStream::new(schema)); + } } impl RecordBatchStream for ObservedStream { @@ -678,7 +713,12 @@ impl BatchSplitStream { } } Some(Err(e)) => Poll::Ready(Some(Err(e))), - None => Poll::Ready(None), + None => { + // Release the input pipeline's resources. + let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); + Poll::Ready(None) + } } } } @@ -751,6 +791,9 @@ impl Stream for ReservationStream { None => { // Stream is done so free the reservation completely self.reservation.free(); + // Release the input pipeline's resources. 
+ let inner_schema = self.inner.schema(); + self.inner = Box::pin(EmptyRecordBatchStream::new(inner_schema)); Poll::Ready(None) } } diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index c774ff09af33c..3a4b9d7232f4d 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -26,6 +26,7 @@ use super::metrics::{ MetricsSet, RecordOutput, }; use super::{DisplayAs, ExecutionPlanProperties, PlanProperties}; +use crate::stream::EmptyRecordBatchStream; use crate::{ DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, check_if_same_properties, @@ -382,6 +383,7 @@ impl UnnestStream { debug_assert!(result_batch.num_rows() > 0); Some(Ok(result_batch)) } + // If the stream is depleted or returned an error, log the finish message: other => { trace!( "Processed {} probe-side input batches containing {} rows and \ @@ -392,6 +394,14 @@ impl UnnestStream { self.metrics.baseline_metrics.output_rows(), self.metrics.baseline_metrics.elapsed_compute(), ); + + // In the non-error case, i.e., input is simply depleted: + if other.is_none() { + // Release the input pipeline's resources. 
+ let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); + } + other } }); diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 14f8ce5e95ffd..f442bcea94be2 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -28,6 +28,7 @@ use std::task::{Context, Poll}; use super::utils::create_schema; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use crate::stream::EmptyRecordBatchStream; use crate::windows::{ calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs, window_equivalence_properties, @@ -1083,6 +1084,10 @@ impl BoundedWindowAggStream { let _timer = elapsed_compute.timer(); self.finished = true; + // Release the input pipeline's resources before computing the + // final aggregates. + let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); for (_, partition_batch_state) in self.partition_buffers.iter_mut() { partition_batch_state.is_end = true; } diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index 5098c84034062..9e8fc8a6ebb62 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -24,6 +24,7 @@ use std::task::{Context, Poll}; use super::utils::create_schema; use crate::execution_plan::{CardinalityEffect, EmissionType}; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use crate::stream::EmptyRecordBatchStream; use crate::windows::{ calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs, window_equivalence_properties, @@ -444,6 +445,10 @@ impl WindowAggStream { } Some(Err(e)) => Err(e), None => { + // Release 
the input pipeline's resources before computing + // the final aggregates. + let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); let Some(result) = self.compute_aggregates()? else { return Poll::Ready(None); }; From 0c4ace8b77461cac3538d38b1122a7dbf08c4d4f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 May 2026 08:24:19 -0600 Subject: [PATCH 3/3] feat: Upgrade to sqlparser-rs 0.62.0 (#22069) ## Which issue does this PR close? N/A ## Rationale for this change Dependency update ## What changes are included in this PR? ## Are these changes tested? ## Are there any user-facing changes? --- Cargo.lock | 4 +- Cargo.toml | 2 +- datafusion/expr/src/expr.rs | 4 +- datafusion/expr/src/sql.rs | 35 +++++- datafusion/expr/src/utils.rs | 33 ++++- datafusion/sql/src/expr/function.rs | 15 ++- datafusion/sql/src/expr/mod.rs | 8 +- datafusion/sql/src/query.rs | 1 + datafusion/sql/src/select.rs | 6 + datafusion/sql/src/statement.rs | 116 ++++++++++++++++-- datafusion/sql/src/unparser/ast.rs | 6 +- datafusion/sql/src/unparser/expr.rs | 12 +- datafusion/sql/src/unparser/plan.rs | 3 + datafusion/sql/src/values.rs | 3 +- .../test_files/array/array_transform.slt | 6 +- 15 files changed, 213 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d3a6442596390..40d920459719c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5737,9 +5737,9 @@ dependencies = [ [[package]] name = "sqlparser" -version = "0.61.0" +version = "0.62.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbf5ea8d4d7c808e1af1cbabebca9a2abe603bcefc22294c5b95018d53200cb7" +checksum = "13c6d1b651dc4edf07eead2a0c6c78016ce971bc2c10da5266861b13f25e7cec" dependencies = [ "log", "recursive", diff --git a/Cargo.toml b/Cargo.toml index 2a573d80b3aa3..78c271d524fb8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -192,7 +192,7 @@ regex = "1.12" rstest = "0.26.1" serde_json = "1" sha2 = "^0.11.0" -sqlparser = { version = "0.61.0", 
default-features = false, features = ["std", "visitor"] } +sqlparser = { version = "0.62.0", default-features = false, features = ["std", "visitor"] } strum = "0.28.0" strum_macros = "0.28.0" tempfile = "3" diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 8c58e8f709600..36ef6cf1f5ba9 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -4014,8 +4014,8 @@ mod test { wildcard_with_options(wildcard_options( None, Some(ExcludeSelectItem::Multiple(vec![ - Ident::from("c1"), - Ident::from("c2") + Ident::from("c1").into(), + Ident::from("c2").into() ])), None, None, diff --git a/datafusion/expr/src/sql.rs b/datafusion/expr/src/sql.rs index c9c3fa533614e..d582a0f6b95d1 100644 --- a/datafusion/expr/src/sql.rs +++ b/datafusion/expr/src/sql.rs @@ -45,8 +45,8 @@ impl Display for IlikeSelectItem { #[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)] pub enum ExcludeSelectItem { - Single(Ident), - Multiple(Vec), + Single(ObjectName), + Multiple(Vec), } impl Display for ExcludeSelectItem { @@ -64,6 +64,37 @@ impl Display for ExcludeSelectItem { } } +#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)] +pub struct ObjectName(pub Vec); + +impl Display for ObjectName { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + let parts: Vec = self.0.iter().map(|p| format!("{p}")).collect(); + write!(f, "{}", parts.join(".")) + } +} + +#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)] +pub enum ObjectNamePart { + Identifier(Ident), +} + +impl ObjectNamePart { + pub fn as_ident(&self) -> Option<&Ident> { + match self { + ObjectNamePart::Identifier(ident) => Some(ident), + } + } +} + +impl Display for ObjectNamePart { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + match self { + ObjectNamePart::Identifier(ident) => write!(f, "{ident}"), + } + } +} + #[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)] pub struct ExceptSelectItem { pub first_element: Ident, diff --git a/datafusion/expr/src/utils.rs 
b/datafusion/expr/src/utils.rs index 659306d3bf625..22abb454d4e6b 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -39,10 +39,10 @@ use datafusion_common::{ }; #[cfg(not(feature = "sql"))] -use crate::sql::{ExceptSelectItem, ExcludeSelectItem}; +use crate::sql::{ExceptSelectItem, ExcludeSelectItem, Ident, ObjectName}; use indexmap::IndexSet; #[cfg(feature = "sql")] -use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem}; +use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem, Ident, ObjectName}; pub use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity; @@ -339,11 +339,32 @@ fn get_excluded_columns( idents.push(&excepts.first_element); idents.extend(&excepts.additional_elements); } + // Declared outside the `if let` so `idents.extend(exclude_owned.iter())` + // below can borrow references that outlive the inner scope. + let exclude_owned: Vec; if let Some(exclude) = opt_exclude { - match exclude { - ExcludeSelectItem::Single(ident) => idents.push(ident), - ExcludeSelectItem::Multiple(idents_inner) => idents.extend(idents_inner), - } + let object_name_to_ident = |name: &ObjectName| -> Result { + if name.0.len() != 1 { + return plan_err!( + "EXCLUDE with multi-part identifiers is not supported: {name}" + ); + } + let part = &name.0[0]; + let Some(ident) = part.as_ident() else { + return plan_err!( + "EXCLUDE with non-identifier name part is not supported: {part}" + ); + }; + Ok(ident.clone()) + }; + exclude_owned = match exclude { + ExcludeSelectItem::Single(name) => vec![object_name_to_ident(name)?], + ExcludeSelectItem::Multiple(names) => names + .iter() + .map(object_name_to_ident) + .collect::>>()?, + }; + idents.extend(exclude_owned.iter()); } // Excluded columns should be unique let n_elem = idents.len(); diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index 1790e66a027bb..67abb8b822063 100644 --- a/datafusion/sql/src/expr/function.rs +++ 
b/datafusion/sql/src/expr/function.rs @@ -927,10 +927,15 @@ impl SqlToRel<'_, S> { ); } + if lambda.params.iter().any(|p| p.data_type.is_some()) { + return not_impl_err!( + "Lambda parameters with explicit data types are not supported" + ); + } let params = lambda .params .iter() - .map(|p| crate::utils::normalize_ident(p.clone())) + .map(|p| crate::utils::normalize_ident(p.name.clone())) .collect(); let lambda_parameters = std::iter::zip(lambda_params, &params) @@ -1181,19 +1186,19 @@ impl SqlToRel<'_, S> { /// After normalization with [normalize_ident], check whether all params are unique /// /// [normalize_ident]: crate::utils::normalize_ident -fn all_unique(params: &[sqlparser::ast::Ident]) -> bool { +fn all_unique(params: &[sqlparser::ast::LambdaFunctionParameter]) -> bool { match params.len() { 0 | 1 => true, 2 => { - crate::utils::normalize_ident(params[0].clone()) - != crate::utils::normalize_ident(params[1].clone()) + crate::utils::normalize_ident(params[0].name.clone()) + != crate::utils::normalize_ident(params[1].name.clone()) } _ => { let mut set = HashSet::with_capacity(params.len()); params .iter() - .map(|p| crate::utils::normalize_ident(p.clone())) + .map(|p| crate::utils::normalize_ident(p.name.clone())) .all(|p| set.insert(p)) } } diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index 5cbc1c84bdb4b..daf092ecd4cf9 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -900,7 +900,7 @@ impl SqlToRel<'_, S> { negated: bool, expr: SQLExpr, pattern: SQLExpr, - escape_char: Option, + escape_char: Option, schema: &DFSchema, planner_context: &mut PlannerContext, case_insensitive: bool, @@ -910,7 +910,7 @@ impl SqlToRel<'_, S> { return not_impl_err!("ANY in LIKE expression"); } let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?; - let escape_char = match escape_char { + let escape_char = match escape_char.map(|v| v.value) { Some(Value::SingleQuotedString(char)) if char.len()
== 1 => { Some(char.chars().next().unwrap()) } @@ -935,7 +935,7 @@ impl SqlToRel<'_, S> { negated: bool, expr: SQLExpr, pattern: SQLExpr, - escape_char: Option, + escape_char: Option, schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { @@ -944,7 +944,7 @@ impl SqlToRel<'_, S> { if pattern_type != DataType::Utf8 && pattern_type != DataType::Null { return plan_err!("Invalid pattern in SIMILAR TO expression"); } - let escape_char = match escape_char { + let escape_char = match escape_char.map(|v| v.value) { Some(Value::SingleQuotedString(char)) if char.len() == 1 => { Some(char.chars().next().unwrap()) } diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index e320d2ee6e9c1..76124cbc7eb59 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -171,6 +171,7 @@ impl SqlToRel<'_, S> { // Apply to all fields columns: vec![], explicit: true, + at: None, }, ), PipeOperator::Union { diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 09d8566c4a19e..b7f7d80e70815 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -839,6 +839,9 @@ impl SqlToRel<'_, S> { Ok(SelectExpr::Expression(expr)) } + SelectItem::ExprWithAliases { .. 
} => { + not_impl_err!("SELECT item with multiple aliases is not supported") + } SelectItem::Wildcard(options) => { Self::check_wildcard_options(&options)?; if empty_from { @@ -886,11 +889,14 @@ impl SqlToRel<'_, S> { opt_rename, opt_replace: _opt_replace, opt_ilike: _opt_ilike, + opt_alias, wildcard_token: _wildcard_token, } = options; if opt_rename.is_some() { not_impl_err!("wildcard * with RENAME not supported ") + } else if opt_alias.is_some() { + not_impl_err!("wildcard * with AS alias not supported") } else { Ok(()) } diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 587ed02d13188..e7088c8a3d6f1 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -344,6 +344,12 @@ impl SqlToRel<'_, S> { require_user, partition_of, for_values, + snapshot, + with_storage_lifecycle_policy, + diststyle, + distkey, + sortkey, + backup, }) => { if temporary { return not_impl_err!("Temporary tables not supported"); @@ -497,6 +503,24 @@ impl SqlToRel<'_, S> { if for_values.is_some() { return not_impl_err!("PARTITION OF .. FOR VALUES .. 
not supported"); } + if snapshot { + return not_impl_err!("Snapshot tables not supported"); + } + if with_storage_lifecycle_policy.is_some() { + return not_impl_err!("WITH STORAGE LIFECYCLE POLICY not supported"); + } + if diststyle.is_some() { + return not_impl_err!("DISTSTYLE not supported"); + } + if distkey.is_some() { + return not_impl_err!("DISTKEY not supported"); + } + if sortkey.is_some() { + return not_impl_err!("SORTKEY not supported"); + } + if backup.is_some() { + return not_impl_err!("BACKUP not supported"); + } // Merge inline constraints and existing constraints let mut all_constraints = constraints; let inline_constraints = calc_inline_constraints_from_columns(&columns); @@ -604,6 +628,7 @@ impl SqlToRel<'_, S> { or_alter, secure, name_before_not_exists, + copy_grants, }) => { if materialized { return not_impl_err!("Materialized views not supported")?; @@ -623,6 +648,9 @@ impl SqlToRel<'_, S> { if to.is_some() { return not_impl_err!("To not supported")?; } + if copy_grants { + return not_impl_err!("COPY GRANTS not supported")?; + } // put the statement back together temporarily to get the SQL // string representation @@ -643,6 +671,7 @@ impl SqlToRel<'_, S> { or_alter, secure, name_before_not_exists, + copy_grants, }); let sql = stmt.to_string(); let Statement::CreateView(ast::CreateView { @@ -1000,7 +1029,12 @@ impl SqlToRel<'_, S> { settings, format_clause, insert_token: _, // record the location the `INSERT` token - optimizer_hint, + optimizer_hints, + output, + multi_table_insert_type, + multi_table_into_clauses, + multi_table_when_clauses, + multi_table_else_clause, }) => { let table_name = match table { TableObject::TableName(table_name) => table_name, @@ -1009,6 +1043,11 @@ impl SqlToRel<'_, S> { "INSERT INTO Table functions not supported" ); } + TableObject::TableQuery(_) => { + return not_impl_err!( + "INSERT INTO subquery target not supported" + ); + } }; if let Some(or) = or { match or { @@ -1056,9 +1095,19 @@ impl SqlToRel<'_, S> { if 
format_clause.is_some() { plan_err!("Inserts with format clause not supported")?; } - if optimizer_hint.is_some() { + if !optimizer_hints.is_empty() { plan_err!("Optimizer hints not supported")?; } + if output.is_some() { + plan_err!("Insert OUTPUT clause not supported")?; + } + if multi_table_insert_type.is_some() + || !multi_table_into_clauses.is_empty() + || !multi_table_when_clauses.is_empty() + || multi_table_else_clause.is_some() + { + plan_err!("Multi-table INSERT not supported")?; + } // optional keywords don't change behavior let _ = into; let _ = has_table_keyword; @@ -1073,7 +1122,9 @@ impl SqlToRel<'_, S> { or, limit, update_token: _, - optimizer_hint, + optimizer_hints, + output, + order_by, }) => { let from_clauses = from.map(|update_table_from_kind| match update_table_from_kind { @@ -1103,9 +1154,15 @@ impl SqlToRel<'_, S> { if limit.is_some() { return not_impl_err!("Update-limit clause not supported")?; } - if optimizer_hint.is_some() { + if !optimizer_hints.is_empty() { plan_err!("Optimizer hints not supported")?; } + if output.is_some() { + plan_err!("Update OUTPUT clause not supported")?; + } + if !order_by.is_empty() { + plan_err!("Update ORDER BY not supported")?; + } self.update_to_plan(table, &assignments, update_from, selection) } @@ -1118,7 +1175,8 @@ impl SqlToRel<'_, S> { order_by, limit, delete_token: _, - optimizer_hint, + optimizer_hints, + output, }) => { if !tables.is_empty() { plan_err!("DELETE not supported")?; @@ -1136,9 +1194,12 @@ impl SqlToRel<'_, S> { plan_err!("Delete-order-by clause not yet supported")?; } - if optimizer_hint.is_some() { + if !optimizer_hints.is_empty() { plan_err!("Optimizer hints not supported")?; } + if output.is_some() { + plan_err!("Delete OUTPUT clause not supported")?; + } let table_name = self.get_delete_target(from)?; self.delete_to_plan(&table_name, selection, limit) @@ -1260,7 +1321,14 @@ impl SqlToRel<'_, S> { .. 
}) => { let return_type = match return_type { - Some(t) => Some(self.convert_data_type_to_field(&t)?), + Some(ast::FunctionReturnType::DataType(t)) => { + Some(self.convert_data_type_to_field(&t)?) + } + Some(ast::FunctionReturnType::SetOf(_)) => { + return not_impl_err!( + "RETURNS SETOF in CREATE FUNCTION is not supported" + ); + } None => None, }; let mut planner_context = PlannerContext::new(); @@ -1882,6 +1950,16 @@ impl SqlToRel<'_, S> { TableConstraint::FulltextOrSpatial { .. } => { _plan_err!("Indexes are not currently supported") } + TableConstraint::PrimaryKeyUsingIndex(_) => { + _plan_err!( + "PRIMARY KEY USING INDEX constraints are not currently supported" + ) + } + TableConstraint::UniqueUsingIndex(_) => { + _plan_err!( + "UNIQUE USING INDEX constraints are not currently supported" + ) + } }) .collect::>>()?; Ok(Constraints::new_unverified(constraints)) @@ -2276,7 +2354,7 @@ impl SqlToRel<'_, S> { fn insert_to_plan( &self, table_name: ObjectName, - columns: Vec, + columns: Vec, source: Box, overwrite: bool, replace_into: bool, @@ -2286,6 +2364,24 @@ impl SqlToRel<'_, S> { let table_source = self.context_provider.get_table_source(table_name.clone())?; let table_schema = DFSchema::try_from(table_source.schema())?; + let columns: Vec = columns + .into_iter() + .map(|name| { + if name.0.len() != 1 { + return not_impl_err!( + "Multi-part column names in INSERT not supported: {name}" + ); + } + let part = &name.0[0]; + let Some(ident) = part.as_ident() else { + return not_impl_err!( + "Non-identifier column name part in INSERT not supported: {part}" + ); + }; + Ok(ident.clone()) + }) + .collect::>>()?; + // Get insert fields and target table's value indices // // If value_indices[i] = Some(j), it means that the value of the i-th target table's column is @@ -2329,7 +2425,7 @@ impl SqlToRel<'_, S> { let mut prepare_param_data_types = BTreeMap::new(); if let SetExpr::Values(ast::Values { rows, .. 
}) = (*source.body).clone() { for row in rows.iter() { - for (idx, val) in row.iter().enumerate() { + for (idx, val) in row.content.iter().enumerate() { if let SQLExpr::Value(ValueWithSpan { value: Value::Placeholder(name), span: _, @@ -2581,7 +2677,7 @@ ON p.function_name = r.routine_name None => Ok(()), // BEGIN TRANSACTION Some(BeginTransactionKind::Transaction) => Ok(()), - Some(BeginTransactionKind::Work) => { + Some(BeginTransactionKind::Work) | Some(BeginTransactionKind::Tran) => { not_impl_err!("Transaction kind not supported: {kind:?}") } } diff --git a/datafusion/sql/src/unparser/ast.rs b/datafusion/sql/src/unparser/ast.rs index d8d5ec9e409fc..4b4e56c40cdc5 100644 --- a/datafusion/sql/src/unparser/ast.rs +++ b/datafusion/sql/src/unparser/ast.rs @@ -358,7 +358,7 @@ impl SelectBuilder { } pub fn build(&self) -> Result { Ok(ast::Select { - optimizer_hint: None, + optimizer_hints: vec![], distinct: self.distinct.clone(), select_modifiers: None, top_before_distinct: false, @@ -477,6 +477,9 @@ pub struct RelationBuilder { } #[derive(Clone)] +// Boxing variants would penalize the common builder path; this enum is +// constructed-then-consumed locally rather than stored at scale. 
+#[expect(clippy::large_enum_variant)] enum TableFactorBuilder { Table(TableRelationBuilder), Derived(DerivedRelationBuilder), @@ -794,6 +797,7 @@ impl FlattenRelationBuilder { lateral: true, name: ast::ObjectName::from(vec![ast::Ident::new("FLATTEN")]), args, + with_ordinality: false, alias: self.alias.clone(), }) } diff --git a/datafusion/sql/src/unparser/expr.rs b/datafusion/sql/src/unparser/expr.rs index 2124d5739c0cd..dde383ea26c39 100644 --- a/datafusion/sql/src/unparser/expr.rs +++ b/datafusion/sql/src/unparser/expr.rs @@ -317,7 +317,8 @@ impl Unparser<'_> { negated: *negated, expr: Box::new(self.expr_to_sql_inner(expr)?), pattern: Box::new(self.expr_to_sql_inner(pattern)?), - escape_char: escape_char.map(|c| SingleQuotedString(c.to_string())), + escape_char: escape_char + .map(|c| SingleQuotedString(c.to_string()).into()), any: false, }), Expr::Like(Like { @@ -333,7 +334,7 @@ impl Unparser<'_> { expr: Box::new(self.expr_to_sql_inner(expr)?), pattern: Box::new(self.expr_to_sql_inner(pattern)?), escape_char: escape_char - .map(|c| SingleQuotedString(c.to_string())), + .map(|c| SingleQuotedString(c.to_string()).into()), any: false, }) } else { @@ -342,7 +343,7 @@ impl Unparser<'_> { expr: Box::new(self.expr_to_sql_inner(expr)?), pattern: Box::new(self.expr_to_sql_inner(pattern)?), escape_char: escape_char - .map(|c| SingleQuotedString(c.to_string())), + .map(|c| SingleQuotedString(c.to_string()).into()), any: false, }) } @@ -598,7 +599,10 @@ impl Unparser<'_> { params: ast::OneOrManyWithParens::Many( params .iter() - .map(|param| self.new_ident_quoted_if_needs(param.clone())) + .map(|param| ast::LambdaFunctionParameter { + name: self.new_ident_quoted_if_needs(param.clone()), + data_type: None, + }) .collect(), ), body: Box::new(self.expr_to_sql_inner(body)?), diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 2c36fe0b2c98a..43d7ef49d9453 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ 
b/datafusion/sql/src/unparser/plan.rs @@ -434,6 +434,7 @@ impl Unparser<'_> { name: Ident::with_quote('"', &flatten_alias_name), columns: vec![], explicit: true, + at: None, })); if !select.already_projected() { @@ -1208,6 +1209,7 @@ impl Unparser<'_> { name: Ident::with_quote('"', &alias), columns: vec![], explicit: true, + at: None, })); } relation.flatten(flatten_relation); @@ -1902,6 +1904,7 @@ impl Unparser<'_> { name: self.new_ident_quoted_if_needs(alias), columns, explicit: true, + at: None, } } diff --git a/datafusion/sql/src/values.rs b/datafusion/sql/src/values.rs index c8cdf1254f33f..a1df1fe1b18d1 100644 --- a/datafusion/sql/src/values.rs +++ b/datafusion/sql/src/values.rs @@ -43,7 +43,8 @@ impl SqlToRel<'_, S> { let values = rows .into_iter() .map(|row| { - row.into_iter() + row.content + .into_iter() .map(|v| self.sql_to_expr(v, &empty_schema, planner_context)) .collect::>>() }) diff --git a/datafusion/sqllogictest/test_files/array/array_transform.slt b/datafusion/sqllogictest/test_files/array/array_transform.slt index f726d265d9d63..f87253695d332 100644 --- a/datafusion/sqllogictest/test_files/array/array_transform.slt +++ b/datafusion/sqllogictest/test_files/array/array_transform.slt @@ -411,13 +411,13 @@ SELECT array_transform([1, 2], (e, i, j) -> i); query error DataFusion error: Error during planning: lambda parameters names must be unique, got \(v, v\) SELECT array_transform([1], (v, v) -> v*2); -query error DataFusion error: This feature is not implemented: Unsupported ast node in sqltorel: Lambda\(LambdaFunction \{ params: One\(Ident \{ value: "v", quote_style: None, span: Span\(Location\(1,12\)\.\.Location\(1,13\)\) \}\), body: Identifier\(Ident \{ value: "v", quote_style: None, span: Span\(Location\(1,17\)\.\.Location\(1,18\)\) \}\), syntax: Arrow \}\) +query error DataFusion error: This feature is not implemented: Unsupported ast node in sqltorel: Lambda\(LambdaFunction \{ params: One\(LambdaFunctionParameter \{ name: Ident \{ value: "v", 
quote_style: None, span: Span\(Location\(1,12\)\.\.Location\(1,13\)\) \}, data_type: None \}\), body: Identifier\(Ident \{ value: "v", quote_style: None, span: Span\(Location\(1,17\)\.\.Location\(1,18\)\) \}\), syntax: Arrow \}\) SELECT abs(v -> v); -query error DataFusion error: This feature is not implemented: Unsupported ast node in sqltorel: Lambda\(LambdaFunction \{ params: One\(Ident \{ value: "v", quote_style: None, span: Span\(Location\(1,8\)\.\.Location\(1,9\)\) \}\), body: Identifier\(Ident \{ value: "v", quote_style: None, span: Span\(Location\(1,13\)\.\.Location\(1,14\)\) \}\), syntax: Arrow \}\) +query error DataFusion error: This feature is not implemented: Unsupported ast node in sqltorel: Lambda\(LambdaFunction \{ params: One\(LambdaFunctionParameter \{ name: Ident \{ value: "v", quote_style: None, span: Span\(Location\(1,8\)\.\.Location\(1,9\)\) \}, data_type: None \}\), body: Identifier\(Ident \{ value: "v", quote_style: None, span: Span\(Location\(1,13\)\.\.Location\(1,14\)\) \}\), syntax: Arrow \}\) SELECT v -> v; -query error DataFusion error: This feature is not implemented: Unsupported ast node in sqltorel: Lambda\(LambdaFunction \{ params: One\(Ident \{ value: "v", quote_style: None, span: Span\(Location\(1,34\)\.\.Location\(1,35\)\) \}\), body: BinaryOp \{ left: Identifier\(Ident \{ value: "v", quote_style: None, span: Span\(Location\(1,39\)\.\.Location\(1,40\)\) \}\), op: Plus, right: Value\(ValueWithSpan \{ value: Number\("1", false\), span: Span\(Location\(1,41\)\.\.Location\(1,42\)\) \}\) \}, syntax: Arrow \}\) +query error DataFusion error: This feature is not implemented: Unsupported ast node in sqltorel: Lambda\(LambdaFunction \{ params: One\(LambdaFunctionParameter \{ name: Ident \{ value: "v", quote_style: None, span: Span\(Location\(1,34\)\.\.Location\(1,35\)\) \}, data_type: None \}\), body: BinaryOp \{ left: Identifier\(Ident \{ value: "v", quote_style: None, span: Span\(Location\(1,39\)\.\.Location\(1,40\)\) \}\), op: Plus, 
right: Value\(ValueWithSpan \{ value: Number\("1", false\), span: Span\(Location\(1,41\)\.\.Location\(1,42\)\) \}\) \}, syntax: Arrow \}\) SELECT array_transform([1], v -> v -> v+1); query error DataFusion error: SQL error: ParserError\("Expected: an expression, found: \) at Line: 1, Column: 30"\)