[Batch] Fix race conditions in _Batching async flush/termination lifecycle#630
Merged
assaf758 merged 6 commits intomlrun:developmentfrom Mar 23, 2026
Merged
Conversation
05265cb to
d42ab9c
Compare
gtopper
reviewed
Mar 18, 2026
Collaborator
gtopper
left a comment
There was a problem hiding this comment.
Important aspect of resilience, but the current changes seem to only help with a limited scenario where a small number of batches failed and we go into termination (e.g. on drain) shortly after.
| exact moment needed to trigger each race. | ||
|
|
||
| Tests operate directly on _Batching subclasses, calling _do() to inject events. | ||
| This bypasses the AsyncEmitSource run loop (which processes events one at a time), |
Collaborator
There was a problem hiding this comment.
There is no need to bypass AsyncEmitSource. It would be better if the tests built a graph instead of exercising the target directly.
Collaborator
There was a problem hiding this comment.
This one is still outstanding.
added 3 commits
March 23, 2026 11:39
…cycle Fix 6 race conditions in _Batching._emit_batch and _emit_all that cause data loss, KeyError crashes, and writes to closed targets when the timer task and run loop interleave at await points during slow target writes. - Pop _batch_events before await _emit() so concurrent _do() creates a fresh list instead of appending to the in-flight one - Cancel _timeout_task at the start of termination before _emit_all - Replace snapshot-based key iteration in _emit_all with while loop - Re-insert failed batch data on _emit error for retry/redelivery - Add error handling in _emit_all so drain path logs instead of crashing Reference: ML-12304
After _emit failure, batches are now preserved for retry instead of being permanently lost. Update expected sum from 9 to 10 since event 1 is retried and included in the next successful flush. Reference: ML-12304
Address code review feedback: remove the re-insert block and error-handling logic from _emit_batch and _emit_all. These behavior changes will be handled in a separate PR after design discussion. This commit retains only the race condition fixes: - Pop _batch_events before await _emit() to prevent shared-list races - Cancel _timeout_task at the start of termination before _emit_all - Replace snapshot-based key iteration in _emit_all with while loop Also addresses review comments: - Remove Kafka-specific references from comments - Update test module docstring to describe fixes not bugs - Add comment explaining sleep(0) yield in test - Remove error-handling tests (belong to separate PR) - Revert test_error_raising_batch_target to original expectations Reference: ML-12304
104e562 to
964b1a1
Compare
Avoid cancel() on _timeout_task during termination — cancelling mid-_emit would lose already-popped batch data. Instead, set a _terminating flag that _sleep_and_emit checks between iterations, then await the task to let it finish its current emit cleanly. Reference: ML-12304
gtopper
reviewed
Mar 23, 2026
gtopper
reviewed
Mar 23, 2026
Collaborator
gtopper
left a comment
There was a problem hiding this comment.
Production side LGTM. Would be good to see the tests build a flow as we normally do in unit tests. I don't see a reason that wouldn't work here as well, and it will make the tests less artificial.
added 2 commits
March 23, 2026 17:33
Rewrite tests to use build_flow with AsyncEmitSource instead of calling _do() directly, matching production graph construction. Also: replace asyncio.sleep(0.01) yields with sleep(0). Reference: ML-12304
Add step-by-step interleaving diagrams back to all four test docstrings. These document the exact await-point interleavings that trigger each race, serving as the specification for the fixes. Also add inline comments explaining each asyncio.sleep call. Reference: ML-12304
dae900b to
5f482c5
Compare
assaf758
approved these changes
Mar 23, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Fix 6 race conditions in
_Batching._emit_batchand_emit_allthat cause data loss,KeyErrorcrashes, and writes to closed targets when the timer task (_sleep_and_emit) and the run loop (_do) interleave atawaitpoints during slow target writes (S3/TSDB).Changes Made
_batch_eventsbeforeawait _emit()so concurrent_do()creates a fresh list instead of appending to the in-flight one (fixes KeyError and silent event loss)_timeout_taskat the start of termination before_emit_all(prevents writes to closed targets)_emit_allwithwhile self._batchloop (catches keys added during flush)_emiterror for retry/Kafka redelivery (prevents permanent data loss)_emit_allso drain path logs and continues instead of crashingTesting
tests/test_batching_races.py— 6 deterministic tests reproducing each race condition usingasyncio.EventgatingReference