Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/bin/docs_rs_admin/src/repackage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ pub async fn repackage(
/// repackage contents of a S3 path prefix into a single archive file.
///
/// Not performance optimized, for now it just tries to be simple.
#[instrument(skip(storage))]
async fn repackage_path(
storage: &AsyncStorage,
prefix: &str,
Expand Down
2 changes: 2 additions & 0 deletions crates/lib/docs_rs_storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ docs_rs_utils = { path = "../docs_rs_utils" }
flate2 = "1.1.1"
futures-util = { workspace = true }
http = { workspace = true }
http-body = "1.0.0"
http-body-util = "0.1.3"
itertools = { workspace = true }
mime = { workspace = true }
moka = { version = "0.12.14", features = ["future"] }
Expand Down
12 changes: 7 additions & 5 deletions crates/lib/docs_rs_storage/src/archive_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::blob::StreamingBlob;
use crate::{blob::StreamingBlob, storage::non_blocking::ZIP_BUFFER_SIZE};
use chrono::Utc;
use docs_rs_config::AppConfig as _;
use docs_rs_opentelemetry::testing::TestMetrics;
Expand All @@ -839,14 +839,15 @@ mod tests {
use zip::write::SimpleFileOptions;

async fn create_test_archive(file_count: u32) -> Result<fs::File> {
spawn_blocking(move || {
let writer = spawn_blocking(move || {
use std::io::Write as _;

let tf = tempfile::tempfile()?;

let objectcontent: Vec<u8> = (0..255).collect();

let mut archive = zip::ZipWriter::new(tf);
let mut archive =
zip::ZipWriter::new(std::io::BufWriter::with_capacity(ZIP_BUFFER_SIZE, tf));
for i in 0..file_count {
archive.start_file(
format!("testfile{i}"),
Expand All @@ -858,8 +859,9 @@ mod tests {
}
Ok(archive.finish()?)
})
.await
.map(fs::File::from_std)
.await?;

Ok(fs::File::from_std(writer.into_inner()?))
}

struct FakeDownloader {
Expand Down
33 changes: 24 additions & 9 deletions crates/lib/docs_rs_storage/src/backends/memory.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use crate::{
Blob,
backends::StorageBackendMethods,
blob::{BlobUpload, StreamingBlob},
blob::{StreamUpload, StreamingBlob},
errors::PathNotFoundError,
metrics::StorageMetrics,
types::FileRange,
};
use anyhow::{Result, anyhow};
use chrono::Utc;
use dashmap::DashMap;
use docs_rs_headers::compute_etag;
use futures_util::stream::{self, BoxStream};
use itertools::Itertools as _;
use tokio::io;

pub(crate) struct MemoryBackend {
otel_metrics: StorageMetrics,
Expand Down Expand Up @@ -46,16 +48,29 @@ impl StorageBackendMethods for MemoryBackend {
Ok(blob.into())
}

async fn store_batch(&self, batch: Vec<BlobUpload>) -> Result<()> {
self.otel_metrics
.uploaded_files
.add(batch.len() as u64, &[]);
async fn upload_stream(&self, upload: StreamUpload) -> Result<()> {
let StreamUpload {
path,
mime,
source,
compression,
} = upload;

for upload in batch {
let blob: Blob = upload.into();
self.objects.insert(blob.path.clone(), blob);
}
let mut content = source.reader().await?;
let mut buffer = Vec::new();
io::copy(&mut content, &mut buffer).await?;

let blob = Blob {
path,
mime,
date_updated: Utc::now(),
etag: Some(compute_etag(&buffer)),
content: buffer,
compression,
};

self.otel_metrics.uploaded_files.add(1, &[]);
self.objects.insert(blob.path.clone(), blob);
Ok(())
}

Expand Down
8 changes: 4 additions & 4 deletions crates/lib/docs_rs_storage/src/backends/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
pub(crate) mod memory;
pub(crate) mod s3;

use crate::{BlobUpload, StreamingBlob, types::FileRange};
use crate::{StreamingBlob, blob::StreamUpload, types::FileRange};
use anyhow::Result;
use futures_util::stream::BoxStream;

/// Operations every storage backend (in-memory, S3) must implement.
pub(crate) trait StorageBackendMethods {
    /// Check whether an object exists at `path`.
    async fn exists(&self, path: &str) -> Result<bool>;
    /// Fetch an object as a stream, optionally restricted to a byte `range`.
    async fn get_stream(&self, path: &str, range: Option<FileRange>) -> Result<StreamingBlob>;
    /// Upload a single object from a streaming source.
    async fn upload_stream(&self, upload: StreamUpload) -> Result<()>;
    /// List all object keys under `prefix`.
    async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result<String>>;
    /// Delete all objects under `prefix`.
    async fn delete_prefix(&self, prefix: &str) -> Result<()>;
}
Expand Down Expand Up @@ -39,8 +39,8 @@ impl StorageBackendMethods for StorageBackend {
call_inner!(self, get_stream(path, range))
}

async fn store_batch(&self, batch: Vec<BlobUpload>) -> Result<()> {
call_inner!(self, store_batch(batch))
async fn upload_stream(&self, upload: StreamUpload) -> Result<()> {
call_inner!(self, upload_stream(upload))
}

async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result<String>> {
Expand Down
84 changes: 47 additions & 37 deletions crates/lib/docs_rs_storage/src/backends/s3.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
Config,
backends::StorageBackendMethods,
blob::{BlobUpload, StreamingBlob},
blob::{StreamUpload, StreamingBlob},
errors::PathNotFoundError,
metrics::StorageMetrics,
types::FileRange,
Expand All @@ -13,15 +13,20 @@ use aws_sdk_s3::{
Client,
config::{Region, retry::RetryConfig},
error::{ProvideErrorMetadata, SdkError},
primitives::ByteStream,
types::{Delete, ObjectIdentifier},
};
use aws_smithy_types_convert::date_time::DateTimeExt;
use chrono::Utc;
use docs_rs_headers::{ETag, compute_etag};
use futures_util::{
future::TryFutureExt,
stream::{BoxStream, FuturesUnordered, StreamExt},
TryStreamExt,
stream::{BoxStream, StreamExt},
};
use http_body::Frame;
use http_body_util::StreamBody;
use opentelemetry::KeyValue;
use tokio_util::io::ReaderStream;
use tracing::{error, warn};

// error codes to check for when trying to determine if an error is
Expand Down Expand Up @@ -230,45 +235,50 @@ impl StorageBackendMethods for S3Backend {
})
}

async fn store_batch(&self, mut batch: Vec<BlobUpload>) -> Result<(), Error> {
// Attempt to upload the batch 3 times
for _ in 0..3 {
let mut futures = FuturesUnordered::new();
for blob in batch.drain(..) {
futures.push(
self.client
.put_object()
.bucket(&self.bucket)
.key(&blob.path)
.body(blob.content.clone().into())
.content_type(blob.mime.to_string())
.set_content_encoding(blob.compression.map(|alg| alg.to_string()))
.send()
.map_ok(|_| {
self.otel_metrics.uploaded_files.add(1, &[]);
})
.map_err(|err| {
warn!(?err, "Failed to upload blob to S3");
// Reintroduce failed blobs for a retry
blob
}),
);
}
async fn upload_stream(&self, upload: StreamUpload) -> Result<(), Error> {
let StreamUpload {
path,
mime,
source,
compression,
} = upload;

while let Some(result) = futures.next().await {
// Push each failed blob back into the batch
if let Err(blob) = result {
batch.push(blob);
}
}
let content_length = source.content_length().await?;

let mut last_err = None;

for attempt in 1..=3 {
let reader = source.reader().await?;
let stream = ReaderStream::new(reader).map_ok(Frame::data);

// If we uploaded everything in the batch, we're done
if batch.is_empty() {
return Ok(());
match self
.client
.put_object()
.bucket(&self.bucket)
.key(&path)
.body(ByteStream::from_body_1_x(StreamBody::new(stream)))
.content_length(content_length as i64)
.content_type(mime.to_string())
.set_content_encoding(compression.map(|alg| alg.to_string()))
.send()
.await
{
Ok(_) => {
self.otel_metrics
.uploaded_files
.add(1, &[KeyValue::new("attempt", attempt.to_string())]);
return Ok(());
}
Err(err) => {
warn!(?err, attempt = attempt + 1, %path, "failed to upload blob to S3");
last_err = Some(err);
}
}
}

panic!("failed to upload 3 times, exiting");
Err(last_err
.expect("upload retry loop exited without a result")
.into())
}

async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result<String, Error>> {
Expand Down
Loading
Loading