diff --git a/Cargo.lock b/Cargo.lock index 9d8247e5..7a0215c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2805,6 +2805,7 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" name = "openshell-bootstrap" version = "0.0.0" dependencies = [ + "async-stream", "base64 0.22.1", "bollard", "bytes", diff --git a/crates/openshell-bootstrap/Cargo.toml b/crates/openshell-bootstrap/Cargo.toml index ab57ad57..942ffc48 100644 --- a/crates/openshell-bootstrap/Cargo.toml +++ b/crates/openshell-bootstrap/Cargo.toml @@ -11,6 +11,7 @@ rust-version.workspace = true [dependencies] openshell-core = { path = "../openshell-core" } +async-stream = "0.3" base64 = "0.22" bollard = { version = "0.20", features = ["ssh"] } bytes = { workspace = true } @@ -20,11 +21,11 @@ rcgen = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tar = "0.4" +tempfile = "3" tokio = { workspace = true } tracing = { workspace = true } [dev-dependencies] -tempfile = "3" [lints] workspace = true diff --git a/crates/openshell-bootstrap/src/push.rs b/crates/openshell-bootstrap/src/push.rs index 0dcbaa6d..336d46c3 100644 --- a/crates/openshell-bootstrap/src/push.rs +++ b/crates/openshell-bootstrap/src/push.rs @@ -8,15 +8,23 @@ //! uploaded into the gateway container as a tar file via the Docker //! `put_archive` API, and then imported into containerd via `ctr images import`. //! +//! To avoid unbounded memory usage with large images, the export is streamed +//! to a temporary file on disk, then streamed back through a tar wrapper into +//! the Docker upload API. Peak memory usage is `O(chunk_size)` regardless of +//! image size. +//! //! The standalone `ctr` binary is used (not `k3s ctr` which may not work in //! all k3s versions) with the k3s containerd socket. The default containerd //! namespace in k3s is already `k8s.io`, which is what kubelet uses. 
+use std::pin::Pin; + use bollard::Docker; use bollard::query_parameters::UploadToContainerOptionsBuilder; use bytes::Bytes; -use futures::StreamExt; +use futures::{Stream, StreamExt}; use miette::{IntoDiagnostic, Result, WrapErr}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use crate::runtime::exec_capture_with_exit; @@ -26,11 +34,19 @@ const CONTAINERD_SOCK: &str = "/run/k3s/containerd/containerd.sock"; /// Path inside the container where the image tar is staged. const IMPORT_TAR_PATH: &str = "/tmp/openshell-images.tar"; +/// Size of chunks read from the temp file during streaming upload (8 MiB). +const UPLOAD_CHUNK_SIZE: usize = 8 * 1024 * 1024; + +/// Report export progress every N bytes (100 MiB). +const PROGRESS_INTERVAL_BYTES: u64 = 100 * 1024 * 1024; + /// Push a list of images from the local Docker daemon into a k3s gateway's /// containerd runtime. /// /// All images are exported as a single tar (shared layers are deduplicated), -/// uploaded to the container filesystem, and imported into containerd. +/// streamed to a temporary file, then uploaded to the container filesystem +/// and imported into containerd. Memory usage is bounded to `O(chunk_size)` +/// regardless of image size. pub async fn push_local_images( local_docker: &Docker, gateway_docker: &Docker, @@ -42,17 +58,30 @@ pub async fn push_local_images( return Ok(()); } - // 1. Export all images from the local Docker daemon as a single tar. - let image_tar = collect_export(local_docker, images).await?; + // 1. Export all images from the local Docker daemon to a temp file. + let (tmp_file, file_size) = export_to_tempfile(local_docker, images, on_log).await?; on_log(format!( "[progress] Exported {} MiB", - image_tar.len() / (1024 * 1024) + file_size / (1024 * 1024) )); - // 2. Wrap the image tar as a file inside an outer tar archive and upload - // it into the container filesystem via the Docker put_archive API. 
- let outer_tar = wrap_in_tar(IMPORT_TAR_PATH, &image_tar)?; - upload_archive(gateway_docker, container_name, &outer_tar).await?; + // 2. Stream the image tar wrapped in an outer tar archive into the + // container filesystem via the Docker put_archive API. + let parent_dir = IMPORT_TAR_PATH.rsplit_once('/').map_or("/", |(dir, _)| dir); + let options = UploadToContainerOptionsBuilder::default() + .path(parent_dir) + .build(); + + let upload_stream = streaming_tar_upload(IMPORT_TAR_PATH, tmp_file, file_size); + gateway_docker + .upload_to_container( + container_name, + Some(options), + bollard::body_try_stream(upload_stream), + ) + .await + .into_diagnostic() + .wrap_err("failed to upload image tar into container")?; on_log("[progress] Uploaded to gateway".to_string()); // 3. Import the tar into containerd via ctr. @@ -93,59 +122,115 @@ Ok(()) } -/// Collect the full export tar from `docker.export_images()` into memory. -async fn collect_export(docker: &Docker, images: &[&str]) -> Result<Vec<u8>> { +/// Stream the Docker image export directly to a temporary file. +/// +/// Returns the temp file handle and the total number of bytes written. +/// Memory usage is `O(chunk_size)` — only one chunk is held at a time. +/// Progress is reported every [`PROGRESS_INTERVAL_BYTES`] bytes. +async fn export_to_tempfile( + docker: &Docker, + images: &[&str], + on_log: &mut impl FnMut(String), +) -> Result<(tempfile::NamedTempFile, u64)> { + let tmp = tempfile::NamedTempFile::new() + .into_diagnostic() + .wrap_err("failed to create temp file for image export")?; + + // Open a second handle for async writing; the NamedTempFile retains + // ownership and ensures cleanup on drop. 
+ let std_file = tmp + .reopen() + .into_diagnostic() + .wrap_err("failed to reopen temp file for writing")?; + let mut async_file = tokio::fs::File::from_std(std_file); + let mut stream = docker.export_images(images); - let mut buf = Vec::new(); + let mut total_bytes: u64 = 0; + let mut last_reported: u64 = 0; + while let Some(chunk) = stream.next().await { let bytes = chunk .into_diagnostic() .wrap_err("failed to read image export stream")?; - buf.extend_from_slice(&bytes); + async_file + .write_all(&bytes) + .await + .into_diagnostic() + .wrap_err("failed to write image data to temp file")?; + total_bytes += bytes.len() as u64; + + // Report progress periodically. + if total_bytes >= last_reported + PROGRESS_INTERVAL_BYTES { + let mb = total_bytes / (1024 * 1024); + on_log(format!("[progress] Exported {mb} MiB")); + last_reported = total_bytes; + } } - Ok(buf) -} - -/// Wrap raw bytes as a single file inside a tar archive. -/// -/// The Docker `put_archive` API expects a tar that is extracted at a target -/// directory. We create a tar containing one entry whose name is the basename -/// of `file_path`, and upload it to the parent directory. -fn wrap_in_tar(file_path: &str, data: &[u8]) -> Result<Vec<u8>> { - let file_name = file_path.rsplit('/').next().unwrap_or(file_path); - - let mut builder = tar::Builder::new(Vec::new()); - let mut header = tar::Header::new_gnu(); - header.set_path(file_name).into_diagnostic()?; - header.set_size(data.len() as u64); - header.set_mode(0o644); - header.set_cksum(); - builder - .append(&header, data) - .into_diagnostic() - .wrap_err("failed to build tar archive for image upload")?; - builder - .into_inner() + async_file + .flush() + .await .into_diagnostic() - .wrap_err("failed to finalize tar archive") -} - -/// Upload a tar archive into the container at the parent directory of -/// [`IMPORT_TAR_PATH`]. 
-async fn upload_archive(docker: &Docker, container_name: &str, archive: &[u8]) -> Result<()> { - let parent_dir = IMPORT_TAR_PATH.rsplit_once('/').map_or("/", |(dir, _)| dir); + .wrap_err("failed to flush temp file")?; - let options = UploadToContainerOptionsBuilder::default() - .path(parent_dir) - .build(); + Ok((tmp, total_bytes)) +} - docker - .upload_to_container( - container_name, - Some(options), - bollard::body_full(Bytes::copy_from_slice(archive)), - ) - .await - .into_diagnostic() - .wrap_err("failed to upload image tar into container") +/// Create a stream that yields an outer tar archive containing the image tar +/// as a single entry, reading the image data from the temp file in chunks. +/// +/// The Docker `put_archive` API expects a tar that is extracted at a target +/// directory. We construct a tar with one entry whose name is the basename +/// of `file_path`. The stream yields: +/// 1. A 512-byte GNU tar header +/// 2. The file content in [`UPLOAD_CHUNK_SIZE`] chunks +/// 3. Padding to a 512-byte boundary + two 512-byte zero EOF blocks +/// +/// Memory usage is O([`UPLOAD_CHUNK_SIZE`]) regardless of file size. +fn streaming_tar_upload( + file_path: &str, + tmp_file: tempfile::NamedTempFile, + file_size: u64, +) -> Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>> { + let file_name = file_path + .rsplit('/') + .next() + .unwrap_or(file_path) + .to_string(); + + Box::pin(async_stream::try_stream! { + // 1. Build and yield the tar header. + let mut header = tar::Header::new_gnu(); + header.set_path(&file_name)?; + header.set_size(file_size); + header.set_mode(0o644); + header.set_cksum(); + yield Bytes::copy_from_slice(header.as_bytes()); + + // 2. Stream the temp file content in chunks. + let std_file = tmp_file.reopen()?; + let mut async_file = tokio::fs::File::from_std(std_file); + let mut buf = vec![0u8; UPLOAD_CHUNK_SIZE]; + loop { + let n = async_file.read(&mut buf).await?; + if n == 0 { + break; + } + yield Bytes::copy_from_slice(&buf[..n]); + } + + // 3. 
Yield tar padding and EOF blocks. + // Tar entries must be padded to a 512-byte boundary, followed by + // two 512-byte zero blocks to signal end-of-archive. + let padding_len = if file_size.is_multiple_of(512) { + 0 + } else { + 512 - (file_size % 512) as usize + }; + let footer = vec![0u8; padding_len + 1024]; + yield Bytes::from(footer); + + // The NamedTempFile is dropped here, cleaning up the temp file. + drop(tmp_file); + }) }