Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 90 additions & 5 deletions src/tasks/cache/bundle.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
//! Bundler service responsible for fetching bundles and sending them to the simulator.
use crate::config::BuilderConfig;
use alloy::providers::Provider;
use futures_util::{TryStreamExt, stream};
use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError};
use signet_tx_cache::{TxCacheError, types::CachedBundle};
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
task::JoinHandle,
time::{self, Duration},
};
use tracing::{Instrument, error, trace, trace_span};
use tracing::{Instrument, debug, debug_span, error, trace, trace_span};

/// Poll interval for the bundle poller in milliseconds.
const POLL_INTERVAL_MS: u64 = 1000;
Expand Down Expand Up @@ -71,6 +73,92 @@ impl BundlePoller {
}
}

/// Spawns a tokio task to check the nonces of all host transactions in a bundle
/// before sending it to the cache task via the outbound channel.
///
/// Uses the bundle's `host_tx_reqs()` to extract signer/nonce requirements
/// (reusing the existing validity check pattern from `signet-sim`), then checks
/// all host tx nonces concurrently via [`FuturesUnordered`], cancelling early
/// on the first stale or failed nonce.
///
/// [`FuturesUnordered`]: futures_util::stream::FuturesUnordered
fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender<CachedBundle>) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should re-use existing validity checks from crates/sim/src/cache/item.rs

tokio::spawn(async move {
let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id);

// Recover the bundle to get typed host tx requirements instead of
// manually decoding and recovering signers.
let recovered = match bundle.bundle.try_to_recovered() {
Ok(r) => r,
Err(e) => {
span_debug!(span, ?e, "Failed to recover bundle, dropping");
return;
}
};

// If no host transactions, forward directly
if recovered.host_txs().is_empty() {
if outbound.send(bundle).is_err() {
span_debug!(span, "Outbound channel closed, stopping nonce check task");
}
return;
}

let Ok(host_provider) =
crate::config().connect_host_provider().instrument(span.clone()).await
else {
span_debug!(span, "Failed to connect to host provider, stopping nonce check task");
return;
};

// Collect host tx requirements (signer + nonce) from the recovered bundle
let reqs: Vec<_> = recovered.host_tx_reqs().enumerate().collect();

// Check all host tx nonces concurrently, cancelling on first failure.
let result = stream::iter(reqs)
.map(Ok)
.try_for_each_concurrent(None, |(idx, req)| {
let host_provider = &host_provider;
let span = &span;
async move {
let tx_count = host_provider
.get_transaction_count(req.signer)
.await
.map_err(|_| {
span_debug!(
span,
idx,
sender = %req.signer,
"Failed to fetch nonce for sender, dropping bundle"
);
})?;

if req.nonce < tx_count {
debug!(
parent: span,
sender = %req.signer,
tx_nonce = %req.nonce,
host_nonce = %tx_count,
idx,
"Dropping bundle with stale host tx nonce"
);
return Err(());
}

Ok(())
}
})
.await;

// All host txs have valid nonces, forward the bundle
if result.is_ok() {
if outbound.send(bundle).is_err() {
span_debug!(span, "Outbound channel closed, stopping nonce check task");
}
}
});
}

async fn task_future(self, outbound: UnboundedSender<CachedBundle>) {
loop {
let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url);
Expand All @@ -89,10 +177,7 @@ impl BundlePoller {

if let Ok(bundles) = self.check_bundle_cache().instrument(span.clone()).await {
for bundle in bundles.into_iter() {
if let Err(err) = outbound.send(bundle) {
span_debug!(span, ?err, "Failed to send bundle - channel is dropped");
break;
}
Self::spawn_check_bundle_nonces(bundle, outbound.clone());
}
}

Expand Down