From 164ffeec2b85e03f817720fddb3d07aa5c7907eb Mon Sep 17 00:00:00 2001 From: Dev-X25874 <283057883+Dev-X25874@users.noreply.github.com> Date: Wed, 20 May 2026 13:04:11 +0530 Subject: [PATCH] landing_zone: notify waiters after reset_observe repopulates queue --- rust/sdk/src/landing_zone.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/rust/sdk/src/landing_zone.rs b/rust/sdk/src/landing_zone.rs index 86f25f2..0db2a1e 100644 --- a/rust/sdk/src/landing_zone.rs +++ b/rust/sdk/src/landing_zone.rs @@ -159,9 +159,21 @@ impl LandingZone { /// This is used during stream recovery to re-send items that were observed but /// not yet acknowledged by the server. pub fn reset_observe(&self) { - let mut state = self.state.lock().expect("Lock poisoned"); - while let Some(observed_item) = state.observed_items.pop_back() { - state.queue.push_front(observed_item); + let moved = { + let mut state = self.state.lock().expect("Lock poisoned"); + let mut count = 0usize; + while let Some(observed_item) = state.observed_items.pop_back() { + state.queue.push_front(observed_item); + count += 1; + } + count + }; + // Wake all blocked `observe()` callers so they can dequeue the items + // that were just moved back. Without this, any task already parked on + // `new_item_notify.notified().await` inside `observe()` will sleep + // indefinitely even though the queue is now non-empty. + if moved > 0 { + self.new_item_notify.notify_waiters(); } }