From 105bb1d57ee4bc4c9f0b1c062a6940f7c84819e0 Mon Sep 17 00:00:00 2001 From: tokoko Date: Sat, 14 Feb 2026 10:38:49 +0100 Subject: [PATCH 1/4] feat: zero-copy columnar conversion for ArrowColumnVector-backed batches --- .../arrow/CometArrowConverters.scala | 27 ++++++- .../sql/comet/CometSparkToColumnarExec.scala | 17 +++-- .../apache/comet/exec/CometExecSuite.scala | 72 +++++++++++++++++++ 3 files changed, 107 insertions(+), 9 deletions(-) diff --git a/common/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala b/common/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala index 6d52078181..f0d7aedc1e 100644 --- a/common/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala +++ b/common/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala @@ -27,10 +27,10 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.comet.util.Utils import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.vectorized.{ColumnarArray, ColumnarBatch} +import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarArray, ColumnarBatch} import org.apache.comet.CometArrowAllocator -import org.apache.comet.vector.NativeUtil +import org.apache.comet.vector.{CometVector, NativeUtil} object CometArrowConverters extends Logging { // This is similar how Spark converts internal row to Arrow format except that it is transforming @@ -185,6 +185,29 @@ object CometArrowConverters extends Logging { } } + /** + * Attempts zero-copy conversion of a ColumnarBatch whose columns are all ArrowColumnVector + * instances. Returns Some(iterator) if successful, None if the batch is not Arrow-backed. + */ + def tryZeroCopyConvert(batch: ColumnarBatch): Option[Iterator[ColumnarBatch]] = { + val numCols = batch.numCols() + if (numCols == 0) return None + + // Check that every column is an ArrowColumnVector + var i = 0 + while (i < numCols) { + if (!batch.column(i).isInstanceOf[ArrowColumnVector]) return None + i += 1 + } + + // All columns are Arrow-backed; wrap their ValueVectors as CometVectors (zero-copy) + val cometVectors = (0 until numCols).map { idx => + val valueVector = batch.column(idx).asInstanceOf[ArrowColumnVector].getValueVector + CometVector.getVector(valueVector, true, null) + } + Some(Iterator(new ColumnarBatch(cometVectors.toArray, batch.numRows()))) + } + def columnarBatchToArrowBatchIter( colBatch: ColumnarBatch, schema: StructType, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala index a8a61e7a71..3a0695f009 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala @@ -106,13 +106,16 @@ case class CometSparkToColumnarExec(child: SparkPlan) .mapPartitionsInternal { sparkBatches => val arrowBatches = sparkBatches.flatMap { sparkBatch => - val context = TaskContext.get() - CometArrowConverters.columnarBatchToArrowBatchIter( - sparkBatch, - schema, - maxRecordsPerBatch, - timeZoneId, - context) + CometArrowConverters.tryZeroCopyConvert(sparkBatch).getOrElse { + // Fallback: element-by-element copy via ArrowWriter + val context = TaskContext.get() + CometArrowConverters.columnarBatchToArrowBatchIter( + sparkBatch, + schema, + maxRecordsPerBatch, + timeZoneId, + context) + } } createTimingIter(arrowBatches, numInputRows, numOutputBatches, conversionTime) } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index bcbbdb7f92..839b715bb4 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -2159,6 +2159,78 @@ class CometExecSuite extends CometTestBase { } } + test("SparkToColumnar zero-copy for ArrowColumnVector input") { + import org.apache.arrow.memory.RootAllocator + import org.apache.arrow.vector.{IntVector, VarCharVector} + import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch} + import org.apache.spark.sql.comet.execution.arrow.CometArrowConverters + import org.apache.comet.vector.CometVector + + val allocator = new RootAllocator(Long.MaxValue) + try { + // Create Arrow vectors with test data + val intVector = new IntVector("intCol", allocator) + intVector.allocateNew(3) + intVector.set(0, 10) + intVector.set(1, 20) + intVector.setNull(2) + intVector.setValueCount(3) + + val varcharVector = new VarCharVector("strCol", allocator) + varcharVector.allocateNew() + varcharVector.setSafe(0, "hello".getBytes) + varcharVector.setSafe(1, "world".getBytes) + varcharVector.setNull(2) + varcharVector.setValueCount(3) + + // Wrap in Spark's ArrowColumnVector + val arrowCol0 = new ArrowColumnVector(intVector) + val arrowCol1 = new ArrowColumnVector(varcharVector) + val inputBatch = new ColumnarBatch(Array(arrowCol0, arrowCol1), 3) + + // Zero-copy conversion should succeed + val result = CometArrowConverters.tryZeroCopyConvert(inputBatch) + assert(result.isDefined, "Should detect ArrowColumnVector and return Some") + + val outputBatch = result.get.next() + assert(outputBatch.numRows() == 3) + assert(outputBatch.numCols() == 2) + + // Verify columns are CometVectors wrapping the same underlying ValueVectors (zero-copy) + val outCol0 = outputBatch.column(0).asInstanceOf[CometVector] + val outCol1 = outputBatch.column(1).asInstanceOf[CometVector] + assert(outCol0.getValueVector eq intVector, "Should be the same ValueVector instance") + assert(outCol1.getValueVector eq varcharVector, "Should be the same ValueVector instance") + + // Verify data is accessible through the CometVector wrappers + assert(outCol0.getInt(0) == 10) + assert(outCol0.getInt(1) == 20) + assert(outCol0.isNullAt(2)) + assert(outCol1.getUTF8String(0).toString == "hello") + assert(outCol1.getUTF8String(1).toString == "world") + assert(outCol1.isNullAt(2)) + + inputBatch.close() + } finally { + allocator.close() + } + } + + test("SparkToColumnar tryZeroCopyConvert returns None for non-Arrow batches") { + import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector + import org.apache.spark.sql.vectorized.ColumnarBatch + import org.apache.spark.sql.comet.execution.arrow.CometArrowConverters + import org.apache.spark.sql.types.IntegerType + + val sparkCol = new OnHeapColumnVector(10, IntegerType) + val batch = new ColumnarBatch(Array(sparkCol), 10) + + val result = CometArrowConverters.tryZeroCopyConvert(batch) + assert(result.isEmpty, "Should return None for non-ArrowColumnVector batches") + + batch.close() + } + test("LocalTableScanExec spark fallback") { withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "false") { val df = Seq.range(0, 10).toDF("id") From b914e62fa3c924b8215b72e50e3fd86c0f2ad20e Mon Sep 17 00:00:00 2001 From: tokoko Date: Sat, 21 Feb 2026 12:22:07 +0100 Subject: [PATCH 2/4] feat: use arrow c data interface --- .../org/apache/comet/vector/CDataUtil.scala | 81 +++++++++ .../arrow/CometArrowConverters.scala | 56 +++--- spark/pom.xml | 8 +- .../spark/sql/comet/ArrowCDataExport.scala | 89 +++++++++ .../sql/comet/CometSparkToColumnarExec.scala | 19 +- .../exec/CometArrowConvertersSuite.scala | 170 ++++++++++++++++++ .../apache/comet/exec/CometExecSuite.scala | 72 -------- 7 files changed, 382 insertions(+), 113 deletions(-) create mode 100644 common/src/main/scala/org/apache/comet/vector/CDataUtil.scala create mode 100644 spark/src/main/scala/org/apache/spark/sql/comet/ArrowCDataExport.scala create mode 100644 spark/src/test/scala/org/apache/comet/exec/CometArrowConvertersSuite.scala diff --git a/common/src/main/scala/org/apache/comet/vector/CDataUtil.scala b/common/src/main/scala/org/apache/comet/vector/CDataUtil.scala new file mode 100644 index 0000000000..2069b620c8 --- /dev/null +++ b/common/src/main/scala/org/apache/comet/vector/CDataUtil.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.vector + +import org.apache.arrow.c.{ArrowArray, ArrowImporter, ArrowSchema, CDataDictionaryProvider} +import org.apache.arrow.memory.BufferAllocator +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * Import-only C Data Interface bridge for Comet's shaded Arrow side. + * + * The caller (in the spark module) provides an export callback that fills pre-allocated + * ArrowArray/ArrowSchema structs at given memory addresses. This object allocates those structs + * using the shaded allocator and imports the resulting vectors as CometVectors. + * + * This design eliminates the need for reflection to cross the shading boundary: the spark module + * calls unshaded Arrow directly, and the common module calls shaded Arrow directly. The two sides + * communicate through Long memory addresses only. + */ +object CDataUtil { + + /** + * Imports a columnar batch from the C Data Interface. + * + * Allocates shaded ArrowArray/ArrowSchema structs for each column, invokes the provided export + * function to fill them (using unshaded Arrow on the caller side), then imports the vectors + * into CometVectors. + * + * @param numCols + * number of columns to import + * @param numRows + * row count for the resulting ColumnarBatch + * @param allocator + * shaded BufferAllocator for struct and vector allocation + * @param exportFn + * callback (colIndex, arrayAddr, schemaAddr) => Unit that exports the unshaded vector into + * the struct memory at the given addresses + * @return + * a ColumnarBatch with CometVector columns + */ + def importBatch( + numCols: Int, + numRows: Int, + allocator: BufferAllocator, + exportFn: (Int, Long, Long) => Unit): ColumnarBatch = { + val cometVectors = (0 until numCols).map { idx => + val arrowArray = ArrowArray.allocateNew(allocator) + val arrowSchema = ArrowSchema.allocateNew(allocator) + try { + exportFn(idx, arrowArray.memoryAddress(), arrowSchema.memoryAddress()) + val importer = new ArrowImporter(allocator) + val dictionaryProvider = new CDataDictionaryProvider() + val vector = importer.importVector(arrowArray, arrowSchema, dictionaryProvider) + CometVector.getVector(vector, true, dictionaryProvider) + } catch { + case e: Exception => + arrowArray.close() + arrowSchema.close() + throw e + } + } + new ColumnarBatch(cometVectors.toArray, numRows) + } +} diff --git a/common/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala b/common/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala index f0d7aedc1e..0b0731c8f4 100644 --- a/common/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala +++ b/common/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala @@ -27,10 +27,10 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.comet.util.Utils import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarArray, ColumnarBatch} +import org.apache.spark.sql.vectorized.{ColumnarArray, ColumnarBatch} import org.apache.comet.CometArrowAllocator -import org.apache.comet.vector.{CometVector, NativeUtil} +import org.apache.comet.vector.{CDataUtil, NativeUtil} object CometArrowConverters extends Logging { // This is similar how Spark converts internal row to Arrow format except that it is transforming @@ -145,7 +145,8 @@ object CometArrowConverters extends Logging { schema: StructType, maxRecordsPerBatch: Int, timeZoneId: String, - context: TaskContext) + context: TaskContext, + zeroCopyExportFn: Option[(Int, Long, Long) => Unit] = None) extends ArrowBatchIterBase(schema, timeZoneId, context) with AutoCloseable { @@ -159,6 +160,21 @@ object CometArrowConverters extends Logging { override protected def nextBatch(): ColumnarBatch = { val rowsInBatch = colBatch.numRows() if (rowsProduced < rowsInBatch) { + // On the first call, try zero-copy if an export function was provided + if (rowsProduced == 0 && zeroCopyExportFn.isDefined) { + try { + val zeroCopy = CDataUtil.importBatch( + colBatch.numCols(), + rowsInBatch, + allocator, + zeroCopyExportFn.get) + rowsProduced = rowsInBatch + return zeroCopy + } catch { + case e: Exception => + logWarning("Zero-copy C Data import failed, falling back to copy", e) + } + } // the arrow writer shall be reset before writing the next batch arrowWriter.reset() val rowsToProduce = @@ -185,35 +201,19 @@ object CometArrowConverters extends Logging { } } - /** - * Attempts zero-copy conversion of a ColumnarBatch whose columns are all ArrowColumnVector - * instances. Returns Some(iterator) if successful, None if the batch is not Arrow-backed. - */ - def tryZeroCopyConvert(batch: ColumnarBatch): Option[Iterator[ColumnarBatch]] = { - val numCols = batch.numCols() - if (numCols == 0) return None - - // Check that every column is an ArrowColumnVector - var i = 0 - while (i < numCols) { - if (!batch.column(i).isInstanceOf[ArrowColumnVector]) return None - i += 1 - } - - // All columns are Arrow-backed; wrap their ValueVectors as CometVectors (zero-copy) - val cometVectors = (0 until numCols).map { idx => - val valueVector = batch.column(idx).asInstanceOf[ArrowColumnVector].getValueVector - CometVector.getVector(valueVector, true, null) - } - Some(Iterator(new ColumnarBatch(cometVectors.toArray, batch.numRows()))) - } - def columnarBatchToArrowBatchIter( colBatch: ColumnarBatch, schema: StructType, maxRecordsPerBatch: Int, timeZoneId: String, - context: TaskContext): Iterator[ColumnarBatch] = { - new ColumnBatchToArrowBatchIter(colBatch, schema, maxRecordsPerBatch, timeZoneId, context) + context: TaskContext, + zeroCopyExportFn: Option[(Int, Long, Long) => Unit] = None): Iterator[ColumnarBatch] = { + new ColumnBatchToArrowBatchIter( + colBatch, + schema, + maxRecordsPerBatch, + timeZoneId, + context, + zeroCopyExportFn) } } diff --git a/spark/pom.xml b/spark/pom.xml index 1b207288c9..4292546a83 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -129,8 +129,10 @@ under the License. to provide InMemoryKMS class that is shaded below, to make Spark test happy. --> + depends on arrow-vector. However, arrow-c-data is used by ArrowCDataExport for zero-copy + conversion (provided scope since it may not be available in all runtime environments). + arrow-memory-unsafe is still needed for tests (Maven shading in common happens in + 'package' phase which is after 'test'). --> org.apache.arrow arrow-memory-unsafe @@ -139,7 +141,7 @@ under the License. org.apache.arrow arrow-c-data - test + provided org.apache.hadoop diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/ArrowCDataExport.scala b/spark/src/main/scala/org/apache/spark/sql/comet/ArrowCDataExport.scala new file mode 100644 index 0000000000..1b78e97651 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/comet/ArrowCDataExport.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet + +import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch} + +/** + * Creates export functions for zero-copy transfer of unshaded Arrow vectors through the C Data + * Interface. + * + * This lives in the spark module where unshaded Arrow types are directly available, eliminating + * the need for reflection. The export function writes into pre-allocated C Data struct addresses + * provided by the common module's shaded side. + * + * At runtime, arrow-c-data may not be on the classpath (Spark does not bundle it). The + * [[cDataAvailable]] check ensures graceful degradation to the copy-based path. + */ +object ArrowCDataExport { + + /** Whether unshaded arrow-c-data classes are available on the runtime classpath. */ + private lazy val cDataAvailable: Boolean = { + try { + Class.forName("org.apache.arrow.c.Data") // scalastyle:ignore classforname + true + } catch { + case _: ClassNotFoundException => false + } + } + + /** + * Returns an export function if the batch is entirely backed by [[ArrowColumnVector]] and the + * arrow-c-data library is available at runtime. Returns [[None]] otherwise. + * + * The returned function has signature `(colIndex, arrayAddr, schemaAddr) => Unit`. When called, + * it exports the unshaded Arrow vector at the given column index into the C Data structs at the + * provided memory addresses. + */ + def makeExportFn(batch: ColumnarBatch): Option[(Int, Long, Long) => Unit] = { + if (!cDataAvailable) return None + if (batch.numCols() == 0) return None + + var i = 0 + while (i < batch.numCols()) { + if (!batch.column(i).isInstanceOf[ArrowColumnVector]) return None + i += 1 + } + + Some(CDataExporter.exportFn(batch)) + } + + /** + * Isolated object that references arrow-c-data classes. The JVM will not load this object (and + * therefore will not attempt to resolve [[org.apache.arrow.c.Data]] etc.) until it is first + * accessed, which only happens after [[cDataAvailable]] confirms the classes exist. + */ + private object CDataExporter { + def exportFn(batch: ColumnarBatch): (Int, Long, Long) => Unit = { + (colIdx: Int, arrayAddr: Long, schemaAddr: Long) => + { + val arrowCol = batch.column(colIdx).asInstanceOf[ArrowColumnVector] + val fv = + arrowCol.getValueVector.asInstanceOf[org.apache.arrow.vector.FieldVector] + org.apache.arrow.c.Data.exportVector( + fv.getAllocator, + fv, + null, + org.apache.arrow.c.ArrowArray.wrap(arrayAddr), + org.apache.arrow.c.ArrowSchema.wrap(schemaAddr)) + } + } + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala index 3a0695f009..0df05375cf 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala @@ -104,18 +104,17 @@ case class CometSparkToColumnarExec(child: SparkPlan) child .executeColumnar() .mapPartitionsInternal { sparkBatches => + val context = TaskContext.get() val arrowBatches = sparkBatches.flatMap { sparkBatch => - CometArrowConverters.tryZeroCopyConvert(sparkBatch).getOrElse { - // Fallback: element-by-element copy via ArrowWriter - val context = TaskContext.get() - CometArrowConverters.columnarBatchToArrowBatchIter( - sparkBatch, - schema, - maxRecordsPerBatch, - timeZoneId, - context) - } + val exportFn = ArrowCDataExport.makeExportFn(sparkBatch) + CometArrowConverters.columnarBatchToArrowBatchIter( + sparkBatch, + schema, + maxRecordsPerBatch, + timeZoneId, + context, + exportFn) } createTimingIter(arrowBatches, numInputRows, numOutputBatches, conversionTime) } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometArrowConvertersSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometArrowConvertersSuite.scala new file mode 100644 index 0000000000..3c7775afa8 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/exec/CometArrowConvertersSuite.scala @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.exec + +import org.apache.arrow.memory.RootAllocator +import org.apache.arrow.vector.{IntVector, VarCharVector} +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.comet.ArrowCDataExport +import org.apache.spark.sql.comet.execution.arrow.CometArrowConverters +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch} + +import org.apache.comet.CometArrowAllocator +import org.apache.comet.vector.{CDataUtil, CometVector} + +class CometArrowConvertersSuite extends CometTestBase { + + test("zero-copy import via ArrowCDataExport and CDataUtil.importBatch") { + val srcAllocator = new RootAllocator(Long.MaxValue) + val cometAllocator = + CometArrowAllocator.newChildAllocator("test-zero-copy", 0, Long.MaxValue) + try { + val intVector = new IntVector("intCol", srcAllocator) + intVector.allocateNew(3) + intVector.set(0, 10) + intVector.set(1, 20) + intVector.setNull(2) + intVector.setValueCount(3) + + val varcharVector = new VarCharVector("strCol", srcAllocator) + varcharVector.allocateNew() + varcharVector.setSafe(0, "hello".getBytes) + varcharVector.setSafe(1, "world".getBytes) + varcharVector.setNull(2) + varcharVector.setValueCount(3) + + val arrowCol0 = new ArrowColumnVector(intVector) + val arrowCol1 = new ArrowColumnVector(varcharVector) + val inputBatch = new ColumnarBatch(Array(arrowCol0, arrowCol1), 3) + + val exportFn = ArrowCDataExport.makeExportFn(inputBatch) + assert(exportFn.isDefined, "Should detect ArrowColumnVector and return Some") + + val outputBatch = CDataUtil.importBatch(2, 3, cometAllocator, exportFn.get) + assert(outputBatch.numRows() == 3) + assert(outputBatch.numCols() == 2) + assert(outputBatch.column(0).isInstanceOf[CometVector]) + assert(outputBatch.column(1).isInstanceOf[CometVector]) + + assert(outputBatch.column(0).getInt(0) == 10) + assert(outputBatch.column(0).getInt(1) == 20) + assert(outputBatch.column(0).isNullAt(2)) + assert(outputBatch.column(1).getUTF8String(0).toString == "hello") + assert(outputBatch.column(1).getUTF8String(1).toString == "world") + assert(outputBatch.column(1).isNullAt(2)) + + outputBatch.close() + inputBatch.close() + } finally { + cometAllocator.close() + srcAllocator.close() + } + } + + test("ArrowCDataExport returns None for non-Arrow batches") { + val sparkCol = new OnHeapColumnVector(10, IntegerType) + val batch = new ColumnarBatch(Array(sparkCol), 10) + + try { + val result = ArrowCDataExport.makeExportFn(batch) + assert(result.isEmpty, "Should return None for non-ArrowColumnVector batches") + } finally { + batch.close() + } + } + + test("columnarBatchToArrowBatchIter works for ArrowColumnVector input") { + val srcAllocator = new RootAllocator(Long.MaxValue) + try { + val intVector = new IntVector("intCol", srcAllocator) + intVector.allocateNew(3) + intVector.set(0, 10) + intVector.set(1, 20) + intVector.setNull(2) + intVector.setValueCount(3) + + val varcharVector = new VarCharVector("strCol", srcAllocator) + varcharVector.allocateNew() + varcharVector.setSafe(0, "hello".getBytes) + varcharVector.setSafe(1, "world".getBytes) + varcharVector.setNull(2) + varcharVector.setValueCount(3) + + val arrowCol0 = new ArrowColumnVector(intVector) + val arrowCol1 = new ArrowColumnVector(varcharVector) + val batch = new ColumnarBatch(Array(arrowCol0, arrowCol1), 3) + val schema = + StructType(Seq(StructField("intCol", IntegerType), StructField("strCol", StringType))) + + val exportFn = ArrowCDataExport.makeExportFn(batch) + val iter = CometArrowConverters.columnarBatchToArrowBatchIter( + batch, + schema, + maxRecordsPerBatch = 0, + "UTC", + context = null, + exportFn) + + assert(iter.hasNext) + val outputBatch = iter.next() + assert(outputBatch.numRows() == 3) + assert(outputBatch.numCols() == 2) + + assert(outputBatch.column(0).getInt(0) == 10) + assert(outputBatch.column(0).getInt(1) == 20) + assert(outputBatch.column(0).isNullAt(2)) + assert(outputBatch.column(1).getUTF8String(0).toString == "hello") + assert(outputBatch.column(1).getUTF8String(1).toString == "world") + assert(outputBatch.column(1).isNullAt(2)) + + assert(!iter.hasNext) + batch.close() + } finally { + srcAllocator.close() + } + } + + test("columnarBatchToArrowBatchIter falls back for non-Arrow batches") { + val sparkCol = new OnHeapColumnVector(10, IntegerType) + for (i <- 0 until 10) { + sparkCol.putInt(i, i * 100) + } + val batch = new ColumnarBatch(Array(sparkCol), 10) + + val iterSchema = StructType(Seq(StructField("col", IntegerType))) + val iter = CometArrowConverters.columnarBatchToArrowBatchIter( + batch, + iterSchema, + maxRecordsPerBatch = 0, + "UTC", + context = null) + + assert(iter.hasNext) + val outputBatch = iter.next() + assert(outputBatch.numRows() == 10) + assert(outputBatch.column(0).getInt(0) == 0) + assert(outputBatch.column(0).getInt(9) == 900) + + assert(!iter.hasNext) + batch.close() + } +} diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 839b715bb4..bcbbdb7f92 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -2159,78 +2159,6 @@ class CometExecSuite extends CometTestBase { } } - test("SparkToColumnar zero-copy for ArrowColumnVector input") { - import org.apache.arrow.memory.RootAllocator - import org.apache.arrow.vector.{IntVector, VarCharVector} - import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch} - import org.apache.spark.sql.comet.execution.arrow.CometArrowConverters - import org.apache.comet.vector.CometVector - - val allocator = new RootAllocator(Long.MaxValue) - try { - // Create Arrow vectors with test data - val intVector = new IntVector("intCol", allocator) - intVector.allocateNew(3) - intVector.set(0, 10) - intVector.set(1, 20) - intVector.setNull(2) - intVector.setValueCount(3) - - val varcharVector = new VarCharVector("strCol", allocator) - varcharVector.allocateNew() - varcharVector.setSafe(0, "hello".getBytes) - varcharVector.setSafe(1, "world".getBytes) - varcharVector.setNull(2) - varcharVector.setValueCount(3) - - // Wrap in Spark's ArrowColumnVector - val arrowCol0 = new ArrowColumnVector(intVector) - val arrowCol1 = new ArrowColumnVector(varcharVector) - val inputBatch = new ColumnarBatch(Array(arrowCol0, arrowCol1), 3) - - // Zero-copy conversion should succeed - val result = CometArrowConverters.tryZeroCopyConvert(inputBatch) - assert(result.isDefined, "Should detect ArrowColumnVector and return Some") - - val outputBatch = result.get.next() - assert(outputBatch.numRows() == 3) - assert(outputBatch.numCols() == 2) - - // Verify columns are CometVectors wrapping the same underlying ValueVectors (zero-copy) - val outCol0 = outputBatch.column(0).asInstanceOf[CometVector] - val outCol1 = outputBatch.column(1).asInstanceOf[CometVector] - assert(outCol0.getValueVector eq intVector, "Should be the same ValueVector instance") - assert(outCol1.getValueVector eq varcharVector, "Should be the same ValueVector instance") - - // Verify data is accessible through the CometVector wrappers - assert(outCol0.getInt(0) == 10) - assert(outCol0.getInt(1) == 20) - assert(outCol0.isNullAt(2)) - assert(outCol1.getUTF8String(0).toString == "hello") - assert(outCol1.getUTF8String(1).toString == "world") - assert(outCol1.isNullAt(2)) - - inputBatch.close() - } finally { - allocator.close() - } - } - - test("SparkToColumnar tryZeroCopyConvert returns None for non-Arrow batches") { - import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector - import org.apache.spark.sql.vectorized.ColumnarBatch - import org.apache.spark.sql.comet.execution.arrow.CometArrowConverters - import org.apache.spark.sql.types.IntegerType - - val sparkCol = new OnHeapColumnVector(10, IntegerType) - val batch = new ColumnarBatch(Array(sparkCol), 10) - - val result = CometArrowConverters.tryZeroCopyConvert(batch) - assert(result.isEmpty, "Should return None for non-ArrowColumnVector batches") - - batch.close() - } - test("LocalTableScanExec spark fallback") { withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "false") { val df = Seq.range(0, 10).toDF("id") From 40003d34faf8f89cdd511a26e96867929713b4d3 Mon Sep 17 00:00:00 2001 From: tokoko Date: Sat, 21 Feb 2026 21:01:41 +0100 Subject: [PATCH 3/4] fix: ignore c-data classes --- pom.xml | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pom.xml b/pom.xml index 2eaccd37d4..070fb32721 100644 --- a/pom.xml +++ b/pom.xml @@ -1101,6 +1101,7 @@ under the License. dev/release/requirements.txt native/proto/src/generated/** benchmarks/tpc/queries/** + pixi.lock @@ -1167,6 +1168,20 @@ under the License. com.google.thirdparty.publicsuffix.PublicSuffixType + + org.apache.arrow + arrow-c-data + + + org.apache.arrow.c.jni.JniWrapper + org.apache.arrow.c.jni.PrivateData + org.apache.arrow.c.jni.CDataJniException + org.apache.arrow.c.ArrayStreamExporter$ExportedArrayStreamPrivateData + + true true From 951340c90c3e1e667ccc3f1104632021fe2db5e9 Mon Sep 17 00:00:00 2001 From: tokoko Date: Sun, 22 Feb 2026 08:55:24 +0100 Subject: [PATCH 4/4] fix: shading issue in test --- .../org/apache/comet/vector/CDataUtil.scala | 16 ++++ .../exec/CometArrowConvertersSuite.scala | 85 +++++++++---------- 2 files changed, 58 insertions(+), 43 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/vector/CDataUtil.scala b/common/src/main/scala/org/apache/comet/vector/CDataUtil.scala index 2069b620c8..3dd1a2ae2f 100644 --- a/common/src/main/scala/org/apache/comet/vector/CDataUtil.scala +++ b/common/src/main/scala/org/apache/comet/vector/CDataUtil.scala @@ -23,6 +23,8 @@ import org.apache.arrow.c.{ArrowArray, ArrowImporter, ArrowSchema, CDataDictiona import org.apache.arrow.memory.BufferAllocator import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.comet.CometArrowAllocator + /** * Import-only C Data Interface bridge for Comet's shaded Arrow side. * @@ -36,6 +38,20 @@ import org.apache.spark.sql.vectorized.ColumnarBatch */ object CDataUtil { + /** + * Imports a columnar batch from the C Data Interface using a child of the global + * [[CometArrowAllocator]]. This is the preferred entry point from the spark module since it + * avoids passing a shaded allocator type across the shading boundary. + */ + def importBatch( + numCols: Int, + numRows: Int, + exportFn: (Int, Long, Long) => Unit): ColumnarBatch = { + val allocator = + CometArrowAllocator.newChildAllocator("CDataUtil-import", 0, Long.MaxValue) + importBatch(numCols, numRows, allocator, exportFn) + } + /** * Imports a columnar batch from the C Data Interface. * diff --git a/spark/src/test/scala/org/apache/comet/exec/CometArrowConvertersSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometArrowConvertersSuite.scala index 3c7775afa8..44bbd68ea7 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometArrowConvertersSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometArrowConvertersSuite.scala @@ -28,56 +28,55 @@ import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch} -import org.apache.comet.CometArrowAllocator import org.apache.comet.vector.{CDataUtil, CometVector} class CometArrowConvertersSuite extends CometTestBase { test("zero-copy import via ArrowCDataExport and CDataUtil.importBatch") { val srcAllocator = new RootAllocator(Long.MaxValue) - val cometAllocator = - CometArrowAllocator.newChildAllocator("test-zero-copy", 0, Long.MaxValue) - try { - val intVector = new IntVector("intCol", srcAllocator) - intVector.allocateNew(3) - intVector.set(0, 10) - intVector.set(1, 20) - intVector.setNull(2) - intVector.setValueCount(3) - - val varcharVector = new VarCharVector("strCol", srcAllocator) - varcharVector.allocateNew() - varcharVector.setSafe(0, "hello".getBytes) - varcharVector.setSafe(1, "world".getBytes) - varcharVector.setNull(2) - varcharVector.setValueCount(3) - val arrowCol0 = new ArrowColumnVector(intVector) - val arrowCol1 = new ArrowColumnVector(varcharVector) - val inputBatch = new ColumnarBatch(Array(arrowCol0, arrowCol1), 3) - - val exportFn = ArrowCDataExport.makeExportFn(inputBatch) - assert(exportFn.isDefined, "Should detect ArrowColumnVector and return Some") - - val outputBatch = CDataUtil.importBatch(2, 3, cometAllocator, exportFn.get) - assert(outputBatch.numRows() == 3) - assert(outputBatch.numCols() == 2) - assert(outputBatch.column(0).isInstanceOf[CometVector]) - assert(outputBatch.column(1).isInstanceOf[CometVector]) - - assert(outputBatch.column(0).getInt(0) == 10) - assert(outputBatch.column(0).getInt(1) == 20) - assert(outputBatch.column(0).isNullAt(2)) - assert(outputBatch.column(1).getUTF8String(0).toString == "hello") - assert(outputBatch.column(1).getUTF8String(1).toString == "world") - assert(outputBatch.column(1).isNullAt(2)) - - outputBatch.close() - inputBatch.close() - } finally { - cometAllocator.close() - srcAllocator.close() - } + val intVector = new IntVector("intCol", srcAllocator) + intVector.allocateNew(3) + intVector.set(0, 10) + intVector.set(1, 20) + intVector.setNull(2) + intVector.setValueCount(3) + + val varcharVector = new VarCharVector("strCol", srcAllocator) + varcharVector.allocateNew() + varcharVector.setSafe(0, "hello".getBytes) + varcharVector.setSafe(1, "world".getBytes) + varcharVector.setNull(2) + varcharVector.setValueCount(3) + + val arrowCol0 = new ArrowColumnVector(intVector) + val arrowCol1 = new ArrowColumnVector(varcharVector) + val inputBatch = new ColumnarBatch(Array(arrowCol0, arrowCol1), 3) + + val exportFn = ArrowCDataExport.makeExportFn(inputBatch) + assert(exportFn.isDefined, "Should detect ArrowColumnVector and return Some") + + // Use the no-allocator overload which uses the shaded CometArrowAllocator internally, + // avoiding the need to pass a shaded allocator type across the shading boundary. + val outputBatch = CDataUtil.importBatch(2, 3, exportFn.get) + assert(outputBatch.numRows() == 3) + assert(outputBatch.numCols() == 2) + assert(outputBatch.column(0).isInstanceOf[CometVector]) + assert(outputBatch.column(1).isInstanceOf[CometVector]) + + assert(outputBatch.column(0).getInt(0) == 10) + assert(outputBatch.column(0).getInt(1) == 20) + assert(outputBatch.column(0).isNullAt(2)) + assert(outputBatch.column(1).getUTF8String(0).toString == "hello") + assert(outputBatch.column(1).getUTF8String(1).toString == "world") + assert(outputBatch.column(1).isNullAt(2)) + + // Close the imported batch first (triggers C Data release callback), + // then close the source batch. The child allocator under CometArrowAllocator + // is not explicitly closed here, matching the production lifecycle in + // ColumnBatchToArrowBatchIter. + outputBatch.close() + inputBatch.close() } test("ArrowCDataExport returns None for non-Arrow batches") {