Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ regex = "1.12"
rstest = "0.26.1"
serde_json = "1"
sha2 = "^0.11.0"
sqlparser = { version = "0.62.0", default-features = false, features = ["std", "visitor"] }
strum = "0.28.0"
strum_macros = "0.28.0"
tempfile = "3"
Expand Down
27 changes: 26 additions & 1 deletion datafusion/execution/src/memory_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! help with allocation accounting.

use datafusion_common::{Result, internal_datafusion_err};
use std::any::Any;
use std::fmt::Display;
use std::hash::{Hash, Hasher};
use std::{cmp::Ordering, sync::Arc, sync::atomic};
Expand Down Expand Up @@ -182,7 +183,7 @@ pub use pool::*;
///
/// * [`TrackConsumersPool`]: Wraps another [`MemoryPool`] and tracks consumers,
/// providing better error messages on the largest memory users.
pub trait MemoryPool: Send + Sync + std::fmt::Debug + Display {
pub trait MemoryPool: Any + Send + Sync + std::fmt::Debug + Display {
/// Return pool name
fn name(&self) -> &str;

Expand Down Expand Up @@ -224,6 +225,18 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug + Display {
}
}

impl dyn MemoryPool {
    /// Checks whether the concrete type behind this trait object is `T`.
    pub fn is<T: MemoryPool>(&self) -> bool {
        let as_any: &dyn Any = self;
        as_any.is::<T>()
    }

