232 changes: 206 additions & 26 deletions datafusion/physical-plan/src/topk/mod.rs
@@ -221,7 +221,7 @@ impl TopK {
expr,
row_converter,
scratch_rows,
heap: TopKHeap::new(k, batch_size),
heap: TopKHeap::new(k),
common_sort_prefix_converter: prefix_row_converter,
common_sort_prefix: Arc::from(common_sort_prefix),
finished: false,
@@ -663,8 +663,6 @@ impl TopKMetrics {
struct TopKHeap {
/// The maximum number of elements to store in this heap.
k: usize,
/// The target number of rows for output batches
batch_size: usize,
/// Storage for at most `k` items using a BinaryHeap. Reversed
/// so that the smallest k so far is on the top
inner: BinaryHeap<TopKRow>,
@@ -675,11 +673,10 @@ struct TopKHeap {
}

impl TopKHeap {
fn new(k: usize, batch_size: usize) -> Self {
fn new(k: usize) -> Self {
assert!(k > 0);
Self {
k,
batch_size,
inner: BinaryHeap::new(),
store: RecordBatchStore::new(),
owned_bytes: 0,
@@ -792,24 +789,26 @@ impl TopKHeap {
/// Compact this heap, rewriting all stored batches into a single
/// new batch
pub fn maybe_compact(&mut self) -> Result<()> {
// we compact if the number of "unused" rows in the store is
// past some pre-defined threshold. Target holding up to
// around 20 batches, but handle cases of large k where some
// batches might be partially full
let max_unused_rows = (20 * self.batch_size) + self.k;
let unused_rows = self.store.unused_rows();

// don't compact if the store has one extra batch or
// unused rows is under the threshold
if self.store.len() <= 2 || unused_rows < max_unused_rows {
// Don't compact if there's only one batch (compacting into itself is pointless)
if self.store.len() <= 1 {
return Ok(());
}

let total_rows = self.store.total_rows;
let num_rows = self.inner.len();

// Compact when the rows held in the store exceed 2x what the
// compacted result would need. The multiplier avoids compacting
// when the savings would be marginal.
if total_rows <= num_rows * 2 {
return Ok(());
}
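
// Illustrative numbers (these match test_topk_memory_compaction
// below): with k = 5 and a single 100_000-row input batch,
// total_rows = 100_000 and num_rows = 5, so 100_000 > 5 * 2 and
// compaction proceeds. Once compacted, total_rows == num_rows and
// the guard above short-circuits until enough rows accumulate again.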

// For now, always compact the entire thing into a new batch
// (maybe we can get fancier in the future about ignoring
// batches that already have a high usage ratio)

// Note: new batch is in the same order as inner
let num_rows = self.inner.len();
let (new_batch, mut topk_rows) = self.emit_with_state()?;
let Some(new_batch) = new_batch else {
return Ok(());
@@ -969,6 +968,8 @@ struct RecordBatchStore {
batches: HashMap<u32, RecordBatchEntry>,
/// total size of all record batches tracked by this store
batches_size: usize,
/// Total number of rows across all batches tracked by this store
total_rows: usize,
}

impl RecordBatchStore {
@@ -977,6 +978,7 @@ impl RecordBatchStore {
next_id: 0,
batches: HashMap::new(),
batches_size: 0,
total_rows: 0,
}
}

@@ -994,6 +996,7 @@
// a `uses` count of 0 means that none of the rows in the batch were stored in the topk
if entry.uses > 0 {
self.batches_size += get_record_batch_memory_size(&entry.batch);
self.total_rows += entry.batch.num_rows();
self.batches.insert(entry.id, entry);
}
}
@@ -1002,6 +1005,7 @@
fn clear(&mut self) {
self.batches.clear();
self.batches_size = 0;
self.total_rows = 0;
}

fn get(&self, id: u32) -> Option<&RecordBatchEntry> {
@@ -1013,15 +1017,6 @@
self.batches.len()
}

/// Returns the total number of rows in batches minus the number
/// which are in use
fn unused_rows(&self) -> usize {
self.batches
.values()
.map(|batch_entry| batch_entry.batch.num_rows() - batch_entry.uses)
.sum()
}

/// returns true if the store has nothing stored
fn is_empty(&self) -> bool {
self.batches.is_empty()
@@ -1045,6 +1040,11 @@ impl RecordBatchStore {
.batches_size
.checked_sub(get_record_batch_memory_size(&old_entry.batch))
.unwrap();

self.total_rows = self
.total_rows
.checked_sub(old_entry.batch.num_rows())
.unwrap();
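
// The `checked_sub(..).unwrap()` calls above panic on underflow;
// presumably deliberate, since underflow would indicate a batch
// being unregistered twice or never registered at all.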
}
}

@@ -1060,7 +1060,7 @@ impl RecordBatchStore {
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::{Float64Array, Int32Array};
use arrow::array::{BooleanArray, Float64Array, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow_schema::SortOptions;
use datafusion_common::assert_batches_eq;
@@ -1243,4 +1243,184 @@ mod tests {

Ok(())
}

/// Tests that compaction triggers when a large stored batch has
/// very few rows referenced by the top-k heap.
#[tokio::test]
async fn test_topk_memory_compaction() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

let sort_expr = PhysicalSortExpr {
expr: col("a", schema.as_ref())?,
options: SortOptions::default(),
};

let full_expr = LexOrdering::from([sort_expr.clone()]);
let prefix = vec![sort_expr];

let runtime = Arc::new(RuntimeEnv::default());
let metrics = ExecutionPlanMetricsSet::new();

let k = 5;
let mut topk = TopK::try_new(
0,
Arc::clone(&schema),
prefix,
full_expr,
k,
8192,
runtime,
&metrics,
Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
DynamicFilterPhysicalExpr::new(vec![], lit(true)),
)))),
)?;

// Insert a large batch (100_000 rows) with values 1..=100_000.
// Only the smallest 5 values (1..=5) will end up in the heap.
let large_values: Vec<i32> = (1..=100_000).collect();
let array1: ArrayRef = Arc::new(Int32Array::from(large_values));
let batch1 = RecordBatch::try_new(Arc::clone(&schema), vec![array1])?;
topk.insert_batch(batch1)?;

// After the first batch, the store holds a single batch, so
// compaction should not trigger (guard: store.len() <= 1).
assert_eq!(
topk.heap.store.len(),
1,
"should have 1 batch before second insert"
);

// Insert a second batch whose values displace entries in the heap.
// -1 and 0 are smaller than the current top-5 (1..=5), so they
// produce 2 replacements. With replacements > 0, `insert_batch`
// calls `insert_batch_entry` (briefly making store.len() == 2)
// and then `maybe_compact`, which should collapse it back to 1.
let array2: ArrayRef = Arc::new(Int32Array::from(vec![-1, 0]));
let batch2 = RecordBatch::try_new(Arc::clone(&schema), vec![array2])?;
let replacements_before = topk.metrics.row_replacements.value();
topk.insert_batch(batch2)?;

// Sanity check: batch2 was actually integrated. Without
// replacements, `maybe_compact` is never called and the
// store-length assertion below would pass vacuously.
assert!(
topk.metrics.row_replacements.value() > replacements_before,
"batch2 must produce replacements so compaction is exercised"
);

// The compacted-estimate guard is `total_rows <= num_rows * 2`,
// i.e. 100_002 <= 10, which is false, so compaction fires and
// collapses the two stored batches back into one.
assert_eq!(
topk.heap.store.len(),
1,
"store should be compacted to 1 batch"
);

// Verify the emitted results are correct (top 5 ascending).
let results: Vec<_> = topk.emit()?.try_collect().await?;
assert_batches_eq!(
&[
"+----+", "| a |", "+----+", "| -1 |", "| 0 |", "| 1 |", "| 2 |",
"| 3 |", "+----+",
],
&results
);

Ok(())
}

/// Negative path: when stored rows are close to the heap size,
/// compaction must NOT fire even with multiple batches present,
/// because the savings would be marginal
/// (guard: `total_rows <= num_rows * 2`).
///
/// Uses a bit-packed `BooleanArray` so that future changes to the
/// compaction heuristic that reintroduce a per-byte estimate
/// (where integer truncation could misbehave on sub-byte types)
/// are caught here.
#[tokio::test]
async fn test_topk_memory_compaction_skipped_when_marginal() -> Result<()> {
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));

let sort_expr = PhysicalSortExpr {
expr: col("a", schema.as_ref())?,
options: SortOptions::default(),
};
let full_expr = LexOrdering::from([sort_expr.clone()]);
let prefix = vec![sort_expr];

let runtime = Arc::new(RuntimeEnv::default());
let metrics = ExecutionPlanMetricsSet::new();

let k = 10;
let mut topk = TopK::try_new(
0,
Arc::clone(&schema),
prefix,
full_expr,
k,
8192,
runtime,
&metrics,
Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
DynamicFilterPhysicalExpr::new(vec![], lit(true)),
)))),
)?;

// Two small batches; every row from both batches ends up referenced
// by the heap, so total_rows == num_rows == 10.
let batch1 = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(BooleanArray::from(vec![false, false, true, true, true]))
as ArrayRef,
],
)?;
topk.insert_batch(batch1)?;

let batch2 = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(BooleanArray::from(vec![false, false, false, true, true]))
as ArrayRef,
],
)?;
topk.insert_batch(batch2)?;

// Guard `total_rows <= num_rows * 2` should hold (10 <= 20),
// so compaction is skipped and BOTH batches remain in the store.
assert_eq!(
topk.heap.store.len(),
2,
"store must keep 2 batches when savings would be marginal"
);
assert_eq!(topk.heap.inner.len(), 10, "heap should hold all 10 rows");

// Output is still correct (5 falses then 5 trues ascending).
let results: Vec<_> = topk.emit()?.try_collect().await?;
assert_batches_eq!(
&[
"+-------+",
"| a |",
"+-------+",
"| false |",
"| false |",
"| false |",
"| false |",
"| false |",
"| true |",
"| true |",
"| true |",
"| true |",
"| true |",
"+-------+",
],
&results
);

Ok(())
}
}