From 6b34e8f101f72ffa156e139967743dc8bdc40e9b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 3 May 2026 11:09:51 -0600 Subject: [PATCH 1/6] fix: preserve parent struct nullness when all requested fields missing in Parquet Closes #4136. Spark 4.1 (SPARK-53535) added LEGACY_PARQUET_RETURN_NULL_STRUCT_IF_ALL_FIELDS_MISSING. With the new default (false), Spark's vectorized reader appends a "marker" leaf field to the Parquet read schema so it can distinguish a null parent row from a non-null parent whose requested fields are all missing in the file, then truncates the marker out of the ColumnarBatch. Comet's `parquet_convert_struct_to_struct` unconditionally collapsed a non-overlapping struct to a fully-null array (`NullBuffer::new_null`), conflating the two cases. This change adds `return_null_struct_if_all_fields_missing` to `SparkParquetOptions` (default `true` for backwards compat) and plumbs it through both scan paths: - native_datafusion: read in CometNativeScan.convert from the Spark conf, pass via the new NativeScanCommon proto field, threaded through init_datasource_exec. - native_iceberg_compat: read in CometParquetFileFormat, pass through the NativeBatchReader constructor and a new initRecordBatchReader JNI parameter. For both paths the JVM defaults to `"true"` on Spark < 4.1 (legacy hardcoded behavior) and `"false"` on Spark 4.1+ (matches Spark's registered conf default), with explicit user settings honored. When the flag is false and no requested fields overlap with the file's struct, parquet_convert_struct_to_struct now preserves `array.nulls()` so the parent's nullness from the file is propagated. Reproducer: new test in CometNativeReaderSuite parameterized over both scan impls. Removes the IgnoreComet annotations from the 5 SPARK-53535 / SPARK-54220 ParquetIOSuite tests in dev/diffs/4.1.1.diff. --- .../java/org/apache/comet/parquet/Native.java | 1 + .../comet/parquet/NativeBatchReader.java | 8 ++ dev/diffs/4.1.1.diff | 107 +----------------- native/core/src/execution/planner.rs | 1 + native/core/src/parquet/mod.rs | 2 + native/core/src/parquet/parquet_exec.rs | 5 + native/core/src/parquet/parquet_support.rs | 19 +++- native/proto/src/proto/operator.proto | 5 + .../parquet/CometParquetFileFormat.scala | 11 ++ .../serde/operator/CometNativeScan.scala | 13 ++- .../comet/exec/CometNativeReaderSuite.scala | 49 +++++++- 11 files changed, 112 insertions(+), 109 deletions(-) diff --git a/common/src/main/java/org/apache/comet/parquet/Native.java b/common/src/main/java/org/apache/comet/parquet/Native.java index ec375402cb..fbe7f23875 100644 --- a/common/src/main/java/org/apache/comet/parquet/Native.java +++ b/common/src/main/java/org/apache/comet/parquet/Native.java @@ -226,6 +226,7 @@ public static native long initRecordBatchReader( String sessionTimezone, int batchSize, boolean caseSensitive, + boolean returnNullStructIfAllFieldsMissing, Map objectStoreOptions, CometFileKeyUnwrapper keyUnwrapper, Object metricsNode); diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java index 9413b9316d..a2ee4963d9 100644 --- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java @@ -159,6 +159,11 @@ public URI pathUri() throws URISyntaxException { protected boolean isCaseSensitive; protected boolean useFieldId; protected boolean ignoreMissingIds; + // SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all + // missing in the Parquet file, true returns the entire struct as null (legacy + // pre-4.1 behavior); false preserves the parent struct's nullness from the file + // so non-null parents materialize as a struct of all-null fields. + protected boolean returnNullStructIfAllFieldsMissing = true; protected StructType partitionSchema; protected InternalRow partitionValues; protected PartitionedFile file; @@ -278,6 +283,7 @@ private NativeBatchReader(AbstractColumnReader[] columnReaders) { boolean useFieldId, boolean ignoreMissingIds, boolean useLegacyDateTimestamp, + boolean returnNullStructIfAllFieldsMissing, StructType partitionSchema, InternalRow partitionValues, Map metrics, @@ -290,6 +296,7 @@ private NativeBatchReader(AbstractColumnReader[] columnReaders) { this.useFieldId = useFieldId; this.ignoreMissingIds = ignoreMissingIds; this.useLegacyDateTimestamp = useLegacyDateTimestamp; + this.returnNullStructIfAllFieldsMissing = returnNullStructIfAllFieldsMissing; this.partitionSchema = partitionSchema; this.partitionValues = partitionValues; this.file = inputSplit; @@ -578,6 +585,7 @@ public void init() throws Throwable { timeZoneId, batchSize, caseSensitive, + returnNullStructIfAllFieldsMissing, objectStoreOptions, keyUnwrapper, metricsNode); diff --git a/dev/diffs/4.1.1.diff b/dev/diffs/4.1.1.diff index ac56840013..6d35aeef63 100644 --- a/dev/diffs/4.1.1.diff +++ b/dev/diffs/4.1.1.diff @@ -931,57 +931,6 @@ index 95e86fe4311..0f7ed3271d4 100644 }.flatten assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L)))) } -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala -new file mode 100644 -index 00000000000..5691536c114 ---- /dev/null -+++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala -@@ -0,0 +1,45 @@ -+/* -+ * Licensed to the Apache Software Foundation (ASF) under one or more -+ * contributor license agreements. See the NOTICE file distributed with -+ * this work for additional information regarding copyright ownership. -+ * The ASF licenses this file to You under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. -+ */ -+ -+package org.apache.spark.sql -+ -+import org.scalactic.source.Position -+import org.scalatest.Tag -+ -+import org.apache.spark.sql.test.SQLTestUtils -+ -+/** -+ * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`). -+ */ -+case class IgnoreComet(reason: String) extends Tag("DisableComet") -+case class IgnoreCometNativeIcebergCompat(reason: String) extends Tag("DisableComet") -+case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet") -+case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet") -+ -+/** -+ * Helper trait that disables Comet for all tests regardless of default config values. -+ */ -+trait IgnoreCometSuite extends SQLTestUtils { -+ override protected def test(testName: String, testTags: Tag*)(testFun: => Any) -+ (implicit pos: Position): Unit = { -+ if (isCometEnabled) { -+ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) -+ } else { -+ super.test(testName, testTags: _*)(testFun) -+ } -+ } -+} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala index 53e47f428c3..a55d8f0c161 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala @@ -2993,7 +2942,7 @@ index 6b73cc8618d..624694916fb 100644 case _ => assert(false, "Can not match ParquetTable in the query.") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 3072657a095..b2293ccab17 100644 +index 3072657a095..3ea9b8543a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -40,6 +40,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} @@ -3004,57 +2953,7 @@ index 3072657a095..b2293ccab17 100644 import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow, UnsafeRow} import org.apache.spark.sql.catalyst.util.{DateTimeConstants, DateTimeUtils} -@@ -765,7 +766,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("vectorized reader: missing all struct fields") { -+ test("vectorized reader: missing all struct fields", -+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4136")) { - for { - offheapEnabled <- Seq(true, false) - returnNullStructIfAllFieldsMissing <- Seq(true, false) -@@ -803,7 +805,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-53535: vectorized reader: missing all struct fields, struct with complex fields") { -+ test("SPARK-53535: vectorized reader: missing all struct fields, struct with complex fields", -+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4136")) { - val data = Seq( - Row(Row(Seq(11, 12, null, 14), Row("21", 22), Row(true)), 100), - Row(Row(Seq(11, 12, null, 14), Row("21", 22), Row(false)), 100), -@@ -858,7 +861,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-53535: vectorized reader: missing all struct fields, struct with map field only") { -+ test("SPARK-53535: vectorized reader: missing all struct fields, struct with map field only", -+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4136")) { - val data = Seq( - Row(Row(Map("key1" -> 1)), 100), - Row(Row(Map("key2" -> 2)), 100), -@@ -903,7 +907,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - - test("SPARK-53535: vectorized reader: missing all struct fields, " + -- "struct with cheap map and more expensive array field") { -+ "struct with cheap map and more expensive array field", -+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4136")) { - val data = Seq( - Row(Row(Map(false -> Row("expensive", 1)), Seq("test1")), 100), - Row(Row(Map(true -> Row("expensive", 2)), Seq("test2")), 100), -@@ -953,7 +958,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-54220: vectorized reader: missing all struct fields, struct with NullType only") { -+ test("SPARK-54220: vectorized reader: missing all struct fields, struct with NullType only", -+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4136")) { - val data = Seq( - Tuple1((null, null)), - Tuple1((null, null)), -@@ -1282,7 +1288,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -1282,7 +1283,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -3064,7 +2963,7 @@ index 3072657a095..b2293ccab17 100644 val data = (1 to 4).map(i => Tuple1(i.toString)) val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) -@@ -1567,7 +1574,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -1567,7 +1569,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index f3df4b522c..13eb73eb50 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1248,6 +1248,7 @@ impl PhysicalPlanner { default_values, common.session_timezone.as_str(), common.case_sensitive, + common.return_null_struct_if_all_fields_missing, self.session_ctx(), common.encryption_enabled, )?; diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs index 61ff4fc0db..3d61251447 100644 --- a/native/core/src/parquet/mod.rs +++ b/native/core/src/parquet/mod.rs @@ -438,6 +438,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat session_timezone: JString, batch_size: jint, case_sensitive: jboolean, + return_null_struct_if_all_fields_missing: jboolean, object_store_options: JObject, key_unwrapper_obj: JObject, metrics_node: JObject, @@ -511,6 +512,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat None, session_timezone.as_str(), case_sensitive != JNI_FALSE, + return_null_struct_if_all_fields_missing != JNI_FALSE, session_ctx, encryption_enabled, )?; diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs index ef4c878b9a..e67700e629 100644 --- a/native/core/src/parquet/parquet_exec.rs +++ b/native/core/src/parquet/parquet_exec.rs @@ -71,12 +71,14 @@ pub(crate) fn init_datasource_exec( default_values: Option>, session_timezone: &str, case_sensitive: bool, + return_null_struct_if_all_fields_missing: bool, session_ctx: &Arc, encryption_enabled: bool, ) -> Result, ExecutionError> { let (table_parquet_options, spark_parquet_options) = get_options( session_timezone, case_sensitive, + return_null_struct_if_all_fields_missing, &object_store_url, encryption_enabled, ); @@ -185,6 +187,7 @@ pub(crate) fn init_datasource_exec( fn get_options( session_timezone: &str, case_sensitive: bool, + return_null_struct_if_all_fields_missing: bool, object_store_url: &ObjectStoreUrl, encryption_enabled: bool, ) -> (TableParquetOptions, SparkParquetOptions) { @@ -196,6 +199,8 @@ fn get_options( SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false); spark_parquet_options.allow_cast_unsigned_ints = true; spark_parquet_options.case_sensitive = case_sensitive; + spark_parquet_options.return_null_struct_if_all_fields_missing = + return_null_struct_if_all_fields_missing; if encryption_enabled { table_parquet_options.crypto.configure_factory( diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 3418a17c43..8ccdf0a100 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -79,6 +79,11 @@ pub struct SparkParquetOptions { pub use_legacy_date_timestamp_or_ntz: bool, // Whether schema field names are case sensitive pub case_sensitive: bool, + /// SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all + /// missing in the Parquet file, true returns the entire struct as null (pre-4.1 + /// legacy behavior); false preserves the parent struct's nullness from the file + /// so non-null parents return a struct of all-null fields. + pub return_null_struct_if_all_fields_missing: bool, } impl SparkParquetOptions { @@ -91,6 +96,7 @@ impl SparkParquetOptions { use_decimal_128: false, use_legacy_date_timestamp_or_ntz: false, case_sensitive: false, + return_null_struct_if_all_fields_missing: true, } } @@ -103,6 +109,7 @@ impl SparkParquetOptions { use_decimal_128: false, use_legacy_date_timestamp_or_ntz: false, case_sensitive: false, + return_null_struct_if_all_fields_missing: true, } } } @@ -279,9 +286,15 @@ fn parquet_convert_struct_to_struct( } } - // If target schema doesn't contain any of the existing fields - // mark such a column in array as NULL - let nulls = if field_overlap { + // When the file's struct contains none of the requested fields, the + // returned validity buffer depends on Spark's + // `spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing` (SPARK-53535, + // Spark 4.1+). Legacy mode marks the whole column null; the new default + // preserves the file's parent-row nullness so non-null parents materialize + // as a struct of all-null fields. + let nulls = if field_overlap + || !parquet_options.return_null_struct_if_all_fields_missing + { array.nulls().cloned() } else { Some(NullBuffer::new_null(array.len())) diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index fb438b26a4..4eadb0042f 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -107,6 +107,11 @@ message NativeScanCommon { bool encryption_enabled = 11; string source = 12; repeated spark.spark_expression.DataType fields = 13; + // SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all + // missing in the Parquet file, true returns the entire struct as null (legacy + // pre-4.1 behavior); false preserves the parent struct's nullness from the file + // so non-null parents return a struct of all-null fields. + bool return_null_struct_if_all_fields_missing = 14; } message NativeScan { diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala index c2728fcabf..0d178dbf81 100644 --- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala +++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala @@ -43,6 +43,7 @@ import org.apache.spark.sql.types.{DateType, StructType, TimestampType} import org.apache.spark.util.SerializableConfiguration import org.apache.comet.CometConf +import org.apache.comet.CometSparkSessionExtensions.isSpark41Plus import org.apache.comet.MetricsSupport import org.apache.comet.shims.ShimSQLConf import org.apache.comet.vector.CometVector @@ -96,6 +97,15 @@ class CometParquetFileFormat(session: SparkSession) val isCaseSensitive = sqlConf.caseSensitiveAnalysis val useFieldId = CometParquetUtils.readFieldId(sqlConf) val ignoreMissingIds = CometParquetUtils.ignoreMissingIds(sqlConf) + // SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all + // missing in the Parquet file, the new default preserves the parent struct's + // nullness from the file. Pre-4.1 Spark hardcodes the legacy behavior, so we + // default to "true" there for backwards compatibility. + val returnNullStructIfAllFieldsMissing = sqlConf + .getConfString( + "spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing", + if (isSpark41Plus) "false" else "true") + .toBoolean val pushDownDate = sqlConf.parquetFilterPushDownDate val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal @@ -158,6 +168,7 @@ class CometParquetFileFormat(session: SparkSession) useFieldId, ignoreMissingIds, datetimeRebaseSpec.mode == CORRECTED, + returnNullStructIfAllFieldsMissing, partitionSchema, file.partitionValues, metrics.asJava, diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index 70f06f5741..066b770bbb 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.comet.{CometConf, ConfigEntry} import org.apache.comet.CometConf.COMET_EXEC_ENABLED -import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isSpark35Plus, withInfo} +import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isSpark35Plus, isSpark41Plus, withInfo} import org.apache.comet.objectstore.NativeConfig import org.apache.comet.parquet.CometParquetUtils import org.apache.comet.serde.{CometOperatorSerde, Compatible, OperatorOuterClass, SupportLevel} @@ -189,6 +189,17 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { commonBuilder.setSessionTimezone(scan.conf.getConfString("spark.sql.session.timeZone")) commonBuilder.setCaseSensitive(scan.conf.getConf[Boolean](SQLConf.CASE_SENSITIVE)) + // SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all + // missing in the Parquet file, the new default preserves the parent struct's + // nullness from the file (so non-null parents materialize as a struct of all-null + // fields). Pre-4.1 Spark hardcodes the legacy behavior (whole struct null), which + // matches the Comet default we use as fallback. + val returnNullStructConfKey = + "spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing" + val returnNullStructDefault = if (isSpark41Plus) "false" else "true" + commonBuilder.setReturnNullStructIfAllFieldsMissing( + scan.conf.getConfString(returnNullStructConfKey, returnNullStructDefault).toBoolean) + // Collect S3/cloud storage configurations val hadoopConf = scan.relation.sparkSession.sessionState .newHadoopConfWithOptions(scan.relation.options) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala index b49cec0f5a..ea8cc962dd 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.{CometTestBase, Row} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions.{array, col} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{IntegerType, StringType, StructType} +import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType} import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.isSpark41Plus @@ -685,6 +685,53 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper } } + test("issue #4136: struct with all requested fields missing in file") { + // SPARK-53535 (Spark 4.1) added LEGACY_PARQUET_RETURN_NULL_STRUCT_IF_ALL_FIELDS_MISSING. + // With the new default (false), Spark's vectorized reader appends a "marker" leaf field to + // the Parquet read schema so it can distinguish a null parent row from a non-null parent + // whose requested fields are all missing from the file, then truncates the marker out of + // the ColumnarBatch. Comet's native scans don't implement this, so they conflate the two + // cases and return Row(null) for non-null parents. + assume( + isSpark41Plus, + "LEGACY_PARQUET_RETURN_NULL_STRUCT_IF_ALL_FIELDS_MISSING was introduced in Spark 4.1") + + val tableSchema = new StructType().add( + "_1", + new StructType() + .add("_1", IntegerType) + .add("_2", StringType), + nullable = true) + + // Read schema requests _3, _4 — fields that don't exist in the file's _1 struct. + val readSchema = new StructType().add( + "_1", + new StructType() + .add("_3", IntegerType, nullable = true) + .add("_4", LongType, nullable = true), + nullable = true) + + val data = java.util.Arrays.asList( + Row(Row(1, "a")), // non-null parent, requested fields missing in file + Row(Row(2, null)), // non-null parent, requested fields missing in file + Row(null) // null parent + ) + + withTempPath { path => + spark + .createDataFrame(data, tableSchema) + .write + .parquet(path.getCanonicalPath) + + Seq("true", "false").foreach { legacy => + withSQLConf("spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing" -> legacy) { + val df = spark.read.schema(readSchema).parquet(path.getCanonicalPath) + checkSparkAnswer(df) + } + } + } + } + /** Write a Parquet file using a raw RecordConsumer for full schema control. */ private def writeDirect( path: String, From 43d7a34010965dac317dc9f8df3c0fb0ae692b3a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 3 May 2026 11:13:18 -0600 Subject: [PATCH 2/6] style: rustfmt --- native/core/src/parquet/parquet_support.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 8ccdf0a100..21cc2f67bb 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -292,13 +292,12 @@ fn parquet_convert_struct_to_struct( // Spark 4.1+). Legacy mode marks the whole column null; the new default // preserves the file's parent-row nullness so non-null parents materialize // as a struct of all-null fields. - let nulls = if field_overlap - || !parquet_options.return_null_struct_if_all_fields_missing - { - array.nulls().cloned() - } else { - Some(NullBuffer::new_null(array.len())) - }; + let nulls = + if field_overlap || !parquet_options.return_null_struct_if_all_fields_missing { + array.nulls().cloned() + } else { + Some(NullBuffer::new_null(array.len())) + }; Ok(Arc::new(StructArray::new( to_fields.clone(), From 59a56e0c9d11ddd474db9531b0456eb12f62e9f2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 3 May 2026 15:15:43 -0600 Subject: [PATCH 3/6] fix: pin pre-existing missing-all-fields test to legacy mode + restore IgnoreComet.scala CI on Spark 4.1 surfaced two follow-ups to the issue #4136 fix: 1. The pre-existing CometExpressionSuite test `vectorized reader: missing all struct fields` expected the legacy answer `Row(null) :: Row(null) :: Row(null)` regardless of Spark version, but did not pin `LEGACY_PARQUET_RETURN_NULL_STRUCT_IF_ALL_FIELDS_MISSING`. With the fix in place Comet correctly mirrors the new Spark 4.1 default (`Row(Row(null,null)) :: Row(Row(null,null)) :: Row(null)`), so the test broke. Pin the conf to "true" to keep the test asserting legacy semantics on every Spark version. The non-legacy semantics are covered by the `issue #4136` test in CometNativeReaderSuite, which now also iterates over `COLUMN_VECTOR_OFFHEAP_ENABLED` and the nested-vectorized-reader flag to mirror the upstream Spark `vectorized reader: missing all struct fields` test in `ParquetIOSuite`. 2. The previous regeneration of `dev/diffs/4.1.1.diff` dropped the `IgnoreComet.scala` source file (it's untracked in the Spark working tree, so `git diff` skips it without an explicit `git add`). Spark SQL CI then failed with `object IgnoreCometSuite is not a member of package org.apache.spark.sql`. Re-add the file via `git add` and regenerate so the diff includes its 45-line definition again. --- dev/diffs/4.1.1.diff | 51 +++++++++++++++++++ .../apache/comet/CometExpressionSuite.scala | 8 ++- .../comet/exec/CometNativeReaderSuite.scala | 17 +++++-- 3 files changed, 70 insertions(+), 6 deletions(-) diff --git a/dev/diffs/4.1.1.diff b/dev/diffs/4.1.1.diff index 6d35aeef63..d36383f3d3 100644 --- a/dev/diffs/4.1.1.diff +++ b/dev/diffs/4.1.1.diff @@ -931,6 +931,57 @@ index 95e86fe4311..0f7ed3271d4 100644 }.flatten assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L)))) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala +new file mode 100644 +index 00000000000..5691536c114 +--- /dev/null ++++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala +@@ -0,0 +1,45 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.spark.sql ++ ++import org.scalactic.source.Position ++import org.scalatest.Tag ++ ++import org.apache.spark.sql.test.SQLTestUtils ++ ++/** ++ * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`). ++ */ ++case class IgnoreComet(reason: String) extends Tag("DisableComet") ++case class IgnoreCometNativeIcebergCompat(reason: String) extends Tag("DisableComet") ++case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet") ++case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet") ++ ++/** ++ * Helper trait that disables Comet for all tests regardless of default config values. ++ */ ++trait IgnoreCometSuite extends SQLTestUtils { ++ override protected def test(testName: String, testTags: Tag*)(testFun: => Any) ++ (implicit pos: Position): Unit = { ++ if (isCometEnabled) { ++ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) ++ } else { ++ super.test(testName, testTags: _*)(testFun) ++ } ++ } ++} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala index 53e47f428c3..a55d8f0c161 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 4fcaa550ae..92ae4b9b52 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -2998,7 +2998,13 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "false", CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_datafusion", SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true", - SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offheapEnabled.toString) { + SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offheapEnabled.toString, + // SPARK-53535 (Spark 4.1+) flipped the default to "false", which preserves the parent + // struct's nullness so non-null parents materialise as Row(Row(null, null)). This test + // asserts the legacy "all missing fields => null struct" answer, so pin the conf to + // "true" to keep the expectation valid on both 3.x/4.0 and 4.1+. The non-legacy + // behaviour is covered separately by `issue #4136` in CometNativeReaderSuite. + "spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing" -> "true") { val data = Seq(Tuple1((1, "a")), Tuple1((2, null)), Tuple1(null)) val readSchema = new StructType().add( diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala index ea8cc962dd..d924d51902 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala @@ -723,11 +723,18 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper .write .parquet(path.getCanonicalPath) - Seq("true", "false").foreach { legacy => - withSQLConf("spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing" -> legacy) { - val df = spark.read.schema(readSchema).parquet(path.getCanonicalPath) - checkSparkAnswer(df) - } + // Mirror the toggles in Spark's `vectorized reader: missing all struct fields` test in + // ParquetIOSuite, including off-heap on/off and the explicit nested-column vectorized + // reader flag. We've seen CI fail on the off-heap branch when the on-heap branch passes. + for { + offheapEnabled <- Seq("true", "false") + legacy <- Seq("true", "false") + } withSQLConf( + "spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true", + "spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing" -> legacy, + "spark.sql.columnVector.offheap.enabled" -> offheapEnabled) { + val df = spark.read.schema(readSchema).parquet(path.getCanonicalPath) + checkSparkAnswer(df) } } } From b85f6c26f3d69ac37e5f43f84475df8f1a361146 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 3 May 2026 16:47:54 -0600 Subject: [PATCH 4/6] fix: skip SPARK-54220 NullType test; upstream parquet-rs gap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Spark SQL test suite for 4.1 exercises `SPARK-54220: vectorized reader: missing all struct fields, struct with NullType only`, which writes NullType columns as `BOOLEAN + LogicalType::Unknown`. parquet-rs only accepts `INT32 + Unknown` (parquet-57.2.0/src/schema/types.rs:401) and rejects the Spark-produced file at decode time with: Parquet error: Cannot annotate Unknown from BOOLEAN for field '_1' This is an upstream Spark↔parquet-rs compatibility gap, orthogonal to the missing-all-fields fix this PR lands. Re-add the `IgnoreComet` annotation on just that one test (pointing at the new follow-up #4199) and mark the local reproducer in `CometNativeReaderSuite` as canceled with the same reference. The four SPARK-53535 tests remain un-ignored and pass. --- dev/diffs/4.1.1.diff | 16 ++++-- .../comet/exec/CometNativeReaderSuite.scala | 49 ++++++++++++++++++- 2 files changed, 61 insertions(+), 4 deletions(-) diff --git a/dev/diffs/4.1.1.diff b/dev/diffs/4.1.1.diff index d36383f3d3..6052aef79b 100644 --- a/dev/diffs/4.1.1.diff +++ b/dev/diffs/4.1.1.diff @@ -2993,7 +2993,7 @@ index 6b73cc8618d..624694916fb 100644 case _ => assert(false, "Can not match ParquetTable in the query.") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 3072657a095..3ea9b8543a9 100644 +index 3072657a095..6b5b9103363 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -40,6 +40,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} @@ -3004,7 +3004,17 @@ index 3072657a095..3ea9b8543a9 100644 import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow, UnsafeRow} import org.apache.spark.sql.catalyst.util.{DateTimeConstants, DateTimeUtils} -@@ -1282,7 +1283,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -953,7 +954,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession + } + } + +- test("SPARK-54220: vectorized reader: missing all struct fields, struct with NullType only") { ++ test("SPARK-54220: vectorized reader: missing all struct fields, struct with NullType only", ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4199")) { + val data = Seq( + Tuple1((null, null)), + Tuple1((null, null)), +@@ -1282,7 +1284,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -3014,7 +3024,7 @@ index 3072657a095..3ea9b8543a9 100644 val data = (1 to 4).map(i => Tuple1(i.toString)) val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) -@@ -1567,7 +1569,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -1567,7 +1570,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala index d924d51902..62efbfb676 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.{CometTestBase, Row} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions.{array, col} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType} +import org.apache.spark.sql.types.{IntegerType, LongType, NullType, StringType, StructType} import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.isSpark41Plus @@ -739,6 +739,53 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper } } + test("issue #4136: struct with only NullType fields in file (SPARK-54220)") { + // The upstream SPARK-54220 test writes `Tuple1((null, null))` which is inferred as a struct + // of NullType fields on disk; reading with a schema that asks for Int/String on top of + // NullType fails at parquet decode time because Spark encodes NullType as + // `BOOLEAN + LogicalType::Unknown` but parquet-rs only accepts `INT32 + Unknown`. See + // #4199 for the upstream compatibility gap. + assume( + false, + "Skipped until parquet-rs accepts BOOLEAN + Unknown for NullType " + + "(https://github.com/apache/datafusion-comet/issues/4199)") + + val tableSchema = new StructType().add( + "_1", + new StructType() + .add("_1", NullType) + .add("_2", NullType), + nullable = true) + + val readSchema = new StructType().add( + "_1", + new StructType() + .add("_3", IntegerType, nullable = true) + .add("_4", StringType, nullable = true), + nullable = true) + + val data = + java.util.Arrays.asList(Row(Row(null, null)), Row(Row(null, null)), Row(null)) + + withTempPath { path => + spark + .createDataFrame(data, tableSchema) + .write + .parquet(path.getCanonicalPath) + + for { + offheapEnabled <- Seq("true", "false") + legacy <- Seq("true", "false") + } withSQLConf( + "spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true", + "spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing" -> legacy, + "spark.sql.columnVector.offheap.enabled" -> offheapEnabled) { + val df = spark.read.schema(readSchema).parquet(path.getCanonicalPath) + checkSparkAnswer(df) + } + } + } + /** Write a Parquet file using a raw RecordConsumer for full schema control. */ private def writeDirect( path: String, From 001810ffdb34b3adbc19db6701a68a605436a05c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 4 May 2026 07:05:08 -0600 Subject: [PATCH 5/6] test: unignore user-defined struct schema test on Spark 4.1 (#4192) The `native reader - select struct field with user defined schema` test in `CometNativeReaderSuite` is the same missing-all-fields-from-file scenario that this PR already fixes: its third subcase reads `named_struct('a', 0, 'b', 'xyz')` with a read schema of `c0: struct`, where both requested leaves are missing from the file. Drop the `assume(!isSpark41Plus, "#4098")` guard so CI exercises it on 4.1; the test passes locally on both `native_datafusion` and `native_iceberg_compat` with the `parquet_convert_struct_to_struct` fix in place. Also closes #4192. --- .../scala/org/apache/comet/exec/CometNativeReaderSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala index 62efbfb676..69397272cb 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala @@ -389,7 +389,6 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper } test("native reader - select struct field with user defined schema") { - assume(!isSpark41Plus, "https://github.com/apache/datafusion-comet/issues/4098") // extract existing A column var readSchema = new StructType().add( "c0", From 15c12d4b5ca1bcc6d32001b2b30643d4fd4e09d9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 4 May 2026 19:36:00 -0600 Subject: [PATCH 6/6] refactor: flip if/else for readability per review feedback Put the special case (all-null struct) in the if-branch and the common case (preserve existing nulls) in the else-branch. --- native/core/src/parquet/parquet_support.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 21cc2f67bb..1e0c64ea4b 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -293,10 +293,10 @@ fn parquet_convert_struct_to_struct( // preserves the file's parent-row nullness so non-null parents materialize // as a struct of all-null fields. let nulls = - if field_overlap || !parquet_options.return_null_struct_if_all_fields_missing { - array.nulls().cloned() - } else { + if !field_overlap && parquet_options.return_null_struct_if_all_fields_missing { Some(NullBuffer::new_null(array.len())) + } else { + array.nulls().cloned() }; Ok(Arc::new(StructArray::new(