diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index c4faedf571f6d..580146630defc 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -307,7 +307,7 @@ fn clear_metadata( #[cfg(feature = "parquet_encryption")] async fn get_file_decryption_properties( - state: &dyn Session, + runtime: &RuntimeEnv, options: &TableParquetOptions, file_path: &Path, ) -> Result>> { @@ -315,8 +315,7 @@ async fn get_file_decryption_properties( Some(cfd) => Some(Arc::new(FileDecryptionProperties::from(cfd.clone()))), None => match &options.crypto.factory_id { Some(factory_id) => { - let factory = - state.runtime_env().parquet_encryption_factory(factory_id)?; + let factory = runtime.parquet_encryption_factory(factory_id)?; factory .get_file_decryption_properties( &options.crypto.factory_options, @@ -331,7 +330,7 @@ async fn get_file_decryption_properties( #[cfg(not(feature = "parquet_encryption"))] async fn get_file_decryption_properties( - _state: &dyn Session, + _runtime: &RuntimeEnv, _options: &TableParquetOptions, _file_path: &Path, ) -> Result>> { @@ -377,26 +376,84 @@ impl FileFormat for ParquetFormat { let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache(); - let mut schemas: Vec<_> = futures::stream::iter(objects) - .map(|object| async { - let file_decryption_properties = get_file_decryption_properties( - state, - &self.options, - &object.location, - ) - .await?; - let result = DFParquetMetadata::new(store.as_ref(), object) - .with_metadata_size_hint(self.metadata_size_hint()) - .with_decryption_properties(file_decryption_properties) - .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))) - .with_coerce_int96(coerce_int96) - .fetch_schema_with_location() - .await?; - Ok::<_, DataFusionError>(result) + let runtime = Arc::clone(state.runtime_env()); + let meta_fetch_concurrency = + state.config_options().execution.meta_fetch_concurrency; + + let mut schemas: Vec<(Path, Schema)> = futures::stream::iter(objects) + .map(|object| { + let object = object.clone(); + let store = Arc::clone(store); + let runtime = Arc::clone(&runtime); + let options = self.options.clone(); + let file_metadata_cache = Arc::clone(&file_metadata_cache); + let metadata_size_hint = self.metadata_size_hint(); + async move { + let fetch_metadata = async move { + let file_decryption_properties = get_file_decryption_properties( + &runtime, + &options, + &object.location, + ) + .await?; + let metadata = DFParquetMetadata::new(store.as_ref(), &object) + .with_metadata_size_hint(metadata_size_hint) + .with_decryption_properties(file_decryption_properties) + .with_file_metadata_cache(Some(file_metadata_cache)) + .fetch_metadata() + .await?; + Ok::<_, DataFusionError>((object, metadata)) + }; + + if tokio::runtime::Handle::try_current().is_ok() { + let handle = tokio::runtime::Handle::current(); + SpawnedTask::spawn_blocking(move || { + handle.block_on(fetch_metadata) + }) + .await + .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))? + } else { + fetch_metadata.await + } + } + }) + .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 + // fetch metadata concurrently + .buffer_unordered(meta_fetch_concurrency) + .map(|result| async move { + let (object, metadata): (ObjectMeta, Arc) = result?; + + let parse_schema = move || { + let file_metadata = metadata.file_metadata(); + let schema = parquet::arrow::parquet_to_arrow_schema( + file_metadata.schema_descr(), + file_metadata.key_value_metadata(), + )?; + let schema = coerce_int96 + .as_ref() + .and_then(|time_unit| { + coerce_int96_to_resolution( + file_metadata.schema_descr(), + &schema, + time_unit, + ) + }) + .unwrap_or(schema); + Ok::<_, DataFusionError>((object.location.clone(), schema)) + }; + + if tokio::runtime::Handle::try_current().is_ok() { + SpawnedTask::spawn_blocking(parse_schema) + .await + .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))? + } else { + parse_schema() + } }) .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 // fetch schemas concurrently, if requested - .buffer_unordered(state.config_options().execution.meta_fetch_concurrency) + // order does not matter for schema inference, it is handled below + .buffer_unordered(meta_fetch_concurrency) .try_collect() .await?; @@ -439,9 +496,12 @@ impl FileFormat for ParquetFormat { table_schema: SchemaRef, object: &ObjectMeta, ) -> Result { - let file_decryption_properties = - get_file_decryption_properties(state, &self.options, &object.location) - .await?; + let file_decryption_properties = get_file_decryption_properties( + state.runtime_env(), + &self.options, + &object.location, + ) + .await?; let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache(); DFParquetMetadata::new(store, object) @@ -459,9 +519,12 @@ impl FileFormat for ParquetFormat { table_schema: SchemaRef, object: &ObjectMeta, ) -> Result> { - let file_decryption_properties = - get_file_decryption_properties(state, &self.options, &object.location) - .await?; + let file_decryption_properties = get_file_decryption_properties( + state.runtime_env(), + &self.options, + &object.location, + ) + .await?; let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache(); let metadata = DFParquetMetadata::new(store, object) @@ -480,9 +543,12 @@ impl FileFormat for ParquetFormat { table_schema: SchemaRef, object: &ObjectMeta, ) -> Result { - let file_decryption_properties = - get_file_decryption_properties(state, &self.options, &object.location) - .await?; + let file_decryption_properties = get_file_decryption_properties( + state.runtime_env(), + &self.options, + &object.location, + ) + .await?; let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache(); let metadata = DFParquetMetadata::new(store, object) diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index 3d4d051abbb26..75c501b13b370 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -37,7 +37,6 @@ use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use datafusion_physical_plan::Accumulator; use log::debug; -use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; use parquet::DecodeResult; use parquet::arrow::arrow_reader::statistics::StatisticsConverter; @@ -228,13 +227,6 @@ impl<'a> DFParquetMetadata<'a> { Ok(schema) } - /// Return (path, schema) tuple by fetching the schema from Parquet file - pub(crate) async fn fetch_schema_with_location(&self) -> Result<(Path, Schema)> { - let loc_path = self.object_meta.location.clone(); - let schema = self.fetch_schema().await?; - Ok((loc_path, schema)) - } - /// Fetch the metadata from the Parquet file via [`Self::fetch_metadata`] and convert /// the statistics in the metadata using [`Self::statistics_from_parquet_metadata`] pub async fn fetch_statistics(&self, table_schema: &SchemaRef) -> Result {