Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 26 additions & 15 deletions rustic-mongo-buddy/src/mongo_data_exporter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::cmp;
use std::path::Path;

use tokio::io::AsyncReadExt;

use aws_config::BehaviorVersion;
use aws_sdk_s3::{Client, primitives::ByteStream, primitives::SdkBody};
use rustic_shell::shell_command_executor::ShellCommandExecutor;
Expand All @@ -15,6 +18,10 @@ pub struct MongoDataExporter {
impl MongoDataExporter {
const ZSTD_ARCHIVE_EXTENSION: &str = "tar.zst";
const ZSTD_ARCHIVE_OPTIONS: &str = "-acf";
const GIB: u64 = 1024 * 1024 * 1024;
const S3_PUT_OBJECT_MAX_SIZE_GIB: u64 = 5;
/// S3 PutObject accepts objects up to 5 GiB; at or above that, use multipart.
const S3_PUT_OBJECT_MAX_SIZE: u64 = Self::S3_PUT_OBJECT_MAX_SIZE_GIB * Self::GIB;

pub fn new(
mongo_uri: String,
Expand Down Expand Up @@ -101,11 +108,12 @@ impl MongoDataExporter {

info!("Will upload file to S3 bucket {s3_bucket_name} with key {s3_bucket_key}");

// Use multipart upload for files larger than 5GB
const MAX_SINGLE_UPLOAD_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB in bytes

if file_size > MAX_SINGLE_UPLOAD_SIZE {
info!("File size exceeds 5GB, using multipart upload");
// Use multipart upload at the S3 single-PUT size limit
if file_size >= Self::S3_PUT_OBJECT_MAX_SIZE {
info!(
"File size is at or exceeds {} GiB, using multipart upload",
Self::S3_PUT_OBJECT_MAX_SIZE_GIB
);
self.upload_multipart_to_s3(
&client,
s3_bucket_name,
Expand All @@ -115,7 +123,10 @@ impl MongoDataExporter {
)
.await;
} else {
info!("File size is within 5GB limit, using single upload");
info!(
"File size is below {} GiB limit, using single upload",
Self::S3_PUT_OBJECT_MAX_SIZE_GIB
);
let file_stream = ByteStream::from_path(file_path)
.await
.expect("Failed to read file");
Expand Down Expand Up @@ -157,11 +168,11 @@ impl MongoDataExporter {

info!("Created multipart upload with ID: {}", upload_id);

// Calculate part size (minimum 1GB, maximum 5GB per part)
const MIN_PART_SIZE: u64 = 1024 * 1024 * 1024; // 1GB
const MAX_PART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB
let part_size = std::cmp::max(MIN_PART_SIZE, file_size / 10); // Aim for max 10 parts
let part_size = std::cmp::min(part_size, MAX_PART_SIZE);
// Calculate part size (minimum 1 GiB, maximum 5 GiB per part)
const MIN_PART_SIZE: u64 = MongoDataExporter::GIB;
const MAX_PART_SIZE: u64 = MongoDataExporter::S3_PUT_OBJECT_MAX_SIZE;
let part_size = cmp::max(MIN_PART_SIZE, file_size / 10); // Aim for max 10 parts
let part_size = cmp::min(part_size, MAX_PART_SIZE);

let mut part_number = 1;
let mut uploaded_parts = Vec::new();
Expand All @@ -173,16 +184,16 @@ impl MongoDataExporter {

// Read file in chunks and upload each part
loop {
let mut buffer = vec![0u8; part_size as usize];
let bytes_read = tokio::io::AsyncReadExt::read(&mut file, &mut buffer)
let mut buffer = Vec::with_capacity(part_size as usize);
let bytes_read = (&mut file)
.take(part_size)
.read_to_end(&mut buffer)
.await
.expect("Failed to read file");

if bytes_read == 0 {
break;
}

buffer.truncate(bytes_read);
let body = ByteStream::from(SdkBody::from(buffer));

info!("Uploading part {} ({} bytes)", part_number, bytes_read);
Expand Down
Loading