60 commits
6704444
feat(parquet): in-decoder adaptive filter scheduling
adriangb Apr 27, 2026
caa9448
fix(parquet): rustdoc broken intra-doc links; regen configs.md
adriangb Apr 28, 2026
149a71a
chore: bump Cargo.lock to pydantic/arrow-rs adaptive-strategy-swap@b6…
adriangb Apr 28, 2026
9f5e9dc
fix(example): force row-level pushdown in json_shredding
adriangb Apr 28, 2026
4a910f3
fix(parquet): swap-safe projection mask + overlap-aware filter heuristic
adriangb Apr 28, 2026
c05bfb0
fix(parquet): require non-zero extra_bytes for row-level initial plac…
adriangb Apr 28, 2026
7f2baf6
fix(parquet): add prune_rate gate; drop stats sampling interval
adriangb Apr 28, 2026
ee6e5b8
refactor(parquet): cache is_optional inline; drop sampling intervals
adriangb Apr 28, 2026
5df3a2a
feat(parquet): scatter-aware bytes-saved metric
adriangb Apr 28, 2026
8522d18
restore: STATS_SAMPLE_INTERVAL=32 + prune_rate>=0.99 gate for promotion
adriangb Apr 28, 2026
5f203a3
fix(parquet): drop STATS_SAMPLE_INTERVAL — sample every batch
adriangb Apr 28, 2026
ac525dc
fix(parquet): drop the prune_rate ≥ 99% promotion gate
adriangb Apr 29, 2026
b3a3a65
feat(parquet): coalesce post-scan-filtered batches
adriangb Apr 29, 2026
ae43632
Revert "feat(parquet): coalesce post-scan-filtered batches"
adriangb Apr 29, 2026
a0a03fa
exp(parquet): page-pruning prior for initial filter placement
adriangb May 8, 2026
107d0a2
exp(parquet): combined page-pruning prior + latency-aware z
adriangb May 8, 2026
d61033c
docs: experiments.md final writeup of overnight exp1-4
adriangb May 8, 2026
3af9ae7
docs: round 2 — push all-combined further (mid-stream swap + post-sca…
adriangb May 8, 2026
3114f14
docs: round 3 — Q64 chase in morsel flow, four dead ends
adriangb May 8, 2026
1b7f70b
docs: round 4 — page-pruning prior tried and rejected
adriangb May 8, 2026
c9fa237
exp(parquet): per-conjunct page-pruning rates as side-effect of opene…
adriangb May 8, 2026
4e139a7
exp(parquet): drop debug prints + dead row-group prior helpers
adriangb May 8, 2026
ac9c8b3
docs: round 5 — proper per-conjunct architecture (page-prior as side-…
adriangb May 8, 2026
27386ac
exp(pruning): per-conjunct rates from PruningPredicate as side-effect
adriangb May 8, 2026
55197aa
docs: r6 progress + r7 plan
adriangb May 8, 2026
8f30b10
exp(parquet): per-conjunct rates from bloom-filter pruning
adriangb May 8, 2026
123847a
docs: r7 outcome + r8 plan (refresh rates for dynamic filters)
adriangb May 8, 2026
99dba8e
exp(parquet): refresh per-conjunct rate for populated dynamic filters
adriangb May 8, 2026
094b9e5
docs: r8 outcome — smoke lat 1.013× of exp3, full TPC-DS-lat 1.074×
adriangb May 8, 2026
d93b664
exp(parquet): partial-AND promote-only signal for hash_lookup dynamic…
adriangb May 8, 2026
a4267d9
exp(parquet): snapshot dynamic filter before split for partial-AND prior
adriangb May 8, 2026
21cd786
docs: r9 result (neutral) + final round-6 stack summary
adriangb May 8, 2026
b98af45
exp(pruning): drop wrapper PruningPredicate construction in tagged-co…
adriangb May 8, 2026
2b1a78b
cleanup: drop unused top-level PruningPredicate import in selectivity
adriangb May 8, 2026
bdc51dc
docs: r11 (filter sort by rate) neutral; r10 at parity with exp3
adriangb May 8, 2026
31c97b1
fix(parquet): clippy cleanup of round-6 code
adriangb May 8, 2026
066e149
docs: ClickBench smoke confirms r10 parity with exp3
adriangb May 8, 2026
f214957
test(parquet): update selectivity tests for scatter-aware bytes API
adriangb May 8, 2026
d2d5db0
docs: log selectivity test fix as part of round-6 PR-readiness
adriangb May 8, 2026
77fba1e
docs: TPC-H smoke confirms parity — r10 is 1.5% faster than exp3
adriangb May 8, 2026
7248281
docs: success criterion verified — r10+pushdown < main+no-pushdown
adriangb May 8, 2026
5f9761f
docs: design doc + reframe slides and report around the clean stack
adriangb May 9, 2026
8b75d96
docs: reframe design, report, and slides for the general DataFusion c…
adriangb May 10, 2026
ee43f6f
docs: fix TPC-H story — post-scan runs inside the scan, no shuffle re…
adriangb May 10, 2026
264e88d
slides: fix TPC-H slide rendering — chart now shows Q18, takeaway fits
adriangb May 10, 2026
88ed3bf
fix(pruning): union sub-predicate columns in literal_columns()
adriangb May 9, 2026
d595098
test(parquet): force-promote filters in tests that drove the legacy '…
adriangb May 9, 2026
48b4f88
docs: correct dynamic-filter timing story (probe side never sees plac…
adriangb May 11, 2026
dcf4631
docs: snapshot already happens unconditionally — refresh path is for …
adriangb May 11, 2026
1570f9e
slides: add 'lottery in numbers' slide between the diagnosis and the …
adriangb May 11, 2026
6b33c9f
slides: replace lottery-table slide with two SQL-example slides (win …
adriangb May 11, 2026
e45d194
update
adriangb May 11, 2026
e4f5708
update
adriangb May 13, 2026
893f836
remove
adriangb May 13, 2026
d118b2c
cleanup
adriangb May 13, 2026
858013c
cleanup
adriangb May 13, 2026
54433ca
fmt
adriangb May 13, 2026
ada0472
parquet: dynamic projection-mask swaps at row-group boundaries
adriangb May 13, 2026
cae5fe6
hash join: wrap pushed-down dynamic filter in OptionalFilterPhysicalExpr
adriangb May 14, 2026
9267518
fix CI: page-index match count, fmt, broken doc link
adriangb May 14, 2026
76 changes: 30 additions & 46 deletions Cargo.lock


28 changes: 28 additions & 0 deletions Cargo.toml
@@ -204,6 +204,34 @@ url = "2.5.7"
uuid = "1.23"
zstd = { version = "0.13", default-features = false }

# Override arrow / parquet to the `adaptive-strategy-swap` branch on
# pydantic's fork of arrow-rs, which adds the `swap_strategy` API on
# `ParquetPushDecoder` that the in-decoder adaptive filter scheduling
# depends on.
#
# The full set of arrow-rs workspace crates is listed so transitive
# deps (e.g. `arrow-cast` pulled in via `arrow`) resolve to the patched
# version and we don't link two copies into one binary.
#
# Branch: https://github.com/pydantic/arrow-rs/tree/adaptive-strategy-swap
[patch.crates-io]
arrow = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-arith = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-array = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-buffer = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-cast = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-csv = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-data = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-flight = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-ipc = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-json = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-ord = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-row = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-schema = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-select = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-string = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
parquet = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }

[workspace.lints.clippy]
# Detects large stack-allocated futures that may cause stack overflow crashes (see threshold in clippy.toml)
large_futures = "warn"
10 changes: 10 additions & 0 deletions datafusion-examples/examples/data_io/json_shredding.rs
@@ -92,6 +92,16 @@ pub async fn json_shredding() -> Result<()> {
// Set up query execution
let mut cfg = SessionConfig::new();
cfg.options_mut().execution.parquet.pushdown_filters = true;
// Force every filter to row-level so the example's
// `pushdown_rows_pruned=1` assertion is deterministic. The default
// adaptive scheduler keeps small-file filters on the post-scan path
// (via the byte-ratio heuristic), where `pushdown_rows_pruned` stays
// 0; setting `filter_pushdown_min_bytes_per_sec = 0` disables that
// heuristic.
cfg.options_mut()
.execution
.parquet
.filter_pushdown_min_bytes_per_sec = 0.0;
let ctx = SessionContext::new_with_config(cfg);
ctx.runtime_env().register_object_store(
ObjectStoreUrl::parse("memory://")?.as_ref(),
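The comment in the hunk above leans on two pieces of the adaptive scheduler: the byte-ratio initial-placement heuristic and the `filter_pushdown_min_bytes_per_sec = 0` force-row-level sentinel. A minimal sketch of how that decision might compose (function name and exact logic are hypothetical, not the PR's actual code):

```rust
// Hypothetical sketch of the initial-placement decision described above:
// a filter starts at row-level only when its columns are a small fraction
// of the projected bytes, and a `min_bytes_per_sec` of 0.0 acts as the
// force-row-level sentinel that skips the heuristic entirely.
fn starts_row_level(
    filter_col_bytes: u64,
    projection_bytes: u64,
    byte_ratio_threshold: f64, // 0.20 by default
    min_bytes_per_sec: f64,
) -> bool {
    if min_bytes_per_sec == 0.0 {
        return true; // sentinel: always row-level, deterministic pruning
    }
    (filter_col_bytes as f64) <= byte_ratio_threshold * (projection_bytes as f64)
}

fn main() {
    // A narrow filter column (10% of projection bytes) starts at row-level.
    assert!(starts_row_level(10, 100, 0.20, 100.0));
    // A wide one (50%) starts post-scan under the default 0.20 ratio...
    assert!(!starts_row_level(50, 100, 0.20, 100.0));
    // ...unless the sentinel forces row-level, as the example config does.
    assert!(starts_row_level(50, 100, 0.20, 0.0));
}
```

This is why the example sets the sentinel: without it, the small in-memory file would keep the filter post-scan and `pushdown_rows_pruned` would stay 0.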
23 changes: 23 additions & 0 deletions datafusion/common/src/config.rs
@@ -919,6 +919,29 @@ config_namespace! {
/// parquet reader setting. 0 means no caching.
pub max_predicate_cache_size: Option<usize>, default = None

/// (reading) Minimum throughput, in bytes per second, that an adaptive
/// row-level filter must sustain to remain at row-level. Filters that
/// drop below this threshold (with statistical confidence — see
/// `filter_confidence_z`) are demoted to post-scan, or dropped entirely
/// if they were optional (e.g. a hash-join build-side dynamic filter).
/// Set to `0` to force every filter to row-level (skip the threshold
/// check); set to `f64::INFINITY` to keep every filter post-scan.
pub filter_pushdown_min_bytes_per_sec: f64, default = 100.0 * 1024.0 * 1024.0

/// (reading) Initial-placement heuristic for adaptive filters: when a
/// filter is first observed, place it at row-level if its column bytes
/// are this fraction or less of the total projection's column bytes.
/// Above this ratio, the filter starts as post-scan and only gets
/// promoted later if measured throughput crosses
/// `filter_pushdown_min_bytes_per_sec`.
pub filter_collecting_byte_ratio_threshold: f64, default = 0.20

/// (reading) Z-score for the one-sided confidence interval the adaptive
/// filter scheduler uses when promoting / demoting / dropping filters.
/// Default `2.0` (≈ 97.5%) keeps strategy moves conservative; lower the
/// value for snappier adaptation, raise it for more stable placements.
pub filter_confidence_z: f64, default = 2.0

// The following options affect writing to parquet files
// and map to parquet::file::properties::WriterProperties

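The `filter_confidence_z` docs above describe a one-sided confidence check on measured throughput. The PR's estimator is not shown in this diff, but a sketch under a simple normal-approximation assumption (all names hypothetical) illustrates the demotion rule: demote only when even the optimistic upper end of the throughput interval sits below `filter_pushdown_min_bytes_per_sec`, so a larger `z` makes moves more conservative, as the docs say.

```rust
// Hypothetical sketch: one-sided upper confidence bound on the mean
// throughput of a row-level filter, from per-batch samples.
fn upper_confidence_bound(samples: &[f64], z: f64) -> f64 {
    let n = samples.len() as f64;
    let mean = samples.iter().sum::<f64>() / n;
    let var = samples.iter().map(|s| (s - mean).powi(2)).sum::<f64>() / n;
    mean + z * (var / n).sqrt() // larger z => larger bound => harder to demote
}

fn should_demote(samples: &[f64], z: f64, min_bytes_per_sec: f64) -> bool {
    // 0.0 is the "always row-level" sentinel: never demote.
    min_bytes_per_sec > 0.0 && upper_confidence_bound(samples, z) < min_bytes_per_sec
}

fn main() {
    let bytes_per_sec = [80e6, 90e6, 85e6, 95e6];
    // ~87.5 MB/s is confidently below the 100 MiB/s default: demote.
    assert!(should_demote(&bytes_per_sec, 2.0, 100.0 * 1024.0 * 1024.0));
    // The 0.0 sentinel disables demotion entirely.
    assert!(!should_demote(&bytes_per_sec, 2.0, 0.0));
}
```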
13 changes: 13 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -210,6 +210,10 @@ impl ParquetOptions {
coerce_int96: _, // not used for writer props
skip_arrow_metadata: _,
max_predicate_cache_size: _,
// Read-time adaptive filter knobs; not used for writer props.
filter_pushdown_min_bytes_per_sec: _,
filter_collecting_byte_ratio_threshold: _,
filter_confidence_z: _,
} = self;

let mut builder = WriterProperties::builder()
@@ -483,6 +487,10 @@ mod tests {
skip_arrow_metadata: defaults.skip_arrow_metadata,
coerce_int96: None,
max_predicate_cache_size: defaults.max_predicate_cache_size,
filter_pushdown_min_bytes_per_sec: defaults.filter_pushdown_min_bytes_per_sec,
filter_collecting_byte_ratio_threshold: defaults
.filter_collecting_byte_ratio_threshold,
filter_confidence_z: defaults.filter_confidence_z,
use_content_defined_chunking: defaults.use_content_defined_chunking.clone(),
}
}
@@ -600,6 +608,11 @@
binary_as_string: global_options_defaults.binary_as_string,
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
coerce_int96: None,
filter_pushdown_min_bytes_per_sec: global_options_defaults
.filter_pushdown_min_bytes_per_sec,
filter_collecting_byte_ratio_threshold: global_options_defaults
.filter_collecting_byte_ratio_threshold,
filter_confidence_z: global_options_defaults.filter_confidence_z,
use_content_defined_chunking: props.content_defined_chunking().map(|c| {
CdcOptions {
min_chunk_size: c.min_chunk_size,
16 changes: 11 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -166,13 +166,19 @@ mod tests {
source = source.with_predicate(predicate);
}

// The adaptive selectivity tracker subsumes the static
// `reorder_filters` flag. To keep these row-filter-pushdown
// assertions deterministic regardless of the byte-ratio
// heuristic, force every filter to row-level by setting
// `filter_pushdown_min_bytes_per_sec = 0` (the
// "always-row-level" sentinel). The promote/demote behavior
// exercised by other tests is irrelevant here.
if self.pushdown_predicate {
source = source
.with_pushdown_filters(true)
.with_reorder_filters(true);
} else {
source = source.with_pushdown_filters(false);
let mut opts = TableParquetOptions::default();
opts.global.filter_pushdown_min_bytes_per_sec = 0.0;
source = source.with_table_parquet_options(opts);
}
source = source.with_pushdown_filters(self.pushdown_predicate);

if self.page_index_predicate {
source = source.with_enable_page_index(true);