Skip to content

Conversation

@martin-augment
Copy link
Owner

20039: To review by AI

Use conditional compilation to provide parallel execution with JoinSet for
native targets and sequential execution with try_join_all for WASM targets,
since WASM has limited multi-threading support.
- Updated AsyncMapper to perform bottom-up mapping with schema extension
- Updated AsyncFuncExec to process expressions incrementally and pass intermediate results
- Updated DefaultPhysicalPlanner to use correct planning logic
- Added regression test case for nested async UDFs
@coderabbitai
Copy link

coderabbitai bot commented Jan 28, 2026

Walkthrough

This pull request refactors the DataFusion codebase to decouple Session dependency from file listing and async expression evaluation. Changes replace Session-based context parameters with explicit ConfigOptions and RuntimeEnv arguments across the datasource listing layer. The physical planner's async expression handling is reworked to use bottom-up rewriting with incremental schema building. Non-WASM file listing now uses Tokio JoinSet for concurrent partition collection. Test coverage for nested async UDFs is expanded. Tokio is added as a workspace dependency to support concurrent task orchestration.

✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist
Copy link

Summary of Changes

Hello @martin-augment, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical bug in DataFusion's core engine that prevented the proper execution of nested asynchronous User Defined Functions (UDFs). It refactors the expression planning and execution logic to correctly handle the dependencies between nested async operations. Additionally, it introduces performance improvements for file listing by enabling parallel execution and refactors the context passed to file listing utilities for better modularity.

Highlights

  • Nested Async UDF Execution Fix: Resolved an issue preventing the correct execution of nested asynchronous User Defined Functions (UDFs). The AsyncMapper now performs a bottom-up transformation, replacing inner async UDFs with column references, allowing outer UDFs to correctly reference their results. The AsyncFuncExec evaluates these expressions sequentially, making intermediate results available.
  • Parallel File Listing: Introduced parallel file listing for non-WASM targets using tokio::task::JoinSet within ListingTable::list_files_for_scan, which can improve performance for tables with multiple paths. A sequential fallback is maintained for WASM environments.
  • Refactored File Listing Context: The Session context in file listing functions (pruned_partition_list, list_prefixed_files, list_all_files, list_with_cache) has been replaced with more specific ConfigOptions and RuntimeEnv parameters, enhancing modularity and reducing coupling.
  • New Test Case for Nested Async UDFs: A dedicated test (test_nested_async_udf) has been added to verify the correct functionality of nested asynchronous UDFs, ensuring the fix works as expected.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@claude
Copy link

claude bot commented Jan 28, 2026

PR Review: Fix Nested Async UDF Execution

Summary

This PR fixes a critical bug in handling nested async UDFs (e.g., async_udf(async_udf(x))) by refactoring how async expressions are discovered, mapped, and executed. The changes span multiple crates and include both the core fix and necessary API refactoring.


Code Quality & Best Practices

Strengths ✅

  1. Well-structured refactoring: The separation of concerns between finding async expressions and rewriting them is clean. The new find_and_map method in AsyncMapper elegantly combines both operations.

  2. Good documentation: Inline comments explain the complex logic well, especially:

    • datafusion/catalog-listing/src/table.rs:719-723 explaining the JoinSet tradeoff
    • datafusion/core/src/physical_planner.rs:2671-2674 explaining the find_and_map steps
  3. Proper use of Rust idioms: Transform pattern usage, Arc cloning discipline, and error handling are all appropriate.

  4. Schema evolution handling: The incremental schema building in AsyncFuncExec::try_new (lines 63-70) properly handles nested async functions.

Areas for Improvement 🔧

  1. API Breaking Change Without Deprecation: The refactoring changes the signature of pruned_partition_list, list_prefixed_files, and list_all_files from taking a Session trait object to taking ConfigOptions and RuntimeEnv separately. While this is likely internal, consider:

    • Adding a migration guide or deprecation notice if these are public APIs
    • Documenting the reason for this change in the PR description
  2. Code Duplication in Test Updates: Multiple test files (datafusion/datasource/src/url.rs:900-1085) have repetitive patterns for extracting config and runtime_env. Consider a helper function:

    fn session_parts(session: &impl Session) -> (&ConfigOptions, &Arc<RuntimeEnv>) {
        (session.config().options(), session.runtime_env())
    }
  3. Removed future import: In datafusion/catalog-listing/src/table.rs:102, the future import was removed but it's unclear if futures::future::try_join_all is still accessible through the wildcard import. Verify this compiles cleanly.


Potential Bugs & Issues

Critical 🚨

None identified - The core logic appears sound.

