From f8a864d0c895c84b400ec217994ed499c86b5aaf Mon Sep 17 00:00:00 2001 From: willrnch <1873880+willrnch@users.noreply.github.com> Date: Mon, 3 Nov 2025 14:01:39 +0100 Subject: [PATCH] permessage-deflate --- Cargo.lock | 7 ++++--- Cargo.toml | 1 + autobahn/Makefile | 6 +++--- rust-toolchain | 2 +- src/error.rs | 2 ++ src/fragment.rs | 4 +++- src/frame.rs | 41 +++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 24 ++++++++++++++++++++++-- 8 files changed, 77 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7025f80..14f3739 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -409,6 +409,7 @@ dependencies = [ "http-body-util", "hyper", "hyper-util", + "miniz_oxide", "pin-project", "rand", "rustls-pemfile", @@ -726,9 +727,9 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "miniz_oxide" -version = "0.8.3" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8402cab7aefae129c6977bb0ff1b8fd9a04eb5b51efc50a70bea51cda0c7924" +checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" dependencies = [ "adler2", ] diff --git a/Cargo.toml b/Cargo.toml index 13bb401..37ac357 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ utf-8 = "0.7.5" rand = "0.8.4" thiserror = "1.0.40" bytes = "1.5.0" +miniz_oxide = "0.8.9" # Axum integration axum-core = { version = "0.5.0", optional = true } diff --git a/autobahn/Makefile b/autobahn/Makefile index 9e20c4b..54c09e3 100644 --- a/autobahn/Makefile +++ b/autobahn/Makefile @@ -1,7 +1,7 @@ AUTOBAHN_TESTSUITE_DOCKER := crossbario/autobahn-testsuite:0.8.2@sha256:5d4ba3aa7d6ab2fdbf6606f3f4ecbe4b66f205ce1cbc176d6cdf650157e52242 build-server: - sudo cargo build --release --example echo_server --features "upgrade" + cargo build --release --example echo_server --features "upgrade" run-server: build-server echo ${PWD} @@ -18,7 +18,7 @@ run-server: build-server ../target/release/examples/echo_server build-client: - sudo cargo build --release --example autobahn_client --features "upgrade" + cargo build --release --example autobahn_client --features "upgrade" run-client: build-client echo ${PWD} @@ -34,4 +34,4 @@ run-client: build-client sleep 5 ../target/release/examples/autobahn_client -.PHONY: build-server run-server build-client run-client \ No newline at end of file +.PHONY: build-server run-server build-client run-client diff --git a/rust-toolchain b/rust-toolchain index 32a6ce3..8d39474 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -1.76.0 +1.91.1 diff --git a/src/error.rs b/src/error.rs index 848116a..460703f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -34,6 +34,8 @@ pub enum WebSocketError { InvalidSecWebsocketVersion, #[error("Invalid value")] InvalidValue, + #[error("Invalid encoding")] + InvalidEncoding, #[error("Sec-WebSocket-Key header is missing")] MissingSecWebSocketKey, #[error(transparent)] diff --git a/src/fragment.rs b/src/fragment.rs index faf260a..63e00a1 100644 --- a/src/fragment.rs +++ b/src/fragment.rs @@ -222,7 +222,7 @@ impl Fragments { if self.fragments.is_some() { return Err(WebSocketError::InvalidFragment); } - return Ok(Some(Frame::new(true, frame.opcode, None, frame.payload))); + return Ok(Some(Frame::new(true, frame.opcode, None, frame.payload, frame.compressed))); } else { self.fragments = match frame.opcode { OpCode::Text => match utf8::decode(&frame.payload) { @@ -292,6 +292,7 @@ impl Fragments { self.opcode, None, self.fragments.take().unwrap().take_buffer().into(), + false, ))); } } @@ -303,6 +304,7 @@ impl Fragments { self.opcode, None, self.fragments.take().unwrap().take_buffer().into(), + false, ))); } } diff --git a/src/frame.rs b/src/frame.rs index 7178e8b..bba4b57 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -14,11 +14,16 @@ use tokio::io::AsyncWriteExt; +use miniz_oxide::{MZFlush, MZStatus}; +use miniz_oxide::inflate::stream::{InflateState, inflate}; + use bytes::BytesMut; use core::ops::Deref; use crate::WebSocketError; +const TRAILER: [u8; 4] = [0x00, 0x00, 0xff, 0xff]; + macro_rules! repr_u8 { ($(#[$meta:meta])* $vis:vis enum $name:ident { $($(#[$vmeta:meta])* $vname:ident $(= $val:expr)?,)* @@ -136,6 +141,8 @@ pub struct Frame<'f> { mask: Option<[u8; 4]>, /// The payload of the frame. pub payload: Payload<'f>, + /// Is the frame payload compressed + pub compressed: bool, } const MAX_HEAD_SIZE: usize = 16; @@ -147,12 +154,14 @@ impl<'f> Frame<'f> { opcode: OpCode, mask: Option<[u8; 4]>, payload: Payload<'f>, + compressed: bool, ) -> Self { Self { fin, opcode, mask, payload, + compressed, } } @@ -167,6 +176,7 @@ impl<'f> Frame<'f> { opcode: OpCode::Text, mask: None, payload, + compressed: false, } } @@ -179,6 +189,7 @@ impl<'f> Frame<'f> { opcode: OpCode::Binary, mask: None, payload, + compressed: false, } } @@ -197,6 +208,7 @@ impl<'f> Frame<'f> { opcode: OpCode::Close, mask: None, payload: payload.into(), + compressed: false, } } @@ -211,6 +223,7 @@ impl<'f> Frame<'f> { opcode: OpCode::Close, mask: None, payload, + compressed: false, } } @@ -223,6 +236,7 @@ impl<'f> Frame<'f> { opcode: OpCode::Pong, mask: None, payload, + compressed: false, } } @@ -334,6 +348,33 @@ impl<'f> Frame<'f> { buf[size..size + len].copy_from_slice(&self.payload); &buf[..size + len] } + + pub fn inflate(&self, state: &mut InflateState) -> Result + { + let payload = [self.payload.to_vec().as_slice(), &TRAILER].concat(); + + let max_output_size = usize::max_value(); + let mut out: Vec = vec![0; payload.len().saturating_mul(2).min(max_output_size)]; + + let res = inflate(state, &payload, &mut out, MZFlush::None); + + if res.status != Ok(MZStatus::Ok) { + return Err(WebSocketError::InvalidEncoding); + } + + out.truncate(res.bytes_written); + + let payload = Payload::Owned(out); + + Ok(Self { + fin: self.fin, + opcode: self.opcode, + mask: self.mask, + payload, + compressed: false + }) + } + } repr_u8! { diff --git a/src/lib.rs b/src/lib.rs index eefc955..a1c0fa4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -175,6 +175,9 @@ use tokio::io::AsyncReadExt; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; +use miniz_oxide::DataFormat; +use miniz_oxide::inflate::stream::InflateState; + pub use crate::close::CloseCode; pub use crate::error::WebSocketError; pub use crate::fragment::FragmentCollector; @@ -208,6 +211,8 @@ pub(crate) struct ReadHalf { writev_threshold: usize, max_message_size: usize, buffer: BytesMut, + + state: InflateState, } #[cfg(feature = "unstable-split")] @@ -364,6 +369,7 @@ pub struct WebSocket { stream: S, write_half: WriteHalf, read_half: ReadHalf, + } impl<'f, S> WebSocket { @@ -577,6 +583,8 @@ impl ReadHalf { pub fn after_handshake(role: Role) -> Self { let buffer = BytesMut::with_capacity(8192); + let state = InflateState::new(DataFormat::Raw); + Self { role, auto_apply_mask: true, @@ -585,6 +593,7 @@ impl ReadHalf { writev_threshold: 1024, max_message_size: 64 << 20, buffer, + state, } } @@ -610,6 +619,13 @@ impl ReadHalf { frame.unmask() }; + if frame.compressed { + frame = match frame.inflate(&mut self.state) { + Ok(frame) => frame, + Err(e) => return (Err(e), None), + } + } + match frame.opcode { OpCode::Close if self.auto_close => { match frame.payload.len() { @@ -681,7 +697,11 @@ impl ReadHalf { let rsv2 = self.buffer[0] & 0b00100000 != 0; let rsv3 = self.buffer[0] & 0b00010000 != 0; - if rsv1 || rsv2 || rsv3 { + let mut compressed = false; + + if rsv1 && !rsv2 && !rsv3 { + compressed = true; + } else if rsv1 || rsv2 || rsv3 { return Err(WebSocketError::ReservedBitsNotZero); } @@ -744,7 +764,7 @@ impl ReadHalf { // if we read too much it will stay in the buffer, for the next call to this method let payload = self.buffer.split_to(payload_len); - let frame = Frame::new(fin, opcode, mask, Payload::Bytes(payload)); + let frame = Frame::new(fin, opcode, mask, Payload::Bytes(payload), compressed); Ok(frame) } }