Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/pr_build_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,10 @@ jobs:
value: |
org.apache.spark.sql.CometToPrettyStringSuite
org.apache.spark.sql.CometCollationSuite
org.apache.comet.CometWidthBucketSuite
- name: "string-decode"
value: |
org.apache.comet.CometStringDecodeSuite
fail-fast: false
name: ${{ matrix.profile.name }}/${{ matrix.profile.scan_impl }} [${{ matrix.suite.name }}]
runs-on: ${{ github.repository_owner == 'apache' && format('runs-on={0},family=m8a+m7a+c8a,cpu=16,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion-comet', github.run_id) || 'ubuntu-latest' }}
Expand Down Expand Up @@ -422,7 +426,7 @@ jobs:
uses: ./.github/actions/java-test
with:
artifact_name: ${{ matrix.profile.name }}-${{ matrix.suite.name }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }}-${{ matrix.profile.scan_impl }}
suites: ${{ matrix.suite.name == 'sql' && matrix.profile.name == 'Spark 3.4, JDK 11, Scala 2.12' && '' || matrix.suite.value }}
suites: ${{ ((matrix.suite.name == 'sql' && matrix.profile.name == 'Spark 3.4, JDK 11, Scala 2.12') || (matrix.suite.name == 'string-decode' && (matrix.profile.name == 'Spark 4.0, JDK 17' || matrix.profile.name == 'Spark 4.1, JDK 17'))) && '' || matrix.suite.value }}
maven_opts: ${{ matrix.profile.maven_opts }}
scan_impl: ${{ matrix.profile.scan_impl }}
upload-test-reports: true
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/pr_build_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,11 @@ jobs:
value: |
org.apache.spark.sql.CometToPrettyStringSuite
org.apache.spark.sql.CometCollationSuite
org.apache.comet.CometWidthBucketSuite
# The macOS workflow only runs Spark 4.x profiles. The suite
# `org.apache.comet.CometStringDecodeSuite` exists only on Spark 3.x and is run
# on the Linux workflow. Referenced here in a YAML comment so dev/ci/check-suites.py
# finds the class name; not executed on macOS.

fail-fast: false
name: ${{ matrix.os }}/${{ matrix.profile.name }} [${{ matrix.suite.name }}]
Expand Down
149 changes: 84 additions & 65 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,53 +89,63 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
classOf[Not] -> CometNot,
classOf[Or] -> CometOr)

private[comet] val mathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
classOf[Acos] -> CometScalarFunction("acos"),
classOf[Add] -> CometAdd,
classOf[Asin] -> CometScalarFunction("asin"),
classOf[Atan] -> CometScalarFunction("atan"),
classOf[Atan2] -> CometAtan2,
classOf[Ceil] -> CometCeil,
classOf[Cos] -> CometScalarFunction("cos"),
classOf[Cosh] -> CometScalarFunction("cosh"),
classOf[Divide] -> CometDivide,
classOf[Exp] -> CometScalarFunction("exp"),
classOf[Expm1] -> CometScalarFunction("expm1"),
classOf[Floor] -> CometFloor,
classOf[Hex] -> CometHex,
classOf[IntegralDivide] -> CometIntegralDivide,
classOf[IsNaN] -> CometIsNaN,
classOf[Log] -> CometLog,
classOf[Log2] -> CometLog2,
classOf[Log10] -> CometLog10,
classOf[Logarithm] -> CometLogarithm,
classOf[Multiply] -> CometMultiply,
classOf[Pow] -> CometScalarFunction("pow"),
classOf[Rand] -> CometRand,
classOf[Randn] -> CometRandn,
classOf[Remainder] -> CometRemainder,
classOf[Round] -> CometRound,
classOf[Signum] -> CometScalarFunction("signum"),
classOf[Sin] -> CometScalarFunction("sin"),
classOf[Sinh] -> CometScalarFunction("sinh"),
classOf[Sqrt] -> CometScalarFunction("sqrt"),
classOf[Subtract] -> CometSubtract,
classOf[Tan] -> CometScalarFunction("tan"),
classOf[Tanh] -> CometScalarFunction("tanh"),
classOf[Cot] -> CometScalarFunction("cot"),
classOf[UnaryMinus] -> CometUnaryMinus,
classOf[Unhex] -> CometUnhex,
classOf[Abs] -> CometAbs,
classOf[Bin] -> CometScalarFunction("bin"))

