From f6e8bf0bfe25629d8431cace4d355e9c270931f8 Mon Sep 17 00:00:00 2001 From: Rick-29 Date: Tue, 28 Apr 2026 14:22:01 +0200 Subject: [PATCH 1/5] added support for custom configs on streaming structs --- src/streaming/decoder.rs | 25 +++++++++++++++-- src/streaming/encoder.rs | 43 +++++++++++++++++++++++++++-- tests/compression_advanced3_test.rs | 2 ++ 3 files changed, 66 insertions(+), 4 deletions(-) diff --git a/src/streaming/decoder.rs b/src/streaming/decoder.rs index f135b08..94b310c 100644 --- a/src/streaming/decoder.rs +++ b/src/streaming/decoder.rs @@ -2,8 +2,11 @@ use super::chunk::ChunkHeader; use super::StreamingProgress; +#[cfg(feature = "std")] +use crate::config::{LittleEndian, NoLimit, Varint}; use crate::de::{Decode, DecoderImpl, SliceReader}; use crate::{config, Error, Result}; +use crate::config::{Config, Configuration}; #[cfg(feature = "alloc")] extern crate alloc; @@ -17,8 +20,9 @@ use std::io::Read; /// allowing processing of very large streams without loading /// everything into memory. #[cfg(feature = "std")] -pub struct StreamingDecoder { +pub struct StreamingDecoder> { reader: R, + config: C, current_chunk: Option, progress: StreamingProgress, finished: bool, @@ -37,12 +41,29 @@ impl StreamingDecoder { pub fn new(reader: R) -> Self { Self { reader, + config: config::standard(), current_chunk: None, progress: StreamingProgress::default(), finished: false, } } +} +#[cfg(feature = "std")] +impl StreamingDecoder { + /// Create a new streaming decoder with custom configuration. + /// Allows creating a decoder with specific endianness, integer encoding, and byte limits. + /// Really useful for some performance readings (for example when reading the element number n of a p bit object) or to be compatible with some specific encoders. + pub fn new_with(reader: R, config: C) -> Self { + Self { + reader, + config, + current_chunk: None, + progress: StreamingProgress::default(), + finished: false, + } + } + /// Read the next item from the stream. 
/// /// Returns `None` when the stream is exhausted. @@ -73,7 +94,7 @@ impl StreamingDecoder { // Create reader from remaining chunk data let reader = SliceReader::new(&chunk.data[chunk.offset..]); - let mut decoder = DecoderImpl::new(reader, config::standard()); + let mut decoder = DecoderImpl::new(reader, self.config); let item = T::decode(&mut decoder)?; // Update offset based on how much was read diff --git a/src/streaming/encoder.rs b/src/streaming/encoder.rs index 19a3f76..7e0fbbf 100644 --- a/src/streaming/encoder.rs +++ b/src/streaming/encoder.rs @@ -2,6 +2,9 @@ use super::chunk::ChunkHeader; use super::{StreamingConfig, StreamingProgress}; +use crate::config::Config; +#[cfg(feature = "std")] +use crate::config::{Configuration, LittleEndian, NoLimit, Varint}; use crate::enc::{Encode, EncoderImpl, VecWriter}; use crate::{config, Result}; @@ -22,8 +25,9 @@ use std::io::Write; /// This allows encoding very large collections without loading /// everything into memory at once. #[cfg(feature = "std")] -pub struct StreamingEncoder { +pub struct StreamingEncoder> { writer: W, + encoding_config: C, config: StreamingConfig, buffer: alloc::vec::Vec, items_in_buffer: u32, @@ -42,6 +46,7 @@ impl StreamingEncoder { pub fn with_config(writer: W, config: StreamingConfig) -> Self { Self { writer, + encoding_config: config::standard(), config, buffer: alloc::vec::Vec::new(), items_in_buffer: 0, @@ -49,7 +54,41 @@ impl StreamingEncoder { progress_callback: None, } } +} +#[cfg(feature = "std")] +impl StreamingEncoder { + /// Create a streaming encoder with custom encoding configuration. + /// The chunking configuration is still taken from the default `StreamingConfig`. + /// This allows you to use a custom encoding configuration while still using the default chunking behavior. 
+ pub fn new_with(writer: W, encoding_config: C) -> Self { + Self { + writer, + encoding_config, + config: StreamingConfig::default(), + buffer: alloc::vec::Vec::new(), + items_in_buffer: 0, + progress: StreamingProgress::default(), + progress_callback: None, + } + } + + /// Create a streaming encoder with custom encoding and chunking configuration. + /// This allows you to fully customize both the encoding behavior and the chunking behavior. + /// The `encoding_config` is used for encoding individual items, while the `config` is used for controlling how items are buffered and when chunks are flushed. + /// For example, you could use a custom encoding configuration that uses a different endianness or varint encoding, while still using the default chunking behavior. + /// Or you could use a custom chunking configuration that flushes after every item, while still using the default encoding configuration. + pub fn new_with_config(writer: W, encoding_config: C, config: StreamingConfig) -> Self { + Self { + writer, + encoding_config, + config, + buffer: alloc::vec::Vec::new(), + items_in_buffer: 0, + progress: StreamingProgress::default(), + progress_callback: None, + } + } /// Set a progress callback. 
pub fn with_progress_callback(mut self, callback: ProgressCallback) -> Self { self.progress_callback = Some(callback); @@ -65,7 +104,7 @@ impl StreamingEncoder { pub fn write_item(&mut self, item: &T) -> Result<()> { // Encode item to temporary buffer let item_writer = VecWriter::new(); - let mut encoder = EncoderImpl::new(item_writer, config::standard()); + let mut encoder = EncoderImpl::new(item_writer, self.encoding_config); item.encode(&mut encoder)?; let item_bytes = encoder.into_writer().into_vec(); diff --git a/tests/compression_advanced3_test.rs b/tests/compression_advanced3_test.rs index 74dd99e..a7547ee 100644 --- a/tests/compression_advanced3_test.rs +++ b/tests/compression_advanced3_test.rs @@ -332,6 +332,7 @@ fn test_adv3_lz4_double_nested_compress_roundtrip() { // Test 12 – CompressionStats default() yields all-zero fields // ───────────────────────────────────────────────────────────────────────────── +#[cfg(any(feature = "compression-lz4", feature = "compression-zstd"))] #[test] fn test_adv3_compression_stats_default_is_zero() { use oxicode::compression::CompressionStats; @@ -361,6 +362,7 @@ fn test_adv3_compression_stats_default_is_zero() { // Test 13 – CompressionStats when compressed_size > original_size (expansion) // ───────────────────────────────────────────────────────────────────────────── +#[cfg(any(feature = "compression-lz4", feature = "compression-zstd"))] #[test] fn test_adv3_compression_stats_expansion_scenario() { use oxicode::compression::CompressionStats; From 6b36b81ccd0301d5e48b3f80dace6740897e22a7 Mon Sep 17 00:00:00 2001 From: Rick-29 Date: Wed, 29 Apr 2026 18:04:56 +0200 Subject: [PATCH 2/5] added flat streaming --- src/flat/decoder.rs | 74 +++++++++++++++++++++++++++++++++++++++++++++ src/flat/encoder.rs | 73 ++++++++++++++++++++++++++++++++++++++++++++ src/flat/mod.rs | 48 +++++++++++++++++++++++++++++ src/lib.rs | 4 +++ 4 files changed, 199 insertions(+) create mode 100644 src/flat/decoder.rs create mode 100644 
src/flat/encoder.rs create mode 100644 src/flat/mod.rs diff --git a/src/flat/decoder.rs b/src/flat/decoder.rs new file mode 100644 index 0000000..213df31 --- /dev/null +++ b/src/flat/decoder.rs @@ -0,0 +1,74 @@ +use std::io::{Read, Seek, SeekFrom}; + +use crate::{Decode, Error, config::{self, Config, Configuration, Fixint, LittleEndian, NoLimit}}; + +/// Reads fixed-size decoded items from a flat byte stream. +/// +/// `FlatDecoder` expects the input to be laid out as a plain sequence of +/// `ITEM_SIZE`-byte records. There are no headers or chunk markers, so the +/// caller can seek directly to `ITEM_SIZE * n` to load item `n`. +pub struct FlatDecoder> { + reader: R, + config: C, +} + +impl FlatDecoder { + /// Create a flat decoder using the crate's standard fixed-int configuration. + pub fn new(reader: R) -> Self { + Self::new_with(reader, config::standard().with_fixed_int_encoding()) + } +} + +impl FlatDecoder { + /// Create a flat decoder with a custom decoding configuration. + pub fn new_with(reader: R, config: C) -> Self { + Self { reader, config } + } + + /// Read and decode the next fixed-size item from the stream. + /// + /// Returns `Ok(None)` on EOF. Partial trailing records are reported as an + /// error because the stream length must be a multiple of `ITEM_SIZE`. + pub fn read_item(&mut self) -> Result, Error> { + let mut buf = vec![0u8; ITEM_SIZE]; + let count = self.reader.read(&mut buf)?; + if count == 0 { + return Ok(None); // EOF + } else if count < ITEM_SIZE { + let msg = format!("Unexpected end of file: read {} bytes, expected {}", count, ITEM_SIZE); + return Err(Error::OwnedCustom { message: msg }); + } + let (item, _) = crate::decode_from_slice_with_config(&buf, self.config)?; + Ok(Some(item)) + } + + /// Read and decode all remaining fixed-size items. 
+ pub fn read_all(&mut self) -> Result, Error> { + let mut items = Vec::new(); + let mut buf = vec![0u8; ITEM_SIZE]; + while let Ok(amount) = self.reader.read(&mut buf) { + if amount == 0 { + break; // EOF + } else if amount < ITEM_SIZE { + let msg = format!("Unexpected end of file: read {} bytes, expected {}", amount, ITEM_SIZE); + return Err(Error::OwnedCustom { message: msg }); + } + let (item, _) = crate::decode_from_slice_with_config(&buf, self.config)?; + items.push(item); + } + Ok(items) + } + + /// Borrow the wrapped reader. + pub fn get_ref(&self) -> &R { + &self.reader + } + + /// Seek the underlying reader. + /// + /// For random access, callers can seek to `SeekFrom::Start((ITEM_SIZE * n) as u64)` + /// to position the decoder at item `n`. + pub fn seek(&mut self, from: SeekFrom) -> Result { + Ok(self.reader.seek(from)?) + } +} \ No newline at end of file diff --git a/src/flat/encoder.rs b/src/flat/encoder.rs new file mode 100644 index 0000000..2b6a3eb --- /dev/null +++ b/src/flat/encoder.rs @@ -0,0 +1,73 @@ +use std::io::Write; + +use crate::{Encode, Error, config::{self, Config, Configuration, Fixint, LittleEndian, NoLimit}}; + + +/// Writes fixed-size encoded items as a flat byte stream. +/// +/// `FlatEncoder` is intended for data types whose binary representation is +/// always exactly `ITEM_SIZE` bytes. It writes each encoded value directly to +/// the underlying writer without chunking, headers, or extra framing. +/// +/// Because every item has the same size, the resulting file length is always +/// `ITEM_SIZE * n_items`, which makes direct record access straightforward. +pub struct FlatEncoder> { + writer: W, + config: C, +} + +impl FlatEncoder { + /// Create a flat encoder using the crate's standard fixed-int configuration. + pub fn new(writer: W) -> Self { + Self::new_with(writer, config::standard().with_fixed_int_encoding()) + } +} + +impl FlatEncoder { + /// Create a flat encoder with a custom encoding configuration. 
+ pub fn new_with(writer: W, config: C) -> Self { + Self { writer, config } + } + + /// Encode one item and append it to the underlying writer. + /// + /// The encoded byte length is validated in debug builds to match + /// `ITEM_SIZE`. + pub fn write_item(&mut self, item: &T) -> Result<(), Error> { + let buf = crate::encode_to_vec_with_config(item, self.config)?; + self.writer.write_all(buf.as_slice())?; + debug_assert_eq!( + buf.len(), ITEM_SIZE, + "encoded {} bytes, expected {}. Check impl_flat_encodable!", + buf.len(), ITEM_SIZE + ); + + Ok(()) + } + + /// Encode and write all items from an iterator. + pub fn write_all>(&mut self, items: I) -> Result<(), Error> { + for item in items { + self.write_item(&item)?; + } + Ok(()) + } + + /// Flush the underlying writer. + pub fn flush(&mut self) -> Result<(), Error> { + self.writer.flush()?; + Ok(()) + } + + /// Flush pending output and return the wrapped writer. + pub fn finish(mut self) -> Result { + self.flush()?; + Ok(self.writer) + } + + /// Borrow the wrapped writer. + pub fn get_ref(&self) -> &W { + &self.writer + } +} + diff --git a/src/flat/mod.rs b/src/flat/mod.rs new file mode 100644 index 0000000..034d9c1 --- /dev/null +++ b/src/flat/mod.rs @@ -0,0 +1,48 @@ +//! Flat, fixed-size binary serialization helpers. +//! +//! The `flat` module is the lowest-overhead binary layout in OxiCode. It is +//! designed for types that always encode to the same number of bytes, so the +//! output is just a contiguous sequence of items with no chunk headers, length +//! prefixes, or other framing data. +//! +//! That means the size of a flat file is exactly `ITEM_SIZE * n_items`, and the +//! byte offset of item `n` can be computed directly as `ITEM_SIZE * n`. This +//! makes random access and direct loading of fixed-size records effectively +//! `O(1)` once the item size is known. +//! +//! # Overview +//! +//! - [`FlatEncoder`] writes fixed-size items directly to any `Write` target. +//! 
- [`FlatDecoder`] reads fixed-size items directly from any `Read + Seek` target. +//! +//! # Example +//! +//! ```rust,ignore +//! use oxicode::flat::{FlatDecoder, FlatEncoder}; +//! use std::fs::File; +//! use std::io::SeekFrom; +//! +//! const ITEM_SIZE: usize = 8; // encoded size of one fixed-int `u64` +//! +//! # fn example() -> Result<(), Box> { +//! let file = File::create("items.bin")?; +//! let mut encoder = FlatEncoder::<_, ITEM_SIZE>::new(file); +//! encoder.write_item(&123u64)?; +//! encoder.write_item(&456u64)?; +//! let file = encoder.finish()?; +//! +//! let mut decoder = FlatDecoder::<_, ITEM_SIZE>::new(file); +//! let first: Option = decoder.read_item()?; +//! decoder.seek(SeekFrom::Start((ITEM_SIZE * 1) as u64))?; +//! let second: Option = decoder.read_item()?; +//! # Ok(()) +//! # } +//! ``` + +/// Flat encoder implementation. +pub mod encoder; +/// Flat decoder implementation. +pub mod decoder; + +pub use encoder::FlatEncoder; +pub use decoder::FlatDecoder; \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 458b332..b79c6bf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,6 +83,10 @@ pub mod simd; #[cfg(any(feature = "compression-lz4", feature = "compression-zstd"))] pub mod compression; +#[cfg(feature = "alloc")] +pub mod flat; +#[cfg(feature = "alloc")] +pub use flat::{FlatDecoder, FlatEncoder}; // Schema versioning support pub mod versioning; From 314ad8bbf78e920687fbf58552f93fc76b68f7c5 Mon Sep 17 00:00:00 2001 From: Rick-29 Date: Wed, 29 Apr 2026 18:56:04 +0200 Subject: [PATCH 3/5] added flat streaming --- src/flat/decoder.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/flat/decoder.rs b/src/flat/decoder.rs index 213df31..ebbbf50 100644 --- a/src/flat/decoder.rs +++ b/src/flat/decoder.rs @@ -71,4 +71,19 @@ impl FlatDecoder Result { Ok(self.reader.seek(from)?) 
} + + /// To not get errors + pub fn get(&mut self, idx: usize) -> Result { + let current = self.seek(SeekFrom::Current(0))?; + + self.seek(SeekFrom::Start((idx * ITEM_SIZE) as u64))?; + self.read_item()?.ok_or_else(|| LoaderError::Decode(format!("Unexpected end of file at index {}", idx))) + + // seek back to original position + .and_then(|item| { + self.seek(SeekFrom::Start(current))?; + Ok(item) + }) + } + } \ No newline at end of file From 0f02fe171ac9e86bd527233b2acce4711e8a77fa Mon Sep 17 00:00:00 2001 From: Rick-29 Date: Wed, 29 Apr 2026 18:57:19 +0200 Subject: [PATCH 4/5] added flat streaming --- src/flat/decoder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flat/decoder.rs b/src/flat/decoder.rs index ebbbf50..849d764 100644 --- a/src/flat/decoder.rs +++ b/src/flat/decoder.rs @@ -73,11 +73,11 @@ impl FlatDecoder(&mut self, idx: usize) -> Result { + pub fn get(&mut self, idx: usize) -> Result { let current = self.seek(SeekFrom::Current(0))?; self.seek(SeekFrom::Start((idx * ITEM_SIZE) as u64))?; - self.read_item()?.ok_or_else(|| LoaderError::Decode(format!("Unexpected end of file at index {}", idx))) + self.read_item()?.ok_or_else(|| Error::OwnedCustom {message: format!("Unexpected end of file at index {}", idx)}) // seek back to original position .and_then(|item| { From 65ebb39710cded25eb0f7cbe457aeabab4bae232 Mon Sep 17 00:00:00 2001 From: Rick-29 Date: Wed, 29 Apr 2026 21:47:13 +0200 Subject: [PATCH 5/5] fixed FlatDecoder error --- src/flat/decoder.rs | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/src/flat/decoder.rs b/src/flat/decoder.rs index 849d764..94b9e28 100644 --- a/src/flat/decoder.rs +++ b/src/flat/decoder.rs @@ -30,14 +30,17 @@ impl FlatDecoder(&mut self) -> Result, Error> { - let mut buf = vec![0u8; ITEM_SIZE]; - let count = self.reader.read(&mut buf)?; - if count == 0 { - return Ok(None); // EOF - } else if count < ITEM_SIZE { - let msg = 
format!("Unexpected end of file: read {} bytes, expected {}", count, ITEM_SIZE); - return Err(Error::OwnedCustom { message: msg }); + let mut buf = [0u8; ITEM_SIZE]; // stack array, not Vec + + // Peek first byte to distinguish EOF from mid-stream error + match self.reader.read(&mut buf[..1])? { + 0 => return Ok(None), // clean EOF + _ => {} } + + // Fill the rest — read_exact guarantees all ITEM_SIZE bytes or errors + self.reader.read_exact(&mut buf[1..])?; + let (item, _) = crate::decode_from_slice_with_config(&buf, self.config)?; Ok(Some(item)) } @@ -45,14 +48,13 @@ impl FlatDecoder(&mut self) -> Result, Error> { let mut items = Vec::new(); - let mut buf = vec![0u8; ITEM_SIZE]; - while let Ok(amount) = self.reader.read(&mut buf) { - if amount == 0 { - break; // EOF - } else if amount < ITEM_SIZE { - let msg = format!("Unexpected end of file: read {} bytes, expected {}", amount, ITEM_SIZE); - return Err(Error::OwnedCustom { message: msg }); + loop { + let mut buf = [0u8; ITEM_SIZE]; + match self.reader.read(&mut buf[..1])? { + 0 => break, + _ => {} } + self.reader.read_exact(&mut buf[1..])?; let (item, _) = crate::decode_from_slice_with_config(&buf, self.config)?; items.push(item); } @@ -72,7 +74,9 @@ impl FlatDecoder(&mut self, idx: usize) -> Result { let current = self.seek(SeekFrom::Current(0))?;