From d86c1c26a47672473f401a1dddd575042b9b62c0 Mon Sep 17 00:00:00 2001 From: Rohan Naik Date: Wed, 11 Feb 2026 23:38:56 +0530 Subject: [PATCH 1/4] Add workflow execution framework with sequential, conditional, and parallel step support --- Taskfile.yml | 1 + cagent-schema.json | 19 ++ cmd/root/run.go | 77 ++++++++ docs/workflow-module.md | 280 +++++++++++++++++++++++++++++ examples/README.md | 9 + examples/workflow_parallel.yaml | 43 +++++ examples/workflow_sequential.yaml | 45 +++++ pkg/config/latest/types.go | 3 + pkg/config/latest/validate.go | 53 ++++++ pkg/config/latest/validate_test.go | 54 ++++++ pkg/teamloader/teamloader.go | 4 + pkg/workflow/context.go | 177 ++++++++++++++++++ pkg/workflow/context_test.go | 23 +++ pkg/workflow/loop_counter.go | 40 +++++ pkg/workflow/types.go | 107 +++++++++++ pkg/workflowrun/executor.go | 199 ++++++++++++++++++++ 16 files changed, 1134 insertions(+) create mode 100644 docs/workflow-module.md create mode 100644 examples/workflow_parallel.yaml create mode 100644 examples/workflow_sequential.yaml create mode 100644 pkg/workflow/context.go create mode 100644 pkg/workflow/context_test.go create mode 100644 pkg/workflow/loop_counter.go create mode 100644 pkg/workflow/types.go create mode 100644 pkg/workflowrun/executor.go diff --git a/Taskfile.yml b/Taskfile.yml index f61ce6300..3374e597e 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -24,6 +24,7 @@ tasks: desc: Build the application binary cmds: - go build -ldflags "{{.LDFLAGS}}" -o {{.BUILD_DIR}}/{{.BINARY_NAME}} {{.MAIN_PKG}} + - mkdir -p {{.HOME}}/bin - ln -sf {{.USER_WORKING_DIR}}/{{.BUILD_DIR}}/{{.BINARY_NAME}} {{.HOME}}/bin/{{.BINARY_NAME}} sources: - "{{.GO_SOURCES}}" diff --git a/cagent-schema.json b/cagent-schema.json index 677be8384..9cc91296d 100644 --- a/cagent-schema.json +++ b/cagent-schema.json @@ -30,6 +30,25 @@ "$ref": "#/definitions/ProviderConfig" } }, + "workflow": { + "type": "object", + "properties": { + "steps": { + "type": "array", + "description": 
"List of workflow steps", + "items": { + "$ref": "#/definitions/StepConfig" + } + }, + "max_loop_iterations": { + "type": "integer", + "description": "Maximum number of times a step can be re-executed due to a conditional back-edge (loop). Default: 100", + "minimum": 0 + } + }, + "additionalProperties": false, + "description": "Workflow configuration for sequential, conditional, and parallel step execution" + }, "agents": { "type": "object", "description": "Map of agent configurations", diff --git a/cmd/root/run.go b/cmd/root/run.go index c464aa554..21b61fecb 100644 --- a/cmd/root/run.go +++ b/cmd/root/run.go @@ -23,6 +23,8 @@ import ( "github.com/docker/cagent/pkg/teamloader" "github.com/docker/cagent/pkg/telemetry" "github.com/docker/cagent/pkg/tui/styles" + "github.com/docker/cagent/pkg/workflow" + "github.com/docker/cagent/pkg/workflowrun" ) type runExecFlags struct { @@ -50,6 +52,9 @@ type runExecFlags struct { // Run only hideToolResults bool + + // Workflow: set when config has workflow; exec mode runs workflow instead of single agent + workflowConfig *workflow.Config } func newRunCmd() *cobra.Command { @@ -219,6 +224,8 @@ func (f *runExecFlags) runOrExec(ctx context.Context, out *cli.Printer, args []s return err } + f.workflowConfig = loadResult.Workflow + rt, sess, err = f.createLocalRuntimeAndSession(ctx, loadResult) if err != nil { return err @@ -399,6 +406,10 @@ func (f *runExecFlags) handleExecMode(ctx context.Context, out *cli.Printer, rt execArgs = append(execArgs, "Please proceed.") } + if f.workflowConfig != nil { + return f.runExecWorkflow(ctx, out, rt, sess, execArgs[1]) + } + err := cli.Run(ctx, out, cli.Config{ AppName: AppName, AttachmentPath: f.attachmentPath, @@ -412,6 +423,72 @@ func (f *runExecFlags) handleExecMode(ctx context.Context, out *cli.Printer, rt return err } +// runExecWorkflow runs the workflow executor and prints events to out (exec mode only). 
+func (f *runExecFlags) runExecWorkflow(ctx context.Context, out *cli.Printer, rt runtime.Runtime, sess *session.Session, userMessage string) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + sess.AddMessage(cli.PrepareUserMessage(ctx, rt, userMessage, f.attachmentPath)) + sess.SendUserMessage = true + + exec := workflowrun.NewLocalExecutor(rt) + events := make(chan workflowrun.Event, 128) + go func() { + defer close(events) + if err := exec.Run(ctx, f.workflowConfig, sess, events); err != nil { + events <- runtime.Error(err.Error()) + } + }() + + var lastErr error + firstAgent := true + lastAgentName := "" + for event := range events { + if errEvent, ok := event.(*runtime.ErrorEvent); ok { + lastErr = fmt.Errorf("%s", errEvent.Error) + out.PrintError(lastErr) + continue + } + var agentName string + if re, ok := event.(runtime.Event); ok { + agentName = re.GetAgentName() + } + if agentName != "" && (firstAgent || agentName != lastAgentName) { + if !firstAgent { + out.Println() + } + out.PrintAgentName(agentName) + firstAgent = false + lastAgentName = agentName + } + switch e := event.(type) { + case *runtime.AgentChoiceEvent: + out.Print(e.Content) + case *runtime.AgentChoiceReasoningEvent: + out.Print(e.Content) + case *runtime.ToolCallConfirmationEvent: + if !f.autoApprove { + rt.Resume(ctx, runtime.ResumeReject("")) + } else { + rt.Resume(ctx, runtime.ResumeApprove()) + } + case *runtime.ToolCallEvent: + if !f.hideToolCalls { + out.PrintToolCall(e.ToolCall) + } + case *runtime.ToolCallResponseEvent: + if !f.hideToolCalls { + out.PrintToolCallResponse(e.ToolCall, e.Response) + } + } + } + + if lastErr != nil { + return RuntimeError{Err: lastErr} + } + return nil +} + func readInitialMessage(args []string) (*string, error) { if len(args) < 2 { return nil, nil diff --git a/docs/workflow-module.md b/docs/workflow-module.md new file mode 100644 index 000000000..a258bc703 --- /dev/null +++ b/docs/workflow-module.md @@ -0,0 +1,280 @@ +# Cagent 
Workflow Module + +This document designs the three core workflow execution patterns in docker/cagent and answers implementation-specific use cases. + +## Overview + +Workflows define **declarative pipelines** of agents and conditions. Execution is driven by the runtime: each step runs an agent (or evaluates a condition), and step outputs flow to the next step according to the pattern. + +## 1. Sequential Step Execution + +**Description:** Agents execute one after another in a linear chain. Each agent's output becomes available as input context for the next agent. + +**Example:** + +```yaml +workflow: + - type: agent + name: generator + - type: agent + name: translator + - type: agent + name: publisher +``` + +**Behavior:** + +- `generator` runs first and completes. +- `translator` receives `generator`'s output and processes it. +- `publisher` receives `translator`'s output and finalizes. + +**Output propagation:** Step `n` receives a single **previous output** (the last assistant message content from step `n-1`), exposed as `{{ $steps..output }}` or by position. + +--- + +## 2. Conditional Branching & Loops + +**Description:** The workflow branches based on condition evaluation. When a condition's branch routes back to an earlier step (by step ID or index), it creates a **loop**. Conditions reference step outputs via templates. + +**Example:** + +```yaml +workflow: + - id: gen + type: agent + name: generator + - id: trans + type: agent + name: translator + - id: qa_check + type: condition + name: qa_check + condition: "{{ $steps.qa.output.is_approved }}" + true: + - type: agent + name: publisher + false: + - id: back_to_trans + type: agent + name: translator + - id: qa + type: agent + name: qa_agent +``` + +**Behavior:** + +- After `translator`, the `qa_check` condition runs (using `qa_agent` output when referenced by `$steps.qa.output`). +- If `is_approved == true`: workflow proceeds to `publisher`. 
+- If `is_approved == false`: workflow routes to the step that runs `translator` again (retry loop). + +**Condition schema:** Conditions are evaluated after the step(s) that produce the referenced output. The condition expression uses a small expression language (e.g. `{{ $steps.<step_id>.output.<field> }}`) and must resolve to a boolean. Schema validation ensures referenced step IDs exist and that structured output (e.g. `is_approved`) is declared where needed (e.g. via agent `structured_output`). + +--- + +## 3. Parallel Step Execution + +**Description:** Multiple steps run concurrently. The workflow waits for **all** parallel steps to complete before moving to the next sequential step. + +**Example:** + +```yaml +workflow: + - type: parallel + id: par_gen + steps: + - id: gen_1 + type: agent + name: generator + - id: gen_2 + type: agent + name: generator + - type: agent + name: translator +``` + +**Behavior:** + +- Two `generator` agents run simultaneously. +- Both must complete before `translator` starts. +- `translator` receives **outputs from all parallel steps** (see "Output structure from parallel steps" below). + +**Error handling:** If **any** agent in a parallel block fails, the **entire workflow** fails immediately (all-or-nothing). No partial success; this keeps data consistency and avoids downstream agents seeing incomplete data. + +--- + +## Use Case: How deep can loops go? (max iteration count) + +**Answer:** Loops are bounded by a **max loop iterations** setting. + +- **Config:** `workflow.max_loop_iterations` (default: `100`). Optional per-workflow override: `workflow.overrides.max_loop_iterations`. +- **Semantics:** A "loop" is one execution of a cycle (e.g. trans → qa_check → trans). The executor counts how many times the **same step ID** has been executed in a cycle. When that count reaches `max_loop_iterations`, the workflow fails with a deterministic error (e.g. `workflow: max loop iterations exceeded (step: trans, limit: 100)`). 
+- **Scope:** The count is per logical loop (per back-edge in the workflow graph), not global across all steps. + +This prevents infinite loops while allowing retries (e.g. QA reject → translator) up to a clear limit. + +--- + +## Use Case: Can we nest parallel blocks? + +**Answer:** **Yes.** Parallel steps are just steps; their children can be any step type, including another `parallel`. + +**Example:** + +```yaml +workflow: + - type: parallel + id: outer + steps: + - type: agent + name: generator + - type: parallel + id: inner + steps: + - type: agent + name: researcher + - type: agent + name: summarizer + - type: agent + name: publisher +``` + +**Behavior:** `generator` runs in parallel with the inner parallel block (`researcher` and `summarizer`). All three agent outputs are available to `publisher` (see output structure below). Failure of any of the three fails the whole workflow. + +--- + +## Use Case: How are outputs from multiple parallel agents structured when passed to the next step? + +**Answer:** Outputs from a parallel block are passed as a **keyed map** by step ID (and optionally by index for backwards compatibility). + +**Structure:** + +```json +{ + "steps": { + "gen_1": { "output": "", "agent": "generator" }, + "gen_2": { "output": "", "agent": "generator" } + }, + "order": ["gen_1", "gen_2"] +} +``` + +- **Next step input:** The next agent receives a single **context message** (e.g. user or system) that includes this structure (e.g. serialized as JSON or YAML in the prompt), so the agent can see all parallel outputs. +- **Templates:** In conditions or in agent instructions, parallel outputs are accessed as: + - `{{ $steps.par_gen.outputs.gen_1.output }}` + - `{{ $steps.par_gen.outputs.gen_2.output }}` + - Or by index: `{{ $steps.par_gen.outputs[0].output }}` (using `order` for deterministic indexing). + +So: **one structured object** keyed by step ID (and ordered list for index-based access), passed as context to the next step. 
+ +--- + +## Use Case: What retry behavior exists for failed steps? + +**Answer:** Configurable **per-step retry** with optional backoff. + +- **Config:** On any step (agent or parallel block): + - `retry.max_attempts` (default: 0 = no retry) + - `retry.backoff` (optional): `fixed` (e.g. 1s) or `exponential` (e.g. 1s, 2s, 4s) + - `retry.on` (optional): list of error patterns or exit conditions to retry on (e.g. `["timeout", "rate_limit"]`); if absent, retry on any error. + +**Behavior:** + +- A **step** (single agent or whole parallel block) is retried up to `max_attempts` times on failure. +- After exhausting retries, the **workflow** fails (no partial success for parallel). +- Retries are **transparent** to downstream steps: they only see the final success or the workflow fails. + +**Loops vs retries:** Loops (condition → back to earlier step) are **logical workflow branches**. Retries are **transient error handling** for the same step. Both can be used: e.g. retry a step 2 times, then continue to a condition that may send the workflow back to an earlier step (e.g. QA reject → translator). + +--- + +## Use Case: How do we access outputs from parallel steps in subsequent agents? + +**Answer:** Two mechanisms: + +1. **Automatic context injection:** The executor injects a **context blob** into the next step's session (e.g. as a system or user message) containing: + - `$steps.<par_id>.outputs` — the keyed map of step ID → `{ output, agent }` + - `$steps.<par_id>.order` — deterministic order for index-based access. + +2. **Templates in config:** In agent instructions or in condition expressions, use: + - `{{ $steps.par_gen.outputs.gen_1.output }}` — output of parallel step `gen_1` + - `{{ $steps.par_gen.outputs[0].output }}` — first output by `order` + - Same for nested parallel: `{{ $steps.outer.outputs.inner.outputs.researcher.output }}` (or a flatter key like `inner.researcher` by convention). 
+ +So: **structured access by step ID** (and by index via `order`), both in injected context and in templates. + +--- + +## Summary Table + +| Topic | Decision | |-----------------------|--------------------------------------------------------------------------| | Loop depth | `max_loop_iterations` (default 100); per-cycle count per step ID | | Nested parallel | Yes; parallel steps can contain parallel (or any) steps | | Parallel output shape | Keyed map by step ID + `order` array; one blob to next step | | Retry | Per-step `retry.max_attempts` + optional backoff; workflow fails after | | Access parallel outs | `$steps.<par_id>.outputs.<step_id>.output` and `$steps.<par_id>.outputs[n]` | | Parallel failure | Any failure in a parallel block fails the whole workflow immediately | + +--- + +## How to run workflow via CLI + +When your agent config defines a `workflow` section, use **exec** (non-TUI) to run the workflow: + +```bash +# Run workflow from config (exec mode runs the workflow executor) +cagent exec ./agent-with-workflow.yaml + +# With a prompt (passed as initial user message to the workflow) +cagent exec ./agent-with-workflow.yaml "Translate and publish this draft" + +# With stdin +echo "Process these items" | cagent exec ./agent-with-workflow.yaml - +``` + +Workflow execution is **only** wired for **exec** mode. The `run` command (TUI) still uses single-agent mode even when the config has a workflow. + +## Implementation Notes + +- **Types:** `pkg/workflow` holds workflow and step types (Config, Step, StepContext, loop counter, condition evaluation). No dependency on runtime or session to avoid import cycles. +- **Executor:** `pkg/workflowrun` holds the executor: runs the workflow DAG (sequential/conditional/parallel), calls runtime `RunStream` per agent step, maintains step outputs and loop counters, evaluates conditions, and injects output context into sessions. Use `workflowrun.NewLocalExecutor(runtime)` and `Executor.Run(ctx, cfg, sess, events)`. 
+- **Config:** Workflow config lives in `pkg/config/latest` as `Config.Workflow` (type `*workflow.Config`). Validation in `validate.go` ensures agent names exist, step types are valid, and condition steps have a condition expression. +- **CLI:** When `Config.Workflow` is set, `cagent exec` uses the workflow executor and streams events to stdout; `cagent run` (TUI) still uses single-agent mode. + +Developer Certificate of Origin +Version 1.1 + +Copyright (C) 2004, 2006 The Linux Foundation and its contributors. +1 Letterman Drive +Suite D4700 +San Francisco, CA, 94129 + +Everyone is permitted to copy and distribute verbatim copies of this +license document, but changing it is not allowed. + +Developer's Certificate of Origin 1.1 + +By making a contribution to this project, I certify that: + +(a) The contribution was created in whole or in part by me and I + have the right to submit it under the open source license + indicated in the file; or + +(b) The contribution is based upon previous work that, to the best + of my knowledge, is covered under an appropriate open source + license and I have the right under that license to submit that + work with modifications, whether created in whole or in part + by me, under the same open source license (unless I am + permitted to submit under a different license), as indicated + in the file; or + +(c) The contribution was provided directly to me by some other + person who certified (a), (b) or (c) and I have not modified + it. + +(d) I understand and agree that this project and the contribution + are public and that a record of the contribution (including all + personal information I submit with it, including my sign-off) is + maintained indefinitely and may be redistributed consistent with + this project or the open source license(s) involved. 
\ No newline at end of file diff --git a/examples/README.md b/examples/README.md index b3caa1bec..14741244b 100644 --- a/examples/README.md +++ b/examples/README.md @@ -56,3 +56,12 @@ These examples are groups of agents working together. Each of them is specialize | [finance.yaml](finance.yaml) | Financial research and analysis | | | | ✓ | | [duckduckgo](https://hub.docker.com/mcp/server/duckduckgo/overview) | ✓ | | [shared-todo.yaml](shared-todo.yaml) | Shared todo item manager | | | ✓ | | | | ✓ | | [pr-reviewer-bedrock.yaml](pr-reviewer-bedrock.yaml) | PR review toolkit (Bedrock) | ✓ | ✓ | | | | | ✓ | + +## **Workflow Configurations** + +These examples use the workflow feature: declarative pipelines of agents (sequential, conditional, or parallel). Run with **`cagent exec`** (workflow runs in exec mode only). + +| Name | Description | +|------|-------------| +| [workflow_sequential.yaml](workflow_sequential.yaml) | Sequential pipeline: generator → translator → publisher. Each agent's output is passed to the next. | +| [workflow_parallel.yaml](workflow_parallel.yaml) | Parallel step: two generators run at once; translator receives both outputs. | diff --git a/examples/workflow_parallel.yaml b/examples/workflow_parallel.yaml new file mode 100644 index 000000000..cefafb6b4 --- /dev/null +++ b/examples/workflow_parallel.yaml @@ -0,0 +1,43 @@ +# Sample workflow: parallel step execution +# Run with: cagent exec ./examples/workflow_parallel.yaml "Your prompt" +# +# Two generators run concurrently; translator receives both outputs. + +version: "4" + +models: + default: + provider: google + model: gemini-2.5-pro + max_tokens: 4096 + +agents: + root: + model: default + description: Generates content + instruction: | + Generate content based on the user's request. + Output only the generated content. 
+ + translator: + model: default + description: Combines and refines content from multiple sources + instruction: | + You receive outputs from multiple generator steps (in order). + Combine, deduplicate, or synthesize them into one coherent result. + Output the final result only. + +# Map form: steps + optional max_loop_iterations +# Parallel block: two generators run at once; translator runs after both complete. +workflow: + max_loop_iterations: 100 + steps: + - type: parallel + id: generators + steps: + - type: agent + name: root + - type: agent + name: root + - type: agent + name: translator diff --git a/examples/workflow_sequential.yaml b/examples/workflow_sequential.yaml new file mode 100644 index 000000000..1707811de --- /dev/null +++ b/examples/workflow_sequential.yaml @@ -0,0 +1,45 @@ +# Sample workflow: sequential step execution +# Run with: cagent exec ./examples/workflow_sequential.yaml "Your prompt" +# Or: cagent exec ./examples/workflow_sequential.yaml + +version: "4" + +models: + default: + provider: google + model: gemini-2.5-pro + max_tokens: 4096 + +agents: + root: + model: default + description: Generates initial content + instruction: | + You are the first step in a content pipeline. + Generate clear, structured content based on the user's request. + Output only the generated content; the next agent will process it. + + translator: + model: default + description: Translates or transforms content + instruction: | + You receive content from the previous step. + Translate, summarize, or transform it as requested. + Output only the result; the next agent will use it. + + publisher: + model: default + description: Finalizes and formats output + instruction: | + You receive content from the previous step. + Finalize it: format for publication, add a brief conclusion, and output the final result. + +# Sequential workflow: generator → translator → publisher +# Each agent's output becomes input context for the next. 
+workflow: + - type: agent + name: root + - type: agent + name: translator + - type: agent + name: publisher diff --git a/pkg/config/latest/types.go b/pkg/config/latest/types.go index aab024847..94b4cba29 100644 --- a/pkg/config/latest/types.go +++ b/pkg/config/latest/types.go @@ -7,6 +7,7 @@ import ( "github.com/goccy/go-yaml" "github.com/docker/cagent/pkg/config/types" + "github.com/docker/cagent/pkg/workflow" ) const Version = "4" @@ -20,6 +21,8 @@ type Config struct { RAG map[string]RAGConfig `json:"rag,omitempty"` Metadata Metadata `json:"metadata,omitempty"` Permissions *PermissionsConfig `json:"permissions,omitempty"` + // Workflow defines optional sequential, conditional, and parallel step execution. + Workflow *workflow.Config `json:"workflow,omitempty" yaml:"workflow,omitempty"` } type Agents []AgentConfig diff --git a/pkg/config/latest/validate.go b/pkg/config/latest/validate.go index 752c29987..77ed5f064 100644 --- a/pkg/config/latest/validate.go +++ b/pkg/config/latest/validate.go @@ -2,7 +2,10 @@ package latest import ( "errors" + "fmt" "strings" + + "github.com/docker/cagent/pkg/workflow" ) func (t *Config) UnmarshalYAML(unmarshal func(any) error) error { @@ -30,9 +33,59 @@ func (t *Config) validate() error { } } + if t.Workflow != nil { + if err := validateWorkflow(t.Workflow, t.Agents); err != nil { + return err + } + } + return nil } +// validateWorkflow ensures workflow step agent names exist in Agents and step types are valid. 
+func validateWorkflow(cfg *workflow.Config, agents Agents) error { + if cfg == nil { + return nil + } + agentSet := make(map[string]bool) + for _, a := range agents { + agentSet[a.Name] = true + } + var validateSteps func(steps []workflow.Step) error + validateSteps = func(steps []workflow.Step) error { + for i := range steps { + s := &steps[i] + switch s.Type { + case workflow.StepTypeAgent: + if s.Name == "" { + return fmt.Errorf("workflow step[%d]: agent step requires name", i) + } + if !agentSet[s.Name] { + return fmt.Errorf("workflow step[%d]: agent %q not found in agents", i, s.Name) + } + case workflow.StepTypeCondition: + if s.Condition == "" { + return fmt.Errorf("workflow step[%d]: condition step requires condition", i) + } + if err := validateSteps(s.TrueSteps); err != nil { + return err + } + if err := validateSteps(s.FalseSteps); err != nil { + return err + } + case workflow.StepTypeParallel: + if err := validateSteps(s.Steps); err != nil { + return err + } + default: + return fmt.Errorf("workflow step[%d]: unknown type %q", i, s.Type) + } + } + return nil + } + return validateSteps(cfg.Steps) +} + func (t *Toolset) validate() error { // Attributes used on the wrong toolset type. 
if len(t.Shell) > 0 && t.Type != "script" { diff --git a/pkg/config/latest/validate_test.go b/pkg/config/latest/validate_test.go index c68d95cf1..d686743e1 100644 --- a/pkg/config/latest/validate_test.go +++ b/pkg/config/latest/validate_test.go @@ -189,3 +189,57 @@ agents: }) } } + +func TestValidateWorkflow(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + config string + wantErr string + }{ + { + name: "valid workflow sequential", + config: ` +version: "4" +agents: + generator: + model: openai/gpt-4 + translator: + model: openai/gpt-4 +workflow: + - type: agent + name: generator + - type: agent + name: translator +`, + wantErr: "", + }, + { + name: "workflow agent not found", + config: ` +version: "4" +agents: + generator: + model: openai/gpt-4 +workflow: + - type: agent + name: translator +`, + wantErr: "not found in agents", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + var cfg Config + err := yaml.Unmarshal([]byte(tt.config), &cfg) + if tt.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tt.wantErr) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/pkg/teamloader/teamloader.go b/pkg/teamloader/teamloader.go index 527d17378..899817c10 100644 --- a/pkg/teamloader/teamloader.go +++ b/pkg/teamloader/teamloader.go @@ -21,6 +21,7 @@ import ( "github.com/docker/cagent/pkg/tools" "github.com/docker/cagent/pkg/tools/builtin" "github.com/docker/cagent/pkg/tools/codemode" + "github.com/docker/cagent/pkg/workflow" ) var defaultMaxTokens int64 = 32000 @@ -69,6 +70,8 @@ type LoadResult struct { Providers map[string]latest.ProviderConfig // AgentDefaultModels maps agent names to their configured default model references AgentDefaultModels map[string]string + // Workflow is set when the config defines a workflow; used by run/exec to run the workflow executor. 
+ Workflow *workflow.Config } // Load loads an agent team from the given source @@ -229,6 +232,7 @@ func LoadWithConfig(ctx context.Context, agentSource config.Source, runConfig *c Models: cfg.Models, Providers: cfg.Providers, AgentDefaultModels: agentDefaultModels, + Workflow: cfg.Workflow, }, nil } diff --git a/pkg/workflow/context.go b/pkg/workflow/context.go new file mode 100644 index 000000000..88e104a87 --- /dev/null +++ b/pkg/workflow/context.go @@ -0,0 +1,177 @@ +package workflow + +import ( + "encoding/json" + "strings" + "sync" +) + +// StepContext holds outputs from executed steps for template evaluation and propagation. +// Keys are step IDs; values are either a single StepOutput (sequential/agent) or ParallelOutputs (parallel block). +// Safe for concurrent use (e.g. parallel steps writing different keys). +type StepContext struct { + mu sync.RWMutex + data map[string]any +} + +// NewStepContext returns a new StepContext. +func NewStepContext() StepContext { + return StepContext{data: make(map[string]any)} +} + +// SetAgentOutput records the output of a single agent step by ID. +func (c *StepContext) SetAgentOutput(stepID, output, agentName string) { + if c == nil { + return + } + c.mu.Lock() + defer c.mu.Unlock() + if c.data == nil { + c.data = make(map[string]any) + } + c.data[stepID] = StepOutput{Output: output, Agent: agentName} +} + +// SetParallelOutput records the outputs of a parallel block by its step ID. +func (c *StepContext) SetParallelOutput(stepID string, out *ParallelOutputs) { + if c == nil { + return + } + c.mu.Lock() + defer c.mu.Unlock() + if c.data == nil { + c.data = make(map[string]any) + } + c.data[stepID] = out +} + +// GetOutput returns the StepOutput for a step ID if it is a single agent output. 
+func (c *StepContext) GetOutput(stepID string) (StepOutput, bool) { + if c == nil { + return StepOutput{}, false + } + c.mu.RLock() + defer c.mu.RUnlock() + v, ok := c.data[stepID] + if !ok { + return StepOutput{}, false + } + so, ok := v.(StepOutput) + return so, ok +} + +// GetParallelOutput returns the ParallelOutputs for a step ID if it is a parallel block. +func (c *StepContext) GetParallelOutput(stepID string) (*ParallelOutputs, bool) { + if c == nil { + return nil, false + } + c.mu.RLock() + defer c.mu.RUnlock() + v, ok := c.data[stepID] + if !ok { + return nil, false + } + po, ok := v.(*ParallelOutputs) + return po, ok +} + +// EvalCondition evaluates a condition string against this context. +// Supports simple template form: {{ $steps..output }} or {{ $steps..output.path }}. +// Returns (value, true) if the expression resolves to a boolean; otherwise (nil, false). +// Full implementation would use a proper expression evaluator; this provides the contract. +func (c *StepContext) EvalCondition(condition string) (bool, bool) { + expr := strings.TrimSpace(condition) + expr = trimTemplateBraces(expr) + if !strings.HasPrefix(expr, "$steps.") { + return false, false + } + // Minimal path: $steps..output or $steps..outputs..output + parts := strings.Split(expr, ".") + if len(parts) < 3 { + return false, false + } + stepID := parts[1] + if len(parts) >= 5 && parts[2] == "outputs" { + // $steps.par_id.outputs.step_id.output + parID := parts[1] + po, ok := c.GetParallelOutput(parID) + if !ok { + return false, false + } + subID := parts[3] + so, ok := po.Steps[subID] + if !ok { + return false, false + } + if len(parts) == 5 && parts[4] == "output" { + return parseBool(so.Output), true + } + return false, false + } + so, ok := c.GetOutput(stepID) + if !ok { + return false, false + } + // $steps..output or $steps..output.path (e.g. 
is_approved) + if len(parts) == 3 && parts[2] == "output" { + return parseBool(so.Output), true + } + if len(parts) >= 4 && parts[2] == "output" { + // Try to parse so.Output as JSON and read path (e.g. is_approved) + var m map[string]any + if err := json.Unmarshal([]byte(so.Output), &m); err != nil { + return parseBool(so.Output), true + } + v := getPath(m, parts[3:]) + return boolFromAny(v), true + } + return false, false +} + +func trimTemplateBraces(s string) string { + s = strings.TrimSpace(s) + if strings.HasPrefix(s, "{{") { + s = strings.TrimPrefix(s, "{{") + } + if strings.HasSuffix(s, "}}") { + s = strings.TrimSuffix(s, "}}") + } + return strings.TrimSpace(s) +} + +func getPath(m map[string]any, path []string) any { + var v any = m + for _, key := range path { + if v == nil { + return nil + } + mp, ok := v.(map[string]any) + if !ok { + return nil + } + v, ok = mp[key] + if !ok { + return nil + } + } + return v +} + +func parseBool(s string) bool { + s = strings.TrimSpace(strings.ToLower(s)) + return s == "true" || s == "1" || s == "yes" +} + +func boolFromAny(v any) bool { + if v == nil { + return false + } + switch b := v.(type) { + case bool: + return b + case string: + return parseBool(b) + default: + return false + } +} diff --git a/pkg/workflow/context_test.go b/pkg/workflow/context_test.go new file mode 100644 index 000000000..66d907170 --- /dev/null +++ b/pkg/workflow/context_test.go @@ -0,0 +1,23 @@ +package workflow + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestStepContext_EvalCondition(t *testing.T) { + t.Parallel() + ctx := NewStepContext() + ctx.SetAgentOutput("qa", `{"is_approved": true}`, "qa_agent") + + ok, resolved := ctx.EvalCondition("{{ $steps.qa.output.is_approved }}") + require.True(t, resolved) + assert.True(t, ok) + + ctx.SetAgentOutput("qa", `{"is_approved": false}`, "qa_agent") + ok, resolved = ctx.EvalCondition("{{ $steps.qa.output.is_approved }}") + 
require.True(t, resolved) + assert.False(t, ok) +} diff --git a/pkg/workflow/loop_counter.go b/pkg/workflow/loop_counter.go new file mode 100644 index 000000000..232999ba4 --- /dev/null +++ b/pkg/workflow/loop_counter.go @@ -0,0 +1,40 @@ +package workflow + +import ( + "context" + "fmt" + "sync" +) + +type loopCounterKey struct{} + +type loopCounter struct { + mu sync.Mutex + counts map[string]int + maxPerID int +} + +// NewLoopCounter attaches a loop counter to ctx. maxIterations is the maximum +// number of times any single step ID can be executed (loop back-edges). +func NewLoopCounter(ctx context.Context, maxIterations int) context.Context { + return context.WithValue(ctx, loopCounterKey{}, &loopCounter{ + counts: make(map[string]int), + maxPerID: maxIterations, + }) +} + +// IncLoopCounter increments the execution count for stepID and returns an error +// if the count exceeds the configured maximum (prevents infinite loops). +func IncLoopCounter(ctx context.Context, stepID string) error { + lc, ok := ctx.Value(loopCounterKey{}).(*loopCounter) + if !ok { + return nil + } + lc.mu.Lock() + defer lc.mu.Unlock() + lc.counts[stepID]++ + if lc.counts[stepID] > lc.maxPerID { + return fmt.Errorf("workflow: max loop iterations exceeded (step: %s, limit: %d)", stepID, lc.maxPerID) + } + return nil +} diff --git a/pkg/workflow/types.go b/pkg/workflow/types.go new file mode 100644 index 000000000..e4f61124c --- /dev/null +++ b/pkg/workflow/types.go @@ -0,0 +1,107 @@ +package workflow + +import "fmt" + +// StepType identifies the kind of workflow step. +type StepType string + +const ( + StepTypeAgent StepType = "agent" + StepTypeCondition StepType = "condition" + StepTypeParallel StepType = "parallel" +) + +// Step represents a single workflow step (agent, condition, or parallel block). +// Steps are defined in config and executed by the workflow executor. +type Step struct { + // ID is a unique identifier for this step. Used for output access (e.g. 
$steps..output) + // and loop detection. If empty, the executor may assign one (e.g. by index). + ID string `json:"id,omitempty" yaml:"id,omitempty"` + + // Type is one of: agent, condition, parallel. + Type StepType `json:"type" yaml:"type"` + + // Name is the agent name (for type=agent). Must reference an agent in config. + Name string `json:"name,omitempty" yaml:"name,omitempty"` + + // Condition is the expression for type=condition (e.g. "{{ $steps.qa.output.is_approved }}"). + // Evaluated after referenced steps have run; must resolve to a boolean. + Condition string `json:"condition,omitempty" yaml:"condition,omitempty"` + + // TrueSteps are executed when condition evaluates to true. + TrueSteps []Step `json:"true,omitempty" yaml:"true,omitempty"` + + // FalseSteps are executed when condition evaluates to false. + FalseSteps []Step `json:"false,omitempty" yaml:"false,omitempty"` + + // Steps are the child steps for type=parallel. All run concurrently. + Steps []Step `json:"steps,omitempty" yaml:"steps,omitempty"` + + // Retry configures per-step retry on failure. + Retry *RetryConfig `json:"retry,omitempty" yaml:"retry,omitempty"` +} + +// RetryConfig configures retry behavior for a step (agent or parallel block). +type RetryConfig struct { + // MaxAttempts is the maximum number of attempts (including the first). Default 0 = no retry. + MaxAttempts int `json:"max_attempts" yaml:"max_attempts"` + // Backoff is "fixed" (constant delay) or "exponential". Optional. + Backoff string `json:"backoff,omitempty" yaml:"backoff,omitempty"` + // InitialDelaySeconds is the delay before first retry. Used with Backoff. + InitialDelaySeconds int `json:"initial_delay_seconds,omitempty" yaml:"initial_delay_seconds,omitempty"` + // On lists error patterns to retry on (e.g. ["timeout", "rate_limit"]). Empty = retry on any error. + On []string `json:"on,omitempty" yaml:"on,omitempty"` +} + +// Config holds workflow-level settings and the root steps. 
+type Config struct { + // Steps are the top-level workflow steps (sequential by default). + Steps []Step `json:"steps,omitempty" yaml:"steps,omitempty"` + + // MaxLoopIterations is the maximum number of times a step can be re-executed + // due to a conditional back-edge (loop). Default 100. Prevents infinite loops. + MaxLoopIterations int `json:"max_loop_iterations,omitempty" yaml:"max_loop_iterations,omitempty"` +} + +// UnmarshalYAML allows workflow to be specified as a list (steps only) or a map (steps + max_loop_iterations). +func (c *Config) UnmarshalYAML(unmarshal func(any) error) error { + var listForm []Step + if err := unmarshal(&listForm); err == nil { + c.Steps = listForm + return nil + } + type rawConfig Config + var mapForm rawConfig + if err := unmarshal(&mapForm); err != nil { + return fmt.Errorf("workflow: expected a list of steps or a map with 'steps' and optional 'max_loop_iterations': %w", err) + } + *c = Config(mapForm) + return nil +} + +// DefaultMaxLoopIterations is the default cap for loop iterations when not set in config. +const DefaultMaxLoopIterations = 100 + +// StepOutput holds the output of a single step (e.g. last assistant message content). +type StepOutput struct { + // Output is the last assistant message content from the step. + Output string `json:"output"` + // Agent is the agent name that produced this output (for type=agent). + Agent string `json:"agent,omitempty"` +} + +// ParallelOutputs is the structure passed to the next step after a parallel block. +// Keys are step IDs; order preserves deterministic indexing (e.g. outputs[0]). +type ParallelOutputs struct { + Steps map[string]StepOutput `json:"steps"` + Order []string `json:"order"` +} + +// GetByIndex returns the StepOutput at index i (using Order). Returns zero value if out of range. 
+func (p *ParallelOutputs) GetByIndex(i int) StepOutput { + if p == nil || i < 0 || i >= len(p.Order) { + return StepOutput{} + } + id := p.Order[i] + return p.Steps[id] +} diff --git a/pkg/workflowrun/executor.go b/pkg/workflowrun/executor.go new file mode 100644 index 000000000..552bdffa3 --- /dev/null +++ b/pkg/workflowrun/executor.go @@ -0,0 +1,199 @@ +package workflowrun + +import ( + "context" + "fmt" + "sync" + + "github.com/docker/cagent/pkg/runtime" + "github.com/docker/cagent/pkg/session" + "github.com/docker/cagent/pkg/workflow" +) + +// Event is the type of events emitted during workflow execution. +// The executor sends runtime.Event values on the channel. +type Event = any + +// Executor runs a workflow: sequential, conditional, and parallel steps. +// It drives the runtime (RunStream per agent step), maintains step outputs, +// evaluates conditions, and enforces max loop iterations. +type Executor interface { + // Run executes the workflow with the given session (initial user message) and sends events to the channel. + Run(ctx context.Context, cfg *workflow.Config, sess *session.Session, events chan Event) error +} + +// Runner is the minimal runtime interface needed to run agent steps. +// Callers pass a runtime.Runtime (or adapter) that implements Runner. +type Runner interface { + CurrentAgentName() string + SetCurrentAgent(agentName string) error + RunStream(ctx context.Context, sess *session.Session) <-chan runtime.Event +} + +// LocalExecutor executes workflows using a LocalRuntime (or any Runner). +type LocalExecutor struct { + Runner Runner +} + +// NewLocalExecutor returns an executor that uses the given Runner. +func NewLocalExecutor(r Runner) *LocalExecutor { + return &LocalExecutor{Runner: r} +} + +// Run executes the workflow. Sequential steps run in order; conditional steps +// evaluate and run true/false branches; parallel steps run concurrently and +// all must succeed before the next sequential step. 
+func (e *LocalExecutor) Run(ctx context.Context, cfg *workflow.Config, sess *session.Session, events chan Event) error { + if cfg == nil || len(cfg.Steps) == 0 { + return fmt.Errorf("workflow: no steps configured") + } + maxLoop := cfg.MaxLoopIterations + if maxLoop <= 0 { + maxLoop = workflow.DefaultMaxLoopIterations + } + ctx = workflow.NewLoopCounter(ctx, maxLoop) + stepCtx := workflow.NewStepContext() + return e.runSteps(ctx, cfg.Steps, &stepCtx, sess, events) +} + +func (e *LocalExecutor) runSteps(ctx context.Context, steps []workflow.Step, stepCtx *workflow.StepContext, sess *session.Session, events chan Event) error { + for i := range steps { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err := e.runStep(ctx, &steps[i], stepCtx, sess, events); err != nil { + return err + } + } + return nil +} + +func (e *LocalExecutor) runStep(ctx context.Context, step *workflow.Step, stepCtx *workflow.StepContext, sess *session.Session, events chan Event) error { + stepID := step.ID + if stepID == "" { + stepID = fmt.Sprintf("step_%s", step.Type) + } + switch step.Type { + case workflow.StepTypeAgent: + return e.runAgentStep(ctx, stepID, step, stepCtx, sess, events) + case workflow.StepTypeCondition: + return e.runConditionStep(ctx, stepID, step, stepCtx, sess, events) + case workflow.StepTypeParallel: + return e.runParallelStep(ctx, stepID, step, stepCtx, sess, events) + default: + return fmt.Errorf("workflow: unknown step type %q", step.Type) + } +} + +func (e *LocalExecutor) runAgentStep(ctx context.Context, stepID string, step *workflow.Step, stepCtx *workflow.StepContext, sess *session.Session, events chan Event) error { + if err := workflow.IncLoopCounter(ctx, stepID); err != nil { + return err + } + if err := e.Runner.SetCurrentAgent(step.Name); err != nil { + return fmt.Errorf("workflow: set agent %q: %w", step.Name, err) + } + runSess := e.buildSessionForStep(step, stepCtx, sess) + for ev := range e.Runner.RunStream(ctx, runSess) { + 
select { + case events <- ev: + case <-ctx.Done(): + return ctx.Err() + } + } + var lastOutput string + if runSess != nil { + lastOutput = runSess.GetLastAssistantMessageContent() + } + stepCtx.SetAgentOutput(stepID, lastOutput, step.Name) + return nil +} + +func (e *LocalExecutor) buildSessionForStep(step *workflow.Step, stepCtx *workflow.StepContext, initial *session.Session) *session.Session { + opts := []session.Opt{ + session.WithMaxIterations(initial.MaxIterations), + session.WithToolsApproved(initial.ToolsApproved), + session.WithThinking(initial.Thinking), + session.WithSendUserMessage(true), + } + var hasUserMessage bool + if initial != nil && initial.Messages != nil { + for _, item := range initial.Messages { + if item.IsMessage() && item.Message.Message.Role == "user" { + opts = append(opts, session.WithUserMessage(item.Message.Message.Content)) + hasUserMessage = true + break + } + } + } + if !hasUserMessage { + opts = append(opts, session.WithUserMessage("Please proceed with the workflow step.")) + } + return session.New(opts...) 
+} + +func (e *LocalExecutor) runConditionStep(ctx context.Context, stepID string, step *workflow.Step, stepCtx *workflow.StepContext, sess *session.Session, events chan Event) error { + ok, resolved := stepCtx.EvalCondition(step.Condition) + if !resolved { + return fmt.Errorf("workflow: condition did not resolve to boolean: %q", step.Condition) + } + if ok { + return e.runSteps(ctx, step.TrueSteps, stepCtx, sess, events) + } + return e.runSteps(ctx, step.FalseSteps, stepCtx, sess, events) +} + +func (e *LocalExecutor) runParallelStep(ctx context.Context, stepID string, step *workflow.Step, stepCtx *workflow.StepContext, sess *session.Session, events chan Event) error { + if len(step.Steps) == 0 { + return nil + } + var wg sync.WaitGroup + outputs := make(map[string]workflow.StepOutput) + order := make([]string, 0, len(step.Steps)) + var mu sync.Mutex + var firstErr error + for i := range step.Steps { + child := &step.Steps[i] + childID := child.ID + if childID == "" { + childID = fmt.Sprintf("%s_%d", stepID, i) + } + order = append(order, childID) + stepCopy := *child + stepCopy.ID = childID + wg.Add(1) + go func(s *workflow.Step, id string) { + defer wg.Done() + subEvents := make(chan Event, 128) + err := e.runStep(ctx, s, stepCtx, sess, subEvents) + if err != nil { + mu.Lock() + if firstErr == nil { + firstErr = err + } + mu.Unlock() + return + } + if so, ok := stepCtx.GetOutput(id); ok { + mu.Lock() + outputs[id] = so + mu.Unlock() + } + close(subEvents) + for ev := range subEvents { + select { + case events <- ev: + case <-ctx.Done(): + return + } + } + }(&stepCopy, childID) + } + wg.Wait() + if firstErr != nil { + return firstErr + } + stepCtx.SetParallelOutput(stepID, &workflow.ParallelOutputs{Steps: outputs, Order: order}) + return nil +} From f08bd6009d7e52ff7764354ce4f9ba2beb09a167 Mon Sep 17 00:00:00 2001 From: Rohan Naik Date: Fri, 20 Feb 2026 08:43:28 +0530 Subject: [PATCH 2/4] feat: Switch default model provider to OpenAI's gpt-4o-mini and add OpenAI 
API key. --- examples/workflow_sequential.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/workflow_sequential.yaml b/examples/workflow_sequential.yaml index 1707811de..94e91ca65 100644 --- a/examples/workflow_sequential.yaml +++ b/examples/workflow_sequential.yaml @@ -6,8 +6,8 @@ version: "4" models: default: - provider: google - model: gemini-2.5-pro + provider: openai + model: gpt-4o-mini max_tokens: 4096 agents: From 603c50a0474fe03e64dbe6a4457747a9ea8c6f8d Mon Sep 17 00:00:00 2001 From: Rohan Naik Date: Fri, 20 Feb 2026 10:00:53 +0530 Subject: [PATCH 3/4] feat: inject prior step outputs into agent context, serialize parallel agent calls, and add concurrency safety to the SQLite session store. --- cmd/root/run.go | 2 +- examples/workflow_parallel.yaml | 4 +- pkg/session/store.go | 22 ++++++ pkg/workflow/context.go | 14 ++++ pkg/workflowrun/executor.go | 117 ++++++++++++++++++++++++++------ 5 files changed, 134 insertions(+), 25 deletions(-) diff --git a/cmd/root/run.go b/cmd/root/run.go index 21b61fecb..6d098b60c 100644 --- a/cmd/root/run.go +++ b/cmd/root/run.go @@ -435,7 +435,7 @@ func (f *runExecFlags) runExecWorkflow(ctx context.Context, out *cli.Printer, rt events := make(chan workflowrun.Event, 128) go func() { defer close(events) - if err := exec.Run(ctx, f.workflowConfig, sess, events); err != nil { + if _, err := exec.Run(ctx, f.workflowConfig, sess, events); err != nil { events <- runtime.Error(err.Error()) } }() diff --git a/examples/workflow_parallel.yaml b/examples/workflow_parallel.yaml index cefafb6b4..1448d5437 100644 --- a/examples/workflow_parallel.yaml +++ b/examples/workflow_parallel.yaml @@ -7,8 +7,8 @@ version: "4" models: default: - provider: google - model: gemini-2.5-pro + provider: openai + model: gpt-4o-mini max_tokens: 4096 agents: diff --git a/pkg/session/store.go b/pkg/session/store.go index ad0583e02..707875ade 100644 --- a/pkg/session/store.go +++ b/pkg/session/store.go @@ -9,6 +9,7 @@ import ( 
"log/slog" "os" "strconv" + "sync" "time" "github.com/docker/cagent/pkg/chat" @@ -242,6 +243,7 @@ type querier interface { // SQLiteSessionStore implements Store using SQLite type SQLiteSessionStore struct { + mu sync.Mutex db *sql.DB } @@ -394,6 +396,8 @@ func backupDatabase(path string) error { // AddSession adds a new session to the store, including any messages func (s *SQLiteSessionStore) AddSession(ctx context.Context, session *Session) error { + s.mu.Lock() + defer s.mu.Unlock() if session.ID == "" { return ErrEmptyID } @@ -793,6 +797,8 @@ func (s *SQLiteSessionStore) GetSessionSummaries(ctx context.Context) ([]Summary // DeleteSession deletes a session by ID func (s *SQLiteSessionStore) DeleteSession(ctx context.Context, id string) error { + s.mu.Lock() + defer s.mu.Unlock() if id == "" { return ErrEmptyID } @@ -818,6 +824,8 @@ func (s *SQLiteSessionStore) DeleteSession(ctx context.Context, id string) error // Only metadata is modified - use AddMessage, AddSubSession, AddSummary for items. // Messages are persisted separately via events to avoid duplication. func (s *SQLiteSessionStore) UpdateSession(ctx context.Context, session *Session) error { + s.mu.Lock() + defer s.mu.Unlock() if session.ID == "" { return ErrEmptyID } @@ -898,6 +906,8 @@ func (s *SQLiteSessionStore) UpdateSession(ctx context.Context, session *Session // SetSessionStarred sets the starred status of a session. func (s *SQLiteSessionStore) SetSessionStarred(ctx context.Context, id string, starred bool) error { + s.mu.Lock() + defer s.mu.Unlock() if id == "" { return ErrEmptyID } @@ -927,6 +937,8 @@ func (s *SQLiteSessionStore) Close() error { // AddMessage adds a message to a session at the next position. // Returns the ID of the created message item. 
func (s *SQLiteSessionStore) AddMessage(ctx context.Context, sessionID string, msg *Message) (int64, error) { + s.mu.Lock() + defer s.mu.Unlock() if sessionID == "" { return 0, ErrEmptyID } @@ -961,6 +973,8 @@ func (s *SQLiteSessionStore) AddMessage(ctx context.Context, sessionID string, m // UpdateMessage updates an existing message by its ID. func (s *SQLiteSessionStore) UpdateMessage(ctx context.Context, messageID int64, msg *Message) error { + s.mu.Lock() + defer s.mu.Unlock() msgJSON, err := json.Marshal(msg.Message) if err != nil { return fmt.Errorf("marshaling message: %w", err) @@ -997,6 +1011,8 @@ func (s *SQLiteSessionStore) UpdateMessage(ctx context.Context, messageID int64, // AddSubSession creates a sub-session and links it to the parent. func (s *SQLiteSessionStore) AddSubSession(ctx context.Context, parentSessionID string, subSession *Session) error { + s.mu.Lock() + defer s.mu.Unlock() if parentSessionID == "" || subSession.ID == "" { return ErrEmptyID } @@ -1138,6 +1154,8 @@ func (s *SQLiteSessionStore) addItemTx(ctx context.Context, tx *sql.Tx, sessionI // AddSummary adds a summary item to a session at the next position. func (s *SQLiteSessionStore) AddSummary(ctx context.Context, sessionID, summary string) error { + s.mu.Lock() + defer s.mu.Unlock() if sessionID == "" { return ErrEmptyID } @@ -1160,6 +1178,8 @@ func (s *SQLiteSessionStore) AddSummary(ctx context.Context, sessionID, summary // UpdateSessionTokens updates only token/cost fields. func (s *SQLiteSessionStore) UpdateSessionTokens(ctx context.Context, sessionID string, inputTokens, outputTokens int64, cost float64) error { + s.mu.Lock() + defer s.mu.Unlock() if sessionID == "" { return ErrEmptyID } @@ -1171,6 +1191,8 @@ func (s *SQLiteSessionStore) UpdateSessionTokens(ctx context.Context, sessionID // UpdateSessionTitle updates only the title. 
func (s *SQLiteSessionStore) UpdateSessionTitle(ctx context.Context, sessionID, title string) error { + s.mu.Lock() + defer s.mu.Unlock() if sessionID == "" { return ErrEmptyID } diff --git a/pkg/workflow/context.go b/pkg/workflow/context.go index 88e104a87..4f56df005 100644 --- a/pkg/workflow/context.go +++ b/pkg/workflow/context.go @@ -19,6 +19,20 @@ func NewStepContext() StepContext { return StepContext{data: make(map[string]any)} } +// Snapshot returns a shallow copy of the internal data map for serialization/debugging. +func (c *StepContext) Snapshot() map[string]any { + if c == nil { + return nil + } + c.mu.RLock() + defer c.mu.RUnlock() + out := make(map[string]any, len(c.data)) + for k, v := range c.data { + out[k] = v + } + return out +} + // SetAgentOutput records the output of a single agent step by ID. func (c *StepContext) SetAgentOutput(stepID, output, agentName string) { if c == nil { diff --git a/pkg/workflowrun/executor.go b/pkg/workflowrun/executor.go index 552bdffa3..bba62778b 100644 --- a/pkg/workflowrun/executor.go +++ b/pkg/workflowrun/executor.go @@ -2,7 +2,10 @@ package workflowrun import ( "context" + "encoding/json" "fmt" + "os" + "strings" "sync" "github.com/docker/cagent/pkg/runtime" @@ -19,7 +22,7 @@ type Event = any // evaluates conditions, and enforces max loop iterations. type Executor interface { // Run executes the workflow with the given session (initial user message) and sends events to the channel. - Run(ctx context.Context, cfg *workflow.Config, sess *session.Session, events chan Event) error + Run(ctx context.Context, cfg *workflow.Config, sess *session.Session, events chan Event) (*workflow.StepContext, error) } // Runner is the minimal runtime interface needed to run agent steps. @@ -33,6 +36,10 @@ type Runner interface { // LocalExecutor executes workflows using a LocalRuntime (or any Runner). 
type LocalExecutor struct { Runner Runner + // runnerMu serializes SetCurrentAgent + RunStream calls so the Runner's + // internal goroutine captures the correct agent name before the next + // parallel step changes it. + runnerMu sync.Mutex } // NewLocalExecutor returns an executor that uses the given Runner. @@ -43,9 +50,9 @@ func NewLocalExecutor(r Runner) *LocalExecutor { // Run executes the workflow. Sequential steps run in order; conditional steps // evaluate and run true/false branches; parallel steps run concurrently and // all must succeed before the next sequential step. -func (e *LocalExecutor) Run(ctx context.Context, cfg *workflow.Config, sess *session.Session, events chan Event) error { +func (e *LocalExecutor) Run(ctx context.Context, cfg *workflow.Config, sess *session.Session, events chan Event) (*workflow.StepContext, error) { if cfg == nil || len(cfg.Steps) == 0 { - return fmt.Errorf("workflow: no steps configured") + return nil, fmt.Errorf("workflow: no steps configured") } maxLoop := cfg.MaxLoopIterations if maxLoop <= 0 { @@ -53,7 +60,14 @@ func (e *LocalExecutor) Run(ctx context.Context, cfg *workflow.Config, sess *ses } ctx = workflow.NewLoopCounter(ctx, maxLoop) stepCtx := workflow.NewStepContext() - return e.runSteps(ctx, cfg.Steps, &stepCtx, sess, events) + err := e.runSteps(ctx, cfg.Steps, &stepCtx, sess, events) + + // Print step context for debugging. 
+ if b, jerr := json.MarshalIndent(stepCtx.Snapshot(), "", " "); jerr == nil { + fmt.Fprintf(os.Stderr, "\n--- Step Context ---\n%s\n", string(b)) + } + + return &stepCtx, err } func (e *LocalExecutor) runSteps(ctx context.Context, steps []workflow.Step, stepCtx *workflow.StepContext, sess *session.Session, events chan Event) error { @@ -91,11 +105,20 @@ func (e *LocalExecutor) runAgentStep(ctx context.Context, stepID string, step *w if err := workflow.IncLoopCounter(ctx, stepID); err != nil { return err } + + runSess := e.buildSessionForStep(step, stepCtx, sess) + + // Protect SetCurrentAgent + RunStream so the Runner's internal goroutine + // captures the correct agent before another parallel step changes it. + e.runnerMu.Lock() if err := e.Runner.SetCurrentAgent(step.Name); err != nil { + e.runnerMu.Unlock() return fmt.Errorf("workflow: set agent %q: %w", step.Name, err) } - runSess := e.buildSessionForStep(step, stepCtx, sess) - for ev := range e.Runner.RunStream(ctx, runSess) { + eventsCh := e.Runner.RunStream(ctx, runSess) + e.runnerMu.Unlock() + + for ev := range eventsCh { select { case events <- ev: case <-ctx.Done(): @@ -117,22 +140,73 @@ func (e *LocalExecutor) buildSessionForStep(step *workflow.Step, stepCtx *workfl session.WithThinking(initial.Thinking), session.WithSendUserMessage(true), } - var hasUserMessage bool + + // Build the user message: original user prompt + context from prior steps. + var userMsg string if initial != nil && initial.Messages != nil { for _, item := range initial.Messages { if item.IsMessage() && item.Message.Message.Role == "user" { - opts = append(opts, session.WithUserMessage(item.Message.Message.Content)) - hasUserMessage = true + userMsg = item.Message.Message.Content break } } } - if !hasUserMessage { - opts = append(opts, session.WithUserMessage("Please proceed with the workflow step.")) + if userMsg == "" { + userMsg = "Please proceed with the workflow step." 
+ } + + // Inject prior step outputs as context for the current step. + if prior := buildPriorContext(stepCtx); prior != "" { + userMsg = prior + "\n\n" + userMsg } + + opts = append(opts, session.WithUserMessage(userMsg)) return session.New(opts...) } +// buildPriorContext formats all prior step outputs into a context block +// that is injected into the next step's user message. +func buildPriorContext(stepCtx *workflow.StepContext) string { + snapshot := stepCtx.Snapshot() + if len(snapshot) == 0 { + return "" + } + + var sb strings.Builder + sb.WriteString("--- Prior Step Outputs ---") + for id, v := range snapshot { + switch out := v.(type) { + case workflow.StepOutput: + if out.Output != "" { + sb.WriteString("\n\n[") + sb.WriteString(id) + sb.WriteString(" (agent: ") + sb.WriteString(out.Agent) + sb.WriteString(")]:\n") + sb.WriteString(out.Output) + } + case *workflow.ParallelOutputs: + if out != nil { + for _, subID := range out.Order { + so := out.Steps[subID] + if so.Output != "" { + sb.WriteString("\n\n[") + sb.WriteString(id) + sb.WriteString("/") + sb.WriteString(subID) + sb.WriteString(" (agent: ") + sb.WriteString(so.Agent) + sb.WriteString(")]:\n") + sb.WriteString(so.Output) + } + } + } + } + } + sb.WriteString("\n\n--- End Prior Step Outputs ---") + return sb.String() +} + func (e *LocalExecutor) runConditionStep(ctx context.Context, stepID string, step *workflow.Step, stepCtx *workflow.StepContext, sess *session.Session, events chan Event) error { ok, resolved := stepCtx.EvalCondition(step.Condition) if !resolved { @@ -148,11 +222,13 @@ func (e *LocalExecutor) runParallelStep(ctx context.Context, stepID string, step if len(step.Steps) == 0 { return nil } + var wg sync.WaitGroup outputs := make(map[string]workflow.StepOutput) order := make([]string, 0, len(step.Steps)) var mu sync.Mutex var firstErr error + for i := range step.Steps { child := &step.Steps[i] childID := child.ID @@ -165,9 +241,14 @@ func (e *LocalExecutor) runParallelStep(ctx 
context.Context, stepID string, step wg.Add(1) go func(s *workflow.Step, id string) { defer wg.Done() - subEvents := make(chan Event, 128) - err := e.runStep(ctx, s, stepCtx, sess, subEvents) - if err != nil { + + // Each parallel goroutine gets its own sub-session. + // PersistentRuntime skips all persistence for sub-sessions, + // avoiding concurrent SQLite writes. + subSess := e.buildSessionForStep(s, stepCtx, sess) + subSess.ParentID = sess.ID + + if err := e.runAgentStep(ctx, id, s, stepCtx, subSess, events); err != nil { mu.Lock() if firstErr == nil { firstErr = err @@ -180,14 +261,6 @@ func (e *LocalExecutor) runParallelStep(ctx context.Context, stepID string, step outputs[id] = so mu.Unlock() } - close(subEvents) - for ev := range subEvents { - select { - case events <- ev: - case <-ctx.Done(): - return - } - } }(&stepCopy, childID) } wg.Wait() From a645edf15d8cc629924383f5bdeead0ab46935e3 Mon Sep 17 00:00:00 2001 From: Rohan Naik Date: Fri, 20 Feb 2026 10:12:41 +0530 Subject: [PATCH 4/4] feat: Implement automatic context propagation and enhanced concurrency safety for workflow execution, add `OPENAI_API_KEY` to `.env`, and update workflow documentation. --- docs/workflow-module.md | 53 ++++++++++++++++++++++++++++++++++------- 1 file changed, 45 insertions(+), 8 deletions(-) diff --git a/docs/workflow-module.md b/docs/workflow-module.md index a258bc703..b2b52e397 100644 --- a/docs/workflow-module.md +++ b/docs/workflow-module.md @@ -25,10 +25,23 @@ workflow: **Behavior:** - `generator` runs first and completes. -- `translator` receives `generator`'s output and processes it. -- `publisher` receives `translator`'s output and finalizes. +- `translator` receives `generator`'s output as context in its prompt and processes it. +- `publisher` receives `translator`'s output as context and finalizes. 
-**Output propagation:** Step `n` receives a single **previous output** (the last assistant message content from step `n-1`), exposed as `{{ $steps..output }}` or by position. +**Output propagation:** Each step automatically receives **all prior step outputs** injected as context into its user message. The executor collects the last assistant message content from each completed step and formats it as a structured context block: + +``` +--- Prior Step Outputs --- + +[step_id (agent: generator)]: + + +--- End Prior Step Outputs --- + + +``` + +Outputs are also accessible via template expressions: `{{ $steps.<step_id>.output }}`. --- @@ -95,9 +108,14 @@ workflow: **Behavior:** -- Two `generator` agents run simultaneously. +- Two `generator` agents run concurrently in separate goroutines. - Both must complete before `translator` starts. -- `translator` receives **outputs from all parallel steps** (see "Output structure from parallel steps" below). +- `translator` receives **outputs from all parallel steps** as context in its prompt (see "Output structure from parallel steps" below). + +**Concurrency safety:** Parallel steps use three mechanisms to avoid races: +1. A **`runnerMu` mutex** on the executor serializes `SetCurrentAgent` + `RunStream` calls so each goroutine's internal runtime captures the correct agent name. +2. Each parallel goroutine uses a **sub-session** (`ParentID` set), causing `PersistentRuntime` to skip all SQLite persistence for those sessions. +3. The **`SQLiteSessionStore`** has a `sync.Mutex` on all write methods as an additional safety net. **Error handling:** If **any** agent in a parallel block fails, the **entire workflow** fails immediately (all-or-nothing). No partial success; this keeps data consistency and avoids downstream agents seeing incomplete data. @@ -159,9 +177,22 @@ workflow: } ``` -- **Next step input:** The next agent receives a single **context message** (e.g. user or system) that includes this structure (e.g.
serialized as JSON or YAML in the prompt), so the agent can see all parallel outputs. +- **Next step input:** The next agent receives all parallel outputs injected as context in its user message: + ``` + --- Prior Step Outputs --- + + [par_gen/gen_1 (agent: generator)]: + + + [par_gen/gen_2 (agent: generator)]: + + + --- End Prior Step Outputs --- + + + ``` - **Templates:** In conditions or in agent instructions, parallel outputs are accessed as: - - `{{ $steps.par_gen.outputs.gen_1.output }}` + - `{{ $steps.par_gen.outputs.gen_1.output }}` — output of parallel step `gen_1` - `{{ $steps.par_gen.outputs.gen_2.output }}` - Or by index: `{{ $steps.par_gen.outputs[0].output }}` (using `order` for deterministic indexing). @@ -238,7 +269,13 @@ Workflow execution is **only** wired for **exec** mode. The `run` command (TUI) ## Implementation Notes - **Types:** `pkg/workflow` holds workflow and step types (Config, Step, StepContext, loop counter, condition evaluation). No dependency on runtime or session to avoid import cycles. -- **Executor:** `pkg/workflowrun` holds the executor: runs the workflow DAG (sequential/conditional/parallel), calls runtime `RunStream` per agent step, maintains step outputs and loop counters, evaluates conditions, and injects output context into sessions. Use `workflowrun.NewLocalExecutor(runtime)` and `Executor.Run(ctx, cfg, sess, events)`. + - `StepContext` is concurrency-safe (`sync.RWMutex`) and exposes a `Snapshot()` method for serialization/debugging. +- **Executor:** `pkg/workflowrun` holds the executor: runs the workflow DAG (sequential/conditional/parallel), calls runtime `RunStream` per agent step, maintains step outputs and loop counters, evaluates conditions, and injects output context into sessions. + - Use `workflowrun.NewLocalExecutor(runtime)` and `Executor.Run(ctx, cfg, sess, events)` which returns `(*workflow.StepContext, error)`. 
+ - After execution, the step context is printed to stderr as formatted JSON for debugging (`--- Step Context ---`). + - **Context propagation:** `buildPriorContext()` collects all prior step outputs and injects them as a structured text block into the next step's user message. + - **Parallel safety:** `runnerMu` serializes `SetCurrentAgent` + `RunStream` to prevent agent name races; sub-sessions skip SQLite persistence. +- **Session Store:** `SQLiteSessionStore` has a `sync.Mutex` protecting all write methods (`AddMessage`, `UpdateMessage`, `AddSession`, `UpdateSession`, etc.) to prevent concurrent write errors (SQLite rejects concurrent writes with `database is locked`). - **Config:** Workflow config lives in `pkg/config/latest` as `Config.Workflow` (type `*workflow.Config`). Validation in `validate.go` ensures agent names exist, step types are valid, and condition steps have a condition expression. - **CLI:** When `Config.Workflow` is set, `cagent exec` uses the workflow executor and streams events to stdout; `cagent run` (TUI) still uses single-agent mode.