From e7dbeaf2a0d108c4216ae5d4afd084c5c826b7eb Mon Sep 17 00:00:00 2001 From: Drew Newberry Date: Tue, 31 Mar 2026 08:42:43 -0700 Subject: [PATCH] fix(bootstrap): stream image push through temp file to prevent OOM The image push pipeline buffered the entire Docker image tar 3x in memory (export, tar wrap, Bytes copy), causing OOM kills for images over ~1-2 GB. Replace the in-memory pipeline with a temp-file + streaming upload: export to a NamedTempFile, then stream the outer tar (header, 8 MiB file chunks, footer) directly into upload_to_container via body_try_stream. Peak memory drops from ~3x image size to ~8 MiB constant. Also adds incremental export progress reporting every 100 MiB. --- Cargo.lock | 1 + crates/openshell-bootstrap/Cargo.toml | 3 +- crates/openshell-bootstrap/src/push.rs | 193 ++++++++++++++++++------- 3 files changed, 142 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9d8247e5..7a0215c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2805,6 +2805,7 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" name = "openshell-bootstrap" version = "0.0.0" dependencies = [ + "async-stream", "base64 0.22.1", "bollard", "bytes", diff --git a/crates/openshell-bootstrap/Cargo.toml b/crates/openshell-bootstrap/Cargo.toml index ab57ad57..942ffc48 100644 --- a/crates/openshell-bootstrap/Cargo.toml +++ b/crates/openshell-bootstrap/Cargo.toml @@ -11,6 +11,7 @@ rust-version.workspace = true [dependencies] openshell-core = { path = "../openshell-core" } +async-stream = "0.3" base64 = "0.22" bollard = { version = "0.20", features = ["ssh"] } bytes = { workspace = true } @@ -20,11 +21,11 @@ rcgen = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tar = "0.4" +tempfile = "3" tokio = { workspace = true } tracing = { workspace = true } [dev-dependencies] -tempfile = "3" [lints] workspace = true diff --git a/crates/openshell-bootstrap/src/push.rs b/crates/openshell-bootstrap/src/push.rs 
index 0dcbaa6d..336d46c3 100644 --- a/crates/openshell-bootstrap/src/push.rs +++ b/crates/openshell-bootstrap/src/push.rs @@ -8,15 +8,23 @@ //! uploaded into the gateway container as a tar file via the Docker //! `put_archive` API, and then imported into containerd via `ctr images import`. //! +//! To avoid unbounded memory usage with large images, the export is streamed +//! to a temporary file on disk, then streamed back through a tar wrapper into +//! the Docker upload API. Peak memory usage is `O(chunk_size)` regardless of +//! image size. +//! //! The standalone `ctr` binary is used (not `k3s ctr` which may not work in //! all k3s versions) with the k3s containerd socket. The default containerd //! namespace in k3s is already `k8s.io`, which is what kubelet uses. +use std::pin::Pin; + use bollard::Docker; use bollard::query_parameters::UploadToContainerOptionsBuilder; use bytes::Bytes; -use futures::StreamExt; +use futures::{Stream, StreamExt}; use miette::{IntoDiagnostic, Result, WrapErr}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use crate::runtime::exec_capture_with_exit; @@ -26,11 +34,19 @@ const CONTAINERD_SOCK: &str = "/run/k3s/containerd/containerd.sock"; /// Path inside the container where the image tar is staged. const IMPORT_TAR_PATH: &str = "/tmp/openshell-images.tar"; +/// Size of chunks read from the temp file during streaming upload (8 MiB). +const UPLOAD_CHUNK_SIZE: usize = 8 * 1024 * 1024; + +/// Report export progress every N bytes (100 MiB). +const PROGRESS_INTERVAL_BYTES: u64 = 100 * 1024 * 1024; + /// Push a list of images from the local Docker daemon into a k3s gateway's /// containerd runtime. /// /// All images are exported as a single tar (shared layers are deduplicated), -/// uploaded to the container filesystem, and imported into containerd. +/// streamed to a temporary file, then uploaded to the container filesystem +/// and imported into containerd. Memory usage is bounded to `O(chunk_size)` +/// regardless of image size. 
pub async fn push_local_images( local_docker: &Docker, gateway_docker: &Docker, @@ -42,17 +58,30 @@ return Ok(()); } - // 1. Export all images from the local Docker daemon as a single tar. - let image_tar = collect_export(local_docker, images).await?; + // 1. Export all images from the local Docker daemon to a temp file. + let (tmp_file, file_size) = export_to_tempfile(local_docker, images, on_log).await?; on_log(format!( "[progress] Exported {} MiB", - image_tar.len() / (1024 * 1024) + file_size / (1024 * 1024) )); - // 2. Wrap the image tar as a file inside an outer tar archive and upload - // it into the container filesystem via the Docker put_archive API. - let outer_tar = wrap_in_tar(IMPORT_TAR_PATH, &image_tar)?; - upload_archive(gateway_docker, container_name, &outer_tar).await?; + // 2. Stream the image tar wrapped in an outer tar archive into the + // container filesystem via the Docker put_archive API. + let parent_dir = IMPORT_TAR_PATH.rsplit_once('/').map_or("/", |(dir, _)| dir); + let options = UploadToContainerOptionsBuilder::default() + .path(parent_dir) + .build(); + + let upload_stream = streaming_tar_upload(IMPORT_TAR_PATH, tmp_file, file_size); + gateway_docker + .upload_to_container( + container_name, + Some(options), + bollard::body_try_stream(upload_stream), + ) + .await + .into_diagnostic() + .wrap_err("failed to upload image tar into container")?; on_log("[progress] Uploaded to gateway".to_string()); // 3. Import the tar into containerd via ctr. @@ -93,59 +122,115 @@ Ok(()) } -/// Collect the full export tar from `docker.export_images()` into memory. -async fn collect_export(docker: &Docker, images: &[&str]) -> Result<Vec<u8>> { +/// Stream the Docker image export directly to a temporary file. +/// +/// Returns the temp file handle and the total number of bytes written. +/// Memory usage is `O(chunk_size)` — only one chunk is held at a time.
+/// Progress is reported every [`PROGRESS_INTERVAL_BYTES`] bytes. +async fn export_to_tempfile( + docker: &Docker, + images: &[&str], + on_log: &mut impl FnMut(String), +) -> Result<(tempfile::NamedTempFile, u64)> { + let tmp = tempfile::NamedTempFile::new() + .into_diagnostic() + .wrap_err("failed to create temp file for image export")?; + + // Open a second handle for async writing; the NamedTempFile retains + // ownership and ensures cleanup on drop. + let std_file = tmp + .reopen() + .into_diagnostic() + .wrap_err("failed to reopen temp file for writing")?; + let mut async_file = tokio::fs::File::from_std(std_file); + let mut stream = docker.export_images(images); - let mut buf = Vec::new(); + let mut total_bytes: u64 = 0; + let mut last_reported: u64 = 0; + while let Some(chunk) = stream.next().await { let bytes = chunk .into_diagnostic() .wrap_err("failed to read image export stream")?; - buf.extend_from_slice(&bytes); + async_file + .write_all(&bytes) + .await + .into_diagnostic() + .wrap_err("failed to write image data to temp file")?; + total_bytes += bytes.len() as u64; + + // Report progress periodically. + if total_bytes >= last_reported + PROGRESS_INTERVAL_BYTES { + let mb = total_bytes / (1024 * 1024); + on_log(format!("[progress] Exported {mb} MiB")); + last_reported = total_bytes; + } } - Ok(buf) -} -/// Wrap raw bytes as a single file inside a tar archive. -/// -/// The Docker `put_archive` API expects a tar that is extracted at a target -/// directory. We create a tar containing one entry whose name is the basename -/// of `file_path`, and upload it to the parent directory. 
-fn wrap_in_tar(file_path: &str, data: &[u8]) -> Result<Vec<u8>> { - let file_name = file_path.rsplit('/').next().unwrap_or(file_path); - - let mut builder = tar::Builder::new(Vec::new()); - let mut header = tar::Header::new_gnu(); - header.set_path(file_name).into_diagnostic()?; - header.set_size(data.len() as u64); - header.set_mode(0o644); - header.set_cksum(); - builder - .append(&header, data) - .into_diagnostic() - .wrap_err("failed to build tar archive for image upload")?; - builder - .into_inner() + async_file + .flush() + .await .into_diagnostic() - .wrap_err("failed to finalize tar archive") -} - -/// Upload a tar archive into the container at the parent directory of -/// [`IMPORT_TAR_PATH`]. -async fn upload_archive(docker: &Docker, container_name: &str, archive: &[u8]) -> Result<()> { - let parent_dir = IMPORT_TAR_PATH.rsplit_once('/').map_or("/", |(dir, _)| dir); + .wrap_err("failed to flush temp file")?; - let options = UploadToContainerOptionsBuilder::default() - .path(parent_dir) - .build(); + Ok((tmp, total_bytes)) +} - docker - .upload_to_container( - container_name, - Some(options), - bollard::body_full(Bytes::copy_from_slice(archive)), - ) - .await - .into_diagnostic() - .wrap_err("failed to upload image tar into container") +/// Create a stream that yields an outer tar archive containing the image tar +/// as a single entry, reading the image data from the temp file in chunks. +/// +/// The Docker `put_archive` API expects a tar that is extracted at a target +/// directory. We construct a tar with one entry whose name is the basename +/// of `file_path`. The stream yields: +/// 1. A 512-byte GNU tar header +/// 2. The file content in [`UPLOAD_CHUNK_SIZE`] chunks +/// 3. Padding to a 512-byte boundary + two 512-byte zero EOF blocks +/// +/// Memory usage is O([`UPLOAD_CHUNK_SIZE`]) regardless of file size.
+fn streaming_tar_upload( + file_path: &str, + tmp_file: tempfile::NamedTempFile, + file_size: u64, +) -> Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>> { + let file_name = file_path + .rsplit('/') + .next() + .unwrap_or(file_path) + .to_string(); + + Box::pin(async_stream::try_stream! { + // 1. Build and yield the tar header. + let mut header = tar::Header::new_gnu(); + header.set_path(&file_name)?; + header.set_size(file_size); + header.set_mode(0o644); + header.set_cksum(); + yield Bytes::copy_from_slice(header.as_bytes()); + + // 2. Stream the temp file content in chunks. + let std_file = tmp_file.reopen()?; + let mut async_file = tokio::fs::File::from_std(std_file); + let mut buf = vec![0u8; UPLOAD_CHUNK_SIZE]; + loop { + let n = async_file.read(&mut buf).await?; + if n == 0 { + break; + } + yield Bytes::copy_from_slice(&buf[..n]); + } + + // 3. Yield tar padding and EOF blocks. + // Tar entries must be padded to a 512-byte boundary, followed by + // two 512-byte zero blocks to signal end-of-archive. + let padding_len = if file_size.is_multiple_of(512) { + 0 + } else { + 512 - (file_size % 512) as usize + }; + let footer = vec![0u8; padding_len + 1024]; + yield Bytes::from(footer); + + // The NamedTempFile is dropped here, cleaning up the temp file. + drop(tmp_file); + }) }