From 46244810b7dedfcd8d19805c2ff305b6a0b1b980 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Wed, 6 May 2026 10:25:39 -0700 Subject: [PATCH 1/4] feat: implement make_time --- native/core/src/execution/columnar_to_row.rs | 22 +- native/core/src/execution/serde.rs | 1 + native/proto/src/proto/types.proto | 1 + native/spark-expr/src/comet_scalar_funcs.rs | 3 +- .../src/datetime_funcs/make_time.rs | 233 ++++++++++++++++++ native/spark-expr/src/datetime_funcs/mod.rs | 2 + native/spark-expr/src/lib.rs | 4 +- .../apache/comet/serde/QueryPlanSerde.scala | 3 + .../apache/comet/shims/CometExprShim.scala | 13 +- .../apache/comet/shims/CometExprShim.scala | 13 +- .../expressions/datetime/make_time.sql | 128 ++++++++++ 11 files changed, 416 insertions(+), 7 deletions(-) create mode 100644 native/spark-expr/src/datetime_funcs/make_time.rs create mode 100644 spark/src/test/resources/sql-tests/expressions/datetime/make_time.sql diff --git a/native/core/src/execution/columnar_to_row.rs b/native/core/src/execution/columnar_to_row.rs index ec2b633cc1..2f4589cf0c 100644 --- a/native/core/src/execution/columnar_to_row.rs +++ b/native/core/src/execution/columnar_to_row.rs @@ -161,6 +161,7 @@ enum TypedArray<'a> { Float64(&'a Float64Array), Date32(&'a Date32Array), TimestampMicro(&'a TimestampMicrosecondArray), + Time64Nano(&'a Time64NanosecondArray), Decimal128(&'a Decimal128Array, u8), // array + precision String(&'a StringArray), LargeString(&'a LargeStringArray), @@ -200,6 +201,9 @@ impl<'a> TypedArray<'a> { DataType::Timestamp(TimeUnit::Microsecond, _) => Ok(TypedArray::TimestampMicro( downcast_array!(array, TimestampMicrosecondArray)?, )), + DataType::Time64(TimeUnit::Nanosecond) => Ok(TypedArray::Time64Nano( + downcast_array!(array, Time64NanosecondArray)?, + )), DataType::Decimal128(p, _) => Ok(TypedArray::Decimal128( downcast_array!(array, Decimal128Array)?, *p, @@ -267,6 +271,7 @@ impl<'a> TypedArray<'a> { Float64, Date32, TimestampMicro, + Time64Nano, Decimal128, String, 
LargeString, @@ -295,6 +300,7 @@ impl<'a> TypedArray<'a> { TypedArray::Float64(arr) => arr.value(row_idx).to_bits() as i64, TypedArray::Date32(arr) => arr.value(row_idx) as i64, TypedArray::TimestampMicro(arr) => arr.value(row_idx), + TypedArray::Time64Nano(arr) => arr.value(row_idx), TypedArray::Decimal128(arr, precision) if *precision <= MAX_LONG_DIGITS => { arr.value(row_idx) as i64 } @@ -317,7 +323,8 @@ impl<'a> TypedArray<'a> { | TypedArray::Float32(_) | TypedArray::Float64(_) | TypedArray::Date32(_) - | TypedArray::TimestampMicro(_) => false, + | TypedArray::TimestampMicro(_) + | TypedArray::Time64Nano(_) => false, TypedArray::Decimal128(_, precision) => *precision > MAX_LONG_DIGITS, _ => true, } @@ -380,6 +387,7 @@ enum TypedElements<'a> { Float64(&'a Float64Array), Date32(&'a Date32Array), TimestampMicro(&'a TimestampMicrosecondArray), + Time64Nano(&'a Time64NanosecondArray), Decimal128(&'a Decimal128Array, u8), String(&'a StringArray), LargeString(&'a LargeStringArray), @@ -418,6 +426,11 @@ impl<'a> TypedElements<'a> { return TypedElements::TimestampMicro(arr); } } + DataType::Time64(TimeUnit::Nanosecond) => { + if let Some(arr) = array.as_any().downcast_ref::() { + return TypedElements::Time64Nano(arr); + } + } DataType::Decimal128(p, _) => { if let Some(arr) = array.as_any().downcast_ref::() { return TypedElements::Decimal128(arr, *p); @@ -442,6 +455,7 @@ impl<'a> TypedElements<'a> { TypedElements::Int32(_) | TypedElements::Date32(_) | TypedElements::Float32(_) => 4, TypedElements::Int64(_) | TypedElements::TimestampMicro(_) + | TypedElements::Time64Nano(_) | TypedElements::Float64(_) => 8, TypedElements::Decimal128(_, p) if *p <= MAX_LONG_DIGITS => 8, _ => 8, // Variable-length uses 8 bytes for offset+length @@ -460,6 +474,7 @@ impl<'a> TypedElements<'a> { | TypedElements::Float64(_) | TypedElements::Date32(_) | TypedElements::TimestampMicro(_) + | TypedElements::Time64Nano(_) ) } @@ -479,6 +494,7 @@ impl<'a> TypedElements<'a> { Float64, Date32, 
TimestampMicro, + Time64Nano, Decimal128, String, LargeString, @@ -502,7 +518,8 @@ impl<'a> TypedElements<'a> { | TypedElements::Float32(_) | TypedElements::Float64(_) | TypedElements::Date32(_) - | TypedElements::TimestampMicro(_) => true, + | TypedElements::TimestampMicro(_) + | TypedElements::Time64Nano(_) => true, TypedElements::Decimal128(_, p) => *p <= MAX_LONG_DIGITS, _ => false, } @@ -521,6 +538,7 @@ impl<'a> TypedElements<'a> { TypedElements::Float64(arr) => arr.value(idx).to_bits() as i64, TypedElements::Date32(arr) => arr.value(idx) as i64, TypedElements::TimestampMicro(arr) => arr.value(idx), + TypedElements::Time64Nano(arr) => arr.value(idx), TypedElements::Decimal128(arr, _) => arr.value(idx) as i64, _ => 0, // Should not be called for variable-length types } diff --git a/native/core/src/execution/serde.rs b/native/core/src/execution/serde.rs index 5d60288f68..d6ec6be132 100644 --- a/native/core/src/execution/serde.rs +++ b/native/core/src/execution/serde.rs @@ -96,6 +96,7 @@ pub fn to_arrow_datatype(dt_value: &DataType) -> ArrowDataType { } DataTypeId::TimestampNtz => ArrowDataType::Timestamp(TimeUnit::Microsecond, None), DataTypeId::Date => ArrowDataType::Date32, + DataTypeId::Time => ArrowDataType::Time64(TimeUnit::Nanosecond), DataTypeId::Null => ArrowDataType::Null, DataTypeId::List => match dt_value .type_info diff --git a/native/proto/src/proto/types.proto b/native/proto/src/proto/types.proto index fec972a8f0..df0c0c5553 100644 --- a/native/proto/src/proto/types.proto +++ b/native/proto/src/proto/types.proto @@ -59,6 +59,7 @@ message DataType { LIST = 14; MAP = 15; STRUCT = 16; + TIME = 17; } DataTypeId type_id = 1; diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs index 9ecb11dc52..a914eef255 100644 --- a/native/spark-expr/src/comet_scalar_funcs.rs +++ b/native/spark-expr/src/comet_scalar_funcs.rs @@ -26,7 +26,7 @@ use crate::{ spark_lpad, spark_make_decimal, spark_read_side_padding, 
spark_round, spark_rpad, spark_unhex, spark_unscaled_value, EvalMode, SparkArrayCompact, SparkArrayPositionFunc, SparkArraysOverlap, SparkContains, SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkMakeDate, - SparkSecondsToTimestamp, SparkSizeFunc, + SparkMakeTime, SparkSecondsToTimestamp, SparkSizeFunc, }; use arrow::datatypes::DataType; use datafusion::common::{DataFusionError, Result as DataFusionResult}; @@ -214,6 +214,7 @@ fn all_scalar_functions() -> Vec> { Arc::new(ScalarUDF::new_from_impl(SparkDateFromUnixDate::default())), Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())), Arc::new(ScalarUDF::new_from_impl(SparkMakeDate::default())), + Arc::new(ScalarUDF::new_from_impl(SparkMakeTime::default())), Arc::new(ScalarUDF::new_from_impl(SparkSecondsToTimestamp::default())), Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())), ] diff --git a/native/spark-expr/src/datetime_funcs/make_time.rs b/native/spark-expr/src/datetime_funcs/make_time.rs new file mode 100644 index 0000000000..0f2daf41c0 --- /dev/null +++ b/native/spark-expr/src/datetime_funcs/make_time.rs @@ -0,0 +1,233 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow::array::{Array, Decimal128Array, Int32Array, Time64NanosecondArray}; +use arrow::compute::cast; +use arrow::datatypes::{DataType, TimeUnit}; +use datafusion::common::{utils::take_function_args, DataFusionError, Result}; +use datafusion::logical_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use std::any::Any; +use std::sync::Arc; + +const MICROS_PER_SECOND: i128 = 1_000_000; +const NANOS_PER_MICRO: i64 = 1_000; +const NANOS_PER_SECOND: i64 = 1_000_000_000; + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkMakeTime { + signature: Signature, +} + +impl SparkMakeTime { + pub fn new() -> Self { + Self { + signature: Signature::any(3, Volatility::Immutable), + } + } +} + +impl Default for SparkMakeTime { + fn default() -> Self { + Self::new() + } +} + +/// Converts hours, minutes, and fractional seconds (Decimal(16,6)) to nanoseconds from midnight. +/// Returns an error for invalid inputs (matching Spark's always-throw behavior). +fn make_time(hours: i32, minutes: i32, secs_and_micros_unscaled: i128) -> Result { + let full_secs = secs_and_micros_unscaled.div_euclid(MICROS_PER_SECOND); + let frac_micros = secs_and_micros_unscaled.rem_euclid(MICROS_PER_SECOND); + + if full_secs > i32::MAX as i128 || full_secs < 0 { + return Err(DataFusionError::Execution(format!( + "Invalid value for SecondOfMinute (valid values 0 - 59): {}", + secs_and_micros_unscaled / MICROS_PER_SECOND + ))); + } + + let secs = full_secs as i32; + let nanos = (frac_micros as i64) * NANOS_PER_MICRO; + + if !(0..=23).contains(&hours) { + return Err(DataFusionError::Execution(format!( + "Invalid value for HourOfDay (valid values 0 - 23): {hours}" + ))); + } + if !(0..=59).contains(&minutes) { + return Err(DataFusionError::Execution(format!( + "Invalid value for MinuteOfHour (valid values 0 - 59): {minutes}" + ))); + } + if !(0..=59).contains(&secs) { + return Err(DataFusionError::Execution(format!( + "Invalid value for SecondOfMinute (valid 
values 0 - 59): {secs}" + ))); + } + + let total_nanos = + hours as i64 * 3_600 * NANOS_PER_SECOND + minutes as i64 * 60 * NANOS_PER_SECOND + secs as i64 * NANOS_PER_SECOND + nanos; + + Ok(total_nanos) +} + +impl ScalarUDFImpl for SparkMakeTime { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "make_time" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _: &[DataType]) -> Result { + Ok(DataType::Time64(TimeUnit::Nanosecond)) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let [hours, minutes, secs_and_micros] = take_function_args(self.name(), args.args)?; + + let num_rows = [&hours, &minutes, &secs_and_micros] + .iter() + .find_map(|arg| match arg { + ColumnarValue::Array(array) => Some(array.len()), + ColumnarValue::Scalar(_) => None, + }) + .unwrap_or(1); + + let hours_arr = hours.into_array(num_rows)?; + let minutes_arr = minutes.into_array(num_rows)?; + let secs_arr = secs_and_micros.into_array(num_rows)?; + + let hours_arr = cast_to_int32(&hours_arr)?; + let minutes_arr = cast_to_int32(&minutes_arr)?; + + let hours_array = hours_arr.as_any().downcast_ref::().ok_or_else(|| { + DataFusionError::Execution("make_time: failed to cast hours to Int32".to_string()) + })?; + + let minutes_array = + minutes_arr + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Execution( + "make_time: failed to cast minutes to Int32".to_string(), + ) + })?; + + let secs_array = secs_arr + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Execution( + "make_time: expected Decimal128 for seconds argument".to_string(), + ) + })?; + + let len = hours_array.len(); + let mut builder = Time64NanosecondArray::builder(len); + + for i in 0..len { + if hours_array.is_null(i) || minutes_array.is_null(i) || secs_array.is_null(i) { + builder.append_null(); + } else { + let h = hours_array.value(i); + let m = minutes_array.value(i); + let s = secs_array.value(i); + + 
let nanos = make_time(h, m, s)?; + builder.append_value(nanos); + } + } + + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) + } +} + +fn cast_to_int32(arr: &Arc) -> Result> { + if arr.data_type() == &DataType::Int32 { + Ok(Arc::clone(arr)) + } else { + cast(arr.as_ref(), &DataType::Int32) + .map_err(|e| DataFusionError::Execution(format!("Failed to cast to Int32: {e}"))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_make_time_valid() { + // Midnight + assert_eq!(make_time(0, 0, 0).unwrap(), 0); + // 1 hour + assert_eq!(make_time(1, 0, 0).unwrap(), 3_600_000_000_000); + // 1 minute + assert_eq!(make_time(0, 1, 0).unwrap(), 60_000_000_000); + // 1 second (unscaled: 1_000_000) + assert_eq!(make_time(0, 0, 1_000_000).unwrap(), 1_000_000_000); + // 1.5 seconds (unscaled: 1_500_000) + assert_eq!(make_time(0, 0, 1_500_000).unwrap(), 1_500_000_000); + // 23:59:59.999999 (unscaled: 59_999_999) + assert_eq!( + make_time(23, 59, 59_999_999).unwrap(), + 86_399_999_999_000 + ); + // 12:30:45.123456 (unscaled: 45_123_456) + assert_eq!( + make_time(12, 30, 45_123_456).unwrap(), + 12 * 3_600_000_000_000 + 30 * 60_000_000_000 + 45_123_456_000 + ); + } + + #[test] + fn test_make_time_invalid_hours() { + assert!(make_time(24, 0, 0).is_err()); + assert!(make_time(25, 0, 0).is_err()); + assert!(make_time(-1, 0, 0).is_err()); + } + + #[test] + fn test_make_time_invalid_minutes() { + assert!(make_time(0, 60, 0).is_err()); + assert!(make_time(0, -1, 0).is_err()); + } + + #[test] + fn test_make_time_invalid_seconds() { + // 60 seconds (unscaled: 60_000_000) + assert!(make_time(0, 0, 60_000_000).is_err()); + // 100.5 seconds (unscaled: 100_500_000) + assert!(make_time(0, 0, 100_500_000).is_err()); + // negative seconds (unscaled: -1_000_000) + assert!(make_time(0, 0, -1_000_000).is_err()); + } + + #[test] + fn test_make_time_overflow_seconds() { + // Very large value that overflows i32 + let large = (i32::MAX as i128 + 1) * MICROS_PER_SECOND; + 
assert!(make_time(0, 0, large).is_err()); + } +} diff --git a/native/spark-expr/src/datetime_funcs/mod.rs b/native/spark-expr/src/datetime_funcs/mod.rs index a94bf16ce2..06cac1d208 100644 --- a/native/spark-expr/src/datetime_funcs/mod.rs +++ b/native/spark-expr/src/datetime_funcs/mod.rs @@ -21,6 +21,7 @@ mod date_trunc; mod extract_date_part; mod hours; mod make_date; +mod make_time; mod seconds_to_timestamp; mod timestamp_trunc; mod unix_timestamp; @@ -33,6 +34,7 @@ pub use extract_date_part::SparkMinute; pub use extract_date_part::SparkSecond; pub use hours::SparkHoursTransform; pub use make_date::SparkMakeDate; +pub use make_time::SparkMakeTime; pub use seconds_to_timestamp::SparkSecondsToTimestamp; pub use timestamp_trunc::TimestampTruncExpr; pub use unix_timestamp::SparkUnixTimestamp; diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index e0baa131cb..7cf5c9953b 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -74,8 +74,8 @@ pub use comet_scalar_funcs::{ pub use csv_funcs::*; pub use datetime_funcs::{ SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkHour, SparkHoursTransform, - SparkMakeDate, SparkMinute, SparkSecond, SparkSecondsToTimestamp, SparkUnixTimestamp, - TimestampTruncExpr, + SparkMakeDate, SparkMakeTime, SparkMinute, SparkSecond, SparkSecondsToTimestamp, + SparkUnixTimestamp, TimestampTruncExpr, }; pub use error::{decimal_overflow_error, SparkError, SparkErrorWithContext, SparkResult}; pub use hash_funcs::*; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 2d138450e9..de1594dcc0 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -360,6 +360,8 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | 
_: TimestampNTZType | _: DecimalType | _: DateType | _: BooleanType | _: NullType => true + case dt if dt.getClass.getSimpleName.startsWith("TimeType") => + true case s: StructType if allowComplex => s.fields.nonEmpty && s.fields.map(_.dataType).forall(supportedDataType(_, allowComplex)) case a: ArrayType if allowComplex => @@ -394,6 +396,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { case _: ArrayType => 14 case _: MapType => 15 case _: StructType => 16 + case dt if dt.getClass.getSimpleName.startsWith("TimeType") => 17 case dt => logWarning(s"Cannot serialize Spark data type: $dt") return None diff --git a/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala index 5e906a0d83..363ad1cfc4 100644 --- a/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala @@ -23,9 +23,10 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.Sum import org.apache.spark.sql.catalyst.expressions.json.StructsToJsonEvaluator import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke} +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.types.StringTypeWithCollation -import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DataTypes, MapType, StringType} +import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DataTypes, MapType, StringType, TimeType} import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo @@ -92,6 +93,16 @@ trait CometExprShim extends CommonStringExprs { val Seq(bin, charset, _, _) = s.arguments stringDecode(expr, charset, bin, inputs, binding) + case s: StaticInvoke + if s.staticObject == classOf[DateTimeUtils.type] && + s.functionName 
== "makeTime" && + s.arguments.size == 3 && + s.dataType.isInstanceOf[TimeType] => + val childExprs = s.arguments.map(exprToProtoInternal(_, inputs, binding)) + val optExpr = + scalarFunctionExprToProtoWithReturnType("make_time", s.dataType, true, childExprs: _*) + optExprWithInfo(optExpr, expr, s.arguments: _*) + case expr @ ToPrettyString(child, timeZoneId) => val castSupported = CometCast.isSupported( child.dataType, diff --git a/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala index 5e906a0d83..363ad1cfc4 100644 --- a/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala @@ -23,9 +23,10 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.Sum import org.apache.spark.sql.catalyst.expressions.json.StructsToJsonEvaluator import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke} +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.types.StringTypeWithCollation -import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DataTypes, MapType, StringType} +import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DataTypes, MapType, StringType, TimeType} import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo @@ -92,6 +93,16 @@ trait CometExprShim extends CommonStringExprs { val Seq(bin, charset, _, _) = s.arguments stringDecode(expr, charset, bin, inputs, binding) + case s: StaticInvoke + if s.staticObject == classOf[DateTimeUtils.type] && + s.functionName == "makeTime" && + s.arguments.size == 3 && + s.dataType.isInstanceOf[TimeType] => + val childExprs = s.arguments.map(exprToProtoInternal(_, inputs, binding)) + val optExpr = + 
scalarFunctionExprToProtoWithReturnType("make_time", s.dataType, true, childExprs: _*) + optExprWithInfo(optExpr, expr, s.arguments: _*) + case expr @ ToPrettyString(child, timeZoneId) => val castSupported = CometCast.isSupported( child.dataType, diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/make_time.sql b/spark/src/test/resources/sql-tests/expressions/datetime/make_time.sql new file mode 100644 index 0000000000..db9fda43d9 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/make_time.sql @@ -0,0 +1,128 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. 
+ +-- MinSparkVersion: 4.1 + +statement +CREATE TABLE test_make_time(hours int, minutes int, secs decimal(16,6)) USING parquet + +statement +INSERT INTO test_make_time VALUES + (0, 0, 0.000000), + (12, 30, 45.123456), + (23, 59, 59.999999), + (1, 2, 3.500000), + (0, 0, 0.000001), + (NULL, 0, 0.000000), + (12, NULL, 30.000000), + (12, 30, NULL), + (NULL, NULL, NULL) + +-- column arguments +query +SELECT hours, minutes, secs, make_time(hours, minutes, secs) FROM test_make_time ORDER BY hours, minutes, secs + +-- literal hour, column minutes and secs +query +SELECT make_time(10, minutes, secs) FROM test_make_time ORDER BY minutes, secs + +-- column hours, literal minutes and secs +query +SELECT make_time(hours, 15, 30.5) FROM test_make_time ORDER BY hours + +-- all literals +query +SELECT make_time(0, 0, 0) + +query +SELECT make_time(12, 30, 45.123456) + +query +SELECT make_time(23, 59, 59.999999) + +-- midnight +query +SELECT make_time(0, 0, 0.0) + +-- one microsecond after midnight +query +SELECT make_time(0, 0, 0.000001) + +-- end of day +query +SELECT make_time(23, 59, 59.999999) + +-- null handling with literals +query +SELECT make_time(NULL, 0, 0) + +query +SELECT make_time(12, NULL, 0) + +query +SELECT make_time(12, 30, NULL) + +-- integer seconds (implicit cast to decimal) +query +SELECT make_time(10, 20, 30) + +query +SELECT make_time(1, 2, 0) + +-- boundary valid values +query +SELECT make_time(0, 0, 0) + +query +SELECT make_time(23, 0, 0) + +query +SELECT make_time(0, 59, 0) + +query +SELECT make_time(0, 0, 59.999999) + +-- invalid hours - should throw error +query expect_error(HourOfDay) +SELECT make_time(24, 0, 0) + +query expect_error(HourOfDay) +SELECT make_time(25, 2, 23.5) + +query expect_error(HourOfDay) +SELECT make_time(-1, 0, 0) + +-- invalid minutes - should throw error +query expect_error(MinuteOfHour) +SELECT make_time(12, 60, 0) + +query expect_error(MinuteOfHour) +SELECT make_time(23, -1, 23.5) + +-- invalid seconds - should throw error 
+query expect_error(SecondOfMinute) +SELECT make_time(12, 30, 60.0) + +query expect_error(SecondOfMinute) +SELECT make_time(23, 12, 100.5) + +query expect_error(SecondOfMinute) +SELECT make_time(0, 0, -1.0) + +-- overflow seconds +query expect_error(SecondOfMinute) +SELECT make_time(1, 18, 4294967297.999999) From 965a642bd34ed2387a59ca4da2247b96d235c493 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Wed, 6 May 2026 17:41:15 -0700 Subject: [PATCH 2/4] feat: implement to_time --- native/spark-expr/src/comet_scalar_funcs.rs | 12 +- native/spark-expr/src/datetime_funcs/mod.rs | 2 + .../spark-expr/src/datetime_funcs/to_time.rs | 442 ++++++++++++++++++ native/spark-expr/src/lib.rs | 6 +- .../apache/comet/shims/CometExprShim.scala | 22 + .../apache/comet/shims/CometExprShim.scala | 22 + .../expressions/datetime/to_time.sql | 197 ++++++++ 7 files changed, 696 insertions(+), 7 deletions(-) create mode 100644 native/spark-expr/src/datetime_funcs/to_time.rs create mode 100644 spark/src/test/resources/sql-tests/expressions/datetime/to_time.sql diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs index a914eef255..788fac68c9 100644 --- a/native/spark-expr/src/comet_scalar_funcs.rs +++ b/native/spark-expr/src/comet_scalar_funcs.rs @@ -23,10 +23,11 @@ use crate::math_funcs::log::spark_log; use crate::math_funcs::modulo_expr::spark_modulo; use crate::{ spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor, spark_isnan, - spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad, spark_unhex, - spark_unscaled_value, EvalMode, SparkArrayCompact, SparkArrayPositionFunc, SparkArraysOverlap, - SparkContains, SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkMakeDate, - SparkMakeTime, SparkSecondsToTimestamp, SparkSizeFunc, + spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad, + spark_to_time, spark_unhex, spark_unscaled_value, EvalMode, SparkArrayCompact, 
+ SparkArrayPositionFunc, SparkArraysOverlap, SparkContains, SparkDateDiff, + SparkDateFromUnixDate, SparkDateTrunc, SparkMakeDate, SparkMakeTime, + SparkSecondsToTimestamp, SparkSizeFunc, }; use arrow::datatypes::DataType; use datafusion::common::{DataFusionError, Result as DataFusionResult}; @@ -196,6 +197,9 @@ pub fn create_comet_physical_fun_with_eval_mode( let func = Arc::new(spark_map_sort); make_comet_scalar_udf!("spark_map_sort", func, without data_type) } + "to_time" => { + make_comet_scalar_udf!("to_time", spark_to_time, without data_type, fail_on_error) + } _ => registry.udf(fun_name).map_err(|e| { DataFusionError::Execution(format!( "Function {fun_name} not found in the registry: {e}", diff --git a/native/spark-expr/src/datetime_funcs/mod.rs b/native/spark-expr/src/datetime_funcs/mod.rs index 06cac1d208..e56f894eaa 100644 --- a/native/spark-expr/src/datetime_funcs/mod.rs +++ b/native/spark-expr/src/datetime_funcs/mod.rs @@ -23,6 +23,7 @@ mod hours; mod make_date; mod make_time; mod seconds_to_timestamp; +mod to_time; mod timestamp_trunc; mod unix_timestamp; @@ -36,5 +37,6 @@ pub use hours::SparkHoursTransform; pub use make_date::SparkMakeDate; pub use make_time::SparkMakeTime; pub use seconds_to_timestamp::SparkSecondsToTimestamp; +pub use to_time::{spark_to_time, to_time_return_type}; pub use timestamp_trunc::TimestampTruncExpr; pub use unix_timestamp::SparkUnixTimestamp; diff --git a/native/spark-expr/src/datetime_funcs/to_time.rs b/native/spark-expr/src/datetime_funcs/to_time.rs new file mode 100644 index 0000000000..eea65358b7 --- /dev/null +++ b/native/spark-expr/src/datetime_funcs/to_time.rs @@ -0,0 +1,442 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{Array, StringArray, Time64NanosecondArray}; +use arrow::datatypes::{DataType, TimeUnit}; +use datafusion::common::{DataFusionError, Result}; +use datafusion::physical_plan::ColumnarValue; +use std::sync::Arc; + +const NANOS_PER_MICRO: i64 = 1_000; +const NANOS_PER_SECOND: i64 = 1_000_000_000; +const NANOS_PER_MINUTE: i64 = 60 * NANOS_PER_SECOND; +const NANOS_PER_HOUR: i64 = 60 * NANOS_PER_MINUTE; + +/// Spark-compatible to_time: parse a string to time (nanoseconds from midnight). +/// When fail_on_error is true (to_time), returns an error for unparseable input. +/// When fail_on_error is false (try_to_time), returns null for unparseable input. 
+pub fn spark_to_time(args: &[ColumnarValue], fail_on_error: bool) -> Result { + if args.is_empty() { + return Err(DataFusionError::Execution( + "to_time requires at least 1 argument".to_string(), + )); + } + + let num_rows = args + .iter() + .find_map(|arg| match arg { + ColumnarValue::Array(array) => Some(array.len()), + ColumnarValue::Scalar(_) => None, + }) + .unwrap_or(1); + + let str_arr = args[0].clone().into_array(num_rows)?; + let str_array = str_arr.as_any().downcast_ref::().ok_or_else(|| { + DataFusionError::Execution("to_time: expected String argument".to_string()) + })?; + + let len = str_array.len(); + let mut builder = Time64NanosecondArray::builder(len); + + for i in 0..len { + if str_array.is_null(i) { + builder.append_null(); + } else { + let s = str_array.value(i); + match string_to_time(s) { + Some(nanos) => builder.append_value(nanos), + None => { + if fail_on_error { + return Err(DataFusionError::Execution(format!( + "The input string '{}' cannot be parsed to a TIME value", + s + ))); + } + builder.append_null(); + } + } + } + } + + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) +} + +/// Parse a time string to nanoseconds from midnight, matching Spark's stringToTime behavior. +/// Returns None for invalid input. 
+fn string_to_time(s: &str) -> Option { + let trimmed = s.trim_end(); + if trimmed.is_empty() { + return None; + } + + let bytes = trimmed.as_bytes(); + let num_chars = bytes.len(); + + // Detect AM/PM suffix + let (is_am, is_pm, has_suffix) = if num_chars > 2 { + let last = bytes[num_chars - 1]; + if last == b'M' || last == b'm' { + let second_last = bytes[num_chars - 2]; + let am = second_last == b'A' || second_last == b'a'; + let pm = second_last == b'P' || second_last == b'p'; + (am, pm, am || pm) + } else { + (false, false, false) + } + } else { + (false, false, false) + }; + + // Strip AM/PM suffix (and optional space before it) + let time_str = if has_suffix { + let end = num_chars - 2; + let s = &trimmed[..end]; + s.trim_end() + } else { + trimmed + }; + + // Parse the time components + let (hour, minute, second, micros) = parse_time_components(time_str)?; + + // Validate and convert hours + let hr = if !has_suffix { + if hour > 23 { + return None; + } + hour + } else { + if hour < 1 || hour > 12 { + return None; + } + if is_am { + if hour == 12 { 0 } else { hour } + } else if is_pm { + if hour == 12 { 12 } else { hour + 12 } + } else { + return None; + } + }; + + // Validate minutes and seconds + if minute > 59 || second > 59 { + return None; + } + + let nanos = hr as i64 * NANOS_PER_HOUR + + minute as i64 * NANOS_PER_MINUTE + + second as i64 * NANOS_PER_SECOND + + micros as i64 * NANOS_PER_MICRO; + + Some(nanos) +} + +/// Parse time components from a string like "HH:mm:ss.ffffff" or "T HH:mm:ss". +/// Returns (hour, minute, second, microseconds) or None if invalid. 
+fn parse_time_components(s: &str) -> Option<(i32, i32, i32, i32)> { + let bytes = s.trim_start().as_bytes(); + if bytes.is_empty() { + return None; + } + + let mut pos = 0; + + // Skip optional 'T' prefix + if bytes[pos] == b'T' { + pos += 1; + } + + // Parse hour + let (hour, new_pos) = parse_digits(bytes, pos)?; + pos = new_pos; + if hour < 0 { + return None; + } + + // Expect ':' + if pos >= bytes.len() || bytes[pos] != b':' { + return None; + } + pos += 1; + + // Parse minute + let (minute, new_pos) = parse_digits(bytes, pos)?; + pos = new_pos; + + // Optional seconds + if pos >= bytes.len() { + return Some((hour, minute, 0, 0)); + } + + if bytes[pos] != b':' { + return None; + } + pos += 1; + + // Parse seconds + let (second, new_pos) = parse_digits(bytes, pos)?; + pos = new_pos; + + // Optional fractional seconds + if pos >= bytes.len() { + return Some((hour, minute, second, 0)); + } + + if bytes[pos] != b'.' { + // No more content allowed (timezone would invalidate) + return None; + } + pos += 1; + + // Parse fractional seconds (up to 6 digits, pad with zeros) + let (micros, new_pos) = parse_fractional(bytes, pos)?; + pos = new_pos; + + // Nothing should follow the fractional seconds (timezone not allowed for time) + if pos < bytes.len() { + return None; + } + + Some((hour, minute, second, micros)) +} + +/// Parse consecutive digits starting at pos. Returns (value, new_pos). +/// At least 1 digit is required. +fn parse_digits(bytes: &[u8], start: usize) -> Option<(i32, usize)> { + let mut pos = start; + let mut value: i32 = 0; + let mut count = 0; + + while pos < bytes.len() { + let b = bytes[pos]; + if b >= b'0' && b <= b'9' { + value = value * 10 + (b - b'0') as i32; + count += 1; + pos += 1; + } else { + break; + } + } + + if count == 0 || count > 2 { + // NOTE(review): count > 2 is a no-op here — runs longer than 2 digits are + // accepted and range-validated later in string_to_time; only count == 0 fails + if count == 0 { + return None; + } + } + + Some((value, pos)) +} + +/// Parse fractional seconds (microseconds). 
Up to 6 digits, padded with zeros. +/// Returns (microseconds, new_pos). +fn parse_fractional(bytes: &[u8], start: usize) -> Option<(i32, usize)> { + let mut pos = start; + let mut value: i32 = 0; + let mut count = 0; + + while pos < bytes.len() && count < 6 { + let b = bytes[pos]; + if b >= b'0' && b <= b'9' { + value = value * 10 + (b - b'0') as i32; + count += 1; + pos += 1; + } else { + break; + } + } + + if count == 0 { + return None; + } + + // Skip any remaining digits beyond 6 (truncation) + while pos < bytes.len() && bytes[pos] >= b'0' && bytes[pos] <= b'9' { + pos += 1; + } + + // Pad with zeros to 6 digits + while count < 6 { + value *= 10; + count += 1; + } + + Some((value, pos)) +} + +/// Return type for to_time +pub fn to_time_return_type() -> DataType { + DataType::Time64(TimeUnit::Nanosecond) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_basic_time_parsing() { + // HH:mm + assert_eq!(string_to_time("00:00"), Some(0)); + assert_eq!(string_to_time("12:30"), Some(12 * NANOS_PER_HOUR + 30 * NANOS_PER_MINUTE)); + assert_eq!(string_to_time("23:59"), Some(23 * NANOS_PER_HOUR + 59 * NANOS_PER_MINUTE)); + + // HH:mm:ss + assert_eq!( + string_to_time("12:30:45"), + Some(12 * NANOS_PER_HOUR + 30 * NANOS_PER_MINUTE + 45 * NANOS_PER_SECOND) + ); + assert_eq!(string_to_time("00:00:00"), Some(0)); + assert_eq!( + string_to_time("23:59:59"), + Some(23 * NANOS_PER_HOUR + 59 * NANOS_PER_MINUTE + 59 * NANOS_PER_SECOND) + ); + } + + #[test] + fn test_fractional_seconds() { + // 1 digit + assert_eq!( + string_to_time("00:00:00.1"), + Some(100_000 * NANOS_PER_MICRO) + ); + // 3 digits + assert_eq!( + string_to_time("00:00:00.001"), + Some(1_000 * NANOS_PER_MICRO) + ); + // 6 digits + assert_eq!( + string_to_time("00:00:00.000001"), + Some(1 * NANOS_PER_MICRO) + ); + // Full precision + assert_eq!( + string_to_time("23:59:59.999999"), + Some( + 23 * NANOS_PER_HOUR + + 59 * NANOS_PER_MINUTE + + 59 * NANOS_PER_SECOND + + 999_999 * NANOS_PER_MICRO + ) 
+ ); + } + + #[test] + fn test_single_digit_components() { + // Single digit hour, minute, second + assert_eq!( + string_to_time("1:2:3"), + Some(1 * NANOS_PER_HOUR + 2 * NANOS_PER_MINUTE + 3 * NANOS_PER_SECOND) + ); + assert_eq!( + string_to_time("1:2:3.04"), + Some( + 1 * NANOS_PER_HOUR + + 2 * NANOS_PER_MINUTE + + 3 * NANOS_PER_SECOND + + 40_000 * NANOS_PER_MICRO + ) + ); + } + + #[test] + fn test_t_prefix() { + assert_eq!( + string_to_time("T1:02:3.04"), + Some( + 1 * NANOS_PER_HOUR + + 2 * NANOS_PER_MINUTE + + 3 * NANOS_PER_SECOND + + 40_000 * NANOS_PER_MICRO + ) + ); + assert_eq!( + string_to_time("T12:30:45"), + Some(12 * NANOS_PER_HOUR + 30 * NANOS_PER_MINUTE + 45 * NANOS_PER_SECOND) + ); + } + + #[test] + fn test_am_pm() { + // 12:00:00 AM = midnight + assert_eq!(string_to_time("12:00:00 AM"), Some(0)); + // 1:00:00 AM + assert_eq!(string_to_time("1:00:00 AM"), Some(1 * NANOS_PER_HOUR)); + // 11:59:59 AM + assert_eq!( + string_to_time("11:59:59 AM"), + Some(11 * NANOS_PER_HOUR + 59 * NANOS_PER_MINUTE + 59 * NANOS_PER_SECOND) + ); + // 12:00:00 PM = noon + assert_eq!(string_to_time("12:00:00 PM"), Some(12 * NANOS_PER_HOUR)); + // 1:00:00 PM = 13:00 + assert_eq!(string_to_time("1:00:00 PM"), Some(13 * NANOS_PER_HOUR)); + // 11:59:59 PM = 23:59:59 + assert_eq!( + string_to_time("11:59:59 PM"), + Some(23 * NANOS_PER_HOUR + 59 * NANOS_PER_MINUTE + 59 * NANOS_PER_SECOND) + ); + // Case insensitive + assert_eq!(string_to_time("12:00:00 am"), Some(0)); + assert_eq!(string_to_time("12:00:00 pm"), Some(12 * NANOS_PER_HOUR)); + // No space before AM/PM + assert_eq!(string_to_time("12:00:00AM"), Some(0)); + assert_eq!(string_to_time("1:00:00PM"), Some(13 * NANOS_PER_HOUR)); + // With fractional seconds + assert_eq!( + string_to_time("12:59:59.999999 PM"), + Some( + 12 * NANOS_PER_HOUR + + 59 * NANOS_PER_MINUTE + + 59 * NANOS_PER_SECOND + + 999_999 * NANOS_PER_MICRO + ) + ); + } + + #[test] + fn test_invalid_am_pm() { + // Hour 0 invalid in 12-hour format + 
assert_eq!(string_to_time("0:00:00 AM"), None); + // Hour 13 invalid in 12-hour format + assert_eq!(string_to_time("13:00:00 AM"), None); + assert_eq!(string_to_time("13:00:00 PM"), None); + } + + #[test] + fn test_invalid_inputs() { + assert_eq!(string_to_time(""), None); + assert_eq!(string_to_time(" "), None); + assert_eq!(string_to_time("XYZ"), None); + assert_eq!(string_to_time("24:00:00"), None); + assert_eq!(string_to_time("23:60:00"), None); + assert_eq!(string_to_time("23:00:60"), None); + // Date component present + assert_eq!(string_to_time("2025-03-09 00:00:00"), None); + // Timezone present + assert_eq!(string_to_time("00:01:02 UTC"), None); + // Just digits without separators + assert_eq!(string_to_time("120000"), None); + } + + #[test] + fn test_trailing_whitespace() { + assert_eq!(string_to_time("12:30:45 "), string_to_time("12:30:45")); + assert_eq!(string_to_time("1:00:00 AM "), string_to_time("1:00:00 AM")); + } +} diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index 7cf5c9953b..4b591810af 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -73,9 +73,9 @@ pub use comet_scalar_funcs::{ }; pub use csv_funcs::*; pub use datetime_funcs::{ - SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkHour, SparkHoursTransform, - SparkMakeDate, SparkMakeTime, SparkMinute, SparkSecond, SparkSecondsToTimestamp, - SparkUnixTimestamp, TimestampTruncExpr, + spark_to_time, to_time_return_type, SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, + SparkHour, SparkHoursTransform, SparkMakeDate, SparkMakeTime, SparkMinute, SparkSecond, + SparkSecondsToTimestamp, SparkUnixTimestamp, TimestampTruncExpr, }; pub use error::{decimal_overflow_error, SparkError, SparkErrorWithContext, SparkResult}; pub use hash_funcs::*; diff --git a/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala index 363ad1cfc4..b4a412156d 100644 --- 
a/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala @@ -146,6 +146,7 @@ trait CometExprShim extends CommonStringExprs { // In Spark 4.0, StructsToJson is a RuntimeReplaceable whose replacement is // Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Reconstruct the // original StructsToJson and recurse so support-level checks apply. + // ToTime (Spark 4.1) resolves to Invoke(Literal(ToTimeParser), "parse", TimeType(), ...). case i: Invoke => (i.targetObject, i.functionName, i.arguments) match { case (Literal(evaluator: StructsToJsonEvaluator, _), "evaluate", Seq(child)) => @@ -153,6 +154,27 @@ trait CometExprShim extends CommonStringExprs { StructsToJson(evaluator.options, child, evaluator.timeZoneId), inputs, binding) + case (Literal(parser: ToTimeParser, _), "parse", args) + if i.dataType.isInstanceOf[TimeType] && parser.fmt.isEmpty => + val childExprs = args.map(exprToProtoInternal(_, inputs, binding)) + val optExpr = + scalarFunctionExprToProtoWithReturnType("to_time", i.dataType, true, childExprs: _*) + optExprWithInfo(optExpr, i, args: _*) + case _ => None + } + + // try_to_time resolves to TryEval(Invoke(Literal(ToTimeParser), "parse", ...)) + case TryEval(i: Invoke) => + (i.targetObject, i.functionName, i.arguments) match { + case (Literal(parser: ToTimeParser, _), "parse", args) + if i.dataType.isInstanceOf[TimeType] && parser.fmt.isEmpty => + val childExprs = args.map(exprToProtoInternal(_, inputs, binding)) + val optExpr = scalarFunctionExprToProtoWithReturnType( + "to_time", + i.dataType, + false, + childExprs: _*) + optExprWithInfo(optExpr, expr, args: _*) case _ => None } diff --git a/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala index 363ad1cfc4..b4a412156d 100644 --- a/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala +++ 
b/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala @@ -146,6 +146,7 @@ trait CometExprShim extends CommonStringExprs { // In Spark 4.0, StructsToJson is a RuntimeReplaceable whose replacement is // Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Reconstruct the // original StructsToJson and recurse so support-level checks apply. + // ToTime (Spark 4.1) resolves to Invoke(Literal(ToTimeParser), "parse", TimeType(), ...). case i: Invoke => (i.targetObject, i.functionName, i.arguments) match { case (Literal(evaluator: StructsToJsonEvaluator, _), "evaluate", Seq(child)) => @@ -153,6 +154,27 @@ trait CometExprShim extends CommonStringExprs { StructsToJson(evaluator.options, child, evaluator.timeZoneId), inputs, binding) + case (Literal(parser: ToTimeParser, _), "parse", args) + if i.dataType.isInstanceOf[TimeType] && parser.fmt.isEmpty => + val childExprs = args.map(exprToProtoInternal(_, inputs, binding)) + val optExpr = + scalarFunctionExprToProtoWithReturnType("to_time", i.dataType, true, childExprs: _*) + optExprWithInfo(optExpr, i, args: _*) + case _ => None + } + + // try_to_time resolves to TryEval(Invoke(Literal(ToTimeParser), "parse", ...)) + case TryEval(i: Invoke) => + (i.targetObject, i.functionName, i.arguments) match { + case (Literal(parser: ToTimeParser, _), "parse", args) + if i.dataType.isInstanceOf[TimeType] && parser.fmt.isEmpty => + val childExprs = args.map(exprToProtoInternal(_, inputs, binding)) + val optExpr = scalarFunctionExprToProtoWithReturnType( + "to_time", + i.dataType, + false, + childExprs: _*) + optExprWithInfo(optExpr, expr, args: _*) case _ => None } diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/to_time.sql b/spark/src/test/resources/sql-tests/expressions/datetime/to_time.sql new file mode 100644 index 0000000000..b4bc198a77 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/to_time.sql @@ -0,0 +1,197 @@ +-- Licensed to the Apache Software Foundation (ASF) 
under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- MinSparkVersion: 4.1 + +statement +CREATE TABLE test_to_time(s STRING) USING parquet + +statement +INSERT INTO test_to_time VALUES + ('00:00'), + ('12:30'), + ('23:59'), + ('12:30:45'), + ('00:00:00'), + ('23:59:59'), + ('00:00:00.1'), + ('00:00:00.001'), + ('00:00:00.000001'), + ('23:59:59.999999'), + ('1:2:3'), + ('1:2:3.04'), + ('T12:30:45'), + ('T1:02:3.04'), + ('12:00:00 AM'), + ('1:00:00 AM'), + ('12:00:00 PM'), + ('1:00:00 PM'), + ('11:59:59 PM'), + ('12:59:59.999999 PM'), + ('12:00:00AM'), + ('1:00:00PM'), + (NULL) + +-- column argument: basic time formats +query +SELECT s, to_time(s) FROM test_to_time ORDER BY s + +-- literal HH:mm +query +SELECT to_time('00:00') + +query +SELECT to_time('12:30') + +query +SELECT to_time('23:59') + +-- literal HH:mm:ss +query +SELECT to_time('12:30:45') + +query +SELECT to_time('00:00:00') + +query +SELECT to_time('23:59:59') + +-- fractional seconds +query +SELECT to_time('00:00:00.1') + +query +SELECT to_time('00:00:00.001') + +query +SELECT to_time('00:00:00.000001') + +query +SELECT to_time('23:59:59.999999') + +-- single digit hour/min/sec +query +SELECT to_time('1:2:3') + +query +SELECT to_time('1:2:3.04') + +-- T-prefix +query +SELECT to_time('T12:30:45') + +query 
+SELECT to_time('T1:02:3.04') + +-- AM/PM +query +SELECT to_time('12:00:00 AM') + +query +SELECT to_time('1:00:00 AM') + +query +SELECT to_time('11:59:59 AM') + +query +SELECT to_time('12:00:00 PM') + +query +SELECT to_time('1:00:00 PM') + +query +SELECT to_time('11:59:59 PM') + +-- AM/PM case insensitive +query +SELECT to_time('12:00:00 am') + +query +SELECT to_time('12:00:00 pm') + +-- AM/PM without space +query +SELECT to_time('12:00:00AM') + +query +SELECT to_time('1:00:00PM') + +-- AM/PM with fractional seconds +query +SELECT to_time('12:59:59.999999 PM') + +-- null input +query +SELECT to_time(NULL) + +-- trailing whitespace +query +SELECT to_time('12:30:45 ') + +-- invalid inputs - should throw error with to_time +query expect_error(cannot be parsed to a TIME value) +SELECT to_time('') + +query expect_error(cannot be parsed to a TIME value) +SELECT to_time('XYZ') + +query expect_error(cannot be parsed to a TIME value) +SELECT to_time('24:00:00') + +query expect_error(cannot be parsed to a TIME value) +SELECT to_time('23:60:00') + +query expect_error(cannot be parsed to a TIME value) +SELECT to_time('23:00:60') + +query expect_error(cannot be parsed to a TIME value) +SELECT to_time('120000') + +-- invalid AM/PM - should throw error +query expect_error(cannot be parsed to a TIME value) +SELECT to_time('0:00:00 AM') + +query expect_error(cannot be parsed to a TIME value) +SELECT to_time('13:00:00 AM') + +query expect_error(cannot be parsed to a TIME value) +SELECT to_time('13:00:00 PM') + +-- try_to_time: returns null for invalid inputs +query +SELECT try_to_time('12:30:45') + +query +SELECT try_to_time('') + +query +SELECT try_to_time('XYZ') + +query +SELECT try_to_time('24:00:00') + +query +SELECT try_to_time('23:60:00') + +query +SELECT try_to_time(NULL) + +query +SELECT try_to_time('0:00:00 AM') + +query +SELECT try_to_time('13:00:00 PM') From 4ccb2285c35ad889c5674f806c89df1021e7a1c5 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Thu, 7 May 2026 
13:02:22 -0700 Subject: [PATCH 3/4] Fixes for TimeType --- .../apache/spark/sql/comet/util/Utils.scala | 8 +++ docs/source/user-guide/latest/expressions.md | 4 ++ native/core/src/execution/columnar_to_row.rs | 7 +- native/core/src/execution/planner.rs | 3 + native/spark-expr/src/comet_scalar_funcs.rs | 4 +- .../src/datetime_funcs/make_time.rs | 35 +++++----- native/spark-expr/src/datetime_funcs/mod.rs | 4 +- .../spark-expr/src/datetime_funcs/to_time.rs | 66 +++++++++++++------ .../expressions/datetime/make_time.sql | 13 ++-- .../expressions/datetime/to_time.sql | 5 +- 10 files changed, 95 insertions(+), 54 deletions(-) diff --git a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala index 78f2e81c7c..4a048e6b8a 100644 --- a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala +++ b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala @@ -108,6 +108,12 @@ object Utils extends CometTypeShim with Logging { case yi: ArrowType.Interval if yi.getUnit == IntervalUnit.YEAR_MONTH => YearMonthIntervalType() case di: ArrowType.Interval if di.getUnit == IntervalUnit.DAY_TIME => DayTimeIntervalType() + case t: ArrowType.Time if t.getUnit == TimeUnit.NANOSECOND && t.getBitWidth == 64 => + // scalastyle:off classforname + val clazz = Class.forName("org.apache.spark.sql.types.TimeType$") + // scalastyle:on classforname + val module = clazz.getField("MODULE$").get(null) + clazz.getMethod("apply").invoke(module).asInstanceOf[DataType] case _ => throw new UnsupportedOperationException(s"Unsupported data type: ${dt.toString}") } @@ -142,6 +148,8 @@ object Utils extends CometTypeShim with Logging { } case TimestampNTZType => new ArrowType.Timestamp(TimeUnit.MICROSECOND, null) + case dt if dt.getClass.getSimpleName.startsWith("TimeType") => + new ArrowType.Time(TimeUnit.NANOSECOND, 64) case _ => throw new UnsupportedOperationException( s"Unsupported data type: 
[${dt.getClass.getName}] ${dt.catalogString}") diff --git a/docs/source/user-guide/latest/expressions.md b/docs/source/user-guide/latest/expressions.md index 3842148a43..ed9ed393bf 100644 --- a/docs/source/user-guide/latest/expressions.md +++ b/docs/source/user-guide/latest/expressions.md @@ -120,6 +120,10 @@ of expressions that be disabled. | DayOfYear | `dayofyear` | | WeekOfYear | `weekofyear` | | Quarter | `quarter` | +| MakeDate | `make_date` | +| MakeTime | `make_time` | +| ToTime | `to_time` | +| TryToTime | `try_to_time` | ## Math Expressions diff --git a/native/core/src/execution/columnar_to_row.rs b/native/core/src/execution/columnar_to_row.rs index 2f4589cf0c..ad98d347ff 100644 --- a/native/core/src/execution/columnar_to_row.rs +++ b/native/core/src/execution/columnar_to_row.rs @@ -201,9 +201,10 @@ impl<'a> TypedArray<'a> { DataType::Timestamp(TimeUnit::Microsecond, _) => Ok(TypedArray::TimestampMicro( downcast_array!(array, TimestampMicrosecondArray)?, )), - DataType::Time64(TimeUnit::Nanosecond) => Ok(TypedArray::Time64Nano( - downcast_array!(array, Time64NanosecondArray)?, - )), + DataType::Time64(TimeUnit::Nanosecond) => Ok(TypedArray::Time64Nano(downcast_array!( + array, + Time64NanosecondArray + )?)), DataType::Decimal128(p, _) => Ok(TypedArray::Decimal128( downcast_array!(array, Decimal128Array)?, *p, diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index da81e789bf..5909ad2c72 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -344,6 +344,9 @@ impl PhysicalPlanner { DataType::Map(f, s) => DataType::Map(f, s).try_into()?, DataType::List(f) => DataType::List(f).try_into()?, DataType::Null => ScalarValue::Null, + DataType::Time64(TimeUnit::Nanosecond) => { + ScalarValue::Time64Nanosecond(None) + } dt => { return Err(GeneralError(format!("{dt:?} is not supported in Comet"))) } diff --git a/native/spark-expr/src/comet_scalar_funcs.rs 
b/native/spark-expr/src/comet_scalar_funcs.rs index 788fac68c9..7108105dcb 100644 --- a/native/spark-expr/src/comet_scalar_funcs.rs +++ b/native/spark-expr/src/comet_scalar_funcs.rs @@ -26,8 +26,8 @@ use crate::{ spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad, spark_to_time, spark_unhex, spark_unscaled_value, EvalMode, SparkArrayCompact, SparkArrayPositionFunc, SparkArraysOverlap, SparkContains, SparkDateDiff, - SparkDateFromUnixDate, SparkDateTrunc, SparkMakeDate, SparkMakeTime, - SparkSecondsToTimestamp, SparkSizeFunc, + SparkDateFromUnixDate, SparkDateTrunc, SparkMakeDate, SparkMakeTime, SparkSecondsToTimestamp, + SparkSizeFunc, }; use arrow::datatypes::DataType; use datafusion::common::{DataFusionError, Result as DataFusionResult}; diff --git a/native/spark-expr/src/datetime_funcs/make_time.rs b/native/spark-expr/src/datetime_funcs/make_time.rs index 0f2daf41c0..154ef3bf46 100644 --- a/native/spark-expr/src/datetime_funcs/make_time.rs +++ b/native/spark-expr/src/datetime_funcs/make_time.rs @@ -80,8 +80,10 @@ fn make_time(hours: i32, minutes: i32, secs_and_micros_unscaled: i128) -> Result ))); } - let total_nanos = - hours as i64 * 3_600 * NANOS_PER_SECOND + minutes as i64 * 60 * NANOS_PER_SECOND + secs as i64 * NANOS_PER_SECOND + nanos; + let total_nanos = hours as i64 * 3_600 * NANOS_PER_SECOND + + minutes as i64 * 60 * NANOS_PER_SECOND + + secs as i64 * NANOS_PER_SECOND + + nanos; Ok(total_nanos) } @@ -121,19 +123,19 @@ impl ScalarUDFImpl for SparkMakeTime { let hours_arr = cast_to_int32(&hours_arr)?; let minutes_arr = cast_to_int32(&minutes_arr)?; - let hours_array = hours_arr.as_any().downcast_ref::().ok_or_else(|| { - DataFusionError::Execution("make_time: failed to cast hours to Int32".to_string()) - })?; + let hours_array = hours_arr + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Execution("make_time: failed to cast hours to Int32".to_string()) + })?; - let minutes_array = - minutes_arr - .as_any() 
- .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Execution( - "make_time: failed to cast minutes to Int32".to_string(), - ) - })?; + let minutes_array = minutes_arr + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Execution("make_time: failed to cast minutes to Int32".to_string()) + })?; let secs_array = secs_arr .as_any() @@ -190,10 +192,7 @@ mod tests { // 1.5 seconds (unscaled: 1_500_000) assert_eq!(make_time(0, 0, 1_500_000).unwrap(), 1_500_000_000); // 23:59:59.999999 (unscaled: 59_999_999) - assert_eq!( - make_time(23, 59, 59_999_999).unwrap(), - 86_399_999_999_000 - ); + assert_eq!(make_time(23, 59, 59_999_999).unwrap(), 86_399_999_999_000); // 12:30:45.123456 (unscaled: 45_123_456) assert_eq!( make_time(12, 30, 45_123_456).unwrap(), diff --git a/native/spark-expr/src/datetime_funcs/mod.rs b/native/spark-expr/src/datetime_funcs/mod.rs index e56f894eaa..f910ef0ea8 100644 --- a/native/spark-expr/src/datetime_funcs/mod.rs +++ b/native/spark-expr/src/datetime_funcs/mod.rs @@ -23,8 +23,8 @@ mod hours; mod make_date; mod make_time; mod seconds_to_timestamp; -mod to_time; mod timestamp_trunc; +mod to_time; mod unix_timestamp; pub use date_diff::SparkDateDiff; @@ -37,6 +37,6 @@ pub use hours::SparkHoursTransform; pub use make_date::SparkMakeDate; pub use make_time::SparkMakeTime; pub use seconds_to_timestamp::SparkSecondsToTimestamp; -pub use to_time::{spark_to_time, to_time_return_type}; pub use timestamp_trunc::TimestampTruncExpr; +pub use to_time::{spark_to_time, to_time_return_type}; pub use unix_timestamp::SparkUnixTimestamp; diff --git a/native/spark-expr/src/datetime_funcs/to_time.rs b/native/spark-expr/src/datetime_funcs/to_time.rs index eea65358b7..8eecc34c14 100644 --- a/native/spark-expr/src/datetime_funcs/to_time.rs +++ b/native/spark-expr/src/datetime_funcs/to_time.rs @@ -45,9 +45,12 @@ pub fn spark_to_time(args: &[ColumnarValue], fail_on_error: bool) -> Result().ok_or_else(|| { - DataFusionError::Execution("to_time: 
expected String argument".to_string()) - })?; + let str_array = str_arr + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Execution("to_time: expected String argument".to_string()) + })?; let len = str_array.len(); let mut builder = Time64NanosecondArray::builder(len); @@ -120,13 +123,21 @@ fn string_to_time(s: &str) -> Option { } hour } else { - if hour < 1 || hour > 12 { + if !(1..=12).contains(&hour) { return None; } if is_am { - if hour == 12 { 0 } else { hour } + if hour == 12 { + 0 + } else { + hour + } } else if is_pm { - if hour == 12 { 12 } else { hour + 12 } + if hour == 12 { + 12 + } else { + hour + 12 + } } else { return None; } @@ -148,7 +159,7 @@ fn string_to_time(s: &str) -> Option { /// Parse time components from a string like "HH:mm:ss.ffffff" or "T HH:mm:ss". /// Returns (hour, minute, second, microseconds) or None if invalid. fn parse_time_components(s: &str) -> Option<(i32, i32, i32, i32)> { - let bytes = s.trim_start().as_bytes(); + let bytes = s.as_bytes(); if bytes.is_empty() { return None; } @@ -223,7 +234,7 @@ fn parse_digits(bytes: &[u8], start: usize) -> Option<(i32, usize)> { while pos < bytes.len() { let b = bytes[pos]; - if b >= b'0' && b <= b'9' { + if b.is_ascii_digit() { value = value * 10 + (b - b'0') as i32; count += 1; pos += 1; @@ -233,11 +244,7 @@ fn parse_digits(bytes: &[u8], start: usize) -> Option<(i32, usize)> { } if count == 0 || count > 2 { - // Hour/minute/second: 1-2 digits - // Exception: we allow 1-2 digits for time components - if count == 0 { - return None; - } + return None; } Some((value, pos)) @@ -252,7 +259,7 @@ fn parse_fractional(bytes: &[u8], start: usize) -> Option<(i32, usize)> { while pos < bytes.len() && count < 6 { let b = bytes[pos]; - if b >= b'0' && b <= b'9' { + if b.is_ascii_digit() { value = value * 10 + (b - b'0') as i32; count += 1; pos += 1; @@ -266,7 +273,7 @@ fn parse_fractional(bytes: &[u8], start: usize) -> Option<(i32, usize)> { } // Skip any remaining digits beyond 6 
(truncation) - while pos < bytes.len() && bytes[pos] >= b'0' && bytes[pos] <= b'9' { + while pos < bytes.len() && bytes[pos].is_ascii_digit() { pos += 1; } @@ -292,8 +299,14 @@ mod tests { fn test_basic_time_parsing() { // HH:mm assert_eq!(string_to_time("00:00"), Some(0)); - assert_eq!(string_to_time("12:30"), Some(12 * NANOS_PER_HOUR + 30 * NANOS_PER_MINUTE)); - assert_eq!(string_to_time("23:59"), Some(23 * NANOS_PER_HOUR + 59 * NANOS_PER_MINUTE)); + assert_eq!( + string_to_time("12:30"), + Some(12 * NANOS_PER_HOUR + 30 * NANOS_PER_MINUTE) + ); + assert_eq!( + string_to_time("23:59"), + Some(23 * NANOS_PER_HOUR + 59 * NANOS_PER_MINUTE) + ); // HH:mm:ss assert_eq!( @@ -320,10 +333,7 @@ mod tests { Some(1_000 * NANOS_PER_MICRO) ); // 6 digits - assert_eq!( - string_to_time("00:00:00.000001"), - Some(1 * NANOS_PER_MICRO) - ); + assert_eq!(string_to_time("00:00:00.000001"), Some(1 * NANOS_PER_MICRO)); // Full precision assert_eq!( string_to_time("23:59:59.999999"), @@ -439,4 +449,18 @@ mod tests { assert_eq!(string_to_time("12:30:45 "), string_to_time("12:30:45")); assert_eq!(string_to_time("1:00:00 AM "), string_to_time("1:00:00 AM")); } + + #[test] + fn test_three_digit_components() { + // 3-digit hour/minute/second must be rejected (Spark requires 1-2 digits) + assert_eq!(string_to_time("001:02:03"), None); + assert_eq!(string_to_time("12:001:03"), None); + assert_eq!(string_to_time("12:02:003"), None); + } + + #[test] + fn test_leading_space_with_t_prefix() { + // Leading space before T should be rejected (Spark only right-trims) + assert_eq!(string_to_time(" T12:30"), None); + } } diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/make_time.sql b/spark/src/test/resources/sql-tests/expressions/datetime/make_time.sql index db9fda43d9..7bc1f91e1b 100644 --- a/spark/src/test/resources/sql-tests/expressions/datetime/make_time.sql +++ b/spark/src/test/resources/sql-tests/expressions/datetime/make_time.sql @@ -16,6 +16,7 @@ -- under the License. 
-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true statement CREATE TABLE test_make_time(hours int, minutes int, secs decimal(16,6)) USING parquet @@ -32,16 +33,16 @@ INSERT INTO test_make_time VALUES (12, 30, NULL), (NULL, NULL, NULL) --- column arguments -query +-- column arguments (spark_answer_only: shuffle does not support TimeType yet) +query spark_answer_only SELECT hours, minutes, secs, make_time(hours, minutes, secs) FROM test_make_time ORDER BY hours, minutes, secs --- literal hour, column minutes and secs -query +-- literal hour, column minutes and secs (spark_answer_only: shuffle does not support TimeType yet) +query spark_answer_only SELECT make_time(10, minutes, secs) FROM test_make_time ORDER BY minutes, secs --- column hours, literal minutes and secs -query +-- column hours, literal minutes and secs (spark_answer_only: shuffle does not support TimeType yet) +query spark_answer_only SELECT make_time(hours, 15, 30.5) FROM test_make_time ORDER BY hours -- all literals diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/to_time.sql b/spark/src/test/resources/sql-tests/expressions/datetime/to_time.sql index b4bc198a77..b382a820c4 100644 --- a/spark/src/test/resources/sql-tests/expressions/datetime/to_time.sql +++ b/spark/src/test/resources/sql-tests/expressions/datetime/to_time.sql @@ -16,6 +16,7 @@ -- under the License. 
-- MinSparkVersion: 4.1 +-- Config: spark.sql.timeType.enabled=true statement CREATE TABLE test_to_time(s STRING) USING parquet @@ -46,8 +47,8 @@ INSERT INTO test_to_time VALUES ('1:00:00PM'), (NULL) --- column argument: basic time formats -query +-- column argument: basic time formats (spark_answer_only: shuffle does not support TimeType yet) +query spark_answer_only SELECT s, to_time(s) FROM test_to_time ORDER BY s -- literal HH:mm From 70bd50902949f8a0745e6100c502792b82866342 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Thu, 7 May 2026 13:34:41 -0700 Subject: [PATCH 4/4] cleanup and clippy --- native/core/src/execution/columnar_to_row.rs | 16 +++++++++++++++- native/spark-expr/src/datetime_funcs/to_time.rs | 10 +++++----- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/native/core/src/execution/columnar_to_row.rs b/native/core/src/execution/columnar_to_row.rs index ad98d347ff..9a3616bef7 100644 --- a/native/core/src/execution/columnar_to_row.rs +++ b/native/core/src/execution/columnar_to_row.rs @@ -674,6 +674,7 @@ impl<'a> TypedElements<'a> { TypedElements::Float64(arr) => bulk_copy_range!(arr, 8), TypedElements::Date32(arr) => bulk_copy_range!(arr, 4), TypedElements::TimestampMicro(arr) => bulk_copy_range!(arr, 8), + TypedElements::Time64Nano(arr) => bulk_copy_range!(arr, 8), _ => {} // Should not reach here due to supports_bulk_copy check } } @@ -846,7 +847,8 @@ fn is_fixed_width(data_type: &DataType) -> bool { | DataType::Float32 | DataType::Float64 | DataType::Date32 - | DataType::Timestamp(TimeUnit::Microsecond, _) => true, + | DataType::Timestamp(TimeUnit::Microsecond, _) + | DataType::Time64(TimeUnit::Nanosecond) => true, DataType::Decimal128(p, _) => *p <= MAX_LONG_DIGITS, _ => false, } @@ -1254,6 +1256,15 @@ impl ColumnarToRowContext { TimestampMicrosecondArray, |v: i64| v ), + DataType::Time64(TimeUnit::Nanosecond) => write_fixed_column_primitive!( + self, + array, + row_size, + field_offset_in_row, + num_rows, + 
Time64NanosecondArray, + |v: i64| v + ), DataType::Decimal128(precision, _) if *precision <= MAX_LONG_DIGITS => { write_fixed_column_primitive!( self, @@ -1379,6 +1390,9 @@ fn get_field_value(data_type: &DataType, array: &ArrayRef, row_idx: usize) -> Co DataType::Timestamp(TimeUnit::Microsecond, _) => { get_field_value_primitive!(array, row_idx, TimestampMicrosecondArray, |v: i64| v) } + DataType::Time64(TimeUnit::Nanosecond) => { + get_field_value_primitive!(array, row_idx, Time64NanosecondArray, |v: i64| v) + } DataType::Decimal128(precision, _) if *precision <= MAX_LONG_DIGITS => { get_field_value_primitive!(array, row_idx, Decimal128Array, |v: i128| v as i64) } diff --git a/native/spark-expr/src/datetime_funcs/to_time.rs b/native/spark-expr/src/datetime_funcs/to_time.rs index 8eecc34c14..e6d73734d8 100644 --- a/native/spark-expr/src/datetime_funcs/to_time.rs +++ b/native/spark-expr/src/datetime_funcs/to_time.rs @@ -333,7 +333,7 @@ mod tests { Some(1_000 * NANOS_PER_MICRO) ); // 6 digits - assert_eq!(string_to_time("00:00:00.000001"), Some(1 * NANOS_PER_MICRO)); + assert_eq!(string_to_time("00:00:00.000001"), Some(NANOS_PER_MICRO)); // Full precision assert_eq!( string_to_time("23:59:59.999999"), @@ -351,12 +351,12 @@ mod tests { // Single digit hour, minute, second assert_eq!( string_to_time("1:2:3"), - Some(1 * NANOS_PER_HOUR + 2 * NANOS_PER_MINUTE + 3 * NANOS_PER_SECOND) + Some(NANOS_PER_HOUR + 2 * NANOS_PER_MINUTE + 3 * NANOS_PER_SECOND) ); assert_eq!( string_to_time("1:2:3.04"), Some( - 1 * NANOS_PER_HOUR + NANOS_PER_HOUR + 2 * NANOS_PER_MINUTE + 3 * NANOS_PER_SECOND + 40_000 * NANOS_PER_MICRO @@ -369,7 +369,7 @@ mod tests { assert_eq!( string_to_time("T1:02:3.04"), Some( - 1 * NANOS_PER_HOUR + NANOS_PER_HOUR + 2 * NANOS_PER_MINUTE + 3 * NANOS_PER_SECOND + 40_000 * NANOS_PER_MICRO @@ -386,7 +386,7 @@ mod tests { // 12:00:00 AM = midnight assert_eq!(string_to_time("12:00:00 AM"), Some(0)); // 1:00:00 AM - assert_eq!(string_to_time("1:00:00 AM"), Some(1 * 
NANOS_PER_HOUR)); + assert_eq!(string_to_time("1:00:00 AM"), Some(NANOS_PER_HOUR)); // 11:59:59 AM assert_eq!( string_to_time("11:59:59 AM"),