diff --git a/Cargo.lock b/Cargo.lock index f2039d4e..4bab7659 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1971,6 +1971,7 @@ dependencies = [ "ethlambda-metrics", "ethlambda-state-transition", "ethlambda-storage", + "ethlambda-test-fixtures", "ethlambda-types", "hex", "serde", @@ -2059,6 +2060,7 @@ version = "0.1.0" dependencies = [ "datatest-stable 0.3.3", "ethlambda-metrics", + "ethlambda-test-fixtures", "ethlambda-types", "hex", "serde", @@ -2077,6 +2079,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "ethlambda-test-fixtures" +version = "0.1.0" +dependencies = [ + "ethlambda-types", + "hex", + "serde", +] + [[package]] name = "ethlambda-types" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 33ed1ea2..1648a2bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "crates/blockchain/state_transition", "crates/common/crypto", "crates/common/metrics", + "crates/common/test-fixtures", "crates/common/types", "crates/net/p2p", "crates/net/rpc", @@ -30,6 +31,7 @@ ethlambda-fork-choice = { path = "crates/blockchain/fork_choice" } ethlambda-state-transition = { path = "crates/blockchain/state_transition" } ethlambda-crypto = { path = "crates/common/crypto" } ethlambda-metrics = { path = "crates/common/metrics" } +ethlambda-test-fixtures = { path = "crates/common/test-fixtures" } ethlambda-types = { path = "crates/common/types" } ethlambda-p2p = { path = "crates/net/p2p" } ethlambda-rpc = { path = "crates/net/rpc" } diff --git a/bin/ethlambda/src/checkpoint_sync.rs b/bin/ethlambda/src/checkpoint_sync.rs index c0ce60ad..89906693 100644 --- a/bin/ethlambda/src/checkpoint_sync.rs +++ b/bin/ethlambda/src/checkpoint_sync.rs @@ -193,8 +193,9 @@ fn verify_checkpoint_state( mod tests { use super::*; use ethlambda_types::block::BlockHeader; + use ethlambda_types::checkpoint::Checkpoint; use ethlambda_types::primitives::VariableList; - use ethlambda_types::state::{ChainConfig, Checkpoint}; + use ethlambda_types::state::ChainConfig; // Helper to create valid test state fn create_test_state(slot: u64, validators: Vec, genesis_time: u64) -> State { diff --git a/crates/blockchain/Cargo.toml b/crates/blockchain/Cargo.toml index 8f28be5d..535b010c 100644 --- a/crates/blockchain/Cargo.toml +++ b/crates/blockchain/Cargo.toml @@ -28,6 +28,7 @@ tracing.workspace = true hex.workspace = true [dev-dependencies] +ethlambda-test-fixtures.workspace = true serde = { workspace = true } serde_json = { workspace = true } hex = { workspace = true } diff --git a/crates/blockchain/fork_choice/src/lib.rs b/crates/blockchain/fork_choice/src/lib.rs index 4f746c3e..b35a6d38 100644 --- a/crates/blockchain/fork_choice/src/lib.rs +++ b/crates/blockchain/fork_choice/src/lib.rs @@ -84,7 +84,7 @@ pub fn compute_lmd_ghost_head( #[cfg(test)] mod tests { use super::*; - use ethlambda_types::state::Checkpoint; + use ethlambda_types::checkpoint::Checkpoint; fn make_attestation(head_root: H256, slot: u64) -> AttestationData { AttestationData { diff --git a/crates/blockchain/src/fork_choice_tree.rs b/crates/blockchain/src/fork_choice_tree.rs index 52565374..b0c98066 100644 --- a/crates/blockchain/src/fork_choice_tree.rs +++ b/crates/blockchain/src/fork_choice_tree.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::fmt::Write; -use ethlambda_types::{ShortRoot, primitives::H256, state::Checkpoint}; +use ethlambda_types::{ShortRoot, checkpoint::Checkpoint, primitives::H256}; /// Maximum depth of the tree to display before truncating with `...`. const MAX_DISPLAY_DEPTH: usize = 20; diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 06aa6c40..b07a9536 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -7,9 +7,9 @@ use ethlambda_types::{ ShortRoot, attestation::{Attestation, AttestationData, SignedAggregatedAttestation, SignedAttestation}, block::{BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation}, + checkpoint::Checkpoint, primitives::{H256, ssz::TreeHash}, signature::ValidatorSecretKey, - state::Checkpoint, }; use spawned_concurrency::tasks::{ CallResponse, CastResponse, GenServer, GenServerHandle, send_after, @@ -179,7 +179,7 @@ impl BlockChainServer { // Update safe target slot metric (updated by store.on_tick at interval 3) metrics::update_safe_target_slot(self.store.safe_target_slot()); - // Update head slot metric (head may change when attestations are promoted at intervals 0/3) + // Update head slot metric (head may change when attestations are promoted at intervals 0/4) metrics::update_head_slot(self.store.head_slot()); } diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index bce8dc49..093950b7 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -15,9 +15,10 @@ use ethlambda_types::{ AggregatedAttestations, AggregatedSignatureProof, Block, BlockBody, SignedBlockWithAttestation, }, + checkpoint::Checkpoint, primitives::{H256, ssz::TreeHash}, signature::ValidatorSignature, - state::{Checkpoint, State}, + state::State, }; use tracing::{info, trace, warn}; @@ -101,7 +102,7 @@ fn update_safe_target(store: &mut Store) { for (key, new_proofs) in store.iter_new_aggregated_payloads() { all_payloads.entry(key).or_default().extend(new_proofs); } - let attestations = store.extract_latest_attestations(all_payloads.into_iter()); + let attestations = store.extract_latest_attestations(all_payloads.into_keys()); let (safe_target, _weights) = ethlambda_fork_choice::compute_lmd_ghost_head( store.latest_justified().root, &blocks, @@ -129,6 +130,7 @@ fn aggregate_committee_signatures(store: &mut Store) -> Vec> = HashMap::new(); let mut keys_to_delete: Vec = Vec::new(); + let mut payload_entries: Vec<(SignatureKey, StoredAggregatedPayload)> = Vec::new(); for ((validator_id, data_root), stored_sig) in &gossip_sigs { if let Ok(sig) = stored_sig.to_validator_signature() { @@ -145,7 +147,6 @@ fn aggregate_committee_signatures(store: &mut Store) -> Vec Vec Vec Vec>()?; - let message = aggregated.data.tree_hash_root(); + let data_root = aggregated.data.tree_hash_root(); let epoch: u32 = aggregated.data.slot.try_into().expect("slot exceeds u32"); ethlambda_crypto::verify_aggregated_signature( &aggregated.proof.proof_data, pubkeys, - &message, + &data_root, epoch, ) .map_err(StoreError::AggregateVerificationFailed)?; // Store attestation data by root (content-addressed, idempotent) - let data_root = aggregated.data.tree_hash_root(); store.insert_attestation_data_by_root(data_root, aggregated.data.clone()); - // Store one aggregated payload per participating validator - for validator_id in aggregated.proof.participant_indices() { - let payload = StoredAggregatedPayload { - slot: aggregated.data.slot, - proof: aggregated.proof.clone(), - }; - store.insert_new_aggregated_payload((validator_id, data_root), payload); - } + // Store one aggregated payload per participating validator (batch insert) + let entries: Vec<_> = aggregated + .proof + .participant_indices() + .map(|validator_id| { + let payload = StoredAggregatedPayload { + slot: aggregated.data.slot, + proof: aggregated.proof.clone(), + }; + ((validator_id, data_root), payload) + }) + .collect(); + store.insert_new_aggregated_payloads_batch(entries); let slot = aggregated.data.slot; let num_participants = aggregated.proof.participants.num_set_bits(); @@ -531,8 +540,6 @@ fn on_block_core( let block = signed_block.message.block.clone(); let proposer_attestation = signed_block.message.proposer_attestation.clone(); - let block_root = block.tree_hash_root(); - let slot = block.slot; // Execute state transition function to compute post-block state let mut post_state = parent_state; @@ -562,6 +569,7 @@ fn on_block_core( // Process block body attestations. // Store attestation data by root and proofs in known aggregated payloads. + let mut known_entries: Vec<(SignatureKey, StoredAggregatedPayload)> = Vec::new(); for (att, proof) in aggregated_attestations .iter() .zip(attestation_signatures.iter()) @@ -576,12 +584,11 @@ fn on_block_core( }; for validator_id in &validator_ids { - // Store proof in known aggregated payloads (active in fork choice) - store.insert_known_aggregated_payload((*validator_id, data_root), payload.clone()); - + known_entries.push(((*validator_id, data_root), payload.clone())); metrics::inc_attestations_valid("block"); } } + store.insert_known_aggregated_payloads_batch(known_entries); // Update forkchoice head based on new block and attestations // IMPORTANT: This must happen BEFORE processing proposer attestation @@ -607,7 +614,12 @@ fn on_block_core( let proposer_sig = ValidatorSignature::from_bytes(&signed_block.signature.proposer_signature) .map_err(|_| StoreError::SignatureDecodingFailed)?; - store.insert_gossip_signature(&proposer_attestation.data, proposer_vid, proposer_sig); + store.insert_gossip_signature( + proposer_data_root, + proposer_attestation.data.slot, + proposer_vid, + proposer_sig, + ); } info!(%slot, %block_root, %state_root, "Processed new block"); @@ -748,8 +760,11 @@ pub fn produce_block_with_signatures( }); } - // Convert known aggregated payloads to Attestation objects for build_block - let known_attestations = store.extract_latest_known_attestations(); + // Single pass over known aggregated payloads: extract both attestation data and proofs + let known_payloads: Vec<_> = store.iter_known_aggregated_payloads().collect(); + + let known_attestations = + store.extract_latest_attestations(known_payloads.iter().map(|(key, _)| *key)); let available_attestations: Vec = known_attestations .into_iter() .map(|(validator_id, data)| Attestation { validator_id, data }) @@ -758,9 +773,9 @@ pub fn produce_block_with_signatures( // Get known block roots for attestation validation let known_block_roots = store.get_block_roots(); - // Collect existing proofs for block building from known aggregated payloads - let aggregated_payloads: HashMap> = store - .iter_known_aggregated_payloads() + // Collect existing proofs for block building from the already-fetched payloads + let aggregated_payloads: HashMap> = known_payloads + .into_iter() .map(|(key, stored_payloads)| { let proofs = stored_payloads.into_iter().map(|sp| sp.proof).collect(); (key, proofs) diff --git a/crates/blockchain/state_transition/Cargo.toml b/crates/blockchain/state_transition/Cargo.toml index d5335b8f..7f03716f 100644 --- a/crates/blockchain/state_transition/Cargo.toml +++ b/crates/blockchain/state_transition/Cargo.toml @@ -17,6 +17,7 @@ thiserror.workspace = true tracing.workspace = true [dev-dependencies] +ethlambda-test-fixtures.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/crates/blockchain/state_transition/src/lib.rs b/crates/blockchain/state_transition/src/lib.rs index 8a0ce05b..7d17d4e1 100644 --- a/crates/blockchain/state_transition/src/lib.rs +++ b/crates/blockchain/state_transition/src/lib.rs @@ -3,8 +3,9 @@ use std::collections::HashMap; use ethlambda_types::{ ShortRoot, block::{AggregatedAttestations, Block, BlockHeader}, + checkpoint::Checkpoint, primitives::{H256, ssz::TreeHash}, - state::{Checkpoint, JustificationValidators, State}, + state::{JustificationValidators, State}, }; use tracing::info; diff --git a/crates/blockchain/state_transition/tests/types.rs b/crates/blockchain/state_transition/tests/types.rs index 83e9c9f0..ed8bc633 100644 --- a/crates/blockchain/state_transition/tests/types.rs +++ b/crates/blockchain/state_transition/tests/types.rs @@ -1,5 +1,6 @@ -use ethlambda_types::primitives::{BitList, H256, VariableList}; -use ethlambda_types::state::{State, ValidatorPubkeyBytes}; +pub use ethlambda_test_fixtures::*; + +use ethlambda_types::primitives::H256; use serde::Deserialize; use std::collections::HashMap; use std::path::Path; @@ -36,226 +37,6 @@ pub struct StateTransitionTest { pub info: TestInfo, } -/// Pre-state of the beacon chain -#[derive(Debug, Clone, Deserialize)] -pub struct TestState { - pub config: Config, - pub slot: u64, - #[serde(rename = "latestBlockHeader")] - pub latest_block_header: BlockHeader, - #[serde(rename = "latestJustified")] - pub latest_justified: Checkpoint, - #[serde(rename = "latestFinalized")] - pub latest_finalized: Checkpoint, - #[serde(rename = "historicalBlockHashes")] - pub historical_block_hashes: Container, - #[serde(rename = "justifiedSlots")] - pub justified_slots: Container, - pub validators: Container, - #[serde(rename = "justificationsRoots")] - pub justifications_roots: Container, - #[serde(rename = "justificationsValidators")] - pub justifications_validators: Container, -} - -impl From for State { - fn from(value: TestState) -> Self { - let historical_block_hashes = - VariableList::new(value.historical_block_hashes.data).unwrap(); - let validators = - VariableList::new(value.validators.data.into_iter().map(Into::into).collect()).unwrap(); - let justifications_roots = VariableList::new(value.justifications_roots.data).unwrap(); - - State { - config: value.config.into(), - slot: value.slot, - latest_block_header: value.latest_block_header.into(), - latest_justified: value.latest_justified.into(), - latest_finalized: value.latest_finalized.into(), - historical_block_hashes, - justified_slots: BitList::with_capacity(0).unwrap(), - validators, - justifications_roots, - justifications_validators: BitList::with_capacity(0).unwrap(), - } - } -} - -/// Configuration for the beacon chain -#[derive(Debug, Clone, Deserialize)] -pub struct Config { - #[serde(rename = "genesisTime")] - pub genesis_time: u64, -} - -impl From for ethlambda_types::state::ChainConfig { - fn from(value: Config) -> Self { - ethlambda_types::state::ChainConfig { - genesis_time: value.genesis_time, - } - } -} - -#[derive(Debug, Clone, Deserialize)] -pub struct Checkpoint { - pub root: H256, - pub slot: u64, -} - -impl From for ethlambda_types::state::Checkpoint { - fn from(value: Checkpoint) -> Self { - Self { - root: value.root, - slot: value.slot, - } - } -} - -/// Block header representing the latest block -#[derive(Debug, Clone, Deserialize)] -pub struct BlockHeader { - pub slot: u64, - #[serde(rename = "proposerIndex")] - pub proposer_index: u64, - #[serde(rename = "parentRoot")] - pub parent_root: H256, - #[serde(rename = "stateRoot")] - pub state_root: H256, - #[serde(rename = "bodyRoot")] - pub body_root: H256, -} - -impl From for ethlambda_types::block::BlockHeader { - fn from(value: BlockHeader) -> Self { - Self { - slot: value.slot, - proposer_index: value.proposer_index, - parent_root: value.parent_root, - state_root: value.state_root, - body_root: value.body_root, - } - } -} - -/// Validator information -#[derive(Debug, Clone, Deserialize)] -pub struct Validator { - index: u64, - #[serde(deserialize_with = "deser_pubkey_hex")] - pubkey: ValidatorPubkeyBytes, -} - -impl From for ethlambda_types::state::Validator { - fn from(value: Validator) -> Self { - Self { - index: value.index, - pubkey: value.pubkey, - } - } -} - -/// Generic container for arrays -#[derive(Debug, Clone, Deserialize)] -pub struct Container { - pub data: Vec, -} - -/// A block to be processed -#[derive(Debug, Clone, Deserialize)] -pub struct Block { - pub slot: u64, - #[serde(rename = "proposerIndex")] - pub proposer_index: u64, - #[serde(rename = "parentRoot")] - pub parent_root: H256, - #[serde(rename = "stateRoot")] - pub state_root: H256, - pub body: BlockBody, -} - -impl From for ethlambda_types::block::Block { - fn from(value: Block) -> Self { - Self { - slot: value.slot, - proposer_index: value.proposer_index, - parent_root: value.parent_root, - state_root: value.state_root, - body: value.body.into(), - } - } -} - -/// Block body containing attestations and other data -#[derive(Debug, Clone, Deserialize)] -pub struct BlockBody { - pub attestations: Container, -} - -impl From for ethlambda_types::block::BlockBody { - fn from(value: BlockBody) -> Self { - let attestations = value - .attestations - .data - .into_iter() - .map(Into::into) - .collect(); - Self { - attestations: VariableList::new(attestations).expect("too many attestations"), - } - } -} - -#[derive(Debug, Clone, Deserialize)] -pub struct AggregatedAttestation { - #[serde(rename = "aggregationBits")] - pub aggregation_bits: AggregationBits, - pub data: AttestationData, -} - -impl From for ethlambda_types::attestation::AggregatedAttestation { - fn from(value: AggregatedAttestation) -> Self { - Self { - aggregation_bits: value.aggregation_bits.into(), - data: value.data.into(), - } - } -} - -#[derive(Debug, Clone, Deserialize)] -pub struct AggregationBits { - pub data: Vec, -} - -impl From for ethlambda_types::attestation::AggregationBits { - fn from(value: AggregationBits) -> Self { - let mut bits = - ethlambda_types::attestation::AggregationBits::with_capacity(value.data.len()).unwrap(); - for (i, &b) in value.data.iter().enumerate() { - bits.set(i, b).unwrap(); - } - bits - } -} - -#[derive(Debug, Clone, Deserialize)] -pub struct AttestationData { - pub slot: u64, - pub head: Checkpoint, - pub target: Checkpoint, - pub source: Checkpoint, -} - -impl From for ethlambda_types::attestation::AttestationData { - fn from(value: AttestationData) -> Self { - Self { - slot: value.slot, - head: value.head.into(), - target: value.target.into(), - source: value.source.into(), - } - } -} - #[derive(Debug, Clone, Deserialize)] #[serde(deny_unknown_fields)] pub struct PostState { @@ -301,32 +82,3 @@ pub struct PostState { #[serde(rename = "validatorCount")] pub validator_count: Option, } - -/// Test metadata and information -#[derive(Debug, Clone, Deserialize)] -#[allow(dead_code)] -pub struct TestInfo { - pub hash: String, - pub comment: String, - #[serde(rename = "testId")] - pub test_id: String, - pub description: String, - #[serde(rename = "fixtureFormat")] - pub fixture_format: String, -} - -// Helpers - -pub fn deser_pubkey_hex<'de, D>(d: D) -> Result -where - D: serde::Deserializer<'de>, -{ - use serde::de::Error; - - let value = String::deserialize(d)?; - let pubkey: ValidatorPubkeyBytes = hex::decode(value.strip_prefix("0x").unwrap_or(&value)) - .map_err(|_| D::Error::custom("ValidatorPubkey value is not valid hex"))? - .try_into() - .map_err(|_| D::Error::custom("ValidatorPubkey length != 52"))?; - Ok(pubkey) -} diff --git a/crates/blockchain/tests/common.rs b/crates/blockchain/tests/common.rs index 5d7e804c..1756bc6a 100644 --- a/crates/blockchain/tests/common.rs +++ b/crates/blockchain/tests/common.rs @@ -1,264 +1,14 @@ #![allow(dead_code)] -use ethlambda_types::{ - attestation::{ - AggregatedAttestation as DomainAggregatedAttestation, - AggregationBits as DomainAggregationBits, Attestation as DomainAttestation, - AttestationData as DomainAttestationData, - }, - block::{Block as DomainBlock, BlockBody as DomainBlockBody}, - primitives::{BitList, H256, VariableList}, - state::{ - ChainConfig, Checkpoint as DomainCheckpoint, State, Validator as DomainValidator, - ValidatorPubkeyBytes, - }, -}; -use serde::Deserialize; - -// ============================================================================ -// Generic Container -// ============================================================================ - -#[derive(Debug, Clone, Deserialize)] -pub struct Container { - pub data: Vec, -} - -// ============================================================================ -// Config -// ============================================================================ - -#[derive(Debug, Clone, Deserialize)] -pub struct Config { - #[serde(rename = "genesisTime")] - pub genesis_time: u64, -} - -impl From for ChainConfig { - fn from(value: Config) -> Self { - ChainConfig { - genesis_time: value.genesis_time, - } - } -} - -// ============================================================================ -// Checkpoint -// ============================================================================ - -#[derive(Debug, Clone, Deserialize)] -pub struct Checkpoint { - pub root: H256, - pub slot: u64, -} +pub use ethlambda_test_fixtures::*; -impl From for DomainCheckpoint { - fn from(value: Checkpoint) -> Self { - Self { - root: value.root, - slot: value.slot, - } - } -} - -// ============================================================================ -// BlockHeader -// ============================================================================ - -#[derive(Debug, Clone, Deserialize)] -pub struct BlockHeader { - pub slot: u64, - #[serde(rename = "proposerIndex")] - pub proposer_index: u64, - #[serde(rename = "parentRoot")] - pub parent_root: H256, - #[serde(rename = "stateRoot")] - pub state_root: H256, - #[serde(rename = "bodyRoot")] - pub body_root: H256, -} - -impl From for ethlambda_types::block::BlockHeader { - fn from(value: BlockHeader) -> Self { - Self { - slot: value.slot, - proposer_index: value.proposer_index, - parent_root: value.parent_root, - state_root: value.state_root, - body_root: value.body_root, - } - } -} - -// ============================================================================ -// Validator -// ============================================================================ - -#[derive(Debug, Clone, Deserialize)] -pub struct Validator { - index: u64, - #[serde(deserialize_with = "deser_pubkey_hex")] - pubkey: ValidatorPubkeyBytes, -} - -impl From for DomainValidator { - fn from(value: Validator) -> Self { - Self { - index: value.index, - pubkey: value.pubkey, - } - } -} - -// ============================================================================ -// State -// ============================================================================ - -#[derive(Debug, Clone, Deserialize)] -pub struct TestState { - pub config: Config, - pub slot: u64, - #[serde(rename = "latestBlockHeader")] - pub latest_block_header: BlockHeader, - #[serde(rename = "latestJustified")] - pub latest_justified: Checkpoint, - #[serde(rename = "latestFinalized")] - pub latest_finalized: Checkpoint, - #[serde(rename = "historicalBlockHashes")] - pub historical_block_hashes: Container, - #[serde(rename = "justifiedSlots")] - pub justified_slots: Container, - pub validators: Container, - #[serde(rename = "justificationsRoots")] - pub justifications_roots: Container, - #[serde(rename = "justificationsValidators")] - pub justifications_validators: Container, -} - -impl From for State { - fn from(value: TestState) -> Self { - let historical_block_hashes = - VariableList::new(value.historical_block_hashes.data).unwrap(); - let validators = - VariableList::new(value.validators.data.into_iter().map(Into::into).collect()).unwrap(); - let justifications_roots = VariableList::new(value.justifications_roots.data).unwrap(); - - State { - config: value.config.into(), - slot: value.slot, - latest_block_header: value.latest_block_header.into(), - latest_justified: value.latest_justified.into(), - latest_finalized: value.latest_finalized.into(), - historical_block_hashes, - justified_slots: BitList::with_capacity(0).unwrap(), - validators, - justifications_roots, - justifications_validators: BitList::with_capacity(0).unwrap(), - } - } -} - -// ============================================================================ -// Block Types -// ============================================================================ - -#[derive(Debug, Clone, Deserialize)] -pub struct Block { - pub slot: u64, - #[serde(rename = "proposerIndex")] - pub proposer_index: u64, - #[serde(rename = "parentRoot")] - pub parent_root: H256, - #[serde(rename = "stateRoot")] - pub state_root: H256, - pub body: BlockBody, -} - -impl From for DomainBlock { - fn from(value: Block) -> Self { - Self { - slot: value.slot, - proposer_index: value.proposer_index, - parent_root: value.parent_root, - state_root: value.state_root, - body: value.body.into(), - } - } -} - -#[derive(Debug, Clone, Deserialize)] -pub struct BlockBody { - pub attestations: Container, -} - -impl From for DomainBlockBody { - fn from(value: BlockBody) -> Self { - let attestations = value - .attestations - .data - .into_iter() - .map(Into::into) - .collect(); - Self { - attestations: VariableList::new(attestations).expect("too many attestations"), - } - } -} +use ethlambda_types::attestation::Attestation as DomainAttestation; +use serde::Deserialize; // ============================================================================ -// Attestation Types +// ProposerAttestation (forkchoice/signature tests only) // ============================================================================ -#[derive(Debug, Clone, Deserialize)] -pub struct AggregatedAttestation { - #[serde(rename = "aggregationBits")] - pub aggregation_bits: AggregationBits, - pub data: AttestationData, -} - -impl From for DomainAggregatedAttestation { - fn from(value: AggregatedAttestation) -> Self { - Self { - aggregation_bits: value.aggregation_bits.into(), - data: value.data.into(), - } - } -} - -#[derive(Debug, Clone, Deserialize)] -pub struct AggregationBits { - pub data: Vec, -} - -impl From for DomainAggregationBits { - fn from(value: AggregationBits) -> Self { - let mut bits = DomainAggregationBits::with_capacity(value.data.len()).unwrap(); - for (i, &b) in value.data.iter().enumerate() { - bits.set(i, b).unwrap(); - } - bits - } -} - -#[derive(Debug, Clone, Deserialize)] -pub struct AttestationData { - pub slot: u64, - pub head: Checkpoint, - pub target: Checkpoint, - pub source: Checkpoint, -} - -impl From for DomainAttestationData { - fn from(value: AttestationData) -> Self { - Self { - slot: value.slot, - head: value.head.into(), - target: value.target.into(), - source: value.source.into(), - } - } -} - #[derive(Debug, Clone, Deserialize)] pub struct ProposerAttestation { #[serde(rename = "validatorId")] @@ -274,37 +24,3 @@ impl From for DomainAttestation { } } } - -// ============================================================================ -// Metadata -// ============================================================================ - -#[derive(Debug, Clone, Deserialize)] -pub struct TestInfo { - pub hash: String, - pub comment: String, - #[serde(rename = "testId")] - pub test_id: String, - pub description: String, - #[serde(rename = "fixtureFormat")] - pub fixture_format: String, -} - -// ============================================================================ -// Helpers -// ============================================================================ - -pub fn deser_pubkey_hex<'de, D>(d: D) -> Result -where - D: serde::Deserializer<'de>, -{ - use serde::Deserialize; - use serde::de::Error; - - let value = String::deserialize(d)?; - let pubkey: ValidatorPubkeyBytes = hex::decode(value.strip_prefix("0x").unwrap_or(&value)) - .map_err(|_| D::Error::custom("ValidatorPubkey value is not valid hex"))? - .try_into() - .map_err(|_| D::Error::custom("ValidatorPubkey length != 52"))?; - Ok(pubkey) -} diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index bdce14fb..bb052c70 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -5,7 +5,7 @@ use std::{ }; use ethlambda_blockchain::{SECONDS_PER_SLOT, store}; -use ethlambda_storage::{SignatureKey, Store, StoredAggregatedPayload, backend::InMemoryBackend}; +use ethlambda_storage::{Store, backend::InMemoryBackend}; use ethlambda_types::{ attestation::{Attestation, AttestationData}, block::{Block, BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation}, @@ -13,27 +13,6 @@ use ethlambda_types::{ state::State, }; -/// Extract per-validator attestation data from aggregated payloads. -/// Test helper that mirrors the private function in blockchain::store. -fn extract_attestations( - store: &Store, - payloads: impl Iterator)>, -) -> HashMap { - let mut result: HashMap = HashMap::new(); - for ((validator_id, data_root), _) in payloads { - let Some(data) = store.get_attestation_data_by_root(&data_root) else { - continue; - }; - let should_update = result - .get(&validator_id) - .is_none_or(|existing| existing.slot < data.slot); - if should_update { - result.insert(validator_id, data); - } - } - result -} - use crate::types::{ForkChoiceTestVector, StoreChecks}; const SUPPORTED_FIXTURE_FORMAT: &str = "fork_choice_test"; @@ -305,8 +284,12 @@ fn validate_attestation_check( let location = check.location.as_str(); let attestations: HashMap = match location { - "new" => extract_attestations(st, st.iter_new_aggregated_payloads()), - "known" => extract_attestations(st, st.iter_known_aggregated_payloads()), + "new" => { + st.extract_latest_attestations(st.iter_new_aggregated_payloads().map(|(key, _)| key)) + } + "known" => { + st.extract_latest_attestations(st.iter_known_aggregated_payloads().map(|(key, _)| key)) + } other => { return Err( format!("Step {}: unknown attestation location: {}", step_idx, other).into(), @@ -387,7 +370,7 @@ fn validate_lexicographic_head_among( let blocks = st.get_live_chain(); let known_attestations: HashMap = - extract_attestations(st, st.iter_known_aggregated_payloads()); + st.extract_latest_attestations(st.iter_known_aggregated_payloads().map(|(key, _)| key)); // Resolve all fork labels to roots and compute their weights // Map: label -> (root, slot, weight) diff --git a/crates/common/test-fixtures/Cargo.toml b/crates/common/test-fixtures/Cargo.toml new file mode 100644 index 00000000..f20d9200 --- /dev/null +++ b/crates/common/test-fixtures/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "ethlambda-test-fixtures" +authors.workspace = true +edition.workspace = true +keywords.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[dependencies] +ethlambda-types.workspace = true + +serde.workspace = true +hex.workspace = true diff --git a/crates/common/test-fixtures/src/lib.rs b/crates/common/test-fixtures/src/lib.rs new file mode 100644 index 00000000..ab816308 --- /dev/null +++ b/crates/common/test-fixtures/src/lib.rs @@ -0,0 +1,289 @@ +use ethlambda_types::{ + attestation::{ + AggregatedAttestation as DomainAggregatedAttestation, + AggregationBits as DomainAggregationBits, AttestationData as DomainAttestationData, + }, + block::{Block as DomainBlock, BlockBody as DomainBlockBody}, + checkpoint::Checkpoint as DomainCheckpoint, + primitives::{BitList, H256, VariableList}, + state::{ChainConfig, State, Validator as DomainValidator, ValidatorPubkeyBytes}, +}; +use serde::Deserialize; + +// ============================================================================ +// Generic Container +// ============================================================================ + +#[derive(Debug, Clone, Deserialize)] +pub struct Container { + pub data: Vec, +} + +// ============================================================================ +// Config +// ============================================================================ + +#[derive(Debug, Clone, Deserialize)] +pub struct Config { + #[serde(rename = "genesisTime")] + pub genesis_time: u64, +} + +impl From for ChainConfig { + fn from(value: Config) -> Self { + ChainConfig { + genesis_time: value.genesis_time, + } + } +} + +// ============================================================================ +// Checkpoint +// ============================================================================ + +#[derive(Debug, Clone, Deserialize)] +pub struct Checkpoint { + pub root: H256, + pub slot: u64, +} + +impl From for DomainCheckpoint { + fn from(value: Checkpoint) -> Self { + Self { + root: value.root, + slot: value.slot, + } + } +} + +// ============================================================================ +// BlockHeader +// ============================================================================ + +#[derive(Debug, Clone, Deserialize)] +pub struct BlockHeader { + pub slot: u64, + #[serde(rename = "proposerIndex")] + pub proposer_index: u64, + #[serde(rename = "parentRoot")] + pub parent_root: H256, + #[serde(rename = "stateRoot")] + pub state_root: H256, + #[serde(rename = "bodyRoot")] + pub body_root: H256, +} + +impl From for ethlambda_types::block::BlockHeader { + fn from(value: BlockHeader) -> Self { + Self { + slot: value.slot, + proposer_index: value.proposer_index, + parent_root: value.parent_root, + state_root: value.state_root, + body_root: value.body_root, + } + } +} + +// ============================================================================ +// Validator +// ============================================================================ + +#[derive(Debug, Clone, Deserialize)] +pub struct Validator { + index: u64, + #[serde(deserialize_with = "deser_pubkey_hex")] + pubkey: ValidatorPubkeyBytes, +} + +impl From for DomainValidator { + fn from(value: Validator) -> Self { + Self { + index: value.index, + pubkey: value.pubkey, + } + } +} + +// ============================================================================ +// State +// ============================================================================ + +#[derive(Debug, Clone, Deserialize)] +pub struct TestState { + pub config: Config, + pub slot: u64, + #[serde(rename = "latestBlockHeader")] + pub latest_block_header: BlockHeader, + #[serde(rename = "latestJustified")] + pub latest_justified: Checkpoint, + #[serde(rename = "latestFinalized")] + pub latest_finalized: Checkpoint, + #[serde(rename = "historicalBlockHashes")] + pub historical_block_hashes: Container, + #[serde(rename = "justifiedSlots")] + pub justified_slots: Container, + pub validators: Container, + #[serde(rename = "justificationsRoots")] + pub justifications_roots: Container, + #[serde(rename = "justificationsValidators")] + pub justifications_validators: Container, +} + +impl From for State { + fn from(value: TestState) -> Self { + let historical_block_hashes = + VariableList::new(value.historical_block_hashes.data).unwrap(); + let validators = + VariableList::new(value.validators.data.into_iter().map(Into::into).collect()).unwrap(); + let justifications_roots = VariableList::new(value.justifications_roots.data).unwrap(); + + State { + config: value.config.into(), + slot: value.slot, + latest_block_header: value.latest_block_header.into(), + latest_justified: value.latest_justified.into(), + latest_finalized: value.latest_finalized.into(), + historical_block_hashes, + justified_slots: BitList::with_capacity(0).unwrap(), + validators, + justifications_roots, + justifications_validators: BitList::with_capacity(0).unwrap(), + } + } +} + +// ============================================================================ +// Block Types +// ============================================================================ + +#[derive(Debug, Clone, Deserialize)] +pub struct Block { + pub slot: u64, + #[serde(rename = "proposerIndex")] + pub proposer_index: u64, + #[serde(rename = "parentRoot")] + pub parent_root: H256, + #[serde(rename = "stateRoot")] + pub state_root: H256, + pub body: BlockBody, +} + +impl From for DomainBlock { + fn from(value: Block) -> Self { + Self { + slot: value.slot, + proposer_index: value.proposer_index, + parent_root: value.parent_root, + state_root: value.state_root, + body: value.body.into(), + } + } +} + +#[derive(Debug, Clone, Deserialize)] +pub struct BlockBody { + pub attestations: Container, +} + +impl From for DomainBlockBody { + fn from(value: BlockBody) -> Self { + let attestations = value + .attestations + .data + .into_iter() + .map(Into::into) + .collect(); + Self { + attestations: VariableList::new(attestations).expect("too many attestations"), + } + } +} + +// ============================================================================ +// Attestation Types +// ============================================================================ + +#[derive(Debug, Clone, Deserialize)] +pub struct AggregatedAttestation { + #[serde(rename = "aggregationBits")] + pub aggregation_bits: AggregationBits, + pub data: AttestationData, +} + +impl From for DomainAggregatedAttestation { + fn from(value: AggregatedAttestation) -> Self { + Self { + aggregation_bits: value.aggregation_bits.into(), + data: value.data.into(), + } + } +} + +#[derive(Debug, Clone, Deserialize)] +pub struct AggregationBits { + pub data: Vec, +} + +impl From for DomainAggregationBits { + fn from(value: AggregationBits) -> Self { + let mut bits = DomainAggregationBits::with_capacity(value.data.len()).unwrap(); + for (i, &b) in value.data.iter().enumerate() { + bits.set(i, b).unwrap(); + } + bits + } +} + +#[derive(Debug, Clone, Deserialize)] +pub struct AttestationData { + pub slot: u64, + pub head: Checkpoint, + pub target: Checkpoint, + pub source: Checkpoint, +} + +impl From for DomainAttestationData { + fn from(value: AttestationData) -> Self { + Self { + slot: value.slot, + head: value.head.into(), + target: value.target.into(), + source: value.source.into(), + } + } +} + +// ============================================================================ +// Metadata +// ============================================================================ + +#[derive(Debug, Clone, Deserialize)] +pub struct TestInfo { + pub hash: String, + pub comment: String, + #[serde(rename = "testId")] + pub test_id: String, + pub description: String, + #[serde(rename = "fixtureFormat")] + pub fixture_format: String, +} + +// ============================================================================ +// Helpers +// ============================================================================ + +pub fn deser_pubkey_hex<'de, D>(d: D) -> Result +where + D: serde::Deserializer<'de>, +{ + use serde::Deserialize; + use serde::de::Error; + + let value = String::deserialize(d)?; + let pubkey: ValidatorPubkeyBytes = hex::decode(value.strip_prefix("0x").unwrap_or(&value)) + .map_err(|_| D::Error::custom("ValidatorPubkey value is not valid hex"))? + .try_into() + .map_err(|_| D::Error::custom("ValidatorPubkey length != 52"))?; + Ok(pubkey) +} diff --git a/crates/common/types/src/attestation.rs b/crates/common/types/src/attestation.rs index 6390a2bb..386bed4d 100644 --- a/crates/common/types/src/attestation.rs +++ b/crates/common/types/src/attestation.rs @@ -1,8 +1,9 @@ use crate::{ block::AggregatedSignatureProof, + checkpoint::Checkpoint, primitives::ssz::{Decode, Encode, TreeHash}, signature::SignatureSize, - state::{Checkpoint, ValidatorRegistryLimit}, + state::ValidatorRegistryLimit, }; /// Validator specific attestation wrapping shared attestation data. diff --git a/crates/common/types/src/checkpoint.rs b/crates/common/types/src/checkpoint.rs new file mode 100644 index 00000000..328a7189 --- /dev/null +++ b/crates/common/types/src/checkpoint.rs @@ -0,0 +1,31 @@ +use serde::{Deserialize, Serialize}; + +use crate::primitives::{ + H256, + ssz::{Decode, Encode, TreeHash}, +}; + +/// Represents a checkpoint in the chain's history. +#[derive( + Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, Encode, Decode, TreeHash, +)] +pub struct Checkpoint { + /// The root hash of the checkpoint's block. + pub root: H256, + /// The slot number of the checkpoint's block. + #[serde(deserialize_with = "deser_dec_str")] + pub slot: u64, +} + +// Taken from ethrex-common +fn deser_dec_str<'de, D>(d: D) -> Result +where + D: serde::Deserializer<'de>, +{ + use serde::de::Error; + + let value = String::deserialize(d)?; + value + .parse() + .map_err(|_| D::Error::custom("Failed to deserialize u64 value")) +} diff --git a/crates/common/types/src/lib.rs b/crates/common/types/src/lib.rs index 8192f005..6f9b28b9 100644 --- a/crates/common/types/src/lib.rs +++ b/crates/common/types/src/lib.rs @@ -1,5 +1,6 @@ pub mod attestation; pub mod block; +pub mod checkpoint; pub mod genesis; pub mod primitives; pub mod signature; diff --git a/crates/common/types/src/state.rs b/crates/common/types/src/state.rs index 893f1a32..1ed9363a 100644 --- a/crates/common/types/src/state.rs +++ b/crates/common/types/src/state.rs @@ -3,6 +3,7 @@ use ssz_types::typenum::{U4096, U262144}; use crate::{ block::{BlockBody, BlockHeader}, + checkpoint::Checkpoint, primitives::{ H256, ssz::{Decode, DecodeError, Encode, TreeHash}, @@ -115,31 +116,6 @@ impl State { } } -/// Represents a checkpoint in the chain's history. -#[derive( - Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, Encode, Decode, TreeHash, -)] -pub struct Checkpoint { - /// The root hash of the checkpoint's block. - pub root: H256, - /// The slot number of the checkpoint's block. - #[serde(deserialize_with = "deser_dec_str")] - pub slot: u64, -} - -// Taken from ethrex-common -pub fn deser_dec_str<'de, D>(d: D) -> Result -where - D: serde::Deserializer<'de>, -{ - use serde::de::Error; - - let value = String::deserialize(d)?; - value - .parse() - .map_err(|_| D::Error::custom("Failed to deserialize u64 value")) -} - #[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, TreeHash)] pub struct ChainConfig { pub genesis_time: u64, diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index 77546bea..9e50ea66 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -4,6 +4,7 @@ use rand::seq::SliceRandom; use tokio::time::Duration; use tracing::{debug, error, info, warn}; +use ethlambda_types::checkpoint::Checkpoint; use ethlambda_types::primitives::ssz::TreeHash; use ethlambda_types::{block::SignedBlockWithAttestation, primitives::H256}; @@ -182,7 +183,7 @@ pub fn build_status(store: &Store) -> Status { .slot; Status { finalized, - head: ethlambda_types::state::Checkpoint { + head: Checkpoint { root: head_root, slot: head_slot, }, diff --git a/crates/net/p2p/src/req_resp/messages.rs b/crates/net/p2p/src/req_resp/messages.rs index 607c918c..3d53e366 100644 --- a/crates/net/p2p/src/req_resp/messages.rs +++ b/crates/net/p2p/src/req_resp/messages.rs @@ -1,10 +1,10 @@ use ethlambda_types::{ block::SignedBlockWithAttestation, + checkpoint::Checkpoint, primitives::{ H256, ssz::{Decode, Encode}, }, - state::Checkpoint, }; use ssz_types::typenum; diff --git a/crates/net/rpc/src/fork_choice.rs b/crates/net/rpc/src/fork_choice.rs index 29d21448..75fb2702 100644 --- a/crates/net/rpc/src/fork_choice.rs +++ b/crates/net/rpc/src/fork_choice.rs @@ -1,6 +1,6 @@ use axum::{http::HeaderValue, http::header, response::IntoResponse}; use ethlambda_storage::Store; -use ethlambda_types::primitives::H256; +use ethlambda_types::{checkpoint::Checkpoint, primitives::H256}; use serde::Serialize; use crate::json_response; @@ -12,8 +12,8 @@ const FORK_CHOICE_HTML: &str = include_str!("../static/fork_choice.html"); pub struct ForkChoiceResponse { nodes: Vec, head: H256, - justified: CheckpointInfo, - finalized: CheckpointInfo, + justified: Checkpoint, + finalized: Checkpoint, safe_target: H256, validator_count: u64, } @@ -27,12 +27,6 @@ pub struct ForkChoiceNode { weight: u64, } -#[derive(Serialize)] -pub struct CheckpointInfo { - root: H256, - slot: u64, -} - pub async fn get_fork_choice( axum::extract::State(store): axum::extract::State, ) -> impl IntoResponse { @@ -72,14 +66,8 @@ pub async fn get_fork_choice( let response = ForkChoiceResponse { nodes, head, - justified: CheckpointInfo { - root: justified.root, - slot: justified.slot, - }, - finalized: CheckpointInfo { - root: finalized.root, - slot: finalized.slot, - }, + justified, + finalized, safe_target, validator_count, }; @@ -101,37 +89,11 @@ mod tests { use super::*; use axum::{Router, body::Body, http::Request, http::StatusCode, routing::get}; use ethlambda_storage::{Store, backend::InMemoryBackend}; - use ethlambda_types::{ - block::{BlockBody, BlockHeader}, - primitives::ssz::TreeHash, - state::{ChainConfig, Checkpoint, JustificationValidators, JustifiedSlots, State}, - }; use http_body_util::BodyExt; use std::sync::Arc; use tower::ServiceExt; - fn create_test_state() -> State { - let genesis_header = BlockHeader { - slot: 0, - proposer_index: 0, - parent_root: H256::ZERO, - state_root: H256::ZERO, - body_root: BlockBody::default().tree_hash_root(), - }; - - State { - config: ChainConfig { genesis_time: 1000 }, - slot: 0, - latest_block_header: genesis_header, - latest_justified: Checkpoint::default(), - latest_finalized: Checkpoint::default(), - historical_block_hashes: Default::default(), - justified_slots: JustifiedSlots::with_capacity(0).unwrap(), - validators: Default::default(), - justifications_roots: Default::default(), - justifications_validators: JustificationValidators::with_capacity(0).unwrap(), - } - } + use crate::test_utils::create_test_state; fn build_test_router(store: Store) -> Router { Router::new() diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs index 4b7e4c8b..40048aee 100644 --- a/crates/net/rpc/src/lib.rs +++ b/crates/net/rpc/src/lib.rs @@ -74,25 +74,16 @@ fn ssz_response(bytes: Vec) -> axum::response::Response { } #[cfg(test)] -mod tests { - use super::*; - use axum::{ - body::Body, - http::{Request, StatusCode}, - }; - use ethlambda_storage::{Store, backend::InMemoryBackend}; +pub(crate) mod test_utils { use ethlambda_types::{ block::{BlockBody, BlockHeader}, + checkpoint::Checkpoint, primitives::{H256, ssz::TreeHash}, - state::{ChainConfig, Checkpoint, JustificationValidators, JustifiedSlots, State}, + state::{ChainConfig, JustificationValidators, JustifiedSlots, State}, }; - use http_body_util::BodyExt; - use serde_json::json; - use std::sync::Arc; - use tower::ServiceExt; /// Create a minimal test state for testing. - fn create_test_state() -> State { + pub(crate) fn create_test_state() -> State { let genesis_header = BlockHeader { slot: 0, proposer_index: 0, @@ -119,6 +110,22 @@ mod tests { justifications_validators: JustificationValidators::with_capacity(0).unwrap(), } } +} + +#[cfg(test)] +mod tests { + use super::*; + use axum::{ + body::Body, + http::{Request, StatusCode}, + }; + use ethlambda_storage::{Store, backend::InMemoryBackend}; + use http_body_util::BodyExt; + use serde_json::json; + use std::sync::Arc; + use tower::ServiceExt; + + use super::test_utils::create_test_state; #[tokio::test] async fn test_get_latest_justified_checkpoint() { diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 76945021..80f2528d 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -16,12 +16,13 @@ use ethlambda_types::{ Block, BlockBody, BlockHeader, BlockSignaturesWithAttestation, BlockWithAttestation, SignedBlockWithAttestation, }, + checkpoint::Checkpoint, primitives::{ H256, ssz::{Decode, Encode, TreeHash}, }, signature::ValidatorSignature, - state::{ChainConfig, Checkpoint, State}, + state::{ChainConfig, State}, }; use tracing::info; @@ -464,62 +465,18 @@ impl Store { /// /// Returns the number of signatures pruned. pub fn prune_gossip_signatures(&mut self, finalized_slot: u64) -> usize { - let view = self.backend.begin_read().expect("read view"); - let mut to_delete = vec![]; - - for (key_bytes, value_bytes) in view - .prefix_iterator(Table::GossipSignatures, &[]) - .expect("iter") - .filter_map(|r| r.ok()) - { - if let Ok(stored) = StoredSignature::from_ssz_bytes(&value_bytes) - && stored.slot <= finalized_slot - { - to_delete.push(key_bytes.to_vec()); - } - } - drop(view); - - let count = to_delete.len(); - if !to_delete.is_empty() { - let mut batch = self.backend.begin_write().expect("write batch"); - batch - .delete_batch(Table::GossipSignatures, to_delete) - .expect("delete"); - batch.commit().expect("commit"); - } - count + self.prune_by_slot(Table::GossipSignatures, finalized_slot, |bytes| { + StoredSignature::from_ssz_bytes(bytes).ok().map(|s| s.slot) + }) } /// Prune attestation data by root for slots <= finalized_slot. /// /// Returns the number of entries pruned. pub fn prune_attestation_data_by_root(&mut self, finalized_slot: u64) -> usize { - let view = self.backend.begin_read().expect("read view"); - let mut to_delete = vec![]; - - for (key_bytes, value_bytes) in view - .prefix_iterator(Table::AttestationDataByRoot, &[]) - .expect("iter") - .filter_map(|r| r.ok()) - { - if let Ok(data) = AttestationData::from_ssz_bytes(&value_bytes) - && data.slot <= finalized_slot - { - to_delete.push(key_bytes.to_vec()); - } - } - drop(view); - - let count = to_delete.len(); - if !to_delete.is_empty() { - let mut batch = self.backend.begin_write().expect("write batch"); - batch - .delete_batch(Table::AttestationDataByRoot, to_delete) - .expect("delete"); - batch.commit().expect("commit"); - } - count + self.prune_by_slot(Table::AttestationDataByRoot, finalized_slot, |bytes| { + AttestationData::from_ssz_bytes(bytes).ok().map(|d| d.slot) + }) } /// Prune an aggregated payload table (new or known) for slots <= finalized_slot. @@ -689,12 +646,17 @@ impl Store { /// (by slot). pub fn extract_latest_attestations( &self, - payloads: impl Iterator)>, + keys: impl Iterator, ) -> HashMap { let mut result: HashMap = HashMap::new(); + let mut data_cache: HashMap> = HashMap::new(); - for ((validator_id, data_root), _payload_list) in payloads { - let Some(data) = self.get_attestation_data_by_root(&data_root) else { + for (validator_id, data_root) in keys { + let data = data_cache + .entry(data_root) + .or_insert_with(|| self.get_attestation_data_by_root(&data_root)); + + let Some(data) = data else { continue; }; @@ -703,7 +665,7 @@ impl Store { .is_none_or(|existing| existing.slot < data.slot); if should_update { - result.insert(validator_id, data); + result.insert(validator_id, data.clone()); } } @@ -713,7 +675,7 @@ impl Store { /// Convenience: extract latest attestation per validator from known /// (fork-choice-active) aggregated payloads only. pub fn extract_latest_known_attestations(&self) -> HashMap { - self.extract_latest_attestations(self.iter_known_aggregated_payloads()) + self.extract_latest_attestations(self.iter_known_aggregated_payloads().map(|(key, _)| key)) } // ============ Known Aggregated Payloads ============ @@ -725,19 +687,7 @@ impl Store { pub fn iter_known_aggregated_payloads( &self, ) -> impl Iterator)> + '_ { - let view = self.backend.begin_read().expect("read view"); - let entries: Vec<_> = view - .prefix_iterator(Table::LatestKnownAggregatedPayloads, &[]) - .expect("iterator") - .filter_map(|res| res.ok()) - .map(|(k, v)| { - let key = decode_signature_key(&k); - let payloads = - Vec::::from_ssz_bytes(&v).expect("valid payloads"); - (key, payloads) - }) - .collect(); - entries.into_iter() + self.iter_aggregated_payloads(Table::LatestKnownAggregatedPayloads) } /// Insert an aggregated payload into the known (fork-choice-active) table. @@ -746,23 +696,15 @@ impl Store { key: SignatureKey, payload: StoredAggregatedPayload, ) { - let encoded_key = encode_signature_key(&key); - let view = self.backend.begin_read().expect("read view"); - let mut payloads: Vec = view - .get(Table::LatestKnownAggregatedPayloads, &encoded_key) - .expect("get") - .map(|bytes| Vec::::from_ssz_bytes(&bytes).expect("valid")) - .unwrap_or_default(); - drop(view); - - payloads.push(payload); + self.insert_aggregated_payload(Table::LatestKnownAggregatedPayloads, key, payload); + } - let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(encoded_key, payloads.as_ssz_bytes())]; - batch - .put_batch(Table::LatestKnownAggregatedPayloads, entries) - .expect("put known aggregated payload"); - batch.commit().expect("commit"); + /// Batch-insert multiple aggregated payloads into the known table in a single commit. + pub fn insert_known_aggregated_payloads_batch( + &mut self, + entries: Vec<(SignatureKey, StoredAggregatedPayload)>, + ) { + self.insert_aggregated_payloads_batch(Table::LatestKnownAggregatedPayloads, entries); } // ============ New Aggregated Payloads ============ @@ -774,9 +716,70 @@ impl Store { pub fn iter_new_aggregated_payloads( &self, ) -> impl Iterator)> + '_ { + self.iter_aggregated_payloads(Table::LatestNewAggregatedPayloads) + } + + /// Insert an aggregated payload into the new (pending) table. + pub fn insert_new_aggregated_payload( + &mut self, + key: SignatureKey, + payload: StoredAggregatedPayload, + ) { + self.insert_aggregated_payload(Table::LatestNewAggregatedPayloads, key, payload); + } + + /// Batch-insert multiple aggregated payloads into the new table in a single commit. + pub fn insert_new_aggregated_payloads_batch( + &mut self, + entries: Vec<(SignatureKey, StoredAggregatedPayload)>, + ) { + self.insert_aggregated_payloads_batch(Table::LatestNewAggregatedPayloads, entries); + } + + // ============ Pruning Helpers ============ + + /// Prune entries from a table where the slot (extracted via `get_slot`) is <= `finalized_slot`. + /// Returns the number of entries pruned. + fn prune_by_slot( + &mut self, + table: Table, + finalized_slot: u64, + get_slot: impl Fn(&[u8]) -> Option, + ) -> usize { + let view = self.backend.begin_read().expect("read view"); + let mut to_delete = vec![]; + + for (key_bytes, value_bytes) in view + .prefix_iterator(table, &[]) + .expect("iter") + .filter_map(|r| r.ok()) + { + if let Some(slot) = get_slot(&value_bytes) + && slot <= finalized_slot + { + to_delete.push(key_bytes.to_vec()); + } + } + drop(view); + + let count = to_delete.len(); + if !to_delete.is_empty() { + let mut batch = self.backend.begin_write().expect("write batch"); + batch.delete_batch(table, to_delete).expect("delete"); + batch.commit().expect("commit"); + } + count + } + + // ============ Aggregated Payload Helpers ============ + + fn iter_aggregated_payloads( + &self, + table: Table, + ) -> impl Iterator)> { let view = self.backend.begin_read().expect("read view"); let entries: Vec<_> = view - .prefix_iterator(Table::LatestNewAggregatedPayloads, &[]) + .prefix_iterator(table, &[]) .expect("iterator") .filter_map(|res| res.ok()) .map(|(k, v)| { @@ -789,28 +792,51 @@ impl Store { entries.into_iter() } - /// Insert an aggregated payload into the new (pending) table. - pub fn insert_new_aggregated_payload( + fn insert_aggregated_payload( &mut self, + table: Table, key: SignatureKey, payload: StoredAggregatedPayload, ) { - let encoded_key = encode_signature_key(&key); + self.insert_aggregated_payloads_batch(table, vec![(key, payload)]); + } + + /// Batch-insert multiple aggregated payloads in a single read-write-commit cycle. + /// Groups entries by key to correctly handle multiple payloads for the same key. + fn insert_aggregated_payloads_batch( + &mut self, + table: Table, + entries: Vec<(SignatureKey, StoredAggregatedPayload)>, + ) { + if entries.is_empty() { + return; + } + + // Group entries by key to handle multiple payloads for the same key + let mut grouped: HashMap, Vec> = HashMap::new(); + for (key, payload) in entries { + let encoded_key = encode_signature_key(&key); + grouped.entry(encoded_key).or_default().push(payload); + } + let view = self.backend.begin_read().expect("read view"); - let mut payloads: Vec = view - .get(Table::LatestNewAggregatedPayloads, &encoded_key) - .expect("get") - .map(|bytes| Vec::::from_ssz_bytes(&bytes).expect("valid")) - .unwrap_or_default(); + let mut batch_entries = Vec::new(); + + for (encoded_key, new_payloads) in grouped { + let mut payloads: Vec = view + .get(table, &encoded_key) + .expect("get") + .map(|bytes| Vec::::from_ssz_bytes(&bytes).expect("valid")) + .unwrap_or_default(); + payloads.extend(new_payloads); + batch_entries.push((encoded_key, payloads.as_ssz_bytes())); + } drop(view); - payloads.push(payload); - let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(encoded_key, payloads.as_ssz_bytes())]; batch - .put_batch(Table::LatestNewAggregatedPayloads, entries) - .expect("put new aggregated payload"); + .put_batch(table, batch_entries) + .expect("put aggregated payloads"); batch.commit().expect("commit"); } @@ -902,12 +928,11 @@ impl Store { /// Stores a gossip signature for later aggregation. pub fn insert_gossip_signature( &mut self, - attestation_data: &AttestationData, + data_root: H256, + slot: u64, validator_id: u64, signature: ValidatorSignature, ) { - let slot = attestation_data.slot; - let data_root = attestation_data.tree_hash_root(); let key = (validator_id, data_root); let stored = StoredSignature::new(slot, signature);