Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
e96d5cd
perf: SIMD unmask + bypass hyper in echo server, fast-path APIs
divybot May 22, 2026
296688a
style: apply cargo fmt
divybot May 22, 2026
dc4a978
fix(example): restore FragmentCollector wrap in echo_server
divybot May 22, 2026
c80460e
perf: revert 64KiB read buffer to 8KiB after empirical regression
divybot May 22, 2026
2f022d0
perf(example): SO_REUSEPORT + per-worker shard runtime
divybot May 22, 2026
e11a8ee
perf(core): Frame::unmask clears self.mask; FragmentCollector pass-th…
divybot May 22, 2026
592da97
perf(examples): mio-driven single-thread echo using fastwebsockets core
divybot May 22, 2026
c8ecd9e
fix(examples): echo_server_mio Linux-only via mod stub for non-Linux CI
divybot May 22, 2026
1dd2878
perf(examples/echo_server_mio): one recv per event, drop WouldBlock loop
divybot May 22, 2026
390b50c
perf(examples/echo_server_mio): in-place response synthesis, single w…
divybot May 22, 2026
a379fb6
perf(examples/echo_server_mio): shared scratch buffer, all five cases…
divybot May 22, 2026
bb1ca34
feat(core): public sync `parse_header` for callback-style frameworks
divybot May 22, 2026
32c99b4
feat(core): ServerEngine — non-async framing engine with thin tokio a…
divybot May 22, 2026
7c86798
style: apply cargo fmt to echo_server_tokio_fast
divybot May 22, 2026
6763d2c
feat(core): zero-copy outbound API + use it in the tokio adapter
divybot May 22, 2026
5978bbf
perf(examples/tokio_fast): try_write for single segment, skip writev …
divybot May 22, 2026
b8ad742
feat(core): public mio-driven Reactor for many-connection workloads
divybot May 22, 2026
cdc7252
style: cargo fmt src/reactor.rs
divybot May 22, 2026
e825bcd
feat(reactor): general Handler/Connection API + cross-thread Sender
divybot May 22, 2026
a14f4ba
feat(reactor): add_session_with_prefix + guaranteed on_open
divybot May 22, 2026
5945432
docs(reactor): document Deno-style embedding direction
divybot May 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 57 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,46 @@ name = "echo_server"
path = "examples/echo_server.rs"
required-features = ["upgrade"]

[[example]]
name = "echo_server_low"
path = "examples/echo_server_low.rs"
required-features = ["upgrade"]

# mio-driven echo server (Linux only) — tests whether the single-thread
# gap to uWebSockets is in WebSocket framing/parsing or in Tokio/futures
# runtime overhead. Uses fastwebsockets::ServerEngine for the framing.
[[example]]
name = "echo_server_mio"
path = "examples/echo_server_mio.rs"
required-features = ["upgrade"]

# Tokio-based echo server using fastwebsockets::ServerEngine for the
# per-frame hot path. Same async transport (TcpStream + hyper upgrade)
# that the standard `echo_server` example uses, but the framing/unmask/
# response synthesis runs synchronously inside the engine. This is the
# "Deno-friendly" fast path.
[[example]]
name = "echo_server_tokio_fast"
path = "examples/echo_server_tokio_fast.rs"
required-features = ["upgrade"]

# Bench-shape demo of the public `crate::reactor::Reactor` API.
# Pure echo via `Reactor::run_echo()`; this is the binary that the
# uWebSockets head-to-head benchmark targets. Linux-only.
[[example]]
name = "echo_server_reactor"
path = "examples/echo_server_reactor.rs"
required-features = ["reactor"]

# End-to-end demo of the `Reactor` general API: Handler trait
# (on_open / on_frame / on_close), Connection.send / .close, and
# the cross-thread Sender (queued commands + waker). Implements a
# broadcast chat broker. Linux-only.
[[example]]
name = "reactor_chat_broker"
path = "examples/reactor_chat_broker.rs"
required-features = ["reactor"]

[[example]]
name = "autobahn_client"
path = "examples/autobahn_client.rs"
Expand Down Expand Up @@ -60,6 +100,14 @@ axum-core = { version = "0.5.0", optional = true }
http = { version = "1", optional = true }
async-trait = { version = "0.1", optional = true }

