From b3bf0735e760a125b9867b5f1793e5eb2f6aa36d Mon Sep 17 00:00:00 2001 From: Jeremy Justus Date: Mon, 27 Apr 2026 03:05:52 -0400 Subject: [PATCH] frame: drain buffered bytes before propagating QUIC connection error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #338. When a QUIC stack delivers stream data and a CONNECTION_CLOSE frame in the same recv batch (common on Linux + io_uring with coalesced UDP datagrams), the BufRecvStream::poll_read backing FrameStream returns the connection error before the application has had a chance to decode HEADERS / DATA frames whose bytes are already sitting in the local buffer. The current `?` propagation in FrameStream::try_recv discards those buffered bytes — clients see StreamError::ConnectionError instead of the response they would otherwise have parsed. This patch hoists the QUIC error out of `try_recv`'s `?` path. When poll_read returns an error, it is cached for the current iteration; the decoder is given one chance to consume `self.stream.buf_mut()` first. If the decoder produces a frame, the application sees it. Only when no frame can be decoded from the buffered bytes does the error surface. The fix terminates: the cached error is consumed on the iteration in which it was observed, so a partial-frame buffer cannot trigger an infinite loop. The same change is applied to FrameStream::poll_data to recover body bytes that were already buffered before the close. Two regression tests are added covering both paths: - poll_next_drains_buffered_headers_before_quic_close - poll_data_drains_buffered_body_before_quic_close The FakeRecv test helper is extended with `chunk_then_error` so the synthetic `StreamErrorIncoming::ConnectionErrorIncoming` can be queued after a chunk — modelling the coalesced-datagram race directly. --- h3/src/frame.rs | 135 ++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 119 insertions(+), 16 deletions(-) diff --git a/h3/src/frame.rs b/h3/src/frame.rs index 00bde628..7080d26a 100644 --- a/h3/src/frame.rs +++ b/h3/src/frame.rs @@ -70,7 +70,17 @@ where ); loop { - let end = self.try_recv(cx)?; + // Hoist the QUIC error out of `?` so that any bytes pulled + // into `self.stream.buf_mut()` on prior wakes get one last + // chance through the decoder. Without this hoist, a + // CONNECTION_CLOSE frame coalesced with stream data in the + // same recv batch wins the race and discards a fully- + // decodable HEADERS frame. + let (end, pending_quic_err) = match self.try_recv(cx) { + Poll::Ready(Ok(end)) => (Poll::Ready(end), None), + Poll::Pending => (Poll::Pending, None), + Poll::Ready(Err(e)) => (Poll::Ready(true), Some(e)), + }; return match self.decoder.decode(self.stream.buf_mut())? { Some(Frame::Data(PayloadLen(len))) => { @@ -82,11 +92,17 @@ where Poll::Ready(Ok(frame)) } Some(frame) => Poll::Ready(Ok(Some(frame))), - None => match end { + None => match (end, pending_quic_err) { + // Decoder couldn't make progress on buffered bytes + // and the underlying QUIC stream errored: surface + // the cached error now. Fixed-point: this branch + // terminates the loop because we consume the cached + // error in the same iteration we observed it. + (_, Some(err)) => Poll::Ready(Err(err)), // Received a chunk but frame is incomplete, poll until we get `Pending`. - Poll::Ready(false) => continue, - Poll::Pending => Poll::Pending, - Poll::Ready(true) => { + (Poll::Ready(false), None) => continue, + (Poll::Pending, None) => Poll::Pending, + (Poll::Ready(true), None) => { if self.stream.buf_mut().has_remaining() { // Reached the end of receive stream, but there is still some data: // The frame is incomplete. @@ -112,23 +128,32 @@ where return Poll::Ready(Ok(None)); }; - let end = match self.try_recv(cx) { - Poll::Ready(Ok(end)) => end, - Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), - Poll::Pending => false, + // Mirror the hoist from poll_next: a QUIC-level error must not + // discard already-buffered body bytes for the current frame. + let (end, pending_quic_err) = match self.try_recv(cx) { + Poll::Ready(Ok(end)) => (end, None), + Poll::Ready(Err(e)) => (true, Some(e)), + Poll::Pending => (false, None), }; let data = self.stream.buf_mut().take_chunk(self.remaining_data); - match (data, end) { - (None, true) => Poll::Ready(Ok(None)), - (None, false) => Poll::Pending, - (Some(d), true) - if d.remaining() < self.remaining_data + match (data, end, pending_quic_err) { + // No more buffered body and QUIC errored: surface the error. + (None, _, Some(err)) => Poll::Ready(Err(err)), + (None, true, None) => Poll::Ready(Ok(None)), + (None, false, None) => Poll::Pending, + // Partial body for the current frame and the underlying + // stream / connection ended: caller MUST treat this as + // truncation. Same shape as the existing `Poll::Ready(true)` + // branch below — extended to also cover the QUIC error case. + (Some(d), end_or_err, pending) + if (end_or_err || pending.is_some()) + && d.remaining() < self.remaining_data && !self.stream.buf_mut().has_remaining() => { Poll::Ready(Err(FrameStreamError::UnexpectedEnd)) } - (Some(d), _) => { + (Some(d), _, _) => { self.remaining_data -= d.remaining(); Poll::Ready(Ok(Some(d))) } @@ -323,6 +348,7 @@ mod tests { use std::collections::VecDeque; use crate::proto::{coding::Encode, frame::FrameType, varint::VarInt}; + use crate::quic::ConnectionErrorIncoming; // Decoder @@ -436,6 +462,65 @@ mod tests { assert_poll_matches!(|cx| stream.poll_next(cx), Ok(Some(Frame::Headers(_)))); } + /// Regression: a QUIC CONNECTION_CLOSE that lands in the same recv + /// batch as a fully-decodable HEADERS frame must NOT discard the + /// frame. Pre-fix, `try_recv`'s `?` propagated the connection error + /// before `decoder.decode` got to consume the bytes already in + /// `BufRecvStream::buf`. Post-fix, the decoder runs once on the + /// buffered bytes and produces the frame; the error surfaces on + /// the next poll only if no frame can be parsed. + #[tokio::test] + async fn poll_next_drains_buffered_headers_before_quic_close() { + let mut recv = FakeRecv::default(); + let mut buf = BytesMut::with_capacity(64); + Frame::headers(&b"header"[..]).encode_with_payload(&mut buf); + // Headers chunk is fully buffered, then poll_data signals a + // connection-level error on the next poll — exactly the + // shape produced by an io_uring-coalesced datagram with + // STREAM(FIN) + CONNECTION_CLOSE(H3_NO_ERROR). + recv.chunk_then_error( + buf.freeze(), + StreamErrorIncoming::ConnectionErrorIncoming { + connection_error: ConnectionErrorIncoming::ApplicationClose { error_code: 0x100 }, + }, + ); + let mut stream: FrameStream<_, ()> = FrameStream::new(BufRecvStream::new(recv)); + + // First poll: drains buffered HEADERS frame instead of erroring. + assert_poll_matches!(|cx| stream.poll_next(cx), Ok(Some(Frame::Headers(_)))); + // Second poll: nothing left to decode, error surfaces. + assert_poll_matches!(|cx| stream.poll_next(cx), Err(FrameStreamError::Quic(_))); + } + + /// Regression: same shape as poll_next, but on the body path. A + /// connection error must not strand DATA frame body bytes that + /// were buffered before the error arrived. + #[tokio::test] + async fn poll_data_drains_buffered_body_before_quic_close() { + let mut recv = FakeRecv::default(); + let mut buf = BytesMut::with_capacity(64); + Frame::Data(Bytes::from("body")).encode_with_payload(&mut buf); + recv.chunk_then_error( + buf.freeze(), + StreamErrorIncoming::ConnectionErrorIncoming { + connection_error: ConnectionErrorIncoming::ApplicationClose { error_code: 0x100 }, + }, + ); + let mut stream: FrameStream<_, ()> = FrameStream::new(BufRecvStream::new(recv)); + + // First, poll_next gives us the DATA frame header. + assert_poll_matches!( + |cx| stream.poll_next(cx), + Ok(Some(Frame::Data(PayloadLen(4)))) + ); + // Then poll_data drains the body even though the underlying + // stream is now signaling a connection error. + assert_poll_matches!( + |cx| to_bytes(stream.poll_data(cx)), + Ok(Some(b)) if b.remaining() == 4 + ); + } + #[tokio::test] async fn poll_next_incomplete_frame() { let mut recv = FakeRecv::default(); @@ -595,6 +680,7 @@ mod tests { #[derive(Default)] struct FakeRecv { chunks: VecDeque, + pending_error: Option, } impl FakeRecv { @@ -602,6 +688,17 @@ mod tests { self.chunks.push_back(buf); self } + + /// Queue a chunk to be returned on a subsequent poll, followed + /// by a synthetic connection-level error. Models the recv batch + /// where stream data and CONNECTION_CLOSE land in the same + /// underlying read — the race this regression covers. + #[allow(dead_code)] + fn chunk_then_error(&mut self, buf: Bytes, err: StreamErrorIncoming) -> &mut Self { + self.chunks.push_back(buf); + self.pending_error = Some(err); + self + } } impl RecvStream for FakeRecv { @@ -611,7 +708,13 @@ mod tests { &mut self, _: &mut Context<'_>, ) -> Poll, StreamErrorIncoming>> { - Poll::Ready(Ok(self.chunks.pop_front())) + if let Some(chunk) = self.chunks.pop_front() { + return Poll::Ready(Ok(Some(chunk))); + } + match self.pending_error.take() { + Some(err) => Poll::Ready(Err(err)), + None => Poll::Ready(Ok(None)), + } } fn stop_sending(&mut self, _: u64) {