Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 96 additions & 30 deletions datafusion/datasource-parquet/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,16 +307,15 @@ fn clear_metadata(

#[cfg(feature = "parquet_encryption")]
async fn get_file_decryption_properties(
state: &dyn Session,
runtime: &RuntimeEnv,
options: &TableParquetOptions,
file_path: &Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
Ok(match &options.crypto.file_decryption {
Some(cfd) => Some(Arc::new(FileDecryptionProperties::from(cfd.clone()))),
None => match &options.crypto.factory_id {
Some(factory_id) => {
let factory =
state.runtime_env().parquet_encryption_factory(factory_id)?;
let factory = runtime.parquet_encryption_factory(factory_id)?;
factory
.get_file_decryption_properties(
&options.crypto.factory_options,
Expand All @@ -331,7 +330,7 @@ async fn get_file_decryption_properties(

#[cfg(not(feature = "parquet_encryption"))]
async fn get_file_decryption_properties(
_state: &dyn Session,
_runtime: &RuntimeEnv,
_options: &TableParquetOptions,
_file_path: &Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
Expand Down Expand Up @@ -377,26 +376,84 @@ impl FileFormat for ParquetFormat {
let file_metadata_cache =
state.runtime_env().cache_manager.get_file_metadata_cache();

let mut schemas: Vec<_> = futures::stream::iter(objects)
.map(|object| async {
let file_decryption_properties = get_file_decryption_properties(
state,
&self.options,
&object.location,
)
.await?;
let result = DFParquetMetadata::new(store.as_ref(), object)
.with_metadata_size_hint(self.metadata_size_hint())
.with_decryption_properties(file_decryption_properties)
.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
.with_coerce_int96(coerce_int96)
.fetch_schema_with_location()
.await?;
Ok::<_, DataFusionError>(result)
let runtime = Arc::clone(state.runtime_env());
let meta_fetch_concurrency =
state.config_options().execution.meta_fetch_concurrency;

let mut schemas: Vec<(Path, Schema)> = futures::stream::iter(objects)
.map(|object| {
let object = object.clone();
let store = Arc::clone(store);
let runtime = Arc::clone(&runtime);
let options = self.options.clone();
let file_metadata_cache = Arc::clone(&file_metadata_cache);
let metadata_size_hint = self.metadata_size_hint();
async move {
let fetch_metadata = async move {
let file_decryption_properties = get_file_decryption_properties(
&runtime,
&options,
&object.location,
)
.await?;
let metadata = DFParquetMetadata::new(store.as_ref(), &object)
.with_metadata_size_hint(metadata_size_hint)
.with_decryption_properties(file_decryption_properties)
.with_file_metadata_cache(Some(file_metadata_cache))
.fetch_metadata()
.await?;
Ok::<_, DataFusionError>((object, metadata))
};

if tokio::runtime::Handle::try_current().is_ok() {
let handle = tokio::runtime::Handle::current();
SpawnedTask::spawn_blocking(move || {
handle.block_on(fetch_metadata)
})
.await
.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))?
} else {
fetch_metadata.await
}
}
})
.boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
// fetch metadata concurrently
.buffer_unordered(meta_fetch_concurrency)
.map(|result| async move {
let (object, metadata): (ObjectMeta, Arc<ParquetMetaData>) = result?;

let parse_schema = move || {
let file_metadata = metadata.file_metadata();
let schema = parquet::arrow::parquet_to_arrow_schema(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)?;
let schema = coerce_int96
.as_ref()
.and_then(|time_unit| {
coerce_int96_to_resolution(
file_metadata.schema_descr(),
&schema,
time_unit,
)
})
.unwrap_or(schema);
Ok::<_, DataFusionError>((object.location.clone(), schema))
};

if tokio::runtime::Handle::try_current().is_ok() {
SpawnedTask::spawn_blocking(parse_schema)
.await
.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))?
} else {
parse_schema()
}
})
.boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
// fetch schemas concurrently, if requested
.buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
// order does not matter for schema inference, it is handled below
.buffer_unordered(meta_fetch_concurrency)
.try_collect()
.await?;

Expand Down Expand Up @@ -439,9 +496,12 @@ impl FileFormat for ParquetFormat {
table_schema: SchemaRef,
object: &ObjectMeta,
) -> Result<Statistics> {
let file_decryption_properties =
get_file_decryption_properties(state, &self.options, &object.location)
.await?;
let file_decryption_properties = get_file_decryption_properties(
state.runtime_env(),
&self.options,
&object.location,
)
.await?;
let file_metadata_cache =
state.runtime_env().cache_manager.get_file_metadata_cache();
DFParquetMetadata::new(store, object)
Expand All @@ -459,9 +519,12 @@ impl FileFormat for ParquetFormat {
table_schema: SchemaRef,
object: &ObjectMeta,
) -> Result<Option<LexOrdering>> {
let file_decryption_properties =
get_file_decryption_properties(state, &self.options, &object.location)
.await?;
let file_decryption_properties = get_file_decryption_properties(
state.runtime_env(),
&self.options,
&object.location,
)
.await?;
let file_metadata_cache =
state.runtime_env().cache_manager.get_file_metadata_cache();
let metadata = DFParquetMetadata::new(store, object)
Expand All @@ -480,9 +543,12 @@ impl FileFormat for ParquetFormat {
table_schema: SchemaRef,
object: &ObjectMeta,
) -> Result<datafusion_datasource::file_format::FileMeta> {
let file_decryption_properties =
get_file_decryption_properties(state, &self.options, &object.location)
.await?;
let file_decryption_properties = get_file_decryption_properties(
state.runtime_env(),
&self.options,
&object.location,
)
.await?;
let file_metadata_cache =
state.runtime_env().cache_manager.get_file_metadata_cache();
let metadata = DFParquetMetadata::new(store, object)
Expand Down
8 changes: 0 additions & 8 deletions datafusion/datasource-parquet/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_plan::Accumulator;
use log::debug;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use parquet::DecodeResult;
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
Expand Down Expand Up @@ -228,13 +227,6 @@ impl<'a> DFParquetMetadata<'a> {
Ok(schema)
}

/// Return (path, schema) tuple by fetching the schema from Parquet file
pub(crate) async fn fetch_schema_with_location(&self) -> Result<(Path, Schema)> {
// Clone the object's path first so it can be returned alongside the schema.
let loc_path = self.object_meta.location.clone();
// Delegate schema retrieval to `fetch_schema`; `?` propagates any fetch error.
let schema = self.fetch_schema().await?;
Ok((loc_path, schema))
}

/// Fetch the metadata from the Parquet file via [`Self::fetch_metadata`] and convert
/// the statistics in the metadata using [`Self::statistics_from_parquet_metadata`]
pub async fn fetch_statistics(&self, table_schema: &SchemaRef) -> Result<Statistics> {
Expand Down
Loading