Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ system-metrics = ["sysinfo"]
tpch = ["integration"]
tpcds = ["integration"]
clickbench = ["integration"]
slow-tests = []
sysinfo = ["dep:sysinfo"]

[dev-dependencies]
Expand Down
43 changes: 35 additions & 8 deletions src/test_utils/property_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,24 @@ use datafusion::{
};
use std::sync::Arc;

/// Per-test configuration knobs for the property-based comparison helpers.
///
/// NOTE(review): `uses_undeterministic_limit_operator` misspells "non-deterministic"
/// and is inconsistent with `non_deterministic_sort`; renaming would break existing
/// callers, so the name is kept as-is here.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct PerTestConfig {
    /// If true, skip the set-equality comparison in [`compare_result_set`]. Use for queries
    /// whose final operator is a non-deterministic LIMIT (no ORDER BY at all), where two
    /// engines may correctly return different valid row slices of the same size.
    pub uses_undeterministic_limit_operator: bool,
    /// If true, skip the set-equality comparison in [`compare_result_set`]. Use for queries
    /// whose ORDER BY does not produce a total ordering (e.g. `ORDER BY c DESC` with ties
    /// at the LIMIT boundary), where two engines may correctly pick different tied rows.
    pub non_deterministic_sort: bool,
}

impl PerTestConfig {
    /// Returns true when SQL semantics allow two engines to correctly return
    /// different result sets, i.e. when the row-by-row set comparison should be
    /// skipped. Centralizes the `flag_a || flag_b` predicate so call sites stay
    /// in sync as new skip reasons are added.
    pub fn skips_set_comparison(&self) -> bool {
        self.uses_undeterministic_limit_operator || self.non_deterministic_sort
    }
}

/// compares the set of record batches for equality
pub fn compare_result_set(
actual_result: &Result<Vec<RecordBatch>>,
expected_result: &Result<Vec<RecordBatch>>,
config: &PerTestConfig,
) -> Result<()> {
let test_batches = match actual_result.as_ref() {
Ok(batches) => batches,
Expand All @@ -36,7 +50,7 @@ pub fn compare_result_set(
}
};

records_equal_as_sets(test_batches, compare_batches)
records_equal_as_sets(test_batches, compare_batches, config)
.map_err(|e| internal_datafusion_err!("result sets were not equal: {}", e))
}

Expand All @@ -55,7 +69,6 @@ pub fn compare_ordering(

let actual_ordering = actual_physical_plan.properties().output_ordering();
let expected_ordering = expected_physical_plan.properties().output_ordering();

if actual_ordering != expected_ordering {
return internal_err!(
"ordering mismatch: expected ordering: {:?}, actual ordering: {:?}",
Expand Down Expand Up @@ -93,6 +106,7 @@ pub fn compare_ordering(
fn records_equal_as_sets(
left: &[RecordBatch],
right: &[RecordBatch],
config: &PerTestConfig,
) -> Result<(), DataFusionError> {
// First check if total row counts match
let left_rows: usize = left.iter().map(|b| b.num_rows()).sum();
Expand All @@ -114,6 +128,13 @@ fn records_equal_as_sets(
right[0].schema()
);
}
// Skip the row-by-row set comparison for queries where SQL semantics allow two engines
// to correctly return different result sets (e.g. LIMIT without ORDER BY, or ORDER BY
// with unbroken ties at the LIMIT boundary). Running the full comparison on these
// queries would surface false positives even though both engines are right.
if config.uses_undeterministic_limit_operator || config.non_deterministic_sort {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this check a little lower so that it runs after the schema check and the row-count check.

return Ok(());
}

detailed_batch_comparison(left, right)
}
Expand Down Expand Up @@ -197,11 +218,17 @@ fn batch_rows_to_strings(batches: &[RecordBatch]) -> Vec<String> {
if array.is_null(row_idx) {
row_values.push("NULL".to_string());
} else if let Some(arr) = array.as_any().downcast_ref::<Float16Array>() {
row_values.push(format!("{:.1$}", arr.value(row_idx), 2));
// 14 significant digits — leaves ~1-2 digits of headroom below f64 precision
// to tolerate summation-order drift between single-node and distributed
// aggregation. [15 digits still diverged at the last digit]
// Example values that 15 digits could not reconcile:
// - Single-node: 2.533767602294735e18 → 2,533,767,602,294,735,000
// - Distributed: 2.5337676022947354e18 → 2,533,767,602,294,735,400
row_values.push(format!("{:.13e}", arr.value(row_idx).to_f64()));
} else if let Some(arr) = array.as_any().downcast_ref::<Float32Array>() {
row_values.push(format!("{:.1$}", arr.value(row_idx), 2));
row_values.push(format!("{:.13e}", arr.value(row_idx) as f64));
} else if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
row_values.push(format!("{:.1$}", arr.value(row_idx), 2));
row_values.push(format!("{:.13e}", arr.value(row_idx)));
} else {
// Use Arrow's deterministic string representation
let value_str = array_value_to_string(array, row_idx)
Expand Down Expand Up @@ -290,7 +317,7 @@ mod tests {
async fn test_records_equal_as_sets_empty() {
let left: Vec<RecordBatch> = vec![];
let right: Vec<RecordBatch> = vec![];
assert!(records_equal_as_sets(&left, &right).is_ok());
assert!(records_equal_as_sets(&left, &right, &PerTestConfig::default()).is_ok());
}

#[tokio::test]
Expand Down Expand Up @@ -323,7 +350,7 @@ mod tests {
let right = vec![batch2];

// Should be equal as sets (order independent)
assert!(records_equal_as_sets(&left, &right).is_ok());
assert!(records_equal_as_sets(&left, &right, &PerTestConfig::default()).is_ok());
}

#[tokio::test]
Expand All @@ -345,7 +372,7 @@ mod tests {
let left = vec![batch1];
let right = vec![batch2];

assert!(records_equal_as_sets(&left, &right).is_err());
assert!(records_equal_as_sets(&left, &right, &PerTestConfig::default()).is_err());
}

#[tokio::test]
Expand Down
Loading
Loading