
feat: add JVM UDF framework for native execution#4232

Open
andygrove wants to merge 2 commits into apache:main from andygrove:jvm-udf-framework

Conversation

@andygrove
Member

@andygrove andygrove commented May 5, 2026

Which issue does this PR close?

Part of #4193

Rationale for this change

This PR adds the core JVM UDF framework that enables Comet to invoke JVM-side UDF implementations operating on Arrow data via JNI. This allows us to quickly implement expressions with 100% Spark compatibility without re-implementing them in native Rust code: we call existing Java/Spark code, but operate on Arrow data, avoiding the expensive transition of falling back to Spark.

What changes are included in this PR?

The framework consists of:

JVM side:

  • CometUDF trait — interface that JVM UDF implementations must satisfy
  • CometUdfBridge — JNI entry point that native execution calls to invoke a UDF; handles class instantiation caching, Arrow FFI import/export, and result validation
  • CometLambdaRegistry — thread-safe registry bridging plan-time Spark expressions to execution-time UDF lookup
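To make the division of responsibilities concrete, here is a rough sketch of the JVM-side contract. All names and signatures below are hypothetical stand-ins, not the actual Comet API: plain `long[]` arrays take the place of Arrow vectors imported over the C Data Interface so the sketch stays self-contained.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the CometUDF trait: one result "vector"
// per input batch, same length as the input.
interface CometUDF {
    long[] evaluate(long[] input);
}

// Minimal thread-safe registry, mirroring the role CometLambdaRegistry
// plays between plan time (register) and execution time (lookup).
class UdfRegistry {
    private static final Map<String, CometUDF> UDFS = new ConcurrentHashMap<>();

    static void register(String key, CometUDF udf) {
        UDFS.put(key, udf);
    }

    static CometUDF lookup(String key) {
        CometUDF udf = UDFS.get(key);
        if (udf == null) {
            throw new IllegalStateException("no UDF registered: " + key);
        }
        return udf;
    }
}

class Demo {
    public static void main(String[] args) {
        // Plan time: register a UDF under a key the proto message can carry.
        UdfRegistry.register("double", in -> {
            long[] out = new long[in.length];
            for (int i = 0; i < in.length; i++) out[i] = in[i] * 2;
            return out;
        });
        // Execution time: native code would reach this path via JNI.
        long[] result = UdfRegistry.lookup("double").evaluate(new long[] {1, 2, 3});
        System.out.println(java.util.Arrays.toString(result)); // [2, 4, 6]
    }
}
```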

Native (Rust) side:

  • JvmScalarUdfExpr — DataFusion PhysicalExpr that delegates evaluation to a JVM-side CometUDF via JNI and the Arrow C Data Interface
  • CometUdfBridge JNI handle in jni-bridge — caches class/method references
  • JvmScalarUdf protobuf message — serde format for transmitting UDF invocations from plan to execution

Planner integration:

  • ExprStruct::JvmScalarUdf handling in the native planner

This is the framework only — individual expression implementations (e.g., array_exists) will be added in follow-up PRs.

How are these changes tested?

  • Rust compilation verified (cargo check passes for all affected crates)
  • End-to-end testing will come with the first expression implementation in a follow-up PR

Add a framework that allows Comet to invoke JVM-side UDF implementations
operating on Arrow data via JNI, avoiding expensive fallback to Spark while
maintaining 100% Spark compatibility for expressions not yet implemented
natively in Rust.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Contributor

@comphead comphead left a comment


Btw @andygrove can we use this framework for regexp udfs?

@andygrove
Member Author

Btw @andygrove can we use this framework for regexp udfs?

Yes, there is an example in #4170

It is perfect for regexp because we get 100% compatibility with almost no effort, enabled by default

@comphead
Contributor

comphead commented May 5, 2026

I'm also wondering whether we can use this framework for user UDFs 🤔 Currently this is a huge drawback in Comet: for user-defined functions we fall back, as there is no way to transpile custom user code to the native side. Can this framework be offered to the user as an alternative? Depending on UDF complexity, it may or may not be easy to rewrite custom user code from a Spark UDF to a Comet Java UDF. For example, I anticipate some problems if the user works at the row level, i.e. updates specific values in the row; in Arrow Java that might be more complicated, but it is still promising.

@andygrove
Member Author

I'm also wondering whether we can use this framework for user UDFs 🤔 Currently this is a huge drawback in Comet: for user-defined functions we fall back, as there is no way to transpile custom user code to the native side. Can this framework be offered to the user as an alternative? Depending on UDF complexity, it may or may not be easy to rewrite custom user code from a Spark UDF to a Comet Java UDF. For example, I anticipate some problems if the user works at the row level, i.e. updates specific values in the row; in Arrow Java that might be more complicated, but it is still promising.

I am already working on enabling this in #4233

* time the serde layer registers a lambda expression under a unique key; at execution time the
* UDF retrieves it by that key (passed as a scalar argument).
*/
object CometLambdaRegistry {
Member


Where is CometLambdaRegistry used?

Member Author


// explicit per-task isolation.
private static final int CACHE_CAPACITY = 64;

private static final ThreadLocal<LinkedHashMap<String, CometUDF>> INSTANCES =
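The excerpt above suggests a per-thread, fixed-capacity LRU instance cache. A self-contained sketch of that pattern, with `Object` standing in for `CometUDF` and reflection standing in for the bridge's class instantiation, looks like:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: illustrates the ThreadLocal + LinkedHashMap LRU pattern the
// excerpt hints at, not the actual CometUdfBridge code.
class PerThreadUdfCache {
    private static final int CACHE_CAPACITY = 64;

    // Each thread gets its own access-ordered LinkedHashMap; overriding
    // removeEldestEntry turns it into a fixed-capacity LRU cache, so no
    // explicit synchronization or per-task cleanup is needed.
    private static final ThreadLocal<LinkedHashMap<String, Object>> INSTANCES =
        ThreadLocal.withInitial(() ->
            new LinkedHashMap<String, Object>(16, 0.75f, /*accessOrder=*/ true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Object> eldest) {
                    return size() > CACHE_CAPACITY;
                }
            });

    // Instantiate on first use, then reuse the same instance on this thread.
    static Object getOrCreate(String className) throws Exception {
        LinkedHashMap<String, Object> cache = INSTANCES.get();
        Object udf = cache.get(className);
        if (udf == null) {
            udf = Class.forName(className).getDeclaredConstructor().newInstance();
            cache.put(className, udf);
        }
        return udf;
    }
}
```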
Member


Should we ensure one instance per thread? Spark/Hive UDFs don't seem to do this.

Member Author


yeah, that is a good point, will take another look

@mbutrovich
Contributor

This PR got me thinking about whether the per-expression CometUDF pattern could be generalized, and I prototyped a generic dispatcher on top of this PR's framework. Branch: https://github.com/mbutrovich/datafusion-comet/tree/jvm-udf-generic-dispatcher. The core file is CometGenericExpressionUDF.scala.

What it does

CometGenericExpressionUDF is one CometUDF class that evaluates an arbitrary Spark Expression tree. At plan time, the serde registers the bound expression in CometLambdaRegistry (UUID-keyed), emits a JvmScalarUdf proto pointing at the generic class, and passes data attributes as args. At execution time the UDF looks up the expression, compiles it once via GenerateMutableProjection, and loops over the input Arrow vectors using a reused SpecificInternalRow.
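The compile-once-per-key behavior described above can be sketched in plain Java, with a lambda standing in for the Janino codegen that `GenerateMutableProjection` performs in the prototype. All names here are illustrative, not the prototype's actual code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongUnaryOperator;

// Sketch of the dispatcher's caching contract: the expensive step (codegen)
// runs on first lookup of a registry key and is reused for every batch.
class GenericDispatcher {
    // Instrumentation only, so the compile-once claim is observable.
    static final AtomicInteger compileCount = new AtomicInteger();

    private final Map<String, LongUnaryOperator> compiled = new ConcurrentHashMap<>();

    // "expr" stands in for a registered Spark Expression; "compiling" it here
    // is just returning the lambda, keeping the sketch self-contained.
    LongUnaryOperator projectionFor(String key, LongUnaryOperator expr) {
        return compiled.computeIfAbsent(key, k -> {
            compileCount.incrementAndGet(); // real code: one Janino compile here
            return expr;
        });
    }

    // Per-batch hot loop: look up the cached projection once, then evaluate
    // row by row, analogous to looping over the input Arrow vectors.
    long[] evaluate(String key, LongUnaryOperator expr, long[] batch) {
        LongUnaryOperator proj = projectionFor(key, expr);
        long[] out = new long[batch.length];
        for (int i = 0; i < batch.length; i++) out[i] = proj.applyAsLong(batch[i]);
        return out;
    }
}
```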

Benefits

  • One JVM-side class handles any scalar Spark Expression, with no per-expression hand-coded evaluator required.
  • The dispatcher evaluates composed expression trees in one JNI hop. If a child node (e.g. upper(col) inside rlike(upper(col), pattern)) isn't supported natively, the whole tree still evaluates together without forcing whole-plan fallback.
  • Benchmarks competitively with the hand-coded RegExpLikeUDF from #4239. Spark's MutableProjection codegen produces the same hot-loop shape (bytes to UTF8String to eval to result) that a hand-written loop does, so there is no inherent per-row dispatcher overhead.
  • One Janino compile per expression tree, cached by registry key.

Limitations

Near-term, fixable with incremental work

  • Types are prototype-narrow. Input is VarCharVector only, output is BitVector only. Widening is mechanical: build an Array[ColumnReader] and a ResultWriter at cache-miss time, dispatching on Arrow type and expression.dataType once per expression. Scaladoc in the prototype file sketches the shape.
  • CometLambdaRegistry is JVM-local. Driver and executor sharing a JVM works for local Spark only. Cluster mode requires serializing the bound Expression into the proto (Java serialization or Kryo) and dropping the UUID key.
  • Only CometRLike is wired for the generic path in this prototype. The serde logic is not RLike-specific and can be extracted to a single helper that any CometExpressionSerde opts into with one line.
  • Nondeterministic expressions (rand, monotonically_increasing_id) need an initialize(partitionIndex) call before the first row. Easy to add at cache-miss time.
  • Registry entries are never removed, which leaks for long-running drivers.
  • VarCharVector.get(i) copies bytes into a fresh byte[] per row. Matches RegExpLikeUDF, so the A/B comparison is fair, but both paths would improve with a reusable NullableVarCharHolder or UTF8String.fromAddress.
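The "widening is mechanical" point above can be illustrated with a toy type dispatch: the switch runs once per expression at cache-miss time and returns a reader closure that the per-row loop then calls with no further type checks. `ArrowTypeId` and `ColumnReader` below are illustrative stand-ins, not real Arrow Java or Comet classes, and plain arrays stand in for Arrow vectors.

```java
// Sketch of once-per-expression type dispatch: dispatch on the column type
// at cache-miss time, then run a monomorphic per-row loop.
class TypeDispatch {
    enum ArrowTypeId { VARCHAR, BIGINT } // stand-in for real Arrow type ids

    interface ColumnReader {
        Object read(int row);
    }

    // One switch per expression, not per row: the returned reader closes
    // over the typed backing storage.
    static ColumnReader readerFor(ArrowTypeId type, Object backing) {
        switch (type) {
            case VARCHAR: {
                String[] col = (String[]) backing; // stand-in for VarCharVector
                return row -> col[row];
            }
            case BIGINT: {
                long[] col = (long[]) backing; // stand-in for BigIntVector
                return row -> col[row];
            }
            default:
                throw new IllegalArgumentException("unsupported type: " + type);
        }
    }
}
```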

Longer-term, may never fully reach

  • Aggregates, window functions, and generators do not fit the CometUdfBridge "one result vector per input, same length" contract. Each needs its own bridge signature.
  • Python and Pandas UDFs are reachable in principle (they are Expression subclasses). Whether the per-row socket IPC to the Python worker is cheaper than whole-plan fallback would need to be measured.
  • Performance parity with native Rust on expressions that emit per-row allocations (decimals, arrays, strings out) is unlikely. JVM boxing through UnsafeRow and ArrayData is inherent to the evaluation shape, whereas native Rust writes directly into Arrow buffers.
  • Cross-Spark-version stability of Expression serialization is fragile. Spark internals change between releases, and a cluster-mode implementation would need a compatibility story.

Benchmark numbers

Per-row nanoseconds, lower is better. Apple M3 Max, OpenJDK 11.0.30+7-LTS, macOS 26.4.1. Source: CometRegExpBenchmark with one extra case added for the generic dispatcher.

| Pattern | Regex | Spark | Comet (Scan) | Comet (Exec, native Rust) | Comet (Exec, JVM hand-coded) | Comet (Exec, JVM generic) |
|---|---|---:|---:|---:|---:|---:|
| character_class | `[0-9]+` | 12561.0 | 10616.9 | 4764.3 | 4377.9 | 4293.4 |
| anchored | `^[0-9]` | 9077.1 | 8776.9 | 3463.7 | 3487.0 | 3384.8 |
| alternation | `abc\|def\|ghi` | 12189.4 | 11970.7 | 6837.2 | 6497.1 | 6785.3 |
| multi_class | `[a-zA-Z][0-9]+` | 9394.9 | 10048.6 | 4272.1 | 4193.9 | 4343.2 |
| repetition | `(ab){2,}` | 9160.1 | 9146.7 | 4086.7 | 4075.5 | 4125.5 |

The generic path tracks the hand-coded path within a few percent across all five patterns. Native Rust is competitive but not dominant on these patterns, likely because the workload favors JIT-warmed backtracking over DFA construction. On adversarial patterns or non-regex expressions with tight Rust kernels, native would be expected to pull further ahead.

I think this is a super promising direction to support UDFs more quickly (and with 100% compatibility)! Thanks @andygrove!