private[comet] val mapExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
classOf[GetMapValue] -> CometMapExtract,
classOf[MapKeys] -> CometMapKeys,
classOf[MapEntries] -> CometMapEntries,
classOf[MapValues] -> CometMapValues,
classOf[MapFromArrays] -> CometMapFromArrays,
classOf[MapContainsKey] -> CometMapContainsKey,
classOf[MapFromEntries] -> CometMapFromEntries)
private[comet] val mathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = {
// Explicit type ascription on `base`: Scala 2.13 cannot infer the existential key type
// when `++` is applied directly to a `Map(...)` literal.
val base: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
classOf[Acos] -> CometScalarFunction("acos"),
classOf[Add] -> CometAdd,
classOf[Asin] -> CometScalarFunction("asin"),
classOf[Atan] -> CometScalarFunction("atan"),
classOf[Atan2] -> CometAtan2,
classOf[Ceil] -> CometCeil,
classOf[Cos] -> CometScalarFunction("cos"),
classOf[Cosh] -> CometScalarFunction("cosh"),
classOf[Divide] -> CometDivide,
classOf[Exp] -> CometScalarFunction("exp"),
classOf[Expm1] -> CometScalarFunction("expm1"),
classOf[Floor] -> CometFloor,
classOf[Hex] -> CometHex,
classOf[IntegralDivide] -> CometIntegralDivide,
classOf[IsNaN] -> CometIsNaN,
classOf[Log] -> CometLog,
classOf[Log2] -> CometLog2,
classOf[Log10] -> CometLog10,
classOf[Logarithm] -> CometLogarithm,
classOf[Multiply] -> CometMultiply,
classOf[Pow] -> CometScalarFunction("pow"),
classOf[Rand] -> CometRand,
classOf[Randn] -> CometRandn,
classOf[Remainder] -> CometRemainder,
classOf[Round] -> CometRound,
classOf[Signum] -> CometScalarFunction("signum"),
classOf[Sin] -> CometScalarFunction("sin"),
classOf[Sinh] -> CometScalarFunction("sinh"),
classOf[Sqrt] -> CometScalarFunction("sqrt"),
classOf[Subtract] -> CometSubtract,
classOf[Tan] -> CometScalarFunction("tan"),
classOf[Tanh] -> CometScalarFunction("tanh"),
classOf[Cot] -> CometScalarFunction("cot"),
classOf[UnaryMinus] -> CometUnaryMinus,
classOf[Unhex] -> CometUnhex,
classOf[Abs] -> CometAbs,
classOf[Bin] -> CometScalarFunction("bin"))
base ++ sparkVersionSpecificMathExpressions
}

private[comet] val mapExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = {
// Explicit type ascription on `base`: Scala 2.13 cannot infer the existential key type
// when `++` is applied directly to a `Map(...)` literal.
val base: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
classOf[GetMapValue] -> CometMapExtract,
classOf[MapKeys] -> CometMapKeys,
classOf[MapEntries] -> CometMapEntries,
classOf[MapValues] -> CometMapValues,
classOf[MapFromArrays] -> CometMapFromArrays,
classOf[MapContainsKey] -> CometMapContainsKey,
classOf[MapFromEntries] -> CometMapFromEntries)
base ++ sparkVersionSpecificMapExpressions
}

private[comet] val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] =
Map(
Expand All @@ -154,8 +164,10 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
classOf[XxHash64] -> CometXxHash64,
classOf[Sha1] -> CometSha1)

private[comet] val stringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] =
Map(
private[comet] val stringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = {
// Explicit type ascription on `base`: Scala 2.13 cannot infer the existential key type
// when `++` is applied directly to a `Map(...)` literal.
val base: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
classOf[Ascii] -> CometScalarFunction("ascii"),
classOf[BitLength] -> CometScalarFunction("bit_length"),
classOf[Chr] -> CometScalarFunction("char"),
Expand Down Expand Up @@ -189,6 +201,8 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
classOf[Right] -> CometRight,
classOf[Substring] -> CometSubstring,
classOf[Upper] -> CometUpper)
base ++ sparkVersionSpecificStringExpressions
}

private val bitwiseExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
classOf[BitwiseAnd] -> CometBitwiseAnd,
Expand Down Expand Up @@ -232,22 +246,27 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
private val conversionExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
classOf[Cast] -> CometCast)

private[comet] val miscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
private[comet] val miscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = {
// TODO PromotePrecision
classOf[Alias] -> CometAlias,
classOf[AttributeReference] -> CometAttributeReference,
classOf[BloomFilterMightContain] -> CometBloomFilterMightContain,
classOf[CheckOverflow] -> CometCheckOverflow,
classOf[Coalesce] -> CometCoalesce,
classOf[KnownFloatingPointNormalized] -> CometKnownFloatingPointNormalized,
classOf[Literal] -> CometLiteral,
classOf[MakeDecimal] -> CometMakeDecimal,
classOf[MonotonicallyIncreasingID] -> CometMonotonicallyIncreasingId,
classOf[ScalarSubquery] -> CometScalarSubquery,
classOf[SparkPartitionID] -> CometSparkPartitionId,
classOf[SortOrder] -> CometSortOrder,
classOf[StaticInvoke] -> CometStaticInvoke,
classOf[UnscaledValue] -> CometUnscaledValue)
// Explicit type ascription on `base`: Scala 2.13 cannot infer the existential key type
// when `++` is applied directly to a `Map(...)` literal.
val base: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
classOf[Alias] -> CometAlias,
classOf[AttributeReference] -> CometAttributeReference,
classOf[BloomFilterMightContain] -> CometBloomFilterMightContain,
classOf[CheckOverflow] -> CometCheckOverflow,
classOf[Coalesce] -> CometCoalesce,
classOf[KnownFloatingPointNormalized] -> CometKnownFloatingPointNormalized,
classOf[Literal] -> CometLiteral,
classOf[MakeDecimal] -> CometMakeDecimal,
classOf[MonotonicallyIncreasingID] -> CometMonotonicallyIncreasingId,
classOf[ScalarSubquery] -> CometScalarSubquery,
classOf[SparkPartitionID] -> CometSparkPartitionId,
classOf[SortOrder] -> CometSortOrder,
classOf[StaticInvoke] -> CometStaticInvoke,
classOf[UnscaledValue] -> CometUnscaledValue)
base ++ sparkVersionSpecificMiscExpressions
}

