
perf: bypass Arrow FFI for broadcast exchange reads #3762

@andygrove

Description

What is the problem the feature request solves?

Summary

When a native operator reads from a broadcast exchange, the data currently passes through Arrow FFI via ScanExec. This is the same path that shuffle reads used before ShuffleScanExec was introduced in #3731.

Current Path

  1. On the driver, CometBroadcastExchangeExec serializes the data as coalesced, compressed Arrow IPC
  2. On each executor, CometBatchRDD.compute() decompresses and deserializes the IPC stream into ColumnarBatch objects on the JVM
  3. A CometBatchIterator exposes those batches to native code
  4. Native ScanExec reads them via Arrow FFI (CometBatchIterator.next() exports each batch through the Arrow C Data Interface)

Proposed Path

Introduce a BroadcastScanExec native operator analogous to ShuffleScanExec:

  1. Pass the raw compressed Arrow IPC bytes from the JVM to native code, skipping JVM-side deserialization
  2. Decompress and decode them natively using read_ipc_compressed()

This avoids the overhead of JVM-side decompression, ArrowStreamReader deserialization, and Arrow FFI export.
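A minimal sketch of the proposed native read loop, written in Rust to match the native side. Everything in it is hypothetical. BlockSource stands in for the JNI call into the proposed CometBroadcastBlockIterator. BlockDecoder stands in for read_ipc_compressed(), whose real signature is not given in this issue. The toy decoder returns a byte count instead of an Arrow RecordBatch, which keeps the example standard-library only. The example shows the shape of the loop: each compressed block crosses into native code as raw bytes and is decoded on demand, one block per next() call.

```rust
/// Hypothetical stand-in for the proposed JVM iterator (e.g. CometBroadcastBlockIterator).
/// A real operator would fetch each block over JNI.
trait BlockSource {
    /// Next raw compressed Arrow IPC block, or None when the broadcast is exhausted.
    fn next_block(&mut self) -> Option<Vec<u8>>;
}

/// Hypothetical stand-in for native decompression + IPC decoding
/// (read_ipc_compressed() in the issue; its real signature may differ).
trait BlockDecoder {
    type Batch;
    fn decode(&mut self, bytes: &[u8]) -> Result<Self::Batch, String>;
}

/// Hypothetical shape of BroadcastScanExec's read loop.
struct BroadcastScan<S, D> {
    source: S,
    decoder: D,
}

impl<S: BlockSource, D: BlockDecoder> Iterator for BroadcastScan<S, D> {
    type Item = Result<D::Batch, String>;

    fn next(&mut self) -> Option<Self::Item> {
        // Raw bytes in, decoded batch out: no JVM-side ColumnarBatch and
        // no Arrow C Data Interface export in between.
        let block = self.source.next_block()?;
        Some(self.decoder.decode(&block))
    }
}

/// Toy in-memory source used only for illustration.
struct VecSource(std::vec::IntoIter<Vec<u8>>);

impl BlockSource for VecSource {
    fn next_block(&mut self) -> Option<Vec<u8>> {
        self.0.next()
    }
}

/// Toy decoder: reports the block length in place of a real RecordBatch
/// and rejects empty blocks.
struct LenDecoder;

impl BlockDecoder for LenDecoder {
    type Batch = usize;

    fn decode(&mut self, bytes: &[u8]) -> Result<usize, String> {
        if bytes.is_empty() {
            Err("empty block".to_string())
        } else {
            Ok(bytes.len())
        }
    }
}

fn main() {
    let scan = BroadcastScan {
        source: VecSource(vec![vec![1, 2, 3], vec![], vec![4, 5]].into_iter()),
        decoder: LenDecoder,
    };
    let results: Vec<Result<usize, String>> = scan.collect();
    println!("{:?}", results);
}
```

Running it prints `[Ok(3), Err("empty block"), Ok(2)]`. Errors are surfaced per block rather than aborting the whole iterator, leaving the caller to decide how to propagate them.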

Work Required

  • New JVM iterator class (e.g. CometBroadcastBlockIterator) that yields raw compressed IPC bytes, similar to CometShuffleBlockIterator
  • New protobuf op BroadcastScan alongside existing ShuffleScan
  • New Rust operator BroadcastScanExec in native/core/src/execution/operators/
  • Planner changes in CometSink.scala to emit BroadcastScan instead of Scan for broadcast exchange inputs (behind a config flag like spark.comet.exec.broadcast.directRead.enabled)
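The config-gated planner choice can be sketched as follows. The real change would live in CometSink.scala, so this Rust version only illustrates the decision. The flag name is the one suggested above. Two further assumptions: the InputOp enum is hypothetical, and the flag is treated as off by default, so the existing Scan path stays in place.

```rust
use std::collections::HashMap;

/// Config key suggested in this issue; the final name may differ.
const DIRECT_READ_KEY: &str = "spark.comet.exec.broadcast.directRead.enabled";

/// Hypothetical subset of the operators a broadcast exchange input can map to.
#[derive(Debug, PartialEq)]
enum InputOp {
    Scan,
    BroadcastScan,
}

/// Choose the operator to emit for a broadcast exchange input.
/// Assumes the flag defaults to false and that only "true"
/// (case-insensitive) enables the direct-read path.
fn broadcast_input_op(conf: &HashMap<String, String>) -> InputOp {
    let enabled = conf
        .get(DIRECT_READ_KEY)
        .map(|v| v.eq_ignore_ascii_case("true"))
        .unwrap_or(false);
    if enabled {
        InputOp::BroadcastScan
    } else {
        InputOp::Scan
    }
}

fn main() {
    let mut conf = HashMap::new();
    println!("{:?}", broadcast_input_op(&conf));
    conf.insert(DIRECT_READ_KEY.to_string(), "true".to_string());
    println!("{:?}", broadcast_input_op(&conf));
}
```

This prints `Scan` and then `BroadcastScan`. Falling back to Scan keeps the FFI route as the default. That route is already known to be safe for broadcast data.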

Notes

  • Broadcast data is already marked arrow_ffi_safe=true because it comes from ArrowStreamReader with no mutable buffers, so the current FFI path is at least safe.
  • Broadcast volumes are typically smaller than shuffle volumes, so the impact may be less dramatic than in #3731 (perf: stop using FFI in native shuffle read path), but the cost of JVM-side decompression and FFI export is still real.
