feat(core): implement core/fetcher #454

Draft
emlautarom1 wants to merge 1 commit into main from worktree-graceful-spinning-wombat

@emlautarom1 emlautarom1 commented May 28, 2026

The fetcher is stage 2 of the duty pipeline (Scheduler → Fetcher → Consensus → …): it takes a Duty plus a per-validator DutyDefinitionSet and fetches the corresponding unsigned duty data from the beacon node, producing an UnsignedDataSet that it hands to its subscribers.

Highlights

  • New crates/core/src/fetcher/ module with the full Fetcher and a GraffitiBuilder, covering all four duty paths: attester, aggregator, block proposer, and sync-committee contribution.
  • DutyDefinition reworked into a heterogeneous enum (Attester / Proposer / SyncCommittee), mirroring Charon's core.DutyDefinition interface — replacing the previous generic wrapper.
  • UnsignedDutyData / UnsignedDataSet promoted into core::types so the fetcher, DutyDB, and consensus share one canonical, PubKey-keyed type (matching Go's map[core.PubKey]core.UnsignedData).
  • Every Go test ported and passing.
  • No new dependencies introduced.

Type-system changes (crates/core/src/types.rs)

The fetcher's input and output types did not previously exist in a usable form; both were reworked first.

DutyDefinition → enum

Charon models duty definitions as a DutyDefinition interface with three implementations (AttesterDefinition, ProposerDefinition, SyncCommitteeDefinition). Pluto previously had a generic DutyDefinition<T> / DutyDefinitionSet<T>, which cannot represent a heterogeneous, type-dispatched set the way the fetcher needs.

This was replaced with:

  • enum DutyDefinition { Attester(..), Proposer(..), SyncCommittee(..) } with as_attester / as_proposer / as_sync_committee accessors (the Rust equivalent of Go's type assertions).
  • pub type DutyDefinitionSet = HashMap<PubKey, DutyDefinition> (matches Go's map[core.PubKey]core.DutyDefinition).
  • New ProposerDuty and SyncCommitteeDuty structs (faithful to eth2 v1.ProposerDuty / v1.SyncCommitteeDuty) and the three *Definition newtype wrappers. AttesterDefinition wraps the pre-existing signeddata::AttesterDuty.
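
As a rough illustration of the shape described above, the sketch below models the enum, its accessors, and the set alias. The payload structs and their fields, the `PubKey = String` alias, the `Option<&T>` accessor return type, and the `attester_slots` helper are assumptions made for the example, not the definitions in crates/core/src/types.rs.

```rust
use std::collections::HashMap;

// Placeholder payloads (assumption): the real definitions wrap eth2 duty structs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AttesterDefinition { pub slot: u64 }
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProposerDefinition { pub slot: u64 }
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncCommitteeDefinition { pub validator_index: u64 }

/// Hypothetical stand-in for the real public key type.
pub type PubKey = String;

/// Closed set of duty definitions, one variant per concrete kind.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DutyDefinition {
    Attester(AttesterDefinition),
    Proposer(ProposerDefinition),
    SyncCommittee(SyncCommitteeDefinition),
}

impl DutyDefinition {
    /// Plays the role of a Go type assertion: `Some` only for the matching variant.
    pub fn as_attester(&self) -> Option<&AttesterDefinition> {
        match self { Self::Attester(d) => Some(d), _ => None }
    }
    pub fn as_proposer(&self) -> Option<&ProposerDefinition> {
        match self { Self::Proposer(d) => Some(d), _ => None }
    }
    pub fn as_sync_committee(&self) -> Option<&SyncCommitteeDefinition> {
        match self { Self::SyncCommittee(d) => Some(d), _ => None }
    }
}

pub type DutyDefinitionSet = HashMap<PubKey, DutyDefinition>;

/// Hypothetical helper: sorted slots of every attester definition in a mixed set.
pub fn attester_slots(set: &DutyDefinitionSet) -> Vec<u64> {
    let mut slots: Vec<u64> = set
        .values()
        .filter_map(|def| def.as_attester())
        .map(|att| att.slot)
        .collect();
    slots.sort_unstable();
    slots
}
```

A caller that expects one kind of duty can use the accessor and treat `None` as a type mismatch, much as Go code would treat a failed type assertion.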

UnsignedDataSet promoted to core::types

UnsignedDutyData (enum over Proposal / Attestation / AggAttestation / SyncContribution) and UnsignedDataSet = HashMap<PubKey, UnsignedDutyData> were moved out of dutydb/memory.rs into core::types. The old generic type was removed.

Fetcher implementation (crates/core/src/fetcher/)

graffiti.rs

Port of charon/core/fetcher/graffiti.go: GraffitiBuilder, client_graffiti_mappings, and the build/default/token helpers. The default graffiti uses a pluto/<version>-<commit> prefix (vs Go's charon/...).

mod.rs

Port of charon/core/fetcher/fetcher.go:

  • Fetcher::new, subscribe, register_agg_sig_db, register_await_att_data, and fetch (dispatch on DutyType).
  • The four data paths: fetch_attester_data, fetch_aggregator_data, fetch_proposer_data, fetch_contribution_data, plus verify_fee_recipient and the pubkeys logging tracker.
  • Beacon-node calls map onto the generated EthBeaconNodeApiClient: produce_attestation_data, get_aggregated_attestation_v2, produce_block_v3, produce_sync_committee_contribution. Loosely-typed responses are decoded into the strongly-typed signeddata types via JSON round-trip; the produce_block_v3 response is mapped into an unsigned VersionedProposal per fork/blinded variant.
  • Aggregator selection reuses the existing pluto_eth2util::eth2exp (is_att_aggregator, is_sync_comm_aggregator).
  • AggSigDB and DutyDB are injected as async callbacks (no hard dependency on those components); Box<dyn SignedData> values from AggSigDB are down-cast to concrete types via trait upcasting.
  • Go error strings preserved, e.g. aggregate attestation not found by root (retryable), and the errors.Wrap(err, "fetch <type> data") wrapping is reproduced via a FetcherError::Fetch { context, source } variant.
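
The context-wrapping variant mentioned in the last bullet can be pictured with the std-only sketch below. The field types, the `context: cause` rendering, and the `with_context` helper are assumptions for illustration; the actual FetcherError in crates/core/src/fetcher/mod.rs may be shaped differently.

```rust
use std::error::Error;
use std::fmt;

pub type BoxError = Box<dyn Error + Send + Sync>;

#[derive(Debug)]
pub enum FetcherError {
    /// Message text taken from the PR description.
    AggregateAttestationNotFound,
    /// Wraps a lower-level failure with a "fetch <type> data" style context.
    Fetch { context: &'static str, source: BoxError },
}

impl fmt::Display for FetcherError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::AggregateAttestationNotFound => {
                f.write_str("aggregate attestation not found by root (retryable)")
            }
            // Assumed rendering: context first, then the cause.
            Self::Fetch { context, source } => write!(f, "{context}: {source}"),
        }
    }
}

impl Error for FetcherError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            Self::Fetch { source, .. } => {
                let cause: &(dyn Error + 'static) = &**source;
                Some(cause)
            }
            Self::AggregateAttestationNotFound => None,
        }
    }
}

/// Hypothetical helper: attaches a context to any failed sub-fetch.
pub fn with_context<T>(
    context: &'static str,
    result: Result<T, BoxError>,
) -> Result<T, FetcherError> {
    result.map_err(|source| FetcherError::Fetch { context, source })
}
```

Keeping the cause reachable through `source()` lets callers walk the chain instead of parsing the rendered message.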

Tests

All Go tests are ported and passing (test names kept where practical):

  • Graffiti (5, internal): fetch_beacon_node_token, build_graffiti, default_graffiti, get_graffiti, new_graffiti_builder.
  • Attester: fetch_attester.
  • Aggregator (4): different_committee, same_committee, no_aggregator, nil_aggregate.
  • Sync contribution (3): aggregator, not_aggregator, data_error.
  • Blocks: fetch_blocks.
  • Fee recipient (internal): verify_fee_recipient.

Per the chosen approach, beacon-node responses that depend on request parameters (aggregate attestation by data root, sync contribution by subcommittee/root, block proposal echoing randao + graffiti) are provided by test-local wiremock responders rather than by extending BeaconMock. The block-proposal test reuses the existing Electra VersionedProposal golden fixture as the produce_block_v3 payload. Aggregator/sync tests use a custom spec carrying the selection fields required by eth2exp.

verify_fee_recipient's comparison logic was factored into a pure fee_recipient_mismatch helper so it can be asserted directly without a tracing-log capture harness; the warning itself still lives in verify_fee_recipient.
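
A minimal sketch of that split, assuming the addresses have already been extracted as strings: `address_mismatch` and `verify_address` are hypothetical names, and `eprintln!` stands in for the tracing warning used in the real code.

```rust
/// Pure comparison: `Some((expected, actual))` when the addresses differ
/// case-insensitively, `None` when they match or no address is available.
pub fn address_mismatch(expected: &str, actual: Option<&str>) -> Option<(String, String)> {
    let actual = actual?;
    if expected.eq_ignore_ascii_case(actual) {
        None
    } else {
        Some((expected.to_owned(), actual.to_owned()))
    }
}

/// Thin wrapper that owns the side effect. Returns `true` when nothing was logged.
pub fn verify_address(expected: &str, actual: Option<&str>) -> bool {
    match address_mismatch(expected, actual) {
        Some((expected, actual)) => {
            // Stand-in for the warning log emitted by the real wrapper.
            eprintln!("fee recipient mismatch: expected {expected}, got {actual}");
            false
        }
        None => true,
    }
}
```

Tests can then assert on the return value of the pure function directly, with no log capture involved.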

Files changed

  • crates/core/src/types.rs: DutyDefinition enum, new duty/definition types, promoted UnsignedDutyData/UnsignedDataSet.
  • crates/core/src/dutydb/memory.rs, crates/core/src/dutydb/mod.rs: import and re-export the promoted types instead of defining them.
  • crates/core/src/lib.rs: register the fetcher module.
  • crates/core/src/fetcher/mod.rs, crates/core/src/fetcher/graffiti.rs: new.

Caveats and notes for review

  • Pre-existing deviation: Pluto's AttesterDuty omits the PubKey field present in eth2's v1.AttesterDuty. The fetcher does not depend on it (the pubkey is always the map key), but it is worth keeping in mind when the scheduler begins populating these definitions.
  • The fetcher's DutyDefinitionSet is produced by the scheduler, which is in-flight on emlautarom1/core-scheduler. The definition types added here may need reconciliation if that branch introduces overlapping types.

This PR was automatically generated by Claude.

Ports the Charon core/fetcher module to Pluto, covering all four duty
paths (attester, aggregator, proposer, sync contribution). Reworks
DutyDefinition into a heterogeneous enum mirroring Charon's interface
and promotes UnsignedDataSet/UnsignedDutyData into core::types so it
is shared by the fetcher, DutyDB, and consensus. All Go fetcher tests
are ported and passing.

Resolves #172.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@emlautarom1 emlautarom1 added the ai AI-related tooling, docs and workflows label May 28, 2026
@emlautarom1 emlautarom1 force-pushed the worktree-graceful-spinning-wombat branch from b1295a1 to 834f8f8 Compare May 28, 2026 20:08

@emlautarom1 emlautarom1 left a comment

Walking through the parts of this change that warrant the most discussion. Overall the structure mirrors Charon faithfully and the test coverage looks solid. Leaving notes on a few design points rather than blocking changes.

Comment thread crates/core/src/types.rs
/// Per-validator duty definition. The Rust equivalent of Go's
/// `core.DutyDefinition` interface: a closed set of concrete duty definitions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DutyDefinition {

@emlautarom1 emlautarom1 May 28, 2026

Change from the generic DutyDefinition<T> to an enum that mirrors Charon's core.DutyDefinition interface (AttesterDefinition / ProposerDefinition / SyncCommitteeDefinition). The set of types is known ahead of time, so there is no need to replicate the interface with trait tricks.

This change is also required by core/scheduler so it's known to be correct.

type CallbackFuture<T> = Pin<Box<dyn Future<Output = std::result::Result<T, BoxError>> + Send>>;

/// Subscriber callback invoked for each fetched duty data set.
pub type Subscriber = Arc<dyn Fn(Duty, UnsignedDataSet) -> CallbackFuture<()> + Send + Sync>;

Async-callback shape note: Arc<dyn Fn(...) -> Pin<Box<dyn Future + Send>>> is the same idiom sigagg already uses, so we're consistent across components.

Two small things to keep in mind for review:

  • BoxError = Box<dyn std::error::Error + Send + Sync> is intentionally loose so callers (AggSigDB / DutyDB / subscribers) can return their own error types without dragging concrete types into the fetcher's public surface. The cost is that errors get stringified at the boundary — fine here because the Go contract also propagates opaque errors, and the wrapping at fetch() preserves context.
  • register_* / subscribe take &mut self and are documented as not-thread-safe / call-before-fetch, matching Go's note on the methods. The integration code that wires Fetcher into the pipeline will need to honour that ordering.
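
To make the callback shape concrete, here is a small sketch of building and invoking a value of the `Subscriber` type. `Duty`, `UnsignedDataSet`, `recording_subscriber`, and `block_on_ready` are simplified stand-ins invented for the example; in particular `block_on_ready` is a toy executor for futures that never suspend, where real code would use the project's async runtime.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
pub type CallbackFuture<T> = Pin<Box<dyn Future<Output = Result<T, BoxError>> + Send>>;

// Simplified stand-ins (assumptions) for the real core types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Duty { pub slot: u64 }
pub type UnsignedDataSet = HashMap<String, String>;

pub type Subscriber = Arc<dyn Fn(Duty, UnsignedDataSet) -> CallbackFuture<()> + Send + Sync>;

/// Builds a subscriber that records the slot of every duty it receives.
pub fn recording_subscriber(seen: Arc<Mutex<Vec<u64>>>) -> Subscriber {
    Arc::new(move |duty: Duty, _set: UnsignedDataSet| -> CallbackFuture<()> {
        // Clone the handle so the closure stays `Fn` and can be called repeatedly.
        let seen = Arc::clone(&seen);
        Box::pin(async move {
            let outcome: Result<(), BoxError> = match seen.lock() {
                Ok(mut slots) => {
                    slots.push(duty.slot);
                    Ok(())
                }
                // Any error type can cross the boundary once boxed.
                Err(_) => Err(BoxError::from("subscriber state poisoned")),
            };
            outcome
        })
    })
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor for the sketch: polls in a loop until the future is ready.
pub fn block_on_ready<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

The explicit `-> CallbackFuture<()>` on the closure is what lets `Box::pin(async move { .. })` coerce to the boxed trait object.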


/// Converts a `produce_block_v3` response into an unsigned
/// [`VersionedProposal`].
fn versioned_proposal_from_response(

This is the trickiest piece in the file. The generated produce_block_v3 response carries a loosely typed data field; mapping it onto the strongly typed ProposalBlock enum has to fan out per fork and per blinded/unblinded shape, because Deneb and later forks wrap the block in {block, kzg_proofs, blobs}, while pre-Deneb forks return the block directly. The dispatch here matches build_block_body in validatormock's propose.rs (Deneb/Electra/Fulu use block_field(&data)).

Risk surface: we're trusting the JSON shape the beacon node returns to match the fork keys exactly. The fetch_blocks test exercises the Electra-unblinded path (reusing the existing VersionedProposal golden); the other forks rely on the structural match. If a real beacon node ever returns an unexpected shape, the user sees decode beacon node response: ... from Json(#[from] ...) which is actionable.

Future improvement: once a versioned-decode helper lands in eth2api, we should swap this match out for it.

/// Returns `Some((expected, actual))` when the proposal's fee recipient differs
/// (case-insensitively) from `fee_recipient_address`. Returns `None` for forks
/// without a fee recipient (pre-Bellatrix) or when the addresses match.
fn fee_recipient_mismatch(

The Go test for this is in fetcher_internal_test.go and asserts that a warning is emitted on mismatch / silent on match across every fork. Pluto has no log-capture utility yet, so the comparison logic was factored into fee_recipient_mismatch (pure, testable) and the warn! stays in verify_fee_recipient. The ported test asserts on the pure helper.

Two notes:

  • The extraction uses execution_payload / execution_payload_header JSON keys uniformly across forks instead of typed field navigation. Cleaner than 5x per-fork accessors and keeps the same behaviour, but it's silent if a future fork renames either key. Worth a once-over if a new fork lands.
  • proposal.version() is queried first to early-return on Phase0/Altair which carry no fee recipient — matches Go's default: return in the switch.

@emlautarom1 emlautarom1 changed the title feat(core): port core/fetcher from Charon feat(core): implement core/fetcher May 28, 2026
@emlautarom1

@claude Review this code


claude Bot commented May 28, 2026

Claude finished @emlautarom1's task in 3m 44s


Code review — feat(core): implement core/fetcher

  • Read PR diff and changed files
  • Cross-reference with Charon Go source
  • Review types.rs changes (DutyDefinition enum, UnsignedDataSet promotion)
  • Review fetcher/mod.rs (Fetcher core, dispatch, four data paths, fee-recipient)
  • Review fetcher/graffiti.rs
  • Review test coverage and parity
  • Post review

Summary

Solid port. Structure, dispatch, sub-fetchers, graffiti, and tracker mirror charon/core/fetcher faithfully. Type-system reshuffle (DutyDefinition → enum, UnsignedDataSet → core::types) is the right call and lines up with how Charon's interface is used. Test parity is good. Findings below are mostly observability and edge-case concerns, not blockers.


Findings

1. PubkeysTracker::log() is skipped on early-return error paths — divergence from Go's defer

crates/core/src/fetcher/mod.rs:309 and :441

Go uses defer pt.log(ctx) in fetchAggregatorData / fetchContributionData, so the not-selected / resolved pubkey lists are logged even when the function bails out partway through (e.g., AggSigDB error, BN error). The Rust port calls tracker.log() only on the success path:

let agg_att = self.aggregate_attestation(slot, comm_idx, data_root).await?;  // ← early return
...
tracker.log();
Ok(resp)

So in the nil_aggregate / data_error scenarios you lose the partial observability Go provides. Easy fix: wrap the tracker in a small RAII guard (Drop impl that logs), or split the loop body into a closure and call tracker.log() after the closure returns regardless of result.
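
The RAII option could look roughly like the sketch below, where logging moves into a `Drop` impl so it runs on every exit path, including early returns. The tracker fields, the `LogSink` alias (standing in for tracing output), and `fetch_with_tracker` are hypothetical and only illustrate the pattern.

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for the logging backend so the effect can be observed (assumption).
pub type LogSink = Arc<Mutex<Vec<String>>>;

pub struct PubkeysTracker {
    not_selected: Vec<String>,
    sink: LogSink,
}

impl PubkeysTracker {
    pub fn new(sink: LogSink) -> Self {
        Self { not_selected: Vec::new(), sink }
    }

    pub fn mark_not_selected(&mut self, pubkey: &str) {
        self.not_selected.push(pubkey.to_owned());
    }

    fn log(&self) {
        if let Ok(mut lines) = self.sink.lock() {
            lines.push(format!("not selected: {:?}", self.not_selected));
        };
    }
}

impl Drop for PubkeysTracker {
    /// Runs when the tracker goes out of scope, mirroring Go's `defer pt.log(ctx)`.
    fn drop(&mut self) {
        self.log();
    }
}

/// Hypothetical fetch path with an early return in the middle.
pub fn fetch_with_tracker(sink: LogSink, fail: bool) -> Result<u32, String> {
    let mut tracker = PubkeysTracker::new(sink);
    tracker.mark_not_selected("0xaa");
    if fail {
        // Early return: the tracker is dropped here and still logs.
        return Err("beacon node error".to_owned());
    }
    Ok(1)
}
```

With this shape there is no explicit `tracker.log()` call on the success path either, which removes the chance of logging twice.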

2. fee_recipient_mismatch silently swallows missing/malformed payload fields

crates/core/src/fetcher/mod.rs:730-756

The cleanup to a pure helper is nice. But the JSON-path traversal uses ? on each step, so any of these conditions silently return None (= "no mismatch, no warning"):

  • proposal_body(...) returns Null (every fork arm uses .unwrap_or(Value::Null))
  • execution_payload / execution_payload_header key absent
  • fee_recipient key absent
  • as_str() fails

Go's typed accessors would panic or fail to compile if a future fork renamed the key; the Rust port simply stops warning. Combined with the test gap below, this means a hypothetical Fulu+1 rename could ship as a no-op warning. Consider one of:

  • debug_assert!(actual_addr.is_some(), ...) so dev/test catches it,
  • log a warn! if we have a Bellatrix+ proposal but couldn't extract the address.

You explicitly call this out in the PR description; flagging here as it directly intersects with finding #3.
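
One way to make the extraction failure visible, sketched under the assumption that the caller already knows the fork and has attempted the extraction: return a four-state result instead of an `Option`, so "could not read the address" is distinct from "addresses match". `Fork`, `FeeRecipientCheck`, and `check_fee_recipient` are hypothetical names.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Fork { Phase0, Altair, Bellatrix, Capella, Deneb, Electra, Fulu }

impl Fork {
    /// Forks before Bellatrix carry no execution payload, hence no fee recipient.
    fn has_fee_recipient(self) -> bool {
        !matches!(self, Fork::Phase0 | Fork::Altair)
    }
}

#[derive(Debug, PartialEq, Eq)]
pub enum FeeRecipientCheck {
    /// The fork has no fee recipient; nothing to verify.
    NotApplicable,
    Match,
    Mismatch { expected: String, actual: String },
    /// A Bellatrix+ proposal whose address could not be extracted.
    Unreadable,
}

/// `extracted` is whatever the JSON traversal produced, `None` on any failure.
pub fn check_fee_recipient(fork: Fork, expected: &str, extracted: Option<&str>) -> FeeRecipientCheck {
    if !fork.has_fee_recipient() {
        return FeeRecipientCheck::NotApplicable;
    }
    match extracted {
        None => FeeRecipientCheck::Unreadable,
        Some(actual) if expected.eq_ignore_ascii_case(actual) => FeeRecipientCheck::Match,
        Some(actual) => FeeRecipientCheck::Mismatch {
            expected: expected.to_owned(),
            actual: actual.to_owned(),
        },
    }
}
```

The logging wrapper could then warn on both `Mismatch` and `Unreadable`, so a renamed key would surface as a warning rather than silence.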

3. verify_fee_recipient test coverage gap vs Go

crates/core/src/fetcher/mod.rs:929-952

TestVerifyFeeRecipient in Go exercises 10 fork/blinded combinations (Bellatrix..Fulu × {blinded, unblinded}). The Rust test only covers Electra-unblinded via the golden. With the silent-fail behavior in #2, the other 9 paths are effectively untested for both fee_recipient_mismatch and proposal_body matching.

You don't need a BeaconMock log harness for this — fee_recipient_mismatch is already pure. A table-driven test that builds a minimal proposal per fork (deterministic addresses, no golden needed for the smaller bodies) would lock the JSON shape per variant. The blinded variants in particular only need execution_payload_header.fee_recipient.

4. Non-Ok BN response collapses 404 and 5xx into the same retryable error

crates/core/src/fetcher/mod.rs:534-539 (and similarly for produce_attestation_data / produce_sync_committee_contribution)

GetAggregatedAttestationV2Response::Ok(ok) => ok,
_ => return Err(FetcherError::AggregateAttestationNotFound),

Go inspects eth2Resp.Data == nil separately from err != nil, so a 5xx surfaces as a non-retryable API error while a nil-data 200 surfaces as the retryable "not found by root". The Rust collapse treats any non-Ok variant — including server errors — as retryable. Whether that's a problem depends on what the upstream pipeline does with AggregateAttestationNotFound: if it triggers a tight retry loop, you'll hammer a broken BN. Worth confirming.
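
A sketch of keeping the two cases apart is shown below. `AggregateResponse` is a deliberately simplified, hypothetical response type (the generated client's enum has its own variants), and treating a 404 as retryable alongside an empty 200 is an assumption that would need checking against the Go behaviour.

```rust
/// Hypothetical, simplified view of a beacon-node response.
pub enum AggregateResponse {
    /// 200 with optional data (a missing body models Go's `Data == nil`).
    Ok(Option<String>),
    NotFound,
    ServerError(u16),
}

#[derive(Debug, PartialEq, Eq)]
pub enum FetchError {
    /// Retryable: the aggregate may simply not exist yet.
    AggregateAttestationNotFound,
    /// Not retryable: the beacon node itself reported a failure.
    BeaconNode { status: u16 },
}

impl FetchError {
    pub fn is_retryable(&self) -> bool {
        matches!(self, Self::AggregateAttestationNotFound)
    }
}

pub fn aggregate_from_response(resp: AggregateResponse) -> Result<String, FetchError> {
    match resp {
        AggregateResponse::Ok(Some(data)) => Ok(data),
        // Assumption: both "empty 200" and 404 mean "not found by root".
        AggregateResponse::Ok(None) | AggregateResponse::NotFound => {
            Err(FetchError::AggregateAttestationNotFound)
        }
        AggregateResponse::ServerError(status) => Err(FetchError::BeaconNode { status }),
    }
}
```

Matching every variant explicitly, rather than using a `_` arm, also means a newly generated response variant forces a decision at compile time.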

5. Fetcher::fetch clones unsigned_set per subscriber — including the last

crates/core/src/fetcher/mod.rs:245-251

for sub in &self.subs {
    let clone = unsigned_set.clone();
    sub(duty.clone(), clone).await.map_err(FetcherError::Callback)?;
}

This clones once per subscriber even though the last subscriber could take the value by move. Charon does the same (it always clones), so this is parity, not a bug. Mentioning only because it's a per-fetch hot path and the set can be large. A small optimization for later: iterate with iter().enumerate() and mem::take (or into_iter over a vec) for the final call. Optional.
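
For reference, the "move into the last subscriber" idea can be expressed with `split_last`, as in this synchronous sketch. `fan_out` and the boxed `Fn(T)` subscriber type are simplifications for illustration; the real loop is async and fallible.

```rust
/// Delivers `value` to every subscriber, cloning for all but the last one.
pub fn fan_out<T: Clone>(value: T, subs: &[Box<dyn Fn(T)>]) {
    // `split_last` yields `None` for an empty slice, so nothing is cloned then.
    if let Some((last, rest)) = subs.split_last() {
        for sub in rest {
            sub(value.clone());
        }
        // The final subscriber takes ownership, saving one clone per call.
        last(value);
    }
}
```

With N subscribers this performs N-1 clones instead of N, which only matters when the set is large.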

6. Fetcher::new returns Self while Go's New returns (*Fetcher, error)

crates/core/src/fetcher/mod.rs:163-182

Go's New never actually returns an error (always nil), so the Rust simplification is fine. Just flagging so anyone reading both files knows they're equivalent.

7. Phase0/Altair are accepted by versioned_proposal_from_response but BN should never produce them

crates/core/src/fetcher/mod.rs:632-633

(ConsensusVersion::Phase0, _) => ProposalBlock::Phase0(json_from(&data)?),
(ConsensusVersion::Altair, _) => ProposalBlock::Altair(json_from(&data)?),

Go has no such branches — produce_block_v3 only emits Bellatrix+. Decoding them isn't wrong, but it's quietly accepting input the upstream contract says is impossible, and downstream code (verify_fee_recipient) silently no-ops on them. Either reject explicitly (return Err(FetcherError::UnsupportedProposalVersion(...))) or document the looseness. Minor.
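
The explicit-rejection option might look like the sketch below. The `ConsensusVersion` variants listed, the single-variant `FetcherError`, and the `ensure_supported` helper are assumptions for the example; only the `UnsupportedProposalVersion` name comes from the suggestion above.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsensusVersion { Phase0, Altair, Bellatrix, Capella, Deneb, Electra, Fulu }

#[derive(Debug, PartialEq, Eq)]
pub enum FetcherError {
    UnsupportedProposalVersion(ConsensusVersion),
}

/// Rejects forks that the fetcher does not expect from the beacon node,
/// instead of decoding them and letting later checks silently no-op.
pub fn ensure_supported(version: ConsensusVersion) -> Result<ConsensusVersion, FetcherError> {
    match version {
        ConsensusVersion::Phase0 | ConsensusVersion::Altair => {
            Err(FetcherError::UnsupportedProposalVersion(version))
        }
        supported => Ok(supported),
    }
}
```

Calling such a guard before the per-fork decode would keep the match arms limited to the forks that are actually handled.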

8. Minor: module-scope use std::any::Any is needed only inside downcast

crates/core/src/fetcher/mod.rs:9 imports Any at module scope. It's referenced only in downcast at :603. Keep it as is or make it scope-local; clippy is fine either way.


Nits / observations

  • crates/core/src/fetcher/mod.rs:42: Subscriber = Arc<dyn Fn(...) -> ...> matches the sigagg idiom as noted; consistency is the right call.
  • crates/core/src/fetcher/mod.rs:625 (versioned_proposal_from_response): agreed with the inline note that the match should be swapped out once eth2api lands a versioned decode. The mapping is correct per-fork as far as I can verify; the Fulu arm is symmetrical to Deneb/Electra.
  • crates/core/src/fetcher/graffiti.rs:135-138: the pluto/<version>-<commit> prefix (vs. charon/...) is the right call and matches the rest of the workspace.
  • crates/core/src/types.rs:507-541: the DutyDefinition enum + as_* accessors is the natural Rust shape for the Go interface, and the deliberate inability to add variants outside the crate is correct (matches Charon's closed set of impls in core/).
  • crates/core/src/types.rs:560-563: promoting UnsignedDutyData / UnsignedDataSet to core::types and re-exporting from dutydb is the right de-duplication; it aligns with how core.UnsignedData lives in Go's core package.
  • The register_* / subscribe &mut self + "call before fetch" contract carries over cleanly; just make sure the integration site honours it (no mid-flight registration).

Conclusion

LGTM with the caveats above. The two I'd actually act on before merging:

  1. Move tracker.log() to fire on every exit path (1-line change with Drop).
  2. Expand the verify_fee_recipient test to cover every fork/blinded combo; this closes the silent-fail risk in finding #2 at near-zero cost.

The rest are observations and judgment calls.
