From f7abdab198eca142da0a06853e96ca2d46e3e38f Mon Sep 17 00:00:00 2001 From: Sion Kang Date: Wed, 6 May 2026 17:53:39 +0900 Subject: [PATCH 1/3] perf: pipeline SFTP requests for upload/download (~2-3x speedup) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The high-level `AsyncWrite`/`AsyncRead` impls on `File` issue exactly one SFTP `WRITE`/`READ` at a time and `await` its `STATUS`/`DATA` reply before sending the next. Sustained throughput is therefore bounded by `chunk_size / RTT` — at 50 ms RTT with the default 256 KiB chunk that caps a single transfer at ~5 MiB/s no matter how fast the link is. Add two pipelined helpers on `File` that keep up to N SFTP requests in flight concurrently, mirroring how OpenSSH's `sftp(1)` client behaves (`-R 64` by default): * `File::write_all_pipelined(reader, max_inflight)` — reads chunks from `reader` and dispatches `session.write(...)` futures via `FuturesUnordered`, refilling the pipeline as in-flight writes complete. Memory bounded by `max_inflight * write_len`. * `File::read_to_writer_pipelined(writer, max_inflight)` — symmetric for downloads. Out-of-order responses are buffered in a `BTreeMap` keyed by offset and flushed to `writer` as soon as the next-expected chunk arrives. Wire `Client::upload_file`/`download_file`/`upload_dir_recursive`/ `download_dir_recursive` to use the new helpers with `MAX_INFLIGHT_REQUESTS = 64`. 
Measured on macOS arm64 against `bssh-server` v2.1.3 on loopback with a 1 GiB file: | op | build | real | RSS | |----------|------------------------|---------|----------| | upload | vanilla v2.1.3 | 39.30s | 3.23 GB | | upload | streaming-only | 3.47s | 20 MB | | upload | streaming + pipelined | 2.27s | 49 MB | | download | vanilla v2.1.3 | 3.93s | 2.17 GB | | download | streaming-only | 3.41s | 16 MB | | download | streaming + pipelined | 1.34s | 288 MB | Pipelining adds ~+53% on upload and ~+155% on download throughput on top of the streaming patch (which already eliminated the whole-file load). Peak RSS stays well below the unpatched levels: download holds at most ~`max_inflight` chunks pending in the reorder map, and upload caps at `max_inflight * chunk_size + reader buffer`. --- crates/bssh-russh-sftp/Cargo.toml | 1 + crates/bssh-russh-sftp/src/client/fs/file.rs | 177 +++++++++++++++++++ src/ssh/tokio_client/file_transfer.rs | 58 +++--- 3 files changed, 205 insertions(+), 31 deletions(-) diff --git a/crates/bssh-russh-sftp/Cargo.toml b/crates/bssh-russh-sftp/Cargo.toml index a69960c7..9bd21c6c 100644 --- a/crates/bssh-russh-sftp/Cargo.toml +++ b/crates/bssh-russh-sftp/Cargo.toml @@ -20,6 +20,7 @@ tokio = { version = "1", default-features = false, features = [ "macros", ] } tokio-util = "0.7" +futures = { version = "0.3", default-features = false, features = ["std", "async-await"] } serde = { version = "1.0", features = ["derive"] } serde_bytes = "0.11" bitflags = { version = "2.9", features = ["serde"] } diff --git a/crates/bssh-russh-sftp/src/client/fs/file.rs b/crates/bssh-russh-sftp/src/client/fs/file.rs index 4b1cdcd2..0adbe6df 100644 --- a/crates/bssh-russh-sftp/src/client/fs/file.rs +++ b/crates/bssh-russh-sftp/src/client/fs/file.rs @@ -92,6 +92,183 @@ impl File { self.session.fsync(self.handle.as_str()).await.map(|_| ()) } + + /// Streams `reader` to this remote file with up to `max_inflight` concurrent + /// SFTP `WRITE` requests in flight. 
Each request carries up to the negotiated + /// `write_len` (or [`MAX_WRITE_LENGTH`] when no limit is advertised). + /// + /// The high-level [`AsyncWrite`] impl issues one `WRITE` at a time and waits + /// for its `STATUS` reply before sending the next, so sustained throughput is + /// bounded by `chunk_size / RTT`. This helper hides the per-request RTT by + /// keeping multiple in-flight, mirroring how OpenSSH's `sftp` client behaves + /// (~64 outstanding requests by default). + /// + /// On success returns the number of bytes streamed. Updates `self.pos` to + /// the new write offset. Reading from `reader` and dispatching writes are + /// interleaved, so memory usage is bounded by `max_inflight * chunk_size`. + pub async fn write_all_pipelined<R>( + &mut self, + reader: &mut R, + max_inflight: usize, + ) -> SftpResult<u64> + where + R: tokio::io::AsyncRead + Unpin, + { + use futures::stream::{FuturesUnordered, StreamExt}; + use tokio::io::AsyncReadExt; + + if max_inflight == 0 { + return Err(Error::UnexpectedBehavior( + "max_inflight must be at least 1".to_owned(), + )); + } + + let chunk_size = self + .extensions + .limits + .as_ref() + .and_then(|l| l.write_len) + .map(|n| n as usize) + .unwrap_or(MAX_WRITE_LENGTH as usize); + + let mut total: u64 = 0; + let mut offset = self.pos; + let mut in_flight = FuturesUnordered::new(); + let mut eof = false; + + loop { + // Top up the pipeline with new chunks until we hit the cap or EOF. 
+ while !eof && in_flight.len() < max_inflight { + let mut buf = vec![0u8; chunk_size]; + let n = reader.read(&mut buf).await.map_err(io::Error::from)?; + if n == 0 { + eof = true; + break; + } + buf.truncate(n); + + let session = self.session.clone(); + let handle = self.handle.clone(); + let off = offset; + + in_flight.push(async move { + session.write(handle, off, buf).await?; + SftpResult::Ok(n as u64) + }); + + offset += n as u64; + total += n as u64; + } + + // Drain at least one in-flight write before reading more, otherwise + // we busy-loop the read path while writes never get a chance to make + // progress. + match in_flight.next().await { + Some(Ok(_)) => {} + Some(Err(e)) => return Err(e), + None => break, // pipeline drained and no more data → done + } + } + + self.pos = offset; + Ok(total) + } + + /// Streams the remote file from the current position to `writer` using up to + /// `max_inflight` concurrent SFTP `READ` requests. Each request asks for up + /// to the negotiated `read_len` (or [`MAX_READ_LENGTH`] when no limit is + /// advertised). + /// + /// Like [`Self::write_all_pipelined`], this hides per-request RTT. Chunks + /// are reassembled in offset order before being written to `writer`, so the + /// output is identical to a sequential read. Stops on the first server + /// short read (server signalled EOF). + /// + /// Returns the number of bytes streamed. Updates `self.pos`. 
+ pub async fn read_to_writer_pipelined<W>( + &mut self, + writer: &mut W, + max_inflight: usize, + ) -> SftpResult<u64> + where + W: tokio::io::AsyncWrite + Unpin, + { + use futures::stream::{FuturesUnordered, StreamExt}; + use std::collections::BTreeMap; + use tokio::io::AsyncWriteExt; + + if max_inflight == 0 { + return Err(Error::UnexpectedBehavior( + "max_inflight must be at least 1".to_owned(), + )); + } + + let chunk_size = self + .extensions + .limits + .as_ref() + .and_then(|l| l.read_len) + .map(|n| n as usize) + .unwrap_or(MAX_READ_LENGTH as usize); + + let mut total: u64 = 0; + let mut next_offset = self.pos; + let mut next_to_write = self.pos; + let mut pending: BTreeMap<u64, Vec<u8>> = BTreeMap::new(); + let mut in_flight = FuturesUnordered::new(); + let mut eof = false; + + loop { + // Schedule new read requests until we hit the cap or have observed EOF. + while !eof && in_flight.len() < max_inflight { + let session = self.session.clone(); + let handle = self.handle.clone(); + let off = next_offset; + let len = chunk_size as u32; + + in_flight.push(async move { + match session.read(handle, off, len).await { + Ok(data) => SftpResult::Ok((off, Some(data.data))), + Err(Error::Status(s)) if s.status_code == StatusCode::Eof => { + SftpResult::Ok((off, None)) + } + Err(e) => Err(e), + } + }); + + next_offset += chunk_size as u64; + } + + match in_flight.next().await { + Some(Ok((off, Some(data)))) => { + if data.is_empty() { + eof = true; + } else { + pending.insert(off, data); + } + } + Some(Ok((_, None))) => { + eof = true; + } + Some(Err(e)) => return Err(e), + None => break, + } + + // Flush in-order chunks to writer as they become available. 
+ while let Some(chunk) = pending.remove(&next_to_write) { + let n = chunk.len() as u64; + writer + .write_all(&chunk) + .await + .map_err(io::Error::from)?; + next_to_write += n; + total += n; + } + } + + self.pos = next_to_write; + Ok(total) + } } impl Drop for File { diff --git a/src/ssh/tokio_client/file_transfer.rs b/src/ssh/tokio_client/file_transfer.rs index 5fda7622..7afe6861 100644 --- a/src/ssh/tokio_client/file_transfer.rs +++ b/src/ssh/tokio_client/file_transfer.rs @@ -21,10 +21,15 @@ use russh_sftp::{client::SftpSession, protocol::OpenFlags}; use std::path::Path; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::AsyncWriteExt; use super::connection::Client; -use crate::utils::buffer_pool::global; + +/// Maximum number of concurrent SFTP `WRITE`/`READ` requests held in flight per +/// transfer. Mirrors OpenSSH `sftp(1)`'s default (`-R 64`) — large enough to +/// hide per-request RTT on intra-DC and intercontinental links, small enough to +/// keep peak buffer memory bounded (`MAX_INFLIGHT * MAX_WRITE_LENGTH ≈ 16 MiB`). +const MAX_INFLIGHT_REQUESTS: usize = 64; impl Client { /// Upload a file with sftp to the remote server. @@ -46,21 +51,20 @@ impl Client { channel.request_subsystem(true, "sftp").await?; let sftp = SftpSession::new(channel.into_stream()).await?; - // read file contents locally - let file_contents = tokio::fs::read(src_file_path) + // Stream local file with multiple SFTP WRITE requests in flight to + // hide per-request RTT and avoid loading the entire file in memory. 
+ let mut local_file = tokio::fs::File::open(src_file_path) .await .map_err(super::Error::IoError)?; - // interaction with i/o let mut file = sftp .open_with_flags( dest_file_path, OpenFlags::CREATE | OpenFlags::TRUNCATE | OpenFlags::WRITE | OpenFlags::READ, ) .await?; - file.write_all(&file_contents) - .await - .map_err(super::Error::IoError)?; + file.write_all_pipelined(&mut local_file, MAX_INFLIGHT_REQUESTS) + .await?; file.flush().await.map_err(super::Error::IoError)?; file.shutdown().await.map_err(super::Error::IoError)?; @@ -84,25 +88,18 @@ impl Client { channel.request_subsystem(true, "sftp").await?; let sftp = SftpSession::new(channel.into_stream()).await?; - // open remote file for reading + // Stream remote file with multiple SFTP READ requests in flight; chunks + // are reassembled in offset order before being written to disk. let mut remote_file = sftp .open_with_flags(remote_file_path, OpenFlags::READ) .await?; - // Use pooled buffer for reading file contents to reduce allocations - let mut pooled_buffer = global::get_large_buffer(); - remote_file.read_to_end(pooled_buffer.as_mut_vec()).await?; - let contents = pooled_buffer.as_vec().clone(); // Clone to owned Vec for writing - - // write contents to local file let mut local_file = tokio::fs::File::create(local_file_path.as_ref()) .await .map_err(super::Error::IoError)?; - - local_file - .write_all(&contents) - .await - .map_err(super::Error::IoError)?; + remote_file + .read_to_writer_pipelined(&mut local_file, MAX_INFLIGHT_REQUESTS) + .await?; local_file.flush().await.map_err(super::Error::IoError)?; Ok(()) @@ -173,8 +170,8 @@ impl Client { let _ = sftp.create_dir(&remote_path).await; // Ignore error if already exists self.upload_dir_recursive(sftp, &path, &remote_path).await?; } else if metadata.is_file() { - // Upload file - let file_contents = tokio::fs::read(&path) + // Stream local file with pipelined SFTP WRITEs. 
+ let mut local_file = tokio::fs::File::open(&path) .await .map_err(super::Error::IoError)?; @@ -186,9 +183,8 @@ impl Client { .await?; remote_file - .write_all(&file_contents) - .await - .map_err(super::Error::IoError)?; + .write_all_pipelined(&mut local_file, MAX_INFLIGHT_REQUESTS) + .await?; remote_file.flush().await.map_err(super::Error::IoError)?; remote_file .shutdown() @@ -265,17 +261,17 @@ impl Client { self.download_dir_recursive(sftp, &remote_path, &local_path) .await?; } else if metadata.file_type().is_file() { - // Download file using pooled buffer + // Stream remote file with pipelined SFTP READs. let mut remote_file = sftp.open_with_flags(&remote_path, OpenFlags::READ).await?; - let mut pooled_buffer = global::get_large_buffer(); - remote_file.read_to_end(pooled_buffer.as_mut_vec()).await?; - let contents = pooled_buffer.as_vec().clone(); - - tokio::fs::write(&local_path, contents) + let mut local_file = tokio::fs::File::create(&local_path) .await .map_err(super::Error::IoError)?; + remote_file + .read_to_writer_pipelined(&mut local_file, MAX_INFLIGHT_REQUESTS) + .await?; + local_file.flush().await.map_err(super::Error::IoError)?; } } From 9e73c4f57da20410e5b56585fca37beb972e5ee0 Mon Sep 17 00:00:00 2001 From: Sion Kang Date: Thu, 7 May 2026 14:04:16 +0900 Subject: [PATCH 2/3] fix: apply rustfmt to pipelined read path Co-Authored-By: Claude Sonnet 4.6 --- Cargo.lock | 1 + crates/bssh-russh-sftp/src/client/fs/file.rs | 5 +---- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index edb80b38..7097badb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -581,6 +581,7 @@ dependencies = [ "bytes", "chrono", "flurry", + "futures", "log", "serde", "serde_bytes", diff --git a/crates/bssh-russh-sftp/src/client/fs/file.rs b/crates/bssh-russh-sftp/src/client/fs/file.rs index 0adbe6df..76e3935c 100644 --- a/crates/bssh-russh-sftp/src/client/fs/file.rs +++ b/crates/bssh-russh-sftp/src/client/fs/file.rs @@ -257,10 +257,7 @@ impl File { 
// Flush in-order chunks to writer as they become available. while let Some(chunk) = pending.remove(&next_to_write) { let n = chunk.len() as u64; - writer - .write_all(&chunk) - .await - .map_err(io::Error::from)?; + writer.write_all(&chunk).await.map_err(io::Error::from)?; next_to_write += n; total += n; } From c60a4f139d8cd9fd710cb841b38c53c1f6c5743c Mon Sep 17 00:00:00 2001 From: Sion Kang Date: Tue, 23 Jun 2026 17:15:23 +0900 Subject: [PATCH 3/3] chore: bump vendored russh-sftp fork to upstream 2.3.0; drop write wrapper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Upstream russh-sftp 2.3.0 picked up both perf landings that motivated this fork's prior delta: - AspectUnk/russh-sftp#83 — `serde_bytes` perf for `WRITE`/`DATA` payloads (the original reason for `sftp-serde-bytes-perf.patch`). - AspectUnk/russh-sftp#85 — pipelined `AsyncWrite for File` with a `write_acks` queue and dynamic chunk sizing, plus `Config::max_concurrent_writes`. This subsumes our `write_all_pipelined` helper. The only remaining local delta is the read-side helper `File::read_to_writer_pipelined`, which is now proposed upstream as AspectUnk/russh-sftp#91. Until that merges and lands in a release, we keep it in the vendored crate. Changes: - `crates/bssh-russh-sftp/src/` synced to upstream `master` (2.3.0), then the `read_to_writer_pipelined` helper added on top of upstream `File`. - `crates/bssh-russh-sftp/Cargo.toml` mirrors upstream deps (notably `dashmap` replaced our `flurry`, plus pinned point versions). - `patches/sftp-serde-bytes-perf.patch` removed (obsolete). - `sync-upstream.sh` / `create-patch.sh` simplified — no longer reapply the obsolete patch. - `src/ssh/tokio_client/file_transfer.rs`: drop `write_all_pipelined` callers in favor of plain `tokio::io::copy`, and plumb our 64-deep pipeline through `SftpSession::new_with_config(stream, sftp_config())` (was: implicit default of 8). 
- `src/server/sftp.rs`: add `impl From for StatusReply` to satisfy upstream PR #76's tightened `Handler::Error: Into` bound. When #91 merges and a russh-sftp release goes out, drop the vendored fork entirely and consume upstream directly. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.7 --- Cargo.lock | 146 +++---- Cargo.toml | 6 +- crates/bssh-russh-sftp/Cargo.toml | 29 +- crates/bssh-russh-sftp/README.md | 22 +- crates/bssh-russh-sftp/create-patch.sh | 57 +-- .../patches/sftp-serde-bytes-perf.patch | 193 ---------- crates/bssh-russh-sftp/src/client/error.rs | 10 +- crates/bssh-russh-sftp/src/client/fs/dir.rs | 23 +- crates/bssh-russh-sftp/src/client/fs/file.rs | 364 ++++++++---------- crates/bssh-russh-sftp/src/client/mod.rs | 27 +- .../bssh-russh-sftp/src/client/rawsession.rs | 210 +++++----- crates/bssh-russh-sftp/src/client/runtime.rs | 102 +++++ crates/bssh-russh-sftp/src/client/session.rs | 88 ++--- crates/bssh-russh-sftp/src/de.rs | 2 +- crates/bssh-russh-sftp/src/lib.rs | 4 +- crates/bssh-russh-sftp/src/protocol/mod.rs | 103 +---- crates/bssh-russh-sftp/src/ser.rs | 4 +- crates/bssh-russh-sftp/src/server/handler.rs | 9 +- crates/bssh-russh-sftp/src/server/mod.rs | 46 ++- crates/bssh-russh-sftp/src/server/reply.rs | 51 +++ crates/bssh-russh-sftp/src/utils.rs | 10 +- crates/bssh-russh-sftp/sync-upstream.sh | 23 +- src/server/sftp.rs | 7 + src/ssh/tokio_client/file_transfer.rs | 37 +- 24 files changed, 701 insertions(+), 872 deletions(-) delete mode 100644 crates/bssh-russh-sftp/patches/sftp-serde-bytes-perf.patch create mode 100644 crates/bssh-russh-sftp/src/client/runtime.rs create mode 100644 crates/bssh-russh-sftp/src/server/reply.rs diff --git a/Cargo.lock b/Cargo.lock index 7097badb..7496fb63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,19 +78,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "ahash" -version = "0.8.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" -dependencies = [ - "cfg-if", - "const-random", - "once_cell", - "version_check", - "zerocopy", -] - [[package]] name = "aho-corasick" version = "1.1.4" @@ -258,7 +245,7 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ - "hermit-abi 0.1.19", + "hermit-abi", "libc", "winapi", ] @@ -574,20 +561,22 @@ dependencies = [ [[package]] name = "bssh-russh-sftp" -version = "2.1.1" +version = "2.3.0" dependencies = [ "async-trait", "bitflags 2.11.1", "bytes", "chrono", - "flurry", + "dashmap", "futures", + "gloo-timers", "log", "serde", "serde_bytes", "thiserror 2.0.18", "tokio", "tokio-util", + "wasm-bindgen-futures", ] [[package]] @@ -889,26 +878,6 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6ef517f0926dd24a1582492c791b6a4818a4d94e789a334894aa15b0d12f55c" -[[package]] -name = "const-random" -version = "0.1.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" -dependencies = [ - "const-random-macro", -] - -[[package]] -name = "const-random-macro" -version = "0.1.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" -dependencies = [ - "getrandom 0.2.17", - "once_cell", - "tiny-keccak", -] - [[package]] name = "convert_case" version = "0.10.0" @@ -1231,6 +1200,20 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "dashmap" +version = "6.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6361d5c062261c78a176addb82d4c821ae42bed6089de0e12603cd25de2059c" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = 
"data-encoding" version = "2.11.0" @@ -1658,18 +1641,6 @@ dependencies = [ "miniz_oxide", ] -[[package]] -name = "flurry" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf5efcf77a4da27927d3ab0509dec5b0954bb3bc59da5a1de9e52642ebd4cdf9" -dependencies = [ - "ahash", - "num_cpus", - "parking_lot", - "seize", -] - [[package]] name = "fnv" version = "1.0.7" @@ -1886,6 +1857,18 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" +[[package]] +name = "gloo-timers" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "482ce8a491a501da4cd806bd190275363d674f2845005c6ddbd5d3e1dd54495d" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "group" version = "0.13.0" @@ -1927,6 +1910,12 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.5" @@ -1973,12 +1962,6 @@ dependencies = [ "libc", ] -[[package]] -name = "hermit-abi" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" - [[package]] name = "hex" version = "0.4.3" @@ -2467,13 +2450,12 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.95" +version = "0.3.102" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2964e92d1d9dc3364cae4d718d93f227e3abb088e747d92e0395bfdedf1c12ca" +checksum = "03d04c30968dffe80775bd4d7fb676131cd04a1fb46d2686dbffbaec2d9dfd31" dependencies = [ "cfg-if", "futures-util", - "once_cell", "wasm-bindgen", ] @@ -2886,16 +2868,6 @@ dependencies = [ "libm", ] -[[package]] 
-name = "num_cpus" -version = "1.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" -dependencies = [ - "hermit-abi 0.5.2", - "libc", -] - [[package]] name = "num_threads" version = "0.1.7" @@ -4265,12 +4237,6 @@ dependencies = [ "libc", ] -[[package]] -name = "seize" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "689224d06523904ebcc9b482c6a3f4f7fb396096645c4cd10c0d2ff7371a34d3" - [[package]] name = "semver" version = "1.0.28" @@ -4891,15 +4857,6 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" -[[package]] -name = "tiny-keccak" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" -dependencies = [ - "crunchy", -] - [[package]] name = "tinystr" version = "0.8.3" @@ -4989,6 +4946,7 @@ dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", "pin-project-lite", "tokio", ] @@ -5357,9 +5315,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.118" +version = "0.2.125" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bf938a0bacb0469e83c1e148908bd7d5a6010354cf4fb73279b7447422e3a89" +checksum = "8ddb3f79143bced6de84270411622a2699cee572fc0875aeaf1e7867cf9fca1a" dependencies = [ "cfg-if", "once_cell", @@ -5370,9 +5328,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.68" +version = "0.4.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f371d383f2fb139252e0bfac3b81b265689bf45b6874af544ffa4c975ac1ebf8" +checksum = "503b14d284f2c8dac03b819967e155ea753f573586193b2b2c95990cb5d69280" dependencies = [ "js-sys", "wasm-bindgen", @@ -5380,9 +5338,9 @@ dependencies = [ [[package]] name = 
"wasm-bindgen-macro" -version = "0.2.118" +version = "0.2.125" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eeff24f84126c0ec2db7a449f0c2ec963c6a49efe0698c4242929da037ca28ed" +checksum = "4e21a184b13fb19e157296e2c46056aec9092264fab83e4ba59e68c61b323c3d" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -5390,9 +5348,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.118" +version = "0.2.125" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d08065faf983b2b80a79fd87d8254c409281cf7de75fc4b773019824196c904" +checksum = "fecefd9c35bd935a20fc3fc344b5f29138961e4f47fb03297d88f2587afb5ebd" dependencies = [ "bumpalo", "proc-macro2", @@ -5403,9 +5361,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.118" +version = "0.2.125" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd04d9e306f1907bd13c6361b5c6bfc7b3b3c095ed3f8a9246390f8dbdee129" +checksum = "23939e44bb9a5d7576fa2b563dc2e136628f1224e88a8deed09e04858b77871f" dependencies = [ "unicode-ident", ] @@ -5446,9 +5404,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.95" +version = "0.3.102" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f2dfbb17949fa2088e5d39408c48368947b86f7834484e87b73de55bc14d97d" +checksum = "a6430a72df5eb332242960fe84b3002a241163998241eb596d4f739b9757061d" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/Cargo.toml b/Cargo.toml index 9eda90e8..2c445b2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,8 +24,10 @@ tokio = { version = "1.52.1", features = ["full"] } # - Development: uses local path (crates/bssh-russh) # - Publishing: uses crates.io version (path ignored) russh = { package = "bssh-russh", version = "0.60.1", path = "crates/bssh-russh" } -# Use our internal russh-sftp fork with a serde_bytes perf fix -russh-sftp = { package = "bssh-russh-sftp", version = "2.1.1", path = 
"crates/bssh-russh-sftp" } +# Local fork of russh-sftp tracking upstream 2.3.0; only delta is the proposed +# `read_to_writer_pipelined` helper (AspectUnk/russh-sftp#91). When that PR +# merges and lands in a russh-sftp release, drop this fork. +russh-sftp = { package = "bssh-russh-sftp", version = "2.3.0", path = "crates/bssh-russh-sftp" } clap = { version = "4.6.1", features = ["derive", "env"] } anyhow = "1.0.102" thiserror = "2.0.18" diff --git a/crates/bssh-russh-sftp/Cargo.toml b/crates/bssh-russh-sftp/Cargo.toml index 9bd21c6c..3e4379c0 100644 --- a/crates/bssh-russh-sftp/Cargo.toml +++ b/crates/bssh-russh-sftp/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "bssh-russh-sftp" -version = "2.1.1" +version = "2.3.0" authors = ["Jeongkyu Shin "] -description = "Temporary fork of russh-sftp with a serde_bytes performance fix for SFTP Write/Data packets" +description = "Temporary fork of russh-sftp tracking upstream 2.3.0; only delta is the proposed `read_to_writer_pipelined` helper (AspectUnk/russh-sftp#91)" documentation = "https://docs.rs/bssh-russh-sftp" edition = "2021" homepage = "https://github.com/lablup/bssh" @@ -19,18 +19,23 @@ tokio = { version = "1", default-features = false, features = [ "time", "macros", ] } -tokio-util = "0.7" +tokio-util = { version = "0.7.18", default-features = false, features = ["rt"] } +serde = { version = "1.0.228", features = ["derive"] } +serde_bytes = "0.11.19" +bitflags = { version = "2.11.1", features = ["serde"] } +async-trait = { version = "0.1.89", optional = true } + +thiserror = "2.0.18" +chrono = "0.4.44" +bytes = "1.11.1" +log = "0.4.29" +dashmap = "6.1.0" + futures = { version = "0.3", default-features = false, features = ["std", "async-await"] } -serde = { version = "1.0", features = ["derive"] } -serde_bytes = "0.11" -bitflags = { version = "2.9", features = ["serde"] } -async-trait = { version = "0.1", optional = true } -thiserror = "2.0" -chrono = "0.4" -bytes = "1.10" -log = "0.4" -flurry = "0.5" 
+[target.'cfg(target_arch = "wasm32")'.dependencies] +wasm-bindgen-futures = "0.4.71" +gloo-timers = { version = "0.4.0", features = ["futures"] } [features] async-trait = ["dep:async-trait"] diff --git a/crates/bssh-russh-sftp/README.md b/crates/bssh-russh-sftp/README.md index f94ae445..2eb1cbfa 100644 --- a/crates/bssh-russh-sftp/README.md +++ b/crates/bssh-russh-sftp/README.md @@ -1,26 +1,28 @@ # bssh-russh-sftp -Temporary fork of [russh-sftp](https://crates.io/crates/russh-sftp) with a `serde_bytes` performance fix for SFTP `Write` and `Data` packets. +Temporary fork of [russh-sftp](https://crates.io/crates/russh-sftp) tracking upstream `master` (currently 2.3.0). -This crate exists so bssh can ship the packet serialization fix independently while keeping the public crate name usable through Cargo's `package = "bssh-russh-sftp"` dependency alias. +The only delta vs upstream is the proposed `File::read_to_writer_pipelined` helper ([AspectUnk/russh-sftp#91](https://github.com/AspectUnk/russh-sftp/pull/91)) — a pipelined SFTP `READ` wrapper that hides per-request RTT, mirroring how OpenSSH's `sftp` client keeps ~64 outstanding requests by default. The matching write-side optimization is already upstream (see [AspectUnk/russh-sftp#85](https://github.com/AspectUnk/russh-sftp/pull/85)) and `AsyncWrite for File` is now natively pipelined, so no `write_all_pipelined` wrapper is needed here. -## The Problem +This crate exists so bssh can ship the read-side helper today while [#91](https://github.com/AspectUnk/russh-sftp/pull/91) is in review. Once that merges and lands in a `russh-sftp` release, this fork can be deprecated in favor of upstream. -`russh-sftp` 2.1.1 derives serde for `Vec` fields in `SSH_FXP_WRITE` and `SSH_FXP_DATA`. With the crate's custom deserializer, that routes through `deserialize_seq` and reads payload bytes one at a time. Large transfers spend substantial CPU in serde's generic `VecVisitor` path. 
+## History -## The Fix - -The fork annotates the binary payload fields with `#[serde(with = "serde_bytes")]` and implements compatible `serialize_bytes` framing in the SFTP serializer. The wire format remains `u32 length + bytes`, but deserialization uses the existing bulk byte-buffer path. +| Concern | Status | +|---|---| +| `serde_bytes` perf for `WRITE`/`DATA` payloads (~+29%) | ✅ Upstream — [AspectUnk/russh-sftp#83](https://github.com/AspectUnk/russh-sftp/pull/83) (merged 2026-04-30, in 2.3.0) | +| Pipelined `AsyncWrite for File` (dynamic chunk sizes, `write_acks` queue) | ✅ Upstream — [AspectUnk/russh-sftp#85](https://github.com/AspectUnk/russh-sftp/pull/85) (merged 2026-05-01, in 2.3.0) | +| Pipelined `File::read_to_writer_pipelined` for high-RTT reads | 🟡 Under review — [AspectUnk/russh-sftp#91](https://github.com/AspectUnk/russh-sftp/pull/91) | ## Sync with Upstream ```bash cd crates/bssh-russh-sftp -./sync-upstream.sh 2.1.1 +./sync-upstream.sh master # or pin to a specific commit / tag ``` -Local changes are kept as patch files under `patches/`. +Currently no patch files are needed — the read-side helper lives directly in `src/client/fs/file.rs` until #91 merges. ## License -Apache-2.0 (same as russh-sftp) +Apache-2.0 (same as russh-sftp). diff --git a/crates/bssh-russh-sftp/create-patch.sh b/crates/bssh-russh-sftp/create-patch.sh index 9011ae52..492527ee 100755 --- a/crates/bssh-russh-sftp/create-patch.sh +++ b/crates/bssh-russh-sftp/create-patch.sh @@ -1,48 +1,15 @@ #!/bin/bash -# create-patch.sh -# Creates a patch file from the current bssh-russh-sftp changes compared to upstream russh-sftp. +# create-patch.sh — placeholder # -# Usage: ./create-patch.sh - -set -e - -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -BSSH_ROOT="$SCRIPT_DIR/../.." 
-UPSTREAM_DIR="$BSSH_ROOT/references/russh-sftp/src" -CURRENT_DIR="$SCRIPT_DIR/src" -PATCH_DIR="$SCRIPT_DIR/patches" -PATCH_FILE="$PATCH_DIR/sftp-serde-bytes-perf.patch" - -GREEN='\033[0;32m' -YELLOW='\033[1;33m' -NC='\033[0m' - -log_info() { echo -e "${GREEN}[INFO]${NC} $1"; } -log_warn() { echo -e "${YELLOW}[WARN]${NC} $1"; } - -if [ ! -d "$UPSTREAM_DIR" ]; then - echo "Error: Upstream russh-sftp not found at $UPSTREAM_DIR" - echo "Please ensure references/russh-sftp exists with the upstream source." - exit 1 -fi - -mkdir -p "$PATCH_DIR" - -log_info "Creating patch from differences..." - -/usr/bin/diff -urN "$UPSTREAM_DIR" "$CURRENT_DIR" \ - | sed "s|$UPSTREAM_DIR|a/src|g" \ - | sed "s|$CURRENT_DIR|b/src|g" \ - > "$PATCH_FILE" || true - -if [ -s "$PATCH_FILE" ]; then - LINES=$(wc -l < "$PATCH_FILE" | tr -d ' ') - log_info "Patch created: $PATCH_FILE ($LINES lines)" +# As of upstream russh-sftp 2.3.0 (perf serde_bytes #83 + write pipelined #85), +# this fork carries only a single non-upstreamed delta — the +# `read_to_writer_pipelined` helper, proposed in AspectUnk/russh-sftp#91. +# That helper lives directly in src/client/fs/file.rs, so no patch file is +# needed. +# +# When future local-only changes accumulate again, re-introduce the +# diff-extraction logic here (the obsolete sftp-serde-bytes-perf.patch tooling +# is available in git history as a starting point). - echo "" - echo "Patch summary:" - echo "==============" - grep -E "^@@|^\+\+\+|^---" "$PATCH_FILE" | head -20 -else - log_warn "No differences found - patch file is empty" -fi +echo "No patches to create — single in-tree delta lives at src/client/fs/file.rs::read_to_writer_pipelined." 
+exit 0 diff --git a/crates/bssh-russh-sftp/patches/sftp-serde-bytes-perf.patch b/crates/bssh-russh-sftp/patches/sftp-serde-bytes-perf.patch deleted file mode 100644 index d896ae13..00000000 --- a/crates/bssh-russh-sftp/patches/sftp-serde-bytes-perf.patch +++ /dev/null @@ -1,193 +0,0 @@ -diff -urN a/src/lib.rs src/lib.rs ---- a/src/lib.rs 2026-04-21 17:00:59 -+++ b/src/lib.rs 2026-04-21 17:05:30 -@@ -1,3 +1,6 @@ -+// Lints tripped by vendored upstream source that we do not want to diverge from. -+#![allow(clippy::io_other_error)] -+ - //! SFTP subsystem with client and server support for Russh and more! - //! - //! Crate can provide compatibility with anything that can provide the raw data -diff -urN a/src/protocol/data.rs src/protocol/data.rs ---- a/src/protocol/data.rs 2026-04-21 17:00:59 -+++ b/src/protocol/data.rs 2026-04-21 17:00:36 -@@ -4,6 +4,7 @@ - #[derive(Debug, Serialize, Deserialize)] - pub struct Data { - pub id: u32, -+ #[serde(with = "serde_bytes")] - pub data: Vec, - } - -diff -urN a/src/protocol/write.rs src/protocol/write.rs ---- a/src/protocol/write.rs 2026-04-21 17:00:59 -+++ b/src/protocol/write.rs 2026-04-21 17:00:36 -@@ -6,6 +6,7 @@ - pub id: u32, - pub handle: String, - pub offset: u64, -+ #[serde(with = "serde_bytes")] - pub data: Vec, - } - -diff -urN a/src/de.rs src/de.rs ---- a/src/de.rs 2026-04-21 17:00:59 -+++ b/src/de.rs 2026-04-29 03:15:42 -@@ -170,7 +170,7 @@ - where - V: serde::de::Visitor<'de>, - { -- self.deserialize_bytes(visitor) -+ visitor.visit_byte_buf(self.input.try_get_bytes()?) 
- } - - fn deserialize_option(self, _visitor: V) -> Result -diff -urN a/src/protocol/mod.rs src/protocol/mod.rs ---- a/src/protocol/mod.rs 2026-04-21 17:00:59 -+++ b/src/protocol/mod.rs 2026-04-29 03:15:42 -@@ -270,11 +270,112 @@ - Packet::ExtendedReply(reply) => (SSH_FXP_EXTENDED_REPLY, ser::to_bytes(&reply)?), - }; - -- let length = payload.len() as u32 + 1; -+ let length = payload -+ .len() -+ .checked_add(1) -+ .and_then(|len| u32::try_from(len).ok()) -+ .ok_or_else(|| Error::BadMessage("packet length exceeds u32".to_owned()))?; - let mut bytes = BytesMut::new(); - bytes.put_u32(length); - bytes.put_u8(r#type); - bytes.put_slice(&payload); - Ok(bytes.freeze()) -+ } -+} -+ -+#[cfg(test)] -+mod tests { -+ use super::*; -+ -+ #[test] -+ fn write_packet_uses_length_prefixed_bulk_data() { -+ let packet = Packet::Write(Write { -+ id: 7, -+ handle: "h".to_owned(), -+ offset: 9, -+ data: vec![0, 1, 2, 3], -+ }); -+ -+ let encoded = Bytes::try_from(packet).expect("serialize write packet"); -+ assert_eq!( -+ encoded.as_ref(), -+ &[ -+ 0, -+ 0, -+ 0, -+ 26, // packet length -+ SSH_FXP_WRITE, -+ 0, -+ 0, -+ 0, -+ 7, // request id -+ 0, -+ 0, -+ 0, -+ 1, -+ b'h', // handle -+ 0, -+ 0, -+ 0, -+ 0, -+ 0, -+ 0, -+ 0, -+ 9, // offset -+ 0, -+ 0, -+ 0, -+ 4, -+ 0, -+ 1, -+ 2, -+ 3, // data -+ ] -+ ); -+ -+ let mut payload = encoded.slice(4..); -+ let decoded = Packet::try_from(&mut payload).expect("deserialize write packet"); -+ match decoded { -+ Packet::Write(write) => assert_eq!(write.data, [0, 1, 2, 3]), -+ _ => panic!("expected write packet"), -+ } -+ } -+ -+ #[test] -+ fn data_packet_uses_length_prefixed_bulk_data() { -+ let packet = Packet::Data(Data { -+ id: 8, -+ data: vec![4, 5, 6], -+ }); -+ -+ let encoded = Bytes::try_from(packet).expect("serialize data packet"); -+ assert_eq!( -+ encoded.as_ref(), -+ &[ -+ 0, -+ 0, -+ 0, -+ 12, // packet length -+ SSH_FXP_DATA, -+ 0, -+ 0, -+ 0, -+ 8, // request id -+ 0, -+ 0, -+ 0, -+ 3, -+ 4, -+ 5, -+ 6, // data -+ ] -+ ); -+ -+ 
let mut payload = encoded.slice(4..); -+ let decoded = Packet::try_from(&mut payload).expect("deserialize data packet"); -+ match decoded { -+ Packet::Data(data) => assert_eq!(data.data, [4, 5, 6]), -+ _ => panic!("expected data packet"), -+ } - } - } -diff -urN a/src/ser.rs src/ser.rs ---- a/src/ser.rs 2026-04-21 17:00:59 -+++ b/src/ser.rs 2026-04-21 17:00:36 -@@ -103,8 +103,12 @@ - Ok(()) - } - -- fn serialize_bytes(self, _v: &[u8]) -> Result { -- Err(Error::BadMessage("bytes not supported".to_owned())) -+ fn serialize_bytes(self, v: &[u8]) -> Result { -+ let len = u32::try_from(v.len()) -+ .map_err(|_| Error::BadMessage("bytes length exceeds u32".to_owned()))?; -+ self.output.put_u32(len); -+ self.output.put_slice(v); -+ Ok(()) - } - - fn serialize_none(self) -> Result { -diff -urN a/src/utils.rs src/utils.rs ---- a/src/utils.rs 2026-04-21 17:00:59 -+++ b/src/utils.rs 2026-04-21 17:04:11 -@@ -9,9 +9,7 @@ - DateTime::::from(time).timestamp() as u32 - } - --pub async fn read_packet( -- stream: &mut S, --) -> Result { -+pub async fn read_packet(stream: &mut S) -> Result { - let length = stream.read_u32().await?; - - let mut buf = vec![0; length as usize]; diff --git a/crates/bssh-russh-sftp/src/client/error.rs b/crates/bssh-russh-sftp/src/client/error.rs index 09e1c10a..7d32aa5d 100644 --- a/crates/bssh-russh-sftp/src/client/error.rs +++ b/crates/bssh-russh-sftp/src/client/error.rs @@ -2,10 +2,8 @@ use std::io; use thiserror::Error; use tokio::sync::mpsc::error::SendError as MpscSendError; use tokio::sync::oneshot::error::RecvError as OneshotRecvError; -use tokio::time::error::Elapsed as TimeElapsed; -use crate::error; -use crate::protocol::Status; +use crate::{error, protocol::Status}; /// Enum for client errors #[derive(Debug, Clone, Error)] @@ -54,12 +52,6 @@ impl From for Error { } } -impl From for Error { - fn from(_: TimeElapsed) -> Self { - Self::Timeout - } -} - impl From for Error { fn from(error: error::Error) -> Self { 
Self::UnexpectedBehavior(error.to_string()) diff --git a/crates/bssh-russh-sftp/src/client/fs/dir.rs b/crates/bssh-russh-sftp/src/client/fs/dir.rs index 7d71de1c..f61f12d9 100644 --- a/crates/bssh-russh-sftp/src/client/fs/dir.rs +++ b/crates/bssh-russh-sftp/src/client/fs/dir.rs @@ -1,4 +1,4 @@ -use std::collections::VecDeque; +use std::{collections::VecDeque, sync::Arc}; use super::Metadata; use crate::protocol::FileType; @@ -6,6 +6,7 @@ use crate::protocol::FileType; /// Entries returned by the [`ReadDir`] iterator. #[derive(Debug)] pub struct DirEntry { + parent: Arc, file: String, metadata: Metadata, } @@ -25,10 +26,29 @@ impl DirEntry { pub fn metadata(&self) -> Metadata { self.metadata.to_owned() } + + /// Returns the full path of the file that this entry points at. + /// + /// The returned path is built by joining the path originally passed to + /// [`SftpSession::read_dir`](crate::client::SftpSession::read_dir) with + /// [`DirEntry::file_name`] using `/` as the separator (SFTP always uses + /// POSIX-style paths on the wire). No canonicalization is performed, so a + /// relative input yields a relative result — mirroring the behaviour of + /// [`std::fs::DirEntry::path`]. + pub fn path(&self) -> String { + if self.parent.is_empty() { + self.file.clone() + } else if self.parent.ends_with('/') { + format!("{}{}", self.parent, self.file) + } else { + format!("{}/{}", self.parent, self.file) + } + } } /// Iterator over the entries in a remote directory. pub struct ReadDir { + pub(crate) parent: Arc, pub(crate) entries: VecDeque<(String, Metadata)>, } @@ -40,6 +60,7 @@ impl Iterator for ReadDir { None => None, Some(entry) if entry.0 == "." || entry.0 == ".." 
=> self.next(), Some(entry) => Some(DirEntry { + parent: self.parent.clone(), file: entry.0, metadata: entry.1, }), diff --git a/crates/bssh-russh-sftp/src/client/fs/file.rs b/crates/bssh-russh-sftp/src/client/fs/file.rs index 76e3935c..9b7c02d4 100644 --- a/crates/bssh-russh-sftp/src/client/fs/file.rs +++ b/crates/bssh-russh-sftp/src/client/fs/file.rs @@ -1,5 +1,6 @@ use std::{ - future::Future, + collections::VecDeque, + future::{self, Future}, io::{self, SeekFrom}, pin::Pin, sync::Arc, @@ -7,26 +8,28 @@ use std::{ }; use tokio::{ io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}, - runtime::Handle, + sync::oneshot, }; use super::Metadata; use crate::{ - client::{error::Error, rawsession::SftpResult, session::Extensions, RawSftpSession}, - protocol::StatusCode, + client::{error::Error, rawsession::SftpResult, session::Features, RawSftpSession}, + protocol::{Packet, StatusCode}, }; type StateFn = Option> + Send + Sync + 'static>>>; -const MAX_READ_LENGTH: u64 = 261120; -const MAX_WRITE_LENGTH: u64 = 261120; +// read packet overhead: type(1) + id(4) + data_len(4) +const READ_OVERHEAD_LENGTH: u32 = 9; +// write packet overhead excluding handle: type(1) + id(4) + handle_len(4) + offset(8) + data_len(4) +const WRITE_OVERHEAD_LENGTH: u32 = 21; struct FileState { f_read: StateFn>>, f_seek: StateFn, - f_write: StateFn, f_flush: StateFn<()>, f_shutdown: StateFn<()>, + write_acks: VecDeque>>, } /// Provides high-level methods for interaction with a remote file. 
@@ -43,28 +46,24 @@ pub struct File { state: FileState, pos: u64, closed: bool, - extensions: Arc, + features: Features, } impl File { - pub(crate) fn new( - session: Arc, - handle: String, - extensions: Arc, - ) -> Self { + pub(crate) fn new(session: Arc, handle: String, features: Features) -> Self { Self { session, handle, state: FileState { f_read: None, f_seek: None, - f_write: None, f_flush: None, f_shutdown: None, + write_acks: VecDeque::with_capacity(features.max_concurrent_writes), }, pos: 0, closed: false, - extensions, + features, } } @@ -86,105 +85,32 @@ impl File { /// If the server does not support `fsync@openssh.com` sending the request will /// be omitted, but will still pseudo-successfully pub async fn sync_all(&self) -> SftpResult<()> { - if !self.extensions.fsync { + if !self.features.fsync { return Ok(()); } self.session.fsync(self.handle.as_str()).await.map(|_| ()) } - /// Streams `reader` to this remote file with up to `max_inflight` concurrent - /// SFTP `WRITE` requests in flight. Each request carries up to the negotiated - /// `write_len` (or [`MAX_WRITE_LENGTH`] when no limit is advertised). + /// Streams the remote file from the current position to `writer` using up + /// to `max_inflight` concurrent SFTP `READ` requests, hiding per-request + /// RTT. Each request asks for up to the negotiated `read_len` + /// (or `max_packet_len - READ_OVERHEAD_LENGTH` when no limit is advertised). /// - /// The high-level [`AsyncWrite`] impl issues one `WRITE` at a time and waits - /// for its `STATUS` reply before sending the next, so sustained throughput is - /// bounded by `chunk_size / RTT`. This helper hides the per-request RTT by - /// keeping multiple in-flight, mirroring how OpenSSH's `sftp` client behaves - /// (~64 outstanding requests by default). + /// The [`AsyncRead`] impl issues one `READ` at a time and waits for the + /// reply before sending the next, so sustained throughput is bounded by + /// `chunk_size / RTT`. 
This helper mirrors how OpenSSH's `sftp` client + /// keeps ~64 outstanding requests by default, so on a long-RTT link + /// (e.g. transcontinental SSH) it can saturate the channel. /// - /// On success returns the number of bytes streamed. Updates `self.pos` to - /// the new write offset. Reading from `reader` and dispatching writes are - /// interleaved, so memory usage is bounded by `max_inflight * chunk_size`. - pub async fn write_all_pipelined( - &mut self, - reader: &mut R, - max_inflight: usize, - ) -> SftpResult - where - R: tokio::io::AsyncRead + Unpin, - { - use futures::stream::{FuturesUnordered, StreamExt}; - use tokio::io::AsyncReadExt; - - if max_inflight == 0 { - return Err(Error::UnexpectedBehavior( - "max_inflight must be at least 1".to_owned(), - )); - } - - let chunk_size = self - .extensions - .limits - .as_ref() - .and_then(|l| l.write_len) - .map(|n| n as usize) - .unwrap_or(MAX_WRITE_LENGTH as usize); - - let mut total: u64 = 0; - let mut offset = self.pos; - let mut in_flight = FuturesUnordered::new(); - let mut eof = false; - - loop { - // Top up the pipeline with new chunks until we hit the cap or EOF. - while !eof && in_flight.len() < max_inflight { - let mut buf = vec![0u8; chunk_size]; - let n = reader.read(&mut buf).await.map_err(io::Error::from)?; - if n == 0 { - eof = true; - break; - } - buf.truncate(n); - - let session = self.session.clone(); - let handle = self.handle.clone(); - let off = offset; - - in_flight.push(async move { - session.write(handle, off, buf).await?; - SftpResult::Ok(n as u64) - }); - - offset += n as u64; - total += n as u64; - } - - // Drain at least one in-flight write before reading more, otherwise - // we busy-loop the read path while writes never get a chance to make - // progress. 
- match in_flight.next().await { - Some(Ok(_)) => {} - Some(Err(e)) => return Err(e), - None => break, // pipeline drained and no more data → done - } - } - - self.pos = offset; - Ok(total) - } - - /// Streams the remote file from the current position to `writer` using up to - /// `max_inflight` concurrent SFTP `READ` requests. Each request asks for up - /// to the negotiated `read_len` (or [`MAX_READ_LENGTH`] when no limit is - /// advertised). - /// - /// Like [`Self::write_all_pipelined`], this hides per-request RTT. Chunks - /// are reassembled in offset order before being written to `writer`, so the - /// output is identical to a sequential read. Stops on the first server - /// short read (server signalled EOF). + /// Chunks are reassembled in offset order before being written to `writer`, + /// so the output is byte-identical to a sequential read. Stops cleanly on + /// the first server-signalled EOF (either an `Eof` status or a short + /// read). /// - /// Returns the number of bytes streamed. Updates `self.pos`. + /// Returns the number of bytes streamed. Updates `self.pos` to the new + /// read offset. Memory usage is bounded by `max_inflight * chunk_size` + /// (chunks held in an in-order reassembly buffer plus in-flight requests). 
pub async fn read_to_writer_pipelined( &mut self, writer: &mut W, @@ -204,12 +130,14 @@ impl File { } let chunk_size = self - .extensions + .features .limits - .as_ref() .and_then(|l| l.read_len) - .map(|n| n as usize) - .unwrap_or(MAX_READ_LENGTH as usize); + .unwrap_or_else(|| { + self.features + .max_packet_len + .saturating_sub(READ_OVERHEAD_LENGTH) as u64 + }) as usize; let mut total: u64 = 0; let mut next_offset = self.pos; @@ -228,9 +156,9 @@ impl File { in_flight.push(async move { match session.read(handle, off, len).await { - Ok(data) => SftpResult::Ok((off, Some(data.data))), + Ok(data) => Ok::<(u64, Option>), Error>((off, Some(data.data))), Err(Error::Status(s)) if s.status_code == StatusCode::Eof => { - SftpResult::Ok((off, None)) + Ok((off, None)) } Err(e) => Err(e), } @@ -241,6 +169,7 @@ impl File { match in_flight.next().await { Some(Ok((off, Some(data)))) => { + let data: Vec = data; if data.is_empty() { eof = true; } else { @@ -254,7 +183,7 @@ impl File { None => break, } - // Flush in-order chunks to writer as they become available. + // Flush in-order chunks to `writer` as they become available. 
while let Some(chunk) = pending.remove(&next_to_write) { let n = chunk.len() as u64; writer.write_all(&chunk).await.map_err(io::Error::from)?; @@ -268,20 +197,52 @@ impl File { } } +fn check_write_result( + result: Result, oneshot::error::RecvError>, +) -> io::Result<()> { + match result { + Err(_) => Err(io::Error::new( + io::ErrorKind::BrokenPipe, + "write channel closed", + )), + Ok(Ok(Packet::Status(s))) if s.status_code == StatusCode::Ok => Ok(()), + Ok(Ok(Packet::Status(s))) => Err(io::Error::other(s.error_message)), + Ok(Ok(_)) => Err(io::Error::other("unexpected response packet")), + Ok(Err(e)) => Err(io::Error::other(e.to_string())), + } +} + +fn poll_oldest_write( + pending: &mut VecDeque>>, + cx: &mut Context<'_>, +) -> Option>> { + let rx = pending.front_mut()?; + Some(match Pin::new(rx).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(r) => { + pending.pop_front(); + Poll::Ready(check_write_result(r)) + } + }) +} + +fn poll_drain_writes( + pending: &mut VecDeque>>, + cx: &mut Context<'_>, +) -> Poll> { + while let Some(poll) = poll_oldest_write(pending, cx) { + ready!(poll)?; + } + Poll::Ready(Ok(())) +} + impl Drop for File { fn drop(&mut self) { if self.closed { return; } - if let Ok(handle) = Handle::try_current() { - let session = self.session.clone(); - let file_handle = self.handle.clone(); - - handle.spawn(async move { - let _ = session.close(file_handle).await; - }); - } + let _ = self.session.close_nowait(std::mem::take(&mut self.handle)); } } @@ -296,30 +257,28 @@ impl AsyncRead for File { None => { let session = self.session.clone(); let max_read_len = self - .extensions + .features .limits - .as_ref() .and_then(|l| l.read_len) - .unwrap_or(MAX_READ_LENGTH) as usize; + .unwrap_or_else(|| { + self.features + .max_packet_len + .saturating_sub(READ_OVERHEAD_LENGTH) as u64 + }) as usize; let file_handle = self.handle.clone(); let offset = self.pos; - let len = if buf.remaining() > max_read_len { - max_read_len - } else { - 
buf.remaining() - }; + let len = usize::min(buf.remaining(), max_read_len); self.state.f_read.get_or_insert(Box::pin(async move { let result = session.read(file_handle, offset, len as u32).await; - match result { Ok(data) => Ok(Some(data.data)), Err(Error::Status(status)) if status.status_code == StatusCode::Eof => { Ok(None) } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())), + Err(e) => Err(io::Error::other(e.to_string())), } })) } @@ -345,51 +304,49 @@ impl AsyncRead for File { impl AsyncSeek for File { fn start_seek(mut self: Pin<&mut Self>, position: io::SeekFrom) -> io::Result<()> { - match self.state.f_seek { - Some(_) => Err(io::Error::new( - io::ErrorKind::Other, + if self.state.f_seek.is_some() { + return Err(io::Error::other( "other file operation is pending, call poll_complete before start_seek", - )), - None => { + )); + } + + self.state.f_seek = Some(match position { + SeekFrom::Start(pos) => Box::pin(future::ready(Ok(pos))), + SeekFrom::Current(pos) => { + let new_pos = self.pos as i64 + pos; + if new_pos < 0 { + return Err(io::Error::other( + "cannot move file pointer before the beginning", + )); + } + Box::pin(future::ready(Ok(new_pos as u64))) + } + SeekFrom::End(pos) => { let session = self.session.clone(); let file_handle = self.handle.clone(); - let cur_pos = self.pos as i64; - - self.state.f_seek = Some(Box::pin(async move { - let new_pos = match position { - SeekFrom::Start(pos) => pos as i64, - SeekFrom::Current(pos) => cur_pos + pos, - SeekFrom::End(pos) => { - let result = session - .fstat(file_handle) - .await - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - - match result.attrs.size { - Some(size) => size as i64 + pos, - None => { - return Err(io::Error::new( - io::ErrorKind::Other, - "file size unknown", - )) - } + + Box::pin(async move { + let result = session + .fstat(file_handle) + .await + .map_err(|e| io::Error::other(e.to_string()))?; + match result.attrs.size { + Some(size) => { + let 
new_pos = size as i64 + pos; + if new_pos < 0 { + return Err(io::Error::other( + "cannot move file pointer before the beginning", + )); } + Ok(new_pos as u64) } - }; - - if new_pos < 0 { - return Err(io::Error::new( - io::ErrorKind::Other, - "cannot move file pointer before the beginning", - )); + None => Err(io::Error::other("file size unknown")), } - - Ok(new_pos as u64) - })); - - Ok(()) + }) } - } + }); + + Ok(()) } fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -410,51 +367,40 @@ impl AsyncWrite for File { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - let poll = Pin::new(match self.state.f_write.as_mut() { - Some(f) => f, - None => { - let session = self.session.clone(); - let max_write_len = self - .extensions - .limits - .as_ref() - .and_then(|l| l.write_len) - .unwrap_or(MAX_WRITE_LENGTH) as usize; - - let file_handle = self.handle.clone(); - let data = buf.to_vec(); - - let offset = self.pos; - let len = if data.len() > max_write_len { - max_write_len - } else { - data.len() - }; - - self.state.f_write.get_or_insert(Box::pin(async move { - session - .write(file_handle, offset, data[..len].to_vec()) - .await - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - Ok(len) - })) + if self.state.write_acks.len() >= self.features.max_concurrent_writes { + if let Some(poll) = poll_oldest_write(&mut self.state.write_acks, cx) { + ready!(poll)?; } - }) - .poll(cx); - - if poll.is_ready() { - self.state.f_write = None; } - if let Poll::Ready(Ok(len)) = poll { - self.pos += len as u64; + let max_write_len = self + .features + .limits + .and_then(|l| l.write_len) + .unwrap_or_else(|| { + let overhead = WRITE_OVERHEAD_LENGTH + self.handle.len() as u32; + self.features.max_packet_len.saturating_sub(overhead) as u64 + }) as usize; + + let len = usize::min(buf.len(), max_write_len); + let data = buf[..len].to_vec(); + let handle = self.handle.clone(); + let offset = self.pos; + + match self.session.write_nowait(handle, 
offset, data) { + Ok(rx) => { + self.pos += len as u64; + self.state.write_acks.push_back(rx); + Poll::Ready(Ok(len)) + } + Err(e) => Poll::Ready(Err(io::Error::other(e.to_string()))), } - - poll } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if !self.extensions.fsync { + ready!(poll_drain_writes(&mut self.state.write_acks, cx))?; + + if !self.features.fsync { return Poll::Ready(Ok(())); } @@ -469,7 +415,7 @@ impl AsyncWrite for File { .fsync(file_handle) .await .map(|_| ()) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) + .map_err(|e| io::Error::other(e.to_string())) })) } }) @@ -486,6 +432,8 @@ impl AsyncWrite for File { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { + ready!(poll_drain_writes(&mut self.state.write_acks, cx))?; + let poll = Pin::new(match self.state.f_shutdown.as_mut() { Some(f) => f, None => { @@ -496,7 +444,7 @@ impl AsyncWrite for File { session .close(file_handle) .await - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + .map_err(|e| io::Error::other(e.to_string()))?; Ok(()) })) } diff --git a/crates/bssh-russh-sftp/src/client/mod.rs b/crates/bssh-russh-sftp/src/client/mod.rs index 6b4421a3..06cc8ca1 100644 --- a/crates/bssh-russh-sftp/src/client/mod.rs +++ b/crates/bssh-russh-sftp/src/client/mod.rs @@ -2,6 +2,7 @@ pub mod error; pub mod fs; mod handler; pub mod rawsession; +pub(crate) mod runtime; mod session; pub use handler::Handler; @@ -27,6 +28,26 @@ macro_rules! into_wrap { }; } +#[derive(Clone, Debug)] +pub struct Config { + /// Maximum size of a single packet in bytes. Default: 256 KiB. + pub max_packet_len: u32, + /// Maximum number of concurrent in-flight write requests. Default: 8. + pub max_concurrent_writes: usize, + /// Timeout in seconds for each request. Default: 10. 
+ pub request_timeout_secs: u64, +} + +impl Default for Config { + fn default() -> Self { + Self { + max_packet_len: 262144, + max_concurrent_writes: 8, + request_timeout_secs: 10, + } + } +} + async fn execute_handler(bytes: &mut Bytes, handler: &mut H) -> Result<(), error::Error> where H: Handler + Send, @@ -50,7 +71,7 @@ where S: AsyncRead + Unpin, H: Handler + Send, { - let mut bytes = read_packet(stream).await?; + let mut bytes = read_packet(stream, u32::MAX).await?; Ok(execute_handler(&mut bytes, handler).await?) } @@ -67,7 +88,7 @@ where let rc = CancellationToken::new(); let wc = rc.clone(); { - tokio::spawn(async move { + runtime::spawn(async move { loop { select! { result = process_handler(&mut rd, &mut handler) => { @@ -86,7 +107,7 @@ where }); } - tokio::spawn(async move { + runtime::spawn(async move { loop { select! { Some(data) = rx.recv() => { diff --git a/crates/bssh-russh-sftp/src/client/rawsession.rs b/crates/bssh-russh-sftp/src/client/rawsession.rs index 7457f76a..1ea5a40a 100644 --- a/crates/bssh-russh-sftp/src/client/rawsession.rs +++ b/crates/bssh-russh-sftp/src/client/rawsession.rs @@ -1,5 +1,5 @@ use bytes::Bytes; -use flurry::HashMap; +use dashmap::DashMap as HashMap; use std::{ sync::{ atomic::{AtomicU32, AtomicU64, Ordering}, @@ -9,12 +9,12 @@ use std::{ }; use tokio::{ io::{AsyncRead, AsyncWrite}, - sync::{mpsc, RwLock}, - time, + sync::{mpsc, oneshot}, }; -use super::{error::Error, run, Handler}; +use super::{error::Error, runtime, Handler}; use crate::{ + client::{run, Config}, de, extensions::{ self, FsyncExtension, HardlinkExtension, LimitsExtension, Statvfs, StatvfsExtension, @@ -27,7 +27,7 @@ use crate::{ }; pub type SftpResult = Result; -type SharedRequests = HashMap, mpsc::Sender>>; +type SharedRequests = HashMap, oneshot::Sender>>; pub(crate) struct SessionInner { version: Option, @@ -35,8 +35,8 @@ pub(crate) struct SessionInner { } impl SessionInner { - pub async fn reply(&mut self, id: Option, packet: Packet) -> SftpResult<()> 
{ - if let Some(sender) = self.requests.pin().remove(&id) { + pub fn reply(&mut self, id: Option, packet: Packet) -> SftpResult<()> { + if let Some((_, sender)) = self.requests.remove(&id) { let validate = if id.is_some() && self.version.is_none() { Err(Error::UnexpectedPacket) } else if id.is_none() && self.version.is_some() { @@ -45,9 +45,8 @@ impl SessionInner { Ok(()) }; - sender - .try_send(validate.clone().map(|_| packet)) - .map_err(|e| Error::UnexpectedBehavior(e.to_string()))?; + // Ignore send error: receiver was dropped (request timed out). + let _ = sender.send(validate.clone().map(|_| packet)); return validate; } @@ -59,46 +58,44 @@ impl SessionInner { } } -#[cfg_attr(feature = "async-trait", async_trait::async_trait)] impl Handler for SessionInner { type Error = Error; async fn version(&mut self, packet: Version) -> Result<(), Self::Error> { let version = packet.version; - self.reply(None, packet.into()).await?; + self.reply(None, packet.into())?; self.version = Some(version); Ok(()) } async fn name(&mut self, name: Name) -> Result<(), Self::Error> { - self.reply(Some(name.id), name.into()).await + self.reply(Some(name.id), name.into()) } async fn status(&mut self, status: Status) -> Result<(), Self::Error> { - self.reply(Some(status.id), status.into()).await + self.reply(Some(status.id), status.into()) } async fn handle(&mut self, handle: Handle) -> Result<(), Self::Error> { - self.reply(Some(handle.id), handle.into()).await + self.reply(Some(handle.id), handle.into()) } async fn data(&mut self, data: Data) -> Result<(), Self::Error> { - self.reply(Some(data.id), data.into()).await + self.reply(Some(data.id), data.into()) } async fn attrs(&mut self, attrs: Attrs) -> Result<(), Self::Error> { - self.reply(Some(attrs.id), attrs.into()).await + self.reply(Some(attrs.id), attrs.into()) } async fn extended_reply(&mut self, reply: ExtendedReply) -> Result<(), Self::Error> { - self.reply(Some(reply.id), reply.into()).await + self.reply(Some(reply.id), 
reply.into()) } } #[derive(Debug, Clone, Copy, Default)] pub struct Limits { - // todo: implement - //pub packet_len: Option, + pub packet_len: Option, pub read_len: Option, pub write_len: Option, pub open_handles: Option, @@ -107,30 +104,14 @@ pub struct Limits { impl From for Limits { fn from(limits: LimitsExtension) -> Self { Self { - read_len: if limits.max_read_len > 0 { - Some(limits.max_read_len) - } else { - None - }, - write_len: if limits.max_write_len > 0 { - Some(limits.max_write_len) - } else { - None - }, - open_handles: if limits.max_open_handles > 0 { - Some(limits.max_open_handles) - } else { - None - }, + packet_len: (limits.max_packet_len > 0).then_some(limits.max_packet_len), + read_len: (limits.max_read_len > 0).then_some(limits.max_read_len), + write_len: (limits.max_write_len > 0).then_some(limits.max_write_len), + open_handles: (limits.max_open_handles > 0).then_some(limits.max_open_handles), } } } -pub(crate) struct Options { - timeout: RwLock, - limits: Arc, -} - /// Implements raw work with the protocol in request-response format. /// If the server returns a `Status` packet and it has the code Ok /// then the packet is returned as Ok in other error cases @@ -140,7 +121,8 @@ pub struct RawSftpSession { requests: Arc, next_req_id: AtomicU32, handles: AtomicU64, - options: Options, + timeout: AtomicU64, + limits: Limits, } macro_rules! into_with_status { @@ -165,6 +147,13 @@ macro_rules! 
into_status { impl RawSftpSession { pub fn new(stream: S) -> Self + where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, + { + Self::new_with_config(stream, Config::default()) + } + + pub fn new_with_config(stream: S, cfg: Config) -> Self where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, { @@ -179,51 +168,62 @@ impl RawSftpSession { requests: req_map, next_req_id: AtomicU32::new(1), handles: AtomicU64::new(0), - options: Options { - timeout: RwLock::new(10), - limits: Arc::new(Limits::default()), - }, + timeout: AtomicU64::new(cfg.request_timeout_secs), + limits: Limits::default(), } } /// Set the maximum response time in seconds. /// Default: 10 seconds - pub async fn set_timeout(&self, secs: u64) { - *self.options.timeout.write().await = secs; + pub fn set_timeout(&self, secs: u64) { + self.timeout.store(secs, Ordering::Relaxed); } /// Setting limits. For the `limits@openssh.com` extension - pub fn set_limits(&mut self, limits: Arc) { - self.options.limits = limits; + pub fn set_limits(&mut self, limits: Limits) { + self.limits = limits; } - async fn send(&self, id: Option, packet: Packet) -> SftpResult { + fn send( + &self, + id: Option, + packet: Packet, + ) -> SftpResult>> { if self.tx.is_closed() { return Err(Error::UnexpectedBehavior("session closed".into())); } - let (tx, mut rx) = mpsc::channel(1); + let bytes = Bytes::try_from(packet)?; - self.requests.pin().insert(id, tx); - self.tx.send(Bytes::try_from(packet)?)?; + if let Some(max_len) = self.limits.packet_len { + if bytes.len() as u64 > max_len { + return Err(Error::Limited("packet exceeds server limit".to_owned())); + } + } - let timeout = *self.options.timeout.read().await; + let (tx, rx) = oneshot::channel(); + self.requests.insert(id, tx); + self.tx.send(bytes)?; - match time::timeout(Duration::from_secs(timeout), rx.recv()).await { - Ok(Some(result)) => result, - Ok(None) => { - self.requests.pin().remove(&id); - Err(Error::UnexpectedBehavior("recv none message".into())) - } + 
Ok(rx) + } + + async fn request(&self, id: Option, packet: Packet) -> SftpResult { + let rx = self.send(id, packet)?; + let timeout = self.timeout.load(Ordering::Relaxed); + + match runtime::timeout(Duration::from_secs(timeout), rx).await { + Ok(Ok(result)) => result, + Ok(Err(_)) => Err(Error::UnexpectedBehavior("sender dropped".into())), Err(error) => { - self.requests.pin().remove(&id); - Err(error.into()) + self.requests.remove(&id); + Err(error) } } } fn use_next_id(&self) -> u32 { - self.next_req_id.fetch_add(1, Ordering::SeqCst) + self.next_req_id.fetch_add(1, Ordering::Relaxed) } /// Closes the inner channel stream. Called by [`Drop`] @@ -236,7 +236,7 @@ impl RawSftpSession { } pub async fn init(&self) -> SftpResult { - let result = self.send(None, Init::default().into()).await?; + let result = self.request(None, Init::default().into()).await?; if let Packet::Version(version) = result { Ok(version) } else { @@ -251,7 +251,6 @@ impl RawSftpSession { attrs: FileAttributes, ) -> SftpResult { if self - .options .limits .open_handles .is_some_and(|h| self.handles.load(Ordering::SeqCst) >= h) @@ -261,7 +260,7 @@ impl RawSftpSession { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Open { id, @@ -283,7 +282,7 @@ impl RawSftpSession { pub async fn close>(&self, handle: H) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Close { id, @@ -313,19 +312,28 @@ impl RawSftpSession { into_status!(result) } + /// Sends a close packet without awaiting the server's acknowledgement. 
+ pub(crate) fn close_nowait( + &self, + handle: String, + ) -> SftpResult>> { + let id = self.use_next_id(); + self.send(Some(id), Close { id, handle }.into()) + } + pub async fn read>( &self, handle: H, offset: u64, len: u32, ) -> SftpResult { - if self.options.limits.read_len.is_some_and(|r| len as u64 > r) { + if self.limits.read_len.is_some_and(|r| len as u64 > r) { return Err(Error::Limited("read limit reached".to_owned())); } let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Read { id, @@ -346,18 +354,13 @@ impl RawSftpSession { offset: u64, data: Vec, ) -> SftpResult { - if self - .options - .limits - .write_len - .is_some_and(|w| data.len() as u64 > w) - { + if self.limits.write_len.is_some_and(|w| data.len() as u64 > w) { return Err(Error::Limited("write limit reached".to_owned())); } let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Write { id, @@ -372,10 +375,34 @@ impl RawSftpSession { into_status!(result) } + /// Sends a write packet without awaiting the server's acknowledgement. 
+ pub(crate) fn write_nowait( + &self, + handle: String, + offset: u64, + data: Vec, + ) -> SftpResult>> { + if self.limits.write_len.is_some_and(|w| data.len() as u64 > w) { + return Err(Error::Limited("write limit reached".to_owned())); + } + + let id = self.use_next_id(); + self.send( + Some(id), + Write { + id, + handle, + offset, + data, + } + .into(), + ) + } + pub async fn lstat>(&self, path: P) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Lstat { id, @@ -391,7 +418,7 @@ impl RawSftpSession { pub async fn fstat>(&self, handle: H) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Fstat { id, @@ -411,7 +438,7 @@ impl RawSftpSession { ) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), SetStat { id, @@ -432,7 +459,7 @@ impl RawSftpSession { ) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), FSetStat { id, @@ -448,7 +475,6 @@ impl RawSftpSession { pub async fn opendir>(&self, path: P) -> SftpResult { if self - .options .limits .open_handles .is_some_and(|h| self.handles.load(Ordering::SeqCst) >= h) @@ -458,7 +484,7 @@ impl RawSftpSession { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), OpenDir { id, @@ -478,7 +504,7 @@ impl RawSftpSession { pub async fn readdir>(&self, handle: H) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), ReadDir { id, @@ -494,7 +520,7 @@ impl RawSftpSession { pub async fn remove>(&self, filename: T) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Remove { id, @@ -514,7 +540,7 @@ impl RawSftpSession { ) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), MkDir { id, @@ -531,7 +557,7 @@ impl RawSftpSession { pub async fn rmdir>(&self, path: P) -> SftpResult { let id = self.use_next_id(); let result = 
self - .send( + .request( Some(id), RmDir { id, @@ -547,7 +573,7 @@ impl RawSftpSession { pub async fn realpath>(&self, path: P) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), RealPath { id, @@ -563,7 +589,7 @@ impl RawSftpSession { pub async fn stat>(&self, path: P) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Stat { id, @@ -583,7 +609,7 @@ impl RawSftpSession { { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Rename { id, @@ -600,7 +626,7 @@ impl RawSftpSession { pub async fn readlink>(&self, path: P) -> SftpResult { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), ReadLink { id, @@ -620,7 +646,7 @@ impl RawSftpSession { { let id = self.use_next_id(); let result = self - .send( + .request( Some(id), Symlink { id, @@ -638,7 +664,7 @@ impl RawSftpSession { /// The extension can return any packet, so it's not specific pub async fn extended>(&self, request: R, data: Vec) -> SftpResult { let id = self.use_next_id(); - self.send( + self.request( Some(id), Extended { id, diff --git a/crates/bssh-russh-sftp/src/client/runtime.rs b/crates/bssh-russh-sftp/src/client/runtime.rs new file mode 100644 index 00000000..6a18c356 --- /dev/null +++ b/crates/bssh-russh-sftp/src/client/runtime.rs @@ -0,0 +1,102 @@ +//! Runtime abstraction over tokio (native) and wasm-bindgen-futures / gloo-timers (wasm32). 
+ +use std::{ + fmt, + future::Future, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use super::error::Error; + +#[derive(Debug)] +pub struct JoinError; + +impl fmt::Display for JoinError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("JoinError") + } +} + +impl std::error::Error for JoinError {} + +pub struct JoinHandle { + rx: tokio::sync::oneshot::Receiver, +} + +impl Future for JoinHandle { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.rx).poll(cx) { + Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)), + Poll::Ready(Err(_)) => Poll::Ready(Err(JoinError)), + Poll::Pending => Poll::Pending, + } + } +} + +#[cfg(not(target_arch = "wasm32"))] +macro_rules! spawn_impl { + ($fut:expr) => {{ + tokio::spawn($fut); + }}; +} + +#[cfg(target_arch = "wasm32")] +macro_rules! spawn_impl { + ($fut:expr) => { + wasm_bindgen_futures::spawn_local($fut) + }; +} + +pub fn spawn(future: F) -> JoinHandle +where + F: Future + Send + 'static, + T: Send + 'static, +{ + let (tx, rx) = tokio::sync::oneshot::channel(); + spawn_impl!(async move { + let _ = tx.send(future.await); + }); + JoinHandle { rx } +} + +#[cfg(not(target_arch = "wasm32"))] +pub async fn timeout(duration: Duration, future: F) -> Result { + tokio::time::timeout(duration, future) + .await + .map_err(|_| Error::Timeout) +} + +// wasm32-unknown-unknown is single-threaded, so Send is trivially safe +#[cfg(target_arch = "wasm32")] +struct SendWrapper(F); + +#[cfg(target_arch = "wasm32")] +unsafe impl Send for SendWrapper {} + +#[cfg(target_arch = "wasm32")] +unsafe impl Sync for SendWrapper {} + +#[cfg(target_arch = "wasm32")] +impl Future for SendWrapper { + type Output = F::Output; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + unsafe { self.map_unchecked_mut(|w| &mut w.0) }.poll(cx) + } +} + +#[cfg(target_arch = "wasm32")] +pub async fn timeout(duration: Duration, future: F) -> Result { + let 
timer = SendWrapper(gloo_timers::future::TimeoutFuture::new( + duration.as_millis() as u32, + )); + tokio::pin!(future); + tokio::pin!(timer); + tokio::select! { + v = &mut future => Ok(v), + _ = &mut timer => Err(Error::Timeout), + } +} diff --git a/crates/bssh-russh-sftp/src/client/session.rs b/crates/bssh-russh-sftp/src/client/session.rs index 6d195b08..d393d6ec 100644 --- a/crates/bssh-russh-sftp/src/client/session.rs +++ b/crates/bssh-russh-sftp/src/client/session.rs @@ -8,23 +8,26 @@ use super::{ RawSftpSession, }; use crate::{ + client::Config, extensions::{self, Statvfs}, protocol::{FileAttributes, OpenFlags, StatusCode}, }; -#[derive(Debug, Default)] -pub(crate) struct Extensions { +#[derive(Debug, Clone, Copy)] +pub(crate) struct Features { pub hardlink: bool, pub fsync: bool, pub statvfs: bool, - pub limits: Option>, + pub limits: Option, + pub max_concurrent_writes: usize, + pub max_packet_len: u32, } /// High-level SFTP implementation for easy interaction with a remote file system. 
/// Contains most methods similar to the native [filesystem](std::fs) pub struct SftpSession { session: Arc, - extensions: Arc, + features: Features, } impl SftpSession { @@ -33,60 +36,49 @@ impl SftpSession { where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - Self::new_opts(stream, None).await + Self::new_with_config(stream, Config::default()).await } - /// Creates a new session with timeout opt before the first request - pub async fn new_opts(stream: S, timeout: Option) -> SftpResult + /// Creates a new session with custom configuration + pub async fn new_with_config(stream: S, cfg: Config) -> SftpResult where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - let mut session = RawSftpSession::new(stream); - - // todo: for new options we need builder - if let Some(timeout) = timeout { - session.set_timeout(timeout).await; - } + let max_concurrent_writes = cfg.max_concurrent_writes; + let max_packet_len = cfg.max_packet_len; + let mut session = RawSftpSession::new_with_config(stream, cfg); let version = session.init().await?; - let mut extensions = Extensions { - hardlink: version - .extensions - .get(extensions::HARDLINK) - .is_some_and(|e| e == "1"), - fsync: version - .extensions - .get(extensions::FSYNC) - .is_some_and(|e| e == "1"), - statvfs: version - .extensions - .get(extensions::STATVFS) - .is_some_and(|e| e == "2"), + let has_extension = |name, ver| version.extensions.get(name).is_some_and(|v| v == ver); + + let mut features = Features { + hardlink: has_extension(extensions::HARDLINK, "1"), + fsync: has_extension(extensions::FSYNC, "1"), + statvfs: has_extension(extensions::STATVFS, "2"), limits: None, + max_concurrent_writes, + max_packet_len, }; - if version - .extensions - .get(extensions::LIMITS) - .is_some_and(|e| e == "1") - { - let limits = session.limits().await?; - let limits = Arc::new(Limits::from(limits)); - - session.set_limits(limits.clone()); - extensions.limits = Some(limits); + if has_extension(extensions::LIMITS, "1") { 
+ let limits = Limits::from(session.limits().await?); + session.set_limits(limits); + features.limits = Some(limits); + if let Some(plen) = limits.packet_len { + features.max_packet_len = (plen as u32).min(max_packet_len); + } } Ok(Self { session: Arc::new(session), - extensions: Arc::new(extensions), + features, }) } /// Set the maximum response time in seconds. /// Default: 10 seconds - pub async fn set_timeout(&self, secs: u64) { - self.session.set_timeout(secs).await; + pub fn set_timeout(&self, secs: u64) { + self.session.set_timeout(secs); } /// Closes the inner channel stream. @@ -128,11 +120,7 @@ impl SftpSession { attributes: FileAttributes, ) -> SftpResult { let handle = self.session.open(filename, flags, attributes).await?.handle; - Ok(File::new( - self.session.clone(), - handle, - self.extensions.clone(), - )) + Ok(File::new(self.session.clone(), handle, self.features)) } /// Requests the remote party for the absolute from the relative path. @@ -180,8 +168,11 @@ impl SftpSession { /// Returns an iterator over the entries within a directory. pub async fn read_dir>(&self, path: P) -> SftpResult { - let mut files = vec![]; + let path: String = path.into(); + let parent = Arc::from(path.as_str()); + let handle = self.session.opendir(path).await?.handle; + let mut files = vec![]; loop { match self.session.readdir(handle.as_str()).await { @@ -190,7 +181,7 @@ impl SftpSession { .files .into_iter() .map(|f| (f.filename, f.attrs)) - .chain(files.into_iter()) + .chain(files) .collect(); } Err(Error::Status(status)) if status.status_code == StatusCode::Eof => break, @@ -201,6 +192,7 @@ impl SftpSession { self.session.close(handle).await?; Ok(ReadDir { + parent, entries: files.into(), }) } @@ -265,7 +257,7 @@ impl SftpSession { O: Into, N: Into, { - if !self.extensions.hardlink { + if !self.features.hardlink { return Ok(false); } @@ -275,7 +267,7 @@ impl SftpSession { /// Performs a statvfs on the remote file system path. 
/// Returns [`Ok(None)`] if the remote SFTP server does not support `statvfs@openssh.com` extension v2. pub async fn fs_info>(&self, path: P) -> SftpResult> { - if !self.extensions.statvfs { + if !self.features.statvfs { return Ok(None); } diff --git a/crates/bssh-russh-sftp/src/de.rs b/crates/bssh-russh-sftp/src/de.rs index 8e63a48c..75b505f7 100644 --- a/crates/bssh-russh-sftp/src/de.rs +++ b/crates/bssh-russh-sftp/src/de.rs @@ -170,7 +170,7 @@ impl<'de> serde::Deserializer<'de> for &mut Deserializer<'de> { where V: serde::de::Visitor<'de>, { - visitor.visit_byte_buf(self.input.try_get_bytes()?) + self.deserialize_bytes(visitor) } fn deserialize_option(self, _visitor: V) -> Result diff --git a/crates/bssh-russh-sftp/src/lib.rs b/crates/bssh-russh-sftp/src/lib.rs index 240bb300..91a7e974 100644 --- a/crates/bssh-russh-sftp/src/lib.rs +++ b/crates/bssh-russh-sftp/src/lib.rs @@ -1,6 +1,3 @@ -// Lints tripped by vendored upstream source that we do not want to diverge from. -#![allow(clippy::io_other_error)] - //! SFTP subsystem with client and server support for Russh and more! //! //! 
Crate can provide compatibility with anything that can provide the raw data @@ -34,5 +31,6 @@ pub mod extensions; pub mod protocol; pub mod ser; /// Server side +#[cfg(not(target_arch = "wasm32"))] pub mod server; mod utils; diff --git a/crates/bssh-russh-sftp/src/protocol/mod.rs b/crates/bssh-russh-sftp/src/protocol/mod.rs index f1eb5210..50d58ab5 100644 --- a/crates/bssh-russh-sftp/src/protocol/mod.rs +++ b/crates/bssh-russh-sftp/src/protocol/mod.rs @@ -270,11 +270,7 @@ impl TryFrom for Bytes { Packet::ExtendedReply(reply) => (SSH_FXP_EXTENDED_REPLY, ser::to_bytes(&reply)?), }; - let length = payload - .len() - .checked_add(1) - .and_then(|len| u32::try_from(len).ok()) - .ok_or_else(|| Error::BadMessage("packet length exceeds u32".to_owned()))?; + let length = payload.len() as u32 + 1; let mut bytes = BytesMut::new(); bytes.put_u32(length); bytes.put_u8(r#type); @@ -282,100 +278,3 @@ impl TryFrom for Bytes { Ok(bytes.freeze()) } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn write_packet_uses_length_prefixed_bulk_data() { - let packet = Packet::Write(Write { - id: 7, - handle: "h".to_owned(), - offset: 9, - data: vec![0, 1, 2, 3], - }); - - let encoded = Bytes::try_from(packet).expect("serialize write packet"); - assert_eq!( - encoded.as_ref(), - &[ - 0, - 0, - 0, - 26, // packet length - SSH_FXP_WRITE, - 0, - 0, - 0, - 7, // request id - 0, - 0, - 0, - 1, - b'h', // handle - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 9, // offset - 0, - 0, - 0, - 4, - 0, - 1, - 2, - 3, // data - ] - ); - - let mut payload = encoded.slice(4..); - let decoded = Packet::try_from(&mut payload).expect("deserialize write packet"); - match decoded { - Packet::Write(write) => assert_eq!(write.data, [0, 1, 2, 3]), - _ => panic!("expected write packet"), - } - } - - #[test] - fn data_packet_uses_length_prefixed_bulk_data() { - let packet = Packet::Data(Data { - id: 8, - data: vec![4, 5, 6], - }); - - let encoded = Bytes::try_from(packet).expect("serialize data packet"); - 
assert_eq!( - encoded.as_ref(), - &[ - 0, - 0, - 0, - 12, // packet length - SSH_FXP_DATA, - 0, - 0, - 0, - 8, // request id - 0, - 0, - 0, - 3, - 4, - 5, - 6, // data - ] - ); - - let mut payload = encoded.slice(4..); - let decoded = Packet::try_from(&mut payload).expect("deserialize data packet"); - match decoded { - Packet::Data(data) => assert_eq!(data.data, [4, 5, 6]), - _ => panic!("expected data packet"), - } - } -} diff --git a/crates/bssh-russh-sftp/src/ser.rs b/crates/bssh-russh-sftp/src/ser.rs index ee7f6853..8ffba16f 100644 --- a/crates/bssh-russh-sftp/src/ser.rs +++ b/crates/bssh-russh-sftp/src/ser.rs @@ -104,9 +104,7 @@ impl<'a> serde::Serializer for &'a mut Serializer { } fn serialize_bytes(self, v: &[u8]) -> Result { - let len = u32::try_from(v.len()) - .map_err(|_| Error::BadMessage("bytes length exceeds u32".to_owned()))?; - self.output.put_u32(len); + self.output.put_u32(v.len() as u32); self.output.put_slice(v); Ok(()) } diff --git a/crates/bssh-russh-sftp/src/server/handler.rs b/crates/bssh-russh-sftp/src/server/handler.rs index d8a7d483..65f035d1 100644 --- a/crates/bssh-russh-sftp/src/server/handler.rs +++ b/crates/bssh-russh-sftp/src/server/handler.rs @@ -1,16 +1,17 @@ use std::{collections::HashMap, future::Future}; -use crate::protocol::{ - Attrs, Data, FileAttributes, Handle, Name, OpenFlags, Packet, Status, StatusCode, Version, +use crate::{ + protocol::{Attrs, Data, FileAttributes, Handle, Name, OpenFlags, Packet, Status, Version}, + server::StatusReply, }; /// Server handler for each client. This is `async_trait` #[cfg_attr(feature = "async-trait", async_trait::async_trait)] pub trait Handler: Sized { - /// The type must have an `Into` + /// The type must have an `Into` /// implementation because a response must be sent /// to any request, even if completed by error. 
- type Error: Into + Send; + type Error: Into + Send; /// Called by the handler when the packet is not implemented fn unimplemented(&self) -> Self::Error; diff --git a/crates/bssh-russh-sftp/src/server/mod.rs b/crates/bssh-russh-sftp/src/server/mod.rs index 6228ed44..a3e5a9ba 100644 --- a/crates/bssh-russh-sftp/src/server/mod.rs +++ b/crates/bssh-russh-sftp/src/server/mod.rs @@ -1,25 +1,50 @@ mod handler; +mod reply; use bytes::Bytes; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; pub use self::handler::Handler; +pub use self::reply::StatusReply; use crate::{ error::Error, - protocol::{Packet, StatusCode}, + protocol::{Packet, Status, StatusCode}, utils::read_packet, }; macro_rules! into_wrap { ($id:expr, $handler:expr, $var:ident; $($arg:ident),*) => { match $handler.$var($($var.$arg),*).await { - Err(err) => Packet::error($id, err.into()), + Err(err) => { + let StatusReply { status_code, error_message, language_tag } = err.into(); + Packet::Status(Status { + id: $id, + status_code, + error_message: error_message.unwrap_or_else(|| status_code.to_string()), + language_tag: language_tag.unwrap_or_else(|| "en-US".to_string()), + }) + }, Ok(packet) => packet.into(), } }; } +/// Configuration for the SFTP server. +#[derive(Clone, Debug)] +pub struct Config { + /// Maximum allowed size of SFTP packets sent by clients. Default: 256 KiB. 
+ pub max_client_packet_len: u32, +} + +impl Default for Config { + fn default() -> Self { + Self { + max_client_packet_len: 262144, + } + } +} + async fn process_request(packet: Packet, handler: &mut H) -> Packet where H: Handler + Send, @@ -51,12 +76,12 @@ where } } -async fn process_handler(stream: &mut S, handler: &mut H) -> Result<(), Error> +async fn process_handler(stream: &mut S, handler: &mut H, cfg: &Config) -> Result<(), Error> where H: Handler + Send, S: AsyncRead + AsyncWrite + Unpin, { - let mut bytes = read_packet(stream).await?; + let mut bytes = read_packet(stream, cfg.max_client_packet_len).await?; let response = match Packet::try_from(&mut bytes) { Ok(request) => process_request(request, handler).await, @@ -71,14 +96,23 @@ where } /// Run processing stream as SFTP -pub async fn run(mut stream: S, mut handler: H) +pub async fn run(stream: S, handler: H) +where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, + H: Handler + Send + 'static, +{ + run_with_config(stream, handler, Config::default()).await +} + +/// Run processing stream as SFTP with custom configuration +pub async fn run_with_config(mut stream: S, mut handler: H, cfg: Config) where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, H: Handler + Send + 'static, { tokio::spawn(async move { loop { - match process_handler(&mut stream, &mut handler).await { + match process_handler(&mut stream, &mut handler, &cfg).await { Err(Error::UnexpectedEof) => break, Err(err) => warn!("{}", err), Ok(_) => (), diff --git a/crates/bssh-russh-sftp/src/server/reply.rs b/crates/bssh-russh-sftp/src/server/reply.rs new file mode 100644 index 00000000..228d7d41 --- /dev/null +++ b/crates/bssh-russh-sftp/src/server/reply.rs @@ -0,0 +1,51 @@ +use crate::protocol::StatusCode; + +/// Response sent by [`Handler`](super::Handler) for any request that completes +/// via the `Err` arm. Mapped to an `SSH_FXP_STATUS` packet. 
+/// +/// `error_message` and `language_tag` fall back to `status_code.to_string()` +/// and `"en-US"` respectively when left as `None`, so simple cases that only +/// carry a [`StatusCode`] stay allocation-free. +#[derive(Debug, Clone, PartialEq)] +pub struct StatusReply { + pub status_code: StatusCode, + pub error_message: Option, + pub language_tag: Option, +} + +impl StatusReply { + pub fn new(status_code: StatusCode) -> Self { + Self { + status_code, + error_message: None, + language_tag: None, + } + } + + pub fn with_message(mut self, message: impl Into) -> Self { + self.error_message = Some(message.into()); + self + } + + pub fn with_language_tag(mut self, tag: impl Into) -> Self { + self.language_tag = Some(tag.into()); + self + } +} + +impl From for StatusReply { + fn from(status_code: StatusCode) -> Self { + Self::new(status_code) + } +} + +// Lives here, not in `protocol/status.rs`, to keep `protocol` free of a +// dependency on `server`-side types. +impl StatusCode { + /// Attach a custom message to this status code and produce a [`StatusReply`]. + /// + /// Shorthand for `StatusReply::new(code).with_message(msg)`. 
+ pub fn with_message(self, message: impl Into) -> StatusReply { + StatusReply::new(self).with_message(message) + } +} diff --git a/crates/bssh-russh-sftp/src/utils.rs b/crates/bssh-russh-sftp/src/utils.rs index 783cdf14..b841dbb0 100644 --- a/crates/bssh-russh-sftp/src/utils.rs +++ b/crates/bssh-russh-sftp/src/utils.rs @@ -5,12 +5,18 @@ use tokio::io::{AsyncRead, AsyncReadExt}; use crate::error::Error; -pub fn unix(time: SystemTime) -> u32 { +pub(crate) fn unix(time: SystemTime) -> u32 { DateTime::::from(time).timestamp() as u32 } -pub async fn read_packet(stream: &mut S) -> Result { +pub(crate) async fn read_packet( + stream: &mut S, + max_length: u32, +) -> Result { let length = stream.read_u32().await?; + if length > max_length { + return Err(Error::BadMessage("packet length limit exceeded".to_owned())); + } let mut buf = vec![0; length as usize]; stream.read_exact(&mut buf).await?; diff --git a/crates/bssh-russh-sftp/sync-upstream.sh b/crates/bssh-russh-sftp/sync-upstream.sh index 96741fc7..01071bc6 100755 --- a/crates/bssh-russh-sftp/sync-upstream.sh +++ b/crates/bssh-russh-sftp/sync-upstream.sh @@ -10,7 +10,6 @@ set -e SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" UPSTREAM_URL="https://github.com/AspectUnk/russh-sftp.git" TEMP_DIR="/tmp/russh-sftp-sync-$$" -PATCH_FILE="$SCRIPT_DIR/patches/sftp-serde-bytes-perf.patch" RED='\033[0;31m' GREEN='\033[0;32m' @@ -74,27 +73,7 @@ if [ "$VERSION" != "master" ]; then log_info "Updated version to $CLEAN_VERSION" fi -log_info "Applying patches..." - -if [ -f "$PATCH_FILE" ]; then - if patch -p1 --dry-run < "$PATCH_FILE" > /dev/null 2>&1; then - patch -p1 < "$PATCH_FILE" - log_info "Applied sftp-serde-bytes-perf.patch" - else - log_warn "Patch may not apply cleanly, attempting with fuzz..." - if patch -p1 --fuzz=3 < "$PATCH_FILE"; then - log_warn "Patch applied with fuzz - please verify manually" - else - log_error "Failed to apply patch. Manual intervention required." 
- log_error "Patch file: $PATCH_FILE" - exit 1 - fi - fi -else - log_error "Patch file not found: $PATCH_FILE" - log_error "Please create the patch file first using: ./create-patch.sh" - exit 1 -fi +log_info "No local patches to apply (the read_to_writer_pipelined helper lives directly in src/client/fs/file.rs until upstream PR #91 merges)." log_info "Verifying build..." cd "$SCRIPT_DIR/../.." diff --git a/src/server/sftp.rs b/src/server/sftp.rs index b4119b53..6da92b85 100644 --- a/src/server/sftp.rs +++ b/src/server/sftp.rs @@ -50,6 +50,7 @@ use std::sync::Arc; use russh_sftp::protocol::{ Attrs, Data, FileAttributes, Handle, Name, OpenFlags, Status, StatusCode, Version, }; +use russh_sftp::server::StatusReply; use tokio::fs::{self, File, OpenOptions}; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use tokio::sync::Mutex; @@ -138,6 +139,12 @@ impl From for StatusCode { } } +impl From for StatusReply { + fn from(err: SftpError) -> Self { + StatusReply::new(err.code).with_message(err.message) + } +} + /// An open file or directory handle. enum OpenHandle { /// An open file. diff --git a/src/ssh/tokio_client/file_transfer.rs b/src/ssh/tokio_client/file_transfer.rs index 7afe6861..7a856b54 100644 --- a/src/ssh/tokio_client/file_transfer.rs +++ b/src/ssh/tokio_client/file_transfer.rs @@ -19,18 +19,30 @@ //! - Recursive directory upload/download //! - Support for glob patterns -use russh_sftp::{client::SftpSession, protocol::OpenFlags}; +use russh_sftp::{client::{Config, SftpSession}, protocol::OpenFlags}; use std::path::Path; use tokio::io::AsyncWriteExt; use super::connection::Client; -/// Maximum number of concurrent SFTP `WRITE`/`READ` requests held in flight per -/// transfer. Mirrors OpenSSH `sftp(1)`'s default (`-R 64`) — large enough to +/// Maximum number of concurrent SFTP `READ`/`WRITE` requests held in flight per +/// transfer. 
Mirrors OpenSSH `sftp(1)`'s default (`-R 64`) — large enough to /// hide per-request RTT on intra-DC and intercontinental links, small enough to /// keep peak buffer memory bounded (`MAX_INFLIGHT * MAX_WRITE_LENGTH ≈ 16 MiB`). +/// +/// `READ` uses this through `File::read_to_writer_pipelined`. `WRITE` is +/// pipelined natively by `AsyncWrite for File` (upstream russh-sftp 2.3.0+, +/// PR #85), so we plumb the same depth through `Config::max_concurrent_writes` +/// when opening the SFTP session. const MAX_INFLIGHT_REQUESTS: usize = 64; +fn sftp_config() -> Config { + Config { + max_concurrent_writes: MAX_INFLIGHT_REQUESTS, + ..Default::default() + } +} + impl Client { /// Upload a file with sftp to the remote server. /// @@ -49,7 +61,7 @@ impl Client { // start sftp session let channel = self.get_channel().await?; channel.request_subsystem(true, "sftp").await?; - let sftp = SftpSession::new(channel.into_stream()).await?; + let sftp = SftpSession::new_with_config(channel.into_stream(), sftp_config()).await?; // Stream local file with multiple SFTP WRITE requests in flight to // hide per-request RTT and avoid loading the entire file in memory. @@ -63,8 +75,9 @@ impl Client { OpenFlags::CREATE | OpenFlags::TRUNCATE | OpenFlags::WRITE | OpenFlags::READ, ) .await?; - file.write_all_pipelined(&mut local_file, MAX_INFLIGHT_REQUESTS) - .await?; + tokio::io::copy(&mut local_file, &mut file) + .await + .map_err(super::Error::IoError)?; file.flush().await.map_err(super::Error::IoError)?; file.shutdown().await.map_err(super::Error::IoError)?; @@ -86,7 +99,7 @@ impl Client { // start sftp session let channel = self.get_channel().await?; channel.request_subsystem(true, "sftp").await?; - let sftp = SftpSession::new(channel.into_stream()).await?; + let sftp = SftpSession::new_with_config(channel.into_stream(), sftp_config()).await?; // Stream remote file with multiple SFTP READ requests in flight; chunks // are reassembled in offset order before being written to disk. 
@@ -129,7 +142,7 @@ impl Client { // Start SFTP session let channel = self.get_channel().await?; channel.request_subsystem(true, "sftp").await?; - let sftp = SftpSession::new(channel.into_stream()).await?; + let sftp = SftpSession::new_with_config(channel.into_stream(), sftp_config()).await?; // Create remote directory if it doesn't exist let _ = sftp.create_dir(&remote_dir).await; // Ignore error if already exists @@ -182,9 +195,9 @@ impl Client { ) .await?; - remote_file - .write_all_pipelined(&mut local_file, MAX_INFLIGHT_REQUESTS) - .await?; + tokio::io::copy(&mut local_file, &mut remote_file) + .await + .map_err(super::Error::IoError)?; remote_file.flush().await.map_err(super::Error::IoError)?; remote_file .shutdown() @@ -213,7 +226,7 @@ impl Client { // Start SFTP session let channel = self.get_channel().await?; channel.request_subsystem(true, "sftp").await?; - let sftp = SftpSession::new(channel.into_stream()).await?; + let sftp = SftpSession::new_with_config(channel.into_stream(), sftp_config()).await?; // Create local directory if it doesn't exist tokio::fs::create_dir_all(local_dir)