22 changes: 22 additions & 0 deletions actix-http/src/encoding/encoder.rs
@@ -208,6 +208,7 @@ where
if let Some(mut encoder) = this.encoder.take() {
if chunk.len() < MAX_CHUNK_SIZE_ENCODE_IN_PLACE {
encoder.write(&chunk).map_err(EncoderError::Io)?;
encoder.flush().map_err(EncoderError::Io)?;
let chunk = encoder.take();
*this.encoder = Some(encoder);

@@ -217,6 +218,7 @@ where
} else {
*this.fut = Some(spawn_blocking(move || {
encoder.write(&chunk)?;
encoder.flush()?;
Ok(encoder)
}));
}
@@ -360,6 +362,26 @@ impl ContentEncoder {
}
}

/// Flush internal codec buffers so compressed bytes are available via [`take`](Self::take).
///
/// Calling this after every [`write`](Self::write) ensures streaming responses emit data
/// promptly instead of waiting until the stream ends.
fn flush(&mut self) -> Result<(), io::Error> {
match *self {
#[cfg(feature = "compress-brotli")]
ContentEncoder::Brotli(ref mut encoder) => encoder.flush(),

#[cfg(feature = "compress-gzip")]
ContentEncoder::Deflate(ref mut encoder) => encoder.flush(),

#[cfg(feature = "compress-gzip")]
ContentEncoder::Gzip(ref mut encoder) => encoder.flush(),

#[cfg(feature = "compress-zstd")]
ContentEncoder::Zstd(ref mut encoder) => encoder.flush(),
}
}

fn write(&mut self, data: &[u8]) -> Result<(), io::Error> {
match *self {
#[cfg(feature = "compress-brotli")]
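Why the flush matters: below is a minimal standalone sketch (not part of this diff; assumed dependency `flate2 = "1"`, the crate behind the gzip/deflate encoders here). It shows that `write` alone can leave the compressed bytes in the codec's internal buffer, while a sync `flush` forces them out into the underlying writer — which is what lets `ContentEncoder::take` hand bytes to the stream after each chunk. The byte counts in the comments are typical, not guaranteed.

```rust
use std::io::Write;

use flate2::{write::GzEncoder, Compression};

fn main() -> std::io::Result<()> {
    let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());

    encoder.write_all(b"hello")?;
    // Typically only the 10-byte gzip header has reached the Vec so far;
    // the compressed payload is still buffered inside zlib.
    let before = encoder.get_ref().len();

    encoder.flush()?;
    // After the sync flush, compressed bytes for "hello" are in the Vec
    // and could be taken and forwarded immediately.
    let after = encoder.get_ref().len();

    assert!(after > before);
    println!("before flush: {before} bytes, after flush: {after} bytes");
    Ok(())
}
```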
46 changes: 46 additions & 0 deletions actix-web/tests/test_server.rs
@@ -405,6 +405,52 @@ async fn test_body_zstd_streaming() {
srv.stop().await;
}

/// Regression test for <https://github.com/actix/actix-web/issues/3410>.
///
/// Compress middleware must flush each chunk immediately so streaming responses
/// deliver data progressively rather than buffering everything until the stream ends.
#[actix_rt::test]
#[cfg(feature = "compress-gzip")]
async fn test_compress_streaming_flushes_chunks() {
use futures_util::StreamExt as _;

let srv = actix_test::start_with(actix_test::config().h1(), || {
App::new()
.wrap(Compress::default())
.service(web::resource("/").route(web::get().to(|| async {
// Two-chunk stream: the first chunk arrives immediately, the second after 500ms.
// Without the flush fix, both chunks would arrive together only after 500ms.
let s = futures_util::stream::once(async {
Ok::<_, std::io::Error>(Bytes::from("hello"))
})
.chain(futures_util::stream::once(async {
tokio::time::sleep(Duration::from_millis(500)).await;
Ok::<_, std::io::Error>(Bytes::from(" world"))
}));
HttpResponse::Ok().streaming(s)
})))
});

let mut res = srv
.get("/")
.no_decompress()
.append_header((header::ACCEPT_ENCODING, "gzip"))
.send()
.await
.unwrap();

assert_eq!(res.status(), StatusCode::OK);

// The first compressed chunk must arrive well before the 500ms delay.
let chunk = tokio::time::timeout(Duration::from_millis(200), res.next())
.await
.expect("first chunk must arrive before the 500ms stream delay (compress flush bug)")
.expect("stream should not be empty");
assert!(chunk.is_ok(), "chunk should not be an error");

srv.stop().await;
}
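For context, here is a minimal sketch of an app whose streaming responses benefit from this fix (the route, port, chunk count, and 100ms cadence are illustrative assumptions, not taken from the PR). With `Compress` wrapped around the app, a client sending `Accept-Encoding: gzip` now receives compressed bytes as each chunk is produced rather than in one burst at stream end:

```rust
use std::time::Duration;

use actix_web::{middleware::Compress, web, App, HttpResponse, HttpServer};
use bytes::Bytes;

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new().wrap(Compress::default()).route(
            "/",
            web::get().to(|| async {
                // Emit ten chunks, one every 100ms. Each one is compressed
                // and flushed as it is produced.
                let body = futures_util::stream::unfold(0u8, |n| async move {
                    if n == 10 {
                        return None;
                    }
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    let chunk = Bytes::from(format!("chunk {n}\n"));
                    Some((Ok::<_, std::io::Error>(chunk), n + 1))
                });
                HttpResponse::Ok().streaming(body)
            }),
        )
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}
```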

#[actix_rt::test]
async fn test_zstd_encoding() {
let srv = actix_test::start_with(actix_test::config().h1(), || {