perf: stop using FFI in native shuffle read path #3731
andygrove merged 23 commits into apache:main
Conversation
Adds a design document for bypassing Arrow FFI in the shuffle read path when both the shuffle writer and downstream operator are native.
Add a new ShuffleScanExec operator that pulls compressed shuffle blocks from JVM via CometShuffleBlockIterator and decodes them natively using read_ipc_compressed(). Uses the pre-pull pattern (get_next_batch called externally before poll_next) to avoid JNI calls on tokio threads.
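For illustration, a minimal sketch of the pre-pull pattern in Rust. The names here (PrePulledStream, push_pre_pulled) are illustrative, not the actual Comet types: the JVM-facing entry point (get_next_batch in the PR) decodes a block while a JNIEnv is in hand and parks the result, and poll_next merely drains what was parked, so tokio worker threads never make JNI calls.

use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

use arrow::record_batch::RecordBatch;
use futures::Stream;

// Illustrative sketch only: not the actual Comet implementation.
struct PrePulledStream {
    ready: VecDeque<RecordBatch>, // batches decoded ahead of time via JNI
    exhausted: bool,              // set once the JVM iterator is drained
}

impl PrePulledStream {
    // Called externally, before the tokio executor polls the stream.
    fn push_pre_pulled(&mut self, batch: Option<RecordBatch>) {
        match batch {
            Some(b) => self.ready.push_back(b),
            None => self.exhausted = true,
        }
    }
}

impl Stream for PrePulledStream {
    type Item = RecordBatch;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // No JNI on this path: just hand over whatever was pre-pulled.
        match self.ready.pop_front() {
            Some(batch) => Poll::Ready(Some(batch)),
            None if self.exhausted => Poll::Ready(None),
            // A real implementation would register the waker here.
            None => Poll::Pending,
        }
    }
}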
Fix two bugs discovered during testing:
- ClassCastException: the factory closure incorrectly cast Partition to CometExecPartition before extracting ShuffledRowRDDPartition; the partition passed to the factory is already the unwrapped partition from the input RDD
- NoSuchElementException in SQLShuffleReadMetricsReporter: the metrics field in CometShuffledBatchRDD was not exposed as a val, causing Map.empty to be used instead of the real shuffle metrics map

Add a Scala integration test that runs a repartition+aggregate query with direct read enabled and disabled to verify result parity. Add a Rust unit test for the read_ipc_compressed codec round-trip.
- Remove redundant getCurrentBlockLength() JNI call (reuse hasNext() return value)
- Make readAsRawStream() lazy instead of materializing all streams to a List
- Remove pointless DirectByteBuffer re-allocation in close()
- Remove dead sparkPlanToInputIdx map
Skip test_read_compressed_ipc_block under Miri since it calls foreign zstd functions that Miri cannot execute.
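For reference, a common way to express such a skip in Rust (the exact mechanism used in the PR may differ) is a cfg_attr gate, since Miri cannot execute foreign functions such as zstd's C entry points:

#[cfg_attr(miri, ignore)] // skipped under `cargo miri test`, runs under `cargo test`
#[test]
fn test_read_compressed_ipc_block() {
    // exercises the zstd-backed codec round-trip, which Miri cannot execute
}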
a7e9659 to 19cb04b
Nice work! I didn't expect removing FFI to bring such great benefits. Could you share where these benefits mainly come from? Is it due to fewer JNI calls, or was the overhead from ArrowImporter relatively high?

I read the relevant implementation in Gluten, which defines a lightweight ColumnarBatch that only holds a nativeHandle and does not perform Arrow imports.
int bytesRead = channel.read(headerBuf);
if (bytesRead < 0) {
  if (headerBuf.position() == 0) {
    return -1;
We can call close() earlier here.
// Field count discarded - schema determined by ShuffleScan protobuf fields
headerBuf.getLong();

long bytesToRead = compressedLength - 8;
nit: add a comment explaining why -8
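Based only on the header handling shown above, the framing appears to be a 16-byte little-endian header holding two i64 values; a hedged Rust sketch of parsing it (the interpretation of the -8 is inferred, not confirmed by the PR):

fn parse_block_header(header: &[u8; 16]) -> (i64, i64) {
    // first long: field count (discarded by the reader above)
    let field_count = i64::from_le_bytes(header[0..8].try_into().unwrap());
    // second long: compressed length; the reader subtracts 8 from it, which
    // suggests the recorded length includes 8 bytes already consumed
    let compressed_length = i64::from_le_bytes(header[8..16].try_into().unwrap());
    (field_count, compressed_length)
}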
case rdd: CometShuffledBatchRDD =>
  val dep = rdd.dependency
  val rddMetrics = rdd.metrics
  factories(scanIdx) = (context, part) => {
Much of the logic here duplicates CometShuffledBatchRDD#compute. Perhaps we could add a computeAsShuffleBlockIterator method to CometShuffledBatchRDD and reuse createReader logic. Like:
class CometShuffledBatchRDD {
  def computeAsShuffleBlockIterator(context: TaskContext, split: Partition): CometShuffleBlockIterator = {
    ...
  }
}
factories(scanIdx) = rdd.computeAsShuffleBlockIterator
In that case, we no longer need buildShuffleBlockIteratorFactories; we can compute it in CometExecRDD like this:
class CometExecRDD {
  override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
    val partition = split.asInstanceOf[CometExecPartition]
    val inputs = inputRDDs.zip(partition.inputPartitions).zipWithIndex.map {
      case ((rdd: CometShuffledBatchRDD, part), idx) if shuffleScanIndices.contains(idx) =>
        rdd.computeAsShuffleBlockIterator(context, part)
      case ((rdd, part), _) => rdd.iterator(part, context)
    }
    ...
  }
}
- Close stream on clean EOF in CometShuffleBlockIterator
- Add comment explaining compressedLength - 8 subtraction
- Extract createReader and computeAsShuffleBlockIterator into CometShuffledBatchRDD to eliminate duplicated reader-creation logic from buildShuffleBlockIteratorFactories
- Simplify CometExecRDD to take shuffleScanIndices and create iterators directly from CometShuffledBatchRDD inputs
- Add test with multiple shuffles in plan (join of two shuffled datasets)
Thanks! Yes, the overhead from export/import is high, including serializing the schema for every batch. In this case, export/import was not needed at all, so that overhead was pure waste.
That's interesting. I think Comet could benefit from this approach as well.
… JVM shuffle

ShuffleScanExec reads compressed IPC blocks directly, but native shuffle may dictionary-encode string/binary columns. The IPC format preserves this encoding, causing a schema mismatch since the protobuf schema only declares value types. Add an unpack step after read_ipc_compressed() to cast dictionary arrays to their value types. With this fix, the direct read optimization can safely handle dictionary-encoded data, so extend it to also support JVM columnar shuffle (which uses the same wire format as native shuffle).
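For illustration, a minimal sketch of such an unpack step, assuming the arrow crate's cast kernel; the names and structure are illustrative, not the PR's exact code:

use std::sync::Arc;

use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

// Sketch only: cast any dictionary-encoded column back to its value type
// so the batch matches the schema declared in the ShuffleScan protobuf.
fn unpack_dictionaries(batch: &RecordBatch) -> Result<RecordBatch> {
    let mut fields = Vec::with_capacity(batch.num_columns());
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
    for (field, column) in batch.schema().fields().iter().zip(batch.columns()) {
        match field.data_type() {
            DataType::Dictionary(_, value_type) => {
                // cast materializes dictionary indices into a plain array
                columns.push(cast(column, value_type)?);
                fields.push(Field::new(
                    field.name(),
                    value_type.as_ref().clone(),
                    field.is_nullable(),
                ));
            }
            _ => {
                columns.push(Arc::clone(column));
                fields.push(field.as_ref().clone());
            }
        }
    }
    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
}

Since cast on a dictionary array materializes the values, downstream operators see plain string/binary columns that match the declared schema.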
Thanks for the detailed review @wForget. I have pushed commits to address the feedback so far.
  builder.clearChildren()
  Some(builder.setShuffleScan(scanBuilder).build())
} else {
  withInfo(op, "unsupported data types for shuffle direct read")
this error message doesn't really match the condition, IMO
I added the node name to the message
    + bytesToRead
    + " exceeds maximum of "
    + Integer.MAX_VALUE
    + ". Try reducing shuffle batch size.");
please include the shuffle batch size config param name
// Note: native side uses get_direct_buffer_address (base pointer) + currentBlockLength,
// not the buffer's position/limit. No flip needed.

currentBlockLength = (int) bytesToRead;
maybe we can do the int conversion only once?
headerBuf.clear();
while (headerBuf.hasRemaining()) {
headerBuf.clear();
while (headerBuf.hasRemaining()) {
how does this work? 🤔 If headerBuf.clear() has just executed, shouldn't headerBuf.hasRemaining() be false?
It loops while the buffer has remaining space (up to the limit) to be filled: clear() resets position to 0 and limit to capacity, so hasRemaining() is true until read() has completely filled the buffer. I added a clarifying comment.
- Clarify ByteBuffer.clear()/hasRemaining() pattern with comment
- Include config param name in block size error message
- Perform int cast of bytesToRead once and reuse
- Include operator name in unsupported data type message
85b0d4e to f377995
private final InputStream inputStream;
private final ByteBuffer headerBuf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
private ByteBuffer dataBuf = ByteBuffer.allocateDirect(INITIAL_BUFFER_SIZE);
private boolean closed = false;
is this class thread-safe? Should closed be volatile or atomic?
this will only be called on a single thread
))));
}

let mut env = JVMClasses::get_env()?;
maybe we can pass env by reference? or is it expensive to create an env for each block?
get_env calls attach_current_thread, which is a no-op for already-attached threads, so the overhead is minimal AFAIK
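For context, a sketch of that behavior with the jni crate; JVMClasses::get_env presumably wraps something like this, but the exact implementation is an assumption:

use jni::JavaVM;

// Sketch only: attach_current_thread on an already-attached thread hands
// back the existing JNIEnv instead of re-attaching, so per-block calls
// stay cheap.
fn with_env(vm: &JavaVM) -> jni::errors::Result<()> {
    let mut env = vm.attach_current_thread()?;
    // ... JNI calls against `env` would go here ...
    let _ = &mut env;
    Ok(())
}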
/// The ShuffleScanExec producing input batches.
shuffle_scan: ShuffleScanExec,
/// Schema of the output.
schema: SchemaRef,
can we take schema from shuffle_scan?
Use shuffle_scan.schema() instead of storing a separate copy.
Thanks for the review @comphead. I have addressed the feedback so far.
comphead left a comment
Thanks @andygrove, it is LGTM. I was also able to see some benefits in a prod-like environment.
Thanks @comphead @parthchandra @wForget! Next up is #3754
Which issue does this PR close?
Performance improvements for native shuffle read. Shows 13% improvement in TPC-H @ 1TB.
Rationale for this change
Simplifies the shuffle direct read code path, removing unnecessary FFI transfers.
What changes are included in this PR?
How are these changes tested?