From d8bc54d2e7d09d115dfb78381fbbac1ac0c90ab0 Mon Sep 17 00:00:00 2001 From: KimDaehyeon Date: Tue, 28 Apr 2026 03:55:21 +0900 Subject: [PATCH 1/2] Fix data loss in DataBufferUtils synchronous write Prior to this commit, WritableByteChannelSubscriber.hookOnNext() called iterator.next() exactly once. If a DataBuffer consisted of multiple NIO ByteBuffers (e.g., NettyDataBuffer wrapping a CompositeByteBuf), only the first buffer was written to the channel, and the remaining buffers were silently ignored and lost. This commit adds the missing while (iterator.hasNext()) outer loop to ensure all fragmented buffers exposed by the iterator are completely and safely written to the synchronous channel. See gh-36714 Signed-off-by: KimDaehyeon --- .../core/io/buffer/DataBufferUtils.java | 8 ++++--- .../core/io/buffer/DataBufferUtilsTests.java | 21 +++++++++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 6c12d4ed3d86..be9f6f9cc756 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -1138,9 +1138,11 @@ protected void hookOnSubscribe(Subscription subscription) { protected void hookOnNext(DataBuffer dataBuffer) { try { try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) { - ByteBuffer byteBuffer = iterator.next(); - while (byteBuffer.hasRemaining()) { - this.channel.write(byteBuffer); + while (iterator.hasNext()) { + ByteBuffer byteBuffer = iterator.next(); + while (byteBuffer.hasRemaining()) { + this.channel.write(byteBuffer); + } } } this.sink.next(dataBuffer); diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index 00d95af03a18..35ebdd9c3aeb 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -338,6 +338,27 @@ void writeWritableByteChannel(DataBufferFactory bufferFactory) throws Exception channel.close(); } + @ParameterizedDataBufferAllocatingTest + void writeWritableByteChannelWithJoinedBuffer(DataBufferFactory bufferFactory) throws Exception { + super.bufferFactory = bufferFactory; + + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + DataBuffer joined = bufferFactory.join(List.of(foo, bar)); + + WritableByteChannel channel = Files.newByteChannel(tempFile, StandardOpenOption.WRITE); + + Flux writeResult = DataBufferUtils.write(Flux.just(joined), channel); + StepVerifier.create(writeResult) + .consumeNextWith(stringConsumer("foobar")) + .verifyComplete(); + + String result = String.join("", Files.readAllLines(tempFile)); + + assertThat(result).isEqualTo("foobar"); + channel.close(); + } + @ParameterizedDataBufferAllocatingTest void writeWritableByteChannelErrorInFlux(DataBufferFactory bufferFactory) throws Exception { super.bufferFactory = bufferFactory; From 80534f9df6aaa6b155ef3fb8494c057b0d4f268f Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Thu, 4 Jun 2026 12:09:52 +0200 Subject: [PATCH 2/2] Polishing contribution This commit adds further fixes in the same area, since there were similar bugs in the WriteCompletionHandler: * databuffers were not always emitted when fully read in the onNext hook * on completion, the iterator was closed too early, before it was fully read * on completion, writing the next bytebuffers from the iterator would always reuse the first one and not update the attachment Closes gh-36714 --- .../core/io/buffer/DataBufferUtils.java | 10 +++++++-- .../core/io/buffer/DataBufferUtilsTests.java | 21 +++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index be9f6f9cc756..bf6a56f99f7a 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -1215,6 +1215,11 @@ protected void hookOnNext(DataBuffer dataBuffer) { failed(ex, attachment); } } + else { + iterator.close(); + this.sink.next(dataBuffer); + request(1); + } } @Override @@ -1238,7 +1243,6 @@ protected void hookOnComplete() { @Override public void completed(Integer written, Attachment attachment) { DataBuffer.ByteBufferIterator iterator = attachment.iterator(); - iterator.close(); long pos = this.position.addAndGet(written); ByteBuffer byteBuffer = attachment.byteBuffer(); @@ -1248,9 +1252,11 @@ public void completed(Integer written, Attachment attachment) { } else if (iterator.hasNext()) { ByteBuffer next = iterator.next(); - this.channel.write(next, pos, attachment, this); + Attachment nextAttachment = new Attachment(next, attachment.dataBuffer(), iterator); + this.channel.write(next, pos, nextAttachment, this); } else { + iterator.close(); this.sink.next(attachment.dataBuffer()); this.writing.set(false); diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index 35ebdd9c3aeb..6c0be595624a 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -466,6 +466,27 @@ private void verifyWrittenData(Flux writeResult) throws IOException assertThat(result).isEqualTo("foobarbazqux"); } + @ParameterizedDataBufferAllocatingTest + void writeAsynchronousFileChannelWithJoinedBuffer(DataBufferFactory bufferFactory) throws Exception { + super.bufferFactory = bufferFactory; + + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + DataBuffer joined = bufferFactory.join(List.of(foo, bar)); + + AsynchronousFileChannel channel = AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE); + + Flux writeResult = DataBufferUtils.write(Flux.just(joined), channel); + StepVerifier.create(writeResult) + .consumeNextWith(stringConsumer("foobar")) + .verifyComplete(); + + String result = String.join("", Files.readAllLines(tempFile)); + + assertThat(result).isEqualTo("foobar"); + channel.close(); + } + @ParameterizedDataBufferAllocatingTest void writeAsynchronousFileChannelErrorInFlux(DataBufferFactory bufferFactory) throws Exception { super.bufferFactory = bufferFactory;