---
title: Benchmark
description: "Throughput benchmark — enqueue / processing / latency, comparison with Celery and Dramatiq."
---

import { Callout } from "fumadocs-ui/components/callout";

Measure taskito's throughput by enqueuing and processing a large batch of
tasks.

## benchmark.py

```python
"""taskito throughput benchmark.

Measures:
1. Enqueue throughput (jobs/sec) using batch insert
2. Processing throughput (jobs/sec) with N workers
3. End-to-end latency
"""

import os
import threading
import time

from taskito import Queue

# ── Configuration ────────────────────────────────────────

NUM_JOBS = 10_000
NUM_WORKERS = os.cpu_count() or 4
DB_PATH = ":memory:"  # In-memory for pure speed test

queue = Queue(db_path=DB_PATH, workers=NUM_WORKERS)

@queue.task()
def noop(x):
    """Minimal task — measures framework overhead."""
    return x

@queue.task()
def cpu_light(x):
    """Light CPU work — string formatting."""
    return f"processed-{x}-{'x' * 100}"

# ── Benchmark Functions ──────────────────────────────────

def bench_enqueue(task, n):
    """Measure batch enqueue throughput."""
    args_list = [(i,) for i in range(n)]

    start = time.perf_counter()
    jobs = task.map(args_list)
    elapsed = time.perf_counter() - start

    rate = n / elapsed
    print(f" Enqueued {n:,} jobs in {elapsed:.2f}s ({rate:,.0f} jobs/s)")
    return jobs

def bench_process(jobs, timeout=120):
    """Measure processing throughput by waiting for the batch to drain."""
    n = len(jobs)
    start = time.perf_counter()

    # Jobs are dispatched roughly in FIFO order, so waiting on the last
    # enqueued job approximates completion of the whole batch.
    last = jobs[-1]
    try:
        last.result(timeout=timeout, poll_interval=0.01, max_poll_interval=0.1)
    except TimeoutError:
        stats = queue.stats()
        print(f" Timed out! Stats: {stats}")
        return

    elapsed = time.perf_counter() - start
    rate = n / elapsed
    print(f" Processed {n:,} jobs in {elapsed:.2f}s ({rate:,.0f} jobs/s)")

def bench_latency(task, samples=100):
    """Measure single-job round-trip latency."""
    latencies = []
    for i in range(samples):
        start = time.perf_counter()
        job = task.delay(i)
        job.result(timeout=10)
        latencies.append(time.perf_counter() - start)

    latencies.sort()
    avg = sum(latencies) / samples
    p50 = latencies[samples // 2]
    p99 = latencies[int(samples * 0.99)]
    print(f" Latency (n={samples}): avg={avg*1000:.1f}ms p50={p50*1000:.1f}ms p99={p99*1000:.1f}ms")

# ── Main ─────────────────────────────────────────────────

def main():
    print("taskito benchmark")
    print(f" Workers: {NUM_WORKERS}")
    print(f" Jobs: {NUM_JOBS:,}")
    print(f" DB: {DB_PATH}")
    print()

    # Start the worker in a background thread and give it a moment to spin up
    worker_thread = threading.Thread(target=queue.run_worker, daemon=True)
    worker_thread.start()
    time.sleep(0.5)

    print("── noop task (framework overhead) ──")
    jobs = bench_enqueue(noop, NUM_JOBS)
    bench_process(jobs)
    print()

    print("── cpu_light task ──")
    jobs = bench_enqueue(cpu_light, NUM_JOBS)
    bench_process(jobs)
    print()

    print("── single-job latency ──")
    bench_latency(noop)
    print()

    stats = queue.stats()
    print(f"Final stats: {stats}")

if __name__ == "__main__":
    main()
```

## Running

```bash
python benchmark.py
```

## Sample output

```
taskito benchmark
 Workers: 8
 Jobs: 10,000
 DB: :memory:

── noop task (framework overhead) ──
 Enqueued 10,000 jobs in 0.18s (55,556 jobs/s)
 Processed 10,000 jobs in 2.41s (4,149 jobs/s)

── cpu_light task ──
 Enqueued 10,000 jobs in 0.19s (52,632 jobs/s)
 Processed 10,000 jobs in 2.53s (3,953 jobs/s)

── single-job latency ──
 Latency (n=100): avg=1.2ms p50=1.1ms p99=3.4ms

Final stats: {'pending': 0, 'running': 0, 'completed': 20100, 'failed': 0, 'dead': 0, 'cancelled': 0}
```

<Callout type="info">
Actual numbers depend on your hardware, Python version, and SQLite
configuration. The numbers above are from an 8-core machine with Python
3.12.
</Callout>

## What makes taskito fast

| Component | How it helps |
|---|---|
| **Batch inserts** | `task.map()` inserts all jobs in a single SQLite transaction |
| **WAL mode** | Concurrent reads while writing — workers don't block enqueue |
| **Rust scheduler** | 50ms poll loop runs in native code, not Python |
| **OS threads** | Workers are Rust `std::thread`, not Python threads |
| **GIL per task** | GIL acquired only during Python task execution, released between tasks |
| **tokio mpsc channels** | Bounded async dispatch to workers |
| **r2d2 pool** | Up to 8 concurrent SQLite connections |
| **Diesel ORM** | Compiled SQL queries, no runtime query building |
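
To see the batch-insert effect from the first row in isolation, you can time per-job `task.delay()` against `task.map()`. This is a sketch reusing the `queue` and `noop` task from benchmark.py above; `compare_enqueue` is an illustrative helper, not part of taskito's API:

```python
import time

def compare_enqueue(n=5_000):
    """Contrast per-job inserts with one batched transaction."""
    # Per-job enqueue: each delay() call is its own INSERT and transaction
    start = time.perf_counter()
    for i in range(n):
        noop.delay(i)
    per_job = time.perf_counter() - start

    # Batch enqueue: task.map() writes all n jobs in a single transaction
    start = time.perf_counter()
    noop.map([(i,) for i in range(n)])
    batch = time.perf_counter() - start

    print(f"delay(): {n / per_job:,.0f} jobs/s   map(): {n / batch:,.0f} jobs/s")
```

On an in-memory database the gap mostly reflects per-transaction overhead; it typically widens on a file-backed database, where each transaction carries its own journal write.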

## How it compares

A rough, directional comparison on the same hardware (8-core, single
machine). These are not rigorous benchmarks; run the script above on
your own hardware for accurate numbers.

| Metric | taskito (SQLite) | taskito (Postgres) | Celery + Redis | Dramatiq + Redis |
|--------|-----------------|-------------------|---------------|-----------------|
| Enqueue throughput | ~55,000/s | ~20,000/s | ~5,000/s | ~3,000/s |
| Processing (noop, 8 workers) | ~4,000/s | ~3,500/s | ~2,000/s | ~1,500/s |
| p50 latency | 1.1ms | 2.5ms | 5–10ms | 8–15ms |
| p99 latency | 3.4ms | 8ms | 20–50ms | 30–80ms |
| Memory (idle worker) | ~30 MB | ~35 MB | ~80 MB | ~60 MB |
| Setup | `pip install taskito` | + Postgres | + Redis + Celery | + Redis + Dramatiq |
| External services | 0 | 1 (Postgres) | 2 (Redis + result backend) | 1 (Redis) |

<Callout type="info">
Celery numbers are from public benchmarks and community reports. Your
mileage will vary depending on workload, serializer, and broker
configuration. Run your own benchmarks before making decisions.
</Callout>
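
If you want a like-for-like enqueue number on your own machine, a minimal Celery counterpart to `bench_enqueue()` looks roughly like this. It assumes a Redis broker on localhost; the module name and task are illustrative:

```python
"""celery_bench.py -- rough Celery counterpart to bench_enqueue().

Start a worker first:  celery -A celery_bench worker --concurrency=8
"""
import time

from celery import Celery

app = Celery(
    "celery_bench",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

@app.task
def noop(x):
    return x

def bench_enqueue(n=10_000):
    start = time.perf_counter()
    results = [noop.delay(i) for i in range(n)]
    elapsed = time.perf_counter() - start
    print(f"Enqueued {n:,} jobs in {elapsed:.2f}s ({n / elapsed:,.0f} jobs/s)")
    return results

if __name__ == "__main__":
    bench_enqueue()
```

Enqueue-only timing isolates the broker round-trip; for processing throughput you would additionally wait on the returned results with `result.get()`.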

**Why is taskito faster?**

- Rust scheduler avoids GIL contention — scheduling and dispatch never block Python
- SQLite WAL mode with batch inserts — disk I/O is minimized
- Direct DB polling — no broker hop (enqueue → DB → dequeue is one less network round-trip vs enqueue → Redis → dequeue)
- OS thread pool with per-task GIL acquisition — no multiprocessing overhead for I/O-bound tasks
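
A quick way to observe the last point is a sleeping task: with 8 workers, 80 jobs that each block for 100 ms should drain in roughly 1 second rather than 8. A sketch reusing the `queue` from benchmark.py; `io_wait` and `bench_io` are illustrative names:

```python
import time

@queue.task()
def io_wait(seconds):
    """Stand-in for blocking I/O; time.sleep releases the GIL."""
    time.sleep(seconds)
    return seconds

def bench_io(n=80, wait=0.1):
    start = time.perf_counter()
    jobs = io_wait.map([(wait,) for _ in range(n)])
    jobs[-1].result(timeout=60)  # wait for the batch, as in bench_process()
    elapsed = time.perf_counter() - start
    print(f"{n} x {wait}s sleeps took {elapsed:.2f}s (serial would be {n * wait:.0f}s)")
```

If the total lands near `n * wait / NUM_WORKERS` seconds, the worker threads are overlapping their waits as expected.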

## Tune for your workload

| Symptom | Config to change | Why |
|---------|-----------------|-----|
| Low throughput (I/O tasks) | Increase `workers` | More threads = more concurrent I/O |
| Low throughput (CPU tasks) | Use `pool="prefork"` | Each process gets its own GIL |
| High latency | Decrease `scheduler_poll_interval_ms` | Scheduler checks for ready jobs more often |
| Database too busy | Increase `scheduler_poll_interval_ms` | Less frequent polling reduces DB load |
| Memory growing | Set `result_ttl` | Auto-cleanup old results and metrics |
| Jobs timing out | Increase `default_timeout` | Give tasks more time to complete |
| Jobs piling up | Add more workers or use Postgres | SQLite single-writer limit may bottleneck |

Example starting configurations:

```python
# More workers for I/O-bound tasks
queue = Queue(workers=16)

# Fewer workers for CPU-bound tasks (limited by GIL)
queue = Queue(workers=4)

# In-memory DB for maximum throughput (no persistence)
queue = Queue(db_path=":memory:")

# File DB for durability (slightly slower)
queue = Queue(db_path="tasks.db")
```
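
The remaining knobs from the symptom table, sketched as constructor arguments. Treating them as `Queue` parameters mirrors the `workers` and `db_path` examples above, but the exact signature is an assumption; verify against your taskito version:

```python
# Parameter names come from the symptom table above; passing them to
# Queue() is an assumption, and the values shown are illustrative.
queue = Queue(
    db_path="tasks.db",
    workers=8,
    pool="prefork",                  # CPU-bound work: one GIL per process
    scheduler_poll_interval_ms=25,   # lower = snappier dispatch, more DB polling
    result_ttl=3600,                 # drop results and metrics after an hour
    default_timeout=300,             # 5-minute per-job timeout
)
```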