Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ static_assertions.workspace = true
string-offset.workspace = true
sum_tree.workspace = true
tabwriter = "1.4"
tar = "0.4"
tempfile.workspace = true
thiserror.workspace = true
thousands = "0.2.0"
Expand Down
189 changes: 124 additions & 65 deletions app/src/remote_server/ssh_transport/installation/scp_fallback.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::fs::File;
use std::path::{Path, PathBuf};
use std::time::Duration;

use anyhow::Context as _;
use blocking::unblock;
use flate2::read::GzDecoder;
use futures::AsyncWriteExt as _;
use futures::TryStreamExt as _;
use http_client::StatusCode;
Expand Down Expand Up @@ -37,7 +40,7 @@ pub(super) async fn install(socket_path: &Path) -> Result<(), Error> {

let client_tarball_path = cached_remote_server_tarball(&platform)
.await
.map_err(Error::Other)?;
.map_err(|source| Error::ClientDownloadFailed { source })?;
let timeout = remote_server::setup::SCP_INSTALL_TIMEOUT;
let install_dir = remote_server::setup::remote_server_dir();
let remote_tarball_name = format!("oz-upload-{}.tar.gz", uuid::Uuid::new_v4());
Expand Down Expand Up @@ -96,10 +99,6 @@ fn remote_server_tarball_cache_root() -> PathBuf {
.join("tarballs")
}

/// Returns the `.tmp` directory that sits directly under the remote-server
/// tarball cache root.
fn remote_server_tarball_cache_temp_dir() -> PathBuf {
    let mut temp_dir = remote_server_tarball_cache_root();
    temp_dir.push(".tmp");
    temp_dir
}

/// Returns the version string reported by
/// `remote_server::setup::remote_server_artifact_version()`.
///
/// NOTE(review): presumably this is the version component of the tarball cache
/// path — confirm against the callers.
fn current_remote_server_tarball_cache_version() -> &'static str {
remote_server::setup::remote_server_artifact_version()
}
Expand All @@ -116,9 +115,42 @@ fn remote_server_tarball_cache_path(platform: &RemotePlatform) -> PathBuf {
}

/// Reports whether `path` holds a usable remote-server tarball.
///
/// The path must be a regular, non-empty file, and `validate_gzip_tarball`
/// must succeed on it; any failure along the way yields `false`.
///
/// NOTE(review): the bare `async_fs::metadata(path)` line and the first
/// `.is_ok_and(...)` line (the one without a trailing `;`) duplicate the
/// statement that follows them. They look like pre-change lines captured from
/// a diff view — confirm and drop them.
async fn is_valid_cached_tarball(path: &Path) -> bool {
async_fs::metadata(path)
let metadata_is_valid = async_fs::metadata(path)
.await
.is_ok_and(|metadata| metadata.is_file() && metadata.len() > 0)
.is_ok_and(|metadata| metadata.is_file() && metadata.len() > 0);
if !metadata_is_valid {
return false;
}

// The validator opens and reads the file synchronously, so it is handed to
// `unblock` as a `move` closure, which needs an owned copy of the path.
let path = path.to_path_buf();
unblock(move || validate_gzip_tarball(&path)).await.is_ok()
}

/// Checks that `path` is a readable, non-empty gzip-compressed tar archive.
///
/// Every tar entry is streamed to a sink so that its compressed bytes are
/// actually decoded. After the tar reader stops, the remainder of the gzip
/// stream is drained too, so damage in the trailing padding or in the gzip
/// trailer is reported instead of silently accepted.
///
/// # Errors
///
/// Returns an error when the file cannot be opened, when any tar entry header
/// or body cannot be read, when the archive contains no entries, or when the
/// rest of the gzip stream cannot be decoded to its end.
fn validate_gzip_tarball(path: &Path) -> anyhow::Result<()> {
    let file = File::open(path)
        .with_context(|| format!("Failed to open remote-server tarball '{}'", path.display()))?;
    let mut archive = tar::Archive::new(GzDecoder::new(file));
    let mut entry_count: usize = 0;

    for entry in archive
        .entries()
        .with_context(|| format!("Failed to read tar entries from '{}'", path.display()))?
    {
        let mut entry =
            entry.with_context(|| format!("Failed to read tar entry from '{}'", path.display()))?;
        // Reading the body to the end forces the decoder through this entry's data.
        std::io::copy(&mut entry, &mut std::io::sink())
            .with_context(|| format!("Failed to validate tar entry from '{}'", path.display()))?;
        entry_count += 1;
    }

    anyhow::ensure!(
        entry_count > 0,
        "Remote-server tarball '{}' contained no entries",
        path.display()
    );

    // The tar reader stops at the end-of-archive marker, which normally comes
    // before the end of the compressed stream. Read whatever is left so the
    // decoder reaches the gzip trailer and can flag truncation or a checksum
    // mismatch.
    let mut decoder = archive.into_inner();
    std::io::copy(&mut decoder, &mut std::io::sink())
        .with_context(|| format!("Failed to validate gzip stream of '{}'", path.display()))?;

    Ok(())
}

