diff --git a/CLAUDE.md b/CLAUDE.md index 9509c546..2b42ce96 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -308,6 +308,13 @@ cargo test -p ethlambda-blockchain --test forkchoice_spectests -- --test-threads ## Common Gotchas +### Aggregator Flag Required for Finalization +- At least one node **must** be started with `--is-aggregator` to finalize blocks in production (without `skip-signature-verification`) +- Without this flag, attestations pass signature verification and are logged as "Attestation processed", but the signature is never stored for aggregation (`store.rs:368`), so blocks are always built with `attestation_count=0` +- The attestation pipeline: gossip → verify signature → store gossip signature (only if `is_aggregator`) → aggregate at interval 2 → promote to known → pack into blocks +- With `skip-signature-verification` (tests only), attestations bypass aggregation and go directly to `new_aggregated_payloads`, so the flag is not needed +- **Symptom**: `justified_slot=0` and `finalized_slot=0` indefinitely despite healthy block production and attestation gossip + ### Signature Verification - Fork choice tests use `on_block_without_verification()` to skip signature checks - Signature spec tests use `on_block()` which always verifies diff --git a/Cargo.lock b/Cargo.lock index ff2283d4..f2039d4e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2041,6 +2041,7 @@ name = "ethlambda-rpc" version = "0.1.0" dependencies = [ "axum", + "ethlambda-fork-choice", "ethlambda-metrics", "ethlambda-storage", "ethlambda-types", diff --git a/Makefile b/Makefile index ae133cac..83e8d330 100644 --- a/Makefile +++ b/Makefile @@ -24,7 +24,7 @@ docker-build: ## 🐳 Build the Docker image -t ghcr.io/lambdaclass/ethlambda:$(DOCKER_TAG) . 
@echo -LEAN_SPEC_COMMIT_HASH:=4edcf7bc9271e6a70ded8aff17710d68beac4266 +LEAN_SPEC_COMMIT_HASH:=8b7636bb8a95fe4bec414cc4c24e74079e6256b6 leanSpec: git clone https://github.com/leanEthereum/leanSpec.git --single-branch diff --git a/README.md b/README.md index 454b14cb..21054723 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,8 @@ make run-devnet This generates fresh genesis files and starts all configured clients with metrics enabled. Press `Ctrl+C` to stop all nodes. +> **Important:** When running nodes manually (outside `make run-devnet`), at least one node must be started with `--is-aggregator` for attestations to be aggregated and included in blocks. Without this flag, the network will produce blocks but never finalize. + For custom devnet configurations, go to `lean-quickstart/local-devnet/genesis/validator-config.yaml` and edit the file before running the command above. See `lean-quickstart`'s documentation for more details on how to configure the devnet. ## Philosophy diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 27b321c2..7b68abad 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -51,6 +51,12 @@ struct CliOptions { /// When set, skips genesis initialization and syncs from checkpoint. 
#[arg(long)] checkpoint_sync_url: Option, + /// Whether this node acts as a committee aggregator + #[arg(long, default_value = "false")] + is_aggregator: bool, + /// Number of attestation committees (subnets) per slot + #[arg(long, default_value = "1", value_parser = clap::value_parser!(u64).range(1..))] + attestation_committee_count: u64, } #[tokio::main] @@ -114,7 +120,10 @@ async fn main() -> eyre::Result<()> { .inspect_err(|err| error!(%err, "Failed to initialize state"))?; let (p2p_tx, p2p_rx) = tokio::sync::mpsc::unbounded_channel(); - let blockchain = BlockChain::spawn(store.clone(), p2p_tx, validator_keys); + // Use first validator ID for subnet subscription + let first_validator_id = validator_keys.keys().min().copied(); + let blockchain = + BlockChain::spawn(store.clone(), p2p_tx, validator_keys, options.is_aggregator); let p2p_handle = tokio::spawn(start_p2p( node_p2p_key, @@ -123,6 +132,9 @@ async fn main() -> eyre::Result<()> { blockchain, p2p_rx, store.clone(), + first_validator_id, + options.attestation_committee_count, + options.is_aggregator, )); ethlambda_rpc::start_rpc_server(metrics_socket, store) @@ -132,8 +144,8 @@ async fn main() -> eyre::Result<()> { info!("Node initialized"); tokio::select! { - _ = p2p_handle => { - panic!("P2P node task has exited unexpectedly"); + result = p2p_handle => { + panic!("P2P node task has exited unexpectedly: {result:?}"); } _ = tokio::signal::ctrl_c() => { // Ctrl-C received, shutting down diff --git a/crates/blockchain/fork_choice/src/lib.rs b/crates/blockchain/fork_choice/src/lib.rs index 4458122d..4f746c3e 100644 --- a/crates/blockchain/fork_choice/src/lib.rs +++ b/crates/blockchain/fork_choice/src/lib.rs @@ -2,9 +2,35 @@ use std::collections::HashMap; use ethlambda_types::{attestation::AttestationData, primitives::H256}; +/// Compute per-block attestation weights for the fork choice tree. 
+/// +/// For each validator attestation, walks backward from the attestation's head +/// through the parent chain, incrementing weight for each block above start_slot. +pub fn compute_block_weights( + start_slot: u64, + blocks: &HashMap<H256, (u64, H256)>, + attestations: &HashMap<u64, AttestationData>, +) -> HashMap<H256, u64> { + let mut weights: HashMap<H256, u64> = HashMap::new(); + + for attestation_data in attestations.values() { + let mut current_root = attestation_data.head.root; + while let Some(&(slot, parent_root)) = blocks.get(&current_root) + && slot > start_slot + { + *weights.entry(current_root).or_default() += 1; + current_root = parent_root; + } + } + + weights +} + /// Compute the LMD GHOST head of the chain, given a starting root, a set of blocks, /// a set of attestations, and a minimum score threshold. /// +/// Returns the head root and the per-block attestation weights used for selection. +/// /// This is the same implementation from leanSpec // TODO: add proto-array implementation pub fn compute_lmd_ghost_head( @@ -12,9 +38,9 @@ pub fn compute_lmd_ghost_head( blocks: &HashMap<H256, (u64, H256)>, attestations: &HashMap<u64, AttestationData>, min_score: u64, -) -> H256 { +) -> (H256, HashMap<H256, u64>) { if blocks.is_empty() { - return start_root; + return (start_root, HashMap::new()); } if start_root.is_zero() { start_root = *blocks @@ -24,19 +50,9 @@ pub fn compute_lmd_ghost_head( .expect("we already checked blocks is non-empty"); } let Some(&(start_slot, _)) = blocks.get(&start_root) else { - return start_root; + return (start_root, HashMap::new()); }; - let mut weights: HashMap<H256, u64> = HashMap::new(); - - for attestation_data in attestations.values() { - let mut current_root = attestation_data.head.root; - while let Some(&(slot, parent_root)) = blocks.get(&current_root) - && slot > start_slot - { - *weights.entry(current_root).or_default() += 1; - current_root = parent_root; - } - } + let weights = compute_block_weights(start_slot, blocks, attestations); let mut children_map: HashMap<H256, Vec<H256>> = HashMap::new(); @@ -62,5 +78,59 @@ pub fn compute_lmd_ghost_head( .expect("checked 
it's not empty"); } - head + (head, weights) +} + +#[cfg(test)] +mod tests { + use super::*; + use ethlambda_types::state::Checkpoint; + + fn make_attestation(head_root: H256, slot: u64) -> AttestationData { + AttestationData { + slot, + head: Checkpoint { + root: head_root, + slot, + }, + target: Checkpoint::default(), + source: Checkpoint::default(), + } + } + + #[test] + fn test_compute_block_weights() { + // Chain: root_a (slot 0) -> root_b (slot 1) -> root_c (slot 2) + let root_a = H256::from([1u8; 32]); + let root_b = H256::from([2u8; 32]); + let root_c = H256::from([3u8; 32]); + + let mut blocks = HashMap::new(); + blocks.insert(root_a, (0, H256::ZERO)); + blocks.insert(root_b, (1, root_a)); + blocks.insert(root_c, (2, root_b)); + + // Two validators: one attests to root_c, one attests to root_b + let mut attestations = HashMap::new(); + attestations.insert(0, make_attestation(root_c, 2)); + attestations.insert(1, make_attestation(root_b, 1)); + + let weights = compute_block_weights(0, &blocks, &attestations); + + // root_c: 1 vote (validator 0) + assert_eq!(weights.get(&root_c).copied().unwrap_or(0), 1); + // root_b: 2 votes (validator 0 walks through it + validator 1 attests directly) + assert_eq!(weights.get(&root_b).copied().unwrap_or(0), 2); + // root_a: at slot 0 = start_slot, so not counted + assert_eq!(weights.get(&root_a).copied().unwrap_or(0), 0); + } + + #[test] + fn test_compute_block_weights_empty() { + let blocks = HashMap::new(); + let attestations = HashMap::new(); + + let weights = compute_block_weights(0, &blocks, &attestations); + assert!(weights.is_empty()); + } } diff --git a/crates/blockchain/src/fork_choice_tree.rs b/crates/blockchain/src/fork_choice_tree.rs new file mode 100644 index 00000000..52565374 --- /dev/null +++ b/crates/blockchain/src/fork_choice_tree.rs @@ -0,0 +1,490 @@ +use std::collections::HashMap; +use std::fmt::Write; + +use ethlambda_types::{ShortRoot, primitives::H256, state::Checkpoint}; + +/// Maximum depth of the 
tree to display before truncating with `...`. +const MAX_DISPLAY_DEPTH: usize = 20; + +/// Format the fork choice tree as an ASCII art string for terminal logging. +/// +/// Renders a tree showing the chain structure with Unicode connectors, +/// missing-slot indicators, weight annotations, and head markers. +pub(crate) fn format_fork_choice_tree( + blocks: &HashMap, + weights: &HashMap, + head: H256, + justified: Checkpoint, + finalized: Checkpoint, +) -> String { + let mut output = String::new(); + + // Header + writeln!(output, "Fork Choice Tree:").unwrap(); + writeln!( + output, + " Finalized: slot {} | root {}", + finalized.slot, + ShortRoot(&finalized.root.0) + ) + .unwrap(); + writeln!( + output, + " Justified: slot {} | root {}", + justified.slot, + ShortRoot(&justified.root.0) + ) + .unwrap(); + writeln!( + output, + " Head: slot {} | root {}", + blocks.get(&head).map(|(slot, _)| *slot).unwrap_or(0), + ShortRoot(&head.0) + ) + .unwrap(); + + if blocks.is_empty() { + writeln!(output, "\n (empty)").unwrap(); + return output; + } + + // Build children map + let mut children_map: HashMap> = HashMap::new(); + for (root, &(_, parent_root)) in blocks { + if !parent_root.is_zero() && blocks.contains_key(&parent_root) { + children_map.entry(parent_root).or_default().push(*root); + } + } + + // Sort children by weight descending, tiebreaker on root hash descending + for children in children_map.values_mut() { + children.sort_by(|a, b| { + let wa = weights.get(a).copied().unwrap_or(0); + let wb = weights.get(b).copied().unwrap_or(0); + wb.cmp(&wa).then_with(|| b.cmp(a)) + }); + } + + let renderer = TreeRenderer { + blocks, + children_map: &children_map, + weights, + head, + }; + + // Find root node (block whose parent is not in the blocks map) + let tree_root = find_tree_root(blocks); + + // Render linear trunk from root until a fork or leaf + output.push('\n'); + let (trunk_tip, trunk_depth) = renderer.render_trunk(&mut output, tree_root); + + // Render branching 
subtree from the fork point + let children = children_map.get(&trunk_tip).cloned().unwrap_or_default(); + if children.len() > 1 { + let branch_count = children.len(); + writeln!(output, " \u{2500} {branch_count} branches").unwrap(); + renderer.render_branches(&mut output, &children, " ", trunk_depth); + } else if trunk_tip == head { + writeln!(output, " *").unwrap(); + } else { + writeln!(output).unwrap(); + } + + output +} + +/// Holds shared tree data to avoid passing many arguments through recursive calls. +struct TreeRenderer<'a> { + blocks: &'a HashMap<H256, (u64, H256)>, + children_map: &'a HashMap<H256, Vec<H256>>, + weights: &'a HashMap<H256, u64>, + head: H256, +} + +impl TreeRenderer<'_> { + /// Render the linear trunk (chain without forks) starting from `root`. + /// Returns the last rendered node and current depth. + fn render_trunk(&self, output: &mut String, root: H256) -> (H256, usize) { + let mut current = root; + let mut depth = 0; + let mut prev_slot: Option<u64> = None; + + write!(output, " ").unwrap(); + + loop { + let &(slot, _) = &self.blocks[&current]; + + // Insert missing slot indicators + render_gap(output, prev_slot, slot, &mut depth); + + // Render current node + write!(output, "{}({slot})", ShortRoot(&current.0)).unwrap(); + depth += 1; + + if depth >= MAX_DISPLAY_DEPTH { + write!(output, "\u{2500}\u{2500} ...").unwrap(); + return (current, depth); + } + + let children = self.children_map.get(&current); + match children.map(|c| c.len()) { + Some(1) => { + write!(output, "\u{2500}\u{2500} ").unwrap(); + prev_slot = Some(slot); + current = children.unwrap()[0]; + } + _ => { + // Fork point or leaf — stop trunk rendering + return (current, depth); + } + } + } + } + + /// Render branches from a fork point using tree connectors. 
+ fn render_branches(&self, output: &mut String, children: &[H256], prefix: &str, depth: usize) { + for (i, &child) in children.iter().enumerate() { + let is_last = i == children.len() - 1; + let connector = if is_last { + "\u{2514}\u{2500}\u{2500} " + } else { + "\u{251c}\u{2500}\u{2500} " + }; + let continuation = if is_last { " " } else { "\u{2502} " }; + + write!(output, "{prefix}{connector}").unwrap(); + self.render_branch_line(output, child, prefix, continuation, depth); + } + } + + /// Render a single branch line, following the chain until a fork or leaf. + fn render_branch_line( + &self, + output: &mut String, + start: H256, + prefix: &str, + continuation: &str, + mut depth: usize, + ) { + let mut current = start; + let parent_slot = self + .blocks + .get(&current) + .and_then(|&(_, parent)| self.blocks.get(&parent)) + .map(|&(slot, _)| slot); + let mut prev_slot = parent_slot; + + loop { + let &(slot, _) = &self.blocks[&current]; + + // Insert missing slot indicators + render_gap(output, prev_slot, slot, &mut depth); + + let is_head = current == self.head; + write!(output, "{}({slot})", ShortRoot(&current.0)).unwrap(); + depth += 1; + + if depth >= MAX_DISPLAY_DEPTH { + writeln!(output, "\u{2500}\u{2500} ...").unwrap(); + return; + } + + let node_children = self.children_map.get(&current).map(|c| c.as_slice()); + + match node_children.unwrap_or_default() { + [] => { + // Leaf node — show head marker and weight + let head_marker = if is_head { " *" } else { "" }; + let w = self.weights.get(&current).copied().unwrap_or(0); + writeln!(output, "{head_marker} [w:{w}]").unwrap(); + return; + } + [only_child] => { + // Continue linear chain + if is_head { + write!(output, " *").unwrap(); + } + write!(output, "\u{2500}\u{2500} ").unwrap(); + prev_slot = Some(slot); + current = *only_child; + } + children => { + // Sub-fork + if is_head { + write!(output, " *").unwrap(); + } + let branch_count = children.len(); + writeln!(output, " \u{2500} {branch_count} branches").unwrap(); + let new_prefix = 
format!("{prefix}{continuation}"); + self.render_branches(output, children, &new_prefix, depth); + return; + } + } + } + } +} + +/// Find the root of the tree (block whose parent is not in the map). +fn find_tree_root(blocks: &HashMap) -> H256 { + blocks + .iter() + .filter(|(_, (_, parent))| parent.is_zero() || !blocks.contains_key(parent)) + .min_by_key(|(_, (slot, _))| *slot) + .map(|(root, _)| *root) + .expect("blocks is non-empty") +} + +/// Write missing-slot indicators between `prev_slot` and `slot`. +fn render_gap(output: &mut String, prev_slot: Option, slot: u64, depth: &mut usize) { + if let Some(ps) = prev_slot { + let gap = slot.saturating_sub(ps).saturating_sub(1); + if gap == 1 { + write!(output, "[ ]\u{2500}\u{2500} ").unwrap(); + *depth += 1; + } else if gap > 1 { + write!(output, "[{gap}]\u{2500}\u{2500} ").unwrap(); + *depth += 1; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn h(byte: u8) -> H256 { + H256::from([byte; 32]) + } + + fn cp(root: H256, slot: u64) -> Checkpoint { + Checkpoint { root, slot } + } + + #[test] + fn linear_chain() { + // root(0) -> a(1) -> b(2) -> c(3) + let root = h(1); + let a = h(2); + let b = h(3); + let c = h(4); + + let mut blocks = HashMap::new(); + blocks.insert(root, (0, H256::ZERO)); + blocks.insert(a, (1, root)); + blocks.insert(b, (2, a)); + blocks.insert(c, (3, b)); + + let weights = HashMap::new(); + let result = format_fork_choice_tree(&blocks, &weights, c, cp(root, 0), cp(root, 0)); + + assert!(result.contains("Fork Choice Tree:")); + // Should show nodes in sequence + assert!(result.contains(&format!("{}(0)", ShortRoot(&root.0)))); + assert!(result.contains(&format!("{}(3)", ShortRoot(&c.0)))); + // Head should be marked + assert!(result.contains("*")); + } + + #[test] + fn fork_with_two_branches() { + // root(0) -> a(1) -> b(2) [fork point] + // ├── c(3) [head, w:3] + // └── d(3) [w:1] + let root = h(1); + let a = h(2); + let b = h(3); + let c = h(4); + let d = h(5); + + let mut blocks = 
HashMap::new(); + blocks.insert(root, (0, H256::ZERO)); + blocks.insert(a, (1, root)); + blocks.insert(b, (2, a)); + blocks.insert(c, (3, b)); + blocks.insert(d, (3, b)); + + let mut weights = HashMap::new(); + weights.insert(c, 3); + weights.insert(d, 1); + + let result = format_fork_choice_tree(&blocks, &weights, c, cp(root, 0), cp(root, 0)); + + assert!( + result.contains("\u{251c}\u{2500}\u{2500}") + || result.contains("\u{2514}\u{2500}\u{2500}") + ); + assert!(result.contains("2 branches")); + assert!(result.contains("[w:3]")); + assert!(result.contains("[w:1]")); + assert!(result.contains("*")); + } + + #[test] + fn missing_single_slot() { + // root(0) -> a(2) (slot 1 missing) + let root = h(1); + let a = h(2); + + let mut blocks = HashMap::new(); + blocks.insert(root, (0, H256::ZERO)); + blocks.insert(a, (2, root)); + + let weights = HashMap::new(); + let result = format_fork_choice_tree(&blocks, &weights, a, cp(root, 0), cp(root, 0)); + + assert!(result.contains("[ ]")); + } + + #[test] + fn missing_multiple_slots() { + // root(0) -> a(4) (slots 1-3 missing) + let root = h(1); + let a = h(2); + + let mut blocks = HashMap::new(); + blocks.insert(root, (0, H256::ZERO)); + blocks.insert(a, (4, root)); + + let weights = HashMap::new(); + let result = format_fork_choice_tree(&blocks, &weights, a, cp(root, 0), cp(root, 0)); + + assert!(result.contains("[3]")); + } + + #[test] + fn empty_blocks() { + let blocks = HashMap::new(); + let weights = HashMap::new(); + let result = format_fork_choice_tree( + &blocks, + &weights, + H256::ZERO, + cp(H256::ZERO, 0), + cp(H256::ZERO, 0), + ); + + assert!(result.contains("Fork Choice Tree:")); + assert!(result.contains("(empty)")); + } + + #[test] + fn single_block_chain() { + let root = h(1); + + let mut blocks = HashMap::new(); + blocks.insert(root, (0, H256::ZERO)); + + let weights = HashMap::new(); + let result = format_fork_choice_tree(&blocks, &weights, root, cp(root, 0), cp(root, 0)); + + 
assert!(result.contains(&format!("{}(0)", ShortRoot(&root.0)))); + assert!(result.contains("*")); + } + + #[test] + fn depth_truncation() { + // Build a chain of 25 blocks (exceeds MAX_DISPLAY_DEPTH=20) + let nodes: Vec = (1..=25).map(h).collect(); + + let mut blocks = HashMap::new(); + blocks.insert(nodes[0], (0, H256::ZERO)); + for i in 1..25 { + blocks.insert(nodes[i], (i as u64, nodes[i - 1])); + } + + let weights = HashMap::new(); + let head = nodes[24]; + let result = + format_fork_choice_tree(&blocks, &weights, head, cp(nodes[0], 0), cp(nodes[0], 0)); + + assert!( + result.contains("..."), + "long chain should be truncated with ..." + ); + // The last node (slot 24) should NOT appear since we truncate at depth 20 + assert!( + !result.contains("(24)"), + "slot 24 should not appear due to truncation" + ); + } + + #[test] + fn nested_fork() { + // root(0) -> a(1) -> b(2) [fork] + // ├── c(3) -> e(4) [sub-fork] + // │ ├── f(5) [head, w:4] + // │ └── g(5) [w:1] + // └── d(3) [w:2] + let root = h(1); + let a = h(2); + let b = h(3); + let c = h(4); + let d = h(5); + let e = h(6); + let f = h(7); + let g = h(8); + + let mut blocks = HashMap::new(); + blocks.insert(root, (0, H256::ZERO)); + blocks.insert(a, (1, root)); + blocks.insert(b, (2, a)); + blocks.insert(c, (3, b)); + blocks.insert(d, (3, b)); + blocks.insert(e, (4, c)); + blocks.insert(f, (5, e)); + blocks.insert(g, (5, e)); + + let mut weights = HashMap::new(); + weights.insert(c, 5); + weights.insert(d, 2); + weights.insert(e, 4); + weights.insert(f, 4); + weights.insert(g, 1); + + let result = format_fork_choice_tree(&blocks, &weights, f, cp(root, 0), cp(root, 0)); + + // Should have two levels of branching + assert!(result.contains("2 branches"), "should show outer fork"); + // Nested fork should also show branches + let branch_count = result.matches("2 branches").count(); + assert_eq!(branch_count, 2, "should show both outer and inner fork"); + assert!(result.contains("[w:4]")); + 
assert!(result.contains("[w:1]")); + assert!(result.contains("[w:2]")); + } + + #[test] + fn head_marker_on_correct_node() { + // root(0) -> a(1) -> b(2) [fork] + // ├── c(3) [head] + // └── d(3) + let root = h(1); + let a = h(2); + let b = h(3); + let c = h(4); + let d = h(5); + + let mut blocks = HashMap::new(); + blocks.insert(root, (0, H256::ZERO)); + blocks.insert(a, (1, root)); + blocks.insert(b, (2, a)); + blocks.insert(c, (3, b)); + blocks.insert(d, (3, b)); + + let mut weights = HashMap::new(); + weights.insert(c, 5); + weights.insert(d, 2); + + let result = format_fork_choice_tree(&blocks, &weights, c, cp(root, 0), cp(root, 0)); + + // The head node c should have * after it + let c_repr = format!("{}(3)", ShortRoot(&c.0)); + let c_pos = result.find(&c_repr).expect("c should appear in output"); + let after_c = &result[c_pos + c_repr.len()..]; + assert!( + after_c.starts_with(" *"), + "head marker should follow node c, got: {after_c:?}" + ); + } +} diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 7ad34d3f..06aa6c40 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -5,7 +5,7 @@ use ethlambda_state_transition::is_proposer; use ethlambda_storage::Store; use ethlambda_types::{ ShortRoot, - attestation::{Attestation, AttestationData, SignedAttestation}, + attestation::{Attestation, AttestationData, SignedAggregatedAttestation, SignedAttestation}, block::{BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation}, primitives::{H256, ssz::TreeHash}, signature::ValidatorSecretKey, @@ -19,6 +19,7 @@ use tracing::{error, info, trace, warn}; use crate::store::StoreError; +pub(crate) mod fork_choice_tree; pub mod key_manager; pub mod metrics; pub mod store; @@ -30,6 +31,8 @@ pub enum P2PMessage { PublishAttestation(SignedAttestation), /// Publish a block to the gossip network. PublishBlock(SignedBlockWithAttestation), + /// Publish an aggregated attestation to the gossip network. 
+ PublishAggregatedAttestation(SignedAggregatedAttestation), /// Fetch a block by its root hash. FetchBlock(H256), } @@ -38,14 +41,20 @@ pub struct BlockChain { handle: GenServerHandle, } -/// Seconds in a slot. Each slot has 4 intervals of 1 second each. +/// Seconds in a slot. pub const SECONDS_PER_SLOT: u64 = 4; - +/// Milliseconds in a slot. +pub const MILLISECONDS_PER_SLOT: u64 = 4_000; +/// Milliseconds per interval (800ms ticks). +pub const MILLISECONDS_PER_INTERVAL: u64 = 800; +/// Number of intervals per slot (5 intervals of 800ms = 4 seconds). +pub const INTERVALS_PER_SLOT: u64 = 5; impl BlockChain { pub fn spawn( store: Store, p2p_tx: mpsc::UnboundedSender, validator_keys: HashMap, + is_aggregator: bool, ) -> BlockChain { let genesis_time = store.config().genesis_time; let key_manager = key_manager::KeyManager::new(validator_keys); @@ -54,6 +63,7 @@ impl BlockChain { p2p_tx, key_manager, pending_blocks: HashMap::new(), + is_aggregator, pending_block_parents: HashMap::new(), } .start(); @@ -85,6 +95,20 @@ impl BlockChain { .await .inspect_err(|err| error!(%err, "Failed to notify BlockChain of new attestation")); } + + /// Sends an aggregated attestation to the BlockChain for processing. + pub async fn notify_new_aggregated_attestation( + &mut self, + attestation: SignedAggregatedAttestation, + ) { + let _ = self + .handle + .cast(CastMessage::NewAggregatedAttestation(attestation)) + .await + .inspect_err( + |err| error!(%err, "Failed to notify BlockChain of new aggregated attestation"), + ); + } } /// GenServer that sequences all blockchain updates. @@ -104,16 +128,19 @@ struct BlockChainServer { // chain at lookup time, since a cached ancestor may itself have become pending with // a deeper missing parent after the entry was created. pending_block_parents: HashMap, + + /// Whether this node acts as a committee aggregator. 
+ is_aggregator: bool, } impl BlockChainServer { - fn on_tick(&mut self, timestamp: u64) { - let genesis_time = self.store.config().genesis_time; + fn on_tick(&mut self, timestamp_ms: u64) { + let genesis_time_ms = self.store.config().genesis_time * 1000; - // Calculate current slot and interval - let time_since_genesis = timestamp.saturating_sub(genesis_time); - let slot = time_since_genesis / SECONDS_PER_SLOT; - let interval = time_since_genesis % SECONDS_PER_SLOT; + // Calculate current slot and interval from milliseconds + let time_since_genesis_ms = timestamp_ms.saturating_sub(genesis_time_ms); + let slot = time_since_genesis_ms / MILLISECONDS_PER_SLOT; + let interval = (time_since_genesis_ms % MILLISECONDS_PER_SLOT) / MILLISECONDS_PER_INTERVAL; // Update current slot metric metrics::update_current_slot(slot); @@ -126,7 +153,19 @@ impl BlockChainServer { .flatten(); // Tick the store first - this accepts attestations at interval 0 if we have a proposal - store::on_tick(&mut self.store, timestamp, proposer_validator_id.is_some()); + let new_aggregates = store::on_tick( + &mut self.store, + timestamp_ms, + proposer_validator_id.is_some(), + self.is_aggregator, + ); + + for aggregate in new_aggregates { + let _ = self + .p2p_tx + .send(P2PMessage::PublishAggregatedAttestation(aggregate)) + .inspect_err(|err| error!(%err, "Failed to publish aggregated attestation")); + } // Now build and publish the block (after attestations have been accepted) if let Some(validator_id) = proposer_validator_id { @@ -138,7 +177,7 @@ impl BlockChainServer { self.produce_attestations(slot); } - // Update safe target slot metric (updated by store.on_tick at interval 2) + // Update safe target slot metric (updated by store.on_tick at interval 3) metrics::update_safe_target_slot(self.store.safe_target_slot()); // Update head slot metric (head may change when attestations are promoted at intervals 0/3) metrics::update_head_slot(self.store.head_slot()); @@ -438,15 +477,25 @@ impl 
BlockChainServer { } fn on_gossip_attestation(&mut self, attestation: SignedAttestation) { + if !self.is_aggregator { + warn!("Received unaggregated attestation but node is not an aggregator"); + return; + } let _ = store::on_gossip_attestation(&mut self.store, attestation) .inspect_err(|err| warn!(%err, "Failed to process gossiped attestation")); } + + fn on_gossip_aggregated_attestation(&mut self, attestation: SignedAggregatedAttestation) { + let _ = store::on_gossip_aggregated_attestation(&mut self.store, attestation) + .inspect_err(|err| warn!(%err, "Failed to process gossiped aggregated attestation")); + } } #[derive(Clone, Debug)] enum CastMessage { NewBlock(SignedBlockWithAttestation), NewAttestation(SignedAttestation), + NewAggregatedAttestation(SignedAggregatedAttestation), Tick, } @@ -477,12 +526,13 @@ impl GenServer for BlockChainServer { let timestamp = SystemTime::UNIX_EPOCH .elapsed() .expect("already past the unix epoch"); - self.on_tick(timestamp.as_secs()); - // Schedule the next tick at the start of the next second - let millis_to_next_sec = - ((timestamp.as_secs() as u128 + 1) * 1000 - timestamp.as_millis()) as u64; + self.on_tick(timestamp.as_millis() as u64); + // Schedule the next tick at the next 800ms interval boundary + let ms_since_epoch = timestamp.as_millis() as u64; + let ms_to_next_interval = + MILLISECONDS_PER_INTERVAL - (ms_since_epoch % MILLISECONDS_PER_INTERVAL); send_after( - Duration::from_millis(millis_to_next_sec), + Duration::from_millis(ms_to_next_interval), handle.clone(), message, ); @@ -491,6 +541,9 @@ impl GenServer for BlockChainServer { self.on_block(signed_block); } CastMessage::NewAttestation(attestation) => self.on_gossip_attestation(attestation), + CastMessage::NewAggregatedAttestation(attestation) => { + self.on_gossip_aggregated_attestation(attestation); + } } CastResponse::NoReply } diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 1a336422..bce8dc49 100644 --- 
a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -4,11 +4,12 @@ use ethlambda_crypto::aggregate_signatures; use ethlambda_state_transition::{ is_proposer, process_block, process_slots, slot_is_justifiable_after, }; -use ethlambda_storage::{ForkCheckpoints, SignatureKey, Store}; +use ethlambda_storage::{ForkCheckpoints, SignatureKey, Store, StoredAggregatedPayload}; use ethlambda_types::{ ShortRoot, attestation::{ - AggregatedAttestation, AggregationBits, Attestation, AttestationData, SignedAttestation, + AggregatedAttestation, AggregationBits, Attestation, AttestationData, + SignedAggregatedAttestation, SignedAttestation, }, block::{ AggregatedAttestations, AggregatedSignatureProof, Block, BlockBody, @@ -16,26 +17,29 @@ use ethlambda_types::{ }, primitives::{H256, ssz::TreeHash}, signature::ValidatorSignature, - state::{Checkpoint, State, Validator}, + state::{Checkpoint, State}, }; use tracing::{info, trace, warn}; -use crate::{SECONDS_PER_SLOT, metrics}; +use crate::{INTERVALS_PER_SLOT, MILLISECONDS_PER_INTERVAL, MILLISECONDS_PER_SLOT, metrics}; const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3; -/// Accept new attestations, moving them from pending to known. -fn accept_new_attestations(store: &mut Store) { - store.promote_new_attestations(); - update_head(store); +/// Accept new aggregated payloads, promoting them to known for fork choice. +fn accept_new_attestations(store: &mut Store, log_tree: bool) { + store.promote_new_aggregated_payloads(); + update_head(store, log_tree); } /// Update the head based on the fork choice rule. -fn update_head(store: &mut Store) { +/// +/// When `log_tree` is true, also computes block weights and logs an ASCII +/// fork choice tree to the terminal. 
+fn update_head(store: &mut Store, log_tree: bool) { let blocks = store.get_live_chain(); - let attestations: HashMap = store.iter_known_attestations().collect(); + let attestations = store.extract_latest_known_attestations(); let old_head = store.head(); - let new_head = ethlambda_fork_choice::compute_lmd_ghost_head( + let (new_head, weights) = ethlambda_fork_choice::compute_lmd_ghost_head( store.latest_justified().root, &blocks, &attestations, @@ -68,6 +72,17 @@ fn update_head(store: &mut Store) { "Fork choice head updated" ); } + + if log_tree { + let tree = crate::fork_choice_tree::format_fork_choice_tree( + &blocks, + &weights, + new_head, + store.latest_justified(), + store.latest_finalized(), + ); + info!("\n{tree}"); + } } /// Update the safe target for attestation. @@ -78,8 +93,16 @@ fn update_safe_target(store: &mut Store) { let min_target_score = (num_validators * 2).div_ceil(3); let blocks = store.get_live_chain(); - let attestations: HashMap = store.iter_new_attestations().collect(); - let safe_target = ethlambda_fork_choice::compute_lmd_ghost_head( + // Merge both attestation pools. At interval 3 the migration (interval 4) hasn't + // run yet, so attestations that entered "known" directly (proposer's own attestation + // in block body, node's self-attestation) would be invisible without this merge. + let mut all_payloads: HashMap> = + store.iter_known_aggregated_payloads().collect(); + for (key, new_proofs) in store.iter_new_aggregated_payloads() { + all_payloads.entry(key).or_default().extend(new_proofs); + } + let attestations = store.extract_latest_attestations(all_payloads.into_iter()); + let (safe_target, _weights) = ethlambda_fork_choice::compute_lmd_ghost_head( store.latest_justified().root, &blocks, &attestations, @@ -88,15 +111,106 @@ fn update_safe_target(store: &mut Store) { store.set_safe_target(safe_target); } +/// Aggregate committee signatures at interval 2. 
+/// +/// Collects individual gossip signatures, aggregates them by attestation data, +/// and stores the resulting proofs in `LatestNewAggregatedPayloads`. +fn aggregate_committee_signatures(store: &mut Store) -> Vec { + let gossip_sigs: Vec<(SignatureKey, _)> = store.iter_gossip_signatures().collect(); + if gossip_sigs.is_empty() { + return Vec::new(); + } + + let mut new_aggregates: Vec = Vec::new(); + + let head_state = store.head_state(); + let validators = &head_state.validators; + + // Group gossip signatures by data_root for batch aggregation + let mut groups: HashMap> = HashMap::new(); + let mut keys_to_delete: Vec = Vec::new(); + + for ((validator_id, data_root), stored_sig) in &gossip_sigs { + if let Ok(sig) = stored_sig.to_validator_signature() { + groups + .entry(*data_root) + .or_default() + .push((*validator_id, sig)); + } + } + + for (data_root, validators_and_sigs) in groups { + let Some(data) = store.get_attestation_data_by_root(&data_root) else { + continue; + }; + + let slot = data.slot; + let message = data.tree_hash_root(); + + let mut sigs = vec![]; + let mut pubkeys = vec![]; + let mut ids = vec![]; + + for (vid, sig) in &validators_and_sigs { + let Some(validator) = validators.get(*vid as usize) else { + continue; + }; + let Ok(pubkey) = validator.get_pubkey() else { + continue; + }; + sigs.push(sig.clone()); + pubkeys.push(pubkey); + ids.push(*vid); + } + + if ids.is_empty() { + continue; + } + + let Ok(proof_data) = aggregate_signatures(pubkeys, sigs, &message, slot as u32) + .inspect_err(|err| warn!(%err, "Failed to aggregate committee signatures")) + else { + continue; + }; + + let participants = aggregation_bits_from_validator_indices(&ids); + let proof = AggregatedSignatureProof::new(participants, proof_data); + + new_aggregates.push(SignedAggregatedAttestation { + data: data.clone(), + proof: proof.clone(), + }); + + let payload = StoredAggregatedPayload { slot, proof }; + + // Store in new aggregated payloads for each covered 
validator + for vid in &ids { + store.insert_new_aggregated_payload((*vid, data_root), payload.clone()); + } + + // Only delete successfully aggregated signatures + keys_to_delete.extend(ids.iter().map(|vid| (*vid, data_root))); + + metrics::inc_pq_sig_aggregated_signatures(); + metrics::inc_pq_sig_attestations_in_aggregated_signatures(ids.len() as u64); + } + + // Delete aggregated entries from gossip_signatures + store.delete_gossip_signatures(&keys_to_delete); + + new_aggregates +} + /// Validate incoming attestation before processing. /// /// Ensures the vote respects the basic laws of time and topology: /// 1. The blocks voted for must exist in our store. /// 2. A vote cannot span backwards in time (source > target). -/// 3. A vote cannot be for a future slot. -fn validate_attestation(store: &Store, attestation: &Attestation) -> Result<(), StoreError> { +/// 3. The head must be at least as recent as source and target. +/// 4. Checkpoint slots must match the actual block slots. +/// 5. A vote cannot be for a future slot. +fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<(), StoreError> { let _timing = metrics::time_attestation_validation(); - let data = &attestation.data; // Availability Check - We cannot count a vote if we haven't seen the blocks involved. let source_header = store @@ -106,14 +220,20 @@ fn validate_attestation(store: &Store, attestation: &Attestation) -> Result<(), .get_block_header(&data.target.root) .ok_or(StoreError::UnknownTargetBlock(data.target.root))?; - let _ = store + let head_header = store .get_block_header(&data.head.root) .ok_or(StoreError::UnknownHeadBlock(data.head.root))?; - // Topology Check - Source must be older than Target. + // Topology Check - Source must be older than Target, and Head must be at least as recent. 
if data.source.slot > data.target.slot { return Err(StoreError::SourceExceedsTarget); } + if data.head.slot < data.target.slot { + return Err(StoreError::HeadOlderThanTarget { + head_slot: data.head.slot, + target_slot: data.target.slot, + }); + } // Consistency Check - Validate checkpoint slots match block slots. if source_header.slot != data.source.slot { @@ -128,10 +248,16 @@ fn validate_attestation(store: &Store, attestation: &Attestation) -> Result<(), block_slot: target_header.slot, }); } + if head_header.slot != data.head.slot { + return Err(StoreError::HeadSlotMismatch { + checkpoint_slot: data.head.slot, + block_slot: head_header.slot, + }); + } // Time Check - Validate attestation is not too far in the future. // We allow a small margin for clock disparity (1 slot), but no further. - let current_slot = store.time() / SECONDS_PER_SLOT; + let current_slot = store.time() / INTERVALS_PER_SLOT; if data.slot > current_slot + 1 { return Err(StoreError::AttestationTooFarInFuture { attestation_slot: data.slot, @@ -143,20 +269,35 @@ fn validate_attestation(store: &Store, attestation: &Attestation) -> Result<(), } /// Process a tick event. -pub fn on_tick(store: &mut Store, timestamp: u64, has_proposal: bool) { - let time = timestamp - store.config().genesis_time; +/// +/// `store.time()` represents interval-count-since-genesis: each increment is one +/// 800ms interval. 
Slot and interval-within-slot are derived as: +/// slot = store.time() / INTERVALS_PER_SLOT +/// interval = store.time() % INTERVALS_PER_SLOT +pub fn on_tick( + store: &mut Store, + timestamp_ms: u64, + has_proposal: bool, + is_aggregator: bool, +) -> Vec { + let mut new_aggregates: Vec = Vec::new(); + + // Convert UNIX timestamp (ms) to interval count since genesis + let genesis_time_ms = store.config().genesis_time * 1000; + let time_delta_ms = timestamp_ms.saturating_sub(genesis_time_ms); + let time = time_delta_ms / MILLISECONDS_PER_INTERVAL; // If we're more than a slot behind, fast-forward to a slot before. // Operations are idempotent, so this should be fine. - if time.saturating_sub(store.time()) > SECONDS_PER_SLOT { - store.set_time(time - SECONDS_PER_SLOT); + if time.saturating_sub(store.time()) > INTERVALS_PER_SLOT { + store.set_time(time - INTERVALS_PER_SLOT); } while store.time() < time { store.set_time(store.time() + 1); - let slot = store.time() / SECONDS_PER_SLOT; - let interval = store.time() % SECONDS_PER_SLOT; + let slot = store.time() / INTERVALS_PER_SLOT; + let interval = store.time() % INTERVALS_PER_SLOT; trace!(%slot, %interval, "processing tick"); @@ -169,92 +310,75 @@ pub fn on_tick(store: &mut Store, timestamp: u64, has_proposal: bool) { 0 => { // Start of slot - process attestations if proposal exists if should_signal_proposal { - accept_new_attestations(store); + accept_new_attestations(store, false); } } 1 => { - // Second interval - no action + // Vote propagation — no action } 2 => { - // Mid-slot - update safe target for validators - update_safe_target(store); + // Aggregation interval + if is_aggregator { + new_aggregates.extend(aggregate_committee_signatures(store)); + } } 3 => { - // End of slot - accept accumulated attestations - accept_new_attestations(store); + // Update safe target for validators + update_safe_target(store); } - _ => unreachable!("slots only have 4 intervals"), + 4 => { + // End of slot - accept accumulated 
attestations and log tree + accept_new_attestations(store, true); + } + _ => unreachable!("slots only have 5 intervals"), } } -} -/// Process a gossiped attestation (with signature verification). -/// -/// This is the safe default: it always verifies the validator's XMSS signature -/// and stores it for future block building. -pub fn on_gossip_attestation( - store: &mut Store, - signed_attestation: SignedAttestation, -) -> Result<(), StoreError> { - on_gossip_attestation_core(store, signed_attestation, true) + new_aggregates } -/// Process a gossiped attestation without signature verification. +/// Process a gossiped attestation with signature verification. /// -/// This skips all cryptographic checks and signature storage. Use only in tests -/// where signatures are absent or irrelevant. -pub fn on_gossip_attestation_without_verification( - store: &mut Store, - signed_attestation: SignedAttestation, -) -> Result<(), StoreError> { - on_gossip_attestation_core(store, signed_attestation, false) -} - -/// Core gossip attestation processing logic. -/// -/// When `verify` is true, the validator's XMSS signature is checked and stored -/// for future block building. When false, all signature checks are skipped. -fn on_gossip_attestation_core( +/// Verifies the validator's XMSS signature and stores it for later aggregation +/// at interval 2. Only aggregator nodes receive unaggregated gossip attestations. 
+pub fn on_gossip_attestation( store: &mut Store, signed_attestation: SignedAttestation, - verify: bool, ) -> Result<(), StoreError> { let validator_id = signed_attestation.validator_id; let attestation = Attestation { validator_id, data: signed_attestation.message, }; - validate_attestation(store, &attestation) + validate_attestation_data(store, &attestation.data) .inspect_err(|_| metrics::inc_attestations_invalid("gossip"))?; - if verify { - let target = attestation.data.target; - let target_state = store - .get_state(&target.root) - .ok_or(StoreError::MissingTargetState(target.root))?; - if validator_id >= target_state.validators.len() as u64 { - return Err(StoreError::InvalidValidatorIndex); - } - let validator_pubkey = target_state.validators[validator_id as usize] - .get_pubkey() - .map_err(|_| StoreError::PubkeyDecodingFailed(validator_id))?; - let message = attestation.data.tree_hash_root(); + let data_root = attestation.data.tree_hash_root(); - // Verify the validator's XMSS signature - let epoch: u32 = attestation.data.slot.try_into().expect("slot exceeds u32"); - let signature = ValidatorSignature::from_bytes(&signed_attestation.signature) - .map_err(|_| StoreError::SignatureDecodingFailed)?; - if !signature.is_valid(&validator_pubkey, epoch, &message) { - return Err(StoreError::SignatureVerificationFailed); - } + let target = attestation.data.target; + let target_state = store + .get_state(&target.root) + .ok_or(StoreError::MissingTargetState(target.root))?; + if validator_id >= target_state.validators.len() as u64 { + return Err(StoreError::InvalidValidatorIndex); + } + let validator_pubkey = target_state.validators[validator_id as usize] + .get_pubkey() + .map_err(|_| StoreError::PubkeyDecodingFailed(validator_id))?; + + // Verify the validator's XMSS signature + let epoch: u32 = attestation.data.slot.try_into().expect("slot exceeds u32"); + let signature = ValidatorSignature::from_bytes(&signed_attestation.signature) + .map_err(|_| 
StoreError::SignatureDecodingFailed)?; + if !signature.is_valid(&validator_pubkey, epoch, &data_root) { + return Err(StoreError::SignatureVerificationFailed); + } - on_attestation(store, attestation.clone(), false)?; + // Store attestation data by root (content-addressed, idempotent) + store.insert_attestation_data_by_root(data_root, attestation.data.clone()); - // Store signature for later lookup during block building - store.insert_gossip_signature(&attestation.data, validator_id, signature); - } else { - on_attestation(store, attestation.clone(), false)?; - } + // Store gossip signature for later aggregation at interval 2. + store.insert_gossip_signature(&attestation.data, validator_id, signature); metrics::inc_attestations_valid("gossip"); @@ -274,67 +398,75 @@ fn on_gossip_attestation_core( Ok(()) } -/// Process a new attestation and place it into the correct attestation stage. +/// Process a gossiped aggregated attestation from the aggregation subnet. /// -/// Attestations can come from: -/// - a block body (on-chain, `is_from_block=true`), or -/// - the gossip network (off-chain, `is_from_block=false`). -/// -/// The Attestation Pipeline: -/// - Stage 1 (latest_new_attestations): Pending attestations not yet counted in fork choice. -/// - Stage 2 (latest_known_attestations): Active attestations used by LMD-GHOST. -fn on_attestation( +/// Aggregated attestations arrive from committee aggregators and contain a proof +/// covering multiple validators. We store one aggregated payload entry per +/// participating validator so the fork choice extraction works uniformly. +pub fn on_gossip_aggregated_attestation( store: &mut Store, - attestation: Attestation, - is_from_block: bool, + aggregated: SignedAggregatedAttestation, ) -> Result<(), StoreError> { - // First, ensure the attestation is structurally and temporally valid. 
- validate_attestation(store, &attestation)?; - - let validator_id = attestation.validator_id; - let attestation_data = attestation.data; - let attestation_slot = attestation_data.slot; - - if is_from_block { - // On-chain attestation processing - // These are historical attestations from other validators included by the proposer. - // They are processed immediately as "known" attestations. - - let should_update = store - .get_known_attestation(&validator_id) - .is_none_or(|latest| latest.slot < attestation_slot); + validate_attestation_data(store, &aggregated.data) + .inspect_err(|_| metrics::inc_attestations_invalid("aggregated"))?; + + // Verify aggregated proof signature + let target_state = store + .get_state(&aggregated.data.target.root) + .ok_or(StoreError::MissingTargetState(aggregated.data.target.root))?; + let validators = &target_state.validators; + let num_validators = validators.len() as u64; - if should_update { - store.insert_known_attestation(validator_id, attestation_data.clone()); - } + let participant_indices: Vec = aggregated.proof.participant_indices().collect(); + if participant_indices.iter().any(|&vid| vid >= num_validators) { + return Err(StoreError::InvalidValidatorIndex); + } - // Remove pending attestation if superseded by on-chain attestation - if let Some(existing_new) = store.get_new_attestation(&validator_id) - && existing_new.slot <= attestation_slot - { - store.remove_new_attestation(&validator_id); - } - } else { - // Network gossip attestation processing - // These enter the "new" stage and must wait for interval tick acceptance. 
- - // Reject attestations from future slots - let current_slot = store.time() / SECONDS_PER_SLOT; - if attestation_slot > current_slot { - return Err(StoreError::AttestationTooFarInFuture { - attestation_slot, - current_slot, - }); - } + let pubkeys: Vec<_> = participant_indices + .iter() + .map(|&vid| { + validators[vid as usize] + .get_pubkey() + .map_err(|_| StoreError::PubkeyDecodingFailed(vid)) + }) + .collect::>()?; + + let message = aggregated.data.tree_hash_root(); + let epoch: u32 = aggregated.data.slot.try_into().expect("slot exceeds u32"); + + ethlambda_crypto::verify_aggregated_signature( + &aggregated.proof.proof_data, + pubkeys, + &message, + epoch, + ) + .map_err(StoreError::AggregateVerificationFailed)?; + + // Store attestation data by root (content-addressed, idempotent) + let data_root = aggregated.data.tree_hash_root(); + store.insert_attestation_data_by_root(data_root, aggregated.data.clone()); + + // Store one aggregated payload per participating validator + for validator_id in aggregated.proof.participant_indices() { + let payload = StoredAggregatedPayload { + slot: aggregated.data.slot, + proof: aggregated.proof.clone(), + }; + store.insert_new_aggregated_payload((validator_id, data_root), payload); + } - let should_update = store - .get_new_attestation(&validator_id) - .is_none_or(|latest| latest.slot < attestation_slot); + let slot = aggregated.data.slot; + let num_participants = aggregated.proof.participants.num_set_bits(); + info!( + slot, + num_participants, + target_slot = aggregated.data.target.slot, + target_root = %ShortRoot(&aggregated.data.target.root.0), + source_slot = aggregated.data.source.slot, + "Aggregated attestation processed" + ); - if should_update { - store.insert_new_attestation(validator_id, attestation_data); - } - } + metrics::inc_attestations_valid("aggregated"); Ok(()) } @@ -429,61 +561,55 @@ fn on_block_core( let attestation_signatures = &signed_block.signature.attestation_signatures; // Process block body 
attestations. - // TODO: fail the block if an attestation is invalid. Right now we - // just log a warning. + // Store attestation data by root and proofs in known aggregated payloads. for (att, proof) in aggregated_attestations .iter() .zip(attestation_signatures.iter()) { + let data_root = att.data.tree_hash_root(); + store.insert_attestation_data_by_root(data_root, att.data.clone()); + let validator_ids = aggregation_bits_to_validator_indices(&att.aggregation_bits); + let payload = StoredAggregatedPayload { + slot: att.data.slot, + proof: proof.clone(), + }; - for validator_id in validator_ids { - // Update Proof Map - Store the proof so future block builders can reuse this aggregation - store.insert_aggregated_payload(&att.data, validator_id, proof.clone()); + for validator_id in &validator_ids { + // Store proof in known aggregated payloads (active in fork choice) + store.insert_known_aggregated_payload((*validator_id, data_root), payload.clone()); - // Update Fork Choice - Register the vote immediately (historical/on-chain) - let attestation = Attestation { - validator_id, - data: att.data.clone(), - }; - // TODO: validate attestations before processing - let _ = on_attestation(store, attestation, true) - .inspect(|_| metrics::inc_attestations_valid("block")) - .inspect_err(|err| { - warn!(%slot, %validator_id, %err, "Invalid attestation in block"); - metrics::inc_attestations_invalid("block"); - }); + metrics::inc_attestations_valid("block"); } } // Update forkchoice head based on new block and attestations // IMPORTANT: This must happen BEFORE processing proposer attestation // to prevent the proposer from gaining circular weight advantage. - update_head(store); + update_head(store, false); - if verify { + // Process proposer attestation as pending (enters "new" stage via gossip path) + // The proposer's attestation should NOT affect this block's fork choice position. 
+ let proposer_vid = proposer_attestation.validator_id; + let proposer_data_root = proposer_attestation.data.tree_hash_root(); + store.insert_attestation_data_by_root(proposer_data_root, proposer_attestation.data.clone()); + + if !verify { + // Without sig verification, insert directly with a dummy proof + let participants = aggregation_bits_from_validator_indices(&[proposer_vid]); + let payload = StoredAggregatedPayload { + slot: proposer_attestation.data.slot, + proof: AggregatedSignatureProof::empty(participants), + }; + store.insert_new_aggregated_payload((proposer_vid, proposer_data_root), payload); + } else { // Store the proposer's signature for potential future block building let proposer_sig = ValidatorSignature::from_bytes(&signed_block.signature.proposer_signature) .map_err(|_| StoreError::SignatureDecodingFailed)?; - store.insert_gossip_signature( - &proposer_attestation.data, - proposer_attestation.validator_id, - proposer_sig, - ); + store.insert_gossip_signature(&proposer_attestation.data, proposer_vid, proposer_sig); } - // Process proposer attestation as if received via gossip - // The proposer's attestation should NOT affect this block's fork choice position. - // It is treated as pending until interval 3 (end of slot). - // TODO: validate attestations before processing - let _ = on_attestation(store, proposer_attestation, false) - .inspect(|_| metrics::inc_attestations_valid("gossip")) - .inspect_err(|err| { - warn!(%slot, %err, "Invalid proposer attestation in block"); - metrics::inc_attestations_invalid("block"); - }); - info!(%slot, %block_root, %state_root, "Processed new block"); Ok(()) } @@ -583,13 +709,13 @@ pub fn produce_attestation_data(store: &Store, slot: u64) -> AttestationData { /// before returning the canonical head. 
fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { // Calculate time corresponding to this slot - let slot_time = store.config().genesis_time + slot * SECONDS_PER_SLOT; + let slot_time_ms = store.config().genesis_time * 1000 + slot * MILLISECONDS_PER_SLOT; // Advance time to current slot (ticking intervals) - on_tick(store, slot_time, true); + on_tick(store, slot_time_ms, true, false); // Process any pending attestations before proposal - accept_new_attestations(store); + accept_new_attestations(store, false); store.head() } @@ -622,22 +748,19 @@ pub fn produce_block_with_signatures( }); } - // Convert AttestationData to Attestation objects for build_block - let available_attestations: Vec = store - .iter_known_attestations() + // Convert known aggregated payloads to Attestation objects for build_block + let known_attestations = store.extract_latest_known_attestations(); + let available_attestations: Vec = known_attestations + .into_iter() .map(|(validator_id, data)| Attestation { validator_id, data }) .collect(); // Get known block roots for attestation validation let known_block_roots = store.get_block_roots(); - // Collect signature data for block building - let gossip_signatures: HashMap = store - .iter_gossip_signatures() - .filter_map(|(key, stored)| stored.to_validator_signature().ok().map(|sig| (key, sig))) - .collect(); + // Collect existing proofs for block building from known aggregated payloads let aggregated_payloads: HashMap> = store - .iter_aggregated_payloads() + .iter_known_aggregated_payloads() .map(|(key, stored_payloads)| { let proofs = stored_payloads.into_iter().map(|sp| sp.proof).collect(); (key, proofs) @@ -652,7 +775,6 @@ pub fn produce_block_with_signatures( head_root, &available_attestations, &known_block_roots, - &gossip_signatures, &aggregated_payloads, )?; @@ -698,6 +820,9 @@ pub enum StoreError { #[error("Source checkpoint slot exceeds target")] SourceExceedsTarget, + #[error("Head checkpoint slot {head_slot} is older than 
target slot {target_slot}")] + HeadOlderThanTarget { head_slot: u64, target_slot: u64 }, + #[error("Source checkpoint slot {checkpoint_slot} does not match block slot {block_slot}")] SourceSlotMismatch { checkpoint_slot: u64, @@ -710,6 +835,12 @@ pub enum StoreError { block_slot: u64, }, + #[error("Head checkpoint slot {checkpoint_slot} does not match block slot {block_slot}")] + HeadSlotMismatch { + checkpoint_slot: u64, + block_slot: u64, + }, + #[error( "Attestation slot {attestation_slot} is too far in future (current slot: {current_slot})" )] @@ -815,7 +946,6 @@ fn aggregate_attestations_by_data(attestations: &[Attestation]) -> Vec, - gossip_signatures: &HashMap, aggregated_payloads: &HashMap>, ) -> Result<(Block, State, Vec), StoreError> { // Start with empty attestation set @@ -833,7 +962,7 @@ fn build_block( let mut included_keys: HashSet = HashSet::new(); // Fixed-point loop: collect attestations until no new ones can be added - let post_state = loop { + loop { // Aggregate attestations by data for the candidate block let aggregated = aggregate_attestations_by_data(&included_attestations); let attestations: AggregatedAttestations = aggregated @@ -857,7 +986,7 @@ fn build_block( // No attestation source provided: done after computing post_state if available_attestations.is_empty() || known_block_roots.is_empty() { - break post_state; + break; } // Find new valid attestations matching post-state requirements @@ -882,11 +1011,8 @@ fn build_block( continue; } - // Only include if we have a signature for this attestation - let has_gossip_sig = gossip_signatures.contains_key(&sig_key); - let has_block_proof = aggregated_payloads.contains_key(&sig_key); - - if has_gossip_sig || has_block_proof { + // Only include if we have a proof for this attestation + if aggregated_payloads.contains_key(&sig_key) { new_attestations.push(attestation.clone()); included_keys.insert(sig_key); } @@ -894,20 +1020,16 @@ fn build_block( // Fixed point reached: no new attestations 
found if new_attestations.is_empty() { - break post_state; + break; } // Add new attestations and continue iteration included_attestations.extend(new_attestations); - }; + } - // Compute the aggregated signatures for the attestations. - let (aggregated_attestations, aggregated_signatures) = compute_aggregated_signatures( - post_state.validators.iter().as_slice(), - &included_attestations, - gossip_signatures, - aggregated_payloads, - )?; + // Select existing proofs for the attestations to include in the block. + let (aggregated_attestations, aggregated_signatures) = + select_aggregated_proofs(&included_attestations, aggregated_payloads)?; let attestations: AggregatedAttestations = aggregated_attestations .try_into() @@ -931,15 +1053,15 @@ fn build_block( Ok((final_block, post_state, aggregated_signatures)) } -/// Compute aggregated signatures for a set of attestations. +/// Select existing aggregated proofs for attestations to include in a block. /// -/// The result is a list of (attestation, proof) pairs ready for block inclusion. -/// This list might contain attestations with the same data but different signatures. -/// Once we support recursive aggregation, we can aggregate these further. -fn compute_aggregated_signatures( - validators: &[Validator], +/// Fresh gossip aggregation happens at interval 2 (`aggregate_committee_signatures`). +/// This function only selects from existing proofs in the `LatestKnownAggregatedPayloads` table +/// (proofs from previously received blocks and promoted gossip aggregations). +/// +/// Returns a list of (attestation, proof) pairs ready for block inclusion. 
+fn select_aggregated_proofs( attestations: &[Attestation], - gossip_signatures: &HashMap, aggregated_payloads: &HashMap>, ) -> Result<(Vec, Vec), StoreError> { let mut results = vec![]; @@ -949,51 +1071,14 @@ fn compute_aggregated_signatures( let message = data.tree_hash_root(); let validator_ids = aggregation_bits_to_validator_indices(&aggregated.aggregation_bits); + let mut remaining: HashSet = validator_ids.into_iter().collect(); - // Phase 1: Gossip Collection - // Try to aggregate fresh signatures from the network. - let mut gossip_sigs = vec![]; - let mut gossip_keys = vec![]; - let mut gossip_ids = vec![]; - - let mut remaining = HashSet::new(); - - for vid in &validator_ids { - let key = (*vid, message); - if let Some(sig) = gossip_signatures.get(&key) { - let pubkey = validators[*vid as usize] - .get_pubkey() - .expect("valid pubkey"); - - gossip_sigs.push(sig.clone()); - gossip_keys.push(pubkey); - gossip_ids.push(*vid); - } else { - remaining.insert(*vid); - } - } - - if !gossip_ids.is_empty() { - let participants = aggregation_bits_from_validator_indices(&gossip_ids); - let proof_data = - aggregate_signatures(gossip_keys, gossip_sigs, &message, data.slot as u32) - .map_err(StoreError::SignatureAggregationFailed)?; - let aggregated_attestation = AggregatedAttestation { - aggregation_bits: participants.clone(), - data: data.clone(), + // Select existing proofs that cover the most remaining validators + while !remaining.is_empty() { + let Some(&target_id) = remaining.iter().next() else { + break; }; - let aggregate_proof = AggregatedSignatureProof::new(participants, proof_data); - results.push((aggregated_attestation, aggregate_proof)); - metrics::inc_pq_sig_aggregated_signatures(); - metrics::inc_pq_sig_attestations_in_aggregated_signatures(gossip_ids.len() as u64); - } - - // Phase 2: Fallback to existing proofs - // We might have seen proofs for missing signatures in previously-received blocks. 
- while !aggregated_payloads.is_empty() - && let Some(&target_id) = remaining.iter().next() - { let Some(candidates) = aggregated_payloads .get(&(target_id, message)) .filter(|v| !v.is_empty()) @@ -1013,10 +1098,11 @@ fn compute_aggregated_signatures( .max_by_key(|(_, covered)| covered.len()) .expect("candidates is not empty"); - // Sanity check: ensure proof covers at least one remaining validator + // No proof covers any remaining validator if covered.is_empty() { break; } + let aggregate = AggregatedAttestation { aggregation_bits: proof.participants.clone(), data: data.clone(), diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index 092f4165..bdce14fb 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -5,14 +5,35 @@ use std::{ }; use ethlambda_blockchain::{SECONDS_PER_SLOT, store}; -use ethlambda_storage::{Store, backend::InMemoryBackend}; +use ethlambda_storage::{SignatureKey, Store, StoredAggregatedPayload, backend::InMemoryBackend}; use ethlambda_types::{ - attestation::Attestation, + attestation::{Attestation, AttestationData}, block::{Block, BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation}, primitives::{H256, VariableList, ssz::TreeHash}, state::State, }; +/// Extract per-validator attestation data from aggregated payloads. +/// Test helper that mirrors the private function in blockchain::store. 
+fn extract_attestations( + store: &Store, + payloads: impl Iterator)>, +) -> HashMap { + let mut result: HashMap = HashMap::new(); + for ((validator_id, data_root), _) in payloads { + let Some(data) = store.get_attestation_data_by_root(&data_root) else { + continue; + }; + let should_update = result + .get(&validator_id) + .is_none_or(|existing| existing.slot < data.slot); + if should_update { + result.insert(validator_id, data); + } + } + result +} + use crate::types::{ForkChoiceTestVector, StoreChecks}; const SUPPORTED_FIXTURE_FORMAT: &str = "fork_choice_test"; @@ -58,11 +79,11 @@ fn run(path: &Path) -> datatest_stable::Result<()> { let signed_block = build_signed_block(block_data); - let block_time = - signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time; + let block_time_ms = + (signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time) * 1000; // NOTE: the has_proposal argument is set to true, following the spec - store::on_tick(&mut store, block_time, true); + store::on_tick(&mut store, block_time_ms, true, false); let result = store::on_block_without_verification(&mut store, signed_block); match (result.is_ok(), step.valid) { @@ -85,9 +106,9 @@ fn run(path: &Path) -> datatest_stable::Result<()> { } } "tick" => { - let timestamp = step.time.expect("tick step missing time"); + let timestamp_ms = step.time.expect("tick step missing time") * 1000; // NOTE: the has_proposal argument is set to false, following the spec - store::on_tick(&mut store, timestamp, false); + store::on_tick(&mut store, timestamp_ms, false, false); } other => { // Fail for unsupported step types for now @@ -280,14 +301,12 @@ fn validate_attestation_check( check: &types::AttestationCheck, step_idx: usize, ) -> datatest_stable::Result<()> { - use ethlambda_types::attestation::AttestationData; - let validator_id = check.validator; let location = check.location.as_str(); let attestations: HashMap = match location { - "new" => st.iter_new_attestations().collect(), - 
"known" => st.iter_known_attestations().collect(), + "new" => extract_attestations(st, st.iter_new_aggregated_payloads()), + "known" => extract_attestations(st, st.iter_known_aggregated_payloads()), other => { return Err( format!("Step {}: unknown attestation location: {}", step_idx, other).into(), @@ -367,7 +386,8 @@ fn validate_lexicographic_head_among( } let blocks = st.get_live_chain(); - let known_attestations: HashMap = st.iter_known_attestations().collect(); + let known_attestations: HashMap = + extract_attestations(st, st.iter_known_aggregated_payloads()); // Resolve all fork labels to roots and compute their weights // Map: label -> (root, slot, weight) diff --git a/crates/blockchain/tests/signature_spectests.rs b/crates/blockchain/tests/signature_spectests.rs index 78940d4a..16e4de4f 100644 --- a/crates/blockchain/tests/signature_spectests.rs +++ b/crates/blockchain/tests/signature_spectests.rs @@ -50,8 +50,9 @@ fn run(path: &Path) -> datatest_stable::Result<()> { let signed_block: SignedBlockWithAttestation = test.signed_block_with_attestation.into(); // Advance time to the block's slot - let block_time = signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time; - store::on_tick(&mut st, block_time, true); + let block_time_ms = + (signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time) * 1000; + store::on_tick(&mut st, block_time_ms, true, false); // Process the block (this includes signature verification) let result = store::on_block(&mut st, signed_block); @@ -87,6 +88,6 @@ fn run(path: &Path) -> datatest_stable::Result<()> { datatest_stable::harness!({ test = run, - root = "../../../ethlambda/leanSpec/fixtures/consensus/verify_signatures", + root = "../../leanSpec/fixtures/consensus/verify_signatures", pattern = r".*\.json" }); diff --git a/crates/blockchain/tests/signature_types.rs b/crates/blockchain/tests/signature_types.rs index b52e2b32..079f7672 100644 --- a/crates/blockchain/tests/signature_types.rs +++ 
b/crates/blockchain/tests/signature_types.rs @@ -4,41 +4,10 @@ use ethlambda_types::block::{ AggregatedSignatureProof, AttestationSignatures, BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation, }; -use ethlambda_types::primitives::ssz::{Decode as SszDecode, Encode as SszEncode}; use serde::Deserialize; -use ssz_types::FixedVector; -use ssz_types::typenum::{U28, U32}; use std::collections::HashMap; use std::path::Path; -// ============================================================================ -// SSZ Types matching leansig's GeneralizedXMSSSignature structure -// ============================================================================ - -/// A single hash digest (8 field elements = 32 bytes) -pub type HashDigest = FixedVector; - -/// Randomness (7 field elements = 28 bytes) -pub type Rho = FixedVector; - -/// SSZ-compatible HashTreeOpening matching leansig's structure -#[derive(Clone, SszEncode, SszDecode)] -pub struct SszHashTreeOpening { - pub co_path: Vec, -} - -/// SSZ-compatible XMSS Signature matching leansig's GeneralizedXMSSSignature -#[derive(Clone, SszEncode, SszDecode)] -pub struct SszXmssSignature { - pub path: SszHashTreeOpening, - pub rho: Rho, - pub hashes: Vec, -} - -// ============================================================================ -// Root Structures -// ============================================================================ - /// Root struct for verify signatures test vectors #[derive(Debug, Clone, Deserialize)] pub struct VerifySignaturesTestVector { @@ -92,7 +61,7 @@ impl From for SignedBlockWithAttestation { proposer_attestation: value.message.proposer_attestation.into(), }; - let proposer_signature = value.signature.proposer_signature.to_xmss_signature(); + let proposer_signature = value.signature.proposer_signature; // Convert attestation signatures to AggregatedSignatureProof. // Each proof contains the participants bitfield from the test data. 
@@ -139,110 +108,12 @@ pub struct TestBlockWithAttestation { #[derive(Debug, Clone, Deserialize)] #[allow(dead_code)] pub struct TestSignatureBundle { - #[serde(rename = "proposerSignature")] - pub proposer_signature: ProposerSignature, + #[serde(rename = "proposerSignature", deserialize_with = "deser_xmss_hex")] + pub proposer_signature: XmssSignature, #[serde(rename = "attestationSignatures")] pub attestation_signatures: Container, } -/// XMSS signature structure as it appears in JSON -#[derive(Debug, Clone, Deserialize)] -pub struct ProposerSignature { - pub path: SignaturePath, - pub rho: RhoData, - pub hashes: HashesData, -} - -impl ProposerSignature { - /// Convert to XmssSignature (FixedVector of bytes). - /// - /// Constructs an SSZ-encoded signature matching leansig's GeneralizedXMSSSignature format. - pub fn to_xmss_signature(&self) -> XmssSignature { - // Build SSZ types from JSON data - let ssz_sig = self.to_ssz_signature(); - - // Encode to SSZ bytes - let bytes = ssz_sig.as_ssz_bytes(); - - // Pad to exactly SignatureSize bytes (3112) - let sig_size = 3112; - let mut padded = bytes.clone(); - padded.resize(sig_size, 0); - - XmssSignature::new(padded).expect("signature size mismatch") - } - - /// Convert to SSZ signature type - fn to_ssz_signature(&self) -> SszXmssSignature { - // Convert path siblings to HashDigest (Vec of 32 bytes each) - let co_path: Vec = self - .path - .siblings - .data - .iter() - .map(|sibling| { - let bytes: Vec = sibling - .data - .iter() - .flat_map(|&val| val.to_le_bytes()) - .collect(); - HashDigest::new(bytes).expect("Invalid sibling length") - }) - .collect(); - - // Convert rho (7 field elements = 28 bytes) - let rho_bytes: Vec = self - .rho - .data - .iter() - .flat_map(|&val| val.to_le_bytes()) - .collect(); - let rho = Rho::new(rho_bytes).expect("Invalid rho length"); - - // Convert hashes to HashDigest - let hashes: Vec = self - .hashes - .data - .iter() - .map(|hash| { - let bytes: Vec = hash - .data - .iter() - 
.flat_map(|&val| val.to_le_bytes()) - .collect(); - HashDigest::new(bytes).expect("Invalid hash length") - }) - .collect(); - - SszXmssSignature { - path: SszHashTreeOpening { co_path }, - rho, - hashes, - } - } -} - -#[derive(Debug, Clone, Deserialize)] -pub struct SignaturePath { - pub siblings: Container, -} - -#[derive(Debug, Clone, Deserialize)] -#[allow(dead_code)] -pub struct HashElement { - pub data: [u32; 8], -} - -#[derive(Debug, Clone, Deserialize)] -pub struct RhoData { - pub data: [u32; 7], -} - -#[derive(Debug, Clone, Deserialize)] -pub struct HashesData { - pub data: Vec, -} - /// Attestation signature from a validator /// Note: proofData is for future SNARK aggregation, currently just placeholder #[derive(Debug, Clone, Deserialize)] @@ -258,3 +129,19 @@ pub struct AttestationSignature { pub struct ProofData { pub data: String, } + +// ============================================================================ +// Helpers +// ============================================================================ + +pub fn deser_xmss_hex<'de, D>(d: D) -> Result +where + D: serde::Deserializer<'de>, +{ + use serde::de::Error; + + let value = String::deserialize(d)?; + let bytes = hex::decode(value.strip_prefix("0x").unwrap_or(&value)) + .map_err(|_| D::Error::custom("XmssSignature value is not valid hex"))?; + XmssSignature::new(bytes).map_err(|_| D::Error::custom("XmssSignature length != 3112")) +} diff --git a/crates/common/types/src/attestation.rs b/crates/common/types/src/attestation.rs index 309901a2..6390a2bb 100644 --- a/crates/common/types/src/attestation.rs +++ b/crates/common/types/src/attestation.rs @@ -1,4 +1,5 @@ use crate::{ + block::AggregatedSignatureProof, primitives::ssz::{Decode, Encode, TreeHash}, signature::SignatureSize, state::{Checkpoint, ValidatorRegistryLimit}, @@ -61,3 +62,10 @@ pub struct AggregatedAttestation { /// A general-purpose bitfield for tracking which validators have participated /// in some collective action (attestation, 
signature aggregation, etc.). pub type AggregationBits = ssz_types::BitList; + +/// Aggregated attestation with its signature proof, used for gossip on the aggregation topic. +#[derive(Debug, Clone, Encode, Decode)] +pub struct SignedAggregatedAttestation { + pub data: AttestationData, + pub proof: AggregatedSignatureProof, +} diff --git a/crates/common/types/src/block.rs b/crates/common/types/src/block.rs index 634c7953..3c152b52 100644 --- a/crates/common/types/src/block.rs +++ b/crates/common/types/src/block.rs @@ -102,6 +102,13 @@ impl AggregatedSignatureProof { proof_data: ByteList::empty(), } } + + /// Returns the validator indices that are set in the participants bitfield. + pub fn participant_indices(&self) -> impl Iterator + '_ { + (0..self.participants.len()) + .filter(|&i| self.participants.get(i).unwrap_or(false)) + .map(|i| i as u64) + } } /// Bundle containing a block and the proposer's attestation. diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs index 9335270e..a52b65f2 100644 --- a/crates/net/p2p/src/gossipsub/handler.rs +++ b/crates/net/p2p/src/gossipsub/handler.rs @@ -1,6 +1,6 @@ use ethlambda_types::{ ShortRoot, - attestation::SignedAttestation, + attestation::{SignedAggregatedAttestation, SignedAttestation}, block::SignedBlockWithAttestation, primitives::ssz::{Decode, Encode, TreeHash}, }; @@ -9,7 +9,7 @@ use tracing::{error, info, trace}; use super::{ encoding::{compress_message, decompress_message}, - messages::{ATTESTATION_TOPIC_KIND, BLOCK_TOPIC_KIND}, + messages::{AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND}, }; use crate::P2PServer; @@ -22,7 +22,8 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { else { unreachable!("we already matched on event_loop"); }; - match message.topic.as_str().split("/").nth(3) { + let topic_kind = message.topic.as_str().split("/").nth(3); + match topic_kind { Some(BLOCK_TOPIC_KIND) => { let 
Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped block")) @@ -50,7 +51,33 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { ); server.blockchain.notify_new_block(signed_block).await; } - Some(ATTESTATION_TOPIC_KIND) => { + Some(AGGREGATION_TOPIC_KIND) => { + let Ok(uncompressed_data) = decompress_message(&message.data) + .inspect_err(|err| error!(%err, "Failed to decompress gossipped aggregation")) + else { + return; + }; + + let Ok(aggregation) = SignedAggregatedAttestation::from_ssz_bytes(&uncompressed_data) + .inspect_err(|err| error!(?err, "Failed to decode gossipped aggregation")) + else { + return; + }; + let slot = aggregation.data.slot; + info!( + %slot, + target_slot = aggregation.data.target.slot, + target_root = %ShortRoot(&aggregation.data.target.root.0), + source_slot = aggregation.data.source.slot, + source_root = %ShortRoot(&aggregation.data.source.root.0), + "Received aggregated attestation from gossip" + ); + server + .blockchain + .notify_new_aggregated_attestation(aggregation) + .await; + } + Some(kind) if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) => { let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped attestation")) else { @@ -95,7 +122,7 @@ pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAtte // Compress with raw snappy let compressed = compress_message(&ssz_bytes); - // Publish to gossipsub + // Publish to the attestation subnet topic let _ = server .swarm .behaviour_mut() @@ -148,3 +175,36 @@ pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlockWith |err| tracing::warn!(%slot, %proposer, %err, "Failed to publish block to gossipsub"), ); } + +pub async fn publish_aggregated_attestation( + server: &mut P2PServer, + attestation: SignedAggregatedAttestation, +) { + let slot = attestation.data.slot; + + // Encode 
to SSZ + let ssz_bytes = attestation.as_ssz_bytes(); + + // Compress with raw snappy + let compressed = compress_message(&ssz_bytes); + + // Publish to the aggregation topic + let _ = server + .swarm + .behaviour_mut() + .gossipsub + .publish(server.aggregation_topic.clone(), compressed) + .inspect(|_| { + info!( + %slot, + target_slot = attestation.data.target.slot, + target_root = %ShortRoot(&attestation.data.target.root.0), + source_slot = attestation.data.source.slot, + source_root = %ShortRoot(&attestation.data.source.root.0), + "Published aggregated attestation to gossipsub" + ) + }) + .inspect_err(|err| { + tracing::warn!(%slot, %err, "Failed to publish aggregated attestation to gossipsub") + }); +} diff --git a/crates/net/p2p/src/gossipsub/messages.rs b/crates/net/p2p/src/gossipsub/messages.rs index 7df4db14..2b31eb6a 100644 --- a/crates/net/p2p/src/gossipsub/messages.rs +++ b/crates/net/p2p/src/gossipsub/messages.rs @@ -1,4 +1,10 @@ /// Topic kind for block gossip pub const BLOCK_TOPIC_KIND: &str = "block"; -/// Topic kind for attestation gossip -pub const ATTESTATION_TOPIC_KIND: &str = "attestation"; +/// Topic kind prefix for per-committee attestation subnets. +/// +/// Full topic format: `/leanconsensus/{network}/attestation_{subnet_id}/ssz_snappy` +pub const ATTESTATION_SUBNET_TOPIC_PREFIX: &str = "attestation"; +/// Topic kind for aggregated attestation gossip. 
+/// +/// Full topic format: `/leanconsensus/{network}/aggregation/ssz_snappy` +pub const AGGREGATION_TOPIC_KIND: &str = "aggregation"; diff --git a/crates/net/p2p/src/gossipsub/mod.rs b/crates/net/p2p/src/gossipsub/mod.rs index a66855ea..a6924286 100644 --- a/crates/net/p2p/src/gossipsub/mod.rs +++ b/crates/net/p2p/src/gossipsub/mod.rs @@ -2,5 +2,7 @@ mod encoding; mod handler; mod messages; -pub use handler::{handle_gossipsub_message, publish_attestation, publish_block}; -pub use messages::{ATTESTATION_TOPIC_KIND, BLOCK_TOPIC_KIND}; +pub use handler::{ + handle_gossipsub_message, publish_aggregated_attestation, publish_attestation, publish_block, +}; +pub use messages::{AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND}; diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index b83196f0..2ebec1d3 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -24,7 +24,10 @@ use tokio::sync::mpsc; use tracing::{info, trace, warn}; use crate::{ - gossipsub::{ATTESTATION_TOPIC_KIND, BLOCK_TOPIC_KIND, publish_attestation, publish_block}, + gossipsub::{ + AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND, + publish_aggregated_attestation, publish_attestation, publish_block, + }, req_resp::{ BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, MAX_COMPRESSED_PAYLOAD_SIZE, Request, STATUS_PROTOCOL_V1, build_status, fetch_block_from_peer, @@ -53,6 +56,7 @@ pub(crate) struct PendingRequest { pub(crate) last_peer: Option, } +#[allow(clippy::too_many_arguments)] pub async fn start_p2p( node_key: Vec, bootnodes: Vec, @@ -60,7 +64,10 @@ pub async fn start_p2p( blockchain: BlockChain, p2p_rx: mpsc::UnboundedReceiver, store: Store, -) { + validator_id: Option, + attestation_committee_count: u64, + is_aggregator: bool, +) -> Result<(), libp2p::gossipsub::SubscriptionError> { let config = libp2p::gossipsub::ConfigBuilder::default() // d .mesh_n(8) @@ -81,9 +88,7 @@ pub async fn start_p2p( // Taken from ream 
.max_transmit_size(MAX_COMPRESSED_PAYLOAD_SIZE) .max_messages_per_rpc(Some(500)) - .validate_messages() .allow_self_origin(true) - .flood_publish(false) .idontwant_message_size_threshold(1000) .build() .expect("invalid gossipsub config"); @@ -152,20 +157,47 @@ pub async fn start_p2p( .expect("failed to bind gossipsub listening address"); let network = "devnet0"; - let topic_kinds = [BLOCK_TOPIC_KIND, ATTESTATION_TOPIC_KIND]; - for topic_kind in topic_kinds { - let topic_str = format!("/leanconsensus/{network}/{topic_kind}/ssz_snappy"); - let topic = libp2p::gossipsub::IdentTopic::new(topic_str); - swarm.behaviour_mut().gossipsub.subscribe(&topic).unwrap(); - } - // Create topics for outbound messages - let attestation_topic = libp2p::gossipsub::IdentTopic::new(format!( - "/leanconsensus/{network}/{ATTESTATION_TOPIC_KIND}/ssz_snappy" - )); - let block_topic = libp2p::gossipsub::IdentTopic::new(format!( - "/leanconsensus/{network}/{BLOCK_TOPIC_KIND}/ssz_snappy" - )); + // Subscribe to block topic (all nodes) + let block_topic_str = format!("/leanconsensus/{network}/{BLOCK_TOPIC_KIND}/ssz_snappy"); + let block_topic = libp2p::gossipsub::IdentTopic::new(block_topic_str); + swarm + .behaviour_mut() + .gossipsub + .subscribe(&block_topic) + .unwrap(); + + // Subscribe to aggregation topic (all validators) + let aggregation_topic_str = + format!("/leanconsensus/{network}/{AGGREGATION_TOPIC_KIND}/ssz_snappy"); + let aggregation_topic = libp2p::gossipsub::IdentTopic::new(aggregation_topic_str); + swarm + .behaviour_mut() + .gossipsub + .subscribe(&aggregation_topic) + .unwrap(); + + // Build attestation subnet topic (needed for publishing even without subscribing) + // attestation_committee_count is validated to be >= 1 by clap at CLI parse time. 
+ let subnet_id = validator_id.map(|vid| vid % attestation_committee_count); + let attestation_topic_kind = match subnet_id { + Some(id) => format!("{ATTESTATION_SUBNET_TOPIC_PREFIX}_{id}"), + // Non-validators use subnet 0 for publishing + None => format!("{ATTESTATION_SUBNET_TOPIC_PREFIX}_0"), + }; + let attestation_topic_str = + format!("/leanconsensus/{network}/{attestation_topic_kind}/ssz_snappy"); + let attestation_topic = libp2p::gossipsub::IdentTopic::new(attestation_topic_str); + + // Only aggregators subscribe to attestation subnets; non-aggregators + // publish via gossipsub's fanout mechanism without subscribing. + if is_aggregator { + swarm + .behaviour_mut() + .gossipsub + .subscribe(&attestation_topic)?; + info!(%attestation_topic_kind, "Subscribed to attestation subnet"); + } info!(socket=%listening_socket, "P2P node started"); @@ -178,6 +210,7 @@ pub async fn start_p2p( p2p_rx, attestation_topic, block_topic, + aggregation_topic, connected_peers: HashSet::new(), pending_requests: HashMap::new(), request_id_map: HashMap::new(), @@ -187,6 +220,7 @@ pub async fn start_p2p( }; event_loop(server).await; + Ok(()) } /// [libp2p Behaviour](libp2p::swarm::NetworkBehaviour) combining Gossipsub and Request-Response Behaviours @@ -203,6 +237,7 @@ pub(crate) struct P2PServer { pub(crate) p2p_rx: mpsc::UnboundedReceiver, pub(crate) attestation_topic: libp2p::gossipsub::IdentTopic, pub(crate) block_topic: libp2p::gossipsub::IdentTopic, + pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic, pub(crate) connected_peers: HashSet, pub(crate) pending_requests: HashMap, pub(crate) request_id_map: HashMap, @@ -371,6 +406,9 @@ async fn handle_p2p_message(server: &mut P2PServer, message: P2PMessage) { P2PMessage::PublishBlock(signed_block) => { publish_block(server, signed_block).await; } + P2PMessage::PublishAggregatedAttestation(attestation) => { + publish_aggregated_attestation(server, attestation).await; + } P2PMessage::FetchBlock(root) => { // Deduplicate - if 
already pending, ignore if server.pending_requests.contains_key(&root) { diff --git a/crates/net/p2p/src/metrics.rs b/crates/net/p2p/src/metrics.rs index 9f24f992..00224d74 100644 --- a/crates/net/p2p/src/metrics.rs +++ b/crates/net/p2p/src/metrics.rs @@ -17,7 +17,7 @@ static NODE_NAME_REGISTRY: LazyLock>> = pub fn populate_name_registry(names_and_privkeys: HashMap) { let mut registry = NODE_NAME_REGISTRY.write().unwrap(); - let name_registry = names_and_privkeys + *registry = names_and_privkeys .into_iter() .filter_map(|(name, mut privkey)| { let Ok(privkey) = secp256k1::SecretKey::try_from_bytes(&mut privkey) else { @@ -31,7 +31,6 @@ pub fn populate_name_registry(names_and_privkeys: HashMap) { Some((peer_id, &*name.leak())) }) .collect(); - *registry = name_registry; } fn resolve(peer_id: &Option) -> &'static str { diff --git a/crates/net/p2p/src/req_resp/codec.rs b/crates/net/p2p/src/req_resp/codec.rs index 70d38a56..0a8a0fd6 100644 --- a/crates/net/p2p/src/req_resp/codec.rs +++ b/crates/net/p2p/src/req_resp/codec.rs @@ -7,12 +7,13 @@ use tracing::{debug, trace}; use super::{ encoding::{decode_payload, write_payload}, messages::{ - BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, ErrorMessage, Request, Response, - ResponseCode, ResponsePayload, STATUS_PROTOCOL_V1, Status, + BLOCKS_BY_ROOT_PROTOCOL_V1, ErrorMessage, Request, Response, ResponseCode, ResponsePayload, + STATUS_PROTOCOL_V1, Status, }, }; use ethlambda_types::block::SignedBlockWithAttestation; +use ethlambda_types::primitives::ssz::Decode as SszDecode; #[derive(Debug, Clone, Default)] pub struct Codec; @@ -41,10 +42,9 @@ impl libp2p::request_response::Codec for Codec { Ok(Request::Status(status)) } BLOCKS_BY_ROOT_PROTOCOL_V1 => { - let request = - BlocksByRootRequest::from_ssz_bytes_compat(&payload).map_err(|err| { - io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) - })?; + let request = SszDecode::from_ssz_bytes(&payload).map_err(|err| { + io::Error::new(io::ErrorKind::InvalidData, 
format!("{err:?}")) + })?; Ok(Request::BlocksByRoot(request)) } _ => Err(io::Error::new( diff --git a/crates/net/p2p/src/req_resp/messages.rs b/crates/net/p2p/src/req_resp/messages.rs index 3826f5b3..607c918c 100644 --- a/crates/net/p2p/src/req_resp/messages.rs +++ b/crates/net/p2p/src/req_resp/messages.rs @@ -2,7 +2,7 @@ use ethlambda_types::{ block::SignedBlockWithAttestation, primitives::{ H256, - ssz::{Decode, Decode as SszDecode, Encode}, + ssz::{Decode, Encode}, }, state::Checkpoint, }; @@ -139,55 +139,3 @@ pub fn error_message(msg: impl AsRef) -> ErrorMessage { pub struct BlocksByRootRequest { pub roots: RequestedBlockRoots, } - -impl BlocksByRootRequest { - /// Decode from SSZ bytes with backward compatibility. - /// - /// Tries to decode as new format (container with `roots` field) first. - /// Falls back to old format (transparent - direct list of roots) if that fails. - pub fn from_ssz_bytes_compat(bytes: &[u8]) -> Result { - // Try new format (container) first - SszDecode::from_ssz_bytes(bytes).or_else(|_| { - // Fall back to old format (transparent/direct list) - SszDecode::from_ssz_bytes(bytes).map(|roots| Self { roots }) - }) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use ssz::Encode as SszEncode; - - #[test] - fn test_blocks_by_root_backward_compatibility() { - // Create some test roots - let root1 = H256::from_slice(&[1u8; 32]); - let root2 = H256::from_slice(&[2u8; 32]); - let roots_list = - RequestedBlockRoots::new(vec![root1, root2]).expect("Failed to create roots list"); - - // Encode as old format (direct list, similar to transparent) - let old_format_bytes = roots_list.as_ssz_bytes(); - - // Encode as new format (container) - let new_request = BlocksByRootRequest { - roots: roots_list.clone(), - }; - let new_format_bytes = new_request.as_ssz_bytes(); - - // Both formats should decode successfully - let decoded_from_old = BlocksByRootRequest::from_ssz_bytes_compat(&old_format_bytes) - .expect("Failed to decode old format"); - let 
decoded_from_new = BlocksByRootRequest::from_ssz_bytes_compat(&new_format_bytes) - .expect("Failed to decode new format"); - - // Both should have the same roots - assert_eq!(decoded_from_old.roots.len(), 2); - assert_eq!(decoded_from_new.roots.len(), 2); - assert_eq!(decoded_from_old.roots[0], root1); - assert_eq!(decoded_from_old.roots[1], root2); - assert_eq!(decoded_from_new.roots[0], root1); - assert_eq!(decoded_from_new.roots[1], root2); - } -} diff --git a/crates/net/rpc/Cargo.toml b/crates/net/rpc/Cargo.toml index b6705ac5..65690623 100644 --- a/crates/net/rpc/Cargo.toml +++ b/crates/net/rpc/Cargo.toml @@ -12,6 +12,7 @@ version.workspace = true [dependencies] axum = "0.8.1" tokio.workspace = true +ethlambda-fork-choice.workspace = true ethlambda-metrics.workspace = true tracing.workspace = true ethlambda-storage.workspace = true diff --git a/crates/net/rpc/src/fork_choice.rs b/crates/net/rpc/src/fork_choice.rs new file mode 100644 index 00000000..29d21448 --- /dev/null +++ b/crates/net/rpc/src/fork_choice.rs @@ -0,0 +1,209 @@ +use axum::{http::HeaderValue, http::header, response::IntoResponse}; +use ethlambda_storage::Store; +use ethlambda_types::primitives::H256; +use serde::Serialize; + +use crate::json_response; + +const HTML_CONTENT_TYPE: &str = "text/html; charset=utf-8"; +const FORK_CHOICE_HTML: &str = include_str!("../static/fork_choice.html"); + +#[derive(Serialize)] +pub struct ForkChoiceResponse { + nodes: Vec, + head: H256, + justified: CheckpointInfo, + finalized: CheckpointInfo, + safe_target: H256, + validator_count: u64, +} + +#[derive(Serialize)] +pub struct ForkChoiceNode { + root: H256, + slot: u64, + parent_root: H256, + proposer_index: u64, + weight: u64, +} + +#[derive(Serialize)] +pub struct CheckpointInfo { + root: H256, + slot: u64, +} + +pub async fn get_fork_choice( + axum::extract::State(store): axum::extract::State, +) -> impl IntoResponse { + let blocks = store.get_live_chain(); + let attestations = 
store.extract_latest_known_attestations(); + + let justified = store.latest_justified(); + let finalized = store.latest_finalized(); + let start_slot = finalized.slot; + + let weights = ethlambda_fork_choice::compute_block_weights(start_slot, &blocks, &attestations); + + let head = store.head(); + let safe_target = store.safe_target(); + + let head_state = store.head_state(); + let validator_count = head_state.validators.len() as u64; + + let nodes: Vec = blocks + .iter() + .map(|(root, &(slot, parent_root))| { + let proposer_index = store + .get_block_header(root) + .map(|h| h.proposer_index) + .unwrap_or(0); + + ForkChoiceNode { + root: *root, + slot, + parent_root, + proposer_index, + weight: weights.get(root).copied().unwrap_or(0), + } + }) + .collect(); + + let response = ForkChoiceResponse { + nodes, + head, + justified: CheckpointInfo { + root: justified.root, + slot: justified.slot, + }, + finalized: CheckpointInfo { + root: finalized.root, + slot: finalized.slot, + }, + safe_target, + validator_count, + }; + + json_response(response) +} + +pub async fn get_fork_choice_ui() -> impl IntoResponse { + let mut response = FORK_CHOICE_HTML.into_response(); + response.headers_mut().insert( + header::CONTENT_TYPE, + HeaderValue::from_static(HTML_CONTENT_TYPE), + ); + response +} + +#[cfg(test)] +mod tests { + use super::*; + use axum::{Router, body::Body, http::Request, http::StatusCode, routing::get}; + use ethlambda_storage::{Store, backend::InMemoryBackend}; + use ethlambda_types::{ + block::{BlockBody, BlockHeader}, + primitives::ssz::TreeHash, + state::{ChainConfig, Checkpoint, JustificationValidators, JustifiedSlots, State}, + }; + use http_body_util::BodyExt; + use std::sync::Arc; + use tower::ServiceExt; + + fn create_test_state() -> State { + let genesis_header = BlockHeader { + slot: 0, + proposer_index: 0, + parent_root: H256::ZERO, + state_root: H256::ZERO, + body_root: BlockBody::default().tree_hash_root(), + }; + + State { + config: ChainConfig { 
genesis_time: 1000 }, + slot: 0, + latest_block_header: genesis_header, + latest_justified: Checkpoint::default(), + latest_finalized: Checkpoint::default(), + historical_block_hashes: Default::default(), + justified_slots: JustifiedSlots::with_capacity(0).unwrap(), + validators: Default::default(), + justifications_roots: Default::default(), + justifications_validators: JustificationValidators::with_capacity(0).unwrap(), + } + } + + fn build_test_router(store: Store) -> Router { + Router::new() + .route("/lean/v0/fork_choice", get(get_fork_choice)) + .route("/lean/v0/fork_choice/ui", get(get_fork_choice_ui)) + .with_state(store) + } + + #[tokio::test] + async fn test_get_fork_choice_returns_json() { + let state = create_test_state(); + let backend = Arc::new(InMemoryBackend::new()); + let store = Store::from_anchor_state(backend, state); + + let app = build_test_router(store); + + let response = app + .oneshot( + Request::builder() + .uri("/lean/v0/fork_choice") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + assert_eq!( + response.headers().get(header::CONTENT_TYPE).unwrap(), + crate::JSON_CONTENT_TYPE + ); + + let body = response.into_body().collect().await.unwrap().to_bytes(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + + assert!(json["nodes"].is_array()); + assert!(json["head"].is_string()); + assert!(json["justified"]["root"].is_string()); + assert!(json["justified"]["slot"].is_number()); + assert!(json["finalized"]["root"].is_string()); + assert!(json["finalized"]["slot"].is_number()); + assert!(json["safe_target"].is_string()); + assert!(json["validator_count"].is_number()); + } + + #[tokio::test] + async fn test_get_fork_choice_ui_returns_html() { + let state = create_test_state(); + let backend = Arc::new(InMemoryBackend::new()); + let store = Store::from_anchor_state(backend, state); + + let app = build_test_router(store); + + let response = app + .oneshot( + 
Request::builder() + .uri("/lean/v0/fork_choice/ui") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + assert_eq!( + response.headers().get(header::CONTENT_TYPE).unwrap(), + HTML_CONTENT_TYPE + ); + + let body = response.into_body().collect().await.unwrap().to_bytes(); + let html = String::from_utf8(body.to_vec()).unwrap(); + assert!(html.contains("")); + assert!(html.contains("d3")); + } +} diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs index e1f2b9e6..4b7e4c8b 100644 --- a/crates/net/rpc/src/lib.rs +++ b/crates/net/rpc/src/lib.rs @@ -7,6 +7,7 @@ use ethlambda_types::primitives::ssz::Encode; pub(crate) const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8"; pub(crate) const SSZ_CONTENT_TYPE: &str = "application/octet-stream"; +mod fork_choice; pub mod metrics; pub async fn start_rpc_server(address: SocketAddr, store: Store) -> Result<(), std::io::Error> { @@ -29,6 +30,11 @@ fn build_api_router(store: Store) -> Router { "/lean/v0/checkpoints/justified", get(get_latest_justified_state), ) + .route("/lean/v0/fork_choice", get(fork_choice::get_fork_choice)) + .route( + "/lean/v0/fork_choice/ui", + get(fork_choice::get_fork_choice_ui), + ) .with_state(store) } diff --git a/crates/net/rpc/static/fork_choice.html b/crates/net/rpc/static/fork_choice.html new file mode 100644 index 00000000..43d11a1a --- /dev/null +++ b/crates/net/rpc/static/fork_choice.html @@ -0,0 +1,495 @@ + + + + + + ethlambda Fork Choice + + + + +
+
+ Head Slot + -- +
+
+ Justified Slot + -- +
+
+ Finalized Slot + -- +
+
+ Validators + -- +
+
+ +
+
Waiting for blocks...
+
+ +
+ + + + diff --git a/crates/storage/src/api/tables.rs b/crates/storage/src/api/tables.rs index deecdadb..7b184d59 100644 --- a/crates/storage/src/api/tables.rs +++ b/crates/storage/src/api/tables.rs @@ -12,14 +12,16 @@ pub enum Table { BlockSignatures, /// State storage: H256 -> State States, - /// Known attestations: u64 -> AttestationData - LatestKnownAttestations, - /// Pending attestations: u64 -> AttestationData - LatestNewAttestations, /// Gossip signatures: SignatureKey -> ValidatorSignature GossipSignatures, - /// Aggregated proofs: SignatureKey -> Vec - AggregatedPayloads, + /// Attestation data indexed by tree hash root: H256 -> AttestationData + AttestationDataByRoot, + /// Pending aggregated payloads (not yet active in fork choice): + /// SignatureKey -> Vec + LatestNewAggregatedPayloads, + /// Active aggregated payloads (counted in fork choice): + /// SignatureKey -> Vec + LatestKnownAggregatedPayloads, /// Metadata: string keys -> various scalar values Metadata, /// Live chain index: (slot || root) -> parent_root @@ -36,10 +38,10 @@ pub const ALL_TABLES: [Table; 10] = [ Table::BlockBodies, Table::BlockSignatures, Table::States, - Table::LatestKnownAttestations, - Table::LatestNewAttestations, Table::GossipSignatures, - Table::AggregatedPayloads, + Table::AttestationDataByRoot, + Table::LatestNewAggregatedPayloads, + Table::LatestKnownAggregatedPayloads, Table::Metadata, Table::LiveChain, ]; diff --git a/crates/storage/src/backend/rocksdb.rs b/crates/storage/src/backend/rocksdb.rs index c25b128b..45565f18 100644 --- a/crates/storage/src/backend/rocksdb.rs +++ b/crates/storage/src/backend/rocksdb.rs @@ -16,10 +16,10 @@ fn cf_name(table: Table) -> &'static str { Table::BlockBodies => "block_bodies", Table::BlockSignatures => "block_signatures", Table::States => "states", - Table::LatestKnownAttestations => "latest_known_attestations", - Table::LatestNewAttestations => "latest_new_attestations", Table::GossipSignatures => "gossip_signatures", - 
Table::AggregatedPayloads => "aggregated_payloads", + Table::AttestationDataByRoot => "attestation_data_by_root", + Table::LatestNewAggregatedPayloads => "latest_new_aggregated_payloads", + Table::LatestKnownAggregatedPayloads => "latest_known_aggregated_payloads", Table::Metadata => "metadata", Table::LiveChain => "live_chain", } diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 7f735282..76945021 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -13,8 +13,8 @@ use crate::types::{StoredAggregatedPayload, StoredSignature}; use ethlambda_types::{ attestation::AttestationData, block::{ - AggregatedSignatureProof, Block, BlockBody, BlockHeader, BlockSignaturesWithAttestation, - BlockWithAttestation, SignedBlockWithAttestation, + Block, BlockBody, BlockHeader, BlockSignaturesWithAttestation, BlockWithAttestation, + SignedBlockWithAttestation, }, primitives::{ H256, @@ -288,7 +288,11 @@ impl Store { // ============ Time ============ - /// Returns the current store time in seconds since genesis. + /// Returns the current store time in interval counts since genesis. + /// + /// Each increment represents one 800ms interval. 
Derive slot/interval as: + /// slot = time() / INTERVALS_PER_SLOT + /// interval = time() % INTERVALS_PER_SLOT pub fn time(&self) -> u64 { self.get_metadata(KEY_TIME) } @@ -369,13 +373,18 @@ impl Store { { let pruned_chain = self.prune_live_chain(finalized.slot); - // Prune signatures and payloads for finalized slots + // Prune signatures, payloads, and attestation data for finalized slots let pruned_sigs = self.prune_gossip_signatures(finalized.slot); - let pruned_payloads = self.prune_aggregated_payloads(finalized.slot); - if pruned_chain > 0 || pruned_sigs > 0 || pruned_payloads > 0 { + let pruned_att_data = self.prune_attestation_data_by_root(finalized.slot); + self.prune_aggregated_payload_table(Table::LatestNewAggregatedPayloads, finalized.slot); + self.prune_aggregated_payload_table( + Table::LatestKnownAggregatedPayloads, + finalized.slot, + ); + if pruned_chain > 0 || pruned_sigs > 0 || pruned_att_data > 0 { info!( finalized_slot = finalized.slot, - pruned_chain, pruned_sigs, pruned_payloads, "Pruned finalized data" + pruned_chain, pruned_sigs, pruned_att_data, "Pruned finalized data" ); } } @@ -482,24 +491,51 @@ impl Store { count } - /// Prune aggregated payloads for slots <= finalized_slot. + /// Prune attestation data by root for slots <= finalized_slot. /// - /// Returns the number of payloads pruned. - pub fn prune_aggregated_payloads(&mut self, finalized_slot: u64) -> usize { + /// Returns the number of entries pruned. 
+ pub fn prune_attestation_data_by_root(&mut self, finalized_slot: u64) -> usize { + let view = self.backend.begin_read().expect("read view"); + let mut to_delete = vec![]; + + for (key_bytes, value_bytes) in view + .prefix_iterator(Table::AttestationDataByRoot, &[]) + .expect("iter") + .filter_map(|r| r.ok()) + { + if let Ok(data) = AttestationData::from_ssz_bytes(&value_bytes) + && data.slot <= finalized_slot + { + to_delete.push(key_bytes.to_vec()); + } + } + drop(view); + + let count = to_delete.len(); + if !to_delete.is_empty() { + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .delete_batch(Table::AttestationDataByRoot, to_delete) + .expect("delete"); + batch.commit().expect("commit"); + } + count + } + + /// Prune an aggregated payload table (new or known) for slots <= finalized_slot. + fn prune_aggregated_payload_table(&mut self, table: Table, finalized_slot: u64) { let view = self.backend.begin_read().expect("read view"); let mut updates = vec![]; let mut deletes = vec![]; - let mut removed_count = 0; for (key_bytes, value_bytes) in view - .prefix_iterator(Table::AggregatedPayloads, &[]) + .prefix_iterator(table, &[]) .expect("iter") .filter_map(|r| r.ok()) { if let Ok(mut payloads) = Vec::::from_ssz_bytes(&value_bytes) { let original_len = payloads.len(); payloads.retain(|p| p.slot > finalized_slot); - removed_count += original_len - payloads.len(); if payloads.is_empty() { deletes.push(key_bytes.to_vec()); @@ -513,18 +549,13 @@ impl Store { if !updates.is_empty() || !deletes.is_empty() { let mut batch = self.backend.begin_write().expect("write batch"); if !updates.is_empty() { - batch - .put_batch(Table::AggregatedPayloads, updates) - .expect("put"); + batch.put_batch(table, updates).expect("put"); } if !deletes.is_empty() { - batch - .delete_batch(Table::AggregatedPayloads, deletes) - .expect("delete"); + batch.delete_batch(table, deletes).expect("delete"); } batch.commit().expect("commit"); } - removed_count } /// Get the 
block header by root. @@ -628,117 +659,219 @@ impl Store { batch.commit().expect("commit"); } - // ============ Attestation Helpers ============ + // ============ Attestation Data By Root ============ + // + // Content-addressed attestation data storage. Used to reconstruct + // per-validator attestation maps from aggregated payloads. - fn iter_attestations(&self, table: Table) -> impl Iterator + '_ { + /// Stores attestation data indexed by its tree hash root. + pub fn insert_attestation_data_by_root(&mut self, root: H256, data: AttestationData) { + let mut batch = self.backend.begin_write().expect("write batch"); + let entries = vec![(root.as_ssz_bytes(), data.as_ssz_bytes())]; + batch + .put_batch(Table::AttestationDataByRoot, entries) + .expect("put attestation data"); + batch.commit().expect("commit"); + } + + /// Returns attestation data for the given root hash. + pub fn get_attestation_data_by_root(&self, root: &H256) -> Option { + let view = self.backend.begin_read().expect("read view"); + view.get(Table::AttestationDataByRoot, &root.as_ssz_bytes()) + .expect("get") + .map(|bytes| AttestationData::from_ssz_bytes(&bytes).expect("valid attestation data")) + } + + /// Reconstruct per-validator attestation data from aggregated payloads. + /// + /// For each (validator_id, data_root) key in the payloads, looks up the + /// attestation data by root. Returns the latest attestation per validator + /// (by slot). 
+ pub fn extract_latest_attestations( + &self, + payloads: impl Iterator)>, + ) -> HashMap { + let mut result: HashMap = HashMap::new(); + + for ((validator_id, data_root), _payload_list) in payloads { + let Some(data) = self.get_attestation_data_by_root(&data_root) else { + continue; + }; + + let should_update = result + .get(&validator_id) + .is_none_or(|existing| existing.slot < data.slot); + + if should_update { + result.insert(validator_id, data); + } + } + + result + } + + /// Convenience: extract latest attestation per validator from known + /// (fork-choice-active) aggregated payloads only. + pub fn extract_latest_known_attestations(&self) -> HashMap { + self.extract_latest_attestations(self.iter_known_aggregated_payloads()) + } + + // ============ Known Aggregated Payloads ============ + // + // "Known" aggregated payloads are active in fork choice weight calculations. + // Promoted from "new" payloads at specific intervals (0 with proposal, 4). + + /// Iterates over all known aggregated payloads. + pub fn iter_known_aggregated_payloads( + &self, + ) -> impl Iterator)> + '_ { let view = self.backend.begin_read().expect("read view"); let entries: Vec<_> = view - .prefix_iterator(table, &[]) + .prefix_iterator(Table::LatestKnownAggregatedPayloads, &[]) .expect("iterator") .filter_map(|res| res.ok()) .map(|(k, v)| { - let validator_id = u64::from_ssz_bytes(&k).expect("valid validator_id"); - let data = AttestationData::from_ssz_bytes(&v).expect("valid attestation data"); - (validator_id, data) + let key = decode_signature_key(&k); + let payloads = + Vec::::from_ssz_bytes(&v).expect("valid payloads"); + (key, payloads) }) .collect(); entries.into_iter() } - fn get_attestation(&self, table: Table, validator_id: &u64) -> Option { + /// Insert an aggregated payload into the known (fork-choice-active) table. 
+ pub fn insert_known_aggregated_payload( + &mut self, + key: SignatureKey, + payload: StoredAggregatedPayload, + ) { + let encoded_key = encode_signature_key(&key); let view = self.backend.begin_read().expect("read view"); - view.get(table, &validator_id.as_ssz_bytes()) + let mut payloads: Vec = view + .get(Table::LatestKnownAggregatedPayloads, &encoded_key) .expect("get") - .map(|bytes| AttestationData::from_ssz_bytes(&bytes).expect("valid attestation data")) - } + .map(|bytes| Vec::::from_ssz_bytes(&bytes).expect("valid")) + .unwrap_or_default(); + drop(view); + + payloads.push(payload); - fn insert_attestation(&mut self, table: Table, validator_id: u64, data: AttestationData) { let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(validator_id.as_ssz_bytes(), data.as_ssz_bytes())]; - batch.put_batch(table, entries).expect("put attestation"); + let entries = vec![(encoded_key, payloads.as_ssz_bytes())]; + batch + .put_batch(Table::LatestKnownAggregatedPayloads, entries) + .expect("put known aggregated payload"); batch.commit().expect("commit"); } - // ============ Known Attestations ============ - // - // "Known" attestations are included in fork choice weight calculations. - // They're promoted from "new" attestations at specific intervals. - - /// Iterates over all known attestations (validator_id, attestation_data). - pub fn iter_known_attestations(&self) -> impl Iterator + '_ { - self.iter_attestations(Table::LatestKnownAttestations) - } - - /// Returns a validator's latest known attestation. - pub fn get_known_attestation(&self, validator_id: &u64) -> Option { - self.get_attestation(Table::LatestKnownAttestations, validator_id) - } - - /// Stores a validator's latest known attestation. 
- pub fn insert_known_attestation(&mut self, validator_id: u64, data: AttestationData) { - self.insert_attestation(Table::LatestKnownAttestations, validator_id, data); - } - - // ============ New Attestations ============ + // ============ New Aggregated Payloads ============ // - // "New" attestations are pending attestations not yet included in fork choice. - // They're promoted to "known" via `promote_new_attestations`. + // "New" aggregated payloads are pending — not yet counted in fork choice. + // Promoted to "known" via `promote_new_aggregated_payloads`. - /// Iterates over all new (pending) attestations. - pub fn iter_new_attestations(&self) -> impl Iterator + '_ { - self.iter_attestations(Table::LatestNewAttestations) + /// Iterates over all new (pending) aggregated payloads. + pub fn iter_new_aggregated_payloads( + &self, + ) -> impl Iterator)> + '_ { + let view = self.backend.begin_read().expect("read view"); + let entries: Vec<_> = view + .prefix_iterator(Table::LatestNewAggregatedPayloads, &[]) + .expect("iterator") + .filter_map(|res| res.ok()) + .map(|(k, v)| { + let key = decode_signature_key(&k); + let payloads = + Vec::::from_ssz_bytes(&v).expect("valid payloads"); + (key, payloads) + }) + .collect(); + entries.into_iter() } - /// Returns a validator's latest new (pending) attestation. - pub fn get_new_attestation(&self, validator_id: &u64) -> Option { - self.get_attestation(Table::LatestNewAttestations, validator_id) - } + /// Insert an aggregated payload into the new (pending) table. 
+ pub fn insert_new_aggregated_payload( + &mut self, + key: SignatureKey, + payload: StoredAggregatedPayload, + ) { + let encoded_key = encode_signature_key(&key); + let view = self.backend.begin_read().expect("read view"); + let mut payloads: Vec = view + .get(Table::LatestNewAggregatedPayloads, &encoded_key) + .expect("get") + .map(|bytes| Vec::::from_ssz_bytes(&bytes).expect("valid")) + .unwrap_or_default(); + drop(view); - /// Stores a validator's new (pending) attestation. - pub fn insert_new_attestation(&mut self, validator_id: u64, data: AttestationData) { - self.insert_attestation(Table::LatestNewAttestations, validator_id, data); - } + payloads.push(payload); - /// Removes a validator's new (pending) attestation. - pub fn remove_new_attestation(&mut self, validator_id: &u64) { let mut batch = self.backend.begin_write().expect("write batch"); + let entries = vec![(encoded_key, payloads.as_ssz_bytes())]; batch - .delete_batch( - Table::LatestNewAttestations, - vec![validator_id.as_ssz_bytes()], - ) - .expect("delete attestation"); + .put_batch(Table::LatestNewAggregatedPayloads, entries) + .expect("put new aggregated payload"); batch.commit().expect("commit"); } - /// Promotes all new attestations to known attestations. + /// Promotes all new aggregated payloads to known, making them active in fork choice. /// - /// Takes all attestations from `latest_new_attestations` and moves them - /// to `latest_known_attestations`, making them count for fork choice. - pub fn promote_new_attestations(&mut self) { - // Read all new attestations + /// Merges entries from `LatestNewAggregatedPayloads` into `LatestKnownAggregatedPayloads`, + /// appending to existing payload lists rather than overwriting them. 
+ pub fn promote_new_aggregated_payloads(&mut self) { let view = self.backend.begin_read().expect("read view"); - let new_attestations: Vec<(Vec, Vec)> = view - .prefix_iterator(Table::LatestNewAttestations, &[]) + let new_entries: Vec<(Vec, Vec)> = view + .prefix_iterator(Table::LatestNewAggregatedPayloads, &[]) .expect("iterator") .filter_map(|res| res.ok()) .map(|(k, v)| (k.to_vec(), v.to_vec())) .collect(); - drop(view); - if new_attestations.is_empty() { + if new_entries.is_empty() { + drop(view); return; } - // Delete from new and insert to known in a single batch + // Merge new payloads with existing known payloads + let merged: Vec<(Vec, Vec)> = new_entries + .iter() + .map(|(key, new_bytes)| { + let new_payloads = + Vec::::from_ssz_bytes(new_bytes).expect("valid"); + let mut known_payloads: Vec = view + .get(Table::LatestKnownAggregatedPayloads, key) + .expect("get") + .map(|bytes| { + Vec::::from_ssz_bytes(&bytes).expect("valid") + }) + .unwrap_or_default(); + known_payloads.extend(new_payloads); + (key.clone(), known_payloads.as_ssz_bytes()) + }) + .collect(); + drop(view); + + let keys_to_delete: Vec<_> = new_entries.into_iter().map(|(k, _)| k).collect(); let mut batch = self.backend.begin_write().expect("write batch"); - let keys_to_delete: Vec<_> = new_attestations.iter().map(|(k, _)| k.clone()).collect(); batch - .delete_batch(Table::LatestNewAttestations, keys_to_delete) - .expect("delete new attestations"); + .delete_batch(Table::LatestNewAggregatedPayloads, keys_to_delete) + .expect("delete new aggregated payloads"); + batch + .put_batch(Table::LatestKnownAggregatedPayloads, merged) + .expect("put known aggregated payloads"); + batch.commit().expect("commit"); + } + + /// Delete specific gossip signatures by key. 
+ pub fn delete_gossip_signatures(&mut self, keys: &[SignatureKey]) { + if keys.is_empty() { + return; + } + let encoded_keys: Vec<_> = keys.iter().map(encode_signature_key).collect(); + let mut batch = self.backend.begin_write().expect("write batch"); batch - .put_batch(Table::LatestKnownAttestations, new_attestations) - .expect("put known attestations"); + .delete_batch(Table::GossipSignatures, encoded_keys) + .expect("delete gossip signatures"); batch.commit().expect("commit"); } @@ -786,76 +919,6 @@ impl Store { batch.commit().expect("commit"); } - // ============ Aggregated Payloads ============ - // - // Aggregated payloads are leanVM proofs combining multiple signatures. - // Used to verify block signatures efficiently. - - /// Iterates over all aggregated signature payloads. - pub fn iter_aggregated_payloads( - &self, - ) -> impl Iterator)> + '_ { - let view = self.backend.begin_read().expect("read view"); - let entries: Vec<_> = view - .prefix_iterator(Table::AggregatedPayloads, &[]) - .expect("iterator") - .filter_map(|res| res.ok()) - .map(|(k, v)| { - let key = decode_signature_key(&k); - let payloads = - Vec::::from_ssz_bytes(&v).expect("valid payloads"); - (key, payloads) - }) - .collect(); - entries.into_iter() - } - - /// Returns aggregated payloads for a signature key. - fn get_aggregated_payloads(&self, key: &SignatureKey) -> Option> { - let view = self.backend.begin_read().expect("read view"); - view.get(Table::AggregatedPayloads, &encode_signature_key(key)) - .expect("get") - .map(|bytes| { - Vec::::from_ssz_bytes(&bytes).expect("valid payloads") - }) - } - - /// Insert an aggregated signature proof for a validator's attestation. - /// - /// Multiple proofs can be stored for the same (validator, attestation_data) pair, - /// each with its own slot metadata for pruning. - /// - /// # Thread Safety - /// - /// This method uses a read-modify-write pattern that is NOT atomic: - /// 1. Read existing payloads - /// 2. Append new payload - /// 3. 
Write back - /// - /// Concurrent calls could result in lost updates. This method MUST be called - /// from a single thread. In our case, that thread is the `BlockChain` `GenServer` - pub fn insert_aggregated_payload( - &mut self, - attestation_data: &AttestationData, - validator_id: u64, - proof: AggregatedSignatureProof, - ) { - let slot = attestation_data.slot; - let data_root = attestation_data.tree_hash_root(); - let key = (validator_id, data_root); - - // Read existing, add new, write back (NOT atomic - requires single-threaded access) - let mut payloads = self.get_aggregated_payloads(&key).unwrap_or_default(); - payloads.push(StoredAggregatedPayload { slot, proof }); - - let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(encode_signature_key(&key), payloads.as_ssz_bytes())]; - batch - .put_batch(Table::AggregatedPayloads, entries) - .expect("put proofs"); - batch.commit().expect("commit"); - } - // ============ Derived Accessors ============ /// Returns the slot of the current head block. diff --git a/docs/fork_choice_visualization.md b/docs/fork_choice_visualization.md new file mode 100644 index 00000000..ab508977 --- /dev/null +++ b/docs/fork_choice_visualization.md @@ -0,0 +1,106 @@ +# Fork Choice Visualization + +A browser-based real-time visualization of the LMD GHOST fork choice tree, served from the existing RPC server with no additional dependencies. + +## Endpoints + +| Endpoint | Description | +|----------|-------------| +| `GET /lean/v0/fork_choice/ui` | Interactive D3.js visualization page | +| `GET /lean/v0/fork_choice` | JSON snapshot of the fork choice tree | + +Both endpoints are served on the metrics port (`--metrics-port`, default `5054`). + +## Quick Start + +### Local devnet + +```bash +make run-devnet +``` + +The local devnet runs 3 ethlambda nodes with metrics ports 8085, 8086, and 8087. 
Open any of them: + +- http://localhost:8085/lean/v0/fork_choice/ui +- http://localhost:8086/lean/v0/fork_choice/ui +- http://localhost:8087/lean/v0/fork_choice/ui + +### Standalone node + +```bash +cargo run --release -- \ + --custom-network-config-dir ./config \ + --node-key ./keys/node.key \ + --node-id 0 \ + --metrics-port 5054 +``` + +Then open http://localhost:5054/lean/v0/fork_choice/ui. + +## Visualization Guide + +### Color coding + +| Color | Meaning | +|-------|---------| +| Green | Finalized block | +| Blue | Justified block | +| Yellow | Safe target block | +| Orange | Current head | +| Gray | Default (no special status) | + +### Layout + +- **Y axis**: slot number (time flows downward) +- **X axis**: fork spreading — branches appear when competing chains exist +- **Circle size**: scaled by `weight / validator_count` — larger circles have more attestation support + +### Interactive features + +- **Tooltips**: hover any block to see root hash, slot, proposer index, and weight +- **Auto-polling**: the page fetches fresh data every 2 seconds +- **Auto-scroll**: the view follows the head as the chain progresses + +### What to look for + +- **Single vertical chain**: healthy consensus, no forks +- **Horizontal branching**: competing chains — check attestation weights to see which branch validators prefer +- **Color transitions**: blocks turning green as finalization advances +- **Stalled finalization**: if justified/finalized slots stop advancing, check validator attestation activity + +## JSON API + +```bash +curl -s http://localhost:5054/lean/v0/fork_choice | jq . 
+``` + +Response schema: + +```json +{ + "nodes": [ + { + "root": "0x...", + "slot": 42, + "parent_root": "0x...", + "proposer_index": 3, + "weight": 5 + } + ], + "head": "0x...", + "justified": { "root": "0x...", "slot": 10 }, + "finalized": { "root": "0x...", "slot": 5 }, + "safe_target": "0x...", + "validator_count": 8 +} +``` + +| Field | Description | +|-------|-------------| +| `nodes` | All blocks in the live chain (from finalized slot onward) | +| `nodes[].weight` | Number of latest-message attestations whose target is this block or a descendant | +| `head` | Current fork choice head root | +| `justified` | Latest justified checkpoint | +| `finalized` | Latest finalized checkpoint | +| `safe_target` | Block root selected with a 2/3 validator threshold | +| `validator_count` | Total validators in the head state |