From 10d1db9112c4a28f9626ede58be828502d90f56e Mon Sep 17 00:00:00 2001 From: James Parker Date: Fri, 2 Jan 2026 03:32:25 -0500 Subject: [PATCH 1/8] Sketch of BFT state --- ossa-core/src/store/bft.rs | 71 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 69 insertions(+), 2 deletions(-) diff --git a/ossa-core/src/store/bft.rs b/ossa-core/src/store/bft.rs index 03afb3e..13f42bd 100644 --- a/ossa-core/src/store/bft.rs +++ b/ossa-core/src/store/bft.rs @@ -1,6 +1,6 @@ -use std::collections::BTreeSet; +use std::collections::{BTreeMap, BTreeSet}; -use crate::store::dag::{self, Frontier}; +use crate::{auth::DeviceId, store::dag::{self, Frontier}, util::Sha256Hash}; /// A round in the BFT strong consistency protocol. pub type Round = u64; @@ -19,6 +19,7 @@ pub(crate) struct State { pub(crate) dag_state: dag::State, /// Frontier of operations that have been comitted in the DAG state. pub(crate) committed_frontier: Frontier, + pub(crate) round_states: Vec>, } impl State { @@ -27,6 +28,72 @@ impl State { initial_state, dag_state: dag::State::new(), committed_frontier: BTreeSet::new(), + round_states: vec![], } } } + +// A signed value. +pub struct Signed { + value: A, + // JP: Generalize this eventually. + signature: ed25519_dalek::Signature, +} + +pub(crate) struct RoundState { + // Blocks in this round for each validator. + // A validator can only sign a single block in each round (otherwise, they are detected to be malicious). + blocks: BTreeMap>>, + // 2/3 (?) of (weighted) validators promise to make the block available and validated/approve of operations. + certificates: BTreeMap>, + // 2/3 (1/3?) of (weighted) validators have seen 2/3 (?) of the certificates. + commit_round: ThresholdSigned, +} + +pub(crate) struct BlockId(); // TODO: sha256 + +// A BFT block points to the tips of the DAG and the previous round's certificates. +pub(crate) struct Block { + round: u64, + // Tips of DAG operations + dag_frontier: Frontier, + // Must contain 2/3 of previous round's certificates (or be round 0). + parents: Vec, // Only strong edges, don't need weak edges since blocks point to head of DAG operations anyways + // Who proposed the block. + proposer: DeviceId, +} + +pub(crate) struct ThresholdSignature(); // TODO +pub(crate) struct PartialSignature(); // TODO + +pub(crate) struct CertificateId(); // TODO: sha256 + +pub(crate) struct Certificate { + // The block's id (hash). + block: BlockId, + // The block's round. + round: u64, + // Who proposed the block. + proposer: DeviceId, +} + +// A threshold signed value (or being signed). +pub(crate) enum ThresholdSigned { + // Weighted threshold of validators have signed the value. + ThresholdSignature { + value: A, + signature: ThresholdSignature, + }, + // Weighted threshold hasn't been met yet. + PartialSignatures { + value: A, + signatures: BTreeMap, + }, +} + +pub(crate) struct RoundComplete { + store_id: Sha256Hash, // StoreId, + round: u64, +} +// JP: Could include hash of previous round's leader block?? Probably not helpful + From 7c7c5e68194bfbd1b61804f912db0e3db469c33d Mon Sep 17 00:00:00 2001 From: James Parker Date: Sun, 4 Jan 2026 10:43:19 -0500 Subject: [PATCH 2/8] Scaffolding for BFT sync --- ossa-core/src/protocol/manager/v0.rs | 48 ++++++++++++++++--- ossa-core/src/protocol/store_bft_dag/v0.rs | 17 ++++--- .../store_peer/{ecg_sync.rs => dag_sync.rs} | 28 +++++------ ossa-core/src/protocol/store_peer/mod.rs | 2 +- ossa-core/src/protocol/store_peer/v0.rs | 14 +++--- ossa-core/src/store/bft.rs | 4 +- ossa-core/src/store/mod.rs | 18 ++++++- 7 files changed, 90 insertions(+), 41 deletions(-) rename ossa-core/src/protocol/store_peer/{ecg_sync.rs => dag_sync.rs} (96%) diff --git a/ossa-core/src/protocol/manager/v0.rs b/ossa-core/src/protocol/manager/v0.rs index 30ee909..05bdec7 100644 --- a/ossa-core/src/protocol/manager/v0.rs +++ b/ossa-core/src/protocol/manager/v0.rs @@ -158,10 +158,11 @@ impl< debug!("Manager command channel closed."); // TODO: Do something here? } - Some(PeerManagerCommand::RequestStoreSync { store_id, spawn_task_ec, spawn_task_sc }) => { + Some(PeerManagerCommand::RequestStoreSync { store_id, spawn_task_ec, spawn_task_sc, spawn_task_bft }) => { let ec_stream_id = self.next_stream_id(); let sc_stream_id = self.next_stream_id(); - let _response = self.run_request_new_stream_server(&mut stream, store_id, ec_stream_id, spawn_task_ec, sc_stream_id, spawn_task_sc).await; + let bft_stream_id = self.next_stream_id(); + let _response = self.run_request_new_stream_server(&mut stream, store_id, ec_stream_id, spawn_task_ec, sc_stream_id, spawn_task_sc, bft_stream_id, spawn_task_bft).await; debug!("Requested to sync store with peer."); } } @@ -201,6 +202,7 @@ impl< store_id, ec_stream_id, sc_stream_id, + bft_stream_id, } => { debug!( "Received MsgManagerRequest::CreateStoreStream: {ec_stream_id}, {store_id:?}" @@ -210,6 +212,7 @@ impl< store_id, ec_stream_id, sc_stream_id, + bft_stream_id, ) .await; } @@ -247,6 +250,7 @@ impl< store_id: StoreId, ec_stream_id: StreamId, sc_stream_id: StreamId, + bft_stream_id: StreamId, ) { let accept = { // Check if stream is valid (it can be allocated by peer and is available). @@ -273,7 +277,7 @@ impl< .expect("TODO"); let spawn_task = rx.await.expect("TODO"); - if let Some((ec_spawn_task, sc_spawn_task)) = spawn_task { + if let Some((ec_spawn_task, sc_spawn_task, bft_spawn_task)) = spawn_task { // Tell multiplexer to create EC miniprotocol. let is_running_ec = self .create_multiplexer_stream(ec_stream_id, ec_spawn_task) @@ -285,15 +289,29 @@ impl< .create_multiplexer_stream(sc_stream_id, sc_spawn_task) .await; - if !is_running_sc { + if is_running_sc { + // Tell multiplexer to create BFT miniprotocol. + let is_running_bft = self + .create_multiplexer_stream(bft_stream_id, bft_spawn_task) + .await; + if !is_running_bft { + error!( + "Failed to create miniprotocol stream to sync BFT store." + ); + panic!("TODO: Shutdown EC+SC miniprotocol..."); + } + + Ok(is_running_bft) + } else { error!( "Failed to create miniprotocol stream to strongly sync store." ); panic!("TODO: Shutdown EC miniprotocol..."); } - - Ok(is_running_sc) } else { + error!( + "Failed to create miniprotocol stream to weakly sync store." + ); Ok(false) } } else { @@ -321,12 +339,15 @@ impl< ec_spawn_task: Box, sc_stream_id: StreamId, sc_spawn_task: Box, + bft_stream_id: StreamId, + bft_spawn_task: Box, ) { // Send request message. let req = MsgManagerRequest::CreateStoreStream { store_id, ec_stream_id, sc_stream_id, + bft_stream_id, }; send(stream, req).await.expect("TODO"); @@ -366,6 +387,18 @@ impl< "TODO: Send shutdown for this miniprotocol and restore status to Known." ); } + + // Tell multiplexer to create BFT miniprotocol. + let is_running = self + .create_multiplexer_stream(bft_stream_id, bft_spawn_task) + .await; + + if !is_running { + error!("Failed to create miniprotocol stream to sync BFT store."); + panic!( + "TODO: Send shutdown for this miniprotocol and restore status to Known." + ); + } } } } @@ -544,6 +577,8 @@ pub(crate) enum MsgManagerRequest { ec_stream_id: StreamId, /// Strongly consistent stream id. sc_stream_id: StreamId, + /// BFT stream id. + bft_stream_id: StreamId, }, } @@ -623,5 +658,6 @@ pub(crate) enum PeerManagerCommand { store_id: StoreId, spawn_task_ec: Box, spawn_task_sc: Box, + spawn_task_bft: Box, }, } diff --git a/ossa-core/src/protocol/store_bft_dag/v0.rs b/ossa-core/src/protocol/store_bft_dag/v0.rs index 745a00f..42ac880 100644 --- a/ossa-core/src/protocol/store_bft_dag/v0.rs +++ b/ossa-core/src/protocol/store_bft_dag/v0.rs @@ -6,16 +6,15 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; use tracing::debug; -use crate::protocol::store_peer::ecg_sync::{DAGStateSubscriber, MsgDAGSyncResponse}; +use crate::protocol::store_peer::dag_sync::{DAGStateSubscriber, MsgDAGSyncResponse}; use crate::store::dag; use crate::{ auth::DeviceId, network::protocol::{receive, MiniProtocol}, protocol::store_peer::{ - ecg_sync::{ECGSyncInitiator, ECGSyncResponder, MsgDAGSyncRequest}, - v0::{MsgStoreSync, MsgStoreSyncRequest}, + dag_sync::{DAGSyncInitiator, DAGSyncResponder, MsgDAGSyncRequest}, }, - store::{dag::v0::HeaderId, UntypedStoreCommand}, + store::UntypedStoreCommand, }; /// Miniprotocol to sync the DAG in the strongly consistent BFT consensus protocol. @@ -139,7 +138,7 @@ where ) -> impl Future + Send { async move { debug!("StoreDAGSync server running!"); - let mut dag_sync: Option> = None; + let mut dag_sync: Option> = None; let mut recv_chan = self .recv_chan @@ -153,7 +152,7 @@ where // JP: Eventually switch ecg_state to an Arc? let (new_dag_sync, operations) = - ECGSyncInitiator::::run_new( + DAGSyncInitiator::::run_new( &mut stream, &dag_state, ) @@ -189,7 +188,7 @@ where ) -> impl Future + Send { async move { debug!("StoreDAGSync client running!"); - let mut dag_sync: Option> = None; + let mut dag_sync: Option> = None; // TODO: Check when done. loop { @@ -203,7 +202,7 @@ where todo!("TODO: Error, SCG sync has already been initialized."); } - let mut dag_sync_ = ECGSyncResponder::new(); + let mut dag_sync_ = DAGSyncResponder::new(); let scg_state = self.request_dag_state(&mut dag_sync_, None).await; @@ -236,7 +235,7 @@ where { async fn request_dag_state( &self, - responder: &mut ECGSyncResponder, + responder: &mut DAGSyncResponder, tips: Option>, ) -> dag::UntypedState { debug!("Requesting SCG state"); diff --git a/ossa-core/src/protocol/store_peer/ecg_sync.rs b/ossa-core/src/protocol/store_peer/dag_sync.rs similarity index 96% rename from ossa-core/src/protocol/store_peer/ecg_sync.rs rename to ossa-core/src/protocol/store_peer/dag_sync.rs index 6c875ab..ca9eea1 100644 --- a/ossa-core/src/protocol/store_peer/ecg_sync.rs +++ b/ossa-core/src/protocol/store_peer/dag_sync.rs @@ -81,13 +81,13 @@ pub(crate) enum MsgDAGSyncRequest { pub(crate) enum MsgDAGSyncResponse { Response { have: Vec, - operations: Vec<(Header, RawDAGBody)>, // ECG headers and serialized ECG body. + operations: Vec<(Header, RawDAGBody)>, // DAG headers and serialized DAG body. }, Wait, // JP: Use StoreSyncResponse? } // Has initiative -pub(crate) struct ECGSyncInitiator { +pub(crate) struct DAGSyncInitiator { have: Vec, phantom: PhantomData, } @@ -96,7 +96,7 @@ impl< Hash: Send + Sync, HeaderId: Clone + Debug + Ord + Send + Sync, Header: Debug + Send + Sync, - > ECGSyncInitiator + > DAGSyncInitiator { async fn receive_response_helper, Msg>( stream: &mut S, @@ -125,7 +125,7 @@ impl< // TODO: Eventually take an Arc pub(crate) async fn run_new, Msg>( stream: &mut S, - ecg_state: &dag::UntypedState, + dag_state: &dag::UntypedState, ) -> (Self, Vec<(Header, RawDAGBody)>) where MsgDAGSyncRequest: Into, @@ -134,26 +134,26 @@ impl< // TODO: Limit on tips (128? 64? 32? MAX_HAVE_HEADERS) warn!("TODO: Check request sizes."); let req = MsgDAGSyncRequest::DAGInitialSync { - tips: ecg_state.tips().iter().cloned().collect(), + tips: dag_state.tips().iter().cloned().collect(), }; send(stream, req).await.expect("TODO"); // Receive response. let (have, operations) = Self::receive_response_helper(stream).await; - let ecg_sync = ECGSyncInitiator { + let dag_sync = DAGSyncInitiator { have, phantom: PhantomData, }; - (ecg_sync, operations) + (dag_sync, operations) } /// Run a round of ECG sync, requesting new operations from peer. pub(crate) async fn run_round, Msg>( &mut self, stream: &mut S, - ecg_state: &dag::UntypedState, + dag_state: &dag::UntypedState, ) -> Vec<(Header, RawDAGBody)> where MsgDAGSyncRequest: Into, @@ -162,7 +162,7 @@ impl< // Check which headers they sent us that we know. let mut known_bitmap = BitArray::ZERO; for (i, header_id) in self.have.iter().enumerate() { - if ecg_state.contains(header_id) { + if dag_state.contains(header_id) { // Respond with which headers we know. known_bitmap.set(i, true); } @@ -171,7 +171,7 @@ impl< // TODO: Limit on tips (128? 64? 32? MAX_HAVE_HEADERS) warn!("TODO: Check request sizes."); let req = MsgDAGSyncRequest::DAGSync { - tips: ecg_state.tips().iter().cloned().collect(), + tips: dag_state.tips().iter().cloned().collect(), known: known_bitmap, }; send(stream, req).await.expect("TODO"); @@ -186,7 +186,7 @@ impl< } // Responder -pub(crate) struct ECGSyncResponder { +pub(crate) struct DAGSyncResponder { pub(crate) their_known: BTreeSet, pub(crate) our_unknown: BTreeSet, pub(crate) send_queue: BinaryHeap<(Reverse, HeaderId)>, @@ -218,7 +218,7 @@ pub(crate) fn mark_as_known_helper( } } -impl ECGSyncResponder { +impl DAGSyncResponder { fn they_know(&self, header_id: &HeaderId) -> bool where HeaderId: Copy + Ord, @@ -245,7 +245,7 @@ impl ECGSyncResponder { where HeaderId: std::cmp::Ord, { - ECGSyncResponder { + DAGSyncResponder { their_known: BTreeSet::new(), send_queue: BinaryHeap::new(), our_unknown: BTreeSet::new(), @@ -536,7 +536,7 @@ impl ECGSyncResponder { pub(crate) trait DAGStateSubscriber { async fn request_dag_state( &self, - responder: &mut ECGSyncResponder, + responder: &mut DAGSyncResponder, tips: Option>, ) -> dag::UntypedState; } diff --git a/ossa-core/src/protocol/store_peer/mod.rs b/ossa-core/src/protocol/store_peer/mod.rs index 1f99034..4371574 100644 --- a/ossa-core/src/protocol/store_peer/mod.rs +++ b/ossa-core/src/protocol/store_peer/mod.rs @@ -1,2 +1,2 @@ -pub mod ecg_sync; +pub mod dag_sync; pub mod v0; diff --git a/ossa-core/src/protocol/store_peer/v0.rs b/ossa-core/src/protocol/store_peer/v0.rs index 10d44e7..4ac6a0e 100644 --- a/ossa-core/src/protocol/store_peer/v0.rs +++ b/ossa-core/src/protocol/store_peer/v0.rs @@ -10,8 +10,8 @@ use tracing::debug; use crate::{ auth::DeviceId, network::protocol::{receive, send, MiniProtocol}, - protocol::store_peer::ecg_sync::{ - DAGStateSubscriber, ECGSyncInitiator, ECGSyncResponder, MsgDAGSyncRequest, + protocol::store_peer::dag_sync::{ + DAGStateSubscriber, DAGSyncInitiator, DAGSyncResponder, MsgDAGSyncRequest, MsgDAGSyncResponse, }, store::{self, dag, HandlePeerRequest, UntypedStoreCommand}, @@ -199,7 +199,7 @@ where { async fn request_dag_state( &self, - responder: &mut ECGSyncResponder, + responder: &mut DAGSyncResponder, tips: Option>, ) -> dag::UntypedState { debug!("Requesting ECG state"); @@ -356,7 +356,7 @@ impl< mut stream: S, ) -> impl Future + Send { async move { - let mut ecg_sync: Option> = None; + let mut ecg_sync: Option> = None; // Wait for command from store. let mut recv_chan = self @@ -465,7 +465,7 @@ impl< // JP: Eventually switch ecg_state to an Arc? let (new_ecg_sync, operations) = - ECGSyncInitiator::run_new(&mut stream, &ecg_state).await; + DAGSyncInitiator::run_new(&mut stream, &ecg_state).await; ecg_sync = Some(new_ecg_sync); operations } @@ -506,7 +506,7 @@ impl< mut stream: S, ) -> impl Future + Send { async move { - let mut ecg_sync: Option> = None; + let mut ecg_sync: Option> = None; // TODO: Check when done. loop { @@ -567,7 +567,7 @@ impl< todo!("TODO: Error, ECG sync has already been initialized."); } - let mut ecg_sync_ = ECGSyncResponder::new(); + let mut ecg_sync_ = DAGSyncResponder::new(); let ecg_state = self.request_dag_state(&mut ecg_sync_, None).await; diff --git a/ossa-core/src/store/bft.rs b/ossa-core/src/store/bft.rs index 13f42bd..0a5c46d 100644 --- a/ossa-core/src/store/bft.rs +++ b/ossa-core/src/store/bft.rs @@ -50,7 +50,7 @@ pub(crate) struct RoundState { commit_round: ThresholdSigned, } -pub(crate) struct BlockId(); // TODO: sha256 +pub(crate) struct BlockId(Sha256Hash); // A BFT block points to the tips of the DAG and the previous round's certificates. pub(crate) struct Block { @@ -66,7 +66,7 @@ pub(crate) struct Block { pub(crate) struct ThresholdSignature(); // TODO pub(crate) struct PartialSignature(); // TODO -pub(crate) struct CertificateId(); // TODO: sha256 +pub(crate) struct CertificateId(Sha256Hash); pub(crate) struct Certificate { // The block's id (hash). diff --git a/ossa-core/src/store/mod.rs b/ossa-core/src/store/mod.rs index 83ce2e6..6eb083b 100644 --- a/ossa-core/src/store/mod.rs +++ b/ossa-core/src/store/mod.rs @@ -1390,12 +1390,19 @@ async fn manage_peers + Clone + }) }); + let spawn_task_bft = Box::new(move |_party, stream_id, sender, receiver| { + tokio::spawn(async move { + unimplemented!(); + }) + }); + // Send request to peer's manager for stream. let store_id = store.store_id(); let cmd = PeerManagerCommand::RequestStoreSync { store_id, spawn_task_ec, spawn_task_sc, + spawn_task_bft, }; command_chan.send(cmd).expect("TODO"); } @@ -1650,6 +1657,7 @@ pub(crate) async fn run_handler( debug!("Store ECG sync with peer (without initiative) exited.") }) }); + let send_commands_untyped = send_commands_untyped.clone(); let spawn_task_sc: Box = Box::new(move |party, stream_id, sender, receiver| { tokio::spawn(async move { @@ -1666,7 +1674,13 @@ pub(crate) async fn run_handler( debug!("Store SCG sync with peer (without initiative) exited.") }) }); - Some((spawn_task_ec, spawn_task_sc)) + + let spawn_task_bft: Box = Box::new(move |party, stream_id, sender, receiver| { + tokio::spawn(async move { + unimplemented!(); + }) + }); + Some((spawn_task_ec, spawn_task_sc, spawn_task_bft)) } else { debug!("Store is already running"); None @@ -1852,7 +1866,7 @@ pub(crate) enum UntypedStoreCommand, Box)>>, + oneshot::Sender, Box, Box)>>, }, RegisterOutgoingPeerECGSyncing { peer: DeviceId, From 7c97fbb7eaa7a09dd62f83f2e6db527cbbf38cec Mon Sep 17 00:00:00 2001 From: James Parker Date: Sun, 25 Jan 2026 17:32:12 -0500 Subject: [PATCH 3/8] WIP: Send initial tips to pull BFT updates --- ossa-core/src/network/multiplexer.rs | 1 - ossa-core/src/protocol/manager/v0.rs | 3 +- ossa-core/src/protocol/mod.rs | 3 +- .../{store_bft_dag => store_bft_sync}/mod.rs | 0 ossa-core/src/protocol/store_bft_sync/v0.rs | 237 ++++++++++++++++++ ossa-core/src/protocol/store_sc_dag/mod.rs | 1 + .../{store_bft_dag => store_sc_dag}/v0.rs | 10 +- ossa-core/src/store/bft.rs | 120 +++++++-- ossa-core/src/store/mod.rs | 10 +- 9 files changed, 358 insertions(+), 27 deletions(-) rename ossa-core/src/protocol/{store_bft_dag => store_bft_sync}/mod.rs (100%) create mode 100644 ossa-core/src/protocol/store_bft_sync/v0.rs create mode 100644 ossa-core/src/protocol/store_sc_dag/mod.rs rename ossa-core/src/protocol/{store_bft_dag => store_sc_dag}/v0.rs (96%) diff --git a/ossa-core/src/network/multiplexer.rs b/ossa-core/src/network/multiplexer.rs index 112e2aa..6d48025 100644 --- a/ossa-core/src/network/multiplexer.rs +++ b/ossa-core/src/network/multiplexer.rs @@ -339,7 +339,6 @@ struct MiniprotocolState { sender: mpsc::Sender, } -// JP: TODO: This O probably isn't needed. pub(crate) async fn run_miniprotocol_async( p: P, is_client: bool, diff --git a/ossa-core/src/protocol/manager/v0.rs b/ossa-core/src/protocol/manager/v0.rs index 05bdec7..b8a7986 100644 --- a/ossa-core/src/protocol/manager/v0.rs +++ b/ossa-core/src/protocol/manager/v0.rs @@ -256,7 +256,8 @@ impl< // Check if stream is valid (it can be allocated by peer and is available). let is_valid_id_ec = self.is_valid_stream_id(false, &ec_stream_id); let is_valid_id_sc = self.is_valid_stream_id(false, &sc_stream_id); - if !is_valid_id_ec || !is_valid_id_sc { + let is_valid_id_bft = self.is_valid_stream_id(false, &bft_stream_id); + if !is_valid_id_ec || !is_valid_id_sc || !is_valid_id_bft { debug!("Peer sent invalid stream id."); Err(MsgManagerError::InvalidStreamId) } else { diff --git a/ossa-core/src/protocol/mod.rs b/ossa-core/src/protocol/mod.rs index 85be2f9..9dafa7f 100644 --- a/ossa-core/src/protocol/mod.rs +++ b/ossa-core/src/protocol/mod.rs @@ -13,8 +13,9 @@ use crate::{ pub mod heartbeat; pub mod manager; -pub mod store_bft_dag; pub mod store_peer; +pub mod store_sc_dag; +pub mod store_bft_sync; pub mod v0; pub(crate) struct MiniProtocolArgs { diff --git a/ossa-core/src/protocol/store_bft_dag/mod.rs b/ossa-core/src/protocol/store_bft_sync/mod.rs similarity index 100% rename from ossa-core/src/protocol/store_bft_dag/mod.rs rename to ossa-core/src/protocol/store_bft_sync/mod.rs diff --git a/ossa-core/src/protocol/store_bft_sync/v0.rs b/ossa-core/src/protocol/store_bft_sync/v0.rs new file mode 100644 index 0000000..0e211f6 --- /dev/null +++ b/ossa-core/src/protocol/store_bft_sync/v0.rs @@ -0,0 +1,237 @@ +// This is very similar to DAG sync, except there are a few key differences: +// - The DAG is bipartite between block and certificate nodes +// - We have rounds, so we can take advantage of that (ex, only send previous round blocks if it has a full threshold signature?) +// +// Do we need to go back more than 1 round? + +use std::{future::Future, marker::PhantomData}; + +use serde::{Deserialize, Serialize}; +use tokio::sync::{mpsc::{UnboundedReceiver, UnboundedSender}, watch}; +use tracing::debug; + +use crate::{auth::DeviceId, network::protocol::{send, MiniProtocol}, store::{bft::{BFTState, Block, BlockId, PartialSignature, Round, ThresholdSignature}, dag, UntypedStoreCommand}, util::{Sha256Hash, Stream}}; + + +pub(crate) struct StoreBFTSync { + peer: DeviceId, + // Receive commands from store if we have initiative or send commands to store if we're the responder. + recv_chan: Option>, + // Send commands to store if we're the responder and send results back to store if we're the initiator. + send_chan: UnboundedSender>, // JP: Make this a stream? + // BFT state + bft_state: watch::Receiver>, +} + +impl StoreBFTSync { + pub(crate) fn new_server( + peer: DeviceId, + recv_chan: UnboundedReceiver, + send_chan: UnboundedSender< + UntypedStoreCommand, + >, + bft_state: watch::Receiver>, + ) -> Self { + let recv_chan = Some(recv_chan); + StoreBFTSync { + peer, + recv_chan, + send_chan, + bft_state, + } + } + + pub(crate) fn new_client( + peer: DeviceId, + send_chan: UnboundedSender< + UntypedStoreCommand, + >, + bft_state: watch::Receiver>, + ) -> Self { + StoreBFTSync { + peer, + recv_chan: None, + send_chan, + bft_state, + } + } +} + +impl MiniProtocol for StoreBFTSync +where + Hash: Send, + SHeaderId: for<'a> Deserialize<'a> + Serialize + Send + Sync, + SHeader: Send, + THeaderId: Send, + THeader: Send, +{ + type Message = MsgStoreBFTSync; + + // Has initiative + // JP: Why does this have initiative again? + fn run_server>( + mut self, + mut stream: S, + ) -> impl Future + Send { + async move { + debug!("StoreDAGSync server running!"); + let mut bft_sync: Option> = None; + + let mut recv_chan = self + .recv_chan + .expect("Unreachable. Server must be given a receive channel."); + while let Some(cmd) = recv_chan.recv().await { + match cmd { + StoreBFTSyncCommand::BFTSyncRequest => { // { dag_state } => { + let updates = match bft_sync { + None => { + let (new_bft_sync, operations) = + BFTSyncInitiator::run_new( + &mut stream, + &mut self.bft_state, + ) + .await; + bft_sync = Some(new_bft_sync); + operations + } + Some(ref mut dag_sync) => { + todo!(); + } + }; + // let msg = UntypedStoreCommand::ReceivedBFTOperations { + // peer: self.peer, + // updates, + // }; + // self.send_chan.send(msg).expect("TODO"); + todo!(); + } + } + } + + debug!("StoreBFTSync receiver channel closed"); + } + } + + fn run_client>( + self, + mut stream: S, + ) -> impl Future + Send { + async move { + todo!() + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) enum MsgStoreBFTSync { + Request(MsgBFTSyncRequest), + BFTResponse(MsgBFTSyncResponse), +} + +pub(crate) type SignatureId = Sha256Hash; + +// Either the ID of the aggregate signature or IDs of the partial signatures. +// JP: Or just send the signatures? +#[derive(Debug, Serialize, Deserialize)] +pub(crate) enum ThresholdSignatureId { + Threshold(SignatureId), + Partial(Vec), +} + +impl ThresholdSignatureId { + pub fn new() -> Self { + ThresholdSignatureId::Partial(vec![]) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) enum MsgBFTSyncRequest { + BFTInitialSync { + /// Current round we're on. + round: Round, + + /// Tips/frontier of blocks with their round and corresponding signatures. + block_tips: Vec<(Round, BlockId, ThresholdSignatureId)>, + + /// Signatures attesting to completion of the current round. + // JP: Or just send the signatures? + round_complete: ThresholdSignatureId, + }, +} + +impl From for MsgStoreBFTSync { + fn from(msg: MsgBFTSyncRequest) -> Self { + MsgStoreBFTSync::Request(msg) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) enum MsgBFTSyncResponse { + Response { + blocks: Vec>, + certificate_signatures: Vec<(BlockId, ThresholdSignature)>, + certificate_partial_signatures: Vec<(BlockId, PartialSignature)>, + + round_complete_signatures: Vec<(Round, ThresholdSignature)>, + round_complete_partial_signatures: Vec<(Round, DeviceId, PartialSignature)>, + + // TODO: Share what we have too? Still need to find the meet of our dags? + // For old rounds, we only need to send blocks that are fully certified?? Or maybe that's not enough if 1 malicious node saw the aggregate signature? + }, + Wait, +} + + +#[derive(Debug)] +pub(crate) enum StoreBFTSyncCommand { + BFTSyncRequest, + // { + // // ecg_status: ECGStatus, + // dag_state: crate::store::dag::UntypedState, + // }, +} + +pub struct BFTSyncInitiator { + todo: PhantomData, // JP: Is SHeaderId needed? +} + +impl BFTSyncInitiator { + /// Create a new ECGSyncInitiator and run the first round. + async fn run_new>>(stream: &mut S, bft_state: &mut watch::Receiver>) -> (Self, Vec<()>) { + let req = { + // TODO: Limit on request sizes. + // Acquire read lock on state. + let bft_state = bft_state.borrow_and_update(); + let round = bft_state.current_round(); + let current_round = &bft_state.round_states()[round as usize]; + let block_tips = bft_state.previous_tips().iter().map(|(round, peer_id)| { + let round_state = &bft_state.round_states()[*round as usize]; + let signed_block = round_state.blocks().get(peer_id).expect("Block not found even though it is a tip"); + let block = signed_block.value(); + let block_id = block.block_id(); + let signature_ids = round_state.certificates().get(&block_id).expect("Signature not found for previous block that's a tip").signature_ids(); + + (*round, block_id, signature_ids) + }).chain( + current_round.blocks().iter().map(|(_peer_id, signed_block)| { + let block = signed_block.value(); + let block_id = block.block_id(); + let signature_ids = current_round.certificates().get(&block_id).map_or_else(|| ThresholdSignatureId::new(), |s| s.signature_ids()); + (round, block_id, signature_ids) + }) + ).collect(); + + + let round_complete = current_round.commit_round().signature_ids(); + MsgBFTSyncRequest::BFTInitialSync { + round, + block_tips, + round_complete, + } + }; + send(stream, req).await.expect("TODO"); + + todo!() + } +} + diff --git a/ossa-core/src/protocol/store_sc_dag/mod.rs b/ossa-core/src/protocol/store_sc_dag/mod.rs new file mode 100644 index 0000000..2d24cd4 --- /dev/null +++ b/ossa-core/src/protocol/store_sc_dag/mod.rs @@ -0,0 +1 @@ +pub mod v0; diff --git a/ossa-core/src/protocol/store_bft_dag/v0.rs b/ossa-core/src/protocol/store_sc_dag/v0.rs similarity index 96% rename from ossa-core/src/protocol/store_bft_dag/v0.rs rename to ossa-core/src/protocol/store_sc_dag/v0.rs index 42ac880..481c160 100644 --- a/ossa-core/src/protocol/store_bft_dag/v0.rs +++ b/ossa-core/src/protocol/store_sc_dag/v0.rs @@ -4,10 +4,11 @@ use std::{collections::BTreeSet, fmt::Debug}; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; -use tracing::debug; +use tracing::{debug, warn}; use crate::protocol::store_peer::dag_sync::{DAGStateSubscriber, MsgDAGSyncResponse}; use crate::store::dag; +use crate::util::Stream; use crate::{ auth::DeviceId, network::protocol::{receive, MiniProtocol}, @@ -132,7 +133,7 @@ where type Message = MsgStoreDAGSync; // Has initiative - fn run_server>( + fn run_server>( self, mut stream: S, ) -> impl Future + Send { @@ -150,7 +151,7 @@ where None => { // First round of DAG sync, so create and run first round. - // JP: Eventually switch ecg_state to an Arc? + // JP: Eventually switch dag_state to an Arc? let (new_dag_sync, operations) = DAGSyncInitiator::::run_new( &mut stream, @@ -161,6 +162,7 @@ where operations } Some(ref mut dag_sync) => { + warn!("TODO: External updates to `dag_state` might make this out of sync?"); // Subsequent rounds of ECG sync. dag_sync.run_round(&mut stream, &dag_state).await } @@ -182,7 +184,7 @@ where } } - fn run_client>( + fn run_client>( self, mut stream: S, ) -> impl Future + Send { diff --git a/ossa-core/src/store/bft.rs b/ossa-core/src/store/bft.rs index 0a5c46d..56ace4e 100644 --- a/ossa-core/src/store/bft.rs +++ b/ossa-core/src/store/bft.rs @@ -1,6 +1,9 @@ use std::collections::{BTreeMap, BTreeSet}; -use crate::{auth::DeviceId, store::dag::{self, Frontier}, util::Sha256Hash}; +use serde::{Deserialize, Serialize}; +use tokio::sync::watch; + +use crate::{auth::DeviceId, protocol::store_bft_sync::v0::ThresholdSignatureId, store::dag::{self, Frontier}, util::Sha256Hash}; /// A round in the BFT strong consistency protocol. pub type Round = u64; @@ -14,12 +17,45 @@ pub trait SCDT { fn is_valid_operation(&self, op: Self::Op) -> bool; } -pub(crate) struct State { +pub(crate) struct State { pub(crate) initial_state: S, // JP: Should this go somewhere else? Potentially `DecryptedState`? - pub(crate) dag_state: dag::State, - /// Frontier of operations that have been comitted in the DAG state. - pub(crate) committed_frontier: Frontier, - pub(crate) round_states: Vec>, + pub(crate) dag_state: dag::State, + /// Frontier of operations that have been committed in the DAG state. + pub(crate) committed_frontier: Frontier, + /// State for BFT sync / consensus. + pub(crate) bft_state: watch::Sender>, +} + +pub(crate) struct BFTState { + // Current round. Starts at 0. + current_round: Round, + /// Round states of BFT sync. + round_states: Vec>, + /// Tips that are blocks (signed by the given peer) from previous rounds. These should all have fully aggregated signatures. + // JP: Are they all from the previous round? + previous_tips: Vec<(Round, DeviceId)>, +} + +impl BFTState { + pub(crate) fn new() -> Self { + Self { + current_round: 0, + round_states: vec![], + previous_tips: vec![], + } + } + + pub(crate) fn current_round(&self) -> u64 { + self.current_round + } + + pub(crate) fn round_states(&self) -> &[RoundState] { + &self.round_states + } + + pub(crate) fn previous_tips(&self) -> &[(Round, DeviceId)] { + &self.previous_tips + } } impl State { @@ -28,7 +64,7 @@ impl State { initial_state, dag_state: dag::State::new(), committed_frontier: BTreeSet::new(), - round_states: vec![], + bft_state: watch::Sender::new(BFTState::new()), } } } @@ -40,40 +76,82 @@ pub struct Signed { signature: ed25519_dalek::Signature, } -pub(crate) struct RoundState { +impl Signed { + pub fn value(&self) -> &A { + &self.value + } +} + +pub(crate) struct RoundState { // Blocks in this round for each validator. // A validator can only sign a single block in each round (otherwise, they are detected to be malicious). - blocks: BTreeMap>>, + blocks: BTreeMap>>, // 2/3 (?) of (weighted) validators promise to make the block available and validated/approve of operations. certificates: BTreeMap>, // 2/3 (1/3?) of (weighted) validators have seen 2/3 (?) of the certificates. commit_round: ThresholdSigned, } +impl RoundState { + pub(crate) fn commit_round(&self) -> &ThresholdSigned { + &self.commit_round + } + + pub(crate) fn blocks(&self) -> &BTreeMap>> { + &self.blocks + } + + pub(crate) fn certificates(&self) -> &BTreeMap> { + &self.certificates + } +} + +#[derive(Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)] pub(crate) struct BlockId(Sha256Hash); // A BFT block points to the tips of the DAG and the previous round's certificates. -pub(crate) struct Block { - round: u64, - // Tips of DAG operations - dag_frontier: Frontier, +#[derive(Debug, Serialize)] +pub(crate) struct Block { + round: Round, + // Tips of SC DAG operations + dag_frontier: Frontier, // Must contain 2/3 of previous round's certificates (or be round 0). parents: Vec, // Only strong edges, don't need weak edges since blocks point to head of DAG operations anyways // Who proposed the block. proposer: DeviceId, } +impl Block { + pub fn block_id(&self) -> BlockId { + todo!() + } +} + +impl<'de, SHeaderId> Deserialize<'de> for Block { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de> { + todo!("TODO: Manually implement this since derive is broken for generics") + } +} + +#[derive(Debug, Serialize, Deserialize)] pub(crate) struct ThresholdSignature(); // TODO +#[derive(Debug, Serialize, Deserialize)] pub(crate) struct PartialSignature(); // TODO +#[derive(Debug, Serialize, Deserialize)] pub(crate) struct CertificateId(Sha256Hash); +// JP: Do we need this type? Just use the block id? pub(crate) struct Certificate { - // The block's id (hash). + /// The block's id (hash). block: BlockId, - // The block's round. - round: u64, - // Who proposed the block. + /// The block's round. + // JP: Is this needed? + round: Round, + /// Who proposed the block. + // JP: Is this needed? proposer: DeviceId, } @@ -91,9 +169,15 @@ pub(crate) enum ThresholdSigned { }, } +impl ThresholdSigned { + pub fn signature_ids(&self) -> ThresholdSignatureId { + todo!() + } +} + pub(crate) struct RoundComplete { store_id: Sha256Hash, // StoreId, - round: u64, + round: Round, } // JP: Could include hash of previous round's leader block?? Probably not helpful diff --git a/ossa-core/src/store/mod.rs b/ossa-core/src/store/mod.rs index 6eb083b..a3f7add 100644 --- a/ossa-core/src/store/mod.rs +++ b/ossa-core/src/store/mod.rs @@ -16,6 +16,7 @@ use tokio::sync::{ }; use tracing::{debug, error, warn}; +use crate::protocol::store_bft_sync::v0::StoreBFTSync; use crate::store::bft::SCDT; use crate::store::v0::BLOCK_SIZE; use crate::time::ConcretizeTime; @@ -26,7 +27,7 @@ use crate::{ network::multiplexer::{run_miniprotocol_async, SpawnMultiplexerTask}, protocol::{ manager::v0::PeerManagerCommand, - store_bft_dag::v0::{StoreDAGSync, StoreSCGSyncCommand}, + store_sc_dag::v0::{StoreDAGSync, StoreSCGSyncCommand}, store_peer::v0::{StoreSync, StoreSyncCommand}, }, store::{ @@ -1677,7 +1678,12 @@ pub(crate) async fn run_handler( let spawn_task_bft: Box = Box::new(move |party, stream_id, sender, receiver| { tokio::spawn(async move { - unimplemented!(); + warn!("TODO: Should we tell store we're running?"); + + // Start miniprotocol as client. + let mp = StoreBFTSync::new_client(peer, send_commands_untyped); + run_miniprotocol_async(mp, true, stream_id, sender, receiver).await; + debug!("Store BFT sync with peer (without initiative) exited.") }) }); Some((spawn_task_ec, spawn_task_sc, spawn_task_bft)) From 2d5c0511e186e8777f3ce6d947468c60b2d8b3ed Mon Sep 17 00:00:00 2001 From: James Parker Date: Sun, 25 Jan 2026 21:50:42 -0500 Subject: [PATCH 4/8] Compiles: Move BFT state --- ossa-core/src/store/bft.rs | 3 --- ossa-core/src/store/mod.rs | 17 ++++++++++++----- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/ossa-core/src/store/bft.rs b/ossa-core/src/store/bft.rs index 56ace4e..de23f59 100644 --- a/ossa-core/src/store/bft.rs +++ b/ossa-core/src/store/bft.rs @@ -22,8 +22,6 @@ pub(crate) struct State { pub(crate) dag_state: dag::State, /// Frontier of operations that have been committed in the DAG state. pub(crate) committed_frontier: Frontier, - /// State for BFT sync / consensus. - pub(crate) bft_state: watch::Sender>, } pub(crate) struct BFTState { @@ -64,7 +62,6 @@ impl State { initial_state, dag_state: dag::State::new(), committed_frontier: BTreeSet::new(), - bft_state: watch::Sender::new(BFTState::new()), } } } diff --git a/ossa-core/src/store/mod.rs b/ossa-core/src/store/mod.rs index a3f7add..ec7e268 100644 --- a/ossa-core/src/store/mod.rs +++ b/ossa-core/src/store/mod.rs @@ -4,6 +4,7 @@ use ossa_typeable::Typeable; use rand::{seq::SliceRandom as _, thread_rng}; use replace_with::replace_with_or_abort; use serde::{Deserialize, Serialize}; +use tokio::sync::watch; use std::fmt::{Debug, Display}; use std::marker::PhantomData; use std::{ @@ -17,7 +18,7 @@ use tokio::sync::{ use tracing::{debug, error, warn}; use crate::protocol::store_bft_sync::v0::StoreBFTSync; -use crate::store::bft::SCDT; +use crate::store::bft::{BFTState, SCDT}; use crate::store::v0::BLOCK_SIZE; use crate::time::ConcretizeTime; use crate::util::merkle_tree::{MerkleTree, Potential}; @@ -107,6 +108,8 @@ pub struct State>>, // listeners: Vec>>, + /// State for BFT sync / consensus. + pub(crate) bft_state: watch::Sender>, } // States are: @@ -297,6 +300,7 @@ impl< block_subscribers: BTreeMap::new(), ecg_subscribers: BTreeMap::new(), scg_subscribers: BTreeMap::new(), + bft_state: watch::Sender::new(BFTState::new()), } } @@ -312,6 +316,7 @@ impl< block_subscribers: BTreeMap::new(), ecg_subscribers: BTreeMap::new(), scg_subscribers: BTreeMap::new(), + bft_state: watch::Sender::new(BFTState::new()), } } @@ -1659,7 +1664,7 @@ pub(crate) async fn run_handler( }) }); - let send_commands_untyped = send_commands_untyped.clone(); + let send_commands_untyped_ = send_commands_untyped.clone(); let spawn_task_sc: Box = Box::new(move |party, stream_id, sender, receiver| { tokio::spawn(async move { // JP: Maybe this isn't needed??? @@ -1667,21 +1672,23 @@ pub(crate) async fn run_handler( let register_cmd = UntypedStoreCommand::RegisterIncomingPeerSCGSyncing { peer, }; - send_commands_untyped.send(register_cmd).expect("TODO"); + send_commands_untyped_.send(register_cmd).expect("TODO"); // Start miniprotocol as client. - let mp = StoreDAGSync::new_client(peer, send_commands_untyped); + let mp = StoreDAGSync::new_client(peer, send_commands_untyped_); run_miniprotocol_async(mp, true, stream_id, sender, receiver).await; debug!("Store SCG sync with peer (without initiative) exited.") }) }); + let send_commands_untyped_ = send_commands_untyped.clone(); + let bft_state = store.bft_state.subscribe(); let spawn_task_bft: Box = Box::new(move |party, stream_id, sender, receiver| { tokio::spawn(async move { warn!("TODO: Should we tell store we're running?"); // Start miniprotocol as client. - let mp = StoreBFTSync::new_client(peer, send_commands_untyped); + let mp = StoreBFTSync::new_client(peer, send_commands_untyped_, bft_state); run_miniprotocol_async(mp, true, stream_id, sender, receiver).await; debug!("Store BFT sync with peer (without initiative) exited.") }) From 25875544f9dec5edb26c5ca7b2efae5d1c993e1a Mon Sep 17 00:00:00 2001 From: James Parker Date: Fri, 30 Jan 2026 15:52:27 -0500 Subject: [PATCH 5/8] Pending --- ossa-core/src/protocol/store_bft_sync/v0.rs | 67 +++++++++++++++++++-- ossa-core/src/protocol/store_sc_dag/v0.rs | 1 + ossa-core/src/store/mod.rs | 4 +- 3 files changed, 66 insertions(+), 6 deletions(-) diff --git a/ossa-core/src/protocol/store_bft_sync/v0.rs b/ossa-core/src/protocol/store_bft_sync/v0.rs index 0e211f6..9be404e 100644 --- a/ossa-core/src/protocol/store_bft_sync/v0.rs +++ b/ossa-core/src/protocol/store_bft_sync/v0.rs @@ -8,9 +8,9 @@ use std::{future::Future, marker::PhantomData}; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc::{UnboundedReceiver, UnboundedSender}, watch}; -use tracing::debug; +use tracing::{debug, warn}; -use crate::{auth::DeviceId, network::protocol::{send, MiniProtocol}, store::{bft::{BFTState, Block, BlockId, PartialSignature, Round, ThresholdSignature}, dag, UntypedStoreCommand}, util::{Sha256Hash, Stream}}; +use crate::{auth::DeviceId, network::protocol::{receive, send, MiniProtocol}, store::{bft::{BFTState, Block, BlockId, PartialSignature, Round, ThresholdSignature}, dag, UntypedStoreCommand}, util::{Sha256Hash, Stream}}; pub(crate) struct StoreBFTSync { @@ -82,7 +82,7 @@ where .expect("Unreachable. Server must be given a receive channel."); while let Some(cmd) = recv_chan.recv().await { match cmd { - StoreBFTSyncCommand::BFTSyncRequest => { // { dag_state } => { + StoreBFTSyncCommand::BFTSyncRequest => { let updates = match bft_sync { None => { let (new_bft_sync, operations) = @@ -117,7 +117,32 @@ where mut stream: S, ) -> impl Future + Send { async move { - todo!() + debug!("StoreBFTSync client running!"); + let mut bft_sync: Option = None; + + // TODO: Check when done. + loop { + // Receive request. + let request = receive(&mut stream).await.expect("TODO"); + match request { + MsgBFTSyncRequest::BFTInitialSync { round, block_tips, round_complete } => { + debug!("Received initial BFT sync request with round: {round:?}\nblock_tips: {block_tips:?}\nround_complete: {round_complete:?}"); + + if bft_sync.is_some() { + todo!("TODO: Error, BFT sync has already been initialized."); + } + + let mut bft_sync_ = BFTSyncResponder::new(); + bft_sync_.run_initial().await; + bft_sync = Some(bft_sync_); + + + + // TODO: If our round is behind their round, we may want to request updates from them + } + } + } + debug!("StoreBFTSync client exited"); } } } @@ -165,6 +190,17 @@ impl From for MsgStoreBFTSync { } } +impl TryInto for MsgStoreBFTSync { + type Error = (); + + fn try_into(self) -> Result { + match self { + MsgStoreBFTSync::Request(msg_bftsync_request) => Ok(msg_bftsync_request), + MsgStoreBFTSync::BFTResponse(_msg_bftsync_response) => Err(()), + } + } +} + #[derive(Debug, Serialize, Deserialize)] pub(crate) enum MsgBFTSyncResponse { Response { @@ -200,6 +236,7 @@ impl BFTSyncInitiator { async fn run_new>>(stream: &mut S, bft_state: &mut watch::Receiver>) -> (Self, Vec<()>) { let req = { // TODO: Limit on request sizes. + warn!("TODO: Check request sizes."); // Acquire read lock on state. let bft_state = bft_state.borrow_and_update(); let round = bft_state.current_round(); @@ -213,7 +250,7 @@ impl BFTSyncInitiator { (*round, block_id, signature_ids) }).chain( - current_round.blocks().iter().map(|(_peer_id, signed_block)| { + current_round.blocks().values().map(|signed_block| { let block = signed_block.value(); let block_id = block.block_id(); let signature_ids = current_round.certificates().get(&block_id).map_or_else(|| ThresholdSignatureId::new(), |s| s.signature_ids()); @@ -235,3 +272,23 @@ impl BFTSyncInitiator { } } +pub struct BFTSyncResponder { +} + +impl BFTSyncResponder { + fn new() -> Self { + Self { } + } + + async fn run_initial(&self) { + continuehere + // TODO: Record everything they have + // + // For each block_tips: + // If round is less than our round and we have the block, respond with all children of that block recursively (signatures should be aggregate for these blocks) + // + // If their round matches our round, send everything they don't have from the current round. + // If round is less than our round, respond with round_complete signatures for rounds greater than or equal to round (and less than the rounds that we fully responded with). + todo!() + } +} diff --git a/ossa-core/src/protocol/store_sc_dag/v0.rs b/ossa-core/src/protocol/store_sc_dag/v0.rs index 481c160..08886c3 100644 --- a/ossa-core/src/protocol/store_sc_dag/v0.rs +++ b/ossa-core/src/protocol/store_sc_dag/v0.rs @@ -226,6 +226,7 @@ where } } } + debug!("StoreDAGSync client exited"); } } } diff --git a/ossa-core/src/store/mod.rs b/ossa-core/src/store/mod.rs index ec7e268..7ea0dcc 100644 --- a/ossa-core/src/store/mod.rs +++ b/ossa-core/src/store/mod.rs @@ -633,7 +633,9 @@ impl< let dag_state = sc_state.dag_state.state().clone(); let message = StoreSCGSyncCommand::SCGSyncRequest { dag_state }; debug!("Sending SCG sync request to peer ({})", p.0); - send_command(&mut p.1.scg_status, message) + send_command(&mut p.1.scg_status, message); + + todo!("Send BFT sync requests"); }); } _ => {} From 981ed8246e1e74b521cb56b51e29b3bdd20424ae Mon Sep 17 00:00:00 2001 From: James Parker Date: Sat, 31 Jan 2026 18:02:26 -0500 Subject: [PATCH 6/8] Send BFT tips in order --- ossa-core/src/protocol/store_bft_sync/v0.rs | 108 ++++++++++++++++++-- ossa-core/src/store/bft.rs | 10 +- 2 files changed, 109 insertions(+), 9 deletions(-) diff --git a/ossa-core/src/protocol/store_bft_sync/v0.rs b/ossa-core/src/protocol/store_bft_sync/v0.rs index 9be404e..2922ec3 100644 --- a/ossa-core/src/protocol/store_bft_sync/v0.rs +++ b/ossa-core/src/protocol/store_bft_sync/v0.rs @@ -10,8 +10,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc::{UnboundedReceiver, UnboundedSender}, watch}; use tracing::{debug, warn}; -use crate::{auth::DeviceId, network::protocol::{receive, send, MiniProtocol}, store::{bft::{BFTState, Block, BlockId, PartialSignature, Round, ThresholdSignature}, dag, UntypedStoreCommand}, util::{Sha256Hash, Stream}}; - +use crate::{auth::DeviceId, network::protocol::{receive, send, MiniProtocol}, protocol::store_peer::dag_sync::MAX_HAVE_HEADERS, store::{bft::{BFTState, Block, BlockId, PartialSignature, Round, ThresholdSignature}, dag, UntypedStoreCommand}, util::{Sha256Hash, Stream}}; pub(crate) struct StoreBFTSync { peer: DeviceId, @@ -169,6 +168,20 @@ impl ThresholdSignatureId { } } +// For use in a Vec of block tips, ordered by (Round, BlockId). Block signatures ordered by SignatureId. +#[derive(Debug, Serialize, Deserialize)] +pub(crate) enum BlockStreamElement { + Block(Round, BlockId), + BlockSignatures(SignatureId), + End, +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) enum RoundCompleteStreamElement { + RoundSignatures(SignatureId), + End, +} + #[derive(Debug, Serialize, Deserialize)] pub(crate) enum MsgBFTSyncRequest { BFTInitialSync { @@ -176,11 +189,11 @@ pub(crate) enum MsgBFTSyncRequest { round: Round, /// Tips/frontier of blocks with their round and corresponding signatures. - block_tips: Vec<(Round, BlockId, ThresholdSignatureId)>, + block_tips: Vec, - /// Signatures attesting to completion of the current round. + /// Signatures attesting to completion of the current round, in order by SignatureId. // JP: Or just send the signatures? - round_complete: ThresholdSignatureId, + round_complete: Vec, }, } @@ -235,10 +248,74 @@ impl BFTSyncInitiator { /// Create a new ECGSyncInitiator and run the first round. async fn run_new>>(stream: &mut S, bft_state: &mut watch::Receiver>) -> (Self, Vec<()>) { let req = { - // TODO: Limit on request sizes. - warn!("TODO: Check request sizes."); // Acquire read lock on state. let bft_state = bft_state.borrow_and_update(); + let round = bft_state.current_round(); + + // Get the current tips. + let current_tips = bft_state.get_current_tips(); + + // Sort by (Round, BlockId) + let sorted_blocks = current_tips.iter().map(|(round, peer_id)| { + let round_state = &bft_state.round_states()[*round as usize]; + let signed_block = round_state.blocks().get(peer_id).expect("Block not found even though it is a tip"); + let block = signed_block.value(); + let block_id = block.block_id(); + + // Return signatures on block, sorted. + let signatures = round_state.certificates().get(&block_id).map_or_else(|| vec![], |ts| { + let mut sig_ids = ts.signature_ids(); + sig_ids.sort(); + sig_ids + }); + + (round, block_id, signatures) + }).collect::>(); + sorted_blocks.sort_by_key(|(round, peer_id, _)| (round, peer_id)); + + let mut block_tips = Vec::with_capacity(MAX_HAVE_HEADERS as usize); + let mut current_block_pos = 0; + let mut current_signature_pos = None; + + // Iterate over them, up to the limit: + while block_tips.len() < MAX_HAVE_HEADERS.into() { + let Some(current_block) = sorted_blocks.get(current_block_pos) else { + // We're done so end the stream and exit the loop. + block_tips.push(BlockStreamElement::End); + break; + }; + + match current_signature_pos { + Some(j) => { + if let Some(signature_id) = current_block.2.get(j) { + // Append signature + let elmt = BlockStreamElement::BlockSignatures(*signature_id); + block_tips.push(elmt); + current_signature_pos = Some(j + 1); + } else { + // We're done with the signatures so go to the next block. + current_block_pos += 1; + current_signature_pos = None; + } + }, + None => { + // Append block + let elmt = BlockStreamElement::Block(*current_block.0, current_block.1); + block_tips.push(elmt) + } + } + + } + + + + + + + + // OLD: + /* + let round = bft_state.current_round(); let current_round = &bft_state.round_states()[round as usize]; let block_tips = bft_state.previous_tips().iter().map(|(round, peer_id)| { @@ -265,6 +342,9 @@ impl BFTSyncInitiator { block_tips, round_complete, } + + */ + todo!() }; send(stream, req).await.expect("TODO"); @@ -290,5 +370,19 @@ impl BFTSyncResponder { // If their round matches our round, send everything they don't have from the current round. // If round is less than our round, respond with round_complete signatures for rounds greater than or equal to round (and less than the rounds that we fully responded with). todo!() + + + // TODO: Send tips in order (Round, BlockId)? Upon receipt of a tip, if we see that a tip was skipped, respond with the skipped tips. Upon receiving END, queue everything after the last tip + // + // If our round is behind theirs, tell them to wait? + // + // Upon receipt of their tip, cases: + // - we have tip + // - Check for skipped tips and send those. (or if we have an aggregate signature and they don't, send the aggregate signature + // - Start queue of children to send + // - we don't have tip + // - Either the tip is: + // - a descendent of what we know (round is later than our current round?) + // - in a fork/sibling of our view (round is <= our current round) } } diff --git a/ossa-core/src/store/bft.rs b/ossa-core/src/store/bft.rs index de23f59..03683e6 100644 --- a/ossa-core/src/store/bft.rs +++ b/ossa-core/src/store/bft.rs @@ -3,7 +3,7 @@ use std::collections::{BTreeMap, BTreeSet}; use serde::{Deserialize, Serialize}; use tokio::sync::watch; -use crate::{auth::DeviceId, protocol::store_bft_sync::v0::ThresholdSignatureId, store::dag::{self, Frontier}, util::Sha256Hash}; +use crate::{auth::DeviceId, protocol::store_bft_sync::v0::{SignatureId, ThresholdSignatureId}, store::dag::{self, Frontier}, util::Sha256Hash}; /// A round in the BFT strong consistency protocol. pub type Round = u64; @@ -54,6 +54,11 @@ impl BFTState { pub(crate) fn previous_tips(&self) -> &[(Round, DeviceId)] { &self.previous_tips } + + pub(crate) fn get_current_tips(&self) -> &[(Round, DeviceId)] { + // Get all active tips + todo!() + } } impl State { @@ -167,7 +172,8 @@ pub(crate) enum ThresholdSigned { } impl ThresholdSigned { - pub fn signature_ids(&self) -> ThresholdSignatureId { + // pub fn signature_ids(&self) -> ThresholdSignatureId { + pub fn signature_ids(&self) -> Vec { todo!() } } From cb6a16e54a479ed985bc6ffc933a4138d7debb74 Mon Sep 17 00:00:00 2001 From: James Parker Date: Sat, 31 Jan 2026 21:45:10 -0500 Subject: [PATCH 7/8] Batch round_complete signatures --- ossa-core/src/protocol/store_bft_sync/v0.rs | 43 +++++---------------- ossa-core/src/store/bft.rs | 2 +- 2 files changed, 10 insertions(+), 35 deletions(-) diff --git a/ossa-core/src/protocol/store_bft_sync/v0.rs b/ossa-core/src/protocol/store_bft_sync/v0.rs index 2922ec3..e700890 100644 --- a/ossa-core/src/protocol/store_bft_sync/v0.rs +++ b/ossa-core/src/protocol/store_bft_sync/v0.rs @@ -256,7 +256,7 @@ impl BFTSyncInitiator { let current_tips = bft_state.get_current_tips(); // Sort by (Round, BlockId) - let sorted_blocks = current_tips.iter().map(|(round, peer_id)| { + let mut sorted_blocks = current_tips.iter().map(|(round, peer_id)| { let round_state = &bft_state.round_states()[*round as usize]; let signed_block = round_state.blocks().get(peer_id).expect("Block not found even though it is a tip"); let block = signed_block.value(); @@ -271,7 +271,7 @@ impl BFTSyncInitiator { (round, block_id, signatures) }).collect::>(); - sorted_blocks.sort_by_key(|(round, peer_id, _)| (round, peer_id)); + sorted_blocks.sort_by_key(|(round, peer_id, _)| (*round, *peer_id)); let mut block_tips = Vec::with_capacity(MAX_HAVE_HEADERS as usize); let mut current_block_pos = 0; @@ -301,50 +301,25 @@ impl BFTSyncInitiator { None => { // Append block let elmt = BlockStreamElement::Block(*current_block.0, current_block.1); - block_tips.push(elmt) + block_tips.push(elmt); + current_signature_pos = Some(0); } } - } - - - - - - - // OLD: - /* - - let round = bft_state.current_round(); + // Send round complete signatures. let current_round = &bft_state.round_states()[round as usize]; - let block_tips = bft_state.previous_tips().iter().map(|(round, peer_id)| { - let round_state = &bft_state.round_states()[*round as usize]; - let signed_block = round_state.blocks().get(peer_id).expect("Block not found even though it is a tip"); - let block = signed_block.value(); - let block_id = block.block_id(); - let signature_ids = round_state.certificates().get(&block_id).expect("Signature not found for previous block that's a tip").signature_ids(); - - (*round, block_id, signature_ids) - }).chain( - current_round.blocks().values().map(|signed_block| { - let block = signed_block.value(); - let block_id = block.block_id(); - let signature_ids = current_round.certificates().get(&block_id).map_or_else(|| ThresholdSignatureId::new(), |s| s.signature_ids()); - (round, block_id, signature_ids) - }) - ).collect(); + let mut round_complete = current_round.commit_round().signature_ids().into_iter().map(RoundCompleteStreamElement::RoundSignatures).collect::>(); + round_complete.push(RoundCompleteStreamElement::End); + let remaining_round_complete = round_complete.split_off(MAX_HAVE_HEADERS.into()); + // TODO: Store remaining_round_complete and other state. - let round_complete = current_round.commit_round().signature_ids(); MsgBFTSyncRequest::BFTInitialSync { round, block_tips, round_complete, } - - */ - todo!() }; send(stream, req).await.expect("TODO"); diff --git a/ossa-core/src/store/bft.rs b/ossa-core/src/store/bft.rs index 03683e6..58d47ba 100644 --- a/ossa-core/src/store/bft.rs +++ b/ossa-core/src/store/bft.rs @@ -108,7 +108,7 @@ impl RoundState { } } -#[derive(Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)] pub(crate) struct BlockId(Sha256Hash); // A BFT block points to the tips of the DAG and the previous round's certificates. From 614ee426c4c42dc75ba8797771efb4d918a9dd98 Mon Sep 17 00:00:00 2001 From: James Parker Date: Sun, 1 Feb 2026 21:59:57 -0500 Subject: [PATCH 8/8] Pending --- .../src/network/protocol/ecg_sync/v0/mod.rs | 4 +- ossa-core/src/protocol/store_bft_sync/v0.rs | 117 +++++++++++++++--- ossa-core/src/protocol/store_peer/dag_sync.rs | 2 +- 3 files changed, 102 insertions(+), 21 deletions(-) diff --git a/ossa-core/src/network/protocol/ecg_sync/v0/mod.rs b/ossa-core/src/network/protocol/ecg_sync/v0/mod.rs index 163f3c2..d72a4fc 100644 --- a/ossa-core/src/network/protocol/ecg_sync/v0/mod.rs +++ b/ossa-core/src/network/protocol/ecg_sync/v0/mod.rs @@ -43,11 +43,11 @@ pub type ECGSync = Send<(), Eps>; // TODO // // Client: // -// Send all headers he have that they don't (batched). +// Send all headers we have that they don't (batched). // // Client: // -// Send all headers he have that they don't (batched). +// Send all headers we have that they don't (batched). // /// The maximum number of `have` hashes that can be sent in each message. diff --git a/ossa-core/src/protocol/store_bft_sync/v0.rs b/ossa-core/src/protocol/store_bft_sync/v0.rs index e700890..25bc9d4 100644 --- a/ossa-core/src/protocol/store_bft_sync/v0.rs +++ b/ossa-core/src/protocol/store_bft_sync/v0.rs @@ -131,8 +131,13 @@ where todo!("TODO: Error, BFT sync has already been initialized."); } - let mut bft_sync_ = BFTSyncResponder::new(); - bft_sync_.run_initial().await; + let bft_sync_ = BFTSyncResponder::run_initial( + &mut stream, + &mut self.bft_state, + round, + block_tips, + round_complete, + ).await; bft_sync = Some(bft_sync_); @@ -230,6 +235,22 @@ pub(crate) enum MsgBFTSyncResponse { Wait, } +impl From> for MsgStoreBFTSync { + fn from(msg: MsgBFTSyncResponse) -> Self { + MsgStoreBFTSync::BFTResponse(msg) + } +} + +impl TryInto> for MsgStoreBFTSync { + type Error = (); + + fn try_into(self) -> Result, Self::Error> { + match self { + MsgStoreBFTSync::Request(_msg_bftsync_request) => Err(()), + MsgStoreBFTSync::BFTResponse(msg_bftsync_response) => Ok(msg_bftsync_response), + } + } +} #[derive(Debug)] pub(crate) enum StoreBFTSyncCommand { @@ -327,29 +348,68 @@ impl BFTSyncInitiator { } } -pub struct BFTSyncResponder { +pub struct BFTSyncResponder { + their_round: Round, + their_block_tips: Vec, + their_round_complete: Vec, + _phantom: PhantomData, // JP: Is SHeaderId needed? } -impl BFTSyncResponder { - fn new() -> Self { - Self { } - } +impl BFTSyncResponder { - async fn run_initial(&self) { - continuehere - // TODO: Record everything they have - // - // For each block_tips: - // If round is less than our round and we have the block, respond with all children of that block recursively (signatures should be aggregate for these blocks) - // - // If their round matches our round, send everything they don't have from the current round. - // If round is less than our round, respond with round_complete signatures for rounds greater than or equal to round (and less than the rounds that we fully responded with). - todo!() + async fn run_initial>>( + stream: &mut S, + bft_state: &mut watch::Receiver>, + their_round: Round, + their_block_tips: Vec, + their_round_complete: Vec, + ) -> Self { + let new = Self { + their_round, + their_block_tips, + their_round_complete, + _phantom: PhantomData, + }; + // // TODO: Record everything they have + // // + // // For each block_tips: + // // If round is less than our round and we have the block, respond with all children of that block recursively (signatures should be aggregate for these blocks) + // // + // // If their round matches our round, send everything they don't have from the current round. + // // If round is less than our round, respond with round_complete signatures for rounds greater than or equal to round (and less than the rounds that we fully responded with). + + let resp_m = { + // Acquire read lock on state. + let bft_state = bft_state.borrow_and_update(); + let round = bft_state.current_round(); + + // If our round is behind theirs, tell them to wait. + if round < their_round { + None // JP: Send previous tips that they don't have? + } else { + let resp = new.build_response(bft_state); + Some(resp) + } + }; + + match resp_m { + Some(resp) => { + send(stream, resp).await.expect("TODO"); + } + None => { + send(stream, MsgBFTSyncResponse::Wait).await.expect("TODO"); + + // Acquire read lock on state once we've caught up. + let bft_state = bft_state.wait_for(|s| s.current_round() >= their_round).await.expect("TODO: channel closed"); + + let resp = new.build_response(bft_state); + send(stream, resp).await.expect("TODO"); + } + } // TODO: Send tips in order (Round, BlockId)? Upon receipt of a tip, if we see that a tip was skipped, respond with the skipped tips. Upon receiving END, queue everything after the last tip // - // If our round is behind theirs, tell them to wait? // // Upon receipt of their tip, cases: // - we have tip @@ -359,5 +419,26 @@ impl BFTSyncResponder { // - Either the tip is: // - a descendent of what we know (round is later than our current round?) // - in a fork/sibling of our view (round is <= our current round) + + new + } + + // Precondition: our_round >= their_round + fn build_response( + &self, + bft_state: watch::Ref<'_, BFTState>, + ) -> MsgBFTSyncResponse { + // Send all previous tips (sorted) less that the current round (that they don't have)? + // Get and sort our tips. Along with everything starting from their_round??? + // Iterate over their tips + // JP: With this approach, we'll miss nodes where we know a child that depends on it but they dont? + // Or they know a child that depends on it but we do? But in this case, it's ok, we'll just send it even though they don't need it. + + + // Alternative: + // + // Mark th + + todo!() } } diff --git a/ossa-core/src/protocol/store_peer/dag_sync.rs b/ossa-core/src/protocol/store_peer/dag_sync.rs index ca9eea1..1b0e567 100644 --- a/ossa-core/src/protocol/store_peer/dag_sync.rs +++ b/ossa-core/src/protocol/store_peer/dag_sync.rs @@ -171,7 +171,7 @@ impl< // TODO: Limit on tips (128? 64? 32? MAX_HAVE_HEADERS) warn!("TODO: Check request sizes."); let req = MsgDAGSyncRequest::DAGSync { - tips: dag_state.tips().iter().cloned().collect(), + tips: dag_state.tips().iter().cloned().collect(), // JP: Why does this send all the tips again? TODO: Limit the tips... Maybe do this on the DAG construction side? known: known_bitmap, }; send(stream, req).await.expect("TODO");