
Remote Shuffles Ongoing Scratch Space #4

Closed
mccheah wants to merge 15 commits into operation-remote-shuffles-base from operation-remote-shuffles

Conversation

@mccheah (Owner) commented Dec 6, 2018

Ongoing prototyping and experimentation for remote shuffle plugins.

Rather than constructing a full end-to-end prototype in one shot, I decided to build this prototype in a sort of pseudo-diary, "don't polish anything" format. I will incrementally build up this work and, along the way, highlight key challenges, modifications, unforeseen circumstances, and novel ideas.


sorter.insertRecord(
    serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
mccheah (author):
My IDE indented everything, will try to revert these when I can get around to it


public interface ShuffleDataIO {

void initialize();
mccheah (author):

One doesn't need to pass anything here because Utils.loadExtensions will pass a SparkConf object to the constructor if such a constructor exists.
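To illustrate the convention being relied on here, the following is a minimal sketch, assuming a stand-in `Config` class in place of `SparkConf` and hypothetical `ExtensionLoader`/`ConfAwareExtension` names; `Utils.loadExtensions` in Spark follows the same prefer-the-conf-constructor, fall-back-to-no-arg pattern.

```java
import java.lang.reflect.Constructor;

// Stand-in for SparkConf (hypothetical; for illustration only).
class Config {}

class ExtensionLoader {
    static <T> T load(Class<T> cls, Config conf) throws Exception {
        try {
            // Prefer a constructor that accepts the conf, if one exists.
            Constructor<T> ctor = cls.getDeclaredConstructor(Config.class);
            return ctor.newInstance(conf);
        } catch (NoSuchMethodException e) {
            // Otherwise fall back to the no-arg constructor.
            return cls.getDeclaredConstructor().newInstance();
        }
    }
}

// Example extension that opts in to receiving the conf.
class ConfAwareExtension {
    final Config conf;
    ConfAwareExtension(Config conf) { this.conf = conf; }
}
```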

The code was previously misread to suggest that partitions are all read from a single input stream. In fact, the opposite is true: we're merging the contents of spill files that share the same partition ID. The API is updated to reflect this.
 * byte from the incoming input stream and multicast it to multiple backends for replication.
 * So OutputStream is too restrictive of an API.
 */
void appendBytesToPartition(InputStream streamReadingBytesToAppend);
mccheah (author):

Should this just return an instance of OutputStream?
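For comparison, here is a minimal sketch of the two shapes the question raises, using hypothetical interface names (`PushStyleWriter`, `PullStyleWriter`, `InMemoryPushWriter`) that are not part of the proposed API: the current InputStream-consuming method versus an OutputStream-returning one.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical sketch of the two API shapes under discussion.
interface PushStyleWriter {
    // Current shape: the plugin pulls bytes from the caller's stream, which
    // lets a replicating backend fan each byte out to multiple destinations.
    void appendBytesToPartition(InputStream in) throws IOException;
}

interface PullStyleWriter {
    // Alternative shape: the plugin hands back a stream the caller writes to.
    OutputStream openPartitionStream() throws IOException;
}

// Toy push-style implementation that just buffers in memory.
class InMemoryPushWriter implements PushStyleWriter {
    final ByteArrayOutputStream sink = new ByteArrayOutputStream();

    @Override
    public void appendBytesToPartition(InputStream in) throws IOException {
        byte[] buf = new byte[8192];
        int n;
        while ((n = in.read(buf)) != -1) {
            sink.write(buf, 0, n);  // a replicating backend could fan out here
        }
    }
}
```

The push style keeps replication inside the plugin; the pull style is simpler for callers but hides the byte flow from the implementation.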

if (tmp.exists() && !tmp.delete()) {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
if (pluggableWriteSupport != null) {
partitionLengths = combineAndWritePartitionsUsingPluggableWriter();
Reviewer:
these partitionLengths are basically ignored, right? Is the idea that mapOutputWriter.commitAllPartitions(); is doing the equivalent to creating the index file in the shuffle store?

mccheah (author):

Yeah these are ignored =/ work is needed to rip out the partition length stuff from the code paths that use the pluggable writer and delegate that stuff to, indeed, mapOutputWriter#commitAllPartitions.

ShufflePartitionWriter writer = mapOutputWriter.newPartitionWriter(i);
try {
try (OutputStream out = writer.openPartitionStream()) {
Utils.copyStream(in, out, false, false);
Reviewer:

any reason you're not supporting transferTo here? in fact, I wonder if your api should actually expose java.nio.channels.WritableByteChannel as the destination, to allow for a zero-copy transfer. Even if you're writing to a remote destination, that would let the OS move data from the file to the socket without copying into user space and back out (aka zero-copy). I'm not sure exactly how this would affect the api, though; you might want to let ShufflePartitionWriters choose to expose either OutputStream or WritableByteChannel

mccheah (author):

Mostly because for a first prototype I wanted to keep things simple, and I think there's a chance that exposing channel level methods in the API would be confusing for devs. This stuff is already pretty low level as it is, and adding an additional switch in the API to support another low level option is only going to multiply the complexity even further.

I also don't fully understand how transferTo works in the first place in the context where we're not writing files to local disk. A suggestion on how we could support that as a patch on top of this work would be helpful to get us started.

For what it's worth, I don't see many non-Spark-native implementations being able to support channel-based writes. Internally I'm experimenting with another storage system that certainly won't be able to support them. If we did include channel support, it should be strictly optional, while OutputStream-based writes remain mandatory. So I see channel support as being additive to this work, not replacing it.

@squito left a comment:

Rather than creating entirely new implementations using the new apis, would it make sense to have the old code just use the new apis? I think if your api exposes WritableByteChannel, then that would be possible. It would also help focus the diff on the parts that need to change.

(at some point, before merging, we might decide not to do that, or to copy classes so the old version is left entirely untouched, but for now, in this scratch space, maybe this would be helpful?)

buffer.reset()
currentWriter = mapOutputWriter.newPartitionWriter(partitionId)
val currentWriterStream = new ShufflePartitionWriterOutputStream(
currentWriter, buffer, bufferSize)
Reviewer:

not sure I understand the point of this -- is ShufflePartitionWriterOutputStream just a BufferedOutputStream but you get to reuse the buffer?

mccheah (author):

Yup, it's a reusable buffer output stream. If there's already a utility for this, that would be helpful.
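The idea can be sketched roughly as follows; this is a hypothetical simplification (the class name `ReusableBufferOutputStream` and its shape are assumptions, not the PR's actual `ShufflePartitionWriterOutputStream`): like BufferedOutputStream, but the byte[] is supplied by the caller so one buffer can be reused across many partition writers.

```java
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch: a buffered OutputStream whose buffer is shared by the
// caller across partitions, avoiding one allocation per partition writer.
class ReusableBufferOutputStream extends OutputStream {
    private final OutputStream delegate;
    private final byte[] buffer;  // caller-owned, reused across instances
    private int pos = 0;

    ReusableBufferOutputStream(OutputStream delegate, byte[] sharedBuffer) {
        this.delegate = delegate;
        this.buffer = sharedBuffer;
    }

    @Override
    public void write(int b) throws IOException {
        if (pos == buffer.length) {
            flushBuffer();
        }
        buffer[pos++] = (byte) b;
    }

    private void flushBuffer() throws IOException {
        if (pos > 0) {
            delegate.write(buffer, 0, pos);
            pos = 0;
        }
    }

    @Override
    public void flush() throws IOException {
        flushBuffer();
        delegate.flush();
    }
}
```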

// branch in ExternalSorter.
if (fastMergeEnabled && fastMergeIsSupported) {
if (pluggableWriteSupport != null) {
partitionLengths = mergeSpillsWithPluggableWriter(spills, compressionCodec);
Reviewer:

I think you also want to support the "fast" paths here. I mentioned the transferTo part above, but even when working with output streams, you want to avoid decompressing and recompressing if possible, so you can just copy the bytes directly from one input stream to the other output stream -- that's the "Using fileStream-based fast merge" path.

(If it would be helpful, we can chat about this directly)

mccheah (author):

Compression is tricky, as is encryption really. The question is, should the underlying implementation of the API be the part that decides how to compress and encrypt the output in the remote storage? It's not immediately obvious. On one hand we can save work by taking the compressed and encrypted local files and passing them directly without unmarshalling them. On the other hand, should the implementation layer be allowed to decide to use its own encryption algorithms and compression codecs? That seems like a reasonable use case as well. But if the storage implementation is doing its own encryption and compression, isn't it encrypting and compressing already-encrypted and already-compressed data?

My intuition is that we want the implementation of the API to decide on its own encryption and compression, for example if it wants to do its own splitting of partitions and thus would prefer to encrypt and compress each block individually. In that case I don't think it makes a lot of sense to pass along encrypted and compressed bytes to this OutputStream. But again I think this is open to discussion.

mccheah (author):

Do some of these questions around encryption and compression also influence how / if we support transferTo and channel-based writes? This part of the code is still something I'm trying to understand better.

Reviewer:

I don't think this really influences transferTo -- I think you can only use transferTo with the "fast path" here, but this is a good improvement even for the outputstream version.

I see your point about the external service maybe wanting to manage its own encryption and compression -- but I also don't really think you want to tackle that here. I think it'll keep this first step much easier if we still leave that up to the spark shuffle writers.


mccheah commented Dec 20, 2018

Rather than creating entirely new implementations using the new apis, would it make sense to have the old code just use the new apis?

As I mentioned in #4 (comment), this would indeed require channels to be exposed in the API, but I don't know if we're ready to do that yet. We could certainly have an API that returns Optional<WritableByteChannel> that defaults to empty, or have a mix-in interface like SupportsWriteToByteChannel (kind of like how DataSource V2 is approaching their composite APIs). At this point, though, I really want to get down something simple, easy to implement, and easy to reason about, and just inserting code into the right places seems less error-prone than refactoring the existing shuffle code, especially given its complexity. But I could be persuaded otherwise if I see a prototype that does this (it would also have to include the implementation side that @yifeih is working on in yifeih#1).
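The Optional-based variant of that idea could look roughly like this; a hedged sketch only, with a hypothetical interface name (`PartitionWriterWithOptionalChannel`), not the actual proposed API:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.WritableByteChannel;
import java.util.Optional;

// Hypothetical sketch: OutputStream-based writes are mandatory, while
// channel-based writes default to "unsupported" via an empty Optional.
interface PartitionWriterWithOptionalChannel {
    OutputStream openPartitionStream() throws IOException;

    // Implementations writing to local files or raw sockets can override
    // this to enable transferTo-style zero-copy; everyone else inherits
    // the empty default and callers fall back to the stream path.
    default Optional<WritableByteChannel> openPartitionChannel() throws IOException {
        return Optional.empty();
    }
}
```

A caller would check `openPartitionChannel()` first and only use the stream path when the Optional is empty, so channel support stays strictly additive.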

squito commented Jan 2, 2019

Sorry for the long delay (and happy new year!).

So at a high level, I agree those are good questions about compression, encryption, etc. but I see a different path forward here. Those are good questions in general, or if we're starting from scratch -- but we've already got all this code in spark and we're just trying to extend it. The key thing we're trying to do is to enable the shuffle data to live elsewhere. Maybe when the data is living in an external service, it starts to allow different decisions around encryption, compression, etc. But until we have an interest in building an implementation of a service that would take advantage of those knobs, I think we shouldn't worry about them for now. The api can continue to evolve.

any reason you're not supporting transferTo here?

Mostly because for a first prototype I wanted to keep things simple

I was actually thinking this would only complicate the implementation of the shuffle writers -- but that code already exists. Each shuffle service would be allowed to provide a WritableByteChannel, but wouldn't have to. I have to look at the code again, but I actually think you could push that decision way up to when creating the shuffle writer. All of the new shuffle services should be free to not provide a WritableByteChannel. Other services could take advantage of transferTo for moving data from local temporary files to a socket, but it's possible nothing provides the right api for that. (java.nio.channels.SocketChannel is a WritableByteChannel, but eg. if you are writing to hdfs you aren't manually writing to sockets, you're writing to an HDFSOutputStream).

Now, if we were starting from scratch, that seems like an unnecessary complication, but again we've already got all this spark code. IMHO the big win of this is that the existing implementation fits within the new api.

appId, handle.shuffleId, mapId)
blockIds.map {
case blockId@ShuffleBlockId(_, _, reduceId) =>
(blockId, reader.fetchPartition(reduceId))
Reviewer:

I think this part should probably be pushed behind the api, so it's reader.fetchPartitions(blockIds). That's necessary for allowing a lot of the optimizations in the current implementation, like having multiple simultaneous requests & bundling requests for multiple blocks from one source. The second may not make sense for some of these implementations, but the first almost certainly does. Also, it allows the current implementation to move behind this api, which again I think is important for maintainability etc.

mccheah (author):

This is effectively bulk load, yeah? But we would have to return something like List<InputStream> fetchPartitions(int[] reduceIds). But opening multiple input streams simultaneously as part of the API seems potentially risky.

Reviewer:

well you could just abstract it the same way ShuffleBlockFetcherIterator does. You give it the full set of blocks, and it gives you an iterator. An implementation is free to do it one at a time at first (don't do anything until next() is called, and then just block until you get the full result). And then a later optimization can pipeline them.
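That iterator shape could be sketched as below; the interface name `BulkPartitionReader` and the default-method structure are assumptions for illustration, not the PR's API.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: hand the reader the full set of reduce ids up front
// and get back an iterator, leaving batching and pipelining to the plugin.
interface BulkPartitionReader {
    InputStream fetchPartition(int reduceId);

    // Naive default: block on one fetch per next() call. An optimized
    // implementation can prefetch several partitions concurrently instead,
    // without the caller changing at all.
    default Iterator<InputStream> fetchPartitions(List<Integer> reduceIds) {
        Iterator<Integer> ids = reduceIds.iterator();
        return new Iterator<InputStream>() {
            @Override public boolean hasNext() { return ids.hasNext(); }
            @Override public InputStream next() { return fetchPartition(ids.next()); }
        };
    }
}
```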

squito commented Jan 15, 2019

high level question as I'm thinking about specific implementations -- you've mentioned that some implementations may have the shuffle writers talk to specific instances of the shuffle services (eg. they'd have a list of 10 nodes in the shuffle service, rather than talking to one central load-balancer), and that the driver would manage getting that list out to the executors. Can you point out where the driver would send that list out? And how would it update that list over time (node 1 goes down, or due to load, the shuffle service somehow decides to add nodes 10-100)? Or would that be considered something that just didn't work in that type of implementation, and would only be possible for implementations where the service was behind something centralized (like the hdfs namenode)?

mccheah commented Jan 15, 2019

@ifilonenko has some work on that here: https://github.com/mccheah/spark/pull/6/files

mccheah commented Jan 15, 2019

We're also considering adding a driver-side API to ShuffleDataIO which has an initializeApplication and onApplicationStopped (names to be decided). That module can do the monitoring without needing to make changes to BlockManager or any other existing code. It makes sense to have per-application lifecycle be part of the API as well - we found that for other implementations that would also be useful, primarily to set up hooks and trigger cleanup on unclean exits.
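A rough sketch of those lifecycle hooks might look like this; as the comment says, the names are still undecided, so `ShuffleDriverLifecycle` and `RecordingLifecycle` here are hypothetical placeholders:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of driver-side, per-application lifecycle hooks.
interface ShuffleDriverLifecycle {
    // Called once when the application starts, e.g. to register with the
    // remote shuffle service and set up monitoring hooks.
    void initializeApplication(String appId);

    // Called on shutdown, including unclean exits, to trigger cleanup of
    // any shuffle data the application left behind.
    void onApplicationStopped(String appId);
}

// Trivial implementation that records the lifecycle events it receives.
class RecordingLifecycle implements ShuffleDriverLifecycle {
    final List<String> events = new ArrayList<>();

    @Override
    public void initializeApplication(String appId) { events.add("init:" + appId); }

    @Override
    public void onApplicationStopped(String appId) { events.add("stop:" + appId); }
}
```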

vanzin commented Jan 18, 2019

Hmm... I'm playing with this code a bit, and something came to my mind. How do you handle speculation here?

Let's say you have a simple remote storage where the block ID suffices for identifying the shuffle block. What happens if two tasks are concurrently writing that blockID?

I think this can be handled by the shuffle io impl itself (e.g. "first commit wins"), but wondering if I'm missing something. Can tasks fail after the shuffle is committed?
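The "first commit wins" policy could be sketched like this; a toy in-memory model with a hypothetical class name, not any real shuffle storage implementation, where `putIfAbsent` plays the role of an atomic commit:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: two speculative attempts may both write a block id,
// but only the first commit is recorded as canonical; later commits lose.
class FirstCommitWinsStore {
    private final ConcurrentHashMap<String, byte[]> committed = new ConcurrentHashMap<>();

    // Returns true only for the attempt whose data became the canonical copy.
    boolean commitBlock(String blockId, byte[] data) {
        return committed.putIfAbsent(blockId, data) == null;
    }

    byte[] readBlock(String blockId) {
        return committed.get(blockId);
    }
}
```

A real backend would need the same atomicity at commit time (e.g. an atomic rename or conditional put), so that a task failing after a committed shuffle never leaves readers seeing a partial second copy.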

}
}
}
threwException = false;
Reviewer:
don't delete this line

github-actions bot commented:
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Mar 19, 2020
@github-actions github-actions bot closed this Mar 20, 2020
