Eliminate remaining row<->Arrow round-trip in CometPythonMapInArrowExec #4240

@andygrove

Description

What is the problem the feature request solves?

#4234 introduced CometPythonMapInArrowExec, eliminating ColumnarToRow + UnsafeProjection ahead of mapInArrow / mapInPandas. End-to-end speedup is 1.30x-1.32x on narrow workloads but only 1.08x-1.09x on a 50-column workload because the input side of ArrowPythonRunner still re-encodes rows back into Arrow.

The remaining round-trip: Comet feeds ColumnarBatch.rowIterator() to the existing ArrowPythonRunner, whose writer thread reads each row through the InternalRow view and writes it back into Arrow vectors via ArrowWriter.write before sending the IPC bytes. Data that already lives in Arrow vectors therefore goes Arrow -> row view -> Arrow inside the JVM before it ever reaches Python.

Describe the potential solution

Replace the writer side of the runner with one that streams Arrow record batches straight to the Python IPC stream:

  • Option A (smaller): subclass ArrowPythonRunner and override the writer thread to accept Iterator[ColumnarBatch] and write batches via ArrowStreamWriter over a VectorSchemaRoot derived from the Comet vectors, reusing the base class's worker management, error handling, and traceback marshalling.
  • Option B (bigger): write a CometArrowPythonRunner extending BasePythonRunner[Iterator[ColumnarBatch], ColumnarBatch] directly. Cleaner separation, but more code.
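A rough sketch of what Option A's writer-thread body could look like. This is not Comet's implementation: the method name `writeBatches` and the `wrap` function (standing in for some zero-copy view of a Comet ColumnarBatch as an Arrow VectorSchemaRoot) are hypothetical, and the surrounding worker-lifecycle plumbing from ArrowPythonRunner is omitted. ArrowStreamWriter, VectorLoader, and VectorUnloader are real Arrow Java APIs.

```scala
import java.io.DataOutputStream
import org.apache.arrow.vector.{VectorLoader, VectorSchemaRoot, VectorUnloader}
import org.apache.arrow.vector.ipc.ArrowStreamWriter
import org.apache.spark.sql.vectorized.ColumnarBatch

// Sketch only: `wrap` is a hypothetical helper exposing a Comet batch's
// columns as an Arrow VectorSchemaRoot without copying.
def writeBatches(
    batches: Iterator[ColumnarBatch],
    wrap: ColumnarBatch => VectorSchemaRoot,
    dataOut: DataOutputStream): Unit = {
  var writer: ArrowStreamWriter = null
  var root: VectorSchemaRoot = null
  var loader: VectorLoader = null
  for (batch <- batches) {
    val batchRoot = wrap(batch)
    if (writer == null) {
      // First batch: its root defines the schema for the IPC stream.
      root = batchRoot
      loader = new VectorLoader(root)
      writer = new ArrowStreamWriter(root, null, dataOut)
      writer.start()
    } else {
      // Later batches: move their Arrow buffers into the shared root
      // without ever materializing a row view.
      val recordBatch = new VectorUnloader(batchRoot).getRecordBatch
      try loader.load(recordBatch) finally recordBatch.close()
    }
    writer.writeBatch() // IPC-encode the batch straight to the Python worker
  }
  if (writer != null) writer.end()
}
```

The key difference from the current path is that no per-row ArrowWriter.write loop runs: each batch's existing Arrow buffers are handed to ArrowStreamWriter as-is.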

Either path adds new shim methods to ShimCometPythonMapInArrow for each Spark version (3.5, 4.0, 4.1, 4.2). Use benchmark_pyarrow_udf.py to validate the win, especially on wide rows.

Additional context

Closing this issue would let us flip spark.comet.exec.pythonMapInArrow.enabled to true by default and drop the experimental marker. Related: #957, #4234.
