From b6668e1baabeded627f41915f1dad96e409e8a29 Mon Sep 17 00:00:00 2001 From: Ethan Urbanski Date: Thu, 29 Jan 2026 20:15:03 -0500 Subject: [PATCH 1/5] test: add input_file_name sqllogictest --- .../test_files/input_file_name.slt | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 datafusion/sqllogictest/test_files/input_file_name.slt diff --git a/datafusion/sqllogictest/test_files/input_file_name.slt b/datafusion/sqllogictest/test_files/input_file_name.slt new file mode 100644 index 000000000000..8364ef001c95 --- /dev/null +++ b/datafusion/sqllogictest/test_files/input_file_name.slt @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +########## +## input_file_name() tests +########## + +statement ok +CREATE EXTERNAL TABLE t(c1 INT, c2 INT, c3 BOOLEAN) +STORED AS CSV +LOCATION '../core/tests/data/partitioned_csv' +OPTIONS ('format.has_header' 'false'); + +query III +SELECT + CASE + WHEN input_file_name() LIKE '%partition-0.csv' THEN 0 + WHEN input_file_name() LIKE '%partition-1.csv' THEN 1 + WHEN input_file_name() LIKE '%partition-2.csv' THEN 2 + WHEN input_file_name() LIKE '%partition-3.csv' THEN 3 + ELSE -1 + END AS file_id, + c1, + c2 +FROM t +ORDER BY c2, c1 +LIMIT 8; +---- +0 0 0 +1 1 0 +2 2 0 +3 3 0 +0 0 1 +1 1 1 +2 2 1 +3 3 1 + +statement error +SELECT input_file_name() FROM (VALUES (1)) v(x); + +statement ok +CREATE EXTERNAL TABLE t1(a INT) +STORED AS CSV +LOCATION '../core/tests/data/partitioned_csv' +OPTIONS ('format.has_header' 'false'); + +statement ok +CREATE EXTERNAL TABLE t2(a INT) +STORED AS CSV +LOCATION '../core/tests/data/partitioned_csv' +OPTIONS ('format.has_header' 'false'); + +statement error .*input_file_name\(\) cannot be used with joins.* +SELECT input_file_name() FROM t1 JOIN t2 ON t1.a = t2.a LIMIT 1; From c8e07022512262fbdf62259160f6e941b5b936b3 Mon Sep 17 00:00:00 2001 From: Ethan Urbanski Date: Thu, 29 Jan 2026 20:15:38 -0500 Subject: [PATCH 2/5] feat: add input_file_name() scalar udf stub --- datafusion/common/src/lib.rs | 1 + datafusion/common/src/metadata_columns.rs | 73 +++++++++++++++ .../functions/src/core/input_file_name.rs | 92 +++++++++++++++++++ datafusion/functions/src/core/mod.rs | 6 ++ 4 files changed, 172 insertions(+) create mode 100644 datafusion/common/src/metadata_columns.rs create mode 100644 datafusion/functions/src/core/input_file_name.rs diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index fdd04f752455..f894fe38d07b 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -48,6 +48,7 @@ pub mod format; pub mod hash_utils; pub mod instant; pub mod metadata; +pub mod metadata_columns; pub mod nested_struct; mod null_equality; pub mod parquet_config; diff --git a/datafusion/common/src/metadata_columns.rs 
b/datafusion/common/src/metadata_columns.rs new file mode 100644 index 000000000000..74cc36c5b058 --- /dev/null +++ b/datafusion/common/src/metadata_columns.rs @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow::datatypes::{DataType, Field}; + +use crate::{DFSchema, Result, TableReference}; + +/// Reserved metadata column names used internally by DataFusion. +pub const INPUT_FILE_NAME_COL: &str = "__datafusion_input_file_name"; + +pub fn append_input_file_name_field( + schema: &DFSchema, + qualifier: Option, +) -> Result { + if schema + .fields() + .iter() + .any(|field| field.name() == INPUT_FILE_NAME_COL) + { + return Ok(schema.clone()); + } + + let mut fields: Vec<(Option, Arc)> = schema + .iter() + .map(|(qualifier, field)| (qualifier.cloned(), Arc::clone(field))) + .collect(); + + fields.push(( + qualifier, + Arc::new(Field::new(INPUT_FILE_NAME_COL, DataType::Utf8, false)), + )); + + let mut new_schema = DFSchema::new_with_metadata(fields, schema.metadata().clone())?; + new_schema = new_schema + .with_functional_dependencies(schema.functional_dependencies().clone())?; + Ok(new_schema) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn append_input_file_name_field_appends_and_dedups() -> Result<()> { + let fields = vec![(None, Arc::new(Field::new("c1", DataType::Int32, false)))]; + let schema = DFSchema::new_with_metadata(fields, Default::default())?; + + let updated = append_input_file_name_field(&schema, None)?; + assert_eq!(updated.fields().len(), 2); + assert_eq!(updated.field(1).name(), INPUT_FILE_NAME_COL); + + let deduped = append_input_file_name_field(&updated, None)?; + assert_eq!(deduped.fields().len(), 2); + + Ok(()) + } +} diff --git a/datafusion/functions/src/core/input_file_name.rs b/datafusion/functions/src/core/input_file_name.rs new file mode 100644 index 000000000000..aded49b46704 --- /dev/null +++ b/datafusion/functions/src/core/input_file_name.rs @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`InputFileNameFunc`]: Implementation of the `input_file_name` function. + +use arrow::datatypes::DataType; +use datafusion_common::{Result, exec_err, utils::take_function_args}; +use datafusion_doc::Documentation; +use datafusion_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use datafusion_macros::user_doc; +use std::any::Any; + +#[user_doc( + doc_section(label = "Other Functions"), + description = r#"Returns the path of the input file that produced the current row. + +Note: file paths/URIs may be sensitive metadata depending on your environment. + +DataFrame example: +```rust +use datafusion::functions::expr_fn::input_file_name; + +// df: DataFrame +let df = df.select(vec![input_file_name()])?; +```"#, + syntax_example = "input_file_name()", + sql_example = r#"```sql +SELECT input_file_name() FROM t; +```"# +)] +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct InputFileNameFunc { + signature: Signature, +} + +impl Default for InputFileNameFunc { + fn default() -> Self { + Self::new() + } +} + +impl InputFileNameFunc { + pub fn new() -> Self { + Self { + signature: Signature::nullary(Volatility::Volatile), + } + } +} + +impl ScalarUDFImpl for InputFileNameFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "input_file_name" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, args: &[DataType]) -> Result { + let [] = take_function_args(self.name(), args)?; + Ok(DataType::Utf8) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let [] = take_function_args(self.name(), args.args)?; + exec_err!("input_file_name() must be planned as a file-scan metadata column") + } + + fn documentation(&self) -> Option<&Documentation> { + self.doc() + } +} diff --git a/datafusion/functions/src/core/mod.rs b/datafusion/functions/src/core/mod.rs index a14d56373724..6cf04995ea89 100644 --- a/datafusion/functions/src/core/mod.rs +++ b/datafusion/functions/src/core/mod.rs @@ -28,6 +28,7 @@ pub mod expr_ext; pub mod getfield; pub mod greatest; mod greatest_least_utils; +pub mod input_file_name; pub mod least; pub mod named_struct; pub mod nullif; @@ -57,6 +58,7 @@ make_udf_function!(union_extract::UnionExtractFun, union_extract); make_udf_function!(union_tag::UnionTagFunc, union_tag); make_udf_function!(version::VersionFunc, version); make_udf_function!(arrow_metadata::ArrowMetadataFunc, arrow_metadata); +make_udf_function!(input_file_name::InputFileNameFunc, input_file_name); pub mod expr_fn { use datafusion_expr::{Expr, Literal}; @@ -113,6 +115,9 @@ pub mod expr_fn { union_tag, "Returns the name of the currently selected field in the union", arg1 + ),( + input_file_name, + "Returns the path of the input file that produced the current row", )); #[doc = "Returns the value of the field with the given name from the struct"] @@ -160,6 +165,7 @@ pub fn functions() -> Vec> { union_extract(), union_tag(), version(), + input_file_name(), r#struct(), ] } From b7742ca40382386405933fdbf76614ddef3da459 Mon Sep 17 00:00:00 2001 From: Ethan Urbanski Date: Thu, 29 Jan 2026 20:15:48 -0500 Subject: [PATCH 3/5] feat: inject input_file_name via file scan extended columns --- .../datasource/src/extended_file_columns.rs | 114 +++++++++ datafusion/datasource/src/file_scan_config.rs | 221 ++++++++++++++++-- datafusion/datasource/src/mod.rs | 1 + .../src/equivalence/properties/mod.rs | 33 
+++ 4 files changed, 354 insertions(+), 15 deletions(-) create mode 100644 datafusion/datasource/src/extended_file_columns.rs diff --git a/datafusion/datasource/src/extended_file_columns.rs b/datafusion/datasource/src/extended_file_columns.rs new file mode 100644 index 000000000000..2e3df15b0e0c --- /dev/null +++ b/datafusion/datasource/src/extended_file_columns.rs @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow::array::{ArrayRef, StringArray}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use datafusion_common::Result; +use datafusion_common::metadata_columns::INPUT_FILE_NAME_COL; +use futures::{FutureExt, StreamExt}; + +use crate::PartitionedFile; +use crate::file_stream::{FileOpenFuture, FileOpener}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ExtendedFileColumn { + InputFileName, +} + +pub fn extended_file_column_fields(columns: &[ExtendedFileColumn]) -> Vec { + columns + .iter() + .map(|column| match column { + ExtendedFileColumn::InputFileName => { + Field::new(INPUT_FILE_NAME_COL, DataType::Utf8, false) + } + }) + .collect() +} + +pub fn append_extended_file_columns( + schema: &SchemaRef, + columns: &[ExtendedFileColumn], +) -> SchemaRef { + if columns.is_empty() { + return Arc::clone(schema); + } + + let mut fields: Vec = schema + .fields() + .iter() + .map(|field| field.as_ref().clone()) + .collect(); + fields.extend(extended_file_column_fields(columns)); + + Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone())) +} + +pub(crate) struct ExtendedFileColumnsOpener { + inner: Arc, + columns: Vec, + schema: SchemaRef, +} + +impl ExtendedFileColumnsOpener { + pub fn wrap( + inner: Arc, + columns: Vec, + schema: SchemaRef, + ) -> Arc { + Arc::new(Self { + inner, + columns, + schema, + }) + } +} + +impl FileOpener for ExtendedFileColumnsOpener { + fn open(&self, partitioned_file: PartitionedFile) -> Result { + let file_name = partitioned_file.object_meta.location.to_string(); + let columns = self.columns.clone(); + let schema = Arc::clone(&self.schema); + let inner = self.inner.open(partitioned_file)?; + + Ok(async move { + let stream = inner.await?; + let stream = stream.map(move |batch| { + let batch = batch?; + let num_rows = batch.num_rows(); + let mut arrays: Vec = batch.columns().to_vec(); + + for column in &columns { + let array: ArrayRef = match column { + ExtendedFileColumn::InputFileName => { + Arc::new(StringArray::from(vec![file_name.clone(); num_rows])) + } + }; + arrays.push(array); + } + + Ok(RecordBatch::try_new(Arc::clone(&schema), arrays)?) 
+ }); + + Ok(stream.boxed()) + } + .boxed()) + } +} diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 51b9ba9e06e9..f3edbf9569f6 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -18,6 +18,9 @@ //! [`FileScanConfig`] to configure scanning of possibly partitioned //! file sources. +use crate::extended_file_columns::{ + ExtendedFileColumn, ExtendedFileColumnsOpener, append_extended_file_columns, +}; use crate::file_groups::FileGroup; use crate::{ PartitionedFile, display::FileGroupsDisplay, file::FileSource, @@ -27,8 +30,10 @@ use crate::{ use arrow::datatypes::FieldRef; use arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion_common::config::ConfigOptions; +use datafusion_common::metadata_columns::INPUT_FILE_NAME_COL; use datafusion_common::{ - Constraints, Result, ScalarValue, Statistics, internal_datafusion_err, internal_err, + ColumnStatistics, Constraints, Result, ScalarValue, Statistics, + internal_datafusion_err, internal_err, }; use datafusion_execution::{ SendableRecordBatchStream, TaskContext, object_store::ObjectStoreUrl, @@ -169,6 +174,8 @@ pub struct FileScanConfig { /// Expression adapter used to adapt filters and projections that are pushed down into the scan /// from the logical schema to the physical schema of the file. pub expr_adapter_factory: Option>, + /// Additional per-file columns appended to the scan output. + pub extended_file_columns: Vec, /// Unprojected statistics for the table (file schema + partition columns). /// These are projected on-demand via `projected_stats()`. /// @@ -254,6 +261,7 @@ pub struct FileScanConfigBuilder { batch_size: Option, expr_adapter_factory: Option>, partitioned_by_file_group: bool, + extended_file_columns: Vec, } impl FileScanConfigBuilder { @@ -280,6 +288,7 @@ impl FileScanConfigBuilder { batch_size: None, expr_adapter_factory: None, partitioned_by_file_group: false, + extended_file_columns: vec![], } } @@ -475,6 +484,7 @@ impl FileScanConfigBuilder { batch_size, expr_adapter_factory: expr_adapter, partitioned_by_file_group, + extended_file_columns, } = self; let constraints = constraints.unwrap_or_default(); @@ -500,6 +510,7 @@ impl FileScanConfigBuilder { expr_adapter_factory: expr_adapter, statistics, partitioned_by_file_group, + extended_file_columns, } } } @@ -519,6 +530,7 @@ impl From for FileScanConfigBuilder { batch_size: config.batch_size, expr_adapter_factory: config.expr_adapter_factory, partitioned_by_file_group: config.partitioned_by_file_group, + extended_file_columns: config.extended_file_columns, } } } @@ -537,6 +549,15 @@ impl DataSource for FileScanConfig { let source = self.file_source.with_batch_size(batch_size); let opener = source.create_file_opener(object_store, self, partition)?; + let opener = if self.extended_file_columns.is_empty() { + opener + } else { + ExtendedFileColumnsOpener::wrap( + opener, + self.extended_file_columns.clone(), + self.projected_schema()?, + ) + }; let stream = FileStream::new(self, partition, opener, source.metrics())?; Ok(Box::pin(cooperative(stream))) @@ -721,6 +742,24 @@ impl DataSource for FileScanConfig { } } + if !self.extended_file_columns.is_empty() { + match self.projected_schema() { + Ok(schema) => match eq_properties.with_appended_schema(schema) { + Ok(updated) => eq_properties = updated, + Err(e) => { + warn!("Failed to append schema to equivalence properties: {e}"); + #[cfg(debug_assertions)] + panic!("Failed to append schema to 
equivalence properties: {e}"); + } + }, + Err(e) => { + warn!("Failed to compute projected schema: {e}"); + #[cfg(debug_assertions)] + panic!("Failed to compute projected schema: {e}"); + } + } + } + eq_properties } @@ -729,33 +768,34 @@ impl DataSource for FileScanConfig { } fn partition_statistics(&self, partition: Option) -> Result { - if let Some(partition) = partition { + let base_schema = self.projected_schema_without_extended()?; + let stats = if let Some(partition) = partition { // Get statistics for a specific partition // Note: FileGroup statistics include partition columns (computed from partition_values) if let Some(file_group) = self.file_groups.get(partition) && let Some(stat) = file_group.file_statistics(None) { // Project the statistics based on the projection - let output_schema = self.projected_schema()?; - return if let Some(projection) = self.file_source.projection() { - projection.project_statistics(stat.clone(), &output_schema) + if let Some(projection) = self.file_source.projection() { + projection.project_statistics(stat.clone(), &base_schema) } else { Ok(stat.clone()) - }; + } + } else { + // If no statistics available for this partition, return unknown + Ok(Statistics::new_unknown(base_schema.as_ref())) } - // If no statistics available for this partition, return unknown - Ok(Statistics::new_unknown(self.projected_schema()?.as_ref())) } else { // Return aggregate statistics across all partitions let statistics = self.statistics(); - let projection = self.file_source.projection(); - let output_schema = self.projected_schema()?; - if let Some(projection) = &projection { - projection.project_statistics(statistics.clone(), &output_schema) + if let Some(projection) = self.file_source.projection() { + projection.project_statistics(statistics.clone(), &base_schema) } else { Ok(statistics) } - } + }?; + + self.append_extended_column_stats(stats) } fn with_fetch(&self, limit: Option) -> Option> { @@ -777,6 +817,14 @@ impl DataSource for FileScanConfig { &self, projection: &ProjectionExprs, ) -> Result>> { + if projection + .as_ref() + .iter() + .any(|expr| expr_contains_input_file_name(&expr.expr)) + { + return Ok(None); + } + match self.file_source.try_pushdown_projection(projection)? 
{ Some(new_source) => { let mut new_file_scan_config = self.clone(); @@ -886,6 +934,20 @@ impl DataSource for FileScanConfig { } } +fn expr_contains_input_file_name(expr: &Arc) -> bool { + if expr + .as_any() + .downcast_ref::() + .is_some_and(|col| col.name() == INPUT_FILE_NAME_COL) + { + return true; + } + + expr.children() + .iter() + .any(|child| expr_contains_input_file_name(child)) +} + impl FileScanConfig { /// Get the file schema (schema of the files without partition columns) pub fn file_schema(&self) -> &SchemaRef { @@ -910,7 +972,7 @@ impl FileScanConfig { } } - pub fn projected_schema(&self) -> Result> { + fn projected_schema_without_extended(&self) -> Result> { let schema = self.file_source.table_schema().table_schema(); match self.file_source.projection() { Some(proj) => Ok(Arc::new(proj.project_schema(schema)?)), @@ -918,6 +980,28 @@ impl FileScanConfig { } } + pub fn projected_schema(&self) -> Result> { + let schema = self.projected_schema_without_extended()?; + Ok(append_extended_file_columns( + &schema, + &self.extended_file_columns, + )) + } + + fn append_extended_column_stats(&self, mut stats: Statistics) -> Result { + if self.extended_file_columns.is_empty() { + return Ok(stats); + } + + stats.column_statistics.extend( + std::iter::repeat_with(ColumnStatistics::new_unknown) + .take(self.extended_file_columns.len()), + ); + stats.calculate_total_byte_size(self.projected_schema()?.as_ref()); + + Ok(stats) + } + fn add_filter_equivalence_info( filter: &Arc, eq_properties: &mut EquivalenceProperties, @@ -1377,13 +1461,18 @@ mod tests { use super::*; use crate::TableSchema; + use crate::extended_file_columns::ExtendedFileColumn; + use crate::file_stream::{FileOpenFuture, FileOpener}; use crate::test_util::col; use crate::{ generate_test_files, test_util::MockSource, tests::aggr_test_schema, verify_sort_integrity, }; - use arrow::datatypes::Field; + use arrow::array::{Int32Array, StringArray}; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; + use datafusion_common::metadata_columns::INPUT_FILE_NAME_COL; use datafusion_common::stats::Precision; use datafusion_common::{ColumnStatistics, internal_err}; use datafusion_expr::{Operator, SortExpr}; @@ -1391,6 +1480,9 @@ mod tests { use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal}; use datafusion_physical_expr::projection::ProjectionExpr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; + use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; + use futures::{FutureExt, StreamExt}; + use object_store::ObjectStore; #[test] fn physical_plan_config_no_projection_tab_cols_as_field() { @@ -1421,6 +1513,105 @@ mod tests { ); } + #[derive(Clone)] + struct TestSource { + table_schema: TableSchema, + opener: Arc, + metrics: ExecutionPlanMetricsSet, + } + + impl TestSource { + fn new(table_schema: TableSchema, opener: Arc) -> Self { + Self { + table_schema, + opener, + metrics: ExecutionPlanMetricsSet::new(), + } + } + } + + impl FileSource for TestSource { + fn create_file_opener( + &self, + _object_store: Arc, + _base_config: &FileScanConfig, + _partition: usize, + ) -> Result> { + Ok(Arc::clone(&self.opener)) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn table_schema(&self) -> &TableSchema { + &self.table_schema + } + + fn with_batch_size(&self, _batch_size: usize) -> Arc { + Arc::new(self.clone()) + } + + fn metrics(&self) -> &ExecutionPlanMetricsSet { + &self.metrics + } + + fn file_type(&self) -> &str { + "test" + } + } + + struct 
TestOpener { + batch: RecordBatch, + } + + impl FileOpener for TestOpener { + fn open(&self, _partitioned_file: PartitionedFile) -> Result { + let batch = self.batch.clone(); + Ok(async move { + let stream = futures::stream::iter(vec![Ok(batch)]); + Ok(stream.boxed()) + } + .boxed()) + } + } + + #[tokio::test] + async fn extended_file_columns_inject_input_file_name() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)])); + let batch = RecordBatch::try_new( + Arc::clone(&file_schema), + vec![Arc::new(Int32Array::from(vec![1]))], + )?; + let opener: Arc = Arc::new(TestOpener { batch }); + + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let source: Arc = Arc::new(TestSource::new(table_schema, opener)); + + let mut config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source) + .with_file(PartitionedFile::new("data/path/file.csv", 0)) + .build(); + config.extended_file_columns = vec![ExtendedFileColumn::InputFileName]; + + let task_ctx = Arc::new(TaskContext::default()); + let mut stream = config.open(0, task_ctx)?; + let output_batch = stream.next().await.unwrap()?; + + assert_eq!(output_batch.schema().fields().len(), 2); + assert_eq!(output_batch.schema().field(1).name(), INPUT_FILE_NAME_COL); + + let file_name_array = output_batch + .column(1) + .as_any() + .downcast_ref::() + .expect("input file name column should be Utf8"); + assert!(file_name_array.value(0).ends_with("file.csv")); + + Ok(()) + } + #[test] fn test_split_groups_by_statistics() -> Result<()> { use chrono::TimeZone; diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index f80c9cb0b0da..d3927458c863 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -30,6 +30,7 @@ pub mod decoder; pub mod display; +pub mod extended_file_columns; pub mod file; pub mod file_compression_type; pub mod file_format; diff --git a/datafusion/physical-expr/src/equivalence/properties/mod.rs b/datafusion/physical-expr/src/equivalence/properties/mod.rs index 996bc4b08fcd..38a46699d034 100644 --- a/datafusion/physical-expr/src/equivalence/properties/mod.rs +++ b/datafusion/physical-expr/src/equivalence/properties/mod.rs @@ -239,6 +239,39 @@ impl EquivalenceProperties { &self.schema } + /// Updates the schema by appending new trailing fields. + /// + /// Validates that the existing schema is a prefix of the new schema. + pub fn with_appended_schema(mut self, new_schema: SchemaRef) -> Result { + let existing_fields = self.schema.fields(); + let new_fields = new_schema.fields(); + + if new_fields.len() < existing_fields.len() { + return plan_err!( + "appended schema has fewer fields ({} < {})", + new_fields.len(), + existing_fields.len() + ); + } + + for (idx, field) in existing_fields.iter().enumerate() { + let new_field = &new_fields[idx]; + if field.name() != new_field.name() + || field.data_type() != new_field.data_type() + || field.is_nullable() != new_field.is_nullable() + { + return plan_err!( + "appended schema mismatch at index {idx}: expected {} got {}", + field, + new_field + ); + } + } + + self.schema = new_schema; + Ok(self) + } + /// Returns a reference to the ordering equivalence class within. 
pub fn oeq_class(&self) -> &OrderingEquivalenceClass { &self.oeq_class From 8c049a2f706c944025d51cd0d681b7bcda06bcb6 Mon Sep 17 00:00:00 2001 From: Ethan Urbanski Date: Thu, 29 Jan 2026 20:15:57 -0500 Subject: [PATCH 4/5] feat: rewrite input_file_name() to file scan column --- datafusion/core/src/physical_planner.rs | 59 ++++- .../functions/src/core/input_file_name.rs | 9 +- datafusion/optimizer/src/analyzer/mod.rs | 3 + .../src/analyzer/resolve_input_file_name.rs | 218 ++++++++++++++++++ .../optimizer/src/optimize_projections/mod.rs | 73 +++++- 5 files changed, 351 insertions(+), 11 deletions(-) create mode 100644 datafusion/optimizer/src/analyzer/resolve_input_file_name.rs diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index b1aa850284ae..87fa550525d9 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -65,6 +65,7 @@ use datafusion_catalog::ScanArgs; use datafusion_common::Column; use datafusion_common::display::ToStringifiedPlan; use datafusion_common::format::ExplainAnalyzeLevel; +use datafusion_common::metadata_columns::INPUT_FILE_NAME_COL; use datafusion_common::tree_node::{ Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor, }; @@ -75,8 +76,11 @@ use datafusion_common::{ use datafusion_common::{ TableReference, assert_eq_or_internal_err, assert_or_internal_err, }; +use datafusion_datasource::extended_file_columns::ExtendedFileColumn; use datafusion_datasource::file_groups::FileGroup; +use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::memory::MemorySourceConfig; +use datafusion_datasource::source::DataSourceExec; use datafusion_expr::dml::{CopyTo, InsertOp}; use datafusion_expr::expr::{ AggregateFunction, AggregateFunctionParams, Alias, GroupingSet, NullTreatment, @@ -460,6 +464,7 @@ impl DefaultPhysicalPlanner { projection, filters, fetch, + projected_schema, .. 
}) => { let source = source_as_provider(source)?; @@ -473,7 +478,59 @@ impl DefaultPhysicalPlanner { .with_filters(Some(&filters_vec)) .with_limit(*fetch); let res = source.scan_with_args(session_state, opts).await?; - Arc::clone(res.plan()) + let plan = Arc::clone(res.plan()); + let projected_has_input_file_name = projected_schema + .fields() + .iter() + .any(|field| field.name() == INPUT_FILE_NAME_COL); + let source_has_input_file_name = source + .schema() + .fields() + .iter() + .any(|field| field.name() == INPUT_FILE_NAME_COL); + let needs_input_file_name = + projected_has_input_file_name && !source_has_input_file_name; + + if needs_input_file_name { + let data_source_exec = plan + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Plan( + "input_file_name() is only supported for file-backed table scans" + .to_string(), + ) + })?; + + let file_scan_config = data_source_exec + .data_source() + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Plan( + "input_file_name() is only supported for file-backed table scans" + .to_string(), + ) + })?; + + let mut new_config = file_scan_config.clone(); + if !new_config + .extended_file_columns + .contains(&ExtendedFileColumn::InputFileName) + { + new_config + .extended_file_columns + .push(ExtendedFileColumn::InputFileName); + } + + Arc::new( + data_source_exec + .clone() + .with_data_source(Arc::new(new_config)), + ) + } else { + plan + } } LogicalPlan::Values(Values { values, schema }) => { let exprs = values diff --git a/datafusion/functions/src/core/input_file_name.rs b/datafusion/functions/src/core/input_file_name.rs index aded49b46704..c9298ad6fc07 100644 --- a/datafusion/functions/src/core/input_file_name.rs +++ b/datafusion/functions/src/core/input_file_name.rs @@ -31,14 +31,7 @@ use std::any::Any; description = r#"Returns the path of the input file that produced the current row. Note: file paths/URIs may be sensitive metadata depending on your environment. 
- -DataFrame example: -```rust -use datafusion::functions::expr_fn::input_file_name; - -// df: DataFrame -let df = df.select(vec![input_file_name()])?; -```"#, +"#, syntax_example = "input_file_name()", sql_example = r#"```sql SELECT input_file_name() FROM t; diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index ddb3b828f01d..e28508a0977b 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -29,6 +29,7 @@ use datafusion_expr::expr_rewriter::FunctionRewrite; use datafusion_expr::{InvariantLevel, LogicalPlan}; use crate::analyzer::resolve_grouping_function::ResolveGroupingFunction; +use crate::analyzer::resolve_input_file_name::ResolveInputFileName; use crate::analyzer::type_coercion::TypeCoercion; use crate::utils::log_plan; @@ -36,6 +37,7 @@ use self::function_rewrite::ApplyFunctionRewrites; pub mod function_rewrite; pub mod resolve_grouping_function; +pub mod resolve_input_file_name; pub mod type_coercion; /// [`AnalyzerRule`]s transform [`LogicalPlan`]s in some way to make @@ -87,6 +89,7 @@ impl Analyzer { pub fn new() -> Self { let rules: Vec> = vec![ Arc::new(ResolveGroupingFunction::new()), + Arc::new(ResolveInputFileName::new()), Arc::new(TypeCoercion::new()), ]; Self::with_rules(rules) diff --git a/datafusion/optimizer/src/analyzer/resolve_input_file_name.rs b/datafusion/optimizer/src/analyzer/resolve_input_file_name.rs new file mode 100644 index 000000000000..031e31b821e7 --- /dev/null +++ b/datafusion/optimizer/src/analyzer/resolve_input_file_name.rs @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Rewrite `input_file_name()` to an internal scan-projected column. 
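+//!
+//! The rule walks each `Projection`, replaces `input_file_name()` calls with a
+//! column reference to the reserved `__datafusion_input_file_name` column, and
+//! annotates the underlying `TableScan`'s projected schema so the scan exposes
+//! that column. Uses outside the SELECT list, with `DISTINCT ON`, or across
+//! joins are rejected with a plan error.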
+ +use std::sync::Arc; + +use datafusion_common::config::ConfigOptions; +use datafusion_common::metadata_columns::{ + INPUT_FILE_NAME_COL, append_input_file_name_field, +}; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::{Column, Result, plan_err}; +use datafusion_expr::logical_plan::{ + Distinct, Filter, Limit, Sort, SubqueryAlias, TableScan, +}; +use datafusion_expr::{Expr, LogicalPlan, Projection}; + +use super::AnalyzerRule; + +#[derive(Debug, Default)] +pub struct ResolveInputFileName {} + +impl ResolveInputFileName { + pub fn new() -> Self { + Self {} + } +} + +impl AnalyzerRule for ResolveInputFileName { + fn name(&self) -> &str { + "resolve_input_file_name" + } + + fn analyze( + &self, + plan: LogicalPlan, + _options: &ConfigOptions, + ) -> Result { + plan.transform_up_with_subqueries(|plan| self.rewrite_plan(plan)) + .map(|res| res.data) + } +} + +impl ResolveInputFileName { + fn rewrite_plan(&self, plan: LogicalPlan) -> Result> { + match plan { + LogicalPlan::Projection(Projection { + expr, + input, + schema, + .. + }) => { + let mut contains_input_file_name = false; + let rewritten_exprs = expr + .into_iter() + .map(|expr| { + let (expr, rewritten) = self.rewrite_projection_expr(expr)?; + contains_input_file_name |= rewritten; + Ok(expr) + }) + .collect::>>()?; + + if !contains_input_file_name { + let projection = + Projection::try_new_with_schema(rewritten_exprs, input, schema)?; + return Ok(Transformed::no(LogicalPlan::Projection(projection))); + } + + let new_input = Arc::new(self.annotate_table_scan(input.as_ref())?); + let projection = Projection::try_new(rewritten_exprs, new_input)?; + + Ok(Transformed::yes(LogicalPlan::Projection(projection))) + } + _ => { + self.ensure_not_in_non_projection(&plan)?; + Ok(Transformed::no(plan)) + } + } + } + + fn ensure_not_in_non_projection(&self, plan: &LogicalPlan) -> Result<()> { + for expr in plan.expressions() { + if contains_input_file_name(&expr)? { + return plan_err!( + "input_file_name() is only supported in the SELECT list" + ); + } + } + Ok(()) + } + + fn rewrite_projection_expr(&self, expr: Expr) -> Result<(Expr, bool)> { + if is_input_file_name(&expr) { + let rewritten = Expr::Column(Column::from_name(INPUT_FILE_NAME_COL)) + .alias("input_file_name"); + return Ok((rewritten, true)); + } + + let mut found = false; + let transformed = expr.transform_up(|expr| { + if is_input_file_name(&expr) { + found = true; + Ok(Transformed::yes(Expr::Column(Column::from_name( + INPUT_FILE_NAME_COL, + )))) + } else { + Ok(Transformed::no(expr)) + } + })?; + + Ok((transformed.data, found)) + } + + fn annotate_table_scan(&self, plan: &LogicalPlan) -> Result { + match plan { + LogicalPlan::TableScan(scan) => self.rewrite_table_scan(scan), + LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => { + let new_input = Arc::new(self.annotate_table_scan(input.as_ref())?); + let alias = SubqueryAlias::try_new(new_input, alias.clone())?; + Ok(LogicalPlan::SubqueryAlias(alias)) + } + LogicalPlan::Filter(Filter { + predicate, input, .. 
+ }) => { + let new_input = Arc::new(self.annotate_table_scan(input.as_ref())?); + let filter = Filter::try_new(predicate.clone(), new_input)?; + Ok(LogicalPlan::Filter(filter)) + } + LogicalPlan::Sort(Sort { expr, input, fetch }) => { + let new_input = Arc::new(self.annotate_table_scan(input.as_ref())?); + Ok(LogicalPlan::Sort(Sort { + expr: expr.clone(), + input: new_input, + fetch: *fetch, + })) + } + LogicalPlan::Limit(Limit { skip, fetch, input }) => { + let new_input = Arc::new(self.annotate_table_scan(input.as_ref())?); + Ok(LogicalPlan::Limit(Limit { + skip: skip.clone(), + fetch: fetch.clone(), + input: new_input, + })) + } + LogicalPlan::Distinct(Distinct::All(input)) => { + let new_input = Arc::new(self.annotate_table_scan(input.as_ref())?); + Ok(LogicalPlan::Distinct(Distinct::All(new_input))) + } + LogicalPlan::Distinct(Distinct::On(_)) => { + plan_err!("input_file_name() is not supported with DISTINCT ON") + } + LogicalPlan::Join(_) => plan_err!( + "input_file_name() cannot be used with joins - the file source would be ambiguous. Use input_file_name() in a subquery on a single table instead." + ), + _ => plan_err!( + "input_file_name() is only supported for file-backed table scans" + ), + } + } + + fn rewrite_table_scan(&self, scan: &TableScan) -> Result { + if scan + .source + .schema() + .fields() + .iter() + .any(|field| field.name() == INPUT_FILE_NAME_COL) + { + return plan_err!( + "input_file_name() cannot be used because the table schema already contains '{INPUT_FILE_NAME_COL}'" + ); + } + + if scan + .projected_schema + .fields() + .iter() + .any(|field| field.name() == INPUT_FILE_NAME_COL) + { + return Ok(LogicalPlan::TableScan(scan.clone())); + } + + let new_schema = append_input_file_name_field( + scan.projected_schema.as_ref(), + Some(scan.table_name.clone()), + )?; + + let mut new_scan = scan.clone(); + new_scan.projected_schema = Arc::new(new_schema); + Ok(LogicalPlan::TableScan(new_scan)) + } +} + +fn is_input_file_name(expr: &Expr) -> bool { + matches!( + expr, + Expr::ScalarFunction(func) if func.name() == "input_file_name" && func.args.is_empty() + ) +} + +fn contains_input_file_name(expr: &Expr) -> Result { + expr.exists(|expr| Ok(is_input_file_name(expr))) +} diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index f97b05ea68fb..9c40cf3f9d1a 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -24,6 +24,9 @@ use crate::{OptimizerConfig, OptimizerRule}; use std::collections::HashSet; use std::sync::Arc; +use datafusion_common::metadata_columns::{ + INPUT_FILE_NAME_COL, append_input_file_name_field, +}; use datafusion_common::{ Column, DFSchema, HashMap, JoinType, Result, assert_eq_or_internal_err, get_required_group_by_exprs_indices, internal_datafusion_err, internal_err, @@ -259,18 +262,60 @@ fn optimize_projections( projection, filters, fetch, - projected_schema: _, + projected_schema, } = table_scan; + let source_has_input_file_name = source + .schema() + .fields() + .iter() + .any(|field| field.name() == INPUT_FILE_NAME_COL); + + let input_file_name_idx = if source_has_input_file_name { + None + } else { + projected_schema + .fields() + .iter() + .position(|field| field.name() == INPUT_FILE_NAME_COL) + }; + + let (needs_input_file_name, indices) = + if let Some(input_file_name_idx) = input_file_name_idx { + let needs_input_file_name = indices + .indices() + .binary_search(&input_file_name_idx) + .is_ok(); + let 
indices = RequiredIndices::new_from_indices( + indices + .into_inner() + .into_iter() + .filter(|idx| *idx != input_file_name_idx) + .collect(), + ); + (needs_input_file_name, indices) + } else { + (false, indices) + }; + // Get indices referred to in the original (schema with all fields) // given projected indices. let projection = match &projection { Some(projection) => indices.into_mapped_indices(|idx| projection[idx]), None => indices.into_inner(), }; - let new_scan = + let qualifier = table_name.clone(); + let mut new_scan = TableScan::try_new(table_name, source, Some(projection), filters, fetch)?; + if needs_input_file_name { + let new_schema = append_input_file_name_field( + new_scan.projected_schema.as_ref(), + Some(qualifier), + )?; + new_scan.projected_schema = Arc::new(new_schema); + } + return Ok(Transformed::yes(LogicalPlan::TableScan(new_scan))); } // Other node types are handled below @@ -946,6 +991,7 @@ mod tests { }; use crate::{OptimizerContext, OptimizerRule}; use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::metadata_columns::INPUT_FILE_NAME_COL; use datafusion_common::{ Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference, }; @@ -1183,6 +1229,29 @@ mod tests { ) } + #[test] + fn optimize_projections_keeps_reserved_column_from_source() -> Result<()> { + let schema = Schema::new(vec![ + Field::new(INPUT_FILE_NAME_COL, DataType::Utf8, false), + Field::new("a", DataType::UInt32, false), + ]); + + let plan = table_scan(Some("t"), &schema, None)?.build()?; + let optimized_plan = optimize(plan)?; + + match optimized_plan { + LogicalPlan::TableScan(scan) => { + let projection = scan.projection.expect("projection present"); + assert_eq!(projection, vec![0, 1]); + assert_eq!(scan.projected_schema.field(0).name(), INPUT_FILE_NAME_COL); + assert_eq!(scan.projected_schema.fields().len(), 2); + } + _ => panic!("expected optimized plan to be a TableScan"), + } + + Ok(()) + } + #[test] fn merge_three_projection() -> Result<()> { let table_scan = test_table_scan()?; From 83a13661cdedf4522b009bf1499367fe50737fc9 Mon Sep 17 00:00:00 2001 From: Ethan Urbanski Date: Fri, 30 Jan 2026 16:36:07 -0500 Subject: [PATCH 5/5] feat: rewrite input_file_name() at file opener boundary Signed-off-by: Ethan Urbanski --- datafusion/common/src/lib.rs | 1 - datafusion/common/src/metadata_columns.rs | 73 ------ datafusion/core/src/physical_planner.rs | 59 +---- datafusion/datasource-parquet/src/opener.rs | 115 ++++++++- .../datasource/src/extended_file_columns.rs | 114 --------- datafusion/datasource/src/file_scan_config.rs | 221 ++---------------- datafusion/datasource/src/mod.rs | 1 - datafusion/datasource/src/projection.rs | 143 +++++++++++- .../functions/src/core/input_file_name.rs | 9 +- datafusion/optimizer/src/analyzer/mod.rs | 3 - .../src/analyzer/resolve_input_file_name.rs | 218 ----------------- .../optimizer/src/optimize_projections/mod.rs | 73 +----- .../src/equivalence/properties/mod.rs | 33 --- .../test_files/input_file_name.slt | 19 +- .../source/user-guide/sql/scalar_functions.md | 22 ++ 15 files changed, 303 insertions(+), 801 deletions(-) delete mode 100644 datafusion/common/src/metadata_columns.rs delete mode 100644 datafusion/datasource/src/extended_file_columns.rs delete mode 100644 datafusion/optimizer/src/analyzer/resolve_input_file_name.rs diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index f894fe38d07b..fdd04f752455 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -48,7 +48,6 @@ pub mod 
format; pub mod hash_utils; pub mod instant; pub mod metadata; -pub mod metadata_columns; pub mod nested_struct; mod null_equality; pub mod parquet_config; diff --git a/datafusion/common/src/metadata_columns.rs b/datafusion/common/src/metadata_columns.rs deleted file mode 100644 index 74cc36c5b058..000000000000 --- a/datafusion/common/src/metadata_columns.rs +++ /dev/null @@ -1,73 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use arrow::datatypes::{DataType, Field}; - -use crate::{DFSchema, Result, TableReference}; - -/// Reserved metadata column names used internally by DataFusion. -pub const INPUT_FILE_NAME_COL: &str = "__datafusion_input_file_name"; - -pub fn append_input_file_name_field( - schema: &DFSchema, - qualifier: Option, -) -> Result { - if schema - .fields() - .iter() - .any(|field| field.name() == INPUT_FILE_NAME_COL) - { - return Ok(schema.clone()); - } - - let mut fields: Vec<(Option, Arc)> = schema - .iter() - .map(|(qualifier, field)| (qualifier.cloned(), Arc::clone(field))) - .collect(); - - fields.push(( - qualifier, - Arc::new(Field::new(INPUT_FILE_NAME_COL, DataType::Utf8, false)), - )); - - let mut new_schema = DFSchema::new_with_metadata(fields, schema.metadata().clone())?; - new_schema = new_schema - .with_functional_dependencies(schema.functional_dependencies().clone())?; - Ok(new_schema) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn append_input_file_name_field_appends_and_dedups() -> Result<()> { - let fields = vec![(None, Arc::new(Field::new("c1", DataType::Int32, false)))]; - let schema = DFSchema::new_with_metadata(fields, Default::default())?; - - let updated = append_input_file_name_field(&schema, None)?; - assert_eq!(updated.fields().len(), 2); - assert_eq!(updated.field(1).name(), INPUT_FILE_NAME_COL); - - let deduped = append_input_file_name_field(&updated, None)?; - assert_eq!(deduped.fields().len(), 2); - - Ok(()) - } -} diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 87fa550525d9..b1aa850284ae 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -65,7 +65,6 @@ use datafusion_catalog::ScanArgs; use datafusion_common::Column; use datafusion_common::display::ToStringifiedPlan; use datafusion_common::format::ExplainAnalyzeLevel; -use datafusion_common::metadata_columns::INPUT_FILE_NAME_COL; use datafusion_common::tree_node::{ Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor, }; @@ -76,11 +75,8 @@ use datafusion_common::{ use datafusion_common::{ TableReference, assert_eq_or_internal_err, assert_or_internal_err, }; -use datafusion_datasource::extended_file_columns::ExtendedFileColumn; use datafusion_datasource::file_groups::FileGroup; -use 
datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::memory::MemorySourceConfig; -use datafusion_datasource::source::DataSourceExec; use datafusion_expr::dml::{CopyTo, InsertOp}; use datafusion_expr::expr::{ AggregateFunction, AggregateFunctionParams, Alias, GroupingSet, NullTreatment, @@ -464,7 +460,6 @@ impl DefaultPhysicalPlanner { projection, filters, fetch, - projected_schema, .. }) => { let source = source_as_provider(source)?; @@ -478,59 +473,7 @@ impl DefaultPhysicalPlanner { .with_filters(Some(&filters_vec)) .with_limit(*fetch); let res = source.scan_with_args(session_state, opts).await?; - let plan = Arc::clone(res.plan()); - let projected_has_input_file_name = projected_schema - .fields() - .iter() - .any(|field| field.name() == INPUT_FILE_NAME_COL); - let source_has_input_file_name = source - .schema() - .fields() - .iter() - .any(|field| field.name() == INPUT_FILE_NAME_COL); - let needs_input_file_name = - projected_has_input_file_name && !source_has_input_file_name; - - if needs_input_file_name { - let data_source_exec = plan - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Plan( - "input_file_name() is only supported for file-backed table scans" - .to_string(), - ) - })?; - - let file_scan_config = data_source_exec - .data_source() - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Plan( - "input_file_name() is only supported for file-backed table scans" - .to_string(), - ) - })?; - - let mut new_config = file_scan_config.clone(); - if !new_config - .extended_file_columns - .contains(&ExtendedFileColumn::InputFileName) - { - new_config - .extended_file_columns - .push(ExtendedFileColumn::InputFileName); - } - - Arc::new( - data_source_exec - .clone() - .with_data_source(Arc::new(new_config)), - ) - } else { - plan - } + Arc::clone(res.plan()) } LogicalPlan::Values(Values { values, schema }) => { let exprs = values diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 6f92d567c830..e0baccce17f9 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -25,7 +25,10 @@ use crate::{ }; use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; +use datafusion_physical_expr::ScalarFunctionExpr; +use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr_adapter::replace_columns_with_literals; @@ -66,6 +69,29 @@ use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader, RowGroupMetaData}; +fn replace_input_file_name_in_projection( + projection: ProjectionExprs, + file_name: &str, +) -> Result { + let file_name_literal: Arc = + Arc::new(Literal::new(ScalarValue::Utf8(Some(file_name.to_owned())))); + + projection.try_map_exprs(|expr| { + Ok(expr + .transform(|expr| { + if let Some(func) = expr.as_any().downcast_ref::() + && func.fun().name() == "input_file_name" + && func.args().is_empty() + { + return Ok(Transformed::yes(Arc::clone(&file_name_literal))); + } + Ok(Transformed::no(expr)) + }) + .data() + .expect("infallible transform")) + }) +} + /// Implements 
[`FileOpener`] for a parquet file pub(super) struct ParquetOpener { /// Execution partition index @@ -259,6 +285,8 @@ impl FileOpener for ParquetOpener { .map(|p| replace_columns_with_literals(p, &literal_columns)) .transpose()?; } + // Replace any `input_file_name()` UDFs in the projection with a literal for this file. + projection = replace_input_file_name_in_projection(projection, &file_name)?; let reorder_predicates = self.reorder_filters; let pushdown_filters = self.pushdown_filters; @@ -1014,23 +1042,33 @@ fn should_enable_page_index( #[cfg(test)] mod test { + use std::any::Any; use std::sync::Arc; - use super::{ConstantColumns, constant_columns_from_stats}; + use super::{ + ConstantColumns, constant_columns_from_stats, + replace_input_file_name_in_projection, + }; use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; + use datafusion_common::config::ConfigOptions; use datafusion_common::{ - ColumnStatistics, DataFusionError, ScalarValue, Statistics, record_batch, + ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, record_batch, stats::Precision, }; use datafusion_datasource::{PartitionedFile, TableSchema, file_stream::FileOpener}; + use datafusion_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, + Volatility, + }; use datafusion_expr::{col, lit}; + use datafusion_physical_expr::ScalarFunctionExpr; use datafusion_physical_expr::{ PhysicalExpr, expressions::{Column, DynamicFilterPhysicalExpr, Literal}, planner::logical2physical, - projection::ProjectionExprs, + projection::{ProjectionExpr, ProjectionExprs}, }; use datafusion_physical_expr_adapter::{ DefaultPhysicalExprAdapterFactory, replace_columns_with_literals, @@ -1041,6 +1079,77 @@ mod test { use parquet::arrow::ArrowWriter; use parquet::file::properties::WriterProperties; + #[derive(Debug, PartialEq, Eq, Hash)] + struct TestInputFileNameUdf { + signature: Signature, + } + + impl TestInputFileNameUdf { + fn new() -> Self { + Self { + signature: Signature::nullary(Volatility::Volatile), + } + } + } + + impl ScalarUDFImpl for TestInputFileNameUdf { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "input_file_name" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _args: &[DataType]) -> Result { + Ok(DataType::Utf8) + } + + fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { + Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None))) + } + } + + #[test] + fn parquet_opener_replaces_input_file_name_udf_with_literal() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + + let udf = Arc::new(ScalarUDF::new_from_impl(TestInputFileNameUdf::new())); + let udf_expr = Arc::new(ScalarFunctionExpr::try_new( + udf, + vec![], + schema.as_ref(), + Arc::new(ConfigOptions::default()), + )?); + + let projection = ProjectionExprs::new(vec![ + ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"), + ProjectionExpr::new(udf_expr, "input_file_name"), + ]); + + let file_name = "s3://bucket/data/file.parquet"; + let rewritten = replace_input_file_name_in_projection(projection, file_name)?; + + assert_eq!(rewritten.as_ref().len(), 2); + assert_eq!(rewritten.as_ref()[1].alias, "input_file_name"); + + let expr = &rewritten.as_ref()[1].expr; + let literal = expr + .as_any() + .downcast_ref::() + .expect("expected input_file_name() to be rewritten to a literal"); + 
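+        // The rewrite should have replaced the nullary `input_file_name` UDF call
+        // with a Utf8 literal carrying the file path passed to the helper.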
assert_eq!( + literal.value(), + &ScalarValue::Utf8(Some(file_name.to_owned())) + ); + + Ok(()) + } + /// Builder for creating [`ParquetOpener`] instances with sensible defaults for tests. /// This helps reduce code duplication and makes it clear what differs between test cases. struct ParquetOpenerBuilder { diff --git a/datafusion/datasource/src/extended_file_columns.rs b/datafusion/datasource/src/extended_file_columns.rs deleted file mode 100644 index 2e3df15b0e0c..000000000000 --- a/datafusion/datasource/src/extended_file_columns.rs +++ /dev/null @@ -1,114 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use arrow::array::{ArrayRef, StringArray}; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use arrow::record_batch::RecordBatch; -use datafusion_common::Result; -use datafusion_common::metadata_columns::INPUT_FILE_NAME_COL; -use futures::{FutureExt, StreamExt}; - -use crate::PartitionedFile; -use crate::file_stream::{FileOpenFuture, FileOpener}; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum ExtendedFileColumn { - InputFileName, -} - -pub fn extended_file_column_fields(columns: &[ExtendedFileColumn]) -> Vec { - columns - .iter() - .map(|column| match column { - ExtendedFileColumn::InputFileName => { - Field::new(INPUT_FILE_NAME_COL, DataType::Utf8, false) - } - }) - .collect() -} - -pub fn append_extended_file_columns( - schema: &SchemaRef, - columns: &[ExtendedFileColumn], -) -> SchemaRef { - if columns.is_empty() { - return Arc::clone(schema); - } - - let mut fields: Vec = schema - .fields() - .iter() - .map(|field| field.as_ref().clone()) - .collect(); - fields.extend(extended_file_column_fields(columns)); - - Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone())) -} - -pub(crate) struct ExtendedFileColumnsOpener { - inner: Arc, - columns: Vec, - schema: SchemaRef, -} - -impl ExtendedFileColumnsOpener { - pub fn wrap( - inner: Arc, - columns: Vec, - schema: SchemaRef, - ) -> Arc { - Arc::new(Self { - inner, - columns, - schema, - }) - } -} - -impl FileOpener for ExtendedFileColumnsOpener { - fn open(&self, partitioned_file: PartitionedFile) -> Result { - let file_name = partitioned_file.object_meta.location.to_string(); - let columns = self.columns.clone(); - let schema = Arc::clone(&self.schema); - let inner = self.inner.open(partitioned_file)?; - - Ok(async move { - let stream = inner.await?; - let stream = stream.map(move |batch| { - let batch = batch?; - let num_rows = batch.num_rows(); - let mut arrays: Vec = batch.columns().to_vec(); - - for column in &columns { - let array: ArrayRef = match column { - ExtendedFileColumn::InputFileName => { - Arc::new(StringArray::from(vec![file_name.clone(); num_rows])) - } - }; - arrays.push(array); - } - - 
Ok(RecordBatch::try_new(Arc::clone(&schema), arrays)?) - }); - - Ok(stream.boxed()) - } - .boxed()) - } -} diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index f3edbf9569f6..51b9ba9e06e9 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -18,9 +18,6 @@ //! [`FileScanConfig`] to configure scanning of possibly partitioned //! file sources. -use crate::extended_file_columns::{ - ExtendedFileColumn, ExtendedFileColumnsOpener, append_extended_file_columns, -}; use crate::file_groups::FileGroup; use crate::{ PartitionedFile, display::FileGroupsDisplay, file::FileSource, @@ -30,10 +27,8 @@ use crate::{ use arrow::datatypes::FieldRef; use arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion_common::config::ConfigOptions; -use datafusion_common::metadata_columns::INPUT_FILE_NAME_COL; use datafusion_common::{ - ColumnStatistics, Constraints, Result, ScalarValue, Statistics, - internal_datafusion_err, internal_err, + Constraints, Result, ScalarValue, Statistics, internal_datafusion_err, internal_err, }; use datafusion_execution::{ SendableRecordBatchStream, TaskContext, object_store::ObjectStoreUrl, @@ -174,8 +169,6 @@ pub struct FileScanConfig { /// Expression adapter used to adapt filters and projections that are pushed down into the scan /// from the logical schema to the physical schema of the file. pub expr_adapter_factory: Option>, - /// Additional per-file columns appended to the scan output. - pub extended_file_columns: Vec, /// Unprojected statistics for the table (file schema + partition columns). /// These are projected on-demand via `projected_stats()`. /// @@ -261,7 +254,6 @@ pub struct FileScanConfigBuilder { batch_size: Option, expr_adapter_factory: Option>, partitioned_by_file_group: bool, - extended_file_columns: Vec, } impl FileScanConfigBuilder { @@ -288,7 +280,6 @@ impl FileScanConfigBuilder { batch_size: None, expr_adapter_factory: None, partitioned_by_file_group: false, - extended_file_columns: vec![], } } @@ -484,7 +475,6 @@ impl FileScanConfigBuilder { batch_size, expr_adapter_factory: expr_adapter, partitioned_by_file_group, - extended_file_columns, } = self; let constraints = constraints.unwrap_or_default(); @@ -510,7 +500,6 @@ impl FileScanConfigBuilder { expr_adapter_factory: expr_adapter, statistics, partitioned_by_file_group, - extended_file_columns, } } } @@ -530,7 +519,6 @@ impl From for FileScanConfigBuilder { batch_size: config.batch_size, expr_adapter_factory: config.expr_adapter_factory, partitioned_by_file_group: config.partitioned_by_file_group, - extended_file_columns: config.extended_file_columns, } } } @@ -549,15 +537,6 @@ impl DataSource for FileScanConfig { let source = self.file_source.with_batch_size(batch_size); let opener = source.create_file_opener(object_store, self, partition)?; - let opener = if self.extended_file_columns.is_empty() { - opener - } else { - ExtendedFileColumnsOpener::wrap( - opener, - self.extended_file_columns.clone(), - self.projected_schema()?, - ) - }; let stream = FileStream::new(self, partition, opener, source.metrics())?; Ok(Box::pin(cooperative(stream))) @@ -742,24 +721,6 @@ impl DataSource for FileScanConfig { } } - if !self.extended_file_columns.is_empty() { - match self.projected_schema() { - Ok(schema) => match eq_properties.with_appended_schema(schema) { - Ok(updated) => eq_properties = updated, - Err(e) => { - warn!("Failed to append schema to equivalence properties: {e}"); - 
#[cfg(debug_assertions)] - panic!("Failed to append schema to equivalence properties: {e}"); - } - }, - Err(e) => { - warn!("Failed to compute projected schema: {e}"); - #[cfg(debug_assertions)] - panic!("Failed to compute projected schema: {e}"); - } - } - } - eq_properties } @@ -768,34 +729,33 @@ impl DataSource for FileScanConfig { } fn partition_statistics(&self, partition: Option) -> Result { - let base_schema = self.projected_schema_without_extended()?; - let stats = if let Some(partition) = partition { + if let Some(partition) = partition { // Get statistics for a specific partition // Note: FileGroup statistics include partition columns (computed from partition_values) if let Some(file_group) = self.file_groups.get(partition) && let Some(stat) = file_group.file_statistics(None) { // Project the statistics based on the projection - if let Some(projection) = self.file_source.projection() { - projection.project_statistics(stat.clone(), &base_schema) + let output_schema = self.projected_schema()?; + return if let Some(projection) = self.file_source.projection() { + projection.project_statistics(stat.clone(), &output_schema) } else { Ok(stat.clone()) - } - } else { - // If no statistics available for this partition, return unknown - Ok(Statistics::new_unknown(base_schema.as_ref())) + }; } + // If no statistics available for this partition, return unknown + Ok(Statistics::new_unknown(self.projected_schema()?.as_ref())) } else { // Return aggregate statistics across all partitions let statistics = self.statistics(); - if let Some(projection) = self.file_source.projection() { - projection.project_statistics(statistics.clone(), &base_schema) + let projection = self.file_source.projection(); + let output_schema = self.projected_schema()?; + if let Some(projection) = &projection { + projection.project_statistics(statistics.clone(), &output_schema) } else { Ok(statistics) } - }?; - - self.append_extended_column_stats(stats) + } } fn with_fetch(&self, limit: Option) -> Option> { @@ -817,14 +777,6 @@ impl DataSource for FileScanConfig { &self, projection: &ProjectionExprs, ) -> Result>> { - if projection - .as_ref() - .iter() - .any(|expr| expr_contains_input_file_name(&expr.expr)) - { - return Ok(None); - } - match self.file_source.try_pushdown_projection(projection)? 
{ Some(new_source) => { let mut new_file_scan_config = self.clone(); @@ -934,20 +886,6 @@ impl DataSource for FileScanConfig { } } -fn expr_contains_input_file_name(expr: &Arc) -> bool { - if expr - .as_any() - .downcast_ref::() - .is_some_and(|col| col.name() == INPUT_FILE_NAME_COL) - { - return true; - } - - expr.children() - .iter() - .any(|child| expr_contains_input_file_name(child)) -} - impl FileScanConfig { /// Get the file schema (schema of the files without partition columns) pub fn file_schema(&self) -> &SchemaRef { @@ -972,7 +910,7 @@ impl FileScanConfig { } } - fn projected_schema_without_extended(&self) -> Result> { + pub fn projected_schema(&self) -> Result> { let schema = self.file_source.table_schema().table_schema(); match self.file_source.projection() { Some(proj) => Ok(Arc::new(proj.project_schema(schema)?)), @@ -980,28 +918,6 @@ impl FileScanConfig { } } - pub fn projected_schema(&self) -> Result> { - let schema = self.projected_schema_without_extended()?; - Ok(append_extended_file_columns( - &schema, - &self.extended_file_columns, - )) - } - - fn append_extended_column_stats(&self, mut stats: Statistics) -> Result { - if self.extended_file_columns.is_empty() { - return Ok(stats); - } - - stats.column_statistics.extend( - std::iter::repeat_with(ColumnStatistics::new_unknown) - .take(self.extended_file_columns.len()), - ); - stats.calculate_total_byte_size(self.projected_schema()?.as_ref()); - - Ok(stats) - } - fn add_filter_equivalence_info( filter: &Arc, eq_properties: &mut EquivalenceProperties, @@ -1461,18 +1377,13 @@ mod tests { use super::*; use crate::TableSchema; - use crate::extended_file_columns::ExtendedFileColumn; - use crate::file_stream::{FileOpenFuture, FileOpener}; use crate::test_util::col; use crate::{ generate_test_files, test_util::MockSource, tests::aggr_test_schema, verify_sort_integrity, }; - use arrow::array::{Int32Array, StringArray}; - use arrow::datatypes::{DataType, Field, Schema}; - use arrow::record_batch::RecordBatch; - use datafusion_common::metadata_columns::INPUT_FILE_NAME_COL; + use arrow::datatypes::Field; use datafusion_common::stats::Precision; use datafusion_common::{ColumnStatistics, internal_err}; use datafusion_expr::{Operator, SortExpr}; @@ -1480,9 +1391,6 @@ mod tests { use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal}; use datafusion_physical_expr::projection::ProjectionExpr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; - use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; - use futures::{FutureExt, StreamExt}; - use object_store::ObjectStore; #[test] fn physical_plan_config_no_projection_tab_cols_as_field() { @@ -1513,105 +1421,6 @@ mod tests { ); } - #[derive(Clone)] - struct TestSource { - table_schema: TableSchema, - opener: Arc, - metrics: ExecutionPlanMetricsSet, - } - - impl TestSource { - fn new(table_schema: TableSchema, opener: Arc) -> Self { - Self { - table_schema, - opener, - metrics: ExecutionPlanMetricsSet::new(), - } - } - } - - impl FileSource for TestSource { - fn create_file_opener( - &self, - _object_store: Arc, - _base_config: &FileScanConfig, - _partition: usize, - ) -> Result> { - Ok(Arc::clone(&self.opener)) - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn table_schema(&self) -> &TableSchema { - &self.table_schema - } - - fn with_batch_size(&self, _batch_size: usize) -> Arc { - Arc::new(self.clone()) - } - - fn metrics(&self) -> &ExecutionPlanMetricsSet { - &self.metrics - } - - fn file_type(&self) -> &str { - "test" - } - } - - struct 
TestOpener { - batch: RecordBatch, - } - - impl FileOpener for TestOpener { - fn open(&self, _partitioned_file: PartitionedFile) -> Result { - let batch = self.batch.clone(); - Ok(async move { - let stream = futures::stream::iter(vec![Ok(batch)]); - Ok(stream.boxed()) - } - .boxed()) - } - } - - #[tokio::test] - async fn extended_file_columns_inject_input_file_name() -> Result<()> { - let file_schema = - Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)])); - let batch = RecordBatch::try_new( - Arc::clone(&file_schema), - vec![Arc::new(Int32Array::from(vec![1]))], - )?; - let opener: Arc = Arc::new(TestOpener { batch }); - - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); - let source: Arc = Arc::new(TestSource::new(table_schema, opener)); - - let mut config = - FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source) - .with_file(PartitionedFile::new("data/path/file.csv", 0)) - .build(); - config.extended_file_columns = vec![ExtendedFileColumn::InputFileName]; - - let task_ctx = Arc::new(TaskContext::default()); - let mut stream = config.open(0, task_ctx)?; - let output_batch = stream.next().await.unwrap()?; - - assert_eq!(output_batch.schema().fields().len(), 2); - assert_eq!(output_batch.schema().field(1).name(), INPUT_FILE_NAME_COL); - - let file_name_array = output_batch - .column(1) - .as_any() - .downcast_ref::() - .expect("input file name column should be Utf8"); - assert!(file_name_array.value(0).ends_with("file.csv")); - - Ok(()) - } - #[test] fn test_split_groups_by_statistics() -> Result<()> { use chrono::TimeZone; diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index d3927458c863..f80c9cb0b0da 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -30,7 +30,6 @@ pub mod decoder; pub mod display; -pub mod extended_file_columns; pub mod file; pub mod file_compression_type; pub mod file_format; diff --git a/datafusion/datasource/src/projection.rs b/datafusion/datasource/src/projection.rs index 9a0cb494e495..f8f42e1b087c 100644 --- a/datafusion/datasource/src/projection.rs +++ b/datafusion/datasource/src/projection.rs @@ -23,6 +23,7 @@ use datafusion_common::{ tree_node::{Transformed, TransformedResult, TreeNode}, }; use datafusion_physical_expr::{ + PhysicalExpr, ScalarFunctionExpr, expressions::{Column, Literal}, projection::{ProjectionExpr, ProjectionExprs}, }; @@ -69,6 +70,7 @@ impl ProjectionOpener { impl FileOpener for ProjectionOpener { fn open(&self, partitioned_file: PartitionedFile) -> Result { let partition_values = partitioned_file.partition_values.clone(); + let file_name = partitioned_file.object_meta.location.to_string(); // Modify any references to partition columns in the projection expressions // and substitute them with literal values from PartitionedFile.partition_values let projection = if self.partition_columns.is_empty() { @@ -80,6 +82,8 @@ impl FileOpener for ProjectionOpener { partition_values, ) }; + // Replace `input_file_name()` with a per-file literal if present. 
+ let projection = inject_input_file_name_into_projection(&projection, file_name); let projector = projection.make_projector(&self.input_schema)?; let inner = self.inner.open(partitioned_file)?; @@ -143,6 +147,35 @@ fn inject_partition_columns_into_projection( ProjectionExprs::new(projections) } +fn inject_input_file_name_into_projection( + projection: &ProjectionExprs, + file_name: String, +) -> ProjectionExprs { + let file_name_literal: Arc = + Arc::new(Literal::new(ScalarValue::Utf8(Some(file_name)))); + + let projections = projection + .iter() + .map(|projection| { + let expr = Arc::clone(&projection.expr) + .transform(|expr| { + if let Some(func) = expr.as_any().downcast_ref::() + && func.fun().name() == "input_file_name" + && func.args().is_empty() + { + return Ok(Transformed::yes(Arc::clone(&file_name_literal))); + } + Ok(Transformed::no(expr)) + }) + .data() + .expect("infallible transform"); + ProjectionExpr::new(expr, projection.alias.clone()) + }) + .collect_vec(); + + ProjectionExprs::new(projections) +} + /// At a high level the goal of SplitProjection is to take a ProjectionExprs meant to be applied to the table schema /// and split that into: /// - The projection indices into the file schema (file_indices) @@ -238,7 +271,7 @@ impl SplitProjection { }; // Pre-create the remapped column so all references can share the same Arc - let new_column: Arc = + let new_column: Arc = Arc::new(Column::new(&name, new_index)); column_mapping.insert(original_index, new_column); } @@ -285,17 +318,64 @@ impl SplitProjection { #[cfg(test)] mod test { + use std::any::Any; use std::sync::Arc; use arrow::array::AsArray; - use arrow::datatypes::{DataType, SchemaRef}; + use arrow::datatypes::{DataType, Field, SchemaRef}; + use arrow::record_batch::RecordBatch; + use datafusion_common::config::ConfigOptions; use datafusion_common::{DFSchema, ScalarValue, record_batch}; + use datafusion_common::{Result, exec_err}; + use datafusion_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, + Volatility, + }; use datafusion_expr::{Expr, col, execution_props::ExecutionProps}; + use datafusion_physical_expr::ScalarFunctionExpr; use datafusion_physical_expr::{create_physical_exprs, projection::ProjectionExpr}; + use futures::StreamExt; use itertools::Itertools; use super::*; + #[derive(Debug, PartialEq, Eq, Hash)] + struct TestInputFileNameUdf { + signature: Signature, + } + + impl TestInputFileNameUdf { + fn new() -> Self { + Self { + signature: Signature::nullary(Volatility::Volatile), + } + } + } + + impl ScalarUDFImpl for TestInputFileNameUdf { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "input_file_name" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _args: &[DataType]) -> Result { + Ok(DataType::Utf8) + } + + fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { + exec_err!( + "input_file_name() should be replaced with a literal before execution" + ) + } + } + fn create_projection_exprs<'a>( exprs: impl IntoIterator, schema: &SchemaRef, @@ -311,6 +391,65 @@ mod test { ProjectionExprs::from(projection_exprs) } + #[tokio::test] + async fn projection_opener_replaces_input_file_name_udf_with_literal() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)])); + + let batch = RecordBatch::try_new( + Arc::clone(&file_schema), + vec![Arc::new(arrow::array::Int32Array::from(vec![1]))], + )?; + + struct TestOpener { + batch: RecordBatch, + } + + impl 
FileOpener for TestOpener {
+            fn open(&self, _partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
+                let batch = self.batch.clone();
+                Ok(async move {
+                    let stream = futures::stream::iter(vec![Ok(batch)]);
+                    Ok(stream.boxed())
+                }
+                .boxed())
+            }
+        }
+
+        let udf = Arc::new(ScalarUDF::new_from_impl(TestInputFileNameUdf::new()));
+        let udf_expr = Arc::new(ScalarFunctionExpr::try_new(
+            udf,
+            vec![],
+            file_schema.as_ref(),
+            Arc::new(ConfigOptions::default()),
+        )?);
+
+        let projection = ProjectionExprs::new(vec![
+            ProjectionExpr::new(Arc::new(Column::new("c1", 0)), "c1"),
+            ProjectionExpr::new(udf_expr, "input_file_name"),
+        ]);
+
+        let split = SplitProjection::new(file_schema.as_ref(), &projection);
+        let inner: Arc<dyn FileOpener> = Arc::new(TestOpener { batch });
+        let opener = ProjectionOpener::try_new(split, inner, file_schema.as_ref())?;
+
+        let mut stream = opener
+            .open(PartitionedFile::new("data/path/file.csv", 0))?
+            .await?;
+        let output_batch = stream.next().await.unwrap()?;
+
+        assert_eq!(output_batch.schema().field(1).name(), "input_file_name");
+        assert!(
+            output_batch
+                .column(1)
+                .as_string::<i32>()
+                .value(0)
+                .ends_with("file.csv")
+        );
+
+        Ok(())
+    }
+
     #[test]
     fn test_split_projection_with_partition_columns() {
         use arrow::array::AsArray;
diff --git a/datafusion/functions/src/core/input_file_name.rs b/datafusion/functions/src/core/input_file_name.rs
index c9298ad6fc07..d1160a212de1 100644
--- a/datafusion/functions/src/core/input_file_name.rs
+++ b/datafusion/functions/src/core/input_file_name.rs
@@ -18,7 +18,7 @@
 //! [`InputFileNameFunc`]: Implementation of the `input_file_name` function.
 
 use arrow::datatypes::DataType;
-use datafusion_common::{Result, exec_err, utils::take_function_args};
+use datafusion_common::{Result, ScalarValue, utils::take_function_args};
 use datafusion_doc::Documentation;
 use datafusion_expr::{
     ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
@@ -31,6 +31,11 @@ use std::any::Any;
     description = r#"Returns the path of the input file that produced the current row.
 
 Note: file paths/URIs may be sensitive metadata depending on your environment.
+
+This function is intended to be rewritten at file-scan time (when the file is
+known). If the input file is not known (for example, if this function is
+evaluated outside a file scan, or was not pushed down into one), this function
+returns NULL.
 "#,
     syntax_example = "input_file_name()",
     sql_example = r#"```sql
@@ -76,7 +81,7 @@ impl ScalarUDFImpl for InputFileNameFunc {
     fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
         let [] = take_function_args(self.name(), args.args)?;
 
-        exec_err!("input_file_name() must be planned as a file-scan metadata column")
+        Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))
     }
 
     fn documentation(&self) -> Option<&Documentation> {
diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs
index e28508a0977b..ddb3b828f01d 100644
--- a/datafusion/optimizer/src/analyzer/mod.rs
+++ b/datafusion/optimizer/src/analyzer/mod.rs
@@ -29,7 +28,6 @@ use datafusion_expr::expr_rewriter::FunctionRewrite;
 use datafusion_expr::{InvariantLevel, LogicalPlan};
 
 use crate::analyzer::resolve_grouping_function::ResolveGroupingFunction;
-use crate::analyzer::resolve_input_file_name::ResolveInputFileName;
 use crate::analyzer::type_coercion::TypeCoercion;
 use crate::utils::log_plan;
 
@@ -37,7 +36,6 @@ use self::function_rewrite::ApplyFunctionRewrites;
 
 pub mod function_rewrite;
 pub mod resolve_grouping_function;
-pub mod resolve_input_file_name;
 pub mod type_coercion;
 
 /// [`AnalyzerRule`]s transform [`LogicalPlan`]s in some way to make
@@ -89,7 +87,6 @@ impl Analyzer {
     pub fn new() -> Self {
         let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> = vec![
             Arc::new(ResolveGroupingFunction::new()),
-            Arc::new(ResolveInputFileName::new()),
             Arc::new(TypeCoercion::new()),
         ];
         Self::with_rules(rules)
diff --git a/datafusion/optimizer/src/analyzer/resolve_input_file_name.rs b/datafusion/optimizer/src/analyzer/resolve_input_file_name.rs
deleted file mode 100644
index 031e31b821e7..000000000000
--- a/datafusion/optimizer/src/analyzer/resolve_input_file_name.rs
+++ /dev/null
@@ -1,218 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Rewrite `input_file_name()` to an internal scan-projected column.
- -use std::sync::Arc; - -use datafusion_common::config::ConfigOptions; -use datafusion_common::metadata_columns::{ - INPUT_FILE_NAME_COL, append_input_file_name_field, -}; -use datafusion_common::tree_node::{Transformed, TreeNode}; -use datafusion_common::{Column, Result, plan_err}; -use datafusion_expr::logical_plan::{ - Distinct, Filter, Limit, Sort, SubqueryAlias, TableScan, -}; -use datafusion_expr::{Expr, LogicalPlan, Projection}; - -use super::AnalyzerRule; - -#[derive(Debug, Default)] -pub struct ResolveInputFileName {} - -impl ResolveInputFileName { - pub fn new() -> Self { - Self {} - } -} - -impl AnalyzerRule for ResolveInputFileName { - fn name(&self) -> &str { - "resolve_input_file_name" - } - - fn analyze( - &self, - plan: LogicalPlan, - _options: &ConfigOptions, - ) -> Result { - plan.transform_up_with_subqueries(|plan| self.rewrite_plan(plan)) - .map(|res| res.data) - } -} - -impl ResolveInputFileName { - fn rewrite_plan(&self, plan: LogicalPlan) -> Result> { - match plan { - LogicalPlan::Projection(Projection { - expr, - input, - schema, - .. - }) => { - let mut contains_input_file_name = false; - let rewritten_exprs = expr - .into_iter() - .map(|expr| { - let (expr, rewritten) = self.rewrite_projection_expr(expr)?; - contains_input_file_name |= rewritten; - Ok(expr) - }) - .collect::>>()?; - - if !contains_input_file_name { - let projection = - Projection::try_new_with_schema(rewritten_exprs, input, schema)?; - return Ok(Transformed::no(LogicalPlan::Projection(projection))); - } - - let new_input = Arc::new(self.annotate_table_scan(input.as_ref())?); - let projection = Projection::try_new(rewritten_exprs, new_input)?; - - Ok(Transformed::yes(LogicalPlan::Projection(projection))) - } - _ => { - self.ensure_not_in_non_projection(&plan)?; - Ok(Transformed::no(plan)) - } - } - } - - fn ensure_not_in_non_projection(&self, plan: &LogicalPlan) -> Result<()> { - for expr in plan.expressions() { - if contains_input_file_name(&expr)? { - return plan_err!( - "input_file_name() is only supported in the SELECT list" - ); - } - } - Ok(()) - } - - fn rewrite_projection_expr(&self, expr: Expr) -> Result<(Expr, bool)> { - if is_input_file_name(&expr) { - let rewritten = Expr::Column(Column::from_name(INPUT_FILE_NAME_COL)) - .alias("input_file_name"); - return Ok((rewritten, true)); - } - - let mut found = false; - let transformed = expr.transform_up(|expr| { - if is_input_file_name(&expr) { - found = true; - Ok(Transformed::yes(Expr::Column(Column::from_name( - INPUT_FILE_NAME_COL, - )))) - } else { - Ok(Transformed::no(expr)) - } - })?; - - Ok((transformed.data, found)) - } - - fn annotate_table_scan(&self, plan: &LogicalPlan) -> Result { - match plan { - LogicalPlan::TableScan(scan) => self.rewrite_table_scan(scan), - LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => { - let new_input = Arc::new(self.annotate_table_scan(input.as_ref())?); - let alias = SubqueryAlias::try_new(new_input, alias.clone())?; - Ok(LogicalPlan::SubqueryAlias(alias)) - } - LogicalPlan::Filter(Filter { - predicate, input, .. 
- }) => { - let new_input = Arc::new(self.annotate_table_scan(input.as_ref())?); - let filter = Filter::try_new(predicate.clone(), new_input)?; - Ok(LogicalPlan::Filter(filter)) - } - LogicalPlan::Sort(Sort { expr, input, fetch }) => { - let new_input = Arc::new(self.annotate_table_scan(input.as_ref())?); - Ok(LogicalPlan::Sort(Sort { - expr: expr.clone(), - input: new_input, - fetch: *fetch, - })) - } - LogicalPlan::Limit(Limit { skip, fetch, input }) => { - let new_input = Arc::new(self.annotate_table_scan(input.as_ref())?); - Ok(LogicalPlan::Limit(Limit { - skip: skip.clone(), - fetch: fetch.clone(), - input: new_input, - })) - } - LogicalPlan::Distinct(Distinct::All(input)) => { - let new_input = Arc::new(self.annotate_table_scan(input.as_ref())?); - Ok(LogicalPlan::Distinct(Distinct::All(new_input))) - } - LogicalPlan::Distinct(Distinct::On(_)) => { - plan_err!("input_file_name() is not supported with DISTINCT ON") - } - LogicalPlan::Join(_) => plan_err!( - "input_file_name() cannot be used with joins - the file source would be ambiguous. Use input_file_name() in a subquery on a single table instead." - ), - _ => plan_err!( - "input_file_name() is only supported for file-backed table scans" - ), - } - } - - fn rewrite_table_scan(&self, scan: &TableScan) -> Result { - if scan - .source - .schema() - .fields() - .iter() - .any(|field| field.name() == INPUT_FILE_NAME_COL) - { - return plan_err!( - "input_file_name() cannot be used because the table schema already contains '{INPUT_FILE_NAME_COL}'" - ); - } - - if scan - .projected_schema - .fields() - .iter() - .any(|field| field.name() == INPUT_FILE_NAME_COL) - { - return Ok(LogicalPlan::TableScan(scan.clone())); - } - - let new_schema = append_input_file_name_field( - scan.projected_schema.as_ref(), - Some(scan.table_name.clone()), - )?; - - let mut new_scan = scan.clone(); - new_scan.projected_schema = Arc::new(new_schema); - Ok(LogicalPlan::TableScan(new_scan)) - } -} - -fn is_input_file_name(expr: &Expr) -> bool { - matches!( - expr, - Expr::ScalarFunction(func) if func.name() == "input_file_name" && func.args.is_empty() - ) -} - -fn contains_input_file_name(expr: &Expr) -> Result { - expr.exists(|expr| Ok(is_input_file_name(expr))) -} diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 9c40cf3f9d1a..f97b05ea68fb 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -24,9 +24,6 @@ use crate::{OptimizerConfig, OptimizerRule}; use std::collections::HashSet; use std::sync::Arc; -use datafusion_common::metadata_columns::{ - INPUT_FILE_NAME_COL, append_input_file_name_field, -}; use datafusion_common::{ Column, DFSchema, HashMap, JoinType, Result, assert_eq_or_internal_err, get_required_group_by_exprs_indices, internal_datafusion_err, internal_err, @@ -262,60 +259,18 @@ fn optimize_projections( projection, filters, fetch, - projected_schema, + projected_schema: _, } = table_scan; - let source_has_input_file_name = source - .schema() - .fields() - .iter() - .any(|field| field.name() == INPUT_FILE_NAME_COL); - - let input_file_name_idx = if source_has_input_file_name { - None - } else { - projected_schema - .fields() - .iter() - .position(|field| field.name() == INPUT_FILE_NAME_COL) - }; - - let (needs_input_file_name, indices) = - if let Some(input_file_name_idx) = input_file_name_idx { - let needs_input_file_name = indices - .indices() - .binary_search(&input_file_name_idx) - .is_ok(); - let 
indices = RequiredIndices::new_from_indices( - indices - .into_inner() - .into_iter() - .filter(|idx| *idx != input_file_name_idx) - .collect(), - ); - (needs_input_file_name, indices) - } else { - (false, indices) - }; - // Get indices referred to in the original (schema with all fields) // given projected indices. let projection = match &projection { Some(projection) => indices.into_mapped_indices(|idx| projection[idx]), None => indices.into_inner(), }; - let qualifier = table_name.clone(); - let mut new_scan = + let new_scan = TableScan::try_new(table_name, source, Some(projection), filters, fetch)?; - if needs_input_file_name { - let new_schema = append_input_file_name_field( - new_scan.projected_schema.as_ref(), - Some(qualifier), - )?; - new_scan.projected_schema = Arc::new(new_schema); - } - return Ok(Transformed::yes(LogicalPlan::TableScan(new_scan))); } // Other node types are handled below @@ -991,7 +946,6 @@ mod tests { }; use crate::{OptimizerContext, OptimizerRule}; use arrow::datatypes::{DataType, Field, Schema}; - use datafusion_common::metadata_columns::INPUT_FILE_NAME_COL; use datafusion_common::{ Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference, }; @@ -1229,29 +1183,6 @@ mod tests { ) } - #[test] - fn optimize_projections_keeps_reserved_column_from_source() -> Result<()> { - let schema = Schema::new(vec![ - Field::new(INPUT_FILE_NAME_COL, DataType::Utf8, false), - Field::new("a", DataType::UInt32, false), - ]); - - let plan = table_scan(Some("t"), &schema, None)?.build()?; - let optimized_plan = optimize(plan)?; - - match optimized_plan { - LogicalPlan::TableScan(scan) => { - let projection = scan.projection.expect("projection present"); - assert_eq!(projection, vec![0, 1]); - assert_eq!(scan.projected_schema.field(0).name(), INPUT_FILE_NAME_COL); - assert_eq!(scan.projected_schema.fields().len(), 2); - } - _ => panic!("expected optimized plan to be a TableScan"), - } - - Ok(()) - } - #[test] fn merge_three_projection() -> Result<()> { let table_scan = test_table_scan()?; diff --git a/datafusion/physical-expr/src/equivalence/properties/mod.rs b/datafusion/physical-expr/src/equivalence/properties/mod.rs index 38a46699d034..996bc4b08fcd 100644 --- a/datafusion/physical-expr/src/equivalence/properties/mod.rs +++ b/datafusion/physical-expr/src/equivalence/properties/mod.rs @@ -239,39 +239,6 @@ impl EquivalenceProperties { &self.schema } - /// Updates the schema by appending new trailing fields. - /// - /// Validates that the existing schema is a prefix of the new schema. - pub fn with_appended_schema(mut self, new_schema: SchemaRef) -> Result { - let existing_fields = self.schema.fields(); - let new_fields = new_schema.fields(); - - if new_fields.len() < existing_fields.len() { - return plan_err!( - "appended schema has fewer fields ({} < {})", - new_fields.len(), - existing_fields.len() - ); - } - - for (idx, field) in existing_fields.iter().enumerate() { - let new_field = &new_fields[idx]; - if field.name() != new_field.name() - || field.data_type() != new_field.data_type() - || field.is_nullable() != new_field.is_nullable() - { - return plan_err!( - "appended schema mismatch at index {idx}: expected {} got {}", - field, - new_field - ); - } - } - - self.schema = new_schema; - Ok(self) - } - /// Returns a reference to the ordering equivalence class within. 
    pub fn oeq_class(&self) -> &OrderingEquivalenceClass {
         &self.oeq_class
diff --git a/datafusion/sqllogictest/test_files/input_file_name.slt b/datafusion/sqllogictest/test_files/input_file_name.slt
index 8364ef001c95..bf17d5590afa 100644
--- a/datafusion/sqllogictest/test_files/input_file_name.slt
+++ b/datafusion/sqllogictest/test_files/input_file_name.slt
@@ -49,20 +49,7 @@ LIMIT 8;
 2 2 1
 3 3 1
 
-statement error
+query T
 SELECT input_file_name() FROM (VALUES (1)) v(x);
-
-statement ok
-CREATE EXTERNAL TABLE t1(a INT)
-STORED AS CSV
-LOCATION '../core/tests/data/partitioned_csv'
-OPTIONS ('format.has_header' 'false');
-
-statement ok
-CREATE EXTERNAL TABLE t2(a INT)
-STORED AS CSV
-LOCATION '../core/tests/data/partitioned_csv'
-OPTIONS ('format.has_header' 'false');
-
-statement error .*input_file_name\(\) cannot be used with joins.*
-SELECT input_file_name() FROM t1 JOIN t2 ON t1.a = t2.a LIMIT 1;
+----
+NULL
diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md
index 1b52c7bdab52..df53b81a73bc 100644
--- a/docs/source/user-guide/sql/scalar_functions.md
+++ b/docs/source/user-guide/sql/scalar_functions.md
@@ -5126,6 +5126,7 @@ union_tag(union_expression)
 - [arrow_metadata](#arrow_metadata)
 - [arrow_typeof](#arrow_typeof)
 - [get_field](#get_field)
+- [input_file_name](#input_file_name)
 - [version](#version)
 
 ### `arrow_cast`
@@ -5271,6 +5272,27 @@ get_field(expression, field_name[, field_name2, ...])
 +--------+
 ```
 
+### `input_file_name`
+
+Returns the path of the input file that produced the current row.
+
+Note: file paths/URIs may be sensitive metadata depending on your environment.
+
+This function is intended to be rewritten at file-scan time (when the file is
+known). If the input file is not known (for example, if this function is
+evaluated outside a file scan, or was not pushed down into one), this function
+returns NULL.
+
+```sql
+input_file_name()
+```
+
+#### Example
+
+```sql
+SELECT input_file_name() FROM t;
+```
+
 ### `version`
 
 Returns the version of DataFusion.