feat: pre-warming cache to build Keyset for all the transactions #94
Conversation
```rust
/// Workers simulate transactions, extract keys, and merge into PreWarmedCache.
pub struct SimulationWorkerPool<T> {
    /// Sender for submitting simulation jobs (clone-able, cheap)
    sender: mpsc::UnboundedSender<SimulationRequest<T>>,
```
[suggest] Use a bounded channel. Consider logging a warning when the channel is full, or adding a metric to count blocked/dropped sends.
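A minimal sketch of the suggestion above. The PR's channel would be a tokio bounded `mpsc::channel(capacity)`; here `std::sync::mpsc::sync_channel` stands in so the sketch is self-contained, and the warn/metric handling is illustrative, not the PR's actual code:

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Submit `jobs` into a bounded channel of `capacity`; returns (delivered, dropped).
/// Stands in for tokio's bounded `mpsc::channel(capacity)` plus `try_send`.
fn submit_jobs(jobs: u32, capacity: usize) -> (Vec<u32>, u64) {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let mut dropped = 0u64;

    for job in 0..jobs {
        match tx.try_send(job) {
            Ok(()) => {}
            Err(TrySendError::Full(_)) => {
                // Instead of blocking the caller, warn and count the drop.
                eprintln!("warn: simulation channel full, dropping job {job}");
                dropped += 1;
            }
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    drop(tx); // close the channel so the receiver drains and stops
    (rx.iter().collect(), dropped)
}

fn main() {
    let (delivered, dropped) = submit_jobs(4, 2);
    println!("delivered={delivered:?} dropped={dropped}");
}
```

With capacity 2 and 4 jobs, the last two sends fail fast instead of blocking the hot submission path, which is the point of `try_send` on a bounded channel.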
```rust
let keys = dummy_simulate(&req.transaction);

// Merge into cache (thread-safe)
cache.merge_keys(keys);
```
The current approach might have some issues:
- The cache might be cleared before the next block building?
- Over-fetched key set: the cache holds keys for all pending txs, but the next block might only need a subset.

Another direction: track the access list of every tx. Later, during block building, merge all selected txs' access lists for prefetching. Once a new block is mined, remove the mined txs' access lists from the cache.

We can keep your current design first; let's run some stress tests to analyze the cache-miss rate.

See https://github.com/okx/reth/blob/dev/crates/engine/tree/src/tree/payload_processor/prewarm.rs#L720 for how pre-fetching of BAL slots is done.
Adding a simple accessList for transactions. A transaction's accessList entry and its cached data are cleared as soon as the transaction is included in the built block.
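The access-list direction discussed above can be sketched as follows. All names (`AccessListTracker`, the 32/20-byte aliases) are illustrative stand-ins, not the PR's actual types:

```rust
use std::collections::{HashMap, HashSet};

type TxHash = [u8; 32];
type Address = [u8; 20];

/// Per-transaction access lists, as proposed in the review discussion.
#[derive(Default)]
struct AccessListTracker {
    per_tx: HashMap<TxHash, HashSet<Address>>,
}

impl AccessListTracker {
    /// Record the access list extracted when the tx was simulated.
    fn insert(&mut self, tx: TxHash, accessed: HashSet<Address>) {
        self.per_tx.insert(tx, accessed);
    }

    /// During block building: merge the access lists of the selected txs
    /// into one deduplicated key set to prefetch.
    fn merged_for(&self, selected: &[TxHash]) -> HashSet<Address> {
        let mut merged = HashSet::new();
        for tx in selected {
            if let Some(keys) = self.per_tx.get(tx) {
                merged.extend(keys.iter().copied());
            }
        }
        merged
    }

    /// Once a block is mined, drop the mined txs' entries.
    fn remove_mined(&mut self, mined: &[TxHash]) {
        for tx in mined {
            self.per_tx.remove(tx);
        }
    }
}

fn main() {
    let mut tracker = AccessListTracker::default();
    tracker.insert([1u8; 32], HashSet::from([[9u8; 20]]));
    tracker.insert([2u8; 32], HashSet::from([[9u8; 20], [8u8; 20]]));
    // Two txs share an address, so the merged set has 2 entries, not 3.
    let merged = tracker.merged_for(&[[1u8; 32], [2u8; 32]]);
    println!("prefetching {} unique addresses", merged.len());
    tracker.remove_mined(&[[1u8; 32]]);
}
```

This avoids over-fetching: only the selected txs' keys are merged, and mined txs are evicted individually rather than clearing the whole cache.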
```rust
let cache = Arc::clone(&cache);
let config = config.clone();

let handle = std::thread::spawn(move || {
```
Use the more lightweight tokio::spawn(async move {}); refer to the existing WorkloadExecutor in https://github.com/okx/reth/blob/dev/crates/engine/tree/src/tree/payload_processor/executor.rs#L14
in the midst of migrating this to tokio::spawn
```rust
);

// Simulate transaction (dummy for now - Phase 4 will add real EVM)
let keys = dummy_simulate(&req.transaction);
```
this is in-progress, will be pushed today
```rust
let simulation_timeout = config.simulation_timeout; // Get from config
let keys = match tokio::time::timeout(
    simulation_timeout, // <-- ENFORCED HERE
    tokio::task::spawn_blocking({
        let simulator = simulator.clone();
        let tx = req.transaction.clone();
        move || simulate_transaction_sync(&simulator, &tx)
    }),
)
.await
{
    Ok(Ok(Ok(keys))) => keys,                               // Success
    Ok(Ok(Err(_e))) => dummy_simulate(&req.transaction),    // Simulation error
    Ok(Err(_join_err)) => dummy_simulate(&req.transaction), // Task panicked
    Err(_timeout) => {
        // <-- TIMEOUT TRIGGERED
        warn!("Simulation timed out, using fallback");
        dummy_simulate(&req.transaction)
    }
};
```

tokio::time::timeout(duration, future):
- Future completes within duration → Ok(result)
- Duration exceeded → Err(Elapsed) → fallback to dummy_simulate()
- Config default: 100ms (from config.rs line 64)
- Summary: the timeout IS enforced via the tokio::time::timeout() wrapper. If the simulation exceeds config.simulation_timeout, the match hits Err(_timeout) and falls back to dummy_simulate().
@cliff0412 added enhancement in the worker_pool for simulate timeout
…recv to avoid blocking call on recv-channel for simulation-requests
```rust
///
/// Note: This doesn't interrupt ongoing simulations - they continue with the old snapshot.
/// Only new simulations will use the updated snapshot.
pub fn update_snapshot(&mut self, new_snapshot: Arc<SnapshotState>) {
```
```rust
pub fn update_snapshot(&self, new_snapshot: Arc<SnapshotState>) {
    *self.snapshot_holder.write() = new_snapshot;
}
```

Called by: pool/mod.rs::update_pre_warming_snapshot()
When: A new block arrives and state changes; simulations need a fresh snapshot.

Why &self, not &mut self: RwLock gives interior mutability, so no exclusive access to the struct is needed.

How workers see the update:
```mermaid
sequenceDiagram
    participant Caller as on_canonical_state_change()<br/>pool/mod.rs
    participant Pool as update_pre_warming_snapshot()<br/>pool/mod.rs
    participant WP as update_snapshot()<br/>worker_pool.rs
    participant Holder as snapshot_holder<br/>RwLock
    participant Worker as worker_loop()<br/>worker_pool.rs
    Caller->>Pool: update_pre_warming_snapshot(snapshot)
    Pool->>WP: wp.update_snapshot(snapshot)
    WP->>Holder: .write() = new_snapshot
    Note over Holder: Now holds Block N+1
    Worker->>Holder: .read().clone()
    Holder-->>Worker: Arc<SnapshotState>
    Worker->>Worker: Simulator::new(snapshot, chain_spec)
```
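The interior-mutability pattern discussed here can be sketched with std's `RwLock` (the PR's lock appears to be parking_lot-style, without `.unwrap()`; `SnapshotState` is reduced to a block number for illustration):

```rust
use std::sync::{Arc, RwLock};

/// Stand-in for the PR's SnapshotState; only the block number matters here.
struct SnapshotState {
    block_number: u64,
}

struct WorkerPool {
    snapshot_holder: RwLock<Arc<SnapshotState>>,
}

impl WorkerPool {
    /// `&self` suffices: RwLock provides interior mutability, so swapping
    /// the snapshot needs no exclusive access to the struct.
    fn update_snapshot(&self, new_snapshot: Arc<SnapshotState>) {
        *self.snapshot_holder.write().unwrap() = new_snapshot;
    }

    /// What a worker does per job: take a cheap Arc clone of the current
    /// snapshot. In-flight simulations keep their old Arc untouched.
    fn current(&self) -> Arc<SnapshotState> {
        self.snapshot_holder.read().unwrap().clone()
    }
}

fn main() {
    let pool = WorkerPool {
        snapshot_holder: RwLock::new(Arc::new(SnapshotState { block_number: 1 })),
    };
    let old = pool.current(); // a worker grabbed block 1
    pool.update_snapshot(Arc::new(SnapshotState { block_number: 2 }));
    // The in-flight worker still holds block 1; new jobs see block 2.
    println!("worker kept {}, new jobs see {}", old.block_number, pool.current().block_number);
}
```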
Not wired yet. on_canonical_state_change() needs to create SnapshotState from StateProvider and call update_pre_warming_snapshot().
```rust
/// Updates the snapshot used for simulation when a new block arrives.
///
/// This should be called whenever the chain state changes to ensure simulations
/// are performed against current state.
///
/// TODO: Wire this up - call from on_canonical_state_change() with fresh SnapshotState
/// created from StateProvider.
#[cfg(feature = "pre-warming")]
pub fn update_pre_warming_snapshot(
    &self,
    snapshot: std::sync::Arc<crate::pre_warming::SnapshotState>,
) {
    if let Some(wp) = &self.worker_pool {
        wp.update_snapshot(snapshot);
    }
}
```

To be called from this existing function in src/pool/mod.rs. That function has been enhanced to also clear, from the simulator's cache, transactions that were queued for simulation:
```rust
/// Updates the entire pool after a new block was executed.
pub fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
where
    B: Block,
{
    trace!(target: "txpool", ?update, "updating pool on canonical state change");
    let block_info = update.block_info();
    let CanonicalStateUpdate {
        new_tip, changed_accounts, mined_transactions, update_kind, ..
    } = update;
    self.validator.on_new_head_block(new_tip);

    // Notify pre-warming cache BEFORE passing mined_transactions to the pool.
    // This avoids cloning mined_transactions.
    self.notify_txs_removed(&mined_transactions);

    let changed_senders = self.changed_senders(changed_accounts.into_iter());

    // Update the pool (takes ownership of mined_transactions).
    let outcome = self.pool.write().on_canonical_state_change(
        block_info,
        mined_transactions,
        changed_senders,
        update_kind,
    );

    // This will discard outdated transactions based on the account's nonce.
    self.delete_discarded_blobs(outcome.discarded.iter());

    // Notify listeners about updates.
    self.notify_on_new_state(outcome);
}
```
@cliff0412 added details on where and how update_snapshot is used
…hich is eventually to be called by pool
Our node is based on the Optimism node, so you need to wire in the "pre-warming" feature for this binary at build time.
Pre-Warming Cache Implementation for X-Layer
TL;DR
This PR implements a background transaction simulation system that extracts state keys (accounts, storage slots, bytecode) from pending transactions before block building starts. These keys are then used to batch-prefetch data from MDBX, pre-populating the execution cache so transactions execute with near-100% cache hits instead of sequential database queries.
Result: Reduced I/O latency during block execution, critical for X-Layer's 400ms block time.
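The batch-prefetch step described in the TL;DR can be sketched as follows. The `StateReader` trait and in-memory store are hypothetical stand-ins for reth's StateProvider and CachedReads; only the shape of "walk extracted keys, copy values into a warm cache" mirrors the PR:

```rust
use std::collections::{HashMap, HashSet};

type Address = [u8; 20];

/// Hypothetical read interface standing in for reth's StateProvider.
trait StateReader {
    fn basic_account(&self, addr: &Address) -> Option<u64>; // e.g. a balance
}

/// In-memory state for the sketch.
struct InMemoryState {
    accounts: HashMap<Address, u64>,
}

impl StateReader for InMemoryState {
    fn basic_account(&self, addr: &Address) -> Option<u64> {
        self.accounts.get(addr).copied()
    }
}

/// Sketch of sequential prefetch: walk the extracted account keys and copy
/// each value into a warm cache (standing in for CachedReads), so block
/// execution later hits the cache instead of the database.
fn prefetch_and_populate<R: StateReader>(
    cached: &mut HashMap<Address, u64>,
    keys: &HashSet<Address>,
    state: &R,
) -> usize {
    let mut loaded = 0;
    for addr in keys {
        if let Some(balance) = state.basic_account(addr) {
            cached.insert(*addr, balance);
            loaded += 1;
        }
    }
    loaded
}

fn main() {
    let state = InMemoryState { accounts: HashMap::from([([1u8; 20], 100u64)]) };
    let mut cached = HashMap::new();
    let keys = HashSet::from([[1u8; 20], [2u8; 20]]);
    // Only the key that exists in state gets populated; misses are skipped.
    let loaded = prefetch_and_populate(&mut cached, &keys, &state);
    println!("prefetched {loaded} accounts");
}
```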
Summary of Changes
New Module:
crates/transaction-pool/src/pre_warming/
- mod.rs
- types.rs - ExtractedKeys, SimulationRequest structs
- config.rs - PreWarmingConfig with validation and builder pattern
- cache.rs - PreWarmedCache, per-TX key storage with RwLock
- worker_pool.rs - SimulationWorkerPool: bounded channel, parallel workers
- simulator.rs - Simulator, EVM simulation wrapper
- snapshot_state.rs - SnapshotState, immutable state with dedup cache
- bridge.rs - prefetch_with_snapshot, parallel MDBX prefetch
- tests.rs

Modified Files
- crates/node/core/src/args/txpool.rs (reth-node-core)
- crates/node/core/Cargo.toml (reth-node-core) - pre-warming feature
- crates/node/builder/src/components/payload.rs (reth-node-builder) - BasicPayloadJobGenerator
- crates/payload/basic/src/lib.rs (reth-basic-payload-builder) - with_pool(), prefetch in new_payload_job()
- crates/payload/basic/Cargo.toml (reth-basic-payload-builder) - pre-warming feature
- crates/transaction-pool/Cargo.toml (reth-transaction-pool) - pre-warming feature
- bin/reth/Cargo.toml (reth)

How to Enable
1. Compile with Feature Flag
2. Node Startup Parameters
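Steps 1 and 2 might look like the following. The `pre-warming` feature name and the `--txpool.*` flags come from this PR; the binary path, the `node` subcommand, and the exact flag syntax are assumptions about a typical reth-based build:

```shell
# 1. Compile with the feature flag
cargo build --release --features pre-warming

# 2. Start the node with pre-warming enabled
./target/release/reth node \
  --txpool.pre-warming \
  --txpool.pre-warming-workers 4 \
  --txpool.pre-warming-timeout-ms 100 \
  --txpool.pre-warming-cache-ttl 60 \
  --txpool.pre-warming-cache-max 10000
```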
3. Configuration Options
| Flag | Default |
| --- | --- |
| --txpool.pre-warming | false |
| --txpool.pre-warming-workers | 4 |
| --txpool.pre-warming-timeout-ms | 100 |
| --txpool.pre-warming-cache-ttl | 60 |
| --txpool.pre-warming-cache-max | 10000 |

Transaction Flow After Validation
New Components
1. ExtractedKeys
Location: crates/transaction-pool/src/pre_warming/types.rs

Purpose: Stores the set of state keys that a transaction will access during execution.

Key Methods:
- add_account(addr) - Add an account to prefetch
- add_storage_slot(addr, slot) - Add a storage slot
- merge(other) - Combine keys from multiple transactions
- age() - Time since creation (for TTL)

2. PreWarmedCache
Location: crates/transaction-pool/src/pre_warming/cache.rs

Purpose: Thread-safe per-transaction key storage. Maps tx_hash → ExtractedKeys.

Key Methods:
- store_tx_keys(tx_hash, keys) - Store keys after simulation
- get_keys_for_txs(&[tx_hash]) - Get merged keys for selected TXs
- remove_txs(&[tx_hash]) - Cleanup after block mined
- stats() - Cache statistics for monitoring

Why Per-TX (not Aggregated)?
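A sketch of the per-TX layout (types simplified to byte arrays; the real structs live in types.rs and cache.rs, and `age()`/`stats()` are omitted). Storing keys per tx_hash lets block building merge only the selected txs' keys, and lets mined txs be evicted individually, which an aggregated key set could not do:

```rust
use std::collections::{HashMap, HashSet};
use std::sync::RwLock;

type TxHash = [u8; 32];
type Address = [u8; 20];
type StorageSlot = (Address, [u8; 32]);

/// Illustrative shape of ExtractedKeys.
#[derive(Default, Clone)]
struct ExtractedKeys {
    accounts: HashSet<Address>,
    storage_slots: HashSet<StorageSlot>,
}

impl ExtractedKeys {
    fn add_account(&mut self, addr: Address) {
        self.accounts.insert(addr);
    }
    fn add_storage_slot(&mut self, addr: Address, slot: [u8; 32]) {
        self.storage_slots.insert((addr, slot));
    }
    fn merge(&mut self, other: &ExtractedKeys) {
        self.accounts.extend(other.accounts.iter().copied());
        self.storage_slots.extend(other.storage_slots.iter().copied());
    }
}

/// Illustrative per-TX cache guarded by an RwLock, as described above.
#[derive(Default)]
struct PreWarmedCache {
    inner: RwLock<HashMap<TxHash, ExtractedKeys>>,
}

impl PreWarmedCache {
    fn store_tx_keys(&self, tx: TxHash, keys: ExtractedKeys) {
        self.inner.write().unwrap().insert(tx, keys);
    }
    /// Merge only the selected txs' keys at block-building time.
    fn get_keys_for_txs(&self, txs: &[TxHash]) -> ExtractedKeys {
        let map = self.inner.read().unwrap();
        let mut merged = ExtractedKeys::default();
        for tx in txs {
            if let Some(k) = map.get(tx) {
                merged.merge(k);
            }
        }
        merged
    }
    /// Evict mined txs without touching the rest of the cache.
    fn remove_txs(&self, txs: &[TxHash]) {
        let mut map = self.inner.write().unwrap();
        for tx in txs {
            map.remove(tx);
        }
    }
}

fn main() {
    let cache = PreWarmedCache::default();
    let mut keys = ExtractedKeys::default();
    keys.add_account([1u8; 20]);
    keys.add_storage_slot([1u8; 20], [7u8; 32]);
    cache.store_tx_keys([0xAA; 32], keys);
    let merged = cache.get_keys_for_txs(&[[0xAA; 32]]);
    println!("{} accounts, {} slots", merged.accounts.len(), merged.storage_slots.len());
}
```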
3. SimulationWorkerPool
Location: crates/transaction-pool/src/pre_warming/worker_pool.rs

Purpose: Manages N worker tasks that simulate transactions in parallel.

Key Methods:
- trigger_simulation(request) - Fire-and-forget, non-blocking
- update_snapshot(new_snapshot) - Called when new block arrives
- shutdown() - Graceful shutdown

Bounded Channel: capacity is num_workers × 10 (e.g., 80 for 8 workers)

4. SnapshotState
Location: crates/transaction-pool/src/pre_warming/snapshot_state.rs

Purpose: Immutable state snapshot for parallel simulation with internal deduplication cache.
Why Snapshot?
5. Simulator
Location: crates/transaction-pool/src/pre_warming/simulator.rs

Purpose: Wraps EVM to execute transactions in read-only mode and extract accessed keys.

Key Method: simulate(tx, sender, block_env) → Result<ExtractedKeys>

6. Bridge Functions
Location: crates/transaction-pool/src/pre_warming/bridge.rs

Purpose: Bridges between PreWarmedCache keys and CachedReads values.

Key Functions:
- prefetch_and_populate(cached_reads, keys, state_provider) - Sequential prefetch
- prefetch_parallel(cached_reads, keys, snapshot) - Parallel prefetch (requires SnapshotState)

Component Wiring
Internal Architecture of Each Component
SimulationWorkerPool Architecture
SnapshotState Architecture
PreWarmedCache Architecture
Tests
Test Summary
- types.rs
- config.rs
- cache.rs
- worker_pool.rs
- snapshot_state.rs
- tests.rs

Key Test Scenarios
Performance Characteristics
- trigger_simulation() latency

Expected Impact
Risk Assessment
Metrics
Location: crates/transaction-pool/src/pre_warming/metrics.rs

Prometheus Metrics (scope: txpool_pre_warming)

Simulation Metrics
- simulations_triggered
- simulations_completed
- simulations_failed
- simulations_dropped
- simulation_duration

Cache Metrics
- cache_entries
- cache_keys_total
- cache_hits
- cache_misses
- cache_evictions

Prefetch Metrics
- prefetch_accounts
- prefetch_storage_slots
- prefetch_contracts
- prefetch_duration
- prefetch_operations

Snapshot Metrics
- snapshot_updates

Access Metrics
Key Health Indicators
- simulations_dropped
- simulations_failed rate
- simulation_duration p99

TODO / Future Work
How to Test