Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ossa-core/src/network/multiplexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,6 @@ struct MiniprotocolState {
sender: mpsc::Sender<BytesMut>,
}

// JP: TODO: This O probably isn't needed.
pub(crate) async fn run_miniprotocol_async<P: MiniProtocol>(
p: P,
is_client: bool,
Expand Down
4 changes: 2 additions & 2 deletions ossa-core/src/network/protocol/ecg_sync/v0/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ pub type ECGSync = Send<(), Eps>; // TODO
//
// Client:
//
// Send all headers he have that they don't (batched).
// Send all headers we have that they don't (batched).
//
// Client:
//
// Send all headers he have that they don't (batched).
// Send all headers we have that they don't (batched).
//

/// The maximum number of `have` hashes that can be sent in each message.
Expand Down
51 changes: 44 additions & 7 deletions ossa-core/src/protocol/manager/v0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,11 @@ impl<
debug!("Manager command channel closed.");
// TODO: Do something here?
}
Some(PeerManagerCommand::RequestStoreSync { store_id, spawn_task_ec, spawn_task_sc }) => {
Some(PeerManagerCommand::RequestStoreSync { store_id, spawn_task_ec, spawn_task_sc, spawn_task_bft }) => {
let ec_stream_id = self.next_stream_id();
let sc_stream_id = self.next_stream_id();
let _response = self.run_request_new_stream_server(&mut stream, store_id, ec_stream_id, spawn_task_ec, sc_stream_id, spawn_task_sc).await;
let bft_stream_id = self.next_stream_id();
let _response = self.run_request_new_stream_server(&mut stream, store_id, ec_stream_id, spawn_task_ec, sc_stream_id, spawn_task_sc, bft_stream_id, spawn_task_bft).await;
debug!("Requested to sync store with peer.");
}
}
Expand Down Expand Up @@ -201,6 +202,7 @@ impl<
store_id,
ec_stream_id,
sc_stream_id,
bft_stream_id,
} => {
debug!(
"Received MsgManagerRequest::CreateStoreStream: {ec_stream_id}, {store_id:?}"
Expand All @@ -210,6 +212,7 @@ impl<
store_id,
ec_stream_id,
sc_stream_id,
bft_stream_id,
)
.await;
}
Expand Down Expand Up @@ -247,12 +250,14 @@ impl<
store_id: StoreId,
ec_stream_id: StreamId,
sc_stream_id: StreamId,
bft_stream_id: StreamId,
) {
let accept = {
// Check if stream is valid (it can be allocated by peer and is available).
let is_valid_id_ec = self.is_valid_stream_id(false, &ec_stream_id);
let is_valid_id_sc = self.is_valid_stream_id(false, &sc_stream_id);
if !is_valid_id_ec || !is_valid_id_sc {
let is_valid_id_bft = self.is_valid_stream_id(false, &bft_stream_id);
if !is_valid_id_ec || !is_valid_id_sc || !is_valid_id_bft {
debug!("Peer sent invalid stream id.");
Err(MsgManagerError::InvalidStreamId)
} else {
Expand All @@ -273,7 +278,7 @@ impl<
.expect("TODO");

let spawn_task = rx.await.expect("TODO");
if let Some((ec_spawn_task, sc_spawn_task)) = spawn_task {
if let Some((ec_spawn_task, sc_spawn_task, bft_spawn_task)) = spawn_task {
// Tell multiplexer to create EC miniprotocol.
let is_running_ec = self
.create_multiplexer_stream(ec_stream_id, ec_spawn_task)
Expand All @@ -285,15 +290,29 @@ impl<
.create_multiplexer_stream(sc_stream_id, sc_spawn_task)
.await;

if !is_running_sc {
if is_running_sc {
// Tell multiplexer to create BFT miniprotocol.
let is_running_bft = self
.create_multiplexer_stream(bft_stream_id, bft_spawn_task)
.await;
if !is_running_bft {
error!(
"Failed to create miniprotocol stream to sync BFT store."
);
panic!("TODO: Shutdown EC+SC miniprotocol...");
}

Ok(is_running_bft)
} else {
error!(
"Failed to create miniprotocol stream to strongly sync store."
);
panic!("TODO: Shutdown EC miniprotocol...");
}

Ok(is_running_sc)
} else {
error!(
"Failed to create miniprotocol stream to weakly sync store."
);
Ok(false)
}
} else {
Expand Down Expand Up @@ -321,12 +340,15 @@ impl<
ec_spawn_task: Box<SpawnMultiplexerTask>,
sc_stream_id: StreamId,
sc_spawn_task: Box<SpawnMultiplexerTask>,
bft_stream_id: StreamId,
bft_spawn_task: Box<SpawnMultiplexerTask>,
) {
// Send request message.
let req = MsgManagerRequest::CreateStoreStream {
store_id,
ec_stream_id,
sc_stream_id,
bft_stream_id,
};
send(stream, req).await.expect("TODO");

Expand Down Expand Up @@ -366,6 +388,18 @@ impl<
"TODO: Send shutdown for this miniprotocol and restore status to Known."
);
}

// Tell multiplexer to create BFT miniprotocol.
let is_running = self
.create_multiplexer_stream(bft_stream_id, bft_spawn_task)
.await;

if !is_running {
error!("Failed to create miniprotocol stream to sync BFT store.");
panic!(
"TODO: Send shutdown for this miniprotocol and restore status to Known."
);
}
}
}
}
Expand Down Expand Up @@ -544,6 +578,8 @@ pub(crate) enum MsgManagerRequest<StoreId> {
ec_stream_id: StreamId,
/// Strongly consistent stream id.
sc_stream_id: StreamId,
/// BFT stream id.
bft_stream_id: StreamId,
},
}

Expand Down Expand Up @@ -623,5 +659,6 @@ pub(crate) enum PeerManagerCommand<StoreId> {
store_id: StoreId,
spawn_task_ec: Box<SpawnMultiplexerTask>,
spawn_task_sc: Box<SpawnMultiplexerTask>,
spawn_task_bft: Box<SpawnMultiplexerTask>,
},
}
3 changes: 2 additions & 1 deletion ossa-core/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ use crate::{

pub mod heartbeat;
pub mod manager;
pub mod store_bft_dag;
pub mod store_peer;
pub mod store_sc_dag;
pub mod store_bft_sync;
pub mod v0;

pub(crate) struct MiniProtocolArgs<StoreId, Hash, SHeaderId, SHeader, THeaderId, THeader> {
Expand Down
Loading
Loading