Conversation

@martin-augment
Owner

20039: To review by AI

- Updated AsyncMapper to perform bottom-up mapping with schema extension
- Updated AsyncFuncExec to process expressions incrementally and pass intermediate results
- Updated DefaultPhysicalPlanner to use correct planning logic
- Added regression test case for nested async UDFs
- Remove unused import datafusion::prelude::*
- Remove unused import assert_batches_eq
- Use inline format variable in panic! macro

@gemini-code-assist

Summary of Changes

Hello @martin-augment, I'm Gemini Code Assist [1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical bug in DataFusion's handling of nested asynchronous User Defined Functions (UDFs). The changes ensure that when multiple async UDFs are composed, the inner UDF is correctly evaluated and its result is made available to the outer UDF. This is achieved through modifications in the physical planning phase and the execution of async functions, allowing for sequential evaluation and proper schema management for intermediate results.

Highlights

  • Nested Async UDF Support: This pull request introduces the capability to correctly execute nested asynchronous User Defined Functions (UDFs), resolving an issue where such compositions were not handled properly.
  • Sequential Evaluation of Async Expressions: The AsyncFuncExec now evaluates async expressions in sequence, ensuring that the output of an inner async UDF is available as input for subsequent outer async UDFs within the same expression.
  • Refactored Expression Rewriting: The AsyncMapper in the physical planner has been refactored to perform a combined 'find and map' operation, rewriting inner async UDFs to column references immediately upon discovery, which is crucial for correct nesting.
  • Enhanced Test Coverage: New test cases have been added, including a dedicated test_nested_async_udf and an update to the SQL logic tests, to thoroughly validate the correct functioning of nested async UDFs.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

| Feature | Command | Description |
| --- | --- | --- |
| Code Review | /gemini review | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary | Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in pull request comments and review comments. |
| Help | /gemini help | Displays a list of available commands. |

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@coderabbitai

coderabbitai bot commented Jan 30, 2026

Walkthrough

This pull request refactors the async user-defined function (UDF) evaluation pipeline in DataFusion. Changes span the physical planner, async function executor, and related tests. The planner's try_plan_async_exprs method now propagates rewritten expressions through a new mapping path. The AsyncMapper replaces its find_references method with find_and_map, enabling expression tree transformation with column reference substitution. The executor switches from precomputing async fields to incremental schema evolution during sequential async expression evaluation. Tests are expanded to include nested async UDF scenarios and refactored to use explicit imports and batch formatting for assertions.


@claude

claude bot commented Jan 30, 2026

Pull Request Review: Fix Nested Async UDF Execution

Summary

This PR addresses issue apache#20039 by correctly handling nested async UDF execution. The key insight is that nested async UDFs need to be evaluated sequentially, with each level's results made available to the next level through schema extension.


Code Quality & Architecture

✅ Strengths

  1. Well-structured refactoring: The consolidation of find_references and map_expr into a single find_and_map method (async_func.rs:367-410) is a significant improvement. This eliminates the two-pass approach and handles discovery and rewriting in a single bottom-up traversal.

  2. Correct schema evolution: The progressive schema building in both:

    • AsyncFuncExec::new (async_func.rs:64-90) - builds schema incrementally for each async expression
    • find_and_map (async_func.rs:379-386) - extends schema with output fields from previously processed async expressions

    This is the correct approach for nested evaluation.

  3. Sequential evaluation: The execute logic (async_func.rs:245-270) correctly evaluates async expressions in sequence, reconstructing the batch with accumulated results. This is essential for nested UDFs.
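
The single bottom-up pass described above can be sketched with a toy expression tree. This is a minimal illustration, not DataFusion's code: `Expr`, `AsyncMapper`, and this `find_and_map` are hypothetical stand-ins that share only the names used in the review, and schema handling is reduced to a column count.

```rust
// Toy model only: these types are hypothetical stand-ins, not DataFusion's.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    /// Reference to a column by index in the (possibly extended) schema.
    Column(usize),
    /// Ordinary synchronous function call.
    Call(String, Vec<Expr>),
    /// Async function call that a dedicated operator must evaluate.
    AsyncCall(String, Vec<Expr>),
}

struct AsyncMapper {
    /// Number of columns in the input schema.
    input_columns: usize,
    /// Async calls discovered so far, innermost first.
    async_exprs: Vec<Expr>,
}

impl AsyncMapper {
    fn new(input_columns: usize) -> Self {
        Self { input_columns, async_exprs: Vec::new() }
    }

    /// Rewrites `expr` bottom-up: children are rewritten before their parent,
    /// so an outer async call already sees its inner async call as a column.
    fn find_and_map(&mut self, expr: &Expr) -> Expr {
        match expr {
            Expr::Column(i) => Expr::Column(*i),
            Expr::Call(name, args) => {
                let args = args.iter().map(|a| self.find_and_map(a)).collect();
                Expr::Call(name.clone(), args)
            }
            Expr::AsyncCall(name, args) => {
                let args: Vec<Expr> =
                    args.iter().map(|a| self.find_and_map(a)).collect();
                // The output of this call lands after the input columns and
                // after the outputs of all previously discovered async calls.
                let index = self.input_columns + self.async_exprs.len();
                self.async_exprs.push(Expr::AsyncCall(name.clone(), args));
                Expr::Column(index)
            }
        }
    }
}
```

With one input column, rewriting `async_abs(async_abs(Column(0)))` in this model records the inner call first (reading column 0, writing column 1), then the outer call (reading column 1, writing column 2), and returns `Column(2)` as the rewritten top-level expression.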


Potential Issues & Concerns

🔴 Critical: Memory & Performance

Issue: Reconstructing intermediate batches for each async expression could be expensive.

In async_func.rs:245-270:

for (i, async_expr) in async_exprs_captured.iter().enumerate() {
    let current_schema_fields: Vec<_> = schema_captured
        .fields()
        .iter()
        .take(input_columns + i)
        .cloned()
        .collect();
    let current_schema = Arc::new(Schema::new(current_schema_fields));

    let mut current_columns = batch.columns().to_vec();
    current_columns.extend_from_slice(&output_arrays);

    let current_batch = RecordBatch::try_new(current_schema, current_columns)?;
    // ... (rest of the loop body is not shown in this excerpt)
}

Concerns:

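  • For N async expressions, the loop as shown constructs an intermediate RecordBatch on every iteration, i.e. N batches
  • Each iteration clones the column vector (current_columns); the elements are Arc-backed array handles, so this copies pointers rather than array data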
  • Schema reconstruction on every iteration
  • For large batches or many nested async UDFs, this could be memory-intensive

Recommendation: Consider if there's a way to pass context differently, or at minimum, add a comment explaining this tradeoff and perhaps add metrics to track the overhead.
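
To make the cost discussion concrete, here is a small synchronous sketch of the sequential evaluation pattern. It assumes columns are shared through `Arc`, as Arrow arrays are; `ColumnRef`, `AsyncExpr`, and `evaluate_sequentially` are hypothetical names for illustration, and the real operator is async and works on RecordBatch values rather than plain vectors.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for an Arrow array: an immutable, shared column.
type ColumnRef = Arc<Vec<i64>>;

/// Hypothetical async expression, modeled as "apply `func` to column `input`".
struct AsyncExpr {
    input: usize,
    func: fn(i64) -> i64,
}

/// Evaluates `exprs` in order. Expression `i` sees the input columns plus the
/// outputs of expressions `0..i`, mirroring the intermediate-batch idea.
fn evaluate_sequentially(input: &[ColumnRef], exprs: &[AsyncExpr]) -> Vec<ColumnRef> {
    let mut outputs: Vec<ColumnRef> = Vec::with_capacity(exprs.len());
    for expr in exprs {
        // Rebuilding the column list clones Arc handles only; the underlying
        // data vectors are shared, not copied.
        let mut current: Vec<ColumnRef> = input.to_vec();
        current.extend_from_slice(&outputs);

        let result: Vec<i64> = current[expr.input]
            .iter()
            .map(|v| (expr.func)(*v))
            .collect();
        outputs.push(Arc::new(result));
    }
    outputs
}
```

In this model the per-iteration overhead is one small vector of handles plus reference-count updates, so the dominant cost stays in the function evaluation itself. Whether the same holds for the real operator would still need the profiling suggested above.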

🟡 Medium: Projection Mapping Logic

In async_func.rs:83-100, the filtering logic for ProjectionMapping:

let mut tuples = Vec::new();
for expr in &async_exprs {
    let mut refers_to_new_cols = false;
    expr.func.apply(&mut |e: &Arc<dyn PhysicalExpr>| {
        if let Some(col) = e.as_any().downcast_ref::<Column>() {
            refers_to_new_cols |= col.index() >= input_len;
        }
        Ok(TreeNodeRecursion::Continue)
    })?;

    if !refers_to_new_cols {
        tuples.push((Arc::clone(&expr.func), expr.name().to_string()));
    }
}

Concerns:

  • The comment says "Expressions referencing newly created async columns cannot be verified against input schema"
  • This filtering might hide expressions from the projection mapping that should be tracked
  • Could this affect query optimization or equivalence checking?

Questions:

  • What happens to expressions that reference new columns in terms of optimization?
  • Should there be a test case that verifies this doesn't break optimization passes?
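
The effect of the filter can be illustrated on a toy expression type. This is only a sketch of the idea in the quoted snippet: `Expr`, `refers_to_new_cols` as a free function, and `mappable` are hypothetical, and the real code walks `Arc<dyn PhysicalExpr>` trees instead.

```rust
// Toy model only: hypothetical types, not DataFusion's PhysicalExpr.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(usize),
    Call(String, Vec<Expr>),
}

/// Returns true if any column index in `expr` is at or beyond `input_len`,
/// i.e. the expression reads a column produced by an earlier async call.
fn refers_to_new_cols(expr: &Expr, input_len: usize) -> bool {
    match expr {
        Expr::Column(index) => *index >= input_len,
        Expr::Call(_, args) => args.iter().any(|a| refers_to_new_cols(a, input_len)),
    }
}

/// Names of the expressions that only read input columns and could therefore
/// be described against the input schema.
fn mappable<'a>(exprs: &'a [(String, Expr)], input_len: usize) -> Vec<&'a str> {
    exprs
        .iter()
        .filter(|(_, e)| !refers_to_new_cols(e, input_len))
        .map(|(name, _)| name.as_str())
        .collect()
}
```

For a nested call over a single input column, only the inner expression (reading column 0) passes the filter; the outer one (reading column 1) is left out of the mapping, which is the behavior the questions above ask to be verified.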

🟡 Medium: Cloning Overhead

In physical_planner.rs:2693-2699 and 2707-2709:

let new_expr = async_map.find_and_map(expr, schema)?;

The transform_up call in find_and_map creates clones of the expression tree. For non-async expressions, this is wasteful.

Recommendation: Consider checking if the expression contains any async UDFs before cloning, or document why the clone is necessary.
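
One way to read this recommendation is a pre-check that returns the original shared node untouched when nothing needs rewriting. The sketch below uses a hypothetical toy tree (`Expr`, `contains_async`, `rewrite`), not DataFusion's TreeNode API, and it does not claim anything about what `transform_up` actually allocates.

```rust
use std::sync::Arc;

// Toy model only: hypothetical types for illustration.
#[derive(Debug)]
enum Expr {
    Column(usize),
    Call(Vec<Arc<Expr>>),
    AsyncCall(Vec<Arc<Expr>>),
}

/// Returns true if `expr` or any descendant is an async call.
fn contains_async(expr: &Expr) -> bool {
    match expr {
        Expr::Column(_) => false,
        Expr::AsyncCall(_) => true,
        Expr::Call(args) => args.iter().any(|a| contains_async(a)),
    }
}

/// Rewrites async calls to column references, reusing untouched subtrees.
/// Note: re-running the check at every level is quadratic in the worst case;
/// a production version would want to combine the check with the traversal.
fn rewrite(
    expr: &Arc<Expr>,
    input_columns: usize,
    found: &mut Vec<Arc<Expr>>,
) -> Arc<Expr> {
    if !contains_async(expr) {
        // Nothing to rewrite: share the existing node instead of rebuilding.
        return Arc::clone(expr);
    }
    match expr.as_ref() {
        Expr::Column(_) => Arc::clone(expr),
        Expr::Call(args) => {
            let args = args.iter().map(|a| rewrite(a, input_columns, found)).collect();
            Arc::new(Expr::Call(args))
        }
        Expr::AsyncCall(args) => {
            let args = args.iter().map(|a| rewrite(a, input_columns, found)).collect();
            found.push(Arc::new(Expr::AsyncCall(args)));
            Arc::new(Expr::Column(input_columns + found.len() - 1))
        }
    }
}
```

In this model a non-async expression comes back as the same allocation (checkable with `Arc::ptr_eq`), while a nested async expression is still rewritten bottom-up.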


Testing

✅ Good Coverage

  1. Nested async UDF test: The new test in user_defined_async_scalar_functions.rs:117-152 directly tests the bug fix
  2. SQL logic test: The addition to async_udf.slt provides integration-level testing
  3. Existing tests preserved: Modifications to existing tests maintain backward compatibility

🟡 Suggestions for Additional Tests

  1. Triple nesting: async_abs(async_abs(async_abs(x))) to verify arbitrary depth
  2. Mixed nesting: Async UDF with regular UDF inside: async_udf(regular_udf(async_udf(x)))
  3. Multiple nested in same expression: async_udf1(x) + async_udf2(async_udf3(y))
  4. Error handling: What happens if an inner async UDF fails? Does cleanup work correctly?
  5. Performance test: Large batches with multiple nested async UDFs to catch performance regressions

Security & Safety

✅ No Security Concerns

  • No unsafe code introduced
  • No new external dependencies
  • No changes to authentication/authorization
  • No user input directly used in unsafe operations

Code Style & Best Practices

Minor Issues

  1. Import organization (user_defined_async_scalar_functions.rs:20-29):

    • The refactoring to explicit imports is good
    • Consider keeping imports alphabetically sorted for consistency
  2. Type annotations (user_defined_async_scalar_functions.rs:70, 97, 102, 107):

    let df: DataFrame = ctx.sql(...).await?;
    let result: Vec<RecordBatch> = df.collect().await?;
    let result: Result<Vec<RecordBatch>> = df.collect().await;

    These explicit type annotations are good for clarity, though some may be unnecessary. The inconsistency (sometimes on df, sometimes on result) is minor but worth noting.

  3. Comment quality: The comments in physical_planner.rs:2694-2697 are excellent and explain the intent clearly.


Performance Considerations

Recommendations

  1. Add metrics: Track time spent in intermediate batch construction
  2. Benchmark: Compare performance of:
    • Simple async UDF (no nesting)
    • 2-level nesting
    • 3-level nesting
  3. Memory profiling: Verify memory usage with large batches and multiple nested UDFs

Documentation

Missing

  • CHANGELOG entry: Should this be documented in a changelog?
  • User-facing docs: If async UDF nesting is now officially supported, should it be mentioned in docs/source/library-user-guide/functions/adding-udfs.md?
  • Code comments: The algorithm is complex - consider adding a module-level comment in async_func.rs explaining the nested evaluation strategy

Summary & Recommendation

Overall Assessment: ✅ Approve with suggestions

This is a solid fix for a real bug. The core algorithm is correct and the approach is sound. The main concerns are:

  1. Performance implications of intermediate batch reconstruction (needs profiling)
  2. ProjectionMapping filtering logic (needs verification it doesn't break optimizations)
  3. Documentation gaps (changelog, user docs, code comments)

Suggested Action Items:

  • Add performance benchmarks for nested async UDFs
  • Verify ProjectionMapping filtering doesn't break query optimization
  • Consider adding more edge case tests (triple nesting, error handling)
  • Update documentation if this makes nested async UDFs officially supported
  • Add metrics for intermediate batch construction overhead
  • Consider adding a CHANGELOG entry

Nice work on the fix! The consolidation of the two-pass approach into a single find_and_map traversal is elegant, and the sequential evaluation with schema extension is the right solution.


@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request correctly addresses the handling of nested asynchronous UDFs by introducing a more robust planning and execution strategy. The changes in physical_planner.rs refactor the async expression planning into a single find_and_map function, which correctly uses a bottom-up traversal to handle nested calls. The AsyncFuncExec is updated to sequentially evaluate async expressions, making the output of inner functions available to outer ones. This is a crucial fix for correctness. The addition of a dedicated test case for nested async UDFs ensures this functionality is well-tested. The code is well-structured and the changes are logical and correct. I have reviewed the changes and found no issues.

@augmentcode

augmentcode bot commented Jan 30, 2026

🤖 Augment PR Summary

Summary: Fixes planning and execution of nested async scalar UDFs so outer async calls can correctly consume results produced by inner async calls.

Changes:

  • Refactors async-expression planning to rewrite expressions in one pass via a new AsyncMapper::find_and_map (instead of collecting references then rewriting later).
  • Ensures nested async UDFs are identified bottom-up and replaced with Column references to the async outputs.
  • Updates AsyncFuncExec to compute async output fields incrementally using an extended schema (so later async exprs can reference earlier async outputs).
  • Evaluates async expressions in-order per batch, constructing intermediate RecordBatch values so subsequent async exprs can see newly produced columns.
  • Adds a dedicated Rust test covering nested async UDF execution and updates existing async UDF tests to compare formatted output.
  • Adds a SQLLogicTest case for async_abs(async_abs(x)) to exercise nesting in the integration suite.

Technical Notes: The key behavior change is that async expressions are now rewritten and executed sequentially, enabling dependency chains between async UDF evaluations within a single projected expression.



@augmentcode augmentcode bot left a comment

Review completed. No suggestions at this time.

Comment augment review to trigger a new review at any time.

