From 95a925950c22fe241aedbdd4a824339f6a266c44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 17 Apr 2026 12:09:47 +0200 Subject: [PATCH] perf: parallelize CPU-heavy parquet metadata parsing in `list_files_for_scan` Wraps the metadata fetch + statistics/ordering extraction inside `ParquetFormat::infer_stats_and_ordering` in `SpawnedTask::spawn_blocking` with `handle.block_on` so each call runs on a separate worker thread. `list_files_for_scan` already drives this via `.buffer_unordered(meta_fetch_concurrency)`, so concurrent per-file work now actually runs in parallel across threads instead of serializing the CPU-heavy parquet metadata decode/merge onto a single async task. Follows the same pattern as #19969 (parallelizing `infer_schema`). Part of #19971. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../datasource-parquet/src/file_format.rs | 52 +++++++++++++------ 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index c4faedf571f6d..c3082ac4dd28c 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -485,22 +485,42 @@ impl FileFormat for ParquetFormat { .await?; let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache(); - let metadata = DFParquetMetadata::new(store, object) - .with_metadata_size_hint(self.metadata_size_hint()) - .with_decryption_properties(file_decryption_properties) - .with_file_metadata_cache(Some(file_metadata_cache)) - .fetch_metadata() - .await?; - let statistics = DFParquetMetadata::statistics_from_parquet_metadata( - &metadata, - &table_schema, - )?; - let ordering = - crate::metadata::ordering_from_parquet_metadata(&metadata, &table_schema)?; - Ok( - datafusion_datasource::file_format::FileMeta::new(statistics) - .with_ordering(ordering), - ) + let store = Arc::clone(store); + let object = object.clone(); + let metadata_size_hint = self.metadata_size_hint(); + let fetch_and_parse = async move { + let metadata = DFParquetMetadata::new(&store, &object) + .with_metadata_size_hint(metadata_size_hint) + .with_decryption_properties(file_decryption_properties) + .with_file_metadata_cache(Some(file_metadata_cache)) + .fetch_metadata() + .await?; + let statistics = DFParquetMetadata::statistics_from_parquet_metadata( + &metadata, + &table_schema, + )?; + let ordering = crate::metadata::ordering_from_parquet_metadata( + &metadata, + &table_schema, + )?; + Ok::<_, DataFusionError>( + datafusion_datasource::file_format::FileMeta::new(statistics) + .with_ordering(ordering), + ) + }; + + // Parse metadata and extract statistics on a separate thread so that + // multiple concurrent calls (e.g. from `list_files_for_scan`) can run + // the CPU-heavy work in parallel rather than serializing on a single + // async task. + if tokio::runtime::Handle::try_current().is_ok() { + let handle = tokio::runtime::Handle::current(); + SpawnedTask::spawn_blocking(move || handle.block_on(fetch_and_parse)) + .await + .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))? + } else { + fetch_and_parse.await + } } async fn create_physical_plan(