From e083b8bd2a72a5113830eec99a91c3cbe0a01333 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Mon, 2 Mar 2026 15:24:13 -0300 Subject: [PATCH] Skip payload deserialization in update_safe_target, batch attestation data inserts, unify validator_indices, and derive MILLISECONDS_PER_SLOT Four cleanup items from reviewing the devnet 3 merge: 1. update_safe_target was deserializing full StoredAggregatedPayload vectors (containing multi-KB proof data) from both attestation pools only to immediately discard the values and keep the keys. Added key-only iterators (iter_known_aggregated_payload_keys, iter_new_aggregated_payload_keys) that skip value deserialization entirely. 2. on_block_core called insert_attestation_data_by_root once per block body attestation plus once for the proposer, each opening a separate write batch and committing. Added insert_attestation_data_by_root_batch and collected all entries into a single commit. 3. aggregation_bits_to_validator_indices in store.rs duplicated the logic in AggregatedSignatureProof::participant_indices. Extracted a shared validator_indices() function in ethlambda_types::attestation and made both call it. 4. SECONDS_PER_SLOT was only used by two test files and was redundant with MILLISECONDS_PER_SLOT. Removed it and derived MILLISECONDS_PER_SLOT from MILLISECONDS_PER_INTERVAL * INTERVALS_PER_SLOT to prevent consistency drift. Updated test callers to match production arithmetic. --- crates/blockchain/src/lib.rs | 6 +- crates/blockchain/src/store.rs | 58 +++++++++---------- .../blockchain/tests/forkchoice_spectests.rs | 6 +- .../blockchain/tests/signature_spectests.rs | 4 +- crates/common/types/src/attestation.rs | 7 +++ crates/common/types/src/block.rs | 8 +-- crates/storage/src/store.rs | 39 +++++++++++++ 7 files changed, 83 insertions(+), 45 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index b07a9536..16ec92d2 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -41,14 +41,12 @@ pub struct BlockChain { handle: GenServerHandle, } -/// Seconds in a slot. -pub const SECONDS_PER_SLOT: u64 = 4; -/// Milliseconds in a slot. -pub const MILLISECONDS_PER_SLOT: u64 = 4_000; /// Milliseconds per interval (800ms ticks). pub const MILLISECONDS_PER_INTERVAL: u64 = 800; /// Number of intervals per slot (5 intervals of 800ms = 4 seconds). pub const INTERVALS_PER_SLOT: u64 = 5; +/// Milliseconds in a slot (derived from interval duration and count). +pub const MILLISECONDS_PER_SLOT: u64 = MILLISECONDS_PER_INTERVAL * INTERVALS_PER_SLOT; impl BlockChain { pub fn spawn( store: Store, diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 093950b7..7aaa5c3d 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -9,7 +9,7 @@ use ethlambda_types::{ ShortRoot, attestation::{ AggregatedAttestation, AggregationBits, Attestation, AttestationData, - SignedAggregatedAttestation, SignedAttestation, + SignedAggregatedAttestation, SignedAttestation, validator_indices, }, block::{ AggregatedAttestations, AggregatedSignatureProof, Block, BlockBody, @@ -94,15 +94,15 @@ fn update_safe_target(store: &mut Store) { let min_target_score = (num_validators * 2).div_ceil(3); let blocks = store.get_live_chain(); - // Merge both attestation pools. At interval 3 the migration (interval 4) hasn't - // run yet, so attestations that entered "known" directly (proposer's own attestation - // in block body, node's self-attestation) would be invisible without this merge. - let mut all_payloads: HashMap> = - store.iter_known_aggregated_payloads().collect(); - for (key, new_proofs) in store.iter_new_aggregated_payloads() { - all_payloads.entry(key).or_default().extend(new_proofs); - } - let attestations = store.extract_latest_attestations(all_payloads.into_keys()); + // Merge both attestation pools (keys only — skip payload deserialization). + // At interval 3 the migration (interval 4) hasn't run yet, so attestations + // that entered "known" directly (proposer's own attestation in block body, + // node's self-attestation) would be invisible without this merge. + let all_keys: HashSet = store + .iter_known_aggregated_payload_keys() + .chain(store.iter_new_aggregated_payload_keys()) + .collect(); + let attestations = store.extract_latest_attestations(all_keys.into_iter()); let (safe_target, _weights) = ethlambda_fork_choice::compute_lmd_ghost_head( store.latest_justified().root, &blocks, @@ -569,15 +569,16 @@ fn on_block_core( // Process block body attestations. // Store attestation data by root and proofs in known aggregated payloads. + let mut att_data_entries: Vec<(H256, AttestationData)> = Vec::new(); let mut known_entries: Vec<(SignatureKey, StoredAggregatedPayload)> = Vec::new(); for (att, proof) in aggregated_attestations .iter() .zip(attestation_signatures.iter()) { let data_root = att.data.tree_hash_root(); - store.insert_attestation_data_by_root(data_root, att.data.clone()); + att_data_entries.push((data_root, att.data.clone())); - let validator_ids = aggregation_bits_to_validator_indices(&att.aggregation_bits); + let validator_ids: Vec<_> = validator_indices(&att.aggregation_bits).collect(); let payload = StoredAggregatedPayload { slot: att.data.slot, proof: proof.clone(), @@ -588,6 +589,15 @@ fn on_block_core( metrics::inc_attestations_valid("block"); } } + + // Process proposer attestation as pending (enters "new" stage via gossip path) + // The proposer's attestation should NOT affect this block's fork choice position. + let proposer_vid = proposer_attestation.validator_id; + let proposer_data_root = proposer_attestation.data.tree_hash_root(); + att_data_entries.push((proposer_data_root, proposer_attestation.data.clone())); + + // Batch-insert all attestation data (body + proposer) in a single commit + store.insert_attestation_data_by_root_batch(att_data_entries); store.insert_known_aggregated_payloads_batch(known_entries); // Update forkchoice head based on new block and attestations @@ -595,12 +605,6 @@ fn on_block_core( // to prevent the proposer from gaining circular weight advantage. update_head(store, false); - // Process proposer attestation as pending (enters "new" stage via gossip path) - // The proposer's attestation should NOT affect this block's fork choice position. - let proposer_vid = proposer_attestation.validator_id; - let proposer_data_root = proposer_attestation.data.tree_hash_root(); - store.insert_attestation_data_by_root(proposer_data_root, proposer_attestation.data.clone()); - if !verify { // Without sig verification, insert directly with a dummy proof let participants = aggregation_bits_from_validator_indices(&[proposer_vid]); @@ -888,15 +892,7 @@ pub enum StoreError { NotProposer { validator_index: u64, slot: u64 }, } -/// Extract validator indices from aggregation bits. -fn aggregation_bits_to_validator_indices(bits: &AggregationBits) -> Vec { - bits.iter() - .enumerate() - .filter_map(|(i, bit)| if bit { Some(i as u64) } else { None }) - .collect() -} - -/// Extract validator indices from aggregation bits. +/// Build an AggregationBits bitfield from a list of validator indices. fn aggregation_bits_from_validator_indices(bits: &[u64]) -> AggregationBits { if bits.is_empty() { return AggregationBits::with_capacity(0).expect("max capacity is non-zero"); @@ -1085,8 +1081,7 @@ fn select_aggregated_proofs( let data = &aggregated.data; let message = data.tree_hash_root(); - let validator_ids = aggregation_bits_to_validator_indices(&aggregated.aggregation_bits); - let mut remaining: HashSet = validator_ids.into_iter().collect(); + let mut remaining: HashSet = validator_indices(&aggregated.aggregation_bits).collect(); // Select existing proofs that cover the most remaining validators while !remaining.is_empty() { @@ -1104,8 +1099,7 @@ fn select_aggregated_proofs( let (proof, covered) = candidates .iter() .map(|p| { - let covered: Vec<_> = aggregation_bits_to_validator_indices(&p.participants) - .into_iter() + let covered: Vec<_> = validator_indices(&p.participants) .filter(|vid| remaining.contains(vid)) .collect(); (p, covered) @@ -1161,7 +1155,7 @@ fn verify_signatures( // Verify each attestation's signature proof for (attestation, aggregated_proof) in attestations.iter().zip(attestation_signatures) { - let validator_ids = aggregation_bits_to_validator_indices(&attestation.aggregation_bits); + let validator_ids: Vec<_> = validator_indices(&attestation.aggregation_bits).collect(); if validator_ids.iter().any(|vid| *vid >= num_validators) { return Err(StoreError::InvalidValidatorIndex); } diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index bb052c70..e7222c34 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -4,7 +4,7 @@ use std::{ sync::Arc, }; -use ethlambda_blockchain::{SECONDS_PER_SLOT, store}; +use ethlambda_blockchain::{MILLISECONDS_PER_SLOT, store}; use ethlambda_storage::{Store, backend::InMemoryBackend}; use ethlambda_types::{ attestation::{Attestation, AttestationData}, @@ -58,8 +58,8 @@ fn run(path: &Path) -> datatest_stable::Result<()> { let signed_block = build_signed_block(block_data); - let block_time_ms = - (signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time) * 1000; + let block_time_ms = genesis_time * 1000 + + signed_block.message.block.slot * MILLISECONDS_PER_SLOT; // NOTE: the has_proposal argument is set to true, following the spec store::on_tick(&mut store, block_time_ms, true, false); diff --git a/crates/blockchain/tests/signature_spectests.rs b/crates/blockchain/tests/signature_spectests.rs index 16e4de4f..5d617e33 100644 --- a/crates/blockchain/tests/signature_spectests.rs +++ b/crates/blockchain/tests/signature_spectests.rs @@ -1,7 +1,7 @@ use std::path::Path; use std::sync::Arc; -use ethlambda_blockchain::{SECONDS_PER_SLOT, store}; +use ethlambda_blockchain::{MILLISECONDS_PER_SLOT, store}; use ethlambda_storage::{Store, backend::InMemoryBackend}; use ethlambda_types::{ block::{Block, SignedBlockWithAttestation}, @@ -51,7 +51,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { // Advance time to the block's slot let block_time_ms = - (signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time) * 1000; + genesis_time * 1000 + signed_block.message.block.slot * MILLISECONDS_PER_SLOT; store::on_tick(&mut st, block_time_ms, true, false); // Process the block (this includes signature verification) diff --git a/crates/common/types/src/attestation.rs b/crates/common/types/src/attestation.rs index 386bed4d..bd668266 100644 --- a/crates/common/types/src/attestation.rs +++ b/crates/common/types/src/attestation.rs @@ -64,6 +64,13 @@ pub struct AggregatedAttestation { /// in some collective action (attestation, signature aggregation, etc.). pub type AggregationBits = ssz_types::BitList; +/// Returns the indices of set bits in an `AggregationBits` bitfield as validator IDs. +pub fn validator_indices(bits: &AggregationBits) -> impl Iterator + '_ { + bits.iter() + .enumerate() + .filter_map(|(i, bit)| if bit { Some(i as u64) } else { None }) +} + /// Aggregated attestation with its signature proof, used for gossip on the aggregation topic. #[derive(Debug, Clone, Encode, Decode)] pub struct SignedAggregatedAttestation { diff --git a/crates/common/types/src/block.rs b/crates/common/types/src/block.rs index 3c152b52..3197830d 100644 --- a/crates/common/types/src/block.rs +++ b/crates/common/types/src/block.rs @@ -2,7 +2,9 @@ use serde::Serialize; use ssz_types::typenum::U1048576; use crate::{ - attestation::{AggregatedAttestation, AggregationBits, Attestation, XmssSignature}, + attestation::{ + AggregatedAttestation, AggregationBits, Attestation, XmssSignature, validator_indices, + }, primitives::{ ByteList, H256, ssz::{Decode, Encode, TreeHash}, @@ -105,9 +107,7 @@ impl AggregatedSignatureProof { /// Returns the validator indices that are set in the participants bitfield. pub fn participant_indices(&self) -> impl Iterator + '_ { - (0..self.participants.len()) - .filter(|&i| self.participants.get(i).unwrap_or(false)) - .map(|i| i as u64) + validator_indices(&self.participants) } } diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 80f2528d..f0a77c38 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -631,6 +631,22 @@ impl Store { batch.commit().expect("commit"); } + /// Batch-insert multiple attestation data entries in a single commit. + pub fn insert_attestation_data_by_root_batch(&mut self, entries: Vec<(H256, AttestationData)>) { + if entries.is_empty() { + return; + } + let mut batch = self.backend.begin_write().expect("write batch"); + let ssz_entries = entries + .into_iter() + .map(|(root, data)| (root.as_ssz_bytes(), data.as_ssz_bytes())) + .collect(); + batch + .put_batch(Table::AttestationDataByRoot, ssz_entries) + .expect("put attestation data batch"); + batch.commit().expect("commit"); + } + /// Returns attestation data for the given root hash. pub fn get_attestation_data_by_root(&self, root: &H256) -> Option { let view = self.backend.begin_read().expect("read view"); @@ -690,6 +706,12 @@ impl Store { self.iter_aggregated_payloads(Table::LatestKnownAggregatedPayloads) } + /// Iterates over keys only from the known aggregated payloads table, + /// skipping value deserialization. + pub fn iter_known_aggregated_payload_keys(&self) -> impl Iterator + '_ { + self.iter_aggregated_payload_keys(Table::LatestKnownAggregatedPayloads) + } + /// Insert an aggregated payload into the known (fork-choice-active) table. pub fn insert_known_aggregated_payload( &mut self, @@ -719,6 +741,12 @@ impl Store { self.iter_aggregated_payloads(Table::LatestNewAggregatedPayloads) } + /// Iterates over keys only from the new aggregated payloads table, + /// skipping value deserialization. + pub fn iter_new_aggregated_payload_keys(&self) -> impl Iterator + '_ { + self.iter_aggregated_payload_keys(Table::LatestNewAggregatedPayloads) + } + /// Insert an aggregated payload into the new (pending) table. pub fn insert_new_aggregated_payload( &mut self, @@ -792,6 +820,17 @@ impl Store { entries.into_iter() } + fn iter_aggregated_payload_keys(&self, table: Table) -> impl Iterator { + let view = self.backend.begin_read().expect("read view"); + let keys: Vec<_> = view + .prefix_iterator(table, &[]) + .expect("iterator") + .filter_map(|res| res.ok()) + .map(|(k, _)| decode_signature_key(&k)) + .collect(); + keys.into_iter() + } + fn insert_aggregated_payload( &mut self, table: Table,