Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions spark/src/main/scala/org/apache/comet/serde/aggregates.scala
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,6 @@ object CometAverage extends CometAggregateExpressionSerde[Average] {

object CometSum extends CometAggregateExpressionSerde[Sum] {

override def getIncompatibleReasons(): Seq[String] = Seq("Falls back to Spark in ANSI mode.")

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We support ANSI mode, so there is no need to fall back to Spark.

override def convert(
aggExpr: AggregateExpression,
sum: Sum,
Expand Down
16 changes: 15 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/strings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProtoInter

object CometStringRepeat extends CometExpressionSerde[StringRepeat] {

override def getCompatibleNotes(): Seq[String] = Seq(
override def getIncompatibleReasons(): Seq[String] = Seq(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an approved PR to fix the behavior here

#4017

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR has now been merged

"A negative argument for the number of times to repeat throws an exception" +
" instead of returning an empty string as Spark does")

Expand All @@ -42,13 +42,27 @@ object CometStringRepeat extends CometExpressionSerde[StringRepeat] {
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
val children = expr.children

children(1) match {
case Literal(count, _) if isNegativeNumber(count) =>
withInfo(expr, "Negative repeat count is not supported")
return None
case _ =>
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This guards against a negative repeat count while still allowing positive counts through. `getIncompatibleReasons()` supplies the corresponding error message in the documentation.

}

val leftCast = Cast(children(0), StringType)
val rightCast = Cast(children(1), LongType)
val leftExpr = exprToProtoInternal(leftCast, inputs, binding)
val rightExpr = exprToProtoInternal(rightCast, inputs, binding)
val optExpr = scalarFunctionExprToProto("repeat", leftExpr, rightExpr)
optExprWithInfo(optExpr, expr, leftCast, rightCast)
}

/** Returns true when the given literal value is a negative integral number (Int or Long). */
private def isNegativeNumber(value: Any): Boolean =
  value match {
    case n: Int if n < 0 => true
    case n: Long if n < 0 => true
    case _ => false
  }
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this method so it can be used later; it can be removed if it turns out to be unnecessary.

}

class CometCaseConversionBase[T <: Expression](function: String)
Expand Down
44 changes: 30 additions & 14 deletions spark/src/main/scala/org/apache/comet/serde/unixtime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,27 @@ import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithIn
// https://github.com/apache/datafusion/issues/16594
object CometFromUnixTime extends CometExpressionSerde[FromUnixTime] {

override def getUnsupportedReasons(): Seq[String] = Seq(
"Only supports the default datetime format pattern `yyyy-MM-dd HH:mm:ss`")

override def getIncompatibleReasons(): Seq[String] = Seq(
"Only supports the default datetime format pattern `yyyy-MM-dd HH:mm:ss`." +
" DataFusion's valid timestamp range differs from Spark" +
"DataFusion's valid timestamp range differs from Spark" +
" (https://github.com/apache/datafusion/issues/16594)")

override def getSupportLevel(expr: FromUnixTime): SupportLevel = Incompatible(None)
/**
 * Determines the support level for `from_unixtime` based on its format argument.
 *
 * Only the default Spark datetime pattern (`yyyy-MM-dd HH:mm:ss`) is handled natively,
 * and even then only as Incompatible because DataFusion's valid timestamp range differs
 * from Spark's (https://github.com/apache/datafusion/issues/16594). Any other pattern,
 * a null format literal, or a non-literal format expression falls back to Spark with a
 * pattern-specific reason where one can be given.
 */
override def getSupportLevel(expr: FromUnixTime): SupportLevel = {
  expr.format match {
    // Guard against fmt == null: Literal(null, StringType) would NPE on fmt.toString.
    case Literal(fmt, _) if fmt != null =>
      val formatStr = fmt.toString
      val defaultPattern = TimestampFormatter.defaultPattern
      if (formatStr == defaultPattern) {
        // Supported pattern, but still incompatible due to the timestamp-range difference.
        Incompatible(None)
      } else {
        // Non-default pattern: report the exact pattern so the fallback reason is actionable.
        Unsupported(Some(s"Datetime pattern format: $formatStr is unsupported"))
      }
    case _ =>
      // Null literal or non-literal format expression: no specific pattern to report.
      Unsupported(Some("Datetime pattern format is unsupported"))
  }
}

override def convert(
expr: FromUnixTime,
Expand All @@ -48,17 +63,18 @@ object CometFromUnixTime extends CometExpressionSerde[FromUnixTime] {
val formatExpr = exprToProtoInternal(Literal("%Y-%m-%d %H:%M:%S"), inputs, binding)
val timeZone = exprToProtoInternal(Literal(expr.timeZoneId.orNull), inputs, binding)

if (expr.format != Literal(TimestampFormatter.defaultPattern)) {
withInfo(expr, "Datetime pattern format is unsupported")
None
} else if (secExpr.isDefined && formatExpr.isDefined) {
val timestampExpr =
scalarFunctionExprToProto("from_unixtime", Seq(secExpr, timeZone): _*)
val optExpr = scalarFunctionExprToProto("to_char", Seq(timestampExpr, formatExpr): _*)
optExprWithInfo(optExpr, expr, expr.sec, expr.format)
} else {
withInfo(expr, expr.sec, expr.format)
None
expr.format match {
case Literal(fmt, _) if fmt.toString != TimestampFormatter.defaultPattern =>
withInfo(expr, "Datetime pattern format is unsupported")
None
case _ if secExpr.isDefined && formatExpr.isDefined =>
val timestampExpr =
scalarFunctionExprToProto("from_unixtime", Seq(secExpr, timeZone): _*)
val optExpr = scalarFunctionExprToProto("to_char", Seq(timestampExpr, formatExpr): _*)
optExprWithInfo(optExpr, expr, expr.sec, expr.format)
case _ =>
withInfo(expr, expr.sec, expr.format)
None
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ INSERT INTO test_from_unix_time VALUES (0), (1718451045), (-1), (NULL), (2147483
query expect_fallback(not fully compatible with Spark)
SELECT from_unixtime(t) FROM test_from_unix_time

query expect_fallback(not fully compatible with Spark)
query expect_fallback(Datetime pattern format: yyyy-MM-dd is unsupported)
SELECT from_unixtime(t, 'yyyy-MM-dd') FROM test_from_unix_time
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error/fallback reason is now more actionable.


-- literal arguments
query expect_fallback(not fully compatible with Spark)
SELECT from_unixtime(0)

query expect_fallback(not fully compatible with Spark)
query expect_fallback(Datetime pattern format: yyyy-MM-dd is unsupported)
SELECT from_unixtime(1718451045, 'yyyy-MM-dd')
Loading