From 626b9664fa8bd2fa93f4b84c78b2098b3093131e Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Tue, 3 Mar 2026 17:48:36 -0300 Subject: [PATCH 1/2] Replace RocksDB aggregated payload tables with in-memory circular buffers and add slot-age pruning for attestation data Aggregated payloads (new and known) are now stored in VecDeque-based circular buffers with a 4096-entry hard cap and FIFO eviction, matching Lantern's approach. This prevents unbounded memory growth when finalization stalls (~11.5 GB OOM after ~27 min). The two RocksDB tables (LatestNewAggregatedPayloads and LatestKnownAggregatedPayloads) are removed entirely since aggregated payloads are ephemeral and rebuilt from gossip on restart. Additionally, GossipSignatures and AttestationDataByRoot are now pruned by slot age (64-slot retention window) every slot at interval 4, independent of finalization progress. --- crates/blockchain/src/store.rs | 15 +- crates/storage/src/api/tables.rs | 10 +- crates/storage/src/backend/rocksdb.rs | 2 - crates/storage/src/store.rs | 476 ++++++++++++++++---------- 4 files changed, 303 insertions(+), 200 deletions(-) diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 7aaa5c3d..abc4530e 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -115,7 +115,7 @@ fn update_safe_target(store: &mut Store) { /// Aggregate committee signatures at interval 2. /// /// Collects individual gossip signatures, aggregates them by attestation data, -/// and stores the resulting proofs in `LatestNewAggregatedPayloads`. +/// and stores the resulting proofs in the new aggregated payloads buffer. 
fn aggregate_committee_signatures(store: &mut Store) -> Vec { let gossip_sigs: Vec<(SignatureKey, _)> = store.iter_gossip_signatures().collect(); if gossip_sigs.is_empty() { @@ -334,6 +334,17 @@ pub fn on_tick( 4 => { // End of slot - accept accumulated attestations and log tree accept_new_attestations(store, true); + + // Prune stale gossip signatures and attestation data by age + let (pruned_sigs, pruned_att_data) = store.prune_attestation_data_by_age(slot); + if pruned_sigs + pruned_att_data > 0 { + info!( + %slot, + pruned_sigs, + pruned_att_data, + "Pruned stale attestation data by age" + ); + } } _ => unreachable!("slots only have 5 intervals"), } @@ -1067,7 +1078,7 @@ fn build_block( /// Select existing aggregated proofs for attestations to include in a block. /// /// Fresh gossip aggregation happens at interval 2 (`aggregate_committee_signatures`). -/// This function only selects from existing proofs in the `LatestKnownAggregatedPayloads` table +/// This function only selects from existing proofs in the known aggregated payloads buffer /// (proofs from previously received blocks and promoted gossip aggregations). /// /// Returns a list of (attestation, proof) pairs ready for block inclusion. diff --git a/crates/storage/src/api/tables.rs b/crates/storage/src/api/tables.rs index 7b184d59..7f7d7a3c 100644 --- a/crates/storage/src/api/tables.rs +++ b/crates/storage/src/api/tables.rs @@ -16,12 +16,6 @@ pub enum Table { GossipSignatures, /// Attestation data indexed by tree hash root: H256 -> AttestationData AttestationDataByRoot, - /// Pending aggregated payloads (not yet active in fork choice): - /// SignatureKey -> Vec - LatestNewAggregatedPayloads, - /// Active aggregated payloads (counted in fork choice): - /// SignatureKey -> Vec - LatestKnownAggregatedPayloads, /// Metadata: string keys -> various scalar values Metadata, /// Live chain index: (slot || root) -> parent_root @@ -33,15 +27,13 @@ pub enum Table { } /// All table variants. 
-pub const ALL_TABLES: [Table; 10] = [ +pub const ALL_TABLES: [Table; 8] = [ Table::BlockHeaders, Table::BlockBodies, Table::BlockSignatures, Table::States, Table::GossipSignatures, Table::AttestationDataByRoot, - Table::LatestNewAggregatedPayloads, - Table::LatestKnownAggregatedPayloads, Table::Metadata, Table::LiveChain, ]; diff --git a/crates/storage/src/backend/rocksdb.rs b/crates/storage/src/backend/rocksdb.rs index 45565f18..b1338052 100644 --- a/crates/storage/src/backend/rocksdb.rs +++ b/crates/storage/src/backend/rocksdb.rs @@ -18,8 +18,6 @@ fn cf_name(table: Table) -> &'static str { Table::States => "states", Table::GossipSignatures => "gossip_signatures", Table::AttestationDataByRoot => "attestation_data_by_root", - Table::LatestNewAggregatedPayloads => "latest_new_aggregated_payloads", - Table::LatestKnownAggregatedPayloads => "latest_known_aggregated_payloads", Table::Metadata => "metadata", Table::LiveChain => "live_chain", } diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 2f22db21..242a3a08 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::{Arc, LazyLock}; /// The tree hash root of an empty block body. @@ -90,6 +90,67 @@ const _: () = assert!( "BLOCKS_TO_KEEP must be >= STATES_TO_KEEP" ); +/// Hard cap for each aggregated payload buffer (new and known). +/// Matches Lantern's approach. With 9 validators, this holds +/// ~455 unique attestation messages (~30 min at 1/slot). +const AGGREGATED_PAYLOAD_CAP: usize = 4096; + +/// Attestation data retention window in slots (~4.3 min at 4s/slot). +const ATTESTATION_RETENTION_SLOTS: u64 = 64; + +/// Fixed-size circular buffer for aggregated payloads. +/// +/// Entries are evicted FIFO when the buffer reaches capacity. +/// This prevents unbounded memory growth when finalization stalls. 
+#[derive(Clone, Default)] +struct PayloadBuffer { + entries: VecDeque<(SignatureKey, StoredAggregatedPayload)>, + capacity: usize, +} + +impl PayloadBuffer { + fn new(capacity: usize) -> Self { + Self { + entries: VecDeque::with_capacity(capacity), + capacity, + } + } + + /// Insert one entry, FIFO-evicting the oldest if at capacity. + fn push(&mut self, key: SignatureKey, payload: StoredAggregatedPayload) { + if self.entries.len() >= self.capacity { + self.entries.pop_front(); + } + self.entries.push_back((key, payload)); + } + + /// Insert multiple entries, FIFO-evicting as needed. + fn push_batch(&mut self, entries: Vec<(SignatureKey, StoredAggregatedPayload)>) { + for (key, payload) in entries { + self.push(key, payload); + } + } + + /// Take all entries, leaving the buffer empty. + fn drain(&mut self) -> Vec<(SignatureKey, StoredAggregatedPayload)> { + self.entries.drain(..).collect() + } + + /// Group entries by key, preserving insertion order within each group. + fn grouped(&self) -> HashMap> { + let mut map: HashMap> = HashMap::new(); + for (key, payload) in &self.entries { + map.entry(*key).or_default().push(payload.clone()); + } + map + } + + /// Return deduplicated keys. + fn unique_keys(&self) -> HashSet { + self.entries.iter().map(|(key, _)| *key).collect() + } +} + // ============ Key Encoding Helpers ============ /// Encode a SignatureKey (validator_id, root) to bytes. 
@@ -141,6 +202,8 @@ fn decode_live_chain_key(bytes: &[u8]) -> (u64, H256) { #[derive(Clone)] pub struct Store { backend: Arc, + new_payloads: PayloadBuffer, + known_payloads: PayloadBuffer, } impl Store { @@ -276,7 +339,11 @@ impl Store { info!(%anchor_state_root, %anchor_block_root, "Initialized store"); - Self { backend } + Self { + backend, + new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), + known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), + } } // ============ Metadata Helpers ============ @@ -385,14 +452,9 @@ impl Store { { let pruned_chain = self.prune_live_chain(finalized.slot); - // Prune signatures, payloads, and attestation data for finalized slots + // Prune signatures and attestation data for finalized slots let pruned_sigs = self.prune_gossip_signatures(finalized.slot); let pruned_att_data = self.prune_attestation_data_by_root(finalized.slot); - self.prune_aggregated_payload_table(Table::LatestNewAggregatedPayloads, finalized.slot); - self.prune_aggregated_payload_table( - Table::LatestKnownAggregatedPayloads, - finalized.slot, - ); // Prune old states before blocks: state pruning uses headers for slot lookup let protected_roots = [finalized.root, self.latest_justified().root]; let pruned_states = self.prune_old_states(&protected_roots); @@ -505,40 +567,18 @@ impl Store { }) } - /// Prune an aggregated payload table (new or known) for slots <= finalized_slot. 
- fn prune_aggregated_payload_table(&mut self, table: Table, finalized_slot: u64) { - let view = self.backend.begin_read().expect("read view"); - let mut updates = vec![]; - let mut deletes = vec![]; - - for (key_bytes, value_bytes) in view - .prefix_iterator(table, &[]) - .expect("iter") - .filter_map(|r| r.ok()) - { - if let Ok(mut payloads) = Vec::::from_ssz_bytes(&value_bytes) { - let original_len = payloads.len(); - payloads.retain(|p| p.slot > finalized_slot); - - if payloads.is_empty() { - deletes.push(key_bytes.to_vec()); - } else if payloads.len() < original_len { - updates.push((key_bytes.to_vec(), payloads.as_ssz_bytes())); - } - } - } - drop(view); - - if !updates.is_empty() || !deletes.is_empty() { - let mut batch = self.backend.begin_write().expect("write batch"); - if !updates.is_empty() { - batch.put_batch(table, updates).expect("put"); - } - if !deletes.is_empty() { - batch.delete_batch(table, deletes).expect("delete"); - } - batch.commit().expect("commit"); + /// Prune stale gossip signatures and attestation data by slot age. + /// + /// Independent of finalization — prevents unbounded growth when finalization stalls. + /// Returns (pruned_sigs, pruned_att_data). + pub fn prune_attestation_data_by_age(&mut self, current_slot: u64) -> (usize, usize) { + let cutoff_slot = current_slot.saturating_sub(ATTESTATION_RETENTION_SLOTS); + if cutoff_slot == 0 { + return (0, 0); } + let pruned_sigs = self.prune_gossip_signatures(cutoff_slot); + let pruned_att_data = self.prune_attestation_data_by_root(cutoff_slot); + (pruned_sigs, pruned_att_data) } /// Prune old states beyond the retention window. @@ -822,7 +862,8 @@ impl Store { /// Convenience: extract latest attestation per validator from known /// (fork-choice-active) aggregated payloads only. 
pub fn extract_latest_known_attestations(&self) -> HashMap { - self.extract_latest_attestations(self.iter_known_aggregated_payloads().map(|(key, _)| key)) + let keys = self.known_payloads.unique_keys(); + self.extract_latest_attestations(keys.into_iter()) } // ============ Known Aggregated Payloads ============ @@ -830,34 +871,33 @@ impl Store { // "Known" aggregated payloads are active in fork choice weight calculations. // Promoted from "new" payloads at specific intervals (0 with proposal, 4). - /// Iterates over all known aggregated payloads. + /// Iterates over all known aggregated payloads, grouped by key. pub fn iter_known_aggregated_payloads( &self, - ) -> impl Iterator)> + '_ { - self.iter_aggregated_payloads(Table::LatestKnownAggregatedPayloads) + ) -> impl Iterator)> { + self.known_payloads.grouped().into_iter() } - /// Iterates over keys only from the known aggregated payloads table, - /// skipping value deserialization. - pub fn iter_known_aggregated_payload_keys(&self) -> impl Iterator + '_ { - self.iter_aggregated_payload_keys(Table::LatestKnownAggregatedPayloads) + /// Iterates over deduplicated keys from the known aggregated payloads. + pub fn iter_known_aggregated_payload_keys(&self) -> impl Iterator { + self.known_payloads.unique_keys().into_iter() } - /// Insert an aggregated payload into the known (fork-choice-active) table. + /// Insert an aggregated payload into the known (fork-choice-active) buffer. pub fn insert_known_aggregated_payload( &mut self, key: SignatureKey, payload: StoredAggregatedPayload, ) { - self.insert_aggregated_payload(Table::LatestKnownAggregatedPayloads, key, payload); + self.known_payloads.push(key, payload); } - /// Batch-insert multiple aggregated payloads into the known table in a single commit. + /// Batch-insert multiple aggregated payloads into the known buffer. 
pub fn insert_known_aggregated_payloads_batch( &mut self, entries: Vec<(SignatureKey, StoredAggregatedPayload)>, ) { - self.insert_aggregated_payloads_batch(Table::LatestKnownAggregatedPayloads, entries); + self.known_payloads.push_batch(entries); } // ============ New Aggregated Payloads ============ @@ -865,34 +905,33 @@ impl Store { // "New" aggregated payloads are pending — not yet counted in fork choice. // Promoted to "known" via `promote_new_aggregated_payloads`. - /// Iterates over all new (pending) aggregated payloads. + /// Iterates over all new (pending) aggregated payloads, grouped by key. pub fn iter_new_aggregated_payloads( &self, - ) -> impl Iterator)> + '_ { - self.iter_aggregated_payloads(Table::LatestNewAggregatedPayloads) + ) -> impl Iterator)> { + self.new_payloads.grouped().into_iter() } - /// Iterates over keys only from the new aggregated payloads table, - /// skipping value deserialization. - pub fn iter_new_aggregated_payload_keys(&self) -> impl Iterator + '_ { - self.iter_aggregated_payload_keys(Table::LatestNewAggregatedPayloads) + /// Iterates over deduplicated keys from the new aggregated payloads. + pub fn iter_new_aggregated_payload_keys(&self) -> impl Iterator { + self.new_payloads.unique_keys().into_iter() } - /// Insert an aggregated payload into the new (pending) table. + /// Insert an aggregated payload into the new (pending) buffer. pub fn insert_new_aggregated_payload( &mut self, key: SignatureKey, payload: StoredAggregatedPayload, ) { - self.insert_aggregated_payload(Table::LatestNewAggregatedPayloads, key, payload); + self.new_payloads.push(key, payload); } - /// Batch-insert multiple aggregated payloads into the new table in a single commit. + /// Batch-insert multiple aggregated payloads into the new buffer. 
pub fn insert_new_aggregated_payloads_batch( &mut self, entries: Vec<(SignatureKey, StoredAggregatedPayload)>, ) { - self.insert_aggregated_payloads_batch(Table::LatestNewAggregatedPayloads, entries); + self.new_payloads.push_batch(entries); } // ============ Pruning Helpers ============ @@ -930,132 +969,12 @@ impl Store { count } - // ============ Aggregated Payload Helpers ============ - - fn iter_aggregated_payloads( - &self, - table: Table, - ) -> impl Iterator)> { - let view = self.backend.begin_read().expect("read view"); - let entries: Vec<_> = view - .prefix_iterator(table, &[]) - .expect("iterator") - .filter_map(|res| res.ok()) - .map(|(k, v)| { - let key = decode_signature_key(&k); - let payloads = - Vec::::from_ssz_bytes(&v).expect("valid payloads"); - (key, payloads) - }) - .collect(); - entries.into_iter() - } - - fn iter_aggregated_payload_keys(&self, table: Table) -> impl Iterator { - let view = self.backend.begin_read().expect("read view"); - let keys: Vec<_> = view - .prefix_iterator(table, &[]) - .expect("iterator") - .filter_map(|res| res.ok()) - .map(|(k, _)| decode_signature_key(&k)) - .collect(); - keys.into_iter() - } - - fn insert_aggregated_payload( - &mut self, - table: Table, - key: SignatureKey, - payload: StoredAggregatedPayload, - ) { - self.insert_aggregated_payloads_batch(table, vec![(key, payload)]); - } - - /// Batch-insert multiple aggregated payloads in a single read-write-commit cycle. - /// Groups entries by key to correctly handle multiple payloads for the same key. 
- fn insert_aggregated_payloads_batch( - &mut self, - table: Table, - entries: Vec<(SignatureKey, StoredAggregatedPayload)>, - ) { - if entries.is_empty() { - return; - } - - // Group entries by key to handle multiple payloads for the same key - let mut grouped: HashMap, Vec> = HashMap::new(); - for (key, payload) in entries { - let encoded_key = encode_signature_key(&key); - grouped.entry(encoded_key).or_default().push(payload); - } - - let view = self.backend.begin_read().expect("read view"); - let mut batch_entries = Vec::new(); - - for (encoded_key, new_payloads) in grouped { - let mut payloads: Vec = view - .get(table, &encoded_key) - .expect("get") - .map(|bytes| Vec::::from_ssz_bytes(&bytes).expect("valid")) - .unwrap_or_default(); - payloads.extend(new_payloads); - batch_entries.push((encoded_key, payloads.as_ssz_bytes())); - } - drop(view); - - let mut batch = self.backend.begin_write().expect("write batch"); - batch - .put_batch(table, batch_entries) - .expect("put aggregated payloads"); - batch.commit().expect("commit"); - } - /// Promotes all new aggregated payloads to known, making them active in fork choice. /// - /// Merges entries from `LatestNewAggregatedPayloads` into `LatestKnownAggregatedPayloads`, - /// appending to existing payload lists rather than overwriting them. + /// Drains the new buffer and pushes all entries into the known buffer. 
pub fn promote_new_aggregated_payloads(&mut self) { - let view = self.backend.begin_read().expect("read view"); - let new_entries: Vec<(Vec, Vec)> = view - .prefix_iterator(Table::LatestNewAggregatedPayloads, &[]) - .expect("iterator") - .filter_map(|res| res.ok()) - .map(|(k, v)| (k.to_vec(), v.to_vec())) - .collect(); - - if new_entries.is_empty() { - drop(view); - return; - } - - // Merge new payloads with existing known payloads - let merged: Vec<(Vec, Vec)> = new_entries - .iter() - .map(|(key, new_bytes)| { - let new_payloads = - Vec::::from_ssz_bytes(new_bytes).expect("valid"); - let mut known_payloads: Vec = view - .get(Table::LatestKnownAggregatedPayloads, key) - .expect("get") - .map(|bytes| { - Vec::::from_ssz_bytes(&bytes).expect("valid") - }) - .unwrap_or_default(); - known_payloads.extend(new_payloads); - (key.clone(), known_payloads.as_ssz_bytes()) - }) - .collect(); - drop(view); - - let keys_to_delete: Vec<_> = new_entries.into_iter().map(|(k, _)| k).collect(); - let mut batch = self.backend.begin_write().expect("write batch"); - batch - .delete_batch(Table::LatestNewAggregatedPayloads, keys_to_delete) - .expect("delete new aggregated payloads"); - batch - .put_batch(Table::LatestKnownAggregatedPayloads, merged) - .expect("put known aggregated payloads"); - batch.commit().expect("commit"); + let drained = self.new_payloads.drain(); + self.known_payloads.push_batch(drained); } /// Delete specific gossip signatures by key. 
@@ -1256,6 +1175,8 @@ mod tests { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store { backend: backend.clone(), + new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), + known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), }; // Insert exactly BLOCKS_TO_KEEP blocks @@ -1280,6 +1201,8 @@ mod tests { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store { backend: backend.clone(), + new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), + known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), }; let total = BLOCKS_TO_KEEP + 10; @@ -1318,6 +1241,8 @@ mod tests { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store { backend: backend.clone(), + new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), + known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), }; let total = BLOCKS_TO_KEEP + 10; @@ -1361,6 +1286,8 @@ mod tests { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store { backend: backend.clone(), + new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), + known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), }; // Insert STATES_TO_KEEP headers + states @@ -1382,6 +1309,8 @@ mod tests { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store { backend: backend.clone(), + new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), + known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), }; let total = STATES_TO_KEEP + 5; @@ -1413,6 +1342,8 @@ mod tests { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store { backend: backend.clone(), + new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), + known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), }; let total = STATES_TO_KEEP + 5; @@ -1430,4 +1361,175 @@ mod tests { assert!(has_key(backend.as_ref(), Table::States, &finalized_root)); assert!(has_key(backend.as_ref(), Table::States, &justified_root)); } + + // ============ PayloadBuffer Tests ============ + + fn 
make_payload(slot: u64) -> StoredAggregatedPayload { + use ethlambda_types::attestation::AggregationBits; + use ethlambda_types::block::AggregatedSignatureProof; + + StoredAggregatedPayload { + slot, + proof: AggregatedSignatureProof::empty(AggregationBits::with_capacity(0).unwrap()), + } + } + + #[test] + fn payload_buffer_fifo_eviction() { + let mut buf = PayloadBuffer::new(3); + let key = (0u64, H256::ZERO); + + buf.push(key, make_payload(1)); + buf.push(key, make_payload(2)); + buf.push(key, make_payload(3)); + assert_eq!(buf.entries.len(), 3); + + // Pushing a 4th entry should evict the oldest (slot 1) + buf.push(key, make_payload(4)); + assert_eq!(buf.entries.len(), 3); + let slots: Vec = buf.entries.iter().map(|(_, p)| p.slot).collect(); + assert_eq!(slots, vec![2, 3, 4]); + } + + #[test] + fn payload_buffer_grouped_returns_correct_groups() { + let mut buf = PayloadBuffer::new(10); + let key_a = (0u64, H256::ZERO); + let key_b = (1u64, H256::ZERO); + + buf.push(key_a, make_payload(1)); + buf.push(key_b, make_payload(2)); + buf.push(key_a, make_payload(3)); + + let grouped = buf.grouped(); + assert_eq!(grouped.len(), 2); + assert_eq!(grouped[&key_a].len(), 2); + assert_eq!(grouped[&key_a][0].slot, 1); + assert_eq!(grouped[&key_a][1].slot, 3); + assert_eq!(grouped[&key_b].len(), 1); + assert_eq!(grouped[&key_b][0].slot, 2); + } + + #[test] + fn payload_buffer_drain_empties_buffer() { + let mut buf = PayloadBuffer::new(10); + let key = (0u64, H256::ZERO); + + buf.push(key, make_payload(1)); + buf.push(key, make_payload(2)); + + let drained = buf.drain(); + assert_eq!(drained.len(), 2); + assert!(buf.entries.is_empty()); + } + + #[test] + fn promote_moves_new_to_known() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store { + backend: backend.clone(), + new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), + known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), + }; + + let key = (0u64, H256::ZERO); + 
store.insert_new_aggregated_payload(key, make_payload(1)); + store.insert_new_aggregated_payload(key, make_payload(2)); + + assert_eq!(store.new_payloads.entries.len(), 2); + assert_eq!(store.known_payloads.entries.len(), 0); + + store.promote_new_aggregated_payloads(); + + assert_eq!(store.new_payloads.entries.len(), 0); + assert_eq!(store.known_payloads.entries.len(), 2); + } + + fn make_attestation_data(slot: u64) -> AttestationData { + AttestationData { + slot, + head: Checkpoint::default(), + target: Checkpoint::default(), + source: Checkpoint::default(), + } + } + + fn make_gossip_signature(slot: u64) -> StoredSignature { + StoredSignature { + slot, + signature_bytes: vec![0u8; 32], + } + } + + #[test] + fn prune_attestation_data_by_age_removes_old() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store { + backend: backend.clone(), + new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), + known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), + }; + + // Insert gossip signatures at slots 10 and 100 via raw backend + { + let mut batch = backend.begin_write().expect("write batch"); + let key10 = encode_signature_key(&(0u64, root(10))); + let key100 = encode_signature_key(&(1u64, root(100))); + batch + .put_batch( + Table::GossipSignatures, + vec![ + (key10, make_gossip_signature(10).as_ssz_bytes()), + (key100, make_gossip_signature(100).as_ssz_bytes()), + ], + ) + .expect("put"); + batch.commit().expect("commit"); + } + + // Insert attestation data at slots 10 and 100 + store.insert_attestation_data_by_root(root(10), make_attestation_data(10)); + store.insert_attestation_data_by_root(root(100), make_attestation_data(100)); + + // Prune with current_slot = 100, retention = 64 → cutoff = 36 + let (pruned_sigs, pruned_att_data) = store.prune_attestation_data_by_age(100); + assert_eq!(pruned_sigs, 1); // slot 10 <= 36 + assert_eq!(pruned_att_data, 1); // slot 10 <= 36 + + // Slot 100 entries should remain + 
assert_eq!(count_entries(backend.as_ref(), Table::GossipSignatures), 1); + assert_eq!( + count_entries(backend.as_ref(), Table::AttestationDataByRoot), + 1 + ); + } + + #[test] + fn prune_attestation_data_by_age_noop_early_slots() { + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store { + backend: backend.clone(), + new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), + known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), + }; + + // Insert gossip signature at slot 5 via raw backend + { + let mut batch = backend.begin_write().expect("write batch"); + let key = encode_signature_key(&(0u64, root(5))); + batch + .put_batch( + Table::GossipSignatures, + vec![(key, make_gossip_signature(5).as_ssz_bytes())], + ) + .expect("put"); + batch.commit().expect("commit"); + } + + // current_slot = 30 → cutoff = 30 - 64 = 0 (saturating) → short-circuits + let (pruned_sigs, pruned_att_data) = store.prune_attestation_data_by_age(30); + assert_eq!(pruned_sigs, 0); + assert_eq!(pruned_att_data, 0); + assert_eq!(count_entries(backend.as_ref(), Table::GossipSignatures), 1); + } } From db87c790cd745a500ba627da0781f658176383d7 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 4 Mar 2026 14:28:13 -0300 Subject: [PATCH 2/2] Address PR review feedback: wrap payload buffers in Arc, remove Default derive, lower new_payloads cap, remove age-based pruning, and add test helpers for Store construction in tests - Remove #[derive(Default)] from PayloadBuffer (capacity-0 footgun) - Wrap new_payloads/known_payloads in Arc> so Store clones share buffer state, with a test verifying this - Use separate NEW_PAYLOAD_CAP (512) for new_payloads since they are drained every interval (~4s) - Remove prune_attestation_data_by_age and ATTESTATION_RETENTION_SLOTS as the circular buffer alone handles memory bounding - Add Store::test_store() and test_store_with_backend() helpers so block/state pruning tests don't need to know about PayloadBuffer --- 
crates/blockchain/src/store.rs | 11 -- crates/storage/src/store.rs | 224 ++++++++++----------------------- 2 files changed, 69 insertions(+), 166 deletions(-) diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index abc4530e..63d62ef3 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -334,17 +334,6 @@ pub fn on_tick( 4 => { // End of slot - accept accumulated attestations and log tree accept_new_attestations(store, true); - - // Prune stale gossip signatures and attestation data by age - let (pruned_sigs, pruned_att_data) = store.prune_attestation_data_by_age(slot); - if pruned_sigs + pruned_att_data > 0 { - info!( - %slot, - pruned_sigs, - pruned_att_data, - "Pruned stale attestation data by age" - ); - } } _ => unreachable!("slots only have 5 intervals"), } diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 242a3a08..a69d39c2 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -1,5 +1,5 @@ use std::collections::{HashMap, HashSet, VecDeque}; -use std::sync::{Arc, LazyLock}; +use std::sync::{Arc, LazyLock, Mutex}; /// The tree hash root of an empty block body. /// @@ -90,19 +90,21 @@ const _: () = assert!( "BLOCKS_TO_KEEP must be >= STATES_TO_KEEP" ); -/// Hard cap for each aggregated payload buffer (new and known). +/// Hard cap for the known aggregated payload buffer. /// Matches Lantern's approach. With 9 validators, this holds /// ~455 unique attestation messages (~30 min at 1/slot). const AGGREGATED_PAYLOAD_CAP: usize = 4096; -/// Attestation data retention window in slots (~4.3 min at 4s/slot). -const ATTESTATION_RETENTION_SLOTS: u64 = 64; +/// Hard cap for the new (pending) aggregated payload buffer. +/// Smaller than known since new payloads are drained every interval (~4s). +/// With 9 validators at 1 attestation/slot, one interval holds ~9 entries. +const NEW_PAYLOAD_CAP: usize = 512; /// Fixed-size circular buffer for aggregated payloads. 
/// /// Entries are evicted FIFO when the buffer reaches capacity. /// This prevents unbounded memory growth when finalization stalls. -#[derive(Clone, Default)] +#[derive(Clone)] struct PayloadBuffer { entries: VecDeque<(SignatureKey, StoredAggregatedPayload)>, capacity: usize, @@ -202,8 +204,8 @@ fn decode_live_chain_key(bytes: &[u8]) -> (u64, H256) { #[derive(Clone)] pub struct Store { backend: Arc, - new_payloads: PayloadBuffer, - known_payloads: PayloadBuffer, + new_payloads: Arc>, + known_payloads: Arc>, } impl Store { @@ -341,8 +343,8 @@ impl Store { Self { backend, - new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), - known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), + new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), + known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), } } @@ -567,20 +569,6 @@ impl Store { }) } - /// Prune stale gossip signatures and attestation data by slot age. - /// - /// Independent of finalization — prevents unbounded growth when finalization stalls. - /// Returns (pruned_sigs, pruned_att_data). - pub fn prune_attestation_data_by_age(&mut self, current_slot: u64) -> (usize, usize) { - let cutoff_slot = current_slot.saturating_sub(ATTESTATION_RETENTION_SLOTS); - if cutoff_slot == 0 { - return (0, 0); - } - let pruned_sigs = self.prune_gossip_signatures(cutoff_slot); - let pruned_att_data = self.prune_attestation_data_by_root(cutoff_slot); - (pruned_sigs, pruned_att_data) - } - /// Prune old states beyond the retention window. /// /// Keeps the most recent `STATES_TO_KEEP` states (by slot), plus any @@ -862,7 +850,7 @@ impl Store { /// Convenience: extract latest attestation per validator from known /// (fork-choice-active) aggregated payloads only. 
pub fn extract_latest_known_attestations(&self) -> HashMap { - let keys = self.known_payloads.unique_keys(); + let keys = self.known_payloads.lock().unwrap().unique_keys(); self.extract_latest_attestations(keys.into_iter()) } @@ -875,12 +863,16 @@ impl Store { pub fn iter_known_aggregated_payloads( &self, ) -> impl Iterator)> { - self.known_payloads.grouped().into_iter() + self.known_payloads.lock().unwrap().grouped().into_iter() } /// Iterates over deduplicated keys from the known aggregated payloads. pub fn iter_known_aggregated_payload_keys(&self) -> impl Iterator { - self.known_payloads.unique_keys().into_iter() + self.known_payloads + .lock() + .unwrap() + .unique_keys() + .into_iter() } /// Insert an aggregated payload into the known (fork-choice-active) buffer. @@ -889,7 +881,7 @@ impl Store { key: SignatureKey, payload: StoredAggregatedPayload, ) { - self.known_payloads.push(key, payload); + self.known_payloads.lock().unwrap().push(key, payload); } /// Batch-insert multiple aggregated payloads into the known buffer. @@ -897,7 +889,7 @@ impl Store { &mut self, entries: Vec<(SignatureKey, StoredAggregatedPayload)>, ) { - self.known_payloads.push_batch(entries); + self.known_payloads.lock().unwrap().push_batch(entries); } // ============ New Aggregated Payloads ============ @@ -909,12 +901,12 @@ impl Store { pub fn iter_new_aggregated_payloads( &self, ) -> impl Iterator)> { - self.new_payloads.grouped().into_iter() + self.new_payloads.lock().unwrap().grouped().into_iter() } /// Iterates over deduplicated keys from the new aggregated payloads. pub fn iter_new_aggregated_payload_keys(&self) -> impl Iterator { - self.new_payloads.unique_keys().into_iter() + self.new_payloads.lock().unwrap().unique_keys().into_iter() } /// Insert an aggregated payload into the new (pending) buffer. 
@@ -923,7 +915,7 @@ impl Store { key: SignatureKey, payload: StoredAggregatedPayload, ) { - self.new_payloads.push(key, payload); + self.new_payloads.lock().unwrap().push(key, payload); } /// Batch-insert multiple aggregated payloads into the new buffer. @@ -931,7 +923,7 @@ impl Store { &mut self, entries: Vec<(SignatureKey, StoredAggregatedPayload)>, ) { - self.new_payloads.push_batch(entries); + self.new_payloads.lock().unwrap().push_batch(entries); } // ============ Pruning Helpers ============ @@ -973,8 +965,8 @@ impl Store { /// /// Drains the new buffer and pushes all entries into the known buffer. pub fn promote_new_aggregated_payloads(&mut self) { - let drained = self.new_payloads.drain(); - self.known_payloads.push_batch(drained); + let drained = self.new_payloads.lock().unwrap().drain(); + self.known_payloads.lock().unwrap().push_batch(drained); } /// Delete specific gossip signatures by key. @@ -1168,16 +1160,34 @@ mod tests { H256::from(bytes) } + impl Store { + /// Create a Store with an in-memory backend for tests. + fn test_store() -> Self { + let backend = Arc::new(InMemoryBackend::new()); + Self { + backend, + new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), + known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), + } + } + + /// Create a Store with a shared in-memory backend for tests that need + /// direct backend access. 
+ fn test_store_with_backend(backend: Arc<InMemoryBackend>) -> Self { + Self { + backend, + new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), + known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), + } + } + } + // ============ Block Pruning Tests ============ #[test] fn prune_old_blocks_within_retention() { let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { - backend: backend.clone(), - new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), - known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), - }; + let mut store = Store::test_store_with_backend(backend.clone()); // Insert exactly BLOCKS_TO_KEEP blocks for i in 0..BLOCKS_TO_KEEP as u64 { @@ -1199,11 +1209,7 @@ mod tests { #[test] fn prune_old_blocks_exceeding_retention() { let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { - backend: backend.clone(), - new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), - known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), - }; + let mut store = Store::test_store_with_backend(backend.clone()); let total = BLOCKS_TO_KEEP + 10; for i in 0..total as u64 { @@ -1239,11 +1245,7 @@ mod tests { #[test] fn prune_old_blocks_preserves_protected() { let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { - backend: backend.clone(), - new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), - known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), - }; + let mut store = Store::test_store_with_backend(backend.clone()); let total = BLOCKS_TO_KEEP + 10; for i in 0..total as u64 { @@ -1284,11 +1286,7 @@ mod tests { #[test] fn prune_old_states_within_retention() { let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { - backend: backend.clone(), - new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), - known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), - }; + let mut store = Store::test_store_with_backend(backend.clone()); // Insert 
STATES_TO_KEEP headers + states for i in 0..STATES_TO_KEEP as u64 { @@ -1307,11 +1305,7 @@ mod tests { #[test] fn prune_old_states_exceeding_retention() { let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { - backend: backend.clone(), - new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), - known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), - }; + let mut store = Store::test_store_with_backend(backend.clone()); let total = STATES_TO_KEEP + 5; for i in 0..total as u64 { @@ -1340,11 +1334,7 @@ mod tests { #[test] fn prune_old_states_preserves_protected() { let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { - backend: backend.clone(), - new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), - known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), - }; + let mut store = Store::test_store_with_backend(backend.clone()); let total = STATES_TO_KEEP + 5; for i in 0..total as u64 { @@ -1425,111 +1415,35 @@ mod tests { #[test] fn promote_moves_new_to_known() { - let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { - backend: backend.clone(), - new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), - known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), - }; + let mut store = Store::test_store(); let key = (0u64, H256::ZERO); store.insert_new_aggregated_payload(key, make_payload(1)); store.insert_new_aggregated_payload(key, make_payload(2)); - assert_eq!(store.new_payloads.entries.len(), 2); - assert_eq!(store.known_payloads.entries.len(), 0); + assert_eq!(store.new_payloads.lock().unwrap().entries.len(), 2); + assert_eq!(store.known_payloads.lock().unwrap().entries.len(), 0); store.promote_new_aggregated_payloads(); - assert_eq!(store.new_payloads.entries.len(), 0); - assert_eq!(store.known_payloads.entries.len(), 2); - } - - fn make_attestation_data(slot: u64) -> AttestationData { - AttestationData { - slot, - head: Checkpoint::default(), - target: Checkpoint::default(), - 
source: Checkpoint::default(), - } - } - - fn make_gossip_signature(slot: u64) -> StoredSignature { - StoredSignature { - slot, - signature_bytes: vec![0u8; 32], - } + assert_eq!(store.new_payloads.lock().unwrap().entries.len(), 0); + assert_eq!(store.known_payloads.lock().unwrap().entries.len(), 2); } #[test] - fn prune_attestation_data_by_age_removes_old() { - let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { - backend: backend.clone(), - new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), - known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), - }; + fn cloned_store_shares_payload_buffers() { + let mut store = Store::test_store(); + let cloned = store.clone(); - // Insert gossip signatures at slots 10 and 100 via raw backend - { - let mut batch = backend.begin_write().expect("write batch"); - let key10 = encode_signature_key(&(0u64, root(10))); - let key100 = encode_signature_key(&(1u64, root(100))); - batch - .put_batch( - Table::GossipSignatures, - vec![ - (key10, make_gossip_signature(10).as_ssz_bytes()), - (key100, make_gossip_signature(100).as_ssz_bytes()), - ], - ) - .expect("put"); - batch.commit().expect("commit"); - } - - // Insert attestation data at slots 10 and 100 - store.insert_attestation_data_by_root(root(10), make_attestation_data(10)); - store.insert_attestation_data_by_root(root(100), make_attestation_data(100)); - - // Prune with current_slot = 100, retention = 64 → cutoff = 36 - let (pruned_sigs, pruned_att_data) = store.prune_attestation_data_by_age(100); - assert_eq!(pruned_sigs, 1); // slot 10 <= 36 - assert_eq!(pruned_att_data, 1); // slot 10 <= 36 - - // Slot 100 entries should remain - assert_eq!(count_entries(backend.as_ref(), Table::GossipSignatures), 1); - assert_eq!( - count_entries(backend.as_ref(), Table::AttestationDataByRoot), - 1 - ); - } + let key = (0u64, H256::ZERO); + store.insert_new_aggregated_payload(key, make_payload(1)); - #[test] - fn 
prune_attestation_data_by_age_noop_early_slots() { - let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { - backend: backend.clone(), - new_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), - known_payloads: PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP), - }; + // Modification on original should be visible in clone + assert_eq!(cloned.new_payloads.lock().unwrap().entries.len(), 1); - // Insert gossip signature at slot 5 via raw backend - { - let mut batch = backend.begin_write().expect("write batch"); - let key = encode_signature_key(&(0u64, root(5))); - batch - .put_batch( - Table::GossipSignatures, - vec![(key, make_gossip_signature(5).as_ssz_bytes())], - ) - .expect("put"); - batch.commit().expect("commit"); - } + store.promote_new_aggregated_payloads(); - // current_slot = 30 → cutoff = 30 - 64 = 0 (saturating) → short-circuits - let (pruned_sigs, pruned_att_data) = store.prune_attestation_data_by_age(30); - assert_eq!(pruned_sigs, 0); - assert_eq!(pruned_att_data, 0); - assert_eq!(count_entries(backend.as_ref(), Table::GossipSignatures), 1); + assert_eq!(cloned.new_payloads.lock().unwrap().entries.len(), 0); + assert_eq!(cloned.known_payloads.lock().unwrap().entries.len(), 1); } }