diff --git a/Cargo.lock b/Cargo.lock index 009c7203..e5d24398 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5624,6 +5624,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tokio-util", + "tower", "tracing", "tree_hash", "vise", diff --git a/Cargo.toml b/Cargo.toml index ac569187..bd7beb79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,6 +106,7 @@ tree_hash_derive = "0.12" tar = "0.4" flate2 = "1.1" wiremock = "0.6" +tower = "0.5" sysinfo = "0.33" quick-xml = { version = "0.39", features = ["serialize"] } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index adf5f88c..f03f60d6 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -55,6 +55,7 @@ pluto-testutil.workspace = true pluto-tracing.workspace = true tokio = { workspace = true, features = ["test-util"] } wiremock.workspace = true +tower = { workspace = true, features = ["util"] } [build-dependencies] pluto-build-proto.workspace = true diff --git a/crates/core/src/dutydb/memory.rs b/crates/core/src/dutydb/memory.rs index 01a68b86..49b107df 100644 --- a/crates/core/src/dutydb/memory.rs +++ b/crates/core/src/dutydb/memory.rs @@ -2,7 +2,7 @@ //! //! Equivalent to charon/core/dutydb/memory.go. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use pluto_eth2api::{ spec::{altair, phase0}, @@ -49,6 +49,12 @@ pub enum Error { #[error("dutydb shutdown: query could not be answered")] Shutdown, + /// The awaited duty was evicted before its unsigned data became + /// available. Distinct from `Shutdown` so callers can map this to a + /// timeout-style error rather than a service-down error. + #[error("dutydb: awaited duty expired before data was stored")] + AwaitDutyExpired, + /// Two validators share the same `(slot, committee_index, valIdx)` with /// different public keys. #[error( @@ -177,6 +183,17 @@ struct ContribKey { root: phase0::Root, } +/// Per-poll outcome handed back by an `await_data` lookup closure. +enum Lookup { + /// The awaited value is now present — return it to the caller. + Found(V), + /// The awaited duty has been evicted; the lookup will never succeed. + /// `await_data` returns [`Error::AwaitDutyExpired`]. + Evicted, + /// Neither stored nor evicted yet — park on the notify and retry. + Pending, +} + struct State { attestation_duties: HashMap, attestation_pub_keys: HashMap, @@ -190,6 +207,18 @@ struct State { contrib_duties: HashMap, contrib_keys_by_slot: HashMap>, + /// Slots whose attester duty has been evicted by the deadliner. Lets + /// `await_attestation` return `AwaitDutyExpired` immediately when the + /// awaited slot is gone, rather than spinning on every `store()` until + /// the request-level timeout fires. + evicted_attestation_slots: HashSet, + /// Slots whose proposer duty has been evicted. + evicted_proposer_slots: HashSet, + /// Aggregation roots whose duty has been evicted. + evicted_aggregation_keys: HashSet, + /// Sync contribution keys whose duty has been evicted. + evicted_contrib_keys: HashSet, + deadliner_rx: tokio::sync::mpsc::Receiver, } @@ -225,6 +254,10 @@ impl MemDB { aggregation_keys_by_slot: HashMap::new(), contrib_duties: HashMap::new(), contrib_keys_by_slot: HashMap::new(), + evicted_attestation_slots: HashSet::new(), + evicted_proposer_slots: HashSet::new(), + evicted_aggregation_keys: HashSet::new(), + evicted_contrib_keys: HashSet::new(), deadliner_rx, }), attestation_notify: Notify::new(), @@ -272,7 +305,6 @@ impl MemDB { Some(UnsignedDutyData::Proposal(p)) => state.store_proposal(p)?, Some(_) => return Err(Error::InvalidVersionedProposal), } - self.proposer_notify.notify_waiters(); } DutyType::Attester => { for (pubkey, data) in &unsigned_set { @@ -282,7 +314,6 @@ impl MemDB { }; state.store_attestation(*pubkey, att)?; } - self.attestation_notify.notify_waiters(); } DutyType::Aggregator => { for data in unsigned_set.values() { @@ -292,7 +323,6 @@ impl MemDB { }; state.store_agg_attestation(agg)?; } - self.aggregation_notify.notify_waiters(); } DutyType::SyncContribution => { for data in unsigned_set.values() { @@ -302,24 +332,54 @@ impl MemDB { }; state.store_sync_contribution(contrib)?; } - self.contrib_notify.notify_waiters(); } _ => return Err(Error::UnsupportedDutyType), } - - // Drain all expired duties that the deadliner has sent. + // Wake the matching notify for the duty we just stored, plus + // anything we drain below. `notify_waiters` is cheap if no one is + // parked and just bumps a counter, so calling it under the write + // lock is harmless — woken tasks block on `state.read()` until we + // drop. + self.wake(duty.duty_type); + + // Drain all expired duties that the deadliner has sent. Waiters + // whose duty just expired need to see `Lookup::Evicted` and exit, + // not re-park — so we wake the matching notify after each eviction. while let Ok(expired) = state.deadliner_rx.try_recv() { + let duty_type = expired.duty_type.clone(); state.delete_duty(expired)?; + self.wake(duty_type); } Ok(()) } + /// Wakes the [`Notify`] paired with `duty_type`. No-op for duty types + /// the DB doesn't track (e.g. `Exit`, `BuilderRegistration`). + fn wake(&self, duty_type: DutyType) { + let notify = match duty_type { + DutyType::Proposer => &self.proposer_notify, + DutyType::Attester => &self.attestation_notify, + DutyType::Aggregator => &self.aggregation_notify, + DutyType::SyncContribution => &self.contrib_notify, + _ => return, + }; + notify.notify_waiters(); + } + /// Blocks until a proposal for the given slot is available, then returns /// it. pub async fn await_proposal(&self, slot: u64) -> Result { - self.await_data(&self.proposer_notify, |s| s.proposer_duties.get(&slot)) - .await + self.await_data(&self.proposer_notify, |s| { + if let Some(v) = s.proposer_duties.get(&slot) { + Lookup::Found(v.clone()) + } else if s.evicted_proposer_slots.contains(&slot) { + Lookup::Evicted + } else { + Lookup::Pending + } + }) + .await } /// Blocks until attestation data for the given slot and committee index is @@ -333,8 +393,16 @@ impl MemDB { slot, committee_index, }; - self.await_data(&self.attestation_notify, |s| s.attestation_duties.get(&key)) - .await + self.await_data(&self.attestation_notify, |s| { + if let Some(v) = s.attestation_duties.get(&key) { + Lookup::Found(v.clone()) + } else if s.evicted_attestation_slots.contains(&key.slot) { + Lookup::Evicted + } else { + Lookup::Pending + } + }) + .await } /// Blocks until an aggregated attestation for the given slot and @@ -347,7 +415,13 @@ impl MemDB { root: attestation_root, }; self.await_data(&self.aggregation_notify, |s| { - s.aggregation_duties.get(&key).map(|a| &a.0) + if let Some(v) = s.aggregation_duties.get(&key) { + Lookup::Found(v.0.clone()) + } else if s.evicted_aggregation_keys.contains(&key) { + Lookup::Evicted + } else { + Lookup::Pending + } }) .await } @@ -365,8 +439,16 @@ impl MemDB { subcommittee_index, root: beacon_block_root, }; - self.await_data(&self.contrib_notify, |s| s.contrib_duties.get(&key)) - .await + self.await_data(&self.contrib_notify, |s| { + if let Some(v) = s.contrib_duties.get(&key) { + Lookup::Found(v.clone()) + } else if s.evicted_contrib_keys.contains(&key) { + Lookup::Evicted + } else { + Lookup::Pending + } + }) + .await } // A single Notify per duty type wakes all waiters on every store, not only @@ -374,22 +456,26 @@ impl MemDB { // is small (one per validator), so the extra wakeups are cheap. A keyed // notify (HashMap) would avoid them but adds complexity that // isn't worth it here. + // + // `delete_duty` also wakes the notify so waiters whose duty just expired + // exit immediately via the `Lookup::Evicted` branch, instead of parking + // for another `notify_waiters` call or for the per-request timeout in + // the caller. async fn await_data( &self, notify: &Notify, - lookup: impl for<'s> Fn(&'s State) -> Option<&'s V>, - ) -> Result - where - V: Clone, - { + lookup: impl Fn(&State) -> Lookup, + ) -> Result { loop { let notified = notify.notified(); tokio::pin!(notified); { let state = self.state.read().await; - if let Some(v) = lookup(&state) { - return Ok(v.clone()); + match lookup(&state) { + Lookup::Found(v) => return Ok(v), + Lookup::Evicted => return Err(Error::AwaitDutyExpired), + Lookup::Pending => {} } } @@ -577,6 +663,7 @@ impl State { match duty.duty_type { DutyType::Proposer => { self.proposer_duties.remove(&slot); + self.evicted_proposer_slots.insert(slot); } DutyType::BuilderProposer => return Err(Error::DeprecatedDutyBuilderProposer), DutyType::Attester => { @@ -589,19 +676,22 @@ impl State { }); } } + self.evicted_attestation_slots.insert(slot); } DutyType::Aggregator => { if let Some(keys) = self.aggregation_keys_by_slot.remove(&slot) { - for key in keys { - self.aggregation_duties.remove(&key); + for key in &keys { + self.aggregation_duties.remove(key); } + self.evicted_aggregation_keys.extend(keys); } } DutyType::SyncContribution => { if let Some(keys) = self.contrib_keys_by_slot.remove(&slot) { - for key in keys { - self.contrib_duties.remove(&key); + for key in &keys { + self.contrib_duties.remove(key); } + self.evicted_contrib_keys.extend(keys); } } _ => return Err(Error::UnknownDutyType), diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs new file mode 100644 index 00000000..c92f3d3f --- /dev/null +++ b/crates/core/src/validatorapi/component.rs @@ -0,0 +1,2198 @@ +//! Validator API [`Handler`] implementation. +//! +//! The component owns the upstream beacon-node client plus the public-key +//! and public-share mappings needed to translate between distributed-validator +//! root keys and this node's threshold-BLS share. + +use std::{any::Any, collections::HashMap, future::Future, pin::Pin, sync::Arc, time::Duration}; + +use async_trait::async_trait; +use axum::http::StatusCode; +use pluto_eth2api::{ + EthBeaconNodeApiClient, GetAttesterDutiesRequest, GetAttesterDutiesResponse, + GetProposerDutiesRequest, GetProposerDutiesResponse, GetSyncCommitteeDutiesRequest, + GetSyncCommitteeDutiesResponse, + spec::phase0::{BLSPubKey, Epoch, Root}, +}; +use pluto_eth2util::signing::{self, DomainName, SigningError}; +use tokio::time::error::Elapsed; + +use super::{ + error::ApiError, + handler::Handler, + types::{ + AggregateAttestationOpts, AttestationDataOpts, AttestationDataResponse, AttesterDutiesOpts, + AttesterDutiesResponse, AttesterDuty, BeaconCommitteeSelection, EthResponse, + NodeVersionData, NodeVersionResponse, ProposalOpts, ProposerDutiesOpts, + ProposerDutiesResponse, ProposerDuty, SignedContributionAndProof, + SignedValidatorRegistration, SignedVoluntaryExit, SyncCommitteeContribution, + SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, + SyncCommitteeDuty, SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, + VersionedAttestation, VersionedProposal, VersionedSignedAggregateAndProof, + VersionedSignedBlindedProposal, VersionedSignedProposal, + }, +}; +use crate::{ + dutydb::{Error as DutyDbError, MemDB}, + signeddata::{ + SignedVoluntaryExit as SignedVoluntaryExitWrapper, SyncContribution, + VersionedAggregatedAttestation, VersionedProposal as UnsignedVersionedProposal, + VersionedSignedValidatorRegistration as VersionedSignedValidatorRegistrationWrapper, + }, + types::{Duty, ParSignedDataSet, PubKey, Signature, SignedData, SlotNumber}, + version, +}; + +/// Boxed error returned by registered callbacks. Mirrors Go's untyped +/// `error` return for `awaitX` / `dutyDef` / `pubKeyByAtt` callbacks. +pub type CallbackError = Box; + +/// Convenience alias for the future returned by an async registered callback. +type CallbackFuture<'a, T> = Pin> + Send + 'a>>; + +/// Subscriber callback for `Subscribe`. Receives a [`Duty`] and a clone of +/// the [`ParSignedDataSet`] — cloning happens inside the registered wrapper, +/// matching Go's `Subscribe` clone-before-fanout behaviour. +pub type SubscriberFn = Arc< + dyn for<'a> Fn(&'a Duty, ParSignedDataSet) -> CallbackFuture<'a, ()> + Send + Sync + 'static, +>; + +/// Looks up an unsigned beacon proposal by slot. Mirrors Go's +/// `awaitProposalFunc(ctx, slot) -> *eth2api.VersionedProposal`. +pub type AwaitProposalFn = + Arc CallbackFuture<'static, UnsignedVersionedProposal> + Send + Sync + 'static>; + +/// Looks up an aggregated attestation by `(slot, attestation_root)`. Mirrors +/// Go's `awaitAggAttFunc(ctx, slot, root) -> *eth2spec.VersionedAttestation`. +pub type AwaitAggAttestationFn = Arc< + dyn Fn(u64, Root) -> CallbackFuture<'static, VersionedAggregatedAttestation> + + Send + + Sync + + 'static, +>; + +/// Looks up a sync committee contribution by `(slot, subcommittee_index, +/// beacon_block_root)`. Mirrors Go's `awaitSyncContributionFunc`. +pub type AwaitSyncContributionFn = Arc< + dyn Fn(u64, u64, Root) -> CallbackFuture<'static, SyncContribution> + Send + Sync + 'static, +>; + +/// Looks up aggregated signed data from the AggSigDB for a `(duty, pubkey)`. +/// Mirrors Go's `awaitAggSigDBFunc(ctx, duty, pubkey) -> core.SignedData`. +pub type AwaitAggSigDBFn = Arc< + dyn Fn(Duty, PubKey) -> CallbackFuture<'static, Box> + Send + Sync + 'static, +>; + +/// Looks up the duty-definition set for a given [`Duty`]. Mirrors Go's +/// `dutyDefFunc(ctx, duty) -> core.DutyDefinitionSet`. The Go return type is +/// an untyped interface map keyed by pubkey; in Rust we keep the same +/// type-erased shape via `Box` so callers can downcast to the +/// concrete `DutyDefinitionSet` they need. +pub type DutyDefFn = Arc< + dyn Fn(Duty) -> CallbackFuture<'static, Box> + Send + Sync + 'static, +>; + +/// Looks up the root pubkey responsible for `(slot, committee_index, +/// validator_index)`. Mirrors Go's `pubKeyByAttFunc`. +pub type PubKeyByAttFn = + Arc CallbackFuture<'static, PubKey> + Send + Sync + 'static>; + +/// Looks up the current set of active validators, returning a map from +/// validator index to the DV root BLS public key. Mirrors Go's +/// `c.eth2Cl.ActiveValidators(ctx)` returning `map[ValidatorIndex]BLSPubKey`. +pub type ActiveValidatorsFn = + Arc CallbackFuture<'static, HashMap> + Send + Sync + 'static>; + +/// Hard deadline for upstream beacon-node calls. Bounds the worst-case +/// handler latency when the upstream hangs or stalls. Roughly one slot. +const UPSTREAM_REQUEST_TIMEOUT: Duration = Duration::from_secs(12); + +/// Hard deadline for the `attestation_data` await on the local DutyDB. +/// Bounded so a request whose slot never produces consensus output cannot +/// hold a handler task indefinitely. Sized at roughly two slots so a real +/// attestation duty has time to flow through the pipeline. +const ATTESTATION_DATA_TIMEOUT: Duration = Duration::from_secs(24); + +/// Validator API [`Handler`] implementation. +/// +/// Holds the upstream beacon-node client and the cluster's public-key / +/// public-share mappings. Each per-endpoint method calls upstream, rewrites +/// root pubkeys to this node's share where the endpoint exposes data to the +/// validator client, and emits partial-signed-data to subscribers on submit +/// endpoints. +pub struct Component { + /// Upstream beacon-node API client. + eth2_cl: Arc, + /// In-memory DutyDB used to await consensus output (e.g. attestation + /// data) produced by the rest of the pipeline. + dutydb: Arc, + /// Threshold BLS share index assigned to this node (1-indexed). + share_idx: u64, + /// Maps DV root public keys to this node's public share. Used to rewrite + /// validator-client-facing endpoints (proposer/attester duties, etc.) so + /// the VC sees the share it is configured to sign with. + pub_share_by_pubkey: HashMap, + /// Whether builder mode is enabled. Read by `propose_block_v3` and the + /// validator-registration submitter. + builder_enabled: bool, + /// Skip signature verification on partial-signed submissions. Test-only. + insecure_test: bool, + /// Subscribers invoked by submit endpoints once a partial-signed-data set + /// has been validated. Each entry clones the set before invoking the + /// user-provided callback, mirroring Go's `Subscribe`. + subs: Vec, + /// Looks up an unsigned beacon proposal for a slot. + #[allow(dead_code, reason = "consumed by proposal handler in later PRs")] + await_proposal_fn: Option, + /// Looks up an aggregated attestation by `(slot, attestation_root)`. + #[allow(dead_code, reason = "consumed by aggregate_attestation in later PRs")] + await_agg_attestation_fn: Option, + /// Looks up a sync committee contribution. + #[allow( + dead_code, + reason = "consumed by sync_committee_contribution in later PRs" + )] + await_sync_contribution_fn: Option, + /// Looks up aggregated signed data for a `(duty, pubkey)`. + #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] + await_agg_sig_db_fn: Option, + /// Looks up the duty-definition set for a duty. + #[allow(dead_code, reason = "consumed by submit_attestations in later PRs")] + duty_def_fn: Option, + /// Looks up the root pubkey for an `(slot, commIdx, valIdx)` triple. + #[allow(dead_code, reason = "consumed by submit_attestations in later PRs")] + pub_key_by_att_fn: Option, + /// Looks up the current set of active validators (`validator_index → DV + /// root BLS public key`). Mirrors Go's `c.eth2Cl.ActiveValidators(ctx)` + /// and is consulted by `submit_voluntary_exit` to translate the + /// `Message.ValidatorIndex` carried by an exit into the corresponding + /// root pubkey before partial-signature verification. + active_validators_fn: Option, +} + +impl Component { + /// Builds a new component. + pub fn new( + eth2_cl: Arc, + dutydb: Arc, + share_idx: u64, + pub_share_by_pubkey: HashMap, + builder_enabled: bool, + ) -> Self { + Self { + eth2_cl, + dutydb, + share_idx, + pub_share_by_pubkey, + builder_enabled, + insecure_test: false, + subs: Vec::new(), + await_proposal_fn: None, + await_agg_attestation_fn: None, + await_sync_contribution_fn: None, + await_agg_sig_db_fn: None, + duty_def_fn: None, + pub_key_by_att_fn: None, + active_validators_fn: None, + } + } + + /// Builds a component that skips partial-signature verification on + /// submit endpoints. Gated to test builds — `insecure_test: true` must + /// never reach production, since later submit handlers consult this flag + /// to bypass signature checks. + #[cfg(test)] + pub fn new_insecure( + eth2_cl: Arc, + dutydb: Arc, + share_idx: u64, + ) -> Self { + Self { + eth2_cl, + dutydb, + share_idx, + pub_share_by_pubkey: HashMap::new(), + builder_enabled: false, + insecure_test: true, + subs: Vec::new(), + await_proposal_fn: None, + await_agg_attestation_fn: None, + await_sync_contribution_fn: None, + await_agg_sig_db_fn: None, + duty_def_fn: None, + pub_key_by_att_fn: None, + active_validators_fn: None, + } + } + + /// Appends a subscriber that is invoked by submit endpoints once a + /// partial-signed-data set has been validated. Mirrors Go's + /// `(*Component).Subscribe` — the registered closure receives its own + /// clone of the set, so subscribers can mutate without affecting peers. + pub fn subscribe(&mut self, f: F) + where + F: Fn(Duty, ParSignedDataSet) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + let wrapped: SubscriberFn = Arc::new(move |duty, set| { + // Clone before invoking each subscriber — exactly as Go's + // `Subscribe` wraps the user fn with a `set.Clone()` step. + let fut = f(duty.clone(), set.clone()); + Box::pin(fut) + }); + self.subs.push(wrapped); + } + + /// Registers (and overwrites any prior) `awaitProposalFunc`. Mirrors + /// Go's `RegisterAwaitProposal` single-function semantics. + pub fn register_await_proposal(&mut self, f: F) + where + F: Fn(u64) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + self.await_proposal_fn = Some(Arc::new(move |slot| Box::pin(f(slot)))); + } + + /// Registers (and overwrites any prior) `awaitAggAttestationFunc`. + pub fn register_await_agg_attestation(&mut self, f: F) + where + F: Fn(u64, Root) -> Fut + Send + Sync + 'static, + Fut: + Future> + Send + 'static, + { + self.await_agg_attestation_fn = Some(Arc::new(move |slot, root| Box::pin(f(slot, root)))); + } + + /// Registers (and overwrites any prior) `awaitSyncContributionFunc`. + pub fn register_await_sync_contribution(&mut self, f: F) + where + F: Fn(u64, u64, Root) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + self.await_sync_contribution_fn = Some(Arc::new(move |slot, subcomm, root| { + Box::pin(f(slot, subcomm, root)) + })); + } + + /// Registers (and overwrites any prior) `awaitAggSigDBFunc`. + pub fn register_await_agg_sig_db(&mut self, f: F) + where + F: Fn(Duty, PubKey) -> Fut + Send + Sync + 'static, + Fut: Future, CallbackError>> + Send + 'static, + { + self.await_agg_sig_db_fn = Some(Arc::new(move |duty, pubkey| Box::pin(f(duty, pubkey)))); + } + + /// Registers (and overwrites any prior) `dutyDefFunc`. + pub fn register_get_duty_definition(&mut self, f: F) + where + F: Fn(Duty) -> Fut + Send + Sync + 'static, + Fut: Future, CallbackError>> + Send + 'static, + { + self.duty_def_fn = Some(Arc::new(move |duty| Box::pin(f(duty)))); + } + + /// Registers (and overwrites any prior) `pubKeyByAttFunc`. + pub fn register_pub_key_by_attestation(&mut self, f: F) + where + F: Fn(u64, u64, u64) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + self.pub_key_by_att_fn = Some(Arc::new(move |slot, comm, val| { + Box::pin(f(slot, comm, val)) + })); + } + + /// Registers (and overwrites any prior) active-validators lookup. + /// Mirrors Go's single-function `eth2Cl.ActiveValidators` dependency — + /// the most recently registered closure wins. + pub fn register_active_validators(&mut self, f: F) + where + F: Fn() -> Fut + Send + Sync + 'static, + Fut: Future, CallbackError>> + Send + 'static, + { + self.active_validators_fn = Some(Arc::new(move || Box::pin(f()))); + } + + /// Verifies and fans out a single builder-registration. Factored out so + /// [`Self::submit_validator_registrations`] iterates over its input in + /// the same shape as Go's `SubmitValidatorRegistrations`. + async fn submit_one_registration( + &self, + registration: SignedValidatorRegistration, + ) -> Result<(), ApiError> { + // Go: validatorapi.go:676-690 — pull the group pubkey out of the + // wrapped registration and gate on it being a DV pubkey on this + // node. Non-DV pubkeys are silently swallowed (matches Go's + // `swallowRegFilter` debug-log behaviour) so a vouch-style VC that + // also registers its proposer key does not get a non-200 from us. + let v1 = registration.0.v1.as_ref().ok_or_else(|| { + ApiError::new( + StatusCode::BAD_REQUEST, + "missing V1 validator registration payload", + ) + })?; + let root_pubkey = v1.message.pubkey; + + if !self.pub_share_by_pubkey.contains_key(&root_pubkey) { + tracing::debug!( + pubkey = ?format_bls_pubkey(&root_pubkey), + "swallowing non-DV registration", + ); + return Ok(()); + } + + let timestamp = v1.message.timestamp; + + // Go: validatorapi.go:693-703 — derive the slot the registration + // belongs to. Pluto inlines `SlotFromTimestamp` here since the + // helper has not been ported into a shared module yet. + let (slot_duration, _) = + tokio::time::timeout(UPSTREAM_REQUEST_TIMEOUT, self.eth2_cl.fetch_slots_config()) + .await + .map_err(|_| upstream_timeout("slots config"))? + .map_err(|err| upstream_call_failed("slots config", err.into()))?; + let genesis_time = + tokio::time::timeout(UPSTREAM_REQUEST_TIMEOUT, self.eth2_cl.fetch_genesis_time()) + .await + .map_err(|_| upstream_timeout("genesis time"))? + .map_err(|err| upstream_call_failed("genesis time", err.into()))?; + + let registration_slot = slot_from_timestamp(genesis_time, slot_duration, timestamp); + let duty = Duty::new_builder_registration_duty(SlotNumber::new(registration_slot)); + + // Go: validatorapi.go:706 — wrap as ParSignedData via the canonical + // partial-sig constructor. + let par_signed = VersionedSignedValidatorRegistrationWrapper::new_partial( + registration.0.clone(), + self.share_idx, + ) + .map_err(|err| { + ApiError::new( + StatusCode::BAD_REQUEST, + "invalid validator registration payload", + ) + .with_source(err) + })?; + + // Go: validatorapi.go:712 — partial-signature verification. The + // application-builder domain ignores the epoch (Go's + // `Epoch()` returns 0); we mirror that here. + let message_root = v1.message.message_root(); + self.verify_partial_sig( + &root_pubkey, + DomainName::ApplicationBuilder, + 0, + message_root, + &sig_to_array(v1.signature), + ) + .await + .map_err(verify_partial_sig_error)?; + + // Go: validatorapi.go:718-725 — fanout. The Go comment ("No need to + // clone since sub auto clones") matches Pluto's wrapped `subscribe` + // closure cloning the set internally. + let core_pubkey = PubKey::new(root_pubkey); + let mut set = ParSignedDataSet::new(); + set.insert(core_pubkey, par_signed); + + for sub in &self.subs { + sub(&duty, set.clone()) + .await + .map_err(subscriber_error_to_api_error)?; + } + + Ok(()) + } + + /// Verifies a partial BLS signature produced by the validator client + /// against this node's public share for the given DV root pubkey. + /// + /// Mirrors Go's `verifyPartialSig` at + /// `core/validatorapi/validatorapi.go:1352`. Unlike the Go variant — + /// which projects the BLS-domain / epoch / message-root through the + /// `core.Eth2SignedData` interface — this Rust hook takes those three + /// values directly. Each submit handler in PRs 3–6 will derive the + /// triple from the concrete signed-data wrapper it is processing, then + /// invoke this helper. + /// + /// Skipped entirely when [`Self::insecure_test`] is set. + pub async fn verify_partial_sig( + &self, + root_pubkey: &BLSPubKey, + domain_name: DomainName, + epoch: Epoch, + message_root: Root, + signature: &Signature, + ) -> Result<(), VerifyPartialSigError> { + if self.insecure_test { + return Ok(()); + } + + // The verify-share is this node's public share for the given DV root + // pubkey. Mirrors Go's `getVerifyShareFunc` lookup. + let pubshare = self + .pub_share_by_pubkey + .get(root_pubkey) + .ok_or(VerifyPartialSigError::UnknownPubKey)?; + + signing::verify( + &self.eth2_cl, + domain_name, + epoch, + message_root, + signature, + pubshare, + ) + .await?; + + Ok(()) + } +} + +/// Errors returned by [`Component::verify_partial_sig`]. +#[derive(Debug, thiserror::Error)] +pub enum VerifyPartialSigError { + /// The supplied DV root public key has no public share registered on + /// this node. Mirrors Go's `getVerifyShareFunc` "unknown public key" + /// branch. + #[error("unknown public key")] + UnknownPubKey, + + /// The beacon-node signing-domain lookup or BLS verification failed. + #[error(transparent)] + Signing(#[from] SigningError), +} + +#[async_trait] +impl Handler for Component { + async fn node_version(&self) -> Result { + let (commit, _) = version::git_commit(); + let version = format!( + "obolnetwork/pluto/{}-{}/{}-{}", + *version::VERSION, + commit, + std::env::consts::ARCH, + std::env::consts::OS, + ); + + Ok(NodeVersionResponse { + data: NodeVersionData { version }, + }) + } + + async fn proposer_duties( + &self, + opts: ProposerDutiesOpts, + ) -> Result { + let request = GetProposerDutiesRequest::builder() + .epoch(opts.epoch.to_string()) + .build() + .map_err(|err| { + ApiError::new(StatusCode::BAD_REQUEST, "invalid epoch") + .with_boxed_source(err.into()) + })?; + + let response = tokio::time::timeout( + UPSTREAM_REQUEST_TIMEOUT, + self.eth2_cl.get_proposer_duties(request), + ) + .await + .map_err(|_| upstream_timeout("proposer duties"))? + .map_err(|err| upstream_call_failed("proposer duties", err.into()))?; + + let mut payload = match response { + GetProposerDutiesResponse::Ok(payload) => payload, + GetProposerDutiesResponse::BadRequest(body) => { + return Err(upstream_status_error( + StatusCode::BAD_REQUEST, + "proposer duties", + body, + )); + } + GetProposerDutiesResponse::ServiceUnavailable(body) => { + return Err(upstream_status_error( + StatusCode::SERVICE_UNAVAILABLE, + "proposer duties", + body, + )); + } + other @ (GetProposerDutiesResponse::InternalServerError(_) + | GetProposerDutiesResponse::Unknown) => { + return Err(upstream_unexpected("proposer duties", other)); + } + }; + + swap_proposer_pubshares(&mut payload.data, &self.pub_share_by_pubkey)?; + + Ok(payload) + } + + async fn attester_duties( + &self, + opts: AttesterDutiesOpts, + ) -> Result { + let request = GetAttesterDutiesRequest::builder() + .epoch(opts.epoch.to_string()) + .body(opts.indices) + .build() + .map_err(|err| { + ApiError::new(StatusCode::BAD_REQUEST, "invalid attester duties request") + .with_boxed_source(err.into()) + })?; + + let response = tokio::time::timeout( + UPSTREAM_REQUEST_TIMEOUT, + self.eth2_cl.get_attester_duties(request), + ) + .await + .map_err(|_| upstream_timeout("attester duties"))? + .map_err(|err| upstream_call_failed("attester duties", err.into()))?; + + let mut payload = match response { + GetAttesterDutiesResponse::Ok(payload) => payload, + GetAttesterDutiesResponse::BadRequest(body) => { + return Err(upstream_status_error( + StatusCode::BAD_REQUEST, + "attester duties", + body, + )); + } + GetAttesterDutiesResponse::ServiceUnavailable(body) => { + return Err(upstream_status_error( + StatusCode::SERVICE_UNAVAILABLE, + "attester duties", + body, + )); + } + other @ (GetAttesterDutiesResponse::InternalServerError(_) + | GetAttesterDutiesResponse::Unknown) => { + return Err(upstream_unexpected("attester duties", other)); + } + }; + + swap_attester_pubshares(&mut payload.data, &self.pub_share_by_pubkey)?; + + Ok(payload) + } + + async fn sync_committee_duties( + &self, + opts: SyncCommitteeDutiesOpts, + ) -> Result { + let request = GetSyncCommitteeDutiesRequest::builder() + .epoch(opts.epoch.to_string()) + .body(opts.indices) + .build() + .map_err(|err| { + ApiError::new( + StatusCode::BAD_REQUEST, + "invalid sync committee duties request", + ) + .with_boxed_source(err.into()) + })?; + + let response = tokio::time::timeout( + UPSTREAM_REQUEST_TIMEOUT, + self.eth2_cl.get_sync_committee_duties(request), + ) + .await + .map_err(|_| upstream_timeout("sync committee duties"))? + .map_err(|err| upstream_call_failed("sync committee duties", err.into()))?; + + let mut payload = match response { + GetSyncCommitteeDutiesResponse::Ok(payload) => payload, + GetSyncCommitteeDutiesResponse::BadRequest(body) => { + return Err(upstream_status_error( + StatusCode::BAD_REQUEST, + "sync committee duties", + body, + )); + } + GetSyncCommitteeDutiesResponse::ServiceUnavailable(body) => { + return Err(upstream_status_error( + StatusCode::SERVICE_UNAVAILABLE, + "sync committee duties", + body, + )); + } + other @ (GetSyncCommitteeDutiesResponse::InternalServerError(_) + | GetSyncCommitteeDutiesResponse::Unknown) => { + return Err(upstream_unexpected("sync committee duties", other)); + } + }; + + swap_sync_committee_pubshares(&mut payload.data, &self.pub_share_by_pubkey)?; + + Ok(payload) + } + + async fn attestation_data( + &self, + opts: AttestationDataOpts, + ) -> Result { + let data = tokio::time::timeout( + ATTESTATION_DATA_TIMEOUT, + self.dutydb + .await_attestation(opts.slot, opts.committee_index), + ) + .await + .map_err(|_: Elapsed| { + ApiError::new( + StatusCode::REQUEST_TIMEOUT, + "attestation data not available before deadline", + ) + })? + .map_err(map_dutydb_error)?; + + Ok(AttestationDataResponse { data }) + } + + async fn submit_attestations( + &self, + _attestations: Vec, + ) -> Result<(), ApiError> { + unimplemented!("submit_attestations not yet ported") + } + + async fn proposal( + &self, + _opts: ProposalOpts, + ) -> Result, ApiError> { + unimplemented!("proposal not yet ported") + } + + async fn submit_proposal(&self, _proposal: VersionedSignedProposal) -> Result<(), ApiError> { + unimplemented!("submit_proposal not yet ported") + } + + async fn submit_blinded_proposal( + &self, + _proposal: VersionedSignedBlindedProposal, + ) -> Result<(), ApiError> { + unimplemented!("submit_blinded_proposal not yet ported") + } + + async fn aggregate_attestation( + &self, + _opts: AggregateAttestationOpts, + ) -> Result, ApiError> { + unimplemented!("aggregate_attestation not yet ported") + } + + async fn submit_aggregate_attestations( + &self, + _aggregates: Vec, + ) -> Result<(), ApiError> { + unimplemented!("submit_aggregate_attestations not yet ported") + } + + async fn beacon_committee_selections( + &self, + _selections: Vec, + ) -> Result>, ApiError> { + unimplemented!("beacon_committee_selections not yet ported") + } + + async fn sync_committee_selections( + &self, + _selections: Vec, + ) -> Result>, ApiError> { + unimplemented!("sync_committee_selections not yet ported") + } + + async fn validators( + &self, + _opts: ValidatorsOpts, + ) -> Result>, ApiError> { + unimplemented!("validators not yet ported") + } + + async fn submit_validator_registrations( + &self, + registrations: Vec, + ) -> Result<(), ApiError> { + // Go: validatorapi.go:732-734 — empty input is a no-op. + if registrations.is_empty() { + return Ok(()); + } + + // Go: validatorapi.go:736-739 — builder-mode gate. When builder mode + // is disabled the registrations are accepted (no client-visible + // error) but never fanned out. Mirrors the swallow-on-disable + // behaviour Go inherited from Vouch. + if !self.builder_enabled { + tracing::warn!( + count = registrations.len(), + "swallowing validator registrations: builder mode disabled", + ); + return Ok(()); + } + + for registration in registrations { + self.submit_one_registration(registration).await?; + } + + Ok(()) + } + + async fn submit_voluntary_exit(&self, exit: SignedVoluntaryExit) -> Result<(), ApiError> { + // Go: validatorapi.go:753-761 — resolve the DV root pubkey for the + // validator index carried by the exit. The Pluto-side lookup runs + // through the registered `active_validators_fn` hook (mirrors the + // Go `eth2Cl.ActiveValidators` indirection). + let active_validators_fn = self.active_validators_fn.as_ref().ok_or_else(|| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "active validators lookup not registered", + ) + })?; + + let active = tokio::time::timeout(UPSTREAM_REQUEST_TIMEOUT, active_validators_fn()) + .await + .map_err(|_| upstream_timeout("active validators"))? + .map_err(|err| upstream_call_failed("active validators", err))?; + + let validator_index = exit.0.message.validator_index; + let root_pubkey = active.get(&validator_index).copied().ok_or_else(|| { + // Go: `errors.New("validator not found")` — bubble up as 400 so a + // misbehaving VC sees a non-retriable rejection without leaking + // upstream details. + ApiError::new(StatusCode::BAD_REQUEST, "validator not found") + })?; + + // Go: validatorapi.go:768-773 — duty slot = slots_per_epoch * epoch. + let (_, slots_per_epoch) = + tokio::time::timeout(UPSTREAM_REQUEST_TIMEOUT, self.eth2_cl.fetch_slots_config()) + .await + .map_err(|_| upstream_timeout("slots config"))? + .map_err(|err| upstream_call_failed("slots config", err.into()))?; + + let exit_epoch = exit.0.message.epoch; + let duty_slot = slots_per_epoch.saturating_mul(exit_epoch); + let duty = Duty::new_voluntary_exit_duty(SlotNumber::new(duty_slot)); + + // Go: validatorapi.go:776 — build the ParSignedData via the canonical + // partial-sig constructor for voluntary exits. + let par_signed = SignedVoluntaryExitWrapper::new_partial(exit.0.clone(), self.share_idx); + + // Go: validatorapi.go:779 — partial-signature verification. + let message_root = exit.0.message_root(); + self.verify_partial_sig( + &root_pubkey, + DomainName::VoluntaryExit, + exit_epoch, + message_root, + &sig_to_array(exit.0.signature), + ) + .await + .map_err(verify_partial_sig_error)?; + + tracing::info!(?duty, "Voluntary exit submitted by validator client"); + + // Go: validatorapi.go:786-793 — fanout to every subscriber. Each + // subscriber clone-and-invokes via the [`Component::subscribe`] + // wrapper, so we hand the same set into every callback. + let core_pubkey = PubKey::new(root_pubkey); + let mut set = ParSignedDataSet::new(); + set.insert(core_pubkey, par_signed); + + for sub in &self.subs { + sub(&duty, set.clone()) + .await + .map_err(subscriber_error_to_api_error)?; + } + + Ok(()) + } + + async fn sync_committee_contribution( + &self, + _opts: SyncCommitteeContributionOpts, + ) -> Result, ApiError> { + unimplemented!("sync_committee_contribution not yet ported") + } + + async fn submit_sync_committee_contributions( + &self, + _contributions: Vec, + ) -> Result<(), ApiError> { + unimplemented!("submit_sync_committee_contributions not yet ported") + } + + async fn submit_sync_committee_messages( + &self, + _messages: Vec, + ) -> Result<(), ApiError> { + unimplemented!("submit_sync_committee_messages not yet ported") + } +} + +/// Builds the `ApiError` returned when an upstream beacon-node call elapses +/// past [`UPSTREAM_REQUEST_TIMEOUT`]. +fn upstream_timeout(endpoint: &'static str) -> ApiError { + ApiError::new( + StatusCode::GATEWAY_TIMEOUT, + format!("upstream {endpoint} timed out"), + ) +} + +/// Builds the `ApiError` returned when an upstream beacon-node call returns a +/// transport-level error. Boxed so `anyhow::Error` (which doesn't itself +/// implement `std::error::Error`) can be attached via `.into()`. +fn upstream_call_failed( + endpoint: &'static str, + err: Box, +) -> ApiError { + ApiError::new( + StatusCode::BAD_GATEWAY, + format!("upstream {endpoint} failed"), + ) + .with_boxed_source(err) +} + +/// Builds the `ApiError` returned when the upstream responds with a faithful +/// HTTP status that we propagate (e.g. 400, 503). The upstream body is +/// attached as a `source` for debug logging — never serialized into the +/// client-visible message. +fn upstream_status_error( + status: StatusCode, + endpoint: &'static str, + body: B, +) -> ApiError { + ApiError::new( + status, + format!("upstream {endpoint} returned {}", status.as_u16()), + ) + .with_source(std::io::Error::other(format!( + "upstream {endpoint} body: {body:?}" + ))) +} + +/// Builds the `ApiError` returned when the upstream responds with an +/// unexpected variant (e.g. `Unknown`, or `InternalServerError`). The variant +/// is attached as a `source` so the debug log retains it but the client +/// message stays generic. +fn upstream_unexpected(endpoint: &'static str, response: R) -> ApiError { + ApiError::new( + StatusCode::BAD_GATEWAY, + format!("unexpected upstream {endpoint} response"), + ) + .with_source(std::io::Error::other(format!( + "upstream {endpoint} variant: {response:?}" + ))) +} + +/// Maps a [`crate::dutydb::Error`] into the `ApiError` returned to the client +/// when an `attestation_data` await fails. `Shutdown` propagates as 503 so the +/// VC can retry; `AwaitDutyExpired` propagates as 408 — same as a timeout — +/// since the duty is gone and the data will never arrive. Anything else is a +/// programming error here and becomes 500. +fn map_dutydb_error(err: DutyDbError) -> ApiError { + let (status, message) = match err { + DutyDbError::Shutdown => (StatusCode::SERVICE_UNAVAILABLE, "dutydb is shutting down"), + DutyDbError::AwaitDutyExpired => ( + StatusCode::REQUEST_TIMEOUT, + "attestation duty expired before data was stored", + ), + _ => ( + StatusCode::INTERNAL_SERVER_ERROR, + "await attestation failed", + ), + }; + ApiError::new(status, message).with_source(err) +} + +/// Rewrites each duty's root public key to this node's public share. Duties +/// whose pubkey is not in `pub_share_by_pubkey` are passed through unchanged +/// (the upstream returns all proposers for the epoch, not just ours). +fn swap_proposer_pubshares( + duties: &mut [ProposerDuty], + pub_share_by_pubkey: &HashMap, +) -> Result<(), ApiError> { + for duty in duties { + let pubkey = parse_bls_pubkey(&duty.pubkey)?; + if let Some(share) = pub_share_by_pubkey.get(&pubkey) { + duty.pubkey = format_bls_pubkey(share); + } + } + Ok(()) +} + +/// Like [`swap_proposer_pubshares`] but for attester duties. Attester duties +/// only ever come back for validators owned by this cluster, so an unknown +/// pubkey indicates a misconfiguration and is rejected. +fn swap_attester_pubshares( + duties: &mut [AttesterDuty], + pub_share_by_pubkey: &HashMap, +) -> Result<(), ApiError> { + for duty in duties { + let pubkey = parse_bls_pubkey(&duty.pubkey)?; + let share = pub_share_by_pubkey.get(&pubkey).ok_or_else(|| { + // Cluster/lock-file misconfiguration — the upstream returned a + // well-formed duty, but this node has no share for that validator. + // 500 (not 502): the failure is local, not gateway-level. + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "pubshare not found for attester duty", + ) + })?; + duty.pubkey = format_bls_pubkey(share); + } + Ok(()) +} + +/// Sync-committee duties variant of [`swap_attester_pubshares`]. +fn swap_sync_committee_pubshares( + duties: &mut [SyncCommitteeDuty], + pub_share_by_pubkey: &HashMap, +) -> Result<(), ApiError> { + for duty in duties { + let pubkey = parse_bls_pubkey(&duty.pubkey)?; + let share = pub_share_by_pubkey.get(&pubkey).ok_or_else(|| { + // See `swap_attester_pubshares` — same 500-not-502 reasoning. + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "pubshare not found for sync committee duty", + ) + })?; + duty.pubkey = format_bls_pubkey(share); + } + Ok(()) +} + +fn parse_bls_pubkey(s: &str) -> Result { + let trimmed = s.strip_prefix("0x").unwrap_or(s); + let bytes = hex::decode(trimmed).map_err(|err| { + ApiError::new( + StatusCode::BAD_GATEWAY, + format!("invalid pubkey hex: {err}"), + ) + })?; + bytes.as_slice().try_into().map_err(|_| { + ApiError::new( + StatusCode::BAD_GATEWAY, + format!("invalid pubkey length: got {}, want 48", bytes.len()), + ) + }) +} + +fn format_bls_pubkey(pubkey: &BLSPubKey) -> String { + format!("0x{}", hex::encode(pubkey)) +} + +/// Converts a phase0 [`BLSSignature`] (`[u8; 96]`) into the core +/// [`Signature`] alias used by [`Component::verify_partial_sig`]. The two +/// are interchangeable byte-wise; this helper exists to keep the call sites +/// readable. +fn sig_to_array(sig: pluto_eth2api::spec::phase0::BLSSignature) -> Signature { + sig +} + +/// Maps a [`VerifyPartialSigError`] into the `ApiError` returned to the +/// client. `UnknownPubKey` is a misconfiguration (500), `Signing` is a +/// validator-client mistake (400) — both keep the underlying error as a +/// `source` so the debug log retains it while the client sees a generic +/// message. +fn verify_partial_sig_error(err: VerifyPartialSigError) -> ApiError { + match err { + VerifyPartialSigError::UnknownPubKey => ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "unknown public key for partial signature verification", + ) + .with_source(err), + VerifyPartialSigError::Signing(_) => ApiError::new( + StatusCode::BAD_REQUEST, + "partial signature verification failed", + ) + .with_source(err), + } +} + +/// Maps a subscriber callback failure into an `ApiError`. Subscriber errors +/// are downstream-pipeline failures (parsigdb store, fanout transport, …), +/// so 500 is the appropriate client-visible status — and the underlying +/// error is preserved on `source()` for the debug log. +fn subscriber_error_to_api_error(err: CallbackError) -> ApiError { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "downstream subscriber failed", + ) + .with_boxed_source(err) +} + +/// Computes the slot a timestamp belongs to, mirroring Go's +/// `SlotFromTimestamp` at `validatorapi.go:41-70`. When the timestamp is +/// before genesis (testing scenarios), Go falls back to "now"; here we fall +/// back to slot 0 to keep the helper pure — the only consumer is the +/// `Duty` key, where any deterministic placeholder is acceptable. +fn slot_from_timestamp( + genesis_time: chrono::DateTime, + slot_duration: std::time::Duration, + timestamp_secs: u64, +) -> u64 { + let genesis_secs = match u64::try_from(genesis_time.timestamp()) { + Ok(value) => value, + Err(_) => return 0, + }; + if timestamp_secs < genesis_secs { + return 0; + } + let elapsed = timestamp_secs.saturating_sub(genesis_secs); + let secs_per_slot = slot_duration.as_secs().max(1); + elapsed.checked_div(secs_per_slot).unwrap_or(0) +} + +#[cfg(test)] +mod tests { + use chrono::{DateTime, Utc}; + use tokio::sync::mpsc; + use tokio_util::sync::CancellationToken; + + use super::*; + use crate::{ + deadline::{DeadlineCalculator, DeadlinerTask, Result as DeadlineResult}, + dutydb::{UnsignedDataSet, UnsignedDutyData}, + signeddata::{ + AttestationData as SignedAttestationData, AttesterDuty as SignedAttesterDuty, + }, + testutils::random_core_pub_key, + types::{Duty, DutyType, SlotNumber}, + validatorapi::types::AttestationDataOpts, + }; + + /// Schedules every duty with a deadline at `MAX_UTC`, so duties are + /// `Scheduled` but never naturally expire. + struct FarFutureCalculator; + + impl DeadlineCalculator for FarFutureCalculator { + fn deadline(&self, _: &Duty) -> DeadlineResult>> { + Ok(Some(DateTime::::MAX_UTC)) + } + } + + /// Build a Component backed by a real (but never-expiring) DutyDB plus a + /// dummy upstream client. Useful for tests that only exercise endpoints + /// served from the DB. + fn make_test_component() -> (Component, Arc) { + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = + DeadlinerTask::start(cancel.clone(), "validatorapi-tests", FarFutureCalculator); + // Held to keep the eviction channel's sender alive so the dutydb's + // `evict_rx` doesn't observe a closed channel. + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = + Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); + let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + (component, dutydb) + } + + #[test] + fn swap_replaces_known_pubkeys_and_keeps_unknown() { + let root = [0xAA_u8; 48]; + let share = [0xBB_u8; 48]; + let stranger = [0xCC_u8; 48]; + + let map = HashMap::from([(root, share)]); + + let mut duties = vec![ + ProposerDuty { + pubkey: format_bls_pubkey(&root), + slot: "10".to_owned(), + validator_index: "1".to_owned(), + }, + ProposerDuty { + pubkey: format_bls_pubkey(&stranger), + slot: "11".to_owned(), + validator_index: "2".to_owned(), + }, + ]; + + swap_proposer_pubshares(&mut duties, &map).unwrap(); + + assert_eq!(duties[0].pubkey, format_bls_pubkey(&share)); + assert_eq!(duties[1].pubkey, format_bls_pubkey(&stranger)); + } + + #[test] + fn swap_attester_replaces_pubkeys_and_rejects_unknown() { + let root = [0x11_u8; 48]; + let share = [0x22_u8; 48]; + let unknown = [0x33_u8; 48]; + + let map = HashMap::from([(root, share)]); + + let mut duties = vec![AttesterDuty { + pubkey: format_bls_pubkey(&root), + slot: "1".to_owned(), + committee_index: "0".to_owned(), + committee_length: "16".to_owned(), + committees_at_slot: "4".to_owned(), + validator_committee_index: "0".to_owned(), + validator_index: "5".to_owned(), + }]; + + swap_attester_pubshares(&mut duties, &map).unwrap(); + assert_eq!(duties[0].pubkey, format_bls_pubkey(&share)); + + let mut stranger_duties = vec![AttesterDuty { + pubkey: format_bls_pubkey(&unknown), + slot: "2".to_owned(), + committee_index: "0".to_owned(), + committee_length: "16".to_owned(), + committees_at_slot: "4".to_owned(), + validator_committee_index: "0".to_owned(), + validator_index: "6".to_owned(), + }]; + let err = swap_attester_pubshares(&mut stranger_duties, &map).unwrap_err(); + assert_eq!(err.status_code, StatusCode::INTERNAL_SERVER_ERROR); + } + + #[test] + fn swap_sync_committee_replaces_pubkeys_and_rejects_unknown() { + let root = [0x44_u8; 48]; + let share = [0x55_u8; 48]; + let unknown = [0x66_u8; 48]; + + let map = HashMap::from([(root, share)]); + + let mut duties = vec![SyncCommitteeDuty { + pubkey: format_bls_pubkey(&root), + validator_index: "12".to_owned(), + validator_sync_committee_indices: vec!["0".to_owned()], + }]; + swap_sync_committee_pubshares(&mut duties, &map).unwrap(); + assert_eq!(duties[0].pubkey, format_bls_pubkey(&share)); + + let mut stranger = vec![SyncCommitteeDuty { + pubkey: format_bls_pubkey(&unknown), + validator_index: "13".to_owned(), + validator_sync_committee_indices: vec![], + }]; + let err = swap_sync_committee_pubshares(&mut stranger, &map).unwrap_err(); + assert_eq!(err.status_code, StatusCode::INTERNAL_SERVER_ERROR); + } + + #[test] + fn swap_rejects_malformed_pubkey() { + let mut duties = vec![ProposerDuty { + pubkey: "0xnothex".to_owned(), + slot: "0".to_owned(), + validator_index: "0".to_owned(), + }]; + let err = swap_proposer_pubshares(&mut duties, &HashMap::new()).unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); + } + + #[tokio::test] + async fn node_version_formats_pluto_string() { + let (component, _db) = make_test_component(); + + let response = component.node_version().await.unwrap(); + + assert!(response.data.version.starts_with("obolnetwork/pluto/")); + assert!(response.data.version.contains(std::env::consts::ARCH)); + assert!(response.data.version.contains(std::env::consts::OS)); + } + + #[tokio::test] + async fn attestation_data_returns_data_stored_in_dutydb() { + const SLOT: u64 = 100; + const COMM_IDX: u64 = 4; + const V_IDX: u64 = 1; + + let (component, db) = make_test_component(); + + let unsigned = SignedAttestationData { + data: pluto_eth2api::spec::phase0::AttestationData { + slot: SLOT, + index: COMM_IDX, + beacon_block_root: [0x11; 32], + source: pluto_eth2api::spec::phase0::Checkpoint::default(), + target: pluto_eth2api::spec::phase0::Checkpoint::default(), + }, + duty: SignedAttesterDuty { + slot: SLOT, + validator_index: V_IDX, + committee_index: COMM_IDX, + committee_length: 8, + committees_at_slot: 1, + validator_committee_index: 0, + }, + }; + let mut set = UnsignedDataSet::new(); + set.insert( + random_core_pub_key(), + UnsignedDutyData::Attestation(unsigned.clone()), + ); + db.store(Duty::new(SlotNumber::new(SLOT), DutyType::Attester), set) + .await + .unwrap(); + + let response = component + .attestation_data(AttestationDataOpts { + slot: SLOT, + committee_index: COMM_IDX, + }) + .await + .unwrap(); + assert_eq!(response.data.slot, SLOT); + assert_eq!(response.data.index, COMM_IDX); + assert_eq!(response.data.beacon_block_root, [0x11; 32]); + } + + /// Storing `(SLOT, COMM_IDX)` must NOT satisfy an `attestation_data` + /// request for `(SLOT, COMM_IDX + 1)`. Verifies the dutydb is keyed on + /// the full `(slot, committee_index)` tuple, not just the slot. + #[tokio::test(start_paused = true)] + async fn attestation_data_does_not_resolve_for_wrong_committee_index() { + const SLOT: u64 = 200; + const COMM_IDX: u64 = 7; + + let (component, db) = make_test_component(); + + let unsigned = SignedAttestationData { + data: pluto_eth2api::spec::phase0::AttestationData { + slot: SLOT, + index: COMM_IDX, + beacon_block_root: [0x22; 32], + source: pluto_eth2api::spec::phase0::Checkpoint::default(), + target: pluto_eth2api::spec::phase0::Checkpoint::default(), + }, + duty: SignedAttesterDuty { + slot: SLOT, + validator_index: 9, + committee_index: COMM_IDX, + committee_length: 8, + committees_at_slot: 1, + validator_committee_index: 0, + }, + }; + let mut set = UnsignedDataSet::new(); + set.insert( + random_core_pub_key(), + UnsignedDutyData::Attestation(unsigned), + ); + db.store(Duty::new(SlotNumber::new(SLOT), DutyType::Attester), set) + .await + .unwrap(); + + // Auto-advance past the handler timeout so the await trips on the + // wrong committee_index, not on the existing one. + let err = component + .attestation_data(AttestationDataOpts { + slot: SLOT, + committee_index: COMM_IDX + 1, + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); + } + + /// Verifies the handler enforces `ATTESTATION_DATA_TIMEOUT` — an + /// `await_attestation` for a slot that is never stored returns 408 + /// instead of hanging. + #[tokio::test(start_paused = true)] + async fn attestation_data_times_out_when_data_never_arrives() { + let (component, _db) = make_test_component(); + + let err = component + .attestation_data(AttestationDataOpts { + slot: 999, + committee_index: 0, + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); + } + + /// Verifies that when the dutydb evicts the awaited duty (via the + /// deadliner), the in-flight handler exits promptly with + /// `REQUEST_TIMEOUT` instead of parking on the notify forever. + #[tokio::test] + async fn attestation_data_returns_408_when_duty_is_evicted() { + use tokio::sync::mpsc::channel; + + const SLOT: u64 = 333; + const COMM_IDX: u64 = 1; + + // Hand-build a Component whose dutydb shares its eviction channel + // with the test, so we can drive eviction deterministically. + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = + DeadlinerTask::start(cancel.clone(), "validatorapi-tests", FarFutureCalculator); + let (trim_tx, trim_rx) = channel::(8); + let dutydb = Arc::new(MemDB::new(deadliner, trim_rx, &cancel)); + let eth2_cl = + Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); + let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + + // Start an await before any data is stored. + let waiter = { + let component = Arc::new(component); + let c = Arc::clone(&component); + tokio::spawn(async move { + c.attestation_data(AttestationDataOpts { + slot: SLOT, + committee_index: COMM_IDX, + }) + .await + }) + }; + + // Yield so the waiter parks. + tokio::task::yield_now().await; + + // Simulate the deadliner emitting an eviction for this slot… + trim_tx + .send(Duty::new(SlotNumber::new(SLOT), DutyType::Attester)) + .await + .unwrap(); + + // …then trigger eviction processing by storing an unrelated duty. + let unsigned = SignedAttestationData { + data: pluto_eth2api::spec::phase0::AttestationData { + slot: SLOT.saturating_add(1), + index: 0, + beacon_block_root: [0x33; 32], + source: pluto_eth2api::spec::phase0::Checkpoint::default(), + target: pluto_eth2api::spec::phase0::Checkpoint::default(), + }, + duty: SignedAttesterDuty { + slot: SLOT.saturating_add(1), + validator_index: 0, + committee_index: 0, + committee_length: 8, + committees_at_slot: 1, + validator_committee_index: 0, + }, + }; + let mut set = UnsignedDataSet::new(); + set.insert( + random_core_pub_key(), + UnsignedDutyData::Attestation(unsigned), + ); + dutydb + .store( + Duty::new(SlotNumber::new(SLOT.saturating_add(1)), DutyType::Attester), + set, + ) + .await + .unwrap(); + + let err = waiter.await.unwrap().unwrap_err(); + assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); + } + + /// Verifies that dropping the handler future releases the dutydb + /// waiter — the next store() should not see a hanging reader on the + /// state lock. + #[tokio::test] + async fn attestation_data_drops_waiter_when_future_dropped() { + let (component, db) = make_test_component(); + let component = Arc::new(component); + + let waiter = { + let component = Arc::clone(&component); + tokio::spawn(async move { + component + .attestation_data(AttestationDataOpts { + slot: 4242, + committee_index: 0, + }) + .await + }) + }; + + tokio::task::yield_now().await; + waiter.abort(); + let _ = waiter.await; + + // Confirm db is still usable — store should not deadlock. + let unsigned = SignedAttestationData { + data: pluto_eth2api::spec::phase0::AttestationData { + slot: 1, + index: 0, + beacon_block_root: [0x44; 32], + source: pluto_eth2api::spec::phase0::Checkpoint::default(), + target: pluto_eth2api::spec::phase0::Checkpoint::default(), + }, + duty: SignedAttesterDuty { + slot: 1, + validator_index: 0, + committee_index: 0, + committee_length: 8, + committees_at_slot: 1, + validator_committee_index: 0, + }, + }; + let mut set = UnsignedDataSet::new(); + set.insert( + random_core_pub_key(), + UnsignedDutyData::Attestation(unsigned), + ); + db.store(Duty::new(SlotNumber::new(1), DutyType::Attester), set) + .await + .unwrap(); + } + + /// `map_dutydb_error` covers the three distinguishable variants from + /// `crate::dutydb::Error`. + #[test] + fn map_dutydb_error_status_codes() { + assert_eq!( + map_dutydb_error(DutyDbError::Shutdown).status_code, + StatusCode::SERVICE_UNAVAILABLE + ); + assert_eq!( + map_dutydb_error(DutyDbError::AwaitDutyExpired).status_code, + StatusCode::REQUEST_TIMEOUT + ); + assert_eq!( + map_dutydb_error(DutyDbError::UnsupportedDutyType).status_code, + StatusCode::INTERNAL_SERVER_ERROR + ); + } + + /// `upstream_status_error` keeps the upstream response body out of the + /// client-visible message but preserves it on `source()` so it lands in + /// the debug log. + #[test] + fn upstream_status_error_does_not_leak_body_into_message() { + use pluto_eth2api::BlindedBlock400Response; + + let body = BlindedBlock400Response { + code: 503.0, + message: "secret upstream stacktrace path=/etc/secret".to_owned(), + stacktraces: Some(vec!["at /etc/secret/lighthouse:42".to_owned()]), + }; + let err = upstream_status_error(StatusCode::SERVICE_UNAVAILABLE, "attester duties", body); + + assert_eq!(err.status_code, StatusCode::SERVICE_UNAVAILABLE); + assert!(!err.message.contains("secret")); + assert!(!err.message.contains("stacktrace")); + // But the source carries it for debug logging. + let src = err.source.as_ref().unwrap().to_string(); + assert!(src.contains("secret")); + } + + /// `upstream_unexpected` mirrors `upstream_status_error`'s no-leak shape + /// for the `Unknown` / `InternalServerError` arms. + #[test] + fn upstream_unexpected_does_not_leak_variant_into_message() { + let err = upstream_unexpected("attester duties", GetAttesterDutiesResponse::Unknown); + assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); + assert!(!err.message.contains("Unknown")); + assert!(err.source.as_ref().unwrap().to_string().contains("Unknown")); + } + + // ==================================================================== + // Plumbing tests — Subscribe / Register* / verify_partial_sig + // ==================================================================== + + use std::sync::Mutex; + + use pluto_crypto::{blst_impl::BlstImpl, tbls::Tbls}; + use pluto_eth2util::signing::DomainName; + use pluto_testutil::BeaconMock; + use serde_json::json; + + use crate::{ + signeddata::{SignedRandao, SyncContribution, VersionedAggregatedAttestation}, + types::PubKey, + }; + + fn dv_pubkey(byte: u8) -> BLSPubKey { + [byte; 48] + } + + fn core_pubkey(byte: u8) -> PubKey { + PubKey::new([byte; 48]) + } + + /// Build a component with one DV pubkey/share pair and a deterministic + /// pub_share_by_pubkey map. + fn make_plumbed_component(map: HashMap) -> Component { + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-plumbing-tests", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = + Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); + Component::new(eth2_cl, dutydb, 1, map, false) + } + + /// `Subscribe` invokes every registered subscriber, each receiving its + /// own clone of the set. Mutating one clone does not affect the others. + /// Mirrors Go's `Subscribe` clone-before-fanout behaviour. + #[tokio::test] + async fn subscribe_fanouts_clones_to_every_subscriber() { + let mut component = make_plumbed_component(HashMap::new()); + + let received: Arc>> = Arc::new(Mutex::new(Vec::new())); + + // Two validator entries in the input set. + let key_a = core_pubkey(0x11); + let key_b = core_pubkey(0x22); + + // First subscriber: records the set size, then mutates its own copy + // by removing one entry. The mutation must NOT leak into the second + // subscriber's copy. + { + let received = Arc::clone(&received); + component.subscribe(move |_duty, mut set| { + let received = Arc::clone(&received); + async move { + received.lock().unwrap().push(set.inner().len()); + set.remove(&key_a); + Ok(()) + } + }); + } + // Second subscriber: also records the set size — must see the + // pristine size (2), not the first subscriber's mutated size (1). + { + let received = Arc::clone(&received); + component.subscribe(move |_duty, set| { + let received = Arc::clone(&received); + async move { + received.lock().unwrap().push(set.inner().len()); + Ok(()) + } + }); + } + + // Build a set with two entries. Use SignedRandao — the simplest + // ParSignedData wrapper that doesn't require populating spec fields. + let mut set = ParSignedDataSet::new(); + set.insert(key_a, SignedRandao::new_partial(0, [0; 96], 1)); + set.insert(key_b, SignedRandao::new_partial(0, [0; 96], 1)); + let duty = Duty::new(SlotNumber::new(1), DutyType::Attester); + + // Fanout: every sub must observe its own clone. + for sub in component.subs.iter() { + sub(&duty, set.clone()).await.unwrap(); + } + + let observed = received.lock().unwrap().clone(); + assert_eq!( + observed, + vec![2, 2], + "both subscribers see the pristine (uncloned) set size" + ); + } + + /// `register_await_proposal` overwrites a prior registration — only the + /// most recently registered closure is invoked. Mirrors Go's + /// single-function-input semantics. + #[tokio::test] + async fn register_await_proposal_overwrites_prior_registration() { + let mut component = make_plumbed_component(HashMap::new()); + + let calls_a: Arc> = Arc::new(Mutex::new(0)); + let calls_b: Arc> = Arc::new(Mutex::new(0)); + + { + let calls_a = Arc::clone(&calls_a); + component.register_await_proposal(move |_slot| { + let calls_a = Arc::clone(&calls_a); + async move { + *calls_a.lock().unwrap() += 1; + Err("first registration".into()) + } + }); + } + { + let calls_b = Arc::clone(&calls_b); + component.register_await_proposal(move |_slot| { + let calls_b = Arc::clone(&calls_b); + async move { + *calls_b.lock().unwrap() += 1; + Err("second registration".into()) + } + }); + } + + // The component holds the second registration only. + let fut = (component.await_proposal_fn.as_ref().unwrap())(42); + let _ = fut.await; + + assert_eq!(*calls_a.lock().unwrap(), 0); + assert_eq!(*calls_b.lock().unwrap(), 1); + } + + /// `register_await_agg_attestation` / `register_await_sync_contribution` / + /// `register_await_agg_sig_db` / `register_get_duty_definition` / + /// `register_pub_key_by_attestation` all follow the same overwrite-on- + /// re-register semantics. Spot-check the remaining five hooks store the + /// most-recent closure. + #[tokio::test] + async fn other_register_hooks_store_most_recent_closure() { + let mut component = make_plumbed_component(HashMap::new()); + + component.register_await_agg_attestation(|_slot, _root| async { + Err::("a1".into()) + }); + component.register_await_agg_attestation(|_slot, _root| async { + Err::("a2".into()) + }); + let err = (component.await_agg_attestation_fn.as_ref().unwrap())(0, [0; 32]) + .await + .unwrap_err(); + assert_eq!(err.to_string(), "a2"); + + component.register_await_sync_contribution(|_, _, _| async { + Err::("s1".into()) + }); + component.register_await_sync_contribution(|_, _, _| async { + Err::("s2".into()) + }); + let err = (component.await_sync_contribution_fn.as_ref().unwrap())(0, 0, [0; 32]) + .await + .unwrap_err(); + assert_eq!(err.to_string(), "s2"); + + component.register_await_agg_sig_db(|_duty, _pk| async { + Err::, _>("d1".into()) + }); + component.register_await_agg_sig_db(|_duty, _pk| async { + Err::, _>("d2".into()) + }); + let err = (component.await_agg_sig_db_fn.as_ref().unwrap())( + Duty::new(SlotNumber::new(0), DutyType::Attester), + core_pubkey(0), + ) + .await + .unwrap_err(); + assert_eq!(err.to_string(), "d2"); + + component.register_get_duty_definition(|_duty| async { + Err::, _>("def1".into()) + }); + component.register_get_duty_definition(|_duty| async { + Err::, _>("def2".into()) + }); + let err = (component.duty_def_fn.as_ref().unwrap())(Duty::new( + SlotNumber::new(0), + DutyType::Attester, + )) + .await + .unwrap_err(); + assert_eq!(err.to_string(), "def2"); + + component + .register_pub_key_by_attestation(|_, _, _| async { Err::("p1".into()) }); + component + .register_pub_key_by_attestation(|_, _, _| async { Err::("p2".into()) }); + let err = (component.pub_key_by_att_fn.as_ref().unwrap())(0, 0, 0) + .await + .unwrap_err(); + assert_eq!(err.to_string(), "p2"); + } + + /// Sanity-check: a never-registered hook is `None` so callers can + /// distinguish "not wired up" from "errored". + #[tokio::test] + async fn unregistered_hooks_default_to_none() { + let component = make_plumbed_component(HashMap::new()); + assert!(component.await_proposal_fn.is_none()); + assert!(component.await_agg_attestation_fn.is_none()); + assert!(component.await_sync_contribution_fn.is_none()); + assert!(component.await_agg_sig_db_fn.is_none()); + assert!(component.duty_def_fn.is_none()); + assert!(component.pub_key_by_att_fn.is_none()); + assert!(component.active_validators_fn.is_none()); + assert!(component.subs.is_empty()); + } + + /// Mirrors signing-fixture spec from `pluto_eth2util::signing` tests so + /// `verify_partial_sig` can resolve a real beacon-attester domain. + fn signing_spec_fixture() -> serde_json::Value { + json!({ + "DOMAIN_BEACON_PROPOSER": "0x00000000", + "DOMAIN_BEACON_ATTESTER": "0x01000000", + "DOMAIN_RANDAO": "0x02000000", + "DOMAIN_VOLUNTARY_EXIT": "0x04000000", + "DOMAIN_APPLICATION_BUILDER": "0x00000001", + "ALTAIR_FORK_VERSION": "0x01020304", + "ALTAIR_FORK_EPOCH": "10", + "BELLATRIX_FORK_VERSION": "0x02030405", + "BELLATRIX_FORK_EPOCH": "20", + "CAPELLA_FORK_VERSION": "0x03040506", + "CAPELLA_FORK_EPOCH": "30", + "DENEB_FORK_VERSION": "0x04050607", + "DENEB_FORK_EPOCH": "40", + "ELECTRA_FORK_VERSION": "0x05060708", + "ELECTRA_FORK_EPOCH": "50", + "FULU_FORK_VERSION": "0x06070809", + "FULU_FORK_EPOCH": "60" + }) + } + + async fn mock_beacon_for_signing() -> BeaconMock { + BeaconMock::builder() + .spec(signing_spec_fixture()) + .genesis_time(DateTime::from_timestamp(0, 0).unwrap()) + .genesis_validators_root([0; 32]) + .build() + .await + .unwrap() + } + + /// Helper: build a verify_partial_sig-ready component pinned to a real + /// beacon-mock client and a known DV-root → public-share map. + async fn make_verify_component(map: HashMap) -> (Component, BeaconMock) { + let mock = mock_beacon_for_signing().await; + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-verify-tests", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); + let component = Component::new(eth2_cl, dutydb, 1, map, false); + (component, mock) + } + + /// `verify_partial_sig` accepts a correctly signed share and rejects an + /// invalid one — same domain/epoch/message-root, but a tampered + /// signature. + #[tokio::test] + async fn verify_partial_sig_accepts_valid_and_rejects_invalid() { + // Generate a BLS keypair to act as this node's public share. + let secret = BlstImpl + .generate_insecure_secret(rand::rngs::OsRng) + .unwrap(); + let pubshare = BlstImpl.secret_to_public_key(&secret).unwrap(); + + let dv_root = dv_pubkey(0xAA); + let map = HashMap::from([(dv_root, pubshare)]); + + let (component, mock) = make_verify_component(map).await; + + let domain = DomainName::BeaconAttester; + let epoch: Epoch = 0; + let message_root: Root = [0x42; 32]; + + // Compute the signing root the same way `signing::verify` does, then + // sign it with the share's secret. + let signing_root = + pluto_eth2util::signing::get_data_root(mock.client(), domain, epoch, message_root) + .await + .unwrap(); + let good_signature = BlstImpl.sign(&secret, &signing_root).unwrap(); + + component + .verify_partial_sig(&dv_root, domain, epoch, message_root, &good_signature) + .await + .expect("valid signature accepted"); + + // Tamper one byte of the signature. + let mut bad_signature = good_signature; + bad_signature[0] ^= 0xFF; + let err = component + .verify_partial_sig(&dv_root, domain, epoch, message_root, &bad_signature) + .await + .unwrap_err(); + assert!( + matches!(err, VerifyPartialSigError::Signing(_)), + "expected Signing error, got {err:?}" + ); + } + + /// `verify_partial_sig` rejects when this node has no public share + /// registered for the provided DV root pubkey. Mirrors Go's + /// `getVerifyShareFunc -> errors.New("unknown public key")` branch. + #[tokio::test] + async fn verify_partial_sig_rejects_unknown_pubkey() { + let (component, _mock) = make_verify_component(HashMap::new()).await; + let err = component + .verify_partial_sig( + &dv_pubkey(0xBB), + DomainName::BeaconAttester, + 0, + [0; 32], + &[0; 96], + ) + .await + .unwrap_err(); + assert!(matches!(err, VerifyPartialSigError::UnknownPubKey)); + } + + /// `verify_partial_sig` short-circuits when `insecure_test` is set — + /// mirrors the Go early-return at `validatorapi.go:1353`. This must + /// succeed even with a zero pubshare lookup and zero signature, so we + /// know no BLS verify ran. + #[tokio::test] + async fn verify_partial_sig_skipped_in_insecure_test_mode() { + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-insecure-tests", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = + Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); + let component = Component::new_insecure(eth2_cl, dutydb, 1); + + component + .verify_partial_sig( + &dv_pubkey(0xCC), + DomainName::BeaconAttester, + 0, + [0; 32], + &[0; 96], + ) + .await + .expect("insecure_test mode skips verification"); + } + + // ==================================================================== + // submit_voluntary_exit / submit_validator_registrations + // ==================================================================== + + use pluto_eth2api::{ + v1::{SignedValidatorRegistration as V1SignedRegistration, ValidatorRegistration}, + versioned::{BuilderVersion, VersionedSignedValidatorRegistration as VersionedRegPayload}, + }; + + /// Builds a [`Component`] in insecure-test mode but with a real + /// `BeaconMock` upstream so `fetch_slots_config` / `fetch_genesis_time` + /// resolve. Useful for exercising the submit handlers without the BLS + /// verification step. + async fn make_submit_component_insecure( + builder_enabled: bool, + pub_share_by_pubkey: HashMap, + ) -> (Component, BeaconMock) { + let mock = submit_mock().await; + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-submit-tests", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); + let mut component = + Component::new(eth2_cl, dutydb, 1, pub_share_by_pubkey, builder_enabled); + component.insecure_test = true; + (component, mock) + } + + /// Default beacon-mock spec used by submit tests — `signing_spec_fixture` + /// plus the `SECONDS_PER_SLOT` / `SLOTS_PER_EPOCH` fields needed by + /// `fetch_slots_config`. + fn submit_spec_fixture() -> serde_json::Value { + let mut spec = signing_spec_fixture(); + let obj = spec.as_object_mut().unwrap(); + obj.insert("SECONDS_PER_SLOT".to_owned(), json!("12")); + obj.insert("SLOTS_PER_EPOCH".to_owned(), json!("32")); + spec + } + + async fn submit_mock() -> BeaconMock { + BeaconMock::builder() + .spec(submit_spec_fixture()) + .genesis_time(DateTime::from_timestamp(0, 0).unwrap()) + .genesis_validators_root([0; 32]) + .build() + .await + .unwrap() + } + + fn make_signed_exit(epoch: Epoch, validator_index: u64, sig: [u8; 96]) -> SignedVoluntaryExit { + SignedVoluntaryExit(pluto_eth2api::spec::phase0::SignedVoluntaryExit { + message: pluto_eth2api::spec::phase0::VoluntaryExit { + epoch, + validator_index, + }, + signature: sig, + }) + } + + fn make_signed_registration( + pubkey: BLSPubKey, + timestamp: u64, + sig: [u8; 96], + ) -> SignedValidatorRegistration { + SignedValidatorRegistration(VersionedRegPayload { + version: BuilderVersion::V1, + v1: Some(V1SignedRegistration { + message: ValidatorRegistration { + fee_recipient: [0x11; 20], + gas_limit: 30_000_000, + timestamp, + pubkey, + }, + signature: sig, + }), + }) + } + + /// Captures every `(duty, set)` tuple a subscriber receives. Mirrors the + /// pattern used by the `subscribe_fanouts_clones_to_every_subscriber` + /// test above. + type CapturedFanouts = Arc>>; + + fn install_capture(component: &mut Component) -> CapturedFanouts { + let captured: CapturedFanouts = Arc::new(Mutex::new(Vec::new())); + let captured_clone = Arc::clone(&captured); + component.subscribe(move |duty, set| { + let captured_clone = Arc::clone(&captured_clone); + async move { + captured_clone.lock().unwrap().push((duty, set)); + Ok(()) + } + }); + captured + } + + /// `submit_voluntary_exit` resolves the validator-index through the + /// registered `active_validators_fn`, builds a voluntary-exit duty, and + /// fans out to every subscriber. Insecure-test mode bypasses BLS + /// verification so the test can use a placeholder signature. + #[tokio::test] + async fn submit_voluntary_exit_resolves_validator_and_fanouts() { + const EPOCH: u64 = 7; + const VAL_IDX: u64 = 42; + const SLOTS_PER_EPOCH: u64 = 32; + + let dv_root = dv_pubkey(0xAA); + let share = dv_pubkey(0xBB); + let map = HashMap::from([(dv_root, share)]); + + let (mut component, _mock) = make_submit_component_insecure(false, map).await; + + component.register_active_validators(move || { + let active = HashMap::from([(VAL_IDX, dv_root)]); + async move { Ok(active) } + }); + + let captured = install_capture(&mut component); + + let exit = make_signed_exit(EPOCH, VAL_IDX, [0x99; 96]); + component.submit_voluntary_exit(exit).await.unwrap(); + + let fanouts = captured.lock().unwrap(); + assert_eq!(fanouts.len(), 1, "exactly one subscriber invocation"); + let (duty, set) = &fanouts[0]; + + // Duty: voluntary-exit duty keyed at slots_per_epoch * exit_epoch. + assert_eq!(duty.duty_type, DutyType::Exit); + assert_eq!(duty.slot.inner(), SLOTS_PER_EPOCH.saturating_mul(EPOCH)); + + // ParSignedDataSet: indexed by the core PubKey of the DV root. + assert_eq!(set.inner().len(), 1); + let par = set.inner().get(&core_pubkey_from(dv_root)).unwrap(); + assert_eq!(par.share_idx, 1); + } + + /// `submit_voluntary_exit` rejects with a 400 when the validator index is + /// not present in the active set (Go: `errors.New("validator not + /// found")`). + #[tokio::test] + async fn submit_voluntary_exit_rejects_unknown_validator() { + let (mut component, _mock) = make_submit_component_insecure(false, HashMap::new()).await; + + component.register_active_validators(|| async { + // Empty active set — no validator at any index. + Ok::, CallbackError>(HashMap::new()) + }); + + let exit = make_signed_exit(0, 9, [0u8; 96]); + let err = component.submit_voluntary_exit(exit).await.unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + assert_eq!(err.message, "validator not found"); + } + + /// `submit_voluntary_exit` returns 500 when no `active_validators_fn` + /// hook has been registered. Mirrors the Go programmer-error path where + /// `c.eth2Cl` is required infrastructure. + #[tokio::test] + async fn submit_voluntary_exit_returns_500_when_hook_unregistered() { + let (component, _mock) = make_submit_component_insecure(false, HashMap::new()).await; + + let exit = make_signed_exit(0, 1, [0u8; 96]); + let err = component.submit_voluntary_exit(exit).await.unwrap_err(); + assert_eq!(err.status_code, StatusCode::INTERNAL_SERVER_ERROR); + } + + /// `submit_voluntary_exit` rejects an exit whose BLS signature does not + /// verify against the registered public share. Uses a real beacon-mock + /// upstream + real BLS so the verification path actually runs. + #[tokio::test] + async fn submit_voluntary_exit_rejects_bad_signature() { + const VAL_IDX: u64 = 5; + const EPOCH: u64 = 3; + + let secret = BlstImpl + .generate_insecure_secret(rand::rngs::OsRng) + .unwrap(); + let pubshare = BlstImpl.secret_to_public_key(&secret).unwrap(); + let dv_root = dv_pubkey(0xCC); + let map = HashMap::from([(dv_root, pubshare)]); + + let mock = submit_mock().await; + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-submit-bad-sig", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); + let mut component = Component::new(eth2_cl, dutydb, 1, map, false); + + component.register_active_validators(move || { + let active = HashMap::from([(VAL_IDX, dv_root)]); + async move { Ok(active) } + }); + + let exit = make_signed_exit(EPOCH, VAL_IDX, [0x42; 96]); + let err = component.submit_voluntary_exit(exit).await.unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + } + + /// `submit_validator_registrations` returns Ok without fanout when + /// builder mode is disabled. Mirrors Go's + /// `validatorapi.go:737-739` swallow-on-disable branch. + #[tokio::test] + async fn submit_validator_registrations_swallows_when_builder_disabled() { + let dv_root = dv_pubkey(0xDD); + let share = dv_pubkey(0xEE); + let map = HashMap::from([(dv_root, share)]); + + let (mut component, _mock) = make_submit_component_insecure(false, map).await; + let captured = install_capture(&mut component); + + let reg = make_signed_registration(dv_root, 1_000_000, [0x00; 96]); + component + .submit_validator_registrations(vec![reg]) + .await + .unwrap(); + + assert!( + captured.lock().unwrap().is_empty(), + "no fanout when builder mode disabled" + ); + } + + /// `submit_validator_registrations` returns Ok with no fanout on an + /// empty input list — even with builder mode enabled. Mirrors Go's + /// `validatorapi.go:732-734` early return. + #[tokio::test] + async fn submit_validator_registrations_no_op_on_empty_input() { + let (mut component, _mock) = make_submit_component_insecure(true, HashMap::new()).await; + let captured = install_capture(&mut component); + + component + .submit_validator_registrations(Vec::new()) + .await + .unwrap(); + + assert!(captured.lock().unwrap().is_empty()); + } + + /// `submit_validator_registrations` silently skips entries whose pubkey + /// is not a DV root key on this node — same as Go's per-pubkey + /// `swallowRegFilter` branch (`validatorapi.go:686-691`). + #[tokio::test] + async fn submit_validator_registrations_swallows_non_dv_pubkey() { + let dv_root = dv_pubkey(0x55); + let share = dv_pubkey(0x66); + let map = HashMap::from([(dv_root, share)]); + + let (mut component, _mock) = make_submit_component_insecure(true, map).await; + let captured = install_capture(&mut component); + + // Registration for a pubkey not registered on this node. + let reg = make_signed_registration(dv_pubkey(0xFF), 1_000_000, [0x00; 96]); + component + .submit_validator_registrations(vec![reg]) + .await + .unwrap(); + + assert!( + captured.lock().unwrap().is_empty(), + "non-DV registration is swallowed without fanout" + ); + } + + /// `submit_validator_registrations` happy path: a DV registration is + /// verified (skipped in insecure-test mode) and fanned out to every + /// subscriber with a `BuilderRegistration` duty. + #[tokio::test] + async fn submit_validator_registrations_happy_path_fanouts() { + let dv_root = dv_pubkey(0x77); + let share = dv_pubkey(0x88); + let map = HashMap::from([(dv_root, share)]); + + let (mut component, _mock) = make_submit_component_insecure(true, map).await; + let captured = install_capture(&mut component); + + // timestamp = genesis + 24s => slot = 2 (with 12s slot duration). + let reg = make_signed_registration(dv_root, 24, [0x00; 96]); + component + .submit_validator_registrations(vec![reg]) + .await + .unwrap(); + + let fanouts = captured.lock().unwrap(); + assert_eq!(fanouts.len(), 1); + let (duty, set) = &fanouts[0]; + assert_eq!(duty.duty_type, DutyType::BuilderRegistration); + assert_eq!(duty.slot.inner(), 2); + + assert_eq!(set.inner().len(), 1); + let par = set.inner().get(&core_pubkey_from(dv_root)).unwrap(); + assert_eq!(par.share_idx, 1); + } + + /// `submit_validator_registrations` rejects an entry whose BLS signature + /// does not verify against the registered public share. Uses a real + /// upstream + real BLS to drive the verification path. + #[tokio::test] + async fn submit_validator_registrations_rejects_bad_signature() { + let secret = BlstImpl + .generate_insecure_secret(rand::rngs::OsRng) + .unwrap(); + let pubshare = BlstImpl.secret_to_public_key(&secret).unwrap(); + let dv_root = dv_pubkey(0xA5); + let map = HashMap::from([(dv_root, pubshare)]); + + let mock = submit_mock().await; + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-submit-reg-bad-sig", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); + let component = Component::new(eth2_cl, dutydb, 1, map, true); + + let reg = make_signed_registration(dv_root, 24, [0x42; 96]); + let err = component + .submit_validator_registrations(vec![reg]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + } + + /// Build a core [`PubKey`] from a 48-byte BLS pubkey (`BLSPubKey`). + fn core_pubkey_from(bls: BLSPubKey) -> PubKey { + PubKey::new(bls) + } +} diff --git a/crates/core/src/validatorapi/error.rs b/crates/core/src/validatorapi/error.rs index 12064c51..e13c440d 100644 --- a/crates/core/src/validatorapi/error.rs +++ b/crates/core/src/validatorapi/error.rs @@ -47,6 +47,18 @@ impl ApiError { self.source = Some(Box::new(source)); self } + + /// Attaches a boxed source error for debug logging. Use this when the + /// upstream error is not `std::error::Error` itself (e.g. `anyhow::Error`, + /// which only implements `AsRef` and converts via `.into()`). + #[must_use] + pub fn with_boxed_source( + mut self, + source: Box, + ) -> Self { + self.source = Some(source); + self + } } impl fmt::Display for ApiError { diff --git a/crates/core/src/validatorapi/handler.rs b/crates/core/src/validatorapi/handler.rs index 9aebe758..b1a3a050 100644 --- a/crates/core/src/validatorapi/handler.rs +++ b/crates/core/src/validatorapi/handler.rs @@ -5,11 +5,12 @@ use async_trait::async_trait; use super::{ error::ApiError, types::{ - AggregateAttestationOpts, AttestationData, AttestationDataOpts, AttesterDutiesOpts, - AttesterDuty, BeaconCommitteeSelection, EthResponse, ProposalOpts, ProposerDutiesOpts, - ProposerDuty, SignedContributionAndProof, SignedValidatorRegistration, SignedVoluntaryExit, - SyncCommitteeContribution, SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, - SyncCommitteeDuty, SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, + AggregateAttestationOpts, AttestationDataOpts, AttestationDataResponse, AttesterDutiesOpts, + AttesterDutiesResponse, BeaconCommitteeSelection, EthResponse, NodeVersionResponse, + ProposalOpts, ProposerDutiesOpts, ProposerDutiesResponse, SignedContributionAndProof, + SignedValidatorRegistration, SignedVoluntaryExit, SyncCommitteeContribution, + SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, + SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, VersionedAttestation, VersionedProposal, VersionedSignedAggregateAndProof, VersionedSignedBlindedProposal, VersionedSignedProposal, }, @@ -27,25 +28,25 @@ pub trait Handler: Send + Sync + 'static { async fn attester_duties( &self, opts: AttesterDutiesOpts, - ) -> Result>, ApiError>; + ) -> Result; /// `GET /eth/v1/validator/duties/proposer/{epoch}`. async fn proposer_duties( &self, opts: ProposerDutiesOpts, - ) -> Result>, ApiError>; + ) -> Result; /// `POST /eth/v1/validator/duties/sync/{epoch}`. async fn sync_committee_duties( &self, opts: SyncCommitteeDutiesOpts, - ) -> Result>, ApiError>; + ) -> Result; /// `GET /eth/v1/validator/attestation_data`. async fn attestation_data( &self, opts: AttestationDataOpts, - ) -> Result, ApiError>; + ) -> Result; /// `POST /eth/v2/beacon/pool/attestations`. async fn submit_attestations( @@ -126,5 +127,5 @@ pub trait Handler: Send + Sync + 'static { ) -> Result<(), ApiError>; /// `GET /eth/v1/node/version`. - async fn node_version(&self) -> Result, ApiError>; + async fn node_version(&self) -> Result; } diff --git a/crates/core/src/validatorapi/mod.rs b/crates/core/src/validatorapi/mod.rs index 399ef53a..8442859c 100644 --- a/crates/core/src/validatorapi/mod.rs +++ b/crates/core/src/validatorapi/mod.rs @@ -3,12 +3,17 @@ //! Serves the subset of beacon-API endpoints related to distributed //! validation and reverse-proxies the rest to the upstream beacon node. +pub mod component; pub mod error; pub mod handler; pub mod metrics; pub mod router; pub mod types; +#[cfg(test)] +pub mod testutils; + +pub use component::Component; pub use error::ApiError; pub use handler::Handler; pub use router::new_router; diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index bbf70f5d..c71031ec 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -3,27 +3,66 @@ //! The endpoint table preserves the order of the upstream definition, //! including which endpoints unconditionally respond `404`. +use std::sync::Arc; + use axum::{ - Router, + Json, Router, + extract::{DefaultBodyLimit, Path, Query, State, rejection::QueryRejection}, + http::StatusCode, response::IntoResponse, - routing::{get, post}, + routing::{MethodRouter, get, post}, +}; +use serde::Deserialize; + +/// Cap on the `POST /eth/v1/validator/duties/{attester,sync}/{epoch}` request +/// bodies. A realistic cluster ships at most a few thousand validator indices; +/// 64 KiB still allows ~10k indices in either numeric or string encoding, +/// well above any plausible workload. +const DUTIES_BODY_LIMIT: usize = 64 * 1024; + +use super::{ + error::ApiError, + handler::Handler, + types::{ + AttestationDataOpts, AttestationDataResponse, AttesterDutiesOpts, AttesterDutiesResponse, + CommitteeIndex, NodeVersionResponse, ProposerDutiesOpts, ProposerDutiesResponse, + SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, ValIndexes, + }, }; -use super::{error::ApiError, handler::Handler}; +/// Query parameters for `GET /eth/v1/validator/attestation_data`. +#[derive(Debug, Clone, Deserialize)] +struct AttestationDataQuery { + slot: u64, + committee_index: CommitteeIndex, +} + +/// Shared router state. Cloned per request via [`Arc`]. +pub(super) struct AppState { + /// Request handler invoked by each route. + pub handler: Arc, + /// Whether builder mode is enabled. Read by `propose_block_v3`. + #[allow(dead_code, reason = "consumed by propose_block_v3 in a later PR")] + pub builder_enabled: bool, +} /// Builds the validator API HTTP router. /// /// Registers the distributed-validator-related endpoints and a fallback /// that reverse-proxies everything else to the upstream beacon node. /// -/// `_handler` will be threaded into Axum router state once request bodies -/// and responses are wired. `_builder_enabled` is consumed only by -/// `propose_block_v3`. -pub fn new_router(_handler: H, _builder_enabled: bool) -> Router { +/// `builder_enabled` is consumed by `propose_block_v3` to maximise the +/// builder boost factor. +pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { + let state = Arc::new(AppState { + handler, + builder_enabled, + }); + Router::new() .route( "/eth/v1/validator/duties/attester/{epoch}", - post(attester_duties), + duties_post(attester_duties), ) .route( "/eth/v1/validator/duties/proposer/{epoch}", @@ -31,7 +70,7 @@ pub fn new_router(_handler: H, _builder_enabled: bool) -> Router { ) .route( "/eth/v1/validator/duties/sync/{epoch}", - post(sync_committee_duties), + duties_post(sync_committee_duties), ) .route("/eth/v1/validator/attestation_data", get(attestation_data)) .route("/eth/v1/beacon/pool/attestations", post(respond_404)) @@ -97,22 +136,87 @@ pub fn new_router(_handler: H, _builder_enabled: bool) -> Router { ) .route("/eth/v1/node/version", get(node_version)) .fallback(proxy_handler) + .with_state(state) } -async fn attester_duties() { - todo!("vapi: attester_duties"); +async fn attester_duties( + State(state): State>, + Path(epoch): Path, + Json(indices): Json, +) -> Result, ApiError> { + let response = state + .handler + .attester_duties(AttesterDutiesOpts { + epoch, + indices: indices.0, + }) + .await?; + + Ok(Json(response)) } -async fn proposer_duties() { - todo!("vapi: proposer_duties"); +async fn proposer_duties( + State(state): State>, + Path(epoch): Path, +) -> Result, ApiError> { + let response = state + .handler + .proposer_duties(ProposerDutiesOpts { epoch }) + .await?; + + Ok(Json(response)) } -async fn sync_committee_duties() { - todo!("vapi: sync_committee_duties"); +async fn sync_committee_duties( + State(state): State>, + Path(epoch): Path, + Json(indices): Json, +) -> Result, ApiError> { + let response = state + .handler + .sync_committee_duties(SyncCommitteeDutiesOpts { + epoch, + indices: indices.0, + }) + .await?; + + Ok(Json(response)) +} + +async fn attestation_data( + State(state): State>, + query: Result, QueryRejection>, +) -> Result, ApiError> { + let Query(query) = query.map_err(query_rejection_to_api_error)?; + let response = state + .handler + .attestation_data(AttestationDataOpts { + slot: query.slot, + committee_index: query.committee_index, + }) + .await?; + + Ok(Json(response)) } -async fn attestation_data() { - todo!("vapi: attestation_data"); +/// Wraps a `POST /eth/v1/validator/duties/*` handler with a body-size cap. +/// The cap is local to these two routes so unrelated POST handlers (e.g. +/// `submit_attestations`) keep axum's default 2 MiB. +fn duties_post(handler: H) -> MethodRouter +where + H: axum::handler::Handler, + T: 'static, + S: Clone + Send + Sync + 'static, +{ + post(handler).route_layer(DefaultBodyLimit::max(DUTIES_BODY_LIMIT)) +} + +/// Renders an axum query-extractor rejection as Pluto's standard +/// [`ApiError`] body shape, so all 4xx responses from this router share the +/// same `{ "code", "message" }` schema. +fn query_rejection_to_api_error(rejection: QueryRejection) -> ApiError { + ApiError::new(StatusCode::BAD_REQUEST, "invalid query parameters") + .with_source(std::io::Error::other(rejection.body_text())) } async fn submit_attestations() { @@ -179,8 +283,12 @@ async fn sync_committee_selections() { todo!("vapi: sync_committee_selections"); } -async fn node_version() { - todo!("vapi: node_version"); +async fn node_version( + State(state): State>, +) -> Result, ApiError> { + let response = state.handler.node_version().await?; + + Ok(Json(response)) } async fn respond_404() -> impl IntoResponse { @@ -190,3 +298,285 @@ async fn respond_404() -> impl IntoResponse { async fn proxy_handler() { todo!("vapi: proxy_handler"); } + +#[cfg(test)] +mod tests { + use super::*; + use pluto_eth2api::spec::phase0; + + use crate::validatorapi::{ + testutils::TestHandler, + types::{ + AttestationDataResponse, AttesterDutiesResponse, AttesterDuty, ProposerDutiesResponse, + ProposerDuty, SyncCommitteeDutiesResponse, SyncCommitteeDuty, ValIndexes, + }, + }; + + #[tokio::test] + async fn node_version_wraps_handler_value() { + let state = Arc::new(AppState { + handler: Arc::new(TestHandler::with_version("pluto/test/v1.0")), + builder_enabled: false, + }); + + let Json(body) = node_version(State(state)).await.unwrap(); + + assert_eq!(body.data.version, "pluto/test/v1.0"); + } + + #[tokio::test] + async fn attester_duties_wraps_handler_value() { + let duty = AttesterDuty { + pubkey: "0xaabbccddeeff".to_owned(), + slot: "12".to_owned(), + committee_index: "3".to_owned(), + committee_length: "16".to_owned(), + committees_at_slot: "4".to_owned(), + validator_committee_index: "2".to_owned(), + validator_index: "7".to_owned(), + }; + let handler = TestHandler::default().with_attester_duties(AttesterDutiesResponse { + data: vec![duty], + dependent_root: "0xab".to_owned(), + execution_optimistic: false, + }); + let state = Arc::new(AppState { + handler: Arc::new(handler), + builder_enabled: false, + }); + + let Json(body) = attester_duties( + State(state), + Path(42u64), + Json(ValIndexes(vec!["7".to_owned()])), + ) + .await + .unwrap(); + + let json = serde_json::to_value(&body).unwrap(); + assert_eq!(json["dependent_root"], "0xab"); + assert_eq!(json["execution_optimistic"], false); + assert_eq!(json["data"][0]["slot"], "12"); + assert_eq!(json["data"][0]["committee_index"], "3"); + assert_eq!(json["data"][0]["validator_index"], "7"); + } + + #[tokio::test] + async fn sync_committee_duties_wraps_handler_value() { + let duty = SyncCommitteeDuty { + pubkey: "0x112233".to_owned(), + validator_index: "9".to_owned(), + validator_sync_committee_indices: vec!["0".to_owned(), "5".to_owned()], + }; + let handler = + TestHandler::default().with_sync_committee_duties(SyncCommitteeDutiesResponse { + data: vec![duty], + execution_optimistic: true, + }); + let state = Arc::new(AppState { + handler: Arc::new(handler), + builder_enabled: false, + }); + + let Json(body) = sync_committee_duties( + State(state), + Path(7u64), + Json(ValIndexes(vec!["9".to_owned()])), + ) + .await + .unwrap(); + + let json = serde_json::to_value(&body).unwrap(); + assert_eq!(json["execution_optimistic"], true); + assert_eq!(json["data"][0]["validator_index"], "9"); + assert_eq!(json["data"][0]["validator_sync_committee_indices"][1], "5"); + } + + #[tokio::test] + async fn attestation_data_wraps_handler_value() { + let data = phase0::AttestationData { + slot: 99, + index: 3, + beacon_block_root: [0xaa; 32], + source: phase0::Checkpoint { + epoch: 7, + root: [0xbb; 32], + }, + target: phase0::Checkpoint { + epoch: 8, + root: [0xcc; 32], + }, + }; + let handler = + TestHandler::default().with_attestation_data(AttestationDataResponse { data }); + let state = Arc::new(AppState { + handler: Arc::new(handler), + builder_enabled: false, + }); + + let Json(body) = attestation_data( + State(state), + Ok(Query(AttestationDataQuery { + slot: 99, + committee_index: 3, + })), + ) + .await + .unwrap(); + + let json = serde_json::to_value(&body).unwrap(); + assert_eq!(json["data"]["slot"], "99"); + assert_eq!(json["data"]["index"], "3"); + assert_eq!(json["data"]["source"]["epoch"], "7"); + } + + #[test] + fn val_indexes_accepts_numbers_and_strings() { + let nums: ValIndexes = serde_json::from_str("[1, 2, 3]").unwrap(); + assert_eq!(nums.0, vec!["1", "2", "3"]); + + let strs: ValIndexes = serde_json::from_str(r#"["4", "5"]"#).unwrap(); + assert_eq!(strs.0, vec!["4", "5"]); + + let bad = serde_json::from_str::(r#"["not-a-number"]"#); + assert!(bad.is_err()); + } + + #[tokio::test] + async fn proposer_duties_wraps_handler_value() { + let duty = ProposerDuty { + pubkey: "0xaabbccddeeff".to_owned(), + slot: "1234".to_owned(), + validator_index: "7".to_owned(), + }; + let handler = TestHandler::default().with_proposer_duties(ProposerDutiesResponse { + data: vec![duty], + dependent_root: "0xcd".to_owned(), + execution_optimistic: true, + }); + let state = Arc::new(AppState { + handler: Arc::new(handler), + builder_enabled: false, + }); + + let Json(body) = proposer_duties(State(state), Path(99u64)).await.unwrap(); + + let json = serde_json::to_value(&body).unwrap(); + assert_eq!(json["dependent_root"], "0xcd"); + assert_eq!(json["execution_optimistic"], true); + assert_eq!(json["data"][0]["slot"], "1234"); + assert_eq!(json["data"][0]["validator_index"], "7"); + assert_eq!(json["data"][0]["pubkey"], "0xaabbccddeeff"); + } + + /// Verifies the manual `Query` rejection path emits the same + /// `{ code, message }` envelope as the rest of the router, instead of + /// axum's default plain-text 400. + #[tokio::test] + async fn attestation_data_returns_api_error_shape_on_bad_query() { + use axum::{ + body::{Body, to_bytes}, + http::Request, + }; + use tower::ServiceExt; + + let handler = TestHandler::default().with_attestation_data(AttestationDataResponse { + data: phase0::AttestationData { + slot: 0, + index: 0, + beacon_block_root: [0; 32], + source: phase0::Checkpoint::default(), + target: phase0::Checkpoint::default(), + }, + }); + let app = new_router(Arc::new(handler), false); + + // Missing `committee_index`. + let req = Request::builder() + .uri("/eth/v1/validator/attestation_data?slot=10") + .body(Body::empty()) + .unwrap(); + let resp = app.clone().oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let body = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["code"], 400); + assert!(json["message"].is_string()); + + // Non-numeric `slot`. + let req = Request::builder() + .uri("/eth/v1/validator/attestation_data?slot=foo&committee_index=1") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let body = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["code"], 400); + } + + /// Verifies the body-limit layer on `POST /eth/v1/validator/duties/*` + /// rejects oversized bodies — defense against the `Vec` parse + /// amplification on the duties endpoints. + #[tokio::test] + async fn attester_duties_rejects_oversized_body() { + use axum::{ + body::Body, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let handler = TestHandler::default(); + let app = new_router(Arc::new(handler), false); + + // 128 KiB of zeros — well past the 64 KiB cap, valid JSON or not. + let big = vec![b'0'; 128 * 1024]; + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/duties/attester/42") + .header("content-type", "application/json") + .header("content-length", big.len()) + .body(Body::from(big)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::PAYLOAD_TOO_LARGE); + } + + /// `[]` is a valid request body — the upstream returns an empty duty + /// list — and `ValIndexes` should accept it. + #[test] + fn val_indexes_accepts_empty_array() { + let v: ValIndexes = serde_json::from_str("[]").unwrap(); + assert!(v.0.is_empty()); + } + + /// Mixed numeric + string elements are accepted; each element is + /// validated independently. The previous untagged-enum implementation + /// rejected this entirely. + #[test] + fn val_indexes_accepts_mixed_elements() { + let v: ValIndexes = serde_json::from_str(r#"[1, "2", 3, "4"]"#).unwrap(); + assert_eq!(v.0, vec!["1", "2", "3", "4"]); + } + + /// Caps the request to `VAL_INDEXES_MAX_LEN` elements. + #[test] + fn val_indexes_rejects_oversized_array() { + use crate::validatorapi::types::VAL_INDEXES_MAX_LEN; + + let too_many = (0..=VAL_INDEXES_MAX_LEN) + .map(|n| n.to_string()) + .collect::>() + .join(","); + let json = format!("[{too_many}]"); + let err = serde_json::from_str::(&json).unwrap_err(); + assert!(err.to_string().contains("too many validator indices")); + } + + /// Negative integers are rejected (validator indices are u64). + #[test] + fn val_indexes_rejects_negative_numbers() { + let bad = serde_json::from_str::("[-1]"); + assert!(bad.is_err()); + } +} diff --git a/crates/core/src/validatorapi/testutils.rs b/crates/core/src/validatorapi/testutils.rs new file mode 100644 index 00000000..45980fe7 --- /dev/null +++ b/crates/core/src/validatorapi/testutils.rs @@ -0,0 +1,214 @@ +//! Test helpers for the validator API router. +//! +//! [`TestHandler`] implements [`Handler`] with `unimplemented!()` stubs for +//! every method. As each router endpoint is ported, the relevant method is +//! overridden here so the route's unit test can drive it. + +use async_trait::async_trait; + +use super::{ + error::ApiError, + handler::Handler, + types::{ + AggregateAttestationOpts, AttestationDataOpts, AttestationDataResponse, AttesterDutiesOpts, + AttesterDutiesResponse, BeaconCommitteeSelection, EthResponse, NodeVersionData, + NodeVersionResponse, ProposalOpts, ProposerDutiesOpts, ProposerDutiesResponse, + SignedContributionAndProof, SignedValidatorRegistration, SignedVoluntaryExit, + SyncCommitteeContribution, SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, + SyncCommitteeDutiesResponse, SyncCommitteeMessage, SyncCommitteeSelection, Validator, + ValidatorsOpts, VersionedAttestation, VersionedProposal, VersionedSignedAggregateAndProof, + VersionedSignedBlindedProposal, VersionedSignedProposal, + }, +}; + +/// Mock [`Handler`] used by router unit tests. +#[derive(Debug, Default, Clone)] +pub struct TestHandler { + /// Value returned by [`Handler::node_version`]. + pub version: String, + /// Value returned by [`Handler::proposer_duties`]. + pub proposer_duties_response: Option, + /// Value returned by [`Handler::attester_duties`]. + pub attester_duties_response: Option, + /// Value returned by [`Handler::sync_committee_duties`]. + pub sync_committee_duties_response: Option, + /// Value returned by [`Handler::attestation_data`]. + pub attestation_data_response: Option, +} + +impl TestHandler { + /// Builds a [`TestHandler`] with the given node version string. + pub fn with_version(version: impl Into) -> Self { + Self { + version: version.into(), + ..Self::default() + } + } + + /// Sets the response returned by [`Handler::proposer_duties`]. + pub fn with_proposer_duties(mut self, response: ProposerDutiesResponse) -> Self { + self.proposer_duties_response = Some(response); + self + } + + /// Sets the response returned by [`Handler::attester_duties`]. + pub fn with_attester_duties(mut self, response: AttesterDutiesResponse) -> Self { + self.attester_duties_response = Some(response); + self + } + + /// Sets the response returned by [`Handler::sync_committee_duties`]. + pub fn with_sync_committee_duties(mut self, response: SyncCommitteeDutiesResponse) -> Self { + self.sync_committee_duties_response = Some(response); + self + } + + /// Sets the response returned by [`Handler::attestation_data`]. + pub fn with_attestation_data(mut self, response: AttestationDataResponse) -> Self { + self.attestation_data_response = Some(response); + self + } +} + +#[async_trait] +impl Handler for TestHandler { + async fn node_version(&self) -> Result { + Ok(NodeVersionResponse { + data: NodeVersionData { + version: self.version.clone(), + }, + }) + } + + async fn attester_duties( + &self, + _opts: AttesterDutiesOpts, + ) -> Result { + Ok(self + .attester_duties_response + .clone() + .expect("attester_duties not stubbed in TestHandler")) + } + + async fn proposer_duties( + &self, + _opts: ProposerDutiesOpts, + ) -> Result { + Ok(self + .proposer_duties_response + .clone() + .expect("proposer_duties not stubbed in TestHandler")) + } + + async fn sync_committee_duties( + &self, + _opts: SyncCommitteeDutiesOpts, + ) -> Result { + Ok(self + .sync_committee_duties_response + .clone() + .expect("sync_committee_duties not stubbed in TestHandler")) + } + + async fn attestation_data( + &self, + _opts: AttestationDataOpts, + ) -> Result { + Ok(self + .attestation_data_response + .clone() + .expect("attestation_data not stubbed in TestHandler")) + } + + async fn submit_attestations( + &self, + _attestations: Vec, + ) -> Result<(), ApiError> { + unimplemented!("submit_attestations not stubbed in TestHandler") + } + + async fn proposal( + &self, + _opts: ProposalOpts, + ) -> Result, ApiError> { + unimplemented!("proposal not stubbed in TestHandler") + } + + async fn submit_proposal(&self, _proposal: VersionedSignedProposal) -> Result<(), ApiError> { + unimplemented!("submit_proposal not stubbed in TestHandler") + } + + async fn submit_blinded_proposal( + &self, + _proposal: VersionedSignedBlindedProposal, + ) -> Result<(), ApiError> { + unimplemented!("submit_blinded_proposal not stubbed in TestHandler") + } + + async fn aggregate_attestation( + &self, + _opts: AggregateAttestationOpts, + ) -> Result, ApiError> { + unimplemented!("aggregate_attestation not stubbed in TestHandler") + } + + async fn submit_aggregate_attestations( + &self, + _aggregates: Vec, + ) -> Result<(), ApiError> { + unimplemented!("submit_aggregate_attestations not stubbed in TestHandler") + } + + async fn beacon_committee_selections( + &self, + _selections: Vec, + ) -> Result>, ApiError> { + unimplemented!("beacon_committee_selections not stubbed in TestHandler") + } + + async fn sync_committee_selections( + &self, + _selections: Vec, + ) -> Result>, ApiError> { + unimplemented!("sync_committee_selections not stubbed in TestHandler") + } + + async fn validators( + &self, + _opts: ValidatorsOpts, + ) -> Result>, ApiError> { + unimplemented!("validators not stubbed in TestHandler") + } + + async fn submit_validator_registrations( + &self, + _registrations: Vec, + ) -> Result<(), ApiError> { + unimplemented!("submit_validator_registrations not stubbed in TestHandler") + } + + async fn submit_voluntary_exit(&self, _exit: SignedVoluntaryExit) -> Result<(), ApiError> { + unimplemented!("submit_voluntary_exit not stubbed in TestHandler") + } + + async fn sync_committee_contribution( + &self, + _opts: SyncCommitteeContributionOpts, + ) -> Result, ApiError> { + unimplemented!("sync_committee_contribution not stubbed in TestHandler") + } + + async fn submit_sync_committee_contributions( + &self, + _contributions: Vec, + ) -> Result<(), ApiError> { + unimplemented!("submit_sync_committee_contributions not stubbed in TestHandler") + } + + async fn submit_sync_committee_messages( + &self, + _messages: Vec, + ) -> Result<(), ApiError> { + unimplemented!("submit_sync_committee_messages not stubbed in TestHandler") + } +} diff --git a/crates/core/src/validatorapi/types.rs b/crates/core/src/validatorapi/types.rs index 9fef4de0..ca057c90 100644 --- a/crates/core/src/validatorapi/types.rs +++ b/crates/core/src/validatorapi/types.rs @@ -4,8 +4,29 @@ //! Most data payloads are empty placeholders for now and will be swapped //! for the proper consensus-spec types in a later phase. +use std::fmt; + +use serde::{ + Deserialize, Deserializer, Serialize, + de::{self, SeqAccess, Visitor}, +}; + pub use pluto_crypto::types::{PublicKey as BlsPubKey, Signature as BlsSignature}; -pub use pluto_eth2api::spec::phase0::{Epoch, Root, Slot, ValidatorIndex}; +pub use pluto_eth2api::{ + GetAttesterDutiesResponseResponse as AttesterDutiesResponse, + GetAttesterDutiesResponseResponseDatum as AttesterDuty, + GetProposerDutiesResponseResponse as ProposerDutiesResponse, + GetProposerDutiesResponseResponseDatum as ProposerDuty, + GetSyncCommitteeDutiesResponseResponse as SyncCommitteeDutiesResponse, + GetSyncCommitteeDutiesResponseResponseDatum as SyncCommitteeDuty, + GetVersionResponseResponse as NodeVersionResponse, + GetVersionResponseResponseData as NodeVersionData, + spec::phase0::{self, Epoch, Root, Slot, ValidatorIndex}, + versioned, +}; + +/// Attestation data alias for the consensus-spec phase0 type. +pub type AttestationData = phase0::AttestationData; /// Index of a beacon committee within a slot. pub type CommitteeIndex = u64; @@ -29,8 +50,9 @@ pub struct EthResponse { pub struct AttesterDutiesOpts { /// Epoch to fetch duties for. pub epoch: Epoch, - /// Validator indices to fetch duties for. - pub indices: Vec, + /// Validator indices to fetch duties for. Carried as strings since the + /// upstream auto-generated client takes string-typed indices. + pub indices: Vec, } /// Options for @@ -47,8 +69,9 @@ pub struct ProposerDutiesOpts { pub struct SyncCommitteeDutiesOpts { /// Epoch to fetch duties for. pub epoch: Epoch, - /// Validator indices to fetch duties for. - pub indices: Vec, + /// Validator indices to fetch duties for. Carried as strings since the + /// upstream auto-generated client takes string-typed indices. + pub indices: Vec, } /// Options for @@ -110,21 +133,12 @@ pub struct SyncCommitteeContributionOpts { pub beacon_block_root: Root, } -/// Attester duty payload. Placeholder. -#[derive(Debug, Clone)] -pub struct AttesterDuty {} - -/// Proposer duty payload. Placeholder. -#[derive(Debug, Clone)] -pub struct ProposerDuty {} - -/// Sync-committee duty payload. Placeholder. -#[derive(Debug, Clone)] -pub struct SyncCommitteeDuty {} - -/// Attestation data payload. Placeholder. -#[derive(Debug, Clone)] -pub struct AttestationData {} +/// Response envelope for the `attestation_data` endpoint. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AttestationDataResponse { + /// Unsigned attestation data produced by the consensus pipeline. + pub data: AttestationData, +} /// Validator payload. Placeholder. #[derive(Debug, Clone)] @@ -150,13 +164,31 @@ pub struct VersionedAttestation {} #[derive(Debug, Clone)] pub struct VersionedSignedAggregateAndProof {} -/// Signed validator registration payload. Placeholder. -#[derive(Debug, Clone)] -pub struct SignedValidatorRegistration {} +/// Signed validator (builder) registration payload. +/// +/// Wraps the versioned eth2api registration so the +/// [`Handler::submit_validator_registrations`](super::handler::Handler::submit_validator_registrations) +/// implementation has access to the same data the Go +/// `*eth2api.VersionedSignedValidatorRegistration` carries. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(transparent)] +pub struct SignedValidatorRegistration( + /// Wrapped versioned registration. + pub versioned::VersionedSignedValidatorRegistration, +); -/// Signed voluntary exit payload. Placeholder. -#[derive(Debug, Clone)] -pub struct SignedVoluntaryExit {} +/// Signed voluntary exit payload. +/// +/// Wraps `phase0::SignedVoluntaryExit` so the +/// [`Handler::submit_voluntary_exit`](super::handler::Handler::submit_voluntary_exit) +/// implementation has access to the same data the Go +/// `*eth2p0.SignedVoluntaryExit` carries. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(transparent)] +pub struct SignedVoluntaryExit( + /// Wrapped phase0 signed voluntary exit. + pub phase0::SignedVoluntaryExit, +); /// Sync-committee message payload. Placeholder. #[derive(Debug, Clone)] @@ -177,3 +209,100 @@ pub struct BeaconCommitteeSelection {} /// Sync-committee selection payload. Placeholder. #[derive(Debug, Clone)] pub struct SyncCommitteeSelection {} + +/// Validator-index request body for the `attester_duties` and +/// `sync_committee_duties` endpoints. +/// +/// Accepts both numeric (`[1, 2]`) and string-encoded (`["1", "2"]`) JSON +/// arrays. Indices are stored as decimal strings so they pass straight through +/// to the auto-generated request builders. +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize)] +pub struct ValIndexes(pub Vec); + +/// Hard cap on the number of validator indices accepted per request. A real +/// cluster has at most a few hundred validators; the cap is set generously +/// above that to leave room for future growth while still bounding the work +/// per request so a single misbehaving caller cannot drive unbounded +/// allocation. Pairs with the route-level [`DUTIES_BODY_LIMIT`] +/// (`router.rs`) which limits the *bytes* the deserializer ever sees; +/// this limits the *count* even within those bytes. +pub const VAL_INDEXES_MAX_LEN: usize = 8192; + +impl<'de> Deserialize<'de> for ValIndexes { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + // Custom visitor: streams elements via `SeqAccess::next_element`, + // validates each on read, and aborts as soon as the cap is exceeded. + // Avoids the `#[serde(untagged)]` two-pass behavior (which buffers the + // input via serde's `Content` cache before retrying) and the + // single-allocation `Vec` materialization. + struct ValIndexesVisitor; + + impl<'de> Visitor<'de> for ValIndexesVisitor { + type Value = ValIndexes; + + fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("an array of validator indices (numeric or decimal string)") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: SeqAccess<'de>, + { + let mut out = Vec::with_capacity(seq.size_hint().unwrap_or(0).min(64)); + while let Some(elem) = seq.next_element::()? { + if out.len() >= VAL_INDEXES_MAX_LEN { + return Err(de::Error::custom(format!( + "too many validator indices (max {VAL_INDEXES_MAX_LEN})" + ))); + } + out.push(elem.0); + } + Ok(ValIndexes(out)) + } + } + + deserializer.deserialize_seq(ValIndexesVisitor) + } +} + +/// One validator-index element. Accepts either a JSON number (formatted into +/// a decimal string) or a JSON string (validated as a `u64` then kept +/// verbatim). Single-pass; no untagged-enum buffering. +struct Element(String); + +impl<'de> Deserialize<'de> for Element { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct ElemVisitor; + + impl Visitor<'_> for ElemVisitor { + type Value = Element; + + fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("a validator index (u64 or decimal string)") + } + + fn visit_u64(self, v: u64) -> Result { + Ok(Element(v.to_string())) + } + + fn visit_i64(self, v: i64) -> Result { + u64::try_from(v) + .map(|n| Element(n.to_string())) + .map_err(|_| de::Error::custom("validator index must be non-negative")) + } + + fn visit_str(self, v: &str) -> Result { + v.parse::().map_err(de::Error::custom)?; + Ok(Element(v.to_owned())) + } + } + + deserializer.deserialize_any(ElemVisitor) + } +}