From 2028426be36d23f8259610d7022809f0129ab082 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 30 Apr 2026 11:25:54 +0800 Subject: [PATCH 1/2] feat: pushdown OFFSET to parquet for RG-level skipping Push OFFSET from GlobalLimitExec down to DataSourceExec/ParquetOpener. Uses shared Arc counter across partitions so multi-partition single-file queries (byte-range partitioning) are handled correctly. Design: - with_offset accepted for parquet + no filter (any file count) - SharedCount: each partition atomically consumes offset by skipping RGs - RowSelection for partial RG skip (remaining offset within first RG) - Optimizer eliminates GlobalLimitExec when offset is pushed - effective_limit adjusted per partition based on consumed offset Implementation: - FileSource::supports_offset() (parquet=true, others=false) - FileScanConfig: offset field, with_offset (no filter guard) - LimitPushdown: push offset, eliminate GlobalLimitExec - prune_by_offset: skip leading fully-matched RGs - PreparedAccessPlan::apply_offset() for RowSelection - Shared Arc remaining_offset in ParquetMorselizer --- .../datasource-parquet/src/access_plan.rs | 30 +++ datafusion/datasource-parquet/src/metrics.rs | 8 + datafusion/datasource-parquet/src/opener.rs | 61 ++++++- .../src/row_group_filter.rs | 172 ++++++++++++++++++ datafusion/datasource-parquet/src/source.rs | 7 + datafusion/datasource/src/file.rs | 7 + .../datasource/src/file_scan_config/mod.rs | 45 +++++ datafusion/datasource/src/source.rs | 28 +++ .../physical-optimizer/src/limit_pushdown.rs | 19 +- .../physical-plan/src/execution_plan.rs | 12 ++ .../dynamic_filter_pushdown_config.slt | 2 +- .../test_files/explain_analyze.slt | 6 +- .../sqllogictest/test_files/limit_pruning.slt | 4 +- .../test_files/push_down_filter_parquet.slt | 52 +++--- .../sqllogictest/test_files/sort_pushdown.slt | 148 +++++++++++++++ 15 files changed, 563 insertions(+), 38 deletions(-) diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index ca4d097c37a44..9128c0fe0c2a1 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -396,6 +396,36 @@ impl PreparedAccessPlan { Ok(self) } + + /// Apply a row-level offset by creating a [`RowSelection`] that skips + /// the first `remaining_offset` rows across all row groups. + pub(crate) fn apply_offset( + mut self, + remaining_offset: usize, + rg_metadata: &[RowGroupMetaData], + ) -> Self { + if remaining_offset == 0 || self.row_group_indexes.is_empty() { + return self; + } + let total_rows: usize = self + .row_group_indexes + .iter() + .map(|&idx| rg_metadata[idx].num_rows() as usize) + .sum(); + let select_rows = total_rows.saturating_sub(remaining_offset); + if select_rows == 0 { + return self; + } + let offset_selection = RowSelection::from(vec![ + RowSelector::skip(remaining_offset), + RowSelector::select(select_rows), + ]); + self.row_selection = Some(match self.row_selection { + Some(existing) => existing.intersection(&offset_selection), + None => offset_selection, + }); + self + } } #[cfg(test)] diff --git a/datafusion/datasource-parquet/src/metrics.rs b/datafusion/datasource-parquet/src/metrics.rs index 8eb5912b919da..1534515108810 100644 --- a/datafusion/datasource-parquet/src/metrics.rs +++ b/datafusion/datasource-parquet/src/metrics.rs @@ -49,6 +49,8 @@ pub struct ParquetFileMetrics { pub row_groups_pruned_bloom_filter: PruningMetrics, /// Number of row groups pruned due to limit pruning. 
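+    /// (Row groups skipped because of a pushed-down OFFSET are counted
+    /// separately in `offset_pruned_row_groups` below.)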
pub limit_pruned_row_groups: PruningMetrics, + /// Number of row groups pruned due to offset pruning. + pub offset_pruned_row_groups: PruningMetrics, /// Number of row groups pruned by statistics pub row_groups_pruned_statistics: PruningMetrics, /// Total number of bytes scanned @@ -113,6 +115,11 @@ impl ParquetFileMetrics { .with_type(MetricType::Summary) .pruning_metrics("limit_pruned_row_groups", partition); + let offset_pruned_row_groups = MetricBuilder::new(metrics) + .with_new_label("filename", filename.to_string()) + .with_type(MetricType::Summary) + .pruning_metrics("offset_pruned_row_groups", partition); + let row_groups_pruned_statistics = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) .with_type(MetricType::Summary) @@ -198,6 +205,7 @@ impl ParquetFileMetrics { row_groups_pruned_bloom_filter, row_groups_pruned_statistics, limit_pruned_row_groups, + offset_pruned_row_groups, bytes_scanned, pushdown_rows_pruned, pushdown_rows_matched, diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index bad1c684b47f5..51c06b9f81b7d 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -92,6 +92,9 @@ pub(super) struct ParquetMorselizer { pub batch_size: usize, /// Optional limit on the number of rows to read pub(crate) limit: Option, + /// Shared remaining offset across all partition openers. + /// Each opener atomically consumes rows by skipping RGs. + pub(crate) remaining_offset: Arc, /// If should keep the output rows in order pub preserve_order: bool, /// Optional predicate to apply during the scan @@ -281,6 +284,7 @@ struct PreparedParquetOpen { enable_bloom_filter: bool, enable_row_group_stats_pruning: bool, limit: Option, + remaining_offset: Arc, coerce_int96: Option, expr_adapter_factory: Arc, predicate_creation_errors: Count, @@ -650,6 +654,7 @@ impl ParquetMorselizer { enable_bloom_filter: self.enable_bloom_filter, enable_row_group_stats_pruning: self.enable_row_group_stats_pruning, limit: self.limit, + remaining_offset: Arc::clone(&self.remaining_offset), coerce_int96: self.coerce_int96, expr_adapter_factory: Arc::clone(&self.expr_adapter_factory), predicate_creation_errors, @@ -1101,6 +1106,29 @@ impl RowGroupsPrunedParquetOpen { None }; + // Prune by offset: atomically consume from the shared remaining_offset. + // Prune by offset: atomically consume from shared remaining_offset. + // Multiple partitions safely share this counter — each skips RGs + // and reduces the counter. When it reaches 0, remaining partitions + // start producing rows. + let current_offset = prepared + .remaining_offset + .load(std::sync::atomic::Ordering::SeqCst); + if current_offset > 0 { + let remaining = row_groups.prune_by_offset( + current_offset, + prepared.predicate.is_some(), + rg_metadata, + &prepared.file_metrics, + ); + let skipped = current_offset - remaining; + if skipped > 0 { + prepared + .remaining_offset + .fetch_sub(skipped, std::sync::atomic::Ordering::SeqCst); + } + } + // Prune by limit if limit is set and limit order is not sensitive if let (Some(limit), false) = (prepared.limit, prepared.preserve_order) { row_groups.prune_by_limit(limit, rg_metadata, &prepared.file_metrics); @@ -1126,6 +1154,25 @@ impl RowGroupsPrunedParquetOpen { // Prepare the access plan (extract row groups and row selection) let mut prepared_plan = access_plan.prepare(rg_metadata)?; + // Apply remaining offset (partial RG skip) via RowSelection. 
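+        // (For example: with OFFSET 250 over 100-row row groups, two whole
+        // groups were skipped above, and the remaining 50 rows are skipped
+        // here via a RowSelection.)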
+ // SharedCount was reduced by whole-RG skips above; any leftover + // offset is handled by skipping rows within the first surviving RG. + let now_remaining = prepared + .remaining_offset + .load(std::sync::atomic::Ordering::SeqCst); + if now_remaining > 0 { + // Atomically consume the remaining offset for this partition + let taken = prepared + .remaining_offset + .fetch_sub(now_remaining, std::sync::atomic::Ordering::SeqCst) + .min(now_remaining); + if taken > 0 { + prepared_plan = prepared_plan.apply_offset(taken, rg_metadata); + } + } + // GlobalLimitExec handles the row-level skip. We only prune + // whole RGs to reduce I/O. + // Potentially reverse the access plan for performance. // See `ParquetSource::try_pushdown_sort` for the rationale. if prepared.reverse_row_groups { @@ -1157,8 +1204,19 @@ impl RowGroupsPrunedParquetOpen { } decoder_builder = decoder_builder.with_row_groups(prepared_plan.row_group_indexes); + + // Adjust limit: original limit is skip+fetch from optimizer. + // Subtract the offset that was consumed by this partition's + // RG pruning (via SharedCount). Remaining rows to read = + // limit - (original_offset - current_remaining). if let Some(limit) = prepared.limit { - decoder_builder = decoder_builder.with_limit(limit); + let original_offset = current_offset; // captured before prune + let now_remaining = prepared + .remaining_offset + .load(std::sync::atomic::Ordering::SeqCst); + let this_partition_skipped = original_offset.saturating_sub(now_remaining); + let effective_limit = limit.saturating_sub(this_partition_skipped); + decoder_builder = decoder_builder.with_limit(effective_limit); } if let Some(max_predicate_cache_size) = prepared.max_predicate_cache_size { decoder_builder = @@ -1794,6 +1852,7 @@ mod test { projection, batch_size: self.batch_size, limit: self.limit, + remaining_offset: Arc::new(std::sync::atomic::AtomicUsize::new(0)), preserve_order: self.preserve_order, predicate: self.predicate, table_schema, diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index 3f254c9f55282..5b0b2141982f9 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -212,6 +212,51 @@ impl RowGroupAccessPlanFilter { } } + /// Prune row groups that can be entirely skipped due to offset. + /// + /// When an offset is specified, rows at the beginning of the scan must be + /// skipped. This method marks leading fully-matched row groups whose + /// cumulative row count falls within the offset as skipped, so they are + /// never read from disk. + /// + /// Returns the remaining offset (number of rows still to skip within the + /// first non-pruned row group). 
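+    ///
+    /// For example, with three fully matched 100-row row groups and
+    /// `offset = 250`, the first two row groups are marked skipped and `50`
+    /// is returned; the residual is later consumed by
+    /// `PreparedAccessPlan::apply_offset` as a `RowSelection`.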
+ pub fn prune_by_offset( + &mut self, + offset: usize, + has_predicate: bool, + rg_metadata: &[RowGroupMetaData], + metrics: &ParquetFileMetrics, + ) -> usize { + let mut remaining = offset; + let mut pruned_count = 0; + + for &idx in self.access_plan.row_group_indexes().iter() { + if remaining == 0 { + break; + } + // We can skip a row group entirely if: + // - No predicate: all rows match, row count is exact + // - Has predicate but is_fully_matched: all rows pass filter + let can_skip = !has_predicate || self.is_fully_matched[idx]; + if can_skip { + let rg_rows = rg_metadata[idx].num_rows() as usize; + if remaining >= rg_rows { + self.access_plan.skip(idx); + remaining -= rg_rows; + pruned_count += 1; + } else { + break; + } + } else { + break; + } + } + + metrics.offset_pruned_row_groups.add_pruned(pruned_count); + remaining + } + /// Prune remaining row groups to only those within the specified range. /// /// Updates this set to mark row groups that should not be scanned @@ -1438,6 +1483,133 @@ mod tests { ParquetFileMetrics::new(0, "file.parquet", &metrics) } + /// Create a RowGroupMetaData with the specified number of rows. + /// Uses a minimal schema with a single INT32 column. + fn make_row_group_meta(num_rows: i64) -> RowGroupMetaData { + let schema_descr = get_test_schema_descr(vec![PrimitiveTypeField::new( + "id", + PhysicalType::INT32, + )]); + let column = ColumnChunkMetaData::builder(schema_descr.column(0)) + .set_num_values(num_rows) + .build() + .unwrap(); + RowGroupMetaData::builder(schema_descr) + .set_num_rows(num_rows) + .set_total_byte_size(1000) + .set_column_metadata(vec![column]) + .build() + .unwrap() + } + + /// Helper to build a RowGroupAccessPlanFilter with specified fully_matched flags. + fn make_filter_with_fully_matched( + num_rgs: usize, + fully_matched: Vec, + ) -> RowGroupAccessPlanFilter { + assert_eq!(num_rgs, fully_matched.len()); + let access_plan = ParquetAccessPlan::new_all(num_rgs); + let mut filter = RowGroupAccessPlanFilter::new(access_plan); + filter.is_fully_matched = fully_matched; + filter + } + + #[test] + fn test_prune_by_offset_skips_fully_matched_rgs() { + // 3 RGs each with 100 rows, all fully_matched. offset=250. + // Should skip 2 RGs (200 rows), remaining=50. + let rg_metadata: Vec = + (0..3).map(|_| make_row_group_meta(100)).collect(); + let metrics = parquet_file_metrics(); + let mut filter = make_filter_with_fully_matched(3, vec![true, true, true]); + + let remaining = filter.prune_by_offset(250, false, &rg_metadata, &metrics); + assert_eq!(remaining, 50); + // First two RGs should be skipped, third should still be scanned + let indexes: Vec = filter.row_group_indexes().collect(); + assert_eq!(indexes, vec![2]); + } + + #[test] + fn test_prune_by_offset_stops_at_non_fully_matched() { + // 3 RGs each with 100 rows. First two fully_matched, third not. + // offset=250 → skip 2 RGs (200 rows), remaining=50. + // Cannot skip the non-fully-matched third RG even though offset + // still needs more rows skipped. 
+ let rg_metadata: Vec = + (0..3).map(|_| make_row_group_meta(100)).collect(); + let metrics = parquet_file_metrics(); + let mut filter = make_filter_with_fully_matched(3, vec![true, true, false]); + + let remaining = filter.prune_by_offset(250, true, &rg_metadata, &metrics); + assert_eq!(remaining, 50); + // First two RGs skipped, third still scanned (not fully matched) + let indexes: Vec = filter.row_group_indexes().collect(); + assert_eq!(indexes, vec![2]); + } + + #[test] + fn test_prune_by_offset_zero() { + // offset=0 → no pruning, remaining=0. + let rg_metadata: Vec = + (0..3).map(|_| make_row_group_meta(100)).collect(); + let metrics = parquet_file_metrics(); + let mut filter = make_filter_with_fully_matched(3, vec![true, true, true]); + + let remaining = filter.prune_by_offset(0, false, &rg_metadata, &metrics); + assert_eq!(remaining, 0); + // All RGs should still be scanned + let indexes: Vec = filter.row_group_indexes().collect(); + assert_eq!(indexes, vec![0, 1, 2]); + } + + #[test] + fn test_prune_by_offset_exact_boundary() { + // 3 RGs each 100 rows. offset=200 → skip exactly 2 RGs, remaining=0. + let rg_metadata: Vec = + (0..3).map(|_| make_row_group_meta(100)).collect(); + let metrics = parquet_file_metrics(); + let mut filter = make_filter_with_fully_matched(3, vec![true, true, true]); + + let remaining = filter.prune_by_offset(200, false, &rg_metadata, &metrics); + assert_eq!(remaining, 0); + // First two RGs skipped, third still scanned + let indexes: Vec = filter.row_group_indexes().collect(); + assert_eq!(indexes, vec![2]); + } + + #[test] + fn test_prune_by_offset_exceeds_total() { + // offset=400 > total 300 rows → skip all fully_matched RGs, + // remaining = 400 - 300 = 100. + let rg_metadata: Vec = + (0..3).map(|_| make_row_group_meta(100)).collect(); + let metrics = parquet_file_metrics(); + let mut filter = make_filter_with_fully_matched(3, vec![true, true, true]); + + let remaining = filter.prune_by_offset(400, false, &rg_metadata, &metrics); + assert_eq!(remaining, 100); + // All RGs should be skipped + let indexes: Vec = filter.row_group_indexes().collect(); + assert!(indexes.is_empty()); + } + + #[test] + fn test_prune_by_offset_partial_rg() { + // offset=50 (less than first RG of 100 rows) → don't skip any RG, + // remaining=50. 
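+        // (The 50-row residual is not lost: the opener later turns it into a
+        // RowSelection via PreparedAccessPlan::apply_offset; this method only
+        // prunes whole row groups.)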
+ let rg_metadata: Vec = + (0..3).map(|_| make_row_group_meta(100)).collect(); + let metrics = parquet_file_metrics(); + let mut filter = make_filter_with_fully_matched(3, vec![true, true, true]); + + let remaining = filter.prune_by_offset(50, false, &rg_metadata, &metrics); + assert_eq!(remaining, 50); + // No RGs should be skipped since offset < first RG's row count + let indexes: Vec = filter.row_group_indexes().collect(); + assert_eq!(indexes, vec![0, 1, 2]); + } + #[tokio::test] async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() { BloomFilterTest::new_data_index_bloom_encoding_stats() diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index a014c8b2726e7..ca60f9eca8579 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -560,6 +560,9 @@ impl FileSource for ParquetSource { .batch_size .expect("Batch size must set before creating ParquetMorselizer"), limit: base_config.limit, + remaining_offset: Arc::new(std::sync::atomic::AtomicUsize::new( + base_config.offset.unwrap_or(0), + )), preserve_order: base_config.preserve_order, predicate: self.predicate.clone(), table_schema: self.table_schema.clone(), @@ -583,6 +586,10 @@ impl FileSource for ParquetSource { })) } + fn supports_offset(&self) -> bool { + true + } + fn table_schema(&self) -> &TableSchema { &self.table_schema } diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 9b4ae5827ae8b..aba3e8578292d 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -280,6 +280,13 @@ pub trait FileSource: Any + Send + Sync { Ok(SortOrderPushdownResult::Unsupported) } + /// Whether this source can efficiently skip rows for OFFSET queries + /// (e.g., by skipping entire row groups based on row counts). + /// Default: false. + fn supports_offset(&self) -> bool { + false + } + /// Try to push down a projection into this FileSource. /// /// `FileSource` implementations that support projection pushdown should diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index 04b74528d5ac1..f2c274871d392 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -169,6 +169,10 @@ pub struct FileScanConfig { /// The maximum number of records to read from this plan. If `None`, /// all records after filtering are returned. pub limit: Option, + /// The number of rows to skip before returning results. + /// When combined with `limit`, this enables efficient OFFSET handling + /// at the file scan level by skipping entire row groups when possible. + pub offset: Option, /// Whether the scan's limit is order sensitive /// When `true`, files must be read in the exact order specified to produce /// correct results (e.g., for `ORDER BY ... LIMIT` queries). When `false`, @@ -271,6 +275,7 @@ pub struct FileScanConfigBuilder { object_store_url: ObjectStoreUrl, file_source: Arc, limit: Option, + offset: Option, preserve_order: bool, constraints: Option, file_groups: Vec, @@ -301,6 +306,7 @@ impl FileScanConfigBuilder { output_ordering: vec![], file_compression_type: None, limit: None, + offset: None, preserve_order: false, constraints: None, batch_size: None, @@ -317,6 +323,15 @@ impl FileScanConfigBuilder { self } + /// Set the number of rows to skip before returning results. 
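+    ///
+    /// Illustrative sketch: `builder.with_limit(Some(skip + fetch)).with_offset(Some(skip))`;
+    /// the optimizer pushes `skip + fetch` as the limit, and the parquet opener
+    /// later subtracts whatever a partition already skipped (see
+    /// `effective_limit` in opener.rs).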
+ /// + /// When combined with a limit, this enables efficient OFFSET handling + /// at the file scan level by skipping entire row groups when possible. + pub fn with_offset(mut self, offset: Option) -> Self { + self.offset = offset; + self + } + /// Set whether the limit should be order-sensitive. /// /// When `true`, files must be read in the exact order specified to produce @@ -518,6 +533,7 @@ impl FileScanConfigBuilder { object_store_url, file_source, limit, + offset, preserve_order, constraints, file_groups, @@ -543,6 +559,7 @@ impl FileScanConfigBuilder { object_store_url, file_source, limit, + offset, preserve_order, constraints, file_groups, @@ -566,6 +583,7 @@ impl From for FileScanConfigBuilder { output_ordering: config.output_ordering, file_compression_type: Some(config.file_compression_type), limit: config.limit, + offset: config.offset, preserve_order: config.preserve_order, constraints: Some(config.constraints), batch_size: config.batch_size, @@ -660,6 +678,10 @@ impl DataSource for FileScanConfig { write!(f, ", limit={limit}")?; } + if let Some(offset) = self.offset { + write!(f, ", offset={offset}")?; + } + display_orderings(f, &orderings)?; if !self.constraints.is_empty() { @@ -849,6 +871,25 @@ impl DataSource for FileScanConfig { self.limit } + fn with_offset(&self, offset: usize) -> Option> { + // Accept offset when source can efficiently skip rows (parquet) + // and no filter is present (row counts must be exact). + // For multi-partition single file: each partition independently + // skips RGs within its byte range. GlobalLimitExec is kept + // when multiple partitions exist to handle cross-partition offset. + if !self.file_source.supports_offset() || self.file_source.filter().is_some() { + return None; + } + let source = FileScanConfigBuilder::from(self.clone()) + .with_offset(Some(offset)) + .build(); + Some(Arc::new(source)) + } + + fn offset(&self) -> Option { + self.offset + } + fn metrics(&self) -> ExecutionPlanMetricsSet { self.file_source.metrics().clone() } @@ -1360,6 +1401,10 @@ impl DisplayAs for FileScanConfig { write!(f, ", limit={limit}")?; } + if let Some(offset) = self.offset { + write!(f, ", offset={offset}")?; + } + display_orderings(f, &orderings)?; if !self.constraints.is_empty() { diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 420c6b508ce4f..7caf6d80cad9e 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -174,6 +174,18 @@ pub trait DataSource: Any + Send + Sync + Debug { /// Return a copy of this DataSource with a new fetch limit fn with_fetch(&self, _limit: Option) -> Option>; fn fetch(&self) -> Option; + + /// Return a copy of this DataSource with a new offset (number of rows to skip). + /// Returns `None` if the data source does not support offset pushdown. + fn with_offset(&self, _offset: usize) -> Option> { + None + } + + /// Gets the offset for the operator, `None` means there is no offset. 
+ fn offset(&self) -> Option { + None + } + fn metrics(&self) -> ExecutionPlanMetricsSet { ExecutionPlanMetricsSet::new() } @@ -471,6 +483,22 @@ impl ExecutionPlan for DataSourceExec { self.data_source.fetch() } + fn with_offset(&self, offset: usize) -> Option> { + let data_source = self.data_source.with_offset(offset)?; + let cache = Arc::clone(&self.cache); + let execution_state = Arc::new(OnceLock::new()); + + Some(Arc::new(Self { + data_source, + cache, + execution_state, + })) + } + + fn offset(&self) -> Option { + self.data_source.offset() + } + fn try_swapping_with_projection( &self, projection: &ProjectionExec, diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index c5fa0cc3ee78c..fa6b644876fc4 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -268,11 +268,20 @@ pub fn pushdown_limit_helper( .unwrap_or(plan_with_fetch); if global_skip > 0 { - add_global_limit( - plan_with_preserve_order, - global_skip, - Some(global_fetch), - ) + // Push offset to the plan. If accepted, the source + // handles offset via shared atomic counter across + // partitions — safe to eliminate GlobalLimitExec. + if let Some(plan_with_offset) = + plan_with_preserve_order.with_offset(global_skip) + { + plan_with_offset + } else { + add_global_limit( + plan_with_preserve_order, + global_skip, + Some(global_fetch), + ) + } } else { plan_with_preserve_order } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 1a67ea0ded11b..342dad4e70792 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -588,6 +588,18 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync { None } + /// Returns a variant of this `ExecutionPlan` with the given offset + /// (number of rows to skip). Returns `None` if offset pushdown is + /// not supported. + fn with_offset(&self, _offset: usize) -> Option> { + None + } + + /// Gets the offset for the operator, `None` means there is no offset. 
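+    /// For example, `DataSourceExec` forwards this to its underlying
+    /// `DataSource` and reports the offset pushed down by `LimitPushdown`,
+    /// if any.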
+ fn offset(&self) -> Option { + None + } + /// Gets the effect on cardinality, if known fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::Unknown diff --git a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt index 179a611d37e1f..61d885c04037a 100644 --- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt +++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt @@ -104,7 +104,7 @@ Plan with Metrics 03)----ProjectionExec: expr=[id@0 as id, value@1 as v, value@1 + id@0 as name], metrics=[output_rows=10, ] 04)------FilterExec: value@1 > 3, metrics=[output_rows=10, , selectivity=100% (10/10)] 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, metrics=[output_rows=10, ] -06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND (value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 800), required_guarantees=[], metrics=[output_rows=10, elapsed_compute=, output_bytes=80.0 B, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=210, metadata_load_time=, scan_efficiency_ratio=18.31% (210/1.15 K)] +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND (value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 800), required_guarantees=[], metrics=[output_rows=10, elapsed_compute=, output_bytes=80.0 B, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, bytes_scanned=210, metadata_load_time=, scan_efficiency_ratio=18.31% (210/1.15 K)] statement ok set datafusion.explain.analyze_level = dev; diff --git a/datafusion/sqllogictest/test_files/explain_analyze.slt b/datafusion/sqllogictest/test_files/explain_analyze.slt index 561c1a9dcbc87..18014021a72bd 100644 --- a/datafusion/sqllogictest/test_files/explain_analyze.slt +++ b/datafusion/sqllogictest/test_files/explain_analyze.slt @@ -240,7 +240,7 @@ explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order ---- Plan with Metrics 01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], 
pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, scan_efficiency_ratio=22.13% (521/2.35 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, scan_efficiency_ratio=22.13% (521/2.35 K)] statement ok reset datafusion.explain.analyze_categories; @@ -255,7 +255,7 @@ explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order ---- Plan with Metrics 01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, output_bytes=] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=, scan_efficiency_ratio=] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, bytes_scanned=, scan_efficiency_ratio=] statement ok reset datafusion.explain.analyze_categories; @@ 
-270,7 +270,7 @@ explain analyze select * from cat_tracking where species > 'M' AND s >= 50 order ---- Plan with Metrics 01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, output_bytes=] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=, scan_efficiency_ratio=] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain_analyze/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, bytes_scanned=, scan_efficiency_ratio=] statement ok reset datafusion.explain.analyze_categories; diff --git a/datafusion/sqllogictest/test_files/limit_pruning.slt b/datafusion/sqllogictest/test_files/limit_pruning.slt index 34acb98f60033..8534abc35fadc 100644 --- a/datafusion/sqllogictest/test_files/limit_pruning.slt +++ b/datafusion/sqllogictest/test_files/limit_pruning.slt @@ -63,7 +63,7 @@ set datafusion.explain.analyze_level = summary; query TT explain analyze select * from tracking_data where species > 'M' AND s >= 50 limit 3; ---- -Plan with Metrics DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, limit_pruned_row_groups=2 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (171/2.35 K)] +Plan with Metrics DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M AND s@1 >= 50, 
pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, limit_pruned_row_groups=2 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (171/2.35 K)] # limit_pruned_row_groups=0 total → 0 matched # because of order by, scan needs to preserve sort, so limit pruning is disabled @@ -72,7 +72,7 @@ explain analyze select * from tracking_data where species > 'M' AND s >= 50 orde ---- Plan with Metrics 01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, elapsed_compute=, output_bytes=] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (521/2.35 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (521/2.35 K)] statement ok drop table tracking_data; diff --git a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt index 8469c32a17033..adfa013953c63 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt @@ -206,7 +206,7 @@ EXPLAIN ANALYZE SELECT t FROM topk_pushdown ORDER BY t * t LIMIT 10; ---- Plan with Metrics 01)SortExec: TopK(fetch=10), expr=[t@0 * t@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[t@0 * t@0 < 1884329474306198481], metrics=[output_rows=10, output_batches=1, row_replacements=10] -02)--DataSourceExec: file_groups={1 
group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_pushdown.parquet]]}, projection=[t], output_ordering=[t@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ t@0 * t@0 < 1884329474306198481 ], metrics=[output_rows=128, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=782 total → 782 matched, row_groups_pruned_bloom_filter=782 total → 782 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=128, pushdown_rows_pruned=99.87 K, predicate_cache_inner_records=128, predicate_cache_records=128, scan_efficiency_ratio=64.87% (258.7 K/398.8 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_pushdown.parquet]]}, projection=[t], output_ordering=[t@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ t@0 * t@0 < 1884329474306198481 ], metrics=[output_rows=128, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=782 total → 782 matched, row_groups_pruned_bloom_filter=782 total → 782 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=128, pushdown_rows_pruned=99.87 K, predicate_cache_inner_records=128, predicate_cache_records=128, scan_efficiency_ratio=64.87% (258.7 K/398.8 K)] statement ok reset datafusion.explain.analyze_categories; @@ -268,7 +268,7 @@ EXPLAIN ANALYZE SELECT * FROM topk_single_col ORDER BY b DESC LIMIT 1; ---- Plan with Metrics 01)SortExec: TopK(fetch=1), expr=[b@1 DESC], preserve_partitioning=[false], filter=[b@1 IS NULL OR b@1 > bd], metrics=[output_rows=1, output_batches=1, row_replacements=1] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_single_col.parquet]]}, projection=[a, b, c], file_type=parquet, predicate=DynamicFilter [ b@1 IS NULL OR b@1 > bd ], pruning_predicate=b_null_count@0 > 0 OR b_null_count@0 != row_count@2 AND b_max@1 > bd, required_guarantees=[], metrics=[output_rows=4, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=4, pushdown_rows_pruned=0, predicate_cache_inner_records=4, predicate_cache_records=4, scan_efficiency_ratio=22.37% (240/1.07 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_single_col.parquet]]}, projection=[a, b, c], file_type=parquet, predicate=DynamicFilter [ b@1 IS NULL OR b@1 > bd ], pruning_predicate=b_null_count@0 > 0 OR 
b_null_count@0 != row_count@2 AND b_max@1 > bd, required_guarantees=[], metrics=[output_rows=4, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=4, pushdown_rows_pruned=0, predicate_cache_inner_records=4, predicate_cache_records=4, scan_efficiency_ratio=22.37% (240/1.07 K)] statement ok reset datafusion.explain.analyze_categories; @@ -319,7 +319,7 @@ EXPLAIN ANALYZE SELECT * FROM topk_multi_col ORDER BY b ASC NULLS LAST, a DESC L ---- Plan with Metrics 01)SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], preserve_partitioning=[false], filter=[b@1 < bb OR b@1 = bb AND (a@0 IS NULL OR a@0 > ac)], metrics=[output_rows=2, output_batches=1, row_replacements=2] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_multi_col.parquet]]}, projection=[a, b, c], file_type=parquet, predicate=DynamicFilter [ b@1 < bb OR b@1 = bb AND (a@0 IS NULL OR a@0 > ac) ], pruning_predicate=b_null_count@1 != row_count@2 AND b_min@0 < bb OR b_null_count@1 != row_count@2 AND b_min@0 <= bb AND bb <= b_max@3 AND (a_null_count@4 > 0 OR a_null_count@4 != row_count@2 AND a_max@5 > ac), required_guarantees=[], metrics=[output_rows=4, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=4, pushdown_rows_pruned=0, predicate_cache_inner_records=8, predicate_cache_records=8, scan_efficiency_ratio=22.37% (240/1.07 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_multi_col.parquet]]}, projection=[a, b, c], file_type=parquet, predicate=DynamicFilter [ b@1 < bb OR b@1 = bb AND (a@0 IS NULL OR a@0 > ac) ], pruning_predicate=b_null_count@1 != row_count@2 AND b_min@0 < bb OR b_null_count@1 != row_count@2 AND b_min@0 <= bb AND bb <= b_max@3 AND (a_null_count@4 > 0 OR a_null_count@4 != row_count@2 AND a_max@5 > ac), required_guarantees=[], metrics=[output_rows=4, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=4, pushdown_rows_pruned=0, predicate_cache_inner_records=8, predicate_cache_records=8, scan_efficiency_ratio=22.37% (240/1.07 K)] statement ok reset 
datafusion.explain.analyze_categories; @@ -388,8 +388,8 @@ FROM join_probe p INNER JOIN join_build AS build ---- Plan with Metrics 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], projection=[a@3, b@4, c@2, e@5], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] -03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_probe.parquet]]}, projection=[a, b, e], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=22.78% (246/1.08 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_probe.parquet]]}, projection=[a, b, e], 
file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=22.78% (246/1.08 K)] statement ok reset datafusion.explain.analyze_categories; @@ -474,9 +474,9 @@ INNER JOIN nested_t3 ON nested_t2.c = nested_t3.d; Plan with Metrics 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@3, d@0)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, b@0)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nested_t1.parquet]]}, projection=[a, x], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=18.23% (144/790)] -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nested_t2.parquet]]}, projection=[b, c, y], file_type=parquet, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab]) ], pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 >= aa AND b_null_count@1 != row_count@2 AND b_min@3 <= ab AND (b_null_count@1 != row_count@2 AND b_min@3 <= aa AND aa <= b_max@0 OR b_null_count@1 != row_count@2 AND b_min@3 <= ab AND ab <= b_max@0), required_guarantees=[b in (aa, ab)], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=5 total → 5 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, 
files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=3, predicate_cache_inner_records=5, predicate_cache_records=2, scan_efficiency_ratio=23.2% (252/1.09 K)] -05)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nested_t3.parquet]]}, projection=[d, z], file_type=parquet, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb AND hash_lookup ], pruning_predicate=d_null_count@1 != row_count@2 AND d_max@0 >= ca AND d_null_count@1 != row_count@2 AND d_min@3 <= cb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=8 total → 8 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=6, predicate_cache_inner_records=8, predicate_cache_records=2, scan_efficiency_ratio=22.12% (184/832)] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nested_t1.parquet]]}, projection=[a, x], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=18.23% (144/790)] +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nested_t2.parquet]]}, projection=[b, c, y], file_type=parquet, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab]) ], pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 >= aa AND b_null_count@1 != row_count@2 AND b_min@3 <= ab AND (b_null_count@1 != row_count@2 AND b_min@3 <= aa AND aa <= b_max@0 OR b_null_count@1 != row_count@2 AND b_min@3 <= ab AND ab <= b_max@0), required_guarantees=[b in (aa, ab)], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=5 total → 5 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=3, predicate_cache_inner_records=5, predicate_cache_records=2, scan_efficiency_ratio=23.2% (252/1.09 K)] +05)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nested_t3.parquet]]}, projection=[d, z], file_type=parquet, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb AND hash_lookup ], pruning_predicate=d_null_count@1 != row_count@2 AND d_max@0 >= ca AND d_null_count@1 != row_count@2 AND d_min@3 <= cb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=8 total → 8 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=6, predicate_cache_inner_records=8, predicate_cache_records=2, scan_efficiency_ratio=22.12% (184/832)] statement ok reset datafusion.explain.analyze_categories; @@ -605,8 +605,8 @@ LIMIT 2; Plan with Metrics 01)SortExec: TopK(fetch=2), expr=[e@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[e@0 < bb], metrics=[output_rows=2, output_batches=1, row_replacements=2] 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, d@0)], projection=[e@2], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_join_build.parquet]]}, projection=[a], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=6.7% (70/1.04 K)] -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_join_probe.parquet]]}, projection=[d, e], file_type=parquet, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab AND d@0 IN (SET) ([aa, ab]) ] AND DynamicFilter [ e@1 < bb ], pruning_predicate=d_null_count@1 != row_count@2 AND d_max@0 >= aa AND d_null_count@1 != row_count@2 AND d_min@3 <= ab AND (d_null_count@1 != row_count@2 AND d_min@3 <= aa AND aa <= d_max@0 OR d_null_count@1 != row_count@2 AND d_min@3 <= ab AND ab <= d_max@0) AND e_null_count@5 != row_count@2 AND e_min@4 < bb, required_guarantees=[d in (aa, ab)], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=4 total → 4 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, 
predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=15.37% (166/1.08 K)] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_join_build.parquet]]}, projection=[a], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=6.7% (70/1.04 K)] +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_join_probe.parquet]]}, projection=[d, e], file_type=parquet, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab AND d@0 IN (SET) ([aa, ab]) ] AND DynamicFilter [ e@1 < bb ], pruning_predicate=d_null_count@1 != row_count@2 AND d_max@0 >= aa AND d_null_count@1 != row_count@2 AND d_min@3 <= ab AND (d_null_count@1 != row_count@2 AND d_min@3 <= aa AND aa <= d_max@0 OR d_null_count@1 != row_count@2 AND d_min@3 <= ab AND ab <= d_max@0) AND e_null_count@5 != row_count@2 AND e_min@4 < bb, required_guarantees=[d in (aa, ab)], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=4 total → 4 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=15.37% (166/1.08 K)] statement ok reset datafusion.explain.analyze_categories; @@ -655,7 +655,7 @@ EXPLAIN ANALYZE SELECT b, a FROM topk_proj ORDER BY a LIMIT 2; ---- Plan with Metrics 01)SortExec: TopK(fetch=2), expr=[a@1 ASC NULLS LAST], preserve_partitioning=[false], filter=[a@1 < 2], metrics=[output_rows=2, output_batches=1, row_replacements=2] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_proj.parquet]]}, projection=[b, a], file_type=parquet, predicate=DynamicFilter [ a@0 < 2 ], pruning_predicate=a_null_count@1 != row_count@2 AND a_min@0 < 2, required_guarantees=[], metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=3, 
pushdown_rows_pruned=0, predicate_cache_inner_records=3, predicate_cache_records=3, scan_efficiency_ratio=13.72% (153/1.11 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_proj.parquet]]}, projection=[b, a], file_type=parquet, predicate=DynamicFilter [ a@0 < 2 ], pruning_predicate=a_null_count@1 != row_count@2 AND a_min@0 < 2, required_guarantees=[], metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=3, pushdown_rows_pruned=0, predicate_cache_inner_records=3, predicate_cache_records=3, scan_efficiency_ratio=13.72% (153/1.11 K)] # Case 2: prune — `SELECT a` — filter stays as `a < 2` on the scan. query TT @@ -663,7 +663,7 @@ EXPLAIN ANALYZE SELECT a FROM topk_proj ORDER BY a LIMIT 2; ---- Plan with Metrics 01)SortExec: TopK(fetch=2), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[a@0 < 2], metrics=[output_rows=2, output_batches=1, row_replacements=2] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_proj.parquet]]}, projection=[a], file_type=parquet, predicate=DynamicFilter [ a@0 < 2 ], pruning_predicate=a_null_count@1 != row_count@2 AND a_min@0 < 2, required_guarantees=[], metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=3, pushdown_rows_pruned=0, predicate_cache_inner_records=3, predicate_cache_records=3, scan_efficiency_ratio=7.09% (79/1.11 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_proj.parquet]]}, projection=[a], file_type=parquet, predicate=DynamicFilter [ a@0 < 2 ], pruning_predicate=a_null_count@1 != row_count@2 AND a_min@0 < 2, required_guarantees=[], metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=3, pushdown_rows_pruned=0, predicate_cache_inner_records=3, predicate_cache_records=3, scan_efficiency_ratio=7.09% (79/1.11 K)] # Case 3: expression — `SELECT a+1 AS a_plus_1` — the TopK filter is on # `a_plus_1`, the scan predicate must read `a@0 + 1`. 
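(Aside on Cases 2-4: the step these tests exercise is substituting the projection's output expressions back into the TopK filter, so the filter the scan receives only references columns the scan can actually evaluate. The sketch below is a toy model of that substitution, using a hand-rolled expression enum rather than DataFusion's actual `PhysicalExpr` machinery; the names `Expr` and `rewrite_through_projection` are illustrative only.)

```rust
use std::collections::HashMap;

/// Toy expression tree standing in for a physical filter expression.
#[derive(Clone, Debug)]
enum Expr {
    Column(String),
    Literal(i64),
    Add(Box<Expr>, Box<Expr>),
    Lt(Box<Expr>, Box<Expr>),
}

/// Replace references to projection output columns with the input
/// expressions that produced them. The substituted expression is
/// returned as-is (not rewritten again), which is what keeps alias
/// shadowing like `SELECT a + 1 AS a` from looping.
fn rewrite_through_projection(expr: &Expr, proj: &HashMap<String, Expr>) -> Expr {
    match expr {
        Expr::Column(name) => proj.get(name).cloned().unwrap_or_else(|| expr.clone()),
        Expr::Literal(_) => expr.clone(),
        Expr::Add(l, r) => Expr::Add(
            Box::new(rewrite_through_projection(l, proj)),
            Box::new(rewrite_through_projection(r, proj)),
        ),
        Expr::Lt(l, r) => Expr::Lt(
            Box::new(rewrite_through_projection(l, proj)),
            Box::new(rewrite_through_projection(r, proj)),
        ),
    }
}

fn main() {
    // Case 4 shape: `SELECT a + 1 AS a` — output alias `a` shadows input `a`.
    let proj: HashMap<String, Expr> = HashMap::from([(
        "a".to_string(),
        Expr::Add(
            Box::new(Expr::Column("a".to_string())),
            Box::new(Expr::Literal(1)),
        ),
    )]);
    // TopK filter `a < 3` over the projection output...
    let filter = Expr::Lt(
        Box::new(Expr::Column("a".to_string())),
        Box::new(Expr::Literal(3)),
    );
    // ...is rewritten to `a + 1 < 3` over the scan schema,
    // matching the `CAST(a@0 AS Int64) + 1 < 3` predicate in Cases 3 and 4.
    let rewritten = rewrite_through_projection(&filter, &proj);
    println!("{rewritten:?}");
}
```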
@@ -672,7 +672,7 @@ EXPLAIN ANALYZE SELECT a + 1 AS a_plus_1, b FROM topk_proj ORDER BY a_plus_1 LIM ---- Plan with Metrics 01)SortExec: TopK(fetch=2), expr=[a_plus_1@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[a_plus_1@0 < 3], metrics=[output_rows=2, output_batches=1, row_replacements=2] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_proj.parquet]]}, projection=[CAST(a@0 AS Int64) + 1 as a_plus_1, b], file_type=parquet, predicate=DynamicFilter [ CAST(a@0 AS Int64) + 1 < 3 ], metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=3, pushdown_rows_pruned=0, predicate_cache_inner_records=3, predicate_cache_records=3, scan_efficiency_ratio=13.72% (153/1.11 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_proj.parquet]]}, projection=[CAST(a@0 AS Int64) + 1 as a_plus_1, b], file_type=parquet, predicate=DynamicFilter [ CAST(a@0 AS Int64) + 1 < 3 ], metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=3, pushdown_rows_pruned=0, predicate_cache_inner_records=3, predicate_cache_records=3, scan_efficiency_ratio=13.72% (153/1.11 K)] # Case 4: alias shadowing — `SELECT a+1 AS a` — the projection renames # `a+1` to `a`, so the TopK's `a < 3` must still be rewritten to @@ -682,7 +682,7 @@ EXPLAIN ANALYZE SELECT a + 1 AS a, b FROM topk_proj ORDER BY a LIMIT 2; ---- Plan with Metrics 01)SortExec: TopK(fetch=2), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[a@0 < 3], metrics=[output_rows=2, output_batches=1, row_replacements=2] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_proj.parquet]]}, projection=[CAST(a@0 AS Int64) + 1 as a, b], file_type=parquet, predicate=DynamicFilter [ CAST(a@0 AS Int64) + 1 < 3 ], metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=3, pushdown_rows_pruned=0, predicate_cache_inner_records=3, predicate_cache_records=3, scan_efficiency_ratio=13.72% (153/1.11 K)] +02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_proj.parquet]]}, projection=[CAST(a@0 AS Int64) + 1 as a, b], file_type=parquet, predicate=DynamicFilter [ CAST(a@0 AS Int64) + 1 < 3 ], metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=3, pushdown_rows_pruned=0, predicate_cache_inner_records=3, predicate_cache_records=3, scan_efficiency_ratio=13.72% (153/1.11 K)] statement ok reset datafusion.explain.analyze_categories; @@ -739,12 +739,12 @@ INNER JOIN ( ---- Plan with Metrics 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)], projection=[a@0, min_value@2], metrics=[output_rows=2, output_batches=2, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=2, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_agg_build.parquet]]}, projection=[a], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=15.32% (70/457)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_agg_build.parquet]]}, projection=[a], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=15.32% (70/457)] 03)--ProjectionExec: expr=[a@0 as a, min(join_agg_probe.value)@1 as min_value], metrics=[output_rows=2, output_batches=2] 04)----AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[min(join_agg_probe.value)], metrics=[output_rows=2, output_batches=2, spill_count=0, spilled_rows=0] 05)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1, metrics=[output_rows=2, output_batches=2, spill_count=0, spilled_rows=0] 06)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[min(join_agg_probe.value)], 
metrics=[output_rows=2, output_batches=1, spill_count=0, spilled_rows=0, skipped_aggregation_rows=0, reduction_factor=100% (2/2)] -07)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_agg_probe.parquet]]}, projection=[a, value], file_type=parquet, predicate=DynamicFilter [ a@0 >= h1 AND a@0 <= h2 AND a@0 IN (SET) ([h1, h2]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= h1 AND a_null_count@1 != row_count@2 AND a_min@3 <= h2 AND (a_null_count@1 != row_count@2 AND a_min@3 <= h1 AND h1 <= a_max@0 OR a_null_count@1 != row_count@2 AND a_min@3 <= h2 AND h2 <= a_max@0), required_guarantees=[a in (h1, h2)], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=4 total → 4 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=4, predicate_cache_records=2, scan_efficiency_ratio=19.81% (163/823)] +07)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_agg_probe.parquet]]}, projection=[a, value], file_type=parquet, predicate=DynamicFilter [ a@0 >= h1 AND a@0 <= h2 AND a@0 IN (SET) ([h1, h2]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= h1 AND a_null_count@1 != row_count@2 AND a_min@3 <= h2 AND (a_null_count@1 != row_count@2 AND a_min@3 <= h1 AND h1 <= a_max@0 OR a_null_count@1 != row_count@2 AND a_min@3 <= h2 AND h2 <= a_max@0), required_guarantees=[a in (h1, h2)], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=4 total → 4 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=4, predicate_cache_records=2, scan_efficiency_ratio=19.81% (163/823)] statement ok reset datafusion.explain.analyze_categories; @@ -806,8 +806,8 @@ ON nulls_build.a = nulls_probe.a AND nulls_build.b = nulls_probe.b; ---- Plan with Metrics 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], metrics=[output_rows=1, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=3, input_batches=1, input_rows=1, avg_fanout=100% (1/1), probe_hit_rate=100% (1/1)] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nulls_build.parquet]]}, projection=[a, b], file_type=parquet, metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, 
limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=18.6% (144/774)] -03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nulls_probe.parquet]]}, projection=[a, b, c], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= 1 AND b@1 <= 2 AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:1}, {c0:,c1:2}, {c0:ab,c1:}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= 1 AND b_null_count@5 != row_count@2 AND b_min@6 <= 2, required_guarantees=[], metrics=[output_rows=1, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=1, pushdown_rows_pruned=3, predicate_cache_inner_records=8, predicate_cache_records=2, scan_efficiency_ratio=21.1% (237/1.12 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nulls_build.parquet]]}, projection=[a, b], file_type=parquet, metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=18.6% (144/774)] +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nulls_probe.parquet]]}, projection=[a, b, c], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= 1 AND b@1 <= 2 AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:1}, {c0:,c1:2}, {c0:ab,c1:}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= 1 AND b_null_count@5 != row_count@2 AND b_min@6 <= 2, required_guarantees=[], metrics=[output_rows=1, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, 
predicate_evaluation_errors=0, pushdown_rows_matched=1, pushdown_rows_pruned=3, predicate_cache_inner_records=8, predicate_cache_records=2, scan_efficiency_ratio=21.1% (237/1.12 K)] statement ok reset datafusion.explain.analyze_categories; @@ -872,8 +872,8 @@ ON lj_build.a = lj_probe.a AND lj_build.b = lj_probe.b; ---- Plan with Metrics 01)HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=2, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] -03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_probe.parquet]]}, projection=[a, b, e], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=22.78% (246/1.08 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] 
+03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_probe.parquet]]}, projection=[a, b, e], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=22.78% (246/1.08 K)] # LEFT SEMI JOIN: only matching build rows are returned; probe scan still # receives the dynamic filter. @@ -888,8 +888,8 @@ WHERE EXISTS ( ---- Plan with Metrics 01)HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=2, input_rows=4, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] -03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_probe.parquet]]}, projection=[a, b], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, 
num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=15.37% (166/1.08 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_probe.parquet]]}, projection=[a, b], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=15.37% (166/1.08 K)] statement ok reset datafusion.explain.analyze_categories; @@ -958,8 +958,8 @@ FROM hl_probe p INNER JOIN hl_build AS build ---- Plan with Metrics 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], projection=[a@3, b@4, c@2, e@5], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/hl_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, 
predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] -03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/hl_probe.parquet]]}, projection=[a, b, e], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND hash_lookup ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=22.78% (246/1.08 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/hl_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/hl_probe.parquet]]}, projection=[a, b, e], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND hash_lookup ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=22.78% (246/1.08 K)] statement ok drop table hl_build; @@ -1007,8 +1007,8 @@ FROM int_build b INNER JOIN int_probe p ---- Plan with Metrics 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id1@0, 
id1@0), (id2@1, id2@1)], projection=[id1@0, id2@1, value@2, data@5], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/int_build.parquet]]}, projection=[id1, id2, value], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=19.02% (222/1.17 K)] -03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/int_probe.parquet]]}, projection=[id1, id2, data], file_type=parquet, predicate=DynamicFilter [ id1@0 >= 1 AND id1@0 <= 2 AND id2@1 >= 10 AND id2@1 <= 20 AND hash_lookup ], pruning_predicate=id1_null_count@1 != row_count@2 AND id1_max@0 >= 1 AND id1_null_count@1 != row_count@2 AND id1_min@3 <= 2 AND id2_null_count@5 != row_count@2 AND id2_max@4 >= 10 AND id2_null_count@5 != row_count@2 AND id2_min@6 <= 20, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=21.43% (239/1.11 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/int_build.parquet]]}, projection=[id1, id2, value], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=19.02% (222/1.17 K)] +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/int_probe.parquet]]}, projection=[id1, id2, data], file_type=parquet, predicate=DynamicFilter [ id1@0 >= 1 AND id1@0 <= 2 AND id2@1 >= 10 AND id2@1 <= 20 AND hash_lookup ], pruning_predicate=id1_null_count@1 != row_count@2 AND id1_max@0 >= 
1 AND id1_null_count@1 != row_count@2 AND id1_min@3 <= 2 AND id2_null_count@5 != row_count@2 AND id2_max@4 >= 10 AND id2_null_count@5 != row_count@2 AND id2_min@6 <= 20, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, offset_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=21.43% (239/1.11 K)] statement ok reset datafusion.explain.analyze_categories; diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index b6c75f3977010..57d95513a5fa6 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -2280,3 +2280,151 @@ SET datafusion.execution.collect_statistics = true; statement ok SET datafusion.optimizer.enable_sort_pushdown = true; + +# =========================================================== +# Test N: OFFSET pushdown to parquet +# Verifies that OFFSET is pushed down to DataSourceExec, +# eliminating GlobalLimitExec and skipping row groups. +# =========================================================== + +statement ok +SET datafusion.execution.target_partitions = 1; + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 5; + +statement ok +CREATE TABLE tn_data(id INT, value INT) AS VALUES +(1,10),(2,20),(3,30),(4,40),(5,50), +(6,60),(7,70),(8,80),(9,90),(10,100), +(11,110),(12,120),(13,130),(14,140),(15,150); + +query I +COPY (SELECT * FROM tn_data ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/tn_offset/data.parquet'; +---- +15 + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 1048576; + +statement ok +CREATE EXTERNAL TABLE tn_offset(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/tn_offset/data.parquet'; + +# Test N.1: OFFSET pushdown — GlobalLimitExec eliminated, offset handled by parquet +query TT +EXPLAIN SELECT * FROM tn_offset LIMIT 3 OFFSET 5; +---- +logical_plan +01)Limit: skip=5, fetch=3 +02)--TableScan: tn_offset projection=[id, value], fetch=8 +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tn_offset/data.parquet]]}, projection=[id, value], limit=8, offset=5, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + +# Test N.2: Results skip first 5 rows, return next 3 +query II +SELECT * FROM tn_offset LIMIT 3 OFFSET 5; +---- +6 60 +7 70 +8 80 + +# Test N.3: Large offset near end +query II +SELECT * FROM tn_offset LIMIT 3 OFFSET 12; +---- +13 130 +14 140 +15 150 + +# Test N.4: Offset at last few rows +query II +SELECT * FROM tn_offset LIMIT 10 OFFSET 13; +---- +14 140 +15 150 + +# Test N.5: Offset exactly at boundary (skip first RG entirely) +query II +SELECT * FROM tn_offset LIMIT 3 OFFSET 5; +---- +6 60 +7 70 +8 80 + +# Test N.6: Offset zero (no skip) +query II +SELECT * FROM tn_offset LIMIT 3 OFFSET 0; +---- +1 10 +2 20 +3 30 + +# Test N.7: LIMIT 5 OFFSET 10 spanning RG boundaries +# With 
max_row_group_size=5, we have 3 RGs: [1-5], [6-10], [11-15]
+# OFFSET 10 skips rows 1-10, LIMIT 5 returns rows 11-15
+query II
+SELECT * FROM tn_offset LIMIT 5 OFFSET 10;
+----
+11 110
+12 120
+13 130
+14 140
+15 150
+
+# Test N.8: OFFSET with multi-partition (target_partitions=4)
+statement ok
+SET datafusion.execution.target_partitions = 4;
+
+query II
+SELECT * FROM tn_offset LIMIT 3 OFFSET 5;
+----
+6 60
+7 70
+8 80
+
+# Restore single partition for remaining tests
+statement ok
+SET datafusion.execution.target_partitions = 1;
+
+# Test N.9: OFFSET with WHERE clause — offset still works
+# (GlobalLimitExec may remain when there is a filter)
+query II
+SELECT * FROM tn_offset WHERE value > 50 LIMIT 3 OFFSET 2;
+----
+8 80
+9 90
+10 100
+
+# Test N.10: OFFSET 0 is equivalent to no offset
+query II
+SELECT * FROM tn_offset LIMIT 5 OFFSET 0;
+----
+1 10
+2 20
+3 30
+4 40
+5 50
+
+# Cleanup Test N
+statement ok
+DROP TABLE tn_data;
+
+statement ok
+DROP TABLE tn_offset;
+
+# ===========================================================
+# Test O: Multi-file OFFSET is NOT pushed down, because row order
+# across files is non-deterministic; GlobalLimitExec is kept for
+# multi-file queries. No queries here yet; the statements below
+# restore the settings changed for Test N.
+# ===========================================================
+
+# Reset settings
+statement ok
+SET datafusion.execution.target_partitions = 4;
+
+statement ok
+SET datafusion.execution.collect_statistics = true;
+
+statement ok
+SET datafusion.optimizer.enable_sort_pushdown = true;

From 1691b075daab7981e90bac5fcf287f017120ed8a Mon Sep 17 00:00:00 2001
From: Qi Zhu <821684824@qq.com>
Date: Thu, 30 Apr 2026 15:42:36 +0800
Subject: [PATCH 2/2] fix: push OFFSET through CoalescePartitionsExec to DataSourceExec
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When a single parquet file is split into multiple byte-range
partitions, CoalescePartitionsExec sits between GlobalLimitExec and
DataSourceExec. Previously, the optimizer would keep the
GlobalLimitExec for the skip, meaning the offset wasn't pushed to the
parquet reader.

Now the optimizer pushes the offset through CoalescePartitionsExec to
its DataSourceExec child, which uses a shared Arc counter to coordinate
offset consumption across partitions. This eliminates the
GlobalLimitExec entirely for supported sources.

Result on 14GB hits.parquet: OFFSET 99M LIMIT 5 goes from 3.2s to 29ms.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 datafusion/datasource-parquet/src/source.rs   | 19 +++++++++++++---
 .../physical-optimizer/src/limit_pushdown.rs  | 22 +++++++++++++++----
 .../sqllogictest/test_files/sort_pushdown.slt |  1 +
 3 files changed, 35 insertions(+), 7 deletions(-)

diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index ca60f9eca8579..1b13e19bf9733 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -294,6 +294,11 @@ pub struct ParquetSource {
     /// so we still need to sort them after reading, so the reverse scan is inexact.
     /// Used to optimize ORDER BY ... DESC on sorted data.
     reverse_row_groups: bool,
+    /// Shared remaining offset counter across all partitions.
+    /// Initialized from `FileScanConfig.offset` in `create_morselizer`;
+    /// all partitions clone the same Arc so offset consumption is
+    /// coordinated atomically.
+    remaining_offset: Arc<std::sync::atomic::AtomicUsize>,
 }
 
 impl ParquetSource {
@@ -319,6 +324,7 @@ impl ParquetSource {
             #[cfg(feature = "parquet_encryption")]
             encryption_factory: None,
             reverse_row_groups: false,
+            remaining_offset: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
         }
     }
 
@@ -553,6 +559,15 @@ impl FileSource for ParquetSource {
             .as_ref()
             .map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());
 
+        // Initialize the shared offset from base_config on first partition.
+        // All partitions share the same Arc; scanning hasn't started yet
+        // (streams are created before poll_next), so no race with consumers.
+        if let Some(offset) = base_config.offset {
+            self.remaining_offset
+                .store(offset, std::sync::atomic::Ordering::SeqCst);
+        }
+        let remaining_offset = Arc::clone(&self.remaining_offset);
+
         Ok(Box::new(ParquetMorselizer {
             partition_index: partition,
             projection: self.projection.clone(),
@@ -560,9 +575,7 @@ impl FileSource for ParquetSource {
                 .batch_size
                 .expect("Batch size must set before creating ParquetMorselizer"),
             limit: base_config.limit,
-            remaining_offset: Arc::new(std::sync::atomic::AtomicUsize::new(
-                base_config.offset.unwrap_or(0),
-            )),
+            remaining_offset,
             preserve_order: base_config.preserve_order,
             predicate: self.predicate.clone(),
             table_schema: self.table_schema.clone(),
diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs
index fa6b644876fc4..83b10a0338806 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -216,11 +216,25 @@ pub fn pushdown_limit_helper(
         // fetch info to plan if possible. If not, we must add a limit node
         // with the information from the global state.
         let mut new_plan = plan_with_fetch;
-        // Execution plans can't (yet) handle skip, so if we have one,
-        // we still need to add a global limit
         if global_state.skip > 0 {
-            new_plan =
-                add_global_limit(new_plan, global_state.skip, global_state.fetch);
+            // Try to push the offset through the coalescing operator
+            // to its DataSourceExec child.
+            let offset_pushed = {
+                let children = new_plan.children();
+                children.len() == 1
+                    && children[0].with_offset(global_state.skip).is_some()
+            };
+            if offset_pushed {
+                let child = new_plan.children()[0]
+                    .with_offset(global_state.skip)
+                    .unwrap();
+                let fallback = Arc::clone(&new_plan);
+                new_plan =
+                    new_plan.with_new_children(vec![child]).unwrap_or(fallback);
+            } else {
+                new_plan =
+                    add_global_limit(new_plan, global_state.skip, global_state.fetch);
+            }
         }
         global_state.fetch = skip_and_fetch;
         global_state.skip = 0;
diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt
index 57d95513a5fa6..2c09c6e840e73 100644
--- a/datafusion/sqllogictest/test_files/sort_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt
@@ -2374,6 +2374,7 @@ SELECT * FROM tn_offset LIMIT 5 OFFSET 10;
 15 150
 
 # Test N.8: OFFSET with multi-partition (target_partitions=4)
+# Small file stays as 1 group, but offset is still pushed down correctly.
 statement ok
 SET datafusion.execution.target_partitions = 4;
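(To make the shared-counter coordination concrete, here is a minimal, self-contained sketch of the offset arithmetic. It is a simplification, not the actual DataFusion code: the name `prune_by_offset_model` is invented, and it drains the whole offset in one pass, whereas the patch subtracts fully skipped row groups in the opener and applies the remainder through a `RowSelection`. The shape of the coordination is the same: partitions share one `Arc<AtomicUsize>`, each atomically consumes what it skips, and once the counter reaches zero the remaining partitions produce rows.)

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Toy model of one partition consuming the shared offset: row groups
/// fully covered by the offset are pruned outright, the consumed rows
/// are atomically subtracted from the shared counter, and any leftover
/// inside the first kept row group is returned as a (skip, select)
/// pair — the shape of the RowSelection the patch builds.
fn prune_by_offset_model(
    shared: &AtomicUsize,
    rg_rows: &[usize],
) -> (Vec<usize>, Option<(usize, usize)>) {
    let mut offset = shared.load(Ordering::SeqCst);
    let start = offset;
    let mut kept = Vec::new();
    let mut selection = None;
    for (idx, &rows) in rg_rows.iter().enumerate() {
        if offset >= rows {
            // Row group fully covered by the offset: prune it.
            offset -= rows;
        } else {
            if offset > 0 {
                // Partial skip inside the first kept row group.
                selection = Some((offset, rows - offset));
                offset = 0;
            }
            kept.push(idx);
        }
    }
    let consumed = start - offset;
    if consumed > 0 {
        // Other partitions observe the reduced offset and skip less.
        shared.fetch_sub(consumed, Ordering::SeqCst);
    }
    (kept, selection)
}

fn main() {
    // Three row groups of 5 rows, as in Test N (max_row_group_size = 5).
    // OFFSET 7: RG0 is pruned, RG1 keeps skip(2)/select(3), RG2 is kept whole.
    let shared = Arc::new(AtomicUsize::new(7));
    let (kept, selection) = prune_by_offset_model(&shared, &[5, 5, 5]);
    assert_eq!(kept, vec![1, 2]);
    assert_eq!(selection, Some((2, 3)));
    assert_eq!(shared.load(Ordering::SeqCst), 0);
    println!("kept row groups {kept:?}, selection {selection:?}");
}
```

This per-partition draining of a single shared counter is what lets the second commit drop GlobalLimitExec entirely even when one parquet file is split into several byte-range partitions.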