/// Returns a local tarball for the remote platform.
Expand All @@ -127,25 +159,36 @@ async fn is_valid_cached_tarball(path: &Path) -> bool {
/// tarball into the cache and returns the newly cached path.
async fn cached_remote_server_tarball(platform: &RemotePlatform) -> anyhow::Result<PathBuf> {
// Resolve the cache location and the download URL for this platform, then
// let `cached_remote_server_tarball_from` decide between reuse and download.
let cache_path = remote_server_tarball_cache_path(platform);
// NOTE(review): the `if` on the next line is never closed and repeats the check
// that `cached_remote_server_tarball_from` performs. It looks like a pre-change
// line captured from a diff view — confirm and drop it.
if is_valid_cached_tarball(&cache_path).await {
let url = remote_server::setup::download_tarball_url(platform);
cached_remote_server_tarball_from(&url, &cache_path).await
}

/// Returns `cache_path` once it holds a valid tarball, downloading from `url`
/// when the existing cache entry is missing or fails validation.
///
/// NOTE(review): several statements below appear twice in slightly different
/// forms (`return Ok(cache_path)` next to `return Ok(cache_path.to_path_buf())`,
/// `&cache_path` next to `cache_path`, `&url` next to `url`), and one line
/// references `platform`, which is not a parameter here. These look like
/// pre-change lines captured from a diff view — confirm which of each pair is live.
async fn cached_remote_server_tarball_from(
url: &str,
cache_path: &Path,
) -> anyhow::Result<PathBuf> {
// Cache hit: the existing file passed validation, so no download is needed.
if is_valid_cached_tarball(cache_path).await {
log::info!(
"Using cached remote-server tarball at {}",
cache_path.display()
);
return Ok(cache_path);
return Ok(cache_path.to_path_buf());
}

// Something exists at the cache path but failed validation above; remove it
// (best effort, the removal result is ignored) before downloading again.
if async_fs::metadata(&cache_path).await.is_ok() {
let _ = async_fs::remove_file(&cache_path).await;
if async_fs::metadata(cache_path).await.is_ok() {
log::warn!(
"Discarding invalid cached remote-server tarball at {}",
cache_path.display()
);
let _ = async_fs::remove_file(cache_path).await;
}

let url = remote_server::setup::download_tarball_url(platform);
log::info!(
"Downloading remote-server tarball from {url} into cache at {}",
cache_path.display()
);
download_remote_server_tarball_to_cache(&url, &cache_path).await?;
Ok(cache_path)
download_remote_server_tarball_to_cache(url, cache_path).await?;
Ok(cache_path.to_path_buf())
}

async fn download_remote_server_tarball_to_cache(
Expand All @@ -161,65 +204,32 @@ async fn download_remote_server_tarball_to_cache(
parent.display()
)
})?;
let temp_dir = remote_server_tarball_cache_temp_dir();
let temp_dir = parent.join(".tmp");
async_fs::create_dir_all(&temp_dir).await.with_context(|| {
format!(
"Failed to create remote-server tarball cache temp directory '{}'",
temp_dir.display()
)
})?;

// Download into a unique temp path first so a failed or partial download
// never appears at the shared cache path that other installs may reuse.
let temp_path = temp_dir.join(format!(
".{REMOTE_SERVER_TARBALL_CACHE_FILE_NAME}.{}.tmp",
uuid::Uuid::new_v4()
));

if let Err(e) = download_remote_server_tarball_with_retries(url, &temp_path).await {
let _ = async_fs::remove_file(&temp_path).await;
return Err(e);
}
if !is_valid_cached_tarball(&temp_path).await {
let _ = async_fs::remove_file(&temp_path).await;
anyhow::bail!("Downloaded remote-server tarball from {url} was empty");
}

if is_valid_cached_tarball(cache_path).await {
let _ = async_fs::remove_file(&temp_path).await;
return Ok(());
}

// Publish the validated temp file to the shared cache path. If another
// concurrent fallback populated the cache after the check above, that valid
// cache hit is good enough for this install, so discard our temp file.
match async_fs::rename(&temp_path, cache_path).await {
Ok(()) => Ok(()),
Err(e) if is_valid_cached_tarball(cache_path).await => {
let _ = async_fs::remove_file(&temp_path).await;
Ok(())
}
Err(e) => {
let _ = async_fs::remove_file(&temp_path).await;
Err(e).with_context(|| {
format!(
"Failed to move remote-server tarball into cache at '{}'",
cache_path.display()
)
})
}
}
}

async fn download_remote_server_tarball_with_retries(
url: &str,
temp_path: &Path,
) -> anyhow::Result<()> {
let http_client = http_client::Client::new();
let mut last_retryable_error = None;

for attempt in 1..=REMOTE_SERVER_TARBALL_DOWNLOAD_ATTEMPTS {
match download_remote_server_tarball_internal(&http_client, url, temp_path).await {
// Download into a fresh unique temp path for every attempt so a failed
// or partial response body can never be reused by a later retry.
let temp_path = temp_dir.join(format!(
".{REMOTE_SERVER_TARBALL_CACHE_FILE_NAME}.{}.tmp",
uuid::Uuid::new_v4()
));

let attempt_result =
download_remote_server_tarball_attempt(&http_client, url, cache_path, &temp_path).await;
if attempt_result.is_err() {
let _ = async_fs::remove_file(&temp_path).await;
}

match attempt_result {
Ok(()) => return Ok(()),
Err(DownloadAttemptError::Permanent(e)) => return Err(e),
Err(DownloadAttemptError::Retryable(e)) => {
Expand All @@ -232,15 +242,60 @@ async fn download_remote_server_tarball_with_retries(
}
}

Err(last_retryable_error.unwrap_or_else(|| {
anyhow::anyhow!("Remote-server tarball download failed without an error")
}))
Err(last_retryable_error
.unwrap_or_else(|| anyhow::anyhow!("Remote-server tarball download failed without an error"))
.context(format!(
"Remote-server tarball client download failed after {REMOTE_SERVER_TARBALL_DOWNLOAD_ATTEMPTS} attempts"
)))
}

/// Runs one download attempt: fetch `url` into `temp_path`, validate the
/// result, and publish it to `cache_path`.
///
/// A download that does not validate as a tarball is reported as
/// `DownloadAttemptError::Retryable`; errors from the download and publish
/// steps are passed through unchanged.
async fn download_remote_server_tarball_attempt(
    http_client: &http_client::Client,
    url: &str,
    cache_path: &Path,
    temp_path: &Path,
) -> Result<(), DownloadAttemptError> {
    download_remote_server_tarball_internal(http_client, url, temp_path).await?;

    if is_valid_cached_tarball(temp_path).await {
        return publish_remote_server_tarball_cache(temp_path, cache_path).await;
    }

    let invalid_archive = anyhow::anyhow!(
        "Downloaded remote-server tarball from {url} was not a valid gzip/tar archive"
    );
    Err(DownloadAttemptError::Retryable(invalid_archive))
}

/// Outcome classification for a single tarball download attempt.
enum DownloadAttemptError {
/// Failure after which another attempt may be made.
/// NOTE(review): the retry loop's handling of this variant is only partly
/// visible here — confirm.
Retryable(anyhow::Error),
/// Failure that the retry loop returns to its caller immediately.
Permanent(anyhow::Error),
}
/// Moves the validated tarball at `temp_path` into the shared `cache_path`.
///
/// Succeeds without moving anything when `cache_path` already holds a valid
/// tarball, either before the rename is tried or after the rename fails. In
/// both of those cases, and on failure, the temp file is removed on a
/// best-effort basis. A rename failure that leaves no valid cache entry is
/// reported as `DownloadAttemptError::Permanent`.
async fn publish_remote_server_tarball_cache(
    temp_path: &Path,
    cache_path: &Path,
) -> Result<(), DownloadAttemptError> {
    // A valid tarball is already cached, so ours is redundant.
    if is_valid_cached_tarball(cache_path).await {
        let _ = async_fs::remove_file(temp_path).await;
        return Ok(());
    }

    let Err(e) = async_fs::rename(temp_path, cache_path).await else {
        return Ok(());
    };

    // The rename failed. A concurrent fallback may have filled the cache since
    // the check above; if so, that entry is good enough for this install.
    // Either way the temp file is no longer needed.
    let cache_is_usable = is_valid_cached_tarball(cache_path).await;
    let _ = async_fs::remove_file(temp_path).await;
    if cache_is_usable {
        return Ok(());
    }

    Err(DownloadAttemptError::Permanent(anyhow::anyhow!(
        "Failed to move remote-server tarball into cache at '{}': {e}",
        cache_path.display()
    )))
}

async fn download_remote_server_tarball_internal(
http_client: &http_client::Client,
Expand Down Expand Up @@ -305,3 +360,7 @@ fn is_retryable_download_status(status: StatusCode) -> bool {
StatusCode::REQUEST_TIMEOUT | StatusCode::TOO_MANY_REQUESTS
) || status.is_server_error()
}

#[cfg(test)]
#[path = "scp_fallback_tests.rs"]
mod tests;
Loading