Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ tree_hash_derive = "0.12"
tar = "0.4"
flate2 = "1.1"
wiremock = "0.6"
tower = "0.5"
sysinfo = "0.33"
quick-xml = { version = "0.39", features = ["serialize"] }

Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pluto-testutil.workspace = true
pluto-tracing.workspace = true
tokio = { workspace = true, features = ["test-util"] }
wiremock.workspace = true
tower = { workspace = true, features = ["util"] }

[build-dependencies]
pluto-build-proto.workspace = true
Expand Down
140 changes: 115 additions & 25 deletions crates/core/src/dutydb/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
//! Equivalent to charon/core/dutydb/memory.go.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};

use pluto_eth2api::{
spec::{altair, phase0},
Expand Down Expand Up @@ -49,6 +49,12 @@ pub enum Error {
#[error("dutydb shutdown: query could not be answered")]
Shutdown,

/// The awaited duty was evicted before its unsigned data became
/// available. Distinct from `Shutdown` so callers can map this to a
/// timeout-style error rather than a service-down error.
#[error("dutydb: awaited duty expired before data was stored")]
AwaitDutyExpired,

/// Two validators share the same `(slot, committee_index, valIdx)` with
/// different public keys.
#[error(
Expand Down Expand Up @@ -177,6 +183,17 @@ struct ContribKey {
root: phase0::Root,
}

/// Per-poll outcome handed back by an `await_data` lookup closure.
enum Lookup<V> {
/// The awaited value is now present — return it to the caller.
Found(V),
/// The awaited duty has been evicted; the lookup will never succeed.
/// `await_data` returns [`Error::AwaitDutyExpired`].
Evicted,
/// Neither stored nor evicted yet — park on the notify and retry.
Pending,
}

struct State {
attestation_duties: HashMap<AttKey, phase0::AttestationData>,
attestation_pub_keys: HashMap<PkKey, PubKey>,
Expand All @@ -190,6 +207,18 @@ struct State {
contrib_duties: HashMap<ContribKey, altair::SyncCommitteeContribution>,
contrib_keys_by_slot: HashMap<u64, Vec<ContribKey>>,

/// Slots whose attester duty has been evicted by the deadliner. Lets
/// `await_attestation` return `AwaitDutyExpired` immediately when the
/// awaited slot is gone, rather than spinning on every `store()` until
/// the request-level timeout fires.
evicted_attestation_slots: HashSet<u64>,
/// Slots whose proposer duty has been evicted.
evicted_proposer_slots: HashSet<u64>,
/// Aggregation roots whose duty has been evicted.
evicted_aggregation_keys: HashSet<AggKey>,
/// Sync contribution keys whose duty has been evicted.
evicted_contrib_keys: HashSet<ContribKey>,

deadliner_rx: tokio::sync::mpsc::Receiver<Duty>,
}

Expand Down Expand Up @@ -225,6 +254,10 @@ impl MemDB {
aggregation_keys_by_slot: HashMap::new(),
contrib_duties: HashMap::new(),
contrib_keys_by_slot: HashMap::new(),
evicted_attestation_slots: HashSet::new(),
evicted_proposer_slots: HashSet::new(),
evicted_aggregation_keys: HashSet::new(),
evicted_contrib_keys: HashSet::new(),
deadliner_rx,
}),
attestation_notify: Notify::new(),
Expand Down Expand Up @@ -272,7 +305,6 @@ impl MemDB {
Some(UnsignedDutyData::Proposal(p)) => state.store_proposal(p)?,
Some(_) => return Err(Error::InvalidVersionedProposal),
}
self.proposer_notify.notify_waiters();
}
DutyType::Attester => {
for (pubkey, data) in &unsigned_set {
Expand All @@ -282,7 +314,6 @@ impl MemDB {
};
state.store_attestation(*pubkey, att)?;
}
self.attestation_notify.notify_waiters();
}
DutyType::Aggregator => {
for data in unsigned_set.values() {
Expand All @@ -292,7 +323,6 @@ impl MemDB {
};
state.store_agg_attestation(agg)?;
}
self.aggregation_notify.notify_waiters();
}
DutyType::SyncContribution => {
for data in unsigned_set.values() {
Expand All @@ -302,24 +332,54 @@ impl MemDB {
};
state.store_sync_contribution(contrib)?;
}
self.contrib_notify.notify_waiters();
}
_ => return Err(Error::UnsupportedDutyType),
}

// Drain all expired duties that the deadliner has sent.
// Wake the matching notify for the duty we just stored, plus
// anything we drain below. `notify_waiters` is cheap if no one is
// parked and just bumps a counter, so calling it under the write
// lock is harmless — woken tasks block on `state.read()` until we
// drop.
self.wake(duty.duty_type);

// Drain all expired duties that the deadliner has sent. Waiters
// whose duty just expired need to see `Lookup::Evicted` and exit,
// not re-park — so we wake the matching notify after each eviction.
while let Ok(expired) = state.deadliner_rx.try_recv() {
let duty_type = expired.duty_type.clone();
state.delete_duty(expired)?;
self.wake(duty_type);
}

