115 changes: 112 additions & 3 deletions datafusion/datasource-parquet/src/opener.rs
@@ -25,7 +25,10 @@ use crate::{
};
use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::DataType;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_physical_expr::ScalarFunctionExpr;
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr_adapter::replace_columns_with_literals;
@@ -66,6 +69,29 @@ use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader, RowGroupMetaData};

fn replace_input_file_name_in_projection(
projection: ProjectionExprs,
file_name: &str,
) -> Result<ProjectionExprs> {
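    // Build the literal once so every rewritten occurrence shares the same Arc.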
let file_name_literal: Arc<dyn PhysicalExpr> =
Arc::new(Literal::new(ScalarValue::Utf8(Some(file_name.to_owned()))));

projection.try_map_exprs(|expr| {
Ok(expr
.transform(|expr| {
if let Some(func) = expr.as_any().downcast_ref::<ScalarFunctionExpr>()
&& func.fun().name() == "input_file_name"
&& func.args().is_empty()
{
return Ok(Transformed::yes(Arc::clone(&file_name_literal)));
}
Ok(Transformed::no(expr))
})
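            // The closure above never returns Err, so the transform result is always Ok.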
.data()
.expect("infallible transform"))
})
}

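A minimal usage sketch of the helper above (hedged: it assumes a projection whose second entry is a nullary `input_file_name` call, as constructed in the tests further down; the path is illustrative):

    // Given a projection equivalent to [a, input_file_name()]:
    let rewritten =
        replace_input_file_name_in_projection(projection, "s3://bucket/f.parquet")?;
    // The UDF call is now a plain Literal carrying the file path.
    let literal = rewritten.as_ref()[1]
        .expr
        .as_any()
        .downcast_ref::<Literal>()
        .expect("rewritten to a literal");
    assert_eq!(
        literal.value(),
        &ScalarValue::Utf8(Some("s3://bucket/f.parquet".to_owned()))
    );
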
/// Implements [`FileOpener`] for a parquet file
pub(super) struct ParquetOpener {
/// Execution partition index
@@ -259,6 +285,8 @@ impl FileOpener for ParquetOpener {
.map(|p| replace_columns_with_literals(p, &literal_columns))
.transpose()?;
}
// Replace any `input_file_name()` UDFs in the projection with a literal for this file.
projection = replace_input_file_name_in_projection(projection, &file_name)?;

let reorder_predicates = self.reorder_filters;
let pushdown_filters = self.pushdown_filters;
@@ -1014,23 +1042,33 @@ fn should_enable_page_index(

#[cfg(test)]
mod test {
use std::any::Any;
use std::sync::Arc;

use super::{ConstantColumns, constant_columns_from_stats};
use super::{
ConstantColumns, constant_columns_from_stats,
replace_input_file_name_in_projection,
};
use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use bytes::{BufMut, BytesMut};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{
ColumnStatistics, DataFusionError, ScalarValue, Statistics, record_batch,
ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, record_batch,
stats::Precision,
};
use datafusion_datasource::{PartitionedFile, TableSchema, file_stream::FileOpener};
use datafusion_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
Volatility,
};
use datafusion_expr::{col, lit};
use datafusion_physical_expr::ScalarFunctionExpr;
use datafusion_physical_expr::{
PhysicalExpr,
expressions::{Column, DynamicFilterPhysicalExpr, Literal},
planner::logical2physical,
projection::ProjectionExprs,
projection::{ProjectionExpr, ProjectionExprs},
};
use datafusion_physical_expr_adapter::{
DefaultPhysicalExprAdapterFactory, replace_columns_with_literals,
@@ -1041,6 +1079,77 @@ mod test {
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

#[derive(Debug, PartialEq, Eq, Hash)]
struct TestInputFileNameUdf {
signature: Signature,
}

impl TestInputFileNameUdf {
fn new() -> Self {
Self {
signature: Signature::nullary(Volatility::Volatile),
}
}
}

impl ScalarUDFImpl for TestInputFileNameUdf {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"input_file_name"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
Ok(DataType::Utf8)
}

fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))
}
}

#[test]
fn parquet_opener_replaces_input_file_name_udf_with_literal() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

let udf = Arc::new(ScalarUDF::new_from_impl(TestInputFileNameUdf::new()));
let udf_expr = Arc::new(ScalarFunctionExpr::try_new(
udf,
vec![],
schema.as_ref(),
Arc::new(ConfigOptions::default()),
)?);

let projection = ProjectionExprs::new(vec![
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"),
ProjectionExpr::new(udf_expr, "input_file_name"),
]);

let file_name = "s3://bucket/data/file.parquet";
let rewritten = replace_input_file_name_in_projection(projection, file_name)?;

assert_eq!(rewritten.as_ref().len(), 2);
assert_eq!(rewritten.as_ref()[1].alias, "input_file_name");

let expr = &rewritten.as_ref()[1].expr;
let literal = expr
.as_any()
.downcast_ref::<Literal>()
.expect("expected input_file_name() to be rewritten to a literal");
assert_eq!(
literal.value(),
&ScalarValue::Utf8(Some(file_name.to_owned()))
);

Ok(())
}

/// Builder for creating [`ParquetOpener`] instances with sensible defaults for tests.
/// This helps reduce code duplication and makes it clear what differs between test cases.
struct ParquetOpenerBuilder {
143 changes: 141 additions & 2 deletions datafusion/datasource/src/projection.rs
@@ -23,6 +23,7 @@ use datafusion_common::{
tree_node::{Transformed, TransformedResult, TreeNode},
};
use datafusion_physical_expr::{
PhysicalExpr, ScalarFunctionExpr,
expressions::{Column, Literal},
projection::{ProjectionExpr, ProjectionExprs},
};
@@ -69,6 +70,7 @@ impl ProjectionOpener {
impl FileOpener for ProjectionOpener {
fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
let partition_values = partitioned_file.partition_values.clone();
let file_name = partitioned_file.object_meta.location.to_string();
// Modify any references to partition columns in the projection expressions
// and substitute them with literal values from PartitionedFile.partition_values
let projection = if self.partition_columns.is_empty() {
@@ -80,6 +82,8 @@ impl FileOpener for ProjectionOpener {
partition_values,
)
};
// Replace `input_file_name()` with a per-file literal if present.
let projection = inject_input_file_name_into_projection(&projection, file_name);
let projector = projection.make_projector(&self.input_schema)?;

let inner = self.inner.open(partitioned_file)?;
@@ -143,6 +147,35 @@ fn inject_partition_columns_into_projection(
ProjectionExprs::new(projections)
}

fn inject_input_file_name_into_projection(
projection: &ProjectionExprs,
file_name: String,
) -> ProjectionExprs {
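    // Build the literal once; each matched occurrence below shares it via Arc::clone.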
let file_name_literal: Arc<dyn PhysicalExpr> =
Arc::new(Literal::new(ScalarValue::Utf8(Some(file_name))));

let projections = projection
.iter()
.map(|projection| {
let expr = Arc::clone(&projection.expr)
.transform(|expr| {
if let Some(func) = expr.as_any().downcast_ref::<ScalarFunctionExpr>()
&& func.fun().name() == "input_file_name"
&& func.args().is_empty()
{
return Ok(Transformed::yes(Arc::clone(&file_name_literal)));
}
Ok(Transformed::no(expr))
})
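                // The closure above never returns Err, so unwrapping the transform result is safe.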
.data()
.expect("infallible transform");
ProjectionExpr::new(expr, projection.alias.clone())
})
.collect_vec();

ProjectionExprs::new(projections)
}

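A brief usage sketch (hedged; `projection` and `file_name` as in `ProjectionOpener::open` above). Because the literal is built once and cloned as an `Arc`, every `input_file_name()` call site in the projection ends up pointing at the same shared value:

    // [c1, input_file_name()] for "data/file.csv" becomes [c1, 'data/file.csv'];
    // multiple occurrences would all hold clones of the same Arc<Literal>.
    let rewritten = inject_input_file_name_into_projection(&projection, file_name);
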
/// At a high level the goal of SplitProjection is to take a ProjectionExprs meant to be applied to the table schema
/// and split that into:
/// - The projection indices into the file schema (file_indices)
@@ -238,7 +271,7 @@ impl SplitProjection {
};

// Pre-create the remapped column so all references can share the same Arc
let new_column: Arc<dyn datafusion_physical_plan::PhysicalExpr> =
let new_column: Arc<dyn PhysicalExpr> =
Arc::new(Column::new(&name, new_index));
column_mapping.insert(original_index, new_column);
}
@@ -285,17 +318,64 @@ impl SplitProjection {

#[cfg(test)]
mod test {
use std::any::Any;
use std::sync::Arc;

use arrow::array::AsArray;
use arrow::datatypes::{DataType, SchemaRef};
use arrow::datatypes::{DataType, Field, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{DFSchema, ScalarValue, record_batch};
use datafusion_common::{Result, exec_err};
use datafusion_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
Volatility,
};
use datafusion_expr::{Expr, col, execution_props::ExecutionProps};
use datafusion_physical_expr::ScalarFunctionExpr;
use datafusion_physical_expr::{create_physical_exprs, projection::ProjectionExpr};
use futures::StreamExt;
use itertools::Itertools;

use super::*;

#[derive(Debug, PartialEq, Eq, Hash)]
struct TestInputFileNameUdf {
signature: Signature,
}

impl TestInputFileNameUdf {
fn new() -> Self {
Self {
signature: Signature::nullary(Volatility::Volatile),
}
}
}

impl ScalarUDFImpl for TestInputFileNameUdf {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"input_file_name"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
Ok(DataType::Utf8)
}

fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
exec_err!(
"input_file_name() should be replaced with a literal before execution"
)
}
}

fn create_projection_exprs<'a>(
exprs: impl IntoIterator<Item = &'a Expr>,
schema: &SchemaRef,
@@ -311,6 +391,65 @@ mod test {
ProjectionExprs::from(projection_exprs)
}

#[tokio::test]
async fn projection_opener_replaces_input_file_name_udf_with_literal() -> Result<()> {
let file_schema =
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));

let batch = RecordBatch::try_new(
Arc::clone(&file_schema),
vec![Arc::new(arrow::array::Int32Array::from(vec![1]))],
)?;

struct TestOpener {
batch: RecordBatch,
}

impl FileOpener for TestOpener {
fn open(&self, _partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
let batch = self.batch.clone();
Ok(async move {
let stream = futures::stream::iter(vec![Ok(batch)]);
Ok(stream.boxed())
}
.boxed())
}
}

let udf = Arc::new(ScalarUDF::new_from_impl(TestInputFileNameUdf::new()));
let udf_expr = Arc::new(ScalarFunctionExpr::try_new(
udf,
vec![],
file_schema.as_ref(),
Arc::new(ConfigOptions::default()),
)?);

let projection = ProjectionExprs::new(vec![
ProjectionExpr::new(Arc::new(Column::new("c1", 0)), "c1"),
ProjectionExpr::new(udf_expr, "input_file_name"),
]);

let split = SplitProjection::new(file_schema.as_ref(), &projection);
let inner: Arc<dyn FileOpener> = Arc::new(TestOpener { batch });
let opener = ProjectionOpener::try_new(split, inner, file_schema.as_ref())?;

let mut stream = opener
.open(PartitionedFile::new("data/path/file.csv", 0))?
.await?;
let output_batch = stream.next().await.unwrap()?;

assert_eq!(output_batch.schema().field(1).name(), "input_file_name");
assert!(
output_batch
.column(1)
.as_string::<i32>()
.value(0)
.ends_with("file.csv")
);

Ok(())
}

#[test]
fn test_split_projection_with_partition_columns() {
use arrow::array::AsArray;