Skip to content

landing_zone: notify waiters after reset_observe repopulates queue#307

Open
Dev-X25874 wants to merge 1 commit into
databricks:mainfrom
Dev-X25874:fix/landing-zone-reset-observe-notify
Open

landing_zone: notify waiters after reset_observe repopulates queue#307
Dev-X25874 wants to merge 1 commit into
databricks:mainfrom
Dev-X25874:fix/landing-zone-reset-observe-notify

Conversation

@Dev-X25874
Copy link
Copy Markdown

What changes are proposed in this pull request?

WHAT: A single-line fix in reset_observe() in
rust/sdk/src/landing_zone.rs: after moving all observed items back to
the front of the unobserved queue, call self.new_item_notify.notify_waiters()
(with the lock already dropped) so that any task blocked inside observe()
is woken and can immediately dequeue the restored items.

WHY: reset_observe() is called during stream recovery (see lib.rs
recovery loop) to replay unacknowledged items on a freshly reconnected
stream. Right after reset_observe() returns, a new sender task is spawned
that calls landing_zone.observe().await in a tight loop.

observe() works like this:

loop {
    let notified = self.new_item_notify.notified(); // future registered
    {
        let mut state = self.state.lock()...;
        if let Some(elem) = state.queue.pop_front() { return elem; }
    }
    notified.await; // parks here if queue was empty at lock time
}

If the sender task acquires the lock before reset_observe() has
repopulated the queue — or if it simply races and checks an empty queue
on its first iteration — it parks on notified.await. The only way to
unblock it is a notify_one() / notify_waiters() call. add() does
this correctly on every push; reset_observe() did not, despite pushing
N items. The result is a silent deadlock: all replayed items accumulate
in the queue while the sender task sleeps forever, so no records are ever
sent on the recovered stream.

The fix tightens the lock scope so the MutexGuard is dropped before
notify_waiters() is called (avoiding any risk of a woken waiter trying
to re-acquire a still-held lock), and guards the notify behind if moved > 0
to avoid a spurious wake when there was nothing to restore.

How is this tested?

The existing test_reset_observe_with_concurrent_add test in
landing_zone.rs exercises reset_observe() but does not cover the
specific race where a sender task is already parked in observe() at the
moment reset_observe() runs. That race is the precise failure mode fixed
here.

Not tested with a new automated test in this PR. A targeted regression
test would spawn a task blocked on observe() against an empty queue,
call reset_observe() with pre-populated observed items from a separate
thread, and assert the blocked task unblocks within a short timeout —
happy to add that if the reviewer would like it included.

@elenagaljak-db
Copy link
Copy Markdown
Contributor

Hi @Dev-X25874, thanks for the contribution. While this could technically happen if reset_observe() and observe() are used concurrently in isolation, the ZerobusStream implementation we have in our repo can't really hit this. observe() is only ever called from the sender task, and reset_observe() only runs during stream creation/recovery, after the previous sender task has already exited and before a new one is spawned. So there's never a task that can wait on observe() at that point.

@Dev-X25874
Copy link
Copy Markdown
Author

Thanks for the clear explanation @elenagaljak-db — that makes sense. Since the sender task lifecycle guarantees the sequencing, I'll close this. Happy to follow up with a doc comment on reset_observe() noting that invariant if that would be useful to future readers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants