Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions src/flat/decoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use std::io::{Read, Seek, SeekFrom};

use crate::{Decode, Error, config::{self, Config, Configuration, Fixint, LittleEndian, NoLimit}};

/// Reads fixed-size decoded items from a flat byte stream.
///
/// `FlatDecoder` expects the input to be laid out as a plain sequence of
/// `ITEM_SIZE`-byte records. There are no headers or chunk markers, so the
/// caller can seek directly to `ITEM_SIZE * n` to load item `n`.
pub struct FlatDecoder<R: Read + Seek, const ITEM_SIZE: usize, C = Configuration<LittleEndian, Fixint, NoLimit>> {
reader: R,
config: C,
}

impl<R: Read + Seek, const ITEM_SIZE: usize> FlatDecoder<R, ITEM_SIZE> {
/// Create a flat decoder using the crate's standard fixed-int configuration.
pub fn new(reader: R) -> Self {
Self::new_with(reader, config::standard().with_fixed_int_encoding())
}
}

impl<R: Read + Seek, const ITEM_SIZE: usize, C: Config> FlatDecoder<R, ITEM_SIZE, C> {
/// Create a flat decoder with a custom decoding configuration.
pub fn new_with(reader: R, config: C) -> Self {
Self { reader, config }
}

/// Read and decode the next fixed-size item from the stream.
///
/// Returns `Ok(None)` on EOF. Partial trailing records are reported as an
/// error because the stream length must be a multiple of `ITEM_SIZE`.
pub fn read_item<T: Decode>(&mut self) -> Result<Option<T>, Error> {
let mut buf = [0u8; ITEM_SIZE]; // stack array, not Vec

// Peek first byte to distinguish EOF from mid-stream error
match self.reader.read(&mut buf[..1])? {
0 => return Ok(None), // clean EOF
_ => {}
}

// Fill the rest — read_exact guarantees all ITEM_SIZE bytes or errors
self.reader.read_exact(&mut buf[1..])?;

let (item, _) = crate::decode_from_slice_with_config(&buf, self.config)?;
Ok(Some(item))
}

/// Read and decode all remaining fixed-size items.
pub fn read_all<T: Decode>(&mut self) -> Result<Vec<T>, Error> {
let mut items = Vec::new();
loop {
let mut buf = [0u8; ITEM_SIZE];
match self.reader.read(&mut buf[..1])? {
0 => break,
_ => {}
}
self.reader.read_exact(&mut buf[1..])?;
let (item, _) = crate::decode_from_slice_with_config(&buf, self.config)?;
items.push(item);
}
Ok(items)
}

/// Borrow the wrapped reader.
pub fn get_ref(&self) -> &R {
&self.reader
}

/// Seek the underlying reader.
///
/// For random access, callers can seek to `SeekFrom::Start((ITEM_SIZE * n) as u64)`
/// to position the decoder at item `n`.
pub fn seek(&mut self, from: SeekFrom) -> Result<u64, Error> {
Ok(self.reader.seek(from)?)
}

/// Since all the Items have the same lenght, we can seek to the position of the item and read it directly.
/// This is more efficient than reading all the items up to the desired index.
/// Note that this method does not check if the index is out of bounds, so it may return an error if the index is too large.
pub fn get<T: Decode>(&mut self, idx: usize) -> Result<T, Error> {
let current = self.seek(SeekFrom::Current(0))?;

self.seek(SeekFrom::Start((idx * ITEM_SIZE) as u64))?;
self.read_item()?.ok_or_else(|| Error::OwnedCustom {message: format!("Unexpected end of file at index {}", idx)})

// seek back to original position
.and_then(|item| {
self.seek(SeekFrom::Start(current))?;
Ok(item)
})
}

}
73 changes: 73 additions & 0 deletions src/flat/encoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use std::io::Write;

use crate::{Encode, Error, config::{self, Config, Configuration, Fixint, LittleEndian, NoLimit}};


