diff --git a/common/src/main/java/org/apache/comet/parquet/Native.java b/common/src/main/java/org/apache/comet/parquet/Native.java
index ec375402cb..fbe7f23875 100644
--- a/common/src/main/java/org/apache/comet/parquet/Native.java
+++ b/common/src/main/java/org/apache/comet/parquet/Native.java
@@ -226,6 +226,7 @@ public static native long initRecordBatchReader(
       String sessionTimezone,
       int batchSize,
       boolean caseSensitive,
+      boolean returnNullStructIfAllFieldsMissing,
       Map<String, String> objectStoreOptions,
       CometFileKeyUnwrapper keyUnwrapper,
       Object metricsNode);
diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
index 9413b9316d..a2ee4963d9 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -159,6 +159,11 @@ public URI pathUri() throws URISyntaxException {
   protected boolean isCaseSensitive;
   protected boolean useFieldId;
   protected boolean ignoreMissingIds;
+  // SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all
+  // missing in the Parquet file, true returns the entire struct as null (legacy
+  // pre-4.1 behavior); false preserves the parent struct's nullness from the file
+  // so non-null parents materialize as a struct of all-null fields.
+  protected boolean returnNullStructIfAllFieldsMissing = true;
   protected StructType partitionSchema;
   protected InternalRow partitionValues;
   protected PartitionedFile file;
@@ -278,6 +283,7 @@ private NativeBatchReader(AbstractColumnReader[] columnReaders) {
       boolean useFieldId,
       boolean ignoreMissingIds,
       boolean useLegacyDateTimestamp,
+      boolean returnNullStructIfAllFieldsMissing,
       StructType partitionSchema,
       InternalRow partitionValues,
       Map<String, SQLMetric> metrics,
@@ -290,6 +296,7 @@ private NativeBatchReader(AbstractColumnReader[] columnReaders) {
     this.useFieldId = useFieldId;
     this.ignoreMissingIds = ignoreMissingIds;
     this.useLegacyDateTimestamp = useLegacyDateTimestamp;
+    this.returnNullStructIfAllFieldsMissing = returnNullStructIfAllFieldsMissing;
     this.partitionSchema = partitionSchema;
    this.partitionValues = partitionValues;
     this.file = inputSplit;
@@ -578,6 +585,7 @@ public void init() throws Throwable {
             timeZoneId,
             batchSize,
             caseSensitive,
+            returnNullStructIfAllFieldsMissing,
             objectStoreOptions,
             keyUnwrapper,
             metricsNode);
diff --git a/dev/diffs/4.1.1.diff b/dev/diffs/4.1.1.diff
index ac56840013..6052aef79b 100644
--- a/dev/diffs/4.1.1.diff
+++ b/dev/diffs/4.1.1.diff
@@ -2993,7 +2993,7 @@ index 6b73cc8618d..624694916fb 100644
       case _ => assert(false, "Can not match ParquetTable in the query.")
     }
  diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-index 3072657a095..b2293ccab17 100644
+index 3072657a095..6b5b9103363 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -3004,57 +3004,17 @@ index 3072657a095..b2293ccab17 100644
 @@ -40,6 +40,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow, UnsafeRow}
  import org.apache.spark.sql.catalyst.util.{DateTimeConstants, DateTimeUtils}
-@@ -765,7 +766,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
-     }
-   }
- 
--  test("vectorized reader: missing all struct fields") {
-+  test("vectorized reader: missing all struct fields",
-+    IgnoreComet("https://github.com/apache/datafusion-comet/issues/4136")) {
-     for {
-       offheapEnabled <- Seq(true, false)
-       returnNullStructIfAllFieldsMissing <- Seq(true, false)
-@@ -803,7 +805,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
-     }
-   }
- 
--  test("SPARK-53535: vectorized reader: missing all struct fields, struct with complex fields") {
-+  test("SPARK-53535: vectorized reader: missing all struct fields, struct with complex fields",
-+    IgnoreComet("https://github.com/apache/datafusion-comet/issues/4136")) {
-     val data = Seq(
-       Row(Row(Seq(11, 12, null, 14), Row("21", 22), Row(true)), 100),
-       Row(Row(Seq(11, 12, null, 14), Row("21", 22), Row(false)), 100),
-@@ -858,7 +861,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
-     }
-   }
- 
--  test("SPARK-53535: vectorized reader: missing all struct fields, struct with map field only") {
-+  test("SPARK-53535: vectorized reader: missing all struct fields, struct with map field only",
-+    IgnoreComet("https://github.com/apache/datafusion-comet/issues/4136")) {
-     val data = Seq(
-       Row(Row(Map("key1" -> 1)), 100),
-       Row(Row(Map("key2" -> 2)), 100),
-@@ -903,7 +907,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
-   }
- 
-   test("SPARK-53535: vectorized reader: missing all struct fields, " +
--    "struct with cheap map and more expensive array field") {
-+    "struct with cheap map and more expensive array field",
-+    IgnoreComet("https://github.com/apache/datafusion-comet/issues/4136")) {
-     val data = Seq(
-       Row(Row(Map(false -> Row("expensive", 1)), Seq("test1")), 100),
-       Row(Row(Map(true -> Row("expensive", 2)), Seq("test2")), 100),
-@@ -953,7 +958,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
+@@ -953,7 +954,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
      }
    }
  
 -  test("SPARK-54220: vectorized reader: missing all struct fields, struct with NullType only") {
 +  test("SPARK-54220: vectorized reader: missing all struct fields, struct with NullType only",
 +    IgnoreComet("https://github.com/apache/datafusion-comet/issues/4199")) {
      val data = Seq(
        Tuple1((null, null)),
        Tuple1((null, null)),
-@@ -1282,7 +1288,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
+@@ -1282,7 +1284,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
      }
    }
@@ -3064,7 +3024,7 @@ index 3072657a095..b2293ccab17 100644
    val data = (1 to 4).map(i => Tuple1(i.toString))
    val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType)))
-@@ -1567,7 +1574,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
+@@ -1567,7 +1570,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
      }
    }
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index f3df4b522c..13eb73eb50 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -1248,6 +1248,7 @@ impl PhysicalPlanner {
                 default_values,
                 common.session_timezone.as_str(),
                 common.case_sensitive,
+                common.return_null_struct_if_all_fields_missing,
                 self.session_ctx(),
                 common.encryption_enabled,
             )?;
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index 61ff4fc0db..3d61251447 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -438,6 +438,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
     session_timezone: JString,
     batch_size: jint,
     case_sensitive: jboolean,
+    return_null_struct_if_all_fields_missing: jboolean,
     object_store_options: JObject,
     key_unwrapper_obj: JObject,
     metrics_node: JObject,
@@ -511,6 +512,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
         None,
         session_timezone.as_str(),
         case_sensitive != JNI_FALSE,
+        return_null_struct_if_all_fields_missing != JNI_FALSE,
         session_ctx,
         encryption_enabled,
     )?;
diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs
index ef4c878b9a..e67700e629 100644
--- a/native/core/src/parquet/parquet_exec.rs
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -71,12 +71,14 @@ pub(crate) fn init_datasource_exec(
     default_values: Option<HashMap<usize, ScalarValue>>,
     session_timezone: &str,
     case_sensitive: bool,
+    return_null_struct_if_all_fields_missing: bool,
     session_ctx: &Arc<SessionContext>,
     encryption_enabled: bool,
 ) -> Result<Arc<DataSourceExec>, ExecutionError> {
     let (table_parquet_options, spark_parquet_options) = get_options(
         session_timezone,
         case_sensitive,
+        return_null_struct_if_all_fields_missing,
         &object_store_url,
         encryption_enabled,
     );
@@ -185,6 +187,7 @@ pub(crate) fn init_datasource_exec(
 fn get_options(
     session_timezone: &str,
     case_sensitive: bool,
+    return_null_struct_if_all_fields_missing: bool,
     object_store_url: &ObjectStoreUrl,
     encryption_enabled: bool,
 ) -> (TableParquetOptions, SparkParquetOptions) {
@@ -196,6 +199,8 @@ fn get_options(
         SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
     spark_parquet_options.allow_cast_unsigned_ints = true;
     spark_parquet_options.case_sensitive = case_sensitive;
+    spark_parquet_options.return_null_struct_if_all_fields_missing =
+        return_null_struct_if_all_fields_missing;
 
     if encryption_enabled {
         table_parquet_options.crypto.configure_factory(
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 3418a17c43..1e0c64ea4b 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -79,6 +79,11 @@ pub struct SparkParquetOptions {
     pub use_legacy_date_timestamp_or_ntz: bool,
     // Whether schema field names are case sensitive
     pub case_sensitive: bool,
+    /// SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all
+    /// missing in the Parquet file, true returns the entire struct as null (pre-4.1
+    /// legacy behavior); false preserves the parent struct's nullness from the file
+    /// so non-null parents return a struct of all-null fields.
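+    ///
+    /// A hypothetical illustration (rows invented for this doc comment, not taken
+    /// from Spark): reading file rows `{s: {a: 1}}, {s: null}` against the requested
+    /// schema `s STRUCT<b: INT>` yields `s = null, null` when this flag is true, but
+    /// `s = {b: null}, null` when it is false, because the second mode keeps the
+    /// file's parent-level validity buffer.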
+    pub return_null_struct_if_all_fields_missing: bool,
 }
 
 impl SparkParquetOptions {
@@ -91,6 +96,7 @@ impl SparkParquetOptions {
             use_decimal_128: false,
             use_legacy_date_timestamp_or_ntz: false,
             case_sensitive: false,
+            return_null_struct_if_all_fields_missing: true,
         }
     }
 
@@ -103,6 +109,7 @@ impl SparkParquetOptions {
             use_decimal_128: false,
             use_legacy_date_timestamp_or_ntz: false,
             case_sensitive: false,
+            return_null_struct_if_all_fields_missing: true,
         }
     }
 }
@@ -279,13 +286,18 @@ fn parquet_convert_struct_to_struct(
         }
     }
 
-    // If target schema doesn't contain any of the existing fields
-    // mark such a column in array as NULL
-    let nulls = if field_overlap {
-        array.nulls().cloned()
-    } else {
-        Some(NullBuffer::new_null(array.len()))
-    };
+    // When the file's struct contains none of the requested fields, the
+    // returned validity buffer depends on Spark's
+    // `spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing` (SPARK-53535,
+    // Spark 4.1+). Legacy mode marks the whole column null; the new default
+    // preserves the file's parent-row nullness so non-null parents materialize
+    // as a struct of all-null fields.
+    let nulls =
+        if !field_overlap && parquet_options.return_null_struct_if_all_fields_missing {
+            Some(NullBuffer::new_null(array.len()))
+        } else {
+            array.nulls().cloned()
+        };
 
     Ok(Arc::new(StructArray::new(
         to_fields.clone(),
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index fb438b26a4..4eadb0042f 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -107,6 +107,11 @@ message NativeScanCommon {
   bool encryption_enabled = 11;
   string source = 12;
   repeated spark.spark_expression.DataType fields = 13;
+  // SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all
+  // missing in the Parquet file, true returns the entire struct as null (legacy
+  // pre-4.1 behavior); false preserves the parent struct's nullness from the file
+  // so non-null parents return a struct of all-null fields.
+  bool return_null_struct_if_all_fields_missing = 14;
 }
 
 message NativeScan {
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
index c2728fcabf..0d178dbf81 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.types.{DateType, StructType, TimestampType}
 import org.apache.spark.util.SerializableConfiguration
 
 import org.apache.comet.CometConf
+import org.apache.comet.CometSparkSessionExtensions.isSpark41Plus
 import org.apache.comet.MetricsSupport
 import org.apache.comet.shims.ShimSQLConf
 import org.apache.comet.vector.CometVector
@@ -96,6 +97,15 @@ class CometParquetFileFormat(session: SparkSession)
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val useFieldId = CometParquetUtils.readFieldId(sqlConf)
     val ignoreMissingIds = CometParquetUtils.ignoreMissingIds(sqlConf)
+    // SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all
+    // missing in the Parquet file, the new default preserves the parent struct's
+    // nullness from the file. Pre-4.1 Spark hardcodes the legacy behavior, so we
+    // default to "true" there for backwards compatibility.
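+    // As a concrete sketch (rows abridged from the CometExpressionSuite test that
+    // pins this conf): scanning Seq(Tuple1((1, "a")), Tuple1(null)) with a read
+    // schema requesting only struct fields absent from the file yields
+    // Row(null), Row(null) under "true" but Row(Row(null, null)), Row(null)
+    // under "false".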
+    val returnNullStructIfAllFieldsMissing = sqlConf
+      .getConfString(
+        "spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing",
+        if (isSpark41Plus) "false" else "true")
+      .toBoolean
     val pushDownDate = sqlConf.parquetFilterPushDownDate
     val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
     val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
@@ -158,6 +168,7 @@ class CometParquetFileFormat(session: SparkSession)
           useFieldId,
           ignoreMissingIds,
           datetimeRebaseSpec.mode == CORRECTED,
+          returnNullStructIfAllFieldsMissing,
           partitionSchema,
           file.partitionValues,
           metrics.asJava,
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
index 70f06f5741..066b770bbb 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.internal.SQLConf
 
 import org.apache.comet.{CometConf, ConfigEntry}
 import org.apache.comet.CometConf.COMET_EXEC_ENABLED
-import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isSpark35Plus, withInfo}
+import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isSpark35Plus, isSpark41Plus, withInfo}
 import org.apache.comet.objectstore.NativeConfig
 import org.apache.comet.parquet.CometParquetUtils
 import org.apache.comet.serde.{CometOperatorSerde, Compatible, OperatorOuterClass, SupportLevel}
@@ -189,6 +189,17 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
     commonBuilder.setSessionTimezone(scan.conf.getConfString("spark.sql.session.timeZone"))
     commonBuilder.setCaseSensitive(scan.conf.getConf[Boolean](SQLConf.CASE_SENSITIVE))
 
+    // SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all
+    // missing in the Parquet file, the new default preserves the parent struct's
+    // nullness from the file (so non-null parents materialize as a struct of all-null
+    // fields). Pre-4.1 Spark hardcodes the legacy behavior (whole struct null), which
+    // matches the Comet default we use as fallback.
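+    // The value serialized here is the counterpart of the one CometParquetFileFormat
+    // resolves for the JNI reader path: it lands in
+    // NativeScanCommon.return_null_struct_if_all_fields_missing (operator.proto) and is
+    // threaded through SparkParquetOptions into the validity-buffer choice in
+    // parquet_convert_struct_to_struct.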
+    val returnNullStructConfKey =
+      "spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing"
+    val returnNullStructDefault = if (isSpark41Plus) "false" else "true"
+    commonBuilder.setReturnNullStructIfAllFieldsMissing(
+      scan.conf.getConfString(returnNullStructConfKey, returnNullStructDefault).toBoolean)
+
     // Collect S3/cloud storage configurations
     val hadoopConf = scan.relation.sparkSession.sessionState
       .newHadoopConfWithOptions(scan.relation.options)
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 4fcaa550ae..92ae4b9b52 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2998,7 +2998,13 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
         CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "false",
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_datafusion",
         SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true",
-        SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offheapEnabled.toString) {
+        SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offheapEnabled.toString,
+        // SPARK-53535 (Spark 4.1+) flipped the default to "false", which preserves the parent
+        // struct's nullness so non-null parents materialize as Row(Row(null, null)). This test
+        // asserts the legacy "all missing fields => null struct" answer, so pin the conf to
+        // "true" to keep the expectation valid on both 3.x/4.0 and 4.1+. The non-legacy
+        // behavior is covered separately by the issue #4136 tests in CometNativeReaderSuite.
+        "spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing" -> "true") {
 
       val data = Seq(Tuple1((1, "a")), Tuple1((2, null)), Tuple1(null))
       val readSchema = new StructType().add(
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
index b49cec0f5a..69397272cb 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.{CometTestBase, Row}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.functions.{array, col}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+import org.apache.spark.sql.types.{IntegerType, LongType, NullType, StringType, StructType}
 
 import org.apache.comet.CometConf
 import org.apache.comet.CometSparkSessionExtensions.isSpark41Plus
@@ -389,7 +389,6 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
   }
 
   test("native reader - select struct field with user defined schema") {
-    assume(!isSpark41Plus, "https://github.com/apache/datafusion-comet/issues/4098")
     // extract existing A column
     var readSchema = new StructType().add(
       "c0",
@@ -685,6 +684,107 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
     }
   }
 
+  test("issue #4136: struct with all requested fields missing in file") {
+    // SPARK-53535 (Spark 4.1) added LEGACY_PARQUET_RETURN_NULL_STRUCT_IF_ALL_FIELDS_MISSING.
+    // With the new default (false), Spark's vectorized reader appends a "marker" leaf field to
+    // the Parquet read schema so it can distinguish a null parent row from a non-null parent
+    // whose requested fields are all missing from the file, then truncates the marker out of
+    // the ColumnarBatch. Comet's native scans don't implement this, so they conflate the two
+    // cases and return Row(null) for non-null parents.
+    assume(
+      isSpark41Plus,
+      "LEGACY_PARQUET_RETURN_NULL_STRUCT_IF_ALL_FIELDS_MISSING was introduced in Spark 4.1")
+
+    val tableSchema = new StructType().add(
+      "_1",
+      new StructType()
+        .add("_1", IntegerType)
+        .add("_2", StringType),
+      nullable = true)
+
+    // Read schema requests _3, _4 — fields that don't exist in the file's _1 struct.
+    val readSchema = new StructType().add(
+      "_1",
+      new StructType()
+        .add("_3", IntegerType, nullable = true)
+        .add("_4", LongType, nullable = true),
+      nullable = true)
+
+    val data = java.util.Arrays.asList(
+      Row(Row(1, "a")), // non-null parent, requested fields missing in file
+      Row(Row(2, null)), // non-null parent, requested fields missing in file
+      Row(null) // null parent
+    )
+
+    withTempPath { path =>
+      spark
+        .createDataFrame(data, tableSchema)
+        .write
+        .parquet(path.getCanonicalPath)
+
+      // Mirror the toggles in Spark's `vectorized reader: missing all struct fields` test in
+      // ParquetIOSuite, including off-heap on/off and the explicit nested-column vectorized
+      // reader flag. We've seen CI fail on the off-heap branch when the on-heap branch passes.
+      for {
+        offheapEnabled <- Seq("true", "false")
+        legacy <- Seq("true", "false")
+      } withSQLConf(
+        "spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true",
+        "spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing" -> legacy,
+        "spark.sql.columnVector.offheap.enabled" -> offheapEnabled) {
+        val df = spark.read.schema(readSchema).parquet(path.getCanonicalPath)
+        checkSparkAnswer(df)
+      }
+    }
+  }
+
+  test("issue #4136: struct with only NullType fields in file (SPARK-54220)") {
+    // The upstream SPARK-54220 test writes `Tuple1((null, null))`, which is inferred as a struct
+    // of NullType fields on disk; reading with a schema that asks for Int/String on top of
+    // NullType fails at Parquet decode time because Spark encodes NullType as
+    // `BOOLEAN + LogicalType::Unknown` but parquet-rs only accepts `INT32 + Unknown`. See
+    // #4199 for the upstream compatibility gap.
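+    // Roughly, as a schema sketch (not verbatim writer output), the file ends up as
+    //   optional group _1 { optional boolean _1 (UNKNOWN); optional boolean _2 (UNKNOWN); }
+    // and parquet-rs' schema validation only pairs the Unknown logical type with INT32,
+    // so the read fails at decode time.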
+    assume(
+      false,
+      "Skipped until parquet-rs accepts BOOLEAN + Unknown for NullType " +
+        "(https://github.com/apache/datafusion-comet/issues/4199)")
+
+    val tableSchema = new StructType().add(
+      "_1",
+      new StructType()
+        .add("_1", NullType)
+        .add("_2", NullType),
+      nullable = true)
+
+    val readSchema = new StructType().add(
+      "_1",
+      new StructType()
+        .add("_3", IntegerType, nullable = true)
+        .add("_4", StringType, nullable = true),
+      nullable = true)
+
+    val data =
+      java.util.Arrays.asList(Row(Row(null, null)), Row(Row(null, null)), Row(null))
+
+    withTempPath { path =>
+      spark
+        .createDataFrame(data, tableSchema)
+        .write
+        .parquet(path.getCanonicalPath)
+
+      for {
+        offheapEnabled <- Seq("true", "false")
+        legacy <- Seq("true", "false")
+      } withSQLConf(
+        "spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true",
+        "spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing" -> legacy,
+        "spark.sql.columnVector.offheap.enabled" -> offheapEnabled) {
+        val df = spark.read.schema(readSchema).parquet(path.getCanonicalPath)
+        checkSparkAnswer(df)
+      }
+    }
+  }
+
   /** Write a Parquet file using a raw RecordConsumer for full schema control. */
   private def writeDirect(
       path: String,