feat: Arrow-direct codegen dispatcher for Spark expressions and Scala UDFs #4267
mbutrovich wants to merge 40 commits into
Conversation
There are like 4 Spark SQL test failures that look like they might need updating, but otherwise it's looking good. Not gonna worry about them until we discuss moving forward.
 * generated subclass is not thread-safe across concurrent {@code process} calls, so kernels are
 * allocated per dispatcher invocation and init is run once on the fresh instance.
 */
public void init(int partitionIndex) {}
nit: I think moving init before process helps with reading this
 */
trait CometUDF {
  def evaluate(inputs: Array[ValueVector]): ValueVector
  def evaluate(inputs: Array[ValueVector], numRows: Int): ValueVector
Would it be worth creating a separate PR with this change?
yes, I am peeling this and the TaskContext change off into their own PRs.
val REGEXP_ENGINE_RUST = "rust"
val REGEXP_ENGINE_JAVA = "java"

val COMET_REGEXP_ENGINE: ConfigEntry[String] =
Using the regexp work to test the new framework makes sense, but I think we should split this work out into a follow-on PR.
We still need compatibility docs for regexp for the Rust path
keyUnwrapper,
// Capture the Spark task thread's TaskContext at `createPlan` time. Stashed native-side
// in the ExecutionContext and passed through the JVM UDF bridge so that Tokio workers
// running JVM UDFs see the real `TaskContext` via their thread-local. See
// `CometUdfBridge.evaluate` and `CometTaskContextShim` for the receive side.
TaskContext.get())
could this change be a separate pr?
yes, peeling this off as a separate PR
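For readers unfamiliar with the mechanism under discussion: the comment describes capturing the Spark task thread's `TaskContext` at plan time and installing it on the worker thread that later runs the JVM UDF, so `TaskContext.get()` resolves correctly there. A minimal, self-contained sketch of that hand-off pattern (class and method names here are hypothetical, not Comet's actual `CometTaskContextShim` API; a plain `String` stands in for the real `TaskContext`):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical shim: holds a per-thread "task context" so code running on a
// worker thread can observe the context captured on the original task thread.
final class TaskContextShim {
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    static void set(String ctx) { CONTEXT.set(ctx); }
    static String get() { return CONTEXT.get(); }
    static void clear() { CONTEXT.remove(); }
}

public class ShimDemo {
    public static void main(String[] args) throws Exception {
        // "Capture" the context on the task thread (here: main).
        String captured = "task-context-partition-7";

        ExecutorService worker = Executors.newSingleThreadExecutor();
        // Install the captured context on the worker thread before the UDF
        // body runs, and clear it afterward so the pooled thread stays clean.
        String seen = worker.submit(() -> {
            TaskContextShim.set(captured);
            try {
                return TaskContextShim.get(); // what the UDF would observe
            } finally {
                TaskContextShim.clear();
            }
        }).get();
        worker.shutdown();

        System.out.println(seen); // prints: task-context-partition-7
    }
}
```

The clear-in-finally step matters with pooled threads: without it, a stale context would leak into the next task that reuses the worker.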
Draft while we discuss with #4233 and #4239.
Which issue does this PR close?
Closes #.
Rationale for this change
#4232 merged the JVM UDF bridge. This PR adds a codegen dispatcher on top: one generic `CometUDF` that compiles a specialized batch kernel per bound Catalyst `Expression` + input schema via Janino.

Benefits:

- Any `ScalaUDF` or Catalyst expression routes through native without a hand-written `CometUDF`.
- `ScalaUDF`s and Catalyst expressions share one expression tree, so Comet keeps surrounding native operators in place.
- Opt-in via `spark.comet.exec.codegenDispatch.mode = auto | force | disabled`. Primary targets: string expressions and user `ScalaUDF`s.

What changes are included in this PR?

- `CometBatchKernelCodegen` (orchestrator) + `CometBatchKernelCodegenInput`/`CometBatchKernelCodegenOutput` (per-side emission) + `CometCodegenDispatchUDF` (bridge entry, three-layer cache).
- `ArrayType`, `StructType`, and `MapType` as both input and output, including arbitrary nesting. Sealed `ArrowColumnSpec` + recursive nested-class emission.
- `RegExpReplace` emitter bypassing the `UTF8String` round-trip that Spark's `doGenCode` forces.
- Specialization per `(expression, input schema)`: zero-copy UTF8 reads (`VarCharVector`/`ViewVarCharVector`), non-nullable `isNullAt` elision, decimal short-precision fast path on both sides, UTF8 on-heap write shortcut, pre-sized variable-length output buffers, `NullIntolerant` short-circuit, non-nullable output short-circuit, subexpression elimination. Complex-type output writes hoist `getChildByOrdinal` + cast into once-per-batch setup so the per-row body has no runtime type dispatch and no redundant casts.
- `numRows` parameter (zero-column expressions); `TaskContext` propagation across JNI so partition-sensitive expressions (`Rand`, `Uuid`, `MonotonicallyIncreasingID`, user UDFs reading `TaskContext.get()`) see the Spark task context from the Tokio worker.
- `CometScalaUDF` routes any `ScalaUDF`; the regex family (`rlike`, `regexp_replace`, `regexp_extract`, `regexp_extract_all`, `regexp_instr`, `split`) gets a uniform `pickWithMode` switch; native Rust paths are preserved where they exist. Proto-building factored into `CodegenDispatchSerdeHelpers.buildJvmUdfExpr`.
- `Utils.toArrowField` + `Field.createVector` for every output type. Input spec derives Spark `DataType`s via `Utils.fromArrowField`. Exception paths close partially-allocated vectors to avoid leaks.
- `docs/source/user-guide/latest/jvm_udf_dispatch.md` (configuration, supported expressions and types, regex routing matrix, behavioral limitations); `docs/source/contributor-guide/jvm_udf_dispatch.md` (architecture, optimizations, caching, CSE rationale, WSCG-exploration notes, open items cross-referencing in-code TODOs, file map). Both ASCII-only.

How are these changes tested?
- `CometCodegenSourceSuite` - generated-source assertions for every optimization and every complex-type shape.
- `CometCodegenDispatchSmokeSuite` - end-to-end correctness across the scalar and complex type surface (primitives, decimal precision boundaries, date/timestamp/timestampNTZ, array/struct/map round-trips including nested shapes), composed-UDF trees, subquery reuse, `TaskContext` propagation.
- `CometCodegenDispatchFuzzSuite` - randomized string fuzz + decimal identity fuzz at several null densities.
- `CometRegExpJvmSuite` - SQL-level Spark-vs-Comet correctness for the regex family.
- `CometScalaUDFCompositionBenchmark` - Spark vs Comet native built-ins vs dispatcher `disabled` vs dispatcher `force` over three shapes.
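The PR description gives the config key and its three values but no usage example. Assuming the key behaves like other Spark SQL configs (a sketch; the key and values are from the PR text, but the `spark-defaults.conf` placement is an assumption, and session-level `SET` may also work):

```
# spark-defaults.conf fragment: opt in to the codegen dispatcher.
# Values per the PR: auto (use when eligible), force (always route
# eligible expressions through the dispatcher), disabled (off).
spark.comet.exec.codegenDispatch.mode  auto
```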