    /// Tries to view this pool as a reference to the concrete type `T`,
    /// returning `None` when the underlying type does not match.
    pub fn downcast_ref<T: MemoryPool>(&self) -> Option<&T> {
        let as_any: &dyn Any = self;
        as_any.downcast_ref::<T>()
    }
}

/// Memory limit of `MemoryPool`
pub enum MemoryLimit {
Infinite,
Expand Down Expand Up @@ -603,6 +616,18 @@ mod tests {
assert_eq!(pool.reserved(), 28);
}

#[test]
fn test_downcast() {
    // Exercise `is` / `downcast_ref` on a `dyn MemoryPool` behind an Arc.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(50));

    // Type checks: matches the actual concrete type, rejects another pool type.
    assert!(pool.is::<GreedyMemoryPool>());
    assert!(!pool.is::<UnboundedMemoryPool>());

    // A successful downcast yields the concrete pool (nothing reserved yet).
    let greedy = pool
        .downcast_ref::<GreedyMemoryPool>()
        .expect("pool should downcast to GreedyMemoryPool");
    assert_eq!(greedy.reserved(), 0);

    // Downcasting to the wrong concrete type yields None.
    assert!(pool.downcast_ref::<UnboundedMemoryPool>().is_none());
}

#[test]
fn test_try_shrink() {
let pool = Arc::new(GreedyMemoryPool::new(100)) as _;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4014,8 +4014,8 @@ mod test {
wildcard_with_options(wildcard_options(
None,
Some(ExcludeSelectItem::Multiple(vec![
Ident::from("c1"),
Ident::from("c2")
Ident::from("c1").into(),
Ident::from("c2").into()
])),
None,
None,
Expand Down
35 changes: 33 additions & 2 deletions datafusion/expr/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ impl Display for IlikeSelectItem {

#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub enum ExcludeSelectItem {
Single(Ident),
Multiple(Vec<Ident>),
Single(ObjectName),
Multiple(Vec<ObjectName>),
}

impl Display for ExcludeSelectItem {
Expand All @@ -64,6 +64,37 @@ impl Display for ExcludeSelectItem {
}
}

/// A potentially multi-part, dot-separated name (e.g. `schema.table`).
///
/// Mirrors sqlparser's `ObjectName` so the type is available when the `sql`
/// feature is disabled (see the `cfg`-gated imports in `utils.rs`).
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct ObjectName(pub Vec<ObjectNamePart>);

impl Display for ObjectName {
    /// Formats the name as its parts joined by `.`.
    ///
    /// Writes each part directly to the formatter instead of materializing an
    /// intermediate `Vec<String>` plus a joined `String` on every call.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        for (i, part) in self.0.iter().enumerate() {
            if i > 0 {
                write!(f, ".")?;
            }
            write!(f, "{part}")?;
        }
        Ok(())
    }
}

/// One dot-separated component of an [`ObjectName`].
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub enum ObjectNamePart {
    /// A plain identifier component.
    Identifier(Ident),
}

impl ObjectNamePart {
pub fn as_ident(&self) -> Option<&Ident> {
match self {
ObjectNamePart::Identifier(ident) => Some(ident),
}
}
}

impl Display for ObjectNamePart {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self {
ObjectNamePart::Identifier(ident) => write!(f, "{ident}"),
}
}
}

#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct ExceptSelectItem {
pub first_element: Ident,
Expand Down
33 changes: 27 additions & 6 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ use datafusion_common::{
};

#[cfg(not(feature = "sql"))]
use crate::sql::{ExceptSelectItem, ExcludeSelectItem};
use crate::sql::{ExceptSelectItem, ExcludeSelectItem, Ident, ObjectName};
use indexmap::IndexSet;
#[cfg(feature = "sql")]
use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem};
use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem, Ident, ObjectName};

pub use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;

Expand Down Expand Up @@ -339,11 +339,32 @@ fn get_excluded_columns(
idents.push(&excepts.first_element);
idents.extend(&excepts.additional_elements);
}
// Declared outside the `if let` so `idents.extend(exclude_owned.iter())`
// below can borrow references that outlive the inner scope.
let exclude_owned: Vec<Ident>;
if let Some(exclude) = opt_exclude {
match exclude {
ExcludeSelectItem::Single(ident) => idents.push(ident),
ExcludeSelectItem::Multiple(idents_inner) => idents.extend(idents_inner),
}
let object_name_to_ident = |name: &ObjectName| -> Result<Ident> {
if name.0.len() != 1 {
return plan_err!(
"EXCLUDE with multi-part identifiers is not supported: {name}"
);
}
let part = &name.0[0];
let Some(ident) = part.as_ident() else {
return plan_err!(
"EXCLUDE with non-identifier name part is not supported: {part}"
);
};
Ok(ident.clone())
};
exclude_owned = match exclude {
ExcludeSelectItem::Single(name) => vec![object_name_to_ident(name)?],
ExcludeSelectItem::Multiple(names) => names
.iter()
.map(object_name_to_ident)
.collect::<Result<Vec<_>>>()?,
};
idents.extend(exclude_owned.iter());
}
// Excluded columns should be unique
let n_elem = idents.len();
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/aggregates/no_grouping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::aggregates::{
finalize_aggregation,
};
use crate::metrics::{BaselineMetrics, RecordOutput};
use crate::stream::EmptyRecordBatchStream;
use crate::{RecordBatchStream, SendableRecordBatchStream};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -370,6 +371,9 @@ impl AggregateStream {
Some(Err(e)) => Err(e),
None => {
this.finished = true;
// Release the input pipeline's resources before finalization.
let input_schema = this.input.schema();
this.input = Box::pin(EmptyRecordBatchStream::new(input_schema));
let timer = this.baseline_metrics.elapsed_compute().timer();
let result =
finalize_aggregation(&mut this.accumulators, &this.mode)
Expand Down
10 changes: 10 additions & 0 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::aggregates::{
use crate::metrics::{BaselineMetrics, MetricBuilder, MetricCategory, RecordOutput};
use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
use crate::spill::spill_manager::{GetSlicedSize, SpillManager};
use crate::stream::EmptyRecordBatchStream;
use crate::{PhysicalExpr, aggregates, metrics};
use crate::{RecordBatchStream, SendableRecordBatchStream};

Expand Down Expand Up @@ -837,6 +838,10 @@ impl Stream for GroupedHashAggregateStream {
self.group_values.len()
)));
}
// Release the input pipeline's resources.
let input_schema = self.input.schema();
self.input =
Box::pin(EmptyRecordBatchStream::new(input_schema));
self.exec_state = ExecutionState::Done;
}
}
Expand Down Expand Up @@ -1320,6 +1325,11 @@ impl GroupedHashAggregateStream {
fn set_input_done_and_produce_output(&mut self) -> Result<()> {
self.input_done = true;
self.group_ordering.input_done();
// Release the original input pipeline's resources now that we're done
// reading from it. In the spill branch below, `self.input` is replaced
// again with a stream that merges spill files.
let input_schema = self.input.schema();
self.input = Box::pin(EmptyRecordBatchStream::new(input_schema));
let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
let timer = elapsed_compute.timer();
self.exec_state = if self.spill_state.spills.is_empty() {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/aggregates/topk_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::aggregates::{
evaluate_many,
};
use crate::metrics::BaselineMetrics;
use crate::stream::EmptyRecordBatchStream;
use crate::{RecordBatchStream, SendableRecordBatchStream};
use arrow::array::{Array, ArrayRef, RecordBatch};
use arrow::datatypes::SchemaRef;
Expand Down Expand Up @@ -205,6 +206,9 @@ impl Stream for GroupedTopKAggregateStream {
}
// inner is done, emit all rows and switch to producing output
None => {
// Release the input pipeline's resources before emitting.
let input_schema = self.input.schema();
self.input = Box::pin(EmptyRecordBatchStream::new(input_schema));
if self.priority_map.is_empty() {
trace!("partition {} emit None", self.partition);
return Poll::Ready(None);
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ impl ExecutionPlan for AnalyzeExec {
while let Some(batch) = input_stream.next().await.transpose()? {
total_rows += batch.num_rows();
}
drop(input_stream);

let duration = Instant::now() - start;
create_output_batch(
Expand Down
6 changes: 5 additions & 1 deletion datafusion/physical-plan/src/async_func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::coalesce::LimitedBatchCoalescer;
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::stream::RecordBatchStreamAdapter;
use crate::stream::{EmptyRecordBatchStream, RecordBatchStreamAdapter};
use crate::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
check_if_same_properties,
Expand Down Expand Up @@ -290,6 +290,10 @@ impl Stream for CoalesceInputStream {
}
None => {
completed = true;
// Release the input pipeline's resources.
let input_schema = self.input_stream.schema();
self.input_stream =
Box::pin(EmptyRecordBatchStream::new(input_schema));
if let Err(err) = self.batch_coalescer.finish() {
return Poll::Ready(Some(Err(err)));
}
Expand Down
6 changes: 6 additions & 0 deletions datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::task::{Context, Poll};
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
use crate::projection::ProjectionExec;
use crate::stream::EmptyRecordBatchStream;
use crate::{
DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
check_if_same_properties,
Expand Down Expand Up @@ -343,6 +344,8 @@ impl CoalesceBatchesStream {
None => {
// Input stream is exhausted, finalize any remaining batches
self.completed = true;
self.input =
Box::pin(EmptyRecordBatchStream::new(self.coalescer.schema()));
self.coalescer.finish()?;
}
Some(Ok(batch)) => {
Expand All @@ -353,6 +356,9 @@ impl CoalesceBatchesStream {
PushBatchStatus::LimitReached => {
// limit was reached, so stop early
self.completed = true;
self.input = Box::pin(EmptyRecordBatchStream::new(
self.coalescer.schema(),
));
self.coalescer.finish()?;
}
}
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::projection::{
EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child,
try_embed_projection, update_expr,
};
use crate::stream::EmptyRecordBatchStream;
use crate::{
DisplayFormatType, ExecutionPlan,
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics},
Expand Down Expand Up @@ -1030,6 +1031,9 @@ impl Stream for FilterExecStream {
match ready!(self.input.poll_next_unpin(cx)) {
None => {
self.batch_coalescer.finish()?;
// Release the input pipeline's resources.
let input_schema = self.input.schema();
self.input = Box::pin(EmptyRecordBatchStream::new(input_schema));
// continue draining the coalescer
}
Some(Ok(batch)) => {
Expand Down Expand Up @@ -1070,6 +1074,10 @@ impl Stream for FilterExecStream {
PushBatchStatus::LimitReached => {
// limit was reached, so stop early
self.batch_coalescer.finish()?;
// Release the input pipeline's resources.
let input_schema = self.input.schema();
self.input =
Box::pin(EmptyRecordBatchStream::new(input_schema));
// continue draining the coalescer
}
}
Expand Down
8 changes: 7 additions & 1 deletion datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::projection::{
ProjectionExec, join_allows_pushdown, join_table_borders, new_join_children,
physical_to_column_exprs,
};
use crate::stream::EmptyRecordBatchStream;
use crate::{
ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
ExecutionPlanProperties, PlanProperties, RecordBatchStream,
Expand Down Expand Up @@ -645,7 +646,12 @@ impl<T: BatchTransformer> CrossJoinStream<T> {
let right_data = match ready!(self.right.poll_next_unpin(cx)) {
Some(Ok(right_data)) => right_data,
Some(Err(e)) => return Poll::Ready(Err(e)),
None => return Poll::Ready(Ok(StatefulStreamResult::Ready(None))),
None => {
// Release the right (probe) input pipeline's resources.
let right_schema = self.right.schema();
self.right = Box::pin(EmptyRecordBatchStream::new(right_schema));
return Poll::Ready(Ok(StatefulStreamResult::Ready(None)));
}
};
self.join_metrics.input_batches.add(1);
self.join_metrics.input_rows.add(right_data.num_rows());
Expand Down
6 changes: 6 additions & 0 deletions datafusion/physical-plan/src/joins/hash_join/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::joins::hash_join::shared_bounds::{
use crate::joins::utils::{
OnceFut, equal_rows_arr, get_final_indices_from_shared_bitmap,
};
use crate::stream::EmptyRecordBatchStream;
use crate::{
RecordBatchStream, SendableRecordBatchStream, handle_state,
hash_utils::create_hashes,
Expand Down Expand Up @@ -587,6 +588,11 @@ impl HashJoinStream {
) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
match ready!(self.right.poll_next_unpin(cx)) {
None => {
// Release the probe-side input pipeline's resources. The schema
// is preserved so callers that still query `self.right.schema()`
// (e.g. for unmatched-build emission) keep working.
let right_schema = self.right.schema();
self.right = Box::pin(EmptyRecordBatchStream::new(right_schema));
self.state = HashJoinStreamState::ExhaustedProbeSide;
}
Some(Ok(batch)) => {
Expand Down
6 changes: 6 additions & 0 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1611,6 +1611,12 @@ impl NestedLoopJoinStream {
}
}

// If the left stream is fully exhausted, release its resources so the
// upstream pipeline can be torn down before we move on to probing.
if self.left_exhausted {
active.left_stream = None;
}

if active.pending_batches.is_empty() {
// No data at all — go directly to Done
self.left_exhausted = true;
Expand Down
Loading
Loading