20039: fix(core): Correctly handle nested async UDF execution #216
- Updated AsyncMapper to perform bottom-up mapping with schema extension
- Updated AsyncFuncExec to process expressions incrementally and pass intermediate results
- Updated DefaultPhysicalPlanner to use correct planning logic
- Added regression test case for nested async UDFs

- Remove unused import datafusion::prelude::*
- Remove unused import assert_batches_eq
- Use inline format variable in panic! macro
Summary of Changes
Hello @martin-augment, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request addresses a critical bug in DataFusion's handling of nested asynchronous User Defined Functions (UDFs). The changes ensure that when multiple async UDFs are composed, the inner UDF is correctly evaluated and its result is made available to the outer UDF. This is achieved through modifications in the physical planning phase and the execution of async functions, allowing for sequential evaluation and proper schema management for intermediate results.
Highlights
Walkthrough
This pull request refactors the async user-defined function (UDF) evaluation pipeline in DataFusion. Changes span the physical planner, async function executor, and related tests. The planner's
Pull Request Review: Fix Nested Async UDF Execution
Summary
This PR addresses issue apache#20039 by correctly handling nested async UDF execution. The key insight is that nested async UDFs need to be evaluated sequentially, with each level's results made available to the next level through schema extension.
Code Quality & Architecture
✅ Strengths
Potential Issues & Concerns
🔴 Critical: Memory & Performance
Issue: Reconstructing intermediate batches for each async expression could be expensive.
In async_func.rs:245-270:
    for (i, async_expr) in async_exprs_captured.iter().enumerate() {
        let current_schema_fields: Vec<_> = schema_captured
            .fields()
            .iter()
            .take(input_columns + i)
            .cloned()
            .collect();
        let current_schema = Arc::new(Schema::new(current_schema_fields));
        let mut current_columns = batch.columns().to_vec();
        current_columns.extend_from_slice(&output_arrays);
        let current_batch = RecordBatch::try_new(current_schema, current_columns)?;
Concerns:
Recommendation: Consider if there's a way to pass context differently, or at minimum, add a comment explaining this tradeoff and perhaps add metrics to track the overhead.
🟡 Medium: Projection Mapping Logic
In async_func.rs:83-100, the filtering logic for
    let mut tuples = Vec::new();
    for expr in &async_exprs {
        let mut refers_to_new_cols = false;
        expr.func.apply(&mut |e: &Arc<dyn PhysicalExpr>| {
            if let Some(col) = e.as_any().downcast_ref::<Column>() {
                refers_to_new_cols |= col.index() >= input_len;
            }
            Ok(TreeNodeRecursion::Continue)
        })?;
        if !refers_to_new_cols {
            tuples.push((Arc::clone(&expr.func), expr.name().to_string()));
        }
    }
Concerns:
Questions:
🟡 Medium: Cloning Overhead
In physical_planner.rs:2693-2699 and 2707-2709:
    let new_expr = async_map.find_and_map(expr, schema)?;
The
Recommendation: Consider checking if the expression contains any async UDFs before cloning, or document why the clone is necessary.
Testing
✅ Good Coverage
🟡 Suggestions for Additional Tests
Security & Safety
✅ No Security Concerns
Code Style & Best Practices
Minor Issues
Performance Considerations
Recommendations
Documentation
Missing
Summary & Recommendation
Overall Assessment: ✅ Approve with suggestions
This is a solid fix for a real bug. The core algorithm is correct and the approach is sound. The main concerns are:
Suggested Action Items:
Nice work on the fix! The consolidation of the two-pass approach into a single
Code Review
This pull request correctly addresses the handling of nested asynchronous UDFs by introducing a more robust planning and execution strategy. The changes in physical_planner.rs refactor the async expression planning into a single find_and_map function, which correctly uses a bottom-up traversal to handle nested calls. The AsyncFuncExec is updated to sequentially evaluate async expressions, making the output of inner functions available to outer ones. This is a crucial fix for correctness. The addition of a dedicated test case for nested async UDFs ensures this functionality is well-tested. The code is well-structured and the changes are logical and correct. I have reviewed the changes and found no issues.
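The bottom-up rewrite described in this review can be illustrated with a toy model. The sketch below is not DataFusion code: the `Expr` enum, the `AsyncMapper` struct, and the single-argument `find_and_map` method are hypothetical stand-ins that borrow the names from the PR for readability only. It assumes that each async call is replaced by a reference to a new column appended after the input columns, and that children are visited before their parents so inner calls are registered first.

```rust
// Toy model only: these types are hypothetical and are NOT DataFusion's API.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    /// Reference to a column by index in the (possibly extended) schema.
    Column(usize),
    /// Call to an async function with a name and arguments.
    AsyncCall(String, Vec<Expr>),
    /// Call to an ordinary synchronous function.
    SyncCall(String, Vec<Expr>),
}

/// Collects async calls in dependency order while rewriting the tree.
struct AsyncMapper {
    /// Number of columns in the input schema.
    input_columns: usize,
    /// Async calls found so far; entry `i` produces column `input_columns + i`.
    async_exprs: Vec<Expr>,
}

impl AsyncMapper {
    fn new(input_columns: usize) -> Self {
        Self { input_columns, async_exprs: Vec::new() }
    }

    /// Rewrites `expr` bottom-up, replacing every async call with a column
    /// reference into the extended schema.
    fn find_and_map(&mut self, expr: &Expr) -> Expr {
        match expr {
            Expr::Column(i) => Expr::Column(*i),
            Expr::SyncCall(name, args) => {
                let new_args: Vec<Expr> =
                    args.iter().map(|a| self.find_and_map(a)).collect();
                Expr::SyncCall(name.clone(), new_args)
            }
            Expr::AsyncCall(name, args) => {
                // Children first, so an inner async call is registered
                // (and gets a lower column index) before its parent.
                let new_args: Vec<Expr> =
                    args.iter().map(|a| self.find_and_map(a)).collect();
                let index = self.input_columns + self.async_exprs.len();
                self.async_exprs.push(Expr::AsyncCall(name.clone(), new_args));
                Expr::Column(index)
            }
        }
    }
}

fn main() {
    // concat(outer(inner(col0)), col0) over a one-column input.
    let inner = Expr::AsyncCall("inner".to_string(), vec![Expr::Column(0)]);
    let outer = Expr::AsyncCall("outer".to_string(), vec![inner]);
    let expr = Expr::SyncCall("concat".to_string(), vec![outer, Expr::Column(0)]);

    let mut mapper = AsyncMapper::new(1);
    let rewritten = mapper.find_and_map(&expr);

    println!("rewritten: {:?}", rewritten);
    for (i, e) in mapper.async_exprs.iter().enumerate() {
        println!("column {} <- {:?}", mapper.input_columns + i, e);
    }
}
```

In this model the outer call ends up reading the column produced by the inner call, which is the dependency that a single top-down pass would miss.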
🤖 Augment PR Summary
Summary: Fixes planning and execution of nested async scalar UDFs so outer async calls can correctly consume results produced by inner async calls.
Changes:
Technical Notes: The key behavior change is that async expressions are now rewritten and executed sequentially, enabling dependency chains between async UDF evaluations within a single projected expression.
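The sequential execution mentioned in these notes can be sketched with plain vectors standing in for Arrow arrays. Everything here is an assumption made for illustration: `Column`, `AsyncExpr`, and `evaluate_sequentially` are hypothetical names, the functions are synchronous to keep the example free of dependencies, and each expression reads exactly one column. The point is only the ordering: every expression sees the input columns plus the outputs of the expressions evaluated before it.

```rust
/// A column of values; a simplified stand-in for an Arrow array.
type Column = Vec<i64>;

/// Hypothetical stand-in for a planned async expression.
struct AsyncExpr {
    name: &'static str,
    /// Index of the column this expression reads (input or earlier output).
    input: usize,
    /// The function applied to each value (synchronous in this sketch).
    func: fn(i64) -> i64,
}

fn add_one(v: i64) -> i64 {
    v + 1
}

fn times_ten(v: i64) -> i64 {
    v * 10
}

/// Evaluates `exprs` in order. Each output is appended to the working set of
/// columns so that later expressions can read it.
fn evaluate_sequentially(
    input: &[Column],
    exprs: &[AsyncExpr],
) -> Result<Vec<Column>, String> {
    let mut columns: Vec<Column> = input.to_vec();
    for expr in exprs {
        let source = columns.get(expr.input).ok_or_else(|| {
            format!(
                "{} reads column {} but only {} columns are available",
                expr.name,
                expr.input,
                columns.len()
            )
        })?;
        let output: Column = source.iter().map(|v| (expr.func)(*v)).collect();
        columns.push(output);
    }
    Ok(columns)
}

fn main() {
    let input = vec![vec![1, 2, 3]];
    // inner reads input column 0; outer reads inner's output at column 1.
    let exprs = [
        AsyncExpr { name: "inner", input: 0, func: add_one },
        AsyncExpr { name: "outer", input: 1, func: times_ten },
    ];
    match evaluate_sequentially(&input, &exprs) {
        Ok(columns) => println!("{:?}", columns),
        Err(e) => println!("error: {e}"),
    }
}
```

If the two expressions were listed in the opposite order, the outer one would ask for a column that does not exist yet and the sketch would return an error, which is why the dependency order produced at planning time matters.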
Review completed. No suggestions at this time.
20039: To review by AI