diff --git a/docs-next/content/docs/guides/advanced-execution/async-tasks.mdx b/docs-next/content/docs/guides/advanced-execution/async-tasks.mdx new file mode 100644 index 0000000..e0fdc95 --- /dev/null +++ b/docs-next/content/docs/guides/advanced-execution/async-tasks.mdx @@ -0,0 +1,205 @@ +--- +title: Native Async Tasks +description: "async def tasks dispatched directly onto a dedicated event loop." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +taskito runs async task functions natively — no wrapping in `asyncio.run()`, +no thread-pool bridging. Define a coroutine and the worker dispatches it +directly onto a dedicated event loop. + +```python +from taskito import Queue + +queue = Queue(db_path="myapp.db") + +@queue.task() +async def fetch_data(url: str) -> dict: + import httpx + async with httpx.AsyncClient() as client: + response = await client.get(url) + return response.json() +``` + +Enqueue it the same way as a sync task: + +```python +job = fetch_data.delay("https://api.example.com/data") +result = job.result(timeout=30) +``` + +## How it works + +When a task decorated with `@queue.task()` is an `async def`, taskito marks +it for native dispatch. At worker startup, a `NativeAsyncPool` is initialized +alongside the standard thread pool. When the scheduler dequeues an async job +it routes it to the native pool instead of a sync worker thread. + +(Tokio)"] -->|"sync job"| TP["Thread Pool
spawn_blocking"] + S -->|"async job"| AP["Native Async Pool
AsyncTaskExecutor"] + AP --> EL["Dedicated Event Loop
(daemon thread)"] + EL -->|"result"| RS["PyResultSender"] + RS -->|"report_success / report_failure"| S`} +/> + +The dedicated event loop lives in its own Python daemon thread. All async +tasks for a single worker share that loop; a semaphore caps simultaneous +execution. + +## Concurrency limit + +Control how many async tasks run concurrently on the event loop: + +```python +queue = Queue( + db_path="myapp.db", + async_concurrency=50, # default: 100 +) +``` + +This is independent of the `workers` parameter (sync thread count). A typical +mixed setup might be: + +```python +queue = Queue( + db_path="myapp.db", + workers=4, # 4 OS threads for sync tasks + async_concurrency=200, # up to 200 concurrent async tasks +) +``` + +## Job context + +`current_job` works inside async tasks — it reads from `contextvars` rather +than `threading.local`, so it's safe across `await` boundaries: + +```python +from taskito.context import current_job + +@queue.task() +async def process(item_id: str) -> str: + current_job.log(f"Starting {item_id}") + await asyncio.sleep(1) + current_job.update_progress(50) + await asyncio.sleep(1) + current_job.update_progress(100) + return f"done:{item_id}" +``` + +Each async task gets an isolated `ContextVar` token. Concurrent tasks on the +same loop do not see each other's contexts. + +## Resource injection + +Async tasks support `inject=` and `Inject["name"]` annotations the same way +as sync tasks: + +```python +@queue.worker_resource("http_client") +def make_http_client(): + import httpx + return httpx.AsyncClient() + +@queue.task(inject=["http_client"]) +async def fetch(url: str, http_client=None) -> dict: + response = await http_client.get(url) + return response.json() +``` + + + Resource initialization still runs on worker startup in the main thread. + The resource instance is then passed into the async task at dispatch time. + + +## Middleware + +Middleware `before()` and `after()` hooks run for async tasks the same as for +sync tasks. They are called from within the async execution context, so +`current_job` is available: + +```python +class LoggingMiddleware(TaskMiddleware): + def before(self, ctx): + current_job.log("task started") + + def after(self, ctx, result, error): + current_job.log("task finished") +``` + +## Retry filtering + +`retry_on` and `dont_retry_on` on `@queue.task()` apply to async tasks: + +```python +@queue.task( + max_retries=5, + retry_on=[httpx.TimeoutException], + dont_retry_on=[httpx.HTTPStatusError], +) +async def fetch_with_retry(url: str) -> dict: + ... +``` + +## Mixing sync and async tasks + +A single queue handles both sync and async tasks transparently. The worker +dispatches each job to the correct pool based on the `_taskito_is_async` +attribute set at registration time: + +```python +@queue.task() +def sync_task(x: int) -> int: + return x * 2 + +@queue.task() +async def async_task(x: int) -> int: + await asyncio.sleep(0.1) + return x * 2 +``` + +Both are enqueued, retried, rate-limited, and monitored identically. + +## Feature flag + +The native async dispatch path is compiled into the `taskito-async` Rust +crate and enabled via the `native-async` feature flag. Pre-built wheels on +PyPI include it by default. If you build from source: + +```bash +maturin develop --features native-async +``` + +Without the feature, async tasks are still enqueued and processed — they +fall back to running via `asyncio.run()` on a worker thread. 
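+
+Conceptually, the fallback is equivalent to driving the coroutine with its
+own short-lived event loop on the worker thread. A rough sketch of that
+behavior (illustration only, not taskito's actual implementation):
+
+```python
+import asyncio
+
+def run_via_fallback(task_fn, *args, **kwargs):
+    # Roughly what the thread-pool fallback does: start a fresh event loop,
+    # run the coroutine to completion, and hand the result back to the worker.
+    return asyncio.run(task_fn(*args, **kwargs))
+```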
+ +## Async queue methods + +All inspection methods have async variants that run in a thread pool: + +```python +# Sync +stats = queue.stats() +dead = queue.dead_letters() +new_id = queue.retry_dead(dead_id) +cancelled = queue.cancel_job(job_id) +result = job.result(timeout=30) + +# Async equivalents +stats = await queue.astats() +dead = await queue.adead_letters() +new_id = await queue.aretry_dead(dead_id) +cancelled = await queue.acancel_job(job_id) +result = await job.aresult(timeout=30) +``` + +### Async worker + +```python +async def main(): + await queue.arun_worker(queues=["default"]) + +asyncio.run(main()) +``` diff --git a/docs-next/content/docs/guides/advanced-execution/batch-enqueue.mdx b/docs-next/content/docs/guides/advanced-execution/batch-enqueue.mdx new file mode 100644 index 0000000..dc056c2 --- /dev/null +++ b/docs-next/content/docs/guides/advanced-execution/batch-enqueue.mdx @@ -0,0 +1,44 @@ +--- +title: Batch Enqueue +description: "Insert many jobs in a single SQLite transaction with task.map() and enqueue_many()." +--- + +Insert many jobs in a single SQLite transaction for high throughput. + +## `task.map()` + +```python +@queue.task() +def process(item_id): + return fetch_and_process(item_id) + +# Enqueue 1000 jobs in one transaction +jobs = process.map([(i,) for i in range(1000)]) +``` + +## `queue.enqueue_many()` + +```python +# Basic batch — same options for all jobs +jobs = queue.enqueue_many( + task_name="myapp.process", + args_list=[(i,) for i in range(1000)], + priority=5, + queue="processing", +) + +# Full parity with enqueue() — per-job overrides +jobs = queue.enqueue_many( + task_name="myapp.process", + args_list=[(i,) for i in range(100)], + delay=5.0, # uniform 5s delay for all + unique_keys=[f"item-{i}" for i in range(100)], # per-job dedup + metadata='{"source": "batch"}', # uniform metadata + expires=3600.0, # expire after 1 hour + result_ttl=600, # keep results for 10 minutes +) +``` + +Per-job lists (`delay_list`, `metadata_list`, `expires_list`, +`result_ttl_list`) override uniform values when both are provided. See the +[API reference](/docs/api-reference/queue) for the full parameter list. diff --git a/docs-next/content/docs/guides/advanced-execution/dependencies.mdx b/docs-next/content/docs/guides/advanced-execution/dependencies.mdx new file mode 100644 index 0000000..952ad20 --- /dev/null +++ b/docs-next/content/docs/guides/advanced-execution/dependencies.mdx @@ -0,0 +1,272 @@ +--- +title: Task Dependencies +description: "DAG-style depends_on for jobs — cascade cancel, missing-dep handling, common patterns." +--- + +import { Callout } from "fumadocs-ui/components/callout"; +import { Tab, Tabs } from "fumadocs-ui/components/tabs"; + +taskito supports declaring dependencies between jobs, allowing you to build +DAG-style workflows where a job only runs after its upstream dependencies +have completed successfully. 
+ +## Basic usage + +Pass `depends_on` when enqueuing a job to declare that it should wait for one +or more other jobs to finish: + + + + +```python +job_a = extract.delay(url) + +# job_b won't start until job_a completes successfully +job_b = transform.apply_async( + args=(job_a.id,), + depends_on=job_a.id, +) +``` + + + + +```python +job_a = fetch.delay("https://api1.example.com") +job_b = fetch.delay("https://api2.example.com") + +# job_c waits for both job_a and job_b +job_c = merge.apply_async( + args=(), + depends_on=[job_a.id, job_b.id], +) +``` + + + + +The `depends_on` parameter accepts: + +| Value | Description | +|---|---| +| `str` | A single job ID | +| `list[str]` | Multiple job IDs (all must complete) | +| `None` (default) | No dependencies | + + + You can also use `depends_on` with `queue.enqueue()` directly: + + ```python + job_id = queue.enqueue( + task_name="myapp.tasks.merge", + args=(), + depends_on=[job_a.id, job_b.id], + ) + ``` + + +## How it works + +1. When a job with `depends_on` is enqueued, it enters a **waiting** state +2. The scheduler periodically checks waiting jobs to see if all dependencies have completed +3. Once every dependency has `status=completed`, the job transitions to `pending` and becomes eligible for dispatch +4. If any dependency fails, dies, or is cancelled, the dependent job is **cascade cancelled** + + W["Status: Waiting"] + W --> CHECK{"All deps completed?"} + CHECK -->|Yes| P["Status: Pending"] + CHECK -->|"Any dep failed/dead/cancelled"| CC["Cascade Cancel"] + P --> R["Dispatched to worker"] + CC --> DONE["Status: Cancelled
reason: dependency_failed"]`} +/> + +## Cascade cancel + +When a dependency fails (exhausts retries and moves to DLQ), dies, or is +cancelled, all downstream dependents are automatically cancelled. This +propagates transitively through the entire dependency graph: + +```python +job_a = step_one.delay() +job_b = step_two.apply_async(args=(), depends_on=job_a.id) +job_c = step_three.apply_async(args=(), depends_on=job_b.id) + +# If job_a fails permanently: +# - job_b is cascade cancelled +# - job_c is cascade cancelled (transitive) +``` + + + As soon as a dependency enters a terminal failure state (`dead` or + `cancelled`), all downstream dependents are cancelled in the same scheduler + tick. There is no grace period. + + +## Inspecting dependencies + +### `job.dependencies` + +Returns the list of job IDs this job depends on: + +```python +job_c = merge.apply_async( + args=(), + depends_on=[job_a.id, job_b.id], +) + +fetched = queue.get_job(job_c.id) +print(fetched.dependencies) # ['01H5K6X...', '01H5K7Y...'] +``` + +### `job.dependents` + +Returns the list of job IDs that depend on this job: + +```python +fetched_a = queue.get_job(job_a.id) +print(fetched_a.dependents) # ['01H5K8Z...'] (job_c's ID) +``` + +## Error handling + +### Missing dependencies + +If you reference a job ID that does not exist, enqueue raises a `ValueError`: + +```python +try: + job = transform.apply_async( + args=(), + depends_on="nonexistent-job-id", + ) +except ValueError as e: + print(e) # "Dependency job 'nonexistent-job-id' not found" +``` + +### Already-dead dependencies + +If a dependency is already in a terminal failure state (`dead` or +`cancelled`) at enqueue time, the dependent job is immediately cancelled: + +```python +dead_job = queue.get_job(some_dead_id) +assert dead_job.status == "dead" + +# This job is immediately cancelled — it will never run +job = transform.apply_async( + args=(), + depends_on=dead_job.id, +) + +fetched = queue.get_job(job.id) +print(fetched.status) # "cancelled" +``` + +## DAG workflow examples + +### Diamond pattern + +A classic diamond dependency graph where two branches converge: + + B["transform_a.apply_async(depends_on=A)"] + A --> C["transform_b.apply_async(depends_on=A)"] + B --> D["load.apply_async(depends_on=[B, C])"] + C --> D`} +/> + +```python +# Extract +job_a = extract.delay(source_url) + +# Two parallel transforms, each depending on extract +job_b = transform_a.apply_async( + args=("schema_a",), + depends_on=job_a.id, +) +job_c = transform_b.apply_async( + args=("schema_b",), + depends_on=job_a.id, +) + +# Load waits for both transforms +job_d = load.apply_async( + args=(), + depends_on=[job_b.id, job_c.id], +) +``` + +### Multi-stage pipeline + +A sequential pipeline with fan-out at one stage: + +```python +# Stage 1: Download +download_jobs = [ + download.delay(url) for url in urls +] + +# Stage 2: Process each download (each depends on its own download) +process_jobs = [ + process.apply_async( + args=(url,), + depends_on=dl.id, + ) + for dl, url in zip(download_jobs, urls) +] + +# Stage 3: Aggregate all results +aggregate_job = aggregate.apply_async( + args=(), + depends_on=[j.id for j in process_jobs], +) +``` + +### Conditional branching + +Combine dependencies with metadata to build conditional workflows: + +```python +job_a = validate.delay(data) + +# Both branches depend on validation +job_success = on_valid.apply_async( + args=(data,), + depends_on=job_a.id, + metadata='{"branch": "success"}', +) + +# Use a separate task to handle the "validation failed" 
path +# by inspecting job_a's result in the task body +``` + + + `depends_on` is a lower-level primitive than chains, groups, and chords. + Use `depends_on` when you need fine-grained control over a custom DAG. Use + the workflow primitives when your pipeline fits a standard pattern. + + +## Combining with other features + +Dependencies compose naturally with other taskito features: + +```python +job = transform.apply_async( + args=(data,), + depends_on=job_a.id, + priority=10, # High priority once unblocked + queue="processing", # Target queue + max_retries=5, # Retry policy + delay=60, # Additional delay after deps resolve + unique_key="transform-daily", # Deduplication +) +``` + + + When both `delay` and `depends_on` are set, the job first waits for all + dependencies to complete, then waits for the additional delay before + becoming eligible for dispatch. + diff --git a/docs-next/content/docs/guides/advanced-execution/index.mdx b/docs-next/content/docs/guides/advanced-execution/index.mdx index c73957a..d0849f1 100644 --- a/docs-next/content/docs/guides/advanced-execution/index.mdx +++ b/docs-next/content/docs/guides/advanced-execution/index.mdx @@ -1,10 +1,15 @@ --- title: Advanced execution -description: "Async tasks, native async, prefork pool, batching." +description: "Patterns for scaling and optimizing task execution." --- -import { Callout } from 'fumadocs-ui/components/callout'; +Patterns for scaling and optimizing task execution. - - Content port pending. See the [Zensical source](https://github.com/ByteVeda/taskito/tree/master/docs) for current text. - +| Guide | Description | +|---|---| +| [Prefork Pool](/docs/guides/advanced-execution/prefork) | Process-based isolation for CPU-bound or memory-leaking tasks | +| [Native Async Tasks](/docs/guides/advanced-execution/async-tasks) | `async def` tasks with native event loop integration | +| [Result Streaming](/docs/guides/advanced-execution/streaming) | Stream partial results and progress updates in real time | +| [Dependencies](/docs/guides/advanced-execution/dependencies) | DAG-based job dependencies — run tasks in order | +| [Batch Enqueue](/docs/guides/advanced-execution/batch-enqueue) | Enqueue many jobs efficiently with `task.map()` and `enqueue_many()` | +| [Unique Tasks](/docs/guides/advanced-execution/unique-tasks) | Deduplicate active jobs with unique keys | diff --git a/docs-next/content/docs/guides/advanced-execution/meta.json b/docs-next/content/docs/guides/advanced-execution/meta.json index 0b863a5..f336679 100644 --- a/docs-next/content/docs/guides/advanced-execution/meta.json +++ b/docs-next/content/docs/guides/advanced-execution/meta.json @@ -1,4 +1,12 @@ { "title": "Advanced execution", - "pages": ["index"] + "pages": [ + "index", + "prefork", + "async-tasks", + "streaming", + "dependencies", + "batch-enqueue", + "unique-tasks" + ] } diff --git a/docs-next/content/docs/guides/advanced-execution/prefork.mdx b/docs-next/content/docs/guides/advanced-execution/prefork.mdx new file mode 100644 index 0000000..4427edb --- /dev/null +++ b/docs-next/content/docs/guides/advanced-execution/prefork.mdx @@ -0,0 +1,106 @@ +--- +title: Prefork Worker Pool +description: "Spawn child processes for true CPU parallelism — each child has its own GIL." +--- + +Spawn separate child processes for true CPU parallelism. Each child has its +own Python GIL, so CPU-bound tasks don't block each other. 
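+
+A quick illustration of the kind of task that benefits: pure-Python work
+that holds the GIL for the duration of the call (the task body below is a
+made-up example, not part of taskito):
+
+```python
+@queue.task()
+def crunch(values: list[float]) -> float:
+    # Pure-Python arithmetic never releases the GIL, so in the default
+    # thread pool these calls serialize; under prefork each child process
+    # runs one independently.
+    total = 0.0
+    for v in values:
+        total += v * v
+    return total
+```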
+ +## When to use + +| Workload | Recommended pool | Why | +|----------|-----------------|-----| +| I/O-bound (HTTP calls, DB queries) | `thread` (default) | Threads release the GIL during I/O | +| CPU-bound (data processing, ML) | `prefork` | Each process owns its GIL | +| Mixed workloads | `prefork` | CPU tasks benefit; I/O tasks work fine too | + +## Getting started + +```python +queue = Queue(db_path="myapp.db", workers=4) +queue.run_worker(pool="prefork", app="myapp:queue") +``` + +```bash +taskito worker --app myapp:queue --pool prefork +``` + +The `app` parameter tells each child process how to import your Queue +instance. It must be an importable path in `module:attribute` format. + +## How it works + +(Rust)"] -->|"Job JSON"| P["PreforkPool"] + P -->|stdin| C1["Child 1
(own GIL)"] + P -->|stdin| C2["Child 2
(own GIL)"] + P -->|stdin| CN["Child N
(own GIL)"] + + C1 -->|stdout| R1["Reader 1"] + C2 -->|stdout| R2["Reader 2"] + CN -->|stdout| RN["Reader N"] + + R1 -->|JobResult| RCH["Result Channel"] + R2 -->|JobResult| RCH + RN -->|JobResult| RCH + + RCH --> ML["Result Handler"] + ML -->|"complete / retry / DLQ"| DB[("Storage")]`} +/> + +1. The Rust scheduler dequeues jobs from storage +2. `PreforkPool` serializes each job as JSON and writes it to the least-loaded child's stdin pipe +3. Each child deserializes the job, executes the task wrapper (with middleware, resources, proxies), and writes the result as JSON to stdout +4. Reader threads parse results and feed them back to the scheduler +5. The scheduler updates job status in storage + +## Configuration + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `pool` | `str` | `"thread"` | Set to `"prefork"` to enable | +| `app` | `str` | — | Import path to Queue instance (required) | +| `workers` | `int` | CPU count | Number of child processes | + +## Migrating from thread pool + +The thread pool is the default. To switch to prefork: + +```python +# Before (thread pool) +queue.run_worker() + +# After (prefork) +queue.run_worker(pool="prefork", app="myapp:queue") +``` + +Everything else stays the same — task decorators, middleware, resources, +events, and the scheduler all work identically. The only difference is where +task code executes (child process vs. worker thread). + +## Debugging child processes + +Children inherit the parent's stderr, so `print()` statements and Python +logging appear in the parent's terminal. + +Enable debug logging to see child lifecycle events: + +```python +import logging +logging.getLogger("taskito.prefork.child").setLevel(logging.DEBUG) +``` + +Log output includes: +- `child ready (app=..., pid=...)` — child initialized and waiting for jobs +- `executing task_name[job_id]` — job received (DEBUG level) +- `task task_name[job_id] failed: ...` — task error +- `shutdown received` — clean shutdown +- `resource teardown error` — resource cleanup failure + +## Limitations + +- **Tasks must be importable**: Each child process imports the app module independently. Tasks defined inside functions or closures cannot be imported. +- **No shared state**: Children are separate processes. In-memory caches, globals, or module-level state are not shared between children. +- **Startup cost**: Each child imports the full app module on start. This happens once per child, not per job. +- **Resource re-initialization**: Worker resources (DB connections, etc.) are initialized independently in each child. diff --git a/docs-next/content/docs/guides/advanced-execution/streaming.mdx b/docs-next/content/docs/guides/advanced-execution/streaming.mdx new file mode 100644 index 0000000..29a818a --- /dev/null +++ b/docs-next/content/docs/guides/advanced-execution/streaming.mdx @@ -0,0 +1,135 @@ +--- +title: Result Streaming +description: "Stream partial results from long-running tasks via current_job.publish() and job.stream()." +--- + +Stream intermediate results from long-running tasks. Instead of waiting for +the final result, consumers receive partial data as it becomes available. 
+ +## Publishing partial results + +Inside a task, call `current_job.publish(data)` to emit a partial result: + +```python +from taskito import current_job + +@queue.task() +def process_batch(items): + results = [] + for i, item in enumerate(items): + result = process(item) + results.append(result) + current_job.publish({"item_id": item.id, "status": "ok", "result": result}) + current_job.update_progress(int((i + 1) / len(items) * 100)) + return {"total": len(items), "results": results} +``` + +`publish()` accepts any JSON-serializable value — dicts, lists, strings, numbers. + +## Consuming with `stream()` + +The caller iterates over partial results as they arrive: + +```python +job = process_batch.delay(items) + +for partial in job.stream(timeout=120, poll_interval=0.5): + print(f"Processed item {partial['item_id']}: {partial['status']}") + +# After stream ends, get the final result +final = job.result(timeout=5) +``` + +`stream()` polls the database for new partial results, yields each one, and +stops when the job reaches a terminal state (complete, failed, dead, +cancelled). + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `timeout` | `float` | `60.0` | Maximum seconds to wait | +| `poll_interval` | `float` | `0.5` | Seconds between polls | + +## Async streaming + +Use `astream()` in async contexts: + +```python +async for partial in job.astream(timeout=120, poll_interval=0.5): + print(f"Got: {partial}") +``` + +## FastAPI SSE + +The built-in FastAPI progress endpoint supports streaming partial results: + +``` +GET /jobs/{job_id}/progress?include_results=true +``` + +Events include partial results alongside progress: + +``` +data: {"status": "running", "progress": 25} +data: {"status": "running", "progress": 25, "partial_result": {"item_id": 1, "status": "ok"}} +data: {"status": "running", "progress": 50} +data: {"status": "complete", "progress": 100} +``` + +## Patterns + +### ETL pipeline + +```python +@queue.task() +def etl_pipeline(source_tables): + for table in source_tables: + rows = extract(table) + transformed = transform(rows) + load(transformed) + current_job.publish({ + "table": table, + "rows_processed": len(rows), + "status": "loaded", + }) + return {"tables": len(source_tables)} +``` + +### ML training + +```python +@queue.task() +def train_model(config): + model = build_model(config) + for epoch in range(config["epochs"]): + loss = train_epoch(model) + current_job.publish({ + "epoch": epoch + 1, + "loss": float(loss), + "lr": optimizer.param_groups[0]["lr"], + }) + save_model(model) + return {"final_loss": float(loss)} +``` + +### Batch processing with error tracking + +```python +@queue.task() +def process_orders(order_ids): + for oid in order_ids: + try: + process_order(oid) + current_job.publish({"order_id": oid, "status": "ok"}) + except Exception as e: + current_job.publish({"order_id": oid, "status": "error", "error": str(e)}) + return {"total": len(order_ids)} +``` + +## How it works + +`publish()` stores data as a task log entry with `level="result"`, reusing +the existing `task_logs` table. No new tables or Rust changes are needed. + +`stream()` polls `get_task_logs(job_id)`, filters for `level == "result"`, +tracks the last-seen timestamp, and yields only new entries. It stops when +the job's status becomes terminal. 
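+
+For intuition, here is a simplified sketch of that polling loop. The exact
+log-entry shape and the `get_task_logs()` spelling are assumptions; the real
+implementation also tracks the last-seen timestamp rather than a simple count:
+
+```python
+import time
+
+def stream_sketch(queue, job_id, timeout=60.0, poll_interval=0.5):
+    terminal = {"complete", "failed", "dead", "cancelled"}
+    deadline = time.monotonic() + timeout
+    seen = 0
+    while time.monotonic() < deadline:
+        # Keep only partial-result entries, yield the ones not seen yet.
+        results = [e for e in queue.get_task_logs(job_id) if e["level"] == "result"]
+        for entry in results[seen:]:
+            yield entry["message"]
+        seen = len(results)
+        if queue.get_job(job_id).status in terminal:
+            return
+        time.sleep(poll_interval)
+```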
diff --git a/docs-next/content/docs/guides/advanced-execution/unique-tasks.mdx b/docs-next/content/docs/guides/advanced-execution/unique-tasks.mdx new file mode 100644 index 0000000..ef4c38b --- /dev/null +++ b/docs-next/content/docs/guides/advanced-execution/unique-tasks.mdx @@ -0,0 +1,27 @@ +--- +title: Unique Tasks +description: "Deduplicate active jobs via unique_key — partial unique index, atomic check-and-insert." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +Deduplicate active jobs by key — if a job with the same `unique_key` is +already pending or running, the existing job is returned instead of creating +a new one: + +```python +job1 = process.apply_async(args=("report",), unique_key="daily-report") +job2 = process.apply_async(args=("report",), unique_key="daily-report") +assert job1.id == job2.id # Same job, not duplicated +``` + +Once the original job completes (or fails to DLQ), the key is released and a +new job can be created with the same key. + + + Deduplication uses a partial unique index: + `CREATE UNIQUE INDEX ... ON jobs(unique_key) WHERE unique_key IS NOT NULL AND status IN (0, 1)`. + Only pending and running jobs participate. The check-and-insert is atomic + (transaction-protected), so concurrent calls with the same `unique_key` + are handled gracefully without race conditions. + diff --git a/docs-next/content/docs/guides/operations/deployment.mdx b/docs-next/content/docs/guides/operations/deployment.mdx new file mode 100644 index 0000000..ab05a51 --- /dev/null +++ b/docs-next/content/docs/guides/operations/deployment.mdx @@ -0,0 +1,380 @@ +--- +title: Deployment +description: "systemd, Docker, WAL mode, backups, multi-worker, sizing, production checklist." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +This guide covers running taskito in production environments. + +## SQLite file location + +Choose a persistent, backed-up location for your database: + +```python +queue = Queue(db_path="/var/lib/myapp/taskito.db") +``` + +**Best practices:** + +- Use an absolute path — relative paths depend on the working directory +- Place the database on local storage (not NFS or network mounts) — SQLite file locking doesn't work reliably over network filesystems +- Ensure the directory exists and the worker process has read/write permissions +- The database file, WAL file (`taskito.db-wal`), and shared memory file (`taskito.db-shm`) must all be on the same filesystem + +## systemd service + +Create `/etc/systemd/system/taskito-worker.service`: + +```ini +[Unit] +Description=taskito worker +After=network.target + +[Service] +Type=simple +User=myapp +Group=myapp +WorkingDirectory=/opt/myapp +ExecStart=/opt/myapp/.venv/bin/taskito worker --app myapp:queue +Restart=always +RestartSec=5 + +# Graceful shutdown — taskito handles SIGINT +KillSignal=SIGINT +TimeoutStopSec=35 + +# Environment +Environment=PYTHONPATH=/opt/myapp + +[Install] +WantedBy=multi-user.target +``` + +```bash +sudo systemctl daemon-reload +sudo systemctl enable taskito-worker +sudo systemctl start taskito-worker + +# Check logs +journalctl -u taskito-worker -f +``` + + + Set `TimeoutStopSec` to slightly longer than your longest task timeout + (default graceful shutdown is 30s). This gives in-flight tasks time to + complete before systemd force-kills the process. + + +## Docker + +### Dockerfile + +```dockerfile +FROM python:3.12-slim + +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . 
+ +# Store the database in a volume +VOLUME /data +ENV TASKITO_DB_PATH=/data/taskito.db + +CMD ["taskito", "worker", "--app", "myapp:queue"] +``` + +### docker-compose.yml + +```yaml +services: + worker: + build: . + volumes: + - taskito-data:/data + stop_signal: SIGINT + stop_grace_period: 35s + + dashboard: + build: . + command: taskito dashboard --app myapp:queue --host 0.0.0.0 + volumes: + - taskito-data:/data + ports: + - "8080:8080" + +volumes: + taskito-data: +``` + + + The worker and dashboard must access the **same SQLite file**. In Docker, + use a named volume shared between containers. Do not use bind mounts on + network storage. + + +### Graceful shutdown in containers + +taskito handles `SIGINT` for graceful shutdown. Configure your container +orchestrator to send `SIGINT` (not `SIGTERM`): + +- **Docker Compose**: `stop_signal: SIGINT` +- **Kubernetes**: use a `preStop` hook or configure `STOPSIGNAL` in the Dockerfile: + +```dockerfile +STOPSIGNAL SIGINT +``` + +For Kubernetes, set `terminationGracePeriodSeconds` to match your longest +task timeout: + +```yaml +spec: + terminationGracePeriodSeconds: 60 + containers: + - name: worker + ... +``` + +## WAL mode and backups + +taskito uses SQLite in WAL (Write-Ahead Logging) mode for concurrent +read/write access. This affects how you back up the database. + +**Do NOT** simply copy the `.db` file while the worker is running — you may +get a corrupted backup if the WAL hasn't been checkpointed. + +**Safe backup methods:** + +```bash +# Option 1: Use sqlite3 .backup command (safe, online) +sqlite3 /var/lib/myapp/taskito.db ".backup /backups/taskito-$(date +%Y%m%d).db" + +# Option 2: Use the SQLite VACUUM INTO command +sqlite3 /var/lib/myapp/taskito.db "VACUUM INTO '/backups/taskito-$(date +%Y%m%d).db';" +``` + +Both methods are safe while the worker is running. + +## Postgres deployment + +If you're using the [Postgres backend](/docs/guides/operations/postgres), +deployment is simpler in several ways: + +- **No shared-file constraints** — workers connect over the network, no need for shared volumes or local storage +- **Multi-machine workers** — run workers on separate hosts against the same database +- **Standard backups** — use `pg_dump` instead of `sqlite3 .backup` + +### Docker Compose with Postgres + +```yaml +services: + postgres: + image: postgres:16 + environment: + POSTGRES_DB: myapp + POSTGRES_USER: taskito + POSTGRES_PASSWORD: secret + volumes: + - pgdata:/var/lib/postgresql/data + + worker: + build: . + environment: + TASKITO_BACKEND: postgres + TASKITO_DB_URL: postgresql://taskito:secret@postgres:5432/myapp + depends_on: + - postgres + stop_signal: SIGINT + stop_grace_period: 35s + +volumes: + pgdata: +``` + +### Postgres backups + +```bash +# Dump the taskito schema +pg_dump -h localhost -U taskito -d myapp -n taskito > backup.sql + +# Restore +psql -h localhost -U taskito -d myapp < backup.sql +``` + +See the [Postgres Backend guide](/docs/guides/operations/postgres) for full +configuration details. + +## Database maintenance + +### Auto-cleanup + +Set `result_ttl` to automatically purge old completed jobs: + +```python +queue = Queue( + db_path="/var/lib/myapp/taskito.db", + result_ttl=86400, # Purge completed/dead jobs older than 24 hours +) +``` + +### Manual cleanup + +```python +# Purge completed jobs older than 7 days +queue.purge_completed(older_than=604800) + +# Purge dead letters older than 30 days +queue.purge_dead(older_than=2592000) +``` + +### Database size + +SQLite databases grow as jobs accumulate. 
Without cleanup, expect roughly: + +- ~1 KB per job (metadata + small payloads) +- ~1-10 KB per job with large arguments or results + +With `result_ttl` set, the database stays compact. You can also periodically +run `VACUUM` to reclaim space: + +```bash +sqlite3 /var/lib/myapp/taskito.db "VACUUM;" +``` + + + `VACUUM` rewrites the entire database and requires exclusive access. Run + it during low-traffic periods or during a maintenance window. + + +## Monitoring in production + +### Dashboard + +Run the built-in dashboard alongside the worker: + +```bash +taskito dashboard --app myapp:queue --host 0.0.0.0 --port 8080 +``` + +Place it behind a reverse proxy with authentication for production use — +the dashboard has no built-in auth. + +### Programmatic stats + +Poll `queue.stats()` and export to your monitoring system: + +```python +import time + +def export_metrics(): + while True: + stats = queue.stats() + # Export to Prometheus, Datadog, StatsD, etc. + gauge("taskito.pending", stats["pending"]) + gauge("taskito.running", stats["running"]) + gauge("taskito.dead", stats["dead"]) + time.sleep(15) +``` + +### Hooks for alerting + +```python +@queue.on_failure +def alert_on_failure(task_name, args, kwargs, error): + # Send to PagerDuty, Slack, email, etc. + notify(f"Task {task_name} failed: {error}") +``` + +### Health check endpoint + +If you're using FastAPI: + +```python +from fastapi import FastAPI +from taskito.contrib.fastapi import TaskitoRouter + +app = FastAPI() +app.include_router(TaskitoRouter(queue), prefix="/tasks") + +# GET /tasks/stats returns queue health +# Use this as a health check endpoint in your load balancer +``` + +## Multiple workers + +taskito is designed as a **single-process** task queue when using SQLite. +Running multiple worker processes against the same SQLite file is possible +(WAL mode allows concurrent access), but: + +- Only one process can write at a time — this limits throughput +- SQLite lock contention increases with more writers +- There is no distributed coordination between workers + +For most single-machine workloads, one worker process with multiple threads +(the default) is sufficient: + +```python +queue = Queue( + db_path="myapp.db", + workers=8, # 8 OS threads in the worker pool +) +``` + +If you need distributed workers across multiple machines, use the +[Postgres backend](/docs/guides/operations/postgres) which removes the +single-writer constraint and supports multi-machine deployments. + +## SQLite scaling limits + +taskito uses SQLite as its storage backend. Understanding its limitations +helps you plan for production: + +**Single-writer constraint.** SQLite allows only one write transaction at a +time. WAL mode lets reads proceed concurrently with writes, but all writes +are serialized. This is the primary throughput ceiling. 
+ +**Expected throughput.** On modern hardware with an SSD, expect: + +- **1,000–5,000 jobs/second** for enqueue + dequeue cycles (small payloads) +- Throughput decreases with larger payloads, complex queries, or spinning disks +- The connection pool size (default: 8) controls read concurrency — tune it based on your read/write ratio + +**When to upgrade to Postgres:** + +- You need multi-machine distributed workers +- You consistently exceed ~5,000 jobs/second sustained throughput +- Multiple processes contend heavily for writes (high lock wait times) +- You need sub-millisecond dequeue latency under high load + +taskito's [Postgres backend](/docs/guides/operations/postgres) addresses all +of these limitations while keeping the same API. + +## Sizing your deployment + +| Throughput | Backend | Workers | Pool | Notes | +|-----------|---------|---------|------|-------| +| < 100 jobs/s | SQLite | 4 | thread | Default config works fine | +| 100–1K jobs/s | SQLite | 8–16 | thread or prefork | Increase `workers`, monitor WAL size | +| 1K–5K jobs/s | SQLite | 16 | prefork | Prefork for CPU-bound; SQLite handles this well with WAL | +| 5K–20K jobs/s | Postgres | 16–32 | prefork | Switch to Postgres for concurrent writers | +| 20K–50K jobs/s | Postgres | 32+ | prefork | Multiple worker processes, tune `pool_size` | +| > 50K jobs/s | — | — | — | Consider Celery + RabbitMQ for this scale | + + + These are rough guidelines for noop tasks. Real throughput depends on task + duration, payload size, and I/O patterns. + + +## Checklist + +- [ ] Use an absolute path for `db_path` +- [ ] Place SQLite on local (not network) storage +- [ ] Set `result_ttl` to prevent unbounded database growth +- [ ] Set `timeout` on tasks to recover from worker crashes +- [ ] Configure `SIGINT` as the stop signal in your process manager +- [ ] Set up failure hooks or monitoring for alerting +- [ ] Back up the database using `sqlite3 .backup` (not file copy), or `pg_dump` for Postgres +- [ ] Place the dashboard behind a reverse proxy with authentication diff --git a/docs-next/content/docs/guides/operations/index.mdx b/docs-next/content/docs/guides/operations/index.mdx index 9328ba7..73456e9 100644 --- a/docs-next/content/docs/guides/operations/index.mdx +++ b/docs-next/content/docs/guides/operations/index.mdx @@ -1,10 +1,16 @@ --- title: Operations -description: "Workers, pause/resume, namespacing, scaling, deployment." +description: "Run taskito reliably in production — testing, deployment, scaling, migration." --- -import { Callout } from 'fumadocs-ui/components/callout'; +Run taskito reliably in production. - - Content port pending. See the [Zensical source](https://github.com/ByteVeda/taskito/tree/master/docs) for current text. 
- +| Guide | Description | +|---|---| +| [Testing](/docs/guides/operations/testing) | Test mode, fixtures, mocking resources, and workflow testing | +| [Job Management](/docs/guides/operations/job-management) | Cancel, pause, archive, revoke, replay, and clean up jobs | +| [Troubleshooting](/docs/guides/operations/troubleshooting) | Diagnose stuck jobs, lock contention, and worker issues | +| [Deployment](/docs/guides/operations/deployment) | systemd, Docker, WAL mode, Postgres, and production checklists | +| [KEDA Autoscaling](/docs/guides/operations/keda) | Kubernetes event-driven autoscaling for workers | +| [Postgres Backend](/docs/guides/operations/postgres) | Set up and run taskito with PostgreSQL | +| [Migrating from Celery](/docs/guides/operations/migration) | Side-by-side comparison and step-by-step migration guide | diff --git a/docs-next/content/docs/guides/operations/job-management.mdx b/docs-next/content/docs/guides/operations/job-management.mdx new file mode 100644 index 0000000..7618939 --- /dev/null +++ b/docs-next/content/docs/guides/operations/job-management.mdx @@ -0,0 +1,203 @@ +--- +title: Job Management +description: "Cancel, pause, archive, revoke, replay, purge, and clean up jobs." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +Manage running jobs — cancel, pause queues, archive, revoke, replay, and clean up. + +## Job cancellation + +Cancel a pending job before it starts: + +```python +job = send_email.delay("user@example.com", "Hello", "World") +cancelled = queue.cancel_job(job.id) # True if was pending +``` + +- Returns `True` if the job was pending and is now cancelled +- Returns `False` if the job was already running, completed, or in another non-pending state +- Cancelled jobs cannot be un-cancelled + +## Result TTL & auto-cleanup + +### Manual cleanup + +```python +# Purge completed jobs older than 1 hour +deleted = queue.purge_completed(older_than=3600) + +# Purge dead letters older than 24 hours +deleted = queue.purge_dead(older_than=86400) +``` + +### Automatic cleanup + +Set `result_ttl` on the Queue to automatically purge old jobs while the +worker runs: + +```python +queue = Queue( + db_path="myapp.db", + result_ttl=3600, # Auto-purge completed/dead jobs older than 1 hour +) +``` + +The scheduler checks every ~60 seconds and purges: + +- Completed jobs older than `result_ttl` +- Dead letter entries older than `result_ttl` +- Error history records older than `result_ttl` + +Set to `None` (default) to disable auto-cleanup. + +### Cascade cleanup + +When jobs are purged — either manually via `purge_completed()` or +automatically via `result_ttl` — their related child records are also deleted: + +- Error history (`job_errors`) +- Task logs (`task_logs`) +- Task metrics (`task_metrics`) +- Job dependencies (`job_dependencies`) +- Replay history (`replay_history`) + +This prevents orphaned records from accumulating when parent jobs are +removed. + +```python +# Manual purge — child records are cleaned up automatically +deleted = queue.purge_completed(older_than=3600) +print(f"Purged {deleted} jobs and their related records") + +# With per-job TTL — cascade cleanup still applies +job = resize_image.apply_async( + args=("photo.jpg",), + result_ttl=600, # This job's results expire after 10 minutes +) +# When this job is purged (after 10 min), its errors, logs, +# metrics, dependencies, and replay history are also removed. +``` + + + Dead letter entries are **not** cascade-deleted — they have their own + lifecycle managed by `purge_dead()`. 
Timestamp-based cleanup (`result_ttl`) + of error history, logs, and metrics also continues to run independently, + catching old records regardless of whether the parent job still exists. + + +## Queue pause/resume + +Temporarily pause job processing on a queue without stopping the worker: + +```python +# Pause the "emails" queue +queue.pause("emails") + +# Check which queues are paused +print(queue.paused_queues()) # ["emails"] + +# Resume processing +queue.resume("emails") +``` + +Paused queues still accept new jobs — they just won't be dequeued until +resumed. + +### Maintenance window example + +```python +# Before maintenance: pause all queues +for q in ["default", "emails", "reports"]: + queue.pause(q) +print(f"Paused: {queue.paused_queues()}") + +# ... perform maintenance ... + +# After maintenance: resume all queues +for q in ["default", "emails", "reports"]: + queue.resume(q) +``` + +## Job archival + +Move old completed jobs to an archive table to keep the main jobs table lean: + +```python +# Archive completed jobs older than 24 hours +archived_count = queue.archive(older_than=86400) +print(f"Archived {archived_count} jobs") + +# Browse archived jobs +archived = queue.list_archived(limit=50, offset=0) +for job in archived: + print(f"{job.id}: {job.task_name} ({job.status})") +``` + +Archived jobs are no longer returned by `queue.stats()` or +`queue.list_jobs()`, but remain queryable via `queue.list_archived()`. + +### Scheduled archival + +```python +@queue.periodic(cron="0 0 2 * * *") # Daily at 2 AM +def nightly_archival(): + archived = queue.archive(older_than=7 * 86400) # Archive jobs older than 7 days + current_job.log(f"Archived {archived} jobs") +``` + +## Task revocation + +Cancel all pending jobs for a specific task: + +```python +# Revoke all pending "send_newsletter" jobs +cancelled = queue.revoke_task("myapp.tasks.send_newsletter") +print(f"Revoked {cancelled} jobs") +``` + +## Queue purge + +Remove all pending jobs from a specific queue: + +```python +purged = queue.purge("emails") +print(f"Purged {purged} jobs from the emails queue") +``` + +## Job replay + +Replay a completed or dead job with the same arguments: + +```python +new_job = queue.replay(job_id) +print(f"Replayed as {new_job.id}") + +# Check replay history +history = queue.replay_history(job_id) +``` + +### Retry from dead letter with replay + +```python +# List dead letters and replay them +dead = queue.dead_letters() +for entry in dead: + print(f"Replaying dead job {entry['original_job_id']}: {entry['task_name']}") + new_id = queue.retry_dead(entry["id"]) + print(f" -> New job: {new_id}") +``` + +## SQLite configuration + +taskito configures SQLite for optimal performance: + +| Pragma | Value | Purpose | +|---|---|---| +| `journal_mode` | WAL | Concurrent reads during writes | +| `busy_timeout` | 5000ms | Wait instead of failing on lock contention | +| `synchronous` | NORMAL | Balance between safety and speed | +| `journal_size_limit` | 64MB | Prevent unbounded WAL growth | + +The connection pool uses up to 8 connections via `r2d2`. diff --git a/docs-next/content/docs/guides/operations/keda.mdx b/docs-next/content/docs/guides/operations/keda.mdx new file mode 100644 index 0000000..17950ec --- /dev/null +++ b/docs-next/content/docs/guides/operations/keda.mdx @@ -0,0 +1,215 @@ +--- +title: KEDA Autoscaling +description: "Kubernetes event-driven autoscaling for taskito workers via the bundled scaler server." 
+--- + +import { Callout } from "fumadocs-ui/components/callout"; + +[KEDA](https://keda.sh) (Kubernetes Event-driven Autoscaling) can scale your +taskito worker deployment up and down based on queue depth. taskito ships a +dedicated scaler server that KEDA queries directly. + +## Scaler server + +Start the scaler alongside your worker: + +```bash +taskito scaler --app myapp:queue --port 9091 +``` + +| Flag | Default | Description | +|---|---|---| +| `--app` | — | Python path to the `Queue` instance | +| `--host` | `0.0.0.0` | Bind address | +| `--port` | `9091` | Bind port | +| `--target-queue-depth` | `10` | Scaling target hint returned to KEDA | + +The scaler exposes three endpoints: + +| Endpoint | Description | +|---|---| +| `GET /api/scaler` | Returns current queue depth and scaling target for KEDA | +| `GET /metrics` | Prometheus text format (requires `prometheus-client`) | +| `GET /health` | Liveness check — always returns `{"status": "ok"}` | + +### `/api/scaler` response + +```json +{ + "metricValue": 42, + "targetValue": 10, + "queueName": "default" +} +``` + +Filter to a specific queue: + +``` +GET /api/scaler?queue=emails +``` + +### Programmatic usage + +```python +from taskito.scaler import serve_scaler + +serve_scaler(queue, host="0.0.0.0", port=9091, target_queue_depth=10) +``` + +## Kubernetes deployment + +Deploy the scaler as a separate `Deployment` and expose it as a `ClusterIP` +service: + +```yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: taskito-scaler +spec: + replicas: 1 + selector: + matchLabels: + app: taskito-scaler + template: + metadata: + labels: + app: taskito-scaler + spec: + containers: + - name: scaler + image: your-image:latest + command: ["taskito", "scaler", "--app", "myapp:queue", "--port", "9091"] + ports: + - containerPort: 9091 +--- +apiVersion: v1 +kind: Service +metadata: + name: taskito-scaler +spec: + selector: + app: taskito-scaler + ports: + - port: 9091 + targetPort: 9091 +``` + +## ScaledObject (HTTP trigger) + +Scale a long-running worker `Deployment` based on pending job count: + +```yaml +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: taskito-worker + namespace: default +spec: + scaleTargetRef: + name: taskito-worker # your worker Deployment name + pollingInterval: 15 # seconds between KEDA polls + cooldownPeriod: 60 # seconds before scaling to zero + minReplicaCount: 0 # scale to zero when idle + maxReplicaCount: 10 + triggers: + - type: metrics-api + metadata: + url: "http://taskito-scaler:9091/api/scaler" + valueLocation: "metricValue" + targetValue: "10" +``` + +Filter to a specific queue name: + +```yaml + url: "http://taskito-scaler:9091/api/scaler?queue=emails" +``` + +## ScaledJob (ephemeral batch workers) + +For batch/ETL workloads, use `ScaledJob` to create short-lived Kubernetes +Jobs — one pod per N pending tasks: + +```yaml +apiVersion: keda.sh/v1alpha1 +kind: ScaledJob +metadata: + name: taskito-batch-worker + namespace: default +spec: + jobTargetRef: + template: + spec: + containers: + - name: taskito-worker + image: your-image:latest + command: ["taskito", "worker", "--app", "myapp:queue"] + restartPolicy: Never + pollingInterval: 15 + successfulJobsHistoryLimit: 5 + failedJobsHistoryLimit: 5 + maxReplicaCount: 20 + scalingStrategy: + strategy: default # or "accurate" for 1:1 job-to-pod mapping + triggers: + - type: metrics-api + metadata: + url: "http://taskito-scaler:9091/api/scaler" + valueLocation: "metricValue" + targetValue: "5" # one pod per 5 pending jobs +``` + +## Scaling 
with Prometheus + +If you already have Prometheus scraping your workers, you can skip the scaler +server and use the Prometheus KEDA trigger directly: + +```yaml +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: taskito-worker-prometheus + namespace: default +spec: + scaleTargetRef: + name: taskito-worker + pollingInterval: 15 + cooldownPeriod: 60 + minReplicaCount: 0 + maxReplicaCount: 10 + triggers: + - type: prometheus + metadata: + serverAddress: "http://prometheus:9090" + metricName: taskito_queue_depth + query: sum(taskito_queue_depth{queue="default"}) + threshold: "10" + - type: prometheus + metadata: + serverAddress: "http://prometheus:9090" + metricName: taskito_worker_utilization + query: taskito_worker_utilization{queue="default"} + threshold: "0.8" +``` + +See the [Prometheus integration](/docs/guides/integrations) for setting up +the metrics collector. + +## Deploy templates + +Ready-to-use YAML templates are included in the repository under +`deploy/keda/`: + +| File | Purpose | +|---|---| +| `scaled-object.yaml` | `ScaledObject` using the HTTP scaler endpoint | +| `scaled-object-prometheus.yaml` | `ScaledObject` using Prometheus metrics | +| `scaled-job.yaml` | `ScaledJob` for ephemeral batch workers | + + + When using SQLite, all worker replicas must share the same database + volume. For multi-replica Kubernetes deployments, use the + [Postgres backend](/docs/guides/operations/postgres) — workers connect + over the network and there's no shared-file constraint. + diff --git a/docs-next/content/docs/guides/operations/meta.json b/docs-next/content/docs/guides/operations/meta.json index 377a3d4..86c3419 100644 --- a/docs-next/content/docs/guides/operations/meta.json +++ b/docs-next/content/docs/guides/operations/meta.json @@ -1,4 +1,13 @@ { "title": "Operations", - "pages": ["index"] + "pages": [ + "index", + "testing", + "job-management", + "troubleshooting", + "deployment", + "keda", + "postgres", + "migration" + ] } diff --git a/docs-next/content/docs/guides/operations/migration.mdx b/docs-next/content/docs/guides/operations/migration.mdx new file mode 100644 index 0000000..4eefedd --- /dev/null +++ b/docs-next/content/docs/guides/operations/migration.mdx @@ -0,0 +1,359 @@ +--- +title: Migrating from Celery +description: "Side-by-side concept mapping and step-by-step migration from Celery to taskito." +--- + +import { Callout } from "fumadocs-ui/components/callout"; +import { Tab, Tabs } from "fumadocs-ui/components/tabs"; + +This guide maps Celery concepts to their taskito equivalents. If you're +coming from Celery, you'll find that most concepts translate directly — +with less infrastructure and simpler configuration. 
+ +## Concept mapping + +| Celery | taskito | Notes | +|---|---|---| +| `Celery()` app | `Queue()` | No broker URL needed | +| `@app.task` | `@queue.task()` | Same decorator pattern | +| `.apply_async()` | `.apply_async()` | Same name, similar API | +| `.delay()` | `.delay()` | Identical | +| `AsyncResult` | `JobResult` | `.result()` instead of `.get()` | +| Canvas (`chain`, `group`, `chord`) | `chain`, `group`, `chord` | Same names, same concepts | +| `celery beat` | `@queue.periodic()` | Built-in, no separate process | +| Result backend (Redis/DB) | Built-in (SQLite) | No configuration needed | +| Broker (Redis/RabbitMQ) | Not needed | SQLite handles everything | +| `celery worker` | `taskito worker` | Similar CLI | +| `celery inspect` | `taskito info` | Similar CLI | + +## Side-by-side examples + +### App setup + + + + +```python +from celery import Celery + +app = Celery( + "myapp", + broker="redis://localhost:6379/0", + backend="redis://localhost:6379/1", +) +app.conf.task_serializer = "json" +app.conf.result_serializer = "json" +``` + + + + +```python +from taskito import Queue + +queue = Queue(db_path="myapp.db") +# That's it. No broker, no backend, no serializer config. +``` + + + + +### Task definition + + + + +```python +@app.task(bind=True, max_retries=3) +def send_email(self, to, subject, body): + try: + do_send(to, subject, body) + except SMTPError as exc: + raise self.retry(exc=exc, countdown=60) +``` + + + + +```python +@queue.task(max_retries=3, retry_backoff=2.0, retry_on=[SMTPError]) +def send_email(to, subject, body): + do_send(to, subject, body) + # Retries happen automatically on matching exceptions. + # Use retry_on/dont_retry_on for selective retries. +``` + + + + + + In Celery, you must explicitly catch exceptions and call `self.retry()`. In + taskito, any unhandled exception triggers a retry automatically (up to + `max_retries`). + + +### Enqueueing tasks + + + + +```python +# Simple +send_email.delay("user@example.com", "Hello", "World") + +# With options +send_email.apply_async( + args=("user@example.com", "Hello", "World"), + countdown=60, # delay in seconds + queue="emails", + priority=5, +) +``` + + + + +```python +# Simple +send_email.delay("user@example.com", "Hello", "World") + +# With options +send_email.apply_async( + args=("user@example.com", "Hello", "World"), + delay=60, # delay in seconds + queue="emails", + priority=5, +) +``` + + + + +The only change: `countdown` becomes `delay`. 
+ +### Getting results + + + + +```python +result = send_email.delay("user@example.com", "Hi", "Body") + +# Block for result +value = result.get(timeout=30) + +# Check status +result.status # "PENDING", "SUCCESS", "FAILURE" +``` + + + + +```python +job = send_email.delay("user@example.com", "Hi", "Body") + +# Block for result +value = job.result(timeout=30) + +# Check status +job.status # "pending", "running", "complete", "failed", "dead" +``` + + + + +Key differences: +- `.get()` becomes `.result()` +- Status values are lowercase +- `"SUCCESS"` becomes `"complete"` + +### Workflows (canvas) + + + + +```python +from celery import chain, group, chord + +# Chain +chain(fetch.s(url), parse.s(), store.s()).apply_async() + +# Group +group(process.s(item) for item in items).apply_async() + +# Chord +chord( + [download.s(url) for url in urls], + merge.s() +).apply_async() +``` + + + + +```python +from taskito import chain, group, chord + +# Chain +chain(fetch.s(url), parse.s(), store.s()).apply() + +# Group +group(process.s(item) for item in items).apply() + +# Chord +chord( + [download.s(url) for url in urls], + merge.s() +).apply() +``` + + + + +Almost identical. The only change: `.apply_async()` becomes `.apply()`. + +### Periodic tasks + + + + +```python +# celery.py +app.conf.beat_schedule = { + "cleanup-every-hour": { + "task": "myapp.cleanup", + "schedule": crontab(minute=0), + }, +} + +# Requires a separate process: +# celery -A myapp beat +``` + + + + +```python +@queue.periodic(cron="0 0 * * * *") +def cleanup(): + ... + +# No separate process — the worker handles scheduling. +# taskito worker --app myapp:queue +``` + + + + + + taskito uses 6-field cron expressions (with seconds). Celery's `crontab()` + maps to the last 5 fields, with `0` prepended for seconds. + + | Celery `crontab()` | taskito cron | + |---|---| + | `crontab()` (every minute) | `0 * * * * *` | + | `crontab(minute=0)` (every hour) | `0 0 * * * *` | + | `crontab(minute=0, hour=0)` (daily) | `0 0 0 * * *` | + | `crontab(minute=30, hour=9, day_of_week='1-5')` | `0 30 9 * * 1-5` | + + +### Rate limiting + + + + +```python +@app.task(rate_limit="100/m") +def call_api(endpoint): + ... +``` + + + + +```python +@queue.task(rate_limit="100/m") +def call_api(endpoint): + ... +``` + + + + +Identical syntax. + +### Worker + + + + +```bash +celery -A myapp worker --loglevel=info -Q emails,default +``` + + + + +```bash +taskito worker --app myapp:queue --queues emails,default +``` + + + + +### Testing + + + + +```python +# Celery has CELERY_ALWAYS_EAGER mode +app.conf.task_always_eager = True +app.conf.task_eager_propagates = True + +result = add.delay(2, 3) +assert result.get() == 5 +``` + + + + +```python +with queue.test_mode() as results: + add.delay(2, 3) + assert results[0].return_value == 5 +``` + + + + +taskito's test mode uses a context manager instead of a global setting, so +it's safe to use in parallel test runs. 
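+
+One way to wire test mode into pytest, so each test gets its own isolated
+synchronous context (the fixture name and structure are just a suggestion):
+
+```python
+import pytest
+
+@pytest.fixture
+def task_results():
+    # Tasks enqueued inside a test run synchronously and land in `results`.
+    with queue.test_mode() as results:
+        yield results
+
+def test_add(task_results):
+    add.delay(2, 3)
+    assert task_results[0].return_value == 5
+```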
+ +## What taskito doesn't have + +Some Celery features don't have taskito equivalents: + +| Celery feature | Status in taskito | +|---|---| +| Distributed workers (multi-server) | Use Postgres backend | +| Message routing (exchanges, topics) | Use named queues instead | +| `celery multi` (process management) | Use systemd, supervisor, or Docker | +| Custom serializers (JSON, msgpack) | `JsonSerializer`, `CloudpickleSerializer` (default), or custom `Serializer` protocol | +| Task cancellation (mid-execution) | Cancel pending or running jobs (`cancel_running_job()` + `check_cancelled()`) | +| ETA (absolute datetime scheduling) | Use `delay` (relative seconds) | +| `bind=True` (self argument) | Use `current_job` context instead | +| Custom result backends | Built-in SQLite or Postgres | + +## Migration checklist + +- [ ] Replace `Celery()` with `Queue()` +- [ ] Change `@app.task` to `@queue.task()` +- [ ] Remove `self.retry()` calls — retries are automatic +- [ ] Change `.get()` to `.result()` on job results +- [ ] Change `countdown=` to `delay=` in `.apply_async()` +- [ ] Replace celery beat schedule with `@queue.periodic()` +- [ ] Update cron expressions to 6-field format (prepend seconds) +- [ ] Remove broker and result backend configuration +- [ ] Change `celery worker` to `taskito worker` in deployment scripts +- [ ] Replace `task_always_eager` with `queue.test_mode()` in tests diff --git a/docs-next/content/docs/guides/operations/postgres.mdx b/docs-next/content/docs/guides/operations/postgres.mdx new file mode 100644 index 0000000..47a4401 --- /dev/null +++ b/docs-next/content/docs/guides/operations/postgres.mdx @@ -0,0 +1,246 @@ +--- +title: Postgres Backend +description: "Multi-machine workers and concurrent writes with the PostgreSQL backend." +--- + +taskito supports PostgreSQL as an alternative storage backend for production +deployments that need multi-machine workers or higher write throughput. + +## When to use Postgres + +Choose Postgres over the default SQLite backend when you need: + +- **Multi-machine workers** — run workers on separate hosts against a shared database +- **Higher write throughput** — Postgres handles concurrent writes without SQLite's single-writer constraint +- **Existing Postgres infrastructure** — reuse your existing database server instead of managing SQLite files + +For single-machine workloads, SQLite remains the simpler choice — no +external dependencies required. + +## Installation + +```bash +pip install taskito[postgres] +``` + +## Configuration + +```python +from taskito import Queue + +queue = Queue( + backend="postgres", + db_url="postgresql://user:password@localhost:5432/myapp", + schema="taskito", # optional, default: "taskito" +) +``` + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `backend` | `str` | `"sqlite"` | Set to `"postgres"` or `"postgresql"` | +| `db_url` | `str` | `None` | PostgreSQL connection URL (required for Postgres) | +| `schema` | `str` | `"taskito"` | PostgreSQL schema for all tables | +| `workers` | `int` | `0` (auto) | Number of worker threads | + +All other `Queue` parameters (`default_retry`, `default_timeout`, +`default_priority`, `result_ttl`) work identically to the SQLite backend. 
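+
+For example, a Postgres-backed queue combined with the usual defaults (the
+connection URL is a placeholder):
+
+```python
+queue = Queue(
+    backend="postgres",
+    db_url="postgresql://taskito:secret@db.internal:5432/myapp",
+    schema="taskito",
+    default_retry=5,
+    default_timeout=120,
+    result_ttl=86400,  # purge completed/dead jobs after 24 hours
+)
+```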
+ +## Django integration + +Configure the Postgres backend via Django settings: + +```python +# settings.py +TASKITO_BACKEND = "postgres" +TASKITO_DB_URL = "postgresql://user:password@localhost:5432/myapp" +TASKITO_SCHEMA = "taskito" +``` + +Then use the Django integration as normal: + +```python +from taskito.contrib.django.settings import get_queue + +queue = get_queue() +``` + +All Django settings: + +| Setting | Default | Description | +|---------|---------|-------------| +| `TASKITO_BACKEND` | `"sqlite"` | Storage backend (`"sqlite"` or `"postgres"`) | +| `TASKITO_DB_URL` | `None` | PostgreSQL connection URL | +| `TASKITO_SCHEMA` | `"taskito"` | PostgreSQL schema name | +| `TASKITO_DB_PATH` | `".taskito/taskito.db"` | SQLite database path (ignored with Postgres) | +| `TASKITO_WORKERS` | `0` | Worker thread count (0 = auto-detect) | +| `TASKITO_DEFAULT_RETRY` | `3` | Default max retries | +| `TASKITO_DEFAULT_TIMEOUT` | `300` | Default task timeout in seconds | +| `TASKITO_DEFAULT_PRIORITY` | `0` | Default task priority | +| `TASKITO_RESULT_TTL` | `None` | Result TTL in seconds | + +## Schema isolation + +taskito creates all tables inside a dedicated PostgreSQL schema (default: +`taskito`). The schema is created automatically if it doesn't exist. + +```python +# Use a custom schema +queue = Queue(backend="postgres", db_url="postgresql://...", schema="myapp_tasks") +``` + +Schema names must contain only alphanumeric characters and underscores. +Invalid names raise a `ConfigError` at startup. + +This lets you run multiple independent taskito instances in the same +database by using different schemas, or keep taskito tables separate from +your application tables. + +## Connection pooling + +The Postgres backend uses Diesel's `r2d2` connection pool with a default +size of **10 connections**. Each connection has the `search_path` set to +the configured schema on acquisition. + +The pool size is configured at the Rust layer. For most workloads, the +default of 10 connections is sufficient. + +## Migrations + +Migrations run automatically on first connection. taskito creates the +following **11 tables** inside the configured schema: + +| Table | Purpose | +|-------|---------| +| `jobs` | Core job storage | +| `dead_letter` | Dead letter queue | +| `rate_limits` | Token bucket rate limiting state | +| `periodic_tasks` | Cron-scheduled task definitions | +| `job_errors` | Per-attempt error tracking | +| `job_dependencies` | Task dependency edges | +| `task_metrics` | Execution time and memory metrics | +| `replay_history` | Job replay audit trail | +| `task_logs` | Structured task log entries | +| `circuit_breakers` | Circuit breaker state | +| `workers` | Worker heartbeat tracking | + +All tables use PostgreSQL-native types (`TEXT`, `BYTEA`, `BIGINT`, +`BOOLEAN`, `DOUBLE PRECISION`) rather than SQLite-compatible types. 
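+
+If you want to confirm the migrations ran, you can list the tables in the
+schema with any Postgres client. A sketch using psycopg (version 3), which
+is not a taskito dependency; the connection URL is illustrative:
+
+```python
+import psycopg
+
+with psycopg.connect("postgresql://user:password@localhost:5432/myapp") as conn:
+    rows = conn.execute(
+        """
+        SELECT table_name
+        FROM information_schema.tables
+        WHERE table_schema = 'taskito'
+        ORDER BY table_name
+        """
+    ).fetchall()
+
+print([name for (name,) in rows])  # expect the 11 tables listed above
+```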
+ +## Differences from SQLite + +| Aspect | SQLite | Postgres | +|--------|--------|----------| +| Connection model | Embedded, file-based | Client/server, networked | +| Write concurrency | Single writer (WAL mode) | Multiple concurrent writers | +| Distribution | Single machine only | Multi-machine workers | +| Setup | Zero config, bundled | Requires Postgres server | +| Connection pool default | 8 connections | 10 connections | +| Schema isolation | N/A (file per database) | Custom PostgreSQL schema | +| Tables | 6 tables | 11 tables (additional: `job_dependencies`, `task_metrics`, `replay_history`, `task_logs`, `circuit_breakers`) | +| Backup | `sqlite3 .backup` | `pg_dump` | + +## Deployment + +### Docker Compose + +```yaml +services: + postgres: + image: postgres:16 + environment: + POSTGRES_DB: myapp + POSTGRES_USER: taskito + POSTGRES_PASSWORD: secret + volumes: + - pgdata:/var/lib/postgresql/data + ports: + - "5432:5432" + + worker: + build: . + environment: + TASKITO_BACKEND: postgres + TASKITO_DB_URL: postgresql://taskito:secret@postgres:5432/myapp + depends_on: + - postgres + stop_signal: SIGINT + stop_grace_period: 35s + + dashboard: + build: . + command: taskito dashboard --app myapp:queue --host 0.0.0.0 + environment: + TASKITO_BACKEND: postgres + TASKITO_DB_URL: postgresql://taskito:secret@postgres:5432/myapp + depends_on: + - postgres + ports: + - "8080:8080" + +volumes: + pgdata: +``` + +With Postgres, there are no shared-file constraints — workers and dashboard +connect over the network. You can run multiple worker containers across +different hosts. + +### systemd + +```ini +[Unit] +Description=taskito worker +After=network.target postgresql.service + +[Service] +Type=simple +User=myapp +Group=myapp +WorkingDirectory=/opt/myapp +ExecStart=/opt/myapp/.venv/bin/taskito worker --app myapp:queue +Restart=always +RestartSec=5 +KillSignal=SIGINT +TimeoutStopSec=35 + +Environment=PYTHONPATH=/opt/myapp +Environment=TASKITO_BACKEND=postgres +Environment=TASKITO_DB_URL=postgresql://taskito:secret@db.internal:5432/myapp + +[Install] +WantedBy=multi-user.target +``` + +### Multi-machine workers + +With Postgres, you can run workers on multiple machines. Each worker +connects to the same database and coordinates through PostgreSQL's row-level +locking: + +```bash +# Machine 1 +taskito worker --app myapp:queue + +# Machine 2 +taskito worker --app myapp:queue + +# Machine 3 +taskito worker --app myapp:queue +``` + +All workers share the same job queue and dequeue work atomically. + +## Backups + +Use standard PostgreSQL backup tools instead of SQLite-specific commands: + +```bash +# Dump the taskito schema +pg_dump -h localhost -U taskito -d myapp -n taskito > backup.sql + +# Restore +psql -h localhost -U taskito -d myapp < backup.sql +``` + +For continuous backups, use PostgreSQL's built-in WAL archiving or a tool +like [pgBackRest](https://pgbackrest.org/). diff --git a/docs-next/content/docs/guides/operations/testing.mdx b/docs-next/content/docs/guides/operations/testing.mdx new file mode 100644 index 0000000..fa61abc --- /dev/null +++ b/docs-next/content/docs/guides/operations/testing.mdx @@ -0,0 +1,438 @@ +--- +title: Testing +description: "Test mode, TestResult, MockResource, fixtures, and workflow testing." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +taskito includes a built-in test mode that runs tasks **synchronously** in +the calling thread — no worker, no Rust scheduler, no SQLite. This makes +tests fast, deterministic, and easy to write. 
+ +## Quick example + +```python +from taskito import Queue + +queue = Queue() + +@queue.task() +def add(a: int, b: int) -> int: + return a + b + +def test_add(): + with queue.test_mode() as results: + add.delay(2, 3) + + assert len(results) == 1 + assert results[0].return_value == 5 + assert results[0].succeeded +``` + +## How it works + +When you enter `queue.test_mode()`, taskito patches the `enqueue()` method +so that every `.delay()` or `.apply_async()` call: + +1. Looks up the task function in the registry +2. Calls it immediately in the current thread +3. Captures the return value (or exception) in a `TestResult` +4. Appends the result to the `TestResults` list + +No database is created. No worker threads are spawned. Tasks execute +eagerly and synchronously. + +## `queue.test_mode()` + +```python +with queue.test_mode(propagate_errors=False, resources=None) as results: + # tasks run synchronously here + ... +``` + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `propagate_errors` | `bool` | `False` | If `True`, task exceptions are re-raised immediately instead of being captured in `TestResult.error` | +| `resources` | `dict[str, Any] \| None` | `None` | Map of resource name → mock instance or `MockResource` for injection. See [Resource System](/docs/guides/resources). | + +The context manager yields a `TestResults` list that accumulates results as +tasks execute. + +## `TestResult` + +Each executed task produces a `TestResult`: + +```python +with queue.test_mode() as results: + add.delay(2, 3) + + r = results[0] + r.job_id # "test-000001" + r.task_name # "mymodule.add" + r.args # (2, 3) + r.kwargs # {} + r.return_value # 5 + r.error # None + r.traceback # None + r.succeeded # True + r.failed # False +``` + +| Attribute | Type | Description | +|---|---|---| +| `job_id` | `str` | Synthetic ID like `"test-000001"` | +| `task_name` | `str` | Fully qualified task name | +| `args` | `tuple` | Positional arguments passed to the task | +| `kwargs` | `dict` | Keyword arguments passed to the task | +| `return_value` | `Any` | Return value on success, `None` on failure | +| `error` | `Exception \| None` | The exception if the task failed | +| `traceback` | `str \| None` | Formatted traceback if the task failed | +| `succeeded` | `bool` | `True` if no error | +| `failed` | `bool` | `True` if an error occurred | + +## `TestResults` + +`TestResults` is a list of `TestResult` with convenience methods: + +```python +with queue.test_mode() as results: + add.delay(2, 3) + failing_task.delay() + add.delay(10, 20) + + # Filter by outcome + results.succeeded # TestResults with 2 items + results.failed # TestResults with 1 item + + # Filter by task name + results.filter(task_name="mymodule.add") # 2 items + + # Combine filters + results.filter(task_name="mymodule.add", succeeded=True) # 2 items +``` + +### `.filter()` + +```python +results.filter(task_name=None, succeeded=None) -> TestResults +``` + +| Parameter | Type | Description | +|---|---|---| +| `task_name` | `str \| None` | Filter by exact task name | +| `succeeded` | `bool \| None` | `True` for successes, `False` for failures | + +## Testing failures + +By default, task exceptions are captured — not raised: + +```python +@queue.task() +def risky(): + raise ValueError("something broke") + +def test_failure_captured(): + with queue.test_mode() as results: + risky.delay() + + assert len(results) == 1 + assert results[0].failed + assert isinstance(results[0].error, ValueError) + assert "something broke" in str(results[0].error) 
+ assert results[0].traceback is not None +``` + +### Propagating errors + +Use `propagate_errors=True` when you want exceptions to bubble up: + +```python +def test_failure_propagated(): + with queue.test_mode(propagate_errors=True) as results: + with pytest.raises(ValueError, match="something broke"): + risky.delay() +``` + +## Testing workflows + +Chains, groups, and chords work in test mode because they call `enqueue()` +internally, which is intercepted by the test mode patch. + +### Chains + +```python +from taskito import chain + +@queue.task() +def double(n: int) -> int: + return n * 2 + +@queue.task() +def add_ten(n: int) -> int: + return n + 10 + +def test_chain(): + with queue.test_mode() as results: + chain(double.s(5), add_ten.s()).apply() + + assert len(results) == 2 + assert results[0].return_value == 10 # double(5) + assert results[1].return_value == 20 # add_ten(10) +``` + +### Groups + +```python +from taskito import group + +def test_group(): + with queue.test_mode() as results: + group(double.s(1), double.s(2), double.s(3)).apply() + + assert len(results) == 3 + values = [r.return_value for r in results] + assert values == [2, 4, 6] +``` + +## Job context in tests + +`current_job` works inside test mode. The context is set up before each +task runs: + +```python +from taskito import current_job + +@queue.task() +def context_aware(): + return { + "job_id": current_job.id, + "task_name": current_job.task_name, + "retry_count": current_job.retry_count, + "queue_name": current_job.queue_name, + } + +def test_context(): + with queue.test_mode() as results: + context_aware.delay() + + ctx = results[0].return_value + assert ctx["job_id"].startswith("test-") + assert ctx["retry_count"] == 0 + assert ctx["queue_name"] == "default" +``` + +## Pytest integration + +### Fixture pattern + +Create a reusable fixture for test mode: + +```python +# conftest.py +import pytest +from myapp import queue + +@pytest.fixture +def task_results(): + with queue.test_mode() as results: + yield results + +# test_tasks.py +def test_add(task_results): + add.delay(2, 3) + assert task_results[0].return_value == 5 + +def test_email(task_results): + send_email.delay("user@example.com", "Hello", "World") + assert task_results[0].succeeded +``` + +### Fixture with error propagation + +```python +@pytest.fixture +def strict_tasks(): + with queue.test_mode(propagate_errors=True) as results: + yield results +``` + +### Testing async code + +Test mode works with async test functions — the tasks still execute +synchronously: + +```python +import pytest + +@pytest.mark.asyncio +async def test_async_enqueue(task_results): + add.delay(1, 2) + assert task_results[0].return_value == 3 +``` + +## Testing with worker resources + +If your tasks use [worker resources](/docs/guides/resources) (injected via +`inject=` or `Inject["name"]`), pass mock instances through `resources=`: + +```python +from unittest.mock import MagicMock + +@queue.worker_resource("db") +def create_db(): + return real_sessionmaker + +@queue.task(inject=["db"]) +def create_user(name: str, db): + session = db() + session.add(User(name=name)) + session.commit() + +def test_create_user(): + mock_db = MagicMock() + + with queue.test_mode(resources={"db": mock_db}) as results: + create_user.delay("Alice") + + assert results[0].succeeded + mock_db.return_value.add.assert_called_once() +``` + +### `MockResource` + +`MockResource` adds call tracking to a mock value: + +```python +from taskito import MockResource + +spy = MockResource("db", wraps=real_db, 
track_calls=True) + +with queue.test_mode(resources={"db": spy}) as results: + create_user.delay("Alice") + +assert spy.call_count == 1 +assert results[0].succeeded +``` + +| Parameter | Type | Description | +|---|---|---| +| `name` | `str` | Resource name (informational). | +| `return_value` | `Any` | Value returned when the resource is accessed. | +| `wraps` | `Any` | Wrap a real object — returned as-is when accessed. | +| `track_calls` | `bool` | Increment `call_count` each access. | + +#### `return_value` vs `wraps` + +Use `return_value` when you want a simple stub: + +```python +mock_cache = MockResource("cache", return_value={"key": "value"}) +``` + +Use `wraps` when you need the real object but want call tracking: + +```python +real_db = create_test_database() +spy_db = MockResource("db", wraps=real_db, track_calls=True) +``` + +#### Multiple resources + +Pass multiple resources to `test_mode`: + +```python +with queue.test_mode(resources={ + "db": MockResource("db", return_value=mock_db), + "cache": MockResource("cache", return_value={}), + "mailer": MockResource("mailer", return_value=mock_smtp), +}) as results: + process_order.delay(order_id=123) +``` + +#### Testing with `inject` + +Tasks that use `@queue.task(inject=["db"])` receive the mock resource +automatically: + +```python +@queue.task(inject=["db"]) +def create_user(name, db=None): + db.execute("INSERT INTO users (name) VALUES (?)", (name,)) + +mock_db = MagicMock() +with queue.test_mode(resources={"db": mock_db}) as results: + create_user.delay("Alice") + +assert results[0].succeeded +mock_db.execute.assert_called_once() +``` + + + When `resources=` is provided, proxy reconstruction is bypassed + automatically. Proxy markers in arguments are passed through as-is so + tests don't fail due to missing files or network connections. + + +## What test mode does NOT cover + +Test mode is designed for **unit and integration testing** of task logic. +It does not exercise: + +- SQLite storage or queries +- Retry/backoff scheduling +- Rate limiting +- Timeout reaping +- Worker thread pool dispatch +- Priority ordering + +For end-to-end tests that exercise the full Rust scheduler, run a real +worker in a background thread: + +```python +import threading +import time + +def test_e2e(): + queue_e2e = Queue(db_path=":memory:") + + @queue_e2e.task() + def add(a, b): + return a + b + + t = threading.Thread(target=queue_e2e.run_worker, daemon=True) + t.start() + + job = add.delay(2, 3) + result = job.result(timeout=10) + assert result == 5 +``` + + + Per-task and queue-level `TaskMiddleware` hooks (`before`, `after`, + `on_retry`) **do fire** in test mode, since they run in the Python wrapper + around your task function. This lets you verify middleware behavior in + tests without running a real worker. 
+ + +## Running tests locally + +```bash +# Rust tests +cargo test --workspace + +# Rebuild the Python extension after Rust changes +uv run maturin develop + +# Python tests +uv run python -m pytest tests/python/ -v + +# Linting +uv run ruff check py_src/ tests/ +uv run mypy py_src/taskito/ --no-incremental +``` + +To build with native async support: + +```bash +uv run maturin develop --features native-async +``` diff --git a/docs-next/content/docs/guides/operations/troubleshooting.mdx b/docs-next/content/docs/guides/operations/troubleshooting.mdx new file mode 100644 index 0000000..6268dc5 --- /dev/null +++ b/docs-next/content/docs/guides/operations/troubleshooting.mdx @@ -0,0 +1,259 @@ +--- +title: Troubleshooting +description: "Diagnose stuck jobs, unresponsive workers, growing databases, latency, and missing tasks." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +Common issues and how to fix them. + +## Jobs stuck in running + +**Symptom**: jobs stay in `running` status long after they should have finished. + +**Diagnosis**: the worker process that picked up the job crashed before +marking it complete. + +```python +# Check how many jobs are stuck +stats = queue.stats() +print(stats) # {'running': 47, 'pending': 0, ...} + +# See which jobs are stuck +stuck = queue.list_jobs(status="running", limit=20) +for job in stuck: + d = job.to_dict() + print(f"{d['id']} | {d['task_name']} | started {d['started_at']}") +``` + +**Fix**: the stale reaper handles this automatically — it detects jobs that +have exceeded their `timeout_ms` and retries them. If a job has no timeout +set, it stays stuck forever. + +To recover a stuck job manually: + +```python +import time + +# Mark the job as failed so it retries +queue._inner.retry(job_id, int(time.time() * 1000)) +``` + +To prevent this in future, always set a timeout on production tasks: + +```python +@queue.task(timeout=300) # 5 minutes max +def process_data(payload): + ... +``` + + + Jobs without `timeout_ms` are never reaped. The stale reaper only detects + jobs that have exceeded their deadline. + + +## Worker is unresponsive + +**Symptom**: worker process is alive but not processing jobs. Heartbeat is stale. + +**Diagnosis**: check worker status via the heartbeat API. + +```python +workers = queue.workers() +for w in workers: + print(f"{w['worker_id']}: {w['status']} (last seen: {w['last_heartbeat']})") +``` + +**Possible causes**: + +1. **GIL-bound CPU task**: a long-running CPU task is holding the GIL, + blocking the scheduler thread from dispatching new jobs. The scheduler + runs in Rust, but it still needs the GIL to call Python functions. + + Fix: switch to the prefork pool for CPU-bound tasks. + + ```bash + taskito worker --app myapp:queue --pool prefork + ``` + +2. **Deadlock**: a task is waiting on a resource held by another task in the + same worker. Check for circular waits in your task code. + +3. **Infinite loop**: a task is looping without yielding. Add a timeout to + detect this: + + ```python + @queue.task(timeout=60) + def risky_task(): + ... + ``` + +## Database growing too large + +**Symptom**: the SQLite file keeps growing; disk space is filling up. + +**Diagnosis**: completed job records and their result payloads are accumulating. + +**Fix**: set `result_ttl` to auto-purge old results. 
+ +```python +queue = Queue( + db_path="myapp.db", + result_ttl=86400, # Purge completed/dead jobs older than 24 hours +) +``` + +Manually purge existing backlog: + +```python +# Purge completed jobs older than 7 days +queue.purge_completed(older_than=604800) + +# Purge dead-lettered jobs older than 30 days +queue.purge_dead(older_than=2592000) +``` + +After purging, reclaim disk space: + +```bash +sqlite3 myapp.db "VACUUM;" +``` + + + `VACUUM` rewrites the entire database and requires exclusive access. Run + it during low-traffic periods. + + +## High job latency + +**Symptom**: jobs sit in `pending` for longer than expected before starting. + +**Diagnosis**: check the queue depth and scheduler configuration. + +```python +stats = queue.stats() +print(f"Pending: {stats['pending']}, Running: {stats['running']}") +``` + +**Possible causes and fixes**: + +1. **Scheduler poll interval too high**: default is 50ms. Jobs can wait up + to one poll interval before being picked up. + + ```python + queue = Queue(scheduler_poll_interval_ms=10) # Poll every 10ms + ``` + + Lower values increase CPU/DB usage. Balance based on your latency + requirements. + +2. **Not enough workers**: all workers are busy. Increase the worker count. + + ```python + queue = Queue(workers=16) + ``` + +3. **Rate limiting**: the task or queue has a rate limit active. + + ```python + # Check if rate limiting is the culprit + # Rate-limited jobs are rescheduled 1 second into the future + pending = queue.list_jobs(status="pending", limit=10) + for job in pending: + print(job.to_dict()["scheduled_at"]) + ``` + +4. **Database performance**: slow dequeue queries. Check SQLite WAL size or + Postgres query plans. + +## Memory usage growing + +**Symptom**: worker process memory climbs over time. + +**Causes**: + +1. **Large result payloads**: task return values are stored in the database + but also held in the scheduler's result buffer briefly. If tasks return + large objects (images, dataframes), memory spikes. + + Fix: return a reference (file path, object key) instead of the data itself. + + ```python + # Bad — large result stored in memory and DB + @queue.task() + def process_image(path: str) -> bytes: + return open(path, "rb").read() + + # Good — return a path + @queue.task() + def process_image(path: str) -> str: + out = path + ".processed" + # ... write output to out ... + return out + ``` + +2. **Accumulated job records**: without `result_ttl`, the database grows + unbounded. See "Database growing too large" above. + +3. **Resource leaks in tasks**: a task opens a file or connection and never + closes it. Use context managers. + +## Periodic task running twice + +**Symptom**: a periodic task fires more than once per interval, or appears +to run on two workers simultaneously. + +**Behavior**: this is safe by design. Periodic tasks use `unique_key` +deduplication — when a periodic task is due, each worker's scheduler checks +and tries to enqueue it, but only one enqueue succeeds because the +`unique_key` constraint prevents duplicates. + +If you see two completed jobs for the same periodic task in the same +interval, check: + +```python +# Look for duplicate completions +jobs = queue.list_jobs(status="complete", limit=50) +periodic_jobs = [j for j in jobs if "daily_report" in j.to_dict()["task_name"]] +for j in periodic_jobs: + print(j.to_dict()["completed_at"]) +``` + +If you're genuinely seeing duplicate execution, ensure all workers use the +same database (same SQLite file path or same Postgres DSN). 
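+
+To turn that check into a concrete answer, bucket the completions by
+interval and flag any interval with more than one run. A sketch for a daily
+task, reusing the query above and assuming `completed_at` is a millisecond
+timestamp (adjust the conversion if your records differ):
+
+```python
+from collections import Counter
+from datetime import datetime, timezone
+
+jobs = queue.list_jobs(status="complete", limit=500)
+report_runs = [j.to_dict() for j in jobs if "daily_report" in j.to_dict()["task_name"]]
+
+# Count completions per calendar day; more than one per day is a real duplicate.
+per_day = Counter(
+    datetime.fromtimestamp(d["completed_at"] / 1000, tz=timezone.utc).date()
+    for d in report_runs
+)
+duplicates = {day: n for day, n in per_day.items() if n > 1}
+print(duplicates)  # expect {}; anything else suggests workers on different databases
+```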
+ +## Task not found in worker + +**Symptom**: worker logs `TaskNotFound` or jobs fail with an error like +`unknown task: myapp.tasks.process`. + +**Cause**: the task name registered at enqueue time doesn't match what the +worker has registered. + +Task names default to `module.function_name`. If you enqueue from one module +path and run the worker with a different import path, the names won't match. + +**Diagnosis**: + +```python +# Check the task name stored in the job +job = queue.get_job(job_id) +print(job.to_dict()["task_name"]) # e.g. "myapp.tasks.process" + +# Check what the worker has registered +# (add this temporarily to your worker startup) +print(list(queue._task_registry.keys())) +``` + +**Fix**: use consistent import paths. If the task is `myapp/tasks.py:process`, +always import it as `myapp.tasks.process` — not `tasks.process` (relative) +or `src.myapp.tasks.process` (with src prefix). + +You can also set an explicit name to decouple the task name from the module +path: + +```python +@queue.task(name="process-data") +def process(payload): + ... +```
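+
+A quick way to catch mismatches before they reach production is to compare
+the names stored on pending jobs against the worker's registry, using the
+same introspection calls as the diagnosis above:
+
+```python
+# Run this in the process (and import path) your worker starts with.
+registered = set(queue._task_registry.keys())
+
+for job in queue.list_jobs(status="pending", limit=100):
+    name = job.to_dict()["task_name"]
+    if name not in registered:
+        print(f"not registered on this worker: {name!r}")
+```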