feat(consensus): implement consensus/qbft #463
@claude review this PR
Claude finished @iamquang95's task in 7m 43s
Code review: feat(consensus): implement consensus/qbft
Summary
Substantial port of Charon's
Findings
[High]
| Component | Go | Rust | Match | Notes |
|---|---|---|---|---|
| Inbound admission order: verify→duty gate→justifications→values→deadline | yes | yes | yes | component.rs:336-387 vs qbft.go:614-676 |
| Recv buffer capacity (100) | `instance.RecvBufferSize` | `RECV_BUFFER_SIZE` | yes | instance.rs:38 |
| Single-slot channels (hash/value/verify/err/decided_at, cap 1) | yes | yes | yes | instance.rs:117-122 |
| `leader = (slot + duty_type + round) % nodes` | yes | yes | yes | definition.rs:461-476; signed arithmetic via `rem_euclid` |
| QBFT v2 protocol ID | `protocols.QBFTv2ProtocolID` | `QBFT_V2_PROTOCOL_ID` | yes | |
| Inbound stream receive timeout | 5s | 5s | yes | p2p.rs:43 |
| Outbound stream send timeout | 7s | 7s | yes | p2p.rs:45 |
| Skip self on broadcast | yes | yes | yes | p2p.rs:492-507 |
| Sniffer message buffer | append-only | append-only | yes | sniffer.rs |
| Consensus metrics | ported | absent | no | see Medium #2 |
| Round timers (inc / dlinear / linear) | yes | yes | yes | timer.rs |
| `compareAttestations` source/target equality | yes | yes | yes | definition.rs:167-201 |
| `qbft.Run` cancellation by decide callback | yes | yes | yes | runner.rs:277-282; `instance_ct.cancel()` |
| `Subscribe` / `SubscribePriority` value-type filtering | yes | yes | yes | component.rs:237-256 |
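The leader row above can be illustrated with a minimal sketch. This is not the code in definition.rs: the function name `leader_index` and the `i64` parameter types are assumptions made for illustration, and only the formula and the use of `rem_euclid` come from the table.

```rust
/// Hypothetical helper (not the PR's code): picks the leader's peer index
/// using `leader = (slot + duty_type + round) % nodes` from the table.
fn leader_index(slot: i64, duty_type: i64, round: i64, nodes: i64) -> i64 {
    // `rem_euclid` keeps the result in `0..nodes` even when the sum is
    // negative, whereas the `%` operator would yield a negative remainder.
    (slot + duty_type + round).rem_euclid(nodes)
}
```

For example, `leader_index(10, 2, 1, 4)` evaluates `13 mod 4`, and a negative sum such as `leader_index(-1, 0, 0, 4)` still lands on a valid index, which is the property the "signed arithmetic" note refers to.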
Tests
I could not run cargo +nightly fmt --all --check, cargo clippy ... -D warnings, or cargo test --workspace --all-features from this sandbox (cargo is not on the allowlist). Please verify locally — the in-memory consensus test (qbft_run_test.rs) and the libp2p loopback test in p2p.rs provide strong coverage but rely on tokio::time and may be flaky under high parallelism.
Open questions / assumptions
- Are consensus metrics intentionally out of scope for this PR? If yes, drop `decided_at_tx`/`take_decided_at_rx` to keep the surface honest; if no, file a follow-up.
- The retain-until-deadline behaviour added in `233e80e` is correct, but please confirm there's a follow-up test that explicitly walks the "late inbound message after decision" code path (`handle_drops_late_message_after_started_receiver_closed` covers the basic case).
- `qbft_run_test` should be extended to actually run a sub-quorum cluster; the current test names are misleading.
Branch: iamquang95/consensus-qbft
@varex83agent /review-pr
varex83agent left a comment
QBFT consensus port — review
This PR ports Charon's core/consensus/qbft component to a new pluto-consensus crate (moving the existing consensus module out of pluto-core), plus the supporting libp2p adapter, transport, sniffer, round timers, and the dutydb unsigned-data-set SSZ/JSON decoding. Overall this is a high-quality, faithful port with strong test coverage and golden-vector parity tests. Three independent review passes (functional-equivalence vs Charon v1.7.1, security, and concurrency/state-machine) found no correctness bugs.
Verified parity (v1.7.1): leader election (slot+dutyType+round)%nodes, quorum (2n+2)/3, verify_msg/handle validation order and error strings, justification duty-differs check, compare_attester field comparison, hash_proto (SSZ PutBytes over deterministic proto + BTreeMap ordering), increasing/eager-double-linear/proposal timers, sniffer fields & protocol id, dutydb SSZ offsets (8/96), and participate() duty-type/feature gating all match the pinned Go reference.
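As a quick check of the quorum expression quoted above, the sketch below compares the integer form `(2n+2)/3` with a floating-point ceiling of `2n/3`. Both function names are hypothetical and neither is taken from the PR or from Charon; the point is only that the two forms agree for small cluster sizes.

```rust
/// Hypothetical helper: quorum size using integer arithmetic only.
fn quorum(nodes: usize) -> usize {
    (2 * nodes + 2) / 3
}

/// Hypothetical reference form: ceil(2n / 3) computed in floating point.
fn quorum_ceil(nodes: usize) -> usize {
    ((2 * nodes) as f64 / 3.0).ceil() as usize
}
```

With four nodes both forms give a quorum of three, and with seven nodes a quorum of five.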
Security: signature verification runs before any expensive work or instance creation; the instances map is gated by the duty gater + deadliner and reaped by the expired-cleanup loop; inbound size/timeout bounds are enforced; SSZ offset/length validation in decode_attestation_data_ssz is sound; no key material reaches logs/Debug; justification recursion is depth-1. No reachable issues found.
Bugs (must-fix): none.
Findings: 3 minor (a block_on-on-blocking-thread robustness/documentation note, error double-stringification that loses the typed source chain, and a test-coverage gap for the attester-compare end-to-end path) and 2 nits, all inline. None block merge — leaving open for author discretion.
Note (out of scope, FYI only): crates/consensus/src/timer.rs linear_subsequent_round_timeout uses from_millis with a comment claiming charon#4537 landed in the reference, but v1.7.1's linearRoundTimer still uses the bare time.Duration(200*(round-1)+200) (nanoseconds). This is pre-existing code relocated by the rename, not introduced here, so it is not flagged inline — worth a follow-up to confirm the deliberate deviation.
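The unit mismatch described in this note can be made concrete with a small sketch. The function names are hypothetical and this is not the code in timer.rs; it only shows how the same raw number `200*(round-1)+200` differs by a factor of one million depending on whether it is read as nanoseconds (a bare Go `time.Duration(n)`) or passed to `Duration::from_millis`.

```rust
use std::time::Duration;

/// Raw value of the quoted expression 200*(round-1)+200.
/// Rounds are assumed to start at 1; `saturating_sub` only guards round 0.
fn linear_raw(round: u64) -> u64 {
    200 * round.saturating_sub(1) + 200
}

/// Reads the raw value as nanoseconds, as a bare Go `time.Duration(n)` would.
fn linear_as_nanos(round: u64) -> Duration {
    Duration::from_nanos(linear_raw(round))
}

/// Reads the raw value as milliseconds, as `from_millis` does.
fn linear_as_millis(round: u64) -> Duration {
    Duration::from_millis(linear_raw(round))
}
```

For round 3 the raw value is 600, so the two readings are 600 ns and 600 ms respectively.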
    let transport_error = Arc::clone(&transport_error);
    move |request| {
        let justification = request.justification.cloned().unwrap_or_default();
        let result = runtime.block_on(transport.broadcast(transport::BroadcastRequest {
The core broadcast closure runs on the spawn_blocking thread and calls runtime.block_on(transport.broadcast(...)). This is sound on a multi-threaded Tokio runtime (the call is not on a runtime worker), but it parks one blocking-pool thread for the whole broadcast — which itself awaits the external Broadcaster and tokio::spawns the detached self-send task. On a current_thread runtime those spawned tasks would never make progress while this thread is parked, and a saturated blocking pool would serialize instances. Worth documenting the multi-thread-runtime requirement on Consensus/run_instance (or driving the broadcast from the async side instead of block_on).
    fn to_runner_result(result: &Result<()>) -> RunnerResult {
        match result {
            Ok(()) => Ok(()),
            Err(err) => Err(Box::new(RunnerResultError(err.to_string())) as RunnerError),
to_runner_result stringifies the typed runner::Error into RunnerResultError(err.to_string()), and wait_instance_result (runner.rs:393) stringifies it again into Error::RunnerResult(err.to_string()). The structured error and its #[source] chain (e.g. Core(QbftError), Transport(..)) are flattened to a bare String before any joiner sees it, so source() and matches! on the joined error are impossible. RunnerError = Box<dyn StdError + Send + Sync> can already carry the typed error; consider preserving one level of structure (e.g. RunnerResult(Box<dyn StdError ...>)) instead of String.
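To illustrate the difference, here is a self-contained sketch using only `std`. All type and function names (`CoreError`, `RunnerFailure`, `Flattened`, `flatten`, `preserve`, `source_message`) are hypothetical stand-ins, not the PR's types; the only thing borrowed from the comment is the shape `Box<dyn StdError + Send + Sync>`.

```rust
use std::error::Error as StdError;
use std::fmt;

/// Hypothetical inner error, standing in for something like a core QBFT error.
#[derive(Debug)]
struct CoreError;

impl fmt::Display for CoreError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "core failure")
    }
}

impl StdError for CoreError {}

/// Hypothetical typed runner error that exposes its cause via `source()`.
#[derive(Debug)]
struct RunnerFailure {
    source: CoreError,
}

impl fmt::Display for RunnerFailure {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "runner failed")
    }
}

impl StdError for RunnerFailure {
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        Some(&self.source)
    }
}

/// String-only wrapper, analogous to stringifying the error.
#[derive(Debug)]
struct Flattened(String);

impl fmt::Display for Flattened {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.0)
    }
}

impl StdError for Flattened {}

type RunnerError = Box<dyn StdError + Send + Sync>;

/// Stringifies first: the message survives, the source chain does not.
fn flatten(err: RunnerFailure) -> RunnerError {
    Box::new(Flattened(err.to_string()))
}

/// Boxes the typed error directly: message and source chain both survive.
fn preserve(err: RunnerFailure) -> RunnerError {
    Box::new(err)
}

/// Returns the message of the immediate cause, if any.
fn source_message(err: &RunnerError) -> Option<String> {
    err.source().map(|cause| cause.to_string())
}
```

Both `flatten` and `preserve` yield the same top-level message, but only the boxed typed error still answers `source()` and can be recovered with `downcast_ref::<RunnerFailure>()`.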
    .expect("test peer index fits u8"),
    ),
    broadcaster,
    compare_attestations: false,
The multi-node integration test always uses compare_attestations: false, so the attester-compare path — definition.rs::compare_attester, the verify_rx → core_verify_rx bridge, and the cross-thread local_compare_value cancellation-token wait — is only exercised by definition.rs unit tests that drive compare directly. The end-to-end handshake during a real decided round (a blocking callback inside the consensus loop) is never validated together. Consider an integration variant with compare_attestations: true and Attester duties carrying real SSZ attestation data to confirm no deadlock.
    /// Returns true once this instance's runner has been started.
    pub fn has_started(&self) -> bool {
        self.running.load(Ordering::Relaxed)
nit: has_started() loads running with Relaxed and is consulted (component.rs / runner.rs) after observing the recv/value channel is closed. Because the flag and the channel are independent and Relaxed, observing channel-closed does not establish happens-before with the running=true store, so this may legally read a stale false. It is sound — worst case is a benign fallback (InputChannelFull/TimeoutEnqueuingReceiveBuffer instead of swallowing a late message) — but the doc comment states the flags "do not publish or synchronize," while these read sites do lean on the flag to interpret a channel event. A one-line note (or Acquire/Release if you want the stronger guarantee) would clarify.
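For reference, a minimal sketch of what an Acquire/Release pairing guarantees in general. `RunFlag`, `payload`, and `publish_and_observe` are hypothetical names and this is not the instance code from the PR. The guarantee shown is narrow: once an Acquire load observes `true`, writes made before the matching Release store are visible. It does not by itself tie the flag to a separate channel event.

```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for an instance with a `running` flag.
struct RunFlag {
    running: AtomicBool,
    payload: AtomicU32,
}

impl RunFlag {
    fn new() -> Self {
        Self {
            running: AtomicBool::new(false),
            payload: AtomicU32::new(0),
        }
    }

    /// Writes the payload, then publishes it with a Release store.
    fn mark_started(&self, value: u32) {
        self.payload.store(value, Ordering::Relaxed);
        self.running.store(true, Ordering::Release);
    }

    /// Acquire load: if this returns true, the payload write is visible.
    fn has_started(&self) -> bool {
        self.running.load(Ordering::Acquire)
    }
}

/// Spawns a writer thread and spins until the flag is observed.
fn publish_and_observe() -> u32 {
    let flag = Arc::new(RunFlag::new());
    let writer = {
        let flag = Arc::clone(&flag);
        thread::spawn(move || flag.mark_started(42))
    };
    while !flag.has_started() {
        std::hint::spin_loop();
    }
    // Safe to read with Relaxed: the Acquire load above synchronized
    // with the Release store in `mark_started`.
    let seen = flag.payload.load(Ordering::Relaxed);
    writer.join().expect("writer thread panicked");
    seen
}
```

With `Relaxed` on both sides the flag would still eventually become visible, but the payload read after it would carry no ordering guarantee.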
    match result {
        Ok(Ok(())) => Ok(()),
        Ok(Err(error)) => Err(error.to_string()),
nit: the inner future returns Result<(), String>, so in this Ok(Err(error)) arm error is already a String; Err(error.to_string()) performs a needless clone. Use Err(error) directly.
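A small sketch of the suggested form, under the assumption that the outer error is also a `String` (the real outer error type is not shown in the diff, so `flatten_result` and its signature are hypothetical):

```rust
/// Hypothetical helper: flattens a nested result whose inner error
/// is already a `String`.
fn flatten_result(result: Result<Result<(), String>, String>) -> Result<(), String> {
    match result {
        Ok(Ok(())) => Ok(()),
        // `error` is already an owned String, so it is moved, not cloned.
        Ok(Err(error)) => Err(error),
        Err(outer) => Err(outer),
    }
}
```

Moving the `String` avoids the extra allocation that `error.to_string()` performs on an already-owned `String`.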
    /// network fanout is best-effort and is not cancelled by this token.
    pub async fn broadcast(
        &self,
        _ct: CancellationToken,
emlautarom1 left a comment
Could you provide some guidance on this PR? At over 6k LOC, it's hard to figure out what its scope is.
@emlautarom1 I updated the PR description with the structure of the PR. Also, I had to move consensus out of core to avoid a cyclic dependency.
Fix: #157
This PR ports Charon's `core/consensus/qbft` wrapper into Rust, but splits the large Go `qbft.go` into smaller files by responsibility. The existing `pluto_core::qbft` algorithm stays as the generic QBFT engine; this PR adds the production consensus layer around it: admission, value handling, runner bridge, p2p transport, decision dispatch, and tests/example.

Detailed Scope / Go Mapping

`charon/core/consensus/qbft/qbft.go` is split mainly into:
- `component.rs`: `Consensus` struct, `NewConsensus`, `Subscribe`, `Start`, `handle`, message admission, peer/pubkey lookup, instance IO map.
- `runner.rs`: `Propose`, `ProposePriority`, `Participate`, internal `propose`, `runInstance`, async-to-core bridge, cancellation/teardown.
- `definition.rs`: `newDefinition`, leader selection, decide callback, compare callback, round-change logging helpers, timeout reason logic.
- `transport.rs`: adapter between generic core QBFT broadcasts and consensus `QBFTConsensusMsg`; value cache, self-send, sniffer integration.
- `p2p.rs`: Rust libp2p protocol/handler/behaviour for QBFT message send/receive.

Tests cover: