Skip to content
Merged
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ members = [
"crates/blockchain/state_transition",
"crates/common/crypto",
"crates/common/metrics",
"crates/common/test-fixtures",
"crates/common/types",
"crates/net/p2p",
"crates/net/rpc",
Expand All @@ -30,6 +31,7 @@ ethlambda-fork-choice = { path = "crates/blockchain/fork_choice" }
ethlambda-state-transition = { path = "crates/blockchain/state_transition" }
ethlambda-crypto = { path = "crates/common/crypto" }
ethlambda-metrics = { path = "crates/common/metrics" }
ethlambda-test-fixtures = { path = "crates/common/test-fixtures" }
ethlambda-types = { path = "crates/common/types" }
ethlambda-p2p = { path = "crates/net/p2p" }
ethlambda-rpc = { path = "crates/net/rpc" }
Expand Down
3 changes: 2 additions & 1 deletion bin/ethlambda/src/checkpoint_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,9 @@ fn verify_checkpoint_state(
mod tests {
use super::*;
use ethlambda_types::block::BlockHeader;
use ethlambda_types::checkpoint::Checkpoint;
use ethlambda_types::primitives::VariableList;
use ethlambda_types::state::{ChainConfig, Checkpoint};
use ethlambda_types::state::ChainConfig;

// Helper to create valid test state
fn create_test_state(slot: u64, validators: Vec<Validator>, genesis_time: u64) -> State {
Expand Down
1 change: 1 addition & 0 deletions crates/blockchain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ tracing.workspace = true
hex.workspace = true

[dev-dependencies]
ethlambda-test-fixtures.workspace = true
serde = { workspace = true }
serde_json = { workspace = true }
hex = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/blockchain/fork_choice/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub fn compute_lmd_ghost_head(
#[cfg(test)]
mod tests {
use super::*;
use ethlambda_types::state::Checkpoint;
use ethlambda_types::checkpoint::Checkpoint;

fn make_attestation(head_root: H256, slot: u64) -> AttestationData {
AttestationData {
Expand Down
2 changes: 1 addition & 1 deletion crates/blockchain/src/fork_choice_tree.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::fmt::Write;

use ethlambda_types::{ShortRoot, primitives::H256, state::Checkpoint};
use ethlambda_types::{ShortRoot, checkpoint::Checkpoint, primitives::H256};

/// Maximum depth of the tree to display before truncating with `...`.
const MAX_DISPLAY_DEPTH: usize = 20;
Expand Down
4 changes: 2 additions & 2 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use ethlambda_types::{
ShortRoot,
attestation::{Attestation, AttestationData, SignedAggregatedAttestation, SignedAttestation},
block::{BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation},
checkpoint::Checkpoint,
primitives::{H256, ssz::TreeHash},
signature::ValidatorSecretKey,
state::Checkpoint,
};
use spawned_concurrency::tasks::{
CallResponse, CastResponse, GenServer, GenServerHandle, send_after,
Expand Down Expand Up @@ -179,7 +179,7 @@ impl BlockChainServer {

// Update safe target slot metric (updated by store.on_tick at interval 3)
metrics::update_safe_target_slot(self.store.safe_target_slot());
// Update head slot metric (head may change when attestations are promoted at intervals 0/3)
// Update head slot metric (head may change when attestations are promoted at intervals 0/4)
metrics::update_head_slot(self.store.head_slot());
}

Expand Down
73 changes: 44 additions & 29 deletions crates/blockchain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ use ethlambda_types::{
AggregatedAttestations, AggregatedSignatureProof, Block, BlockBody,
SignedBlockWithAttestation,
},
checkpoint::Checkpoint,
primitives::{H256, ssz::TreeHash},
signature::ValidatorSignature,
state::{Checkpoint, State},
state::State,
};
use tracing::{info, trace, warn};

Expand Down Expand Up @@ -101,7 +102,7 @@ fn update_safe_target(store: &mut Store) {
for (key, new_proofs) in store.iter_new_aggregated_payloads() {
all_payloads.entry(key).or_default().extend(new_proofs);
}
let attestations = store.extract_latest_attestations(all_payloads.into_iter());
let attestations = store.extract_latest_attestations(all_payloads.into_keys());
let (safe_target, _weights) = ethlambda_fork_choice::compute_lmd_ghost_head(
store.latest_justified().root,
&blocks,
Expand Down Expand Up @@ -129,6 +130,7 @@ fn aggregate_committee_signatures(store: &mut Store) -> Vec<SignedAggregatedAtte
// Group gossip signatures by data_root for batch aggregation
let mut groups: HashMap<H256, Vec<(u64, ValidatorSignature)>> = HashMap::new();
let mut keys_to_delete: Vec<SignatureKey> = Vec::new();
let mut payload_entries: Vec<(SignatureKey, StoredAggregatedPayload)> = Vec::new();

for ((validator_id, data_root), stored_sig) in &gossip_sigs {
if let Ok(sig) = stored_sig.to_validator_signature() {
Expand All @@ -145,7 +147,6 @@ fn aggregate_committee_signatures(store: &mut Store) -> Vec<SignedAggregatedAtte
};

let slot = data.slot;
let message = data.tree_hash_root();

let mut sigs = vec![];
let mut pubkeys = vec![];
Expand All @@ -167,7 +168,8 @@ fn aggregate_committee_signatures(store: &mut Store) -> Vec<SignedAggregatedAtte
continue;
}

let Ok(proof_data) = aggregate_signatures(pubkeys, sigs, &message, slot as u32)
// data_root is already the tree_hash_root of the attestation data
let Ok(proof_data) = aggregate_signatures(pubkeys, sigs, &data_root, slot as u32)
.inspect_err(|err| warn!(%err, "Failed to aggregate committee signatures"))
else {
continue;
Expand All @@ -183,9 +185,9 @@ fn aggregate_committee_signatures(store: &mut Store) -> Vec<SignedAggregatedAtte

let payload = StoredAggregatedPayload { slot, proof };

// Store in new aggregated payloads for each covered validator
// Collect entries for batch insert
for vid in &ids {
store.insert_new_aggregated_payload((*vid, data_root), payload.clone());
payload_entries.push(((*vid, data_root), payload.clone()));
}

// Only delete successfully aggregated signatures
Expand All @@ -195,6 +197,9 @@ fn aggregate_committee_signatures(store: &mut Store) -> Vec<SignedAggregatedAtte
metrics::inc_pq_sig_attestations_in_aggregated_signatures(ids.len() as u64);
}

// Batch-insert all new aggregated payloads in a single commit
store.insert_new_aggregated_payloads_batch(payload_entries);

// Delete aggregated entries from gossip_signatures
store.delete_gossip_signatures(&keys_to_delete);

Expand Down Expand Up @@ -378,7 +383,7 @@ pub fn on_gossip_attestation(
store.insert_attestation_data_by_root(data_root, attestation.data.clone());

// Store gossip signature for later aggregation at interval 2.
store.insert_gossip_signature(&attestation.data, validator_id, signature);
store.insert_gossip_signature(data_root, attestation.data.slot, validator_id, signature);

metrics::inc_attestations_valid("gossip");

Expand Down Expand Up @@ -431,29 +436,33 @@ pub fn on_gossip_aggregated_attestation(
})
.collect::<Result<_, _>>()?;

let message = aggregated.data.tree_hash_root();
let data_root = aggregated.data.tree_hash_root();
let epoch: u32 = aggregated.data.slot.try_into().expect("slot exceeds u32");

ethlambda_crypto::verify_aggregated_signature(
&aggregated.proof.proof_data,
pubkeys,
&message,
&data_root,
epoch,
)
.map_err(StoreError::AggregateVerificationFailed)?;

// Store attestation data by root (content-addressed, idempotent)
let data_root = aggregated.data.tree_hash_root();
store.insert_attestation_data_by_root(data_root, aggregated.data.clone());

// Store one aggregated payload per participating validator
for validator_id in aggregated.proof.participant_indices() {
let payload = StoredAggregatedPayload {
slot: aggregated.data.slot,
proof: aggregated.proof.clone(),
};
store.insert_new_aggregated_payload((validator_id, data_root), payload);
}
// Store one aggregated payload per participating validator (batch insert)
let entries: Vec<_> = aggregated
.proof
.participant_indices()
.map(|validator_id| {
let payload = StoredAggregatedPayload {
slot: aggregated.data.slot,
proof: aggregated.proof.clone(),
};
((validator_id, data_root), payload)
})
.collect();
store.insert_new_aggregated_payloads_batch(entries);

let slot = aggregated.data.slot;
let num_participants = aggregated.proof.participants.num_set_bits();
Expand Down Expand Up @@ -531,8 +540,6 @@ fn on_block_core(

let block = signed_block.message.block.clone();
let proposer_attestation = signed_block.message.proposer_attestation.clone();
let block_root = block.tree_hash_root();
let slot = block.slot;

// Execute state transition function to compute post-block state
let mut post_state = parent_state;
Expand Down Expand Up @@ -562,6 +569,7 @@ fn on_block_core(

// Process block body attestations.
// Store attestation data by root and proofs in known aggregated payloads.
let mut known_entries: Vec<(SignatureKey, StoredAggregatedPayload)> = Vec::new();
for (att, proof) in aggregated_attestations
.iter()
.zip(attestation_signatures.iter())
Expand All @@ -576,12 +584,11 @@ fn on_block_core(
};

for validator_id in &validator_ids {
// Store proof in known aggregated payloads (active in fork choice)
store.insert_known_aggregated_payload((*validator_id, data_root), payload.clone());

known_entries.push(((*validator_id, data_root), payload.clone()));
metrics::inc_attestations_valid("block");
}
}
store.insert_known_aggregated_payloads_batch(known_entries);

// Update forkchoice head based on new block and attestations
// IMPORTANT: This must happen BEFORE processing proposer attestation
Expand All @@ -607,7 +614,12 @@ fn on_block_core(
let proposer_sig =
ValidatorSignature::from_bytes(&signed_block.signature.proposer_signature)
.map_err(|_| StoreError::SignatureDecodingFailed)?;
store.insert_gossip_signature(&proposer_attestation.data, proposer_vid, proposer_sig);
store.insert_gossip_signature(
proposer_data_root,
proposer_attestation.data.slot,
proposer_vid,
proposer_sig,
);
}

info!(%slot, %block_root, %state_root, "Processed new block");
Expand Down Expand Up @@ -748,8 +760,11 @@ pub fn produce_block_with_signatures(
});
}

// Convert known aggregated payloads to Attestation objects for build_block
let known_attestations = store.extract_latest_known_attestations();
// Single pass over known aggregated payloads: extract both attestation data and proofs
let known_payloads: Vec<_> = store.iter_known_aggregated_payloads().collect();

let known_attestations =
store.extract_latest_attestations(known_payloads.iter().map(|(key, _)| *key));
let available_attestations: Vec<Attestation> = known_attestations
.into_iter()
.map(|(validator_id, data)| Attestation { validator_id, data })
Expand All @@ -758,9 +773,9 @@ pub fn produce_block_with_signatures(
// Get known block roots for attestation validation
let known_block_roots = store.get_block_roots();

// Collect existing proofs for block building from known aggregated payloads
let aggregated_payloads: HashMap<SignatureKey, Vec<AggregatedSignatureProof>> = store
.iter_known_aggregated_payloads()
// Collect existing proofs for block building from the already-fetched payloads
let aggregated_payloads: HashMap<SignatureKey, Vec<AggregatedSignatureProof>> = known_payloads
.into_iter()
.map(|(key, stored_payloads)| {
let proofs = stored_payloads.into_iter().map(|sp| sp.proof).collect();
(key, proofs)
Expand Down
1 change: 1 addition & 0 deletions crates/blockchain/state_transition/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ thiserror.workspace = true
tracing.workspace = true

[dev-dependencies]
ethlambda-test-fixtures.workspace = true
serde.workspace = true
serde_json.workspace = true

Expand Down
3 changes: 2 additions & 1 deletion crates/blockchain/state_transition/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use std::collections::HashMap;
use ethlambda_types::{
ShortRoot,
block::{AggregatedAttestations, Block, BlockHeader},
checkpoint::Checkpoint,
primitives::{H256, ssz::TreeHash},
state::{Checkpoint, JustificationValidators, State},
state::{JustificationValidators, State},
};
use tracing::info;

Expand Down
Loading