Medium Priority ⚠️

  1. Sequential Schema Construction During Execution (datafusion/physical-plan/src/async_func.rs:245-260):

    • Each async expression creates a new schema by taking the first N fields
    • This creates intermediate schemas repeatedly in the hot path
    • Recommendation: Pre-compute all intermediate schemas in AsyncFuncExec::new() and store them, rather than reconstructing on every batch
  2. JoinSet Error Handling (datafusion/catalog-listing/src/table.rs:760-771):

    • JoinSet errors are wrapped in DataFusionError::External(Box::new(e))
    • This loses type information and makes debugging harder
    • Recommendation: Provide more context about which table_path failed:
      Err(e) => {
          return Err(datafusion_common::DataFusionError::Execution(
              format!("Task failed during file listing: {}", e)
          ));
      }
  3. Potential Column Index Overflow (datafusion/physical-plan/src/async_func.rs:400):

    • output_idx = self.num_input_columns + self.async_exprs.len() - 1
    • If there are many nested async UDFs, this could theoretically overflow usize (extremely unlikely but worth a comment)
    • Recommendation: Add an assertion or comment about the practical limits

Low Priority 📝

  1. Test Output Format Change (datafusion/core/tests/user_defined/user_defined_async_scalar_functions.rs:393-404):

    • Changed from assert_batches_eq! macro to manual string comparison
    • The macro is generally preferred for readability and better error messages
    • Was this change necessary? If so, document why
  2. Unused Import Warning Risk: The new tokio dependency in catalog-listing might trigger warnings on WASM targets since JoinSet is conditionally compiled out.

    • Recommendation: Add #[cfg(not(target_arch = "wasm32"))] to the tokio import

Performance Considerations

Improvements 🚀

  1. Parallel File Listing (datafusion/catalog-listing/src/table.rs:725-776): Using JoinSet to parallelize file listing across table paths is excellent. This should significantly improve performance for multi-path queries.

  2. Bottom-up Async Discovery: The transform_up approach ensures nested async functions are discovered in the right order, avoiding multiple passes.

Concerns 📉

  1. Memory Usage in JoinSet Implementation: The comment at line 719 acknowledges this - files are collected into memory per table_path rather than streamed. For large table scans, this could spike memory usage.

    • Mitigation: The WASM fallback preserves streaming, which is good
    • Recommendation: Consider adding a configuration option to disable JoinSet parallelization for memory-constrained environments
  2. Repeated Schema Cloning (datafusion/physical-plan/src/async_func.rs:248-254): Every iteration creates a new schema by cloning fields. For deeply nested async UDFs:

    • Impact: O(n²) field clones where n = number of async expressions
    • Recommendation: Pre-build these schemas once during plan construction
  3. Arc Clone Overhead (datafusion/catalog-listing/src/table.rs:736-740): Each spawned task clones multiple Arcs. While Arc clones are cheap, this happens for every table_path.

    • Assessment: Acceptable overhead given the parallelization benefits

Security Concerns

None Identified ✓

The changes are purely computational and don't introduce:

  • SQL injection vectors
  • Path traversal issues
  • Authentication/authorization bypasses
  • Resource exhaustion attacks (within reason - see performance notes)

Test Coverage

Strengths ✅

  1. Excellent new test coverage: The new test_nested_async_udf test (datafusion/core/tests/user_defined/user_defined_async_scalar_functions.rs:428-463) directly tests the bug being fixed.

  2. SQL logic test added: datafusion/sqllogictest/test_files/async_udf.slt:882-888 adds a concise regression test.

  3. Comprehensive API updates: All existing tests were updated to use the new API signatures, ensuring no regressions.

Gaps 🔍

  1. Missing edge cases:

    • Triple nesting: async_udf(async_udf(async_udf(x)))
    • Mixed nesting: async_udf(regular_udf(async_udf(x)))
    • Multiple nested in same query: SELECT async_udf(async_udf(a)), async_udf(async_udf(b))
  2. No error case tests: What happens if:

    • Async UDF returns wrong type in nested context?
    • Schema conflicts occur?
  3. Performance tests: Consider adding a benchmark for the new JoinSet parallelization.

Recommendation: Add at least one test for triple-nested async UDFs to ensure the recursive approach works correctly.


Additional Observations

  1. Type Annotations in Tests (datafusion/core/tests/user_defined/user_defined_async_scalar_functions.rs:373, 413, 433): Adding explicit type annotations (: DataFrame, : Vec<RecordBatch>) improves readability. Good change.

  2. Removal of Redundant Code: The old two-phase approach (find_references + map_expr) had complexity issues with nested async UDFs. The new unified find_and_map is much cleaner.

  3. WASM Compatibility: Proper handling of WASM vs non-WASM targets with conditional compilation is well done.


Recommendations Summary

Must Address Before Merge

  • Verify the future import removal doesn't break compilation
  • Add #[cfg(not(target_arch = "wasm32"))] to tokio import in catalog-listing

Should Address

  • Pre-compute intermediate schemas in AsyncFuncExec to avoid hot-path construction
  • Improve JoinSet error messages with context about which table_path failed
  • Add test for triple-nested async UDFs
  • Document the API breaking change and rationale

