Fix concurrent streaming in ConcurrentExecution #638
Open
gtopper wants to merge 1 commit into mlrun:development
[ML-12378](https://iguazio.atlassian.net/browse/ML-12378) Streaming generators were iterated serially in the FIFO worker, negating `max_in_flight` concurrency. Offload generator iteration to background tasks feeding an `asyncio.Queue`, enabling concurrent streaming across events.
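The approach summarized above can be sketched roughly as follows. This is a minimal illustration of the pattern, not the code in this PR: `GeneratorDone`, `iterate_generator`, `slow_stream`, and `run_events` are hypothetical names, and the `stats` dictionary exists only to show that the generators overlap.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class GeneratorDone:
    """Hypothetical sentinel: end of stream, with the error if one was raised."""
    error: Optional[BaseException] = None


async def iterate_generator(gen, queue):
    """Background task: drain an async generator into the queue."""
    try:
        async for chunk in gen:
            await queue.put(chunk)
    except Exception as exc:
        await queue.put(GeneratorDone(exc))
    else:
        await queue.put(GeneratorDone())


async def slow_stream(event_id, stats, chunks=2, delay=0.05):
    """Toy streaming processor that records how many streams overlap."""
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    try:
        for i in range(chunks):
            await asyncio.sleep(delay)
            yield f"{event_id}_{i}"
    finally:
        stats["active"] -= 1


async def run_events(event_ids):
    stats = {"active": 0, "peak": 0}

    # Start one background task per event so all generators make progress.
    streams = []
    for event_id in event_ids:
        queue = asyncio.Queue()
        gen = slow_stream(event_id, stats)
        task = asyncio.create_task(iterate_generator(gen, queue))
        streams.append((queue, task))

    # Emit in FIFO order by draining each queue up to its sentinel.
    emitted = []
    for queue, task in streams:
        while True:
            item = await queue.get()
            if isinstance(item, GeneratorDone):
                if item.error is not None:
                    raise item.error
                break
            emitted.append(item)
        await task  # holds the task reference and surfaces unexpected errors
    return emitted, stats["peak"]


if __name__ == "__main__":
    emitted, peak = asyncio.run(run_events(["a", "b", "c", "d"]))
    print(emitted, "peak concurrent generators:", peak)
```

In this sketch the chunks are still emitted in event order, but all four generators are active at once, whereas iterating each generator inline in the worker would keep at most one active at a time.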
royischoss
reviewed
Apr 16, 2026
Collaborator
Hey, LGTM. Minor comments:
        args,
        mp_queue,
    )
    ipc_gen = _async_read_streaming_queue(mp_queue)
Collaborator
What does ipc stand for?
    assert result == ["re_test_0", "re_test_1"]

    # -- ML-12378 concurrency tests ----------------------------------------
Collaborator
Do we need the task reference in docs and variable names?
ML-12378

Problem

When `ConcurrentExecution` processes streaming (generator) event processors with `max_in_flight > 1`, the generators are iterated serially in the FIFO worker coroutine. This blocks the worker from picking up other events until the current generator is fully consumed, negating the concurrency benefit of `max_in_flight`.

Root cause

`_handle_completed` called `_emit_streaming_chunks` (inherited from `_StreamingStepMixin`), which iterates the generator inline, holding the worker for the entire duration of each generator. With 4 events and a 0.5s generator, total time is ~2.0s (4 × 0.5s) instead of ~0.5s.

Solution

Offload generator iteration to background `asyncio.Task`s that feed an `asyncio.Queue`, freeing the FIFO worker to process other events concurrently.

New internal types:

- `_GeneratorDone(error)`: sentinel placed on the queue when the generator is exhausted (or errors).
- `_StreamingQueue(queue, task)`: wraps the queue and its background task for lifecycle management.

New methods on `ConcurrentExecution`:

- `_iterate_generator`: background task that consumes a generator (sync or async) into an `asyncio.Queue`. Sync generators use `run_in_executor` per `next()` call to avoid blocking the event loop.
- `_emit_streaming_from_queue`: mirrors `_emit_streaming_chunks` but reads from the queue, emitting `StreamChunk` events and a final `StreamCompletion` (with error if the generator raised).

Process-based mechanisms (`process_pool`, `dedicated_process`) use the existing IPC pattern: `_concurrent_streaming_run_in_subprocess` runs the generator in the subprocess and streams chunks via `multiprocessing.Queue`, which `_async_read_streaming_queue` consumes as an async generator on the parent side.

Task lifecycle: `_StreamingQueue` holds the `asyncio.Task` reference, and `_handle_completed` awaits it after draining the queue. This prevents GC of fire-and-forget tasks and surfaces unexpected errors.

Additional changes

- Migrated `ConcurrentExecution` to `ParallelExecutionMechanisms`: replaces the old `_supported_concurrency_mechanisms` list with the shared enum. Legacy names (`"threading"`, `"multiprocessing"`) are mapped automatically for backward compatibility.
- `inspect.isgeneratorfunction` check in `__init__`, used to route process-based mechanisms to the IPC streaming path in `_process_event`.

Tests

- `test_concurrent_streaming_asyncio_async_gen`, `test_concurrent_streaming_sync_gen` (parametrized over `thread_pool`/`process_pool`/`dedicated_process`/`naive`): verify generators run concurrently across events.
- `test_concurrent_streaming_error_asyncio_async_gen`, `test_concurrent_streaming_error_sync_gen` (parametrized over all mechanisms), `test_concurrent_streaming_error_mixed_with_healthy`: verify mid-stream errors propagate correctly and don't poison healthy concurrent events.
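
The mid-stream error behavior these tests check, combined with the per-`next()` `run_in_executor` handling of sync generators, can be illustrated with a small standalone sketch. It is an assumption-laden toy, not the PR's implementation: `GeneratorDone`, `next_or_sentinel`, `iterate_sync_generator`, `chunks`, and `collect` are hypothetical names. `StopIteration` is converted to a sentinel inside the worker thread because it cannot be raised into an asyncio future.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Optional

_EXHAUSTED = object()  # marks normal exhaustion of a sync generator


@dataclass
class GeneratorDone:
    """Hypothetical sentinel: end of stream, with the error if one was raised."""
    error: Optional[BaseException] = None


def next_or_sentinel(gen):
    """Runs in a worker thread: advance the generator by one chunk."""
    try:
        return next(gen)
    except StopIteration:
        return _EXHAUSTED


async def iterate_sync_generator(gen, queue):
    """Background task: pull chunks off-loop, one executor call per next()."""
    loop = asyncio.get_running_loop()
    try:
        while True:
            chunk = await loop.run_in_executor(None, next_or_sentinel, gen)
            if chunk is _EXHAUSTED:
                break
            await queue.put(chunk)
    except Exception as exc:
        await queue.put(GeneratorDone(exc))
    else:
        await queue.put(GeneratorDone())


def chunks(event_id, fail_at=None):
    """Toy blocking generator that optionally raises mid-stream."""
    for i in range(3):
        if i == fail_at:
            raise ValueError(f"{event_id} failed mid-stream")
        time.sleep(0.01)
        yield f"{event_id}_{i}"


async def collect(generators):
    """Return {event_id: (chunks_received, error_or_None)}."""
    streams = {}
    for event_id, gen in generators.items():
        queue = asyncio.Queue()
        task = asyncio.create_task(iterate_sync_generator(gen, queue))
        streams[event_id] = (queue, task)

    outcome = {}
    for event_id, (queue, task) in streams.items():
        received = []
        while True:
            item = await queue.get()
            if isinstance(item, GeneratorDone):
                outcome[event_id] = (received, item.error)
                break
            received.append(item)
        await task
    return outcome


if __name__ == "__main__":
    gens = {"ok": chunks("ok"), "bad": chunks("bad", fail_at=1)}
    print(asyncio.run(collect(gens)))
```

Here the failing stream delivers the chunk it produced before the error and then reports the exception through its own sentinel, while the healthy stream completes untouched.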