diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index c4faedf571f6d..c3082ac4dd28c 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -485,22 +485,42 @@ impl FileFormat for ParquetFormat { .await?; let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache(); - let metadata = DFParquetMetadata::new(store, object) - .with_metadata_size_hint(self.metadata_size_hint()) - .with_decryption_properties(file_decryption_properties) - .with_file_metadata_cache(Some(file_metadata_cache)) - .fetch_metadata() - .await?; - let statistics = DFParquetMetadata::statistics_from_parquet_metadata( - &metadata, - &table_schema, - )?; - let ordering = - crate::metadata::ordering_from_parquet_metadata(&metadata, &table_schema)?; - Ok( - datafusion_datasource::file_format::FileMeta::new(statistics) - .with_ordering(ordering), - ) + let store = Arc::clone(store); + let object = object.clone(); + let metadata_size_hint = self.metadata_size_hint(); + let fetch_and_parse = async move { + let metadata = DFParquetMetadata::new(&store, &object) + .with_metadata_size_hint(metadata_size_hint) + .with_decryption_properties(file_decryption_properties) + .with_file_metadata_cache(Some(file_metadata_cache)) + .fetch_metadata() + .await?; + let statistics = DFParquetMetadata::statistics_from_parquet_metadata( + &metadata, + &table_schema, + )?; + let ordering = crate::metadata::ordering_from_parquet_metadata( + &metadata, + &table_schema, + )?; + Ok::<_, DataFusionError>( + datafusion_datasource::file_format::FileMeta::new(statistics) + .with_ordering(ordering), + ) + }; + + // Parse metadata and extract statistics on a separate thread so that + // multiple concurrent calls (e.g. from `list_files_for_scan`) can run + // the CPU-heavy work in parallel rather than serializing on a single + // async task. + if tokio::runtime::Handle::try_current().is_ok() { + let handle = tokio::runtime::Handle::current(); + SpawnedTask::spawn_blocking(move || handle.block_on(fetch_and_parse)) + .await + .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))? + } else { + fetch_and_parse.await + } } async fn create_physical_plan(