diff --git a/docs-next/content/docs/more/examples/benchmark.mdx b/docs-next/content/docs/more/examples/benchmark.mdx new file mode 100644 index 0000000..f3401cb --- /dev/null +++ b/docs-next/content/docs/more/examples/benchmark.mdx @@ -0,0 +1,230 @@ +--- +title: Benchmark +description: "Throughput benchmark — enqueue / processing / latency, comparison with Celery and Dramatiq." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +Measure taskito's throughput by enqueuing and processing a large batch of +tasks. + +## benchmark.py + +```python +"""taskito throughput benchmark. + +Measures: +1. Enqueue throughput (jobs/sec) using batch insert +2. Processing throughput (jobs/sec) with N workers +3. End-to-end latency +""" + +import os +import threading +import time + +from taskito import Queue + +# ── Configuration ──────────────────────────────────────── + +NUM_JOBS = 10_000 +NUM_WORKERS = os.cpu_count() or 4 +DB_PATH = ":memory:" # In-memory for pure speed test + +queue = Queue(db_path=DB_PATH, workers=NUM_WORKERS) + +@queue.task() +def noop(x): + """Minimal task — measures framework overhead.""" + return x + +@queue.task() +def cpu_light(x): + """Light CPU work — string formatting.""" + return f"processed-{x}-{'x' * 100}" + +# ── Benchmark Functions ────────────────────────────────── + +def bench_enqueue(task, n): + """Measure batch enqueue throughput.""" + args_list = [(i,) for i in range(n)] + + start = time.perf_counter() + jobs = task.map(args_list) + elapsed = time.perf_counter() - start + + rate = n / elapsed + print(f" Enqueued {n:,} jobs in {elapsed:.2f}s ({rate:,.0f} jobs/s)") + return jobs + +def bench_process(jobs, timeout=120): + """Measure processing throughput by waiting for all jobs.""" + n = len(jobs) + start = time.perf_counter() + + last = jobs[-1] + try: + last.result(timeout=timeout, poll_interval=0.01, max_poll_interval=0.1) + except TimeoutError: + stats = queue.stats() + print(f" Timed out! 
Stats: {stats}") + return + + elapsed = time.perf_counter() - start + rate = n / elapsed + print(f" Processed {n:,} jobs in {elapsed:.2f}s ({rate:,.0f} jobs/s)") + +def bench_latency(task, samples=100): + """Measure single-job round-trip latency.""" + latencies = [] + for i in range(samples): + start = time.perf_counter() + job = task.delay(i) + job.result(timeout=10) + latencies.append(time.perf_counter() - start) + + avg = sum(latencies) / len(latencies) + p50 = sorted(latencies)[len(latencies) // 2] + p99 = sorted(latencies)[int(len(latencies) * 0.99)] + print(f" Latency (n={samples}): avg={avg*1000:.1f}ms p50={p50*1000:.1f}ms p99={p99*1000:.1f}ms") + +# ── Main ───────────────────────────────────────────────── + +def main(): + print(f"taskito benchmark") + print(f" Workers: {NUM_WORKERS}") + print(f" Jobs: {NUM_JOBS:,}") + print(f" DB: {DB_PATH}") + print() + + # Start worker in background + worker_thread = threading.Thread(target=queue.run_worker, daemon=True) + worker_thread.start() + time.sleep(0.5) + + print("── noop task (framework overhead) ──") + jobs = bench_enqueue(noop, NUM_JOBS) + bench_process(jobs) + print() + + print("── cpu_light task ──") + jobs = bench_enqueue(cpu_light, NUM_JOBS) + bench_process(jobs) + print() + + print("── single-job latency ──") + bench_latency(noop) + print() + + stats = queue.stats() + print(f"Final stats: {stats}") + +if __name__ == "__main__": + main() +``` + +## Running + +```bash +python benchmark.py +``` + +## Sample output + +``` +taskito benchmark + Workers: 8 + Jobs: 10,000 + DB: :memory: + +── noop task (framework overhead) ── + Enqueued 10,000 jobs in 0.18s (55,556 jobs/s) + Processed 10,000 jobs in 2.41s (4,149 jobs/s) + +── cpu_light task ── + Enqueued 10,000 jobs in 0.19s (52,632 jobs/s) + Processed 10,000 jobs in 2.53s (3,953 jobs/s) + +── single-job latency ── + Latency (n=100): avg=1.2ms p50=1.1ms p99=3.4ms + +Final stats: {'pending': 0, 'running': 0, 'completed': 20100, 'failed': 0, 'dead': 0, 'cancelled': 0} +``` + + + Actual numbers depend on your hardware, Python version, and SQLite + configuration. The numbers above are from an 8-core machine with Python + 3.12. + + +## What makes taskito fast + +| Component | How it helps | +|---|---| +| **Batch inserts** | `task.map()` inserts all jobs in a single SQLite transaction | +| **WAL mode** | Concurrent reads while writing — workers don't block enqueue | +| **Rust scheduler** | 50ms poll loop runs in native code, not Python | +| **OS threads** | Workers are Rust `std::thread`, not Python threads | +| **GIL per task** | GIL acquired only during Python task execution, released between tasks | +| **tokio mpsc channels** | Bounded async dispatch to workers | +| **r2d2 pool** | Up to 8 concurrent SQLite connections | +| **Diesel ORM** | Compiled SQL queries, no runtime query building | + +## How it compares + +Rough directional comparison on the same hardware (8-core, single +machine). These are not scientific benchmarks — run the script above on +your own hardware for accurate numbers. 
+ +| Metric | taskito (SQLite) | taskito (Postgres) | Celery + Redis | Dramatiq + Redis | +|--------|-----------------|-------------------|---------------|-----------------| +| Enqueue throughput | ~55,000/s | ~20,000/s | ~5,000/s | ~3,000/s | +| Processing (noop, 8 workers) | ~4,000/s | ~3,500/s | ~2,000/s | ~1,500/s | +| p50 latency | 1.1ms | 2.5ms | 5–10ms | 8–15ms | +| p99 latency | 3.4ms | 8ms | 20–50ms | 30–80ms | +| Memory (idle worker) | ~30 MB | ~35 MB | ~80 MB | ~60 MB | +| Setup | `pip install taskito` | + Postgres | + Redis + Celery | + Redis + Dramatiq | +| External services | 0 | 1 (Postgres) | 2 (Redis + result backend) | 1 (Redis) | + + + Celery numbers are from public benchmarks and community reports. Your + mileage will vary depending on workload, serializer, and broker + configuration. Run your own benchmarks before making decisions. + + +**Why is taskito faster?** + +- Rust scheduler avoids GIL contention — scheduling and dispatch never block Python +- SQLite WAL mode with batch inserts — disk I/O is minimized +- Direct DB polling — no broker hop (enqueue → DB → dequeue is one less network round-trip vs enqueue → Redis → dequeue) +- OS thread pool with per-task GIL acquisition — no multiprocessing overhead for I/O-bound tasks + +## Tune for your workload + +| Symptom | Config to change | Why | +|---------|-----------------|-----| +| Low throughput (I/O tasks) | Increase `workers` | More threads = more concurrent I/O | +| Low throughput (CPU tasks) | Use `pool="prefork"` | Each process gets its own GIL | +| High latency | Decrease `scheduler_poll_interval_ms` | Scheduler checks for ready jobs more often | +| Database too busy | Increase `scheduler_poll_interval_ms` | Less frequent polling reduces DB load | +| Memory growing | Set `result_ttl` | Auto-cleanup old results and metrics | +| Jobs timing out | Increase `default_timeout` | Give tasks more time to complete | +| Jobs piling up | Add more workers or use Postgres | SQLite single-writer limit may bottleneck | + +## Tuning + +Adjust these for your workload: + +```python +# More workers for I/O-bound tasks +queue = Queue(workers=16) + +# Fewer workers for CPU-bound tasks (limited by GIL) +queue = Queue(workers=4) + +# In-memory DB for maximum throughput (no persistence) +queue = Queue(db_path=":memory:") + +# File DB for durability (slightly slower) +queue = Queue(db_path="tasks.db") +``` diff --git a/docs-next/content/docs/more/examples/data-pipeline.mdx b/docs-next/content/docs/more/examples/data-pipeline.mdx new file mode 100644 index 0000000..8c97a87 --- /dev/null +++ b/docs-next/content/docs/more/examples/data-pipeline.mdx @@ -0,0 +1,278 @@ +--- +title: ETL Data Pipeline +description: "Multi-stage extract → transform → load with task dependencies, named queues, progress tracking, error inspection." +--- + +import { Tab, Tabs } from "fumadocs-ui/components/tabs"; + +A multi-stage extract → transform → load pipeline demonstrating task +dependencies, DAG workflows, progress tracking, error history inspection, +metadata, and named queues. 
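Every stage in this example is wired together with a single primitive: `apply_async` plus `depends_on`. A minimal sketch of that primitive, assuming the `extract_csv` and `normalize` tasks defined in `pipeline.py` below:

```python
# Minimal sketch: `normalize` runs only after `extract_csv` completes.
# Both tasks (and the placeholder [] args) come from pipeline.py below.
from pipeline import extract_csv, normalize

job_a = extract_csv.apply_async(args=["/data/export.csv"])
job_b = normalize.apply_async(args=[[], "schema_v2"], depends_on=job_a.id)
```

The rest of the page builds a full diamond-shaped DAG out of exactly this pattern.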
+ +## Project structure + +``` +data-pipeline/ + pipeline.py # Task definitions + DAG construction + worker.py # Worker entry point + monitor.py # Status monitoring script +``` + +## pipeline.py + +```python +"""ETL pipeline with task dependencies and named queues.""" + +import csv +import json + +import httpx + +from taskito import Queue, current_job + +queue = Queue( + db_path=".taskito/pipeline.db", + workers=6, + default_retry=3, + default_timeout=120, +) + +# ── Extract Tasks ──────────────────────────────────────── + +@queue.task(queue="extract", max_retries=5, retry_backoff=2.0) +def extract_api(endpoint: str) -> list[dict]: + """Pull records from an API endpoint with retries.""" + response = httpx.get(endpoint, timeout=30) + response.raise_for_status() + return response.json() + +@queue.task(queue="extract") +def extract_csv(file_path: str) -> list[dict]: + """Read records from a CSV file.""" + with open(file_path, newline="") as f: + return list(csv.DictReader(f)) + +# ── Transform Tasks ────────────────────────────────────── + +@queue.task(queue="transform") +def normalize(records: list[dict], schema: str) -> list[dict]: + """Normalize records against a schema with progress tracking.""" + results = [] + for i, record in enumerate(records): + results.append({**record, "schema": schema, "normalized": True}) + if (i + 1) % 50 == 0: + current_job.update_progress(int((i + 1) / len(records) * 100)) + current_job.update_progress(100) + return results + +@queue.task(queue="transform") +def deduplicate(records: list[dict]) -> list[dict]: + """Remove duplicate records by ID.""" + seen = set() + unique = [] + for r in records: + if r["id"] not in seen: + seen.add(r["id"]) + unique.append(r) + return unique + +# ── Load Tasks ─────────────────────────────────────────── + +@queue.task(queue="load") +def load_to_warehouse(records: list[dict], table: str) -> dict: + """Load records into the data warehouse (writes JSON to disk as stand-in).""" + dest = f"/tmp/{table.replace('.', '_')}.json" + with open(dest, "w") as f: + json.dump(records, f, indent=2) + return {"table": table, "rows_inserted": len(records), "dest": dest} + +# ── DAG Construction ───────────────────────────────────── + +def build_pipeline(api_endpoint: str, csv_path: str, target_table: str): + """Build a diamond-shaped ETL DAG. 
+ + extract_api ──→ normalize_a ──┐ + ├──→ load + extract_csv ──→ normalize_b ──┘ + """ + + # Stage 1: Extract (parallel) + job_api = extract_api.apply_async( + args=[api_endpoint], + metadata=json.dumps({"source": "api", "endpoint": api_endpoint}), + ) + job_csv = extract_csv.apply_async( + args=[csv_path], + metadata=json.dumps({"source": "csv", "file": csv_path}), + ) + + # Stage 2: Transform (each depends on its extract) + job_norm_a = normalize.apply_async( + args=[[], "schema_v2"], # actual data passed via result + depends_on=job_api.id, + metadata=json.dumps({"stage": "transform", "schema": "v2"}), + ) + job_norm_b = normalize.apply_async( + args=[[], "schema_v2"], + depends_on=job_csv.id, + metadata=json.dumps({"stage": "transform", "schema": "v2"}), + ) + + # Stage 3: Load (depends on both transforms) + job_load = load_to_warehouse.apply_async( + args=[[], target_table], + depends_on=[job_norm_a.id, job_norm_b.id], + priority=10, # high priority once unblocked + metadata=json.dumps({"stage": "load", "table": target_table}), + ) + + return { + "extract": [job_api, job_csv], + "transform": [job_norm_a, job_norm_b], + "load": job_load, + } + + +if __name__ == "__main__": + print("Building ETL pipeline...") + jobs = build_pipeline( + api_endpoint="https://api.example.com/records", + csv_path="/data/export.csv", + target_table="analytics.events", + ) + + print(f"\nDAG created:") + print(f" Extract: {[j.id for j in jobs['extract']]}") + print(f" Transform: {[j.id for j in jobs['transform']]}") + print(f" Load: {jobs['load'].id}") + + # Inspect dependency graph + load_job = queue.get_job(jobs["load"].id) + print(f"\nLoad depends on: {load_job.dependencies}") + + for ext_job in jobs["extract"]: + fetched = queue.get_job(ext_job.id) + print(f" {ext_job.id} dependents: {fetched.dependents}") +``` + +## worker.py + +```python +"""Start the pipeline worker.""" +from pipeline import queue + +if __name__ == "__main__": + print("Starting pipeline worker (queues: extract, transform, load)...") + queue.run_worker(queues=["extract", "transform", "load"]) +``` + +## monitor.py + +```python +"""Monitor pipeline status and inspect errors.""" + +import time +from pipeline import queue + +def monitor(load_job_id: str): + """Poll the pipeline until the load job completes.""" + while True: + stats = queue.stats() + print(f"Queue stats: {stats}") + + job = queue.get_job(load_job_id) + if job is None: + print("Load job not found!") + return + + print(f"Load job status: {job.status}, progress: {job.progress}%") + + if job.status == "complete": + print(f"\nPipeline complete! 
Result: {job.result(timeout=1)}") + return + + if job.status in ("failed", "dead", "cancelled"): + print(f"\nPipeline failed with status: {job.status}") + + # Inspect error history for all jobs + for dep_id in job.dependencies: + dep = queue.get_job(dep_id) + if dep and dep.status in ("dead", "failed"): + errors = dep.errors + print(f"\n Job {dep_id} errors:") + for err in errors: + print(f" Attempt {err['attempt']}: {err['error']}") + print(f" At: {err['failed_at']}") + + # Check dead letter queue + dead = queue.dead_letters(limit=10) + if dead: + print(f"\nDead letters ({len(dead)}):") + for d in dead: + print(f" {d['id']}: {d['task_name']} — {d['error']}") + return + + time.sleep(2) + +if __name__ == "__main__": + import sys + if len(sys.argv) < 2: + print("Usage: python monitor.py ") + sys.exit(1) + monitor(sys.argv[1]) +``` + +## Running it + + + + +```bash +python worker.py +``` + + + + +```bash +python pipeline.py +# Copy the load job ID from the output +``` + + + + +```bash +python monitor.py +``` + + + + +## Cascade cancellation + +If an extract job fails permanently (exhausts all retries), the entire +downstream chain is automatically cancelled: + +```python +# If extract_api fails after 5 retries: +# → normalize_a is cascade cancelled +# → load is cascade cancelled (because one dependency failed) +# normalize_b may still complete, but load won't run +``` + +This prevents wasting resources on a pipeline that can't succeed. + +## Key patterns demonstrated + +| Pattern | Where | +|---|---| +| Task dependencies | `depends_on` in transform and load stages | +| Diamond DAG | Two branches converge at the load stage | +| Cascade cancel | Extract failure cancels downstream transforms and load | +| Progress tracking | `normalize` reports progress every 50 records | +| Error history | `monitor.py` inspects `job.errors` for failed jobs | +| Metadata | Each job tagged with source/stage info via `metadata` | +| Named queues | `extract`, `transform`, `load` for queue isolation | +| Priority | Load job gets `priority=10` to run first once unblocked | +| Dead letter inspection | `monitor.py` checks `queue.dead_letters()` | diff --git a/docs-next/content/docs/more/examples/fastapi-service.mdx b/docs-next/content/docs/more/examples/fastapi-service.mdx new file mode 100644 index 0000000..776e684 --- /dev/null +++ b/docs-next/content/docs/more/examples/fastapi-service.mdx @@ -0,0 +1,190 @@ +--- +title: "FastAPI Image Processing Service" +description: "REST API with progress tracking, SSE streaming, async result fetching, and cancellation." +--- + +import { Tab, Tabs } from "fumadocs-ui/components/tabs"; + +A REST API for image processing that demonstrates FastAPI integration, +progress tracking, async result fetching, job cancellation, and SSE +progress streaming. 
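The request flow is intentionally small: the endpoint enqueues a job and hands the job ID back to the client, which then polls or streams progress. A minimal sketch, assuming the `resize_image` task defined in `app.py` below:

```python
# Minimal sketch of what the /process endpoint does under the hood.
from app import resize_image

job = resize_image.delay("https://example.com/photo.jpg", [128, 256])
print(job.id)  # the client uses this ID to poll /tasks/jobs/{id} or stream progress via SSE
```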
+ +## Project structure + +``` +image-service/ + app.py # FastAPI app + task definitions + client.py # Example client script +``` + +## app.py + +```python +"""FastAPI image processing service with taskito.""" + +import time + +from fastapi import FastAPI, HTTPException +from taskito import Queue, current_job +from taskito.contrib.fastapi import TaskitoRouter + +queue = Queue(db_path=".taskito/images.db", workers=4, result_ttl=3600) + +# ── Tasks ──────────────────────────────────────────────── + +@queue.task(timeout=300) +def resize_image(image_url: str, sizes: list[int]) -> dict: + """Resize an image to multiple sizes with progress updates.""" + results = {} + for i, size in enumerate(sizes): + # Simulate resize work + time.sleep(1) + results[f"{size}x{size}"] = f"{image_url}?w={size}&h={size}" + current_job.update_progress(int((i + 1) / len(sizes) * 100)) + return results + +@queue.task(max_retries=3, retry_backoff=2.0) +def generate_thumbnail(image_url: str) -> str: + """Generate a thumbnail — retries on failure.""" + time.sleep(0.5) + return f"{image_url}?thumb=true" + +@queue.task(timeout=600) +def apply_filters(image_url: str, filters: list[str]) -> dict: + """Apply a sequence of filters with progress.""" + results = {} + for i, f in enumerate(filters): + time.sleep(2) + results[f] = f"{image_url}?filter={f}" + current_job.update_progress(int((i + 1) / len(filters) * 100)) + return results + +# ── FastAPI App ────────────────────────────────────────── + +app = FastAPI(title="Image Processing Service") + +# Mount the taskito router — adds /tasks/stats, /tasks/jobs/{id}, etc. +app.include_router(TaskitoRouter(queue), prefix="/tasks") + +@app.post("/process") +async def submit_job(image_url: str, sizes: list[int] | None = None): + """Submit an image processing job and return the job ID.""" + if sizes is None: + sizes = [128, 256, 512, 1024] + job = resize_image.delay(image_url, sizes) + return {"job_id": job.id, "status_url": f"/tasks/jobs/{job.id}"} + +@app.post("/process/{job_id}/cancel") +async def cancel_job(job_id: str): + """Cancel a pending image processing job.""" + cancelled = await queue.acancel_job(job_id) + if not cancelled: + raise HTTPException(400, "Job is not in a cancellable state") + return {"cancelled": True, "job_id": job_id} + +@app.get("/process/{job_id}/result") +async def get_result(job_id: str, timeout: float = 0): + """Get the result, optionally blocking until complete.""" + job = queue.get_job(job_id) + if job is None: + raise HTTPException(404, "Job not found") + if timeout > 0: + try: + result = await job.aresult(timeout=timeout) + return {"status": "complete", "result": result} + except TimeoutError: + return {"status": job.status, "result": None} + return {"status": job.status, "progress": job.progress} +``` + +## client.py + +```python +"""Example client for the image processing service.""" + +import httpx +import json +import time + +BASE = "http://localhost:8000" + +# 1. Submit a job +resp = httpx.post(f"{BASE}/process", params={ + "image_url": "https://example.com/photo.jpg", + "sizes": [128, 256, 512, 1024], +}) +data = resp.json() +job_id = data["job_id"] +print(f"Submitted job: {job_id}") + +# 2. 
Stream progress via SSE +print("\nStreaming progress:") +with httpx.stream("GET", f"{BASE}/tasks/jobs/{job_id}/progress") as r: + for line in r.iter_lines(): + if line.startswith("data:"): + payload = json.loads(line[5:].strip()) + print(f" Progress: {payload['progress']}% — Status: {payload['status']}") + if payload["status"] in ("complete", "failed", "dead", "cancelled"): + break + +# 3. Fetch the final result +result = httpx.get(f"{BASE}/tasks/jobs/{job_id}/result", params={"timeout": 5}) +print(f"\nResult: {result.json()}") +``` + +## Running it + + + + +```bash +taskito worker --app app:queue +``` + + + + +```bash +uvicorn app:app --reload +``` + + + + +```bash +python client.py +``` + + + + +## SSE from the browser + +```javascript +const jobId = "01H5K6X..."; +const source = new EventSource(`/tasks/jobs/${jobId}/progress`); + +const progressBar = document.getElementById("progress"); + +source.onmessage = (event) => { + const data = JSON.parse(event.data); + progressBar.style.width = `${data.progress}%`; + progressBar.textContent = `${data.progress}%`; + + if (["complete", "failed", "dead", "cancelled"].includes(data.status)) { + source.close(); + } +}; +``` + +## Key patterns demonstrated + +| Pattern | Where | +|---|---| +| FastAPI integration | `TaskitoRouter(queue)` mounted at `/tasks` | +| Progress tracking | `current_job.update_progress()` in `resize_image` and `apply_filters` | +| Async result fetch | `await job.aresult(timeout=...)` in `get_result` endpoint | +| Async cancellation | `await queue.acancel_job()` in `cancel_job` endpoint | +| SSE streaming | `/tasks/jobs/{id}/progress` endpoint from `TaskitoRouter` | +| Retry with backoff | `generate_thumbnail` — 3 retries, 2x backoff | +| Result TTL | `result_ttl=3600` — auto-cleanup after 1 hour | diff --git a/docs-next/content/docs/more/examples/index.mdx b/docs-next/content/docs/more/examples/index.mdx index e7da88f..1b76381 100644 --- a/docs-next/content/docs/more/examples/index.mdx +++ b/docs-next/content/docs/more/examples/index.mdx @@ -1,10 +1,15 @@ --- title: Examples -description: "End-to-end examples for common patterns." +description: "End-to-end examples demonstrating common taskito patterns." --- -import { Callout } from 'fumadocs-ui/components/callout'; +End-to-end examples demonstrating common taskito patterns. - - Content port pending. See the [Zensical source](https://github.com/ByteVeda/taskito/tree/master/docs) for current text. 
- +| Example | Description | +|---------|-------------| +| [FastAPI Service](/docs/more/examples/fastapi-service) | REST API that enqueues tasks and streams progress via SSE | +| [Notification Service](/docs/more/examples/notifications) | Multi-channel notifications with retries and rate limiting | +| [Web Scraper Pipeline](/docs/more/examples/web-scraper) | Distributed scraping with chains and error handling | +| [Data Pipeline](/docs/more/examples/data-pipeline) | ETL pipeline with dependencies, groups, and chords | +| [DAG Workflows](/docs/more/examples/workflows) | Fan-out, conditions, gates, sub-workflows, incremental runs | +| [Benchmark](/docs/more/examples/benchmark) | Performance benchmarks comparing taskito to alternatives | diff --git a/docs-next/content/docs/more/examples/meta.json b/docs-next/content/docs/more/examples/meta.json index d7e7b32..3a29f22 100644 --- a/docs-next/content/docs/more/examples/meta.json +++ b/docs-next/content/docs/more/examples/meta.json @@ -1,4 +1,12 @@ { "title": "Examples", - "pages": ["index"] + "pages": [ + "index", + "fastapi-service", + "notifications", + "web-scraper", + "data-pipeline", + "workflows", + "benchmark" + ] } diff --git a/docs-next/content/docs/more/examples/notifications.mdx b/docs-next/content/docs/more/examples/notifications.mdx new file mode 100644 index 0000000..aaed82b --- /dev/null +++ b/docs-next/content/docs/more/examples/notifications.mdx @@ -0,0 +1,226 @@ +--- +title: Notification Service +description: "Delayed scheduling, unique tasks, priority queues, and job cancellation across email/SMS/push channels." +--- + +import { Tab, Tabs } from "fumadocs-ui/components/tabs"; + +A notification service demonstrating delayed scheduling, unique tasks, +priority queues, and job cancellation. 
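Scheduling and deduplication both hang off `apply_async`. A minimal sketch, assuming the `send_push` task defined in `tasks.py` below:

```python
# Minimal sketch: send a push 30 minutes from now, and collapse duplicate
# notifications for the same event behind one unique_key.
from tasks import send_push

later = send_push.apply_async(args=["user_1", "Reminder", "Meeting soon"], delay=1800)
once = send_push.apply_async(
    args=["user_1", "Reminder", "Meeting soon"],
    unique_key="notify:user_1:reminder",
)
```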
+ +## Project structure + +``` +notifications/ + tasks.py # Notification task definitions + service.py # Enqueue notifications with scheduling +``` + +## tasks.py + +```python +"""Notification tasks with priority and deduplication.""" + +import os + +import httpx + +from taskito import Queue + +queue = Queue( + db_path=".taskito/notifications.db", + workers=4, + default_retry=3, + default_timeout=30, + result_ttl=7200, # auto-cleanup after 2 hours +) + +# ── Notification Tasks ─────────────────────────────────── + +@queue.task(priority=10, max_retries=3, retry_backoff=2.0) +def send_urgent_email(to: str, subject: str, body: str) -> dict: + """High-priority email — runs before bulk notifications.""" + response = httpx.post( + "https://api.mailgun.net/v3/YOUR_DOMAIN/messages", + auth=("api", os.environ["MAILGUN_API_KEY"]), + data={"from": "noreply@example.com", "to": to, "subject": subject, "text": body}, + ) + response.raise_for_status() + return {"to": to, "subject": subject, "sent": True} + +@queue.task(priority=0, max_retries=3, retry_backoff=2.0) +def send_bulk_email(to: str, subject: str, body: str) -> dict: + """Low-priority bulk email.""" + response = httpx.post( + "https://api.mailgun.net/v3/YOUR_DOMAIN/messages", + auth=("api", os.environ["MAILGUN_API_KEY"]), + data={"from": "noreply@example.com", "to": to, "subject": subject, "text": body}, + ) + response.raise_for_status() + return {"to": to, "subject": subject, "sent": True} + +@queue.task(priority=5, max_retries=5, retry_backoff=2.0) +def send_push(user_id: str, title: str, message: str) -> dict: + """Push notification with retries.""" + response = httpx.post( + "https://fcm.googleapis.com/fcm/send", + headers={"Authorization": f"key={os.environ['FCM_SERVER_KEY']}"}, + json={"to": f"/topics/user-{user_id}", "notification": {"title": title, "body": message}}, + ) + response.raise_for_status() + return {"user_id": user_id, "title": title, "sent": True} + +@queue.task(max_retries=3, retry_backoff=2.0) +def send_sms(phone: str, message: str) -> dict: + """SMS notification via Twilio.""" + response = httpx.post( + f"https://api.twilio.com/2010-04-01/Accounts/{os.environ['TWILIO_ACCOUNT_SID']}/Messages.json", + auth=(os.environ["TWILIO_ACCOUNT_SID"], os.environ["TWILIO_AUTH_TOKEN"]), + data={"From": os.environ["TWILIO_FROM_NUMBER"], "To": phone, "Body": message}, + ) + response.raise_for_status() + return {"phone": phone, "sent": True} + +# ── Periodic Digest ────────────────────────────────────── + +@queue.periodic(cron="0 9 * * * *") +def daily_digest(): + """Send daily digest email every day at 9 AM.""" + print("[DIGEST] Sending daily digest to all subscribers...") +``` + +## service.py + +```python +"""Notification service — enqueue with scheduling, deduplication, and cancellation.""" + +import time +from tasks import ( + queue, + send_urgent_email, + send_bulk_email, + send_push, + send_sms, +) + +# ── 1. Delayed Scheduling ─────────────────────────────── +# Schedule a reminder 30 minutes from now + +print("1. Delayed scheduling") +reminder = send_push.apply_async( + args=["user_123", "Reminder", "Your meeting starts in 5 minutes"], + delay=1800, # 30 minutes from now +) +print(f" Scheduled reminder: {reminder.id} (runs in 30 min)") + +# Schedule a follow-up email for 1 hour later +followup = send_bulk_email.apply_async( + args=["user@example.com", "How was your meeting?", "We'd love your feedback."], + delay=3600, # 1 hour from now +) +print(f" Scheduled follow-up: {followup.id} (runs in 1 hour)") + +# ── 2. 
Unique Tasks (Deduplication) ───────────────────── +# Prevent duplicate notifications for the same event + +print("\n2. Unique tasks") +job1 = send_push.apply_async( + args=["user_456", "New message", "You have a new message"], + unique_key="notify:user_456:new_message", +) +job2 = send_push.apply_async( + args=["user_456", "New message", "You have a new message"], + unique_key="notify:user_456:new_message", +) +print(f" First enqueue: {job1.id}") +print(f" Second enqueue: {job2.id}") +print(f" Same job? {job1.id == job2.id}") # True — deduplicated + +# ── 3. Priority Queues ────────────────────────────────── +# Urgent notifications run before bulk + +print("\n3. Priority queues") +# Enqueue bulk emails first +bulk_jobs = send_bulk_email.map([ + ("alice@example.com", "Newsletter", "This week's updates..."), + ("bob@example.com", "Newsletter", "This week's updates..."), + ("carol@example.com", "Newsletter", "This week's updates..."), +]) +print(f" Enqueued {len(bulk_jobs)} bulk emails (priority=0)") + +# Enqueue urgent email after — but it runs first due to priority=10 +urgent = send_urgent_email.delay( + "admin@example.com", + "Server Alert", + "CPU usage exceeded 90%", +) +print(f" Enqueued urgent email (priority=10) — runs before bulk") + +# ── 4. Job Cancellation ───────────────────────────────── +# Cancel a scheduled notification before it sends + +print("\n4. Job cancellation") +scheduled = send_sms.apply_async( + args=["+1234567890", "Your order ships tomorrow"], + delay=7200, # 2 hours from now +) +print(f" Scheduled SMS: {scheduled.id}") + +# User updated their preference — cancel the SMS +cancelled = queue.cancel_job(scheduled.id) +print(f" Cancelled: {cancelled}") # True + +job = queue.get_job(scheduled.id) +print(f" Status: {job.status}") # "cancelled" + +# ── 5. Inspect Pending Notifications ───────────────────── + +print("\n5. Pending notifications") +pending = queue.list_jobs(status="pending", limit=10) +for j in pending: + d = j.to_dict() + print(f" {d['id'][:12]}... | {d['task_name']} | priority={d['priority']}") + +stats = queue.stats() +print(f"\nQueue stats: {stats}") +``` + +## Running it + + + + +```bash +taskito worker --app tasks:queue +``` + + + + +```bash +python service.py +``` + + + + +```bash +taskito info --app tasks:queue --watch +``` + + + + +## Key patterns demonstrated + +| Pattern | Where | +|---|---| +| Delayed scheduling | `delay=1800` — reminder runs 30 min later | +| Unique tasks | `unique_key="notify:user_456:..."` — deduplicates | +| Priority queues | `priority=10` urgent runs before `priority=0` bulk | +| Job cancellation | `queue.cancel_job()` revokes a scheduled SMS | +| Batch enqueue | `send_bulk_email.map()` for newsletter | +| Periodic tasks | `daily_digest` runs every day at 9 AM | +| Result TTL | `result_ttl=7200` — auto-cleanup after 2 hours | +| Retry with backoff | `send_push` — 5 retries, 2x backoff | +| Job inspection | `queue.list_jobs()` to view pending notifications | diff --git a/docs-next/content/docs/more/examples/web-scraper.mdx b/docs-next/content/docs/more/examples/web-scraper.mdx new file mode 100644 index 0000000..67ffaba --- /dev/null +++ b/docs-next/content/docs/more/examples/web-scraper.mdx @@ -0,0 +1,212 @@ +--- +title: Web Scraper Pipeline +description: "Multi-stage scraper with rate limiting, retries, chord workflow, hooks, periodic cleanup, named queues." 
+--- + +import { Tab, Tabs } from "fumadocs-ui/components/tabs"; + +A complete multi-stage web scraper demonstrating rate limiting, retries, +workflows, progress tracking, hooks, periodic cleanup, and named queues. + +## Project structure + +``` +scraper/ + tasks.py # Task definitions + worker.py # Worker entry point + run.py # Enqueue scraping jobs +``` + +## tasks.py + +```python +import json +import time +from taskito import Queue, current_job, chain, group, chord + +queue = Queue( + db_path=".taskito/scraper.db", + workers=4, + default_retry=3, + default_timeout=60, + result_ttl=3600, # Auto-cleanup results after 1 hour +) + +# ── Hooks ──────────────────────────────────────────────── + +@queue.before_task +def log_start(task_name, args, kwargs): + print(f"[START] {task_name}") + +@queue.on_success +def log_success(task_name, args, kwargs, result): + print(f"[DONE] {task_name}") + +@queue.on_failure +def log_failure(task_name, args, kwargs, error): + print(f"[FAIL] {task_name}: {error}") + +# ── Tasks ──────────────────────────────────────────────── + +@queue.task( + rate_limit="30/m", # Max 30 requests per minute + max_retries=5, + retry_backoff=2.0, + queue="scraping", +) +def fetch_page(url): + """Fetch a single URL. Rate-limited and retried on failure.""" + import urllib.request + with urllib.request.urlopen(url, timeout=10) as resp: + return resp.read().decode("utf-8") + +@queue.task(queue="processing") +def extract_links(html): + """Extract all links from an HTML page.""" + import re + return re.findall(r'href="(https?://[^"]+)"', html) + +@queue.task(queue="processing") +def extract_title(html): + """Extract the page title.""" + import re + match = re.search(r"(.*?)", html, re.IGNORECASE | re.DOTALL) + return match.group(1).strip() if match else "No title" + +@queue.task(queue="storage") +def store_results(results, url=""): + """Store scraped data to a JSON file.""" + data = {"url": url, "results": results, "scraped_at": time.time()} + filename = f"output_{int(time.time())}.json" + with open(filename, "w") as f: + json.dump(data, f, indent=2) + return filename + +@queue.task(queue="processing") +def summarize(pages): + """Aggregate results from multiple pages.""" + total_links = sum(len(p.get("links", [])) for p in pages) + titles = [p.get("title", "?") for p in pages] + return { + "pages_scraped": len(pages), + "total_links": total_links, + "titles": titles, + } + +@queue.task() +def scrape_page(url): + """Full pipeline for a single page: fetch → extract links + title.""" + html = fetch_page(url) # Direct call (not queued) + links = extract_links(html) + title = extract_title(html) + return {"url": url, "title": title, "links": links} + +# ── Periodic cleanup ──────────────────────────────────── + +@queue.periodic(cron="0 0 * * * *") +def hourly_cleanup(): + """Purge completed jobs and dead letters every hour.""" + completed = queue.purge_completed(older_than=3600) + dead = queue.purge_dead(older_than=86400) + print(f"Cleanup: purged {completed} completed, {dead} dead") +``` + +## run.py + +```python +"""Enqueue scraping jobs.""" +from tasks import queue, scrape_page, summarize, store_results +from taskito import group, chord + +urls = [ + "https://example.com", + "https://httpbin.org/html", + "https://jsonplaceholder.typicode.com", +] + +# ── Option 1: Simple parallel scraping ────────────────── + +print("Enqueuing scrape jobs...") +jobs = [scrape_page.delay(url) for url in urls] + +# Wait for all results +for job in jobs: + result = job.result(timeout=30) + print(f" 
{result['title']} — {len(result['links'])} links") + +# ── Option 2: Chord — scrape in parallel, then summarize ─ + +print("\nRunning chord pipeline...") +result = chord( + group(*[scrape_page.s(url) for url in urls]), + summarize.s(), +).apply(queue) + +summary = result.result(timeout=60) +print(f" Scraped {summary['pages_scraped']} pages") +print(f" Found {summary['total_links']} total links") +print(f" Titles: {summary['titles']}") + +# ── Option 3: Batch enqueue with .map() ───────────────── + +print("\nBatch enqueue with .map()...") +jobs = scrape_page.map([(url,) for url in urls]) +results = [j.result(timeout=30) for j in jobs] +print(f" Scraped {len(results)} pages in batch") + +# ── Check stats ───────────────────────────────────────── + +stats = queue.stats() +print(f"\nQueue stats: {stats}") +``` + +## worker.py + +```python +"""Start the worker.""" +from tasks import queue + +if __name__ == "__main__": + print("Starting scraper worker...") + queue.run_worker(queues=["scraping", "processing", "storage"]) +``` + +## Running it + + + + +```bash +python worker.py +``` + + + + +```bash +python run.py +``` + + + + +```bash +taskito info --app tasks:queue --watch +``` + + + + +## Key patterns demonstrated + +| Pattern | Where | +|---|---| +| Rate limiting | `fetch_page` — 30 requests/min | +| Retry with backoff | `fetch_page` — 5 retries, 2.0x backoff | +| Named queues | `scraping`, `processing`, `storage` | +| Hooks | `log_start`, `log_success`, `log_failure` | +| Workflows (chord) | Parallel scrape → summarize | +| Batch enqueue | `.map()` for bulk job creation | +| Periodic tasks | `hourly_cleanup` runs every hour | +| Result TTL | Auto-cleanup completed jobs after 1 hour | +| Direct call | `scrape_page` calls `fetch_page()` directly | diff --git a/docs-next/content/docs/more/examples/workflows.mdx b/docs-next/content/docs/more/examples/workflows.mdx new file mode 100644 index 0000000..ba01cda --- /dev/null +++ b/docs-next/content/docs/more/examples/workflows.mdx @@ -0,0 +1,268 @@ +--- +title: DAG Workflow Examples +description: "ML training, map-reduce, resilient pipelines, sub-workflows, incremental runs, pre-execution analysis." +--- + +Real-world workflow patterns demonstrating fan-out, conditions, approval +gates, sub-workflows, and incremental runs. + +## ML training pipeline + +A training pipeline that evaluates a model, gates deployment on accuracy, +and has a rollback path. + +```python +from taskito import Queue +from taskito.workflows import Workflow, WorkflowContext + +queue = Queue(db_path="ml.db", workers=4) + + +@queue.task() +def fetch_dataset() -> dict: + return {"rows": 50_000, "path": "/data/train.parquet"} + + +@queue.task() +def train_model(dataset: dict) -> dict: + # ... training logic ... 
+ return {"model_id": "v3.2", "accuracy": 0.97, "loss": 0.08} + + +@queue.task() +def evaluate(model: dict) -> dict: + return {"accuracy": model["accuracy"], "passed": model["accuracy"] > 0.90} + + +@queue.task() +def deploy(model_id: str) -> str: + return f"deployed {model_id}" + + +@queue.task() +def notify_failure() -> str: + return "sent alert: model below threshold" + + +def accuracy_gate(ctx: WorkflowContext) -> bool: + return ctx.results.get("evaluate", {}).get("passed", False) + + +wf = Workflow(name="ml_pipeline") +wf.step("fetch", fetch_dataset) +wf.step("train", train_model, after="fetch") +wf.step("evaluate", evaluate, after="train") +wf.gate("review", after="evaluate", timeout=3600, on_timeout="reject") +wf.step("deploy", deploy, after="review") +wf.step("alert", notify_failure, after="review", condition="on_failure") +``` + + train --> evaluate + evaluate --> review["review\\n(approval gate)"] + review -->|approved| deploy + review -->|rejected| alert`} +/> + +Usage: + +```python +run = queue.submit_workflow(wf) + +# After human review: +queue.approve_gate(run.id, "review") + +result = run.wait(timeout=120) +print(run.visualize("mermaid")) +``` + +--- + +## Map-reduce with fan-out + +Process a batch of items in parallel, then aggregate results. + +```python +@queue.task() +def fetch_urls() -> list[str]: + return [ + "https://api.example.com/page/1", + "https://api.example.com/page/2", + "https://api.example.com/page/3", + ] + + +@queue.task() +def scrape(url: str) -> dict: + import httpx + resp = httpx.get(url) + return {"url": url, "status": resp.status_code, "size": len(resp.content)} + + +@queue.task() +def summarize(results: list[dict]) -> dict: + total = sum(r["size"] for r in results) + return {"pages": len(results), "total_bytes": total} + + +wf = Workflow(name="scrape_pipeline") +wf.step("fetch", fetch_urls) +wf.step("scrape", scrape, after="fetch", fan_out="each") +wf.step("summarize", summarize, after="scrape", fan_in="all") + +run = queue.submit_workflow(wf) +result = run.wait(timeout=60) +# summarize receives [{"url": ..., "size": ...}, ...] +``` + +--- + +## Resilient pipeline with continue mode + +Independent branches keep running even when one fails. + +```python +@queue.task(max_retries=0) +def ingest_orders() -> str: + return "orders ingested" + + +@queue.task(max_retries=0) +def ingest_inventory() -> str: + raise RuntimeError("inventory source down") + + +@queue.task() +def build_report() -> str: + return "report built" + + +@queue.task() +def send_alert() -> str: + return "alert sent to #data-eng" + + +wf = Workflow(name="daily_ingest", on_failure="continue") +wf.step("orders", ingest_orders) +wf.step("inventory", ingest_inventory) +wf.step("report", build_report, after="orders") +wf.step("alert", send_alert, after="inventory", condition="on_failure") +``` + + report["report ✓"] + inventory["inventory ✗"] --> alert["alert ✓\\n(on_failure)"]`} +/> + +`inventory` fails, but `orders → report` runs to completion. `alert` fires +because its predecessor failed. + +--- + +## Multi-region ETL with sub-workflows + +Compose reusable pipelines as sub-workflow steps. 
+ +```python +@queue.task() +def extract(region: str) -> list: + return [{"region": region, "id": i} for i in range(100)] + + +@queue.task() +def load(data: list) -> int: + return len(data) + + +@queue.task() +def reconcile() -> str: + return "all regions reconciled" + + +@queue.workflow("region_etl") +def region_etl(region: str) -> Workflow: + wf = Workflow() + wf.step("extract", extract, args=(region,)) + wf.step("load", load, after="extract") + return wf + + +@queue.workflow("global_etl") +def global_etl() -> Workflow: + wf = Workflow() + wf.step("eu", region_etl.as_step(region="eu")) + wf.step("us", region_etl.as_step(region="us")) + wf.step("ap", region_etl.as_step(region="ap")) + wf.step("reconcile", reconcile, after=["eu", "us", "ap"]) + return wf + + +run = global_etl.submit() +run.wait(timeout=120) +``` + +EU, US, and AP ETL pipelines run concurrently as child workflows. +`reconcile` runs after all three complete. + +--- + +## Incremental re-runs + +Skip unchanged steps on the second run. + +```python +wf = Workflow(name="nightly", cache_ttl=86400) # 24h TTL +wf.step("extract", extract) +wf.step("transform", transform, after="extract") +wf.step("load", load, after="transform") + +# First run: everything executes +run1 = queue.submit_workflow(wf) +run1.wait() + +# Next day: skip completed steps +run2 = queue.submit_workflow(wf, incremental=True, base_run=run1.id) +run2.wait() + +for name, node in run2.status().nodes.items(): + print(f"{name}: {node.status}") +# extract: cache_hit +# transform: cache_hit +# load: cache_hit +``` + +--- + +## Pre-execution analysis + +Analyze a workflow before submitting it. + +```python +wf = Workflow(name="complex") +wf.step("a", task_a) +wf.step("b", task_b, after="a") +wf.step("c", task_c, after="a") +wf.step("d", task_d, after=["b", "c"]) + +# Structure +print(wf.topological_levels()) +# [["a"], ["b", "c"], ["d"]] + +print(wf.stats()) +# {"nodes": 4, "edges": 4, "depth": 3, "width": 2, "density": 0.67} + +# Critical path with estimated durations +path, cost = wf.critical_path({"a": 2, "b": 10, "c": 3, "d": 1}) +print(f"Critical path: {path}, cost: {cost}s") +# Critical path: ["a", "b", "d"], cost: 13s + +# Bottleneck +analysis = wf.bottleneck_analysis({"a": 2, "b": 10, "c": 3, "d": 1}) +print(analysis["suggestion"]) +# "b is the bottleneck (76.9% of total time). Consider optimizing." + +# Visualization +print(wf.visualize("mermaid")) +```
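
A natural follow-on is to gate submission on the analysis itself: for example, refuse to submit a DAG that is deeper than expected. A small sketch, assuming `stats()` and `submit_workflow()` behave as shown above:

```python
# Sketch: sanity-check the DAG shape before submitting it.
stats = wf.stats()
if stats["depth"] > 10:
    raise RuntimeError(f"workflow too deep ({stats['depth']} levels); refusing to submit")

run = queue.submit_workflow(wf)
print(run.wait(timeout=300))
```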