Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,44 @@ object IcebergReflection extends Logging {
* List of unsupported partition types (empty if all supported). Each entry is (fieldName,
* typeStr, reason)
*/
/**
 * Checks whether a partition spec contains any non-identity transforms.
 *
 * Non-identity transforms include bucket, truncate, year, month, day, hour. These transforms
 * mean that Iceberg's partition pruning cannot fully resolve equality filters on the source
 * column, producing residual expressions that require post-scan filtering.
 *
 * Inspection is done reflectively (`PartitionSpec.fields()` / `PartitionField.transform()`)
 * so this module has no compile-time dependency on Iceberg.
 *
 * @param partitionSpec
 *   The Iceberg PartitionSpec object
 * @return
 *   Some(transformStr) for the first non-identity transform found, or None if all are identity
 *   or if reflective inspection fails (failure is logged and treated as "no transform found")
 */
def findNonIdentityTransform(partitionSpec: Any): Option[String] = {
  import scala.jdk.CollectionConverters._

  try {
    val fieldsMethod = partitionSpec.getClass.getMethod("fields")
    val fields = fieldsMethod.invoke(partitionSpec).asInstanceOf[java.util.List[_]]

    val partitionFieldClass = loadClass(ClassNames.PARTITION_FIELD)
    val transformMethod = partitionFieldClass.getMethod("transform")

    // Lazily map each field to its transform's string form and short-circuit on the
    // first non-identity one. This replaces a nonlocal `return` inside a lambda,
    // which is implemented by throwing NonLocalReturnControl and is deprecated in
    // Scala 3; `iterator ... find` expresses the same early exit directly.
    fields.asScala.iterator
      .map(field => transformMethod.invoke(field).toString)
      .find(_ != Transforms.IDENTITY)
  } catch {
    case e: Exception =>
      // Reflection mismatches (missing method, unexpected class shape) are not
      // fatal for planning: log and report "no non-identity transform".
      logError(
        s"Iceberg reflection failure: Failed to inspect partition transforms: ${e.getMessage}")
      None
  }
}

def validatePartitionTypes(partitionSpec: Any, schema: Any): List[(String, String, String)] = {
import scala.jdk.CollectionConverters._

Expand Down
74 changes: 28 additions & 46 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -552,19 +552,33 @@ case class CometScanRule(session: SparkSession)
false
}

// Check for unsupported transform functions in residual expressions
// iceberg-rust can only handle identity transforms in residuals; all other transforms
// (truncate, bucket, year, month, day, hour) must fall back to Spark
val transformFunctionsSupported = taskValidation.nonIdentityTransform match {
case Some(transformType) =>
fallbackReasons +=
s"Iceberg transform function '$transformType' in residual expression " +
"is not yet supported by iceberg-rust. " +
"Only identity transforms are supported."
false
case None =>
true
}
// Safety guard: non-identity transforms with delete files must fall back.
// Non-identity transforms (truncate, bucket, year, etc.) produce residual
// expressions that require post-scan filtering. When delete files are also
// present, the native scan cannot correctly apply both the residual filter
// and delete file processing together, so we fall back to Spark.
// Detection uses PartitionSpec (table-level) rather than residual expressions,
// since Iceberg residuals use NamedReference terms without transform metadata.
val transformFunctionsSupported =
IcebergReflection.getPartitionSpec(metadata.table) match {
case Some(partitionSpec) =>
IcebergReflection.findNonIdentityTransform(partitionSpec) match {
case Some(transformType) =>
if (!taskValidation.deleteFiles.isEmpty) {
fallbackReasons +=
s"Iceberg transform '$transformType' with delete files present. " +
"Falling back to ensure correct delete operation."
false
} else {
logInfo(
s"Iceberg partition uses transform '$transformType' - " +
"post-scan filtering will apply via CometFilter.")
true
}
case None => true
}
case None => true // Cannot inspect spec, allow through
}

// Check for unsupported struct types in delete files
val deleteFileTypesSupported = {
Expand Down Expand Up @@ -825,23 +839,19 @@ object CometScanRule extends Logging {
val contentScanTaskClass = Class.forName(IcebergReflection.ClassNames.CONTENT_SCAN_TASK)
val contentFileClass = Class.forName(IcebergReflection.ClassNames.CONTENT_FILE)
val fileScanTaskClass = Class.forName(IcebergReflection.ClassNames.FILE_SCAN_TASK)
val unboundPredicateClass = Class.forName(IcebergReflection.ClassNames.UNBOUND_PREDICATE)
// scalastyle:on classforname

// Cache all method lookups outside the loop
val fileMethod = contentScanTaskClass.getMethod("file")
val formatMethod = contentFileClass.getMethod("format")
val pathMethod = contentFileClass.getMethod("path")
val residualMethod = contentScanTaskClass.getMethod("residual")
val deletesMethod = fileScanTaskClass.getMethod("deletes")
val termMethod = unboundPredicateClass.getMethod("term")

val supportedSchemes =
Set("file", "s3", "s3a", "gs", "gcs", "oss", "abfss", "abfs", "wasbs", "wasb")

var allParquet = true
val unsupportedSchemes = mutable.Set[String]()
var nonIdentityTransform: Option[String] = None
val deleteFiles = new java.util.ArrayList[Any]()

tasks.asScala.foreach { task =>
Expand All @@ -865,29 +875,6 @@ object CometScanRule extends Logging {
case _: java.net.URISyntaxException => // ignore
}

// Residual transform check (short-circuit if already found unsupported)
if (nonIdentityTransform.isEmpty && fileScanTaskClass.isInstance(task)) {
try {
val residual = residualMethod.invoke(task)
if (unboundPredicateClass.isInstance(residual)) {
val term = termMethod.invoke(residual)
try {
val transformMethod = term.getClass.getMethod("transform")
transformMethod.setAccessible(true)
val transform = transformMethod.invoke(term)
val transformStr = transform.toString
if (transformStr != IcebergReflection.Transforms.IDENTITY) {
nonIdentityTransform = Some(transformStr)
}
} catch {
case _: NoSuchMethodException => // No transform = simple reference, OK
}
}
} catch {
case _: Exception => // Skip tasks where we can't get residual
}
}

// Collect delete files and check their schemes
if (fileScanTaskClass.isInstance(task)) {
try {
Expand All @@ -914,11 +901,7 @@ object CometScanRule extends Logging {
}
}

IcebergTaskValidationResult(
allParquet,
unsupportedSchemes.toSet,
nonIdentityTransform,
deleteFiles)
IcebergTaskValidationResult(allParquet, unsupportedSchemes.toSet, deleteFiles)
}
}

Expand All @@ -928,5 +911,4 @@ object CometScanRule extends Logging {
/**
 * Aggregated result of validating Iceberg scan tasks for native execution.
 *
 * @param allParquet
 *   Whether every data file across the inspected scan tasks reported Parquet format.
 * @param unsupportedSchemes
 *   File-system URI schemes seen on data/delete file paths that are outside the
 *   supported set (file, s3, s3a, gs, gcs, oss, abfss, abfs, wasbs, wasb).
 * @param nonIdentityTransform
 *   First non-identity transform string found while inspecting task residual
 *   expressions, if any; None when all observed transforms were identity.
 * @param deleteFiles
 *   Delete files collected from the file scan tasks, for downstream support checks.
 */
case class IcebergTaskValidationResult(
    allParquet: Boolean,
    unsupportedSchemes: Set[String],
    nonIdentityTransform: Option[String],
    deleteFiles: java.util.List[_])
Loading
Loading