diff --git a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs index b30092159f43..0c41fc8e9ef2 100644 --- a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs @@ -98,6 +98,10 @@ fn nested_loop_join_exec( )?)) } +fn format_plan(plan: &Arc) -> String { + get_plan_string(plan).join("\n") +} + #[test] fn transforms_streaming_table_exec_into_fetching_version_when_skip_is_zero() -> Result<()> { @@ -105,20 +109,23 @@ fn transforms_streaming_table_exec_into_fetching_version_when_skip_is_zero() -> let streaming_table = stream_exec(&schema); let global_limit = global_limit_exec(streaming_table, 0, Some(5)); - let initial = get_plan_string(&global_limit); - let expected_initial = [ - "GlobalLimitExec: skip=0, fetch=5", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true", - ]; - assert_eq!(initial, expected_initial); + let initial = format_plan(&global_limit); + insta::assert_snapshot!( + initial, + @r" + GlobalLimitExec: skip=0, fetch=5 + StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true + " + ); let after_optimize = LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; - let expected = [ - "StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true, fetch=5", - ]; - assert_eq!(get_plan_string(&after_optimize), expected); + let optimized = format_plan(&after_optimize); + insta::assert_snapshot!( + optimized, + @"StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true, fetch=5" + ); Ok(()) } @@ -130,21 +137,26 @@ fn transforms_streaming_table_exec_into_fetching_version_and_keeps_the_global_li let streaming_table = stream_exec(&schema); let global_limit = global_limit_exec(streaming_table, 2, Some(5)); - let initial = get_plan_string(&global_limit); - let expected_initial = [ - "GlobalLimitExec: skip=2, fetch=5", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true", - ]; - assert_eq!(initial, expected_initial); + let initial = format_plan(&global_limit); + insta::assert_snapshot!( + initial, + @r" + GlobalLimitExec: skip=2, fetch=5 + StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true + " + ); let after_optimize = LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; - let expected = [ - "GlobalLimitExec: skip=2, fetch=5", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true, fetch=7", - ]; - assert_eq!(get_plan_string(&after_optimize), expected); + let optimized = format_plan(&after_optimize); + insta::assert_snapshot!( + optimized, + @r" + GlobalLimitExec: skip=2, fetch=5 + StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true, fetch=7 + " + ); Ok(()) } @@ -157,24 +169,29 @@ fn pushes_global_limit_exec_through_projection_exec() -> Result<()> { let projection = projection_exec(schema, filter)?; let global_limit = global_limit_exec(projection, 0, Some(5)); - let initial = get_plan_string(&global_limit); - let expected_initial = [ - "GlobalLimitExec: skip=0, fetch=5", - " ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", - " FilterExec: c3@2 > 0", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true", - ]; - assert_eq!(initial, expected_initial); + let initial = format_plan(&global_limit); + insta::assert_snapshot!( + initial, + @r" + GlobalLimitExec: skip=0, fetch=5 + ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3] + FilterExec: c3@2 > 0 + StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true + " + ); let after_optimize = LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; - let expected = [ - "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", - " FilterExec: c3@2 > 0, fetch=5", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true", - ]; - assert_eq!(get_plan_string(&after_optimize), expected); + let optimized = format_plan(&after_optimize); + insta::assert_snapshot!( + optimized, + @r" + ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3] + FilterExec: c3@2 > 0, fetch=5 + StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true + " + ); Ok(()) } @@ -194,29 +211,33 @@ fn pushes_global_limit_into_multiple_fetch_plans() -> Result<()> { let spm = sort_preserving_merge_exec(ordering, sort); let global_limit = global_limit_exec(spm, 0, Some(5)); - let initial = get_plan_string(&global_limit); - let expected_initial = [ - "GlobalLimitExec: skip=0, fetch=5", - " SortPreservingMergeExec: [c1@0 ASC]", - " SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true", - ]; - - assert_eq!(initial, expected_initial); + let initial = format_plan(&global_limit); + insta::assert_snapshot!( + initial, + @r" + GlobalLimitExec: skip=0, fetch=5 + SortPreservingMergeExec: [c1@0 ASC] + SortExec: expr=[c1@0 ASC], preserve_partitioning=[false] + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3] + StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true + " + ); let after_optimize = LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; - let expected = [ - "SortPreservingMergeExec: [c1@0 ASC], fetch=5", - " SortExec: TopK(fetch=5), expr=[c1@0 ASC], preserve_partitioning=[false]", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true", - ]; - assert_eq!(get_plan_string(&after_optimize), expected); + let optimized = format_plan(&after_optimize); + insta::assert_snapshot!( + optimized, + @r" + SortPreservingMergeExec: [c1@0 ASC], fetch=5 + SortExec: TopK(fetch=5), expr=[c1@0 ASC], preserve_partitioning=[false] + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3] + StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true + " + ); Ok(()) } @@ -231,26 +252,31 @@ fn keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions() -> R let coalesce_partitions = coalesce_partitions_exec(filter); let global_limit = global_limit_exec(coalesce_partitions, 0, Some(5)); - let initial = get_plan_string(&global_limit); - let expected_initial = [ - "GlobalLimitExec: skip=0, fetch=5", - " CoalescePartitionsExec", - " FilterExec: c3@2 > 0", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true", - ]; - assert_eq!(initial, expected_initial); + let initial = format_plan(&global_limit); + insta::assert_snapshot!( + initial, + @r" + GlobalLimitExec: skip=0, fetch=5 + CoalescePartitionsExec + FilterExec: c3@2 > 0 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true + " + ); let after_optimize = LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; - let expected = [ - "CoalescePartitionsExec: fetch=5", - " FilterExec: c3@2 > 0, fetch=5", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true", - ]; - assert_eq!(get_plan_string(&after_optimize), expected); + let optimized = format_plan(&after_optimize); + insta::assert_snapshot!( + optimized, + @r" + CoalescePartitionsExec: fetch=5 + FilterExec: c3@2 > 0, fetch=5 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true + " + ); Ok(()) } @@ -262,20 +288,27 @@ fn merges_local_limit_with_local_limit() -> Result<()> { let child_local_limit = local_limit_exec(empty_exec, 10); let parent_local_limit = local_limit_exec(child_local_limit, 20); - let initial = get_plan_string(&parent_local_limit); - let expected_initial = [ - "LocalLimitExec: fetch=20", - " LocalLimitExec: fetch=10", - " EmptyExec", - ]; - - assert_eq!(initial, expected_initial); + let initial = format_plan(&parent_local_limit); + insta::assert_snapshot!( + initial, + @r" + LocalLimitExec: fetch=20 + LocalLimitExec: fetch=10 + EmptyExec + " + ); let after_optimize = LimitPushdown::new().optimize(parent_local_limit, &ConfigOptions::new())?; - let expected = ["GlobalLimitExec: skip=0, fetch=10", " EmptyExec"]; - assert_eq!(get_plan_string(&after_optimize), expected); + let optimized = format_plan(&after_optimize); + insta::assert_snapshot!( + optimized, + @r" + GlobalLimitExec: skip=0, fetch=10 + EmptyExec + " + ); Ok(()) } @@ -287,20 +320,27 @@ fn merges_global_limit_with_global_limit() -> Result<()> { let child_global_limit = global_limit_exec(empty_exec, 10, Some(30)); let parent_global_limit = global_limit_exec(child_global_limit, 10, Some(20)); - let initial = get_plan_string(&parent_global_limit); - let expected_initial = [ - "GlobalLimitExec: skip=10, fetch=20", - " GlobalLimitExec: skip=10, fetch=30", - " EmptyExec", - ]; - - assert_eq!(initial, expected_initial); + let initial = format_plan(&parent_global_limit); + insta::assert_snapshot!( + initial, + @r" + GlobalLimitExec: skip=10, fetch=20 + GlobalLimitExec: skip=10, fetch=30 + EmptyExec + " + ); let after_optimize = LimitPushdown::new().optimize(parent_global_limit, &ConfigOptions::new())?; - let expected = ["GlobalLimitExec: skip=20, fetch=20", " EmptyExec"]; - assert_eq!(get_plan_string(&after_optimize), expected); + let optimized = format_plan(&after_optimize); + insta::assert_snapshot!( + optimized, + @r" + GlobalLimitExec: skip=20, fetch=20 + EmptyExec + " + ); Ok(()) } @@ -312,20 +352,27 @@ fn merges_global_limit_with_local_limit() -> Result<()> { let local_limit = local_limit_exec(empty_exec, 40); let global_limit = global_limit_exec(local_limit, 20, Some(30)); - let initial = get_plan_string(&global_limit); - let expected_initial = [ - "GlobalLimitExec: skip=20, fetch=30", - " LocalLimitExec: fetch=40", - " EmptyExec", - ]; - - assert_eq!(initial, expected_initial); + let initial = format_plan(&global_limit); + insta::assert_snapshot!( + initial, + @r" + GlobalLimitExec: skip=20, fetch=30 + LocalLimitExec: fetch=40 + EmptyExec + " + ); let after_optimize = LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; - let expected = ["GlobalLimitExec: skip=20, fetch=20", " EmptyExec"]; - assert_eq!(get_plan_string(&after_optimize), expected); + let optimized = format_plan(&after_optimize); + insta::assert_snapshot!( + optimized, + @r" + GlobalLimitExec: skip=20, fetch=20 + EmptyExec + " + ); Ok(()) } @@ -337,20 +384,27 @@ fn merges_local_limit_with_global_limit() -> Result<()> { let global_limit = global_limit_exec(empty_exec, 20, Some(30)); let local_limit = local_limit_exec(global_limit, 20); - let initial = get_plan_string(&local_limit); - let expected_initial = [ - "LocalLimitExec: fetch=20", - " GlobalLimitExec: skip=20, fetch=30", - " EmptyExec", - ]; - - assert_eq!(initial, expected_initial); + let initial = format_plan(&local_limit); + insta::assert_snapshot!( + initial, + @r" + LocalLimitExec: fetch=20 + GlobalLimitExec: skip=20, fetch=30 + EmptyExec + " + ); let after_optimize = LimitPushdown::new().optimize(local_limit, &ConfigOptions::new())?; - let expected = ["GlobalLimitExec: skip=20, fetch=20", " EmptyExec"]; - assert_eq!(get_plan_string(&after_optimize), expected); + let optimized = format_plan(&after_optimize); + insta::assert_snapshot!( + optimized, + @r" + GlobalLimitExec: skip=20, fetch=20 + EmptyExec + " + ); Ok(()) } @@ -385,30 +439,35 @@ fn preserves_nested_global_limit() -> Result<()> { // Add outer limit: GlobalLimitExec: skip=1, fetch=1 let outer_limit = global_limit_exec(outer_join, 1, Some(1)); - let initial = get_plan_string(&outer_limit); - let expected_initial = [ - "GlobalLimitExec: skip=1, fetch=1", - " NestedLoopJoinExec: join_type=Left", - " EmptyExec", - " GlobalLimitExec: skip=2, fetch=1", - " NestedLoopJoinExec: join_type=Right", - " EmptyExec", - " EmptyExec", - ]; - assert_eq!(initial, expected_initial); + let initial = format_plan(&outer_limit); + insta::assert_snapshot!( + initial, + @r" + GlobalLimitExec: skip=1, fetch=1 + NestedLoopJoinExec: join_type=Left + EmptyExec + GlobalLimitExec: skip=2, fetch=1 + NestedLoopJoinExec: join_type=Right + EmptyExec + EmptyExec + " + ); let after_optimize = LimitPushdown::new().optimize(outer_limit, &ConfigOptions::new())?; - let expected = [ - "GlobalLimitExec: skip=1, fetch=1", - " NestedLoopJoinExec: join_type=Left", - " EmptyExec", - " GlobalLimitExec: skip=2, fetch=1", - " NestedLoopJoinExec: join_type=Right", - " EmptyExec", - " EmptyExec", - ]; - assert_eq!(get_plan_string(&after_optimize), expected); + let optimized = format_plan(&after_optimize); + insta::assert_snapshot!( + optimized, + @r" + GlobalLimitExec: skip=1, fetch=1 + NestedLoopJoinExec: join_type=Left + EmptyExec + GlobalLimitExec: skip=2, fetch=1 + NestedLoopJoinExec: join_type=Right + EmptyExec + EmptyExec + " + ); Ok(()) } @@ -436,22 +495,27 @@ fn preserves_skip_before_sort() -> Result<()> { let outer_limit = global_limit_exec(sort, 1, None); - let initial = get_plan_string(&outer_limit); - let expected_initial = [ - "GlobalLimitExec: skip=1, fetch=None", - " SortExec: TopK(fetch=4), expr=[c1@0 ASC], preserve_partitioning=[false]", - " EmptyExec", - ]; - assert_eq!(initial, expected_initial); + let initial = format_plan(&outer_limit); + insta::assert_snapshot!( + initial, + @r" + GlobalLimitExec: skip=1, fetch=None + SortExec: TopK(fetch=4), expr=[c1@0 ASC], preserve_partitioning=[false] + EmptyExec + " + ); let after_optimize = LimitPushdown::new().optimize(outer_limit, &ConfigOptions::new())?; - let expected = [ - "GlobalLimitExec: skip=1, fetch=3", - " SortExec: TopK(fetch=4), expr=[c1@0 ASC], preserve_partitioning=[false]", - " EmptyExec", - ]; - assert_eq!(get_plan_string(&after_optimize), expected); + let optimized = format_plan(&after_optimize); + insta::assert_snapshot!( + optimized, + @r" + GlobalLimitExec: skip=1, fetch=3 + SortExec: TopK(fetch=4), expr=[c1@0 ASC], preserve_partitioning=[false] + EmptyExec + " + ); Ok(()) }