Ok(())
}

/// Wakes the [`Notify`] paired with `duty_type`. No-op for duty types
/// the DB doesn't track (e.g. `Exit`, `BuilderRegistration`).
fn wake(&self, duty_type: DutyType) {
let notify = match duty_type {
DutyType::Proposer => &self.proposer_notify,
DutyType::Attester => &self.attestation_notify,
DutyType::Aggregator => &self.aggregation_notify,
DutyType::SyncContribution => &self.contrib_notify,
_ => return,
};
notify.notify_waiters();
}

/// Blocks until a proposal for the given slot is available, then returns
/// it.
pub async fn await_proposal(&self, slot: u64) -> Result<VersionedProposal> {
self.await_data(&self.proposer_notify, |s| s.proposer_duties.get(&slot))
.await
self.await_data(&self.proposer_notify, |s| {
if let Some(v) = s.proposer_duties.get(&slot) {
Lookup::Found(v.clone())
} else if s.evicted_proposer_slots.contains(&slot) {
Lookup::Evicted
} else {
Lookup::Pending
}
})
.await
}

/// Blocks until attestation data for the given slot and committee index is
Expand All @@ -333,8 +393,16 @@ impl MemDB {
slot,
committee_index,
};
self.await_data(&self.attestation_notify, |s| s.attestation_duties.get(&key))
.await
self.await_data(&self.attestation_notify, |s| {
if let Some(v) = s.attestation_duties.get(&key) {
Lookup::Found(v.clone())
} else if s.evicted_attestation_slots.contains(&key.slot) {
Lookup::Evicted
} else {
Lookup::Pending
}
})
.await
}

/// Blocks until an aggregated attestation for the given slot and
Expand All @@ -347,7 +415,13 @@ impl MemDB {
root: attestation_root,
};
self.await_data(&self.aggregation_notify, |s| {
s.aggregation_duties.get(&key).map(|a| &a.0)
if let Some(v) = s.aggregation_duties.get(&key) {
Lookup::Found(v.0.clone())
} else if s.evicted_aggregation_keys.contains(&key) {
Lookup::Evicted
} else {
Lookup::Pending
}
})
.await
}
Expand All @@ -365,31 +439,43 @@ impl MemDB {
subcommittee_index,
root: beacon_block_root,
};
self.await_data(&self.contrib_notify, |s| s.contrib_duties.get(&key))
.await
self.await_data(&self.contrib_notify, |s| {
if let Some(v) = s.contrib_duties.get(&key) {
Lookup::Found(v.clone())
} else if s.evicted_contrib_keys.contains(&key) {
Lookup::Evicted
} else {
Lookup::Pending
}
})
.await
}

// A single Notify per duty type wakes all waiters on every store, not only
// those whose key matches. The number of concurrent waiters per duty type
// is small (one per validator), so the extra wakeups are cheap. A keyed
// notify (HashMap<Key, Sender>) would avoid them but adds complexity that
// isn't worth it here.
//
// `delete_duty` also wakes the notify so waiters whose duty just expired
// exit immediately via the `Lookup::Evicted` branch, instead of parking
// for another `notify_waiters` call or for the per-request timeout in
// the caller.
async fn await_data<V>(
&self,
notify: &Notify,
lookup: impl for<'s> Fn(&'s State) -> Option<&'s V>,
) -> Result<V>
where
V: Clone,
{
lookup: impl Fn(&State) -> Lookup<V>,
) -> Result<V> {
loop {
let notified = notify.notified();
tokio::pin!(notified);

{
let state = self.state.read().await;
if let Some(v) = lookup(&state) {
return Ok(v.clone());
match lookup(&state) {
Lookup::Found(v) => return Ok(v),
Lookup::Evicted => return Err(Error::AwaitDutyExpired),
Lookup::Pending => {}
}
}

Expand Down Expand Up @@ -577,6 +663,7 @@ impl State {
match duty.duty_type {
DutyType::Proposer => {
self.proposer_duties.remove(&slot);
self.evicted_proposer_slots.insert(slot);
}
DutyType::BuilderProposer => return Err(Error::DeprecatedDutyBuilderProposer),
DutyType::Attester => {
Expand All @@ -589,19 +676,22 @@ impl State {
});
}
}
self.evicted_attestation_slots.insert(slot);
}
DutyType::Aggregator => {
if let Some(keys) = self.aggregation_keys_by_slot.remove(&slot) {
for key in keys {
self.aggregation_duties.remove(&key);
for key in &keys {
self.aggregation_duties.remove(key);
}
self.evicted_aggregation_keys.extend(keys);
}
}
DutyType::SyncContribution => {
if let Some(keys) = self.contrib_keys_by_slot.remove(&slot) {
for key in keys {
self.contrib_duties.remove(&key);
for key in &keys {
self.contrib_duties.remove(key);
}
self.evicted_contrib_keys.extend(keys);
}
}
_ => return Err(Error::UnknownDutyType),
Expand Down
Loading
Loading