
feat: Arrow-direct codegen dispatcher for Spark expressions and Scala UDFs #4267

Draft
mbutrovich wants to merge 40 commits into apache:main from mbutrovich:codegen_scala_udf

Conversation


@mbutrovich mbutrovich commented May 8, 2026

Draft while we discuss with #4233 and #4239.

Which issue does this PR close?

Closes #.

Rationale for this change

#4232 merged the JVM UDF bridge. This PR adds a codegen dispatcher on top: one generic CometUDF that compiles a specialized batch kernel per bound Catalyst Expression + input schema via Janino.

Benefits:

  • Any supported ScalaUDF or Catalyst expression routes through native without a hand-written CometUDF.
  • UDFs stop being opaque operator boundaries; ScalaUDFs and Catalyst expressions share one expression tree, so Comet keeps surrounding native operators in place.
  • An entire expression subtree compiles into one per-row loop with stack-local intermediates instead of per-subexpression Arrow batches.

Opt-in via spark.comet.exec.codegenDispatch.mode = auto | force | disabled. Primary targets: string expressions and user ScalaUDFs.
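The "one per-row loop with stack-local intermediates" point above can be pictured with a hypothetical sketch of the kind of kernel the dispatcher might emit for an expression tree like upper(concat(a, b)). The class and method names are illustrative only (the real generated source reads and writes Arrow vectors; plain arrays stand in here so the sketch is self-contained):

```java
// Hypothetical shape of a Janino-compiled kernel for upper(concat(a, b)).
// Both subexpressions evaluate into stack locals inside one fused loop,
// so no intermediate batch is materialized for concat(a, b).
public class GeneratedKernelSketch {
    public static String[] process(String[] a, String[] b, int numRows) {
        String[] out = new String[numRows];
        for (int row = 0; row < numRows; row++) {
            // NullIntolerant short-circuit: any null input yields a null output.
            if (a[row] == null || b[row] == null) {
                out[row] = null;
                continue;
            }
            String concat = a[row] + b[row];   // stack-local intermediate
            out[row] = concat.toUpperCase();   // consumed in the same iteration
        }
        return out;
    }

    public static void main(String[] args) {
        String[] r = process(new String[] {"foo", null}, new String[] {"Bar", "x"}, 2);
        System.out.println(r[0] + "," + r[1]); // FOOBAR,null
    }
}
```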

What changes are included in this PR?

  • Codegen dispatcher: CometBatchKernelCodegen (orchestrator) + CometBatchKernelCodegenInput / CometBatchKernelCodegenOutput (per-side emission) + CometCodegenDispatchUDF (bridge entry, three-layer cache).
  • Complex type support: ArrayType, StructType, and MapType as both input and output, including arbitrary nesting. Sealed ArrowColumnSpec + recursive nested-class emission.
  • Per-expression specialization: direct-bytes RegExpReplace emitter bypassing the UTF8String round-trip Spark's doGenCode forces.
  • Optimization set applied per (expression, input schema):
      • zero-copy UTF8 reads (VarCharVector / ViewVarCharVector)
      • non-nullable isNullAt elision
      • decimal short-precision fast path on both sides
      • UTF8 on-heap write shortcut
      • pre-sized variable-length output buffers
      • NullIntolerant short-circuit
      • non-nullable output short-circuit
      • subexpression elimination
    Complex-type output writes hoist getChildByOrdinal + cast into once-per-batch setup, so the per-row body has no runtime type dispatch and no redundant casts.
  • Bridge contract additions: numRows parameter (zero-column expressions); TaskContext propagation across JNI so partition-sensitive expressions (Rand, Uuid, MonotonicallyIncreasingID, user UDFs reading TaskContext.get()) see the Spark task context from the Tokio worker.
  • Serde routing: CometScalaUDF routes any ScalaUDF; the regex family (rlike, regexp_replace, regexp_extract, regexp_extract_all, regexp_instr, split) gets a uniform pickWithMode switch; native Rust paths preserved where they exist. Proto-building factored into CodegenDispatchSerdeHelpers.buildJvmUdfExpr.
  • Allocation reuses Utils.toArrowField + Field.createVector for every output type. Input spec derives Spark DataTypes via Utils.fromArrowField. Exception paths close partially-allocated vectors to avoid leaks.
  • Docs split: docs/source/user-guide/latest/jvm_udf_dispatch.md (configuration, supported expressions and types, regex routing matrix, behavioral limitations); docs/source/contributor-guide/jvm_udf_dispatch.md (architecture, optimizations, caching, CSE rationale, WSCG-exploration notes, open items cross-referencing in-code TODOs, file map). Both ASCII-only.
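Since specialization is keyed per (expression, input schema), compiled kernels have to be memoized. The PR description mentions a three-layer cache in CometCodegenDispatchUDF but does not show its structure, so the key shape and names below are assumptions; this collapses the idea into a single map to show why compilation runs at most once per distinct key:

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical kernel cache keyed on (expression fingerprint, schema fingerprint).
// Stands in for the three-layer cache described in the PR; the compile step
// below is a placeholder for Janino compilation of the specialized source.
public class KernelCacheSketch {
    static final class Key {
        final String exprFingerprint;
        final String schemaFingerprint;
        Key(String e, String s) { exprFingerprint = e; schemaFingerprint = s; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return exprFingerprint.equals(k.exprFingerprint)
                && schemaFingerprint.equals(k.schemaFingerprint);
        }
        @Override public int hashCode() {
            return Objects.hash(exprFingerprint, schemaFingerprint);
        }
    }

    private final Map<Key, String> compiled = new ConcurrentHashMap<>();
    int compileCount = 0;

    String kernelFor(String expr, String schema) {
        return compiled.computeIfAbsent(new Key(expr, schema), k -> {
            compileCount++; // placeholder for the expensive Janino compile
            return "kernel[" + expr + "|" + schema + "]";
        });
    }

    public static void main(String[] args) {
        KernelCacheSketch c = new KernelCacheSketch();
        c.kernelFor("upper(a)", "string:a");
        c.kernelFor("upper(a)", "string:a"); // cache hit, no recompilation
        System.out.println(c.compileCount);   // 1
    }
}
```

Keying on the schema as well as the expression matters because the emitted reads differ per input layout (e.g. VarCharVector vs ViewVarCharVector, nullable vs non-nullable).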

How are these changes tested?

  • CometCodegenSourceSuite - generated-source assertions for every optimization and every complex-type shape.
  • CometCodegenDispatchSmokeSuite - end-to-end correctness across the scalar and complex type surface (primitives, decimal precision boundaries, date/timestamp/timestampNTZ, array/struct/map round-trips including nested shapes), composed-UDF trees, subquery reuse, TaskContext propagation.
  • CometCodegenDispatchFuzzSuite - randomized string fuzz + decimal identity fuzz at several null densities.
  • CometRegExpJvmSuite - SQL-level Spark-vs-Comet correctness for the regex family.
  • CometScalaUDFCompositionBenchmark - Spark vs Comet native built-ins vs dispatcher disabled vs dispatcher force over three shapes.

@mbutrovich (Contributor, Author) commented:

There are around four Spark SQL test failures that look like they might need updating, but otherwise it's looking good. Not going to worry about them until we discuss moving forward.

* generated subclass is not thread-safe across concurrent {@code process} calls, so kernels are
* allocated per dispatcher invocation and init is run once on the fresh instance.
*/
public void init(int partitionIndex) {}
Member:
nit: I think moving init before process helps with reading this

*/
trait CometUDF {
def evaluate(inputs: Array[ValueVector]): ValueVector
def evaluate(inputs: Array[ValueVector], numRows: Int): ValueVector
Member:
Would it be worth creating a separate PR with this change?

Contributor Author:

Yes, I am peeling this and the TaskContext change off into their own PRs.

val REGEXP_ENGINE_RUST = "rust"
val REGEXP_ENGINE_JAVA = "java"

val COMET_REGEXP_ENGINE: ConfigEntry[String] =
Member:

Using the regexp work to test the new framework makes sense, but I think we should split this work out into a follow on PR

Member:

We still need compatibility docs for regexp for the Rust path

Comment on lines +130 to +135
keyUnwrapper,
// Capture the Spark task thread's TaskContext at `createPlan` time. Stashed native-side
// in the ExecutionContext and passed through the JVM UDF bridge so that Tokio workers
// running JVM UDFs see the real `TaskContext` via their thread-local. See
// `CometUdfBridge.evaluate` and `CometTaskContextShim` for the receive side.
TaskContext.get())
Member:

could this change be a separate pr?

Contributor Author:

Yes, peeling this off as a separate PR.
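The TaskContext propagation being split out here can be pictured as an install-and-restore on the worker thread's thread-local: capture the context on the Spark task thread, then install it around each UDF call on the Tokio worker so TaskContext.get() resolves. This is a minimal sketch using a generic ThreadLocal as a stand-in for Spark's internal TaskContext slot; CometTaskContextShim's actual mechanism is not shown in this PR excerpt:

```java
import java.util.function.Supplier;

// Hypothetical receive-side shim: the captured task context is installed into
// the calling thread's thread-local for the duration of the UDF call, then the
// previous value is restored so the worker thread is left clean.
public class TaskContextShimSketch {
    static final ThreadLocal<Object> CURRENT = new ThreadLocal<>();

    static Object runWithContext(Object capturedCtx, Supplier<Object> udf) {
        Object previous = CURRENT.get();
        CURRENT.set(capturedCtx); // the worker now "sees" the Spark task context
        try {
            return udf.get();
        } finally {
            if (previous == null) CURRENT.remove();
            else CURRENT.set(previous);
        }
    }

    public static void main(String[] args) {
        Object ctx = new Object();
        Object seen = runWithContext(ctx, CURRENT::get);
        System.out.println(seen == ctx);          // true: UDF saw the context
        System.out.println(CURRENT.get() == null); // true: restored afterward
    }
}
```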