# Linux mio-driven reactor (opt-in via the `reactor` feature). Wraps
# many WebSocket sessions on one thread / one event loop, sharing one
# scratch buffer — the framing path that closes the high-fd / high-
# payload gap to uWebSockets without spinning per-connection tokio
# tasks. See `src/reactor.rs` and `examples/echo_server_reactor.rs`.
mio = { version = "1.0", features = ["net", "os-poll"], optional = true }
slab = { version = "0.4", optional = true }

[features]
default = ["simd"]
upgrade = [
Expand All @@ -74,6 +122,8 @@ simd = ["simdutf8"]
unstable-split = []
# Axum integration
with_axum = ["axum-core", "http", "async-trait"]
# Linux mio-driven server-side reactor. See `crate::reactor`.
reactor = ["mio", "slab", "base64", "sha1"]

[dev-dependencies]
tokio = { version = "1.25.0", features = ["full", "macros"] }
Expand All @@ -89,6 +139,13 @@ anyhow = "1.0.71"
webpki-roots = "0.23.0"
bytes = "1.4.0"
axum = "0.8.1"
# Used by examples/echo_server.rs to set SO_REUSEPORT on per-worker listener
# sockets when FWS_WORKERS > 1. Tokio's TcpListener::bind does not expose
# SO_REUSEPORT; we build the socket via socket2 and convert.
socket2 = "0.5"
mio = { version = "1.0", features = ["net", "os-poll"] }
slab = "0.4"
libc = "0.2"

[[test]]
name = "upgrade"
Expand Down
18 changes: 9 additions & 9 deletions benches/unmask.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use criterion::*;

fn benchmark(c: &mut Criterion) {
const STREAM_SIZE: usize = 64 << 20;

let mut data: Vec<u8> = (0..STREAM_SIZE).map(|_| rand::random()).collect();
let mut group = c.benchmark_group("unmask2");
group.throughput(Throughput::Bytes(STREAM_SIZE as u64));
group.bench_function("unmask 64 << 20", |b| {
b.iter(|| {
fastwebsockets::unmask(black_box(&mut data), [1, 2, 3, 4]);
let mut group = c.benchmark_group("unmask");
for &size in &[64usize, 1024, 16 * 1024, 64 << 20] {
let mut data: Vec<u8> = (0..size).map(|_| rand::random()).collect();
group.throughput(Throughput::Bytes(size as u64));
group.bench_function(format!("len={}", size), |b| {
b.iter(|| {
fastwebsockets::unmask(black_box(&mut data), [1, 2, 3, 4]);
});
});
});
}
group.finish();
}

Expand Down
136 changes: 120 additions & 16 deletions examples/echo_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
// limitations under the License.

use fastwebsockets::upgrade;
use fastwebsockets::FragmentCollector;
use fastwebsockets::OpCode;
use fastwebsockets::Role;
use fastwebsockets::WebSocket;
use fastwebsockets::WebSocketError;
use http_body_util::Empty;
use hyper::body::Bytes;
Expand All @@ -22,11 +25,19 @@ use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::Request;
use hyper::Response;
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tokio::net::TcpStream;

async fn handle_client(fut: upgrade::UpgradeFut) -> Result<(), WebSocketError> {
let mut ws = fastwebsockets::FragmentCollector::new(fut.await?);

async fn echo_loop<S>(ws: WebSocket<S>) -> Result<(), WebSocketError>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
// The bench load_test.c never fragments, but the Autobahn suite does and
// expects cross-fragment UTF-8 validation. Wrap with FragmentCollector so
// the example stays protocol-compliant; FragmentCollector is a thin
// pass-through for non-fragmented frames (one match per frame).
let mut ws = FragmentCollector::new(ws);
loop {
let frame = ws.read_frame().await?;
match frame.opcode {
Expand All @@ -37,9 +48,47 @@ async fn handle_client(fut: upgrade::UpgradeFut) -> Result<(), WebSocketError> {
_ => {}
}
}
Ok(())
}

async fn handle_client(fut: upgrade::UpgradeFut) -> Result<(), WebSocketError> {
// Drive hyper's upgrade future, then downcast to the underlying TcpStream so
// the steady-state echo loop runs without hyper's read-buffer + trait-object
// indirection on every read/write.
let upgraded = fut.upgraded().await?;
match upgraded.downcast::<TokioIo<TcpStream>>() {
Ok(parts) => {
// hyper may have buffered bytes the client sent right after the upgrade
// request. Carry them into the WebSocket's framing buffer.
let stream = parts.io.into_inner();
let _ = stream.set_nodelay(true);
let ws = WebSocket::after_handshake_with_buffer(
stream,
Role::Server,
&parts.read_buf,
);
echo_loop(ws).await
}
Err(upgraded) => {
// Some other transport (TLS, h2c) — fall back to the generic path.
let ws = WebSocket::after_handshake(TokioIo::new(upgraded), Role::Server);
echo_loop(ws).await
}
}
}

async fn handle_client_tcp(stream: TcpStream) -> Result<(), WebSocketError> {
let _ = stream.set_nodelay(true);
let io = TokioIo::new(stream);
let conn_fut = http1::Builder::new()
.serve_connection(io, service_fn(server_upgrade))
.with_upgrades();
if let Err(e) = conn_fut.await {
eprintln!("An error occurred: {:?}", e);
}
Ok(())
}

async fn server_upgrade(
mut req: Request<Incoming>,
) -> Result<Response<Empty<Bytes>>, WebSocketError> {
Expand All @@ -54,27 +103,82 @@ async fn server_upgrade(
Ok(response)
}

fn main() -> Result<(), WebSocketError> {
fn make_reuseport_listener(addr: &str) -> std::io::Result<TcpListener> {
use socket2::{Domain, Protocol, Socket, Type};
let parsed: std::net::SocketAddr = addr.parse().map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("bad addr: {}", e),
)
})?;
let domain = if parsed.is_ipv6() {
Domain::IPV6
} else {
Domain::IPV4
};
let sock = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
sock.set_reuse_address(true)?;
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
sock.set_reuse_port(true)?;
sock.set_nonblocking(true)?;
sock.bind(&parsed.into())?;
sock.listen(1024)?;
TcpListener::from_std(sock.into())
}

fn run_worker(
worker_id: usize,
addr: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();

.build()?;
rt.block_on(async move {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server started, listening on {}", "127.0.0.1:8080");
let listener = make_reuseport_listener(&addr)?;
eprintln!("[worker {}] listening on {}", worker_id, addr);
loop {
let (stream, _) = listener.accept().await?;
println!("Client connected");
tokio::spawn(async move {
let io = hyper_util::rt::TokioIo::new(stream);
let conn_fut = http1::Builder::new()
.serve_connection(io, service_fn(server_upgrade))
.with_upgrades();
if let Err(e) = conn_fut.await {
println!("An error occurred: {:?}", e);
if let Err(e) = handle_client_tcp(stream).await {
eprintln!("connection error: {}", e);
}
});
}
})
}

fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let workers = std::env::var("FWS_WORKERS")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(1);

let addr =
std::env::var("FWS_ADDR").unwrap_or_else(|_| "127.0.0.1:8080".to_string());

if workers <= 1 {
return run_worker(0, addr).map_err(|e| e.into());
}

// Multi-worker: each thread runs its own current_thread runtime and binds
// a SO_REUSEPORT listener on the same port. The kernel load-balances
// accept() across the listeners, so each connection lives entirely inside
// one worker (no cross-thread task migration). This is the same model
// uWebSockets recommends for scaling beyond one core.
let mut handles = Vec::with_capacity(workers);
for i in 0..workers {
let addr = addr.clone();
let h = std::thread::Builder::new()
.name(format!("fws-worker-{}", i))
.spawn(move || {
if let Err(e) = run_worker(i, addr) {
eprintln!("[worker {}] exiting: {}", i, e);
}
})?;
handles.push(h);
}
for h in handles {
let _ = h.join();
}
Ok(())
}
Loading
Loading