Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 119 additions & 16 deletions h3/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,17 @@ where
);

loop {
let end = self.try_recv(cx)?;
// Hoist the QUIC error out of `?` so that any bytes pulled
// into `self.stream.buf_mut()` on prior wakes get one last
// chance through the decoder. Without this hoist, a
// CONNECTION_CLOSE frame coalesced with stream data in the
// same recv batch wins the race and discards a fully-
// decodable HEADERS frame.
let (end, pending_quic_err) = match self.try_recv(cx) {
Poll::Ready(Ok(end)) => (Poll::Ready(end), None),
Poll::Pending => (Poll::Pending, None),
Poll::Ready(Err(e)) => (Poll::Ready(true), Some(e)),
};

return match self.decoder.decode(self.stream.buf_mut())? {
Some(Frame::Data(PayloadLen(len))) => {
Expand All @@ -82,11 +92,17 @@ where
Poll::Ready(Ok(frame))
}
Some(frame) => Poll::Ready(Ok(Some(frame))),
None => match end {
None => match (end, pending_quic_err) {
// Decoder couldn't make progress on buffered bytes
// and the underlying QUIC stream errored: surface
// the cached error now. Fixed-point: this branch
// terminates the loop because we consume the cached
// error in the same iteration we observed it.
(_, Some(err)) => Poll::Ready(Err(err)),
// Received a chunk but frame is incomplete, poll until we get `Pending`.
Poll::Ready(false) => continue,
Poll::Pending => Poll::Pending,
Poll::Ready(true) => {
(Poll::Ready(false), None) => continue,
(Poll::Pending, None) => Poll::Pending,
(Poll::Ready(true), None) => {
if self.stream.buf_mut().has_remaining() {
// Reached the end of receive stream, but there is still some data:
// The frame is incomplete.
Expand All @@ -112,23 +128,32 @@ where
return Poll::Ready(Ok(None));
};

let end = match self.try_recv(cx) {
Poll::Ready(Ok(end)) => end,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => false,
// Mirror the hoist from poll_next: a QUIC-level error must not
// discard already-buffered body bytes for the current frame.
let (end, pending_quic_err) = match self.try_recv(cx) {
Poll::Ready(Ok(end)) => (end, None),
Poll::Ready(Err(e)) => (true, Some(e)),
Poll::Pending => (false, None),
};
let data = self.stream.buf_mut().take_chunk(self.remaining_data);

match (data, end) {
(None, true) => Poll::Ready(Ok(None)),
(None, false) => Poll::Pending,
(Some(d), true)
if d.remaining() < self.remaining_data
match (data, end, pending_quic_err) {
// No more buffered body and QUIC errored: surface the error.
(None, _, Some(err)) => Poll::Ready(Err(err)),
(None, true, None) => Poll::Ready(Ok(None)),
(None, false, None) => Poll::Pending,
// Partial body for the current frame and the underlying
// stream / connection ended: caller MUST treat this as
// truncation. Same shape as the existing `Poll::Ready(true)`
// branch below — extended to also cover the QUIC error case.
(Some(d), end_or_err, pending)
if (end_or_err || pending.is_some())
&& d.remaining() < self.remaining_data
&& !self.stream.buf_mut().has_remaining() =>
{
Poll::Ready(Err(FrameStreamError::UnexpectedEnd))
}
(Some(d), _) => {
(Some(d), _, _) => {
self.remaining_data -= d.remaining();
Poll::Ready(Ok(Some(d)))
}
Expand Down Expand Up @@ -323,6 +348,7 @@ mod tests {
use std::collections::VecDeque;

use crate::proto::{coding::Encode, frame::FrameType, varint::VarInt};
use crate::quic::ConnectionErrorIncoming;

// Decoder

Expand Down Expand Up @@ -436,6 +462,65 @@ mod tests {
assert_poll_matches!(|cx| stream.poll_next(cx), Ok(Some(Frame::Headers(_))));
}

/// Regression: a QUIC CONNECTION_CLOSE that lands in the same recv
/// batch as a fully-decodable HEADERS frame must NOT discard the
/// frame. Pre-fix, `try_recv`'s `?` propagated the connection error
/// before `decoder.decode` got to consume the bytes already in
/// `BufRecvStream::buf`. Post-fix, the decoder runs once on the
/// buffered bytes and produces the frame; the error surfaces on
/// the next poll only if no frame can be parsed.
#[tokio::test]
async fn poll_next_drains_buffered_headers_before_quic_close() {
let mut recv = FakeRecv::default();
let mut buf = BytesMut::with_capacity(64);
Frame::headers(&b"header"[..]).encode_with_payload(&mut buf);
// Headers chunk is fully buffered, then poll_data signals a
// connection-level error on the next poll — exactly the
// shape produced by an io_uring-coalesced datagram with
// STREAM(FIN) + CONNECTION_CLOSE(H3_NO_ERROR).
recv.chunk_then_error(
buf.freeze(),
StreamErrorIncoming::ConnectionErrorIncoming {
connection_error: ConnectionErrorIncoming::ApplicationClose { error_code: 0x100 },
},
);
let mut stream: FrameStream<_, ()> = FrameStream::new(BufRecvStream::new(recv));

// First poll: drains buffered HEADERS frame instead of erroring.
assert_poll_matches!(|cx| stream.poll_next(cx), Ok(Some(Frame::Headers(_))));
// Second poll: nothing left to decode, error surfaces.
assert_poll_matches!(|cx| stream.poll_next(cx), Err(FrameStreamError::Quic(_)));
}

/// Regression: same shape as poll_next, but on the body path. A
/// connection error must not strand DATA frame body bytes that
/// were buffered before the error arrived.
#[tokio::test]
async fn poll_data_drains_buffered_body_before_quic_close() {
let mut recv = FakeRecv::default();
let mut buf = BytesMut::with_capacity(64);
Frame::Data(Bytes::from("body")).encode_with_payload(&mut buf);
recv.chunk_then_error(
buf.freeze(),
StreamErrorIncoming::ConnectionErrorIncoming {
connection_error: ConnectionErrorIncoming::ApplicationClose { error_code: 0x100 },
},
);
let mut stream: FrameStream<_, ()> = FrameStream::new(BufRecvStream::new(recv));

// First, poll_next gives us the DATA frame header.
assert_poll_matches!(
|cx| stream.poll_next(cx),
Ok(Some(Frame::Data(PayloadLen(4))))
);
// Then poll_data drains the body even though the underlying
// stream is now signaling a connection error.
assert_poll_matches!(
|cx| to_bytes(stream.poll_data(cx)),
Ok(Some(b)) if b.remaining() == 4
);
}

#[tokio::test]
async fn poll_next_incomplete_frame() {
let mut recv = FakeRecv::default();
Expand Down Expand Up @@ -595,13 +680,25 @@ mod tests {
#[derive(Default)]
struct FakeRecv {
chunks: VecDeque<Bytes>,
pending_error: Option<StreamErrorIncoming>,
}

impl FakeRecv {
fn chunk(&mut self, buf: Bytes) -> &mut Self {
self.chunks.push_back(buf);
self
}

/// Queue a chunk to be returned on a subsequent poll, followed
/// by a synthetic connection-level error. Models the recv batch
/// where stream data and CONNECTION_CLOSE land in the same
/// underlying read — the race this regression covers.
#[allow(dead_code)]
fn chunk_then_error(&mut self, buf: Bytes, err: StreamErrorIncoming) -> &mut Self {
self.chunks.push_back(buf);
self.pending_error = Some(err);
self
}
}

impl RecvStream for FakeRecv {
Expand All @@ -611,7 +708,13 @@ mod tests {
&mut self,
_: &mut Context<'_>,
) -> Poll<Result<Option<Self::Buf>, StreamErrorIncoming>> {
Poll::Ready(Ok(self.chunks.pop_front()))
if let Some(chunk) = self.chunks.pop_front() {
return Poll::Ready(Ok(Some(chunk)));
}
match self.pending_error.take() {
Some(err) => Poll::Ready(Err(err)),
None => Poll::Ready(Ok(None)),
}
}

fn stop_sending(&mut self, _: u64) {
Expand Down
Loading