landing_zone: notify waiters after reset_observe repopulates queue#307
landing_zone: notify waiters after reset_observe repopulates queue#307Dev-X25874 wants to merge 1 commit into
Conversation
|
Hi @Dev-X25874, thanks for the contribution. While this could technically happen if |
|
Thanks for the clear explanation @elenagaljak-db — that makes sense. Since the sender task lifecycle guarantees the sequencing, I'll close this. Happy to follow up with a doc comment on reset_observe() noting that invariant if that would be useful to future readers. |
What changes are proposed in this pull request?
WHAT: A single-line fix in
reset_observe()inrust/sdk/src/landing_zone.rs: after moving all observed items back tothe front of the unobserved queue, call
self.new_item_notify.notify_waiters()(with the lock already dropped) so that any task blocked inside
observe()is woken and can immediately dequeue the restored items.
WHY:
reset_observe()is called during stream recovery (seelib.rsrecovery loop) to replay unacknowledged items on a freshly reconnected
stream. Right after
reset_observe()returns, a new sender task is spawnedthat calls
landing_zone.observe().awaitin a tight loop.observe()works like this:If the sender task acquires the lock before
reset_observe()hasrepopulated the queue — or if it simply races and checks an empty queue
on its first iteration — it parks on
notified.await. The only way tounblock it is a
notify_one()/notify_waiters()call.add()doesthis correctly on every push;
reset_observe()did not, despite pushingN items. The result is a silent deadlock: all replayed items accumulate
in the queue while the sender task sleeps forever, so no records are ever
sent on the recovered stream.
The fix tightens the lock scope so the
MutexGuardis dropped beforenotify_waiters()is called (avoiding any risk of a woken waiter tryingto re-acquire a still-held lock), and guards the notify behind
if moved > 0to avoid a spurious wake when there was nothing to restore.
How is this tested?
The existing
test_reset_observe_with_concurrent_addtest inlanding_zone.rsexercisesreset_observe()but does not cover thespecific race where a sender task is already parked in
observe()at themoment
reset_observe()runs. That race is the precise failure mode fixedhere.
Not tested with a new automated test in this PR. A targeted regression
test would spawn a task blocked on
observe()against an empty queue,call
reset_observe()with pre-populated observed items from a separatethread, and assert the blocked task unblocks within a short timeout —
happy to add that if the reviewer would like it included.