Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
03c85f9
Refactor cast handling and add unit tests
kosiew Jan 23, 2026
e7b7d95
Add CastColumnExpr round-trip test for field changes
kosiew Jan 23, 2026
a1c57ba
proto: Add PhysicalCastOptions and CastColumnExpr serialization
kosiew Jan 23, 2026
745f15d
Add cast_options accessor to CastColumnExpr
kosiew Jan 23, 2026
18bb7d4
Implement fallible CastColumnExpr construction
kosiew Jan 23, 2026
6738e0b
Implement schema-aware CastColumnExpr constructor
kosiew Jan 23, 2026
1737a2e
physical-expr: Normalize CastColumnExpr with default format options
kosiew Jan 23, 2026
7212b8e
Update cast-column roundtrip test for missing options
kosiew Jan 24, 2026
ea9d623
Add build helper for CastColumnExpr setup
kosiew Jan 24, 2026
4c37aa7
Refactor casting options normalization
kosiew Jan 24, 2026
52ad400
Refactor physical expression adapter rewriter
kosiew Jan 24, 2026
8c063f3
Optimize format string handling with caching
kosiew Jan 24, 2026
d809484
Document CastColumnExpr behavior and usage constraints
kosiew Jan 24, 2026
f2c4d62
Refactor format string caching and testing
kosiew Jan 24, 2026
ca35d79
Validate CastColumnExpr columns against input schema
kosiew Jan 24, 2026
5ec4891
Update CastColumnExpr to handle out-of-bounds access
kosiew Jan 24, 2026
536e73a
Fix format-string cache leak and add size cap
kosiew Jan 24, 2026
4a62c9e
Fix format-string cache leak and add size cap
kosiew Jan 24, 2026
6a6cbdb
Optimize format string caching with bounded eviction
kosiew Jan 24, 2026
0426831
Reordered CastColumnExpr::build to validate Column index/field against
kosiew Jan 24, 2026
d4ea139
Tighten Field equality in CastColumnExpr::build
kosiew Jan 24, 2026
9139cce
Improve format string cache error handling and tests
kosiew Jan 24, 2026
d12a07c
cargo fmt
kosiew Jan 24, 2026
04e05b8
Refactor schema rewriter: remove unused Schema import and improve Col…
kosiew Jan 24, 2026
d5cb97b
Fix CastColumnExpr validation for schema adaptation scenarios
kosiew Jan 24, 2026
86509d2
Enhance format string cache tests: add unique string generation and c…
kosiew Jan 24, 2026
7a5298f
fix(collapsible-if): collapse nested if in substitute_oeq_class per c…
kosiew Jan 24, 2026
88624fa
refactor(schema_rewriter): simplify early return logic in rewrite_col…
kosiew Jan 27, 2026
6349f17
refactor(schema_rewriter): simplify column resolution logic and add e…
kosiew Jan 27, 2026
ff4a9f4
refactor(schema_rewriter): simplify expected cast expression construc…
kosiew Jan 27, 2026
1b71e09
refactor(cast_column): relax validation for nullability and metadata …
kosiew Jan 27, 2026
f629bc4
made CastColumnExpr consistent with nested_struct validation by reusi…
kosiew Jan 27, 2026
f0ade2a
refactor(cast_column): improve error messages and validation logic in…
kosiew Jan 27, 2026
882fa7a
doc: Implement string interning for ArrowFormatOptions to ensure life…
kosiew Jan 27, 2026
a054e3e
remove: Delete test_compile.sh script
kosiew Jan 27, 2026
e4e0452
Added shared helpers for field construction and cast expression round…
kosiew Jan 27, 2026
69cfe8c
fix: Update tests for nullability casts
kosiew Jan 27, 2026
ca994e8
rearrange helper functions
kosiew Jan 27, 2026
f37522c
fix: Update logical schema to allow nullable Int32 column
kosiew Jan 27, 2026
d8c5dea
fix: Mark safe and format_options as deprecated in PhysicalCastColumn…
kosiew Jan 27, 2026
b4acfaa
style: Reformat code for improved readability in prost.rs
kosiew Jan 27, 2026
b590ed8
feat: Introduce OwnedFormatOptions and OwnedCastOptions
kosiew Jan 28, 2026
c0fdcf5
feat: Add test script for verifying OwnedCastOptions compilation
kosiew Jan 28, 2026
a7f6015
Refactor cast options to use OwnedCastOptions and lifetime-safe borro…
kosiew Jan 28, 2026
1f6062e
chore: Remove test script for OwnedCastOptions compilation
kosiew Jan 28, 2026
2d114c6
docs: Update comments to clarify the resolution of lifetime mismatche…
kosiew Jan 28, 2026
d95fb9c
Merge branch 'main' into castintegration-17330
kosiew Jan 28, 2026
cf01fcc
clippy fix
kosiew Jan 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub async fn custom_file_casts() -> Result<()> {

// Create a logical / table schema with an Int32 column
let logical_schema =
Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, true)]));

// Create some data that can be cast (Int16 -> Int32 is widening) and some that cannot (Int64 -> Int32 is narrowing)
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
Expand Down
207 changes: 207 additions & 0 deletions datafusion/common/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,213 @@ use arrow::util::display::{DurationFormat, FormatOptions};
use crate::config::{ConfigField, Visit};
use crate::error::{DataFusionError, Result};

/// Owned version of Arrow's `FormatOptions` with `String` instead of `&'static str`.
///
/// Arrow's `FormatOptions<'a>` requires borrowed strings with lifetime bounds,
/// and often requires `&'static str` for storage in long-lived types like `CastExpr`.
/// This struct uses owned `String` values instead, allowing dynamic format options
/// to be created from user queries, Protobuf deserialization, or IPC without
/// memory leaks or string interning.
///
/// NOTE(review): Arrow's display-error flag (`FormatOptions::with_display_error`)
/// is intentionally NOT represented here — `as_arrow_options()` hardcodes it to
/// `false`, so round-tripping through this type loses that setting. Confirm this
/// is acceptable for all callers.
///
/// # Conversion to Arrow Types
///
/// Use the `as_arrow_options()` method to temporarily convert to `FormatOptions<'a>`
/// with borrowed `&str` references for passing to Arrow compute kernels:
///
/// ```ignore
/// let owned_options = OwnedFormatOptions { ... };
/// let arrow_options = owned_options.as_arrow_options(); // borrows owned strings
/// arrow::compute::cast(&array, &data_type, Some(&arrow_options))?;
/// ```
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct OwnedFormatOptions {
    /// String used to render null values in formatted output
    pub null: String,
    /// Format string applied to `Date32`/`Date64`-style values (None = Arrow default)
    pub date_format: Option<String>,
    /// Format string applied to datetime values (None = Arrow default)
    pub datetime_format: Option<String>,
    /// Format string applied to timestamps without a timezone (None = Arrow default)
    pub timestamp_format: Option<String>,
    /// Format string applied to timestamps with a timezone (None = Arrow default)
    pub timestamp_tz_format: Option<String>,
    /// Format string applied to time-of-day values (None = Arrow default)
    pub time_format: Option<String>,
    /// Duration rendering style (owned directly, since `DurationFormat` is a simple enum)
    pub duration_format: DurationFormat,
    /// Whether to include type information in formatted output
    pub types_info: bool,
}

impl OwnedFormatOptions {
    /// Create a new `OwnedFormatOptions` with default values.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the string used to render null values.
    pub fn with_null(mut self, null: String) -> Self {
        self.null = null;
        self
    }

    /// Set the date format string (`None` restores the Arrow default).
    pub fn with_date_format(mut self, date_format: Option<String>) -> Self {
        self.date_format = date_format;
        self
    }

