diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 80f2528d..b529e845 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -79,6 +79,17 @@ const KEY_LATEST_JUSTIFIED: &[u8] = b"latest_justified"; /// Key for "latest_finalized" field of the Store. Its value has type [`Checkpoint`] and it's SSZ-encoded. const KEY_LATEST_FINALIZED: &[u8] = b"latest_finalized"; +/// ~1 day of block history at 4-second slots (86400 / 4 = 21600). +const BLOCKS_TO_KEEP: usize = 21_600; + +/// ~1 hour of state history at 4-second slots (3600 / 4 = 900). +const STATES_TO_KEEP: usize = 900; + +const _: () = assert!( + BLOCKS_TO_KEEP >= STATES_TO_KEEP, + "BLOCKS_TO_KEEP must be >= STATES_TO_KEEP" +); + // ============ Key Encoding Helpers ============ /// Encode a SignatureKey (validator_id, root) to bytes. @@ -382,10 +393,25 @@ impl Store { Table::LatestKnownAggregatedPayloads, finalized.slot, ); - if pruned_chain > 0 || pruned_sigs > 0 || pruned_att_data > 0 { + // Prune old states before blocks: state pruning uses headers for slot lookup + let protected_roots = [finalized.root, self.latest_justified().root]; + let pruned_states = self.prune_old_states(&protected_roots); + let pruned_blocks = self.prune_old_blocks(&protected_roots); + + if pruned_chain > 0 + || pruned_sigs > 0 + || pruned_att_data > 0 + || pruned_states > 0 + || pruned_blocks > 0 + { info!( finalized_slot = finalized.slot, - pruned_chain, pruned_sigs, pruned_att_data, "Pruned finalized data" + pruned_chain, + pruned_sigs, + pruned_att_data, + pruned_states, + pruned_blocks, + "Pruned finalized data" ); } } @@ -515,6 +541,111 @@ impl Store { } } + /// Prune old states beyond the retention window. + /// + /// Keeps the most recent `STATES_TO_KEEP` states (by slot), plus any + /// states whose roots appear in `protected_roots` (finalized, justified). + /// + /// Returns the number of states pruned. + pub fn prune_old_states(&mut self, protected_roots: &[H256]) -> usize { + let view = self.backend.begin_read().expect("read view"); + + // Collect (root_bytes, slot) from BlockHeaders to determine state age. + let mut entries: Vec<(Vec, u64)> = view + .prefix_iterator(Table::BlockHeaders, &[]) + .expect("iterator") + .filter_map(|res| res.ok()) + .map(|(key, value)| { + let header = BlockHeader::from_ssz_bytes(&value).expect("valid header"); + (key.to_vec(), header.slot) + }) + .collect(); + drop(view); + + if entries.len() <= STATES_TO_KEEP { + return 0; + } + + // Sort by slot descending (newest first) + entries.sort_unstable_by(|a, b| b.1.cmp(&a.1)); + + let protected: HashSet> = + protected_roots.iter().map(|r| r.as_ssz_bytes()).collect(); + + // Skip the retention window, collect remaining keys for deletion + let keys_to_delete: Vec> = entries + .into_iter() + .skip(STATES_TO_KEEP) + .filter(|(key, _)| !protected.contains(key)) + .map(|(key, _)| key) + .collect(); + + let count = keys_to_delete.len(); + if count > 0 { + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .delete_batch(Table::States, keys_to_delete) + .expect("delete old states"); + batch.commit().expect("commit"); + } + count + } + + /// Prune old blocks beyond the retention window. + /// + /// Keeps the most recent `BLOCKS_TO_KEEP` blocks (by slot), plus any + /// blocks whose roots appear in `protected_roots` (finalized, justified). + /// Deletes from `BlockHeaders`, `BlockBodies`, and `BlockSignatures`. + /// + /// Returns the number of blocks pruned. + pub fn prune_old_blocks(&mut self, protected_roots: &[H256]) -> usize { + let view = self.backend.begin_read().expect("read view"); + + let mut entries: Vec<(Vec, u64)> = view + .prefix_iterator(Table::BlockHeaders, &[]) + .expect("iterator") + .filter_map(|res| res.ok()) + .map(|(key, value)| { + let header = BlockHeader::from_ssz_bytes(&value).expect("valid header"); + (key.to_vec(), header.slot) + }) + .collect(); + drop(view); + + if entries.len() <= BLOCKS_TO_KEEP { + return 0; + } + + // Sort by slot descending (newest first) + entries.sort_unstable_by(|a, b| b.1.cmp(&a.1)); + + let protected: HashSet> = + protected_roots.iter().map(|r| r.as_ssz_bytes()).collect(); + + let keys_to_delete: Vec> = entries + .into_iter() + .skip(BLOCKS_TO_KEEP) + .filter(|(key, _)| !protected.contains(key)) + .map(|(key, _)| key) + .collect(); + + let count = keys_to_delete.len(); + if count > 0 { + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .delete_batch(Table::BlockHeaders, keys_to_delete.clone()) + .expect("delete old block headers"); + batch + .delete_batch(Table::BlockBodies, keys_to_delete.clone()) + .expect("delete old block bodies"); + batch + .delete_batch(Table::BlockSignatures, keys_to_delete) + .expect("delete old block signatures"); + batch.commit().expect("commit"); + } + count + } + /// Get the block header by root. pub fn get_block_header(&self, root: &H256) -> Option { let view = self.backend.begin_read().expect("read view"); @@ -1013,3 +1144,251 @@ fn write_signed_block( block } + +#[cfg(test)] +mod tests { + use super::*; + use crate::backend::InMemoryBackend; + + /// Insert a block header (and dummy body + signature) for a given root and slot. + fn insert_header(backend: &dyn StorageBackend, root: H256, slot: u64) { + let header = BlockHeader { + slot, + proposer_index: 0, + parent_root: H256::ZERO, + state_root: H256::ZERO, + body_root: H256::ZERO, + }; + let mut batch = backend.begin_write().expect("write batch"); + let key = root.as_ssz_bytes(); + batch + .put_batch( + Table::BlockHeaders, + vec![(key.clone(), header.as_ssz_bytes())], + ) + .expect("put header"); + batch + .put_batch(Table::BlockBodies, vec![(key.clone(), vec![0u8; 4])]) + .expect("put body"); + batch + .put_batch(Table::BlockSignatures, vec![(key, vec![0u8; 4])]) + .expect("put sigs"); + batch.commit().expect("commit"); + } + + /// Insert a dummy state for a given root. + fn insert_state(backend: &dyn StorageBackend, root: H256) { + let mut batch = backend.begin_write().expect("write batch"); + let key = root.as_ssz_bytes(); + batch + .put_batch(Table::States, vec![(key, vec![0u8; 4])]) + .expect("put state"); + batch.commit().expect("commit"); + } + + /// Count entries in a table. + fn count_entries(backend: &dyn StorageBackend, table: Table) -> usize { + let view = backend.begin_read().expect("read view"); + view.prefix_iterator(table, &[]) + .expect("iterator") + .filter_map(|r| r.ok()) + .count() + } + + /// Check if a key exists in a table. + fn has_key(backend: &dyn StorageBackend, table: Table, root: &H256) -> bool { + let view = backend.begin_read().expect("read view"); + view.get(table, &root.as_ssz_bytes()) + .expect("get") + .is_some() + } + + /// Generate a deterministic H256 root from an index. + fn root(index: u64) -> H256 { + let mut bytes = [0u8; 32]; + bytes[..8].copy_from_slice(&index.to_be_bytes()); + H256::from(bytes) + } + + // ============ Block Pruning Tests ============ + + #[test] + fn prune_old_blocks_within_retention() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store { + backend: backend.clone(), + }; + + // Insert exactly BLOCKS_TO_KEEP blocks + for i in 0..BLOCKS_TO_KEEP as u64 { + insert_header(backend.as_ref(), root(i), i); + } + assert_eq!( + count_entries(backend.as_ref(), Table::BlockHeaders), + BLOCKS_TO_KEEP + ); + + let pruned = store.prune_old_blocks(&[]); + assert_eq!(pruned, 0); + assert_eq!( + count_entries(backend.as_ref(), Table::BlockHeaders), + BLOCKS_TO_KEEP + ); + } + + #[test] + fn prune_old_blocks_exceeding_retention() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store { + backend: backend.clone(), + }; + + let total = BLOCKS_TO_KEEP + 10; + for i in 0..total as u64 { + insert_header(backend.as_ref(), root(i), i); + } + assert_eq!(count_entries(backend.as_ref(), Table::BlockHeaders), total); + + let pruned = store.prune_old_blocks(&[]); + assert_eq!(pruned, 10); + assert_eq!( + count_entries(backend.as_ref(), Table::BlockHeaders), + BLOCKS_TO_KEEP + ); + assert_eq!( + count_entries(backend.as_ref(), Table::BlockBodies), + BLOCKS_TO_KEEP + ); + assert_eq!( + count_entries(backend.as_ref(), Table::BlockSignatures), + BLOCKS_TO_KEEP + ); + + // Oldest blocks (slots 0..10) should be gone + for i in 0..10u64 { + assert!(!has_key(backend.as_ref(), Table::BlockHeaders, &root(i))); + } + // Newest blocks should still exist + for i in 10..total as u64 { + assert!(has_key(backend.as_ref(), Table::BlockHeaders, &root(i))); + } + } + + #[test] + fn prune_old_blocks_preserves_protected() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store { + backend: backend.clone(), + }; + + let total = BLOCKS_TO_KEEP + 10; + for i in 0..total as u64 { + insert_header(backend.as_ref(), root(i), i); + } + + // Protect the two oldest blocks (slots 0 and 1) + let finalized_root = root(0); + let justified_root = root(1); + let pruned = store.prune_old_blocks(&[finalized_root, justified_root]); + + // 10 would be pruned, but 2 are protected + assert_eq!(pruned, 8); + assert!(has_key( + backend.as_ref(), + Table::BlockHeaders, + &finalized_root + )); + assert!(has_key( + backend.as_ref(), + Table::BlockHeaders, + &justified_root + )); + assert!(has_key( + backend.as_ref(), + Table::BlockBodies, + &finalized_root + )); + assert!(has_key( + backend.as_ref(), + Table::BlockSignatures, + &finalized_root + )); + } + + // ============ State Pruning Tests ============ + + #[test] + fn prune_old_states_within_retention() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store { + backend: backend.clone(), + }; + + // Insert STATES_TO_KEEP headers + states + for i in 0..STATES_TO_KEEP as u64 { + insert_header(backend.as_ref(), root(i), i); + insert_state(backend.as_ref(), root(i)); + } + assert_eq!( + count_entries(backend.as_ref(), Table::States), + STATES_TO_KEEP + ); + + let pruned = store.prune_old_states(&[]); + assert_eq!(pruned, 0); + } + + #[test] + fn prune_old_states_exceeding_retention() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store { + backend: backend.clone(), + }; + + let total = STATES_TO_KEEP + 5; + for i in 0..total as u64 { + insert_header(backend.as_ref(), root(i), i); + insert_state(backend.as_ref(), root(i)); + } + assert_eq!(count_entries(backend.as_ref(), Table::States), total); + + let pruned = store.prune_old_states(&[]); + assert_eq!(pruned, 5); + assert_eq!( + count_entries(backend.as_ref(), Table::States), + STATES_TO_KEEP + ); + + // Oldest states should be gone + for i in 0..5u64 { + assert!(!has_key(backend.as_ref(), Table::States, &root(i))); + } + // Newest states should remain + for i in 5..total as u64 { + assert!(has_key(backend.as_ref(), Table::States, &root(i))); + } + } + + #[test] + fn prune_old_states_preserves_protected() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store { + backend: backend.clone(), + }; + + let total = STATES_TO_KEEP + 5; + for i in 0..total as u64 { + insert_header(backend.as_ref(), root(i), i); + insert_state(backend.as_ref(), root(i)); + } + + let finalized_root = root(0); + let justified_root = root(2); + let pruned = store.prune_old_states(&[finalized_root, justified_root]); + + // 5 would be pruned, but 2 are protected + assert_eq!(pruned, 3); + assert!(has_key(backend.as_ref(), Table::States, &finalized_root)); + assert!(has_key(backend.as_ref(), Table::States, &justified_root)); + } +}