
F069: Parallel for_each Execution #225

@pocky

Description


User Stories

US1: Parallel Batch Execution of Independent Loop Items (P1 - Must Have)

As a workflow author,
I want consecutive for_each items marked as parallel to execute concurrently in batches,
So that multi-task workflows complete faster when tasks have no data dependencies.

Acceptance Scenarios:

  • Given a for_each step with parallel_items: true and 4 consecutive items where parallel: true, when the loop executes, then all 4 items run concurrently via ParallelExecutor with all_succeed strategy
  • Given a for_each step with parallel_items: true and items [P, P, S, P, P, P] (P=parallel, S=sequential), when the loop executes, then items 1-2 run as batch 1, item 3 runs sequentially, items 4-6 run as batch 2
  • Given a for_each step with parallel_items: false or omitted, when the loop executes, then all items execute sequentially exactly as today (zero regression)
  • Given a for_each step with parallel_items: true and all items have parallel: false, when the loop executes, then all items execute sequentially (no batches formed)

Independent Test: Create a workflow with 6 items (3 parallel, 1 sequential, 2 parallel). Verify execution order: batch of 3 concurrent, then 1 sequential, then batch of 2 concurrent. Assert total wall time < sequential time.
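
The batching rule above — a consecutive run of parallel: true items forms one batch, and any sequential item breaks the run — can be sketched in Go as follows (Item and Batch are hypothetical stand-ins for the real domain types):

```go
package main

import "fmt"

// Item is a hypothetical loop item; the real items come from the
// extract_components JSON array and carry more fields.
type Item struct {
	Name     string
	Parallel bool
}

// Batch groups a consecutive run of items; Parallel marks whether the
// run may execute concurrently.
type Batch struct {
	Items      []Item
	StartIndex int
	Parallel   bool
}

// groupBatches splits items into runs of consecutive parallel items
// and singleton sequential items, preserving order.
func groupBatches(items []Item) []Batch {
	var batches []Batch
	for i := 0; i < len(items); {
		if !items[i].Parallel {
			// Sequential item: singleton run, executed exactly as today.
			batches = append(batches, Batch{Items: items[i : i+1], StartIndex: i})
			i++
			continue
		}
		// Extend the run while consecutive items are parallel.
		j := i
		for j < len(items) && items[j].Parallel {
			j++
		}
		batches = append(batches, Batch{Items: items[i:j], StartIndex: i, Parallel: true})
		i = j
	}
	return batches
}

func main() {
	// The [P, P, S, P, P, P] layout from the acceptance scenario above.
	items := []Item{
		{"a", true}, {"b", true}, {"c", false},
		{"d", true}, {"e", true}, {"f", true},
	}
	for _, b := range groupBatches(items) {
		fmt.Printf("start=%d size=%d parallel=%v\n", b.StartIndex, len(b.Items), b.Parallel)
	}
}
```

Running this prints a batch of 2, a singleton sequential run, then a batch of 3 — matching the [P, P, S, P, P, P] scenario.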

US2: Concurrency Limiting for Parallel Batches (P1 - Must Have)

As a workflow author,
I want to set a max_concurrent limit on parallel for_each batches,
So that I can control resource consumption and avoid overwhelming the system with too many concurrent tasks.

Acceptance Scenarios:

  • Given parallel_items: true and max_concurrent: 3 with a batch of 6 parallel items, when the batch executes, then at most 3 items run simultaneously (semaphore-bounded)
  • Given parallel_items: true and max_concurrent: 0 (or omitted), when the batch executes, then all items in the batch run concurrently with no limit
  • Given max_concurrent: -1, when the workflow is validated, then validation fails with a clear error message

Independent Test: Set max_concurrent: 2 with 4 parallel items. Instrument execution timestamps. Verify no more than 2 items overlap at any point.

US3: Error Handling in Parallel Batches (P1 - Must Have)

As a workflow author,
I want parallel batch failures to respect continue_on_error semantics,
So that I can choose between fail-fast and best-effort execution for independent tasks.

Acceptance Scenarios:

  • Given parallel_items: true and continue_on_error: false (default), when one item in a batch fails, then the batch uses all_succeed strategy: fails immediately and remaining items are cancelled
  • Given parallel_items: true and continue_on_error: true, when one item in a batch fails, then the batch uses best_effort strategy: continues execution, failed items are recorded with their error, and the loop proceeds to the next batch
  • Given a parallel batch where all items succeed, when the batch completes, then a single checkpoint is written for the entire batch (not per-item)
  • Given a parallel batch with continue_on_error: true where item 2 fails, when the batch completes, then the post-batch marking step marks items 1 and 3 as complete and item 2 as failed

Independent Test: Create a batch of 3 parallel items where item 2 always fails. Run once with continue_on_error: false (expect immediate failure), once with continue_on_error: true (expect items 1 and 3 to succeed, item 2 recorded as failed).

US4: Isolated Execution Context and Namespaced State per Parallel Item (P1 - Must Have)

As a workflow author,
I want each parallel item to have its own isolated loop context (loop.Index, loop.Item) and namespaced step state keys,
So that concurrent items do not interfere with each other's interpolation, state, or shared file writes.

Acceptance Scenarios:

  • Given 3 parallel items executing concurrently, when each item references {{.loop.Item}} and {{.loop.Index}}, then each resolves to its own item data and index (no cross-contamination)
  • Given 3 parallel items executing concurrently, when each item's loop body writes step output (e.g., from route_by_type), then outputs are stored under namespaced state keys: <step_name>__batch<batch_index>_item<item_index> (e.g., route_by_type__batch0_item2)
  • Given a parallel batch completes, when the loop continues to the next sequential or batch execution, then the last item's state is also written to the canonical key (route_by_type) for backward compatibility with post-loop interpolation
  • Given 3 parallel items that each execute mark_task_complete which writes to a shared file, when the items run concurrently, then writes must not corrupt each other (either via atomic writes per item or by deferring shared-file writes to a post-batch sequential step)

Independent Test: Run 3 parallel items that each echo {{.loop.Index}}-{{.loop.Item.name}}. Verify outputs are 0-alpha, 1-beta, 2-gamma with no mixing. Verify state keys route_by_type__batch0_item0, route_by_type__batch0_item1, route_by_type__batch0_item2 all exist.
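
The namespacing convention itself is straightforward to sketch (helper name hypothetical):

```go
package main

import "fmt"

// stateKey builds the namespaced key used while a batch runs; the
// canonical key (stepName alone) is rewritten with the last item's
// value after the batch completes, per the scenario above.
func stateKey(stepName string, batch, item int) string {
	return fmt.Sprintf("%s__batch%d_item%d", stepName, batch, item)
}

func main() {
	fmt.Println(stateKey("route_by_type", 0, 2)) // route_by_type__batch0_item2
}
```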

US5: Validation of Parallel Fields (P2 - Should Have)

As a workflow author,
I want parallel_items and max_concurrent to be validated at workflow load time,
So that misconfiguration is caught early before execution begins.

Acceptance Scenarios:

  • Given a while loop step with parallel_items: true, when the workflow is validated, then validation fails with error "parallel_items is only valid on for_each loops"
  • Given a for_each loop step with max_concurrent: -5, when the workflow is validated, then validation fails with error "max_concurrent must be >= 0"
  • Given a for_each loop step with parallel_items: true and valid max_concurrent, when the workflow is validated, then validation passes

Independent Test: Create YAML fixtures for each invalid case. Run awf validate and assert specific error messages.
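
The load-time checks can be sketched as follows, assuming a hypothetical slice of LoopConfig:

```go
package main

import (
	"errors"
	"fmt"
)

// LoopConfig is a hypothetical subset of the real domain config,
// reduced to the fields the new validation touches.
type LoopConfig struct {
	IsWhile       bool // true for while loops, false for for_each
	ParallelItems bool
	MaxConcurrent int
}

// validate mirrors the load-time rules from US5.
func validate(c LoopConfig) error {
	if c.ParallelItems && c.IsWhile {
		return errors.New("parallel_items is only valid on for_each loops")
	}
	if c.MaxConcurrent < 0 {
		return errors.New("max_concurrent must be >= 0")
	}
	return nil
}

func main() {
	fmt.Println(validate(LoopConfig{IsWhile: true, ParallelItems: true}))
	fmt.Println(validate(LoopConfig{MaxConcurrent: -5}))
	fmt.Println(validate(LoopConfig{ParallelItems: true, MaxConcurrent: 3}))
}
```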

US6: Resume with Partially Completed Batches (P2 - Should Have)

As a workflow author,
I want resumed workflows to correctly skip already-completed items within a batch,
So that a crash mid-batch only re-executes the incomplete items, not the entire batch.

Acceptance Scenarios:

  • Given a batch of [T001, T002, T003] where T001 completed and T002 crashed mid-execution, when the workflow resumes with resume: true, then the resume filter (based on tasks.md [x] markers) excludes T001 from the items list and T002+T003 form a new batch
  • Given all items in a batch completed before crash, when the workflow resumes, then the batch is skipped entirely (all items filtered out)
  • Given no items in a batch completed before crash, when the workflow resumes, then the full batch re-executes

Independent Test: Mark T001 as [x] in tasks.md, leave T002 and T003 as [ ]. Run with resume: true. Verify only T002 and T003 execute.
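
The resume filter can be sketched as a single pass over tasks.md-style lines (the real filtering lives in the existing prepare_tdd_loop step and runs before batch grouping):

```go
package main

import (
	"fmt"
	"strings"
)

// filterIncomplete drops tasks already marked "[x]" in a tasks.md-style
// checklist, so batch grouping only ever sees remaining work.
func filterIncomplete(lines []string) []string {
	var remaining []string
	for _, l := range lines {
		if strings.HasPrefix(strings.TrimSpace(l), "- [x]") {
			continue // already completed before the crash
		}
		remaining = append(remaining, l)
	}
	return remaining
}

func main() {
	tasks := []string{"- [x] T001", "- [ ] T002", "- [ ] T003"}
	fmt.Println(filterIncomplete(tasks)) // T002 and T003 form the new batch
}
```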


Requirements

Functional Requirements

  • FR-001: The LoopExecutor shall group consecutive for_each items where the item's parallel field is true into batches for concurrent execution
  • FR-002: Parallel batch strategy shall be derived from continue_on_error: all_succeed when false (default), best_effort when true. Strategy is not independently configurable per batch.
  • FR-003: The max_concurrent field shall limit the number of goroutines per batch via the existing semaphore pattern; 0 or omitted means unlimited
  • FR-004: Each item within a parallel batch shall receive an isolated ExecutionContext clone with its own loop.Index, loop.Item, and interpolation scope
  • FR-005: Step state keys within a parallel batch shall be namespaced as <step_name>__batch<N>_item<M> to prevent concurrent write collisions. After batch completion, the canonical (un-namespaced) key shall be set to the last item's value for backward compatibility.
  • FR-006: State checkpointing shall occur once per batch completion, not per individual item within a batch
  • FR-007: When parallel_items is false or omitted, for_each behavior shall be identical to the current sequential execution
  • FR-008: The parallel_items field shall only be valid on for_each loop steps; setting it on while loops shall produce a validation error
  • FR-009: YAML mapping shall support parallel_items (bool) and max_concurrent (int) fields on loop step configuration
  • FR-010: Loop body steps that write to shared files (e.g., mark_task_complete writing to tasks.md) shall be identified as a known concurrency risk. The recommended pattern is to defer shared-file writes to a post-batch sequential step, or use file-level locking.
  • FR-011: Each batch item shall execute the full loop body (all body steps including routing and sub-workflow calls), not just a single step. The LoopExecutor creates a StepExecutor adapter per batch item that runs the complete body sequence.
  • FR-012: Resume mode shall filter items before batch grouping. Items already marked [x] in tasks.md are excluded from the items list, and batches are formed from the remaining items only.
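
A hedged YAML sketch of how the two new fields from FR-009 might sit on a for_each step — only parallel_items and max_concurrent come from this spec; the surrounding key names mirror the existing loop shape and are assumptions, not confirmed syntax:

```yaml
# Hypothetical for_each step illustrating the new fields (FR-009).
- name: implement_tasks
  loop:
    for_each: "{{ .states.extract_components.Output }}"
    parallel_items: true     # opt in; false/omitted keeps sequential behavior (FR-007)
    max_concurrent: 3        # bound goroutines per batch; 0/omitted = unlimited (FR-003)
    continue_on_error: true  # parallel batches use best_effort when true (FR-002)
    body:
      - name: route_by_type
        # ...
```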

Non-Functional Requirements

  • NFR-001: Parallel batch execution wall time shall be bounded by the slowest item in the batch (not the sum of all items)
  • NFR-002: Memory overhead per parallel item shall not exceed the cost of one additional loop body execution context
  • NFR-003: Zero regression on existing sequential for_each and while loop behavior (measured by full test suite passing)
  • NFR-004: No new external dependencies; implementation shall reuse existing ParallelExecutor and errgroup/semaphore patterns

Success Criteria

  • All P1 user stories (US1, US2, US3, US4) implemented and tested
  • All P2 user stories (US5, US6) implemented and tested
  • Unit test coverage >= 80% for new/modified code
  • No lint errors (golangci-lint run)
  • No architecture violations (make lint-arch)
  • Existing for_each and while loop tests pass unchanged
  • Documentation updated (YAML syntax reference)

Key Entities

  • LoopConfig — domain configuration for loop steps. Attributes: ParallelItems bool, MaxConcurrent int (new fields alongside the existing Items, While, Body, MaxIterations, ContinueOnError)
  • ParallelBatch — runtime grouping of consecutive parallel items. Attributes: Items []any, StartIndex int, EndIndex int (value object, not persisted)
  • BatchItemExecutor — adapter that runs the full loop body for one item. Implements ports.StepExecutor; wraps LoopExecutor body execution with an isolated context
  • LoopExecutor — application service executing loop steps. executeForEach() is extended with batch detection and ParallelExecutor delegation
  • ParallelExecutor — existing infrastructure for concurrent step execution. Reused as-is with errgroup + semaphore

Metadata

  • Status: backlog
  • Version: v0.4.0
  • Priority: high
  • Estimation: L

Dependencies

  • Blocked by: none
  • Unblocks: none

Clarifications

  • Strategy selection: Parallel batch strategy (all_succeed vs best_effort) is derived from the continue_on_error field on the for_each step, not independently configurable. This keeps the YAML surface small and avoids confusing interactions between two strategy knobs.
  • State key namespacing: Uses __batch<N>_item<M> suffix convention (double underscore separator to avoid collision with user-defined step names which use single underscores or hyphens). The canonical key is written after batch completion for backward compatibility with existing {{.states.step_name.Output}} interpolation.
  • Shared file writes: The mark_task_complete step in implement.yaml writes to tasks.md via sed -i. With parallel execution, concurrent sed -i on the same file causes corruption. Two mitigation strategies: (1) move mark_task_complete out of the loop body into a post-batch sequential callback, or (2) replace sed -i with atomic write-to-temp + rename per line. Strategy (1) is recommended as it requires no runtime change.
  • Full body execution: Each parallel batch item runs the entire loop body (route_by_type → handler → mark_task_complete), not a single step. This matches how sequential for_each works today — each iteration runs all body steps.

Notes

  • The parallel field on each item in the JSON array is already produced by extract_components in implement.yaml — no upstream changes needed
  • ParallelExecutor is reused without modification; the LoopExecutor creates a BatchItemExecutor adapter per batch item that runs the full loop body
  • Batch boundaries are determined by consecutive runs of parallel: true items; interleaved sequential items break batches naturally
  • This feature is backward-compatible: omitting parallel_items or setting it to false preserves current behavior exactly
  • Resume filtering happens in prepare_tdd_loop (existing step) which already removes [x] items — this runs before the loop executor sees items, so batch grouping naturally adapts to the reduced item list

Labels

  • feature (Feature specification)
  • v0.6.0 (Version v0.6.0)