Skip to content

fix: map stream can ack inflight messages during panic and tests#3267

Open
vaibhavtiwari33 wants to merge 7 commits intomainfrom
global-state-tests
Open

fix: map stream can ack inflight messages during panic and tests#3267
vaibhavtiwari33 wants to merge 7 commits intomainfrom
global-state-tests

Conversation

@vaibhavtiwari33
Copy link
Contributor

@vaibhavtiwari33 vaibhavtiwari33 commented Feb 25, 2026

What this PR does / why we need it

  • Tests to run global-state-tests as part of the broader test suite
  • Fix for a scenario where map stream can ack inflight messages during panics

To track down where the ACKing of a message is happening when the map stream panics, we added a stack trace for whenever a message is acked when dropped:

if self.is_failed.load(std::sync::atomic::Ordering::Relaxed) {
    ack_handle.send(ReadAck::Nak).expect("Failed to send nak");
} else {
    eprintln!("DEBUG AckHandle::drop sending Ack, backtrace:\n{}", std::backtrace::Backtrace::force_capture());
    ack_handle.send(ReadAck::Ack).expect("Failed to send ack");
}

During the run of test_threaded_stream_with_panic test we see the following trace:

DEBUG AckHandle::drop sending Ack, backtrace:
   0: std::backtrace_rs::backtrace::libunwind::trace
             at /rustc/01f6ddf7588f42ae2d7eb0a2f21d44e8e96674cf/library/std/src/../../backtrace/src/backtrace/libunwind.rs:117:9
   1: std::backtrace_rs::backtrace::trace_unsynchronized
             at /rustc/01f6ddf7588f42ae2d7eb0a2f21d44e8e96674cf/library/std/src/../../backtrace/src/backtrace/mod.rs:66:14
   2: std::backtrace::Backtrace::create
             at /rustc/01f6ddf7588f42ae2d7eb0a2f21d44e8e96674cf/library/std/src/backtrace.rs:331:13
   3: <numaflow_core::message::AckHandle as core::ops::drop::Drop>::drop
             at ./src/message.rs:85:80
   4: core::ptr::drop_in_place<numaflow_core::message::AckHandle>
             at /Users/vtiwari5/.rustup/toolchains/1.93-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:805:1
   5: alloc::sync::Arc<T,A>::drop_slow
             at /Users/vtiwari5/.rustup/toolchains/1.93-aarch64-apple-darwin/lib/rustlib/src/rust/library/alloc/src/sync.rs:2120:18
   6: <alloc::sync::Arc<T,A> as core::ops::drop::Drop>::drop
             at /Users/vtiwari5/.rustup/toolchains/1.93-aarch64-apple-darwin/lib/rustlib/src/rust/library/alloc/src/sync.rs:2854:18
   7: core::ptr::drop_in_place<alloc::sync::Arc<numaflow_core::message::AckHandle>>
             at /Users/vtiwari5/.rustup/toolchains/1.93-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:805:1
   8: core::ptr::drop_in_place<core::option::Option<alloc::sync::Arc<numaflow_core::message::AckHandle>>>
             at /Users/vtiwari5/.rustup/toolchains/1.93-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:805:1
   9: core::ptr::drop_in_place<numaflow_core::mapper::map::ParentMessageInfo>
             at /Users/vtiwari5/.rustup/toolchains/1.93-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:805:1
  10: numaflow_core::mapper::map::stream::MapStreamTask::execute::{{closure}}
             at ./src/mapper/map/stream.rs:142:5
  11: numaflow_core::mapper::map::stream::MapStreamTask::spawn::{{closure}}
             at ./src/mapper/map/stream.rs:56:28
...

So, the ack is being called right after there was a call to drop ParentMessageInfo, which means there were no other references to the AckHandle at ./src/mapper/map/stream.rs:142:5.

There are three possibilities for the AckHandle to be dropped here through ParentMessageInfo, which basically are the three match cases:

let result = receiver.recv().await;
match result {
    Some(Ok(results)) => {
        for result in results {
            let mapped_message: Message =
                UserDefinedMessage(result, &parent_info, parent_info.current_index)
                    .into();
            parent_info.current_index += 1;
         ....
        }
    }
    Some(Err(e)) => {
        parent_info
            .ack_handle
            .as_ref()
            .expect("ack handle should be present")
            .is_failed
            .store(true, Ordering::Relaxed);
        let _ = self.shared_ctx.error_tx.send(e).await;
        return;
    }
    None => break;
}
  1. Some(Ok(results)) =>: We know that this case can never happen for the panicking test since the map stream UDF will always panic and won't return any results, so ParentMessageInfo cannot be dropping after this.
  2. Some(Err(e)) =>: If this case would've triggered then the parent_info would've set the ack handle to nack when dropped.
  3. None =>: This seems to be the only viable option where if we match this case then the ParentMessageInfo can drop without Nacking the AckHandle.

Following the above-mentioned scenario, a message was sent to the map stream server, but instead of getting any response, the gRPC channel got closed (this needs more digging).

So, the current fix (hack) is that since we track and increment the current_index as part of the parent_info when a message is processed, we can Nack any messages where the channel got closed but the parent info's current_index remained 0.

Related issues

Fixes #3244
Fixes #3268

Testing

The panic tests added for map stream now pass.

…st suite

Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
@vaibhavtiwari33 vaibhavtiwari33 marked this pull request as ready for review February 25, 2026 21:08
@vaibhavtiwari33 vaibhavtiwari33 marked this pull request as draft February 25, 2026 21:48
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
@vaibhavtiwari33 vaibhavtiwari33 changed the title test: Tests to run global-state-tests as part of the broader test suite fix: map stream can ack inflight messages during panic and tests Mar 3, 2026
@codecov
Copy link

codecov bot commented Mar 3, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 81.01%. Comparing base (3878a12) to head (97d1255).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3267      +/-   ##
==========================================
+ Coverage   80.97%   81.01%   +0.03%     
==========================================
  Files         316      316              
  Lines       72203    72370     +167     
==========================================
+ Hits        58468    58627     +159     
- Misses      13182    13190       +8     
  Partials      553      553              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@vaibhavtiwari33 vaibhavtiwari33 marked this pull request as ready for review March 3, 2026 05:28
@vaibhavtiwari33 vaibhavtiwari33 self-assigned this Mar 3, 2026
@vaibhavtiwari33 vaibhavtiwari33 added opex Operational Excellence to make it easy to run in production and debug testing Testing and CI bug Something isn't working and removed opex Operational Excellence to make it easy to run in production and debug labels Mar 3, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something isn't working testing Testing and CI

Projects

None yet

Development

Successfully merging this pull request may close these issues.

bug: map stream can ack inflight messages during panic Try to update global state tests

1 participant