
feat: add user-facing CometUDF registration for custom JVM UDFs#4233

Draft
andygrove wants to merge 9 commits into apache:main from andygrove:user-jvm-udf

Conversation

@andygrove
Member

@andygrove andygrove commented May 5, 2026

Which issue does this PR close?

Part of #4193

Builds on #4232 (JVM UDF framework)

Rationale for this change

This PR enables end users to provide their own CometUDF implementations that operate on Arrow columnar data, registered alongside (or in place of) standard Spark UDFs. When Comet encounters a matching UDF during planning, it routes to the vectorized Arrow implementation instead of falling back to Spark's row-at-a-time execution.

What changes are included in this PR?

  • CometUDF trait: declares name, returnType, nullable (default true), inputTypes (default empty), and the vectorized evaluate method. UDF metadata lives on the trait so registration callers do not have to repeat it.
  • CometUdfRegistry: a thread-safe registry mapping Spark UDF names to CometUDF class names plus metadata, with three class-based registration paths.
  • CometScalaUdf serde handler: intercepts ScalaUDF expressions in query planning; if the UDF name is registered, emits a JvmScalarUdf proto for native execution.
  • User guide page (custom-jvm-udfs.md): documents how to write, register, and deploy custom JVM UDFs.
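The registry idea described above can be sketched without any Spark or Arrow dependencies. This is an illustrative simplification, not the PR's actual code: `UdfMeta`, `SimpleUdfRegistry`, and their fields are hypothetical names standing in for the real `CometUdfRegistry` API.

```scala
import java.util.concurrent.ConcurrentHashMap

// Dependency-free sketch of a thread-safe name -> metadata registry.
// UdfMeta and SimpleUdfRegistry are illustrative names, not the PR's API.
final case class UdfMeta(className: String, returnType: String, nullable: Boolean)

object SimpleUdfRegistry {
  // ConcurrentHashMap gives thread-safe registration and lookup
  private val entries = new ConcurrentHashMap[String, UdfMeta]()

  def register(name: String, meta: UdfMeta): Unit = entries.put(name, meta)
  def lookup(name: String): Option[UdfMeta] = Option(entries.get(name))
  def remove(name: String): Unit = entries.remove(name)
}

object Demo {
  def main(args: Array[String]): Unit = {
    SimpleUdfRegistry.register("is_positive", UdfMeta("IsPositiveUdf", "boolean", nullable = false))
    println(SimpleUdfRegistry.lookup("is_positive")) // Some(UdfMeta(IsPositiveUdf,boolean,false))
    println(SimpleUdfRegistry.lookup("missing"))     // None
  }
}
```

During planning, a serde handler can consult such a map by UDF name and emit a native-execution plan node only on a hit, falling back otherwise.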

User-facing API

import org.apache.comet.udf.CometUdfRegistry

// (1) Comet only: a Spark UDF is already registered separately.
spark.udf.register("is_positive", (x: Int) => x > 0)
CometUdfRegistry.register(classOf[IsPositiveUdf])

// (2) Register both Spark UDF and CometUDF in one call (arity 1, 2, 3).
CometUdfRegistry.register(spark, classOf[IsPositiveUdf], (x: Int) => x > 0)

// (3) Columnar-only: synthesizes a stub Spark UDF that throws if invoked
// row-at-a-time, so users do not need to write a row-based equivalent.
// Requires the CometUDF to declare `inputTypes`.
CometUdfRegistry.registerColumnarOnly(spark, classOf[IsPositiveUdf])
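The columnar-only stub in (3) can be illustrated with a dependency-free sketch: the row-at-a-time function registered with Spark is only a placeholder and fails loudly if ever invoked. The names below are hypothetical, not the PR's code.

```scala
// Sketch of a columnar-only stub: a row-at-a-time function that exists only
// to satisfy Spark's UDF registration and throws if actually called.
// StubDemo and columnarOnlyStub are illustrative names.
object StubDemo {
  def columnarOnlyStub(udfName: String): Int => Boolean =
    (_: Int) =>
      throw new UnsupportedOperationException(
        s"UDF '$udfName' is columnar-only; it should run through the vectorized path")

  def main(args: Array[String]): Unit = {
    val stub = columnarOnlyStub("is_positive")
    // Invoking the stub row-at-a-time should fail
    val threw =
      try { stub(1); false }
      catch { case _: UnsupportedOperationException => true }
    println(threw) // true
  }
}
```

This matches the behavior the test suite checks: the stub raises `UnsupportedOperationException` when Comet is disabled.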

How are these changes tested?

CometUserUdfSuite covers:

  • basic integer UDF execution end-to-end
  • multiple arguments
  • filtered queries
  • unregistered UDF falls back to Spark with the expected fallback reason
  • registerColumnarOnly runs natively under Comet
  • registerColumnarOnly stub raises UnsupportedOperationException when Comet is disabled
  • register(classOf[X]) correctly captures metadata (name, return type, nullability) from the trait

andygrove and others added 3 commits May 5, 2026 15:26
Add a framework that allows Comet to invoke JVM-side UDF implementations
operating on Arrow data via JNI, avoiding expensive fallback to Spark while
maintaining 100% Spark compatibility for expressions not yet implemented
natively in Rust.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add CometUdfRegistry that allows end users to register their own CometUDF
implementations to be accelerated by Comet's native execution. When a ScalaUDF
is encountered during planning whose name matches a registry entry, Comet emits
a JvmScalarUdf proto instead of falling back to Spark's row-at-a-time execution.

Also adds user guide documentation explaining how to write, register, and deploy
custom JVM UDFs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Adds end-to-end tests verifying:
- Basic CometUDF execution (integer doubling via Arrow vectors)
- Unregistered UDFs correctly fall back to Spark
- Multiple UDF invocations in a single query
- UDF combined with WHERE filter
- CometUdfRegistry API (register, lookup, remove)

Also fixes KnownNotNull unwrapping in CometScalaUdf — Spark wraps UDF
arguments in KnownNotNull when the UDF is non-nullable, which needs to
be stripped before serializing the underlying expression.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
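The KnownNotNull-unwrapping fix described in the commit above can be sketched with simplified stand-ins for Spark's Catalyst classes (the real fix operates on Catalyst expressions; `Expr`, `Leaf`, and the recursion here are illustrative only):

```scala
// Simplified model of stripping a KnownNotNull wrapper before serialization.
// Expr/Leaf/KnownNotNull are stand-ins for Spark's Catalyst expression classes.
sealed trait Expr
final case class Leaf(name: String) extends Expr
final case class KnownNotNull(child: Expr) extends Expr

object Strip {
  // Recursively unwrap the marker node so only the underlying expression
  // is serialized
  def stripKnownNotNull(e: Expr): Expr = e match {
    case KnownNotNull(child) => stripKnownNotNull(child)
    case other               => other
  }

  def main(args: Array[String]): Unit =
    println(stripKnownNotNull(KnownNotNull(Leaf("x")))) // Leaf(x)
}
```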
}

test("user CometUDF - basic integer doubling") {
CometUdfRegistry.register(
Contributor


Non-critical, just from dev experience: we could probably have a single facade method that registers the function in all registries.

sql("CREATE TABLE t (x INT) USING parquet")
sql("INSERT INTO t VALUES (1), (2), (3)")
// Should still produce correct results via Spark fallback
checkSparkAnswer(sql("SELECT triple_int(x) FROM t"))
Contributor


Should we check the fallback message here?

andygrove added 4 commits May 5, 2026 18:25
Move the DoubleIntUdf test fixture from spark/src/test/ to common/src/main/
so that its bytecode references to org.apache.arrow are relocated by common's
shade plugin to org.apache.comet.shaded.arrow, matching the shaded CometUDF
interface that user code sees at runtime. A test-scope class in spark/ was
compiled against common/target/classes (unshaded) due to Maven workspace
resolution and failed at runtime with AbstractMethodError when dispatched
through the shaded interface.

Update the user-guide page to import Arrow from org.apache.comet.shaded.arrow,
which is the package real users compile against in the published comet-spark
JAR.
The PR's new ScalaUDF dispatch in QueryPlanSerde changes the fallback message
emitted for an anonymous (no-name) UDF from the generic "scalaudf is not
supported" to "ScalaUDF has no name, cannot look up CometUDF registration".
Update the test's expected fallback reasons accordingly.
Add Scala function overloads (arity 1-3) to CometUdfRegistry.register so
callers can register a UDF with both Spark and Comet in a single call,
without first wrapping the function in udf(). Update the test suite to
use the facade and to assert the expected fallback reason in the
unregistered-UDF case.
@andygrove andygrove marked this pull request as ready for review May 6, 2026 11:39
@andygrove andygrove marked this pull request as draft May 6, 2026 12:29
andygrove added 2 commits May 6, 2026 06:43
…istration

Lift name, returnType, nullable, and inputTypes from CometUdfRegistry.register
arguments onto the CometUDF trait itself. Registration now takes a class:

  CometUdfRegistry.register(classOf[MyUdf])
  CometUdfRegistry.register(spark, classOf[MyUdf], rowFn)
  CometUdfRegistry.registerColumnarOnly(spark, classOf[MyUdf])

The third form synthesizes a stub Spark UDF (arities 1 to 5) that throws
UnsupportedOperationException if invoked row-at-a-time, so users no longer have
to write a row-based equivalent just to register a vectorized implementation.
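The class-based registration above (`register(classOf[MyUdf])`) implies instantiating the class to read metadata from the trait. A dependency-free sketch of that idea, with `MiniUdf`/`TripleUdf` as simplified stand-ins for the real `CometUDF` trait:

```scala
// Simplified model of lifting metadata onto the trait and reading it via
// reflective instantiation. MiniUdf/TripleUdf are illustrative stand-ins.
trait MiniUdf {
  def name: String
  def nullable: Boolean = true // default, as in the CometUDF trait
}

final class TripleUdf extends MiniUdf {
  def name: String = "triple_int"
}

object Registration {
  // Instantiate the class with its no-arg constructor and read metadata
  // from the instance, so callers pass only classOf[...]
  def metadataOf(cls: Class[_ <: MiniUdf]): (String, Boolean) = {
    val inst = cls.getDeclaredConstructor().newInstance()
    (inst.name, inst.nullable)
  }

  def main(args: Array[String]): Unit =
    println(Registration.metadataOf(classOf[TripleUdf])) // (triple_int,true)
}
```

This requires the UDF class to have a public no-arg constructor, which is a common constraint for reflectively instantiated plugin classes.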