    /// Set the datetime format string (`None` restores the Arrow default).
    pub fn with_datetime_format(mut self, datetime_format: Option<String>) -> Self {
        self.datetime_format = datetime_format;
        self
    }

    /// Set the timestamp format string (`None` restores the Arrow default).
    pub fn with_timestamp_format(mut self, timestamp_format: Option<String>) -> Self {
        self.timestamp_format = timestamp_format;
        self
    }

    /// Set the timestamp-with-timezone format string (`None` restores the Arrow default).
    pub fn with_timestamp_tz_format(
        mut self,
        timestamp_tz_format: Option<String>,
    ) -> Self {
        self.timestamp_tz_format = timestamp_tz_format;
        self
    }

    /// Set the time format string (`None` restores the Arrow default).
    pub fn with_time_format(mut self, time_format: Option<String>) -> Self {
        self.time_format = time_format;
        self
    }

    /// Set the duration rendering style.
    pub fn with_duration_format(mut self, duration_format: DurationFormat) -> Self {
        self.duration_format = duration_format;
        self
    }

    /// Set whether to include type information in formatted output.
    pub fn with_types_info(mut self, types_info: bool) -> Self {
        self.types_info = types_info;
        self
    }

    /// Convert to Arrow's `FormatOptions<'_>` with borrowed references.
    ///
    /// This creates a temporary `FormatOptions` with borrowed `&str` references
    /// to the owned strings. The returned options can be passed to Arrow compute
    /// kernels. The borrowed references are valid only as long as `self` is alive.
    ///
    /// Note: the display-error flag is always set to `false` here; the cast
    /// `safe` flag is carried separately by `OwnedCastOptions::safe`.
    // The lifetime is fully elidable (`&self -> FormatOptions<'_>`); the
    // previous explicit `<'a>` tripped clippy::needless_lifetimes.
    pub fn as_arrow_options(&self) -> FormatOptions<'_> {
        FormatOptions::new()
            .with_null(self.null.as_str())
            .with_date_format(self.date_format.as_deref())
            .with_datetime_format(self.datetime_format.as_deref())
            .with_timestamp_format(self.timestamp_format.as_deref())
            .with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
            .with_time_format(self.time_format.as_deref())
            .with_duration_format(self.duration_format)
            .with_display_error(false) // safe field is handled separately
            .with_types_info(self.types_info)
    }
}

impl Default for OwnedFormatOptions {
fn default() -> Self {
Self {
null: "NULL".to_string(),
date_format: None,
datetime_format: None,
timestamp_format: None,
timestamp_tz_format: None,
time_format: None,
duration_format: DurationFormat::Pretty,
types_info: false,
}
}
}

/// Owned version of Arrow's `CastOptions` with `OwnedFormatOptions` instead of `FormatOptions<'static>`.
///
/// Arrow's `CastOptions<'static>` requires `FormatOptions<'static>`, which mandates
/// `&'static str` references. This struct uses `OwnedFormatOptions` with `String` values,
/// allowing dynamic cast options to be created without memory leaks.
///
/// NOTE(review): the derived `Default` yields `safe = false` (bool's default).
/// Confirm this matches the intended default cast behavior — Arrow's own
/// `CastOptions::default()` may differ.
///
/// # Conversion to Arrow Types
///
/// Use the `as_arrow_options()` method to temporarily convert to `CastOptions<'a>`
/// with borrowed references for passing to Arrow compute kernels:
///
/// ```ignore
/// let owned_options = OwnedCastOptions { ... };
/// let arrow_options = owned_options.as_arrow_options(); // borrows owned strings
/// arrow::compute::cast(&array, &data_type, Some(&arrow_options))?;
/// ```
#[derive(Debug, Clone, Eq, PartialEq, Hash, Default)]
pub struct OwnedCastOptions {
    /// Whether to use safe casting (return errors instead of overflowing)
    pub safe: bool,
    /// Format options used when casting produces string output
    pub format_options: OwnedFormatOptions,
}

