From 768b3e90f261c7aea58bdb98dc698b90deeeae34 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 14 Dec 2025 16:24:01 +0400 Subject: [PATCH 1/5] impl map_from_entries --- native/core/src/execution/jni_api.rs | 2 + .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../scala/org/apache/comet/serde/maps.scala | 29 +++++++++++- .../comet/CometMapExpressionSuite.scala | 45 +++++++++++++++++++ 4 files changed, 77 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index a24d993059..4f53cea3e6 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -46,6 +46,7 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; +use datafusion_spark::function::map::map_from_entries::MapFromEntries; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; @@ -337,6 +338,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 54df2f1688..a99cf3824b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -125,7 +125,8 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapKeys] -> CometMapKeys, classOf[MapEntries] -> CometMapEntries, classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays) + classOf[MapFromArrays] -> CometMapFromArrays, + classOf[MapFromEntries] -> CometMapFromEntries) private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[CreateNamedStruct] -> CometCreateNamedStruct, diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 2e217f6af0..498aa3594c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,9 +19,12 @@ package org.apache.comet.serde +import scala.annotation.tailrec + import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, MapType} +import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType, StructType} +import org.apache.comet.serde.CometArrayReverse.containsBinary import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -89,3 +92,27 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) } } + +object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from_entries") { + val keyUnsupportedReason = "Using BinaryType as Map keys is not allowed in map_from_entries" + val valueUnsupportedReason = "Using BinaryType as Map values is not allowed in map_from_entries" + + private def containsBinary(dataType: DataType): Boolean = { + dataType match { + case BinaryType => true + case StructType(fields) => fields.exists(field => containsBinary(field.dataType)) + case ArrayType(elementType, _) => containsBinary(elementType) + case _ => false + } + } + + override def getSupportLevel(expr: MapFromEntries): SupportLevel = { + if (containsBinary(expr.dataType.keyType)) { + return Incompatible(Some(keyUnsupportedReason)) + } + if (containsBinary(expr.dataType.valueType)) { + return Incompatible(Some(valueUnsupportedReason)) + } + Compatible(None) + } +} diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 88c13391a6..01b9744ed6 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -25,7 +25,9 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.BinaryType +import org.apache.comet.serde.CometMapFromEntries import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometMapExpressionSuite extends CometTestBase { @@ -125,4 +127,47 @@ class CometMapExpressionSuite extends CometTestBase { } } + test("map_from_entries") { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + val filename = path.toString + val random = new Random(42) + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + val schemaGenOptions = + SchemaGenOptions( + generateArray = true, + generateStruct = true, + primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType)) + val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false) + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 100, + schemaGenOptions, + dataGenOptions) + } + val df = spark.read.parquet(filename) + df.createOrReplaceTempView("t1") + for (field <- df.schema.fieldNames) { + checkSparkAnswerAndOperator( + spark.sql(s"SELECT map_from_entries(array(struct($field as a, $field as b))) FROM t1")) + } + } + } + + test("map_from_entries - fallback for binary type") { + val table = "t2" + withTable(table) { + sql( + s"create table $table using parquet as select cast(array() as array) as c1 from range(10)") + checkSparkAnswerAndFallbackReason( + sql(s"select map_from_entries(array(struct(c1, 0))) from $table"), + CometMapFromEntries.keyUnsupportedReason) + checkSparkAnswerAndFallbackReason( + sql(s"select map_from_entries(array(struct(0, c1))) from $table"), + CometMapFromEntries.valueUnsupportedReason) + } + } + } From c68c3428676b5d991e7ba9e13464bf2ce1ec84e8 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 16 Dec 2025 16:10:43 +0400 Subject: [PATCH 2/5] Revert "impl map_from_entries" This reverts commit 768b3e90f261c7aea58bdb98dc698b90deeeae34. --- native/core/src/execution/jni_api.rs | 2 - .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../scala/org/apache/comet/serde/maps.scala | 29 +----------- .../comet/CometMapExpressionSuite.scala | 45 ------------------- 4 files changed, 2 insertions(+), 77 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 4f53cea3e6..a24d993059 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -46,7 +46,6 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; -use datafusion_spark::function::map::map_from_entries::MapFromEntries; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; @@ -338,7 +337,6 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); - session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index a99cf3824b..54df2f1688 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -125,8 +125,7 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapKeys] -> CometMapKeys, classOf[MapEntries] -> CometMapEntries, classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays, - classOf[MapFromEntries] -> CometMapFromEntries) + classOf[MapFromArrays] -> CometMapFromArrays) private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[CreateNamedStruct] -> CometCreateNamedStruct, diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 498aa3594c..2e217f6af0 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,12 +19,9 @@ package org.apache.comet.serde -import scala.annotation.tailrec - import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType, StructType} +import org.apache.spark.sql.types.{ArrayType, MapType} -import org.apache.comet.serde.CometArrayReverse.containsBinary import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -92,27 +89,3 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) } } - -object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from_entries") { - val keyUnsupportedReason = "Using BinaryType as Map keys is not allowed in map_from_entries" - val valueUnsupportedReason = "Using BinaryType as Map values is not allowed in map_from_entries" - - private def containsBinary(dataType: DataType): Boolean = { - dataType match { - case BinaryType => true - case StructType(fields) => fields.exists(field => containsBinary(field.dataType)) - case ArrayType(elementType, _) => containsBinary(elementType) - case _ => false - } - } - - override def getSupportLevel(expr: MapFromEntries): SupportLevel = { - if (containsBinary(expr.dataType.keyType)) { - return Incompatible(Some(keyUnsupportedReason)) - } - if (containsBinary(expr.dataType.valueType)) { - return Incompatible(Some(valueUnsupportedReason)) - } - Compatible(None) - } -} diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 01b9744ed6..88c13391a6 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -25,9 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.BinaryType -import org.apache.comet.serde.CometMapFromEntries import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometMapExpressionSuite extends CometTestBase { @@ -127,47 +125,4 @@ class CometMapExpressionSuite extends CometTestBase { } } - test("map_from_entries") { - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - val filename = path.toString - val random = new Random(42) - withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val schemaGenOptions = - SchemaGenOptions( - generateArray = true, - generateStruct = true, - primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType)) - val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false) - ParquetGenerator.makeParquetFile( - random, - spark, - filename, - 100, - schemaGenOptions, - dataGenOptions) - } - val df = spark.read.parquet(filename) - df.createOrReplaceTempView("t1") - for (field <- df.schema.fieldNames) { - checkSparkAnswerAndOperator( - spark.sql(s"SELECT map_from_entries(array(struct($field as a, $field as b))) FROM t1")) - } - } - } - - test("map_from_entries - fallback for binary type") { - val table = "t2" - withTable(table) { - sql( - s"create table $table using parquet as select cast(array() as array) as c1 from range(10)") - checkSparkAnswerAndFallbackReason( - sql(s"select map_from_entries(array(struct(c1, 0))) from $table"), - CometMapFromEntries.keyUnsupportedReason) - checkSparkAnswerAndFallbackReason( - sql(s"select map_from_entries(array(struct(0, c1))) from $table"), - CometMapFromEntries.valueUnsupportedReason) - } - } - } From 7ce248a0ffdd9708365b20592558b79c3a5673b6 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 1 Mar 2026 16:07:30 +0400 Subject: [PATCH 3/5] Use datafusion impl of bit_count --- native/core/src/execution/jni_api.rs | 2 + .../src/bitwise_funcs/bitwise_count.rs | 193 ------------------ native/spark-expr/src/bitwise_funcs/mod.rs | 20 -- native/spark-expr/src/comet_scalar_funcs.rs | 5 +- native/spark-expr/src/lib.rs | 2 - .../org/apache/comet/serde/bitwise.scala | 12 +- 6 files changed, 5 insertions(+), 229 deletions(-) delete mode 100644 native/spark-expr/src/bitwise_funcs/bitwise_count.rs delete mode 100644 native/spark-expr/src/bitwise_funcs/mod.rs diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 0193f3012c..316a76cf69 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -400,6 +400,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkWidthBucket::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkCrc32::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitCount::default())); } /// Prepares arrow arrays for output. @@ -910,6 +911,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_logMemoryUsage( use crate::execution::columnar_to_row::ColumnarToRowContext; use arrow::ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema}; +use datafusion_spark::function::bitwise::bit_count::SparkBitCount; /// Initialize a native columnar to row converter. /// diff --git a/native/spark-expr/src/bitwise_funcs/bitwise_count.rs b/native/spark-expr/src/bitwise_funcs/bitwise_count.rs deleted file mode 100644 index b65c507320..0000000000 --- a/native/spark-expr/src/bitwise_funcs/bitwise_count.rs +++ /dev/null @@ -1,193 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow::{array::*, datatypes::DataType}; -use datafusion::common::{exec_err, internal_datafusion_err, Result}; -use datafusion::logical_expr::{ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility}; -use datafusion::{error::DataFusionError, logical_expr::ColumnarValue}; -use std::any::Any; -use std::sync::Arc; - -#[derive(Debug, PartialEq, Eq, Hash)] -pub struct SparkBitwiseCount { - signature: Signature, - aliases: Vec, -} - -impl Default for SparkBitwiseCount { - fn default() -> Self { - Self::new() - } -} - -impl SparkBitwiseCount { - pub fn new() -> Self { - Self { - signature: Signature::user_defined(Volatility::Immutable), - aliases: vec![], - } - } -} - -impl ScalarUDFImpl for SparkBitwiseCount { - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - "bit_count" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, _: &[DataType]) -> Result { - Ok(DataType::Int32) - } - - fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - let args: [ColumnarValue; 1] = args - .args - .try_into() - .map_err(|_| internal_datafusion_err!("bit_count expects exactly one argument"))?; - spark_bit_count(args) - } - - fn aliases(&self) -> &[String] { - &self.aliases - } -} - -macro_rules! compute_op { - ($OPERAND:expr, $DT:ident) => {{ - let operand = $OPERAND.as_any().downcast_ref::<$DT>().ok_or_else(|| { - DataFusionError::Execution(format!( - "compute_op failed to downcast array to: {:?}", - stringify!($DT) - )) - })?; - - let result: Int32Array = operand - .iter() - .map(|x| x.map(|y| bit_count(y.into()))) - .collect(); - - Ok(Arc::new(result)) - }}; -} - -pub fn spark_bit_count(args: [ColumnarValue; 1]) -> Result { - match args { - [ColumnarValue::Array(array)] => { - let result: Result = match array.data_type() { - DataType::Int8 | DataType::Boolean => compute_op!(array, Int8Array), - DataType::Int16 => compute_op!(array, Int16Array), - DataType::Int32 => compute_op!(array, Int32Array), - DataType::Int64 => compute_op!(array, Int64Array), - _ => exec_err!("bit_count can't be evaluated because the array's type is {:?}, not signed int/boolean", array.data_type()), - }; - result.map(ColumnarValue::Array) - } - [ColumnarValue::Scalar(scalar)] => { - use datafusion::common::ScalarValue; - let result = match scalar { - ScalarValue::Int8(Some(v)) => bit_count(v as i64), - ScalarValue::Int16(Some(v)) => bit_count(v as i64), - ScalarValue::Int32(Some(v)) => bit_count(v as i64), - ScalarValue::Int64(Some(v)) => bit_count(v), - ScalarValue::Boolean(Some(v)) => bit_count(if v { 1 } else { 0 }), - ScalarValue::Int8(None) - | ScalarValue::Int16(None) - | ScalarValue::Int32(None) - | ScalarValue::Int64(None) - | ScalarValue::Boolean(None) => { - return Ok(ColumnarValue::Scalar(ScalarValue::Int32(None))) - } - _ => { - return exec_err!( - "bit_count can't be evaluated because the scalar's type is {:?}, not signed int/boolean", - scalar.data_type() - ) - } - }; - Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(result)))) - } - } -} - -// Here’s the equivalent Rust implementation of the bitCount function (similar to Java's bitCount for LongType) -fn bit_count(i: i64) -> i32 { - let mut u = i as u64; - u = u - ((u >> 1) & 0x5555555555555555); - u = (u & 0x3333333333333333) + ((u >> 2) & 0x3333333333333333); - u = (u + (u >> 4)) & 0x0f0f0f0f0f0f0f0f; - u = u + (u >> 8); - u = u + (u >> 16); - u = u + (u >> 32); - (u as i32) & 0x7f -} - -#[cfg(test)] -mod tests { - use datafusion::common::{cast::as_int32_array, Result, ScalarValue}; - - use super::*; - - #[test] - fn bitwise_count_op() -> Result<()> { - let args = ColumnarValue::Array(Arc::new(Int32Array::from(vec![ - Some(1), - None, - Some(12345), - Some(89), - Some(-3456), - Some(i32::MIN), - Some(i32::MAX), - ]))); - let expected = &Int32Array::from(vec![ - Some(1), - None, - Some(6), - Some(4), - Some(54), - Some(33), - Some(31), - ]); - - let ColumnarValue::Array(result) = spark_bit_count([args])? else { - unreachable!() - }; - - let result = as_int32_array(&result).expect("failed to downcast to In32Array"); - assert_eq!(result, expected); - - Ok(()) - } - - #[test] - fn bitwise_count_scalar() { - let args = ColumnarValue::Scalar(ScalarValue::Int64(Some(i64::MAX))); - - match spark_bit_count([args]) { - Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(actual)))) => { - assert_eq!(actual, 63) - } - _ => unreachable!(), - } - } -} diff --git a/native/spark-expr/src/bitwise_funcs/mod.rs b/native/spark-expr/src/bitwise_funcs/mod.rs deleted file mode 100644 index d96857e230..0000000000 --- a/native/spark-expr/src/bitwise_funcs/mod.rs +++ /dev/null @@ -1,20 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -mod bitwise_count; - -pub use bitwise_count::SparkBitwiseCount; diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs index 4bfdef7096..babdb19623 100644 --- a/native/spark-expr/src/comet_scalar_funcs.rs +++ b/native/spark-expr/src/comet_scalar_funcs.rs @@ -22,8 +22,8 @@ use crate::math_funcs::modulo_expr::spark_modulo; use crate::{ spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor, spark_isnan, spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad, spark_unhex, - spark_unscaled_value, EvalMode, SparkBitwiseCount, SparkContains, SparkDateDiff, - SparkDateTrunc, SparkMakeDate, SparkSizeFunc, SparkStringSpace, + spark_unscaled_value, EvalMode, SparkContains, SparkDateDiff, SparkDateTrunc, SparkMakeDate, + SparkSizeFunc, SparkStringSpace, }; use arrow::datatypes::DataType; use datafusion::common::{DataFusionError, Result as DataFusionResult}; @@ -191,7 +191,6 @@ pub fn create_comet_physical_fun_with_eval_mode( fn all_scalar_functions() -> Vec> { vec![ - Arc::new(ScalarUDF::new_from_impl(SparkBitwiseCount::default())), Arc::new(ScalarUDF::new_from_impl(SparkContains::default())), Arc::new(ScalarUDF::new_from_impl(SparkDateDiff::default())), Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())), diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index 40eb180ab8..992bd94b0b 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -41,7 +41,6 @@ pub use predicate_funcs::{spark_isnan, RLike}; mod agg_funcs; mod array_funcs; -mod bitwise_funcs; mod comet_scalar_funcs; pub mod hash_funcs; @@ -61,7 +60,6 @@ mod math_funcs; mod nondetermenistic_funcs; pub use array_funcs::*; -pub use bitwise_funcs::*; pub use conditional_funcs::*; pub use conversion_funcs::*; pub use nondetermenistic_funcs::*; diff --git a/spark/src/main/scala/org/apache/comet/serde/bitwise.scala b/spark/src/main/scala/org/apache/comet/serde/bitwise.scala index 8020502ab3..ad168c3a0b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/bitwise.scala +++ b/spark/src/main/scala/org/apache/comet/serde/bitwise.scala @@ -140,14 +140,4 @@ object CometBitwiseGet extends CometExpressionSerde[BitwiseGet] { } } -object CometBitwiseCount extends CometExpressionSerde[BitwiseCount] { - override def convert( - expr: BitwiseCount, - inputs: Seq[Attribute], - binding: Boolean): Option[ExprOuterClass.Expr] = { - val childProto = exprToProto(expr.child, inputs, binding) - val bitCountScalarExpr = - scalarFunctionExprToProtoWithReturnType("bit_count", IntegerType, false, childProto) - optExprWithInfo(bitCountScalarExpr, expr, expr.children: _*) - } -} +object CometBitwiseCount extends CometScalarFunction[BitwiseCount]("bit_count") From 77618bb8de368554f87cb360908039cdf824d6f2 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Mon, 2 Mar 2026 20:23:53 +0400 Subject: [PATCH 4/5] fix fmt --- spark/src/main/scala/org/apache/comet/serde/bitwise.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/bitwise.scala b/spark/src/main/scala/org/apache/comet/serde/bitwise.scala index ad168c3a0b..6f5e11cc9f 100644 --- a/spark/src/main/scala/org/apache/comet/serde/bitwise.scala +++ b/spark/src/main/scala/org/apache/comet/serde/bitwise.scala @@ -19,10 +19,9 @@ package org.apache.comet.serde -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ByteType, IntegerType, LongType} - import org.apache.comet.serde.QueryPlanSerde._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.{ByteType, LongType} object CometBitwiseAnd extends CometExpressionSerde[BitwiseAnd] { override def convert( From 16750cb5fb9f31ef39f054f306155bfd9507d0fa Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 3 Mar 2026 19:57:04 +0400 Subject: [PATCH 5/5] Resolve conflicts --- native/core/src/execution/jni_api.rs | 4 ++-- native/spark-expr/src/comet_scalar_funcs.rs | 4 ++-- spark/src/main/scala/org/apache/comet/serde/bitwise.scala | 3 ++- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 502ac74dd7..1030e30aaf 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -40,6 +40,7 @@ use datafusion::{ prelude::{SessionConfig, SessionContext}, }; use datafusion_comet_proto::spark_operator::Operator; +use datafusion_spark::function::bitwise::bit_count::SparkBitCount; use datafusion_spark::function::bitwise::bit_get::SparkBitGet; use datafusion_spark::function::bitwise::bitwise_not::SparkBitwiseNot; use datafusion_spark::function::datetime::date_add::SparkDateAdd; @@ -55,6 +56,7 @@ use datafusion_spark::function::math::hex::SparkHex; use datafusion_spark::function::math::width_bucket::SparkWidthBucket; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; +use datafusion_spark::function::string::space::SparkSpace; use futures::poll; use futures::stream::StreamExt; use jni::objects::JByteBuffer; @@ -912,8 +914,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_logMemoryUsage( use crate::execution::columnar_to_row::ColumnarToRowContext; use arrow::ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema}; -use datafusion_spark::function::string::space::SparkSpace; -use datafusion_spark::function::bitwise::bit_count::SparkBitCount; /// Initialize a native columnar to row converter. /// diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs index f1af4b725d..ff75de763b 100644 --- a/native/spark-expr/src/comet_scalar_funcs.rs +++ b/native/spark-expr/src/comet_scalar_funcs.rs @@ -22,8 +22,8 @@ use crate::math_funcs::modulo_expr::spark_modulo; use crate::{ spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor, spark_isnan, spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad, spark_unhex, - spark_unscaled_value, EvalMode, SparkContains, SparkDateDiff, - SparkDateTrunc, SparkMakeDate, SparkSizeFunc, + spark_unscaled_value, EvalMode, SparkContains, SparkDateDiff, SparkDateTrunc, SparkMakeDate, + SparkSizeFunc, }; use arrow::datatypes::DataType; use datafusion::common::{DataFusionError, Result as DataFusionResult}; diff --git a/spark/src/main/scala/org/apache/comet/serde/bitwise.scala b/spark/src/main/scala/org/apache/comet/serde/bitwise.scala index 6f5e11cc9f..751fb7521f 100644 --- a/spark/src/main/scala/org/apache/comet/serde/bitwise.scala +++ b/spark/src/main/scala/org/apache/comet/serde/bitwise.scala @@ -19,10 +19,11 @@ package org.apache.comet.serde -import org.apache.comet.serde.QueryPlanSerde._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.{ByteType, LongType} +import org.apache.comet.serde.QueryPlanSerde._ + object CometBitwiseAnd extends CometExpressionSerde[BitwiseAnd] { override def convert( expr: BitwiseAnd,