Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ pub mod backend;
mod store;
mod types;

pub use api::StorageBackend;
pub use api::{StorageBackend, StorageReadView, StorageWriteBatch, Table};
pub use store::{ForkCheckpoints, SignatureKey, Store};
pub use types::{StoredAggregatedPayload, StoredSignature};
125 changes: 125 additions & 0 deletions crates/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,20 @@ impl Store {
"Pruned finalized data"
);
}
} else {
// Fallback pruning when finalization is stalled.
// When finalization doesn't advance, the normal pruning path above never
// triggers. Prune old states and blocks on every head update to keep
// storage bounded. The prune methods are no-ops when within retention limits.
let protected_roots = [self.latest_finalized().root, self.latest_justified().root];
let pruned_states = self.prune_old_states(&protected_roots);
let pruned_blocks = self.prune_old_blocks(&protected_roots);
if pruned_states > 0 || pruned_blocks > 0 {
info!(
pruned_states,
pruned_blocks, "Fallback pruning (finalization stalled)"
);
}
}
}

Expand Down Expand Up @@ -1352,6 +1366,117 @@ mod tests {
assert!(has_key(backend.as_ref(), Table::States, &justified_root));
}

// ============ Periodic Pruning Tests ============

/// Set up finalized and justified checkpoints in metadata.
fn set_checkpoints(backend: &dyn StorageBackend, finalized: Checkpoint, justified: Checkpoint) {
let mut batch = backend.begin_write().expect("write batch");
batch
.put_batch(
Table::Metadata,
vec![
(KEY_LATEST_FINALIZED.to_vec(), finalized.as_ssz_bytes()),
(KEY_LATEST_JUSTIFIED.to_vec(), justified.as_ssz_bytes()),
],
)
.expect("put checkpoints");
batch.commit().expect("commit");
}

#[test]
fn fallback_pruning_removes_old_states_and_blocks() {
let backend = Arc::new(InMemoryBackend::new());
let mut store = Store::test_store_with_backend(backend.clone());

// Use roots that are within the retention window as finalized/justified
let finalized_root = root(0);
let justified_root = root(1);
set_checkpoints(
backend.as_ref(),
Checkpoint {
slot: 0,
root: finalized_root,
},
Checkpoint {
slot: 1,
root: justified_root,
},
);

// Insert more than STATES_TO_KEEP headers + states, but fewer than BLOCKS_TO_KEEP
let total_states = STATES_TO_KEEP + 5;
for i in 0..total_states as u64 {
insert_header(backend.as_ref(), root(i), i);
insert_state(backend.as_ref(), root(i));
}

assert_eq!(count_entries(backend.as_ref(), Table::States), total_states);
assert_eq!(
count_entries(backend.as_ref(), Table::BlockHeaders),
total_states
);

// Use the last inserted root as head. Calling update_checkpoints with
// head_only triggers the fallback path (finalization doesn't advance).
let head_root = root(total_states as u64 - 1);
store.update_checkpoints(ForkCheckpoints::head_only(head_root));

// 905 headers total. Top 900 by slot are kept in the retention window,
// leaving 5 candidates. 2 are protected (finalized + justified),
// so 3 are pruned → 905 - 3 = 902 states remaining.
assert_eq!(
count_entries(backend.as_ref(), Table::States),
STATES_TO_KEEP + 2
);
// Finalized and justified states must survive
assert!(has_key(backend.as_ref(), Table::States, &finalized_root));
assert!(has_key(backend.as_ref(), Table::States, &justified_root));

// Blocks: total_states < BLOCKS_TO_KEEP, so no blocks should be pruned
assert_eq!(
count_entries(backend.as_ref(), Table::BlockHeaders),
total_states
);
}

#[test]
fn fallback_pruning_no_op_within_retention() {
let backend = Arc::new(InMemoryBackend::new());
let mut store = Store::test_store_with_backend(backend.clone());

set_checkpoints(
backend.as_ref(),
Checkpoint {
slot: 0,
root: root(0),
},
Checkpoint {
slot: 0,
root: root(0),
},
);

// Insert exactly STATES_TO_KEEP entries (no excess)
for i in 0..STATES_TO_KEEP as u64 {
insert_header(backend.as_ref(), root(i), i);
insert_state(backend.as_ref(), root(i));
}

// Use the last inserted root as head
let head_root = root(STATES_TO_KEEP as u64 - 1);
store.update_checkpoints(ForkCheckpoints::head_only(head_root));

// Nothing should be pruned (within retention window)
assert_eq!(
count_entries(backend.as_ref(), Table::States),
STATES_TO_KEEP
);
assert_eq!(
count_entries(backend.as_ref(), Table::BlockHeaders),
STATES_TO_KEEP
);
}

// ============ PayloadBuffer Tests ============

fn make_payload(slot: u64) -> StoredAggregatedPayload {
Expand Down