[Batch] Fix race conditions in _Batching async flush/termination lifecycle#630

Merged
assaf758 merged 6 commits into mlrun:development from alxtkr77:ML-12304-fix-batching-race-conditions
Mar 23, 2026

alxtkr77 (Member) commented Mar 16, 2026

Summary

Fix 6 race conditions in _Batching._emit_batch and _emit_all that cause data loss, KeyError crashes, and writes to closed targets when the timer task (_sleep_and_emit) and the run loop (_do) interleave at await points during slow target writes (S3/TSDB).

Changes Made

  • Pop _batch_events before await _emit() so concurrent _do() creates a fresh list instead of appending to the in-flight one (fixes KeyError and silent event loss)
  • Cancel _timeout_task at the start of termination before _emit_all (prevents writes to closed targets)
  • Replace snapshot-based key iteration in _emit_all with while self._batch loop (catches keys added during flush)
  • Re-insert failed batch data on _emit error for retry/Kafka redelivery (prevents permanent data loss)
  • Add error handling in _emit_all so drain path logs and continues instead of crashing
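The first change above (pop before await) can be sketched in isolation. This is a minimal illustration, not the code in storey/flow.py: `BatcherSketch`, `add`, `emit_batch`, and `gate` are hypothetical names, and the gate only simulates a slow target write.

```python
import asyncio


class BatcherSketch:
    """Hypothetical stand-in for a batching step; not storey's _Batching."""

    def __init__(self):
        self._batch = {}              # key -> list of pending events
        self.emitted = []             # batches handed to the (slow) target
        self.gate = asyncio.Event()   # released by the demo to finish the write

    async def _emit(self, events):
        await self.gate.wait()        # simulates a slow S3/TSDB write
        self.emitted.append(list(events))

    def add(self, key, event):
        # If the key was popped by an in-flight flush, a fresh list is created.
        self._batch.setdefault(key, []).append(event)

    async def emit_batch(self, key):
        events = self._batch.pop(key, None)  # detach BEFORE awaiting
        if events:
            await self._emit(events)


async def demo():
    b = BatcherSketch()
    b.add("k", 1)
    flush = asyncio.create_task(b.emit_batch("k"))
    await asyncio.sleep(0)            # let the flush reach its await point
    b.add("k", 2)                     # arrives while the write is in flight
    b.gate.set()
    await flush
    return b.emitted, b._batch


emitted, pending = asyncio.run(demo())
```

Because the list is detached before the await, the late event lands in a new list for the next flush instead of mutating the batch that is already being written.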

Testing

  • Added tests/test_batching_races.py — 6 deterministic tests reproducing each race condition using asyncio.Event gating
  • All 6 tests pass
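For readers unfamiliar with asyncio.Event gating, the idea is roughly as follows. This is a sketch under assumptions, not an excerpt from tests/test_batching_races.py: `GatedTarget`, `entered`, and `release` are hypothetical names.

```python
import asyncio


class GatedTarget:
    """Hypothetical slow target: signals when a write starts, then blocks."""

    def __init__(self):
        self.entered = asyncio.Event()
        self.release = asyncio.Event()
        self.written = []

    async def write(self, batch):
        self.entered.set()            # tell the test we are at the await point
        await self.release.wait()     # hold here until the test lets go
        self.written.append(batch)


async def scenario():
    target = GatedTarget()
    order = []
    writer = asyncio.create_task(target.write(["a"]))
    await target.entered.wait()       # the write is now parked mid-flight
    order.append("interleaved step")  # the racing action would go here
    target.release.set()
    await writer
    order.append("write finished")
    return order, target.written


order, written = asyncio.run(scenario())
```

Waiting on an event rather than a timed sleep pins the interleaving to a specific await point, so the test does not depend on scheduling luck.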

Reference

  • Jira: ML-12304
  • Related: ML-12286 (silent data loss), IG4-1713 (KeyError crash)

@alxtkr77 alxtkr77 force-pushed the ML-12304-fix-batching-race-conditions branch from 05265cb to d42ab9c Compare March 16, 2026 09:14
gtopper (Collaborator) left a comment

Important aspect of resilience, but the current changes seem to only help with a limited scenario where a small number of batches failed and we go into termination (e.g. on drain) shortly after.

Comment thread storey/flow.py Outdated
Comment thread tests/test_batching_races.py Outdated
Comment thread tests/test_batching_races.py Outdated
exact moment needed to trigger each race.

Tests operate directly on _Batching subclasses, calling _do() to inject events.
This bypasses the AsyncEmitSource run loop (which processes events one at a time),
Collaborator

There is no need to bypass AsyncEmitSource. It would be better if the tests built a graph instead of exercising the target directly.

Collaborator

This one is still outstanding.

Comment thread storey/flow.py Outdated
Comment thread storey/flow.py Outdated
Comment thread tests/test_batching_races.py Outdated
Comment thread tests/test_batching_races.py Outdated
Alex Toker added 3 commits March 23, 2026 11:39
…cycle

Fix 6 race conditions in _Batching._emit_batch and _emit_all that cause
data loss, KeyError crashes, and writes to closed targets when the timer
task and run loop interleave at await points during slow target writes.

- Pop _batch_events before await _emit() so concurrent _do() creates a
  fresh list instead of appending to the in-flight one
- Cancel _timeout_task at the start of termination before _emit_all
- Replace snapshot-based key iteration in _emit_all with while loop
- Re-insert failed batch data on _emit error for retry/redelivery
- Add error handling in _emit_all so drain path logs instead of crashing

Reference: ML-12304

After _emit failure, batches are now preserved for retry instead of
being permanently lost. Update expected sum from 9 to 10 since event 1
is retried and included in the next successful flush.

Reference: ML-12304

Address code review feedback: remove the re-insert block and
error-handling logic from _emit_batch and _emit_all. These behavior
changes will be handled in a separate PR after design discussion.

This commit retains only the race condition fixes:
- Pop _batch_events before await _emit() to prevent shared-list races
- Cancel _timeout_task at the start of termination before _emit_all
- Replace snapshot-based key iteration in _emit_all with while loop

Also addresses review comments:
- Remove Kafka-specific references from comments
- Update test module docstring to describe fixes not bugs
- Add comment explaining sleep(0) yield in test
- Remove error-handling tests (belong to separate PR)
- Revert test_error_raising_batch_target to original expectations

Reference: ML-12304
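The retained while-loop change can be illustrated with a small sketch. `drain` and `emit` are hypothetical helpers, not storey functions; the point is only that re-checking the dict after every await picks up keys that a snapshot such as `for key in list(batches)` would miss.

```python
import asyncio


async def drain(batches, emit):
    """Flush until the dict is empty, including keys added mid-flush."""
    while batches:
        key = next(iter(batches))
        events = batches.pop(key)       # detach before awaiting
        await emit(key, events)


async def demo():
    batches = {"a": [1]}
    seen = []

    async def emit(key, events):
        await asyncio.sleep(0)          # yield, as a real write would
        seen.append((key, events))
        if key == "a":
            batches["b"] = [2]          # a new key appears during the flush

    await drain(batches, emit)
    return seen, batches


seen, leftover = asyncio.run(demo())
```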
@alxtkr77 alxtkr77 force-pushed the ML-12304-fix-batching-race-conditions branch from 104e562 to 964b1a1 Compare March 23, 2026 11:40

Avoid cancel() on _timeout_task during termination: cancelling
mid-_emit would lose already-popped batch data. Instead, set a
_terminating flag that _sleep_and_emit checks between iterations,
then await the task to let it finish its current emit cleanly.

Reference: ML-12304
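A rough sketch of the flag-and-await approach described in this commit message, assuming a simplified timer loop. `TimerSketch` and its attributes are hypothetical and only mirror the shape of _sleep_and_emit; the real implementation may differ.

```python
import asyncio


class TimerSketch:
    """Hypothetical timer-driven flusher; names are illustrative only."""

    def __init__(self):
        self.pending = [1, 2]
        self.written = []
        self.terminating = False
        self.in_emit = asyncio.Event()
        self.release = asyncio.Event()

    async def _emit(self, events):
        self.in_emit.set()
        await self.release.wait()       # slow write in progress
        self.written.extend(events)

    async def sleep_and_emit(self):
        while not self.terminating:     # flag is checked between iterations
            await asyncio.sleep(0)      # stands in for the flush interval
            events, self.pending = self.pending, []
            if events:
                await self._emit(events)

    async def terminate(self, task):
        self.terminating = True
        await task                      # no cancel(): let the emit finish


async def demo():
    t = TimerSketch()
    task = asyncio.create_task(t.sleep_and_emit())
    await t.in_emit.wait()              # events are popped, write in flight
    stopper = asyncio.create_task(t.terminate(task))
    await asyncio.sleep(0)              # let terminate() set the flag
    t.release.set()
    await stopper
    return t.written, task.cancelled()


written, cancelled = asyncio.run(demo())
```

Had the task been cancelled while parked inside `_emit`, the already-popped events would have been dropped; awaiting it lets the write complete before termination proceeds.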
Comment thread tests/test_batching_races.py Outdated
gtopper (Collaborator) left a comment

Production side LGTM. Would be good to see the tests build a flow as we normally do in unit tests. I don't see a reason that wouldn't work here as well, and it will make the tests less artificial.

Alex Toker added 2 commits March 23, 2026 17:33
Rewrite tests to use build_flow with AsyncEmitSource instead of calling
_do() directly, matching production graph construction.

Also: replace asyncio.sleep(0.01) yields with sleep(0).

Reference: ML-12304

Add step-by-step interleaving diagrams back to all four test
docstrings.  These document the exact await-point interleavings
that trigger each race, serving as the specification for the fixes.

Also add inline comments explaining each asyncio.sleep call.

Reference: ML-12304

@alxtkr77 alxtkr77 force-pushed the ML-12304-fix-batching-race-conditions branch from dae900b to 5f482c5 Compare March 23, 2026 17:42
@assaf758 assaf758 merged commit c34fda6 into mlrun:development Mar 23, 2026
5 checks passed