Skip to content

Add Spark Connect support for DataFrame testing#491

Merged
holdenk merged 5 commits into
mainfrom
claude/spark-connect-local-testing-UD4Ou
Apr 12, 2026
Merged

Add Spark Connect support for DataFrame testing#491
holdenk merged 5 commits into
mainfrom
claude/spark-connect-local-testing-UD4Ou

Conversation

@holdenk
Copy link
Copy Markdown
Owner

@holdenk holdenk commented Mar 30, 2026

Summary

This PR adds comprehensive support for testing Spark code through the Spark Connect protocol. It introduces new test base classes and DataFrame comparison utilities that work with Spark Connect's gRPC-based architecture, which doesn't support RDD operations.

Key Changes

  • ConnectSuiteBase trait: A new test base class that:

    • Starts a local Spark instance with the Connect gRPC server enabled
    • Provides a Connect client SparkSession for tests
    • Automatically manages server lifecycle (startup/shutdown)
    • Finds and uses a free port for the gRPC server
    • Ensures DataFrame/SQL code works when deployed against a Spark Connect endpoint
  • ConnectDataFrameComparisons trait: DataFrame comparison utilities that:

    • Replace RDD-based comparisons with collect() and DataFrame operations
    • Provide assertConnectDataFrameEquals() for exact equality checks
    • Provide assertConnectDataFrameApproximateEquals() with numeric tolerance support
    • Provide assertConnectDataFrameNoOrderEquals() for order-independent comparison
    • Provide assertConnectDataFrameDataEquals() for schema-agnostic data comparison
    • Show up to 10 unequal rows in failure messages for debugging
  • SampleSparkConnectTest: Comprehensive test suite demonstrating:

    • DataFrame creation and querying through Connect
    • SQL query execution
    • All comparison assertion methods
    • Proper error detection when assertions fail
  • Build configuration updates:

    • Added Spark Connect dependencies (server + client) for Spark 3.5+
    • Updated source directory configuration to include 3.5/scala sources
    • Fixed version comparison logic (>= instead of >) for Spark 4.0.0

Notable Implementation Details

  • Uses SparkSession.builder().remote() to create Connect client sessions
  • Leverages DataFrameComparisons.approxEquals() for numeric tolerance comparisons
  • Implements order-independent comparison using DataFrame groupBy and full outer join
  • Properly handles resource cleanup in afterAll() with null checks for Spark 4.0 compatibility

https://claude.ai/code/session_016uRqdQ8z5pj4UatnJYMySn

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: ff12f066dc

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread core/src/main/4.0/scala/com/holdenkarau/spark/testing/ConnectSuiteBase.scala Outdated
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds Spark Connect–compatible testing infrastructure so DataFrame/SQL test suites can run through the Connect gRPC protocol (without relying on .rdd).

Changes:

  • Introduces SparkConnectSuiteBase to start/stop a local Spark Connect server and provide a Connect client SparkSession.
  • Adds ConnectDataFrameComparisons with Connect-safe DataFrame equality/approx/no-order comparison helpers (using collect() and DataFrame ops).
  • Updates build.sbt to include Spark Connect dependencies and to include 3.5/scala sources/tests for Spark 3.5+.

Reviewed changes

Copilot reviewed 3 out of 4 changed files in this pull request and generated 3 comments.