impl OwnedCastOptions {
    /// Create a new `OwnedCastOptions` with the given `safe` flag and
    /// default format options.
    pub fn new(safe: bool) -> Self {
        Self {
            safe,
            format_options: OwnedFormatOptions::default(),
        }
    }

    /// Create a new `OwnedCastOptions` from an Arrow `CastOptions`,
    /// copying every borrowed format string into an owned `String`.
    pub fn from_arrow_options(options: &CastOptions<'_>) -> Self {
        // Bind once instead of repeating `options.format_options.` on
        // every field — same accessors, easier to read.
        let fmt = &options.format_options;
        Self {
            safe: options.safe,
            format_options: OwnedFormatOptions {
                null: fmt.null().to_string(),
                date_format: fmt.date_format().map(ToString::to_string),
                datetime_format: fmt.datetime_format().map(ToString::to_string),
                timestamp_format: fmt.timestamp_format().map(ToString::to_string),
                timestamp_tz_format: fmt.timestamp_tz_format().map(ToString::to_string),
                time_format: fmt.time_format().map(ToString::to_string),
                duration_format: fmt.duration_format(),
                types_info: fmt.types_info(),
            },
        }
    }

    /// Convert to Arrow's `CastOptions<'_>` with borrowed references.
    ///
    /// This creates a temporary `CastOptions` with borrowed `&str` references
    /// to the owned strings. The returned options can be passed to Arrow compute
    /// kernels. The borrowed references are valid only as long as `self` is alive.
    // The lifetime is fully elidable (`&self -> CastOptions<'_>`); the previous
    // explicit `<'a>` tripped clippy::needless_lifetimes.
    pub fn as_arrow_options(&self) -> CastOptions<'_> {
        CastOptions {
            safe: self.safe,
            format_options: self.format_options.as_arrow_options(),
        }
    }
}

/// The default [`FormatOptions`] to use within DataFusion
/// Also see [`crate::config::FormatOptions`]
pub const DEFAULT_FORMAT_OPTIONS: FormatOptions<'static> =
Expand Down
1 change: 1 addition & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub use file_options::file_type::{
DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION, DEFAULT_CSV_EXTENSION,
DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION, GetExt,
};
pub use format::{OwnedCastOptions, OwnedFormatOptions};
pub use functional_dependencies::{
Constraint, Constraints, Dependency, FunctionalDependence, FunctionalDependencies,
aggregate_functional_dependencies, get_required_group_by_exprs_indices,
Expand Down
10 changes: 9 additions & 1 deletion datafusion/common/src/nested_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,15 @@ pub fn validate_struct_compatibility(
Ok(())
}

fn validate_field_compatibility(
/// Validate that a field can be cast from source to target type.
///
/// This function checks:
/// - Nullability compatibility: cannot cast nullable → non-nullable
/// - Data type castability using Arrow's can_cast_types
/// - Recursive validation for nested struct types
///
/// This validation is used for both top-level fields and nested struct fields.
pub fn validate_field_compatibility(
source_field: &Field,
target_field: &Field,
) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3678,7 +3678,7 @@ impl ScalarValue {
pub fn cast_to_with_options(
&self,
target_type: &DataType,
cast_options: &CastOptions<'static>,
cast_options: &CastOptions<'_>,
) -> Result<Self> {
let source_type = self.data_type();
if let Some(multiplier) = date_to_timestamp_multiplier(&source_type, target_type)
Expand Down
17 changes: 15 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ mod tests {
array: ArrayRef,
) -> RecordBatch {
let mut fields = SchemaBuilder::from(batch.schema().fields());
fields.push(Field::new(field_name, array.data_type().clone(), true));
let nullable = array.null_count() > 0;
fields.push(Field::new(field_name, array.data_type().clone(), nullable));
let schema = Arc::new(fields.finish());

let mut columns = batch.columns().to_vec();
Expand Down Expand Up @@ -1135,12 +1136,24 @@ mod tests {
let batch3 = create_batch(vec![("c1", c1.clone()), ("c2", c2.clone())]);

// batch4 (has c2, c1) -- different column order, should still prune
let batch4 = create_batch(vec![("c2", c2), ("c1", c1)]);
let batch4 = create_batch(vec![
// Ensure c1 appears in this batch to avoid non-nullable missing column errors
("c1", c1.clone()),
("c2", c2),
]);

let filter = col("c2").eq(lit(1_i64));

// Provide a nullable logical schema so missing columns across batches
// are filled with nulls rather than treated as non-nullable.
let table_schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
Field::new("c2", DataType::Int64, true),
]));

// read/write them files:
let rt = RoundTrip::new()
.with_table_schema(table_schema)
.with_predicate(filter)
.with_page_index_predicate()
.round_trip(vec![batch1, batch2, batch3, batch4])
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/tests/parquet/expr_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ async fn test_custom_schema_adapter_and_custom_expression_adapter() {
write_parquet(batch, store.clone(), path).await;

let table_schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int64, false),
Field::new("c1", DataType::Int64, true),
Field::new("c2", DataType::Utf8, true),
]));

