diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 6f92d567c8307..e0baccce17f9f 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -25,7 +25,10 @@ use crate::{
 };
 use arrow::array::{RecordBatch, RecordBatchOptions};
 use arrow::datatypes::DataType;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
+use datafusion_physical_expr::ScalarFunctionExpr;
+use datafusion_physical_expr::expressions::Literal;
 use datafusion_physical_expr::projection::ProjectionExprs;
 use datafusion_physical_expr::utils::reassign_expr_columns;
 use datafusion_physical_expr_adapter::replace_columns_with_literals;
@@ -66,6 +69,29 @@ use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
 use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader, RowGroupMetaData};
 
+fn replace_input_file_name_in_projection(
+    projection: ProjectionExprs,
+    file_name: &str,
+) -> Result<ProjectionExprs> {
+    let file_name_literal: Arc<dyn PhysicalExpr> =
+        Arc::new(Literal::new(ScalarValue::Utf8(Some(file_name.to_owned()))));
+
+    projection.try_map_exprs(|expr| {
+        Ok(expr
+            .transform(|expr| {
+                if let Some(func) = expr.as_any().downcast_ref::<ScalarFunctionExpr>()
+                    && func.fun().name() == "input_file_name"
+                    && func.args().is_empty()
+                {
+                    return Ok(Transformed::yes(Arc::clone(&file_name_literal)));
+                }
+                Ok(Transformed::no(expr))
+            })
+            .data()
+            .expect("infallible transform"))
+    })
+}
+
 /// Implements [`FileOpener`] for a parquet file
 pub(super) struct ParquetOpener {
     /// Execution partition index
@@ -259,6 +285,8 @@ impl FileOpener for ParquetOpener {
                 .map(|p| replace_columns_with_literals(p, &literal_columns))
                 .transpose()?;
         }
+        // Replace any `input_file_name()` UDFs in the projection with a literal for this file.
+        projection = replace_input_file_name_in_projection(projection, &file_name)?;
 
         let reorder_predicates = self.reorder_filters;
         let pushdown_filters = self.pushdown_filters;
@@ -1014,23 +1042,33 @@ fn should_enable_page_index(
 
 #[cfg(test)]
 mod test {
+    use std::any::Any;
     use std::sync::Arc;
 
-    use super::{ConstantColumns, constant_columns_from_stats};
+    use super::{
+        ConstantColumns, constant_columns_from_stats,
+        replace_input_file_name_in_projection,
+    };
     use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener};
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use bytes::{BufMut, BytesMut};
+    use datafusion_common::config::ConfigOptions;
     use datafusion_common::{
-        ColumnStatistics, DataFusionError, ScalarValue, Statistics, record_batch,
+        ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, record_batch,
         stats::Precision,
     };
     use datafusion_datasource::{PartitionedFile, TableSchema, file_stream::FileOpener};
+    use datafusion_expr::{
+        ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
+        Volatility,
+    };
     use datafusion_expr::{col, lit};
+    use datafusion_physical_expr::ScalarFunctionExpr;
     use datafusion_physical_expr::{
         PhysicalExpr,
         expressions::{Column, DynamicFilterPhysicalExpr, Literal},
         planner::logical2physical,
-        projection::ProjectionExprs,
+        projection::{ProjectionExpr, ProjectionExprs},
     };
     use datafusion_physical_expr_adapter::{
         DefaultPhysicalExprAdapterFactory, replace_columns_with_literals,
@@ -1041,6 +1079,77 @@ mod test {
     use parquet::arrow::ArrowWriter;
     use parquet::file::properties::WriterProperties;
 
+    #[derive(Debug, PartialEq, Eq, Hash)]
+    struct TestInputFileNameUdf {
+        signature: Signature,
+    }
+
+    impl TestInputFileNameUdf {
+        fn new() -> Self {
+            Self {
+                signature: Signature::nullary(Volatility::Volatile),
+            }
+        }
+    }
+
+    impl ScalarUDFImpl for TestInputFileNameUdf {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
+        fn name(&self) -> &str {
+            "input_file_name"
+        }
+
+        fn signature(&self) -> &Signature {
+            &self.signature
+        }
+
+        fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
+            Ok(DataType::Utf8)
+        }
+
+        fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+            Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))
+        }
+    }
+
+    #[test]
+    fn parquet_opener_replaces_input_file_name_udf_with_literal() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+
+        let udf = Arc::new(ScalarUDF::new_from_impl(TestInputFileNameUdf::new()));
+        let udf_expr = Arc::new(ScalarFunctionExpr::try_new(
+            udf,
+            vec![],
+            schema.as_ref(),
+            Arc::new(ConfigOptions::default()),
+        )?);
+
+        let projection = ProjectionExprs::new(vec![
+            ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"),
+            ProjectionExpr::new(udf_expr, "input_file_name"),
+        ]);
+
+        let file_name = "s3://bucket/data/file.parquet";
+        let rewritten = replace_input_file_name_in_projection(projection, file_name)?;
+
+        assert_eq!(rewritten.as_ref().len(), 2);
+        assert_eq!(rewritten.as_ref()[1].alias, "input_file_name");
+
+        let expr = &rewritten.as_ref()[1].expr;
+        let literal = expr
+            .as_any()
+            .downcast_ref::<Literal>()
+            .expect("expected input_file_name() to be rewritten to a literal");
+        assert_eq!(
+            literal.value(),
+            &ScalarValue::Utf8(Some(file_name.to_owned()))
+        );
+
+        Ok(())
+    }
+
     /// Builder for creating [`ParquetOpener`] instances with sensible defaults for tests.
     /// This helps reduce code duplication and makes it clear what differs between test cases.
     struct ParquetOpenerBuilder {
diff --git a/datafusion/datasource/src/projection.rs b/datafusion/datasource/src/projection.rs
index 9a0cb494e495f..f8f42e1b087c8 100644
--- a/datafusion/datasource/src/projection.rs
+++ b/datafusion/datasource/src/projection.rs
@@ -23,6 +23,7 @@ use datafusion_common::{
     tree_node::{Transformed, TransformedResult, TreeNode},
 };
 use datafusion_physical_expr::{
+    PhysicalExpr, ScalarFunctionExpr,
     expressions::{Column, Literal},
     projection::{ProjectionExpr, ProjectionExprs},
 };
@@ -69,6 +70,7 @@ impl ProjectionOpener {
 impl FileOpener for ProjectionOpener {
     fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
         let partition_values = partitioned_file.partition_values.clone();
+        let file_name = partitioned_file.object_meta.location.to_string();
         // Modify any references to partition columns in the projection expressions
         // and substitute them with literal values from PartitionedFile.partition_values
         let projection = if self.partition_columns.is_empty() {
@@ -80,6 +82,8 @@ impl FileOpener for ProjectionOpener {
                 partition_values,
             )
         };
+        // Replace `input_file_name()` with a per-file literal if present.
+        let projection = inject_input_file_name_into_projection(&projection, file_name);
 
         let projector = projection.make_projector(&self.input_schema)?;
         let inner = self.inner.open(partitioned_file)?;
@@ -143,6 +147,35 @@ fn inject_partition_columns_into_projection(
     ProjectionExprs::new(projections)
 }
 
+fn inject_input_file_name_into_projection(
+    projection: &ProjectionExprs,
+    file_name: String,
+) -> ProjectionExprs {
+    let file_name_literal: Arc<dyn PhysicalExpr> =
+        Arc::new(Literal::new(ScalarValue::Utf8(Some(file_name))));
+
+    let projections = projection
+        .iter()
+        .map(|projection| {
+            let expr = Arc::clone(&projection.expr)
+                .transform(|expr| {
+                    if let Some(func) = expr.as_any().downcast_ref::<ScalarFunctionExpr>()
+                        && func.fun().name() == "input_file_name"
+                        && func.args().is_empty()
+                    {
+                        return Ok(Transformed::yes(Arc::clone(&file_name_literal)));
+                    }
+                    Ok(Transformed::no(expr))
+                })
+                .data()
+                .expect("infallible transform");
+            ProjectionExpr::new(expr, projection.alias.clone())
+        })
+        .collect_vec();
+
+    ProjectionExprs::new(projections)
+}
+
 /// At a high level the goal of SplitProjection is to take a ProjectionExprs meant to be applied to the table schema
 /// and split that into:
 /// - The projection indices into the file schema (file_indices)
@@ -238,7 +271,7 @@ impl SplitProjection {
         };
 
         // Pre-create the remapped column so all references can share the same Arc
-        let new_column: Arc<dyn datafusion_physical_expr::PhysicalExpr> =
+        let new_column: Arc<dyn PhysicalExpr> =
             Arc::new(Column::new(&name, new_index));
         column_mapping.insert(original_index, new_column);
     }
@@ -285,17 +318,64 @@ impl SplitProjection {
 
 #[cfg(test)]
 mod test {
+    use std::any::Any;
     use std::sync::Arc;
 
     use arrow::array::AsArray;
-    use arrow::datatypes::{DataType, SchemaRef};
+    use arrow::datatypes::{DataType, Field, SchemaRef};
+    use arrow::record_batch::RecordBatch;
+    use datafusion_common::config::ConfigOptions;
     use datafusion_common::{DFSchema, ScalarValue, record_batch};
+    use datafusion_common::{Result, exec_err};
+    use datafusion_expr::{
+        ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
+        Volatility,
+    };
     use datafusion_expr::{Expr, col, execution_props::ExecutionProps};
+    use datafusion_physical_expr::ScalarFunctionExpr;
     use datafusion_physical_expr::{create_physical_exprs, projection::ProjectionExpr};
+    use futures::StreamExt;
     use itertools::Itertools;
 
     use super::*;
 
+    #[derive(Debug, PartialEq, Eq, Hash)]
+    struct TestInputFileNameUdf {
+        signature: Signature,
+    }
+
+    impl TestInputFileNameUdf {
+        fn new() -> Self {
+            Self {
+                signature: Signature::nullary(Volatility::Volatile),
+            }
+        }
+    }
+
+    impl ScalarUDFImpl for TestInputFileNameUdf {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
+        fn name(&self) -> &str {
+            "input_file_name"
+        }
+
+        fn signature(&self) -> &Signature {
+            &self.signature
+        }
+
+        fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
+            Ok(DataType::Utf8)
+        }
+
+        fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+            exec_err!(
+                "input_file_name() should be replaced with a literal before execution"
+            )
+        }
+    }
+
     fn create_projection_exprs<'a>(
         exprs: impl IntoIterator<Item = &'a Expr>,
         schema: &SchemaRef,
@@ -311,6 +391,65 @@ mod test {
         ProjectionExprs::from(projection_exprs)
     }
 
+    #[tokio::test]
+    async fn projection_opener_replaces_input_file_name_udf_with_literal() -> Result<()> {
+        let file_schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&file_schema),
+            vec![Arc::new(arrow::array::Int32Array::from(vec![1]))],
+        )?;
+
+        struct TestOpener {
+            batch: RecordBatch,
+        }
+
+        impl FileOpener for TestOpener {
+            fn open(&self, _partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
+                let batch = self.batch.clone();
+                Ok(async move {
+                    let stream = futures::stream::iter(vec![Ok(batch)]);
+                    Ok(stream.boxed())
+                }
+                .boxed())
+            }
+        }
+
+        let udf = Arc::new(ScalarUDF::new_from_impl(TestInputFileNameUdf::new()));
+        let udf_expr = Arc::new(ScalarFunctionExpr::try_new(
+            udf,
+            vec![],
+            file_schema.as_ref(),
+            Arc::new(ConfigOptions::default()),
+        )?);
+
+        let projection = ProjectionExprs::new(vec![
+            ProjectionExpr::new(Arc::new(Column::new("c1", 0)), "c1"),
+            ProjectionExpr::new(udf_expr, "input_file_name"),
+        ]);
+
+        let split = SplitProjection::new(file_schema.as_ref(), &projection);
+        let inner: Arc<dyn FileOpener> = Arc::new(TestOpener { batch });
+        let opener = ProjectionOpener::try_new(split, inner, file_schema.as_ref())?;
+
+        let mut stream = opener
+            .open(PartitionedFile::new("data/path/file.csv", 0))?
+            .await?;
+        let output_batch = stream.next().await.unwrap()?;
+
+        assert_eq!(output_batch.schema().field(1).name(), "input_file_name");
+        assert!(
+            output_batch
+                .column(1)
+                .as_string::<i32>()
+                .value(0)
+                .ends_with("file.csv")
+        );
+
+        Ok(())
+    }
+
     #[test]
     fn test_split_projection_with_partition_columns() {
         use arrow::array::AsArray;
diff --git a/datafusion/functions/src/core/input_file_name.rs b/datafusion/functions/src/core/input_file_name.rs
new file mode 100644
index 0000000000000..d1160a212de15
--- /dev/null
+++ b/datafusion/functions/src/core/input_file_name.rs
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`InputFileNameFunc`]: Implementation of the `input_file_name` function.
+
+use arrow::datatypes::DataType;
+use datafusion_common::{Result, ScalarValue, utils::take_function_args};
+use datafusion_doc::Documentation;
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
+};
+use datafusion_macros::user_doc;
+use std::any::Any;
+
+#[user_doc(
+    doc_section(label = "Other Functions"),
+    description = r#"Returns the path of the input file that produced the current row.
+
+Note: file paths/URIs may be sensitive metadata depending on your environment.
+
+This function is intended to be rewritten at file-scan time (when the file is
+known). If the input file is not known (for example, if this function is
+evaluated outside a file scan, or was not pushed down into one), this function
+returns NULL.
+"#,
+    syntax_example = "input_file_name()",
+    sql_example = r#"```sql
+SELECT input_file_name() FROM t;
+```"#
+)]
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct InputFileNameFunc {
+    signature: Signature,
+}
+
+impl Default for InputFileNameFunc {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl InputFileNameFunc {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::nullary(Volatility::Volatile),
+        }
+    }
+}
+
+impl ScalarUDFImpl for InputFileNameFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "input_file_name"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, args: &[DataType]) -> Result<DataType> {
+        let [] = take_function_args(self.name(), args)?;
+        Ok(DataType::Utf8)
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        let [] = take_function_args(self.name(), args.args)?;
+        Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))
+    }
+
+    fn documentation(&self) -> Option<&Documentation> {
+        self.doc()
+    }
+}
diff --git a/datafusion/functions/src/core/mod.rs b/datafusion/functions/src/core/mod.rs
index a14d563737240..6cf04995ea893 100644
--- a/datafusion/functions/src/core/mod.rs
+++ b/datafusion/functions/src/core/mod.rs
@@ -28,6 +28,7 @@ pub mod expr_ext;
 pub mod getfield;
 pub mod greatest;
 mod greatest_least_utils;
+pub mod input_file_name;
 pub mod least;
 pub mod named_struct;
 pub mod nullif;
@@ -57,6 +58,7 @@ make_udf_function!(union_extract::UnionExtractFun, union_extract);
 make_udf_function!(union_tag::UnionTagFunc, union_tag);
 make_udf_function!(version::VersionFunc, version);
 make_udf_function!(arrow_metadata::ArrowMetadataFunc, arrow_metadata);
+make_udf_function!(input_file_name::InputFileNameFunc, input_file_name);
 
 pub mod expr_fn {
     use datafusion_expr::{Expr, Literal};
@@ -113,6 +115,9 @@ pub mod expr_fn {
         union_tag,
         "Returns the name of the currently selected field in the union",
         arg1
+    ),(
+        input_file_name,
+        "Returns the path of the input file that produced the current row",
     ));
 
     #[doc = "Returns the value of the field with the given name from the struct"]
@@ -160,6 +165,7 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
         union_extract(),
         union_tag(),
         version(),
+        input_file_name(),
         r#struct(),
     ]
 }
diff --git a/datafusion/sqllogictest/test_files/input_file_name.slt b/datafusion/sqllogictest/test_files/input_file_name.slt
new file mode 100644
index 0000000000000..bf17d5590afa3
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/input_file_name.slt
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+## input_file_name() tests
+##########
+
+statement ok
+CREATE EXTERNAL TABLE t(c1 INT, c2 INT, c3 BOOLEAN)
+STORED AS CSV
+LOCATION '../core/tests/data/partitioned_csv'
+OPTIONS ('format.has_header' 'false');
+
+query III
+SELECT
+  CASE
+    WHEN input_file_name() LIKE '%partition-0.csv' THEN 0
+    WHEN input_file_name() LIKE '%partition-1.csv' THEN 1
+    WHEN input_file_name() LIKE '%partition-2.csv' THEN 2
+    WHEN input_file_name() LIKE '%partition-3.csv' THEN 3
+    ELSE -1
+  END AS file_id,
+  c1,
+  c2
+FROM t
+ORDER BY c2, c1
+LIMIT 8;
+----
+0 0 0
+1 1 0
+2 2 0
+3 3 0
+0 0 1
+1 1 1
+2 2 1
+3 3 1
+
+query T
+SELECT input_file_name() FROM (VALUES (1)) v(x);
+----
+NULL
diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md
index 1b52c7bdab528..df53b81a73bce 100644
--- a/docs/source/user-guide/sql/scalar_functions.md
+++ b/docs/source/user-guide/sql/scalar_functions.md
@@ -5126,6 +5127,7 @@ union_tag(union_expression)
 - [arrow_metadata](#arrow_metadata)
 - [arrow_typeof](#arrow_typeof)
 - [get_field](#get_field)
+- [input_file_name](#input_file_name)
 - [version](#version)
 
 ### `arrow_cast`
@@ -5271,6 +5272,27 @@ get_field(expression, field_name[, field_name2, ...])
 +--------+
 ```
 
+### `input_file_name`
+
+Returns the path of the input file that produced the current row.
+
+Note: file paths/URIs may be sensitive metadata depending on your environment.
+
+This function is intended to be rewritten at file-scan time (when the file is
+known). If the input file is not known (for example, if this function is
+evaluated outside a file scan, or was not pushed down into one), this function
+returns NULL.
+
+```sql
+input_file_name()
+```
+
+#### Example
+
+```sql
+SELECT input_file_name() FROM t;
+```
+
 ### `version`
 
 Returns the version of DataFusion.
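A minimal end-to-end sketch of the new expression from the DataFrame API. This is not part of the patch above; it assumes the `datafusion` facade crate re-exports `datafusion_functions::core::expr_fn::input_file_name` the way it does the other `core` helpers registered via `export_functions!`, and the data directory and column name are hypothetical.

```rust
// Sketch only, under the assumptions stated above.
use datafusion::error::Result;
use datafusion::functions::core::expr_fn::input_file_name; // assumed re-export path
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical directory containing several CSV files.
    let df = ctx
        .read_csv("data/partitioned_csv/", CsvReadOptions::new())
        .await?;
    // The file scan rewrites `input_file_name()` into a per-file literal,
    // so each row reports the path of the file that produced it.
    df.select(vec![col("c1"), input_file_name().alias("source_file")])?
        .show()
        .await?;
    Ok(())
}
```

Because the rewrite happens in the opener for each `PartitionedFile`, the projected column is constant within a file and varies across files; when no file scan is involved, the fallback `invoke_with_args` returns NULL, matching the documented behavior.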