When a native operator reads from a broadcast exchange, the data currently passes through Arrow FFI via `ScanExec`. This is the same path that shuffle reads used before `ShuffleScanExec` was introduced in #3731.
Current Path
1. `CometBroadcastExchangeExec` serializes data as coalesced, compressed Arrow IPC on the driver.
2. On each executor, `CometBatchRDD.compute()` deserializes the IPC stream into `ColumnarBatch` objects on the JVM.
3. A `CometBatchIterator` exposes those batches to native code.
4. Native `ScanExec` reads them via Arrow FFI (`CometBatchIterator.next()` exports each batch via the Arrow C Data Interface).
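Concretely, the export in step 4 hands native code a C-ABI struct per batch. As a reference point, this is the `ArrowArray` struct defined by the Arrow C Data Interface specification (the sketch only checks its layout; it performs no real export and is not Comet code):

```rust
// The Arrow C Data Interface array struct that the JVM producer fills on
// export and the native consumer imports. Field order follows the published
// C ABI; this sketch only demonstrates the shape of the handoff.
#[repr(C)]
pub struct ArrowArray {
    pub length: i64,
    pub null_count: i64,
    pub offset: i64,
    pub n_buffers: i64,
    pub n_children: i64,
    pub buffers: *mut *const std::ffi::c_void,
    pub children: *mut *mut ArrowArray,
    pub dictionary: *mut ArrowArray,
    // Called by the consumer to release producer-owned memory.
    pub release: Option<unsafe extern "C" fn(*mut ArrowArray)>,
    pub private_data: *mut std::ffi::c_void,
}

fn main() {
    // Five i64 fields plus five pointer-sized fields = 80 bytes on 64-bit targets.
    assert_eq!(std::mem::size_of::<ArrowArray>(), 80);
    println!("ArrowArray is {} bytes", std::mem::size_of::<ArrowArray>());
}
```

Every batch crossing the JVM/native boundary goes through this struct today, which is the per-batch export overhead the proposal below removes.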
Proposed Path
Introduce a `BroadcastScanExec` native operator analogous to `ShuffleScanExec`:

- Pass raw compressed Arrow IPC bytes from the JVM to native code (skipping JVM-side deserialization).
- Decompress and decode natively using `read_ipc_compressed()`.

This avoids the JVM-side decompression, `ArrowStreamReader` deserialization, and Arrow FFI export overhead.
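The native decode loop could then look roughly like the sketch below. The length-prefixed framing and the `IpcBlockIter` name are assumptions for illustration, and `read_ipc_compressed()` is stubbed as a pass-through here (the real function decompresses a block and decodes it into a record batch):

```rust
// Hypothetical sketch: the JVM hands native code a contiguous buffer of
// length-prefixed, compressed Arrow IPC blocks; the operator walks the
// frames and decodes each one natively, with no JVM-side deserialization.

/// Iterates over 4-byte little-endian length-prefixed blocks in a buffer.
struct IpcBlockIter<'a> {
    buf: &'a [u8],
    pos: usize,
}

impl<'a> Iterator for IpcBlockIter<'a> {
    type Item = &'a [u8];
    fn next(&mut self) -> Option<&'a [u8]> {
        if self.pos + 4 > self.buf.len() {
            return None;
        }
        let len =
            u32::from_le_bytes(self.buf[self.pos..self.pos + 4].try_into().unwrap()) as usize;
        self.pos += 4;
        let block = &self.buf[self.pos..self.pos + len];
        self.pos += len;
        Some(block)
    }
}

// Stand-in for the real read_ipc_compressed(): decompress + decode a batch.
fn read_ipc_compressed_stub(block: &[u8]) -> Vec<u8> {
    block.to_vec()
}

fn main() {
    // Two fake "compressed IPC blocks" behind 4-byte little-endian lengths.
    let mut buf = Vec::new();
    for block in [&b"batch-0"[..], &b"batch-1"[..]] {
        buf.extend_from_slice(&(block.len() as u32).to_le_bytes());
        buf.extend_from_slice(block);
    }
    let batches: Vec<Vec<u8>> = IpcBlockIter { buf: &buf, pos: 0 }
        .map(read_ipc_compressed_stub)
        .collect();
    assert_eq!(batches, vec![b"batch-0".to_vec(), b"batch-1".to_vec()]);
    println!("decoded {} blocks", batches.len());
}
```

The key property is that bytes stay opaque to the JVM: only the native side ever decompresses or interprets Arrow data, mirroring what `ShuffleScanExec` already does for shuffle reads.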
Work Required
- A new JVM iterator class (e.g. `CometBroadcastBlockIterator`) that yields raw compressed IPC bytes, similar to `CometShuffleBlockIterator`.
- A new protobuf op `BroadcastScan` alongside the existing `ShuffleScan`.
- A new Rust operator `BroadcastScanExec` in `native/core/src/execution/operators/`.
- Planner changes in `CometSink.scala` to emit `BroadcastScan` instead of `Scan` for broadcast exchange inputs (behind a config flag such as `spark.comet.exec.broadcast.directRead.enabled`).
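For the protobuf piece, a minimal sketch of what the new message might look like (field names and numbers are illustrative assumptions, not the actual Comet schema; in practice it would likely mirror whatever `ShuffleScan` carries):

```proto
// Hypothetical: sits alongside ShuffleScan in the operator protobuf.
message BroadcastScan {
  // Arrow data types of the output columns, mirroring Scan/ShuffleScan.
  repeated DataType field_types = 1;
}
```

The planner would then emit this message instead of `Scan` when the input is a broadcast exchange and the proposed config flag is enabled.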
Notes
- Broadcast data is already marked `arrow_ffi_safe=true` since it comes from `ArrowStreamReader` with no mutable buffers, so the current FFI path is at least safe.
- Broadcast volumes are typically smaller than shuffle volumes, so the impact may be less dramatic than #3731 (perf: stop using FFI in native shuffle read path), but the overhead of JVM-side decompression and FFI export is still real.