From 1e31abb0b8b22b4d50a8c32385580ee1a2047e1a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 28 Apr 2026 16:00:27 -0600 Subject: [PATCH 01/13] refactor: add versionSpecific* map hooks to CometExprShim --- .../org/apache/comet/shims/CometExprShim.scala | 11 +++++++++-- .../org/apache/comet/shims/CometExprShim.scala | 11 +++++++++-- .../org/apache/comet/shims/CometExprShim.scala | 11 +++++++++-- .../org/apache/comet/shims/CometExprShim.scala | 11 +++++++++-- .../org/apache/comet/shims/CometExprShim.scala | 11 +++++++++-- 5 files changed, 45 insertions(+), 10 deletions(-) diff --git a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala index f80a8909f6..6a5211a78e 100644 --- a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.Sum import org.apache.comet.expressions.CometEvalMode -import org.apache.comet.serde.CommonStringExprs +import org.apache.comet.serde.{CometExpressionSerde, CommonStringExprs} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} /** @@ -33,7 +33,14 @@ trait CometExprShim extends CommonStringExprs { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) - protected def binaryOutputStyle: BinaryOutputStyle = BinaryOutputStyle.HEX_DISCRETE + def binaryOutputStyle: BinaryOutputStyle = BinaryOutputStyle.HEX_DISCRETE + + def versionSpecificStringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map.empty + def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map.empty + def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map.empty def versionSpecificExprToProtoInternal( expr: Expression, diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala index d3e3270700..3d0331deba 100644 --- a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.types.DataTypes import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible} +import org.apache.comet.serde.{CometExpressionSerde, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto} @@ -36,7 +36,14 @@ trait CometExprShim extends CommonStringExprs { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) - protected def binaryOutputStyle: BinaryOutputStyle = BinaryOutputStyle.HEX_DISCRETE + def binaryOutputStyle: BinaryOutputStyle = BinaryOutputStyle.HEX_DISCRETE + + def versionSpecificStringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map.empty + def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map.empty + def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map.empty def versionSpecificExprToProtoInternal( expr: Expression, diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala index 86b28b715b..379a66b6b3 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DataTypes import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible} +import org.apache.comet.serde.{CometExpressionSerde, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} @@ -40,7 +40,7 @@ trait CometExprShim extends CommonStringExprs { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) - protected def binaryOutputStyle: BinaryOutputStyle = { + def binaryOutputStyle: BinaryOutputStyle = { SQLConf.get .getConf(SQLConf.BINARY_OUTPUT_STYLE) .map(SQLConf.BinaryOutputStyle.withName) match { @@ -52,6 +52,13 @@ trait CometExprShim extends CommonStringExprs { } } + def versionSpecificStringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map.empty + def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map.empty + def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map.empty + def versionSpecificExprToProtoInternal( expr: Expression, inputs: Seq[Attribute], diff --git a/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala index 4f21e5eafa..3ad1033bde 100644 --- a/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DataTypes import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible} +import org.apache.comet.serde.{CometExpressionSerde, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} @@ -40,7 +40,7 @@ trait CometExprShim extends CommonStringExprs { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) - protected def binaryOutputStyle: BinaryOutputStyle = { + def binaryOutputStyle: BinaryOutputStyle = { // In Spark 4.1, BINARY_OUTPUT_STYLE is an enumConf so getConf already returns the enum value. SQLConf.get.getConf(SQLConf.BINARY_OUTPUT_STYLE) match { case Some(SQLConf.BinaryOutputStyle.UTF8) => BinaryOutputStyle.UTF8 @@ -51,6 +51,13 @@ trait CometExprShim extends CommonStringExprs { } } + def versionSpecificStringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map.empty + def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map.empty + def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map.empty + def versionSpecificExprToProtoInternal( expr: Expression, inputs: Seq[Attribute], diff --git a/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala index 4f21e5eafa..3ad1033bde 100644 --- a/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DataTypes import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible} +import org.apache.comet.serde.{CometExpressionSerde, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} @@ -40,7 +40,7 @@ trait CometExprShim extends CommonStringExprs { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) - protected def binaryOutputStyle: BinaryOutputStyle = { + def binaryOutputStyle: BinaryOutputStyle = { // In Spark 4.1, BINARY_OUTPUT_STYLE is an enumConf so getConf already returns the enum value. SQLConf.get.getConf(SQLConf.BINARY_OUTPUT_STYLE) match { case Some(SQLConf.BinaryOutputStyle.UTF8) => BinaryOutputStyle.UTF8 @@ -51,6 +51,13 @@ trait CometExprShim extends CommonStringExprs { } } + def versionSpecificStringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map.empty + def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map.empty + def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map.empty + def versionSpecificExprToProtoInternal( expr: Expression, inputs: Seq[Attribute], From 736abe1676b46a26755fbf7f6db8538fe0337e57 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 28 Apr 2026 16:10:48 -0600 Subject: [PATCH 02/13] refactor: merge versionSpecific* maps into QueryPlanSerde categories --- .../apache/comet/serde/QueryPlanSerde.scala | 118 ++++++++++-------- 1 file changed, 63 insertions(+), 55 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index c3dc6dcfd5..c7a49f542c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -89,44 +89,47 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { classOf[Not] -> CometNot, classOf[Or] -> CometOr) - private[comet] val mathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( - classOf[Acos] -> CometScalarFunction("acos"), - classOf[Add] -> CometAdd, - classOf[Asin] -> CometScalarFunction("asin"), - classOf[Atan] -> CometScalarFunction("atan"), - classOf[Atan2] -> CometAtan2, - classOf[Ceil] -> CometCeil, - classOf[Cos] -> CometScalarFunction("cos"), - classOf[Cosh] -> CometScalarFunction("cosh"), - classOf[Divide] -> CometDivide, - classOf[Exp] -> CometScalarFunction("exp"), - classOf[Expm1] -> CometScalarFunction("expm1"), - classOf[Floor] -> CometFloor, - classOf[Hex] -> CometHex, - classOf[IntegralDivide] -> CometIntegralDivide, - classOf[IsNaN] -> CometIsNaN, - classOf[Log] -> CometLog, - classOf[Log2] -> CometLog2, - classOf[Log10] -> CometLog10, - classOf[Logarithm] -> CometLogarithm, - classOf[Multiply] -> CometMultiply, - classOf[Pow] -> CometScalarFunction("pow"), - classOf[Rand] -> CometRand, - classOf[Randn] -> CometRandn, - classOf[Remainder] -> CometRemainder, - classOf[Round] -> CometRound, - classOf[Signum] -> CometScalarFunction("signum"), - classOf[Sin] -> CometScalarFunction("sin"), - classOf[Sinh] -> CometScalarFunction("sinh"), - classOf[Sqrt] -> CometScalarFunction("sqrt"), - classOf[Subtract] -> CometSubtract, - classOf[Tan] -> CometScalarFunction("tan"), - classOf[Tanh] -> CometScalarFunction("tanh"), - classOf[Cot] -> CometScalarFunction("cot"), - classOf[UnaryMinus] -> CometUnaryMinus, - classOf[Unhex] -> CometUnhex, - classOf[Abs] -> CometAbs, - classOf[Bin] -> CometScalarFunction("bin")) + private[comet] val mathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = { + val base: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( + classOf[Acos] -> CometScalarFunction("acos"), + classOf[Add] -> CometAdd, + classOf[Asin] -> CometScalarFunction("asin"), + classOf[Atan] -> CometScalarFunction("atan"), + classOf[Atan2] -> CometAtan2, + classOf[Ceil] -> CometCeil, + classOf[Cos] -> CometScalarFunction("cos"), + classOf[Cosh] -> CometScalarFunction("cosh"), + classOf[Divide] -> CometDivide, + classOf[Exp] -> CometScalarFunction("exp"), + classOf[Expm1] -> CometScalarFunction("expm1"), + classOf[Floor] -> CometFloor, + classOf[Hex] -> CometHex, + classOf[IntegralDivide] -> CometIntegralDivide, + classOf[IsNaN] -> CometIsNaN, + classOf[Log] -> CometLog, + classOf[Log2] -> CometLog2, + classOf[Log10] -> CometLog10, + classOf[Logarithm] -> CometLogarithm, + classOf[Multiply] -> CometMultiply, + classOf[Pow] -> CometScalarFunction("pow"), + classOf[Rand] -> CometRand, + classOf[Randn] -> CometRandn, + classOf[Remainder] -> CometRemainder, + classOf[Round] -> CometRound, + classOf[Signum] -> CometScalarFunction("signum"), + classOf[Sin] -> CometScalarFunction("sin"), + classOf[Sinh] -> CometScalarFunction("sinh"), + classOf[Sqrt] -> CometScalarFunction("sqrt"), + classOf[Subtract] -> CometSubtract, + classOf[Tan] -> CometScalarFunction("tan"), + classOf[Tanh] -> CometScalarFunction("tanh"), + classOf[Cot] -> CometScalarFunction("cot"), + classOf[UnaryMinus] -> CometUnaryMinus, + classOf[Unhex] -> CometUnhex, + classOf[Abs] -> CometAbs, + classOf[Bin] -> CometScalarFunction("bin")) + base ++ versionSpecificMathExpressions + } private[comet] val mapExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[GetMapValue] -> CometMapExtract, @@ -154,8 +157,8 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { classOf[XxHash64] -> CometXxHash64, classOf[Sha1] -> CometSha1) - private[comet] val stringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map( + private[comet] val stringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = { + val base: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[Ascii] -> CometScalarFunction("ascii"), classOf[BitLength] -> CometScalarFunction("bit_length"), classOf[Chr] -> CometScalarFunction("char"), @@ -189,6 +192,8 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { classOf[Right] -> CometRight, classOf[Substring] -> CometSubstring, classOf[Upper] -> CometUpper) + base ++ versionSpecificStringExpressions + } private val bitwiseExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[BitwiseAnd] -> CometBitwiseAnd, @@ -232,22 +237,25 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { private val conversionExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[Cast] -> CometCast) - private[comet] val miscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( + private[comet] val miscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = { // TODO PromotePrecision - classOf[Alias] -> CometAlias, - classOf[AttributeReference] -> CometAttributeReference, - classOf[BloomFilterMightContain] -> CometBloomFilterMightContain, - classOf[CheckOverflow] -> CometCheckOverflow, - classOf[Coalesce] -> CometCoalesce, - classOf[KnownFloatingPointNormalized] -> CometKnownFloatingPointNormalized, - classOf[Literal] -> CometLiteral, - classOf[MakeDecimal] -> CometMakeDecimal, - classOf[MonotonicallyIncreasingID] -> CometMonotonicallyIncreasingId, - classOf[ScalarSubquery] -> CometScalarSubquery, - classOf[SparkPartitionID] -> CometSparkPartitionId, - classOf[SortOrder] -> CometSortOrder, - classOf[StaticInvoke] -> CometStaticInvoke, - classOf[UnscaledValue] -> CometUnscaledValue) + val base: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( + classOf[Alias] -> CometAlias, + classOf[AttributeReference] -> CometAttributeReference, + classOf[BloomFilterMightContain] -> CometBloomFilterMightContain, + classOf[CheckOverflow] -> CometCheckOverflow, + classOf[Coalesce] -> CometCoalesce, + classOf[KnownFloatingPointNormalized] -> CometKnownFloatingPointNormalized, + classOf[Literal] -> CometLiteral, + classOf[MakeDecimal] -> CometMakeDecimal, + classOf[MonotonicallyIncreasingID] -> CometMonotonicallyIncreasingId, + classOf[ScalarSubquery] -> CometScalarSubquery, + classOf[SparkPartitionID] -> CometSparkPartitionId, + classOf[SortOrder] -> CometSortOrder, + classOf[StaticInvoke] -> CometStaticInvoke, + classOf[UnscaledValue] -> CometUnscaledValue) + base ++ versionSpecificMiscExpressions + } /** * Mapping of Spark expression class to Comet expression handler. From b10128bc00a162ac5c90419e7550db9fe92fc086 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 28 Apr 2026 16:14:48 -0600 Subject: [PATCH 03/13] docs: explain why QueryPlanSerde category maps wrap in a typed base val --- .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index c7a49f542c..05d83c1c24 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -90,6 +90,8 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { classOf[Or] -> CometOr) private[comet] val mathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = { + // Explicit type ascription on `base`: Scala 2.13 cannot infer the existential key type + // when `++` is applied directly to a `Map(...)` literal. val base: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[Acos] -> CometScalarFunction("acos"), classOf[Add] -> CometAdd, @@ -158,6 +160,8 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { classOf[Sha1] -> CometSha1) private[comet] val stringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = { + // Explicit type ascription on `base`: Scala 2.13 cannot infer the existential key type + // when `++` is applied directly to a `Map(...)` literal. val base: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[Ascii] -> CometScalarFunction("ascii"), classOf[BitLength] -> CometScalarFunction("bit_length"), @@ -239,6 +243,8 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { private[comet] val miscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = { // TODO PromotePrecision + // Explicit type ascription on `base`: Scala 2.13 cannot infer the existential key type + // when `++` is applied directly to a `Map(...)` literal. val base: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[Alias] -> CometAlias, classOf[AttributeReference] -> CometAttributeReference, From a914b12beeccefc900b6fbf4273a25ed91c944b8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 28 Apr 2026 16:25:17 -0600 Subject: [PATCH 04/13] refactor: migrate StringDecode to CometExpressionSerde framework Register CometStringDecode (shared under spark-3.x/) in the versionSpecificStringExpressions map for Spark 3.4 and 3.5, and remove the now-redundant case branch from versionSpecificExprToProtoInternal. --- .../apache/comet/shims/CometExprShim.scala | 14 ++--- .../apache/comet/shims/CometExprShim.scala | 8 +-- .../comet/serde/CometStringDecode.scala | 32 ++++++++++++ .../CometStringDecodeFrameworkSuite.scala | 51 +++++++++++++++++++ 4 files changed, 88 insertions(+), 17 deletions(-) create mode 100644 spark/src/main/spark-3.x/org/apache/comet/serde/CometStringDecode.scala create mode 100644 spark/src/test/spark-3.x/org/apache/comet/CometStringDecodeFrameworkSuite.scala diff --git a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala index 6a5211a78e..3ebcd09d4d 100644 --- a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.Sum import org.apache.comet.expressions.CometEvalMode -import org.apache.comet.serde.{CometExpressionSerde, CommonStringExprs} +import org.apache.comet.serde.{CometExpressionSerde, CometStringDecode, CommonStringExprs} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} /** @@ -36,7 +36,7 @@ trait CometExprShim extends CommonStringExprs { def binaryOutputStyle: BinaryOutputStyle = BinaryOutputStyle.HEX_DISCRETE def versionSpecificStringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map.empty + Map(classOf[StringDecode] -> CometStringDecode) def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map.empty def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = @@ -45,15 +45,7 @@ trait CometExprShim extends CommonStringExprs { def versionSpecificExprToProtoInternal( expr: Expression, inputs: Seq[Attribute], - binding: Boolean): Option[Expr] = { - expr match { - case s: StringDecode => - // Right child is the encoding expression. - stringDecode(expr, s.charset, s.bin, inputs, binding) - - case _ => None - } - } + binding: Boolean): Option[Expr] = None } object CometEvalModeUtil { diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala index 3d0331deba..a401eecf0a 100644 --- a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.types.DataTypes import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{CometExpressionSerde, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} +import org.apache.comet.serde.{CometExpressionSerde, CometStringDecode, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto} @@ -39,7 +39,7 @@ trait CometExprShim extends CommonStringExprs { def binaryOutputStyle: BinaryOutputStyle = BinaryOutputStyle.HEX_DISCRETE def versionSpecificStringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map.empty + Map(classOf[StringDecode] -> CometStringDecode) def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map.empty def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = @@ -50,10 +50,6 @@ trait CometExprShim extends CommonStringExprs { inputs: Seq[Attribute], binding: Boolean): Option[Expr] = { expr match { - case s: StringDecode => - // Right child is the encoding expression. - stringDecode(expr, s.charset, s.bin, inputs, binding) - case expr @ ToPrettyString(child, timeZoneId) => val castSupported = CometCast.isSupported( child.dataType, diff --git a/spark/src/main/spark-3.x/org/apache/comet/serde/CometStringDecode.scala b/spark/src/main/spark-3.x/org/apache/comet/serde/CometStringDecode.scala new file mode 100644 index 0000000000..8ac736d1e9 --- /dev/null +++ b/spark/src/main/spark-3.x/org/apache/comet/serde/CometStringDecode.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.serde + +import org.apache.spark.sql.catalyst.expressions.{Attribute, StringDecode} + +object CometStringDecode extends CometExpressionSerde[StringDecode] with CommonStringExprs { + + override def convert( + expr: StringDecode, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + stringDecode(expr, expr.charset, expr.bin, inputs, binding) + } +} diff --git a/spark/src/test/spark-3.x/org/apache/comet/CometStringDecodeFrameworkSuite.scala b/spark/src/test/spark-3.x/org/apache/comet/CometStringDecodeFrameworkSuite.scala new file mode 100644 index 0000000000..0ac5930d18 --- /dev/null +++ b/spark/src/test/spark-3.x/org/apache/comet/CometStringDecodeFrameworkSuite.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet + +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} + +import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus + +class CometStringDecodeFrameworkSuite extends CometTestBase { + + private def countSparkProjectExec(plan: SparkPlan): Int = + plan.collect { case _: ProjectExec => true }.length + + test("StringDecode honors spark.comet.expression.StringDecode.enabled") { + assume( + !isSpark40Plus, + "Spark 4.0+ rewrites decode() to StaticInvoke; that path is intentionally not " + + "registered through the framework (see issue #4077).") + withParquetTable(Seq(("hello".getBytes, 0)), "tbl") { + val query = "select decode(_1, 'utf-8') from tbl" + val (_, cometPlan) = checkSparkAnswerAndOperator(query) + assert(0 == countSparkProjectExec(cometPlan)) + + withSQLConf(CometConf.getExprEnabledConfigKey("StringDecode") -> "false") { + val (_, cometPlan2) = checkSparkAnswerAndFallbackReason( + query, + "Expression support is disabled. Set " + + "spark.comet.expression.StringDecode.enabled=true to enable it.") + assert(1 == countSparkProjectExec(cometPlan2)) + } + } + } +} From b11bc97c6601dd95e8dab72793c4207737833b5d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 28 Apr 2026 16:38:30 -0600 Subject: [PATCH 05/13] refactor: migrate WidthBucket (Spark 3.5) to CometExpressionSerde --- .../apache/comet/serde/CometWidthBucket.scala | 24 ++++++++++ .../apache/comet/shims/CometExprShim.scala | 11 ++--- .../CometWidthBucketFrameworkSuite.scala | 45 +++++++++++++++++++ 3 files changed, 72 insertions(+), 8 deletions(-) create mode 100644 spark/src/main/spark-3.5/org/apache/comet/serde/CometWidthBucket.scala create mode 100644 spark/src/test/spark-3.5/org/apache/comet/CometWidthBucketFrameworkSuite.scala diff --git a/spark/src/main/spark-3.5/org/apache/comet/serde/CometWidthBucket.scala b/spark/src/main/spark-3.5/org/apache/comet/serde/CometWidthBucket.scala new file mode 100644 index 0000000000..015731acc8 --- /dev/null +++ b/spark/src/main/spark-3.5/org/apache/comet/serde/CometWidthBucket.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.serde + +import org.apache.spark.sql.catalyst.expressions.WidthBucket + +object CometWidthBucket extends CometScalarFunction[WidthBucket]("width_bucket") diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala index a401eecf0a..6da7f45715 100644 --- a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala @@ -25,9 +25,9 @@ import org.apache.spark.sql.types.DataTypes import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{CometExpressionSerde, CometStringDecode, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} +import org.apache.comet.serde.{CometExpressionSerde, CometStringDecode, CometWidthBucket, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} -import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto} +import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal /** * `CometExprShim` acts as a shim for parsing expressions from different Spark versions. @@ -41,7 +41,7 @@ trait CometExprShim extends CommonStringExprs { def versionSpecificStringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(classOf[StringDecode] -> CometStringDecode) def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map.empty + Map(classOf[WidthBucket] -> CometWidthBucket) def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map.empty @@ -85,11 +85,6 @@ trait CometExprShim extends CommonStringExprs { None } - case wb: WidthBucket => - val childExprs = wb.children.map(exprToProtoInternal(_, inputs, binding)) - val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*) - optExprWithInfo(optExpr, wb, wb.children: _*) - case _ => None } } diff --git a/spark/src/test/spark-3.5/org/apache/comet/CometWidthBucketFrameworkSuite.scala b/spark/src/test/spark-3.5/org/apache/comet/CometWidthBucketFrameworkSuite.scala new file mode 100644 index 0000000000..264a4ac430 --- /dev/null +++ b/spark/src/test/spark-3.5/org/apache/comet/CometWidthBucketFrameworkSuite.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet + +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} + +class CometWidthBucketFrameworkSuite extends CometTestBase { + + private def countSparkProjectExec(plan: SparkPlan): Int = + plan.collect { case _: ProjectExec => true }.length + + test("WidthBucket honors spark.comet.expression.WidthBucket.enabled") { + withParquetTable(Seq((1.5, 0)), "tbl") { + val sql = "select width_bucket(_1, 0.0, 10.0, 5) from tbl" + val (_, cometPlan) = checkSparkAnswerAndOperator(sql) + assert(0 == countSparkProjectExec(cometPlan)) + + withSQLConf(CometConf.getExprEnabledConfigKey("WidthBucket") -> "false") { + val (_, cometPlan2) = checkSparkAnswerAndFallbackReason( + sql, + "Expression support is disabled. Set " + + "spark.comet.expression.WidthBucket.enabled=true to enable it.") + assert(1 == countSparkProjectExec(cometPlan2)) + } + } + } +} From f10412e29bf1bd7ff91a0b9bf14b7fbe817f318e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 28 Apr 2026 16:49:16 -0600 Subject: [PATCH 06/13] refactor: migrate ToPrettyString (Spark 3.5) to CometExpressionSerde --- .../comet/serde/CometToPrettyString.scala | 62 +++++++++++++++++++ .../apache/comet/shims/CometExprShim.scala | 50 ++------------- .../spark/sql/CometToPrettyStringSuite.scala | 30 +++++++++ 3 files changed, 96 insertions(+), 46 deletions(-) create mode 100644 spark/src/main/spark-3.5/org/apache/comet/serde/CometToPrettyString.scala diff --git a/spark/src/main/spark-3.5/org/apache/comet/serde/CometToPrettyString.scala b/spark/src/main/spark-3.5/org/apache/comet/serde/CometToPrettyString.scala new file mode 100644 index 0000000000..a640c23199 --- /dev/null +++ b/spark/src/main/spark-3.5/org/apache/comet/serde/CometToPrettyString.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.serde + +import org.apache.spark.sql.catalyst.expressions.{Attribute, ToPrettyString} +import org.apache.spark.sql.types.DataTypes + +import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.expressions.{CometCast, CometEvalMode} +import org.apache.comet.serde.ExprOuterClass.BinaryOutputStyle +import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal + +object CometToPrettyString extends CometExpressionSerde[ToPrettyString] { + + override def getSupportLevel(expr: ToPrettyString): SupportLevel = { + CometCast.isSupported( + expr.child.dataType, + DataTypes.StringType, + expr.timeZoneId, + CometEvalMode.TRY) match { + case Compatible(_) | Incompatible(_) => Compatible(None) + case Unsupported(reason) => + Unsupported(Some(s"Cast to string is unsupported: ${reason.getOrElse("")}")) + } + } + + override def convert( + expr: ToPrettyString, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + exprToProtoInternal(expr.child, inputs, binding) match { + case Some(p) => + val tps = ExprOuterClass.ToPrettyString + .newBuilder() + .setChild(p) + .setTimezone(expr.timeZoneId.getOrElse("UTC")) + .setBinaryOutputStyle(BinaryOutputStyle.HEX_DISCRETE) + .build() + Some(ExprOuterClass.Expr.newBuilder().setToPrettyString(tps).build()) + case _ => + withInfo(expr, expr.child) + None + } + } +} diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala index 6da7f45715..20903bbb7d 100644 --- a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala @@ -21,13 +21,10 @@ package org.apache.comet.shims import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.Sum -import org.apache.spark.sql.types.DataTypes -import org.apache.comet.CometSparkSessionExtensions.withInfo -import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{CometExpressionSerde, CometStringDecode, CometWidthBucket, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} +import org.apache.comet.expressions.CometEvalMode +import org.apache.comet.serde.{CometExpressionSerde, CometStringDecode, CometToPrettyString, CometWidthBucket, CommonStringExprs} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} -import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal /** * `CometExprShim` acts as a shim for parsing expressions from different Spark versions. @@ -43,51 +40,12 @@ trait CometExprShim extends CommonStringExprs { def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(classOf[WidthBucket] -> CometWidthBucket) def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map.empty + Map(classOf[ToPrettyString] -> CometToPrettyString) def versionSpecificExprToProtoInternal( expr: Expression, inputs: Seq[Attribute], - binding: Boolean): Option[Expr] = { - expr match { - case expr @ ToPrettyString(child, timeZoneId) => - val castSupported = CometCast.isSupported( - child.dataType, - DataTypes.StringType, - timeZoneId, - CometEvalMode.TRY) - - val isCastSupported = castSupported match { - case Compatible(_) => true - case Incompatible(_) => true - case _ => false - } - - if (isCastSupported) { - exprToProtoInternal(child, inputs, binding) match { - case Some(p) => - val toPrettyString = ExprOuterClass.ToPrettyString - .newBuilder() - .setChild(p) - .setTimezone(timeZoneId.getOrElse("UTC")) - .setBinaryOutputStyle(binaryOutputStyle) - .build() - Some( - ExprOuterClass.Expr - .newBuilder() - .setToPrettyString(toPrettyString) - .build()) - case _ => - withInfo(expr, child) - None - } - } else { - None - } - - case _ => None - } - } + binding: Boolean): Option[Expr] = None } object CometEvalModeUtil { diff --git a/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala b/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala index 5dd956116f..beb3721102 100644 --- a/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala +++ b/spark/src/test/spark-3.5/org/apache/spark/sql/CometToPrettyStringSuite.scala @@ -23,8 +23,10 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.{Alias, ToPrettyString} import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} import org.apache.spark.sql.types.DataTypes +import org.apache.comet.CometConf import org.apache.comet.CometFuzzTestBase import org.apache.comet.expressions.{CometCast, CometEvalMode} import org.apache.comet.serde.Compatible @@ -54,4 +56,32 @@ class CometToPrettyStringSuite extends CometFuzzTestBase { } } + test("ToPrettyString honors spark.comet.expression.ToPrettyString.enabled") { + def countSparkProjectExec(plan: SparkPlan): Int = + plan.collect { case _: ProjectExec => true }.length + + val df = spark.read.parquet(filename) + df.createOrReplaceTempView("t1") + val table = spark.sessionState.catalog.lookupRelation(TableIdentifier("t1")) + + // Pick a column whose cast to string is Compatible, so the baseline executes natively. + val col = df.schema.fields + .find(_.dataType == DataTypes.IntegerType) + .map(_.name) + .getOrElse(df.schema.fields.head.name) + val prettyExpr = Alias(ToPrettyString(UnresolvedAttribute(col)), s"pretty_$col")() + val plan = Project(Seq(prettyExpr), table) + val analyzed = spark.sessionState.analyzer.execute(plan) + + // Baseline: ToPrettyString converts natively, no Spark ProjectExec. + val baselinePlan = Dataset.ofRows(spark, analyzed).queryExecution.executedPlan + assert(countSparkProjectExec(baselinePlan) == 0) + + // With per-expression config disabled, expression falls back to Spark. + withSQLConf(CometConf.getExprEnabledConfigKey("ToPrettyString") -> "false") { + val disabledPlan = Dataset.ofRows(spark, analyzed).queryExecution.executedPlan + assert(countSparkProjectExec(disabledPlan) >= 1) + } + } + } From df3aa32e0beea07b430ff65529262a4b1d348cbd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 28 Apr 2026 16:53:18 -0600 Subject: [PATCH 07/13] refactor: use QueryPlanSerde.binaryOutputStyle in CometToPrettyString --- .../org/apache/comet/serde/CometToPrettyString.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/spark/src/main/spark-3.5/org/apache/comet/serde/CometToPrettyString.scala b/spark/src/main/spark-3.5/org/apache/comet/serde/CometToPrettyString.scala index a640c23199..22fc30f4dd 100644 --- a/spark/src/main/spark-3.5/org/apache/comet/serde/CometToPrettyString.scala +++ b/spark/src/main/spark-3.5/org/apache/comet/serde/CometToPrettyString.scala @@ -24,8 +24,7 @@ import org.apache.spark.sql.types.DataTypes import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.ExprOuterClass.BinaryOutputStyle -import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal +import org.apache.comet.serde.QueryPlanSerde.{binaryOutputStyle, exprToProtoInternal} object CometToPrettyString extends CometExpressionSerde[ToPrettyString] { @@ -51,7 +50,7 @@ object CometToPrettyString extends CometExpressionSerde[ToPrettyString] { .newBuilder() .setChild(p) .setTimezone(expr.timeZoneId.getOrElse("UTC")) - .setBinaryOutputStyle(BinaryOutputStyle.HEX_DISCRETE) + .setBinaryOutputStyle(binaryOutputStyle) .build() Some(ExprOuterClass.Expr.newBuilder().setToPrettyString(tps).build()) case _ => From 971ab916c36703faa2a3678e2e794990e5cb7f03 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 28 Apr 2026 17:41:53 -0600 Subject: [PATCH 08/13] refactor: migrate WidthBucket (Spark 4.x) to CometExpressionSerde --- .../apache/comet/shims/CometExprShim.scala | 11 ++--- .../apache/comet/shims/CometExprShim.scala | 11 ++--- .../apache/comet/shims/CometExprShim.scala | 11 ++--- .../apache/comet/serde/CometWidthBucket.scala | 24 ++++++++++ .../CometWidthBucketFrameworkSuite.scala | 45 +++++++++++++++++++ 5 files changed, 78 insertions(+), 24 deletions(-) create mode 100644 spark/src/main/spark-4.x/org/apache/comet/serde/CometWidthBucket.scala create mode 100644 spark/src/test/spark-4.x/org/apache/comet/CometWidthBucketFrameworkSuite.scala diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala index 379a66b6b3..25c9421420 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala @@ -29,9 +29,9 @@ import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DataTypes import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{CometExpressionSerde, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} +import org.apache.comet.serde.{CometExpressionSerde, CometWidthBucket, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} -import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} +import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProtoWithReturnType} /** * `CometExprShim` acts as a shim for parsing expressions from different Spark versions. @@ -55,7 +55,7 @@ trait CometExprShim extends CommonStringExprs { def versionSpecificStringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map.empty def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map.empty + Map(classOf[WidthBucket] -> CometWidthBucket) def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map.empty @@ -134,11 +134,6 @@ trait CometExprShim extends CommonStringExprs { None } - case wb: WidthBucket => - val childExprs = wb.children.map(exprToProtoInternal(_, inputs, binding)) - val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*) - optExprWithInfo(optExpr, wb, wb.children: _*) - // In Spark 4.0, StructsToJson is a RuntimeReplaceable whose replacement is // Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Reconstruct the // original StructsToJson and recurse so support-level checks apply. diff --git a/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala index 3ad1033bde..e2cb272a89 100644 --- a/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala @@ -29,9 +29,9 @@ import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DataTypes import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{CometExpressionSerde, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} +import org.apache.comet.serde.{CometExpressionSerde, CometWidthBucket, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} -import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} +import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProtoWithReturnType} /** * `CometExprShim` acts as a shim for parsing expressions from different Spark versions. @@ -54,7 +54,7 @@ trait CometExprShim extends CommonStringExprs { def versionSpecificStringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map.empty def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map.empty + Map(classOf[WidthBucket] -> CometWidthBucket) def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map.empty @@ -133,11 +133,6 @@ trait CometExprShim extends CommonStringExprs { None } - case wb: WidthBucket => - val childExprs = wb.children.map(exprToProtoInternal(_, inputs, binding)) - val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*) - optExprWithInfo(optExpr, wb, wb.children: _*) - // In Spark 4.0, StructsToJson is a RuntimeReplaceable whose replacement is // Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Reconstruct the // original StructsToJson and recurse so support-level checks apply. diff --git a/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala index 3ad1033bde..e2cb272a89 100644 --- a/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala @@ -29,9 +29,9 @@ import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DataTypes import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{CometExpressionSerde, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} +import org.apache.comet.serde.{CometExpressionSerde, CometWidthBucket, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} -import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} +import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProtoWithReturnType} /** * `CometExprShim` acts as a shim for parsing expressions from different Spark versions. @@ -54,7 +54,7 @@ trait CometExprShim extends CommonStringExprs { def versionSpecificStringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map.empty def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map.empty + Map(classOf[WidthBucket] -> CometWidthBucket) def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map.empty @@ -133,11 +133,6 @@ trait CometExprShim extends CommonStringExprs { None } - case wb: WidthBucket => - val childExprs = wb.children.map(exprToProtoInternal(_, inputs, binding)) - val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*) - optExprWithInfo(optExpr, wb, wb.children: _*) - // In Spark 4.0, StructsToJson is a RuntimeReplaceable whose replacement is // Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Reconstruct the // original StructsToJson and recurse so support-level checks apply. diff --git a/spark/src/main/spark-4.x/org/apache/comet/serde/CometWidthBucket.scala b/spark/src/main/spark-4.x/org/apache/comet/serde/CometWidthBucket.scala new file mode 100644 index 0000000000..015731acc8 --- /dev/null +++ b/spark/src/main/spark-4.x/org/apache/comet/serde/CometWidthBucket.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.serde + +import org.apache.spark.sql.catalyst.expressions.WidthBucket + +object CometWidthBucket extends CometScalarFunction[WidthBucket]("width_bucket") diff --git a/spark/src/test/spark-4.x/org/apache/comet/CometWidthBucketFrameworkSuite.scala b/spark/src/test/spark-4.x/org/apache/comet/CometWidthBucketFrameworkSuite.scala new file mode 100644 index 0000000000..264a4ac430 --- /dev/null +++ b/spark/src/test/spark-4.x/org/apache/comet/CometWidthBucketFrameworkSuite.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet + +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} + +class CometWidthBucketFrameworkSuite extends CometTestBase { + + private def countSparkProjectExec(plan: SparkPlan): Int = + plan.collect { case _: ProjectExec => true }.length + + test("WidthBucket honors spark.comet.expression.WidthBucket.enabled") { + withParquetTable(Seq((1.5, 0)), "tbl") { + val sql = "select width_bucket(_1, 0.0, 10.0, 5) from tbl" + val (_, cometPlan) = checkSparkAnswerAndOperator(sql) + assert(0 == countSparkProjectExec(cometPlan)) + + withSQLConf(CometConf.getExprEnabledConfigKey("WidthBucket") -> "false") { + val (_, cometPlan2) = checkSparkAnswerAndFallbackReason( + sql, + "Expression support is disabled. Set " + + "spark.comet.expression.WidthBucket.enabled=true to enable it.") + assert(1 == countSparkProjectExec(cometPlan2)) + } + } + } +} From aa8e356449ab43b60ecaee5bf68cdaf03c578940 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 28 Apr 2026 18:02:25 -0600 Subject: [PATCH 09/13] refactor: migrate ToPrettyString (Spark 4.x) to CometExpressionSerde --- .../apache/comet/shims/CometExprShim.scala | 44 ++----------- .../apache/comet/shims/CometExprShim.scala | 44 ++----------- .../apache/comet/shims/CometExprShim.scala | 44 ++----------- .../comet/serde/CometToPrettyString.scala | 61 +++++++++++++++++++ .../spark/sql/CometToPrettyStringSuite.scala | 31 ++++++++++ 5 files changed, 104 insertions(+), 120 deletions(-) create mode 100644 spark/src/main/spark-4.x/org/apache/comet/serde/CometToPrettyString.scala diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala index 25c9421420..3d403c0790 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala @@ -25,11 +25,10 @@ import org.apache.spark.sql.catalyst.expressions.json.StructsToJsonEvaluator import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.types.StringTypeWithCollation -import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DataTypes, StringType} +import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, StringType} -import org.apache.comet.CometSparkSessionExtensions.withInfo -import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{CometExpressionSerde, CometWidthBucket, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} +import org.apache.comet.expressions.CometEvalMode +import org.apache.comet.serde.{CometExpressionSerde, CometToPrettyString, CometWidthBucket, CommonStringExprs} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProtoWithReturnType} @@ -57,7 +56,7 @@ trait CometExprShim extends CommonStringExprs { def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(classOf[WidthBucket] -> CometWidthBucket) def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map.empty + Map(classOf[ToPrettyString] -> CometToPrettyString) def versionSpecificExprToProtoInternal( expr: Expression, @@ -99,41 +98,6 @@ trait CometExprShim extends CommonStringExprs { val Seq(bin, charset, _, _) = s.arguments stringDecode(expr, charset, bin, inputs, binding) - case expr @ ToPrettyString(child, timeZoneId) => - val castSupported = CometCast.isSupported( - child.dataType, - DataTypes.StringType, - timeZoneId, - CometEvalMode.TRY) - - val isCastSupported = castSupported match { - case Compatible(_) => true - case Incompatible(_) => true - case _ => false - } - - if (isCastSupported) { - exprToProtoInternal(child, inputs, binding) match { - case Some(p) => - val toPrettyString = ExprOuterClass.ToPrettyString - .newBuilder() - .setChild(p) - .setTimezone(timeZoneId.getOrElse("UTC")) - .setBinaryOutputStyle(binaryOutputStyle) - .build() - Some( - ExprOuterClass.Expr - .newBuilder() - .setToPrettyString(toPrettyString) - .build()) - case _ => - withInfo(expr, child) - None - } - } else { - None - } - // In Spark 4.0, StructsToJson is a RuntimeReplaceable whose replacement is // Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Reconstruct the // original StructsToJson and recurse so support-level checks apply. diff --git a/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala index e2cb272a89..716a8215f8 100644 --- a/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala @@ -25,11 +25,10 @@ import org.apache.spark.sql.catalyst.expressions.json.StructsToJsonEvaluator import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.types.StringTypeWithCollation -import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DataTypes, StringType} +import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, StringType} -import org.apache.comet.CometSparkSessionExtensions.withInfo -import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{CometExpressionSerde, CometWidthBucket, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} +import org.apache.comet.expressions.CometEvalMode +import org.apache.comet.serde.{CometExpressionSerde, CometToPrettyString, CometWidthBucket, CommonStringExprs} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProtoWithReturnType} @@ -56,7 +55,7 @@ trait CometExprShim extends CommonStringExprs { def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(classOf[WidthBucket] -> CometWidthBucket) def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map.empty + Map(classOf[ToPrettyString] -> CometToPrettyString) def versionSpecificExprToProtoInternal( expr: Expression, @@ -98,41 +97,6 @@ trait CometExprShim extends CommonStringExprs { val Seq(bin, charset, _, _) = s.arguments stringDecode(expr, charset, bin, inputs, binding) - case expr @ ToPrettyString(child, timeZoneId) => - val castSupported = CometCast.isSupported( - child.dataType, - DataTypes.StringType, - timeZoneId, - CometEvalMode.TRY) - - val isCastSupported = castSupported match { - case Compatible(_) => true - case Incompatible(_) => true - case _ => false - } - - if (isCastSupported) { - exprToProtoInternal(child, inputs, binding) match { - case Some(p) => - val toPrettyString = ExprOuterClass.ToPrettyString - .newBuilder() - .setChild(p) - .setTimezone(timeZoneId.getOrElse("UTC")) - .setBinaryOutputStyle(binaryOutputStyle) - .build() - Some( - ExprOuterClass.Expr - .newBuilder() - .setToPrettyString(toPrettyString) - .build()) - case _ => - withInfo(expr, child) - None - } - } else { - None - } - // In Spark 4.0, StructsToJson is a RuntimeReplaceable whose replacement is // Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Reconstruct the // original StructsToJson and recurse so support-level checks apply. diff --git a/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala index e2cb272a89..716a8215f8 100644 --- a/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala @@ -25,11 +25,10 @@ import org.apache.spark.sql.catalyst.expressions.json.StructsToJsonEvaluator import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.types.StringTypeWithCollation -import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DataTypes, StringType} +import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, StringType} -import org.apache.comet.CometSparkSessionExtensions.withInfo -import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{CometExpressionSerde, CometWidthBucket, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} +import org.apache.comet.expressions.CometEvalMode +import org.apache.comet.serde.{CometExpressionSerde, CometToPrettyString, CometWidthBucket, CommonStringExprs} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProtoWithReturnType} @@ -56,7 +55,7 @@ trait CometExprShim extends CommonStringExprs { def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(classOf[WidthBucket] -> CometWidthBucket) def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map.empty + Map(classOf[ToPrettyString] -> CometToPrettyString) def versionSpecificExprToProtoInternal( expr: Expression, @@ -98,41 +97,6 @@ trait CometExprShim extends CommonStringExprs { val Seq(bin, charset, _, _) = s.arguments stringDecode(expr, charset, bin, inputs, binding) - case expr @ ToPrettyString(child, timeZoneId) => - val castSupported = CometCast.isSupported( - child.dataType, - DataTypes.StringType, - timeZoneId, - CometEvalMode.TRY) - - val isCastSupported = castSupported match { - case Compatible(_) => true - case Incompatible(_) => true - case _ => false - } - - if (isCastSupported) { - exprToProtoInternal(child, inputs, binding) match { - case Some(p) => - val toPrettyString = ExprOuterClass.ToPrettyString - .newBuilder() - .setChild(p) - .setTimezone(timeZoneId.getOrElse("UTC")) - .setBinaryOutputStyle(binaryOutputStyle) - .build() - Some( - ExprOuterClass.Expr - .newBuilder() - .setToPrettyString(toPrettyString) - .build()) - case _ => - withInfo(expr, child) - None - } - } else { - None - } - // In Spark 4.0, StructsToJson is a RuntimeReplaceable whose replacement is // Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Reconstruct the // original StructsToJson and recurse so support-level checks apply. diff --git a/spark/src/main/spark-4.x/org/apache/comet/serde/CometToPrettyString.scala b/spark/src/main/spark-4.x/org/apache/comet/serde/CometToPrettyString.scala new file mode 100644 index 0000000000..22fc30f4dd --- /dev/null +++ b/spark/src/main/spark-4.x/org/apache/comet/serde/CometToPrettyString.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.serde + +import org.apache.spark.sql.catalyst.expressions.{Attribute, ToPrettyString} +import org.apache.spark.sql.types.DataTypes + +import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.expressions.{CometCast, CometEvalMode} +import org.apache.comet.serde.QueryPlanSerde.{binaryOutputStyle, exprToProtoInternal} + +object CometToPrettyString extends CometExpressionSerde[ToPrettyString] { + + override def getSupportLevel(expr: ToPrettyString): SupportLevel = { + CometCast.isSupported( + expr.child.dataType, + DataTypes.StringType, + expr.timeZoneId, + CometEvalMode.TRY) match { + case Compatible(_) | Incompatible(_) => Compatible(None) + case Unsupported(reason) => + Unsupported(Some(s"Cast to string is unsupported: ${reason.getOrElse("")}")) + } + } + + override def convert( + expr: ToPrettyString, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + exprToProtoInternal(expr.child, inputs, binding) match { + case Some(p) => + val tps = ExprOuterClass.ToPrettyString + .newBuilder() + .setChild(p) + .setTimezone(expr.timeZoneId.getOrElse("UTC")) + .setBinaryOutputStyle(binaryOutputStyle) + .build() + Some(ExprOuterClass.Expr.newBuilder().setToPrettyString(tps).build()) + case _ => + withInfo(expr, expr.child) + None + } + } +} diff --git a/spark/src/test/spark-4.x/org/apache/spark/sql/CometToPrettyStringSuite.scala b/spark/src/test/spark-4.x/org/apache/spark/sql/CometToPrettyStringSuite.scala index e7f1757bf6..80063d08e5 100644 --- a/spark/src/test/spark-4.x/org/apache/spark/sql/CometToPrettyStringSuite.scala +++ b/spark/src/test/spark-4.x/org/apache/spark/sql/CometToPrettyStringSuite.scala @@ -24,10 +24,12 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.{Alias, ToPrettyString} import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.classic.Dataset +import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.BinaryOutputStyle import org.apache.spark.sql.types.DataTypes +import org.apache.comet.CometConf import org.apache.comet.CometFuzzTestBase import org.apache.comet.expressions.{CometCast, CometEvalMode} import org.apache.comet.serde.Compatible @@ -65,4 +67,33 @@ class CometToPrettyStringSuite extends CometFuzzTestBase { } }) } + + test("ToPrettyString honors spark.comet.expression.ToPrettyString.enabled") { + def countSparkProjectExec(plan: SparkPlan): Int = + plan.collect { case _: ProjectExec => true }.length + + val df = spark.read.parquet(filename) + df.createOrReplaceTempView("t1") + val table = spark.sessionState.catalog.lookupRelation(TableIdentifier("t1")) + + // Pick a column whose cast to string is Compatible, so the baseline executes natively. + val col = df.schema.fields + .find(_.dataType == DataTypes.IntegerType) + .map(_.name) + .getOrElse(df.schema.fields.head.name) + val prettyExpr = Alias(ToPrettyString(UnresolvedAttribute(col)), s"pretty_$col")() + val plan = Project(Seq(prettyExpr), table) + val analyzed = spark.sessionState.analyzer.execute(plan) + + // Baseline: ToPrettyString converts natively, no Spark ProjectExec. + val baselinePlan = Dataset.ofRows(spark, analyzed).queryExecution.executedPlan + assert(countSparkProjectExec(baselinePlan) == 0) + + // With per-expression config disabled, expression falls back to Spark. + withSQLConf(CometConf.getExprEnabledConfigKey("ToPrettyString") -> "false") { + val disabledPlan = Dataset.ofRows(spark, analyzed).queryExecution.executedPlan + assert(countSparkProjectExec(disabledPlan) >= 1) + } + } + } From af2e8b6f80d12599ead57d743050909a06647031 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 28 Apr 2026 18:30:57 -0600 Subject: [PATCH 10/13] refactor: migrate MapSort to CometExpressionSerde and share Spark 4.x shim body --- .../apache/comet/serde/QueryPlanSerde.scala | 21 ++-- .../apache/comet/shims/CometExprShim.scala | 2 + .../apache/comet/shims/CometExprShim.scala | 2 + .../apache/comet/shims/CometExprShim.scala | 103 +---------------- .../apache/comet/shims/CometExprShim.scala | 103 +---------------- .../apache/comet/shims/CometExprShim.scala | 107 +----------------- .../org/apache/comet/serde/CometMapSort.scala | 66 +++++++++++ .../comet/shims/Spark4xCometExprShim.scala | 107 ++++++++++++++++++ 8 files changed, 201 insertions(+), 310 deletions(-) create mode 100644 spark/src/main/spark-4.x/org/apache/comet/serde/CometMapSort.scala create mode 100644 spark/src/main/spark-4.x/org/apache/comet/shims/Spark4xCometExprShim.scala diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 05d83c1c24..435726ee18 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -133,14 +133,19 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { base ++ versionSpecificMathExpressions } - private[comet] val mapExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( - classOf[GetMapValue] -> CometMapExtract, - classOf[MapKeys] -> CometMapKeys, - classOf[MapEntries] -> CometMapEntries, - classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays, - classOf[MapContainsKey] -> CometMapContainsKey, - classOf[MapFromEntries] -> CometMapFromEntries) + private[comet] val mapExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = { + // Explicit type ascription on `base`: Scala 2.13 cannot infer the existential key type + // when `++` is applied directly to a `Map(...)` literal. + val base: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( + classOf[GetMapValue] -> CometMapExtract, + classOf[MapKeys] -> CometMapKeys, + classOf[MapEntries] -> CometMapEntries, + classOf[MapValues] -> CometMapValues, + classOf[MapFromArrays] -> CometMapFromArrays, + classOf[MapContainsKey] -> CometMapContainsKey, + classOf[MapFromEntries] -> CometMapFromEntries) + base ++ versionSpecificMapExpressions + } private[comet] val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( diff --git a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala index 3ebcd09d4d..90dba93712 100644 --- a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala @@ -41,6 +41,8 @@ trait CometExprShim extends CommonStringExprs { Map.empty def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map.empty + def versionSpecificMapExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map.empty def versionSpecificExprToProtoInternal( expr: Expression, diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala index 20903bbb7d..4d0585bc19 100644 --- a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala @@ -41,6 +41,8 @@ trait CometExprShim extends CommonStringExprs { Map(classOf[WidthBucket] -> CometWidthBucket) def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(classOf[ToPrettyString] -> CometToPrettyString) + def versionSpecificMapExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map.empty def versionSpecificExprToProtoInternal( expr: Expression, diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala index bfcf0453b3..ca3c79930f 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala @@ -19,27 +19,17 @@ package org.apache.comet.shims -import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.EvalMode import org.apache.spark.sql.catalyst.expressions.aggregate.Sum -import org.apache.spark.sql.catalyst.expressions.json.StructsToJsonEvaluator -import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.types.StringTypeWithCollation -import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, MapType, StringType} -import org.apache.comet.CometConf -import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.CometEvalMode -import org.apache.comet.serde.{CometExpressionSerde, CometToPrettyString, CometWidthBucket, CommonStringExprs, SupportLevel} -import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} -import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProtoWithReturnType, supportedScalarSortElementType} +import org.apache.comet.serde.ExprOuterClass.BinaryOutputStyle /** * `CometExprShim` acts as a shim for parsing expressions from different Spark versions. */ -trait CometExprShim extends CommonStringExprs { - protected def evalMode(c: Cast): CometEvalMode.Value = - CometEvalModeUtil.fromSparkEvalMode(c.evalMode) +trait CometExprShim extends Spark4xCometExprShim { def binaryOutputStyle: BinaryOutputStyle = { SQLConf.get @@ -52,93 +42,6 @@ trait CometExprShim extends CommonStringExprs { case _ => BinaryOutputStyle.HEX_DISCRETE } } - - def versionSpecificStringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map.empty - def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map(classOf[WidthBucket] -> CometWidthBucket) - def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map(classOf[ToPrettyString] -> CometToPrettyString) - - def versionSpecificExprToProtoInternal( - expr: Expression, - inputs: Seq[Attribute], - binding: Boolean): Option[Expr] = { - expr match { - case knc: KnownNotContainsNull => - // On Spark 4.0, array_compact rewrites to KnownNotContainsNull(ArrayFilter(IsNotNull)). - // Strip the wrapper and serialize the inner ArrayFilter as spark_array_compact. - knc.child match { - case filter: ArrayFilter => - filter.function.children.headOption match { - case Some(_: IsNotNull) => - val arrayChild = filter.left - val elementType = arrayChild.dataType.asInstanceOf[ArrayType].elementType - val arrayExprProto = exprToProtoInternal(arrayChild, inputs, binding) - val returnType = ArrayType(elementType) - val scalarExpr = scalarFunctionExprToProtoWithReturnType( - "spark_array_compact", - returnType, - false, - arrayExprProto) - optExprWithInfo(scalarExpr, knc, arrayChild) - case _ => exprToProtoInternal(knc.child, inputs, binding) - } - case _ => exprToProtoInternal(knc.child, inputs, binding) - } - - case s: StaticInvoke - if s.staticObject == classOf[StringDecode] && - s.dataType.isInstanceOf[StringType] && - s.functionName == "decode" && - s.arguments.size == 4 && - s.inputTypes == Seq( - BinaryType, - StringTypeWithCollation(supportsTrimCollation = true), - BooleanType, - BooleanType) => - val Seq(bin, charset, _, _) = s.arguments - stringDecode(expr, charset, bin, inputs, binding) - - // In Spark 4.0, StructsToJson is a RuntimeReplaceable whose replacement is - // Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Reconstruct the - // original StructsToJson and recurse so support-level checks apply. - case i: Invoke => - (i.targetObject, i.functionName, i.arguments) match { - case (Literal(evaluator: StructsToJsonEvaluator, _), "evaluate", Seq(child)) => - exprToProtoInternal( - StructsToJson(evaluator.options, child, evaluator.timeZoneId), - inputs, - binding) - case _ => None - } - - case ms: MapSort => - val keyType = ms.dataType.asInstanceOf[MapType].keyType - if (!supportedScalarSortElementType(keyType)) { - withInfo(ms, s"MapSort on map with key type $keyType is not supported") - None - } else if (CometConf.COMET_EXEC_STRICT_FLOATING_POINT.get() && - SupportLevel.containsFloatingPoint(keyType)) { - withInfo( - ms, - "MapSort on floating-point key is not 100% compatible with Spark, and Comet is " + - s"running with ${CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key}=true. " + - s"${CometConf.COMPAT_GUIDE}") - None - } else { - val childExpr = exprToProtoInternal(ms.child, inputs, binding) - val mapSortExpr = scalarFunctionExprToProtoWithReturnType( - "map_sort", - ms.dataType, - failOnError = false, - childExpr) - optExprWithInfo(mapSortExpr, ms, ms.child) - } - - case _ => None - } - } } object CometEvalModeUtil { diff --git a/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala index a2d38faabe..c4f88f20c0 100644 --- a/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala @@ -19,27 +19,17 @@ package org.apache.comet.shims -import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.EvalMode import org.apache.spark.sql.catalyst.expressions.aggregate.Sum -import org.apache.spark.sql.catalyst.expressions.json.StructsToJsonEvaluator -import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.types.StringTypeWithCollation -import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, MapType, StringType} -import org.apache.comet.CometConf -import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.CometEvalMode -import org.apache.comet.serde.{CometExpressionSerde, CometToPrettyString, CometWidthBucket, CommonStringExprs, SupportLevel} -import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} -import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProtoWithReturnType, supportedScalarSortElementType} +import org.apache.comet.serde.ExprOuterClass.BinaryOutputStyle /** * `CometExprShim` acts as a shim for parsing expressions from different Spark versions. */ -trait CometExprShim extends CommonStringExprs { - protected def evalMode(c: Cast): CometEvalMode.Value = - CometEvalModeUtil.fromSparkEvalMode(c.evalMode) +trait CometExprShim extends Spark4xCometExprShim { def binaryOutputStyle: BinaryOutputStyle = { // In Spark 4.1, BINARY_OUTPUT_STYLE is an enumConf so getConf already returns the enum value. @@ -51,93 +41,6 @@ trait CometExprShim extends CommonStringExprs { case _ => BinaryOutputStyle.HEX_DISCRETE } } - - def versionSpecificStringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map.empty - def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map(classOf[WidthBucket] -> CometWidthBucket) - def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map(classOf[ToPrettyString] -> CometToPrettyString) - - def versionSpecificExprToProtoInternal( - expr: Expression, - inputs: Seq[Attribute], - binding: Boolean): Option[Expr] = { - expr match { - case knc: KnownNotContainsNull => - // On Spark 4.0, array_compact rewrites to KnownNotContainsNull(ArrayFilter(IsNotNull)). - // Strip the wrapper and serialize the inner ArrayFilter as spark_array_compact. - knc.child match { - case filter: ArrayFilter => - filter.function.children.headOption match { - case Some(_: IsNotNull) => - val arrayChild = filter.left - val elementType = arrayChild.dataType.asInstanceOf[ArrayType].elementType - val arrayExprProto = exprToProtoInternal(arrayChild, inputs, binding) - val returnType = ArrayType(elementType) - val scalarExpr = scalarFunctionExprToProtoWithReturnType( - "spark_array_compact", - returnType, - false, - arrayExprProto) - optExprWithInfo(scalarExpr, knc, arrayChild) - case _ => exprToProtoInternal(knc.child, inputs, binding) - } - case _ => exprToProtoInternal(knc.child, inputs, binding) - } - - case s: StaticInvoke - if s.staticObject == classOf[StringDecode] && - s.dataType.isInstanceOf[StringType] && - s.functionName == "decode" && - s.arguments.size == 4 && - s.inputTypes == Seq( - BinaryType, - StringTypeWithCollation(supportsTrimCollation = true), - BooleanType, - BooleanType) => - val Seq(bin, charset, _, _) = s.arguments - stringDecode(expr, charset, bin, inputs, binding) - - // In Spark 4.0, StructsToJson is a RuntimeReplaceable whose replacement is - // Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Reconstruct the - // original StructsToJson and recurse so support-level checks apply. - case i: Invoke => - (i.targetObject, i.functionName, i.arguments) match { - case (Literal(evaluator: StructsToJsonEvaluator, _), "evaluate", Seq(child)) => - exprToProtoInternal( - StructsToJson(evaluator.options, child, evaluator.timeZoneId), - inputs, - binding) - case _ => None - } - - case ms: MapSort => - val keyType = ms.dataType.asInstanceOf[MapType].keyType - if (!supportedScalarSortElementType(keyType)) { - withInfo(ms, s"MapSort on map with key type $keyType is not supported") - None - } else if (CometConf.COMET_EXEC_STRICT_FLOATING_POINT.get() && - SupportLevel.containsFloatingPoint(keyType)) { - withInfo( - ms, - "MapSort on floating-point key is not 100% compatible with Spark, and Comet is " + - s"running with ${CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key}=true. " + - s"${CometConf.COMPAT_GUIDE}") - None - } else { - val childExpr = exprToProtoInternal(ms.child, inputs, binding) - val mapSortExpr = scalarFunctionExprToProtoWithReturnType( - "map_sort", - ms.dataType, - failOnError = false, - childExpr) - optExprWithInfo(mapSortExpr, ms, ms.child) - } - - case _ => None - } - } } object CometEvalModeUtil { diff --git a/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala index a2d38faabe..0de8069bfd 100644 --- a/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala @@ -19,30 +19,20 @@ package org.apache.comet.shims -import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.EvalMode import org.apache.spark.sql.catalyst.expressions.aggregate.Sum -import org.apache.spark.sql.catalyst.expressions.json.StructsToJsonEvaluator -import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.types.StringTypeWithCollation -import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, MapType, StringType} -import org.apache.comet.CometConf -import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.CometEvalMode -import org.apache.comet.serde.{CometExpressionSerde, CometToPrettyString, CometWidthBucket, CommonStringExprs, SupportLevel} -import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} -import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProtoWithReturnType, supportedScalarSortElementType} +import org.apache.comet.serde.ExprOuterClass.BinaryOutputStyle /** * `CometExprShim` acts as a shim for parsing expressions from different Spark versions. */ -trait CometExprShim extends CommonStringExprs { - protected def evalMode(c: Cast): CometEvalMode.Value = - CometEvalModeUtil.fromSparkEvalMode(c.evalMode) +trait CometExprShim extends Spark4xCometExprShim { def binaryOutputStyle: BinaryOutputStyle = { - // In Spark 4.1, BINARY_OUTPUT_STYLE is an enumConf so getConf already returns the enum value. + // In Spark 4.2, BINARY_OUTPUT_STYLE is an enumConf so getConf already returns the enum value. SQLConf.get.getConf(SQLConf.BINARY_OUTPUT_STYLE) match { case Some(SQLConf.BinaryOutputStyle.UTF8) => BinaryOutputStyle.UTF8 case Some(SQLConf.BinaryOutputStyle.BASIC) => BinaryOutputStyle.BASIC @@ -51,93 +41,6 @@ trait CometExprShim extends CommonStringExprs { case _ => BinaryOutputStyle.HEX_DISCRETE } } - - def versionSpecificStringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map.empty - def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map(classOf[WidthBucket] -> CometWidthBucket) - def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map(classOf[ToPrettyString] -> CometToPrettyString) - - def versionSpecificExprToProtoInternal( - expr: Expression, - inputs: Seq[Attribute], - binding: Boolean): Option[Expr] = { - expr match { - case knc: KnownNotContainsNull => - // On Spark 4.0, array_compact rewrites to KnownNotContainsNull(ArrayFilter(IsNotNull)). - // Strip the wrapper and serialize the inner ArrayFilter as spark_array_compact. - knc.child match { - case filter: ArrayFilter => - filter.function.children.headOption match { - case Some(_: IsNotNull) => - val arrayChild = filter.left - val elementType = arrayChild.dataType.asInstanceOf[ArrayType].elementType - val arrayExprProto = exprToProtoInternal(arrayChild, inputs, binding) - val returnType = ArrayType(elementType) - val scalarExpr = scalarFunctionExprToProtoWithReturnType( - "spark_array_compact", - returnType, - false, - arrayExprProto) - optExprWithInfo(scalarExpr, knc, arrayChild) - case _ => exprToProtoInternal(knc.child, inputs, binding) - } - case _ => exprToProtoInternal(knc.child, inputs, binding) - } - - case s: StaticInvoke - if s.staticObject == classOf[StringDecode] && - s.dataType.isInstanceOf[StringType] && - s.functionName == "decode" && - s.arguments.size == 4 && - s.inputTypes == Seq( - BinaryType, - StringTypeWithCollation(supportsTrimCollation = true), - BooleanType, - BooleanType) => - val Seq(bin, charset, _, _) = s.arguments - stringDecode(expr, charset, bin, inputs, binding) - - // In Spark 4.0, StructsToJson is a RuntimeReplaceable whose replacement is - // Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Reconstruct the - // original StructsToJson and recurse so support-level checks apply. - case i: Invoke => - (i.targetObject, i.functionName, i.arguments) match { - case (Literal(evaluator: StructsToJsonEvaluator, _), "evaluate", Seq(child)) => - exprToProtoInternal( - StructsToJson(evaluator.options, child, evaluator.timeZoneId), - inputs, - binding) - case _ => None - } - - case ms: MapSort => - val keyType = ms.dataType.asInstanceOf[MapType].keyType - if (!supportedScalarSortElementType(keyType)) { - withInfo(ms, s"MapSort on map with key type $keyType is not supported") - None - } else if (CometConf.COMET_EXEC_STRICT_FLOATING_POINT.get() && - SupportLevel.containsFloatingPoint(keyType)) { - withInfo( - ms, - "MapSort on floating-point key is not 100% compatible with Spark, and Comet is " + - s"running with ${CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key}=true. " + - s"${CometConf.COMPAT_GUIDE}") - None - } else { - val childExpr = exprToProtoInternal(ms.child, inputs, binding) - val mapSortExpr = scalarFunctionExprToProtoWithReturnType( - "map_sort", - ms.dataType, - failOnError = false, - childExpr) - optExprWithInfo(mapSortExpr, ms, ms.child) - } - - case _ => None - } - } } object CometEvalModeUtil { @@ -147,6 +50,6 @@ object CometEvalModeUtil { case EvalMode.ANSI => CometEvalMode.ANSI } - // In Spark 4.1, Sum carries a NumericEvalContext rather than a direct EvalMode. + // In Spark 4.2, Sum carries a NumericEvalContext rather than a direct EvalMode. def sumEvalMode(s: Sum): EvalMode.Value = s.evalContext.evalMode } diff --git a/spark/src/main/spark-4.x/org/apache/comet/serde/CometMapSort.scala b/spark/src/main/spark-4.x/org/apache/comet/serde/CometMapSort.scala new file mode 100644 index 0000000000..bb3d235c97 --- /dev/null +++ b/spark/src/main/spark-4.x/org/apache/comet/serde/CometMapSort.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.serde + +import org.apache.spark.sql.catalyst.expressions.{Attribute, MapSort} +import org.apache.spark.sql.types.MapType + +import org.apache.comet.CometConf +import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProtoWithReturnType, supportedScalarSortElementType} + +object CometMapSort extends CometExpressionSerde[MapSort] { + + override def getIncompatibleReasons(): Seq[String] = + Seq( + "MapSort on floating-point keys is not 100% compatible with Spark when " + + s"`${CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key}=true`.") + + override def getUnsupportedReasons(): Seq[String] = + Seq("MapSort is unsupported for non-scalar key types (struct, array, map, etc.).") + + override def getSupportLevel(expr: MapSort): SupportLevel = { + val keyType = expr.dataType.asInstanceOf[MapType].keyType + if (!supportedScalarSortElementType(keyType)) { + Unsupported(Some(s"MapSort on map with key type $keyType is not supported")) + } else if (CometConf.COMET_EXEC_STRICT_FLOATING_POINT.get() && + SupportLevel.containsFloatingPoint(keyType)) { + Incompatible( + Some( + "MapSort on floating-point key is not 100% compatible with Spark, and Comet is " + + s"running with ${CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key}=true. " + + s"${CometConf.COMPAT_GUIDE}")) + } else { + Compatible(None) + } + } + + override def convert( + expr: MapSort, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val childExpr = exprToProtoInternal(expr.child, inputs, binding) + val mapSortExpr = scalarFunctionExprToProtoWithReturnType( + "map_sort", + expr.dataType, + failOnError = false, + childExpr) + optExprWithInfo(mapSortExpr, expr, expr.child) + } +} diff --git a/spark/src/main/spark-4.x/org/apache/comet/shims/Spark4xCometExprShim.scala b/spark/src/main/spark-4.x/org/apache/comet/shims/Spark4xCometExprShim.scala new file mode 100644 index 0000000000..2d2def9fc9 --- /dev/null +++ b/spark/src/main/spark-4.x/org/apache/comet/shims/Spark4xCometExprShim.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.shims + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.json.StructsToJsonEvaluator +import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke} +import org.apache.spark.sql.internal.types.StringTypeWithCollation +import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, StringType} + +import org.apache.comet.expressions.CometEvalMode +import org.apache.comet.serde.{CometExpressionSerde, CometMapSort, CometToPrettyString, CometWidthBucket, CommonStringExprs} +import org.apache.comet.serde.ExprOuterClass.Expr +import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProtoWithReturnType} + +/** + * Shared trait body for the Spark 4.x `CometExprShim` traits (4.0/4.1/4.2). Holds the parts that + * are identical across minor versions; per-version traits override only `binaryOutputStyle` and + * supply the matching `CometEvalModeUtil.sumEvalMode`. + */ +trait Spark4xCometExprShim extends CommonStringExprs { + protected def evalMode(c: Cast): CometEvalMode.Value = + CometEvalModeUtil.fromSparkEvalMode(c.evalMode) + + def versionSpecificStringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map.empty + def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map(classOf[WidthBucket] -> CometWidthBucket) + def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map(classOf[ToPrettyString] -> CometToPrettyString) + def versionSpecificMapExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + Map(classOf[MapSort] -> CometMapSort) + + def versionSpecificExprToProtoInternal( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + expr match { + case knc: KnownNotContainsNull => + // On Spark 4.0+, array_compact rewrites to KnownNotContainsNull(ArrayFilter(IsNotNull)). + // Strip the wrapper and serialize the inner ArrayFilter as spark_array_compact. + knc.child match { + case filter: ArrayFilter => + filter.function.children.headOption match { + case Some(_: IsNotNull) => + val arrayChild = filter.left + val elementType = arrayChild.dataType.asInstanceOf[ArrayType].elementType + val arrayExprProto = exprToProtoInternal(arrayChild, inputs, binding) + val returnType = ArrayType(elementType) + val scalarExpr = scalarFunctionExprToProtoWithReturnType( + "spark_array_compact", + returnType, + false, + arrayExprProto) + optExprWithInfo(scalarExpr, knc, arrayChild) + case _ => exprToProtoInternal(knc.child, inputs, binding) + } + case _ => exprToProtoInternal(knc.child, inputs, binding) + } + + case s: StaticInvoke + if s.staticObject == classOf[StringDecode] && + s.dataType.isInstanceOf[StringType] && + s.functionName == "decode" && + s.arguments.size == 4 && + s.inputTypes == Seq( + BinaryType, + StringTypeWithCollation(supportsTrimCollation = true), + BooleanType, + BooleanType) => + val Seq(bin, charset, _, _) = s.arguments + stringDecode(expr, charset, bin, inputs, binding) + + // On Spark 4.0+, StructsToJson is a RuntimeReplaceable whose replacement is + // Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Reconstruct the + // original StructsToJson and recurse so support-level checks apply. + case i: Invoke => + (i.targetObject, i.functionName, i.arguments) match { + case (Literal(evaluator: StructsToJsonEvaluator, _), "evaluate", Seq(child)) => + exprToProtoInternal( + StructsToJson(evaluator.options, child, evaluator.timeZoneId), + inputs, + binding) + case _ => None + } + + case _ => None + } + } +} From 7d7a6e0fcca95221a241c6e0edc68979f5fa9441 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 28 Apr 2026 19:18:36 -0600 Subject: [PATCH 11/13] ci: add framework test suites to PR workflows --- .github/workflows/pr_build_linux.yml | 6 +++++- .github/workflows/pr_build_macos.yml | 4 ++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index a8d925b1c2..888060b02e 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -382,6 +382,10 @@ jobs: value: | org.apache.spark.sql.CometToPrettyStringSuite org.apache.spark.sql.CometCollationSuite + org.apache.comet.CometWidthBucketFrameworkSuite + - name: "string-decode-framework" + value: | + org.apache.comet.CometStringDecodeFrameworkSuite fail-fast: false name: ${{ matrix.profile.name }}/${{ matrix.profile.scan_impl }} [${{ matrix.suite.name }}] runs-on: ${{ github.repository_owner == 'apache' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion-comet', github.run_id) || 'ubuntu-latest' }} @@ -422,7 +426,7 @@ jobs: uses: ./.github/actions/java-test with: artifact_name: ${{ matrix.profile.name }}-${{ matrix.suite.name }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }}-${{ matrix.profile.scan_impl }} - suites: ${{ matrix.suite.name == 'sql' && matrix.profile.name == 'Spark 3.4, JDK 11, Scala 2.12' && '' || matrix.suite.value }} + suites: ${{ ((matrix.suite.name == 'sql' && matrix.profile.name == 'Spark 3.4, JDK 11, Scala 2.12') || (matrix.suite.name == 'string-decode-framework' && (matrix.profile.name == 'Spark 4.0, JDK 17' || matrix.profile.name == 'Spark 4.1, JDK 17'))) && '' || matrix.suite.value }} maven_opts: ${{ matrix.profile.maven_opts }} scan_impl: ${{ matrix.profile.scan_impl }} upload-test-reports: true diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index d41bef47fe..32c53d0cd9 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -228,6 +228,10 @@ jobs: value: | org.apache.spark.sql.CometToPrettyStringSuite org.apache.spark.sql.CometCollationSuite + org.apache.comet.CometWidthBucketFrameworkSuite + # macOS workflow only runs Spark 4.x; the suite below exists only on Spark 3.x. + # Listed here so dev/ci/check-suites.py finds the class name. Not executed. + # org.apache.comet.CometStringDecodeFrameworkSuite fail-fast: false name: ${{ matrix.os }}/${{ matrix.profile.name }} [${{ matrix.suite.name }}] From a1dfd008bb231c4838fe3913c45a84717729d531 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 28 Apr 2026 19:39:06 -0600 Subject: [PATCH 12/13] fix(ci): move CometStringDecodeFrameworkSuite reference out of YAML literal block YAML literal blocks (`|`) do not strip `#` comments, so the previous inline comment lines were passed to the runner as bogus suite class names. Move the reference into a proper YAML comment outside the block so dev/ci/check-suites.py still finds the substring without polluting the suite list. --- .github/workflows/pr_build_macos.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 32c53d0cd9..f5513cfc2e 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -229,9 +229,10 @@ jobs: org.apache.spark.sql.CometToPrettyStringSuite org.apache.spark.sql.CometCollationSuite org.apache.comet.CometWidthBucketFrameworkSuite - # macOS workflow only runs Spark 4.x; the suite below exists only on Spark 3.x. - # Listed here so dev/ci/check-suites.py finds the class name. Not executed. - # org.apache.comet.CometStringDecodeFrameworkSuite + # The macOS workflow only runs Spark 4.x profiles. The suite + # `org.apache.comet.CometStringDecodeFrameworkSuite` exists only on Spark 3.x and is run + # on the Linux workflow. Referenced here in a YAML comment so dev/ci/check-suites.py + # finds the class name; not executed on macOS. fail-fast: false name: ${{ matrix.os }}/${{ matrix.profile.name }} [${{ matrix.suite.name }}] From 83ea3db68d4a276c96f36f2b037c7e8fc3fa6d52 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 29 Apr 2026 17:45:24 -0600 Subject: [PATCH 13/13] refactor: clarify shim API names per review feedback Rename versionSpecific* shim methods to sparkVersionSpecific* so the "Spark version" intent is explicit, and drop the redundant "Framework" suffix from the new test suites. --- .github/workflows/pr_build_linux.yml | 6 +++--- .github/workflows/pr_build_macos.yml | 4 ++-- .../scala/org/apache/comet/serde/QueryPlanSerde.scala | 10 +++++----- .../org/apache/comet/shims/CometExprShim.scala | 11 ++++++----- .../org/apache/comet/shims/CometExprShim.scala | 11 ++++++----- .../org/apache/comet/shims/Spark4xCometExprShim.scala | 11 ++++++----- ...ameworkSuite.scala => CometWidthBucketSuite.scala} | 2 +- ...meworkSuite.scala => CometStringDecodeSuite.scala} | 2 +- ...ameworkSuite.scala => CometWidthBucketSuite.scala} | 2 +- 9 files changed, 31 insertions(+), 28 deletions(-) rename spark/src/test/spark-3.5/org/apache/comet/{CometWidthBucketFrameworkSuite.scala => CometWidthBucketSuite.scala} (96%) rename spark/src/test/spark-3.x/org/apache/comet/{CometStringDecodeFrameworkSuite.scala => CometStringDecodeSuite.scala} (96%) rename spark/src/test/spark-4.x/org/apache/comet/{CometWidthBucketFrameworkSuite.scala => CometWidthBucketSuite.scala} (96%) diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 888060b02e..d01e9d3637 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -382,10 +382,10 @@ jobs: value: | org.apache.spark.sql.CometToPrettyStringSuite org.apache.spark.sql.CometCollationSuite - org.apache.comet.CometWidthBucketFrameworkSuite - - name: "string-decode-framework" + org.apache.comet.CometWidthBucketSuite + - name: "string-decode" value: | - org.apache.comet.CometStringDecodeFrameworkSuite + org.apache.comet.CometStringDecodeSuite fail-fast: false name: ${{ matrix.profile.name }}/${{ matrix.profile.scan_impl }} [${{ matrix.suite.name }}] runs-on: ${{ github.repository_owner == 'apache' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion-comet', github.run_id) || 'ubuntu-latest' }} diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index f5513cfc2e..20af2f310d 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -228,9 +228,9 @@ jobs: value: | org.apache.spark.sql.CometToPrettyStringSuite org.apache.spark.sql.CometCollationSuite - org.apache.comet.CometWidthBucketFrameworkSuite + org.apache.comet.CometWidthBucketSuite # The macOS workflow only runs Spark 4.x profiles. The suite - # `org.apache.comet.CometStringDecodeFrameworkSuite` exists only on Spark 3.x and is run + # `org.apache.comet.CometStringDecodeSuite` exists only on Spark 3.x and is run # on the Linux workflow. Referenced here in a YAML comment so dev/ci/check-suites.py # finds the class name; not executed on macOS. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 435726ee18..6eecf36305 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -130,7 +130,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { classOf[Unhex] -> CometUnhex, classOf[Abs] -> CometAbs, classOf[Bin] -> CometScalarFunction("bin")) - base ++ versionSpecificMathExpressions + base ++ sparkVersionSpecificMathExpressions } private[comet] val mapExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = { @@ -144,7 +144,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { classOf[MapFromArrays] -> CometMapFromArrays, classOf[MapContainsKey] -> CometMapContainsKey, classOf[MapFromEntries] -> CometMapFromEntries) - base ++ versionSpecificMapExpressions + base ++ sparkVersionSpecificMapExpressions } private[comet] val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = @@ -201,7 +201,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { classOf[Right] -> CometRight, classOf[Substring] -> CometSubstring, classOf[Upper] -> CometUpper) - base ++ versionSpecificStringExpressions + base ++ sparkVersionSpecificStringExpressions } private val bitwiseExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( @@ -265,7 +265,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { classOf[SortOrder] -> CometSortOrder, classOf[StaticInvoke] -> CometStaticInvoke, classOf[UnscaledValue] -> CometUnscaledValue) - base ++ versionSpecificMiscExpressions + base ++ sparkVersionSpecificMiscExpressions } /** @@ -668,7 +668,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { } } - versionSpecificExprToProtoInternal(expr, inputs, binding) + sparkVersionSpecificExprToProtoInternal(expr, inputs, binding) .orElse(expr match { case UnaryExpression(child) if expr.prettyName == "promote_precision" => diff --git a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala index 90dba93712..d3e678fe54 100644 --- a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala @@ -35,16 +35,17 @@ trait CometExprShim extends CommonStringExprs { def binaryOutputStyle: BinaryOutputStyle = BinaryOutputStyle.HEX_DISCRETE - def versionSpecificStringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + def sparkVersionSpecificStringExpressions + : Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(classOf[StringDecode] -> CometStringDecode) - def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + def sparkVersionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map.empty - def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + def sparkVersionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map.empty - def versionSpecificMapExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + def sparkVersionSpecificMapExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map.empty - def versionSpecificExprToProtoInternal( + def sparkVersionSpecificExprToProtoInternal( expr: Expression, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = None diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala index 4d0585bc19..464533b191 100644 --- a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala @@ -35,16 +35,17 @@ trait CometExprShim extends CommonStringExprs { def binaryOutputStyle: BinaryOutputStyle = BinaryOutputStyle.HEX_DISCRETE - def versionSpecificStringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + def sparkVersionSpecificStringExpressions + : Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(classOf[StringDecode] -> CometStringDecode) - def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + def sparkVersionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(classOf[WidthBucket] -> CometWidthBucket) - def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + def sparkVersionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(classOf[ToPrettyString] -> CometToPrettyString) - def versionSpecificMapExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + def sparkVersionSpecificMapExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map.empty - def versionSpecificExprToProtoInternal( + def sparkVersionSpecificExprToProtoInternal( expr: Expression, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = None diff --git a/spark/src/main/spark-4.x/org/apache/comet/shims/Spark4xCometExprShim.scala b/spark/src/main/spark-4.x/org/apache/comet/shims/Spark4xCometExprShim.scala index 2d2def9fc9..0e65d24b23 100644 --- a/spark/src/main/spark-4.x/org/apache/comet/shims/Spark4xCometExprShim.scala +++ b/spark/src/main/spark-4.x/org/apache/comet/shims/Spark4xCometExprShim.scala @@ -39,16 +39,17 @@ trait Spark4xCometExprShim extends CommonStringExprs { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) - def versionSpecificStringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + def sparkVersionSpecificStringExpressions + : Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map.empty - def versionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + def sparkVersionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(classOf[WidthBucket] -> CometWidthBucket) - def versionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + def sparkVersionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(classOf[ToPrettyString] -> CometToPrettyString) - def versionSpecificMapExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = + def sparkVersionSpecificMapExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(classOf[MapSort] -> CometMapSort) - def versionSpecificExprToProtoInternal( + def sparkVersionSpecificExprToProtoInternal( expr: Expression, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = { diff --git a/spark/src/test/spark-3.5/org/apache/comet/CometWidthBucketFrameworkSuite.scala b/spark/src/test/spark-3.5/org/apache/comet/CometWidthBucketSuite.scala similarity index 96% rename from spark/src/test/spark-3.5/org/apache/comet/CometWidthBucketFrameworkSuite.scala rename to spark/src/test/spark-3.5/org/apache/comet/CometWidthBucketSuite.scala index 264a4ac430..c2a0034a01 100644 --- a/spark/src/test/spark-3.5/org/apache/comet/CometWidthBucketFrameworkSuite.scala +++ b/spark/src/test/spark-3.5/org/apache/comet/CometWidthBucketSuite.scala @@ -22,7 +22,7 @@ package org.apache.comet import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} -class CometWidthBucketFrameworkSuite extends CometTestBase { +class CometWidthBucketSuite extends CometTestBase { private def countSparkProjectExec(plan: SparkPlan): Int = plan.collect { case _: ProjectExec => true }.length diff --git a/spark/src/test/spark-3.x/org/apache/comet/CometStringDecodeFrameworkSuite.scala b/spark/src/test/spark-3.x/org/apache/comet/CometStringDecodeSuite.scala similarity index 96% rename from spark/src/test/spark-3.x/org/apache/comet/CometStringDecodeFrameworkSuite.scala rename to spark/src/test/spark-3.x/org/apache/comet/CometStringDecodeSuite.scala index 0ac5930d18..40c238e656 100644 --- a/spark/src/test/spark-3.x/org/apache/comet/CometStringDecodeFrameworkSuite.scala +++ b/spark/src/test/spark-3.x/org/apache/comet/CometStringDecodeSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus -class CometStringDecodeFrameworkSuite extends CometTestBase { +class CometStringDecodeSuite extends CometTestBase { private def countSparkProjectExec(plan: SparkPlan): Int = plan.collect { case _: ProjectExec => true }.length diff --git a/spark/src/test/spark-4.x/org/apache/comet/CometWidthBucketFrameworkSuite.scala b/spark/src/test/spark-4.x/org/apache/comet/CometWidthBucketSuite.scala similarity index 96% rename from spark/src/test/spark-4.x/org/apache/comet/CometWidthBucketFrameworkSuite.scala rename to spark/src/test/spark-4.x/org/apache/comet/CometWidthBucketSuite.scala index 264a4ac430..c2a0034a01 100644 --- a/spark/src/test/spark-4.x/org/apache/comet/CometWidthBucketFrameworkSuite.scala +++ b/spark/src/test/spark-4.x/org/apache/comet/CometWidthBucketSuite.scala @@ -22,7 +22,7 @@ package org.apache.comet import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} -class CometWidthBucketFrameworkSuite extends CometTestBase { +class CometWidthBucketSuite extends CometTestBase { private def countSparkProjectExec(plan: SparkPlan): Int = plan.collect { case _: ProjectExec => true }.length