fix: reject incompatible decimal precision/scale in native_datafusion scan #4090
Conversation
… scan

The native_datafusion Spark physical expression adapter previously fell through to a Spark Cast for decimal-to-decimal type changes, which silently rescales or truncates values that should have raised an error. Mirror Spark's TypeUtil.isDecimalTypeMatched (Spark 3.x rule) by rejecting reads where the target precision is smaller than the source precision or the scales differ. Closes apache#4089.
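As an illustration of the rule described in this commit, here is a minimal Rust sketch of a predicate in the spirit of Spark's TypeUtil.isDecimalTypeMatched. The function name and signature are hypothetical (not a port of Spark's Java code); it only encodes the rule as stated above: reject when the target precision is smaller than the source precision or the scales differ.

```rust
/// Illustrative stand-in for the rule mirrored from Spark's
/// TypeUtil.isDecimalTypeMatched (Spark 3.x): a requested decimal
/// read type matches the file type only if it does not narrow the
/// precision and leaves the scale unchanged.
fn is_decimal_type_matched(file_p: u8, file_s: i8, read_p: u8, read_s: i8) -> bool {
    read_p >= file_p && read_s == file_s
}

fn main() {
    // Same type, or precision widening at the same scale: matched.
    assert!(is_decimal_type_matched(10, 2, 10, 2));
    assert!(is_decimal_type_matched(10, 2, 12, 2));
    // Precision narrowing, or any scale change: rejected.
    assert!(!is_decimal_type_matched(10, 2, 5, 0));
    assert!(!is_decimal_type_matched(10, 2, 10, 4));
    println!("ok");
}
```

Note that this is the strict Spark-side rule; as discussed later in this PR, the guard actually added to the schema adapter is intentionally narrower and only rejects scale narrowing.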
In a case where we expect an exception to be generated anyway, can we catch this at CometScanRule rather than going all the way to serialization and native operators?
I think the issue is that we do not know the types of all the parquet files until runtime?
IIRC from looking at this a while back, Spark has read the physical schema already, but thrown it away by the time our Comet rules run, with no good way to get it again. I'm not opposed to handling it this way, just wanted to think through if we could catch it earlier. It's also a fairly uncommon scenario.
I do think this is an edge case that is fairly unlikely IRL because it only happens when the user provides a schema that is incompatible with the file schema |
Thanks @andygrove! I double-checked Spark's logic and the scale-narrowing guard looks right for preventing silent truncation. Spark's …
Per review feedback, document that scale widening (e.g. Decimal(10,2) read as Decimal(10,4)) is also rejected by Spark's vectorized reader via isDecimalTypeMatched but is allowed here because the cast is lossless. Reference Spark's ParquetVectorUpdaterFactory check directly in the comment so future readers don't need to consult the PR description.
Expanded the comment in |
…l-precision

# Conflicts:
#	native/core/src/parquet/schema_adapter.rs
#	spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
Merged. Thanks @mbutrovich |
…4090, apache#4091

Cases 4 (Decimal(10,2) -> Decimal(5,0)) and 6 (STRING -> INT) now throw SparkException on native_datafusion after the schema adapter rejection fixes landed on main. Update assertions and the behavior matrix.
Which issue does this PR close?
Closes #4089.
Rationale for this change
When the `native_datafusion` scan reads a Parquet column whose physical type is `Decimal(p1, s1)` under a requested read schema of `Decimal(p2, s2)` with `s2 < s1`, the existing schema adapter falls through to Spark's `Cast` expression. `Cast` happily truncates fractional digits, silently producing wrong values. Spark's vectorized reader rejects this with `SchemaColumnConvertNotSupportedException`, and `native_iceberg_compat` already does the same via `TypeUtil.checkParquetType`. The native scan should match.

What changes are included in this PR?
`native/core/src/parquet/schema_adapter.rs`: in `replace_with_spark_cast`, add a guard before the existing branches that returns `DataFusionError::Plan` when both `physical_type` and `target_type` are `Decimal128` and the target scale is smaller than the source scale.

The check is intentionally narrow: precision-narrowing reads at the same scale (e.g. `Decimal(5,2)` read as `Decimal(3,2)`) are still allowed and fall through to Spark's `Cast`, which produces null on per-value overflow. This matches Spark 4.0's parquet-mr fallback behavior exercised by `ParquetTypeWideningSuite`'s `parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr` test.

How are these changes tested?
Added a focused test to `ParquetReadSuite`: `native_datafusion rejects incompatible decimal precision/scale`. It writes `Decimal(10, 2)` data, reads it under `Decimal(5, 0)` (scale narrowed from 2 to 0), forces `spark.comet.scan.impl=native_datafusion` and `spark.sql.sources.useV1SourceList=parquet`, and asserts `collect()` raises `SparkException`. Verified against `ParquetReadV1Suite` (44 tests, all pass; 1 pre-existing test ignored).

The behavior is also covered by the per-impl matrix added in #4087 (`decimal(10,2) read as decimal(5,0): native_datafusion`), whose assertion will need flipping from "succeeds" to "throws" once that PR merges.
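The narrow guard described in this PR can be sketched in isolation. The following is a self-contained, illustrative Rust version: the `DataType` enum is a stand-in for Arrow's, and `check_decimal_rescale` is a hypothetical name (the real change lives inside `replace_with_spark_cast` and returns a `DataFusionError::Plan`), but the predicate matches the behavior the PR describes.

```rust
// Stand-in for Arrow's DataType, limited to what this sketch needs.
#[allow(dead_code)]
#[derive(Debug)]
enum DataType {
    Decimal128(u8, i8), // (precision, scale)
    Utf8,               // stands in for any non-decimal type
}

// Sketch of the guard: reject only scale narrowing between two
// Decimal128 types; every other combination falls through (to
// Spark's Cast in the real adapter).
fn check_decimal_rescale(physical: &DataType, target: &DataType) -> Result<(), String> {
    if let (DataType::Decimal128(src_p, src_s), DataType::Decimal128(tgt_p, tgt_s)) =
        (physical, target)
    {
        if tgt_s < src_s {
            return Err(format!(
                "cannot read Decimal({src_p}, {src_s}) as Decimal({tgt_p}, {tgt_s}): \
                 narrowing the scale would silently truncate fractional digits"
            ));
        }
    }
    Ok(())
}

fn main() {
    use DataType::Decimal128;
    // Scale narrowing (the #4089 case): rejected.
    assert!(check_decimal_rescale(&Decimal128(10, 2), &Decimal128(5, 0)).is_err());
    // Scale widening is lossless: allowed.
    assert!(check_decimal_rescale(&Decimal128(10, 2), &Decimal128(10, 4)).is_ok());
    // Precision narrowing at the same scale: allowed (Cast nulls on overflow).
    assert!(check_decimal_rescale(&Decimal128(5, 2), &Decimal128(3, 2)).is_ok());
    println!("ok");
}
```

The three assertions in `main` correspond to the cases discussed above: the rejected `Decimal(10,2)` -> `Decimal(5,0)` read, the lossless scale widening allowed per review feedback, and the `Decimal(5,2)` -> `Decimal(3,2)` precision narrowing that still falls through to `Cast`.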