From 4773b436f78636cb4a8c7ce266e1e83dc2f48675 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 24 Jan 2026 13:41:05 +0100 Subject: [PATCH 1/4] Parallelize infer_schema --- .../datasource-parquet/src/file_format.rs | 81 ++++++++++++------- 1 file changed, 53 insertions(+), 28 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 6635c9072dd97..89a42134f9188 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -302,7 +302,7 @@ fn clear_metadata( #[cfg(feature = "parquet_encryption")] async fn get_file_decryption_properties( - state: &dyn Session, + runtime: &RuntimeEnv, options: &TableParquetOptions, file_path: &Path, ) -> Result>> { @@ -310,8 +310,7 @@ async fn get_file_decryption_properties( Some(cfd) => Some(Arc::new(FileDecryptionProperties::from(cfd.clone()))), None => match &options.crypto.factory_id { Some(factory_id) => { - let factory = - state.runtime_env().parquet_encryption_factory(factory_id)?; + let factory = runtime.parquet_encryption_factory(factory_id)?; factory .get_file_decryption_properties( &options.crypto.factory_options, @@ -326,7 +325,7 @@ async fn get_file_decryption_properties( #[cfg(not(feature = "parquet_encryption"))] async fn get_file_decryption_properties( - _state: &dyn Session, + _runtime: &RuntimeEnv, _options: &TableParquetOptions, _file_path: &Path, ) -> Result>> { @@ -372,26 +371,43 @@ impl FileFormat for ParquetFormat { let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache(); + let runtime = Arc::clone(state.runtime_env()); + let meta_fetch_concurrency = + state.config_options().execution.meta_fetch_concurrency; + let mut schemas: Vec<_> = futures::stream::iter(objects) - .map(|object| async { - let file_decryption_properties = get_file_decryption_properties( - state, - &self.options, - &object.location, - ) - .await?; - let result = DFParquetMetadata::new(store.as_ref(), object) - .with_metadata_size_hint(self.metadata_size_hint()) - .with_decryption_properties(file_decryption_properties) - .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))) - .with_coerce_int96(coerce_int96) - .fetch_schema_with_location() + .map(|object| { + let object = object.clone(); + let store = Arc::clone(store); + let runtime = Arc::clone(&runtime); + let options = self.options.clone(); + let file_metadata_cache = Arc::clone(&file_metadata_cache); + let metadata_size_hint = self.metadata_size_hint(); + SpawnedTask::spawn(async move { + let file_decryption_properties = get_file_decryption_properties( + &runtime, + &options, + &object.location, + ) .await?; - Ok::<_, DataFusionError>(result) + let result = DFParquetMetadata::new(store.as_ref(), &object) + .with_metadata_size_hint(metadata_size_hint) + .with_decryption_properties(file_decryption_properties) + .with_file_metadata_cache(Some(file_metadata_cache)) + .with_coerce_int96(coerce_int96) + .fetch_schema_with_location() + .await?; + Ok::<_, DataFusionError>(result) + }) }) .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 // fetch schemas concurrently, if requested - .buffered(state.config_options().execution.meta_fetch_concurrency) + // order does not matter for schema inference, it is handled below + .buffer_unordered(meta_fetch_concurrency) + .map(|result| match result { + Ok(res) => res, + Err(e) => Err(DataFusionError::External(Box::new(e))), + }) .try_collect() .await?; @@ -436,9 +452,12 @@ impl FileFormat for ParquetFormat { table_schema: SchemaRef, object: &ObjectMeta, ) -> Result { - let file_decryption_properties = - get_file_decryption_properties(state, &self.options, &object.location) - .await?; + let file_decryption_properties = get_file_decryption_properties( + state.runtime_env(), + &self.options, + &object.location, + ) + .await?; let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache(); DFParquetMetadata::new(store, object) @@ -456,9 +475,12 @@ impl FileFormat for ParquetFormat { table_schema: SchemaRef, object: &ObjectMeta, ) -> Result> { - let file_decryption_properties = - get_file_decryption_properties(state, &self.options, &object.location) - .await?; + let file_decryption_properties = get_file_decryption_properties( + state.runtime_env(), + &self.options, + &object.location, + ) + .await?; let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache(); let metadata = DFParquetMetadata::new(store, object) @@ -477,9 +499,12 @@ impl FileFormat for ParquetFormat { table_schema: SchemaRef, object: &ObjectMeta, ) -> Result { - let file_decryption_properties = - get_file_decryption_properties(state, &self.options, &object.location) - .await?; + let file_decryption_properties = get_file_decryption_properties( + state.runtime_env(), + &self.options, + &object.location, + ) + .await?; let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache(); let metadata = DFParquetMetadata::new(store, object) From 7596392ea0fc01b10767bc34c6a06e6965329f43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 26 Jan 2026 09:30:02 +0100 Subject: [PATCH 2/4] Improve parallelism --- .../datasource-parquet/src/file_format.rs | 52 +++++++++++++++---- 1 file changed, 43 insertions(+), 9 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 89a42134f9188..06c6c3e173641 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -375,7 +375,7 @@ impl FileFormat for ParquetFormat { let meta_fetch_concurrency = state.config_options().execution.meta_fetch_concurrency; - let mut schemas: Vec<_> = futures::stream::iter(objects) + let mut schemas: Vec<(Path, Schema)> = futures::stream::iter(objects) .map(|object| { let object = object.clone(); let store = Arc::clone(store); @@ -390,24 +390,58 @@ impl FileFormat for ParquetFormat { &object.location, ) .await?; - let result = DFParquetMetadata::new(store.as_ref(), &object) + let metadata = DFParquetMetadata::new(store.as_ref(), &object) .with_metadata_size_hint(metadata_size_hint) .with_decryption_properties(file_decryption_properties) .with_file_metadata_cache(Some(file_metadata_cache)) - .with_coerce_int96(coerce_int96) - .fetch_schema_with_location() + .fetch_metadata() .await?; - Ok::<_, DataFusionError>(result) + Ok::<_, DataFusionError>((object, metadata)) }) }) .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 + // fetch metadata concurrently + .buffer_unordered(meta_fetch_concurrency) + .map(|result| { + let coerce_int96 = coerce_int96.clone(); + async move { + let (object, metadata) = match result { + Ok(res) => res?, + Err(e) => { + return Err(DataFusionError::ExecutionJoin(Box::new(e))); + } + }; + + let join_res = SpawnedTask::spawn_blocking(move || { + let file_metadata = metadata.file_metadata(); + let schema = parquet::arrow::parquet_to_arrow_schema( + file_metadata.schema_descr(), + file_metadata.key_value_metadata(), + )?; + let schema = coerce_int96 + .as_ref() + .and_then(|time_unit| { + coerce_int96_to_resolution( + file_metadata.schema_descr(), + &schema, + time_unit, + ) + }) + .unwrap_or(schema); + Ok::<_, DataFusionError>((object.location.clone(), schema)) + }) + .await; + + match join_res { + Ok(res) => res, + Err(e) => Err(DataFusionError::ExecutionJoin(Box::new(e))), + } + } + }) + .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 // fetch schemas concurrently, if requested // order does not matter for schema inference, it is handled below .buffer_unordered(meta_fetch_concurrency) - .map(|result| match result { - Ok(res) => res, - Err(e) => Err(DataFusionError::External(Box::new(e))), - }) .try_collect() .await?; From 2888fcf9eb116865bc85acbed2f0015712bdb998 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 26 Jan 2026 09:57:10 +0100 Subject: [PATCH 3/4] WIP --- .../datasource-parquet/src/file_format.rs | 85 ++++++++++++------- 1 file changed, 56 insertions(+), 29 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 06c6c3e173641..aa46aa497d693 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -383,21 +383,34 @@ impl FileFormat for ParquetFormat { let options = self.options.clone(); let file_metadata_cache = Arc::clone(&file_metadata_cache); let metadata_size_hint = self.metadata_size_hint(); - SpawnedTask::spawn(async move { - let file_decryption_properties = get_file_decryption_properties( - &runtime, - &options, - &object.location, - ) - .await?; - let metadata = DFParquetMetadata::new(store.as_ref(), &object) - .with_metadata_size_hint(metadata_size_hint) - .with_decryption_properties(file_decryption_properties) - .with_file_metadata_cache(Some(file_metadata_cache)) - .fetch_metadata() + async move { + let fetch_metadata = async move { + let file_decryption_properties = get_file_decryption_properties( + &runtime, + &options, + &object.location, + ) .await?; - Ok::<_, DataFusionError>((object, metadata)) - }) + let metadata = DFParquetMetadata::new(store.as_ref(), &object) + .with_metadata_size_hint(metadata_size_hint) + .with_decryption_properties(file_decryption_properties) + .with_file_metadata_cache(Some(file_metadata_cache)) + .fetch_metadata() + .await?; + Ok::<_, DataFusionError>((object, metadata)) + }; + + if tokio::runtime::Handle::try_current().is_ok() { + let handle = tokio::runtime::Handle::current(); + SpawnedTask::spawn_blocking(move || { + handle.block_on(fetch_metadata) + }) + .await + .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))? + } else { + fetch_metadata.await + } + } }) .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 // fetch metadata concurrently @@ -405,14 +418,32 @@ impl FileFormat for ParquetFormat { .map(|result| { let coerce_int96 = coerce_int96.clone(); async move { - let (object, metadata) = match result { - Ok(res) => res?, - Err(e) => { - return Err(DataFusionError::ExecutionJoin(Box::new(e))); - } - }; - - let join_res = SpawnedTask::spawn_blocking(move || { + let (object, metadata) = result?; + + let (location, schema) = if tokio::runtime::Handle::try_current() + .is_ok() + { + SpawnedTask::spawn_blocking(move || { + let file_metadata = metadata.file_metadata(); + let schema = parquet::arrow::parquet_to_arrow_schema( + file_metadata.schema_descr(), + file_metadata.key_value_metadata(), + )?; + let schema = coerce_int96 + .as_ref() + .and_then(|time_unit| { + coerce_int96_to_resolution( + file_metadata.schema_descr(), + &schema, + time_unit, + ) + }) + .unwrap_or(schema); + Ok::<_, DataFusionError>((object.location.clone(), schema)) + }) + .await + .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))?? + } else { let file_metadata = metadata.file_metadata(); let schema = parquet::arrow::parquet_to_arrow_schema( file_metadata.schema_descr(), @@ -428,14 +459,10 @@ impl FileFormat for ParquetFormat { ) }) .unwrap_or(schema); - Ok::<_, DataFusionError>((object.location.clone(), schema)) - }) - .await; + (object.location.clone(), schema) + }; - match join_res { - Ok(res) => res, - Err(e) => Err(DataFusionError::ExecutionJoin(Box::new(e))), - } + Ok::<_, DataFusionError>((location, schema)) } }) .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 From 536cd9971e2b26bab708c903d39c03c0901b0826 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 26 Jan 2026 09:58:37 +0100 Subject: [PATCH 4/4] WIP --- .../datasource-parquet/src/file_format.rs | 37 +++++-------------- 1 file changed, 10 insertions(+), 27 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index aa46aa497d693..6e822a787a2d6 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -418,32 +418,9 @@ impl FileFormat for ParquetFormat { .map(|result| { let coerce_int96 = coerce_int96.clone(); async move { - let (object, metadata) = result?; + let (object, metadata): (ObjectMeta, Arc) = result?; - let (location, schema) = if tokio::runtime::Handle::try_current() - .is_ok() - { - SpawnedTask::spawn_blocking(move || { - let file_metadata = metadata.file_metadata(); - let schema = parquet::arrow::parquet_to_arrow_schema( - file_metadata.schema_descr(), - file_metadata.key_value_metadata(), - )?; - let schema = coerce_int96 - .as_ref() - .and_then(|time_unit| { - coerce_int96_to_resolution( - file_metadata.schema_descr(), - &schema, - time_unit, - ) - }) - .unwrap_or(schema); - Ok::<_, DataFusionError>((object.location.clone(), schema)) - }) - .await - .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))?? - } else { + let parse_schema = move || { let file_metadata = metadata.file_metadata(); let schema = parquet::arrow::parquet_to_arrow_schema( file_metadata.schema_descr(), @@ -459,10 +436,16 @@ impl FileFormat for ParquetFormat { ) }) .unwrap_or(schema); - (object.location.clone(), schema) + Ok::<_, DataFusionError>((object.location.clone(), schema)) }; - Ok::<_, DataFusionError>((location, schema)) + if tokio::runtime::Handle::try_current().is_ok() { + SpawnedTask::spawn_blocking(parse_schema) + .await + .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))? + } else { + parse_schema() + } } }) .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552