diff --git a/Cargo.lock b/Cargo.lock index ff35dc327..42a9a954e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2287,6 +2287,8 @@ dependencies = [ "flate2", "futures-util", "http 1.4.0", + "http-body 1.0.1", + "http-body-util", "itertools 0.14.0", "mime", "moka", diff --git a/crates/bin/docs_rs_admin/src/repackage.rs b/crates/bin/docs_rs_admin/src/repackage.rs index 9d949b8da..d6757ffc6 100644 --- a/crates/bin/docs_rs_admin/src/repackage.rs +++ b/crates/bin/docs_rs_admin/src/repackage.rs @@ -108,7 +108,6 @@ pub async fn repackage( /// repackage contents of a S3 path prefix into a single archive file. /// /// Not performance optimized, for now it just tries to be simple. -#[instrument(skip(storage))] async fn repackage_path( storage: &AsyncStorage, prefix: &str, diff --git a/crates/lib/docs_rs_storage/Cargo.toml b/crates/lib/docs_rs_storage/Cargo.toml index b7b6380aa..83773084a 100644 --- a/crates/lib/docs_rs_storage/Cargo.toml +++ b/crates/lib/docs_rs_storage/Cargo.toml @@ -37,6 +37,8 @@ docs_rs_utils = { path = "../docs_rs_utils" } flate2 = "1.1.1" futures-util = { workspace = true } http = { workspace = true } +http-body = "1.0.0" +http-body-util = "0.1.3" itertools = { workspace = true } mime = { workspace = true } moka = { version = "0.12.14", features = ["future"] } diff --git a/crates/lib/docs_rs_storage/src/archive_index.rs b/crates/lib/docs_rs_storage/src/archive_index.rs index 334fb96c6..3e3b54621 100644 --- a/crates/lib/docs_rs_storage/src/archive_index.rs +++ b/crates/lib/docs_rs_storage/src/archive_index.rs @@ -830,7 +830,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::blob::StreamingBlob; + use crate::{blob::StreamingBlob, storage::non_blocking::ZIP_BUFFER_SIZE}; use chrono::Utc; use docs_rs_config::AppConfig as _; use docs_rs_opentelemetry::testing::TestMetrics; @@ -839,14 +839,15 @@ mod tests { use zip::write::SimpleFileOptions; async fn create_test_archive(file_count: u32) -> Result { - spawn_blocking(move || { + let writer = 
spawn_blocking(move || { use std::io::Write as _; let tf = tempfile::tempfile()?; let objectcontent: Vec = (0..255).collect(); - let mut archive = zip::ZipWriter::new(tf); + let mut archive = + zip::ZipWriter::new(std::io::BufWriter::with_capacity(ZIP_BUFFER_SIZE, tf)); for i in 0..file_count { archive.start_file( format!("testfile{i}"), @@ -858,8 +859,9 @@ mod tests { } Ok(archive.finish()?) }) - .await - .map(fs::File::from_std) + .await?; + + Ok(fs::File::from_std(writer.into_inner()?)) } struct FakeDownloader { diff --git a/crates/lib/docs_rs_storage/src/backends/memory.rs b/crates/lib/docs_rs_storage/src/backends/memory.rs index 385cd7f3f..cf4dd5bbe 100644 --- a/crates/lib/docs_rs_storage/src/backends/memory.rs +++ b/crates/lib/docs_rs_storage/src/backends/memory.rs @@ -1,16 +1,18 @@ use crate::{ Blob, backends::StorageBackendMethods, - blob::{BlobUpload, StreamingBlob}, + blob::{StreamUpload, StreamingBlob}, errors::PathNotFoundError, metrics::StorageMetrics, types::FileRange, }; use anyhow::{Result, anyhow}; +use chrono::Utc; use dashmap::DashMap; use docs_rs_headers::compute_etag; use futures_util::stream::{self, BoxStream}; use itertools::Itertools as _; +use tokio::io; pub(crate) struct MemoryBackend { otel_metrics: StorageMetrics, @@ -46,16 +48,29 @@ impl StorageBackendMethods for MemoryBackend { Ok(blob.into()) } - async fn store_batch(&self, batch: Vec) -> Result<()> { - self.otel_metrics - .uploaded_files - .add(batch.len() as u64, &[]); + async fn upload_stream(&self, upload: StreamUpload) -> Result<()> { + let StreamUpload { + path, + mime, + source, + compression, + } = upload; - for upload in batch { - let blob: Blob = upload.into(); - self.objects.insert(blob.path.clone(), blob); - } + let mut content = source.reader().await?; + let mut buffer = Vec::new(); + io::copy(&mut content, &mut buffer).await?; + + let blob = Blob { + path, + mime, + date_updated: Utc::now(), + etag: Some(compute_etag(&buffer)), + content: buffer, + compression, + }; + 
self.otel_metrics.uploaded_files.add(1, &[]); + self.objects.insert(blob.path.clone(), blob); Ok(()) } diff --git a/crates/lib/docs_rs_storage/src/backends/mod.rs b/crates/lib/docs_rs_storage/src/backends/mod.rs index a257f8de3..effb27dc5 100644 --- a/crates/lib/docs_rs_storage/src/backends/mod.rs +++ b/crates/lib/docs_rs_storage/src/backends/mod.rs @@ -2,14 +2,14 @@ pub(crate) mod memory; pub(crate) mod s3; -use crate::{BlobUpload, StreamingBlob, types::FileRange}; +use crate::{StreamingBlob, blob::StreamUpload, types::FileRange}; use anyhow::Result; use futures_util::stream::BoxStream; pub(crate) trait StorageBackendMethods { async fn exists(&self, path: &str) -> Result; async fn get_stream(&self, path: &str, range: Option) -> Result; - async fn store_batch(&self, batch: Vec) -> Result<()>; + async fn upload_stream(&self, upload: StreamUpload) -> Result<()>; async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result>; async fn delete_prefix(&self, prefix: &str) -> Result<()>; } @@ -39,8 +39,8 @@ impl StorageBackendMethods for StorageBackend { call_inner!(self, get_stream(path, range)) } - async fn store_batch(&self, batch: Vec) -> Result<()> { - call_inner!(self, store_batch(batch)) + async fn upload_stream(&self, upload: StreamUpload) -> Result<()> { + call_inner!(self, upload_stream(upload)) } async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result> { diff --git a/crates/lib/docs_rs_storage/src/backends/s3.rs b/crates/lib/docs_rs_storage/src/backends/s3.rs index 93f7a6c67..1e4f4135c 100644 --- a/crates/lib/docs_rs_storage/src/backends/s3.rs +++ b/crates/lib/docs_rs_storage/src/backends/s3.rs @@ -1,7 +1,7 @@ use crate::{ Config, backends::StorageBackendMethods, - blob::{BlobUpload, StreamingBlob}, + blob::{StreamUpload, StreamingBlob}, errors::PathNotFoundError, metrics::StorageMetrics, types::FileRange, @@ -13,15 +13,20 @@ use aws_sdk_s3::{ Client, config::{Region, retry::RetryConfig}, error::{ProvideErrorMetadata, 
SdkError}, + primitives::ByteStream, types::{Delete, ObjectIdentifier}, }; use aws_smithy_types_convert::date_time::DateTimeExt; use chrono::Utc; use docs_rs_headers::{ETag, compute_etag}; use futures_util::{ - future::TryFutureExt, - stream::{BoxStream, FuturesUnordered, StreamExt}, + TryStreamExt, + stream::{BoxStream, StreamExt}, }; +use http_body::Frame; +use http_body_util::StreamBody; +use opentelemetry::KeyValue; +use tokio_util::io::ReaderStream; use tracing::{error, warn}; // error codes to check for when trying to determine if an error is @@ -230,45 +235,50 @@ impl StorageBackendMethods for S3Backend { }) } - async fn store_batch(&self, mut batch: Vec) -> Result<(), Error> { - // Attempt to upload the batch 3 times - for _ in 0..3 { - let mut futures = FuturesUnordered::new(); - for blob in batch.drain(..) { - futures.push( - self.client - .put_object() - .bucket(&self.bucket) - .key(&blob.path) - .body(blob.content.clone().into()) - .content_type(blob.mime.to_string()) - .set_content_encoding(blob.compression.map(|alg| alg.to_string())) - .send() - .map_ok(|_| { - self.otel_metrics.uploaded_files.add(1, &[]); - }) - .map_err(|err| { - warn!(?err, "Failed to upload blob to S3"); - // Reintroduce failed blobs for a retry - blob - }), - ); - } + async fn upload_stream(&self, upload: StreamUpload) -> Result<(), Error> { + let StreamUpload { + path, + mime, + source, + compression, + } = upload; - while let Some(result) = futures.next().await { - // Push each failed blob back into the batch - if let Err(blob) = result { - batch.push(blob); - } - } + let content_length = source.content_length().await?; + + let mut last_err = None; + + for attempt in 1..=3 { + let reader = source.reader().await?; + let stream = ReaderStream::new(reader).map_ok(Frame::data); - // If we uploaded everything in the batch, we're done - if batch.is_empty() { - return Ok(()); + match self + .client + .put_object() + .bucket(&self.bucket) + .key(&path) + 
.body(ByteStream::from_body_1_x(StreamBody::new(stream))) + .content_length(content_length as i64) + .content_type(mime.to_string()) + .set_content_encoding(compression.map(|alg| alg.to_string())) + .send() + .await + { + Ok(_) => { + self.otel_metrics + .uploaded_files + .add(1, &[KeyValue::new("attempt", attempt.to_string())]); + return Ok(()); + } + Err(err) => { + warn!(?err, attempt, %path, "failed to upload blob to S3"); + last_err = Some(err); + } } } - panic!("failed to upload 3 times, exiting"); + Err(last_err + .expect("upload retry loop exited without a result") + .into()) } async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result> { diff --git a/crates/lib/docs_rs_storage/src/blob.rs b/crates/lib/docs_rs_storage/src/blob.rs index 4dd30c473..6079e47c6 100644 --- a/crates/lib/docs_rs_storage/src/blob.rs +++ b/crates/lib/docs_rs_storage/src/blob.rs @@ -4,8 +4,66 @@ use chrono::{DateTime, Utc}; use docs_rs_headers::{ETag, compute_etag}; use docs_rs_types::CompressionAlgorithm; use mime::Mime; -use std::io; -use tokio::io::{AsyncBufRead, AsyncBufReadExt}; +use std::{ + fmt, + io::{Cursor, SeekFrom}, + sync::Arc, +}; +use tokio::{ + fs, + io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncSeekExt}, +}; + +pub enum StreamUploadSource { + Bytes(Arc<[u8]>), + File(fs::File), +} + +impl StreamUploadSource { + pub async fn reader(&self) -> io::Result> { + Ok(match self { + Self::Bytes(bytes) => Box::new(Cursor::new(bytes.clone())), + Self::File(file) => { + let mut cloned = file.try_clone().await?; + cloned.seek(SeekFrom::Start(0)).await?; + Box::new(cloned) + } + }) + } + + pub async fn content_length(&self) -> io::Result { + Ok(match self { + Self::Bytes(bytes) => bytes.len() as u64, + Self::File(file) => file.metadata().await?.len(), + }) + } +} + +/// Represents a stream blob to be uploaded to storage. +/// +/// NOTE: Right now we only support uploads where the size is known in advance. 
+/// We can add support for streams with unknown size, but this would mean +/// using an intermediate fixed-size buffer and multipart uploads for these cases. +/// But: the multipart machinery is only worth the complexity if the stream is: +/// - unknown size +/// - bigger (each part except the last must be at least 5 MiB) +pub struct StreamUpload { + pub path: String, + pub mime: Mime, + pub source: StreamUploadSource, + pub compression: Option, +} + +impl From for StreamUpload { + fn from(value: BlobUpload) -> Self { + Self { + path: value.path, + mime: value.mime, + source: StreamUploadSource::Bytes(Arc::from(value.content)), + compression: value.compression, + } + } +} /// represents a blob to be uploaded to storage. #[derive(Clone, Debug, PartialEq, Eq)] @@ -60,7 +118,7 @@ pub struct StreamingBlob { pub content: Box, } -impl std::fmt::Debug for StreamingBlob { +impl fmt::Debug for StreamingBlob { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("StreamingBlob") .field("path", &self.path) @@ -110,7 +168,7 @@ impl StreamingBlob { let mut content = SizedBuffer::new(max_size); content.reserve(self.content_length); - tokio::io::copy(&mut self.content, &mut content).await?; + io::copy(&mut self.content, &mut content).await?; Ok(Blob { path: self.path, @@ -132,7 +190,7 @@ impl From for StreamingBlob { etag: value.etag, compression: value.compression, content_length: value.content.len(), - content: Box::new(io::Cursor::new(value.content)), + content: Box::new(Cursor::new(value.content)), } } } @@ -142,6 +200,10 @@ mod test { use super::*; use crate::compress_async; use docs_rs_headers::compute_etag; + use tokio::{ + fs, + io::{AsyncReadExt as _, AsyncWriteExt as _}, + }; const ZSTD_EOF_BYTES: [u8; 3] = [0x01, 0x00, 0x00]; @@ -157,7 +219,7 @@ compression: alg, etag: Some(compute_etag(&content)), content_length: content.len(), - content: Box::new(io::Cursor::new(content)), + content: Box::new(Cursor::new(content)), } } @@ -184,6 
+246,52 @@ mod test { Ok(()) } + #[tokio::test] + async fn test_stream_upload_source_bytes_creates_fresh_readers() -> Result<()> { + const CONTENT: &[u8] = b"Hello, world!"; + + let source = StreamUploadSource::Bytes(Arc::from(CONTENT)); + assert_eq!(source.content_length().await?, CONTENT.len() as u64); + + let mut first = source.reader().await?; + let mut first_buf = Vec::new(); + first.read_to_end(&mut first_buf).await?; + assert_eq!(first_buf, CONTENT); + + let mut second = source.reader().await?; + let mut second_buf = Vec::new(); + second.read_to_end(&mut second_buf).await?; + assert_eq!(second_buf, CONTENT); + + Ok(()) + } + + #[tokio::test] + async fn test_stream_upload_source_file_creates_fresh_readers() -> Result<()> { + const CONTENT: &[u8] = b"Hello, world!"; + + let tempfile = tempfile::NamedTempFile::new()?; + let mut file = fs::File::from_std(tempfile.reopen()?); + file.write_all(CONTENT).await?; + file.seek(std::io::SeekFrom::Start(CONTENT.len() as u64)) + .await?; + + let source = StreamUploadSource::File(file); + assert_eq!(source.content_length().await?, CONTENT.len() as u64); + + let mut first = source.reader().await?; + let mut first_buf = Vec::new(); + first.read_to_end(&mut first_buf).await?; + assert_eq!(first_buf, CONTENT); + + let mut second = source.reader().await?; + let mut second_buf = Vec::new(); + second.read_to_end(&mut second_buf).await?; + assert_eq!(second_buf, CONTENT); + + Ok(()) + } + #[tokio::test] async fn test_streaming_broken_zstd_blob() -> Result<()> { const NOT_ZSTD: &[u8] = b"Hello, world!"; @@ -226,7 +334,7 @@ mod test { let mut compressed_content = Vec::new(); let alg = CompressionAlgorithm::Zstd; compress_async( - &mut io::Cursor::new(CONTENT.to_vec()), + &mut Cursor::new(CONTENT.to_vec()), &mut compressed_content, alg, ) diff --git a/crates/lib/docs_rs_storage/src/config.rs b/crates/lib/docs_rs_storage/src/config.rs index af2ccdd8b..4da949f99 100644 --- a/crates/lib/docs_rs_storage/src/config.rs +++ 
b/crates/lib/docs_rs_storage/src/config.rs @@ -106,11 +106,15 @@ pub struct Config { // we often also add compression on top of it, which is CPU-bound, // even when just light / simpler compression. pub local_filesystem_parallelism: usize, + + // How much we want to parallelize file uploads / downloads. + pub network_parallelism: usize, } impl AppConfig for Config { fn from_environment() -> anyhow::Result { let prefix: PathBuf = require_env("DOCSRS_PREFIX")?; + let cores = std::thread::available_parallelism()?.get(); Ok(Self { temp_dir: prefix.join("tmp"), @@ -124,10 +128,8 @@ impl AppConfig for Config { max_file_size_html: env("DOCSRS_MAX_FILE_SIZE_HTML", 50 * 1024 * 1024)?, #[cfg(any(test, feature = "testing"))] s3_bucket_is_temporary: false, - local_filesystem_parallelism: env( - "DOCSRS_LOCAL_FILESYSTEM_PARALLELISM", - std::thread::available_parallelism()?.get(), - )?, + local_filesystem_parallelism: env("DOCSRS_LOCAL_FILESYSTEM_PARALLELISM", cores)?, + network_parallelism: env("DOCSRS_NETWORK_PARALLELISM", 8usize.min(cores))?, }) } diff --git a/crates/lib/docs_rs_storage/src/storage/blocking.rs b/crates/lib/docs_rs_storage/src/storage/blocking.rs index bd4c5a152..59c28caaa 100644 --- a/crates/lib/docs_rs_storage/src/storage/blocking.rs +++ b/crates/lib/docs_rs_storage/src/storage/blocking.rs @@ -1,4 +1,4 @@ -use crate::{blob::Blob, file::FileEntry, storage::non_blocking::AsyncStorage, types::FileRange}; +use crate::{blob::Blob, file::FileEntry, storage::non_blocking::AsyncStorage}; use anyhow::Result; use docs_rs_types::{BuildId, CompressionAlgorithm, KrateName, Version}; use std::{fmt, path::Path, sync::Arc}; @@ -10,7 +10,6 @@ pub struct Storage { runtime: runtime::Handle, } -#[allow(dead_code)] impl Storage { pub fn new(inner: Arc, runtime: runtime::Handle) -> Self { Self { inner, runtime } @@ -70,17 +69,6 @@ impl Storage { self.runtime.block_on(self.inner.get(path, max_size)) } - pub(crate) fn get_range( - &self, - path: &str, - max_size: usize, - range: 
FileRange, - compression: Option, - ) -> Result { - self.runtime - .block_on(self.inner.get_range(path, max_size, range, compression)) - } - pub fn get_from_archive( &self, archive_path: &str, @@ -140,17 +128,6 @@ impl Storage { self.runtime.block_on(self.inner.store_one(path, content)) } - // Store file into the backend at the given path (also used to detect mime type), returns the - // chosen compression algorithm - pub fn store_path( - &self, - target_path: impl Into + std::fmt::Debug, - source_path: impl AsRef + std::fmt::Debug, - ) -> Result { - self.runtime - .block_on(self.inner.store_path(target_path, source_path)) - } - /// sync wrapper for the list_prefix function /// purely for testing purposes since it collects all files into a Vec. #[cfg(feature = "testing")] diff --git a/crates/lib/docs_rs_storage/src/storage/non_blocking.rs b/crates/lib/docs_rs_storage/src/storage/non_blocking.rs index c7cc48bd0..7c5e8350d 100644 --- a/crates/lib/docs_rs_storage/src/storage/non_blocking.rs +++ b/crates/lib/docs_rs_storage/src/storage/non_blocking.rs @@ -4,7 +4,7 @@ use crate::{ Config, archive_index::{self, ARCHIVE_INDEX_FILE_EXTENSION}, backends::{StorageBackend, StorageBackendMethods, s3::S3Backend}, - blob::{Blob, BlobUpload, StreamingBlob}, + blob::{Blob, StreamUpload, StreamUploadSource, StreamingBlob}, compression::{compress, compress_async}, errors::PathNotFoundError, file::FileEntry, @@ -21,10 +21,13 @@ use docs_rs_opentelemetry::AnyMeterProvider; use docs_rs_types::{BuildId, CompressionAlgorithm, KrateName, Version}; use docs_rs_utils::spawn_blocking; use futures_util::{TryStreamExt as _, future, stream::BoxStream}; -use std::{fmt, path::Path, pin::Pin, sync::Arc}; +use std::{fmt, io::Cursor, path::Path, pin::Pin, sync::Arc}; use tokio::{fs, io}; use tracing::{info_span, instrument, trace, warn}; +/// buffer size when writing zip files. 
+pub(crate) const ZIP_BUFFER_SIZE: usize = 1024 * 1024; + pub struct AsyncStorage { backend: StorageBackend, config: Arc, @@ -181,8 +184,7 @@ impl AsyncStorage { Ok(self.get_raw_stream(path).await?.decompress().await?) } - /// get, decompress and materialize part of an object from store - #[instrument(skip(self))] + #[cfg(test)] pub(crate) async fn get_range( &self, path: &str, @@ -296,7 +298,7 @@ impl AsyncStorage { root_dir: impl AsRef + fmt::Debug, ) -> Result<(Vec, CompressionAlgorithm)> { let root_dir = root_dir.as_ref(); - let (zip_content, file_paths) = + let (zip_file, file_paths) = spawn_blocking({ use std::{io, fs}; let archive_path = archive_path.to_owned(); @@ -316,7 +318,8 @@ impl AsyncStorage { // also has to be added as supported algorithm for storage compression, together // with a mapping in `storage::archive_index::Index::new_from_zip`. - let zip_content = { + + let zip_file = { let _span = info_span!("create_zip_archive", %archive_path, root_dir=%root_dir.display()).entered(); @@ -324,7 +327,9 @@ impl AsyncStorage { .compression_method(zip::CompressionMethod::Bzip2) .compression_level(Some(3)); - let mut zip = zip::ZipWriter::new(io::Cursor::new(Vec::new())); + // rustdoc archives can become a couple of GiB big, so we better use a tempfile. + let zip_file = tempfile::tempfile()?; + let mut zip = zip::ZipWriter::new(io::BufWriter::with_capacity(ZIP_BUFFER_SIZE, zip_file)); for file_path in get_file_list(&root_dir) { let file_path = file_path?; @@ -334,55 +339,59 @@ impl AsyncStorage { file_paths.push(FileEntry{path: file_path, size: file.metadata()?.len()}); } - zip.finish()?.into_inner() + zip.finish()?.into_inner()? 
}; Ok(( - zip_content, + zip_file, file_paths )) } }) .await?; + let zip_file = fs::File::from_std(zip_file); let alg = CompressionAlgorithm::default(); let remote_index_path = format!("{}.{ARCHIVE_INDEX_FILE_EXTENSION}", &archive_path); - let (zip_content, compressed_index_content) = { + let (zip_file, compressed_index_file) = { let _span = info_span!("create_archive_index", %remote_index_path).entered(); fs::create_dir_all(&self.config.temp_dir).await?; let local_index_path = tempfile::NamedTempFile::new_in(&self.config.temp_dir)?.into_temp_path(); - let zip_cursor = - archive_index::create(std::io::Cursor::new(zip_content), &local_index_path).await?; - let zip_content = zip_cursor.into_inner(); + let zip_reader = + archive_index::create(io::BufReader::new(zip_file), &local_index_path).await?; + let zip_file = zip_reader.into_inner(); - let mut buf: Vec = Vec::new(); + // compressed index can become up to a couple 100 MiB big, so rather use a tempfile. + let mut compressed_index_file = + fs::File::from_std(spawn_blocking(|| Ok(tempfile::tempfile()?)).await?); compress_async( &mut io::BufReader::new(fs::File::open(&local_index_path).await?), - &mut buf, + &mut io::BufWriter::new(&mut compressed_index_file), alg, ) .await?; - (zip_content, buf) + (zip_file, compressed_index_file) }; self.backend - .store_batch(vec![ - BlobUpload { - path: archive_path.to_string(), - mime: mimes::APPLICATION_ZIP.clone(), - content: zip_content, - compression: None, - }, - BlobUpload { - path: remote_index_path, - mime: mime::APPLICATION_OCTET_STREAM, - content: compressed_index_content, - compression: Some(alg), - }, - ]) + .upload_stream(StreamUpload { + path: archive_path.to_string(), + mime: mimes::APPLICATION_ZIP.clone(), + source: StreamUploadSource::File(zip_file), + compression: None, + }) + .await?; + + self.backend + .upload_stream(StreamUpload { + path: remote_index_path, + mime: mime::APPLICATION_OCTET_STREAM, + source: StreamUploadSource::File(compressed_index_file), + 
compression: Some(alg), + }) .await?; Ok((file_paths, CompressionAlgorithm::Bzip2)) @@ -395,65 +404,64 @@ impl AsyncStorage { prefix: impl AsRef + fmt::Debug, root_dir: impl AsRef + fmt::Debug, ) -> Result<(Vec, CompressionAlgorithm)> { - let prefix = prefix.as_ref(); + let prefix = prefix.as_ref().to_path_buf(); let root_dir = root_dir.as_ref(); let alg = CompressionAlgorithm::default(); - let (file_paths_and_mimes, blobs): (Vec<_>, Vec<_>) = walk_dir_recursive(&root_dir) + let file_paths_and_mimes: Vec<_> = walk_dir_recursive(&root_dir) .err_into::() - .map_ok(|item| async move { - // Some files have insufficient permissions - // (like .lock file created by cargo in documentation directory). - // Skip these files. - let Ok(file) = fs::File::open(&item).await else { - return Ok(None); - }; - - let content = { - let mut buf: Vec = Vec::new(); - compress_async(io::BufReader::new(file), &mut buf, alg).await?; - buf - }; - - let bucket_path = prefix.join(&item.relative).to_string_lossy().to_string(); - - let file_size = item.metadata.len(); - - let file_info = FileEntry { - path: item.relative.clone(), - size: file_size, - }; - let mime = file_info.mime(); - - Ok(Some(( - file_info, - BlobUpload { - path: bucket_path, - mime, - content, - compression: Some(alg), - }, - ))) + .map_ok(|item| { + let prefix = prefix.clone(); + async move { + // Some files have insufficient permissions + // (like .lock file created by cargo in documentation directory). + // Skip these files. 
+ let Ok(file) = fs::File::open(&item).await else { + return Ok(None); + }; + + let content = { + let mut buf: Vec = Vec::new(); + compress_async(io::BufReader::new(file), &mut buf, alg).await?; + buf + }; + + let bucket_path = prefix.join(&item.relative).to_string_lossy().to_string(); + + let file_size = item.metadata.len(); + + let file_info = FileEntry { + path: item.relative.clone(), + size: file_size, + }; + let mime = file_info.mime().clone(); + + self.backend + .upload_stream(StreamUpload { + path: bucket_path, + mime, + source: StreamUploadSource::Bytes(content.into()), + compression: Some(alg), + }) + .await?; + + Ok(Some(file_info)) + } }) - .try_buffer_unordered(self.config.local_filesystem_parallelism) + .try_buffer_unordered(self.config.network_parallelism) .try_filter_map(|item| future::ready(Ok(item))) - .try_fold( - (Vec::new(), Vec::new()), - |(mut file_paths_and_mimes, mut blobs), (file_info, blob)| async move { - file_paths_and_mimes.push(file_info); - blobs.push(blob); - Ok((file_paths_and_mimes, blobs)) - }, - ) + .try_collect() .await?; - self.backend.store_batch(blobs).await?; Ok((file_paths_and_mimes, alg)) } #[cfg(test)] - pub async fn store_blobs(&self, blobs: Vec) -> Result<()> { - self.backend.store_batch(blobs).await + pub async fn store_blobs(&self, blobs: Vec) -> Result<()> { + for blob in blobs { + self.backend.upload_stream(blob.into()).await?; + } + Ok(()) } // Store file into the backend at the given path, uncompressed. 
@@ -469,12 +477,12 @@ impl AsyncStorage { let mime = detect_mime(&path).to_owned(); self.backend - .store_batch(vec![BlobUpload { + .upload_stream(StreamUpload { path, mime, - content, + source: StreamUploadSource::Bytes(content.into()), compression: None, - }]) + }) .await?; Ok(()) @@ -489,18 +497,17 @@ impl AsyncStorage { content: impl Into>, ) -> Result { let path = path.into(); - let content = content.into(); let alg = CompressionAlgorithm::default(); - let content = compress(&*content, alg)?; + let content = compress(Cursor::new(content.into()), alg)?; let mime = detect_mime(&path).to_owned(); self.backend - .store_batch(vec![BlobUpload { + .upload_stream(StreamUpload { path, mime, - content, + source: StreamUploadSource::Bytes(content.into()), compression: Some(alg), - }]) + }) .await?; Ok(alg) @@ -531,12 +538,12 @@ impl AsyncStorage { let mime = detect_mime(&target_path).to_owned(); self.backend - .store_batch(vec![BlobUpload { + .upload_stream(StreamUpload { path: target_path, mime, - content, + source: StreamUploadSource::Bytes(content.into()), compression: Some(alg), - }]) + }) .await?; Ok(alg) @@ -593,6 +600,7 @@ impl fmt::Debug for AsyncStorage { #[cfg(test)] mod backend_tests { use super::*; + use crate::blob::BlobUpload; use crate::{PathNotFoundError, errors::SizeLimitReached}; use docs_rs_headers::compute_etag; use docs_rs_opentelemetry::testing::TestMetrics;