From 1e66a831973520fb11bf6c9a55b3f69b26d7dbf4 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Fri, 3 Apr 2026 02:44:42 +0200 Subject: [PATCH 1/8] storage: start streaming zip/index uploads, parallel directory upload --- Cargo.lock | 2 + crates/bin/docs_rs_admin/src/main.rs | 2 + crates/lib/docs_rs_storage/Cargo.toml | 2 + .../lib/docs_rs_storage/src/archive_index.rs | 12 +- .../docs_rs_storage/src/backends/memory.rs | 33 +++- .../lib/docs_rs_storage/src/backends/mod.rs | 8 +- crates/lib/docs_rs_storage/src/backends/s3.rs | 84 ++++---- crates/lib/docs_rs_storage/src/blob.rs | 122 +++++++++++- crates/lib/docs_rs_storage/src/config.rs | 10 +- .../docs_rs_storage/src/storage/blocking.rs | 25 +-- .../src/storage/non_blocking.rs | 184 +++++++++--------- 11 files changed, 306 insertions(+), 178 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b3473484d..38c474002 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2299,6 +2299,8 @@ dependencies = [ "flate2", "futures-util", "http 1.4.0", + "http-body 1.0.1", + "http-body-util", "itertools 0.14.0", "mime", "moka", diff --git a/crates/bin/docs_rs_admin/src/main.rs b/crates/bin/docs_rs_admin/src/main.rs index 54821f537..726870eee 100644 --- a/crates/bin/docs_rs_admin/src/main.rs +++ b/crates/bin/docs_rs_admin/src/main.rs @@ -1,3 +1,5 @@ +#![recursion_limit = "256"] + mod rebuilds; mod repackage; #[cfg(test)] diff --git a/crates/lib/docs_rs_storage/Cargo.toml b/crates/lib/docs_rs_storage/Cargo.toml index b7b6380aa..83773084a 100644 --- a/crates/lib/docs_rs_storage/Cargo.toml +++ b/crates/lib/docs_rs_storage/Cargo.toml @@ -37,6 +37,8 @@ docs_rs_utils = { path = "../docs_rs_utils" } flate2 = "1.1.1" futures-util = { workspace = true } http = { workspace = true } +http-body = "1.0.0" +http-body-util = "0.1.3" itertools = { workspace = true } mime = { workspace = true } moka = { version = "0.12.14", features = ["future"] } diff --git a/crates/lib/docs_rs_storage/src/archive_index.rs b/crates/lib/docs_rs_storage/src/archive_index.rs index 334fb96c6..3e3b54621 100644 --- a/crates/lib/docs_rs_storage/src/archive_index.rs +++ b/crates/lib/docs_rs_storage/src/archive_index.rs @@ -830,7 +830,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::blob::StreamingBlob; + use crate::{blob::StreamingBlob, storage::non_blocking::ZIP_BUFFER_SIZE}; use chrono::Utc; use docs_rs_config::AppConfig as _; use docs_rs_opentelemetry::testing::TestMetrics; @@ -839,14 +839,15 @@ mod tests { use zip::write::SimpleFileOptions; async fn create_test_archive(file_count: u32) -> Result { - spawn_blocking(move || { + let writer = spawn_blocking(move || { use std::io::Write as _; let tf = tempfile::tempfile()?; let objectcontent: Vec = (0..255).collect(); - let mut archive = zip::ZipWriter::new(tf); + let mut archive = + zip::ZipWriter::new(std::io::BufWriter::with_capacity(ZIP_BUFFER_SIZE, tf)); for i in 0..file_count { archive.start_file( format!("testfile{i}"), @@ -858,8 +859,9 @@ mod tests { } Ok(archive.finish()?) }) - .await - .map(fs::File::from_std) + .await?; + + Ok(fs::File::from_std(writer.into_inner()?)) } struct FakeDownloader { diff --git a/crates/lib/docs_rs_storage/src/backends/memory.rs b/crates/lib/docs_rs_storage/src/backends/memory.rs index 385cd7f3f..cf4dd5bbe 100644 --- a/crates/lib/docs_rs_storage/src/backends/memory.rs +++ b/crates/lib/docs_rs_storage/src/backends/memory.rs @@ -1,16 +1,18 @@ use crate::{ Blob, backends::StorageBackendMethods, - blob::{BlobUpload, StreamingBlob}, + blob::{StreamUpload, StreamingBlob}, errors::PathNotFoundError, metrics::StorageMetrics, types::FileRange, }; use anyhow::{Result, anyhow}; +use chrono::Utc; use dashmap::DashMap; use docs_rs_headers::compute_etag; use futures_util::stream::{self, BoxStream}; use itertools::Itertools as _; +use tokio::io; pub(crate) struct MemoryBackend { otel_metrics: StorageMetrics, @@ -46,16 +48,29 @@ impl StorageBackendMethods for MemoryBackend { Ok(blob.into()) } - async fn store_batch(&self, batch: Vec) -> Result<()> { - self.otel_metrics - .uploaded_files - .add(batch.len() as u64, &[]); + async fn upload_stream(&self, upload: StreamUpload) -> Result<()> { + let StreamUpload { + path, + mime, + source, + compression, + } = upload; - for upload in batch { - let blob: Blob = upload.into(); - self.objects.insert(blob.path.clone(), blob); - } + let mut content = source.reader().await?; + let mut buffer = Vec::new(); + io::copy(&mut content, &mut buffer).await?; + + let blob = Blob { + path, + mime, + date_updated: Utc::now(), + etag: Some(compute_etag(&buffer)), + content: buffer, + compression, + }; + self.otel_metrics.uploaded_files.add(1, &[]); + self.objects.insert(blob.path.clone(), blob); Ok(()) } diff --git a/crates/lib/docs_rs_storage/src/backends/mod.rs b/crates/lib/docs_rs_storage/src/backends/mod.rs index a257f8de3..effb27dc5 100644 --- a/crates/lib/docs_rs_storage/src/backends/mod.rs +++ b/crates/lib/docs_rs_storage/src/backends/mod.rs @@ -2,14 +2,14 @@ pub(crate) mod memory; pub(crate) mod s3; -use crate::{BlobUpload, StreamingBlob, types::FileRange}; +use crate::{StreamingBlob, blob::StreamUpload, types::FileRange}; use anyhow::Result; use futures_util::stream::BoxStream; pub(crate) trait StorageBackendMethods { async fn exists(&self, path: &str) -> Result; async fn get_stream(&self, path: &str, range: Option) -> Result; - async fn store_batch(&self, batch: Vec) -> Result<()>; + async fn upload_stream(&self, upload: StreamUpload) -> Result<()>; async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result>; async fn delete_prefix(&self, prefix: &str) -> Result<()>; } @@ -39,8 +39,8 @@ impl StorageBackendMethods for StorageBackend { call_inner!(self, get_stream(path, range)) } - async fn store_batch(&self, batch: Vec) -> Result<()> { - call_inner!(self, store_batch(batch)) + async fn upload_stream(&self, upload: StreamUpload) -> Result<()> { + call_inner!(self, upload_stream(upload)) } async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result> { diff --git a/crates/lib/docs_rs_storage/src/backends/s3.rs b/crates/lib/docs_rs_storage/src/backends/s3.rs index 93f7a6c67..1e4f4135c 100644 --- a/crates/lib/docs_rs_storage/src/backends/s3.rs +++ b/crates/lib/docs_rs_storage/src/backends/s3.rs @@ -1,7 +1,7 @@ use crate::{ Config, backends::StorageBackendMethods, - blob::{BlobUpload, StreamingBlob}, + blob::{StreamUpload, StreamingBlob}, errors::PathNotFoundError, metrics::StorageMetrics, types::FileRange, @@ -13,15 +13,20 @@ use aws_sdk_s3::{ Client, config::{Region, retry::RetryConfig}, error::{ProvideErrorMetadata, SdkError}, + primitives::ByteStream, types::{Delete, ObjectIdentifier}, }; use aws_smithy_types_convert::date_time::DateTimeExt; use chrono::Utc; use docs_rs_headers::{ETag, compute_etag}; use futures_util::{ - future::TryFutureExt, - stream::{BoxStream, FuturesUnordered, StreamExt}, + TryStreamExt, + stream::{BoxStream, StreamExt}, }; +use http_body::Frame; +use http_body_util::StreamBody; +use opentelemetry::KeyValue; +use tokio_util::io::ReaderStream; use tracing::{error, warn}; // error codes to check for when trying to determine if an error is @@ -230,45 +235,50 @@ impl StorageBackendMethods for S3Backend { }) } - async fn store_batch(&self, mut batch: Vec) -> Result<(), Error> { - // Attempt to upload the batch 3 times - for _ in 0..3 { - let mut futures = FuturesUnordered::new(); - for blob in batch.drain(..) { - futures.push( - self.client - .put_object() - .bucket(&self.bucket) - .key(&blob.path) - .body(blob.content.clone().into()) - .content_type(blob.mime.to_string()) - .set_content_encoding(blob.compression.map(|alg| alg.to_string())) - .send() - .map_ok(|_| { - self.otel_metrics.uploaded_files.add(1, &[]); - }) - .map_err(|err| { - warn!(?err, "Failed to upload blob to S3"); - // Reintroduce failed blobs for a retry - blob - }), - ); - } + async fn upload_stream(&self, upload: StreamUpload) -> Result<(), Error> { + let StreamUpload { + path, + mime, + source, + compression, + } = upload; - while let Some(result) = futures.next().await { - // Push each failed blob back into the batch - if let Err(blob) = result { - batch.push(blob); - } - } + let content_length = source.content_length().await?; + + let mut last_err = None; + + for attempt in 1..=3 { + let reader = source.reader().await?; + let stream = ReaderStream::new(reader).map_ok(Frame::data); - // If we uploaded everything in the batch, we're done - if batch.is_empty() { - return Ok(()); + match self + .client + .put_object() + .bucket(&self.bucket) + .key(&path) + .body(ByteStream::from_body_1_x(StreamBody::new(stream))) + .content_length(content_length as i64) + .content_type(mime.to_string()) + .set_content_encoding(compression.map(|alg| alg.to_string())) + .send() + .await + { + Ok(_) => { + self.otel_metrics + .uploaded_files + .add(1, &[KeyValue::new("attempt", attempt.to_string())]); + return Ok(()); + } + Err(err) => { + warn!(?err, attempt = attempt + 1, %path, "failed to upload blob to S3"); + last_err = Some(err); + } } } - panic!("failed to upload 3 times, exiting"); + Err(last_err + .expect("upload retry loop exited without a result") + .into()) } async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result> { diff --git a/crates/lib/docs_rs_storage/src/blob.rs b/crates/lib/docs_rs_storage/src/blob.rs index 4dd30c473..6079e47c6 100644 --- a/crates/lib/docs_rs_storage/src/blob.rs +++ b/crates/lib/docs_rs_storage/src/blob.rs @@ -4,8 +4,66 @@ use chrono::{DateTime, Utc}; use docs_rs_headers::{ETag, compute_etag}; use docs_rs_types::CompressionAlgorithm; use mime::Mime; -use std::io; -use tokio::io::{AsyncBufRead, AsyncBufReadExt}; +use std::{ + fmt, + io::{Cursor, SeekFrom}, + sync::Arc, +}; +use tokio::{ + fs, + io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncSeekExt}, +}; + +pub enum StreamUploadSource { + Bytes(Arc<[u8]>), + File(fs::File), +} + +impl StreamUploadSource { + pub async fn reader(&self) -> io::Result> { + Ok(match self { + Self::Bytes(bytes) => Box::new(Cursor::new(bytes.clone())), + Self::File(file) => { + let mut cloned = file.try_clone().await?; + cloned.seek(SeekFrom::Start(0)).await?; + Box::new(cloned) + } + }) + } + + pub async fn content_length(&self) -> io::Result { + Ok(match self { + Self::Bytes(bytes) => bytes.len() as u64, + Self::File(file) => file.metadata().await?.len(), + }) + } +} + +/// Represents a stream blob to be uploaded to storage. +/// +/// NOTE: Right now we only support uploads where the size is known in advance. +/// We can add support for streams with unknown size, but this would mean +/// using an intermediate fixed-size buffer and multipart uploads for these cases. +/// But: the multipart machinery is only worth the complexity if the stream is: +/// - unknown size +/// - bigger (there's a 5 MiB size limit for each part) +pub struct StreamUpload { + pub path: String, + pub mime: Mime, + pub source: StreamUploadSource, + pub compression: Option, +} + +impl From for StreamUpload { + fn from(value: BlobUpload) -> Self { + Self { + path: value.path, + mime: value.mime, + source: StreamUploadSource::Bytes(Arc::from(value.content)), + compression: value.compression, + } + } +} /// represents a blob to be uploaded to storage. #[derive(Clone, Debug, PartialEq, Eq)] @@ -60,7 +118,7 @@ pub struct StreamingBlob { pub content: Box, } -impl std::fmt::Debug for StreamingBlob { +impl fmt::Debug for StreamingBlob { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("StreamingBlob") .field("path", &self.path) @@ -110,7 +168,7 @@ impl StreamingBlob { let mut content = SizedBuffer::new(max_size); content.reserve(self.content_length); - tokio::io::copy(&mut self.content, &mut content).await?; + io::copy(&mut self.content, &mut content).await?; Ok(Blob { path: self.path, @@ -132,7 +190,7 @@ impl From for StreamingBlob { etag: value.etag, compression: value.compression, content_length: value.content.len(), - content: Box::new(io::Cursor::new(value.content)), + content: Box::new(Cursor::new(value.content)), } } } @@ -142,6 +200,10 @@ mod test { use super::*; use crate::compress_async; use docs_rs_headers::compute_etag; + use tokio::{ + fs, + io::{AsyncReadExt as _, AsyncWriteExt as _}, + }; const ZSTD_EOF_BYTES: [u8; 3] = [0x01, 0x00, 0x00]; @@ -157,7 +219,7 @@ mod test { compression: alg, etag: Some(compute_etag(&content)), content_length: content.len(), - content: Box::new(io::Cursor::new(content)), + content: Box::new(Cursor::new(content)), } } @@ -184,6 +246,52 @@ mod test { Ok(()) } + #[tokio::test] + async fn test_stream_upload_source_bytes_creates_fresh_readers() -> Result<()> { + const CONTENT: &[u8] = b"Hello, world!"; + + let source = StreamUploadSource::Bytes(Arc::from(CONTENT)); + assert_eq!(source.content_length().await?, CONTENT.len() as u64); + + let mut first = source.reader().await?; + let mut first_buf = Vec::new(); + first.read_to_end(&mut first_buf).await?; + assert_eq!(first_buf, CONTENT); + + let mut second = source.reader().await?; + let mut second_buf = Vec::new(); + second.read_to_end(&mut second_buf).await?; + assert_eq!(second_buf, CONTENT); + + Ok(()) + } + + #[tokio::test] + async fn test_stream_upload_source_file_creates_fresh_readers() -> Result<()> { + const CONTENT: &[u8] = b"Hello, world!"; + + let tempfile = tempfile::NamedTempFile::new()?; + let mut file = fs::File::from_std(tempfile.reopen()?); + file.write_all(CONTENT).await?; + file.seek(std::io::SeekFrom::Start(CONTENT.len() as u64)) + .await?; + + let source = StreamUploadSource::File(file); + assert_eq!(source.content_length().await?, CONTENT.len() as u64); + + let mut first = source.reader().await?; + let mut first_buf = Vec::new(); + first.read_to_end(&mut first_buf).await?; + assert_eq!(first_buf, CONTENT); + + let mut second = source.reader().await?; + let mut second_buf = Vec::new(); + second.read_to_end(&mut second_buf).await?; + assert_eq!(second_buf, CONTENT); + + Ok(()) + } + #[tokio::test] async fn test_streaming_broken_zstd_blob() -> Result<()> { const NOT_ZSTD: &[u8] = b"Hello, world!"; @@ -226,7 +334,7 @@ mod test { let mut compressed_content = Vec::new(); let alg = CompressionAlgorithm::Zstd; compress_async( - &mut io::Cursor::new(CONTENT.to_vec()), + &mut Cursor::new(CONTENT.to_vec()), &mut compressed_content, alg, ) diff --git a/crates/lib/docs_rs_storage/src/config.rs b/crates/lib/docs_rs_storage/src/config.rs index af2ccdd8b..4da949f99 100644 --- a/crates/lib/docs_rs_storage/src/config.rs +++ b/crates/lib/docs_rs_storage/src/config.rs @@ -106,11 +106,15 @@ pub struct Config { // we often also add compression on top of it, which is CPU-bound, // even when just light / simpler compression. pub local_filesystem_parallelism: usize, + + // How much we want to parallelize file uploads / downloads. + pub network_parallelism: usize, } impl AppConfig for Config { fn from_environment() -> anyhow::Result { let prefix: PathBuf = require_env("DOCSRS_PREFIX")?; + let cores = std::thread::available_parallelism()?.get(); Ok(Self { temp_dir: prefix.join("tmp"), @@ -124,10 +128,8 @@ impl AppConfig for Config { max_file_size_html: env("DOCSRS_MAX_FILE_SIZE_HTML", 50 * 1024 * 1024)?, #[cfg(any(test, feature = "testing"))] s3_bucket_is_temporary: false, - local_filesystem_parallelism: env( - "DOCSRS_LOCAL_FILESYSTEM_PARALLELISM", - std::thread::available_parallelism()?.get(), - )?, + local_filesystem_parallelism: env("DOCSRS_LOCAL_FILESYSTEM_PARALLELISM", cores)?, + network_parallelism: env("DOCSRS_NETWORK_PARALLELISM", 8usize.min(cores))?, }) } diff --git a/crates/lib/docs_rs_storage/src/storage/blocking.rs b/crates/lib/docs_rs_storage/src/storage/blocking.rs index bd4c5a152..59c28caaa 100644 --- a/crates/lib/docs_rs_storage/src/storage/blocking.rs +++ b/crates/lib/docs_rs_storage/src/storage/blocking.rs @@ -1,4 +1,4 @@ -use crate::{blob::Blob, file::FileEntry, storage::non_blocking::AsyncStorage, types::FileRange}; +use crate::{blob::Blob, file::FileEntry, storage::non_blocking::AsyncStorage}; use anyhow::Result; use docs_rs_types::{BuildId, CompressionAlgorithm, KrateName, Version}; use std::{fmt, path::Path, sync::Arc}; @@ -10,7 +10,6 @@ pub struct Storage { runtime: runtime::Handle, } -#[allow(dead_code)] impl Storage { pub fn new(inner: Arc, runtime: runtime::Handle) -> Self { Self { inner, runtime } @@ -70,17 +69,6 @@ impl Storage { self.runtime.block_on(self.inner.get(path, max_size)) } - pub(crate) fn get_range( - &self, - path: &str, - max_size: usize, - range: FileRange, - compression: Option, - ) -> Result { - self.runtime - .block_on(self.inner.get_range(path, max_size, range, compression)) - } - pub fn get_from_archive( &self, archive_path: &str, @@ -140,17 +128,6 @@ impl Storage { self.runtime.block_on(self.inner.store_one(path, content)) } - // Store file into the backend at the given path (also used to detect mime type), returns the - // chosen compression algorithm - pub fn store_path( - &self, - target_path: impl Into + std::fmt::Debug, - source_path: impl AsRef + std::fmt::Debug, - ) -> Result { - self.runtime - .block_on(self.inner.store_path(target_path, source_path)) - } - /// sync wrapper for the list_prefix function /// purely for testing purposes since it collects all files into a Vec. #[cfg(feature = "testing")] diff --git a/crates/lib/docs_rs_storage/src/storage/non_blocking.rs b/crates/lib/docs_rs_storage/src/storage/non_blocking.rs index c7cc48bd0..7c5e8350d 100644 --- a/crates/lib/docs_rs_storage/src/storage/non_blocking.rs +++ b/crates/lib/docs_rs_storage/src/storage/non_blocking.rs @@ -4,7 +4,7 @@ use crate::{ Config, archive_index::{self, ARCHIVE_INDEX_FILE_EXTENSION}, backends::{StorageBackend, StorageBackendMethods, s3::S3Backend}, - blob::{Blob, BlobUpload, StreamingBlob}, + blob::{Blob, StreamUpload, StreamUploadSource, StreamingBlob}, compression::{compress, compress_async}, errors::PathNotFoundError, file::FileEntry, @@ -21,10 +21,13 @@ use docs_rs_opentelemetry::AnyMeterProvider; use docs_rs_types::{BuildId, CompressionAlgorithm, KrateName, Version}; use docs_rs_utils::spawn_blocking; use futures_util::{TryStreamExt as _, future, stream::BoxStream}; -use std::{fmt, path::Path, pin::Pin, sync::Arc}; +use std::{fmt, io::Cursor, path::Path, pin::Pin, sync::Arc}; use tokio::{fs, io}; use tracing::{info_span, instrument, trace, warn}; +/// buffer size when writing zip files. +pub(crate) const ZIP_BUFFER_SIZE: usize = 1024 * 1024; + pub struct AsyncStorage { backend: StorageBackend, config: Arc, @@ -181,8 +184,7 @@ impl AsyncStorage { Ok(self.get_raw_stream(path).await?.decompress().await?) } - /// get, decompress and materialize part of an object from store - #[instrument(skip(self))] + #[cfg(test)] pub(crate) async fn get_range( &self, path: &str, @@ -296,7 +298,7 @@ impl AsyncStorage { root_dir: impl AsRef + fmt::Debug, ) -> Result<(Vec, CompressionAlgorithm)> { let root_dir = root_dir.as_ref(); - let (zip_content, file_paths) = + let (zip_file, file_paths) = spawn_blocking({ use std::{io, fs}; let archive_path = archive_path.to_owned(); @@ -316,7 +318,8 @@ impl AsyncStorage { // also has to be added as supported algorithm for storage compression, together // with a mapping in `storage::archive_index::Index::new_from_zip`. - let zip_content = { + + let zip_file = { let _span = info_span!("create_zip_archive", %archive_path, root_dir=%root_dir.display()).entered(); @@ -324,7 +327,9 @@ impl AsyncStorage { .compression_method(zip::CompressionMethod::Bzip2) .compression_level(Some(3)); - let mut zip = zip::ZipWriter::new(io::Cursor::new(Vec::new())); + // rustdoc archives can become a couple of GiB big, so we better use a tempfile. + let zip_file = tempfile::tempfile()?; + let mut zip = zip::ZipWriter::new(io::BufWriter::with_capacity(ZIP_BUFFER_SIZE, zip_file)); for file_path in get_file_list(&root_dir) { let file_path = file_path?; @@ -334,55 +339,59 @@ impl AsyncStorage { file_paths.push(FileEntry{path: file_path, size: file.metadata()?.len()}); } - zip.finish()?.into_inner() + zip.finish()?.into_inner()? }; Ok(( - zip_content, + zip_file, file_paths )) } }) .await?; + let zip_file = fs::File::from_std(zip_file); let alg = CompressionAlgorithm::default(); let remote_index_path = format!("{}.{ARCHIVE_INDEX_FILE_EXTENSION}", &archive_path); - let (zip_content, compressed_index_content) = { + let (zip_file, compressed_index_file) = { let _span = info_span!("create_archive_index", %remote_index_path).entered(); fs::create_dir_all(&self.config.temp_dir).await?; let local_index_path = tempfile::NamedTempFile::new_in(&self.config.temp_dir)?.into_temp_path(); - let zip_cursor = - archive_index::create(std::io::Cursor::new(zip_content), &local_index_path).await?; - let zip_content = zip_cursor.into_inner(); + let zip_reader = + archive_index::create(io::BufReader::new(zip_file), &local_index_path).await?; + let zip_file = zip_reader.into_inner(); - let mut buf: Vec = Vec::new(); + // compressed index can become up to a couple 100 MiB big, so rather use a tempfile. + let mut compressed_index_file = + fs::File::from_std(spawn_blocking(|| Ok(tempfile::tempfile()?)).await?); compress_async( &mut io::BufReader::new(fs::File::open(&local_index_path).await?), - &mut buf, + &mut io::BufWriter::new(&mut compressed_index_file), alg, ) .await?; - (zip_content, buf) + (zip_file, compressed_index_file) }; self.backend - .store_batch(vec![ - BlobUpload { - path: archive_path.to_string(), - mime: mimes::APPLICATION_ZIP.clone(), - content: zip_content, - compression: None, - }, - BlobUpload { - path: remote_index_path, - mime: mime::APPLICATION_OCTET_STREAM, - content: compressed_index_content, - compression: Some(alg), - }, - ]) + .upload_stream(StreamUpload { + path: archive_path.to_string(), + mime: mimes::APPLICATION_ZIP.clone(), + source: StreamUploadSource::File(zip_file), + compression: None, + }) + .await?; + + self.backend + .upload_stream(StreamUpload { + path: remote_index_path, + mime: mime::APPLICATION_OCTET_STREAM, + source: StreamUploadSource::File(compressed_index_file), + compression: Some(alg), + }) .await?; Ok((file_paths, CompressionAlgorithm::Bzip2)) @@ -395,65 +404,64 @@ impl AsyncStorage { prefix: impl AsRef + fmt::Debug, root_dir: impl AsRef + fmt::Debug, ) -> Result<(Vec, CompressionAlgorithm)> { - let prefix = prefix.as_ref(); + let prefix = prefix.as_ref().to_path_buf(); let root_dir = root_dir.as_ref(); let alg = CompressionAlgorithm::default(); - let (file_paths_and_mimes, blobs): (Vec<_>, Vec<_>) = walk_dir_recursive(&root_dir) + let file_paths_and_mimes: Vec<_> = walk_dir_recursive(&root_dir) .err_into::() - .map_ok(|item| async move { - // Some files have insufficient permissions - // (like .lock file created by cargo in documentation directory). - // Skip these files. - let Ok(file) = fs::File::open(&item).await else { - return Ok(None); - }; - - let content = { - let mut buf: Vec = Vec::new(); - compress_async(io::BufReader::new(file), &mut buf, alg).await?; - buf - }; - - let bucket_path = prefix.join(&item.relative).to_string_lossy().to_string(); - - let file_size = item.metadata.len(); - - let file_info = FileEntry { - path: item.relative.clone(), - size: file_size, - }; - let mime = file_info.mime(); - - Ok(Some(( - file_info, - BlobUpload { - path: bucket_path, - mime, - content, - compression: Some(alg), - }, - ))) + .map_ok(|item| { + let prefix = prefix.clone(); + async move { + // Some files have insufficient permissions + // (like .lock file created by cargo in documentation directory). + // Skip these files. + let Ok(file) = fs::File::open(&item).await else { + return Ok(None); + }; + + let content = { + let mut buf: Vec = Vec::new(); + compress_async(io::BufReader::new(file), &mut buf, alg).await?; + buf + }; + + let bucket_path = prefix.join(&item.relative).to_string_lossy().to_string(); + + let file_size = item.metadata.len(); + + let file_info = FileEntry { + path: item.relative.clone(), + size: file_size, + }; + let mime = file_info.mime().clone(); + + self.backend + .upload_stream(StreamUpload { + path: bucket_path, + mime, + source: StreamUploadSource::Bytes(content.into()), + compression: Some(alg), + }) + .await?; + + Ok(Some(file_info)) + } }) - .try_buffer_unordered(self.config.local_filesystem_parallelism) + .try_buffer_unordered(self.config.network_parallelism) .try_filter_map(|item| future::ready(Ok(item))) - .try_fold( - (Vec::new(), Vec::new()), - |(mut file_paths_and_mimes, mut blobs), (file_info, blob)| async move { - file_paths_and_mimes.push(file_info); - blobs.push(blob); - Ok((file_paths_and_mimes, blobs)) - }, - ) + .try_collect() .await?; - self.backend.store_batch(blobs).await?; Ok((file_paths_and_mimes, alg)) } #[cfg(test)] - pub async fn store_blobs(&self, blobs: Vec) -> Result<()> { - self.backend.store_batch(blobs).await + pub async fn store_blobs(&self, blobs: Vec) -> Result<()> { + for blob in blobs { + self.backend.upload_stream(blob.into()).await?; + } + Ok(()) } // Store file into the backend at the given path, uncompressed. @@ -469,12 +477,12 @@ impl AsyncStorage { let mime = detect_mime(&path).to_owned(); self.backend - .store_batch(vec![BlobUpload { + .upload_stream(StreamUpload { path, mime, - content, + source: StreamUploadSource::Bytes(content.into()), compression: None, - }]) + }) .await?; Ok(()) @@ -489,18 +497,17 @@ impl AsyncStorage { content: impl Into>, ) -> Result { let path = path.into(); - let content = content.into(); let alg = CompressionAlgorithm::default(); - let content = compress(&*content, alg)?; + let content = compress(Cursor::new(content.into()), alg)?; let mime = detect_mime(&path).to_owned(); self.backend - .store_batch(vec![BlobUpload { + .upload_stream(StreamUpload { path, mime, - content, + source: StreamUploadSource::Bytes(content.into()), compression: Some(alg), - }]) + }) .await?; Ok(alg) @@ -531,12 +538,12 @@ impl AsyncStorage { let mime = detect_mime(&target_path).to_owned(); self.backend - .store_batch(vec![BlobUpload { + .upload_stream(StreamUpload { path: target_path, mime, - content, + source: StreamUploadSource::Bytes(content.into()), compression: Some(alg), - }]) + }) .await?; Ok(alg) @@ -593,6 +600,7 @@ impl fmt::Debug for AsyncStorage { #[cfg(test)] mod backend_tests { use super::*; + use crate::blob::BlobUpload; use crate::{PathNotFoundError, errors::SizeLimitReached}; use docs_rs_headers::compute_etag; use docs_rs_opentelemetry::testing::TestMetrics; From d2ab4ae20ffd602e5b24deeec0f9ea5858b610b6 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Wed, 8 Apr 2026 16:24:37 +0200 Subject: [PATCH 2/8] fix recursion-limit error --- crates/bin/docs_rs_admin/src/main.rs | 2 -- crates/bin/docs_rs_admin/src/repackage.rs | 1 - 2 files changed, 3 deletions(-) diff --git a/crates/bin/docs_rs_admin/src/main.rs b/crates/bin/docs_rs_admin/src/main.rs index 726870eee..54821f537 100644 --- a/crates/bin/docs_rs_admin/src/main.rs +++ b/crates/bin/docs_rs_admin/src/main.rs @@ -1,5 +1,3 @@ -#![recursion_limit = "256"] - mod rebuilds; mod repackage; #[cfg(test)] diff --git a/crates/bin/docs_rs_admin/src/repackage.rs b/crates/bin/docs_rs_admin/src/repackage.rs index 9d949b8da..d6757ffc6 100644 --- a/crates/bin/docs_rs_admin/src/repackage.rs +++ b/crates/bin/docs_rs_admin/src/repackage.rs @@ -108,7 +108,6 @@ pub async fn repackage( /// repackage contents of a S3 path prefix into a single archive file. /// /// Not performance optimized, for now it just tries to be simple. -#[instrument(skip(storage))] async fn repackage_path( storage: &AsyncStorage, prefix: &str, From a0cdcc2ce5c3d115c41941b61c493df440283733 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Thu, 9 Apr 2026 03:22:01 +0200 Subject: [PATCH 3/8] switch stream-upload api to use native SDK file-upload, calculate checksum --- .gitignore | 2 +- Cargo.lock | 4 +- crates/lib/docs_rs_storage/Cargo.toml | 8 +- crates/lib/docs_rs_storage/benches/crc32.rs | 20 +++++ .../docs_rs_storage/src/backends/memory.rs | 15 ++-- crates/lib/docs_rs_storage/src/backends/s3.rs | 80 ++++++++++++----- crates/lib/docs_rs_storage/src/blob.rs | 88 ++----------------- crates/lib/docs_rs_storage/src/config.rs | 4 - crates/lib/docs_rs_storage/src/lib.rs | 1 + .../src/storage/non_blocking.rs | 87 ++++++++++-------- crates/lib/docs_rs_storage/src/utils/crc32.rs | 24 +++++ crates/lib/docs_rs_storage/src/utils/mod.rs | 1 + 12 files changed, 177 insertions(+), 157 deletions(-) create mode 100644 crates/lib/docs_rs_storage/benches/crc32.rs create mode 100644 crates/lib/docs_rs_storage/src/utils/crc32.rs diff --git a/.gitignore b/.gitignore index 3ab5f4610..4bfed8322 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ -/ignored +*/ignored /.env /.docker.env /src/web/badge/Cargo.lock diff --git a/Cargo.lock b/Cargo.lock index 38c474002..2179a79e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2284,8 +2284,10 @@ dependencies = [ "aws-config", "aws-sdk-s3", "aws-smithy-types-convert", + "base64", "bzip2", "chrono", + "crc32fast", "criterion", "dashmap", "docs_rs_config", @@ -2299,8 +2301,6 @@ dependencies = [ "flate2", "futures-util", "http 1.4.0", - "http-body 1.0.1", - "http-body-util", "itertools 0.14.0", "mime", "moka", diff --git a/crates/lib/docs_rs_storage/Cargo.toml b/crates/lib/docs_rs_storage/Cargo.toml index 83773084a..8a22eb248 100644 --- a/crates/lib/docs_rs_storage/Cargo.toml +++ b/crates/lib/docs_rs_storage/Cargo.toml @@ -23,8 +23,10 @@ async-stream = { workspace = true } aws-config = { version = "1.0.0", default-features = false, features = ["default-https-client", "rt-tokio"] } aws-sdk-s3 = { version = "1.3.0", default-features = false, features = ["default-https-client", "rt-tokio"] } aws-smithy-types-convert = { version = "0.60.0", features = ["convert-chrono"] } +base64 = { workspace = true } bzip2 = "0.6.0" chrono = { workspace = true } +crc32fast = "1.4.2" dashmap = { version = "6.0.0", optional = true } docs_rs_config = { path = "../docs_rs_config" } docs_rs_env_vars = { path = "../docs_rs_env_vars" } @@ -37,8 +39,6 @@ docs_rs_utils = { path = "../docs_rs_utils" } flate2 = "1.1.1" futures-util = { workspace = true } http = { workspace = true } -http-body = "1.0.0" -http-body-util = "0.1.3" itertools = { workspace = true } mime = { workspace = true } moka = { version = "0.12.14", features = ["future"] } @@ -73,5 +73,9 @@ name = "archive_index_cache" harness = false required-features = ["testing"] +[[bench]] +name = "crc32" +harness = false + [lints] workspace = true diff --git a/crates/lib/docs_rs_storage/benches/crc32.rs b/crates/lib/docs_rs_storage/benches/crc32.rs new file mode 100644 index 000000000..cf39f569e --- /dev/null +++ b/crates/lib/docs_rs_storage/benches/crc32.rs @@ -0,0 +1,20 @@ +use criterion::{Criterion, Throughput, criterion_group, criterion_main}; +use docs_rs_storage::crc32_for_path; +use std::{fs, hint::black_box}; + +pub fn crc32_file(c: &mut Criterion) { + let fixture_path = tempfile::NamedTempFile::new().unwrap().into_temp_path(); + + let fixture = vec![b'x'; 16 * 1024 * 1024]; + fs::write(&fixture_path, &fixture).unwrap(); + + let mut group = c.benchmark_group("crc32"); + group.throughput(Throughput::Bytes(fixture.len() as u64)); + group.bench_function("file_16mib", |b| { + b.iter(|| crc32_for_path(black_box(&fixture_path))); + }); + group.finish(); +} + +criterion_group!(crc32_benches, crc32_file); +criterion_main!(crc32_benches); diff --git a/crates/lib/docs_rs_storage/src/backends/memory.rs b/crates/lib/docs_rs_storage/src/backends/memory.rs index cf4dd5bbe..818c1e4f4 100644 --- a/crates/lib/docs_rs_storage/src/backends/memory.rs +++ b/crates/lib/docs_rs_storage/src/backends/memory.rs @@ -1,7 +1,7 @@ use crate::{ Blob, backends::StorageBackendMethods, - blob::{StreamUpload, StreamingBlob}, + blob::{StreamUpload, StreamUploadSource, StreamingBlob}, errors::PathNotFoundError, metrics::StorageMetrics, types::FileRange, @@ -12,7 +12,7 @@ use dashmap::DashMap; use docs_rs_headers::compute_etag; use futures_util::stream::{self, BoxStream}; use itertools::Itertools as _; -use tokio::io; +use tokio::fs; pub(crate) struct MemoryBackend { otel_metrics: StorageMetrics, @@ -56,16 +56,17 @@ impl StorageBackendMethods for MemoryBackend { compression, } = upload; - let mut content = source.reader().await?; - let mut buffer = Vec::new(); - io::copy(&mut content, &mut buffer).await?; + let content = match source { + StreamUploadSource::Bytes(content) => content.to_vec(), + StreamUploadSource::File(path) => fs::read(&path).await?, + }; let blob = Blob { path, mime, date_updated: Utc::now(), - etag: Some(compute_etag(&buffer)), - content: buffer, + etag: Some(compute_etag(&content)), + content, compression, }; diff --git a/crates/lib/docs_rs_storage/src/backends/s3.rs b/crates/lib/docs_rs_storage/src/backends/s3.rs index 1e4f4135c..700f9c472 100644 --- a/crates/lib/docs_rs_storage/src/backends/s3.rs +++ b/crates/lib/docs_rs_storage/src/backends/s3.rs @@ -1,7 +1,8 @@ use crate::{ Config, backends::StorageBackendMethods, - blob::{StreamUpload, StreamingBlob}, + blob::{StreamUpload, StreamUploadSource, StreamingBlob}, + crc32_for_path, errors::PathNotFoundError, metrics::StorageMetrics, types::FileRange, @@ -13,20 +14,17 @@ use aws_sdk_s3::{ Client, config::{Region, retry::RetryConfig}, error::{ProvideErrorMetadata, SdkError}, - primitives::ByteStream, - types::{Delete, ObjectIdentifier}, + primitives::{ByteStream, Length}, + types::{ChecksumAlgorithm, Delete, ObjectIdentifier}, }; use aws_smithy_types_convert::date_time::DateTimeExt; +use base64::{Engine as _, engine::general_purpose::STANDARD as b64}; use chrono::Utc; use docs_rs_headers::{ETag, compute_etag}; -use futures_util::{ - TryStreamExt, - stream::{BoxStream, StreamExt}, -}; -use http_body::Frame; -use http_body_util::StreamBody; +use docs_rs_utils::spawn_blocking; +use futures_util::stream::{BoxStream, StreamExt}; use opentelemetry::KeyValue; -use tokio_util::io::ReaderStream; +use tokio::fs; use tracing::{error, warn}; // error codes to check for when trying to determine if an error is @@ -46,6 +44,8 @@ static NOT_FOUND_ERROR_CODES: [&str; 5] = [ "XMinioInvalidObjectName", ]; +const S3_UPLOAD_BUFFER_SIZE: usize = 1024 * 1024; // 1 MiB + trait S3ResultExt { fn convert_errors(self) -> anyhow::Result; } @@ -243,26 +243,66 @@ impl StorageBackendMethods for S3Backend { compression, } = upload; - let content_length = source.content_length().await?; + let (content_length, checksum_crc32) = match &source { + StreamUploadSource::Bytes(bytes) => (bytes.len() as u64, None), + StreamUploadSource::File(local_path) => { + let local_path = local_path.clone(); + + ( + fs::metadata(&local_path).await?.len(), + Some( + spawn_blocking(move || Ok(b64.encode(crc32_for_path(local_path)?))).await?, + ), + ) + } + }; let mut last_err = None; for attempt in 1..=3 { - let reader = source.reader().await?; - let stream = ReaderStream::new(reader).map_ok(Frame::data); + let body = match &source { + StreamUploadSource::Bytes(bytes) => ByteStream::from(bytes.clone()), + StreamUploadSource::File(path) => { + // NOTE: + // reading the upload-data from a local path is + // "retryable" in the AWS SDK sense. + // ".file" (file pointer) is not retryable. + ByteStream::read_from() + .path(path) + .buffer_size(S3_UPLOAD_BUFFER_SIZE) + .length(Length::Exact(content_length)) + .build() + .await? + } + }; - match self + let mut request = self .client .put_object() .bucket(&self.bucket) .key(&path) - .body(ByteStream::from_body_1_x(StreamBody::new(stream))) + .body(body) .content_length(content_length as i64) .content_type(mime.to_string()) - .set_content_encoding(compression.map(|alg| alg.to_string())) - .send() - .await - { + .set_content_encoding(compression.map(|alg| alg.to_string())); + + // NOTE: when you try to stream-upload a local file, the AWS SDK by default + // uses a "middleware" to calculate the checksum for the content, to compare it after + // uploading. + // This piece is broken right now, but only when using S3 directly. On minio, all is + // fiine. + // I don't want to disable checksums so we're sure the files are uploaded correctly. + // So the only alternative (outside of trying to fix the SDK) is to calculate the + // checksum ourselves. This is a little annoying because this means we have to read the + // whole file before upload. But since I don't want to load all files into memory before + // upload, this is the only option. + if let Some(checksum_crc32) = &checksum_crc32 { + request = request + .checksum_algorithm(ChecksumAlgorithm::Crc32) + .checksum_crc32(checksum_crc32); + } + + match request.send().await { Ok(_) => { self.otel_metrics .uploaded_files @@ -270,7 +310,7 @@ impl StorageBackendMethods for S3Backend { return Ok(()); } Err(err) => { - warn!(?err, attempt = attempt + 1, %path, "failed to upload blob to S3"); + warn!(?err, attempt, %path, "failed to upload blob to S3"); last_err = Some(err); } } diff --git a/crates/lib/docs_rs_storage/src/blob.rs b/crates/lib/docs_rs_storage/src/blob.rs index 6079e47c6..f6364ff41 100644 --- a/crates/lib/docs_rs_storage/src/blob.rs +++ b/crates/lib/docs_rs_storage/src/blob.rs @@ -4,39 +4,13 @@ use chrono::{DateTime, Utc}; use docs_rs_headers::{ETag, compute_etag}; use docs_rs_types::CompressionAlgorithm; use mime::Mime; -use std::{ - fmt, - io::{Cursor, SeekFrom}, - sync::Arc, -}; -use tokio::{ - fs, - io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncSeekExt}, -}; +use std::{fmt, io::Cursor, path::PathBuf}; +use tokio::io::{self, AsyncBufRead, AsyncBufReadExt}; +use tokio_util::bytes::Bytes; pub enum StreamUploadSource { - Bytes(Arc<[u8]>), - File(fs::File), -} - -impl StreamUploadSource { - pub async fn reader(&self) -> io::Result> { - Ok(match self { - Self::Bytes(bytes) => Box::new(Cursor::new(bytes.clone())), - Self::File(file) => { - let mut cloned = file.try_clone().await?; - cloned.seek(SeekFrom::Start(0)).await?; - Box::new(cloned) - } - }) - } - - pub async fn content_length(&self) -> io::Result { - Ok(match self { - Self::Bytes(bytes) => bytes.len() as u64, - Self::File(file) => file.metadata().await?.len(), - }) - } + Bytes(Bytes), + File(PathBuf), } /// Represents a stream blob to be uploaded to storage. @@ -59,7 +33,7 @@ impl From for StreamUpload { Self { path: value.path, mime: value.mime, - source: StreamUploadSource::Bytes(Arc::from(value.content)), + source: StreamUploadSource::Bytes(value.content.into()), compression: value.compression, } } @@ -200,10 +174,6 @@ mod test { use super::*; use crate::compress_async; use docs_rs_headers::compute_etag; - use tokio::{ - fs, - io::{AsyncReadExt as _, AsyncWriteExt as _}, - }; const ZSTD_EOF_BYTES: [u8; 3] = [0x01, 0x00, 0x00]; @@ -246,52 +216,6 @@ mod test { Ok(()) } - #[tokio::test] - async fn test_stream_upload_source_bytes_creates_fresh_readers() -> Result<()> { - const CONTENT: &[u8] = b"Hello, world!"; - - let source = StreamUploadSource::Bytes(Arc::from(CONTENT)); - assert_eq!(source.content_length().await?, CONTENT.len() as u64); - - let mut first = source.reader().await?; - let mut first_buf = Vec::new(); - first.read_to_end(&mut first_buf).await?; - assert_eq!(first_buf, CONTENT); - - let mut second = source.reader().await?; - let mut second_buf = Vec::new(); - second.read_to_end(&mut second_buf).await?; - assert_eq!(second_buf, CONTENT); - - Ok(()) - } - - #[tokio::test] - async fn test_stream_upload_source_file_creates_fresh_readers() -> Result<()> { - const CONTENT: &[u8] = b"Hello, world!"; - - let tempfile = tempfile::NamedTempFile::new()?; - let mut file = fs::File::from_std(tempfile.reopen()?); - file.write_all(CONTENT).await?; - file.seek(std::io::SeekFrom::Start(CONTENT.len() as u64)) - .await?; - - let source = StreamUploadSource::File(file); - assert_eq!(source.content_length().await?, CONTENT.len() as u64); - - let mut first = source.reader().await?; - let mut first_buf = Vec::new(); - first.read_to_end(&mut first_buf).await?; - assert_eq!(first_buf, CONTENT); - - let mut second = source.reader().await?; - let mut second_buf = Vec::new(); - second.read_to_end(&mut second_buf).await?; - assert_eq!(second_buf, CONTENT); - - Ok(()) - } - #[tokio::test] async fn test_streaming_broken_zstd_blob() -> Result<()> { const NOT_ZSTD: &[u8] = b"Hello, world!"; diff --git a/crates/lib/docs_rs_storage/src/config.rs b/crates/lib/docs_rs_storage/src/config.rs index 4da949f99..c3429ff84 100644 --- a/crates/lib/docs_rs_storage/src/config.rs +++ b/crates/lib/docs_rs_storage/src/config.rs @@ -75,8 +75,6 @@ impl AppConfig for ArchiveIndexCacheConfig { #[derive(Debug)] pub struct Config { - pub temp_dir: PathBuf, - // Storage params pub storage_backend: StorageKind, @@ -113,11 +111,9 @@ pub struct Config { impl AppConfig for Config { fn from_environment() -> anyhow::Result { - let prefix: PathBuf = require_env("DOCSRS_PREFIX")?; let cores = std::thread::available_parallelism()?.get(); Ok(Self { - temp_dir: prefix.join("tmp"), storage_backend: env("DOCSRS_STORAGE_BACKEND", StorageKind::default())?, aws_sdk_max_retries: env("DOCSRS_AWS_SDK_MAX_RETRIES", 6u32)?, s3_bucket: env("DOCSRS_S3_BUCKET", "rust-docs-rs".to_string())?, diff --git a/crates/lib/docs_rs_storage/src/lib.rs b/crates/lib/docs_rs_storage/src/lib.rs index 3c70b88b1..92d9bb185 100644 --- a/crates/lib/docs_rs_storage/src/lib.rs +++ b/crates/lib/docs_rs_storage/src/lib.rs @@ -22,6 +22,7 @@ pub use storage::blocking::Storage; pub use storage::non_blocking::AsyncStorage; pub use types::StorageKind; pub use utils::{ + crc32::crc32_for_path, file_list::get_file_list, storage_path::{rustdoc_archive_path, rustdoc_json_path, source_archive_path}, }; diff --git a/crates/lib/docs_rs_storage/src/storage/non_blocking.rs b/crates/lib/docs_rs_storage/src/storage/non_blocking.rs index 7c5e8350d..1cfbba8a9 100644 --- a/crates/lib/docs_rs_storage/src/storage/non_blocking.rs +++ b/crates/lib/docs_rs_storage/src/storage/non_blocking.rs @@ -21,8 +21,14 @@ use docs_rs_opentelemetry::AnyMeterProvider; use docs_rs_types::{BuildId, CompressionAlgorithm, KrateName, Version}; use docs_rs_utils::spawn_blocking; use futures_util::{TryStreamExt as _, future, stream::BoxStream}; -use std::{fmt, io::Cursor, path::Path, pin::Pin, sync::Arc}; -use tokio::{fs, io}; +use std::{ + fmt, + io::{Cursor, Write as _}, + path::Path, + pin::Pin, + sync::Arc, +}; +use tokio::{fs, io, io::AsyncWriteExt as _}; use tracing::{info_span, instrument, trace, warn}; /// buffer size when writing zip files. @@ -298,11 +304,16 @@ impl AsyncStorage { root_dir: impl AsRef + fmt::Debug, ) -> Result<(Vec, CompressionAlgorithm)> { let root_dir = root_dir.as_ref(); - let (zip_file, file_paths) = + + let zip_temp_path = tempfile::NamedTempFile::new()?.into_temp_path(); + let zip_path = zip_temp_path.to_path_buf(); + + let file_paths = spawn_blocking({ use std::{io, fs}; let archive_path = archive_path.to_owned(); let root_dir = root_dir.to_owned(); + let zip_path = zip_path.clone(); move || { let mut file_paths = Vec::new(); @@ -319,7 +330,7 @@ impl AsyncStorage { // with a mapping in `storage::archive_index::Index::new_from_zip`. - let zip_file = { + { let _span = info_span!("create_zip_archive", %archive_path, root_dir=%root_dir.display()).entered(); @@ -328,7 +339,7 @@ impl AsyncStorage { .compression_level(Some(3)); // rustdoc archives can become a couple of GiB big, so we better use a tempfile. - let zip_file = tempfile::tempfile()?; + let zip_file = fs::File::create(&zip_path)?; let mut zip = zip::ZipWriter::new(io::BufWriter::with_capacity(ZIP_BUFFER_SIZE, zip_file)); for file_path in get_file_list(&root_dir) { let file_path = file_path?; @@ -339,60 +350,58 @@ impl AsyncStorage { file_paths.push(FileEntry{path: file_path, size: file.metadata()?.len()}); } - zip.finish()?.into_inner()? - }; + let mut zip_file = zip.finish()?.into_inner()?; + zip_file.flush()?; + } - Ok(( - zip_file, - file_paths - )) + Ok(file_paths) } }) .await?; - let zip_file = fs::File::from_std(zip_file); let alg = CompressionAlgorithm::default(); let remote_index_path = format!("{}.{ARCHIVE_INDEX_FILE_EXTENSION}", &archive_path); - let (zip_file, compressed_index_file) = { + let compressed_index_temp_path = tempfile::NamedTempFile::new()?.into_temp_path(); + let compressed_index_path = compressed_index_temp_path.to_path_buf(); + { let _span = info_span!("create_archive_index", %remote_index_path).entered(); - fs::create_dir_all(&self.config.temp_dir).await?; - let local_index_path = - tempfile::NamedTempFile::new_in(&self.config.temp_dir)?.into_temp_path(); - - let zip_reader = - archive_index::create(io::BufReader::new(zip_file), &local_index_path).await?; - let zip_file = zip_reader.into_inner(); + let local_index_path = tempfile::NamedTempFile::new()?.into_temp_path(); - // compressed index can become up to a couple 100 MiB big, so rather use a tempfile. - let mut compressed_index_file = - fs::File::from_std(spawn_blocking(|| Ok(tempfile::tempfile()?)).await?); - compress_async( - &mut io::BufReader::new(fs::File::open(&local_index_path).await?), - &mut io::BufWriter::new(&mut compressed_index_file), - alg, + archive_index::create( + io::BufReader::new(fs::File::open(&zip_path).await?), + &local_index_path, ) .await?; - (zip_file, compressed_index_file) - }; - self.backend - .upload_stream(StreamUpload { + // compressed index can become up to a couple 100 MiB big, so rather use a tempfile. + let mut compressed_index_file = fs::File::create(&compressed_index_path).await?; + { + let mut compressed_index_writer = io::BufWriter::new(&mut compressed_index_file); + compress_async( + &mut io::BufReader::new(fs::File::open(&local_index_path).await?), + &mut compressed_index_writer, + alg, + ) + .await?; + compressed_index_writer.flush().await?; + } + } + + tokio::try_join!( + self.backend.upload_stream(StreamUpload { path: archive_path.to_string(), mime: mimes::APPLICATION_ZIP.clone(), - source: StreamUploadSource::File(zip_file), + source: StreamUploadSource::File(zip_path), compression: None, - }) - .await?; - - self.backend - .upload_stream(StreamUpload { + }), + self.backend.upload_stream(StreamUpload { path: remote_index_path, mime: mime::APPLICATION_OCTET_STREAM, - source: StreamUploadSource::File(compressed_index_file), + source: StreamUploadSource::File(compressed_index_path), compression: Some(alg), }) - .await?; + )?; Ok((file_paths, CompressionAlgorithm::Bzip2)) } diff --git a/crates/lib/docs_rs_storage/src/utils/crc32.rs b/crates/lib/docs_rs_storage/src/utils/crc32.rs new file mode 100644 index 000000000..815b22b93 --- /dev/null +++ b/crates/lib/docs_rs_storage/src/utils/crc32.rs @@ -0,0 +1,24 @@ +use crc32fast::Hasher; +use std::{ + fs, + io::{self, Read as _}, + path::Path, +}; + +pub fn crc32_for_path(path: impl AsRef) -> Result<[u8; 4], io::Error> { + let path = path.as_ref(); + + let mut file = fs::File::open(path)?; + let mut hasher = Hasher::new(); + let mut buffer = [0; 256 * 1024]; + + loop { + let read = file.read(&mut buffer)?; + if read == 0 { + break; + } + hasher.update(&buffer[..read]); + } + + Ok(hasher.finalize().to_be_bytes()) +} diff --git a/crates/lib/docs_rs_storage/src/utils/mod.rs b/crates/lib/docs_rs_storage/src/utils/mod.rs index 65bab4072..2a1258aad 100644 --- a/crates/lib/docs_rs_storage/src/utils/mod.rs +++ b/crates/lib/docs_rs_storage/src/utils/mod.rs @@ -1,3 +1,4 @@ +pub(crate) mod crc32; pub(crate) mod file_list; pub(crate) mod sized_buffer; pub(crate) mod storage_path; From b3630907a1bfcd6e2a2afa2f1c412f9c2ae99c40 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sun, 19 Apr 2026 17:48:19 +0200 Subject: [PATCH 4/8] crc benchmark: panic on error --- crates/lib/docs_rs_storage/benches/crc32.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/lib/docs_rs_storage/benches/crc32.rs b/crates/lib/docs_rs_storage/benches/crc32.rs index cf39f569e..12f3aa965 100644 --- a/crates/lib/docs_rs_storage/benches/crc32.rs +++ b/crates/lib/docs_rs_storage/benches/crc32.rs @@ -11,7 +11,7 @@ pub fn crc32_file(c: &mut Criterion) { let mut group = c.benchmark_group("crc32"); group.throughput(Throughput::Bytes(fixture.len() as u64)); group.bench_function("file_16mib", |b| { - b.iter(|| crc32_for_path(black_box(&fixture_path))); + b.iter(|| crc32_for_path(black_box(&fixture_path)).unwrap()); }); group.finish(); } From 511bd5b8b51fbf8783dc0a68678d2f9673cd496c Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sun, 19 Apr 2026 17:49:27 +0200 Subject: [PATCH 5/8] storage retries: add sleep between attempts --- crates/lib/docs_rs_storage/src/backends/s3.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/lib/docs_rs_storage/src/backends/s3.rs b/crates/lib/docs_rs_storage/src/backends/s3.rs index 700f9c472..095949cfb 100644 --- a/crates/lib/docs_rs_storage/src/backends/s3.rs +++ b/crates/lib/docs_rs_storage/src/backends/s3.rs @@ -24,7 +24,8 @@ use docs_rs_headers::{ETag, compute_etag}; use docs_rs_utils::spawn_blocking; use futures_util::stream::{BoxStream, StreamExt}; use opentelemetry::KeyValue; -use tokio::fs; +use std::time::Duration; +use tokio::{fs, time::sleep}; use tracing::{error, warn}; // error codes to check for when trying to determine if an error is @@ -290,7 +291,7 @@ impl StorageBackendMethods for S3Backend { // uses a "middleware" to calculate the checksum for the content, to compare it after // uploading. // This piece is broken right now, but only when using S3 directly. On minio, all is - // fiine. + // fine. // I don't want to disable checksums so we're sure the files are uploaded correctly. // So the only alternative (outside of trying to fix the SDK) is to calculate the // checksum ourselves. This is a little annoying because this means we have to read the @@ -312,6 +313,10 @@ impl StorageBackendMethods for S3Backend { Err(err) => { warn!(?err, attempt, %path, "failed to upload blob to S3"); last_err = Some(err); + + if attempt < 3 { + sleep(Duration::from_millis(10 * 2u64.pow(attempt))).await; + } } } } From 32fc0681a3be7d2628a5dabec7bbb9cb1937abd8 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sun, 19 Apr 2026 17:49:41 +0200 Subject: [PATCH 6/8] remove unused filesystem-parallelism setting --- crates/lib/docs_rs_storage/src/config.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/crates/lib/docs_rs_storage/src/config.rs b/crates/lib/docs_rs_storage/src/config.rs index c3429ff84..9f1bc9f14 100644 --- a/crates/lib/docs_rs_storage/src/config.rs +++ b/crates/lib/docs_rs_storage/src/config.rs @@ -99,12 +99,6 @@ pub struct Config { // config for the local archive index cache pub archive_index_cache: Arc, - // How much we want to parallelize local filesystem logic. - // For pure I/O this could be quite high (32/64), but - // we often also add compression on top of it, which is CPU-bound, - // even when just light / simpler compression. - pub local_filesystem_parallelism: usize, - // How much we want to parallelize file uploads / downloads. pub network_parallelism: usize, } @@ -124,7 +118,6 @@ impl AppConfig for Config { max_file_size_html: env("DOCSRS_MAX_FILE_SIZE_HTML", 50 * 1024 * 1024)?, #[cfg(any(test, feature = "testing"))] s3_bucket_is_temporary: false, - local_filesystem_parallelism: env("DOCSRS_LOCAL_FILESYSTEM_PARALLELISM", cores)?, network_parallelism: env("DOCSRS_NETWORK_PARALLELISM", 8usize.min(cores))?, }) } From 398035b9facd80535cb95f4d6b40ffd63a9b82a9 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sun, 19 Apr 2026 17:50:00 +0200 Subject: [PATCH 7/8] repackage: re-add tracing span --- crates/bin/docs_rs_admin/src/repackage.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/bin/docs_rs_admin/src/repackage.rs b/crates/bin/docs_rs_admin/src/repackage.rs index d6757ffc6..49e00d750 100644 --- a/crates/bin/docs_rs_admin/src/repackage.rs +++ b/crates/bin/docs_rs_admin/src/repackage.rs @@ -8,7 +8,7 @@ use std::collections::HashSet; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use tokio::{fs, io}; -use tracing::{debug, info, instrument}; +use tracing::{debug, info, info_span, instrument}; /// repackage old rustdoc / source content. /// @@ -115,6 +115,8 @@ async fn repackage_path( ) -> Result, CompressionAlgorithm)>> { const DOWNLOAD_CONCURRENCY: usize = 8; + let _span = info_span!("repackage_path", %prefix, %target_archive).entered(); + info!("repackage path"); let tempdir = spawn_blocking(|| tempfile::tempdir().map_err(Into::into)).await?; let tempdir_path = tempdir.path().to_path_buf(); From 2daa3630589ecdbd23cbed4cc0be5ae6fb0e8bfe Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sun, 19 Apr 2026 17:50:09 +0200 Subject: [PATCH 8/8] storage: remove unused store_path --- .../src/storage/non_blocking.rs | 38 +------------------ 1 file changed, 2 insertions(+), 36 deletions(-) diff --git a/crates/lib/docs_rs_storage/src/storage/non_blocking.rs b/crates/lib/docs_rs_storage/src/storage/non_blocking.rs index 1cfbba8a9..ba438e72a 100644 --- a/crates/lib/docs_rs_storage/src/storage/non_blocking.rs +++ b/crates/lib/docs_rs_storage/src/storage/non_blocking.rs @@ -305,6 +305,8 @@ impl AsyncStorage { ) -> Result<(Vec, CompressionAlgorithm)> { let root_dir = root_dir.as_ref(); + // Keep the TempPath guards alive until after both uploads complete; dropping them earlier + // would delete the files while S3 is still reading from them. let zip_temp_path = tempfile::NamedTempFile::new()?.into_temp_path(); let zip_path = zip_temp_path.to_path_buf(); @@ -522,42 +524,6 @@ impl AsyncStorage { Ok(alg) } - #[instrument(skip(self))] - pub async fn store_path( - &self, - target_path: impl Into + fmt::Debug, - source_path: impl AsRef + fmt::Debug, - ) -> Result { - let target_path = target_path.into(); - let source_path = source_path.as_ref(); - - let alg = CompressionAlgorithm::default(); - - let content = { - let mut buf: Vec = Vec::new(); - compress_async( - io::BufReader::new(fs::File::open(source_path).await?), - &mut buf, - alg, - ) - .await?; - buf - }; - - let mime = detect_mime(&target_path).to_owned(); - - self.backend - .upload_stream(StreamUpload { - path: target_path, - mime, - source: StreamUploadSource::Bytes(content.into()), - compression: Some(alg), - }) - .await?; - - Ok(alg) - } - #[instrument(skip(self))] pub async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result> { self.backend.list_prefix(prefix).await