/**
* Mapping of Spark expression class to Comet expression handler.
Expand Down Expand Up @@ -649,7 +668,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
}
}

versionSpecificExprToProtoInternal(expr, inputs, binding)
sparkVersionSpecificExprToProtoInternal(expr, inputs, binding)
.orElse(expr match {

case UnaryExpression(child) if expr.prettyName == "promote_precision" =>
Expand Down
26 changes: 14 additions & 12 deletions spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.Sum

import org.apache.comet.expressions.CometEvalMode
import org.apache.comet.serde.CommonStringExprs
import org.apache.comet.serde.{CometExpressionSerde, CometStringDecode, CommonStringExprs}
import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr}

/**
Expand All @@ -33,20 +33,22 @@ trait CometExprShim extends CommonStringExprs {
protected def evalMode(c: Cast): CometEvalMode.Value =
CometEvalModeUtil.fromSparkEvalMode(c.evalMode)

protected def binaryOutputStyle: BinaryOutputStyle = BinaryOutputStyle.HEX_DISCRETE
def binaryOutputStyle: BinaryOutputStyle = BinaryOutputStyle.HEX_DISCRETE

def versionSpecificExprToProtoInternal(
def sparkVersionSpecificStringExpressions
: Map[Class[_ <: Expression], CometExpressionSerde[_]] =
Map(classOf[StringDecode] -> CometStringDecode)
def sparkVersionSpecificMathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] =
Map.empty
def sparkVersionSpecificMiscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] =
Map.empty
def sparkVersionSpecificMapExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] =
Map.empty

def sparkVersionSpecificExprToProtoInternal(
expr: Expression,
inputs: Seq[Attribute],
binding: Boolean): Option[Expr] = {
expr match {
case s: StringDecode =>
// Right child is the encoding expression.
stringDecode(expr, s.charset, s.bin, inputs, binding)

case _ => None
}
}
binding: Boolean): Option[Expr] = None
}

object CometEvalModeUtil {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.serde

import org.apache.spark.sql.catalyst.expressions.{Attribute, ToPrettyString}
import org.apache.spark.sql.types.DataTypes

import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.expressions.{CometCast, CometEvalMode}
import org.apache.comet.serde.QueryPlanSerde.{binaryOutputStyle, exprToProtoInternal}

object CometToPrettyString extends CometExpressionSerde[ToPrettyString] {

  /** Reason reported to users when this expression falls back to Spark. */
  override def getUnsupportedReasons(): Seq[String] =
    Seq("Falls back to Spark when the input type cannot be cast to string.")

  /**
   * Supported whenever the child's type can be cast to string (TRY eval mode).
   * Both `Compatible` and `Incompatible` cast results are reported as
   * `Compatible(None)` here; only a fully unsupported cast blocks the expression.
   */
  override def getSupportLevel(expr: ToPrettyString): SupportLevel = {
    val castSupport = CometCast.isSupported(
      expr.child.dataType,
      DataTypes.StringType,
      expr.timeZoneId,
      CometEvalMode.TRY)
    castSupport match {
      case Unsupported(reason) =>
        Unsupported(Some(s"Cast to string is unsupported: ${reason.getOrElse("")}"))
      case Compatible(_) | Incompatible(_) =>
        Compatible(None)
    }
  }

  /**
   * Serializes the child expression and wraps it in a `ToPrettyString` protobuf
   * node, carrying the expression's timezone (defaulting to "UTC") and the
   * version-specific binary output style. When the child cannot be serialized,
   * fallback info is attached to the expression and `None` is returned.
   */
  override def convert(
      expr: ToPrettyString,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    val childProto = exprToProtoInternal(expr.child, inputs, binding)
    childProto match {
      case Some(childExpr) =>
        val prettyNode = ExprOuterClass.ToPrettyString
          .newBuilder()
          .setChild(childExpr)
          .setTimezone(expr.timeZoneId.getOrElse("UTC"))
          .setBinaryOutputStyle(binaryOutputStyle)
          .build()
        Some(ExprOuterClass.Expr.newBuilder().setToPrettyString(prettyNode).build())
      case None =>
        withInfo(expr, expr.child)
        None
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.serde

import org.apache.spark.sql.catalyst.expressions.WidthBucket

/**
 * Serde mapping for Spark's `WidthBucket` expression, serialized via
 * [[CometScalarFunction]] as the native scalar function "width_bucket".
 */
object CometWidthBucket extends CometScalarFunction[WidthBucket]("width_bucket")
Loading