What is the problem the feature request solves?
#4234 introduced `CometPythonMapInArrowExec`, eliminating `ColumnarToRow` + `UnsafeProjection` ahead of `mapInArrow`/`mapInPandas`. End-to-end speedup is 1.30x-1.32x on narrow workloads but only 1.08x-1.09x on a 50-column workload, because the input side of `ArrowPythonRunner` still re-encodes rows back into Arrow.
The remaining round-trip: Comet feeds `ColumnarBatch.rowIterator()` to the existing `ArrowPythonRunner`, whose writer thread reads each row via the row API and writes it back into Arrow vectors via `ArrowWriter.write` before sending the IPC bytes. Data that already lives in Arrow vectors goes Arrow -> row view -> Arrow inside the JVM.
Describe the potential solution
Replace the writer side of the runner with one that streams Arrow record batches straight to the Python IPC stream:
- Option A (smaller): subclass `ArrowPythonRunner` and override the writer thread to accept `Iterator[ColumnarBatch]` and write batches via `ArrowStreamWriter` over a `VectorSchemaRoot` derived from the Comet vectors. Reuse worker management, error handling, and traceback marshalling.
- Option B (bigger): write a `CometArrowPythonRunner` extending `BasePythonRunner[Iterator[ColumnarBatch], ColumnarBatch]` directly. Cleaner separation, but more code.
Either path adds new shim methods to `ShimCometPythonMapInArrow` for each supported Spark version (3.5, 4.0, 4.1, 4.2). Use `benchmark_pyarrow_udf.py` to validate the win, especially on wide rows.
Additional context
Closing this issue would let us flip `spark.comet.exec.pythonMapInArrow.enabled` to default-true and drop the experimental marker. Related: #957, #4234.