From d474ba37571233db0aaf317291a5e4e6eab12b01 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 25 Apr 2026 11:33:06 -0600 Subject: [PATCH 01/17] test: add ParquetSchemaMismatchSuite skeleton for issue #3720 --- .../parquet/ParquetSchemaMismatchSuite.scala | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala new file mode 100644 index 0000000000..190f231d54 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.parquet + +import scala.util.Try + +import org.apache.hadoop.fs.Path +import org.apache.parquet.example.data.simple.SimpleGroup +import org.apache.parquet.schema.MessageTypeParser +import org.apache.spark.SparkException +import org.apache.spark.sql.{CometTestBase, DataFrame} +import org.apache.spark.sql.internal.SQLConf + +import org.apache.comet.CometConf + +/** + * Documents Comet's behavior for the Parquet read-schema/file-schema mismatch cases tracked in + * https://github.com/apache/datafusion-comet/issues/3720. + * + * Each test exercises one case under one of the two Comet scan implementations + * (`native_datafusion`, `native_iceberg_compat`). Assertions encode Comet's actual current + * behavior. Spark's reference behavior is recorded in the per-case comments and in the matrix + * below; assertions do not run Spark in isolation. + * + * Behavior matrix (Spark reference behavior; Comet behavior is asserted by each test). "OK" = + * read succeeds. "throw" = SparkException at runtime. + * + * Case 3.4 3.5 4.0 + * 1. BINARY -> TIMESTAMP throw throw throw 2. INT32 -> INT64 throw throw OK (widening) 3. INT96 + * LTZ -> TIMESTAMP_NTZ throw throw throw 4. Decimal(10,2) -> Decimal(5,0) throw throw throw + * 5. INT32 -> INT64 with rowgroup filter throw throw OK 6. STRING -> INT throw throw throw + * 7. TIMESTAMP_NTZ -> ARRAY<...> throw throw throw C1. INT8 -> INT32 OK OK OK C2. FLOAT -> + * DOUBLE OK OK OK + * + * If a Comet fix lands that aligns one of these cases with Spark, update the affected test(s) and + * this matrix in the same PR. + */ +class ParquetSchemaMismatchSuite extends CometTestBase { + import testImplicits._ + + /** + * Force a specific Comet scan implementation, force V1 datasource (both native_datafusion and + * native_iceberg_compat are V1-only), then run the given block in a fresh temp directory. The + * block writes Parquet under `path`, then reads it back with a mismatched schema. 
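+   *
+   * Illustrative usage (a sketch; the real mismatch cases land in follow-up commits):
+   * {{{
+   * withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path =>
+   *   Seq(1, 2, 3).toDF("c").write.parquet(path)
+   *   spark.read.schema("c bigint").parquet(path).collect()
+   * }
+   * }}}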
+ */ + private def withMismatchedSchema(scanImpl: String)(body: String => Unit): Unit = { + withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl, + SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + withTempPath { dir => + body(dir.getCanonicalPath) + } + } + } + + /** Both scan implementations under test, used as a `foreach` driver. */ + private val scanImpls: Seq[String] = + Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT) +} From bee01e8fc026584d1fdd5f0f95309d0cb1985325 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 25 Apr 2026 11:46:18 -0600 Subject: [PATCH 02/17] test: case 1 binary read as timestamp Both native_datafusion and native_iceberg_compat throw SparkException (matching Spark's reference behavior). The withMismatchedSchema helper was redesigned to accept a separate check lambda so collect() executes while the temp directory is still present. --- .../parquet/ParquetSchemaMismatchSuite.scala | 68 +++++++++++++++---- 1 file changed, 54 insertions(+), 14 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala index 190f231d54..651f6513a3 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala @@ -39,33 +39,40 @@ import org.apache.comet.CometConf * behavior. Spark's reference behavior is recorded in the per-case comments and in the matrix * below; assertions do not run Spark in isolation. * - * Behavior matrix (Spark reference behavior; Comet behavior is asserted by each test). "OK" = - * read succeeds. "throw" = SparkException at runtime. - * - * Case 3.4 3.5 4.0 - * 1. BINARY -> TIMESTAMP throw throw throw 2. INT32 -> INT64 throw throw OK (widening) 3. INT96 - * LTZ -> TIMESTAMP_NTZ throw throw throw 4. Decimal(10,2) -> Decimal(5,0) throw throw throw - * 5. INT32 -> INT64 with rowgroup filter throw throw OK 6. STRING -> INT throw throw throw - * 7. TIMESTAMP_NTZ -> ARRAY<...> throw throw throw C1. INT8 -> INT32 OK OK OK C2. FLOAT -> - * DOUBLE OK OK OK - * * If a Comet fix lands that aligns one of these cases with Spark, update the affected test(s) and - * this matrix in the same PR. + * the matrix below in the same PR. */ +// Behavior matrix (Spark reference behavior; Comet behavior is asserted by each +// test). "OK" = read succeeds. "throw" = SparkException at runtime. +// +// Case 3.4 3.5 4.0 +// 1. BINARY -> TIMESTAMP throw throw throw +// 2. INT32 -> INT64 throw throw OK (widening) +// 3. INT96 LTZ -> TIMESTAMP_NTZ throw throw throw +// 4. Decimal(10,2) -> Decimal(5,0) throw throw throw +// 5. INT32 -> INT64 with rowgroup filter throw throw OK +// 6. STRING -> INT throw throw throw +// 7. TIMESTAMP_NTZ -> ARRAY<...> throw throw throw +// C1. INT8 -> INT32 OK OK OK +// C2. FLOAT -> DOUBLE OK OK OK class ParquetSchemaMismatchSuite extends CometTestBase { import testImplicits._ /** * Force a specific Comet scan implementation, force V1 datasource (both native_datafusion and * native_iceberg_compat are V1-only), then run the given block in a fresh temp directory. The - * block writes Parquet under `path`, then reads it back with a mismatched schema. + * block writes Parquet under `path`, builds a DataFrame with a mismatched schema, and runs + * assertions inside `check`. 
The temp directory (and its files) is present for the entire + * duration of `body`, so `collect()` and other actions may be called safely inside `check`. */ - private def withMismatchedSchema(scanImpl: String)(body: String => Unit): Unit = { + private def withMismatchedSchema(scanImpl: String)(body: String => DataFrame)( + check: DataFrame => Unit): Unit = { withSQLConf( CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl, SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { withTempPath { dir => - body(dir.getCanonicalPath) + val df = body(dir.getCanonicalPath) + check(df) } } } @@ -73,4 +80,37 @@ class ParquetSchemaMismatchSuite extends CometTestBase { /** Both scan implementations under test, used as a `foreach` driver. */ private val scanImpls: Seq[String] = Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT) + + // Case 1: BINARY read as TIMESTAMP. Spark throws SparkException on all + // versions. Both Comet scan implementations also throw: native_datafusion + // raises CometNativeException (column type mismatch); native_iceberg_compat + // raises SparkException (SchemaColumnConvertNotSupportedException). Both + // surface to the caller as SparkException. + scanImpls.foreach { scanImpl => + test(s"binary read as timestamp: $scanImpl") { + withMismatchedSchema(scanImpl) { path => + val schemaStr = + """message root { + | optional binary _1; + |} + """.stripMargin + val schema = MessageTypeParser.parseMessageType(schemaStr) + val writer = createParquetWriter(schema, new Path(path, "part-r-0.parquet")) + (0 until 10).foreach { i => + val record = new SimpleGroup(schema) + record.add(0, s"value-$i") + writer.write(record) + } + writer.close() + spark.read.schema("_1 timestamp").parquet(path) + } { df => + // Pattern 3 (throw): both scan implementations throw SparkException at + // collect time; the error message differs but the exception type is the + // same. Behavior matches Spark's reference behavior on all versions. + intercept[SparkException] { + df.collect() + } + } + } + } } From e19a5d227c344c283bc76af33fc8dca825c43174 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 25 Apr 2026 11:49:53 -0600 Subject: [PATCH 03/17] test: case 2 int32 read as int64 --- .../parquet/ParquetSchemaMismatchSuite.scala | 48 +++++++++++++++---- 1 file changed, 38 insertions(+), 10 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala index 651f6513a3..b604c27f83 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala @@ -45,16 +45,16 @@ import org.apache.comet.CometConf // Behavior matrix (Spark reference behavior; Comet behavior is asserted by each // test). "OK" = read succeeds. "throw" = SparkException at runtime. // -// Case 3.4 3.5 4.0 -// 1. BINARY -> TIMESTAMP throw throw throw -// 2. INT32 -> INT64 throw throw OK (widening) -// 3. INT96 LTZ -> TIMESTAMP_NTZ throw throw throw -// 4. Decimal(10,2) -> Decimal(5,0) throw throw throw -// 5. INT32 -> INT64 with rowgroup filter throw throw OK -// 6. STRING -> INT throw throw throw -// 7. TIMESTAMP_NTZ -> ARRAY<...> throw throw throw -// C1. INT8 -> INT32 OK OK OK -// C2. FLOAT -> DOUBLE OK OK OK +// Case Spark 3.4 3.5 4.0 Comet native_datafusion Comet native_iceberg_compat +// 1. BINARY -> TIMESTAMP throw throw throw throw throw +// 2. 
INT32 -> INT64 throw throw OK OK (widened values) throw +// 3. INT96 LTZ -> TIMESTAMP_NTZ throw throw throw ? ? +// 4. Decimal(10,2) -> Decimal(5,0) throw throw throw ? ? +// 5. INT32 -> INT64 w/ rowgroup filter throw throw OK ? ? +// 6. STRING -> INT throw throw throw ? ? +// 7. TIMESTAMP_NTZ -> ARRAY<...> throw throw throw ? ? +// C1. INT8 -> INT32 OK OK OK ? ? +// C2. FLOAT -> DOUBLE OK OK OK ? ? class ParquetSchemaMismatchSuite extends CometTestBase { import testImplicits._ @@ -113,4 +113,32 @@ class ParquetSchemaMismatchSuite extends CometTestBase { } } } + + // Case 2: INT32 read as INT64 (value-preserving widening). Spark 3.4/3.5 + // throw SparkException; Spark 4.0 allows widening. + // native_datafusion: succeeds with widened values (Pattern 1). + // native_iceberg_compat: throws SparkException (SchemaColumnConvertNotSupportedException + // from TypeUtil.checkParquetType); does not support INT32->INT64 widening. + test(s"int32 read as int64: ${CometConf.SCAN_NATIVE_DATAFUSION}") { + withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path => + Seq(1, 2, 3).toDF("c").write.parquet(path) + spark.read.schema("c bigint").parquet(path) + } { df => + // Pattern 1 (value-preserving widening). + checkAnswer(df, Seq(1L, 2L, 3L).map(org.apache.spark.sql.Row(_))) + } + } + + test(s"int32 read as int64: ${CometConf.SCAN_NATIVE_ICEBERG_COMPAT}") { + withMismatchedSchema(CometConf.SCAN_NATIVE_ICEBERG_COMPAT) { path => + Seq(1, 2, 3).toDF("c").write.parquet(path) + spark.read.schema("c bigint").parquet(path) + } { df => + // Pattern 3 (throw): native_iceberg_compat rejects INT32->INT64 widening + // via TypeUtil.checkParquetType (SchemaColumnConvertNotSupportedException). + intercept[SparkException] { + df.collect() + } + } + } } From 96390ac03fcbdd9e28fa587a5832da514ca6a3f6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 25 Apr 2026 11:53:34 -0600 Subject: [PATCH 04/17] test: case 3 timestamp_ltz read as timestamp_ntz --- .../parquet/ParquetSchemaMismatchSuite.scala | 45 ++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala index b604c27f83..547c11f60e 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala @@ -48,7 +48,7 @@ import org.apache.comet.CometConf // Case Spark 3.4 3.5 4.0 Comet native_datafusion Comet native_iceberg_compat // 1. BINARY -> TIMESTAMP throw throw throw throw throw // 2. INT32 -> INT64 throw throw OK OK (widened values) throw -// 3. INT96 LTZ -> TIMESTAMP_NTZ throw throw throw ? ? +// 3. INT96 LTZ -> TIMESTAMP_NTZ throw throw throw OK (silent, possible wall-clock diff) throw // 4. Decimal(10,2) -> Decimal(5,0) throw throw throw ? ? // 5. INT32 -> INT64 w/ rowgroup filter throw throw OK ? ? // 6. STRING -> INT throw throw throw ? ? @@ -141,4 +141,47 @@ class ParquetSchemaMismatchSuite extends CometTestBase { } } } + + // Case 3: INT96 TimestampLTZ read as TimestampNTZ. Spark throws on all + // versions (SPARK-36182). INT96 carries no timezone info in the Parquet + // schema, so native_datafusion cannot detect the LTZ -> NTZ mismatch and + // silently reads (possibly with a wrong wall-clock value). + // native_iceberg_compat throws via TypeUtil.convertErrorForTimestampNTZ + // (mirrors Spark's behavior). 
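+  // For reference, the INT96 column carries no logical-type annotation in the
+  // file footer, e.g. `message spark_schema { optional int96 ts; }`, so the
+  // LTZ/NTZ distinction exists only on the Spark side.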
+ test(s"int96 timestamp_ltz read as timestamp_ntz: ${CometConf.SCAN_NATIVE_DATAFUSION}") { + withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path => + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") { + Seq(java.sql.Timestamp.valueOf("2020-01-01 00:00:00")) + .toDF("ts") + .write + .parquet(path) + } + spark.read.schema("ts timestamp_ntz").parquet(path) + } { df => + // native_datafusion succeeds silently: INT96 carries no timezone info so + // the LTZ -> NTZ mismatch is undetectable; result may have a wrong + // wall-clock value depending on the executor timezone. + val outcome = Try(df.collect()) + assert(outcome.isSuccess, s"unexpected failure: $outcome") + assert(outcome.get.length == 1) + } + } + + test(s"int96 timestamp_ltz read as timestamp_ntz: ${CometConf.SCAN_NATIVE_ICEBERG_COMPAT}") { + withMismatchedSchema(CometConf.SCAN_NATIVE_ICEBERG_COMPAT) { path => + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") { + Seq(java.sql.Timestamp.valueOf("2020-01-01 00:00:00")) + .toDF("ts") + .write + .parquet(path) + } + spark.read.schema("ts timestamp_ntz").parquet(path) + } { df => + // native_iceberg_compat throws SparkException via + // TypeUtil.convertErrorForTimestampNTZ; matches Spark's behavior. + intercept[SparkException] { + df.collect() + } + } + } } From 1aa00df9b31a6f057b3f86d2b4be03d4df08c5ca Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 25 Apr 2026 11:56:28 -0600 Subject: [PATCH 05/17] test: case 4 incompatible decimal precision/scale --- .../parquet/ParquetSchemaMismatchSuite.scala | 38 ++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala index 547c11f60e..e68e3ccd77 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala @@ -49,7 +49,7 @@ import org.apache.comet.CometConf // 1. BINARY -> TIMESTAMP throw throw throw throw throw // 2. INT32 -> INT64 throw throw OK OK (widened values) throw // 3. INT96 LTZ -> TIMESTAMP_NTZ throw throw throw OK (silent, possible wall-clock diff) throw -// 4. Decimal(10,2) -> Decimal(5,0) throw throw throw ? ? +// 4. Decimal(10,2) -> Decimal(5,0) throw throw throw OK (reads, values unverified) throw // 5. INT32 -> INT64 w/ rowgroup filter throw throw OK ? ? // 6. STRING -> INT throw throw throw ? ? // 7. TIMESTAMP_NTZ -> ARRAY<...> throw throw throw ? ? @@ -184,4 +184,40 @@ class ParquetSchemaMismatchSuite extends CometTestBase { } } } + + // Case 4: Decimal(10,2) read as Decimal(5,0). Reading from a higher-precision + // decimal as a lower-precision decimal can lose data (123.45 cannot fit in + // decimal(5,0)). Spark throws on all versions (SPARK-34212). + test(s"decimal(10,2) read as decimal(5,0): ${CometConf.SCAN_NATIVE_DATAFUSION}") { + withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path => + Seq(BigDecimal("123.45"), BigDecimal("67.89")) + .toDF("d") + .selectExpr("cast(d as decimal(10,2)) as d") + .write + .parquet(path) + spark.read.schema("d decimal(5,0)").parquet(path) + } { df => + // Pattern 3 (structural mismatch). Capture observed outcome. 
+ val outcome = Try(df.collect()) + assert(outcome.isSuccess, s"unexpected failure: $outcome") + } + } + + test(s"decimal(10,2) read as decimal(5,0): ${CometConf.SCAN_NATIVE_ICEBERG_COMPAT}") { + withMismatchedSchema(CometConf.SCAN_NATIVE_ICEBERG_COMPAT) { path => + Seq(BigDecimal("123.45"), BigDecimal("67.89")) + .toDF("d") + .selectExpr("cast(d as decimal(10,2)) as d") + .write + .parquet(path) + spark.read.schema("d decimal(5,0)").parquet(path) + } { df => + // native_iceberg_compat throws SparkException via + // SchemaColumnConvertNotSupportedException (INT64 cannot convert to decimal(5,0)); + // matches Spark's reference behavior. + intercept[SparkException] { + df.collect() + } + } + } } From 1d139ee4d9e7e503536307e3b952ff9f5fe5ca2c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 25 Apr 2026 11:58:45 -0600 Subject: [PATCH 06/17] test: case 5 int32 as int64 with row group filter --- .../parquet/ParquetSchemaMismatchSuite.scala | 35 ++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala index e68e3ccd77..8f974c151d 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala @@ -50,7 +50,7 @@ import org.apache.comet.CometConf // 2. INT32 -> INT64 throw throw OK OK (widened values) throw // 3. INT96 LTZ -> TIMESTAMP_NTZ throw throw throw OK (silent, possible wall-clock diff) throw // 4. Decimal(10,2) -> Decimal(5,0) throw throw throw OK (reads, values unverified) throw -// 5. INT32 -> INT64 w/ rowgroup filter throw throw OK ? ? +// 5. INT32 -> INT64 w/ rowgroup filter throw throw OK OK (1 row, no overflow) throw // 6. STRING -> INT throw throw throw ? ? // 7. TIMESTAMP_NTZ -> ARRAY<...> throw throw throw ? ? // C1. INT8 -> INT32 OK OK OK ? ? @@ -220,4 +220,37 @@ class ParquetSchemaMismatchSuite extends CometTestBase { } } } + + // Case 5: regression guard for row group skipping. Write INT32 values near + // INT32 max, read as INT64 with a filter whose constant exceeds INT32 max. + // If the scan treats the filter as INT32, row-group skipping might overflow + // and skip rows that should match. + test(s"int32 read as int64 with row group filter: ${CometConf.SCAN_NATIVE_DATAFUSION}") { + withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path => + Seq(Int.MaxValue - 2, Int.MaxValue - 1, Int.MaxValue).toDF("c").write.parquet(path) + spark.read + .schema("c bigint") + .parquet(path) + .filter(s"c > ${Int.MaxValue.toLong - 1L}") + } { df => + // Pattern 1: filter must not overflow when widened. + checkAnswer(df, Seq(Int.MaxValue.toLong).map(org.apache.spark.sql.Row(_))) + } + } + + test(s"int32 read as int64 with row group filter: ${CometConf.SCAN_NATIVE_ICEBERG_COMPAT}") { + withMismatchedSchema(CometConf.SCAN_NATIVE_ICEBERG_COMPAT) { path => + Seq(Int.MaxValue - 2, Int.MaxValue - 1, Int.MaxValue).toDF("c").write.parquet(path) + spark.read + .schema("c bigint") + .parquet(path) + .filter(s"c > ${Int.MaxValue.toLong - 1L}") + } { df => + // native_iceberg_compat rejects INT32->INT64 widening (Case 2). The filter + // never runs. 
+ intercept[SparkException] { + df.collect() + } + } + } } From 69b1457d84180be2200ae046693992a5c9cfa865 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 25 Apr 2026 12:01:45 -0600 Subject: [PATCH 07/17] test: case 6 string read as int --- .../parquet/ParquetSchemaMismatchSuite.scala | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala index 8f974c151d..c00c93c53c 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala @@ -51,7 +51,7 @@ import org.apache.comet.CometConf // 3. INT96 LTZ -> TIMESTAMP_NTZ throw throw throw OK (silent, possible wall-clock diff) throw // 4. Decimal(10,2) -> Decimal(5,0) throw throw throw OK (reads, values unverified) throw // 5. INT32 -> INT64 w/ rowgroup filter throw throw OK OK (1 row, no overflow) throw -// 6. STRING -> INT throw throw throw ? ? +// 6. STRING -> INT throw throw throw OK (garbage values) throw // 7. TIMESTAMP_NTZ -> ARRAY<...> throw throw throw ? ? // C1. INT8 -> INT32 OK OK OK ? ? // C2. FLOAT -> DOUBLE OK OK OK ? ? @@ -253,4 +253,36 @@ class ParquetSchemaMismatchSuite extends CometTestBase { } } } + + // Case 6: STRING column read as INT. Spark's vectorized reader throws on all + // versions because BINARY (string) cannot be converted to INT32 at the + // physical Parquet level. + // native_datafusion: silently succeeds; reinterprets the BINARY bytes of each + // string as raw INT32 bytes (garbage values). Does NOT throw. + // native_iceberg_compat: throws SparkException (aligns with Spark). + test(s"string read as int: ${CometConf.SCAN_NATIVE_DATAFUSION}") { + withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path => + Seq("a", "b", "c").toDF("c").write.parquet(path) + spark.read.schema("c int").parquet(path) + } { df => + // Pattern 2 (silent garbage): native_datafusion reinterprets string BINARY + // bytes as INT32 without throwing. Values are meaningless but the read + // succeeds with the expected row count. 
+      val outcome = Try(df.collect())
+      assert(outcome.isSuccess, s"unexpected failure: $outcome")
+      assert(outcome.get.length == 3)
+    }
+  }
+
+  test(s"string read as int: ${CometConf.SCAN_NATIVE_ICEBERG_COMPAT}") {
+    withMismatchedSchema(CometConf.SCAN_NATIVE_ICEBERG_COMPAT) { path =>
+      Seq("a", "b", "c").toDF("c").write.parquet(path)
+      spark.read.schema("c int").parquet(path)
+    } { df =>
+      val outcome = Try(df.collect())
+      assert(
+        outcome.isFailure && outcome.failed.get.isInstanceOf[SparkException],
+        s"expected SparkException, got: $outcome")
+    }
+  }
 }

From b012e99a6a9df51e661907a84ed63107e37ac661 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Apr 2026 12:17:09 -0600
Subject: [PATCH 08/17] test: case 7 timestamp_ntz read as array

---
 .../parquet/ParquetSchemaMismatchSuite.scala | 33 +++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
index c00c93c53c..1108fa8775 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
@@ -285,4 +285,37 @@ class ParquetSchemaMismatchSuite extends CometTestBase {
         s"expected SparkException, got: $outcome")
     }
   }
+
+  // Case 7: TIMESTAMP_NTZ column read as ARRAY<TIMESTAMP_NTZ>. Spark throws on all
+  // versions (SPARK-45604) because the requested type is a list/group but the
+  // physical Parquet column is a scalar.
+  test(s"timestamp_ntz read as array: ${CometConf.SCAN_NATIVE_DATAFUSION}") {
+    withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path =>
+      Seq(java.time.LocalDateTime.parse("2020-01-01T00:00:00"))
+        .toDF("ts")
+        .write
+        .parquet(path)
+      spark.read.schema("ts array<timestamp_ntz>").parquet(path)
+    } { df =>
+      val outcome = Try(df.collect())
+      assert(
+        outcome.isFailure && outcome.failed.get.isInstanceOf[SparkException],
+        s"expected SparkException, got: $outcome")
+    }
+  }
+
+  test(s"timestamp_ntz read as array: ${CometConf.SCAN_NATIVE_ICEBERG_COMPAT}") {
+    withMismatchedSchema(CometConf.SCAN_NATIVE_ICEBERG_COMPAT) { path =>
+      Seq(java.time.LocalDateTime.parse("2020-01-01T00:00:00"))
+        .toDF("ts")
+        .write
+        .parquet(path)
+      spark.read.schema("ts array<timestamp_ntz>").parquet(path)
+    } { df =>
+      val outcome = Try(df.collect())
+      assert(
+        outcome.isFailure && outcome.failed.get.isInstanceOf[SparkException],
+        s"expected SparkException, got: $outcome")
+    }
+  }
 }

From 318cac7f8c7a3471101b37bc96b8389b985cb38b Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Apr 2026 12:17:27 -0600
Subject: [PATCH 09/17] test: update matrix row 7 with confirmed throw outcomes

---
 .../org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
index 1108fa8775..e4fee5ea1c 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
@@ -52,7 +52,7 @@ import org.apache.comet.CometConf
 // 4. Decimal(10,2) -> Decimal(5,0) throw throw throw OK (reads, values unverified) throw
 // 5. INT32 -> INT64 w/ rowgroup filter throw throw OK OK (1 row, no overflow) throw
 // 6. STRING -> INT throw throw throw OK (garbage values) throw
-// 7. 
TIMESTAMP_NTZ -> ARRAY<...> throw throw throw ? ? +// 7. TIMESTAMP_NTZ -> ARRAY<...> throw throw throw throw throw // C1. INT8 -> INT32 OK OK OK ? ? // C2. FLOAT -> DOUBLE OK OK OK ? ? class ParquetSchemaMismatchSuite extends CometTestBase { From 60a0ffa3106a4714f915821671ed39709b3c573c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 25 Apr 2026 12:20:19 -0600 Subject: [PATCH 10/17] test: control case int8 read as int32 --- .../parquet/ParquetSchemaMismatchSuite.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala index e4fee5ea1c..50d985abc0 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala @@ -53,7 +53,7 @@ import org.apache.comet.CometConf // 5. INT32 -> INT64 w/ rowgroup filter throw throw OK OK (1 row, no overflow) throw // 6. STRING -> INT throw throw throw OK (garbage values) throw // 7. TIMESTAMP_NTZ -> ARRAY<...> throw throw throw throw throw -// C1. INT8 -> INT32 OK OK OK ? ? +// C1. INT8 -> INT32 OK OK OK OK (widened values) OK (widened values) // C2. FLOAT -> DOUBLE OK OK OK ? ? class ParquetSchemaMismatchSuite extends CometTestBase { import testImplicits._ @@ -318,4 +318,17 @@ class ParquetSchemaMismatchSuite extends CometTestBase { s"expected SparkException, got: $outcome") } } + + // Control C1: INT8 -> INT32 widening. Allowed by Spark on all versions and + // expected to succeed in both Comet scan impls. + scanImpls.foreach { scanImpl => + test(s"int8 read as int32 (control): $scanImpl") { + withMismatchedSchema(scanImpl) { path => + Seq(1.toByte, 2.toByte, 3.toByte).toDF("c").write.parquet(path) + spark.read.schema("c int").parquet(path) + } { df => + checkAnswer(df, Seq(1, 2, 3).map(org.apache.spark.sql.Row(_))) + } + } + } } From eabfdef4d628f45974b2dd74ca9885e442b7cdb5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 25 Apr 2026 12:23:07 -0600 Subject: [PATCH 11/17] test: control case float read as double --- .../parquet/ParquetSchemaMismatchSuite.scala | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala index 50d985abc0..2f369fb3f5 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala @@ -54,7 +54,7 @@ import org.apache.comet.CometConf // 6. STRING -> INT throw throw throw OK (garbage values) throw // 7. TIMESTAMP_NTZ -> ARRAY<...> throw throw throw throw throw // C1. INT8 -> INT32 OK OK OK OK (widened values) OK (widened values) -// C2. FLOAT -> DOUBLE OK OK OK ? ? +// C2. FLOAT -> DOUBLE OK OK OK OK (widened values) throw (diverges from Spark) class ParquetSchemaMismatchSuite extends CometTestBase { import testImplicits._ @@ -331,4 +331,33 @@ class ParquetSchemaMismatchSuite extends CometTestBase { } } } + + // Control C2: FLOAT -> DOUBLE widening. Allowed by Spark on all versions. + // native_datafusion: succeeds with widened values (Pattern 1). + // native_iceberg_compat: throws SparkException via TypeUtil.checkParquetType + // (SchemaColumnConvertNotSupportedException); does not support FLOAT->DOUBLE + // widening. 
This is a divergence from Spark's reference behavior. + test(s"float read as double (control): ${CometConf.SCAN_NATIVE_DATAFUSION}") { + withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path => + Seq(1.0f, 2.0f, 3.0f).toDF("c").write.parquet(path) + spark.read.schema("c double").parquet(path) + } { df => + // Float -> Double is exact for these magnitudes. + checkAnswer(df, Seq(1.0d, 2.0d, 3.0d).map(org.apache.spark.sql.Row(_))) + } + } + + test(s"float read as double (control): ${CometConf.SCAN_NATIVE_ICEBERG_COMPAT}") { + withMismatchedSchema(CometConf.SCAN_NATIVE_ICEBERG_COMPAT) { path => + Seq(1.0f, 2.0f, 3.0f).toDF("c").write.parquet(path) + spark.read.schema("c double").parquet(path) + } { df => + // native_iceberg_compat rejects FLOAT->DOUBLE widening via + // TypeUtil.checkParquetType (SchemaColumnConvertNotSupportedException). + // This diverges from Spark which allows this widening on all versions. + intercept[SparkException] { + df.collect() + } + } + } } From cd8e68be4138d0105b6d036e8261fd0655f7945d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 25 Apr 2026 12:42:14 -0600 Subject: [PATCH 12/17] test: handle Spark 4.0 type widening in iceberg-compat assertions On Spark 4.0, COMET_SCHEMA_EVOLUTION_ENABLED defaults to true and TypeUtil.checkParquetType has an isSpark40Plus guard, so four native_iceberg_compat tests that previously expected SparkException now succeed with widened values. Make each assertion version-conditional using CometSparkSessionExtensions.isSpark40Plus and update the behavior matrix accordingly. --- .../parquet/ParquetSchemaMismatchSuite.scala | 83 +++++++++++++------ 1 file changed, 56 insertions(+), 27 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala index 2f369fb3f5..70038936ab 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.{CometTestBase, DataFrame} import org.apache.spark.sql.internal.SQLConf import org.apache.comet.CometConf +import org.apache.comet.CometSparkSessionExtensions /** * Documents Comet's behavior for the Parquet read-schema/file-schema mismatch cases tracked in @@ -47,14 +48,14 @@ import org.apache.comet.CometConf // // Case Spark 3.4 3.5 4.0 Comet native_datafusion Comet native_iceberg_compat // 1. BINARY -> TIMESTAMP throw throw throw throw throw -// 2. INT32 -> INT64 throw throw OK OK (widened values) throw -// 3. INT96 LTZ -> TIMESTAMP_NTZ throw throw throw OK (silent, possible wall-clock diff) throw +// 2. INT32 -> INT64 throw throw OK OK (widened values) throw on 3.x / OK on 4.0 (COMET_SCHEMA_EVOLUTION_ENABLED defaults true) +// 3. INT96 LTZ -> TIMESTAMP_NTZ throw throw throw OK (silent, possible wall-clock diff) throw on 3.x / OK on 4.0 (isSpark40Plus guard in TypeUtil) // 4. Decimal(10,2) -> Decimal(5,0) throw throw throw OK (reads, values unverified) throw -// 5. INT32 -> INT64 w/ rowgroup filter throw throw OK OK (1 row, no overflow) throw +// 5. INT32 -> INT64 w/ rowgroup filter throw throw OK OK (1 row, no overflow) throw on 3.x / OK on 4.0 (COMET_SCHEMA_EVOLUTION_ENABLED defaults true) // 6. STRING -> INT throw throw throw OK (garbage values) throw // 7. TIMESTAMP_NTZ -> ARRAY<...> throw throw throw throw throw // C1. INT8 -> INT32 OK OK OK OK (widened values) OK (widened values) -// C2. 
FLOAT -> DOUBLE OK OK OK OK (widened values) throw (diverges from Spark) +// C2. FLOAT -> DOUBLE OK OK OK OK (widened values) throw on 3.x / OK on 4.0 (COMET_SCHEMA_EVOLUTION_ENABLED defaults true) class ParquetSchemaMismatchSuite extends CometTestBase { import testImplicits._ @@ -117,8 +118,9 @@ class ParquetSchemaMismatchSuite extends CometTestBase { // Case 2: INT32 read as INT64 (value-preserving widening). Spark 3.4/3.5 // throw SparkException; Spark 4.0 allows widening. // native_datafusion: succeeds with widened values (Pattern 1). - // native_iceberg_compat: throws SparkException (SchemaColumnConvertNotSupportedException - // from TypeUtil.checkParquetType); does not support INT32->INT64 widening. + // native_iceberg_compat: throws SparkException on Spark 3.x (SchemaColumnConvertNotSupportedException + // from TypeUtil.checkParquetType); on Spark 4.0 COMET_SCHEMA_EVOLUTION_ENABLED defaults to true + // so the widening is allowed and succeeds with widened values. test(s"int32 read as int64: ${CometConf.SCAN_NATIVE_DATAFUSION}") { withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path => Seq(1, 2, 3).toDF("c").write.parquet(path) @@ -134,10 +136,16 @@ class ParquetSchemaMismatchSuite extends CometTestBase { Seq(1, 2, 3).toDF("c").write.parquet(path) spark.read.schema("c bigint").parquet(path) } { df => - // Pattern 3 (throw): native_iceberg_compat rejects INT32->INT64 widening - // via TypeUtil.checkParquetType (SchemaColumnConvertNotSupportedException). - intercept[SparkException] { - df.collect() + // On Spark 3.x: native_iceberg_compat rejects INT32->INT64 widening via + // TypeUtil.checkParquetType (SchemaColumnConvertNotSupportedException). + // On Spark 4.0: COMET_SCHEMA_EVOLUTION_ENABLED defaults to true so widening + // is allowed and succeeds with correctly widened values. + if (CometSparkSessionExtensions.isSpark40Plus) { + checkAnswer(df, Seq(1L, 2L, 3L).map(org.apache.spark.sql.Row(_))) + } else { + intercept[SparkException] { + df.collect() + } } } } @@ -146,8 +154,9 @@ class ParquetSchemaMismatchSuite extends CometTestBase { // versions (SPARK-36182). INT96 carries no timezone info in the Parquet // schema, so native_datafusion cannot detect the LTZ -> NTZ mismatch and // silently reads (possibly with a wrong wall-clock value). - // native_iceberg_compat throws via TypeUtil.convertErrorForTimestampNTZ - // (mirrors Spark's behavior). + // native_iceberg_compat throws via TypeUtil.convertErrorForTimestampNTZ on + // Spark 3.x (mirrors Spark's behavior). On Spark 4.0, TypeUtil.checkParquetType + // has an isSpark40Plus guard that bypasses the INT96 check, so the read succeeds. test(s"int96 timestamp_ltz read as timestamp_ntz: ${CometConf.SCAN_NATIVE_DATAFUSION}") { withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path => withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") { @@ -177,10 +186,17 @@ class ParquetSchemaMismatchSuite extends CometTestBase { } spark.read.schema("ts timestamp_ntz").parquet(path) } { df => - // native_iceberg_compat throws SparkException via + // On Spark 3.x: native_iceberg_compat throws SparkException via // TypeUtil.convertErrorForTimestampNTZ; matches Spark's behavior. - intercept[SparkException] { - df.collect() + // On Spark 4.0: isSpark40Plus guard in TypeUtil.checkParquetType bypasses + // the INT96 check so the read succeeds silently (row count verified only; + // the wall-clock value may differ due to LTZ->NTZ reinterpretation). 
+ if (CometSparkSessionExtensions.isSpark40Plus) { + assert(df.collect().length == 1) + } else { + intercept[SparkException] { + df.collect() + } } } } @@ -246,10 +262,16 @@ class ParquetSchemaMismatchSuite extends CometTestBase { .parquet(path) .filter(s"c > ${Int.MaxValue.toLong - 1L}") } { df => - // native_iceberg_compat rejects INT32->INT64 widening (Case 2). The filter - // never runs. - intercept[SparkException] { - df.collect() + // On Spark 3.x: native_iceberg_compat rejects INT32->INT64 widening (Case 2) + // so the filter never runs and a SparkException is thrown. + // On Spark 4.0: COMET_SCHEMA_EVOLUTION_ENABLED defaults to true so widening + // is allowed; the filter runs correctly without overflow and returns 1 row. + if (CometSparkSessionExtensions.isSpark40Plus) { + checkAnswer(df, Seq(Int.MaxValue.toLong).map(org.apache.spark.sql.Row(_))) + } else { + intercept[SparkException] { + df.collect() + } } } } @@ -334,9 +356,10 @@ class ParquetSchemaMismatchSuite extends CometTestBase { // Control C2: FLOAT -> DOUBLE widening. Allowed by Spark on all versions. // native_datafusion: succeeds with widened values (Pattern 1). - // native_iceberg_compat: throws SparkException via TypeUtil.checkParquetType - // (SchemaColumnConvertNotSupportedException); does not support FLOAT->DOUBLE - // widening. This is a divergence from Spark's reference behavior. + // native_iceberg_compat on Spark 3.x: throws SparkException via TypeUtil.checkParquetType + // (SchemaColumnConvertNotSupportedException); diverges from Spark's reference behavior. + // native_iceberg_compat on Spark 4.0: COMET_SCHEMA_EVOLUTION_ENABLED defaults to true + // so the widening is allowed and succeeds with widened values (matches Spark). test(s"float read as double (control): ${CometConf.SCAN_NATIVE_DATAFUSION}") { withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path => Seq(1.0f, 2.0f, 3.0f).toDF("c").write.parquet(path) @@ -352,11 +375,17 @@ class ParquetSchemaMismatchSuite extends CometTestBase { Seq(1.0f, 2.0f, 3.0f).toDF("c").write.parquet(path) spark.read.schema("c double").parquet(path) } { df => - // native_iceberg_compat rejects FLOAT->DOUBLE widening via - // TypeUtil.checkParquetType (SchemaColumnConvertNotSupportedException). - // This diverges from Spark which allows this widening on all versions. - intercept[SparkException] { - df.collect() + // On Spark 3.x: native_iceberg_compat rejects FLOAT->DOUBLE widening via + // TypeUtil.checkParquetType (SchemaColumnConvertNotSupportedException); + // diverges from Spark which allows this widening on all versions. + // On Spark 4.0: COMET_SCHEMA_EVOLUTION_ENABLED defaults to true so widening + // is allowed and succeeds with correctly widened values (matches Spark). + if (CometSparkSessionExtensions.isSpark40Plus) { + checkAnswer(df, Seq(1.0d, 2.0d, 3.0d).map(org.apache.spark.sql.Row(_))) + } else { + intercept[SparkException] { + df.collect() + } } } } From dbdcca6f2c95668a571597db86cd1cc30024a614 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 28 Apr 2026 08:23:08 -0600 Subject: [PATCH 13/17] test: align native_datafusion schema-mismatch assertions with #4090, #4091 Cases 4 (Decimal(10,2)->Decimal(5,0)) and 6 (STRING->INT) now throw SparkException on native_datafusion after the schema adapter rejection fixes landed on main. Update assertions and the behavior matrix. 
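
Both updated native_datafusion assertions collapse to the same shape (sketch
of the new check bodies):

    intercept[SparkException] {
      df.collect()
    }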
--- .../parquet/ParquetSchemaMismatchSuite.scala | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala index 70038936ab..9263e951dc 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala @@ -50,9 +50,9 @@ import org.apache.comet.CometSparkSessionExtensions // 1. BINARY -> TIMESTAMP throw throw throw throw throw // 2. INT32 -> INT64 throw throw OK OK (widened values) throw on 3.x / OK on 4.0 (COMET_SCHEMA_EVOLUTION_ENABLED defaults true) // 3. INT96 LTZ -> TIMESTAMP_NTZ throw throw throw OK (silent, possible wall-clock diff) throw on 3.x / OK on 4.0 (isSpark40Plus guard in TypeUtil) -// 4. Decimal(10,2) -> Decimal(5,0) throw throw throw OK (reads, values unverified) throw +// 4. Decimal(10,2) -> Decimal(5,0) throw throw throw throw throw // 5. INT32 -> INT64 w/ rowgroup filter throw throw OK OK (1 row, no overflow) throw on 3.x / OK on 4.0 (COMET_SCHEMA_EVOLUTION_ENABLED defaults true) -// 6. STRING -> INT throw throw throw OK (garbage values) throw +// 6. STRING -> INT throw throw throw throw throw // 7. TIMESTAMP_NTZ -> ARRAY<...> throw throw throw throw throw // C1. INT8 -> INT32 OK OK OK OK (widened values) OK (widened values) // C2. FLOAT -> DOUBLE OK OK OK OK (widened values) throw on 3.x / OK on 4.0 (COMET_SCHEMA_EVOLUTION_ENABLED defaults true) @@ -204,6 +204,8 @@ class ParquetSchemaMismatchSuite extends CometTestBase { // Case 4: Decimal(10,2) read as Decimal(5,0). Reading from a higher-precision // decimal as a lower-precision decimal can lose data (123.45 cannot fit in // decimal(5,0)). Spark throws on all versions (SPARK-34212). + // native_datafusion rejects this lossy scale-narrowing in the schema adapter + // (issue #4089) and throws CometNativeException, surfacing as SparkException. test(s"decimal(10,2) read as decimal(5,0): ${CometConf.SCAN_NATIVE_DATAFUSION}") { withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path => Seq(BigDecimal("123.45"), BigDecimal("67.89")) @@ -213,9 +215,9 @@ class ParquetSchemaMismatchSuite extends CometTestBase { .parquet(path) spark.read.schema("d decimal(5,0)").parquet(path) } { df => - // Pattern 3 (structural mismatch). Capture observed outcome. - val outcome = Try(df.collect()) - assert(outcome.isSuccess, s"unexpected failure: $outcome") + intercept[SparkException] { + df.collect() + } } } @@ -279,20 +281,17 @@ class ParquetSchemaMismatchSuite extends CometTestBase { // Case 6: STRING column read as INT. Spark's vectorized reader throws on all // versions because BINARY (string) cannot be converted to INT32 at the // physical Parquet level. - // native_datafusion: silently succeeds; reinterprets the BINARY bytes of each - // string as raw INT32 bytes (garbage values). Does NOT throw. + // native_datafusion rejects string/binary read as numeric in the schema adapter + // (PR #4091) and throws CometNativeException, surfacing as SparkException. // native_iceberg_compat: throws SparkException (aligns with Spark). 
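+  // (File schema written by Spark, for reference: `optional binary c (STRING);`;
+  // the schema adapter refuses to map BINARY/UTF8 onto the requested INT32.)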
test(s"string read as int: ${CometConf.SCAN_NATIVE_DATAFUSION}") { withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path => Seq("a", "b", "c").toDF("c").write.parquet(path) spark.read.schema("c int").parquet(path) } { df => - // Pattern 2 (silent garbage): native_datafusion reinterprets string BINARY - // bytes as INT32 without throwing. Values are meaningless but the read - // succeeds with the expected row count. - val outcome = Try(df.collect()) - assert(outcome.isSuccess, s"unexpected failure: $outcome") - assert(outcome.get.length == 3) + intercept[SparkException] { + df.collect() + } } } From 6c5cd1b8a58c376fbf988d7106c9e3ea91901414 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 29 Apr 2026 10:05:41 -0600 Subject: [PATCH 14/17] ci: add ParquetSchemaMismatchSuite to Linux and macOS workflows Co-Authored-By: Claude Sonnet 4.6 --- .github/workflows/pr_build_linux.yml | 1 + .github/workflows/pr_build_macos.yml | 1 + 2 files changed, 2 insertions(+) diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index a8d925b1c2..7a11d06b15 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -323,6 +323,7 @@ jobs: org.apache.comet.parquet.ParquetReadV1Suite org.apache.comet.parquet.ParquetReadV2Suite org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite + org.apache.comet.parquet.ParquetSchemaMismatchSuite org.apache.spark.sql.comet.ParquetDatetimeRebaseV1Suite org.apache.spark.sql.comet.ParquetDatetimeRebaseV2Suite org.apache.spark.sql.comet.ParquetEncryptionITCase diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index d41bef47fe..8ac58c59a5 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -170,6 +170,7 @@ jobs: org.apache.comet.parquet.ParquetReadV1Suite org.apache.comet.parquet.ParquetReadV2Suite org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite + org.apache.comet.parquet.ParquetSchemaMismatchSuite org.apache.spark.sql.comet.ParquetDatetimeRebaseV1Suite org.apache.spark.sql.comet.ParquetDatetimeRebaseV2Suite org.apache.spark.sql.comet.ParquetEncryptionITCase From 972158d72a375f3f5ec35a062f8af744b5cb59ae Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 29 Apr 2026 10:09:05 -0600 Subject: [PATCH 15/17] docs: simplify ParquetSchemaMismatchSuite class-level documentation Replace verbose class-level scaladoc and behavior matrix with a concise description. Per-test comments documenting Comet vs Spark divergence are retained. Co-Authored-By: Claude Opus 4.6 --- .../parquet/ParquetSchemaMismatchSuite.scala | 30 +------------------ 1 file changed, 1 insertion(+), 29 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala index 9263e951dc..863764e090 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala @@ -32,40 +32,12 @@ import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions /** - * Documents Comet's behavior for the Parquet read-schema/file-schema mismatch cases tracked in + * Tests for Parquet read-schema/file-schema mismatch cases tracked in * https://github.com/apache/datafusion-comet/issues/3720. 
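+ * Each test pins the current behavior of one mismatch case under one of the two native scan
+ * implementations; Spark's reference behavior is noted in the per-test comments.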
- * - * Each test exercises one case under one of the two Comet scan implementations - * (`native_datafusion`, `native_iceberg_compat`). Assertions encode Comet's actual current - * behavior. Spark's reference behavior is recorded in the per-case comments and in the matrix - * below; assertions do not run Spark in isolation. - * - * If a Comet fix lands that aligns one of these cases with Spark, update the affected test(s) and - * the matrix below in the same PR. */ -// Behavior matrix (Spark reference behavior; Comet behavior is asserted by each -// test). "OK" = read succeeds. "throw" = SparkException at runtime. -// -// Case Spark 3.4 3.5 4.0 Comet native_datafusion Comet native_iceberg_compat -// 1. BINARY -> TIMESTAMP throw throw throw throw throw -// 2. INT32 -> INT64 throw throw OK OK (widened values) throw on 3.x / OK on 4.0 (COMET_SCHEMA_EVOLUTION_ENABLED defaults true) -// 3. INT96 LTZ -> TIMESTAMP_NTZ throw throw throw OK (silent, possible wall-clock diff) throw on 3.x / OK on 4.0 (isSpark40Plus guard in TypeUtil) -// 4. Decimal(10,2) -> Decimal(5,0) throw throw throw throw throw -// 5. INT32 -> INT64 w/ rowgroup filter throw throw OK OK (1 row, no overflow) throw on 3.x / OK on 4.0 (COMET_SCHEMA_EVOLUTION_ENABLED defaults true) -// 6. STRING -> INT throw throw throw throw throw -// 7. TIMESTAMP_NTZ -> ARRAY<...> throw throw throw throw throw -// C1. INT8 -> INT32 OK OK OK OK (widened values) OK (widened values) -// C2. FLOAT -> DOUBLE OK OK OK OK (widened values) throw on 3.x / OK on 4.0 (COMET_SCHEMA_EVOLUTION_ENABLED defaults true) class ParquetSchemaMismatchSuite extends CometTestBase { import testImplicits._ - /** - * Force a specific Comet scan implementation, force V1 datasource (both native_datafusion and - * native_iceberg_compat are V1-only), then run the given block in a fresh temp directory. The - * block writes Parquet under `path`, builds a DataFrame with a mismatched schema, and runs - * assertions inside `check`. The temp directory (and its files) is present for the entire - * duration of `body`, so `collect()` and other actions may be called safely inside `check`. - */ private def withMismatchedSchema(scanImpl: String)(body: String => DataFrame)( check: DataFrame => Unit): Unit = { withSQLConf( From 751af1eaad932a971093a123e7c4cf0d5b7a85c4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 29 Apr 2026 10:20:00 -0600 Subject: [PATCH 16/17] fix: fall back to Spark for TimestampType/TimestampNTZType schema mismatch (#3720) The native_datafusion scan silently read INT96 TimestampLTZ columns as TimestampNTZ, potentially returning incorrect wall-clock values. Add a check in CometNativeScan.isSupported that detects TimestampType <-> TimestampNTZType mismatches between the file and read schemas and falls back to Spark, which throws the appropriate error. 
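
A minimal repro, adapted from the suite's case 3 (sketch; `path` is a temp
directory):

    withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") {
      Seq(java.sql.Timestamp.valueOf("2020-01-01 00:00:00")).toDF("ts").write.parquet(path)
    }
    spark.read.schema("ts timestamp_ntz").parquet(path).collect()

Previously this returned one row silently; it now falls back to Spark, which
throws SparkException.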
Co-Authored-By: Claude Opus 4.6 --- .../user-guide/latest/compatibility/scans.md | 3 +++ .../serde/operator/CometNativeScan.scala | 22 +++++++++++++++++++ .../parquet/ParquetSchemaMismatchSuite.scala | 17 +++++++------- 3 files changed, 33 insertions(+), 9 deletions(-) diff --git a/docs/source/user-guide/latest/compatibility/scans.md b/docs/source/user-guide/latest/compatibility/scans.md index 27ed20c19e..2560436d7e 100644 --- a/docs/source/user-guide/latest/compatibility/scans.md +++ b/docs/source/user-guide/latest/compatibility/scans.md @@ -71,6 +71,9 @@ requires `spark.comet.exec.enabled=true` because the scan node must be wrapped b - Duplicate field names in case-insensitive mode (e.g., a Parquet file with both `B` and `b` columns) are detected at read time and raise a `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`, matching Spark's behavior. +- When reading a Parquet file where the file schema has `TimestampType` (LTZ) but the read schema requests + `TimestampNTZType` (or vice versa), the `native_datafusion` scan falls back to Spark. + See [issue #3720](https://github.com/apache/datafusion-comet/issues/3720) for more details. ## `native_iceberg_compat` Limitations diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index da7f24183b..7cc4b44dca 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefa import org.apache.spark.sql.comet.{CometNativeExec, CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution.{FileSourceScanExec, InSubqueryExec, SubqueryAdaptiveBroadcastExec} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{TimestampNTZType, TimestampType} import org.apache.comet.{CometConf, ConfigEntry} import org.apache.comet.CometConf.COMET_EXEC_ENABLED @@ -78,6 +79,27 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { withInfo(scanExec, "Full native scan disabled because ignoreMissingFiles enabled") } + // INT96 timestamps lose their LTZ/NTZ distinction once DataFusion reads them as + // Arrow Timestamp(Microsecond, None). Detect TimestampType <-> TimestampNTZType + // mismatches between the file schema and the read schema and fall back to Spark, + // which will throw the appropriate error (see issue #3720). 
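+    // For example: dataSchema may record `ts: timestamp` for a column stored as
+    // INT96 while the required schema asks for `ts: timestamp_ntz`; reading that
+    // natively would silently reinterpret the value, so fall back instead.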
+ val dataSchema = scanExec.relation.dataSchema + scanExec.requiredSchema.fields.foreach { reqField => + if (dataSchema.fieldNames.contains(reqField.name)) { + val dataType = dataSchema(reqField.name).dataType + (dataType, reqField.dataType) match { + case (_: TimestampType, _: TimestampNTZType) | + (_: TimestampNTZType, _: TimestampType) => + withInfo( + scanExec, + s"Native DataFusion scan does not support reading column '${reqField.name}' " + + s"as ${reqField.dataType.simpleString} when the file schema type is " + + s"${dataType.simpleString}") + case _ => + } + } + } + // the scan is supported if no fallback reasons were added to the node !hasExplainInfo(scanExec) } diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala index 863764e090..cbad72da07 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala @@ -123,9 +123,9 @@ class ParquetSchemaMismatchSuite extends CometTestBase { } // Case 3: INT96 TimestampLTZ read as TimestampNTZ. Spark throws on all - // versions (SPARK-36182). INT96 carries no timezone info in the Parquet - // schema, so native_datafusion cannot detect the LTZ -> NTZ mismatch and - // silently reads (possibly with a wrong wall-clock value). + // versions (SPARK-36182). Comet's native_datafusion scan detects the + // TimestampType/TimestampNTZType mismatch between the file and read schemas + // at plan time and falls back to Spark, which throws SparkException. // native_iceberg_compat throws via TypeUtil.convertErrorForTimestampNTZ on // Spark 3.x (mirrors Spark's behavior). On Spark 4.0, TypeUtil.checkParquetType // has an isSpark40Plus guard that bypasses the INT96 check, so the read succeeds. @@ -139,12 +139,11 @@ class ParquetSchemaMismatchSuite extends CometTestBase { } spark.read.schema("ts timestamp_ntz").parquet(path) } { df => - // native_datafusion succeeds silently: INT96 carries no timezone info so - // the LTZ -> NTZ mismatch is undetectable; result may have a wrong - // wall-clock value depending on the executor timezone. - val outcome = Try(df.collect()) - assert(outcome.isSuccess, s"unexpected failure: $outcome") - assert(outcome.get.length == 1) + // native_datafusion falls back to Spark due to TimestampType/TimestampNTZType + // mismatch detection; Spark throws SparkException on all versions. + intercept[SparkException] { + df.collect() + } } } From 2c6bd00a919ba04e50406d356f631a7e9afa43c2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 29 Apr 2026 10:35:01 -0600 Subject: [PATCH 17/17] fix: add timestampNTZSafetyCheck to fall back to Spark for TimestampNTZ scans (#3720) The previous approach tried to detect TimestampType/TimestampNTZType mismatches between dataSchema and requiredSchema on the JVM side, but when users provide an explicit read schema, both schemas are identical (the Parquet file's actual physical type is not reflected). Additionally, INT96 timestamps are coerced to Timestamp(Microsecond, None) by DataFusion, making them indistinguishable from TimestampNTZ at the Rust schema adapter level. Instead, add a safety check (spark.comet.scan.timestampNTZSafetyCheck, default true) that falls back to Spark for any native_datafusion scan with TimestampNTZ columns. This follows the same pattern as the existing unsignedSmallIntSafetyCheck for ShortType. 
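
For example (sketch), the check can be disabled per session with:

    spark.conf.set("spark.comet.scan.timestampNTZSafetyCheck", "false")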
Users whose data does not contain INT96 timestamps can set this to false. Co-Authored-By: Claude Opus 4.6 --- .../scala/org/apache/comet/CometConf.scala | 14 ++++++++++++ .../user-guide/latest/compatibility/scans.md | 9 +++++--- .../apache/comet/rules/CometScanRule.scala | 9 ++++++++ .../serde/operator/CometNativeScan.scala | 22 ------------------- .../parquet/ParquetSchemaMismatchSuite.scala | 10 ++++----- 5 files changed, 34 insertions(+), 30 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 53d67ec7f6..61d422ca50 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -785,6 +785,20 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(true) + val COMET_PARQUET_TIMESTAMP_NTZ_CHECK: ConfigEntry[Boolean] = + conf("spark.comet.scan.timestampNTZSafetyCheck") + .category(CATEGORY_SCAN) + .doc( + "Parquet files may contain INT96 timestamps (TimestampType/LTZ) which the " + + "native_datafusion scan cannot distinguish from TimestampNTZType after Parquet " + + "schema coercion. When this config is true (default), the native_datafusion scan " + + "falls back to Spark for TimestampNTZ columns to avoid silently returning incorrect " + + "timestamp values. Set to false to allow native execution if you know your Parquet " + + "files do not contain INT96 timestamps being read as TimestampNTZ. See " + + s"https://github.com/apache/datafusion-comet/issues/3720 for details. $COMPAT_GUIDE.") + .booleanConf + .createWithDefault(true) + val COMET_EXEC_STRICT_FLOATING_POINT: ConfigEntry[Boolean] = conf("spark.comet.exec.strictFloatingPoint") .category(CATEGORY_EXEC) diff --git a/docs/source/user-guide/latest/compatibility/scans.md b/docs/source/user-guide/latest/compatibility/scans.md index 2560436d7e..9621480729 100644 --- a/docs/source/user-guide/latest/compatibility/scans.md +++ b/docs/source/user-guide/latest/compatibility/scans.md @@ -71,9 +71,12 @@ requires `spark.comet.exec.enabled=true` because the scan node must be wrapped b - Duplicate field names in case-insensitive mode (e.g., a Parquet file with both `B` and `b` columns) are detected at read time and raise a `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`, matching Spark's behavior. -- When reading a Parquet file where the file schema has `TimestampType` (LTZ) but the read schema requests - `TimestampNTZType` (or vice versa), the `native_datafusion` scan falls back to Spark. - See [issue #3720](https://github.com/apache/datafusion-comet/issues/3720) for more details. +- `TimestampNTZType` columns, by default. Parquet files may contain INT96 timestamps (`TimestampType`/LTZ) + which the `native_datafusion` scan cannot distinguish from `TimestampNTZType` after Parquet schema coercion, + potentially returning incorrect timestamp values. When `spark.comet.scan.timestampNTZSafetyCheck=true` + (default), the scan falls back to Spark for `TimestampNTZ` columns. Set to `false` if your Parquet files + do not contain INT96 timestamps being read as `TimestampNTZ`. See + [issue #3720](https://github.com/apache/datafusion-comet/issues/3720) for more details. 
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index e6c58121b8..4439fa6bef 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -709,6 +709,15 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
           "native execution if your data does not contain unsigned small integers. " +
           CometConf.COMPAT_GUIDE
         false
+      case _: TimestampNTZType
+          if scanImpl == CometConf.SCAN_NATIVE_DATAFUSION &&
+            CometConf.COMET_PARQUET_TIMESTAMP_NTZ_CHECK.get() =>
+        fallbackReasons +=
+          s"$scanImpl scan may read INT96 timestamps as TimestampNTZ incorrectly. " +
+            s"Set ${CometConf.COMET_PARQUET_TIMESTAMP_NTZ_CHECK.key}=false to allow " +
+            "native execution if your Parquet files do not contain INT96 timestamps " +
+            s"being read as TimestampNTZ. ${CometConf.COMPAT_GUIDE}"
+        false
       case dt if isStringCollationType(dt) =>
         // we don't need specific support for collation in scans, but this
         // is a convenient place to force the whole query to fall back to Spark for now
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
index 7cc4b44dca..da7f24183b 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefa
 import org.apache.spark.sql.comet.{CometNativeExec, CometNativeScanExec, CometScanExec}
 import org.apache.spark.sql.execution.{FileSourceScanExec, InSubqueryExec, SubqueryAdaptiveBroadcastExec}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{TimestampNTZType, TimestampType}
 
 import org.apache.comet.{CometConf, ConfigEntry}
 import org.apache.comet.CometConf.COMET_EXEC_ENABLED
@@ -79,27 +78,6 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
       withInfo(scanExec, "Full native scan disabled because ignoreMissingFiles enabled")
     }
 
-    // INT96 timestamps lose their LTZ/NTZ distinction once DataFusion reads them as
-    // Arrow Timestamp(Microsecond, None). Detect TimestampType <-> TimestampNTZType
-    // mismatches between the file schema and the read schema and fall back to Spark,
-    // which will throw the appropriate error (see issue #3720).
- val dataSchema = scanExec.relation.dataSchema - scanExec.requiredSchema.fields.foreach { reqField => - if (dataSchema.fieldNames.contains(reqField.name)) { - val dataType = dataSchema(reqField.name).dataType - (dataType, reqField.dataType) match { - case (_: TimestampType, _: TimestampNTZType) | - (_: TimestampNTZType, _: TimestampType) => - withInfo( - scanExec, - s"Native DataFusion scan does not support reading column '${reqField.name}' " + - s"as ${reqField.dataType.simpleString} when the file schema type is " + - s"${dataType.simpleString}") - case _ => - } - } - } - // the scan is supported if no fallback reasons were added to the node !hasExplainInfo(scanExec) } diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala index cbad72da07..7558cf1bcf 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala @@ -123,9 +123,9 @@ class ParquetSchemaMismatchSuite extends CometTestBase { } // Case 3: INT96 TimestampLTZ read as TimestampNTZ. Spark throws on all - // versions (SPARK-36182). Comet's native_datafusion scan detects the - // TimestampType/TimestampNTZType mismatch between the file and read schemas - // at plan time and falls back to Spark, which throws SparkException. + // versions (SPARK-36182). Comet's native_datafusion scan falls back to Spark + // for TimestampNTZ columns by default (timestampNTZSafetyCheck) because INT96 + // timestamps lose their LTZ/NTZ distinction after Parquet schema coercion. // native_iceberg_compat throws via TypeUtil.convertErrorForTimestampNTZ on // Spark 3.x (mirrors Spark's behavior). On Spark 4.0, TypeUtil.checkParquetType // has an isSpark40Plus guard that bypasses the INT96 check, so the read succeeds. @@ -139,8 +139,8 @@ class ParquetSchemaMismatchSuite extends CometTestBase { } spark.read.schema("ts timestamp_ntz").parquet(path) } { df => - // native_datafusion falls back to Spark due to TimestampType/TimestampNTZType - // mismatch detection; Spark throws SparkException on all versions. + // native_datafusion falls back to Spark for TimestampNTZ columns + // (timestampNTZSafetyCheck); Spark throws SparkException on all versions. intercept[SparkException] { df.collect() }
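
Note (illustrative, not part of the patch series): the case-3 behavior
asserted above can also be reproduced with plain Spark APIs, since
spark.sql.parquet.outputTimestampType=INT96 forces the INT96 physical type
on write. Session setup and the output path are assumed, as in the earlier
sketch:

    import spark.implicits._

    // Write a TIMESTAMP (LTZ) column using the INT96 physical type.
    spark.conf.set("spark.sql.parquet.outputTimestampType", "INT96")
    Seq(java.sql.Timestamp.valueOf("2024-01-01 12:00:00"))
      .toDF("ts")
      .write.mode("overwrite").parquet("/tmp/int96-demo")

    // With the safety check enabled (the default), Comet falls back to Spark
    // here, and Spark throws SparkException for INT96 read as TIMESTAMP_NTZ.
    spark.read.schema("ts timestamp_ntz").parquet("/tmp/int96-demo").collect()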