Nice to Have

  • Add helper function to reduce test code duplication
  • Explain why assert_batches_eq! was changed to manual comparison
  • Add benchmark for parallel file listing performance

Overall Assessment

Verdict: ✅ Approve with minor revisions

This is a high-quality PR that fixes a real bug with a well-thought-out solution. The core algorithm is sound, the refactoring improves code clarity, and the test coverage directly addresses the issue. The performance optimization via JoinSet is a nice bonus.

The main concerns are:

  1. Minor compilation/configuration issues that should be verified
  2. Missed optimization opportunities (pre-computing schemas)
  3. Test coverage gaps for edge cases

The code is production-ready after addressing the "Must Address" items. The "Should Address" items would further improve quality but aren't blockers.

Estimated Risk: Low - The changes are well-isolated, properly tested, and the refactoring is clean.

Great work! 🎉

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request correctly handles nested asynchronous UDF execution by refactoring the physical planning of async expressions. It also includes a beneficial refactoring to decouple file listing logic from the Session trait, introducing parallel file listing using tokio::JoinSet for performance. However, a critical regression was identified in the ListingTable's file listing logic: the parallel execution now eagerly collects file metadata into memory, replacing a lazy streaming implementation. This creates a potential Denial of Service (DoS) vector via Out-Of-Memory (OOM) when querying directories with a very large number of files. There is also a suggestion to optimize the parallel file listing by reducing cloning within the loop.

&partition_cols,
)
.await?;
stream.try_collect::<Vec<_>>().await

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

The implementation of parallel file listing for non-WASM targets introduces eager collection of all files into memory for each table path. By calling stream.try_collect::<Vec<_>>().await inside each spawned task, the engine will attempt to load the metadata for all files in the directory into memory before processing. This is a regression from the previous lazy streaming behavior and can be exploited to cause an Out-Of-Memory (OOM) condition by pointing a table to a directory containing a very large number of files.

To remediate this, consider using a streaming approach that preserves laziness. For example, you could use a channel to send PartitionedFile objects from the spawned tasks back to the main task, which would satisfy the 'static lifetime requirement of JoinSet without requiring eager collection of the entire result set into memory.

Comment on lines +738 to +740
let file_extension = file_extension.clone();
let partition_cols = partition_cols.clone();
let filters = filters.clone();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For performance, consider wrapping file_extension, partition_cols, and filters in Arc before the loop. Cloning String and Vec on each iteration can be inefficient if there are many table_paths. Using Arc would make these cheap reference-counted clones.

For example:

let file_extension = Arc::new(self.options.file_extension.clone());
let partition_cols = Arc::new(self.options.table_partition_cols.clone());
let filters = Arc::new(filters.to_vec());

for table_path in &self.table_paths {
    // ...
    let file_extension = Arc::clone(&file_extension);
    let partition_cols = Arc::clone(&partition_cols);
    let filters = Arc::clone(&filters);
    // ...
}

@augmentcode
Copy link

augmentcode bot commented Jan 28, 2026

🤖 Augment PR Summary

Summary: Fixes planning/execution of nested async scalar UDFs so inner async calls are evaluated and referenced correctly.

Changes:

  • Reworked async-expression planning to detect async UDFs bottom-up and rewrite nested calls via AsyncMapper::find_and_map
  • Updated AsyncFuncExec to build output fields incrementally and evaluate async expressions in-order, allowing later async expressions to reference earlier async outputs
  • Refactored listing-table URL/file discovery APIs to take ConfigOptions and RuntimeEnv explicitly (instead of a Session)
  • Parallelized multi-path listing-table file discovery on non-wasm targets using tokio::task::JoinSet, with a wasm sequential fallback
  • Added regression coverage for nested async UDFs (Rust test + sqllogictest)

Technical Notes: The nested-async fix relies on deterministic __async_fn_N column numbering and appending async outputs to the end of each batch�s schema.

🤖 Was this summary useful? React with 👍 or 👎

Copy link

@augmentcode augmentcode bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review completed. 2 suggestions posted.

Fix All in Augment

Comment augment review to trigger a new review at any time.

itertools = { workspace = true }
log = { workspace = true }
object_store = { workspace = true }
tokio = { workspace = true }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tokio is only used behind #[cfg(not(target_arch = "wasm32"))]; consider making this a target-specific dependency so wasm builds don't pull in tokio (or potentially incompatible tokio feature sets).

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎

use arrow::array::{Int32Array, RecordBatch, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use async_trait::async_trait;
use datafusion::dataframe::DataFrame;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert_batches_eq appears to be unused after switching these tests to format_batches + assert_eq; if CI uses -D warnings, this unused import could fail the build.

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
datafusion/datasource/src/url.rs (1)

382-399: Doc comment still mentions ctx.

The argument list should reflect the new runtime_env parameter name.

📝 Suggested doc fix
-/// * `ctx` - The session context
+/// * `runtime_env` - The runtime environment (cache access)

@martin-augment
Copy link
Owner Author

#216

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants