Skip to content

Conversation

@YaoRazor
Copy link

@YaoRazor YaoRazor commented Dec 12, 2025

What changes were proposed in this pull request?

This PR represents Pinterest work to aggressively delete shuffle data

Ack

The main work is done by @CodingCat while he is working here, I am currently rolling out the feature and see good results

Why are the changes needed?

Does this PR resolve a correctness bug?

Does this PR introduce any user-facing change?

How was this patch tested?


trait RunningStageManager {
def isRunningStage(stageId: Int): Boolean
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe you can go through this PR first.

#3109

I saw something duplicated.

Image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a version based on the original implementation of that PR, I will clean it up to merge to master branch

@FMX
Copy link
Contributor

FMX commented Dec 14, 2025

Is there any design doc for this feature? This feature is what I needed.

@CodingCat
Copy link
Contributor

CodingCat commented Dec 14, 2025

@FMX I will post the design description/doc while trying to merge this feature to master branch (as it was developed based on 0.4 internally)... on the other side, I am discussing with @YaoRazor , it seems we missed certain important pieces of code when upstreaming, so please do not bother look at the current implementation, it missed many things

@CodingCat
Copy link
Contributor

I have started merging code to main branch at #3569, should be ready before the new year

@CodingCat
Copy link
Contributor

Hi, all #3569 is ready to review , appreciate the feedbacks in advance

Copy link
Contributor

@mridulm mridulm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not go over the entire PR.

From a quick look, this appears to be an unsound change. Shuffle gets used not just in the currently active stages/jobs - but also 'future' jobs. AQE based sql query execution, RDD reuse, etc all leverage this - I want to make sure I am not missing something here.


import org.apache.spark.scheduler.{EventLoggingListener, SparkListenerInterface}

object CelebornSparkContextHelper {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleanup methods from this object which are not required ?
It looks like the only thing needed is eventLogger right now.


import org.apache.spark.SparkContext

trait RunningStageManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this being used ?

private def dagScheduler = SparkContext.getActive.get.dagScheduler

override def isRunningStage(stageId: Int): Boolean = {
dagScheduler.runningStages.map(_.id).contains(stageId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unsafe - runningStages is expected to be used only from within DAGScheduler

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants