diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 89e2fa36..5cc90805 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -3,6 +3,6 @@ pub mod backend; mod store; mod types; -pub use api::StorageBackend; +pub use api::{StorageBackend, StorageReadView, StorageWriteBatch, Table}; pub use store::{ForkCheckpoints, SignatureKey, Store}; pub use types::{StoredAggregatedPayload, StoredSignature}; diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index a69d39c2..9bc1b307 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -478,6 +478,20 @@ impl Store { "Pruned finalized data" ); } + } else { + // Fallback pruning when finalization is stalled. + // When finalization doesn't advance, the normal pruning path above never + // triggers. Prune old states and blocks on every head update to keep + // storage bounded. The prune methods are no-ops when within retention limits. + let protected_roots = [self.latest_finalized().root, self.latest_justified().root]; + let pruned_states = self.prune_old_states(&protected_roots); + let pruned_blocks = self.prune_old_blocks(&protected_roots); + if pruned_states > 0 || pruned_blocks > 0 { + info!( + pruned_states, + pruned_blocks, "Fallback pruning (finalization stalled)" + ); + } } } @@ -1352,6 +1366,117 @@ mod tests { assert!(has_key(backend.as_ref(), Table::States, &justified_root)); } + // ============ Periodic Pruning Tests ============ + + /// Set up finalized and justified checkpoints in metadata. + fn set_checkpoints(backend: &dyn StorageBackend, finalized: Checkpoint, justified: Checkpoint) { + let mut batch = backend.begin_write().expect("write batch"); + batch + .put_batch( + Table::Metadata, + vec![ + (KEY_LATEST_FINALIZED.to_vec(), finalized.as_ssz_bytes()), + (KEY_LATEST_JUSTIFIED.to_vec(), justified.as_ssz_bytes()), + ], + ) + .expect("put checkpoints"); + batch.commit().expect("commit"); + } + + #[test] + fn fallback_pruning_removes_old_states_and_blocks() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store::test_store_with_backend(backend.clone()); + + // Use roots that are within the retention window as finalized/justified + let finalized_root = root(0); + let justified_root = root(1); + set_checkpoints( + backend.as_ref(), + Checkpoint { + slot: 0, + root: finalized_root, + }, + Checkpoint { + slot: 1, + root: justified_root, + }, + ); + + // Insert more than STATES_TO_KEEP headers + states, but fewer than BLOCKS_TO_KEEP + let total_states = STATES_TO_KEEP + 5; + for i in 0..total_states as u64 { + insert_header(backend.as_ref(), root(i), i); + insert_state(backend.as_ref(), root(i)); + } + + assert_eq!(count_entries(backend.as_ref(), Table::States), total_states); + assert_eq!( + count_entries(backend.as_ref(), Table::BlockHeaders), + total_states + ); + + // Use the last inserted root as head. Calling update_checkpoints with + // head_only triggers the fallback path (finalization doesn't advance). + let head_root = root(total_states as u64 - 1); + store.update_checkpoints(ForkCheckpoints::head_only(head_root)); + + // 905 headers total. Top 900 by slot are kept in the retention window, + // leaving 5 candidates. 2 are protected (finalized + justified), + // so 3 are pruned → 905 - 3 = 902 states remaining. + assert_eq!( + count_entries(backend.as_ref(), Table::States), + STATES_TO_KEEP + 2 + ); + // Finalized and justified states must survive + assert!(has_key(backend.as_ref(), Table::States, &finalized_root)); + assert!(has_key(backend.as_ref(), Table::States, &justified_root)); + + // Blocks: total_states < BLOCKS_TO_KEEP, so no blocks should be pruned + assert_eq!( + count_entries(backend.as_ref(), Table::BlockHeaders), + total_states + ); + } + + #[test] + fn fallback_pruning_no_op_within_retention() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store::test_store_with_backend(backend.clone()); + + set_checkpoints( + backend.as_ref(), + Checkpoint { + slot: 0, + root: root(0), + }, + Checkpoint { + slot: 0, + root: root(0), + }, + ); + + // Insert exactly STATES_TO_KEEP entries (no excess) + for i in 0..STATES_TO_KEEP as u64 { + insert_header(backend.as_ref(), root(i), i); + insert_state(backend.as_ref(), root(i)); + } + + // Use the last inserted root as head + let head_root = root(STATES_TO_KEEP as u64 - 1); + store.update_checkpoints(ForkCheckpoints::head_only(head_root)); + + // Nothing should be pruned (within retention window) + assert_eq!( + count_entries(backend.as_ref(), Table::States), + STATES_TO_KEEP + ); + assert_eq!( + count_entries(backend.as_ref(), Table::BlockHeaders), + STATES_TO_KEEP + ); + } + // ============ PayloadBuffer Tests ============ fn make_payload(slot: u64) -> StoredAggregatedPayload {