diff --git a/ossa-core/src/network/multiplexer.rs b/ossa-core/src/network/multiplexer.rs index 112e2aa..6d48025 100644 --- a/ossa-core/src/network/multiplexer.rs +++ b/ossa-core/src/network/multiplexer.rs @@ -339,7 +339,6 @@ struct MiniprotocolState { sender: mpsc::Sender, } -// JP: TODO: This O probably isn't needed. pub(crate) async fn run_miniprotocol_async( p: P, is_client: bool, diff --git a/ossa-core/src/network/protocol/ecg_sync/v0/mod.rs b/ossa-core/src/network/protocol/ecg_sync/v0/mod.rs index 163f3c2..d72a4fc 100644 --- a/ossa-core/src/network/protocol/ecg_sync/v0/mod.rs +++ b/ossa-core/src/network/protocol/ecg_sync/v0/mod.rs @@ -43,11 +43,11 @@ pub type ECGSync = Send<(), Eps>; // TODO // // Client: // -// Send all headers he have that they don't (batched). +// Send all headers we have that they don't (batched). // // Client: // -// Send all headers he have that they don't (batched). +// Send all headers we have that they don't (batched). // /// The maximum number of `have` hashes that can be sent in each message. diff --git a/ossa-core/src/protocol/manager/v0.rs b/ossa-core/src/protocol/manager/v0.rs index 30ee909..b8a7986 100644 --- a/ossa-core/src/protocol/manager/v0.rs +++ b/ossa-core/src/protocol/manager/v0.rs @@ -158,10 +158,11 @@ impl< debug!("Manager command channel closed."); // TODO: Do something here? 
} - Some(PeerManagerCommand::RequestStoreSync { store_id, spawn_task_ec, spawn_task_sc }) => { + Some(PeerManagerCommand::RequestStoreSync { store_id, spawn_task_ec, spawn_task_sc, spawn_task_bft }) => { let ec_stream_id = self.next_stream_id(); let sc_stream_id = self.next_stream_id(); - let _response = self.run_request_new_stream_server(&mut stream, store_id, ec_stream_id, spawn_task_ec, sc_stream_id, spawn_task_sc).await; + let bft_stream_id = self.next_stream_id(); + let _response = self.run_request_new_stream_server(&mut stream, store_id, ec_stream_id, spawn_task_ec, sc_stream_id, spawn_task_sc, bft_stream_id, spawn_task_bft).await; debug!("Requested to sync store with peer."); } } @@ -201,6 +202,7 @@ impl< store_id, ec_stream_id, sc_stream_id, + bft_stream_id, } => { debug!( "Received MsgManagerRequest::CreateStoreStream: {ec_stream_id}, {store_id:?}" @@ -210,6 +212,7 @@ impl< store_id, ec_stream_id, sc_stream_id, + bft_stream_id, ) .await; } @@ -247,12 +250,14 @@ impl< store_id: StoreId, ec_stream_id: StreamId, sc_stream_id: StreamId, + bft_stream_id: StreamId, ) { let accept = { // Check if stream is valid (it can be allocated by peer and is available). let is_valid_id_ec = self.is_valid_stream_id(false, &ec_stream_id); let is_valid_id_sc = self.is_valid_stream_id(false, &sc_stream_id); - if !is_valid_id_ec || !is_valid_id_sc { + let is_valid_id_bft = self.is_valid_stream_id(false, &bft_stream_id); + if !is_valid_id_ec || !is_valid_id_sc || !is_valid_id_bft { debug!("Peer sent invalid stream id."); Err(MsgManagerError::InvalidStreamId) } else { @@ -273,7 +278,7 @@ impl< .expect("TODO"); let spawn_task = rx.await.expect("TODO"); - if let Some((ec_spawn_task, sc_spawn_task)) = spawn_task { + if let Some((ec_spawn_task, sc_spawn_task, bft_spawn_task)) = spawn_task { // Tell multiplexer to create EC miniprotocol. 
let is_running_ec = self .create_multiplexer_stream(ec_stream_id, ec_spawn_task) @@ -285,15 +290,29 @@ impl< .create_multiplexer_stream(sc_stream_id, sc_spawn_task) .await; - if !is_running_sc { + if is_running_sc { + // Tell multiplexer to create BFT miniprotocol. + let is_running_bft = self + .create_multiplexer_stream(bft_stream_id, bft_spawn_task) + .await; + if !is_running_bft { + error!( + "Failed to create miniprotocol stream to sync BFT store." + ); + panic!("TODO: Shutdown EC+SC miniprotocol..."); + } + + Ok(is_running_bft) + } else { error!( "Failed to create miniprotocol stream to strongly sync store." ); panic!("TODO: Shutdown EC miniprotocol..."); } - - Ok(is_running_sc) } else { + error!( + "Failed to create miniprotocol stream to weakly sync store." + ); Ok(false) } } else { @@ -321,12 +340,15 @@ impl< ec_spawn_task: Box, sc_stream_id: StreamId, sc_spawn_task: Box, + bft_stream_id: StreamId, + bft_spawn_task: Box, ) { // Send request message. let req = MsgManagerRequest::CreateStoreStream { store_id, ec_stream_id, sc_stream_id, + bft_stream_id, }; send(stream, req).await.expect("TODO"); @@ -366,6 +388,18 @@ impl< "TODO: Send shutdown for this miniprotocol and restore status to Known." ); } + + // Tell multiplexer to create BFT miniprotocol. + let is_running = self + .create_multiplexer_stream(bft_stream_id, bft_spawn_task) + .await; + + if !is_running { + error!("Failed to create miniprotocol stream to sync BFT store."); + panic!( + "TODO: Send shutdown for this miniprotocol and restore status to Known." + ); + } } } } @@ -544,6 +578,8 @@ pub(crate) enum MsgManagerRequest { ec_stream_id: StreamId, /// Strongly consistent stream id. sc_stream_id: StreamId, + /// BFT stream id. 
+ bft_stream_id: StreamId, }, } @@ -623,5 +659,6 @@ pub(crate) enum PeerManagerCommand { store_id: StoreId, spawn_task_ec: Box, spawn_task_sc: Box, + spawn_task_bft: Box, }, } diff --git a/ossa-core/src/protocol/mod.rs b/ossa-core/src/protocol/mod.rs index 85be2f9..9dafa7f 100644 --- a/ossa-core/src/protocol/mod.rs +++ b/ossa-core/src/protocol/mod.rs @@ -13,8 +13,9 @@ use crate::{ pub mod heartbeat; pub mod manager; -pub mod store_bft_dag; pub mod store_peer; +pub mod store_sc_dag; +pub mod store_bft_sync; pub mod v0; pub(crate) struct MiniProtocolArgs { diff --git a/ossa-core/src/protocol/store_bft_dag/mod.rs b/ossa-core/src/protocol/store_bft_sync/mod.rs similarity index 100% rename from ossa-core/src/protocol/store_bft_dag/mod.rs rename to ossa-core/src/protocol/store_bft_sync/mod.rs diff --git a/ossa-core/src/protocol/store_bft_sync/v0.rs b/ossa-core/src/protocol/store_bft_sync/v0.rs new file mode 100644 index 0000000..25bc9d4 --- /dev/null +++ b/ossa-core/src/protocol/store_bft_sync/v0.rs @@ -0,0 +1,444 @@ +// This is very similar to DAG sync, except there are a few key differences: +// - The DAG is bipartite between block and certificate nodes +// - We have rounds, so we can take advantage of that (ex, only send previous round blocks if it has a full threshold signature?) +// +// Do we need to go back more than 1 round? + +use std::{future::Future, marker::PhantomData}; + +use serde::{Deserialize, Serialize}; +use tokio::sync::{mpsc::{UnboundedReceiver, UnboundedSender}, watch}; +use tracing::{debug, warn}; + +use crate::{auth::DeviceId, network::protocol::{receive, send, MiniProtocol}, protocol::store_peer::dag_sync::MAX_HAVE_HEADERS, store::{bft::{BFTState, Block, BlockId, PartialSignature, Round, ThresholdSignature}, dag, UntypedStoreCommand}, util::{Sha256Hash, Stream}}; + +pub(crate) struct StoreBFTSync { + peer: DeviceId, + // Receive commands from store if we have initiative or send commands to store if we're the responder. 
+ recv_chan: Option>, + // Send commands to store if we're the responder and send results back to store if we're the initiator. + send_chan: UnboundedSender>, // JP: Make this a stream? + // BFT state + bft_state: watch::Receiver>, +} + +impl StoreBFTSync { + pub(crate) fn new_server( + peer: DeviceId, + recv_chan: UnboundedReceiver, + send_chan: UnboundedSender< + UntypedStoreCommand, + >, + bft_state: watch::Receiver>, + ) -> Self { + let recv_chan = Some(recv_chan); + StoreBFTSync { + peer, + recv_chan, + send_chan, + bft_state, + } + } + + pub(crate) fn new_client( + peer: DeviceId, + send_chan: UnboundedSender< + UntypedStoreCommand, + >, + bft_state: watch::Receiver>, + ) -> Self { + StoreBFTSync { + peer, + recv_chan: None, + send_chan, + bft_state, + } + } +} + +impl MiniProtocol for StoreBFTSync +where + Hash: Send, + SHeaderId: for<'a> Deserialize<'a> + Serialize + Send + Sync, + SHeader: Send, + THeaderId: Send, + THeader: Send, +{ + type Message = MsgStoreBFTSync; + + // Has initiative + // JP: Why does this have initiative again? + fn run_server>( + mut self, + mut stream: S, + ) -> impl Future + Send { + async move { + debug!("StoreDAGSync server running!"); + let mut bft_sync: Option> = None; + + let mut recv_chan = self + .recv_chan + .expect("Unreachable. 
Server must be given a receive channel."); + while let Some(cmd) = recv_chan.recv().await { + match cmd { + StoreBFTSyncCommand::BFTSyncRequest => { + let updates = match bft_sync { + None => { + let (new_bft_sync, operations) = + BFTSyncInitiator::run_new( + &mut stream, + &mut self.bft_state, + ) + .await; + bft_sync = Some(new_bft_sync); + operations + } + Some(ref mut dag_sync) => { + todo!(); + } + }; + // let msg = UntypedStoreCommand::ReceivedBFTOperations { + // peer: self.peer, + // updates, + // }; + // self.send_chan.send(msg).expect("TODO"); + todo!(); + } + } + } + + debug!("StoreBFTSync receiver channel closed"); + } + } + + fn run_client>( + self, + mut stream: S, + ) -> impl Future + Send { + async move { + debug!("StoreBFTSync client running!"); + let mut bft_sync: Option = None; + + // TODO: Check when done. + loop { + // Receive request. + let request = receive(&mut stream).await.expect("TODO"); + match request { + MsgBFTSyncRequest::BFTInitialSync { round, block_tips, round_complete } => { + debug!("Received initial BFT sync request with round: {round:?}\nblock_tips: {block_tips:?}\nround_complete: {round_complete:?}"); + + if bft_sync.is_some() { + todo!("TODO: Error, BFT sync has already been initialized."); + } + + let bft_sync_ = BFTSyncResponder::run_initial( + &mut stream, + &mut self.bft_state, + round, + block_tips, + round_complete, + ).await; + bft_sync = Some(bft_sync_); + + + + // TODO: If our round is behind their round, we may want to request updates from them + } + } + } + debug!("StoreBFTSync client exited"); + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) enum MsgStoreBFTSync { + Request(MsgBFTSyncRequest), + BFTResponse(MsgBFTSyncResponse), +} + +pub(crate) type SignatureId = Sha256Hash; + +// Either the ID of the aggregate signature or IDs of the partial signatures. +// JP: Or just send the signatures? 
+#[derive(Debug, Serialize, Deserialize)] +pub(crate) enum ThresholdSignatureId { + Threshold(SignatureId), + Partial(Vec), +} + +impl ThresholdSignatureId { + pub fn new() -> Self { + ThresholdSignatureId::Partial(vec![]) + } +} + +// For use in a Vec of block tips, ordered by (Round, BlockId). Block signatures ordered by SignatureId. +#[derive(Debug, Serialize, Deserialize)] +pub(crate) enum BlockStreamElement { + Block(Round, BlockId), + BlockSignatures(SignatureId), + End, +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) enum RoundCompleteStreamElement { + RoundSignatures(SignatureId), + End, +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) enum MsgBFTSyncRequest { + BFTInitialSync { + /// Current round we're on. + round: Round, + + /// Tips/frontier of blocks with their round and corresponding signatures. + block_tips: Vec, + + /// Signatures attesting to completion of the current round, in order by SignatureId. + // JP: Or just send the signatures? + round_complete: Vec, + }, +} + +impl From for MsgStoreBFTSync { + fn from(msg: MsgBFTSyncRequest) -> Self { + MsgStoreBFTSync::Request(msg) + } +} + +impl TryInto for MsgStoreBFTSync { + type Error = (); + + fn try_into(self) -> Result { + match self { + MsgStoreBFTSync::Request(msg_bftsync_request) => Ok(msg_bftsync_request), + MsgStoreBFTSync::BFTResponse(_msg_bftsync_response) => Err(()), + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) enum MsgBFTSyncResponse { + Response { + blocks: Vec>, + certificate_signatures: Vec<(BlockId, ThresholdSignature)>, + certificate_partial_signatures: Vec<(BlockId, PartialSignature)>, + + round_complete_signatures: Vec<(Round, ThresholdSignature)>, + round_complete_partial_signatures: Vec<(Round, DeviceId, PartialSignature)>, + + // TODO: Share what we have too? Still need to find the meet of our dags? + // For old rounds, we only need to send blocks that are fully certified?? 
Or maybe that's not enough if 1 malicious node saw the aggregate signature? + }, + Wait, +} + +impl From> for MsgStoreBFTSync { + fn from(msg: MsgBFTSyncResponse) -> Self { + MsgStoreBFTSync::BFTResponse(msg) + } +} + +impl TryInto> for MsgStoreBFTSync { + type Error = (); + + fn try_into(self) -> Result, Self::Error> { + match self { + MsgStoreBFTSync::Request(_msg_bftsync_request) => Err(()), + MsgStoreBFTSync::BFTResponse(msg_bftsync_response) => Ok(msg_bftsync_response), + } + } +} + +#[derive(Debug)] +pub(crate) enum StoreBFTSyncCommand { + BFTSyncRequest, + // { + // // ecg_status: ECGStatus, + // dag_state: crate::store::dag::UntypedState, + // }, +} + +pub struct BFTSyncInitiator { + todo: PhantomData, // JP: Is SHeaderId needed? +} + +impl BFTSyncInitiator { + /// Create a new ECGSyncInitiator and run the first round. + async fn run_new>>(stream: &mut S, bft_state: &mut watch::Receiver>) -> (Self, Vec<()>) { + let req = { + // Acquire read lock on state. + let bft_state = bft_state.borrow_and_update(); + let round = bft_state.current_round(); + + // Get the current tips. + let current_tips = bft_state.get_current_tips(); + + // Sort by (Round, BlockId) + let mut sorted_blocks = current_tips.iter().map(|(round, peer_id)| { + let round_state = &bft_state.round_states()[*round as usize]; + let signed_block = round_state.blocks().get(peer_id).expect("Block not found even though it is a tip"); + let block = signed_block.value(); + let block_id = block.block_id(); + + // Return signatures on block, sorted. 
+ let signatures = round_state.certificates().get(&block_id).map_or_else(|| vec![], |ts| { + let mut sig_ids = ts.signature_ids(); + sig_ids.sort(); + sig_ids + }); + + (round, block_id, signatures) + }).collect::>(); + sorted_blocks.sort_by_key(|(round, peer_id, _)| (*round, *peer_id)); + + let mut block_tips = Vec::with_capacity(MAX_HAVE_HEADERS as usize); + let mut current_block_pos = 0; + let mut current_signature_pos = None; + + // Iterate over them, up to the limit: + while block_tips.len() < MAX_HAVE_HEADERS.into() { + let Some(current_block) = sorted_blocks.get(current_block_pos) else { + // We're done so end the stream and exit the loop. + block_tips.push(BlockStreamElement::End); + break; + }; + + match current_signature_pos { + Some(j) => { + if let Some(signature_id) = current_block.2.get(j) { + // Append signature + let elmt = BlockStreamElement::BlockSignatures(*signature_id); + block_tips.push(elmt); + current_signature_pos = Some(j + 1); + } else { + // We're done with the signatures so go to the next block. + current_block_pos += 1; + current_signature_pos = None; + } + }, + None => { + // Append block + let elmt = BlockStreamElement::Block(*current_block.0, current_block.1); + block_tips.push(elmt); + current_signature_pos = Some(0); + } + } + } + + // Send round complete signatures. + let current_round = &bft_state.round_states()[round as usize]; + let mut round_complete = current_round.commit_round().signature_ids().into_iter().map(RoundCompleteStreamElement::RoundSignatures).collect::>(); + round_complete.push(RoundCompleteStreamElement::End); + let remaining_round_complete = round_complete.split_off(MAX_HAVE_HEADERS.into()); + + // TODO: Store remaining_round_complete and other state. 
+ + MsgBFTSyncRequest::BFTInitialSync { + round, + block_tips, + round_complete, + } + }; + send(stream, req).await.expect("TODO"); + + todo!() + } +} + +pub struct BFTSyncResponder { + their_round: Round, + their_block_tips: Vec, + their_round_complete: Vec, + _phantom: PhantomData, // JP: Is SHeaderId needed? +} + +impl BFTSyncResponder { + + async fn run_initial>>( + stream: &mut S, + bft_state: &mut watch::Receiver>, + their_round: Round, + their_block_tips: Vec, + their_round_complete: Vec, + ) -> Self { + let new = Self { + their_round, + their_block_tips, + their_round_complete, + _phantom: PhantomData, + }; + + // // TODO: Record everything they have + // // + // // For each block_tips: + // // If round is less than our round and we have the block, respond with all children of that block recursively (signatures should be aggregate for these blocks) + // // + // // If their round matches our round, send everything they don't have from the current round. + // // If round is less than our round, respond with round_complete signatures for rounds greater than or equal to round (and less than the rounds that we fully responded with). + + let resp_m = { + // Acquire read lock on state. + let bft_state = bft_state.borrow_and_update(); + let round = bft_state.current_round(); + + // If our round is behind theirs, tell them to wait. + if round < their_round { + None // JP: Send previous tips that they don't have? + } else { + let resp = new.build_response(bft_state); + Some(resp) + } + }; + + match resp_m { + Some(resp) => { + send(stream, resp).await.expect("TODO"); + } + None => { + send(stream, MsgBFTSyncResponse::Wait).await.expect("TODO"); + + // Acquire read lock on state once we've caught up. + let bft_state = bft_state.wait_for(|s| s.current_round() >= their_round).await.expect("TODO: channel closed"); + + let resp = new.build_response(bft_state); + send(stream, resp).await.expect("TODO"); + } + } + + // TODO: Send tips in order (Round, BlockId)? 
Upon receipt of a tip, if we see that a tip was skipped, respond with the skipped tips. Upon receiving END, queue everything after the last tip + // + // + // Upon receipt of their tip, cases: + // - we have tip + // - Check for skipped tips and send those. (or if we have an aggregate signature and they don't, send the aggregate signature + // - Start queue of children to send + // - we don't have tip + // - Either the tip is: + // - a descendant of what we know (round is later than our current round?) + // - in a fork/sibling of our view (round is <= our current round) + + new + } + + // Precondition: our_round >= their_round + fn build_response( + &self, + bft_state: watch::Ref<'_, BFTState>, + ) -> MsgBFTSyncResponse { + // Send all previous tips (sorted) less than the current round (that they don't have)? + // Get and sort our tips. Along with everything starting from their_round??? + // Iterate over their tips + // JP: With this approach, we'll miss nodes where we know a child that depends on it but they don't? + // Or they know a child that depends on it but we do? But in this case, it's ok, we'll just send it even though they don't need it. + + + // Alternative: + // + // Mark th + + todo!() + } +} diff --git a/ossa-core/src/protocol/store_peer/ecg_sync.rs b/ossa-core/src/protocol/store_peer/dag_sync.rs similarity index 95% rename from ossa-core/src/protocol/store_peer/ecg_sync.rs rename to ossa-core/src/protocol/store_peer/dag_sync.rs index 6c875ab..1b0e567 100644 --- a/ossa-core/src/protocol/store_peer/ecg_sync.rs +++ b/ossa-core/src/protocol/store_peer/dag_sync.rs @@ -81,13 +81,13 @@ pub(crate) enum MsgDAGSyncRequest { pub(crate) enum MsgDAGSyncResponse { Response { have: Vec, - operations: Vec<(Header, RawDAGBody)>, // ECG headers and serialized ECG body. + operations: Vec<(Header, RawDAGBody)>, // DAG headers and serialized DAG body. }, Wait, // JP: Use StoreSyncResponse? 
} // Has initiative -pub(crate) struct ECGSyncInitiator { +pub(crate) struct DAGSyncInitiator { have: Vec, phantom: PhantomData, } @@ -96,7 +96,7 @@ impl< Hash: Send + Sync, HeaderId: Clone + Debug + Ord + Send + Sync, Header: Debug + Send + Sync, - > ECGSyncInitiator + > DAGSyncInitiator { async fn receive_response_helper, Msg>( stream: &mut S, @@ -125,7 +125,7 @@ impl< // TODO: Eventually take an Arc pub(crate) async fn run_new, Msg>( stream: &mut S, - ecg_state: &dag::UntypedState, + dag_state: &dag::UntypedState, ) -> (Self, Vec<(Header, RawDAGBody)>) where MsgDAGSyncRequest: Into, @@ -134,26 +134,26 @@ impl< // TODO: Limit on tips (128? 64? 32? MAX_HAVE_HEADERS) warn!("TODO: Check request sizes."); let req = MsgDAGSyncRequest::DAGInitialSync { - tips: ecg_state.tips().iter().cloned().collect(), + tips: dag_state.tips().iter().cloned().collect(), }; send(stream, req).await.expect("TODO"); // Receive response. let (have, operations) = Self::receive_response_helper(stream).await; - let ecg_sync = ECGSyncInitiator { + let dag_sync = DAGSyncInitiator { have, phantom: PhantomData, }; - (ecg_sync, operations) + (dag_sync, operations) } /// Run a round of ECG sync, requesting new operations from peer. pub(crate) async fn run_round, Msg>( &mut self, stream: &mut S, - ecg_state: &dag::UntypedState, + dag_state: &dag::UntypedState, ) -> Vec<(Header, RawDAGBody)> where MsgDAGSyncRequest: Into, @@ -162,7 +162,7 @@ impl< // Check which headers they sent us that we know. let mut known_bitmap = BitArray::ZERO; for (i, header_id) in self.have.iter().enumerate() { - if ecg_state.contains(header_id) { + if dag_state.contains(header_id) { // Respond with which headers we know. known_bitmap.set(i, true); } @@ -171,7 +171,7 @@ impl< // TODO: Limit on tips (128? 64? 32? 
MAX_HAVE_HEADERS) warn!("TODO: Check request sizes."); let req = MsgDAGSyncRequest::DAGSync { - tips: ecg_state.tips().iter().cloned().collect(), + tips: dag_state.tips().iter().cloned().collect(), // JP: Why does this send all the tips again? TODO: Limit the tips... Maybe do this on the DAG construction side? known: known_bitmap, }; send(stream, req).await.expect("TODO"); @@ -186,7 +186,7 @@ impl< } // Responder -pub(crate) struct ECGSyncResponder { +pub(crate) struct DAGSyncResponder { pub(crate) their_known: BTreeSet, pub(crate) our_unknown: BTreeSet, pub(crate) send_queue: BinaryHeap<(Reverse, HeaderId)>, @@ -218,7 +218,7 @@ pub(crate) fn mark_as_known_helper( } } -impl ECGSyncResponder { +impl DAGSyncResponder { fn they_know(&self, header_id: &HeaderId) -> bool where HeaderId: Copy + Ord, @@ -245,7 +245,7 @@ impl ECGSyncResponder { where HeaderId: std::cmp::Ord, { - ECGSyncResponder { + DAGSyncResponder { their_known: BTreeSet::new(), send_queue: BinaryHeap::new(), our_unknown: BTreeSet::new(), @@ -536,7 +536,7 @@ impl ECGSyncResponder { pub(crate) trait DAGStateSubscriber { async fn request_dag_state( &self, - responder: &mut ECGSyncResponder, + responder: &mut DAGSyncResponder, tips: Option>, ) -> dag::UntypedState; } diff --git a/ossa-core/src/protocol/store_peer/mod.rs b/ossa-core/src/protocol/store_peer/mod.rs index 1f99034..4371574 100644 --- a/ossa-core/src/protocol/store_peer/mod.rs +++ b/ossa-core/src/protocol/store_peer/mod.rs @@ -1,2 +1,2 @@ -pub mod ecg_sync; +pub mod dag_sync; pub mod v0; diff --git a/ossa-core/src/protocol/store_peer/v0.rs b/ossa-core/src/protocol/store_peer/v0.rs index 10d44e7..4ac6a0e 100644 --- a/ossa-core/src/protocol/store_peer/v0.rs +++ b/ossa-core/src/protocol/store_peer/v0.rs @@ -10,8 +10,8 @@ use tracing::debug; use crate::{ auth::DeviceId, network::protocol::{receive, send, MiniProtocol}, - protocol::store_peer::ecg_sync::{ - DAGStateSubscriber, ECGSyncInitiator, ECGSyncResponder, MsgDAGSyncRequest, + 
protocol::store_peer::dag_sync::{ + DAGStateSubscriber, DAGSyncInitiator, DAGSyncResponder, MsgDAGSyncRequest, MsgDAGSyncResponse, }, store::{self, dag, HandlePeerRequest, UntypedStoreCommand}, @@ -199,7 +199,7 @@ where { async fn request_dag_state( &self, - responder: &mut ECGSyncResponder, + responder: &mut DAGSyncResponder, tips: Option>, ) -> dag::UntypedState { debug!("Requesting ECG state"); @@ -356,7 +356,7 @@ impl< mut stream: S, ) -> impl Future + Send { async move { - let mut ecg_sync: Option> = None; + let mut ecg_sync: Option> = None; // Wait for command from store. let mut recv_chan = self @@ -465,7 +465,7 @@ impl< // JP: Eventually switch ecg_state to an Arc? let (new_ecg_sync, operations) = - ECGSyncInitiator::run_new(&mut stream, &ecg_state).await; + DAGSyncInitiator::run_new(&mut stream, &ecg_state).await; ecg_sync = Some(new_ecg_sync); operations } @@ -506,7 +506,7 @@ impl< mut stream: S, ) -> impl Future + Send { async move { - let mut ecg_sync: Option> = None; + let mut ecg_sync: Option> = None; // TODO: Check when done. 
loop { @@ -567,7 +567,7 @@ impl< todo!("TODO: Error, ECG sync has already been initialized."); } - let mut ecg_sync_ = ECGSyncResponder::new(); + let mut ecg_sync_ = DAGSyncResponder::new(); let ecg_state = self.request_dag_state(&mut ecg_sync_, None).await; diff --git a/ossa-core/src/protocol/store_sc_dag/mod.rs b/ossa-core/src/protocol/store_sc_dag/mod.rs new file mode 100644 index 0000000..2d24cd4 --- /dev/null +++ b/ossa-core/src/protocol/store_sc_dag/mod.rs @@ -0,0 +1 @@ +pub mod v0; diff --git a/ossa-core/src/protocol/store_bft_dag/v0.rs b/ossa-core/src/protocol/store_sc_dag/v0.rs similarity index 91% rename from ossa-core/src/protocol/store_bft_dag/v0.rs rename to ossa-core/src/protocol/store_sc_dag/v0.rs index 745a00f..08886c3 100644 --- a/ossa-core/src/protocol/store_bft_dag/v0.rs +++ b/ossa-core/src/protocol/store_sc_dag/v0.rs @@ -4,18 +4,18 @@ use std::{collections::BTreeSet, fmt::Debug}; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; -use tracing::debug; +use tracing::{debug, warn}; -use crate::protocol::store_peer::ecg_sync::{DAGStateSubscriber, MsgDAGSyncResponse}; +use crate::protocol::store_peer::dag_sync::{DAGStateSubscriber, MsgDAGSyncResponse}; use crate::store::dag; +use crate::util::Stream; use crate::{ auth::DeviceId, network::protocol::{receive, MiniProtocol}, protocol::store_peer::{ - ecg_sync::{ECGSyncInitiator, ECGSyncResponder, MsgDAGSyncRequest}, - v0::{MsgStoreSync, MsgStoreSyncRequest}, + dag_sync::{DAGSyncInitiator, DAGSyncResponder, MsgDAGSyncRequest}, }, - store::{dag::v0::HeaderId, UntypedStoreCommand}, + store::UntypedStoreCommand, }; /// Miniprotocol to sync the DAG in the strongly consistent BFT consensus protocol. 
@@ -133,13 +133,13 @@ where type Message = MsgStoreDAGSync; // Has initiative - fn run_server>( + fn run_server>( self, mut stream: S, ) -> impl Future + Send { async move { debug!("StoreDAGSync server running!"); - let mut dag_sync: Option> = None; + let mut dag_sync: Option> = None; let mut recv_chan = self .recv_chan @@ -151,9 +151,9 @@ where None => { // First round of DAG sync, so create and run first round. - // JP: Eventually switch ecg_state to an Arc? + // JP: Eventually switch dag_state to an Arc? let (new_dag_sync, operations) = - ECGSyncInitiator::::run_new( + DAGSyncInitiator::::run_new( &mut stream, &dag_state, ) @@ -162,6 +162,7 @@ where operations } Some(ref mut dag_sync) => { + warn!("TODO: External updates to `dag_state` might make this out of sync?"); // Subsequent rounds of ECG sync. dag_sync.run_round(&mut stream, &dag_state).await } @@ -183,13 +184,13 @@ where } } - fn run_client>( + fn run_client>( self, mut stream: S, ) -> impl Future + Send { async move { debug!("StoreDAGSync client running!"); - let mut dag_sync: Option> = None; + let mut dag_sync: Option> = None; // TODO: Check when done. 
loop { @@ -203,7 +204,7 @@ where todo!("TODO: Error, SCG sync has already been initialized."); } - let mut dag_sync_ = ECGSyncResponder::new(); + let mut dag_sync_ = DAGSyncResponder::new(); let scg_state = self.request_dag_state(&mut dag_sync_, None).await; @@ -225,6 +226,7 @@ where } } } + debug!("StoreDAGSync client exited"); } } } @@ -236,7 +238,7 @@ where { async fn request_dag_state( &self, - responder: &mut ECGSyncResponder, + responder: &mut DAGSyncResponder, tips: Option>, ) -> dag::UntypedState { debug!("Requesting SCG state"); diff --git a/ossa-core/src/store/bft.rs b/ossa-core/src/store/bft.rs index 03afb3e..58d47ba 100644 --- a/ossa-core/src/store/bft.rs +++ b/ossa-core/src/store/bft.rs @@ -1,6 +1,9 @@ -use std::collections::BTreeSet; +use std::collections::{BTreeMap, BTreeSet}; -use crate::store::dag::{self, Frontier}; +use serde::{Deserialize, Serialize}; +use tokio::sync::watch; + +use crate::{auth::DeviceId, protocol::store_bft_sync::v0::{SignatureId, ThresholdSignatureId}, store::dag::{self, Frontier}, util::Sha256Hash}; /// A round in the BFT strong consistency protocol. pub type Round = u64; @@ -14,11 +17,48 @@ pub trait SCDT { fn is_valid_operation(&self, op: Self::Op) -> bool; } -pub(crate) struct State { +pub(crate) struct State { pub(crate) initial_state: S, // JP: Should this go somewhere else? Potentially `DecryptedState`? - pub(crate) dag_state: dag::State, - /// Frontier of operations that have been comitted in the DAG state. - pub(crate) committed_frontier: Frontier, + pub(crate) dag_state: dag::State, + /// Frontier of operations that have been committed in the DAG state. + pub(crate) committed_frontier: Frontier, +} + +pub(crate) struct BFTState { + // Current round. Starts at 0. + current_round: Round, + /// Round states of BFT sync. + round_states: Vec>, + /// Tips that are blocks (signed by the given peer) from previous rounds. These should all have fully aggregated signatures. + // JP: Are they all from the previous round? 
+ previous_tips: Vec<(Round, DeviceId)>, +} + +impl BFTState { + pub(crate) fn new() -> Self { + Self { + current_round: 0, + round_states: vec![], + previous_tips: vec![], + } + } + + pub(crate) fn current_round(&self) -> u64 { + self.current_round + } + + pub(crate) fn round_states(&self) -> &[RoundState] { + &self.round_states + } + + pub(crate) fn previous_tips(&self) -> &[(Round, DeviceId)] { + &self.previous_tips + } + + pub(crate) fn get_current_tips(&self) -> &[(Round, DeviceId)] { + // Get all active tips + todo!() + } } impl State { @@ -30,3 +70,117 @@ impl State { } } } + +// A signed value. +pub struct Signed { + value: A, + // JP: Generalize this eventually. + signature: ed25519_dalek::Signature, +} + +impl Signed { + pub fn value(&self) -> &A { + &self.value + } +} + +pub(crate) struct RoundState { + // Blocks in this round for each validator. + // A validator can only sign a single block in each round (otherwise, they are detected to be malicious). + blocks: BTreeMap>>, + // 2/3 (?) of (weighted) validators promise to make the block available and validated/approve of operations. + certificates: BTreeMap>, + // 2/3 (1/3?) of (weighted) validators have seen 2/3 (?) of the certificates. + commit_round: ThresholdSigned, +} + +impl RoundState { + pub(crate) fn commit_round(&self) -> &ThresholdSigned { + &self.commit_round + } + + pub(crate) fn blocks(&self) -> &BTreeMap>> { + &self.blocks + } + + pub(crate) fn certificates(&self) -> &BTreeMap> { + &self.certificates + } +} + +#[derive(Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)] +pub(crate) struct BlockId(Sha256Hash); + +// A BFT block points to the tips of the DAG and the previous round's certificates. +#[derive(Debug, Serialize)] +pub(crate) struct Block { + round: Round, + // Tips of SC DAG operations + dag_frontier: Frontier, + // Must contain 2/3 of previous round's certificates (or be round 0). 
+ parents: Vec, // Only strong edges, don't need weak edges since blocks point to head of DAG operations anyways + // Who proposed the block. + proposer: DeviceId, +} + +impl Block { + pub fn block_id(&self) -> BlockId { + todo!() + } +} + +impl<'de, SHeaderId> Deserialize<'de> for Block { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de> { + todo!("TODO: Manually implement this since derive is broken for generics") + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct ThresholdSignature(); // TODO +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct PartialSignature(); // TODO + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct CertificateId(Sha256Hash); + +// JP: Do we need this type? Just use the block id? +pub(crate) struct Certificate { + /// The block's id (hash). + block: BlockId, + /// The block's round. + // JP: Is this needed? + round: Round, + /// Who proposed the block. + // JP: Is this needed? + proposer: DeviceId, +} + +// A threshold signed value (or being signed). +pub(crate) enum ThresholdSigned { + // Weighted threshold of validators have signed the value. + ThresholdSignature { + value: A, + signature: ThresholdSignature, + }, + // Weighted threshold hasn't been met yet. + PartialSignatures { + value: A, + signatures: BTreeMap, + }, +} + +impl ThresholdSigned { + // pub fn signature_ids(&self) -> ThresholdSignatureId { + pub fn signature_ids(&self) -> Vec { + todo!() + } +} + +pub(crate) struct RoundComplete { + store_id: Sha256Hash, // StoreId, + round: Round, +} +// JP: Could include hash of previous round's leader block?? 
Probably not helpful + diff --git a/ossa-core/src/store/mod.rs b/ossa-core/src/store/mod.rs index 83ce2e6..7ea0dcc 100644 --- a/ossa-core/src/store/mod.rs +++ b/ossa-core/src/store/mod.rs @@ -4,6 +4,7 @@ use ossa_typeable::Typeable; use rand::{seq::SliceRandom as _, thread_rng}; use replace_with::replace_with_or_abort; use serde::{Deserialize, Serialize}; +use tokio::sync::watch; use std::fmt::{Debug, Display}; use std::marker::PhantomData; use std::{ @@ -16,7 +17,8 @@ use tokio::sync::{ }; use tracing::{debug, error, warn}; -use crate::store::bft::SCDT; +use crate::protocol::store_bft_sync::v0::StoreBFTSync; +use crate::store::bft::{BFTState, SCDT}; use crate::store::v0::BLOCK_SIZE; use crate::time::ConcretizeTime; use crate::util::merkle_tree::{MerkleTree, Potential}; @@ -26,7 +28,7 @@ use crate::{ network::multiplexer::{run_miniprotocol_async, SpawnMultiplexerTask}, protocol::{ manager::v0::PeerManagerCommand, - store_bft_dag::v0::{StoreDAGSync, StoreSCGSyncCommand}, + store_sc_dag::v0::{StoreDAGSync, StoreSCGSyncCommand}, store_peer::v0::{StoreSync, StoreSyncCommand}, }, store::{ @@ -106,6 +108,8 @@ pub struct State>>, // listeners: Vec>>, + /// State for BFT sync / consensus. 
+ pub(crate) bft_state: watch::Sender>, } // States are: @@ -296,6 +300,7 @@ impl< block_subscribers: BTreeMap::new(), ecg_subscribers: BTreeMap::new(), scg_subscribers: BTreeMap::new(), + bft_state: watch::Sender::new(BFTState::new()), } } @@ -311,6 +316,7 @@ impl< block_subscribers: BTreeMap::new(), ecg_subscribers: BTreeMap::new(), scg_subscribers: BTreeMap::new(), + bft_state: watch::Sender::new(BFTState::new()), } } @@ -627,7 +633,9 @@ impl< let dag_state = sc_state.dag_state.state().clone(); let message = StoreSCGSyncCommand::SCGSyncRequest { dag_state }; debug!("Sending SCG sync request to peer ({})", p.0); - send_command(&mut p.1.scg_status, message) + send_command(&mut p.1.scg_status, message); + + todo!("Send BFT sync requests"); }); } _ => {} @@ -1390,12 +1398,19 @@ async fn manage_peers + Clone + }) }); + let spawn_task_bft = Box::new(move |_party, stream_id, sender, receiver| { + tokio::spawn(async move { + unimplemented!(); + }) + }); + // Send request to peer's manager for stream. let store_id = store.store_id(); let cmd = PeerManagerCommand::RequestStoreSync { store_id, spawn_task_ec, spawn_task_sc, + spawn_task_bft, }; command_chan.send(cmd).expect("TODO"); } @@ -1650,7 +1665,8 @@ pub(crate) async fn run_handler( debug!("Store ECG sync with peer (without initiative) exited.") }) }); - let send_commands_untyped = send_commands_untyped.clone(); + + let send_commands_untyped_ = send_commands_untyped.clone(); let spawn_task_sc: Box = Box::new(move |party, stream_id, sender, receiver| { tokio::spawn(async move { // JP: Maybe this isn't needed??? @@ -1658,15 +1674,28 @@ pub(crate) async fn run_handler( let register_cmd = UntypedStoreCommand::RegisterIncomingPeerSCGSyncing { peer, }; - send_commands_untyped.send(register_cmd).expect("TODO"); + send_commands_untyped_.send(register_cmd).expect("TODO"); // Start miniprotocol as client. 
- let mp = StoreDAGSync::new_client(peer, send_commands_untyped); + let mp = StoreDAGSync::new_client(peer, send_commands_untyped_); run_miniprotocol_async(mp, true, stream_id, sender, receiver).await; debug!("Store SCG sync with peer (without initiative) exited.") }) }); - Some((spawn_task_ec, spawn_task_sc)) + + let send_commands_untyped_ = send_commands_untyped.clone(); + let bft_state = store.bft_state.subscribe(); + let spawn_task_bft: Box = Box::new(move |party, stream_id, sender, receiver| { + tokio::spawn(async move { + warn!("TODO: Should we tell store we're running?"); + + // Start miniprotocol as client. + let mp = StoreBFTSync::new_client(peer, send_commands_untyped_, bft_state); + run_miniprotocol_async(mp, true, stream_id, sender, receiver).await; + debug!("Store BFT sync with peer (without initiative) exited.") + }) + }); + Some((spawn_task_ec, spawn_task_sc, spawn_task_bft)) } else { debug!("Store is already running"); None @@ -1852,7 +1881,7 @@ pub(crate) enum UntypedStoreCommand, Box)>>, + oneshot::Sender, Box, Box)>>, }, RegisterOutgoingPeerECGSyncing { peer: DeviceId,