feat: add JVM UDF framework for native execution #4232
andygrove wants to merge 2 commits into apache:main
Add a framework that allows Comet to invoke JVM-side UDF implementations operating on Arrow data via JNI, avoiding expensive fallback to Spark while maintaining 100% Spark compatibility for expressions not yet implemented natively in Rust.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
comphead
left a comment
Btw @andygrove can we use this framework for regexp udfs?
Yes, there is an example in #4170. It is perfect for regexp because we get 100% compatibility with almost no effort, and it is enabled by default.
I'm also wondering whether we can use this framework for user UDFs 🤔 Currently this is a huge drawback in Comet: for user-defined functions we fall back to Spark, as there is no way to transpile custom user code to the native side. Could this framework be offered to users as an alternative? Depending on UDF complexity, it may or may not be easy to rewrite custom user code from a Spark UDF to a Comet Java UDF. For example, I anticipate some problems if the user works at the row level, i.e. updates specific values in a row; in Arrow Java that might be more complicated, but it still looks promising.
I am already working on enabling this in #4233.
 * time the serde layer registers a lambda expression under a unique key; at execution time the
 * UDF retrieves it by that key (passed as a scalar argument).
 */
object CometLambdaRegistry {
Where is CometLambdaRegistry used?
This PR does not wire it up. There are follow-on PRs that use this framework:
// explicit per-task isolation.
private static final int CACHE_CAPACITY = 64;

private static final ThreadLocal<LinkedHashMap<String, CometUDF>> INSTANCES =
Should we ensure one instance per thread? Spark/Hive UDFs don't seem to do this.
yeah, that is a good point, will take another look
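For readers following this thread, here is a minimal sketch of what a per-thread, capacity-bounded instance cache of this shape could look like. The quoted diff shows only the field declaration, so the initializer, the LRU eviction policy, the `getOrCreate` and `sizeForCurrentThread` helpers, and the empty `CometUDF` stand-in interface are all assumptions for illustration, not the PR's actual code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class UdfInstanceCache {
    // Hypothetical stand-in for the real CometUDF interface.
    interface CometUDF {}

    private static final int CACHE_CAPACITY = 64;

    // One access-ordered map per thread. Once the map grows past
    // CACHE_CAPACITY, the least recently used entry is evicted.
    private static final ThreadLocal<LinkedHashMap<String, CometUDF>> INSTANCES =
        ThreadLocal.withInitial(() ->
            new LinkedHashMap<String, CometUDF>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, CometUDF> eldest) {
                    return size() > CACHE_CAPACITY;
                }
            });

    // Returns the calling thread's cached instance for className,
    // creating it with the supplied factory on a cache miss.
    static CometUDF getOrCreate(String className, Function<String, CometUDF> factory) {
        LinkedHashMap<String, CometUDF> cache = INSTANCES.get();
        CometUDF udf = cache.get(className);
        if (udf == null) {
            udf = factory.apply(className);
            cache.put(className, udf);
        }
        return udf;
    }

    // Number of instances cached for the calling thread.
    static int sizeForCurrentThread() {
        return INSTANCES.get().size();
    }
}
```

Because each thread owns its map, no locking is needed, at the cost of up to one instance per class per thread. A single shared instance, as the question above suggests, would instead require the UDF implementations to be thread-safe.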
This PR got me thinking about whether the per-expression

What it does

Benefits
- One JVM-side class handles any scalar Spark Expression, with no per-expression hand-coded evaluator required.
- The dispatcher evaluates composed expression trees in one JNI hop. If a child node (e.g.
- Benchmarks competitively with the hand-coded
- One Janino compile per expression tree, cached by registry key.

Limitations
Near-term, fixable with incremental work
Longer-term, may never fully reach

Benchmark numbers
Per-row nanoseconds, lower is better. Apple M3 Max, OpenJDK 11.0.30+7-LTS, macOS 26.4.1. Source:
The generic path tracks the hand-coded path within a few percent across all five patterns. Native Rust is competitive but not dominant on these patterns, likely because the workload favors JIT-warmed backtracking over DFA construction. On adversarial patterns or non-regex expressions with tight Rust kernels, native would be expected to pull further ahead. I think this is a super promising direction for supporting UDFs more quickly, with 100% compatibility! Thanks @andygrove!
Which issue does this PR close?
Part of #4193
Rationale for this change
This PR adds the core JVM UDF framework that enables Comet to invoke JVM-side UDF implementations operating on Arrow data via JNI. This allows us to quickly implement expressions with 100% Spark compatibility without re-implementing them in native Rust code: we call existing Java/Spark code but operate on Arrow data, avoiding an expensive fallback to Spark.
What changes are included in this PR?
The framework consists of:
JVM side:
- CometUDF trait: interface that JVM UDF implementations must satisfy
- CometUdfBridge: JNI entry point that native execution calls to invoke a UDF; handles class instantiation caching, Arrow FFI import/export, and result validation
- CometLambdaRegistry: thread-safe registry bridging plan-time Spark expressions to execution-time UDF lookup

Native (Rust) side:
- JvmScalarUdfExpr: DataFusion PhysicalExpr that delegates evaluation to a JVM-side CometUDF via JNI and the Arrow C Data Interface
- CometUdfBridge JNI handle in jni-bridge: caches class/method references
- JvmScalarUdf protobuf message: serde format for transmitting UDF invocations from plan to execution

Planner integration:
- ExprStruct::JvmScalarUdf handling in the native planner

This is the framework only; individual expression implementations (e.g., array_exists) will be added in follow-up PRs.

How are these changes tested?
cargo check passes for all affected crates
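
As a reading aid for the CometLambdaRegistry item in the change list, the register-at-plan-time, look-up-at-execution-time pattern might be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's code: the real registry is a Scala object holding Spark lambda expressions, whereas this Java sketch stores a plain `Function<Object, Object>`, and the class name, the method names `register`, `lookup`, `unregister`, and the UUID-based keys are all hypothetical.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class LambdaRegistrySketch {
    // Thread-safe map from unique key to the registered lambda.
    // Function<Object, Object> is a stand-in for a Spark lambda expression.
    private static final ConcurrentHashMap<String, Function<Object, Object>> REGISTRY =
        new ConcurrentHashMap<>();

    // Plan time: store the lambda and return the key that would be
    // passed to the UDF as a scalar argument.
    static String register(Function<Object, Object> lambda) {
        String key = UUID.randomUUID().toString();
        REGISTRY.put(key, lambda);
        return key;
    }

    // Execution time: the UDF retrieves the lambda by its key.
    static Function<Object, Object> lookup(String key) {
        Function<Object, Object> lambda = REGISTRY.get(key);
        if (lambda == null) {
            throw new IllegalStateException("No lambda registered under key " + key);
        }
        return lambda;
    }

    // Removes the entry so the registry does not grow without bound.
    static void unregister(String key) {
        REGISTRY.remove(key);
    }
}
```

A registry like this works only when planning and execution share a JVM, and it needs some cleanup hook (here `unregister`) so entries do not outlive the query that created them.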