Fix concurrent streaming in ConcurrentExecution [1.11.x]#637

Closed
gtopper wants to merge 1 commit into mlrun:1.11.x from gtopper:ML-12378_1.11.x

@gtopper gtopper commented Mar 31, 2026

ML-12378

Problem

When ConcurrentExecution processes streaming (generator) event processors with max_in_flight > 1, the generators are iterated serially in the FIFO worker coroutine. This blocks the worker from picking up other events until the current generator is fully consumed, negating the concurrency benefit of max_in_flight.

Root cause

_handle_completed called _emit_streaming_chunks (inherited from _StreamingStepMixin), which iterates the generator inline and holds the worker for the entire duration of each generator. With 4 events, each backed by a generator that takes 0.5s to consume, total time is ~2.0s (4 × 0.5s) instead of ~0.5s.
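
To illustrate the difference, here is a minimal sketch that does not use storey at all. The helper names (`chunks`, `consume`, `serial`, `concurrent`) are hypothetical and only stand in for the worker and the event processors. It records the order in which chunks are produced when generators are drained one after another versus when each one is drained in its own task.

```python
import asyncio


async def chunks(name, n, log):
    # Hypothetical streaming processor: yields n chunks, giving control
    # back to the event loop before each one.
    for i in range(n):
        await asyncio.sleep(0)
        log.append((name, i))
        yield i


async def consume(name, n, log):
    # Drain one generator to completion.
    async for _ in chunks(name, n, log):
        pass


async def serial(names, n):
    # Mimics inline iteration: the next event starts only after the
    # previous generator is exhausted.
    log = []
    for name in names:
        await consume(name, n, log)
    return log


async def concurrent(names, n):
    # Mimics offloading: every generator is drained in its own task.
    log = []
    await asyncio.gather(*(consume(name, n, log) for name in names))
    return log


if __name__ == "__main__":
    print(asyncio.run(serial(["A", "B"], 2)))
    print(asyncio.run(concurrent(["A", "B"], 2)))
```

In the serial case all chunks of one event precede those of the next; in the concurrent case chunks from different events interleave, which is what allows the wall-clock time to approach that of a single generator.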

Solution

Offload generator iteration to background asyncio.Tasks that feed an asyncio.Queue, freeing the FIFO worker to process other events concurrently.

New internal types:

  • _GeneratorDone(error) — sentinel placed on the queue when the generator is exhausted (or errors).
  • _StreamingQueue(queue, task) — wraps the queue and its background task for lifecycle management.

New methods on ConcurrentExecution:

  • _iterate_generator — background task that consumes a generator (sync or async) into an asyncio.Queue. Sync generators use run_in_executor per next() call to avoid blocking the event loop.
  • _emit_streaming_from_queue — mirrors _emit_streaming_chunks but reads from the queue, emitting StreamChunk events and a final StreamCompletion (with error if the generator raised).
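
The queue-based flow can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the code in this PR: `GeneratorDone`, `iterate_generator`, and `emit_from_queue` are hypothetical stand-ins for `_GeneratorDone`, `_iterate_generator`, and `_emit_streaming_from_queue`, and the consumer returns a list plus an optional error instead of emitting `StreamChunk` and `StreamCompletion` events.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class GeneratorDone:
    """Sentinel: the generator is finished; error is set if it raised."""
    error: Optional[BaseException] = None


_STOP = object()  # marker returned instead of raising StopIteration


def _next_or_stop(iterator):
    # StopIteration cannot be propagated through a Future, so it is
    # translated into a marker value inside the executor thread.
    try:
        return next(iterator)
    except StopIteration:
        return _STOP


async def iterate_generator(gen, queue):
    """Background task: push every chunk of gen onto queue, then a sentinel."""
    loop = asyncio.get_running_loop()
    try:
        if hasattr(gen, "__aiter__"):
            async for chunk in gen:
                await queue.put(chunk)
        else:
            # Sync generator: run each next() call in the default executor
            # so the event loop is not blocked.
            while True:
                chunk = await loop.run_in_executor(None, _next_or_stop, gen)
                if chunk is _STOP:
                    break
                await queue.put(chunk)
    except Exception as exc:
        await queue.put(GeneratorDone(error=exc))
    else:
        await queue.put(GeneratorDone())


async def emit_from_queue(queue):
    """Consumer: collect chunks until the sentinel arrives."""
    collected = []
    while True:
        item = await queue.get()
        if isinstance(item, GeneratorDone):
            return collected, item.error
        collected.append(item)
```

A caller would create one queue per event, start `iterate_generator` with `asyncio.create_task`, and later drain the queue with `emit_from_queue`.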

Process-based mechanisms (process_pool, dedicated_process) use the existing IPC pattern: _concurrent_streaming_run_in_subprocess runs the generator in the subprocess and streams chunks via multiprocessing.Queue, which _async_read_streaming_queue consumes as an async generator on the parent side.

Task lifecycle: _StreamingQueue holds the asyncio.Task reference, and _handle_completed awaits it after draining the queue. This prevents GC of fire-and-forget tasks and surfaces unexpected errors.
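
A rough sketch of this lifecycle pattern, with hypothetical names (`StreamingQueue`, `producer`, `start_stream`, `drain_and_join`) that are not taken from the PR code: the wrapper keeps a strong reference to the task (the event loop itself keeps only weak references to tasks), and awaiting the task after the queue is drained re-raises anything the producer raised outside its normal error path.

```python
import asyncio
from dataclasses import dataclass

_DONE = object()  # hypothetical end-of-stream marker


@dataclass
class StreamingQueue:
    queue: asyncio.Queue
    task: asyncio.Task  # strong reference keeps the task alive


async def producer(queue, fail_after_done):
    await queue.put("chunk")
    await queue.put(_DONE)
    if fail_after_done:
        # Simulates a bug that occurs after the sentinel was queued.
        raise RuntimeError("unexpected failure after the sentinel")


def start_stream(fail_after_done=False):
    # Must be called while an event loop is running.
    queue = asyncio.Queue()
    task = asyncio.create_task(producer(queue, fail_after_done))
    return StreamingQueue(queue, task)


async def drain_and_join(stream):
    collected = []
    while (item := await stream.queue.get()) is not _DONE:
        collected.append(item)
    # Awaiting the task surfaces any exception the producer raised.
    await stream.task
    return collected
```

Without the final `await stream.task`, the `RuntimeError` in the failing case would only show up as a "Task exception was never retrieved" log message.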

Additional changes

  • Migrated ConcurrentExecution to ParallelExecutionMechanisms: replaces the old _supported_concurrency_mechanisms list with the shared enum. Legacy names ("threading", "multiprocessing") are mapped automatically for backward compatibility.
  • Detect generator functions early: __init__ checks the event processor with inspect.isgeneratorfunction, and _process_event uses the result to route process-based mechanisms to the IPC streaming path.
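
For reference, a short sketch of what `inspect.isgeneratorfunction` does and does not detect. The sample callables are hypothetical, and whether `ConcurrentExecution` performs any additional checks (for example `inspect.isasyncgenfunction`) is not stated in this description.

```python
import inspect


def plain(event):
    return event


def sync_gen(event):
    yield event


async def async_gen(event):
    yield event


def returns_generator(event):
    # A regular function that returns a generator object; it is not
    # itself a generator function.
    return sync_gen(event)


detected = {
    fn.__name__: inspect.isgeneratorfunction(fn)
    for fn in (plain, sync_gen, async_gen, returns_generator)
}
```

Only `sync_gen` is reported as a generator function here. Async generator functions are recognised by `inspect.isasyncgenfunction` instead, and a callable that merely returns a generator can only be identified once it has been called.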

Tests

  • Concurrency tests (test_concurrent_streaming_asyncio_async_gen, test_concurrent_streaming_sync_gen parametrized over thread_pool/process_pool/dedicated_process/naive): verify generators run concurrently across events.
  • Error tests (test_concurrent_streaming_error_asyncio_async_gen, test_concurrent_streaming_error_sync_gen parametrized over all mechanisms, test_concurrent_streaming_error_mixed_with_healthy): verify mid-stream errors propagate correctly and don't poison healthy concurrent events.

[ML-12378](https://iguazio.atlassian.net/browse/ML-12378)

Streaming generators were iterated serially in the FIFO worker, negating `max_in_flight` concurrency. Offload generator iteration to background tasks feeding an `asyncio.Queue`, enabling concurrent streaming across events.

gtopper commented Mar 31, 2026

Development branch PR – #638.

@gtopper gtopper changed the base branch from development to 1.11.x March 31, 2026 08:31

gtopper commented Apr 1, 2026

Closing until such time as we may decide to backport this to 1.11.x.

@gtopper gtopper closed this Apr 1, 2026