Skip to content

feat(core): implement core/aggsigdb#452

Merged
emlautarom1 merged 37 commits into
mainfrom
emlautarom1/core-aggsigdb
Jun 3, 2026
Merged

feat(core): implement core/aggsigdb#452
emlautarom1 merged 37 commits into
mainfrom
emlautarom1/core-aggsigdb

Conversation

@emlautarom1
Copy link
Copy Markdown
Collaborator

Supersedes #430


Uses an Actor model as in Charon V1 which results in less locks and correct awaits on readers. Also, this implementation supports arbitrary cancellation through CancellationToken. This is not strictly needed, but adding it is trivial. Lastly, the latest changes on deadliner were merged which makes testing more straightforward.

emlautarom1 and others added 25 commits May 20, 2026 14:06
- Use Actor model
- Implement `store`
- Test `store`
- Expose top-level `new`
- Remove impossible errors
- Add inline docs
- Use Mutex/Notify pattern
- Remove need for handles
- Adjust tests and docs
- Use existing Deadliner API (Arc)
- Add tests
- Rust specific
- Use `try_for_each`
- Proper naming convention
- Annotate `wait_for` abort mechanism
- Log when deadliner does not have a channel
- Faster evictions
- Avoid `notify` and unnecessary awakes
- Removes mutexes
- Use `tokio::select!`
- Apply clippy lints
- Do not sleep, wait for eviction to complete
* test: update qbft test

* fix: compare run on retached thread

* fix: removed hard coded salt in tests

* fix: return error on Context cancelled

* fix: hash from string, not magic number

* fix: one shot cancel when parent is cancelled

* fix: fix make_is_leader test

* fix: minors naming and comments

* fix: linter

* fix: early cancel on the loop

* fix: context cancel in compare

* fix: validate definition

* fix: add check pr < r

* fix: test and document on run

* fix: compare callback failed should timeout

* fix: test use timeout channel instead of sleep

* fix: add comment on cancellation poll interval

* fix: use enum for invalid defnintion error

* fix: add more test on check valid round

* fix: using test-case

* fix: small fixes

* fix: linter

* fix: remove unnecessary filter

* fix: make_is_leader now 0-based

* fix: add cancelled state for fake clock

* fix: flanky tests by ordering and add small settle window time

* fix: harden fake-clock to avoid flanky scheduling

* fix: lint

* refactor: introduce QbftTypes

* refactor: group QBFT callbacks into typed request/logger structs

* refactor: add ClientRecord

* refactor: simplify internal_tests

* refactor: simplify qbft mod.rs

* refactor: remove skip clippy on qbft mod
- Proceed with store only when appropriate
@emlautarom1 emlautarom1 linked an issue May 28, 2026 that may be closed by this pull request
@NethermindEth NethermindEth deleted a comment from claude Bot May 28, 2026
@emlautarom1
Copy link
Copy Markdown
Collaborator Author

@claude Review the code and take into consideration the original PR (430)

@claude
Copy link
Copy Markdown

claude Bot commented May 28, 2026

Claude finished @emlautarom1's task in 5m 36s —— View job


