From 88657535595a75414357b54ca9f7ad88bef7ffa1 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sun, 3 May 2026 02:41:33 +0530 Subject: [PATCH] docs(docs-next): port guides/workflows (phase 4h) --- .../docs/guides/workflows/analysis.mdx | 153 ++++++++++++++++++ .../docs/guides/workflows/building.mdx | 140 ++++++++++++++++ .../content/docs/guides/workflows/caching.mdx | 77 +++++++++ .../content/docs/guides/workflows/canvas.mdx | 128 +++++++++++++++ .../docs/guides/workflows/composition.mdx | 97 +++++++++++ .../docs/guides/workflows/conditions.mdx | 132 +++++++++++++++ .../content/docs/guides/workflows/fan-out.mdx | 117 ++++++++++++++ .../content/docs/guides/workflows/gates.mdx | 79 +++++++++ .../content/docs/guides/workflows/index.mdx | 60 ++++++- .../content/docs/guides/workflows/meta.json | 12 +- 10 files changed, 989 insertions(+), 6 deletions(-) create mode 100644 docs-next/content/docs/guides/workflows/analysis.mdx create mode 100644 docs-next/content/docs/guides/workflows/building.mdx create mode 100644 docs-next/content/docs/guides/workflows/caching.mdx create mode 100644 docs-next/content/docs/guides/workflows/canvas.mdx create mode 100644 docs-next/content/docs/guides/workflows/composition.mdx create mode 100644 docs-next/content/docs/guides/workflows/conditions.mdx create mode 100644 docs-next/content/docs/guides/workflows/fan-out.mdx create mode 100644 docs-next/content/docs/guides/workflows/gates.mdx diff --git a/docs-next/content/docs/guides/workflows/analysis.mdx b/docs-next/content/docs/guides/workflows/analysis.mdx new file mode 100644 index 0000000..64588fb --- /dev/null +++ b/docs-next/content/docs/guides/workflows/analysis.mdx @@ -0,0 +1,153 @@ +--- +title: Analysis & Visualization +description: "ancestors / descendants / topological_levels / critical_path / bottleneck_analysis / Mermaid + DOT rendering." 
+--- + +import { Tab, Tabs } from "fumadocs-ui/components/tabs"; + +Analyze the workflow DAG before execution and render diagrams with live +status. + +## Graph inspection + +```python +wf = Workflow(name="pipeline") +wf.step("a", task_a) +wf.step("b", task_b, after="a") +wf.step("c", task_c, after="a") +wf.step("d", task_d, after=["b", "c"]) + +wf.ancestors("d") # ["a", "b", "c"] +wf.descendants("a") # ["b", "c", "d"] +wf.topological_levels() # [["a"], ["b", "c"], ["d"]] +wf.stats() +# {"nodes": 4, "edges": 4, "depth": 3, "width": 2, "density": 0.6667} +``` + +| Method | Returns | Description | +|--------|---------|-------------| +| `ancestors(node)` | `list[str]` | All transitive predecessors | +| `descendants(node)` | `list[str]` | All transitive successors | +| `topological_levels()` | `list[list[str]]` | Nodes grouped by depth | +| `stats()` | `dict` | Node count, edge count, depth, width, density | + +## Critical path + +Find the longest-weighted path through the DAG: + +```python +path, cost = wf.critical_path({ + "a": 2.0, + "b": 7.0, + "c": 1.0, + "d": 3.0, +}) +# path = ["a", "b", "d"], cost = 12.0 +``` + +Pass estimated durations per step. The critical path determines the +minimum total execution time. + +## Execution plan + +Generate a step-by-step schedule respecting worker limits: + +```python +plan = wf.execution_plan(max_workers=2) +# [["a"], ["b", "c"], ["d"]] + +plan = wf.execution_plan(max_workers=1) +# [["a"], ["b"], ["c"], ["d"]] +``` + +Each stage contains up to `max_workers` nodes. Nodes in the same +topological level are batched together. + +## Bottleneck analysis + +Identify the most expensive step on the critical path: + +```python +result = wf.bottleneck_analysis({ + "a": 2.0, "b": 7.0, "c": 1.0, "d": 3.0 +}) +# { +# "node": "b", +# "cost": 7.0, +# "percentage": 58.3, +# "critical_path": ["a", "b", "d"], +# "total_cost": 12.0, +# "suggestion": "b is the bottleneck (58.3% of total time). ..." 
+# } +``` + +## Visualization + +Render the DAG as a diagram string: + + + + +```python +print(wf.visualize("mermaid")) +``` + +``` +graph LR + a[a] + b[b] + c[c] + d[d] + a --> b + a --> c + b --> d + c --> d +``` + + + + +```python +print(wf.visualize("dot")) +``` + +``` +digraph workflow { + rankdir=LR; + a [label="a" style=filled fillcolor=white]; + b [label="b" style=filled fillcolor=white]; + ... +} +``` + + + + +### Live status visualization + +`WorkflowRun.visualize()` includes status colors: + +```python +run = queue.submit_workflow(wf) +run.wait() + +print(run.visualize("mermaid")) +``` + +``` +graph LR + a[a ✓] + b[b ✓] + a --> b + style a fill:#90EE90 + style b fill:#90EE90 +``` + +| Status | Color | +|--------|-------| +| Completed | Green `#90EE90` | +| Failed | Red `#FFB6C1` | +| Running | Blue `#87CEEB` | +| Pending | Gray `#D3D3D3` | +| Skipped | Light gray `#F5F5F5` | +| Waiting Approval | Yellow `#FFFACD` | diff --git a/docs-next/content/docs/guides/workflows/building.mdx b/docs-next/content/docs/guides/workflows/building.mdx new file mode 100644 index 0000000..c3f890b --- /dev/null +++ b/docs-next/content/docs/guides/workflows/building.mdx @@ -0,0 +1,140 @@ +--- +title: Building Workflows +description: "Workflow.step(), @queue.workflow() decorator, step configuration, DAG structure, node statuses." +--- + +A workflow is a DAG of steps. Each step wraps a registered task. The engine +creates jobs in topological order with `depends_on` chains so the existing +scheduler handles execution. + +## Defining steps + +```python +from taskito.workflows import Workflow + +wf = Workflow(name="etl", version=1) +wf.step("extract", extract_task) +wf.step("transform", transform_task, after="extract") +wf.step("load", load_task, after="transform") +``` + +Steps are added in order. The `after` parameter declares predecessors — a +step won't run until all its predecessors complete. 
+ +### Multiple predecessors + +```python +wf.step("merge", merge_task, after=["branch_a", "branch_b"]) +``` + +### Step arguments + +```python +wf.step("fetch", fetch_task, args=("https://api.example.com",)) +wf.step("process", process_task, after="fetch", kwargs={"mode": "strict"}) +``` + +Arguments are serialized at submission time using the queue's serializer. + +## Step configuration + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `name` | `str` | required | Unique step name within the workflow | +| `task` | `TaskWrapper` | required | Registered `@queue.task()` function | +| `after` | `str \| list[str]` | `None` | Predecessor step(s) | +| `args` | `tuple` | `()` | Positional arguments | +| `kwargs` | `dict` | `None` | Keyword arguments | +| `queue` | `str` | `None` | Override queue name | +| `max_retries` | `int` | `None` | Override retry count | +| `timeout_ms` | `int` | `None` | Override timeout (milliseconds) | +| `priority` | `int` | `None` | Override priority | + +## Workflow decorator + +Register reusable workflow factories with `@queue.workflow()`: + +```python +@queue.workflow("nightly_etl") +def etl_pipeline(): + wf = Workflow() + wf.step("extract", extract) + wf.step("load", load, after="extract") + return wf + +# Build and submit +run = etl_pipeline.submit() +run.wait() + +# Or build without submitting +wf = etl_pipeline.build() +print(wf.step_names) # ["extract", "load"] +``` + +## Submitting + +```python +run = queue.submit_workflow(wf) +``` + +This creates a `WorkflowRun` handle. Under the hood: + +1. A `WorkflowDefinition` is stored (or reused by name + version) +2. A `WorkflowRun` record is created +3. For each step in topological order, a job is enqueued with `depends_on` chains +4. 
The run transitions to `RUNNING` + +>R: submit_workflow(dag, payloads) + R->>R: Store definition + run + loop Each step in topo order + R->>S: enqueue(job, depends_on=[pred_ids]) + end + R-->>P: WorkflowRun handle + S->>S: Dequeue jobs as deps satisfied`} +/> + +## Workflow parameters + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `name` | `str` | `"workflow"` | Workflow name | +| `version` | `int` | `1` | Version number | +| `on_failure` | `str` | `"fail_fast"` | Error strategy: `"fail_fast"` or `"continue"` | +| `cache_ttl` | `float` | `None` | Cache TTL in seconds for [incremental runs](/docs/guides/workflows/caching) | + +## Node statuses + +Each step transitions through these states: + + Pending + Pending --> Running : job picked up + Running --> Completed : success + Running --> Failed : error (retries exhausted) + Pending --> Skipped : cascade / condition + Pending --> WaitingApproval : gate reached + WaitingApproval --> Completed : approved + WaitingApproval --> Failed : rejected + Pending --> CacheHit : incremental reuse + Completed --> [*] + Failed --> [*] + Skipped --> [*] + CacheHit --> [*]`} +/> + +| Status | Terminal | Meaning | +|--------|----------|---------| +| `PENDING` | No | Waiting for predecessors or job creation | +| `RUNNING` | No | Job is executing | +| `COMPLETED` | Yes | Step succeeded | +| `FAILED` | Yes | Step failed after retries exhausted | +| `SKIPPED` | Yes | Skipped due to failure cascade or unmet condition | +| `WAITING_APPROVAL` | No | Gate awaiting approve/reject | +| `CACHE_HIT` | Yes | Reused result from a prior run | diff --git a/docs-next/content/docs/guides/workflows/caching.mdx b/docs-next/content/docs/guides/workflows/caching.mdx new file mode 100644 index 0000000..601e04c --- /dev/null +++ b/docs-next/content/docs/guides/workflows/caching.mdx @@ -0,0 +1,77 @@ +--- +title: Incremental Runs +description: "Skip unchanged steps with CACHE_HIT — result hashing, dirty-set 
propagation, TTL." +--- + +Skip unchanged steps by reusing results from a prior run. When a node +completed successfully in the base run, it gets `CACHE_HIT` status instead +of re-executing. + +## Basic usage + +```python +# First run: everything executes, results hashed (SHA-256) +run1 = queue.submit_workflow(wf) +run1.wait() + +# Second run: skip completed nodes from run1 +run2 = queue.submit_workflow(wf, incremental=True, base_run=run1.id) +run2.wait() +``` + +Nodes that completed in `run1` with a stored result hash become +`CACHE_HIT` in `run2`. Nodes that failed or are missing re-execute. + +## Dirty-set propagation + +If a node is dirty (failed or missing in the base run), all its downstream +nodes are also dirty — even if they had cached results: + + B["b — dirty (propagated)"] + B --> C["c — dirty (propagated)"] + style A fill:#FFB6C1 + style B fill:#FFB6C1 + style C fill:#FFB6C1`} +/> + + B["b — dirty (failed in base)"] + B --> C["c — dirty (propagated)"] + style A fill:#90EE90 + style B fill:#FFB6C1 + style C fill:#FFB6C1`} +/> + +## Cache TTL + +Set a time-to-live on cached results: + +```python +wf = Workflow(name="pipeline", cache_ttl=3600) # 1 hour +``` + +If the base run completed more than `cache_ttl` seconds ago, all nodes are +treated as dirty (full re-execution). + +## How it works + +At submit time with `incremental=True`: + +1. Fetch the base run's node data: `{name: (status, result_hash)}` +2. For each node in the new run: + - Base node completed + has result_hash → `CACHE_HIT` + - Base node failed / missing → dirty + - Any predecessor dirty → also dirty (propagated) +3. `CACHE_HIT` nodes are created with `status=cache_hit` and `completed_at` set — no job enqueued +4. 
Dirty nodes get normal jobs + +## Parameters + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `incremental` | `bool` | `False` | Enable cache comparison | +| `base_run` | `str` | `None` | Run ID to compare against | +| `cache_ttl` | `float` | `None` | TTL in seconds (on `Workflow`) | diff --git a/docs-next/content/docs/guides/workflows/canvas.mdx b/docs-next/content/docs/guides/workflows/canvas.mdx new file mode 100644 index 0000000..8823482 --- /dev/null +++ b/docs-next/content/docs/guides/workflows/canvas.mdx @@ -0,0 +1,128 @@ +--- +title: Canvas Primitives +description: "Lightweight chain / group / chord composition for simpler pipelines." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +For simpler pipelines without DAG features, taskito provides **chain**, +**group**, and **chord** — lightweight composition that doesn't require the +workflow engine. + +## Signatures + +A `Signature` wraps a task call for deferred execution: + +```python +from taskito import chain, group, chord + +sig = add.s(1, 2) # Mutable — receives previous result as first arg +sig = add.si(1, 2) # Immutable — ignores previous result +``` + +## Chain + +Execute tasks sequentially, piping each result to the next: + +|result| S2["transform.s()"] + S2 -->|result| S3["load.s()"]`} +/> + +```python +result = chain( + extract.s("https://api.example.com/users"), + transform.s(), + load.s(), +).apply(queue) + +print(result.result(timeout=30)) +``` + + + Use `.si()` when a step should **not** receive the previous result: + + ```python + chain( + step_a.s(input_data), + step_b.si(independent_data), + step_c.s(), + ).apply(queue) + ``` + + +## Group + +Execute tasks in parallel (fan-out): + +```python +jobs = group( + process.s(1), + process.s(2), + process.s(3), +).apply(queue) + +results = [j.result(timeout=30) for j in jobs] +``` + +### Concurrency limits + +```python +jobs = group( + *[fetch.s(url) for url in urls], + 
max_concurrency=5, +).apply(queue) +``` + +## Chord + +Fan-out with a callback — run tasks in parallel, then pass all results to +a final task: + +```python +result = chord( + group( + fetch.s("https://api1.example.com"), + fetch.s("https://api2.example.com"), + fetch.s("https://api3.example.com"), + ), + merge.s(), +).apply(queue) +``` + +## chunks / starmap + +```python +from taskito import chunks, starmap + +# Batch processing — split 1000 items into groups of 100 +results = chunks(process_batch, items, chunk_size=100).apply(queue) + +# Map-reduce pattern +result = chord( + chunks(process_batch, items, chunk_size=100), + merge_results.s(), +).apply(queue) + +# Tuple unpacking +results = starmap(add, [(1, 2), (3, 4), (5, 6)]).apply(queue) +``` + +## When to use canvas vs DAG workflows + +| Feature | Canvas | DAG Workflows | +|---------|--------|---------------| +| Setup | `from taskito import chain, group, chord` | `from taskito.workflows import Workflow` | +| Topology | Linear chains, flat groups | Arbitrary DAGs | +| Fan-out | Static (known at build time) | Dynamic (from return values) | +| Conditions | None | `on_success`, `on_failure`, `always`, callables | +| Error handling | Per-task retries only | Workflow-level strategies | +| Approval gates | No | Yes | +| Sub-workflows | No | Yes | +| Incremental runs | No | Yes | +| Status tracking | Per-job only | Per-workflow + per-node | +| Visualization | No | Mermaid / DOT | + +Use canvas for quick one-off pipelines. Use DAG workflows for production +pipelines that need monitoring, conditions, or complex topologies. diff --git a/docs-next/content/docs/guides/workflows/composition.mdx b/docs-next/content/docs/guides/workflows/composition.mdx new file mode 100644 index 0000000..11aea72 --- /dev/null +++ b/docs-next/content/docs/guides/workflows/composition.mdx @@ -0,0 +1,97 @@ +--- +title: Sub-Workflows & Scheduling +description: "Nest workflows with WorkflowProxy.as_step(), schedule with @queue.periodic()." 
+--- + +import { Callout } from "fumadocs-ui/components/callout"; + +Nest workflows for composition, and schedule workflows on a cron. + +## Sub-workflows + +Use `WorkflowProxy.as_step()` to embed one workflow inside another: + +```python +@queue.workflow("etl") +def etl_pipeline(region): + wf = Workflow() + wf.step("extract", extract, args=[region]) + wf.step("load", load, after="extract") + return wf + +@queue.workflow("daily") +def daily_pipeline(): + wf = Workflow() + wf.step("eu_etl", etl_pipeline.as_step(region="eu")) + wf.step("us_etl", etl_pipeline.as_step(region="us")) + wf.step("reconcile", reconcile, after=["eu_etl", "us_etl"]) + return wf + +run = daily_pipeline.submit() +``` + + reconcile + us["us_etl"] --> reconcile + end + + subgraph Child1["etl (region=eu)"] + e1["extract"] --> l1["load"] + end + + subgraph Child2["etl (region=us)"] + e2["extract"] --> l2["load"] + end + + eu -.->|"submits"| Child1 + us -.->|"submits"| Child2`} +/> + +### How it works + +1. Parent workflow submits the child workflow via `queue.submit_workflow()` with `parent_run_id` +2. The child runs independently with its own nodes and status +3. When the child completes/fails, the tracker updates the parent node +4. Downstream steps in the parent evaluate normally + +### Cancellation cascade + +Cancelling the parent cascades to all active child workflows: + +```python +run.cancel() # Cancels parent + all child sub-workflows +``` + +### Failure + +If a child workflow fails at runtime, the parent node is marked `FAILED`. +Downstream steps follow the parent's `on_failure` strategy. + +The same holds for failures at *submission* time — if the child's factory +raises or the DAG fails to compile when the parent node becomes evaluable, +the parent node is marked `FAILED` immediately (rather than leaving the +outer run hanging), and the parent run finalizes normally. 
+ +## Cron-scheduled workflows + +Stack `@queue.periodic()` on top of `@queue.workflow()`: + +```python +@queue.periodic(cron="0 0 2 * * *") # 2:00 AM daily +@queue.workflow("nightly_analytics") +def nightly(): + wf = Workflow() + wf.step("extract", extract_clickstream) + wf.step("aggregate", build_dashboards, after="extract") + return wf +``` + +Each cron trigger submits a new workflow run. Under the hood, a bridge task +`_wf_launcher_nightly_analytics` is registered that calls `proxy.submit()`. + + + The `@queue.periodic()` decorator must be the **outer** decorator + (applied second, listed first). + diff --git a/docs-next/content/docs/guides/workflows/conditions.mdx b/docs-next/content/docs/guides/workflows/conditions.mdx new file mode 100644 index 0000000..df05178 --- /dev/null +++ b/docs-next/content/docs/guides/workflows/conditions.mdx @@ -0,0 +1,132 @@ +--- +title: Conditions & Error Handling +description: "on_success, on_failure, always, callable conditions, fail_fast vs continue, skip propagation." +--- + +import { Tab, Tabs } from "fumadocs-ui/components/tabs"; + +Control which steps execute based on predecessor outcomes, and configure +how the workflow responds to failures. 
+ +## Step conditions + +```python +wf.step("deploy", deploy, after="test") # default: on_success +wf.step("rollback", rollback, after="deploy", condition="on_failure") +wf.step("notify", send_slack, after="deploy", condition="always") +``` + +| Condition | Runs when | +|-----------|-----------| +| `None` / `"on_success"` | All predecessors completed successfully | +| `"on_failure"` | Any predecessor failed | +| `"always"` | Predecessors are terminal (regardless of outcome) | +| `callable` | `condition(ctx)` returns `True` | + +## Callable conditions + +Pass a function that receives a `WorkflowContext`: + +```python +from taskito.workflows import WorkflowContext + +def high_score(ctx: WorkflowContext) -> bool: + return ctx.results["validate"]["score"] > 0.95 + +wf.step("deploy", deploy, after="validate", condition=high_score) +``` + +`WorkflowContext` fields: + +| Field | Type | Description | +|-------|------|-------------| +| `run_id` | `str` | Workflow run ID | +| `results` | `dict[str, Any]` | Deserialized return values of completed nodes | +| `statuses` | `dict[str, str]` | Status strings for all terminal nodes | +| `failure_count` | `int` | Number of failed nodes | +| `success_count` | `int` | Number of completed nodes | + +## Error strategies + +Set the workflow-level error strategy: + + + + +```python +wf = Workflow(name="strict", on_failure="fail_fast") +``` + +One failure skips **all** pending steps. The workflow transitions to `FAILED`. + + B["b ✗"] + B --> C["c ━ SKIPPED"] + B --> D["d ━ SKIPPED"] + style B fill:#FFB6C1 + style C fill:#F5F5F5 + style D fill:#F5F5F5`} +/> + + + + +```python +wf = Workflow(name="resilient", on_failure="continue") +``` + +Failed steps skip their `on_success` dependents, but **independent +branches keep running**. 
+ + fail_branch["fail ✗"] + root --> ok_branch["ok ✓"] + fail_branch --> after_fail["after_fail ━ SKIPPED"] + ok_branch --> after_ok["after_ok ✓"] + style fail_branch fill:#FFB6C1 + style after_fail fill:#F5F5F5 + style after_ok fill:#90EE90`} +/> + + + + +## Skip propagation + +When a node is skipped, its successors are evaluated recursively: + +- `on_success` successors → **SKIPPED** (predecessor didn't succeed) +- `on_failure` successors → evaluated (predecessor is terminal) +- `always` successors → **run** regardless of how the predecessor ended + +```python +wf = Workflow(name="cleanup_pipeline") +wf.step("a", risky_task) +wf.step("b", next_step, after="a") # SKIPPED if a fails +wf.step("cleanup", cleanup, after="b", condition="always") # runs even if b is skipped +``` + + B["b ━ SKIPPED"] + B --> C["cleanup ✓ ALWAYS"] + style A fill:#FFB6C1 + style B fill:#F5F5F5 + style C fill:#90EE90`} +/> + +## Combining conditions with fan-out + +Conditions work with fan-out nodes. If a fan-out child fails: + +```python +wf.step("fetch", fetch_data) +wf.step("process", process, after="fetch", fan_out="each") +wf.step("aggregate", aggregate, after="process", fan_in="all") +wf.step("on_error", alert, after="process", condition="on_failure") +``` + +If any `process[i]` child fails, the fan-out parent is marked `FAILED`, +`aggregate` is skipped, and `on_error` runs. diff --git a/docs-next/content/docs/guides/workflows/fan-out.mdx b/docs-next/content/docs/guides/workflows/fan-out.mdx new file mode 100644 index 0000000..57962c2 --- /dev/null +++ b/docs-next/content/docs/guides/workflows/fan-out.mdx @@ -0,0 +1,117 @@ +--- +title: Fan-Out & Fan-In +description: "Split a step's result into parallel children, collect results into a downstream step." +--- + +Split a step's result into parallel child jobs, then collect all results +into a downstream step. 
+ + process_0["process[0]"] + fetch --> process_1["process[1]"] + fetch --> process_2["process[2]"] + process_0 --> aggregate + process_1 --> aggregate + process_2 --> aggregate + + style fetch fill:#90EE90 + style aggregate fill:#87CEEB`} +/> + +## Fan-out with `"each"` + +The predecessor's return value must be iterable. Each element becomes a +separate child job: + +```python +@queue.task() +def fetch() -> list[int]: + return [10, 20, 30] + +@queue.task() +def process(item: int) -> int: + return item * 2 + +@queue.task() +def aggregate(results: list[int]) -> int: + return sum(results) # receives [20, 40, 60] + +wf = Workflow(name="map_reduce") +wf.step("fetch", fetch) +wf.step("process", process, after="fetch", fan_out="each") +wf.step("aggregate", aggregate, after="process", fan_in="all") +``` + +Child nodes are named `process[0]`, `process[1]`, `process[2]` and appear +in status queries. + +## How it works + +1. `fetch` completes — the tracker reads its return value +2. `apply_fan_out("each", result)` splits the list into individual items +3. `expand_fan_out()` creates N child nodes + N jobs (no `depends_on` — they're ready immediately) +4. Each child runs independently in parallel +5. When all children complete, `check_fan_out_completion()` marks the fan-out parent `COMPLETED` +6. The tracker collects all child results in index order +7. 
The fan-in job is created with `((results_list,), {})` as its payload + +>T: JOB_COMPLETED(fetch) + T->>R: get_job(fetch_id).result_bytes + T->>R: expand_fan_out(3 children) + R-->>T: [child_job_ids] + Note over R: Children execute in parallel + R->>T: JOB_COMPLETED(process[0]) + R->>T: JOB_COMPLETED(process[1]) + R->>T: JOB_COMPLETED(process[2]) + T->>R: check_fan_out_completion → all done + T->>R: create_deferred_job(aggregate)`} +/> + +## Empty fan-out + +If the predecessor returns an empty list, the fan-out parent is marked +`COMPLETED` immediately with zero children, and the fan-in receives an +empty list: + +```python +@queue.task() +def fetch() -> list: + return [] # nothing to process + +# aggregate receives [] +``` + +## Fan-out with downstream steps + +Steps after the fan-in work normally: + +```python +wf = Workflow(name="full_pipeline") +wf.step("fetch", fetch) +wf.step("process", process, after="fetch", fan_out="each") +wf.step("aggregate", aggregate, after="process", fan_in="all") +wf.step("report", send_report, after="aggregate") # runs after aggregate +``` + +## Failure handling + +By default (`on_failure="fail_fast"`), if any fan-out child fails: + +- Remaining pending children are cancelled +- The fan-out parent is marked `FAILED` +- The fan-in and downstream steps are `SKIPPED` +- The workflow transitions to `FAILED` + +Combine with [conditions](/docs/guides/workflows/conditions) for more +control: + +```python +wf.step("handle_error", alert, after="process", condition="on_failure") +``` diff --git a/docs-next/content/docs/guides/workflows/gates.mdx b/docs-next/content/docs/guides/workflows/gates.mdx new file mode 100644 index 0000000..9066bc0 --- /dev/null +++ b/docs-next/content/docs/guides/workflows/gates.mdx @@ -0,0 +1,79 @@ +--- +title: Approval Gates +description: "Pause workflows for human review — approve, reject, timeout, gate-reached events." +--- + +Pause a workflow for human review. 
The gate enters `WAITING_APPROVAL` +status until explicitly approved or rejected — or until a timeout fires. + + eval["evaluate ✓"] + eval --> gate["approve ⏸"] + gate -->|approved| deploy["deploy"] + gate -->|rejected| skip["deploy ━ SKIPPED"] + style gate fill:#FFFACD`} +/> + +## Adding a gate + +```python +wf = Workflow(name="ml_deploy") +wf.step("train", train_model) +wf.step("evaluate", evaluate, after="train") +wf.gate("approve", after="evaluate") +wf.step("deploy", deploy, after="approve") +``` + +When the workflow reaches the gate, it pauses. Downstream steps won't +execute until the gate is resolved. + +## Resolving a gate + +```python +run = queue.submit_workflow(wf) + +# Later, after review: +queue.approve_gate(run.id, "approve") # → gate COMPLETED, deploy runs +# or: +queue.reject_gate(run.id, "approve") # → gate FAILED, deploy SKIPPED +``` + +## Timeout + +Auto-resolve after a deadline: + +```python +wf.gate("approve", after="evaluate", + timeout=86400, # 24 hours + on_timeout="reject") # or "approve" +``` + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `timeout` | `float` | `None` | Seconds until auto-resolve. `None` waits forever | +| `on_timeout` | `str` | `"reject"` | Action on expiry: `"approve"` or `"reject"` | +| `message` | `str` | `None` | Human-readable message for approvers | + +## Gate with conditions + +Gates respect step conditions: + +```python +wf.step("test", run_tests) +wf.gate("approve", after="test", condition="on_success") +wf.step("deploy", deploy, after="approve") +``` + +If `test` fails, the gate is skipped (condition not met), and `deploy` is +also skipped. 
+ +## Events + +When a gate enters `WAITING_APPROVAL`, a `WORKFLOW_GATE_REACHED` event fires: + +```python +@queue.on(EventType.WORKFLOW_GATE_REACHED) +def notify_team(event_type, payload): + send_slack(f"Workflow {payload['run_id']} needs approval at {payload['node_name']}") +``` diff --git a/docs-next/content/docs/guides/workflows/index.mdx b/docs-next/content/docs/guides/workflows/index.mdx index 9ccf6ce..7d15778 100644 --- a/docs-next/content/docs/guides/workflows/index.mdx +++ b/docs-next/content/docs/guides/workflows/index.mdx @@ -1,10 +1,60 @@ --- title: Workflows -description: "DAG workflows, fan-out/fan-in, gates, sub-workflows." +description: "Build multi-step pipelines as directed acyclic graphs." --- -import { Callout } from 'fumadocs-ui/components/callout'; +Build multi-step pipelines as directed acyclic graphs. Define steps, wire +dependencies, and let taskito handle execution order, parallelism, failure +propagation, and state tracking — all backed by a Rust engine with +dagron-core for graph algorithms. - - Content port pending. See the [Zensical source](https://github.com/ByteVeda/taskito/tree/master/docs) for current text. 
+ +|"_compile()"| TE["Topology Engine\\ndagron-core DAG"] + TE --> WS["Workflow Storage\\nSQLite — definitions, runs, nodes"] + WS -->|"enqueue jobs"| SC["Scheduler\\ndepends_on chains"] + SC -->|"JOB_COMPLETED event"| WT["Workflow Tracker\\nevent-driven orchestration"] + WT -->|"evaluate successors"| WS + WT --> WR["WorkflowRun\\nstatus() · wait() · cancel()"]`} +/> + +## Quick start + +```python +from taskito import Queue +from taskito.workflows import Workflow + +queue = Queue(db_path="tasks.db") + +@queue.task() +def extract(): return fetch_data() + +@queue.task() +def transform(data): return clean(data) + +@queue.task() +def load(data): db.insert(data) + +wf = Workflow(name="etl_pipeline") +wf.step("extract", extract) +wf.step("transform", transform, after="extract") +wf.step("load", load, after="transform") + +run = queue.submit_workflow(wf) +result = run.wait(timeout=60) +print(result.state) # WorkflowState.COMPLETED +``` + +## Section overview + +| Page | What it covers | +|---|---| +| [Building Workflows](/docs/guides/workflows/building) | `Workflow.step()`, decorator pattern, step configuration, DAG structure | +| [Fan-Out & Fan-In](/docs/guides/workflows/fan-out) | Splitting results into parallel jobs, collecting with aggregation | +| [Conditions & Error Handling](/docs/guides/workflows/conditions) | `on_success`, `on_failure`, `always`, callable conditions, `fail_fast` vs `continue` error strategies | +| [Approval Gates](/docs/guides/workflows/gates) | Human-in-the-loop pause/resume, timeout, approve/reject API | +| [Sub-Workflows & Scheduling](/docs/guides/workflows/composition) | Nesting workflows, cron-scheduled runs | +| [Incremental Runs](/docs/guides/workflows/caching) | Result hashing, `CACHE_HIT`, dirty-set propagation, TTL | +| [Analysis & Visualization](/docs/guides/workflows/analysis) | Critical path, bottleneck analysis, Mermaid/DOT rendering | +| [Canvas Primitives](/docs/guides/workflows/canvas) | Chain, group, chord — simple composition without DAGs | diff --git 
a/docs-next/content/docs/guides/workflows/meta.json b/docs-next/content/docs/guides/workflows/meta.json index e15b535..b20359e 100644 --- a/docs-next/content/docs/guides/workflows/meta.json +++ b/docs-next/content/docs/guides/workflows/meta.json @@ -1,4 +1,14 @@ { "title": "Workflows", - "pages": ["index"] + "pages": [ + "index", + "building", + "fan-out", + "conditions", + "gates", + "composition", + "caching", + "analysis", + "canvas" + ] }