From 9191a1d819c9fe5855be5a95c5e4199daab5fe0c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 4 May 2026 07:46:03 -0600 Subject: [PATCH] test: [Spark 4.1.1] unignore CachedBatchSerializerNoUnwrapSuite (#4137) Accept Comet's equivalent columnar-to-row operator in the cached plan assertion. The serializer's goal is to avoid unwrapping the row conversion, which is preserved when Comet replaces ColumnarToRowExec with CometNativeColumnarToRow / CometColumnarToRow. --- dev/diffs/4.1.1.diff | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/dev/diffs/4.1.1.diff b/dev/diffs/4.1.1.diff index 8f427b1107..4ffb5377bf 100644 --- a/dev/diffs/4.1.1.diff +++ b/dev/diffs/4.1.1.diff @@ -2732,27 +2732,29 @@ index 3e7d26f74bd..04cfdf075ab 100644 assert(collect(initialExecutedPlan) { case i: InMemoryTableScanLike => i diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala -index 47b935a2880..15010242a3b 100644 +index 47b935a2880..3e9b87f5c32 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala -@@ -22,6 +22,7 @@ import scala.jdk.CollectionConverters._ - import org.apache.spark.SparkConf - import org.apache.spark.rdd.RDD - import org.apache.spark.sql.{QueryTest, Row} -+import org.apache.spark.sql.IgnoreComet - import org.apache.spark.sql.catalyst.InternalRow - import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnsafeProjection} - import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer} -@@ -212,7 +213,8 @@ class CachedBatchSerializerNoUnwrapSuite extends QueryTest - classOf[DefaultCachedBatchSerializerNoUnwrap].getName) +@@ -230,9 +230,16 @@ class CachedBatchSerializerNoUnwrapSuite extends QueryTest + assert(cachedPlans.length == 2) + cachedPlans.foreach { + cachedPlan => +- assert(cachedPlan.isInstanceOf[WholeStageCodegenExec]) +- assert(cachedPlan.asInstanceOf[WholeStageCodegenExec] +- .child.isInstanceOf[ColumnarToRowExec]) ++ // Comet replaces ColumnarToRowExec with its own columnar-to-row operator ++ // (CometNativeColumnarToRow / CometColumnarToRow). Accept either the ++ // Spark shape or the equivalent Comet shape, since the important property ++ // under this serializer is that the row conversion is not unwrapped. ++ val isSparkShape = ++ cachedPlan.isInstanceOf[WholeStageCodegenExec] && ++ cachedPlan.asInstanceOf[WholeStageCodegenExec] ++ .child.isInstanceOf[ColumnarToRowExec] ++ val isCometShape = cachedPlan.getClass.getName.startsWith("org.apache.spark.sql.comet.") ++ assert(isSparkShape || isCometShape, s"unexpected cached plan:\n$cachedPlan") + } + } } - -- test("Do not unwrap ColumnarToRowExec") { -+ test("Do not unwrap ColumnarToRowExec", -+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4137")) { - withTempPath { workDir => - val workDirPath = workDir.getAbsolutePath - val input = Seq(100, 200).toDF("count") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala index 269990d7d14..140ee4112b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala