diff --git a/docs-next/content/docs/more/examples/benchmark.mdx b/docs-next/content/docs/more/examples/benchmark.mdx
new file mode 100644
index 0000000..f3401cb
--- /dev/null
+++ b/docs-next/content/docs/more/examples/benchmark.mdx
@@ -0,0 +1,230 @@
+---
+title: Benchmark
+description: "Throughput benchmark — enqueue / processing / latency, comparison with Celery and Dramatiq."
+---
+
+import { Callout } from "fumadocs-ui/components/callout";
+
+Measure taskito's throughput by enqueuing and processing a large batch of
+tasks.
+
+## benchmark.py
+
+```python
+"""taskito throughput benchmark.
+
+Measures:
+1. Enqueue throughput (jobs/sec) using batch insert
+2. Processing throughput (jobs/sec) with N workers
+3. End-to-end latency
+"""
+
+import os
+import threading
+import time
+
+from taskito import Queue
+
+# ── Configuration ────────────────────────────────────────
+
+NUM_JOBS = 10_000
+NUM_WORKERS = os.cpu_count() or 4
+DB_PATH = ":memory:" # In-memory for pure speed test
+
+queue = Queue(db_path=DB_PATH, workers=NUM_WORKERS)
+
+@queue.task()
+def noop(x):
+ """Minimal task — measures framework overhead."""
+ return x
+
+@queue.task()
+def cpu_light(x):
+ """Light CPU work — string formatting."""
+ return f"processed-{x}-{'x' * 100}"
+
+# ── Benchmark Functions ──────────────────────────────────
+
+def bench_enqueue(task, n):
+ """Measure batch enqueue throughput."""
+ args_list = [(i,) for i in range(n)]
+
+ start = time.perf_counter()
+ jobs = task.map(args_list)
+ elapsed = time.perf_counter() - start
+
+ rate = n / elapsed
+ print(f" Enqueued {n:,} jobs in {elapsed:.2f}s ({rate:,.0f} jobs/s)")
+ return jobs
+
+def bench_process(jobs, timeout=120):
+ """Measure processing throughput by waiting for all jobs."""
+ n = len(jobs)
+ start = time.perf_counter()
+
+ last = jobs[-1]
+ try:
+ last.result(timeout=timeout, poll_interval=0.01, max_poll_interval=0.1)
+ except TimeoutError:
+ stats = queue.stats()
+ print(f" Timed out! Stats: {stats}")
+ return
+
+ elapsed = time.perf_counter() - start
+ rate = n / elapsed
+ print(f" Processed {n:,} jobs in {elapsed:.2f}s ({rate:,.0f} jobs/s)")
+
+def bench_latency(task, samples=100):
+ """Measure single-job round-trip latency."""
+ latencies = []
+ for i in range(samples):
+ start = time.perf_counter()
+ job = task.delay(i)
+ job.result(timeout=10)
+ latencies.append(time.perf_counter() - start)
+
+ avg = sum(latencies) / len(latencies)
+ p50 = sorted(latencies)[len(latencies) // 2]
+ p99 = sorted(latencies)[int(len(latencies) * 0.99)]
+ print(f" Latency (n={samples}): avg={avg*1000:.1f}ms p50={p50*1000:.1f}ms p99={p99*1000:.1f}ms")
+
+# ── Main ─────────────────────────────────────────────────
+
+def main():
+ print(f"taskito benchmark")
+ print(f" Workers: {NUM_WORKERS}")
+ print(f" Jobs: {NUM_JOBS:,}")
+ print(f" DB: {DB_PATH}")
+ print()
+
+ # Start worker in background
+ worker_thread = threading.Thread(target=queue.run_worker, daemon=True)
+ worker_thread.start()
+ time.sleep(0.5)
+
+ print("── noop task (framework overhead) ──")
+ jobs = bench_enqueue(noop, NUM_JOBS)
+ bench_process(jobs)
+ print()
+
+ print("── cpu_light task ──")
+ jobs = bench_enqueue(cpu_light, NUM_JOBS)
+ bench_process(jobs)
+ print()
+
+ print("── single-job latency ──")
+ bench_latency(noop)
+ print()
+
+ stats = queue.stats()
+ print(f"Final stats: {stats}")
+
+if __name__ == "__main__":
+ main()
+```
+
+## Running
+
+```bash
+python benchmark.py
+```
+
+## Sample output
+
+```
+taskito benchmark
+ Workers: 8
+ Jobs: 10,000
+ DB: :memory:
+
+── noop task (framework overhead) ──
+ Enqueued 10,000 jobs in 0.18s (55,556 jobs/s)
+ Processed 10,000 jobs in 2.41s (4,149 jobs/s)
+
+── cpu_light task ──
+ Enqueued 10,000 jobs in 0.19s (52,632 jobs/s)
+ Processed 10,000 jobs in 2.53s (3,953 jobs/s)
+
+── single-job latency ──
+ Latency (n=100): avg=1.2ms p50=1.1ms p99=3.4ms
+
+Final stats: {'pending': 0, 'running': 0, 'completed': 20100, 'failed': 0, 'dead': 0, 'cancelled': 0}
+```
+
+<Callout>
+  Actual numbers depend on your hardware, Python version, and SQLite
+  configuration. The numbers above are from an 8-core machine with Python
+  3.12.
+</Callout>
+
+## What makes taskito fast
+
+| Component | How it helps |
+|---|---|
+| **Batch inserts** | `task.map()` inserts all jobs in a single SQLite transaction |
+| **WAL mode** | Concurrent reads while writing — workers don't block enqueue |
+| **Rust scheduler** | 50ms poll loop runs in native code, not Python |
+| **OS threads** | Workers are Rust `std::thread`, not Python threads |
+| **GIL per task** | GIL acquired only during Python task execution, released between tasks |
+| **tokio mpsc channels** | Bounded async dispatch to workers |
+| **r2d2 pool** | Up to 8 concurrent SQLite connections |
+| **Diesel ORM** | Compiled SQL queries, no runtime query building |
+
+## How it compares
+
+Rough directional comparison on the same hardware (8-core, single
+machine). These are not scientific benchmarks — run the script above on
+your own hardware for accurate numbers.
+
+| Metric | taskito (SQLite) | taskito (Postgres) | Celery + Redis | Dramatiq + Redis |
+|--------|-----------------|-------------------|---------------|-----------------|
+| Enqueue throughput | ~55,000/s | ~20,000/s | ~5,000/s | ~3,000/s |
+| Processing (noop, 8 workers) | ~4,000/s | ~3,500/s | ~2,000/s | ~1,500/s |
+| p50 latency | 1.1ms | 2.5ms | 5–10ms | 8–15ms |
+| p99 latency | 3.4ms | 8ms | 20–50ms | 30–80ms |
+| Memory (idle worker) | ~30 MB | ~35 MB | ~80 MB | ~60 MB |
+| Setup | `pip install taskito` | + Postgres | + Redis + Celery | + Redis + Dramatiq |
+| External services | 0 | 1 (Postgres) | 2 (Redis + result backend) | 1 (Redis) |
+
+<Callout>
+  Celery numbers are from public benchmarks and community reports. Your
+  mileage will vary depending on workload, serializer, and broker
+  configuration. Run your own benchmarks before making decisions.
+</Callout>
+
+**Why is taskito faster?**
+
+- Rust scheduler avoids GIL contention — scheduling and dispatch never block Python
+- SQLite WAL mode with batch inserts — disk I/O is minimized
+- Direct DB polling — no broker hop: enqueue → DB → dequeue avoids the extra network round-trip of enqueue → Redis → dequeue
+- OS thread pool with per-task GIL acquisition — no multiprocessing overhead for I/O-bound tasks
+
+## Tune for your workload
+
+| Symptom | Config to change | Why |
+|---------|-----------------|-----|
+| Low throughput (I/O tasks) | Increase `workers` | More threads = more concurrent I/O |
+| Low throughput (CPU tasks) | Use `pool="prefork"` | Each process gets its own GIL |
+| High latency | Decrease `scheduler_poll_interval_ms` | Scheduler checks for ready jobs more often |
+| Database too busy | Increase `scheduler_poll_interval_ms` | Less frequent polling reduces DB load |
+| Memory growing | Set `result_ttl` | Auto-cleanup old results and metrics |
+| Jobs timing out | Increase `default_timeout` | Give tasks more time to complete |
+| Jobs piling up | Add more workers or use Postgres | SQLite's single-writer limit may become the bottleneck |
+
+## Tuning
+
+Adjust these for your workload:
+
+```python
+# More workers for I/O-bound tasks
+queue = Queue(workers=16)
+
+# Fewer workers for CPU-bound tasks (limited by GIL)
+queue = Queue(workers=4)
+
+# In-memory DB for maximum throughput (no persistence)
+queue = Queue(db_path=":memory:")
+
+# File DB for durability (slightly slower)
+queue = Queue(db_path="tasks.db")
+```
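+
+The other knobs from the "Tune for your workload" table can be combined on
+a single `Queue`. This is a sketch: the parameter names follow the table,
+but treat the exact constructor signature as an assumption and check the
+configuration reference before copying it.
+
+```python
+from taskito import Queue
+
+# Hypothetical combined configuration. Keyword names are taken from the
+# "Tune for your workload" table above, not verified against the API.
+queue = Queue(
+    db_path="tasks.db",
+    workers=8,                      # more threads for I/O-bound tasks
+    # pool="prefork",               # per-process GIL for CPU-bound tasks
+    default_timeout=300,            # give long-running tasks more time
+    result_ttl=3600,                # auto-clean results and metrics after 1 hour
+    scheduler_poll_interval_ms=25,  # lower = less latency, busier database
+)
+```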
diff --git a/docs-next/content/docs/more/examples/data-pipeline.mdx b/docs-next/content/docs/more/examples/data-pipeline.mdx
new file mode 100644
index 0000000..8c97a87
--- /dev/null
+++ b/docs-next/content/docs/more/examples/data-pipeline.mdx
@@ -0,0 +1,278 @@
+---
+title: ETL Data Pipeline
+description: "Multi-stage extract → transform → load with task dependencies, named queues, progress tracking, error inspection."
+---
+
+import { Tab, Tabs } from "fumadocs-ui/components/tabs";
+
+A multi-stage extract → transform → load pipeline demonstrating task
+dependencies, DAG workflows, progress tracking, error history inspection,
+metadata, and named queues.
+
+## Project structure
+
+```
+data-pipeline/
+ pipeline.py # Task definitions + DAG construction
+ worker.py # Worker entry point
+ monitor.py # Status monitoring script
+```
+
+## pipeline.py
+
+```python
+"""ETL pipeline with task dependencies and named queues."""
+
+import csv
+import json
+
+import httpx
+
+from taskito import Queue, current_job
+
+queue = Queue(
+ db_path=".taskito/pipeline.db",
+ workers=6,
+ default_retry=3,
+ default_timeout=120,
+)
+
+# ── Extract Tasks ────────────────────────────────────────
+
+@queue.task(queue="extract", max_retries=5, retry_backoff=2.0)
+def extract_api(endpoint: str) -> list[dict]:
+ """Pull records from an API endpoint with retries."""
+ response = httpx.get(endpoint, timeout=30)
+ response.raise_for_status()
+ return response.json()
+
+@queue.task(queue="extract")
+def extract_csv(file_path: str) -> list[dict]:
+ """Read records from a CSV file."""
+ with open(file_path, newline="") as f:
+ return list(csv.DictReader(f))
+
+# ── Transform Tasks ──────────────────────────────────────
+
+@queue.task(queue="transform")
+def normalize(records: list[dict], schema: str) -> list[dict]:
+ """Normalize records against a schema with progress tracking."""
+ results = []
+ for i, record in enumerate(records):
+ results.append({**record, "schema": schema, "normalized": True})
+ if (i + 1) % 50 == 0:
+ current_job.update_progress(int((i + 1) / len(records) * 100))
+ current_job.update_progress(100)
+ return results
+
+@queue.task(queue="transform")
+def deduplicate(records: list[dict]) -> list[dict]:
+ """Remove duplicate records by ID."""
+ seen = set()
+ unique = []
+ for r in records:
+ if r["id"] not in seen:
+ seen.add(r["id"])
+ unique.append(r)
+ return unique
+
+# ── Load Tasks ───────────────────────────────────────────
+
+@queue.task(queue="load")
+def load_to_warehouse(records: list[dict], table: str) -> dict:
+ """Load records into the data warehouse (writes JSON to disk as stand-in)."""
+ dest = f"/tmp/{table.replace('.', '_')}.json"
+ with open(dest, "w") as f:
+ json.dump(records, f, indent=2)
+ return {"table": table, "rows_inserted": len(records), "dest": dest}
+
+# ── DAG Construction ─────────────────────────────────────
+
+def build_pipeline(api_endpoint: str, csv_path: str, target_table: str):
+ """Build a diamond-shaped ETL DAG.
+
+ extract_api ──→ normalize_a ──┐
+ ├──→ load
+ extract_csv ──→ normalize_b ──┘
+ """
+
+ # Stage 1: Extract (parallel)
+ job_api = extract_api.apply_async(
+ args=[api_endpoint],
+ metadata=json.dumps({"source": "api", "endpoint": api_endpoint}),
+ )
+ job_csv = extract_csv.apply_async(
+ args=[csv_path],
+ metadata=json.dumps({"source": "csv", "file": csv_path}),
+ )
+
+ # Stage 2: Transform (each depends on its extract)
+ job_norm_a = normalize.apply_async(
+ args=[[], "schema_v2"], # actual data passed via result
+ depends_on=job_api.id,
+ metadata=json.dumps({"stage": "transform", "schema": "v2"}),
+ )
+ job_norm_b = normalize.apply_async(
+ args=[[], "schema_v2"],
+ depends_on=job_csv.id,
+ metadata=json.dumps({"stage": "transform", "schema": "v2"}),
+ )
+
+ # Stage 3: Load (depends on both transforms)
+ job_load = load_to_warehouse.apply_async(
+ args=[[], target_table],
+ depends_on=[job_norm_a.id, job_norm_b.id],
+ priority=10, # high priority once unblocked
+ metadata=json.dumps({"stage": "load", "table": target_table}),
+ )
+
+ return {
+ "extract": [job_api, job_csv],
+ "transform": [job_norm_a, job_norm_b],
+ "load": job_load,
+ }
+
+
+if __name__ == "__main__":
+ print("Building ETL pipeline...")
+ jobs = build_pipeline(
+ api_endpoint="https://api.example.com/records",
+ csv_path="/data/export.csv",
+ target_table="analytics.events",
+ )
+
+ print(f"\nDAG created:")
+ print(f" Extract: {[j.id for j in jobs['extract']]}")
+ print(f" Transform: {[j.id for j in jobs['transform']]}")
+ print(f" Load: {jobs['load'].id}")
+
+ # Inspect dependency graph
+ load_job = queue.get_job(jobs["load"].id)
+ print(f"\nLoad depends on: {load_job.dependencies}")
+
+ for ext_job in jobs["extract"]:
+ fetched = queue.get_job(ext_job.id)
+ print(f" {ext_job.id} dependents: {fetched.dependents}")
+```
+
+## worker.py
+
+```python
+"""Start the pipeline worker."""
+from pipeline import queue
+
+if __name__ == "__main__":
+ print("Starting pipeline worker (queues: extract, transform, load)...")
+ queue.run_worker(queues=["extract", "transform", "load"])
+```
+
+## monitor.py
+
+```python
+"""Monitor pipeline status and inspect errors."""
+
+import time
+from pipeline import queue
+
+def monitor(load_job_id: str):
+ """Poll the pipeline until the load job completes."""
+ while True:
+ stats = queue.stats()
+ print(f"Queue stats: {stats}")
+
+ job = queue.get_job(load_job_id)
+ if job is None:
+ print("Load job not found!")
+ return
+
+ print(f"Load job status: {job.status}, progress: {job.progress}%")
+
+ if job.status == "complete":
+ print(f"\nPipeline complete! Result: {job.result(timeout=1)}")
+ return
+
+ if job.status in ("failed", "dead", "cancelled"):
+ print(f"\nPipeline failed with status: {job.status}")
+
+ # Inspect error history for all jobs
+ for dep_id in job.dependencies:
+ dep = queue.get_job(dep_id)
+ if dep and dep.status in ("dead", "failed"):
+ errors = dep.errors
+ print(f"\n Job {dep_id} errors:")
+ for err in errors:
+ print(f" Attempt {err['attempt']}: {err['error']}")
+ print(f" At: {err['failed_at']}")
+
+ # Check dead letter queue
+ dead = queue.dead_letters(limit=10)
+ if dead:
+ print(f"\nDead letters ({len(dead)}):")
+ for d in dead:
+ print(f" {d['id']}: {d['task_name']} — {d['error']}")
+ return
+
+ time.sleep(2)
+
+if __name__ == "__main__":
+ import sys
+ if len(sys.argv) < 2:
+ print("Usage: python monitor.py ")
+ sys.exit(1)
+ monitor(sys.argv[1])
+```
+
+## Running it
+
+<Tabs items={["Worker", "Build pipeline", "Monitor"]}>
+
+<Tab value="Worker">
+
+```bash
+python worker.py
+```
+
+</Tab>
+
+<Tab value="Build pipeline">
+
+```bash
+python pipeline.py
+# Copy the load job ID from the output
+```
+
+</Tab>
+
+<Tab value="Monitor">
+
+```bash
+python monitor.py <load_job_id>
+```
+
+</Tab>
+
+</Tabs>
+
+## Cascade cancellation
+
+If an extract job fails permanently (exhausts all retries), the entire
+downstream chain is automatically cancelled:
+
+```python
+# If extract_api fails after 5 retries:
+# → normalize_a is cascade cancelled
+# → load is cascade cancelled (because one dependency failed)
+# normalize_b may still complete, but load won't run
+```
+
+This prevents wasting resources on a pipeline that can't succeed.
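+
+A small sketch of how `monitor.py` could surface the cascade, reusing only
+`queue.get_job()` and the status values shown above (the `explain_cascade`
+helper is illustrative, not part of taskito):
+
+```python
+from pipeline import queue
+
+def explain_cascade(jobs: dict) -> None:
+    """Print every stage's status so cascade-cancelled jobs stand out.
+
+    `jobs` is the dict returned by build_pipeline() in pipeline.py.
+    """
+    for stage, stage_jobs in jobs.items():
+        for j in stage_jobs if isinstance(stage_jobs, list) else [stage_jobs]:
+            fetched = queue.get_job(j.id)
+            print(f"{stage:>10} {j.id}: {fetched.status}")
+
+# After a permanent extract failure, expect the failed extract job to be
+# "dead" and its downstream normalize/load jobs to be "cancelled".
+```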
+
+## Key patterns demonstrated
+
+| Pattern | Where |
+|---|---|
+| Task dependencies | `depends_on` in transform and load stages |
+| Diamond DAG | Two branches converge at the load stage |
+| Cascade cancel | Extract failure cancels downstream transforms and load |
+| Progress tracking | `normalize` reports progress every 50 records |
+| Error history | `monitor.py` inspects `job.errors` for failed jobs |
+| Metadata | Each job tagged with source/stage info via `metadata` |
+| Named queues | `extract`, `transform`, `load` for queue isolation |
+| Priority | Load job gets `priority=10` to run first once unblocked |
+| Dead letter inspection | `monitor.py` checks `queue.dead_letters()` |
diff --git a/docs-next/content/docs/more/examples/fastapi-service.mdx b/docs-next/content/docs/more/examples/fastapi-service.mdx
new file mode 100644
index 0000000..776e684
--- /dev/null
+++ b/docs-next/content/docs/more/examples/fastapi-service.mdx
@@ -0,0 +1,190 @@
+---
+title: "FastAPI Image Processing Service"
+description: "REST API with progress tracking, SSE streaming, async result fetching, and cancellation."
+---
+
+import { Tab, Tabs } from "fumadocs-ui/components/tabs";
+
+A REST API for image processing that demonstrates FastAPI integration,
+progress tracking, async result fetching, job cancellation, and SSE
+progress streaming.
+
+## Project structure
+
+```
+image-service/
+ app.py # FastAPI app + task definitions
+ client.py # Example client script
+```
+
+## app.py
+
+```python
+"""FastAPI image processing service with taskito."""
+
+import time
+
+from fastapi import FastAPI, HTTPException, Query
+from taskito import Queue, current_job
+from taskito.contrib.fastapi import TaskitoRouter
+
+queue = Queue(db_path=".taskito/images.db", workers=4, result_ttl=3600)
+
+# ── Tasks ────────────────────────────────────────────────
+
+@queue.task(timeout=300)
+def resize_image(image_url: str, sizes: list[int]) -> dict:
+ """Resize an image to multiple sizes with progress updates."""
+ results = {}
+ for i, size in enumerate(sizes):
+ # Simulate resize work
+ time.sleep(1)
+ results[f"{size}x{size}"] = f"{image_url}?w={size}&h={size}"
+ current_job.update_progress(int((i + 1) / len(sizes) * 100))
+ return results
+
+@queue.task(max_retries=3, retry_backoff=2.0)
+def generate_thumbnail(image_url: str) -> str:
+ """Generate a thumbnail — retries on failure."""
+ time.sleep(0.5)
+ return f"{image_url}?thumb=true"
+
+@queue.task(timeout=600)
+def apply_filters(image_url: str, filters: list[str]) -> dict:
+ """Apply a sequence of filters with progress."""
+ results = {}
+ for i, f in enumerate(filters):
+ time.sleep(2)
+ results[f] = f"{image_url}?filter={f}"
+ current_job.update_progress(int((i + 1) / len(filters) * 100))
+ return results
+
+# ── FastAPI App ──────────────────────────────────────────
+
+app = FastAPI(title="Image Processing Service")
+
+# Mount the taskito router — adds /tasks/stats, /tasks/jobs/{id}, etc.
+app.include_router(TaskitoRouter(queue), prefix="/tasks")
+
+@app.post("/process")
+async def submit_job(image_url: str, sizes: list[int] | None = Query(default=None)):
+ """Submit an image processing job and return the job ID."""
+ if sizes is None:
+ sizes = [128, 256, 512, 1024]
+ job = resize_image.delay(image_url, sizes)
+ return {"job_id": job.id, "status_url": f"/tasks/jobs/{job.id}"}
+
+@app.post("/process/{job_id}/cancel")
+async def cancel_job(job_id: str):
+ """Cancel a pending image processing job."""
+ cancelled = await queue.acancel_job(job_id)
+ if not cancelled:
+ raise HTTPException(400, "Job is not in a cancellable state")
+ return {"cancelled": True, "job_id": job_id}
+
+@app.get("/process/{job_id}/result")
+async def get_result(job_id: str, timeout: float = 0):
+ """Get the result, optionally blocking until complete."""
+ job = queue.get_job(job_id)
+ if job is None:
+ raise HTTPException(404, "Job not found")
+ if timeout > 0:
+ try:
+ result = await job.aresult(timeout=timeout)
+ return {"status": "complete", "result": result}
+ except TimeoutError:
+ return {"status": job.status, "result": None}
+ return {"status": job.status, "progress": job.progress}
+```
+
+## client.py
+
+```python
+"""Example client for the image processing service."""
+
+import json
+
+import httpx
+
+BASE = "http://localhost:8000"
+
+# 1. Submit a job
+resp = httpx.post(f"{BASE}/process", params={
+ "image_url": "https://example.com/photo.jpg",
+ "sizes": [128, 256, 512, 1024],
+})
+data = resp.json()
+job_id = data["job_id"]
+print(f"Submitted job: {job_id}")
+
+# 2. Stream progress via SSE
+print("\nStreaming progress:")
+with httpx.stream("GET", f"{BASE}/tasks/jobs/{job_id}/progress") as r:
+ for line in r.iter_lines():
+ if line.startswith("data:"):
+ payload = json.loads(line[5:].strip())
+ print(f" Progress: {payload['progress']}% — Status: {payload['status']}")
+ if payload["status"] in ("complete", "failed", "dead", "cancelled"):
+ break
+
+# 3. Fetch the final result
+result = httpx.get(f"{BASE}/tasks/jobs/{job_id}/result", params={"timeout": 5})
+print(f"\nResult: {result.json()}")
+```
+
+## Running it
+
+<Tabs items={["Worker", "API server", "Client"]}>
+
+<Tab value="Worker">
+
+```bash
+taskito worker --app app:queue
+```
+
+</Tab>
+
+<Tab value="API server">
+
+```bash
+uvicorn app:app --reload
+```
+
+</Tab>
+
+<Tab value="Client">
+
+```bash
+python client.py
+```
+
+</Tab>
+
+</Tabs>
+
+## SSE from the browser
+
+```javascript
+const jobId = "01H5K6X...";
+const source = new EventSource(`/tasks/jobs/${jobId}/progress`);
+
+const progressBar = document.getElementById("progress");
+
+source.onmessage = (event) => {
+ const data = JSON.parse(event.data);
+ progressBar.style.width = `${data.progress}%`;
+ progressBar.textContent = `${data.progress}%`;
+
+ if (["complete", "failed", "dead", "cancelled"].includes(data.status)) {
+ source.close();
+ }
+};
+```
+
+## Key patterns demonstrated
+
+| Pattern | Where |
+|---|---|
+| FastAPI integration | `TaskitoRouter(queue)` mounted at `/tasks` |
+| Progress tracking | `current_job.update_progress()` in `resize_image` and `apply_filters` |
+| Async result fetch | `await job.aresult(timeout=...)` in `get_result` endpoint |
+| Async cancellation | `await queue.acancel_job()` in `cancel_job` endpoint |
+| SSE streaming | `/tasks/jobs/{id}/progress` endpoint from `TaskitoRouter` |
+| Retry with backoff | `generate_thumbnail` — 3 retries, 2x backoff |
+| Result TTL | `result_ttl=3600` — auto-cleanup after 1 hour |
diff --git a/docs-next/content/docs/more/examples/index.mdx b/docs-next/content/docs/more/examples/index.mdx
index e7da88f..1b76381 100644
--- a/docs-next/content/docs/more/examples/index.mdx
+++ b/docs-next/content/docs/more/examples/index.mdx
@@ -1,10 +1,15 @@
---
title: Examples
-description: "End-to-end examples for common patterns."
+description: "End-to-end examples demonstrating common taskito patterns."
---
-import { Callout } from 'fumadocs-ui/components/callout';
+End-to-end examples demonstrating common taskito patterns.
-
- Content port pending. See the [Zensical source](https://github.com/ByteVeda/taskito/tree/master/docs) for current text.
-
+| Example | Description |
+|---------|-------------|
+| [FastAPI Service](/docs/more/examples/fastapi-service) | REST API that enqueues tasks and streams progress via SSE |
+| [Notification Service](/docs/more/examples/notifications) | Email/SMS/push notifications with delayed scheduling, deduplication, priorities, and cancellation |
+| [Web Scraper Pipeline](/docs/more/examples/web-scraper) | Multi-stage scraping with rate limiting, retries, chord workflows, and hooks |
+| [Data Pipeline](/docs/more/examples/data-pipeline) | ETL pipeline with task dependencies, named queues, and progress tracking |
+| [DAG Workflows](/docs/more/examples/workflows) | Fan-out, conditions, gates, sub-workflows, incremental runs |
+| [Benchmark](/docs/more/examples/benchmark) | Performance benchmarks comparing taskito to alternatives |
diff --git a/docs-next/content/docs/more/examples/meta.json b/docs-next/content/docs/more/examples/meta.json
index d7e7b32..3a29f22 100644
--- a/docs-next/content/docs/more/examples/meta.json
+++ b/docs-next/content/docs/more/examples/meta.json
@@ -1,4 +1,12 @@
{
"title": "Examples",
- "pages": ["index"]
+ "pages": [
+ "index",
+ "fastapi-service",
+ "notifications",
+ "web-scraper",
+ "data-pipeline",
+ "workflows",
+ "benchmark"
+ ]
}
diff --git a/docs-next/content/docs/more/examples/notifications.mdx b/docs-next/content/docs/more/examples/notifications.mdx
new file mode 100644
index 0000000..aaed82b
--- /dev/null
+++ b/docs-next/content/docs/more/examples/notifications.mdx
@@ -0,0 +1,226 @@
+---
+title: Notification Service
+description: "Delayed scheduling, unique tasks, priority queues, and job cancellation across email/SMS/push channels."
+---
+
+import { Tab, Tabs } from "fumadocs-ui/components/tabs";
+
+A notification service demonstrating delayed scheduling, unique tasks,
+priority queues, and job cancellation.
+
+## Project structure
+
+```
+notifications/
+ tasks.py # Notification task definitions
+ service.py # Enqueue notifications with scheduling
+```
+
+## tasks.py
+
+```python
+"""Notification tasks with priority and deduplication."""
+
+import os
+
+import httpx
+
+from taskito import Queue
+
+queue = Queue(
+ db_path=".taskito/notifications.db",
+ workers=4,
+ default_retry=3,
+ default_timeout=30,
+ result_ttl=7200, # auto-cleanup after 2 hours
+)
+
+# ── Notification Tasks ───────────────────────────────────
+
+@queue.task(priority=10, max_retries=3, retry_backoff=2.0)
+def send_urgent_email(to: str, subject: str, body: str) -> dict:
+ """High-priority email — runs before bulk notifications."""
+ response = httpx.post(
+ "https://api.mailgun.net/v3/YOUR_DOMAIN/messages",
+ auth=("api", os.environ["MAILGUN_API_KEY"]),
+ data={"from": "noreply@example.com", "to": to, "subject": subject, "text": body},
+ )
+ response.raise_for_status()
+ return {"to": to, "subject": subject, "sent": True}
+
+@queue.task(priority=0, max_retries=3, retry_backoff=2.0)
+def send_bulk_email(to: str, subject: str, body: str) -> dict:
+ """Low-priority bulk email."""
+ response = httpx.post(
+ "https://api.mailgun.net/v3/YOUR_DOMAIN/messages",
+ auth=("api", os.environ["MAILGUN_API_KEY"]),
+ data={"from": "noreply@example.com", "to": to, "subject": subject, "text": body},
+ )
+ response.raise_for_status()
+ return {"to": to, "subject": subject, "sent": True}
+
+@queue.task(priority=5, max_retries=5, retry_backoff=2.0)
+def send_push(user_id: str, title: str, message: str) -> dict:
+ """Push notification with retries."""
+ response = httpx.post(
+ "https://fcm.googleapis.com/fcm/send",
+ headers={"Authorization": f"key={os.environ['FCM_SERVER_KEY']}"},
+ json={"to": f"/topics/user-{user_id}", "notification": {"title": title, "body": message}},
+ )
+ response.raise_for_status()
+ return {"user_id": user_id, "title": title, "sent": True}
+
+@queue.task(max_retries=3, retry_backoff=2.0)
+def send_sms(phone: str, message: str) -> dict:
+ """SMS notification via Twilio."""
+ response = httpx.post(
+ f"https://api.twilio.com/2010-04-01/Accounts/{os.environ['TWILIO_ACCOUNT_SID']}/Messages.json",
+ auth=(os.environ["TWILIO_ACCOUNT_SID"], os.environ["TWILIO_AUTH_TOKEN"]),
+ data={"From": os.environ["TWILIO_FROM_NUMBER"], "To": phone, "Body": message},
+ )
+ response.raise_for_status()
+ return {"phone": phone, "sent": True}
+
+# ── Periodic Digest ──────────────────────────────────────
+
+@queue.periodic(cron="0 9 * * * *")
+def daily_digest():
+ """Send daily digest email every day at 9 AM."""
+ print("[DIGEST] Sending daily digest to all subscribers...")
+```
+
+## service.py
+
+```python
+"""Notification service — enqueue with scheduling, deduplication, and cancellation."""
+
+from tasks import (
+ queue,
+ send_urgent_email,
+ send_bulk_email,
+ send_push,
+ send_sms,
+)
+
+# ── 1. Delayed Scheduling ───────────────────────────────
+# Schedule a reminder 30 minutes from now
+
+print("1. Delayed scheduling")
+reminder = send_push.apply_async(
+ args=["user_123", "Reminder", "Your meeting starts in 5 minutes"],
+ delay=1800, # 30 minutes from now
+)
+print(f" Scheduled reminder: {reminder.id} (runs in 30 min)")
+
+# Schedule a follow-up email for 1 hour later
+followup = send_bulk_email.apply_async(
+ args=["user@example.com", "How was your meeting?", "We'd love your feedback."],
+ delay=3600, # 1 hour from now
+)
+print(f" Scheduled follow-up: {followup.id} (runs in 1 hour)")
+
+# ── 2. Unique Tasks (Deduplication) ─────────────────────
+# Prevent duplicate notifications for the same event
+
+print("\n2. Unique tasks")
+job1 = send_push.apply_async(
+ args=["user_456", "New message", "You have a new message"],
+ unique_key="notify:user_456:new_message",
+)
+job2 = send_push.apply_async(
+ args=["user_456", "New message", "You have a new message"],
+ unique_key="notify:user_456:new_message",
+)
+print(f" First enqueue: {job1.id}")
+print(f" Second enqueue: {job2.id}")
+print(f" Same job? {job1.id == job2.id}") # True — deduplicated
+
+# ── 3. Priority Queues ──────────────────────────────────
+# Urgent notifications run before bulk
+
+print("\n3. Priority queues")
+# Enqueue bulk emails first
+bulk_jobs = send_bulk_email.map([
+ ("alice@example.com", "Newsletter", "This week's updates..."),
+ ("bob@example.com", "Newsletter", "This week's updates..."),
+ ("carol@example.com", "Newsletter", "This week's updates..."),
+])
+print(f" Enqueued {len(bulk_jobs)} bulk emails (priority=0)")
+
+# Enqueue urgent email after — but it runs first due to priority=10
+urgent = send_urgent_email.delay(
+ "admin@example.com",
+ "Server Alert",
+ "CPU usage exceeded 90%",
+)
+print(f" Enqueued urgent email (priority=10) — runs before bulk")
+
+# ── 4. Job Cancellation ─────────────────────────────────
+# Cancel a scheduled notification before it sends
+
+print("\n4. Job cancellation")
+scheduled = send_sms.apply_async(
+ args=["+1234567890", "Your order ships tomorrow"],
+ delay=7200, # 2 hours from now
+)
+print(f" Scheduled SMS: {scheduled.id}")
+
+# User updated their preference — cancel the SMS
+cancelled = queue.cancel_job(scheduled.id)
+print(f" Cancelled: {cancelled}") # True
+
+job = queue.get_job(scheduled.id)
+print(f" Status: {job.status}") # "cancelled"
+
+# ── 5. Inspect Pending Notifications ─────────────────────
+
+print("\n5. Pending notifications")
+pending = queue.list_jobs(status="pending", limit=10)
+for j in pending:
+ d = j.to_dict()
+ print(f" {d['id'][:12]}... | {d['task_name']} | priority={d['priority']}")
+
+stats = queue.stats()
+print(f"\nQueue stats: {stats}")
+```
+
+## Running it
+
+<Tabs items={["Worker", "Service", "Monitor"]}>
+
+<Tab value="Worker">
+
+```bash
+taskito worker --app tasks:queue
+```
+
+</Tab>
+
+<Tab value="Service">
+
+```bash
+python service.py
+```
+
+</Tab>
+
+<Tab value="Monitor">
+
+```bash
+taskito info --app tasks:queue --watch
+```
+
+</Tab>
+
+</Tabs>
+
+## Key patterns demonstrated
+
+| Pattern | Where |
+|---|---|
+| Delayed scheduling | `delay=1800` — reminder runs 30 min later |
+| Unique tasks | `unique_key="notify:user_456:..."` — deduplicates |
+| Priority queues | `priority=10` urgent runs before `priority=0` bulk |
+| Job cancellation | `queue.cancel_job()` revokes a scheduled SMS |
+| Batch enqueue | `send_bulk_email.map()` for newsletter |
+| Periodic tasks | `daily_digest` runs every day at 9 AM |
+| Result TTL | `result_ttl=7200` — auto-cleanup after 2 hours |
+| Retry with backoff | `send_push` — 5 retries, 2x backoff |
+| Job inspection | `queue.list_jobs()` to view pending notifications |
diff --git a/docs-next/content/docs/more/examples/web-scraper.mdx b/docs-next/content/docs/more/examples/web-scraper.mdx
new file mode 100644
index 0000000..67ffaba
--- /dev/null
+++ b/docs-next/content/docs/more/examples/web-scraper.mdx
@@ -0,0 +1,212 @@
+---
+title: Web Scraper Pipeline
+description: "Multi-stage scraper with rate limiting, retries, chord workflow, hooks, periodic cleanup, named queues."
+---
+
+import { Tab, Tabs } from "fumadocs-ui/components/tabs";
+
+A complete multi-stage web scraper demonstrating rate limiting, retries,
+workflows, progress tracking, hooks, periodic cleanup, and named queues.
+
+## Project structure
+
+```
+scraper/
+ tasks.py # Task definitions
+ worker.py # Worker entry point
+ run.py # Enqueue scraping jobs
+```
+
+## tasks.py
+
+```python
+import json
+import time
+from taskito import Queue
+
+queue = Queue(
+ db_path=".taskito/scraper.db",
+ workers=4,
+ default_retry=3,
+ default_timeout=60,
+ result_ttl=3600, # Auto-cleanup results after 1 hour
+)
+
+# ── Hooks ────────────────────────────────────────────────
+
+@queue.before_task
+def log_start(task_name, args, kwargs):
+ print(f"[START] {task_name}")
+
+@queue.on_success
+def log_success(task_name, args, kwargs, result):
+ print(f"[DONE] {task_name}")
+
+@queue.on_failure
+def log_failure(task_name, args, kwargs, error):
+ print(f"[FAIL] {task_name}: {error}")
+
+# ── Tasks ────────────────────────────────────────────────
+
+@queue.task(
+ rate_limit="30/m", # Max 30 requests per minute
+ max_retries=5,
+ retry_backoff=2.0,
+ queue="scraping",
+)
+def fetch_page(url):
+ """Fetch a single URL. Rate-limited and retried on failure."""
+ import urllib.request
+ with urllib.request.urlopen(url, timeout=10) as resp:
+ return resp.read().decode("utf-8")
+
+@queue.task(queue="processing")
+def extract_links(html):
+ """Extract all links from an HTML page."""
+ import re
+ return re.findall(r'href="(https?://[^"]+)"', html)
+
+@queue.task(queue="processing")
+def extract_title(html):
+ """Extract the page title."""
+ import re
+ match = re.search(r"<title>(.*?)</title>", html, re.IGNORECASE | re.DOTALL)
+ return match.group(1).strip() if match else "No title"
+
+@queue.task(queue="storage")
+def store_results(results, url=""):
+ """Store scraped data to a JSON file."""
+ data = {"url": url, "results": results, "scraped_at": time.time()}
+ filename = f"output_{int(time.time())}.json"
+ with open(filename, "w") as f:
+ json.dump(data, f, indent=2)
+ return filename
+
+@queue.task(queue="processing")
+def summarize(pages):
+ """Aggregate results from multiple pages."""
+ total_links = sum(len(p.get("links", [])) for p in pages)
+ titles = [p.get("title", "?") for p in pages]
+ return {
+ "pages_scraped": len(pages),
+ "total_links": total_links,
+ "titles": titles,
+ }
+
+@queue.task()
+def scrape_page(url):
+ """Full pipeline for a single page: fetch → extract links + title."""
+ html = fetch_page(url) # Direct call (not queued)
+ links = extract_links(html)
+ title = extract_title(html)
+ return {"url": url, "title": title, "links": links}
+
+# ── Periodic cleanup ────────────────────────────────────
+
+@queue.periodic(cron="0 0 * * * *")
+def hourly_cleanup():
+ """Purge completed jobs and dead letters every hour."""
+ completed = queue.purge_completed(older_than=3600)
+ dead = queue.purge_dead(older_than=86400)
+ print(f"Cleanup: purged {completed} completed, {dead} dead")
+```
+
+## run.py
+
+```python
+"""Enqueue scraping jobs."""
+from tasks import queue, scrape_page, summarize, store_results
+from taskito import group, chord
+
+urls = [
+ "https://example.com",
+ "https://httpbin.org/html",
+ "https://jsonplaceholder.typicode.com",
+]
+
+# ── Option 1: Simple parallel scraping ──────────────────
+
+print("Enqueuing scrape jobs...")
+jobs = [scrape_page.delay(url) for url in urls]
+
+# Wait for all results
+for job in jobs:
+ result = job.result(timeout=30)
+ print(f" {result['title']} — {len(result['links'])} links")
+
+# ── Option 2: Chord — scrape in parallel, then summarize ─
+
+print("\nRunning chord pipeline...")
+result = chord(
+ group(*[scrape_page.s(url) for url in urls]),
+ summarize.s(),
+).apply(queue)
+
+summary = result.result(timeout=60)
+print(f" Scraped {summary['pages_scraped']} pages")
+print(f" Found {summary['total_links']} total links")
+print(f" Titles: {summary['titles']}")
+
+# ── Option 3: Batch enqueue with .map() ─────────────────
+
+print("\nBatch enqueue with .map()...")
+jobs = scrape_page.map([(url,) for url in urls])
+results = [j.result(timeout=30) for j in jobs]
+print(f" Scraped {len(results)} pages in batch")
+
+# ── Check stats ─────────────────────────────────────────
+
+stats = queue.stats()
+print(f"\nQueue stats: {stats}")
+```
+
+## worker.py
+
+```python
+"""Start the worker."""
+from tasks import queue
+
+if __name__ == "__main__":
+ print("Starting scraper worker...")
+ queue.run_worker(queues=["scraping", "processing", "storage"])
+```
+
+## Running it
+
+<Tabs items={["Worker", "Run", "Monitor"]}>
+
+<Tab value="Worker">
+
+```bash
+python worker.py
+```
+
+</Tab>
+
+<Tab value="Run">
+
+```bash
+python run.py
+```
+
+</Tab>
+
+<Tab value="Monitor">
+
+```bash
+taskito info --app tasks:queue --watch
+```
+
+</Tab>
+
+</Tabs>
+
+## Key patterns demonstrated
+
+| Pattern | Where |
+|---|---|
+| Rate limiting | `fetch_page` — 30 requests/min |
+| Retry with backoff | `fetch_page` — 5 retries, 2.0x backoff |
+| Named queues | `scraping`, `processing`, `storage` |
+| Hooks | `log_start`, `log_success`, `log_failure` |
+| Workflows (chord) | Parallel scrape → summarize |
+| Batch enqueue | `.map()` for bulk job creation |
+| Periodic tasks | `hourly_cleanup` runs every hour |
+| Result TTL | Auto-cleanup completed jobs after 1 hour |
+| Direct call | `scrape_page` calls `fetch_page()` directly |
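+
+To make the last row concrete: a function decorated with `@queue.task()`
+can be called like a plain function (it runs inline in the current
+process, as `scrape_page` does with `fetch_page`) or enqueued for a
+worker. A short sketch using the tasks above:
+
+```python
+from tasks import fetch_page
+
+# Direct call: runs synchronously here, no job is created.
+html = fetch_page("https://example.com")
+
+# Queued call: returns a job handle immediately; a worker executes it.
+job = fetch_page.delay("https://example.com")
+html = job.result(timeout=30)
+```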
diff --git a/docs-next/content/docs/more/examples/workflows.mdx b/docs-next/content/docs/more/examples/workflows.mdx
new file mode 100644
index 0000000..ba01cda
--- /dev/null
+++ b/docs-next/content/docs/more/examples/workflows.mdx
@@ -0,0 +1,268 @@
+---
+title: DAG Workflow Examples
+description: "ML training, map-reduce, resilient pipelines, sub-workflows, incremental runs, pre-execution analysis."
+---
+
+Real-world workflow patterns demonstrating fan-out, conditions, approval
+gates, sub-workflows, and incremental runs.
+
+## ML training pipeline
+
+A training pipeline that evaluates a model, gates deployment on accuracy,
+and has a rollback path.
+
+```python
+from taskito import Queue
+from taskito.workflows import Workflow, WorkflowContext
+
+queue = Queue(db_path="ml.db", workers=4)
+
+
+@queue.task()
+def fetch_dataset() -> dict:
+ return {"rows": 50_000, "path": "/data/train.parquet"}
+
+
+@queue.task()
+def train_model(dataset: dict) -> dict:
+ # ... training logic ...
+ return {"model_id": "v3.2", "accuracy": 0.97, "loss": 0.08}
+
+
+@queue.task()
+def evaluate(model: dict) -> dict:
+ return {"accuracy": model["accuracy"], "passed": model["accuracy"] > 0.90}
+
+
+@queue.task()
+def deploy(model_id: str) -> str:
+ return f"deployed {model_id}"
+
+
+@queue.task()
+def notify_failure() -> str:
+ return "sent alert: model below threshold"
+
+
+def accuracy_gate(ctx: WorkflowContext) -> bool:
+ return ctx.results.get("evaluate", {}).get("passed", False)
+
+
+wf = Workflow(name="ml_pipeline")
+wf.step("fetch", fetch_dataset)
+wf.step("train", train_model, after="fetch")
+wf.step("evaluate", evaluate, after="train")
+wf.gate("review", after="evaluate", timeout=3600, on_timeout="reject")
+wf.step("deploy", deploy, after="review")
+wf.step("alert", notify_failure, after="review", condition="on_failure")
+```
+
+```mermaid
+graph TD
+  fetch --> train
+  train --> evaluate
+  evaluate --> review["review\n(approval gate)"]
+  review -->|approved| deploy
+  review -->|rejected| alert
+```
+
+Usage:
+
+```python
+run = queue.submit_workflow(wf)
+
+# After human review:
+queue.approve_gate(run.id, "review")
+
+result = run.wait(timeout=120)
+print(run.visualize("mermaid"))
+```
+
+---
+
+## Map-reduce with fan-out
+
+Process a batch of items in parallel, then aggregate results.
+
+```python
+@queue.task()
+def fetch_urls() -> list[str]:
+ return [
+ "https://api.example.com/page/1",
+ "https://api.example.com/page/2",
+ "https://api.example.com/page/3",
+ ]
+
+
+@queue.task()
+def scrape(url: str) -> dict:
+ import httpx
+ resp = httpx.get(url)
+ return {"url": url, "status": resp.status_code, "size": len(resp.content)}
+
+
+@queue.task()
+def summarize(results: list[dict]) -> dict:
+ total = sum(r["size"] for r in results)
+ return {"pages": len(results), "total_bytes": total}
+
+
+wf = Workflow(name="scrape_pipeline")
+wf.step("fetch", fetch_urls)
+wf.step("scrape", scrape, after="fetch", fan_out="each")
+wf.step("summarize", summarize, after="scrape", fan_in="all")
+
+run = queue.submit_workflow(wf)
+result = run.wait(timeout=60)
+# summarize receives [{"url": ..., "size": ...}, ...]
+```
+
+---
+
+## Resilient pipeline with continue mode
+
+Independent branches keep running even when one fails.
+
+```python
+@queue.task(max_retries=0)
+def ingest_orders() -> str:
+ return "orders ingested"
+
+
+@queue.task(max_retries=0)
+def ingest_inventory() -> str:
+ raise RuntimeError("inventory source down")
+
+
+@queue.task()
+def build_report() -> str:
+ return "report built"
+
+
+@queue.task()
+def send_alert() -> str:
+ return "alert sent to #data-eng"
+
+
+wf = Workflow(name="daily_ingest", on_failure="continue")
+wf.step("orders", ingest_orders)
+wf.step("inventory", ingest_inventory)
+wf.step("report", build_report, after="orders")
+wf.step("alert", send_alert, after="inventory", condition="on_failure")
+```
+
+ report["report ✓"]
+ inventory["inventory ✗"] --> alert["alert ✓\\n(on_failure)"]`}
+/>
+
+`inventory` fails, but `orders → report` runs to completion. `alert` fires
+because its predecessor failed.
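+
+A usage sketch for this pipeline, assuming the same `submit_workflow` /
+`wait` / `status()` API used in the other examples on this page:
+
+```python
+run = queue.submit_workflow(wf)
+run.wait(timeout=60)
+
+# With on_failure="continue", the orders branch finishes even though
+# inventory failed; per-step outcomes are visible on the run status.
+for name, node in run.status().nodes.items():
+    print(f"{name}: {node.status}")
+```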
+
+---
+
+## Multi-region ETL with sub-workflows
+
+Compose reusable pipelines as sub-workflow steps.
+
+```python
+@queue.task()
+def extract(region: str) -> list:
+ return [{"region": region, "id": i} for i in range(100)]
+
+
+@queue.task()
+def load(data: list) -> int:
+ return len(data)
+
+
+@queue.task()
+def reconcile() -> str:
+ return "all regions reconciled"
+
+
+@queue.workflow("region_etl")
+def region_etl(region: str) -> Workflow:
+ wf = Workflow()
+ wf.step("extract", extract, args=(region,))
+ wf.step("load", load, after="extract")
+ return wf
+
+
+@queue.workflow("global_etl")
+def global_etl() -> Workflow:
+ wf = Workflow()
+ wf.step("eu", region_etl.as_step(region="eu"))
+ wf.step("us", region_etl.as_step(region="us"))
+ wf.step("ap", region_etl.as_step(region="ap"))
+ wf.step("reconcile", reconcile, after=["eu", "us", "ap"])
+ return wf
+
+
+run = global_etl.submit()
+run.wait(timeout=120)
+```
+
+EU, US, and AP ETL pipelines run concurrently as child workflows.
+`reconcile` runs after all three complete.
+
+---
+
+## Incremental re-runs
+
+Skip unchanged steps on the second run.
+
+```python
+wf = Workflow(name="nightly", cache_ttl=86400) # 24h TTL
+wf.step("extract", extract)
+wf.step("transform", transform, after="extract")
+wf.step("load", load, after="transform")
+
+# First run: everything executes
+run1 = queue.submit_workflow(wf)
+run1.wait()
+
+# Next day: skip completed steps
+run2 = queue.submit_workflow(wf, incremental=True, base_run=run1.id)
+run2.wait()
+
+for name, node in run2.status().nodes.items():
+ print(f"{name}: {node.status}")
+# extract: cache_hit
+# transform: cache_hit
+# load: cache_hit
+```
+
+---
+
+## Pre-execution analysis
+
+Analyze a workflow before submitting it.
+
+```python
+wf = Workflow(name="complex")
+wf.step("a", task_a)
+wf.step("b", task_b, after="a")
+wf.step("c", task_c, after="a")
+wf.step("d", task_d, after=["b", "c"])
+
+# Structure
+print(wf.topological_levels())
+# [["a"], ["b", "c"], ["d"]]
+
+print(wf.stats())
+# {"nodes": 4, "edges": 4, "depth": 3, "width": 2, "density": 0.67}
+
+# Critical path with estimated durations
+path, cost = wf.critical_path({"a": 2, "b": 10, "c": 3, "d": 1})
+print(f"Critical path: {path}, cost: {cost}s")
+# Critical path: ["a", "b", "d"], cost: 13s
+
+# Bottleneck
+analysis = wf.bottleneck_analysis({"a": 2, "b": 10, "c": 3, "d": 1})
+print(analysis["suggestion"])
+# "b is the bottleneck (76.9% of total time). Consider optimizing."
+
+# Visualization
+print(wf.visualize("mermaid"))
+```