diff --git a/crates/core/src/archiver.rs b/crates/core/src/archiver.rs index 33e0b1e3a..5e5751328 100644 --- a/crates/core/src/archiver.rs +++ b/crates/core/src/archiver.rs @@ -40,16 +40,16 @@ pub struct TreeStackEmptyError; #[allow(clippy::struct_field_names)] pub struct Archiver<'a, BE: DecryptFullBackend, I: ReadGlobalIndex> { /// The `FileArchiver` is responsible for archiving files. - file_archiver: FileArchiver<'a, BE, I>, + file_archiver: FileArchiver<'a, I>, /// The `TreeArchiver` is responsible for archiving trees. - tree_archiver: TreeArchiver<'a, BE, I>, + tree_archiver: TreeArchiver<'a, I>, /// The parent snapshot to use. parent: Parent, /// The `SharedIndexer` is used to index the data. - indexer: SharedIndexer, + indexer: SharedIndexer, /// The backend to write to. be: BE, @@ -83,7 +83,7 @@ impl<'a, BE: DecryptFullBackend, I: ReadGlobalIndex> Archiver<'a, BE, I> { parent: Parent, mut snap: SnapshotFile, ) -> RusticResult { - let indexer = Indexer::new(be.clone()).into_shared(); + let indexer = Indexer::new().into_shared(); let mut summary = snap.summary.take().unwrap_or_default(); summary.backup_start = Local::now(); @@ -217,7 +217,8 @@ impl<'a, BE: DecryptFullBackend, I: ReadGlobalIndex> Archiver<'a, BE, I> { stats.apply(&mut summary, BlobType::Data); self.snap.tree = id; - self.indexer.write().unwrap().finalize()?; + self.indexer + .finalize_and_check_save(|file| self.be.save_file_no_id(file))?; summary.finalize(self.snap.time).map_err(|err| { RusticError::with_source( diff --git a/crates/core/src/archiver/file_archiver.rs b/crates/core/src/archiver/file_archiver.rs index 2ae7a0bbf..8e3bbf851 100644 --- a/crates/core/src/archiver/file_archiver.rs +++ b/crates/core/src/archiver/file_archiver.rs @@ -13,10 +13,7 @@ use crate::{ decrypt::DecryptWriteBackend, node::{Node, NodeType}, }, - blob::{ - BlobId, BlobType, DataId, - packer::{Packer, PackerStats}, - }, + blob::{BlobId, BlobType, DataId, packer::PackerStats, repopacker::RepositoryPacker}, 
chunker::ChunkIter, crypto::hasher::hash, error::{ErrorKind, RusticError, RusticResult}, @@ -30,16 +27,15 @@ use crate::{ /// /// # Type Parameters /// -/// * `BE` - The backend type. /// * `I` - The index to read from. #[derive(Clone)] -pub(crate) struct FileArchiver<'a, BE: DecryptWriteBackend, I: ReadGlobalIndex> { +pub(crate) struct FileArchiver<'a, I: ReadGlobalIndex> { index: &'a I, - data_packer: Packer, + data_packer: RepositoryPacker, rabin: Rabin64, } -impl<'a, BE: DecryptWriteBackend, I: ReadGlobalIndex> FileArchiver<'a, BE, I> { +impl<'a, I: ReadGlobalIndex> FileArchiver<'a, I> { /// Creates a new `FileArchiver`. /// /// # Type Parameters @@ -53,20 +49,15 @@ impl<'a, BE: DecryptWriteBackend, I: ReadGlobalIndex> FileArchiver<'a, BE, I> { /// * `index` - The index to read from. /// * `indexer` - The indexer to write to. /// * `config` - The config file. - /// - /// # Errors - /// - /// * If sending the message to the raw packer fails. - /// * If converting the data length to u64 fails - pub(crate) fn new( + pub(crate) fn new( be: BE, index: &'a I, - indexer: SharedIndexer, + indexer: SharedIndexer, config: &ConfigFile, ) -> RusticResult { let poly = config.poly()?; - let data_packer = Packer::new( + let data_packer = RepositoryPacker::new_with_default_sizer( be, BlobType::Data, indexer, @@ -144,7 +135,7 @@ impl<'a, BE: DecryptWriteBackend, I: ReadGlobalIndex> FileArchiver<'a, BE, I> { // TODO: add documentation! 
fn backup_reader( &self, - r: impl Read + Send + 'static, + r: impl Read, node: Node, p: &impl Progress, ) -> RusticResult<(Node, u64)> { diff --git a/crates/core/src/archiver/tree_archiver.rs b/crates/core/src/archiver/tree_archiver.rs index a5edbc72c..3c55ffa20 100644 --- a/crates/core/src/archiver/tree_archiver.rs +++ b/crates/core/src/archiver/tree_archiver.rs @@ -8,7 +8,7 @@ use crate::{ backend::{decrypt::DecryptWriteBackend, node::Node}, blob::{ BlobType, - packer::Packer, + repopacker::RepositoryPacker, tree::{Tree, TreeId}, }, error::{ErrorKind, RusticError, RusticResult}, @@ -22,11 +22,8 @@ pub(crate) type TreeItem = TreeType<(ParentResult<()>, u64), ParentResult { +pub(crate) struct TreeArchiver<'a, I: ReadGlobalIndex> { /// The current tree. tree: Tree, /// The stack of trees. @@ -34,12 +31,12 @@ pub(crate) struct TreeArchiver<'a, BE: DecryptWriteBackend, I: ReadGlobalIndex> /// The index to read from. index: &'a I, /// The packer to write to. - tree_packer: Packer, + tree_packer: RepositoryPacker, /// The summary of the snapshot. summary: SnapshotSummary, } -impl<'a, BE: DecryptWriteBackend, I: ReadGlobalIndex> TreeArchiver<'a, BE, I> { +impl<'a, I: ReadGlobalIndex> TreeArchiver<'a, I> { /// Creates a new `TreeArchiver`. /// /// # Type Parameters @@ -54,19 +51,14 @@ impl<'a, BE: DecryptWriteBackend, I: ReadGlobalIndex> TreeArchiver<'a, BE, I> { /// * `indexer` - The indexer to write to. /// * `config` - The config file. /// * `summary` - The summary of the snapshot. - /// - /// # Errors - /// - /// * If sending the message to the raw packer fails. 
- /// * If converting the data length to u64 fails - pub(crate) fn new( + pub(crate) fn new( be: BE, index: &'a I, - indexer: SharedIndexer, + indexer: SharedIndexer, config: &ConfigFile, summary: SnapshotSummary, ) -> RusticResult { - let tree_packer = Packer::new( + let tree_packer = RepositoryPacker::new_with_default_sizer( be, BlobType::Tree, indexer, diff --git a/crates/core/src/backend.rs b/crates/core/src/backend.rs index 2179671b2..a08e17ed7 100644 --- a/crates/core/src/backend.rs +++ b/crates/core/src/backend.rs @@ -441,7 +441,7 @@ impl ReadSourceEntry { /// This trait is implemented by all backends that can read data and open from a source. pub trait ReadSourceOpen { /// The Reader used for this source - type Reader: Read + Send + 'static; + type Reader: Read; /// Opens the source. /// @@ -456,7 +456,7 @@ pub trait ReadSourceOpen { } /// blanket implementation for readers -impl ReadSourceOpen for T { +impl ReadSourceOpen for T { type Reader = T; fn open(self) -> RusticResult { Ok(self) diff --git a/crates/core/src/backend/decrypt.rs b/crates/core/src/backend/decrypt.rs index 1f206ba2a..48442bf4a 100644 --- a/crates/core/src/backend/decrypt.rs +++ b/crates/core/src/backend/decrypt.rs @@ -281,6 +281,16 @@ pub trait DecryptWriteBackend: WriteBackend + Clone + 'static { } } + /// Saves the given file without returning the id. + /// + /// # Arguments + /// + /// * `file` - The file to save. + fn save_file_no_id(&self, file: &F) -> RusticResult<()> { + let _ = self.save_file(file)?; + Ok(()) + } + /// Saves the given file uncompressed. 
/// /// # Arguments diff --git a/crates/core/src/blob.rs b/crates/core/src/blob.rs index 5b0a4c5e6..e9fbe68f5 100644 --- a/crates/core/src/blob.rs +++ b/crates/core/src/blob.rs @@ -1,4 +1,6 @@ +pub(crate) mod pack_sizer; pub(crate) mod packer; +pub(crate) mod repopacker; pub(crate) mod tree; use derive_more::Constructor; diff --git a/crates/core/src/blob/pack_sizer.rs b/crates/core/src/blob/pack_sizer.rs new file mode 100644 index 000000000..25f20ded8 --- /dev/null +++ b/crates/core/src/blob/pack_sizer.rs @@ -0,0 +1,131 @@ +use crate::{blob::BlobType, repofile::ConfigFile}; + +use integer_sqrt::IntegerSquareRoot; + +/// The pack sizer is responsible for computing the size of the pack file. +pub trait PackSizer { + /// Computes the size of the pack file. + #[must_use] + fn pack_size(&self) -> u32; + + /// Evaluates whether the given size is not too small or too large + /// + /// # Arguments + /// + /// * `size` - The size to check + #[must_use] + fn size_ok(&self, size: u32) -> bool { + !self.is_too_small(size) && !self.is_too_large(size) + } + + /// Evaluates whether the given size is too small + /// + /// # Arguments + /// + /// * `size` - The size to check + #[must_use] + fn is_too_small(&self, _size: u32) -> bool { + false + } + + /// Evaluates whether the given size is too large + /// + /// # Arguments + /// + /// * `size` - The size to check + #[must_use] + fn is_too_large(&self, _size: u32) -> bool { + false + } + + /// Adds the given size to the current size. + /// + /// # Arguments + /// + /// * `added` - The size to add + fn add_size(&mut self, _added: u32) {} +} + +/// The default pack sizer computes packs depending on a default size, a grow factor and a size limit. +#[derive(Debug, Clone, Copy)] +pub struct DefaultPackSizer { + /// The default size of a pack file. + default_size: u32, + /// The grow factor of a pack file. + grow_factor: u32, + /// The size limit of a pack file. + size_limit: u32, + /// The current size of a pack file. 
+ current_size: u64, + /// The minimum pack size tolerance in percent before a repack is triggered. + min_packsize_tolerate_percent: u32, + /// The maximum pack size tolerance in percent before a repack is triggered. + max_packsize_tolerate_percent: u32, +} + +impl DefaultPackSizer { + /// Creates a new `DefaultPackSizer` from a config file. + /// + /// # Arguments + /// + /// * `config` - The config file. + /// * `blob_type` - The blob type. + /// * `current_size` - The current size of the pack file. + /// + /// # Returns + /// + /// A new `DefaultPackSizer`. + #[must_use] + pub fn from_config(config: &ConfigFile, blob_type: BlobType, current_size: u64) -> Self { + let (default_size, grow_factor, size_limit) = config.packsize(blob_type); + let (min_packsize_tolerate_percent, max_packsize_tolerate_percent) = + config.packsize_ok_percents(); + Self { + default_size, + grow_factor, + size_limit, + current_size, + min_packsize_tolerate_percent, + max_packsize_tolerate_percent, + } + } +} + +impl PackSizer for DefaultPackSizer { + #[allow(clippy::cast_possible_truncation)] + fn pack_size(&self) -> u32 { + (self.current_size.integer_sqrt() as u32 * self.grow_factor + self.default_size) + .min(self.size_limit) + } + + fn is_too_small(&self, size: u32) -> bool { + let target_size = self.pack_size(); + // Note: we cast to u64 so that no overflow can occur in the multiplications + u64::from(size) * 100 + < u64::from(target_size) * u64::from(self.min_packsize_tolerate_percent) + } + + fn is_too_large(&self, size: u32) -> bool { + let target_size = self.pack_size(); + // Note: we cast to u64 so that no overflow can occur in the multiplications + u64::from(size) * 100 + > u64::from(target_size) * u64::from(self.max_packsize_tolerate_percent) + } + + fn add_size(&mut self, added: u32) { + self.current_size += u64::from(added); + } +} + +/// A pack sizer which uses a fixed pack size +#[derive(Debug, Clone, Copy)] +pub struct FixedPackSizer(pub u32); + +impl PackSizer for 
FixedPackSizer { + fn pack_size(&self) -> u32 { + self.0 + } + fn is_too_large(&self, size: u32) -> bool { + size > self.0 + } +} diff --git a/crates/core/src/blob/packer.rs b/crates/core/src/blob/packer.rs index 167303afa..b0e0b3a77 100644 --- a/crates/core/src/blob/packer.rs +++ b/crates/core/src/blob/packer.rs @@ -1,33 +1,25 @@ use std::{ num::NonZeroU32, - sync::{Arc, RwLock}, time::{Duration, SystemTime}, }; use bytes::{Bytes, BytesMut}; -use chrono::Local; -use crossbeam_channel::{Receiver, Sender, bounded}; -use integer_sqrt::IntegerSquareRoot; use log::warn; -use pariter::{IteratorExt, scope}; use crate::{ - backend::{ - FileType, - decrypt::{DecryptFullBackend, DecryptWriteBackend}, - }, blob::{BlobId, BlobType}, crypto::{CryptoKey, hasher::hash}, error::{ErrorKind, RusticError, RusticResult}, - index::indexer::SharedIndexer, repofile::{ - configfile::ConfigFile, - indexfile::{IndexBlob, IndexPack}, - packfile::{PackHeaderLength, PackHeaderRef, PackId}, + HeaderEntry, + indexfile::IndexPack, + packfile::{self, PackHeaderLength, PackHeaderRef}, snapshotfile::SnapshotSummary, }, }; +use super::pack_sizer::PackSizer; + /// [`PackerErrorKind`] describes the errors that can be returned for a Packer #[derive(thiserror::Error, Debug, displaydoc::Display)] #[non_exhaustive] @@ -38,22 +30,9 @@ pub enum PackerErrorKind { from: &'static str, source: std::num::TryFromIntError, }, - /// Sending crossbeam message failed: `size_limit`: `{size_limit:?}`, `id`: `{id:?}`, `data`: `{data:?}` : `{source}` - SendingCrossbeamMessage { - size_limit: Option, - id: BlobId, - data: Bytes, - source: crossbeam_channel::SendError<(Bytes, BlobId, Option)>, - }, - /// Sending crossbeam data message failed: `data`: `{data:?}`, `index_pack`: `{index_pack:?}` : `{source}` - SendingCrossbeamDataMessage { - data: Bytes, - index_pack: IndexPack, - source: crossbeam_channel::SendError<(Bytes, IndexPack)>, - }, } -pub(crate) type PackerResult = Result>; +pub(crate) type PackerResult = Result; 
pub(super) mod constants { use std::time::Duration; @@ -68,326 +47,8 @@ pub(super) mod constants { pub(super) const MAX_COUNT: u32 = 10_000; /// The maximum age of a pack pub(super) const MAX_AGE: Duration = Duration::from_secs(300); -} - -/// The pack sizer is responsible for computing the size of the pack file. -#[derive(Debug, Clone, Copy)] -pub struct PackSizer { - /// The default size of a pack file. - default_size: u32, - /// The grow factor of a pack file. - grow_factor: u32, - /// The size limit of a pack file. - size_limit: u32, - /// The current size of a pack file. - current_size: u64, - /// The minimum pack size tolerance in percent before a repack is triggered. - min_packsize_tolerate_percent: u32, - /// The maximum pack size tolerance in percent before a repack is triggered. - max_packsize_tolerate_percent: u32, -} - -impl PackSizer { - /// Creates a new `PackSizer` from a config file. - /// - /// # Arguments - /// - /// * `config` - The config file. - /// * `blob_type` - The blob type. - /// * `current_size` - The current size of the pack file. - /// - /// # Returns - /// - /// A new `PackSizer`. - #[must_use] - pub fn from_config(config: &ConfigFile, blob_type: BlobType, current_size: u64) -> Self { - let (default_size, grow_factor, size_limit) = config.packsize(blob_type); - let (min_packsize_tolerate_percent, max_packsize_tolerate_percent) = - config.packsize_ok_percents(); - Self { - default_size, - grow_factor, - size_limit, - current_size, - min_packsize_tolerate_percent, - max_packsize_tolerate_percent, - } - } - - /// Computes the size of the pack file. - #[must_use] - // The cast actually shouldn't pose any problems. - // `current_size` is `u64`, the maximum value is `2^64-1`. - // `isqrt(2^64-1) = 2^32-1` which fits into a `u32`. 
(@aawsome) - #[allow(clippy::cast_possible_truncation)] - pub fn pack_size(&self) -> u32 { - (self.current_size.integer_sqrt() as u32 * self.grow_factor + self.default_size) - .min(self.size_limit) - .min(constants::MAX_SIZE) - } - - /// Evaluates whether the given size is not too small or too large - /// - /// # Arguments - /// - /// * `size` - The size to check - #[must_use] - pub fn size_ok(&self, size: u32) -> bool { - !self.is_too_small(size) && !self.is_too_large(size) - } - - /// Evaluates whether the given size is too small - /// - /// # Arguments - /// - /// * `size` - The size to check - #[must_use] - pub fn is_too_small(&self, size: u32) -> bool { - let target_size = self.pack_size(); - // Note: we cast to u64 so that no overflow can occur in the multiplications - u64::from(size) * 100 - < u64::from(target_size) * u64::from(self.min_packsize_tolerate_percent) - } - - /// Evaluates whether the given size is too large - /// - /// # Arguments - /// - /// * `size` - The size to check - #[must_use] - pub fn is_too_large(&self, size: u32) -> bool { - let target_size = self.pack_size(); - // Note: we cast to u64 so that no overflow can occur in the multiplications - u64::from(size) * 100 - > u64::from(target_size) * u64::from(self.max_packsize_tolerate_percent) - } - - /// Adds the given size to the current size. - /// - /// # Arguments - /// - /// * `added` - The size to add - /// - /// # Panics - /// - /// * If the size is too large - fn add_size(&mut self, added: u32) { - self.current_size += u64::from(added); - } -} - -/// The `Packer` is responsible for packing blobs into pack files. -/// -/// # Type Parameters -/// -/// * `BE` - The backend type. -#[allow(missing_debug_implementations)] -#[allow(clippy::struct_field_names)] -#[derive(Clone)] -pub struct Packer { - /// The raw packer wrapped in an `Arc` and `RwLock`. - // This is a hack: raw_packer and indexer are only used in the add_raw() method. 
- // TODO: Refactor as actor, like the other add() methods - raw_packer: Arc>>, - /// The shared indexer containing the backend. - indexer: SharedIndexer, - /// The sender to send blobs to the raw packer. - sender: Sender<(Bytes, BlobId, Option)>, - /// The receiver to receive the status from the raw packer. - finish: Receiver>, -} - -impl Packer { - /// Creates a new `Packer`. - /// - /// # Type Parameters - /// - /// * `BE` - The backend type. - /// - /// # Arguments - /// - /// * `be` - The backend to write to. - /// * `blob_type` - The blob type. - /// * `indexer` - The indexer to write to. - /// * `config` - The config file. - /// * `total_size` - The total size of the pack file. - /// - /// # Errors - /// - /// * If sending the message to the raw packer fails. - /// * If converting the data length to u64 fails - #[allow(clippy::unnecessary_wraps)] - pub fn new( - be: BE, - blob_type: BlobType, - indexer: SharedIndexer, - config: &ConfigFile, - total_size: u64, - ) -> RusticResult { - let raw_packer = Arc::new(RwLock::new(RawPacker::new( - be.clone(), - blob_type, - indexer.clone(), - config, - total_size, - ))); - - let (tx, rx) = bounded(0); - let (finish_tx, finish_rx) = bounded::>(0); - let packer = Self { - raw_packer: raw_packer.clone(), - indexer: indexer.clone(), - sender: tx, - finish: finish_rx, - }; - - let _join_handle = std::thread::spawn(move || { - scope(|scope| { - let status = rx - .into_iter() - .readahead_scoped(scope) - // early check if id is already contained - .filter(|(_, id, _)| !indexer.read().unwrap().has(id)) - .filter(|(_, id, _)| !raw_packer.read().unwrap().has(id)) - .readahead_scoped(scope) - .parallel_map_scoped( - scope, - |(data, id, size_limit): (Bytes, BlobId, Option)| { - let (data, data_len, uncompressed_length) = be.process_data(&data)?; - Ok(( - data, - id, - u64::from(data_len), - uncompressed_length, - size_limit, - )) - }, - ) - .readahead_scoped(scope) - // check again if id is already contained - // TODO: We may 
still save duplicate blobs - the indexer is only updated when the packfile write has completed - .filter(|res| { - res.as_ref().map_or_else( - |_| true, - |(_, id, _, _, _)| !indexer.read().unwrap().has(id), - ) - }) - .try_for_each(|item: RusticResult<_>| -> RusticResult<()> { - let (data, id, data_len, ul, size_limit) = item?; - raw_packer - .write() - .unwrap() - .add_raw(&data, &id, data_len, ul, size_limit) - }) - .and_then(|()| raw_packer.write().unwrap().finalize()); - _ = finish_tx.send(status); - }) - .unwrap(); - }); - - Ok(packer) - } - - /// Adds the blob to the packfile - /// - /// # Arguments - /// - /// * `data` - The blob data - /// * `id` - The blob id - /// - /// # Errors - /// - /// * If sending the message to the raw packer fails. - pub fn add(&self, data: Bytes, id: BlobId) -> RusticResult<()> { - // compute size limit based on total size and size bounds - self.add_with_sizelimit(data, id, None).map_err(|err| { - RusticError::with_source( - ErrorKind::Internal, - "Failed to add blob `{id}` to packfile.", - err, - ) - .attach_context("id", id.to_string()) - .ask_report() - }) - } - - /// Adds the blob to the packfile, allows specifying a size limit for the pack file - /// - /// # Arguments - /// - /// * `data` - The blob data - /// * `id` - The blob id - /// * `size_limit` - The size limit for the pack file - /// - /// # Errors - /// - /// * If sending the message to the raw packer fails. 
- fn add_with_sizelimit( - &self, - data: Bytes, - id: BlobId, - size_limit: Option, - ) -> PackerResult<()> { - self.sender - .send((data.clone(), id, size_limit)) - .map_err(|err| PackerErrorKind::SendingCrossbeamMessage { - size_limit, - id, - data, - source: err, - })?; - Ok(()) - } - - /// Adds the already encrypted (and maybe compressed) blob to the packfile - /// - /// # Arguments - /// - /// * `data` - The blob data - /// * `id` - The blob id - /// * `data_len` - The length of the blob data - /// * `uncompressed_length` - The length of the blob data before compression - /// * `size_limit` - The size limit for the pack file - /// - /// # Errors - /// - /// * If the blob is already present in the index - /// * If sending the message to the raw packer fails. - fn add_raw( - &self, - data: &[u8], - id: &BlobId, - data_len: u64, - uncompressed_length: Option, - size_limit: Option, - ) -> RusticResult<()> { - // only add if this blob is not present - if self.indexer.read().unwrap().has(id) { - Ok(()) - } else { - self.raw_packer.write().unwrap().add_raw( - data, - id, - data_len, - uncompressed_length, - size_limit, - ) - } - } - - /// Finalizes the packer and does cleanup - /// - /// # Panics - /// - /// * If the channel could not be dropped - pub fn finalize(self) -> RusticResult { - // cancel channel - drop(self.sender); - // wait for items in channel to be processed - self.finish - .recv() - .expect("Should be able to receive from channel to finalize packer.") - } + /// The maximum size used for padding + pub(super) const MAX_PADDING: u32 = 64 * KB; } // TODO: add documentation! @@ -436,9 +97,9 @@ impl PackerStats { /// /// * `BE` - The backend type. #[allow(missing_debug_implementations, clippy::module_name_repetitions)] -pub(crate) struct RawPacker { - /// The backend to write to. - be: BE, +pub(crate) struct Packer { + /// the key to encrypt data + key: C, /// The blob type to pack. 
blob_type: BlobType, /// The file to write to @@ -451,15 +112,15 @@ pub(crate) struct RawPacker { created: SystemTime, /// The index of the pack index: IndexPack, - /// The actor to write the pack file - file_writer: Option, /// The pack sizer - pack_sizer: PackSizer, + pub pack_sizer: S, /// The packer stats - stats: PackerStats, + pub stats: PackerStats, + /// add a padding blob to stealthen the packsize + add_padding: bool, } -impl RawPacker { +impl Packer { /// Creates a new `RawPacker`. /// /// # Type Parameters @@ -473,36 +134,18 @@ impl RawPacker { /// * `indexer` - The indexer to write to. /// * `config` - The config file. /// * `total_size` - The total size of the pack file. - fn new( - be: BE, - blob_type: BlobType, - indexer: SharedIndexer, - config: &ConfigFile, - total_size: u64, - ) -> Self { - let file_writer = Some(Actor::new( - FileWriterHandle { - be: be.clone(), - indexer, - cacheable: blob_type.is_cacheable(), - }, - 1, - 1, - )); - - let pack_sizer = PackSizer::from_config(config, blob_type, total_size); - + pub fn new(key: C, pack_sizer: S, blob_type: BlobType, add_padding: bool) -> Self { Self { - be, + key, blob_type, file: BytesMut::new(), size: 0, count: 0, created: SystemTime::now(), index: IndexPack::default(), - file_writer, pack_sizer, stats: PackerStats::default(), + add_padding, } } @@ -511,16 +154,8 @@ impl RawPacker { /// # Errors /// /// * If the packfile could not be saved - fn finalize(&mut self) -> RusticResult { - self.save().map_err(|err| { - err.overwrite_kind(ErrorKind::Internal) - .prepend_guidance_line("Failed to save packfile. Data may be lost.") - .ask_report() - })?; - - self.file_writer.take().unwrap().finalize()?; - - Ok(std::mem::take(&mut self.stats)) + pub fn finalize(mut self) -> RusticResult<(Option<(Bytes, IndexPack)>, PackerStats)> { + Ok((self.save()?, self.stats)) } /// Writes the given data to the packfile. @@ -533,7 +168,7 @@ impl RawPacker { /// /// The number of bytes written. 
fn write_data(&mut self, data: &[u8]) -> PackerResult { - let len = data + let len: u32 = data .len() .try_into() .map_err(|err| PackerErrorKind::Conversion { @@ -541,6 +176,8 @@ impl RawPacker { from: "usize", source: err, })?; + let data_len_packed: u64 = len.into(); + self.stats.data_packed += data_len_packed; self.file.extend_from_slice(data); self.size += len; Ok(len) @@ -559,34 +196,16 @@ impl RawPacker { /// # Errors /// /// * If converting the data length to u64 fails - fn add_raw( + pub fn add( &mut self, data: &[u8], id: &BlobId, data_len: u64, uncompressed_length: Option, - size_limit: Option, ) -> RusticResult<()> { - if self.has(id) { - return Ok(()); - } self.stats.blobs += 1; - self.stats.data += data_len; - let data_len_packed: u64 = data.len().try_into().map_err(|err| { - RusticError::with_source( - ErrorKind::Internal, - "Failed to convert data length `{length}` to u64.", - err, - ) - .attach_context("length", data.len().to_string()) - })?; - - self.stats.data_packed += data_len_packed; - - let size_limit = size_limit.unwrap_or_else(|| self.pack_sizer.pack_size()); - let offset = self.size; let len = self.write_data(data).map_err(|err| { @@ -596,8 +215,6 @@ impl RawPacker { err, ) .attach_context("id", id.to_string()) - .attach_context("size_limit", size_limit.to_string()) - .attach_context("data_length_packed", data_len_packed.to_string()) })?; self.index @@ -605,23 +222,26 @@ impl RawPacker { self.count += 1; + Ok(()) + } + + /// Determines if the current pack should be saved. 
+ pub fn needs_save(&self) -> bool { + if self.size == 0 { + return false; + } + + let size_limit = self.pack_sizer.pack_size().min(constants::MAX_SIZE); + // check if PackFile needs to be saved let elapsed = self.created.elapsed().unwrap_or_else(|err| { warn!("couldn't get elapsed time from system time: {err:?}"); Duration::ZERO }); - if self.count >= constants::MAX_COUNT + self.count >= constants::MAX_COUNT || self.size >= size_limit || elapsed >= constants::MAX_AGE - { - self.pack_sizer.add_size(self.index.pack_size()); - self.save()?; - self.size = 0; - self.count = 0; - self.created = SystemTime::now(); - } - Ok(()) } /// Writes header and length of header to packfile @@ -644,7 +264,7 @@ impl RawPacker { })?; // encrypt and write to pack file - let data = self.be.key().encrypt_data(&data)?; + let data = self.key.encrypt_data(&data)?; let headerlen: u32 = data.len().try_into().map_err(|err| { RusticError::with_source( @@ -689,290 +309,99 @@ impl RawPacker { Ok(()) } + /// Saves the packfile if conditions for saving are fulfilled + pub fn save_if_needed(&mut self) -> RusticResult> { + if !self.needs_save() { + return Ok(None); + } + + self.save() + } /// Saves the packfile /// /// # Errors /// /// If the header could not be written - /// - /// # Errors - /// - /// * If converting the header length to u32 fails - /// * If the header could not be written - fn save(&mut self) -> RusticResult<()> { + pub fn save(&mut self) -> RusticResult> { + self.created = SystemTime::now(); + self.count = 0; + if self.size == 0 { - return Ok(()); + return Ok(None); } + if self.add_padding { + self.add_padding_blob()?; + } self.write_header()?; - - // write file to backend + // prepare everything for write to the backend + let file = std::mem::take(&mut self.file).into(); let index = std::mem::take(&mut self.index); - let file = std::mem::replace(&mut self.file, BytesMut::new()); - self.file_writer - .as_ref() - .unwrap() - .send((file.into(), index)) - .map_err(|err| { - 
RusticError::with_source( - ErrorKind::Internal, - "Failed to send packfile to file writer.", - err, - ) - })?; - - Ok(()) - } - - fn has(&self, id: &BlobId) -> bool { - self.index.blobs.iter().any(|b| &b.id == id) - } -} + self.pack_sizer.add_size(self.size); -// TODO: add documentation -/// # Type Parameters -/// -/// * `BE` - The backend type. -#[derive(Clone)] -pub(crate) struct FileWriterHandle { - /// The backend to write to. - be: BE, - /// The shared indexer containing the backend. - indexer: SharedIndexer, - /// Whether the file is cacheable. - cacheable: bool, -} + self.size = 0; -impl FileWriterHandle { - // TODO: add documentation - fn process(&self, load: (Bytes, PackId, IndexPack)) -> RusticResult { - let (file, id, mut index) = load; - index.id = id; - self.be - .write_bytes(FileType::Pack, &id, self.cacheable, file)?; - index.time = Some(Local::now()); - Ok(index) + Ok(Some((file, index))) } - fn index(&self, index: IndexPack) -> RusticResult<()> { - self.indexer.write().unwrap().add(index)?; + // Add a padding blob + fn add_padding_blob(&mut self) -> RusticResult<()> { + pub(super) const KB: u32 = 1024; + pub(super) const MAX_PADDING: u32 = 64 * KB; + + // compute current size including the HeaderEntry and crypt overhead of the padding blob to-add + let size = PackHeaderRef::from_index_pack(&self.index).pack_size() + + HeaderEntry::ENTRY_LEN + + packfile::constants::COMP_OVERHEAD; + + let padding_size = padding_size(size); + + // write padding blob + let data = vec![ + 0; + padding_size + .try_into() + .expect("u32 should convert to usize") + ]; + let id = BlobId(hash(&data)); + let data = self.key.encrypt_data(&data)?; + let padding_size = padding_size.into(); + self.add(&data, &id, padding_size, None)?; + + // correct stats - padding should not contribute to blobs and data_added + self.stats.blobs -= 1; + self.stats.data -= padding_size; Ok(()) } } -// TODO: add documentation -pub(crate) struct Actor { - /// The sender to send blobs to the raw 
packer. - sender: Sender<(Bytes, IndexPack)>, - /// The receiver to receive the status from the raw packer. - finish: Receiver>, -} - -impl Actor { - /// Creates a new `Actor`. - /// - /// # Type Parameters - /// - /// * `BE` - The backend type. - /// - /// # Arguments - /// - /// * `fwh` - The file writer handle. - /// * `queue_len` - The length of the queue. - /// * `par` - The number of parallel threads. - fn new( - fwh: FileWriterHandle, - queue_len: usize, - _par: usize, - ) -> Self { - let (tx, rx) = bounded(queue_len); - let (finish_tx, finish_rx) = bounded::>(0); - - let _join_handle = std::thread::spawn(move || { - scope(|scope| { - let status = rx - .into_iter() - .readahead_scoped(scope) - .map(|(file, index): (Bytes, IndexPack)| { - let id = hash(&file); - (file, PackId::from(id), index) - }) - .readahead_scoped(scope) - .map(|load| fwh.process(load)) - .readahead_scoped(scope) - .try_for_each(|index| fwh.index(index?)); - _ = finish_tx.send(status); - }) - .unwrap(); - }); - - Self { - sender: tx, - finish: finish_rx, - } +fn padding_size(size: u32) -> u32 { + // compute padding size. Note that we don't add zero-sized blobs here, i.e. padding_size is in 1..=MAX_PADDING. + let padding = constants::MAX_PADDING - size % constants::MAX_PADDING; + if padding == 0 { + constants::MAX_PADDING + } else { + padding } - - /// Sends the given data to the actor. - /// - /// # Arguments - /// - /// * `load` - The data to send. - /// - /// # Errors - /// - /// If sending the message to the actor fails. 
- fn send(&self, load: (Bytes, IndexPack)) -> PackerResult<()> { - self.sender.send(load.clone()).map_err(|err| { - PackerErrorKind::SendingCrossbeamDataMessage { - data: load.0, - index_pack: load.1, - source: err, - } - })?; - Ok(()) - } - - /// Finalizes the actor and does cleanup - /// - /// # Panics - /// - /// * If the receiver is not present - fn finalize(self) -> RusticResult<()> { - // cancel channel - drop(self.sender); - // wait for items in channel to be processed - self.finish.recv().unwrap() - } -} - -/// The `Repacker` is responsible for repacking blobs into pack files. -/// -/// # Type Parameters -/// -/// * `BE` - The backend to read from. -#[allow(missing_debug_implementations)] -pub struct Repacker -where - BE: DecryptFullBackend, -{ - /// The backend to read from. - be: BE, - /// The packer to write to. - packer: Packer, - /// The size limit of the pack file. - size_limit: u32, } -impl Repacker { - /// Creates a new `Repacker`. - /// - /// # Type Parameters - /// - /// * `BE` - The backend to read from. - /// - /// # Arguments - /// - /// * `be` - The backend to read from. - /// * `blob_type` - The blob type. - /// * `indexer` - The indexer to write to. - /// * `config` - The config file. - /// * `total_size` - The total size of the pack file. 
- /// - /// # Errors - /// - /// * If the Packer could not be created - pub fn new( - be: BE, - blob_type: BlobType, - indexer: SharedIndexer, - config: &ConfigFile, - total_size: u64, - ) -> RusticResult { - let packer = Packer::new(be.clone(), blob_type, indexer, config, total_size)?; - let size_limit = PackSizer::from_config(config, blob_type, total_size).pack_size(); - Ok(Self { - be, - packer, - size_limit, - }) - } - - /// Adds the blob to the packfile without any check - /// - /// # Arguments - /// - /// * `pack_id` - The pack id - /// * `blob` - The blob to add - /// - /// # Errors - /// - /// * If the blob could not be added - /// * If reading the blob from the backend fails - pub fn add_fast(&self, pack_id: &PackId, blob: &IndexBlob) -> RusticResult<()> { - let data = self.be.read_partial( - FileType::Pack, - pack_id, - blob.tpe.is_cacheable(), - blob.offset, - blob.length, - )?; - - self.packer - .add_raw( - &data, - &blob.id, - 0, - blob.uncompressed_length, - Some(self.size_limit), - ) - .map_err(|err| { - err.overwrite_kind(ErrorKind::Internal) - .prepend_guidance_line( - "Failed to fast-add (unchecked) blob `{blob_id}` to packfile.", - ) - .attach_context("blob_id", blob.id.to_string()) - })?; - - Ok(()) - } - - /// Adds the blob to the packfile - /// - /// # Arguments - /// - /// * `pack_id` - The pack id - /// * `blob` - The blob to add - /// - /// # Errors - /// - /// * If the blob could not be added - /// * If reading the blob from the backend fails - pub fn add(&self, pack_id: &PackId, blob: &IndexBlob) -> RusticResult<()> { - let data = self.be.read_encrypted_partial( - FileType::Pack, - pack_id, - blob.tpe.is_cacheable(), - blob.offset, - blob.length, - blob.uncompressed_length, - )?; - - self.packer - .add_with_sizelimit(data, blob.id, Some(self.size_limit)) - .map_err(|err| { - RusticError::with_source( - ErrorKind::Internal, - "Failed to add blob to packfile.", - err, - ) - })?; - - Ok(()) - } - - /// Finalizes the repacker and returns the 
stats - pub fn finalize(self) -> RusticResult { - self.packer.finalize() +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_padding_size() { + assert_eq!(padding_size(1), constants::MAX_PADDING - 1); + assert_eq!(padding_size(constants::MAX_PADDING - 1), 1); + assert_eq!(padding_size(constants::MAX_PADDING), constants::MAX_PADDING); + assert_eq!( + padding_size(constants::MAX_PADDING + 1), + constants::MAX_PADDING - 1 + ); + assert_eq!( + padding_size(3 * constants::MAX_PADDING + 5), + constants::MAX_PADDING - 5 + ); } } diff --git a/crates/core/src/blob/repopacker.rs b/crates/core/src/blob/repopacker.rs new file mode 100644 index 000000000..49d798ac4 --- /dev/null +++ b/crates/core/src/blob/repopacker.rs @@ -0,0 +1,485 @@ +use std::num::NonZeroU32; + +use bytes::Bytes; +use chrono::Local; +use crossbeam_channel::{Receiver, Sender, bounded}; +use pariter::{IteratorExt, scope}; + +use crate::{ + backend::{ + FileType, + decrypt::{DecryptFullBackend, DecryptWriteBackend}, + }, + blob::{ + BlobId, BlobType, + pack_sizer::{DefaultPackSizer, FixedPackSizer, PackSizer}, + packer::{Packer, PackerStats}, + }, + crypto::hasher::hash, + error::{ErrorKind, RusticError, RusticResult}, + index::indexer::SharedIndexer, + repofile::{ + configfile::ConfigFile, + indexfile::{IndexBlob, IndexPack}, + packfile::PackId, + }, +}; + +type RawSender = Sender, BlobId, u64, Option)>>; + +/// The `Packer` is responsible for packing blobs into pack files. +/// +/// # Type Parameters +/// +/// * `BE` - The backend type. +#[allow(missing_debug_implementations)] +#[allow(clippy::struct_field_names)] +#[derive(Clone)] +pub struct RepositoryPacker { + /// The shared indexer containing the backend. + indexer: SharedIndexer, + /// The sender to send blobs to the packer. + sender: Sender<(Bytes, BlobId)>, + /// The sender to send raw blobs to the packer. + raw_sender: RawSender, + /// The receiver to receive the status from the raw packer. 
+ finish: Receiver>, +} + +impl RepositoryPacker { + fn padding_from_config(blob_type: BlobType, config: &ConfigFile) -> bool { + if blob_type == BlobType::Data { + config.use_pack_padding() + } else { + false + } + } + + #[allow(clippy::unnecessary_wraps)] + pub fn new_with_default_sizer( + be: BE, + blob_type: BlobType, + indexer: SharedIndexer, + config: &ConfigFile, + total_size: u64, + ) -> RusticResult { + let pack_sizer = DefaultPackSizer::from_config(config, BlobType::Data, total_size); + let add_padding = Self::padding_from_config(blob_type, config); + Self::new(be, blob_type, indexer, pack_sizer, add_padding) + } + + /// Creates a new `Packer`. + /// + /// # Type Parameters + /// + /// * `BE` - The backend type. + /// + /// # Arguments + /// + /// * `be` - The backend to write to. + /// * `blob_type` - The blob type. + /// * `indexer` - The indexer to write to. + /// * `config` - The config file. + /// * `total_size` - The total size of the pack file. + /// + /// # Errors + /// + /// * If sending the message to the raw packer fails. 
+ /// * If converting the data length to u64 fails + #[allow(clippy::unnecessary_wraps)] + pub fn new( + be: BE, + blob_type: BlobType, + indexer: SharedIndexer, + pack_sizer: S, + add_padding: bool, + ) -> RusticResult { + let mut packer = Packer::new(*be.key(), pack_sizer, blob_type, add_padding); + + let file_writer = FileWriter::new( + FileWriterHandle { + be: be.clone(), + indexer: indexer.clone(), + cacheable: blob_type.is_cacheable(), + }, + 1, + 1, + ); + + let (tx, rx) = bounded(0); + let (tx_raw, rx_raw) = bounded(0); + let (finish_tx, finish_rx) = bounded::>(0); + let repository_packer = Self { + indexer: indexer.clone(), + sender: tx, + raw_sender: tx_raw.clone(), + finish: finish_rx, + }; + + let _join_handle = std::thread::spawn(move || { + scope(|scope| { + rx.into_iter() + .readahead_scoped(scope) + // early check if id is already contained and reserve, if not + .filter(|(_, id)| indexer.reserve(id)) + .parallel_map_scoped(scope, |(data, id): (Bytes, BlobId)| { + let (data, data_len, uncompressed_length) = be.process_data(&data)?; + Ok((data, id, u64::from(data_len), uncompressed_length)) + }) + .readahead_scoped(scope) + .for_each(|item: RusticResult<_>| tx_raw.send(item).unwrap()); + }) + .unwrap(); + }); + + let _join_handle_raw = std::thread::spawn(move || { + scope(|scope| { + let status = rx_raw + .into_iter() + .readahead_scoped(scope) + .try_for_each(|item: RusticResult<_>| -> RusticResult<()> { + let (data, id, data_len, ul) = item?; + packer.add(&data, &id, data_len, ul)?; + if let Some((file, index)) = packer.save_if_needed()? 
{ + file_writer.send((file, index))?; + } + + Ok(()) + }) + .and_then(|()| { + let (res, stats) = packer.finalize()?; + if let Some((file, index)) = res { + file_writer.send((file, index))?; + } + file_writer.finalize()?; + Ok(stats) + }); + _ = finish_tx.send(status); + }) + .unwrap(); + }); + + Ok(repository_packer) + } + + /// Adds the blob to the packfile + /// + /// # Arguments + /// + /// * `data` - The blob data + /// * `id` - The blob id + /// + /// # Errors + /// + /// * If sending the message to the raw packer fails. + pub fn add(&self, data: Bytes, id: BlobId) -> RusticResult<()> { + // compute size limit based on total size and size bounds + self.sender.send((data, id)).map_err(|err| { + RusticError::with_source( + ErrorKind::Internal, + "Failed to add blob `{id}` to packfile.", + err, + ) + .attach_context("id", id.to_string()) + .ask_report() + }) + } + + /// Adds the already encrypted (and maybe compressed) blob to the packfile + /// + /// # Arguments + /// + /// * `data` - The blob data + /// * `id` - The blob id + /// * `data_len` - The length of the blob data + /// * `uncompressed_length` - The length of the blob data before compression + /// * `size_limit` - The size limit for the pack file + /// + /// # Errors + /// + /// * If the blob is already present in the index + /// * If sending the message to the raw packer fails. 
+ fn add_raw( + &self, + data: Vec, + id: BlobId, + data_len: u64, + uncompressed_length: Option, + ) -> RusticResult<()> { + // only add if this blob is not present + if self.indexer.reserve(&id) { + // compute size limit based on total size and size bounds + self.raw_sender + .send(Ok((data, id, data_len, uncompressed_length))) + .map_err(|err| { + RusticError::with_source( + ErrorKind::Internal, + "Failed to add blob `{id}` to packfile.", + err, + ) + .attach_context("id", id.to_string()) + .ask_report() + })?; + } + Ok(()) + } + + /// Finalizes the packer and does cleanup + /// + /// # Panics + /// + /// * If the channel could not be dropped + pub fn finalize(self) -> RusticResult { + // cancel channels + drop(self.sender); + drop(self.raw_sender); + // wait for items in channels to be processed + self.finish + .recv() + .expect("Should be able to receive from channel to finalize packer.") + } +} + +/// The handle for the [`FileWriter`] actor, responsible for writing pack files and indexing them using an indexer. +/// +/// # Type Parameters +/// +/// * `BE` - The backend type. +#[derive(Clone)] +pub(crate) struct FileWriterHandle { + /// The backend to write to. + be: BE, + /// The shared indexer containing the backend. + indexer: SharedIndexer, + /// Whether the file is cacheable. + cacheable: bool, +} + +impl FileWriterHandle { + fn process(&self, load: (Bytes, PackId, IndexPack)) -> RusticResult { + let (file, id, mut index) = load; + index.id = id; + self.be + .write_bytes(FileType::Pack, &id, self.cacheable, file)?; + index.time = Some(Local::now()); + Ok(index) + } + + fn index(&self, index: IndexPack) -> RusticResult<()> { + self.indexer + .add_and_check_save(index, false, |file| self.be.save_file_no_id(file)) + } +} + +/// An actor which is responsible for writing pack files and indexing them using an indexer.
+/// +/// This actor is responsible for the communication, the actual work is done by [`FileWriterHandle`] +#[derive(Clone)] +pub(crate) struct FileWriter { + /// The sender to send blobs to the raw packer. + sender: Sender<(Bytes, IndexPack)>, + /// The receiver to receive the status from the raw packer. + finish: Receiver>, +} + +impl FileWriter { + /// Creates a new `Actor`. + /// + /// # Type Parameters + /// + /// * `BE` - The backend type. + /// + /// # Arguments + /// + /// * `fwh` - The file writer handle. + /// * `queue_len` - The length of the queue. + /// * `par` - The number of parallel threads. + fn new( + fwh: FileWriterHandle, + queue_len: usize, + _par: usize, + ) -> Self { + let (tx, rx) = bounded(queue_len); + let (finish_tx, finish_rx) = bounded::>(0); + + let _join_handle = std::thread::spawn(move || { + let status = scope(|scope| -> RusticResult<_> { + rx.into_iter() + .readahead_scoped(scope) + .map(|(file, index): (Bytes, IndexPack)| { + let id = hash(&file); + (file, PackId::from(id), index) + }) + .readahead_scoped(scope) + .map(|load| fwh.process(load)) + .readahead_scoped(scope) + .try_for_each(|index| fwh.index(index?)) + }) + .unwrap(); + drop(fwh); + _ = finish_tx.send(status); + }); + + Self { + sender: tx, + finish: finish_rx, + } + } + + /// Sends the given data to the actor. + /// + /// # Arguments + /// + /// * `load` - The data to send. + /// + /// # Errors + /// + /// If sending the message to the actor fails.
+ fn send(&self, load: (Bytes, IndexPack)) -> RusticResult<()> { + self.sender.send(load).map_err(|err| { + RusticError::with_source( + ErrorKind::Internal, + "Failed to send packfile to file writer.", + err, + ) + })?; + Ok(()) + } + + /// Finalizes the actor and does cleanup + /// + /// # Panics + /// + /// * If the receiver is not present + fn finalize(self) -> RusticResult<()> { + // cancel channel + drop(self.sender); + // wait for items in channel to be processed + self.finish.recv().unwrap() + } +} + +/// The `Repacker` is responsible for repacking blobs into pack files. +/// +/// # Type Parameters +/// +/// * `BE` - The backend to read from. +#[allow(missing_debug_implementations)] +pub struct Repacker +where + BE: DecryptFullBackend, +{ + /// The backend to read from. + be: BE, + /// The packer to write to. + packer: RepositoryPacker, + /// The size limit of the pack file. + size_limit: u32, +} + +impl Repacker { + /// Creates a new `Repacker`. + /// + /// # Type Parameters + /// + /// * `BE` - The backend to read from. + /// + /// # Arguments + /// + /// * `be` - The backend to read from. + /// * `blob_type` - The blob type. + /// * `indexer` - The indexer to write to. + /// * `config` - The config file. + /// * `total_size` - The total size of the pack file. 
+ /// + /// # Errors + /// + /// * If the Packer could not be created + pub fn new( + be: BE, + blob_type: BlobType, + indexer: SharedIndexer, + config: &ConfigFile, + total_size: u64, + ) -> RusticResult { + let size_limit = DefaultPackSizer::from_config(config, blob_type, total_size).pack_size(); + let repo_sizer = FixedPackSizer(size_limit); + let add_padding = RepositoryPacker::padding_from_config(blob_type, config); + let packer = + RepositoryPacker::new(be.clone(), blob_type, indexer, repo_sizer, add_padding)?; + Ok(Self { + be, + packer, + size_limit, + }) + } + + /// Adds the blob to the packfile without any check + /// + /// # Arguments + /// + /// * `pack_id` - The pack id + /// * `blob` - The blob to add + /// + /// # Errors + /// + /// * If the blob could not be added + /// * If reading the blob from the backend fails + pub fn add_fast(&self, pack_id: &PackId, blob: &IndexBlob) -> RusticResult<()> { + let data = self.be.read_partial( + FileType::Pack, + pack_id, + blob.tpe.is_cacheable(), + blob.offset, + blob.length, + )?; + + self.packer + .add_raw( + data.to_vec(), + blob.id, + blob.length.into(), + blob.uncompressed_length, + ) + .map_err(|err| { + err.overwrite_kind(ErrorKind::Internal) + .prepend_guidance_line( + "Failed to fast-add (unchecked) blob `{blob_id}` to packfile.", + ) + .attach_context("blob_id", blob.id.to_string()) + })?; + + Ok(()) + } + + /// Adds the blob to the packfile + /// + /// # Arguments + /// + /// * `pack_id` - The pack id + /// * `blob` - The blob to add + /// + /// # Errors + /// + /// * If the blob could not be added + /// * If reading the blob from the backend fails + pub fn add(&self, pack_id: &PackId, blob: &IndexBlob) -> RusticResult<()> { + let data = self.be.read_encrypted_partial( + FileType::Pack, + pack_id, + blob.tpe.is_cacheable(), + blob.offset, + blob.length, + blob.uncompressed_length, + )?; + + self.packer.add(data, blob.id)?; + + Ok(()) + } + + /// Finalizes the repacker and returns the stats + pub fn 
finalize(self) -> RusticResult { + self.packer.finalize() + } +} diff --git a/crates/core/src/chunker.rs b/crates/core/src/chunker.rs index 9516a306e..41b2d2f1f 100644 --- a/crates/core/src/chunker.rs +++ b/crates/core/src/chunker.rs @@ -29,7 +29,7 @@ const fn default_predicate(x: u64) -> bool { } /// `ChunkIter` is an iterator that chunks data. -pub(crate) struct ChunkIter { +pub(crate) struct ChunkIter { /// The buffer used for reading. buf: Vec, @@ -58,7 +58,7 @@ pub(crate) struct ChunkIter { finished: bool, } -impl ChunkIter { +impl ChunkIter { /// Creates a new `ChunkIter`. /// /// # Arguments @@ -81,7 +81,7 @@ impl ChunkIter { } } -impl Iterator for ChunkIter { +impl Iterator for ChunkIter { type Item = RusticResult>; fn next(&mut self) -> Option { diff --git a/crates/core/src/commands/config.rs b/crates/core/src/commands/config.rs index 3673a57bc..8e3afc08b 100644 --- a/crates/core/src/commands/config.rs +++ b/crates/core/src/commands/config.rs @@ -109,6 +109,10 @@ pub struct ConfigOptions { #[cfg_attr(feature = "clap", clap(long, value_name = "VERSION"))] pub set_version: Option, + /// Set padding for data packs + #[cfg_attr(feature = "clap", clap(long))] + pub set_use_pack_padding: Option, + /// Set append-only mode. /// Note that only append-only commands work once this is set. `forget`, `prune` or `config` won't work any longer. 
#[cfg_attr(feature = "clap", clap(long))] @@ -209,6 +213,8 @@ impl ConfigOptions { config.version = version; } + config.use_pack_padding = self.set_use_pack_padding; + if let Some(compression) = self.set_compression { if config.version == 1 && compression != 0 { return Err(RusticError::new( diff --git a/crates/core/src/commands/copy.rs b/crates/core/src/commands/copy.rs index fd7a17bcd..55dbcf04d 100644 --- a/crates/core/src/commands/copy.rs +++ b/crates/core/src/commands/copy.rs @@ -5,7 +5,7 @@ use rayon::prelude::{IntoParallelRefIterator, ParallelBridge, ParallelIterator}; use crate::{ backend::{decrypt::DecryptWriteBackend, node::NodeType}, - blob::{BlobId, BlobType, packer::Packer, tree::TreeStreamerOnce}, + blob::{BlobId, BlobType, repopacker::RepositoryPacker, tree::TreeStreamerOnce}, error::RusticResult, index::{ReadIndex, indexer::Indexer}, progress::{Progress, ProgressBars}, @@ -58,16 +58,16 @@ pub(crate) fn copy<'a, Q, R: IndexedFull, P: ProgressBars, S: IndexedIds>( let be = repo.dbe(); let index = repo.index(); let index_dest = repo_dest.index(); - let indexer = Indexer::new(be_dest.clone()).into_shared(); + let indexer = Indexer::new().into_shared(); - let data_packer = Packer::new( + let data_packer = RepositoryPacker::new_with_default_sizer( be_dest.clone(), BlobType::Data, indexer.clone(), repo_dest.config(), index_dest.total_size(BlobType::Data), )?; - let tree_packer = Packer::new( + let tree_packer = RepositoryPacker::new_with_default_sizer( be_dest.clone(), BlobType::Tree, indexer.clone(), @@ -128,7 +128,7 @@ pub(crate) fn copy<'a, Q, R: IndexedFull, P: ProgressBars, S: IndexedIds>( _ = data_packer.finalize()?; _ = tree_packer.finalize()?; - indexer.write().unwrap().finalize()?; + indexer.finalize_and_check_save(|file| be_dest.save_file_no_id(file))?; let p = pb.progress_counter("saving snapshots..."); be_dest.save_list(snaps.iter(), p)?; diff --git a/crates/core/src/commands/merge.rs b/crates/core/src/commands/merge.rs index 
5533f9687..3b28bd626 100644 --- a/crates/core/src/commands/merge.rs +++ b/crates/core/src/commands/merge.rs @@ -8,7 +8,7 @@ use crate::{ backend::{decrypt::DecryptWriteBackend, node::Node}, blob::{ BlobId, BlobType, - packer::Packer, + repopacker::RepositoryPacker, tree::{self, Tree, TreeId}, }, error::{ErrorKind, RusticError, RusticResult}, @@ -103,8 +103,8 @@ pub(crate) fn merge_trees( ) -> RusticResult { let be = repo.dbe(); let index = repo.index(); - let indexer = Indexer::new(repo.dbe().clone()).into_shared(); - let packer = Packer::new( + let indexer = Indexer::new().into_shared(); + let packer = RepositoryPacker::new_with_default_sizer( repo.dbe().clone(), BlobType::Tree, indexer.clone(), @@ -136,7 +136,7 @@ pub(crate) fn merge_trees( let p = repo.pb.progress_spinner("merging snapshots..."); let tree_merged = tree::merge_trees(be, index, trees, cmp, &save, summary)?; let stats = packer.finalize()?; - indexer.write().unwrap().finalize()?; + indexer.finalize_and_check_save(|file| be.save_file_no_id(file))?; p.finish(); stats.apply(summary, BlobType::Tree); diff --git a/crates/core/src/commands/prune.rs b/crates/core/src/commands/prune.rs index 2ea1201ba..4bb1df8d5 100644 --- a/crates/core/src/commands/prune.rs +++ b/crates/core/src/commands/prune.rs @@ -27,7 +27,8 @@ use crate::{ }, blob::{ BlobId, BlobType, BlobTypeMap, Initialize, - packer::{PackSizer, Repacker}, + pack_sizer::{DefaultPackSizer, PackSizer}, + repopacker::Repacker, tree::TreeStreamerOnce, }, error::{ErrorKind, RusticError, RusticResult}, @@ -38,7 +39,7 @@ use crate::{ }, progress::{Progress, ProgressBars}, repofile::{ - HeaderEntry, IndexBlob, IndexFile, IndexPack, SnapshotFile, SnapshotId, indexfile::IndexId, + HeaderEntry, IndexFile, IndexPack, SnapshotFile, SnapshotId, indexfile::IndexId, packfile::PackId, }, repository::{Open, Repository}, @@ -182,7 +183,7 @@ impl PruneOptions { } /// Enum to specify a size limit -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, Default)] 
#[non_exhaustive] pub enum LimitOption { /// Size in bytes @@ -190,6 +191,7 @@ pub enum LimitOption { /// Size in percentage of repository size Percentage(u64), /// No limit + #[default] Unlimited, } @@ -236,16 +238,18 @@ pub enum PackStatus { TooSmall, HasUnusedBlobs, HasUsedBlobs, + HasPadding, Marked, } -#[derive(Debug, Clone, Copy, Serialize)] +#[derive(Debug, Default, Clone, Copy, Serialize)] pub struct DebugDetailedStats { pub packs: u64, pub unused_blobs: u64, pub unused_size: u64, pub used_blobs: u64, pub used_size: u64, + pub padding_size: u64, } #[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq, PartialOrd, Ord)] @@ -268,18 +272,13 @@ impl DebugStats { blob_type, status, }) - .or_insert(DebugDetailedStats { - packs: 0, - unused_blobs: 0, - unused_size: 0, - used_blobs: 0, - used_size: 0, - }); + .or_default(); details.packs += 1; details.unused_blobs += u64::from(pi.unused_blobs); details.unused_size += u64::from(pi.unused_size); details.used_blobs += u64::from(pi.used_blobs); details.used_size += u64::from(pi.used_size); + details.padding_size += u64::from(pi.padding_size); } } @@ -404,7 +403,7 @@ struct PruneIndex { impl PruneIndex { // TODO: add documentation! 
fn len(&self) -> usize { - self.packs.iter().map(|p| p.blobs.len()).sum() + self.packs.iter().map(|p| p.index.blobs.len()).sum() } } @@ -438,8 +437,6 @@ impl Default for PackToDo { /// A pack which is to be pruned #[derive(Debug)] struct PrunePack { - /// The id of the pack - id: PackId, /// The type of the pack blob_type: BlobType, /// The size of the pack @@ -448,10 +445,8 @@ struct PrunePack { delete_mark: bool, /// The task to be executed on the pack to_do: PackToDo, - /// The time the pack was created - time: Option>, - /// The blobs in the pack - blobs: Vec, + /// the pack as saved in the index + index: IndexPack, } impl PrunePack { @@ -462,14 +457,14 @@ impl PrunePack { /// * `p` - The `IndexPack` to create the `PrunePack` from /// * `delete_mark` - Whether the pack is marked for deletion fn from_index_pack(p: IndexPack, delete_mark: bool) -> Self { + let blob_type = p.blob_type(); + let size = p.pack_size(); Self { - id: p.id, - blob_type: p.blob_type(), - size: p.pack_size(), + index: p, + blob_type, + size, delete_mark, to_do: PackToDo::Undecided, - time: p.time, - blobs: p.blobs, } } @@ -493,12 +488,7 @@ impl PrunePack { /// Convert the `PrunePack` into an `IndexPack` fn into_index_pack(self) -> IndexPack { - IndexPack { - id: self.id, - time: self.time, - size: None, - blobs: self.blobs, - } + self.index } /// Convert the `PrunePack` into an `IndexPack` with the given time @@ -507,12 +497,9 @@ impl PrunePack { /// /// * `time` - The time to set fn into_index_pack_with_time(self, time: DateTime) -> IndexPack { - IndexPack { - id: self.id, - time: Some(time), - size: None, - blobs: self.blobs, - } + let mut index_pack = self.into_index_pack(); + index_pack.time = Some(time); + index_pack } /// Set the task to be executed on the pack @@ -567,7 +554,8 @@ impl PrunePack { /// Returns whether the pack is compressed fn is_compressed(&self) -> bool { - self.blobs + self.index + .blobs .iter() .all(|blob| blob.uncompressed_length.is_some()) } @@ -576,6 +564,8 @@ 
impl PrunePack { /// Reasons why a pack should be repacked #[derive(PartialEq, Eq, Debug)] enum RepackReason { + /// The repack-all option was selected + RepackAll, /// The pack is partly used PartlyUsed, /// The pack is to be compressed @@ -585,8 +575,27 @@ enum RepackReason { } /// A plan what should be repacked or removed by a `prune` run -#[derive(Debug)] +#[derive(Debug, Default)] +#[allow(clippy::struct_excessive_bools)] pub struct PrunePlan { + /// How long to keep packs before considering repacking them + keep_pack: Duration, + /// How long to keep packs marked for deletion before really removing them + keep_delete: Duration, + /// indicates whether to only repack cacheable packs (i.e. tree packs) + repack_cacheable_only: bool, + /// indicates whether to repack uncompressed packs + repack_uncompressed: bool, + /// indicates whether to use padding for data packs + use_padding: bool, + /// indicates whether to repack all packs + repack_all: bool, + /// limit packs to repack + max_repack: LimitOption, + /// limit maximum unused size after repacking + max_unused: LimitOption, + /// indicates whether not to resize packs with non-fitting sizes + no_resize: bool, /// The time the plan was created time: DateTime, /// The ids of the blobs which are used @@ -657,7 +666,7 @@ impl PrunePlan { let mut modified = false; index.packs.retain(|p| { !p.delete_mark || { - let duplicate = processed_packs.contains(&p.id); + let duplicate = processed_packs.contains(&p.index.id); modified |= duplicate; !duplicate } @@ -670,9 +679,8 @@ impl PrunePlan { time: Local::now(), used_ids, existing_packs, - repack_candidates: Vec::new(), index_files, - stats: PruneStats::default(), + ..Default::default() } } @@ -744,42 +752,40 @@ impl PrunePlan { let mut pruner = Self::new(used_ids, existing_packs, index_files); pruner.count_used_blobs(); pruner.check()?; - let repack_cacheable_only = opts + + pruner.keep_pack = Duration::from_std(*opts.keep_pack).map_err(|err| { + RusticError::with_source(
+ ErrorKind::Internal, + "Failed to convert keep_pack duration `{keep_pack}` to std::time::Duration.", + err, + ) + .attach_context("keep_pack", opts.keep_pack.to_string()) + })?; + + pruner.keep_delete = Duration::from_std(*opts.keep_delete).map_err(|err| { + RusticError::with_source( + ErrorKind::Internal, + "Failed to convert keep_delete duration `{keep_delete}` to std::time::Duration.", + err, + ) + .attach_context("keep_delete", opts.keep_delete.to_string()) + })?; + + pruner.repack_cacheable_only = opts .repack_cacheable_only .unwrap_or_else(|| repo.config().is_hot == Some(true)); + pruner.repack_uncompressed = opts.repack_uncompressed; + pruner.repack_all = opts.repack_all; + pruner.use_padding = repo.config().use_pack_padding(); + pruner.max_repack = opts.max_repack; + pruner.max_unused = opts.max_unused; + pruner.no_resize = opts.no_resize; + let pack_sizer = - total_size.map(|tpe, size| PackSizer::from_config(repo.config(), tpe, size)); + total_size.map(|tpe, size| DefaultPackSizer::from_config(repo.config(), tpe, size)); - pruner.decide_packs( - Duration::from_std(*opts.keep_pack).map_err(|err| { - RusticError::with_source( - ErrorKind::Internal, - "Failed to convert keep_pack duration `{keep_pack}` to std::time::Duration.", - err, - ) - .attach_context("keep_pack", opts.keep_pack.to_string()) - })?, - Duration::from_std(*opts.keep_delete).map_err(|err| { - RusticError::with_source( - ErrorKind::Internal, - "Failed to convert keep_delete duration `{keep_delete}` to std::time::Duration.", - err, - ) - .attach_context("keep_delete", opts.keep_delete.to_string()) - })?, - repack_cacheable_only, - opts.repack_uncompressed, - opts.repack_all, - &pack_sizer, - )?; - - pruner.decide_repack( - &opts.max_repack, - &opts.max_unused, - opts.repack_uncompressed || opts.repack_all, - opts.no_resize, - &pack_sizer, - ); + pruner.decide_packs(&pack_sizer)?; + pruner.decide_repack(&pack_sizer); pruner.check_existing_packs()?; 
pruner.filter_index_files(opts.instant_delete); @@ -793,7 +799,7 @@ impl PrunePlan { .index_files .iter() .flat_map(|index| &index.packs) - .flat_map(|pack| &pack.blobs) + .flat_map(|pack| &pack.index.blobs) { if let Some(count) = self.used_ids.get_mut(&blob.id) { // note that duplicates are only counted up to 255. If there are more @@ -840,15 +846,7 @@ impl PrunePlan { // TODO: add errors! #[allow(clippy::too_many_lines)] #[allow(clippy::unnecessary_wraps)] - fn decide_packs( - &mut self, - keep_pack: Duration, - keep_delete: Duration, - repack_cacheable_only: bool, - repack_uncompressed: bool, - repack_all: bool, - pack_sizer: &BlobTypeMap, - ) -> RusticResult<()> { + fn decide_packs(&mut self, pack_sizer: &BlobTypeMap) -> RusticResult<()> { // first process all marked packs then the unmarked ones: // - first processed packs are more likely to have all blobs seen as unused // - if marked packs have used blob but these blobs are all present in @@ -870,13 +868,18 @@ impl PrunePlan { let mut status = EnumSet::empty(); // Various checks to determine if packs need to be kept - let too_young = pack.time > Some(self.time - keep_pack); + let too_young = pack.index.time > Some(self.time - self.keep_pack); if too_young && !pack.delete_mark { _ = status.insert(PackStatus::TooYoung); } - let keep_uncacheable = repack_cacheable_only && !pack.blob_type.is_cacheable(); + let keep_uncacheable = + self.repack_cacheable_only && !pack.blob_type.is_cacheable(); + let has_padding = self.use_padding && pi.padding_size != 0; + if has_padding { + _ = status.insert(PackStatus::HasPadding); + } - let to_compress = repack_uncompressed && !pack.is_compressed(); + let to_compress = self.repack_uncompressed && !pack.is_compressed(); if to_compress { _ = status.insert(PackStatus::NotCompressed); } @@ -905,22 +908,16 @@ impl PrunePlan { _ = status.insert(PackStatus::HasUsedBlobs); if too_young || keep_uncacheable { pack.set_todo(PackToDo::Keep, &pi, status, &mut self.stats); - } else if 
to_compress || repack_all { - self.repack_candidates.push(( - pi, - status, - RepackReason::ToCompress, - index_num, - pack_num, - )); - } else if size_mismatch { - self.repack_candidates.push(( - pi, - status, - RepackReason::SizeMismatch, - index_num, - pack_num, - )); + } else if to_compress || self.repack_all | size_mismatch { + let reason = if to_compress { + RepackReason::ToCompress + } else if self.repack_all { + RepackReason::RepackAll + } else { + RepackReason::SizeMismatch + }; + self.repack_candidates + .push((pi, status, reason, index_num, pack_num)); } else { pack.set_todo(PackToDo::Keep, &pi, status, &mut self.stats); } @@ -932,8 +929,8 @@ impl PrunePlan { status .insert_all(PackStatus::HasUsedBlobs | PackStatus::HasUnusedBlobs); - if too_young || keep_uncacheable { - // keep packs which are too young and non-cacheable packs if requested + if too_young || keep_uncacheable || has_padding { + // keep packs which have padding, are too young and non-cacheable packs if requested pack.set_todo(PackToDo::Keep, &pi, status, &mut self.stats); } else { // other partly used pack => candidate for repacking @@ -948,10 +945,10 @@ impl PrunePlan { } (true, 0, _) => { _ = status.insert(PackStatus::Marked); - match pack.time { + match pack.index.time { // unneeded and marked pack => check if we can remove it. Some(local_date_time) - if self.time - local_date_time >= keep_delete => + if self.time - local_date_time >= self.keep_delete => { _ = status.insert(PackStatus::TooYoung); pack.set_todo(PackToDo::Delete, &pi, status, &mut self.stats); @@ -959,7 +956,7 @@ impl PrunePlan { None => { warn!( "pack to delete {}: no time set, this should not happen! Keeping this pack.", - pack.id + pack.index.id ); _ = status.insert(PackStatus::TimeNotSet); pack.set_todo( @@ -1002,15 +999,8 @@ impl PrunePlan { /// # Errors /// // TODO: add errors! 
- fn decide_repack( - &mut self, - max_repack: &LimitOption, - max_unused: &LimitOption, - repack_uncompressed: bool, - no_resize: bool, - pack_sizer: &BlobTypeMap, - ) { - let max_unused = match (repack_uncompressed, max_unused) { + fn decide_repack(&mut self, pack_sizer: &BlobTypeMap) { + let max_unused = match (self.repack_uncompressed | self.repack_all, self.max_unused) { (true, _) => 0, (false, LimitOption::Unlimited) => u64::MAX, (false, LimitOption::Size(size)) => size.as_u64(), @@ -1020,7 +1010,7 @@ impl PrunePlan { (false, LimitOption::Percentage(p)) => (p * self.stats.size_sum().used) / (100 - p), }; - let max_repack = match max_repack { + let max_repack = match self.max_repack { LimitOption::Unlimited => u64::MAX, LimitOption::Size(size) => size.as_u64(), LimitOption::Percentage(p) => (p * self.stats.size_sum().total()) / 100, @@ -1042,7 +1032,7 @@ impl PrunePlan { || (self.stats.size_sum().unused_after_prune() < max_unused && repack_reason == RepackReason::PartlyUsed && blob_type == BlobType::Data) - || (repack_reason == RepackReason::SizeMismatch && no_resize) + || (repack_reason == RepackReason::SizeMismatch && self.no_resize) { pack.set_todo(PackToDo::Keep, &pi, status, &mut self.stats); } else if repack_reason == RepackReason::SizeMismatch { @@ -1080,7 +1070,7 @@ impl PrunePlan { /// * If a pack does not exist fn check_existing_packs(&mut self) -> RusticResult<()> { for pack in self.index_files.iter().flat_map(|index| &index.packs) { - let existing_size = self.existing_packs.remove(&pack.id); + let existing_size = self.existing_packs.remove(&pack.index.id); // TODO: Unused Packs which don't exist (i.e. only existing in index) let check_size = || { @@ -1090,12 +1080,12 @@ impl PrunePlan { ErrorKind::Internal, "Pack size `{size_in_pack_real}` of id `{pack_id}` does not match the expected size `{size_in_index_expected}` in the index file. 
", ) - .attach_context("pack_id", pack.id.to_string()) + .attach_context("pack_id", pack.index.id.to_string()) .attach_context("size_in_index_expected", pack.size.to_string()) .attach_context("size_in_pack_real", size.to_string()) .ask_report()), None => Err(RusticError::new(ErrorKind::Internal, "Pack `{pack_id}` does not exist.") - .attach_context("pack_id", pack.id.to_string()) + .attach_context("pack_id", pack.index.id.to_string()) .ask_report()), } }; @@ -1106,11 +1096,11 @@ impl PrunePlan { ErrorKind::Internal, "Pack `{pack_id}` got no decision what to do with it!", ) - .attach_context("pack_id", pack.id.to_string()) + .attach_context("pack_id", pack.index.id.to_string()) .ask_report()); } PackToDo::Keep | PackToDo::Recover => { - for blob in &pack.blobs { + for blob in &pack.index.blobs { _ = self.used_ids.remove(&blob.id); } check_size()?; @@ -1175,7 +1165,7 @@ impl PrunePlan { .iter() .flat_map(|index| &index.packs) .filter(|pack| pack.to_do == PackToDo::Repack) - .map(|pack| pack.id) + .map(|pack| pack.index.id) .collect() } @@ -1247,7 +1237,7 @@ pub(crate) fn prune_repository( let be = repo.dbe(); let pb = &repo.pb; - let indexer = Indexer::new_unindexed(be.clone()).into_shared(); + let mut indexer = Indexer::new_unindexed(); // Calculate an approximation of sizes after pruning. 
// The size actually is: @@ -1265,22 +1255,6 @@ pub(crate) fn prune_repository( * u64::from(HeaderEntry::ENTRY_LEN_COMPRESSED) }); - let tree_repacker = Repacker::new( - be.clone(), - BlobType::Tree, - indexer.clone(), - repo.config(), - size_after_prune[BlobType::Tree], - )?; - - let data_repacker = Repacker::new( - be.clone(), - BlobType::Data, - indexer.clone(), - repo.config(), - size_after_prune[BlobType::Data], - )?; - // mark unreferenced packs for deletion if !prune_plan.existing_packs.is_empty() { if opts.instant_delete { @@ -1297,7 +1271,10 @@ pub(crate) fn prune_repository( time: Some(Local::now()), blobs: Vec::new(), }; - indexer.write().unwrap().add_remove(pack)?; + indexer.add_remove(pack); + if let Some(file) = indexer.save_if_needed() { + _ = be.save_file(&file)?; + } p.inc(1); } p.finish(); @@ -1321,15 +1298,33 @@ pub(crate) fn prune_repository( p.set_length(prune_plan.stats.size_sum().repack - prune_plan.stats.size_sum().repackrm); let mut indexes_remove = Vec::new(); + let indexer = indexer.into_shared(); let tree_packs_remove = Arc::new(Mutex::new(Vec::new())); let data_packs_remove = Arc::new(Mutex::new(Vec::new())); - let delete_pack = |pack: PrunePack| { + let tree_repacker = Repacker::new( + be.clone(), + BlobType::Tree, + indexer.clone(), + repo.config(), + size_after_prune[BlobType::Tree], + )?; + + let data_repacker = Repacker::new( + be.clone(), + BlobType::Data, + indexer.clone(), + repo.config(), + size_after_prune[BlobType::Data], + )?; + + let delete_pack = |pack: PrunePack| -> Option<_> { // delete pack match pack.blob_type { - BlobType::Data => data_packs_remove.lock().unwrap().push(pack.id), - BlobType::Tree => tree_packs_remove.lock().unwrap().push(pack.id), + BlobType::Data => data_packs_remove.lock().unwrap().push(pack.index.id), + BlobType::Tree => tree_packs_remove.lock().unwrap().push(pack.index.id), } + None }; let used_ids = Arc::new(Mutex::new(prune_plan.used_ids)); @@ -1353,23 +1348,23 @@ pub(crate) fn prune_repository( 
packs .into_par_iter() .try_for_each(|pack| -> RusticResult<_> { - match pack.to_do { + let to_index = match pack.to_do { PackToDo::Undecided => { return Err(RusticError::new( ErrorKind::Internal, "Pack `{pack_id}` got no decision what to do with it!", ) - .attach_context("pack_id", pack.id.to_string()) + .attach_context("pack_id", pack.index.id.to_string()) .ask_report()); } PackToDo::Keep => { // keep pack: add to new index let pack = pack.into_index_pack(); - indexer.write().unwrap().add(pack)?; + Some((pack, false)) } PackToDo::Repack => { // TODO: repack in parallel - for blob in &pack.blobs { + for blob in &pack.index.blobs { if used_ids.lock().unwrap().remove(&blob.id).is_none() { // don't save duplicate blobs continue; @@ -1380,52 +1375,55 @@ pub(crate) fn prune_repository( BlobType::Tree => &tree_repacker, }; if opts.fast_repack { - repacker.add_fast(&pack.id, blob)?; + repacker.add_fast(&pack.index.id, blob)?; } else { - repacker.add(&pack.id, blob)?; + repacker.add(&pack.index.id, blob)?; } p.inc(u64::from(blob.length)); } if opts.instant_delete { - delete_pack(pack); + delete_pack(pack) } else { // mark pack for removal let pack = pack.into_index_pack_with_time(prune_plan.time); - indexer.write().unwrap().add_remove(pack)?; + Some((pack, true)) } } PackToDo::MarkDelete => { if opts.instant_delete { - delete_pack(pack); + delete_pack(pack) } else { // mark pack for removal let pack = pack.into_index_pack_with_time(prune_plan.time); - indexer.write().unwrap().add_remove(pack)?; + Some((pack, true)) } } PackToDo::KeepMarked | PackToDo::KeepMarkedAndCorrect => { if opts.instant_delete { - delete_pack(pack); + delete_pack(pack) } else { // keep pack: add to new index; keep the timestamp. // Note the timestamp shouldn't be None here, however if it is not not set, use the current time to heal the entry! 
- let time = pack.time.unwrap_or(prune_plan.time); + let time = pack.index.time.unwrap_or(prune_plan.time); let pack = pack.into_index_pack_with_time(time); - indexer.write().unwrap().add_remove(pack)?; + Some((pack, true)) } } PackToDo::Recover => { // recover pack: add to new index in section packs let pack = pack.into_index_pack_with_time(prune_plan.time); - indexer.write().unwrap().add(pack)?; + Some((pack, false)) } PackToDo::Delete => delete_pack(pack), + }; + if let Some((pack, delete)) = to_index { + indexer.add_and_check_save(pack, delete, |file| be.save_file_no_id(file))?; } Ok(()) })?; _ = tree_repacker.finalize()?; _ = data_repacker.finalize()?; - indexer.write().unwrap().finalize()?; + indexer.finalize_and_check_save(|file| be.save_file_no_id(file))?; p.finish(); // remove old index files first as they may reference pack files which are removed soon. @@ -1464,6 +1462,8 @@ struct PackInfo { used_size: u32, /// The size of the unused blobs in the pack unused_size: u32, + // The size of an unused blob (also contained in unused_size) which may be a padding blob ; zero if there is no padding blob detected + padding_size: u32, } impl PartialOrd for PackInfo { @@ -1499,6 +1499,7 @@ impl PackInfo { unused_blobs: 0, used_size: 0, unused_size: 0, + padding_size: 0, }; // We search all blobs in the pack for needed ones. We do this by already marking @@ -1507,7 +1508,7 @@ impl PackInfo { // Note that by this processing, we are also able to handle duplicate blobs within a pack // correctly. // If we found a needed blob, we stop and process the information that the pack is actually needed. - let first_needed = pack.blobs.iter().position(|blob| { + let first_needed = pack.index.blobs.iter().position(|blob| { match used_ids.get_mut(&blob.id) { None | Some(0) => { pi.unused_size += blob.length; @@ -1533,7 +1534,7 @@ impl PackInfo { if let Some(first_needed) = first_needed { // The pack is actually needed. 
// We reprocess the blobs up to the first needed one and mark all blobs which are genarally needed as used. - for blob in &pack.blobs[..first_needed] { + for blob in &pack.index.blobs[..first_needed] { match used_ids.get_mut(&blob.id) { None | Some(0) => {} // already correctly marked Some(count) => { @@ -1547,9 +1548,12 @@ impl PackInfo { } } // Then we process the remaining blobs and mark all blobs which are generally needed as used in this blob - for blob in &pack.blobs[first_needed + 1..] { + for (num, blob) in pack.index.blobs[first_needed + 1..].iter().enumerate() { match used_ids.get_mut(&blob.id) { None | Some(0) => { + if pi.unused_size == 0 && num == pack.index.blobs.len() - 1 { + pi.padding_size = blob.length; + } pi.unused_size += blob.length; pi.unused_blobs += 1; } diff --git a/crates/core/src/commands/repair/index.rs b/crates/core/src/commands/repair/index.rs index 9659bab77..c3f3a35ae 100644 --- a/crates/core/src/commands/repair/index.rs +++ b/crates/core/src/commands/repair/index.rs @@ -73,7 +73,7 @@ pub(crate) fn repair_index( let pack_read_header = checker.into_pack_to_read(); repo.warm_up_wait(pack_read_header.iter().map(|(id, _, _)| *id))?; - let indexer = Indexer::new(be.clone()).into_shared(); + let mut indexer = Indexer::new(); let p = repo.pb.progress_counter("reading pack headers"); p.set_length(pack_read_header.len().try_into().map_err(|err| { @@ -101,13 +101,18 @@ pub(crate) fn repair_index( }; if !dry_run { // write pack file to index - without the delete mark - indexer.write().unwrap().add_with(pack, false)?; + indexer.add_with(pack, false); + if let Some(file) = indexer.save_if_needed() { + _ = be.save_file(&file)?; + } } } } p.inc(1); } - indexer.write().unwrap().finalize()?; + if let Some(file) = indexer.finalize() { + _ = be.save_file(&file)?; + } p.finish(); Ok(()) diff --git a/crates/core/src/commands/repair/snapshots.rs b/crates/core/src/commands/repair/snapshots.rs index ed4370088..0bd0059e3 100644 --- 
a/crates/core/src/commands/repair/snapshots.rs +++ b/crates/core/src/commands/repair/snapshots.rs @@ -11,7 +11,7 @@ use crate::{ }, blob::{ BlobId, BlobType, - packer::Packer, + repopacker::RepositoryPacker, tree::{Tree, TreeId}, }, error::{ErrorKind, RusticError, RusticResult}, @@ -106,8 +106,8 @@ pub(crate) fn repair_snapshots( let mut state = RepairState::default(); - let indexer = Indexer::new(be.clone()).into_shared(); - let mut packer = Packer::new( + let indexer = Indexer::new().into_shared(); + let mut packer = RepositoryPacker::new_with_default_sizer( be.clone(), BlobType::Tree, indexer.clone(), @@ -154,7 +154,7 @@ pub(crate) fn repair_snapshots( if !dry_run { _ = packer.finalize()?; - indexer.write().unwrap().finalize()?; + indexer.finalize_and_check_save(|file| be.save_file_no_id(file))?; } if opts.delete { @@ -191,11 +191,11 @@ pub(crate) fn repair_snapshots( /// # Returns /// /// A tuple containing the change status and the id of the repaired tree -pub(crate) fn repair_tree( +pub(crate) fn repair_tree( be: &impl DecryptFullBackend, opts: &RepairSnapshotsOptions, index: &impl ReadGlobalIndex, - packer: &mut Packer, + packer: &mut RepositoryPacker, id: Option, state: &mut RepairState, dry_run: bool, diff --git a/crates/core/src/index/indexer.rs b/crates/core/src/index/indexer.rs index 467595c16..087e56545 100644 --- a/crates/core/src/index/indexer.rs +++ b/crates/core/src/index/indexer.rs @@ -7,9 +7,7 @@ use std::{ use log::warn; use crate::{ - backend::decrypt::DecryptWriteBackend, blob::BlobId, - error::RusticResult, repofile::indexfile::{IndexFile, IndexPack}, }; @@ -22,16 +20,9 @@ pub(super) mod constants { pub(super) const MAX_AGE: Duration = Duration::from_secs(300); } -pub(crate) type SharedIndexer = Arc>>; - /// The `Indexer` is responsible for indexing blobs. #[derive(Debug)] -pub struct Indexer -where - BE: DecryptWriteBackend, -{ - /// The backend to write to. - be: BE, +pub struct Indexer { /// The index file. 
file: IndexFile, /// The number of blobs indexed. @@ -42,7 +33,7 @@ where indexed: Option>, } -impl Indexer { +impl Indexer { /// Creates a new `Indexer`. /// /// # Type Parameters @@ -52,9 +43,8 @@ impl Indexer { /// # Arguments /// /// * `be` - The backend to write to. - pub fn new(be: BE) -> Self { + pub fn new() -> Self { Self { - be, file: IndexFile::default(), count: 0, created: SystemTime::now(), @@ -71,9 +61,8 @@ impl Indexer { /// # Arguments /// /// * `be` - The backend to write to. - pub fn new_unindexed(be: BE) -> Self { + pub fn new_unindexed() -> Self { Self { - be, file: IndexFile::default(), count: 0, created: SystemTime::now(), @@ -81,20 +70,13 @@ impl Indexer { } } - /// Resets the indexer. - pub fn reset(&mut self) { - self.file = IndexFile::default(); - self.count = 0; - self.created = SystemTime::now(); - } - /// Returns a `SharedIndexer` to use in multiple threads. /// /// # Type Parameters /// /// * `BE` - The backend type. - pub fn into_shared(self) -> SharedIndexer { - Arc::new(RwLock::new(self)) + pub fn into_shared(self) -> SharedIndexer { + SharedIndexer(Arc::new(RwLock::new(self))) } /// Finalizes the `Indexer`. @@ -102,7 +84,23 @@ impl Indexer { /// # Errors /// /// * If the index file could not be serialized. - pub fn finalize(&self) -> RusticResult<()> { + pub fn finalize(mut self) -> Option { + self.save() + } + + pub fn needs_save(&self) -> bool { + // check if IndexFile needs to be saved + let elapsed = self.created.elapsed().unwrap_or_else(|err| { + warn!("couldn't get elapsed time from system time: {err:?}"); + Duration::ZERO + }); + self.count >= constants::MAX_COUNT || elapsed >= constants::MAX_AGE + } + + pub fn save_if_needed(&mut self) -> Option { + if !self.needs_save() { + return None; + } self.save() } @@ -111,11 +109,14 @@ impl Indexer { /// # Errors /// /// * If the index file could not be serialized. 
- pub fn save(&self) -> RusticResult<()> { - if (self.file.packs.len() + self.file.packs_to_delete.len()) > 0 { - _ = self.be.save_file(&self.file)?; + pub fn save(&mut self) -> Option { + if self.file.packs.is_empty() && self.file.packs_to_delete.is_empty() { + return None; } - Ok(()) + let file = std::mem::take(&mut self.file); + self.count = 0; + self.created = SystemTime::now(); + Some(file) } /// Adds a pack to the `Indexer`. @@ -127,8 +128,8 @@ impl Indexer { /// # Errors /// /// * If the index file could not be serialized. - pub fn add(&mut self, pack: IndexPack) -> RusticResult<()> { - self.add_with(pack, false) + pub fn add(&mut self, pack: IndexPack) { + self.add_with(pack, false); } /// Adds a pack to the `Indexer` and removes it from the backend. @@ -140,8 +141,8 @@ impl Indexer { /// # Errors /// /// * If the index file could not be serialized. - pub fn add_remove(&mut self, pack: IndexPack) -> RusticResult<()> { - self.add_with(pack, true) + pub fn add_remove(&mut self, pack: IndexPack) { + self.add_with(pack, true); } /// Adds a pack to the `Indexer`. @@ -154,7 +155,7 @@ impl Indexer { /// # Errors /// /// * If the index file could not be serialized. - pub fn add_with(&mut self, pack: IndexPack, delete: bool) -> RusticResult<()> { + pub fn add_with(&mut self, pack: IndexPack, delete: bool) { self.count += pack.blobs.len(); if let Some(indexed) = &mut self.indexed { @@ -164,27 +165,63 @@ impl Indexer { } self.file.add(pack, delete); - - // check if IndexFile needs to be saved - let elapsed = self.created.elapsed().unwrap_or_else(|err| { - warn!("couldn't get elapsed time from system time: {err:?}"); - Duration::ZERO - }); - if self.count >= constants::MAX_COUNT || elapsed >= constants::MAX_AGE { - self.save()?; - self.reset(); - } - Ok(()) } /// Returns whether the given id is indexed. - /// + /// /// # Arguments /// /// * `id` - The id to check. 
- pub fn has(&self, id: &BlobId) -> bool { + pub fn has(&mut self, id: &BlobId) -> bool { self.indexed - .as_ref() + .as_mut() .is_some_and(|indexed| indexed.contains(id)) } + + /// Reserve an id for saving in the index. Returns whether the id was newly reseved, i.e. was not already present. + /// + /// # Arguments + /// + /// * `id` - The id to check. + pub fn reserve(&mut self, id: &BlobId) -> bool { + self.indexed + .as_mut() + .is_none_or(|indexed| indexed.insert(*id)) + } +} + +#[derive(Debug, Clone)] +pub(crate) struct SharedIndexer(Arc>); + +impl SharedIndexer { + pub fn add_and_check_save( + &self, + pack: IndexPack, + delete: bool, + writer: impl Fn(&IndexFile) -> Result<(), E>, + ) -> Result<(), E> { + let mut indexer = self.0.write().unwrap(); + indexer.add_with(pack, delete); + let res = indexer.save_if_needed(); + drop(indexer); + + res.as_ref().map_or(Ok(()), writer) + } + + pub fn finalize_and_check_save( + self, + writer: impl Fn(&IndexFile) -> Result<(), E>, + ) -> Result<(), E> { + let res = Arc::try_unwrap(self.0) + .expect("indexer still in use") + .into_inner() + .unwrap() + .finalize(); + + res.as_ref().map_or(Ok(()), writer) + } + + pub fn reserve(&self, id: &BlobId) -> bool { + self.0.write().unwrap().reserve(id) + } } diff --git a/crates/core/src/repofile/configfile.rs b/crates/core/src/repofile/configfile.rs index 52e4d11b4..33e0883ce 100644 --- a/crates/core/src/repofile/configfile.rs +++ b/crates/core/src/repofile/configfile.rs @@ -50,6 +50,11 @@ pub struct ConfigFile { /// The chunker polynomial used to chunk data pub chunker_polynomial: String, + /// Marker if this repository uses padding for pack files to mitigate chunking attacks. + /// + /// If not set, uses padding + pub use_pack_padding: Option, + /// Marker if this is a hot repository. 
If not set, this is no hot repository /// /// # Note @@ -166,6 +171,12 @@ impl ConfigFile { } } + /// Determine whether to use pack padding + #[must_use] + pub fn use_pack_padding(&self) -> bool { + self.use_pack_padding.unwrap_or(true) + } + /// Get whether an extra verification (decompressing/decrypting data before writing to the repository) should be performed. #[must_use] pub fn extra_verify(&self) -> bool { diff --git a/crates/core/src/repofile/packfile.rs b/crates/core/src/repofile/packfile.rs index 916bf6943..98397e8bc 100644 --- a/crates/core/src/repofile/packfile.rs +++ b/crates/core/src/repofile/packfile.rs @@ -26,13 +26,13 @@ pub(crate) type PackFileResult = Result; impl_repoid!(PackId, FileType::Pack); -pub(super) mod constants { +pub(crate) mod constants { // 32 equals the size of the crypto overhead // TODO: use from crypto mod /// The overhead of compression and encryption - pub(super) const COMP_OVERHEAD: u32 = 32; + pub(crate) const COMP_OVERHEAD: u32 = 32; /// The length of the length field within the pack header - pub(super) const LENGTH_LEN: u32 = 4; + pub(crate) const LENGTH_LEN: u32 = 4; } /// The length field within the pack header (which is the total length of the pack header) @@ -131,7 +131,7 @@ pub enum HeaderEntry { impl HeaderEntry { /// The length of an uncompressed header entry - const ENTRY_LEN: u32 = 37; + pub(crate) const ENTRY_LEN: u32 = 37; /// The length of a compressed header entry pub(crate) const ENTRY_LEN_COMPRESSED: u32 = 41; diff --git a/crates/core/tests/integration/backup.rs b/crates/core/tests/integration/backup.rs index 8c19be0d1..c266db447 100644 --- a/crates/core/tests/integration/backup.rs +++ b/crates/core/tests/integration/backup.rs @@ -9,8 +9,8 @@ use pretty_assertions::assert_eq; use rstest::rstest; use rustic_core::{ - BackupOptions, CommandInput, ParentOptions, PathList, SnapshotGroupCriterion, SnapshotOptions, - StringList, + BackupOptions, CommandInput, ConfigOptions, ParentOptions, PathList, 
SnapshotGroupCriterion, + SnapshotOptions, StringList, repofile::{PackId, SnapshotFile}, }; @@ -30,10 +30,13 @@ fn test_backup_with_tar_gz_passes( // SimpleLogger::init(log::LevelFilter::Debug, Config::default())?; // Fixtures - let (source, repo) = (tar_gz_testdata?, set_up_repo?.to_indexed_ids()?); - + let (source, mut repo) = (tar_gz_testdata?, set_up_repo?.to_indexed_ids()?); let paths = &source.path_list(); + // we don't use padding in this test as we want to compare packsize with insta! + let opts = ConfigOptions::default().set_use_pack_padding(false); + assert!(repo.apply_config(&opts)?); + // we use as_path to not depend on the actual tempdir let opts = BackupOptions::default().as_path(PathBuf::from_str("test")?); @@ -154,9 +157,10 @@ fn test_backup_dry_run_with_tar_gz_passes( ) -> Result<()> { // Fixtures let (source, repo) = (tar_gz_testdata?, set_up_repo?.to_indexed_ids()?); - let paths = &source.path_list(); + // Note: padding is enabled (automatically) for this test! + // we use as_path to not depend on the actual tempdir let opts = BackupOptions::default() .as_path(PathBuf::from_str("test")?) @@ -222,9 +226,13 @@ fn test_backup_stdin_command( insta_snapshotfile_redaction: Settings, ) -> Result<()> { // Fixtures - let repo = set_up_repo?.to_indexed_ids()?; + let mut repo = set_up_repo?.to_indexed_ids()?; let paths = PathList::from_string("-")?; + // we don't use padding in this test as we want to compare packsize with insta! 
+ let opts = ConfigOptions::default().set_use_pack_padding(false); + assert!(repo.apply_config(&opts)?); + let cmd: CommandInput = "echo test".parse()?; let opts = BackupOptions::default() .stdin_filename("test") diff --git a/crates/core/tests/snapshots/integration__backup-tar-groups-nix.snap b/crates/core/tests/snapshots/integration__backup-tar-groups-nix.snap index d3ca3f2fb..b17a2b427 100644 --- a/crates/core/tests/snapshots/integration__backup-tar-groups-nix.snap +++ b/crates/core/tests/snapshots/integration__backup-tar-groups-nix.snap @@ -35,7 +35,7 @@ expression: snap data_added: "[data_added]", data_added_packed: "[data_added_packed]", data_added_files: 1125653, - data_added_files_packed: 78740, + data_added_files_packed: 81646, data_added_trees: "[data_added_trees]", data_added_trees_packed: "[data_added_trees_packed]", command: "[command]", diff --git a/crates/core/tests/snapshots/integration__backup-tar-groups-windows.snap b/crates/core/tests/snapshots/integration__backup-tar-groups-windows.snap index 5e2bb4018..083d08819 100644 --- a/crates/core/tests/snapshots/integration__backup-tar-groups-windows.snap +++ b/crates/core/tests/snapshots/integration__backup-tar-groups-windows.snap @@ -35,7 +35,7 @@ expression: snap data_added: "[data_added]", data_added_packed: "[data_added_packed]", data_added_files: 1125653, - data_added_files_packed: 78740, + data_added_files_packed: 81646, data_added_trees: "[data_added_trees]", data_added_trees_packed: "[data_added_trees_packed]", command: "[command]", diff --git a/crates/core/tests/snapshots/integration__backup-tar-summary-first-nix.snap b/crates/core/tests/snapshots/integration__backup-tar-summary-first-nix.snap index 4280c5511..bb70a3cc3 100644 --- a/crates/core/tests/snapshots/integration__backup-tar-summary-first-nix.snap +++ b/crates/core/tests/snapshots/integration__backup-tar-summary-first-nix.snap @@ -30,7 +30,7 @@ SnapshotFile( data_added: "[data_added]", data_added_packed: "[data_added_packed]", 
data_added_files: 1125653, - data_added_files_packed: 78740, + data_added_files_packed: 81646, data_added_trees: "[data_added_trees]", data_added_trees_packed: "[data_added_trees_packed]", command: "[command]", diff --git a/crates/core/tests/snapshots/integration__backup-tar-summary-first-windows.snap b/crates/core/tests/snapshots/integration__backup-tar-summary-first-windows.snap index 4280c5511..bb70a3cc3 100644 --- a/crates/core/tests/snapshots/integration__backup-tar-summary-first-windows.snap +++ b/crates/core/tests/snapshots/integration__backup-tar-summary-first-windows.snap @@ -30,7 +30,7 @@ SnapshotFile( data_added: "[data_added]", data_added_packed: "[data_added_packed]", data_added_files: 1125653, - data_added_files_packed: 78740, + data_added_files_packed: 81646, data_added_trees: "[data_added_trees]", data_added_trees_packed: "[data_added_trees_packed]", command: "[command]", diff --git a/crates/core/tests/snapshots/integration__dryrun-tar-summary-first-nix.snap b/crates/core/tests/snapshots/integration__dryrun-tar-summary-first-nix.snap index 8abb966ad..a851c5fc4 100644 --- a/crates/core/tests/snapshots/integration__dryrun-tar-summary-first-nix.snap +++ b/crates/core/tests/snapshots/integration__dryrun-tar-summary-first-nix.snap @@ -30,7 +30,7 @@ SnapshotFile( data_added: "[data_added]", data_added_packed: "[data_added_packed]", data_added_files: 1125653, - data_added_files_packed: 78740, + data_added_files_packed: 131072, data_added_trees: "[data_added_trees]", data_added_trees_packed: "[data_added_trees_packed]", command: "[command]", diff --git a/crates/core/tests/snapshots/integration__dryrun-tar-summary-first-windows.snap b/crates/core/tests/snapshots/integration__dryrun-tar-summary-first-windows.snap index 8abb966ad..a851c5fc4 100644 --- a/crates/core/tests/snapshots/integration__dryrun-tar-summary-first-windows.snap +++ b/crates/core/tests/snapshots/integration__dryrun-tar-summary-first-windows.snap @@ -30,7 +30,7 @@ SnapshotFile( data_added: 
"[data_added]", data_added_packed: "[data_added_packed]", data_added_files: 1125653, - data_added_files_packed: 78740, + data_added_files_packed: 131072, data_added_trees: "[data_added_trees]", data_added_trees_packed: "[data_added_trees_packed]", command: "[command]", diff --git a/crates/core/tests/snapshots/integration__stdin-command-summary-nix.snap b/crates/core/tests/snapshots/integration__stdin-command-summary-nix.snap index e837cecb6..31a7eb180 100644 --- a/crates/core/tests/snapshots/integration__stdin-command-summary-nix.snap +++ b/crates/core/tests/snapshots/integration__stdin-command-summary-nix.snap @@ -30,7 +30,7 @@ SnapshotFile( data_added: "[data_added]", data_added_packed: "[data_added_packed]", data_added_files: 5, - data_added_files_packed: 46, + data_added_files_packed: 123, data_added_trees: "[data_added_trees]", data_added_trees_packed: "[data_added_trees_packed]", command: "[command]", diff --git a/crates/core/tests/snapshots/integration__stdin-command-summary-windows.snap b/crates/core/tests/snapshots/integration__stdin-command-summary-windows.snap index e837cecb6..31a7eb180 100644 --- a/crates/core/tests/snapshots/integration__stdin-command-summary-windows.snap +++ b/crates/core/tests/snapshots/integration__stdin-command-summary-windows.snap @@ -30,7 +30,7 @@ SnapshotFile( data_added: "[data_added]", data_added_packed: "[data_added_packed]", data_added_files: 5, - data_added_files_packed: 46, + data_added_files_packed: 123, data_added_trees: "[data_added_trees]", data_added_trees_packed: "[data_added_trees_packed]", command: "[command]",