perf: stop using FFI in native shuffle read path#3731

Merged
andygrove merged 23 commits into apache:main from andygrove:shuffle-direct-read
Mar 21, 2026
Conversation


@andygrove andygrove commented Mar 18, 2026

Which issue does this PR close?

Performance improvements for native shuffle read. Shows 13% improvement in TPC-H @ 1TB.

Rationale for this change

Simplifies the shuffle direct read code path, removing unnecessary FFI transfers.

What changes are included in this PR?

- Adds a design document for bypassing Arrow FFI in the shuffle read path when both the shuffle writer and the downstream operator are native.
- Adds a new ShuffleScanExec operator that pulls compressed shuffle blocks from the JVM via CometShuffleBlockIterator and decodes them natively using read_ipc_compressed(). Uses the pre-pull pattern (get_next_batch is called externally before poll_next) to avoid JNI calls on tokio threads.
- Fixes two bugs discovered during testing:
  - ClassCastException: the factory closure incorrectly cast Partition to CometExecPartition before extracting ShuffledRowRDDPartition; the partition passed to the factory is already the unwrapped partition from the input RDD.
  - NoSuchElementException in SQLShuffleReadMetricsReporter: the metrics field in CometShuffledBatchRDD was not exposed as a val, causing Map.empty to be used instead of the real shuffle metrics map.
- Removes the redundant getCurrentBlockLength() JNI call (reuses the hasNext() return value).
- Makes readAsRawStream() lazy instead of materializing all streams into a List.
- Removes a pointless DirectByteBuffer re-allocation in close().
- Removes the dead sparkPlanToInputIdx map.

How are these changes tested?

- Adds a Scala integration test that runs a repartition+aggregate query with direct read enabled and disabled to verify result parity.
- Adds a Rust unit test for the read_ipc_compressed codec round-trip (skipped under Miri since it calls foreign zstd functions that Miri cannot execute).
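The block-pull loop described above can be sketched roughly as follows. This is a hypothetical illustration, not Comet's actual code: it assumes a 16-byte little-endian header of [compressedLength, fieldCount], where compressedLength counts the trailing field-count long (hence the -8 discussed later in this review); the class and method names are invented for the sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

// Hypothetical sketch of a shuffle-block reader: header layout and names
// are assumptions for illustration, not Comet's actual wire format.
public class BlockReader {
    private final ReadableByteChannel channel;
    private final ByteBuffer headerBuf =
        ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);

    public BlockReader(ReadableByteChannel channel) {
        this.channel = channel;
    }

    /** Returns the next block's payload, or null on clean EOF. */
    public byte[] nextBlock() throws IOException {
        headerBuf.clear();
        while (headerBuf.hasRemaining()) {
            if (channel.read(headerBuf) < 0) {
                if (headerBuf.position() == 0) {
                    return null; // clean EOF between blocks
                }
                throw new IOException("truncated block header");
            }
        }
        headerBuf.flip();
        long compressedLength = headerBuf.getLong();
        headerBuf.getLong(); // field count discarded
        // compressedLength is assumed to include the 8-byte field count
        int bytesToRead = (int) (compressedLength - 8);
        ByteBuffer dataBuf = ByteBuffer.allocate(bytesToRead);
        while (dataBuf.hasRemaining()) {
            if (channel.read(dataBuf) < 0) {
                throw new IOException("truncated block payload");
            }
        }
        return dataBuf.array();
    }
}
```

The real implementation uses a direct buffer so the native side can read the bytes via the buffer's base address without a copy; a heap buffer is used here only to keep the sketch self-contained.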
@andygrove andygrove marked this pull request as ready for review March 19, 2026 13:59
@andygrove andygrove changed the title feat: bypass Arrow FFI for native shuffle read path feat: replace Arrow IPC with raw buffer format in shuffle Mar 19, 2026
@andygrove andygrove changed the title feat: replace Arrow IPC with raw buffer format in shuffle feat: shuffle direct read and raw buffer shuffle format Mar 19, 2026
@andygrove andygrove marked this pull request as draft March 19, 2026 16:33
@andygrove andygrove force-pushed the shuffle-direct-read branch from a7e9659 to 19cb04b Compare March 19, 2026 16:36
@andygrove andygrove changed the title feat: shuffle direct read and raw buffer shuffle format feat: stop using FFI in native shuffle read path Mar 19, 2026
@andygrove andygrove marked this pull request as ready for review March 19, 2026 16:38
@andygrove andygrove requested review from wForget March 19, 2026 17:02

wForget commented Mar 20, 2026

Shows 13% improvement in TPC-H @ 1TB.

Nice work! I didn't expect removing FFI to bring such great benefits. Could you share where these benefits mainly come from? Is it due to fewer JNI calls, or was the overhead from ArrowImporter relatively high?


wForget commented Mar 20, 2026

I read the relevant implementation in Gluten, which defines a lightweight ColumnarBatch that only holds a nativeHandle and does not perform Arrow imports.

https://github.com/apache/gluten/blob/main/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVector.java

int bytesRead = channel.read(headerBuf);
if (bytesRead < 0) {
if (headerBuf.position() == 0) {
return -1;
Member
We can call close() earlier here.

// Field count discarded - schema determined by ShuffleScan protobuf fields
headerBuf.getLong();

long bytesToRead = compressedLength - 8;
Member
nit: add a comment explaining why -8

case rdd: CometShuffledBatchRDD =>
val dep = rdd.dependency
val rddMetrics = rdd.metrics
factories(scanIdx) = (context, part) => {
Member
Much of the logic here duplicates CometShuffledBatchRDD#compute. Perhaps we could add a computeAsShuffleBlockIterator method to CometShuffledBatchRDD and reuse createReader logic. Like:

class CometShuffledBatchRDD {
  def computeAsShuffleBlockIterator(context: TaskContext, split: Partition): CometShuffleBlockIterator = {
     ...
  }
}

factories(scanIdx) = rdd.computeAsShuffleBlockIterator

Member
In that case, we no longer need to buildShuffleBlockIteratorFactories; we can compute it in CometExecRDD like this:

class CometExecRDD {
  override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
    val partition = split.asInstanceOf[CometExecPartition]

    val inputs = inputRDDs.zip(partition.inputPartitions).zipWithIndex.map {
      case ((rdd: CometShuffledBatchRDD, part), idx) if shuffleScanIndices.contains(idx) =>
        rdd.computeAsShuffleBlockIterator(context, part)
      case ((rdd, part), _) => rdd.iterator(part, context)
    }
...
}

- Close stream on clean EOF in CometShuffleBlockIterator
- Add comment explaining compressedLength - 8 subtraction
- Extract createReader and computeAsShuffleBlockIterator into
  CometShuffledBatchRDD to eliminate duplicated reader-creation
  logic from buildShuffleBlockIteratorFactories
- Simplify CometExecRDD to take shuffleScanIndices and create
  iterators directly from CometShuffledBatchRDD inputs
- Add test with multiple shuffles in plan (join of two shuffled
  datasets)
@andygrove (Member Author)

Shows 13% improvement in TPC-H @ 1TB.

Nice work! I didn't expect removing FFI to bring such great benefits. Could you share where these benefits mainly come from? Is it due to fewer JNI calls, or was the overhead from ArrowImporter relatively high?

Thanks! Yes, the overhead from export/import is high, including serializing the schema for every batch.

In this case, the export/import was not needed at all, so the overhead was pure waste.

@andygrove (Member Author)

I read the relevant implementation in Gluten, which defines a lightweight ColumnarBatch that only holds a nativeHandle and does not perform Arrow imports.

https://github.com/apache/gluten/blob/main/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVector.java

That's interesting. I think Comet could benefit from this approach as well.

@andygrove andygrove changed the title feat: stop using FFI in native shuffle read path perf: stop using FFI in native shuffle read path Mar 20, 2026
… JVM shuffle

ShuffleScanExec reads compressed IPC blocks directly, but native shuffle
may dictionary-encode string/binary columns. The IPC format preserves
this encoding, causing a schema mismatch since the protobuf schema only
declares value types. Add an unpack step after read_ipc_compressed() to
cast dictionary arrays to their value types.

With this fix, the direct read optimization can safely handle
dictionary-encoded data, so extend it to also support JVM columnar
shuffle (which uses the same wire format as native shuffle).
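The unpack step in this commit can be illustrated conceptually. The sketch below uses plain arrays rather than Arrow's DictionaryArray API: a dictionary-encoded column stores integer indices into a values table, and unpacking materializes the value type that the declared schema expects. All names here are invented for the illustration.

```java
import java.util.Arrays;

// Conceptual illustration of dictionary unpacking (not Arrow's API):
// indices reference entries in a small dictionary; unpacking replaces
// each index with its value so downstream code sees plain values.
public class DictUnpack {
    static String[] unpack(int[] indices, String[] dictionary) {
        String[] out = new String[indices.length];
        for (int i = 0; i < indices.length; i++) {
            out[i] = dictionary[indices[i]];
        }
        return out;
    }

    public static void main(String[] args) {
        // "a", "b", "a", "c" encoded as indices into a 3-entry dictionary
        String[] dict = {"a", "b", "c"};
        int[] indices = {0, 1, 0, 2};
        System.out.println(Arrays.toString(unpack(indices, dict)));
        // [a, b, a, c]
    }
}
```

In the PR itself the equivalent operation is a cast of each dictionary array to its value type after read_ipc_compressed(), so the batch schema matches the protobuf schema, which only declares value types.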
@andygrove (Member Author)

Thanks for the detailed review @wForget. I have pushed commits to address the feedback so far.

builder.clearChildren()
Some(builder.setShuffleScan(scanBuilder).build())
} else {
withInfo(op, "unsupported data types for shuffle direct read")
Contributor
this error message doesn't really match the condition IMO

Member Author
I added the node name to the message

+ bytesToRead
+ " exceeds maximum of "
+ Integer.MAX_VALUE
+ ". Try reducing shuffle batch size.");
Contributor
please include the shuffle batch size config param name in the message

// Note: native side uses get_direct_buffer_address (base pointer) + currentBlockLength,
// not the buffer's position/limit. No flip needed.

currentBlockLength = (int) bytesToRead;
Contributor
maybe we can do the int conversion only once?

Comment on lines +73 to +74
headerBuf.clear();
while (headerBuf.hasRemaining()) {
Contributor

how does this work? 🤔 If headerBuf.clear() was executed, shouldn't headerBuf.hasRemaining() be false?

Member Author
It loops while the buffer has remaining space (up to the limit) to be filled. I added a clarifying comment.
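For readers with the same question: clear() resets position to 0 and limit to capacity without erasing data, so after clear() hasRemaining() reports room left to fill, not data left to read. A minimal demonstration:

```java
import java.nio.ByteBuffer;

public class ClearDemo {
    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(42L); // position = 8
        buf.clear();      // position = 0, limit = capacity = 16
        // hasRemaining() now means "space left up to the limit", so a
        // while (buf.hasRemaining()) { channel.read(buf); } fill-loop
        // runs until the full 16-byte header has arrived.
        System.out.println(buf.hasRemaining()); // true
        System.out.println(buf.remaining());    // 16
    }
}
```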

- Clarify ByteBuffer.clear()/hasRemaining() pattern with comment
- Include config param name in block size error message
- Perform int cast of bytesToRead once and reuse
- Include operator name in unsupported data type message
@andygrove andygrove force-pushed the shuffle-direct-read branch from 85b0d4e to f377995 Compare March 20, 2026 22:33
private final InputStream inputStream;
private final ByteBuffer headerBuf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
private ByteBuffer dataBuf = ByteBuffer.allocateDirect(INITIAL_BUFFER_SIZE);
private boolean closed = false;
Contributor
is this class thread-safe? And should closed be volatile or atomic?

Member Author
this will only be called on a single thread

))));
}

let mut env = JVMClasses::get_env()?;
Contributor
maybe we can pass env by reference? Or is it expensive to create the env for each block?

Member Author
get_env calls attach_current_thread, which is a no-op for already-attached threads, so the overhead is minimal AFAIK

/// The ShuffleScanExec producing input batches.
shuffle_scan: ShuffleScanExec,
/// Schema of the output.
schema: SchemaRef,
Contributor
can we take schema from shuffle_scan?

Member Author
Good point. Fixed.

Use shuffle_scan.schema() instead of storing a separate copy.
@andygrove (Member Author)
Thanks for the review @comphead. I have addressed feedback so far.

@comphead comphead left a comment


Thanks @andygrove, it is LGTM. I was also able to see some benefits in a prod-like environment.

@andygrove andygrove merged commit 8bab4a5 into apache:main Mar 21, 2026
60 checks passed
@andygrove andygrove deleted the shuffle-direct-read branch March 21, 2026 18:02
@andygrove (Member Author)

Thanks @comphead @parthchandra @wForget! Next up is #3754
