Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 36 additions & 16 deletions datafusion/datasource-parquet/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,22 +485,42 @@ impl FileFormat for ParquetFormat {
.await?;
let file_metadata_cache =
state.runtime_env().cache_manager.get_file_metadata_cache();
let metadata = DFParquetMetadata::new(store, object)
.with_metadata_size_hint(self.metadata_size_hint())
.with_decryption_properties(file_decryption_properties)
.with_file_metadata_cache(Some(file_metadata_cache))
.fetch_metadata()
.await?;
let statistics = DFParquetMetadata::statistics_from_parquet_metadata(
&metadata,
&table_schema,
)?;
let ordering =
crate::metadata::ordering_from_parquet_metadata(&metadata, &table_schema)?;
Ok(
datafusion_datasource::file_format::FileMeta::new(statistics)
.with_ordering(ordering),
)
let store = Arc::clone(store);
let object = object.clone();
let metadata_size_hint = self.metadata_size_hint();
let fetch_and_parse = async move {
let metadata = DFParquetMetadata::new(&store, &object)
.with_metadata_size_hint(metadata_size_hint)
.with_decryption_properties(file_decryption_properties)
.with_file_metadata_cache(Some(file_metadata_cache))
.fetch_metadata()
.await?;
let statistics = DFParquetMetadata::statistics_from_parquet_metadata(
&metadata,
&table_schema,
)?;
let ordering = crate::metadata::ordering_from_parquet_metadata(
&metadata,
&table_schema,
)?;
Ok::<_, DataFusionError>(
datafusion_datasource::file_format::FileMeta::new(statistics)
.with_ordering(ordering),
)
};

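// Drive the whole `fetch_and_parse` future (metadata fetch, parsing, and
// statistics/ordering extraction) through `SpawnedTask::spawn_blocking` so
// that multiple concurrent calls (e.g. from `list_files_for_scan`) can run
// the CPU-heavy work in parallel rather than serializing on a single
// async task. If no Tokio runtime is current, the future is awaited inline.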
if tokio::runtime::Handle::try_current().is_ok() {
let handle = tokio::runtime::Handle::current();
SpawnedTask::spawn_blocking(move || handle.block_on(fetch_and_parse))
.await
.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))?
} else {
fetch_and_parse.await
}
}

async fn create_physical_plan(
Expand Down
Loading