Code review

  • Read PR diff and changed files
  • Compare actor model to Charon MemDB reference
  • Compare to lock-based MemDBV2 (which feat: implement core/aggsigdb #430 mirrored)
  • Review tests and edge cases
  • Post review

Overall I think the actor model is a solid choice over the RwLock approach in #430 — it matches Charon's MemDB (v1) shape, eliminates the read-side spinning that MemDBV2 does with runtime.Gosched, and is what Charon production has used the longest. A few real issues below, then nits.

🔴 Unintentional revert of an unrelated bug fix

crates/eth2util/src/keystore/store.rs removes file.flush().await? from write_file. That line was added in PR #447 "fix(keystore): flush file before return" (commit df5720e) specifically as a bug fix on main. It doesn't belong in this PR and looks like a bad merge resolution.

     file.write_all(data).await?;
-    file.flush().await?;

Please restore it. Fix this →

🟠 wait_for has no per-call cancellation (parity regression vs Charon)

Charon's AggSigDB.Await(ctx, duty, pubKey) accepts a context.Context, so each caller can time out or cancel independently. The Rust wait_for only honors the global CancellationToken passed to Handle::new — there's no way for an individual reader to bail out short of dropping its future (see next point). Callers can wrap with tokio::select!/timeout themselves, but:

  • the doc comment "Might block indefinitely if no data is ever stored" acknowledges this gap, and
  • this is the original purpose of the cancel channel in readQuery in charon/core/aggsigdb/memory.go.

Recommend adding an optional CancellationToken parameter to wait_for, or at minimum documenting the recommended tokio::select! pattern next to the method.

🟠 Waiters never freed when a caller drops its wait_for future

crates/core/src/aggsigdb/memory.rs:60-67 parks each waiter's oneshot::Sender in self.waiters keyed by (Duty, PubKey). If the caller drops the wait_for future (timeout, cancellation, task aborted), the Sender stays in the map. Charon mitigates this in processBlockedQueries (memory.go:196) by skipping any query whose cancel channel is closed, on every write — so cancelled queries are cleaned up opportunistically.

The Pluto port only touches self.waiters for the exact (duty, pubkey) being written. That's a nice perf win, but it means:

  • If a matching Store ever arrives, the dead sender is removed (via remove_entry), then send silently fails. ✅
  • If a matching Store never arrives, the dead sender lives until the actor stops. ❌

In a long-running validator this is an unbounded leak in pathological cases (e.g., readers that time out on duties the cluster ultimately skips). Two options:

  1. Periodically prune waiters of entries whose senders are closed (tx.is_closed()).
  2. Also clear waiters for an evicted duty in evict() — currently evict only removes self.entries.remove(&duty), leaving stale waiters behind. If the duty deadline has passed and the entry was evicted, those readers will never get data anyway.

Fix this →

🟡 store returns on first mismatch — partial state remains

memory.rs:75-101 early-returns Err(MismatchingData) on the first Occupied mismatch, but earlier vacant slots in the same call have already been inserted into for_duty and waiters have already been signalled. The comment "Partial insertions on error match the semantics of Charon" is accurate parity, but it also means:

  • HashMap iteration order is nondeterministic, so the set of partially-stored pubkeys is nondeterministic across runs.
  • A retry of the same Store(...) call where the failing pubkey is the second one will succeed for that pubkey on the second try if no one re-stores the conflicting key, leading to silent inconsistency between retries.

This matches Charon (which has the same characteristic), but worth a // Charon parity: … block comment that calls this out explicitly. The current one-liner doesn't communicate the iteration-order angle.

🟡 Handle::new decoupling from the deadliner is fragile

pub fn new(
    deadliner: deadline::DeadlinerHandle,
    expired_rx: sync::mpsc::Receiver<types::Duty>,
    ct: CancellationToken,
) -> Self

deadliner and expired_rx are passed independently, but they must come from the same DeadlinerTask::start(...) call to be useful. Nothing in the type signature enforces that. A constructor that takes (DeadlinerHandle, mpsc::Receiver<Duty>) as a tuple — or better, takes the DeadlinerTask directly and starts it — would prevent miswiring. Not a blocker, but easy to footgun.

🟡 Misleading comment in unrelated_write_does_not_unblock

// Storing an unrelated key wakes readers, which block again since the store is
// unrelated.

The implementation does not wake readers on unrelated writes (the per-key self.waiters.remove_entry(&k) only touches the matching key). The reader simply never wakes. The comment was true for Charon's processBlockedQueries-on-every-write design but is wrong for this implementation. Suggested:

// An unrelated store does not unblock the reader because waiters are
// keyed by (duty, pubkey) and only matching keys are notified.

Fix this →

🟡 Eviction-completion synchronization in write_evict_wait_then_write test

while expiration_tx.capacity() != expiration_tx.max_capacity() {
    tokio::task::yield_now().await;
}

This polls capacity to infer that the actor consumed the eviction message. It works in practice but it's coupling test progress to mpsc internals. A simpler/more deterministic approach would be: after sending the eviction, call store.wait_for(...) on a sentinel (duty, dummy_key) then store.store(...) to force a sync round-trip through the actor, or expose a #[cfg(test)] ack channel.

Not a blocker, just noting because the test is otherwise the most racy in the file.

🟢 Smaller nits

  • signed_data.clone() happens twice per stored key in the happy path (slot.insert(signed_data.clone()) + the waiter loop). For the common case of zero waiters, you can move signed_data into slot.insert and skip the clone — dyn_clone clones are cheap but not free.
  • memory.rs:38biased ordering puts ct.cancelled() first, then expired_rx, then messages_rx. This is reasonable but means a perpetually-firing expired_rx could starve stores. Probably fine in practice (real expirations are sparse).
  • memory.rs:21type Waiters = HashMap<(Duty, PubKey), Vec<oneshot::Sender<…>>>; allocates a tuple key per insert/lookup. Charon uses a memDBKey struct. Cosmetic, but a named struct would be cheaper to clone and more searchable.
  • The new SignedDataSet = HashMap<PubKey, Box<dyn SignedData>> type alias replaces the previous wrapper. This matches Charon's type SignedDataSet map[PubKey]SignedData exactly. ✅ Good simplification.
  • DeadlinerHandle::always(AddOutcome::Scheduled) test helper is useful — but consider also asserting in tests that the deadliner was called for stored duties (currently nothing checks the tx/duty wiring).

✅ Things I liked

  • Actor model gives clean ownership and removes the runtime.Gosched spin loop from MemDBV2.
  • biased select with cancellation token + expirations + messages is correctly layered.
  • Tests cover the important paths: write-then-read, blocked reader, idempotent write, mismatch error, evict-then-rewrite, many-readers fan-out.
  • Handle: Clone is the right shape and matches the actor pattern idiomatically.
  • The PR description's framing ("Actor model as in Charon V1") is accurate.

Summary

Approve with changes once these are addressed:

  1. Restore the file.flush().await? revert in keystore/store.rs (must-fix).
  2. Decide on a story for caller-side cancellation of wait_for (add a token, or document the tokio::select! pattern).
  3. Either drop waiters on eviction or document why the leak is bounded.
  4. Fix the misleading comment in unrelated_write_does_not_unblock.

Everything else is style / nitpick territory.
Branch: emlautarom1/core-aggsigdb

@emlautarom1
Copy link
Copy Markdown
Collaborator Author

@claude Re-review the code and evaluate if the recent commits fix the issues detected previously.

@claude
Copy link
Copy Markdown

claude Bot commented May 28, 2026

Claude finished @emlautarom1's task in 2m 43s —— View job


Re-review: recent commits

  • Check commits since previous review
  • Verify each previously flagged issue
  • Review smaller nits
  • Post updated review

I re-reviewed the four commits since my previous review (d748f7b, 98a6160, f60f534, 139852d, 7923d67) and cross-referenced against Charon's core/aggsigdb/memory.go. All four flagged issues are addressed. Details below.

Previously flagged issues

✅ 1. file.flush().await? restored — f60f534

crates/eth2util/src/keystore/store.rs:233 has the line back. Bug fix from #447 preserved. Resolved.

✅ 2. wait_for per-call cancellation — 7923d67

You chose the documentation route over adding a CancellationToken parameter. The doc on Handle::wait_for now says:

/// To avoid blocking indefinitely, consider using a timeout,
/// [`CancellationToken`] or racing using `tokio::select!` against other
/// events.

That satisfies the minimum ask. Caller-side tokio::select! is idiomatic Rust and avoids the per-key cancel channel that Charon uses (which exists mostly because Go contexts must be plumbed through). Acceptable.

✅ 3. Waiter leak / dropped futures — 139852d

Your trim_readers solution is better than what I suggested. It's invoked after every message:

fn trim_readers(&mut self) {
    self.waiters.retain(|_, waiters| {
        waiters.retain(|w| !w.is_closed());
        !waiters.is_empty()
    });
}

This handles the dropped-future case (timeouts, task aborts) for all keys, not just on eviction. Note: this is more aggressive than Charon, which only prunes closed queries on writes (processBlockedQueries, memory.go:201). Yours runs after every store, wait_for, and eviction — including the cheap no-op pruning when only an eviction occurs. The extra work is O(waiters) per actor message, which is fine since the typical waiter count is small.

Note also that on evict(), waiters intentionally stay (so a later store for the same evicted duty can still wake a reader — see write_evict_wait_then_write). That matches Charon's case duty := <-db.deadliner.C(): block at memory.go:131-136, which only deletes from data/keysByDuty, never from blockedQueries.

✅ 4. Misleading test comment — 139852d

unrelated_write_does_not_unblock now reads:

// Storing an unrelated key does not affect readers.

Concise and correct. Resolved.

✅ 5. Eviction sync in write_evict_wait_then_write139852d

The capacity() hack is gone. Replaced with a synchronous round-trip via a dummy store:

expiration_tx.send(duty.clone()).await.unwrap();
{
    let dummy = Duty::new_attester_duty(SlotNumber::new(u64::MAX));
    store
        .store(dummy, MockSignedData(0).singleton(pub_key))
        .await
        .unwrap();
}

This works because:

  • expiration_tx.send(...).await completes before the dummy is queued, so the expiration is already buffered.
  • The actor's select! is biased, so when both channels are ready it processes expiration first.
  • The dummy store's reply oneshot guarantees the actor has reached and processed the message — which means it must already have processed the earlier expiration.

Solid. Couple of tiny things:

  • Typo: "compeltes" → "completes" in the inline comment.
  • The reason this works (biased + serial actor + roundtrip) isn't obvious to a future reader. Worth a one-liner like // Relies on biased select: expiration is drained before the next message. Optional.

Newly observed: deadliner.add outcome reverted — 98a6160

d748f7b originally short-circuited store on AlreadyExpired/FailedToCompute. 98a6160 reverted that to:

// TODO(charon): Distinguish between no deadline supported vs already expired.
let _ = self.deadliner.add(duty.clone()).await;

I verified this matches Charon memory.go:147 — same // TODO(corver): Distinguish ... comment, also discards the bool. ✅ Correct decision for parity. The Pluto comment correctly attributes this to Charon via TODO(charon):.

Not addressed (acceptable)

  • store partial-state comment — still the one-liner. I'd still prefer it call out the HashMap iteration-order angle, but matching Charon exactly is reasonable. Non-blocker.
  • Handle::new decoupling from deadliner — signature unchanged. Non-blocker style suggestion.
  • signed_data.clone() twice in happy path — unchanged. Cosmetic.

Status

All must-fix items from the prior review are resolved. Nothing remaining blocks approval from my side. The two minor leftovers (typo in test, optional comment about biased select reliance) are cleanup-while-you're-there changes, not blockers.

Nice work on the trim_readers solution — that's a cleaner answer than my "drop on evict" suggestion.


Branch: emlautarom1/core-aggsigdb

Comment thread crates/core/src/aggsigdb/memory.rs Outdated
deadliner: deadline::DeadlinerHandle,
}

impl Actor {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use actix?

Copy link
Copy Markdown
Collaborator Author

@emlautarom1 emlautarom1 May 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though I believe it's a pattern that accurately captures the way Charon models services so we can get a lot of benefit from it, we still don't have a lot of actors implemented in Pluto.

For now I would prefer to have them hand-rolled and if we see room for refactoring we can introduce some third-party library.

Comment thread crates/core/src/aggsigdb/memory.rs Outdated
Comment thread crates/core/src/aggsigdb/memory.rs Outdated
- Rename structs to `Memory*`
Comment thread crates/core/src/aggsigdb/mod.rs Outdated
@emlautarom1 emlautarom1 requested a review from varex83 June 1, 2026 16:35
Comment thread crates/core/src/aggsigdb/memory.rs
Comment thread crates/core/src/aggsigdb/memory.rs Outdated
@emlautarom1 emlautarom1 merged commit a287cd2 into main Jun 3, 2026
11 checks passed
@emlautarom1 emlautarom1 deleted the emlautarom1/core-aggsigdb branch June 3, 2026 14:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement core/aggsigdb

4 participants