Expand Down Expand Up @@ -234,9 +234,9 @@ async fn test_physical_expr_adapter_with_non_null_defaults() {

// Table schema has additional columns c2 (Utf8) and c3 (Int64) that don't exist in file
let table_schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int64, false), // type differs from file (Int32 vs Int64)
Field::new("c2", DataType::Utf8, true), // missing from file
Field::new("c3", DataType::Int64, true), // missing from file
Field::new("c1", DataType::Int64, true), // type differs from file (Int32 vs Int64)
Field::new("c2", DataType::Utf8, true), // missing from file
Field::new("c3", DataType::Int64, true), // missing from file
]));

let mut cfg = SessionConfig::new()
Expand Down Expand Up @@ -343,7 +343,7 @@ async fn test_physical_expr_adapter_factory_reuse_across_tables() {

// Table schema has additional columns that don't exist in files
let table_schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int64, false),
Field::new("c1", DataType::Int64, true),
Field::new("c2", DataType::Utf8, true), // missing from files
]));

Expand Down
2 changes: 1 addition & 1 deletion datafusion/datasource-parquet/src/row_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ mod test {
let table_schema = Schema::new(vec![Field::new(
"timestamp_col",
DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))),
false,
true,
)]);

// Test all should fail
Expand Down
11 changes: 6 additions & 5 deletions datafusion/expr-common/src/columnar_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,16 +288,17 @@ impl ColumnarValue {
pub fn cast_to(
&self,
cast_type: &DataType,
cast_options: Option<&CastOptions<'static>>,
cast_options: Option<&CastOptions<'_>>,
) -> Result<ColumnarValue> {
let cast_options = cast_options.cloned().unwrap_or(DEFAULT_CAST_OPTIONS);
// Use provided options when available; otherwise fallback to global default
let cast_options = cast_options.unwrap_or(&DEFAULT_CAST_OPTIONS);
match self {
ColumnarValue::Array(array) => {
let casted = cast_array_by_name(array, cast_type, &cast_options)?;
let casted = cast_array_by_name(array, cast_type, cast_options)?;
Ok(ColumnarValue::Array(casted))
}
ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(
scalar.cast_to_with_options(cast_type, &cast_options)?,
scalar.cast_to_with_options(cast_type, cast_options)?,
)),
}
}
Expand All @@ -306,7 +307,7 @@ impl ColumnarValue {
fn cast_array_by_name(
array: &ArrayRef,
cast_type: &DataType,
cast_options: &CastOptions<'static>,
cast_options: &CastOptions<'_>,
) -> Result<ArrayRef> {
// If types are already equal, no cast needed
if array.data_type() == cast_type {
Expand Down
Loading