From ee6be60f9f1e304a30e496f828d7f9fa765fc53a Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 20 Dec 2025 20:43:59 +0530 Subject: [PATCH 1/8] feat: allow native Iceberg scans with non-identity transform residuals --- .../apache/comet/rules/CometScanRule.scala | 41 +++++++++++++------ 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 2cd5ba156c..03f159c118 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -547,19 +547,34 @@ case class CometScanRule(session: SparkSession) false } - // Check for unsupported transform functions in residual expressions - // iceberg-rust can only handle identity transforms in residuals; all other transforms - // (truncate, bucket, year, month, day, hour) must fall back to Spark - val transformFunctionsSupported = taskValidation.nonIdentityTransform match { - case Some(transformType) => - fallbackReasons += - s"Iceberg transform function '$transformType' in residual expression " + - "is not yet supported by iceberg-rust. " + - "Only identity transforms are supported." - false - case None => - true - } + // Check for transform functions in residual expressions + // Non-identity transforms (truncate, bucket, year, month, day, hour) in residuals + // are now supported - they skip row-group filtering and are handled post-scan by CometFilter. + // This is less optimal than row-group filtering but still allows native execution. + val transformFunctionsSupported = + try { + IcebergReflection.findNonIdentityTransformInResiduals(metadata.tasks) match { + case Some(transformType) => + // Found non-identity transform - log info and continue with native scan + // Row-group filtering will skip these predicates, but post-scan filtering will apply + logInfo( + s"Iceberg residual contains transform '$transformType' - " + + "row-group filtering will skip this predicate, " + + "post-scan filtering by CometFilter will apply instead.") + true // Allow native execution + case None => + // No non-identity transforms - optimal row-group filtering will apply + true + } + } catch { + case e: Exception => + // Reflection failure - log warning but allow native execution + // The predicate conversion will handle unsupported cases gracefully + logWarning( + s"Could not check for transform functions in residuals: ${e.getMessage}. 
" + + "Continuing with native scan.") + true + } // Check for unsupported struct types in delete files val deleteFileTypesSupported = { From 54ce20af21f3b1752f70ccbc1e5a46161d9984a7 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Mon, 5 Jan 2026 23:24:29 +0530 Subject: [PATCH 2/8] Add tests for non-identity transform residuals and fix scalastyle errors --- .../apache/comet/rules/CometScanRule.scala | 6 +- .../comet/CometIcebergNativeSuite.scala | 154 ++++++++++++++++++ 2 files changed, 158 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 03f159c118..b8d3578cc6 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -549,14 +549,16 @@ case class CometScanRule(session: SparkSession) // Check for transform functions in residual expressions // Non-identity transforms (truncate, bucket, year, month, day, hour) in residuals - // are now supported - they skip row-group filtering and are handled post-scan by CometFilter. + // are now supported - they skip row-group filtering and are handled + // post-scan by CometFilter. // This is less optimal than row-group filtering but still allows native execution. val transformFunctionsSupported = try { IcebergReflection.findNonIdentityTransformInResiduals(metadata.tasks) match { case Some(transformType) => // Found non-identity transform - log info and continue with native scan - // Row-group filtering will skip these predicates, but post-scan filtering will apply + // Row-group filtering will skip these predicates, but post-scan + // filtering will apply logInfo( s"Iceberg residual contains transform '$transformType' - " + "row-group filtering will skip this predicate, " + diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala index ba593dcaa1..4683356631 100644 --- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala @@ -2511,6 +2511,160 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { } } + // Tests for non-identity transform residuals feature + // These tests verify that native Iceberg scans work when residual expressions + // contain non-identity transforms (truncate, bucket, year, month, day, hour). + // Previously these would fall back to Spark, but now they're supported with + // post-scan filtering via CometFilter. 
+ + test("non-identity transform residual - truncate transform allows native scan") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + // Create table partitioned by truncate transform + // When filtering by exact value (e.g., name = 'alpha_1'), Iceberg creates + // a residual expression because truncate(5, name) can't fully evaluate this + spark.sql(""" + CREATE TABLE test_cat.db.truncate_residual_test ( + id INT, + name STRING + ) USING iceberg + PARTITIONED BY (truncate(5, name)) + """) + + spark.sql(""" + INSERT INTO test_cat.db.truncate_residual_test VALUES + (1, 'alpha_1'), (2, 'alpha_2'), (3, 'alpha_3'), + (4, 'bravo_1'), (5, 'bravo_2'), (6, 'charlie_1') + """) + + // This filter creates a residual with truncate transform + // The partition can narrow down to 'alpha' prefix, but exact match + // requires post-scan filtering + checkIcebergNativeScan( + "SELECT * FROM test_cat.db.truncate_residual_test WHERE name = 'alpha_2' ORDER BY id") + + // Verify correct results + val result = spark + .sql("SELECT * FROM test_cat.db.truncate_residual_test WHERE name = 'alpha_2'") + .collect() + assert(result.length == 1, s"Expected 1 row, got ${result.length}") + assert(result(0).getInt(0) == 2, s"Expected id=2, got ${result(0).getInt(0)}") + + spark.sql("DROP TABLE test_cat.db.truncate_residual_test") + } + } + } + + test("non-identity transform residual - bucket transform allows native scan") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + // Create table partitioned by bucket transform + // When filtering by exact id value, Iceberg creates a residual expression + // because bucket(4, id) maps multiple ids to the same bucket + spark.sql(""" + CREATE TABLE test_cat.db.bucket_residual_test ( + id INT, + value DOUBLE + ) USING iceberg + PARTITIONED BY (bucket(4, id)) + """) + + spark.sql(""" + INSERT INTO test_cat.db.bucket_residual_test + SELECT id, CAST(id * 1.5 AS DOUBLE) as value + FROM range(100) + """) + + // This filter creates a residual with bucket transform + // The partition pruning uses bucket hash, but exact id match + // requires post-scan filtering + checkIcebergNativeScan( + "SELECT * FROM test_cat.db.bucket_residual_test WHERE id = 42 ORDER BY id") + + // Verify correct results + val result = + spark.sql("SELECT * FROM test_cat.db.bucket_residual_test WHERE id = 42").collect() + assert(result.length == 1, s"Expected 1 row, got ${result.length}") + assert(result(0).getInt(0) == 42, s"Expected id=42, got ${result(0).getInt(0)}") + + spark.sql("DROP TABLE test_cat.db.bucket_residual_test") + } + } + } + + test("non-identity transform residual - year transform allows native scan") { + assume(icebergAvailable, "Iceberg not available in classpath") + + 
withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + // Create table partitioned by year transform + // When filtering by exact date, Iceberg creates a residual expression + // because year(event_date) groups all dates in a year together + spark.sql(""" + CREATE TABLE test_cat.db.year_residual_test ( + id INT, + event_date DATE, + data STRING + ) USING iceberg + PARTITIONED BY (year(event_date)) + """) + + spark.sql(""" + INSERT INTO test_cat.db.year_residual_test VALUES + (1, DATE '2023-01-15', 'jan'), + (2, DATE '2023-06-20', 'jun'), + (3, DATE '2023-12-25', 'dec'), + (4, DATE '2024-01-10', 'new_year'), + (5, DATE '2024-07-04', 'july') + """) + + // This filter creates a residual with year transform + // Partition pruning narrows to 2023, but exact date match + // requires post-scan filtering + checkIcebergNativeScan( + "SELECT * FROM test_cat.db.year_residual_test WHERE event_date = DATE '2023-06-20'") + + // Verify correct results + val result = spark + .sql( + "SELECT * FROM test_cat.db.year_residual_test WHERE event_date = DATE '2023-06-20'") + .collect() + assert(result.length == 1, s"Expected 1 row, got ${result.length}") + assert(result(0).getInt(0) == 2, s"Expected id=2, got ${result(0).getInt(0)}") + assert( + result(0).getString(2) == "jun", + s"Expected data='jun', got ${result(0).getString(2)}") + + spark.sql("DROP TABLE test_cat.db.year_residual_test") + } + } + } + // Helper to create temp directory def withTempIcebergDir(f: File => Unit): Unit = { val dir = Files.createTempDirectory("comet-iceberg-test").toFile From 3cfa9d74f6c72ad332c7064186696b96a26cc6ca Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 29 Jan 2026 09:15:39 +0530 Subject: [PATCH 3/8] Add tests for month, day, and hour transform residuals --- .../comet/CometIcebergNativeSuite.scala | 163 ++++++++++++++++++ 1 file changed, 163 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala index 4683356631..fbfeffb47d 100644 --- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala @@ -2665,6 +2665,169 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { } } + test("non-identity transform residual - month transform allows native scan") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + // Create table partitioned by month transform + // When filtering by exact date, Iceberg creates a residual expression + // because months(event_date) groups all dates in a month together + spark.sql(""" + CREATE TABLE test_cat.db.month_residual_test ( + id INT, + event_date DATE, + data STRING + ) USING iceberg + PARTITIONED BY (months(event_date)) + 
""") + + spark.sql(""" + INSERT INTO test_cat.db.month_residual_test VALUES + (1, DATE '2023-06-01', 'first'), + (2, DATE '2023-06-15', 'mid'), + (3, DATE '2023-06-30', 'last'), + (4, DATE '2023-07-05', 'july'), + (5, DATE '2023-05-20', 'may') + """) + + // This filter creates a residual with month transform + // Partition pruning narrows to June 2023, but exact date match + // requires post-scan filtering + checkIcebergNativeScan( + "SELECT * FROM test_cat.db.month_residual_test WHERE event_date = DATE '2023-06-15'") + + // Verify correct results + val result = spark + .sql( + "SELECT * FROM test_cat.db.month_residual_test WHERE event_date = DATE '2023-06-15'") + .collect() + assert(result.length == 1, s"Expected 1 row, got ${result.length}") + assert(result(0).getInt(0) == 2, s"Expected id=2, got ${result(0).getInt(0)}") + assert( + result(0).getString(2) == "mid", + s"Expected data='mid', got ${result(0).getString(2)}") + + spark.sql("DROP TABLE test_cat.db.month_residual_test") + } + } + } + + test("non-identity transform residual - day transform allows native scan") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + // Create table partitioned by day transform + // When filtering by exact timestamp, Iceberg creates a residual expression + // because days(event_time) groups all timestamps in a day together + spark.sql(""" + CREATE TABLE test_cat.db.day_residual_test ( + id INT, + event_time TIMESTAMP, + data STRING + ) USING iceberg + PARTITIONED BY (days(event_time)) + """) + + spark.sql(""" + INSERT INTO test_cat.db.day_residual_test VALUES + (1, TIMESTAMP '2023-06-15 08:00:00', 'morning'), + (2, TIMESTAMP '2023-06-15 14:30:00', 'afternoon'), + (3, TIMESTAMP '2023-06-15 22:45:00', 'evening'), + (4, TIMESTAMP '2023-06-16 10:00:00', 'next_day'), + (5, TIMESTAMP '2023-06-14 18:00:00', 'prev_day') + """) + + // This filter creates a residual with day transform + // Partition pruning narrows to June 15, but exact timestamp match + // requires post-scan filtering + checkIcebergNativeScan( + "SELECT * FROM test_cat.db.day_residual_test WHERE event_time = TIMESTAMP '2023-06-15 14:30:00'") + + // Verify correct results + val result = spark + .sql("SELECT * FROM test_cat.db.day_residual_test WHERE event_time = TIMESTAMP '2023-06-15 14:30:00'") + .collect() + assert(result.length == 1, s"Expected 1 row, got ${result.length}") + assert(result(0).getInt(0) == 2, s"Expected id=2, got ${result(0).getInt(0)}") + assert( + result(0).getString(2) == "afternoon", + s"Expected data='afternoon', got ${result(0).getString(2)}") + + spark.sql("DROP TABLE test_cat.db.day_residual_test") + } + } + } + + test("non-identity transform residual - hour transform allows native scan") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + 
CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + // Create table partitioned by hour transform + // When filtering by exact timestamp with seconds, Iceberg creates a residual + // because hours(event_time) groups all timestamps in an hour together + spark.sql(""" + CREATE TABLE test_cat.db.hour_residual_test ( + id INT, + event_time TIMESTAMP, + data STRING + ) USING iceberg + PARTITIONED BY (hours(event_time)) + """) + + spark.sql(""" + INSERT INTO test_cat.db.hour_residual_test VALUES + (1, TIMESTAMP '2023-06-15 14:10:30', 'early'), + (2, TIMESTAMP '2023-06-15 14:30:45', 'mid'), + (3, TIMESTAMP '2023-06-15 14:55:15', 'late'), + (4, TIMESTAMP '2023-06-15 15:05:00', 'next_hour'), + (5, TIMESTAMP '2023-06-15 13:50:00', 'prev_hour') + """) + + // This filter creates a residual with hour transform + // Partition pruning narrows to hour 14 (2pm), but exact timestamp + // with seconds requires post-scan filtering + checkIcebergNativeScan( + "SELECT * FROM test_cat.db.hour_residual_test WHERE event_time = TIMESTAMP '2023-06-15 14:30:45'") + + // Verify correct results + val result = spark + .sql("SELECT * FROM test_cat.db.hour_residual_test WHERE event_time = TIMESTAMP '2023-06-15 14:30:45'") + .collect() + assert(result.length == 1, s"Expected 1 row, got ${result.length}") + assert(result(0).getInt(0) == 2, s"Expected id=2, got ${result(0).getInt(0)}") + assert( + result(0).getString(2) == "mid", + s"Expected data='mid', got ${result(0).getString(2)}") + + spark.sql("DROP TABLE test_cat.db.hour_residual_test") + } + } + } + // Helper to create temp directory def withTempIcebergDir(f: File => Unit): Unit = { val dir = Files.createTempDirectory("comet-iceberg-test").toFile From 6a5eecdce2a5d36266d60f7f14ec3e808d2ed632 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 31 Jan 2026 23:29:07 +0530 Subject: [PATCH 4/8] Fix delete operations with non-identity transform residuals --- .../apache/comet/rules/CometScanRule.scala | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index b8d3578cc6..dd4da61511 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -556,26 +556,32 @@ case class CometScanRule(session: SparkSession) try { IcebergReflection.findNonIdentityTransformInResiduals(metadata.tasks) match { case Some(transformType) => - // Found non-identity transform - log info and continue with native scan - // Row-group filtering will skip these predicates, but post-scan - // filtering will apply - logInfo( - s"Iceberg residual contains transform '$transformType' - " + - "row-group filtering will skip this predicate, " + - "post-scan filtering by CometFilter will apply instead.") - true // Allow native execution + val deleteFiles = IcebergReflection.getDeleteFiles(metadata.tasks) + + if (!deleteFiles.isEmpty) { + // Delete operations with non-identity transforms must fall back to Spark. + // convertIcebergExpression() cannot convert BoundTerm with transforms, + // causing residuals to be dropped. This breaks delete operations which + // rely on residuals to identify correct rows. + // Future: Convert transforms to Spark expressions (bucket→pmod, year→Year, etc) + fallbackReasons += + s"Iceberg transform '$transformType' with delete files present. " + + "Falling back to ensure correct delete operation." 
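+            // Illustrative sketch of that future conversion (hypothetical names,
+            // not a current Comet API):
+            //   bucket(n, col)   -> pmod(icebergMurmur3Hash(col), n)
+            //   year(col)        -> Year(col)
+            //   truncate(w, col) -> width/substring arithmetic per source type
+            // The hash and truncation semantics must match Iceberg's exactly,
+            // which is why falling back is the safe default for now.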
+ false + } else { + // Read-only: Safe to continue natively with post-scan filtering + logInfo( + s"Iceberg residual contains transform '$transformType' - " + + "post-scan filtering will apply.") + true + } case None => - // No non-identity transforms - optimal row-group filtering will apply true } } catch { case e: Exception => - // Reflection failure - log warning but allow native execution - // The predicate conversion will handle unsupported cases gracefully - logWarning( - s"Could not check for transform functions in residuals: ${e.getMessage}. " + - "Continuing with native scan.") - true + fallbackReasons += s"Could not check for transform functions: ${e.getMessage}" + false } // Check for unsupported struct types in delete files From d802c76646ca6f0110ba29ca0ab90ff14de306cc Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Fri, 3 Apr 2026 22:55:51 +0530 Subject: [PATCH 5/8] assert CometFilterExec present in non-identity transform residual tests --- .../comet/CometIcebergNativeSuite.scala | 31 ++++++++++++++----- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala index fbfeffb47d..eb7ddcd12a 100644 --- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala @@ -28,7 +28,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.CometListenerBusUtils import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql.CometTestBase -import org.apache.spark.sql.comet.CometIcebergNativeScanExec +import org.apache.spark.sql.comet.{CometFilterExec, CometIcebergNativeScanExec} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StringType, TimestampType} @@ -73,6 +73,23 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { s"Expected exactly 1 CometIcebergNativeScanExec but found ${icebergScans.length}. Plan:\n$cometPlan") } + /** + * Verifies query correctness, exactly one CometIcebergNativeScanExec, and at least one + * CometFilterExec in the plan. Used for non-identity transform residual tests where + * iceberg-rust skips row-group filtering and CometFilter applies the predicate post-scan. + */ + private def checkIcebergNativeScanWithFilter(query: String): Unit = { + val (_, cometPlan) = checkSparkAnswer(query) + val icebergScans = collectIcebergNativeScans(cometPlan) + assert( + icebergScans.length == 1, + s"Expected exactly 1 CometIcebergNativeScanExec but found ${icebergScans.length}. Plan:\n$cometPlan") + val filters = collect(cometPlan) { case f: CometFilterExec => f } + assert( + filters.nonEmpty, + s"Expected CometFilterExec for post-scan filtering but found none. 
Plan:\n$cometPlan") + } + test("create and query simple Iceberg table with Hadoop catalog") { assume(icebergAvailable, "Iceberg not available in classpath") @@ -2549,7 +2566,7 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { // This filter creates a residual with truncate transform // The partition can narrow down to 'alpha' prefix, but exact match // requires post-scan filtering - checkIcebergNativeScan( + checkIcebergNativeScanWithFilter( "SELECT * FROM test_cat.db.truncate_residual_test WHERE name = 'alpha_2' ORDER BY id") // Verify correct results @@ -2596,7 +2613,7 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { // This filter creates a residual with bucket transform // The partition pruning uses bucket hash, but exact id match // requires post-scan filtering - checkIcebergNativeScan( + checkIcebergNativeScanWithFilter( "SELECT * FROM test_cat.db.bucket_residual_test WHERE id = 42 ORDER BY id") // Verify correct results @@ -2646,7 +2663,7 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { // This filter creates a residual with year transform // Partition pruning narrows to 2023, but exact date match // requires post-scan filtering - checkIcebergNativeScan( + checkIcebergNativeScanWithFilter( "SELECT * FROM test_cat.db.year_residual_test WHERE event_date = DATE '2023-06-20'") // Verify correct results @@ -2701,7 +2718,7 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { // This filter creates a residual with month transform // Partition pruning narrows to June 2023, but exact date match // requires post-scan filtering - checkIcebergNativeScan( + checkIcebergNativeScanWithFilter( "SELECT * FROM test_cat.db.month_residual_test WHERE event_date = DATE '2023-06-15'") // Verify correct results @@ -2756,7 +2773,7 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { // This filter creates a residual with day transform // Partition pruning narrows to June 15, but exact timestamp match // requires post-scan filtering - checkIcebergNativeScan( + checkIcebergNativeScanWithFilter( "SELECT * FROM test_cat.db.day_residual_test WHERE event_time = TIMESTAMP '2023-06-15 14:30:00'") // Verify correct results @@ -2810,7 +2827,7 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { // This filter creates a residual with hour transform // Partition pruning narrows to hour 14 (2pm), but exact timestamp // with seconds requires post-scan filtering - checkIcebergNativeScan( + checkIcebergNativeScanWithFilter( "SELECT * FROM test_cat.db.hour_residual_test WHERE event_time = TIMESTAMP '2023-06-15 14:30:45'") // Verify correct results From 9723a7048c5dc5762105ead4ad7728839eeb9bed Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 4 Apr 2026 14:10:32 +0530 Subject: [PATCH 6/8] test: add integration tests for non-identity transform residuals --- .../comet/CometIcebergNativeSuite.scala | 485 ++++++++++++++++++ 1 file changed, 485 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala index eb7ddcd12a..4f0fc576e5 100644 --- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala @@ -3170,6 +3170,144 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { } } + // 
=========================================================================
+  // Additional integration tests for non-identity transform residuals
+  // =========================================================================
+
+  // Test A: Non-identity transform with delete files - verify correctness.
+  // When delete files and non-identity transforms coexist, query results must
+  // be correct regardless of whether the native scan or Spark handles it.
+  // Uses truncate(3, name) so the query and deleted row share the same partition
+  // (truncate(3, 'alpha') = truncate(3, 'alpine') = 'alp').
+  test("non-identity transform residual - correct results with delete files present") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    withTempIcebergDir { warehouseDir =>
+      withSQLConf(
+        "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
+        "spark.sql.catalog.test_cat.type" -> "hadoop",
+        "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
+        CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
+        CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
+
+        val dataPath = s"${warehouseDir.getAbsolutePath}/nested_data"
+
+        // Write Parquet WITHOUT Iceberg (simulates pre-migration data)
+        // id is last so its leaf index is after all nested type leaves
+        spark
+          .sql("""
+            SELECT
+              named_struct('age', id * 10, 'score', id * 1.5) AS info,
+              array(id, id + 1) AS tags,
+              map('key', id) AS props,
+              id
+            FROM range(10)
+          """)
+          .write
+          .parquet(dataPath)
+
+        spark.sql("CREATE NAMESPACE IF NOT EXISTS test_cat.db")
+        spark.sql("""
+          CREATE TABLE test_cat.db.nested_migrate (
+            info STRUCT<age: BIGINT, score: DOUBLE>,
+            tags ARRAY<BIGINT>,
+            props MAP<STRING, BIGINT>,
+            id BIGINT
+          ) USING iceberg
+        """)
+
+        try {
+          val tableUtilClass = Class.forName("org.apache.iceberg.spark.SparkTableUtil")
+          val sparkCatalog = spark.sessionState.catalogManager
+            .catalog("test_cat")
+            .asInstanceOf[org.apache.iceberg.spark.SparkCatalog]
+          val ident =
+            org.apache.spark.sql.connector.catalog.Identifier.of(Array("db"), "nested_migrate")
+          val sparkTable = sparkCatalog
+            .loadTable(ident)
+            .asInstanceOf[org.apache.iceberg.spark.source.SparkTable]
+          val table = sparkTable.table()
+
+          val stagingDir = s"${warehouseDir.getAbsolutePath}/staging"
+          spark.sql(s"""CREATE TABLE parquet_temp USING parquet LOCATION '$dataPath'""")
+          val sourceIdent = new org.apache.spark.sql.catalyst.TableIdentifier("parquet_temp")
+
+          val importMethod = tableUtilClass.getMethod(
+            "importSparkTable",
+            classOf[org.apache.spark.sql.SparkSession],
+            classOf[org.apache.spark.sql.catalyst.TableIdentifier],
+            classOf[org.apache.iceberg.Table],
+            classOf[String])
+          importMethod.invoke(null, spark, sourceIdent, table, stagingDir)
+
+          // Select only flat columns to avoid Spark's Iceberg reader returning
+          // null for struct fields in migrated tables (separate Spark bug)
+          checkIcebergNativeScan("SELECT id FROM test_cat.db.nested_migrate ORDER BY id")
+
+          // Filter on root column with nested types in migrated table:
+          // Parquet files lack Iceberg field IDs, so iceberg-rust falls back to
+          // name mapping where column_map resolution was broken for nested types
+          checkIcebergNativeScan(
+            "SELECT id FROM test_cat.db.nested_migrate WHERE id > 5 ORDER BY id")
+
+          spark.sql("DROP TABLE test_cat.db.nested_migrate")
+          spark.sql("DROP TABLE parquet_temp")
+        } catch {
+          case _: ClassNotFoundException =>
+            cancel("SparkTableUtil not available")
+        }
+        spark.sql("""
+          CREATE TABLE test_cat.db.truncate_delete_test (
+            id INT,
+            name 
STRING, + value DOUBLE + ) USING iceberg + PARTITIONED BY (truncate(3, name)) + TBLPROPERTIES ( + 'format-version' = '2', + 'write.delete.mode' = 'merge-on-read', + 'write.merge.mode' = 'merge-on-read' + ) + """) + + spark.sql(""" + INSERT INTO test_cat.db.truncate_delete_test VALUES + (1, 'alpha', 10.0), (2, 'alpine', 20.0), (3, 'bravo', 30.0), + (4, 'bridge', 40.0), (5, 'charlie', 50.0), (6, 'cherry', 60.0) + """) + + // Delete 'alpine' which shares truncate(3)='alp' partition with 'alpha'. + spark.sql("DELETE FROM test_cat.db.truncate_delete_test WHERE name = 'alpine'") + + // Query for 'alpha' creates a residual on the truncate transform. + // The deleted row 'alpine' must not appear, and 'alpha' must be returned. + val query = + "SELECT * FROM test_cat.db.truncate_delete_test WHERE name = 'alpha' ORDER BY id" + val (_, cometPlan) = checkSparkAnswer(query) + + // Verify correct results: only 'alpha' returned, 'alpine' is deleted + val result = spark.sql(query).collect() + assert(result.length == 1, s"Expected 1 row, got ${result.length}") + assert(result(0).getInt(0) == 1, s"Expected id=1, got ${result(0).getInt(0)}") + assert(result(0).getString(1) == "alpha") + + // Verify 'alpine' is truly gone from broader query + val allAlpResult = spark + .sql("SELECT * FROM test_cat.db.truncate_delete_test ORDER BY id") + .collect() + assert( + allAlpResult.length == 5, + s"Expected 5 rows after delete, got ${allAlpResult.length}") + assert( + !allAlpResult.exists(_.getString(1) == "alpine"), + "Deleted row 'alpine' should not appear in results") + + spark.sql("DROP TABLE test_cat.db.truncate_delete_test") + } + } + } + test("task-level inputMetrics.bytesRead is populated for Iceberg native scan") { assume(icebergAvailable, "Iceberg not available in classpath") @@ -3268,4 +3406,351 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { } } } + + // Test B: Composite transforms - table partitioned by two non-identity transforms. + // Verifies native scan works when filtering on columns involved in multiple transforms. 
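+  // A rough sketch of why both predicates survive (illustrative, assumed
+  // residual shape): with spec [bucket(4, id), truncate(3, name)], the filter
+  // id = 3 AND name = 'bravo' prunes to partition (bucket(4, 3), 'bra'), but
+  // neither transform is invertible, so the scan must still re-check
+  // id = 3 AND name = 'bravo' on every surviving row.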
+ test("non-identity transform residual - composite bucket and truncate transforms") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE test_cat.db.composite_transform ( + id INT, + name STRING, + value DOUBLE + ) USING iceberg + PARTITIONED BY (bucket(4, id), truncate(3, name)) + """) + + spark.sql(""" + INSERT INTO test_cat.db.composite_transform VALUES + (1, 'alpha', 10.0), (2, 'alpha', 20.0), (3, 'bravo', 30.0), + (4, 'bravo', 40.0), (5, 'charlie', 50.0), (6, 'delta', 60.0), + (10, 'alpha', 70.0), (20, 'bravo', 80.0), (30, 'charlie', 90.0) + """) + + // Filter on both partition columns creates residuals for both transforms + checkIcebergNativeScanWithFilter( + "SELECT * FROM test_cat.db.composite_transform " + + "WHERE id = 3 AND name = 'bravo' ORDER BY id") + + // Verify correct results + val result = spark + .sql("SELECT * FROM test_cat.db.composite_transform " + + "WHERE id = 3 AND name = 'bravo'") + .collect() + assert(result.length == 1, s"Expected 1 row, got ${result.length}") + assert(result(0).getInt(0) == 3) + assert(result(0).getString(1) == "bravo") + + // Filter on only one of the two transform columns + checkIcebergNativeScanWithFilter( + "SELECT * FROM test_cat.db.composite_transform WHERE name = 'alpha' ORDER BY id") + + val result2 = spark + .sql("SELECT * FROM test_cat.db.composite_transform WHERE name = 'alpha'") + .collect() + assert(result2.length == 3, s"Expected 3 rows for 'alpha', got ${result2.length}") + + spark.sql("DROP TABLE test_cat.db.composite_transform") + } + } + } + + // Test C: Range filters on non-identity transforms. + // Range predicates (>, <, BETWEEN) produce different residual expression shapes + // compared to equality filters. 
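+  // e.g. bucket(4, id) scatters a range like 10 < id < 20 across all buckets,
+  // and days(event_time) maps every timestamp on 2023-06-15 to one partition;
+  // in both cases pruning cannot resolve the bounds, so they stay as residuals.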
+ test("non-identity transform residual - range filters on bucket and day transforms") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + // Range filter on bucket-partitioned int column + spark.sql(""" + CREATE TABLE test_cat.db.bucket_range ( + id INT, + value DOUBLE + ) USING iceberg + PARTITIONED BY (bucket(4, id)) + """) + + spark.sql(""" + INSERT INTO test_cat.db.bucket_range + SELECT id, CAST(id * 2.5 AS DOUBLE) as value + FROM range(100) + """) + + checkIcebergNativeScanWithFilter( + "SELECT * FROM test_cat.db.bucket_range WHERE id > 10 AND id < 20 ORDER BY id") + + val rangeResult = spark + .sql("SELECT * FROM test_cat.db.bucket_range WHERE id > 10 AND id < 20") + .collect() + assert( + rangeResult.length == 9, + s"Expected 9 rows for id in (11..19), got ${rangeResult.length}") + + spark.sql("DROP TABLE test_cat.db.bucket_range") + + // Range filter on day-partitioned timestamp column + spark.sql(""" + CREATE TABLE test_cat.db.day_range ( + id INT, + event_time TIMESTAMP, + data STRING + ) USING iceberg + PARTITIONED BY (days(event_time)) + """) + + spark.sql(""" + INSERT INTO test_cat.db.day_range VALUES + (1, TIMESTAMP '2023-06-15 08:00:00', 'a'), + (2, TIMESTAMP '2023-06-15 12:00:00', 'b'), + (3, TIMESTAMP '2023-06-15 18:00:00', 'c'), + (4, TIMESTAMP '2023-06-16 09:00:00', 'd'), + (5, TIMESTAMP '2023-06-16 15:00:00', 'e'), + (6, TIMESTAMP '2023-06-17 10:00:00', 'f') + """) + + // Range within a single day creates a residual because days() groups + // all timestamps in a day together + checkIcebergNativeScanWithFilter( + "SELECT * FROM test_cat.db.day_range " + + "WHERE event_time > TIMESTAMP '2023-06-15 10:00:00' " + + "AND event_time < TIMESTAMP '2023-06-15 20:00:00' ORDER BY id") + + val dayRangeResult = spark + .sql( + "SELECT * FROM test_cat.db.day_range " + + "WHERE event_time > TIMESTAMP '2023-06-15 10:00:00' " + + "AND event_time < TIMESTAMP '2023-06-15 20:00:00'") + .collect() + assert( + dayRangeResult.length == 2, + s"Expected 2 rows (ids 2,3), got ${dayRangeResult.length}") + + spark.sql("DROP TABLE test_cat.db.day_range") + } + } + } + + // Test D: Aggregates with non-identity transform residuals. + // Ensures the CometFilter + aggregation pipeline works end-to-end + // when post-scan filtering is required. 
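+  // Expected plan shape for these queries (roughly, under this suite's configs):
+  //   CometIcebergNativeScanExec -> CometFilterExec -> CometHashAggregateExec
+  // checkIcebergNativeScanWithFilter asserts the first two operators; the
+  // aggregate itself is validated by comparing results against Spark.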
+ test("non-identity transform residual - aggregates with bucket transform") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE test_cat.db.bucket_agg ( + id INT, + category STRING, + amount DOUBLE + ) USING iceberg + PARTITIONED BY (bucket(4, id)) + """) + + spark.sql(""" + INSERT INTO test_cat.db.bucket_agg VALUES + (1, 'A', 10.0), (1, 'A', 20.0), (1, 'B', 30.0), + (2, 'A', 40.0), (2, 'B', 50.0), + (3, 'A', 60.0), (3, 'C', 70.0), + (5, 'B', 80.0), (5, 'C', 90.0), + (10, 'A', 100.0) + """) + + // Aggregate with filter that creates a bucket residual + checkIcebergNativeScanWithFilter( + "SELECT COUNT(*), SUM(amount) FROM test_cat.db.bucket_agg WHERE id = 1") + + val aggResult = spark + .sql("SELECT COUNT(*), SUM(amount) FROM test_cat.db.bucket_agg WHERE id = 1") + .collect() + assert(aggResult(0).getLong(0) == 3, s"Expected count=3, got ${aggResult(0).getLong(0)}") + assert( + aggResult(0).getDouble(1) == 60.0, + s"Expected sum=60.0, got ${aggResult(0).getDouble(1)}") + + // GROUP BY with filter + checkIcebergNativeScanWithFilter( + "SELECT category, SUM(amount) FROM test_cat.db.bucket_agg " + + "WHERE id IN (1, 2) GROUP BY category ORDER BY category") + + val groupResult = spark + .sql("SELECT category, SUM(amount) FROM test_cat.db.bucket_agg " + + "WHERE id IN (1, 2) GROUP BY category ORDER BY category") + .collect() + assert(groupResult.length == 2, s"Expected 2 groups, got ${groupResult.length}") + // Category A: 10+20+40=70, Category B: 30+50=80 + assert(groupResult(0).getString(0) == "A") + assert(groupResult(0).getDouble(1) == 70.0) + assert(groupResult(1).getString(0) == "B") + assert(groupResult(1).getDouble(1) == 80.0) + + spark.sql("DROP TABLE test_cat.db.bucket_agg") + } + } + } + + // Test E: Compound predicates (OR/AND) with non-identity transform residuals. + // OR predicates across different truncate groups produce more complex residual shapes. 
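+  // e.g. name = 'alpha_1' OR name = 'bravo_1' prunes to the 'alpha' and 'bravo'
+  // partitions under truncate(5, name), and the full OR expression is retained
+  // as the residual for each matching file.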
+  test("non-identity transform residual - compound OR and AND predicates") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    withTempIcebergDir { warehouseDir =>
+      withSQLConf(
+        "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
+        "spark.sql.catalog.test_cat.type" -> "hadoop",
+        "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
+        CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
+        CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
+
+        spark.sql("""
+          CREATE TABLE test_cat.db.truncate_compound (
+            id INT,
+            name STRING,
+            value DOUBLE
+          ) USING iceberg
+          PARTITIONED BY (truncate(5, name))
+        """)
+
+        spark.sql("""
+          INSERT INTO test_cat.db.truncate_compound VALUES
+          (1, 'alpha_1', 10.0), (2, 'alpha_2', 20.0), (3, 'alpha_3', 30.0),
+          (4, 'bravo_1', 40.0), (5, 'bravo_2', 50.0),
+          (6, 'charlie_1', 60.0), (7, 'charlie_2', 70.0),
+          (8, 'delta_1', 80.0)
+        """)
+
+        // OR across different truncate groups
+        checkIcebergNativeScanWithFilter(
+          "SELECT * FROM test_cat.db.truncate_compound " +
+            "WHERE name = 'alpha_1' OR name = 'bravo_1' ORDER BY id")
+
+        val orResult = spark
+          .sql("SELECT * FROM test_cat.db.truncate_compound " +
+            "WHERE name = 'alpha_1' OR name = 'bravo_1'")
+          .collect()
+        assert(orResult.length == 2, s"Expected 2 rows for OR, got ${orResult.length}")
+        assert(orResult.map(_.getInt(0)).sorted.toSeq == Seq(1, 4))
+
+        // AND with mixed columns
+        checkIcebergNativeScanWithFilter(
+          "SELECT * FROM test_cat.db.truncate_compound " +
+            "WHERE name = 'alpha_2' AND value > 15.0 ORDER BY id")
+
+        val andResult = spark
+          .sql("SELECT * FROM test_cat.db.truncate_compound " +
+            "WHERE name = 'alpha_2' AND value > 15.0")
+          .collect()
+        assert(andResult.length == 1, s"Expected 1 row for AND, got ${andResult.length}")
+        assert(andResult(0).getInt(0) == 2)
+
+        // OR within the same truncate group
+        checkIcebergNativeScanWithFilter(
+          "SELECT * FROM test_cat.db.truncate_compound " +
+            "WHERE name = 'alpha_1' OR name = 'alpha_3' ORDER BY id")
+
+        val sameGroupResult = spark
+          .sql("SELECT * FROM test_cat.db.truncate_compound " +
+            "WHERE name = 'alpha_1' OR name = 'alpha_3'")
+          .collect()
+        assert(
+          sameGroupResult.length == 2,
+          s"Expected 2 rows for same-group OR, got ${sameGroupResult.length}")
+
+        spark.sql("DROP TABLE test_cat.db.truncate_compound")
+      }
+    }
+  }
+
+  // Test F: years() transform on DATE columns with exact and range filters.
+  // The earlier year-transform test covers the year() spelling with an
+  // exact-match filter; this verifies the years() spelling on a DATE column
+  // also produces correct residuals, for both exact and range predicates.
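+  // (Both spellings appear in this suite - year(event_date) earlier and
+  // years(event_date) here - and the Iceberg Spark catalog maps them to the
+  // same transform.)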
+ test("non-identity transform residual - year transform with date type") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE test_cat.db.year_date_residual ( + id INT, + event_date DATE, + data STRING + ) USING iceberg + PARTITIONED BY (years(event_date)) + """) + + spark.sql(""" + INSERT INTO test_cat.db.year_date_residual VALUES + (1, DATE '2022-03-15', 'spring_22'), + (2, DATE '2022-07-20', 'summer_22'), + (3, DATE '2022-11-05', 'fall_22'), + (4, DATE '2023-01-10', 'winter_23'), + (5, DATE '2023-06-25', 'summer_23'), + (6, DATE '2024-02-14', 'winter_24') + """) + + // Exact date match creates a residual because years() groups all dates + // in a year together + checkIcebergNativeScanWithFilter( + "SELECT * FROM test_cat.db.year_date_residual " + + "WHERE event_date = DATE '2022-07-20'") + + val exactResult = spark + .sql("SELECT * FROM test_cat.db.year_date_residual " + + "WHERE event_date = DATE '2022-07-20'") + .collect() + assert(exactResult.length == 1, s"Expected 1 row, got ${exactResult.length}") + assert(exactResult(0).getInt(0) == 2) + assert(exactResult(0).getString(2) == "summer_22") + + // Range within a year creates a residual + checkIcebergNativeScanWithFilter( + "SELECT * FROM test_cat.db.year_date_residual " + + "WHERE event_date >= DATE '2022-06-01' AND event_date <= DATE '2022-12-31' " + + "ORDER BY id") + + val rangeResult = spark + .sql("SELECT * FROM test_cat.db.year_date_residual " + + "WHERE event_date >= DATE '2022-06-01' AND event_date <= DATE '2022-12-31'") + .collect() + assert(rangeResult.length == 2, s"Expected 2 rows (ids 2,3), got ${rangeResult.length}") + assert(rangeResult.map(_.getInt(0)).sorted.toSeq == Seq(2, 3)) + + spark.sql("DROP TABLE test_cat.db.year_date_residual") + } + } + } } From 964287a09455777f73eb78267f5a854db7d68cb0 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 4 Apr 2026 17:19:01 +0530 Subject: [PATCH 7/8] refactor: remove dead nonIdentityTransform detection from CometScanRule --- .../apache/comet/rules/CometScanRule.scala | 82 +++---------------- 1 file changed, 13 insertions(+), 69 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index dd4da61511..18be445eca 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -547,42 +547,12 @@ case class CometScanRule(session: SparkSession) false } - // Check for transform functions in residual expressions - // Non-identity transforms (truncate, bucket, year, month, day, hour) in residuals - // are now supported - they skip row-group filtering and are handled - // post-scan by CometFilter. - // This is less optimal than row-group filtering but still allows native execution. 
- val transformFunctionsSupported = - try { - IcebergReflection.findNonIdentityTransformInResiduals(metadata.tasks) match { - case Some(transformType) => - val deleteFiles = IcebergReflection.getDeleteFiles(metadata.tasks) - - if (!deleteFiles.isEmpty) { - // Delete operations with non-identity transforms must fall back to Spark. - // convertIcebergExpression() cannot convert BoundTerm with transforms, - // causing residuals to be dropped. This breaks delete operations which - // rely on residuals to identify correct rows. - // Future: Convert transforms to Spark expressions (bucket→pmod, year→Year, etc) - fallbackReasons += - s"Iceberg transform '$transformType' with delete files present. " + - "Falling back to ensure correct delete operation." - false - } else { - // Read-only: Safe to continue natively with post-scan filtering - logInfo( - s"Iceberg residual contains transform '$transformType' - " + - "post-scan filtering will apply.") - true - } - case None => - true - } - } catch { - case e: Exception => - fallbackReasons += s"Could not check for transform functions: ${e.getMessage}" - false - } + // TODO: A safety guard for non-identity transforms (truncate, bucket, year, etc.) + // with delete files should be re-implemented using PartitionSpec inspection. + // The previous residual-based detection was ineffective because Iceberg residuals + // use NamedReference terms without transform metadata. Correctness is currently + // maintained by Iceberg/Spark's MOR delete processing and CometFilter post-scan + // filtering. See: https://github.com/apache/datafusion-comet/issues/XXXX // Check for unsupported struct types in delete files val deleteFileTypesSupported = { @@ -665,7 +635,7 @@ case class CometScanRule(session: SparkSession) if (schemaSupported && fileIOCompatible && formatVersionSupported && taskValidation.allParquet && allSupportedFilesystems && partitionTypesSupported && - complexTypePredicatesSupported && transformFunctionsSupported && + complexTypePredicatesSupported && deleteFileTypesSupported && dppSubqueriesSupported) { CometBatchScanExec( scanExec.clone().asInstanceOf[BatchScanExec], @@ -843,23 +813,19 @@ object CometScanRule extends Logging { val contentScanTaskClass = Class.forName(IcebergReflection.ClassNames.CONTENT_SCAN_TASK) val contentFileClass = Class.forName(IcebergReflection.ClassNames.CONTENT_FILE) val fileScanTaskClass = Class.forName(IcebergReflection.ClassNames.FILE_SCAN_TASK) - val unboundPredicateClass = Class.forName(IcebergReflection.ClassNames.UNBOUND_PREDICATE) // scalastyle:on classforname // Cache all method lookups outside the loop val fileMethod = contentScanTaskClass.getMethod("file") val formatMethod = contentFileClass.getMethod("format") val pathMethod = contentFileClass.getMethod("path") - val residualMethod = contentScanTaskClass.getMethod("residual") val deletesMethod = fileScanTaskClass.getMethod("deletes") - val termMethod = unboundPredicateClass.getMethod("term") val supportedSchemes = Set("file", "s3", "s3a", "gs", "gcs", "oss", "abfss", "abfs", "wasbs", "wasb") var allParquet = true val unsupportedSchemes = mutable.Set[String]() - var nonIdentityTransform: Option[String] = None val deleteFiles = new java.util.ArrayList[Any]() tasks.asScala.foreach { task => @@ -883,28 +849,11 @@ object CometScanRule extends Logging { case _: java.net.URISyntaxException => // ignore } - // Residual transform check (short-circuit if already found unsupported) - if (nonIdentityTransform.isEmpty && fileScanTaskClass.isInstance(task)) { - try { - val 
residual = residualMethod.invoke(task) - if (unboundPredicateClass.isInstance(residual)) { - val term = termMethod.invoke(residual) - try { - val transformMethod = term.getClass.getMethod("transform") - transformMethod.setAccessible(true) - val transform = transformMethod.invoke(term) - val transformStr = transform.toString - if (transformStr != IcebergReflection.Transforms.IDENTITY) { - nonIdentityTransform = Some(transformStr) - } - } catch { - case _: NoSuchMethodException => // No transform = simple reference, OK - } - } - } catch { - case _: Exception => // Skip tasks where we can't get residual - } - } + // TODO: Non-identity transform detection via residual inspection was removed + // because Iceberg residuals use NamedReference terms (not BoundTransform), + // so the transform() method is never available. To properly detect non-identity + // transforms, inspect the table's PartitionSpec instead of residual expressions. + // See: https://github.com/apache/datafusion-comet/issues/XXXX // Collect delete files and check their schemes if (fileScanTaskClass.isInstance(task)) { @@ -932,11 +881,7 @@ object CometScanRule extends Logging { } } - IcebergTaskValidationResult( - allParquet, - unsupportedSchemes.toSet, - nonIdentityTransform, - deleteFiles) + IcebergTaskValidationResult(allParquet, unsupportedSchemes.toSet, deleteFiles) } } @@ -946,5 +891,4 @@ object CometScanRule extends Logging { case class IcebergTaskValidationResult( allParquet: Boolean, unsupportedSchemes: Set[String], - nonIdentityTransform: Option[String], deleteFiles: java.util.List[_]) From 292548a0ab5a008630bedfe631e60f3c3adadb95 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 4 Apr 2026 17:30:10 +0530 Subject: [PATCH 8/8] feat: implement PartitionSpec-based non-identity transform detection for delete file safety --- .../comet/iceberg/IcebergReflection.scala | 38 +++++++++++++++++ .../apache/comet/rules/CometScanRule.scala | 41 +++++++++++++------ .../comet/CometIcebergNativeSuite.scala | 40 ++++++++++-------- 3 files changed, 89 insertions(+), 30 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala index 7c52f320cf..4e1466d9dc 100644 --- a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala +++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala @@ -538,6 +538,44 @@ object IcebergReflection extends Logging { * List of unsupported partition types (empty if all supported). Each entry is (fieldName, * typeStr, reason) */ + /** + * Checks whether a partition spec contains any non-identity transforms. + * + * Non-identity transforms include bucket, truncate, year, month, day, hour. These transforms + * mean that Iceberg's partition pruning cannot fully resolve equality filters on the source + * column, producing residual expressions that require post-scan filtering. 
+ * + * @param partitionSpec + * The Iceberg PartitionSpec object + * @return + * Some(transformStr) for the first non-identity transform found, or None if all are identity + */ + def findNonIdentityTransform(partitionSpec: Any): Option[String] = { + import scala.jdk.CollectionConverters._ + + try { + val fieldsMethod = partitionSpec.getClass.getMethod("fields") + val fields = fieldsMethod.invoke(partitionSpec).asInstanceOf[java.util.List[_]] + + val partitionFieldClass = loadClass(ClassNames.PARTITION_FIELD) + val transformMethod = partitionFieldClass.getMethod("transform") + + fields.asScala.foreach { field => + val transform = transformMethod.invoke(field) + val transformStr = transform.toString + if (transformStr != Transforms.IDENTITY) { + return Some(transformStr) + } + } + None + } catch { + case e: Exception => + logError( + s"Iceberg reflection failure: Failed to inspect partition transforms: ${e.getMessage}") + None + } + } + def validatePartitionTypes(partitionSpec: Any, schema: Any): List[(String, String, String)] = { import scala.jdk.CollectionConverters._ diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 18be445eca..8d9861a037 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -547,12 +547,33 @@ case class CometScanRule(session: SparkSession) false } - // TODO: A safety guard for non-identity transforms (truncate, bucket, year, etc.) - // with delete files should be re-implemented using PartitionSpec inspection. - // The previous residual-based detection was ineffective because Iceberg residuals - // use NamedReference terms without transform metadata. Correctness is currently - // maintained by Iceberg/Spark's MOR delete processing and CometFilter post-scan - // filtering. See: https://github.com/apache/datafusion-comet/issues/XXXX + // Safety guard: non-identity transforms with delete files must fall back. + // Non-identity transforms (truncate, bucket, year, etc.) produce residual + // expressions that require post-scan filtering. When delete files are also + // present, the native scan cannot correctly apply both the residual filter + // and delete file processing together, so we fall back to Spark. + // Detection uses PartitionSpec (table-level) rather than residual expressions, + // since Iceberg residuals use NamedReference terms without transform metadata. + val transformFunctionsSupported = + IcebergReflection.getPartitionSpec(metadata.table) match { + case Some(partitionSpec) => + IcebergReflection.findNonIdentityTransform(partitionSpec) match { + case Some(transformType) => + if (!taskValidation.deleteFiles.isEmpty) { + fallbackReasons += + s"Iceberg transform '$transformType' with delete files present. " + + "Falling back to ensure correct delete operation." 
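+              // Rationale (carried over from the earlier residual-based guard):
+              // residuals over transformed terms cannot be converted for the
+              // native scan, and delete processing relies on those residuals
+              // to identify the correct rows.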
+ false + } else { + logInfo( + s"Iceberg partition uses transform '$transformType' - " + + "post-scan filtering will apply via CometFilter.") + true + } + case None => true + } + case None => true // Cannot inspect spec, allow through + } // Check for unsupported struct types in delete files val deleteFileTypesSupported = { @@ -635,7 +656,7 @@ case class CometScanRule(session: SparkSession) if (schemaSupported && fileIOCompatible && formatVersionSupported && taskValidation.allParquet && allSupportedFilesystems && partitionTypesSupported && - complexTypePredicatesSupported && + complexTypePredicatesSupported && transformFunctionsSupported && deleteFileTypesSupported && dppSubqueriesSupported) { CometBatchScanExec( scanExec.clone().asInstanceOf[BatchScanExec], @@ -849,12 +870,6 @@ object CometScanRule extends Logging { case _: java.net.URISyntaxException => // ignore } - // TODO: Non-identity transform detection via residual inspection was removed - // because Iceberg residuals use NamedReference terms (not BoundTransform), - // so the transform() method is never available. To properly detect non-identity - // transforms, inspect the table's PartitionSpec instead of residual expressions. - // See: https://github.com/apache/datafusion-comet/issues/XXXX - // Collect delete files and check their schemes if (fileScanTaskClass.isInstance(task)) { try { diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala index 4f0fc576e5..07b1a1cca8 100644 --- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala @@ -3174,12 +3174,13 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { // Additional integration tests for non-identity transform residuals // ========================================================================= - // Test A: Non-identity transform with delete files - verify correctness. - // When delete files and non-identity transforms coexist, query results must - // be correct regardless of whether the native scan or Spark handles it. + // Test A: Non-identity transform with delete files must fall back to Spark. + // When the table has a non-identity partition transform (truncate) AND MOR delete + // files are present, the native Iceberg scan must fall back to Spark to ensure + // correct delete processing. Detection uses PartitionSpec inspection. // Uses truncate(3, name) so the query and deleted row share the same partition // (truncate(3, 'alpha') = truncate(3, 'alpine') = 'alp'). 
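+  // Detection walks the table's PartitionSpec via reflection: spec.fields() is
+  // inspected and each field's transform string is compared against "identity"
+  // (non-identity transforms render as strings like "truncate[3]" or "bucket[4]").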
- test("non-identity transform residual - correct results with delete files present") { + test("non-identity transform residual - falls back with delete files present") { assume(icebergAvailable, "Iceberg not available in classpath") withTempIcebergDir { warehouseDir => @@ -3258,7 +3259,7 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { cancel("SparkTableUtil not available") } spark.sql(""" - CREATE TABLE test_cat.db.truncate_delete_test ( + CREATE TABLE test_cat.db.truncate_delete_fallback ( id INT, name STRING, value DOUBLE @@ -3272,20 +3273,27 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { """) spark.sql(""" - INSERT INTO test_cat.db.truncate_delete_test VALUES + INSERT INTO test_cat.db.truncate_delete_fallback VALUES (1, 'alpha', 10.0), (2, 'alpine', 20.0), (3, 'bravo', 30.0), (4, 'bridge', 40.0), (5, 'charlie', 50.0), (6, 'cherry', 60.0) """) // Delete 'alpine' which shares truncate(3)='alp' partition with 'alpha'. - spark.sql("DELETE FROM test_cat.db.truncate_delete_test WHERE name = 'alpine'") + spark.sql("DELETE FROM test_cat.db.truncate_delete_fallback WHERE name = 'alpine'") - // Query for 'alpha' creates a residual on the truncate transform. - // The deleted row 'alpine' must not appear, and 'alpha' must be returned. + // Query with filter. Because the table has truncate transform AND delete files, + // native scan must fall back to Spark. val query = - "SELECT * FROM test_cat.db.truncate_delete_test WHERE name = 'alpha' ORDER BY id" + "SELECT * FROM test_cat.db.truncate_delete_fallback WHERE name = 'alpha' ORDER BY id" val (_, cometPlan) = checkSparkAnswer(query) + // Assert fallback: no CometIcebergNativeScanExec in plan + val icebergScans = collectIcebergNativeScans(cometPlan) + assert( + icebergScans.isEmpty, + "Expected fallback to Spark (no CometIcebergNativeScanExec) when " + + s"non-identity transform has delete files. Plan:\n$cometPlan") + // Verify correct results: only 'alpha' returned, 'alpine' is deleted val result = spark.sql(query).collect() assert(result.length == 1, s"Expected 1 row, got ${result.length}") @@ -3293,17 +3301,15 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { assert(result(0).getString(1) == "alpha") // Verify 'alpine' is truly gone from broader query - val allAlpResult = spark - .sql("SELECT * FROM test_cat.db.truncate_delete_test ORDER BY id") + val allResult = spark + .sql("SELECT * FROM test_cat.db.truncate_delete_fallback ORDER BY id") .collect() + assert(allResult.length == 5, s"Expected 5 rows after delete, got ${allResult.length}") assert( - allAlpResult.length == 5, - s"Expected 5 rows after delete, got ${allAlpResult.length}") - assert( - !allAlpResult.exists(_.getString(1) == "alpine"), + !allResult.exists(_.getString(1) == "alpine"), "Deleted row 'alpine' should not appear in results") - spark.sql("DROP TABLE test_cat.db.truncate_delete_test") + spark.sql("DROP TABLE test_cat.db.truncate_delete_fallback") } } }