Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions application/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,16 @@ nestify::nest! {
pub transfers: #[derive(ToSchema, Deserialize, Serialize, DefaultFromSerde)] #[serde(default)] pub struct SystemTransfers {
#[serde(default)]
pub download_limit: MiB,

#[serde(default)]
#[schema(inline)]
pub storage_pool: #[derive(ToSchema, Deserialize, Serialize, DefaultFromSerde)] #[serde(default)] pub struct SystemTransfersStoragePool {
#[serde(default)]
pub enabled: bool,

#[serde(default)]
pub pool_name: String,
},
},
},
#[serde(default)]
Expand Down
33 changes: 33 additions & 0 deletions application/src/routes/api/transfers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,39 @@ mod post {
}
};

{
let storage_pool = &state.config.system.transfers.storage_pool;
let source_pool = headers
.get("X-Storage-Pool")
.and_then(|v| v.to_str().ok())
.unwrap_or_default();

if storage_pool.enabled
&& !storage_pool.pool_name.is_empty()
&& !source_pool.is_empty()
&& storage_pool.pool_name.eq_ignore_ascii_case(source_pool)
{
server.filesystem.setup().await;
server
.transferring
.store(false, std::sync::atomic::Ordering::SeqCst);
state
.config
.client
.set_server_transfer(subject, true, vec![])
.await?;
server
.websocket
.send(crate::server::websocket::WebsocketMessage::new(
crate::server::websocket::WebsocketEvent::ServerTransferStatus,
["completed".into()].into(),
))
.ok();

return ApiResponse::new_serialized(Response {}).ok();
}
}

let handle = tokio::spawn({
let runtime = tokio::runtime::Handle::current();
let server = server.clone();
Expand Down
11 changes: 10 additions & 1 deletion application/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1521,7 +1521,16 @@ impl Server {
tokio::spawn({
let server = self.clone();

async move { server.filesystem.destroy().await }
async move {
let pool = &server.app_state.config.system.transfers.storage_pool;
let skip_file_removal = pool.enabled
&& !pool.pool_name.is_empty()
&& server.transferring.load(Ordering::SeqCst);

if !skip_file_removal {
server.filesystem.destroy().await;
}
}
});
}

Expand Down
30 changes: 19 additions & 11 deletions application/src/server/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,12 +531,19 @@ impl OutgoingServerTransfer {
}
});

let response = reqwest::Client::new()
let storage_pool = &server.app_state.config.system.transfers.storage_pool;
let include_storage_pool =
storage_pool.enabled && !storage_pool.pool_name.is_empty();

let mut response = reqwest::Client::new()
.post(&url)
.header("Authorization", &token)
.header("Multiplex-Stream-Count", multiplex_streams)
.multipart(form)
.send();
.multipart(form);
if include_storage_pool {
response = response.header("X-Storage-Pool", storage_pool.pool_name.as_str());
}
let response = response.send();
let mut multiplex_responses = Vec::new();
multiplex_responses.reserve_exact(multiplex_streams);

Expand Down Expand Up @@ -607,14 +614,15 @@ impl OutgoingServerTransfer {
.unwrap(),
);

multiplex_responses.push(
reqwest::Client::new()
.post(&url)
.header("Authorization", &token)
.header("Multiplex-Stream", i)
.multipart(form)
.send()
);
let mut request = reqwest::Client::new()
.post(&url)
.header("Authorization", &token)
.header("Multiplex-Stream", i)
.multipart(form);
if include_storage_pool {
request = request.header("X-Storage-Pool", storage_pool.pool_name.as_str());
}
multiplex_responses.push(request.send());
multiplex_tasks.push(archive_task);
multiplex_tasks.push(checksum_task);
}
Expand Down