From 4385864c067f17b730760ee242eb3a1a7c230d2e Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Wed, 8 Apr 2026 17:41:31 +0200 Subject: [PATCH 1/2] Revert "fix recursion-limit error" This reverts commit 319dafe306e470e859d3c01fdcb62bdc761d326b. --- crates/bin/docs_rs_admin/src/main.rs | 2 ++ crates/bin/docs_rs_admin/src/repackage.rs | 1 + 2 files changed, 3 insertions(+) diff --git a/crates/bin/docs_rs_admin/src/main.rs b/crates/bin/docs_rs_admin/src/main.rs index 54821f537..726870eee 100644 --- a/crates/bin/docs_rs_admin/src/main.rs +++ b/crates/bin/docs_rs_admin/src/main.rs @@ -1,3 +1,5 @@ +#![recursion_limit = "256"] + mod rebuilds; mod repackage; #[cfg(test)] diff --git a/crates/bin/docs_rs_admin/src/repackage.rs b/crates/bin/docs_rs_admin/src/repackage.rs index d6757ffc6..9d949b8da 100644 --- a/crates/bin/docs_rs_admin/src/repackage.rs +++ b/crates/bin/docs_rs_admin/src/repackage.rs @@ -108,6 +108,7 @@ pub async fn repackage( /// repackage contents of a S3 path prefix into a single archive file. /// /// Not performance optimized, for now it just tries to be simple. +#[instrument(skip(storage))] async fn repackage_path( storage: &AsyncStorage, prefix: &str, From dbbdb6b853d7f1d34e3fc57e1226519dfad428a1 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Wed, 8 Apr 2026 17:41:38 +0200 Subject: [PATCH 2/2] Revert "storage: start streaming zip/index uploads, parallel directory upload" This reverts commit dee3cc6d4c6264d7636e44928f4dfc0c040e4dd5. --- Cargo.lock | 2 - crates/bin/docs_rs_admin/src/main.rs | 2 - crates/lib/docs_rs_storage/Cargo.toml | 2 - .../lib/docs_rs_storage/src/archive_index.rs | 12 +- .../docs_rs_storage/src/backends/memory.rs | 33 +--- .../lib/docs_rs_storage/src/backends/mod.rs | 8 +- crates/lib/docs_rs_storage/src/backends/s3.rs | 84 ++++---- crates/lib/docs_rs_storage/src/blob.rs | 122 +----------- crates/lib/docs_rs_storage/src/config.rs | 10 +- .../docs_rs_storage/src/storage/blocking.rs | 25 ++- .../src/storage/non_blocking.rs | 184 +++++++++--------- 11 files changed, 178 insertions(+), 306 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 42a9a954e..ff35dc327 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2287,8 +2287,6 @@ dependencies = [ "flate2", "futures-util", "http 1.4.0", - "http-body 1.0.1", - "http-body-util", "itertools 0.14.0", "mime", "moka", diff --git a/crates/bin/docs_rs_admin/src/main.rs b/crates/bin/docs_rs_admin/src/main.rs index 726870eee..54821f537 100644 --- a/crates/bin/docs_rs_admin/src/main.rs +++ b/crates/bin/docs_rs_admin/src/main.rs @@ -1,5 +1,3 @@ -#![recursion_limit = "256"] - mod rebuilds; mod repackage; #[cfg(test)] diff --git a/crates/lib/docs_rs_storage/Cargo.toml b/crates/lib/docs_rs_storage/Cargo.toml index 83773084a..b7b6380aa 100644 --- a/crates/lib/docs_rs_storage/Cargo.toml +++ b/crates/lib/docs_rs_storage/Cargo.toml @@ -37,8 +37,6 @@ docs_rs_utils = { path = "../docs_rs_utils" } flate2 = "1.1.1" futures-util = { workspace = true } http = { workspace = true } -http-body = "1.0.0" -http-body-util = "0.1.3" itertools = { workspace = true } mime = { workspace = true } moka = { version = "0.12.14", features = ["future"] } diff --git a/crates/lib/docs_rs_storage/src/archive_index.rs b/crates/lib/docs_rs_storage/src/archive_index.rs index 3e3b54621..334fb96c6 100644 --- a/crates/lib/docs_rs_storage/src/archive_index.rs +++ b/crates/lib/docs_rs_storage/src/archive_index.rs @@ -830,7 +830,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::{blob::StreamingBlob, storage::non_blocking::ZIP_BUFFER_SIZE}; + use 
crate::blob::StreamingBlob; use chrono::Utc; use docs_rs_config::AppConfig as _; use docs_rs_opentelemetry::testing::TestMetrics; @@ -839,15 +839,14 @@ mod tests { use zip::write::SimpleFileOptions; async fn create_test_archive(file_count: u32) -> Result { - let writer = spawn_blocking(move || { + spawn_blocking(move || { use std::io::Write as _; let tf = tempfile::tempfile()?; let objectcontent: Vec = (0..255).collect(); - let mut archive = - zip::ZipWriter::new(std::io::BufWriter::with_capacity(ZIP_BUFFER_SIZE, tf)); + let mut archive = zip::ZipWriter::new(tf); for i in 0..file_count { archive.start_file( format!("testfile{i}"), @@ -859,9 +858,8 @@ mod tests { } Ok(archive.finish()?) }) - .await?; - - Ok(fs::File::from_std(writer.into_inner()?)) + .await + .map(fs::File::from_std) } struct FakeDownloader { diff --git a/crates/lib/docs_rs_storage/src/backends/memory.rs b/crates/lib/docs_rs_storage/src/backends/memory.rs index cf4dd5bbe..385cd7f3f 100644 --- a/crates/lib/docs_rs_storage/src/backends/memory.rs +++ b/crates/lib/docs_rs_storage/src/backends/memory.rs @@ -1,18 +1,16 @@ use crate::{ Blob, backends::StorageBackendMethods, - blob::{StreamUpload, StreamingBlob}, + blob::{BlobUpload, StreamingBlob}, errors::PathNotFoundError, metrics::StorageMetrics, types::FileRange, }; use anyhow::{Result, anyhow}; -use chrono::Utc; use dashmap::DashMap; use docs_rs_headers::compute_etag; use futures_util::stream::{self, BoxStream}; use itertools::Itertools as _; -use tokio::io; pub(crate) struct MemoryBackend { otel_metrics: StorageMetrics, @@ -48,29 +46,16 @@ impl StorageBackendMethods for MemoryBackend { Ok(blob.into()) } - async fn upload_stream(&self, upload: StreamUpload) -> Result<()> { - let StreamUpload { - path, - mime, - source, - compression, - } = upload; + async fn store_batch(&self, batch: Vec) -> Result<()> { + self.otel_metrics + .uploaded_files + .add(batch.len() as u64, &[]); - let mut content = source.reader().await?; - let mut buffer = Vec::new(); - io::copy(&mut content, &mut buffer).await?; - - let blob = Blob { - path, - mime, - date_updated: Utc::now(), - etag: Some(compute_etag(&buffer)), - content: buffer, - compression, - }; + for upload in batch { + let blob: Blob = upload.into(); + self.objects.insert(blob.path.clone(), blob); + } - self.otel_metrics.uploaded_files.add(1, &[]); - self.objects.insert(blob.path.clone(), blob); Ok(()) } diff --git a/crates/lib/docs_rs_storage/src/backends/mod.rs b/crates/lib/docs_rs_storage/src/backends/mod.rs index effb27dc5..a257f8de3 100644 --- a/crates/lib/docs_rs_storage/src/backends/mod.rs +++ b/crates/lib/docs_rs_storage/src/backends/mod.rs @@ -2,14 +2,14 @@ pub(crate) mod memory; pub(crate) mod s3; -use crate::{StreamingBlob, blob::StreamUpload, types::FileRange}; +use crate::{BlobUpload, StreamingBlob, types::FileRange}; use anyhow::Result; use futures_util::stream::BoxStream; pub(crate) trait StorageBackendMethods { async fn exists(&self, path: &str) -> Result; async fn get_stream(&self, path: &str, range: Option) -> Result; - async fn upload_stream(&self, upload: StreamUpload) -> Result<()>; + async fn store_batch(&self, batch: Vec) -> Result<()>; async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result>; async fn delete_prefix(&self, prefix: &str) -> Result<()>; } @@ -39,8 +39,8 @@ impl StorageBackendMethods for StorageBackend { call_inner!(self, get_stream(path, range)) } - async fn upload_stream(&self, upload: StreamUpload) -> Result<()> { - call_inner!(self, upload_stream(upload)) + async fn 
store_batch(&self, batch: Vec) -> Result<()> { + call_inner!(self, store_batch(batch)) } async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result> { diff --git a/crates/lib/docs_rs_storage/src/backends/s3.rs b/crates/lib/docs_rs_storage/src/backends/s3.rs index 1e4f4135c..93f7a6c67 100644 --- a/crates/lib/docs_rs_storage/src/backends/s3.rs +++ b/crates/lib/docs_rs_storage/src/backends/s3.rs @@ -1,7 +1,7 @@ use crate::{ Config, backends::StorageBackendMethods, - blob::{StreamUpload, StreamingBlob}, + blob::{BlobUpload, StreamingBlob}, errors::PathNotFoundError, metrics::StorageMetrics, types::FileRange, @@ -13,20 +13,15 @@ use aws_sdk_s3::{ Client, config::{Region, retry::RetryConfig}, error::{ProvideErrorMetadata, SdkError}, - primitives::ByteStream, types::{Delete, ObjectIdentifier}, }; use aws_smithy_types_convert::date_time::DateTimeExt; use chrono::Utc; use docs_rs_headers::{ETag, compute_etag}; use futures_util::{ - TryStreamExt, - stream::{BoxStream, StreamExt}, + future::TryFutureExt, + stream::{BoxStream, FuturesUnordered, StreamExt}, }; -use http_body::Frame; -use http_body_util::StreamBody; -use opentelemetry::KeyValue; -use tokio_util::io::ReaderStream; use tracing::{error, warn}; // error codes to check for when trying to determine if an error is @@ -235,50 +230,45 @@ impl StorageBackendMethods for S3Backend { }) } - async fn upload_stream(&self, upload: StreamUpload) -> Result<(), Error> { - let StreamUpload { - path, - mime, - source, - compression, - } = upload; - - let content_length = source.content_length().await?; - - let mut last_err = None; - - for attempt in 1..=3 { - let reader = source.reader().await?; - let stream = ReaderStream::new(reader).map_ok(Frame::data); + async fn store_batch(&self, mut batch: Vec) -> Result<(), Error> { + // Attempt to upload the batch 3 times + for _ in 0..3 { + let mut futures = FuturesUnordered::new(); + for blob in batch.drain(..) 
{ + futures.push( + self.client + .put_object() + .bucket(&self.bucket) + .key(&blob.path) + .body(blob.content.clone().into()) + .content_type(blob.mime.to_string()) + .set_content_encoding(blob.compression.map(|alg| alg.to_string())) + .send() + .map_ok(|_| { + self.otel_metrics.uploaded_files.add(1, &[]); + }) + .map_err(|err| { + warn!(?err, "Failed to upload blob to S3"); + // Reintroduce failed blobs for a retry + blob + }), + ); + } - match self - .client - .put_object() - .bucket(&self.bucket) - .key(&path) - .body(ByteStream::from_body_1_x(StreamBody::new(stream))) - .content_length(content_length as i64) - .content_type(mime.to_string()) - .set_content_encoding(compression.map(|alg| alg.to_string())) - .send() - .await - { - Ok(_) => { - self.otel_metrics - .uploaded_files - .add(1, &[KeyValue::new("attempt", attempt.to_string())]); - return Ok(()); - } - Err(err) => { - warn!(?err, attempt = attempt + 1, %path, "failed to upload blob to S3"); - last_err = Some(err); + while let Some(result) = futures.next().await { + // Push each failed blob back into the batch + if let Err(blob) = result { + batch.push(blob); } } + + // If we uploaded everything in the batch, we're done + if batch.is_empty() { + return Ok(()); + } } - Err(last_err - .expect("upload retry loop exited without a result") - .into()) + panic!("failed to upload 3 times, exiting"); } async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result> { diff --git a/crates/lib/docs_rs_storage/src/blob.rs b/crates/lib/docs_rs_storage/src/blob.rs index 6079e47c6..4dd30c473 100644 --- a/crates/lib/docs_rs_storage/src/blob.rs +++ b/crates/lib/docs_rs_storage/src/blob.rs @@ -4,66 +4,8 @@ use chrono::{DateTime, Utc}; use docs_rs_headers::{ETag, compute_etag}; use docs_rs_types::CompressionAlgorithm; use mime::Mime; -use std::{ - fmt, - io::{Cursor, SeekFrom}, - sync::Arc, -}; -use tokio::{ - fs, - io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncSeekExt}, -}; - -pub enum StreamUploadSource { - Bytes(Arc<[u8]>), - File(fs::File), -} - -impl StreamUploadSource { - pub async fn reader(&self) -> io::Result> { - Ok(match self { - Self::Bytes(bytes) => Box::new(Cursor::new(bytes.clone())), - Self::File(file) => { - let mut cloned = file.try_clone().await?; - cloned.seek(SeekFrom::Start(0)).await?; - Box::new(cloned) - } - }) - } - - pub async fn content_length(&self) -> io::Result { - Ok(match self { - Self::Bytes(bytes) => bytes.len() as u64, - Self::File(file) => file.metadata().await?.len(), - }) - } -} - -/// Represents a stream blob to be uploaded to storage. -/// -/// NOTE: Right now we only support uploads where the size is known in advance. -/// We can add support for streams with unknown size, but this would mean -/// using an intermediate fixed-size buffer and multipart uploads for these cases. -/// But: the multipart machinery is only worth the complexity if the stream is: -/// - unknown size -/// - bigger (there's a 5 MiB size limit for each part) -pub struct StreamUpload { - pub path: String, - pub mime: Mime, - pub source: StreamUploadSource, - pub compression: Option, -} - -impl From for StreamUpload { - fn from(value: BlobUpload) -> Self { - Self { - path: value.path, - mime: value.mime, - source: StreamUploadSource::Bytes(Arc::from(value.content)), - compression: value.compression, - } - } -} +use std::io; +use tokio::io::{AsyncBufRead, AsyncBufReadExt}; /// represents a blob to be uploaded to storage. 
#[derive(Clone, Debug, PartialEq, Eq)] @@ -118,7 +60,7 @@ pub struct StreamingBlob { pub content: Box, } -impl fmt::Debug for StreamingBlob { +impl std::fmt::Debug for StreamingBlob { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("StreamingBlob") .field("path", &self.path) @@ -168,7 +110,7 @@ impl StreamingBlob { let mut content = SizedBuffer::new(max_size); content.reserve(self.content_length); - io::copy(&mut self.content, &mut content).await?; + tokio::io::copy(&mut self.content, &mut content).await?; Ok(Blob { path: self.path, @@ -190,7 +132,7 @@ impl From for StreamingBlob { etag: value.etag, compression: value.compression, content_length: value.content.len(), - content: Box::new(Cursor::new(value.content)), + content: Box::new(io::Cursor::new(value.content)), } } } @@ -200,10 +142,6 @@ mod test { use super::*; use crate::compress_async; use docs_rs_headers::compute_etag; - use tokio::{ - fs, - io::{AsyncReadExt as _, AsyncWriteExt as _}, - }; const ZSTD_EOF_BYTES: [u8; 3] = [0x01, 0x00, 0x00]; @@ -219,7 +157,7 @@ mod test { compression: alg, etag: Some(compute_etag(&content)), content_length: content.len(), - content: Box::new(Cursor::new(content)), + content: Box::new(io::Cursor::new(content)), } } @@ -246,52 +184,6 @@ mod test { Ok(()) } - #[tokio::test] - async fn test_stream_upload_source_bytes_creates_fresh_readers() -> Result<()> { - const CONTENT: &[u8] = b"Hello, world!"; - - let source = StreamUploadSource::Bytes(Arc::from(CONTENT)); - assert_eq!(source.content_length().await?, CONTENT.len() as u64); - - let mut first = source.reader().await?; - let mut first_buf = Vec::new(); - first.read_to_end(&mut first_buf).await?; - assert_eq!(first_buf, CONTENT); - - let mut second = source.reader().await?; - let mut second_buf = Vec::new(); - second.read_to_end(&mut second_buf).await?; - assert_eq!(second_buf, CONTENT); - - Ok(()) - } - - #[tokio::test] - async fn test_stream_upload_source_file_creates_fresh_readers() -> Result<()> { - const CONTENT: &[u8] = b"Hello, world!"; - - let tempfile = tempfile::NamedTempFile::new()?; - let mut file = fs::File::from_std(tempfile.reopen()?); - file.write_all(CONTENT).await?; - file.seek(std::io::SeekFrom::Start(CONTENT.len() as u64)) - .await?; - - let source = StreamUploadSource::File(file); - assert_eq!(source.content_length().await?, CONTENT.len() as u64); - - let mut first = source.reader().await?; - let mut first_buf = Vec::new(); - first.read_to_end(&mut first_buf).await?; - assert_eq!(first_buf, CONTENT); - - let mut second = source.reader().await?; - let mut second_buf = Vec::new(); - second.read_to_end(&mut second_buf).await?; - assert_eq!(second_buf, CONTENT); - - Ok(()) - } - #[tokio::test] async fn test_streaming_broken_zstd_blob() -> Result<()> { const NOT_ZSTD: &[u8] = b"Hello, world!"; @@ -334,7 +226,7 @@ mod test { let mut compressed_content = Vec::new(); let alg = CompressionAlgorithm::Zstd; compress_async( - &mut Cursor::new(CONTENT.to_vec()), + &mut io::Cursor::new(CONTENT.to_vec()), &mut compressed_content, alg, ) diff --git a/crates/lib/docs_rs_storage/src/config.rs b/crates/lib/docs_rs_storage/src/config.rs index 4da949f99..af2ccdd8b 100644 --- a/crates/lib/docs_rs_storage/src/config.rs +++ b/crates/lib/docs_rs_storage/src/config.rs @@ -106,15 +106,11 @@ pub struct Config { // we often also add compression on top of it, which is CPU-bound, // even when just light / simpler compression. 
pub local_filesystem_parallelism: usize, - - // How much we want to parallelize file uploads / downloads. - pub network_parallelism: usize, } impl AppConfig for Config { fn from_environment() -> anyhow::Result { let prefix: PathBuf = require_env("DOCSRS_PREFIX")?; - let cores = std::thread::available_parallelism()?.get(); Ok(Self { temp_dir: prefix.join("tmp"), @@ -128,8 +124,10 @@ impl AppConfig for Config { max_file_size_html: env("DOCSRS_MAX_FILE_SIZE_HTML", 50 * 1024 * 1024)?, #[cfg(any(test, feature = "testing"))] s3_bucket_is_temporary: false, - local_filesystem_parallelism: env("DOCSRS_LOCAL_FILESYSTEM_PARALLELISM", cores)?, - network_parallelism: env("DOCSRS_NETWORK_PARALLELISM", 8usize.min(cores))?, + local_filesystem_parallelism: env( + "DOCSRS_LOCAL_FILESYSTEM_PARALLELISM", + std::thread::available_parallelism()?.get(), + )?, }) } diff --git a/crates/lib/docs_rs_storage/src/storage/blocking.rs b/crates/lib/docs_rs_storage/src/storage/blocking.rs index 59c28caaa..bd4c5a152 100644 --- a/crates/lib/docs_rs_storage/src/storage/blocking.rs +++ b/crates/lib/docs_rs_storage/src/storage/blocking.rs @@ -1,4 +1,4 @@ -use crate::{blob::Blob, file::FileEntry, storage::non_blocking::AsyncStorage}; +use crate::{blob::Blob, file::FileEntry, storage::non_blocking::AsyncStorage, types::FileRange}; use anyhow::Result; use docs_rs_types::{BuildId, CompressionAlgorithm, KrateName, Version}; use std::{fmt, path::Path, sync::Arc}; @@ -10,6 +10,7 @@ pub struct Storage { runtime: runtime::Handle, } +#[allow(dead_code)] impl Storage { pub fn new(inner: Arc, runtime: runtime::Handle) -> Self { Self { inner, runtime } @@ -69,6 +70,17 @@ impl Storage { self.runtime.block_on(self.inner.get(path, max_size)) } + pub(crate) fn get_range( + &self, + path: &str, + max_size: usize, + range: FileRange, + compression: Option, + ) -> Result { + self.runtime + .block_on(self.inner.get_range(path, max_size, range, compression)) + } + pub fn get_from_archive( &self, archive_path: &str, @@ -128,6 +140,17 @@ impl Storage { self.runtime.block_on(self.inner.store_one(path, content)) } + // Store file into the backend at the given path (also used to detect mime type), returns the + // chosen compression algorithm + pub fn store_path( + &self, + target_path: impl Into + std::fmt::Debug, + source_path: impl AsRef + std::fmt::Debug, + ) -> Result { + self.runtime + .block_on(self.inner.store_path(target_path, source_path)) + } + /// sync wrapper for the list_prefix function /// purely for testing purposes since it collects all files into a Vec. 
#[cfg(feature = "testing")] diff --git a/crates/lib/docs_rs_storage/src/storage/non_blocking.rs b/crates/lib/docs_rs_storage/src/storage/non_blocking.rs index 7c5e8350d..c7cc48bd0 100644 --- a/crates/lib/docs_rs_storage/src/storage/non_blocking.rs +++ b/crates/lib/docs_rs_storage/src/storage/non_blocking.rs @@ -4,7 +4,7 @@ use crate::{ Config, archive_index::{self, ARCHIVE_INDEX_FILE_EXTENSION}, backends::{StorageBackend, StorageBackendMethods, s3::S3Backend}, - blob::{Blob, StreamUpload, StreamUploadSource, StreamingBlob}, + blob::{Blob, BlobUpload, StreamingBlob}, compression::{compress, compress_async}, errors::PathNotFoundError, file::FileEntry, @@ -21,13 +21,10 @@ use docs_rs_opentelemetry::AnyMeterProvider; use docs_rs_types::{BuildId, CompressionAlgorithm, KrateName, Version}; use docs_rs_utils::spawn_blocking; use futures_util::{TryStreamExt as _, future, stream::BoxStream}; -use std::{fmt, io::Cursor, path::Path, pin::Pin, sync::Arc}; +use std::{fmt, path::Path, pin::Pin, sync::Arc}; use tokio::{fs, io}; use tracing::{info_span, instrument, trace, warn}; -/// buffer size when writing zip files. -pub(crate) const ZIP_BUFFER_SIZE: usize = 1024 * 1024; - pub struct AsyncStorage { backend: StorageBackend, config: Arc, @@ -184,7 +181,8 @@ impl AsyncStorage { Ok(self.get_raw_stream(path).await?.decompress().await?) } - #[cfg(test)] + /// get, decompress and materialize part of an object from store + #[instrument(skip(self))] pub(crate) async fn get_range( &self, path: &str, @@ -298,7 +296,7 @@ impl AsyncStorage { root_dir: impl AsRef + fmt::Debug, ) -> Result<(Vec, CompressionAlgorithm)> { let root_dir = root_dir.as_ref(); - let (zip_file, file_paths) = + let (zip_content, file_paths) = spawn_blocking({ use std::{io, fs}; let archive_path = archive_path.to_owned(); @@ -318,8 +316,7 @@ impl AsyncStorage { // also has to be added as supported algorithm for storage compression, together // with a mapping in `storage::archive_index::Index::new_from_zip`. - - let zip_file = { + let zip_content = { let _span = info_span!("create_zip_archive", %archive_path, root_dir=%root_dir.display()).entered(); @@ -327,9 +324,7 @@ impl AsyncStorage { .compression_method(zip::CompressionMethod::Bzip2) .compression_level(Some(3)); - // rustdoc archives can become a couple of GiB big, so we better use a tempfile. - let zip_file = tempfile::tempfile()?; - let mut zip = zip::ZipWriter::new(io::BufWriter::with_capacity(ZIP_BUFFER_SIZE, zip_file)); + let mut zip = zip::ZipWriter::new(io::Cursor::new(Vec::new())); for file_path in get_file_list(&root_dir) { let file_path = file_path?; @@ -339,59 +334,55 @@ impl AsyncStorage { file_paths.push(FileEntry{path: file_path, size: file.metadata()?.len()}); } - zip.finish()?.into_inner()? 
+ zip.finish()?.into_inner() }; Ok(( - zip_file, + zip_content, file_paths )) } }) .await?; - let zip_file = fs::File::from_std(zip_file); let alg = CompressionAlgorithm::default(); let remote_index_path = format!("{}.{ARCHIVE_INDEX_FILE_EXTENSION}", &archive_path); - let (zip_file, compressed_index_file) = { + let (zip_content, compressed_index_content) = { let _span = info_span!("create_archive_index", %remote_index_path).entered(); fs::create_dir_all(&self.config.temp_dir).await?; let local_index_path = tempfile::NamedTempFile::new_in(&self.config.temp_dir)?.into_temp_path(); - let zip_reader = - archive_index::create(io::BufReader::new(zip_file), &local_index_path).await?; - let zip_file = zip_reader.into_inner(); + let zip_cursor = + archive_index::create(std::io::Cursor::new(zip_content), &local_index_path).await?; + let zip_content = zip_cursor.into_inner(); - // compressed index can become up to a couple 100 MiB big, so rather use a tempfile. - let mut compressed_index_file = - fs::File::from_std(spawn_blocking(|| Ok(tempfile::tempfile()?)).await?); + let mut buf: Vec = Vec::new(); compress_async( &mut io::BufReader::new(fs::File::open(&local_index_path).await?), - &mut io::BufWriter::new(&mut compressed_index_file), + &mut buf, alg, ) .await?; - (zip_file, compressed_index_file) + (zip_content, buf) }; self.backend - .upload_stream(StreamUpload { - path: archive_path.to_string(), - mime: mimes::APPLICATION_ZIP.clone(), - source: StreamUploadSource::File(zip_file), - compression: None, - }) - .await?; - - self.backend - .upload_stream(StreamUpload { - path: remote_index_path, - mime: mime::APPLICATION_OCTET_STREAM, - source: StreamUploadSource::File(compressed_index_file), - compression: Some(alg), - }) + .store_batch(vec![ + BlobUpload { + path: archive_path.to_string(), + mime: mimes::APPLICATION_ZIP.clone(), + content: zip_content, + compression: None, + }, + BlobUpload { + path: remote_index_path, + mime: mime::APPLICATION_OCTET_STREAM, + content: compressed_index_content, + compression: Some(alg), + }, + ]) .await?; Ok((file_paths, CompressionAlgorithm::Bzip2)) @@ -404,64 +395,65 @@ impl AsyncStorage { prefix: impl AsRef + fmt::Debug, root_dir: impl AsRef + fmt::Debug, ) -> Result<(Vec, CompressionAlgorithm)> { - let prefix = prefix.as_ref().to_path_buf(); + let prefix = prefix.as_ref(); let root_dir = root_dir.as_ref(); let alg = CompressionAlgorithm::default(); - let file_paths_and_mimes: Vec<_> = walk_dir_recursive(&root_dir) + let (file_paths_and_mimes, blobs): (Vec<_>, Vec<_>) = walk_dir_recursive(&root_dir) .err_into::() - .map_ok(|item| { - let prefix = prefix.clone(); - async move { - // Some files have insufficient permissions - // (like .lock file created by cargo in documentation directory). - // Skip these files. 
- let Ok(file) = fs::File::open(&item).await else { - return Ok(None); - }; - - let content = { - let mut buf: Vec = Vec::new(); - compress_async(io::BufReader::new(file), &mut buf, alg).await?; - buf - }; - - let bucket_path = prefix.join(&item.relative).to_string_lossy().to_string(); - - let file_size = item.metadata.len(); - - let file_info = FileEntry { - path: item.relative.clone(), - size: file_size, - }; - let mime = file_info.mime().clone(); - - self.backend - .upload_stream(StreamUpload { - path: bucket_path, - mime, - source: StreamUploadSource::Bytes(content.into()), - compression: Some(alg), - }) - .await?; - - Ok(Some(file_info)) - } + .map_ok(|item| async move { + // Some files have insufficient permissions + // (like .lock file created by cargo in documentation directory). + // Skip these files. + let Ok(file) = fs::File::open(&item).await else { + return Ok(None); + }; + + let content = { + let mut buf: Vec = Vec::new(); + compress_async(io::BufReader::new(file), &mut buf, alg).await?; + buf + }; + + let bucket_path = prefix.join(&item.relative).to_string_lossy().to_string(); + + let file_size = item.metadata.len(); + + let file_info = FileEntry { + path: item.relative.clone(), + size: file_size, + }; + let mime = file_info.mime(); + + Ok(Some(( + file_info, + BlobUpload { + path: bucket_path, + mime, + content, + compression: Some(alg), + }, + ))) }) - .try_buffer_unordered(self.config.network_parallelism) + .try_buffer_unordered(self.config.local_filesystem_parallelism) .try_filter_map(|item| future::ready(Ok(item))) - .try_collect() + .try_fold( + (Vec::new(), Vec::new()), + |(mut file_paths_and_mimes, mut blobs), (file_info, blob)| async move { + file_paths_and_mimes.push(file_info); + blobs.push(blob); + Ok((file_paths_and_mimes, blobs)) + }, + ) .await?; + self.backend.store_batch(blobs).await?; Ok((file_paths_and_mimes, alg)) } #[cfg(test)] - pub async fn store_blobs(&self, blobs: Vec) -> Result<()> { - for blob in blobs { - self.backend.upload_stream(blob.into()).await?; - } - Ok(()) + pub async fn store_blobs(&self, blobs: Vec) -> Result<()> { + self.backend.store_batch(blobs).await } // Store file into the backend at the given path, uncompressed. 
@@ -477,12 +469,12 @@ impl AsyncStorage {
         let mime = detect_mime(&path).to_owned();
 
         self.backend
-            .upload_stream(StreamUpload {
+            .store_batch(vec![BlobUpload {
                 path,
                 mime,
-                source: StreamUploadSource::Bytes(content.into()),
+                content,
                 compression: None,
-            })
+            }])
             .await?;
 
         Ok(())
@@ -497,17 +489,18 @@ impl AsyncStorage {
         content: impl Into<Vec<u8>>,
     ) -> Result<CompressionAlgorithm> {
         let path = path.into();
+        let content = content.into();
         let alg = CompressionAlgorithm::default();
-        let content = compress(Cursor::new(content.into()), alg)?;
+        let content = compress(&*content, alg)?;
         let mime = detect_mime(&path).to_owned();
 
         self.backend
-            .upload_stream(StreamUpload {
+            .store_batch(vec![BlobUpload {
                 path,
                 mime,
-                source: StreamUploadSource::Bytes(content.into()),
+                content,
                 compression: Some(alg),
-            })
+            }])
             .await?;
 
         Ok(alg)
@@ -538,12 +531,12 @@ impl AsyncStorage {
         let mime = detect_mime(&target_path).to_owned();
 
         self.backend
-            .upload_stream(StreamUpload {
+            .store_batch(vec![BlobUpload {
                 path: target_path,
                 mime,
-                source: StreamUploadSource::Bytes(content.into()),
+                content,
                 compression: Some(alg),
-            })
+            }])
             .await?;
 
         Ok(alg)
@@ -600,7 +593,6 @@ impl fmt::Debug for AsyncStorage {
 #[cfg(test)]
 mod backend_tests {
     use super::*;
-    use crate::blob::BlobUpload;
     use crate::{PathNotFoundError, errors::SizeLimitReached};
     use docs_rs_headers::compute_etag;
     use docs_rs_opentelemetry::testing::TestMetrics;
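
The `StreamUpload` doc comment removed in blob.rs above argues that multipart uploads are only worth the complexity for large streams of unknown length, since S3 requires every part except the last to be at least 5 MiB. For context, here is a minimal sketch of what that multipart path would involve, using aws-sdk-s3 calls this crate already depends on. It is not part of this patch: `upload_multipart`, `PART_SIZE`, and the bucket/key parameters are illustrative placeholders, not existing docs.rs code.

use anyhow::{Context as _, Result};
use aws_sdk_s3::{
    Client,
    primitives::ByteStream,
    types::{CompletedMultipartUpload, CompletedPart},
};
use tokio::io::{AsyncRead, AsyncReadExt as _};

// S3 rejects multipart parts smaller than 5 MiB (except the final part),
// which is the size threshold the removed doc comment refers to.
const PART_SIZE: usize = 5 * 1024 * 1024;

// Hypothetical helper, not existing docs.rs code: streams `source` into
// `bucket`/`key` as an S3 multipart upload.
async fn upload_multipart(
    client: &Client,
    bucket: &str,
    key: &str,
    mut source: impl AsyncRead + Unpin,
) -> Result<()> {
    // Open the multipart upload and remember its id.
    let created = client
        .create_multipart_upload()
        .bucket(bucket)
        .key(key)
        .send()
        .await?;
    let upload_id = created.upload_id().context("missing upload id")?;

    let mut parts = Vec::new();
    let mut part_number = 1i32;

    loop {
        // Buffer at most one part; only the final part may be smaller than 5 MiB.
        let mut buf = vec![0u8; PART_SIZE];
        let mut filled = 0;
        while filled < buf.len() {
            let n = source.read(&mut buf[filled..]).await?;
            if n == 0 {
                break;
            }
            filled += n;
        }
        buf.truncate(filled);
        if buf.is_empty() {
            break;
        }

        let part = client
            .upload_part()
            .bucket(bucket)
            .key(key)
            .upload_id(upload_id)
            .part_number(part_number)
            .body(ByteStream::from(buf))
            .send()
            .await?;

        parts.push(
            CompletedPart::builder()
                .part_number(part_number)
                .set_e_tag(part.e_tag().map(ToOwned::to_owned))
                .build(),
        );
        part_number += 1;
    }

    // Tell S3 to assemble the uploaded parts into the final object.
    client
        .complete_multipart_upload()
        .bucket(bucket)
        .key(key)
        .upload_id(upload_id)
        .multipart_upload(
            CompletedMultipartUpload::builder()
                .set_parts(Some(parts))
                .build(),
        )
        .send()
        .await?;

    Ok(())
}

A real implementation would also have to call abort_multipart_upload when any part fails, so incomplete uploads do not keep accumulating storage costs; that cleanup is part of the complexity the removed comment weighs against the simpler buffered put_object path restored by this revert.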