From 9fecc3449e3c4000593071e9533b41d3594b995e Mon Sep 17 00:00:00 2001
From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com>
Date: Sun, 3 May 2026 02:26:22 +0530
Subject: [PATCH] docs(docs-next): port guides/advanced-execution + operations
(phase 4d)
---
.../guides/advanced-execution/async-tasks.mdx | 205 ++++++++
.../advanced-execution/batch-enqueue.mdx | 44 ++
.../advanced-execution/dependencies.mdx | 272 +++++++++++
.../docs/guides/advanced-execution/index.mdx | 15 +-
.../docs/guides/advanced-execution/meta.json | 10 +-
.../guides/advanced-execution/prefork.mdx | 106 +++++
.../guides/advanced-execution/streaming.mdx | 135 ++++++
.../advanced-execution/unique-tasks.mdx | 27 ++
.../docs/guides/operations/deployment.mdx | 380 +++++++++++++++
.../content/docs/guides/operations/index.mdx | 16 +-
.../docs/guides/operations/job-management.mdx | 203 ++++++++
.../content/docs/guides/operations/keda.mdx | 215 +++++++++
.../content/docs/guides/operations/meta.json | 11 +-
.../docs/guides/operations/migration.mdx | 359 ++++++++++++++
.../docs/guides/operations/postgres.mdx | 246 ++++++++++
.../docs/guides/operations/testing.mdx | 438 ++++++++++++++++++
.../guides/operations/troubleshooting.mdx | 259 +++++++++++
17 files changed, 2929 insertions(+), 12 deletions(-)
create mode 100644 docs-next/content/docs/guides/advanced-execution/async-tasks.mdx
create mode 100644 docs-next/content/docs/guides/advanced-execution/batch-enqueue.mdx
create mode 100644 docs-next/content/docs/guides/advanced-execution/dependencies.mdx
create mode 100644 docs-next/content/docs/guides/advanced-execution/prefork.mdx
create mode 100644 docs-next/content/docs/guides/advanced-execution/streaming.mdx
create mode 100644 docs-next/content/docs/guides/advanced-execution/unique-tasks.mdx
create mode 100644 docs-next/content/docs/guides/operations/deployment.mdx
create mode 100644 docs-next/content/docs/guides/operations/job-management.mdx
create mode 100644 docs-next/content/docs/guides/operations/keda.mdx
create mode 100644 docs-next/content/docs/guides/operations/migration.mdx
create mode 100644 docs-next/content/docs/guides/operations/postgres.mdx
create mode 100644 docs-next/content/docs/guides/operations/testing.mdx
create mode 100644 docs-next/content/docs/guides/operations/troubleshooting.mdx
diff --git a/docs-next/content/docs/guides/advanced-execution/async-tasks.mdx b/docs-next/content/docs/guides/advanced-execution/async-tasks.mdx
new file mode 100644
index 0000000..e0fdc95
--- /dev/null
+++ b/docs-next/content/docs/guides/advanced-execution/async-tasks.mdx
@@ -0,0 +1,205 @@
+---
+title: Native Async Tasks
+description: "async def tasks dispatched directly onto a dedicated event loop."
+---
+
+import { Callout } from "fumadocs-ui/components/callout";
+
+taskito runs async task functions natively — no wrapping in `asyncio.run()`,
+no thread-pool bridging. Define a coroutine and the worker dispatches it
+directly onto a dedicated event loop.
+
+```python
+from taskito import Queue
+
+queue = Queue(db_path="myapp.db")
+
+@queue.task()
+async def fetch_data(url: str) -> dict:
+    import httpx
+    async with httpx.AsyncClient() as client:
+        response = await client.get(url)
+        return response.json()
+```
+
+Enqueue it the same way as a sync task:
+
+```python
+job = fetch_data.delay("https://api.example.com/data")
+result = job.result(timeout=30)
+```
+
+## How it works
+
+When a task decorated with `@queue.task()` is an `async def`, taskito marks
+it for native dispatch. At worker startup, a `NativeAsyncPool` is initialized
+alongside the standard thread pool. When the scheduler dequeues an async job,
+it routes it to the native pool instead of a sync worker thread.
+
+```mermaid
+graph LR
+  S["Scheduler (Tokio)"] -->|"sync job"| TP["Thread Pool / spawn_blocking"]
+  S -->|"async job"| AP["Native Async Pool / AsyncTaskExecutor"]
+  AP --> EL["Dedicated Event Loop (daemon thread)"]
+  EL -->|"result"| RS["PyResultSender"]
+  RS -->|"report_success / report_failure"| S
+```
+
+The dedicated event loop lives in its own Python daemon thread. All async
+tasks for a single worker share that loop; a semaphore caps simultaneous
+execution.
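+
+A minimal sketch of that capping pattern (illustrative only, not taskito's
+actual internals):
+
+```python
+import asyncio
+
+sem = asyncio.Semaphore(100)  # mirrors the async_concurrency setting
+
+async def run_capped(coro):
+    # At most 100 coroutines proceed past this point at once;
+    # the rest wait on the semaphore.
+    async with sem:
+        return await coro
+```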
+
+## Concurrency limit
+
+Control how many async tasks run concurrently on the event loop:
+
+```python
+queue = Queue(
+    db_path="myapp.db",
+    async_concurrency=50,  # default: 100
+)
+```
+
+This is independent of the `workers` parameter (sync thread count). A typical
+mixed setup might be:
+
+```python
+queue = Queue(
+    db_path="myapp.db",
+    workers=4,              # 4 OS threads for sync tasks
+    async_concurrency=200,  # up to 200 concurrent async tasks
+)
+```
+
+## Job context
+
+`current_job` works inside async tasks — it reads from `contextvars` rather
+than `threading.local`, so it's safe across `await` boundaries:
+
+```python
+import asyncio
+
+from taskito.context import current_job
+
+@queue.task()
+async def process(item_id: str) -> str:
+    current_job.log(f"Starting {item_id}")
+    await asyncio.sleep(1)
+    current_job.update_progress(50)
+    await asyncio.sleep(1)
+    current_job.update_progress(100)
+    return f"done:{item_id}"
+```
+
+Each async task gets an isolated `ContextVar` token. Concurrent tasks on the
+same loop do not see each other's contexts.
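+
+Conceptually, the isolation works like this sketch (an assumption about the
+internals, not taskito's actual code):
+
+```python
+import contextvars
+
+_current_job = contextvars.ContextVar("current_job")
+
+async def dispatch(job, coro_fn):
+    token = _current_job.set(job)   # visible only to this asyncio task
+    try:
+        return await coro_fn()
+    finally:
+        _current_job.reset(token)   # restored when the task finishes
+```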
+
+## Resource injection
+
+Async tasks support `inject=` and `Inject["name"]` annotations the same way
+as sync tasks:
+
+```python
+@queue.worker_resource("http_client")
+def make_http_client():
+    import httpx
+    return httpx.AsyncClient()
+
+@queue.task(inject=["http_client"])
+async def fetch(url: str, http_client=None) -> dict:
+    response = await http_client.get(url)
+    return response.json()
+```
+
+<Callout>
+  Resource initialization still runs on worker startup in the main thread.
+  The resource instance is then passed into the async task at dispatch time.
+</Callout>
+
+## Middleware
+
+Middleware `before()` and `after()` hooks run for async tasks the same as for
+sync tasks. They are called from within the async execution context, so
+`current_job` is available:
+
+```python
+class LoggingMiddleware(TaskMiddleware):
+    def before(self, ctx):
+        current_job.log("task started")
+
+    def after(self, ctx, result, error):
+        current_job.log("task finished")
+```
+
+## Retry filtering
+
+`retry_on` and `dont_retry_on` on `@queue.task()` apply to async tasks:
+
+```python
+@queue.task(
+    max_retries=5,
+    retry_on=[httpx.TimeoutException],
+    dont_retry_on=[httpx.HTTPStatusError],
+)
+async def fetch_with_retry(url: str) -> dict:
+    ...
+```
+
+## Mixing sync and async tasks
+
+A single queue handles both sync and async tasks transparently. The worker
+dispatches each job to the correct pool based on the `_taskito_is_async`
+attribute set at registration time:
+
+```python
+@queue.task()
+def sync_task(x: int) -> int:
+    return x * 2
+
+@queue.task()
+async def async_task(x: int) -> int:
+    await asyncio.sleep(0.1)
+    return x * 2
+```
+
+Both are enqueued, retried, rate-limited, and monitored identically.
+
+## Feature flag
+
+The native async dispatch path is compiled into the `taskito-async` Rust
+crate and enabled via the `native-async` feature flag. Pre-built wheels on
+PyPI include it by default. If you build from source:
+
+```bash
+maturin develop --features native-async
+```
+
+Without the feature, async tasks are still enqueued and processed — they
+fall back to running via `asyncio.run()` on a worker thread.
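+
+That fallback is roughly equivalent to this (a sketch, not the actual worker
+code):
+
+```python
+import asyncio
+
+def run_async_fallback(task_fn, args, kwargs):
+    # Spins up a fresh event loop on the worker thread for each job
+    return asyncio.run(task_fn(*args, **kwargs))
+```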
+
+## Async queue methods
+
+All inspection methods have async variants that run in a thread pool:
+
+```python
+# Sync
+stats = queue.stats()
+dead = queue.dead_letters()
+new_id = queue.retry_dead(dead_id)
+cancelled = queue.cancel_job(job_id)
+result = job.result(timeout=30)
+
+# Async equivalents
+stats = await queue.astats()
+dead = await queue.adead_letters()
+new_id = await queue.aretry_dead(dead_id)
+cancelled = await queue.acancel_job(job_id)
+result = await job.aresult(timeout=30)
+```
+
+### Async worker
+
+```python
+async def main():
+    await queue.arun_worker(queues=["default"])
+
+asyncio.run(main())
+```
diff --git a/docs-next/content/docs/guides/advanced-execution/batch-enqueue.mdx b/docs-next/content/docs/guides/advanced-execution/batch-enqueue.mdx
new file mode 100644
index 0000000..dc056c2
--- /dev/null
+++ b/docs-next/content/docs/guides/advanced-execution/batch-enqueue.mdx
@@ -0,0 +1,44 @@
+---
+title: Batch Enqueue
+description: "Insert many jobs in a single SQLite transaction with task.map() and enqueue_many()."
+---
+
+Insert many jobs in a single SQLite transaction for high throughput.
+
+## `task.map()`
+
+```python
+@queue.task()
+def process(item_id):
+    return fetch_and_process(item_id)
+
+# Enqueue 1000 jobs in one transaction
+jobs = process.map([(i,) for i in range(1000)])
+```
+
+## `queue.enqueue_many()`
+
+```python
+# Basic batch — same options for all jobs
+jobs = queue.enqueue_many(
+    task_name="myapp.process",
+    args_list=[(i,) for i in range(1000)],
+    priority=5,
+    queue="processing",
+)
+
+# Full parity with enqueue() — per-job overrides
+jobs = queue.enqueue_many(
+    task_name="myapp.process",
+    args_list=[(i,) for i in range(100)],
+    delay=5.0,                                      # uniform 5s delay for all
+    unique_keys=[f"item-{i}" for i in range(100)],  # per-job dedup
+    metadata='{"source": "batch"}',                 # uniform metadata
+    expires=3600.0,                                 # expire after 1 hour
+    result_ttl=600,                                 # keep results for 10 minutes
+)
+```
+
+Per-job lists (`delay_list`, `metadata_list`, `expires_list`,
+`result_ttl_list`) override uniform values when both are provided. See the
+[API reference](/docs/api-reference/queue) for the full parameter list.
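+
+For example, a sketch combining a uniform default with a per-job override
+(`delay_list` pairs positionally with `args_list`):
+
+```python
+jobs = queue.enqueue_many(
+    task_name="myapp.process",
+    args_list=[(1,), (2,), (3,)],
+    delay=5.0,                     # uniform fallback delay
+    delay_list=[0.0, 30.0, 60.0],  # per-job values take precedence
+)
+```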
diff --git a/docs-next/content/docs/guides/advanced-execution/dependencies.mdx b/docs-next/content/docs/guides/advanced-execution/dependencies.mdx
new file mode 100644
index 0000000..952ad20
--- /dev/null
+++ b/docs-next/content/docs/guides/advanced-execution/dependencies.mdx
@@ -0,0 +1,272 @@
+---
+title: Task Dependencies
+description: "DAG-style depends_on for jobs — cascade cancel, missing-dep handling, common patterns."
+---
+
+import { Callout } from "fumadocs-ui/components/callout";
+import { Tab, Tabs } from "fumadocs-ui/components/tabs";
+
+taskito supports declaring dependencies between jobs, allowing you to build
+DAG-style workflows where a job only runs after its upstream dependencies
+have completed successfully.
+
+## Basic usage
+
+Pass `depends_on` when enqueuing a job to declare that it should wait for one
+or more other jobs to finish:
+
+<Tabs items={["Single dependency", "Multiple dependencies"]}>
+<Tab value="Single dependency">
+
+```python
+job_a = extract.delay(url)
+
+# job_b won't start until job_a completes successfully
+job_b = transform.apply_async(
+    args=(job_a.id,),
+    depends_on=job_a.id,
+)
+```
+
+</Tab>
+<Tab value="Multiple dependencies">
+
+```python
+job_a = fetch.delay("https://api1.example.com")
+job_b = fetch.delay("https://api2.example.com")
+
+# job_c waits for both job_a and job_b
+job_c = merge.apply_async(
+    args=(),
+    depends_on=[job_a.id, job_b.id],
+)
+```
+
+</Tab>
+</Tabs>
+
+The `depends_on` parameter accepts:
+
+| Value | Description |
+|---|---|
+| `str` | A single job ID |
+| `list[str]` | Multiple job IDs (all must complete) |
+| `None` (default) | No dependencies |
+
+<Callout>
+  You can also use `depends_on` with `queue.enqueue()` directly:
+
+  ```python
+  job_id = queue.enqueue(
+      task_name="myapp.tasks.merge",
+      args=(),
+      depends_on=[job_a.id, job_b.id],
+  )
+  ```
+</Callout>
+
+## How it works
+
+1. When a job with `depends_on` is enqueued, it enters a **waiting** state
+2. The scheduler periodically checks waiting jobs to see if all dependencies have completed
+3. Once every dependency has `status=completed`, the job transitions to `pending` and becomes eligible for dispatch
+4. If any dependency fails, dies, or is cancelled, the dependent job is **cascade cancelled**
+
+ W["Status: Waiting"]
+ W --> CHECK{"All deps completed?"}
+ CHECK -->|Yes| P["Status: Pending"]
+ CHECK -->|"Any dep failed/dead/cancelled"| CC["Cascade Cancel"]
+ P --> R["Dispatched to worker"]
+ CC --> DONE["Status: Cancelled
reason: dependency_failed"]`}
+/>
+
+## Cascade cancel
+
+When a dependency fails (exhausts retries and moves to DLQ), dies, or is
+cancelled, all downstream dependents are automatically cancelled. This
+propagates transitively through the entire dependency graph:
+
+```python
+job_a = step_one.delay()
+job_b = step_two.apply_async(args=(), depends_on=job_a.id)
+job_c = step_three.apply_async(args=(), depends_on=job_b.id)
+
+# If job_a fails permanently:
+# - job_b is cascade cancelled
+# - job_c is cascade cancelled (transitive)
+```
+
+<Callout>
+  As soon as a dependency enters a terminal failure state (`dead` or
+  `cancelled`), all downstream dependents are cancelled in the same scheduler
+  tick. There is no grace period.
+</Callout>
+
+## Inspecting dependencies
+
+### `job.dependencies`
+
+Returns the list of job IDs this job depends on:
+
+```python
+job_c = merge.apply_async(
+    args=(),
+    depends_on=[job_a.id, job_b.id],
+)
+
+fetched = queue.get_job(job_c.id)
+print(fetched.dependencies) # ['01H5K6X...', '01H5K7Y...']
+```
+
+### `job.dependents`
+
+Returns the list of job IDs that depend on this job:
+
+```python
+fetched_a = queue.get_job(job_a.id)
+print(fetched_a.dependents) # ['01H5K8Z...'] (job_c's ID)
+```
+
+## Error handling
+
+### Missing dependencies
+
+If you reference a job ID that does not exist, enqueue raises a `ValueError`:
+
+```python
+try:
+    job = transform.apply_async(
+        args=(),
+        depends_on="nonexistent-job-id",
+    )
+except ValueError as e:
+    print(e)  # "Dependency job 'nonexistent-job-id' not found"
+```
+
+### Already-dead dependencies
+
+If a dependency is already in a terminal failure state (`dead` or
+`cancelled`) at enqueue time, the dependent job is immediately cancelled:
+
+```python
+dead_job = queue.get_job(some_dead_id)
+assert dead_job.status == "dead"
+
+# This job is immediately cancelled — it will never run
+job = transform.apply_async(
+    args=(),
+    depends_on=dead_job.id,
+)
+
+fetched = queue.get_job(job.id)
+print(fetched.status) # "cancelled"
+```
+
+## DAG workflow examples
+
+### Diamond pattern
+
+A classic diamond dependency graph where two branches converge:
+
+ B["transform_a.apply_async(depends_on=A)"]
+ A --> C["transform_b.apply_async(depends_on=A)"]
+ B --> D["load.apply_async(depends_on=[B, C])"]
+ C --> D`}
+/>
+
+```python
+# Extract
+job_a = extract.delay(source_url)
+
+# Two parallel transforms, each depending on extract
+job_b = transform_a.apply_async(
+ args=("schema_a",),
+ depends_on=job_a.id,
+)
+job_c = transform_b.apply_async(
+ args=("schema_b",),
+ depends_on=job_a.id,
+)
+
+# Load waits for both transforms
+job_d = load.apply_async(
+ args=(),
+ depends_on=[job_b.id, job_c.id],
+)
+```
+
+### Multi-stage pipeline
+
+A sequential pipeline with fan-out at one stage:
+
+```python
+# Stage 1: Download
+download_jobs = [
+    download.delay(url) for url in urls
+]
+
+# Stage 2: Process each download (each depends on its own download)
+process_jobs = [
+    process.apply_async(
+        args=(url,),
+        depends_on=dl.id,
+    )
+    for dl, url in zip(download_jobs, urls)
+]
+
+# Stage 3: Aggregate all results
+aggregate_job = aggregate.apply_async(
+    args=(),
+    depends_on=[j.id for j in process_jobs],
+)
+```
+
+### Conditional branching
+
+Combine dependencies with metadata to build conditional workflows:
+
+```python
+job_a = validate.delay(data)
+
+# Both branches depend on validation
+job_success = on_valid.apply_async(
+    args=(data,),
+    depends_on=job_a.id,
+    metadata='{"branch": "success"}',
+)
+
+# Use a separate task to handle the "validation failed" path
+# by inspecting job_a's result in the task body
+```
+
+<Callout>
+  `depends_on` is a lower-level primitive than chains, groups, and chords.
+  Use `depends_on` when you need fine-grained control over a custom DAG. Use
+  the workflow primitives when your pipeline fits a standard pattern.
+</Callout>
+
+## Combining with other features
+
+Dependencies compose naturally with other taskito features:
+
+```python
+job = transform.apply_async(
+    args=(data,),
+    depends_on=job_a.id,
+    priority=10,                   # High priority once unblocked
+    queue="processing",            # Target queue
+    max_retries=5,                 # Retry policy
+    delay=60,                      # Additional delay after deps resolve
+    unique_key="transform-daily",  # Deduplication
+)
+```
+
+<Callout>
+  When both `delay` and `depends_on` are set, the job first waits for all
+  dependencies to complete, then waits for the additional delay before
+  becoming eligible for dispatch.
+</Callout>
diff --git a/docs-next/content/docs/guides/advanced-execution/index.mdx b/docs-next/content/docs/guides/advanced-execution/index.mdx
index c73957a..d0849f1 100644
--- a/docs-next/content/docs/guides/advanced-execution/index.mdx
+++ b/docs-next/content/docs/guides/advanced-execution/index.mdx
@@ -1,10 +1,15 @@
---
title: Advanced execution
-description: "Async tasks, native async, prefork pool, batching."
+description: "Patterns for scaling and optimizing task execution."
---
-import { Callout } from 'fumadocs-ui/components/callout';
-
-<Callout>
-  Content port pending. See the [Zensical source](https://github.com/ByteVeda/taskito/tree/master/docs) for current text.
-</Callout>
+Patterns for scaling and optimizing task execution.
+
+| Guide | Description |
+|---|---|
+| [Prefork Pool](/docs/guides/advanced-execution/prefork) | Process-based isolation for CPU-bound or memory-leaking tasks |
+| [Native Async Tasks](/docs/guides/advanced-execution/async-tasks) | `async def` tasks with native event loop integration |
+| [Result Streaming](/docs/guides/advanced-execution/streaming) | Stream partial results and progress updates in real time |
+| [Dependencies](/docs/guides/advanced-execution/dependencies) | DAG-based job dependencies — run tasks in order |
+| [Batch Enqueue](/docs/guides/advanced-execution/batch-enqueue) | Enqueue many jobs efficiently with `task.map()` and `enqueue_many()` |
+| [Unique Tasks](/docs/guides/advanced-execution/unique-tasks) | Deduplicate active jobs with unique keys |
diff --git a/docs-next/content/docs/guides/advanced-execution/meta.json b/docs-next/content/docs/guides/advanced-execution/meta.json
index 0b863a5..f336679 100644
--- a/docs-next/content/docs/guides/advanced-execution/meta.json
+++ b/docs-next/content/docs/guides/advanced-execution/meta.json
@@ -1,4 +1,12 @@
{
"title": "Advanced execution",
- "pages": ["index"]
+ "pages": [
+ "index",
+ "prefork",
+ "async-tasks",
+ "streaming",
+ "dependencies",
+ "batch-enqueue",
+ "unique-tasks"
+ ]
}
diff --git a/docs-next/content/docs/guides/advanced-execution/prefork.mdx b/docs-next/content/docs/guides/advanced-execution/prefork.mdx
new file mode 100644
index 0000000..4427edb
--- /dev/null
+++ b/docs-next/content/docs/guides/advanced-execution/prefork.mdx
@@ -0,0 +1,106 @@
+---
+title: Prefork Worker Pool
+description: "Spawn child processes for true CPU parallelism — each child has its own GIL."
+---
+
+Spawn separate child processes for true CPU parallelism. Each child has its
+own Python GIL, so CPU-bound tasks don't block each other.
+
+## When to use
+
+| Workload | Recommended pool | Why |
+|----------|-----------------|-----|
+| I/O-bound (HTTP calls, DB queries) | `thread` (default) | Threads release the GIL during I/O |
+| CPU-bound (data processing, ML) | `prefork` | Each process owns its GIL |
+| Mixed workloads | `prefork` | CPU tasks benefit; I/O tasks work fine too |
+
+## Getting started
+
+```python
+queue = Queue(db_path="myapp.db", workers=4)
+queue.run_worker(pool="prefork", app="myapp:queue")
+```
+
+Or from the CLI:
+
+```bash
+taskito worker --app myapp:queue --pool prefork
+```
+
+The `app` parameter tells each child process how to import your Queue
+instance. It must be an importable path in `module:attribute` format.
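+
+For example, with a module like this, `--app myapp:queue` resolves to the
+`queue` attribute of `myapp.py`:
+
+```python
+# myapp.py
+from taskito import Queue
+
+queue = Queue(db_path="myapp.db", workers=4)
+
+@queue.task()
+def crunch(n: int) -> int:
+    return n * n
+```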
+
+## How it works
+
+```mermaid
+graph LR
+  S["Scheduler (Rust)"] -->|"Job JSON"| P["PreforkPool"]
+  P -->|stdin| C1["Child 1 (own GIL)"]
+  P -->|stdin| C2["Child 2 (own GIL)"]
+  P -->|stdin| CN["Child N (own GIL)"]
+
+  C1 -->|stdout| R1["Reader 1"]
+  C2 -->|stdout| R2["Reader 2"]
+  CN -->|stdout| RN["Reader N"]
+
+  R1 -->|JobResult| RCH["Result Channel"]
+  R2 -->|JobResult| RCH
+  RN -->|JobResult| RCH
+
+  RCH --> ML["Result Handler"]
+  ML -->|"complete / retry / DLQ"| DB[("Storage")]
+```
+
+1. The Rust scheduler dequeues jobs from storage
+2. `PreforkPool` serializes each job as JSON and writes it to the least-loaded child's stdin pipe
+3. Each child deserializes the job, executes the task wrapper (with middleware, resources, proxies), and writes the result as JSON to stdout
+4. Reader threads parse results and feed them back to the scheduler
+5. The scheduler updates job status in storage
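+
+A rough sketch of the child side of that loop (illustrative; `run_task` and
+the exact JSON keys are stand-ins, not taskito's real protocol):
+
+```python
+import json
+import sys
+
+for line in sys.stdin:                # one JSON-encoded job per line
+    job = json.loads(line)
+    outcome = run_task(job["task"], job["args"])  # task wrapper: middleware, resources
+    sys.stdout.write(json.dumps({"id": job["id"], "result": outcome}) + "\n")
+    sys.stdout.flush()                # the parent's reader thread picks this up
+```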
+
+## Configuration
+
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `pool` | `str` | `"thread"` | Set to `"prefork"` to enable |
+| `app` | `str` | — | Import path to Queue instance (required) |
+| `workers` | `int` | CPU count | Number of child processes |
+
+## Migrating from thread pool
+
+The thread pool is the default. To switch to prefork:
+
+```python
+# Before (thread pool)
+queue.run_worker()
+
+# After (prefork)
+queue.run_worker(pool="prefork", app="myapp:queue")
+```
+
+Everything else stays the same — task decorators, middleware, resources,
+events, and the scheduler all work identically. The only difference is where
+task code executes (child process vs. worker thread).
+
+## Debugging child processes
+
+Children inherit the parent's stderr, so `print()` statements and Python
+logging appear in the parent's terminal.
+
+Enable debug logging to see child lifecycle events:
+
+```python
+import logging
+logging.getLogger("taskito.prefork.child").setLevel(logging.DEBUG)
+```
+
+Log output includes:
+- `child ready (app=..., pid=...)` — child initialized and waiting for jobs
+- `executing task_name[job_id]` — job received (DEBUG level)
+- `task task_name[job_id] failed: ...` — task error
+- `shutdown received` — clean shutdown
+- `resource teardown error` — resource cleanup failure
+
+## Limitations
+
+- **Tasks must be importable**: Each child process imports the app module independently. Tasks defined inside functions or closures cannot be imported.
+- **No shared state**: Children are separate processes. In-memory caches, globals, or module-level state are not shared between children.
+- **Startup cost**: Each child imports the full app module on start. This happens once per child, not per job.
+- **Resource re-initialization**: Worker resources (DB connections, etc.) are initialized independently in each child.
diff --git a/docs-next/content/docs/guides/advanced-execution/streaming.mdx b/docs-next/content/docs/guides/advanced-execution/streaming.mdx
new file mode 100644
index 0000000..29a818a
--- /dev/null
+++ b/docs-next/content/docs/guides/advanced-execution/streaming.mdx
@@ -0,0 +1,135 @@
+---
+title: Result Streaming
+description: "Stream partial results from long-running tasks via current_job.publish() and job.stream()."
+---
+
+Stream intermediate results from long-running tasks. Instead of waiting for
+the final result, consumers receive partial data as it becomes available.
+
+## Publishing partial results
+
+Inside a task, call `current_job.publish(data)` to emit a partial result:
+
+```python
+from taskito import current_job
+
+@queue.task()
+def process_batch(items):
+    results = []
+    for i, item in enumerate(items):
+        result = process(item)
+        results.append(result)
+        current_job.publish({"item_id": item.id, "status": "ok", "result": result})
+        current_job.update_progress(int((i + 1) / len(items) * 100))
+    return {"total": len(items), "results": results}
+```
+
+`publish()` accepts any JSON-serializable value — dicts, lists, strings, numbers.
+
+## Consuming with `stream()`
+
+The caller iterates over partial results as they arrive:
+
+```python
+job = process_batch.delay(items)
+
+for partial in job.stream(timeout=120, poll_interval=0.5):
+ print(f"Processed item {partial['item_id']}: {partial['status']}")
+
+# After stream ends, get the final result
+final = job.result(timeout=5)
+```
+
+`stream()` polls the database for new partial results, yields each one, and
+stops when the job reaches a terminal state (complete, failed, dead,
+cancelled).
+
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `timeout` | `float` | `60.0` | Maximum seconds to wait |
+| `poll_interval` | `float` | `0.5` | Seconds between polls |
+
+## Async streaming
+
+Use `astream()` in async contexts:
+
+```python
+async for partial in job.astream(timeout=120, poll_interval=0.5):
+ print(f"Got: {partial}")
+```
+
+## FastAPI SSE
+
+The built-in FastAPI progress endpoint supports streaming partial results:
+
+```
+GET /jobs/{job_id}/progress?include_results=true
+```
+
+Events include partial results alongside progress:
+
+```
+data: {"status": "running", "progress": 25}
+data: {"status": "running", "progress": 25, "partial_result": {"item_id": 1, "status": "ok"}}
+data: {"status": "running", "progress": 50}
+data: {"status": "complete", "progress": 100}
+```
+
+## Patterns
+
+### ETL pipeline
+
+```python
+@queue.task()
+def etl_pipeline(source_tables):
+    for table in source_tables:
+        rows = extract(table)
+        transformed = transform(rows)
+        load(transformed)
+        current_job.publish({
+            "table": table,
+            "rows_processed": len(rows),
+            "status": "loaded",
+        })
+    return {"tables": len(source_tables)}
+```
+
+### ML training
+
+```python
+@queue.task()
+def train_model(config):
+    model = build_model(config)
+    for epoch in range(config["epochs"]):
+        loss = train_epoch(model)
+        current_job.publish({
+            "epoch": epoch + 1,
+            "loss": float(loss),
+            "lr": optimizer.param_groups[0]["lr"],
+        })
+    save_model(model)
+    return {"final_loss": float(loss)}
+```
+
+### Batch processing with error tracking
+
+```python
+@queue.task()
+def process_orders(order_ids):
+    for oid in order_ids:
+        try:
+            process_order(oid)
+            current_job.publish({"order_id": oid, "status": "ok"})
+        except Exception as e:
+            current_job.publish({"order_id": oid, "status": "error", "error": str(e)})
+    return {"total": len(order_ids)}
+```
+
+## How it works
+
+`publish()` stores data as a task log entry with `level="result"`, reusing
+the existing `task_logs` table. No new tables or Rust changes are needed.
+
+`stream()` polls `get_task_logs(job_id)`, filters for `level == "result"`,
+tracks the last-seen timestamp, and yields only new entries. It stops when
+the job's status becomes terminal.
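+
+A simplified sketch of that loop (illustrative; assumes log entries expose
+`level`, `timestamp`, and `data` fields):
+
+```python
+import time
+
+def iter_partials(queue, job_id, poll_interval=0.5):
+    terminal = {"complete", "failed", "dead", "cancelled"}
+    last_seen = 0.0
+    while True:
+        for entry in queue.get_task_logs(job_id):
+            if entry["level"] == "result" and entry["timestamp"] > last_seen:
+                last_seen = entry["timestamp"]
+                yield entry["data"]    # one partial result
+        if queue.get_job(job_id).status in terminal:
+            break
+        time.sleep(poll_interval)
+```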
diff --git a/docs-next/content/docs/guides/advanced-execution/unique-tasks.mdx b/docs-next/content/docs/guides/advanced-execution/unique-tasks.mdx
new file mode 100644
index 0000000..ef4c38b
--- /dev/null
+++ b/docs-next/content/docs/guides/advanced-execution/unique-tasks.mdx
@@ -0,0 +1,27 @@
+---
+title: Unique Tasks
+description: "Deduplicate active jobs via unique_key — partial unique index, atomic check-and-insert."
+---
+
+import { Callout } from "fumadocs-ui/components/callout";
+
+Deduplicate active jobs by key — if a job with the same `unique_key` is
+already pending or running, the existing job is returned instead of creating
+a new one:
+
+```python
+job1 = process.apply_async(args=("report",), unique_key="daily-report")
+job2 = process.apply_async(args=("report",), unique_key="daily-report")
+assert job1.id == job2.id # Same job, not duplicated
+```
+
+Once the original job completes (or fails to DLQ), the key is released and a
+new job can be created with the same key.
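+
+For example:
+
+```python
+job1.result(timeout=30)  # wait for the original job to finish
+
+# The key is free again, so a new job is created this time
+job3 = process.apply_async(args=("report",), unique_key="daily-report")
+assert job3.id != job1.id
+```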
+
+<Callout>
+  Deduplication uses a partial unique index:
+  `CREATE UNIQUE INDEX ... ON jobs(unique_key) WHERE unique_key IS NOT NULL AND status IN (0, 1)`.
+  Only pending and running jobs participate. The check-and-insert is atomic
+  (transaction-protected), so concurrent calls with the same `unique_key`
+  are handled gracefully without race conditions.
+</Callout>
diff --git a/docs-next/content/docs/guides/operations/deployment.mdx b/docs-next/content/docs/guides/operations/deployment.mdx
new file mode 100644
index 0000000..ab05a51
--- /dev/null
+++ b/docs-next/content/docs/guides/operations/deployment.mdx
@@ -0,0 +1,380 @@
+---
+title: Deployment
+description: "systemd, Docker, WAL mode, backups, multi-worker, sizing, production checklist."
+---
+
+import { Callout } from "fumadocs-ui/components/callout";
+
+This guide covers running taskito in production environments.
+
+## SQLite file location
+
+Choose a persistent, backed-up location for your database:
+
+```python
+queue = Queue(db_path="/var/lib/myapp/taskito.db")
+```
+
+**Best practices:**
+
+- Use an absolute path — relative paths depend on the working directory
+- Place the database on local storage (not NFS or network mounts) — SQLite file locking doesn't work reliably over network filesystems
+- Ensure the directory exists and the worker process has read/write permissions
+- The database file, WAL file (`taskito.db-wal`), and shared memory file (`taskito.db-shm`) must all be on the same filesystem
+
+## systemd service
+
+Create `/etc/systemd/system/taskito-worker.service`:
+
+```ini
+[Unit]
+Description=taskito worker
+After=network.target
+
+[Service]
+Type=simple
+User=myapp
+Group=myapp
+WorkingDirectory=/opt/myapp
+ExecStart=/opt/myapp/.venv/bin/taskito worker --app myapp:queue
+Restart=always
+RestartSec=5
+
+# Graceful shutdown — taskito handles SIGINT
+KillSignal=SIGINT
+TimeoutStopSec=35
+
+# Environment
+Environment=PYTHONPATH=/opt/myapp
+
+[Install]
+WantedBy=multi-user.target
+```
+
+```bash
+sudo systemctl daemon-reload
+sudo systemctl enable taskito-worker
+sudo systemctl start taskito-worker
+
+# Check logs
+journalctl -u taskito-worker -f
+```
+
+<Callout>
+  Set `TimeoutStopSec` to slightly longer than your longest task timeout
+  (default graceful shutdown is 30s). This gives in-flight tasks time to
+  complete before systemd force-kills the process.
+</Callout>
+
+## Docker
+
+### Dockerfile
+
+```dockerfile
+FROM python:3.12-slim
+
+WORKDIR /app
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY . .
+
+# Store the database in a volume
+VOLUME /data
+ENV TASKITO_DB_PATH=/data/taskito.db
+
+CMD ["taskito", "worker", "--app", "myapp:queue"]
+```
+
+### docker-compose.yml
+
+```yaml
+services:
+  worker:
+    build: .
+    volumes:
+      - taskito-data:/data
+    stop_signal: SIGINT
+    stop_grace_period: 35s
+
+  dashboard:
+    build: .
+    command: taskito dashboard --app myapp:queue --host 0.0.0.0
+    volumes:
+      - taskito-data:/data
+    ports:
+      - "8080:8080"
+
+volumes:
+  taskito-data:
+```
+
+<Callout>
+  The worker and dashboard must access the **same SQLite file**. In Docker,
+  use a named volume shared between containers. Do not use bind mounts on
+  network storage.
+</Callout>
+
+### Graceful shutdown in containers
+
+taskito handles `SIGINT` for graceful shutdown. Configure your container
+orchestrator to send `SIGINT` (not `SIGTERM`):
+
+- **Docker Compose**: `stop_signal: SIGINT`
+- **Kubernetes**: use a `preStop` hook or configure `STOPSIGNAL` in the Dockerfile:
+
+```dockerfile
+STOPSIGNAL SIGINT
+```
+
+For Kubernetes, set `terminationGracePeriodSeconds` to match your longest
+task timeout:
+
+```yaml
+spec:
+  terminationGracePeriodSeconds: 60
+  containers:
+    - name: worker
+      ...
+```
+
+## WAL mode and backups
+
+taskito uses SQLite in WAL (Write-Ahead Logging) mode for concurrent
+read/write access. This affects how you back up the database.
+
+**Do NOT** simply copy the `.db` file while the worker is running — you may
+get a corrupted backup if the WAL hasn't been checkpointed.
+
+**Safe backup methods:**
+
+```bash
+# Option 1: Use sqlite3 .backup command (safe, online)
+sqlite3 /var/lib/myapp/taskito.db ".backup /backups/taskito-$(date +%Y%m%d).db"
+
+# Option 2: Use the SQLite VACUUM INTO command
+sqlite3 /var/lib/myapp/taskito.db "VACUUM INTO '/backups/taskito-$(date +%Y%m%d).db';"
+```
+
+Both methods are safe while the worker is running.
+
+## Postgres deployment
+
+If you're using the [Postgres backend](/docs/guides/operations/postgres),
+deployment is simpler in several ways:
+
+- **No shared-file constraints** — workers connect over the network, no need for shared volumes or local storage
+- **Multi-machine workers** — run workers on separate hosts against the same database
+- **Standard backups** — use `pg_dump` instead of `sqlite3 .backup`
+
+### Docker Compose with Postgres
+
+```yaml
+services:
+  postgres:
+    image: postgres:16
+    environment:
+      POSTGRES_DB: myapp
+      POSTGRES_USER: taskito
+      POSTGRES_PASSWORD: secret
+    volumes:
+      - pgdata:/var/lib/postgresql/data
+
+  worker:
+    build: .
+    environment:
+      TASKITO_BACKEND: postgres
+      TASKITO_DB_URL: postgresql://taskito:secret@postgres:5432/myapp
+    depends_on:
+      - postgres
+    stop_signal: SIGINT
+    stop_grace_period: 35s
+
+volumes:
+  pgdata:
+```
+
+### Postgres backups
+
+```bash
+# Dump the taskito schema
+pg_dump -h localhost -U taskito -d myapp -n taskito > backup.sql
+
+# Restore
+psql -h localhost -U taskito -d myapp < backup.sql
+```
+
+See the [Postgres Backend guide](/docs/guides/operations/postgres) for full
+configuration details.
+
+## Database maintenance
+
+### Auto-cleanup
+
+Set `result_ttl` to automatically purge old completed jobs:
+
+```python
+queue = Queue(
+ db_path="/var/lib/myapp/taskito.db",
+ result_ttl=86400, # Purge completed/dead jobs older than 24 hours
+)
+```
+
+### Manual cleanup
+
+```python
+# Purge completed jobs older than 7 days
+queue.purge_completed(older_than=604800)
+
+# Purge dead letters older than 30 days
+queue.purge_dead(older_than=2592000)
+```
+
+### Database size
+
+SQLite databases grow as jobs accumulate. Without cleanup, expect roughly:
+
+- ~1 KB per job (metadata + small payloads)
+- ~1-10 KB per job with large arguments or results
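+
+At those rates, a backlog of one million retained jobs occupies roughly
+1–10 GB, so cleanup matters at sustained volume.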
+
+With `result_ttl` set, the database stays compact. You can also periodically
+run `VACUUM` to reclaim space:
+
+```bash
+sqlite3 /var/lib/myapp/taskito.db "VACUUM;"
+```
+
+<Callout>
+  `VACUUM` rewrites the entire database and requires exclusive access. Run
+  it during low-traffic periods or during a maintenance window.
+</Callout>
+
+## Monitoring in production
+
+### Dashboard
+
+Run the built-in dashboard alongside the worker:
+
+```bash
+taskito dashboard --app myapp:queue --host 0.0.0.0 --port 8080
+```
+
+Place it behind a reverse proxy with authentication for production use —
+the dashboard has no built-in auth.
+
+### Programmatic stats
+
+Poll `queue.stats()` and export to your monitoring system:
+
+```python
+import time
+
+def export_metrics():
+    while True:
+        stats = queue.stats()
+        # Export to Prometheus, Datadog, StatsD, etc.
+        # (`gauge` stands in for your metrics client's gauge function)
+        gauge("taskito.pending", stats["pending"])
+        gauge("taskito.running", stats["running"])
+        gauge("taskito.dead", stats["dead"])
+        time.sleep(15)
+```
+
+### Hooks for alerting
+
+```python
+@queue.on_failure
+def alert_on_failure(task_name, args, kwargs, error):
+    # Send to PagerDuty, Slack, email, etc.
+    notify(f"Task {task_name} failed: {error}")
+```
+
+### Health check endpoint
+
+If you're using FastAPI:
+
+```python
+from fastapi import FastAPI
+from taskito.contrib.fastapi import TaskitoRouter
+
+app = FastAPI()
+app.include_router(TaskitoRouter(queue), prefix="/tasks")
+
+# GET /tasks/stats returns queue health
+# Use this as a health check endpoint in your load balancer
+```
+
+## Multiple workers
+
+taskito is designed as a **single-process** task queue when using SQLite.
+Running multiple worker processes against the same SQLite file is possible
+(WAL mode allows concurrent access), but:
+
+- Only one process can write at a time — this limits throughput
+- SQLite lock contention increases with more writers
+- There is no distributed coordination between workers
+
+For most single-machine workloads, one worker process with multiple threads
+(the default) is sufficient:
+
+```python
+queue = Queue(
+ db_path="myapp.db",
+ workers=8, # 8 OS threads in the worker pool
+)
+```
+
+If you need distributed workers across multiple machines, use the
+[Postgres backend](/docs/guides/operations/postgres) which removes the
+single-writer constraint and supports multi-machine deployments.
+
+## SQLite scaling limits
+
+taskito uses SQLite as its storage backend. Understanding its limitations
+helps you plan for production:
+
+**Single-writer constraint.** SQLite allows only one write transaction at a
+time. WAL mode lets reads proceed concurrently with writes, but all writes
+are serialized. This is the primary throughput ceiling.
+
+**Expected throughput.** On modern hardware with an SSD, expect:
+
+- **1,000–5,000 jobs/second** for enqueue + dequeue cycles (small payloads)
+- Throughput decreases with larger payloads, complex queries, or spinning disks
+- The connection pool size (default: 8) controls read concurrency — tune it based on your read/write ratio
+
+**When to upgrade to Postgres:**
+
+- You need multi-machine distributed workers
+- You consistently exceed ~5,000 jobs/second sustained throughput
+- Multiple processes contend heavily for writes (high lock wait times)
+- You need sub-millisecond dequeue latency under high load
+
+taskito's [Postgres backend](/docs/guides/operations/postgres) addresses all
+of these limitations while keeping the same API.
+
+## Sizing your deployment
+
+| Throughput | Backend | Workers | Pool | Notes |
+|-----------|---------|---------|------|-------|
+| < 100 jobs/s | SQLite | 4 | thread | Default config works fine |
+| 100–1K jobs/s | SQLite | 8–16 | thread or prefork | Increase `workers`, monitor WAL size |
+| 1K–5K jobs/s | SQLite | 16 | prefork | Prefork for CPU-bound; SQLite handles this well with WAL |
+| 5K–20K jobs/s | Postgres | 16–32 | prefork | Switch to Postgres for concurrent writers |
+| 20K–50K jobs/s | Postgres | 32+ | prefork | Multiple worker processes, tune `pool_size` |
+| > 50K jobs/s | — | — | — | Consider Celery + RabbitMQ for this scale |
+
+<Callout>
+  These are rough guidelines for noop tasks. Real throughput depends on task
+  duration, payload size, and I/O patterns.
+</Callout>
+
+## Checklist
+
+- [ ] Use an absolute path for `db_path`
+- [ ] Place SQLite on local (not network) storage
+- [ ] Set `result_ttl` to prevent unbounded database growth
+- [ ] Set `timeout` on tasks to recover from worker crashes
+- [ ] Configure `SIGINT` as the stop signal in your process manager
+- [ ] Set up failure hooks or monitoring for alerting
+- [ ] Back up the database using `sqlite3 .backup` (not file copy), or `pg_dump` for Postgres
+- [ ] Place the dashboard behind a reverse proxy with authentication
diff --git a/docs-next/content/docs/guides/operations/index.mdx b/docs-next/content/docs/guides/operations/index.mdx
index 9328ba7..73456e9 100644
--- a/docs-next/content/docs/guides/operations/index.mdx
+++ b/docs-next/content/docs/guides/operations/index.mdx
@@ -1,10 +1,16 @@
---
title: Operations
-description: "Workers, pause/resume, namespacing, scaling, deployment."
+description: "Run taskito reliably in production — testing, deployment, scaling, migration."
---
-import { Callout } from 'fumadocs-ui/components/callout';
-
-<Callout>
-  Content port pending. See the [Zensical source](https://github.com/ByteVeda/taskito/tree/master/docs) for current text.
-</Callout>
+Run taskito reliably in production.
+
+| Guide | Description |
+|---|---|
+| [Testing](/docs/guides/operations/testing) | Test mode, fixtures, mocking resources, and workflow testing |
+| [Job Management](/docs/guides/operations/job-management) | Cancel, pause, archive, revoke, replay, and clean up jobs |
+| [Troubleshooting](/docs/guides/operations/troubleshooting) | Diagnose stuck jobs, lock contention, and worker issues |
+| [Deployment](/docs/guides/operations/deployment) | systemd, Docker, WAL mode, Postgres, and production checklists |
+| [KEDA Autoscaling](/docs/guides/operations/keda) | Kubernetes event-driven autoscaling for workers |
+| [Postgres Backend](/docs/guides/operations/postgres) | Set up and run taskito with PostgreSQL |
+| [Migrating from Celery](/docs/guides/operations/migration) | Side-by-side comparison and step-by-step migration guide |
diff --git a/docs-next/content/docs/guides/operations/job-management.mdx b/docs-next/content/docs/guides/operations/job-management.mdx
new file mode 100644
index 0000000..7618939
--- /dev/null
+++ b/docs-next/content/docs/guides/operations/job-management.mdx
@@ -0,0 +1,203 @@
+---
+title: Job Management
+description: "Cancel, pause, archive, revoke, replay, purge, and clean up jobs."
+---
+
+import { Callout } from "fumadocs-ui/components/callout";
+
+Manage running jobs — cancel, pause queues, archive, revoke, replay, and clean up.
+
+## Job cancellation
+
+Cancel a pending job before it starts:
+
+```python
+job = send_email.delay("user@example.com", "Hello", "World")
+cancelled = queue.cancel_job(job.id) # True if was pending
+```
+
+- Returns `True` if the job was pending and is now cancelled
+- Returns `False` if the job was already running, completed, or in another non-pending state
+- Cancelled jobs cannot be un-cancelled
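+
+For example:
+
+```python
+if not queue.cancel_job(job.id):
+    # Too late: the job already started, finished, or was cancelled earlier
+    print(f"could not cancel {job.id}")
+```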
+
+## Result TTL & auto-cleanup
+
+### Manual cleanup
+
+```python
+# Purge completed jobs older than 1 hour
+deleted = queue.purge_completed(older_than=3600)
+
+# Purge dead letters older than 24 hours
+deleted = queue.purge_dead(older_than=86400)
+```
+
+### Automatic cleanup
+
+Set `result_ttl` on the Queue to automatically purge old jobs while the
+worker runs:
+
+```python
+queue = Queue(
+ db_path="myapp.db",
+ result_ttl=3600, # Auto-purge completed/dead jobs older than 1 hour
+)
+```
+
+The scheduler checks every ~60 seconds and purges:
+
+- Completed jobs older than `result_ttl`
+- Dead letter entries older than `result_ttl`
+- Error history records older than `result_ttl`
+
+Set to `None` (default) to disable auto-cleanup.
+
+### Cascade cleanup
+
+When jobs are purged — either manually via `purge_completed()` or
+automatically via `result_ttl` — their related child records are also deleted:
+
+- Error history (`job_errors`)
+- Task logs (`task_logs`)
+- Task metrics (`task_metrics`)
+- Job dependencies (`job_dependencies`)
+- Replay history (`replay_history`)
+
+This prevents orphaned records from accumulating when parent jobs are
+removed.
+
+```python
+# Manual purge — child records are cleaned up automatically
+deleted = queue.purge_completed(older_than=3600)
+print(f"Purged {deleted} jobs and their related records")
+
+# With per-job TTL — cascade cleanup still applies
+job = resize_image.apply_async(
+ args=("photo.jpg",),
+ result_ttl=600, # This job's results expire after 10 minutes
+)
+# When this job is purged (after 10 min), its errors, logs,
+# metrics, dependencies, and replay history are also removed.
+```
+
+<Callout>
+  Dead letter entries are **not** cascade-deleted — they have their own
+  lifecycle managed by `purge_dead()`. Timestamp-based cleanup (`result_ttl`)
+  of error history, logs, and metrics also continues to run independently,
+  catching old records regardless of whether the parent job still exists.
+</Callout>
+
+## Queue pause/resume
+
+Temporarily pause job processing on a queue without stopping the worker:
+
+```python
+# Pause the "emails" queue
+queue.pause("emails")
+
+# Check which queues are paused
+print(queue.paused_queues()) # ["emails"]
+
+# Resume processing
+queue.resume("emails")
+```
+
+Paused queues still accept new jobs — they just won't be dequeued until
+resumed.
+
+### Maintenance window example
+
+```python
+# Before maintenance: pause all queues
+for q in ["default", "emails", "reports"]:
+    queue.pause(q)
+print(f"Paused: {queue.paused_queues()}")
+
+# ... perform maintenance ...
+
+# After maintenance: resume all queues
+for q in ["default", "emails", "reports"]:
+    queue.resume(q)
+```
+
+## Job archival
+
+Move old completed jobs to an archive table to keep the main jobs table lean:
+
+```python
+# Archive completed jobs older than 24 hours
+archived_count = queue.archive(older_than=86400)
+print(f"Archived {archived_count} jobs")
+
+# Browse archived jobs
+archived = queue.list_archived(limit=50, offset=0)
+for job in archived:
+ print(f"{job.id}: {job.task_name} ({job.status})")
+```
+
+Archived jobs are no longer returned by `queue.stats()` or
+`queue.list_jobs()`, but remain queryable via `queue.list_archived()`.
+
+### Scheduled archival
+
+```python
+@queue.periodic(cron="0 0 2 * * *") # Daily at 2 AM
+def nightly_archival():
+    archived = queue.archive(older_than=7 * 86400)  # Archive jobs older than 7 days
+    current_job.log(f"Archived {archived} jobs")
+```
+
+## Task revocation
+
+Cancel all pending jobs for a specific task:
+
+```python
+# Revoke all pending "send_newsletter" jobs
+cancelled = queue.revoke_task("myapp.tasks.send_newsletter")
+print(f"Revoked {cancelled} jobs")
+```
+
+## Queue purge
+
+Remove all pending jobs from a specific queue:
+
+```python
+purged = queue.purge("emails")
+print(f"Purged {purged} jobs from the emails queue")
+```
+
+## Job replay
+
+Replay a completed or dead job with the same arguments:
+
+```python
+new_job = queue.replay(job_id)
+print(f"Replayed as {new_job.id}")
+
+# Check replay history
+history = queue.replay_history(job_id)
+```
+
+### Retry from dead letter with replay
+
+```python
+# List dead letters and replay them
+dead = queue.dead_letters()
+for entry in dead:
+ print(f"Replaying dead job {entry['original_job_id']}: {entry['task_name']}")
+ new_id = queue.retry_dead(entry["id"])
+ print(f" -> New job: {new_id}")
+```
+
+## SQLite configuration
+
+taskito configures SQLite for optimal performance:
+
+| Pragma | Value | Purpose |
+|---|---|---|
+| `journal_mode` | WAL | Concurrent reads during writes |
+| `busy_timeout` | 5000ms | Wait instead of failing on lock contention |
+| `synchronous` | NORMAL | Balance between safety and speed |
+| `journal_size_limit` | 64MB | Prevent unbounded WAL growth |
+
+The connection pool uses up to 8 connections via `r2d2`.
diff --git a/docs-next/content/docs/guides/operations/keda.mdx b/docs-next/content/docs/guides/operations/keda.mdx
new file mode 100644
index 0000000..17950ec
--- /dev/null
+++ b/docs-next/content/docs/guides/operations/keda.mdx
@@ -0,0 +1,215 @@
+---
+title: KEDA Autoscaling
+description: "Kubernetes event-driven autoscaling for taskito workers via the bundled scaler server."
+---
+
+import { Callout } from "fumadocs-ui/components/callout";
+
+[KEDA](https://keda.sh) (Kubernetes Event-driven Autoscaling) can scale your
+taskito worker deployment up and down based on queue depth. taskito ships a
+dedicated scaler server that KEDA queries directly.
+
+## Scaler server
+
+Start the scaler alongside your worker:
+
+```bash
+taskito scaler --app myapp:queue --port 9091
+```
+
+| Flag | Default | Description |
+|---|---|---|
+| `--app` | — | Python path to the `Queue` instance |
+| `--host` | `0.0.0.0` | Bind address |
+| `--port` | `9091` | Bind port |
+| `--target-queue-depth` | `10` | Scaling target hint returned to KEDA |
+
+The scaler exposes three endpoints:
+
+| Endpoint | Description |
+|---|---|
+| `GET /api/scaler` | Returns current queue depth and scaling target for KEDA |
+| `GET /metrics` | Prometheus text format (requires `prometheus-client`) |
+| `GET /health` | Liveness check — always returns `{"status": "ok"}` |
+
+### `/api/scaler` response
+
+```json
+{
+ "metricValue": 42,
+ "targetValue": 10,
+ "queueName": "default"
+}
+```
+
+Filter to a specific queue:
+
+```
+GET /api/scaler?queue=emails
+```
+
+### Programmatic usage
+
+```python
+from taskito.scaler import serve_scaler
+
+serve_scaler(queue, host="0.0.0.0", port=9091, target_queue_depth=10)
+```
+
+## Kubernetes deployment
+
+Deploy the scaler as a separate `Deployment` and expose it as a `ClusterIP`
+service:
+
+```yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: taskito-scaler
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: taskito-scaler
+  template:
+    metadata:
+      labels:
+        app: taskito-scaler
+    spec:
+      containers:
+        - name: scaler
+          image: your-image:latest
+          command: ["taskito", "scaler", "--app", "myapp:queue", "--port", "9091"]
+          ports:
+            - containerPort: 9091
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: taskito-scaler
+spec:
+  selector:
+    app: taskito-scaler
+  ports:
+    - port: 9091
+      targetPort: 9091
+```
+
+## ScaledObject (HTTP trigger)
+
+Scale a long-running worker `Deployment` based on pending job count:
+
+```yaml
+apiVersion: keda.sh/v1alpha1
+kind: ScaledObject
+metadata:
+  name: taskito-worker
+  namespace: default
+spec:
+  scaleTargetRef:
+    name: taskito-worker    # your worker Deployment name
+  pollingInterval: 15       # seconds between KEDA polls
+  cooldownPeriod: 60        # seconds before scaling to zero
+  minReplicaCount: 0        # scale to zero when idle
+  maxReplicaCount: 10
+  triggers:
+    - type: metrics-api
+      metadata:
+        url: "http://taskito-scaler:9091/api/scaler"
+        valueLocation: "metricValue"
+        targetValue: "10"
+```
+
+Filter to a specific queue name:
+
+```yaml
+ url: "http://taskito-scaler:9091/api/scaler?queue=emails"
+```
+
+## ScaledJob (ephemeral batch workers)
+
+For batch/ETL workloads, use `ScaledJob` to create short-lived Kubernetes
+Jobs — one pod per N pending tasks:
+
+```yaml
+apiVersion: keda.sh/v1alpha1
+kind: ScaledJob
+metadata:
+  name: taskito-batch-worker
+  namespace: default
+spec:
+  jobTargetRef:
+    template:
+      spec:
+        containers:
+          - name: taskito-worker
+            image: your-image:latest
+            command: ["taskito", "worker", "--app", "myapp:queue"]
+        restartPolicy: Never
+  pollingInterval: 15
+  successfulJobsHistoryLimit: 5
+  failedJobsHistoryLimit: 5
+  maxReplicaCount: 20
+  scalingStrategy:
+    strategy: default    # or "accurate" for 1:1 job-to-pod mapping
+  triggers:
+    - type: metrics-api
+      metadata:
+        url: "http://taskito-scaler:9091/api/scaler"
+        valueLocation: "metricValue"
+        targetValue: "5"    # one pod per 5 pending jobs
+```
+
+## Scaling with Prometheus
+
+If you already have Prometheus scraping your workers, you can skip the scaler
+server and use the Prometheus KEDA trigger directly:
+
+```yaml
+apiVersion: keda.sh/v1alpha1
+kind: ScaledObject
+metadata:
+  name: taskito-worker-prometheus
+  namespace: default
+spec:
+  scaleTargetRef:
+    name: taskito-worker
+  pollingInterval: 15
+  cooldownPeriod: 60
+  minReplicaCount: 0
+  maxReplicaCount: 10
+  triggers:
+    - type: prometheus
+      metadata:
+        serverAddress: "http://prometheus:9090"
+        metricName: taskito_queue_depth
+        query: sum(taskito_queue_depth{queue="default"})
+        threshold: "10"
+    - type: prometheus
+      metadata:
+        serverAddress: "http://prometheus:9090"
+        metricName: taskito_worker_utilization
+        query: taskito_worker_utilization{queue="default"}
+        threshold: "0.8"
+```
+
+See the [Prometheus integration](/docs/guides/integrations) for setting up
+the metrics collector.
+
+## Deploy templates
+
+Ready-to-use YAML templates are included in the repository under
+`deploy/keda/`:
+
+| File | Purpose |
+|---|---|
+| `scaled-object.yaml` | `ScaledObject` using the HTTP scaler endpoint |
+| `scaled-object-prometheus.yaml` | `ScaledObject` using Prometheus metrics |
+| `scaled-job.yaml` | `ScaledJob` for ephemeral batch workers |
+
+<Callout>
+  When using SQLite, all worker replicas must share the same database
+  volume. For multi-replica Kubernetes deployments, use the
+  [Postgres backend](/docs/guides/operations/postgres) — workers connect
+  over the network and there's no shared-file constraint.
+</Callout>
diff --git a/docs-next/content/docs/guides/operations/meta.json b/docs-next/content/docs/guides/operations/meta.json
index 377a3d4..86c3419 100644
--- a/docs-next/content/docs/guides/operations/meta.json
+++ b/docs-next/content/docs/guides/operations/meta.json
@@ -1,4 +1,13 @@
{
"title": "Operations",
- "pages": ["index"]
+ "pages": [
+ "index",
+ "testing",
+ "job-management",
+ "troubleshooting",
+ "deployment",
+ "keda",
+ "postgres",
+ "migration"
+ ]
}
diff --git a/docs-next/content/docs/guides/operations/migration.mdx b/docs-next/content/docs/guides/operations/migration.mdx
new file mode 100644
index 0000000..4eefedd
--- /dev/null
+++ b/docs-next/content/docs/guides/operations/migration.mdx
@@ -0,0 +1,359 @@
+---
+title: Migrating from Celery
+description: "Side-by-side concept mapping and step-by-step migration from Celery to taskito."
+---
+
+import { Callout } from "fumadocs-ui/components/callout";
+import { Tab, Tabs } from "fumadocs-ui/components/tabs";
+
+This guide maps Celery concepts to their taskito equivalents. If you're
+coming from Celery, you'll find that most concepts translate directly —
+with less infrastructure and simpler configuration.
+
+## Concept mapping
+
+| Celery | taskito | Notes |
+|---|---|---|
+| `Celery()` app | `Queue()` | No broker URL needed |
+| `@app.task` | `@queue.task()` | Same decorator pattern |
+| `.apply_async()` | `.apply_async()` | Same name, similar API |
+| `.delay()` | `.delay()` | Identical |
+| `AsyncResult` | `JobResult` | `.result()` instead of `.get()` |
+| Canvas (`chain`, `group`, `chord`) | `chain`, `group`, `chord` | Same names, same concepts |
+| `celery beat` | `@queue.periodic()` | Built-in, no separate process |
+| Result backend (Redis/DB) | Built-in (SQLite) | No configuration needed |
+| Broker (Redis/RabbitMQ) | Not needed | SQLite handles everything |
+| `celery worker` | `taskito worker` | Similar CLI |
+| `celery inspect` | `taskito info` | Similar CLI |
+
+## Side-by-side examples
+
+### App setup
+
+<Tabs items={["Celery", "taskito"]}>
+<Tab value="Celery">
+
+```python
+from celery import Celery
+
+app = Celery(
+ "myapp",
+ broker="redis://localhost:6379/0",
+ backend="redis://localhost:6379/1",
+)
+app.conf.task_serializer = "json"
+app.conf.result_serializer = "json"
+```
+
+</Tab>
+<Tab value="taskito">
+
+```python
+from taskito import Queue
+
+queue = Queue(db_path="myapp.db")
+# That's it. No broker, no backend, no serializer config.
+```
+
+</Tab>
+</Tabs>
+
+### Task definition
+
+<Tabs items={["Celery", "taskito"]}>
+<Tab value="Celery">
+
+```python
+@app.task(bind=True, max_retries=3)
+def send_email(self, to, subject, body):
+    try:
+        do_send(to, subject, body)
+    except SMTPError as exc:
+        raise self.retry(exc=exc, countdown=60)
+```
+
+</Tab>
+<Tab value="taskito">
+
+```python
+@queue.task(max_retries=3, retry_backoff=2.0, retry_on=[SMTPError])
+def send_email(to, subject, body):
+    do_send(to, subject, body)
+    # Retries happen automatically on matching exceptions.
+    # Use retry_on/dont_retry_on for selective retries.
+```
+
+</Tab>
+</Tabs>
+
+<Callout>
+  In Celery, you must explicitly catch exceptions and call `self.retry()`. In
+  taskito, any unhandled exception triggers a retry automatically (up to
+  `max_retries`).
+</Callout>
+
+### Enqueueing tasks
+
+<Tabs items={["Celery", "taskito"]}>
+<Tab value="Celery">
+
+```python
+# Simple
+send_email.delay("user@example.com", "Hello", "World")
+
+# With options
+send_email.apply_async(
+ args=("user@example.com", "Hello", "World"),
+ countdown=60, # delay in seconds
+ queue="emails",
+ priority=5,
+)
+```
+
+</Tab>
+<Tab value="taskito">
+
+```python
+# Simple
+send_email.delay("user@example.com", "Hello", "World")
+
+# With options
+send_email.apply_async(
+ args=("user@example.com", "Hello", "World"),
+ delay=60, # delay in seconds
+ queue="emails",
+ priority=5,
+)
+```
+
+</Tab>
+</Tabs>
+
+The only change: `countdown` becomes `delay`.
+
+### Getting results
+
+<Tabs items={["Celery", "taskito"]}>
+<Tab value="Celery">
+
+```python
+result = send_email.delay("user@example.com", "Hi", "Body")
+
+# Block for result
+value = result.get(timeout=30)
+
+# Check status
+result.status # "PENDING", "SUCCESS", "FAILURE"
+```
+
+</Tab>
+<Tab value="taskito">
+
+```python
+job = send_email.delay("user@example.com", "Hi", "Body")
+
+# Block for result
+value = job.result(timeout=30)
+
+# Check status
+job.status # "pending", "running", "complete", "failed", "dead"
+```
+
+
+</Tab>
+</Tabs>
+
+Key differences:
+- `.get()` becomes `.result()`
+- Status values are lowercase
+- `"SUCCESS"` becomes `"complete"`
+
+### Workflows (canvas)
+
+<Tabs items={["Celery", "taskito"]}>
+<Tab value="Celery">
+
+```python
+from celery import chain, group, chord
+
+# Chain
+chain(fetch.s(url), parse.s(), store.s()).apply_async()
+
+# Group
+group(process.s(item) for item in items).apply_async()
+
+# Chord
+chord(
+    [download.s(url) for url in urls],
+    merge.s()
+).apply_async()
+```
+
+</Tab>
+<Tab value="taskito">
+
+```python
+from taskito import chain, group, chord
+
+# Chain
+chain(fetch.s(url), parse.s(), store.s()).apply()
+
+# Group
+group(process.s(item) for item in items).apply()
+
+# Chord
+chord(
+    [download.s(url) for url in urls],
+    merge.s()
+).apply()
+```
+
+</Tab>
+</Tabs>
+
+Almost identical. The only change: `.apply_async()` becomes `.apply()`.
+
+### Periodic tasks
+
+<Tabs items={["Celery", "taskito"]}>
+<Tab value="Celery">
+
+```python
+# celery.py
+app.conf.beat_schedule = {
+ "cleanup-every-hour": {
+ "task": "myapp.cleanup",
+ "schedule": crontab(minute=0),
+ },
+}
+
+# Requires a separate process:
+# celery -A myapp beat
+```
+
+</Tab>
+<Tab value="taskito">
+
+```python
+@queue.periodic(cron="0 0 * * * *")
+def cleanup():
+    ...
+
+# No separate process — the worker handles scheduling.
+# taskito worker --app myapp:queue
+```
+
+</Tab>
+</Tabs>
+
+<Callout>
+  taskito uses 6-field cron expressions (with seconds). Celery's `crontab()`
+  maps to the last 5 fields, with `0` prepended for seconds.
+
+  | Celery `crontab()` | taskito cron |
+  |---|---|
+  | `crontab()` (every minute) | `0 * * * * *` |
+  | `crontab(minute=0)` (every hour) | `0 0 * * * *` |
+  | `crontab(minute=0, hour=0)` (daily) | `0 0 0 * * *` |
+  | `crontab(minute=30, hour=9, day_of_week='1-5')` | `0 30 9 * * 1-5` |
+</Callout>
+
+### Rate limiting
+
+<Tabs items={["Celery", "taskito"]}>
+<Tab value="Celery">
+
+```python
+@app.task(rate_limit="100/m")
+def call_api(endpoint):
+    ...
+```
+
+</Tab>
+<Tab value="taskito">
+
+```python
+@queue.task(rate_limit="100/m")
+def call_api(endpoint):
+    ...
+```
+
+</Tab>
+</Tabs>
+
+Identical syntax.
+
+### Worker
+
+
+
+
+```bash
+celery -A myapp worker --loglevel=info -Q emails,default
+```
+
+
+
+
+```bash
+taskito worker --app myapp:queue --queues emails,default
+```
+
+
+
+
+### Testing
+
+
+
+
+```python
+# Celery has CELERY_ALWAYS_EAGER mode
+app.conf.task_always_eager = True
+app.conf.task_eager_propagates = True
+
+result = add.delay(2, 3)
+assert result.get() == 5
+```
+
+
+
+
+```python
+with queue.test_mode() as results:
+ add.delay(2, 3)
+ assert results[0].return_value == 5
+```
+
+
+
+
+taskito's test mode uses a context manager instead of a global setting, so
+it's safe to use in parallel test runs.
+
+## What taskito doesn't have
+
+Some Celery features don't have taskito equivalents:
+
+| Celery feature | Status in taskito |
+|---|---|
+| Distributed workers (multi-server) | Use Postgres backend |
+| Message routing (exchanges, topics) | Use named queues instead |
+| `celery multi` (process management) | Use systemd, supervisor, or Docker |
+| Custom serializers (JSON, msgpack) | `JsonSerializer`, `CloudpickleSerializer` (default), or custom `Serializer` protocol |
+| Task cancellation (mid-execution) | Cancel pending or running jobs (`cancel_running_job()` + `check_cancelled()`) |
+| ETA (absolute datetime scheduling) | Use `delay` (relative seconds) |
+| `bind=True` (self argument) | Use `current_job` context instead |
+| Custom result backends | Built-in SQLite or Postgres |
+
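+For the most common `bind=True` use case (reading job metadata inside the
+task), the `current_job` context covers it. A minimal sketch, using only the
+`current_job` attributes shown elsewhere in these docs (`id`, `retry_count`,
+`queue_name`):
+
+```python
+import logging
+
+from taskito import current_job
+
+log = logging.getLogger(__name__)
+
+# Celery: @app.task(bind=True) and self.request.id / self.request.retries
+# taskito: read the same information from the current_job context
+@queue.task()
+def audited_task(action: str):
+    log.info(
+        "job %s (attempt %d, queue %s): %s",
+        current_job.id,
+        current_job.retry_count,
+        current_job.queue_name,
+        action,
+    )
+```
+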
+## Migration checklist
+
+- [ ] Replace `Celery()` with `Queue()`
+- [ ] Change `@app.task` to `@queue.task()`
+- [ ] Remove `self.retry()` calls — retries are automatic
+- [ ] Change `.get()` to `.result()` on job results
+- [ ] Change `countdown=` to `delay=` in `.apply_async()`
+- [ ] Replace celery beat schedule with `@queue.periodic()`
+- [ ] Update cron expressions to 6-field format (prepend seconds)
+- [ ] Remove broker and result backend configuration
+- [ ] Change `celery worker` to `taskito worker` in deployment scripts
+- [ ] Replace `task_always_eager` with `queue.test_mode()` in tests
diff --git a/docs-next/content/docs/guides/operations/postgres.mdx b/docs-next/content/docs/guides/operations/postgres.mdx
new file mode 100644
index 0000000..47a4401
--- /dev/null
+++ b/docs-next/content/docs/guides/operations/postgres.mdx
@@ -0,0 +1,246 @@
+---
+title: Postgres Backend
+description: "Multi-machine workers and concurrent writes with the PostgreSQL backend."
+---
+
+taskito supports PostgreSQL as an alternative storage backend for production
+deployments that need multi-machine workers or higher write throughput.
+
+## When to use Postgres
+
+Choose Postgres over the default SQLite backend when you need:
+
+- **Multi-machine workers** — run workers on separate hosts against a shared database
+- **Higher write throughput** — Postgres handles concurrent writes without SQLite's single-writer constraint
+- **Existing Postgres infrastructure** — reuse your existing database server instead of managing SQLite files
+
+For single-machine workloads, SQLite remains the simpler choice — no
+external dependencies required.
+
+## Installation
+
+```bash
+pip install taskito[postgres]
+```
+
+## Configuration
+
+```python
+from taskito import Queue
+
+queue = Queue(
+ backend="postgres",
+ db_url="postgresql://user:password@localhost:5432/myapp",
+ schema="taskito", # optional, default: "taskito"
+)
+```
+
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `backend` | `str` | `"sqlite"` | Set to `"postgres"` or `"postgresql"` |
+| `db_url` | `str` | `None` | PostgreSQL connection URL (required for Postgres) |
+| `schema` | `str` | `"taskito"` | PostgreSQL schema for all tables |
+| `workers` | `int` | `0` (auto) | Number of worker threads |
+
+All other `Queue` parameters (`default_retry`, `default_timeout`,
+`default_priority`, `result_ttl`) work identically to the SQLite backend.
+
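+For example, a minimal sketch combining the Postgres backend with queue-level
+defaults (the values are illustrative):
+
+```python
+from taskito import Queue
+
+queue = Queue(
+    backend="postgres",
+    db_url="postgresql://user:password@localhost:5432/myapp",
+    default_retry=5,       # retry failed tasks up to 5 times
+    default_timeout=600,   # 10-minute timeout per task
+    result_ttl=86400,      # purge results after 24 hours
+)
+```
+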
+## Django integration
+
+Configure the Postgres backend via Django settings:
+
+```python
+# settings.py
+TASKITO_BACKEND = "postgres"
+TASKITO_DB_URL = "postgresql://user:password@localhost:5432/myapp"
+TASKITO_SCHEMA = "taskito"
+```
+
+Then use the Django integration as normal:
+
+```python
+from taskito.contrib.django.settings import get_queue
+
+queue = get_queue()
+```
+
+All Django settings:
+
+| Setting | Default | Description |
+|---------|---------|-------------|
+| `TASKITO_BACKEND` | `"sqlite"` | Storage backend (`"sqlite"` or `"postgres"`) |
+| `TASKITO_DB_URL` | `None` | PostgreSQL connection URL |
+| `TASKITO_SCHEMA` | `"taskito"` | PostgreSQL schema name |
+| `TASKITO_DB_PATH` | `".taskito/taskito.db"` | SQLite database path (ignored with Postgres) |
+| `TASKITO_WORKERS` | `0` | Worker thread count (0 = auto-detect) |
+| `TASKITO_DEFAULT_RETRY` | `3` | Default max retries |
+| `TASKITO_DEFAULT_TIMEOUT` | `300` | Default task timeout in seconds |
+| `TASKITO_DEFAULT_PRIORITY` | `0` | Default task priority |
+| `TASKITO_RESULT_TTL` | `None` | Result TTL in seconds |
+
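+As a sketch, a production-leaning `settings.py` using several of these
+settings together (the values are illustrative):
+
+```python
+# settings.py
+TASKITO_BACKEND = "postgres"
+TASKITO_DB_URL = "postgresql://user:password@db.internal:5432/myapp"
+TASKITO_SCHEMA = "taskito"
+TASKITO_WORKERS = 8            # fixed worker count instead of auto-detect
+TASKITO_DEFAULT_TIMEOUT = 600  # seconds
+TASKITO_RESULT_TTL = 86400     # purge results after 24 hours
+```
+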
+## Schema isolation
+
+taskito creates all tables inside a dedicated PostgreSQL schema (default:
+`taskito`). The schema is created automatically if it doesn't exist.
+
+```python
+# Use a custom schema
+queue = Queue(backend="postgres", db_url="postgresql://...", schema="myapp_tasks")
+```
+
+Schema names must contain only alphanumeric characters and underscores.
+Invalid names raise a `ConfigError` at startup.
+
+This lets you run multiple independent taskito instances in the same
+database by using different schemas, or keep taskito tables separate from
+your application tables.
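+
+A sketch of two independent instances sharing one database (the schema names
+are illustrative):
+
+```python
+DB_URL = "postgresql://user:password@localhost:5432/myapp"
+
+# Each Queue gets its own schema, so jobs, periodic tasks, and dead letters
+# never mix between the two instances.
+billing_queue = Queue(backend="postgres", db_url=DB_URL, schema="billing_tasks")
+reports_queue = Queue(backend="postgres", db_url=DB_URL, schema="reports_tasks")
+```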
+
+## Connection pooling
+
+The Postgres backend uses Diesel's `r2d2` connection pool with a default
+size of **10 connections**. Each connection has the `search_path` set to
+the configured schema on acquisition.
+
+The pool size is configured at the Rust layer. For most workloads, the
+default of 10 connections is sufficient.
+
+## Migrations
+
+Migrations run automatically on first connection. taskito creates the
+following **11 tables** inside the configured schema:
+
+| Table | Purpose |
+|-------|---------|
+| `jobs` | Core job storage |
+| `dead_letter` | Dead letter queue |
+| `rate_limits` | Token bucket rate limiting state |
+| `periodic_tasks` | Cron-scheduled task definitions |
+| `job_errors` | Per-attempt error tracking |
+| `job_dependencies` | Task dependency edges |
+| `task_metrics` | Execution time and memory metrics |
+| `replay_history` | Job replay audit trail |
+| `task_logs` | Structured task log entries |
+| `circuit_breakers` | Circuit breaker state |
+| `workers` | Worker heartbeat tracking |
+
+All tables use PostgreSQL-native types (`TEXT`, `BYTEA`, `BIGINT`,
+`BOOLEAN`, `DOUBLE PRECISION`) rather than SQLite-compatible types.
+
+## Differences from SQLite
+
+| Aspect | SQLite | Postgres |
+|--------|--------|----------|
+| Connection model | Embedded, file-based | Client/server, networked |
+| Write concurrency | Single writer (WAL mode) | Multiple concurrent writers |
+| Distribution | Single machine only | Multi-machine workers |
+| Setup | Zero config, bundled | Requires Postgres server |
+| Connection pool default | 8 connections | 10 connections |
+| Schema isolation | N/A (file per database) | Custom PostgreSQL schema |
+| Tables | 6 tables | 11 tables (additional: `job_dependencies`, `task_metrics`, `replay_history`, `task_logs`, `circuit_breakers`) |
+| Backup | `sqlite3 .backup` | `pg_dump` |
+
+## Deployment
+
+### Docker Compose
+
+```yaml
+services:
+ postgres:
+ image: postgres:16
+ environment:
+ POSTGRES_DB: myapp
+ POSTGRES_USER: taskito
+ POSTGRES_PASSWORD: secret
+ volumes:
+ - pgdata:/var/lib/postgresql/data
+ ports:
+ - "5432:5432"
+
+ worker:
+ build: .
+ environment:
+ TASKITO_BACKEND: postgres
+ TASKITO_DB_URL: postgresql://taskito:secret@postgres:5432/myapp
+ depends_on:
+ - postgres
+ stop_signal: SIGINT
+ stop_grace_period: 35s
+
+ dashboard:
+ build: .
+ command: taskito dashboard --app myapp:queue --host 0.0.0.0
+ environment:
+ TASKITO_BACKEND: postgres
+ TASKITO_DB_URL: postgresql://taskito:secret@postgres:5432/myapp
+ depends_on:
+ - postgres
+ ports:
+ - "8080:8080"
+
+volumes:
+ pgdata:
+```
+
+With Postgres, there are no shared-file constraints — workers and dashboard
+connect over the network. You can run multiple worker containers across
+different hosts.
+
+### systemd
+
+```ini
+[Unit]
+Description=taskito worker
+After=network.target postgresql.service
+
+[Service]
+Type=simple
+User=myapp
+Group=myapp
+WorkingDirectory=/opt/myapp
+ExecStart=/opt/myapp/.venv/bin/taskito worker --app myapp:queue
+Restart=always
+RestartSec=5
+KillSignal=SIGINT
+TimeoutStopSec=35
+
+Environment=PYTHONPATH=/opt/myapp
+Environment=TASKITO_BACKEND=postgres
+Environment=TASKITO_DB_URL=postgresql://taskito:secret@db.internal:5432/myapp
+
+[Install]
+WantedBy=multi-user.target
+```
+
+### Multi-machine workers
+
+With Postgres, you can run workers on multiple machines. Each worker
+connects to the same database and coordinates through PostgreSQL's row-level
+locking:
+
+```bash
+# Machine 1
+taskito worker --app myapp:queue
+
+# Machine 2
+taskito worker --app myapp:queue
+
+# Machine 3
+taskito worker --app myapp:queue
+```
+
+All workers share the same job queue and dequeue work atomically.
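+
+To confirm that every machine is connected, list worker heartbeats from any
+host (the same `queue.workers()` call used in the troubleshooting guide):
+
+```python
+for w in queue.workers():
+    print(f"{w['worker_id']}: {w['status']} (last seen: {w['last_heartbeat']})")
+```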
+
+## Backups
+
+Use standard PostgreSQL backup tools instead of SQLite-specific commands:
+
+```bash
+# Dump the taskito schema
+pg_dump -h localhost -U taskito -d myapp -n taskito > backup.sql
+
+# Restore
+psql -h localhost -U taskito -d myapp < backup.sql
+```
+
+For continuous backups, use PostgreSQL's built-in WAL archiving or a tool
+like [pgBackRest](https://pgbackrest.org/).
diff --git a/docs-next/content/docs/guides/operations/testing.mdx b/docs-next/content/docs/guides/operations/testing.mdx
new file mode 100644
index 0000000..fa61abc
--- /dev/null
+++ b/docs-next/content/docs/guides/operations/testing.mdx
@@ -0,0 +1,438 @@
+---
+title: Testing
+description: "Test mode, TestResult, MockResource, fixtures, and workflow testing."
+---
+
+import { Callout } from "fumadocs-ui/components/callout";
+
+taskito includes a built-in test mode that runs tasks **synchronously** in
+the calling thread — no worker, no Rust scheduler, no SQLite. This makes
+tests fast, deterministic, and easy to write.
+
+## Quick example
+
+```python
+from taskito import Queue
+
+queue = Queue()
+
+@queue.task()
+def add(a: int, b: int) -> int:
+ return a + b
+
+def test_add():
+ with queue.test_mode() as results:
+ add.delay(2, 3)
+
+ assert len(results) == 1
+ assert results[0].return_value == 5
+ assert results[0].succeeded
+```
+
+## How it works
+
+When you enter `queue.test_mode()`, taskito patches the `enqueue()` method
+so that every `.delay()` or `.apply_async()` call:
+
+1. Looks up the task function in the registry
+2. Calls it immediately in the current thread
+3. Captures the return value (or exception) in a `TestResult`
+4. Appends the result to the `TestResults` list
+
+No database is created. No worker threads are spawned. Tasks execute
+eagerly and synchronously.
+
+## `queue.test_mode()`
+
+```python
+with queue.test_mode(propagate_errors=False, resources=None) as results:
+ # tasks run synchronously here
+ ...
+```
+
+| Parameter | Type | Default | Description |
+|---|---|---|---|
+| `propagate_errors` | `bool` | `False` | If `True`, task exceptions are re-raised immediately instead of being captured in `TestResult.error` |
+| `resources` | `dict[str, Any] \| None` | `None` | Map of resource name → mock instance or `MockResource` for injection. See [Resource System](/docs/guides/resources). |
+
+The context manager yields a `TestResults` list that accumulates results as
+tasks execute.
+
+## `TestResult`
+
+Each executed task produces a `TestResult`:
+
+```python
+with queue.test_mode() as results:
+ add.delay(2, 3)
+
+ r = results[0]
+ r.job_id # "test-000001"
+ r.task_name # "mymodule.add"
+ r.args # (2, 3)
+ r.kwargs # {}
+ r.return_value # 5
+ r.error # None
+ r.traceback # None
+ r.succeeded # True
+ r.failed # False
+```
+
+| Attribute | Type | Description |
+|---|---|---|
+| `job_id` | `str` | Synthetic ID like `"test-000001"` |
+| `task_name` | `str` | Fully qualified task name |
+| `args` | `tuple` | Positional arguments passed to the task |
+| `kwargs` | `dict` | Keyword arguments passed to the task |
+| `return_value` | `Any` | Return value on success, `None` on failure |
+| `error` | `Exception \| None` | The exception if the task failed |
+| `traceback` | `str \| None` | Formatted traceback if the task failed |
+| `succeeded` | `bool` | `True` if no error |
+| `failed` | `bool` | `True` if an error occurred |
+
+## `TestResults`
+
+`TestResults` is a list of `TestResult` with convenience methods:
+
+```python
+with queue.test_mode() as results:
+ add.delay(2, 3)
+ failing_task.delay()
+ add.delay(10, 20)
+
+ # Filter by outcome
+ results.succeeded # TestResults with 2 items
+ results.failed # TestResults with 1 item
+
+ # Filter by task name
+ results.filter(task_name="mymodule.add") # 2 items
+
+ # Combine filters
+ results.filter(task_name="mymodule.add", succeeded=True) # 2 items
+```
+
+### `.filter()`
+
+```python
+results.filter(task_name=None, succeeded=None) -> TestResults
+```
+
+| Parameter | Type | Description |
+|---|---|---|
+| `task_name` | `str \| None` | Filter by exact task name |
+| `succeeded` | `bool \| None` | `True` for successes, `False` for failures |
+
+## Testing failures
+
+By default, task exceptions are captured — not raised:
+
+```python
+@queue.task()
+def risky():
+ raise ValueError("something broke")
+
+def test_failure_captured():
+ with queue.test_mode() as results:
+ risky.delay()
+
+ assert len(results) == 1
+ assert results[0].failed
+ assert isinstance(results[0].error, ValueError)
+ assert "something broke" in str(results[0].error)
+ assert results[0].traceback is not None
+```
+
+### Propagating errors
+
+Use `propagate_errors=True` when you want exceptions to bubble up:
+
+```python
+import pytest
+
+def test_failure_propagated():
+ with queue.test_mode(propagate_errors=True) as results:
+ with pytest.raises(ValueError, match="something broke"):
+ risky.delay()
+```
+
+## Testing workflows
+
+Chains, groups, and chords work in test mode because they call `enqueue()`
+internally, which is intercepted by the test mode patch.
+
+### Chains
+
+```python
+from taskito import chain
+
+@queue.task()
+def double(n: int) -> int:
+ return n * 2
+
+@queue.task()
+def add_ten(n: int) -> int:
+ return n + 10
+
+def test_chain():
+ with queue.test_mode() as results:
+ chain(double.s(5), add_ten.s()).apply()
+
+ assert len(results) == 2
+ assert results[0].return_value == 10 # double(5)
+ assert results[1].return_value == 20 # add_ten(10)
+```
+
+### Groups
+
+```python
+from taskito import group
+
+def test_group():
+ with queue.test_mode() as results:
+ group(double.s(1), double.s(2), double.s(3)).apply()
+
+ assert len(results) == 3
+ values = [r.return_value for r in results]
+ assert values == [2, 4, 6]
+```
+
+## Job context in tests
+
+`current_job` works inside test mode. The context is set up before each
+task runs:
+
+```python
+from taskito import current_job
+
+@queue.task()
+def context_aware():
+ return {
+ "job_id": current_job.id,
+ "task_name": current_job.task_name,
+ "retry_count": current_job.retry_count,
+ "queue_name": current_job.queue_name,
+ }
+
+def test_context():
+ with queue.test_mode() as results:
+ context_aware.delay()
+
+ ctx = results[0].return_value
+ assert ctx["job_id"].startswith("test-")
+ assert ctx["retry_count"] == 0
+ assert ctx["queue_name"] == "default"
+```
+
+## Pytest integration
+
+### Fixture pattern
+
+Create a reusable fixture for test mode:
+
+```python
+# conftest.py
+import pytest
+from myapp import queue
+
+@pytest.fixture
+def task_results():
+ with queue.test_mode() as results:
+ yield results
+
+# test_tasks.py
+def test_add(task_results):
+ add.delay(2, 3)
+ assert task_results[0].return_value == 5
+
+def test_email(task_results):
+ send_email.delay("user@example.com", "Hello", "World")
+ assert task_results[0].succeeded
+```
+
+### Fixture with error propagation
+
+```python
+@pytest.fixture
+def strict_tasks():
+ with queue.test_mode(propagate_errors=True) as results:
+ yield results
+```
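+
+With this fixture, failures surface directly in the test via `pytest.raises`
+(using the `risky` task defined earlier on this page):
+
+```python
+import pytest
+
+def test_risky_raises(strict_tasks):
+    with pytest.raises(ValueError, match="something broke"):
+        risky.delay()
+```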
+
+### Testing async code
+
+Test mode works with async test functions — the tasks still execute
+synchronously:
+
+```python
+import pytest
+
+@pytest.mark.asyncio
+async def test_async_enqueue(task_results):
+ add.delay(1, 2)
+ assert task_results[0].return_value == 3
+```
+
+## Testing with worker resources
+
+If your tasks use [worker resources](/docs/guides/resources) (injected via
+`inject=` or `Inject["name"]`), pass mock instances through `resources=`:
+
+```python
+from unittest.mock import MagicMock
+
+@queue.worker_resource("db")
+def create_db():
+ return real_sessionmaker
+
+@queue.task(inject=["db"])
+def create_user(name: str, db):
+ session = db()
+ session.add(User(name=name))
+ session.commit()
+
+def test_create_user():
+ mock_db = MagicMock()
+
+ with queue.test_mode(resources={"db": mock_db}) as results:
+ create_user.delay("Alice")
+
+ assert results[0].succeeded
+ mock_db.return_value.add.assert_called_once()
+```
+
+### `MockResource`
+
+`MockResource` adds call tracking to a mock value:
+
+```python
+from taskito import MockResource
+
+spy = MockResource("db", wraps=real_db, track_calls=True)
+
+with queue.test_mode(resources={"db": spy}) as results:
+ create_user.delay("Alice")
+
+assert spy.call_count == 1
+assert results[0].succeeded
+```
+
+| Parameter | Type | Description |
+|---|---|---|
+| `name` | `str` | Resource name (informational). |
+| `return_value` | `Any` | Value returned when the resource is accessed. |
+| `wraps` | `Any` | Wrap a real object — returned as-is when accessed. |
+| `track_calls` | `bool` | Increment `call_count` each access. |
+
+#### `return_value` vs `wraps`
+
+Use `return_value` when you want a simple stub:
+
+```python
+mock_cache = MockResource("cache", return_value={"key": "value"})
+```
+
+Use `wraps` when you need the real object but want call tracking:
+
+```python
+real_db = create_test_database()
+spy_db = MockResource("db", wraps=real_db, track_calls=True)
+```
+
+#### Multiple resources
+
+Pass multiple resources to `test_mode`:
+
+```python
+with queue.test_mode(resources={
+ "db": MockResource("db", return_value=mock_db),
+ "cache": MockResource("cache", return_value={}),
+ "mailer": MockResource("mailer", return_value=mock_smtp),
+}) as results:
+ process_order.delay(order_id=123)
+```
+
+#### Testing with `inject`
+
+Tasks that use `@queue.task(inject=["db"])` receive the mock resource
+automatically:
+
+```python
+@queue.task(inject=["db"])
+def create_user(name, db=None):
+ db.execute("INSERT INTO users (name) VALUES (?)", (name,))
+
+mock_db = MagicMock()
+with queue.test_mode(resources={"db": mock_db}) as results:
+ create_user.delay("Alice")
+
+assert results[0].succeeded
+mock_db.execute.assert_called_once()
+```
+
+<Callout>
+  When `resources=` is provided, proxy reconstruction is bypassed
+  automatically. Proxy markers in arguments are passed through as-is so
+  tests don't fail due to missing files or network connections.
+</Callout>
+
+## What test mode does NOT cover
+
+Test mode is designed for **unit and integration testing** of task logic.
+It does not exercise:
+
+- SQLite storage or queries
+- Retry/backoff scheduling
+- Rate limiting
+- Timeout reaping
+- Worker thread pool dispatch
+- Priority ordering
+
+For end-to-end tests that exercise the full Rust scheduler, run a real
+worker in a background thread:
+
+```python
+import threading
+import time
+
+def test_e2e():
+ queue_e2e = Queue(db_path=":memory:")
+
+ @queue_e2e.task()
+ def add(a, b):
+ return a + b
+
+ t = threading.Thread(target=queue_e2e.run_worker, daemon=True)
+ t.start()
+
+ job = add.delay(2, 3)
+ result = job.result(timeout=10)
+ assert result == 5
+```
+
+<Callout>
+  Per-task and queue-level `TaskMiddleware` hooks (`before`, `after`,
+  `on_retry`) **do fire** in test mode, since they run in the Python wrapper
+  around your task function. This lets you verify middleware behavior in
+  tests without running a real worker.
+</Callout>
+
+## Running tests locally
+
+```bash
+# Rust tests
+cargo test --workspace
+
+# Rebuild the Python extension after Rust changes
+uv run maturin develop
+
+# Python tests
+uv run python -m pytest tests/python/ -v
+
+# Linting
+uv run ruff check py_src/ tests/
+uv run mypy py_src/taskito/ --no-incremental
+```
+
+To build with native async support:
+
+```bash
+uv run maturin develop --features native-async
+```
diff --git a/docs-next/content/docs/guides/operations/troubleshooting.mdx b/docs-next/content/docs/guides/operations/troubleshooting.mdx
new file mode 100644
index 0000000..6268dc5
--- /dev/null
+++ b/docs-next/content/docs/guides/operations/troubleshooting.mdx
@@ -0,0 +1,259 @@
+---
+title: Troubleshooting
+description: "Diagnose stuck jobs, unresponsive workers, growing databases, latency, and missing tasks."
+---
+
+import { Callout } from "fumadocs-ui/components/callout";
+
+Common issues and how to fix them.
+
+## Jobs stuck in running
+
+**Symptom**: jobs stay in `running` status long after they should have finished.
+
+**Diagnosis**: the worker process that picked up the job crashed before
+marking it complete.
+
+```python
+# Check how many jobs are stuck
+stats = queue.stats()
+print(stats) # {'running': 47, 'pending': 0, ...}
+
+# See which jobs are stuck
+stuck = queue.list_jobs(status="running", limit=20)
+for job in stuck:
+ d = job.to_dict()
+ print(f"{d['id']} | {d['task_name']} | started {d['started_at']}")
+```
+
+**Fix**: the stale reaper handles this automatically — it detects jobs that
+have exceeded their `timeout_ms` and retries them. If a job has no timeout
+set, it stays stuck forever.
+
+To recover a stuck job manually:
+
+```python
+import time
+
+# Mark the job as failed so it retries
+queue._inner.retry(job_id, int(time.time() * 1000))
+```
+
+To prevent this in the future, always set a timeout on production tasks:
+
+```python
+@queue.task(timeout=300) # 5 minutes max
+def process_data(payload):
+ ...
+```
+
+<Callout>
+  Jobs without `timeout_ms` are never reaped. The stale reaper only detects
+  jobs that have exceeded their deadline.
+</Callout>
+
+## Worker is unresponsive
+
+**Symptom**: worker process is alive but not processing jobs. Heartbeat is stale.
+
+**Diagnosis**: check worker status via the heartbeat API.
+
+```python
+workers = queue.workers()
+for w in workers:
+ print(f"{w['worker_id']}: {w['status']} (last seen: {w['last_heartbeat']})")
+```
+
+**Possible causes**:
+
+1. **GIL-bound CPU task**: a long-running CPU task is holding the GIL,
+ blocking the scheduler thread from dispatching new jobs. The scheduler
+ runs in Rust, but it still needs the GIL to call Python functions.
+
+ Fix: switch to the prefork pool for CPU-bound tasks.
+
+ ```bash
+ taskito worker --app myapp:queue --pool prefork
+ ```
+
+2. **Deadlock**: a task is waiting on a resource held by another task in the
+ same worker. Check for circular waits in your task code.
+
+3. **Infinite loop**: a task is looping without yielding. Add a timeout to
+ detect this:
+
+ ```python
+ @queue.task(timeout=60)
+ def risky_task():
+ ...
+ ```
+
+## Database growing too large
+
+**Symptom**: the SQLite file keeps growing; disk space is filling up.
+
+**Diagnosis**: completed job records and their result payloads are accumulating.
+
+**Fix**: set `result_ttl` to auto-purge old results.
+
+```python
+queue = Queue(
+ db_path="myapp.db",
+ result_ttl=86400, # Purge completed/dead jobs older than 24 hours
+)
+```
+
+Manually purge existing backlog:
+
+```python
+# Purge completed jobs older than 7 days
+queue.purge_completed(older_than=604800)
+
+# Purge dead-lettered jobs older than 30 days
+queue.purge_dead(older_than=2592000)
+```
+
+After purging, reclaim disk space:
+
+```bash
+sqlite3 myapp.db "VACUUM;"
+```
+
+<Callout>
+  `VACUUM` rewrites the entire database and requires exclusive access. Run
+  it during low-traffic periods.
+</Callout>
+
+## High job latency
+
+**Symptom**: jobs sit in `pending` for longer than expected before starting.
+
+**Diagnosis**: check the queue depth and scheduler configuration.
+
+```python
+stats = queue.stats()
+print(f"Pending: {stats['pending']}, Running: {stats['running']}")
+```
+
+**Possible causes and fixes**:
+
+1. **Scheduler poll interval too high**: default is 50ms. Jobs can wait up
+ to one poll interval before being picked up.
+
+ ```python
+ queue = Queue(scheduler_poll_interval_ms=10) # Poll every 10ms
+ ```
+
+ Lower values increase CPU/DB usage. Balance based on your latency
+ requirements.
+
+2. **Not enough workers**: all workers are busy. Increase the worker count.
+
+ ```python
+ queue = Queue(workers=16)
+ ```
+
+3. **Rate limiting**: the task or queue has a rate limit active.
+
+ ```python
+ # Check if rate limiting is the culprit
+ # Rate-limited jobs are rescheduled 1 second into the future
+ pending = queue.list_jobs(status="pending", limit=10)
+ for job in pending:
+ print(job.to_dict()["scheduled_at"])
+ ```
+
+4. **Database performance**: slow dequeue queries. Check the SQLite WAL size
+   or Postgres query plans (a quick WAL check is sketched below).
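+
+   A minimal WAL-size check using only the standard library (the database
+   path is illustrative):
+
+   ```python
+   import os
+
+   # A WAL file that keeps growing usually means checkpoints are not
+   # keeping up, which slows the dequeue query.
+   wal = "myapp.db-wal"
+   if os.path.exists(wal):
+       print(f"WAL size: {os.path.getsize(wal) / 1_000_000:.1f} MB")
+   ```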
+
+## Memory usage growing
+
+**Symptom**: worker process memory climbs over time.
+
+**Causes**:
+
+1. **Large result payloads**: task return values are stored in the database
+ but also held in the scheduler's result buffer briefly. If tasks return
+ large objects (images, dataframes), memory spikes.
+
+ Fix: return a reference (file path, object key) instead of the data itself.
+
+ ```python
+ # Bad — large result stored in memory and DB
+ @queue.task()
+ def process_image(path: str) -> bytes:
+ return open(path, "rb").read()
+
+ # Good — return a path
+ @queue.task()
+ def process_image(path: str) -> str:
+ out = path + ".processed"
+ # ... write output to out ...
+ return out
+ ```
+
+2. **Accumulated job records**: without `result_ttl`, the database grows
+ unbounded. See "Database growing too large" above.
+
+3. **Resource leaks in tasks**: a task opens a file or connection and never
+   closes it. Use context managers, as in the sketch below.
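+
+   A minimal sketch (the task is illustrative; the point is that `with`
+   guarantees the handle is closed even if the body raises):
+
+   ```python
+   @queue.task()
+   def append_audit_line(path: str, line: str):
+       # The file is closed when the block exits, so repeated runs
+       # don't leak file descriptors.
+       with open(path, "a") as fh:
+           fh.write(line + "\n")
+   ```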
+
+## Periodic task running twice
+
+**Symptom**: a periodic task fires more than once per interval, or appears
+to run on two workers simultaneously.
+
+**Behavior**: duplicate enqueue *attempts* are expected and safe. Periodic
+tasks use `unique_key` deduplication: when a periodic task is due, every
+worker's scheduler tries to enqueue it, but only one enqueue succeeds
+because the `unique_key` constraint rejects duplicates.
+
+If you see two completed jobs for the same periodic task in the same
+interval, check:
+
+```python
+# Look for duplicate completions
+jobs = queue.list_jobs(status="complete", limit=50)
+periodic_jobs = [j for j in jobs if "daily_report" in j.to_dict()["task_name"]]
+for j in periodic_jobs:
+ print(j.to_dict()["completed_at"])
+```
+
+If you're genuinely seeing duplicate execution, ensure all workers use the
+same database (same SQLite file path or same Postgres DSN).
+
+## Task not found in worker
+
+**Symptom**: worker logs `TaskNotFound` or jobs fail with an error like
+`unknown task: myapp.tasks.process`.
+
+**Cause**: the task name recorded on the job at enqueue time doesn't match
+any task name registered in the worker process.
+
+Task names default to `module.function_name`. If you enqueue from one module
+path and run the worker with a different import path, the names won't match.
+
+**Diagnosis**:
+
+```python
+# Check the task name stored in the job
+job = queue.get_job(job_id)
+print(job.to_dict()["task_name"]) # e.g. "myapp.tasks.process"
+
+# Check what the worker has registered
+# (add this temporarily to your worker startup)
+print(list(queue._task_registry.keys()))
+```
+
+**Fix**: use consistent import paths. If the task is `myapp/tasks.py:process`,
+always import it as `myapp.tasks.process` — not `tasks.process` (relative)
+or `src.myapp.tasks.process` (with src prefix).
+
+You can also set an explicit name to decouple the task name from the module
+path:
+
+```python
+@queue.task(name="process-data")
+def process(payload):
+ ...
+```