diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index 968fe8cd69..85a1dea1c2 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -479,4 +479,50 @@ trait CommonStringExprs { None } } + + def minutesOfTimeToProto( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + val childOpt = expr.children.headOption.orElse { + withInfo(expr, "MinutesOfTime has no child expression") + None + } + + childOpt.flatMap { child => + val timeZoneId = { + val exprClass = expr.getClass + try { + val timeZoneIdMethod = exprClass.getMethod("timeZoneId") + timeZoneIdMethod.invoke(expr).asInstanceOf[Option[String]] + } catch { + case _: NoSuchMethodException => + try { + val timeZoneIdField = exprClass.getField("timeZoneId") + timeZoneIdField.get(expr).asInstanceOf[Option[String]] + } catch { + case _: NoSuchFieldException | _: SecurityException => None + } + } + } + + exprToProtoInternal(child, inputs, binding) + .map { childExpr => + val builder = ExprOuterClass.Minute.newBuilder() + builder.setChild(childExpr) + + val timeZone = timeZoneId.getOrElse("UTC") + builder.setTimezone(timeZone) + + ExprOuterClass.Expr + .newBuilder() + .setMinute(builder) + .build() + } + .orElse { + withInfo(expr, child) + None + } + } + } } diff --git a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala index f80a8909f6..666650c985 100644 --- a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.Sum import org.apache.comet.expressions.CometEvalMode import org.apache.comet.serde.CommonStringExprs import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} +import org.apache.comet.serde.QueryPlanSerde /** * `CometExprShim` acts as a shim for parsing expressions from different Spark versions. @@ -44,6 +45,9 @@ trait CometExprShim extends CommonStringExprs { // Right child is the encoding expression. stringDecode(expr, s.charset, s.bin, inputs, binding) + case _ if expr.getClass.getSimpleName == "MinutesOfTime" => + minutesOfTimeToProto(expr, inputs, binding) + case _ => None } } diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala index d3e3270700..5bfde393b3 100644 --- a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala @@ -87,6 +87,9 @@ trait CometExprShim extends CommonStringExprs { val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*) optExprWithInfo(optExpr, wb, wb.children: _*) + case _ if expr.getClass.getSimpleName == "MinutesOfTime" => + minutesOfTimeToProto(expr, inputs, binding) + case _ => None } } diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala index 3d5b34bfd2..58d5713297 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala @@ -169,6 +169,9 @@ trait CometExprShim extends CommonStringExprs { optExprWithInfo(mapSortExpr, ms, ms.child) } + case _ if expr.getClass.getSimpleName == "MinutesOfTime" => + minutesOfTimeToProto(expr, inputs, binding) + case _ => None } } diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 4fcaa550ae..59f419dfba 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -543,6 +543,23 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("MinutesOfTime expression support") { + // This test verifies that minute() function works correctly with timestamp columns. + // If Spark generates MinutesOfTime expression (a RuntimeReplaceable expression), + // it will be handled by the version-specific shim and converted to Minute proto. + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "part-r-0.parquet") + makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) + readParquetFile(path.toString) { df => + val query = df.select(expr("minute(_1)")) + + checkSparkAnswerAndOperator(query) + } + } + } + } + test("hour on int96 timestamp column") { import testImplicits._