File Description
core/src/main/3.5/scala/com/holdenkarau/spark/testing/ConnectSuiteBase.scala New Spark Connect test base trait managing server + client sessions.
core/src/main/3.5/scala/com/holdenkarau/spark/testing/ConnectDataFrameComparisons.scala New DataFrame comparison utilities compatible with Spark Connect.
core/src/test/3.5/scala/com/holdenkarau/spark/testing/SampleSparkConnectTest.scala Example test suite demonstrating Connect execution and assertion behavior.
build.sbt Adds Spark Connect deps and wires 3.5 source/test directories; adjusts version gating.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread core/src/main/4.0/scala/com/holdenkarau/spark/testing/ConnectSuiteBase.scala Outdated
Comment thread core/src/main/4.0/scala/com/holdenkarau/spark/testing/ConnectSuiteBase.scala Outdated
@holdenk holdenk force-pushed the claude/spark-connect-local-testing-UD4Ou branch 5 times, most recently from 45f2377 to 72074b4 Compare March 31, 2026 00:25
@holdenk holdenk requested a review from Copilot March 31, 2026 00:35
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 3 out of 4 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread core/src/main/2.4/scala/com/holdenkarau/spark/testing/DataFrameSuiteBase.scala Outdated
Comment on lines +79 to +88
override def afterAll(): Unit = {
try {
if (_connectSession != null) {
_connectSession.close()
_connectSession = null
}
SparkConnectService.stop()
} finally {
super.afterAll()
}
Copy link

Copilot AI Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConnectEnabled.afterAll closes the Connect session and stops Spark Connect, but it never resets SparkSessionProvider._sparkSession. If a suite overrides reuseContextIfPossible = true, the base DataFrameSuiteBase.afterAll will not clear _sparkSession, leaving a closed/invalid SparkSession in the provider that later suites may reuse. Consider restoring the previous SparkSessionProvider value (saved in beforeAll) or explicitly setting SparkSessionProvider._sparkSession = null in afterAll regardless of reuseContextIfPossible.

Copilot uses AI. Check for mistakes.
@holdenk holdenk force-pushed the claude/spark-connect-local-testing-UD4Ou branch 2 times, most recently from e774c67 to 23020b0 Compare March 31, 2026 16:33
Add `ConnectEnabled` mixin trait: just add `with ConnectEnabled` to any
existing DataFrameSuiteBase test to route operations through Spark Connect.

Usage:
  class MyTest extends ScalaDataFrameSuiteBase with ConnectEnabled {
    test("works through Connect") {
      assertDataFrameEquals(expected, result)  // through Connect!
    }
  }

Architecture:
- connect-client-shaded sub-project: shades spark-connect-client-jvm via
  sbt-assembly, relocating org.apache.spark.sql.** to avoid classpath
  conflicts with spark-sql. Exposes ConnectBridge with classloader-safe
  types (primitives, arrays) for cross-classpath communication.

- ConnectEnabled mixin (in core 3.5/ source dir):
  - Injects Connect gRPC port into server SparkConf via abstract override
  - Starts SparkConnectService on the existing SparkContext
  - On Spark 4.0+: replaces SparkSessionProvider with a Connect session
    (unified SparkSession API has .remote())
  - On Spark 3.5: uses shaded ConnectBridge for Connect validation
    (classic session stays due to classpath conflicts)
  - isConnectSession: true if primary session is Connect (4.0+)
  - isConnectServerActive: true if gRPC server reachable (3.5+)
  - Only assertDataFrameApproximateEquals needs a collect-based override

Also fixes assertDataFrameDataEquals in DataFrameSuiteBaseLike to use
pure DataFrame ops instead of .rdd.cache/.unpersist.

Tests:
- Bridge connectivity test (works on 3.5+ and 4.0+)
- .rdd smoke test gated by assume(isConnectSession) for 4.0+
- Full DataFrame assertion suite through Connect

Build changes:
- New connect-client-shaded sub-project with sbt-assembly shade rules
- spark-connect (server) for 3.5+, spark-connect-client-jvm for 4.0+
- 3.5/ source directory for Connect code
- Fix >= for spark-sql-api dep (was > "4.0.0", now >= "4.0.0")

https://claude.ai/code/session_016uRqdQ8z5pj4UatnJYMySn
@holdenk holdenk force-pushed the claude/spark-connect-local-testing-UD4Ou branch from 23020b0 to fa2ae06 Compare March 31, 2026 17:22
claude added 4 commits March 31, 2026 19:09
- Save/restore SparkSessionProvider._sparkSession in beforeAll/afterAll
  so later suites (or reuseContextIfPossible=true) don't see a closed
  Connect session
- Add test that executes same query via classic session AND shaded
  Connect bridge, asserts results match -- this validates Connect routing
  on both 3.5 and 4.0 (not just DataFrame ops that would pass either way)

https://claude.ai/code/session_016uRqdQ8z5pj4UatnJYMySn
assertDataFrameDataEquals now uses cache()/unpersist() on DataFrames
instead of .rdd.cache/.rdd.unpersist, retaining the prior performance
characteristics (avoid re-scanning inputs across count, groupBy, join)
while staying Connect-compatible.

ConnectEnabled's assertDataFrameApproximateEquals override also adds
cache()/unpersist() around the collect-based comparison.

https://claude.ai/code/session_016uRqdQ8z5pj4UatnJYMySn
The connectClientShaded sub-project was unconditionally setting
exportJars=true and packageBin=assembly.value, which forced sbt-assembly
to run for ALL Spark versions including pre-3.5 where the sub-project
has no sources or dependencies. This caused pre-3.5 CI builds to hang.

Now uses System.getProperty("sparkVersion") at build.sbt load time to
conditionally apply assembly settings only for Spark 3.5+. Pre-3.5
builds use default packaging (empty JAR, fast).

https://claude.ai/code/session_016uRqdQ8z5pj4UatnJYMySn
- ConnectEnabled.afterAll: only restore SparkSessionProvider if we
  actually replaced it in beforeAll. Previously on 3.5 (None branch,
  _previousSession stays null) afterAll would null out the classic
  session set by DataFrameSuiteBase.sqlBeforeAllTestCases
- ConnectBridge.start: synchronized + closes any prior session so
  repeated start() calls don't leak
- ConnectBridge.stop: synchronized for consistency
- Remove unused ConnectBridge.getSchemaJson method
- Fix import ordering in ConnectEnabled (com.holdenkarau after org.*)
- Clarify isConnectSession docstring
- Update SampleSparkConnectTest class-level doc to reflect that the
  bridge does validate gRPC routing on 3.5 (previously said "tests still
  pass, just not through Connect" which was misleading)

https://claude.ai/code/session_016uRqdQ8z5pj4UatnJYMySn
@holdenk holdenk merged commit e0ef5ec into main Apr 12, 2026
18 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants