From 2e5d4926ba4a9f32f6a0bab0ac06bbeb8d1f792c Mon Sep 17 00:00:00 2001 From: binarybaron Date: Fri, 5 Jun 2026 13:47:02 +0200 Subject: [PATCH 1/2] fix(bitcoin wallet): cleanup subscriptions --- bitcoin-wallet/src/wallet.rs | 65 +++++++++++++++++-------- bitcoin-wallet/tests/integration.rs | 75 +++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 20 deletions(-) diff --git a/bitcoin-wallet/src/wallet.rs b/bitcoin-wallet/src/wallet.rs index 6183aad3ca..1173f7870d 100644 --- a/bitcoin-wallet/src/wallet.rs +++ b/bitcoin-wallet/src/wallet.rs @@ -127,16 +127,20 @@ pub struct Client { inner: Arc, /// The history of transactions for each script. script_history: Arc>>>, - /// The subscriptions to the status of transactions. - subscriptions: Arc>>, + /// The status-update channels we poll, keyed by the watched transaction and script. + subscriptions: Arc>>>, /// The time of the last sync. last_sync: Arc>, /// How often we sync with the server. sync_interval: Duration, + /// How long a subscription is kept alive after its last receiver is dropped. + subscription_idle_timeout: Duration, /// The height of the latest block we know about. latest_block_height: Arc>, } +const DEFAULT_SUBSCRIPTION_IDLE_TIMEOUT: Duration = Duration::from_secs(2 * 60); + /// Holds the configuration parameters for creating a Bitcoin wallet. /// The actual Wallet will be constructed from this configuration. #[derive(Builder, Clone)] @@ -159,6 +163,8 @@ pub struct WalletConfig { finality_confirmations: u32, target_block: u32, sync_interval: Duration, + #[builder(default = "DEFAULT_SUBSCRIPTION_IDLE_TIMEOUT")] + subscription_idle_timeout: Duration, #[builder(default)] tauri_handle: TauriHandle, #[builder(default = "true")] @@ -174,9 +180,10 @@ impl WalletBuilder { .validate_config() .map_err(|e| anyhow!("Builder validation failed: {e}"))?; - let client = Client::new(&config.electrum_rpc_urls, config.sync_interval) + let mut client = Client::new(&config.electrum_rpc_urls, config.sync_interval) .await .context("Failed to create Electrum client")?; + client.subscription_idle_timeout = config.subscription_idle_timeout; match &config.persister { PersisterConfig::SqliteFile { data_dir } => { @@ -803,6 +810,7 @@ impl Wallet { pub async fn subscribe_to(&self, tx: Box) -> Subscription { let txid = tx.id(); let script = tx.script(); + let idle_timeout = self.electrum_client.subscription_idle_timeout; let initial_status = match self.electrum_client.status_of_script(&tx, false).await { Ok(status) => Some(status), @@ -814,14 +822,16 @@ impl Wallet { let mut subscriptions = self.electrum_client.subscriptions.lock().await; - let sub = subscriptions + let sender = subscriptions .entry((txid, script.clone())) .or_insert_with(|| { - let (sender, receiver) = watch::channel(ScriptStatus::Unseen); + let (sender, _) = watch::channel(ScriptStatus::Unseen); let client = self.electrum_client.clone(); + let task_sender = sender.clone(); tokio::spawn(async move { let mut last_status = initial_status; + let mut idle_since: Option = None; loop { let new_status = client @@ -832,32 +842,46 @@ impl Wallet { ScriptStatus::Retrying }); - if new_status != ScriptStatus::Retrying - { + if new_status != ScriptStatus::Retrying { last_status = Some(trace_status_change(txid, last_status, new_status)); + let _ = task_sender.send(new_status); + } - let all_receivers_gone = sender.send(new_status).is_err(); - - if all_receivers_gone { - tracing::debug!(%txid, "All receivers gone, removing subscription"); - client.subscriptions.lock().await.remove(&(txid, script)); + if task_sender.receiver_count() > 0 { + idle_since = None; + } else if idle_since.get_or_insert_with(Instant::now).elapsed() + >= idle_timeout + { + let mut subscriptions = client.subscriptions.lock().await; + + if subscriptions + .get(&(txid, script.clone())) + .is_some_and(|sender| sender.receiver_count() == 0) + { + tracing::debug!(%txid, ?idle_timeout, "No subscribers for transaction status, dropping subscription"); + subscriptions.remove(&(txid, script)); return; } + + idle_since = None; } tokio::time::sleep(Duration::from_secs(5)).await; } }.instrument(debug_span!("BitcoinWalletSubscription"))); - Subscription { - receiver, - finality_confirmations: self.finality_confirmations, - txid, - } - }) - .clone(); + sender + }); + + Subscription { + receiver: sender.subscribe(), + finality_confirmations: self.finality_confirmations, + txid, + } + } - sub + pub async fn active_subscription_count(&self) -> usize { + self.electrum_client.subscriptions.lock().await.len() } pub async fn wallet_export(&self, role: &str) -> Result { @@ -1624,6 +1648,7 @@ impl Client { script_history: Arc::new(TokioRwLock::new(BTreeMap::new())), last_sync: Arc::new(SyncMutex::new(initial_last_sync)), sync_interval, + subscription_idle_timeout: DEFAULT_SUBSCRIPTION_IDLE_TIMEOUT, latest_block_height: Arc::new(SyncMutex::new(BlockHeight::from(0))), subscriptions: Arc::new(TokioMutex::new(HashMap::new())), }) diff --git a/bitcoin-wallet/tests/integration.rs b/bitcoin-wallet/tests/integration.rs index bd6846726d..4092a4c9be 100644 --- a/bitcoin-wallet/tests/integration.rs +++ b/bitcoin-wallet/tests/integration.rs @@ -173,3 +173,78 @@ async fn wallet_sends_broadcasts_and_confirms() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn unused_subscription_is_dropped_after_idle_timeout() -> Result<()> { + init_tracing(); + + let cli = Cli::default(); + let env = harness::setup(&cli).await?; + + let idle_timeout = Duration::from_secs(3); + + let wallet = WalletBuilder::::default() + .seed(TestSeed::default()) + .network(bitcoin::Network::Regtest) + .electrum_rpc_urls(vec![env.electrum_url.clone()]) + .persister(PersisterConfig::InMemorySqlite) + .finality_confirmations(1u32) + .target_block(1u32) + .sync_interval(Duration::from_millis(0)) + .subscription_idle_timeout(idle_timeout) + .use_mempool_space_fee_estimation(false) + .build() + .await?; + + wallet.sync().await?; + + let receive_addr = wallet.new_address().await?; + let funding = bitcoin::Amount::from_sat(100_000); + harness::fund_and_mine(&env.bitcoind, receive_addr, funding).await?; + sync_until_balance(&wallet, funding).await?; + + let recipient = env + .bitcoind + .with_wallet(harness::BITCOIN_TEST_WALLET_NAME)? + .getnewaddress(None, None) + .await? + .require_network(env.bitcoind.network().await?)?; + + let psbt = wallet + .send_to_address( + recipient, + bitcoin::Amount::from_sat(25_000), + bitcoin::Amount::from_sat(2_000), + None, + ) + .await?; + let tx = wallet.sign_and_finalize(psbt).await?; + + // Broadcasting subscribes to the transaction's status. + let (_txid, sub) = wallet.broadcast(tx, "it-idle-cleanup").await?; + assert_eq!(wallet.active_subscription_count().await, 1); + + tokio::time::sleep(idle_timeout * 2).await; + assert_eq!( + wallet.active_subscription_count().await, + 1, + "subscription with a live receiver must not be dropped" + ); + + drop(sub); + + let deadline = tokio::time::Instant::now() + Duration::from_secs(30); + loop { + if wallet.active_subscription_count().await == 0 { + break; + } + + if tokio::time::Instant::now() >= deadline { + anyhow::bail!("subscription was not dropped after its last receiver was gone"); + } + + tokio::time::sleep(Duration::from_millis(250)).await; + } + + Ok(()) +} From 0365c79891294050b57bdcb07c780961328490a7 Mon Sep 17 00:00:00 2001 From: Mohan <86064887+binarybaron@users.noreply.github.com> Date: Fri, 5 Jun 2026 14:05:14 +0200 Subject: [PATCH 2/2] Update wallet.rs --- bitcoin-wallet/src/wallet.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bitcoin-wallet/src/wallet.rs b/bitcoin-wallet/src/wallet.rs index 1173f7870d..620c543573 100644 --- a/bitcoin-wallet/src/wallet.rs +++ b/bitcoin-wallet/src/wallet.rs @@ -139,7 +139,7 @@ pub struct Client { latest_block_height: Arc>, } -const DEFAULT_SUBSCRIPTION_IDLE_TIMEOUT: Duration = Duration::from_secs(2 * 60); +const DEFAULT_SUBSCRIPTION_IDLE_TIMEOUT: Duration = Duration::from_secs(4 * 60); /// Holds the configuration parameters for creating a Bitcoin wallet. /// The actual Wallet will be constructed from this configuration.