Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 45 additions & 20 deletions bitcoin-wallet/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,20 @@ pub struct Client {
inner: Arc<ElectrumBalancer>,
/// The history of transactions for each script.
script_history: Arc<TokioRwLock<BTreeMap<ScriptBuf, Vec<GetHistoryRes>>>>,
/// The subscriptions to the status of transactions.
subscriptions: Arc<TokioMutex<HashMap<(Txid, ScriptBuf), Subscription>>>,
/// The status-update channels we poll, keyed by the watched transaction and script.
subscriptions: Arc<TokioMutex<HashMap<(Txid, ScriptBuf), watch::Sender<ScriptStatus>>>>,
/// The time of the last sync.
last_sync: Arc<SyncMutex<Instant>>,
/// How often we sync with the server.
sync_interval: Duration,
/// How long a subscription is kept alive after its last receiver is dropped.
subscription_idle_timeout: Duration,
/// The height of the latest block we know about.
latest_block_height: Arc<SyncMutex<BlockHeight>>,
}

const DEFAULT_SUBSCRIPTION_IDLE_TIMEOUT: Duration = Duration::from_secs(4 * 60);

/// Holds the configuration parameters for creating a Bitcoin wallet.
/// The actual Wallet<Connection> will be constructed from this configuration.
#[derive(Builder, Clone)]
Expand All @@ -159,6 +163,8 @@ pub struct WalletConfig<Seed: BitcoinWalletSeed> {
finality_confirmations: u32,
target_block: u32,
sync_interval: Duration,
#[builder(default = "DEFAULT_SUBSCRIPTION_IDLE_TIMEOUT")]
subscription_idle_timeout: Duration,
#[builder(default)]
tauri_handle: TauriHandle,
#[builder(default = "true")]
Expand All @@ -174,9 +180,10 @@ impl<Seed: BitcoinWalletSeed> WalletBuilder<Seed> {
.validate_config()
.map_err(|e| anyhow!("Builder validation failed: {e}"))?;

let client = Client::new(&config.electrum_rpc_urls, config.sync_interval)
let mut client = Client::new(&config.electrum_rpc_urls, config.sync_interval)
.await
.context("Failed to create Electrum client")?;
client.subscription_idle_timeout = config.subscription_idle_timeout;

match &config.persister {
PersisterConfig::SqliteFile { data_dir } => {
Expand Down Expand Up @@ -803,6 +810,7 @@ impl Wallet {
pub async fn subscribe_to(&self, tx: Box<dyn Watchable>) -> Subscription {
let txid = tx.id();
let script = tx.script();
let idle_timeout = self.electrum_client.subscription_idle_timeout;

let initial_status = match self.electrum_client.status_of_script(&tx, false).await {
Ok(status) => Some(status),
Expand All @@ -814,14 +822,16 @@ impl Wallet {

let mut subscriptions = self.electrum_client.subscriptions.lock().await;

let sub = subscriptions
let sender = subscriptions
.entry((txid, script.clone()))
.or_insert_with(|| {
let (sender, receiver) = watch::channel(ScriptStatus::Unseen);
let (sender, _) = watch::channel(ScriptStatus::Unseen);
let client = self.electrum_client.clone();
let task_sender = sender.clone();

tokio::spawn(async move {
let mut last_status = initial_status;
let mut idle_since: Option<Instant> = None;

loop {
let new_status = client
Expand All @@ -832,32 +842,46 @@ impl Wallet {
ScriptStatus::Retrying
});

if new_status != ScriptStatus::Retrying
{
if new_status != ScriptStatus::Retrying {
last_status = Some(trace_status_change(txid, last_status, new_status));
let _ = task_sender.send(new_status);
}

let all_receivers_gone = sender.send(new_status).is_err();

if all_receivers_gone {
tracing::debug!(%txid, "All receivers gone, removing subscription");
client.subscriptions.lock().await.remove(&(txid, script));
if task_sender.receiver_count() > 0 {
idle_since = None;
} else if idle_since.get_or_insert_with(Instant::now).elapsed()
>= idle_timeout
{
let mut subscriptions = client.subscriptions.lock().await;

if subscriptions
.get(&(txid, script.clone()))
.is_some_and(|sender| sender.receiver_count() == 0)
{
tracing::debug!(%txid, ?idle_timeout, "No subscribers for transaction status, dropping subscription");
subscriptions.remove(&(txid, script));
return;
}

idle_since = None;
}

tokio::time::sleep(Duration::from_secs(5)).await;
}
}.instrument(debug_span!("BitcoinWalletSubscription")));

Subscription {
receiver,
finality_confirmations: self.finality_confirmations,
txid,
}
})
.clone();
sender
});

Subscription {
receiver: sender.subscribe(),
finality_confirmations: self.finality_confirmations,
txid,
}
}

sub
pub async fn active_subscription_count(&self) -> usize {
self.electrum_client.subscriptions.lock().await.len()
}

pub async fn wallet_export(&self, role: &str) -> Result<FullyNodedExport> {
Expand Down Expand Up @@ -1624,6 +1648,7 @@ impl Client {
script_history: Arc::new(TokioRwLock::new(BTreeMap::new())),
last_sync: Arc::new(SyncMutex::new(initial_last_sync)),
sync_interval,
subscription_idle_timeout: DEFAULT_SUBSCRIPTION_IDLE_TIMEOUT,
latest_block_height: Arc::new(SyncMutex::new(BlockHeight::from(0))),
subscriptions: Arc::new(TokioMutex::new(HashMap::new())),
})
Expand Down
75 changes: 75 additions & 0 deletions bitcoin-wallet/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,78 @@ async fn wallet_sends_broadcasts_and_confirms() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn unused_subscription_is_dropped_after_idle_timeout() -> Result<()> {
init_tracing();

let cli = Cli::default();
let env = harness::setup(&cli).await?;

let idle_timeout = Duration::from_secs(3);

let wallet = WalletBuilder::<TestSeed>::default()
.seed(TestSeed::default())
.network(bitcoin::Network::Regtest)
.electrum_rpc_urls(vec![env.electrum_url.clone()])
.persister(PersisterConfig::InMemorySqlite)
.finality_confirmations(1u32)
.target_block(1u32)
.sync_interval(Duration::from_millis(0))
.subscription_idle_timeout(idle_timeout)
.use_mempool_space_fee_estimation(false)
.build()
.await?;

wallet.sync().await?;

let receive_addr = wallet.new_address().await?;
let funding = bitcoin::Amount::from_sat(100_000);
harness::fund_and_mine(&env.bitcoind, receive_addr, funding).await?;
sync_until_balance(&wallet, funding).await?;

let recipient = env
.bitcoind
.with_wallet(harness::BITCOIN_TEST_WALLET_NAME)?
.getnewaddress(None, None)
.await?
.require_network(env.bitcoind.network().await?)?;

let psbt = wallet
.send_to_address(
recipient,
bitcoin::Amount::from_sat(25_000),
bitcoin::Amount::from_sat(2_000),
None,
)
.await?;
let tx = wallet.sign_and_finalize(psbt).await?;

// Broadcasting subscribes to the transaction's status.
let (_txid, sub) = wallet.broadcast(tx, "it-idle-cleanup").await?;
assert_eq!(wallet.active_subscription_count().await, 1);

tokio::time::sleep(idle_timeout * 2).await;
assert_eq!(
wallet.active_subscription_count().await,
1,
"subscription with a live receiver must not be dropped"
);

drop(sub);

let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
loop {
if wallet.active_subscription_count().await == 0 {
break;
}

if tokio::time::Instant::now() >= deadline {
anyhow::bail!("subscription was not dropped after its last receiver was gone");
}

tokio::time::sleep(Duration::from_millis(250)).await;
}

Ok(())
}
Loading