Merged
36 changes: 36 additions & 0 deletions native/core/src/parquet/schema_adapter.rs
@@ -419,6 +419,42 @@ impl SparkPhysicalExprAdapter
)));
}

// Decimal-to-decimal scale-narrowing check.
// Reject reads where the read schema has a smaller scale than the
// file's, because Spark's Cast below would silently truncate
// fractional digits, producing wrong values. This matches the
// unconditionally-lossy case in issue #4089 (e.g. Decimal(10,2) read
// as Decimal(5,0)).
//
// Other decimal mismatches are intentionally NOT rejected here,
// even though Spark's vectorized reader would reject them via
// `ParquetVectorUpdaterFactory#isDecimalTypeMatched` (which requires
// exact precision and scale):
//
// - Precision-only changes with the same scale (e.g. Decimal(5,2)
// read as Decimal(3,2)): Spark 4.0's parquet-mr fallback path
// (PARQUET_VECTORIZED_READER_ENABLED=false) and the vectorized
// type-widening path produce null on per-value overflow, which
// DataFusion's cast already does in the adapting-schema path.
//
// - Scale widening (e.g. Decimal(10,2) read as Decimal(10,4)): the
// cast is lossless (no truncation, no overflow), so allowing it
// here is strictly more permissive than Spark's vectorized reader
// without risking wrong values.
if let (DataType::Decimal128(_src_p, src_s), DataType::Decimal128(_dst_p, dst_s)) =
(physical_type, target_type)
{
if dst_s < src_s {
return Err(DataFusionError::Plan(format!(
"Parquet column cannot be converted. Column: [{}], \
Expected: {}, Found: {}",
cast.input_field().name(),
target_type,
physical_type,
)));
}
}

// For complex nested types (Struct, List, Map), Timestamp timezone
// mismatches, and Timestamp→Int64 (nanosAsLong), use CometCastColumnExpr
// with spark_parquet_convert which handles field-name-based selection,
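The decision rule in the hunk above can be sketched in isolation. This is a hypothetical standalone function over bare (precision, scale) pairs, not the actual adapter code, which matches on arrow `DataType::Decimal128` values:

```rust
// Minimal sketch of the decimal read-compatibility rule, assuming bare
// (precision, scale) pairs instead of arrow DataType values.
fn decimal_read_allowed(src: (u8, i8), dst: (u8, i8)) -> bool {
    let (_src_p, src_s) = src;
    let (_dst_p, dst_s) = dst;
    // Reject only when the read schema narrows the scale: the cast would
    // silently drop fractional digits (e.g. 123.45 -> 123). Precision-only
    // narrowing and scale widening remain allowed.
    dst_s >= src_s
}

fn main() {
    assert!(!decimal_read_allowed((10, 2), (5, 0))); // issue #4089 case: rejected
    assert!(decimal_read_allowed((10, 2), (10, 4))); // scale widening: lossless
    assert!(decimal_read_allowed((5, 2), (3, 2))); // precision-only: null on overflow
}
```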
@@ -1022,6 +1022,29 @@ abstract class ParquetReadSuite extends CometTestBase {
}
}

test("native_datafusion rejects incompatible decimal precision/scale") {
// Regression guard for https://github.com/apache/datafusion-comet/issues/4089.
// Reading Decimal(10,2) under a Decimal(5,0) read schema is unconditionally
// lossy: target precision is smaller than source precision and scales differ.
// Spark's vectorized reader throws SchemaColumnConvertNotSupportedException
// here on all versions. The native_datafusion scan must reject this in its
// schema adapter rather than letting Spark Cast silently rescale/truncate.
withSQLConf(
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
withTempPath { dir =>
val path = dir.getCanonicalPath
spark
.sql("select cast('123.45' as decimal(10,2)) as d " +
"union all select cast('67.89' as decimal(10,2))")
.write
.parquet(path)
val df = spark.read.schema("d decimal(5,0)").parquet(path)
assertThrows[SparkException](df.collect())
}
}
}
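Why the Decimal(10,2) → Decimal(5,0) read in this test is unconditionally lossy can be shown with the raw fixed-point representation. This is an illustrative sketch (a hypothetical `rescale` helper, not code from this PR): decimals store an integer scaled by a power of ten, so narrowing the scale integer-divides away the fraction.

```rust
// Hypothetical illustration of why scale narrowing loses data: rescaling the
// raw i128 representation from a larger scale to a smaller one truncates.
fn rescale(raw: i128, src_scale: u32, dst_scale: u32) -> i128 {
    if dst_scale >= src_scale {
        // Widening: multiply by a power of ten, lossless (barring overflow).
        raw * 10i128.pow(dst_scale - src_scale)
    } else {
        // Narrowing: integer division drops fractional digits.
        raw / 10i128.pow(src_scale - dst_scale)
    }
}

fn main() {
    // 123.45 stored as Decimal(10,2) has raw value 12345.
    assert_eq!(rescale(12345, 2, 0), 123); // .45 silently dropped
    assert_eq!(rescale(12345, 2, 4), 1_234_500); // widening is lossless
}
```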

test("type widening: byte → short/int/long, short → int/long, int → long") {
withSQLConf(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> "true") {
withTempPath { dir =>