20039: fix(core): Correctly handle nested async UDF execution #215
Conversation
Use conditional compilation to provide parallel execution with JoinSet for native targets and sequential execution with try_join_all for WASM targets, since WASM has limited multi-threading support.
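A minimal sketch of that cfg-gated split, using invented names (process_one, process_all) in place of the real listing work; this illustrates the pattern, not the PR's actual code:

// Shared async work item; the error type is illustrative.
async fn process_one(i: usize) -> Result<usize, String> {
    Ok(i * 2)
}

#[cfg(not(target_arch = "wasm32"))]
async fn process_all(n: usize) -> Result<Vec<usize>, String> {
    use tokio::task::JoinSet;
    // Native targets: spawn one task per work item so they run concurrently.
    let mut join_set = JoinSet::new();
    for i in 0..n {
        join_set.spawn(process_one(i));
    }
    let mut results = Vec::with_capacity(n);
    while let Some(joined) = join_set.join_next().await {
        // Outer error is a JoinError (panic/cancellation), inner is the task's own error.
        results.push(joined.map_err(|e| e.to_string())??);
    }
    // Note: JoinSet yields results in completion order, not submission order.
    Ok(results)
}

#[cfg(target_arch = "wasm32")]
async fn process_all(n: usize) -> Result<Vec<usize>, String> {
    use futures::future::try_join_all;
    // WASM targets: drive all futures on the current task, no extra threads or tasks.
    try_join_all((0..n).map(process_one)).await
}

#[tokio::main]
async fn main() -> Result<(), String> {
    println!("{:?}", process_all(4).await?);
    Ok(())
}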
- Updated AsyncMapper to perform bottom-up mapping with schema extension
- Updated AsyncFuncExec to process expressions incrementally and pass intermediate results
- Updated DefaultPhysicalPlanner to use the correct planning logic
- Added a regression test case for nested async UDFs
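To illustrate why the bottom-up order matters for nesting, here is a toy model with invented types (Expr, rewrite) rather than DataFusion's actual AsyncMapper or expression tree; the generated __async_fn_N column names are likewise illustrative. Each async call is planned only after its children, so by the time the outer call is reached, its argument already refers to the inner call's intermediate column:

#[derive(Debug)]
enum Expr {
    Column(String),
    AsyncCall { name: String, arg: Box<Expr> },
}

// Rewrites children before the node itself (bottom-up). Each async call is
// appended to `planned` and replaced by a reference to its output column.
fn rewrite(expr: Expr, planned: &mut Vec<Expr>) -> Expr {
    match expr {
        Expr::Column(c) => Expr::Column(c),
        Expr::AsyncCall { name, arg } => {
            let new_arg = rewrite(*arg, planned); // handle the inner call first
            let out_col = format!("__async_fn_{}", planned.len());
            planned.push(Expr::AsyncCall { name, arg: Box::new(new_arg) });
            Expr::Column(out_col)
        }
    }
}

fn main() {
    // outer_async(inner_async(a))
    let nested = Expr::AsyncCall {
        name: "outer_async".into(),
        arg: Box::new(Expr::AsyncCall {
            name: "inner_async".into(),
            arg: Box::new(Expr::Column("a".into())),
        }),
    };
    let mut planned = Vec::new();
    let root = rewrite(nested, &mut planned);
    // planned[0] = inner_async(a), planned[1] = outer_async(__async_fn_0),
    // root = Column("__async_fn_1"): the outer call consumes the inner call's
    // intermediate column instead of seeing an unresolved nested expression.
    println!("planned: {planned:?}\nroot: {root:?}");
}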
Walkthrough

This pull request refactors the DataFusion codebase to decouple the Session dependency from file listing and async expression evaluation. Changes replace Session-based context parameters with explicit ConfigOptions and RuntimeEnv arguments across the datasource listing layer. The physical planner's async expression handling is reworked to use bottom-up rewriting with incremental schema building. Non-WASM file listing now uses a Tokio JoinSet for concurrent partition collection. Test coverage for nested async UDFs is expanded. Tokio is added as a workspace dependency to support concurrent task orchestration.
Summary of Changes

Hello @martin-augment, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request addresses a critical bug in DataFusion's core engine that prevented the proper execution of nested asynchronous User Defined Functions (UDFs). It refactors the expression planning and execution logic to correctly handle the dependencies between nested async operations. Additionally, it introduces performance improvements for file listing by enabling parallel execution and refactors the context passed to file listing utilities for better modularity.

Highlights
PR Review: Fix Nested Async UDF Execution

Summary

This PR fixes a critical bug in handling nested async UDFs (e.g., …).

Code Quality & Best Practices

Strengths ✅
Areas for Improvement 🔧
Potential Bugs & Issues

Critical 🚨

None identified - the core logic appears sound.

Medium Priority
Code Review
This pull request correctly handles nested asynchronous UDF execution by refactoring the physical planning of async expressions. It also includes a beneficial refactoring to decouple file listing logic from the Session trait, introducing parallel file listing using tokio::JoinSet for performance. However, a critical regression was identified in the ListingTable's file listing logic: the parallel execution now eagerly collects file metadata into memory, replacing a lazy streaming implementation. This creates a potential Denial of Service (DoS) vector via Out-Of-Memory (OOM) when querying directories with a very large number of files. There is also a suggestion to optimize the parallel file listing by reducing cloning within the loop.
&partition_cols,
)
.await?;
stream.try_collect::<Vec<_>>().await
The implementation of parallel file listing for non-WASM targets introduces eager collection of all files into memory for each table path. By calling stream.try_collect::<Vec<_>>().await inside each spawned task, the engine will attempt to load the metadata for all files in the directory into memory before processing. This is a regression from the previous lazy streaming behavior and can be exploited to cause an Out-Of-Memory (OOM) condition by pointing a table to a directory containing a very large number of files.
To remediate this, consider using a streaming approach that preserves laziness. For example, you could use a channel to send PartitionedFile objects from the spawned tasks back to the main task, which would satisfy the 'static lifetime requirement of JoinSet without requiring eager collection of the entire result set into memory.
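A minimal, self-contained sketch of that channel-based alternative, with invented names and plain Strings standing in for PartitionedFile; it shows the shape of the idea rather than DataFusion's actual listing code. Each spawned task forwards entries through a bounded mpsc channel as it produces them, so the consumer can start draining immediately and memory is bounded by the channel capacity rather than the directory size:

use tokio::sync::mpsc;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let table_paths = vec!["p0".to_string(), "p1".to_string(), "p2".to_string()];
    // Bounded channel: senders wait when the consumer lags, preserving the
    // back-pressure that eager Vec collection loses.
    let (tx, mut rx) = mpsc::channel::<String>(16);
    let mut join_set = JoinSet::new();
    for path in table_paths {
        let tx = tx.clone();
        join_set.spawn(async move {
            // In the real listing code this loop would walk an object-store
            // listing stream and send each file entry as it is produced.
            for i in 0..3 {
                if tx.send(format!("{path}/file_{i}")).await.is_err() {
                    break; // receiver dropped, stop producing
                }
            }
        });
    }
    drop(tx); // close the channel once the task-held senders finish
    // Drive the spawned tasks in the background; real code would also
    // propagate their errors (e.g. through the channel's item type).
    tokio::spawn(async move { while join_set.join_next().await.is_some() {} });
    // The consumer processes entries one at a time without ever holding the
    // full listing in memory.
    while let Some(file) = rx.recv().await {
        println!("{file}");
    }
}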
let file_extension = file_extension.clone();
let partition_cols = partition_cols.clone();
let filters = filters.clone();
For performance, consider wrapping file_extension, partition_cols, and filters in Arc before the loop. Cloning String and Vec on each iteration can be inefficient if there are many table_paths. Using Arc would make these cheap reference-counted clones.
For example:
let file_extension = Arc::new(self.options.file_extension.clone());
let partition_cols = Arc::new(self.options.table_partition_cols.clone());
let filters = Arc::new(filters.to_vec());
for table_path in &self.table_paths {
// ...
let file_extension = Arc::clone(&file_extension);
let partition_cols = Arc::clone(&partition_cols);
let filters = Arc::clone(&filters);
// ...
}
🤖 Augment PR Summary

Summary: Fixes planning/execution of nested async scalar UDFs so inner async calls are evaluated and referenced correctly.

Changes:

Technical Notes: The nested-async fix relies on deterministic …
itertools = { workspace = true }
log = { workspace = true }
object_store = { workspace = true }
tokio = { workspace = true }
use arrow::array::{Int32Array, RecordBatch, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use async_trait::async_trait;
use datafusion::dataframe::DataFrame;
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
datafusion/datasource/src/url.rs (1)
382-399: Doc comment still mentions `ctx`. The argument list should reflect the new `runtime_env` parameter name.

📝 Suggested doc fix

-/// * `ctx` - The session context
+/// * `runtime_env` - The runtime environment (cache access)
20039: To review by AI