/// Writes fixed-size encoded items as a flat byte stream.
///
/// `FlatEncoder` is intended for data types whose binary representation is
/// always exactly `ITEM_SIZE` bytes. It writes each encoded value directly to
/// the underlying writer without chunking, headers, or extra framing.
///
/// Because every item has the same size, the resulting file length is always
/// `ITEM_SIZE * n_items`, which makes direct record access straightforward.
pub struct FlatEncoder<W: Write, const ITEM_SIZE: usize, C = Configuration<LittleEndian, Fixint, NoLimit>> {
writer: W,
config: C,
}

impl<W: Write, const ITEM_SIZE: usize> FlatEncoder<W, ITEM_SIZE> {
/// Create a flat encoder using the crate's standard fixed-int configuration.
pub fn new(writer: W) -> Self {
Self::new_with(writer, config::standard().with_fixed_int_encoding())
}
}

impl<W: Write, const ITEM_SIZE: usize, C: Config> FlatEncoder<W, ITEM_SIZE, C> {
/// Create a flat encoder with a custom encoding configuration.
pub fn new_with(writer: W, config: C) -> Self {
Self { writer, config }
}

/// Encode one item and append it to the underlying writer.
///
/// The encoded byte length is validated in debug builds to match
/// `ITEM_SIZE`.
pub fn write_item<T: Encode>(&mut self, item: &T) -> Result<(), Error> {
let buf = crate::encode_to_vec_with_config(item, self.config)?;
self.writer.write_all(buf.as_slice())?;
debug_assert_eq!(
buf.len(), ITEM_SIZE,
"encoded {} bytes, expected {}. Check impl_flat_encodable!",
buf.len(), ITEM_SIZE
);

Ok(())
}

/// Encode and write all items from an iterator.
pub fn write_all<T: Encode, I: IntoIterator<Item = T>>(&mut self, items: I) -> Result<(), Error> {
for item in items {
self.write_item(&item)?;
}
Ok(())
}

/// Flush the underlying writer.
pub fn flush(&mut self) -> Result<(), Error> {
self.writer.flush()?;
Ok(())
}

/// Flush pending output and return the wrapped writer.
pub fn finish(mut self) -> Result<W, Error> {
self.flush()?;
Ok(self.writer)
}

/// Borrow the wrapped writer.
pub fn get_ref(&self) -> &W {
&self.writer
}
}

48 changes: 48 additions & 0 deletions src/flat/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//! Flat, fixed-size binary serialization helpers.
//!
//! The `flat` module is the lowest-overhead binary layout in OxiCode. It is
//! designed for types that always encode to the same number of bytes, so the
//! output is just a contiguous sequence of items with no chunk headers, length
//! prefixes, or other framing data.
//!
//! That means the size of a flat file is exactly `ITEM_SIZE * n_items`, and the
//! byte offset of item `n` can be computed directly as `ITEM_SIZE * n`. This
//! makes random access and direct loading of fixed-size records effectively
//! `O(1)` once the item size is known.
//!
//! # Overview
//!
//! - [`FlatEncoder`] writes fixed-size items directly to any `Write` target.
//! - [`FlatDecoder`] reads fixed-size items directly from any `Read + Seek` target.
//!
//! # Example
//!
//! ```rust,ignore
//! use oxicode::flat::{FlatDecoder, FlatEncoder};
//! use std::fs::File;
//! use std::io::SeekFrom;
//!
//! const ITEM_SIZE: usize = 16;
//!
//! # fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let file = File::create("items.bin")?;
//! let mut encoder = FlatEncoder::<_, ITEM_SIZE>::new(file);
//! encoder.write_item(&123u64)?;
//! encoder.write_item(&456u64)?;
//! let file = encoder.finish()?;
//!
//! let mut decoder = FlatDecoder::<_, ITEM_SIZE>::new(file);
//! let first: Option<u64> = decoder.read_item()?;
//! decoder.seek(SeekFrom::Start((ITEM_SIZE * 1) as u64))?;
//! let second: Option<u64> = decoder.read_item()?;
//! # Ok(())
//! # }
//! ```

/// Flat encoder implementation.
pub mod encoder;
/// Flat decoder implementation.
pub mod decoder;

