Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 109 additions & 1 deletion datafusion/datasource-parquet/src/access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
pub struct ParquetAccessPlan {
/// How to access the i-th row group
row_groups: Vec<RowGroupAccess>,
/// Whether all rows in the i-th row group are known to match the predicate.
///
/// This is tracked separately from [`RowGroupAccess`] because it describes
/// whether row-level filter evaluation can be skipped, not which rows should
/// be read.
fully_matched: Vec<bool>,
}

/// Describes how the parquet reader will access a row group
Expand All @@ -104,6 +110,24 @@ pub enum RowGroupAccess {
Selection(RowSelection),
}

/// A consecutive set of row groups that share the same row filter requirement.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct RowGroupRun {
/// True if this run needs row filter evaluation.
pub(crate) needs_filter: bool,
/// The access plan for this run.
pub(crate) access_plan: ParquetAccessPlan,
}

impl RowGroupRun {
fn new(needs_filter: bool, access_plan: ParquetAccessPlan) -> Self {
Self {
needs_filter,
access_plan,
}
}
}

impl RowGroupAccess {
/// Return true if this row group should be scanned
pub fn should_scan(&self) -> bool {
Expand All @@ -119,24 +143,34 @@ impl ParquetAccessPlan {
pub fn new_all(row_group_count: usize) -> Self {
Self {
row_groups: vec![RowGroupAccess::Scan; row_group_count],
fully_matched: vec![false; row_group_count],
}
}

/// Create a new `ParquetAccessPlan` that scans no row groups
pub fn new_none(row_group_count: usize) -> Self {
Self {
row_groups: vec![RowGroupAccess::Skip; row_group_count],
fully_matched: vec![false; row_group_count],
}
}

/// Create a new `ParquetAccessPlan` from the specified [`RowGroupAccess`]es
pub fn new(row_groups: Vec<RowGroupAccess>) -> Self {
Self { row_groups }
let row_group_count = row_groups.len();
Self {
row_groups,
fully_matched: vec![false; row_group_count],
}
}

/// Set the i-th row group to the specified [`RowGroupAccess`]
pub fn set(&mut self, idx: usize, access: RowGroupAccess) {
let should_scan = access.should_scan();
self.row_groups[idx] = access;
if !should_scan {
self.fully_matched[idx] = false;
}
}

/// skips the i-th row group (should not be scanned)
Expand All @@ -154,6 +188,32 @@ impl ParquetAccessPlan {
self.row_groups[idx].should_scan()
}

/// Marks the i-th row group as fully matched.
///
/// Fully matched row groups are still read according to their
/// [`RowGroupAccess`], but row-level filter evaluation can be skipped.
pub(crate) fn mark_fully_matched(&mut self, idx: usize) {
if self.should_scan(idx) {
self.fully_matched[idx] = true;
}
}

/// Return true if the i-th row group is fully matched and scanned.
pub(crate) fn is_fully_matched(&self, idx: usize) -> bool {
self.should_scan(idx) && self.fully_matched[idx]
}

/// Returns the fully matched row group flags.
pub(crate) fn fully_matched(&self) -> &Vec<bool> {
&self.fully_matched
}

/// Return true if any scanned row group is fully matched.
fn has_fully_matched(&self) -> bool {
self.row_group_index_iter()
.any(|idx| self.is_fully_matched(idx))
}

/// Set to scan only the [`RowSelection`] in the specified row group.
///
/// Behavior is different depending on the existing access
Expand Down Expand Up @@ -339,6 +399,54 @@ impl ParquetAccessPlan {
self.row_groups
}

/// Split this plan into consecutive row group runs that share the same row
/// filter requirement.
pub(crate) fn split_runs(self, needs_filter: bool) -> Vec<RowGroupRun> {
if !needs_filter || !self.has_fully_matched() {
return vec![RowGroupRun::new(needs_filter, self)];
}

let num_row_groups = self.row_groups.len();
let row_groups = self.row_groups;
let fully_matched = self.fully_matched;
let mut runs: Vec<RowGroupRun> = Vec::new();

for (idx, (access, fully_matched)) in
row_groups.into_iter().zip(fully_matched).enumerate()
{
if !access.should_scan() {
continue;
}

let row_group_needs_filter = !fully_matched;
if let Some(run) = runs
.last_mut()
.filter(|run| run.needs_filter == row_group_needs_filter)
{
run.access_plan.set(idx, access);
if fully_matched {
run.access_plan.mark_fully_matched(idx);
}
} else {
let mut run_plan = ParquetAccessPlan::new_none(num_row_groups);
run_plan.set(idx, access);
if fully_matched {
run_plan.mark_fully_matched(idx);
}
runs.push(RowGroupRun::new(row_group_needs_filter, run_plan));
}
}

if runs.is_empty() {
vec![RowGroupRun::new(
needs_filter,
ParquetAccessPlan::new_none(num_row_groups),
)]
} else {
runs
}
}

/// Prepare this plan and resolve to the final `PreparedAccessPlan`
pub(crate) fn prepare(
self,
Expand Down
24 changes: 24 additions & 0 deletions datafusion/datasource-parquet/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,4 +213,28 @@ impl ParquetFileMetrics {
predicate_cache_records,
}
}

/// Record pages whose page-index pruning was skipped because the containing
/// row group was fully matched by row-group statistics.
///
/// The counter is only registered when there is a non-zero value. This keeps
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should apply the same pattern to the other metrics (lazily initialize them) -- if you can get a few percent in this query maybe it would get us a few in the others

/// [`ParquetFileMetrics::new`] from cloning the filename and metrics set for
/// files that never use this metric.
pub(crate) fn add_page_index_pages_skipped_by_fully_matched(
metrics: &ExecutionPlanMetricsSet,
partition: usize,
filename: &str,
n: usize,
) {
if n == 0 {
return;
}

let count = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.with_type(MetricType::Summary)
.with_category(MetricCategory::Rows)
.counter("page_index_pages_skipped_by_fully_matched", partition);
count.add(n);
}
}
Loading
Loading