Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 87 additions & 3 deletions benchmarks/src/hj.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ use std::path::PathBuf;

use futures::StreamExt;

// TODO: Add existence joins

/// Run the Hash Join benchmark
///
/// This micro-benchmark focuses on the performance characteristics of Hash Joins.
Expand Down Expand Up @@ -303,6 +301,90 @@ const HASH_QUERIES: &[HashJoinQuery] = &[
build_size: "100K_(20%_dups)",
probe_size: "60M",
},
// RightSemi Join benchmarks with Int32 keys
// Fanout: N/A for semi joins (each probe row is returned at most once)
//
// Q16: RightSemi, Small build (25 rows), 100% Hit rate
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also document the fanout here. Fanout means: if we changed the join type to an inner join, how many matches would be found on average for each probe row.

This can be calculated automatically by running EXPLAIN ANALYZE on the query after changing the join type to an inner join; the value will show up in the HashJoinExec metrics.

Later, we should also ensure that these queries cover different fanouts.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, my previous explanation may have been confusing. Let me try again.

Suppose we have the query:

SELECT *
FROM generate_series(100) AS t1(v1)
RIGHT SEMI JOIN generate_series(10) AS t2(v1)
ON (t1.v1 % 10) = t2.v1

Here, each probe row from t2 matches 10 rows from t1 on average, so the ratio of matching rows to probe rows is 10:1.

Although a semi join only returns whether a match exists, this ratio still matters for execution behavior, because we are evaluating short-circuit optimizations here.

So I suggest we document this metric here. See my original reply for how to obtain this matching-ratio metric automatically.

// Build Side: nation (25 rows) | Probe Side: customer (1.5M rows)
HashJoinQuery {
sql: r###"SELECT c.k
FROM (SELECT CAST(n_nationkey AS INT) as k FROM nation) n
RIGHT SEMI JOIN (SELECT CAST(c_nationkey AS INT) as k FROM customer) c
ON n.k = c.k"###,
density: 1.0,
prob_hit: 1.0,
build_size: "25",
probe_size: "1.5M_RightSemi",
},
// Q17: RightSemi, Medium build (100K rows), 100% Hit rate
// Build Side: supplier (100K rows) | Probe Side: lineitem (60M rows)
HashJoinQuery {
sql: r###"SELECT l.k
FROM (SELECT CAST(s_suppkey AS INT) as k FROM supplier) s
RIGHT SEMI JOIN (SELECT CAST(l_suppkey AS INT) as k FROM lineitem) l
ON s.k = l.k"###,
density: 1.0,
prob_hit: 1.0,
build_size: "100K",
probe_size: "60M_RightSemi",
},
// Q18: RightSemi, Medium build (100K rows), 10% Hit rate
// Build Side: supplier (100K rows) | Probe Side: lineitem (60M rows)
HashJoinQuery {
sql: r###"SELECT l.k
FROM (SELECT CAST(s_suppkey AS INT) as k FROM supplier) s
RIGHT SEMI JOIN (
SELECT CAST(CASE WHEN l_suppkey % 10 = 0 THEN l_suppkey ELSE l_suppkey + 1000000 END AS INT) as k
FROM lineitem
) l
ON s.k = l.k"###,
density: 1.0,
prob_hit: 0.1,
build_size: "100K",
probe_size: "60M_RightSemi",
},
// RightAnti Join benchmarks with Int32 keys
// Fanout: N/A for anti joins (each probe row is returned at most once)
//
// Q19: RightAnti, Small build (25 rows), 100% Hit rate (no output)
// Build Side: nation (25 rows) | Probe Side: customer (1.5M rows)
HashJoinQuery {
sql: r###"SELECT c.k
FROM (SELECT CAST(n_nationkey AS INT) as k FROM nation) n
RIGHT ANTI JOIN (SELECT CAST(c_nationkey AS INT) as k FROM customer) c
ON n.k = c.k"###,
density: 1.0,
prob_hit: 1.0,
build_size: "25",
probe_size: "1.5M_RightAnti",
},
// Q20: RightAnti, Medium build (100K rows), 100% Hit rate (no output)
// Build Side: supplier (100K rows) | Probe Side: lineitem (60M rows)
HashJoinQuery {
sql: r###"SELECT l.k
FROM (SELECT CAST(s_suppkey AS INT) as k FROM supplier) s
RIGHT ANTI JOIN (SELECT CAST(l_suppkey AS INT) as k FROM lineitem) l
ON s.k = l.k"###,
density: 1.0,
prob_hit: 1.0,
build_size: "100K",
probe_size: "60M_RightAnti",
},
// Q21: RightAnti, Medium build (100K rows), 10% Hit rate (90% output)
// Build Side: supplier (100K rows) | Probe Side: lineitem (60M rows)
HashJoinQuery {
sql: r###"SELECT l.k
FROM (SELECT CAST(s_suppkey AS INT) as k FROM supplier) s
RIGHT ANTI JOIN (
SELECT CAST(CASE WHEN l_suppkey % 10 = 0 THEN l_suppkey ELSE l_suppkey + 1000000 END AS INT) as k
FROM lineitem
) l
ON s.k = l.k"###,
density: 1.0,
prob_hit: 0.1,
build_size: "100K",
probe_size: "60M_RightAnti",
},
];

impl RunOpt {
Expand All @@ -323,7 +405,9 @@ impl RunOpt {
None => 1..=HASH_QUERIES.len(),
};

let config = self.common.config()?;
let mut config = self.common.config()?;
// Disable join reordering to ensure the optimizer doesn't swap join sides
config.options_mut().optimizer.join_reordering = false;
let rt = self.common.build_runtime()?;
let ctx = SessionContext::new_with_config_rt(config, rt);

Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,8 @@ required-features = ["test_utils"]
harness = false
name = "aggregate_vectorized"
required-features = ["test_utils"]

[[bench]]
harness = false
name = "hash_join_semi_anti"
required-features = ["test_utils"]
Loading
Loading