
feat(consensus): implement consensus/qbft#463

Open
iamquang95 wants to merge 19 commits into main from iamquang95/consensus-qbft

@iamquang95
Collaborator

@iamquang95 iamquang95 commented Jun 1, 2026

Fix: #157

This PR ports Charon’s core/consensus/qbft wrapper into Rust, but splits the large Go qbft.go into smaller files by responsibility. The existing pluto_core::qbft algorithm stays as the generic QBFT engine; this PR adds the production consensus layer around it: admission, value handling, runner bridge, p2p transport, decision dispatch, and tests/example.

Detailed Scope / Go Mapping

  1. charon/core/consensus/qbft/qbft.go is split mainly into:

    • component.rs: Consensus struct, NewConsensus, Subscribe, Start, handle, message admission, peer/pubkey lookup, instance IO map.
    • runner.rs: Propose, ProposePriority, Participate, internal propose, runInstance, async-to-core bridge, cancellation/teardown.
    • definition.rs: newDefinition, leader selection, decide callback, compare callback, round-change logging helpers, timeout reason logic.
    • transport.rs: adapter between generic core QBFT broadcasts and consensus QBFTConsensusMsg; value cache, self-send, sniffer integration.
    • p2p.rs: Rust libp2p protocol/handler/behaviour for QBFT message send/receive.
  2. Tests cover:

    • message hashing/signing/wrapping,
    • admission/validation,
    • runner lifecycle,
    • transport self-send/value cache,
    • libp2p framing/loopback,
    • full in-memory QBFT consensus scenarios.

@iamquang95
Collaborator Author

@claude review this PR

@claude

claude Bot commented Jun 1, 2026

Claude finished @iamquang95's task in 7m 43s


Code review — feat(consensus): implement consensus/qbft

Summary

Substantial port of Charon's core/consensus/qbft into a dedicated pluto-consensus crate. The bulk of the work — message wrapping (msg.rs), transport (transport.rs), the QBFT-core wiring (definition.rs), the runner (runner.rs), the inbound admission path (component.rs), the libp2p NetworkBehaviour (p2p.rs), and the sniffer — closely tracks the Go reference. Code is well-tested (~5k lines, including a full in-memory consensus simulation) and parity on the wire format is verified against a reference protobuf payload. A few concerns worth addressing below, none blocking.

Findings

[High] qbft_consensus test never exercises a sub-quorum cluster

crates/consensus/src/qbft/qbft_run_test.rs:27-32: the test takes both threshold and cluster_nodes, but only spins up threshold peers: let active_nodes = in_memory_network(threshold, sniffed_tx);. The cluster_nodes argument is used only for assert!(threshold <= cluster_nodes) and is discarded afterward. Cases like (4, 6), described as "4 of 6", actually run a 4-of-4 healthy-cluster simulation. The intended threshold scenarios (silent peers, leader rotation across more nodes) aren't covered.

[Medium] Consensus duration / decided-leader / decided-rounds metrics not ported

Charon's runInstance records ObserveConsensusDuration, SetDecidedLeaderIndex, SetDecidedRounds, IncConsensusTimeout, IncConsensusError (charon/core/consensus/qbft/qbft.go:432-606). Pluto plumbs decided_at_tx / take_decided_at_rx in crates/consensus/src/instance.rs:110-205 and writes Instant::now() in runner.rs:279, but nothing consumes the receiver — take_decided_at_rx is unused. Either wire up the metrics (core/consensus/metrics) or remove the dead channel to avoid the appearance of completeness. The dead plumbing has been allocated and tracked since eadb107.

[Medium] Behaviour::handle_broadcast + Handler::pending_open are unbounded queues

crates/consensus/src/qbft/p2p.rs:282,492-538: pending_open: VecDeque<…> in the handler and pending_by_peer: HashMap<PeerId, VecDeque<PendingSend>> in the behaviour grow without backpressure. If a peer connection stays half-open and never accepts streams, every QBFT round adds another entry. Since the number of QBFT rounds is bounded, this is unlikely to bite in practice, but it is worth documenting or capping (e.g. drop the oldest entry after N pending sends per peer, with a warn!). Charon's p2p.Sender.SendAsync has an equivalent shape but is bounded by a per-peer worker queue.
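The "drop oldest after N pending sends" cap suggested above could look roughly like the following. This is a minimal sketch, not code from the PR: BoundedPending, its cap parameter, and the eviction return value are hypothetical names chosen for illustration, and the real queues hold PendingSend values keyed by PeerId.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded queue: keeps at most `cap` pending items and
/// evicts the oldest one when a new item arrives at capacity.
pub struct BoundedPending<T> {
    cap: usize,
    items: VecDeque<T>,
}

impl<T> BoundedPending<T> {
    pub fn new(cap: usize) -> Self {
        assert!(cap > 0, "capacity must be non-zero");
        Self { cap, items: VecDeque::with_capacity(cap) }
    }

    /// Pushes `item` and returns the evicted oldest entry if the queue was
    /// already full. A caller could log the eviction, for example with `warn!`.
    pub fn push(&mut self, item: T) -> Option<T> {
        let evicted = if self.items.len() == self.cap {
            self.items.pop_front()
        } else {
            None
        };
        self.items.push_back(item);
        evicted
    }

    /// Removes and returns the oldest pending item, if any.
    pub fn pop(&mut self) -> Option<T> {
        self.items.pop_front()
    }

    /// Number of items currently pending.
    pub fn len(&self) -> usize {
        self.items.len()
    }
}
```

Returning the evicted item keeps the policy decision (log, count in a metric, or fail the send) with the caller rather than inside the queue.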

[Medium] compare_attestations validity hinges on local value canonicality

crates/consensus/src/qbft/transport.rs:133-144: get_value re-hashes local.value via hash_proto_bytes(&local.value) and inserts under that hash. The doc comment correctly notes "the local producer must pack canonical deterministic bytes." Today this is satisfied because the only writer is runner::propose, via Any::from_msg(&value) (runner.rs:121). If anyone in the future enqueues an inbound-derived Any into value_tx (which currently never happens), the hash would no longer match hash_proto(decoded) and get_value would silently start returning UnknownValue. Consider asserting the invariant at the boundary, or wrapping value_tx so that only a locally encoded Any can be enqueued.

[Medium] decide_callback may drop the decided-at signal silently

crates/consensus/src/qbft/runner.rs:278-282: let _ = decided_at_tx.try_send(Instant::now()); uses try_send on a capacity-1 channel. If for any reason the decide callback fires twice (the core does not, but the closure is Fn not FnOnce), the second value is dropped. Today this is harmless because the receiver isn't consumed (see Medium finding above). Once metrics are wired up, prefer let _ = decided_at_tx.send(...).await from a non-blocking context, or document that exactly-one decision is guaranteed.
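The silent drop can be reproduced in isolation. The sketch below uses std::sync::mpsc::sync_channel as a stand-in, on the assumption that the PR's channel behaves the same way when full; send_twice is a hypothetical helper, and the actual channel type in runner.rs is not shown in this thread.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::time::Instant;

/// Sends two timestamps into a capacity-1 channel without draining it and
/// reports whether each `try_send` was accepted.
pub fn send_twice() -> (bool, bool) {
    // `_rx` is kept alive so the channel is full, not disconnected.
    let (tx, _rx) = sync_channel::<Instant>(1);

    // The single slot is free, so the first send is accepted.
    let first = tx.try_send(Instant::now()).is_ok();

    // The slot is still occupied, so the second send is rejected.
    let second = match tx.try_send(Instant::now()) {
        Ok(()) => true,
        Err(TrySendError::Full(_)) => false,
        Err(TrySendError::Disconnected(_)) => false,
    };

    (first, second)
}
```

With `let _ = ...` in front of the second call, the rejection is discarded and nothing records that a decision timestamp was lost.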

[Medium] definition::new_timer swallows timer construction errors

crates/consensus/src/qbft/definition.rs:281-287 — on round_timer.timer(round) failure, the code logs a warn! and returns a qbft::Timer whose receive channel never fires. The QBFT core would then never advance that round. This affects only extreme inputs (negative or i64::MAX rounds, both rejected by verify_msg for inbound messages). Acceptable, but a tracing::error! plus an instance-cancel would be safer than silently stalling.

[Low] Consensus::handle enqueue race when receiver is closed mid-receive

crates/consensus/src/qbft/component.rs:373-386 — the branch Err(_) if inst.has_started() => Ok(()) correctly treats late messages as drops after consensus completed. Note the has_started() check races with the deadline-cleanup task; if the runner just finished and the cleanup task hasn't run yet, late inbound messages are silently dropped, which is the desired behaviour. Good comment in the source. No change required — calling out as evidence that the recent retain-until-deadline change (233e80e) is correct.

[Low] subscribe / subscribe_priority thread-safety doc drift

crates/consensus/src/qbft/component.rs:319-333 — Charon documents these as "not thread safe, called before Start and Propose." Pluto's SubscriberSet uses Arc<Mutex<…>>, so it is thread-safe — an improvement over Go. Worth updating the doc comment to reflect that subscribers may be registered at any time (the doc currently inherits Charon's stricter contract by omission).

[Low] decode_attestation_data_ssz magic numbers lack a spec reference

crates/core/src/dutydb/memory.rs:18-19,225-251 — the constants ATTESTATION_DATA_SSZ_OFFSET = 8, ATTESTER_DUTY_SSZ_SIZE = 96, and the field offsets 48..96 come from the Charon AttestationData + AttesterDuty SSZ layout. Add a comment pointing at the Go source (charon/core/encode.go / core/attestation.go) so future maintainers can verify against the spec.

[Low] Cargo dependency cleanup is well done

crates/core/Cargo.toml correctly drops futures, k256, libp2p, pluto-k1util, pluto-featureset now that the consensus module has moved out. Nothing else in pluto-core should still pull them in — verifiable via cargo machete (the last commit ran it).

Parity matrix

| Component | Go | Rust | Match | Notes |
|---|---|---|---|---|
| Inbound admission order: verify→duty gate→justifications→values→deadline | yes | yes | yes | component.rs:336-387 vs qbft.go:614-676 |
| Recv buffer capacity (100) | instance.RecvBufferSize | RECV_BUFFER_SIZE | yes | instance.rs:38 |
| Single-slot channels (hash/value/verify/err/decided_at, cap 1) | yes | yes | yes | instance.rs:117-122 |
| leader = (slot + duty_type + round) % nodes | yes | yes | yes | definition.rs:461-476; signed arithmetic via rem_euclid |
| QBFT v2 protocol ID | protocols.QBFTv2ProtocolID | QBFT_V2_PROTOCOL_ID | yes | |
| Inbound stream receive timeout | 5s | 5s | yes | p2p.rs:43 |
| Outbound stream send timeout | 7s | 7s | yes | p2p.rs:45 |
| Skip self on broadcast | yes | yes | yes | p2p.rs:492-507 |
| Sniffer message buffer | append-only | append-only | yes | sniffer.rs |
| Consensus metrics | ported | absent | no | see Medium #2 |
| Round timers (inc / dlinear / linear) | yes | yes | yes | timer.rs |
| compareAttestations source/target equality | yes | yes | yes | definition.rs:167-201 |
| qbft.Run cancellation by decide callback | yes | yes | yes | runner.rs:277-282; instance_ct.cancel() |
| Subscribe / SubscribePriority value-type filtering | yes | yes | yes | component.rs:237-256 |
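
The leader row in the matrix can be illustrated with a short sketch. The function name, parameter names, and i64 types here are assumptions for illustration only; the actual signature in definition.rs is not quoted in this thread.

```rust
/// Sketch of leader selection: (slot + duty_type + round) mod nodes.
/// `rem_euclid` keeps the result in `0..nodes` even when the sum is
/// negative, whereas the `%` operator would return a negative remainder.
pub fn leader(slot: i64, duty_type: i64, round: i64, nodes: i64) -> i64 {
    assert!(nodes > 0, "cluster must have at least one node");
    (slot + duty_type + round).rem_euclid(nodes)
}
```

For example, a sum of -3 with 4 nodes gives leader index 1 under rem_euclid, while `-3 % 4` evaluates to -3 in Rust and could not be used as an index.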

Tests

I could not run cargo +nightly fmt --all --check, cargo clippy ... -D warnings, or cargo test --workspace --all-features from this sandbox (cargo is not on the allowlist). Please verify locally — the in-memory consensus test (qbft_run_test.rs) and the libp2p loopback test in p2p.rs provide strong coverage but rely on tokio::time and may be flaky under high parallelism.

Open questions / assumptions

  • Are consensus metrics intentionally out of scope for this PR? If yes, drop decided_at_tx/take_decided_at_rx to keep the surface honest; if no, file a follow-up.
  • The retain-until-deadline behaviour added in 233e80e is correct, but please confirm there's a follow-up test that explicitly walks the "late inbound message after decision" code path (handle_drops_late_message_after_started_receiver_closed covers the basic case).
  • qbft_run_test should be extended to actually run a sub-quorum cluster — the current test names are misleading.
    · Branch: iamquang95/consensus-qbft

@iamquang95
Collaborator Author

@varex83agent /review-pr

Collaborator

@varex83agent varex83agent left a comment


QBFT consensus port — review

This PR ports Charon's core/consensus/qbft component to a new pluto-consensus crate (moving the existing consensus module out of pluto-core), plus the supporting libp2p adapter, transport, sniffer, round timers, and the dutydb unsigned-data-set SSZ/JSON decoding. Overall this is a high-quality, faithful port with strong test coverage and golden-vector parity tests. Three independent review passes (functional-equivalence vs Charon v1.7.1, security, and concurrency/state-machine) found no correctness bugs.

Verified parity (v1.7.1): leader election (slot+dutyType+round)%nodes, quorum (2n+2)/3, verify_msg/handle validation order and error strings, justification duty-differs check, compare_attester field comparison, hash_proto (SSZ PutBytes over deterministic proto + BTreeMap ordering), increasing/eager-double-linear/proposal timers, sniffer fields & protocol id, dutydb SSZ offsets (8/96), and participate() duty-type/feature gating all match the pinned Go reference.
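
As a quick check of the quorum formula quoted above, here is a minimal sketch. The function name and the usize type are assumptions, not the PR's actual signature.

```rust
/// Quorum as (2n + 2) / 3 with integer division, which equals ceil(2n / 3).
pub fn quorum(nodes: usize) -> usize {
    (2 * nodes + 2) / 3
}
```

Under this formula a 4-node cluster needs 3 votes and a 6-node cluster needs 4, which is the "4 of 6" case named in the earlier test-coverage finding.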

Security: signature verification runs before any expensive work or instance creation; the instances map is gated by the duty gater + deadliner and reaped by the expired-cleanup loop; inbound size/timeout bounds are enforced; SSZ offset/length validation in decode_attestation_data_ssz is sound; no key material reaches logs/Debug; justification recursion is depth-1. No reachable issues found.

Bugs (must-fix): none.

Findings: 3 minor (a block_on-on-blocking-thread robustness/documentation note, error double-stringification that loses the typed source chain, and a test-coverage gap for the attester-compare end-to-end path) and 2 nits, all inline. None block merge — leaving open for author discretion.

Note (out of scope, FYI only): crates/consensus/src/timer.rs linear_subsequent_round_timeout uses from_millis with a comment claiming charon#4537 landed in the reference, but v1.7.1's linearRoundTimer still uses the bare time.Duration(200*(round-1)+200) (nanoseconds). This is pre-existing code relocated by the rename, not introduced here, so it is not flagged inline — worth a follow-up to confirm the deliberate deviation.
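
The size of the unit discrepancy is easy to see in a sketch. Go's time.Duration is an integer count of nanoseconds, so a bare time.Duration(n) means n nanoseconds. The two function names below are hypothetical, and the formula is taken from the note above rather than from timer.rs itself.

```rust
use std::time::Duration;

/// The formula read as milliseconds, as `from_millis` does.
pub fn linear_timeout_millis(round: u64) -> Duration {
    Duration::from_millis(200 * round.saturating_sub(1) + 200)
}

/// The same formula read as nanoseconds, matching a bare Go
/// `time.Duration(200*(round-1)+200)`.
pub fn linear_timeout_nanos(round: u64) -> Duration {
    Duration::from_nanos(200 * round.saturating_sub(1) + 200)
}
```

For round 3 the first reading gives 600 ms and the second 600 ns, a factor of one million, so the two implementations would behave very differently if the deviation is not deliberate.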

let transport_error = Arc::clone(&transport_error);
move |request| {
    let justification = request.justification.cloned().unwrap_or_default();
    let result = runtime.block_on(transport.broadcast(transport::BroadcastRequest {

The core broadcast closure runs on the spawn_blocking thread and calls runtime.block_on(transport.broadcast(...)). This is sound on a multi-threaded Tokio runtime (the call is not on a runtime worker), but it parks one blocking-pool thread for the whole broadcast — which itself awaits the external Broadcaster and tokio::spawns the detached self-send task. On a current_thread runtime those spawned tasks would never make progress while this thread is parked, and a saturated blocking pool would serialize instances. Worth documenting the multi-thread-runtime requirement on Consensus/run_instance (or driving the broadcast from the async side instead of block_on).

fn to_runner_result(result: &Result<()>) -> RunnerResult {
    match result {
        Ok(()) => Ok(()),
        Err(err) => Err(Box::new(RunnerResultError(err.to_string())) as RunnerError),

to_runner_result stringifies the typed runner::Error into RunnerResultError(err.to_string()), and wait_instance_result (runner.rs:393) stringifies it again into Error::RunnerResult(err.to_string()). The structured error and its #[source] chain (e.g. Core(QbftError), Transport(..)) are flattened to a bare String before any joiner sees it, so source() and matches! on the joined error are impossible. RunnerError = Box<dyn StdError + Send + Sync> can already carry the typed error; consider preserving one level of structure (e.g. RunnerResult(Box<dyn StdError ...>)) instead of String.
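
The difference between flattening and preserving the error can be shown with the standard library alone. Everything in this sketch is a hypothetical stand-in: CoreError plays the role of QbftError, RunnerErr the role of runner::Error, and Flattened the role of RunnerResultError. It is not the PR's code.

```rust
use std::error::Error as StdError;
use std::fmt;

/// Hypothetical stand-in for the inner QBFT core error.
#[derive(Debug)]
pub struct CoreError;

impl fmt::Display for CoreError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "core failure")
    }
}

impl StdError for CoreError {}

/// Hypothetical stand-in for the typed runner error with a source chain.
#[derive(Debug)]
pub enum RunnerErr {
    Core(CoreError),
}

impl fmt::Display for RunnerErr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "runner failed")
    }
}

impl StdError for RunnerErr {
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        match self {
            RunnerErr::Core(inner) => Some(inner),
        }
    }
}

/// Hypothetical stand-in for the string-only wrapper.
#[derive(Debug)]
pub struct Flattened(pub String);

impl fmt::Display for Flattened {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl StdError for Flattened {}

pub type BoxedError = Box<dyn StdError + Send + Sync>;

/// Stringifies the error: the source chain and the concrete type are lost.
pub fn flatten(err: RunnerErr) -> BoxedError {
    Box::new(Flattened(err.to_string()))
}

/// Boxes the typed error: `source()` and downcasting keep working.
pub fn preserve(err: RunnerErr) -> BoxedError {
    Box::new(err)
}

/// Returns the display text of the immediate source, if there is one.
pub fn source_text(err: &BoxedError) -> Option<String> {
    err.source().map(|s| s.to_string())
}

/// Reports whether the boxed error is still the typed runner error.
pub fn is_typed(err: &BoxedError) -> bool {
    err.downcast_ref::<RunnerErr>().is_some()
}
```

After flatten, source_text returns None and is_typed returns false; after preserve, both the source and the concrete variant remain reachable by a joiner.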

.expect("test peer index fits u8"),
),
broadcaster,
compare_attestations: false,

The multi-node integration test always uses compare_attestations: false, so the attester-compare path (definition.rs::compare_attester, the verify_rx to core_verify_rx bridge, and the cross-thread local_compare_value cancellation-token wait) is only exercised by definition.rs unit tests that drive compare directly. The end-to-end handshake during a real decided round (a blocking callback inside the consensus loop) is never validated together. Consider an integration variant with compare_attestations: true and Attester duties carrying real SSZ attestation data to confirm there is no deadlock.


/// Returns true once this instance's runner has been started.
pub fn has_started(&self) -> bool {
    self.running.load(Ordering::Relaxed)

nit: has_started() loads running with Relaxed and is consulted (component.rs / runner.rs) after observing the recv/value channel is closed. Because the flag and the channel are independent and Relaxed, observing channel-closed does not establish happens-before with the running=true store, so this may legally read a stale false. It is sound — worst case is a benign fallback (InputChannelFull/TimeoutEnqueuingReceiveBuffer instead of swallowing a late message) — but the doc comment states the flags "do not publish or synchronize," while these read sites do lean on the flag to interpret a channel event. A one-line note (or Acquire/Release if you want the stronger guarantee) would clarify.
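
If the stronger guarantee is wanted, the Acquire/Release variant mentioned above might be sketched as follows. InstanceFlags and mark_started are hypothetical names; only has_started and the running flag appear in the PR excerpt, and whether the stronger ordering is actually needed depends on how the store is ordered relative to the channel close in the real code.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical holder for the instance's `running` flag.
pub struct InstanceFlags {
    running: AtomicBool,
}

impl InstanceFlags {
    pub const fn new() -> Self {
        Self { running: AtomicBool::new(false) }
    }

    /// Release store: writes made by this thread before the store become
    /// visible to any thread whose Acquire load observes `true`.
    pub fn mark_started(&self) {
        self.running.store(true, Ordering::Release);
    }

    /// Acquire load: pairs with the Release store in `mark_started`.
    pub fn has_started(&self) -> bool {
        self.running.load(Ordering::Acquire)
    }
}
```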

Comment thread crates/consensus/src/qbft/p2p.rs Outdated

match result {
    Ok(Ok(())) => Ok(()),
    Ok(Err(error)) => Err(error.to_string()),

nit: the inner future returns Result<(), String>, so in this Ok(Err(error)) arm error is already a String; Err(error.to_string()) performs a needless clone. Use Err(error) directly.

@iamquang95 iamquang95 marked this pull request as ready for review June 2, 2026 10:12
/// network fanout is best-effort and is not cancelled by this token.
pub async fn broadcast(
    &self,
    _ct: CancellationToken,

can we drop it?

Collaborator

@emlautarom1 emlautarom1 left a comment


Could you provide some guidance on this PR? At over 6k LOC it's hard to figure out what its scope is.

@iamquang95
Collaborator Author

@emlautarom1 I updated the PR description to explain the structure of the PR. I also had to move consensus out of core to avoid a cyclic dependency.


Development

Successfully merging this pull request may close these issues.

Implement core/consensus/qbft

4 participants