pub use encoder::FlatEncoder;
pub use decoder::FlatDecoder;
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ pub mod simd;
#[cfg(any(feature = "compression-lz4", feature = "compression-zstd"))]
pub mod compression;

#[cfg(feature = "alloc")]
pub mod flat;
#[cfg(feature = "alloc")]
pub use flat::{FlatDecoder, FlatEncoder};
// Schema versioning support
pub mod versioning;

Expand Down
25 changes: 23 additions & 2 deletions src/streaming/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

use super::chunk::ChunkHeader;
use super::StreamingProgress;
#[cfg(feature = "std")]
use crate::config::{LittleEndian, NoLimit, Varint};
use crate::de::{Decode, DecoderImpl, SliceReader};
use crate::{config, Error, Result};
use crate::config::{Config, Configuration};

#[cfg(feature = "alloc")]
extern crate alloc;
Expand All @@ -17,8 +20,9 @@ use std::io::Read;
/// allowing processing of very large streams without loading
/// everything into memory.
#[cfg(feature = "std")]
pub struct StreamingDecoder<R: Read> {
pub struct StreamingDecoder<R: Read, C = Configuration<LittleEndian, Varint, NoLimit>> {
reader: R,
config: C,
current_chunk: Option<ChunkData>,
progress: StreamingProgress,
finished: bool,
Expand All @@ -37,12 +41,29 @@ impl<R: Read> StreamingDecoder<R> {
pub fn new(reader: R) -> Self {
Self {
reader,
config: config::standard(),
current_chunk: None,
progress: StreamingProgress::default(),
finished: false,
}
}
}

#[cfg(feature = "std")]
impl<R: Read, C: Config> StreamingDecoder<R, C> {
/// Create a new streaming decoder with custom configuration.
/// Allows to create a decoder with specific endianness, integer encoding, and byte limits.
/// Really usefull for some performance readings (for example when reading the element number n of a p bit object) or to be compatible with some specific encoders.
pub fn new_with(reader: R, config: C) -> Self {
Comment on lines +54 to +57
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spelling/grammar in doc comments: 'Allows to create' -> 'Allows creating', 'usefull' -> 'useful'. Consider tightening the sentence for clarity as well.

Copilot uses AI. Check for mistakes.
Self {
reader,
config,
current_chunk: None,
progress: StreamingProgress::default(),
finished: false,
}
}

/// Read the next item from the stream.
///
/// Returns `None` when the stream is exhausted.
Expand Down Expand Up @@ -73,7 +94,7 @@ impl<R: Read> StreamingDecoder<R> {

// Create reader from remaining chunk data
let reader = SliceReader::new(&chunk.data[chunk.offset..]);
let mut decoder = DecoderImpl::new(reader, config::standard());
let mut decoder = DecoderImpl::new(reader, self.config);
let item = T::decode(&mut decoder)?;
Comment on lines 95 to 98
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the encoder, this likely moves self.config out of &mut self when creating DecoderImpl. If C isn't Copy, this will not compile and/or will fail on subsequent calls. Consider passing &self.config (if supported) or adding a Clone bound and using self.config.clone() (or storing the config behind Arc<C>).

Copilot uses AI. Check for mistakes.

// Update offset based on how much was read
Expand Down
43 changes: 41 additions & 2 deletions src/streaming/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

use super::chunk::ChunkHeader;
use super::{StreamingConfig, StreamingProgress};
use crate::config::Config;
#[cfg(feature = "std")]
use crate::config::{Configuration, LittleEndian, NoLimit, Varint};
use crate::enc::{Encode, EncoderImpl, VecWriter};
use crate::{config, Result};

Expand All @@ -22,8 +25,9 @@ use std::io::Write;
/// This allows encoding very large collections without loading
/// everything into memory at once.
#[cfg(feature = "std")]
pub struct StreamingEncoder<W: Write> {
pub struct StreamingEncoder<W: Write, C = Configuration<LittleEndian, Varint, NoLimit>> {
writer: W,
encoding_config: C,
config: StreamingConfig,
buffer: alloc::vec::Vec<u8>,
items_in_buffer: u32,
Expand All @@ -42,14 +46,49 @@ impl<W: Write> StreamingEncoder<W> {
pub fn with_config(writer: W, config: StreamingConfig) -> Self {
Self {
writer,
encoding_config: config::standard(),
config,
buffer: alloc::vec::Vec::new(),
items_in_buffer: 0,
Comment on lines 46 to 52
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not compile: the config parameter (a StreamingConfig) shadows the imported crate::config module, so config::standard() resolves to a local variable and cannot be used as a path. Rename the parameter (e.g., streaming_config) or call the module via an unshadowed path (e.g., crate::config::standard()).

Copilot uses AI. Check for mistakes.
progress: StreamingProgress::default(),
progress_callback: None,
}
}
}

#[cfg(feature = "std")]
impl<W: Write, C: Config> StreamingEncoder<W, C> {
/// Create a streaming encoder with custom encoding configuration.
/// The chunking configuration is still taken from the default `StreamingConfig`.
/// This allows you to use a custom encoding configuration while still using the default chunking behavior.
pub fn new_with(writer: W, encoding_config: C) -> Self {
Self {
writer,
encoding_config,
config: StreamingConfig::default(),
buffer: alloc::vec::Vec::new(),
items_in_buffer: 0,
progress: StreamingProgress::default(),
progress_callback: None,
}
}

/// Create a streaming encoder with custom encoding and chunking configuration.
/// This allows you to fully customize both the encoding behavior and the chunking behavior.
/// The `encoding_config` is used for encoding individual items, while the `config` is used for controlling how items are buffered and when chunks are flushed.
/// For example, you could use a custom encoding configuration that uses a different endianness or varint encoding, while still using the default chunking behavior.
/// Or you could use a custom chunking configuration that flushes after every item, while still using the default encoding configuration.
pub fn new_with_config(writer: W, encoding_config: C, config: StreamingConfig) -> Self {
Self {
writer,
encoding_config,
config,
buffer: alloc::vec::Vec::new(),
items_in_buffer: 0,
progress: StreamingProgress::default(),
progress_callback: None,
}
}
/// Set a progress callback.
pub fn with_progress_callback(mut self, callback: ProgressCallback) -> Self {
self.progress_callback = Some(callback);
Expand All @@ -65,7 +104,7 @@ impl<W: Write> StreamingEncoder<W> {
pub fn write_item<T: Encode>(&mut self, item: &T) -> Result<()> {
// Encode item to temporary buffer
let item_writer = VecWriter::new();
let mut encoder = EncoderImpl::new(item_writer, config::standard());
let mut encoder = EncoderImpl::new(item_writer, self.encoding_config);
item.encode(&mut encoder)?;
let item_bytes = encoder.into_writer().into_vec();
Comment on lines 104 to 109
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This likely moves self.encoding_config out of &mut self when constructing EncoderImpl, which will either fail to compile (if C isn't Copy) or prevent reuse of the encoder config across multiple calls. Prefer passing a reference if EncoderImpl::new supports it, or require C: Clone and pass self.encoding_config.clone() (or store the config in a shared wrapper like Arc<C>).

Copilot uses AI. Check for mistakes.

Expand Down
2 changes: 2 additions & 0 deletions tests/compression_advanced3_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ fn test_adv3_lz4_double_nested_compress_roundtrip() {
// Test 12 – CompressionStats default() yields all-zero fields
// ─────────────────────────────────────────────────────────────────────────────

#[cfg(any(feature = "compression-lz4", feature = "compression-zstd"))]
#[test]
fn test_adv3_compression_stats_default_is_zero() {
use oxicode::compression::CompressionStats;
Expand Down Expand Up @@ -361,6 +362,7 @@ fn test_adv3_compression_stats_default_is_zero() {
// Test 13 – CompressionStats when compressed_size > original_size (expansion)
// ─────────────────────────────────────────────────────────────────────────────

#[cfg(any(feature = "compression-lz4", feature = "compression-zstd"))]
#[test]
fn test_adv3_compression_stats_expansion_scenario() {
use oxicode::compression::CompressionStats;
Expand Down