From fd0d9fd96c155195429eb61ce17dc04a8583dd9e Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 13 May 2026 19:59:09 -0700
Subject: [PATCH] feat(parquet): two-stage access-plan hooks with shared async
 reader
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add two extension points so external crates can contribute additional
pruning passes (sampling, custom statistics, user-defined Parquet
indexes) without forking the opener:

- `PostMetadataAccessPlanHook` runs after the built-in file-range and
  row-group-statistics passes have refined the access plan. Bloom
  filters have not been loaded yet.
- `PreBuildStreamAccessPlanHook` runs after all built-in pruning
  passes, just before the reader stream is constructed.

Each hook is itself a small state machine: `begin()` returns an
instance; `step()` runs on the CPU pool and either yields a `BoxFuture`
for I/O (awaited on the I/O pool) or signals `Done`. The instance
morphs between typed states between calls, so loaded data (e.g. an
external index) becomes a field on the next instance type.

The opener's state machine grows four new variants — `Cpu` and `Io` per
stage — that route hook steps to the correct pool. When no hooks are
registered for a stage the opener skips the new states entirely.

`SharedAsyncFileReader` wraps the `Box<dyn AsyncFileReader>` returned
by the factory in an `Arc<Mutex<..>>` and reimplements
`AsyncFileReader`. The wrapper is cheaply cloneable, so hook I/O
futures share warm state (footer cache, etc.) with the opener's
primary reader. The Mutex never contends in practice because reads
are sequential.

Net effect on existing behavior: none. The built-in pruning passes
(file range, statistics, bloom, limit, page index) all remain as
opener state-machine steps. Moving them to default-registered hooks is
a follow-up so reviewers can evaluate the semantics preservation of
each conversion separately.

A new end-to-end test exercises a multi-step user hook: a CPU step
inspects the context and yields to I/O, the I/O step fetches bytes
through the warm shared reader, and a final CPU step narrows the
access plan; the test asserts the reader skips the corresponding row
group.

All 100 existing `datafusion-datasource-parquet` lib tests pass plus
the new test. `cargo fmt --all`, `./dev/rust_lint.sh`, and `cargo
clippy -p datafusion-datasource-parquet --all-targets --all-features
-- -D warnings` are clean. Downstream `datafusion` core builds
cleanly.
---
 .../src/access_plan_optimizer.rs            | 260 +++++++++
 datafusion/datasource-parquet/src/mod.rs    |   6 +
 datafusion/datasource-parquet/src/opener.rs | 550 ++++++++++++++++--
 datafusion/datasource-parquet/src/reader.rs |  73 +++
 datafusion/datasource-parquet/src/source.rs |  51 ++
 5 files changed, 894 insertions(+), 46 deletions(-)
 create mode 100644 datafusion/datasource-parquet/src/access_plan_optimizer.rs

diff --git a/datafusion/datasource-parquet/src/access_plan_optimizer.rs b/datafusion/datasource-parquet/src/access_plan_optimizer.rs
new file mode 100644
index 0000000000000..d784c6e18bd81
--- /dev/null
+++ b/datafusion/datasource-parquet/src/access_plan_optimizer.rs
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Extension hooks for refining a [`ParquetAccessPlan`] during file open.
+//!
+//! The Parquet opener narrows down which row groups (and which rows within
+//! them) it will read through a fixed sequence of built-in passes:
+//!
+//! - file-range pruning,
+//! - row-group statistics pruning,
+//! - bloom-filter pruning,
+//! - limit-based pruning,
+//! - page-index pruning.
+//!
+//! Each pass operates on a [`ParquetAccessPlan`]. This module exposes the
+//! pipeline as two stage-specific hooks so external crates can contribute
+//! additional passes — sampling, custom statistics, user-defined Parquet
+//! indexes — without forking the opener.
+//!
+//! # Hook stages
+//!
+//! - [`PostMetadataAccessPlanHook`] runs after the Parquet footer and
+//!   page index (when enabled) are loaded, and after the built-in
+//!   file-range and row-group-statistics passes have refined the plan.
+//!   Bloom filters have **not** been loaded yet; the built-in
+//!   bloom-filter pruning pass runs once the hooks at this stage have
+//!   completed.
+//! - [`PreBuildStreamAccessPlanHook`] runs after the built-in limit and
+//!   page-index passes, just before the reader stream is constructed.
+//!
+//! # A hook is a state machine
+//!
+//! A hook may do multiple steps of CPU and I/O work — for instance,
+//! "fetch an external index" (I/O) followed by "apply pruning using the
+//! fetched data" (CPU). To preserve the opener's CPU/I/O routing, each
+//! hook is itself a small state machine driven by the opener:
+//!
+//! 1. The opener calls [`PostMetadataAccessPlanHook::begin`] to obtain a
+//!    stateful [`PostMetadataHookInstance`].
+//! 2. The opener calls [`PostMetadataHookInstance::step`] on the
+//!    **CPU pool**. Any CPU work the hook needs to do happens
+//!    synchronously inside `step`. The hook returns either
+//!    [`PostMetadataHookStep::Done`] (final context) or
+//!    [`PostMetadataHookStep::Yield`] (a future to await on the
+//!    **I/O pool**).
+//! 3. On [`Yield`](PostMetadataHookStep::Yield), the opener awaits the
+//!    returned future on its I/O pool. The future returns
+//!    `(ctx, next_instance)`; the opener then re-enters `step` on the
+//!    CPU pool with the next instance.
+//! 4. On [`Done`](PostMetadataHookStep::Done), the opener moves on to
+//!    the next registered hook (if any), or to the next built-in stage.
+//!
+//! Each "state" of a hook can be a different type — `MyHookStart`,
+//! `MyHookLoaded { index_data }`, etc. — that implements
+//! [`PostMetadataHookInstance`]. The instance morphs by returning the
+//! next type. State that the I/O future produces (e.g., a loaded
+//! index) becomes a field on the next instance type.
+//!
+//! # Doing I/O against the parquet file
+//!
+//! Both contexts expose a [`SharedAsyncFileReader`] that is cheaply
+//! cloneable. Hook I/O futures can use it directly — they see the same
+//! warm state the opener's primary reader has, with no need to call the
+//! [`ParquetFileReaderFactory`] for a fresh reader.
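+//!
+//! # Example
+//!
+//! A minimal sketch of a two-step hook, mirroring the end-to-end test in
+//! `opener.rs`. The `ExternalIndexHook`, `Start`, and `Loaded` names are
+//! illustrative only:
+//!
+//! ```ignore
+//! use datafusion_common::DataFusionError;
+//! use futures::FutureExt;
+//!
+//! #[derive(Debug)]
+//! struct ExternalIndexHook;
+//!
+//! impl PostMetadataAccessPlanHook for ExternalIndexHook {
+//!     fn begin(&self) -> Box<dyn PostMetadataHookInstance> {
+//!         Box::new(Start)
+//!     }
+//! }
+//!
+//! /// CPU step: decide what to fetch, then yield to the I/O pool.
+//! #[derive(Debug)]
+//! struct Start;
+//!
+//! impl PostMetadataHookInstance for Start {
+//!     fn step(
+//!         self: Box<Self>,
+//!         ctx: Box<PostMetadataContext>,
+//!     ) -> Result<PostMetadataHookStep> {
+//!         let mut reader = ctx.async_file_reader.clone();
+//!         Ok(PostMetadataHookStep::Yield(
+//!             async move {
+//!                 // I/O step: fetch external index bytes through the
+//!                 // shared, already-warm reader.
+//!                 let index = reader
+//!                     .get_bytes(0..4)
+//!                     .await
+//!                     .map_err(DataFusionError::from)?;
+//!                 let next: Box<dyn PostMetadataHookInstance> =
+//!                     Box::new(Loaded { index });
+//!                 Ok((ctx, next))
+//!             }
+//!             .boxed(),
+//!         ))
+//!     }
+//! }
+//!
+//! /// CPU step: apply the fetched index to the access plan.
+//! #[derive(Debug)]
+//! struct Loaded {
+//!     index: bytes::Bytes,
+//! }
+//!
+//! impl PostMetadataHookInstance for Loaded {
+//!     fn step(
+//!         self: Box<Self>,
+//!         mut ctx: Box<PostMetadataContext>,
+//!     ) -> Result<PostMetadataHookStep> {
+//!         // Pretend the index ruled out row group 0.
+//!         if !ctx.plan.is_empty() {
+//!             ctx.plan.skip(0);
+//!         }
+//!         Ok(PostMetadataHookStep::Done(ctx))
+//!     }
+//! }
+//! ```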
+
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_datasource::{FileRange, PartitionedFile};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion_pruning::PruningPredicate;
+use futures::future::BoxFuture;
+use parquet::file::metadata::ParquetMetaData;
+
+use crate::ParquetAccessPlan;
+use crate::ParquetFileMetrics;
+use crate::ParquetFileReaderFactory;
+use crate::SharedAsyncFileReader;
+use crate::page_filter::PagePruningAccessPlanFilter;
+
+// =====================================================================
+// PostMetadata stage
+// =====================================================================
+
+/// Context for a [`PostMetadataAccessPlanHook`].
+///
+/// Available after the Parquet footer and page index (when enabled) have
+/// been loaded, and after the built-in file-range and
+/// row-group-statistics passes have refined the plan. Bloom filters have
+/// **not** been loaded yet — the built-in bloom-filter pruning pass runs
+/// after the hooks at this stage complete.
+#[derive(Debug)]
+pub struct PostMetadataContext {
+    /// Access plan refined so far. Hooks mutate this in place.
+    pub plan: ParquetAccessPlan,
+    /// Execution partition index for the scan.
+    pub partition_index: usize,
+    /// The file being opened.
+    pub partitioned_file: PartitionedFile,
+    /// Optional byte range restricting which part of the file to read.
+    pub file_range: Option<FileRange>,
+    /// Schema of the file after type coercions.
+    pub physical_file_schema: SchemaRef,
+    /// Loaded Parquet metadata, including page index when enabled.
+    pub file_metadata: Arc<ParquetMetaData>,
+    /// Raw predicate applied to this scan, if any.
+    pub predicate: Option<Arc<dyn PhysicalExpr>>,
+    /// Row-group-level pruning predicate derived from `predicate`.
+    pub pruning_predicate: Option<Arc<PruningPredicate>>,
+    /// Page-index pruning predicate derived from `predicate`. The
+    /// built-in page-index pass has **not** run yet at this stage.
+    pub page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,
+    /// Outer query limit, if any. The built-in limit pass has **not**
+    /// run yet at this stage.
+    pub limit: Option<usize>,
+    /// Whether the query requires the original row order to be preserved.
+    pub preserve_order: bool,
+    /// Per-file metrics. Hooks may emit to these counters.
+    pub file_metrics: ParquetFileMetrics,
+    /// Cheaply cloneable reader for the opened file. Hook I/O futures
+    /// should clone this rather than asking
+    /// [`Self::parquet_file_reader_factory`] for a fresh reader, so they
+    /// share warm state with the opener's primary reader.
+    pub async_file_reader: SharedAsyncFileReader,
+    /// Factory used by the opener to create the primary reader. Hooks
+    /// that need an *additional, independent* reader can obtain one here.
+    pub parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
+    /// Hint forwarded to
+    /// [`Self::parquet_file_reader_factory`]`::create_reader`.
+    pub metadata_size_hint: Option<usize>,
+    /// Plan-wide metrics set, for `MetricBuilder` use by hooks.
+    pub metrics: ExecutionPlanMetricsSet,
+}
+
+/// Future returned by [`PostMetadataHookStep::Yield`].
+pub type PostMetadataHookYieldFuture = BoxFuture<
+    'static,
+    Result<(Box<PostMetadataContext>, Box<dyn PostMetadataHookInstance>)>,
+>;
+
+/// One step of a [`PostMetadataHookInstance`].
+pub enum PostMetadataHookStep {
+    /// Hook needs I/O. The opener awaits the future on its I/O pool;
+    /// the future returns the updated context and the next instance,
+    /// which the opener then drives on the CPU pool.
+    Yield(PostMetadataHookYieldFuture),
+    /// Hook is finished. `ctx` contains the final mutations.
+    Done(Box<PostMetadataContext>),
+}
+
+/// A stateful instance of a [`PostMetadataAccessPlanHook`], produced by
+/// [`PostMetadataAccessPlanHook::begin`].
+///
+/// The opener calls [`step`](Self::step) on the CPU pool. Any CPU work
+/// happens inside `step`. To do I/O the hook returns
+/// [`PostMetadataHookStep::Yield`]; the opener awaits the future on the
+/// I/O pool and re-enters `step` on the next instance.
+pub trait PostMetadataHookInstance: Debug + Send {
+    /// Drive one step. Consumes the current instance.
+    fn step(
+        self: Box<Self>,
+        ctx: Box<PostMetadataContext>,
+    ) -> Result<PostMetadataHookStep>;
+}
+
+/// Factory trait for [`PostMetadataHookInstance`]s. Factories are
+/// registered on [`ParquetSource`](crate::source::ParquetSource); the
+/// opener calls [`begin`](Self::begin) once per file open for each
+/// registered factory.
+pub trait PostMetadataAccessPlanHook: Debug + Send + Sync {
+    fn begin(&self) -> Box<dyn PostMetadataHookInstance>;
+}
+
+// =====================================================================
+// PreBuildStream stage
+// =====================================================================
+
+/// Context for a [`PreBuildStreamAccessPlanHook`].
+///
+/// Available after **all** built-in pruning passes have run, just
+/// before the reader stream is constructed.
+#[derive(Debug)]
+pub struct PreBuildStreamContext {
+    /// Access plan refined so far. Hooks mutate this in place.
+    pub plan: ParquetAccessPlan,
+    /// Execution partition index for the scan.
+    pub partition_index: usize,
+    /// The file being opened.
+    pub partitioned_file: PartitionedFile,
+    /// Optional byte range restricting which part of the file to read.
+    pub file_range: Option<FileRange>,
+    /// Schema of the file after type coercions.
+    pub physical_file_schema: SchemaRef,
+    /// Loaded Parquet metadata.
+    pub file_metadata: Arc<ParquetMetaData>,
+    /// Raw predicate applied to this scan, if any.
+    pub predicate: Option<Arc<dyn PhysicalExpr>>,
+    /// Row-group-level pruning predicate derived from `predicate`.
+    pub pruning_predicate: Option<Arc<PruningPredicate>>,
+    /// Outer query limit, if any. Already applied by the built-in
+    /// limit pass.
+    pub limit: Option<usize>,
+    /// Whether the query requires the original row order to be preserved.
+    pub preserve_order: bool,
+    /// Per-file metrics.
+    pub file_metrics: ParquetFileMetrics,
+    /// Cheaply cloneable reader for the opened file.
+    pub async_file_reader: SharedAsyncFileReader,
+    /// Factory used by the opener to create the primary reader.
+    pub parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
+    /// Hint forwarded to
+    /// [`Self::parquet_file_reader_factory`]`::create_reader`.
+    pub metadata_size_hint: Option<usize>,
+    /// Plan-wide metrics set.
+    pub metrics: ExecutionPlanMetricsSet,
+}
+
+/// Future returned by [`PreBuildStreamHookStep::Yield`].
+pub type PreBuildStreamHookYieldFuture = BoxFuture<
+    'static,
+    Result<(
+        Box<PreBuildStreamContext>,
+        Box<dyn PreBuildStreamHookInstance>,
+    )>,
+>;
+
+/// One step of a [`PreBuildStreamHookInstance`]. See [`PostMetadataHookStep`].
+pub enum PreBuildStreamHookStep {
+    Yield(PreBuildStreamHookYieldFuture),
+    Done(Box<PreBuildStreamContext>),
+}
+
+/// A stateful instance of a [`PreBuildStreamAccessPlanHook`]. See
+/// [`PostMetadataHookInstance`] for the protocol.
+pub trait PreBuildStreamHookInstance: Debug + Send {
+    fn step(
+        self: Box<Self>,
+        ctx: Box<PreBuildStreamContext>,
+    ) -> Result<PreBuildStreamHookStep>;
+}
+
+/// Factory trait for [`PreBuildStreamHookInstance`]s.
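+///
+/// A minimal sketch of a stateless, single-CPU-step hook at this stage.
+/// The `KeepFirstRowGroup` name is illustrative only:
+///
+/// ```ignore
+/// #[derive(Debug)]
+/// struct KeepFirstRowGroup;
+///
+/// impl PreBuildStreamAccessPlanHook for KeepFirstRowGroup {
+///     fn begin(&self) -> Box<dyn PreBuildStreamHookInstance> {
+///         Box::new(KeepFirstRowGroup)
+///     }
+/// }
+///
+/// impl PreBuildStreamHookInstance for KeepFirstRowGroup {
+///     fn step(
+///         self: Box<Self>,
+///         mut ctx: Box<PreBuildStreamContext>,
+///     ) -> Result<PreBuildStreamHookStep> {
+///         // Skip every row group that is still scanned except the first.
+///         for rg in ctx.plan.row_group_indexes().into_iter().skip(1) {
+///             ctx.plan.skip(rg);
+///         }
+///         Ok(PreBuildStreamHookStep::Done(ctx))
+///     }
+/// }
+/// ```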
+pub trait PreBuildStreamAccessPlanHook: Debug + Send + Sync {
+    fn begin(&self) -> Box<dyn PreBuildStreamHookInstance>;
+}
diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs
index 9a907f4118a86..b67fd29b8c36e 100644
--- a/datafusion/datasource-parquet/src/mod.rs
+++ b/datafusion/datasource-parquet/src/mod.rs
@@ -25,6 +25,7 @@
 #![cfg_attr(test, allow(clippy::needless_pass_by_value))]
 
 pub mod access_plan;
+pub mod access_plan_optimizer;
 pub mod file_format;
 pub mod metadata;
 mod metrics;
@@ -39,6 +40,11 @@ mod supported_predicates;
 mod writer;
 
 pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
+pub use access_plan_optimizer::{
+    PostMetadataAccessPlanHook, PostMetadataContext, PostMetadataHookInstance,
+    PostMetadataHookStep, PreBuildStreamAccessPlanHook, PreBuildStreamContext,
+    PreBuildStreamHookInstance, PreBuildStreamHookStep,
+};
 pub use file_format::*;
 pub use metrics::ParquetFileMetrics;
 pub use page_filter::PagePruningAccessPlanFilter;
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index bbbd298687ab5..c12c1be8f3c30 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -17,7 +17,13 @@
 
 //! [`ParquetMorselizer`] state machines for opening Parquet files
 
+use crate::access_plan_optimizer::{
+    PostMetadataAccessPlanHook, PostMetadataContext, PostMetadataHookInstance,
+    PostMetadataHookStep, PreBuildStreamAccessPlanHook, PreBuildStreamContext,
+    PreBuildStreamHookInstance, PreBuildStreamHookStep,
+};
 use crate::page_filter::PagePruningAccessPlanFilter;
+use crate::reader::SharedAsyncFileReader;
 use crate::row_filter::build_projection_read_plan;
 use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter};
 use crate::{
@@ -136,6 +142,10 @@ pub(super) struct ParquetMorselizer {
     pub max_predicate_cache_size: Option<usize>,
     /// Whether to read row groups in reverse order
     pub reverse_row_groups: bool,
+    /// Hooks invoked after the built-in post-metadata pruning passes.
+    pub post_metadata_hooks: Vec<Arc<dyn PostMetadataAccessPlanHook>>,
+    /// Hooks invoked after all built-in pruning passes.
+    pub pre_build_stream_hooks: Vec<Arc<dyn PreBuildStreamAccessPlanHook>>,
 }
 
 impl fmt::Debug for ParquetMorselizer {
@@ -220,14 +230,22 @@ enum ParquetOpenState {
     LoadPageIndex(BoxFuture<'static, Result<FiltersPreparedParquetOpen>>),
     /// Pruning Row Groups
     PruneWithStatistics(Box<FiltersPreparedParquetOpen>),
+    /// Driving a user-supplied post-metadata hook on the CPU pool.
+    RunPostMetadataHooksCpu(Box<PostMetadataHookProgress>),
+    /// Awaiting an I/O step yielded by a post-metadata hook.
+    RunPostMetadataHooksIo(BoxFuture<'static, Result<PostMetadataHookProgress>>),
     /// Loading bloom filters required for row-group pruning
     LoadBloomFilters(BoxFuture<'static, Result<BloomFiltersLoadedParquetOpen>>),
     /// Pruning with preloaded Bloom Filters
     PruneWithBloomFilters(Box<BloomFiltersLoadedParquetOpen>),
-    /// Builds the final reader stream
-    ///
-    /// TODO: split state as this currently does both I/O and CPU work.
-    BuildStream(Box<RowGroupsPrunedParquetOpen>),
+    /// CPU-only step: apply built-in limit and page-index pruning.
+    FinalizeAccessPlan(Box<RowGroupsPrunedParquetOpen>),
+    /// Driving a user-supplied pre-build-stream hook on the CPU pool.
+    RunPreBuildStreamHooksCpu(Box<PreBuildStreamHookProgress>),
+    /// Awaiting an I/O step yielded by a pre-build-stream hook.
+    RunPreBuildStreamHooksIo(BoxFuture<'static, Result<PreBuildStreamHookProgress>>),
+    /// Builds the final reader stream.
+    BuildStream(Box<FinalizedAccessPlanState>),
     /// Terminal state: the final opened stream is ready to return.
     Ready(BoxStream<'static, Result<RecordBatch>>),
     /// Terminal state: reading complete
     Done,
 }
 
@@ -245,8 +263,13 @@ impl fmt::Debug for ParquetOpenState {
             ParquetOpenState::PrepareFilters(_) => "PrepareFilters",
             ParquetOpenState::LoadPageIndex(_) => "LoadPageIndex",
             ParquetOpenState::PruneWithStatistics(_) => "PruneWithStatistics",
+            ParquetOpenState::RunPostMetadataHooksCpu(_) => "RunPostMetadataHooksCpu",
+            ParquetOpenState::RunPostMetadataHooksIo(_) => "RunPostMetadataHooksIo",
             ParquetOpenState::LoadBloomFilters(_) => "LoadBloomFilters",
             ParquetOpenState::PruneWithBloomFilters(_) => "PruneWithBloomFilters",
+            ParquetOpenState::FinalizeAccessPlan(_) => "FinalizeAccessPlan",
+            ParquetOpenState::RunPreBuildStreamHooksCpu(_) => "RunPreBuildStreamHooksCpu",
+            ParquetOpenState::RunPreBuildStreamHooksIo(_) => "RunPreBuildStreamHooksIo",
             ParquetOpenState::BuildStream(_) => "BuildStream",
             ParquetOpenState::Ready(_) => "Ready",
             ParquetOpenState::Done => "Done",
@@ -267,7 +290,7 @@ struct PreparedParquetOpen {
     metadata_size_hint: Option<usize>,
     metrics: ExecutionPlanMetricsSet,
     parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
-    async_file_reader: Box<dyn AsyncFileReader>,
+    async_file_reader: SharedAsyncFileReader,
     batch_size: usize,
     logical_file_schema: SchemaRef,
     physical_file_schema: SchemaRef,
@@ -287,6 +310,8 @@ struct PreparedParquetOpen {
     max_predicate_cache_size: Option<usize>,
     reverse_row_groups: bool,
     preserve_order: bool,
+    post_metadata_hooks: Vec<Arc<dyn PostMetadataAccessPlanHook>>,
+    pre_build_stream_hooks: Vec<Arc<dyn PreBuildStreamAccessPlanHook>>,
     #[cfg(feature = "parquet_encryption")]
     file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
 }
@@ -329,6 +354,35 @@ struct BloomFiltersLoadedParquetOpen {
     row_group_bloom_filters: Vec<BloomFilterStatistics>,
 }
 
+/// State carried by [`ParquetOpenState::BuildStream`].
+///
+/// Final access plan after built-in limit + page-index pruning. The
+/// pre-build-stream hooks see this state.
+struct FinalizedAccessPlanState {
+    prepared: FiltersPreparedParquetOpen,
+    access_plan: ParquetAccessPlan,
+}
+
+/// In-flight state while driving a [`PostMetadataAccessPlanHook`].
+///
+/// Holds the current hook instance, the queue of remaining hooks to run
+/// after it completes, and the carrier (the rest of the opener state,
+/// threaded through the iteration so it can be reassembled at the end).
+struct PostMetadataHookProgress {
+    ctx: Box<PostMetadataContext>,
+    instance: Box<dyn PostMetadataHookInstance>,
+    remaining: Vec<Arc<dyn PostMetadataAccessPlanHook>>,
+    carrier: Box<FiltersPreparedParquetOpen>,
+}
+
+/// In-flight state while driving a [`PreBuildStreamAccessPlanHook`].
+struct PreBuildStreamHookProgress {
+    ctx: Box<PreBuildStreamContext>,
+    instance: Box<dyn PreBuildStreamHookInstance>,
+    remaining: Vec<Arc<dyn PreBuildStreamAccessPlanHook>>,
+    carrier: Box<FiltersPreparedParquetOpen>,
+}
+
 impl ParquetOpenState {
     /// Applies one CPU-only state transition.
     ///
@@ -386,16 +440,31 @@ impl ParquetOpenState {
             }
             ParquetOpenState::PruneWithStatistics(prepared) => {
                 let prepared_row_groups = prepared.prune_row_groups()?;
-                Ok(ParquetOpenState::LoadBloomFilters(
-                    prepared_row_groups.load_bloom_filters().boxed(),
-                ))
+                Self::after_post_metadata(prepared_row_groups)
+            }
+            ParquetOpenState::RunPostMetadataHooksCpu(progress) => {
+                Self::drive_post_metadata_step(*progress)
+            }
+            ParquetOpenState::RunPostMetadataHooksIo(future) => {
+                Ok(ParquetOpenState::RunPostMetadataHooksIo(future))
             }
             ParquetOpenState::LoadBloomFilters(future) => {
                 Ok(ParquetOpenState::LoadBloomFilters(future))
             }
-            ParquetOpenState::PruneWithBloomFilters(loaded) => Ok(
-                ParquetOpenState::BuildStream(Box::new(loaded.prune_bloom_filters())),
-            ),
+            ParquetOpenState::PruneWithBloomFilters(loaded) => {
+                let pruned = loaded.prune_bloom_filters();
+                Ok(ParquetOpenState::FinalizeAccessPlan(Box::new(pruned)))
+            }
+            ParquetOpenState::FinalizeAccessPlan(prepared) => {
+                let finalized = prepared.finalize_access_plan();
+                Self::after_finalize(finalized)
+            }
+            ParquetOpenState::RunPreBuildStreamHooksCpu(progress) => {
+                Self::drive_pre_build_stream_step(*progress)
+            }
+            ParquetOpenState::RunPreBuildStreamHooksIo(future) => {
+                Ok(ParquetOpenState::RunPreBuildStreamHooksIo(future))
+            }
             ParquetOpenState::BuildStream(prepared) => {
                 Ok(ParquetOpenState::Ready(prepared.build_stream()?))
             }
@@ -405,6 +474,218 @@ impl ParquetOpenState {
             }
         }
     }
+
+    /// After built-in post-metadata pruning: if any post-metadata hooks
+    /// are registered, begin driving the first one. Otherwise transition
+    /// straight to bloom-filter loading.
+    fn after_post_metadata(
+        pruned: RowGroupsPrunedParquetOpen,
+    ) -> Result<ParquetOpenState> {
+        if pruned
+            .prepared
+            .loaded
+            .prepared
+            .post_metadata_hooks
+            .is_empty()
+        {
+            return Ok(ParquetOpenState::LoadBloomFilters(
+                pruned.load_bloom_filters().boxed(),
+            ));
+        }
+        let RowGroupsPrunedParquetOpen {
+            prepared: carrier,
+            row_groups,
+        } = pruned;
+        let mut remaining = carrier.loaded.prepared.post_metadata_hooks.clone();
+        let first = remaining.remove(0);
+        let ctx = build_post_metadata_context(&carrier, row_groups.build());
+        let instance = first.begin();
+        Ok(ParquetOpenState::RunPostMetadataHooksCpu(Box::new(
+            PostMetadataHookProgress {
+                ctx,
+                instance,
+                remaining,
+                carrier: Box::new(carrier),
+            },
+        )))
+    }
+
+    /// Drive one CPU step of the current post-metadata hook.
+    fn drive_post_metadata_step(
+        progress: PostMetadataHookProgress,
+    ) -> Result<ParquetOpenState> {
+        let PostMetadataHookProgress {
+            ctx,
+            instance,
+            mut remaining,
+            carrier,
+        } = progress;
+        match instance.step(ctx)? {
+            PostMetadataHookStep::Yield(future) => {
+                let resume = async move {
+                    let (ctx, instance) = future.await?;
+                    Ok(PostMetadataHookProgress {
+                        ctx,
+                        instance,
+                        remaining,
+                        carrier,
+                    })
+                }
+                .boxed();
+                Ok(ParquetOpenState::RunPostMetadataHooksIo(resume))
+            }
+            PostMetadataHookStep::Done(ctx) => {
+                if let Some(next) = (!remaining.is_empty()).then(|| remaining.remove(0)) {
+                    let instance = next.begin();
+                    Ok(ParquetOpenState::RunPostMetadataHooksCpu(Box::new(
+                        PostMetadataHookProgress {
+                            ctx,
+                            instance,
+                            remaining,
+                            carrier,
+                        },
+                    )))
+                } else {
+                    // All post-metadata hooks complete. Reassemble and
+                    // advance to bloom-filter loading.
+                    let row_groups = RowGroupAccessPlanFilter::new(ctx.plan);
+                    let pruned = RowGroupsPrunedParquetOpen {
+                        prepared: *carrier,
+                        row_groups,
+                    };
+                    Ok(ParquetOpenState::LoadBloomFilters(
+                        pruned.load_bloom_filters().boxed(),
+                    ))
+                }
+            }
+        }
+    }
+
+    /// After built-in limit + page-index pruning: if any
+    /// pre-build-stream hooks are registered, begin driving the first
+    /// one. Otherwise transition straight to BuildStream.
+    fn after_finalize(finalized: FinalizedAccessPlanState) -> Result<ParquetOpenState> {
+        if finalized
+            .prepared
+            .loaded
+            .prepared
+            .pre_build_stream_hooks
+            .is_empty()
+        {
+            return Ok(ParquetOpenState::BuildStream(Box::new(finalized)));
+        }
+        let FinalizedAccessPlanState {
+            prepared: carrier,
+            access_plan,
+        } = finalized;
+        let mut remaining = carrier.loaded.prepared.pre_build_stream_hooks.clone();
+        let first = remaining.remove(0);
+        let ctx = build_pre_build_stream_context(&carrier, access_plan);
+        let instance = first.begin();
+        Ok(ParquetOpenState::RunPreBuildStreamHooksCpu(Box::new(
+            PreBuildStreamHookProgress {
+                ctx,
+                instance,
+                remaining,
+                carrier: Box::new(carrier),
+            },
+        )))
+    }
+
+    /// Drive one CPU step of the current pre-build-stream hook.
+    fn drive_pre_build_stream_step(
+        progress: PreBuildStreamHookProgress,
+    ) -> Result<ParquetOpenState> {
+        let PreBuildStreamHookProgress {
+            ctx,
+            instance,
+            mut remaining,
+            carrier,
+        } = progress;
+        match instance.step(ctx)? {
+            PreBuildStreamHookStep::Yield(future) => {
+                let resume = async move {
+                    let (ctx, instance) = future.await?;
+                    Ok(PreBuildStreamHookProgress {
+                        ctx,
+                        instance,
+                        remaining,
+                        carrier,
+                    })
+                }
+                .boxed();
+                Ok(ParquetOpenState::RunPreBuildStreamHooksIo(resume))
+            }
+            PreBuildStreamHookStep::Done(ctx) => {
+                if let Some(next) = (!remaining.is_empty()).then(|| remaining.remove(0)) {
+                    let instance = next.begin();
+                    Ok(ParquetOpenState::RunPreBuildStreamHooksCpu(Box::new(
+                        PreBuildStreamHookProgress {
+                            ctx,
+                            instance,
+                            remaining,
+                            carrier,
+                        },
+                    )))
+                } else {
+                    let finalized = FinalizedAccessPlanState {
+                        prepared: *carrier,
+                        access_plan: ctx.plan,
+                    };
+                    Ok(ParquetOpenState::BuildStream(Box::new(finalized)))
+                }
+            }
+        }
+    }
+}
+
+fn build_post_metadata_context(
+    state: &FiltersPreparedParquetOpen,
+    plan: ParquetAccessPlan,
+) -> Box<PostMetadataContext> {
+    let inner = &state.loaded.prepared;
+    Box::new(PostMetadataContext {
+        plan,
+        partition_index: inner.partition_index,
+        partitioned_file: inner.partitioned_file.clone(),
+        file_range: inner.file_range.clone(),
+        physical_file_schema: Arc::clone(&inner.physical_file_schema),
+        file_metadata: Arc::clone(state.loaded.reader_metadata.metadata()),
+        predicate: inner.predicate.clone(),
+        pruning_predicate: state.pruning_predicate.clone(),
+        page_pruning_predicate: state.page_pruning_predicate.clone(),
+        limit: inner.limit,
+        preserve_order: inner.preserve_order,
+        file_metrics: inner.file_metrics.clone(),
+        async_file_reader: inner.async_file_reader.clone(),
+        parquet_file_reader_factory: Arc::clone(&inner.parquet_file_reader_factory),
+        metadata_size_hint: inner.metadata_size_hint,
+        metrics: inner.metrics.clone(),
+    })
+}
+
+fn build_pre_build_stream_context(
+    state: &FiltersPreparedParquetOpen,
+    plan: ParquetAccessPlan,
+) -> Box<PreBuildStreamContext> {
+    let inner = &state.loaded.prepared;
+    Box::new(PreBuildStreamContext {
+        plan,
+        partition_index: inner.partition_index,
+        partitioned_file: inner.partitioned_file.clone(),
+        file_range: inner.file_range.clone(),
+        physical_file_schema: Arc::clone(&inner.physical_file_schema),
+        file_metadata: Arc::clone(state.loaded.reader_metadata.metadata()),
+        predicate: inner.predicate.clone(),
+        pruning_predicate: state.pruning_predicate.clone(),
+        limit: inner.limit,
+        preserve_order: inner.preserve_order,
+        file_metrics: inner.file_metrics.clone(),
+        async_file_reader: inner.async_file_reader.clone(),
+        parquet_file_reader_factory: Arc::clone(&inner.parquet_file_reader_factory),
+        metadata_size_hint: inner.metadata_size_hint,
+        metrics: inner.metrics.clone(),
+    })
+}
 
 /// Implements the Morsel API
@@ -515,6 +796,20 @@ impl MorselPlanner for ParquetMorselPlanner {
                 )))
             })))
         }
+            ParquetOpenState::RunPostMetadataHooksIo(future) => {
+                Ok(Some(Self::schedule_io(async move {
+                    Ok(ParquetOpenState::RunPostMetadataHooksCpu(Box::new(
+                        future.await?,
+                    )))
+                })))
+            }
+            ParquetOpenState::RunPreBuildStreamHooksIo(future) => {
+                Ok(Some(Self::schedule_io(async move {
+                    Ok(ParquetOpenState::RunPreBuildStreamHooksCpu(Box::new(
+                        future.await?,
+                    )))
+                })))
+            }
             ParquetOpenState::Ready(stream) => {
                 let morsels: Vec<Box<dyn Morsel>> =
                     vec![Box::new(ParquetStreamMorsel::new(stream))];
@@ -546,13 +841,13 @@ impl ParquetMorselizer {
             .metadata_size_hint
             .or(self.metadata_size_hint);
 
-        let async_file_reader: Box<dyn AsyncFileReader> =
-            self.parquet_file_reader_factory.create_reader(
+        let async_file_reader =
+            SharedAsyncFileReader::new(self.parquet_file_reader_factory.create_reader(
                 self.partition_index,
                 partitioned_file.clone(),
                 metadata_size_hint,
                 &self.metrics,
-            )?;
+            )?);
 
         // Calculate the output schema from the original projection (before literal replacement)
         // so we get correct field names from column references
@@ -656,6 +951,8 @@ impl ParquetMorselizer {
             max_predicate_cache_size: self.max_predicate_cache_size,
             reverse_row_groups: self.reverse_row_groups,
             preserve_order: self.preserve_order,
+            post_metadata_hooks: self.post_metadata_hooks.clone(),
+            pre_build_stream_hooks: self.pre_build_stream_hooks.clone(),
             #[cfg(feature = "parquet_encryption")]
             file_decryption_properties: None,
         })
@@ -962,12 +1259,14 @@ impl RowGroupsPrunedParquetOpen {
         let reader_metadata = self.prepared.loaded.reader_metadata.clone();
         let replacement_reader = {
             let prepared = &self.prepared.loaded.prepared;
-            prepared.parquet_file_reader_factory.create_reader(
-                prepared.partition_index,
-                prepared.partitioned_file.clone(),
-                prepared.metadata_size_hint,
-                &prepared.metrics,
-            )?
+            SharedAsyncFileReader::new(
+                prepared.parquet_file_reader_factory.create_reader(
+                    prepared.partition_index,
+                    prepared.partitioned_file.clone(),
+                    prepared.metadata_size_hint,
+                    &prepared.metrics,
+                )?,
+            )
         };
 
         let prepared = &mut self.prepared.loaded.prepared;
@@ -1055,16 +1354,57 @@ impl BloomFiltersLoadedParquetOpen {
     }
 }
 
 impl RowGroupsPrunedParquetOpen {
-    /// Build the final parquet stream once all pruning work is complete.
-    fn build_stream(self) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+    /// Run the built-in limit + page-index pruning passes and extract a
+    /// finalized [`ParquetAccessPlan`]. CPU-only.
+    fn finalize_access_plan(self) -> FinalizedAccessPlanState {
         let RowGroupsPrunedParquetOpen {
             prepared,
             mut row_groups,
         } = self;
+        let file_metadata = Arc::clone(prepared.loaded.reader_metadata.metadata());
+        let rg_metadata = file_metadata.row_groups();
+        let inner = &prepared.loaded.prepared;
+
+        // Prune by limit if limit is set and limit order is not sensitive
+        if let (Some(limit), false) = (inner.limit, inner.preserve_order) {
+            row_groups.prune_by_limit(limit, rg_metadata, &inner.file_metrics);
+        }
+
+        // Page index pruning: if all data on individual pages can
+        // be ruled out using page metadata, rows from other columns
+        // with that range can be skipped as well.
+        let mut access_plan = row_groups.build();
+        if inner.enable_page_index
+            && !access_plan.is_empty()
+            && let Some(page_pruning_predicate) = prepared.page_pruning_predicate.as_ref()
+        {
+            access_plan = page_pruning_predicate.prune_plan_with_page_index(
+                access_plan,
+                &inner.physical_file_schema,
+                prepared.loaded.reader_metadata.parquet_schema(),
+                file_metadata.as_ref(),
+                &inner.file_metrics,
+            );
+        }
+
+        FinalizedAccessPlanState {
+            prepared,
+            access_plan,
+        }
+    }
+}
+
+impl FinalizedAccessPlanState {
+    /// Build the final parquet stream from the finalized access plan.
+    fn build_stream(self) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+        let FinalizedAccessPlanState {
+            prepared,
+            access_plan,
+        } = self;
         let FiltersPreparedParquetOpen {
             loaded,
             pruning_predicate: _,
-            page_pruning_predicate,
+            page_pruning_predicate: _,
         } = prepared;
         let MetadataLoadedParquetOpen {
             prepared,
@@ -1101,28 +1441,6 @@ impl FinalizedAccessPlanState {
             None
         };
 
-        // Prune by limit if limit is set and limit order is not sensitive
-        if let (Some(limit), false) = (prepared.limit, prepared.preserve_order) {
-            row_groups.prune_by_limit(limit, rg_metadata, &prepared.file_metrics);
-        }
-
-        // Page index pruning: if all data on individual pages can
-        // be ruled out using page metadata, rows from other columns
-        // with that range can be skipped as well.
-        let mut access_plan = row_groups.build();
-        if prepared.enable_page_index
-            && !access_plan.is_empty()
-            && let Some(page_pruning_predicate) = page_pruning_predicate
-        {
-            access_plan = page_pruning_predicate.prune_plan_with_page_index(
-                access_plan,
-                &prepared.physical_file_schema,
-                reader_metadata.parquet_schema(),
-                file_metadata.as_ref(),
-                &prepared.file_metrics,
-            );
-        }
-
         // Prepare the access plan (extract row groups and row selection)
         let mut prepared_plan = access_plan.prepare(rg_metadata)?;
 
@@ -1226,7 +1544,7 @@
     /// fully consumed.
 struct PushDecoderStreamState {
     decoder: ParquetPushDecoder,
-    reader: Box<dyn AsyncFileReader>,
+    reader: SharedAsyncFileReader,
     projector: Projector,
     output_schema: Arc<Schema>,
     replace_schema: bool,
@@ -1670,6 +1988,8 @@ mod test {
         max_predicate_cache_size: Option<usize>,
         reverse_row_groups: bool,
         preserve_order: bool,
+        post_metadata_hooks: Vec<Arc<dyn PostMetadataAccessPlanHook>>,
+        pre_build_stream_hooks: Vec<Arc<dyn PreBuildStreamAccessPlanHook>>,
     }
 
     impl ParquetMorselizerBuilder {
@@ -1696,6 +2016,8 @@ impl ParquetMorselizerBuilder {
                 max_predicate_cache_size: None,
                 reverse_row_groups: false,
                 preserve_order: false,
+                post_metadata_hooks: Vec::new(),
+                pre_build_stream_hooks: Vec::new(),
             }
         }
 
@@ -1810,8 +2132,27 @@ impl ParquetMorselizerBuilder {
                 encryption_factory: None,
                 max_predicate_cache_size: self.max_predicate_cache_size,
                 reverse_row_groups: self.reverse_row_groups,
+                post_metadata_hooks: self.post_metadata_hooks.clone(),
+                pre_build_stream_hooks: self.pre_build_stream_hooks.clone(),
             }
         }
+
+        fn with_post_metadata_access_plan_hook(
+            mut self,
+            hook: Arc<dyn PostMetadataAccessPlanHook>,
+        ) -> Self {
+            self.post_metadata_hooks.push(hook);
+            self
+        }
+
+        #[expect(dead_code)]
+        fn with_pre_build_stream_access_plan_hook(
+            mut self,
+            hook: Arc<dyn PreBuildStreamAccessPlanHook>,
+        ) -> Self {
+            self.pre_build_stream_hooks.push(hook);
+            self
+        }
     }
 
     /// Test helper that drives a [`ParquetMorselizer`] to completion and returns
@@ -2714,4 +3055,121 @@ mod test {
             "without page index all rows are returned"
         );
     }
+
+    /// Verifies that a user-registered multi-step
+    /// [`PostMetadataAccessPlanHook`] is driven correctly through both
+    /// CPU and I/O steps, and that the access plan it produces is
+    /// honored downstream.
+    ///
+    /// The hook simulates an "external index" pattern:
+    /// 1. CPU step: inspect ctx, yield a future.
+    /// 2. I/O step: load a "mask" via `ctx.async_file_reader` (this just
+    ///    fetches a tiny byte range so we exercise the shared-reader path).
+    /// 3. CPU step: apply the mask, marking row group 0 as `Skip`.
+    #[tokio::test]
+    async fn test_post_metadata_hook_multi_step() {
+        use crate::access_plan_optimizer::{
+            PostMetadataAccessPlanHook, PostMetadataContext, PostMetadataHookInstance,
+            PostMetadataHookStep,
+        };
+        use futures::FutureExt;
+        use parquet::arrow::async_reader::AsyncFileReader;
+        use parquet::file::properties::WriterProperties;
+
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        // 100 rows in 2 row groups of 50 each.
+        let values: Vec<i32> = (0..100).collect();
+        let batch = record_batch!((
+            "a",
+            Int32,
+            values.iter().map(|v| Some(*v)).collect::<Vec<_>>()
+        ))
+        .unwrap();
+        let props = WriterProperties::builder()
+            .set_max_row_group_row_count(Some(50))
+            .build();
+        let schema = batch.schema();
+        let data_size = write_parquet_batches(
+            Arc::clone(&store),
+            "two_row_groups.parquet",
+            vec![batch],
+            Some(props),
+        )
+        .await;
+        let file =
+            PartitionedFile::new("two_row_groups.parquet".to_string(), data_size as u64);
+
+        #[derive(Debug)]
+        struct ExternalIndexHook;
+
+        impl PostMetadataAccessPlanHook for ExternalIndexHook {
+            fn begin(&self) -> Box<dyn PostMetadataHookInstance> {
+                Box::new(HookStart)
+            }
+        }
+
+        // Step 1: inspect ctx, yield to I/O.
+        #[derive(Debug)]
+        struct HookStart;
+        impl PostMetadataHookInstance for HookStart {
+            fn step(
+                self: Box<Self>,
+                ctx: Box<PostMetadataContext>,
+            ) -> Result<PostMetadataHookStep> {
+                let mut reader = ctx.async_file_reader.clone();
+                Ok(PostMetadataHookStep::Yield(
+                    async move {
+                        // Real I/O against the parquet file (warm reader)
+                        // — fetch the first few bytes just to prove the
+                        // shared-reader plumbing works end-to-end.
+                        let _ = reader
+                            .get_bytes(0..4)
+                            .await
+                            .map_err(DataFusionError::from)?;
+                        let next: Box<dyn PostMetadataHookInstance> =
+                            Box::new(HookLoaded);
+                        Ok((ctx, next))
+                    }
+                    .boxed(),
+                ))
+            }
+        }
+
+        // Step 2 (after I/O): CPU work — narrow the plan.
+        #[derive(Debug)]
+        struct HookLoaded;
+        impl PostMetadataHookInstance for HookLoaded {
+            fn step(
+                self: Box<Self>,
+                mut ctx: Box<PostMetadataContext>,
+            ) -> Result<PostMetadataHookStep> {
+                if !ctx.plan.is_empty() {
+                    ctx.plan.skip(0);
+                }
+                Ok(PostMetadataHookStep::Done(ctx))
+            }
+        }
+
+        let baseline = ParquetMorselizerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .build();
+        let with_hook = ParquetMorselizerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_post_metadata_access_plan_hook(Arc::new(ExternalIndexHook))
+            .build();
+
+        let (_, baseline_rows) =
+            count_batches_and_rows(open_file(&baseline, file.clone()).await.unwrap())
+                .await;
+        let (_, hooked_rows) =
+            count_batches_and_rows(open_file(&with_hook, file).await.unwrap()).await;
+
+        assert_eq!(baseline_rows, 100, "baseline reads all rows");
+        assert_eq!(
+            hooked_rows, 50,
+            "hook should skip the first row group (50 rows)"
+        );
+    }
 }
diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs
index 482bf8dced4f8..f92057cd09275 100644
--- a/datafusion/datasource-parquet/src/reader.rs
+++ b/datafusion/datasource-parquet/src/reader.rs
@@ -361,3 +361,76 @@ impl FileMetadata for CachedParquetMetaData {
         HashMap::from([("page_index".to_owned(), page_index.to_string())])
     }
 }
+
+/// A clone-friendly wrapper around a `Box<dyn AsyncFileReader>`.
+///
+/// `AsyncFileReader::get_bytes` takes `&mut self`, which means a raw
+/// `Box<dyn AsyncFileReader>` cannot be shared between consumers.
+/// `SharedAsyncFileReader` wraps an inner reader in
+/// `Arc<tokio::sync::Mutex<..>>` and reimplements [`AsyncFileReader`] by
+/// delegating each call through the mutex. Cloning the wrapper bumps the
+/// `Arc` refcount — both consumers see the same underlying reader (and its
+/// warm state, e.g. byte caches inside a custom reader).
+///
+/// The Mutex serializes concurrent calls, but the opener uses the reader
+/// sequentially (footer load → page index load → hook I/O → decode), so
+/// contention is not an issue in practice.
+#[derive(Clone)]
+pub struct SharedAsyncFileReader {
+    inner: Arc<tokio::sync::Mutex<Box<dyn AsyncFileReader>>>,
+}
+
+impl SharedAsyncFileReader {
+    /// Wrap a raw reader.
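+    ///
+    /// A sketch of the intended sharing pattern (`factory` here is any
+    /// [`ParquetFileReaderFactory`](crate::ParquetFileReaderFactory)):
+    ///
+    /// ```ignore
+    /// let reader = SharedAsyncFileReader::new(factory.create_reader(
+    ///     partition_index,
+    ///     partitioned_file,
+    ///     metadata_size_hint,
+    ///     &metrics,
+    /// )?);
+    /// // Clones are cheap and see the same underlying reader state.
+    /// let mut for_hook = reader.clone();
+    /// let bytes = for_hook.get_bytes(0..4).await?;
+    /// ```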
+    pub fn new(reader: Box<dyn AsyncFileReader>) -> Self {
+        Self {
+            inner: Arc::new(tokio::sync::Mutex::new(reader)),
+        }
+    }
+}
+
+impl Debug for SharedAsyncFileReader {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SharedAsyncFileReader")
+            .finish_non_exhaustive()
+    }
+}
+
+impl AsyncFileReader for SharedAsyncFileReader {
+    fn get_bytes(
+        &mut self,
+        range: Range<u64>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        let arc = Arc::clone(&self.inner);
+        async move {
+            let mut guard = arc.lock().await;
+            guard.get_bytes(range).await
+        }
+        .boxed()
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
+        let arc = Arc::clone(&self.inner);
+        async move {
+            let mut guard = arc.lock().await;
+            guard.get_byte_ranges(ranges).await
+        }
+        .boxed()
+    }
+
+    fn get_metadata<'a>(
+        &'a mut self,
+        options: Option<&'a ArrowReaderOptions>,
+    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        let arc = Arc::clone(&self.inner);
+        let options = options.cloned();
+        async move {
+            let mut guard = arc.lock().await;
+            guard.get_metadata(options.as_ref()).await
+        }
+        .boxed()
+    }
+}
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 91e3d2274932d..c0ed3d7809dbf 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -22,6 +22,9 @@ use std::sync::Arc;
 
 use crate::DefaultParquetFileReaderFactory;
 use crate::ParquetFileReaderFactory;
+use crate::access_plan_optimizer::{
+    PostMetadataAccessPlanHook, PreBuildStreamAccessPlanHook,
+};
 use crate::opener::ParquetMorselizer;
 use crate::opener::build_pruning_predicates;
 use crate::row_filter::can_expr_be_pushed_down_with_schemas;
@@ -294,6 +297,11 @@ pub struct ParquetSource {
     /// so we still need to sort them after reading, so the reverse scan is inexact.
     /// Used to optimize ORDER BY ... DESC on sorted data.
     reverse_row_groups: bool,
+    /// Hooks invoked after the built-in post-metadata pruning passes.
+    pub(crate) post_metadata_hooks: Vec<Arc<dyn PostMetadataAccessPlanHook>>,
+    /// Hooks invoked after all built-in pruning passes, just before
+    /// the reader stream is constructed.
+    pub(crate) pre_build_stream_hooks: Vec<Arc<dyn PreBuildStreamAccessPlanHook>>,
 }
 
 impl ParquetSource {
@@ -319,9 +327,50 @@ impl ParquetSource {
             #[cfg(feature = "parquet_encryption")]
             encryption_factory: None,
             reverse_row_groups: false,
+            post_metadata_hooks: Vec::new(),
+            pre_build_stream_hooks: Vec::new(),
         }
     }
 
+    /// Register a hook invoked after the built-in post-metadata pruning
+    /// passes (file-range + row-group statistics). The hook may do
+    /// multiple CPU and I/O steps; see [`PostMetadataAccessPlanHook`].
+    ///
+    /// Multiple hooks can be registered; they run sequentially in
+    /// registration order.
+    pub fn with_post_metadata_access_plan_hook(
+        mut self,
+        hook: Arc<dyn PostMetadataAccessPlanHook>,
+    ) -> Self {
+        self.post_metadata_hooks.push(hook);
+        self
+    }
+
+    /// Register a hook invoked after all built-in pruning passes, just
+    /// before the reader stream is constructed. See
+    /// [`PreBuildStreamAccessPlanHook`].
+    pub fn with_pre_build_stream_access_plan_hook(
+        mut self,
+        hook: Arc<dyn PreBuildStreamAccessPlanHook>,
+    ) -> Self {
+        self.pre_build_stream_hooks.push(hook);
+        self
+    }
+
+    /// All post-metadata access-plan hooks registered on this source.
+    pub fn post_metadata_access_plan_hooks(
+        &self,
+    ) -> &[Arc<dyn PostMetadataAccessPlanHook>] {
+        &self.post_metadata_hooks
+    }
+
+    /// All pre-build-stream access-plan hooks registered on this source.
+    pub fn pre_build_stream_access_plan_hooks(
+        &self,
+    ) -> &[Arc<dyn PreBuildStreamAccessPlanHook>] {
+        &self.pre_build_stream_hooks
+    }
+
     /// Set the `TableParquetOptions` for this ParquetSource.
     pub fn with_table_parquet_options(
         mut self,
@@ -581,6 +630,8 @@ impl FileSource for ParquetSource {
             encryption_factory: self.get_encryption_factory_with_config(),
             max_predicate_cache_size: self.max_predicate_cache_size(),
             reverse_row_groups: self.reverse_row_groups,
+            post_metadata_hooks: self.post_metadata_hooks.clone(),
+            pre_build_stream_hooks: self.pre_build_stream_hooks.clone(),
         }))
     }