diff --git a/Cargo.lock b/Cargo.lock index 009c7203..70ebcca8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2428,9 +2428,9 @@ dependencies = [ [[package]] name = "either" -version = "1.15.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +checksum = "91622ff5e7162018101f2fea40d6ebf4a78bbe5a49736a2020649edf9693679e" dependencies = [ "serde", ] diff --git a/crates/core/src/aggsigdb/memory.rs b/crates/core/src/aggsigdb/memory.rs new file mode 100644 index 00000000..b2165d2a --- /dev/null +++ b/crates/core/src/aggsigdb/memory.rs @@ -0,0 +1,500 @@ +use crate::{ + aggsigdb::types::{AggSigDB, Error}, + deadline, types, +}; +use std::collections::{HashMap, hash_map::Entry}; +use tokio::sync; +use tokio_util::sync::CancellationToken; + +type Waiters = + HashMap<(types::Duty, types::PubKey), Vec>>>; + +struct MemoryDBActor { + entries: HashMap>>, + waiters: Waiters, + deadliner: deadline::DeadlinerHandle, +} + +impl MemoryDBActor { + async fn run( + &mut self, + mut messages_rx: sync::mpsc::Receiver, + mut expired_rx: sync::mpsc::Receiver, + ct: CancellationToken, + ) { + loop { + tokio::select! { + biased; // We want to evaluate expirations first + + _ = ct.cancelled() => break, // Stop the actor when the cancellation token is triggered. + + Some(duty) = expired_rx.recv() => self.evict(duty), + + msg = messages_rx.recv() => match msg { + None => break, // Stop the actor when all handles have been dropped. + Some(msg) => match msg { + Message::Store { + duty, + set, + response, + } => { + let result = self.store(duty, set).await; + let _ = response.send(result); + } + Message::WaitFor { + duty, + pub_key, + response, + } => { + if let Some(found) = self.get(&duty, &pub_key) { + let _ = response.send(found); + } else { + self.waiters + .entry((duty, pub_key)) + .or_default() + .push(response); + } + } + } + } + } + + // After each message, trim waiters in case that the futures are dropped. + self.trim_readers(); + } + } + + async fn store(&mut self, duty: types::Duty, set: types::SignedDataSet) -> Result<(), Error> { + if set.is_empty() { + return Ok(()); + } + + // TODO(charon): Distinguish between no deadline supported vs already expired. + let _ = self.deadliner.add(duty.clone()).await; + + // NOTE: Partial insertions on error match the semantics of Charon. + let for_duty = self.entries.entry(duty.clone()).or_default(); + for (pub_key, signed_data) in set.into_iter() { + match for_duty.entry(pub_key) { + Entry::Vacant(slot) => { + slot.insert(signed_data.clone()); + + let k = (duty.clone(), pub_key); + if let Some((_, waiters)) = self.waiters.remove_entry(&k) { + for w in waiters { + if !w.is_closed() { + let _ = w.send(signed_data.clone()); + } + } + }; + } + Entry::Occupied(slot) if slot.get() != &signed_data => { + return Err(Error::MismatchingData); + } + Entry::Occupied(_) => {} + } + } + + Ok(()) + } + + fn get( + &self, + duty: &types::Duty, + pub_key: &types::PubKey, + ) -> Option> { + self.entries + .get(duty) + .and_then(|for_duty| for_duty.get(pub_key)) + .cloned() + } + + fn evict(&mut self, duty: types::Duty) { + self.entries.remove(&duty); + } + + fn trim_readers(&mut self) { + self.waiters.retain(|_, waiters| { + waiters.retain(|w| !w.is_closed()); + + !waiters.is_empty() + }); + } +} + +enum Message { + Store { + duty: types::Duty, + set: types::SignedDataSet, + response: sync::oneshot::Sender>, + }, + WaitFor { + duty: types::Duty, + pub_key: types::PubKey, + response: sync::oneshot::Sender>, + }, +} + +/// An in-memory implementation of AggSigDB. +/// +/// Share an instance by cloning. Cloning is cheap and creates a new reference +/// to the same underlying data. +#[derive(Clone)] +pub struct MemoryDBHandle { + sender: sync::mpsc::Sender, +} + +impl MemoryDBHandle { + /// Creates a new in-memory AggSigDB instance, and get a handle to it. + /// + /// The underlying instance gets dropped when all handles are dropped. + pub fn new( + deadliner: deadline::DeadlinerHandle, + expired_rx: sync::mpsc::Receiver, + ct: CancellationToken, + ) -> Self { + let (sender, receiver) = sync::mpsc::channel(100); + let mut actor = MemoryDBActor { + entries: HashMap::new(), + waiters: HashMap::new(), + deadliner, + }; + + tokio::spawn(async move { actor.run(receiver, expired_rx, ct).await }); + + Self { sender } + } +} + +#[async_trait::async_trait] +impl AggSigDB for MemoryDBHandle { + async fn store(&self, duty: types::Duty, set: types::SignedDataSet) -> Result<(), Error> { + let (response_tx, response_rx) = sync::oneshot::channel(); + let msg = Message::Store { + duty, + set, + response: response_tx, + }; + self.sender.send(msg).await.map_err(|_| Error::Terminated)?; + response_rx.await.map_err(|_| Error::Terminated)? + } + + async fn wait_for( + &self, + duty: types::Duty, + pub_key: types::PubKey, + ) -> Result, Error> { + let (response_tx, response_rx) = sync::oneshot::channel(); + let msg = Message::WaitFor { + duty, + pub_key, + response: response_tx, + }; + self.sender.send(msg).await.map_err(|_| Error::Terminated)?; + response_rx.await.map_err(|_| Error::Terminated) + } +} + +#[cfg(test)] +mod tests { + use crate::{ + aggsigdb::{ + memory::MemoryDBHandle, + types::{AggSigDB, Error}, + }, + deadline, + signeddata::SignedDataError, + types::{Duty, PubKey, Signature, SignedData, SignedDataSet, SlotNumber}, + }; + use tokio::sync; + use tokio_util::sync::CancellationToken; + + /// Some mock signed data type for testing. + #[derive(Debug, Clone, PartialEq, Eq)] + struct MockSignedData(u8); + + impl SignedData for MockSignedData { + fn signature(&self) -> Result { + Ok([self.0; 96]) + } + + fn set_signature(&self, _signature: Signature) -> Result { + Ok(self.clone()) + } + + fn set_signature_boxed( + &self, + signature: Signature, + ) -> Result, SignedDataError> { + Ok(Box::new(self.set_signature(signature)?)) + } + + fn message_root(&self) -> Result<[u8; 32], SignedDataError> { + Ok([self.0; 32]) + } + } + + impl MockSignedData { + fn singleton(&self, pub_key: PubKey) -> SignedDataSet { + let mut set = SignedDataSet::new(); + set.insert(pub_key, self.boxed()); + set + } + + fn boxed(&self) -> Box { + Box::new(self.clone()) + } + } + + /// Create a test deadline handle and an expiration channel. + fn test_deadline() -> ( + sync::mpsc::Sender, + deadline::DeadlinerHandle, + sync::mpsc::Receiver, + ) { + let (tx, rx) = sync::mpsc::channel(1); + let deadliner = deadline::DeadlinerHandle::always(deadline::AddOutcome::Scheduled); + + (tx, deadliner, rx) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn write_read() { + let (_, deadliner, expiration_rx) = test_deadline(); + let store = MemoryDBHandle::new(deadliner, expiration_rx, CancellationToken::new()); + + let duty = Duty::new_proposer_duty(SlotNumber::new(10)); + let pub_key = PubKey::new([7u8; 48]); + let signed_data = MockSignedData(42); + + store + .store(duty.clone(), signed_data.singleton(pub_key)) + .await + .unwrap(); + + let result = store.wait_for(duty, pub_key).await.unwrap(); + assert_eq!(result, signed_data.boxed()); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn write_unblocks() { + let (_, deadliner, expiration_rx) = test_deadline(); + let store = MemoryDBHandle::new(deadliner, expiration_rx, CancellationToken::new()); + + let duty = Duty::new_attester_duty(SlotNumber::new(1)); + let pub_key = PubKey::new([7u8; 48]); + let signed_data = MockSignedData(0); + + let reader = { + let store = store.clone(); + let duty = duty.clone(); + + tokio::spawn(async move { store.wait_for(duty, pub_key).await }) + }; + + // Give the reader a chance to reach `notified.await` before we store, so the + // test actually exercises the notify wakeup path rather than the + // fast-path lookup. + tokio::task::yield_now().await; + assert!(!reader.is_finished(), "wait_for should block until store"); + + let write = store.store(duty, signed_data.singleton(pub_key)).await; + let read = reader.await.unwrap().unwrap(); + + assert!(write.is_ok()); + assert_eq!(read, signed_data.boxed()); + } + + #[tokio::test] + async fn write_while_cancelled() { + let ct = CancellationToken::new(); + + let (_, deadliner, expiration_rx) = test_deadline(); + let store = MemoryDBHandle::new(deadliner, expiration_rx, ct.clone()); + + let duty = Duty::new_proposer_duty(SlotNumber::new(10)); + let pub_key = PubKey::new([7u8; 48]); + let signed_data = MockSignedData(42); + + ct.cancel(); + + let res = store.store(duty, signed_data.singleton(pub_key)).await; + assert!(matches!(res, Err(Error::Terminated))); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn cannot_overwrite() { + let (_, deadliner, expiration_rx) = test_deadline(); + let store = MemoryDBHandle::new(deadliner, expiration_rx, CancellationToken::new()); + + let duty = Duty::new_proposer_duty(SlotNumber::new(10)); + let pub_key = PubKey::new([7u8; 48]); + let first = MockSignedData(1); + let second = MockSignedData(2); + + store + .store(duty.clone(), first.singleton(pub_key)) + .await + .unwrap(); + + let err = store + .store(duty, second.singleton(pub_key)) + .await + .expect_err("storing mismatching data should fail"); + assert!(matches!(err, super::Error::MismatchingData)); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn write_idempotent() { + let (_, deadliner, expiration_rx) = test_deadline(); + let store = MemoryDBHandle::new(deadliner, expiration_rx, CancellationToken::new()); + + let duty = Duty::new_proposer_duty(SlotNumber::new(10)); + let pub_key = PubKey::new([7u8; 48]); + let signed_data = MockSignedData(42); + + store + .store(duty.clone(), signed_data.singleton(pub_key)) + .await + .unwrap(); + store + .store(duty.clone(), signed_data.singleton(pub_key)) + .await + .unwrap(); + + let result = store.wait_for(duty, pub_key).await.unwrap(); + assert_eq!(result, signed_data.boxed()); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn write_evict_wait_then_write() { + let (expiration_tx, deadliner, expiration_rx) = test_deadline(); + + let store = MemoryDBHandle::new(deadliner.clone(), expiration_rx, CancellationToken::new()); + + let duty = Duty::new_attester_duty(SlotNumber::new(1)); + let pub_key = PubKey::new([7u8; 48]); + let first = MockSignedData(1); + let second = MockSignedData(2); + + store + .store(duty.clone(), first.singleton(pub_key)) + .await + .unwrap(); + + // Queue the expiration. Immediately run a dummy store, and by the time it + // compeltes we know that the expiration has been processed. + expiration_tx.send(duty.clone()).await.unwrap(); + { + let dummy = Duty::new_attester_duty(SlotNumber::new(u64::MAX)); + store + .store(dummy, MockSignedData(0).singleton(pub_key)) + .await + .unwrap(); + } + + let reader = { + let store = store.clone(); + let duty = duty.clone(); + + tokio::spawn(async move { store.wait_for(duty, pub_key).await }) + }; + + // The eviction has been applied, so wait_for has no entry to return and must + // block. + tokio::task::yield_now().await; + assert!(!reader.is_finished(), "wait_for should block until store"); + + // Store new data for the same duty and pubkey. The reader should wake up and + // return the new data, not the evicted data. + store.store(duty, second.singleton(pub_key)).await.unwrap(); + + let read = reader.await.unwrap().unwrap(); + assert_eq!(read, second.boxed()); + assert_ne!(read, first.boxed()); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn write_unblocks_many() { + const N: usize = 4; + + let (_, deadliner, expiration_rx) = test_deadline(); + let store = MemoryDBHandle::new(deadliner, expiration_rx, CancellationToken::new()); + let duty = Duty::new_proposer_duty(SlotNumber::new(10)); + let pub_key = PubKey::new([7u8; 48]); + let signed_data = MockSignedData(42); + + let readers: Vec<_> = (0..N) + .map(|_| { + let store = store.clone(); + let duty = duty.clone(); + tokio::spawn(async move { store.wait_for(duty, pub_key).await }) + }) + .collect(); + + // Give readers a chance to reach `notified.await` before the store. + tokio::task::yield_now().await; + for reader in &readers { + assert!( + !reader.is_finished(), + "all readers should block until store" + ); + } + + // A single store unblocks all readers. + store + .store(duty, signed_data.singleton(pub_key)) + .await + .unwrap(); + + for reader in readers { + let read = reader.await.unwrap().unwrap(); + assert_eq!(read, signed_data.boxed()); + } + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn unrelated_write_does_not_unblock() { + let (_, deadliner, expiration_rx) = test_deadline(); + let store = MemoryDBHandle::new(deadliner, expiration_rx, CancellationToken::new()); + + let duty_a = Duty::new_proposer_duty(SlotNumber::new(10)); + let data_a = MockSignedData(1); + + let duty_b = Duty::new_attester_duty(SlotNumber::new(20)); + let data_b = MockSignedData(2); + + let pub_key = PubKey::new([7u8; 48]); + + let reader = { + let store = store.clone(); + let duty_a = duty_a.clone(); + tokio::spawn(async move { store.wait_for(duty_a, pub_key).await }) + }; + + tokio::task::yield_now().await; + assert!(!reader.is_finished(), "reader should block initially"); + + // Storing an unrelated key does not affect readers. + store + .store(duty_b, data_b.singleton(pub_key)) + .await + .unwrap(); + + tokio::task::yield_now().await; + assert!( + !reader.is_finished(), + "reader should re-block after unrelated store" + ); + + // Storing the actual key unblocks the reader. + store + .store(duty_a, data_a.singleton(pub_key)) + .await + .unwrap(); + + let read = reader.await.unwrap().unwrap(); + assert_eq!(read, data_a.boxed()); + assert_ne!(read, data_b.boxed()); + } +} diff --git a/crates/core/src/aggsigdb/mod.rs b/crates/core/src/aggsigdb/mod.rs new file mode 100644 index 00000000..1c5a4f3b --- /dev/null +++ b/crates/core/src/aggsigdb/mod.rs @@ -0,0 +1,5 @@ +/// Type definitions and traits for the AggSigDB. +pub mod types; + +/// Memory implementation of the AggSigDB. +pub mod memory; diff --git a/crates/core/src/aggsigdb/types.rs b/crates/core/src/aggsigdb/types.rs new file mode 100644 index 00000000..e4ecba6c --- /dev/null +++ b/crates/core/src/aggsigdb/types.rs @@ -0,0 +1,36 @@ +use crate::types; + +/// Errors for AggSigDB operations. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Data for the same duty and public key already exists but does not match + /// the new data. + #[error("Mismatching data")] + MismatchingData, + + /// The request cannot be processed because the instance has been + /// terminated. + #[error("The instance has been terminated")] + Terminated, +} + +/// A persistent store for aggregated signed duty data. +#[async_trait::async_trait] +pub trait AggSigDB { + /// Stores aggregated signed duty data set. + async fn store(&self, duty: types::Duty, data: types::SignedDataSet) -> Result<(), Error>; + + /// Blocks and returns the aggregated signed duty data when available. + /// + /// Might block indefinitely if no data is ever stored for the given duty + /// and public key. + /// + /// To avoid blocking indefinitely, consider using a timeout, + /// [`CancellationToken`] or racing using `tokio::select!` against other + /// events. + async fn wait_for( + &self, + duty: types::Duty, + pub_key: types::PubKey, + ) -> Result, Error>; +} diff --git a/crates/core/src/deadline/mod.rs b/crates/core/src/deadline/mod.rs index fbf9ed85..dca033c2 100644 --- a/crates/core/src/deadline/mod.rs +++ b/crates/core/src/deadline/mod.rs @@ -144,6 +144,24 @@ impl DeadlinerHandle { // `FailedToCompute` if the task dropped the sender (shutdown race). response_rx.await.unwrap_or(AddOutcome::FailedToCompute) } + + /// Create a handle that always returns the given [`AddOutcome`]. + #[cfg(test)] + pub fn always(expected: AddOutcome) -> Self { + let (tx, mut rx) = mpsc::channel(1); + let handle = DeadlinerHandle { + cancel_token: CancellationToken::new(), + input_tx: tx, + }; + + tokio::spawn(async move { + while let Some(input) = rx.recv().await { + let _ = input.response_tx.send(expected); + } + }); + + handle + } } /// Owned state of the background task that drives a [`DeadlinerHandle`]'s diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 58b765bf..6cd84def 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -36,7 +36,11 @@ pub mod validatorapi; /// SigAgg — threshold BLS signature aggregation. pub mod sigagg; +/// Implementations of AggSigDB. +pub mod aggsigdb; + mod parsigex_codec; + // SSZ codec operates on compile-time-constant byte sizes and offsets. // Arithmetic is bounded and casts from `usize` to `u32` are safe because all // sizes are well below `u32::MAX`. diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 2707e5ef..0d85d9c3 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -728,53 +728,8 @@ impl TryFrom<(&DutyType, &pbcore::ParSignedDataSet)> for ParSignedDataSet { } } -/// SignedDataSet is a set of signed duty data. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct SignedDataSet(HashMap); - -impl Default for SignedDataSet -where - T: SignedData, -{ - fn default() -> Self { - Self(HashMap::default()) - } -} - -impl SignedDataSet -where - T: SignedData, -{ - /// Create a new signed data set. - pub fn new() -> Self { - Self::default() - } - - /// Get a signed data by public key. - pub fn get(&self, pub_key: &PubKey) -> Option<&T> { - self.0.get(pub_key) - } - - /// Insert a signed data. - pub fn insert(&mut self, pub_key: PubKey, signed_data: T) { - self.0.insert(pub_key, signed_data); - } - - /// Remove a signed data by public key. - pub fn remove(&mut self, pub_key: &PubKey) -> Option { - self.0.remove(pub_key) - } - - /// Inner signed data set. - pub fn inner(&self) -> &HashMap { - &self.0 - } - - /// Inner signed data set. - pub fn inner_mut(&mut self) -> &mut HashMap { - &mut self.0 - } -} +/// A set of signed duty data. +pub type SignedDataSet = HashMap>; /// Slot struct #[derive(Debug, Clone, PartialEq, Eq)] @@ -1056,6 +1011,12 @@ mod tests { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] struct MockSignedData; + impl MockSignedData { + fn boxed(&self) -> Box { + Box::new(self.clone()) + } + } + impl SignedData for MockSignedData { fn signature(&self) -> Result { Ok([42u8; SIGNATURE_LENGTH]) @@ -1095,10 +1056,11 @@ mod tests { #[test] fn signed_data_set() { let mut signed_data_set = SignedDataSet::new(); - signed_data_set.insert(PubKey::new([42u8; PK_LEN]), MockSignedData); + signed_data_set.insert(PubKey::new([42u8; PK_LEN]), MockSignedData.boxed()); + let expected = MockSignedData.boxed(); assert_eq!( signed_data_set.get(&PubKey::new([42u8; PK_LEN])), - Some(&MockSignedData) + Some(&expected) ); }