F069: Parallel for_each Execution
User Stories
US1: Parallel Batch Execution of Independent Loop Items (P1 - Must Have)
As a workflow author,
I want consecutive for_each items marked as parallel to execute concurrently in batches,
So that multi-task workflows complete faster when tasks have no data dependencies.
Acceptance Scenarios:
- Given a for_each step with `parallel_items: true` and 4 consecutive items where `parallel: true`, when the loop executes, then all 4 items run concurrently via ParallelExecutor with the `all_succeed` strategy
- Given a for_each step with `parallel_items: true` and items `[P, P, S, P, P, P]` (P=parallel, S=sequential), when the loop executes, then items 1-2 run as batch 1, item 3 runs sequentially, and items 4-6 run as batch 2
- Given a for_each step with `parallel_items: false` or omitted, when the loop executes, then all items execute sequentially exactly as today (zero regression)
- Given a for_each step with `parallel_items: true` where all items have `parallel: false`, when the loop executes, then all items execute sequentially (no batches formed)
Independent Test: Create a workflow with 6 items (3 parallel, 1 sequential, 2 parallel). Verify execution order: batch of 3 concurrent, then 1 sequential, then batch of 2 concurrent. Assert total wall time < sequential time.
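The batching rule above (consecutive `parallel: true` items form one concurrent batch; any sequential item breaks the run) can be sketched as follows. The `Item` and `Batch` types and the `groupBatches` helper are illustrative names, not the actual implementation:

```go
package main

import "fmt"

// Item is a hypothetical loop item; only the Parallel flag matters here.
type Item struct {
	Name     string
	Parallel bool
}

// Batch groups consecutive parallel items; a sequential item becomes a batch of one.
type Batch struct {
	Items      []Item
	Concurrent bool
}

// groupBatches splits items into runs: consecutive parallel items become one
// concurrent batch, and each sequential item becomes its own sequential batch.
func groupBatches(items []Item) []Batch {
	var batches []Batch
	for i := 0; i < len(items); {
		if items[i].Parallel {
			j := i
			for j < len(items) && items[j].Parallel {
				j++
			}
			batches = append(batches, Batch{Items: items[i:j], Concurrent: true})
			i = j
		} else {
			batches = append(batches, Batch{Items: items[i : i+1], Concurrent: false})
			i++
		}
	}
	return batches
}

func main() {
	// [P, P, S, P, P, P] from the acceptance scenario above.
	items := []Item{
		{"T001", true}, {"T002", true}, {"T003", false},
		{"T004", true}, {"T005", true}, {"T006", true},
	}
	for _, b := range groupBatches(items) {
		fmt.Println(len(b.Items), b.Concurrent)
	}
}
```

Running this on the `[P, P, S, P, P, P]` sequence yields three batches: a concurrent batch of 2, a sequential batch of 1, and a concurrent batch of 3, matching the scenario.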
US2: Concurrency Limiting for Parallel Batches (P1 - Must Have)
As a workflow author,
I want to set a max_concurrent limit on parallel for_each batches,
So that I can control resource consumption and avoid overwhelming the system with too many concurrent tasks.
Acceptance Scenarios:
- Given `parallel_items: true` and `max_concurrent: 3` with a batch of 6 parallel items, when the batch executes, then at most 3 items run simultaneously (semaphore-bounded)
- Given `parallel_items: true` and `max_concurrent: 0` (or omitted), when the batch executes, then all items in the batch run concurrently with no limit
- Given `max_concurrent: -1`, when the workflow is validated, then validation fails with a clear error message
Independent Test: Set max_concurrent: 2 with 4 parallel items. Instrument execution timestamps. Verify no more than 2 items overlap at any point.
US3: Error Handling in Parallel Batches (P1 - Must Have)
As a workflow author,
I want parallel batch failures to respect continue_on_error semantics,
So that I can choose between fail-fast and best-effort execution for independent tasks.
Acceptance Scenarios:
- Given `parallel_items: true` and `continue_on_error: false` (the default), when one item in a batch fails, then the batch uses the `all_succeed` strategy: it fails immediately and remaining items are cancelled
- Given `parallel_items: true` and `continue_on_error: true`, when one item in a batch fails, then the batch uses the `best_effort` strategy: execution continues, failed items are recorded with their errors, and the loop proceeds to the next batch
- Given a parallel batch where all items succeed, when the batch completes, then a single checkpoint is written for the entire batch (not per item)
- Given a parallel batch with `continue_on_error: true` where item 2 fails, when the batch completes, then the post-batch marking step marks items 1 and 3 as complete and item 2 as failed
Independent Test: Create a batch of 3 parallel items where item 2 always fails. Run once with continue_on_error: false (expect immediate failure), once with continue_on_error: true (expect items 1 and 3 to succeed, item 2 recorded as failed).
US4: Isolated Execution Context and Namespaced State per Parallel Item (P1 - Must Have)
As a workflow author,
I want each parallel item to have its own isolated loop context (loop.Index, loop.Item) and namespaced step state keys,
So that concurrent items do not interfere with each other's interpolation, state, or shared file writes.
Acceptance Scenarios:
- Given 3 parallel items executing concurrently, when each item references `{{.loop.Item}}` and `{{.loop.Index}}`, then each resolves to its own item data and index (no cross-contamination)
- Given 3 parallel items executing concurrently, when each item's loop body writes step output (e.g., from `route_by_type`), then outputs are stored under namespaced state keys: `<step_name>__batch<batch_index>_item<item_index>` (e.g., `route_by_type__batch0_item2`)
- Given a completed parallel batch, when the loop continues to the next sequential item or batch, then the last item's state is also written to the canonical key (`route_by_type`) for backward compatibility with post-loop interpolation
- Given 3 parallel items that each execute `mark_task_complete`, which writes to a shared file, when the items run concurrently, then the writes must not corrupt each other (either via atomic writes per item or by deferring shared-file writes to a post-batch sequential step)
Independent Test: Run 3 parallel items that each echo {{.loop.Index}}-{{.loop.Item.name}}. Verify outputs are 0-alpha, 1-beta, 2-gamma with no mixing. Verify state keys route_by_type__batch0_item0, route_by_type__batch0_item1, route_by_type__batch0_item2 all exist.
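Context isolation and key namespacing can be sketched together. `ExecContext`, `cloneForItem`, and `stateKey` are hypothetical names standing in for the real ExecutionContext clone and key-building logic:

```go
package main

import "fmt"

// ExecContext is a hypothetical per-item interpolation scope.
type ExecContext struct {
	LoopIndex int
	LoopItem  any
	States    map[string]string
}

// stateKey builds the namespaced key <step>__batch<N>_item<M>; the double
// underscore avoids collisions with user step names (single "_" or "-").
func stateKey(step string, batch, item int) string {
	return fmt.Sprintf("%s__batch%d_item%d", step, batch, item)
}

// cloneForItem gives each batch item its own context so concurrent items
// cannot observe each other's loop.Index / loop.Item.
func cloneForItem(base *ExecContext, index int, item any) *ExecContext {
	return &ExecContext{LoopIndex: index, LoopItem: item, States: base.States}
}

func main() {
	base := &ExecContext{States: map[string]string{}}
	items := []string{"alpha", "beta", "gamma"}
	// Sequential here for clarity; concurrent writers to States would need a
	// mutex, which is precisely why keys are namespaced per item.
	for i, it := range items {
		c := cloneForItem(base, i, it)
		base.States[stateKey("route_by_type", 0, i)] =
			fmt.Sprintf("%d-%s", c.LoopIndex, c.LoopItem)
	}
	fmt.Println(base.States["route_by_type__batch0_item2"])
}
```

This produces the `0-alpha` / `1-beta` / `2-gamma` outputs the independent test asserts, each under its own `route_by_type__batch0_item<M>` key.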
US5: Validation of Parallel Fields (P2 - Should Have)
As a workflow author,
I want parallel_items and max_concurrent to be validated at workflow load time,
So that misconfiguration is caught early before execution begins.
Acceptance Scenarios:
- Given a `while` loop step with `parallel_items: true`, when the workflow is validated, then validation fails with the error "parallel_items is only valid on for_each loops"
- Given a `for_each` loop step with `max_concurrent: -5`, when the workflow is validated, then validation fails with the error "max_concurrent must be >= 0"
- Given a `for_each` loop step with `parallel_items: true` and a valid `max_concurrent`, when the workflow is validated, then validation passes
Independent Test: Create YAML fixtures for each invalid case. Run awf validate and assert specific error messages.
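The load-time checks reduce to two rules. A minimal sketch, assuming a `LoopConfig` shape with only the fields named in this spec (the real struct has more):

```go
package main

import (
	"errors"
	"fmt"
)

// LoopConfig mirrors the fields discussed in US5/FR-009; other fields omitted.
type LoopConfig struct {
	Type          string // "for_each" or "while"
	ParallelItems bool
	MaxConcurrent int
}

// validate enforces the US5 rules at workflow load time, using the exact
// error strings from the acceptance scenarios.
func validate(c LoopConfig) error {
	if c.ParallelItems && c.Type != "for_each" {
		return errors.New("parallel_items is only valid on for_each loops")
	}
	if c.MaxConcurrent < 0 {
		return errors.New("max_concurrent must be >= 0")
	}
	return nil
}

func main() {
	fmt.Println(validate(LoopConfig{Type: "while", ParallelItems: true}))
	fmt.Println(validate(LoopConfig{Type: "for_each", MaxConcurrent: -5}))
	fmt.Println(validate(LoopConfig{Type: "for_each", ParallelItems: true, MaxConcurrent: 3}))
}
```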
US6: Resume with Partially Completed Batches (P2 - Should Have)
As a workflow author,
I want resumed workflows to correctly skip already-completed items within a batch,
So that a crash mid-batch only re-executes the incomplete items, not the entire batch.
Acceptance Scenarios:
- Given a batch of [T001, T002, T003] where T001 completed and T002 crashed mid-execution, when the workflow resumes with `resume: true`, then the resume filter (based on `tasks.md` `[x]` markers) excludes T001 from the items list and T002+T003 form a new batch
- Given all items in a batch completed before crash, when the workflow resumes, then the batch is skipped entirely (all items filtered out)
- Given no items in a batch completed before crash, when the workflow resumes, then the full batch re-executes
Independent Test: Mark T001 as [x] in tasks.md, leave T002 and T003 as [ ]. Run with resume: true. Verify only T002 and T003 execute.
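The resume filter can be sketched as a two-step pass: parse the `[x]` markers, then drop completed items before batch grouping ever runs. The checkbox line format and helper names are assumptions based on the convention described above:

```go
package main

import (
	"fmt"
	"strings"
)

// completedTasks parses tasks.md-style lines and returns the set of task IDs
// already marked done, assuming the "- [x] T001 ..." checkbox convention.
func completedTasks(tasksMD string) map[string]bool {
	done := map[string]bool{}
	for _, line := range strings.Split(tasksMD, "\n") {
		line = strings.TrimSpace(line)
		if rest, ok := strings.CutPrefix(line, "- [x] "); ok {
			if id, _, _ := strings.Cut(rest, " "); id != "" {
				done[id] = true
			}
		}
	}
	return done
}

// filterResume drops items whose ID is already complete; batch grouping then
// operates on the remaining items only.
func filterResume(ids []string, done map[string]bool) []string {
	var out []string
	for _, id := range ids {
		if !done[id] {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	md := "- [x] T001 setup\n- [ ] T002 core\n- [ ] T003 tests"
	fmt.Println(filterResume([]string{"T001", "T002", "T003"}, completedTasks(md)))
}
```

With T001 marked `[x]`, only T002 and T003 survive the filter, so they form the new batch on resume.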
Requirements
Functional Requirements
- FR-001: The LoopExecutor shall group consecutive for_each items whose `parallel` field is `true` into batches for concurrent execution
- FR-002: The parallel batch strategy shall be derived from `continue_on_error`: `all_succeed` when false (the default), `best_effort` when true. The strategy is not independently configurable per batch.
- FR-003: The `max_concurrent` field shall limit the number of goroutines per batch via the existing semaphore pattern; 0 or omitted means unlimited
- FR-004: Each item within a parallel batch shall receive an isolated `ExecutionContext` clone with its own `loop.Index`, `loop.Item`, and interpolation scope
- FR-005: Step state keys within a parallel batch shall be namespaced as `<step_name>__batch<N>_item<M>` to prevent concurrent write collisions. After batch completion, the canonical (un-namespaced) key shall be set to the last item's value for backward compatibility.
- FR-006: State checkpointing shall occur once per batch completion, not per individual item within a batch
- FR-007: When `parallel_items` is false or omitted, for_each behavior shall be identical to the current sequential execution
- FR-008: The `parallel_items` field shall only be valid on `for_each` loop steps; setting it on `while` loops shall produce a validation error
- FR-009: The YAML mapping shall support `parallel_items` (bool) and `max_concurrent` (int) fields on the loop step configuration
- FR-010: Loop body steps that write to shared files (e.g., `mark_task_complete` writing to `tasks.md`) shall be identified as a known concurrency risk. The recommended pattern is to defer shared-file writes to a post-batch sequential step, or to use file-level locking.
- FR-011: Each batch item shall execute the full loop body (all body steps, including routing and sub-workflow calls), not just a single step. The LoopExecutor creates a `StepExecutor` adapter per batch item that runs the complete body sequence.
- FR-012: Resume mode shall filter items before batch grouping. Items already marked `[x]` in `tasks.md` are excluded from the items list, and batches are formed from the remaining items only.
Non-Functional Requirements
- NFR-001: Parallel batch execution wall time shall be bounded by the slowest item in the batch (not the sum of all items)
- NFR-002: Memory overhead per parallel item shall not exceed the cost of one additional loop body execution context
- NFR-003: Zero regression on existing sequential for_each and while loop behavior (measured by full test suite passing)
- NFR-004: No new external dependencies; implementation shall reuse existing ParallelExecutor and errgroup/semaphore patterns
Success Criteria
- `golangci-lint run` passes
- `make lint-arch` passes
Key Entities
| Entity | Description | Attributes |
|---|---|---|
| LoopConfig | Domain configuration for loop steps | `ParallelItems bool`, `MaxConcurrent int` (new fields alongside existing Items, While, Body, MaxIterations, ContinueOnError) |
| ParallelBatch | Runtime grouping of consecutive parallel items | `Items []any`, `StartIndex int`, `EndIndex int` (value object, not persisted) |
| BatchItemExecutor | Adapter running the full loop body for one item | Implements `ports.StepExecutor`; wraps LoopExecutor body execution with an isolated context |
| LoopExecutor | Application service executing loop steps | Extended `executeForEach()` with batch detection and ParallelExecutor delegation |
| ParallelExecutor | Existing infrastructure for concurrent step execution | Reused as-is with errgroup + semaphore |
Metadata
- Status: backlog
- Version: v0.4.0
- Priority: high
- Estimation: L
Dependencies
- Blocked by: none
- Unblocks: none
Clarifications
- Strategy selection: The parallel batch strategy (`all_succeed` vs `best_effort`) is derived from the `continue_on_error` field on the for_each step and is not independently configurable. This keeps the YAML surface small and avoids confusing interactions between two strategy knobs.
- State key namespacing: Uses the `__batch<N>_item<M>` suffix convention (the double-underscore separator avoids collisions with user-defined step names, which use single underscores or hyphens). The canonical key is written after batch completion for backward compatibility with existing `{{.states.step_name.Output}}` interpolation.
- Shared file writes: The `mark_task_complete` step in `implement.yaml` writes to `tasks.md` via `sed -i`. With parallel execution, concurrent `sed -i` on the same file causes corruption. Two mitigation strategies: (1) move `mark_task_complete` out of the loop body into a post-batch sequential callback, or (2) replace `sed -i` with an atomic write-to-temp + rename per line. Strategy (1) is recommended as it requires no runtime change.
- Full body execution: Each parallel batch item runs the entire loop body (`route_by_type` → handler → `mark_task_complete`), not a single step. This matches how sequential for_each works today: each iteration runs all body steps.
Notes
- The `parallel` field on each item in the JSON array is already produced by `extract_components` in `implement.yaml`; no upstream changes needed
- ParallelExecutor is reused without modification; the LoopExecutor creates a BatchItemExecutor adapter per batch item that runs the full loop body
- Batch boundaries are determined by consecutive runs of `parallel: true` items; interleaved sequential items break batches naturally
- This feature is backward-compatible: omitting `parallel_items` or setting it to `false` preserves current behavior exactly
- Resume filtering happens in `prepare_tdd_loop` (an existing step), which already removes `[x]` items; this runs before the loop executor sees the items, so batch grouping naturally adapts to the reduced item list
parallelfield on each item in the JSON array is already produced byextract_componentsinimplement.yaml— no upstream changes neededparallel: trueitems; interleaved sequential items break batches naturallyparallel_itemsor setting it tofalsepreserves current behavior exactlyprepare_tdd_loop(existing step) which already removes[x]items — this runs before the loop executor sees items, so batch grouping naturally adapts to the reduced item list