From 4266d0eaa02dfbc53c152aa1778c990cd6458325 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 2 Mar 2026 11:17:02 -0300 Subject: [PATCH 01/11] feat: prune states and blocks after a while --- crates/storage/src/store.rs | 331 +++++++++++++++++++++++++++++++++++- 1 file changed, 329 insertions(+), 2 deletions(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 76945021..255966bc 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -78,6 +78,17 @@ const KEY_LATEST_JUSTIFIED: &[u8] = b"latest_justified"; /// Key for "latest_finalized" field of the Store. Its value has type [`Checkpoint`] and it's SSZ-encoded. const KEY_LATEST_FINALIZED: &[u8] = b"latest_finalized"; +/// ~1 day of block history at 4-second slots (86400 / 4 = 21600). +const BLOCKS_TO_KEEP: usize = 21_600; + +/// ~1 hour of state history at 4-second slots (3600 / 4 = 900). +const STATES_TO_KEEP: usize = 900; + +const _: () = assert!( + BLOCKS_TO_KEEP >= STATES_TO_KEEP, + "BLOCKS_TO_KEEP must be >= STATES_TO_KEEP" +); + // ============ Key Encoding Helpers ============ /// Encode a SignatureKey (validator_id, root) to bytes. @@ -381,10 +392,25 @@ impl Store { Table::LatestKnownAggregatedPayloads, finalized.slot, ); - if pruned_chain > 0 || pruned_sigs > 0 || pruned_att_data > 0 { + // Prune old states before blocks: state pruning uses headers for slot lookup + let protected_roots = [finalized.root, self.latest_justified().root]; + let pruned_states = self.prune_old_states(&protected_roots); + let pruned_blocks = self.prune_old_blocks(&protected_roots); + + if pruned_chain > 0 + || pruned_sigs > 0 + || pruned_att_data > 0 + || pruned_states > 0 + || pruned_blocks > 0 + { info!( finalized_slot = finalized.slot, - pruned_chain, pruned_sigs, pruned_att_data, "Pruned finalized data" + pruned_chain, + pruned_sigs, + pruned_att_data, + pruned_states, + pruned_blocks, + "Pruned finalized data" ); } } @@ -558,6 +584,111 @@ impl Store { } } + /// Prune old states beyond the retention window. + /// + /// Keeps the most recent `STATES_TO_KEEP` states (by slot), plus any + /// states whose roots appear in `protected_roots` (finalized, justified). + /// + /// Returns the number of states pruned. + pub fn prune_old_states(&mut self, protected_roots: &[H256]) -> usize { + let view = self.backend.begin_read().expect("read view"); + + // Collect (root_bytes, slot) from BlockHeaders to determine state age. + let mut entries: Vec<(Vec, u64)> = view + .prefix_iterator(Table::BlockHeaders, &[]) + .expect("iterator") + .filter_map(|res| res.ok()) + .map(|(key, value)| { + let header = BlockHeader::from_ssz_bytes(&value).expect("valid header"); + (key.to_vec(), header.slot) + }) + .collect(); + drop(view); + + if entries.len() <= STATES_TO_KEEP { + return 0; + } + + // Sort by slot descending (newest first) + entries.sort_unstable_by(|a, b| b.1.cmp(&a.1)); + + let protected: HashSet> = + protected_roots.iter().map(|r| r.as_ssz_bytes()).collect(); + + // Skip the retention window, collect remaining keys for deletion + let keys_to_delete: Vec> = entries + .into_iter() + .skip(STATES_TO_KEEP) + .filter(|(key, _)| !protected.contains(key)) + .map(|(key, _)| key) + .collect(); + + let count = keys_to_delete.len(); + if count > 0 { + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .delete_batch(Table::States, keys_to_delete) + .expect("delete old states"); + batch.commit().expect("commit"); + } + count + } + + /// Prune old blocks beyond the retention window. + /// + /// Keeps the most recent `BLOCKS_TO_KEEP` blocks (by slot), plus any + /// blocks whose roots appear in `protected_roots` (finalized, justified). + /// Deletes from `BlockHeaders`, `BlockBodies`, and `BlockSignatures`. + /// + /// Returns the number of blocks pruned. + pub fn prune_old_blocks(&mut self, protected_roots: &[H256]) -> usize { + let view = self.backend.begin_read().expect("read view"); + + let mut entries: Vec<(Vec, u64)> = view + .prefix_iterator(Table::BlockHeaders, &[]) + .expect("iterator") + .filter_map(|res| res.ok()) + .map(|(key, value)| { + let header = BlockHeader::from_ssz_bytes(&value).expect("valid header"); + (key.to_vec(), header.slot) + }) + .collect(); + drop(view); + + if entries.len() <= BLOCKS_TO_KEEP { + return 0; + } + + // Sort by slot descending (newest first) + entries.sort_unstable_by(|a, b| b.1.cmp(&a.1)); + + let protected: HashSet> = + protected_roots.iter().map(|r| r.as_ssz_bytes()).collect(); + + let keys_to_delete: Vec> = entries + .into_iter() + .skip(BLOCKS_TO_KEEP) + .filter(|(key, _)| !protected.contains(key)) + .map(|(key, _)| key) + .collect(); + + let count = keys_to_delete.len(); + if count > 0 { + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .delete_batch(Table::BlockHeaders, keys_to_delete.clone()) + .expect("delete old block headers"); + batch + .delete_batch(Table::BlockBodies, keys_to_delete.clone()) + .expect("delete old block bodies"); + batch + .delete_batch(Table::BlockSignatures, keys_to_delete) + .expect("delete old block signatures"); + batch.commit().expect("commit"); + } + count + } + /// Get the block header by root. pub fn get_block_header(&self, root: &H256) -> Option { let view = self.backend.begin_read().expect("read view"); @@ -988,3 +1119,199 @@ fn write_signed_block( block } + +#[cfg(test)] +mod tests { + use super::*; + use crate::backend::InMemoryBackend; + + /// Insert a block header (and dummy body + signature) for a given root and slot. + fn insert_header(backend: &dyn StorageBackend, root: H256, slot: u64) { + let header = BlockHeader { + slot, + proposer_index: 0, + parent_root: H256::ZERO, + state_root: H256::ZERO, + body_root: H256::ZERO, + }; + let mut batch = backend.begin_write().expect("write batch"); + let key = root.as_ssz_bytes(); + batch + .put_batch(Table::BlockHeaders, vec![(key.clone(), header.as_ssz_bytes())]) + .expect("put header"); + batch + .put_batch(Table::BlockBodies, vec![(key.clone(), vec![0u8; 4])]) + .expect("put body"); + batch + .put_batch(Table::BlockSignatures, vec![(key, vec![0u8; 4])]) + .expect("put sigs"); + batch.commit().expect("commit"); + } + + /// Insert a dummy state for a given root. + fn insert_state(backend: &dyn StorageBackend, root: H256) { + let mut batch = backend.begin_write().expect("write batch"); + let key = root.as_ssz_bytes(); + batch + .put_batch(Table::States, vec![(key, vec![0u8; 4])]) + .expect("put state"); + batch.commit().expect("commit"); + } + + /// Count entries in a table. + fn count_entries(backend: &dyn StorageBackend, table: Table) -> usize { + let view = backend.begin_read().expect("read view"); + view.prefix_iterator(table, &[]) + .expect("iterator") + .filter_map(|r| r.ok()) + .count() + } + + /// Check if a key exists in a table. + fn has_key(backend: &dyn StorageBackend, table: Table, root: &H256) -> bool { + let view = backend.begin_read().expect("read view"); + view.get(table, &root.as_ssz_bytes()) + .expect("get") + .is_some() + } + + /// Generate a deterministic H256 root from an index. + fn root(index: u64) -> H256 { + let mut bytes = [0u8; 32]; + bytes[..8].copy_from_slice(&index.to_be_bytes()); + H256::from(bytes) + } + + // ============ Block Pruning Tests ============ + + #[test] + fn prune_old_blocks_within_retention() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store { backend: backend.clone() }; + + // Insert exactly BLOCKS_TO_KEEP blocks + for i in 0..BLOCKS_TO_KEEP as u64 { + insert_header(backend.as_ref(), root(i), i); + } + assert_eq!(count_entries(backend.as_ref(), Table::BlockHeaders), BLOCKS_TO_KEEP); + + let pruned = store.prune_old_blocks(&[]); + assert_eq!(pruned, 0); + assert_eq!(count_entries(backend.as_ref(), Table::BlockHeaders), BLOCKS_TO_KEEP); + } + + #[test] + fn prune_old_blocks_exceeding_retention() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store { backend: backend.clone() }; + + let total = BLOCKS_TO_KEEP + 10; + for i in 0..total as u64 { + insert_header(backend.as_ref(), root(i), i); + } + assert_eq!(count_entries(backend.as_ref(), Table::BlockHeaders), total); + + let pruned = store.prune_old_blocks(&[]); + assert_eq!(pruned, 10); + assert_eq!(count_entries(backend.as_ref(), Table::BlockHeaders), BLOCKS_TO_KEEP); + assert_eq!(count_entries(backend.as_ref(), Table::BlockBodies), BLOCKS_TO_KEEP); + assert_eq!(count_entries(backend.as_ref(), Table::BlockSignatures), BLOCKS_TO_KEEP); + + // Oldest blocks (slots 0..10) should be gone + for i in 0..10u64 { + assert!(!has_key(backend.as_ref(), Table::BlockHeaders, &root(i))); + } + // Newest blocks should still exist + for i in 10..total as u64 { + assert!(has_key(backend.as_ref(), Table::BlockHeaders, &root(i))); + } + } + + #[test] + fn prune_old_blocks_preserves_protected() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store { backend: backend.clone() }; + + let total = BLOCKS_TO_KEEP + 10; + for i in 0..total as u64 { + insert_header(backend.as_ref(), root(i), i); + } + + // Protect the two oldest blocks (slots 0 and 1) + let finalized_root = root(0); + let justified_root = root(1); + let pruned = store.prune_old_blocks(&[finalized_root, justified_root]); + + // 10 would be pruned, but 2 are protected + assert_eq!(pruned, 8); + assert!(has_key(backend.as_ref(), Table::BlockHeaders, &finalized_root)); + assert!(has_key(backend.as_ref(), Table::BlockHeaders, &justified_root)); + assert!(has_key(backend.as_ref(), Table::BlockBodies, &finalized_root)); + assert!(has_key(backend.as_ref(), Table::BlockSignatures, &finalized_root)); + } + + // ============ State Pruning Tests ============ + + #[test] + fn prune_old_states_within_retention() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store { backend: backend.clone() }; + + // Insert STATES_TO_KEEP headers + states + for i in 0..STATES_TO_KEEP as u64 { + insert_header(backend.as_ref(), root(i), i); + insert_state(backend.as_ref(), root(i)); + } + assert_eq!(count_entries(backend.as_ref(), Table::States), STATES_TO_KEEP); + + let pruned = store.prune_old_states(&[]); + assert_eq!(pruned, 0); + } + + #[test] + fn prune_old_states_exceeding_retention() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store { backend: backend.clone() }; + + let total = STATES_TO_KEEP + 5; + for i in 0..total as u64 { + insert_header(backend.as_ref(), root(i), i); + insert_state(backend.as_ref(), root(i)); + } + assert_eq!(count_entries(backend.as_ref(), Table::States), total); + + let pruned = store.prune_old_states(&[]); + assert_eq!(pruned, 5); + assert_eq!(count_entries(backend.as_ref(), Table::States), STATES_TO_KEEP); + + // Oldest states should be gone + for i in 0..5u64 { + assert!(!has_key(backend.as_ref(), Table::States, &root(i))); + } + // Newest states should remain + for i in 5..total as u64 { + assert!(has_key(backend.as_ref(), Table::States, &root(i))); + } + } + + #[test] + fn prune_old_states_preserves_protected() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store { backend: backend.clone() }; + + let total = STATES_TO_KEEP + 5; + for i in 0..total as u64 { + insert_header(backend.as_ref(), root(i), i); + insert_state(backend.as_ref(), root(i)); + } + + let finalized_root = root(0); + let justified_root = root(2); + let pruned = store.prune_old_states(&[finalized_root, justified_root]); + + // 5 would be pruned, but 2 are protected + assert_eq!(pruned, 3); + assert!(has_key(backend.as_ref(), Table::States, &finalized_root)); + assert!(has_key(backend.as_ref(), Table::States, &justified_root)); + } +} From 9b373c5cc25d33193262e1b870ccf65fc16badd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 2 Mar 2026 11:36:54 -0300 Subject: [PATCH 02/11] chore: run make fmt --- crates/storage/src/store.rs | 88 +++++++++++++++++++++++++++++-------- 1 file changed, 70 insertions(+), 18 deletions(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 255966bc..375b3502 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -1137,7 +1137,10 @@ mod tests { let mut batch = backend.begin_write().expect("write batch"); let key = root.as_ssz_bytes(); batch - .put_batch(Table::BlockHeaders, vec![(key.clone(), header.as_ssz_bytes())]) + .put_batch( + Table::BlockHeaders, + vec![(key.clone(), header.as_ssz_bytes())], + ) .expect("put header"); batch .put_batch(Table::BlockBodies, vec![(key.clone(), vec![0u8; 4])]) @@ -1187,23 +1190,33 @@ mod tests { #[test] fn prune_old_blocks_within_retention() { let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { backend: backend.clone() }; + let mut store = Store { + backend: backend.clone(), + }; // Insert exactly BLOCKS_TO_KEEP blocks for i in 0..BLOCKS_TO_KEEP as u64 { insert_header(backend.as_ref(), root(i), i); } - assert_eq!(count_entries(backend.as_ref(), Table::BlockHeaders), BLOCKS_TO_KEEP); + assert_eq!( + count_entries(backend.as_ref(), Table::BlockHeaders), + BLOCKS_TO_KEEP + ); let pruned = store.prune_old_blocks(&[]); assert_eq!(pruned, 0); - assert_eq!(count_entries(backend.as_ref(), Table::BlockHeaders), BLOCKS_TO_KEEP); + assert_eq!( + count_entries(backend.as_ref(), Table::BlockHeaders), + BLOCKS_TO_KEEP + ); } #[test] fn prune_old_blocks_exceeding_retention() { let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { backend: backend.clone() }; + let mut store = Store { + backend: backend.clone(), + }; let total = BLOCKS_TO_KEEP + 10; for i in 0..total as u64 { @@ -1213,9 +1226,18 @@ mod tests { let pruned = store.prune_old_blocks(&[]); assert_eq!(pruned, 10); - assert_eq!(count_entries(backend.as_ref(), Table::BlockHeaders), BLOCKS_TO_KEEP); - assert_eq!(count_entries(backend.as_ref(), Table::BlockBodies), BLOCKS_TO_KEEP); - assert_eq!(count_entries(backend.as_ref(), Table::BlockSignatures), BLOCKS_TO_KEEP); + assert_eq!( + count_entries(backend.as_ref(), Table::BlockHeaders), + BLOCKS_TO_KEEP + ); + assert_eq!( + count_entries(backend.as_ref(), Table::BlockBodies), + BLOCKS_TO_KEEP + ); + assert_eq!( + count_entries(backend.as_ref(), Table::BlockSignatures), + BLOCKS_TO_KEEP + ); // Oldest blocks (slots 0..10) should be gone for i in 0..10u64 { @@ -1230,7 +1252,9 @@ mod tests { #[test] fn prune_old_blocks_preserves_protected() { let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { backend: backend.clone() }; + let mut store = Store { + backend: backend.clone(), + }; let total = BLOCKS_TO_KEEP + 10; for i in 0..total as u64 { @@ -1244,10 +1268,26 @@ mod tests { // 10 would be pruned, but 2 are protected assert_eq!(pruned, 8); - assert!(has_key(backend.as_ref(), Table::BlockHeaders, &finalized_root)); - assert!(has_key(backend.as_ref(), Table::BlockHeaders, &justified_root)); - assert!(has_key(backend.as_ref(), Table::BlockBodies, &finalized_root)); - assert!(has_key(backend.as_ref(), Table::BlockSignatures, &finalized_root)); + assert!(has_key( + backend.as_ref(), + Table::BlockHeaders, + &finalized_root + )); + assert!(has_key( + backend.as_ref(), + Table::BlockHeaders, + &justified_root + )); + assert!(has_key( + backend.as_ref(), + Table::BlockBodies, + &finalized_root + )); + assert!(has_key( + backend.as_ref(), + Table::BlockSignatures, + &finalized_root + )); } // ============ State Pruning Tests ============ @@ -1255,14 +1295,19 @@ mod tests { #[test] fn prune_old_states_within_retention() { let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { backend: backend.clone() }; + let mut store = Store { + backend: backend.clone(), + }; // Insert STATES_TO_KEEP headers + states for i in 0..STATES_TO_KEEP as u64 { insert_header(backend.as_ref(), root(i), i); insert_state(backend.as_ref(), root(i)); } - assert_eq!(count_entries(backend.as_ref(), Table::States), STATES_TO_KEEP); + assert_eq!( + count_entries(backend.as_ref(), Table::States), + STATES_TO_KEEP + ); let pruned = store.prune_old_states(&[]); assert_eq!(pruned, 0); @@ -1271,7 +1316,9 @@ mod tests { #[test] fn prune_old_states_exceeding_retention() { let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { backend: backend.clone() }; + let mut store = Store { + backend: backend.clone(), + }; let total = STATES_TO_KEEP + 5; for i in 0..total as u64 { @@ -1282,7 +1329,10 @@ mod tests { let pruned = store.prune_old_states(&[]); assert_eq!(pruned, 5); - assert_eq!(count_entries(backend.as_ref(), Table::States), STATES_TO_KEEP); + assert_eq!( + count_entries(backend.as_ref(), Table::States), + STATES_TO_KEEP + ); // Oldest states should be gone for i in 0..5u64 { @@ -1297,7 +1347,9 @@ mod tests { #[test] fn prune_old_states_preserves_protected() { let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { backend: backend.clone() }; + let mut store = Store { + backend: backend.clone(), + }; let total = STATES_TO_KEEP + 5; for i in 0..total as u64 { From d6c79587e7b0bb2368e8ba1242cc57e2396db08c Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Tue, 3 Mar 2026 12:29:43 -0300 Subject: [PATCH 03/11] Add periodic fallback pruning for stalled finalization When finalization stalls (e.g., network issues, low participation), the States and Block tables grow unbounded because retention-based pruning only triggers on finalization advancement. This adds a periodic check every 7200 slots (~8 hours) that calls prune_old_states and prune_old_blocks directly when finalization is far behind, adapted from Zeam's approach. The check is cheap (two integer comparisons per slot-0 tick) and the actual pruning reuses existing idempotent methods. --- crates/blockchain/src/store.rs | 18 ++++- crates/storage/src/lib.rs | 2 +- crates/storage/src/store.rs | 129 +++++++++++++++++++++++++++++++++ 3 files changed, 147 insertions(+), 2 deletions(-) diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 093950b7..bddfe915 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -4,7 +4,9 @@ use ethlambda_crypto::aggregate_signatures; use ethlambda_state_transition::{ is_proposer, process_block, process_slots, slot_is_justifiable_after, }; -use ethlambda_storage::{ForkCheckpoints, SignatureKey, Store, StoredAggregatedPayload}; +use ethlambda_storage::{ + ForkCheckpoints, PRUNING_FALLBACK_INTERVAL_SLOTS, SignatureKey, Store, StoredAggregatedPayload, +}; use ethlambda_types::{ ShortRoot, attestation::{ @@ -313,6 +315,20 @@ pub fn on_tick( // NOTE: here we assume on_tick never skips intervals match interval { 0 => { + // Periodic fallback pruning when finalization is stalled + if slot > 0 + && slot.is_multiple_of(PRUNING_FALLBACK_INTERVAL_SLOTS) + && slot.saturating_sub(store.latest_finalized().slot) + > PRUNING_FALLBACK_INTERVAL_SLOTS + { + warn!( + %slot, + finalized_slot = store.latest_finalized().slot, + "Finalization stalled, running periodic fallback pruning" + ); + store.periodic_prune(); + } + // Start of slot - process attestations if proposal exists if should_signal_proposal { accept_new_attestations(store, false); diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 89e2fa36..4c8fc21f 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -4,5 +4,5 @@ mod store; mod types; pub use api::StorageBackend; -pub use store::{ForkCheckpoints, SignatureKey, Store}; +pub use store::{ForkCheckpoints, PRUNING_FALLBACK_INTERVAL_SLOTS, SignatureKey, Store}; pub use types::{StoredAggregatedPayload, StoredSignature}; diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index b529e845..4ebf567c 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -90,6 +90,10 @@ const _: () = assert!( "BLOCKS_TO_KEEP must be >= STATES_TO_KEEP" ); +/// Periodic fallback pruning interval: prune old blocks/states every N slots +/// even if finalization hasn't advanced. Set to 7200 slots (~8 hours at 4s/slot). +pub const PRUNING_FALLBACK_INTERVAL_SLOTS: u64 = 7200; + // ============ Key Encoding Helpers ============ /// Encode a SignatureKey (validator_id, root) to bytes. @@ -417,6 +421,21 @@ impl Store { } } + /// Fallback pruning for when finalization is stalled. + /// + /// Calls `prune_old_states` and `prune_old_blocks` with the current + /// finalized and justified roots as protected. This reuses the same + /// retention-window logic from `update_checkpoints`, but runs + /// independently of finalization advancement. + pub fn periodic_prune(&mut self) { + let protected_roots = [self.latest_finalized().root, self.latest_justified().root]; + let pruned_states = self.prune_old_states(&protected_roots); + let pruned_blocks = self.prune_old_blocks(&protected_roots); + if pruned_states > 0 || pruned_blocks > 0 { + info!(pruned_states, pruned_blocks, "Periodic fallback pruning"); + } + } + // ============ Blocks ============ /// Get block data for fork choice: root -> (slot, parent_root). @@ -1391,4 +1410,114 @@ mod tests { assert!(has_key(backend.as_ref(), Table::States, &finalized_root)); assert!(has_key(backend.as_ref(), Table::States, &justified_root)); } + + // ============ Periodic Pruning Tests ============ + + /// Set up finalized and justified checkpoints in metadata. + fn set_checkpoints(backend: &dyn StorageBackend, finalized: Checkpoint, justified: Checkpoint) { + let mut batch = backend.begin_write().expect("write batch"); + batch + .put_batch( + Table::Metadata, + vec![ + (KEY_LATEST_FINALIZED.to_vec(), finalized.as_ssz_bytes()), + (KEY_LATEST_JUSTIFIED.to_vec(), justified.as_ssz_bytes()), + ], + ) + .expect("put checkpoints"); + batch.commit().expect("commit"); + } + + #[test] + fn periodic_prune_removes_old_states_and_blocks() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store { + backend: backend.clone(), + }; + + // Use roots that are within the retention window as finalized/justified + let finalized_root = root(0); + let justified_root = root(1); + set_checkpoints( + backend.as_ref(), + Checkpoint { + slot: 0, + root: finalized_root, + }, + Checkpoint { + slot: 1, + root: justified_root, + }, + ); + + // Insert more than STATES_TO_KEEP headers + states and BLOCKS_TO_KEEP blocks + let total_states = STATES_TO_KEEP + 5; + for i in 0..total_states as u64 { + insert_header(backend.as_ref(), root(i), i); + insert_state(backend.as_ref(), root(i)); + } + + assert_eq!(count_entries(backend.as_ref(), Table::States), total_states); + assert_eq!( + count_entries(backend.as_ref(), Table::BlockHeaders), + total_states + ); + + // periodic_prune should prune states beyond STATES_TO_KEEP + // (protecting finalized and justified roots) + store.periodic_prune(); + + // States beyond retention should be pruned (5 excess - 2 protected = 3 pruned) + assert_eq!( + count_entries(backend.as_ref(), Table::States), + STATES_TO_KEEP + 2 + ); + // Finalized and justified states must survive + assert!(has_key(backend.as_ref(), Table::States, &finalized_root)); + assert!(has_key(backend.as_ref(), Table::States, &justified_root)); + + // Blocks: total_states < BLOCKS_TO_KEEP, so no blocks should be pruned + assert_eq!( + count_entries(backend.as_ref(), Table::BlockHeaders), + total_states + ); + } + + #[test] + fn periodic_prune_no_op_within_retention() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store { + backend: backend.clone(), + }; + + set_checkpoints( + backend.as_ref(), + Checkpoint { + slot: 0, + root: root(0), + }, + Checkpoint { + slot: 0, + root: root(0), + }, + ); + + // Insert exactly STATES_TO_KEEP entries (no excess) + for i in 0..STATES_TO_KEEP as u64 { + insert_header(backend.as_ref(), root(i), i); + insert_state(backend.as_ref(), root(i)); + } + + store.periodic_prune(); + + // Nothing should be pruned + assert_eq!( + count_entries(backend.as_ref(), Table::States), + STATES_TO_KEEP + ); + assert_eq!( + count_entries(backend.as_ref(), Table::BlockHeaders), + STATES_TO_KEEP + ); + } } From 040a5539c10b6fead1fe54b30e0c6345390929bc Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Tue, 3 Mar 2026 12:52:03 -0300 Subject: [PATCH 04/11] Read latest_finalized() once before the periodic pruning check. Avoids 3 redundant DB reads for the same checkpoint value that doesnt change between calls in the interval-0 handler. --- crates/blockchain/src/store.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index bddfe915..a8cc1c85 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -316,14 +316,15 @@ pub fn on_tick( match interval { 0 => { // Periodic fallback pruning when finalization is stalled + let finalized = store.latest_finalized(); if slot > 0 && slot.is_multiple_of(PRUNING_FALLBACK_INTERVAL_SLOTS) - && slot.saturating_sub(store.latest_finalized().slot) + && slot.saturating_sub(finalized.slot) > PRUNING_FALLBACK_INTERVAL_SLOTS { warn!( %slot, - finalized_slot = store.latest_finalized().slot, + finalized_slot = finalized.slot, "Finalization stalled, running periodic fallback pruning" ); store.periodic_prune(); From 8ef5e9d4f636754b910d642cbb60b6965cc0515a Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Tue, 3 Mar 2026 12:53:07 -0300 Subject: [PATCH 05/11] Update crates/storage/src/store.rs Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- crates/storage/src/store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 4ebf567c..149eac31 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -1450,7 +1450,7 @@ mod tests { }, ); - // Insert more than STATES_TO_KEEP headers + states and BLOCKS_TO_KEEP blocks + // Insert more than STATES_TO_KEEP headers + states, but fewer than BLOCKS_TO_KEEP let total_states = STATES_TO_KEEP + 5; for i in 0..total_states as u64 { insert_header(backend.as_ref(), root(i), i); From 203ee5a92a287d9531984cf8a3d47530990883e9 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Tue, 3 Mar 2026 13:01:29 -0300 Subject: [PATCH 06/11] Add on_tick trigger condition tests for periodic pruning Two tests that verify the 3-part guard in on_tick interval 0: - on_tick_triggers_periodic_pruning_when_finalization_stalled: pruning fires when slot is a multiple of the interval AND finalization lag exceeds the threshold - on_tick_skips_periodic_pruning_when_finalization_healthy: pruning does NOT fire when finalization is recent (lag within threshold) Also exports Table, StorageWriteBatch, and StorageReadView from the storage crate to enable direct backend access in blockchain tests. --- crates/blockchain/src/store.rs | 138 ++++++++++++++++++++++++++++++++- crates/storage/src/lib.rs | 2 +- 2 files changed, 137 insertions(+), 3 deletions(-) diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index a8cc1c85..63bc9a9b 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -319,8 +319,7 @@ pub fn on_tick( let finalized = store.latest_finalized(); if slot > 0 && slot.is_multiple_of(PRUNING_FALLBACK_INTERVAL_SLOTS) - && slot.saturating_sub(finalized.slot) - > PRUNING_FALLBACK_INTERVAL_SLOTS + && slot.saturating_sub(finalized.slot) > PRUNING_FALLBACK_INTERVAL_SLOTS { warn!( %slot, @@ -1277,3 +1276,138 @@ fn is_reorg(old_head: H256, new_head: H256, store: &Store) -> bool { // Assume the ancestor is behind the latest finalized block false } + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + + use ethlambda_storage::{StorageBackend, Table, backend::InMemoryBackend}; + use ethlambda_types::{block::BlockHeader, primitives::ssz::Encode, state::State}; + + use crate::MILLISECONDS_PER_INTERVAL; + + /// Generate a deterministic H256 root from an index. + fn root(index: u64) -> H256 { + let mut bytes = [0u8; 32]; + bytes[..8].copy_from_slice(&index.to_be_bytes()); + H256::from(bytes) + } + + /// Insert a block header (and dummy body + signature) for a given root and slot. + fn insert_header(backend: &dyn StorageBackend, root: H256, slot: u64) { + let header = BlockHeader { + slot, + proposer_index: 0, + parent_root: H256::ZERO, + state_root: H256::ZERO, + body_root: H256::ZERO, + }; + let mut batch = backend.begin_write().expect("write batch"); + let key = root.as_ssz_bytes(); + batch + .put_batch( + Table::BlockHeaders, + vec![(key.clone(), header.as_ssz_bytes())], + ) + .expect("put header"); + batch + .put_batch(Table::BlockBodies, vec![(key.clone(), vec![0u8; 4])]) + .expect("put body"); + batch + .put_batch(Table::BlockSignatures, vec![(key, vec![0u8; 4])]) + .expect("put sigs"); + batch.commit().expect("commit"); + } + + /// Insert a dummy state for a given root. + fn insert_state(backend: &dyn StorageBackend, root: H256) { + let mut batch = backend.begin_write().expect("write batch"); + let key = root.as_ssz_bytes(); + batch + .put_batch(Table::States, vec![(key, vec![0u8; 4])]) + .expect("put state"); + batch.commit().expect("commit"); + } + + /// Count entries in a table. + fn count_entries(backend: &dyn StorageBackend, table: Table) -> usize { + let view = backend.begin_read().expect("read view"); + view.prefix_iterator(table, &[]) + .expect("iterator") + .filter_map(|r| r.ok()) + .count() + } + + /// Tick the store to exactly interval 0 of the given slot. + /// + /// Pre-sets store.time so on_tick processes a single interval, avoiding + /// side effects from other intervals (update_safe_target, etc.). + fn tick_to_interval_0(store: &mut Store, slot: u64) { + store.set_time(slot * INTERVALS_PER_SLOT - 1); + let timestamp_ms = slot * INTERVALS_PER_SLOT * MILLISECONDS_PER_INTERVAL; + on_tick(store, timestamp_ms, false, false); + } + + #[test] + fn on_tick_triggers_periodic_pruning_when_finalization_stalled() { + let backend = Arc::new(InMemoryBackend::new()); + let state = State::from_genesis(0, vec![]); + let mut store = Store::from_anchor_state(backend.clone(), state); + + // Insert 905 additional headers + states (exceeds STATES_TO_KEEP=900). + // Genesis already inserted 1 header + state, so total = 906. + for i in 1..=905u64 { + insert_header(backend.as_ref(), root(i), i); + insert_state(backend.as_ref(), root(i)); + } + let states_before = count_entries(backend.as_ref(), Table::States); + assert!(states_before > 900, "should exceed retention window"); + + // Tick to interval 0 of slot 2*PRUNING_FALLBACK_INTERVAL_SLOTS. + // Finalization is at slot 0, so lag = 14400 > 7200 → periodic pruning fires. + tick_to_interval_0(&mut store, PRUNING_FALLBACK_INTERVAL_SLOTS * 2); + + let states_after = count_entries(backend.as_ref(), Table::States); + assert!( + states_after < states_before, + "states should have been pruned: before={states_before}, after={states_after}" + ); + } + + #[test] + fn on_tick_skips_periodic_pruning_when_finalization_healthy() { + let backend = Arc::new(InMemoryBackend::new()); + let state = State::from_genesis(0, vec![]); + let mut store = Store::from_anchor_state(backend.clone(), state); + + // Advance finalization first (this triggers update_checkpoints' own pruning). + let target_slot = PRUNING_FALLBACK_INTERVAL_SLOTS; + store.update_checkpoints(ForkCheckpoints::new( + store.head(), + None, + Some(Checkpoint { + slot: target_slot - 1, + root: store.head(), + }), + )); + + // Insert excess states AFTER finalization pruning has run. + for i in 1..=905u64 { + insert_header(backend.as_ref(), root(i), i); + insert_state(backend.as_ref(), root(i)); + } + let states_before = count_entries(backend.as_ref(), Table::States); + assert!(states_before > 900, "should exceed retention window"); + + // Tick to interval 0 of the same target slot. + // Finalization is at target_slot - 1, so lag = 1 ≤ threshold → no periodic pruning. + tick_to_interval_0(&mut store, target_slot); + + let states_after = count_entries(backend.as_ref(), Table::States); + assert_eq!( + states_after, states_before, + "no pruning should occur when finalization is healthy" + ); + } +} diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 4c8fc21f..3848ea12 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -3,6 +3,6 @@ pub mod backend; mod store; mod types; -pub use api::StorageBackend; +pub use api::{StorageBackend, StorageReadView, StorageWriteBatch, Table}; pub use store::{ForkCheckpoints, PRUNING_FALLBACK_INTERVAL_SLOTS, SignatureKey, Store}; pub use types::{StoredAggregatedPayload, StoredSignature}; From 32ee526c58bf867b1d21b915d6a48e59fc1f2cfd Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 4 Mar 2026 17:17:35 -0300 Subject: [PATCH 07/11] Move periodic pruning guard logic from blockchain to storage module periodic_prune now takes a slot parameter and handles the 3-part trigger condition (slot > 0, multiple of interval, finalization lag) internally. The blockchain on_tick handler just calls store.periodic_prune(slot). --- crates/blockchain/src/store.rs | 21 +++++------------- crates/storage/src/store.rs | 40 +++++++++++++++++++++++++++++----- 2 files changed, 39 insertions(+), 22 deletions(-) diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index defeb224..a29346c7 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -4,9 +4,7 @@ use ethlambda_crypto::aggregate_signatures; use ethlambda_state_transition::{ is_proposer, process_block, process_slots, slot_is_justifiable_after, }; -use ethlambda_storage::{ - ForkCheckpoints, PRUNING_FALLBACK_INTERVAL_SLOTS, SignatureKey, Store, StoredAggregatedPayload, -}; +use ethlambda_storage::{ForkCheckpoints, SignatureKey, Store, StoredAggregatedPayload}; use ethlambda_types::{ ShortRoot, attestation::{ @@ -316,18 +314,7 @@ pub fn on_tick( match interval { 0 => { // Periodic fallback pruning when finalization is stalled - let finalized = store.latest_finalized(); - if slot > 0 - && slot.is_multiple_of(PRUNING_FALLBACK_INTERVAL_SLOTS) - && slot.saturating_sub(finalized.slot) > PRUNING_FALLBACK_INTERVAL_SLOTS - { - warn!( - %slot, - finalized_slot = finalized.slot, - "Finalization stalled, running periodic fallback pruning" - ); - store.periodic_prune(); - } + store.periodic_prune(slot); // Start of slot - process attestations if proposal exists if should_signal_proposal { @@ -1276,7 +1263,9 @@ mod tests { use super::*; use std::sync::Arc; - use ethlambda_storage::{StorageBackend, Table, backend::InMemoryBackend}; + use ethlambda_storage::{ + PRUNING_FALLBACK_INTERVAL_SLOTS, StorageBackend, Table, backend::InMemoryBackend, + }; use ethlambda_types::{block::BlockHeader, primitives::ssz::Encode, state::State}; use crate::MILLISECONDS_PER_INTERVAL; diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index ea261c1e..b87c3e22 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -24,7 +24,7 @@ use ethlambda_types::{ signature::ValidatorSignature, state::{ChainConfig, State}, }; -use tracing::info; +use tracing::{info, warn}; /// Key for looking up individual validator signatures. /// Used to index signature caches by (validator, message) pairs. @@ -491,8 +491,33 @@ impl Store { /// finalized and justified roots as protected. This reuses the same /// retention-window logic from `update_checkpoints`, but runs /// independently of finalization advancement. - pub fn periodic_prune(&mut self) { - let protected_roots = [self.latest_finalized().root, self.latest_justified().root]; + /// Periodic fallback pruning for stalled finalization. + /// + /// When finalization stalls (e.g., network issues, low participation), the normal + /// pruning path in `update_checkpoints` never triggers because finalization doesn't + /// advance. This method runs every `PRUNING_FALLBACK_INTERVAL_SLOTS` slots and prunes + /// old states and blocks if finalization is far behind. + /// + /// Trigger conditions (all must be true): + /// 1. `slot > 0` (skip genesis) + /// 2. `slot` is a multiple of `PRUNING_FALLBACK_INTERVAL_SLOTS` + /// 3. `current_slot - finalized_slot > PRUNING_FALLBACK_INTERVAL_SLOTS` + pub fn periodic_prune(&mut self, slot: u64) { + let finalized = self.latest_finalized(); + if slot == 0 + || !slot.is_multiple_of(PRUNING_FALLBACK_INTERVAL_SLOTS) + || slot.saturating_sub(finalized.slot) <= PRUNING_FALLBACK_INTERVAL_SLOTS + { + return; + } + + warn!( + %slot, + finalized_slot = finalized.slot, + "Finalization stalled, running periodic fallback pruning" + ); + + let protected_roots = [finalized.root, self.latest_justified().root]; let pruned_states = self.prune_old_states(&protected_roots); let pruned_blocks = self.prune_old_blocks(&protected_roots); if pruned_states > 0 || pruned_blocks > 0 { @@ -1422,8 +1447,9 @@ mod tests { ); // periodic_prune should prune states beyond STATES_TO_KEEP - // (protecting finalized and justified roots) - store.periodic_prune(); + // (protecting finalized and justified roots). + // Slot must satisfy all 3 trigger conditions: > 0, multiple of interval, lag > interval. + store.periodic_prune(PRUNING_FALLBACK_INTERVAL_SLOTS * 2); // States beyond retention should be pruned (5 excess - 2 protected = 3 pruned) assert_eq!( @@ -1464,7 +1490,9 @@ mod tests { insert_state(backend.as_ref(), root(i)); } - store.periodic_prune(); + // Slot is a multiple of the interval, but finalization is at slot 0 + // and count is within retention — nothing should be pruned + store.periodic_prune(PRUNING_FALLBACK_INTERVAL_SLOTS * 2); // Nothing should be pruned assert_eq!( From d4e19c116885766831cf3adc1e2f1c4b6df578b3 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 4 Mar 2026 17:22:36 -0300 Subject: [PATCH 08/11] Reorder periodic_prune guards to avoid DB read on every slot Check cheap arithmetic conditions (slot == 0, is_multiple_of) before reading latest_finalized from the database. This avoids a wasted DB read on 7199 out of 7200 calls. --- crates/storage/src/store.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index b87c3e22..c652ae04 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -503,11 +503,13 @@ impl Store { /// 2. `slot` is a multiple of `PRUNING_FALLBACK_INTERVAL_SLOTS` /// 3. `current_slot - finalized_slot > PRUNING_FALLBACK_INTERVAL_SLOTS` pub fn periodic_prune(&mut self, slot: u64) { + // Check cheap arithmetic conditions before any DB read + if slot == 0 || !slot.is_multiple_of(PRUNING_FALLBACK_INTERVAL_SLOTS) { + return; + } + let finalized = self.latest_finalized(); - if slot == 0 - || !slot.is_multiple_of(PRUNING_FALLBACK_INTERVAL_SLOTS) - || slot.saturating_sub(finalized.slot) <= PRUNING_FALLBACK_INTERVAL_SLOTS - { + if slot.saturating_sub(finalized.slot) <= PRUNING_FALLBACK_INTERVAL_SLOTS { return; } From c43d0110601cf4c00a237ef3d060db13068192ea Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 4 Mar 2026 17:47:03 -0300 Subject: [PATCH 09/11] Move fallback pruning into update_checkpoints, removing it from on_tick Fallback pruning now lives entirely in the storage module's update_checkpoints method as an else branch: when finalization doesn't advance, it checks the head slot and prunes if finalization is stalled. This keeps the consensus layer completely unaware of pruning logic. Removes periodic_prune as a standalone method, the on_tick call, and the blockchain-level pruning tests (now covered by storage tests). --- crates/blockchain/src/store.rs | 139 --------------------------------- crates/storage/src/lib.rs | 2 +- crates/storage/src/store.rs | 109 +++++++++++++------------- 3 files changed, 55 insertions(+), 195 deletions(-) diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index a29346c7..9a5a8137 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -313,9 +313,6 @@ pub fn on_tick( // NOTE: here we assume on_tick never skips intervals match interval { 0 => { - // Periodic fallback pruning when finalization is stalled - store.periodic_prune(slot); - // Start of slot - process attestations if proposal exists if should_signal_proposal { accept_new_attestations(store, false); @@ -1258,139 +1255,3 @@ fn is_reorg(old_head: H256, new_head: H256, store: &Store) -> bool { false } -#[cfg(test)] -mod tests { - use super::*; - use std::sync::Arc; - - use ethlambda_storage::{ - PRUNING_FALLBACK_INTERVAL_SLOTS, StorageBackend, Table, backend::InMemoryBackend, - }; - use ethlambda_types::{block::BlockHeader, primitives::ssz::Encode, state::State}; - - use crate::MILLISECONDS_PER_INTERVAL; - - /// Generate a deterministic H256 root from an index. - fn root(index: u64) -> H256 { - let mut bytes = [0u8; 32]; - bytes[..8].copy_from_slice(&index.to_be_bytes()); - H256::from(bytes) - } - - /// Insert a block header (and dummy body + signature) for a given root and slot. - fn insert_header(backend: &dyn StorageBackend, root: H256, slot: u64) { - let header = BlockHeader { - slot, - proposer_index: 0, - parent_root: H256::ZERO, - state_root: H256::ZERO, - body_root: H256::ZERO, - }; - let mut batch = backend.begin_write().expect("write batch"); - let key = root.as_ssz_bytes(); - batch - .put_batch( - Table::BlockHeaders, - vec![(key.clone(), header.as_ssz_bytes())], - ) - .expect("put header"); - batch - .put_batch(Table::BlockBodies, vec![(key.clone(), vec![0u8; 4])]) - .expect("put body"); - batch - .put_batch(Table::BlockSignatures, vec![(key, vec![0u8; 4])]) - .expect("put sigs"); - batch.commit().expect("commit"); - } - - /// Insert a dummy state for a given root. - fn insert_state(backend: &dyn StorageBackend, root: H256) { - let mut batch = backend.begin_write().expect("write batch"); - let key = root.as_ssz_bytes(); - batch - .put_batch(Table::States, vec![(key, vec![0u8; 4])]) - .expect("put state"); - batch.commit().expect("commit"); - } - - /// Count entries in a table. - fn count_entries(backend: &dyn StorageBackend, table: Table) -> usize { - let view = backend.begin_read().expect("read view"); - view.prefix_iterator(table, &[]) - .expect("iterator") - .filter_map(|r| r.ok()) - .count() - } - - /// Tick the store to exactly interval 0 of the given slot. - /// - /// Pre-sets store.time so on_tick processes a single interval, avoiding - /// side effects from other intervals (update_safe_target, etc.). - fn tick_to_interval_0(store: &mut Store, slot: u64) { - store.set_time(slot * INTERVALS_PER_SLOT - 1); - let timestamp_ms = slot * INTERVALS_PER_SLOT * MILLISECONDS_PER_INTERVAL; - on_tick(store, timestamp_ms, false, false); - } - - #[test] - fn on_tick_triggers_periodic_pruning_when_finalization_stalled() { - let backend = Arc::new(InMemoryBackend::new()); - let state = State::from_genesis(0, vec![]); - let mut store = Store::from_anchor_state(backend.clone(), state); - - // Insert 905 additional headers + states (exceeds STATES_TO_KEEP=900). - // Genesis already inserted 1 header + state, so total = 906. - for i in 1..=905u64 { - insert_header(backend.as_ref(), root(i), i); - insert_state(backend.as_ref(), root(i)); - } - let states_before = count_entries(backend.as_ref(), Table::States); - assert!(states_before > 900, "should exceed retention window"); - - // Tick to interval 0 of slot 2*PRUNING_FALLBACK_INTERVAL_SLOTS. - // Finalization is at slot 0, so lag = 14400 > 7200 → periodic pruning fires. - tick_to_interval_0(&mut store, PRUNING_FALLBACK_INTERVAL_SLOTS * 2); - - let states_after = count_entries(backend.as_ref(), Table::States); - assert!( - states_after < states_before, - "states should have been pruned: before={states_before}, after={states_after}" - ); - } - - #[test] - fn on_tick_skips_periodic_pruning_when_finalization_healthy() { - let backend = Arc::new(InMemoryBackend::new()); - let state = State::from_genesis(0, vec![]); - let mut store = Store::from_anchor_state(backend.clone(), state); - - // Advance finalization first (this triggers update_checkpoints' own pruning). - let target_slot = PRUNING_FALLBACK_INTERVAL_SLOTS; - store.update_checkpoints(ForkCheckpoints::new( - store.head(), - None, - Some(Checkpoint { - slot: target_slot - 1, - root: store.head(), - }), - )); - - // Insert excess states AFTER finalization pruning has run. - for i in 1..=905u64 { - insert_header(backend.as_ref(), root(i), i); - insert_state(backend.as_ref(), root(i)); - } - let states_before = count_entries(backend.as_ref(), Table::States); - assert!(states_before > 900, "should exceed retention window"); - - // Tick to interval 0 of the same target slot. - // Finalization is at target_slot - 1, so lag = 1 ≤ threshold → no periodic pruning. - tick_to_interval_0(&mut store, target_slot); - - let states_after = count_entries(backend.as_ref(), Table::States); - assert_eq!( - states_after, states_before, - "no pruning should occur when finalization is healthy" - ); - } -} diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 3848ea12..5cc90805 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -4,5 +4,5 @@ mod store; mod types; pub use api::{StorageBackend, StorageReadView, StorageWriteBatch, Table}; -pub use store::{ForkCheckpoints, PRUNING_FALLBACK_INTERVAL_SLOTS, SignatureKey, Store}; +pub use store::{ForkCheckpoints, SignatureKey, Store}; pub use types::{StoredAggregatedPayload, StoredSignature}; diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index c652ae04..d9dc17a7 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -482,48 +482,35 @@ impl Store { "Pruned finalized data" ); } - } - } - - /// Fallback pruning for when finalization is stalled. - /// - /// Calls `prune_old_states` and `prune_old_blocks` with the current - /// finalized and justified roots as protected. This reuses the same - /// retention-window logic from `update_checkpoints`, but runs - /// independently of finalization advancement. - /// Periodic fallback pruning for stalled finalization. - /// - /// When finalization stalls (e.g., network issues, low participation), the normal - /// pruning path in `update_checkpoints` never triggers because finalization doesn't - /// advance. This method runs every `PRUNING_FALLBACK_INTERVAL_SLOTS` slots and prunes - /// old states and blocks if finalization is far behind. - /// - /// Trigger conditions (all must be true): - /// 1. `slot > 0` (skip genesis) - /// 2. `slot` is a multiple of `PRUNING_FALLBACK_INTERVAL_SLOTS` - /// 3. `current_slot - finalized_slot > PRUNING_FALLBACK_INTERVAL_SLOTS` - pub fn periodic_prune(&mut self, slot: u64) { - // Check cheap arithmetic conditions before any DB read - if slot == 0 || !slot.is_multiple_of(PRUNING_FALLBACK_INTERVAL_SLOTS) { - return; - } - - let finalized = self.latest_finalized(); - if slot.saturating_sub(finalized.slot) <= PRUNING_FALLBACK_INTERVAL_SLOTS { - return; - } - - warn!( - %slot, - finalized_slot = finalized.slot, - "Finalization stalled, running periodic fallback pruning" - ); + } else { + // Fallback pruning when finalization is stalled. + // When finalization doesn't advance, the normal pruning path above never + // triggers. This fallback runs every PRUNING_FALLBACK_INTERVAL_SLOTS slots + // and prunes old states and blocks if finalization is far behind. + let head_slot = self + .get_block_header(&checkpoints.head) + .map(|h| h.slot) + .unwrap_or(0); + + if head_slot > 0 + && head_slot.is_multiple_of(PRUNING_FALLBACK_INTERVAL_SLOTS) + && head_slot.saturating_sub(old_finalized_slot) + > PRUNING_FALLBACK_INTERVAL_SLOTS + { + warn!( + slot = head_slot, + finalized_slot = old_finalized_slot, + "Finalization stalled, running periodic fallback pruning" + ); - let protected_roots = [finalized.root, self.latest_justified().root]; - let pruned_states = self.prune_old_states(&protected_roots); - let pruned_blocks = self.prune_old_blocks(&protected_roots); - if pruned_states > 0 || pruned_blocks > 0 { - info!(pruned_states, pruned_blocks, "Periodic fallback pruning"); + let protected_roots = + [self.latest_finalized().root, self.latest_justified().root]; + let pruned_states = self.prune_old_states(&protected_roots); + let pruned_blocks = self.prune_old_blocks(&protected_roots); + if pruned_states > 0 || pruned_blocks > 0 { + info!(pruned_states, pruned_blocks, "Periodic fallback pruning"); + } + } } } @@ -1416,7 +1403,7 @@ mod tests { } #[test] - fn periodic_prune_removes_old_states_and_blocks() { + fn fallback_pruning_removes_old_states_and_blocks() { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store::test_store_with_backend(backend.clone()); @@ -1448,29 +1435,37 @@ mod tests { total_states ); - // periodic_prune should prune states beyond STATES_TO_KEEP - // (protecting finalized and justified roots). - // Slot must satisfy all 3 trigger conditions: > 0, multiple of interval, lag > interval. - store.periodic_prune(PRUNING_FALLBACK_INTERVAL_SLOTS * 2); + // Insert a head block at a slot that triggers fallback pruning: + // slot > 0, multiple of interval, lag > interval. + let head_slot = PRUNING_FALLBACK_INTERVAL_SLOTS * 2; + let head_root = root(9999); + insert_header(backend.as_ref(), head_root, head_slot); + + // Calling update_checkpoints with head_only triggers the fallback path + // (finalization doesn't advance → else branch fires) + store.update_checkpoints(ForkCheckpoints::head_only(head_root)); - // States beyond retention should be pruned (5 excess - 2 protected = 3 pruned) + // 906 headers total (905 original + 1 head). Top 900 by slot are kept in the + // retention window, leaving 6 candidates. 2 are protected (finalized + justified), + // so 4 are pruned → 905 - 4 = 901 states remaining. assert_eq!( count_entries(backend.as_ref(), Table::States), - STATES_TO_KEEP + 2 + STATES_TO_KEEP + 1 ); // Finalized and justified states must survive assert!(has_key(backend.as_ref(), Table::States, &finalized_root)); assert!(has_key(backend.as_ref(), Table::States, &justified_root)); // Blocks: total_states < BLOCKS_TO_KEEP, so no blocks should be pruned + // (+1 for the head block we inserted) assert_eq!( count_entries(backend.as_ref(), Table::BlockHeaders), - total_states + total_states + 1 ); } #[test] - fn periodic_prune_no_op_within_retention() { + fn fallback_pruning_no_op_within_retention() { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store::test_store_with_backend(backend.clone()); @@ -1492,18 +1487,22 @@ mod tests { insert_state(backend.as_ref(), root(i)); } - // Slot is a multiple of the interval, but finalization is at slot 0 - // and count is within retention — nothing should be pruned - store.periodic_prune(PRUNING_FALLBACK_INTERVAL_SLOTS * 2); + // Insert a head block at a slot that triggers fallback pruning + let head_slot = PRUNING_FALLBACK_INTERVAL_SLOTS * 2; + let head_root = root(9999); + insert_header(backend.as_ref(), head_root, head_slot); - // Nothing should be pruned + store.update_checkpoints(ForkCheckpoints::head_only(head_root)); + + // Nothing should be pruned (within retention window) assert_eq!( count_entries(backend.as_ref(), Table::States), STATES_TO_KEEP ); + // +1 for the head block assert_eq!( count_entries(backend.as_ref(), Table::BlockHeaders), - STATES_TO_KEEP + STATES_TO_KEEP + 1 ); } From 120d1163f1b6f7c7b6ffa41ee95ccb1d607c9ffe Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 4 Mar 2026 17:49:13 -0300 Subject: [PATCH 10/11] Run cargo fmt --- crates/blockchain/src/store.rs | 1 - crates/storage/src/store.rs | 6 ++---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 9a5a8137..63d62ef3 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -1254,4 +1254,3 @@ fn is_reorg(old_head: H256, new_head: H256, store: &Store) -> bool { // Assume the ancestor is behind the latest finalized block false } - diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index d9dc17a7..604808ac 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -494,8 +494,7 @@ impl Store { if head_slot > 0 && head_slot.is_multiple_of(PRUNING_FALLBACK_INTERVAL_SLOTS) - && head_slot.saturating_sub(old_finalized_slot) - > PRUNING_FALLBACK_INTERVAL_SLOTS + && head_slot.saturating_sub(old_finalized_slot) > PRUNING_FALLBACK_INTERVAL_SLOTS { warn!( slot = head_slot, @@ -503,8 +502,7 @@ impl Store { "Finalization stalled, running periodic fallback pruning" ); - let protected_roots = - [self.latest_finalized().root, self.latest_justified().root]; + let protected_roots = [self.latest_finalized().root, self.latest_justified().root]; let pruned_states = self.prune_old_states(&protected_roots); let pruned_blocks = self.prune_old_blocks(&protected_roots); if pruned_states > 0 || pruned_blocks > 0 { From b46a0c5f846c5641104bb8436a87f6fd7102c641 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 4 Mar 2026 18:00:35 -0300 Subject: [PATCH 11/11] Simplify fallback pruning: run on every head update instead of every 7200 slots Remove the interval-based guard and PRUNING_FALLBACK_INTERVAL_SLOTS constant. The else branch in update_checkpoints now unconditionally calls prune_old_states and prune_old_blocks when finalization doesn't advance. The prune methods are already no-ops when within retention limits, so this is safe and more testable. --- crates/storage/src/store.rs | 69 +++++++++++-------------------------- 1 file changed, 21 insertions(+), 48 deletions(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 604808ac..9bc1b307 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -24,7 +24,7 @@ use ethlambda_types::{ signature::ValidatorSignature, state::{ChainConfig, State}, }; -use tracing::{info, warn}; +use tracing::info; /// Key for looking up individual validator signatures. /// Used to index signature caches by (validator, message) pairs. @@ -90,10 +90,6 @@ const _: () = assert!( "BLOCKS_TO_KEEP must be >= STATES_TO_KEEP" ); -/// Periodic fallback pruning interval: prune old blocks/states every N slots -/// even if finalization hasn't advanced. Set to 7200 slots (~8 hours at 4s/slot). -pub const PRUNING_FALLBACK_INTERVAL_SLOTS: u64 = 7200; - /// Hard cap for the known aggregated payload buffer. /// Matches Lantern's approach. With 9 validators, this holds /// ~455 unique attestation messages (~30 min at 1/slot). @@ -485,29 +481,16 @@ impl Store { } else { // Fallback pruning when finalization is stalled. // When finalization doesn't advance, the normal pruning path above never - // triggers. This fallback runs every PRUNING_FALLBACK_INTERVAL_SLOTS slots - // and prunes old states and blocks if finalization is far behind. - let head_slot = self - .get_block_header(&checkpoints.head) - .map(|h| h.slot) - .unwrap_or(0); - - if head_slot > 0 - && head_slot.is_multiple_of(PRUNING_FALLBACK_INTERVAL_SLOTS) - && head_slot.saturating_sub(old_finalized_slot) > PRUNING_FALLBACK_INTERVAL_SLOTS - { - warn!( - slot = head_slot, - finalized_slot = old_finalized_slot, - "Finalization stalled, running periodic fallback pruning" + // triggers. Prune old states and blocks on every head update to keep + // storage bounded. The prune methods are no-ops when within retention limits. + let protected_roots = [self.latest_finalized().root, self.latest_justified().root]; + let pruned_states = self.prune_old_states(&protected_roots); + let pruned_blocks = self.prune_old_blocks(&protected_roots); + if pruned_states > 0 || pruned_blocks > 0 { + info!( + pruned_states, + pruned_blocks, "Fallback pruning (finalization stalled)" ); - - let protected_roots = [self.latest_finalized().root, self.latest_justified().root]; - let pruned_states = self.prune_old_states(&protected_roots); - let pruned_blocks = self.prune_old_blocks(&protected_roots); - if pruned_states > 0 || pruned_blocks > 0 { - info!(pruned_states, pruned_blocks, "Periodic fallback pruning"); - } } } } @@ -1433,32 +1416,26 @@ mod tests { total_states ); - // Insert a head block at a slot that triggers fallback pruning: - // slot > 0, multiple of interval, lag > interval. - let head_slot = PRUNING_FALLBACK_INTERVAL_SLOTS * 2; - let head_root = root(9999); - insert_header(backend.as_ref(), head_root, head_slot); - - // Calling update_checkpoints with head_only triggers the fallback path - // (finalization doesn't advance → else branch fires) + // Use the last inserted root as head. Calling update_checkpoints with + // head_only triggers the fallback path (finalization doesn't advance). + let head_root = root(total_states as u64 - 1); store.update_checkpoints(ForkCheckpoints::head_only(head_root)); - // 906 headers total (905 original + 1 head). Top 900 by slot are kept in the - // retention window, leaving 6 candidates. 2 are protected (finalized + justified), - // so 4 are pruned → 905 - 4 = 901 states remaining. + // 905 headers total. Top 900 by slot are kept in the retention window, + // leaving 5 candidates. 2 are protected (finalized + justified), + // so 3 are pruned → 905 - 3 = 902 states remaining. assert_eq!( count_entries(backend.as_ref(), Table::States), - STATES_TO_KEEP + 1 + STATES_TO_KEEP + 2 ); // Finalized and justified states must survive assert!(has_key(backend.as_ref(), Table::States, &finalized_root)); assert!(has_key(backend.as_ref(), Table::States, &justified_root)); // Blocks: total_states < BLOCKS_TO_KEEP, so no blocks should be pruned - // (+1 for the head block we inserted) assert_eq!( count_entries(backend.as_ref(), Table::BlockHeaders), - total_states + 1 + total_states ); } @@ -1485,11 +1462,8 @@ mod tests { insert_state(backend.as_ref(), root(i)); } - // Insert a head block at a slot that triggers fallback pruning - let head_slot = PRUNING_FALLBACK_INTERVAL_SLOTS * 2; - let head_root = root(9999); - insert_header(backend.as_ref(), head_root, head_slot); - + // Use the last inserted root as head + let head_root = root(STATES_TO_KEEP as u64 - 1); store.update_checkpoints(ForkCheckpoints::head_only(head_root)); // Nothing should be pruned (within retention window) @@ -1497,10 +1471,9 @@ mod tests { count_entries(backend.as_ref(), Table::States), STATES_TO_KEEP ); - // +1 for the head block assert_eq!( count_entries(backend.as_ref(), Table::BlockHeaders), - STATES_TO_KEEP + 1 + STATES_TO_KEEP ); }