From d6ce1c959a6573a7c5c1ba6a90822ec5d16b8684 Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Fri, 24 Apr 2026 21:23:45 +0000 Subject: [PATCH 1/2] impl(pubsub): update last extension for successful extend --- src/pubsub/src/subscriber/lease_loop.rs | 51 +++++++++-- src/pubsub/src/subscriber/lease_state.rs | 88 +++++++++++++++---- .../subscriber/lease_state/at_least_once.rs | 5 +- .../subscriber/lease_state/exactly_once.rs | 8 +- 4 files changed, 119 insertions(+), 33 deletions(-) diff --git a/src/pubsub/src/subscriber/lease_loop.rs b/src/pubsub/src/subscriber/lease_loop.rs index 811316790f..79545cc099 100644 --- a/src/pubsub/src/subscriber/lease_loop.rs +++ b/src/pubsub/src/subscriber/lease_loop.rs @@ -87,6 +87,12 @@ impl LeaseLoop { match event { LeaseEvent::Flush => state.flush(), LeaseEvent::Extend => state.extend(), + LeaseEvent::ExtendCompleted(ack_ids) => { + state.update_last_extension(ack_ids); + } + LeaseEvent::ExtendCompletedEO(ack_ids) => { + state.update_last_extension_eo(ack_ids); + } } }, message = message_rx.recv() => { @@ -378,7 +384,7 @@ mod tests { } #[tokio_test_no_panics(start_paused = true)] - async fn deadline_interval() -> anyhow::Result<()> { + async fn extend_interval() -> anyhow::Result<()> { const EXTEND_PERIOD: Duration = Duration::from_secs(1); const EXTEND_START: Duration = Duration::from_millis(200); @@ -390,8 +396,9 @@ mod tests { flush_start: Duration::from_secs(900), extend_period: EXTEND_PERIOD, extend_start: EXTEND_START, - // extend leases for all messages on the timer - max_lease_extension: Duration::ZERO, + // max_lease_extension is set to 7 seconds as the test advances + // extend_period twice and the buffer is 5 seconds. + max_lease_extension: Duration::from_secs(7), ..Default::default() }; let lease_loop = LeaseLoop::new(mock.clone(), confirmed_rx, options); @@ -403,6 +410,14 @@ mod tests { lease_loop.strong_message_tx().send(test_message(i))?; } + // Seed the lease loop with some exactly-once messages + for i in 30..60 { + lease_loop.strong_message_tx().send(NewMessage { + ack_id: test_id(i), + lease_info: exactly_once_info(), + })?; + } + // Confirm initial state mock.lock().await.checkpoint(); @@ -414,6 +429,14 @@ mod tests { .times(1) .withf(|v| sorted(v) == test_ids(0..30)) .returning(move |ack_ids| ack_ids); + + mock.lock() + .await + .expect_extend() + .times(1) + .withf(|v| sorted(v) == test_ids(30..60)) + .returning(move |ack_ids| ack_ids); + tokio::time::advance(EXTEND_START).await; // Yield the current task, so tokio can execute the flush(). @@ -426,14 +449,32 @@ mod tests { lease_loop.strong_ack_tx().send(Action::Ack(test_id(i)))?; } - // Advance to and validate the second extension + // Advance to and validate the second extension period (should be skipped) + { + mock.lock().await.expect_extend().times(0); + tokio::time::advance(EXTEND_PERIOD).await; + + // Yield the current task, so tokio can execute the flush(). + tokio::task::yield_now().await; + mock.lock().await.checkpoint(); + } + + // Advance to and validate the third extension (should be extended) { mock.lock() .await .expect_extend() .times(1) .withf(|v| sorted(v) == test_ids(10..30)) - .returning(move |ack_ids| ack_ids); + .returning(|ack_ids| ack_ids); + + mock.lock() + .await + .expect_extend() + .times(1) + .withf(|v| sorted(v) == test_ids(30..60)) + .returning(|ack_ids| ack_ids); + tokio::time::advance(EXTEND_PERIOD).await; // Yield the current task, so tokio can execute the flush(). diff --git a/src/pubsub/src/subscriber/lease_state.rs b/src/pubsub/src/subscriber/lease_state.rs index 66519c30b9..db392533f8 100644 --- a/src/pubsub/src/subscriber/lease_state.rs +++ b/src/pubsub/src/subscriber/lease_state.rs @@ -188,6 +188,10 @@ pub(super) enum LeaseEvent { Flush, /// Extend leases Extend, + /// Pending extensions completed + ExtendCompleted(Vec), + /// Pending exactly-once extensions completed + ExtendCompletedEO(Vec), } impl LeaseState @@ -228,6 +232,22 @@ where tokio::select! { _ = self.flush_interval.tick() => LeaseEvent::Flush, _ = self.extend_interval.tick() => LeaseEvent::Extend, + res = self.pending_extends.join_next(), if !self.pending_extends.is_empty() => { + if let Some(Ok(ack_ids)) = res { + LeaseEvent::ExtendCompleted(ack_ids) + } else { + // swallow the JoinError. + LeaseEvent::ExtendCompleted(Vec::new()) + } + } + res = self.eo_pending_extends.join_next(), if !self.eo_pending_extends.is_empty() => { + if let Some(Ok(ack_ids)) = res { + LeaseEvent::ExtendCompletedEO(ack_ids) + } else { + // swallow the JoinError. + LeaseEvent::ExtendCompletedEO(Vec::new()) + } + } } } @@ -262,14 +282,12 @@ where /// Updates the `last_extension` timestamp for the given at-least-once ack IDs /// with the completion time of a successful extension RPC. - #[allow(dead_code)] pub(super) fn update_last_extension(&mut self, ack_ids: Vec) { self.leases.update_last_extension(&ack_ids); } /// Updates the `last_extension` timestamp for the given exactly-once ack IDs /// with the completion time of a successful extension RPC. - #[allow(dead_code)] pub(super) fn update_last_extension_eo(&mut self, ack_ids: Vec) { self.eo_leases.update_last_extension(&ack_ids); } @@ -305,6 +323,10 @@ where /// /// Drops messages whose lease deadline cannot be extended any further. pub(super) fn extend(&mut self) { + // Old pending extensions and their results are no longer needed. + self.pending_extends = JoinSet::new(); + self.eo_pending_extends = JoinSet::new(); + let batches = self.leases.retain(self.max_lease, self.max_lease_extension); for ack_ids in batches { let leaser = self.leaser.clone(); @@ -320,9 +342,6 @@ where self.eo_pending_extends .spawn(async move { leaser.extend(ack_ids).await }); } - - // TODO(#5048) - we could process the results as a lease event. - while self.pending_extends.try_join_next().is_some() {} } /// Shutdown the leaser @@ -488,6 +507,28 @@ pub(super) mod tests { assert_eq!(flattened.ack_ids, test_ids(2..3)); } + #[tokio::test(start_paused = true)] + async fn extend_clears_pending() { + let mut mock = MockLeaser::new(); + mock.expect_extend().returning(|ack_ids| ack_ids); + + let mut state = LeaseState::new(Arc::new(mock), LeaseOptions::default()); + + state.add(test_id(1), at_least_once_info()); + state.add(test_id(2), exactly_once_info()); + + tokio::time::advance(Duration::from_secs(61)).await; + + state.extend(); + assert_eq!(state.pending_extends.len(), 1); + assert_eq!(state.eo_pending_extends.len(), 1); + + state.extend(); + // The previous pending extend tasks should be cleared and a new one spawned. + assert_eq!(state.pending_extends.len(), 1); + assert_eq!(state.eo_pending_extends.len(), 1); + } + async fn flush_and_await(state: &mut LeaseState) where L: Leaser + Clone + Send + 'static, @@ -955,9 +996,13 @@ pub(super) mod tests { async fn pending_extends_size_management() { let mut mock = MockLeaser::new(); mock.expect_extend() - .times(2) + .times(1) .withf(|v| *v == vec![test_id(1)]) - .returning(move |ack_ids| ack_ids); + .returning(|ack_ids| ack_ids); + mock.expect_extend() + .times(1) + .withf(|v| *v == vec![test_id(2)]) + .returning(|ack_ids| ack_ids); let options = LeaseOptions { max_lease_extension: Duration::ZERO, @@ -966,21 +1011,26 @@ pub(super) mod tests { let mut state = LeaseState::new(Arc::new(mock), options); state.add(test_id(1), at_least_once_info()); + state.add(test_id(2), exactly_once_info()); state.extend(); - // Yield execution so the extend attempt can execute. - tokio::task::yield_now().await; - // TODO(#5048) - We currently clean up the completed pending extends in - // `LeaseState::extend()`. If we decide to clean up the pending extends - // elsewhere, this test will need an update. - state.extend(); - let pending_extends = state.pending_extends.len(); - assert!( - pending_extends < 2, - "The first lease extension attempt should have completed. We should not hold onto it." - ); + let mut events = Vec::new(); + events.push(state.next_event().await); + events.push(state.next_event().await); + + assert!(events.contains(&LeaseEvent::ExtendCompleted(test_ids(1..2)))); + assert!(events.contains(&LeaseEvent::ExtendCompletedEO(test_ids(2..3)))); - let _ = state.pending_extends.join_all().await; + assert_eq!( + state.pending_extends.len(), + 0, + "Completed at-least-once extensions should be cleaned up" + ); + assert_eq!( + state.eo_pending_extends.len(), + 0, + "Completed exactly-once extensions should be cleaned up" + ); } #[tokio::test] diff --git a/src/pubsub/src/subscriber/lease_state/at_least_once.rs b/src/pubsub/src/subscriber/lease_state/at_least_once.rs index ad73485fc5..1885786d53 100644 --- a/src/pubsub/src/subscriber/lease_state/at_least_once.rs +++ b/src/pubsub/src/subscriber/lease_state/at_least_once.rs @@ -111,9 +111,6 @@ impl Leases { // Flush the batch when it is full. batches.push(std::mem::take(&mut batch)); } - // TODO(#5048): Do not update last_extension here after update_last_extension fn - // is used to report successful extends. - info.last_extension = Some(now); true } }); @@ -589,6 +586,7 @@ mod tests { // We should always send a receipt lease extension upon receiving a // message. let batches = leases.retain(MAX_LEASE, MAX_LEASE_EXTENSION); + leases.update_last_extension(&test_ids(0..1)); assert_eq!(batches, vec![vec![test_id(0)]]); assert_eq!( TestLeases { @@ -617,6 +615,7 @@ mod tests { // We need to extend the lease again. let batches = leases.retain(MAX_LEASE, MAX_LEASE_EXTENSION); + leases.update_last_extension(&test_ids(0..1)); assert_eq!(batches, vec![vec![test_id(0)]]); assert_eq!( TestLeases { diff --git a/src/pubsub/src/subscriber/lease_state/exactly_once.rs b/src/pubsub/src/subscriber/lease_state/exactly_once.rs index 4aff32d86d..1832acab74 100644 --- a/src/pubsub/src/subscriber/lease_state/exactly_once.rs +++ b/src/pubsub/src/subscriber/lease_state/exactly_once.rs @@ -130,9 +130,6 @@ impl Leases { None } else { // Continue to extend messages being acked. - // TODO(#5048): Do not update last_extension here after update_last_extension fn - // is used to report successful extends. - info.last_extension = Some(now); Some(id.clone()) } } @@ -148,9 +145,6 @@ impl Leases { None } else { // Extend leases for all other messages - // TODO(#5048): Do not update last_extension here after update_last_extension fn - // is used to report successful extends. - info.last_extension = Some(now); Some(id.clone()) } } @@ -874,6 +868,7 @@ mod tests { // We should always send a receipt lease extension upon receiving a // message. let batches = leases.retain(MAX_LEASE, MAX_LEASE_EXTENSION); + leases.update_last_extension(&test_ids(0..2)); let flattened = Batches::flatten(batches); assert_eq!(sorted(&flattened.ack_ids), test_ids(0..2)); assert_eq!( @@ -903,6 +898,7 @@ mod tests { // We need to extend the lease again. let batches = leases.retain(MAX_LEASE, MAX_LEASE_EXTENSION); + leases.update_last_extension(&test_ids(0..2)); let flattened = Batches::flatten(batches); assert_eq!(sorted(&flattened.ack_ids), test_ids(0..2)); assert_eq!( From 2b1d95057a65bd9257603e857ccb6bb2d6301694 Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Mon, 27 Apr 2026 17:25:49 +0000 Subject: [PATCH 2/2] address comment --- src/pubsub/src/subscriber/lease_state.rs | 60 +++++++----------------- 1 file changed, 18 insertions(+), 42 deletions(-) diff --git a/src/pubsub/src/subscriber/lease_state.rs b/src/pubsub/src/subscriber/lease_state.rs index db392533f8..bbd2d373bf 100644 --- a/src/pubsub/src/subscriber/lease_state.rs +++ b/src/pubsub/src/subscriber/lease_state.rs @@ -229,23 +229,25 @@ where return LeaseEvent::Flush; } - tokio::select! { - _ = self.flush_interval.tick() => LeaseEvent::Flush, - _ = self.extend_interval.tick() => LeaseEvent::Extend, - res = self.pending_extends.join_next(), if !self.pending_extends.is_empty() => { - if let Some(Ok(ack_ids)) = res { - LeaseEvent::ExtendCompleted(ack_ids) - } else { - // swallow the JoinError. - LeaseEvent::ExtendCompleted(Vec::new()) + loop { + tokio::select! { + _ = self.flush_interval.tick() => return LeaseEvent::Flush, + _ = self.extend_interval.tick() => return LeaseEvent::Extend, + res = self.pending_extends.join_next(), if !self.pending_extends.is_empty() => { + if let Some(Ok(ack_ids)) = res { + return LeaseEvent::ExtendCompleted(ack_ids); + } else { + // swallow the JoinError. + continue; + } } - } - res = self.eo_pending_extends.join_next(), if !self.eo_pending_extends.is_empty() => { - if let Some(Ok(ack_ids)) = res { - LeaseEvent::ExtendCompletedEO(ack_ids) - } else { - // swallow the JoinError. - LeaseEvent::ExtendCompletedEO(Vec::new()) + res = self.eo_pending_extends.join_next(), if !self.eo_pending_extends.is_empty() => { + if let Some(Ok(ack_ids)) = res { + return LeaseEvent::ExtendCompletedEO(ack_ids); + } else { + // swallow the JoinError. + continue; + } } } } @@ -323,10 +325,6 @@ where /// /// Drops messages whose lease deadline cannot be extended any further. pub(super) fn extend(&mut self) { - // Old pending extensions and their results are no longer needed. - self.pending_extends = JoinSet::new(); - self.eo_pending_extends = JoinSet::new(); - let batches = self.leases.retain(self.max_lease, self.max_lease_extension); for ack_ids in batches { let leaser = self.leaser.clone(); @@ -507,28 +505,6 @@ pub(super) mod tests { assert_eq!(flattened.ack_ids, test_ids(2..3)); } - #[tokio::test(start_paused = true)] - async fn extend_clears_pending() { - let mut mock = MockLeaser::new(); - mock.expect_extend().returning(|ack_ids| ack_ids); - - let mut state = LeaseState::new(Arc::new(mock), LeaseOptions::default()); - - state.add(test_id(1), at_least_once_info()); - state.add(test_id(2), exactly_once_info()); - - tokio::time::advance(Duration::from_secs(61)).await; - - state.extend(); - assert_eq!(state.pending_extends.len(), 1); - assert_eq!(state.eo_pending_extends.len(), 1); - - state.extend(); - // The previous pending extend tasks should be cleared and a new one spawned. - assert_eq!(state.pending_extends.len(), 1); - assert_eq!(state.eo_pending_extends.len(), 1); - } - async fn flush_and_await(state: &mut LeaseState) where L: Leaser + Clone + Send + 'static,