Add Spark Connect support for DataFrame testing#491
Conversation
💡 Codex Review
Reviewed commit: ff12f066dc
Pull request overview
Adds Spark Connect–compatible testing infrastructure so DataFrame/SQL test suites can run through the Connect gRPC protocol (without relying on .rdd).
Changes:
- Introduces `SparkConnectSuiteBase` to start/stop a local Spark Connect server and provide a Connect client `SparkSession`.
- Adds `ConnectDataFrameComparisons` with Connect-safe DataFrame equality/approx/no-order comparison helpers (using `collect()` and DataFrame ops).
- Updates `build.sbt` to include Spark Connect dependencies and the `3.5/scala` sources/tests for Spark 3.5+.
Reviewed changes
Copilot reviewed 3 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| core/src/main/3.5/scala/com/holdenkarau/spark/testing/ConnectSuiteBase.scala | New Spark Connect test base trait managing server + client sessions. |
| core/src/main/3.5/scala/com/holdenkarau/spark/testing/ConnectDataFrameComparisons.scala | New DataFrame comparison utilities compatible with Spark Connect. |
| core/src/test/3.5/scala/com/holdenkarau/spark/testing/SampleSparkConnectTest.scala | Example test suite demonstrating Connect execution and assertion behavior. |
| build.sbt | Adds Spark Connect deps and wires 3.5 source/test directories; adjusts version gating. |
Force-pushed 45f2377 to 72074b4
```scala
override def afterAll(): Unit = {
  try {
    if (_connectSession != null) {
      _connectSession.close()
      _connectSession = null
    }
    SparkConnectService.stop()
  } finally {
    super.afterAll()
  }
}
```
ConnectEnabled.afterAll closes the Connect session and stops Spark Connect, but it never resets SparkSessionProvider._sparkSession. If a suite overrides reuseContextIfPossible = true, the base DataFrameSuiteBase.afterAll will not clear _sparkSession, leaving a closed/invalid SparkSession in the provider that later suites may reuse. Consider restoring the previous SparkSessionProvider value (saved in beforeAll) or explicitly setting SparkSessionProvider._sparkSession = null in afterAll regardless of reuseContextIfPossible.
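The save/restore pattern the review suggests can be sketched with a stand-in provider object. This is a minimal, self-contained sketch: the real `SparkSessionProvider._sparkSession` holds a `SparkSession`, while a `String` stands in here so the sketch stays runnable without Spark, and `SessionProvider` / `ConnectEnabledSketch` are hypothetical names.

```scala
// Stand-in for SparkSessionProvider: a mutable slot later suites may reuse.
object SessionProvider {
  var session: String = null // stands in for SparkSessionProvider._sparkSession
}

trait ConnectEnabledSketch {
  private var previousSession: String = null

  def beforeAll(): Unit = {
    previousSession = SessionProvider.session   // save whatever was installed
    SessionProvider.session = "connect-session" // install the Connect session
  }

  def afterAll(): Unit = {
    // Restore the saved value so suites that run later (including ones with
    // reuseContextIfPossible = true) never pick up a closed Connect session.
    SessionProvider.session = previousSession
  }
}
```

Restoring the saved value (rather than always nulling) also covers the case where a classic session was already installed before the Connect suite ran.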
Force-pushed e774c67 to 23020b0
Add `ConnectEnabled` mixin trait: just add `with ConnectEnabled` to any
existing DataFrameSuiteBase test to route operations through Spark Connect.
Usage:
```scala
class MyTest extends ScalaDataFrameSuiteBase with ConnectEnabled {
  test("works through Connect") {
    assertDataFrameEquals(expected, result) // through Connect!
  }
}
```
Architecture:
- connect-client-shaded sub-project: shades spark-connect-client-jvm via
sbt-assembly, relocating org.apache.spark.sql.** to avoid classpath
conflicts with spark-sql. Exposes ConnectBridge with classloader-safe
types (primitives, arrays) for cross-classpath communication.
- ConnectEnabled mixin (in core 3.5/ source dir):
  - Injects Connect gRPC port into server SparkConf via abstract override
  - Starts SparkConnectService on the existing SparkContext
  - On Spark 4.0+: replaces SparkSessionProvider with a Connect session
    (unified SparkSession API has .remote())
  - On Spark 3.5: uses shaded ConnectBridge for Connect validation
    (classic session stays due to classpath conflicts)
  - isConnectSession: true if primary session is Connect (4.0+)
  - isConnectServerActive: true if gRPC server reachable (3.5+)
  - Only assertDataFrameApproximateEquals needs a collect-based override
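The class relocation described above can be sketched as an sbt-assembly shade rule. This is a hedged `build.sbt` fragment, not the PR's actual settings: the `shadedconnect` prefix and the `sparkVersion` value are illustrative names assumed to be defined elsewhere in the build.

```scala
// build.sbt fragment (sketch): relocate the Connect client's copy of
// org.apache.spark.sql so it cannot clash with spark-sql on the test
// classpath. "shadedconnect" is a hypothetical relocation prefix.
lazy val connectClientShaded = (project in file("connect-client-shaded"))
  .settings(
    libraryDependencies +=
      "org.apache.spark" %% "spark-connect-client-jvm" % sparkVersion,
    assembly / assemblyShadeRules := Seq(
      ShadeRule.rename("org.apache.spark.sql.**" -> "shadedconnect.sql.@1").inAll
    ),
    // Export the shaded assembly jar so the core project sees the relocated
    // classes rather than the originals.
    exportJars := true,
    Compile / packageBin := assembly.value
  )
```

Shading only the client's `org.apache.spark.sql` tree is what lets `ConnectBridge` talk gRPC while the classic `spark-sql` classes remain untouched on the same classpath.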
Also fixes assertDataFrameDataEquals in DataFrameSuiteBaseLike to use
pure DataFrame ops instead of .rdd.cache/.unpersist.
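The comparison logic behind that fix (count both sides, group duplicates, compare the counts) can be illustrated on plain Scala collections. This is a sketch of the idea only; the real `assertDataFrameDataEquals` does the same thing with DataFrame `count`, `groupBy`, and `join` operations, and `dataEquals` is a hypothetical helper name.

```scala
// Order-independent, duplicate-aware equality: rows are grouped and counted
// on each side (the DataFrame version uses groupBy + count), then the two
// count maps are compared (the DataFrame version joins them).
def dataEquals[T](expected: Seq[T], result: Seq[T]): Boolean = {
  if (expected.size != result.size) return false // cheap count check first
  val expectedCounts = expected.groupBy(identity).map { case (k, v) => k -> v.size }
  val resultCounts   = result.groupBy(identity).map { case (k, v) => k -> v.size }
  expectedCounts == resultCounts
}
```

Ordering differences do not matter, but multiplicity does: `Seq(1, 2, 2, 3)` matches `Seq(3, 2, 1, 2)` while `Seq(1, 2)` does not match `Seq(1, 1)`.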
Tests:
- Bridge connectivity test (works on 3.5+ and 4.0+)
- .rdd smoke test gated by assume(isConnectSession) for 4.0+
- Full DataFrame assertion suite through Connect
Build changes:
- New connect-client-shaded sub-project with sbt-assembly shade rules
- spark-connect (server) for 3.5+, spark-connect-client-jvm for 4.0+
- 3.5/ source directory for Connect code
- Fix >= for spark-sql-api dep (was > "4.0.0", now >= "4.0.0")
https://claude.ai/code/session_016uRqdQ8z5pj4UatnJYMySn
Force-pushed 23020b0 to fa2ae06
- Save/restore SparkSessionProvider._sparkSession in beforeAll/afterAll so later suites (or reuseContextIfPossible=true) don't see a closed Connect session
- Add a test that executes the same query via the classic session AND the shaded Connect bridge, then asserts the results match; this validates Connect routing on both 3.5 and 4.0 (not just DataFrame ops that would pass either way)

https://claude.ai/code/session_016uRqdQ8z5pj4UatnJYMySn
assertDataFrameDataEquals now uses cache()/unpersist() on the DataFrames themselves instead of .rdd.cache/.rdd.unpersist, retaining the prior performance characteristics (avoiding re-scanning the inputs across the count, groupBy, and join steps) while staying Connect-compatible. ConnectEnabled's assertDataFrameApproximateEquals override likewise adds cache()/unpersist() around its collect-based comparison.

https://claude.ai/code/session_016uRqdQ8z5pj4UatnJYMySn
The connectClientShaded sub-project was unconditionally setting
exportJars=true and packageBin=assembly.value, which forced sbt-assembly
to run for ALL Spark versions including pre-3.5 where the sub-project
has no sources or dependencies. This caused pre-3.5 CI builds to hang.
Now uses System.getProperty("sparkVersion") at build.sbt load time to
conditionally apply assembly settings only for Spark 3.5+. Pre-3.5
builds use default packaging (empty JAR, fast).
https://claude.ai/code/session_016uRqdQ8z5pj4UatnJYMySn
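The load-time gating described in this commit can be sketched as follows. This is a hedged `build.sbt` fragment under assumed names: the `sparkVersion` property key comes from the commit message, but the default value, `isSpark35Plus`, and `shadedAssemblySettings` are illustrative.

```scala
// build.sbt fragment (sketch): read the Spark version once, when the build
// definition loads, and only wire the (slow) assembly task into packaging
// for Spark 3.5+. Pre-3.5 keeps default packaging: an empty jar, built fast.
val sparkVersionStr = System.getProperty("sparkVersion", "3.5.0")

val isSpark35Plus: Boolean = {
  val Array(major, minor) = sparkVersionStr.split("\\.").take(2).map(_.toInt)
  major > 3 || (major == 3 && minor >= 5)
}

lazy val shadedAssemblySettings: Seq[Setting[_]] =
  if (isSpark35Plus) Seq(
    exportJars := true,
    Compile / packageBin := assembly.value // shaded assembly replaces packageBin
  ) else Seq.empty                         // pre-3.5: default (empty jar) packaging
```

Because the choice happens when `build.sbt` is evaluated rather than inside a task, pre-3.5 builds never even reference the assembly task graph.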
- ConnectEnabled.afterAll: only restore SparkSessionProvider if we actually replaced it in beforeAll. Previously on 3.5 (None branch, _previousSession stays null) afterAll would null out the classic session set by DataFrameSuiteBase.sqlBeforeAllTestCases
- ConnectBridge.start: synchronized + closes any prior session so repeated start() calls don't leak
- ConnectBridge.stop: synchronized for consistency
- Remove unused ConnectBridge.getSchemaJson method
- Fix import ordering in ConnectEnabled (com.holdenkarau after org.*)
- Clarify isConnectSession docstring
- Update SampleSparkConnectTest class-level doc to reflect that the bridge does validate gRPC routing on 3.5 (previously it said "tests still pass, just not through Connect", which was misleading)

https://claude.ai/code/session_016uRqdQ8z5pj4UatnJYMySn
Summary
This PR adds comprehensive support for testing Spark code through the Spark Connect protocol. It introduces new test base classes and DataFrame comparison utilities that work with Spark Connect's gRPC-based architecture, which doesn't support RDD operations.
Key Changes
ConnectSuiteBase trait: A new test base class that starts and stops a local Spark Connect server and provides a Connect client SparkSession.

ConnectDataFrameComparisons trait: DataFrame comparison utilities that:
- Use `collect()` and DataFrame operations instead of RDDs
- `assertConnectDataFrameEquals()` for exact equality checks
- `assertConnectDataFrameApproximateEquals()` with numeric tolerance support
- `assertConnectDataFrameNoOrderEquals()` for order-independent comparison
- `assertConnectDataFrameDataEquals()` for schema-agnostic data comparison

SampleSparkConnectTest: A sample test suite demonstrating Connect execution and assertion behavior.
Build configuration updates: adds the Spark Connect dependencies, wires in the 3.5 source/test directories, and adjusts version gating.
Notable Implementation Details
- Uses `SparkSession.builder().remote()` to create Connect client sessions
- Uses `DataFrameComparisons.approxEquals()` for numeric tolerance comparisons

https://claude.ai/code/session_016uRqdQ8z5pj4UatnJYMySn