From 88657535595a75414357b54ca9f7ad88bef7ffa1 Mon Sep 17 00:00:00 2001
From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com>
Date: Sun, 3 May 2026 02:41:33 +0530
Subject: [PATCH] docs(docs-next): port guides/workflows (phase 4h)
---
.../docs/guides/workflows/analysis.mdx | 153 ++++++++++++++++++
.../docs/guides/workflows/building.mdx | 140 ++++++++++++++++
.../content/docs/guides/workflows/caching.mdx | 77 +++++++++
.../content/docs/guides/workflows/canvas.mdx | 128 +++++++++++++++
.../docs/guides/workflows/composition.mdx | 97 +++++++++++
.../docs/guides/workflows/conditions.mdx | 132 +++++++++++++++
.../content/docs/guides/workflows/fan-out.mdx | 117 ++++++++++++++
.../content/docs/guides/workflows/gates.mdx | 79 +++++++++
.../content/docs/guides/workflows/index.mdx | 60 ++++++-
.../content/docs/guides/workflows/meta.json | 12 +-
10 files changed, 989 insertions(+), 6 deletions(-)
create mode 100644 docs-next/content/docs/guides/workflows/analysis.mdx
create mode 100644 docs-next/content/docs/guides/workflows/building.mdx
create mode 100644 docs-next/content/docs/guides/workflows/caching.mdx
create mode 100644 docs-next/content/docs/guides/workflows/canvas.mdx
create mode 100644 docs-next/content/docs/guides/workflows/composition.mdx
create mode 100644 docs-next/content/docs/guides/workflows/conditions.mdx
create mode 100644 docs-next/content/docs/guides/workflows/fan-out.mdx
create mode 100644 docs-next/content/docs/guides/workflows/gates.mdx
diff --git a/docs-next/content/docs/guides/workflows/analysis.mdx b/docs-next/content/docs/guides/workflows/analysis.mdx
new file mode 100644
index 0000000..64588fb
--- /dev/null
+++ b/docs-next/content/docs/guides/workflows/analysis.mdx
@@ -0,0 +1,153 @@
+---
+title: Analysis & Visualization
+description: "ancestors / descendants / topological_levels / critical_path / bottleneck_analysis / Mermaid + DOT rendering."
+---
+
+import { Tab, Tabs } from "fumadocs-ui/components/tabs";
+
+Analyze the workflow DAG before execution and render diagrams with live
+status.
+
+## Graph inspection
+
+```python
+wf = Workflow(name="pipeline")
+wf.step("a", task_a)
+wf.step("b", task_b, after="a")
+wf.step("c", task_c, after="a")
+wf.step("d", task_d, after=["b", "c"])
+
+wf.ancestors("d") # ["a", "b", "c"]
+wf.descendants("a") # ["b", "c", "d"]
+wf.topological_levels() # [["a"], ["b", "c"], ["d"]]
+wf.stats()
+# {"nodes": 4, "edges": 4, "depth": 3, "width": 2, "density": 0.6667}
+```
+
+| Method | Returns | Description |
+|--------|---------|-------------|
+| `ancestors(node)` | `list[str]` | All transitive predecessors |
+| `descendants(node)` | `list[str]` | All transitive successors |
+| `topological_levels()` | `list[list[str]]` | Nodes grouped by depth |
+| `stats()` | `dict` | Node count, edge count, depth, width, density |
+
+## Critical path
+
+Find the longest-weighted path through the DAG:
+
+```python
+path, cost = wf.critical_path({
+ "a": 2.0,
+ "b": 7.0,
+ "c": 1.0,
+ "d": 3.0,
+})
+# path = ["a", "b", "d"], cost = 12.0
+```
+
+Pass estimated durations per step. The critical path determines the
+minimum total execution time.
+
+## Execution plan
+
+Generate a step-by-step schedule respecting worker limits:
+
+```python
+plan = wf.execution_plan(max_workers=2)
+# [["a"], ["b", "c"], ["d"]]
+
+plan = wf.execution_plan(max_workers=1)
+# [["a"], ["b"], ["c"], ["d"]]
+```
+
+Each stage contains up to `max_workers` nodes. Nodes in the same
+topological level are batched together.
+
+## Bottleneck analysis
+
+Identify the most expensive step on the critical path:
+
+```python
+result = wf.bottleneck_analysis({
+ "a": 2.0, "b": 7.0, "c": 1.0, "d": 3.0
+})
+# {
+# "node": "b",
+# "cost": 7.0,
+# "percentage": 58.3,
+# "critical_path": ["a", "b", "d"],
+# "total_cost": 12.0,
+# "suggestion": "b is the bottleneck (58.3% of total time). ..."
+# }
+```
+
+## Visualization
+
+Render the DAG as a diagram string:
+
+
+
+
+```python
+print(wf.visualize("mermaid"))
+```
+
+```
+graph LR
+ a[a]
+ b[b]
+ c[c]
+ d[d]
+ a --> b
+ a --> c
+ b --> d
+ c --> d
+```
+
+
+
+
+```python
+print(wf.visualize("dot"))
+```
+
+```
+digraph workflow {
+ rankdir=LR;
+ a [label="a" style=filled fillcolor=white];
+ b [label="b" style=filled fillcolor=white];
+ ...
+}
+```
+
+
+
+
+### Live status visualization
+
+`WorkflowRun.visualize()` includes status colors:
+
+```python
+run = queue.submit_workflow(wf)
+run.wait()
+
+print(run.visualize("mermaid"))
+```
+
+```
+graph LR
+ a[a ✓]
+ b[b ✓]
+ a --> b
+ style a fill:#90EE90
+ style b fill:#90EE90
+```
+
+| Status | Color |
+|--------|-------|
+| Completed | Green `#90EE90` |
+| Failed | Red `#FFB6C1` |
+| Running | Blue `#87CEEB` |
+| Pending | Gray `#D3D3D3` |
+| Skipped | Light gray `#F5F5F5` |
+| Waiting Approval | Yellow `#FFFACD` |
diff --git a/docs-next/content/docs/guides/workflows/building.mdx b/docs-next/content/docs/guides/workflows/building.mdx
new file mode 100644
index 0000000..c3f890b
--- /dev/null
+++ b/docs-next/content/docs/guides/workflows/building.mdx
@@ -0,0 +1,140 @@
+---
+title: Building Workflows
+description: "Workflow.step(), @queue.workflow() decorator, step configuration, DAG structure, node statuses."
+---
+
+A workflow is a DAG of steps. Each step wraps a registered task. The engine
+creates jobs in topological order with `depends_on` chains so the existing
+scheduler handles execution.
+
+## Defining steps
+
+```python
+from taskito.workflows import Workflow
+
+wf = Workflow(name="etl", version=1)
+wf.step("extract", extract_task)
+wf.step("transform", transform_task, after="extract")
+wf.step("load", load_task, after="transform")
+```
+
+Steps are added in order. The `after` parameter declares predecessors — a
+step won't run until all its predecessors complete.
+
+### Multiple predecessors
+
+```python
+wf.step("merge", merge_task, after=["branch_a", "branch_b"])
+```
+
+### Step arguments
+
+```python
+wf.step("fetch", fetch_task, args=("https://api.example.com",))
+wf.step("process", process_task, after="fetch", kwargs={"mode": "strict"})
+```
+
+Arguments are serialized at submission time using the queue's serializer.
+
+## Step configuration
+
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `name` | `str` | required | Unique step name within the workflow |
+| `task` | `TaskWrapper` | required | Registered `@queue.task()` function |
+| `after` | `str \| list[str]` | `None` | Predecessor step(s) |
+| `args` | `tuple` | `()` | Positional arguments |
+| `kwargs` | `dict` | `None` | Keyword arguments |
+| `queue` | `str` | `None` | Override queue name |
+| `max_retries` | `int` | `None` | Override retry count |
+| `timeout_ms` | `int` | `None` | Override timeout (milliseconds) |
+| `priority` | `int` | `None` | Override priority |
+
+## Workflow decorator
+
+Register reusable workflow factories with `@queue.workflow()`:
+
+```python
+@queue.workflow("nightly_etl")
+def etl_pipeline():
+ wf = Workflow()
+ wf.step("extract", extract)
+ wf.step("load", load, after="extract")
+ return wf
+
+# Build and submit
+run = etl_pipeline.submit()
+run.wait()
+
+# Or build without submitting
+wf = etl_pipeline.build()
+print(wf.step_names) # ["extract", "load"]
+```
+
+## Submitting
+
+```python
+run = queue.submit_workflow(wf)
+```
+
+This creates a `WorkflowRun` handle. Under the hood:
+
+1. A `WorkflowDefinition` is stored (or reused by name + version)
+2. A `WorkflowRun` record is created
+3. For each step in topological order, a job is enqueued with `depends_on` chains
+4. The run transitions to `RUNNING`
+
+>R: submit_workflow(dag, payloads)
+ R->>R: Store definition + run
+ loop Each step in topo order
+ R->>S: enqueue(job, depends_on=[pred_ids])
+ end
+ R-->>P: WorkflowRun handle
+ S->>S: Dequeue jobs as deps satisfied`}
+/>
+
+## Workflow parameters
+
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `name` | `str` | `"workflow"` | Workflow name |
+| `version` | `int` | `1` | Version number |
+| `on_failure` | `str` | `"fail_fast"` | Error strategy: `"fail_fast"` or `"continue"` |
+| `cache_ttl` | `float` | `None` | Cache TTL in seconds for [incremental runs](/docs/guides/workflows/caching) |
+
+## Node statuses
+
+Each step transitions through these states:
+
+ Pending
+ Pending --> Running : job picked up
+ Running --> Completed : success
+ Running --> Failed : error (retries exhausted)
+ Pending --> Skipped : cascade / condition
+ Pending --> WaitingApproval : gate reached
+ WaitingApproval --> Completed : approved
+ WaitingApproval --> Failed : rejected
+ Pending --> CacheHit : incremental reuse
+ Completed --> [*]
+ Failed --> [*]
+ Skipped --> [*]
+ CacheHit --> [*]`}
+/>
+
+| Status | Terminal | Meaning |
+|--------|----------|---------|
+| `PENDING` | No | Waiting for predecessors or job creation |
+| `RUNNING` | No | Job is executing |
+| `COMPLETED` | Yes | Step succeeded |
+| `FAILED` | Yes | Step failed after retries exhausted |
+| `SKIPPED` | Yes | Skipped due to failure cascade or unmet condition |
+| `WAITING_APPROVAL` | No | Gate awaiting approve/reject |
+| `CACHE_HIT` | Yes | Reused result from a prior run |
diff --git a/docs-next/content/docs/guides/workflows/caching.mdx b/docs-next/content/docs/guides/workflows/caching.mdx
new file mode 100644
index 0000000..601e04c
--- /dev/null
+++ b/docs-next/content/docs/guides/workflows/caching.mdx
@@ -0,0 +1,77 @@
+---
+title: Incremental Runs
+description: "Skip unchanged steps with CACHE_HIT — result hashing, dirty-set propagation, TTL."
+---
+
+Skip unchanged steps by reusing results from a prior run. When a node
+completed successfully in the base run, it gets `CACHE_HIT` status instead
+of re-executing.
+
+## Basic usage
+
+```python
+# First run: everything executes, results hashed (SHA-256)
+run1 = queue.submit_workflow(wf)
+run1.wait()
+
+# Second run: skip completed nodes from run1
+run2 = queue.submit_workflow(wf, incremental=True, base_run=run1.id)
+run2.wait()
+```
+
+Nodes that completed in `run1` with a stored result hash become
+`CACHE_HIT` in `run2`. Nodes that failed or are missing re-execute.
+
+## Dirty-set propagation
+
+If a node is dirty (failed or missing in the base run), all its downstream
+nodes are also dirty — even if they had cached results:
+
+ B["b — dirty (propagated)"]
+ B --> C["c — dirty (propagated)"]
+ style A fill:#FFB6C1
+ style B fill:#FFB6C1
+ style C fill:#FFB6C1`}
+/>
+
+ B["b — dirty (failed in base)"]
+ B --> C["c — dirty (propagated)"]
+ style A fill:#90EE90
+ style B fill:#FFB6C1
+ style C fill:#FFB6C1`}
+/>
+
+## Cache TTL
+
+Set a time-to-live on cached results:
+
+```python
+wf = Workflow(name="pipeline", cache_ttl=3600) # 1 hour
+```
+
+If the base run completed more than `cache_ttl` seconds ago, all nodes are
+treated as dirty (full re-execution).
+
+## How it works
+
+At submit time with `incremental=True`:
+
+1. Fetch the base run's node data: `{name: (status, result_hash)}`
+2. For each node in the new run:
+ - Base node completed + has result_hash → `CACHE_HIT`
+ - Base node failed / missing → dirty
+ - Any predecessor dirty → also dirty (propagated)
+3. `CACHE_HIT` nodes are created with `status=cache_hit` and `completed_at` set — no job enqueued
+4. Dirty nodes get normal jobs
+
+## Parameters
+
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `incremental` | `bool` | `False` | Enable cache comparison |
+| `base_run` | `str` | `None` | Run ID to compare against |
+| `cache_ttl` | `float` | `None` | TTL in seconds (on `Workflow`) |
diff --git a/docs-next/content/docs/guides/workflows/canvas.mdx b/docs-next/content/docs/guides/workflows/canvas.mdx
new file mode 100644
index 0000000..8823482
--- /dev/null
+++ b/docs-next/content/docs/guides/workflows/canvas.mdx
@@ -0,0 +1,128 @@
+---
+title: Canvas Primitives
+description: "Lightweight chain / group / chord composition for simpler pipelines."
+---
+
+import { Callout } from "fumadocs-ui/components/callout";
+
+For simpler pipelines without DAG features, taskito provides **chain**,
+**group**, and **chord** — lightweight composition that doesn't require the
+workflow engine.
+
+## Signatures
+
+A `Signature` wraps a task call for deferred execution:
+
+```python
+from taskito import chain, group, chord
+
+sig = add.s(1, 2) # Mutable — receives previous result as first arg
+sig = add.si(1, 2) # Immutable — ignores previous result
+```
+
+## Chain
+
+Execute tasks sequentially, piping each result to the next:
+
+|result| S2["transform.s()"]
+ S2 -->|result| S3["load.s()"]`}
+/>
+
+```python
+result = chain(
+ extract.s("https://api.example.com/users"),
+ transform.s(),
+ load.s(),
+).apply(queue)
+
+print(result.result(timeout=30))
+```
+
+
+ Use `.si()` when a step should **not** receive the previous result:
+
+ ```python
+ chain(
+ step_a.s(input_data),
+ step_b.si(independent_data),
+ step_c.s(),
+ ).apply(queue)
+ ```
+
+
+## Group
+
+Execute tasks in parallel (fan-out):
+
+```python
+jobs = group(
+ process.s(1),
+ process.s(2),
+ process.s(3),
+).apply(queue)
+
+results = [j.result(timeout=30) for j in jobs]
+```
+
+### Concurrency limits
+
+```python
+jobs = group(
+ *[fetch.s(url) for url in urls],
+ max_concurrency=5,
+).apply(queue)
+```
+
+## Chord
+
+Fan-out with a callback — run tasks in parallel, then pass all results to
+a final task:
+
+```python
+result = chord(
+ group(
+ fetch.s("https://api1.example.com"),
+ fetch.s("https://api2.example.com"),
+ fetch.s("https://api3.example.com"),
+ ),
+ merge.s(),
+).apply(queue)
+```
+
+## chunks / starmap
+
+```python
+from taskito import chunks, starmap
+
+# Batch processing — split 1000 items into groups of 100
+results = chunks(process_batch, items, chunk_size=100).apply(queue)
+
+# Map-reduce pattern
+result = chord(
+ chunks(process_batch, items, chunk_size=100),
+ merge_results.s(),
+).apply(queue)
+
+# Tuple unpacking
+results = starmap(add, [(1, 2), (3, 4), (5, 6)]).apply(queue)
+```
+
+## When to use canvas vs DAG workflows
+
+| Feature | Canvas | DAG Workflows |
+|---------|--------|---------------|
+| Setup | `from taskito import chain, group, chord` | `from taskito.workflows import Workflow` |
+| Topology | Linear chains, flat groups | Arbitrary DAGs |
+| Fan-out | Static (known at build time) | Dynamic (from return values) |
+| Conditions | None | `on_success`, `on_failure`, `always`, callables |
+| Error handling | Per-task retries only | Workflow-level strategies |
+| Approval gates | No | Yes |
+| Sub-workflows | No | Yes |
+| Incremental runs | No | Yes |
+| Status tracking | Per-job only | Per-workflow + per-node |
+| Visualization | No | Mermaid / DOT |
+
+Use canvas for quick one-off pipelines. Use DAG workflows for production
+pipelines that need monitoring, conditions, or complex topologies.
diff --git a/docs-next/content/docs/guides/workflows/composition.mdx b/docs-next/content/docs/guides/workflows/composition.mdx
new file mode 100644
index 0000000..11aea72
--- /dev/null
+++ b/docs-next/content/docs/guides/workflows/composition.mdx
@@ -0,0 +1,97 @@
+---
+title: Sub-Workflows & Scheduling
+description: "Nest workflows with WorkflowProxy.as_step(), schedule with @queue.periodic()."
+---
+
+import { Callout } from "fumadocs-ui/components/callout";
+
+Nest workflows for composition, and schedule workflows on a cron.
+
+## Sub-workflows
+
+Use `WorkflowProxy.as_step()` to embed one workflow inside another:
+
+```python
+@queue.workflow("etl")
+def etl_pipeline(region):
+ wf = Workflow()
+ wf.step("extract", extract, args=[region])
+ wf.step("load", load, after="extract")
+ return wf
+
+@queue.workflow("daily")
+def daily_pipeline():
+ wf = Workflow()
+ wf.step("eu_etl", etl_pipeline.as_step(region="eu"))
+ wf.step("us_etl", etl_pipeline.as_step(region="us"))
+ wf.step("reconcile", reconcile, after=["eu_etl", "us_etl"])
+ return wf
+
+run = daily_pipeline.submit()
+```
+
+ reconcile
+ us["us_etl"] --> reconcile
+ end
+
+ subgraph Child1["etl (region=eu)"]
+ e1["extract"] --> l1["load"]
+ end
+
+ subgraph Child2["etl (region=us)"]
+ e2["extract"] --> l2["load"]
+ end
+
+ eu -.->|"submits"| Child1
+ us -.->|"submits"| Child2`}
+/>
+
+### How it works
+
+1. Parent workflow submits the child workflow via `queue.submit_workflow()` with `parent_run_id`
+2. The child runs independently with its own nodes and status
+3. When the child completes/fails, the tracker updates the parent node
+4. Downstream steps in the parent evaluate normally
+
+### Cancellation cascade
+
+Cancelling the parent cascades to all active child workflows:
+
+```python
+run.cancel() # Cancels parent + all child sub-workflows
+```
+
+### Failure
+
+If a child workflow fails at runtime, the parent node is marked `FAILED`.
+Downstream steps follow the parent's `on_failure` strategy.
+
+The same holds for failures at *submission* time — if the child's factory
+raises or the DAG fails to compile when the parent node becomes evaluable,
+the parent node is marked `FAILED` immediately (rather than leaving the
+outer run hanging), and the parent run finalizes normally.
+
+## Cron-scheduled workflows
+
+Stack `@queue.periodic()` on top of `@queue.workflow()`:
+
+```python
+@queue.periodic(cron="0 0 2 * * *") # 2:00 AM daily
+@queue.workflow("nightly_analytics")
+def nightly():
+ wf = Workflow()
+ wf.step("extract", extract_clickstream)
+ wf.step("aggregate", build_dashboards, after="extract")
+ return wf
+```
+
+Each cron trigger submits a new workflow run. Under the hood, a bridge task
+`_wf_launcher_nightly_analytics` is registered that calls `proxy.submit()`.
+
+
+ The `@queue.periodic()` decorator must be the **outer** decorator
+ (applied second, listed first).
+
diff --git a/docs-next/content/docs/guides/workflows/conditions.mdx b/docs-next/content/docs/guides/workflows/conditions.mdx
new file mode 100644
index 0000000..df05178
--- /dev/null
+++ b/docs-next/content/docs/guides/workflows/conditions.mdx
@@ -0,0 +1,132 @@
+---
+title: Conditions & Error Handling
+description: "on_success, on_failure, always, callable conditions, fail_fast vs continue, skip propagation."
+---
+
+import { Tab, Tabs } from "fumadocs-ui/components/tabs";
+
+Control which steps execute based on predecessor outcomes, and configure
+how the workflow responds to failures.
+
+## Step conditions
+
+```python
+wf.step("deploy", deploy, after="test") # default: on_success
+wf.step("rollback", rollback, after="deploy", condition="on_failure")
+wf.step("notify", send_slack, after="deploy", condition="always")
+```
+
+| Condition | Runs when |
+|-----------|-----------|
+| `None` / `"on_success"` | All predecessors completed successfully |
+| `"on_failure"` | Any predecessor failed |
+| `"always"` | Predecessors are terminal (regardless of outcome) |
+| `callable` | `condition(ctx)` returns `True` |
+
+## Callable conditions
+
+Pass a function that receives a `WorkflowContext`:
+
+```python
+from taskito.workflows import WorkflowContext
+
+def high_score(ctx: WorkflowContext) -> bool:
+ return ctx.results["validate"]["score"] > 0.95
+
+wf.step("deploy", deploy, after="validate", condition=high_score)
+```
+
+`WorkflowContext` fields:
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `run_id` | `str` | Workflow run ID |
+| `results` | `dict[str, Any]` | Deserialized return values of completed nodes |
+| `statuses` | `dict[str, str]` | Status strings for all terminal nodes |
+| `failure_count` | `int` | Number of failed nodes |
+| `success_count` | `int` | Number of completed nodes |
+
+## Error strategies
+
+Set the workflow-level error strategy:
+
+
+
+
+```python
+wf = Workflow(name="strict", on_failure="fail_fast")
+```
+
+One failure skips **all** pending steps. The workflow transitions to `FAILED`.
+
+ B["b ✗"]
+ B --> C["c ━ SKIPPED"]
+ B --> D["d ━ SKIPPED"]
+ style B fill:#FFB6C1
+ style C fill:#F5F5F5
+ style D fill:#F5F5F5`}
+/>
+
+
+
+
+```python
+wf = Workflow(name="resilient", on_failure="continue")
+```
+
+Failed steps skip their `on_success` dependents, but **independent
+branches keep running**.
+
+ fail_branch["fail ✗"]
+ root --> ok_branch["ok ✓"]
+ fail_branch --> after_fail["after_fail ━ SKIPPED"]
+ ok_branch --> after_ok["after_ok ✓"]
+ style fail_branch fill:#FFB6C1
+ style after_fail fill:#F5F5F5
+ style after_ok fill:#90EE90`}
+/>
+
+
+
+
+## Skip propagation
+
+When a node is skipped, its successors are evaluated recursively:
+
+- `on_success` successors → **SKIPPED** (predecessor didn't succeed)
+- `on_failure` successors → evaluated (predecessor is terminal)
+- `always` successors → **run** regardless of how the predecessor ended
+
+```python
+wf = Workflow(name="cleanup_pipeline")
+wf.step("a", risky_task)
+wf.step("b", next_step, after="a") # SKIPPED if a fails
+wf.step("cleanup", cleanup, after="b", condition="always") # runs even if b is skipped
+```
+
+ B["b ━ SKIPPED"]
+ B --> C["cleanup ✓ ALWAYS"]
+ style A fill:#FFB6C1
+ style B fill:#F5F5F5
+ style C fill:#90EE90`}
+/>
+
+## Combining conditions with fan-out
+
+Conditions work with fan-out nodes. If a fan-out child fails:
+
+```python
+wf.step("fetch", fetch_data)
+wf.step("process", process, after="fetch", fan_out="each")
+wf.step("aggregate", aggregate, after="process", fan_in="all")
+wf.step("on_error", alert, after="process", condition="on_failure")
+```
+
+If any `process[i]` child fails, the fan-out parent is marked `FAILED`,
+`aggregate` is skipped, and `on_error` runs.
diff --git a/docs-next/content/docs/guides/workflows/fan-out.mdx b/docs-next/content/docs/guides/workflows/fan-out.mdx
new file mode 100644
index 0000000..57962c2
--- /dev/null
+++ b/docs-next/content/docs/guides/workflows/fan-out.mdx
@@ -0,0 +1,117 @@
+---
+title: Fan-Out & Fan-In
+description: "Split a step's result into parallel children, collect results into a downstream step."
+---
+
+Split a step's result into parallel child jobs, then collect all results
+into a downstream step.
+
+ process_0["process[0]"]
+ fetch --> process_1["process[1]"]
+ fetch --> process_2["process[2]"]
+ process_0 --> aggregate
+ process_1 --> aggregate
+ process_2 --> aggregate
+
+ style fetch fill:#90EE90
+ style aggregate fill:#87CEEB`}
+/>
+
+## Fan-out with `"each"`
+
+The predecessor's return value must be iterable. Each element becomes a
+separate child job:
+
+```python
+@queue.task()
+def fetch() -> list[int]:
+ return [10, 20, 30]
+
+@queue.task()
+def process(item: int) -> int:
+ return item * 2
+
+@queue.task()
+def aggregate(results: list[int]) -> int:
+ return sum(results) # receives [20, 40, 60]
+
+wf = Workflow(name="map_reduce")
+wf.step("fetch", fetch)
+wf.step("process", process, after="fetch", fan_out="each")
+wf.step("aggregate", aggregate, after="process", fan_in="all")
+```
+
+Child nodes are named `process[0]`, `process[1]`, `process[2]` and appear
+in status queries.
+
+## How it works
+
+1. `fetch` completes — the tracker reads its return value
+2. `apply_fan_out("each", result)` splits the list into individual items
+3. `expand_fan_out()` creates N child nodes + N jobs (no `depends_on` — they're ready immediately)
+4. Each child runs independently in parallel
+5. When all children complete, `check_fan_out_completion()` marks the parent
+6. The tracker collects all child results in index order
+7. The fan-in job is created with `((results_list,), {})` as its payload
+
+>T: JOB_COMPLETED(fetch)
+ T->>R: get_job(fetch_id).result_bytes
+ T->>R: expand_fan_out(3 children)
+ R-->>T: [child_job_ids]
+ Note over R: Children execute in parallel
+ R->>T: JOB_COMPLETED(process[0])
+ R->>T: JOB_COMPLETED(process[1])
+ R->>T: JOB_COMPLETED(process[2])
+ T->>R: check_fan_out_completion → all done
+ T->>R: create_deferred_job(aggregate)`}
+/>
+
+## Empty fan-out
+
+If the predecessor returns an empty list, the fan-out parent is marked
+`COMPLETED` immediately with zero children, and the fan-in receives an
+empty list:
+
+```python
+@queue.task()
+def fetch() -> list:
+ return [] # nothing to process
+
+# aggregate receives []
+```
+
+## Fan-out with downstream steps
+
+Steps after the fan-in work normally:
+
+```python
+wf = Workflow(name="full_pipeline")
+wf.step("fetch", fetch)
+wf.step("process", process, after="fetch", fan_out="each")
+wf.step("aggregate", aggregate, after="process", fan_in="all")
+wf.step("report", send_report, after="aggregate") # runs after aggregate
+```
+
+## Failure handling
+
+By default (`on_failure="fail_fast"`), if any fan-out child fails:
+
+- Remaining pending children are cancelled
+- The fan-out parent is marked `FAILED`
+- The fan-in and downstream steps are `SKIPPED`
+- The workflow transitions to `FAILED`
+
+Combine with [conditions](/docs/guides/workflows/conditions) for more
+control:
+
+```python
+wf.step("handle_error", alert, after="process", condition="on_failure")
+```
diff --git a/docs-next/content/docs/guides/workflows/gates.mdx b/docs-next/content/docs/guides/workflows/gates.mdx
new file mode 100644
index 0000000..9066bc0
--- /dev/null
+++ b/docs-next/content/docs/guides/workflows/gates.mdx
@@ -0,0 +1,79 @@
+---
+title: Approval Gates
+description: "Pause workflows for human review — approve, reject, timeout, gate-reached events."
+---
+
+Pause a workflow for human review. The gate enters `WAITING_APPROVAL`
+status until explicitly approved or rejected — or until a timeout fires.
+
+ eval["evaluate ✓"]
+ eval --> gate["approve ⏸"]
+ gate -->|approved| deploy["deploy"]
+ gate -->|rejected| skip["deploy ━ SKIPPED"]
+ style gate fill:#FFFACD`}
+/>
+
+## Adding a gate
+
+```python
+wf = Workflow(name="ml_deploy")
+wf.step("train", train_model)
+wf.step("evaluate", evaluate, after="train")
+wf.gate("approve", after="evaluate")
+wf.step("deploy", deploy, after="approve")
+```
+
+When the workflow reaches the gate, it pauses. Downstream steps won't
+execute until the gate is resolved.
+
+## Resolving a gate
+
+```python
+run = queue.submit_workflow(wf)
+
+# Later, after review:
+queue.approve_gate(run.id, "approve") # → gate COMPLETED, deploy runs
+# or:
+queue.reject_gate(run.id, "approve") # → gate FAILED, deploy SKIPPED
+```
+
+## Timeout
+
+Auto-resolve after a deadline:
+
+```python
+wf.gate("approve", after="evaluate",
+ timeout=86400, # 24 hours
+ on_timeout="reject") # or "approve"
+```
+
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `timeout` | `float` | `None` | Seconds until auto-resolve. `None` waits forever |
+| `on_timeout` | `str` | `"reject"` | Action on expiry: `"approve"` or `"reject"` |
+| `message` | `str` | `None` | Human-readable message for approvers |
+
+## Gate with conditions
+
+Gates respect step conditions:
+
+```python
+wf.step("test", run_tests)
+wf.gate("approve", after="test", condition="on_success")
+wf.step("deploy", deploy, after="approve")
+```
+
+If `test` fails, the gate is skipped (condition not met), and `deploy` is
+also skipped.
+
+## Events
+
+When a gate enters `WAITING_APPROVAL`, a `WORKFLOW_GATE_REACHED` event fires:
+
+```python
+@queue.on(EventType.WORKFLOW_GATE_REACHED)
+def notify_team(event_type, payload):
+ send_slack(f"Workflow {payload['run_id']} needs approval at {payload['node_name']}")
+```
diff --git a/docs-next/content/docs/guides/workflows/index.mdx b/docs-next/content/docs/guides/workflows/index.mdx
index 9ccf6ce..7d15778 100644
--- a/docs-next/content/docs/guides/workflows/index.mdx
+++ b/docs-next/content/docs/guides/workflows/index.mdx
@@ -1,10 +1,60 @@
---
title: Workflows
-description: "DAG workflows, fan-out/fan-in, gates, sub-workflows."
+description: "Build multi-step pipelines as directed acyclic graphs."
---
-import { Callout } from 'fumadocs-ui/components/callout';
+Build multi-step pipelines as directed acyclic graphs. Define steps, wire
+dependencies, and let taskito handle execution order, parallelism, failure
+propagation, and state tracking — all backed by a Rust engine with
+dagron-core for graph algorithms.
-
- Content port pending. See the [Zensical source](https://github.com/ByteVeda/taskito/tree/master/docs) for current text.
-
+|"_compile()"| TE["Topology Engine\\ndagron-core DAG"]
+ TE --> WS["Workflow Storage\\nSQLite — definitions, runs, nodes"]
+ WS -->|"enqueue jobs"| SC["Scheduler\\ndepends_on chains"]
+ SC -->|"JOB_COMPLETED event"| WT["Workflow Tracker\\nevent-driven orchestration"]
+ WT -->|"evaluate successors"| WS
+ WT --> WR["WorkflowRun\\nstatus() · wait() · cancel()"]`}
+/>
+
+## Quick start
+
+```python
+from taskito import Queue
+from taskito.workflows import Workflow
+
+queue = Queue(db_path="tasks.db")
+
+@queue.task()
+def extract(): return fetch_data()
+
+@queue.task()
+def transform(data): return clean(data)
+
+@queue.task()
+def load(data): db.insert(data)
+
+wf = Workflow(name="etl_pipeline")
+wf.step("extract", extract)
+wf.step("transform", transform, after="extract")
+wf.step("load", load, after="transform")
+
+run = queue.submit_workflow(wf)
+result = run.wait(timeout=60)
+print(result.state) # WorkflowState.COMPLETED
+```
+
+## Section overview
+
+| Page | What it covers |
+|---|---|
+| [Building Workflows](/docs/guides/workflows/building) | `Workflow.step()`, decorator pattern, step configuration, DAG structure |
+| [Fan-Out & Fan-In](/docs/guides/workflows/fan-out) | Splitting results into parallel jobs, collecting with aggregation |
+| [Conditions & Error Handling](/docs/guides/workflows/conditions) | `on_success`, `on_failure`, `always`, callable conditions, `on_failure` modes |
+| [Approval Gates](/docs/guides/workflows/gates) | Human-in-the-loop pause/resume, timeout, approve/reject API |
+| [Sub-Workflows & Scheduling](/docs/guides/workflows/composition) | Nesting workflows, cron-scheduled runs |
+| [Incremental Runs](/docs/guides/workflows/caching) | Result hashing, `CACHE_HIT`, dirty-set propagation, TTL |
+| [Analysis & Visualization](/docs/guides/workflows/analysis) | Critical path, bottleneck analysis, Mermaid/DOT rendering |
+| [Canvas Primitives](/docs/guides/workflows/canvas) | Chain, group, chord — simple composition without DAGs |
diff --git a/docs-next/content/docs/guides/workflows/meta.json b/docs-next/content/docs/guides/workflows/meta.json
index e15b535..b20359e 100644
--- a/docs-next/content/docs/guides/workflows/meta.json
+++ b/docs-next/content/docs/guides/workflows/meta.json
@@ -1,4 +1,14 @@
{
"title": "Workflows",
- "pages": ["index"]
+ "pages": [
+ "index",
+ "building",
+ "fan-out",
+ "conditions",
+ "gates",
+ "composition",
+ "caching",
+ "analysis",
+ "canvas"
+ ]
}