feat(core): implement core/scheduler#466

Open
emlautarom1 wants to merge 41 commits into main from emlautarom1/core-scheduler

Conversation

@emlautarom1
Collaborator

@emlautarom1 emlautarom1 commented Jun 1, 2026

Closes #176


Uses an Actor model to implement the core/scheduler module, solving some concurrency hacks that the original implementation had.

Metrics have been ported using the existing Vise patterns but are untested.

- Add `Scheduler` struct (WIP)
- Add `new_slot_ticker`
- Fix duplicated code
- `valcache` is required by scheduler
- Fixes cyclical dependency issue
- Use `valcache` to get the list of active validators.
- Manages internal state of the scheduler
- Stored in a `Mutex` for thread safety
- TODOs regarding concurrent access while resolving duties
- Try to reuse existing infrastructure
- Preserve only Actor API
- Add docs
- Simplify error messages
- Fix edge case when trimming small epochs
- Not needed in practice, and they are large structs that would be expensive to clone.
@emlautarom1
Collaborator Author

@claude review

@claude

claude Bot commented Jun 1, 2026

Claude finished @emlautarom1's task in 4m 19s


Review: core/scheduler Implementation

Nicely structured port — the actor model is a clean fit for Rust and the test suite covers the important branches. Below are the issues I'd want addressed before merge, ordered roughly by importance.


1. Functional regression: GetDutyDefinition 100 ms timeout swallows legitimate waits

SchedulerHandle::get_duty_definition (crates/core/src/scheduler.rs:255-270) bounds the response to 100 ms total. Inside the actor, SchedulerActor::get_duty_definition (crates/core/src/scheduler.rs:344-370) issues self.client.fetch_slots_config().await? — a beacon-node network call — before it can answer.

In Charon, GetDutyDefinition runs under RWMutex and waits using isResolvingEpoch polling until the resolution finishes (scheduler.go:185-191). Here the actor model serialises the resolution against the query, and the comment at line 264 ("This has to be very rare event…") is migrated from a context where the call was free, into a context where every call pays a network round-trip. Under a slow beacon node the response will simply be TimeoutError.

Two things to fix together:

  1. Don't put fetch_slots_config behind the timeout. Either fetch slots_per_epoch once when the scheduler starts (it doesn't change at runtime in the relevant configs and is already loaded by new_slot_ticker) or cache it on the actor and avoid hitting the API on every GetDutyDefinition.
  2. Reconsider the 100 ms timeout. With network I/O removed from the hot path, get_duty_definition is essentially synchronous on the actor; the only real wait is "queued behind another message". Either drop the timeout (the actor responds quickly) or let the caller pass one. As written, callers can't distinguish "scheduler busy" from "duty truly absent".

Also note: SchedulerError::EpochNotResolved drops the duty field that Charon includes in the log (scheduler.go:194-196); add it for parity.
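
A minimal sketch of the two suggestions above, under stated assumptions: `ActorState`, `Duty`, `DutyKind`, the `String` definition payload and the `DutyNotFound` variant are hypothetical stand-ins, not the PR's actual types. It shows `slots_per_epoch` being stored once at construction, so the lookup does no I/O, and an `EpochNotResolved` error that carries the duty so callers and logs can tell "not resolved yet" apart from "duty truly absent".

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for the PR's duty type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DutyKind {
    Attester,
    Proposer,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Duty {
    pub slot: u64,
    pub kind: DutyKind,
}

#[derive(Debug, PartialEq, Eq)]
pub enum SchedulerError {
    /// The epoch that contains the duty has not been resolved yet.
    EpochNotResolved { duty: Duty, epoch: u64 },
    /// The epoch is resolved but holds no definition for this duty (assumed variant).
    DutyNotFound { duty: Duty },
}

/// Hypothetical actor state: `slots_per_epoch` is fetched once at startup
/// and cached here instead of being requested on every query.
pub struct ActorState {
    slots_per_epoch: u64,
    resolved_epochs: HashSet<u64>,
    definitions: HashMap<Duty, String>, // String is a placeholder payload
}

impl ActorState {
    pub fn new(slots_per_epoch: u64) -> Self {
        assert!(slots_per_epoch > 0, "slots_per_epoch must be non-zero");
        ActorState {
            slots_per_epoch,
            resolved_epochs: HashSet::new(),
            definitions: HashMap::new(),
        }
    }

    /// Records an epoch as resolved together with its duty definitions.
    pub fn resolve(&mut self, epoch: u64, duties: Vec<(Duty, String)>) {
        self.resolved_epochs.insert(epoch);
        self.definitions.extend(duties);
    }

    /// Pure in-memory lookup: no network call on the query path.
    pub fn get_duty_definition(&self, duty: Duty) -> Result<&str, SchedulerError> {
        let epoch = duty.slot / self.slots_per_epoch;
        if !self.resolved_epochs.contains(&epoch) {
            return Err(SchedulerError::EpochNotResolved { duty, epoch });
        }
        self.definitions
            .get(&duty)
            .map(|definition| definition.as_str())
            .ok_or(SchedulerError::DutyNotFound { duty })
    }
}
```

With the lookup reduced to map accesses, the only remaining latency for a caller is queueing behind other actor messages, which is the case the timeout discussion above is about.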

2. new_slot_ticker ignores cancellation during the inter-slot sleep

crates/core/src/scheduler.rs:627-655:

let wait = slot.time.signed_duration_since(chrono::Utc::now()).to_std().unwrap_or_default();
tokio::time::sleep(wait).await;

The Go reference uses select between ctx.Done() and the clock (scheduler.go:663-667), so cancellation interrupts the sleep. Here the cancel-token is only consulted after the sleep when sending into the channel — so shutting down mid-slot blocks the ticker task for up to a full slot duration (~12 s). Wrap the sleep in tokio::select! { _ = ct.cancelled() => break, _ = tokio::time::sleep(wait) => {} }.
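
The same "wake on whichever comes first" idea can be illustrated without tokio. The sketch below is a synchronous, std-only analogue, not the PR's code: a `std::sync::mpsc` channel stands in for the cancellation token, and `sleep_or_cancel` and `Wake` are hypothetical names. In the actual async ticker the `tokio::select!` form quoted above is the natural fit.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Why the wait ended.
#[derive(Debug, PartialEq, Eq)]
pub enum Wake {
    /// The full duration elapsed without a cancellation signal.
    Elapsed,
    /// A cancellation signal arrived (or the cancelling side went away).
    Cancelled,
}

/// Waits for `wait`, but returns early if a message arrives on `cancel`
/// or if every sender has been dropped.
pub fn sleep_or_cancel(cancel: &Receiver<()>, wait: Duration) -> Wake {
    match cancel.recv_timeout(wait) {
        Err(RecvTimeoutError::Timeout) => Wake::Elapsed,
        Ok(()) | Err(RecvTimeoutError::Disconnected) => Wake::Cancelled,
    }
}
```

A ticker loop built on this would `break` on `Wake::Cancelled` and emit the slot on `Wake::Elapsed`, so shutdown no longer has to wait out the remainder of the slot.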

3. Reorg handler runs unconditionally — Charon gates it on a feature flag

crates/core/src/scheduler.rs:328 / handle_chain_reorg runs whenever a value arrives on reorg_rx. Charon checks featureset.Enabled(featureset.SSEReorgDuties) (scheduler.go:142-154) and warns when the feature is disabled. The comment on with_chain_reorgs ("SSE feature check should be done by the caller") punts this to the caller, but:

  • The behaviour is silent — nothing logs an ignored reorg.
  • Anyone calling with_chain_reorgs skipping the check changes scheduler behaviour silently.

Either re-add the featureset check inside the scheduler (matching Charon) or document the requirement loudly. At minimum, log when a reorg is observed but the resolved epoch is u64::MAX (currently the early-return at line 330 is silent).
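
One possible shape for the gate, as a hedged sketch: `decide_reorg`, `ReorgAction` and the boolean `feature_enabled` parameter are hypothetical, and `eprintln!` stands in for whatever logging macro the crate uses. The point is only that both "ignored" paths become observable instead of silent.

```rust
/// Sentinel used when no epoch has been resolved yet, as described in the review.
pub const UNRESOLVED_EPOCH: u64 = u64::MAX;

#[derive(Debug, PartialEq, Eq)]
pub enum ReorgAction {
    /// The feature flag is off; the reorg is logged and dropped.
    IgnoredFeatureDisabled,
    /// Nothing has been resolved yet, so there is nothing to redo.
    IgnoredNothingResolved,
    /// The reorg should be handled for this epoch.
    Handle { reorg_epoch: u64 },
}

/// Decides what to do with an observed reorg and logs every ignored case.
pub fn decide_reorg(feature_enabled: bool, resolved_epoch: u64, reorg_epoch: u64) -> ReorgAction {
    if !feature_enabled {
        eprintln!(
            "warn: chain reorg at epoch {} ignored: reorg handling is disabled",
            reorg_epoch
        );
        return ReorgAction::IgnoredFeatureDisabled;
    }
    if resolved_epoch == UNRESOLVED_EPOCH {
        eprintln!(
            "info: chain reorg at epoch {} ignored: no epoch resolved yet",
            reorg_epoch
        );
        return ReorgAction::IgnoredNothingResolved;
    }
    ReorgAction::Handle { reorg_epoch }
}
```

Keeping the decision in a small pure function also makes the gating easy to unit test independently of the actor loop.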

4. Subscribers terminate permanently on a single lag

subscribe_slot and subscribe_duty (crates/core/src/scheduler.rs:111-173) drop the subscription forever on RecvError::Lagged. This was a deliberate "NOTE: requires further analysis" call, but it means a single transient slow-down silently disables a downstream component for the rest of the process lifetime. Charon would not lose subscribers this way. Worth at minimum:

  • Documenting this behaviour on the public method.
  • Returning a handle (your TODO at line 118) so the caller can detect that its subscription died.

The 100-element broadcast channel buffer (line 104-105) covers ~20 minutes of slots, so in practice lag means the subscriber is wedged — but it could still happen under back-pressure and disappearing without observability is a regression vs Charon.
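
As an illustration of one alternative policy (log the gap and keep going on lag, stop only when the channel closes), here is a std-only sketch of the control flow. The local `RecvError` enum mirrors the variants of `tokio::sync::broadcast::error::RecvError` so the example compiles without tokio; `drive_subscription` and `SubscriptionReport` are hypothetical names, and whether skipping missed slots is acceptable for a given subscriber is a design decision this sketch does not settle.

```rust
/// Local stand-in with the same variants as tokio's broadcast `RecvError`.
#[derive(Debug)]
pub enum RecvError {
    Closed,
    Lagged(u64),
}

/// Summary returned to the caller so a lagging subscription is observable.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct SubscriptionReport {
    pub delivered: u64,
    pub skipped: u64,
}

/// Feeds received items to `callback`; a lag is recorded and logged but does
/// not end the subscription, while `Closed` ends it.
pub fn drive_subscription<T, I, F>(events: I, mut callback: F) -> SubscriptionReport
where
    I: IntoIterator<Item = Result<T, RecvError>>,
    F: FnMut(T),
{
    let mut report = SubscriptionReport::default();
    for event in events {
        match event {
            Ok(item) => {
                callback(item);
                report.delivered += 1;
            }
            Err(RecvError::Lagged(missed)) => {
                eprintln!("warn: subscriber lagged, {} messages skipped", missed);
                report.skipped += missed;
            }
            Err(RecvError::Closed) => break,
        }
    }
    report
}
```

In the async version the iterator would be replaced by a loop over `rx.recv().await`, with the same three match arms.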

5. handle_chain_reorg is pub but on a private struct

crates/core/src/scheduler.rs:328: declared pub async fn inside impl SchedulerActor, but SchedulerActor itself is private, so it should be a plain async fn. Other occurrences of the same pattern don't merit a separate point, but a sweep is worthwhile.

6. Redundant clones in SchedulerBuilder::build

crates/core/src/scheduler.rs:205: new_slot_ticker(&client.clone(), ct.clone()).await? calls client.clone() and then immediately takes a reference to the result, which is wasteful; new_slot_ticker takes &EthBeaconNodeApiClient. The next line clones again into the actor, so the temporary clone serves no purpose. Just pass &client.

7. validator_status reset path is O(total_seen_validators) per validator

crates/core/src/scheduler.rs:687-693 iterates every label combination ever recorded just to zero out the previous status for one pubkey. With a stable set this is fine, but for a long-running node with validator churn the linear scan stacks up (O(N²) per epoch). The comment correctly identifies that Vise can't delete labels — so the leak is unavoidable — but the iteration could keep an in-actor HashMap<PubKey, String> of last-known status to avoid the scan. Not blocking, but worth a TODO.
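
A rough sketch of the suggested last-known-status map, assuming `String` keys in place of the real `PubKey` type; `StatusTracker` and `StatusChange` are hypothetical names. Each update is a single map operation and tells the caller exactly which old label (if any) needs zeroing, so no scan over previously recorded label combinations is required.

```rust
use std::collections::HashMap;

/// What the metrics layer has to do after a status update.
#[derive(Debug, PartialEq, Eq)]
pub struct StatusChange {
    /// Previous status whose gauge should be set back to zero, if any.
    pub clear: Option<String>,
    /// New status whose gauge should be set to one.
    pub set: String,
}

/// Tracks the last status reported per validator (keyed by a placeholder String pubkey).
#[derive(Default)]
pub struct StatusTracker {
    last: HashMap<String, String>,
}

impl StatusTracker {
    /// Records `status` for `pubkey`. Returns `None` when nothing changed,
    /// otherwise the label to clear and the label to set.
    pub fn update(&mut self, pubkey: &str, status: &str) -> Option<StatusChange> {
        let previous = self.last.insert(pubkey.to_string(), status.to_string());
        if previous.as_deref() == Some(status) {
            return None;
        }
        Some(StatusChange {
            clear: previous,
            set: status.to_string(),
        })
    }
}
```

The map grows with the number of validators ever seen, the same bound as the label set the metrics backend already retains, so it does not make the leak worse.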

8. Style / nits

  • enum SchedulerMessage at line 234 is missing the conventional blank line / #[derive] and is wedged against the impl Default block above. Add a blank line for readability.
  • All error variants end in Error (TimeoutError, EthBeaconNodeApiClientError, …) — clippy lints recommend the bare form (Timeout, EthBeaconNodeApi). Pre-existing pattern in the codebase, so consistency wins, but it could be cleaned up in a follow-up.
  • fast_backoff is marked // TODO: Duplicated from crates/p2p/src/bootnode.rs — please file a follow-up issue or DRY this into a shared helper.
  • Metric module is named metrics but the file declares mod metrics; and re-exports SCHEDULER_METRICS via use crate::scheduler::metrics::SCHEDULER_METRICS; — fine, but the rest of the crate uses bare-metrics:: imports inline at call sites. Minor.

9. Tests

The test suite is solid, but three coverage gaps stand out:

  • No test exercises the delay_slot_offset path with a non-past slot. All tests use test_past_slot so the deadline has elapsed and the wait is Duration::ZERO. Worth one slow-path test to make sure cancellation actually wins the with_cancellation_token_owned race.
  • No test for the last_in_epoch pre-resolution branch (crates/core/src/scheduler.rs:419-423). Worth one.
  • get_duty_definition is tested in-actor but not via the SchedulerHandle 100ms-timeout path. Given point 1, a test that drives the timeout would surface the regression.

Summary

The actor model is the right call and the bulk of the logic mirrors Charon faithfully. The major concern is #1 (the 100 ms timeout combined with a network call inside the actor is a real behavioural regression). #2 and #3 are smaller but worth fixing for parity. Everything else is cleanup.

  • Read PR diff and changed files
  • Cross-reference with Charon Go implementation
  • Analyze functional correctness
  • Check Rust style and concurrency patterns
  • Review metrics implementation
  • Post review findings
    • Branch: emlautarom1/core-scheduler

Comment thread crates/core/Cargo.toml Outdated
pluto-ssz.workspace = true
ssz.workspace = true
tree_hash.workspace = true
anyhow.workspace = true
Collaborator


I think we can move it back to dev-dependencies

Comment thread crates/core/src/scheduler.rs Outdated
/// Subscribes a callback function for triggered slots.
pub fn subscribe_slot(
&mut self,
f: impl Fn(&types::Slot) -> Result<()> + Send + 'static,
Collaborator


Should we support async functions here as well?

Collaborator Author


Changed to support async callbacks. There is a tradeoff here to make:

  • If we await each callback we can block the subscriber, meaning that it could lag behind the producer (current behavior), but this ensures that callbacks are processed in order by a single running task.
  • If we run each callback on a separate task we don't lag behind, but we have an unbounded number of running tasks and cannot ensure ordering (e.g. the callback for slot 1 might not have completed before the callback for slot 2 starts).

Charon uses the second approach but I'm not sure it's the right one.

Note that if we just returned the Receiver channel to the caller then it would be a per-caller decision (more flexibility, we push the decision for later).
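
To make the "return the receiver" option concrete, here is a small std-only sketch. It uses `std::sync::mpsc` with one unbounded channel per subscriber, which is a simplification: a tokio broadcast channel is bounded and reports lag instead. `SlotPublisher` and `Slot` are hypothetical names used only for illustration.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Placeholder for the scheduler's slot type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Slot(pub u64);

/// Fan-out publisher that hands each subscriber its own receiver.
#[derive(Default)]
pub struct SlotPublisher {
    subscribers: Vec<Sender<Slot>>,
}

impl SlotPublisher {
    /// Returns the receiving end; the caller decides how to consume it
    /// (inline, on a dedicated thread or task, with its own cancellation).
    pub fn subscribe(&mut self) -> Receiver<Slot> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Sends the slot to every live subscriber and forgets the ones
    /// whose receiver has been dropped.
    pub fn publish(&mut self, slot: Slot) {
        self.subscribers.retain(|tx| tx.send(slot).is_ok());
    }
}
```

Dropping the receiver is then all a caller needs to do to unsubscribe, and ordering or concurrency of processing becomes the caller's choice rather than the scheduler's.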


// TODO: We might want to return a handle so clients can `.abort()` them to drop
// the subscription
tokio::spawn(async move {
Collaborator


I would pass a cancellation token here, or store the handles in the struct and then join them on drop

Collaborator Author


Not needed actually: when the actor gets dropped the rx.recv() call returns Err(Closed) so it exits automatically.

I'm thinking that maybe this approach of passing callbacks is not very good, and that we should instead return the receiver produced by .subscribe() and let the caller decide what to do with it (e.g. run it with tokio::spawn or tokio::task::spawn_blocking, handle cancellation, etc.)


Development

Successfully merging this pull request may close these issues.

Implement core/scheduler

2 participants