
feat(parquet): two-stage access-plan hooks with shared async reader #22160

Draft
adriangb wants to merge 1 commit into apache:main from adriangb:feat/parquet-access-plan-hooks

Conversation

@adriangb
Contributor

Which issue does this PR close?

Relates to the discussion in #22024 about how to extend the Parquet opener with additional pruning strategies (sampling, custom indexes, etc.) without forking it. Does not close that issue — this is the smallest standalone PR that delivers an extension point.

Rationale for this change

DataFusion's Parquet opener runs a fixed pipeline of built-in pruning passes (file range, row-group statistics, bloom filter, limit, page index). Proposed work in #22024 (sampling), #21637 (fully-matched row groups), and #22144 (adaptive filter pushdown) all want to add passes — and today each one requires editing the opener inline.

This PR adds two extension points that let those passes (and external ones — user-defined Parquet indexes, custom statistics) live outside the opener:

  • `PostMetadataAccessPlanHook` — runs after the built-in file-range and row-group-statistics passes have refined the plan. Bloom filters have not been loaded yet.
  • `PreBuildStreamAccessPlanHook` — runs after all built-in pruning passes, just before the reader stream is constructed.

A hook may need to do multiple steps of CPU and I/O work — for example, fetch an external index (I/O), then apply pruning using the fetched data (CPU). To preserve the opener's CPU/I/O routing, each hook is itself a small state machine driven by the opener.

What changes are included in this PR?

New trait shape

```rust
pub trait PostMetadataAccessPlanHook: Debug + Send + Sync {
    fn begin(&self) -> Box<dyn PostMetadataHookInstance>;
}

pub trait PostMetadataHookInstance: Debug + Send {
    fn step(self: Box<Self>, ctx: Box<PostMetadataContext>)
        -> Result<PostMetadataHookStep>;
}

pub enum PostMetadataHookStep {
    /// Hook needs I/O. Caller awaits the future on the I/O pool;
    /// the future returns the updated context plus the next instance.
    Yield(PostMetadataHookYieldFuture),
    /// Hook is finished.
    Done(Box<PostMetadataContext>),
}
```

(Equivalent shapes for the `PreBuildStream` stage.)

Each "state" of a hook is its own type implementing the instance trait. `step` consumes the current instance and returns the next one — so loaded data (e.g. an external index) becomes a field on the next instance type, with no scratchpad on the context.

State machine integration

The opener's `ParquetOpenState` gains four new variants — `Cpu` and `Io` per stage — that route hook steps to the appropriate pool:

```text
... → PruneWithStatistics
→ RunPostMetadataHooksCpu ⇄ RunPostMetadataHooksIo
→ LoadBloomFilters → PruneWithBloomFilters
→ FinalizeAccessPlan (built-in limit + page-index pruning)
→ RunPreBuildStreamHooksCpu ⇄ RunPreBuildStreamHooksIo
→ BuildStream
```

`build_stream`'s prior inline limit + page-index pruning moves to a dedicated `FinalizeAccessPlan` CPU step so the `PreBuildStream` hooks run after built-in pruning. When no hooks are registered for a stage, the opener skips the new states entirely — zero allocation overhead for the no-hook case.

`SharedAsyncFileReader`

`AsyncFileReader::get_bytes` takes `&mut self`, so a raw `Box<dyn AsyncFileReader>` can't be shared between consumers. A new `SharedAsyncFileReader` wraps the boxed reader in `Arc<tokio::sync::Mutex<...>>` and reimplements `AsyncFileReader`. Cloning the wrapper bumps the `Arc` refcount; hook I/O futures get warm state (footer cache, byte-coalescing buffers in custom readers) for free instead of paying for `factory.create_reader(...)` per hook. The Mutex never contends in practice because reads are sequential.

The wrapper replaces `Box<dyn AsyncFileReader>` everywhere it was previously used in `PreparedParquetOpen` and `PushDecoderStreamState`.

Registration surface

```rust
ParquetSource::new(schema)
.with_post_metadata_access_plan_hook(Arc::new(MyExternalIndexHook { ... }))
.with_pre_build_stream_access_plan_hook(Arc::new(MySampler { ... }))
```

Multiple hooks per stage are supported; they run in registration order.

Deliberately not in this PR

  • Migrating built-in passes to hooks. Bloom-filter loading + pruning, page-index pruning, and limit pruning all stay in the opener for now. These migrations are mechanical follow-ups that can be reviewed individually for semantics preservation.
  • Per-conjunct pruning rates / selectivity tracker integration. That's a separate API question and not needed for the extension point itself.

Are these changes tested?

Yes:

  • All 100 existing `datafusion-datasource-parquet` unit tests pass.
  • New `test_post_metadata_hook_multi_step` exercises a multi-step user hook end-to-end:
    • CPU step inspects context and yields a future.
    • I/O step calls `ctx.async_file_reader.clone().get_bytes(..)` (proving the shared-reader plumbing works against a real parquet file in object store).
    • CPU step narrows the access plan.
    • Test asserts the reader skips the corresponding row group (50 rows out of 100 across 2 row groups).
  • `cargo fmt --all`, `./dev/rust_lint.sh`, `cargo clippy -p datafusion-datasource-parquet --all-targets --all-features -- -D warnings` all pass.
  • Downstream `datafusion` core builds clean.

Are there any user-facing changes?

New public Rust API, all in `datafusion_datasource_parquet` (re-exported at the crate root):

  • `PostMetadataAccessPlanHook` + `PostMetadataHookInstance` + `PostMetadataHookStep` + `PostMetadataHookYieldFuture` + `PostMetadataContext`
  • `PreBuildStreamAccessPlanHook` + `PreBuildStreamHookInstance` + `PreBuildStreamHookStep` + `PreBuildStreamHookYieldFuture` + `PreBuildStreamContext`
  • `SharedAsyncFileReader` (in the `reader` module)
  • `ParquetSource::with_post_metadata_access_plan_hook`, `with_pre_build_stream_access_plan_hook`
  • `ParquetSource::post_metadata_access_plan_hooks`, `pre_build_stream_access_plan_hooks`

No existing API changed. No breaking changes.

The github-actions bot added the **datasource** label (Changes to the datasource crate) on May 14, 2026.
