1 change: 1 addition & 0 deletions common/src/main/java/org/apache/comet/parquet/Native.java
@@ -226,6 +226,7 @@ public static native long initRecordBatchReader(
String sessionTimezone,
int batchSize,
boolean caseSensitive,
boolean returnNullStructIfAllFieldsMissing,
Map<String, String> objectStoreOptions,
CometFileKeyUnwrapper keyUnwrapper,
Object metricsNode);
@@ -159,6 +159,11 @@ public URI pathUri() throws URISyntaxException {
protected boolean isCaseSensitive;
protected boolean useFieldId;
protected boolean ignoreMissingIds;
// SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all
// missing in the Parquet file, true returns the entire struct as null (legacy
// pre-4.1 behavior); false preserves the parent struct's nullness from the file
// so non-null parents materialize as a struct of all-null fields.
protected boolean returnNullStructIfAllFieldsMissing = true;
protected StructType partitionSchema;
protected InternalRow partitionValues;
protected PartitionedFile file;
@@ -278,6 +283,7 @@ private NativeBatchReader(AbstractColumnReader[] columnReaders) {
boolean useFieldId,
boolean ignoreMissingIds,
boolean useLegacyDateTimestamp,
boolean returnNullStructIfAllFieldsMissing,
StructType partitionSchema,
InternalRow partitionValues,
Map<String, SQLMetric> metrics,
@@ -290,6 +296,7 @@ private NativeBatchReader(AbstractColumnReader[] columnReaders) {
this.useFieldId = useFieldId;
this.ignoreMissingIds = ignoreMissingIds;
this.useLegacyDateTimestamp = useLegacyDateTimestamp;
this.returnNullStructIfAllFieldsMissing = returnNullStructIfAllFieldsMissing;
this.partitionSchema = partitionSchema;
this.partitionValues = partitionValues;
this.file = inputSplit;
@@ -578,6 +585,7 @@ public void init() throws Throwable {
timeZoneId,
batchSize,
caseSensitive,
returnNullStructIfAllFieldsMissing,
objectStoreOptions,
keyUnwrapper,
metricsNode);
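For context, a minimal spark-shell sketch of the behavior this flag toggles (hypothetical path and column names, not code from this PR): the file's struct has only field `_1`, while the read schema requests only the missing field `b`.

import spark.implicits._

// Struct column `s` with one field `_1`; row 1 has a non-null struct, row 2 a null struct.
Seq(Tuple1(Tuple1(1)), Tuple1(null)).toDF("s")
  .write.mode("overwrite").parquet("/tmp/missing_struct_fields")

// The read schema requests only `s.b`, which does not exist in the file.
val read = spark.read.schema("s struct<b: int>").parquet("/tmp/missing_struct_fields")

// returnNullStructIfAllFieldsMissing = true  (legacy): [null]   [null]
// returnNullStructIfAllFieldsMissing = false (4.1+):   [[null]] [null]
read.collect().foreach(println)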
50 changes: 5 additions & 45 deletions dev/diffs/4.1.1.diff
@@ -2993,7 +2993,7 @@ index 6b73cc8618d..624694916fb 100644
case _ => assert(false, "Can not match ParquetTable in the query.")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 3072657a095..b2293ccab17 100644
index 3072657a095..6b5b9103363 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -40,6 +40,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}
@@ -3004,57 +3004,17 @@ index 3072657a095..b2293ccab17 100644
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow, UnsafeRow}
import org.apache.spark.sql.catalyst.util.{DateTimeConstants, DateTimeUtils}
@@ -765,7 +766,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

- test("vectorized reader: missing all struct fields") {
+ test("vectorized reader: missing all struct fields",
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4136")) {
for {
offheapEnabled <- Seq(true, false)
returnNullStructIfAllFieldsMissing <- Seq(true, false)
@@ -803,7 +805,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

- test("SPARK-53535: vectorized reader: missing all struct fields, struct with complex fields") {
+ test("SPARK-53535: vectorized reader: missing all struct fields, struct with complex fields",
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4136")) {
val data = Seq(
Row(Row(Seq(11, 12, null, 14), Row("21", 22), Row(true)), 100),
Row(Row(Seq(11, 12, null, 14), Row("21", 22), Row(false)), 100),
@@ -858,7 +861,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

- test("SPARK-53535: vectorized reader: missing all struct fields, struct with map field only") {
+ test("SPARK-53535: vectorized reader: missing all struct fields, struct with map field only",
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4136")) {
val data = Seq(
Row(Row(Map("key1" -> 1)), 100),
Row(Row(Map("key2" -> 2)), 100),
@@ -903,7 +907,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}

test("SPARK-53535: vectorized reader: missing all struct fields, " +
- "struct with cheap map and more expensive array field") {
+ "struct with cheap map and more expensive array field",
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4136")) {
val data = Seq(
Row(Row(Map(false -> Row("expensive", 1)), Seq("test1")), 100),
Row(Row(Map(true -> Row("expensive", 2)), Seq("test2")), 100),
@@ -953,7 +958,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
@@ -953,7 +954,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

- test("SPARK-54220: vectorized reader: missing all struct fields, struct with NullType only") {
+ test("SPARK-54220: vectorized reader: missing all struct fields, struct with NullType only",
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4136")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4199")) {
val data = Seq(
Tuple1((null, null)),
Tuple1((null, null)),
@@ -1282,7 +1288,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
@@ -1282,7 +1284,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

Expand All @@ -3064,7 +3024,7 @@ index 3072657a095..b2293ccab17 100644
val data = (1 to 4).map(i => Tuple1(i.toString))
val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType)))

@@ -1567,7 +1574,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
@@ -1567,7 +1570,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

1 change: 1 addition & 0 deletions native/core/src/execution/planner.rs
@@ -1248,6 +1248,7 @@ impl PhysicalPlanner {
default_values,
common.session_timezone.as_str(),
common.case_sensitive,
common.return_null_struct_if_all_fields_missing,
self.session_ctx(),
common.encryption_enabled,
)?;
2 changes: 2 additions & 0 deletions native/core/src/parquet/mod.rs
@@ -438,6 +438,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
session_timezone: JString,
batch_size: jint,
case_sensitive: jboolean,
return_null_struct_if_all_fields_missing: jboolean,
object_store_options: JObject,
key_unwrapper_obj: JObject,
metrics_node: JObject,
@@ -511,6 +512,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
None,
session_timezone.as_str(),
case_sensitive != JNI_FALSE,
return_null_struct_if_all_fields_missing != JNI_FALSE,
session_ctx,
encryption_enabled,
)?;
5 changes: 5 additions & 0 deletions native/core/src/parquet/parquet_exec.rs
@@ -71,12 +71,14 @@ pub(crate) fn init_datasource_exec(
default_values: Option<HashMap<Column, ScalarValue>>,
session_timezone: &str,
case_sensitive: bool,
return_null_struct_if_all_fields_missing: bool,
session_ctx: &Arc<SessionContext>,
encryption_enabled: bool,
) -> Result<Arc<DataSourceExec>, ExecutionError> {
let (table_parquet_options, spark_parquet_options) = get_options(
session_timezone,
case_sensitive,
return_null_struct_if_all_fields_missing,
&object_store_url,
encryption_enabled,
);
@@ -185,6 +187,7 @@ pub(crate) fn init_datasource_exec(
fn get_options(
session_timezone: &str,
case_sensitive: bool,
return_null_struct_if_all_fields_missing: bool,
object_store_url: &ObjectStoreUrl,
encryption_enabled: bool,
) -> (TableParquetOptions, SparkParquetOptions) {
@@ -196,6 +199,8 @@ fn get_options(
SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
spark_parquet_options.allow_cast_unsigned_ints = true;
spark_parquet_options.case_sensitive = case_sensitive;
spark_parquet_options.return_null_struct_if_all_fields_missing =
return_null_struct_if_all_fields_missing;

if encryption_enabled {
table_parquet_options.crypto.configure_factory(
26 changes: 19 additions & 7 deletions native/core/src/parquet/parquet_support.rs
@@ -79,6 +79,11 @@ pub struct SparkParquetOptions {
pub use_legacy_date_timestamp_or_ntz: bool,
// Whether schema field names are case sensitive
pub case_sensitive: bool,
/// SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all
/// missing in the Parquet file, true returns the entire struct as null (pre-4.1
/// legacy behavior); false preserves the parent struct's nullness from the file
/// so non-null parents return a struct of all-null fields.
pub return_null_struct_if_all_fields_missing: bool,
}

impl SparkParquetOptions {
@@ -91,6 +96,7 @@ impl SparkParquetOptions {
use_decimal_128: false,
use_legacy_date_timestamp_or_ntz: false,
case_sensitive: false,
return_null_struct_if_all_fields_missing: true,
}
}

@@ -103,6 +109,7 @@ impl SparkParquetOptions {
use_decimal_128: false,
use_legacy_date_timestamp_or_ntz: false,
case_sensitive: false,
return_null_struct_if_all_fields_missing: true,
}
}
}
@@ -279,13 +286,18 @@ fn parquet_convert_struct_to_struct(
}
}

// If target schema doesn't contain any of the existing fields
// mark such a column in array as NULL
let nulls = if field_overlap {
array.nulls().cloned()
} else {
Some(NullBuffer::new_null(array.len()))
};
// When the file's struct contains none of the requested fields, the
// returned validity buffer depends on Spark's
// `spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing` (SPARK-53535,
// Spark 4.1+). Legacy mode marks the whole column null; the new default
// preserves the file's parent-row nullness so non-null parents materialize
// as a struct of all-null fields.
let nulls =
    if !field_overlap && parquet_options.return_null_struct_if_all_fields_missing {
        Some(NullBuffer::new_null(array.len()))
    } else {
        array.nulls().cloned()
    };

Contributor:
Maybe a nit, but I find it more readable if the if has the more common case and the else branch the special case. Maybe if we flip it:

let nulls = if !field_overlap && parquet_options.return_null_struct_if_all_fields_missing {
    Some(NullBuffer::new_null(array.len()))
} else {
    array.nulls().cloned()
};

Member (author):
Updated

Ok(Arc::new(StructArray::new(
to_fields.clone(),
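The same decision, observed end to end: a hedged spark-shell sketch that reads the hypothetical file from the earlier sketch (struct column `s` with only field `_1`, parent validity [non-null, null]) under both settings of the conf.

for (legacy <- Seq("true", "false")) {
  spark.conf.set("spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing", legacy)
  // legacy=true  -> validity forced to all-null:      [null]   [null]
  // legacy=false -> file's parent validity preserved: [[null]] [null]
  val rows = spark.read.schema("s struct<b: int>")
    .parquet("/tmp/missing_struct_fields").collect()
  println(s"legacy=$legacy: ${rows.mkString(" ")}")
}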
5 changes: 5 additions & 0 deletions native/proto/src/proto/operator.proto
@@ -107,6 +107,11 @@ message NativeScanCommon {
bool encryption_enabled = 11;
string source = 12;
repeated spark.spark_expression.DataType fields = 13;
// SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all
// missing in the Parquet file, true returns the entire struct as null (legacy
// pre-4.1 behavior); false preserves the parent struct's nullness from the file
// so non-null parents return a struct of all-null fields.
bool return_null_struct_if_all_fields_missing = 14;
}

message NativeScan {
@@ -43,6 +43,7 @@ import org.apache.spark.sql.types.{DateType, StructType, TimestampType}
import org.apache.spark.util.SerializableConfiguration

import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.isSpark41Plus
import org.apache.comet.MetricsSupport
import org.apache.comet.shims.ShimSQLConf
import org.apache.comet.vector.CometVector
@@ -96,6 +97,15 @@ class CometParquetFileFormat(session: SparkSession)
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
val useFieldId = CometParquetUtils.readFieldId(sqlConf)
val ignoreMissingIds = CometParquetUtils.ignoreMissingIds(sqlConf)
// SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all
// missing in the Parquet file, the new default preserves the parent struct's
// nullness from the file. Pre-4.1 Spark hardcodes the legacy behavior, so we
// default to "true" there for backwards compatibility.
val returnNullStructIfAllFieldsMissing = sqlConf
.getConfString(
"spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing",
if (isSpark41Plus) "false" else "true")
.toBoolean
val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
@@ -158,6 +168,7 @@ class CometParquetFileFormat(session: SparkSession)
useFieldId,
ignoreMissingIds,
datetimeRebaseSpec.mode == CORRECTED,
returnNullStructIfAllFieldsMissing,
partitionSchema,
file.partitionValues,
metrics.asJava,
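Because the effective default now depends on the Spark version, jobs that need stable answers across upgrades can pin the conf explicitly rather than rely on the fallback above; a usage sketch:

// Pin the legacy semantics (all requested struct fields missing => whole
// struct reads back as null) on any Spark version; conf key from SPARK-53535.
spark.conf.set("spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing", "true")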
@@ -31,7 +31,7 @@ import org.apache.spark.sql.internal.SQLConf

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometConf.COMET_EXEC_ENABLED
import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isSpark35Plus, withInfo}
import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isSpark35Plus, isSpark41Plus, withInfo}
import org.apache.comet.objectstore.NativeConfig
import org.apache.comet.parquet.CometParquetUtils
import org.apache.comet.serde.{CometOperatorSerde, Compatible, OperatorOuterClass, SupportLevel}
@@ -189,6 +189,17 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
commonBuilder.setSessionTimezone(scan.conf.getConfString("spark.sql.session.timeZone"))
commonBuilder.setCaseSensitive(scan.conf.getConf[Boolean](SQLConf.CASE_SENSITIVE))

// SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all
// missing in the Parquet file, the new default preserves the parent struct's
// nullness from the file (so non-null parents materialize as a struct of all-null
// fields). Pre-4.1 Spark hardcodes the legacy behavior (whole struct null), which
// matches the Comet default we use as fallback.
val returnNullStructConfKey =
"spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing"
val returnNullStructDefault = if (isSpark41Plus) "false" else "true"
commonBuilder.setReturnNullStructIfAllFieldsMissing(
scan.conf.getConfString(returnNullStructConfKey, returnNullStructDefault).toBoolean)

// Collect S3/cloud storage configurations
val hadoopConf = scan.relation.sparkSession.sessionState
.newHadoopConfWithOptions(scan.relation.options)
@@ -2998,7 +2998,13 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "false",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_datafusion",
SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true",
SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offheapEnabled.toString) {
SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offheapEnabled.toString,
// SPARK-53535 (Spark 4.1+) flipped the default to "false", which preserves the parent
// struct's nullness so non-null parents materialize as Row(Row(null, null)). This test
// asserts the legacy "all missing fields => null struct" answer, so pin the conf to
// "true" to keep the expectation valid on both 3.x/4.0 and 4.1+. The non-legacy
// behavior is covered separately by issue #4136 in CometNativeReaderSuite.
"spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing" -> "true") {
val data = Seq(Tuple1((1, "a")), Tuple1((2, null)), Tuple1(null))

val readSchema = new StructType().add(