Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,12 @@ pub struct BlockChain {
handle: GenServerHandle<BlockChainServer>,
}

/// Seconds in a slot.
pub const SECONDS_PER_SLOT: u64 = 4;
/// Milliseconds in a slot.
pub const MILLISECONDS_PER_SLOT: u64 = 4_000;
/// Milliseconds per interval (800ms ticks).
pub const MILLISECONDS_PER_INTERVAL: u64 = 800;
/// Number of intervals per slot (5 intervals of 800ms = 4 seconds).
pub const INTERVALS_PER_SLOT: u64 = 5;
/// Milliseconds in a slot (derived from interval duration and count).
pub const MILLISECONDS_PER_SLOT: u64 = MILLISECONDS_PER_INTERVAL * INTERVALS_PER_SLOT;
impl BlockChain {
pub fn spawn(
store: Store,
Expand Down
58 changes: 26 additions & 32 deletions crates/blockchain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ethlambda_types::{
ShortRoot,
attestation::{
AggregatedAttestation, AggregationBits, Attestation, AttestationData,
SignedAggregatedAttestation, SignedAttestation,
SignedAggregatedAttestation, SignedAttestation, validator_indices,
},
block::{
AggregatedAttestations, AggregatedSignatureProof, Block, BlockBody,
Expand Down Expand Up @@ -94,15 +94,15 @@ fn update_safe_target(store: &mut Store) {
let min_target_score = (num_validators * 2).div_ceil(3);

let blocks = store.get_live_chain();
// Merge both attestation pools. At interval 3 the migration (interval 4) hasn't
// run yet, so attestations that entered "known" directly (proposer's own attestation
// in block body, node's self-attestation) would be invisible without this merge.
let mut all_payloads: HashMap<SignatureKey, Vec<StoredAggregatedPayload>> =
store.iter_known_aggregated_payloads().collect();
for (key, new_proofs) in store.iter_new_aggregated_payloads() {
all_payloads.entry(key).or_default().extend(new_proofs);
}
let attestations = store.extract_latest_attestations(all_payloads.into_keys());
// Merge both attestation pools (keys only — skip payload deserialization).
// At interval 3 the migration (interval 4) hasn't run yet, so attestations
// that entered "known" directly (proposer's own attestation in block body,
// node's self-attestation) would be invisible without this merge.
let all_keys: HashSet<SignatureKey> = store
.iter_known_aggregated_payload_keys()
.chain(store.iter_new_aggregated_payload_keys())
.collect();
let attestations = store.extract_latest_attestations(all_keys.into_iter());
let (safe_target, _weights) = ethlambda_fork_choice::compute_lmd_ghost_head(
store.latest_justified().root,
&blocks,
Expand Down Expand Up @@ -569,15 +569,16 @@ fn on_block_core(

// Process block body attestations.
// Store attestation data by root and proofs in known aggregated payloads.
let mut att_data_entries: Vec<(H256, AttestationData)> = Vec::new();
let mut known_entries: Vec<(SignatureKey, StoredAggregatedPayload)> = Vec::new();
for (att, proof) in aggregated_attestations
.iter()
.zip(attestation_signatures.iter())
{
let data_root = att.data.tree_hash_root();
store.insert_attestation_data_by_root(data_root, att.data.clone());
att_data_entries.push((data_root, att.data.clone()));

let validator_ids = aggregation_bits_to_validator_indices(&att.aggregation_bits);
let validator_ids: Vec<_> = validator_indices(&att.aggregation_bits).collect();
let payload = StoredAggregatedPayload {
slot: att.data.slot,
proof: proof.clone(),
Expand All @@ -588,19 +589,22 @@ fn on_block_core(
metrics::inc_attestations_valid("block");
}
}

// Process proposer attestation as pending (enters "new" stage via gossip path)
// The proposer's attestation should NOT affect this block's fork choice position.
let proposer_vid = proposer_attestation.validator_id;
let proposer_data_root = proposer_attestation.data.tree_hash_root();
att_data_entries.push((proposer_data_root, proposer_attestation.data.clone()));

// Batch-insert all attestation data (body + proposer) in a single commit
store.insert_attestation_data_by_root_batch(att_data_entries);
store.insert_known_aggregated_payloads_batch(known_entries);

// Update forkchoice head based on new block and attestations
// IMPORTANT: This must happen BEFORE processing proposer attestation
// to prevent the proposer from gaining circular weight advantage.
update_head(store, false);

// Process proposer attestation as pending (enters "new" stage via gossip path)
// The proposer's attestation should NOT affect this block's fork choice position.
let proposer_vid = proposer_attestation.validator_id;
let proposer_data_root = proposer_attestation.data.tree_hash_root();
store.insert_attestation_data_by_root(proposer_data_root, proposer_attestation.data.clone());

if !verify {
// Without sig verification, insert directly with a dummy proof
let participants = aggregation_bits_from_validator_indices(&[proposer_vid]);
Expand Down Expand Up @@ -888,15 +892,7 @@ pub enum StoreError {
NotProposer { validator_index: u64, slot: u64 },
}

/// Extract validator indices from aggregation bits.
fn aggregation_bits_to_validator_indices(bits: &AggregationBits) -> Vec<u64> {
bits.iter()
.enumerate()
.filter_map(|(i, bit)| if bit { Some(i as u64) } else { None })
.collect()
}

/// Extract validator indices from aggregation bits.
/// Build an AggregationBits bitfield from a list of validator indices.
fn aggregation_bits_from_validator_indices(bits: &[u64]) -> AggregationBits {
if bits.is_empty() {
return AggregationBits::with_capacity(0).expect("max capacity is non-zero");
Expand Down Expand Up @@ -1085,8 +1081,7 @@ fn select_aggregated_proofs(
let data = &aggregated.data;
let message = data.tree_hash_root();

let validator_ids = aggregation_bits_to_validator_indices(&aggregated.aggregation_bits);
let mut remaining: HashSet<u64> = validator_ids.into_iter().collect();
let mut remaining: HashSet<u64> = validator_indices(&aggregated.aggregation_bits).collect();

// Select existing proofs that cover the most remaining validators
while !remaining.is_empty() {
Expand All @@ -1104,8 +1099,7 @@ fn select_aggregated_proofs(
let (proof, covered) = candidates
.iter()
.map(|p| {
let covered: Vec<_> = aggregation_bits_to_validator_indices(&p.participants)
.into_iter()
let covered: Vec<_> = validator_indices(&p.participants)
.filter(|vid| remaining.contains(vid))
.collect();
(p, covered)
Expand Down Expand Up @@ -1161,7 +1155,7 @@ fn verify_signatures(

// Verify each attestation's signature proof
for (attestation, aggregated_proof) in attestations.iter().zip(attestation_signatures) {
let validator_ids = aggregation_bits_to_validator_indices(&attestation.aggregation_bits);
let validator_ids: Vec<_> = validator_indices(&attestation.aggregation_bits).collect();
if validator_ids.iter().any(|vid| *vid >= num_validators) {
return Err(StoreError::InvalidValidatorIndex);
}
Expand Down
6 changes: 3 additions & 3 deletions crates/blockchain/tests/forkchoice_spectests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
sync::Arc,
};

use ethlambda_blockchain::{SECONDS_PER_SLOT, store};
use ethlambda_blockchain::{MILLISECONDS_PER_SLOT, store};
use ethlambda_storage::{Store, backend::InMemoryBackend};
use ethlambda_types::{
attestation::{Attestation, AttestationData},
Expand Down Expand Up @@ -58,8 +58,8 @@ fn run(path: &Path) -> datatest_stable::Result<()> {

let signed_block = build_signed_block(block_data);

let block_time_ms =
(signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time) * 1000;
let block_time_ms = genesis_time * 1000
+ signed_block.message.block.slot * MILLISECONDS_PER_SLOT;

// NOTE: the has_proposal argument is set to true, following the spec
store::on_tick(&mut store, block_time_ms, true, false);
Expand Down
4 changes: 2 additions & 2 deletions crates/blockchain/tests/signature_spectests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::path::Path;
use std::sync::Arc;

use ethlambda_blockchain::{SECONDS_PER_SLOT, store};
use ethlambda_blockchain::{MILLISECONDS_PER_SLOT, store};
use ethlambda_storage::{Store, backend::InMemoryBackend};
use ethlambda_types::{
block::{Block, SignedBlockWithAttestation},
Expand Down Expand Up @@ -51,7 +51,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> {

// Advance time to the block's slot
let block_time_ms =
(signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time) * 1000;
genesis_time * 1000 + signed_block.message.block.slot * MILLISECONDS_PER_SLOT;
store::on_tick(&mut st, block_time_ms, true, false);

// Process the block (this includes signature verification)
Expand Down
7 changes: 7 additions & 0 deletions crates/common/types/src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ pub struct AggregatedAttestation {
/// in some collective action (attestation, signature aggregation, etc.).
pub type AggregationBits = ssz_types::BitList<ValidatorRegistryLimit>;

/// Returns the indices of set bits in an `AggregationBits` bitfield as validator IDs.
pub fn validator_indices(bits: &AggregationBits) -> impl Iterator<Item = u64> + '_ {
    bits.iter()
        .enumerate()
        .filter(|&(_, is_set)| is_set)
        .map(|(bit_index, _)| bit_index as u64)
}

/// Aggregated attestation with its signature proof, used for gossip on the aggregation topic.
#[derive(Debug, Clone, Encode, Decode)]
pub struct SignedAggregatedAttestation {
Expand Down
8 changes: 4 additions & 4 deletions crates/common/types/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use serde::Serialize;
use ssz_types::typenum::U1048576;

use crate::{
attestation::{AggregatedAttestation, AggregationBits, Attestation, XmssSignature},
attestation::{
AggregatedAttestation, AggregationBits, Attestation, XmssSignature, validator_indices,
},
primitives::{
ByteList, H256,
ssz::{Decode, Encode, TreeHash},
Expand Down Expand Up @@ -105,9 +107,7 @@ impl AggregatedSignatureProof {

/// Returns the validator indices that are set in the participants bitfield.
pub fn participant_indices(&self) -> impl Iterator<Item = u64> + '_ {
(0..self.participants.len())
.filter(|&i| self.participants.get(i).unwrap_or(false))
.map(|i| i as u64)
validator_indices(&self.participants)
}
}

Expand Down
39 changes: 39 additions & 0 deletions crates/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,22 @@ impl Store {
batch.commit().expect("commit");
}

/// Batch-insert multiple attestation data entries in a single commit.
///
/// A no-op when `entries` is empty, so callers don't pay for a write batch.
pub fn insert_attestation_data_by_root_batch(&mut self, entries: Vec<(H256, AttestationData)>) {
    if entries.is_empty() {
        return;
    }
    // SSZ-encode every (root, data) pair up front so the table write is a single
    // batched put followed by one commit.
    let mut encoded = Vec::with_capacity(entries.len());
    for (root, data) in entries {
        encoded.push((root.as_ssz_bytes(), data.as_ssz_bytes()));
    }
    let mut batch = self.backend.begin_write().expect("write batch");
    batch
        .put_batch(Table::AttestationDataByRoot, encoded)
        .expect("put attestation data batch");
    batch.commit().expect("commit");
}

/// Returns attestation data for the given root hash.
pub fn get_attestation_data_by_root(&self, root: &H256) -> Option<AttestationData> {
let view = self.backend.begin_read().expect("read view");
Expand Down Expand Up @@ -690,6 +706,12 @@ impl Store {
self.iter_aggregated_payloads(Table::LatestKnownAggregatedPayloads)
}

/// Iterates over keys only from the known (fork-choice-active) aggregated
/// payloads table, skipping value deserialization for cheaper scans when
/// only the `SignatureKey`s are needed.
pub fn iter_known_aggregated_payload_keys(&self) -> impl Iterator<Item = SignatureKey> + '_ {
self.iter_aggregated_payload_keys(Table::LatestKnownAggregatedPayloads)
}

/// Insert an aggregated payload into the known (fork-choice-active) table.
pub fn insert_known_aggregated_payload(
&mut self,
Expand Down Expand Up @@ -719,6 +741,12 @@ impl Store {
self.iter_aggregated_payloads(Table::LatestNewAggregatedPayloads)
}

/// Iterates over keys only from the new (pending) aggregated payloads table,
/// skipping value deserialization for cheaper scans when only the
/// `SignatureKey`s are needed.
pub fn iter_new_aggregated_payload_keys(&self) -> impl Iterator<Item = SignatureKey> + '_ {
self.iter_aggregated_payload_keys(Table::LatestNewAggregatedPayloads)
}

/// Insert an aggregated payload into the new (pending) table.
pub fn insert_new_aggregated_payload(
&mut self,
Expand Down Expand Up @@ -792,6 +820,17 @@ impl Store {
entries.into_iter()
}

/// Decodes every key in `table`, silently skipping entries that fail to read.
/// Results are materialized eagerly so the returned iterator is independent
/// of the backend read view.
fn iter_aggregated_payload_keys(&self, table: Table) -> impl Iterator<Item = SignatureKey> {
    let view = self.backend.begin_read().expect("read view");
    let mut keys = Vec::new();
    for entry in view.prefix_iterator(table, &[]).expect("iterator") {
        // Errors from individual entries are dropped, matching the
        // payload-iterator helpers in this file.
        if let Ok((raw_key, _value)) = entry {
            keys.push(decode_signature_key(&raw_key));
        }
    }
    keys.into_iter()
}

fn insert_aggregated_payload(
&mut self,
table: Table,
Expand Down