diff --git a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala index 7c52f320cf..4e1466d9dc 100644 --- a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala +++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala @@ -538,6 +538,44 @@ object IcebergReflection extends Logging { * List of unsupported partition types (empty if all supported). Each entry is (fieldName, * typeStr, reason) */ + /** + * Checks whether a partition spec contains any non-identity transforms. + * + * Non-identity transforms include bucket, truncate, year, month, day, hour. These transforms + * mean that Iceberg's partition pruning cannot fully resolve equality filters on the source + * column, producing residual expressions that require post-scan filtering. + * + * @param partitionSpec + * The Iceberg PartitionSpec object + * @return + * Some(transformStr) for the first non-identity transform found, or None if all are identity + */ + def findNonIdentityTransform(partitionSpec: Any): Option[String] = { + import scala.jdk.CollectionConverters._ + + try { + val fieldsMethod = partitionSpec.getClass.getMethod("fields") + val fields = fieldsMethod.invoke(partitionSpec).asInstanceOf[java.util.List[_]] + + val partitionFieldClass = loadClass(ClassNames.PARTITION_FIELD) + val transformMethod = partitionFieldClass.getMethod("transform") + + fields.asScala.foreach { field => + val transform = transformMethod.invoke(field) + val transformStr = transform.toString + if (transformStr != Transforms.IDENTITY) { + return Some(transformStr) + } + } + None + } catch { + case e: Exception => + logError( + s"Iceberg reflection failure: Failed to inspect partition transforms: ${e.getMessage}") + None + } + } + def validatePartitionTypes(partitionSpec: Any, schema: Any): List[(String, String, String)] = { import scala.jdk.CollectionConverters._ diff --git 
a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 1c9ec98a7a..d0df65e964 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -552,19 +552,33 @@ case class CometScanRule(session: SparkSession) false } - // Check for unsupported transform functions in residual expressions - // iceberg-rust can only handle identity transforms in residuals; all other transforms - // (truncate, bucket, year, month, day, hour) must fall back to Spark - val transformFunctionsSupported = taskValidation.nonIdentityTransform match { - case Some(transformType) => - fallbackReasons += - s"Iceberg transform function '$transformType' in residual expression " + - "is not yet supported by iceberg-rust. " + - "Only identity transforms are supported." - false - case None => - true - } + // Safety guard: non-identity transforms with delete files must fall back. + // Non-identity transforms (truncate, bucket, year, etc.) produce residual + // expressions that require post-scan filtering. When delete files are also + // present, the native scan cannot correctly apply both the residual filter + // and delete file processing together, so we fall back to Spark. + // Detection uses PartitionSpec (table-level) rather than residual expressions, + // since Iceberg residuals use NamedReference terms without transform metadata. + val transformFunctionsSupported = + IcebergReflection.getPartitionSpec(metadata.table) match { + case Some(partitionSpec) => + IcebergReflection.findNonIdentityTransform(partitionSpec) match { + case Some(transformType) => + if (!taskValidation.deleteFiles.isEmpty) { + fallbackReasons += + s"Iceberg transform '$transformType' with delete files present. " + + "Falling back to ensure correct delete operation." 
+ false + } else { + logInfo( + s"Iceberg partition uses transform '$transformType' - " + + "post-scan filtering will apply via CometFilter.") + true + } + case None => true + } + case None => true // Cannot inspect spec, allow through + } // Check for unsupported struct types in delete files val deleteFileTypesSupported = { @@ -825,23 +839,19 @@ object CometScanRule extends Logging { val contentScanTaskClass = Class.forName(IcebergReflection.ClassNames.CONTENT_SCAN_TASK) val contentFileClass = Class.forName(IcebergReflection.ClassNames.CONTENT_FILE) val fileScanTaskClass = Class.forName(IcebergReflection.ClassNames.FILE_SCAN_TASK) - val unboundPredicateClass = Class.forName(IcebergReflection.ClassNames.UNBOUND_PREDICATE) // scalastyle:on classforname // Cache all method lookups outside the loop val fileMethod = contentScanTaskClass.getMethod("file") val formatMethod = contentFileClass.getMethod("format") val pathMethod = contentFileClass.getMethod("path") - val residualMethod = contentScanTaskClass.getMethod("residual") val deletesMethod = fileScanTaskClass.getMethod("deletes") - val termMethod = unboundPredicateClass.getMethod("term") val supportedSchemes = Set("file", "s3", "s3a", "gs", "gcs", "oss", "abfss", "abfs", "wasbs", "wasb") var allParquet = true val unsupportedSchemes = mutable.Set[String]() - var nonIdentityTransform: Option[String] = None val deleteFiles = new java.util.ArrayList[Any]() tasks.asScala.foreach { task => @@ -865,29 +875,6 @@ object CometScanRule extends Logging { case _: java.net.URISyntaxException => // ignore } - // Residual transform check (short-circuit if already found unsupported) - if (nonIdentityTransform.isEmpty && fileScanTaskClass.isInstance(task)) { - try { - val residual = residualMethod.invoke(task) - if (unboundPredicateClass.isInstance(residual)) { - val term = termMethod.invoke(residual) - try { - val transformMethod = term.getClass.getMethod("transform") - transformMethod.setAccessible(true) - val transform = 
transformMethod.invoke(term) - val transformStr = transform.toString - if (transformStr != IcebergReflection.Transforms.IDENTITY) { - nonIdentityTransform = Some(transformStr) - } - } catch { - case _: NoSuchMethodException => // No transform = simple reference, OK - } - } - } catch { - case _: Exception => // Skip tasks where we can't get residual - } - } - // Collect delete files and check their schemes if (fileScanTaskClass.isInstance(task)) { try { @@ -914,11 +901,7 @@ object CometScanRule extends Logging { } } - IcebergTaskValidationResult( - allParquet, - unsupportedSchemes.toSet, - nonIdentityTransform, - deleteFiles) + IcebergTaskValidationResult(allParquet, unsupportedSchemes.toSet, deleteFiles) } } @@ -928,5 +911,4 @@ object CometScanRule extends Logging { case class IcebergTaskValidationResult( allParquet: Boolean, unsupportedSchemes: Set[String], - nonIdentityTransform: Option[String], deleteFiles: java.util.List[_]) diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala index ba593dcaa1..07b1a1cca8 100644 --- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala @@ -28,7 +28,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.CometListenerBusUtils import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql.CometTestBase -import org.apache.spark.sql.comet.CometIcebergNativeScanExec +import org.apache.spark.sql.comet.{CometFilterExec, CometIcebergNativeScanExec} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StringType, TimestampType} @@ -73,6 +73,23 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { s"Expected exactly 1 CometIcebergNativeScanExec but found ${icebergScans.length}. 
Plan:\n$cometPlan") } + /** + * Verifies query correctness, exactly one CometIcebergNativeScanExec, and at least one + * CometFilterExec in the plan. Used for non-identity transform residual tests where + * iceberg-rust skips row-group filtering and CometFilter applies the predicate post-scan. + */ + private def checkIcebergNativeScanWithFilter(query: String): Unit = { + val (_, cometPlan) = checkSparkAnswer(query) + val icebergScans = collectIcebergNativeScans(cometPlan) + assert( + icebergScans.length == 1, + s"Expected exactly 1 CometIcebergNativeScanExec but found ${icebergScans.length}. Plan:\n$cometPlan") + val filters = collect(cometPlan) { case f: CometFilterExec => f } + assert( + filters.nonEmpty, + s"Expected CometFilterExec for post-scan filtering but found none. Plan:\n$cometPlan") + } + test("create and query simple Iceberg table with Hadoop catalog") { assume(icebergAvailable, "Iceberg not available in classpath") @@ -2511,6 +2528,323 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { } } + // Tests for non-identity transform residuals feature + // These tests verify that native Iceberg scans work when residual expressions + // contain non-identity transforms (truncate, bucket, year, month, day, hour). + // Previously these would fall back to Spark, but now they're supported with + // post-scan filtering via CometFilter. 
+ + test("non-identity transform residual - truncate transform allows native scan") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + // Create table partitioned by truncate transform + // When filtering by exact value (e.g., name = 'alpha_1'), Iceberg creates + // a residual expression because truncate(5, name) can't fully evaluate this + spark.sql(""" + CREATE TABLE test_cat.db.truncate_residual_test ( + id INT, + name STRING + ) USING iceberg + PARTITIONED BY (truncate(5, name)) + """) + + spark.sql(""" + INSERT INTO test_cat.db.truncate_residual_test VALUES + (1, 'alpha_1'), (2, 'alpha_2'), (3, 'alpha_3'), + (4, 'bravo_1'), (5, 'bravo_2'), (6, 'charlie_1') + """) + + // This filter creates a residual with truncate transform + // The partition can narrow down to 'alpha' prefix, but exact match + // requires post-scan filtering + checkIcebergNativeScanWithFilter( + "SELECT * FROM test_cat.db.truncate_residual_test WHERE name = 'alpha_2' ORDER BY id") + + // Verify correct results + val result = spark + .sql("SELECT * FROM test_cat.db.truncate_residual_test WHERE name = 'alpha_2'") + .collect() + assert(result.length == 1, s"Expected 1 row, got ${result.length}") + assert(result(0).getInt(0) == 2, s"Expected id=2, got ${result(0).getInt(0)}") + + spark.sql("DROP TABLE test_cat.db.truncate_residual_test") + } + } + } + + test("non-identity transform residual - bucket transform allows native scan") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> 
"org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + // Create table partitioned by bucket transform + // When filtering by exact id value, Iceberg creates a residual expression + // because bucket(4, id) maps multiple ids to the same bucket + spark.sql(""" + CREATE TABLE test_cat.db.bucket_residual_test ( + id INT, + value DOUBLE + ) USING iceberg + PARTITIONED BY (bucket(4, id)) + """) + + spark.sql(""" + INSERT INTO test_cat.db.bucket_residual_test + SELECT id, CAST(id * 1.5 AS DOUBLE) as value + FROM range(100) + """) + + // This filter creates a residual with bucket transform + // The partition pruning uses bucket hash, but exact id match + // requires post-scan filtering + checkIcebergNativeScanWithFilter( + "SELECT * FROM test_cat.db.bucket_residual_test WHERE id = 42 ORDER BY id") + + // Verify correct results + val result = + spark.sql("SELECT * FROM test_cat.db.bucket_residual_test WHERE id = 42").collect() + assert(result.length == 1, s"Expected 1 row, got ${result.length}") + assert(result(0).getInt(0) == 42, s"Expected id=42, got ${result(0).getInt(0)}") + + spark.sql("DROP TABLE test_cat.db.bucket_residual_test") + } + } + } + + test("non-identity transform residual - year transform allows native scan") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + // Create table partitioned by 
year transform + // When filtering by exact date, Iceberg creates a residual expression + // because year(event_date) groups all dates in a year together + spark.sql(""" + CREATE TABLE test_cat.db.year_residual_test ( + id INT, + event_date DATE, + data STRING + ) USING iceberg + PARTITIONED BY (year(event_date)) + """) + + spark.sql(""" + INSERT INTO test_cat.db.year_residual_test VALUES + (1, DATE '2023-01-15', 'jan'), + (2, DATE '2023-06-20', 'jun'), + (3, DATE '2023-12-25', 'dec'), + (4, DATE '2024-01-10', 'new_year'), + (5, DATE '2024-07-04', 'july') + """) + + // This filter creates a residual with year transform + // Partition pruning narrows to 2023, but exact date match + // requires post-scan filtering + checkIcebergNativeScanWithFilter( + "SELECT * FROM test_cat.db.year_residual_test WHERE event_date = DATE '2023-06-20'") + + // Verify correct results + val result = spark + .sql( + "SELECT * FROM test_cat.db.year_residual_test WHERE event_date = DATE '2023-06-20'") + .collect() + assert(result.length == 1, s"Expected 1 row, got ${result.length}") + assert(result(0).getInt(0) == 2, s"Expected id=2, got ${result(0).getInt(0)}") + assert( + result(0).getString(2) == "jun", + s"Expected data='jun', got ${result(0).getString(2)}") + + spark.sql("DROP TABLE test_cat.db.year_residual_test") + } + } + } + + test("non-identity transform residual - month transform allows native scan") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + // Create table partitioned by month transform + // When filtering by exact date, Iceberg creates a residual expression 
+ // because months(event_date) groups all dates in a month together + spark.sql(""" + CREATE TABLE test_cat.db.month_residual_test ( + id INT, + event_date DATE, + data STRING + ) USING iceberg + PARTITIONED BY (months(event_date)) + """) + + spark.sql(""" + INSERT INTO test_cat.db.month_residual_test VALUES + (1, DATE '2023-06-01', 'first'), + (2, DATE '2023-06-15', 'mid'), + (3, DATE '2023-06-30', 'last'), + (4, DATE '2023-07-05', 'july'), + (5, DATE '2023-05-20', 'may') + """) + + // This filter creates a residual with month transform + // Partition pruning narrows to June 2023, but exact date match + // requires post-scan filtering + checkIcebergNativeScanWithFilter( + "SELECT * FROM test_cat.db.month_residual_test WHERE event_date = DATE '2023-06-15'") + + // Verify correct results + val result = spark + .sql( + "SELECT * FROM test_cat.db.month_residual_test WHERE event_date = DATE '2023-06-15'") + .collect() + assert(result.length == 1, s"Expected 1 row, got ${result.length}") + assert(result(0).getInt(0) == 2, s"Expected id=2, got ${result(0).getInt(0)}") + assert( + result(0).getString(2) == "mid", + s"Expected data='mid', got ${result(0).getString(2)}") + + spark.sql("DROP TABLE test_cat.db.month_residual_test") + } + } + } + + test("non-identity transform residual - day transform allows native scan") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + // Create table partitioned by day transform + // When filtering by exact timestamp, Iceberg creates a residual expression + // because days(event_time) groups all timestamps in a day together + 
spark.sql(""" + CREATE TABLE test_cat.db.day_residual_test ( + id INT, + event_time TIMESTAMP, + data STRING + ) USING iceberg + PARTITIONED BY (days(event_time)) + """) + + spark.sql(""" + INSERT INTO test_cat.db.day_residual_test VALUES + (1, TIMESTAMP '2023-06-15 08:00:00', 'morning'), + (2, TIMESTAMP '2023-06-15 14:30:00', 'afternoon'), + (3, TIMESTAMP '2023-06-15 22:45:00', 'evening'), + (4, TIMESTAMP '2023-06-16 10:00:00', 'next_day'), + (5, TIMESTAMP '2023-06-14 18:00:00', 'prev_day') + """) + + // This filter creates a residual with day transform + // Partition pruning narrows to June 15, but exact timestamp match + // requires post-scan filtering + checkIcebergNativeScanWithFilter( + "SELECT * FROM test_cat.db.day_residual_test WHERE event_time = TIMESTAMP '2023-06-15 14:30:00'") + + // Verify correct results + val result = spark + .sql("SELECT * FROM test_cat.db.day_residual_test WHERE event_time = TIMESTAMP '2023-06-15 14:30:00'") + .collect() + assert(result.length == 1, s"Expected 1 row, got ${result.length}") + assert(result(0).getInt(0) == 2, s"Expected id=2, got ${result(0).getInt(0)}") + assert( + result(0).getString(2) == "afternoon", + s"Expected data='afternoon', got ${result(0).getString(2)}") + + spark.sql("DROP TABLE test_cat.db.day_residual_test") + } + } + } + + test("non-identity transform residual - hour transform allows native scan") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + // Create table partitioned by hour transform + // When filtering by exact timestamp with seconds, Iceberg creates a residual + // because 
hours(event_time) groups all timestamps in an hour together + spark.sql(""" + CREATE TABLE test_cat.db.hour_residual_test ( + id INT, + event_time TIMESTAMP, + data STRING + ) USING iceberg + PARTITIONED BY (hours(event_time)) + """) + + spark.sql(""" + INSERT INTO test_cat.db.hour_residual_test VALUES + (1, TIMESTAMP '2023-06-15 14:10:30', 'early'), + (2, TIMESTAMP '2023-06-15 14:30:45', 'mid'), + (3, TIMESTAMP '2023-06-15 14:55:15', 'late'), + (4, TIMESTAMP '2023-06-15 15:05:00', 'next_hour'), + (5, TIMESTAMP '2023-06-15 13:50:00', 'prev_hour') + """) + + // This filter creates a residual with hour transform + // Partition pruning narrows to hour 14 (2pm), but exact timestamp + // with seconds requires post-scan filtering + checkIcebergNativeScanWithFilter( + "SELECT * FROM test_cat.db.hour_residual_test WHERE event_time = TIMESTAMP '2023-06-15 14:30:45'") + + // Verify correct results + val result = spark + .sql("SELECT * FROM test_cat.db.hour_residual_test WHERE event_time = TIMESTAMP '2023-06-15 14:30:45'") + .collect() + assert(result.length == 1, s"Expected 1 row, got ${result.length}") + assert(result(0).getInt(0) == 2, s"Expected id=2, got ${result(0).getInt(0)}") + assert( + result(0).getString(2) == "mid", + s"Expected data='mid', got ${result(0).getString(2)}") + + spark.sql("DROP TABLE test_cat.db.hour_residual_test") + } + } + } + // Helper to create temp directory def withTempIcebergDir(f: File => Unit): Unit = { val dir = Files.createTempDirectory("comet-iceberg-test").toFile @@ -2836,6 +3170,150 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { } } + // ========================================================================= + // Additional integration tests for non-identity transform residuals + // ========================================================================= + + // Test A: Non-identity transform with delete files must fall back to Spark. 
+ // When the table has a non-identity partition transform (truncate) AND MOR delete + // files are present, the native Iceberg scan must fall back to Spark to ensure + // correct delete processing. Detection uses PartitionSpec inspection. + // Uses truncate(3, name) so the query and deleted row share the same partition + // (truncate(3, 'alpha') = truncate(3, 'alpine') = 'alp'). + test("non-identity transform residual - falls back with delete files present") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + val dataPath = s"${warehouseDir.getAbsolutePath}/nested_data" + + // Write Parquet WITHOUT Iceberg (simulates pre-migration data) + // id is last so its leaf index is after all nested type leaves + spark + .sql(""" + SELECT + named_struct('age', id * 10, 'score', id * 1.5) AS info, + array(id, id + 1) AS tags, + map('key', id) AS props, + id + FROM range(10) + """) + .write + .parquet(dataPath) + + spark.sql("CREATE NAMESPACE IF NOT EXISTS test_cat.db") + spark.sql(""" + CREATE TABLE test_cat.db.nested_migrate ( + info STRUCT, + tags ARRAY, + props MAP, + id BIGINT + ) USING iceberg + """) + + try { + val tableUtilClass = Class.forName("org.apache.iceberg.spark.SparkTableUtil") + val sparkCatalog = spark.sessionState.catalogManager + .catalog("test_cat") + .asInstanceOf[org.apache.iceberg.spark.SparkCatalog] + val ident = + org.apache.spark.sql.connector.catalog.Identifier.of(Array("db"), "nested_migrate") + val sparkTable = sparkCatalog + .loadTable(ident) + .asInstanceOf[org.apache.iceberg.spark.source.SparkTable] + val table = sparkTable.table() 
+ + val stagingDir = s"${warehouseDir.getAbsolutePath}/staging" + spark.sql(s"""CREATE TABLE parquet_temp USING parquet LOCATION '$dataPath'""") + val sourceIdent = new org.apache.spark.sql.catalyst.TableIdentifier("parquet_temp") + + val importMethod = tableUtilClass.getMethod( + "importSparkTable", + classOf[org.apache.spark.sql.SparkSession], + classOf[org.apache.spark.sql.catalyst.TableIdentifier], + classOf[org.apache.iceberg.Table], + classOf[String]) + importMethod.invoke(null, spark, sourceIdent, table, stagingDir) + + // Select only flat columns to avoid Spark's Iceberg reader returning + // null for struct fields in migrated tables (separate Spark bug) + checkIcebergNativeScan("SELECT id FROM test_cat.db.nested_migrate ORDER BY id") + + // Filter on root column with nested types in migrated table: + // Parquet files lack Iceberg field IDs, so iceberg-rust falls back to + // name mapping where column_map resolution was broken for nested types + checkIcebergNativeScan( + "SELECT id FROM test_cat.db.nested_migrate WHERE id > 5 ORDER BY id") + + spark.sql("DROP TABLE test_cat.db.nested_migrate") + spark.sql("DROP TABLE parquet_temp") + } catch { + case _: ClassNotFoundException => + cancel("SparkTableUtil not available") + } + spark.sql(""" + CREATE TABLE test_cat.db.truncate_delete_fallback ( + id INT, + name STRING, + value DOUBLE + ) USING iceberg + PARTITIONED BY (truncate(3, name)) + TBLPROPERTIES ( + 'format-version' = '2', + 'write.delete.mode' = 'merge-on-read', + 'write.merge.mode' = 'merge-on-read' + ) + """) + + spark.sql(""" + INSERT INTO test_cat.db.truncate_delete_fallback VALUES + (1, 'alpha', 10.0), (2, 'alpine', 20.0), (3, 'bravo', 30.0), + (4, 'bridge', 40.0), (5, 'charlie', 50.0), (6, 'cherry', 60.0) + """) + + // Delete 'alpine' which shares truncate(3)='alp' partition with 'alpha'. + spark.sql("DELETE FROM test_cat.db.truncate_delete_fallback WHERE name = 'alpine'") + + // Query with filter. 
Because the table has truncate transform AND delete files, + // native scan must fall back to Spark. + val query = + "SELECT * FROM test_cat.db.truncate_delete_fallback WHERE name = 'alpha' ORDER BY id" + val (_, cometPlan) = checkSparkAnswer(query) + + // Assert fallback: no CometIcebergNativeScanExec in plan + val icebergScans = collectIcebergNativeScans(cometPlan) + assert( + icebergScans.isEmpty, + "Expected fallback to Spark (no CometIcebergNativeScanExec) when " + + s"non-identity transform has delete files. Plan:\n$cometPlan") + + // Verify correct results: only 'alpha' returned, 'alpine' is deleted + val result = spark.sql(query).collect() + assert(result.length == 1, s"Expected 1 row, got ${result.length}") + assert(result(0).getInt(0) == 1, s"Expected id=1, got ${result(0).getInt(0)}") + assert(result(0).getString(1) == "alpha") + + // Verify 'alpine' is truly gone from broader query + val allResult = spark + .sql("SELECT * FROM test_cat.db.truncate_delete_fallback ORDER BY id") + .collect() + assert(allResult.length == 5, s"Expected 5 rows after delete, got ${allResult.length}") + assert( + !allResult.exists(_.getString(1) == "alpine"), + "Deleted row 'alpine' should not appear in results") + + spark.sql("DROP TABLE test_cat.db.truncate_delete_fallback") + } + } + } + test("task-level inputMetrics.bytesRead is populated for Iceberg native scan") { assume(icebergAvailable, "Iceberg not available in classpath") @@ -2934,4 +3412,351 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { } } } + + // Test B: Composite transforms - table partitioned by two non-identity transforms. + // Verifies native scan works when filtering on columns involved in multiple transforms. 
+ test("non-identity transform residual - composite bucket and truncate transforms") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE test_cat.db.composite_transform ( + id INT, + name STRING, + value DOUBLE + ) USING iceberg + PARTITIONED BY (bucket(4, id), truncate(3, name)) + """) + + spark.sql(""" + INSERT INTO test_cat.db.composite_transform VALUES + (1, 'alpha', 10.0), (2, 'alpha', 20.0), (3, 'bravo', 30.0), + (4, 'bravo', 40.0), (5, 'charlie', 50.0), (6, 'delta', 60.0), + (10, 'alpha', 70.0), (20, 'bravo', 80.0), (30, 'charlie', 90.0) + """) + + // Filter on both partition columns creates residuals for both transforms + checkIcebergNativeScanWithFilter( + "SELECT * FROM test_cat.db.composite_transform " + + "WHERE id = 3 AND name = 'bravo' ORDER BY id") + + // Verify correct results + val result = spark + .sql("SELECT * FROM test_cat.db.composite_transform " + + "WHERE id = 3 AND name = 'bravo'") + .collect() + assert(result.length == 1, s"Expected 1 row, got ${result.length}") + assert(result(0).getInt(0) == 3) + assert(result(0).getString(1) == "bravo") + + // Filter on only one of the two transform columns + checkIcebergNativeScanWithFilter( + "SELECT * FROM test_cat.db.composite_transform WHERE name = 'alpha' ORDER BY id") + + val result2 = spark + .sql("SELECT * FROM test_cat.db.composite_transform WHERE name = 'alpha'") + .collect() + assert(result2.length == 3, s"Expected 3 rows for 'alpha', got ${result2.length}") + + spark.sql("DROP TABLE test_cat.db.composite_transform") + } + } + } + + // Test C: Range 
  // filters on non-identity transforms.
  // Range predicates (>, <, BETWEEN) produce different residual expression shapes
  // compared to equality filters.
  // NOTE(review): checkIcebergNativeScanWithFilter is defined elsewhere in this
  // suite — presumably it asserts the plan uses the Comet native Iceberg scan
  // while producing correct results; confirm against the helper's definition.
  test("non-identity transform residual - range filters on bucket and day transforms") {
    assume(icebergAvailable, "Iceberg not available in classpath")

    withTempIcebergDir { warehouseDir =>
      withSQLConf(
        "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.test_cat.type" -> "hadoop",
        "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
        CometConf.COMET_ENABLED.key -> "true",
        CometConf.COMET_EXEC_ENABLED.key -> "true",
        CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {

        // Range filter on bucket-partitioned int column
        spark.sql("""
          CREATE TABLE test_cat.db.bucket_range (
            id INT,
            value DOUBLE
          ) USING iceberg
          PARTITIONED BY (bucket(4, id))
        """)

        spark.sql("""
          INSERT INTO test_cat.db.bucket_range
          SELECT id, CAST(id * 2.5 AS DOUBLE) as value
          FROM range(100)
        """)

        checkIcebergNativeScanWithFilter(
          "SELECT * FROM test_cat.db.bucket_range WHERE id > 10 AND id < 20 ORDER BY id")

        // Both bounds are exclusive, so ids 11..19 qualify (9 rows).
        val rangeResult = spark
          .sql("SELECT * FROM test_cat.db.bucket_range WHERE id > 10 AND id < 20")
          .collect()
        assert(
          rangeResult.length == 9,
          s"Expected 9 rows for id in (11..19), got ${rangeResult.length}")

        spark.sql("DROP TABLE test_cat.db.bucket_range")

        // Range filter on day-partitioned timestamp column
        spark.sql("""
          CREATE TABLE test_cat.db.day_range (
            id INT,
            event_time TIMESTAMP,
            data STRING
          ) USING iceberg
          PARTITIONED BY (days(event_time))
        """)

        spark.sql("""
          INSERT INTO test_cat.db.day_range VALUES
          (1, TIMESTAMP '2023-06-15 08:00:00', 'a'),
          (2, TIMESTAMP '2023-06-15 12:00:00', 'b'),
          (3, TIMESTAMP '2023-06-15 18:00:00', 'c'),
          (4, TIMESTAMP '2023-06-16 09:00:00', 'd'),
          (5, TIMESTAMP '2023-06-16 15:00:00', 'e'),
          (6, TIMESTAMP '2023-06-17 10:00:00', 'f')
        """)

        // Range within a single day creates a residual because days() groups
        // all timestamps in a day together
        checkIcebergNativeScanWithFilter(
          "SELECT * FROM test_cat.db.day_range " +
            "WHERE event_time > TIMESTAMP '2023-06-15 10:00:00' " +
            "AND event_time < TIMESTAMP '2023-06-15 20:00:00' ORDER BY id")

        // Only ids 2 (12:00) and 3 (18:00) fall inside the 10:00-20:00 window.
        val dayRangeResult = spark
          .sql(
            "SELECT * FROM test_cat.db.day_range " +
              "WHERE event_time > TIMESTAMP '2023-06-15 10:00:00' " +
              "AND event_time < TIMESTAMP '2023-06-15 20:00:00'")
          .collect()
        assert(
          dayRangeResult.length == 2,
          s"Expected 2 rows (ids 2,3), got ${dayRangeResult.length}")

        spark.sql("DROP TABLE test_cat.db.day_range")
      }
    }
  }

  // Test D: Aggregates with non-identity transform residuals.
  // Ensures the CometFilter + aggregation pipeline works end-to-end
  // when post-scan filtering is required.
  test("non-identity transform residual - aggregates with bucket transform") {
    assume(icebergAvailable, "Iceberg not available in classpath")

    withTempIcebergDir { warehouseDir =>
      withSQLConf(
        "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.test_cat.type" -> "hadoop",
        "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
        CometConf.COMET_ENABLED.key -> "true",
        CometConf.COMET_EXEC_ENABLED.key -> "true",
        CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {

        spark.sql("""
          CREATE TABLE test_cat.db.bucket_agg (
            id INT,
            category STRING,
            amount DOUBLE
          ) USING iceberg
          PARTITIONED BY (bucket(4, id))
        """)

        spark.sql("""
          INSERT INTO test_cat.db.bucket_agg VALUES
          (1, 'A', 10.0), (1, 'A', 20.0), (1, 'B', 30.0),
          (2, 'A', 40.0), (2, 'B', 50.0),
          (3, 'A', 60.0), (3, 'C', 70.0),
          (5, 'B', 80.0), (5, 'C', 90.0),
          (10, 'A', 100.0)
        """)

        // Aggregate with filter that creates a bucket residual
        checkIcebergNativeScanWithFilter(
          "SELECT COUNT(*), SUM(amount) FROM test_cat.db.bucket_agg WHERE id = 1")

        // id = 1 matches three rows with amounts 10 + 20 + 30 = 60.
        val aggResult = spark
          .sql("SELECT COUNT(*), SUM(amount) FROM test_cat.db.bucket_agg WHERE id = 1")
          .collect()
        assert(aggResult(0).getLong(0) == 3, s"Expected count=3, got ${aggResult(0).getLong(0)}")
        assert(
          aggResult(0).getDouble(1) == 60.0,
          s"Expected sum=60.0, got ${aggResult(0).getDouble(1)}")

        // GROUP BY with filter
        checkIcebergNativeScanWithFilter(
          "SELECT category, SUM(amount) FROM test_cat.db.bucket_agg " +
            "WHERE id IN (1, 2) GROUP BY category ORDER BY category")

        val groupResult = spark
          .sql("SELECT category, SUM(amount) FROM test_cat.db.bucket_agg " +
            "WHERE id IN (1, 2) GROUP BY category ORDER BY category")
          .collect()
        assert(groupResult.length == 2, s"Expected 2 groups, got ${groupResult.length}")
        // Category A: 10+20+40=70, Category B: 30+50=80
        assert(groupResult(0).getString(0) == "A")
        assert(groupResult(0).getDouble(1) == 70.0)
        assert(groupResult(1).getString(0) == "B")
        assert(groupResult(1).getDouble(1) == 80.0)

        spark.sql("DROP TABLE test_cat.db.bucket_agg")
      }
    }
  }

  // Test E: Compound predicates (OR/AND) with non-identity transform residuals.
  // OR predicates across different truncate groups produce more complex residual shapes.
  test("non-identity transform residual - compound OR and AND predicates") {
    assume(icebergAvailable, "Iceberg not available in classpath")

    withTempIcebergDir { warehouseDir =>
      withSQLConf(
        "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.test_cat.type" -> "hadoop",
        "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
        CometConf.COMET_ENABLED.key -> "true",
        CometConf.COMET_EXEC_ENABLED.key -> "true",
        CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {

        spark.sql("""
          CREATE TABLE test_cat.db.truncate_compound (
            id INT,
            name STRING,
            value DOUBLE
          ) USING iceberg
          PARTITIONED BY (truncate(5, name))
        """)

        spark.sql("""
          INSERT INTO test_cat.db.truncate_compound VALUES
          (1, 'alpha_1', 10.0), (2, 'alpha_2', 20.0), (3, 'alpha_3', 30.0),
          (4, 'bravo_1', 40.0), (5, 'bravo_2', 50.0),
          (6, 'charlie_1', 60.0), (7, 'charlie_2', 70.0),
          (8, 'delta_1', 80.0)
        """)

        // OR across different truncate groups
        checkIcebergNativeScanWithFilter(
          "SELECT * FROM test_cat.db.truncate_compound " +
            "WHERE name = 'alpha_1' OR name = 'bravo_1' ORDER BY id")

        val orResult = spark
          .sql("SELECT * FROM test_cat.db.truncate_compound " +
            "WHERE name = 'alpha_1' OR name = 'bravo_1'")
          .collect()
        assert(orResult.length == 2, s"Expected 2 rows for OR, got ${orResult.length}")
        assert(orResult.map(_.getInt(0)).sorted.toSeq == Seq(1, 4))

        // AND with mixed columns
        checkIcebergNativeScanWithFilter(
          "SELECT * FROM test_cat.db.truncate_compound " +
            "WHERE name = 'alpha_2' AND value > 15.0 ORDER BY id")

        val andResult = spark
          .sql("SELECT * FROM test_cat.db.truncate_compound " +
            "WHERE name = 'alpha_2' AND value > 15.0")
          .collect()
        assert(andResult.length == 1, s"Expected 1 row for AND, got ${andResult.length}")
        assert(andResult(0).getInt(0) == 2)

        // OR within the same truncate group
        checkIcebergNativeScanWithFilter(
          "SELECT * FROM test_cat.db.truncate_compound " +
            "WHERE name = 'alpha_1' OR name = 'alpha_3' ORDER BY id")

        val sameGroupResult = spark
          .sql("SELECT * FROM test_cat.db.truncate_compound " +
            "WHERE name = 'alpha_1' OR name = 'alpha_3'")
          .collect()
        assert(
          sameGroupResult.length == 2,
          s"Expected 2 rows for same-group OR, got ${sameGroupResult.length}")

        spark.sql("DROP TABLE test_cat.db.truncate_compound")
      }
    }
  }

  // Test F: Year transform with DATE type (not just TIMESTAMP).
  // The existing year transform test uses TIMESTAMP. This verifies DATE columns
  // also produce correct residuals with the years() transform.
  test("non-identity transform residual - year transform with date type") {
    assume(icebergAvailable, "Iceberg not available in classpath")

    withTempIcebergDir { warehouseDir =>
      withSQLConf(
        "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.test_cat.type" -> "hadoop",
        "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
        CometConf.COMET_ENABLED.key -> "true",
        CometConf.COMET_EXEC_ENABLED.key -> "true",
        CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {

        spark.sql("""
          CREATE TABLE test_cat.db.year_date_residual (
            id INT,
            event_date DATE,
            data STRING
          ) USING iceberg
          PARTITIONED BY (years(event_date))
        """)

        spark.sql("""
          INSERT INTO test_cat.db.year_date_residual VALUES
          (1, DATE '2022-03-15', 'spring_22'),
          (2, DATE '2022-07-20', 'summer_22'),
          (3, DATE '2022-11-05', 'fall_22'),
          (4, DATE '2023-01-10', 'winter_23'),
          (5, DATE '2023-06-25', 'summer_23'),
          (6, DATE '2024-02-14', 'winter_24')
        """)

        // Exact date match creates a residual because years() groups all dates
        // in a year together
        checkIcebergNativeScanWithFilter(
          "SELECT * FROM test_cat.db.year_date_residual " +
            "WHERE event_date = DATE '2022-07-20'")

        val exactResult = spark
          .sql("SELECT * FROM test_cat.db.year_date_residual " +
            "WHERE event_date = DATE '2022-07-20'")
          .collect()
        assert(exactResult.length == 1, s"Expected 1 row, got ${exactResult.length}")
        assert(exactResult(0).getInt(0) == 2)
        assert(exactResult(0).getString(2) == "summer_22")

        // Range within a year creates a residual
        checkIcebergNativeScanWithFilter(
          "SELECT * FROM test_cat.db.year_date_residual " +
            "WHERE event_date >= DATE '2022-06-01' AND event_date <= DATE '2022-12-31' " +
            "ORDER BY id")

        // Second half of 2022 covers ids 2 (Jul) and 3 (Nov) only.
        val rangeResult = spark
          .sql("SELECT * FROM test_cat.db.year_date_residual " +
            "WHERE event_date >= DATE '2022-06-01' AND event_date <= DATE '2022-12-31'")
          .collect()
        assert(rangeResult.length == 2, s"Expected 2 rows (ids 2,3), got ${rangeResult.length}")
        assert(rangeResult.map(_.getInt(0)).sorted.toSeq == Seq(2, 3))

        spark.sql("DROP TABLE test_cat.db.year_date_residual")
      }
    }
  }
}