From 6c15c4034416643965c73648e602b64994e83516 Mon Sep 17 00:00:00 2001 From: petitbonney Date: Tue, 24 Mar 2026 15:22:29 -0400 Subject: [PATCH] Support Spark 4.0.0 --- .../za/co/absa/abris/avro/functions.scala | 9 ++- .../avro/parsing/utils/AvroSchemaUtils.scala | 8 +- .../absa/abris/utils/SparkColumnCompat.scala | 77 +++++++++++++++++++ .../avro/sql/AvroDataToCatalystSpec.scala | 13 ++-- .../avro/sql/CatalystDataToAvroSpec.scala | 3 +- 5 files changed, 94 insertions(+), 16 deletions(-) create mode 100644 src/main/scala/za/co/absa/abris/utils/SparkColumnCompat.scala diff --git a/src/main/scala/za/co/absa/abris/avro/functions.scala b/src/main/scala/za/co/absa/abris/avro/functions.scala index 230f2506..efc6e7c8 100755 --- a/src/main/scala/za/co/absa/abris/avro/functions.scala +++ b/src/main/scala/za/co/absa/abris/avro/functions.scala @@ -19,6 +19,7 @@ package za.co.absa.abris.avro import org.apache.spark.sql.Column import za.co.absa.abris.avro.sql.{AvroDataToCatalyst, CatalystDataToAvro} import za.co.absa.abris.config.{AbrisConfig, FromAvroConfig, ToAvroConfig} +import za.co.absa.abris.utils.SparkColumnCompat.{col2expr, expr2col} // scalastyle:off: object.name @@ -35,8 +36,8 @@ object functions { def to_avro(column: Column, config: ToAvroConfig): Column = { config.validate() - new Column(CatalystDataToAvro( - column.expr, + expr2col(CatalystDataToAvro( + col2expr(column), config.abrisConfig() )) } @@ -64,8 +65,8 @@ object functions { def from_avro(column: Column, config: FromAvroConfig): Column = { config.validate() - new Column(AvroDataToCatalyst( - column.expr, + expr2col(AvroDataToCatalyst( + col2expr(column), config.abrisConfig(), config.schemaRegistryConf() )) diff --git a/src/main/scala/za/co/absa/abris/avro/parsing/utils/AvroSchemaUtils.scala b/src/main/scala/za/co/absa/abris/avro/parsing/utils/AvroSchemaUtils.scala index c23fcd21..58d57c5c 100644 --- a/src/main/scala/za/co/absa/abris/avro/parsing/utils/AvroSchemaUtils.scala +++ b/src/main/scala/za/co/absa/abris/avro/parsing/utils/AvroSchemaUtils.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.sql.DataFrame import org.apache.spark.sql.avro.SchemaConverters -import org.apache.spark.sql.functions.struct +import org.apache.spark.sql.types.StructType import java.nio.charset.Charset import scala.collection.JavaConverters._ @@ -80,10 +80,8 @@ object AvroSchemaUtils { recordName: String, nameSpace: String ): Schema = { - val allColumns = struct(columnNames.map(dataFrame.col): _*) - val expression = allColumns.expr - - SchemaConverters.toAvroType(expression.dataType, expression.nullable, recordName, nameSpace) + val structType = StructType(columnNames.map(dataFrame.schema(_))) + SchemaConverters.toAvroType(structType, nullable = false, recordName, nameSpace) } def toAvroSchema( diff --git a/src/main/scala/za/co/absa/abris/utils/SparkColumnCompat.scala b/src/main/scala/za/co/absa/abris/utils/SparkColumnCompat.scala new file mode 100644 index 00000000..e72fe8d1 --- /dev/null +++ b/src/main/scala/za/co/absa/abris/utils/SparkColumnCompat.scala @@ -0,0 +1,77 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.abris.utils + +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.expressions.Expression + +/** + * Compatibility layer for Column <-> Expression conversions across Spark versions. + * + * In Spark 3.x, Column has a direct `.expr` property and a constructor taking an Expression. + * In Spark 4.x, Column wraps a ColumnNode AST and conversions go through + * `org.apache.spark.sql.classic.ColumnConversions` and `ExpressionUtils`. + * + * This object uses reflection to support both versions from a single codebase, + * following the same pattern used in AbrisAvroDeserializer. + */ +object SparkColumnCompat { + + /** Convert a Column to a Catalyst Expression. Replaces `column.expr`. */ + lazy val col2expr: Column => Expression = { + // Try Spark 3.x first: Column.expr is a direct method + val spark3Method = try { + Some(classOf[Column].getMethod("expr")) + } catch { + case _: NoSuchMethodException => None + } + + spark3Method match { + case Some(method) => + (column: Column) => method.invoke(column).asInstanceOf[Expression] + + case None => + // Spark 4.x: use org.apache.spark.sql.classic.ColumnConversions.expression(column) + val clazz = Class.forName("org.apache.spark.sql.classic.ColumnConversions$") + val instance = clazz.getField("MODULE$").get(null) + val method = clazz.getMethod("expression", classOf[Column]) + (column: Column) => method.invoke(instance, column).asInstanceOf[Expression] + } + } + + /** Convert a Catalyst Expression to a Column. Replaces `new Column(expr)`. */ + lazy val expr2col: Expression => Column = { + // Try Spark 3.x first: new Column(Expression) + val spark3Ctor = try { + Some(classOf[Column].getConstructor(classOf[Expression])) + } catch { + case _: NoSuchMethodException => None + } + + spark3Ctor match { + case Some(ctor) => + (expr: Expression) => ctor.newInstance(expr) + + case None => + // Spark 4.x: use ExpressionUtils.column(expr) + val clazz = Class.forName("org.apache.spark.sql.classic.ExpressionUtils$") + val instance = clazz.getField("MODULE$").get(null) + val method = clazz.getMethod("column", classOf[Expression]) + (expr: Expression) => method.invoke(instance, expr).asInstanceOf[Column] + } + } +} diff --git a/src/test/scala/za/co/absa/abris/avro/sql/AvroDataToCatalystSpec.scala b/src/test/scala/za/co/absa/abris/avro/sql/AvroDataToCatalystSpec.scala index ffedd866..3add7b5b 100644 --- a/src/test/scala/za/co/absa/abris/avro/sql/AvroDataToCatalystSpec.scala +++ b/src/test/scala/za/co/absa/abris/avro/sql/AvroDataToCatalystSpec.scala @@ -32,6 +32,7 @@ import za.co.absa.abris.avro.functions._ import za.co.absa.abris.avro.utils.AvroSchemaEncoder import za.co.absa.abris.config.{AbrisConfig, FromAvroConfig} import za.co.absa.abris.examples.data.generation.TestSchemas +import za.co.absa.abris.utils.SparkColumnCompat.col2expr import java.util.Collections import java.nio.ByteBuffer @@ -66,7 +67,7 @@ class AvroDataToCatalystSpec extends AnyFlatSpec with Matchers with BeforeAndAft )) val column = from_avro(col("avroBytes"), fromAvroConfig) - column.expr.toString() should not include sensitiveData + col2expr(column).toString() should not include sensitiveData } it should "use the default schema converter by default" in { @@ -84,7 +85,7 @@ class AvroDataToCatalystSpec extends AnyFlatSpec with Matchers with BeforeAndAft )) val column = from_avro(col("avroBytes"), fromAvroConfig) - column.expr.dataType shouldBe expectedDataType + col2expr(column).dataType shouldBe expectedDataType } it should "use a custom schema converter identified by the short name" in { @@ -99,7 +100,7 @@ class AvroDataToCatalystSpec extends AnyFlatSpec with Matchers with BeforeAndAft .withSchemaConverter(DummySchemaConverter.name) val column = from_avro(col("avroBytes"), fromAvroConfig) - column.expr.dataType shouldBe DummySchemaConverter.dataType + col2expr(column).dataType shouldBe DummySchemaConverter.dataType } it should "use a custom schema converter identified by the fully qualified name" in { @@ -114,7 +115,7 @@ class AvroDataToCatalystSpec extends AnyFlatSpec with Matchers with BeforeAndAft .withSchemaConverter("za.co.absa.abris.avro.sql.DummySchemaConverter") val column = from_avro(col("avroBytes"), fromAvroConfig) - column.expr.dataType shouldBe DummySchemaConverter.dataType + col2expr(column).dataType shouldBe DummySchemaConverter.dataType } it should "throw an error if the specified custom schema converter does not exist" in { @@ -128,14 +129,14 @@ class AvroDataToCatalystSpec extends AnyFlatSpec with Matchers with BeforeAndAft )) .withSchemaConverter("nonexistent") - val ex = intercept[ClassNotFoundException](from_avro(col("avroBytes"), fromAvroConfig).expr.dataType) + val ex = intercept[ClassNotFoundException](col2expr(from_avro(col("avroBytes"), fromAvroConfig)).dataType) ex.getMessage should include ("nonexistent") } it should "be serializable" in { val schemaString = TestSchemas.NATIVE_SIMPLE_NESTED_SCHEMA val config = FromAvroConfig().withReaderSchema(schemaString) - val avroDataToCatalyst = from_avro(col("col"), config).expr + val avroDataToCatalyst = col2expr(from_avro(col("col"), config)) val javaSerializer = new JavaSerializer(new SparkConf()) javaSerializer.newInstance().serialize(avroDataToCatalyst) diff --git a/src/test/scala/za/co/absa/abris/avro/sql/CatalystDataToAvroSpec.scala b/src/test/scala/za/co/absa/abris/avro/sql/CatalystDataToAvroSpec.scala index 9bdc0950..59fc9646 100644 --- a/src/test/scala/za/co/absa/abris/avro/sql/CatalystDataToAvroSpec.scala +++ b/src/test/scala/za/co/absa/abris/avro/sql/CatalystDataToAvroSpec.scala @@ -25,6 +25,7 @@ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import za.co.absa.abris.avro.functions._ import za.co.absa.abris.config.ToAvroConfig +import za.co.absa.abris.utils.SparkColumnCompat.col2expr class CatalystDataToAvroSpec extends AnyFlatSpec with Matchers with BeforeAndAfterEach { it should "be serializable" in { @@ -36,7 +37,7 @@ class CatalystDataToAvroSpec extends AnyFlatSpec with Matchers with BeforeAndAft .endRecord() .toString val config = ToAvroConfig().withSchema(schema) - val catalystDataToAvro = to_avro(col("col"), config).expr + val catalystDataToAvro = col2expr(to_avro(col("col"), config)) val javaSerializer = new JavaSerializer(new SparkConf()) javaSerializer.newInstance().serialize(catalystDataToAvro)