Remote shuffle4#5
Open
karuppayya wants to merge 324 commits intomasterfrom
Open
Conversation
729a647 to
641f4e3
Compare
a40be17 to
01d0f68
Compare
…g batches ### What changes were proposed in this pull request? The following changes are added to support running queries in Real-time Mode 1. RealTimeScanExec. This is a new scan added for Real-time mode which enables queries to run batches for a fix time duration. It is also allows us to collect the offsets tasks have processed up to at the end of the batch. 2. Offset management changes needed for Real-time Mode. In Real-time Mode, offsets that represent the point that tasks has processed up to are collected at the end of a batch via RealTimeScanExec and sent to the driver. The driver will then write the offsets to the offset log to persist / checkpoint the progress of the batch. The commit log for the batch will also be written afterwards to ensure compatibility with existing streaming queries. This differs from existing streaming queries as offsets that a batch will process up to are determined and written to the offset log before executing the batch. ### Why are the changes needed? With the changes in this PR, simple single stage stateless queries can be executed in Real-time Mode. With this PR, we can run queries in Real-time Mode that read from a MemorySource (LowLatencyMemorySource), do some processing, and write the result to MemorySink (ContinuousMemorySink). ### Does this PR introduce _any_ user-facing change? Yes, adds functionality to run queries in Real-time Mode ### How was this patch tested? Many tests are added in this PR. Additional tests will be added in a subsequent PR to limit the size of this PR. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#52620 from jerrypeng/SPARK-53915. Authored-by: Jerry Peng <jerry.peng@databricks.com> Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
…`Python 3.14` Dockefile ### What changes were proposed in this pull request? This PR aims to install `pyarrow/torch/torchvision` packages to Python 3.14 Dockefile. After this PR, the only missing dependency will be `MLFlow`. ### Why are the changes needed? Finally, they supports `Python 3.14` officially. - https://pypi.org/project/pyarrow/22.0.0/ (2025-10-24) - https://pypi.org/project/torch/2.9.0/ ### Does this PR introduce _any_ user-facing change? No, this is an infra change. ### How was this patch tested? Manual review. After merging, `Python 3.14` CI will provide a test coverage for this. https://github.com/apache/spark/actions/workflows/build_python_3.14.yml ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52751 from dongjoon-hyun/SPARK-53835. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…lated configures ### What changes were proposed in this pull request? Update the documents of arrow-batching related configures ### Why are the changes needed? remove ``` This configuration is not effective for the grouping API such as DataFrame(.cogroup).groupby.applyInPandas because each group becomes each ArrowRecordBatch. ``` to reflect recent changes on arrow-batching ### Does this PR introduce _any_ user-facing change? yes, doc-only changes ### How was this patch tested? CI ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#52753 from zhengruifeng/update_doc_max_records. Authored-by: Ruifeng Zheng <ruifengz@apache.org> Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
…methods ### What changes were proposed in this pull request? This PR implements 107 simple methods of `SparkConnectDatabaseMetaData` defined in `java.sql.DatabaseMetaData` interface. ### Why are the changes needed? Improve the API implementation coverage of the Connect JDBC driver. There are 179 methods defined in `java.sql.DatabaseMetaData` interface, after this patch, the coverage reaches 144/179. ### Does this PR introduce _any_ user-facing change? No, new feature. ### How was this patch tested? UTs are added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52741 from pan3793/SPARK-54013. Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: yangjie01 <yangjie01@baidu.com>
…alid Map ### What changes were proposed in this pull request? This PR fixes a bug from apache#52557, where we are reading an additional field if all the requested fields of a struct are missing from the Parquet file. We used to always pick the cheapest leaf column of the struct. However, if this leaf was inside a Map column, then we'd generate an invalid Map type like the following: ``` optional group _1 (MAP) { repeated group key_value { required boolean key; } } ``` Since there is no `value` field in this group, we'd fail later when trying to convert this Parquet type to a Spark type. This PR changes the additional field selection logic to enforce selecting a field from both the key and the value of the map, which can now give us a type like following: ``` optional group _1 (MAP) { repeated group key_value { required boolean key; optional group value { optional int32 _2; } } } ``` ### Why are the changes needed? To fix a critical bug where we would throw an exception when reading a Parquet file. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52758 from ZiyaZa/fix-missing-struct-with-map. Authored-by: Ziya Mukhtarov <ziya5muxtarov@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request? Add new golden file tests for analysis edge-cases discovered during Analyzer support and development. ### Why are the changes needed? Harden Spark testing coverage. ### Does this PR introduce _any_ user-facing change? Test-only change. ### How was this patch tested? Adding new golden files. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52734 from vladimirg-db/vladimir-golubev_data/new-golden-files-for-analyzer-edge-cases-2. Authored-by: Vladimir Golubev <vladimir.golubev@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…DependencyOverrides` in `SparkBuild.scala` ### What changes were proposed in this pull request? This PR proposes to retrieve dependency version from `pom.xml` for `DependencyOverrides` in `SparkBuild.scala`. ### Why are the changes needed? Currently, version strings for some dependencies are hard coded in `DependencyOverrides` in `SparkBuild`, so developers need to keep consistent with version strings specified in `pom.xml` manually. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52760 from sarutak/version-from-pom. Authored-by: Kousuke Saruta <sarutak@amazon.co.jp> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…ross Catalyst ### What changes were proposed in this pull request? Added Geography and Geometry accessors to core row/column interfaces, extended codegen and physical type handling to properly recognize geospatial types, enabled writing/reading of Geography and Geometry values in unsafe writer, and added other necessary plumbing for Geography and Geometry in projection/row utilities in order to thread through the new accessors. Note that the GEOMETRY and GEOGRAPHY physical types were recently included to Spark SQL as part of: apache#52629. ### Why are the changes needed? To provide first-class support for GEOGRAPHY and GEOMETRY within Catalyst. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added new tests to: - `GenerateUnsafeProjectionSuite.scala` - `UnsafeRowWriterSuite.scala` ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52723 from uros-db/geo-interfaces. Authored-by: Uros Bojanic <uros.bojanic@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ZGC or ShenandoahGC and ON_HEAP are used ### What changes were proposed in this pull request? If ZGC or ShenandoahGC and ON_HEAP are used, we should consider the long array object header size. Test ZGC and ShenandoahGC with and without this optimization: Test code: https://gist.githubusercontent.com/wankunde/b7cf073b31f3a4bf5b525ab4a6b1f700/raw/85db977f5ef8609db27ced49d10ace9a248433c1/TestBlockSize.java Test result: | | Max allocated size without optimization | Max allocated size with optimization | |--------------|-----------------------------------------|---------------------------------------| | G1GC | 817889280 | 1019211984 | | ZGC | 671088640 | 1006629120 | | ShenandoahGC | 855638016 | 964686240 | Detail test result: https://gist.github.com/wankunde/6f3b6ed8abaa247307469ab5a4fae2bb ### Why are the changes needed? Optimize spark memory usage. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manual test ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#52754 from wankunde/zgc2. Lead-authored-by: WanKun <wankun@bilibili.com> Co-authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…cution classes ### What changes were proposed in this pull request? Introduce internal server-side `Geography` and `Geometry` execution classes in catalyst. Note that the corresponding low-level physical holders (`GeographyVal` and `GeometryVal`) for geospatial types have been previously added as part of apache#52629. ### Why are the changes needed? Establishing a clear internal execution layer for geospatial operations in catalyst and unblocking downstream work for implementing built-in ST functions and geospatial storage support. ### Does this PR introduce _any_ user-facing change? No. This PR does not introduce any new public API, catalyst expressions, nor user-facing SQL functions. Those will be added in the future. ### How was this patch tested? Added new Java test suites for the execution classes: - `GeographyExecutionSuite` - `GeometryExecutionSuite` ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52737 from uros-db/geo-server-classes. Authored-by: Uros Bojanic <uros.bojanic@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…s for Geometry and Geography ### What changes were proposed in this pull request? This PR follows up on apache#51204, evolving `expressions.proto` to also reserve two numbers in Spark Connect for geospatial types (`Geometry` and `Geography`). ### Why are the changes needed? Geospatial types need to have their proto file representation in expressions as well, and we need to reserve it. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Regenerated Python bindings are verified via the appropriate PySpark Connect unit & integration tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52746 from uros-db/geo-exprs-proto. Authored-by: Uros Bojanic <uros.bojanic@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
4b325b9 to
65081c1
Compare
…` to `500` to make a test in `KafkaMicroBatchV1SourceWithConsumerSuite` stable ### What changes were proposed in this pull request? This PR proposes to increase `kafka.metadata.max.age.ms` from `1` to `500` to make a test `Query with Trigger.AvailableNow should throw error when topic partitions got unavailable during subsequent batches` in `KafkaMicroBatchV1SourceWithConsumerSuite` stable. Recently, this test frequently fails. https://github.com/apache/spark/actions/runs/18872699886/job/53854858890 https://github.com/apache/spark/actions/runs/18551681640/job/52880215577 ``` [info] - Query with Trigger.AvailableNow should throw error when topic partitions got unavailable during subsequent batches *** FAILED *** (1 minute) [info] java.lang.AssertionError: assertion failed: Exception tree doesn't contain the expected exception with message: Some of partitions in Kafka topic(s) have been lost during running query with Trigger.AvailableNow. [info] org.scalatest.exceptions.TestFailedException: isPropagated was false Partition [topic-41, 1] metadata not propagated after timeout [info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) [info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) [info] at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231) [info] at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295) [info] at org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$waitUntilMetadataIsPropagated$1(KafkaTestUtils.scala:614) [info] at org.scalatest.enablers.Retrying$$anon$4.makeAValiantAttempt$1(Retrying.scala:184) [info] at org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:196) [info] at org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:226) [info] at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:348) [info] at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:347) [info] at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:457) [info] at org.apache.spark.sql.kafka010.KafkaTestUtils.waitUntilMetadataIsPropagated(KafkaTestUtils.scala:613) [info] at org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$createTopic$1(KafkaTestUtils.scala:378) [info] at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:256) [info] at org.apache.spark.sql.kafka010.KafkaTestUtils.createTopic(KafkaTestUtils.scala:377) [info] at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$11(KafkaMicroBatchSourceSuite.scala:352) [info] at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$11$adapted(KafkaMicroBatchSourceSuite.scala:349) [info] at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.callBatchWriter(ForeachBatchSink.scala:56) [info] at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:49) [info] at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runBatch$19(MicroBatchExecution.scala:1063) [info] at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:176) [info] at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:284) [info] at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:138) [info] at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94) [info] at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112) [info] at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106) [info] at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111) [info] at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:138) [info] at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:307) [info] at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:137) [info] at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804) [info] at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:91) [info] at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:249) [info] at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runBatch$18(MicroBatchExecution.scala:1054) [info] at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.reportTimeTaken(ProgressReporter.scala:200) [info] at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.runBatch(MicroBatchExecution.scala:1054) [info] at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:513) [info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) [info] at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.reportTimeTaken(ProgressReporter.scala:200) [info] at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:478) [info] at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:458) [info] at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:458) [info] at org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch(TriggerExecutor.scala:40) [info] at org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:38) [info] at org.apache.spark.sql.execution.streaming.runtime.MultiBatchExecutor.runOneBatch(TriggerExecutor.scala:60) [info] at org.apache.spark.sql.execution.streaming.runtime.MultiBatchExecutor.execute(TriggerExecutor.scala:65) [info] at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:458) [info] at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:347) [info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) [info] at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804) [info] at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.org$apache$spark$sql$execution$streaming$runtime$StreamExecution$$runStream(StreamExecution.scala:307) [info] at org.apache.spark.sql.execution.streaming.runtime.StreamExecution$$anon$1.run(StreamExecution.scala:230) [info] at scala.Predef$.assert(Predef.scala:279) [info] at org.apache.spark.TestUtils$.assertExceptionMsg(TestUtils.scala:198) [info] at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$9(KafkaMicroBatchSourceSuite.scala:374) [info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) [info] at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127) [info] at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282) [info] at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231) [info] at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230) [info] at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:68) [info] at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:154) [info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) [info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) [info] at org.scalatest.Transformer.apply(Transformer.scala:22) [info] at org.scalatest.Transformer.apply(Transformer.scala:20) [info] at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226) [info] at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:226) [info] at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224) [info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236) [info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) [info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236) [info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218) [info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:68) [info] at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234) [info] at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227) [info] at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:68) [info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269) [info] at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413) [info] at scala.collection.immutable.List.foreach(List.scala:323) [info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) [info] at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396) [info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475) [info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269) [info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268) [info] at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564) [info] at org.scalatest.Suite.run(Suite.scala:1114) [info] at org.scalatest.Suite.run$(Suite.scala:1096) [info] at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564) [info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273) [info] at org.scalatest.SuperEngine.runImpl(Engine.scala:535) [info] at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273) [info] at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272) [info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:68) [info] at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213) [info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210) [info] at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208) [info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68) [info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321) [info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517) [info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414) [info] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [info] at java.base/java.lang.Thread.run(Thread.java:840) ``` I found the cause is [getPartitionInfo](https://github.com/apache/spark/blob/4622e8584e37289b3e1c03d4694e6431cc10895d/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala#L604) which is invoked through [createTopic](https://github.com/apache/spark/blob/4622e8584e37289b3e1c03d4694e6431cc10895d/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala#L352) can consistently returns `None`. This means that the metadata is unexpectedly expired. I tried to increased `kafka.metadata.max.age.ms` to `10`, `100` and `500` and continued to run this test for 20 min. As a result, I found this test never fails with `500` so I chose this value. ### Why are the changes needed? For test stability. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? As I mentioned above, this test never fails with this change on my environment even if I continued to run this test for 20 min. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52766 from sarutak/SPARK-54058. Authored-by: Kousuke Saruta <sarutak@amazon.co.jp> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
… stateless streaming workload specifically ### What changes were proposed in this pull request? This PR proposes to introduce a new SQL config to control AQE for stateless streaming workload. ### Why are the changes needed? We had a post review comment about the impact of turning off AQE config across batch and streaming. apache#52642 (comment) To reduce the side effect of turning off AQE for stateless streaming workload, we demand a new SQL config to control only to stateless streaming workload while leaving the AQE for batch query. ### Does this PR introduce _any_ user-facing change? Yes, new config is proposed to control the AQE enablement for stateless streaming query. ### How was this patch tested? New UT. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52752 from HeartSaVioR/SPARK-53941-followup. Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
e1756bc to
ef18b00
Compare
### What changes were proposed in this pull request?
Uses a difference error message when kill-on-idle-timeout.
### Why are the changes needed?
Currently the error message when kill-on-idle-timeout is same as when the Python worker crashes.
```py
>>> from pyspark.sql.functions import udf
>>> import time
>>>
>>> udf
... def f(x):
... time.sleep(2)
... return str(x)
...
>>> spark.conf.set("spark.sql.execution.pyspark.udf.idleTimeoutSeconds", "1s")
>>> spark.conf.set("spark.sql.execution.pyspark.udf.killOnIdleTimeout", "true")
>>>
>>> spark.range(1).select(f("id")).show()
25/10/27 16:31:16 WARN PythonUDFWithNamedArgumentsRunner: Idle timeout reached for Python worker (timeout: 1 seconds). No data received from the worker process: handle.map(_.isAlive) = Some(true), channel.isConnected = true, channel.isBlocking = false, selector.isOpen = true, selectionKey.isValid = true, selectionKey.interestOps = 1, hasInputs = false
25/10/27 16:31:16 WARN PythonUDFWithNamedArgumentsRunner: Terminating Python worker process due to idle timeout (timeout: 1 seconds)
25/10/27 16:31:16 ERROR Executor: Exception in task 15.0 in stage 0.0 (TID 15)
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed). Consider setting 'spark.sql.execution.pyspark.udf.faulthandler.enabled' or'spark.python.worker.faulthandler.enabled' configuration to 'true' for the better Python traceback.
...
```
It should show a different message to distinguish the cause:
```py
25/10/27 16:34:55 WARN PythonUDFWithNamedArgumentsRunner: Idle timeout reached for Python worker (timeout: 1 seconds). No data received from the worker process: handle.map(_.isAlive) = Some(true), channel.isConnected = true, channel.isBlocking = false, selector.isOpen = true, selectionKey.isValid = true, selectionKey.interestOps = 1, hasInputs = false
25/10/27 16:34:55 WARN PythonUDFWithNamedArgumentsRunner: Terminating Python worker process due to idle timeout (timeout: 1 seconds)
25/10/27 16:34:55 ERROR Executor: Exception in task 15.0 in stage 0.0 (TID 15)
org.apache.spark.api.python.PythonWorkerException: Python worker process terminated due to idle timeout (timeout: 1 seconds)
...
```
### Does this PR introduce _any_ user-facing change?
Yes, the error message when kill-on-idle-timeout is different.
### How was this patch tested?
Modified the related tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes apache#52749 from ueshin/issues/SPARK-54047/kill_on_idle_timeout.
Authored-by: Takuya Ueshin <ueshin@databricks.com>
Signed-off-by: Takuya Ueshin <ueshin@databricks.com>
### What changes were proposed in this pull request? Skip removing `python/coverage.xml` in `run-pip-tests` ### Why are the changes needed? `run-pip-tests` will clear existing coverage data from previous tests. For example: https://github.com/apache/spark/actions/runs/16342241585/job/46167521633 has no coverage data uploaded because the file is removed after the test. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This is a script change, should reflect in github actions. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#51552 from gaogaotiantian/keep-coverage-data. Authored-by: Tian Gao <gaogaotiantian@hotmail.com> Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
…hon 3.14
### What changes were proposed in this pull request?
This PR aims to skip `test_in_memory_data_source` in Python 3.14 for now in order to reveal other failures from the CI.
### Why are the changes needed?
The test case fails in both `classic` and `connect` modes and SPARK-54065 is created to fix it after further investigation in both modes.
- SPARK-54065 Fix `test_in_memory_data_source` in Python 3.14
```
======================================================================
ERROR [0.007s]: test_in_memory_data_source (pyspark.sql.tests.test_python_datasource.PythonDataSourceTests.test_in_memory_data_source)
```
```
======================================================================
ERROR [0.014s]: test_in_memory_data_source (pyspark.sql.tests.connect.test_parity_python_datasource.PythonDataSourceParityTests.test_in_memory_data_source)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs. Manually test on Python 3.14.
```
$ python/run-tests --parallelism 1 --testnames pyspark.sql.tests.test_python_datasource --python-executables python3
Running PySpark tests. Output is in /Users/dongjoon/APACHE/spark-merge/python/unit-tests.log
Will test against the following Python executables: ['python3']
Will test the following Python tests: ['pyspark.sql.tests.test_python_datasource']
python3 python_implementation is CPython
python3 version is: Python 3.14.0
Starting test(python3): pyspark.sql.tests.test_python_datasource (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/0b6ce5cf-6fca-4146-9fdc-739521349dce/python3__pyspark.sql.tests.test_python_datasource__0iy64w4h.log)
Finished test(python3): pyspark.sql.tests.test_python_datasource (21s) ... 1 tests were skipped
Tests passed in 21 seconds
Skipped tests in pyspark.sql.tests.test_python_datasource with python3:
test_in_memory_data_source (pyspark.sql.tests.test_python_datasource.PythonDataSourceTests.test_in_memory_data_source) ... skip (0.000s)
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes apache#52769 from dongjoon-hyun/SPARK-54066.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request? This PR aims to skip `test_to_feather` in Python 3.14 test until SPARK-54068 fixes it in the `Connect` mode. ### Why are the changes needed? We have `test_to_feather` test coverage for both `classic` and `connect` mode, but only `connect` mode fails currently in `Python 3.14`. - https://github.com/apache/spark/actions/workflows/build_python_3.14.yml - https://github.com/apache/spark/actions/runs/18864049006/job/53876343283 We had better focus on `classic` first by skipping this now. SPARK-54068 is created to track it for `connect` mode. ### Does this PR introduce _any_ user-facing change? No, this is a test only change. ### How was this patch tested? Pass the CIs and manually test. ``` $ python/run-tests --parallelism 1 --testnames pyspark.pandas.tests.connect.io.test_parity_feather --python-executables python3 Running PySpark tests. Output is in /Users/dongjoon/APACHE/spark-merge/python/unit-tests.log Will test against the following Python executables: ['python3'] Will test the following Python tests: ['pyspark.pandas.tests.connect.io.test_parity_feather'] python3 python_implementation is CPython python3 version is: Python 3.14.0 Starting test(python3): pyspark.pandas.tests.connect.io.test_parity_feather (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/4d8b6c19-57bf-49af-a582-43bc4c056c78/python3__pyspark.pandas.tests.connect.io.test_parity_feather__h5rneuu6.log) Finished test(python3): pyspark.pandas.tests.connect.io.test_parity_feather (4s) ... 1 tests were skipped Tests passed in 4 seconds Skipped tests in pyspark.pandas.tests.connect.io.test_parity_feather with python3: test_to_feather (pyspark.pandas.tests.connect.io.test_parity_feather.FeatherParityTests.test_to_feather) ... skip (0.000s) ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52771 from dongjoon-hyun/SPARK-54069. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…ordsPerBatch ### What changes were proposed in this pull request? Add tests for negative maxRecordsPerBatch ### Why are the changes needed? to improve test coverage ### Does this PR introduce _any_ user-facing change? no, test-only ### How was this patch tested? CI ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#52775 from zhengruifeng/cogroup_followup. Authored-by: Ruifeng Zheng <ruifengz@apache.org> Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
200e141 to
2090ddc
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.