Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 46 additions & 5 deletions src/pubsub/src/subscriber/lease_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ impl LeaseLoop {
match event {
LeaseEvent::Flush => state.flush(),
LeaseEvent::Extend => state.extend(),
LeaseEvent::ExtendCompleted(ack_ids) => {
state.update_last_extension(ack_ids);
}
LeaseEvent::ExtendCompletedEO(ack_ids) => {
state.update_last_extension_eo(ack_ids);
}
}
},
message = message_rx.recv() => {
Expand Down Expand Up @@ -378,7 +384,7 @@ mod tests {
}

#[tokio_test_no_panics(start_paused = true)]
async fn deadline_interval() -> anyhow::Result<()> {
async fn extend_interval() -> anyhow::Result<()> {
const EXTEND_PERIOD: Duration = Duration::from_secs(1);
const EXTEND_START: Duration = Duration::from_millis(200);

Expand All @@ -390,8 +396,9 @@ mod tests {
flush_start: Duration::from_secs(900),
extend_period: EXTEND_PERIOD,
extend_start: EXTEND_START,
// extend leases for all messages on the timer
max_lease_extension: Duration::ZERO,
// max_lease_extension is set to 7 seconds as the test advances
// extend_period twice and the buffer is 5 seconds.
max_lease_extension: Duration::from_secs(7),
..Default::default()
};
let lease_loop = LeaseLoop::new(mock.clone(), confirmed_rx, options);
Expand All @@ -403,6 +410,14 @@ mod tests {
lease_loop.strong_message_tx().send(test_message(i))?;
}

// Seed the lease loop with some exactly-once messages
for i in 30..60 {
lease_loop.strong_message_tx().send(NewMessage {
ack_id: test_id(i),
lease_info: exactly_once_info(),
})?;
}

// Confirm initial state
mock.lock().await.checkpoint();

Expand All @@ -414,6 +429,14 @@ mod tests {
.times(1)
.withf(|v| sorted(v) == test_ids(0..30))
.returning(move |ack_ids| ack_ids);

mock.lock()
.await
.expect_extend()
.times(1)
.withf(|v| sorted(v) == test_ids(30..60))
.returning(move |ack_ids| ack_ids);

tokio::time::advance(EXTEND_START).await;

// Yield the current task, so tokio can execute the flush().
Expand All @@ -426,14 +449,32 @@ mod tests {
lease_loop.strong_ack_tx().send(Action::Ack(test_id(i)))?;
}

// Advance to and validate the second extension
// Advance to and validate the second extension period (should be skipped)
{
mock.lock().await.expect_extend().times(0);
tokio::time::advance(EXTEND_PERIOD).await;

// Yield the current task, so tokio can execute the flush().
tokio::task::yield_now().await;
mock.lock().await.checkpoint();
}

// Advance to and validate the third extension (should be extended)
{
mock.lock()
.await
.expect_extend()
.times(1)
.withf(|v| sorted(v) == test_ids(10..30))
.returning(move |ack_ids| ack_ids);
.returning(|ack_ids| ack_ids);

mock.lock()
.await
.expect_extend()
.times(1)
.withf(|v| sorted(v) == test_ids(30..60))
.returning(|ack_ids| ack_ids);

tokio::time::advance(EXTEND_PERIOD).await;

// Yield the current task, so tokio can execute the flush().
Expand Down
70 changes: 48 additions & 22 deletions src/pubsub/src/subscriber/lease_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ pub(super) enum LeaseEvent {
Flush,
/// Extend leases
Extend,
/// Pending extensions completed
ExtendCompleted(Vec<String>),
/// Pending exactly-once extensions completed
ExtendCompletedEO(Vec<String>),
}

impl<L> LeaseState<L>
Expand Down Expand Up @@ -225,9 +229,27 @@ where
return LeaseEvent::Flush;
}

tokio::select! {
_ = self.flush_interval.tick() => LeaseEvent::Flush,
_ = self.extend_interval.tick() => LeaseEvent::Extend,
loop {
tokio::select! {
_ = self.flush_interval.tick() => return LeaseEvent::Flush,
_ = self.extend_interval.tick() => return LeaseEvent::Extend,
res = self.pending_extends.join_next(), if !self.pending_extends.is_empty() => {
if let Some(Ok(ack_ids)) = res {
return LeaseEvent::ExtendCompleted(ack_ids);
} else {
// swallow the JoinError.
continue;
}
}
res = self.eo_pending_extends.join_next(), if !self.eo_pending_extends.is_empty() => {
if let Some(Ok(ack_ids)) = res {
return LeaseEvent::ExtendCompletedEO(ack_ids);
} else {
// swallow the JoinError.
continue;
}
}
}
}
}

Expand Down Expand Up @@ -262,14 +284,12 @@ where

/// Updates the `last_extension` timestamp for the given at-least-once ack IDs
/// with the completion time of a successful extension RPC.
#[allow(dead_code)]
pub(super) fn update_last_extension(&mut self, ack_ids: Vec<String>) {
self.leases.update_last_extension(&ack_ids);
}

/// Updates the `last_extension` timestamp for the given exactly-once ack IDs
/// with the completion time of a successful extension RPC.
#[allow(dead_code)]
pub(super) fn update_last_extension_eo(&mut self, ack_ids: Vec<String>) {
self.eo_leases.update_last_extension(&ack_ids);
}
Expand Down Expand Up @@ -320,9 +340,6 @@ where
self.eo_pending_extends
.spawn(async move { leaser.extend(ack_ids).await });
}

// TODO(#5048) - we could process the results as a lease event.
while self.pending_extends.try_join_next().is_some() {}
}

/// Shutdown the leaser
Expand Down Expand Up @@ -955,9 +972,13 @@ pub(super) mod tests {
async fn pending_extends_size_management() {
let mut mock = MockLeaser::new();
mock.expect_extend()
.times(2)
.times(1)
.withf(|v| *v == vec![test_id(1)])
.returning(move |ack_ids| ack_ids);
.returning(|ack_ids| ack_ids);
mock.expect_extend()
.times(1)
.withf(|v| *v == vec![test_id(2)])
.returning(|ack_ids| ack_ids);

let options = LeaseOptions {
max_lease_extension: Duration::ZERO,
Expand All @@ -966,21 +987,26 @@ pub(super) mod tests {
let mut state = LeaseState::new(Arc::new(mock), options);

state.add(test_id(1), at_least_once_info());
state.add(test_id(2), exactly_once_info());
state.extend();
// Yield execution so the extend attempt can execute.
tokio::task::yield_now().await;

// TODO(#5048) - We currently clean up the completed pending extends in
// `LeaseState::extend()`. If we decide to clean up the pending extends
// elsewhere, this test will need an update.
state.extend();
let pending_extends = state.pending_extends.len();
assert!(
pending_extends < 2,
"The first lease extension attempt should have completed. We should not hold onto it."
);
let mut events = Vec::new();
events.push(state.next_event().await);
events.push(state.next_event().await);

assert!(events.contains(&LeaseEvent::ExtendCompleted(test_ids(1..2))));
assert!(events.contains(&LeaseEvent::ExtendCompletedEO(test_ids(2..3))));

let _ = state.pending_extends.join_all().await;
assert_eq!(
state.pending_extends.len(),
0,
"Completed at-least-once extensions should be cleaned up"
);
assert_eq!(
state.eo_pending_extends.len(),
0,
"Completed exactly-once extensions should be cleaned up"
);
}

#[tokio::test]
Expand Down
5 changes: 2 additions & 3 deletions src/pubsub/src/subscriber/lease_state/at_least_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ impl Leases {
// Flush the batch when it is full.
batches.push(std::mem::take(&mut batch));
}
// TODO(#5048): Do not update last_extension here after update_last_extension fn
// is used to report successful extends.
info.last_extension = Some(now);
true
}
});
Expand Down Expand Up @@ -589,6 +586,7 @@ mod tests {
// We should always send a receipt lease extension upon receiving a
// message.
let batches = leases.retain(MAX_LEASE, MAX_LEASE_EXTENSION);
leases.update_last_extension(&test_ids(0..1));
assert_eq!(batches, vec![vec![test_id(0)]]);
assert_eq!(
TestLeases {
Expand Down Expand Up @@ -617,6 +615,7 @@ mod tests {

// We need to extend the lease again.
let batches = leases.retain(MAX_LEASE, MAX_LEASE_EXTENSION);
leases.update_last_extension(&test_ids(0..1));
assert_eq!(batches, vec![vec![test_id(0)]]);
assert_eq!(
TestLeases {
Expand Down
8 changes: 2 additions & 6 deletions src/pubsub/src/subscriber/lease_state/exactly_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,6 @@ impl Leases {
None
} else {
// Continue to extend messages being acked.
// TODO(#5048): Do not update last_extension here after update_last_extension fn
// is used to report successful extends.
info.last_extension = Some(now);
Some(id.clone())
}
}
Expand All @@ -148,9 +145,6 @@ impl Leases {
None
} else {
// Extend leases for all other messages
// TODO(#5048): Do not update last_extension here after update_last_extension fn
// is used to report successful extends.
info.last_extension = Some(now);
Some(id.clone())
}
}
Expand Down Expand Up @@ -874,6 +868,7 @@ mod tests {
// We should always send a receipt lease extension upon receiving a
// message.
let batches = leases.retain(MAX_LEASE, MAX_LEASE_EXTENSION);
leases.update_last_extension(&test_ids(0..2));
let flattened = Batches::flatten(batches);
assert_eq!(sorted(&flattened.ack_ids), test_ids(0..2));
assert_eq!(
Expand Down Expand Up @@ -903,6 +898,7 @@ mod tests {

// We need to extend the lease again.
let batches = leases.retain(MAX_LEASE, MAX_LEASE_EXTENSION);
leases.update_last_extension(&test_ids(0..2));
let flattened = Batches::flatten(batches);
assert_eq!(sorted(&flattened.ack_ids), test_ids(0..2));
assert_eq!(
Expand Down
Loading