153 changes: 153 additions & 0 deletions docs-next/content/docs/guides/workflows/analysis.mdx
@@ -0,0 +1,153 @@
---
title: Analysis & Visualization
description: "ancestors / descendants / topological_levels / critical_path / bottleneck_analysis / Mermaid + DOT rendering."
---

import { Tab, Tabs } from "fumadocs-ui/components/tabs";

Analyze the workflow DAG before execution and render diagrams with live
status.

## Graph inspection

```python
from taskito.workflows import Workflow

# task_a..task_d are registered @queue.task() functions
wf = Workflow(name="pipeline")
wf.step("a", task_a)
wf.step("b", task_b, after="a")
wf.step("c", task_c, after="a")
wf.step("d", task_d, after=["b", "c"])

wf.ancestors("d") # ["a", "b", "c"]
wf.descendants("a") # ["b", "c", "d"]
wf.topological_levels() # [["a"], ["b", "c"], ["d"]]
wf.stats()
# {"nodes": 4, "edges": 4, "depth": 3, "width": 2, "density": 0.6667}
```

| Method | Returns | Description |
|--------|---------|-------------|
| `ancestors(node)` | `list[str]` | All transitive predecessors |
| `descendants(node)` | `list[str]` | All transitive successors |
| `topological_levels()` | `list[list[str]]` | Nodes grouped by depth |
| `stats()` | `dict` | Node count, edge count, depth, width, density |

## Critical path

Find the longest-weighted path through the DAG:

```python
path, cost = wf.critical_path({
"a": 2.0,
"b": 7.0,
"c": 1.0,
"d": 3.0,
})
# path = ["a", "b", "d"], cost = 12.0
```

Pass an estimated duration for each step. The critical path is the lower bound on
total execution time: even with unlimited parallel workers, the run cannot finish
faster than the sum of the costs along this path.
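
For intuition, here is a minimal standalone sketch of the longest-weighted-path
computation (illustrative only, not the library's implementation; `preds` maps
each step to its predecessors):

```python
# Illustrative sketch: longest-weighted path over a DAG, given per-node costs
# and a predecessor map. Reproduces path=["a", "b", "d"], cost=12.0 above.
def longest_path(preds: dict[str, list[str]], costs: dict[str, float]):
    memo: dict[str, tuple[float, list[str]]] = {}

    def visit(node: str) -> tuple[float, list[str]]:
        if node not in memo:
            incoming = [visit(p) for p in preds.get(node, [])]
            cost, path = max(incoming, default=(0.0, []))
            memo[node] = (cost + costs[node], path + [node])
        return memo[node]

    cost, path = max(visit(n) for n in costs)
    return path, cost

longest_path({"b": ["a"], "c": ["a"], "d": ["b", "c"]},
             {"a": 2.0, "b": 7.0, "c": 1.0, "d": 3.0})
# (["a", "b", "d"], 12.0)
```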

## Execution plan

Generate a step-by-step schedule respecting worker limits:

```python
plan = wf.execution_plan(max_workers=2)
# [["a"], ["b", "c"], ["d"]]

plan = wf.execution_plan(max_workers=1)
# [["a"], ["b"], ["c"], ["d"]]
```

Each stage contains up to `max_workers` nodes. Nodes in the same
topological level are batched together.
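
The batching rule can be sketched directly from `topological_levels()`
(illustrative, not the engine's code):

```python
# Split each topological level into chunks of at most max_workers nodes.
def plan(levels: list[list[str]], max_workers: int) -> list[list[str]]:
    stages: list[list[str]] = []
    for level in levels:
        for i in range(0, len(level), max_workers):
            stages.append(level[i:i + max_workers])
    return stages

plan([["a"], ["b", "c"], ["d"]], max_workers=2)  # [["a"], ["b", "c"], ["d"]]
plan([["a"], ["b", "c"], ["d"]], max_workers=1)  # [["a"], ["b"], ["c"], ["d"]]
```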

## Bottleneck analysis

Identify the most expensive step on the critical path:

```python
result = wf.bottleneck_analysis({
"a": 2.0, "b": 7.0, "c": 1.0, "d": 3.0
})
# {
# "node": "b",
# "cost": 7.0,
# "percentage": 58.3,
# "critical_path": ["a", "b", "d"],
# "total_cost": 12.0,
# "suggestion": "b is the bottleneck (58.3% of total time). ..."
# }
```
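
The result follows directly from the critical path: the bottleneck is the most
expensive node on that path, and its percentage is that node's share of the
path's total cost. An equivalent sketch:

```python
costs = {"a": 2.0, "b": 7.0, "c": 1.0, "d": 3.0}
path, total = wf.critical_path(costs)        # ["a", "b", "d"], 12.0
node = max(path, key=lambda n: costs[n])     # "b"
share = round(100 * costs[node] / total, 1)  # 58.3
```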

## Visualization

Render the DAG as a diagram string:

<Tabs items={["Mermaid", "Graphviz DOT"]}>
<Tab value="Mermaid">

```python
print(wf.visualize("mermaid"))
```

```
graph LR
a[a]
b[b]
c[c]
d[d]
a --> b
a --> c
b --> d
c --> d
```

</Tab>
<Tab value="Graphviz DOT">

```python
print(wf.visualize("dot"))
```

```
digraph workflow {
rankdir=LR;
a [label="a" style=filled fillcolor=white];
b [label="b" style=filled fillcolor=white];
...
}
```

</Tab>
</Tabs>

### Live status visualization

`WorkflowRun.visualize()` includes status colors:

```python
run = queue.submit_workflow(wf)
run.wait()

print(run.visualize("mermaid"))
```

```
graph LR
a[a ✓]
b[b ✓]
a --> b
style a fill:#90EE90
style b fill:#90EE90
```

| Status | Color |
|--------|-------|
| Completed | Green `#90EE90` |
| Failed | Red `#FFB6C1` |
| Running | Blue `#87CEEB` |
| Pending | Gray `#D3D3D3` |
| Skipped | Light gray `#F5F5F5` |
| Waiting Approval | Yellow `#FFFACD` |
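
Since `visualize()` returns a plain string, you can write it to a file and
render it with standard Mermaid or Graphviz tooling (file names below are
examples):

```python
from pathlib import Path

Path("workflow.mmd").write_text(wf.visualize("mermaid"))
Path("workflow.dot").write_text(wf.visualize("dot"))
Path("run.mmd").write_text(run.visualize("mermaid"))  # with live status colors
```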
140 changes: 140 additions & 0 deletions docs-next/content/docs/guides/workflows/building.mdx
@@ -0,0 +1,140 @@
---
title: Building Workflows
description: "Workflow.step(), @queue.workflow() decorator, step configuration, DAG structure, node statuses."
---

A workflow is a DAG of steps. Each step wraps a registered task. The engine
creates jobs in topological order with `depends_on` chains so the existing
scheduler handles execution.

## Defining steps

```python
from taskito.workflows import Workflow

wf = Workflow(name="etl", version=1)
wf.step("extract", extract_task)
wf.step("transform", transform_task, after="extract")
wf.step("load", load_task, after="transform")
```

Steps are added in order. The `after` parameter declares predecessors — a
step won't run until all its predecessors complete.

### Multiple predecessors

```python
wf.step("merge", merge_task, after=["branch_a", "branch_b"])
```

### Step arguments

```python
wf.step("fetch", fetch_task, args=("https://api.example.com",))
wf.step("process", process_task, after="fetch", kwargs={"mode": "strict"})
```

Arguments are serialized at submission time using the queue's serializer.
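
For reference, a sketch of how those arguments line up with the task
signatures; the `@queue.task()` registrations below are hypothetical:

```python
# Hypothetical registrations: the step's args/kwargs bind to the task's
# parameters when its job executes on a worker.
@queue.task()
def fetch_task(url: str):
    ...

@queue.task()
def process_task(mode: str = "lenient"):
    ...
```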

## Step configuration

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `name` | `str` | required | Unique step name within the workflow |
| `task` | `TaskWrapper` | required | Registered `@queue.task()` function |
| `after` | `str \| list[str]` | `None` | Predecessor step(s) |
| `args` | `tuple` | `()` | Positional arguments |
| `kwargs` | `dict` | `None` | Keyword arguments |
| `queue` | `str` | `None` | Override queue name |
| `max_retries` | `int` | `None` | Override retry count |
| `timeout_ms` | `int` | `None` | Override timeout (milliseconds) |
| `priority` | `int` | `None` | Override priority |
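
For example, a step using several per-step overrides from the table above
(values are illustrative):

```python
wf.step(
    "load",
    load_task,
    after="transform",
    queue="io",          # route this step to a different queue
    max_retries=5,       # override the task's retry count
    timeout_ms=60_000,   # 60-second timeout
    priority=10,         # override priority
)
```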

## Workflow decorator

Register reusable workflow factories with `@queue.workflow()`:

```python
@queue.workflow("nightly_etl")
def etl_pipeline():
wf = Workflow()
wf.step("extract", extract)
wf.step("load", load, after="extract")
return wf

# Build and submit
run = etl_pipeline.submit()
run.wait()

# Or build without submitting
wf = etl_pipeline.build()
print(wf.step_names) # ["extract", "load"]
```

## Submitting

```python
run = queue.submit_workflow(wf)
```

This creates a `WorkflowRun` handle. Under the hood:

1. A `WorkflowDefinition` is stored (or reused by name + version)
2. A `WorkflowRun` record is created
3. For each step in topological order, a job is enqueued with `depends_on` chains
4. The run transitions to `RUNNING`

<Mermaid
chart={`sequenceDiagram
participant P as Python
participant R as Rust Engine
participant S as Scheduler

P->>R: submit_workflow(dag, payloads)
R->>R: Store definition + run
loop Each step in topo order
R->>S: enqueue(job, depends_on=[pred_ids])
end
R-->>P: WorkflowRun handle
S->>S: Dequeue jobs as deps satisfied`}
/>

## Workflow parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `name` | `str` | `"workflow"` | Workflow name |
| `version` | `int` | `1` | Version number |
| `on_failure` | `str` | `"fail_fast"` | Error strategy: `"fail_fast"` or `"continue"` |
| `cache_ttl` | `float` | `None` | Cache TTL in seconds for [incremental runs](/docs/guides/workflows/caching) |
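
For example, a workflow that continues past failed branches and caches results
for one hour (values are illustrative):

```python
wf = Workflow(
    name="nightly_etl",
    version=2,
    on_failure="continue",  # don't fail fast on the first error
    cache_ttl=3600,         # cache results for one hour
)
```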

## Node statuses

Each step transitions through these states:

<Mermaid
chart={`stateDiagram-v2
[*] --> Pending
Pending --> Running : job picked up
Running --> Completed : success
Running --> Failed : error (retries exhausted)
Pending --> Skipped : cascade / condition
Pending --> WaitingApproval : gate reached
WaitingApproval --> Completed : approved
WaitingApproval --> Failed : rejected
Pending --> CacheHit : incremental reuse
Completed --> [*]
Failed --> [*]
Skipped --> [*]
CacheHit --> [*]`}
/>

| Status | Terminal | Meaning |
|--------|----------|---------|
| `PENDING` | No | Waiting for predecessors or job creation |
| `RUNNING` | No | Job is executing |
| `COMPLETED` | Yes | Step succeeded |
| `FAILED` | Yes | Step failed after retries exhausted |
| `SKIPPED` | Yes | Skipped due to failure cascade or unmet condition |
| `WAITING_APPROVAL` | No | Gate awaiting approve/reject |
| `CACHE_HIT` | Yes | Reused result from a prior run |
77 changes: 77 additions & 0 deletions docs-next/content/docs/guides/workflows/caching.mdx
@@ -0,0 +1,77 @@
---
title: Incremental Runs
description: "Skip unchanged steps with CACHE_HIT — result hashing, dirty-set propagation, TTL."
---

Skip unchanged steps by reusing results from a prior run. When a node
completed successfully in the base run, it gets `CACHE_HIT` status instead
of re-executing.

## Basic usage

```python
# First run: everything executes, results hashed (SHA-256)
run1 = queue.submit_workflow(wf)
run1.wait()

# Second run: skip completed nodes from run1
run2 = queue.submit_workflow(wf, incremental=True, base_run=run1.id)
run2.wait()
```

Nodes that completed in `run1` with a stored result hash become
`CACHE_HIT` in `run2`. Nodes that failed or are missing re-execute.
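
The comparison key is a SHA-256 hash of each step's stored result. A sketch of
what such a hash could look like (the exact scheme is internal to the engine):

```python
import hashlib

def result_hash(serialized_result: bytes) -> str:
    # SHA-256 over the serialized result bytes
    return hashlib.sha256(serialized_result).hexdigest()
```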

## Dirty-set propagation

If a node is dirty (failed or missing in the base run), all its downstream
nodes are also dirty — even if they had cached results:

<Mermaid
chart={`graph LR
A["a — dirty (missing)"] --> B["b — dirty (propagated)"]
B --> C["c — dirty (propagated)"]
style A fill:#FFB6C1
style B fill:#FFB6C1
style C fill:#FFB6C1`}
/>

<Mermaid
chart={`graph LR
A["a — CACHE_HIT ✓"] --> B["b — dirty (failed in base)"]
B --> C["c — dirty (propagated)"]
style A fill:#90EE90
style B fill:#FFB6C1
style C fill:#FFB6C1`}
/>

## Cache TTL

Set a time-to-live on cached results:

```python
wf = Workflow(name="pipeline", cache_ttl=3600) # 1 hour
```

If the base run completed more than `cache_ttl` seconds ago, all nodes are
treated as dirty (full re-execution).

## How it works

At submit time with `incremental=True`:

1. Fetch the base run's node data: `{name: (status, result_hash)}`
2. For each node in the new run:
- Base node completed + has result_hash → `CACHE_HIT`
- Base node failed / missing → dirty
- Any predecessor dirty → also dirty (propagated)
3. `CACHE_HIT` nodes are created with `status=cache_hit` and `completed_at` set — no job enqueued
4. Dirty nodes get normal jobs
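
A minimal sketch of steps 1-3, assuming `preds` maps each node to its
predecessors and `base` is the base run's `{name: (status, result_hash)}` data
(illustrative only):

```python
def classify(preds: dict[str, list[str]],
             topo_order: list[str],
             base: dict[str, tuple[str, str | None]]) -> dict[str, str]:
    """Mark each node in the new run as 'cache_hit' or 'dirty'."""
    state: dict[str, str] = {}
    for node in topo_order:
        status, result_hash = base.get(node, ("missing", None))
        clean = status == "completed" and result_hash is not None
        # dirty if the base node wasn't a clean completion,
        # or if any predecessor is already dirty (propagation)
        if clean and all(state[p] == "cache_hit" for p in preds.get(node, [])):
            state[node] = "cache_hit"
        else:
            state[node] = "dirty"
    return state
```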

## Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `incremental` | `bool` | `False` | Enable cache comparison |
| `base_run` | `str` | `None` | Run ID to compare against |
| `cache_ttl` | `float` | `None` | TTL in seconds (on `Workflow`) |